1. Qwen整体介绍
对于一个完全没接触过大模型的小白来说,猛一听这个名字首先会一懵:Qwen是啥。这里首先解答一下这个问题。下面是官网给出介绍:Qwen是阿里巴巴集团Qwen团队研发的大语言模型和大型多模态模型系列。其实随着大模型领域的发展,这类产品已经有很多了例如:由百度开发的ERNIE,由清华大学开发的Zhuiyi等等。
目前,Qwen已升级至Qwen2版本。无论是语言模型还是多模态模型,均在大规模多语言和多模态数据上进行预训练,并通过高质量数据进行后期微调以贴近人类偏好。Qwen具备自然语言理解、文本生成、视觉理解、音频理解、工具使用、角色扮演、作为AI Agent进行互动等多种能力。
废话不多说,我们可以先看一下Qwen的整体架构。Qwen的整体架构与Llama2类似,如下图所示:
接下来我们顺着整体架构图学习,对于输入问题Text,首先会经过Tokenizer。在这里,对于没有了解过NLP的友友们又开始疑惑了:Tokenizer是啥?其实Tokenizer就是一个分词器,在这里的作用就是将问题中句子分成各个词,每一个词都对应着词表的索引,每个索引对应着一个词向量。
接着之后生成一个input_ids,由此输入Qwen2的主干部分。
1.1 模型初始化
第一部首先进行模型初始化:
class Qwen2Model(Qwen2PreTrainedModel):
def __init__(self, config: Qwen2Config):
super().__init__(config)
self.padding_idx = config.pad_token_id
self.vocab_size = config.vocab_size
self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
self.layers = nn.ModuleList(
[Qwen2DecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)]
)
self.norm = Qwen2RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
self.gradient_checkpointing = False
# Initialize weights and apply final processing
self.post_init()
下面我们来解释一下这段代码:
1.def __init__(self, config: Qwen2Config):
super().__init__(config)
Qwen2Model
继承自Qwen2PreTrainedModel
,Qwen2PreTrainedModel
继承自PreTrainedModel。PretrainedConfig
是transformers框架中所有配置类的基类。
Qwen2PreTrainedModel是已经预训练好的模型,具体代码如下:
class Qwen2PreTrainedModel(PreTrainedModel):
config_class = Qwen2Config
base_model_prefix = "model"
supports_gradient_checkpointing = True
_no_split_modules = ["Qwen2DecoderLayer"]
_skip_keys_device_placement = "past_key_values"
_supports_flash_attn_2 = True
_supports_sdpa = True
_supports_cache_class = True
_supports_quantized_cache = True
_supports_static_cache = True
def _init_weights(self, module):
std = self.config.initializer_range
if isinstance(module, nn.Linear):
module.weight.data.normal_(mean=0.0, std=std)
if module.bias is not None:
module.bias.data.zero_()
elif isinstance(module, nn.Embedding):
module.weight.data.normal_(mean=0.0, std=std)
if module.padding_idx is not None:
module.weight.data[module.padding_idx].zero_()
2. self.padding_idx = config.pad_token_id
self.vocab_size = config.vocab_size
这里设置了模型的两个属性:padding_idx
(用于指定填充标记的索引),vocab_size
(词汇表的大小,即模型能够处理的不同token的数量)。
3. self.embed_tokens = nn.Embedding(config.vocab_size, config.hidden_size, self.padding_idx)
self.layers = nn.ModuleList([Qwen2DecoderLayer(config, layer_idx) for layer_idx in range(config.num_hidden_layers)])
self.norm = Qwen2RMSNorm(config.hidden_size, eps=config.rms_norm_eps)
初始化模型的嵌入层、解码器层、归一化层:
- 嵌入层(
nn.Embedding
):模型使用嵌入层将输入的标记映射成密集的向量表示。config.vocab_size
是词汇表的大小,config.hidden_size
是嵌入向量的维度,self.padding_idx
是padding token的索引。 - 解码器层(
nn.ModuleList()
):模型包含多个解码器层,这些层都是由 `Qwen2DecoderLayer`` 定义。每个解码器层都是根据配置对象中的参数构建的,并且有一个索引layer_idx
,它表示层在模型中的位置。 - 归一化层
Qwen2RMSNorm
:归一化层使用的是 Root Mean Square Layer Normalization
4. self.gradient_checkpointing = False
设置了是否使用 gradient_checkpoint
主要是用来节省显存。它用于控制是否使用梯度检查点技术。这是一种节省内存的技术,通过在正向传播中丢弃一些中间梯度来实现。
5. self.post_init()
调用 post_init()
完成一些初始化和准备检查的代码。post_init()代码。
def post_init(self):
"""
A method executed at the end of each Transformer model initialization, to execute code that needs the model's
modules properly initialized (such as weight initialization).
"""
self.init_weights()
self._backward_compatibility_gradient_checkpointing()
1.2 forward方法
第二步实现Qwen2Model的forward方法。在实现 Qwen2Model
的 forward
方法时,我们需要分成三个主要部分:Embedding、Hidden States 和 Decoder Layers。这一过程展示了模型的前向传播行为,即在接收输入数据后,如何计算输出结果。
1. Embedding
首先,对于输入的 input_ids
,我们将使用 torch.nn.Embedding
进行嵌入处理。这一步负责将每个输入标识符映射到一个高维向量空间中,以便后续的处理。
2. Hidden States
接下来,经过嵌入处理后的向量将转化为 Hidden States。这些状态代表了输入数据的内部表示,将作为输入提供给模型的 Decoder Layers。
3. Decoder Layers
最后,经过前两步处理的 Hidden States 会传递到多层的 Decoder Layers 进行进一步的处理。Decoder Layers 是模型的核心部分,它们通过自注意力机制和前馈神经网络对输入进行深入处理,以生成最终的输出。
代码如下:
@add_start_docstrings_to_model_forward(QWEN2_INPUTS_DOCSTRING)
def forward(
self,
input_ids: torch.LongTensor = None,
attention_mask: Optional[torch.Tensor] = None,
position_ids: Optional[torch.LongTensor] = None,
past_key_values: Optional[List[torch.FloatTensor]] = None,
inputs_embeds: Optional[torch.FloatTensor] = None,
use_cache: Optional[bool] = None,
output_attentions: Optional[bool] = None,
output_hidden_states: Optional[bool] = None,
return_dict: Optional[bool] = None,
cache_position: Optional[torch.LongTensor] = None,
) -> Union[Tuple, BaseModelOutputWithPast]:
output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
output_hidden_states = (
output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
)
use_cache = use_cache if use_cache is not None else self.config.use_cache
return_dict = return_dict if return_dict is not None else self.config.use_return_dict
if (input_ids is None) ^ (inputs_embeds is not None):
raise ValueError(
"You cannot specify both input_ids and inputs_embeds at the same time, and must specify either one"
)
if self.gradient_checkpointing and self.training:
if use_cache:
logger.warning_once(
"`use_cache=True` is incompatible with gradient checkpointing. Setting `use_cache=False`..."
)
use_cache = False
use_legacy_cache = False
if use_cache and not isinstance(past_key_values, Cache) and not self.training:
use_legacy_cache = True
past_key_values = DynamicCache.from_legacy_cache(past_key_values)
logger.warning_once(
"We detected that you are passing `past_key_values` as a tuple and this is deprecated and will be removed in v4.46. "
"Please use an appropriate `Cache` class (https://huggingface.co/docs/transformers/internal/generation_utils#transformers.Cache)"
)
if inputs_embeds is None:
inputs_embeds = self.embed_tokens(input_ids)
if cache_position is None:
past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0
cache_position = torch.arange(
past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device
)
if position_ids is None:
position_ids = cache_position.unsqueeze(0)
causal_mask = self._update_causal_mask(
attention_mask, inputs_embeds, cache_position, past_key_values, output_attentions
)
hidden_states = inputs_embeds
# create position embeddings to be shared across the decoder layers
position_embeddings = self.rotary_emb(hidden_states, position_ids)
# decoder layers
all_hidden_states = () if output_hidden_states else None
all_self_attns = () if output_attentions else None
next_decoder_cache = None
for decoder_layer in self.layers:
if output_hidden_states:
all_hidden_states += (hidden_states,)
if self.gradient_checkpointing and self.training:
layer_outputs = self._gradient_checkpointing_func(
decoder_layer.__call__,
hidden_states,
causal_mask,
position_ids,
past_key_values,
output_attentions,
use_cache,
cache_position,
position_embeddings,
)
else:
layer_outputs = decoder_layer(
hidden_states,
attention_mask=causal_mask,
position_ids=position_ids,
past_key_value=past_key_values,
output_attentions=output_attentions,
use_cache=use_cache,
cache_position=cache_position,
position_embeddings=position_embeddings,
)
hidden_states = layer_outputs[0]
if use_cache:
next_decoder_cache = layer_outputs[2 if output_attentions else 1]
if output_attentions:
all_self_attns += (layer_outputs[1],)
hidden_states = self.norm(hidden_states)
# add hidden states from the last decoder layer
if output_hidden_states:
all_hidden_states += (hidden_states,)
next_cache = None
if use_cache:
next_cache = next_decoder_cache.to_legacy_cache() if use_legacy_cache else next_decoder_cache
if not return_dict:
return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None)
return BaseModelOutputWithPast(
last_hidden_state=hidden_states,
past_key_values=next_cache,
hidden_states=all_hidden_states,
attentions=all_self_attns,
)
# Copied from transformers.models.llama.modeling_llama.LlamaModel._update_causal_mask
这里内容有点多,我们看核心:
inputs_embeds = self.embed_tokens(input_ids)
# embed positions
hidden_states = inputs_embeds
for idx, decoder_layer in enumerate(self.layers):
# 将所有的hidden_states保存成tuple
if output_hidden_states:
all_hidden_states += (hidden_states,)
# 将hs送入每一层decoder_layer
layer_outputs = decoder_layer(
hidden_states,
attention_mask=attention_mask,
position_ids=position_ids,
past_key_value=past_key_value,
output_attentions=output_attentions,
use_cache=use_cache,
)
# 取出上一层decoder_输出的hs,再传入下一个layer
# 只要第一个,第二个是cache的一个类,然后进入下一个layer
hidden_states = layer_outputs[0]
# 将最后layers输出后的hidden_states进行标准化
hidden_states = self.norm(hidden_states)
# 加上最后一层的hidden_states
if output_hidden_states:
all_hidden_states += (hidden_states,)
- 如果保存
output_hidden_states
的话,就是第一个为input_ids
进行emb
,然后保存到n-1
层的decoder_layer
的输出hs
,再加上最后一层layer
的输出hs
进行过norm
后的hs
. - 最后是以
BaseModelOutputWithPast
的形式输出。
1.3 RMSNorm
计算公式:
其中:
- x是层的输入的
hidden_state
- 表示的是
hidden_state
的最后一个维度的值 - n 表示上面输入的最后一个维度的数量。
- ϵ 表示是很小的数,防止除0。
class Qwen2RMSNorm(nn.Module): # 标准化层
def __init__(self, hidden_size, eps=1e-6):
"""
Qwen2RMSNorm is equivalent to T5LayerNorm
"""
super().__init__()
self.weight = nn.Parameter(torch.ones(hidden_size))
self.variance_epsilon = eps
def forward(self, hidden_states):
input_dtype = hidden_states.dtype
hidden_states = hidden_states.to(torch.float32)
variance = hidden_states.pow(2).mean(-1, keepdim=True)
hidden_states = hidden_states * torch.rsqrt(variance + self.variance_epsilon)
return self.weight * hidden_states.to(input_dtype)
torch.rsqrt
表示输入的东西开根的导数。.pow(2).mean(-1, keepdim=True)
表示对最后一个维度平方并取均值。
2. Qwen2Attention
标签:None,universe,self,cache,Tiny,states,Qwen,hidden,config From: https://blog.csdn.net/m0_74922316/article/details/142302307