import math
import torch
from torch import nn
import matplotlib.pyplot as plt
from d2l import torch as d2l


def sequence_mask(X, valid_len, value=0):
    """Mask irrelevant entries in sequences."""
    max_len = X.size(1)
    mask = torch.arange((max_len), dtype=torch.float32,
                        device=X.device)[None, :] < valid_len[:, None]
    X[~mask] = value
    return X


def masked_softmax(x, valid_lens):
    if valid_lens is None:
        return nn.functional.softmax(x, dim=-1)
    else:
        shape = x.shape
        if valid_lens.dim() == 1:
            valid_lens = torch.repeat_interleave(valid_lens, shape[1])
        else:
            valid_lens = valid_lens.reshape(-1)
        # Masked elements on the last axis are replaced with a very large
        # negative value, so that their softmax output becomes 0.
        x = sequence_mask(x.reshape(-1, shape[-1]), valid_lens, value=-1e6)
        return nn.functional.softmax(x.reshape(shape), dim=-1)


x = torch.ones(2, 3, 4)
print(masked_softmax(torch.rand(2, 2, 4), torch.tensor([2, 3])))


# Additive attention:
class AdditiveAttention(nn.Module):
    """Additive attention."""
    def __init__(self, key_size, query_size, num_hidden, dropout, **kwargs):
        super(AdditiveAttention, self).__init__(**kwargs)
        self.w_k = nn.Linear(key_size, num_hidden, bias=False)
        self.w_q = nn.Linear(query_size, num_hidden, bias=False)
        self.w_v = nn.Linear(num_hidden, 1, bias=False)
        self.dropout = nn.Dropout(dropout)

    def forward(self, queries, keys, values, valid_lens):
        queries, keys = self.w_q(queries), self.w_k(keys)
        # After broadcasting, shape of features:
        # (batch_size, no. of queries, no. of key-value pairs, num_hidden)
        features = queries.unsqueeze(2) + keys.unsqueeze(1)
        features = torch.tanh(features)
        # Shape of scores: (batch_size, no. of queries, no. of key-value pairs)
        scores = self.w_v(features).squeeze(-1)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)


queries, keys = torch.normal(0, 1, (2, 1, 20)), torch.ones((2, 10, 2))
print("queries:")
print(queries)
print("keys:")
print(keys)
values = torch.arange(40, dtype=torch.float32).reshape(1, 10, 4).repeat(2, 1, 1)
print("values:")
print(values)
valid_lens = torch.tensor([2, 6])
attention = AdditiveAttention(key_size=2, query_size=20, num_hidden=8,
                              dropout=0.1)
attention.eval()
print(attention(queries, keys, values, valid_lens))
d2l.show_heatmaps(attention.attention_weights.reshape((1, 1, 2, 10)),
                  xlabel='Keys', ylabel='Queries')
plt.show()


# Scaled dot-product attention:
class DotProductAttention(nn.Module):
    """Scaled dot-product attention."""
    def __init__(self, dropout, **kwargs):
        super(DotProductAttention, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)

    # Shape of queries: (batch_size, no. of queries, d)
    # Shape of keys: (batch_size, no. of key-value pairs, d)
    # Shape of values: (batch_size, no. of key-value pairs, value dimension)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    def forward(self, queries, keys, values, valid_lens=None):
        d = queries.shape[-1]
        scores = torch.bmm(queries, keys.transpose(1, 2)) / math.sqrt(d)
        self.attention_weights = masked_softmax(scores, valid_lens)
        return torch.bmm(self.dropout(self.attention_weights), values)


queries = torch.normal(0, 1, (2, 1, 2))
attention = DotProductAttention(dropout=0.5)
attention.eval()
print(attention(queries, keys, values, valid_lens))
d2l.show_heatmaps(attention.attention_weights.reshape((1, 1, 2, 10)),
                  xlabel='Keys', ylabel='Queries')
plt.show()
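A quick sanity check (added here, not part of the original post): with valid_lens = [2, 6], every attention weight past the valid length should come out as effectively zero after masked_softmax, which is what the blank columns in the heatmaps show.

# Added check: attention weights beyond each valid length are ~0.
w = attention.attention_weights            # shape: (2, 1, 10)
print(w[0, 0, 2:].abs().max().item(),      # query 0: positions >= 2 are masked
      w[1, 0, 6:].abs().max().item())      # query 1: positions >= 6 are masked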
import torch
from torch import nn
from d2l import torch as d2l
import matplotlib.pyplot as plt


# @save
class AttentionDecoder(d2l.Decoder):
    """The base interface for decoders with an attention mechanism."""
    def __init__(self, **kwargs):
        super(AttentionDecoder, self).__init__(**kwargs)

    @property
    def attention_weights(self):
        raise NotImplementedError


class Seq2SeqAttentionDecoder(AttentionDecoder):
    def __init__(self, vocab_size, embed_size, num_hiddens, num_layers,
                 dropout=0, **kwargs):
        super(Seq2SeqAttentionDecoder, self).__init__(**kwargs)
        self.attention = d2l.AdditiveAttention(num_hiddens, num_hiddens,
                                               num_hiddens, dropout)
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.rnn = nn.GRU(embed_size + num_hiddens, num_hiddens, num_layers,
                          dropout=dropout)
        self.dense = nn.Linear(num_hiddens, vocab_size)

    def init_state(self, enc_outputs, enc_valid_lens, *args):
        # Shape of outputs: (num_steps, batch_size, num_hiddens).
        # Shape of hidden_state: (num_layers, batch_size, num_hiddens)
        outputs, hidden_state = enc_outputs
        return (outputs.permute(1, 0, 2), hidden_state, enc_valid_lens)

    def forward(self, x, state):
        # Shape of enc_outputs: (batch_size, num_steps, num_hiddens).
        # Shape of hidden_state: (num_layers, batch_size, num_hiddens)
        enc_outputs, hidden_state, enc_valid_lens = state
        # After embedding, shape of x: (num_steps, batch_size, embed_size)
        x = self.embedding(x).permute(1, 0, 2)
        outputs, self._attention_weights = [], []
        for x_ in x:
            # Shape of query: (batch_size, 1, num_hiddens)
            query = torch.unsqueeze(hidden_state[-1], dim=1)
            # Shape of context: (batch_size, 1, num_hiddens)
            context = self.attention(query, enc_outputs, enc_outputs,
                                     enc_valid_lens)
            # Concatenate on the feature dimension
            x_ = torch.cat((context, torch.unsqueeze(x_, dim=1)), dim=-1)
            # Reshape x_ to (1, batch_size, embed_size + num_hiddens)
            out, hidden_state = self.rnn(x_.permute(1, 0, 2), hidden_state)
            outputs.append(out)
            self._attention_weights.append(self.attention.attention_weights)
        # After the fully connected layer, shape of outputs:
        # (num_steps, batch_size, vocab_size)
        outputs = self.dense(torch.cat(outputs, dim=0))
        return outputs.permute(1, 0, 2), [enc_outputs, hidden_state,
                                          enc_valid_lens]

    @property
    def attention_weights(self):
        return self._attention_weights


encoder = d2l.Seq2SeqEncoder(vocab_size=10, embed_size=8, num_hiddens=16,
                             num_layers=2)
encoder.eval()
decoder = Seq2SeqAttentionDecoder(vocab_size=10, embed_size=8, num_hiddens=16,
                                  num_layers=2)
decoder.eval()
x = torch.zeros((4, 7), dtype=torch.long)
state = decoder.init_state(encoder(x), None)
output, state = decoder(x, state)
print(output.shape, len(state), state[0].shape, len(state[1]),
      state[1][0].shape)

embed_size, num_hiddens, num_layers, dropout = 32, 32, 2, 0.1
batch_size, num_steps = 64, 10
lr, num_epochs, device = 0.005, 250, d2l.try_gpu()

# train_iter, src_vocab, tgt_vocab = d2l.load_data_nmt(batch_size, num_steps)
# encoder = d2l.Seq2SeqEncoder(
#     len(src_vocab), embed_size, num_hiddens, num_layers, dropout)
# decoder = Seq2SeqAttentionDecoder(
#     len(tgt_vocab), embed_size, num_hiddens, num_layers, dropout)
# net = d2l.EncoderDecoder(encoder, decoder)
# print(d2l.train_seq2seq(net, train_iter, lr, num_epochs, tgt_vocab, device))
# plt.show()
#
# engs = ['go .', "i lost .", 'he\'s calm .', 'i\'m home .']
# fras = ['va !', 'j\'ai perdu .', 'il est calme .', 'je suis chez moi .']
# for eng, fra in zip(engs, fras):
#     translation, dec_attention_weight_seq = d2l.predict_seq2seq(
#         net, eng, src_vocab, tgt_vocab, num_steps, device, True)
#     print(f'{eng} => {translation}, ',
#           f'bleu {d2l.bleu(translation, fra, k=2):.3f}')


Transformer: Multi-Head Attention

import math
import torch
from torch import nn
from d2l import torch as d2l


# @save
def transpose_qkv(X, num_heads):
    """Transposition for parallel computation of multiple attention heads."""
    # Shape of input X:
    # (batch_size, no. of queries or key-value pairs, num_hiddens)
    # Shape of output X:
    # (batch_size, no. of queries or key-value pairs, num_heads,
    #  num_hiddens / num_heads)
    X = X.reshape(X.shape[0], X.shape[1], num_heads, -1)
    # Shape of output X:
    # (batch_size, num_heads, no. of queries or key-value pairs,
    #  num_hiddens / num_heads)
    X = X.permute(0, 2, 1, 3)
    # Shape of final output:
    # (batch_size * num_heads, no. of queries or key-value pairs,
    #  num_hiddens / num_heads)
    return X.reshape(-1, X.shape[2], X.shape[3])


# @save
def transpose_output(X, num_heads):
    """Reverse the operation of transpose_qkv."""
    X = X.reshape(-1, num_heads, X.shape[1], X.shape[2])
    X = X.permute(0, 2, 1, 3)
    return X.reshape(X.shape[0], X.shape[1], -1)
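transpose_output undoes transpose_qkv exactly; as a small self-check (my addition, not from the post), a random tensor should survive the round trip unchanged whenever num_heads divides num_hiddens:

# Round-trip check for the two reshaping helpers above.
_X = torch.randn(2, 5, 12)                  # (batch_size, no. of items, num_hiddens)
_Y = transpose_qkv(_X, num_heads=4)
print(_Y.shape)                             # torch.Size([8, 5, 3])
print(torch.equal(transpose_output(_Y, num_heads=4), _X))  # True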
class MultiHeadAttention(nn.Module):
    """Multi-head attention."""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 num_heads, dropout, bias=False, **kwargs):
        super(MultiHeadAttention, self).__init__(**kwargs)
        self.num_heads = num_heads
        self.attention = d2l.DotProductAttention(dropout)
        self.W_q = nn.Linear(query_size, num_hiddens, bias=bias)
        self.W_k = nn.Linear(key_size, num_hiddens, bias=bias)
        self.W_v = nn.Linear(value_size, num_hiddens, bias=bias)
        self.W_o = nn.Linear(num_hiddens, num_hiddens, bias=bias)

    # Shape of queries, keys, values:
    # (batch_size, no. of queries or key-value pairs, num_hiddens)
    # Shape of valid_lens: (batch_size,) or (batch_size, no. of queries)
    # After transposing, shape of queries, keys, values:
    # (batch_size * num_heads, no. of queries or key-value pairs,
    #  num_hiddens / num_heads)
    def forward(self, queries, keys, values, valid_lens):
        queries = transpose_qkv(self.W_q(queries), self.num_heads)
        keys = transpose_qkv(self.W_k(keys), self.num_heads)
        values = transpose_qkv(self.W_v(values), self.num_heads)
        if valid_lens is not None:
            # On axis 0, copy the first item (scalar or vector) num_heads
            # times, then copy the second item, and so on.
            valid_lens = torch.repeat_interleave(
                valid_lens, repeats=self.num_heads, dim=0)
        # Shape of output: (batch_size * num_heads, no. of queries,
        # num_hiddens / num_heads)
        output = self.attention(queries, keys, values, valid_lens)
        # Shape of output_concat: (batch_size, no. of queries, num_hiddens)
        output_concat = transpose_output(output, self.num_heads)
        return self.W_o(output_concat)


num_hiddens, num_heads = 100, 5
attention = MultiHeadAttention(num_hiddens, num_hiddens, num_hiddens,
                               num_hiddens, num_heads, 0.5)
print(attention.eval())
batch_size, num_queries = 2, 4
num_kvpairs, valid_lens = 6, torch.tensor([3, 2])
X = torch.ones((batch_size, num_queries, num_hiddens))
Y = torch.ones((batch_size, num_kvpairs, num_hiddens))
print(attention(X, Y, Y, valid_lens).shape)


import math
import pandas as pd
import torch
from torch import nn
from d2l import torch as d2l


class PositionWiseFFN(nn.Module):
    """The position-wise feed-forward network."""
    def __init__(self, ffn_num_input, ffn_num_hiddens, ffn_num_output,
                 **kwargs):
        super(PositionWiseFFN, self).__init__(**kwargs)
        self.dense1 = nn.Linear(ffn_num_input, ffn_num_hiddens)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(ffn_num_hiddens, ffn_num_output)

    def forward(self, x):
        return self.dense2(self.relu(self.dense1(x)))


ffn = PositionWiseFFN(4, 4, 8)
ffn.eval()
print(ffn(torch.ones((2, 3, 4)))[0])

ln = nn.LayerNorm(2)
bn = nn.BatchNorm1d(2)
X = torch.tensor([[1, 2], [2, 3]], dtype=torch.float32)
# Compute the mean and variance of X in training mode
print('layer norm:', ln(X), '\nbatch norm:', bn(X))


# Residual connection followed by layer normalization
class AddNorm(nn.Module):
    def __init__(self, normalized_shape, dropout, **kwargs):
        super(AddNorm, self).__init__(**kwargs)
        self.dropout = nn.Dropout(dropout)
        self.ln = nn.LayerNorm(normalized_shape)

    def forward(self, x, y):
        # Add the sublayer output (after dropout) to the input,
        # then normalize before passing to the next layer
        return self.ln(self.dropout(y) + x)
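AddNorm requires its two inputs to have the same shape and preserves that shape; a quick check (added here, in the same spirit as the other toy tests in this post):

add_norm = AddNorm([3, 4], 0.5)   # normalized_shape matches the trailing dims
add_norm.eval()
print(add_norm(torch.ones((2, 3, 4)), torch.ones((2, 3, 4))).shape)  # torch.Size([2, 3, 4])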
# One layer (block) of the Transformer encoder
#@save
class EncoderBlock(nn.Module):
    """Transformer encoder block."""
    def __init__(self, key_size, query_size, value_size, num_hiddens,
                 norm_shape, ffn_num_input, ffn_num_hiddens, num_heads,
                 dropout, use_bias=False, **kwargs):
        super(EncoderBlock, self).__init__(**kwargs)
        self.attention = d2l.MultiHeadAttention(
            key_size, query_size, value_size, num_hiddens, num_heads, dropout,
            use_bias)
        self.addnorm1 = AddNorm(norm_shape, dropout)
        self.ffn = PositionWiseFFN(
            ffn_num_input, ffn_num_hiddens, num_hiddens)
        self.addnorm2 = AddNorm(norm_shape, dropout)

    def forward(self, X, valid_lens):
        Y = self.addnorm1(X, self.attention(X, X, X, valid_lens))
        return self.addnorm2(Y, self.ffn(Y))


X = torch.ones((2, 100, 24))
valid_lens = torch.tensor([3, 2])
encoder_blk = EncoderBlock(24, 24, 24, 24, [100, 24], 24, 48, 8, 0.5)
encoder_blk.eval()
print(encoder_blk(X, valid_lens).shape)


#@save
class TransformerEncoder(d2l.Encoder):
    """The Transformer encoder."""
    def __init__(self, vocab_size, key_size, query_size, value_size,
                 num_hiddens, norm_shape, ffn_num_input, ffn_num_hiddens,
                 num_heads, num_layers, dropout, use_bias=False, **kwargs):
        super(TransformerEncoder, self).__init__(**kwargs)
        self.num_hiddens = num_hiddens
        self.embedding = nn.Embedding(vocab_size, num_hiddens)
        self.pos_encoding = d2l.PositionalEncoding(num_hiddens, dropout)
        self.blks = nn.Sequential()
        for i in range(num_layers):
            self.blks.add_module(
                "block" + str(i),
                EncoderBlock(key_size, query_size, value_size, num_hiddens,
                             norm_shape, ffn_num_input, ffn_num_hiddens,
                             num_heads, dropout, use_bias))

    def forward(self, X, valid_lens, *args):
        # Since the positional encoding values are between -1 and 1, the
        # embedding values are scaled by the square root of the embedding
        # dimension before being added to the positional encoding.
        X = self.pos_encoding(self.embedding(X) * math.sqrt(self.num_hiddens))
        self.attention_weights = [None] * len(self.blks)
        for i, blk in enumerate(self.blks):
            X = blk(X, valid_lens)
            self.attention_weights[
                i] = blk.attention.attention.attention_weights
        return X


encoder = TransformerEncoder(
    200, 24, 24, 24, 24, [100, 24], 24, 48, 8, 2, 0.5)
encoder.eval()
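The demo stops at encoder.eval(); a natural follow-up (my addition, mirroring the block-level tests above) is to push a batch of token indices through the two-layer encoder and check the output shape. d2l.PositionalEncoding is assumed here to be the d2l module used in TransformerEncoder with the (num_hiddens, dropout) signature.

# Token ids of shape (batch_size=2, num_steps=100); output keeps num_hiddens=24.
print(encoder(torch.ones((2, 100), dtype=torch.long), valid_lens).shape)
# expected: torch.Size([2, 100, 24])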