当前位置: 首页 > news >正文

文本编码--BPE

文章目录

  • 为什么要文本编码
  • 字节对编码/BPE
    • 原理说明
    • 训练过程举例
    • 简单自定义实现

为什么要文本编码

在机器学习或深度学习中,不管是何种模型,底层运算都依赖于数值。在LLMs中,需要将文本字符串转换为可以计算的数值型数据。在分类任务中,会为每个类别分配一个数值型的lable,LLMs中文本的转换其实也是如此;以“字符单位”举例,从左至右预测下一个字符本质就是在预测一个类别标签,每预测一个标签就能周到对应的字符,即实现了文本字符串与数值型数据的转换。

字符串转换的前提是需要搞清楚“具体应该将哪些部分看作是一个独立个体,即基础单位是什么”。对于中文文本,一般是将一个汉字视为基础单位,也可能会将高频的多字短语视为基础单位;对于英文文本,简单的可以以单词为基础单位,也可以以字母为基础单位。

确定好基础单位定义后就可以将连续的字符串离散为“基础单位”组成的序列,为每个独立的基础单位分配对应的、不重复的数值编码,就能将文本编码为数值编码序列,进而用于后续的计算操作。

字节对编码/BPE

早期的LLMs均基于英文语料训练,对于字符级建模,词表很小,只需要包含26个英文字母和逗号、句号等附后即可,可能只有几百个字符,但是字符级建模会导致分析后文本序列过长,如“internationalization” 这个单词就需要20多个token表示,进而使得训练和推理开销极大。若进行词级建模,为了能覆盖所有单词,又会导致词表爆炸,因为光不同的英文单词可能就包含几十万个,词表过大会导致token的词向量维度过多,也会导致训练和推理开销大。

Byte Pair Encode,即BPE介于字符级建模和词级建模两者之间,通过统计高频的字节/字符对,合并称子词单元,保留高频整词,低频走子词/字符拆分,从而兼顾覆盖与序列长度。字节级BPE可以处理任何UTF-8文本,甚至是表情符号、阿拉伯文等;对于词表中不存在的新词,BPE可自适应地“拼出来”,不需要使用""等表示未知的token;对于代码、数学公式、URL等非自然语言文本也很友好。

对于中文,BPE编码的分词结果接近“字级建模”,因为大部分单个汉字频率高、信息量足,不需要再拆分成更小的字节,分词结果往往是一个汉字等价一个token;此外高频的多字词会被BPE合并,比如中国、人工智能等高频组合可能会合并为一个token。

原理说明

前文说到,需要给每个“基础”单位分配一个独一无二的数值编码,其实计算机中的字符编码本质就相当于一种“字符级建模+字级建模”的编码模式。计算机底层只能识别二进制数,但这不是人类可以识别的自然语言,在底层运行和前端展示之间存在一个编码、解码的过程,本质就是依赖人为设置的对应关系进行转换。

在计算机原理中,一个二进制位称为一个bit,8个bit称为一个字节/byte,最常见的编码ASCII编码,其定义了以英文字母为主的常见字符及对应的二进制数,使用一个字节表示一个字符;在UTF-8中用三个字节表示一个汉字,用四个字节表示一个emoji表情。UTF-8定义的有效字符数量大约有15万左右,一个字符不管由几个字节表示,其本质都和一个独一无二的二进制数值对应。

BPE算法最早由1994年的论文
提出,在该论文中是为了进行数据压缩。后续学者基于该论文思想将其引用到NLP领域,最初是为解决文本中出现的未知词,将文本分解到字符层面,英文通常是26个字母+标点符号,中文则是单个汉字,不断统计相邻字符对的频率,把出现最频繁的字符对合并为新的子词单元;字符级别BPE的优势是,可通过已有单词和子词组合表达一些OOV的未知词,但对于训练数据中未出现的如emoji表情、冷门Unicode符号等特殊字符还是不能表征,还是需要使用等特殊字符表示。

OpenAI为了解决上述问题从GPT-2提出了字节级别的BPE,即将文本转换为字节序列,然后从字节层面统计字节对,把出现最频繁的字节对合并为新的独立单元,因为所有Unicode符号均可以被拆分成字节序列,所以不再需要使用等特殊字符表示OOV的未知词。当训练数据足够多时,在英文语料上训练的字节级BPE和字符级BPE区别不大,因为对应常用的单词最终肯定会被组合出来,而不是全是字节级的符号。在中文等其他语言语料上训练的字节级BPE,子词的边界可能有些奇怪,但也能稳定工作,因为在UTF-8中一个字符由三个字节表示,可能前两字节组成了独立单元,进而导致一个中文需要由两个词表的子词组合表示。字节级的BPE完全覆盖Unicode,能处理任何输入,适合多语言场景。

1个字节有8个bits,即28=2562^8=25628=256种可能的值,即一个字节可表示256种不同值,BPE分词器通常使用这256值作为其256个单字符标记,通过以下代码可查看gpt2分词器前300项的具体情况,可以看到第256、257项不是单字符值,而是双字符值(一个空格+一个字母),第0到第255这256项是单字符值。

import tiktoken
gpt2_tokenizer = tiktoken.get_encoding("gpt2")for i in range(300):decoded = gpt2_tokenizer.decode([i])print(f"{i}: {decoded}")# 输入如下:
"""
prints:
0: !
1: "
2: #
...
255: �  # <---- single character tokens up to here
256:  t
257:  a
...
298: ent
299:  n
"""

BPE的目标是构建一个包含常见子词的词汇表,词汇表记录基于语聊训练得到的子词符号与数值的对应关系,此处的数值就是正常的十进制数。BPE训练过程可分解为以下步骤:

  • 识别最频繁的对:每次迭代种扫描文本,找到出现次数最多的字节对或字符对
  • 替换记录:
    • 用一个新的数值或占位符ID替换频繁对
    • 在查找表中记录这个映射;查找表就是词汇表,其大小是一个超参数,GPT-2种是50257
  • 重复直到没有收益:
    • 不断重复上面两个步骤,持续合并最频繁的对
    • 直到不能继续压缩或词汇表满了为止

训练过程举例

假设有训练数据"the cat in the hat",要从中为BPE分词器构建词汇表,按上述先初始化了256个单字符,进行以下训练迭代步骤:

  • 迭代一
    • 识别频繁对:当前文本,"th"出现了两次
    • 替换并记录:
      • 用尚未使用的新标记ID–256替换"th"
      • 新文本转换为:“<256>e cat in <256>e hat”
      • 新词汇表为
        0: …

        256: “th”
  • 迭代二
    • 识别频繁对:在文本"<256>e cat in <256>e hat"中,"<256>e"出现了两次
    • 替换并记录:
      • 用尚未使用的新标记ID–257替换"<256>e"
      • 新文本转换为:“<257> cat in <257> hat”
      • 新词汇表为
        0: …

        256: “th”
        257: “<256>e”
  • 迭代三
    • 识别频繁对:在文本"<257> cat in <257> hat"中,"<257> "出现了两次
    • 替换并记录:
      • 用尚未使用的新标记ID–258替换"<257> "
      • 新文本转换为:“<258>cat in <258>hat”
      • 新词汇表为
        0: …

        256: “th”
        257: “<256>e”
        258: "<257> "
  • 依次迭代

简单自定义实现

以下是开源项目LLMs-from-scratch给出的一个用于教育目的的BPE实现,实现逻辑拆分如下:

  1. 将输入文本拆分为单个字节
  2. 重复查找并合并、替换字节对,继续合并关系
  3. 重复步骤2,直到无法构建更多的合并
  4. 最终构建有效词表,基于其可进行编解码
from collections import Counter, deque
from typing import Union, Tuple, List
from functools import lru_cache
import jsonclass BPETokenizerSimple:def __init__(self):# Maps token_id to token_str (e.g., {11246: "some"})self.vocab = {}# Maps token_str to token_id (e.g., {"some": 11246})self.inverse_vocab = {}# Dictionary of BPE merges: {(token_id1, token_id2): merged_token_id}self.bpe_merges = {}  # 字典,key是元组,即合并的两个tokend_id,value是合并后的一个token_iddef train(self, text: str, vocab_size: int, allowed_special: set = {"<|endoftext|>"}) -> None:"""Train the BPE tokenizer from scratch.Args:text (str): The training text.vocab_size (int): The desired vocabulary size.allowed_special (set): A set of special tokens to include."""# Preprocess: Replace spaces with 'Ġ'# Note that Ġ is a particularity of the GPT-2 BPE implementation# E.g., "Hello world" might be tokenized as ["Hello", "Ġworld"]# (GPT-4 BPE would tokenize it as ["Hello", " world"])processed_text = []for i, char in enumerate(text):  # 将文本中的空格替换为Ġif char == " " and i != 0:processed_text.append("Ġ")if char != " ":processed_text.append(char)processed_text = "".join(processed_text)# Initialize vocab with unique characters, including 'Ġ' if present# Start with the first 256 ASCII charactersunique_chars = [chr(i) for i in range(256)]# Extend unique_chars with characters from processed_text that are not already includedunique_chars.extend(char for char in sorted(set(processed_text))if char not in unique_chars)# Optionally, ensure 'Ġ' is included if it is relevant to your text processingif "Ġ" not in unique_chars:unique_chars.append("Ġ")# Now create the vocab and inverse vocab dictionariesself.vocab = {i: char for i, char in enumerate(unique_chars)}self.inverse_vocab = {char: i for i, char in self.vocab.items()}# Add allowed special tokensif allowed_special:  # 添加允许的特殊tokenfor token in allowed_special:if token not in self.inverse_vocab:new_id = len(self.vocab)self.vocab[new_id] = tokenself.inverse_vocab[token] = new_id# Tokenize the processed_text into token IDs;将processed_text以字符为单位转换为token_idtoken_ids = [self.inverse_vocab[char] for char in processed_text]# BPE steps 1-3: Repeatedly find and replace frequent pairsfor new_id in range(len(self.vocab), vocab_size):  # 从len(self.vocab)开始,直到设置的vocab_sizepair_id = self.find_freq_pair(token_ids, mode="most")  # 从目前的token_ids中找到最频繁的pairif pair_id is None:  # No more pairs to merge. Stopping training.break# 当前遍历的new_id就是上述找到的最频繁pair的新的token_id,用这个new_id替换token_ids中所有的pair_idtoken_ids = self.replace_pair(token_ids, pair_id, new_id)self.bpe_merges[pair_id] = new_id  # 记录pair_id合并过程,用于后续解码# Build the vocabulary with merged tokensfor (p0, p1), new_id in self.bpe_merges.items():merged_token = self.vocab[p0] + self.vocab[p1]self.vocab[new_id] = merged_tokenself.inverse_vocab[merged_token] = new_iddef load_vocab_and_merges_from_openai(self, vocab_path, bpe_merges_path):"""Load pre-trained vocabulary and BPE merges from OpenAI's GPT-2 files.Args:vocab_path (str): Path to the vocab file (GPT-2 calls it 'encoder.json').bpe_merges_path (str): Path to the bpe_merges file  (GPT-2 calls it 'vocab.bpe')."""# Load vocabularywith open(vocab_path, "r", encoding="utf-8") as file:loaded_vocab = json.load(file)# Convert loaded vocabulary to correct formatself.vocab = {int(v): k for k, v in loaded_vocab.items()}self.inverse_vocab = {k: int(v) for k, v in loaded_vocab.items()}# Handle newline character without adding a new token 处理换行符if "\n" not in self.inverse_vocab:# Use an existing token ID as a placeholder for '\n'# Preferentially use "<|endoftext|>" if available# 尝试使用现有的特殊 token 作为换行符的占位符,优先使用"<|endoftext|>",如果不可用,则使用"Ġ",如果也不可用,则使用""fallback_token = next((token for token in ["<|endoftext|>", "Ġ", ""] if token in self.inverse_vocab), None)if fallback_token is not None:newline_token_id = self.inverse_vocab[fallback_token]else:# If no fallback token is available, raise an errorraise KeyError("No suitable token found in vocabulary to map '\\n'.")self.inverse_vocab["\n"] = newline_token_idself.vocab[newline_token_id] = "\n"# Load BPE merges  加载BPE合并规则with open(bpe_merges_path, "r", encoding="utf-8") as file:lines = file.readlines()# Skip header line if presentif lines and lines[0].startswith("#"):  # 跳过可能存在的注释行lines = lines[1:]for line in lines:pair = tuple(line.strip().split())  # 将每行按空格分割成一个元组    if len(pair) == 2:  # 确保每行只有两个元素token1, token2 = pair  # 将元组中的两个元素分别赋值给token1和token2if token1 in self.inverse_vocab and token2 in self.inverse_vocab:  # 确保token1和token2都在词汇表中token_id1 = self.inverse_vocab[token1]token_id2 = self.inverse_vocab[token2]merged_token = token1 + token2if merged_token in self.inverse_vocab:  # 确保合并后的token也在词汇表中merged_token_id = self.inverse_vocab[merged_token]self.bpe_merges[(token_id1, token_id2)] = merged_token_id# print(f"Loaded merge: '{token1}' + '{token2}' -> '{merged_token}' (ID: {merged_token_id})")else:print(f"Merged token '{merged_token}' not found in vocab. Skipping.")else:print(f"Skipping pair {pair} as one of the tokens is not in the vocabulary.")def encode(self, text: str, allowed_special: set = None) -> List[int]:"""Encode the input text into a list of token IDs, with tiktoken-style handling of special tokens.Args:text (str): The text to encode.allowed_special (set or None): Special tokens to allow passthrough. If None, special handling is disabled.Returns:List[int]: The list of token IDs."""import retoken_ids = []# If special token handling is enabledif allowed_special is not None and len(allowed_special) > 0:# Build regex to match allowed special tokensspecial_pattern = ("(" + "|".join(re.escape(tok) for tok in sorted(allowed_special, key=len, reverse=True)) + ")")last_index = 0for match in re.finditer(special_pattern, text):prefix = text[last_index:match.start()]token_ids.extend(self.encode(prefix, allowed_special=None))  # Encode prefix without special handlingspecial_token = match.group(0)if special_token in self.inverse_vocab:token_ids.append(self.inverse_vocab[special_token])else:raise ValueError(f"Special token {special_token} not found in vocabulary.")last_index = match.end()text = text[last_index:]  # Remaining part to process normally# Check if any disallowed special tokens are in the remainderdisallowed = [tok for tok in self.inverse_vocabif tok.startswith("<|") and tok.endswith("|>") and tok in text and tok not in allowed_special]if disallowed:raise ValueError(f"Disallowed special tokens encountered in text: {disallowed}")# If no special tokens, or remaining text after special token split:tokens = []# First split on newlines to preserve themlines = text.split("\n")for i, line in enumerate(lines):if i > 0:tokens.append("\n")words = line.split()for j, word in enumerate(words):if j == 0 and i > 0:tokens.append("Ġ" + word)elif j == 0:tokens.append(word)else:tokens.append("Ġ" + word)for token in tokens:if token in self.inverse_vocab:  # 如果token在词汇表中,则直接添加到token_ids中# token is contained in the vocabulary as istoken_ids.append(self.inverse_vocab[token])else:  # 如果token不在词汇表中,则需要通过BPE进行处理# Attempt to handle subword tokenization via BPEsub_token_ids = self.tokenize_with_bpe(token)  # 将token进行BPE处理token_ids.extend(sub_token_ids)  # 将处理后的token_ids添加到token_ids中return token_idsdef tokenize_with_bpe(self, token: str) -> List[int]:"""Tokenize a single token using BPE merges.Args:token (str): The token to tokenize.Returns:List[int]: The list of token IDs after applying BPE."""# Tokenize the token into individual characters (as initial token IDs)token_ids = [self.inverse_vocab.get(char, None) for char in token]if None in token_ids:  # 如果token_ids中存在None,则说明token中存在词汇表中不存在的字符missing_chars = [char for char, tid in zip(token, token_ids) if tid is None]  # 找出token中哪些字符不在词汇表中raise ValueError(f"Characters not found in vocab: {missing_chars}")  # 抛出异常,提示哪些字符不在词汇表中can_merge = Truewhile can_merge and len(token_ids) > 1:  # 当can_merge为False或token_ids长度小于1时表明不可合并,不会执行can_merge = Falsenew_tokens = []i = 0while i < len(token_ids) - 1:pair = (token_ids[i], token_ids[i + 1])  # 获取当前token_ids中的相邻两个元素if pair in self.bpe_merges:  # 如果pair在BPE合并规则中merged_token_id = self.bpe_merges[pair]  # 获取合并后的token_idnew_tokens.append(merged_token_id)  # 将合并后的token_id添加到new_tokens中# Uncomment for educational purposes:# print(f"Merged pair {pair} -> {merged_token_id} ('{self.vocab[merged_token_id]}')")i += 2  # Skip the next token as it's mergedcan_merge = Trueelse:  # 如果pair不在BPE合并规则中,则将当前token添加到new_tokens中new_tokens.append(token_ids[i])i += 1if i < len(token_ids):new_tokens.append(token_ids[i])token_ids = new_tokens  # 更新token_ids,继续下一轮合并return token_idsdef decode(self, token_ids: List[int]) -> str:"""Decode a list of token IDs back into a string.Args:token_ids (List[int]): The list of token IDs to decode.Returns:str: The decoded string."""decoded_string = ""for i, token_id in enumerate(token_ids):if token_id not in self.vocab:raise ValueError(f"Token ID {token_id} not found in vocab.")token = self.vocab[token_id]if token == "\n":if decoded_string and not decoded_string.endswith(" "):decoded_string += " "  # Add space if not present before a newlinedecoded_string += tokenelif token.startswith("Ġ"):decoded_string += " " + token[1:]else:decoded_string += tokenreturn decoded_stringdef save_vocab_and_merges(self, vocab_path, bpe_merges_path):"""Save the vocabulary and BPE merges to JSON files.Args:vocab_path (str): Path to save the vocabulary.bpe_merges_path (str): Path to save the BPE merges."""# Save vocabularywith open(vocab_path, "w", encoding="utf-8") as file:json.dump(self.vocab, file, ensure_ascii=False, indent=2)# Save BPE merges as a list of dictionarieswith open(bpe_merges_path, "w", encoding="utf-8") as file:merges_list = [{"pair": list(pair), "new_id": new_id}for pair, new_id in self.bpe_merges.items()]json.dump(merges_list, file, ensure_ascii=False, indent=2)def load_vocab_and_merges(self, vocab_path, bpe_merges_path):"""Load the vocabulary and BPE merges from JSON files.Args:vocab_path (str): Path to the vocabulary file.bpe_merges_path (str): Path to the BPE merges file."""# Load vocabularywith open(vocab_path, "r", encoding="utf-8") as file:loaded_vocab = json.load(file)self.vocab = {int(k): v for k, v in loaded_vocab.items()}self.inverse_vocab = {v: int(k) for k, v in loaded_vocab.items()}# Load BPE mergeswith open(bpe_merges_path, "r", encoding="utf-8") as file:merges_list = json.load(file)for merge in merges_list:pair = tuple(merge["pair"])new_id = merge["new_id"]self.bpe_merges[pair] = new_id@lru_cache(maxsize=None)def get_special_token_id(self, token):return self.inverse_vocab.get(token, None)@staticmethoddef find_freq_pair(token_ids: List[int], mode: str = "most") -> Union[Tuple[int, int], None]:  # 基于mode可以找到最频繁或最不频繁的pair# zip(token_ids, token_ids[1:])先创建一个迭代器,生成相邻标记对的元组;假设token_ids = [1, 2, 3, 4],会生成 [(1, 2), (2, 3), (3, 4)]# Counter对上述生成的迭代器计数,就是计算每个元素出现的次数,返回一个字典,key为标记对,value为出现的次数pairs = Counter(zip(token_ids, token_ids[1:]))if not pairs:return Noneif mode == "most":return max(pairs.items(), key=lambda x: x[1])[0]  # 返回出现次数最多的pairelif mode == "least":return min(pairs.items(), key=lambda x: x[1])[0]  # 返回出现次数最少的pairelse:raise ValueError("Invalid mode. Choose 'most' or 'least'.")@staticmethoddef replace_pair(token_ids: List[int], pair_id: Tuple[int, int], new_id: int) -> List[int]:dq = deque(token_ids)  # 便于高效地从左侧移除元素replaced = []  # 记录替换后的token_idswhile dq:current = dq.popleft()  # 先从左侧取出第一个元素if dq and (current, dq[0]) == pair_id:  # 如果当前元素和下一个元素组成的pair和传入的pair_id相同replaced.append(new_id)  # 将new_id添加到replaced中# Remove the 2nd token of the pair, 1st was already removeddq.popleft()  # 因为已经将dq中第一个元素和current合并,所以也要将其移除else:replaced.append(current)  # 如果当前元素和下一个元素组成的pair和传入的pair_id不同,则将当前元素添加到replaced中return replaced
http://www.dtcms.com/a/415276.html

相关文章:

  • 信息安全仿真环境十一
  • 淡水网站建设跨境电商运营基础知识
  • 山西省建设厅官网站小公司使用的网站开发
  • 红宝书 基础词回忆
  • 【靶场练习】--DVWA第三关CSRF(跨站请求伪造)全难度分析
  • 商城网站建设正规公司光辉网站建设公司
  • Linux读者写者问题与读写锁
  • Kurt-Blender零基础教程:第3章:材质篇——第2节:凹凸感和置换形变;混合材质节点和NodeWrangler的五大用法;简单的UV纹理绘制
  • 潍坊高密网站建设wordpress myisam
  • 南充能够建设网站的公司有网站制作专家
  • @Import 导入bean对象
  • JavaScript 介绍
  • AiNiee - AI 翻译工具
  • 【Qt6项目转Qt5项目的一些API设置】
  • 音乐网站开发环境描述要建设一个网站需要准备些什么
  • display ip interface brief 概念及题目
  • asp网站整站下载器网站建设入什么科目
  • 网站建设国内排行如何做网站 知乎
  • 网站关于 模板三亚网站优化
  • Nginx部署vue以及转发配置记录
  • Elasticsearch - 分布式搜索与分析引擎
  • 网站开发者模式下载视频设计网站做多大合适
  • wordpress建企业商城南宁网站的优化
  • 通才机器人策略中的捷径学习:数据集多样性和碎片化的作用
  • 【轮播图】HTML+CSS+JavaScript实现轮播图
  • Low-Overhead Sensing RS Design for Integrated Sensing and Communication (ISAC)
  • 如何快速收录一个网站的信息网页设计与制作作业成品免费
  • MyEclipse在高分辨率显示屏上图标显示太小的解决方案
  • 网站 多语言处理wordpress搜索表单
  • Python 2025:物联网与边缘计算的智能融合新纪元