Adaptive Harmony provides common utilities for efficient concurrent processing of data. Use these functions to parallelize model inference, grading, and other async operations in your recipes.

async_map

Process all items concurrently with automatic progress tracking:
from adaptive_harmony.core.utils import async_map

async def grade_completion(thread: StringThread) -> float:
    return await grader.grade(thread)

# Process all threads in parallel
scores = await async_map(grade_completion, threads)
By default, all tasks run concurrently and progress is tracked automatically. Parameters:
  • f - Async function to apply
  • data - Sequence of items
  • max_concurrent_samples - Optional int cap on in-flight tasks. Backed by an asyncio.Semaphore. Use this to avoid overwhelming a downstream model or API.
  • stage_notifier - Optional StageNotifier. When set, progress is reported to the platform as each sample finishes, so the job UI shows live progress.
results = await async_map(
    grader.score_float_value,
    samples,
    max_concurrent_samples=8,
    stage_notifier=ctx.job.stage_notifier("Grading"),
)
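To make the effect of max_concurrent_samples concrete, here is a minimal sketch of a semaphore-capped map built only on the standard library. It is an illustration of the general technique, not Adaptive Harmony's implementation: bounded_map, fake_grade, and the in-flight counters are hypothetical names introduced for this example.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def bounded_map(
    f: Callable[[T], Awaitable[R]],
    data: Sequence[T],
    max_concurrent: int,
) -> list[R]:
    """Hypothetical stand-in for a semaphore-capped map; not the library's code."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(item: T) -> R:
        # Each task waits here until fewer than max_concurrent calls are in flight.
        async with semaphore:
            return await f(item)

    # gather returns results in input order, regardless of completion order.
    return await asyncio.gather(*(run_one(item) for item in data))


in_flight = 0
peak_in_flight = 0


async def fake_grade(x: int) -> int:
    """Toy coroutine that records how many calls overlap."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulate a slow downstream call
    in_flight -= 1
    return x * 2


results = asyncio.run(bounded_map(fake_grade, list(range(20)), max_concurrent=4))
```

In this sketch every task is created up front, but the semaphore keeps the number of overlapping calls to fake_grade at or below the cap, which is the property that protects a rate-limited model or API.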

async_map_fallible

Same as async_map, but samples that raise are skipped silently instead of aborting the whole run. Failed samples are excluded from the results.
from adaptive_harmony.core.utils import async_map_fallible

# Some threads may fail (e.g., too long for model context)
completions = await async_map_fallible(model.generate, threads)
Use this when you expect some samples to fail and want to continue processing the rest. It accepts the same max_concurrent_samples and stage_notifier arguments as async_map. Pass return_indices=True to recover which inputs survived: the result type changes to list[tuple[int, T]], where the int is the item's index in the original data sequence:
indexed = await async_map_fallible(model.generate, threads, return_indices=True)
for idx, completion in indexed:
    print(f"thread {idx} succeeded")
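The skip-on-failure and index-recovery semantics described above can be sketched with asyncio.gather and return_exceptions=True. This is an assumption-laden illustration of the pattern, not the library's code: fallible_map_sketch and fake_generate are hypothetical names, and the real function may handle logging, progress, and concurrency limits differently.

```python
import asyncio
from typing import Awaitable, Callable, Sequence, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def fallible_map_sketch(
    f: Callable[[T], Awaitable[R]],
    data: Sequence[T],
) -> list[tuple[int, R]]:
    """Hypothetical illustration: keep (index, result) pairs for successful calls only."""
    # return_exceptions=True makes gather hand back exceptions as values
    # instead of raising the first one.
    outcomes = await asyncio.gather(*(f(item) for item in data), return_exceptions=True)
    return [
        (idx, outcome)
        for idx, outcome in enumerate(outcomes)
        if not isinstance(outcome, BaseException)
    ]


async def fake_generate(prompt: str) -> str:
    """Toy coroutine that rejects long inputs, standing in for a context-length error."""
    if len(prompt) > 10:
        raise ValueError("prompt too long")
    return prompt.upper()


prompts = ["short", "this prompt is far too long", "ok"]
indexed = asyncio.run(fallible_map_sketch(fake_generate, prompts))

# Pair each surviving result with the input that produced it.
survivors = [(prompts[idx], completion) for idx, completion in indexed]
```

Because failed samples leave gaps, the returned indices are the only reliable way to line results back up with their inputs; zipping the results against the original list would silently misalign them.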

async_map_batch

Process an iterator in batches with automatic retry on failure:
from adaptive_harmony.core.utils import async_map_batch

# Process batches of 50, replacing failed samples with fresh ones; tolerate up to a 50% failure rate
results = await async_map_batch(
    f=model.generate,
    data=iter(threads),
    batch_size=50,
    max_failure_fraction=0.5
)
Parameters:
  • f - Async function to apply
  • data - Iterator (not list) of items
  • batch_size - Number of items per batch
  • max_failure_fraction - Max fraction of failures before raising exception (default 0.5)
Behavior:
  • Processes batch_size items concurrently
  • If a sample fails, pulls the next item from the iterator and tries that one instead
  • Fails if more than max_failure_fraction * batch_size samples fail
  • Results are not ordered
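The behavior listed above can be sketched with asyncio.wait, as a rough model of how a batch might be kept full by replacing failures. Everything here is an assumption made for illustration: batch_with_replacement, flaky_generate, the RuntimeError, and the choice to stop refilling once the iterator is exhausted are hypothetical and may differ from what async_map_batch actually does.

```python
import asyncio
import itertools
from typing import Awaitable, Callable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")

_EXHAUSTED = object()  # sentinel marking an empty iterator


async def batch_with_replacement(
    f: Callable[[T], Awaitable[R]],
    data: Iterator[T],
    batch_size: int,
    max_failure_fraction: float = 0.5,
) -> list[R]:
    """Hypothetical sketch of the documented behavior; not the library's code."""
    max_failures = max_failure_fraction * batch_size
    failures = 0
    results: list[R] = []

    # Start one call per item for the first batch_size items of the iterator.
    pending = {asyncio.ensure_future(f(item)) for item in itertools.islice(data, batch_size)}
    while pending:
        done, pending = await asyncio.wait(pending, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            if task.exception() is None:
                results.append(task.result())  # appended in completion order
                continue
            failures += 1
            if failures > max_failures:
                for leftover in pending:
                    leftover.cancel()
                raise RuntimeError("too many failed samples in batch")
            # Replace the failed sample with a fresh item, if any remain.
            replacement = next(data, _EXHAUSTED)
            if replacement is not _EXHAUSTED:
                pending.add(asyncio.ensure_future(f(replacement)))
    return results


async def flaky_generate(x: int) -> int:
    """Toy coroutine that fails on multiples of 3."""
    if x % 3 == 0:
        raise ValueError(f"sample {x} failed")
    await asyncio.sleep(0)
    return x


batch = asyncio.run(batch_with_replacement(flaky_generate, iter(range(100)), batch_size=4))
```

In this run, items 0 and 3 fail and are replaced by items 4 and 5, so the batch still ends up with four results. Results are appended as tasks finish, which is why the output order is not tied to the input order.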
Use when you:
  • Are working with iterators
  • Want automatic retry with fresh samples on failure (as in training, where batch size must remain constant)
  • Don’t need to preserve ordering