过程监督(Process Supervision)融入到 GRPO (Group Relative Policy Optimization)
下面演示如何把“过程监督(Process Supervision)”的思想融入到 GRPO (Group Relative Policy Optimization) 中,从而对每个输出的中间步骤逐一打分、计算相对优势。本文的示例代码与 grpo_train.py
(来源见下文)类似,核心区别在于对“中间步骤”进行奖励评估并将其累计到对应 token,而不是只在回答最终完成时打分。
1. 背景:什么是过程监督(Process Supervision)?
在 LLM 的强化学习 (RL) 训练中,我们可以根据不同的“奖励时间点”来划分:
- Outcome Supervision: 只对最后的完成输出给一个整体奖励。之前的
grpo_train.py
即是这种做法。 - Process Supervision: 在生成完成输出的过程中,每一步或每个重要阶段都可以有一个“中间奖励(或惩罚)”,这样模型会及时地知道哪些中间推理步骤更优,从而更精细地调整策略。
对于数学推理、代码生成等场景,如果我们能获取“过程”中的正确/错误信息,那么过程监督往往会收敛得更好、更快。
2. 核心思路:对每个中间步骤打分,再把奖励“回填”到后续 token
在下方示例里,我们做了一个极简版本的 GRPO + 过程监督:
- 同一 Prompt 下采样 G 条回答:每条回答可能包含多个步骤(或多行拆开)。
- 把回答分成若干步骤:例如碰到某些分隔符时就视为一个step;
- 调用奖励函数:对每一步骤打一个分
r(step_j)
; - 回填(token-level):对于第 j 步的奖励,要赋给从该步骤对应的起始 token到下一步开始 token前的那些 tokens;这样后续的 tokens 也会“继承”该步的优劣信息。
- 组内做相对比较:跟 outcome supervision 类似,把 G 条回答中所有步骤的分数合在一起做 group-wise 标准化,得到相对优势。
- 计算损失:跟常规 PPO/GRPO 相似,不过“优势”现在是按步骤累加。
请注意,这只是一个示例,实际工程中,如何判定“一个步骤”可能需要事先插入特殊分割符,或正则去找“Step N: ”之类的标签等。
3. 示例代码
为直观起见,我们先给出一个与 GRPOTrainer
类似的 PSGRPOTrainer
(Process-Supervision GRPO Trainer,请参考笔者的另一篇博客:TRL里面GRPOTrainer中grpo_train.py文件详解)示例文件。你可以把它当成 pseudo code 或 demo,因为很多实现细节(比如如何准确分割 step、如何加载 reward model)需要你在项目中自行完成。
import torch
import torch.nn.functional as F
from torch import nn
from transformers import Trainer
import math
class PSGRPOTrainer(Trainer):
"""
A simple example of GRPO with Process Supervision, by inheriting from Trainer.
Similar structure to `grpo_train.py`, but modifies `_prepare_inputs` to do step-level scoring.
"""
def __init__(self, *args, reward_model=None, num_generations=4, beta=0.02, **kwargs):
super().__init__(*args, **kwargs)
self.reward_model = reward_model # 过程监督用的RM
self.num_generations = num_generations
self.beta = beta
# 其他初始化:参考模型、max_prompt_length 等等在此省略
def _prepare_inputs(self, inputs):
"""
1. 对每条prompt采样G条输出
2. 对每条输出分步骤打分
3. 计算组内相对优势(此处演示Process-level)
"""
device = self.args.device
# Step 1: 取出 prompt
prompts = [example["prompt"] for example in inputs]
# 用 tokenizer 编码 prompt,这里省略
# prompt_ids, prompt_mask = ...
# Step 2: 对每个prompt一次性生成多条回答
# 下面是伪码:
# completion_ids = []
# for each prompt in batch:
# for g in range(self.num_generations):
# out = self.model.generate(prompt_ids, ...)
# completion_ids.append(out)
# completion_ids => shape: [B * G, seq_len]
# 省略: 需要类似 grpo_train.py 的 EOS masking, 参考模型 logprob 计算等
# ...
# ----下面只演示 “分步骤打分 + 累计奖励” 部分----
# Step 3: 分割回答成若干step并打分
# 这是个极简示例:假设回答里用特殊符号 "STEP:" 标识中间过程
# 需要把 tokens decode -> text, split -> steps
# 也可以在token层面做,但此处为直观简化
# decode 到文本
completions_text = ["示例回答1: STEP: 中间推理. STEP: 最终结论", # for B*G lines
"示例回答2: ...",
"..."]
# 我们打算对每一条回答的 step 做打分
# 先把 group reshape
# group_size = self.num_generations
# B = len(prompts)
# 这里省略
# 用于存储 "每条输出" 的每个step的分数
# 例如 step_rewards_list[i] = [r_step1, r_step2, ...]
step_rewards_list = []
for completion_text in completions_text:
# 分割出多个step:
steps = completion_text.split("STEP:")
# 每个step都要调用 reward_model 打分 (过程监督)
# 这里假设 reward_model 接受字符串输入 => 返回 [分数]
step_rewards = []
for s in steps:
# 纯演示
r = self._score_one_step(s) # 后面定义个例子函数
step_rewards.append(r)
step_rewards_list.append(step_rewards)
# Step 4: 把 step-level reward 转成 token-level advantage
# 我们需要知道 "step_j" 覆盖了哪些 token
# 这里做一个简单的假设:steps间等长拆分, 真实情况需要依赖 token idx.
# 先假设 each step = roughly same length
# 伪码:
# for each sample i in [0..B*G):
# step_boundaries[i][j] = (start_token_idx, end_token_idx)
# total_token_in_completion[i] = ...
# then for token in [start,end): advantage[token] += step_rewards[j]
# 由于是示例,我们简要写成:
# shape = [B*G, max_completion_length], 先都 0
completion_mask = torch.ones(len(completions_text), 30, dtype=torch.int32) # 30假设
step_advantages = torch.zeros_like(completion_mask, dtype=torch.float32)
for i, step_rs in enumerate(step_rewards_list):
# step_rs: [r1, r2, ...]
# 假设回答长 30 tokens, steps = len(step_rs), step_len = 30 // len(step_rs)
# 真实情况要用 tokenizer decode去找step起止
num_steps = len(step_rs)
if num_steps == 0:
continue
step_len = 30 // num_steps
for j, r in enumerate(step_rs):
start_idx = j * step_len
end_idx = (j+1)*step_len if j < num_steps-1 else 30
# 把这个step的奖励写到对应token上
step_advantages[i, start_idx:end_idx] = r
# 现在 step_advantages[i,t] 就是 “第i条回答在token t 处的 reward”
# 但还不是 group内部标准化 => 我们得先算 group内均值/方差
# 这里先把 step_advantages -> sum => single scalar?
# 1) 求每条回答 sum => shape: [B*G]
# (也可对step更精细, 这里演示)
sums = step_advantages.sum(dim=1)
# 2) 分组
# suppose shape [B, G]
# B = len(prompts)
# 这里略做 pseudo
B = 2
G = self.num_generations
sums_2d = sums.view(B, G) # => [B, G]
mean = sums_2d.mean(dim=1, keepdim=True) # => [B,1]
std = sums_2d.std(dim=1, keepdim=True) # => [B,1]
# broadcast回去 => [B, G]
sums_norm = (sums_2d - mean) / (std + 1e-4)
# 3) expand回 [B*G]
adv_norm = sums_norm.view(-1)
# 4) token级别 advantage: 这时我们可以把 adv_norm[i] 覆盖到 step_advantages
# 以便每个token都乘到相同“组内相对值”
for i in range(step_advantages.size(0)):
step_advantages[i] *= adv_norm[i] / (step_advantages[i].mean() + 1e-5)
# 这里做一个很粗糙的 “再归一化” 示例 => 真实情况你可用别的方法
# 最终 step_advantages 中每个token有个 相对 advantage
# 下面返回给外面 compute_loss 用
return {
"completion_ids": ...,
"completion_mask": completion_mask,
"step_advantages": step_advantages,
# 还需要 ref_model_logps, prompt_ids, etc...
}
def compute_loss(self, model, inputs, return_outputs=False):
"""
类似grpo_train.py的compute_loss,但过程奖励是 step_advantages
"""
completion_ids = inputs["completion_ids"]
completion_mask = inputs["completion_mask"]
step_advantages = inputs["step_advantages"]
# 1) compute current model log prob
# => shape (B*G, seq_len)
# logps = self._get_per_token_logps(model, completion_ids,...)
logps = torch.randn_like(step_advantages) # 伪造
# 2) compute KL with ref model
# => shape same as logps
kl = torch.zeros_like(logps) # demo
# 3) 组装loss,
# 这里也跟 grpo_train.py相似: - [ exp(logp-old_logp)*A - beta*kl ]
# 但此处A换成 step_advantages
ratio = torch.exp(logps - logps.detach()) # placeholder
per_token_loss = ratio * step_advantages - self.beta * kl
per_token_loss = -per_token_loss # maximize => negative
# apply mask
masked_loss = per_token_loss * completion_mask
loss = masked_loss.sum(dim=1) / (completion_mask.sum(dim=1)+1e-5)
final_loss = loss.mean()
return final_loss
def _score_one_step(self, step_text):
"""
给示例用的过程监督打分函数
你可以对 step_text 做任何解析、用 reward_model forward...
这里就简单假设: 越长的文本给越高reward
"""
return float(len(step_text)) * 0.01
上面这段代码的思路和原有 grpo_train.py
类似,但在 _prepare_inputs
的部分我们做了过程监督:
- 拆分多步:
completion_text.split("STEP:")
; - step 级打分:
_score_one_step
; - 把每个step的分数写到token区间上;
- 再做group内均值/方差归一化,得到相对优势;
- 最后在
compute_loss
时,按照这些“step_advantages”来计算每个token应被鼓励还是惩罚。
你需要做的改进
- 步骤边界如何划分:实际中,你可能在上游 prompt 或回答中嵌入特殊标签(“Step 1: …”),或者对Chain-of-thought做额外标记;
- 奖励模型:可以是真实训练好的 reward model,输入 step_text + prompt,输出一个float分数;
- KL 与 参考模型:示例中只用 0 做占位,应换成真正
ref_model
计算 token-level logp,以做 PPO/GRPO style KL 惩罚; - 组内对每一步Reward做归一化:示例里先把 step-level reward 累加成
[B*G]
scalar,再做 group wise std / mean;实际你可考虑更精细化:比如先对 step-level reward 同一组内比较,然后再把它加到 token-level advantage。
4. 关键差异:Outcome vs. Process Supervision
- Outcome Supervision:
rewards
是[B*G]
,对整条输出一次性打分 -> group 内均值/方差 -> advantage。 - Process Supervision:每个输出包含若干步骤(
[step1, step2, ...]
),给每个step一个reward,然后映射到token级别,再进行group-wise归一化。- 好处:对复杂推理题,模型能“分步知道”哪些环节导致错误/正确;
- 代价:实现更复杂,需要在生成或解析环节多做拆分和打分,reward model 也要能处理细粒度步骤。
5. 总结
- 过程监督在理论和实践中都有助于提高训练效率、减少大模型出现错误推理的概率;对长序列任务(如数学、多段代码生成)尤其有价值。
- 本文用一个简单的
PSGRPOTrainer
示例演示了“如何在_prepare_inputs
阶段对每条回答拆分多步 -> 打分 -> 累加到 token”,再在compute_loss
做相对优势与 KL 的计算。 - 相比起
grpo_train.py
中的 outcome 监督,此处的关键是:“一个回答” 不再只产出一个 reward,而是对多个“step”产出多个 reward,并把它累加到每个 step 的 token 区间上。 - 若你想真正应用到生产环境,需要配合真实的“步骤拆分 / 高质量过程 reward model”,以及可靠的 KL 计算、去重、并行等机制。
希望以上分析能帮助你理解如何把过程监督融入到 GRPO 中,从而获得更细粒度的奖励信号,让大语言模型更加“稳健”地学会多步推理、代码生成等复杂任务。
改进的GRPO:可运行的demo
下面这段代码展示了一个可运行的示例,演示如何在“过程监督(Process Supervision)”场景下,用GRPO的思路来训练大语言模型。相比之前的简化版本,我们做了以下改进:
- 上游 Prompt 或回答中显式插入“Step 1: … Step N: …”标签,以便我们能够准确识别和解析“中间步骤”。
- 奖励模型(reward_model)可被外部传入,并且在本示例中通过
forward(prompt, step_text)
返回一个 float 分数(模拟真实训练好的模型)。 - KL 与 参考模型:我们定义了一个
ref_model
,在计算 loss 时真正地对比“当前模型 vs. 参考模型”的对数概率,得到 KL 惩罚项。
请注意,这段示例是一个“可运行的演示”:
- 在
_sample_completions
中并未真正调用model.generate
,而是生成一些“带 step 标签的回答”用于说明过程监督的思路; - 你可以在实际项目中替换
_sample_completions
为真实的文本生成逻辑,并替换DummyRewardModel
为你的“训练好的奖励模型”或“自定义评分函数”。
以下示例使用了 transformers
中的 Trainer
机制,同时使用了 PyTorch 进行数值运算。你可以把这段代码保存为 ps_grpo_train.py
之类的文件,安装好 transformers >=4.26.0, torch >=1.10
后直接跑一个“假训练循环”来观察它的执行。
import torch
import torch.nn.functional as F
from torch import nn
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
from dataclasses import dataclass, field
from typing import Optional, List, Dict
def _get_per_token_logp(model: nn.Module, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
"""
给定 model、input_ids、attention_mask,返回token级别的 log p (对数概率),
形状: (batch_size, seq_len)。这是 PPO/GRPO 常见的辅助函数。
"""
with torch.no_grad():
outputs = model(input_ids=input_ids, attention_mask=attention_mask)
# outputs.logits形状 [B, L, vocab_size]
logits = outputs.logits
# 对最后一列logits无token对应,所以取[:, :-1, :] => logit for token0..token(L-2)
# 并与input_ids[:,1..]对齐,下面做示例中的简单对齐:
# 你也可以根据需要做 shift
# 这里只是演示:
shifted_logits = logits[:, :-1, :]
shifted_input_ids = input_ids[:, 1:]
# log_softmax 后 gather:
log_probs_all = F.log_softmax(shifted_logits, dim=-1)
# gather
gathered = log_probs_all.gather(dim=-1, index=shifted_input_ids.unsqueeze(-1)).squeeze(-1)
# shape [B, L-1]
# 补齐成与 input_ids 同长(或加padding)
pad_len = input_ids.size(1) - gathered.size(1)
if pad_len > 0:
pad_zeros = gathered.new_zeros(gathered.size(0), pad_len)
gathered = torch.cat([gathered, pad_zeros], dim=1)
return gathered # [B, L], 末尾几个可能是0
class DummyRewardModel:
"""
模拟训练好的奖励模型,对 (prompt, step_text) 返回一个 float 分数
这里根据 step_text 的长度 & 某些关键词来进行伪打分。
"""
def __init__(self):
pass
def forward(self, prompt: str, step_text: str) -> float:
# 演示:越长的 step_text,分数越高;如果包含 'mistake' 字眼就减分
base_score = float(len(step_text)) * 0.01
if "mistake" in step_text.lower():
base_score -= 0.3
return base_score
@dataclass
class PSGRPOConfig:
"""
训练配置: 包括 GRPO 相关参数
"""
num_generations: int = 2 # 一次对同一个 prompt 生成多少回答
beta: float = 0.02 # KL 系数
max_steps: int = 2 # 假设回答中最多会出现 Step 1: ... Step 2: ...
# 其他可拓展
class PSGRPOTrainer(Trainer):
def __init__(
self,
ref_model: nn.Module,
reward_model: DummyRewardModel,
tokenizer: AutoTokenizer,
ps_config: PSGRPOConfig,
*args,
**kwargs
):
super().__init__(*args, **kwargs)
self.ref_model = ref_model
self.reward_model = reward_model
self.tokenizer = tokenizer
self.ps_config = ps_config
def _sample_completions(self, prompt: str) -> List[str]:
"""
演示:给定 prompt,模拟生成 self.ps_config.num_generations 条包含 "Step i:" 的回答。
真实场景你会在这里使用 self.model.generate。
"""
completions = []
for g in range(self.ps_config.num_generations):
# 这里简单拼:
# Step 1: (some text)
# Step 2: ...
# 并带一点随机
step1 = f"Step 1: Analysis for prompt [{prompt}] part {g}."
step2 = f"Step 2: Conclusion. {('mistake' if g % 2 == 0 else 'correctness')} info"
completions.append(step1 + "\n" + step2)
return completions
def _process_supervision_rewards(
self,
prompt: str,
completion: str
) -> List[float]:
"""
给定某条回答(内含 'Step i:' 标签),拆分出多个步骤,对每一步调用 reward_model 打分。
返回一个list, 对应每个step的reward.
"""
# 以 "Step i:" 做拆分
# 这里写个简单split
# 真实情况可能需要更精细的正则
lines = completion.split("\n")
step_rewards = []
for line in lines:
if "Step" in line:
# line示例: "Step 1: Analysis for prompt ..."
step_text = line.strip()
# 用 reward_model
r = self.reward_model.forward(prompt, step_text)
step_rewards.append(r)
return step_rewards
def _prepare_inputs(self, inputs: List[Dict]) -> Dict[str, torch.Tensor]:
"""
模拟 grpo_train.py 中 _prepare_inputs 作用:
1. 批量对 prompt 生成多条回答
2. 对回答分步骤打分
3. 分组计算 advantage
4. 返回计算loss需要的数据(含ref_model对数概率)
"""
# inputs 里包含 prompt
batch_prompts = [ex["prompt"] for ex in inputs]
B = len(batch_prompts)
G = self.ps_config.num_generations
all_completions = []
# shape => [B*G] text
for prompt in batch_prompts:
completions = self._sample_completions(prompt) # G条
all_completions.extend([(prompt, c) for c in completions])
# 对每条回答做 step-level reward
step_rewards_list = [] # len=B*G, each => list of step rewards
for (prompt, completion) in all_completions:
step_rs = self._process_supervision_rewards(prompt, completion)
step_rewards_list.append(step_rs)
# 假设对每条回答embedding => token IDs
# 统一拼 [prompt, completion], 以便后面做 model logp
# 真实情况你也可分开
concat_texts = []
for (p, c) in all_completions:
concat_texts.append(p + "\n" + c)
encoded = self.tokenizer(
concat_texts,
return_tensors="pt",
padding=True,
truncation=True,
max_length=128
)
input_ids = encoded["input_ids"]
attention_mask = encoded["attention_mask"]
# 计算 “当前模型” 对 token 的 logp
with torch.no_grad():
per_token_logps = _get_per_token_logp(self.model, input_ids, attention_mask)
# 计算 “参考模型” 对 token 的 logp (用于 KL)
with torch.no_grad():
ref_token_logps = _get_per_token_logp(self.ref_model, input_ids, attention_mask)
# 现在每条回答有 step_rewards_list[i] 形如 [r_step1, r_step2, ...]
# 需要将这些step reward映射到 token 上
# 这里只是演示: 用 sum(r_step_j) 作为回答总reward => group内做相对比较
# (你也可以做更精细 token-level对step区间赋值)
total_rewards = []
for rs in step_rewards_list:
s = sum(rs) # sum of step-level
total_rewards.append(s)
total_rewards = torch.tensor(total_rewards, dtype=torch.float32)
# => [B*G]
# Group: reshape => [B, G]
total_rewards_2d = total_rewards.view(B, G)
group_mean = total_rewards_2d.mean(dim=1, keepdim=True)
group_std = total_rewards_2d.std(dim=1, keepdim=True)
# broadcast => [B, G]
group_adv = (total_rewards_2d - group_mean) / (group_std + 1e-5)
# => flatten => [B*G]
advantages = group_adv.view(-1)
# 下面把 advantages broadcast到 token级别
# 这里只是最简写法: 每个回答里所有token共享同一 advantage
# shape => (B*G, seq_len)
expanded_adv = advantages.unsqueeze(1).expand(-1, input_ids.size(1))
# 返回 dict,给 compute_loss 用
return {
"input_ids": input_ids,
"attention_mask": attention_mask,
"per_token_logps": per_token_logps,
"ref_token_logps": ref_token_logps,
"advantages": expanded_adv
}
def compute_loss(self, model, inputs, return_outputs=False):
"""
类似grpo_train的compute_loss,
- ratio*(advantages) - beta*KL
"""
input_ids = inputs["input_ids"]
attention_mask = inputs["attention_mask"]
current_logps = inputs["per_token_logps"] # [B*G, seq_len]
ref_logps = inputs["ref_token_logps"] # [B*G, seq_len]
advantages = inputs["advantages"] # [B*G, seq_len]
# ratio => exp(current_logps - detach(current_logps))
# 这里原版 PPO/GRPO 是 compare old vs current,
# 我们为了演示 => compare current with current.detach()
# 真实可传 old_logps
ratio = torch.exp(current_logps - current_logps.detach())
# 计算KL: 这里是( reference vs current ) 或( current vs reference )?
# PPO/GRPO 常见写法 = exp(ref-curr) - (ref-curr) - 1
kl_diff = ref_logps - current_logps
per_token_kl = torch.exp(kl_diff) - kl_diff - 1
# token-level loss
per_token_loss = ratio * advantages - self.ps_config.beta * per_token_kl
# 我们想 maximize => 取负
per_token_loss = -per_token_loss
# mask
mask = attention_mask.float()
masked_loss = per_token_loss * mask
loss_per_seq = masked_loss.sum(dim=1) / mask.sum(dim=1).clamp_min(1e-5)
final_loss = loss_per_seq.mean()
return final_loss
def training_step(self, model, inputs):
"""
重写 Trainer 默认的 training_step,
先调 _prepare_inputs,再调 compute_loss
"""
# 1) 根据 batch (list of dict) 做 prepare
prepared = self._prepare_inputs(inputs)
# 2) compute_loss
loss = self.compute_loss(model, prepared)
loss.backward()
return loss.detach()
##
# 一个简易main: 演示如何用 PSGRPOTrainer
##
def main():
# 1.加载或初始化 "当前策略模型" 及 "参考模型"
# 演示: 同一个初始权重
model_name = "distilroberta-base" # 用roberta做文本demo
current_model = AutoModelForCausalLM.from_pretrained("gpt2") # GPT2 for simpler causal
ref_model = AutoModelForCausalLM.from_pretrained("gpt2")
tokenizer = AutoTokenizer.from_pretrained("gpt2")
tokenizer.pad_token = tokenizer.eos_token # GPT2没有pad_token,我们指定 eos
# 2. 准备一个RewardModel
reward_model = DummyRewardModel()
# 3. 准备训练超参 & Config
ps_config = PSGRPOConfig(
num_generations=2,
beta=0.01,
max_steps=2
)
# 4. 构造个小训练数据
# 只存一个字段 "prompt"
train_data = [{"prompt":"How to solve x^2=4?"}, {"prompt":"What is AI safety?"}]
# 伪做 2 steps epochs
# Transformers的Trainer需要 "Dataset" 或list => 这里可用
# 但 Trainer默认会把传入batch => dict of list. 所以,
# 这里我们用 "collator" 直接返回batch list
# 也可自定义 dataset
# 5. 定义PSGRPOTrainer
training_args = TrainingArguments(
output_dir="./outputs_ps_grpo",
num_train_epochs=1,
per_device_train_batch_size=2,
logging_steps=1,
learning_rate=1e-4
)
def my_data_collator(features):
# features是list of {"prompt":...}
# 直接原样返回 => trainer内部会把它当作batch: list of dict
return features
trainer = PSGRPOTrainer(
model=current_model,
ref_model=ref_model,
reward_model=reward_model,
tokenizer=tokenizer,
ps_config=ps_config,
args=training_args,
train_dataset=train_data, # 传入list => Trainer自动把它当dataset
data_collator=my_data_collator
)
# 6. 调Trainer进行"假训练"
trainer.train()
if __name__ == "__main__":
main()
代码说明
-
模型加载
- 我们使用
GPT2
作为当前策略模型current_model
和参考模型ref_model
,以便在compute_loss
阶段对比它们的对数概率计算 KL 惩罚。 - 需要注意的是,如果你真的要做 PPO/GRPO,需要在更新策略模型前存一份
old_model
的快照,然后在下一个 batch 中将old_model
用于 ratio;在本示例里,为了简化,我们把“旧策略”近似成current_logps.detach()
来模拟 PPO/GRPO 公式中的ratio
。
- 我们使用
-
过程监督
- 在
_sample_completions
函数中,我们人为地在回答里插入了 “Step 1:” / “Step 2:” 标签,表示中间推理步骤。 _process_supervision_rewards
逐行查找 “Step i:” 的信息,并调用reward_model.forward(prompt, step_text)
来拿到每个 step 的分数。- 在这里,我们只取了它们的和
sum(rs)
作为回答最终 reward,然后做 group 内的归一化; - 若你想对每个 step 映射到 token 级别,你可以更加精细地记录 “Step j 对应从 token_x 到 token_y”,再给
[x..y]
的 token 分数。
- 在
-
KL 惩罚
ref_token_logps = _get_per_token_logp(self.ref_model, ...)
返回参考模型对每个 token 的对数概率。current_logps = _get_per_token_logp(self.model, ...)
返回当前策略对每个 token 的对数概率。- 在
compute_loss
中,kl = exp(ref - current) - (ref - current) - 1
是 PPO/GRPO 常见近似写法。 - 最终
(ratio*advantage - beta*kl)
做了合并,这就是 GRPO 的核心公式之一。
-
训练循环
Trainer
内部会把train_dataset
分成 batch(这里 batch_size=2),并在training_step
阶段调用_prepare_inputs
+compute_loss
。_prepare_inputs
里做 prompt -> completions -> rewards -> advantage;compute_loss
里做 PPO/GRPO formula,loss.backward()
。- 这样就完成了一次 RL 迭代。实际项目中你会迭代多 epochs / steps。
总结
通过这个示例,我们把过程监督融入到 GRPO 流程中,包括对每个 step 进行单独打分、对参考模型做 KL 惩罚,以及把分数合并为 advantage。虽然这里做了很多简化(例如 _sample_completions
是伪造的),但核心思路完全一致:
- 识别中间步骤(
Step i:
标签), - 对每个步骤打分(过程监督),
- 将分数映射到 token(或直接作为回答整体分数),
- 做组内比较(Group advantage)并结合 KL 来更新策略模型。
实际中,你可以:
- 用真正的
_sample_completions
(即model.generate
)或在线采样; - 用真实的“过程Reward模型”对中间步骤进行评估;
- 对 steps -> token 做更精细的映射;
- 保存/加载参考模型
old_model
做严格的 PPO ratio 计算。
这样就可以构建一个完整的“Process Supervision RL with GRPO”训练流水线。希望本示例能帮助你更好地落地过程监督的做法,让大模型在多步骤推理、代码生成、复杂任务中获得更好的训练与表现。
后记
2025年2月22日22点07分于上海。在GPT o1大模型辅助下完成。