Text Encoding -- BPE
Table of Contents
- Why Text Encoding
- Byte Pair Encoding / BPE
- How It Works
- A Worked Training Example
- A Simple Custom Implementation
Why Text Encoding
In machine learning and deep learning, whatever the model, the underlying computation runs on numbers. LLMs therefore need to convert text strings into numeric data that can be computed on. In a classification task, each class is assigned a numeric label, and converting text in LLMs works in essentially the same way. Taking character-level modeling as an example, predicting the next character from left to right is really predicting a class label; each predicted label maps back to a character, which is exactly the conversion between text strings and numeric data.
The prerequisite for this conversion is deciding what should be treated as an independent unit, i.e. what the base unit is. For Chinese text, a single character is usually taken as the base unit, though high-frequency multi-character phrases may also be used; for English text, the base unit can simply be a word, or it can be a single letter.
Once the base unit is defined, a continuous string can be split into a discrete sequence of base units. Assigning each distinct base unit its own unique numeric code turns the text into a sequence of numbers that can be fed into subsequent computation.
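As a minimal illustration of this idea (character-level base units; the variable names are made up for this post), the sketch below builds a lookup table and converts a string to an ID sequence and back:

# Illustrative sketch: character-level vocabulary and encode/decode round trip.
text = "the cat in the hat"
chars = sorted(set(text))                      # base units: the unique characters
stoi = {ch: i for i, ch in enumerate(chars)}   # char -> id
itos = {i: ch for ch, i in stoi.items()}       # id -> char

ids = [stoi[ch] for ch in text]                # encode: string -> id sequence
restored = "".join(itos[i] for i in ids)       # decode: id sequence -> string
print(ids)
print(restored == text)                        # True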
Byte Pair Encoding / BPE
Early LLMs were trained mostly on English corpora. With character-level modeling the vocabulary is tiny: it only needs the 26 English letters plus punctuation such as commas and periods, perhaps a few hundred symbols in total. But character-level modeling makes the tokenized sequences very long; a word like "internationalization" needs more than 20 tokens, which makes training and inference very expensive. Word-level modeling, on the other hand, needs a vocabulary that covers every word, which blows the vocabulary up: there are hundreds of thousands of distinct English words alone, and an oversized vocabulary means an enormous token embedding matrix (and output layer), which again drives up the cost of training and inference.
Byte Pair Encoding (BPE) sits between character-level and word-level modeling. It counts high-frequency byte/character pairs and merges them into subword units, so frequent whole words are kept intact while rare words fall back to subword or character pieces, balancing coverage against sequence length. Byte-level BPE can handle any UTF-8 text, including emoji, Arabic script and so on; new words missing from the vocabulary can be "spelled out" adaptively from subwords, so no special unknown token such as <unk> is needed; and it is also friendly to non-natural-language text such as code, mathematical formulas and URLs.
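To see this trade-off concretely, the quick check below (assuming the tiktoken package is installed) encodes the word mentioned earlier with the GPT-2 BPE vocabulary; its 20-plus characters collapse into only a few subword tokens:

import tiktoken

enc = tiktoken.get_encoding("gpt2")
word = "internationalization"
ids = enc.encode(word)
print(len(word), len(ids))             # 20+ characters vs. only a few BPE tokens
print([enc.decode([i]) for i in ids])  # the subword pieces the word is split into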
For Chinese, BPE tokenization ends up close to character-level modeling: most individual Chinese characters are frequent and informative enough that they do not need to be broken into smaller byte pieces, so one character usually corresponds to one token. In addition, high-frequency multi-character words get merged by BPE; frequent combinations such as 中国 or 人工智能 may become a single token.
How It Works
As mentioned above, every base unit needs a unique numeric code. Character encodings in computers are essentially already a kind of "character-level / word-level" encoding scheme. At the lowest level a computer only understands binary numbers, which are not a natural language humans can read, so between low-level execution and front-end display there is an encode/decode step that relies on a human-defined mapping.
In computer architecture, one binary digit is a bit and 8 bits make a byte. The most common encoding, ASCII, defines the common characters (mainly English letters) together with their binary values, using one byte per character. UTF-8 uses three bytes for a Chinese character and four bytes for an emoji. Unicode currently defines roughly 150,000 characters; no matter how many bytes a character occupies, it ultimately corresponds to one unique binary value.
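These byte counts are easy to verify with Python's built-in encode (a quick illustrative check):

for ch in ["A", "中", "😀"]:
    data = ch.encode("utf-8")
    print(ch, len(data), list(data))
# "A" is 1 byte, "中" is 3 bytes, and the emoji "😀" is 4 bytes under UTF-8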
The BPE algorithm was first proposed in a 1994 paper (Philip Gage, "A New Algorithm for Data Compression"), where it was used for data compression. Later researchers carried the idea over to NLP, initially to deal with unknown words in text: the text is broken down to the character level (for English, typically the 26 letters plus punctuation; for Chinese, individual characters), the frequencies of adjacent character pairs are counted repeatedly, and the most frequent pair is merged into a new subword unit. The advantage of character-level BPE is that out-of-vocabulary words can be expressed by combining existing words and subwords; however, special characters that never appeared in the training data, such as emoji or rare Unicode symbols, still cannot be represented and still require a special token such as <unk>.
To solve this, OpenAI introduced byte-level BPE starting with GPT-2: the text is first converted into a byte sequence, byte pairs are then counted at the byte level, and the most frequent pair is merged into a new independent unit. Because every Unicode symbol can be decomposed into a byte sequence, no special token such as <unk> is needed for out-of-vocabulary words. With enough training data, a byte-level BPE trained on English corpora differs little from a character-level one, because the common words will eventually be assembled anyway instead of remaining as raw byte symbols. For Chinese and other languages, the subword boundaries produced by byte-level BPE can look odd but still work reliably: a Chinese character takes three bytes in UTF-8, and the first two bytes might get merged into an independent unit, so a single character may end up represented by two subwords from the vocabulary. Byte-level BPE fully covers Unicode, can handle any input, and is well suited to multilingual settings.
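The byte-level fallback can be observed directly with the GPT-2 tokenizer (an illustrative check, again assuming tiktoken is installed): strings outside the learned merges are split into several byte-level tokens, yet decoding still recovers the original text.

import tiktoken

enc = tiktoken.get_encoding("gpt2")
for s in ["中", "😀"]:
    ids = enc.encode(s)
    # each string becomes one or more byte-level token IDs, and decoding round-trips
    print(s, ids, enc.decode(ids) == s)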
One byte has 8 bits, i.e. 2^8 = 256 possible values, so a single byte can represent 256 distinct values. BPE tokenizers typically use these 256 values as their 256 single-character tokens. The code below prints the first 300 entries of the GPT-2 tokenizer: entries 0 through 255 are single-character values, while entries such as 256 and 257 are no longer single characters but two-character values (a space plus a letter).
import tiktoken

gpt2_tokenizer = tiktoken.get_encoding("gpt2")
for i in range(300):
    decoded = gpt2_tokenizer.decode([i])
    print(f"{i}: {decoded}")
# The output is as follows:
"""
prints:
0: !
1: "
2: #
...
255: � # <---- single character tokens up to here
256: t
257: a
...
298: ent
299: n
"""
The goal of BPE is to build a vocabulary of common subwords. The vocabulary records the mapping, learned from the training corpus, between subword symbols and numeric IDs (plain decimal integers). BPE training can be broken down into the following steps:
- Identify the most frequent pair: in each iteration, scan the text and find the byte pair or character pair that occurs most often
- Replace and record:
  - Replace the frequent pair with a new numeric ID or placeholder ID
  - Record this mapping in a lookup table; the lookup table is the vocabulary, and its size is a hyperparameter (50,257 in GPT-2)
- Repeat until there is no more gain:
  - Keep repeating the two steps above, merging the most frequent pair each time
  - Stop when no further compression is possible or the vocabulary is full (a minimal sketch of this loop follows the list)
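Here is a minimal sketch of that training loop (illustrative only; the function name bpe_train and its parameters are made up for this post and are not part of any library):

from collections import Counter

def bpe_train(ids, num_merges, first_new_id=256):
    # Minimal sketch of the BPE training steps described above.
    merges = {}  # the lookup table: (id_left, id_right) -> new_id
    for new_id in range(first_new_id, first_new_id + num_merges):
        pairs = Counter(zip(ids, ids[1:]))   # step 1: count all adjacent pairs
        if not pairs:
            break
        pair = max(pairs, key=pairs.get)     # the most frequent pair
        if pairs[pair] < 2:                  # step 3: stop when no pair repeats
            break
        merges[pair] = new_id                # step 2: record the mapping ...
        out, i = [], 0                       # ... and replace every occurrence
        while i < len(ids):
            if i + 1 < len(ids) and (ids[i], ids[i + 1]) == pair:
                out.append(new_id)
                i += 2
            else:
                out.append(ids[i])
                i += 1
        ids = out
    return ids, merges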
A Worked Training Example
Suppose the training data is "the cat in the hat" and we want to build a vocabulary for a BPE tokenizer from it. After initializing the 256 single-character tokens as described above, training proceeds through the following iterations:
- Iteration 1
  - Identify the frequent pair: in the current text, "th" occurs twice
  - Replace and record:
    - Replace "th" with the not-yet-used new token ID 256
    - The text becomes: "<256>e cat in <256>e hat"
  - The new vocabulary is
    0: …
    …
    256: "th"
- Iteration 2
  - Identify the frequent pair: in the text "<256>e cat in <256>e hat", "<256>e" occurs twice
  - Replace and record:
    - Replace "<256>e" with the not-yet-used new token ID 257
    - The text becomes: "<257> cat in <257> hat"
  - The new vocabulary is
    0: …
    …
    256: "th"
    257: "<256>e"
- Iteration 3
  - Identify the frequent pair: in the text "<257> cat in <257> hat", "<257> " occurs twice
  - Replace and record:
    - Replace "<257> " with the not-yet-used new token ID 258
    - The text becomes: "<258>cat in <258>hat"
  - The new vocabulary is
    0: …
    …
    256: "th"
    257: "<256>e"
    258: "<257> "
- And so on for further iterations; the short script below reproduces these merges
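Running the bpe_train sketch from the previous section on this training text reproduces the merges above (the exact order can depend on how ties between equally frequent pairs are broken, but the first merges should correspond to "th", "<256>e" and "<257> "):

text = "the cat in the hat"
ids, merges = bpe_train([ord(ch) for ch in text], num_merges=10)

vocab = {i: chr(i) for i in range(256)}
for (a, b), new_id in merges.items():
    vocab[new_id] = vocab[a] + vocab[b]   # render each merge as a readable string
    print(new_id, (a, b), repr(vocab[new_id]))
print(ids)  # the compressed token-ID sequence after all merges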
A Simple Custom Implementation
Below is a BPE implementation for educational purposes from the open-source project LLMs-from-scratch. Its logic breaks down as follows:
- Split the input text into individual bytes/characters
- Repeatedly find the most frequent pair, merge and replace it, and record the merge rule
- Repeat step 2 until no further merges can be built
- Finally, build the resulting vocabulary, which can then be used for encoding and decoding
from collections import Counter, deque
from typing import Union, Tuple, List
from functools import lru_cache
import json


class BPETokenizerSimple:
    def __init__(self):
        # Maps token_id to token_str (e.g., {11246: "some"})
        self.vocab = {}
        # Maps token_str to token_id (e.g., {"some": 11246})
        self.inverse_vocab = {}
        # Dictionary of BPE merges: {(token_id1, token_id2): merged_token_id}
        self.bpe_merges = {}  # key is the tuple of the two merged token IDs, value is the merged token ID

    def train(self, text: str, vocab_size: int, allowed_special: set = {"<|endoftext|>"}) -> None:
        """
        Train the BPE tokenizer from scratch.

        Args:
            text (str): The training text.
            vocab_size (int): The desired vocabulary size.
            allowed_special (set): A set of special tokens to include.
        """
        # Preprocess: Replace spaces with 'Ġ'
        # Note that Ġ is a particularity of the GPT-2 BPE implementation
        # E.g., "Hello world" might be tokenized as ["Hello", "Ġworld"]
        # (GPT-4 BPE would tokenize it as ["Hello", " world"])
        processed_text = []
        for i, char in enumerate(text):  # replace spaces in the text with Ġ
            if char == " " and i != 0:
                processed_text.append("Ġ")
            if char != " ":
                processed_text.append(char)
        processed_text = "".join(processed_text)

        # Initialize vocab with unique characters, including 'Ġ' if present
        # Start with the first 256 ASCII characters
        unique_chars = [chr(i) for i in range(256)]
        # Extend unique_chars with characters from processed_text that are not already included
        unique_chars.extend(
            char for char in sorted(set(processed_text))
            if char not in unique_chars
        )
        # Optionally, ensure 'Ġ' is included if it is relevant to your text processing
        if "Ġ" not in unique_chars:
            unique_chars.append("Ġ")

        # Now create the vocab and inverse vocab dictionaries
        self.vocab = {i: char for i, char in enumerate(unique_chars)}
        self.inverse_vocab = {char: i for i, char in self.vocab.items()}

        # Add allowed special tokens
        if allowed_special:  # register the allowed special tokens
            for token in allowed_special:
                if token not in self.inverse_vocab:
                    new_id = len(self.vocab)
                    self.vocab[new_id] = token
                    self.inverse_vocab[token] = new_id

        # Tokenize the processed_text into token IDs, character by character
        token_ids = [self.inverse_vocab[char] for char in processed_text]

        # BPE steps 1-3: Repeatedly find and replace frequent pairs
        for new_id in range(len(self.vocab), vocab_size):  # start at len(self.vocab), stop at the configured vocab_size
            pair_id = self.find_freq_pair(token_ids, mode="most")  # find the most frequent pair in the current token_ids
            if pair_id is None:  # No more pairs to merge. Stopping training.
                break
            # new_id becomes the token ID of the most frequent pair found above;
            # replace every occurrence of pair_id in token_ids with new_id
            token_ids = self.replace_pair(token_ids, pair_id, new_id)
            self.bpe_merges[pair_id] = new_id  # record the merge so it can be reused later for encoding/decoding

        # Build the vocabulary with merged tokens
        for (p0, p1), new_id in self.bpe_merges.items():
            merged_token = self.vocab[p0] + self.vocab[p1]
            self.vocab[new_id] = merged_token
            self.inverse_vocab[merged_token] = new_id

    def load_vocab_and_merges_from_openai(self, vocab_path, bpe_merges_path):
        """
        Load pre-trained vocabulary and BPE merges from OpenAI's GPT-2 files.

        Args:
            vocab_path (str): Path to the vocab file (GPT-2 calls it 'encoder.json').
            bpe_merges_path (str): Path to the bpe_merges file (GPT-2 calls it 'vocab.bpe').
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
        # Convert loaded vocabulary to correct format
        self.vocab = {int(v): k for k, v in loaded_vocab.items()}
        self.inverse_vocab = {k: int(v) for k, v in loaded_vocab.items()}

        # Handle newline character without adding a new token
        if "\n" not in self.inverse_vocab:
            # Use an existing token ID as a placeholder for '\n'
            # Preferentially use "<|endoftext|>" if available,
            # otherwise fall back to "Ġ", and finally to ""
            fallback_token = next((token for token in ["<|endoftext|>", "Ġ", ""] if token in self.inverse_vocab), None)
            if fallback_token is not None:
                newline_token_id = self.inverse_vocab[fallback_token]
            else:
                # If no fallback token is available, raise an error
                raise KeyError("No suitable token found in vocabulary to map '\\n'.")
            self.inverse_vocab["\n"] = newline_token_id
            self.vocab[newline_token_id] = "\n"

        # Load BPE merges
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            lines = file.readlines()
        # Skip header line if present
        if lines and lines[0].startswith("#"):  # skip a possible comment line
            lines = lines[1:]
        for line in lines:
            pair = tuple(line.strip().split())  # split each line on whitespace into a tuple
            if len(pair) == 2:  # make sure the line has exactly two elements
                token1, token2 = pair
                if token1 in self.inverse_vocab and token2 in self.inverse_vocab:  # both tokens must be in the vocabulary
                    token_id1 = self.inverse_vocab[token1]
                    token_id2 = self.inverse_vocab[token2]
                    merged_token = token1 + token2
                    if merged_token in self.inverse_vocab:  # the merged token must also be in the vocabulary
                        merged_token_id = self.inverse_vocab[merged_token]
                        self.bpe_merges[(token_id1, token_id2)] = merged_token_id
                        # print(f"Loaded merge: '{token1}' + '{token2}' -> '{merged_token}' (ID: {merged_token_id})")
                    else:
                        print(f"Merged token '{merged_token}' not found in vocab. Skipping.")
                else:
                    print(f"Skipping pair {pair} as one of the tokens is not in the vocabulary.")

    def encode(self, text: str, allowed_special: set = None) -> List[int]:
        """
        Encode the input text into a list of token IDs, with tiktoken-style
        handling of special tokens.

        Args:
            text (str): The text to encode.
            allowed_special (set or None): Special tokens to allow passthrough.
                If None, special handling is disabled.

        Returns:
            List[int]: The list of token IDs.
        """
        import re

        token_ids = []

        # If special token handling is enabled
        if allowed_special is not None and len(allowed_special) > 0:
            # Build regex to match allowed special tokens
            special_pattern = (
                "(" + "|".join(re.escape(tok) for tok in sorted(allowed_special, key=len, reverse=True)) + ")"
            )

            last_index = 0
            for match in re.finditer(special_pattern, text):
                prefix = text[last_index:match.start()]
                token_ids.extend(self.encode(prefix, allowed_special=None))  # Encode prefix without special handling

                special_token = match.group(0)
                if special_token in self.inverse_vocab:
                    token_ids.append(self.inverse_vocab[special_token])
                else:
                    raise ValueError(f"Special token {special_token} not found in vocabulary.")
                last_index = match.end()

            text = text[last_index:]  # Remaining part to process normally

            # Check if any disallowed special tokens are in the remainder
            disallowed = [
                tok for tok in self.inverse_vocab
                if tok.startswith("<|") and tok.endswith("|>") and tok in text and tok not in allowed_special
            ]
            if disallowed:
                raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")

        # If no special tokens, or remaining text after special token split:
        tokens = []
        # First split on newlines to preserve them
        lines = text.split("\n")
        for i, line in enumerate(lines):
            if i > 0:
                tokens.append("\n")
            words = line.split()
            for j, word in enumerate(words):
                if j == 0 and i > 0:
                    tokens.append("Ġ" + word)
                elif j == 0:
                    tokens.append(word)
                else:
                    tokens.append("Ġ" + word)

        for token in tokens:
            if token in self.inverse_vocab:  # the token is contained in the vocabulary as is
                token_ids.append(self.inverse_vocab[token])
            else:  # the token is not in the vocabulary, fall back to subword tokenization via BPE
                sub_token_ids = self.tokenize_with_bpe(token)  # apply BPE to this token
                token_ids.extend(sub_token_ids)  # append the resulting subword IDs

        return token_ids

    def tokenize_with_bpe(self, token: str) -> List[int]:
        """
        Tokenize a single token using BPE merges.

        Args:
            token (str): The token to tokenize.

        Returns:
            List[int]: The list of token IDs after applying BPE.
        """
        # Tokenize the token into individual characters (as initial token IDs)
        token_ids = [self.inverse_vocab.get(char, None) for char in token]

        if None in token_ids:  # a None means the token contains characters missing from the vocabulary
            missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]  # find the missing characters
            raise ValueError(f"Characters not found in vocab: {missing_chars}")  # report which characters are missing

        can_merge = True
        while can_merge and len(token_ids) > 1:  # stop when no merge happened in a pass or only one token remains
            can_merge = False
            new_tokens = []
            i = 0
            while i < len(token_ids) - 1:
                pair = (token_ids[i], token_ids[i + 1])  # the current pair of adjacent token IDs
                if pair in self.bpe_merges:  # the pair has a recorded merge rule
                    merged_token_id = self.bpe_merges[pair]  # look up the merged token ID
                    new_tokens.append(merged_token_id)  # keep the merged ID instead of the pair
                    # Uncomment for educational purposes:
                    # print(f"Merged pair {pair} -> {merged_token_id} ('{self.vocab[merged_token_id]}')")
                    i += 2  # Skip the next token as it's merged
                    can_merge = True
                else:  # no merge rule for this pair, keep the current token as is
                    new_tokens.append(token_ids[i])
                    i += 1
            if i < len(token_ids):
                new_tokens.append(token_ids[i])
            token_ids = new_tokens  # update token_ids and try another round of merging

        return token_ids

    def decode(self, token_ids: List[int]) -> str:
        """
        Decode a list of token IDs back into a string.

        Args:
            token_ids (List[int]): The list of token IDs to decode.

        Returns:
            str: The decoded string.
        """
        decoded_string = ""
        for i, token_id in enumerate(token_ids):
            if token_id not in self.vocab:
                raise ValueError(f"Token ID {token_id} not found in vocab.")
            token = self.vocab[token_id]
            if token == "\n":
                if decoded_string and not decoded_string.endswith(" "):
                    decoded_string += " "  # Add space if not present before a newline
                decoded_string += token
            elif token.startswith("Ġ"):
                decoded_string += " " + token[1:]
            else:
                decoded_string += token
        return decoded_string

    def save_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Save the vocabulary and BPE merges to JSON files.

        Args:
            vocab_path (str): Path to save the vocabulary.
            bpe_merges_path (str): Path to save the BPE merges.
        """
        # Save vocabulary
        with open(vocab_path, "w", encoding="utf-8") as file:
            json.dump(self.vocab, file, ensure_ascii=False, indent=2)
        # Save BPE merges as a list of dictionaries
        with open(bpe_merges_path, "w", encoding="utf-8") as file:
            merges_list = [{"pair": list(pair), "new_id": new_id}
                           for pair, new_id in self.bpe_merges.items()]
            json.dump(merges_list, file, ensure_ascii=False, indent=2)

    def load_vocab_and_merges(self, vocab_path, bpe_merges_path):
        """
        Load the vocabulary and BPE merges from JSON files.

        Args:
            vocab_path (str): Path to the vocabulary file.
            bpe_merges_path (str): Path to the BPE merges file.
        """
        # Load vocabulary
        with open(vocab_path, "r", encoding="utf-8") as file:
            loaded_vocab = json.load(file)
            self.vocab = {int(k): v for k, v in loaded_vocab.items()}
            self.inverse_vocab = {v: int(k) for k, v in loaded_vocab.items()}
        # Load BPE merges
        with open(bpe_merges_path, "r", encoding="utf-8") as file:
            merges_list = json.load(file)
            for merge in merges_list:
                pair = tuple(merge["pair"])
                new_id = merge["new_id"]
                self.bpe_merges[pair] = new_id

    @lru_cache(maxsize=None)
    def get_special_token_id(self, token):
        return self.inverse_vocab.get(token, None)

    @staticmethod
    def find_freq_pair(token_ids: List[int], mode: str = "most") -> Union[Tuple[int, int], None]:
        # Depending on mode, return the most or least frequent pair.
        # zip(token_ids, token_ids[1:]) creates an iterator of adjacent token pairs;
        # e.g. token_ids = [1, 2, 3, 4] yields (1, 2), (2, 3), (3, 4).
        # Counter then counts how often each pair occurs, returning a mapping
        # from pair to count.
        pairs = Counter(zip(token_ids, token_ids[1:]))

        if not pairs:
            return None

        if mode == "most":
            return max(pairs.items(), key=lambda x: x[1])[0]  # return the most frequent pair
        elif mode == "least":
            return min(pairs.items(), key=lambda x: x[1])[0]  # return the least frequent pair
        else:
            raise ValueError("Invalid mode. Choose 'most' or 'least'.")

    @staticmethod
    def replace_pair(token_ids: List[int], pair_id: Tuple[int, int], new_id: int) -> List[int]:
        dq = deque(token_ids)  # a deque allows efficient removal from the left
        replaced = []  # collects the token IDs after replacement
        while dq:
            current = dq.popleft()  # take the leftmost element
            if dq and (current, dq[0]) == pair_id:  # current and the next element form the pair to replace
                replaced.append(new_id)  # append the merged ID instead
                # Remove the 2nd token of the pair, 1st was already removed
                dq.popleft()
            else:
                replaced.append(current)  # not the target pair, keep the current token
        return replaced
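Finally, a small illustrative usage example of the class above (the training text and vocab_size are arbitrary choices for demonstration):

# Illustrative usage sketch of BPETokenizerSimple
tokenizer = BPETokenizerSimple()
tokenizer.train("the cat in the hat. the cat sat on the mat.", vocab_size=300)

ids = tokenizer.encode("the cat sat")
print(ids)
print(tokenizer.decode(ids))  # expected to round-trip back to "the cat sat"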