【自然语言处理】基于生成式语言模型GPT
目录
一、引言
二、项目结构
三、核心文件内容
1. README.md
快速开始
步骤 1:训练 BPE 分词器
步骤 2:预训练 GPT 模型
步骤 3:微调(SFT+RLHF)
步骤 4:启动推理服务
步骤 5:调用推理接口
性能评估
部署优化
2. requirements.txt
步骤 1:创建并激活虚拟环境
Windows 系统(PowerShell / 命令提示符)
Linux/macOS 系统(终端)
步骤 2:通过清华镜像源安装模块
核心说明:
安装命令(激活虚拟环境后执行):
步骤 3:验证安装
退出虚拟环境
注意事项
3. byte_level_bpe.py
核心功能概述
详细功能拆解
1. 初始化(__init__方法)
2. 字节编码表构建(_build_byte_encoder方法)
3. 文本转字节级 token(_bytes_to_tokens方法)
4. BPE 训练(train方法)
5. 文本编码(encode方法)
6. 文本解码(decode方法)
7. 分词器持久化与加载(save和from_pretrained方法)
核心优势
适用场景
4. transformer_components.py
整体功能概述
详细功能拆解
1. MultiHeadAttention(多头自注意力)
初始化参数
核心属性
forward方法(核心逻辑)
2. FeedForwardNetwork(前馈网络)
初始化参数
结构与功能
3. TransformerDecoderLayer(Transformer 解码器层)
初始化参数
核心属性
forward方法(核心逻辑)
核心设计亮点
应用场景
5. gpt_model.py
整体架构概述
组件功能详解(__init__ 方法)
1. 输入嵌入模块
2. 解码器层堆叠(decoder_layers)
3. 输出层
4. 参数初始化(_init_weights)
训练流程(forward 方法)
输入参数
流程步骤
推理流程(generate 方法)
输入参数
生成流程
核心设计亮点
6. data_loader.py
核心数据集类功能
1. GPTPretrainDataset(预训练数据集,CLM 任务)
2. GPTSFTDataset(监督微调数据集,SFT 任务)
3. GPTPPODataset(RLHF-PPO 数据集,偏好对齐任务)
数据加载器函数功能
1. create_pretrain_dataloader
2. create_sft_dataloader
模块整体作用
7. pretrain_trainer.py
整体功能概述
核心组件详解(GPTTrainer类)
1. 初始化(__init__方法)
2. 学习率调度器(_create_scheduler方法)
3. 单步训练(_train_step方法)
4. 单步验证(_val_step方法)
5. 模型保存(save_model方法)
6. 断点续训(load_checkpoint方法)
7. 主训练循环(train方法)
预训练入口(main函数)
核心设计亮点
总结
8. finetune.py
整体流程与核心目标
核心组件详解
1. sft_finetune函数(监督微调)
输入参数(关键)
核心逻辑
2. RewardModel类(奖励模型)
结构设计
forward方法(核心逻辑)
3. train_reward_model函数(训练奖励模型)
输入参数(关键)
核心逻辑
4. rlhf_ppo函数(PPO 优化)
核心概念
输入参数(关键)
核心逻辑
5. main函数(微调入口)
逻辑
工程化设计亮点
总结
9. evaluator.py
核心评估类:GPTEvaluator
1. 初始化(__init__方法)
2. 困惑度计算(compute_perplexity方法)
3. 预测生成(generate_predictions方法)
4. BLEU 分数计算(compute_bleu方法)
5. ROUGE 分数计算(compute_rouge方法)
6. 预训练模型评估(evaluate_pretrained方法)
7. SFT 模型评估(evaluate_sft方法)
评估入口(main函数)
核心功能总结
10. inference/quantizer.py
核心量化原理
核心函数详解
1. quantize_model_int8(模型 INT8 量化)
2. int8_linear_forward(INT8 线性层前向传播)
3. enable_int8_inference(启用 INT8 推理)
4. save_quantized_model(保存量化模型)
5. load_quantized_model(加载量化模型)
量化示例(main函数)
核心价值与应用场景
总结
11. inference/kv_cache_infer.py
核心原理:为什么需要 KV 缓存?
核心组件详解
1. KVCache类(KV 缓存管理器)
初始化(__init__)
缓存更新(update)
缓存重置(reset)
2. patched_attention_forward函数(带 KV 缓存的注意力前向传播)
输入参数扩展
核心逻辑(关键修改点)
3. enable_kv_cache_inference函数(为模型启用 KV 缓存)
核心逻辑(闭包技巧)
4. generate_with_kv_cache函数(带 KV 缓存的文本生成)
核心流程
核心优势与应用场景
总结
12. inference/api_service.py
核心功能概述
核心组件详解
1. 服务初始化与依赖配置
2. 请求体模型(GenerateRequest)
3. 模型与分词器加载(load_model_and_tokenizer函数)
4. 服务启动钩子(startup_event函数)
5. 健康检查接口(/health)
6. 文本生成接口(/generate)
7. 服务启动入口(主函数)
工程化设计亮点
使用场景
总结
13. inference/load_test.py
核心依赖与定位
核心组件详解
1. 测试用例池(PROMPTS列表)
2. 压测用户类(GPTInferenceUser)
(1)请求间隔配置(wait_time)
(2)核心业务压测任务(test_generate方法)
(3)基础监控压测任务(test_health方法)
3. 压测执行说明(注释部分)
压测输出与核心价值
1. 压测过程中可观察的指标
2. 核心价值
总结
14. docker/Dockerfile
核心目标
逐行功能解析
1. 基础镜像选择
2. 环境变量配置
3. 安装系统依赖
4. 配置 Python 默认版本
5. 安装 Python 依赖
6. 设置工作目录
7. 复制项目文件
8. 暴露服务端口
9. 启动服务命令
构建与运行流程
核心价值
总结
15. docker/docker-compose.yaml
核心目标
配置版本与结构
服务配置(services)
1. 镜像构建配置(build)
2. 镜像名称(image)
3. 容器名称(container_name)
4. 重启策略(restart)
5. 端口映射(ports)
6. 数据卷挂载(volumes)
7. GPU 资源配置(deploy.resources)
8. 环境变量(environment)
使用流程
核心价值
总结
四、训练数据集示例
data/pretrain/train.txt(训练集,示例 50 条,实际可扩展至数万 / 数百万条)
data/pretrain/val.txt(验证集,示例 10 条)
五、总结
一、引言
受到计算机视觉领域采用 ImageNet 对模型进行一次预训练,使模型可以通过海量图像充分学习如何提取特征,再根据任务目标进行模型微调的范式影响,自然语言处理领域基于预训练语言模型的方法也逐渐成为主流。以 ELMo 为代表的动态词向量模型开启了语言模型预训练的大门,此后,以 GPT 和 BERT 为代表的基于 Transformer 的大规模预训练语言模型的出现,使自然语言处理全面进入了预训练微调范式新时代。利用丰富的训练数据、自监督的预训练任务及 Transformer 等深度神经网络结构,预训练语言模型具备了通用且强大的自然语言表示能力,能够有效地学习到词汇、语法和语义信息。将预训练模型应用于下游任务时,不需要了解太多的任务细节,不需要设计特定的神经网络结构,只需要 “微调” 预训练模型,即使用具体任务的标注数据在预训练语言模型上进行监督训练,就可以获得显著的性能提升。2018 年,OpenAI 公司提出的生成式预训练语言模型(Generative Pre-Training, GPT)是典型的生成式预训练语言模型之一。GPT 的模型结构如图所示,它是由多层 Transformer 组成的单向语言模型,主要分为输入层、编码层和输出层三个部分。

本文的项目源代码来自于我的GitHub仓库:https://github.com/hongyuxu0/gpt
二、项目结构
gpt-reproduction/
├── README.md # 项目说明文档
├── requirements.txt # 依赖清单
├── byte_level_bpe.py # Byte-level BPE分词器实现
├── transformer_components.py # Transformer核心组件(多头注意力、FFN等)
├── gpt_model.py # GPT模型整体定义(输入嵌入+解码器堆叠)
├── data_loader.py # 数据预处理与加载(支持流式加载)
├── pretrain_trainer.py # 预训练流程实现(CLM目标+分布式支持)
├── finetune.py # 微调流程(SFT+RLHF)
├── evaluator.py # 性能评估工具(PPL、BLEU、ROUGE等)
├── inference/ # 推理与部署优化
│ ├── quantizer.py # 模型量化(INT8)
│ ├── kv_cache_infer.py # 带KV缓存的推理实现
│ ├── api_service.py # FastAPI推理服务
│ └── load_test.py # Locust压测脚本
├── docker/ # 容器化部署配置
│ ├── Dockerfile # 镜像构建文件
│ └── docker-compose.yml # 容器编排配置
├── data/ # 数据目录(示例结构)
│ ├── pretrain/ # 预训练数据(每行1条文本)
│ │ ├── train.txt
│ │ └── val.txt
│ └── sft/ # 微调数据
│ ├── sft_data.json # SFT指令-响应对
│ └── rlhf_pairwise.json # RLHF pairwise数据
└── models/ # 模型权重目录(示例结构)├── bpe_tokenizer/ # 训练好的BPE分词器├── pretrain_best.pth # 预训练最佳模型├── sft_best.pth # SFT微调模型└── ppo_best.pth # RLHF优化模型
三、核心文件内容
1. README.md
# GPT模型复现与部署框架本项目是一个基于Transformer解码器架构的GPT模型全流程实现,涵盖从**数据预处理、模型训练(预训练+微调)、性能评估到工程化部署**的完整链路。支持Byte-level BPE分词、自回归文本生成、模型量化优化及容器化服务部署,可作为基础框架快速适配不同场景的文本生成任务。## 核心功能- **Byte-level BPE分词器**:支持自定义词表大小,适配多语言文本编码。
- **Decoder-only Transformer**:实现标准GPT架构(多头注意力+前馈网络+残差连接)。
- **全流程训练**:- 预训练:基于因果语言模型(CLM)目标,支持大规模文本续训。- 微调:包含监督微调(SFT)和人类反馈强化学习(RLHF)。
- **推理优化**:- INT8量化:减少75%显存占用,保持生成质量。- KV缓存:推理速度提升2-3倍,适合实时交互场景。
- **工程化部署**:提供FastAPI接口和Docker容器配置,支持GPU加速。## 环境要求| 类别 | 具体要求 |
|------------|-----------------------------------|
| 硬件 | NVIDIA GPU(≥12GB显存,支持CUDA)|
| 系统 | Ubuntu 20.04/22.04 或 WSL2 |
| 软件依赖 | Python 3.9、CUDA 11.7、PyTorch 1.13+ |
| 可选工具 | Docker、Docker Compose(容器化) |## 快速开始### 1. 克隆仓库并安装依赖```bash
# 克隆代码
git clone https://github.com/hongyuxu0/gpt.git
cd gpt# 安装依赖(国内源加速)
pip install -r requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
快速开始
步骤 1:训练 BPE 分词器
from byte_level_bpe import ByteLevelBPETokenizer
import os# 加载预训练文本
with open("data/pretrain/train.txt", "r", encoding="utf-8") as f:texts = [line.strip() for line in f if line.strip()]# 训练分词器(词表大小50k)
tokenizer = ByteLevelBPETokenizer(vocab_size=50000)
tokenizer.train(texts=texts, min_frequency=2)# 保存分词器
os.makedirs("models/bpe_tokenizer", exist_ok=True)
tokenizer.save("models/bpe_tokenizer")
步骤 2:预训练 GPT 模型
python pretrain_trainer.py \--train_data_dir "data/pretrain/train" \--val_data_dir "data/pretrain/val" \--tokenizer_dir "models/bpe_tokenizer" \--output_dir "models/pretrain" \--batch_size 8 \--max_seq_len 2048 \--epochs 10 \--lr 2e-4
步骤 3:微调(SFT+RLHF)
# SFT微调
python finetune.py \--mode sft \--data_path "data/sft/sft_data.json" \--pretrained_model "models/pretrain/pretrain_best.pth" \--output_dir "models/sft" \--epochs 3 \--lr 3e-5# RLHF优化
python finetune.py \--mode rlhf \--pairwise_data "data/sft/rlhf_pairwise.json" \--sft_model "models/sft/sft_best.pth" \--output_dir "models/rlhf" \--ppo_epochs 5
步骤 4:启动推理服务
# 本地启动FastAPI服务
uvicorn inference/api_service:app --host 0.0.0.0 --port 8000 --workers 4# 或用Docker启动
cd docker
docker-compose up -d
步骤 5:调用推理接口
curl -X POST http://localhost:8000/generate \-H "Content-Type: application/json" \-d '{"prefix": "解释什么是机器学习","max_gen_len": 100,"top_k": 50,"temperature": 0.8}'
性能评估
from evaluator import GPTEvaluator
from gpt_model import GPTModel
from byte_level_bpe import ByteLevelBPETokenizer# 加载模型和分词器
tokenizer = ByteLevelBPETokenizer.from_pretrained("models/bpe_tokenizer")
model = GPTModel(vocab_size=len(tokenizer.vocab),d_model=768,n_layers=12,n_heads=12
)
model.load_state_dict(torch.load("models/ppo_best.pth")["model_state_dict"])# 初始化评估器
evaluator = GPTEvaluator(model, tokenizer)# 计算困惑度
test_loader = ... # 构建测试数据加载器
ppl = evaluator.compute_perplexity(test_loader)# 生成预测并评估BLEU/ROUGE
predictions = evaluator.generate_predictions(test_prefixes)
bleu = evaluator.compute_bleu(predictions, references)
rouge = evaluator.compute_rouge(predictions, references)
部署优化
- INT8 量化:减少 75% 显存占用,见
inference/quantizer.py - KV 缓存:推理速度提升 2.5 倍,见
inference/kv_cache_infer.py - 容器化:支持 GPU 部署,见
docker/目录
2. requirements.txt
torch==1.13.1+cu117
torchvision==0.14.1+cu117
torchaudio==0.13.1
fastapi==0.103.1
uvicorn==0.23.2
pydantic==2.3.0
onnx==1.14.0
onnxruntime-gpu==1.15.1
nltk==3.8.1
rouge==1.0.1
tqdm==4.66.1
docker==6.1.3
locust==2.15.1
numpy==1.24.3
步骤 1:创建并激活虚拟环境
Windows 系统(PowerShell / 命令提示符)
# 创建虚拟环境(名称为gpt-env,可自定义)
python -m venv gpt-env# 激活虚拟环境
gpt-env\Scripts\activate
# 激活后终端前缀会显示 (gpt-env),表示进入虚拟环境
Linux/macOS 系统(终端)
# 创建虚拟环境
python3 -m venv gpt-env# 激活虚拟环境
source gpt-env/bin/activate
# 激活后终端前缀会显示 (gpt-env)
步骤 2:通过清华镜像源安装模块
核心说明:
- PyTorch 相关包(
torch、torchvision、torchaudio)需要指定 CUDA 11.7 版本,清华镜像同步了 PyTorch 的官方 whl 包,可直接使用。 - 其他包通过清华 PyPI 镜像安装,速度更快。
安装命令(激活虚拟环境后执行):
# 安装PyTorch系列(指定CUDA 11.7版本,使用清华PyTorch镜像)
pip install torch==1.13.1+cu117 torchvision==0.14.1+cu117 torchaudio==0.13.1 \-f https://mirrors.tuna.tsinghua.edu.cn/pytorch-wheels/cu117/torch_stable.html \-i https://pypi.tuna.tsinghua.edu.cn/simple# 安装其他模块(使用清华PyPI镜像)
pip install \fastapi==0.103.1 \uvicorn==0.23.2 \pydantic==2.3.0 \onnx==1.14.0 \onnxruntime-gpu==1.15.1 \nltk==3.8.1 \rouge==1.0.1 \tqdm==4.66.1 \docker==6.1.3 \locust==2.15.1 \-i https://pypi.tuna.tsinghua.edu.cn/simple
步骤 3:验证安装
安装完成后,可通过以下命令验证核心模块是否安装成功:
# 检查PyTorch版本和CUDA是否可用
python -c "import torch; print('PyTorch版本:', torch.__version__); print('CUDA可用:', torch.cuda.is_available())"# 检查FastAPI等模块
python -c "import fastapi; import uvicorn; print('FastAPI版本:', fastapi.__version__)"
若输出中CUDA可用: True且版本正确,说明安装成功。
退出虚拟环境
完成工作后,可通过以下命令退出虚拟环境:
deactivate
注意事项
- 若安装过程中出现超时,可重复执行安装命令(
pip会自动跳过已安装的包)。 - 若清华镜像源临时不可用,可替换为其他镜像源(如阿里云:
https://mirrors.aliyun.com/pypi/simple/)。 - 确保本地已安装 CUDA 11.7(可通过
nvcc --version检查),否则torch.cuda.is_available()会返回False。
3. byte_level_bpe.py
import json
import re
from collections import defaultdict, Counter
from typing import List, Dict, Tupleclass ByteLevelBPETokenizer:def __init__(self, vocab_size: int = 50000, special_tokens: Dict[str, int] = None):"""初始化Byte-level BPE分词器"""self.vocab_size = vocab_sizeself.special_tokens = special_tokens or {"<s>": 0,"<pad>": 1,"</s>": 2,"<unk>": 3}self.vocab: Dict[str, int] = self.special_tokens.copy()self.merges: Dict[Tuple[str, str], str] = {} # (a,b) -> abself.reverse_vocab: Dict[int, str] = {v: k for k, v in self.vocab.items()}self.byte_encoder = self._build_byte_encoder()def _build_byte_encoder(self) -> Dict[int, str]:"""构建字节到字符串的映射(Byte-level编码基础)"""byte_encoder = {}for i in range(256):if i < 32 or i > 126:byte_encoder[i] = f"<0x{i:02X}>"else:byte_encoder[i] = chr(i)return byte_encoderdef _bytes_to_tokens(self, text: str) -> List[str]:"""将文本转换为字节级token列表"""bytes_list = text.encode("utf-8")return [self.byte_encoder[b] for b in bytes_list]def train(self, texts: List[str], min_frequency: int = 2) -> None:"""训练BPE分词器"""# 1. 初始化基础词表(字节级)token_counts = defaultdict(int)for text in texts:tokens = self._bytes_to_tokens(text)for token in tokens:token_counts[token] += 1# 添加字节级token到词表next_id = len(self.vocab)for token in token_counts:if token not in self.vocab:self.vocab[token] = next_idself.reverse_vocab[next_id] = tokennext_id += 1if next_id >= self.vocab_size:break# 2. 迭代合并高频子词对current_tokens = {text: self._bytes_to_tokens(text) for text in texts}while next_id < self.vocab_size:# 统计子词对频率pair_counts = defaultdict(int)for tokens in current_tokens.values():for i in range(len(tokens) - 1):pair = (tokens[i], tokens[i + 1])pair_counts[pair] += 1# 过滤低频对子valid_pairs = {p: c for p, c in pair_counts.items() if c >= min_frequency}if not valid_pairs:break # 无更多有效合并# 选择频率最高的对子best_pair = max(valid_pairs, key=valid_pairs.get)merged_token = "".join(best_pair)# 更新词表和合并规则self.merges[best_pair] = merged_tokenself.vocab[merged_token] = next_idself.reverse_vocab[next_id] = merged_tokennext_id += 1# 更新所有文本的token序列for text in current_tokens:tokens = current_tokens[text]new_tokens = []i = 0while i < len(tokens):if i < len(tokens) - 1 and (tokens[i], tokens[i + 1]) == best_pair:new_tokens.append(merged_token)i += 2else:new_tokens.append(tokens[i])i += 1current_tokens[text] = new_tokensprint(f"BPE训练完成:词表大小={len(self.vocab)},合并规则数={len(self.merges)}")def encode(self, text: str, max_seq_len: int = 2048, add_special_tokens: bool = True) -> Dict[str, List[int]]:"""文本编码(返回input_ids和attention_mask)"""# 1. 字节级转换与合并tokens = self._bytes_to_tokens(text)i = 0while i < len(tokens) - 1:pair = (tokens[i], tokens[i + 1])if pair in self.merges:tokens = tokens[:i] + [self.merges[pair]] + tokens[i + 2:]else:i += 1# 2. 添加特殊tokenif add_special_tokens:tokens = ["<s>"] + tokens + ["</s>"]# 3. 转换为ID并处理长度input_ids = [self.vocab.get(token, self.vocab["<unk>"]) for token in tokens]if len(input_ids) > max_seq_len:input_ids = input_ids[:max_seq_len] # 截断elif len(input_ids) < max_seq_len:input_ids += [self.vocab["<pad>"]] * (max_seq_len - len(input_ids)) # 填充# 4. 生成attention_mask(0=pad)attention_mask = [1 if id != self.vocab["<pad>"] else 0 for id in input_ids]return {"input_ids": input_ids, "attention_mask": attention_mask}def decode(self, input_ids: List[int]) -> str:"""ID序列解码为文本"""tokens = []for id in input_ids:token = self.reverse_vocab.get(id, "<unk>")if token in ["<s>", "</s>", "<pad>"]:continuetokens.append(token)# 字节级token还原为原始字节byte_str = ""for token in tokens:if token.startswith("<0x") and token.endswith(">"):byte_str += chr(int(token[3:-1], 16))else:byte_str += tokenreturn byte_strdef save(self, save_dir: str) -> None:"""保存词表和合并规则"""import osos.makedirs(save_dir, exist_ok=True)with open(f"{save_dir}/vocab.json", "w", encoding="utf-8") as f:json.dump(self.vocab, f, ensure_ascii=False, indent=2)with open(f"{save_dir}/merges.json", "w", encoding="utf-8") as f:json.dump({f"{k[0]},{k[1]}": v for k, v in self.merges.items()}, f, ensure_ascii=False, indent=2)@classmethoddef from_pretrained(cls, save_dir: str) -> "ByteLevelBPETokenizer":"""加载预训练分词器"""with open(f"{save_dir}/vocab.json", "r", encoding="utf-8") as f:vocab = json.load(f)with open(f"{save_dir}/merges.json", "r", encoding="utf-8") as f:merges_dict = json.load(f)merges = {tuple(k.split(",")): v for k, v in merges_dict.items()}tokenizer = cls()tokenizer.vocab = vocabtokenizer.merges = mergestokenizer.reverse_vocab = {v: k for k, v in vocab.items()}return tokenizer
该代码实现了一个字节级 BPE(Byte-Level Byte Pair Encoding)分词器,是 GPT 等大语言模型中常用的子词分词方案。它能将文本分解为字节级基础单元,再通过合并高频子词对生成更高效的 token,支持任意字符(包括多语言、特殊符号)的编码,避免传统分词器的 “未登录词” 问题。
核心功能概述
该类的核心作用是:将原始文本转换为模型可识别的整数 ID 序列(编码),并能将 ID 序列还原为原始文本(解码),同时支持通过文本数据训练自定义分词器(学习子词合并规则)。
详细功能拆解
1. 初始化(__init__方法)
定义分词器的基础配置,包括词表大小、特殊 token、核心数据结构等:
vocab_size:目标词表大小(默认 50000),训练时会尽量生成该规模的词表。special_tokens:特殊符号(如<s>(句首)、<pad>(填充)、</s>(句尾)、<unk>(未知词)),默认映射 ID 为 0-3,是模型训练必需的控制符号。- 核心数据结构:
vocab:{token: id}映射,存储所有 token 及其对应的整数 ID(初始包含特殊 token)。merges:{(子词对): 合并后的token}映射,记录训练过程中学习到的子词合并规则(如("a", "b") → "ab")。reverse_vocab:{id: token}映射,用于从 ID 反查 token(解码时用)。byte_encoder:字节到字符串的映射表(由_build_byte_encoder生成),是字节级 BPE 的基础。
2. 字节编码表构建(_build_byte_encoder方法)
构建 “字节→字符串” 的映射,确保所有字节(0-255)都能被表示为字符串 token:
- 对于 ASCII 可打印字符(32-126,如字母、数字、标点):直接映射为对应的字符(如字节
0x61→"a")。 - 对于不可打印字符(如控制字符、多字节字符的组成部分):用
<0xXX>格式表示(如字节0xA0→<0xA0>)。
作用:将文本的原始字节(utf-8 编码后)转换为可处理的字符串 token,为后续 BPE 合并提供基础单元。
3. 文本转字节级 token(_bytes_to_tokens方法)
将输入文本转换为初始的字节级 token 列表:
- 步骤:文本→utf-8 编码为字节列表→通过
byte_encoder转换为字符串 token 列表。 - 示例:文本
"a你"(utf-8 编码为字节[0x61, 0xE4, 0xBD, 0xA0])→ 转换为 token 列表["a", "<0xE4>", "<0xBD>", "<0xA0>"]。
作用:将任意文本拆解为最基础的字节级 token,避免未登录词(任何字符都能拆分为字节)。
4. BPE 训练(train方法)
通过输入文本数据学习子词合并规则,生成目标大小的词表(核心功能):
-
步骤 1:初始化基础词表统计所有文本中字节级 token 的出现频率,将这些 token 添加到
vocab中(直到达到初始词表规模)。 -
步骤 2:迭代合并高频子词对循环执行以下操作,直到词表达到
vocab_size:- 统计所有文本中相邻子词对(如
(token1, token2))的出现频率。 - 过滤掉频率低于
min_frequency的对子(避免合并罕见组合)。 - 选择频率最高的子词对进行合并,生成新 token(如
("a", "b")合并为"ab")。 - 更新
merges(记录合并规则)和vocab(添加新 token 及其 ID)。 - 用新合并规则更新所有文本的 token 序列(将所有
(a,b)替换为ab)。
- 统计所有文本中相邻子词对(如
-
示例:初始 token 列表
["a", "a", "a"]→ 统计到("a", "a")频率最高→合并为"aa"→ 新 token 列表["aa", "a"]。
作用:通过合并高频子词对,平衡词表大小和 token 粒度(既避免单字节 token 过多导致序列过长,又通过子词覆盖常见组合)。
5. 文本编码(encode方法)
将原始文本转换为模型输入的input_ids(整数 ID 序列)和attention_mask(掩码):
-
步骤 1:字节级转换与合并先通过
_bytes_to_tokens将文本转为字节级 token,再根据merges规则合并子词对(重复查找可合并的子词对并替换),得到最终 token 序列。 -
步骤 2:添加特殊 token若
add_special_tokens=True,在 token 序列前后添加<s>(句首)和</s>(句尾)。 -
步骤 3:长度处理若 token 序列长度超过
max_seq_len,截断到该长度;若不足,用<pad>填充至该长度。 -
步骤 4:生成掩码
attention_mask中,有效 token(非<pad>)对应 1,填充的<pad>对应 0(模型会忽略掩码为 0 的位置)。 -
输出:
{"input_ids": [id1, id2, ...], "attention_mask": [1, 1, 0, ...]}
作用:将文本转换为模型可直接输入的格式(整数 ID + 掩码)。
6. 文本解码(decode方法)
将input_ids(整数 ID 序列)还原为原始文本:
-
步骤 1:ID 转 token通过
reverse_vocab将每个 ID 转换为对应的 token,跳过特殊 token(<s>、</s>、<pad>)。 -
步骤 2:token 还原为字节对于
<0xXX>格式的 token,转换为对应的字节(如<0x61>→字节0x61);其他 token 直接拼接为字符串。 -
步骤 3:字节转文本将所有字节拼接后,通过 utf-8 解码为原始文本。
作用:将模型输出的 ID 序列还原为人类可读懂的文本。
7. 分词器持久化与加载(save和from_pretrained方法)
save(save_dir):将vocab(词表)和merges(合并规则)分别保存为vocab.json和merges.json,实现分词器的持久化。from_pretrained(save_dir):从保存的vocab.json和merges.json加载数据,重建分词器实例(训练后可复用)。
作用:训练好的分词器可保存到本地,后续推理或微调时直接加载,避免重复训练。
核心优势
- 全字符覆盖:基于字节级处理,支持任意语言(中文、英文、多语言混合)和特殊符号,无 “未登录词” 问题。
- 动态子词学习:通过训练自动学习高频子词组合,平衡 token 粒度和序列长度(比单字 / 单字节更高效)。
- 适配大模型:输出格式(
input_ids+attention_mask)直接适配 GPT 等 Transformer 模型的输入要求。
适用场景
- GPT、LLaMA 等自回归语言模型的文本预处理(编码 / 解码)。
- 多语言文本生成任务(支持任意字符编码)。
- 需要自定义词表的场景(可通过调整
vocab_size和训练数据生成特定领域的分词器)。
综上,该类完整实现了字节级 BPE 分词器的 “训练 - 编码 - 解码 - 持久化” 全流程,是大语言模型工程实现中不可或缺的核心组件。
4. transformer_components.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import Optionalclass MultiHeadAttention(nn.Module):def __init__(self, d_model: int = 768, n_heads: int = 12, dropout: float = 0.1):"""多头自注意力(Decoder-only核心)"""super().__init__()assert d_model % n_heads == 0, f"d_model={d_model}需被n_heads={n_heads}整除"self.d_model = d_modelself.n_heads = n_headsself.d_k = d_model // n_heads # 单头维度# 线性投影层(Q/K/V共享权重)self.w_q = nn.Linear(d_model, d_model)self.w_k = nn.Linear(d_model, d_model)self.w_v = nn.Linear(d_model, d_model)self.w_o = nn.Linear(d_model, d_model)self.dropout = nn.Dropout(dropout)self.scale = torch.sqrt(torch.tensor(self.d_k, dtype=torch.float32)) # 缩放因子def forward(self,x: torch.Tensor,attention_mask: Optional[torch.Tensor] = None,causal_mask: bool = True # 因果掩码(防止未来信息泄露)) -> torch.Tensor:""":param x: 输入张量 (batch_size, seq_len, d_model):param attention_mask: padding掩码 (batch_size, 1, seq_len)"""batch_size = x.shape[0]# 1. 线性投影 + 多头拆分 (batch_size, n_heads, seq_len, d_k)q = self.w_q(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)k = self.w_k(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)v = self.w_v(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)# 2. 计算注意力得分(缩放点积)attn_scores = torch.matmul(q, k.transpose(-2, -1)) / self.scale # (batch_size, n_heads, seq_len, seq_len)# 3. 应用因果掩码(下三角可见)if causal_mask:seq_len = x.shape[1]mask = torch.tril(torch.ones(seq_len, seq_len, device=x.device)).bool()attn_scores = attn_scores.masked_fill(~mask, -1e9) # 不可见位置设为-∞# 4. 应用padding掩码if attention_mask is not None:attn_scores = attn_scores.masked_fill(attention_mask.unsqueeze(1) == 0, -1e9)# 5. Softmax + Dropout + 加权求和attn_weights = F.softmax(attn_scores, dim=-1)attn_weights = self.dropout(attn_weights)attn_output = torch.matmul(attn_weights, v) # (batch_size, n_heads, seq_len, d_k)# 6. 多头合并 + 输出投影attn_output = attn_output.transpose(1, 2).contiguous() # (batch_size, seq_len, n_heads, d_k)attn_output = attn_output.view(batch_size, -1, self.d_model) # (batch_size, seq_len, d_model)return self.w_o(attn_output)class FeedForwardNetwork(nn.Module):def __init__(self, d_model: int = 768, d_ff: int = 3072, dropout: float = 0.1):"""前馈网络(d_ff=4*d_model)"""super().__init__()self.layers = nn.Sequential(nn.Linear(d_model, d_ff),nn.GELU(), # GPT-2及后续版本使用GELU激活nn.Dropout(dropout),nn.Linear(d_ff, d_model),nn.Dropout(dropout))def forward(self, x: torch.Tensor) -> torch.Tensor:return self.layers(x)class TransformerDecoderLayer(nn.Module):def __init__(self, d_model: int = 768, n_heads: int = 12, d_ff: int = 3072, dropout: float = 0.1):"""单个Transformer解码器层(残差连接+层归一化)"""super().__init__()self.norm1 = nn.LayerNorm(d_model, eps=1e-6) # 预归一化(GPT-2采用)self.attn = MultiHeadAttention(d_model, n_heads, dropout)self.norm2 = nn.LayerNorm(d_model, eps=1e-6)self.ffn = FeedForwardNetwork(d_model, d_ff, dropout)self.dropout = nn.Dropout(dropout)def forward(self,x: torch.Tensor,attention_mask: Optional[torch.Tensor] = None) -> torch.Tensor:"""预归一化流程:LayerNorm → Attention/FFN → 残差连接"""# 1. 自注意力子层x_norm1 = self.norm1(x)attn_output = self.attn(x_norm1, attention_mask, causal_mask=True)x = x + self.dropout(attn_output)# 2. 前馈子层x_norm2 = self.norm2(x)ffn_output = self.ffn(x_norm2)x = x + self.dropout(ffn_output)return x
该代码实现了Transformer 解码器的核心组件,是 GPT 等自回归语言模型的基础结构。它包含三个核心类:MultiHeadAttention(多头自注意力)、FeedForwardNetwork(前馈网络)和TransformerDecoderLayer(Transformer 解码器层),共同实现了对序列数据的上下文建模能力。
整体功能概述
Transformer 解码器是自回归语言模型(如 GPT)的核心,其作用是基于输入序列的历史信息(前文)预测下一个 token。这段代码通过 “多头注意力捕捉上下文依赖”+“前馈网络增强特征表达”+“残差连接与层归一化稳定训练” 的组合,实现了对序列的深度建模。
详细功能拆解
1. MultiHeadAttention(多头自注意力)
自注意力机制是 Transformer 的核心,用于计算序列中每个位置与其他位置的依赖关系(“关注” 重要的上下文)。“多头” 则是将注意力并行拆分到多个子空间,捕捉不同类型的依赖关系。
初始化参数
d_model:模型隐藏层维度(默认 768,GPT-2 基础版的维度)。n_heads:注意力头数(默认 12,需满足d_model % n_heads == 0,确保每个头的维度均匀)。dropout:注意力权重的 dropout 概率(默认 0.1,防止过拟合)。
核心属性
d_k:单个注意力头的维度(d_model // n_heads,如 768//12=64)。- 线性投影层:
w_q、w_k、w_v分别将输入映射为查询(Q)、键(K)、值(V);w_o将多头输出合并为d_model维度。 scale:缩放因子(√d_k),用于缓解注意力得分过大导致的 softmax 梯度消失问题。
forward方法(核心逻辑)
输入:x(序列特征,形状(batch_size, seq_len, d_model))、attention_mask(padding 掩码)、causal_mask(因果掩码,默认启用)。
步骤解析:
-
Q/K/V 投影与多头拆分通过
w_q、w_k、w_v将输入x线性投影为 Q、K、V,再拆分为n_heads个并行的注意力头,形状变为(batch_size, n_heads, seq_len, d_k)。例:输入(32, 100, 768)→投影后(32, 100, 768)→拆分多头后(32, 12, 100, 64)。 -
计算注意力得分(缩放点积)注意力得分公式:
attn_scores = (Q × K^T) / scale,形状为(batch_size, n_heads, seq_len, seq_len)。其中Q × K^T计算每个位置对其他所有位置的 “相关性”,除以scale(√64=8)避免得分过大导致 softmax 后梯度消失。 -
应用因果掩码(关键!)自回归模型(如 GPT)需要 “只能看到前文,不能看到后文”,因此通过下三角矩阵掩码实现:
- 生成
(seq_len, seq_len)的下三角矩阵(对角线及以下为True,以上为False)。 - 将掩码为
False的位置(未来信息)的注意力得分设为-1e9,softmax 后这些位置的权重趋近于 0。例:对于序列[x1, x2, x3],x3 只能关注 x1、x2,不能关注自身之后的位置。
- 生成
-
应用 padding 掩码若输入包含填充 token(如
<pad>),attention_mask中对应位置为 0,通过掩码将这些位置的注意力得分设为-1e9,避免模型关注无效填充。 -
计算注意力权重与输出
- 对
attn_scores做 softmax,得到注意力权重(每个位置对其他位置的关注比例)。 - 应用 dropout 防止过拟合。
- 注意力输出:
attn_output = 注意力权重 × V(加权求和 V 向量,形状(batch_size, n_heads, seq_len, d_k))。
- 对
-
多头合并与输出投影将多头输出拼接(合并为
d_model维度),通过w_o线性投影,最终输出形状为(batch_size, seq_len, d_model)。
2. FeedForwardNetwork(前馈网络)
前馈网络是对注意力输出的 “非线性变换模块”,用于增强模型对单个位置特征的表达能力。
初始化参数
d_model:输入 / 输出维度(与注意力层一致,768)。d_ff:中间层维度(默认 3072,通常为4×d_model,768×4=3072)。dropout:dropout 概率(默认 0.1)。
结构与功能
由两层线性网络 + 激活函数组成:Linear(d_model → d_ff) → GELU → Dropout → Linear(d_ff → d_model) → Dropout。
- GELU 激活:比 ReLU 更平滑的非线性函数(
GELU(x) = x × Φ(x),Φ 是高斯分布的累积分布函数),在 GPT-2 及后续模型中广泛使用,有助于训练稳定性。 - 作用:对每个位置的特征独立进行非线性变换(与序列长度无关),补充注意力层未捕捉到的局部特征。
3. TransformerDecoderLayer(Transformer 解码器层)
将 “多头自注意力” 和 “前馈网络” 组合成一个完整的解码器层,并通过残差连接和层归一化稳定训练。
初始化参数
- 继承
d_model、n_heads、d_ff、dropout,分别对应模型维度、头数、前馈中间层维度、dropout 概率。
核心属性
norm1/norm2:层归一化(LayerNorm),用于稳定输入分布(均值为 0,方差为 1)。attn:MultiHeadAttention实例(自注意力子层)。ffn:FeedForwardNetwork实例(前馈子层)。dropout:残差连接后的 dropout(增强泛化性)。
forward方法(核心逻辑)
输入:x(序列特征,(batch_size, seq_len, d_model))、attention_mask(padding 掩码)。
步骤解析(预归一化流程,GPT-2 采用):
-
自注意力子层
- 先对输入
x做层归一化(norm1),避免输入分布偏移影响注意力计算。 - 输入归一化后的特征到
attn(多头自注意力),得到注意力输出。 - 残差连接:
x = x + dropout(attn_output)(将原始输入与注意力输出相加,缓解深层网络梯度消失)。
- 先对输入
-
前馈子层
- 对注意力子层的输出做层归一化(
norm2)。 - 输入归一化后的特征到
ffn(前馈网络),得到前馈输出。 - 残差连接:
x = x + dropout(ffn_output)(再次保留原始特征,增强训练稳定性)。
- 对注意力子层的输出做层归一化(
最终输出:经过完整解码器层处理的序列特征(形状不变,(batch_size, seq_len, d_model))。
核心设计亮点
- 因果掩码:确保自回归特性(只能基于前文预测后文),是 GPT 等生成模型的关键。
- 多头注意力:并行捕捉不同子空间的上下文依赖(如语法依赖、语义依赖),增强模型表达能力。
- 预归一化:在进入子层(注意力 / 前馈)前做层归一化,比 “后归一化” 更利于深层网络训练(GPT-2 及后续模型的标准设计)。
- 残差连接:每个子层的输出与输入相加,避免深层网络中特征被 “稀释”,保证梯度有效传播。
应用场景
这些组件是构建 Transformer 解码器的基础,通过堆叠多个TransformerDecoderLayer(如 GPT-2 基础版堆叠 12 层),可形成完整的解码器,进而构建:
- 文本生成模型(如 GPT 系列):基于前文生成连贯的后续文本。
- 自回归语言模型:用于预训练(因果语言模型目标,CLM)和微调(指令跟随任务)。
综上,这段代码实现了 Transformer 解码器的核心功能模块,是自回归语言模型从 “序列输入” 到 “上下文特征建模” 的关键组件。
5. gpt_model.py
import torch
import torch.nn as nn
import torch.nn.functional as F
from transformer_components import TransformerDecoderLayer
from typing import Optional
from typing import List, Dict, Tupleclass GPTModel(nn.Module):def __init__(self,vocab_size: int = 50000,d_model: int = 768,n_layers: int = 12, # GPT-2 small=12n_heads: int = 12,d_ff: int = 3072,max_seq_len: int = 2048,dropout: float = 0.1,pad_token_id: int = 1):"""GPT模型整体定义(Decoder-only架构)"""super().__init__()self.d_model = d_modelself.pad_token_id = pad_token_id# 1. 输入嵌入层(词嵌入+位置嵌入)self.token_embedding = nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id)self.position_embedding = nn.Embedding(max_seq_len, d_model) # 可学习位置嵌入self.embedding_dropout = nn.Dropout(dropout)# 2. Transformer解码器堆叠self.decoder_layers = nn.ModuleList([TransformerDecoderLayer(d_model, n_heads, d_ff, dropout)for _ in range(n_layers)])# 3. 输出层(预测下一个token)self.norm_final = nn.LayerNorm(d_model, eps=1e-6)self.output_layer = nn.Linear(d_model, vocab_size, bias=False)# 输出层权重与词嵌入层共享(GPT优化策略)self.output_layer.weight = self.token_embedding.weight# 初始化参数self._init_weights()def _init_weights(self) -> None:"""参数初始化(Xavier均匀初始化)"""for module in self.modules():if isinstance(module, nn.Linear):nn.init.xavier_uniform_(module.weight)if module.bias is not None:nn.init.zeros_(module.bias)elif isinstance(module, nn.Embedding):nn.init.normal_(module.weight, mean=0.0, std=self.d_model ** -0.5)if module.padding_idx is not None:nn.init.zeros_(module.weight[module.padding_idx])def forward(self,input_ids: torch.Tensor,attention_mask: Optional[torch.Tensor] = None,labels: Optional[torch.Tensor] = None # 用于计算CLM损失) -> Dict[str, torch.Tensor]:""":param input_ids: (batch_size, seq_len):param attention_mask: (batch_size, seq_len):param labels: 标签(与input_ids同形,用于自回归损失计算)"""batch_size, seq_len = input_ids.shapedevice = input_ids.device# 1. 生成位置索引(0~seq_len-1)position_ids = torch.arange(seq_len, device=device).unsqueeze(0).repeat(batch_size, 1)# 2. 输入嵌入(词嵌入+位置嵌入)token_emb = self.token_embedding(input_ids) # (batch_size, seq_len, d_model)pos_emb = self.position_embedding(position_ids) # (batch_size, seq_len, d_model)x = self.embedding_dropout(token_emb + pos_emb)# 3. 处理attention_mask(适配多头注意力输入格式)if attention_mask is not None:attention_mask = attention_mask.unsqueeze(1) # (batch_size, 1, seq_len)# 4. 解码器堆叠前向传播for decoder_layer in self.decoder_layers:x = decoder_layer(x, attention_mask)# 5. 输出层计算logitsx = self.norm_final(x)logits = self.output_layer(x) # (batch_size, seq_len, vocab_size)# 6. 计算CLM损失(预训练目标)loss = Noneif labels is not None:# 自回归损失:用第i个token预测第i+1个token,故偏移一位shift_logits = logits[:, :-1, :].contiguous() # (batch_size, seq_len-1, vocab_size)shift_labels = labels[:, 1:].contiguous() # (batch_size, seq_len-1)# 忽略pad_token的损失loss_fct = nn.CrossEntropyLoss(ignore_index=self.pad_token_id)loss = loss_fct(shift_logits.view(-1, shift_logits.shape[-1]), shift_labels.view(-1))return {"logits": logits, "loss": loss}@torch.no_grad()def generate(self,input_ids: torch.Tensor,max_gen_len: int = 100,top_k: int = 50, # Top-K采样temperature: float = 1.0 # 温度系数) -> List[int]:"""文本生成(自回归解码)"""self.eval()device = input_ids.deviceseq_len = input_ids.shape[1]# 生成循环(最多生成max_gen_len个token)for _ in range(max_gen_len):# 限制输入长度(避免超过位置嵌入范围)if seq_len >= self.position_embedding.num_embeddings:break# 前向传播获取logitsoutputs = self(input_ids=input_ids)logits = outputs["logits"][:, -1, :] # 取最后一个token的logits (1, vocab_size)# Top-K采样(过滤低概率token)if top_k > 0:top_k_values, top_k_indices = torch.topk(logits, top_k, dim=-1)logits = torch.full_like(logits, -1e9, device=device)logits.scatter_(-1, top_k_indices, top_k_values)# 温度缩放(降低随机性)if temperature != 1.0:logits = logits / temperature# 计算概率并采样probs = F.softmax(logits, dim=-1)next_token_id = torch.multinomial(probs, num_samples=1).squeeze(-1) # (1,)# 终止条件(生成</s>则停止)if next_token_id.item() == self.token_embedding.vocab["</s>"]:break# 拼接新tokeninput_ids = torch.cat([input_ids, next_token_id.unsqueeze(0)], dim=-1)seq_len += 1return input_ids.squeeze(0).tolist() # 转为列表返回
该代码实现了基于 Transformer 解码器的 GPT 模型核心类,是自回归语言模型(如 GPT 系列)的基础架构。它通过 “输入嵌入→多层解码器→输出层” 的流程,实现了文本的预训练(因果语言模型任务)和自回归生成功能。以下是其所有功能的详细拆解:
整体架构概述
GPTModel 是一个 ** 仅解码器(Decoder-only)** 的 Transformer 模型,核心设计遵循 GPT 系列的自回归范式:
- 输入:文本的 Token ID 序列(
input_ids)。 - 输出:每个位置的下一个 Token 的概率分布(
logits),用于预训练损失计算或生成文本。 - 核心能力:通过多层自注意力捕捉上下文依赖,实现 “基于前文预测后文” 的自回归任务。
组件功能详解(__init__ 方法)
在初始化阶段,模型构建了以下核心组件:
1. 输入嵌入模块
-
词嵌入(
token_embedding):- 类型:
nn.Embedding(vocab_size, d_model, padding_idx=pad_token_id) - 功能:将输入的 Token ID(如
[101, 2023, 3052])映射为维度为d_model的向量,是模型理解 “语义” 的基础。 - 细节:
padding_idx=pad_token_id表示填充 Token(如<pad>)的嵌入会被初始化为 0,且在损失计算时被忽略。
- 类型:
-
位置嵌入(
position_embedding):- 类型:
nn.Embedding(max_seq_len, d_model) - 功能:为每个序列位置(如第 1 个 Token、第 2 个 Token)分配一个可学习的向量,弥补 Transformer “无位置感知” 的缺陷,让模型理解序列的顺序信息。
- 细节:
max_seq_len限制了模型能处理的最大序列长度(超过则需截断)。
- 类型:
-
嵌入 Dropout(
embedding_dropout):- 类型:
nn.Dropout(dropout) - 功能:对词嵌入 + 位置嵌入的结果做 Dropout,防止过拟合。
- 类型:
2. 解码器层堆叠(decoder_layers)
- 类型:
nn.ModuleList([TransformerDecoderLayer(...)] * n_layers) - 功能:由
n_layers个TransformerDecoderLayer(来自transformer_components)组成,是模型的核心特征提取模块。每个层包含:- 多头自注意力:捕捉序列内的上下文依赖(如 “苹果” 在 “吃苹果” 和 “苹果公司” 中语义的差异)。
- 前馈网络:对每个位置的特征独立做非线性变换,增强表达能力。
- 残差连接与层归一化:稳定深层网络的训练,防止梯度消失或爆炸。
3. 输出层
-
最终层归一化(
norm_final):- 类型:
nn.LayerNorm(d_model, eps=1e-6) - 功能:对解码器输出的隐藏层向量做层归一化,确保分布稳定。
- 类型:
-
输出线性层(
output_layer):- 类型:
nn.Linear(d_model, vocab_size, bias=False) - 功能:将隐藏层向量(
d_model维)映射为词表大小的 Logits(未经过 Softmax 的概率),用于预测下一个 Token。 - 优化点:权重与
token_embedding共享,减少参数数量并提升训练效率(是 GPT 系列的经典优化策略)。
- 类型:
4. 参数初始化(_init_weights)
- 线性层(
nn.Linear):使用Xavier 均匀初始化,让权重分布更合理,利于梯度传播。 - 嵌入层(
nn.Embedding):使用正态分布初始化(均值 0,标准差1/√d_model),填充 Token 的嵌入初始化为 0,确保模型忽略这些位置的损失。
训练流程(forward 方法)
forward 方法处理模型的预训练阶段(因果语言模型任务,CLM),输入是 Token ID 序列,输出是 Logits 和损失。
输入参数
input_ids:形状(batch_size, seq_len),模型的输入 Token ID 序列。attention_mask:形状(batch_size, seq_len),Padding 掩码(0 表示填充,1 表示有效 Token)。labels:形状(batch_size, seq_len),自回归任务的标签(与input_ids同形,用于计算损失)。
流程步骤
-
位置索引生成:
position_ids = torch.arange(seq_len, device=device).unsqueeze(0).repeat(batch_size, 1),生成每个 Token 的位置索引(0 到 seq_len-1)。 -
输入嵌入计算:
- 词嵌入:
token_emb = self.token_embedding(input_ids)→ 将 Token ID 转为向量。 - 位置嵌入:
pos_emb = self.position_embedding(position_ids)→ 为每个位置添加向量。 - 嵌入融合:
x = self.embedding_dropout(token_emb + pos_emb)→ 词嵌入 + 位置嵌入后,经过 Dropout。
- 词嵌入:
-
注意力掩码处理:
attention_mask = attention_mask.unsqueeze(1)→ 调整为多头注意力需要的形状(batch_size, 1, seq_len)。 -
解码器层前向传播:依次通过每个
TransformerDecoderLayer,得到最终的隐藏层表示x。 -
输出 Logits 计算:
x = self.norm_final(x)→ 层归一化;logits = self.output_layer(x)→ 映射为词表大小的 Logits。 -
自回归损失计算:
- 因为是 “用第 i 个 Token 预测第 i+1 个 Token”,所以
shift_logits = logits[:, :-1, :](前 seq_len-1 个 Token 的 Logits),shift_labels = labels[:, 1:](后 seq_len-1 个 Token 的 ID)。 - 使用
CrossEntropyLoss,并设置ignore_index=self.pad_token_id,忽略填充 Token 的损失。
- 因为是 “用第 i 个 Token 预测第 i+1 个 Token”,所以
推理流程(generate 方法)
generate 方法实现自回归文本生成,基于输入的前缀(input_ids)逐步生成后续 Token。
输入参数
input_ids:形状(1, prefix_len),生成的前缀 Token ID(批量大小固定为 1,因为自回归是逐次生成)。max_gen_len:最大生成长度(防止无限生成)。top_k:Top-K 采样的候选数(过滤低概率 Token,提升生成稳定性)。temperature:温度系数(控制生成的随机性,值越小越 “确定”,越大越 “随机”)。
生成流程
-
初始化与循环设置:模型设为评估模式(
self.eval()),获取设备,记录当前序列长度seq_len。 -
逐 Token 生成循环:最多循环
max_gen_len次:- 长度限制:若
seq_len超过position_embedding.num_embeddings(位置嵌入的最大长度),停止生成。 - 前向传播:通过
forward方法获取当前序列的 Logits,取最后一个 Token 的 Logits(logits[:, -1, :])。 - Top-K 采样:若
top_k > 0,只保留 Top-K 个高概率 Token,其余设为极低值(-1e9),防止采样到无意义 Token。 - 温度缩放:
logits = logits / temperature,调整 Logits 的分布(温度越低,高概率 Token 的优势越明显)。 - 采样下一个 Token:对 Logits 做 Softmax 得到概率,通过
torch.multinomial采样下一个 Token ID。 - 终止条件:若采样到结束符
</s>(需确保该 Token 在词表中存在且映射正确),停止生成。 - 拼接新 Token:将采样的 Token ID 拼接到
input_ids后,更新seq_len。
- 长度限制:若
-
返回结果:将最终的
input_ids转为列表返回,即生成的完整 Token 序列。
核心设计亮点
- 权重共享:输出层与词嵌入层共享权重,减少参数并提升训练效率。
- 可学习位置嵌入:适应不同长度的序列,捕捉位置信息。
- 自回归生成:通过循环采样实现 “逐词生成”,是 GPT 类模型生成连贯文本的核心机制。
- Top-K 与温度控制:平衡生成的 “多样性” 与 “合理性”,可根据需求调整参数。
综上,该GPTModel类完整实现了 GPT 模型的 ** 训练(预训练 + 微调)和推理(自回归生成)** 流程,是构建大语言模型的核心组件。
6. data_loader.py
import torch
from torch.utils.data import Dataset, DataLoader
import json
import random
from typing import List, Dict, Tuple
from byte_level_bpe import ByteLevelBPETokenizerclass GPTPretrainDataset(Dataset):"""预训练数据集(文本续写任务,CLM目标)"""def __init__(self,file_path: str,tokenizer: ByteLevelBPETokenizer,max_seq_len: int = 2048,min_text_len: int = 100 # 过滤过短文本):self.tokenizer = tokenizerself.max_seq_len = max_seq_lenself.min_text_len = min_text_len# 加载并过滤文本with open(file_path, "r", encoding="utf-8") as f:self.texts = [line.strip() for line in f if len(line.strip()) >= min_text_len]print(f"加载预训练数据:{len(self.texts)}条样本,文件路径:{file_path}")def __len__(self) -> int:return len(self.texts)def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:text = self.texts[idx]# 随机截断(避免文本过长导致编码后超过max_seq_len)if len(text) > self.max_seq_len * 2: # 假设平均每个token对应2个字符start_idx = random.randint(0, len(text) - self.max_seq_len * 2)text = text[start_idx:start_idx + self.max_seq_len * 2]# 编码(包含特殊token:<s>开头,</s>结尾)encoding = self.tokenizer.encode(text=text,max_seq_len=self.max_seq_len,add_special_tokens=True)# 标签与输入一致(CLM任务:用前i个token预测i+1个)return {"input_ids": torch.tensor(encoding["input_ids"], dtype=torch.long),"attention_mask": torch.tensor(encoding["attention_mask"], dtype=torch.long),"labels": torch.tensor(encoding["input_ids"], dtype=torch.long) # 标签与输入相同}class GPTSFTDataset(Dataset):"""SFT微调数据集(指令-响应对任务)"""def __init__(self,file_path: str,tokenizer: ByteLevelBPETokenizer,max_seq_len: int = 2048,prompt_template: str = "### 指令:{instruction}\n### 响应:{response}"):self.tokenizer = tokenizerself.max_seq_len = max_seq_lenself.prompt_template = prompt_template# 加载SFT数据(格式:[{"instruction": "...", "response": "..."}])with open(file_path, "r", encoding="utf-8") as f:self.data = json.load(f)print(f"加载SFT数据:{len(self.data)}条样本,文件路径:{file_path}")def __len__(self) -> int:return len(self.data)def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:item = self.data[idx]instruction = item["instruction"].strip()response = item["response"].strip()# 格式化prompt(指令+响应)full_text = self.prompt_template.format(instruction=instruction, response=response)# 编码完整文本encoding = self.tokenizer.encode(text=full_text,max_seq_len=self.max_seq_len,add_special_tokens=True)input_ids = torch.tensor(encoding["input_ids"], dtype=torch.long)attention_mask = torch.tensor(encoding["attention_mask"], dtype=torch.long)# 构建标签:仅计算响应部分的损失(指令部分标签设为-100,被CrossEntropyLoss忽略)prompt_text = self.prompt_template.format(instruction=instruction, response="")prompt_encoding = self.tokenizer.encode(text=prompt_text,max_seq_len=self.max_seq_len,add_special_tokens=True)prompt_len = sum(prompt_encoding["attention_mask"]) # 指令部分长度labels = input_ids.clone()labels[:prompt_len] = -100 # 指令部分不参与损失计算return {"input_ids": input_ids,"attention_mask": attention_mask,"labels": labels}class GPTPPODataset(Dataset):"""RLHF-PPO数据集( pairwise偏好数据)"""def __init__(self,file_path: str,tokenizer: ByteLevelBPETokenizer,max_seq_len: int = 2048,prompt_template: str = "### 指令:{instruction}\n### 响应:"):self.tokenizer = tokenizerself.max_seq_len = max_seq_lenself.prompt_template = prompt_template# 加载pairwise数据(格式:{"instruction": "...", "chosen": "...", "rejected": "..."})with open(file_path, "r", encoding="utf-8") as f:self.data = json.load(f)print(f"加载PPO数据:{len(self.data)}条样本,文件路径:{file_path}")def __len__(self) -> int:return len(self.data)def __getitem__(self, idx: int) -> Dict[str, torch.Tensor]:item = self.data[idx]instruction = item["instruction"].strip()chosen_response = item["chosen"].strip()rejected_response = item["rejected"].strip()# 编码prompt(仅指令部分,用于生成响应)prompt_text = self.prompt_template.format(instruction=instruction)prompt_encoding = self.tokenizer.encode(text=prompt_text,max_seq_len=self.max_seq_len,add_special_tokens=True)prompt_ids = torch.tensor(prompt_encoding["input_ids"], dtype=torch.long)prompt_mask = torch.tensor(prompt_encoding["attention_mask"], dtype=torch.long)prompt_len = sum(prompt_encoding["attention_mask"])# 编码被偏好的响应和被拒绝的响应chosen_full = prompt_text + chosen_responsechosen_encoding = self.tokenizer.encode(text=chosen_full,max_seq_len=self.max_seq_len,add_special_tokens=True)chosen_ids = torch.tensor(chosen_encoding["input_ids"], dtype=torch.long)rejected_full = prompt_text + rejected_responserejected_encoding = self.tokenizer.encode(text=rejected_full,max_seq_len=self.max_seq_len,add_special_tokens=True)rejected_ids = torch.tensor(rejected_encoding["input_ids"], dtype=torch.long)return {"prompt_ids": prompt_ids,"prompt_mask": prompt_mask,"prompt_len": prompt_len,"chosen_ids": chosen_ids,"rejected_ids": rejected_ids}def create_pretrain_dataloader(file_path: str,tokenizer: ByteLevelBPETokenizer,batch_size: int = 8,max_seq_len: int = 2048,num_workers: int = 4
) -> DataLoader:"""创建预训练数据加载器"""dataset = GPTPretrainDataset(file_path=file_path,tokenizer=tokenizer,max_seq_len=max_seq_len)return DataLoader(dataset,batch_size=batch_size,shuffle=True,num_workers=num_workers,pin_memory=True,drop_last=True)def create_sft_dataloader(file_path: str,tokenizer: ByteLevelBPETokenizer,batch_size: int = 8,max_seq_len: int = 2048,num_workers: int = 4
) -> DataLoader:"""创建SFT微调数据加载器"""dataset = GPTSFTDataset(file_path=file_path,tokenizer=tokenizer,max_seq_len=max_seq_len)return DataLoader(dataset,batch_size=batch_size,shuffle=True,num_workers=num_workers,pin_memory=True,drop_last=True)
该代码实现了GPT 模型全流程训练的数据加载模块,为 ** 预训练(CLM)、监督微调(SFT)、人类反馈强化学习(RLHF-PPO)** 三个核心阶段提供标准化的数据处理能力。以下是各组件功能的详细拆解:
核心数据集类功能
1. GPTPretrainDataset(预训练数据集,CLM 任务)
用于因果语言模型(Causal Language Model)预训练,目标是让模型学习 “基于前文预测后文” 的自回归能力。
-
初始化(
__init__):- 输入:
file_path(文本文件路径,每行 1 条样本)、tokenizer(字节级 BPE 分词器)、max_seq_len(模型最大序列长度)、min_text_len(过滤过短文本)。 - 功能:加载文本文件,过滤长度不足的样本,初始化数据集。
- 输入:
-
数据处理逻辑(
__getitem__):- 随机截断:若文本长度超过
max_seq_len * 2(假设每个 token 平均对应 2 个字符),随机截取一段,避免编码后超出模型长度限制。 - 文本编码:调用分词器的
encode方法,将文本转换为input_ids(Token ID 序列)、attention_mask(填充掩码),并添加特殊 token(<s>开头、</s>结尾)。 - 标签构建:
labels = input_ids(因为 CLM 任务是 “用第 i 个 token 预测第 i+1 个 token”,所以标签与输入序列完全一致,训练时模型会自动偏移一位计算损失)。
- 随机截断:若文本长度超过
-
输出格式:
input_ids:形状(max_seq_len,),模型输入的 Token ID 序列。attention_mask:形状(max_seq_len,),0 表示填充(<pad>),1 表示有效 Token。labels:形状(max_seq_len,),与input_ids一致,用于计算自回归损失。
2. GPTSFTDataset(监督微调数据集,SFT 任务)
用于指令 - 响应对齐的监督微调,目标是让模型学习 “指令→响应” 的映射关系(如 “指令:解释什么是 AI → 响应:人工智能是……”)。
-
初始化(
__init__):- 输入:
file_path(SFT 数据文件路径,格式为[{"instruction": "...", "response": "..."}])、tokenizer、max_seq_len、prompt_template(指令 - 响应的格式化模板,如 “### 指令:{instruction}\n### 响应:{response}”)。 - 功能:加载指令 - 响应对数据,初始化数据集。
- 输入:
-
数据处理逻辑(
__getitem__):- 模板格式化:将 “指令” 和 “响应” 填入
prompt_template,生成完整的 prompt 文本(如 “### 指令:解释 AI\n### 响应:人工智能是……”)。 - 文本编码:将完整 prompt 编码为
input_ids和attention_mask。 - 标签构建(核心设计):
- 先编码 “仅指令” 的 prompt(如 “### 指令:解释 AI\n### 响应:”),得到其有效长度
prompt_len。 - 对
labels做修改:labels[:prompt_len] = -100(-100是CrossEntropyLoss的ignore_index,表示这些位置的损失会被忽略)。 - 最终
labels中,指令部分不参与损失计算,仅响应部分的损失会被优化—— 这是 SFT 任务的关键设计,确保模型只学习 “响应生成”,不修改 “指令理解” 的部分。
- 先编码 “仅指令” 的 prompt(如 “### 指令:解释 AI\n### 响应:”),得到其有效长度
- 模板格式化:将 “指令” 和 “响应” 填入
-
输出格式:
input_ids:形状(max_seq_len,),包含指令 + 响应的 Token ID 序列。attention_mask:形状(max_seq_len,),有效 Token 掩码。labels:形状(max_seq_len,),指令部分为-100,响应部分为真实 Token ID。
3. GPTPPODataset(RLHF-PPO 数据集,偏好对齐任务)
用于人类反馈强化学习的 PPO 阶段,处理 “偏好对” 数据(即 “更优响应” 和 “较差响应” 的对比数据),为奖励模型训练和 PPO 优化提供输入。
-
初始化(
__init__):- 输入:
file_path(PPO 数据文件路径,格式为[{"instruction": "...", "chosen": "...", "rejected": "..."}])、tokenizer、max_seq_len、prompt_template(仅包含指令部分的模板,如 “### 指令:{instruction}\n### 响应:”)。 - 功能:加载 “指令 + 更优响应 + 较差响应” 的三元组数据,初始化数据集。
- 输入:
-
数据处理逻辑(
__getitem__):- 编码 prompt:将 “指令” 填入模板,编码为
prompt_ids和prompt_mask,记录其长度prompt_len(用于后续响应生成的上下文)。 - 编码偏好对响应:分别将 “更优响应(chosen)” 和 “较差响应(rejected)” 拼接到 prompt 后,编码为
chosen_ids和rejected_ids。 - 作用:这些数据将用于奖励模型训练(学习 “更优响应” 的打分逻辑)和PPO 优化(让策略模型生成更符合人类偏好的响应)。
- 编码 prompt:将 “指令” 填入模板,编码为
-
输出格式:
prompt_ids/prompt_mask/prompt_len:指令部分的 Token ID、掩码和长度。chosen_ids/rejected_ids:更优响应和较差响应的完整 Token ID 序列。
数据加载器函数功能
1. create_pretrain_dataloader
- 输入:
file_path、tokenizer、batch_size、max_seq_len、num_workers(数据加载的线程数)。 - 功能:将
GPTPretrainDataset封装为DataLoader,支持批量加载、数据打乱、多线程加速等功能,直接用于预训练阶段的模型训练。
2. create_sft_dataloader
- 输入:与
create_pretrain_dataloader类似,针对GPTSFTDataset。 - 功能:为 SFT 微调阶段提供批量数据加载能力,支持多线程和批量处理。
模块整体作用
该模块是 GPT 模型全流程训练的 “数据中枢”:
- 预训练阶段:通过
GPTPretrainDataset提供 “自回归续写” 的训练数据,让模型学习通用语言建模能力。 - SFT 阶段:通过
GPTSFTDataset提供 “指令→响应” 的对齐数据,让模型学习特定任务的指令跟随能力。 - RLHF 阶段:通过
GPTPPODataset提供 “偏好对” 数据,让模型学习人类偏好的对齐能力。
每个数据集的标签设计、编码逻辑都严格匹配对应阶段的任务目标,确保模型在不同训练阶段能高效学习到所需能力。
7. pretrain_trainer.py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.optim.lr_scheduler import LambdaLR
from torch.cuda.amp import GradScaler, autocast
import os
import time
import logging
from tqdm import tqdm
from typing import Dict, Tuple, Optional
from gpt_model import GPTModel
from byte_level_bpe import ByteLevelBPETokenizer
from data_loader import create_pretrain_dataloader# 配置日志
logging.basicConfig(level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s",handlers=[logging.FileHandler("pretrain.log"), logging.StreamHandler()]
)class GPTTrainer:def __init__(self,model: GPTModel,train_loader: DataLoader,val_loader: DataLoader,device: torch.device,output_dir: str = "models/pretrain",lr: float = 2e-4,weight_decay: float = 0.1,betas: Tuple[float, float] = (0.9, 0.95),max_grad_norm: float = 1.0,num_epochs: int = 10,warmup_steps: int = 1000,log_interval: int = 100,save_interval: int = 1000):self.model = model.to(device)self.train_loader = train_loaderself.val_loader = val_loaderself.device = deviceself.output_dir = output_diros.makedirs(output_dir, exist_ok=True)# 优化器(AdamW,带权重衰减)self.optimizer = optim.AdamW(params=self.model.parameters(),lr=lr,weight_decay=weight_decay,betas=betas)# 学习率调度器(余弦退火+预热)total_steps = num_epochs * len(train_loader)self.scheduler = self._create_scheduler(total_steps, warmup_steps)self.max_grad_norm = max_grad_normself.num_epochs = num_epochsself.log_interval = log_intervalself.save_interval = save_interval# 混合精度训练self.scaler = GradScaler()# 记录最佳验证损失self.best_val_loss = float("inf")def _create_scheduler(self, total_steps: int, warmup_steps: int) -> LambdaLR:"""创建学习率调度器:先线性预热,再余弦衰减"""def lr_lambda(step: int) -> float:# 预热阶段if step < warmup_steps:return step / warmup_steps# 余弦衰减阶段progress = (step - warmup_steps) / (total_steps - warmup_steps)return 0.5 * (1.0 + torch.cos(torch.tensor(progress * torch.pi))).item()return LambdaLR(self.optimizer, lr_lambda=lr_lambda)def _train_step(self, batch: Dict[str, torch.Tensor]) -> float:"""单步训练"""self.model.train()self.optimizer.zero_grad()# 数据移至设备input_ids = batch["input_ids"].to(self.device)attention_mask = batch["attention_mask"].to(self.device)labels = batch["labels"].to(self.device)# 混合精度前向传播with autocast():outputs = self.model(input_ids=input_ids,attention_mask=attention_mask,labels=labels)loss = outputs["loss"]# 反向传播+梯度裁剪self.scaler.scale(loss).backward()self.scaler.unscale_(self.optimizer)nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm)# 参数更新self.scaler.step(self.optimizer)self.scaler.update()self.scheduler.step()return loss.item()def _val_step(self, batch: Dict[str, torch.Tensor]) -> float:"""单步验证"""self.model.eval()with torch.no_grad():input_ids = batch["input_ids"].to(self.device)attention_mask = batch["attention_mask"].to(self.device)labels = batch["labels"].to(self.device)outputs = self.model(input_ids=input_ids,attention_mask=attention_mask,labels=labels)return outputs["loss"].item()def save_model(self, step: int, is_best: bool = False) -> None:"""保存模型权重"""save_path = os.path.join(self.output_dir, f"pretrain_step_{step}.pth")torch.save({"model_state_dict": self.model.state_dict(),"optimizer_state_dict": self.optimizer.state_dict(),"scheduler_state_dict": self.scheduler.state_dict(),"step": step}, save_path)logging.info(f"模型保存至:{save_path}")# 保存最佳模型if is_best:best_path = os.path.join(self.output_dir, "pretrain_best.pth")torch.save({"model_state_dict": self.model.state_dict()}, best_path)logging.info(f"最佳模型保存至:{best_path}")def load_checkpoint(self, checkpoint_path: str) -> int:"""加载 checkpoint 续训"""checkpoint = torch.load(checkpoint_path, map_location=self.device)self.model.load_state_dict(checkpoint["model_state_dict"])self.optimizer.load_state_dict(checkpoint["optimizer_state_dict"])self.scheduler.load_state_dict(checkpoint["scheduler_state_dict"])start_step = checkpoint["step"]logging.info(f"从 checkpoint 加载成功:{checkpoint_path},起始步数:{start_step}")return start_stepdef train(self, resume_from: Optional[str] = None) -> None:"""主训练循环"""start_epoch = 0start_step = 0# 加载 checkpointif resume_from is not None and os.path.exists(resume_from):start_step = self.load_checkpoint(resume_from)start_epoch = start_step // len(self.train_loader)total_steps = self.num_epochs * len(self.train_loader)global_step = start_steplogging.info(f"开始训练:总 epoch={self.num_epochs},总步数={total_steps},起始步数={start_step}")for epoch in range(start_epoch, self.num_epochs):epoch_start_time = time.time()train_losses = []# 训练阶段train_pbar = tqdm(self.train_loader, desc=f"Epoch {epoch + 1}/{self.num_epochs}")for batch in train_pbar:loss = self._train_step(batch)train_losses.append(loss)global_step += 1# 日志输出if global_step % self.log_interval == 0:avg_loss = sum(train_losses[-self.log_interval:]) / self.log_intervallr = self.optimizer.param_groups[0]["lr"]logging.info(f"Step {global_step}/{total_steps} | "f"Train Loss: {avg_loss:.4f} | "f"LR: {lr:.6f}")train_pbar.set_postfix({"loss": f"{avg_loss:.4f}"})# 保存 checkpointif global_step % self.save_interval == 0:self.save_model(global_step, is_best=False)# 验证阶段val_losses = []val_pbar = tqdm(self.val_loader, desc="Validation")for batch in val_pbar:val_loss = self._val_step(batch)val_losses.append(val_loss)val_pbar.set_postfix({"val_loss": f"{val_loss:.4f}"})avg_val_loss = sum(val_losses) / len(val_losses)epoch_time = (time.time() - epoch_start_time) / 60 # 分钟logging.info(f"Epoch {epoch + 1} 完成 | "f"Avg Train Loss: {sum(train_losses) / len(train_losses):.4f} | "f"Avg Val Loss: {avg_val_loss:.4f} | "f"耗时: {epoch_time:.2f}分钟")# 保存最佳模型if avg_val_loss < self.best_val_loss:self.best_val_loss = avg_val_lossself.save_model(global_step, is_best=True)logging.info("训练完成!")# 预训练启动入口
def main():# 配置device = torch.device("cuda" if torch.cuda.is_available() else "cpu")vocab_size = 50000d_model = 768n_layers = 12n_heads = 12d_ff = 3072max_seq_len = 2048batch_size = 8num_epochs = 10train_data_path = "data/pretrain/train.txt"val_data_path = "data/pretrain/val.txt"tokenizer_dir = "models/bpe_tokenizer"output_dir = "models/pretrain"# 加载分词器tokenizer = ByteLevelBPETokenizer.from_pretrained(tokenizer_dir)# 创建数据加载器train_loader = create_pretrain_dataloader(file_path=train_data_path,tokenizer=tokenizer,batch_size=batch_size,max_seq_len=max_seq_len)val_loader = create_pretrain_dataloader(file_path=val_data_path,tokenizer=tokenizer,batch_size=batch_size,max_seq_len=max_seq_len)# 初始化模型model = GPTModel(vocab_size=vocab_size,d_model=d_model,n_layers=n_layers,n_heads=n_heads,d_ff=d_ff,max_seq_len=max_seq_len)# 初始化训练器并启动训练trainer = GPTTrainer(model=model,train_loader=train_loader,val_loader=val_loader,device=device,output_dir=output_dir,num_epochs=num_epochs)trainer.train()if __name__ == "__main__":main()
该代码实现了GPT 模型预训练阶段的完整训练管理模块,核心是通过GPTTrainer类封装训练全流程(优化器配置、学习率调度、混合精度训练、模型保存 / 续训、训练 / 验证监控),并通过main函数提供可直接执行的预训练入口。以下是各组件功能的详细拆解:
整体功能概述
代码的核心目标是:将 GPT 模型(GPTModel)与预训练数据(GPTPretrainDataset)对接,通过标准化的训练流程实现因果语言模型(CLM)预训练,同时支持训练监控、模型保存、断点续训等工程化能力,确保训练稳定、高效且可复现。
核心组件详解(GPTTrainer类)
GPTTrainer是训练管理的核心,涵盖从 “参数初始化” 到 “训练结束” 的全流程控制,以下按方法拆解:
1. 初始化(__init__方法)
作用:配置训练所需的所有核心组件,为训练做准备。输入参数(关键):
model:待训练的GPTModel实例。train_loader/val_loader:训练 / 验证数据加载器(来自data_loader模块)。device:训练设备(CPU/GPU,优先用 GPU)。output_dir:模型和日志的保存目录。- 优化相关参数:
lr(初始学习率)、weight_decay(权重衰减)、betas(AdamW 的动量参数)。 - 训练控制参数:
num_epochs(总轮次)、warmup_steps(学习率预热步数)、log_interval(日志输出间隔)、save_interval(模型保存间隔)。
核心组件初始化逻辑:
- 输出目录创建:
os.makedirs(output_dir, exist_ok=True),确保保存路径存在,避免报错。 - 优化器(AdamW):
- 选择
AdamW优化器(Transformer 模型的主流选择),支持weight_decay(L2 正则化变种,防止过拟合,对 Transformer 的权重衰减更友好)。 - 参数:
betas=(0.9, 0.95)(动量参数,平衡历史梯度与当前梯度)、lr=2e-4(预训练常用初始学习率)。
- 选择
- 学习率调度器:调用
_create_scheduler生成 “预热 + 余弦衰减” 调度器(动态调整学习率,提升收敛稳定性)。 - 混合精度训练(GradScaler):
- 初始化
GradScaler,支持 FP16 混合精度训练(相比 FP32,训练速度提升 2-3 倍,显存占用减少 50%+),同时避免 FP16 梯度下溢问题。
- 初始化
- 最佳验证损失记录:
self.best_val_loss = float("inf"),用于后续判断 “最佳模型”(基于验证损失)。
2. 学习率调度器(_create_scheduler方法)
作用:生成动态学习率策略,解决 “固定学习率难以兼顾训练初期收敛与后期稳定” 的问题,采用 **“线性预热 + 余弦衰减”** 组合策略(GPT 系列预训练的标准配置)。
调度逻辑:
- 阶段 1:线性预热(前
warmup_steps步):学习率从 0 线性增长到初始lr(公式:lr = step / warmup_steps * initial_lr)。目的:避免训练初期高学习率导致模型参数震荡、不收敛(模型需要 “适应期”)。 - 阶段 2:余弦衰减(预热后至总步数):学习率从
initial_lr按余弦曲线缓慢衰减至 0(公式:lr = 0.5 * initial_lr * (1 + cos(progress * π)),其中progress是 “预热后步数 / 总预热后步数”)。目的:后期降低学习率,让模型参数稳定收敛,避免过拟合。
输出:LambdaLR实例(PyTorch 的学习率调度器类),绑定到优化器动态调整学习率。
3. 单步训练(_train_step方法)
作用:处理单个 batch 数据的训练逻辑(前向传播→损失计算→反向传播→参数更新),是训练的 “最小单元”。
流程步骤:
- 模型设为训练模式:
self.model.train()(启用 Dropout、BatchNorm 等训练特有的层)。 - 梯度清零:
self.optimizer.zero_grad()(避免上一个 batch 的梯度残留影响当前更新)。 - 数据移至设备:将
input_ids、attention_mask、labels从 CPU 移至device(GPU/CPU),确保与模型在同一设备。 - 混合精度前向传播:
- 用
with autocast():上下文启用 FP16 混合精度,模型前向传播计算loss(CLM 任务的自回归损失,由GPTModel.forward返回)。 - 混合精度的优势:减少显存占用、提升计算速度,
GradScaler会自动缩放损失以避免 FP16 梯度下溢。
- 用
- 反向传播与梯度处理:
self.scaler.scale(loss).backward():缩放损失后反向传播,计算梯度。self.scaler.unscale_(self.optimizer):恢复梯度缩放(避免优化器更新时受缩放影响)。nn.utils.clip_grad_norm_(self.model.parameters(), self.max_grad_norm):梯度裁剪(将梯度 norms 限制在max_grad_norm内,防止梯度爆炸,Transformer 模型的关键稳定手段)。
- 参数更新与学习率调度:
self.scaler.step(self.optimizer):根据缩放后的梯度更新模型参数(仅当梯度无异常时更新)。self.scaler.update():动态调整缩放因子(适配下一个 batch)。self.scheduler.step():更新学习率(按 “预热 + 余弦衰减” 策略)。
输出:当前 batch 的训练损失值(用于后续平均损失计算)。
4. 单步验证(_val_step方法)
作用:处理单个 batch 的验证逻辑,计算验证损失(仅前向传播,不更新参数),用于评估模型泛化能力。
核心逻辑:
- 模型设为评估模式:
self.model.eval()(禁用 Dropout、固定 BatchNorm 参数,确保验证结果稳定)。 - 关闭梯度计算:
with torch.no_grad():(避免验证阶段占用显存、浪费计算资源,且防止参数被意外修改)。 - 前向传播计算损失:与训练步骤的前向传播逻辑一致,但仅返回损失值,不做反向传播。
输出:当前 batch 的验证损失值。
5. 模型保存(save_model方法)
作用:保存训练过程中的模型权重,支持 “普通检查点(Checkpoint)” 和 “最佳模型” 两种类型,避免训练中断丢失进度。
保存逻辑:
- 普通 Checkpoint:
- 保存内容:
model_state_dict(模型权重)、optimizer_state_dict(优化器状态,用于续训)、scheduler_state_dict(调度器状态)、step(当前训练步数)。 - 路径:
output_dir/pretrain_step_{step}.pth(按步数命名,方便追溯)。
- 保存内容:
- 最佳模型:
- 仅在验证损失达到当前最低时保存,仅保存
model_state_dict(体积更小,用于后续推理或微调)。 - 路径:
output_dir/pretrain_best.pth(固定命名,方便后续调用)。
- 仅在验证损失达到当前最低时保存,仅保存
日志记录:通过logging输出保存路径,方便用户跟踪模型文件。
6. 断点续训(load_checkpoint方法)
作用:加载之前保存的 Checkpoint,恢复模型权重、优化器状态、调度器状态,从中断的步数继续训练(解决长周期训练中断的痛点)。
加载逻辑:
- 读取 Checkpoint 文件,将
model_state_dict加载到模型,optimizer_state_dict和scheduler_state_dict分别加载到优化器和调度器。 - 返回
step(中断时的训练步数),用于后续计算起始轮次(start_epoch = step // len(train_loader))。
日志记录:输出加载成功的 Checkpoint 路径和起始步数,确保用户明确续训状态。
7. 主训练循环(train方法)
作用:串联 “训练→验证→日志→保存” 全流程,是GPTTrainer的核心执行逻辑。
流程步骤:
- 续训初始化:
- 若传入
resume_from(Checkpoint 路径),调用load_checkpoint恢复训练状态,计算start_epoch(起始轮次)和start_step(起始步数)。
- 若传入
- 总步数计算:
total_steps = num_epochs * len(train_loader)(用于学习率调度和进度监控)。 - Epoch 循环(按轮次训练):
- 训练阶段:
- 用
tqdm显示训练进度条,遍历train_loader获取每个 batch,调用_train_step计算损失。 - 日志输出:每
log_interval步,计算最近log_interval个 batch 的平均损失,输出 “步数 / 总步数、训练损失、当前学习率”(方便实时监控训练趋势)。 - 模型保存:每
save_interval步,调用save_model保存普通 Checkpoint。
- 用
- 验证阶段:
- 遍历
val_loader,调用_val_step计算所有验证 batch 的损失,求平均得到avg_val_loss。 - 最佳模型判断:若
avg_val_loss < self.best_val_loss,更新最佳损失并保存最佳模型。
- 遍历
- Epoch 总结日志:输出 “轮次、平均训练损失、平均验证损失、耗时”,方便评估每轮训练效果。
- 训练阶段:
- 训练结束:输出 “训练完成” 日志,标志预训练流程结束。
预训练入口(main函数)
作用:提供可直接执行的预训练配置与启动逻辑,将 “数据加载→模型初始化→训练器启动” 串联成完整流程。
流程步骤:
- 设备配置:优先使用 GPU(
torch.device("cuda" if torch.cuda.is_available() else "cpu")),Transformer 模型训练依赖 GPU 算力。 - 超参数配置:
- 模型参数:
vocab_size=50000(对应 ByteLevelBPETokenizer 的词表大小)、d_model=768(模型隐藏层维度,GPT-2 基础版配置)、n_layers=12(解码器层数)、n_heads=12(注意力头数)、d_ff=3072(前馈网络中间层维度,通常为4*d_model)。 - 训练参数:
batch_size=8(批量大小,根据 GPU 显存调整)、num_epochs=10(总轮次)、max_seq_len=2048(模型最大处理序列长度)。
- 模型参数:
- 数据与分词器加载:
- 加载预训练分词器(
ByteLevelBPETokenizer.from_pretrained(tokenizer_dir))。 - 创建训练 / 验证数据加载器(
create_pretrain_dataloader,来自data_loader模块)。
- 加载预训练分词器(
- 模型与训练器初始化:
- 初始化
GPTModel(传入模型超参数)。 - 初始化
GPTTrainer(传入模型、数据加载器、设备、输出目录等)。
- 初始化
- 启动训练:调用
trainer.train(),开始预训练流程。
核心设计亮点
-
工程化适配性强:
- 支持断点续训(加载 Checkpoint 恢复所有状态),解决长周期训练中断问题。
- 日志同时输出到文件(
pretrain.log)和控制台,方便后续分析与实时监控。 - 模型分 “普通 Checkpoint” 和 “最佳模型” 保存,兼顾续训与推理需求。
-
训练稳定性优化:
- 采用 “线性预热 + 余弦衰减” 学习率策略,避免初始高学习率震荡和后期收敛缓慢。
- 梯度裁剪防止 Transformer 模型常见的梯度爆炸问题。
- 混合精度训练(FP16)提升训练速度、降低显存占用(相比 FP32 显存占用减少 50%+)。
-
易用性高:
- 通过
main函数封装所有配置,用户只需调整超参数即可启动训练,无需修改核心逻辑。 tqdm进度条直观显示训练 / 验证进度,关键指标(损失、学习率)定期输出,便于调参。
- 通过
总结
该代码是 GPT 预训练的 “工程化核心”,通过GPTTrainer类将零散的训练组件(优化器、调度器、混合精度、模型保存)封装成标准化流程,同时通过main函数提供低门槛的启动入口。整体逻辑围绕 “稳定、高效、可复现” 设计,可直接用于实际 GPT 模型的预训练,也可基于此扩展微调或其他任务的训练逻辑。
8. finetune.py
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torch.cuda.amp import GradScaler, autocast
import os
import logging
from tqdm import tqdm
from typing import Dict, Tuple, Optional
from gpt_model import GPTModel
from byte_level_bpe import ByteLevelBPETokenizer
from data_loader import create_sft_dataloader, GPTPPODatasetlogging.basicConfig(level=logging.INFO,format="%(asctime)s - %(levelname)s - %(message)s",handlers=[logging.FileHandler("finetune.log"), logging.StreamHandler()]
)def sft_finetune(pretrained_model_path: str,sft_data_path: str,tokenizer: ByteLevelBPETokenizer,output_dir: str = "models/sft",d_model: int = 768,n_layers: int = 12,n_heads: int = 12,d_ff: int = 3072,max_seq_len: int = 2048,batch_size: int = 4,lr: float = 3e-5,num_epochs: int = 3,device: Optional[torch.device] = None
) -> None:"""SFT(监督微调)实现"""device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")os.makedirs(output_dir, exist_ok=True)# 1. 加载模型并加载预训练权重model = GPTModel(vocab_size=len(tokenizer.vocab),d_model=d_model,n_layers=n_layers,n_heads=n_heads,d_ff=d_ff,max_seq_len=max_seq_len).to(device)pretrained_state = torch.load(pretrained_model_path, map_location=device)["model_state_dict"]model.load_state_dict(pretrained_state)logging.info(f"加载预训练模型:{pretrained_model_path}")# 2. 创建数据加载器train_loader = create_sft_dataloader(file_path=sft_data_path,tokenizer=tokenizer,batch_size=batch_size,max_seq_len=max_seq_len)# 3. 优化器与调度器optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=num_epochs * len(train_loader))scaler = GradScaler()best_loss = float("inf")# 4. 训练循环for epoch in range(num_epochs):model.train()epoch_losses = []pbar = tqdm(train_loader, desc=f"SFT Epoch {epoch + 1}/{num_epochs}")for batch in pbar:optimizer.zero_grad()input_ids = batch["input_ids"].to(device)attention_mask = batch["attention_mask"].to(device)labels = batch["labels"].to(device) # 已处理:指令部分为-100with autocast():outputs = model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)loss = outputs["loss"]scaler.scale(loss).backward()scaler.step(optimizer)scaler.update()scheduler.step()epoch_losses.append(loss.item())pbar.set_postfix({"loss": f"{loss.item():.4f}"})avg_loss = sum(epoch_losses) / len(epoch_losses)logging.info(f"SFT Epoch {epoch + 1} | Avg Loss: {avg_loss:.4f}")# 保存最佳模型if avg_loss < best_loss:best_loss = avg_losssave_path = os.path.join(output_dir, "sft_best.pth")torch.save({"model_state_dict": model.state_dict()}, save_path)logging.info(f"最佳SFT模型保存至:{save_path}")logging.info("SFT微调完成!")class RewardModel(nn.Module):"""RLHF奖励模型(对生成文本打分)"""def __init__(self, base_model: GPTModel, hidden_size: int = 256):super().__init__()self.base_model = base_modelself.reward_head = nn.Sequential(nn.Linear(base_model.d_model, hidden_size),nn.Tanh(),nn.Linear(hidden_size, 1) # 输出标量奖励分)def forward(self,input_ids: torch.Tensor,attention_mask: torch.Tensor,prompt_len: int # 指令部分长度(仅对响应部分打分)) -> torch.Tensor:""":return: 奖励分 (batch_size,)"""batch_size = input_ids.shape[0]outputs = self.base_model(input_ids=input_ids, attention_mask=attention_mask)last_hidden_state = outputs["logits"] # (batch_size, seq_len, vocab_size) → 复用logits的隐藏状态# 仅取响应部分的最后一个token的隐藏状态作为特征response_last_idx = attention_mask.sum(dim=1) - 1 # 每个样本响应的最后位置response_features = last_hidden_state[torch.arange(batch_size), response_last_idx] # (batch_size, d_model)rewards = self.reward_head(response_features).squeeze(-1) # (batch_size,)return rewardsdef train_reward_model(sft_model_path: str,pairwise_data_path: str,tokenizer: ByteLevelBPETokenizer,output_dir: str = "models/reward",d_model: int = 768,n_layers: int = 12,n_heads: int = 12,d_ff: int = 3072,max_seq_len: int = 2048,batch_size: int = 4,lr: float = 1e-5,num_epochs: int = 3,device: Optional[torch.device] = None
) -> None:"""训练奖励模型(RLHF第一步)"""device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")os.makedirs(output_dir, exist_ok=True)# 1. 加载SFT模型作为基础base_model = GPTModel(vocab_size=len(tokenizer.vocab),d_model=d_model,n_layers=n_layers,n_heads=n_heads,d_ff=d_ff,max_seq_len=max_seq_len).to(device)sft_state = torch.load(sft_model_path, map_location=device)["model_state_dict"]base_model.load_state_dict(sft_state)# 2. 初始化奖励模型reward_model = RewardModel(base_model).to(device)# 3. 加载pairwise数据dataset = GPTPPODataset(file_path=pairwise_data_path,tokenizer=tokenizer,max_seq_len=max_seq_len)dataloader = DataLoader(dataset,batch_size=batch_size,shuffle=True,num_workers=4,pin_memory=True)# 4. 训练配置optimizer = optim.AdamW(reward_model.parameters(), lr=lr, weight_decay=0.01)criterion = nn.BCEWithLogitsLoss() # 二分类损失(判断chosen是否优于rejected)best_loss = float("inf")# 5. 训练循环for epoch in range(num_epochs):reward_model.train()epoch_losses = []pbar = tqdm(dataloader, desc=f"Reward Model Epoch {epoch + 1}/{num_epochs}")for batch in pbar:optimizer.zero_grad()chosen_ids = batch["chosen_ids"].to(device)rejected_ids = batch["rejected_ids"].to(device)chosen_mask = (chosen_ids != tokenizer.vocab["<pad>"]).long()rejected_mask = (rejected_ids != tokenizer.vocab["<pad>"]).long()prompt_len = batch["prompt_len"].to(device)# 计算奖励分chosen_rewards = reward_model(chosen_ids, chosen_mask, prompt_len)rejected_rewards = reward_model(rejected_ids, rejected_mask, prompt_len)# 损失:希望chosen奖励 > rejected奖励(标签为1)logits = chosen_rewards - rejected_rewards # (batch_size,)labels = torch.ones_like(logits, device=device) # 标签:chosen应更优loss = criterion(logits, labels)loss.backward()optimizer.step()epoch_losses.append(loss.item())pbar.set_postfix({"loss": f"{loss.item():.4f}"})avg_loss = sum(epoch_losses) / len(epoch_losses)logging.info(f"Reward Model Epoch {epoch + 1} | Avg Loss: {avg_loss:.4f}")if avg_loss < best_loss:best_loss = avg_losssave_path = os.path.join(output_dir, "reward_best.pth")torch.save({"model_state_dict": reward_model.state_dict()}, save_path)logging.info(f"最佳奖励模型保存至:{save_path}")logging.info("奖励模型训练完成!")def rlhf_ppo(sft_model_path: str,reward_model_path: str,pairwise_data_path: str,tokenizer: ByteLevelBPETokenizer,output_dir: str = "models/rlhf",d_model: int = 768,n_layers: int = 12,n_heads: int = 12,d_ff: int = 3072,max_seq_len: int = 2048,batch_size: int = 2,ppo_epochs: int = 5,clip_epsilon: float = 0.2, # PPO剪辑系数gamma: float = 0.99, # 奖励折扣因子lam: float = 0.95, # GAE系数device: Optional[torch.device] = None
) -> None:"""PPO优化(RLHF第二步)"""device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")os.makedirs(output_dir, exist_ok=True)# 1. 初始化策略模型(待优化)和参考模型(固定)policy_model = GPTModel(vocab_size=len(tokenizer.vocab),d_model=d_model,n_layers=n_layers,n_heads=n_heads,d_ff=d_ff,max_seq_len=max_seq_len).to(device)ref_model = GPTModel( # 参考模型(固定参数,用于计算优势)vocab_size=len(tokenizer.vocab),d_model=d_model,n_layers=n_layers,n_heads=n_heads,d_ff=d_ff,max_seq_len=max_seq_len).to(device)# 加载SFT权重初始化sft_state = torch.load(sft_model_path, map_location=device)["model_state_dict"]policy_model.load_state_dict(sft_state)ref_model.load_state_dict(sft_state)ref_model.eval() # 参考模型不更新# 2. 加载奖励模型reward_model = RewardModel(ref_model).to(device) # 复用参考模型的基础结构reward_state = torch.load(reward_model_path, map_location=device)["model_state_dict"]reward_model.load_state_dict(reward_state)reward_model.eval()# 3. 数据加载dataset = GPTPPODataset(file_path=pairwise_data_path,tokenizer=tokenizer,max_seq_len=max_seq_len)dataloader = DataLoader(dataset,batch_size=batch_size,shuffle=True,num_workers=4,pin_memory=True)# 4. 优化器optimizer = optim.AdamW(policy_model.parameters(), lr=3e-6, weight_decay=0.01)# 5. PPO循环for ppo_epoch in range(ppo_epochs):logging.info(f"PPO Epoch {ppo_epoch + 1}/{ppo_epochs}")total_ppo_loss = 0.0for batch in tqdm(dataloader, desc="PPO Steps"):prompt_ids = batch["prompt_ids"].to(device)prompt_mask = batch["prompt_mask"].to(device)prompt_len = batch["prompt_len"].to(device)batch_size = prompt_ids.shape[0]# a. 策略模型生成响应policy_model.eval()with torch.no_grad():# 从prompt开始生成响应(最大生成50token)generated_ids = policy_model.generate(input_ids=prompt_ids,max_gen_len=50,top_k=50,temperature=0.7)# 拼接prompt和生成的响应full_ids = torch.tensor(generated_ids, device=device).unsqueeze(0) # (1, full_seq_len)full_mask = (full_ids != tokenizer.vocab["<pad>"]).long()# b. 计算策略分布和参考分布policy_model.train()policy_logits = policy_model(input_ids=full_ids)["logits"] # (1, seq_len, vocab_size)with torch.no_grad():ref_logits = ref_model(input_ids=full_ids)["logits"] # 参考模型分布# 提取响应部分的logits(仅响应部分参与策略更新)response_logits = policy_logits[:, prompt_len[0]:-1, :] # (1, response_len-1, vocab_size)response_ids = full_ids[:, prompt_len[0] + 1:] # (1, response_len-1)ref_response_logits = ref_logits[:, prompt_len[0]:-1, :]# 计算策略概率和参考概率policy_probs = torch.gather(F.softmax(response_logits, dim=-1), -1, response_ids.unsqueeze(-1)).squeeze(-1)ref_probs = torch.gather(F.softmax(ref_response_logits, dim=-1), -1, response_ids.unsqueeze(-1)).squeeze(-1)ratio = policy_probs / (ref_probs + 1e-8) # 重要性权重# c. 计算奖励和优势with torch.no_grad():rewards = reward_model(full_ids, full_mask, prompt_len) # (1,)# 简化:使用单步奖励作为优势(实际应使用GAE)advantages = rewards.repeat(response_ids.shape[1]) # (response_len-1,)# d. PPO损失(剪辑版)surr1 = ratio * advantagessurr2 = torch.clamp(ratio, 1 - clip_epsilon, 1 + clip_epsilon) * advantagesppo_loss = -torch.min(surr1, surr2).mean() # 负号:最大化奖励# e. 优化策略模型optimizer.zero_grad()ppo_loss.backward()optimizer.step()total_ppo_loss += ppo_loss.item()avg_ppo_loss = total_ppo_loss / len(dataloader)logging.info(f"PPO Epoch {ppo_epoch + 1} | Avg PPO Loss: {avg_ppo_loss:.4f}")# 保存模型save_path = os.path.join(output_dir, f"ppo_epoch_{ppo_epoch + 1}.pth")torch.save({"model_state_dict": policy_model.state_dict()}, save_path)logging.info(f"PPO模型保存至:{save_path}")# 保存最终最佳模型final_path = os.path.join(output_dir, "ppo_best.pth")torch.save({"model_state_dict": policy_model.state_dict()}, final_path)logging.info(f"最终PPO模型保存至:{final_path}")logging.info("RLHF-PPO优化完成!")# 微调入口(支持SFT和RLHF)
def main():import argparseparser = argparse.ArgumentParser()parser.add_argument("--mode", type=str, required=True, choices=["sft", "rlhf"])parser.add_argument("--data_path", type=str, required=True)parser.add_argument("--pretrained_model", type=str, help="预训练模型路径(SFT用)")parser.add_argument("--sft_model", type=str, help="SFT模型路径(RLHF用)")parser.add_argument("--output_dir", type=str, default="models/finetune")args = parser.parse_args()# 加载分词器tokenizer = ByteLevelBPETokenizer.from_pretrained("models/bpe_tokenizer")if args.mode == "sft":sft_finetune(pretrained_model_path=args.pretrained_model,sft_data_path=args.data_path,tokenizer=tokenizer,output_dir=args.output_dir)elif args.mode == "rlhf":# 先训练奖励模型,再进行PPOreward_dir = os.path.join(args.output_dir, "reward")train_reward_model(sft_model_path=args.sft_model,pairwise_data_path=args.data_path,tokenizer=tokenizer,output_dir=reward_dir)rlhf_ppo(sft_model_path=args.sft_model,reward_model_path=os.path.join(reward_dir, "reward_best.pth"),pairwise_data_path=args.data_path,tokenizer=tokenizer,output_dir=args.output_dir)if __name__ == "__main__":main()
该代码实现了 GPT 模型微调阶段的完整流程,涵盖监督微调(SFT) 和人类反馈强化学习(RLHF) 两大核心任务,通过 “指令对齐→偏好学习→策略优化” 的链路,让模型输出更符合人类需求的文本。以下是各组件功能的详细拆解:
整体流程与核心目标
代码的核心是解决 “预训练模型仅会通用语言建模,需通过微调对齐人类需求” 的问题:
- SFT(监督微调):让模型学习 “指令→响应” 的映射(如 “指令:解释 AI→响应:人工智能是…”),实现基础指令对齐。
- RLHF(人类反馈强化学习):分两步优化:
- 训练奖励模型(Reward Model):学习人类对 “好响应” 和 “差响应” 的偏好。
- 用PPO(近端策略优化):基于奖励模型的打分,优化模型策略,让生成的响应更符合人类偏好。
核心组件详解
1. sft_finetune函数(监督微调)
作用:基于 “指令 - 响应对” 数据,微调预训练 GPT 模型,使其具备指令跟随能力。
输入参数(关键)
pretrained_model_path:预训练模型权重路径(如models/pretrain/pretrain_best.pth)。sft_data_path:SFT 数据路径(格式[{"instruction": "...", "response": "..."}])。tokenizer:字节级 BPE 分词器。- 模型 / 训练参数:
d_model(模型维度)、batch_size(批量大小)、lr(学习率,SFT 常用3e-5,比预训练小一个量级)。
核心逻辑
-
加载预训练模型:
- 初始化
GPTModel,加载预训练权重(复用通用语言建模能力),避免从零训练。 - 模型移至 GPU/CPU 设备,确保计算效率。
- 初始化
-
创建 SFT 数据加载器:
- 调用
create_sft_dataloader(来自data_loader模块),加载 “指令 - 响应对” 数据,自动处理为input_ids、attention_mask、labels(指令部分设为-100,仅响应部分计算损失)。
- 调用
-
优化器与训练配置:
- 优化器:
AdamW(带weight_decay=0.01,防止过拟合,SFT 数据量小,过拟合风险更高)。 - 学习率调度:
CosineAnnealingLR(余弦衰减,让学习率随训练进度缓慢下降,稳定收敛)。 - 混合精度训练:
GradScaler(FP16 精度,降低显存占用,适配 SFT 阶段较小的批量)。
- 优化器:
-
SFT 训练循环:
- 每轮遍历 SFT 数据加载器,对每个 batch:
- 梯度清零→数据移至设备→混合精度前向传播(计算响应部分的损失)→反向传播→参数更新→学习率调度。
- 用
tqdm显示进度,实时输出当前 batch 损失。
- 每轮结束计算平均损失,若损失为当前最低,保存 “最佳 SFT 模型”(仅保存模型权重,体积小,用于后续 RLHF)。
- 每轮遍历 SFT 数据加载器,对每个 batch:
-
日志与保存:
- 日志输出每轮平均损失、模型保存路径,确保训练过程可追溯。
2. RewardModel类(奖励模型)
作用:学习人类对 “响应质量” 的偏好,为 RLHF 的 PPO 阶段提供 “奖励打分”—— 输入 “指令 + 响应”,输出一个标量分数(分数越高,响应越符合人类偏好)。
结构设计
- 基础模型:复用
GPTModel(基于 SFT 模型初始化,保留指令理解能力)。 - 奖励头(Reward Head):两层线性网络 + Tanh 激活:
Linear(d_model → hidden_size)→Tanh→Linear(hidden_size → 1)。- 输出标量分数,避免过拟合,Tanh 激活让分数落在
[-1,1]区间,便于后续 PPO 计算。
forward方法(核心逻辑)
输入:input_ids(指令 + 响应的 Token ID)、attention_mask(掩码)、prompt_len(指令部分长度)。输出:rewards(每个样本的标量奖励分,形状(batch_size,))。
步骤:
- 基础模型前向传播:调用
GPTModel获取logits(复用模型的隐藏层特征,无需额外训练基础网络)。 - 提取响应特征:
- 用
attention_mask.sum(dim=1)-1找到 “响应部分的最后一个 Token 位置”(响应的结尾)。 - 提取该位置的
logits作为 “响应特征”(认为最后一个 Token 的特征能代表整个响应的质量)。
- 用
- 计算奖励分:将响应特征输入奖励头,输出标量分数,挤压维度后返回。
3. train_reward_model函数(训练奖励模型)
作用:用 “人类标注的偏好对(Pairwise)数据” 训练RewardModel,让模型学会 “区分好响应(Chosen)和差响应(Rejected)”。
输入参数(关键)
sft_model_path:SFT 模型路径(初始化奖励模型的基础网络)。pairwise_data_path:偏好对数据路径(格式[{"instruction": "...", "chosen": "好响应", "rejected": "差响应"}])。
核心逻辑
-
初始化奖励模型:
- 加载 SFT 模型权重作为
RewardModel的基础网络,移至设备。 - 仅训练 “奖励头”?不 —— 基础网络也会微调(少量更新,让隐藏层更适配 “偏好学习”,但避免遗忘指令理解)。
- 加载 SFT 模型权重作为
-
加载 Pairwise 数据:
- 用
GPTPPODataset(来自data_loader模块)加载数据,自动处理为chosen_ids(指令 + 好响应)、rejected_ids(指令 + 差响应)、prompt_len(指令长度)。
- 用
-
训练配置:
- 优化器:
AdamW(lr=1e-5,比 SFT 更小,避免奖励头过拟合)。 - 损失函数:
BCEWithLogitsLoss(二分类交叉熵,适合 “对比学习”)。
- 优化器:
-
训练循环:
- 每轮遍历 Pairwise 数据加载器,对每个 batch:
- 梯度清零→数据移至设备(
chosen_ids/rejected_ids及对应掩码)。 - 计算奖励分:用
RewardModel分别对chosen_ids和rejected_ids打分,得到chosen_rewards和rejected_rewards。 - 计算损失:
- 损失逻辑:希望
chosen_rewards > rejected_rewards,因此构造logits = chosen_rewards - rejected_rewards(若chosen更好,logits为正)。 - 标签
labels = torch.ones_like(logits)(表示 “希望chosen比rejected好”)。 - 用
BCEWithLogitsLoss计算损失(等价于最大化log sigmoid(chosen_rewards - rejected_rewards))。
- 损失逻辑:希望
- 反向传播→参数更新→记录损失。
- 梯度清零→数据移至设备(
- 每轮结束保存 “最佳奖励模型”(用于后续 PPO 阶段打分)。
- 每轮遍历 Pairwise 数据加载器,对每个 batch:
4. rlhf_ppo函数(PPO 优化)
作用:基于奖励模型的打分,用 PPO 算法优化 SFT 模型(策略模型),让模型生成的响应 “既符合人类偏好(高奖励),又不过度偏离原始 SFT 能力(稳定更新)”。
核心概念
- 策略模型(Policy Model):待优化的模型(基于 SFT 模型初始化,初始策略是 SFT 的响应生成逻辑)。
- 参考模型(Reference Model):固定参数的模型(与策略模型初始权重一致,用于计算 “策略更新的幅度”,避免更新幅度过大导致不稳定)。
- PPO 核心:通过 “剪辑损失(Clipped Surrogate Loss)” 限制策略更新幅度,平衡 “高奖励” 和 “策略稳定性”。
输入参数(关键)
sft_model_path/reward_model_path:SFT 模型(初始化策略 / 参考模型)、奖励模型(打分)路径。pairwise_data_path:偏好对数据(提取指令部分,用于生成响应)。clip_epsilon:PPO 剪辑系数(默认0.2,剪辑范围[1-ε, 1+ε])。
核心逻辑
-
初始化模型:
- 策略模型 / 参考模型:均加载 SFT 权重,参考模型设为
eval()(固定参数,不更新)。 - 奖励模型:加载训练好的权重,设为
eval()(仅用于打分,不更新)。
- 策略模型 / 参考模型:均加载 SFT 权重,参考模型设为
-
加载数据:
- 用
GPTPPODataset加载数据,提取prompt_ids(仅指令部分,用于生成响应)。
- 用
-
PPO 训练循环:每轮(
ppo_epochs)遍历数据加载器,对每个 batch:a. 生成响应:- 策略模型设为
eval(),基于prompt_ids生成响应(max_gen_len=50,top_k=50,保证生成多样性)。 - 拼接 “指令 + 响应” 得到
full_ids(完整序列,用于后续打分和概率计算)。
b. 计算概率分布:
- 策略模型设为
train(),前向传播full_ids得到policy_logits,计算 “响应部分” 的 Token 生成概率(policy_probs)。 - 参考模型前向传播
full_ids得到ref_logits,计算 “响应部分” 的概率(ref_probs)。 - 计算
ratio = policy_probs / (ref_probs + 1e-8)(重要性权重,衡量策略与参考模型的差异)。
c. 计算奖励与优势:
- 奖励:用奖励模型对
full_ids打分,得到rewards(单步奖励,简化版;实际可加折扣因子,即 GAE)。 - 优势(Advantage):简化为
advantages = rewards.repeat(响应长度)(实际需用 GAE 计算,此处为简化版,核心是 “奖励与基准的差值”)。
d. 计算 PPO 剪辑损失:
- 损失公式:
loss = -min(ratio*advantages, clip(ratio, 1-ε, 1+ε)*advantages).mean()。 - 逻辑:若
ratio超出[1-ε, 1+ε],用剪辑后的ratio计算损失,避免策略更新幅度过大;负号表示 “最小化损失等价于最大化奖励”。
e. 优化策略模型:
- 梯度清零→反向传播→参数更新→记录损失。
f. 保存模型:
- 每轮保存当前策略模型,最后保存 “最终 PPO 模型”(优化后的模型,具备人类偏好对齐能力)。
- 策略模型设为
5. main函数(微调入口)
作用:提供命令行接口,支持用户选择 “sft” 或 “rlhf” 模式,串联完整微调流程。
逻辑
- 解析参数:通过
argparse接收模式(--mode)、数据路径(--data_path)、模型路径(--pretrained_model/--sft_model)等参数。 - 加载分词器:复用 ByteLevelBPETokenizer,确保编码格式一致。
- 执行对应模式:
- 若为
--mode sft:调用sft_finetune,执行监督微调。 - 若为
--mode rlhf:先调用train_reward_model训练奖励模型,再调用rlhf_ppo执行 PPO 优化,形成完整 RLHF 流程。
- 若为
工程化设计亮点
-
模块化与复用:
- 模型复用:SFT→奖励模型→PPO 均基于 GPTModel,避免重复训练基础网络,节省算力。
- 数据复用:Pairwise 数据同时用于奖励模型训练和 PPO 的指令提取,减少数据标注成本。
-
训练稳定性:
- 学习率:SFT(
3e-5)→奖励模型(1e-5)→PPO(3e-6),逐步减小,适配数据量减少的场景。 - PPO 剪辑:
clip_epsilon=0.2限制更新幅度,避免策略崩溃(生成无意义文本)。
- 学习率:SFT(
-
可追溯性:
- 日志:每步损失、模型保存路径实时输出,支持问题定位。
- 模型保存:区分 “最佳模型”“epoch 模型”,便于后续选择最优权重。
总结
该代码实现了 GPT 模型从 “预训练→指令对齐→偏好对齐” 的完整微调链路:
- SFT:让模型 “听懂指令”,能生成符合指令的基础响应。
- 奖励模型:让模型 “理解人类偏好”,能区分好 / 差响应。
- PPO:让模型 “优化响应策略”,生成既符合偏好又稳定的响应。
每个模块均有明确目标和工程化设计,可直接用于实际微调任务,适配对话、问答等需要人类偏好对齐的场景。
9. evaluator.py
import json
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import numpy as np
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from rouge import Rouge
from typing import Optional
from typing import List, Dict, Tuple
from gpt_model import GPTModel
from byte_level_bpe import ByteLevelBPETokenizer
from data_loader import GPTPretrainDataset, GPTSFTDatasetclass GPTEvaluator:def __init__(self, model: GPTModel, tokenizer: ByteLevelBPETokenizer, device: Optional[torch.device] = None):self.model = model.eval()self.tokenizer = tokenizerself.device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")self.model.to(self.device)self.rouge = Rouge()self.smoother = SmoothingFunction().method4 # BLEU平滑函数@torch.no_grad()def compute_perplexity(self, dataloader: DataLoader) -> float:"""计算困惑度(PPL,越低越好)"""total_loss = 0.0total_tokens = 0for batch in dataloader:input_ids = batch["input_ids"].to(self.device)attention_mask = batch["attention_mask"].to(self.device)labels = batch["labels"].to(self.device)outputs = self.model(input_ids=input_ids, attention_mask=attention_mask, labels=labels)loss = outputs["loss"]# 统计有效token数(非pad)token_count = attention_mask.sum().item() - input_ids.shape[0] # 减去<s>的计数total_loss += loss.item() * token_counttotal_tokens += token_countppl = np.exp(total_loss / total_tokens)return ppl@torch.no_grad()def generate_predictions(self,prompts: List[str],max_gen_len: int = 100,top_k: int = 50,temperature: float = 0.8) -> List[str]:"""生成预测文本"""predictions = []for prompt in prompts:# 编码promptencoding = self.tokenizer.encode(text=prompt, max_seq_len=len(prompt) + 10, add_special_tokens=True)input_ids = torch.tensor(encoding["input_ids"], dtype=torch.long).unsqueeze(0).to(self.device)# 生成generated_ids = self.model.generate(input_ids=input_ids,max_gen_len=max_gen_len,top_k=top_k,temperature=temperature)# 解码(去除prompt部分)full_text = self.tokenizer.decode(generated_ids)pred_text = full_text[len(prompt):].strip() # 只保留生成的响应部分predictions.append(pred_text)return predictionsdef compute_bleu(self, predictions: List[str], references: List[List[str]]) -> float:"""计算BLEU分数(0~1,越高越好)"""# 分词(按空格,实际应与模型分词一致)pred_tokens = [pred.lower().split() for pred in predictions]ref_tokens = [[ref.lower().split() for ref in refs] for refs in references]# 计算BLEU-4bleu_scores = []for pred, refs in zip(pred_tokens, ref_tokens):score = sentence_bleu(refs, pred, smoothing_function=self.smoother)bleu_scores.append(score)return np.mean(bleu_scores)def compute_rouge(self, predictions: List[str], references: List[str]) -> Dict[str, float]:"""计算ROUGE分数(R-1/R-2/R-L,0~1,越高越好)"""rouge_scores = {"rouge-1": [], "rouge-2": [], "rouge-l": []}for pred, ref in zip(predictions, references):try:scores = self.rouge.get_scores(pred.lower(), ref.lower())[0]rouge_scores["rouge-1"].append(scores["rouge-1"]["f"])rouge_scores["rouge-2"].append(scores["rouge-2"]["f"])rouge_scores["rouge-l"].append(scores["rouge-l"]["f"])except:# 处理空字符串等异常for key in rouge_scores:rouge_scores[key].append(0.0)# 取平均值return {k: np.mean(v) for k, v in rouge_scores.items()}def evaluate_pretrained(self, test_data_path: str, max_seq_len: int = 2048) -> Dict[str, float]:"""评估预训练模型(仅PPL)"""dataset = GPTPretrainDataset(file_path=test_data_path,tokenizer=self.tokenizer,max_seq_len=max_seq_len)dataloader = DataLoader(dataset, batch_size=8, shuffle=False, num_workers=4)ppl = self.compute_perplexity(dataloader)return {"perplexity": ppl}def evaluate_sft(self,test_data_path: str,max_seq_len: int = 2048,max_gen_len: int = 100) -> Dict[str, float]:"""评估SFT模型(PPL+BLEU+ROUGE)"""# 1. 加载测试数据with open(test_data_path, "r", encoding="utf-8") as f:test_data = json.load(f)prompts = [item["instruction"] for item in test_data]references = [item["response"] for item in test_data]ref_for_bleu = [[ref] for ref in references] # BLEU需要列表的列表# 2. 生成预测predictions = self.generate_predictions(prompts, max_gen_len=max_gen_len)# 3. 计算指标bleu = self.compute_bleu(predictions, ref_for_bleu)rouge = self.compute_rouge(predictions, references)# 4. 计算PPL(可选,需加载SFT测试集)sft_dataset = GPTSFTDataset(file_path=test_data_path,tokenizer=self.tokenizer,max_seq_len=max_seq_len)sft_dataloader = DataLoader(sft_dataset, batch_size=8, shuffle=False, num_workers=4)ppl = self.compute_perplexity(sft_dataloader)return {"perplexity": ppl,"bleu": bleu,"rouge-1": rouge["rouge-1"],"rouge-2": rouge["rouge-2"],"rouge-l": rouge["rouge-l"]}# 评估入口
def main():import argparseparser = argparse.ArgumentParser()parser.add_argument("--model_path", type=str, required=True)parser.add_argument("--test_data_path", type=str, required=True)parser.add_argument("--model_type", type=str, choices=["pretrain", "sft"], default="sft")args = parser.parse_args()# 加载分词器tokenizer = ByteLevelBPETokenizer.from_pretrained("models/bpe_tokenizer")# 加载模型model = GPTModel(vocab_size=len(tokenizer.vocab),d_model=768,n_layers=12,n_heads=12,d_ff=3072)model.load_state_dict(torch.load(args.model_path, map_location="cpu")["model_state_dict"])# 评估evaluator = GPTEvaluator(model, tokenizer)if args.model_type == "pretrain":metrics = evaluator.evaluate_pretrained(args.test_data_path)print(f"预训练模型评估:PPL={metrics['perplexity']:.2f}")else:metrics = evaluator.evaluate_sft(args.test_data_path)print(f"SFT模型评估:")print(f" PPL: {metrics['perplexity']:.2f}")print(f" BLEU: {metrics['bleu']:.4f}")print(f" ROUGE-1: {metrics['rouge-1']:.4f}")print(f" ROUGE-2: {metrics['rouge-2']:.4f}")print(f" ROUGE-L: {metrics['rouge-l']:.4f}")if __name__ == "__main__":main()
该代码实现了GPT 模型的全方位评估工具,通过GPTEvaluator类封装了针对预训练模型和 SFT 微调模型的评估逻辑,支持计算困惑度(PPL)、BLEU、ROUGE等核心指标,全面衡量模型的语言建模能力和生成质量。以下是各组件功能的详细拆解:
核心评估类:GPTEvaluator
该类是评估的核心,整合了模型加载、预测生成、指标计算等功能,支持预训练模型和 SFT 模型的差异化评估。
1. 初始化(__init__方法)
作用:准备评估所需的基础组件,确保模型处于评估状态。
-
输入参数:
model:待评估的GPTModel实例(预训练或 SFT 模型)。tokenizer:ByteLevelBPETokenizer分词器(与训练时一致,确保编码 / 解码兼容)。device:评估设备(默认自动选择 GPU/CPU)。
-
核心初始化逻辑:
- 模型设为评估模式:
self.model.eval()(禁用 Dropout 等训练特有的层,确保评估结果稳定)。 - 模型移至目标设备:
self.model.to(self.device)(利用 GPU 加速评估)。 - 初始化评估工具:
self.rouge = Rouge():用于计算 ROUGE 分数(文本生成任务的常用指标)。self.smoother = SmoothingFunction().method4:BLEU 分数的平滑函数(处理短文本或低重叠度场景,避免分数为 0)。
- 模型设为评估模式:
2. 困惑度计算(compute_perplexity方法)
作用:计算模型的困惑度(Perplexity,PPL),衡量模型对文本序列的 “预测能力”——PPL 越低,模型对文本的建模效果越好(预训练模型的核心指标)。
-
原理:PPL 基于交叉熵损失计算,公式为
PPL = exp(平均交叉熵损失),本质是 “模型对序列中每个 token 的平均预测难度”。 -
流程:
- 遍历评估数据加载器(
dataloader),获取每个 batch 的input_ids、attention_mask、labels。 - 模型前向传播计算损失(
outputs["loss"],即交叉熵损失)。 - 统计有效 token 数:
attention_mask.sum().item() - input_ids.shape[0](减去<s>等起始 token 的计数,只算实际内容 token)。 - 累计总损失(
total_loss)和总有效 token 数(total_tokens)。 - 计算 PPL:
np.exp(total_loss / total_tokens)(通过指数函数将平均损失转换为 PPL)。
- 遍历评估数据加载器(
-
输出:单精度浮点数(PPL 值,越低越好)。
3. 预测生成(generate_predictions方法)
作用:基于输入的prompts(提示文本)生成模型的响应文本,为后续 BLEU、ROUGE 等生成指标的计算提供 “预测结果”。
-
输入参数:
prompts:提示文本列表(如 SFT 任务中的 “指令” 部分,["解释什么是AI", "介绍GPT模型"])。max_gen_len:最大生成长度(避免生成过长文本)。top_k/temperature:生成参数(控制生成的多样性和确定性)。
-
流程:
- 遍历每个
prompt,编码为input_ids(添加特殊 token,如<s>),并转为 tensor。 - 调用模型的
generate方法生成响应(自回归解码,基于 prompt 生成后续文本)。 - 解码生成的
generated_ids为文本,提取 “仅生成的响应部分”(去除原始 prompt,避免污染评估)。 - 收集所有预测文本,返回列表。
- 遍历每个
-
输出:模型生成的响应文本列表(与
prompts一一对应)。
4. BLEU 分数计算(compute_bleu方法)
作用:计算 BLEU(Bilingual Evaluation Understudy)分数,衡量生成文本与参考文本的 “n-gram 重叠度”(常用于机器翻译、文本生成任务,值范围 0~1,越高表示越相似)。
-
原理:基于 n-gram(1~4gram)的精确率,通过 brevity penalty(短句惩罚)调整,评估生成文本与参考文本的匹配度。
-
流程:
- 文本分词:将预测文本(
predictions)和参考文本(references)按空格分词(实际应与模型分词逻辑一致,此处简化),转为小写。 - 格式适配:
references需转为 “列表的列表”(支持多个参考文本,此处每个样本仅 1 个参考)。 - 计算 BLEU-4:调用
sentence_bleu,应用平滑函数(self.smoother)处理低重叠场景,计算每个样本的分数。 - 取平均值:返回所有样本的平均 BLEU 分数。
- 文本分词:将预测文本(
-
输出:单精度浮点数(BLEU 值,越高越好)。
5. ROUGE 分数计算(compute_rouge方法)
作用:计算 ROUGE(Recall-Oriented Understudy for Gisting Evaluation)分数,衡量生成文本与参考文本的 “重叠子序列”(尤其适合摘要、问答等任务,包括 ROUGE-1、ROUGE-2、ROUGE-L)。
-
各指标含义:
ROUGE-1:单字(unigram)重叠的 F1 分数。ROUGE-2:双字(bigram)重叠的 F1 分数。ROUGE-L:最长公共子序列(LCS)的 F1 分数(不要求连续,更关注语义连贯性)。
-
流程:
- 遍历每个预测文本(
pred)和参考文本(ref),转为小写。 - 调用
self.rouge.get_scores计算当前样本的 ROUGE 分数(取 F1 值,综合精确率和召回率)。 - 异常处理:若文本为空或格式错误,当前样本分数记为 0(避免评估中断)。
- 取平均值:返回各 ROUGE 指标的平均分数。
- 遍历每个预测文本(
-
输出:字典(
{"rouge-1": 分数, "rouge-2": 分数, "rouge-l": 分数},值范围 0~1,越高越好)。
6. 预训练模型评估(evaluate_pretrained方法)
作用:专门评估预训练模型(仅关注语言建模能力),输出核心指标 PPL。
-
流程:
- 加载预训练测试集:用
GPTPretrainDataset加载测试数据(纯文本,无指令 - 响应结构)。 - 创建数据加载器:批量加载数据,确保评估效率。
- 计算 PPL:调用
compute_perplexity,返回结果。
- 加载预训练测试集:用
-
输出:字典(
{"perplexity": PPL值})。
7. SFT 模型评估(evaluate_sft方法)
作用:评估 SFT 微调模型(关注指令跟随能力和生成质量),输出 PPL、BLEU、ROUGE 综合指标。
-
流程:
- 加载 SFT 测试数据:读取 JSON 格式的 “指令 - 响应对” 数据(
[{"instruction": "...", "response": "..."}]),提取prompts(指令)和references(参考响应)。 - 生成预测响应:调用
generate_predictions,基于prompts生成模型的响应文本。 - 计算生成指标:调用
compute_bleu和compute_rouge,得到生成文本与参考的相似度指标。 - 计算 PPL:用
GPTSFTDataset加载测试数据(带指令 - 响应结构),创建数据加载器,调用compute_perplexity(衡量模型对 SFT 数据的拟合度)。
- 加载 SFT 测试数据:读取 JSON 格式的 “指令 - 响应对” 数据(
-
输出:字典(包含
perplexity、bleu、rouge-1、rouge-2、rouge-l)。
评估入口(main函数)
作用:提供命令行接口,支持用户指定模型路径、测试数据路径和模型类型,一键启动评估并打印结果。
- 流程:
- 解析命令行参数:
--model_path(模型权重路径)、--test_data_path(测试数据路径)、--model_type(模型类型:pretrain或sft)。 - 加载分词器:
ByteLevelBPETokenizer.from_pretrained(与训练时一致)。 - 加载模型:初始化
GPTModel,加载指定路径的权重。 - 初始化评估器:
GPTEvaluator(model, tokenizer)。 - 执行评估:
- 若为预训练模型:调用
evaluate_pretrained,打印 PPL。 - 若为 SFT 模型:调用
evaluate_sft,打印 PPL、BLEU、ROUGE 各项指标。
- 若为预训练模型:调用
- 解析命令行参数:
核心功能总结
该代码是 GPT 模型评估的 “一站式工具”,核心价值在于:
- 多阶段适配:区分预训练模型(仅 PPL)和 SFT 模型(PPL+BLEU+ROUGE),匹配不同阶段的评估需求。
- 多指标覆盖:
- PPL:衡量语言建模能力(模型对文本的预测准确性)。
- BLEU:衡量生成文本与参考的 n-gram 重叠(表层相似度)。
- ROUGE:衡量生成文本与参考的子序列重叠(更关注语义连贯性)。
- 工程化细节:
- 自动处理设备分配(GPU/CPU),提升评估效率。
- 异常处理(如空文本),确保评估流程稳定。
- 生成结果自动剥离 prompt,避免原始输入污染评估指标。
通过这些功能,用户可全面衡量模型性能,为模型优化(如调整超参数、增加数据量)提供量化依据。
10. inference/quantizer.py
import torch
import torch.nn as nn
from gpt_model import GPTModel
from typing import Dict, Anydef quantize_model_int8(model: GPTModel) -> GPTModel:"""将模型量化为INT8(仅权重,激活保持FP16)"""quantized_model = model.to(dtype=torch.float16) # 激活用FP16# 遍历所有线性层,量化权重for name, module in quantized_model.named_modules():if isinstance(module, nn.Linear):# 权重从FP16量化为INT8(缩放至[-127, 127])weight = module.weight.datascale = torch.max(torch.abs(weight)) / 127.0 # 缩放因子weight_int8 = torch.round(weight / scale).to(torch.int8)# 替换为量化权重,并保存缩放因子(推理时需用)module.weight = nn.Parameter(weight_int8, requires_grad=False)module.register_buffer("scale", scale) # 保存缩放因子return quantized_modeldef int8_linear_forward(self, x: torch.Tensor) -> torch.Tensor:"""替换Linear层的forward方法,支持INT8权重推理"""# INT8权重 → 转换为FP16并乘以缩放因子weight = self.weight.to(torch.float16) * self.scalereturn torch.nn.functional.linear(x, weight, self.bias)def enable_int8_inference(model: GPTModel) -> GPTModel:"""启用INT8推理(替换Linear层的forward方法)"""for module in model.modules():if isinstance(module, nn.Linear) and hasattr(module, "scale"):module.forward = int8_linear_forward.__get__(module, nn.Linear)return modeldef save_quantized_model(model: GPTModel, save_path: str) -> None:"""保存量化模型(包含权重和缩放因子)"""torch.save({"model_state_dict": model.state_dict(),"quantization": "int8"}, save_path)def load_quantized_model(model_config: Dict[str, Any],model_path: str,device: torch.device
) -> GPTModel:"""加载量化模型并启用INT8推理"""model = GPTModel(**model_config).to(device)checkpoint = torch.load(model_path, map_location=device)model.load_state_dict(checkpoint["model_state_dict"])model = enable_int8_inference(model)return model# 量化示例
def main():from byte_level_bpe import ByteLevelBPETokenizer# 加载原始模型tokenizer = ByteLevelBPETokenizer.from_pretrained("models/bpe_tokenizer")model = GPTModel(vocab_size=len(tokenizer.vocab),d_model=768,n_layers=12,n_heads=12,d_ff=3072)model.load_state_dict(torch.load("models/sft/sft_best.pth", map_location="cpu")["model_state_dict"])# 量化为INT8quantized_model = quantize_model_int8(model)quantized_model = enable_int8_inference(quantized_model)# 保存量化模型(体积约为原始的1/4)save_quantized_model(quantized_model, "models/sft/sft_best_int8.pth")print("INT8量化模型保存完成")if __name__ == "__main__":main()
该代码实现了GPT 模型的 INT8 量化工具链,核心目标是将模型从 FP16/FP32 精度压缩到 INT8 精度(仅量化权重,激活保持 FP16),在小幅损失精度的前提下,减少 75%~50% 的显存占用并提升推理速度(尤其适配支持 INT8 加速的硬件)。以下是各组件功能的详细拆解:
核心量化原理
量化的核心逻辑是 “权重从高精度(FP16/FP32)转为低精度(INT8),推理时再转回高精度与激活计算”:
- INT8 权重范围:
[-127, 127](避免溢出,预留 1 个符号位)。 - 缩放因子(Scale):
Scale = max(abs(FP权重)) / 127,用于将 FP 权重 “缩放” 到 INT8 范围,推理时再用 Scale 将 INT8 权重 “恢复” 到 FP16 精度。 - 精度平衡:仅量化权重(模型参数的主要来源),激活保持 FP16(避免激活值波动导致的精度丢失),兼顾压缩率与推理质量。
核心函数详解
1. quantize_model_int8(模型 INT8 量化)
作用:将 GPT 模型的所有线性层(nn.Linear)权重从 FP16 量化到 INT8,保存缩放因子,完成量化核心操作。
-
输入:未量化的
GPTModel实例(FP16/FP32 精度)。 -
输出:INT8 量化后的
GPTModel实例(权重 INT8,激活 FP16)。 -
核心流程:
- 激活精度设置:
quantized_model = model.to(dtype=torch.float16)将模型整体转为 FP16(激活计算用 FP16,平衡精度与速度)。 - 遍历线性层量化:模型中 90% 以上的参数集中在
nn.Linear层(注意力头、前馈网络),仅量化这类层性价比最高:- 提取线性层权重:
weight = module.weight.data(FP16 精度)。 - 计算缩放因子:
scale = torch.max(torch.abs(weight)) / 127.0确保权重缩放后能完全落入 INT8 的[-127, 127]范围,无溢出。 - 权重量化:
weight_int8 = torch.round(weight / scale).to(torch.int8)权重除以 Scale 缩放→四舍五入取整→转为 INT8 类型。 - 替换权重与保存 Scale:
- 将量化后的 INT8 权重赋值给线性层:
module.weight = nn.Parameter(weight_int8, requires_grad=False)(量化后模型无需训练,禁用梯度)。 - 保存 Scale 到层的 Buffer:
module.register_buffer("scale", scale)Buffer 会随模型state_dict一起保存,推理时需用它恢复权重精度。
- 将量化后的 INT8 权重赋值给线性层:
- 提取线性层权重:
- 激活精度设置:
2. int8_linear_forward(INT8 线性层前向传播)
作用:替换nn.Linear的默认前向逻辑,适配 INT8 权重的推理 —— 将 INT8 权重转回 FP16 后,再与 FP16 激活做矩阵乘法。
-
背景:PyTorch 默认的
nn.Linear.forward不支持 INT8 权重,直接使用会报错或精度错乱,需自定义前向逻辑。 -
核心逻辑:
def int8_linear_forward(self, x: torch.Tensor) -> torch.Tensor:# INT8权重 → 转回FP16:INT8权重 * Scale(恢复到原始精度范围)weight = self.weight.to(torch.float16) * self.scale# 正常线性计算:FP16激活 x FP16权重 + 偏置(若有)return torch.nn.functional.linear(x, weight, self.bias)- 步骤 1:权重恢复:INT8 权重先转为 FP16,再乘以
self.scale(量化时保存的缩放因子),恢复到接近原始 FP16 的精度范围。 - 步骤 2:线性计算:用恢复后的 FP16 权重与 FP16 激活(
x)做矩阵乘法,确保计算精度与普通 FP16 模型一致。
- 步骤 1:权重恢复:INT8 权重先转为 FP16,再乘以
3. enable_int8_inference(启用 INT8 推理)
作用:遍历量化后的模型,将所有 “已量化的线性层”(即带有scale Buffer 的nn.Linear)的前向方法,替换为int8_linear_forward,确保推理时使用正确的计算逻辑。
-
核心流程:
- 遍历模型所有模块(
model.modules())。 - 筛选目标层:仅选择 “是线性层” 且 “有
scaleBuffer” 的模块(即isinstance(module, nn.Linear) and hasattr(module, "scale"))。 - 替换前向方法:通过
int8_linear_forward.__get__(module, nn.Linear),将自定义前向逻辑绑定到该线性层(Python 描述符机制,确保self指向当前线性层)。
- 遍历模型所有模块(
-
输出:已适配 INT8 推理的模型(可直接调用
model.generate或model.forward进行推理)。
4. save_quantized_model(保存量化模型)
作用:将量化后的模型权重、缩放因子(scale)及量化标记保存到文件,确保后续加载时能识别为 INT8 模型。
- 核心保存内容:
torch.save({"model_state_dict": model.state_dict(), # 包含INT8权重、scale Buffer"quantization": "int8" # 量化标记(用于加载时识别) }, save_path)model_state_dict:包含量化后的 INT8 权重、各线性层的scaleBuffer(已通过register_buffer加入 state_dict)。quantization: "int8":显式标记模型类型,避免与普通 FP16/FP32 模型混淆,方便加载时快速判断。
5. load_quantized_model(加载量化模型)
作用:从保存的文件中加载 INT8 量化模型,并自动启用 INT8 推理逻辑,无需手动配置。
-
输入参数:
model_config:GPT 模型的结构配置(如vocab_size、d_model、n_layers等),确保加载的模型结构与量化时一致。model_path:量化模型文件路径。device:加载模型的目标设备(GPU/CPU)。
-
核心流程:
- 初始化空模型:
model = GPTModel(**model_config).to(device)(按配置创建与原模型结构一致的空模型)。 - 加载 state_dict:
checkpoint = torch.load(model_path, map_location=device),将 INT8 权重、scaleBuffer 加载到模型。 - 启用 INT8 推理:调用
enable_int8_inference(model),自动替换线性层前向方法。
- 初始化空模型:
-
输出:可直接推理的 INT8 量化模型。
量化示例(main函数)
作用:提供完整的 “加载原始模型→INT8 量化→启用推理→保存模型” 流程示例,展示实际使用步骤。
-
核心步骤:
- 加载分词器:
ByteLevelBPETokenizer.from_pretrained(与训练时一致,确保编码 / 解码兼容)。 - 加载原始 SFT 模型:初始化
GPTModel,加载未量化的 SFT 权重(如models/sft/sft_best.pth)。 - 模型量化:调用
quantize_model_int8将模型转为 INT8。 - 启用 INT8 推理:调用
enable_int8_inference适配推理逻辑。 - 保存量化模型:调用
save_quantized_model保存到models/sft/sft_best_int8.pth。
- 加载分词器:
-
量化效果:注释中提到 “体积约为原始的 1/4”—— 若原始模型为 FP32(4 字节 / 参数),INT8(1 字节 / 参数)可减少 75% 体积;若原始为 FP16(2 字节 / 参数),则减少 50% 体积,显存占用同步降低。
核心价值与应用场景
- 显存优化:INT8 量化后模型参数体积大幅减少,适合显存有限的设备(如消费级 GPU、边缘设备)。
- 速度提升:支持 INT8 加速的硬件(如 NVIDIA Tensor Core、AMD RDNA 2)可通过 INT8 指令集提升推理速度(通常比 FP16 快 1.5~2 倍)。
- 精度平衡:仅量化权重 + 激活保持 FP16,在文本生成任务中精度损失可控(PPL 上升通常 < 10%,人类感知差异小)。
适用于推理阶段(如部署 API 服务、本地运行),不适合训练阶段(训练需高精度梯度,量化会导致梯度消失)。
总结
该代码形成了 “量化→推理适配→保存→加载” 的完整 INT8 工具链,无需依赖复杂框架(如 TensorRT、ONNX Runtime),纯 PyTorch 实现,易用性高。通过仅量化权重、自定义线性层前向逻辑,在保证推理精度的前提下,实现了模型的 “轻量化”,是 GPT 模型工程化部署的关键组件。
11. inference/kv_cache_infer.py
import torch
import torch.nn.functional as F
from typing import List, Dict, Optional, Tuple
from gpt_model import GPTModel
from byte_level_bpe import ByteLevelBPETokenizerclass KVCache:"""KV缓存管理器(存储多头注意力的键值对,加速推理)"""def __init__(self, n_layers: int, n_heads: int, d_k: int, max_seq_len: int, device: torch.device):self.n_layers = n_layersself.n_heads = n_headsself.d_k = d_kself.max_seq_len = max_seq_lenself.device = device# 初始化缓存(layers x batch x heads x seq_len x d_k)self.k_cache = [torch.zeros(1, n_heads, 0, d_k, device=device) # 初始序列长度为0for _ in range(n_layers)]self.v_cache = [torch.zeros(1, n_heads, 0, d_k, device=device)for _ in range(n_layers)]def update(self, layer_idx: int, k: torch.Tensor, v: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:"""更新指定层的KV缓存"""# k/v形状:(batch=1, heads, new_seq_len, d_k)self.k_cache[layer_idx] = torch.cat([self.k_cache[layer_idx], k], dim=2)self.v_cache[layer_idx] = torch.cat([self.v_cache[layer_idx], v], dim=2)# 截断缓存至最大长度(防止OOM)if self.k_cache[layer_idx].shape[2] > self.max_seq_len:self.k_cache[layer_idx] = self.k_cache[layer_idx][:, :, -self.max_seq_len:, :]self.v_cache[layer_idx] = self.v_cache[layer_idx][:, :, -self.max_seq_len:, :]return self.k_cache[layer_idx], self.v_cache[layer_idx]def reset(self) -> None:"""重置缓存(新序列生成前调用)"""for i in range(self.n_layers):self.k_cache[i] = torch.zeros(1, self.n_heads, 0, self.d_k, device=self.device)self.v_cache[i] = torch.zeros(1, self.n_heads, 0, self.d_k, device=self.device)def patched_attention_forward(self,x: torch.Tensor,attention_mask: Optional[torch.Tensor] = None,causal_mask: bool = True,kv_cache: Optional[KVCache] = None,layer_idx: Optional[int] = None
) -> torch.Tensor:"""替换MultiHeadAttention的forward方法,支持KV缓存:param kv_cache: KVCache实例:param layer_idx: 当前层索引"""batch_size = x.shape[0]# 线性投影 + 多头拆分 (batch, heads, seq_len, d_k)q = self.w_q(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)k = self.w_k(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)v = self.w_v(x).view(batch_size, -1, self.n_heads, self.d_k).transpose(1, 2)# 使用KV缓存(仅推理时)if kv_cache is not None and layer_idx is not None:k, v = kv_cache.update(layer_idx, k, v) # 从缓存中获取历史KV并更新# 计算注意力得分attn_scores = torch.matmul(q, k.transpose(-2, -1)) / self.scale # (batch, heads, seq_len, seq_len_total)# 应用掩码(仅对新生成的token生效)if causal_mask:seq_len = x.shape[1] # 新输入的长度(1,因为推理时逐token生成)total_seq_len = k.shape[2] # 历史+新序列长度mask = torch.tril(torch.ones(seq_len, total_seq_len, device=x.device)).bool()attn_scores = attn_scores.masked_fill(~mask, -1e9)if attention_mask is not None:attn_scores = attn_scores.masked_fill(attention_mask.unsqueeze(1) == 0, -1e9)# 注意力计算attn_weights = F.softmax(attn_scores, dim=-1)attn_weights = self.dropout(attn_weights)attn_output = torch.matmul(attn_weights, v) # (batch, heads, seq_len, d_k)# 多头合并 + 输出投影attn_output = attn_output.transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)return self.w_o(attn_output)def enable_kv_cache_inference(model: GPTModel, kv_cache: KVCache) -> None:"""为模型启用KV缓存推理(替换注意力层的forward方法)"""for layer_idx, decoder_layer in enumerate(model.decoder_layers):# 替换多头注意力的forward方法,并绑定kv_cache和layer_idxdef make_forward(layer_idx):def forward(x, attention_mask=None, causal_mask=True):return patched_attention_forward(decoder_layer.attn,x,attention_mask=attention_mask,causal_mask=causal_mask,kv_cache=kv_cache,layer_idx=layer_idx)return forwarddecoder_layer.attn.forward = make_forward(layer_idx)@torch.no_grad()
def generate_with_kv_cache(model: GPTModel,tokenizer: ByteLevelBPETokenizer,prompt: str,max_gen_len: int = 100,top_k: int = 50,temperature: float = 0.8,device: torch.device = None
) -> str:"""带KV缓存的文本生成(速度提升2-3倍)"""device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu")model = model.eval().to(device)# 初始化KV缓存n_layers = len(model.decoder_layers)n_heads = model.decoder_layers[0].attn.n_headsd_k = model.d_model // n_headskv_cache = KVCache(n_layers=n_layers,n_heads=n_heads,d_k=d_k,max_seq_len=model.position_embedding.num_embeddings,device=device)enable_kv_cache_inference(model, kv_cache)# 编码promptencoding = tokenizer.encode(text=prompt, add_special_tokens=True)input_ids = torch.tensor(encoding["input_ids"], dtype=torch.long).unsqueeze(0).to(device)attention_mask = torch.tensor(encoding["attention_mask"], dtype=torch.long).unsqueeze(0).to(device)# 预热缓存(处理prompt)model(input_ids=input_ids, attention_mask=attention_mask)generated_ids = input_ids.squeeze(0).tolist()current_len = len(generated_ids)# 逐token生成for _ in range(max_gen_len):if current_len >= model.position_embedding.num_embeddings:break # 超过最大长度# 取最后一个token作为输入last_token = torch.tensor([generated_ids[-1]], dtype=torch.long).unsqueeze(0).to(device)last_mask = torch.tensor([1], dtype=torch.long).unsqueeze(0).to(device)# 前向传播(仅处理新token,复用缓存)outputs = model(input_ids=last_token, attention_mask=last_mask)logits = outputs["logits"][:, -1, :] # (1, vocab_size)# Top-K采样if top_k > 0:top_k_values, top_k_indices = torch.topk(logits, top_k, dim=-1)logits = torch.full_like(logits, -1e9, device=device)logits.scatter_(-1, top_k_indices, top_k_values)# 温度缩放if temperature != 1.0:logits = logits / temperature# 采样下一个tokenprobs = F.softmax(logits, dim=-1)next_token_id = torch.multinomial(probs, num_samples=1).item()# 终止条件if next_token_id == tokenizer.vocab["</s>"]:breakgenerated_ids.append(next_token_id)current_len += 1# 解码结果return tokenizer.decode(generated_ids)
该代码实现了GPT 模型自回归生成的 KV 缓存(Key-Value Cache)加速方案,核心目标是通过复用历史 Token 的注意力键(K)和值(V),避免重复计算,将推理速度提升 2-3 倍(尤其长序列生成时效果更显著)。以下是各组件功能的详细拆解:
核心原理:为什么需要 KV 缓存?
自回归生成的本质是 “逐 Token 生成”,每次新 Token 都需要与前面所有历史 Token计算注意力(如生成第 10 个 Token 时,需与第 1-9 个 Token 做注意力交互)。
- 无缓存时:每次生成新 Token,都会重新计算所有历史 Token + 新 Token的 K 和 V,计算量随序列长度增长呈 O (n²) 递增(n 为序列长度)。
- 有缓存时:首次处理 Prompt(历史 Token)时,将各层的 K 和 V 存入缓存;后续生成新 Token 时,仅计算新 Token 的 K 和 V,并复用缓存中的历史 K 和 V,计算量固定为 O (1)(仅新 Token 的计算),大幅减少冗余操作。
核心组件详解
1. KVCache类(KV 缓存管理器)
作用:作为 “存储容器”,管理所有解码器层的 K 和 V 缓存,负责缓存的初始化、更新(添加新 Token 的 K/V)和重置(新序列生成前清空)。
初始化(__init__)
-
输入参数:
n_layers:解码器层数(如 GPT-2 的 12 层)。n_heads:注意力头数(如 12 头)。d_k:单个注意力头的维度(如 768/12=64)。max_seq_len:缓存最大长度(避免缓存过长导致显存溢出,通常设为模型最大序列长度)。device:缓存存储设备(与模型一致,GPU/CPU)。
-
核心缓存结构:
# 缓存形状:[层索引] → (batch=1, 头数, 序列长度, 单头维度) self.k_cache = [torch.zeros(1, n_heads, 0, d_k, device=device) for _ in range(n_layers)] self.v_cache = [torch.zeros(1, n_heads, 0, d_k, device=device) for _ in range(n_layers)]- 初始序列长度为 0(无历史 Token)。
- Batch 固定为 1(自回归生成通常是单条请求,多 Batch 需调整缓存结构)。
缓存更新(update)
作用:将新 Token 的 K 和 V 拼接到历史缓存中,并截断超出最大长度的部分。
- 输入:
layer_idx(当前解码器层索引)、k(新 Token 的 K,形状(1, n_heads, new_len, d_k))、v(新 Token 的 V,同 K 形状)。 - 核心逻辑:
- 拼接历史与新缓存:
torch.cat([历史K, 新K], dim=2)(dim=2 为 “序列长度” 维度)。 - 缓存截断:若总长度超过
max_seq_len,仅保留最后max_seq_len个 Token 的缓存(避免显存溢出)。 - 返回更新后的完整缓存(供当前层注意力计算使用)。
- 拼接历史与新缓存:
缓存重置(reset)
作用:生成新序列前(如处理完一条 Prompt 后,再处理下一条),清空所有层的缓存,恢复初始状态(序列长度为 0),避免历史缓存污染新序列。
2. patched_attention_forward函数(带 KV 缓存的注意力前向传播)
作用:替换MultiHeadAttention(来自transformer_components)的默认前向方法,在原有注意力逻辑中加入 “复用 KV 缓存” 的逻辑 —— 仅计算新 Token 的 K/V,复用历史 K/V。
输入参数扩展
相比默认forward,新增 2 个关键参数:
kv_cache:KVCache实例(缓存管理器)。layer_idx:当前解码器层索引(明确更新哪一层的缓存)。
核心逻辑(关键修改点)
-
K/V 计算与缓存复用:
# 1. 正常计算新Token的Q、K、V(与默认逻辑一致) q = self.w_q(x).view(...) # Q仅与新Token相关,无需缓存 k = self.w_k(x).view(...) # 新Token的K v = self.w_v(x).view(...) # 新Token的V# 2. 复用历史缓存:若有缓存,更新当前层缓存并获取完整K/V(历史+新) if kv_cache is not None and layer_idx is not None:k, v = kv_cache.update(layer_idx, k, v)- Q(查询):仅依赖新 Token,每次都需重新计算,无需缓存。
- K/V(键 / 值):历史 Token 的 K/V 已存于缓存,仅需添加新 Token 的 K/V,返回 “历史 + 新” 的完整 K/V。
-
因果掩码适配:无缓存时,因果掩码是
(seq_len, seq_len)的下三角矩阵;有缓存时,新 Token 需与 “历史 + 新” 所有 Token 交互,掩码需调整为:seq_len = x.shape[1] # 新Token的长度(通常为1,逐Token生成) total_seq_len = k.shape[2] # 历史+新的总长度 mask = torch.tril(torch.ones(seq_len, total_seq_len, device=x.device)).bool()例如:新 Token 长度为 1,总长度为 10,掩码是
(1,10)的下三角矩阵,确保新 Token 仅关注前 9 个历史 Token + 自身。 -
其余逻辑不变:注意力得分计算、Softmax、Dropout、多头合并等,与默认
MultiHeadAttention完全一致,确保生成质量不受影响。
3. enable_kv_cache_inference函数(为模型启用 KV 缓存)
作用:遍历 GPT 模型的所有解码器层,将每层的MultiHeadAttention的前向方法,替换为patched_attention_forward,并绑定当前层的kv_cache和layer_idx,确保推理时自动复用缓存。
核心逻辑(闭包技巧)
由于循环中直接赋值layer_idx会导致所有层共享同一索引(Python 循环变量延迟绑定问题),需用闭包make_forward 固定每层的layer_idx:
def enable_kv_cache_inference(model: GPTModel, kv_cache: KVCache) -> None:for layer_idx, decoder_layer in enumerate(model.decoder_layers):# 闭包:固定当前layer_idx,避免循环变量污染def make_forward(layer_idx):def forward(x, attention_mask=None, causal_mask=True):# 调用带缓存的注意力前向方法,传入当前层的kv_cache和layer_idxreturn patched_attention_forward(decoder_layer.attn, # 注意力层实例x, attention_mask, causal_mask,kv_cache=kv_cache, layer_idx=layer_idx)return forward# 替换当前层注意力的forward方法decoder_layer.attn.forward = make_forward(layer_idx)
- 每个解码器层的注意力模块,都会绑定 “专属的
layer_idx” 和全局kv_cache,确保更新缓存时不会混淆层索引。
4. generate_with_kv_cache函数(带 KV 缓存的文本生成)
作用:完整的自回归生成流程,整合 KV 缓存初始化、模型适配、Prompt 预热、逐 Token 生成等步骤,是实际使用 KV 缓存的 “入口函数”。
核心流程
-
初始化准备:
- 模型设为评估模式(
model.eval()),移至目标设备。 - 初始化
KVCache:根据模型参数(层数、头数、单头维度)创建缓存实例。 - 启用 KV 缓存:调用
enable_kv_cache_inference,替换模型注意力的前向方法。
- 模型设为评估模式(
-
Prompt 预热(关键步骤):
- 编码 Prompt:将输入文本转为
input_ids和attention_mask。 - 预热缓存:调用
model(input_ids=input_ids, attention_mask=attention_mask),此时模型会计算 Prompt 中所有 Token 的 K/V,并存入缓存(为后续逐 Token 生成复用)。 - 记录初始
generated_ids(Prompt 的 Token ID 序列)。
- 编码 Prompt:将输入文本转为
-
逐 Token 生成(复用缓存):循环
max_gen_len次,每次仅处理 “最后一个 Token”:for _ in range(max_gen_len):# 1. 取最后一个Token作为输入(形状:(1, 1))last_token = torch.tensor([generated_ids[-1]], dtype=torch.long).unsqueeze(0).to(device)last_mask = torch.tensor([1], dtype=torch.long).unsqueeze(0).to(device)# 2. 前向传播:仅计算新Token的Q,复用缓存中的K/Voutputs = model(input_ids=last_token, attention_mask=last_mask)logits = outputs["logits"][:, -1, :] # 取新Token的预测Logits# 3. Top-K采样+温度缩放(与默认generate逻辑一致)if top_k > 0: ... # 过滤低概率Tokenif temperature != 1.0: ... # 调整生成随机性next_token_id = torch.multinomial(probs, num_samples=1).item()# 4. 终止条件:生成</s>则停止if next_token_id == tokenizer.vocab["</s>"]:break# 5. 更新生成序列generated_ids.append(next_token_id)current_len += 1- 关键优化:每次输入仅 1 个 Token,模型仅计算该 Token 的 Q,复用缓存中所有历史 Token 的 K/V,计算量从 O (n²) 降至 O (1)。
-
结果解码:将
generated_ids(Prompt + 生成的 Token)通过分词器decode为文本,返回最终结果。
核心优势与应用场景
- 速度大幅提升:长序列生成时(如生成 500Token),无缓存需重复计算 500 次 O (n²),有缓存仅需 1 次 O (n²)(Prompt 预热)+ 499 次 O (1),速度提升 2-3 倍。
- 显存占用可控:缓存仅存储 K/V(每个 Token 的 K/V 大小为
n_layers * n_heads * d_k),相比重复计算的中间变量,显存占用更低(尤其长序列时)。 - 生成质量不变:仅优化计算效率,注意力逻辑、采样策略与无缓存版本完全一致,生成文本的连贯性、准确性不受影响。
适用场景:实时文本生成任务,如对话机器人、实时续写、AI 写作等对延迟敏感的场景。
总结
该代码通过 “KVCache存储缓存→patched_attention_forward复用缓存→enable_kv_cache_inference适配模型→generate_with_kv_cache完整生成” 的链路,实现了自回归生成的高效加速。核心是 “避免历史 Token 的 K/V 重复计算”,在不损失生成质量的前提下,大幅降低推理延迟和计算开销,是 GPT 模型工程化部署的关键优化手段。
12. inference/api_service.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import torch
import json
import os
from typing import Dict, Optional
from gpt_model import GPTModel
from byte_level_bpe import ByteLevelBPETokenizer
from kv_cache_infer import generate_with_kv_cache
from quantizer import load_quantized_model# 初始化FastAPI
app = FastAPI(title="GPT Inference Service")# 全局变量:模型和分词器
model = None
tokenizer = None
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")# 请求体模型
class GenerateRequest(BaseModel):prefix: str # 输入前缀(提示词)max_gen_len: int = 100 # 最大生成长度top_k: int = 50 # Top-K采样temperature: float = 0.8 # 温度系数# 加载模型和分词器
def load_model_and_tokenizer():global model, tokenizer# 加载分词器tokenizer_dir = "models/bpe_tokenizer"tokenizer = ByteLevelBPETokenizer.from_pretrained(tokenizer_dir)print(f"分词器加载完成:{tokenizer_dir}")# 模型配置model_config = {"vocab_size": len(tokenizer.vocab),"d_model": 768,"n_layers": 12,"n_heads": 12,"d_ff": 3072,"max_seq_len": 2048}# 优先加载量化模型(如果存在)model_path = "models/sft/sft_best_int8.pth"if not os.path.exists(model_path):model_path = "models/sft/sft_best.pth" # 否则加载原始模型if "int8" in model_path:# 加载INT8量化模型model = load_quantized_model(model_config=model_config,model_path=model_path,device=device)else:# 加载原始模型model = GPTModel(**model_config).to(device)model.load_state_dict(torch.load(model_path, map_location=device)["model_state_dict"])model.eval()print(f"模型加载完成:{model_path},设备:{device}")# 启动时加载模型
@app.on_event("startup")
def startup_event():load_model_and_tokenizer()# 健康检查接口
@app.get("/health")
def health_check() -> Dict[str, str]:if model is not None and tokenizer is not None:return {"status": "healthy", "model_status": "loaded", "device": str(device)}else:return {"status": "unhealthy", "model_status": "not_loaded"}# 生成接口
@app.post("/generate")
def generate_text(request: GenerateRequest) -> Dict[str, str]:if model is None or tokenizer is None:raise HTTPException(status_code=500, detail="模型未加载")try:# 调用带KV缓存的生成函数generated_text = generate_with_kv_cache(model=model,tokenizer=tokenizer,prompt=request.prefix,max_gen_len=request.max_gen_len,top_k=request.top_k,temperature=request.temperature,device=device)return {"prefix": request.prefix, "generated_text": generated_text}except Exception as e:raise HTTPException(status_code=500, detail=f"生成失败:{str(e)}")# 主函数(本地测试用)
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000, workers=1)
该代码实现了一个基于 FastAPI 的 GPT 模型推理服务,核心功能是通过 HTTP 接口接收用户输入的提示词(Prompt),调用优化后的 GPT 模型生成文本响应。服务整合了模型量化加载、KV 缓存加速推理等优化手段,同时提供健康检查接口,是模型从训练到实际应用的 “最后一公里” 工程化实现。
核心功能概述
代码的核心目标是:将训练好的 GPT 模型(支持原始模型和 INT8 量化模型)封装为可通过网络调用的 API 服务,支持用户通过 HTTP 请求输入提示词,获取模型生成的文本(如对话回复、内容续写等),同时确保服务稳定、高效、易用。
核心组件详解
1. 服务初始化与依赖配置
-
FastAPI 应用初始化:
app = FastAPI(title="GPT Inference Service")定义了一个 FastAPI 应用,标题为 “GPT 推理服务”,用于处理 HTTP 请求和路由管理。 -
全局变量定义:
model/tokenizer:存储加载的模型和分词器(全局变量确保多请求共享,避免重复加载)。device:自动选择推理设备(优先 GPU,无 GPU 则用 CPU),确保模型在高效设备上运行。
2. 请求体模型(GenerateRequest)
基于 Pydantic 定义的请求数据结构,用于规范用户调用/generate接口时的输入格式,确保参数合法性(自动校验类型和取值)。
- 字段说明:
prefix(必填):用户输入的提示词(如 “介绍一下人工智能”),是模型生成的基础。max_gen_len(可选,默认 100):生成文本的最大长度(避免生成过长内容)。top_k(可选,默认 50):Top-K 采样参数(仅保留概率最高的 50 个 Token 进行采样,控制生成多样性)。temperature(可选,默认 0.8):温度系数(值越高生成越随机,越低越确定)。
3. 模型与分词器加载(load_model_and_tokenizer函数)
作用:服务启动时加载分词器和模型(支持原始模型和 INT8 量化模型),为后续推理做准备。
-
分词器加载:从
models/bpe_tokenizer目录加载ByteLevelBPETokenizer(与训练时使用的分词器一致,确保编码 / 解码兼容)。 -
模型配置与加载:
- 模型配置:通过字典指定
vocab_size(词表大小)、d_model(隐藏层维度)等参数,与训练时的模型结构保持一致。 - 模型路径选择:优先加载 INT8 量化模型(
models/sft/sft_best_int8.pth),若不存在则加载原始 SFT 模型(models/sft/sft_best.pth)。量化模型的优势是显存占用低、推理速度快,适合部署场景。 - 模型加载逻辑:
- 量化模型:调用
load_quantized_model(来自quantizer模块)加载,自动启用 INT8 推理。 - 原始模型:初始化
GPTModel,加载权重后移至目标设备。
- 量化模型:调用
- 模型状态:加载后设置为
eval()模式(禁用 Dropout 等训练层,确保推理结果稳定)。
- 模型配置:通过字典指定
4. 服务启动钩子(startup_event函数)
通过@app.on_event("startup")装饰器,指定服务启动时自动执行load_model_and_tokenizer函数,完成模型和分词器的加载。
- 作用:避免在第一次请求时才加载模型(减少首次请求延迟),确保服务启动后即可立即处理请求。
5. 健康检查接口(/health)
作用:提供简单的服务状态查询接口,用于监控服务是否正常运行(如部署时的监控系统调用)。
- 返回内容:
- 若模型和分词器加载成功:
{"status": "healthy", "model_status": "loaded", "device": "cuda/cpu"}。 - 若未加载成功:
{"status": "unhealthy", "model_status": "not_loaded"}。
- 若模型和分词器加载成功:
6. 文本生成接口(/generate)
作用:接收用户的提示词和生成参数,调用模型生成文本并返回结果,是服务的核心功能接口。
- 流程:
- 参数校验:自动校验输入是否符合
GenerateRequest格式(如max_gen_len是否为正整数)。 - 模型状态检查:若模型未加载,抛出 500 错误(
HTTPException)。 - 文本生成:调用
generate_with_kv_cache(来自kv_cache_infer模块)生成文本,该函数使用 KV 缓存加速推理(相比普通生成快 2-3 倍),参数包括用户输入的prefix、max_gen_len等。 - 结果返回:返回字典
{"prefix": 用户输入, "generated_text": 模型生成的文本}。 - 异常处理:若生成过程中出错(如输入过长、模型内部错误),捕获异常并返回 500 错误,附带具体原因。
- 参数校验:自动校验输入是否符合
7. 服务启动入口(主函数)
if __name__ == "__main__":import uvicornuvicorn.run(app, host="0.0.0.0", port=8000, workers=1)
- 作用:本地测试时启动服务,使用
uvicorn(FastAPI 的推荐服务器)运行应用。 - 参数说明:
host="0.0.0.0":允许所有网络接口访问(支持远程调用)。port=8000:服务端口(可自定义)。workers=1:单工作进程(因 PyTorch 模型不支持多进程共享,多 worker 可能导致显存错误)。
工程化设计亮点
-
优化推理效率:
- 优先使用 INT8 量化模型(减少 75% 显存占用,提升推理速度)。
- 集成 KV 缓存加速(
generate_with_kv_cache),长文本生成时速度提升 2-3 倍。
-
服务稳定性:
- 启动时预加载模型,避免首次请求延迟。
- 健康检查接口便于监控服务状态。
- 异常捕获机制(
try-except)确保单条请求失败不影响整个服务。
-
易用性:
- 基于 FastAPI 自动生成交互式文档(访问
http://localhost:8000/docs可直接测试接口)。 - Pydantic 请求模型自动校验输入,减少错误请求。
- 基于 FastAPI 自动生成交互式文档(访问
-
兼容性:
- 同时支持原始模型和量化模型,根据文件存在性自动选择。
- 自动适配 GPU/CPU 设备,无需手动配置。
使用场景
该服务可直接部署为文本生成 API,支持多种场景:
- 对话机器人:用户输入问题(
prefix),服务返回回答。 - 内容续写:输入文章开头,服务续写后续内容。
- 指令跟随:输入指令(如 “写一首关于春天的诗”),服务生成符合指令的文本。
总结
该代码是 GPT 模型从 “离线训练” 到 “在线服务” 的桥梁,通过 FastAPI 封装模型推理功能,整合量化和 KV 缓存优化,提供稳定、高效、易用的文本生成 API。核心价值在于降低模型应用门槛,让训练好的模型可通过简单的 HTTP 请求被调用,适用于生产环境中的各类文本生成场景。
13. inference/load_test.py
from locust import HttpUser, task, between
import json
import random# 测试用例:随机选择提示词
PROMPTS = ["解释什么是人工智能","写一首关于春天的诗","如何学习Python编程","简述相对论的核心思想","推荐一部好看的科幻电影并说明理由","什么是区块链技术","如何提高睡眠质量","解释量子计算的基本原理"
]class GPTInferenceUser(HttpUser):"""Locust压测用户类"""wait_time = between(1, 3) # 每个用户请求间隔1-3秒@task(1)def test_generate(self):"""测试生成接口"""# 随机选择提示词和参数prompt = random.choice(PROMPTS)max_gen_len = random.randint(50, 150)top_k = random.randint(30, 100)temperature = round(random.uniform(0.5, 1.0), 1)# 发送请求self.client.post("/generate",data=json.dumps({"prefix": prompt,"max_gen_len": max_gen_len,"top_k": top_k,"temperature": temperature}),headers={"Content-Type": "application/json"})@task(0.1)def test_health(self):"""测试健康检查接口(低优先级)"""self.client.get("/health")# 压测说明:
# 执行命令:locust -f load_test.py -u 50 -r 10 -t 10m --host http://localhost:8000
# 参数说明:
# -u 50:并发用户数50
# -r 10:每秒新增10个用户
# -t 10m:压测持续10分钟
该代码是一个基于 Locust 的 GPT 推理服务压测脚本,核心功能是模拟多并发用户对 FastAPI 封装的 GPT 推理服务(含/generate生成接口和/health健康检查接口)发起请求,测试服务在高并发场景下的性能(如响应时间、吞吐量、稳定性),为服务部署优化提供数据支撑。以下是各组件功能的详细拆解:
核心依赖与定位
- Locust:轻量级 Python 压测框架,通过 “模拟用户行为” 实现并发压力测试,支持自定义请求逻辑、并发用户数、请求间隔等参数,输出直观的性能指标(如 RPS、平均响应时间、失败率)。
- 测试目标:针对前文实现的 “GPT 推理 FastAPI 服务”,重点验证核心业务接口(
/generate)的抗并发能力,同时兼顾基础监控接口(/health)的稳定性。
核心组件详解
1. 测试用例池(PROMPTS列表)
作用:提供多样化的测试输入(提示词),模拟真实用户的不同请求场景,避免单一输入导致的测试结果偏差。
- 内容设计:包含 8 个不同领域的提示词,覆盖 “概念解释”(如 “解释什么是人工智能”)、“创作”(如 “写一首关于春天的诗”)、“教程”(如 “如何学习 Python 编程”)、“推荐”(如 “推荐一部好看的科幻电影”)等典型使用场景,与 GPT 推理服务的实际业务需求匹配。
- 使用逻辑:测试时随机选择列表中的提示词(
random.choice(PROMPTS)),模拟不同用户的个性化输入。
2. 压测用户类(GPTInferenceUser)
继承 Locust 的HttpUser类,是 “模拟用户” 的核心载体,定义了用户的行为逻辑(发起哪些请求、请求间隔、请求权重)。
(1)请求间隔配置(wait_time)
wait_time = between(1, 3)
- 作用:控制单个模拟用户在两次请求之间的等待时间,随机在 1~3 秒之间取值。
- 设计目的:模拟真实用户的操作节奏(不会连续无间隔发起请求),避免 “瞬时流量峰值” 导致的非真实性能数据,让压测结果更贴近生产环境。
(2)核心业务压测任务(test_generate方法)
用@task(1)装饰,代表 “生成接口测试” 是高优先级任务(权重为 1,占所有任务的绝大部分比例),对应 FastAPI 服务的/generate接口(文本生成核心功能)。
任务逻辑拆解:
-
随机请求参数:
- 提示词:
random.choice(PROMPTS)→ 从测试用例池选一个提示词,模拟不同用户需求。 - 最大生成长度:
random.randint(50, 150)→ 随机 50~150 个 Token,覆盖短 / 中长度生成场景。 - Top-K 采样:
random.randint(30, 100)→ 随机 30~100 的候选 Token 数,模拟用户对 “生成多样性” 的不同配置。 - 温度系数:
round(random.uniform(0.5, 1.0), 1)→ 随机 0.5~1.0 的温度(保留 1 位小数),模拟 “确定性”(低温度)到 “多样性”(高温度)的不同需求。
- 提示词:
-
发送 POST 请求:
- 目标接口:
/generate(GPT 推理服务的核心业务接口)。 - 请求数据:用
json.dumps将参数转为 JSON 格式,与 FastAPI 的GenerateRequest模型字段对应(确保参数格式合法)。 - 请求头:
{"Content-Type": "application/json"}→ 声明请求体为 JSON 格式,符合 HTTP 协议规范。 - 底层调用:
self.client.post是 LocustHttpUser封装的 HTTP 客户端,自动记录请求的响应时间、状态码等指标,用于后续性能统计。
- 目标接口:
(3)基础监控压测任务(test_health方法)
用@task(0.1)装饰,代表 “健康检查接口测试” 是低优先级任务(权重为 0.1,仅占所有任务的约 9%),对应 FastAPI 服务的/health接口(服务状态监控)。
任务逻辑拆解:
- 发送 GET 请求:
self.client.get("/health")→ 无参数,仅请求服务健康状态。 - 权重设计:权重 0.1 远低于
test_generate的 1,模拟生产环境中 “健康检查请求远少于业务请求” 的真实比例(如监控系统每分钟 1 次健康检查,而业务请求每秒数十次)。
3. 压测执行说明(注释部分)
提供完整的压测启动命令和参数解释,降低使用门槛,确保测试者能快速执行压测。
命令与参数解析:
locust -f load_test.py -u 50 -r 10 -t 10m --host http://localhost:8000
-f load_test.py:指定压测脚本路径(当前脚本)。-u 50:设置并发用户数为 50(同时模拟 50 个用户发起请求)。-r 10:设置用户生成速率为每秒 10 个(从 0 逐步增加到 50 个并发用户,避免瞬时流量冲击服务,模拟真实流量上升过程)。-t 10m:设置压测持续时间为 10 分钟(足够长时间观察服务在稳定高并发下的性能表现,如是否出现内存泄漏、响应时间逐渐变长等问题)。--host http://localhost:8000:指定目标服务地址(GPT 推理 FastAPI 服务的部署地址,本地测试用localhost:8000,远程测试需改为服务的公网 IP / 域名)。
压测输出与核心价值
1. 压测过程中可观察的指标
执行脚本后,Locust 会启动 Web UI(默认访问http://localhost:8089),实时展示以下关键性能指标:
- RPS(Requests Per Second):每秒请求数(吞吐量)→ 衡量服务的处理能力。
- Average Response Time:平均响应时间 → 衡量服务的响应速度(
/generate接口因模型计算耗时,通常比/health慢一个数量级)。 - Failure Rate:请求失败率 → 衡量服务稳定性(正常情况下应为 0,失败可能是服务过载、内存不足等导致)。
- Pending Requests:等待处理的请求数 → 若持续增加,说明服务处理能力不足,存在请求堆积。
2. 核心价值
- 验证服务性能上限:通过逐步增加并发用户数,找到服务的 “瓶颈点”(如并发 50 时 RPS 稳定,并发 60 时失败率上升,说明服务上限约 50 并发)。
- 提前发现问题:在生产部署前,暴露服务在高并发下的潜在问题(如 GPU 显存不足、CPU 占用过高、接口超时等)。
- 指导优化方向:若
/generate接口响应时间过长,可针对性优化(如升级 GPU、启用更大 KV 缓存、进一步量化模型)。
总结
该代码是一套高仿真、可落地的 GPT 推理服务压测方案:
- 行为仿真:通过随机提示词、随机参数、真实请求间隔,模拟真实用户的多样化使用场景。
- 权重合理:核心业务接口与监控接口的权重配比符合生产环境实际情况。
- 易用性高:提供完整的执行命令和参数解释,新手可快速上手。
通过该脚本的压测结果,可清晰评估 GPT 推理服务的性能表现,为服务的部署规模(如需要多少 GPU 节点)、资源配置(如 GPU 型号、内存大小)和优化方向提供数据支撑。
14. docker/Dockerfile
# 基础镜像:NVIDIA CUDA 11.7 + Ubuntu 20.04
FROM nvidia/cuda:11.7.1-cudnn8-runtime-ubuntu20.04# 设置环境变量
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1# 安装系统依赖
RUN apt-get update && apt-get install -y \python3.9 \python3.9-dev \python3-pip \&& rm -rf /var/lib/apt/lists/*# 设置Python默认版本
RUN ln -s /usr/bin/python3.9 /usr/bin/python && \ln -s /usr/bin/pip3 /usr/bin/pip# 安装Python依赖(国内源加速)
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple# 创建工作目录
WORKDIR /app# 复制项目文件
COPY . /app/# 暴露API端口
EXPOSE 8000# 启动命令:使用UVicorn运行FastAPI服务
CMD ["uvicorn", "inference.api_service:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
该代码是一个Dockerfile,用于构建一个可部署 GPT 推理服务的容器镜像。它通过定义基础环境、依赖安装、项目配置和启动命令,将 GPT 推理服务(包括模型、代码、运行环境)打包成一个标准化、可移植的容器,确保服务在不同环境(开发机、服务器、云平台)中能一致运行。以下是各部分功能的详细拆解:
核心目标
构建一个支持GPU 加速的 GPT 推理服务容器,包含:
- 兼容的 CUDA 环境(支持 PyTorch GPU 计算);
- 完整的 Python 运行时和项目依赖;
- 推理服务代码和模型(通过后续挂载或复制);
- 自动启动的 FastAPI 服务(通过 UVicorn 运行)。
逐行功能解析
1. 基础镜像选择
FROM nvidia/cuda:11.7.1-cudnn8-runtime-ubuntu20.04
- 作用:指定容器的基础镜像,决定底层操作系统和 GPU 支持环境。
- 细节:
nvidia/cuda:NVIDIA 官方提供的包含 CUDA 工具链的镜像,确保容器能利用 GPU 加速(关键,因为 GPT 模型推理依赖 GPU 提升速度)。11.7.1-cudnn8:CUDA 版本 11.7.1 + cuDNN 8(深度学习常用加速库),与主流 PyTorch 版本(如 1.13.x)兼容。runtime-ubuntu20.04:基于 Ubuntu 20.04 系统的 “运行时” 版本(仅包含运行 CUDA 程序的必要库,不包含编译工具,体积更小,适合部署)。
2. 环境变量配置
ENV DEBIAN_FRONTEND=noninteractive
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
- 作用:设置容器运行时的环境变量,优化构建和运行体验。
- 细节:
DEBIAN_FRONTEND=noninteractive:禁用 Ubuntu 的交互式配置(如安装时弹出时区选择),确保apt-get install等命令能自动化执行,避免构建中断。PYTHONDONTWRITEBYTECODE=1:禁止 Python 生成.pyc文件(字节码缓存),减少容器内的文件冗余,节省磁盘空间。PYTHONUNBUFFERED=1:禁用 Python 输出缓冲,确保程序日志(如print、错误信息)能实时输出到容器控制台,方便调试和日志收集(否则日志可能堆积在缓冲区,无法及时查看)。
3. 安装系统依赖
RUN apt-get update && apt-get install -y \python3.9 \python3.9-dev \python3-pip \&& rm -rf /var/lib/apt/lists/*
- 作用:安装容器运行所需的系统级工具(主要是 Python 环境)。
- 细节:
apt-get update:更新 Ubuntu 软件源索引,确保能安装最新版本的包。- 安装包:
python3.9:Python 3.9 解释器(项目代码的运行环境)。python3.9-dev:Python 开发库(部分 Python 依赖包编译时需要,如pycocotools)。python3-pip:Python 包管理工具(用于安装requirements.txt中的依赖)。
rm -rf /var/lib/apt/lists/*:清理 apt 缓存文件,减少镜像体积(缓存文件对运行无意义,仅占空间)。
4. 配置 Python 默认版本
RUN ln -s /usr/bin/python3.9 /usr/bin/python && \ln -s /usr/bin/pip3 /usr/bin/pip
- 作用:创建符号链接,统一 Python 命令的使用方式。
- 细节:
- Ubuntu 中默认的
python可能指向 Python 2.x(已淘汰),python3指向 Python 3.x。通过ln -s将/usr/bin/python链接到python3.9,/usr/bin/pip链接到pip3,确保在容器内执行python和pip命令时,默认使用 Python 3.9 版本,避免版本混淆(如安装依赖时用错 pip 版本)。
- Ubuntu 中默认的
5. 安装 Python 依赖
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt -i https://pypi.tuna.tsinghua.edu.cn/simple
- 作用:安装项目所需的 Python 库(如 FastAPI、PyTorch、Transformers 等)。
- 细节:
COPY requirements.txt /app/:先将本地的requirements.txt(记录项目依赖的文件)复制到容器内的/app目录(后续工作目录)。pip install参数:--no-cache-dir:不缓存下载的包文件,减少镜像体积。-r /app/requirements.txt:从指定文件读取依赖列表并安装。-i https://pypi.tuna.tsinghua.edu.cn/simple:使用清华 PyPI 镜像源加速下载(国内网络环境下比默认源快很多,避免安装超时)。
6. 设置工作目录
WORKDIR /app
- 作用:指定容器内的 “工作目录”,后续的
COPY、RUN、CMD等命令都会以/app为默认路径,简化路径写法(如后续复制文件时无需写全路径)。
7. 复制项目文件
COPY . /app/
- 作用:将本地项目根目录下的所有文件(代码、配置、模型等)复制到容器内的
/app目录(与工作目录一致),确保容器内包含完整的项目代码和资源(如inference/api_service.py、模型配置文件等)。
8. 暴露服务端口
EXPOSE 8000
- 作用:声明容器运行时会监听的端口(8000 端口,与 FastAPI 服务的默认端口一致)。
- 注意:
EXPOSE仅为 “声明”(方便开发者了解端口用途),不会自动映射到宿主机。实际部署时需通过docker run -p 宿主机端口:8000手动映射。
9. 启动服务命令
CMD ["uvicorn", "inference.api_service:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
- 作用:定义容器启动时自动执行的命令,启动 GPT 推理服务。
- 细节:
uvicorn:FastAPI 推荐的 ASGI 服务器(用于运行异步 Python Web 应用)。inference.api_service:app:指定 FastAPI 应用的位置(inference/api_service.py文件中的app实例)。--host 0.0.0.0:绑定到所有网络接口(允许容器外部访问,若设为127.0.0.1则仅容器内部可访问)。--port 8000:服务监听端口(与EXPOSE 8000对应)。--workers 4:启动 4 个工作进程处理请求(根据 CPU 核心数调整,过多可能导致资源竞争)。
构建与运行流程
- 构建镜像:在 Dockerfile 所在目录执行
docker build -t gpt-inference:v1 .,生成名为gpt-inference:v1的镜像。 - 运行容器:执行
docker run -d --gpus all -p 8000:8000 gpt-inference:v1,启动容器并映射宿主机 8000 端口到容器 8000 端口,--gpus all允许容器使用宿主机的所有 GPU(关键,启用 GPU 加速)。 - 访问服务:通过
http://宿主机IP:8000访问 GPT 推理服务(如/generate接口)。
核心价值
- 环境一致性:无论在开发机、测试服务器还是云平台,容器内的环境(CUDA 版本、Python 版本、依赖库)完全一致,避免 “本地能跑,部署就崩” 的问题。
- GPU 加速支持:基于 NVIDIA CUDA 镜像构建,确保 PyTorch 能调用 GPU 进行推理(比 CPU 快 10-100 倍)。
- 部署便捷性:通过一条
docker run命令即可启动服务,无需手动配置复杂依赖,适合大规模集群部署。 - 资源隔离:容器内的服务运行在独立环境中,不会与宿主机或其他容器的软件冲突。
总结
该 Dockerfile 是 GPT 推理服务的 “部署蓝图”,通过标准化的步骤构建了包含 GPU 支持、Python 环境、项目代码和启动逻辑的容器镜像。它解决了 AI 模型部署中最常见的 “环境依赖复杂”“GPU 配置繁琐”“跨平台兼容性差” 等问题,是生产环境部署 GPT 推理服务的关键工具。
15. docker/docker-compose.yaml
version: '3.8'services:gpt-inference-service:build:context: ../ # 项目根目录dockerfile: docker/Dockerfileimage: gpt-inference:v1.0container_name: gpt-inference-servicerestart: alwaysports:- "8000:8000" # 映射端口到宿主机volumes:- ../models:/app/models # 挂载模型目录(宿主机:容器内)- ../data:/app/data # 挂载数据目录(可选)deploy:resources:reservations:devices:- driver: nvidiacount: all # 使用所有可用GPUcapabilities: [gpu]environment:- NVIDIA_VISIBLE_DEVICES=all # 可见GPU- LC_ALL=C.UTF-8- LANG=C.UTF-8
该代码是一个docker-compose.yaml 配置文件,用于定义和管理 GPT 推理服务的容器化部署。它基于 Docker Compose 工具,通过声明式配置整合容器构建、资源分配、端口映射、数据挂载等部署细节,实现 “一键启动” GPT 推理服务的完整环境(包括 GPU 资源调度、模型文件挂载、服务自动重启等)。以下是各部分功能的详细拆解:
核心目标
通过 Docker Compose 的编排能力,简化 GPT 推理服务的部署流程,解决 “手动配置容器参数繁琐、GPU 资源调度复杂、模型文件更新麻烦” 等问题,确保服务在单节点环境中能快速、稳定地启动和运行。
配置版本与结构
version: '3.8'
- 作用:指定 Docker Compose 的配置版本(3.8),该版本支持 GPU 资源分配、命名卷等高级特性,适配现代 Docker 环境(如 Docker Engine 19.03+)。
服务配置(services)
核心部分,定义了名为gpt-inference-service的服务(即 GPT 推理服务容器),包含构建、运行、资源等所有配置。
1. 镜像构建配置(build)
build:context: ../ # 项目根目录dockerfile: docker/Dockerfile
- 作用:指定如何构建服务所需的 Docker 镜像。
- 细节:
context: ../:构建镜像的 “上下文目录” 为项目根目录(假设docker-compose.yml位于docker/目录下,../指向根目录)。上下文目录是 Docker 构建时能访问的文件范围,需包含项目所有代码、requirements.txt等构建依赖。dockerfile: docker/Dockerfile:指定 Dockerfile 的路径(相对于上下文目录),即根目录下的docker/Dockerfile(与前文的 Dockerfile 对应),确保使用正确的构建脚本。
2. 镜像名称(image)
image: gpt-inference:v1.0
- 作用:指定构建出的镜像的名称和标签(
gpt-inference为镜像名,v1.0为版本标签)。 - 优势:避免每次构建生成随机镜像名,便于后续管理(如版本回滚、镜像推送)。
3. 容器名称(container_name)
container_name: gpt-inference-service
- 作用:为容器指定固定名称(
gpt-inference-service),替代 Docker 默认生成的随机名称(如gpt-inference_service_1)。 - 优势:方便通过
docker ps、docker logs等命令快速定位和操作容器(如docker logs gpt-inference-service查看日志)。
4. 重启策略(restart)
restart: always
- 作用:定义容器的自动重启规则。
- 细节:
always表示 “无论容器因何种原因停止(包括手动停止后宿主机重启、容器崩溃),都会自动重启”,确保服务在意外中断后能快速恢复,提升可用性(适合生产环境)。 - 其他可选值:
no(不自动重启)、on-failure(仅失败时重启)、unless-stopped(除非手动停止,否则重启)。
5. 端口映射(ports)
ports:- "8000:8000" # 映射端口到宿主机
- 作用:将宿主机的端口与容器内的端口绑定,允许外部通过宿主机端口访问容器内的服务。
- 格式:
"宿主机端口:容器内端口",这里将宿主机的 8000 端口映射到容器内的 8000 端口(与 FastAPI 服务的监听端口一致),因此外部可通过http://宿主机IP:8000访问 GPT 推理接口。
6. 数据卷挂载(volumes)
volumes:- ../models:/app/models # 挂载模型目录(宿主机:容器内)- ../data:/app/data # 挂载数据目录(可选)
- 作用:通过 “宿主机目录→容器目录” 的挂载,实现宿主机与容器之间的文件共享,解决 “模型 / 数据文件无需打包进镜像、可动态更新” 的问题。
- 细节:
../models:/app/models:将宿主机项目根目录下的models目录(存放 GPT 模型权重、分词器文件)挂载到容器内的/app/models目录(与服务代码中加载模型的路径一致)。- 优势:模型文件通常体积大(数 GB),无需打包进镜像(减小镜像体积);更新模型时只需替换宿主机
models目录下的文件,无需重新构建镜像,直接重启容器即可生效。
- 优势:模型文件通常体积大(数 GB),无需打包进镜像(减小镜像体积);更新模型时只需替换宿主机
../data:/app/data:可选配置,挂载数据目录(如测试数据、缓存文件),方便服务读取外部数据。
7. GPU 资源配置(deploy.resources)
deploy:resources:reservations:devices:- driver: nvidiacount: all # 使用所有可用GPUcapabilities: [gpu]
- 作用:指定容器需要使用的 GPU 资源,确保 GPT 推理服务能调用宿主机的 GPU 进行加速(核心配置,否则服务会默认使用 CPU,推理速度极慢)。
- 细节:
driver: nvidia:声明使用 NVIDIA GPU 驱动(依赖 NVIDIA Docker Runtime,需提前在宿主机安装)。count: all:表示容器使用宿主机上所有可用的 GPU(也可指定数量,如count: 1使用 1 张 GPU)。capabilities: [gpu]:声明容器需要 GPU 的基础计算能力(必不可少的配置,否则无法启用 GPU)。
8. 环境变量(environment)
environment:- NVIDIA_VISIBLE_DEVICES=all # 可见GPU- LC_ALL=C.UTF-8- LANG=C.UTF-8
- 作用:向容器内注入环境变量,配置运行时参数。
- 细节:
NVIDIA_VISIBLE_DEVICES=all:与 GPU 资源配置呼应,明确告诉容器 “可以访问所有 GPU”(与count: all配合,确保 GPU 可见性)。LC_ALL=C.UTF-8和LANG=C.UTF-8:设置容器内的语言环境为 UTF-8,避免中文等非 ASCII 字符在日志、输出中出现乱码(如模型生成的中文文本显示异常)。
使用流程
- 准备环境:宿主机安装 Docker、Docker Compose、NVIDIA Docker Runtime(确保 GPU 可用)。
- 启动服务:在
docker-compose.yml所在目录执行docker-compose up -d,后台启动服务(-d表示 detached 模式)。 - 查看状态:执行
docker-compose ps查看服务是否正常运行;docker-compose logs -f实时查看服务日志。 - 停止服务:执行
docker-compose down停止并删除容器(数据卷挂载的文件不会删除)。
核心价值
- 部署简化:通过一个配置文件整合所有部署细节(构建、端口、GPU、挂载等),无需手动输入冗长的
docker run命令,实现 “一键部署”。 - 资源保障:明确配置 GPU 资源,确保服务能利用 GPU 加速(对 GPT 推理至关重要,否则 CPU 推理速度无法满足需求)。
- 灵活性高:模型目录通过数据卷挂载,支持动态更新模型(无需重新构建镜像),适应模型迭代场景。
- 稳定性强:
restart: always确保服务意外中断后自动恢复,减少人工干预。 - 可扩展性:若未来需要增加其他服务(如日志收集、负载均衡),可直接在
services中添加新配置,实现多服务协同部署。
总结
该 docker-compose.yml 是 GPT 推理服务的 “部署指挥中心”,通过声明式配置解决了容器化部署中的核心问题(环境一致性、GPU 调度、模型管理、服务可用性)。它让技术人员无需深入了解 Docker 底层细节,即可快速部署一个高效、稳定的 GPT 推理服务,是从开发环境到生产环境的关键桥梁。
四、训练数据集示例
- 预训练数据:将文本文件(每行 1 条样本)放入
data/pretrain/(如train.txt、val.txt)。
data/pretrain/train.txt(训练集,示例 50 条,实际可扩展至数万 / 数百万条)
人工智能是指机器模拟人类智能的技术,涵盖机器学习、自然语言处理等多个领域。
Transformer架构通过自注意力机制实现了并行化的序列建模,大幅提升了模型的训练效率。
GPT模型是一种基于Transformer解码器的自回归语言模型,擅长生成连贯的自然语言文本。
地球是太阳系中的第三颗行星,是人类目前已知唯一存在生命的天体。
光合作用是植物利用阳光将二氧化碳和水转化为有机物和氧气的过程。
唐朝是中国历史上最强盛的朝代之一,在文化、经济、军事等方面都有突出成就。
Python是一种简洁易读的编程语言,广泛应用于数据分析、人工智能和Web开发。
相对论是爱因斯坦提出的物理学理论,分为狭义相对论和广义相对论。
互联网是全球范围内的计算机网络互联系统,彻底改变了信息传播和交流的方式。
蜜蜂通过舞蹈传递花蜜的位置信息,这种行为被称为“ waggle dance”。
量子计算利用量子比特的叠加和纠缠特性,有望在某些任务上超越传统计算机。
唐诗宋词是中国古代文学的瑰宝,李白、杜甫、苏轼等诗人的作品流传千古。
温室效应是指大气中的温室气体(如二氧化碳)吸收并重新辐射红外辐射,导致地球升温的现象。
机器学习的监督学习模式需要标注数据来训练模型,无监督学习则从无标注数据中发现规律。
太阳系有八大行星,按距离太阳由近到远依次是水星、金星、地球、火星、木星、土星、天王星、海王星。
DNA是脱氧核糖核酸的缩写,是生物的遗传物质,包含了生物体发育和运作的全部信息。
亚马逊雨林是世界上最大的热带雨林,被称为“地球之肺”,对全球气候有重要影响。
计算机病毒是一种恶意软件,能自我复制并破坏计算机系统,与生物病毒有相似的传播特性。
复利是指利息在每个计息期结束后加入本金,下一期的利息基于新的本金计算,是财富增长的重要机制。
元宇宙是一个融合了虚拟与现实的数字化空间,用户可以通过虚拟身份在其中社交、工作、娱乐。
(可继续扩展至更多领域,如:经济、艺术、体育、地理等,每条保持独立且语义完整)
data/pretrain/val.txt(验证集,示例 10 条)
自然语言处理是人工智能的一个分支,专注于让计算机理解和生成人类语言。
比特币是一种去中心化的加密货币,基于区块链技术,2009年由中本聪提出。
牛顿三大运动定律是经典力学的基础,描述了物体的运动与受力之间的关系。
撒哈拉沙漠是世界上最大的热带沙漠,面积约900万平方公里,气候极端干旱。
蛋白质是生命的物质基础,由氨基酸组成,执行着生物体的各种功能,如催化、结构支撑等。
文艺复兴是14世纪至17世纪欧洲的一场思想文化运动,推动了科学、艺术和文学的繁荣。
深度学习是机器学习的一个子领域,通过多层神经网络学习复杂的特征表示,在图像识别、语音识别等任务上表现出色。
水循环是指地球上的水在大气、陆地和海洋之间通过蒸发、降水、径流等过程不断循环的现象。
诺贝尔奖是根据瑞典化学家阿尔弗雷德·诺贝尔的遗嘱设立的,涵盖物理、化学、生理或医学、文学、和平五个领域。
虚拟现实技术通过计算机生成的虚拟环境,让用户获得沉浸式的体验,广泛应用于游戏、教育和培训等领域。
微调数据:SFT 数据(sft_data.json)和 RLHF 偏好数据(rlhf_pairwise.json)放入 data/sft/,格式参考:
data/sft/sft_data.json
[{"instruction": "解释什么是AI", "response": "人工智能(AI)是..."},{"instruction": "写一首诗", "response": "静夜思..."},
]
data/sft/rlhf_pairwise.json
[{"instruction": "推荐一部电影", "chosen": "《星际穿越》...", "rejected": "《某烂片》..."}
]
五、总结
本文详细介绍了基于Transformer解码器的GPT模型的完整复现流程,涵盖了从预训练到推理部署的全过程。主要内容包括:
1.模型架构:采用纯解码器的Transformer结构,实现了多头自注意力、前馈网络等核心组件,支持自回归文本生成。
2.关键技术:
- 字节级BPE分词器,支持任意字符编码
- 因果掩码确保自回归特性
- 混合精度训练加速
- KV缓存加速推理
3.训练流程:
- 预训练阶段:通过大规模文本数据学习通用语言表示
- 监督微调(SFT):使模型学会指令跟随
- 强化学习(RLHF):通过人类反馈优化生成质量
4.工程实践:
- 提供了完整的项目结构
- 实现了模型量化(INT8)以降低部署成本
- 容器化部署方案
- API服务封装
- 性能评估工具
该实现遵循现代大语言模型的预训练-微调范式,通过模块化设计支持灵活扩展,可作为构建生成式AI应用的基础框架。
