大模型 Transformer模型(上)
目录
一、Dataprocess 数据处理
二、Position 位置编码
三、Mask 掩码
四、MHA 多头注意力机制
五、FFN 前馈神经网络
这篇主要讲一下Transformer 框架搭建流程,苯人总结了一套:
Dataprocess 数据处理 --》Position位置编码 --》Mask掩码 --》MHA多头注意力机制 --》FFN前馈神经网络 --》Encoder编码器 --》Decoder解码器 --》Transformer 模型构建 --》模型训练 --》模型预测
一、Dataprocess 数据处理
数据处理的基本流程就是:拿到文本序列(即任务设定,比如德英翻译)--》定义特殊符号 --》构建词表 --》将文本编码为整数序列 --》自定义数据集以及数据加载器 --》代码调试
代码如下:
import torch
import torch.utils.data as Data#定义特殊符号
# S: decoding input 的开始符
# E: decoding output 的结束符
# P: padding的占位符# 法语译英文
#第一列是编码器输入的原始文本序列,第二列是解码器的输入(S开头的译文),第三列是解码器的输出(E结尾的译文)
sentences = [# enc_input dec_input dec_output['ich mochte ein bier P', 'S i want a beer .', 'i want a beer . E'],['ich mochte ein cola P', 'S i want a coke .', 'i want a coke . E']
]# 构建词汇表
#原始输入文本序列的词汇表
src_vocab = {'P' : 0, 'ich' : 1, 'mochte' : 2, 'ein' : 3, 'bier' : 4, 'cola' : 5}
src_vocab_size = len(src_vocab)
src_idx2word = {i: w for i, w in enumerate(src_vocab)}#译文的词汇表
tgt_vocab = {'P' : 0, 'i' : 1, 'want' : 2, 'a' : 3, 'beer' : 4, 'coke' : 5, 'S' : 6, 'E' : 7, '.' : 8}
idx2word = {i: w for i, w in enumerate(tgt_vocab)}
tgt_vocab_size = len(tgt_vocab)src_len = 5 # enc_input最大序列长度(源序列长度)
tgt_len = 6 # dec_input(等于dec_output)最大序列长度(目标序列长度)# Transformer 参数
d_model = 512 # 嵌入维度大小
d_ff = 2048 # 前馈网络的维度大小
d_k = d_v = 64 # K(等于Q)和V的维度大小
n_layers = 6 # 编码器和解码器层的数量
n_heads = 8 # 多头注意力机制中的头数# 将传入编码器的文本转为数字序列
def make_data(sentences):enc_inputs, dec_inputs, dec_outputs = [], [], []# 二维列表所以套两个循环for i in range(len(sentences)):enc_input = [src_vocab[n] for n in sentences[i][0].split()]dec_input = [tgt_vocab[n] for n in sentences[i][1].split()]dec_output = [tgt_vocab[n] for n in sentences[i][2].split()]'''首先len(sentences)为2,range(len(sentences))就是(0,2),所以i取0和1,分别表示sentences的第一行和第二行然后比如sentences[i][0],当i=0时就取的是sentences的第0行第0列,也就是'ich mochte ein bier P'这句,用split按空格分隔后再用 for n 来取到每个词最后 src_vocab[n]拿到每个词对应的索引,加入到创建的空列表,比如此时 enc_input = [1, 2, 3, 4, 0]'''enc_inputs.append(enc_input)dec_inputs.append(dec_input)dec_outputs.append(dec_output)#最后转为张量return torch.LongTensor(enc_inputs), torch.LongTensor(dec_inputs), torch.LongTensor(dec_outputs)enc_inputs, dec_inputs, dec_outputs = make_data(sentences) #保证了三种数据的形状一致
# print(enc_inputs)
# print(enc_inputs.shape)# 定义自定义数据集类 MyDataSet
#这里注意我们的数据集结构是:一条样本 = enc_input + dec_input + dec_output 这三部分组成的一套输入输出组
class MyDataSet(Data.Dataset):#1、定义好数据放哪def __init__(self, enc_inputs, dec_inputs, dec_outputs):super(MyDataSet, self).__init__() # 调用父类的初始化方法#把三块输入保存下来,类似于装进一个大书包self.enc_inputs = enc_inputs # 初始化编码器输入数据self.dec_inputs = dec_inputs # 初始化解码器输入数据self.dec_outputs = dec_outputs # 初始化解码器输出数据# 2、告诉一共有多少条数据def __len__(self):return self.enc_inputs.shape[0] # 返回数据集样本数量,shape[0]表示有多少行,即多少条样本#这里只返回 enc_inputs 的shape[0]是因为之前已经确定了三种数据的形状都是一致的,所以只用返回一组# 3、告诉怎么拿到某一条数据def __getitem__(self, idx):return self.enc_inputs[idx], self.dec_inputs[idx], self.dec_outputs[idx] # 获取指定索引处的样本数据#比如 idx = 0,则第一条数据集中的样本就是:[1, 2, 3, 4, 0],[6, 1, 2, 3, 4, 8],[1, 2, 3, 4, 8, 7]# 创建 DataLoader 对象 loader,用于批量加载数据
loader = Data.DataLoader(MyDataSet(enc_inputs, dec_inputs, dec_outputs), # 自定义数据集对象作为数据源batch_size=2, # 每个批次的样本数量shuffle=True # 是否打乱数据集顺序,True 表示打乱
)# 代码调试
if __name__ == '__main__':for i, (src_seq, tgt_in_seq, tgt_out_seq) in enumerate(loader):print(src_seq)print(tgt_in_seq)print(tgt_out_seq)break
相关介绍都写在注释里了
运行结果:
二、Position 位置编码
位置编码的代码其实是根据公式来的:
下面是一段模版代码:
# 定义一个位置编码类
import mathimport torch
from torch import nnclass PositionalEncoding(nn.Module):def __init__(self, d_model, dropout=0.1, max_len=5000):""":param d_model: 词向量的维度:param dropout: 丢弃比例:param max_len: 预定义一个最大序列长度"""super(PositionalEncoding, self).__init__()self.dropout = nn.Dropout(p=dropout) # 定义一个 Dropout 层,用于随机丢弃部分数据,防止过拟合# 初始化位置编码矩阵 pe,用来保存每个位置的编码向量pe = torch.zeros(max_len, d_model)# # shape: (50, 1) * (256,) → (50, 256)position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1) # 生成位置的下标,shape: (max_len, 1),升维是为了后面好广播div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model)) # 计算分母项pe[:, 0::2] = torch.sin(position * div_term) # 偶数位置使用 sin 函数编码位置信息pe[:, 1::2] = torch.cos(position * div_term) # 奇数位置使用 cos 函数编码位置信息pe = pe.unsqueeze(0) # 转置并增加一个维度,shape: ( 1, max_len,d_model)self.register_buffer('pe', pe) # 将位置编码矩阵 pe 注册为模型的缓冲区,不算模型的参数,但希望它随着模型保存、加载def forward(self, x):'''x: [batch_size, seq_len, d_model]'''x = x + self.pe[:,:x.size(1), :] # 将输入张量 x 与位置编码矩阵 pe 相加,根据输入序列长度截取对应位置编码return self.dropout(x) # 对相加后的张量进行 Dropout 操作并返回# 测试代码
if __name__ == '__main__':pe = PositionalEncoding(d_model=512)embed = torch.randn(5, 10, 512)out = pe(embed)print(out)print(out.shape) #torch.Size([5, 10, 512])
这里我就直接用老师发的模版代码了 (๑•̀ㅂ•́)و✧
三、Mask 掩码
掩码的两个作用:一个是为了掩盖之前为了固定长度而填充的0(填充掩码),一个是为了掩盖未来的信息(未来掩码),这里详细解释一下未来掩码的原理:
在利用公式计算出注意力分数后(经过softmax激活函数之前)模型会应用一个上三角矩阵,这个上三角矩阵的值通常为负无穷,与原注意力分数相加再经过激活函数后会变为0,下一步在与 V 相乘后得到的实际值仍为0,即不会保留这部分的信息,过程大概如下:
这里解释一下为什么是上三角矩阵呢,是因为这里把“未来”定义成“当前位置右边的 token”,所以就遮住的是上三角部分。
代码如下:
"""构建所需要的掩码每个attention都需要mask,只是不同部分所需要的Mask不同
"""
import numpy as np
import torchdef att_pad_mask(seq_k, seq_q):"""填充掩码特别情况:交叉注意力机制:param seq_k: [b,n1] tensor([[1, 2, 3, 4, 0],[1, 2, 3, 5, 0]]):param seq_q: [b,n2]:return:数据格式【b,n2,n1]"""batch_size, len_q = seq_q.size()# 跟sk里面每个数据比较 为0返回Truemask = seq_k.eq(0).unsqueeze(1) # 形状由 [b,n1] -- > [b,1,n1]mask = mask.repeat(1, len_q, 1) # 【b,n2,n1]return mask #形状 [batch_size, len_q, len_k] 的布尔矩阵def att_sub_mask(seq):"""未来掩码:param seq: [b,n]:return: [b,n,n]"""att_shape = [seq.size(0), seq.size(1), seq.size(1)] #构造形状为[batch_size, n, n] 的 masksub_mask = np.triu(np.ones(att_shape), k=1) #构建一个上三角矩阵sub_mask = torch.from_numpy(sub_mask).byte() #从 NumPy 转换为 PyTorchreturn sub_mask #输出形状为[batch_size, n, n],未来位置是 1(True),当前和过去是 0(False)# 测试数据
if __name__ == '__main__':import torchseq_q = torch.tensor([[1, 2, 3, 0], [1, 2, 3, 0]])seq_k = torch.tensor([[1, 2, 3, 0], [1, 2, 3, 0]])print(att_pad_mask(seq_q, seq_k))re1 = att_pad_mask(seq_q, seq_k) #填充掩码print(att_sub_mask(seq_q))re2 = att_sub_mask(seq_k) #未来掩码# 合并掩码,如果某位置为0,表示“既不是pad,也不是未来”,是可以看的;否则是要mask掉的# gt的作用是只要不是0,就返回True,即为要遮住的地方mask_self = torch.gt((re1 + re2), 0)print(mask_self)
运行结果这里就不贴了,可自行运行
四、MHA 多头注意力机制
多头注意力机制(Multi-Head Attention)是 Transformer 模型中的核心结构,它的设计灵感来自人类“关注多个事物不同方面”的能力,简单来说,它的本质是:在同一个输入上,设置多个“注意力头”来并行地捕捉不同的语义特征或关系。每一个注意力头其实就是一个缩小版的“注意力机制”,它会独立地去计算输入序列中各个位置之间的依赖关系,当然最后会将每个头的结果合并起来然后返回。
实现代码如下:
'''封装注意力机制 :多头注意力结构+交叉输入 = 多头交叉注意力机制
'''
import mathimport torch
from torch import nn#单头注意力机制 Attention类,实现注意力得分的计算
class Attention(nn.Module):"""注意力分数计算公式:Q * K的转置 / 根号下dk"""def __init__(self,dropout=0.1):super().__init__()self.dropout = nn.Dropout(dropout)self.softmax = nn.Softmax(dim=-1)def forward(self,q,k,v,mask=None):# 按照公式算注意力机制scores = torch.matmul(q,k.transpose(-1,-2))/math.sqrt(k.size(-1))# 判断掩码if mask is not None:# 根据mask判断哪个值是true 然后乘以-1e9,一个超大负数scores = scores.masked_fill_(mask, -1e9)#注意力得分经过softmax激活函数att = self.softmax(scores)#拿到最终加权的实际值output = torch.matmul(att,v)return self.dropout(output)#多头注意力机制 MultiHeadAttention类,分成多个头,每个头都调用Attention类,最后合并结果
class MultiHeadAttention(nn.Module):def __init__(self,d_model,num_heads):super().__init__()self.d_model = d_modelself.num_heads = num_headsself.d_k = d_model//num_heads #每个头的维度#初始化权重矩阵self.Wq = nn.Linear(d_model,d_model)self.Wk = nn.Linear(d_model,d_model)self.Wv = nn.Linear(d_model,d_model)self.Wo = nn.Linear(d_model, d_model)self.attention = Attention()self.dropout = nn.Dropout(0.1)self.layer_norm = nn.LayerNorm(d_model)def forward(self,enc_inputs,dec_inputs,mask=None):res = dec_inputs# 交叉输入:这里要注意 Q是来自解码器,K、V来自编码器q = self.Wq(dec_inputs)k = self.Wk(enc_inputs)v = self.Wv(enc_inputs)# 多头的实现,实际就是拆分QKVQ = q.view(q.size(0),-1,self.num_heads,self.d_k).transpose(1,2)K = k.view(k.size(0),-1,self.num_heads,self.d_k).transpose(1,2)V = v.view(v.size(0),-1,self.num_heads,self.d_k).transpose(1,2)# 处理mask的维度:[b,n,n]--->【b,h,n,n】,确保每个头都能使用# repeat 是重复几次的方法if mask is not None:mask = mask.unsqueeze(1).repeat(1,self.num_heads,1,1)# 计算注意力 形状是:[batch, heads, seq_len, d_k]output = self.attention(Q,K,V,mask)# 多头处理#首先合并多个头,形状变回来:[batch, heads, seq_len, d_k] → [batch, seq_len, heads * d_k]output = output.transpose(1,2).contiguous().view(output.size(0),-1,self.d_model)output = self.Wo(output) #恢复为 d_model 的 shapeoutput = self.dropout(output)output = self.layer_norm( output+ res) #残差连接+层归一化return outputif __name__ == '__main__':# 测试# 上个代码生成的掩码mask = [[[False, True, True, True],[False, False, True, True],[False, False, False, True],[False, False, False, True]],[[False, True, True, True],[False, False, True, True],[False, False, False, True],[False, False, False, True]]]mask = torch.tensor(mask)mha = MultiHeadAttention(d_model=512,num_heads=8)#输入:[batch=2, seq_len=4, d_model=512]enc_inputs = torch.randn(2,4,512)dec_inputs = torch.randn(2,4,512)output = mha(enc_inputs,dec_inputs,mask)print(output.shape) #torch.Size([2, 4, 512])
可以看到最后的数据形状还是与输入一样,只是表达效果更强了
五、FFN 前馈神经网络
前馈神经网络(Feed-Forward Neural Network, FFN)是一种基础构建模块,像一个勤劳的“信息加工厂”,通过多层线性变换和非线性激活函数,将输入数据逐层加工成更有意义的表示,一般是像个三明治一样两个线性层中间夹一层激活函数,具体代码如下:
"""搭建FFN子层"""
import torch
from torch import nn#全连接前馈网络 像三明治一样两个线性层中间夹一层激活函数
class FFN(nn.Module):def __init__(self,d_model,d_ff):super(FFN, self).__init__()self.ffn = nn.Sequential(nn.Linear(d_model,d_ff), #把输入维度从 d_model(通常是512)升维到 d_ff(通常是2048)nn.ReLU(), #激活函数nn.Dropout(0.1),nn.Linear(d_ff,d_model) #把维度从 d_ff 降回 d_model)self.dropout = nn.Dropout(0.1)self.layer_norm = nn.LayerNorm(d_model) #层归一化def forward(self,x):res = x #保留原始输入x = self.ffn(x) #传入三明治ffn网络x = self.dropout(x)output = self.layer_norm(x+res) #x+res 形成残差结构,再送入层归一化return output# 测试数据
if __name__ == '__main__':batch_size = 2seq_len = 4d_model = 512d_ff = 2048ffn = FFN(d_model,d_ff)x = torch.randn(batch_size,seq_len,d_model)output = ffn(x) print(output.shape) #torch.Size([2, 4, 512])
同样输出形状不变
因为文本与时间原因暂时写一半,剩下的下篇继续 (๑•̀ㅂ•́)و✧
以上有问题可以指出。