Adaptive Harmony provides common utilities for efficient concurrent processing of data. Use these functions to parallelize model inference, grading, and other async operations in your recipes.

async_map

Process all items concurrently with automatic progress tracking:

```python
from adaptive_harmony.core.utils import async_map

async def grade_completion(thread: StringThread) -> float:
    return await grader.grade(thread)

# Process all threads in parallel
scores = await async_map(grade_completion, threads)
```
All tasks run concurrently. Progress is tracked automatically.
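For intuition, the behavior is roughly an asyncio.gather over one task per item. A minimal sketch of the semantics (not the actual implementation; progress reporting is elided):

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def async_map_sketch(fn: Callable[[T], Awaitable[R]], items: list[T]) -> list[R]:
    # One task per item; all run concurrently on the event loop,
    # and results come back in input order.
    return await asyncio.gather(*(fn(item) for item in items))
```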

async_map_fallible

Same as async_map, but skips failures instead of raising; failed samples are silently excluded from the results:

```python
from adaptive_harmony.core.utils import async_map_fallible

# Some threads may fail (e.g., too long for the model's context window)
completions = await async_map_fallible(model.generate, threads)
```
Use when you expect some samples to fail and want to continue processing the rest.
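The semantics are roughly a gather that collects exceptions and then drops them. A minimal sketch (not the actual implementation):

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def async_map_fallible_sketch(fn: Callable[[T], Awaitable[R]], items: list[T]) -> list[R]:
    # return_exceptions=True keeps one failure from cancelling the rest;
    # failed samples are then filtered out of the results.
    outcomes = await asyncio.gather(*(fn(item) for item in items), return_exceptions=True)
    return [r for r in outcomes if not isinstance(r, BaseException)]
```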

batch_process_fallible

Process items in controlled batches with failure handling:

```python
from adaptive_harmony.core.utils import batch_process_fallible

# Process 100 threads at a time
results = await batch_process_fallible(
    fn=model.generate,
    data=threads,
    batch_size=100,
    stage_notifier=ctx.job.stage_notifier("Inference"),
)
```
Returns list[tuple[int, T]], where the int is each result's original index in the input data; use it to map successes back to their inputs (see the sketch after this list). When to use:
  • Large datasets where running all at once would exceed memory
  • Want to report progress in batches
  • Need to preserve sample indices
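
Because indices into the original data are preserved, pairing each successful result with its input is a one-liner. For example, continuing from the call above:

```python
# Pair each successful completion with the thread that produced it.
# Failed samples simply never appear in `results`.
completions_by_input = [(threads[i], completion) for i, completion in results]
```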

async_map_batch

Process an iterator in batches, with automatic retry on failure:

```python
from adaptive_harmony.core.utils import async_map_batch

# Process batches of 50, retrying failed samples up to a 50% failure rate
results = await async_map_batch(
    f=model.generate,
    data=iter(threads),
    batch_size=50,
    max_failure_fraction=0.5,
)
```
Parameters:
  • f - Async function to apply
  • data - Iterator (not list) of items
  • batch_size - Number of items per batch
  • max_failure_fraction - Max fraction of failed samples before an exception is raised (default 0.5)
Behavior:
  • Processes batch_size items concurrently
  • If a sample fails, the next item is pulled from the iterator and tried in its place
  • Raises if more than max_failure_fraction * batch_size samples fail
  • Results are not ordered
Use when:
  • Working with iterators
  • Want automatic retry with fresh samples on failure, as in training, where batch size must remain constant (see the sketch below)
  • Don’t need to preserve ordering
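
For intuition, the retry loop behaves roughly as follows. A simplified sketch (not the actual implementation: real batches run concurrently, results come back unordered, and iterator exhaustion is not handled here):

```python
from typing import Awaitable, Callable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")

async def async_map_batch_sketch(
    f: Callable[[T], Awaitable[R]],
    data: Iterator[T],
    batch_size: int,
    max_failure_fraction: float = 0.5,
) -> list[R]:
    results: list[R] = []
    failures = 0
    while len(results) < batch_size:
        item = next(data)  # retries draw fresh samples from the iterator
        try:
            results.append(await f(item))
        except Exception:
            failures += 1
            if failures > max_failure_fraction * batch_size:
                raise RuntimeError("too many failed samples in batch")
    return results
```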