> ## Documentation Index
> Fetch the complete documentation index at: https://docs.adaptive-ml.com/llms.txt
> Use this file to discover all available pages before exploring further.

# Asynchronous utilities

> Helper functions for concurrent processing in recipes

Adaptive Harmony provides common utilities for efficient concurrent processing of data. Use these functions to parallelize model inference, grading, and other async operations in your recipes.

### `async_map`

Process all items concurrently with automatic progress tracking:

```python theme={null}
from adaptive_harmony.core.utils import async_map

async def grade_completion(thread: StringThread) -> float:
    return await grader.grade(thread)

# Process all threads in parallel
scores = await async_map(grade_completion, threads)
```

All tasks run concurrently. Progress is tracked automatically.

**Parameters:**

* `f` — Async function to apply
* `data` — Sequence of items
* `max_concurrent_samples` — Optional `int` cap on in-flight tasks. Backed by an `asyncio.Semaphore`. Use this to avoid overwhelming a downstream model or API.
* `stage_notifier` — Optional `StageNotifier`. When set, progress is reported to the platform as each sample finishes, so the job UI shows live progress.

```python theme={null}
results = await async_map(
    grader.score_float_value,
    samples,
    max_concurrent_samples=8,
    stage_notifier=ctx.job.stage_notifier("Grading"),
)
```

### `async_map_fallible`

Same as `async_map` but silently skips failures instead of crashing.
Failed samples are excluded from results.

```python theme={null}
from adaptive_harmony.core.utils import async_map_fallible

# Some threads may fail (e.g., too long for model context)
completions = await async_map_fallible(model.generate, threads)
```

Use when you expect some samples to fail and want to continue processing the rest. Accepts the same `max_concurrent_samples` and `stage_notifier` as `async_map`.

Pass `return_indices=True` to recover which inputs survived. The result type changes to `list[tuple[int, T]]` where the `int` is the index in the original `data` sequence:

```python theme={null}
indexed = await async_map_fallible(model.generate, threads, return_indices=True)
for idx, completion in indexed:
    print(f"thread {idx} succeeded")
```

### async\_map\_batch

Process an iterator in batches with automatic retry on failure:

```python theme={null}
from adaptive_harmony.core.utils import async_map_batch

# Process batches of 50, retry failed samples up to 50% failure rate
results = await async_map_batch(
    f=model.generate,
    data=iter(threads),
    batch_size=50,
    max_failure_fraction=0.5
)
```

**Parameters:**

* `f` - Async function to apply
* `data` - Iterator (not list) of items
* `batch_size` - Number of items per batch
* `max_failure_fraction` - Max fraction of failures before raising exception (default 0.5)

**Behavior:**

* Processes `batch_size` items concurrently
* If a sample fails, pulls next item from iterator and retries
* Fails if more than `max_failure_fraction * batch_size` samples fail
* Results are not ordered

Use when:

* Working with iterators
* Want automatic retry with fresh samples on failure (as in training, where batch size must remain constant)
* Don't need to preserve ordering
