首页 > 编程语言 >【HuggingFace Transformers】OpenAIGPTModel源码解析

【HuggingFace Transformers】OpenAIGPTModel源码解析

时间:2024-09-04 15:23:07浏览次数:21  
标签:None Transformers embeds self ids states OpenAIGPTModel 源码 hidden

OpenAIGPTModel源码解析

说到ChatGPT,大家可能都使用过吧。2022年,ChatGPT的推出引发了广泛的关注和讨论。这款对话生成模型不仅具备了强大的语言理解和生成能力,还能进行非常自然的对话,给用户带来了全新的互动体验。然而,ChatGPT的成功背后离不开它的前身——GPT

1. GPT 介绍

GPT(Generative Pre-trained Transformer)是由OpenAI开发的一种基于Transformer架构的大型语言模型。它由多个堆叠的自注意力解码器层(Transformer Blocks)组成,每一层包含多头自注意力机制和前馈神经网络,并配有残差连接和层归一化以稳定训练。GPT采用自回归方式生成文本,通过在大规模互联网数据上进行预训练,具备强大的自然语言理解和生成能力,能够完成对话生成、文本补全等多种任务。其结构如下:

在这里插入图片描述

2. OpenAIGPTModel类 源码解析

源码地址:transformers/src/transformers/models/openai/modeling_openai.py

# -*- coding: utf-8 -*-
# @time: 2024/9/3 20:39
from typing import Optional, Union, Tuple

import torch

from torch import nn
from transformers import add_start_docstrings, OpenAIGPTPreTrainedModel
from transformers.modeling_outputs import BaseModelOutput
from transformers.models.openai.modeling_openai import OPENAI_GPT_START_DOCSTRING, Block, OPENAI_GPT_INPUTS_DOCSTRING, _CHECKPOINT_FOR_DOC, _CONFIG_FOR_DOC
from transformers.utils import add_start_docstrings_to_model_forward, add_code_sample_docstrings


@add_start_docstrings(
    "The bare OpenAI GPT transformer model outputting raw hidden-states without any specific head on top.",
    OPENAI_GPT_START_DOCSTRING,
)
class OpenAIGPTModel(OpenAIGPTPreTrainedModel):
    def __init__(self, config):
        super().__init__(config)

        self.tokens_embed = nn.Embedding(config.vocab_size, config.n_embd)  # 定义 token 嵌入层
        self.positions_embed = nn.Embedding(config.n_positions, config.n_embd)  # 定义 position 嵌入层
        self.drop = nn.Dropout(config.embd_pdrop)  # 定义 drop 层
        self.h = nn.ModuleList([Block(config.n_positions, config, scale=True) for _ in range(config.n_layer)]) # 定义多个 Block 层

        # 注册一个缓冲区用于存储position_ids,初始化为从 0 到 config.n_positions 的序列
        self.register_buffer("position_ids", torch.arange(config.n_positions), persistent=False)
        # Initialize weights and apply final processing
        self.post_init()

    def get_input_embeddings(self):
        return self.tokens_embed

    def set_input_embeddings(self, new_embeddings):
        self.tokens_embed = new_embeddings

    def _prune_heads(self, heads_to_prune):
        """
        Prunes heads of the model. heads_to_prune: dict of {layer_num: list of heads to prune in this layer}
        """
        # 剪掉模型多头注意力机制中的一些头,heads_to_prune 是一个字典,键为layer_num,值为需要剪枝的 heads 列表。
        for layer, heads in heads_to_prune.items():
            self.h[layer].attn.prune_heads(heads)

    @add_start_docstrings_to_model_forward(OPENAI_GPT_INPUTS_DOCSTRING)
    @add_code_sample_docstrings(
        checkpoint=_CHECKPOINT_FOR_DOC,
        output_type=BaseModelOutput,
        config_class=_CONFIG_FOR_DOC,
    )
    def forward(
        self,
        input_ids: Optional[torch.LongTensor] = None,
        attention_mask: Optional[torch.FloatTensor] = None,
        token_type_ids: Optional[torch.LongTensor] = None,
        position_ids: Optional[torch.LongTensor] = None,
        head_mask: Optional[torch.FloatTensor] = None,
        inputs_embeds: Optional[torch.FloatTensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[Tuple[torch.Tensor], BaseModelOutput]:
        # 根据 config 配置设定 output_attentions, output_hidden_states, return_dict 的值
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        # 获取 input_ids 或者 inputs_embeds 以及 input_shape
        if input_ids is not None and inputs_embeds is not None:  # 当 input_ids 和 inputs_embeds 同时存在时,抛出错误
            raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time")
        elif input_ids is not None:  # 如果存在 input_ids,将其形状调整为 (batch_size, sequence_length)
            self.warn_if_padding_and_no_attention_mask(input_ids, attention_mask)
            input_shape = input_ids.size()
            input_ids = input_ids.view(-1, input_shape[-1])
        elif inputs_embeds is not None:  # 如果存在 inputs_embeds,获取其形状
            input_shape = inputs_embeds.size()[:-1]
        else:  # 如果 input_ids 和 inputs_embeds 都不存在,抛出错误
            raise ValueError("You have to specify either input_ids or inputs_embeds")

        # 如果没有传入 position_ids,则生成默认的 position_ids
        if position_ids is None:
            # Code is different from when we had a single embedding matrix from position and token embeddings
            position_ids = self.position_ids[None, : input_shape[-1]]

        # ------------------------------------- 1. 获取 attention_mask -----------------------------#
        # Attention mask.
        if attention_mask is not None:
            # We create a 3D attention mask from a 2D tensor mask.
            # Sizes are [batch_size, 1, 1, to_seq_length]
            # So we can broadcast to [batch_size, num_heads, from_seq_length, to_seq_length]
            # this attention mask is more simple than the triangular masking of causal attention
            # used in OpenAI GPT, we just need to prepare the broadcast dimension here.
            attention_mask = attention_mask.unsqueeze(1).unsqueeze(2)  # 将 2D 掩码扩展为 3D 掩码,适用于批量输入

            # Since attention_mask is 1.0 for positions we want to attend and 0.0 for
            # masked positions, this operation will create a tensor which is 0.0 for
            # positions we want to attend and the dtype's smallest value for masked positions.
            # Since we are adding it to the raw scores before the softmax, this is
            # effectively the same as removing these entirely.
            # 将注意力掩码转换为与模型参数相同的数据类型,并进行数值变换,torch.finfo(self.dtype).min 返回数据类型的最小值。
            attention_mask = attention_mask.to(dtype=next(self.parameters()).dtype)  # fp16 compatibility
            attention_mask = (1.0 - attention_mask) * torch.finfo(self.dtype).min
        # ----------------------------------------------------------------------------------------#

        # ------------------------------------- 2. 获取 head_mask ---------------------------------#
        # Prepare head mask if needed
        head_mask = self.get_head_mask(head_mask, self.config.n_layer)
        # ---------------------------------------------------------- -----------------------------#

        # ------------------------------------- 3. 获取 hidden_states -----------------------------#
        # 如果 inputs_embeds 为 None,则使用 tokens_embed 对 input_ids 计算
        if inputs_embeds is None:
            inputs_embeds = self.tokens_embed(input_ids)
        # 计算 position_embeds
        position_embeds = self.positions_embed(position_ids)
        # 如果存在 token_type_ids,使用 tokens_embed 计算;否则 token_type_embeds 为 0
        if token_type_ids is not None:
            token_type_ids = token_type_ids.view(-1, token_type_ids.size(-1))
            token_type_embeds = self.tokens_embed(token_type_ids)
        else:
            token_type_embeds = 0
        # 计算 hidden_states,即inputs_embeds、position_embeds 和 token_type_embeds 之和,并使用 dropout
        hidden_states = inputs_embeds + position_embeds + token_type_embeds
        hidden_states = self.drop(hidden_states)
        # -------------------------------------------------------------------------------------#

        # 获取输出形状,以及初始化输出结果 all_attentions 和 all_hidden_states
        output_shape = input_shape + (hidden_states.size(-1),)
        all_attentions = () if output_attentions else None
        all_hidden_states = () if output_hidden_states else None

        # -----------------------------------4. Block逐层计算处理(核心部分)--------------------#
        for i, block in enumerate(self.h):
            # 如果需要输出 hidden states,将当前 hidden_states 添加到 all_hidden_states
            if output_hidden_states:
                all_hidden_states = all_hidden_states + (hidden_states,)

            # 通过当前 Block 处理 hidden_states,得到新的 hidden_states 和 attentions
            outputs = block(hidden_states, attention_mask, head_mask[i], output_attentions=output_attentions)
            hidden_states = outputs[0]
            # 如果需要输出 attentions,将当前 attentions 添加到 all_attentions
            if output_attentions:
                all_attentions = all_attentions + (outputs[1],)
        # ---------------------------------------------------------------------------------#

        # 将 hidden_states 的形状调整为输出形状
        hidden_states = hidden_states.view(*output_shape)

        # 如果需要输出 hidden states,将最后的 hidden_states 添加到 all_hidden_states
        if output_hidden_states:
            all_hidden_states = all_hidden_states + (hidden_states,)

        # -----------------------------------5. 根据配置的输出方式输出结果-------------------------------#
        if not return_dict:
            return tuple(v for v in [hidden_states, all_hidden_states, all_attentions] if v is not None)

        return BaseModelOutput(
            last_hidden_state=hidden_states,
            hidden_states=all_hidden_states,
            attentions=all_attentions,
        )

标签:None,Transformers,embeds,self,ids,states,OpenAIGPTModel,源码,hidden
From: https://blog.csdn.net/weixin_47936614/article/details/141893646

相关文章