Writing a recipe typically requires a training algorithm, as covered in the previous page. Harmony lets you build your own algorithms from convenient methods that expose model operations such as backward passes and optimizer steps. You can mix and match these methods to create your own algorithms, or copy and modify our supported algorithms in adaptive_harmony.common. On this page, we assume reasonable familiarity with reinforcement learning for LLMs.

Training primitives

TrainingModel

This is the main class through which you interact with model weights. Load models explains in detail how to load a model, but to recap:
# given that you have your RecipeContext ctx
model_builder = Model("gemma-3-12b").to_builder(ctx)
model = await model_builder.tp(tp).spawn_train("policy", max_seq_len)
Choose the tp parameter (Tensor Parallelism) to be the smallest possible while still fitting in your GPUs. Choose the max_seq_len parameter to be larger than your expected rollout size in tokens, while still fitting in your GPUs.

Generating text

Once a TrainingModel has been loaded, you can use it straightforwardly to make async inference calls. The input and outputs are StringThread objects, corresponding to chats. Details about creation methods are given in the thread page.
thread = StringThread(turns=[('user', "Hey, what's up?")])
response_thread = await model.generate(thread)
# convenient method to access the completion text
completion = response_thread.last_content()
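Because generate is an async call, you can fan many requests out concurrently with standard asyncio tooling. A minimal self-contained sketch of the pattern, where fake_generate is a hypothetical stand-in for model.generate:

```python
import asyncio

# hypothetical stand-in for model.generate: the real call is served by Harmony
async def fake_generate(prompt: str) -> str:
    await asyncio.sleep(0)  # yield control, as a real inference call would
    return prompt.upper()

async def main() -> list[str]:
    prompts = ["hello", "world"]
    # fan all inference calls out concurrently instead of awaiting them one by one
    return await asyncio.gather(*(fake_generate(p) for p in prompts))

results = asyncio.run(main())
```

The same gather pattern applies unchanged when the coroutine is model.generate on real StringThread objects.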

Generating tokens

When training with RL, you need the raw tokens to be able to compute log-probabilities and losses. Harmony provides tokenized versions of threads called adaptive_harmony.TokenizedThread, as well as utils for tokenization and detokenization:
thread = StringThread(turns=[('user', "Hey, what's up?")])
tokenized_thread = await model.tokenize_thread(thread)
tokenized_response = await model.generate_tokens(tokenized_thread)
response = await model.detokenize_thread(tokenized_response)

Logprobs

Get the per-token logprobs of a TokenizedThread like so:
logprobs = await model.logprobs_per_token(tokenized_response)

Training: backward

For training, Harmony implements the losses of the most common algorithms (cross-entropy for SFT, PPO, GRPO, DPO, …). You can do a backward step for each of these losses by calling the corresponding trainer method on the TrainingModel, for instance:
model.train_grpo(
    sample,  # TokenizedThread
    logprobs,
    ref_logprobs,
    [advantage] * len(logprobs),
    clip_range,
    kl_beta,
)
The list of training methods for RL policies is:
  • train_ppo
  • train_grpo
  • train_gspo
  • train_dpo
  • train_tangents
We refer the reader to harmony_client.harmony_client for the definitions of these methods' parameters. Note that these methods only compute the gradient contribution of the given samples; they do not change model parameters (that happens when you call optim_step). The train_tangents method is special: it lets you pass arbitrary losses, so you can implement any RL algorithm not covered by our libraries. See the Bring your own loss functions section at the end of this page for instructions on how to use it.
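As a rough illustration of what these trainers compute internally, here is a numpy sketch of the textbook PPO/GRPO-style clipped surrogate loss for one sample. This is not Harmony's actual implementation, just the standard objective the method names refer to:

```python
import numpy as np

def clipped_surrogate_loss(logprobs, old_logprobs, advantage, clip_range):
    # importance ratio between current and behavior policy, per token
    ratio = np.exp(np.asarray(logprobs) - np.asarray(old_logprobs))
    unclipped = ratio * advantage
    clipped = np.clip(ratio, 1.0 - clip_range, 1.0 + clip_range) * advantage
    # pessimistic (elementwise min) bound, negated so that lower is better
    return -np.minimum(unclipped, clipped).mean()
```

When the policy has not moved yet (ratio = 1 everywhere), the loss reduces to minus the advantage.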

Training: optim step

Once you have computed the backward pass for all samples in your batch, you need to perform a gradient descent step. We use the AdamW optimizer under the hood.
await model.optim_step(
    learning_rate,  # compute it with your lr scheduler
    wd,  # weight decay
    max_grad_norm,
)
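The learning rate passed here typically comes from a scheduler such as the CosineScheduler used in the GRPO walkthrough. As a hedged sketch of what a cosine schedule computes (not necessarily Harmony's exact implementation), given training progress as a fraction in [0, 1]:

```python
import math

def cosine_lr(progress: float, lr_max: float, lr_min: float = 0.0) -> float:
    # progress is the fraction of training completed, in [0, 1]:
    # the rate decays smoothly from lr_max at 0 to lr_min at 1
    return lr_min + 0.5 * (lr_max - lr_min) * (1.0 + math.cos(math.pi * progress))
```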

Putting it all together: simplified GRPO walkthrough

We will see how all these building blocks interact by implementing GRPO, one of the most common RL methods for LLMs. Please refer to the original paper for an in-depth explanation of the algorithm. For simplicity, we only illustrate the single-turn variant of GRPO here. Please take a look at adaptive_harmony.common.env_grpo for our multi-turn variant.

Init: defining useful variables

We will need to use the following objects in the GRPO class:
class GRPO:
    def __init__(self, ...):
        # DataSet is a helper iterator over StringThreads
        self.dataset = DataSet(dataset, allow_looping=True, seed=data_seed)
        self.model = model
        self.grader = grader
        self.scoring_fn = grader.score_float_value
        self.completions_per_sample = completions_per_sample  # GRPO creates a few completions for each prompt
        self.lr_schedule = lr_scheduler or CosineScheduler(lr)  # built-in scheduler
        self.max_grad_norm = max_grad_norm
        self.prompts_per_batch = samples_per_batch // completions_per_sample
        self.samples_per_mini_batch = samples_per_mini_batch
        # GRPO hyperparameters
        self.clip_range = clip_range
        self.kl_beta = kl_beta
        self.weight_decay = weight_decay
        self.mini_epochs_per_batch = mini_epochs_per_batch
And before running the data generation, we initialize the reference model to the untrained one:
# this creates an InferenceModel with the weights of self.model
self.model_ref = await self.model.clone_inf()

Generate data

The first phase in the algorithm is to generate self.completions_per_sample completions from a single prompt. We use a helper dataclass for sample collection:
@dataclass
class Sample:
    sample: TokenizedThread
    logprobs: list[float]
    ref_logprobs: list[float]
    advantage: float
    score: float
    kl_div: list[float]
and we proceed like so, creating several completions per prompt and grading them:
# generation method for one prompt
async def gen_data(self, sample: StringThread) -> list[Sample]:
    # map the generate_tokens function over completions_per_sample copies of sample
    # async_map_batch samples items from the iterator until batch_size elements, with retries on errors
    all_samples = await async_map_batch(
        self.model.generate_tokens, repeat_thread(sample), batch_size=self.completions_per_sample
    )
    # detokenize all for grading
    string_samples = await async_map(self.model.detokenize_thread, all_samples)
    # apply grader
    all_scores = np.array(await async_map(self.scoring_fn, string_samples), dtype=np.float32)

    # GRPO advantages
    advantages: FloatArray = all_scores - all_scores.mean()
    advantages /= advantages.std() + 1e-8

    # get all logprobs for the current and reference model
    # async_map processes all samples concurrently given the provided coroutine
    logprobs = await async_map(self.model.logprobs_per_token, all_samples)
    ref_logprobs = await async_map(self.model_ref.logprobs_per_token, all_samples)

    # compute a KL-divergence estimate
    kl = [
        (np.array(lp, dtype=np.float32) - np.array(ref_lp, dtype=np.float32)).tolist()
        for lp, ref_lp in zip(logprobs, ref_logprobs)
    ]

    # collect samples
    samples = []
    for i in range(len(logprobs)):
        samples.append(
            Sample(
                sample=all_samples[i],
                logprobs=logprobs[i],
                ref_logprobs=ref_logprobs[i],
                advantage=advantages[i],
                score=all_scores[i],
                kl_div=kl[i],
            )
        )
    return samples
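The two numerical steps in gen_data, group-relative advantage normalization and the per-token KL estimate, are easy to check in isolation. A self-contained recap with made-up scores and logprobs:

```python
import numpy as np

# hypothetical scores for four completions of the same prompt
scores = np.array([1.0, 0.0, 0.5, 1.0], dtype=np.float32)

# group-relative advantages: center on the group mean, scale by the group std
advantages = scores - scores.mean()
advantages /= advantages.std() + 1e-8

# per-token KL estimate: difference between policy and reference logprobs
logprobs = np.array([-0.5, -1.0], dtype=np.float32)
ref_logprobs = np.array([-0.7, -0.9], dtype=np.float32)
kl = logprobs - ref_logprobs
```

By construction the advantages of a group sum to zero, so completions scoring above the group mean push the policy toward them and the rest push away.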

Run the algorithm

Once the data generation part is written, the entire algorithm is quite straightforward.
async def run(self):

    while self.training_completion_percentage < 1.0:
        # generate training samples
        data = await async_map_batch(
            self.gen_data,
            self.dataset,
            self.prompts_per_batch
        )

        # set learning rate
        current_lr = self.lr_schedule(self.training_completion_percentage)
        
        # flatten the per-prompt sample lists into one pool to mix them
        flattened_data = [s for batch in data for s in batch]
        # shuffles the dataset and gets minibatches of the specified size
        minibatches = get_minibatches(
            flattened_data,
            self.samples_per_mini_batch,
            self.mini_epochs_per_batch
        )
        for idx, mini_batch in enumerate(minibatches):
            # train_sample is a wrapper around train_grpo
            await async_map(self.train_sample, mini_batch)
            await self.model.optim_step(
                current_lr,
                wd=self.weight_decay,
                max_grad_norm=self.max_grad_norm,
            )
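get_minibatches comes from Harmony's helpers; a plausible sketch of its behavior (the real implementation may differ) is to reshuffle the sample pool once per mini-epoch and chunk it:

```python
import random

def get_minibatches(samples, minibatch_size, mini_epochs, seed=0):
    # reshuffle the pool for each mini-epoch, then cut it into chunks
    rng = random.Random(seed)
    batches = []
    for _ in range(mini_epochs):
        pool = list(samples)
        rng.shuffle(pool)
        batches.extend(pool[i:i + minibatch_size] for i in range(0, len(pool), minibatch_size))
    return batches
```

Every sample appears exactly mini_epochs times across the returned minibatches, in a different order each pass.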
That's it! Pretty simple, isn't it? If you look at our actual code, you will see some added complexity from checkpointing, callbacks, and logging.

Bring your own loss functions

We also offer a way to train with arbitrary loss functions with train_tangents, by specifying gradient values (tangents) at each token. There are two options:
  • If you have a closed-form solution of the gradient of your loss with respect to the model logprobs, you can simply compute this value with your numerical library of choice and pass it to train_tangents.
  • Otherwise, you can also use autodiff software to compute the gradient of your loss function with respect to the logprobs.
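For a concrete instance of the first option: with a REINFORCE-style loss L = -advantage * Σ_t logprob_t, the gradient with respect to each token logprob is simply -advantage, so the tangents can be written down directly (the numbers below are illustrative):

```python
import numpy as np

# logprobs of the generated tokens and a hypothetical advantage
logprobs = np.array([-0.5, -1.2, -0.3])
advantage = 0.8

# dL/d(logprob_t) = -advantage for every token: these are the tangents
tangents = np.full_like(logprobs, -advantage)
```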
For this second option, imagine you have access to your logprobs as well as your loss function loss_fn, written in torch. You would do the following, for a given sample (TokenizedThread):
# get gradients with torch
logprobs_th = torch.tensor(logprobs, requires_grad=True)
loss = loss_fn(logprobs_th, *other_args)
loss.backward()
tangents = logprobs_th.grad

# get tokens, weights
tokens, image_patches, weights = await model.serialize_tokenized_thread(sample)
# each input token tries to predict the next target one
inp_tokens = tokens[:-1]
tgt_tokens = tokens[1:]
weights = np.array(weights[1:])

# expand tangents to full sequence length using the weights mask
full_tangents = torch.zeros(len(weights), dtype=torch.float32)
full_tangents[torch.from_numpy(weights > 0)] = tangents

# backward through model
await model.train_tangents(inp_tokens, image_patches, full_tangents.tolist(), tgt_tokens)
While slightly more technical, this option offers full customizability of your training algorithm.
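The mask-expansion step above is easy to sanity-check in isolation: prompt tokens carry zero weight, and the tangents only cover the completion tokens. A self-contained numpy version of the same scatter:

```python
import numpy as np

weights = np.array([0.0, 0.0, 1.0, 1.0, 1.0])  # prompt tokens have weight 0
tangents = np.array([0.1, -0.2, 0.3])          # gradients for completion tokens only

# scatter the completion tangents into a full-length vector, zeros elsewhere
full_tangents = np.zeros_like(weights)
full_tangents[weights > 0] = tangents
```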