09 大模型学习——Transformer详解
从零开始,手撕transformer
1、Transformer基本结构
Transformer 是一种用于自然语言处理和机器翻译等任务的深度学习模型架构,它由Google的研究人员在2017年提出。相比于传统的循环神经网络(RNN)和卷积神经网络(CNN),Transformer 使用了全新的架构,通过注意力机制(Attention)来捕捉输入序列中的上下文关系,Attention是这个模型的重点。
注意力机制是一个帮助算法辨别信息重要性的计算流程,它通过计算样本与样本之间相关性来判断每个样本在序列中的重要程度,并**给这些样本赋予能代表其重要性的权重**。很显然,注意力机制能够为样本赋予权重的属性与序列模型研究领域的追求完美匹配,Transformer正是利用了自注意力机制的这一特点,从而想到利用注意力机制来进行权重的计算。
在注意力机制当中,跨序列进行样本相关性计算的是经典的注意力机制(Attention),在一个序列内部对样本进行相关性计算的是自注意力机制(self-attention)。在Transformer架构中我们所使用的是自注意力机制。
- 第一,**为什么要判断序列中样本的重要性?**
对序列数据来说,每个样本对于理解序列所做出的贡献是不相同的,能够帮助我们理解序列数据含义的样本更为重要,而对序列数据的本质逻辑/含义影响不大的样本则不那么重要。
- 第二,**那样本的重要性是如何定义的?为什么?**
自注意力机制通过**计算样本与样本之间的相关性**来判断样本的重要性,在一个序列当中,如果一个样本与其他许多样本都高度相关,则这个样本大概率会对整体的序列有重大的影响。
这样的规律可以被推广到许多序列数据上,在序列数据中我们认为**与其他样本高度相关的样本,大概率会对序列整体的理解有重大影响。因此样本与样本之间的相关性可以用来衡量一个样本对于序列整体的重要性**。
- 第三,**样本的重要性(既一个样本与其他样本之间的相关性)具体是如何计算的?**
在NLP的世界中,序列数据中的每个样本都会被编码成一个向量,其中文字数据被编码后的结果被称为词向量,时间序列数据则被编码为时序向量。
因此,要计算样本与样本之间的相关性,本质就是计算向量与向量之间的相关性。**向量的相关性可以由两个向量的点积来衡量**。如果两个向量完全相同方向(夹角为0度),它们的点积最大,这表示两个向量完全正相关;如果它们方向完全相反(夹角为180度),点积是一个最大负数,表示两个向量完全负相关;如果它们垂直(夹角为90度或270度),则点积为零,表示这两个向量是不相关的。因此,向量的点积值的绝对值越大,则表示两个向量之间的相关性越强,如果向量的点积值绝对值越接近0,则说明两个向量相关性越弱。
在NLP的世界当中,我们所拿到的词向量数据或时间序列数据一定是具有多个样本的。我们需要求解**样本与样本两两之间的相关性**,综合该相关性分数,我们才能够计算出一个样本对于整个序列的重要性。在这里需要注意的是,在NLP的领域中,样本与样本之间的相关性计算、即向量的之间的相关性计算会受到向量顺序的影响。**这是说,以一个单词为核心来计算相关性,和以另一个单词为核心来计算相关性,会得出不同的相关程度**。
因此,假设数据中存在A和B两个样本,则我们必须计算AB、AA、BA、BB四组相关性才可以。在每次计算相关性时,作为核心词的那个词被认为是在“询问”(Question),而作为非核心的词的那个词被认为是在“应答”(Key),AB之间的相关性就是A询问、B应答的结果,AA之间的相关性就是A向自己询问、A自己应答的结果。
这个过程可以通过矩阵的乘法来完成。假设现在我们的向量中有2个样本(A与B),每个样本被编码为了拥有4个特征的词向量。如下所示,如果我们要计算A、B两个向量之间的相关性,只需要让特征矩阵与其转置矩阵做乘法就可以了——
该乘法规律可以推广到任意维度的数据上,因此面对任意的数据,我们只需要让该数据与自身的转置矩阵相乘,就可以自然得到样本与样本之间的相关性构成的相关性矩阵了。
当然,在实际计算相关性的时候,我们一般不会直接使用原始特征矩阵并让它与转置矩阵相乘,**因为我们渴望得到的是语义的相关性,而非单纯数字上的相关性**。因此在NLP中使用注意力机制的时候,**我们往往会先在原始特征矩阵的基础上乘以一个解读语义的w参数矩阵,以生成用于询问的矩阵Q、用于应答的矩阵K以及其他可能有用的矩阵**。
在实际进行运算时,w是神经网络的参数,是由迭代获得的,因此w会依据损失函数的需求不断对原始特征矩阵进行语义解读,而我们实际的相关性计算是在矩阵Q和K之间运行的。使用Q和K求解出相关性分数的过程,就是自注意力机制的核心过程。
以下是来自论文《All you need is Attention》的架构图:
2、Embedding层(把文本转化为可计算的数据)
在Transformer中,embedding层位于encoder和decoder之前,主要负责进行语义编码(文本向量化,需要把文字转化成向量,才能进行数学运算。)
文本向量化过程,要分为两步,第一步是是把文字转化为一个索引,也就是找到这个字在词表中的索引位置;第二步,就是从随机生成的 lookup table (查找表)中,找这个索引对应的向量,作为这个词的表征。
- 嵌入层就是一个10000*512的参数矩阵
- 嵌入矩阵的每一行对应一个token的向量表示
- 嵌入层的参数是随机初始化的,经过学习后嵌入矩阵便可以学习到词语的向量表示
# embedding层
from torch import nn
import math
import torch
class Embeddings(nn.Module):
def __init__(self, vocab_size,d_model):
super().__init__()
# Embedding层
self.lut = nn.Embedding(vocab_size,d_model)
# Embedding维数
self.d_model = d_model
def forward(self,x):
# 返回x对应的embedding矩阵,需要乘以math.sqrt(d_model)
return self.lut(x) * math.sqrt(self.d_model)
emb = Embeddings(10,8)
inputs = torch.tensor([
[1, 2, 3],
[4, 5, 0],
])
emb_out = emb(inputs) # torch.Size([2, 3, 8])
print(emb_out.shape)
print(emb_out)
embedding矩阵乘以math.sqrt(d_model)的原因:
在后面要对模型参数进行初始化时,是随机初始化模型的参数,随机抽样分布满足N(0, 1/d_model),d_model越大,方差会越小,乘以 math.sqrt(self.d_model),可以拉回到 N(0, 1) 分布。把单词的嵌入向量放大根号下d_model倍
在嵌入层(Embedding Layer)的输出中乘以 math.sqrt(self.d_model) 是 Transformer 模型的一个关键设计,主要目的是平衡数值量级并优化梯度传播。以下是具体原因:
- 平衡词嵌入与位置编码的数值量级,避免位置信息淹没语义信息。
- 维持网络各层的方差一致性,防止梯度消失或爆炸。
- 与注意力机制中的缩放操作对称,确保数值动态范围合理。
这一设计是 Transformer 模型成功的关键细节之一,直接影响模型的收敛速度和最终性能。
3、Positional Encoding位置编码(⭐)
**自注意力机制无法捕捉到字之间的位置信息。**这样会导致一个很严重的问题:当调换文字顺序之后,其语义将发生极大改变,而自注意力机制无法捕捉到这种改变,这将会产生理解上的问题。
位置编码为什么不会破坏词向量本身的信息?
# 位置编码层
## 位置编码的实现其实很简单,直接对照着公式去敲代码就可以,下面这个代码只是其中一种实现方式;
## 从理解来讲,需要注意的就是偶数和奇数在公式上有一个共同部分;
## pos代表的是单词在句子中的索引,这点需要注意;比如max_len是128个,那么索引就是从0,1,2,...,127
##假设我的d_model是512,2i那个符号中i从0取到了255,
# 那么2i对应取值就是0,2,4...510,那么2i+1对应取值就是1,3,5...511
class PositionalEncoding(nn.Module):
def __init__(self, d_model,max_len=5000,dropout=0.1) -> None:
super().__init__()
self.dropout = nn.Dropout(dropout)
pe = torch.zeros(max_len,d_model)
pos = torch.arange(0,max_len).unsqueeze(1)
div_term = torch.pow(10000, torch.arange(0, d_model, 2).float() / d_model)
## 这里需要注意的是pe[:, 0::2]这个用法,就是从0开始到最后面,步长为2,其实代表的就是偶数位置
pe[:, 0::2] = torch.sin(pos / div_term)
##这里需要注意的是pe[:, 1::2]这个用法,就是从1开始到最后面,步长为2,其实代表的就是奇数位置
pe[:, 1::2] = torch.cos(pos / div_term)
#升维
pe = pe.unsqueeze(0)
#将位置编码矩阵设置为缓冲区(这些缓冲区不会被视为模型的可训练参数(即不会被优化器更新),但会作为模型状态的一部分保存和加载)
#相当于把位置信息保存在了一个常量数组中
self.register_buffer("pe",pe)
def forward(self,x):
x = x+ self.pe[:,:x.size(1)]
return self.dropout(x)
pos = PositionalEncoding(8)
pos_out = pos(emb_out)
print(pos_out.shape)
import torch
import torch.nn as nn
class MyModel(nn.Module):
def __init__(self):
super(MyModel, self).__init__()
self.weight = nn.Parameter(torch.randn(2, 2)) # 可训练参数
# 注册缓冲区 这个张量会随模型的状态字典 (state_dict) 一起保存和加载
#保存不需要训练但需持久化的张量。
self.register_buffer('running_mean', torch.zeros(2))
model = MyModel()
print(model.state_dict()) # 输出包含 weight 和 running_mean
import matplotlib.pyplot as plt
# 创建位置编码实例
pos_encoder = PositionalEncoding(d_model=512, max_len=100)
# 获取位置编码矩阵
pe = pos_encoder.pe.squeeze(0) # (100, 512)
# 可视化
plt.figure(figsize=(10, 6))
plt.imshow(pe.numpy(), cmap='viridis', aspect='auto')
plt.xlabel('Embedding Dimension')
plt.ylabel('Position')
plt.colorbar(label='Value')
plt.title('Positional Encoding')
plt.show()
位置编码设计的优点:
Transformer 采用的是正余弦的绝对位置编码,这种编码方式可以保证,不同位置在所有维度上不会被编码到完全一样的值,从而使每个位置都获得独一无二的编码。通俗理解,就是将位置信息编码成向量后,每个位置对应的向量都是不同的。
位置编码使用了一种特殊的函数,这个函数会为序列中的每个位置生成一个向量。对于一个特定的位置,这个函数生成的向量在所有维度上的值都是不同的。这保证了每个位置的编码都是唯一的,而且不同位置的编码能够保持一定的相对关系。**在transformer的位置编码中,我们需要对每个词的每个特征值给与位置编码**。
在Transformer模型中,词嵌入和位置编码相加,帮助模型更好地理解语言的顺序性质,然后输入到模型的第一层。这样,Transformer就可以同时处理词语的语义和它在句子中的位置信息。这也是Transformer模型在处理序列数据,特别是自然语言处理任务中表现出色的一个重要原因。
4、多头自注意力
Transformer中的注意力模块都是采用的都是多头注意力机制。
在深度学习中,模型的特征表达能力对模型的性能起着至关重要的影响,模型的特征表达能力越强,模型的准确性就越高。
在图像处理任务中,模型为了能提取到更多样的特征,每一层都会有多个卷积核,不同的卷积核负责提取不同特征,比如边缘、纹理特征等。最终得到多个通道的特征表示
NLP任务也同样,想要更充分地理解语义,就要提取出更丰富的特征。
**多头注意力相比单个注意力模块,只增加了一个线性映射的计算量,却可以提取到更丰富的信息.**
输入模型的句子一般都是有长有短的,为方便批处理,会强行用 pad 填充到等长。而填充的 pad 经过词嵌入和位置编码层,会被编码成一个正常的特征向量,为了防止 pad 影响计算结果,需要把 pad 对应位置的数值 mask 掉。
为什么赋值为-∞?
-∞经过softmax后变为0,则可以有效去除padding的影响
# masked_fill方法
scores = torch.randn(2, 3, 3)
print(scores)
inputs = torch.tensor([
[1, 2, 3],
[4, 5, 0]
])
mask = (inputs == 0).unsqueeze(1)
print(mask)
print(mask.shape)
scores = scores.masked_fill(mask, -1e9)
print(scores)
# 注意力机制增加掩码
def get_padding_mask(x,padding_idx):
return (x == padding_idx).unsqueeze(1)
# 注意力机制
def attention(q,k,v,mask=None,dropout=None):
d_k = k.size(-1)
scores = torch.matmul(q,k.transpose(-2,-1)) / math.sqrt(d_k)
if mask is not None:
scores = scores.masked_fill(mask,-1e9)
p_attn = torch.softmax(scores,dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn,v)
q = k = v = pos_out
mask = get_padding_mask(inputs,0)
result = attention(q, k, v,mask=mask)
print(result.shape)
5、Add&LayerNorm
**Transformer中使用LayerNorm而不是BatchNorm的主要原因在于LayerNorm能够处理变长序列,而BatchNorm需要固定的batch大小。LayerNorm在每个样本内进行归一化,适合处理序列数据。LayerNorm在Transformer的每个子层中应用,能够提高模型的训练稳定性.**
# 多头注意力机制+add+layernorm
class MultiHeadedAttention(nn.Module):
def __init__(self, d_model,n_head,dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
#必须能被整除
assert d_model % n_head == 0
self.d_k = d_model // n_head
self.n_head = n_head
# Linear
self.W_Q = nn.Linear(d_model,d_model,bias=False)
self.W_K = nn.Linear(d_model,d_model,bias=False)
self.W_V = nn.Linear(d_model,d_model,bias=False)
self.linear = nn.Linear(d_model,d_model,bias=False)
#norm
self.norm = nn.LayerNorm(d_model)
def forward(self,q,k,v,mask=None):
res = q
#分头
#shape batch_size,seq_len,d_mdoel
q = self.W_Q(q).view(q.shape[0],q.shape[1],self.n_head,self.d_k).transpose(1,2)
k = self.W_K(k).view(k.shape[0],k.shape[1],self.n_head,self.d_k).transpose(1,2)
v = self.W_V(v).view(v.shape[0],v.shape[1],self.n_head,self.d_k).transpose(1,2)
#掩码也需要升维
if mask is not None:
mask = mask.unsqueeze(1)
#计算注意力
context = attention(q,k,v,mask,self.dropout)
#拼接
concat = context.transpose(1,2).reshape(q.shape[0],-1,self.n_head * self.d_k)
output = self.linear(concat)
return self.norm(res+output)
mha = MultiHeadedAttention(8,2)
mha_out = mha(q,k,v,mask)
print(mha_out)
多头注意力机制最后的全连接层是多头注意力机制的核心组件,它负责:
- 融合多头的异构注意力结果。
- 通过可学习参数增强模型的表达能力。
- 确保输出与输入维度一致,适配残差连接。
若省略该层,模型可能无法有效整合不同注意力头的信息,导致性能下降。
6、前馈神经网络(FeedForward)
前馈神经网络,这一层其实很简单,就是两个线性层加一个ReLU的激活就可以了.
Feed Forward的作用是对自注意力机制输出的特征进行进一步的非线性变换和特征提取;
- 1.非线性特征变换:引入非线性激活函数,增强模型的表达能力。
- 2.特征增强:对自注意力机制的输出进行进一步处理,提取更丰富的特征。
- 3.增加模型容量:通过额外的参数提高模型的拟合能力。
# 前馈神经网络
class FeedForwardNet(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
self.w1 = nn.Linear(d_model, d_ff)
self.w2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU()
self.norm = nn.LayerNorm(d_model)
def forward(self, x):
res = x
x = self.relu(self.w1(x))
x = self.dropout(self.w2(x))
return self.norm(x + res)
# 调用测试
ffn = FeedForwardNet(8, 2048)
ffn_out = ffn(mha_out)
print(ffn_out.shape)
7、Encoder结构
# 编码器子层
class EncoderLayer(nn.Module):
def __init__(self,d_model,n_head,d_ff,dropout=0.1):
super().__init__()
self.mha=MultiHeadedAttention(d_model,n_head,dropout)
self.ffn=FeedForwardNet(d_model,d_ff,dropout)
def forward(self,x,mask=None):
output = self.mha(x,x,x,mask)
return self.ffn(output)
# 编码器层
class Encoder(nn.Module):
def __init__(self,vocab_size,d_model,n_head,d_ff,N=6,dropout=0.1):
super().__init__()
self.emb = Embeddings(vocab_size,d_model)
self.pos = PositionalEncoding(d_model=d_model,dropout=dropout)
self.layers = nn.ModuleList([
EncoderLayer(d_model,n_head,d_ff,dropout)
for _ in range(N)
])
def forward(self,x,mask=None):
x = self.emb(x)
x = self.pos(x)
for layer in self.layers:
x = layer(x,mask)
return x
inputs = torch.tensor([
[1,2,3],
[4,5,0],
])
mask = get_padding_mask(inputs,0)
enc = Encoder(10,8,2,32)
enc_out = enc(inputs,mask)
print(enc_out)
8、解码器masked掩码张量(⭐)
自回归特性(Autoregressive Property)是指模型在生成序列数据时,当前时刻的输出依赖于之前时刻的输出。换句话说,模型通过逐步生成序列中的每个元素,每次生成一个新元素时,都会基于已经生成的部分序列来预测下一个元素。
- 单词1 用于预测 单词2
- 单词1、2 用于预测 单词3
- 单词1、2、3 用于预测 单词4
在自注意力(Self-Attention)机制中,掩码(look ahead mask)的作用是确保模型在处理序列数据时,只关注到当前位置之前的元素,而不能看到当前位置之后的元素。这种机制对于保持模型的合理性和预测的准确性至关重要。在Transformer的Decoder部分,为了按照自回归的方式进行训练,即模型在预测下一个词时只能依赖于当前词及之前的词的信息,而不能看到未来的词.
解码器 Mask 是由两部分组成的,一部分是 padding mask,很好理解,另一部分,是一个三角矩阵,下面给大家重点解析一下,为什么要做这个三角矩阵的 mask。
使用一个三角矩阵进行掩码操作,目的是为了确保解码器在生成每个位置的输出时,只依赖于已经生成的位置的输入,而不会依赖于后续位置的输入,避免利用未来的信息,保持模型的自回归性质。
Transformer 的解码器通过掩码机制(如三角矩阵掩码)实现自回归特性,确保生成当前词时只能看到之前的词。
import torch
# 保留矩阵的下三角部分,包括主对角线,而将上三角部分填充为零
tril = torch.tril(torch.ones((3, 3)))
print(tril)
# 上三角部分填充为1
print(1-tril)
# 参数 size 为句子长度
def get_subsequent_mask(size):
# 1为batch维度
mask_shape = (1, size, size)
return 1-torch.tril(torch.ones(mask_shape)).byte()
# 和padding mask 叠加
inputs = torch.tensor([
[1, 2, 3],
[4, 5, 0],
])
pad_mask = get_padding_mask(inputs, 0) # padding mask
print(pad_mask)
sub_mask = get_subsequent_mask(3) # 上三角掩码
print(sub_mask)
mask = sub_mask | pad_mask # 掩码合并
print(mask)
9、Decoder结构
class DecoderLayer(nn.Module):
def __init__(self, d_model, n_head, d_ff, dropout=0.1):
super().__init__()
self.self_mha = MultiHeadedAttention(d_model, n_head, dropout)
self.mha = MultiHeadedAttention(d_model, n_head, dropout)
self.ffn = FeedForwardNet(d_model, d_ff, dropout)
def forward(self, x, mask, memory, src_mask):
#mask 指 掩码多头自注意力 中的 掩码矩阵
# src_mask 指 paddding mask
x = self.self_mha(x, x, x, mask)
x = self.mha(x, memory, memory, src_mask)
return self.ffn(x)
class Decoder(nn.Module):
# vocab_size与encoder的不是同一个
def __init__(self, vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
super().__init__()
self.emb = Embeddings(vocab_size, d_model)
self.pos = PositionalEncoding(d_model=d_model, dropout=dropout)
self.layers = nn.ModuleList([
DecoderLayer(d_model, n_head, d_ff, dropout)
for _ in range(N)
])
def forward(self, x, mask, memory, src_mask):
x = self.emb(x)
x = self.pos(x)
for layer in self.layers:
x = layer(x, mask, memory, src_mask)
return x
10、输出层代码
# 生成器
class Generator(nn.Module):
def __init__(self,d_model,vocab_size):
super().__init__()
self.linear = nn.Linear(d_model,vocab_size)
def forward(self,x):
return torch.softmax(self.linear(x),dim=-1)
generator = Generator(8, 20)
predict = generator(output)
print(predict.shape)
print(predict)
print(torch.argmax(predict, dim=-1))
11、封装完整的Transformer结构
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
super().__init__()
self.encoder = Encoder(src_vocab_size, d_model, n_head, d_ff, N, dropout)
self.decoder = Decoder(tgt_vocab_size, d_model, n_head, d_ff, N, dropout)
self.generator = Generator(d_model, tgt_vocab_size)
def forward(self, src_x, src_mask, tgt_x, tgt_mask):
memory = self.encoder(src_x, src_mask)
output = self.decoder(tgt_x, tgt_mask, memory, src_mask)
return self.generator(output)
12、使用 xavier 初始化模型参数
在 PyTorch 中,xavier_uniform_ 是一种用于初始化神经网络权重的函数,基于 Xavier 初始化方法。它的主要作用是帮助神经网络在训练初期保持梯度的稳定性,从而加速收敛。
def make_model(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N, dropout)
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
src_vocab_size = 6
tgt_vocab_size = 8
d_model = 512
n_head = 8
d_ff = 2048
N = 6
dropout = 0.1
model = make_model(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N, dropout)
# print(model)
# 计算模型的总参数数量
total_params = sum(p.numel() for p in model.parameters())
print(f"Total number of parameters: {total_params}")
# 输入数据
src_inputs = torch.tensor([
[1, 2, 3],
[4, 5, 0],
])
src_mask = get_padding_mask(src_inputs, 0)
tgt_inputs = torch.tensor([
[1, 2, 3, 4],
[4, 5, 0, 0],
])
# 处理mask
tgt_pad_mask = get_padding_mask(tgt_inputs, 0)
subsequent_mask = get_subsequent_mask(4)
tgt_mask = tgt_pad_mask | subsequent_mask
tgt_mask = tgt_mask != 0
predict = model(src_inputs, src_mask, tgt_inputs, tgt_mask)
print(predict.shape)
print(predict)
13、transformer.py完整代码
import math
import torch
import torch.nn as nn
# --------------
# Embedding层
# --------------
class Embedding(nn.Module):
def __init__(self, vocab_size, d_model):
super().__init__()
self.emb = nn.Embedding(vocab_size, d_model)
self.d_model = d_model
def forward(self, x):
# 返回x对应的embedding矩阵,需要乘以math.sqrt(d_model)
return self.emb(x) * math.sqrt(self.d_model)
# --------------
# 位置编码层(embedding结果+位置编码)
# --------------
class PositionalEncoding(nn.Module):
def __init__(self, d_model, max_len=5000, dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
# 初始化位置编码矩阵
pe = torch.zeros((max_len, d_model))
# 序列位置,转形状为(max_len,1)
pos = torch.arange(max_len).unsqueeze(-1)
# 被除数
div_term = torch.pow(10000, torch.arange(0, d_model, 2).float() / d_model)
# 基数维度
pe[:, 0::2] = torch.sin(pos / div_term)
# 偶数维度
pe[:, 1::2] = torch.cos(pos / div_term)
# 升维 (batch_size,max_len,d_model)
pe = pe.unsqueeze(0)
# 将位置编码矩阵设置为缓冲区,不参与训练
self.register_buffer("pe", pe)
def forward(self, x):
x = x + self.pe[:, :x.size(1)]
return self.dropout(x)
# --------------
# 多头注意力机制 + Add + LayerNorm
# --------------
# 注意力机制增加填充掩码
def get_padding_mask(inputs, padding_idx):
return (inputs == padding_idx).unsqueeze(1)
# 注意力机制
def attention(q, k, v, mask=None, dropout=None):
d_k = k.size(-1)
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
if mask is not None:
# 掩码为Ture的位置填充为 -∞
scores = scores.masked_fill(mask, -1e9)
p_attn = torch.softmax(scores, dim=-1)
if dropout is not None:
p_attn = dropout(p_attn)
return torch.matmul(p_attn, v)
class MultiHeadedAttention(nn.Module):
def __init__(self, d_model, n_head, dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
# 断言 要能整除
assert d_model % n_head == 0
self.d_k = d_model // n_head
self.n_head = n_head
self.W_Q = nn.Linear(d_model, d_model, bias=False)
self.W_K = nn.Linear(d_model, d_model, bias=False)
self.W_V = nn.Linear(d_model, d_model, bias=False)
self.linear = nn.Linear(d_model, d_model, bias=False)
# LayerNorm 层归一化
self.norm = nn.LayerNorm(d_model)
def forward(self, q, k, v, mask=None):
res = q
# 分头
# shape (batch_size,seq_len,d_mdoel)
q = self.W_Q(q).view(q.shape[0], q.shape[1], self.n_head, self.d_k).transpose(1, 2)
k = self.W_K(k).view(k.shape[0], k.shape[1], self.n_head, self.d_k).transpose(1, 2)
v = self.W_V(v).view(v.shape[0], v.shape[1], self.n_head, self.d_k).transpose(1, 2)
if mask is not None:
# 多头时,掩码也需要升维,升在n_head维度
mask = mask.unsqueeze(1)
# 计算注意力
attn = attention(q, k, v, mask=mask)
# concat连接
concat = attn.transpose(1, 2).contiguous().reshape(q.shape[0], -1, self.n_head * self.d_k)
# 全连接层
output = self.linear(concat)
return self.norm(res + output) # 残差+layernorm
# --------------
# 前馈神经网络层
# --------------
class FeedForwardNet(nn.Module):
def __init__(self, d_model, d_ff, dropout=0.1):
super().__init__()
self.dropout = nn.Dropout(dropout)
self.w1 = nn.Linear(d_model, d_ff)
self.w2 = nn.Linear(d_ff, d_model)
self.relu = nn.ReLU(inplace=True)
self.norm = nn.LayerNorm(d_model)
def forward(self, x):
res = x
x = self.relu(self.w1(x))
x = self.dropout(self.w2(x))
return self.norm(res + x)
# --------------
# Encoder 编码器结构
# --------------
# 编码器子层
class EncoderLayer(nn.Module):
def __init__(self, d_model, n_head, d_ff, dropout=0.1):
super().__init__()
self.mha = MultiHeadedAttention(d_model, n_head, dropout)
self.ffn = FeedForwardNet(d_model, d_ff, dropout)
def forward(self, x, mask=None):
x = self.mha(x, x, x, mask)
return self.ffn(x)
# 编码器
class Encoder(nn.Module):
def __init__(self, vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
super().__init__()
self.emb = Embedding(vocab_size, d_model)
self.pos = PositionalEncoding(d_model, dropout=dropout)
self.layers = nn.ModuleList([EncoderLayer(d_model, n_head, d_ff, dropout) for _ in range(N)])
def forward(self, x, mask=None):
x = self.emb(x)
x = self.pos(x)
for layer in self.layers:
x = layer(x, mask)
return x
# --------------
# Decoder 解码器结构
# --------------
# mask三角矩阵掩码
def get_subsequent_mask(seq_len):
return 1 - torch.tril(torch.ones(seq_len, seq_len)).byte()
# 解码器子层
class DecoderLayer(nn.Module):
def __init__(self, d_model, n_head, d_ff, dropout=0.1):
super().__init__()
self.mask_mha = MultiHeadedAttention(d_model, n_head, dropout)
self.mha = MultiHeadedAttention(d_model, n_head, dropout)
self.ffn = FeedForwardNet(d_model, d_ff, dropout)
def forward(self, x, mask, memory, src_mask):
x = self.mask_mha(x, x, x, mask)
x = self.mha(x, memory, memory, src_mask)
return self.ffn(x)
# 解码器
class Decoder(nn.Module):
def __init__(self, vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
super().__init__()
self.emb = Embedding(vocab_size, d_model)
self.pos = PositionalEncoding(d_model, dropout=dropout)
self.layers = nn.ModuleList([DecoderLayer(d_model, n_head, d_ff, dropout) for _ in range(N)])
def forward(self, x, mask, memory, src_mask):
x = self.emb(x)
x = self.pos(x)
for layer in self.layers:
x = layer(x, mask, memory, src_mask)
return x
# --------------
# 输出层(生成器)
# --------------
class Generator(nn.Module):
def __init__(self, d_model, vocab_size):
super().__init__()
self.linear = nn.Linear(d_model, vocab_size)
def forward(self, x):
out = self.linear(x)
# out = torch.softmax(out, dim=-1) # 后续用交叉熵损失函数则不用softmax
return out
# --------------
# 完整的Transformer结构
# --------------
class Transformer(nn.Module):
def __init__(self, src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
super().__init__()
self.encoder = Encoder(src_vocab_size, d_model, n_head, d_ff, N, dropout)
self.decoder = Decoder(tgt_vocab_size, d_model, n_head, d_ff, N, dropout)
self.generator = Generator(d_model, tgt_vocab_size)
def forward(self, src_x, src_mask, tgt_x, tgt_mask):
memory = self.encoder(src_x, src_mask)
output = self.decoder(tgt_x, tgt_mask, memory, src_mask)
return self.generator(output)
def make_model(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N=6, dropout=0.1):
model = Transformer(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N, dropout)
# 模型初始化
for p in model.parameters():
if p.dim() > 1:
nn.init.xavier_uniform_(p)
return model
if __name__ == '__main__':
src_vocab_size = 1000
tgt_vocab_size = 2000
d_model = 512
n_head = 8
d_ff = 2048
N = 6
dropout = 0.1
model = make_model(src_vocab_size, tgt_vocab_size, d_model, n_head, d_ff, N, dropout)
print(model)
# 编码部分 输入
src_inputs = torch.tensor([
[1, 2, 3],
[4, 5, 0],
])
src_mask = get_padding_mask(src_inputs, 0)
# 解码部分 输入
tgt_inputs = torch.tensor([
[1, 2, 3, 4],
[4, 5, 0, 0],
])
tgt_pad_mask = get_padding_mask(tgt_inputs, 0)
subsequent_mask = get_subsequent_mask(4)
tgt_mask = tgt_pad_mask | subsequent_mask
tgt_mask = tgt_mask != 0
output = model(src_x=src_inputs, src_mask=src_mask, tgt_x=tgt_inputs, tgt_mask=tgt_mask)
print(output.shape) # torch.Size([2, 4, 2000])
print(torch.argmax(output, dim=-1))
更多推荐
所有评论(0)