当前位置: 首页 > news >正文

过程监督(Process Supervision)融入到 GRPO (Group Relative Policy Optimization)

下面演示如何把“过程监督(Process Supervision)”的思想融入到 GRPO (Group Relative Policy Optimization) 中,从而对每个输出的中间步骤逐一打分、计算相对优势。本文的示例代码与 grpo_train.py (来源见下文)类似,核心区别在于对“中间步骤”进行奖励评估并将其累计到对应 token,而不是只在回答最终完成时打分。

1. 背景:什么是过程监督(Process Supervision)?

在 LLM 的强化学习 (RL) 训练中,我们可以根据不同的“奖励时间点”来划分:

  • Outcome Supervision: 只对最后的完成输出给一个整体奖励。之前的 grpo_train.py 即是这种做法。
  • Process Supervision: 在生成完成输出的过程中,每一步或每个重要阶段都可以有一个“中间奖励(或惩罚)”,这样模型会及时地知道哪些中间推理步骤更优,从而更精细地调整策略。

对于数学推理、代码生成等场景,如果我们能获取“过程”中的正确/错误信息,那么过程监督往往会收敛得更好、更快。

2. 核心思路:对每个中间步骤打分,再把奖励“回填”到后续 token

在下方示例里,我们做了一个极简版本的 GRPO + 过程监督:

  1. 同一 Prompt 下采样 G 条回答:每条回答可能包含多个步骤(或多行拆开)。
  2. 把回答分成若干步骤:例如碰到某些分隔符时就视为一个step;
  3. 调用奖励函数:对每一步骤打一个分 r(step_j)
  4. 回填(token-level):对于第 j 步的奖励,要赋给从该步骤对应的起始 token下一步开始 token前的那些 tokens;这样后续的 tokens 也会“继承”该步的优劣信息。
  5. 组内做相对比较:跟 outcome supervision 类似,把 G 条回答中所有步骤的分数合在一起做 group-wise 标准化,得到相对优势。
  6. 计算损失:跟常规 PPO/GRPO 相似,不过“优势”现在是按步骤累加。

请注意,这只是一个示例,实际工程中,如何判定“一个步骤”可能需要事先插入特殊分割符,或正则去找“Step N: ”之类的标签等。


3. 示例代码

为直观起见,我们先给出一个与 GRPOTrainer 类似的 PSGRPOTrainer(Process-Supervision GRPO Trainer,请参考笔者的另一篇博客:TRL里面GRPOTrainer中grpo_train.py文件详解)示例文件。你可以把它当成 pseudo code 或 demo,因为很多实现细节(比如如何准确分割 step、如何加载 reward model)需要你在项目中自行完成。

import torch
import torch.nn.functional as F
from torch import nn
from transformers import Trainer
import math

class PSGRPOTrainer(Trainer):
    """
    A simple example of GRPO with Process Supervision, by inheriting from Trainer.
    Similar structure to `grpo_train.py`, but modifies `_prepare_inputs` to do step-level scoring.
    """

    def __init__(self, *args, reward_model=None, num_generations=4, beta=0.02, **kwargs):
        super().__init__(*args, **kwargs)
        self.reward_model = reward_model  # 过程监督用的RM
        self.num_generations = num_generations
        self.beta = beta
        # 其他初始化:参考模型、max_prompt_length 等等在此省略

    def _prepare_inputs(self, inputs):
        """
        1. 对每条prompt采样G条输出
        2. 对每条输出分步骤打分
        3. 计算组内相对优势(此处演示Process-level)
        """
        device = self.args.device

        # Step 1: 取出 prompt
        prompts = [example["prompt"] for example in inputs]

        # 用 tokenizer 编码 prompt,这里省略
        # prompt_ids, prompt_mask = ...

        # Step 2: 对每个prompt一次性生成多条回答
        # 下面是伪码: 
        # completion_ids = []
        # for each prompt in batch:
        #   for g in range(self.num_generations):
        #       out = self.model.generate(prompt_ids, ...)
        #       completion_ids.append(out)

        # completion_ids => shape: [B * G, seq_len]
        # 省略: 需要类似 grpo_train.py 的 EOS masking, 参考模型 logprob 计算等
        # ...
        
        # ----下面只演示 “分步骤打分 + 累计奖励” 部分----

        # Step 3: 分割回答成若干step并打分
        # 这是个极简示例:假设回答里用特殊符号 "STEP:" 标识中间过程
        # 需要把 tokens decode -> text, split -> steps
        # 也可以在token层面做,但此处为直观简化

        # decode 到文本
        completions_text = ["示例回答1: STEP: 中间推理. STEP: 最终结论",  # for B*G lines
                            "示例回答2: ...",
                            "..."]

        # 我们打算对每一条回答的 step 做打分
        # 先把 group reshape
        # group_size = self.num_generations
        # B = len(prompts)
        # 这里省略

        # 用于存储 "每条输出" 的每个step的分数
        # 例如 step_rewards_list[i] = [r_step1, r_step2, ...]
        step_rewards_list = []

        for completion_text in completions_text:
            # 分割出多个step:
            steps = completion_text.split("STEP:")
            # 每个step都要调用 reward_model 打分 (过程监督)
            # 这里假设 reward_model 接受字符串输入 => 返回 [分数]
            step_rewards = []
            for s in steps:
                # 纯演示
                r = self._score_one_step(s)  # 后面定义个例子函数
                step_rewards.append(r)
            step_rewards_list.append(step_rewards)

        # Step 4: 把 step-level reward 转成 token-level advantage
        # 我们需要知道 "step_j" 覆盖了哪些 token
        # 这里做一个简单的假设:steps间等长拆分, 真实情况需要依赖 token idx.
        # 先假设 each step = roughly same length

        # 伪码: 
        # for each sample i in [0..B*G):
        #   step_boundaries[i][j] = (start_token_idx, end_token_idx)
        #   total_token_in_completion[i] = ...
        #   then for token in [start,end): advantage[token] += step_rewards[j]

        # 由于是示例,我们简要写成:
        # shape = [B*G, max_completion_length], 先都 0
        completion_mask = torch.ones(len(completions_text), 30, dtype=torch.int32) # 30假设
        step_advantages = torch.zeros_like(completion_mask, dtype=torch.float32)

        for i, step_rs in enumerate(step_rewards_list):
            # step_rs: [r1, r2, ...]
            # 假设回答长 30 tokens, steps = len(step_rs), step_len = 30 // len(step_rs)
            # 真实情况要用 tokenizer decode去找step起止
            num_steps = len(step_rs)
            if num_steps == 0:
                continue
            step_len = 30 // num_steps
            for j, r in enumerate(step_rs):
                start_idx = j * step_len
                end_idx = (j+1)*step_len if j < num_steps-1 else 30
                # 把这个step的奖励写到对应token上
                step_advantages[i, start_idx:end_idx] = r

        # 现在 step_advantages[i,t] 就是 “第i条回答在token t 处的 reward”
        # 但还不是 group内部标准化 => 我们得先算 group内均值/方差
        # 这里先把 step_advantages -> sum => single scalar?

        # 1) 求每条回答 sum => shape: [B*G]
        # (也可对step更精细, 这里演示)
        sums = step_advantages.sum(dim=1)

        # 2) 分组
        # suppose shape [B, G]
        # B = len(prompts)
        # 这里略做 pseudo
        B = 2
        G = self.num_generations
        sums_2d = sums.view(B, G)  # => [B, G]
        mean = sums_2d.mean(dim=1, keepdim=True)  # => [B,1]
        std = sums_2d.std(dim=1, keepdim=True)    # => [B,1]

        # broadcast回去 => [B, G]
        sums_norm = (sums_2d - mean) / (std + 1e-4)

        # 3) expand回 [B*G]
        adv_norm = sums_norm.view(-1)
        
        # 4) token级别 advantage: 这时我们可以把 adv_norm[i] 覆盖到 step_advantages
        #   以便每个token都乘到相同“组内相对值”
        for i in range(step_advantages.size(0)):
            step_advantages[i] *= adv_norm[i] / (step_advantages[i].mean() + 1e-5)
            # 这里做一个很粗糙的 “再归一化” 示例 => 真实情况你可用别的方法

        # 最终 step_advantages 中每个token有个 相对 advantage
        # 下面返回给外面 compute_loss 用
        return {
            "completion_ids": ...,
            "completion_mask": completion_mask,
            "step_advantages": step_advantages,
            # 还需要 ref_model_logps, prompt_ids, etc...
        }

    def compute_loss(self, model, inputs, return_outputs=False):
        """
        类似grpo_train.py的compute_loss,但过程奖励是 step_advantages
        """
        completion_ids = inputs["completion_ids"]
        completion_mask = inputs["completion_mask"]
        step_advantages = inputs["step_advantages"]

        # 1) compute current model log prob
        #    => shape (B*G, seq_len)
        # logps = self._get_per_token_logps(model, completion_ids,...)
        logps = torch.randn_like(step_advantages)  # 伪造

        # 2) compute KL with ref model
        #    => shape same as logps
        kl = torch.zeros_like(logps)  # demo

        # 3) 组装loss, 
        #    这里也跟 grpo_train.py相似: - [ exp(logp-old_logp)*A - beta*kl ]
        #    但此处A换成 step_advantages
        ratio = torch.exp(logps - logps.detach())  # placeholder
        per_token_loss = ratio * step_advantages - self.beta * kl
        per_token_loss = -per_token_loss  # maximize => negative

        # apply mask
        masked_loss = per_token_loss * completion_mask
        loss = masked_loss.sum(dim=1) / (completion_mask.sum(dim=1)+1e-5)
        final_loss = loss.mean()

        return final_loss

    def _score_one_step(self, step_text):
        """
        给示例用的过程监督打分函数
        你可以对 step_text 做任何解析、用 reward_model forward...
        这里就简单假设: 越长的文本给越高reward
        """
        return float(len(step_text)) * 0.01

上面这段代码的思路和原有 grpo_train.py 类似,但在 _prepare_inputs 的部分我们做了过程监督:

  • 拆分多步completion_text.split("STEP:")
  • step 级打分_score_one_step
  • 把每个step的分数写到token区间上
  • 再做group内均值/方差归一化,得到相对优势;
  • 最后在 compute_loss,按照这些“step_advantages”来计算每个token应被鼓励还是惩罚。

你需要做的改进

  1. 步骤边界如何划分:实际中,你可能在上游 prompt 或回答中嵌入特殊标签(“Step 1: …”),或者对Chain-of-thought做额外标记;
  2. 奖励模型:可以是真实训练好的 reward model,输入 step_text + prompt,输出一个float分数;
  3. KL 与 参考模型:示例中只用 0 做占位,应换成真正 ref_model 计算 token-level logp,以做 PPO/GRPO style KL 惩罚;
  4. 组内对每一步Reward做归一化:示例里先把 step-level reward 累加成 [B*G] scalar,再做 group wise std / mean;实际你可考虑更精细化:比如先对 step-level reward 同一组内比较,然后再把它加到 token-level advantage。

4. 关键差异:Outcome vs. Process Supervision

  • Outcome Supervisionrewards[B*G],对整条输出一次性打分 -> group 内均值/方差 -> advantage。
  • Process Supervision:每个输出包含若干步骤([step1, step2, ...]),给每个step一个reward,然后映射到token级别,再进行group-wise归一化。
    • 好处:对复杂推理题,模型能“分步知道”哪些环节导致错误/正确;
    • 代价:实现更复杂,需要在生成或解析环节多做拆分和打分,reward model 也要能处理细粒度步骤。

5. 总结

  • 过程监督在理论和实践中都有助于提高训练效率、减少大模型出现错误推理的概率;对长序列任务(如数学、多段代码生成)尤其有价值。
  • 本文用一个简单的PSGRPOTrainer示例演示了“如何在 _prepare_inputs 阶段对每条回答拆分多步 -> 打分 -> 累加到 token”,再在 compute_loss 做相对优势与 KL 的计算。
  • 相比起 grpo_train.py 中的 outcome 监督,此处的关键是:“一个回答” 不再只产出一个 reward,而是对多个“step”产出多个 reward,并把它累加到每个 step 的 token 区间上
  • 若你想真正应用到生产环境,需要配合真实的“步骤拆分 / 高质量过程 reward model”,以及可靠的 KL 计算、去重、并行等机制。

希望以上分析能帮助你理解如何把过程监督融入到 GRPO 中,从而获得更细粒度的奖励信号,让大语言模型更加“稳健”地学会多步推理、代码生成等复杂任务。

改进的GRPO:可运行的demo

下面这段代码展示了一个可运行的示例,演示如何在“过程监督(Process Supervision)”场景下,用GRPO的思路来训练大语言模型。相比之前的简化版本,我们做了以下改进:

  1. 上游 Prompt 或回答中显式插入“Step 1: … Step N: …”标签,以便我们能够准确识别和解析“中间步骤”。
  2. 奖励模型(reward_model)可被外部传入,并且在本示例中通过 forward(prompt, step_text) 返回一个 float 分数(模拟真实训练好的模型)。
  3. KL 与 参考模型:我们定义了一个 ref_model,在计算 loss 时真正地对比“当前模型 vs. 参考模型”的对数概率,得到 KL 惩罚项。

请注意,这段示例是一个“可运行的演示”:

  • _sample_completions 中并未真正调用 model.generate,而是生成一些“带 step 标签的回答”用于说明过程监督的思路;
  • 你可以在实际项目中替换 _sample_completions 为真实的文本生成逻辑,并替换 DummyRewardModel 为你的“训练好的奖励模型”或“自定义评分函数”。

以下示例使用了 transformers 中的 Trainer 机制,同时使用了 PyTorch 进行数值运算。你可以把这段代码保存为 ps_grpo_train.py 之类的文件,安装好 transformers >=4.26.0, torch >=1.10 后直接跑一个“假训练循环”来观察它的执行。


import torch
import torch.nn.functional as F
from torch import nn
from transformers import AutoModelForCausalLM, AutoTokenizer, Trainer, TrainingArguments
from dataclasses import dataclass, field
from typing import Optional, List, Dict

def _get_per_token_logp(model: nn.Module, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
    """
    给定 model、input_ids、attention_mask,返回token级别的 log p (对数概率),
    形状: (batch_size, seq_len)。这是 PPO/GRPO 常见的辅助函数。
    """
    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)
        # outputs.logits形状 [B, L, vocab_size]
    logits = outputs.logits
    # 对最后一列logits无token对应,所以取[:, :-1, :] => logit for token0..token(L-2)
    # 并与input_ids[:,1..]对齐,下面做示例中的简单对齐: 
    # 你也可以根据需要做 shift
    # 这里只是演示:
    shifted_logits = logits[:, :-1, :]
    shifted_input_ids = input_ids[:, 1:]
    # log_softmax 后 gather:
    log_probs_all = F.log_softmax(shifted_logits, dim=-1)
    # gather
    gathered = log_probs_all.gather(dim=-1, index=shifted_input_ids.unsqueeze(-1)).squeeze(-1)
    # shape [B, L-1]
    # 补齐成与 input_ids 同长(或加padding)
    pad_len = input_ids.size(1) - gathered.size(1)
    if pad_len > 0:
        pad_zeros = gathered.new_zeros(gathered.size(0), pad_len)
        gathered = torch.cat([gathered, pad_zeros], dim=1)

    return gathered  # [B, L], 末尾几个可能是0

class DummyRewardModel:
    """
    模拟训练好的奖励模型,对 (prompt, step_text) 返回一个 float 分数
    这里根据 step_text 的长度 & 某些关键词来进行伪打分。
    """
    def __init__(self):
        pass

    def forward(self, prompt: str, step_text: str) -> float:
        # 演示:越长的 step_text,分数越高;如果包含 'mistake' 字眼就减分
        base_score = float(len(step_text)) * 0.01
        if "mistake" in step_text.lower():
            base_score -= 0.3
        return base_score

@dataclass
class PSGRPOConfig:
    """
    训练配置: 包括 GRPO 相关参数
    """
    num_generations: int = 2  # 一次对同一个 prompt 生成多少回答
    beta: float = 0.02        # KL 系数
    max_steps: int = 2        # 假设回答中最多会出现 Step 1: ... Step 2: ... 
    # 其他可拓展

class PSGRPOTrainer(Trainer):
    def __init__(
        self,
        ref_model: nn.Module,
        reward_model: DummyRewardModel,
        tokenizer: AutoTokenizer,
        ps_config: PSGRPOConfig,
        *args, 
        **kwargs
    ):
        super().__init__(*args, **kwargs)
        self.ref_model = ref_model
        self.reward_model = reward_model
        self.tokenizer = tokenizer
        self.ps_config = ps_config

    def _sample_completions(self, prompt: str) -> List[str]:
        """
        演示:给定 prompt,模拟生成 self.ps_config.num_generations 条包含 "Step i:" 的回答。
        真实场景你会在这里使用 self.model.generate。
        """
        completions = []
        for g in range(self.ps_config.num_generations):
            # 这里简单拼: 
            # Step 1: (some text) 
            # Step 2: ...
            # 并带一点随机
            step1 = f"Step 1: Analysis for prompt [{prompt}] part {g}."
            step2 = f"Step 2: Conclusion. {('mistake' if g % 2 == 0 else 'correctness')} info"
            completions.append(step1 + "\n" + step2)
        return completions

    def _process_supervision_rewards(
        self, 
        prompt: str, 
        completion: str
    ) -> List[float]:
        """
        给定某条回答(内含 'Step i:' 标签),拆分出多个步骤,对每一步调用 reward_model 打分。
        返回一个list, 对应每个step的reward.
        """
        # 以 "Step i:" 做拆分
        # 这里写个简单split
        # 真实情况可能需要更精细的正则
        lines = completion.split("\n")
        step_rewards = []
        for line in lines:
            if "Step" in line:
                # line示例: "Step 1: Analysis for prompt ..."
                step_text = line.strip()
                # 用 reward_model
                r = self.reward_model.forward(prompt, step_text)
                step_rewards.append(r)
        return step_rewards

    def _prepare_inputs(self, inputs: List[Dict]) -> Dict[str, torch.Tensor]:
        """
        模拟 grpo_train.py 中 _prepare_inputs 作用:
        1. 批量对 prompt 生成多条回答
        2. 对回答分步骤打分
        3. 分组计算 advantage
        4. 返回计算loss需要的数据(含ref_model对数概率)
        """
        # inputs 里包含 prompt
        batch_prompts = [ex["prompt"] for ex in inputs]
        B = len(batch_prompts)
        G = self.ps_config.num_generations

        all_completions = []
        # shape => [B*G] text
        for prompt in batch_prompts:
            completions = self._sample_completions(prompt)  # G条
            all_completions.extend([(prompt, c) for c in completions])

        # 对每条回答做 step-level reward
        step_rewards_list = []  # len=B*G, each => list of step rewards
        for (prompt, completion) in all_completions:
            step_rs = self._process_supervision_rewards(prompt, completion)
            step_rewards_list.append(step_rs)

        # 假设对每条回答embedding => token IDs
        # 统一拼 [prompt, completion], 以便后面做 model logp
        # 真实情况你也可分开
        concat_texts = []
        for (p, c) in all_completions:
            concat_texts.append(p + "\n" + c)
        encoded = self.tokenizer(
            concat_texts, 
            return_tensors="pt", 
            padding=True, 
            truncation=True,
            max_length=128
        )
        input_ids = encoded["input_ids"]
        attention_mask = encoded["attention_mask"]

        # 计算 “当前模型” 对 token 的 logp
        with torch.no_grad():
            per_token_logps = _get_per_token_logp(self.model, input_ids, attention_mask)
        # 计算 “参考模型” 对 token 的 logp (用于 KL)
        with torch.no_grad():
            ref_token_logps = _get_per_token_logp(self.ref_model, input_ids, attention_mask)

        # 现在每条回答有 step_rewards_list[i] 形如 [r_step1, r_step2, ...]
        # 需要将这些step reward映射到 token 上
        # 这里只是演示: 用 sum(r_step_j) 作为回答总reward => group内做相对比较
        # (你也可以做更精细 token-level对step区间赋值)
        total_rewards = []
        for rs in step_rewards_list:
            s = sum(rs)  # sum of step-level
            total_rewards.append(s)

        total_rewards = torch.tensor(total_rewards, dtype=torch.float32)
        # => [B*G]

        # Group: reshape => [B, G]
        total_rewards_2d = total_rewards.view(B, G)
        group_mean = total_rewards_2d.mean(dim=1, keepdim=True)
        group_std = total_rewards_2d.std(dim=1, keepdim=True)
        # broadcast => [B, G]
        group_adv = (total_rewards_2d - group_mean) / (group_std + 1e-5)
        # => flatten => [B*G]
        advantages = group_adv.view(-1)

        # 下面把 advantages broadcast到 token级别
        # 这里只是最简写法: 每个回答里所有token共享同一 advantage
        # shape => (B*G, seq_len)
        expanded_adv = advantages.unsqueeze(1).expand(-1, input_ids.size(1))

        # 返回 dict,给 compute_loss 用
        return {
            "input_ids": input_ids, 
            "attention_mask": attention_mask,
            "per_token_logps": per_token_logps,
            "ref_token_logps": ref_token_logps,
            "advantages": expanded_adv
        }

    def compute_loss(self, model, inputs, return_outputs=False):
        """
        类似grpo_train的compute_loss,
        - ratio*(advantages) - beta*KL
        """
        input_ids = inputs["input_ids"]
        attention_mask = inputs["attention_mask"]

        current_logps = inputs["per_token_logps"]        # [B*G, seq_len]
        ref_logps = inputs["ref_token_logps"]            # [B*G, seq_len]
        advantages = inputs["advantages"]                # [B*G, seq_len]

        # ratio => exp(current_logps - detach(current_logps))
        # 这里原版 PPO/GRPO 是 compare old vs current, 
        # 我们为了演示 => compare current with current.detach()
        # 真实可传 old_logps
        ratio = torch.exp(current_logps - current_logps.detach())

        # 计算KL: 这里是( reference vs current ) 或( current vs reference )?
        # PPO/GRPO 常见写法 = exp(ref-curr) - (ref-curr) - 1
        kl_diff = ref_logps - current_logps
        per_token_kl = torch.exp(kl_diff) - kl_diff - 1

        # token-level loss
        per_token_loss = ratio * advantages - self.ps_config.beta * per_token_kl
        # 我们想 maximize => 取负
        per_token_loss = -per_token_loss

        # mask
        mask = attention_mask.float()
        masked_loss = per_token_loss * mask
        loss_per_seq = masked_loss.sum(dim=1) / mask.sum(dim=1).clamp_min(1e-5)
        final_loss = loss_per_seq.mean()
        return final_loss

    def training_step(self, model, inputs):
        """
        重写 Trainer 默认的 training_step,
        先调 _prepare_inputs,再调 compute_loss
        """
        # 1) 根据 batch (list of dict) 做 prepare
        prepared = self._prepare_inputs(inputs)
        # 2) compute_loss
        loss = self.compute_loss(model, prepared)
        loss.backward()
        return loss.detach()

##
# 一个简易main: 演示如何用 PSGRPOTrainer 
## 

def main():
    # 1.加载或初始化 "当前策略模型" 及 "参考模型"
    # 演示: 同一个初始权重
    model_name = "distilroberta-base"  # 用roberta做文本demo
    current_model = AutoModelForCausalLM.from_pretrained("gpt2")  # GPT2 for simpler causal
    ref_model = AutoModelForCausalLM.from_pretrained("gpt2")

    tokenizer = AutoTokenizer.from_pretrained("gpt2")
    tokenizer.pad_token = tokenizer.eos_token  # GPT2没有pad_token,我们指定 eos

    # 2. 准备一个RewardModel
    reward_model = DummyRewardModel()

    # 3. 准备训练超参 & Config
    ps_config = PSGRPOConfig(
        num_generations=2,
        beta=0.01,
        max_steps=2
    )

    # 4. 构造个小训练数据
    # 只存一个字段 "prompt"
    train_data = [{"prompt":"How to solve x^2=4?"}, {"prompt":"What is AI safety?"}]
    # 伪做 2 steps epochs
    # Transformers的Trainer需要 "Dataset" 或list => 这里可用
    # 但 Trainer默认会把传入batch => dict of list. 所以, 
    # 这里我们用 "collator" 直接返回batch list
    # 也可自定义 dataset

    # 5. 定义PSGRPOTrainer
    training_args = TrainingArguments(
        output_dir="./outputs_ps_grpo",
        num_train_epochs=1,
        per_device_train_batch_size=2,
        logging_steps=1,
        learning_rate=1e-4
    )

    def my_data_collator(features):
        # features是list of {"prompt":...}
        # 直接原样返回 => trainer内部会把它当作batch: list of dict
        return features

    trainer = PSGRPOTrainer(
        model=current_model,
        ref_model=ref_model,
        reward_model=reward_model,
        tokenizer=tokenizer,
        ps_config=ps_config,
        args=training_args,
        train_dataset=train_data,  # 传入list => Trainer自动把它当dataset
        data_collator=my_data_collator
    )

    # 6. 调Trainer进行"假训练"
    trainer.train()

if __name__ == "__main__":
    main()

代码说明

  1. 模型加载

    • 我们使用 GPT2 作为当前策略模型 current_model 和参考模型 ref_model,以便在 compute_loss 阶段对比它们的对数概率计算 KL 惩罚。
    • 需要注意的是,如果你真的要做 PPO/GRPO,需要在更新策略模型前存一份 old_model 的快照,然后在下一个 batch 中将 old_model 用于 ratio;在本示例里,为了简化,我们把“旧策略”近似成 current_logps.detach() 来模拟 PPO/GRPO 公式中的 ratio
  2. 过程监督

    • _sample_completions 函数中,我们人为地在回答里插入了 “Step 1:” / “Step 2:” 标签,表示中间推理步骤。
    • _process_supervision_rewards 逐行查找 “Step i:” 的信息,并调用 reward_model.forward(prompt, step_text) 来拿到每个 step 的分数。
    • 在这里,我们只取了它们的和 sum(rs) 作为回答最终 reward,然后做 group 内的归一化
    • 若你想对每个 step 映射到 token 级别,你可以更加精细地记录 “Step j 对应从 token_x 到 token_y”,再给 [x..y] 的 token 分数。
  3. KL 惩罚

    • ref_token_logps = _get_per_token_logp(self.ref_model, ...) 返回参考模型对每个 token 的对数概率。
    • current_logps = _get_per_token_logp(self.model, ...) 返回当前策略对每个 token 的对数概率。
    • compute_loss 中,kl = exp(ref - current) - (ref - current) - 1 是 PPO/GRPO 常见近似写法。
    • 最终 (ratio*advantage - beta*kl) 做了合并,这就是 GRPO 的核心公式之一。
  4. 训练循环

    • Trainer 内部会把 train_dataset 分成 batch(这里 batch_size=2),并在 training_step 阶段调用 _prepare_inputs + compute_loss
    • _prepare_inputs 里做 prompt -> completions -> rewards -> advantage;
    • compute_loss 里做 PPO/GRPO formula, loss.backward()
    • 这样就完成了一次 RL 迭代。实际项目中你会迭代多 epochs / steps。

总结

通过这个示例,我们把过程监督融入到 GRPO 流程中,包括对每个 step 进行单独打分、对参考模型做 KL 惩罚,以及把分数合并为 advantage。虽然这里做了很多简化(例如 _sample_completions 是伪造的),但核心思路完全一致:

  1. 识别中间步骤Step i:标签),
  2. 对每个步骤打分(过程监督),
  3. 将分数映射到 token(或直接作为回答整体分数),
  4. 做组内比较(Group advantage)并结合 KL 来更新策略模型。

实际中,你可以:

  • 用真正的 _sample_completions(即 model.generate)或在线采样;
  • 用真实的“过程Reward模型”对中间步骤进行评估;
  • 对 steps -> token 做更精细的映射;
  • 保存/加载参考模型 old_model 做严格的 PPO ratio 计算。

这样就可以构建一个完整的“Process Supervision RL with GRPO”训练流水线。希望本示例能帮助你更好地落地过程监督的做法,让大模型在多步骤推理、代码生成、复杂任务中获得更好的训练与表现。

后记

2025年2月22日22点07分于上海。在GPT o1大模型辅助下完成。

相关文章:

  • MT7628基于原厂的SDK包, 修改ra1网卡的MAC方法。
  • 【ORB-SLAM3】鲁棒核函数的阈值设置
  • docker-rss:容器更新的RSS订阅源
  • 卷积与动态特征选择:重塑YOLOv8的多尺度目标检测能力
  • 商汤绝影发布全新端到端自动驾驶技术路线R-UniAD
  • 【Python爬虫(49)】分布式爬虫:在新兴技术浪潮下的蜕变与展望
  • 从0开始:OpenCV入门教程【图像处理基础】
  • 【网络】高级IO
  • sklearn中的决策树
  • Java子类调用父类构造器的应用场景
  • STM32-有关内存堆栈、map文件
  • ROS2 应用:按键控制 MoveIt2 中 Panda 机械臂关节位置
  • golang内存泄漏
  • 下载CentOS 10
  • 探索未知:alpha测试的神秘序章【量化理论】
  • 模块化设计的力量:从「乐高积木」看组合式开发如何降低软件工程风险
  • SpringCloud系列教程:微服务的未来(二十五)-基于注解的声明队列交换机、消息转换器、业务改造
  • Ubuntu 查看mysql用户和数据库
  • 使用postman测试api接口基本步骤
  • 【Python】Python顺序语句经典题合集
  • 体验中国传统文化、采购非遗文创,波兰游客走进上海市群艺馆
  • 中俄就应对美加征所谓“对等关税”等问题进行深入交流
  • 金地集团:今年前4个月实现销售额109.3亿元,同比下降52.44%
  • 读图|展现城市品格,上海城市影像走进南美
  • 中科院院士魏辅文已卸任江西农业大学校长
  • 国家矿山安全监察局发布《煤矿瓦斯防治能力评估办法》