Transformer架构
Transformer是一种用于自然语言处理(NLP)和其他序列到序列(sequence-to-sequence)任务的深度学习模型架构,它在2017年由Vaswani等人首次提出。Transformer架构引入了自注意力机制(self-attention mechanism),这是一个关键的创新,使其在处理序列数据时表现出色。
以下是Transformer的一些重要组成部分和特点:
- 自注意力机制(Self-Attention):这是Transformer的核心概念之一,它使模型能够同时考虑输入序列中的所有位置,而不是像循环神经网络(RNN)或卷积神经网络(CNN)一样逐步处理。自注意力机制允许模型根据输入序列中的不同部分来赋予不同的注意权重,从而更好地捕捉语义关系。
- 多头注意力(Multi-Head Attention):Transformer中的自注意力机制被扩展为多个注意力头,每个头可以学习不同的注意权重,以更好地捕捉不同类型的关系。多头注意力允许模型并行处理不同的信息子空间。
- 堆叠层(Stacked Layers):Transformer通常由多个相同的编码器和解码器层堆叠而成。这些堆叠的层有助于模型学习复杂的特征表示和语义。
- 位置编码(Positional Encoding):由于Transformer没有内置的序列位置信息,它需要额外的位置编码来表达输入序列中单词的位置顺序。
- 残差连接和层归一化(Residual Connections and Layer Normalization):这些技术有助于减轻训练过程中的梯度消失和爆炸问题,使模型更容易训练。
- 编码器和解码器:Transformer通常包括一个编码器用于处理输入序列和一个解码器用于生成输出序列,这使其适用于序列到序列的任务,如机器翻译。
这里重点说明一下自注意力机制:
自注意力的作用:随着模型处理输入序列的每个单词,自注意力会关注整个输入序列的所有单词,帮助模型对本单词更好地进行编码。在处理过程中,自注意力机制会将对所有相关单词的理解融入到我们正在处理的单词中。更具体的功能如下:
- 序列建模:自注意力可以用于序列数据(例如文本、时间序列、音频等)的建模。它可以捕捉序列中不同位置的依赖关系,从而更好地理解上下文。这对于机器翻译、文本生成、情感分析等任务非常有用。
- 并行计算:自注意力可以并行计算,这意味着可以有效地在现代硬件上进行加速。相比于RNN和CNN等序列模型,它更容易在GPU和TPU等硬件上进行高效的训练和推理。(因为在自注意力中可以并行的计算得分)
- 长距离依赖捕捉:传统的循环神经网络(RNN)在处理长序列时可能面临梯度消失或梯度爆炸的问题。自注意力可以更好地处理长距离依赖关系,因为它不需要按顺序处理输入序列。
下图指出了自注意力的基本结构:
具体来说,自注意力机制有三个关键步骤:
- 计算相关性: 对于输入序列中的每个元素,通过计算它与其他所有元素的相关性得分(注意力分数)。这些分数决定了每个元素对其他元素的关注程度,即哪些元素对当前元素更重要。
- 加权求和: 使用相关性分数来加权计算一个加权和,这个加权和可以看作是对其他所有元素的信息聚合,而权重由相关性分数决定。
- 转换和输出: 将加权和通过线性变换和激活函数(比如ReLU)进行转换,生成最终的输出表示。这个输出不仅包含了元素本身的信息,还融合了来自所有其他元素的信息。
通过这种自注意力机制,Transformer能够在输入序列中建立复杂的依赖关系,无论是长距离的依赖还是短距离的依赖,都可以被有效地捕捉和利用。这种能力使得Transformer在处理自然语言处理任务(如机器翻译、文本生成)时表现出色。
下面我们来举一个例子:
假设我们有这样一句话:"The cat sat on the mat."
- 输入表示: 首先,将这句话分割成词(tokens)或者子词(subword units),比如:"The", "cat", "sat", "on", "the", "mat", "."
- 词嵌入(Word Embeddings): 每个词被映射到一个高维空间的向量表示,这些向量包含了词的语义信息。
- 位置编码(Positional Encoding): 为了区分句子中不同位置的词,Transformer引入了位置编码,它将位置信息加入到词的向量表示中。
- 自注意力机制:
- 计算相关性: 对于每个词,计算它与所有其他词的相关性分数,这些分数由点积计算得出。
- 加权求和: 使用相关性分数来加权所有词的表示,生成每个词的新表示。这个过程允许每个词聚焦于句子中其他词的重要性,而不受它们在句子中位置的限制。
- 前馈神经网络(Feedforward Network): 对于每个词的新表示,通过一个前馈神经网络进行转换和非线性变换。
- 层归一化(Layer Normalization)和残差连接(Residual Connection): 在每个子层的处理过程中,通过层归一化来稳定训练,同时保留残差连接以避免信息丢失。
- 输出层: 最后,将经过多层Transformer编码器处理后的表示送入输出层,进行具体任务的预测或生成。
因此,在解决本题的时候,我们使用Transformer的Encoder部分,编码我们的SMILES表达式。然后,再将等到的向量z通过一个线性层,输出一个值。我们期望通过模型的学习,这个输出的值就是该化学反应的产率。这样就实现了通过Encoder将SMILES表达式与产率结合起来。
下面是具体代码:
!pip install pandas
import pandas as pd
from torch.utils.data import Dataset, DataLoader, Subset
from typing import List, Tuple
import re
import torch
import torch.nn as nn
import time
import torch.optim as optim
# 定义SMILES字符串的分词器类
class Smiles_tokenizer():
def __init__(self, pad_token, regex, vocab_file, max_length):
# 初始化分词器所需的参数
self.pad_token = pad_token # 用于填充的token
self.regex = regex # 用于匹配SMILES字符串中有效部分的正则表达式
self.vocab_file = vocab_file # 词汇表文件的路径
self.max_length = max_length # 序列的最大长度
# 加载词汇表并构建词汇表字典
with open(self.vocab_file, "r") as f:
lines = f.readlines()
lines = [line.strip("\n") for line in lines] # 去除每行末尾的换行符
vocab_dic = {}
for index, token in enumerate(lines):
vocab_dic[token] = index # 词汇表中的token映射为其索引
self.vocab_dic = vocab_dic
def _regex_match(self, smiles):
# 使用正则表达式匹配SMILES字符串,将每个字符或匹配项视为一个token
regex_string = r"(" + self.regex + r"|" # 构造正则表达式,首先包含自定义的正则表达式部分
regex_string += r".)" # 如果不匹配自定义部分,则匹配任意单个字符
prog = re.compile(regex_string) # 编译正则表达式
tokenised = []
for smi in smiles: # 这里假设smiles是单个字符串的迭代(实际上可能应为字符迭代)
tokens = prog.findall(smi) # 使用正则表达式查找所有匹配项
if len(tokens) > self.max_length:
tokens = tokens[:self.max_length] # 如果匹配项过多,则截断
tokenised.append(tokens) # 将匹配项列表添加到结果列表中
return tokenised
def tokenize(self, smiles):
# 对SMILES字符串进行分词处理
tokens = self._regex_match(smiles) # 使用正则表达式进行分词
# 为每个分词后的序列添加开始和结束token
tokens = [["<CLS>"] + token + ["<SEP>"] for token in tokens]
tokens = self._pad_seqs(tokens, self.pad_token) # 对序列进行填充
token_idx = self._pad_token_to_idx(tokens)
return tokens, token_idx # 返回分词后的序列和对应的索引序列(但后者需要实现_pad_token_to_idx方法)
def _pad_seqs(self, seqs, pad_token):
# 对序列进行填充,使得所有序列长度相同
pad_length = max([len(seq) for seq in seqs]) # 计算最长序列的长度
padded = [seq + ([pad_token] * (pad_length - len(seq))) for seq in seqs] # 填充短序列
return padded
def _pad_token_to_idx(self, tokens):
"""
将分词后的tokens转换为对应的索引列表,并更新词汇表以包含新发现的词汇。
参数:
tokens (list of lists): 列表的列表,其中每个内部列表包含了一个SMILES字符串分词后的token。
过程:
1. 遍历每个SMILES字符串的分词结果。
2. 对于每个token,检查它是否已在词汇表中。
- 如果在,则获取其索引。
- 如果不在,则将其添加到词汇表中,并为其分配一个新的索引。
3. 将每个SMILES字符串的token索引列表收集起来。
4. 将新发现的词汇写入到文件"../new_vocab_list.txt"中。
返回值:
idx_list (list of lists): 分词后的SMILES字符串对应的索引列表的列表。
"""
idx_list = []
new_vocab = []
for token_list in tokens: # 更清晰的变量名,表示每个SMILES字符串的分词结果
tokens_idx = []
for token in token_list: # 遍历当前SMILES字符串的每个token
if token in self.vocab_dic.keys():
tokens_idx.append(self.vocab_dic[token])
else:
new_vocab.append(token)
self.vocab_dic[token] = max(self.vocab_dic.values()) + 1
tokens_idx.append(self.vocab_dic[token])
idx_list.append(tokens_idx)
with open("../new_vocab_list.txt", "a") as f:
for vocab in new_vocab: # 更清晰的变量名
f.write(vocab)
f.write("\n")
return idx_list
def _save_vocab(self, vocab_path):
"""
将当前词汇表保存到指定的文件路径。
参数:
vocab_path (str): 词汇表将要保存的文件路径。
过程:
1. 打开(或创建)指定路径的文件。
2. 遍历词汇表的键(即词汇),并将它们写入到文件中,每个词汇后紧跟一个换行符。
3. 打印一条消息,表明新的词汇表已经被更新并保存。
"""
with open(vocab_path, "w") as f:
for vocab in self.vocab_dic.keys(): # 更清晰的变量名
f.write(vocab)
f.write("\n")
print("update new vocab!")
def read_data(file_path, train=True):
"""
从指定文件路径读取数据,并准备用于训练或预测的输入和输出数据。
参数:
file_path (str): 数据文件的路径。
train (bool): 指示是否读取训练数据。如果是训练数据,则包含Yield列;否则,Yield列将被填充为0。
过程:
1. 使用pandas的read_csv函数读取CSV文件到DataFrame。
2. 从DataFrame中提取Reactant1, Reactant2, Product, Additive, Solvent列的数据,并转换为列表。
3. 根据train参数决定是否读取Yield列。如果是训练模式,则读取Yield;否则,为Reactant1的数量生成相同长度的0列表。
4. 遍历Reactant1, Reactant2, Product, Additive, Solvent的列表(使用zip函数同时迭代),将Reactant1和Reactant2用'.'连接,然后与Product用'>>'连接,构建输入信息。
5. 将构建好的输入信息和对应的Yield(或0)组合成元组列表,作为输出数据。
返回值:
output (list of tuples): 包含输入信息和对应Yield(或0)的元组列表。
"""
df = pd.read_csv(file_path)
reactant1 = df["Reactant1"].tolist()
reactant2 = df["Reactant2"].tolist()
product = df["Product"].tolist()
additive = df["Additive"].tolist()
solvent = df["Solvent"].tolist()
if train:
react_yield = df["Yield"].tolist()
else:
react_yield = [0 for i in range(len(reactant1))]
input_data_list = []
for react1, react2, prod, addi, sol in zip(reactant1, reactant2, product, additive, solvent):
# 只将reactant1, reactant2和product拼接到一起,忽略了additive和solvent
input_info = ".".join([react1, react2])
input_info = ">".join([input_info, prod])
input_data_list.append(input_info)
output = [(react, y) for react, y in zip(input_data_list, react_yield)]
return output
# 定义数据集类,用于处理化学反应数据集
class ReactionDataset(Dataset):
def __init__(self, data: List[Tuple[List[str], float]]):
"""
初始化数据集类。
参数:
data (List[Tuple[List[str], float]]): 包含数据集的列表,每个元素是一个元组,元组的第一个元素是SMILES字符串的列表(已处理为字符串),
第二个元素是该反应对应的产率(float类型)。
"""
self.data = data # 存储数据集
def __len__(self):
"""
返回数据集中的样本数量。
返回:
int: 数据集中的样本总数。
"""
return len(self.data)
def __getitem__(self, idx):
"""
根据索引获取数据集中的单个样本。
参数:
idx (int): 样本的索引。
返回:
Tuple[List[str], float]: 一个元组,包含SMILES字符串的列表和对应的产率。
"""
return self.data[idx]
# 定义自定义的collate_fn函数,用于在DataLoader中处理批次数据
def collate_fn(batch):
"""
将一批数据转换为模型训练所需的格式。
参数:
batch (List[Tuple[List[str], float]]): 一批数据,每个元素都是一个包含SMILES字符串列表和产率的元组。
返回:
Tuple[torch.Tensor, torch.Tensor]: 处理后的SMILES字符串的Tensor和产率的Tensor。
"""
REGEX = r"\[[^\]]+]|Br?|Cl?|N|O|S|P|F|I|b|c|n|o|s|p|\(|\)|\.|=|#|-|\+|\\\\|\/|:|~|@|\?|>|\*|\$|\%[0-9]{2}|[0-9]"
# 初始化SMILES字符串的tokenizer,这里假设Smiles_tokenizer是一个自定义的tokenizer类
tokenizer = Smiles_tokenizer("<PAD>", REGEX, "../vocab_full.txt", 300)
smi_list = [] # 存储SMILES字符串的列表
yield_list = [] # 存储产率的列表
# 遍历批次中的每个样本
for i in batch:
smi_list.append(i[0]) # 添加SMILES字符串列表
yield_list.append(i[1]) # 添加产率
# 使用tokenizer将SMILES字符串列表转换为Tensor,注意这里tokenizer.tokenize可能返回多个Tensor,这里只取第一个[1](假设是处理后的SMILES)
tokenizer_batch = torch.tensor(tokenizer.tokenize(smi_list)[1])
# 将产率列表转换为Tensor
yield_list = torch.tensor(yield_list)
# 返回处理后的SMILES Tensor和产率Tensor
return tokenizer_batch, yield_list
# 模型定义
'''
该模型直接采用了一个Transformer编码器结构,用于处理输入数据并输出预测结果。
'''
class TransformerEncoderModel(nn.Module):
def __init__(self, input_dim, d_model, num_heads, fnn_dim, num_layers, dropout):
"""
初始化Transformer编码器模型。
参数:
input_dim (int): 输入的词汇表大小。
d_model (int): Transformer模型中的特征维度。
num_heads (int): 多头注意力机制中的头数。
fnn_dim (int): 前馈神经网络中的维度。
num_layers (int): Transformer编码器中的层数。
dropout (float): Dropout比率,用于防止过拟合。
"""
super().__init__()
# 嵌入层,将输入的索引转换为d_model维的嵌入向量
self.embedding = nn.Embedding(input_dim, d_model)
# 层归一化层,用于稳定训练过程
self.layerNorm = nn.LayerNorm(d_model)
# Transformer编码器层,包含多头注意力机制和前馈神经网络
self.encoder_layer = nn.TransformerEncoderLayer(d_model=d_model,
nhead=num_heads,
dim_feedforward=fnn_dim,
dropout=dropout,
batch_first=True, # 输入的batch维度是第一维
norm_first=True) # 在多头注意力和前馈神经网络之前进行层归一化
# Transformer编码器,由多个编码器层堆叠而成
self.transformer_encoder = nn.TransformerEncoder(self.encoder_layer,
num_layers=num_layers,
norm=self.layerNorm) # 注意:这里的norm参数在PyTorch的较新版本中可能不被支持,应直接使用layerNorm层在模型的其他部分
# Dropout层,用于减少过拟合
self.dropout = nn.Dropout(dropout)
# 线性层组合,用于将Transformer的输出转换为最终的预测结果
self.lc = nn.Sequential(nn.Linear(d_model, 256), # 第一层线性变换
nn.Sigmoid(), # Sigmoid激活函数
nn.Linear(256, 96), # 第二层线性变换
nn.Sigmoid(), # Sigmoid激活函数
nn.Linear(96, 1)) # 输出层,输出预测结果
def forward(self, src):
"""
模型的前向传播过程。
参数:
src (Tensor): 输入的源数据,形状为[batch_size, src_len]。
返回:
Tensor: 模型的预测结果,形状为[batch_size]。
"""
# 将输入通过嵌入层得到嵌入向量,并通过Dropout层
embedded = self.dropout(self.embedding(src))
# 将嵌入向量传递给Transformer编码器
outputs = self.transformer_encoder(embedded)
# 选择Transformer编码器输出的第一个时间步的隐藏状态作为后续处理的输入
# 这里假设我们只关心序列的第一个元素(或者整个序列的某种聚合表示)
z = outputs[:,0,:]
# 通过线性层组合进行预测
outputs = self.lc(z)
# 去除最后一个维度(如果它是1的话),通常是因为输出层设计为输出单个值但保持了额外的维度
return outputs.squeeze(-1)
# 训练函数
def train():
## 超参数设置
N = 10 # 使用的样本数量,这里直接设置为10,或者可以根据数据集大小动态调整,如 int(len(dataset) * 0.1)
INPUT_DIM = 292 # 输入数据的维度,即源序列的长度
D_MODEL = 512 # Transformer模型中编码器和解码器嵌入的维度
NUM_HEADS = 4 # 多头注意力机制中的头数
FNN_DIM = 1024 # 前馈网络中的维度
NUM_LAYERS = 4 # Transformer模型中编码器和解码器的层数
DROPOUT = 0.2 # Dropout比例,用于防止过拟合
CLIP = 1 # 梯度裁剪的阈值,防止梯度爆炸
N_EPOCHS = 40 # 训练的总周期数
LR = 1e-4 # 初始学习率
# 开始计时
start_time = time.time()
# 设备选择,优先使用GPU,如果不可用则使用CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# device = 'cpu' # 注释掉的代码,表示强制使用CPU
# 读取数据
data = read_data("../dataset/round1_train_data.csv") # 假设这是一个自定义的读取数据的函数
dataset = ReactionDataset(data) # 假设这是一个自定义的数据集类
# 实际上这里只使用了前N个样本进行训练,但train_loader却是对整个dataset进行采样
# 这里可能存在逻辑上的不一致,应该是想对subset_dataset使用DataLoader
subset_indices = list(range(N))
subset_dataset = Subset(dataset, subset_indices) # 创建一个只包含前N个样本的数据集
# 注意:下面的train_loader实际上是对整个dataset进行操作的,而不是subset_dataset
train_loader = DataLoader(dataset, batch_size=128, shuffle=True, collate_fn=collate_fn)
# 初始化模型
model = TransformerEncoderModel(INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT)
model = model.to(device) # 将模型移动到选定的设备上
model.train() # 设置模型为训练模式
# 初始化优化器和学习率调度器
optimizer = optim.AdamW(model.parameters(), lr=LR, weight_decay=0.01)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=10)
# 注意:这里的scheduler使用方式可能需要调整,因为它依赖于验证损失来触发学习率调整
criterion = nn.MSELoss() # 使用均方误差作为损失函数
best_valid_loss = 10 # 假设的初始最佳验证损失,用于比较
# 训练循环
for epoch in range(N_EPOCHS):
epoch_loss = 0 # 初始化当前周期的损失总和
# adjust_learning_rate(optimizer, epoch, LR) # 动态调整学习率的代码被注释掉了
for i, (src, y) in enumerate(train_loader):
# 将数据和标签移动到选定的设备上
src, y = src.to(device), y.to(device)
# 梯度清零
optimizer.zero_grad()
# 前向传播
output = model(src)
# 计算损失
loss = criterion(output, y)
# 反向传播
loss.backward()
# 梯度裁剪
torch.nn.utils.clip_grad_norm_(model.parameters(), CLIP)
# 更新参数
optimizer.step()
# 累加损失
epoch_loss += loss.detach().item()
# 每50步打印一次训练损失
if i % 50 == 0:
print(f'Step: {i} | Train Loss: {epoch_loss:.4f}')
# 注意:这里之前缺少了对scheduler的loss_in_a_epoch的传递
# 但由于我们是在训练循环中,通常使用训练损失来更新scheduler(尽管这取决于scheduler的具体类型)
# 这里我们简单地使用当前周期的平均训练损失
scheduler.step(loss_in_a_epoch) # 如果scheduler需要验证损失,这里需要调整
loss_in_a_epoch = epoch_loss / len(train_loader) # 计算当前周期的平均训练损失
print(f'Epoch: {epoch+1:02} | Train Loss: {loss_in_a_epoch:.3f}')
# 如果当前周期的平均训练损失小于之前记录的最佳验证损失(尽管这里实际上没有验证集)
# 我们假设这里是一个简化的例子,直接用训练损失作为比较基准
if loss_in_a_epoch < best_valid_loss:
best_valid_loss = loss_in_a_epoch
# 保存模型状态字典到文件
torch.save(model.state_dict(), '../model/transformer.pth')
# 结束计时
end_time = time.time()
# 计算总运行时间(分钟)
elapsed_time_minute = (end_time - start_time) / 60
# 打印总运行时间
print(f"Total running time: {elapsed_time_minute:.2f} minutes")
# 如果这段代码是作为脚本直接运行的
if __name__ == '__main__':
train()
# 生成结果文件
def predicit_and_make_submit_file(model_file, output_file):
# 定义模型参数
INPUT_DIM = 292 # 输入序列的长度
D_MODEL = 512 # 模型的嵌入维度
NUM_HEADS = 4 # 自注意力机制中的头数
FNN_DIM = 1024 # 前馈网络中的维度
NUM_LAYERS = 4 # 编码器的层数
DROPOUT = 0.2 # Dropout比例
# 选择设备:如果可用则使用GPU,否则使用CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 读取测试数据
test_data = read_data("../dataset/round1_test_data.csv", train=False)
# 创建测试数据集
test_dataset = ReactionDataset(test_data)
# 创建测试数据加载器
test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False, collate_fn=collate_fn)
# 初始化模型并移动到选定的设备上
model = TransformerEncoderModel(INPUT_DIM, D_MODEL, NUM_HEADS, FNN_DIM, NUM_LAYERS, DROPOUT).to(device)
# 加载之前保存的最佳模型状态
model.load_state_dict(torch.load(model_file))
# 将模型设置为评估模式
model.eval()
# 初始化一个列表来存储预测结果
output_list = []
# 遍历测试数据加载器
for i, (src, y) in enumerate(test_loader):
# 将输入数据移动到选定的设备上
src = src.to(device)
# 关闭梯度计算,因为我们不需要在预测时更新模型参数
with torch.no_grad():
# 执行前向传播以获取预测输出
output = model(src)
# 将预测输出从张量转换为列表,并添加到输出列表中
output_list += output.detach().tolist()
# 初始化一个列表来构建最终的提交字符串
ans_str_lst = ['rxnid,Yield'] # 提交文件的第一行,包含列名
# 将预测结果转换为字符串格式,并添加到提交字符串列表中
for idx, y in enumerate(output_list):
# 假设每个预测对应一个测试样本,并格式化为'test{idx+1},{y:.4f}'
ans_str_lst.append(f'test{idx+1},{y:.4f}')
# 打开输出文件,并将提交字符串列表写入文件
with open(output_file, 'w') as fw:
fw.writelines('\n'.join(ans_str_lst))
# 调用函数,传入模型文件路径和输出文件路径
predicit_and_make_submit_file("../model/transformer.pth",
"../output/result.txt")
标签:vocab,Transformer,Task3,AI,self,list,Datawhale,token,model
From: https://www.cnblogs.com/qaz961501/p/18341316