
Model Speedrun Series (1): nanoGPT

This is a new series in which I reproduce model engineering projects step by step. I started it because readers commented that they would open a repository and have no idea where to begin reading; improving my own skills, plus plain interest, was the other motivation. The main goal is to walk through the code in order and annotate it along the way.

The first post in the series covers nanoGPT, a repository by former OpenAI researcher Andrej Karpathy. It is essentially a mini version of GPT. Although its last update was two years ago and some of its components and design ideas are a little dated, it is an ideal project to start with.

  • Project link: https://github.com/karpathy/nanoGPT
  • Branch: master
  • Commit: 93a43d9a5c22450bbf06e78da2cb6eeef084b717
  • Companion repository: https://github.com/GaohaoZhou-ops/Model-Reproduction/tree/main/nanoGPT

Before we start

  • Unless stated otherwise, I do not actually run training or fine-tuning, but the relevant code is included.
  • Files are implemented in the order of the numbered section headings (1, 2, 3, …); following that order will build the complete project.
  • The article keeps the original repository's directory structure as much as possible, but folders such as .git and assets are not covered.
  • The main purpose is to read through the code, especially the model-construction workflow; if you want to use the project directly, pull the original repository instead.
  • Each article has a matching folder in my GitHub repository; I recommend reading the article alongside that code.
  • Along the way, comments are deleted, added, or translated as appropriate.

GitHub repository: https://github.com/GaohaoZhou-ops/Model-Reproduction


1. Building the model: model.py

This project is relatively simple: the model only uses basic PyTorch components. The following command draws the library dependency graph of model.py in the original project:

【Note】: this step must be run inside the original project folder; you can also skip it and just follow my walkthrough.

(model) $ pydeps model.py --max-bacon=1


Import the necessary packages:

import math
import inspect
from dataclasses import dataclass

import torch
import torch.nn as nn
from torch.nn import functional as F

Define the normalization layer class:

PyTorch's built-in layer norm does not offer an option to drop only the bias term (i.e. set it to None), so the class is overridden here:

class LayerNorm(nn.Module):
    def __init__(self, ndim, bias):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(ndim))
        self.bias = nn.Parameter(torch.zeros(ndim)) if bias else None

    def forward(self, input):
        return F.layer_norm(input, self.weight.shape, self.weight, self.bias, 1e-5)
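A quick sanity check of the wrapper (a minimal sketch; the shapes are arbitrary and it assumes the class above is in scope):

import torch

ln = LayerNorm(768, bias=False)   # the case stock layer norm could not express
x = torch.randn(4, 16, 768)       # (batch, tokens, n_embd)
print(ln(x).shape)                # torch.Size([4, 16, 768])
print(ln.bias)                    # None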

Define the CausalSelfAttention class:

  • Reference: https://zhuanlan.zhihu.com/p/663840848


CausalSelfAttention is the masked self-attention mechanism used in autoregressive sequence tasks such as language modeling, machine translation, and text summarization. Its main characteristics:

  • It only allows the model to attend to the current and earlier positions, never to later ones; when generating a token, the model cannot look at anything that comes after it.
  • Causality is enforced by masking future positions: before the softmax, the attention scores above the diagonal are set to -inf, so their weights become zero after the softmax and no future information leaks in (a toy demonstration follows the class code below).
  • It lets Transformer-family models capture temporal dependencies: the Transformer computes all positions in parallel, and causal self-attention is what gives it left-to-right, autoregressive behavior.
  • The representations it learns respect temporal order, which suits sequence-generation tasks.
  • In the original encoder-decoder Transformer, this masked self-attention sits on the decoder side, not the encoder side; the decoder's cross-attention, by contrast, may attend to all encoder outputs and therefore uses the full input context.
class CausalSelfAttention(nn.Module):
    def __init__(self, config) -> None:
        super().__init__()
        assert config.n_embd % config.n_head == 0  # the embedding dim (n_embd) must divide evenly across the heads (n_head)
        # one linear layer produces Q, K and V at once: output is 3x the embedding dim, split later
        self.c_attn = nn.Linear(config.n_embd, 3 * config.n_embd, bias=config.bias)
        # projects the attention output back to the original dimension
        self.c_proj = nn.Linear(config.n_embd, config.n_embd, bias=config.bias)
        self.attn_dropout = nn.Dropout(config.dropout)   # applied to the attention matrix (regularization)
        self.resid_dropout = nn.Dropout(config.dropout)  # applied to the residual-branch output
        self.n_head = config.n_head
        self.n_embd = config.n_embd
        self.dropout = config.dropout
        # check whether GPU-accelerated Flash Attention is available
        self.flash = hasattr(torch.nn.functional, 'scaled_dot_product_attention')
        if not self.flash:
            print("WARNING: using slow attention. Flash Attention requires PyTorch >= 2.0")
            # build the causal mask from a lower-triangular matrix, so attention only covers the left (past) context
            self.register_buffer("bias", torch.tril(torch.ones(config.block_size, config.block_size))
                                        .view(1, 1, config.block_size, config.block_size))

    def forward(self, x):
        B, T, C = x.size()  # batch_size, token_length, embedding_dim
        q, k, v = self.c_attn(x).split(self.n_embd, dim=2)  # compute Q, K, V
        k = k.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)  # shape=(B, nh, T, hs)
        q = q.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)  # shape=(B, nh, T, hs)
        v = v.view(B, T, self.n_head, C // self.n_head).transpose(1, 2)  # shape=(B, nh, T, hs)
        if self.flash:  # use flash attention when available
            y = torch.nn.functional.scaled_dot_product_attention(
                q, k, v, attn_mask=None,
                dropout_p=self.dropout if self.training else 0, is_causal=True)
        else:           # otherwise compute attention manually
            att = (q @ k.transpose(-2, -1)) * (1.0 / math.sqrt(k.size(-1)))
            att = att.masked_fill(self.bias[:, :, :T, :T] == 0, float('-inf'))
            att = F.softmax(att, dim=-1)
            att = self.attn_dropout(att)
            y = att @ v  # (B, nh, T, T) x (B, nh, T, hs) -> (B, nh, T, hs)
        y = y.transpose(1, 2).contiguous().view(B, T, C)  # back to the original shape for the next layer
        y = self.resid_dropout(self.c_proj(y))
        return y
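To make the masking concrete, here is a minimal standalone sketch of the non-flash path on a toy 4-token sequence (all values arbitrary):

import math
import torch
import torch.nn.functional as F

T, hs = 4, 8                     # toy sequence length and head size
q = torch.randn(1, 1, T, hs)     # (B, nh, T, hs)
k = torch.randn(1, 1, T, hs)
mask = torch.tril(torch.ones(T, T)).view(1, 1, T, T)

att = (q @ k.transpose(-2, -1)) / math.sqrt(hs)
att = att.masked_fill(mask == 0, float('-inf'))  # hide the future
att = F.softmax(att, dim=-1)
print(att[0, 0])  # row i has non-zero weights only in columns 0..i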

Define the MLP class

This class is the position-wise feed-forward sublayer (linear, GELU, linear, dropout); its overall structure is very simple.

class MLP(nn.Module):
    def __init__(self, config) -> None:
        super().__init__()
        self.c_fc = nn.Linear(config.n_embd, 4 * config.n_embd, bias=config.bias)
        self.gelu = nn.GELU()
        self.c_proj = nn.Linear(4 * config.n_embd, config.n_embd, bias=config.bias)
        self.dropout = nn.Dropout(config.dropout)

    def forward(self, x):
        x = self.c_fc(x)
        x = self.gelu(x)
        x = self.c_proj(x)
        x = self.dropout(x)
        return x

Define a single compute unit: Block

class Block(nn.Module):
    def __init__(self, config) -> None:
        super().__init__()
        self.ln_1 = LayerNorm(config.n_embd, bias=config.bias)
        self.attn = CausalSelfAttention(config)
        self.ln_2 = LayerNorm(config.n_embd, bias=config.bias)
        self.mlp = MLP(config)

    def forward(self, x):
        x = x + self.attn(self.ln_1(x))
        x = x + self.mlp(self.ln_2(x))
        return x

Define the configuration object for the GPT class

@dataclass
class GPTConfig:
    block_size: int = 1024
    vocab_size: int = 50304
    n_layer: int = 12
    n_head: int = 12
    n_embd: int = 768
    dropout: float = 0.0
    bias: bool = True

Define the full GPT model

The model uses the AdamW optimizer. The change relative to Adam is actually quite small: the weight-decay term is taken out of the gradient computation and applied directly in the final weight-update step. The motivation: in the original Adam implementation, using L2 weight decay adds the decay term straight into the loss, so it flows into both the first- and second-moment running averages, which hurts Adam's optimization behavior. Decoupling weight decay from the gradient computation significantly improves Adam's performance.

AdamW has since become the default optimizer for transformer training.

  • Reference: https://www.zhihu.com/question/536185388/answer/83259341287
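Before the full model code, here is a minimal single-step sketch of that difference (bias correction omitted; the names are illustrative, this is not the PyTorch implementation):

import torch

lr, wd, beta1, beta2, eps = 1e-3, 0.1, 0.9, 0.95, 1e-8
p = torch.randn(10)    # a parameter
g = torch.randn(10)    # its raw gradient
m = torch.zeros(10)    # first moment
v = torch.zeros(10)    # second moment

# Adam + L2 regularization: the decay term is folded into the gradient,
# so it contaminates both moment estimates
g_l2 = g + wd * p
m_l2 = beta1 * m + (1 - beta1) * g_l2
v_l2 = beta2 * v + (1 - beta2) * g_l2**2
p_adam = p - lr * m_l2 / (v_l2.sqrt() + eps)

# AdamW: the moments see only the raw gradient; the decay is applied
# directly to the weights in the update step, decoupled from the moments
m_w = beta1 * m + (1 - beta1) * g
v_w = beta2 * v + (1 - beta2) * g**2
p_adamw = p - lr * m_w / (v_w.sqrt() + eps) - lr * wd * p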
class GPT(nn.Module):
    def __init__(self, config: GPTConfig) -> None:
        super().__init__()
        assert config.vocab_size is not None  # check that vocab_size and block_size in the config are valid
        assert config.block_size is not None
        self.config = config

        self.transformer = nn.ModuleDict(dict(  # the Transformer backbone
            wte=nn.Embedding(config.vocab_size, config.n_embd),  # token embedding: maps token ids to vectors, shape=(vocab_size, n_embd)
            wpe=nn.Embedding(config.block_size, config.n_embd),  # position embedding: maps position ids to vectors, shape=(block_size, n_embd)
            drop=nn.Dropout(config.dropout),
            h=nn.ModuleList([Block(config) for _ in range(config.n_layer)]),  # the stack of attention blocks, each built by Block(config)
            ln_f=LayerNorm(config.n_embd, bias=config.bias),  # final layer norm on the output
        ))
        self.lm_head = nn.Linear(config.n_embd, config.vocab_size, bias=False)  # language-model head: maps final features back to vocabulary space for next-token prediction
        self.transformer.wte.weight = self.lm_head.weight  # weight tying: embedding and output projection share weights, fewer parameters, better performance

        self.apply(self._init_weights)  # weight initialization
        for pn, p in self.named_parameters():  # special init for residual projections (c_proj), per the GPT-2 paper
            if pn.endswith('c_proj.weight'):
                torch.nn.init.normal_(p, mean=0.0, std=0.02/math.sqrt(2*config.n_layer))
        print("number of parameters: %.2fM" % (self.get_num_params()/1e6,))

    # total parameter count
    def get_num_params(self, non_embedding=True):
        n_params = sum(p.numel() for p in self.parameters())
        if non_embedding:
            n_params -= self.transformer.wpe.weight.numel()
        return n_params

    # initialize module weights
    def _init_weights(self, module):
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, idx, targets=None):
        device = idx.device
        b, t = idx.size()
        assert t <= self.config.block_size, f"Cannot forward sequence of length {t}, block size is only {self.config.block_size}"  # the input sequence must fit in the configured block size
        pos = torch.arange(0, t, dtype=torch.long, device=device)  # position ids, shape=(t,)

        # embedding forward pass: token embedding & position embedding
        tok_emb = self.transformer.wte(idx)  # shape=(b, t, n_embd)
        pos_emb = self.transformer.wpe(pos)  # shape=(t, n_embd)
        x = self.transformer.drop(tok_emb + pos_emb)
        # pass through the stacked attention blocks, then the final layer norm
        for block in self.transformer.h:
            x = block(x)
        x = self.transformer.ln_f(x)

        if targets is not None:  # training
            logits = self.lm_head(x)  # predicted distribution (logits) over the vocabulary
            loss = F.cross_entropy(logits.view(-1, logits.size(-1)), targets.view(-1), ignore_index=-1)  # cross-entropy between predictions and targets, ignoring padding index -1
        else:                    # inference
            logits = self.lm_head(x[:, [-1], :])  # only compute the last time step, saving compute at inference
            loss = None
        return logits, loss

    def crop_block_size(self, block_size):
        assert block_size <= self.config.block_size
        self.config.block_size = block_size
        self.transformer.wpe.weight = nn.Parameter(self.transformer.wpe.weight[:block_size])
        for block in self.transformer.h:
            if hasattr(block.attn, 'bias'):
                block.attn.bias = block.attn.bias[:, :, :block_size, :block_size]

    @classmethod
    def from_pretrained(cls, model_type, override_args=None):
        assert model_type in {'gpt2', 'gpt2-medium', 'gpt2-large', 'gpt2-xl'}
        override_args = override_args or {}
        assert all(k == 'dropout' for k in override_args)
        from transformers import GPT2LMHeadModel
        print("loading weights from pretrained gpt: %s" % model_type)

        # model configuration parameters
        config_args = {
            'gpt2':         dict(n_layer=12, n_head=12, n_embd=768),   # 124M params
            'gpt2-medium':  dict(n_layer=24, n_head=16, n_embd=1024),  # 350M params
            'gpt2-large':   dict(n_layer=36, n_head=20, n_embd=1280),  # 774M params
            'gpt2-xl':      dict(n_layer=48, n_head=25, n_embd=1600),  # 1558M params
        }[model_type]
        print("forcing vocab_size=50257, block_size=1024, bias=True")
        config_args['vocab_size'] = 50257
        config_args['block_size'] = 1024
        config_args['bias'] = True
        # if dropout is in the override list, apply it
        if 'dropout' in override_args:
            print(f"overriding dropout rate to {override_args['dropout']}")
            config_args['dropout'] = override_args['dropout']

        config = GPTConfig(**config_args)
        model = GPT(config)
        sd = model.state_dict()
        sd_keys = sd.keys()
        sd_keys = [k for k in sd_keys if not k.endswith('.attn.bias')]

        # load the pretrained weights
        model_hf = GPT2LMHeadModel.from_pretrained(model_type)
        sd_hf = model_hf.state_dict()

        # since this is nanoGPT, only the relevant subset of the full GPT weights needs to be copied
        sd_keys_hf = sd_hf.keys()
        sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.masked_bias')]
        sd_keys_hf = [k for k in sd_keys_hf if not k.endswith('.attn.bias')]
        transposed = ['attn.c_attn.weight', 'attn.c_proj.weight', 'mlp.c_fc.weight', 'mlp.c_proj.weight']
        assert len(sd_keys_hf) == len(sd_keys), f"mismatched keys: {len(sd_keys_hf)} != {len(sd_keys)}"
        for k in sd_keys_hf:
            if any(k.endswith(w) for w in transposed):
                assert sd_hf[k].shape[::-1] == sd[k].shape
                with torch.no_grad():
                    sd[k].copy_(sd_hf[k].t())
            else:
                assert sd_hf[k].shape == sd[k].shape
                with torch.no_grad():
                    sd[k].copy_(sd_hf[k])
        return model

    # configure the AdamW optimizer
    def configure_optimizers(self, weight_decay, learning_rate, betas, device_type):
        param_dict = {pn: p for pn, p in self.named_parameters()}
        param_dict = {pn: p for pn, p in param_dict.items() if p.requires_grad}
        decay_params = [p for n, p in param_dict.items() if p.dim() >= 2]
        nodecay_params = [p for n, p in param_dict.items() if p.dim() < 2]
        optim_groups = [
            {'params': decay_params, 'weight_decay': weight_decay},
            {'params': nodecay_params, 'weight_decay': 0.0}
        ]
        num_decay_params = sum(p.numel() for p in decay_params)
        num_nodecay_params = sum(p.numel() for p in nodecay_params)
        print(f"num decayed parameter tensors: {len(decay_params)}, with {num_decay_params:,} parameters")
        print(f"num non-decayed parameter tensors: {len(nodecay_params)}, with {num_nodecay_params:,} parameters")
        fused_available = 'fused' in inspect.signature(torch.optim.AdamW).parameters
        use_fused = fused_available and device_type == 'cuda'
        extra_args = dict(fused=True) if use_fused else dict()
        optimizer = torch.optim.AdamW(optim_groups, lr=learning_rate, betas=betas, **extra_args)
        print(f"using fused AdamW: {use_fused}")
        return optimizer

    # estimate model flops utilization (MFU)
    def estimate_mfu(self, fwdbwd_per_iter, dt):
        N = self.get_num_params()
        cfg = self.config
        L, H, Q, T = cfg.n_layer, cfg.n_head, cfg.n_embd // cfg.n_head, cfg.block_size
        flops_per_token = 6*N + 12*L*H*Q*T
        flops_per_fwdbwd = flops_per_token * T
        flops_per_iter = flops_per_fwdbwd * fwdbwd_per_iter
        flops_achieved = flops_per_iter * (1.0/dt)
        flops_promised = 312e12
        mfu = flops_achieved / flops_promised
        return mfu

    @torch.no_grad()
    def generate(self, idx, max_new_tokens, temperature=0.1, top_k=None):
        for _ in range(max_new_tokens):
            # crop the context to at most block_size tokens
            idx_cond = idx if idx.size(1) <= self.config.block_size else idx[:, -self.config.block_size:]
            logits, _ = self(idx_cond)
            logits = logits[:, -1, :] / temperature
            if top_k is not None:
                v, _ = torch.topk(logits, min(top_k, logits.size(-1)))
                logits[logits < v[:, [-1]]] = -float('Inf')
            probs = F.softmax(logits, dim=-1)
            idx_next = torch.multinomial(probs, num_samples=1)
            idx = torch.cat((idx, idx_next), dim=1)
        return idx
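With the class complete, a minimal smoke test (a sketch; the tiny config is arbitrary, chosen so it runs on CPU):

import torch
from model import GPT, GPTConfig   # assumes the definitions above live in model.py

conf = GPTConfig(block_size=64, vocab_size=100, n_layer=2, n_head=2, n_embd=64,
                 dropout=0.0, bias=True)
model = GPT(conf)
idx = torch.randint(0, 100, (2, 16))     # (batch, tokens)
logits, loss = model(idx, targets=idx)   # training-style call
print(logits.shape, loss.item())         # torch.Size([2, 16, 100])
out = model.generate(idx, max_new_tokens=8, temperature=1.0)
print(out.shape)                         # torch.Size([2, 24])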

2. Quick config overrides for training: configurator.py

Training needs a lot of shared configuration, so the project ships an extra file, configurator.py, which loads config overrides from the config folder; it is nothing more than simple Python file reading plus sys.argv handling.

Usage:

(model) $ python train.py config/override_file.py --batch_size=32

The config folder structure is as follows:

(model) $ tree -L 2 
├── config
│   ├── eval_gpt2.py
│   ├── eval_gpt2_large.py
│   ├── eval_gpt2_medium.py
│   ├── eval_gpt2_xl.py
│   ├── finetune_shakespeare.py
│   ├── train_gpt2.py
│   └── train_shakespeare_char.py
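Each of these files is just a plain Python script that reassigns globals defined in train.py. A hypothetical minimal override file might look like this (the values are illustrative, not one of the repo's actual files):

# config/my_experiment.py  (hypothetical)
out_dir = 'out-my-experiment'
batch_size = 32
block_size = 256
n_layer = 6
n_head = 6
n_embd = 384
learning_rate = 1e-3
max_iters = 5000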

The configurator code itself:

import sys
from ast import literal_eval

for arg in sys.argv[1:]:
    if '=' not in arg:
        assert not arg.startswith('--')
        config_file = arg
        print(f"Overriding config with {config_file}:")
        with open(config_file) as f:
            print(f.read())
        exec(open(config_file).read())
    else:
        assert arg.startswith('--')
        key, val = arg.split('=')
        key = key[2:]
        if key in globals():
            try:
                attempt = literal_eval(val)
            except (SyntaxError, ValueError):
                attempt = val
            assert type(attempt) == type(globals()[key])
            print(f"Overriding: {key} = {attempt}")
            globals()[key] = attempt
        else:
            raise ValueError(f"Unknown config key: {key}")

3. Training the model: train.py

With the model defined, the next step is usually training. Depending on init_from, training starts from scratch, resumes from a checkpoint, or initializes from the pretrained GPT-2 weights on Hugging Face.


Import the necessary packages

import os, time, math, pickle
from contextlib import nullcontext

import numpy as np
import torch
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

from model import GPTConfig, GPT

Configure the hyperparameters

# I/O
out_dir = 'out'
eval_interval = 2000
log_interval = 1
eval_iters = 200
eval_only = False
always_save_checkpoint = True
init_from = 'scratch' # 'scratch' or 'resume' or 'gpt2*'
# wandb logging
wandb_log = False
wandb_project = 'owt'
wandb_run_name = 'gpt2'
# data
dataset = 'openwebtext'
gradient_accumulation_steps = 5 * 8
batch_size = 12
block_size = 1024
# model
n_layer = 12
n_head = 12
n_embd = 768
dropout = 0.0
bias = False
# adamw optimizer
learning_rate = 6e-4
max_iters = 600000
weight_decay = 1e-1
beta1 = 0.9
beta2 = 0.95
grad_clip = 1.0
# learning rate decay
decay_lr = True
warmup_iters = 2000
lr_decay_iters = 600000
min_lr = 6e-5
# DDP settings
backend = 'nccl' # 'nccl', 'gloo', etc.
# system
device = 'cuda' # 'cpu', 'cuda', 'cuda:0', 'cuda:1', 'mps'
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16' # 'float32', 'bfloat16', 'float16'
compile = True # use PyTorch 2.0 compilation for a faster model

config_keys = [k for k, v in globals().items() if not k.startswith('_') and isinstance(v, (int, float, bool, str))]
exec(open('configurator.py').read())
config = {k: globals()[k] for k in config_keys}

Configure multi-GPU parallel training (DDP)

ddp = int(os.environ.get('RANK', -1)) != -1
if ddp:
    init_process_group(backend=backend)
    ddp_rank = int(os.environ['RANK'])
    ddp_local_rank = int(os.environ['LOCAL_RANK'])
    ddp_world_size = int(os.environ['WORLD_SIZE'])
    device = f'cuda:{ddp_local_rank}'
    torch.cuda.set_device(device)
    master_process = ddp_rank == 0
    seed_offset = ddp_rank
    assert gradient_accumulation_steps % ddp_world_size == 0
    gradient_accumulation_steps //= ddp_world_size
else:
    master_process = True
    seed_offset = 0
    ddp_world_size = 1
tokens_per_iter = gradient_accumulation_steps * ddp_world_size * batch_size * block_size
print(f"tokens per iteration will be: {tokens_per_iter:,}")

Initialize torch training settings

if master_process:
    os.makedirs(out_dir, exist_ok=True)
torch.manual_seed(1337 + seed_offset)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
device_type = 'cuda' if 'cuda' in device else 'cpu'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)

data_dir = os.path.join('data', dataset)
def get_batch(split):
    if split == 'train':
        data = np.memmap(os.path.join(data_dir, 'train.bin'), dtype=np.uint16, mode='r')
    else:
        data = np.memmap(os.path.join(data_dir, 'val.bin'), dtype=np.uint16, mode='r')
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
    y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])
    if device_type == 'cuda':
        x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(device, non_blocking=True)
    else:
        x, y = x.to(device), y.to(device)
    return x, y

iter_num = 0
best_val_loss = 1e9

meta_path = os.path.join(data_dir, 'meta.pkl')
meta_vocab_size = None
if os.path.exists(meta_path):
    with open(meta_path, 'rb') as f:
        meta = pickle.load(f)
    meta_vocab_size = meta['vocab_size']
    print(f"found vocab_size = {meta_vocab_size} (inside {meta_path})")
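Note that get_batch assumes train.bin and val.bin are flat binary files of uint16 token ids, produced by the repo's data/&lt;dataset&gt;/prepare.py scripts. A minimal sketch of the same convention (the input.txt corpus and the 90/10 split here are illustrative):

import numpy as np
import tiktoken

enc = tiktoken.get_encoding("gpt2")
text = open('input.txt', encoding='utf-8').read()  # any raw text corpus
ids = enc.encode_ordinary(text)                    # GPT-2 ids fit in uint16 (vocab 50257 < 65536)
n = int(0.9 * len(ids))                            # 90/10 train/val split
np.array(ids[:n], dtype=np.uint16).tofile('train.bin')
np.array(ids[n:], dtype=np.uint16).tofile('val.bin')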

Initialize the model

model_args = dict(n_layer=n_layer, n_head=n_head, n_embd=n_embd, block_size=block_size,
                  bias=bias, vocab_size=None, dropout=dropout)

# initialize the model according to the configuration
if init_from == 'scratch':
    print("Initializing a new model from scratch")
    if meta_vocab_size is None:
        print("defaulting to vocab_size of GPT-2 to 50304 (50257 rounded up for efficiency)")
    model_args['vocab_size'] = meta_vocab_size if meta_vocab_size is not None else 50304
    gptconf = GPTConfig(**model_args)
    model = GPT(gptconf)
elif init_from == 'resume':
    print(f"Resuming training from {out_dir}")
    ckpt_path = os.path.join(out_dir, 'ckpt.pt')
    checkpoint = torch.load(ckpt_path, map_location=device)
    checkpoint_model_args = checkpoint['model_args']
    for k in ['n_layer', 'n_head', 'n_embd', 'block_size', 'bias', 'vocab_size']:
        model_args[k] = checkpoint_model_args[k]
    gptconf = GPTConfig(**model_args)
    model = GPT(gptconf)
    state_dict = checkpoint['model']
    unwanted_prefix = '_orig_mod.'
    for k, v in list(state_dict.items()):
        if k.startswith(unwanted_prefix):
            state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
    model.load_state_dict(state_dict)
    iter_num = checkpoint['iter_num']
    best_val_loss = checkpoint['best_val_loss']
elif init_from.startswith('gpt2'):
    print(f"Initializing from OpenAI GPT-2 weights: {init_from}")
    override_args = dict(dropout=dropout)
    model = GPT.from_pretrained(init_from, override_args)
    for k in ['n_layer', 'n_head', 'n_embd', 'block_size', 'bias', 'vocab_size']:
        model_args[k] = getattr(model.config, k)
if block_size < model.config.block_size:
    model.crop_block_size(block_size)
    model_args['block_size'] = block_size
model.to(device)

# initialize the GradScaler (a no-op unless dtype is float16)
scaler = torch.cuda.amp.GradScaler(enabled=(dtype == 'float16'))

Configure the optimizer

optimizer = model.configure_optimizers(weight_decay, learning_rate, (beta1, beta2), device_type)
if init_from == 'resume':
    optimizer.load_state_dict(checkpoint['optimizer'])
checkpoint = None # free up memory

Compile the model with PyTorch 2.0 (optional)

if compile:
    print("compiling the model... (takes a ~minute)")
    unoptimized_model = model
    model = torch.compile(model)

# configure distributed training
if ddp:
    model = DDP(model, device_ids=[ddp_local_rank])

Define the loss-estimation function

@torch.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = torch.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            with ctx:
                logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

Define the learning-rate decay schedule

def get_lr(it):
    # 1) linear warmup for the first warmup_iters steps
    if it < warmup_iters:
        return learning_rate * (it + 1) / (warmup_iters + 1)
    # 2) past lr_decay_iters, hold at the minimum learning rate
    if it > lr_decay_iters:
        return min_lr
    # 3) in between, cosine decay down to the minimum learning rate
    decay_ratio = (it - warmup_iters) / (lr_decay_iters - warmup_iters)
    assert 0 <= decay_ratio <= 1
    coeff = 0.5 * (1.0 + math.cos(math.pi * decay_ratio))
    return min_lr + coeff * (learning_rate - min_lr)
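Sampling the schedule at a few iterations under the defaults (warmup_iters=2000, lr_decay_iters=600000, learning_rate=6e-4, min_lr=6e-5) makes the three phases visible; the values in the comments are approximate:

for it in [0, 1000, 2000, 300000, 600000, 700000]:
    print(it, f"{get_lr(it):.2e}")
# 0       ~3.0e-07  (start of linear warmup)
# 1000    ~3.0e-04  (halfway through warmup)
# 2000    6.0e-04   (peak rate; cosine decay begins)
# 300000  ~3.3e-04  (roughly halfway down the cosine)
# 600000  6.0e-05   (decay ends at min_lr)
# 700000  6.0e-05   (held at min_lr)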

Initialize logging

if wandb_log and master_process:
    import wandb
    wandb.init(project=wandb_project, name=wandb_run_name, config=config)

Run the training loop

X, Y = get_batch('train')
t0 = time.time()
local_iter_num = 0
raw_model = model.module if ddp else model
running_mfu = -1.0

while True:
    # set the learning rate for this iteration
    lr = get_lr(iter_num) if decay_lr else learning_rate
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

    # evaluate and log according to the current iteration count
    if iter_num % eval_interval == 0 and master_process:
        losses = estimate_loss()
        print(f"step {iter_num}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")
        if wandb_log:
            wandb.log({
                "iter": iter_num,
                "train/loss": losses['train'],
                "val/loss": losses['val'],
                "lr": lr,
                "mfu": running_mfu*100, # convert to percentage
            })
        if losses['val'] < best_val_loss or always_save_checkpoint:
            best_val_loss = losses['val']
            if iter_num > 0:
                checkpoint = {
                    'model': raw_model.state_dict(),
                    'optimizer': optimizer.state_dict(),
                    'model_args': model_args,
                    'iter_num': iter_num,
                    'best_val_loss': best_val_loss,
                    'config': config,
                }
                print(f"saving checkpoint to {out_dir}")
                torch.save(checkpoint, os.path.join(out_dir, 'ckpt.pt'))
    if iter_num == 0 and eval_only:
        break

    # forward pass with gradient accumulation
    for micro_step in range(gradient_accumulation_steps):
        if ddp:
            # only sync gradients on the last micro step
            model.require_backward_grad_sync = (micro_step == gradient_accumulation_steps - 1)
        with ctx:
            logits, loss = model(X, Y)
            loss = loss / gradient_accumulation_steps
        X, Y = get_batch('train')
        # backward pass, accumulating gradients
        scaler.scale(loss).backward()
    # clip gradients, then let the optimizer update the parameters
    if grad_clip != 0.0:
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), grad_clip)
    scaler.step(optimizer)
    scaler.update()
    optimizer.zero_grad(set_to_none=True)

    # logging and timing during training
    t1 = time.time()
    dt = t1 - t0
    t0 = t1
    if iter_num % log_interval == 0 and master_process:
        lossf = loss.item() * gradient_accumulation_steps
        if local_iter_num >= 5:
            mfu = raw_model.estimate_mfu(batch_size * gradient_accumulation_steps, dt)
            running_mfu = mfu if running_mfu == -1.0 else 0.9*running_mfu + 0.1*mfu
        print(f"iter {iter_num}: loss {lossf:.4f}, time {dt*1000:.2f}ms, mfu {running_mfu*100:.2f}%")
    iter_num += 1
    local_iter_num += 1
    if iter_num > max_iters:
        break

# tear down the distributed process group
if ddp:
    destroy_process_group()
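For reference, a single-GPU run is launched with plain python, while the upstream README launches multi-GPU DDP runs via torchrun:

(model) $ python train.py config/train_shakespeare_char.py

(model) $ torchrun --standalone --nproc_per_node=8 train.py config/train_gpt2.py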

4. Sampling from the model: sample.py

Once the model structure and training procedure are defined, the usage example can be run; it is pure inference.


Import the necessary packages

import os, pickle, torch, tiktoken
from contextlib import nullcontext
from model import GPTConfig, GPT

Configure the hyperparameters

init_from = 'resume' # 'resume' or 'gpt2-xl'
out_dir = 'out'
start = "\n"
num_samples = 10
max_new_tokens = 500
temperature = 0.8
top_k = 200
seed = 1337
device = 'cuda' # 'cpu', 'cuda', 'cuda:0', 'cuda:1'
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16' # 'float32' or 'bfloat16' or 'float16'
compile = False # use PyTorch 2.0 to compile the model for speed
exec(open('configurator.py').read())

Configure torch

torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
device_type = 'cuda' if 'cuda' in device else 'cpu'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)

Initialize the model

if init_from == 'resume':
    # load the model from a local checkpoint
    ckpt_path = os.path.join(out_dir, 'ckpt.pt')
    checkpoint = torch.load(ckpt_path, map_location=device)
    gptconf = GPTConfig(**checkpoint['model_args'])
    model = GPT(gptconf)
    state_dict = checkpoint['model']
    unwanted_prefix = '_orig_mod.'
    for k, v in list(state_dict.items()):
        if k.startswith(unwanted_prefix):
            state_dict[k[len(unwanted_prefix):]] = state_dict.pop(k)
    model.load_state_dict(state_dict)
elif init_from.startswith('gpt2'):
    # initialize from the pretrained GPT-2 weights
    model = GPT.from_pretrained(init_from, dict(dropout=0.0))

model.eval()
model.to(device)
if compile:
    model = torch.compile(model)

Set up encoding and decoding

load_meta = False
if init_from == 'resume' and 'config' in checkpoint and 'dataset' in checkpoint['config']:
    meta_path = os.path.join('data', checkpoint['config']['dataset'], 'meta.pkl')
    load_meta = os.path.exists(meta_path)
if load_meta:
    print(f"Loading meta from {meta_path}...")
    with open(meta_path, 'rb') as f:
        meta = pickle.load(f)
    stoi, itos = meta['stoi'], meta['itos']
    encode = lambda s: [stoi[c] for c in s]
    decode = lambda l: ''.join([itos[i] for i in l])
else:
    print("No meta.pkl found, assuming GPT-2 encodings...")
    enc = tiktoken.get_encoding("gpt2")
    encode = lambda s: enc.encode(s, allowed_special={"<|endoftext|>"})
    decode = lambda l: enc.decode(l)

Encode the prompt

if start.startswith('FILE:'):
    with open(start[5:], 'r', encoding='utf-8') as f:
        start = f.read()
start_ids = encode(start)
x = (torch.tensor(start_ids, dtype=torch.long, device=device)[None, ...])

Generate samples

with torch.no_grad():
    with ctx:
        for k in range(num_samples):
            y = model.generate(x, max_new_tokens, temperature=temperature, top_k=top_k)
            print(decode(y[0].tolist()))
            print('---------------')
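A typical invocation, with overrides going through configurator.py as usual (the prompt string here is just an example):

(model) $ python sample.py --init_from=gpt2 --start="Hello, my name is" --num_samples=3 --max_new_tokens=100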

5. Benchmarking: bench.py

After training and testing, the model usually needs a benchmark pass; if you plan to publish a paper, this step is indispensable.


Import the necessary packages

import os
from contextlib import nullcontext
import numpy as np
import time
import torch
from model import GPTConfig, GPT

Configure the hyperparameters

batch_size = 12
block_size = 1024
bias = False
real_data = True
seed = 1337
device = 'cuda' # 'cpu', 'cuda', 'cuda:0', 'cuda:1'
dtype = 'bfloat16' if torch.cuda.is_available() and torch.cuda.is_bf16_supported() else 'float16' # 'float32' or 'bfloat16' or 'float16'
compile = True # use PyTorch 2.0 to compile the model for speed
profile = False # use the PyTorch profiler, or just do simple benchmarking
exec(open('configurator.py').read())

Configure torch

torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True
device_type = 'cuda' if 'cuda' in device else 'cpu'
ptdtype = {'float32': torch.float32, 'bfloat16': torch.bfloat16, 'float16': torch.float16}[dtype]
ctx = nullcontext() if device_type == 'cpu' else torch.amp.autocast(device_type=device_type, dtype=ptdtype)

Load data and initialize: openwebtext

if real_data:
    dataset = 'openwebtext'
    data_dir = os.path.join('data', dataset)
    train_data = np.memmap(os.path.join(data_dir, 'train.bin'), dtype=np.uint16, mode='r')
    def get_batch(split):
        data = train_data
        ix = torch.randint(len(data) - block_size, (batch_size,))
        x = torch.stack([torch.from_numpy((data[i:i+block_size]).astype(np.int64)) for i in ix])
        y = torch.stack([torch.from_numpy((data[i+1:i+1+block_size]).astype(np.int64)) for i in ix])
        x, y = x.pin_memory().to(device, non_blocking=True), y.pin_memory().to(device, non_blocking=True)
        return x, y
else:
    # just feed random noise instead of real data
    x = torch.randint(50304, (batch_size, block_size), device=device)
    y = torch.randint(50304, (batch_size, block_size), device=device)
    get_batch = lambda split: (x, y)

Initialize the model

gptconf = GPTConfig(
    block_size = block_size,
    n_layer = 12, n_head = 12, n_embd = 768,
    dropout = 0, bias = bias,
)
model = GPT(gptconf)
model.to(device)

optimizer = model.configure_optimizers(weight_decay=1e-2, learning_rate=1e-4, betas=(0.9, 0.95), device_type=device_type)

if compile:
    print("Compiling model...")
    model = torch.compile(model) # pytorch 2.0

Run the benchmark

if profile:
    # profiled run: wait/warmup/active schedule, trace written for TensorBoard
    wait, warmup, active = 5, 5, 5
    num_steps = wait + warmup + active
    with torch.profiler.profile(
        activities=[torch.profiler.ProfilerActivity.CPU, torch.profiler.ProfilerActivity.CUDA],
        schedule=torch.profiler.schedule(wait=wait, warmup=warmup, active=active, repeat=1),
        on_trace_ready=torch.profiler.tensorboard_trace_handler('./bench_log'),
        record_shapes=False,
        profile_memory=False,
        with_stack=False,
        with_flops=True,
        with_modules=False,
    ) as prof:
        X, Y = get_batch('train')
        for k in range(num_steps):
            with ctx:
                logits, loss = model(X, Y)
            X, Y = get_batch('train')
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()
            lossf = loss.item()
            print(f"{k}/{num_steps} loss: {lossf:.4f}")
            prof.step()
else: # simple benchmarking
    torch.cuda.synchronize()
    for stage, num_steps in enumerate([10, 20]):  # stage 0 is burn-in, stage 1 is timed
        t0 = time.time()
        X, Y = get_batch('train')
        for k in range(num_steps):
            with ctx:
                logits, loss = model(X, Y)
            X, Y = get_batch('train')
            optimizer.zero_grad(set_to_none=True)
            loss.backward()
            optimizer.step()
            lossf = loss.item()
            print(f"{k}/{num_steps} loss: {lossf:.4f}")
        torch.cuda.synchronize()
        t1 = time.time()
        dt = t1 - t0
        mfu = model.estimate_mfu(batch_size * 1 * num_steps, dt)
        if stage == 1:
            print(f"time per iteration: {dt/num_steps*1000:.4f}ms, MFU: {mfu*100:.2f}%")
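When profile = True, tensorboard_trace_handler writes the trace to ./bench_log, which can then be inspected with the standard TensorBoard CLI:

(model) $ tensorboard --logdir=./bench_log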
