
Transformer Module Implementation / Stitchable Module / Plug-and-Play Module


Code description:

  1. Data generation: the generate_data function produces random sequence data for training.
  2. Attention mechanism: defines scaled dot-product attention and multi-head attention.
  3. Feed-forward network: defines the position-wise feed-forward layer.
  4. Encoder and decoder layers: defines the encoder layer and the decoder layer.
  5. Encoder and decoder: defines the complete encoder and decoder stacks.
  6. Transformer model: assembles the full Transformer model.
  7. Training and prediction: defines the training procedure and runs training (a decoding sketch for prediction follows the listing).

Full code

import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LayerNormalization, Dropout, Embedding
from tensorflow.keras.models import Model
import time

# Data generation
def generate_data(num_samples, sequence_length, vocab_size):
    X = np.random.randint(0, vocab_size, (num_samples, sequence_length))
    y = np.random.randint(0, vocab_size, (num_samples, sequence_length))
    return X, y

num_samples = 1000
sequence_length = 20
vocab_size = 50

X, y = generate_data(num_samples, sequence_length, vocab_size)

def scaled_dot_product_attention(q, k, v, mask):
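    # Attention(Q, K, V) = softmax(Q K^T / sqrt(d_k)) V. Positions where mask == 1
    # get -1e9 added to their logits, so their softmax weight is effectively zero.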
    matmul_qk = tf.matmul(q, k, transpose_b=True)
    dk = tf.cast(tf.shape(k)[-1], tf.float32)
    scaled_attention_logits = matmul_qk / tf.math.sqrt(dk)

    if mask is not None:
        scaled_attention_logits += (mask * -1e9)

    attention_weights = tf.nn.softmax(scaled_attention_logits, axis=-1)
    output = tf.matmul(attention_weights, v)

    return output, attention_weights

class MultiHeadAttention(tf.keras.layers.Layer):
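    # Splits d_model into num_heads heads of size depth = d_model // num_heads; each head
    # attends independently, and the results are concatenated and projected back to d_model.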
    def __init__(self, d_model, num_heads):
        super(MultiHeadAttention, self).__init__()
        self.num_heads = num_heads
        self.d_model = d_model

        assert d_model % self.num_heads == 0

        self.depth = d_model // self.num_heads

        self.wq = Dense(d_model)
        self.wk = Dense(d_model)
        self.wv = Dense(d_model)

        self.dense = Dense(d_model)

    def split_heads(self, x, batch_size):
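        # Reshape (batch_size, seq_len, d_model) -> (batch_size, num_heads, seq_len, depth).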
        x = tf.reshape(x, (batch_size, -1, self.num_heads, self.depth))
        return tf.transpose(x, perm=[0, 2, 1, 3])

    def call(self, v, k, q, mask):
        batch_size = tf.shape(q)[0]

        q = self.wq(q)
        k = self.wk(k)
        v = self.wv(v)

        q = self.split_heads(q, batch_size)
        k = self.split_heads(k, batch_size)
        v = self.split_heads(v, batch_size)

        scaled_attention, attention_weights = scaled_dot_product_attention(q, k, v, mask)
        scaled_attention = tf.transpose(scaled_attention, perm=[0, 2, 1, 3])
        concat_attention = tf.reshape(scaled_attention, (batch_size, -1, self.d_model))

        output = self.dense(concat_attention)

        return output, attention_weights

def point_wise_feed_forward_network(d_model, dff):
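    # Position-wise feed-forward network: FFN(x) = max(0, x W1 + b1) W2 + b2,
    # applied independently at every position.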
    return tf.keras.Sequential([
        Dense(dff, activation='relu'),
        Dense(d_model)
    ])

class EncoderLayer(tf.keras.layers.Layer):
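    # One encoder block: multi-head self-attention followed by a feed-forward network,
    # each sub-layer wrapped with dropout, a residual connection, and layer normalization.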
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(EncoderLayer, self).__init__()

        self.mha = MultiHeadAttention(d_model, num_heads)
        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)

    def call(self, x, training, mask):
        attn_output, _ = self.mha(x, x, x, mask)
        attn_output = self.dropout1(attn_output, training=training)
        out1 = self.layernorm1(x + attn_output)

        ffn_output = self.ffn(out1)
        ffn_output = self.dropout2(ffn_output, training=training)
        out2 = self.layernorm2(out1 + ffn_output)

        return out2

class DecoderLayer(tf.keras.layers.Layer):
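    # One decoder block: masked self-attention, cross-attention (queries from the decoder,
    # keys/values from the encoder output), and a feed-forward network, each sub-layer
    # wrapped with dropout, a residual connection, and layer normalization.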
    def __init__(self, d_model, num_heads, dff, rate=0.1):
        super(DecoderLayer, self).__init__()

        self.mha1 = MultiHeadAttention(d_model, num_heads)
        self.mha2 = MultiHeadAttention(d_model, num_heads)

        self.ffn = point_wise_feed_forward_network(d_model, dff)

        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.layernorm3 = LayerNormalization(epsilon=1e-6)

        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
        self.dropout3 = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        attn1, attn_weights_block1 = self.mha1(x, x, x, look_ahead_mask)
        attn1 = self.dropout1(attn1, training=training)
        out1 = self.layernorm1(x + attn1)

        attn2, attn_weights_block2 = self.mha2(enc_output, enc_output, out1, padding_mask)
        attn2 = self.dropout2(attn2, training=training)
        out2 = self.layernorm2(out1 + attn2)

        ffn_output = self.ffn(out2)
        ffn_output = self.dropout3(ffn_output, training=training)
        out3 = self.layernorm3(out2 + ffn_output)

        return out3, attn_weights_block1, attn_weights_block2

class Encoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, maximum_position_encoding, rate=0.1):
        super(Encoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = Embedding(input_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)

        self.enc_layers = [EncoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, training, mask):
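        # Scale embeddings by sqrt(d_model), add positional encodings, apply dropout,
        # then pass the result through the stacked encoder layers.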
        seq_len = tf.shape(x)[1]

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x = self.enc_layers[i](x, training, mask)

        return x

class Decoder(tf.keras.layers.Layer):
    def __init__(self, num_layers, d_model, num_heads, dff, target_vocab_size, maximum_position_encoding, rate=0.1):
        super(Decoder, self).__init__()

        self.d_model = d_model
        self.num_layers = num_layers

        self.embedding = Embedding(target_vocab_size, d_model)
        self.pos_encoding = positional_encoding(maximum_position_encoding, self.d_model)

        self.dec_layers = [DecoderLayer(d_model, num_heads, dff, rate) for _ in range(num_layers)]
        self.dropout = Dropout(rate)

    def call(self, x, enc_output, training, look_ahead_mask, padding_mask):
        seq_len = tf.shape(x)[1]
        attention_weights = {}

        x = self.embedding(x)
        x *= tf.math.sqrt(tf.cast(self.d_model, tf.float32))
        x += self.pos_encoding[:, :seq_len, :]

        x = self.dropout(x, training=training)

        for i in range(self.num_layers):
            x, block1, block2 = self.dec_layers[i](x, enc_output, training, look_ahead_mask, padding_mask)

            attention_weights[f'decoder_layer{i+1}_block1'] = block1
            attention_weights[f'decoder_layer{i+1}_block2'] = block2

        return x, attention_weights

class Transformer(tf.keras.Model):
    def __init__(self, num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input, pe_target, rate=0.1):
        super(Transformer, self).__init__()

        self.encoder = Encoder(num_layers, d_model, num_heads, dff, input_vocab_size, pe_input, rate)
        self.decoder = Decoder(num_layers, d_model, num_heads, dff, target_vocab_size, pe_target, rate)
        self.final_layer = Dense(target_vocab_size)

    def call(self, inp, tar, training, enc_padding_mask, look_ahead_mask, dec_padding_mask):
        enc_output = self.encoder(inp, training, enc_padding_mask)
        dec_output, attention_weights = self.decoder(tar, enc_output, training, look_ahead_mask, dec_padding_mask)
        final_output = self.final_layer(dec_output)

        return final_output, attention_weights

def create_padding_mask(seq):
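    # 1.0 marks padding positions (token id 0); shape (batch_size, 1, 1, seq_len)
    # broadcasts across attention heads and query positions.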
    seq = tf.cast(tf.math.equal(seq, 0), tf.float32)
    return seq[:, tf.newaxis, tf.newaxis, :]

def create_look_ahead_mask(size):
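    # Strictly upper-triangular mask: 1.0 marks future positions, so position i
    # can only attend to positions <= i.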
    mask = 1 - tf.linalg.band_part(tf.ones((size, size)), -1, 0)
    return mask

def positional_encoding(position, d_model):
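    # Sinusoidal positional encoding: sine on even embedding indices, cosine on odd ones,
    # with wavelengths forming a geometric progression up to 10000 * 2*pi.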
    angle_rads = get_angles(np.arange(position)[:, np.newaxis], np.arange(d_model)[np.newaxis, :], d_model)
    angle_rads[:, 0::2] = np.sin(angle_rads[:, 0::2])
    angle_rads[:, 1::2] = np.cos(angle_rads[:, 1::2])

    pos_encoding = angle_rads[np.newaxis, ...]

    return tf.cast(pos_encoding, dtype=tf.float32)

def get_angles(pos, i, d_model):
    angle_rates = 1 / np.power(10000, (2 * (i // 2)) / np.float32(d_model))
    return pos * angle_rates

class CustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
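    # Warmup learning-rate schedule from "Attention Is All You Need":
    # lrate = d_model^-0.5 * min(step^-0.5, step * warmup_steps^-1.5).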
    def __init__(self, d_model, warmup_steps=4000):
        super(CustomSchedule, self).__init__()

        self.d_model = d_model
        self.d_model = tf.cast(self.d_model, tf.float32)

        self.warmup_steps = warmup_steps

    def __call__(self, step):
        # Cast the integer step counter to float before taking rsqrt.
        step = tf.cast(step, tf.float32)
        arg1 = tf.math.rsqrt(step)
        arg2 = step * (self.warmup_steps ** -1.5)

        return tf.math.rsqrt(self.d_model) * tf.math.minimum(arg1, arg2)

learning_rate = CustomSchedule(d_model=128)
optimizer = tf.keras.optimizers.Adam(learning_rate, beta_1=0.9, beta_2=0.98, epsilon=1e-9)

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_sum(loss_) / tf.reduce_sum(mask)

train_loss = tf.keras.metrics.Mean(name='train_loss')

def create_masks(inp, tar):
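    # Returns the encoder padding mask, the decoder's combined mask (look-ahead merged with
    # the target padding mask), and the padding mask used in the decoder's cross-attention.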
    enc_padding_mask = create_padding_mask(inp)
    dec_padding_mask = create_padding_mask(inp)
    look_ahead_mask = create_look_ahead_mask(tf.shape(tar)[1])
    dec_target_padding_mask = create_padding_mask(tar)
    combined_mask = tf.maximum(dec_target_padding_mask, look_ahead_mask)

    return enc_padding_mask, combined_mask, dec_padding_mask

num_layers = 4
d_model = 128
dff = 512
num_heads = 8
input_vocab_size = vocab_size + 1
target_vocab_size = vocab_size + 1
dropout_rate = 0.1

transformer = Transformer(num_layers, d_model, num_heads, dff, input_vocab_size, target_vocab_size, pe_input=input_vocab_size, pe_target=target_vocab_size, rate=dropout_rate)

EPOCHS = 20
BATCH_SIZE = 64

# Build the training dataset
dataset = tf.data.Dataset.from_tensor_slices((X, y)).shuffle(num_samples).batch(BATCH_SIZE)

for epoch in range(EPOCHS):
    start = time.time()

    train_loss.reset_states()

    for (batch, (inp, tar)) in enumerate(dataset):
        tar_inp = tar[:, :-1]
        tar_real = tar[:, 1:]

        enc_padding_mask, combined_mask, dec_padding_mask = create_masks(inp, tar_inp)

        with tf.GradientTape() as tape:
            predictions, _ = transformer(inp, tar_inp, True, enc_padding_mask, combined_mask, dec_padding_mask)
            loss = loss_function(tar_real, predictions)

        gradients = tape.gradient(loss, transformer.trainable_variables)
        optimizer.apply_gradients(zip(gradients, transformer.trainable_variables))

        train_loss(loss)

        if batch % 50 == 0:
            print(f'Epoch {epoch + 1} Batch {batch} Loss {train_loss.result():.4f}')

    print(f'Epoch {epoch + 1} Loss {train_loss.result():.4f}')
    print(f'Time taken for 1 epoch: {time.time() - start:.2f} secs\n')
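
Prediction example

Step 7 of the description mentions prediction, but the listing above stops at training. Below is a minimal greedy-decoding sketch that is not part of the original post: it reuses the trained transformer and the mask helpers defined above. The start token 0 and the fixed output length are assumptions, since the synthetic data defines no real start or end tokens.

def greedy_decode(inp_seq, max_len=sequence_length):
    # inp_seq: 1-D array of token ids; add a batch dimension for the model.
    encoder_input = tf.expand_dims(inp_seq, 0)
    # Assumed convention: start decoding from token 0 (the random data has no <start> token).
    output = tf.expand_dims([0], 0)

    for _ in range(max_len):
        enc_padding_mask, combined_mask, dec_padding_mask = create_masks(encoder_input, output)
        predictions, _ = transformer(encoder_input, output, False,
                                     enc_padding_mask, combined_mask, dec_padding_mask)
        # Take the logits at the last position and pick the most likely next token.
        next_token = tf.argmax(predictions[:, -1:, :], axis=-1, output_type=tf.int32)
        output = tf.concat([output, next_token], axis=-1)

    return tf.squeeze(output, axis=0)

# Example: decode a prediction for the first training sequence.
print(greedy_decode(X[0]))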

From: https://blog.csdn.net/GDHBFTGGG/article/details/140698401
