【深度学习:理论篇】--一文理解Transformer
阅读摘要:
本文于2017年6月发布,属于Transformer模型的开山之作,地位不言而喻。Transformer是继于MLP、RNN、CNN模型的又一大模型,且解决了RNN应用于长输入乏力的情况,随后提出的BERT、GPT都是基于Transformer。本文主要基于机器翻译任务来讲述Transformer,近年的论文证明其在图像、文本、音频、视频都可广泛应用。
参考文献:
[1] Attention Is All You Need
论文链接:https://arxiv.org/abs/1706.03762
从整体角度上来说,编码器将输入序列映射到向量中,该向量保存该输入的所有学习信息。然后,解码器获取该连续向量,同时还被输入先前的输出序列,然后逐步生成单个输出。
从代码实现上来看,我们依次需要实现的模块有:
组件名称 | 功能描述 |
---|---|
嵌入层 Embedding | 将输入的单词或符号转换为固定大小的向量表示。 |
位置编码 Positional Encoding | 为嵌入层添加位置信息,使模型能够理解词序。 |
带缩放的点积注意力机制 Scaled Dot-Product Attention | 计算查询(Query)、键(Key)和值(Value)之间的注意力权重,并通过点积缩放实现自注意力。 |
多头注意力 Multi-Head Attention | 将输入分割成多个头,分别进行带缩放的点积注意力计算,然后将结果拼接起来。 |
分位置的前馈机制 Position-wise Feed-Forward | 对每个位置的向量进行相同的线性变换,然后通过激活函数处理。 |
序列掩码 Look-Ahead Mask | 在解码器中,防止未来位置的信息泄露,确保自回归特性。 |
解码器 Decoder | 由多个解码器块组成,用于生成输出序列。 |
Transformer是一种用于自然语言处理(NLP)和其他序列到序列(sequence-to-sequence)任务的深度学习模型架构,它在2017年由Vaswani等人首次提出。Transformer架构引入了自注意力机制(self-attention mechanism),这是一个关键的创新,使其在处理序列数据时表现出色。
以下是Transformer的一些重要组成部分和特点:
- 自注意力机制(Self-Attention):这是Transformer的核心概念之一,它使模型能够同时考虑输入序列中的所有位置,而不是像循环神经网络(RNN)或卷积神经网络(CNN)一样逐步处理。自注意力机制允许模型根据输入序列中的不同部分来赋予不同的注意权重,从而更好地捕捉语义关系。
- 多头注意力(Multi-Head Attention):Transformer中的自注意力机制被扩展为多个注意力头,每个头可以学习不同的注意权重,以更好地捕捉不同类型的关系。多头注意力允许模型并行处理不同的信息子空间。
- 堆叠层(Stacked Layers):Transformer通常由多个相同的编码器和解码器层堆叠而成。这些堆叠的层有助于模型学习复杂的特征表示和语义。
- 位置编码(Positional Encoding):由于Transformer没有内置的序列位置信息,它需要额外的位置编码来表达输入序列中单词的位置顺序。
- 残差连接和层归一化(Residual Connections and Layer Normalization):这些技术有助于减轻训练过程中的梯度消失和爆炸问题,使模型更容易训练。
- 编码器和解码器:Transformer通常包括一个编码器用于处理输入序列和一个解码器用于生成输出序列,这使其适用于序列到序列的任务,如机器翻译。
目录
1.Encoder(编码器)
1.1.输入部分
1.1.1.单词嵌入(Embedding)
1.1.2.位置编码 ( Positional Encoding)
1.1.3.Transformer嵌入层 ( Transformer Embedding )
1.2.多头自注意力机制
1.2.1.自注意力机制(Self attention )
1.2.2.多头自注意力层(Multi-head attention)
1.2.3.残差连接与层归一化(Add+Norm)
1.3.前馈神经网络层(Feed Forward)
2. 解码器(Decoder)
2.1.掩码多头自注意力层
2.2.编码器-解码器注意力层
2.3.Softmax 预测输出单词
3.代码实现
3.1.一个完整的解码器层·
3.2.一个完整的解码器(六层)
3.3.transformer完整代码:
1.Encoder(编码器)
1.1.输入部分
1.1.1.单词嵌入(Embedding)
第一步是将输入到单词嵌入层。
单词嵌入层可以被认为是获取每个单词的学习矢量表示的查找表。神经网络通过数字来学习,所以每个单词都映射到一个具有连续值的向量来表示该单词。
嵌入是最最基础的概念,它的目的是把句子中的每个词转化成对应的向量。我之前写过很多介绍嵌入的博客,请参考:【文本分类】深入理解embedding层的模型、结构与文本表示_embedding模型-CSDN博客
1.1.2.位置编码 ( Positional Encoding)
下一步是将位置信息添加到嵌入中。因为变换器编码器不像递归神经网络那样具有递归性,所以我们必须将一些关于位置的信息添加到输入嵌入中。
这是使用位置编码完成的。作者想出了一个使用正弦和余弦函数的聪明绝招。
位置编码的512维度和词向量512维度相加,形成整个transformer的输入:
更好的理解:
1.为什么要位置编码?
Transformer模型采用位置编码,因为其自注意力机制无法识别序列顺序。位置编码为模型提供了理解词序的能力,对于处理固定长度序列和保持语言数据的顺序信息至关重要。通过位置编码,Transformer能够在处理语言任务时,正确识别每个元素的位置,即使序列被填充或截断。
1.1.3.Transformer嵌入层 ( Transformer Embedding
)
将词嵌入和位置嵌入统一一下称作transformer的嵌入TransformerEmbeddings
。最终的向量结果是词嵌入和位置嵌入直接做加法,比较简单。
已完成:
1.2.多头自注意力机制
Multi head attention mechanism
1.2.1.自注意力机制(Self attention )
在了解多头自注意力层之前,首先需要了解自注意力机制。自注意力机制允许模型在计算序列中每个位置的表示时,能够同时关注序列中的其他位置,而不是只关注当前位置。通过自注意力机制,模型可以根据序列中每个位置的重要性来动态地加权计算每个位置的表示。自注意力机制的结构如下图所示:
接收的是
- 输入(单词的表示向量x组成的矩阵X)
- 或者上一个 Encoder block 的输出
在 自注意力机制中,首先要进行 Q(查询), K(键值), V(值) 的获取。我们可以根据输入来进行线性变换得到 Q , K , V,即:
Q, K , V 获取之后的self-attention输出计算公式为:
是Q,K的列数,即:3
公式中计算矩阵Q和K每一行向量的内积,为了防止内积过大,因此除以 𝑑𝑘 的平方根。Q乘以K的转置后,得到的矩阵行列数都为 n,n 为句子单词数,这个矩阵可以表示单词之间的 attention 强度。下图为Q乘以 𝐾𝑇 ,1234 表示的是句子中的单词。
softmax函数定义如下:
得到𝑄𝐾𝑇 之后,使用 Softmax 计算每一个单词对于其他单词的 attention 系数,公式中的 Softmax 是对矩阵的每一行进行 Softmax,即每一行的和都变为 1。
得到 Softmax 矩阵之后可以和V相乘,得到最终的输出Z。
上图中 Softmax 矩阵的第 1 行表示单词 1 与其他所有单词的 attention 系数,最终单词 1 的输出 𝑍1 等于所有单词 i 的值 𝑉𝑖 根据 attention 系数的比例加在一起得到,如下图所示:
1.2.2.多头自注意力层(Multi-head attention)
多头自注意力层由多个注意力头组成,每个注意力头都独立地学习注意力权重。
每个注意力头产生的注意力权重会被合并成一个全局的注意力权重,然后用于加权计算输入序列中每个位置的表示。在现在公开的代码写法中,多头注意力的表示更倾向于如下表示,即先根据输入算出一个总的Q, K, V, 然后根据注意力头数的多少来拆分Q, K, V。
下面是构建多头自注意力层的一般步骤:
- 投影(Projection):将输入序列通过投影矩阵映射到多个不同的表示空间,以供不同的注意力头使用。
- 分割(Split):将投影后的表示分割成多个部分,每个部分用于不同的注意力头。
- 独立注意力计算(Independent Attention Computation):对每个部分进行独立的注意力计算,即每个注意力头都使用自注意力机制来计算注意力权重。
- 拼接(Concatenation):将每个注意力头计算得到的注意力权重合并为一个全局的注意力权重。
- 线性变换:(Linear transform):对合并后的多头注意力表示进行线性变换,以进一步整合信息并调整其维度。
从上图可以看到 Multi-Head Attention 包含多个 Self-Attention 层,首先将输入X分别传递到 h 个不同的 Self-Attention 中,计算得到 h 个输出矩阵Z。下图是 h=8 时候的情况,此时会得到 8 个输出矩阵Z。
得到 8 个输出矩阵 𝑍1 到 𝑍8 之后,Multi-Head Attention 将它们拼接在一起 (Concat),然后传入一个Linear层,得到 Multi-Head Attention 最终的输出Z。
可以看到 Multi-Head Attention 输出的矩阵Z与其输入的矩阵X的维度是一样的。
1.2.3.残差连接与层归一化(Add+Norm)
(Residual Connection)and (Layer Normalization)
- 残差连接:是指将输入直接与多头注意力层的输出相加,以形成残差块(Residual Block)。
Transformer中的残差连接(Residual Connection)可表示为 Z = + X ′ ,
是输入, X ′ 为多头注意力的输出。其过程如下图所示
- 归一化层:用于规范化神经网络中的每个层的输出。它计算每个层的均值和方差,并将每个层的输出进行归一化。这有助于加速训练过程,并提高网络的泛化能力。
-
数学上,给定一个神经网络层的输入 x,层归一化将对输入进行归一化处理,计算出新的输出
,μ 和 σ 分别是输入x 的均值和标准差,γ和 β 是可学习的缩放因子和偏移因子,ϵ 是一个很小的数,用于数值稳定性。
-
层归一化在每个样本的每个特征维度上进行归一化,而不是在每个样本的整个批次上进行归一化,这使得它更适用于序列数据和小批次训练。例如,我们可以对上图中输出 Z 进行层归一化,就是对 Z 的每一行进行归一化操作,如下图所示
class LayerNorm(nn.Module):def __init__(self, d_model, eps=1e-6):super().__init__()self.gamma = nn.Parameter(torch.ones(d_model))self.beta = nn.Parameter(torch.zeros(d_model))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True) # -1代表最后一个维度,对每一行元素进行操作var = x.var(-1, keepdim=True) # -1代表最后一个维度,对每一行元素进行操作return self.gamma * (x - mean) / torch.sqrt(var + self.eps) + self.beta# 测试代码
input_tensor = torch.tensor([[1.0, 2.0, 3.0],[4.0, 5.0, 6.0]])
layer_norm = LayerNorm(d_model=3)
normed_reuslt = layer_norm(input_tensor)
print(normed_reuslt)
# tensor([[-1.0000, 0.0000, 1.0000],
# [-1.0000, 0.0000, 1.0000]], grad_fn=<AddBackward0>)
1.3.前馈神经网络层(Feed Forward)
(Feedforward Neural Network Layer)
负责对每个位置的隐藏表示进行非线性变换,从而帮助模型学习适应不同任务的特征表示。其结构如下图所示:
具体来说,Transformer中的前馈神经网络层通常由两个全连接层组成,第一层的激活函数为 Relu,第二层不使用激活函数
- 全连接层(FC):由两个全连接层组成,其中每个全连接层的权重矩阵是可学习的参数。
- 激活函数:通常在全连接层之间使用激活函数,例如ReLU。
- 正则化:通常会在前馈神经网络层中使用正则化技术,如Dropout,以防止过拟合。
两层如下:
class FeedForward(nn.Module):def __init__(self, d_model, hidden_size=2048, dropout=0.1):super().__init__()self.linear1 = nn.Linear(d_model, hidden_size)self.relu = nn.ReLU()self.dropout = nn.Dropout(p=dropout)self.linear2 = nn.Linear(hidden_size, d_model)def forward(self, x):x = self.linear1(x)x = self.relu(x)x = self.dropout(x)x = self.linear2(x)return x# 测试代码
d_model = 512
x = torch.randn(64, 10, 512)
ff_layer = FeedForward(d_model)
output = ff_layer(x)
print(output.shape) # torch.Size([64, 10, 512])
FeedForward类:
我们定义了一个名为FeedForward的PyTorch模块,表示一个前馈神经网络层。
- 在__init__方法中,我们定义了两个线性层linear1和linear2,分别将输入映射到隐藏层和将隐藏层映射到输出层。此外,我们还定义了一个Dropout层dropout,用于随机丢弃部分神经元以防止过拟合。
- 在forward方法中,我们首先通过第一个线性层进行线性变换,并应用ReLU激活函数。然后,我们对激活后的结果应用了Dropout操作,最后通过第二个线性层获得输出。
feedforward之后的再次的残差连接与层归一化,如下图所示。这一部分,基于之前的基础就比较简单,层归一化就是对输出 O 的每一行进行处理。
已完成整个编码器:
2. 解码器(Decoder)
完整示意图:
Decoder结构示意图:
前面的单词嵌入和位置编码和前面的编码器一样。
解码器最开始的 Outputs (shifted right)
是解码器输入序列经过预处理后的表示,具体含义如下:
-
右移的目标序列嵌入:
在自回归生成任务(如机器翻译)中,解码器的输入是目标序列的嵌入表示(例如翻译任务中的目标语言词嵌入),但需要右移一位(Shifted Right)并添加位置编码(Positional Encoding)。- 例如:真实目标序列是
[<SOS>, 我, 爱, 猫, <EOS>]
→ 右移后输入解码器的是[<SOS>, 我, 爱, 猫]
(去掉<EOS>
,右移使模型看不到未来词)。
- 例如:真实目标序列是
原因:
-
掩码自回归性质:
解码器在训练时需保证生成第t
个词时仅能看到前t-1
个词(防止信息泄露)。右移后,模型在位置i
的输入对应真实目标序列的位置i-1
,确保自回归性质。 - 例如:输入
[<SOS>, 我, 爱]
→ 模型应预测[我, 爱, 猫]
2.1.掩码多头自注意力层
(Masked Multi-Head Self-Attention Layer)
用于处理目标序列中不同位置之间的关联性,并帮助解码器生成目标序列。
解码器的掩码多头自注意力层(Masked Multi-Head Self-Attention Layer)与编码器的多头自注意力层类似,但在注意力计算过程中需要考虑到遮挡机制(Masking),以确保解码器在生成目标序列时只依赖于之前已生成的部分序列。
具体示例讲解:
第一个 Multi-Head Attention 采用了 Masked 操作,因为在翻译的过程中是顺序翻译的,即翻译完第 i 个单词,才可以翻译第 i+1 个单词。通过 Masked 操作可以防止第 i 个单词知道 i+1 个单词之后的信息。下面以 "我有一只猫" 翻译成 "I have a cat" 为例,了解一下 Masked 操作。
下面的描述中使用了类似 Teacher Forcing 的概念,不熟悉 Teacher Forcing 的童鞋可以参考以下上一篇文章Seq2Seq 模型详解。在 Decoder 的时候,是需要根据之前的翻译,求解当前最有可能的翻译,如下图所示。首先根据输入 "<Begin>" 预测出第一个单词为 "I",然后根据输入 "<Begin> I" 预测下一个单词 "have"。
Decoder 可以在训练的过程中使用 Teacher Forcing 并且并行化训练,即将正确的单词序列 (<Begin> I have a cat) 和对应输出 (I have a cat <end>) 传递到 Decoder。那么在预测第 i 个输出时,就要将第 i+1 之后的单词掩盖住,注意 Mask 操作是在 Self-Attention 的 Softmax 之前使用的,下面用 0 1 2 3 4 5 分别表示 "<Begin> I have a cat <end>"。
第一步:准备的是 Decoder 的输入矩阵和 Mask 矩阵,输入矩阵包含 "<Begin> I have a cat" (0, 1, 2, 3, 4) 五个单词的表示向量,Mask 是一个 5×5 的矩阵。在 Mask 可以发现单词 0 只能使用单词 0 的信息,而单词 1 可以使用单词 0, 1 的信息,即只能使用之前的信息。
第二步:接下来的操作和之前的 Self-Attention 一样,通过输入矩阵X计算得到Q,K,V矩阵。然后计算Q和 𝐾𝑇 的乘积 𝑄𝐾𝑇 。
第三步:在得到 𝑄𝐾𝑇 之后需要进行 Softmax,计算 attention score,我们在 Softmax 之前需要使用Mask矩阵遮挡住每一个单词之后的信息,遮挡操作如下:
Mask 𝑄𝐾𝑇每一行的和都为 1。但是单词 0 在单词 1, 2, 3, 4 上的 attention score 都为 0。
第四步:使用 Mask 𝑄𝐾𝑇与矩阵 V相乘,得到输出 Z,则单词 1 的输出向量 𝑍1 是只包含单词 1 信息的。
第五步:通过上述步骤就可以得到一个 Mask Self-Attention 的输出矩阵 𝑍𝑖 ,然后和 Encoder 类似,通过 Multi-Head Attention 拼接多个输出𝑍𝑖 然后计算得到第一个 Multi-Head Attention 的输出Z,Z与输入X维度一样。
解码器的多头自注意力层有助于解码器在生成目标序列时对不同位置的信息进行整合和关联,从而提高了解码器的性能和生成质量。
2.2.编码器-解码器注意力层
(Encoder-Decoder Attention Layer)
编码器-解码器注意力层的输入包括解码器当前位置的表示和编码器所有位置的表示,如下图红色虚线框所示。
其计算过程与自注意力层(self-attention layer)类似,但有一点不同:在计算注意力分数时,解码器位置的查询Q来自于解码器当前位置的表示,而键和值K、V来自于编码器所有位置的表示。 这样,解码器当前位置可以根据编码器的所有信息来进行注意力计算,以便更好地生成输出。
下面是编码器-解码器注意力层的主要步骤:
- 计算查询、键和值: 对于解码器当前位置的表示,通过线性变换得到查询(query),对于编码器所有位置的表示,也通过线性变换得到键(key)和值(value)。
- 计算注意力分数: 将解码器位置的查询与编码器所有位置的键进行点积,然后对每个注意力分数进行缩放以避免梯度消失或爆炸。
- 计算注意力权重: 对注意力分数进行 softmax 归一化,得到注意力权重,这表示解码器当前位置与编码器各个位置的关注程度。
- 加权求和: 使用注意力权重将编码器所有位置的值加权求和,得到解码器当前位置的上下文表示。
- 输出计算: 将上下文表示与解码器当前位置的表示拼接或相加,然后通过线性变换得到最终的输出表示。
2.3.Softmax 预测输出单词
Decoder block 最后的部分是利用 Softmax 预测下一个单词,在之前的网络层我们可以得到一个最终的输出 Z,因为 Mask 的存在,使得单词 0 的输出 Z0 只包含单词 0 的信息,如下:
Softmax 根据输出矩阵的每一行预测下一个单词:
3.代码实现
3.1.一个完整的解码器层·
class DecodeLayer(nn.Module):def __init__(self, d_model, dropout=0.1):super().__init__()self.layer_norm = LayerNorm(d_model)self.dropout = nn.Dropout(p=dropout)self.multi_attention = MultiheadAttention(d_model)self.ff_layer = FeedForward(d_model)def forward(self, x, encode_output, trg_mask):_x = x # 用于掩码多头注意力机制的残差连接x = self.layer_norm(x)x = _x + self.dropout(self.multi_attention(x, x, x, trg_mask)) # 残差连接_x = x # 用于编码-解码多头注意力机制的残差连接x = self.layer_norm(x)x = _x + self.dropout(self.multi_attention(x, encode_output, encode_output)) # 残差连接_x = x # 用于feedfoward输出的残差连接x = self.layer_norm(x)x = _x + self.dropout(self.ff_layer(x))return x# 测试代码
# 产生掩码
def create_mask(size):# size: the sequence length of inputnp_mask = np.triu(np.ones((1, size, size)), k=1).astype('uint8')trg_mask = torch.from_numpy(np_mask == 0)return trg_masktrg_mask = create_mask(size=5) # size = sequence lengthd_model = 512
input_encode = torch.LongTensor([[2, 7, 3, 4, 8]]) # encode的输入为整数索引张量
encode = Encode(d_model)
encode_output = encode(input_encode)input_decode_layer = torch.randn(1, 5, 512)
decode_layer = DecodeLayer(d_model)
output = decode_layer(input_decode_layer, encode_output=encode_output, trg_mask=trg_mask)
print(output.shape) # torch.Size([1, 5, 512])
3.2.一个完整的解码器(六层)
class Decode(nn.Module):def __init__(self, d_model, vocab_size=2000, num_decode_layer=6, num_heads=8, dropout=0.1):super().__init__()self.num_decode_layer = num_decode_layerself.embed = nn.Embedding(vocab_size, d_model)self.position_encode = PositionalEncoder(d_model)self.decode_layer = DecodeLayer(d_model)self.decode_layers = nn.ModuleList([copy.deepcopy(self.decode_layer) for i in range(num_decode_layer)])self.layer_norm = LayerNorm(d_model)def forward(self, trg, encode_output, trg_mask):x = self.embed(trg)x = self.position_encode(x)for i in range(self.num_decode_layer):x = self.decode_layers[i](x, encode_output, trg_mask)return self.layer_norm(x)# 测试代码
# 产生掩码
def create_mask(size):# size: the sequence length of inputnp_mask = np.triu(np.ones((1, size, size)), k=1).astype('uint8')trg_mask = torch.from_numpy(np_mask == 0)return trg_masktrg_mask = create_mask(size=50) # size = sequence lengthd_model = 512
input_encode = torch.randint(1, 5, (64, 50)) # 输入是形状为: [batch_size, seq_length]
input_decode = torch.randint(1, 5, (64, 50))encode = Encode(d_model=d_model)
encode_output = encode(input_encode)decode = Decode(d_model=d_model)
output = decode(input_decode, encode_output=encode_output, trg_mask=trg_mask)print(output.shape) # torch.Size([64, 50, 512])
3.3.transformer完整代码:
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
import copy
import numpy as npclass PositionalEncoder(nn.Module):def __init__(self, d_model, max_seq_len=5000, dropout=0.1):super().__init__()self.d_model = d_modelself.max_seq_len = max_seq_lenself.dropout = nn.Dropout(p=dropout)# 初始化位置编码矩阵pe = torch.zeros(max_seq_len, d_model)# 计算位置编码for pos in range(max_seq_len):for i in range(d_model // 2):pe[pos, 2 * i] = math.sin(pos / 10000 ** ((2 * i) / d_model))pe[pos, 2 * i + 1] = math.cos(pos / 10000 ** ((2 * i) / d_model))# 增加 batch_size 维度pe = pe.unsqueeze(0)# 将pe加入模型,但是不进行更新self.register_buffer('pe', pe)def forward(self, x):seq_len = x.size(1)return self.dropout(x + self.pe[:, :seq_len, :])class MultiheadAttention(nn.Module):def __init__(self, d_model, num_heads=8, dropout=0.1):super().__init__()self.d_model = d_modelself.num_heads = num_headsself.d_k = d_model // num_heads# W_Q, W_K, W_Vself.q_linear = nn.Linear(d_model, d_model)self.k_linear = nn.Linear(d_model, d_model)self.v_linear = nn.Linear(d_model, d_model)self.dropout = nn.Dropout(p=dropout)# W_Oself.out_linear = nn.Linear(d_model, d_model)def forward(self, query, key, value, mask=None):batch_size = key.size(0)# 对输入进行线性变换得到Q, K, V, 然后按头数拆分,最后调整成形状[batch_size, num_heads, seq_len, d_k]# 目的就是为了后续计算过程中,各个head之间的独立计算,具体可参考上面详细例子示意q = self.q_linear(query).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)k = self.k_linear(key).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)v = self.v_linear(value).view(batch_size, -1, self.num_heads, self.d_k).transpose(1, 2)scores = torch.matmul(q, k.transpose(-2, -1) / math.sqrt(self.d_k))if mask is not None: # mask 是为了在decode过程中用mask = mask.unsqueeze(1) # 增加一个head维度scores = scores.masked_fill(mask == 0, float('-inf'))scores = F.softmax(scores, dim=-1) # -1代表最后一个维度,这里就是为了表示对每一行的元素进行操作# 这里实现的就是多个head输出后,然后concatenationattention_output = torch.matmul(scores, v).transpose(1, 2).contiguous().view(batch_size, -1, self.d_model)# 线性变换得到最后输出output = self.out_linear(attention_output)return output class LayerNorm(nn.Module):def __init__(self, d_model, eps=1e-6):super().__init__()self.gamma = nn.Parameter(torch.ones(d_model))self.beta = nn.Parameter(torch.zeros(d_model))self.eps = epsdef forward(self, x):mean = x.mean(-1, keepdim=True) # -1代表最后一个维度,对每一行元素进行操作var = x.var(-1, keepdim=True) # -1代表最后一个维度,对每一行元素进行操作return self.gamma * (x - mean) / torch.sqrt(var + self.eps) + self.betaclass FeedForward(nn.Module):def __init__(self, d_model, hidden_size=2048, dropout=0.1):super().__init__()self.linear1 = nn.Linear(d_model, hidden_size)self.relu = nn.ReLU()self.dropout = nn.Dropout(p=dropout)self.linear2 = nn.Linear(hidden_size, d_model)def forward(self, x):x = self.linear1(x)x = self.relu(x)x = self.dropout(x)x = self.linear2(x)return xclass EncodeLayer(nn.Module):def __init__(self, d_model, num_heads=8, dropout=0.1):super().__init__()self.d_model = d_modelself.num_heads = num_headsself.layer_norm = LayerNorm(d_model)self.multi_attention = MultiheadAttention(d_model, num_heads)self.ff_layer = FeedForward(d_model)self.dropout = nn.Dropout(p=dropout)def forward(self, x):_x = x # 先存储输入,用于后面和attention输出进行残差连接x = self.layer_norm(x) # 输入到attention之前新进行层归一化x = self.dropout(self.multi_attention(query=x, key=x, value=x)) # attention 输出__x = x # 存储attention的输出, 用于后面和feedforward的输出残差连接x = self.layer_norm(_x + x) # 对 add之后的结果 normx = self.dropout(self.ff_layer(x)) # feedforward 输出x = __x + x # 残差连接return xclass Encode(nn.Module):def __init__(self, d_model, vocab_size=2000, num_encode_layer=6, num_heads=8, dropout=0.1):super().__init__()self.vocab_size = vocab_sizeself.d_model = d_modelself.num_encode_layer = num_encode_layerself.num_heads = num_headsself.dropout = dropoutself.embed = nn.Embedding(vocab_size, d_model) # 定义词典大小self.position_encode = PositionalEncoder(d_model)self.encode_layer = EncodeLayer(d_model)# 六个EncodeLayer层self.encode_layers = nn.ModuleList([copy.deepcopy(self.encode_layer) for i in range(num_encode_layer)])self.layer_norm = LayerNorm(d_model)def forward(self, src):x = self.embed(src)x = self.position_encode(x)# 六个EncodeLayer层依次输出for i in range(self.num_encode_layer):x = self.encode_layers[i](x)return self.layer_norm(x)class DecodeLayer(nn.Module):def __init__(self, d_model, dropout=0.1):super().__init__()self.layer_norm = LayerNorm(d_model)self.dropout = nn.Dropout(p=dropout)self.multi_attention = MultiheadAttention(d_model)self.ff_layer = FeedForward(d_model)def forward(self, x, encode_output, trg_mask):_x = x # 用于掩码多头注意力机制的残差连接x = self.layer_norm(x)x = _x + self.dropout(self.multi_attention(x, x, x, trg_mask)) # 残差连接_x = x # 用于编码-解码多头注意力机制的残差连接x = self.layer_norm(x)x = _x + self.dropout(self.multi_attention(x, encode_output, encode_output)) # 残差连接_x = x # 用于feedfoward输出的残差连接x = self.layer_norm(x)x = _x + self.dropout(self.ff_layer(x))return xclass Decode(nn.Module):def __init__(self, d_model, vocab_size=2000, num_decode_layer=6, num_heads=8, dropout=0.1):super().__init__()self.num_decode_layer = num_decode_layerself.embed = nn.Embedding(vocab_size, d_model)self.position_encode = PositionalEncoder(d_model)self.decode_layer = DecodeLayer(d_model)self.decode_layers = nn.ModuleList([copy.deepcopy(self.decode_layer) for i in range(num_decode_layer)])self.layer_norm = LayerNorm(d_model)def forward(self, trg, encode_output, trg_mask):x = self.embed(trg)x = self.position_encode(x)for i in range(self.num_decode_layer):x = self.decode_layers[i](x, encode_output, trg_mask)return self.layer_norm(x)def create_mask(size):# size: the sequence length of inputnp_mask = np.triu(np.ones((1, size, size)), k=1).astype('uint8')trg_mask = torch.from_numpy(np_mask == 0)return trg_mask if __name__ == "__main__":trg_mask = create_mask(size=50) # size = sequence lengthd_model = 512input_encode = torch.randint(1, 5, (64, 50)) # 输入是形状为: [batch_size, seq_length]input_decode = torch.randint(1, 5, (64, 50))encode = Encode(d_model=d_model)encode_output = encode(input_encode)decode = Decode(d_model=d_model)output = decode(input_decode, encode_output=encode_output, trg_mask=trg_mask)print(output.shape) # torch.Size([64, 50, 512])