Transformer 是一种基于自注意力机制的深度神经网络结构,由谷歌在2017年提出,最初应用于机器翻译任务。与传统的循环神经网络(RNN)不同,Transformer 摒弃了序列依赖的结构,依靠自注意力机制全局建模输入序列中的依赖关系,极大提升了并行计算效率和捕捉长程依赖的能力。其主要组件包括编码器、解码器、多头注意力机制、前馈网络、残差连接和层归一化。
核心模块:
多头自注意力机制(Multi-Head Self-Attention): 捕捉序列中不同位置之间的依赖关系,通过多个注意力头来并行处理不同的子空间信息。
位置编码(Positional Encoding): 通过正弦和余弦函数为序列中每个位置引入唯一的位置信息,弥补模型缺乏序列顺序感的不足。
前馈网络(Feed-Forward Network, FFN): 每个 Transformer 块包含的逐位置独立的两层全连接网络,对每个位置上的特征进行非线性变换。
残差连接和层归一化: 残差连接通过直接连接输入和输出,增强梯度流动,层归一化用于稳定模型训练,避免梯度消失。
结构特点:
- 编码器由多层自注意力和前馈网络组成,逐层处理输入序列,捕捉全局语义。
- 解码器使用自注意力和交叉注意力机制,逐步生成目标序列,利用编码器的输出建模目标语言的生成。
优势:
Transformer 通过全局自注意力机制代替了序列依赖的RNN,显著提高了并行化处理能力,尤其适合处理长序列数据,广泛应用于自然语言处理、机器翻译、语音识别等任务,成为现代大规模语言模型(如BERT、GPT)的基础架构。
依次介绍各个模块的具体功能和实现方法:
1.嵌入表示层
Transformer模型首先通过输入嵌入层将输入文本序列的每个单词转化为对应的向量表示,位置编码(Positional Encoding)则被加入以表示序列中单词之间的相对位置。通过正余弦函数生成的位置编码具备将距离信息编码进输入的能力。位置编码的公式为:其中,pos
表示单词位置,i
为位置编码维度索引,d
为总维度。位置编码加入后,不会破坏词嵌入的语义信息,并且位置编码中的距离信息能够通过正余弦的线性组合进行传播。
# 定义一个继承自 nn.Module 的类 PositionalEncoder,用于实现位置编码
class PositionalEncoder(nn.Module):
# 构造函数,初始化位置编码器
def __init__(self, d_model, max_seq_len=80):
# 调用父类 nn.Module 的构造函数
super().__init__()
# d_model 表示嵌入维度(词向量的维度)
self.d_model = d_model
# 初始化一个大小为 (max_seq_len, d_model) 的零矩阵,用来存放位置编码(Position Encoding, PE)
pe = torch.zeros(max_seq_len, d_model)
# 遍历每个序列位置(pos 表示句子的序列索引)
for pos in range(max_seq_len):
# 遍历每个嵌入维度,步长为 2,因为位置编码需要对偶数维度和奇数维度分别进行处理
for i in range(0, d_model, 2):
# 对偶数维度使用 sin 函数,生成位置编码
pe[pos, i] = math.sin(pos / (10000 ** ((2 * i) / d_model)))
# 对奇数维度使用 cos 函数,生成位置编码
pe[pos, i + 1] = math.cos(pos / (10000 ** ((2 * (i + 1)) / d_model)))
# 将 pe 矩阵的维度扩展一维,使其能够在后续进行 batch 处理
# pe 形状由 (max_seq_len, d_model) 变为 (1, max_seq_len, d_model)
pe = pe.unsqueeze(0)
# 使用 register_buffer 函数将 pe 注册为模型的常量参数,意味着在模型训练过程中不更新此变量的值
self.register_buffer('pe', pe)
# 前向传播函数,将位置编码应用到输入的嵌入向量中
def forward(self, x):
# x 是输入的词向量矩阵,大小为 (batch_size, seq_len, d_model)
# 对输入的词嵌入乘以 sqrt(d_model) 进行缩放,以保证位置编码对嵌入的影响不会过小
# 这种缩放的设计参考了原始的 Transformer 论文
x = x * math.sqrt(self.d_model)
# 获取输入序列的长度(即 x 在第二个维度上的大小,seq_len)
seq_len = x.size(1)
# 将前面注册好的位置编码矩阵添加到输入的词向量中
# self.pe[:, :seq_len] 取出与当前输入 x 相匹配的长度位置编码,大小为 (1, seq_len, d_model)
# 使用 Variable 包裹位置编码矩阵,并设置 requires_grad=False,防止反向传播时对其进行梯度计算
# 最后将位置编码加到输入的嵌入表示上,并将结果放到 GPU 上运行
x = x + Variable(self.pe[:, :seq_len], requires_grad=False).cuda()
# 返回带有位置编码的嵌入向量
return x
2.注意力层
自注意力(Self-Attention)机制通过三个线性变换,分别生成查询(Query)、键(Key)和值(Value),用于计算不同单词之间的关联权重。自注意力机制的计算公式为:
为了增强模型表达能力,Transformer通过多头注意力(Multi-head Attention)将不同的子空间信息组合起来。每个头会在不同的子空间中执行上述注意力操作,最后经过线性变换汇总,提升模型对不同上下文的捕捉能力。
import torch
import torch.nn as nn
import torch.nn.functional as F
import math
# 定义一个多头注意力机制的类 MultiHeadAttention,继承自 nn.Module
class MultiHeadAttention(nn.Module):
# 初始化多头注意力机制
def __init__(self, heads, d_model, dropout=0.1):
# 调用父类 nn.Module 的构造函数
super().__init__()
# d_model 是嵌入维度
self.d_model = d_model
# d_k 是每个注意力头的维度,等于 d_model 除以头的数量
self.d_k = d_model // heads
# 头的数量
self.h = heads
# 定义三个线性层,分别用于生成查询(Q)、键(K)和值(V)的表示
self.q_linear = nn.Linear(d_model, d_model)
self.v_linear = nn.Linear(d_model, d_model)
self.k_linear = nn.Linear(d_model, d_model)
# 定义 Dropout 层,防止过拟合
self.dropout = nn.Dropout(dropout)
# 最后的线性层,将多个注意力头的输出拼接后映射回 d_model 维度
self.out = nn.Linear(d_model, d_model)
# 定义计算注意力得分的函数
def attention(q, k, v, d_k, mask=None, dropout=None):
# 计算注意力得分,先计算 Q 和 K 的点积,然后除以 sqrt(d_k) 进行缩放
scores = torch.matmul(q, k.transpose(-2, -1)) / math.sqrt(d_k)
# 如果有掩码(mask),则应用掩码来遮盖不需要的部分
if mask is not None:
mask = mask.unsqueeze(1) # 扩展维度以匹配 scores 的形状
scores = scores.masked_fill(mask == 0, -1e9) # 将被掩盖的分数设置为极小值
# 对得分应用 softmax,得到注意力权重
scores = F.softmax(scores, dim=-1)
# 如果指定了 dropout,则对注意力权重应用 dropout
if dropout is not None:
scores = dropout(scores)
# 计算输出,将注意力权重应用于值(V)
output = torch.matmul(scores, v)
return output
# 前向传播函数
def forward(self, q, k, v, mask=None):
# 获取批次大小
bs = q.size(0)
# 将输入的 Q、K、V 进行线性变换并重塑为 (bs, seq_len, heads, d_k) 的形状
k = self.k_linear(k).view(bs, -1, self.h, self.d_k)
q = self.q_linear(q).view(bs, -1, self.h, self.d_k)
v = self.v_linear(v).view(bs, -1, self.h, self.d_k)
# 转置 Q、K 和 V,使得其形状为 (bs, heads, seq_len, d_k)
k = k.transpose(1, 2) # (bs, heads, seq_len, d_k)
q = q.transpose(1, 2) # (bs, heads, seq_len, d_k)
v = v.transpose(1, 2) # (bs, heads, seq_len, d_k)
# 计算注意力输出
scores = self.attention(q, k, v, self.d_k, mask, self.dropout)
# 将多个头的输出连接在一起,并恢复成 (bs, seq_len, d_model) 的形状
concat = scores.transpose(1, 2).contiguous().view(bs, -1, self.d_model)
# 将连接后的输出通过最后的线性层
output = self.out(concat)
# 返回最终的输出
return output
3.前馈层
前馈层进一步对注意力层的输出进行非线性变换。公式如下:
通常,前馈网络的隐层维度比自注意力层更大,有助于提升模型的表现。
import torch
import torch.nn as nn
import torch.nn.functional as F
# 定义一个前馈神经网络的类 FeedForward,继承自 nn.Module
class FeedForward(nn.Module):
# 初始化前馈神经网络
def __init__(self, d_model, d_ff=2048, dropout=0.1):
# 调用父类 nn.Module 的构造函数
super().__init__()
# d_ff 是前馈网络的隐藏层维度,默认为 2048
self.linear_1 = nn.Linear(d_model, d_ff) # 第一个线性层,从 d_model 维度映射到 d_ff 维度
self.dropout = nn.Dropout(dropout) # Dropout 层,用于防止过拟合
self.linear_2 = nn.Linear(d_ff, d_model) # 第二个线性层,将 d_ff 维度映射回 d_model 维度
# 前向传播函数
def forward(self, x):
# 通过第一个线性层进行映射,并应用 ReLU 激活函数,然后添加 Dropout
x = self.dropout(F.relu(self.linear_1(x)))
# 通过第二个线性层将结果映射回原始维度
x = self.linear_2(x)
return x # 返回最终输出
4.残差连接与层归一化
为了防止深层网络中的梯度消失问题,Transformer引入了残差连接,使得子层的输入直接与输出相加:
此外,层归一化(Layer Normalization)用于保持网络稳定性。归一化公式为:
import torch
import torch.nn as nn
# 定义层归一化的类 NormLayer,继承自 nn.Module
class NormLayer(nn.Module):
# 初始化层归一化
def __init__(self, d_model, eps=1e-6):
# 调用父类 nn.Module 的构造函数
super().__init__()
self.size = d_model # 保存输入特征的维度
# 层归一化的可学习参数,alpha 和 bias
self.alpha = nn.Parameter(torch.ones(self.size)) # 初始化为1的可学习参数
self.bias = nn.Parameter(torch.zeros(self.size)) # 初始化为0的可学习参数
self.eps = eps # 防止除零的一个小常数
# 前向传播函数
def forward(self, x):
# 计算层归一化
# norm = alpha * (x - x.mean()) / (x.std() + eps) + bias
norm = self.alpha * (x - x.mean(dim=-1, keepdim=True)) \
/ (x.std(dim=-1, keepdim=True) + self.eps) + self.bias
return norm # 返回归一化后的结果
5.编码器和解码器结构
编码器和解码器由多个Transformer块堆叠而成。编码器主要用于对源语言进行编码,生成上下文语义的抽象表示。解码器则需要通过自回归方式生成目标序列,解码器中的Masked Multi-Head Attention模块掩盖了未来时刻的词汇,防止模型直接看到未来词的影响。此外,解码器还引入了交叉注意力机制,通过结合编码器输出与解码器自身的输出,生成合理的翻译结果。
编码器参考代码:import torch
import torch.nn as nn
# 定义编码器层 EncoderLayer,继承自 nn.Module
class EncoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout=0.1):
# 调用父类 nn.Module 的构造函数
super().__init__()
# 初始化层归一化
self.norm_1 = Norm(d_model) # 第一层归一化
self.norm_2 = Norm(d_model) # 第二层归一化
# 初始化多头注意力机制
self.attn = MultiHeadAttention(heads, d_model, dropout=dropout)
# 初始化前馈神经网络
self.ff = FeedForward(d_model, dropout=dropout)
# 初始化 dropout 层
self.dropout_1 = nn.Dropout(dropout) # 第一个 dropout
self.dropout_2 = nn.Dropout(dropout) # 第二个 dropout
# 前向传播函数
def forward(self, x, mask):
# 第一次归一化
x2 = self.norm_1(x)
# 添加多头注意力的输出并应用 dropout
x = x + self.dropout_1(self.attn(x2, x2, x2, mask))
# 第二次归一化
x2 = self.norm_2(x)
# 添加前馈网络的输出并应用 dropout
x = x + self.dropout_2(self.ff(x2))
return x # 返回经过编码器层处理后的输出
# 定义编码器 Encoder,继承自 nn.Module
class Encoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, dropout):
# 调用父类 nn.Module 的构造函数
super().__init__()
self.N = N # 编码器层的数量
# 初始化嵌入层
self.embed = Embedder(vocab_size, d_model)
# 初始化位置编码器
self.pe = PositionalEncoder(d_model, dropout=dropout)
# 克隆 EncoderLayer,以创建多个编码器层
self.layers = get_clones(EncoderLayer(d_model, heads, dropout), N)
# 初始化最终的层归一化
self.norm = Norm(d_model)
# 前向传播函数
def forward(self, src, mask):
# 通过嵌入层获取输入的嵌入表示
x = self.embed(src)
# 通过位置编码器增加位置编码
x = self.pe(x)
# 依次通过每个编码器层
for i in range(self.N):
x = self.layers[i](x, mask)
# 返回最终的层归一化输出
return self.norm(x)
解码器参考代码:
import torch
import torch.nn as nn
# 定义解码器层 DecoderLayer,继承自 nn.Module
class DecoderLayer(nn.Module):
def __init__(self, d_model, heads, dropout=0.1):
# 调用父类 nn.Module 的构造函数
super().__init__()
# 初始化三个层归一化模块
self.norm_1 = Norm(d_model) # 第一个归一化,用于自注意力机制
self.norm_2 = Norm(d_model) # 第二个归一化,用于交叉注意力机制
self.norm_3 = Norm(d_model) # 第三个归一化,用于前馈神经网络
# 初始化 dropout 层
self.dropout_1 = nn.Dropout(dropout) # 第一个 dropout
self.dropout_2 = nn.Dropout(dropout) # 第二个 dropout
self.dropout_3 = nn.Dropout(dropout) # 第三个 dropout
# 初始化多头自注意力机制
self.attn_1 = MultiHeadAttention(heads, d_model, dropout=dropout)
# 初始化多头交叉注意力机制
self.attn_2 = MultiHeadAttention(heads, d_model, dropout=dropout)
# 初始化前馈神经网络
self.ff = FeedForward(d_model, dropout=dropout)
# 前向传播函数
def forward(self, x, e_outputs, src_mask, trg_mask):
# 第一次归一化
x2 = self.norm_1(x)
# 添加自注意力的输出并应用 dropout
x = x + self.dropout_1(self.attn_1(x2, x2, x2, trg_mask))
# 第二次归一化
x2 = self.norm_2(x)
# 添加交叉注意力的输出并应用 dropout
x = x + self.dropout_2(self.attn_2(x2, e_outputs, e_outputs, src_mask))
# 第三次归一化
x2 = self.norm_3(x)
# 添加前馈网络的输出并应用 dropout
x = x + self.dropout_3(self.ff(x2))
return x # 返回经过解码器层处理后的输出
# 定义解码器 Decoder,继承自 nn.Module
class Decoder(nn.Module):
def __init__(self, vocab_size, d_model, N, heads, dropout):
# 调用父类 nn.Module 的构造函数
super().__init__()
self.N = N # 解码器层的数量
# 初始化嵌入层
self.embed = Embedder(vocab_size, d_model)
# 初始化位置编码器
self.pe = PositionalEncoder(d_model, dropout=dropout)
# 克隆 DecoderLayer,以创建多个解码器层
self.layers = get_clones(DecoderLayer(d_model, heads, dropout), N)
# 初始化最终的层归一化
self.norm = Norm(d_model)
# 前向传播函数
def forward(self, trg, e_outputs, src_mask, trg_mask):
# 通过嵌入层获取输入的嵌入表示
x = self.embed(trg)
# 通过位置编码器增加位置编码
x = self.pe(x)
# 依次通过每个解码器层
for i in range(self.N):
x = self.layers[i](x, e_outputs, src_mask, trg_mask)
# 返回最终的层归一化输出
return self.norm(x)
基于 Transformer
的编码器和解码器结构整体实现参考代码:
import torch
import torch.nn as nn
# 定义 Transformer 模型,继承自 nn.Module
class Transformer(nn.Module):
def __init__(self, src_vocab, trg_vocab, d_model, N, heads, dropout):
# 调用父类 nn.Module 的构造函数
super().__init__()
# 初始化编码器,使用源语言词汇大小、模型维度、层数、头数和 dropout
self.encoder = Encoder(src_vocab, d_model, N, heads, dropout)
# 初始化解码器,使用目标语言词汇大小、模型维度、层数、头数和 dropout
self.decoder = Decoder(trg_vocab, d_model, N, heads, dropout)
# 最终线性层,用于将解码器的输出转换为目标语言的词汇
self.out = nn.Linear(d_model, trg_vocab)
# 前向传播函数
def forward(self, src, trg, src_mask, trg_mask):
# 通过编码器获取编码器输出
e_outputs = self.encoder(src, src_mask)
# 通过解码器获取解码器输出,使用编码器输出和目标输入
d_output = self.decoder(trg, e_outputs, src_mask, trg_mask)
# 通过线性层获取最终输出
output = self.out(d_output)
return output # 返回模型输出
模型训练和测试:
import time
import torch
import torch.nn.functional as F
from torch.autograd import Variable
# 模型参数设置
d_model = 512 # 模型的维度
heads = 8 # 注意力头的数量
N = 6 # 编码器和解码器层的数量
src_vocab = len(EN_TEXT.vocab) # 源语言的词汇大小
trg_vocab = len(FR_TEXT.vocab) # 目标语言的词汇大小
# 初始化 Transformer 模型
model = Transformer(src_vocab, trg_vocab, d_model, N, heads)
# 对模型参数进行 Xavier 均匀初始化
for p in model.parameters():
if p.dim() > 1: # 仅对权重矩阵进行初始化
nn.init.xavier_uniform_(p)
# 设置优化器为 Adam,学习率为 0.0001,使用 beta 和 epsilon 参数
optim = torch.optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)
# 模型训练函数
def train_model(epochs, print_every=100):
model.train() # 设置模型为训练模式
start = time.time() # 记录开始时间
temp = start # 临时变量,用于记录上次打印时间
total_loss = 0 # 初始化总损失
for epoch in range(epochs):
for i, batch in enumerate(train_iter):
src = batch.English.transpose(0, 1) # 转置源语言数据
trg = batch.French.transpose(0, 1) # 转置目标语言数据
# 目标输入:去掉最后一个词,作为解码器的输入
trg_input = trg[:, :-1]
# 需要预测的目标词:去掉第一个词,保留其余部分
targets = trg[:, 1:].contiguous().view(-1)
# 创建源和目标的掩码
src_mask, trg_mask = create_masks(src, trg_input)
# 通过模型进行前向传播
preds = model(src, trg_input, src_mask, trg_mask)
optim.zero_grad() # 清空优化器中的梯度
# 计算交叉熵损失,忽略目标中填充的部分
loss = F.cross_entropy(preds.view(-1, preds.size(-1)), targets, ignore_index=target_pad)
loss.backward() # 反向传播计算梯度
optim.step() # 更新模型参数
total_loss += loss.item() # 累加总损失
# 每隔一定的迭代次数打印一次信息
if (i + 1) % print_every == 0:
loss_avg = total_loss / print_every # 计算平均损失
print("time = %dm, epoch %d, iter = %d, loss = %.3f, %ds per %d iters" % (
(time.time() - start) // 60,
epoch + 1, i + 1, loss_avg, time.time() - temp,
print_every))
total_loss = 0 # 重置总损失
temp = time.time() # 更新临时时间
# 模型测试函数,用于翻译源句子
def translate(model, src, max_len=80, custom_string=False):
model.eval() # 设置模型为评估模式
if custom_string: # 如果是自定义句子
src = tokenize_en(src) # 对源句子进行分词
sentence = Variable(torch.LongTensor([[EN_TEXT.vocab.stoi[tok] for tok in sentence]])).cuda() # 转换为张量并移动到 GPU
src_mask = (src != input_pad).unsqueeze(-2) # 创建源掩码
# 通过编码器获取编码器输出
e_outputs = model.encoder(src, src_mask)
outputs = torch.zeros(max_len).type_as(src.data) # 初始化输出张量
outputs[0] = torch.LongTensor([FR_TEXT.vocab.stoi['<sos>']]) # 设置开始标记
for i in range(1, max_len):
# 创建目标掩码,防止未来信息的泄露
trg_mask = np.triu(np.ones((1, i, i), k=1).astype('uint8'))
trg_mask = Variable(torch.from_numpy(trg_mask) == 0).cuda() # 转换为张量并移动到 GPU
# 通过解码器获取输出
out = model.out(model.decoder(outputs[:i].unsqueeze(0), e_outputs, src_mask, trg_mask))
out = F.softmax(out, dim=-1) # 对输出进行 softmax
val, ix = out[:, -1].data.topk(1) # 选择概率最大的词
outputs[i] = ix[0][0] # 将选中的词添加到输出中
# 如果选中结束标记,则停止翻译
if ix[0][0] == FR_TEXT.vocab.stoi['<eos>']:
break
# 返回生成的翻译结果
return ' '.join([FR_TEXT.vocab.itos[ix] for ix in outputs[:i]])
标签:Transformer,nn,dropout,mask,self,归一化,model,模型
From: https://blog.csdn.net/qq_45998729/article/details/143203853