代码描述:
- 数据生成:
generate_data
函数生成序列数据用于训练。 - 注意力机制:定义了缩放点积注意力和多头注意力机制。
- 前馈神经网络:定义了前馈神经网络层。
- 编码器和解码器层:定义了编码器层和解码器层。
- 编码器和解码器:定义了完整的编码器和解码器结构。
- Transformer模型:构建了完整的Transformer模型。
- 训练和预测:定义了模型的训练过程,并进行训练
完整代码
import numpy as np import pandas as pd import tensorflow as tf from tensorflow.keras.layers import Input, Dense, LayerNormalization, Dropout, Embedding from tensorflow.keras.models import Model import time # 数据生成 def generate_data(num_samples, sequence_length, vocab_size): X = np.random.randint(0, vocab_size, (num_samples, sequence_length)) y = np.random.randint(0, vocab_size, (num_samples, sequence_length)) return X, y num_samples = 1000 sequence_length = 20 vocab_size = 50 X, y = generate_data(num_samples, sequence_length, vocab_size) def scaled_dot_product_attention(q, k, v, mask): matmul_qk = tf.matmul(q, k, transpose_b=True) dk = tf.cast(tf.shape(k)[-1], tf.float32) scaled_attention_logits = matmul_qk / tf.math.sqrt(dk) if mask is not None: scaled_attention_logits += (mask * -1e9) attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1) output = tf.matmul(attention_weights, v) return output, attention_weights class MultiHeadAttention(tf.keras.layers.Layer): def __init__(self, d_model, num_heads): super(MultiHeadAttention, self).__init__() self.num_heads = num_heads self.d_model = d_model assert d_model % self.num_heads == 0 self.depth = d_model // self.num_heads self.wq = Dense(d_model) self.wk = Dense(d_model) self.wv = Dense(d_model) self.dense = Dense(d_model) def split_heads(self, x, batch_size): x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth)) return tf.transpose(x, perm=[0, 2, 1, 3]) def call(self, v, k, q, mask): batch_size = tf.shape(q)[0] q = self.wq(q) k = self.wk(k) v = self.wv(v) q = self.split_heads(q, batch_size) k = self.split_heads(k, batch_size) v = self.split_heads(v, batch_size) scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask) scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3]) concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model)) output = self.dense(concat_attention) return output, attention_weights def point_wise_feed_forward_network(d_model, dff): return tf.keras.Sequential([ Dense(dff, activation='relu'), Dense(d_model) ]) class EncoderLayer(tf.keras.layers.Layer): def __init__(self, d_model, num_heads, dff, rate=0.1): super(EncoderLayer, self).__init__() self.mha = MultiHeadAttention(d_model, num_heads) self.ffn = point_wise_feed_forward_network(d_model, dff) self.layernorm1 = LayerNormalization(epsilon=1e-6) self.layernorm2 = LayerNormalization(epsilon=1e-6) self.dropout1 = Dropout(rate) self.dropout2 = Dropout(rate) def call(self, x, training, mask): attn_output, _ = self.mha(x, x, x, mask) attn_output = self.dropout1(attn_output, training=training) out1 = self.layernorm1(x + attn_output) ffn_output = self.ffn(out1) ffn_output = self.dropout2(ffn_output, training=training) out2 = self.layernorm2(out1 + ffn_output) return out2 class DecoderLayer(tf.keras.layers.Layer): def __init__(self, d_model, num_heads, dff, rate=0.1): super(DecoderLayer, self).__init__() self.mha1 = MultiHeadAttention(d_model, num_heads) self.mha2 = MultiHeadAttention(d_model, num_heads) self.ffn = point_wise_feed_forward_network(d_model, dff) self.layernorm1 = LayerNormalization(epsilon=1e-6) self.layernorm2 = LayerNormalization(epsilon=1e-6) self.layernorm3 = LayerNormalization(epsilon=1e-6) self.dropout1 = Dropout(rate) self.dropout2 = Dropout(rate) self.dropout3 = Dropout(rate) def call(self, x, enc_output, training, look_ahead_mask, padding_mask): attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask) attn1 = self.dropout1(attn1, training=training) out1 = self.layernorm1(x + attn1) attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask) attn2 = self.dropout2(attn2, training=training) out2 = self.layernorm2(out1 + attn2) ffn_output = self.ffn(out2) ffn_output = self.dropout3(ffn_output, training=training) out3 = self.layernorm3(out2 + ffn_output) return out3, attn_weights_block1, attn_weights_block2 class Encoder(tf.keras.layers.Layer): def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1): super(Encoder, self).__init__() self.d_model = d_model self.num_layers = num_layers self.embedding = Embedding(input_vocab_size, d_model) self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model) self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)] self.dropout = Dropout(rate) def call(self, x, training, mask): seq_len = tf.shape(x)[1] x = self.embedding(x) x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32)) x += self.pos_encoding[:, :seq_len, :] x = self.dropout(x, training=training) for i in range(self.num_layers): x = self.enc_layers[i](x, training, mask) return x class Decoder(tf.keras.layers.Layer): def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1): super(Decoder, self).__init__() self.d_model = d_model self.num_layers = num_layers self.embedding = Embedding(target_vocab_size, d_model) self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model) self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)] self.dropout = Dropout(rate) def call(self, x, enc_output, training, look_ahead_mask, padding_mask): seq_len = tf.shape(x)[1] attention_weights = {} x = self.embedding(x) x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32)) x += self.pos_encoding[:, :seq_len, :] x = self.dropout(x, training=training) for i in range(self.num_layers): x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask) attention_weights[f'decoder_layer{i+1}_block1'] = block1 attention_weights[f'decoder_layer{i+1}_block2'] = block2 return x, attention_weights class Transformer(tf.keras.Model): def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1): super(Transformer, self).__init__() self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate) self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate) self.final_layer = Dense(target_vocab_size) def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask): enc_output = self.encoder(inp, training, enc_padding_mask) dec_output, attention_weights = self.decoder(tar, enc_output, training, look_ahead_mask, dec_padding_mask) final_output = self.final_layer(dec_output) return final_output, attention_weights def create_padding_mask(seq): seq = tf.cast(tf.math.equal(seq, 0), tf.float32) return seq[:, tf.newaxis, tf.newaxis, :] def create_look_ahead_mask(size): mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0) return mask def positional_encoding(position, d_model): angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model) angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2]) angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2]) pos_encoding = angle_rads[np.newaxis, ...] return tf.cast(pos_encoding, dtype=tf.float32) def get_angles(pos, i, d_model): angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model)) return pos * angle_rates class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule): def __init__(self, d_model, warmup_steps=4000): super(CustomSchedule, self).__init__() self.d_model = d_model self.d_model = tf.cast(self.d_model, tf.float32) self.warmup_steps = warmup_steps def __call__(self, step): arg1 = tf.math.rsqrt(step) arg2 = step * (self.warmup_steps ** -1.5) return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2) learning_rate = CustomSchedule(d_model=128) optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9) loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none') def loss_function(real, pred): mask = tf.math.logical_not(tf.math.equal(real, 0)) loss_ = loss_object(real, pred) mask = tf.cast(mask, dtype=loss_.dtype) loss_ *= mask return tf.reduce_sum(loss_) / tf.reduce_sum(mask) train_loss = tf.keras.metrics.Mean(name='train_loss') def create_masks(inp, tar): enc_padding_mask = create_padding_mask(inp) dec_padding_mask = create_padding_mask(inp) look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1]) dec_target_padding_mask = create_padding_mask(tar) combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask) return enc_padding_mask, combined_mask, dec_padding_mask num_layers = 4 d_model = 128 dff = 512 num_heads = 8 input_vocab_size = vocab_size + 1 target_vocab_size = vocab_size + 1 dropout_rate = 0.1 transformer = Transformer(num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input=input_vocab_size, pe_target=target_vocab_size, rate=dropout_rate) EPOCHS = 20 BATCH_SIZE = 64 # 数据集 dataset = tf.data.Dataset.from_tensor_slices((X, y)).shuffle(num_samples).batch(BATCH_SIZE) for epoch in range(EPOCHS): start = time.time() train_loss.reset_states() for (batch, (inp, tar)) in enumerate(dataset): tar_inp = tar[:, :-1] tar_real = tar[:, 1:] enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp) with tf.GradientTape() as tape: predictions, _ = transformer(inp, tar_inp, True, enc_padding_mask, combined_mask, dec_padding_mask) loss = loss_function(tar_real, predictions) gradients = tape.gradient(loss, transformer.trainable_variables) optimizer.apply_gradients(zip(gradients, transformer.trainable_variables)) train_loss(loss) if batch % 50 == 0: print(f'Epoch {epoch + 1} Batch {batch} Loss {train_loss.result():.4f}') print(f'Epoch {epoch + 1} Loss {train_loss.result():.4f}') print(f'Time taken for 1 epoch: {time.time() - start:.2f} secs\n')标签:Transformer,self,mask,num,模块,tf,即插即用,model,size From: https://blog.csdn.net/GDHBFTGGG/article/details/140698401