GPT_Data_Processing_Tutorial
从零开始构建GPT数据处理管道:完整教程
目录
- 引言:为什么数据处理是LLM的基础?
- 文本分词:从字符到token
- 构建词汇表:token到ID的映射
- 特殊标记:处理未知和边界情况
- BytePair编码:工业级解决方案
- 滑动窗口:创造训练样本
- 嵌入层:从离散到连续
- 位置编码:让模型理解顺序
- 完整实现:数据加载器
- 实践建议与常见问题
引言:为什么数据处理是LLM的基础?
在深入理解大语言模型(LLM)之前,我们必须先解决一个根本问题:如何将人类语言转换为模型可以处理的数字形式?
本教程将带你一步步构建完整的数据处理管道,从原始文本到模型输入。我们将以GPT-2为例,理解现代LLM处理文本的核心原理。
学习目标:
- 理解文本分词的必要性和方法
- 掌握从简单到复杂的分词器实现
- 学会使用滑动窗口创建训练数据
- 理解嵌入层和位置编码的作用
- 构建可复用的数据加载器
文本分词:从字符到token
为什么需要分词?
计算机无法直接理解文字,需要将文本切分成更小的单元,这些单元称为token。token可以是:
- 单词(“hello”, “world”)
- 子词(“un”, “happy”)
- 字符(“a”, “b”, “c”)
简单分词实现
让我们从最基础的分词开始:
import retext = "Hello, world. Is this-- a test?"# 使用正则表达式分割文本
tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)
tokens = [item.strip() for item in tokens if item.strip()]print(tokens)
# 输出:['Hello', ',', 'world', '.', 'Is', 'this', '--', 'a', 'test', '?']
正则表达式解析:
[,.:;?_!"()\']
- 匹配标点符号|--
- 匹配双破折号|\s
- 匹配空白字符- 外层的
()
保留分隔符
处理实际文本
让我们处理一个完整的故事:
# 下载并加载文本
import urllib.request
url = "https://raw.githubusercontent.com/rasbt/LLMs-from-scratch/main/ch02/01_main-chapter-code/the-verdict.txt"
raw_text = urllib.request.urlopen(url).read().decode('utf-8')# 分词处理
tokens = re.split(r'([,.:;?_!"()\']|--|\s)', raw_text)
tokens = [item.strip() for item in tokens if item.strip()]print(f"总token数: {len(tokens)}")
# 输出:4690
构建词汇表:token到ID的映射
词汇表的作用
神经网络处理的是数字,不是文本。词汇表建立了token和唯一ID之间的映射关系。
# 构建词汇表
all_words = sorted(set(tokens)) # 去重并排序
vocab = {token: integer for integer, token in enumerate(all_words)}print(f"词汇表大小: {len(vocab)}")
# 输出:1130
实现基础分词器
class SimpleTokenizerV1:def __init__(self, vocab):self.str_to_int = vocab # token → IDself.int_to_str = {i: s for s, i in vocab.items()} # ID → tokendef encode(self, text):"""将文本转换为ID序列"""tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)tokens = [item.strip() for item in tokens if item.strip()]ids = [self.str_to_int[token] for token in tokens]return idsdef decode(self, ids):"""将ID序列转换回文本"""text = " ".join([self.int_to_str[i] for i in ids])# 清理标点符号前的空格text = re.sub(r'\s+([,.?!"()\'])', r'\1', text)return text# 使用示例
tokenizer = SimpleTokenizerV1(vocab)
text = "Hello, world."
ids = tokenizer.encode(text)
print(f"编码结果: {ids}")
print(f"解码结果: {tokenizer.decode(ids)}")
特殊标记:处理未知和边界情况
问题:未知词汇
基础分词器遇到训练时未见过的词会报错:
# 尝试编码不在词汇表中的文本
text = "Hello, do you like tea?"
try:tokenizer.encode(text)
except KeyError as e:print(f"错误: {e}")
# 输出:错误: 'Hello' # 'Hello' 不在词汇表中
解决方案:添加特殊标记
# 扩展词汇表,添加特殊标记
all_tokens.extend(["", "<|unk|>"])
vocab = {token: integer for integer, token in enumerate(all_tokens)}# 改进的分词器
class SimpleTokenizerV2:def __init__(self, vocab):self.str_to_int = vocabself.int_to_str = {i: s for s, i in vocab.items()}def encode(self, text):tokens = re.split(r'([,.:;?_!"()\']|--|\s)', text)tokens = [item.strip() for item in tokens if item.strip()]# 处理未知词tokens = [token if token in self.str_to_int else "<|unk|>" for token in tokens]ids = [self.str_to_int[token] for token in tokens]return idsdef decode(self, ids):text = " ".join([self.int_to_str[i] for i in ids])text = re.sub(r'\s+([,.?!"()\'])', r'\1', text)return text# 测试改进的分词器
tokenizer = SimpleTokenizerV2(vocab)
text = "Hello, do you like tea?"
ids = tokenizer.encode(text)
print(f"编码: {ids}")
print(f"解码: {tokenizer.decode(ids)}")
常用特殊标记
标记 | 作用 | GPT-2中的处理 |
---|---|---|
`< | unk | >` |
<s> | 序列开始 | 不使用 |
</s> | 序列结束 | 使用`` |
<pad> | 填充 | 使用`` |
BytePair编码:工业级解决方案
BPE的原理
BytePair编码(BPE)是一种数据压缩算法,被OpenAI用于GPT系列模型。它的核心思想是:
- 从字符级别开始
- 迭代合并最频繁的相邻token对
- 形成更大的子词单元
使用tiktoken库
import tiktoken# 初始化GPT-2编码器
tokenizer = tiktoken.get_encoding("gpt2")# 编码文本
text = "Hello, do you like tea? In the sunlit terraces of someunknownPlace."
ids = tokenizer.encode(text, allowed_special={""})
print(f"Token IDs: {ids}")# 解码
decoded = tokenizer.decode(ids)
print(f"解码文本: {decoded}")# 查看token对应的文本
for i, token_id in enumerate(ids[:10]):token_text = tokenizer.decode([token_id])print(f"Token {i}: ID={token_id}, Text='{token_text}'")
BPE的优势
# 处理未知词
text = "unfamiliarword"
ids = tokenizer.encode(text)
tokens = [tokenizer.decode([id]) for id in ids]
print(f"BPE分解: {tokens}")
# 可能输出:['unfam', 'iliar', 'word']# BPE将未知词分解为已知的子词组合
# 避免了<unk>标记,保持更多信息
滑动窗口:创造训练样本
为什么需要滑动窗口?
LLM的任务是预测下一个词。我们需要将长文本切分成多个训练样本,每个样本包含:
- 输入序列:连续的token
- 目标序列:输入序列向右移动一位
滑动窗口实现
# 示例:创建训练样本
token_ids = [290, 4920, 2241, 287, 257, 984, 15632, 438]
context_size = 4# 使用滑动窗口
samples = []
for i in range(len(token_ids) - context_size):input_seq = token_ids[i:i + context_size]target_seq = token_ids[i + 1: i + context_size + 1]samples.append((input_seq, target_seq))# 打印样本
for i, (inp, tgt) in enumerate(samples):print(f"样本 {i}:")print(f" 输入: {inp}")print(f" 目标: {tgt}")print(f" 任务: 根据前{len(inp)}个词预测下一个词")
关键参数说明
# 滑动窗口参数
max_length = 256 # 每个样本的长度
stride = 128 # 滑动步长# 完整实现
def create_samples(token_ids, max_length, stride):inputs = []targets = []for i in range(0, len(token_ids) - max_length, stride):input_chunk = token_ids[i:i + max_length]target_chunk = token_ids[i + 1: i + max_length + 1]inputs.append(input_chunk)targets.append(target_chunk)return inputs, targets
参数选择策略:
stride = 1
:最大数据利用率,高重叠stride = max_length
:无重叠,最高效stride < max_length
:平衡效率和多样性
嵌入层:从离散到连续
嵌入层的作用
神经网络需要连续的数值输入,而不是离散的整数。嵌入层将token ID映射为高维向量。
import torch
import torch.nn as nn# 创建嵌入层
vocab_size = 50257 # GPT-2词汇表大小
embedding_dim = 256 # 嵌入维度embedding_layer = nn.Embedding(vocab_size, embedding_dim)# 查看嵌入矩阵
print(f"嵌入层形状: {embedding_layer.weight.shape}")
# 输出:torch.Size([50257, 256])# 转换token ID到向量
token_ids = torch.tensor([15496, 11, 466]) # "Hello"的token
embeddings = embedding_layer(token_ids)
print(f"嵌入向量形状: {embeddings.shape}")
# 输出:torch.Size([3, 256])
嵌入层的工作原理
# 嵌入层本质上是一个查找表
# 每个token ID对应一行向量# 模拟简单例子
simple_vocab = 5
embed_dim = 3
simple_embedding = nn.Embedding(simple_vocab, embed_dim)# 手动查看
for i in range(simple_vocab):vector = simple_embedding(torch.tensor([i]))print(f"Token {i}: {vector.squeeze().tolist()}")
为什么使用嵌入层?
- 降维:将高维one-hot编码压缩到低维空间
- 语义相似性:相似的词在向量空间中更接近
- 可训练:嵌入向量通过训练学习到最优表示
位置编码:让模型理解顺序
问题:嵌入层丢失位置信息
# 相同的词,不同位置
text1 = "The cat sat"
text2 = "Sat the cat"# 经过嵌入层后,相同词的向量相同
# 但位置信息丢失了!
解决方案:绝对位置编码
# 创建位置嵌入层
context_length = 1024 # 最大序列长度
pos_embedding_layer = nn.Embedding(context_length, embedding_dim)# 生成位置索引
max_length = 4
position_ids = torch.arange(max_length)
print(f"位置ID: {position_ids}")
# 输出:tensor([0, 1, 2, 3])# 获取位置嵌入
pos_embeddings = pos_embedding_layer(position_ids)
print(f"位置嵌入形状: {pos_embeddings.shape}")
# 输出:torch.Size([4, 256])
组合token嵌入和位置嵌入
# 完整的输入嵌入计算
def create_input_embeddings(token_ids, max_length):# Token嵌入token_embeddings = token_embedding_layer(token_ids)# 位置嵌入position_ids = torch.arange(max_length)position_embeddings = pos_embedding_layer(position_ids)# 相加(广播机制)input_embeddings = token_embeddings + position_embeddingsreturn input_embeddings# 示例
batch_size = 8
token_ids = torch.randint(0, vocab_size, (batch_size, max_length))
input_embeddings = create_input_embeddings(token_ids, max_length)
print(f"最终嵌入形状: {input_embeddings.shape}")
# 输出:torch.Size([8, 4, 256])
位置编码的作用机制
# 可视化位置编码的影响
import matplotlib.pyplot as plt# 计算不同位置的余弦相似度
def plot_position_similarity():pos_embeds = pos_embedding_layer.weight.detach().numpy()# 计算位置0与其他位置的相似度pos_0 = pos_embeds[0]similarities = []for i in range(20):pos_i = pos_embeds[i]sim = np.dot(pos_0, pos_i) / (np.linalg.norm(pos_0) * np.linalg.norm(pos_i))similarities.append(sim)plt.figure(figsize=(10, 5))plt.plot(similarities)plt.xlabel('Position')plt.ylabel('Cosine Similarity with Position 0')plt.title('Position Embedding Similarity')plt.show()# 每个位置都有独特的表示,模型可以区分不同的位置
完整实现:数据加载器
PyTorch数据集类
import torch
from torch.utils.data import Dataset, DataLoader
import tiktokenclass GPTDataset(Dataset):def __init__(self, text, max_length=256, stride=128):self.max_length = max_lengthself.stride = stride# 初始化tokenizerself.tokenizer = tiktoken.get_encoding("gpt2")# 编码全文self.token_ids = self.tokenizer.encode(text, allowed_special={""})# 创建输入和目标self.inputs = []self.targets = []for i in range(0, len(self.token_ids) - max_length, stride):input_chunk = self.token_ids[i:i + max_length]target_chunk = self.token_ids[i + 1: i + max_length + 1]self.inputs.append(torch.tensor(input_chunk))self.targets.append(torch.tensor(target_chunk))def __len__(self):return len(self.inputs)def __getitem__(self, idx):return self.inputs[idx], self.targets[idx]def create_dataloader(text, batch_size=4, max_length=256,stride=128, shuffle=True, drop_last=True):"""创建数据加载器"""dataset = GPTDataset(text, max_length, stride)dataloader = DataLoader(dataset,batch_size=batch_size,shuffle=shuffle,drop_last=drop_last,num_workers=0)return dataloader
完整的嵌入处理流程
class GPTEmbeddingProcessor:def __init__(self, vocab_size=50257, embed_dim=768, max_length=1024):self.vocab_size = vocab_sizeself.embed_dim = embed_dimself.max_length = max_length# 初始化嵌入层self.token_embedding = nn.Embedding(vocab_size, embed_dim)self.position_embedding = nn.Embedding(max_length, embed_dim)# 初始化tokenizerself.tokenizer = tiktoken.get_encoding("gpt2")def process_batch(self, token_ids):"""处理一个批次的token IDs"""batch_size, seq_length = token_ids.shape# Token嵌入token_embeds = self.token_embedding(token_ids)# 位置嵌入position_ids = torch.arange(seq_length, device=token_ids.device)position_embeds = self.position_embedding(position_ids)# 广播相加position_embeds = position_embeds.unsqueeze(0).expand(batch_size, -1, -1)# 最终嵌入input_embeddings = token_embeds + position_embedsreturn input_embeddings# 使用示例
processor = GPTEmbeddingProcessor()# 加载数据
with open("the-verdict.txt", "r", encoding="utf-8") as f:text = f.read()# 创建数据加载器
dataloader = create_dataloader(text, batch_size=8, max_length=4, stride=4)# 处理数据
for batch_inputs, batch_targets in dataloader:# 获取嵌入embeddings = processor.process_batch(batch_inputs)print(f"输入形状: {batch_inputs.shape}")print(f"嵌入形状: {embeddings.shape}")print(f"目标形状: {batch_targets.shape}")break # 只处理第一个batch
数据流完整示例
# 完整的数据处理流程
def demonstrate_full_pipeline():"""演示完整的数据处理流程"""# 1. 原始文本text = "The quick brown fox jumps over the lazy dog."print(f"原始文本: {text}")# 2. 分词tokenizer = tiktoken.get_encoding("gpt2")token_ids = tokenizer.encode(text)print(f"Token IDs: {token_ids}")# 3. 创建滑动窗口样本max_length = 5samples = []for i in range(len(token_ids) - max_length):input_ids = token_ids[i:i + max_length]target_ids = token_ids[i + 1: i + max_length + 1]samples.append((input_ids, target_ids))print(f"\n创建了 {len(samples)} 个训练样本")# 4. 转换为tensorinput_tensor = torch.tensor(samples[0][0]).unsqueeze(0)target_tensor = torch.tensor(samples[0][1]).unsqueeze(0)print(f"\n第一个样本:")print(f"输入: {input_tensor.tolist()[0]}")print(f"目标: {target_tensor.tolist()[0]}")# 5. 嵌入处理processor = GPTEmbeddingProcessor()embeddings = processor.process_batch(input_tensor)print(f"\n嵌入形状: {embeddings.shape}")print(f"每个token被映射到 {embeddings.shape[-1]} 维向量")# 6. 解码展示print("\n解码展示:")input_text = tokenizer.decode(input_tensor[0])target_text = tokenizer.decode(target_tensor[0])print(f"输入文本: '{input_text}'")print(f"目标文本: '{target_text}'")demonstrate_full_pipeline()
实践建议与常见问题
最佳实践
-
选择合适的max_length
# 根据硬件和任务选择 GPU内存 < 8GB: max_length = 128-256 GPU内存 = 16GB: max_length = 512-1024 GPU内存 > 32GB: max_length = 1024-2048
-
stride的设置策略
# 训练初期:小stride,更多数据 stride = max_length // 4# 训练后期:大stride,避免过拟合 stride = max_length // 2
-
批处理优化
# 动态batch size batch_size = {"embed_dim": 256: 32,"embed_dim": 512: 16,"embed_dim": 768: 8,"embed_dim": 1024: 4 }[embed_dim]
常见问题解决
Q1: 如何处理超长文本?
def chunk_long_text(text, chunk_size=10000, overlap=100):"""将长文本分块处理"""chunks = []for i in range(0, len(text), chunk_size - overlap):chunk = text[i:i + chunk_size]chunks.append(chunk)return chunks
Q2: 内存不足怎么办?
# 使用梯度累积
accumulation_steps = 4
effective_batch_size = batch_size * accumulation_steps# 或者减小batch size和max_length
batch_size = 2
max_length = 128
Q3: 如何验证数据处理正确性?
def verify_data_pipeline(dataloader):"""验证数据处理的正确性"""for inputs, targets in dataloader:# 检查形状assert inputs.shape == targets.shape# 检查目标是否是输入的偏移assert torch.all(targets[:, :-1] == inputs[:, 1:])# 检查范围assert inputs.min() >= 0assert inputs.max() < vocab_sizeprint("✓ 数据验证通过")break
性能优化技巧
-
预加载tokenizer
# 避免重复初始化 tokenizer = tiktoken.get_encoding("gpt2")
-
使用pin_memory
dataloader = DataLoader(dataset, batch_size=32, pin_memory=True)
-
多进程加载
dataloader = DataLoader(dataset, batch_size=32, num_workers=4)
调试技巧
def debug_dataloader(dataloader, num_samples=3):"""调试数据加载器"""print("=== 数据加载器调试信息 ===")for i, (inputs, targets) in enumerate(dataloader):if i >= num_samples:breakprint(f"\n样本 {i}:")print(f" 输入形状: {inputs.shape}")print(f" 目标形状: {targets.shape}")print(f" 输入范围: [{inputs.min()}, {inputs.max()}]")# 解码第一个样本input_text = tokenizer.decode(inputs[0])target_text = tokenizer.decode(targets[0])print(f" 输入文本: {input_text[:50]}...")print(f" 目标文本: {target_text[:50]}...")