diffusers 源码解析(三)
.\diffusers\loaders\textual_inversion.py
# 版权声明,表示该文件的所有权及使用条款
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 许可证信息,指明该文件遵循的开源许可证
# Licensed under the Apache License, Version 2.0 (the "License");
# 使用本文件需遵循许可证的规定
# you may not use this file except in compliance with the License.
# 获取许可证的链接
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 免责声明,表示在法律允许的范围内不承担任何责任
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 从 typing 模块导入所需类型,便于类型注解
from typing import Dict, List, Optional, Union
# 导入 safetensors 库,可能用于安全的张量处理
import safetensors
# 导入 PyTorch 库,便于深度学习模型的构建与训练
import torch
# 导入验证 Hugging Face Hub 参数的函数
from huggingface_hub.utils import validate_hf_hub_args
# 从 PyTorch 导入神经网络模块
from torch import nn
# 根据可用性导入 transformers 模块的预训练模型与分词器
from ..models.modeling_utils import load_state_dict
# 导入工具函数,处理模型文件、检查依赖等
from ..utils import _get_model_file, is_accelerate_available, is_transformers_available, logging
# 检查 transformers 库是否可用,如果可用则导入相关类
if is_transformers_available():
from transformers import PreTrainedModel, PreTrainedTokenizer
# 检查 accelerate 库是否可用,如果可用则导入相关钩子
if is_accelerate_available():
from accelerate.hooks import AlignDevicesHook, CpuOffload, remove_hook_from_module
# 获取当前模块的日志记录器
logger = logging.get_logger(__name__)
# 定义文本反转所需的文件名
TEXT_INVERSION_NAME = "learned_embeds.bin"
# 定义安全版本的文本反转文件名
TEXT_INVERSION_NAME_SAFE = "learned_embeds.safetensors"
# 装饰器,用于验证 Hugging Face Hub 的参数
@validate_hf_hub_args
# 定义加载文本反转状态字典的函数
def load_textual_inversion_state_dicts(pretrained_model_name_or_paths, **kwargs):
# 从关键字参数中获取缓存目录,默认为 None
cache_dir = kwargs.pop("cache_dir", None)
# 从关键字参数中获取是否强制下载的标志,默认为 False
force_download = kwargs.pop("force_download", False)
# 从关键字参数中获取代理设置,默认为 None
proxies = kwargs.pop("proxies", None)
# 从关键字参数中获取本地文件是否仅使用的标志,默认为 None
local_files_only = kwargs.pop("local_files_only", None)
# 从关键字参数中获取访问令牌,默认为 None
token = kwargs.pop("token", None)
# 从关键字参数中获取版本号,默认为 None
revision = kwargs.pop("revision", None)
# 从关键字参数中获取子文件夹名称,默认为 None
subfolder = kwargs.pop("subfolder", None)
# 从关键字参数中获取权重文件名,默认为 None
weight_name = kwargs.pop("weight_name", None)
# 从关键字参数中获取是否使用 safetensors 的标志,默认为 None
use_safetensors = kwargs.pop("use_safetensors", None)
# 设置允许使用 pickle 的标志为 False
allow_pickle = False
# 如果未指定使用 safetensors,则默认启用,并允许使用 pickle
if use_safetensors is None:
use_safetensors = True
allow_pickle = True
# 设置用户代理信息,用于标识请求的类型和框架
user_agent = {
"file_type": "text_inversion",
"framework": "pytorch",
}
# 初始化状态字典列表
state_dicts = []
# 遍历预训练模型名称或路径列表
for pretrained_model_name_or_path in pretrained_model_name_or_paths:
# 检查当前项是否不是字典或张量
if not isinstance(pretrained_model_name_or_path, (dict, torch.Tensor)):
# 初始化模型文件为 None
model_file = None
# 尝试加载 .safetensors 权重
if (use_safetensors and weight_name is None) or (
weight_name is not None and weight_name.endswith(".safetensors")
):
try:
# 获取模型文件,提供相关参数
model_file = _get_model_file(
pretrained_model_name_or_path,
weights_name=weight_name or TEXT_INVERSION_NAME_SAFE,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
local_files_only=local_files_only,
token=token,
revision=revision,
subfolder=subfolder,
user_agent=user_agent,
)
# 从文件中加载状态字典到 CPU 上
state_dict = safetensors.torch.load_file(model_file, device="cpu")
except Exception as e:
# 如果不允许 pickle,抛出异常
if not allow_pickle:
raise e
# 如果加载失败,设置模型文件为 None
model_file = None
# 如果模型文件仍然是 None,则尝试加载其他格式
if model_file is None:
model_file = _get_model_file(
pretrained_model_name_or_path,
weights_name=weight_name or TEXT_INVERSION_NAME,
cache_dir=cache_dir,
force_download=force_download,
proxies=proxies,
local_files_only=local_files_only,
token=token,
revision=revision,
subfolder=subfolder,
user_agent=user_agent,
)
# 从文件中加载状态字典
state_dict = load_state_dict(model_file)
else:
# 如果当前项是字典或张量,直接使用它作为状态字典
state_dict = pretrained_model_name_or_path
# 将状态字典添加到列表中
state_dicts.append(state_dict)
# 返回状态字典列表
return state_dicts
# 定义一个混合类,用于加载文本反转的标记和嵌入到分词器和文本编码器中
class TextualInversionLoaderMixin:
r"""
加载文本反转标记和嵌入到分词器和文本编码器中。
"""
# 定义一个方法,根据输入的提示和分词器可能进行转换
def maybe_convert_prompt(self, prompt: Union[str, List[str]], tokenizer: "PreTrainedTokenizer"): # noqa: F821
r"""
处理包含特殊标记的提示,这些标记对应于多向量文本反转嵌入,将其替换为多个
特殊标记,每个对应一个向量。如果提示没有文本反转标记或文本反转标记是单个向量,
则返回输入提示。
参数:
prompt (`str` 或 list of `str`):
引导图像生成的提示。
tokenizer (`PreTrainedTokenizer`):
负责将提示编码为输入标记的分词器。
返回:
`str` 或 list of `str`: 转换后的提示
"""
# 检查输入提示是否为列表,如果不是则将其转换为列表
if not isinstance(prompt, List):
prompts = [prompt]
else:
prompts = prompt
# 对每个提示应用可能的转换
prompts = [self._maybe_convert_prompt(p, tokenizer) for p in prompts]
# 如果输入提示不是列表,则返回第一个转换后的提示
if not isinstance(prompt, List):
return prompts[0]
# 返回转换后的提示列表
return prompts
# 定义一个私有方法,可能将提示转换为“多向量”兼容提示
def _maybe_convert_prompt(self, prompt: str, tokenizer: "PreTrainedTokenizer"): # noqa: F821
r"""
可能将提示转换为“多向量”兼容提示。如果提示包含一个与多向量文本反转嵌入
对应的标记,该函数将处理提示,使特殊标记被多个特殊标记替换,每个对应一个向量。
如果提示没有文本反转标记或文本反转标记是单个向量,则简单返回输入提示。
参数:
prompt (`str`):
引导图像生成的提示。
tokenizer (`PreTrainedTokenizer`):
负责将提示编码为输入标记的分词器。
返回:
`str`: 转换后的提示
"""
# 使用分词器对提示进行分词
tokens = tokenizer.tokenize(prompt)
# 创建一个唯一标记的集合
unique_tokens = set(tokens)
# 遍历唯一标记
for token in unique_tokens:
# 检查标记是否在添加的标记编码器中
if token in tokenizer.added_tokens_encoder:
replacement = token # 初始化替换变量为当前标记
i = 1 # 初始化计数器
# 生成替换标记,直到没有更多的标记存在
while f"{token}_{i}" in tokenizer.added_tokens_encoder:
replacement += f" {token}_{i}" # 添加计数到替换变量
i += 1 # 增加计数器
# 在提示中替换原始标记为生成的替换标记
prompt = prompt.replace(token, replacement)
# 返回最终的提示
return prompt
# 定义检查文本反转输入参数的私有方法
def _check_text_inv_inputs(self, tokenizer, text_encoder, pretrained_model_name_or_paths, tokens):
# 检查传入的 tokenizer 是否为 None,如果是,则抛出 ValueError
if tokenizer is None:
raise ValueError(
# 报错信息,说明需要提供 tokenizer 参数
f"{self.__class__.__name__} requires `self.tokenizer` or passing a `tokenizer` of type `PreTrainedTokenizer` for calling"
f" `{self.load_textual_inversion.__name__}`"
)
# 检查传入的 text_encoder 是否为 None,如果是,则抛出 ValueError
if text_encoder is None:
raise ValueError(
# 报错信息,说明需要提供 text_encoder 参数
f"{self.__class__.__name__} requires `self.text_encoder` or passing a `text_encoder` of type `PreTrainedModel` for calling"
f" `{self.load_textual_inversion.__name__}`"
)
# 检查预训练模型名称列表的长度与 tokens 列表的长度是否一致
if len(pretrained_model_name_or_paths) > 1 and len(pretrained_model_name_or_paths) != len(tokens):
raise ValueError(
# 报错信息,说明模型列表与 tokens 列表的长度不匹配
f"You have passed a list of models of length {len(pretrained_model_name_or_paths)}, and list of tokens of length {len(tokens)} "
f"Make sure both lists have the same length."
)
# 过滤出有效的 tokens,即不为 None 的 tokens
valid_tokens = [t for t in tokens if t is not None]
# 检查有效 tokens 的集合长度是否小于有效 tokens 的列表长度,如果是,则说明有重复
if len(set(valid_tokens)) < len(valid_tokens):
raise ValueError(f"You have passed a list of tokens that contains duplicates: {tokens}")
# 定义一个静态方法
@staticmethod
# 定义一个私有方法,用于检索 tokens 和 embeddings
def _retrieve_tokens_and_embeddings(tokens, state_dicts, tokenizer):
# 初始化空列表以存储所有 tokens 和 embeddings
all_tokens = []
all_embeddings = []
# 同时遍历状态字典和 tokens
for state_dict, token in zip(state_dicts, tokens):
# 检查状态字典是否为 PyTorch 张量
if isinstance(state_dict, torch.Tensor):
# 如果 token 为 None,抛出错误
if token is None:
raise ValueError(
"You are trying to load a textual inversion embedding that has been saved as a PyTorch tensor. Make sure to pass the name of the corresponding token in this case: `token=...`."
)
# 加载 token 和 embedding
loaded_token = token
embedding = state_dict
# 检查状态字典是否只包含一个键
elif len(state_dict) == 1:
# 处理 diffusers 格式
loaded_token, embedding = next(iter(state_dict.items()))
# 检查状态字典是否包含 "string_to_param" 键
elif "string_to_param" in state_dict:
# 处理 A1111 格式
loaded_token = state_dict["name"]
embedding = state_dict["string_to_param"]["*"]
else:
# 抛出状态字典格式错误的错误
raise ValueError(
f"Loaded state dictionary is incorrect: {state_dict}. \n\n"
"Please verify that the loaded state dictionary of the textual embedding either only has a single key or includes the `string_to_param`"
" input key."
)
# 如果 token 不为 None 且加载的 token 与当前 token 不同,记录日志
if token is not None and loaded_token != token:
logger.info(f"The loaded token: {loaded_token} is overwritten by the passed token {token}.")
else:
# 将加载的 token 赋值给当前 token
token = loaded_token
# 检查 token 是否已经在 tokenizer 的词汇表中
if token in tokenizer.get_vocab():
# 如果已存在,抛出错误
raise ValueError(
f"Token {token} already in tokenizer vocabulary. Please choose a different token name or remove {token} and embedding from the tokenizer and text encoder."
)
# 将 token 和 embedding 添加到对应列表中
all_tokens.append(token)
all_embeddings.append(embedding)
# 返回所有的 tokens 和 embeddings
return all_tokens, all_embeddings
# 声明该方法为静态方法
@staticmethod
# 扩展给定的令牌和嵌入,将多向量令牌和其嵌入整合到一起
def _extend_tokens_and_embeddings(tokens, embeddings, tokenizer):
# 初始化一个空列表以存储所有令牌
all_tokens = []
# 初始化一个空列表以存储所有嵌入
all_embeddings = []
# 遍历嵌入和令牌的配对
for embedding, token in zip(embeddings, tokens):
# 检查令牌是否已经在词汇表中
if f"{token}_1" in tokenizer.get_vocab():
# 如果令牌已经存在,初始化多向量令牌列表
multi_vector_tokens = [token]
# 初始化索引
i = 1
# 检查是否有后续的多向量令牌
while f"{token}_{i}" in tokenizer.added_tokens_encoder:
# 将多向量令牌添加到列表中
multi_vector_tokens.append(f"{token}_{i}")
# 递增索引
i += 1
# 抛出异常,提示多向量令牌已经存在
raise ValueError(
f"Multi-vector Token {multi_vector_tokens} already in tokenizer vocabulary. Please choose a different token name or remove the {multi_vector_tokens} and embedding from the tokenizer and text encoder."
)
# 判断当前嵌入是否为多维向量
is_multi_vector = len(embedding.shape) > 1 and embedding.shape[0] > 1
if is_multi_vector:
# 如果是多维向量,将令牌及其索引添加到列表中
all_tokens += [token] + [f"{token}_{i}" for i in range(1, embedding.shape[0])]
# 添加对应的所有嵌入到列表中
all_embeddings += [e for e in embedding] # noqa: C416
else:
# 如果不是多维向量,仅添加当前令牌
all_tokens += [token]
# 根据嵌入的维度添加嵌入
all_embeddings += [embedding[0]] if len(embedding.shape) > 1 else [embedding]
# 返回所有令牌和嵌入的列表
return all_tokens, all_embeddings
# 装饰器,用于验证 Hugging Face Hub 参数
@validate_hf_hub_args
# 加载文本反转(Textual Inversion)模型
def load_textual_inversion(
# 预训练模型的名称或路径,支持多种格式
pretrained_model_name_or_path: Union[str, List[str], Dict[str, torch.Tensor], List[Dict[str, torch.Tensor]]],
# 可选的令牌
token: Optional[Union[str, List[str]]] = None,
# 可选的预训练分词器
tokenizer: Optional["PreTrainedTokenizer"] = None, # noqa: F821
# 可选的文本编码器
text_encoder: Optional["PreTrainedModel"] = None, # noqa: F821
# 其他关键字参数
**kwargs,
):
# 省略具体实现
# 卸载文本反转(Textual Inversion)模型
def unload_textual_inversion(
# 可选的令牌
tokens: Optional[Union[str, List[str]]] = None,
# 可选的预训练分词器
tokenizer: Optional["PreTrainedTokenizer"] = None,
# 可选的文本编码器
text_encoder: Optional["PreTrainedModel"] = None,
):
# 省略具体实现
.\diffusers\loaders\unet.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件
# 在许可证下分发是按“原样”基础进行的,
# 不提供任何形式的明示或暗示的担保或条件。
# 有关许可证下特定语言管理权限和
# 限制的信息,请参阅许可证。
import os # 导入操作系统模块,提供与操作系统交互的功能
from collections import defaultdict # 导入默认字典,用于处理键不存在时的默认值
from contextlib import nullcontext # 导入空上下文管理器,提供不做任何操作的上下文
from pathlib import Path # 导入路径库,便于处理文件路径
from typing import Callable, Dict, Union # 导入类型提示,提供函数、字典和联合类型的支持
import safetensors # 导入 safetensors 库,处理安全张量
import torch # 导入 PyTorch 库,深度学习框架
import torch.nn.functional as F # 导入 PyTorch 的神经网络功能模块
from huggingface_hub.utils import validate_hf_hub_args # 导入用于验证 Hugging Face Hub 参数的工具
from torch import nn # 从 PyTorch 导入神经网络模块
from ..models.embeddings import ( # 从父级目录导入嵌入模型
ImageProjection, # 导入图像投影类
IPAdapterFaceIDImageProjection, # 导入人脸识别图像投影类
IPAdapterFaceIDPlusImageProjection, # 导入增强的人脸识别图像投影类
IPAdapterFullImageProjection, # 导入完整图像投影类
IPAdapterPlusImageProjection, # 导入增强图像投影类
MultiIPAdapterImageProjection, # 导入多种图像投影类
)
from ..models.modeling_utils import load_model_dict_into_meta, load_state_dict # 导入模型加载工具
from ..utils import ( # 从父级目录导入工具模块
USE_PEFT_BACKEND, # 导入使用 PEFT 后端的标志
_get_model_file, # 导入获取模型文件的函数
convert_unet_state_dict_to_peft, # 导入转换 UNet 状态字典到 PEFT 的函数
get_adapter_name, # 导入获取适配器名称的函数
get_peft_kwargs, # 导入获取 PEFT 参数的函数
is_accelerate_available, # 导入检查加速可用性的函数
is_peft_version, # 导入检查 PEFT 版本的函数
is_torch_version, # 导入检查 PyTorch 版本的函数
logging, # 导入日志模块
)
from .lora_pipeline import LORA_WEIGHT_NAME, LORA_WEIGHT_NAME_SAFE, TEXT_ENCODER_NAME, UNET_NAME # 从当前目录导入 LoRA 权重名称和模型名称
from .utils import AttnProcsLayers # 从当前目录导入注意力处理层
if is_accelerate_available(): # 检查是否可以使用加速功能
from accelerate.hooks import AlignDevicesHook, CpuOffload, remove_hook_from_module # 导入加速库的钩子函数
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器
CUSTOM_DIFFUSION_WEIGHT_NAME = "pytorch_custom_diffusion_weights.bin" # 定义自定义扩散权重的文件名
CUSTOM_DIFFUSION_WEIGHT_NAME_SAFE = "pytorch_custom_diffusion_weights.safetensors" # 定义安全自定义扩散权重的文件名
class UNet2DConditionLoadersMixin: # 定义一个混合类,用于加载 LoRA 层
"""
将 LoRA 层加载到 [`UNet2DCondtionModel`] 中。
""" # 类文档字符串,说明该类的作用
text_encoder_name = TEXT_ENCODER_NAME # 定义文本编码器名称
unet_name = UNET_NAME # 定义 UNet 名称
@validate_hf_hub_args # 使用装饰器验证 Hugging Face Hub 参数
# 定义处理自定义扩散的方法,接收状态字典作为参数
def _process_custom_diffusion(self, state_dict):
# 从模块中导入自定义扩散注意力处理器
from ..models.attention_processor import CustomDiffusionAttnProcessor
# 初始化空字典,用于存储注意力处理器
attn_processors = {}
# 使用 defaultdict 初始化一个字典,用于分组自定义扩散数据
custom_diffusion_grouped_dict = defaultdict(dict)
# 遍历状态字典中的每一项
for key, value in state_dict.items():
# 如果当前值为空,设置分组字典的对应键为空字典
if len(value) == 0:
custom_diffusion_grouped_dict[key] = {}
else:
# 如果键中包含"to_out",则提取相应的处理器键和子键
if "to_out" in key:
attn_processor_key, sub_key = ".".join(key.split(".")[:-3]), ".".join(key.split(".")[-3:])
else:
# 否则,按另一种方式提取处理器键和子键
attn_processor_key, sub_key = ".".join(key.split(".")[:-2]), ".".join(key.split(".")[-2:])
# 将值存储到分组字典中
custom_diffusion_grouped_dict[attn_processor_key][sub_key] = value
# 遍历分组字典中的每一项
for key, value_dict in custom_diffusion_grouped_dict.items():
# 如果值字典为空,初始化自定义扩散注意力处理器
if len(value_dict) == 0:
attn_processors[key] = CustomDiffusionAttnProcessor(
train_kv=False, train_q_out=False, hidden_size=None, cross_attention_dim=None
)
else:
# 获取交叉注意力维度
cross_attention_dim = value_dict["to_k_custom_diffusion.weight"].shape[1]
# 获取隐藏层大小
hidden_size = value_dict["to_k_custom_diffusion.weight"].shape[0]
# 判断是否训练 q 输出
train_q_out = True if "to_q_custom_diffusion.weight" in value_dict else False
# 初始化自定义扩散注意力处理器并传入参数
attn_processors[key] = CustomDiffusionAttnProcessor(
train_kv=True,
train_q_out=train_q_out,
hidden_size=hidden_size,
cross_attention_dim=cross_attention_dim,
)
# 加载状态字典到注意力处理器
attn_processors[key].load_state_dict(value_dict)
# 返回注意力处理器的字典
return attn_processors
# 类方法的装饰器
@classmethod
# 从 diffusers.loaders.lora_base.LoraBaseMixin 中复制的方法,用于选择性禁用卸载功能
# 定义一个类方法,用于选择性地禁用模型的 CPU 离线
def _optionally_disable_offloading(cls, _pipeline):
"""
可选地移除离线处理,如果管道已经被顺序离线到 CPU。
Args:
_pipeline (`DiffusionPipeline`):
需要禁用离线处理的管道。
Returns:
tuple:
一个元组,指示 `is_model_cpu_offload` 或 `is_sequential_cpu_offload` 是否为 True。
"""
# 初始化模型 CPU 离线标志为 False
is_model_cpu_offload = False
# 初始化顺序 CPU 离线标志为 False
is_sequential_cpu_offload = False
# 如果管道不为 None 且 hf_device_map 为 None
if _pipeline is not None and _pipeline.hf_device_map is None:
# 遍历管道中的每个组件
for _, component in _pipeline.components.items():
# 检查组件是否为 nn.Module 类型并且具有 _hf_hook 属性
if isinstance(component, nn.Module) and hasattr(component, "_hf_hook"):
# 如果模型尚未 CPU 离线
if not is_model_cpu_offload:
# 检查组件的 _hf_hook 是否为 CpuOffload 类型
is_model_cpu_offload = isinstance(component._hf_hook, CpuOffload)
# 如果顺序离线尚未设置
if not is_sequential_cpu_offload:
# 检查 _hf_hook 是否为 AlignDevicesHook 类型,或者其 hooks 属性的第一个元素是否为 AlignDevicesHook
is_sequential_cpu_offload = (
isinstance(component._hf_hook, AlignDevicesHook)
or hasattr(component._hf_hook, "hooks")
and isinstance(component._hf_hook.hooks[0], AlignDevicesHook)
)
# 记录信息,指示检测到加速钩子并即将移除之前的钩子
logger.info(
"Accelerate hooks detected. Since you have called `load_lora_weights()`, the previous hooks will be first removed. Then the LoRA parameters will be loaded and the hooks will be applied again."
)
# 从组件中移除钩子,是否递归取决于顺序离线标志
remove_hook_from_module(component, recurse=is_sequential_cpu_offload)
# 返回 CPU 离线标志的元组
return (is_model_cpu_offload, is_sequential_cpu_offload)
# 定义保存注意力处理器的方法
def save_attn_procs(
# 保存目录,支持字符串或路径对象
save_directory: Union[str, os.PathLike],
# 主进程标志,默认值为 True
is_main_process: bool = True,
# 权重名称,默认值为 None
weight_name: str = None,
# 保存功能,默认值为 None
save_function: Callable = None,
# 安全序列化标志,默认值为 True
safe_serialization: bool = True,
# 其他关键字参数
**kwargs,
):
# 定义获取自定义扩散状态字典的方法
def _get_custom_diffusion_state_dict(self):
# 从模型中导入自定义注意力处理器
from ..models.attention_processor import (
CustomDiffusionAttnProcessor,
CustomDiffusionAttnProcessor2_0,
CustomDiffusionXFormersAttnProcessor,
)
# 创建要保存的注意力处理器层
model_to_save = AttnProcsLayers(
{
# 过滤出类型为自定义注意力处理器的项
y: x
for (y, x) in self.attn_processors.items()
if isinstance(
x,
(
CustomDiffusionAttnProcessor,
CustomDiffusionAttnProcessor2_0,
CustomDiffusionXFormersAttnProcessor,
),
)
}
)
# 获取模型的状态字典
state_dict = model_to_save.state_dict()
# 遍历注意力处理器
for name, attn in self.attn_processors.items():
# 如果当前注意力处理器的状态字典为空
if len(attn.state_dict()) == 0:
# 在状态字典中为该名称添加空字典
state_dict[name] = {}
# 返回状态字典
return state_dict
# 加载 IP 适配器权重的私有方法
def _load_ip_adapter_weights(self, state_dicts, low_cpu_mem_usage=False):
# 检查 state_dicts 是否为列表,如果不是则转换为列表
if not isinstance(state_dicts, list):
state_dicts = [state_dicts]
# 如果已有编码器隐藏投影且配置为文本投影,则赋值给文本编码器隐藏投影
if (
self.encoder_hid_proj is not None
and self.config.encoder_hid_dim_type == "text_proj"
and not hasattr(self, "text_encoder_hid_proj")
):
self.text_encoder_hid_proj = self.encoder_hid_proj
# 在加载 IP 适配器权重后将 encoder_hid_proj 设置为 None
self.encoder_hid_proj = None
# 将 IP 适配器的注意力处理器转换为 Diffusers 格式
attn_procs = self._convert_ip_adapter_attn_to_diffusers(state_dicts, low_cpu_mem_usage=low_cpu_mem_usage)
# 设置注意力处理器
self.set_attn_processor(attn_procs)
# 转换 IP 适配器图像投影层为 Diffusers 格式
image_projection_layers = []
# 遍历每个 state_dict,转换图像投影层
for state_dict in state_dicts:
image_projection_layer = self._convert_ip_adapter_image_proj_to_diffusers(
state_dict["image_proj"], low_cpu_mem_usage=low_cpu_mem_usage
)
# 将转换后的图像投影层添加到列表中
image_projection_layers.append(image_projection_layer)
# 创建多重 IP 适配器图像投影并赋值给 encoder_hid_proj
self.encoder_hid_proj = MultiIPAdapterImageProjection(image_projection_layers)
# 更新编码器隐藏维度类型为图像投影
self.config.encoder_hid_dim_type = "ip_image_proj"
# 将模型转移到指定的数据类型和设备
self.to(dtype=self.dtype, device=self.device)
# 加载 IP 适配器的 LoRA 权重,返回包含这些权重的字典
def _load_ip_adapter_loras(self, state_dicts):
# 初始化空字典以存储 LoRA 权重
lora_dicts = {}
# 遍历注意力处理器的键值对,获取索引和名称
for key_id, name in enumerate(self.attn_processors.keys()):
# 遍历每个状态字典
for i, state_dict in enumerate(state_dicts):
# 检查当前状态字典中是否包含特定的 LoRA 权重
if f"{key_id}.to_k_lora.down.weight" in state_dict["ip_adapter"]:
# 如果该索引不在字典中,则初始化为空字典
if i not in lora_dicts:
lora_dicts[i] = {}
# 更新字典,添加 'to_k_lora.down.weight' 的权重
lora_dicts[i].update(
{
f"unet.{name}.to_k_lora.down.weight": state_dict["ip_adapter"][
f"{key_id}.to_k_lora.down.weight"
]
}
)
# 更新字典,添加 'to_q_lora.down.weight' 的权重
lora_dicts[i].update(
{
f"unet.{name}.to_q_lora.down.weight": state_dict["ip_adapter"][
f"{key_id}.to_q_lora.down.weight"
]
}
)
# 更新字典,添加 'to_v_lora.down.weight' 的权重
lora_dicts[i].update(
{
f"unet.{name}.to_v_lora.down.weight": state_dict["ip_adapter"][
f"{key_id}.to_v_lora.down.weight"
]
}
)
# 更新字典,添加 'to_out_lora.down.weight' 的权重
lora_dicts[i].update(
{
f"unet.{name}.to_out_lora.down.weight": state_dict["ip_adapter"][
f"{key_id}.to_out_lora.down.weight"
]
}
)
# 更新字典,添加 'to_k_lora.up.weight' 的权重
lora_dicts[i].update(
{f"unet.{name}.to_k_lora.up.weight": state_dict["ip_adapter"][f"{key_id}.to_k_lora.up.weight"]}
)
# 更新字典,添加 'to_q_lora.up.weight' 的权重
lora_dicts[i].update(
{f"unet.{name}.to_q_lora.up.weight": state_dict["ip_adapter"][f"{key_id}.to_q_lora.up.weight"]}
)
# 更新字典,添加 'to_v_lora.up.weight' 的权重
lora_dicts[i].update(
{f"unet.{name}.to_v_lora.up.weight": state_dict["ip_adapter"][f"{key_id}.to_v_lora.up.weight"]}
)
# 更新字典,添加 'to_out_lora.up.weight' 的权重
lora_dicts[i].update(
{
f"unet.{name}.to_out_lora.up.weight": state_dict["ip_adapter"][
f"{key_id}.to_out_lora.up.weight"
]
}
)
# 返回包含所有 LoRA 权重的字典
return lora_dicts
.\diffusers\loaders\unet_loader_utils.py
# 版权声明,标识本代码的版权所有者及其保留的权利
#
# 根据 Apache License, Version 2.0 进行许可;除非符合许可条款,否则不得使用此文件
# 可以在以下网址获取许可副本
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用的法律或书面协议另有规定,否则根据许可分发的软件以 "按现状" 基础提供,
# 不附加任何明示或暗示的担保或条件
# 请参阅许可协议以了解有关许可及其限制的详细信息
import copy # 导入 copy 模块,用于对象的浅拷贝或深拷贝
from typing import TYPE_CHECKING, Dict, List, Union # 导入类型注释支持
from ..utils import logging # 从上级模块导入 logging 功能
if TYPE_CHECKING:
# 在这里导入以避免循环导入问题
from ..models import UNet2DConditionModel # 从上级模块导入 UNet2DConditionModel
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,禁用 pylint 的命名检查
def _translate_into_actual_layer_name(name):
"""将用户友好的名称(例如 'mid')转换为实际层名称(例如 'mid_block.attentions.0')"""
if name == "mid":
return "mid_block.attentions.0" # 如果名称是 'mid',返回其对应的实际层名
updown, block, attn = name.split(".") # 将名称按 '.' 分割成上下文、块和注意力部分
updown = updown.replace("down", "down_blocks").replace("up", "up_blocks") # 替换上下文中的 'down' 和 'up'
block = block.replace("block_", "") # 去掉块名称中的 'block_' 前缀
attn = "attentions." + attn # 将注意力部分格式化为完整名称
return ".".join((updown, block, attn)) # 将所有部分合并为实际层名并返回
def _maybe_expand_lora_scales(
unet: "UNet2DConditionModel", weight_scales: List[Union[float, Dict]], default_scale=1.0
):
# 可能扩展 LoRA 权重比例,接受 UNet 模型和权重比例列表作为参数
blocks_with_transformer = {
"down": [i for i, block in enumerate(unet.down_blocks) if hasattr(block, "attentions")],
# 找到下层块中具有注意力层的块索引
"up": [i for i, block in enumerate(unet.up_blocks) if hasattr(block, "attentions")]
# 找到上层块中具有注意力层的块索引
}
transformer_per_block = {"down": unet.config.layers_per_block, "up": unet.config.layers_per_block + 1}
# 创建字典,包含每个块的变换层数量
expanded_weight_scales = [
_maybe_expand_lora_scales_for_one_adapter(
weight_for_adapter,
blocks_with_transformer,
transformer_per_block,
unet.state_dict(),
default_scale=default_scale,
)
# 对每个适配器的权重调用扩展函数,生成扩展后的权重比例列表
for weight_for_adapter in weight_scales
]
return expanded_weight_scales # 返回扩展后的权重比例
def _maybe_expand_lora_scales_for_one_adapter(
scales: Union[float, Dict],
blocks_with_transformer: Dict[str, int],
transformer_per_block: Dict[str, int],
state_dict: None,
default_scale: float = 1.0,
):
"""
将输入扩展为更细粒度的字典。以下示例提供了更多细节。
参数:
scales (`Union[float, Dict]`):
要扩展的比例字典。
blocks_with_transformer (`Dict[str, int]`):
包含 'up' 和 'down' 键的字典,显示哪些块具有变换层
transformer_per_block (`Dict[str, int]`):
包含 'up' 和 'down' 键的字典,显示每个块的变换层数量
例如,转换
```python
scales = {"down": 2, "mid": 3, "up": {"block_0": 4, "block_1": [5, 6, 7]}}
```py
# 定义一个字典,表示每个方向的块及其对应的编号
blocks_with_transformer = {"down": [1, 2], "up": [0, 1]}
# 定义一个字典,表示每个方向的块需要的变换器数量
transformer_per_block = {"down": 2, "up": 3}
# 如果 blocks_with_transformer 的键不是 "down" 和 "up",则抛出错误
if sorted(blocks_with_transformer.keys()) != ["down", "up"]:
raise ValueError("blocks_with_transformer needs to be a dict with keys `'down' and `'up'`")
# 如果 transformer_per_block 的键不是 "down" 和 "up",则抛出错误
if sorted(transformer_per_block.keys()) != ["down", "up"]:
raise ValueError("transformer_per_block needs to be a dict with keys `'down' and `'up'`")
# 如果 scales 不是字典类型,则直接返回其值
if not isinstance(scales, dict):
# don't expand if scales is a single number
return scales
# 复制 scales 的深拷贝,以避免修改原始数据
scales = copy.deepcopy(scales)
# 如果 scales 中没有 "mid",则赋予默认比例
if "mid" not in scales:
scales["mid"] = default_scale
# 如果 "mid" 是列表类型且仅有一个元素,则将其转换为该元素
elif isinstance(scales["mid"], list):
if len(scales["mid"]) == 1:
scales["mid"] = scales["mid"][0]
# 如果 "mid" 列表元素个数不为 1,则抛出错误
else:
raise ValueError(f"Expected 1 scales for mid, got {len(scales['mid'])}.")
# 遍历方向 "up" 和 "down"
for updown in ["up", "down"]:
# 如果当前方向不在 scales 中,则赋予默认比例
if updown not in scales:
scales[updown] = default_scale
# 如果当前方向的比例不是字典,则将其转换为字典格式
if not isinstance(scales[updown], dict):
scales[updown] = {f"block_{i}": copy.deepcopy(scales[updown]) for i in blocks_with_transformer[updown]}
# 遍历当前方向的每个块
for i in blocks_with_transformer[updown]:
block = f"block_{i}"
# 如果当前块未赋值,则设置为默认比例
if block not in scales[updown]:
scales[updown][block] = default_scale
# 如果块的比例不是列表,则转换为列表格式
if not isinstance(scales[updown][block], list):
scales[updown][block] = [scales[updown][block] for _ in range(transformer_per_block[updown])]
# 如果块的比例列表仅有一个元素,则扩展为多个元素
elif len(scales[updown][block]) == 1:
scales[updown][block] = scales[updown][block] * transformer_per_block[updown]
# 如果块的比例列表长度不匹配,则抛出错误
elif len(scales[updown][block]) != transformer_per_block[updown]:
raise ValueError(
f"Expected {transformer_per_block[updown]} scales for {updown}.{block}, got {len(scales[updown][block])}."
)
# 将 scales 中的当前方向块转换为扁平格式
for i in blocks_with_transformer[updown]:
block = f"block_{i}"
for tf_idx, value in enumerate(scales[updown][block]):
scales[f"{updown}.{block}.{tf_idx}"] = value
# 删除 scales 中当前方向的条目
del scales[updown]
# 遍历 scales 字典中的每一层
for layer in scales.keys():
# 检查该层是否在 state_dict 中存在
if not any(_translate_into_actual_layer_name(layer) in module for module in state_dict.keys()):
# 如果不存在,抛出值错误,提示该层无法设置 lora 缩放
raise ValueError(
f"Can't set lora scale for layer {layer}. It either doesn't exist in this unet or it has no attentions."
)
# 返回一个字典,键为实际层名,值为对应的权重
return {_translate_into_actual_layer_name(name): weight for name, weight in scales.items()}
.\diffusers\loaders\utils.py
# 版权声明,表明此文件的所有权归 HuggingFace 团队所有
#
# 根据 Apache 许可证第2.0版(“许可证”)许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则根据许可证分发的软件是基于“按现状”提供的,
# 不附带任何明示或暗示的保证或条件。
# 请参阅许可证以了解与权限和
# 限制相关的特定语言。
# 导入类型字典
from typing import Dict
# 导入 PyTorch 库
import torch
# 定义一个名为 AttnProcsLayers 的类,继承自 torch.nn.Module
class AttnProcsLayers(torch.nn.Module):
# 初始化方法,接受一个字典作为参数
def __init__(self, state_dict: Dict[str, torch.Tensor]):
# 调用父类的初始化方法
super().__init__()
# 将字典的值存储到一个 ModuleList 中
self.layers = torch.nn.ModuleList(state_dict.values())
# 创建一个映射字典,将索引与键关联
self.mapping = dict(enumerate(state_dict.keys()))
# 创建反向映射字典,将键与索引关联
self.rev_mapping = {v: k for k, v in enumerate(state_dict.keys())}
# 定义用于分割的关键字列表,分别用于处理器和自注意力
self.split_keys = [".processor", ".self_attn"]
# 定义一个将状态字典映射到模块的函数
def map_to(module, state_dict, *args, **kwargs):
new_state_dict = {} # 创建一个新的状态字典
# 遍历状态字典中的每个键值对
for key, value in state_dict.items():
num = int(key.split(".")[1]) # 提取数字部分,0 总是“layers”
# 根据映射生成新的键
new_key = key.replace(f"layers.{num}", module.mapping[num])
new_state_dict[new_key] = value # 存储到新字典中
return new_state_dict # 返回新状态字典
# 定义一个用于重新映射键的函数
def remap_key(key, state_dict):
# 遍历分割关键字
for k in self.split_keys:
if k in key: # 如果键包含分割关键字
return key.split(k)[0] + k # 返回处理后的键
# 如果没有找到匹配的分割关键字,抛出错误
raise ValueError(
f"There seems to be a problem with the state_dict: {set(state_dict.keys())}. {key} has to have one of {self.split_keys}."
)
# 定义一个将状态字典映射回模块的函数
def map_from(module, state_dict, *args, **kwargs):
all_keys = list(state_dict.keys()) # 获取所有键
# 遍历所有键
for key in all_keys:
replace_key = remap_key(key, state_dict) # 重新映射键
# 生成新的键
new_key = key.replace(replace_key, f"layers.{module.rev_mapping[replace_key]}")
state_dict[new_key] = state_dict[key] # 更新状态字典
del state_dict[key] # 删除旧的键
# 注册状态字典钩子以实现映射
self._register_state_dict_hook(map_to)
# 注册加载状态字典前的钩子以实现映射
self._register_load_state_dict_pre_hook(map_from, with_module=True)
.\diffusers\loaders\__init__.py
# 引入类型检查支持
from typing import TYPE_CHECKING
# 从上级目录导入工具函数和常量
from ..utils import DIFFUSERS_SLOW_IMPORT, _LazyModule, deprecate
from ..utils.import_utils import is_peft_available, is_torch_available, is_transformers_available
# 定义获取文本编码器 LORA 状态字典的函数
def text_encoder_lora_state_dict(text_encoder):
# 发出关于函数即将弃用的警告
deprecate(
"text_encoder_load_state_dict in `models`",
"0.27.0",
"`text_encoder_lora_state_dict` is deprecated and will be removed in 0.27.0. Make sure to retrieve the weights using `get_peft_model`. See https://huggingface.co/docs/peft/v0.6.2/en/quicktour#peftmodel for more information.",
)
# 初始化状态字典
state_dict = {}
# 遍历文本编码器的注意力模块
for name, module in text_encoder_attn_modules(text_encoder):
# 获取 q_proj 线性层的状态字典并更新状态字典
for k, v in module.q_proj.lora_linear_layer.state_dict().items():
state_dict[f"{name}.q_proj.lora_linear_layer.{k}"] = v
# 获取 k_proj 线性层的状态字典并更新状态字典
for k, v in module.k_proj.lora_linear_layer.state_dict().items():
state_dict[f"{name}.k_proj.lora_linear_layer.{k}"] = v
# 获取 v_proj 线性层的状态字典并更新状态字典
for k, v in module.v_proj.lora_linear_layer.state_dict().items():
state_dict[f"{name}.v_proj.lora_linear_layer.{k}"] = v
# 获取 out_proj 线性层的状态字典并更新状态字典
for k, v in module.out_proj.lora_linear_layer.state_dict().items():
state_dict[f"{name}.out_proj.lora_linear_layer.{k}"] = v
# 返回构建的状态字典
return state_dict
# 检查是否可用 Transformers 库
if is_transformers_available():
# 定义获取文本编码器注意力模块的函数
def text_encoder_attn_modules(text_encoder):
# 发出关于函数即将弃用的警告
deprecate(
"text_encoder_attn_modules in `models`",
"0.27.0",
"`text_encoder_lora_state_dict` is deprecated and will be removed in 0.27.0. Make sure to retrieve the weights using `get_peft_model`. See https://huggingface.co/docs/peft/v0.6.2/en/quicktour#peftmodel for more information.",
)
# 从 Transformers 导入相关模型
from transformers import CLIPTextModel, CLIPTextModelWithProjection
# 初始化注意力模块列表
attn_modules = []
# 检查文本编码器的类型并获取相应的注意力模块
if isinstance(text_encoder, (CLIPTextModel, CLIPTextModelWithProjection)):
for i, layer in enumerate(text_encoder.text_model.encoder.layers):
name = f"text_model.encoder.layers.{i}.self_attn"
mod = layer.self_attn
attn_modules.append((name, mod))
else:
# 如果不认识的编码器类型,抛出错误
raise ValueError(f"do not know how to get attention modules for: {text_encoder.__class__.__name__}")
# 返回注意力模块列表
return attn_modules
# 初始化导入结构字典
_import_structure = {}
# 检查是否可用 PyTorch 库
if is_torch_available():
# 更新导入结构以包含单文件模型
_import_structure["single_file_model"] = ["FromOriginalModelMixin"]
# 更新导入结构以包含 UNet
_import_structure["unet"] = ["UNet2DConditionLoadersMixin"]
# 更新导入结构以包含工具函数
_import_structure["utils"] = ["AttnProcsLayers"]
# 检查是否可以使用 transformers 库
if is_transformers_available():
# 将 "single_file" 模块的导入结构更新为包含 FromSingleFileMixin 类
_import_structure["single_file"] = ["FromSingleFileMixin"]
# 将 "lora_pipeline" 模块的导入结构更新为包含多个 LoraLoaderMixin 类
_import_structure["lora_pipeline"] = [
"AmusedLoraLoaderMixin", # 包含 AmusedLoraLoaderMixin 类
"StableDiffusionLoraLoaderMixin", # 包含 StableDiffusionLoraLoaderMixin 类
"SD3LoraLoaderMixin", # 包含 SD3LoraLoaderMixin 类
"StableDiffusionXLLoraLoaderMixin", # 包含 StableDiffusionXLLoraLoaderMixin 类
"LoraLoaderMixin", # 包含 LoraLoaderMixin 类
"FluxLoraLoaderMixin", # 包含 FluxLoraLoaderMixin 类
]
# 将 "textual_inversion" 模块的导入结构更新为包含 TextualInversionLoaderMixin 类
_import_structure["textual_inversion"] = ["TextualInversionLoaderMixin"]
# 将 "ip_adapter" 模块的导入结构更新为包含 IPAdapterMixin 类
_import_structure["ip_adapter"] = ["IPAdapterMixin"]
# 将 "peft" 模块的结构信息映射到包含的类
_import_structure["peft"] = ["PeftAdapterMixin"]
# 检查是否在类型检查模式或慢导入标志被设置
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 检查 PyTorch 是否可用
if is_torch_available():
# 从单文件模型导入混合类
from .single_file_model import FromOriginalModelMixin
# 从 UNet 导入混合类
from .unet import UNet2DConditionLoadersMixin
# 从工具模块导入 Attention 处理层类
from .utils import AttnProcsLayers
# 检查 Transformers 是否可用
if is_transformers_available():
# 从 IP 适配器模块导入混合类
from .ip_adapter import IPAdapterMixin
# 从 Lora 管道模块导入多个加载器混合类
from .lora_pipeline import (
AmusedLoraLoaderMixin,
FluxLoraLoaderMixin,
LoraLoaderMixin,
SD3LoraLoaderMixin,
StableDiffusionLoraLoaderMixin,
StableDiffusionXLLoraLoaderMixin,
)
# 从单文件模块导入混合类
from .single_file import FromSingleFileMixin
# 从文本反演模块导入混合类
from .textual_inversion import TextualInversionLoaderMixin
# 从 PEFT 模块导入 PeftAdapterMixin 类
from .peft import PeftAdapterMixin
else:
# 如果不在类型检查或慢导入状态,导入系统模块
import sys
# 使用懒加载模块,将当前模块替换为 _LazyModule 实例
sys.modules[__name__] = _LazyModule(__name__, globals()["__file__"], _import_structure, module_spec=__spec__)
.\diffusers\models\activations.py
# coding=utf-8 # 指定文件编码为 UTF-8
# Copyright 2024 HuggingFace Inc. # 版权声明
#
# Licensed under the Apache License, Version 2.0 (the "License"); # 许可声明
# you may not use this file except in compliance with the License. # 使用文件的合规性声明
# You may obtain a copy of the License at # 许可证获取说明
#
# http://www.apache.org/licenses/LICENSE-2.0 # 许可证的 URL
#
# Unless required by applicable law or agreed to in writing, software # 免责声明
# distributed under the License is distributed on an "AS IS" BASIS, # 以“现状”基础分发
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 不提供任何明示或暗示的保证
# See the License for the specific language governing permissions and # 查看许可证了解权限
# limitations under the License. # 许可证下的限制条款
import torch # 导入 PyTorch 库
import torch.nn.functional as F # 导入 PyTorch 的功能性 API
from torch import nn # 从 PyTorch 导入神经网络模块
from ..utils import deprecate # 从 utils 导入 deprecate 方法
from ..utils.import_utils import is_torch_npu_available # 从 utils 导入检查 NPU 可用性的函数
if is_torch_npu_available(): # 如果 NPU 可用
import torch_npu # 导入 NPU 库
# 定义一个字典,映射激活函数名称到相应的 PyTorch 激活函数
ACTIVATION_FUNCTIONS = {
"swish": nn.SiLU(), # Swish 激活函数
"silu": nn.SiLU(), # SiLU 激活函数
"mish": nn.Mish(), # Mish 激活函数
"gelu": nn.GELU(), # GELU 激活函数
"relu": nn.ReLU(), # ReLU 激活函数
}
# 获取激活函数的帮助函数
def get_activation(act_fn: str) -> nn.Module: # 定义函数,接受激活函数名称
"""Helper function to get activation function from string. # 文档字符串,说明功能
Args: # 参数说明
act_fn (str): Name of activation function. # 激活函数名称
Returns: # 返回值说明
nn.Module: Activation function. # 返回对应的激活函数模块
"""
act_fn = act_fn.lower() # 将激活函数名称转换为小写
if act_fn in ACTIVATION_FUNCTIONS: # 如果激活函数在字典中
return ACTIVATION_FUNCTIONS[act_fn] # 返回对应的激活函数
else: # 否则
raise ValueError(f"Unsupported activation function: {act_fn}") # 抛出不支持的激活函数错误
class FP32SiLU(nn.Module): # 定义 FP32SiLU 类,继承自 nn.Module
r""" # 文档字符串,描述该类
SiLU activation function with input upcasted to torch.float32. # SiLU 激活函数,输入转换为 float32
"""
def __init__(self): # 初始化方法
super().__init__() # 调用父类构造函数
def forward(self, inputs: torch.Tensor) -> torch.Tensor: # 定义前向传播方法
return F.silu(inputs.float(), inplace=False).to(inputs.dtype) # 将输入转换为 float32,计算 SiLU,返回原数据类型
class GELU(nn.Module): # 定义 GELU 类,继承自 nn.Module
r""" # 文档字符串,描述该类
GELU activation function with tanh approximation support with `approximate="tanh"`. # GELU 激活函数,支持 tanh 近似
Parameters: # 参数说明
dim_in (`int`): The number of channels in the input. # 输入通道数
dim_out (`int`): The number of channels in the output. # 输出通道数
approximate (`str`, *optional*, defaults to `"none"`): If `"tanh"`, use tanh approximation. # 是否使用 tanh 近似
bias (`bool`, defaults to True): Whether to use a bias in the linear layer. # 是否在线性层中使用偏置
"""
def __init__(self, dim_in: int, dim_out: int, approximate: str = "none", bias: bool = True): # 初始化方法
super().__init__() # 调用父类构造函数
self.proj = nn.Linear(dim_in, dim_out, bias=bias) # 创建线性层
self.approximate = approximate # 设置近似参数
def gelu(self, gate: torch.Tensor) -> torch.Tensor: # 定义 GELU 方法
if gate.device.type != "mps": # 如果设备不是 MPS
return F.gelu(gate, approximate=self.approximate) # 计算并返回 GELU
# mps: gelu is not implemented for float16 # 对于 MPS,float16 不支持 GELU
return F.gelu(gate.to(dtype=torch.float32), approximate=self.approximate).to(dtype=gate.dtype) # 转换为 float32 计算 GELU,返回原数据类型
def forward(self, hidden_states): # 定义前向传播方法
hidden_states = self.proj(hidden_states) # 通过线性层处理隐藏状态
hidden_states = self.gelu(hidden_states) # 计算 GELU 激活
return hidden_states # 返回激活后的隐藏状态
class GEGLU(nn.Module): # 定义 GEGLU 类,继承自 nn.Module
r""" # 文档字符串,描述该类
A [variant](https://arxiv.org/abs/2002.05202) of the gated linear unit activation function. # GEGLU 激活函数的变种
# 参数说明
Parameters:
dim_in (`int`): 输入通道的数量。
dim_out (`int`): 输出通道的数量。
bias (`bool`, defaults to True): 是否在线性层中使用偏置。
# 初始化方法,设置输入和输出通道,及偏置选项
def __init__(self, dim_in: int, dim_out: int, bias: bool = True):
# 调用父类的初始化方法
super().__init__()
# 创建一个线性层,输入通道为 dim_in,输出通道为 dim_out * 2
self.proj = nn.Linear(dim_in, dim_out * 2, bias=bias)
# GELU 激活函数的实现
def gelu(self, gate: torch.Tensor) -> torch.Tensor:
# 检查当前设备类型是否为 MPS
if gate.device.type != "mps":
# 如果不是 MPS,直接返回 GELU 的计算结果
return F.gelu(gate)
# 对于 MPS:GELU 未对 float16 实现
# 将 gate 转换为 float32 计算 GELU,然后再转换回原始数据类型
return F.gelu(gate.to(dtype=torch.float32)).to(dtype=gate.dtype)
# 前向传播方法
def forward(self, hidden_states, *args, **kwargs):
# 如果传入额外参数或 kwargs 中包含 scale,给出弃用提示
if len(args) > 0 or kwargs.get("scale", None) is not None:
# 弃用提示信息,告知用户 scale 参数将被忽略
deprecation_message = "The `scale` argument is deprecated and will be ignored. Please remove it, as passing it will raise an error in the future. `scale` should directly be passed while calling the underlying pipeline component i.e., via `cross_attention_kwargs`."
# 调用弃用函数,显示警告
deprecate("scale", "1.0.0", deprecation_message)
# 将隐藏状态通过线性层进行变换
hidden_states = self.proj(hidden_states)
# 检查是否可用 NPU
if is_torch_npu_available():
# 使用 torch_npu.npu_geglu 可以在 NPU 上更快且节省内存
return torch_npu.npu_geglu(hidden_states, dim=-1, approximate=1)[0]
else:
# 将隐藏状态分为两部分:hidden_states 和 gate
hidden_states, gate = hidden_states.chunk(2, dim=-1)
# 返回 hidden_states 与 gate 的 GELU 结果的乘积
return hidden_states * self.gelu(gate)
# 定义一个名为 SwiGLU 的类,继承自 nn.Module
class SwiGLU(nn.Module):
r"""
A [variant](https://arxiv.org/abs/2002.05202) of the gated linear unit activation function. It's similar to `GEGLU`
but uses SiLU / Swish instead of GeLU.
Parameters:
dim_in (`int`): The number of channels in the input.
dim_out (`int`): The number of channels in the output.
bias (`bool`, defaults to True): Whether to use a bias in the linear layer.
"""
# 初始化方法,接受输入和输出的维度及偏置参数
def __init__(self, dim_in: int, dim_out: int, bias: bool = True):
# 调用父类构造函数
super().__init__()
# 定义一个线性层,将输入通道映射到输出通道的两倍
self.proj = nn.Linear(dim_in, dim_out * 2, bias=bias)
# 使用 SiLU 激活函数
self.activation = nn.SiLU()
# 前向传播方法
def forward(self, hidden_states):
# 通过线性层处理输入
hidden_states = self.proj(hidden_states)
# 将处理后的输出拆分为两部分
hidden_states, gate = hidden_states.chunk(2, dim=-1)
# 返回激活后的输出和门控的乘积
return hidden_states * self.activation(gate)
# 定义一个名为 ApproximateGELU 的类,继承自 nn.Module
class ApproximateGELU(nn.Module):
r"""
The approximate form of the Gaussian Error Linear Unit (GELU). For more details, see section 2 of this
[paper](https://arxiv.org/abs/1606.08415).
Parameters:
dim_in (`int`): The number of channels in the input.
dim_out (`int`): The number of channels in the output.
bias (`bool`, defaults to True): Whether to use a bias in the linear layer.
"""
# 初始化方法,接受输入和输出的维度及偏置参数
def __init__(self, dim_in: int, dim_out: int, bias: bool = True):
# 调用父类构造函数
super().__init__()
# 定义一个线性层,将输入通道映射到输出通道
self.proj = nn.Linear(dim_in, dim_out, bias=bias)
# 前向传播方法
def forward(self, x: torch.Tensor) -> torch.Tensor:
# 通过线性层处理输入
x = self.proj(x)
# 返回经过 sigmoid 函数调节后的输出
return x * torch.sigmoid(1.702 * x)
.\diffusers\models\adapter.py
# 版权所有 2022 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版("许可证")进行许可;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,依据许可证分发的软件
# 是以“原样”基础提供的,不附带任何形式的担保或条件,
# 无论是明示或暗示的。
# 有关许可证下特定权限和限制的具体信息,请参见许可证。
import os # 导入操作系统模块以处理文件和目录
from typing import Callable, List, Optional, Union # 导入类型注解以增强代码可读性
import torch # 导入 PyTorch 库以进行张量运算和深度学习
import torch.nn as nn # 导入 PyTorch 的神经网络模块以构建模型
from ..configuration_utils import ConfigMixin, register_to_config # 从父目录导入配置相关工具
from ..utils import logging # 从父目录导入日志记录工具
from .modeling_utils import ModelMixin # 从当前目录导入模型混合工具
logger = logging.get_logger(__name__) # 创建一个记录器实例,用于日志记录
class MultiAdapter(ModelMixin): # 定义 MultiAdapter 类,继承自 ModelMixin
r""" # 类文档字符串,描述类的功能和用途
MultiAdapter 是一个包装模型,包含多个适配器模型,并根据
用户分配的权重合并它们的输出。
该模型继承自 [`ModelMixin`]。有关库实现的所有模型的通用方法的文档
(例如下载或保存等),请查看超类文档。
参数:
adapters (`List[T2IAdapter]`, *可选*, 默认为 None):
一个 `T2IAdapter` 模型实例的列表。
"""
# 初始化 MultiAdapter 类,接受一组适配器
def __init__(self, adapters: List["T2IAdapter"]):
# 调用父类的初始化方法
super(MultiAdapter, self).__init__()
# 计算适配器的数量
self.num_adapter = len(adapters)
# 将适配器列表转换为 PyTorch 的 ModuleList
self.adapters = nn.ModuleList(adapters)
# 检查适配器数量,至少需要一个
if len(adapters) == 0:
raise ValueError("Expecting at least one adapter")
# 检查适配器数量,如果只有一个,建议使用 T2IAdapter
if len(adapters) == 1:
raise ValueError("For a single adapter, please use the `T2IAdapter` class instead of `MultiAdapter`")
# 获取第一个适配器的总缩放因子
first_adapter_total_downscale_factor = adapters[0].total_downscale_factor
# 获取第一个适配器的缩放因子
first_adapter_downscale_factor = adapters[0].downscale_factor
# 遍历剩余的适配器,检查它们的缩放因子
for idx in range(1, len(adapters)):
if (
adapters[idx].total_downscale_factor != first_adapter_total_downscale_factor
or adapters[idx].downscale_factor != first_adapter_downscale_factor
):
# 如果缩放因子不一致,抛出错误
raise ValueError(
f"Expecting all adapters to have the same downscaling behavior, but got:\n"
f"adapters[0].total_downscale_factor={first_adapter_total_downscale_factor}\n"
f"adapters[0].downscale_factor={first_adapter_downscale_factor}\n"
f"adapter[`{idx}`].total_downscale_factor={adapters[idx].total_downscale_factor}\n"
f"adapter[`{idx}`].downscale_factor={adapters[idx].downscale_factor}"
)
# 设置 MultiAdapter 的总缩放因子
self.total_downscale_factor = first_adapter_total_downscale_factor
# 设置 MultiAdapter 的缩放因子
self.downscale_factor = first_adapter_downscale_factor
# 定义前向传播方法,接受输入张量和可选的适配器权重
def forward(self, xs: torch.Tensor, adapter_weights: Optional[List[float]] = None) -> List[torch.Tensor]:
r"""
Args:
xs (`torch.Tensor`):
(batch, channel, height, width) 输入的图像张量,多个适配器模型沿维度 1 连接,
`channel` 应等于 `num_adapter` * "图像的通道数"。
adapter_weights (`List[float]`, *optional*, defaults to None):
表示在将每个适配器的输出相加之前,要乘以的权重的浮点数列表。
"""
# 如果没有提供适配器权重,则初始化为每个适配器权重相等的张量
if adapter_weights is None:
adapter_weights = torch.tensor([1 / self.num_adapter] * self.num_adapter)
else:
# 将提供的适配器权重转换为张量
adapter_weights = torch.tensor(adapter_weights)
# 初始化累计状态为 None,用于存储加权特征
accume_state = None
# 遍历输入张量、适配器权重和适配器模型
for x, w, adapter in zip(xs, adapter_weights, self.adapters):
# 使用适配器模型处理输入张量以提取特征
features = adapter(x)
# 如果累计状态为空,初始化它
if accume_state is None:
accume_state = features
# 根据当前适配器的权重调整累计状态的特征
for i in range(len(accume_state)):
accume_state[i] = w * accume_state[i]
else:
# 如果累计状态已经存在,将新特征加到累计状态中
for i in range(len(features)):
accume_state[i] += w * features[i]
# 返回加权后的累计状态
return accume_state
# 定义保存预训练模型的方法,接收多个参数
def save_pretrained(
self,
save_directory: Union[str, os.PathLike],
is_main_process: bool = True,
save_function: Callable = None,
safe_serialization: bool = True,
variant: Optional[str] = None,
):
"""
保存模型及其配置文件到指定目录,以便后续通过 `[`~models.adapter.MultiAdapter.from_pretrained`]` 类方法重新加载。
参数:
save_directory (`str` 或 `os.PathLike`):
要保存的目录。如果目录不存在,则会创建。
is_main_process (`bool`, *可选*, 默认为 `True`):
调用此函数的进程是否为主进程。在分布式训练(如 TPU)中有用,仅在主进程上设置 `is_main_process=True` 以避免竞争条件。
save_function (`Callable`):
用于保存状态字典的函数。在分布式训练(如 TPU)时,可以用其他方法替换 `torch.save`。可通过环境变量 `DIFFUSERS_SAVE_MODE` 配置。
safe_serialization (`bool`, *可选*, 默认为 `True`):
是否使用 `safetensors` 保存模型,或使用传统的 PyTorch 方法(使用 `pickle`)。
variant (`str`, *可选*):
如果指定,权重将以 pytorch_model.<variant>.bin 格式保存。
"""
# 初始化索引为 0
idx = 0
# 设置保存模型的路径
model_path_to_save = save_directory
# 遍历所有适配器
for adapter in self.adapters:
# 调用适配器的保存方法,将模型及其配置保存到指定路径
adapter.save_pretrained(
model_path_to_save,
is_main_process=is_main_process,
save_function=save_function,
safe_serialization=safe_serialization,
variant=variant,
)
# 索引加一,用于下一个模型路径
idx += 1
# 更新模型保存路径,添加索引
model_path_to_save = model_path_to_save + f"_{idx}"
# 定义类方法装饰器
@classmethod
# 定义一个 T2IAdapter 类,继承自 ModelMixin 和 ConfigMixin
class T2IAdapter(ModelMixin, ConfigMixin):
r"""
一个简单的类似 ResNet 的模型,接受包含控制信号(如关键姿态和深度)的图像。该模型
生成多个特征图,作为 [`UNet2DConditionModel`] 的额外条件。模型的架构遵循
[Adapter](https://github.com/TencentARC/T2I-Adapter/blob/686de4681515662c0ac2ffa07bf5dda83af1038a/ldm/modules/encoders/adapter.py#L97)
和
[AdapterLight](https://github.com/TencentARC/T2I-Adapter/blob/686de4681515662c0ac2ffa07bf5dda83af1038a/ldm/modules/encoders/adapter.py#L235) 的原始实现。
该模型继承自 [`ModelMixin`]。有关库为所有模型实现的通用方法(如下载或保存等),请查看超类文档。
参数:
in_channels (`int`, *可选*, 默认为 3):
Adapter 输入的通道数(*控制图像*)。如果使用灰度图像作为 *控制图像*,请将此参数设置为 1。
channels (`List[int]`, *可选*, 默认为 `(320, 640, 1280, 1280)`):
每个下采样块输出隐藏状态的通道数。`len(block_out_channels)` 还将决定 Adapter 中下采样块的数量。
num_res_blocks (`int`, *可选*, 默认为 2):
每个下采样块中的 ResNet 块数。
downscale_factor (`int`, *可选*, 默认为 8):
决定 Adapter 总体下采样因子的因素。
adapter_type (`str`, *可选*, 默认为 `full_adapter`):
要使用的 Adapter 类型。选择 `full_adapter`、`full_adapter_xl` 或 `light_adapter`。
"""
# 注册初始化函数到配置中
@register_to_config
def __init__(
self,
in_channels: int = 3, # 设置输入通道数,默认为 3
channels: List[int] = [320, 640, 1280, 1280], # 设置每个下采样块的输出通道数,默认为给定列表
num_res_blocks: int = 2, # 设置每个下采样块中的 ResNet 块数,默认为 2
downscale_factor: int = 8, # 设置下采样因子,默认为 8
adapter_type: str = "full_adapter", # 设置 Adapter 类型,默认为 'full_adapter'
):
super().__init__() # 调用父类的初始化方法
# 根据 adapter_type 的值实例化相应的 Adapter
if adapter_type == "full_adapter":
self.adapter = FullAdapter(in_channels, channels, num_res_blocks, downscale_factor) # 实例化 FullAdapter
elif adapter_type == "full_adapter_xl":
self.adapter = FullAdapterXL(in_channels, channels, num_res_blocks, downscale_factor) # 实例化 FullAdapterXL
elif adapter_type == "light_adapter":
self.adapter = LightAdapter(in_channels, channels, num_res_blocks, downscale_factor) # 实例化 LightAdapter
else:
raise ValueError( # 如果 adapter_type 不合法,抛出异常
f"Unsupported adapter_type: '{adapter_type}'. Choose either 'full_adapter' or "
"'full_adapter_xl' or 'light_adapter'." # 提示支持的 Adapter 类型
)
# 定义前向传播函数,接收一个张量 x,并返回特征张量列表
def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
# 文档字符串,描述该函数的功能和输出
r"""
该函数通过适配器模型处理输入张量 `x`,并返回特征张量列表,
每个张量表示从输入中提取的不同尺度的信息。列表的长度由
在初始化时指定的 `channels` 和 `num_res_blocks` 参数中的
下采样块数量决定。
"""
# 调用适配器的前向方法,处理输入张量并返回结果
return self.adapter(x)
# 定义属性 total_downscale_factor,返回适配器的总下采样因子
@property
def total_downscale_factor(self):
# 返回适配器的总下采样因子
return self.adapter.total_downscale_factor
# 定义属性 downscale_factor,表示初始像素无序操作中的下采样因子
@property
def downscale_factor(self):
# 文档字符串,描述下采样因子的作用和可能的异常情况
"""在 T2I-Adapter 的初始像素无序操作中应用的下采样因子。如果输入图像的维度
不能被下采样因子整除,则会引发异常。
"""
# 返回适配器无序操作中的下采样因子
return self.adapter.unshuffle.downscale_factor
# 全适配器类
class FullAdapter(nn.Module):
r"""
详细信息请参见 [`T2IAdapter`]。
"""
# 初始化方法,设置输入通道、通道列表、残差块数量和下采样因子
def __init__(
self,
in_channels: int = 3,
channels: List[int] = [320, 640, 1280, 1280],
num_res_blocks: int = 2,
downscale_factor: int = 8,
):
# 调用父类初始化方法
super().__init__()
# 根据下采样因子计算输入通道数
in_channels = in_channels * downscale_factor**2
# 创建像素反混洗层
self.unshuffle = nn.PixelUnshuffle(downscale_factor)
# 创建输入卷积层
self.conv_in = nn.Conv2d(in_channels, channels[0], kernel_size=3, padding=1)
# 创建适配器块列表
self.body = nn.ModuleList(
[
# 添加第一个适配器块
AdapterBlock(channels[0], channels[0], num_res_blocks),
# 添加后续适配器块,带下采样
*[
AdapterBlock(channels[i - 1], channels[i], num_res_blocks, down=True)
for i in range(1, len(channels))
],
]
)
# 计算总的下采样因子
self.total_downscale_factor = downscale_factor * 2 ** (len(channels) - 1)
# 前向传播方法
def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
r"""
该方法通过 FullAdapter 模型处理输入张量 `x`,执行像素反混洗、卷积和适配器块堆栈的操作。
返回一个特征张量列表,每个张量在处理的不同阶段捕获信息。特征张量的数量由初始化时指定的下采样块数量决定。
"""
# 反混洗输入张量
x = self.unshuffle(x)
# 通过输入卷积层处理
x = self.conv_in(x)
# 初始化特征列表
features = []
# 遍历适配器块并处理输入
for block in self.body:
x = block(x)
# 将特征添加到列表中
features.append(x)
# 返回特征列表
return features
# 全适配器 XL 类
class FullAdapterXL(nn.Module):
r"""
详细信息请参见 [`T2IAdapter`]。
"""
# 初始化方法,设置输入通道、通道列表、残差块数量和下采样因子
def __init__(
self,
in_channels: int = 3,
channels: List[int] = [320, 640, 1280, 1280],
num_res_blocks: int = 2,
downscale_factor: int = 16,
):
# 调用父类初始化方法
super().__init__()
# 根据下采样因子计算输入通道数
in_channels = in_channels * downscale_factor**2
# 创建像素反混洗层
self.unshuffle = nn.PixelUnshuffle(downscale_factor)
# 创建输入卷积层
self.conv_in = nn.Conv2d(in_channels, channels[0], kernel_size=3, padding=1)
# 初始化适配器块列表
self.body = []
# 遍历通道列表,创建适配器块
for i in range(len(channels)):
if i == 1:
# 为第二个通道添加适配器块
self.body.append(AdapterBlock(channels[i - 1], channels[i], num_res_blocks))
elif i == 2:
# 为第三个通道添加带下采样的适配器块
self.body.append(AdapterBlock(channels[i - 1], channels[i], num_res_blocks, down=True))
else:
# 为其他通道添加适配器块
self.body.append(AdapterBlock(channels[i], channels[i], num_res_blocks))
# 将适配器块列表转换为 ModuleList
self.body = nn.ModuleList(self.body)
# XL 只有一个下采样适配器块
self.total_downscale_factor = downscale_factor * 2
# 定义一个前向传播方法,输入为张量 x,返回特征张量的列表
def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
r"""
该方法接受张量 x 作为输入,并通过 FullAdapterXL 模型处理。包括像素解混淆、应用卷积层,并将每个块追加到特征张量列表中。
"""
# 对输入张量进行像素解混淆操作
x = self.unshuffle(x)
# 将解混淆后的张量通过输入卷积层
x = self.conv_in(x)
# 初始化一个空列表以存储特征张量
features = []
# 遍历模型主体中的每个块
for block in self.body:
# 将当前张量通过块处理
x = block(x)
# 将处理后的张量追加到特征列表中
features.append(x)
# 返回特征张量的列表
return features
# AdapterBlock 类是一个辅助模型,包含多个类似 ResNet 的模块,用于 FullAdapter 和 FullAdapterXL 模型
class AdapterBlock(nn.Module):
r"""
AdapterBlock 是一个包含多个 ResNet 样式块的辅助模型。它在 `FullAdapter` 和
`FullAdapterXL` 模型中使用。
参数:
in_channels (`int`):
AdapterBlock 输入的通道数。
out_channels (`int`):
AdapterBlock 输出的通道数。
num_res_blocks (`int`):
AdapterBlock 中 ResNet 块的数量。
down (`bool`, *可选*, 默认为 `False`):
是否对 AdapterBlock 的输入进行下采样。
"""
# 初始化 AdapterBlock 类,接收输入输出通道数及 ResNet 块数量
def __init__(self, in_channels: int, out_channels: int, num_res_blocks: int, down: bool = False):
super().__init__() # 调用父类的构造函数
self.downsample = None # 初始化下采样层为 None
if down: # 如果需要下采样
self.downsample = nn.AvgPool2d(kernel_size=2, stride=2, ceil_mode=True) # 创建平均池化层
self.in_conv = None # 初始化输入卷积层为 None
if in_channels != out_channels: # 如果输入通道与输出通道不相等
self.in_conv = nn.Conv2d(in_channels, out_channels, kernel_size=1) # 创建 1x1 卷积层
# 创建一系列的 ResNet 块,数量由 num_res_blocks 指定
self.resnets = nn.Sequential(
*[AdapterResnetBlock(out_channels) for _ in range(num_res_blocks)],
)
# 定义前向传播方法
def forward(self, x: torch.Tensor) -> torch.Tensor:
r"""
此方法接收张量 x 作为输入,执行下采样和卷积操作(如果指定了 self.downsample 和 self.in_conv)。
然后,它对输入张量应用一系列残差块。
"""
if self.downsample is not None: # 如果存在下采样层
x = self.downsample(x) # 执行下采样操作
if self.in_conv is not None: # 如果存在输入卷积层
x = self.in_conv(x) # 执行卷积操作
x = self.resnets(x) # 将输入传递通过一系列 ResNet 块
return x # 返回处理后的张量
# AdapterResnetBlock 类是一个实现 ResNet 样式块的辅助模型
class AdapterResnetBlock(nn.Module):
r"""
`AdapterResnetBlock` 是一个实现 ResNet 样式块的辅助模型。
参数:
channels (`int`):
AdapterResnetBlock 输入和输出的通道数。
"""
# 初始化 AdapterResnetBlock 类,接收通道数
def __init__(self, channels: int):
super().__init__() # 调用父类的构造函数
self.block1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1) # 创建 3x3 卷积层
self.act = nn.ReLU() # 创建 ReLU 激活函数
self.block2 = nn.Conv2d(channels, channels, kernel_size=1) # 创建 1x1 卷积层
# 定义前向传播方法
def forward(self, x: torch.Tensor) -> torch.Tensor:
r"""
此方法接收输入张量 x,并对其应用卷积层、ReLU 激活和另一个卷积层。返回与输入张量的相加结果。
"""
h = self.act(self.block1(x)) # 先通过第一个卷积层并应用激活函数
h = self.block2(h) # 再通过第二个卷积层
return h + x # 返回卷积结果与输入的和
# LightAdapter 类是一个轻量适配器模型
class LightAdapter(nn.Module):
r"""
有关更多信息,请参阅 [`T2IAdapter`]。
"""
# 初始化 LightAdapter 类,设置输入通道数、通道列表、ResNet 块数量及下采样因子
def __init__(
self,
in_channels: int = 3,
channels: List[int] = [320, 640, 1280],
num_res_blocks: int = 4,
downscale_factor: int = 8,
# 初始化方法
):
# 调用父类的初始化方法
super().__init__()
# 计算输入通道数,考虑下采样因子
in_channels = in_channels * downscale_factor**2
# 初始化像素反shuffle操作,依据下采样因子
self.unshuffle = nn.PixelUnshuffle(downscale_factor)
# 创建一个模块列表,包含多个 LightAdapterBlock
self.body = nn.ModuleList(
[
# 第一个 LightAdapterBlock,处理输入通道到第一个输出通道
LightAdapterBlock(in_channels, channels[0], num_res_blocks),
# 使用列表推导创建后续的 LightAdapterBlock,处理每对通道
*[
LightAdapterBlock(channels[i], channels[i + 1], num_res_blocks, down=True)
for i in range(len(channels) - 1)
],
# 最后一个 LightAdapterBlock,处理最后一组通道
LightAdapterBlock(channels[-1], channels[-1], num_res_blocks, down=True),
]
)
# 计算总下采样因子
self.total_downscale_factor = downscale_factor * (2 ** len(channels))
# 前向传播方法
def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
r"""
此方法接收输入张量 x,进行下采样,并将结果存入特征张量列表中。每个特征张量对应于 LightAdapter 中的不同处理级别。
"""
# 对输入张量进行反shuffle处理
x = self.unshuffle(x)
# 初始化特征列表
features = []
# 遍历模块列表中的每个块
for block in self.body:
# 通过当前块处理输入张量
x = block(x)
# 将处理后的张量添加到特征列表中
features.append(x)
# 返回特征列表
return features
# LightAdapterBlock 类是一个帮助模型,包含多个 LightAdapterResnetBlocks,用于 LightAdapter 模型中
class LightAdapterBlock(nn.Module):
r"""
A `LightAdapterBlock` is a helper model that contains multiple `LightAdapterResnetBlocks`. It is used in the
`LightAdapter` model.
Parameters:
in_channels (`int`):
Number of channels of LightAdapterBlock's input.
out_channels (`int`):
Number of channels of LightAdapterBlock's output.
num_res_blocks (`int`):
Number of LightAdapterResnetBlocks in the LightAdapterBlock.
down (`bool`, *optional*, defaults to `False`):
Whether to perform downsampling on LightAdapterBlock's input.
"""
# 初始化方法,接收输入输出通道数、残差块数量和是否下采样的标志
def __init__(self, in_channels: int, out_channels: int, num_res_blocks: int, down: bool = False):
super().__init__() # 调用父类构造函数
mid_channels = out_channels // 4 # 计算中间通道数
self.downsample = None # 初始化下采样层为 None
if down: # 如果需要下采样
# 创建平均池化层,kernel_size为2,步幅为2,向上取整
self.downsample = nn.AvgPool2d(kernel_size=2, stride=2, ceil_mode=True)
# 定义输入卷积层,kernel_size为1
self.in_conv = nn.Conv2d(in_channels, mid_channels, kernel_size=1)
# 创建残差块序列,数量为 num_res_blocks
self.resnets = nn.Sequential(*[LightAdapterResnetBlock(mid_channels) for _ in range(num_res_blocks)])
# 定义输出卷积层,kernel_size为1
self.out_conv = nn.Conv2d(mid_channels, out_channels, kernel_size=1)
# 前向传播方法,接收输入张量 x
def forward(self, x: torch.Tensor) -> torch.Tensor:
r"""
This method takes tensor x as input and performs downsampling if required. Then it applies in convolution
layer, a sequence of residual blocks, and out convolutional layer.
"""
if self.downsample is not None: # 如果定义了下采样层
x = self.downsample(x) # 对输入 x 进行下采样
x = self.in_conv(x) # 通过输入卷积层处理 x
x = self.resnets(x) # 通过残差块序列处理 x
x = self.out_conv(x) # 通过输出卷积层处理 x
return x # 返回处理后的结果
# LightAdapterResnetBlock 类是一个帮助模型,实现类似 ResNet 的块,具有与 AdapterResnetBlock 略微不同的架构
class LightAdapterResnetBlock(nn.Module):
"""
A `LightAdapterResnetBlock` is a helper model that implements a ResNet-like block with a slightly different
architecture than `AdapterResnetBlock`.
Parameters:
channels (`int`):
Number of channels of LightAdapterResnetBlock's input and output.
"""
# 初始化方法,接收通道数
def __init__(self, channels: int):
super().__init__() # 调用父类构造函数
# 定义第一个卷积层,kernel_size为3,padding为1
self.block1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
self.act = nn.ReLU() # 定义 ReLU 激活函数
# 定义第二个卷积层,kernel_size为3,padding为1
self.block2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
# 前向传播方法,接收输入张量 x
def forward(self, x: torch.Tensor) -> torch.Tensor:
r"""
This function takes input tensor x and processes it through one convolutional layer, ReLU activation, and
another convolutional layer and adds it to input tensor.
"""
h = self.act(self.block1(x)) # 通过第一个卷积层和 ReLU 激活处理 x
h = self.block2(h) # 通过第二个卷积层处理 h
return h + x # 将处理结果与输入 x 相加并返回
.\diffusers\models\attention.py
# 版权所有 2024 The HuggingFace Team. 保留所有权利。
#
# 根据 Apache 许可证,第 2.0 版(“许可证”)许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件
# 按“原样”分发,不附有任何明示或暗示的担保或条件。
# 请参见许可证以获取有关权限和
# 限制的具体语言。
from typing import Any, Dict, List, Optional, Tuple # 导入所需的类型注解
import torch # 导入 PyTorch 库
import torch.nn.functional as F # 导入 PyTorch 的函数式 API
from torch import nn # 从 PyTorch 导入神经网络模块
from ..utils import deprecate, logging # 从上级目录导入工具函数和日志记录模块
from ..utils.torch_utils import maybe_allow_in_graph # 导入可能允许图形计算的工具
from .activations import GEGLU, GELU, ApproximateGELU, FP32SiLU, SwiGLU # 导入不同激活函数
from .attention_processor import Attention, JointAttnProcessor2_0 # 导入注意力处理器
from .embeddings import SinusoidalPositionalEmbedding # 导入正弦位置嵌入
from .normalization import AdaLayerNorm, AdaLayerNormContinuous, AdaLayerNormZero, RMSNorm # 导入各种归一化方法
logger = logging.get_logger(__name__) # 创建一个模块级别的日志记录器
def _chunked_feed_forward(ff: nn.Module, hidden_states: torch.Tensor, chunk_dim: int, chunk_size: int):
# 定义一个函数,按块处理前馈神经网络,以节省内存
# 检查隐藏状态的维度是否能够被块大小整除
if hidden_states.shape[chunk_dim] % chunk_size != 0:
raise ValueError(
f"`hidden_states` dimension to be chunked: {hidden_states.shape[chunk_dim]} has to be divisible by chunk size: {chunk_size}. Make sure to set an appropriate `chunk_size` when calling `unet.enable_forward_chunking`."
) # 如果不能整除,抛出一个值错误,提示块大小设置不正确
num_chunks = hidden_states.shape[chunk_dim] // chunk_size # 计算可以生成的块数量
# 按块处理隐藏状态并将结果拼接成一个张量
ff_output = torch.cat(
[ff(hid_slice) for hid_slice in hidden_states.chunk(num_chunks, dim=chunk_dim)], # 将每个块传入前馈模块进行处理
dim=chunk_dim, # 在指定维度上拼接结果
)
return ff_output # 返回拼接后的输出
@maybe_allow_in_graph # 可能允许在图形计算中使用此类
class GatedSelfAttentionDense(nn.Module): # 定义一个门控自注意力密集层类,继承自 nn.Module
r""" # 类文档字符串,描述类的功能和参数
A gated self-attention dense layer that combines visual features and object features.
Parameters:
query_dim (`int`): The number of channels in the query. # 查询的通道数
context_dim (`int`): The number of channels in the context. # 上下文的通道数
n_heads (`int`): The number of heads to use for attention. # 注意力头的数量
d_head (`int`): The number of channels in each head. # 每个头的通道数
"""
def __init__(self, query_dim: int, context_dim: int, n_heads: int, d_head: int):
super().__init__() # 调用父类构造函数
# 因为需要拼接视觉特征和对象特征,所以需要一个线性投影
self.linear = nn.Linear(context_dim, query_dim) # 创建一个线性层,用于上下文到查询的映射
self.attn = Attention(query_dim=query_dim, heads=n_heads, dim_head=d_head) # 初始化注意力层
self.ff = FeedForward(query_dim, activation_fn="geglu") # 初始化前馈层,激活函数为 GEGLU
self.norm1 = nn.LayerNorm(query_dim) # 初始化第一个层归一化
self.norm2 = nn.LayerNorm(query_dim) # 初始化第二个层归一化
self.register_parameter("alpha_attn", nn.Parameter(torch.tensor(0.0))) # 注册注意力参数 alpha_attn
self.register_parameter("alpha_dense", nn.Parameter(torch.tensor(0.0))) # 注册密集参数 alpha_dense
self.enabled = True # 设置 enabled 属性为 True
# 前向传播函数,接收输入张量 x 和对象张量 objs,返回处理后的张量
def forward(self, x: torch.Tensor, objs: torch.Tensor) -> torch.Tensor:
# 如果未启用该模块,直接返回输入张量 x
if not self.enabled:
return x
# 获取输入张量的第二维大小,表示视觉特征的数量
n_visual = x.shape[1]
# 通过线性层处理对象张量
objs = self.linear(objs)
# 将输入张量和处理后的对象张量拼接,进行归一化后计算注意力,并调整输入张量
x = x + self.alpha_attn.tanh() * self.attn(self.norm1(torch.cat([x, objs], dim=1)))[:, :n_visual, :]
# 使用另一个归一化层处理 x,并通过前馈网络调整 x
x = x + self.alpha_dense.tanh() * self.ff(self.norm2(x))
# 返回处理后的张量 x
return x
# 装饰器,可能允许在计算图中使用此类
@maybe_allow_in_graph
# 定义一个联合变换器块类,继承自 nn.Module
class JointTransformerBlock(nn.Module):
r"""
根据 MMDiT 架构定义的变换器块,介绍于 Stable Diffusion 3.
参考文献: https://arxiv.org/abs/2403.03206
参数:
dim (`int`): 输入和输出中的通道数量.
num_attention_heads (`int`): 用于多头注意力的头数.
attention_head_dim (`int`): 每个头中的通道数.
context_pre_only (`bool`): 布尔值,决定是否添加与处理 `context` 条件相关的一些块.
"""
# 初始化函数,定义参数
def __init__(self, dim, num_attention_heads, attention_head_dim, context_pre_only=False):
# 调用父类构造函数
super().__init__()
# 记录是否仅处理上下文
self.context_pre_only = context_pre_only
# 根据上下文类型设置归一化类型
context_norm_type = "ada_norm_continous" if context_pre_only else "ada_norm_zero"
# 创建一个适应性层归一化对象
self.norm1 = AdaLayerNormZero(dim)
# 根据上下文归一化类型创建相应的归一化对象
if context_norm_type == "ada_norm_continous":
self.norm1_context = AdaLayerNormContinuous(
dim, dim, elementwise_affine=False, eps=1e-6, bias=True, norm_type="layer_norm"
)
elif context_norm_type == "ada_norm_zero":
self.norm1_context = AdaLayerNormZero(dim)
else:
# 如果归一化类型未知,抛出错误
raise ValueError(
f"Unknown context_norm_type: {context_norm_type}, currently only support `ada_norm_continous`, `ada_norm_zero`"
)
# 检查是否有缩放点积注意力函数
if hasattr(F, "scaled_dot_product_attention"):
processor = JointAttnProcessor2_0() # 使用相应的处理器
else:
# 如果不支持,抛出错误
raise ValueError(
"The current PyTorch version does not support the `scaled_dot_product_attention` function."
)
# 初始化注意力模块
self.attn = Attention(
query_dim=dim,
cross_attention_dim=None,
added_kv_proj_dim=dim,
dim_head=attention_head_dim,
heads=num_attention_heads,
out_dim=dim,
context_pre_only=context_pre_only,
bias=True,
processor=processor,
)
# 创建层归一化对象
self.norm2 = nn.LayerNorm(dim, elementwise_affine=False, eps=1e-6)
# 创建前馈网络对象
self.ff = FeedForward(dim=dim, dim_out=dim, activation_fn="gelu-approximate")
# 如果不是仅处理上下文,则创建上下文的归一化和前馈网络
if not context_pre_only:
self.norm2_context = nn.LayerNorm(dim, elementwise_affine=False, eps=1e-6)
self.ff_context = FeedForward(dim=dim, dim_out=dim, activation_fn="gelu-approximate")
else:
# 如果仅处理上下文,则设置为 None
self.norm2_context = None
self.ff_context = None
# 将块大小默认设置为 None
self._chunk_size = None
# 设置块维度为 0
self._chunk_dim = 0
# 从基本变换器块复制的方法,设置块的前馈网络
def set_chunk_feed_forward(self, chunk_size: Optional[int], dim: int = 0):
# 设置块的前馈网络大小
self._chunk_size = chunk_size
# 设置块维度
self._chunk_dim = dim
# 定义前向传播函数,接受隐藏状态、编码器隐藏状态和时间嵌入作为输入
def forward(
self, hidden_states: torch.FloatTensor, encoder_hidden_states: torch.FloatTensor, temb: torch.FloatTensor
):
# 对隐藏状态进行归一化,并计算门控、多头自注意力和MLP的偏移和缩放值
norm_hidden_states, gate_msa, shift_mlp, scale_mlp, gate_mlp = self.norm1(hidden_states, emb=temb)
# 判断是否仅使用上下文信息
if self.context_pre_only:
# 仅归一化编码器隐藏状态
norm_encoder_hidden_states = self.norm1_context(encoder_hidden_states, temb)
else:
# 对编码器隐藏状态进行归一化,并计算相应的门控和偏移值
norm_encoder_hidden_states, c_gate_msa, c_shift_mlp, c_scale_mlp, c_gate_mlp = self.norm1_context(
encoder_hidden_states, emb=temb
)
# 进行注意力计算
attn_output, context_attn_output = self.attn(
hidden_states=norm_hidden_states, encoder_hidden_states=norm_encoder_hidden_states
)
# 处理注意力输出以更新隐藏状态
attn_output = gate_msa.unsqueeze(1) * attn_output # 应用门控机制
hidden_states = hidden_states + attn_output # 更新隐藏状态
# 对隐藏状态进行第二次归一化
norm_hidden_states = self.norm2(hidden_states)
# 结合缩放和偏移进行调整
norm_hidden_states = norm_hidden_states * (1 + scale_mlp[:, None]) + shift_mlp[:, None]
# 如果设置了分块大小,则进行分块前馈处理以节省内存
if self._chunk_size is not None:
ff_output = _chunked_feed_forward(self.ff, norm_hidden_states, self._chunk_dim, self._chunk_size)
else:
# 否则直接执行前馈操作
ff_output = self.ff(norm_hidden_states)
# 应用门控机制更新前馈输出
ff_output = gate_mlp.unsqueeze(1) * ff_output
# 更新隐藏状态
hidden_states = hidden_states + ff_output
# 处理编码器隐藏状态的注意力输出
if self.context_pre_only:
# 如果仅使用上下文,则编码器隐藏状态设为 None
encoder_hidden_states = None
else:
# 应用门控机制更新上下文注意力输出
context_attn_output = c_gate_msa.unsqueeze(1) * context_attn_output
encoder_hidden_states = encoder_hidden_states + context_attn_output # 更新编码器隐藏状态
# 对编码器隐藏状态进行第二次归一化
norm_encoder_hidden_states = self.norm2_context(encoder_hidden_states)
# 结合缩放和偏移进行调整
norm_encoder_hidden_states = norm_encoder_hidden_states * (1 + c_scale_mlp[:, None]) + c_shift_mlp[:, None]
# 如果设置了分块大小,则进行分块前馈处理以节省内存
if self._chunk_size is not None:
context_ff_output = _chunked_feed_forward(
self.ff_context, norm_encoder_hidden_states, self._chunk_dim, self._chunk_size
)
else:
# 否则直接执行前馈操作
context_ff_output = self.ff_context(norm_encoder_hidden_states)
# 应用门控机制更新编码器隐藏状态
encoder_hidden_states = encoder_hidden_states + c_gate_mlp.unsqueeze(1) * context_ff_output
# 返回更新后的编码器隐藏状态和隐藏状态
return encoder_hidden_states, hidden_states
# 装饰器,可能允许在计算图中使用此类
@maybe_allow_in_graph
# 定义一个基本的 Transformer 块,继承自 nn.Module
class BasicTransformerBlock(nn.Module):
r"""
一个基本的 Transformer 块。
参数:
dim (`int`): 输入和输出中的通道数。
num_attention_heads (`int`): 用于多头注意力的头数。
attention_head_dim (`int`): 每个头的通道数。
dropout (`float`, *可选*, 默认为 0.0): 使用的丢弃概率。
cross_attention_dim (`int`, *可选*): 用于交叉注意力的 encoder_hidden_states 向量的大小。
activation_fn (`str`, *可选*, 默认为 `"geglu"`): 在前馈中使用的激活函数。
num_embeds_ada_norm (:
obj: `int`, *可选*): 在训练期间使用的扩散步骤数量。参见 `Transformer2DModel`。
attention_bias (:
obj: `bool`, *可选*, 默认为 `False`): 配置注意力是否应该包含偏置参数。
only_cross_attention (`bool`, *可选*):
是否仅使用交叉注意力层。在这种情况下使用两个交叉注意力层。
double_self_attention (`bool`, *可选*):
是否使用两个自注意力层。在这种情况下不使用交叉注意力层。
upcast_attention (`bool`, *可选*):
是否将注意力计算上溯到 float32。这对于混合精度训练很有用。
norm_elementwise_affine (`bool`, *可选*, 默认为 `True`):
是否为归一化使用可学习的逐元素仿射参数。
norm_type (`str`, *可选*, 默认为 `"layer_norm"`):
要使用的归一化层。可以是 `"layer_norm"`、`"ada_norm"` 或 `"ada_norm_zero"`。
final_dropout (`bool`, *可选*, 默认为 False):
是否在最后的前馈层之后应用最终的丢弃。
attention_type (`str`, *可选*, 默认为 `"default"`):
要使用的注意力类型。可以是 `"default"`、`"gated"` 或 `"gated-text-image"`。
positional_embeddings (`str`, *可选*, 默认为 `None`):
要应用的位置嵌入的类型。
num_positional_embeddings (`int`, *可选*, 默认为 `None`):
要应用的最大位置嵌入数量。
"""
# 初始化方法,设置模型参数
def __init__(
self,
dim: int, # 模型维度
num_attention_heads: int, # 注意力头的数量
attention_head_dim: int, # 每个注意力头的维度
dropout=0.0, # dropout 概率
cross_attention_dim: Optional[int] = None, # 交叉注意力维度,默认为 None
activation_fn: str = "geglu", # 激活函数类型,默认为 'geglu'
num_embeds_ada_norm: Optional[int] = None, # 自适应规范化的嵌入数量
attention_bias: bool = False, # 是否使用注意力偏置
only_cross_attention: bool = False, # 是否仅使用交叉注意力
double_self_attention: bool = False, # 是否双重自注意力
upcast_attention: bool = False, # 是否提升注意力精度
norm_elementwise_affine: bool = True, # 是否进行逐元素仿射规范化
norm_type: str = "layer_norm", # 规范化类型,支持多种类型
norm_eps: float = 1e-5, # 规范化的 epsilon 值
final_dropout: bool = False, # 最终层的 dropout 开关
attention_type: str = "default", # 注意力机制类型,默认为 'default'
positional_embeddings: Optional[str] = None, # 位置嵌入的类型
num_positional_embeddings: Optional[int] = None, # 位置嵌入的数量
ada_norm_continous_conditioning_embedding_dim: Optional[int] = None, # 自适应规范化连续条件嵌入维度
ada_norm_bias: Optional[int] = None, # 自适应规范化偏置
ff_inner_dim: Optional[int] = None, # 前馈网络的内部维度
ff_bias: bool = True, # 前馈网络是否使用偏置
attention_out_bias: bool = True, # 输出注意力是否使用偏置
def set_chunk_feed_forward(self, chunk_size: Optional[int], dim: int = 0): # 设置分块前馈网络的方法
# 设置分块前馈网络的大小和维度
self._chunk_size = chunk_size # 存储分块大小
self._chunk_dim = dim # 存储维度
def forward( # 前向传播方法
self,
hidden_states: torch.Tensor, # 输入的隐藏状态
attention_mask: Optional[torch.Tensor] = None, # 注意力掩码,默认为 None
encoder_hidden_states: Optional[torch.Tensor] = None, # 编码器的隐藏状态
encoder_attention_mask: Optional[torch.Tensor] = None, # 编码器的注意力掩码
timestep: Optional[torch.LongTensor] = None, # 时间步长
cross_attention_kwargs: Dict[str, Any] = None, # 交叉注意力的参数
class_labels: Optional[torch.LongTensor] = None, # 类别标签
added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None, # 添加的条件参数
# 定义一个前馈层的类,继承自 nn.Module
class LuminaFeedForward(nn.Module):
r"""
一个前馈层。
参数:
hidden_size (`int`):
模型隐藏层的维度。该参数决定了模型隐藏表示的宽度。
intermediate_size (`int`): 前馈层的中间维度。
multiple_of (`int`, *optional*): 确保隐藏维度是该值的倍数。
ffn_dim_multiplier (float, *optional*): 自定义的隐藏维度乘数。默认为 None。
"""
# 初始化方法,接受多个参数
def __init__(
self,
dim: int,
inner_dim: int,
multiple_of: Optional[int] = 256,
ffn_dim_multiplier: Optional[float] = None,
):
# 调用父类的初始化方法
super().__init__()
# 将 inner_dim 调整为原来的 2/3
inner_dim = int(2 * inner_dim / 3)
# 如果提供了 ffn_dim_multiplier,则调整 inner_dim
if ffn_dim_multiplier is not None:
inner_dim = int(ffn_dim_multiplier * inner_dim)
# 将 inner_dim 调整为 multiple_of 的倍数
inner_dim = multiple_of * ((inner_dim + multiple_of - 1) // multiple_of)
# 创建第一个线性层,从 dim 到 inner_dim,不使用偏置
self.linear_1 = nn.Linear(
dim,
inner_dim,
bias=False,
)
# 创建第二个线性层,从 inner_dim 到 dim,不使用偏置
self.linear_2 = nn.Linear(
inner_dim,
dim,
bias=False,
)
# 创建第三个线性层,从 dim 到 inner_dim,不使用偏置
self.linear_3 = nn.Linear(
dim,
inner_dim,
bias=False,
)
# 初始化 SiLU 激活函数
self.silu = FP32SiLU()
# 前向传播方法,定义模型的前向计算逻辑
def forward(self, x):
# 依次通过线性层和激活函数计算输出
return self.linear_2(self.silu(self.linear_1(x)) * self.linear_3(x))
# 用于图中可能允许的装饰器定义一个基本的变换器块类
@maybe_allow_in_graph
class TemporalBasicTransformerBlock(nn.Module):
r"""
针对视频数据的基本变换器块。
参数:
dim (`int`): 输入和输出中的通道数。
time_mix_inner_dim (`int`): 用于时间注意力的通道数。
num_attention_heads (`int`): 多头注意力使用的头数。
attention_head_dim (`int`): 每个头中的通道数。
cross_attention_dim (`int`, *optional*): 用于交叉注意力的 encoder_hidden_states 向量大小。
"""
# 初始化方法,接受多个参数
def __init__(
self,
dim: int,
time_mix_inner_dim: int,
num_attention_heads: int,
attention_head_dim: int,
cross_attention_dim: Optional[int] = None,
):
# 初始化父类
super().__init__()
# 判断是否为时间混合内部维度,设置标志
self.is_res = dim == time_mix_inner_dim
# 创建输入层归一化层
self.norm_in = nn.LayerNorm(dim)
# 定义三个模块,每个模块都有自己的归一化层
# 1. 自注意力模块
# 创建前馈神经网络,输入维度和输出维度设置为时间混合内部维度,激活函数为 GEGLU
self.ff_in = FeedForward(
dim,
dim_out=time_mix_inner_dim,
activation_fn="geglu",
)
# 创建第一个归一化层
self.norm1 = nn.LayerNorm(time_mix_inner_dim)
# 创建自注意力层,设置查询维度、头数和头维度
self.attn1 = Attention(
query_dim=time_mix_inner_dim,
heads=num_attention_heads,
dim_head=attention_head_dim,
cross_attention_dim=None,
)
# 2. 交叉注意力模块
# 检查交叉注意力维度是否为 None
if cross_attention_dim is not None:
# 当前仅在自注意力中使用 AdaLayerNormZero
# 第二个交叉注意力模块返回的调制块数量没有意义
self.norm2 = nn.LayerNorm(time_mix_inner_dim)
# 创建交叉注意力层,设置查询维度和交叉注意力维度
self.attn2 = Attention(
query_dim=time_mix_inner_dim,
cross_attention_dim=cross_attention_dim,
heads=num_attention_heads,
dim_head=attention_head_dim,
) # 如果 encoder_hidden_states 为 None,则为自注意力
else:
# 如果没有交叉注意力,归一化层和注意力层设置为 None
self.norm2 = None
self.attn2 = None
# 3. 前馈神经网络模块
# 创建第二个归一化层
self.norm3 = nn.LayerNorm(time_mix_inner_dim)
# 创建前馈神经网络,输入维度为时间混合内部维度,激活函数为 GEGLU
self.ff = FeedForward(time_mix_inner_dim, activation_fn="geglu")
# 让块大小默认为 None
self._chunk_size = None
# 让块维度默认为 None
self._chunk_dim = None
# 设置块前馈的方法,接受可选的块大小和其他参数
def set_chunk_feed_forward(self, chunk_size: Optional[int], **kwargs):
# 设置块前馈的块大小
self._chunk_size = chunk_size
# 块维度硬编码为 1,以获得更好的速度与内存平衡
self._chunk_dim = 1
# 前向传播方法,接受隐藏状态和帧数以及可选的编码器隐藏状态
def forward(
self,
hidden_states: torch.Tensor,
num_frames: int,
encoder_hidden_states: Optional[torch.Tensor] = None,
) -> torch.Tensor:
# 注意归一化始终在后续计算之前应用
# 0. 自注意力
# 获取批次大小,通常是隐藏状态的第一个维度
batch_size = hidden_states.shape[0]
# 获取批次帧数、序列长度和通道数
batch_frames, seq_length, channels = hidden_states.shape
# 根据帧数计算新的批次大小
batch_size = batch_frames // num_frames
# 调整隐藏状态形状以适应新批次大小和帧数
hidden_states = hidden_states[None, :].reshape(batch_size, num_frames, seq_length, channels)
# 改变维度顺序以便后续操作
hidden_states = hidden_states.permute(0, 2, 1, 3)
# 重新调整形状为(batch_size * seq_length, num_frames, channels)
hidden_states = hidden_states.reshape(batch_size * seq_length, num_frames, channels)
# 保存残差以便后续使用
residual = hidden_states
# 对隐藏状态应用输入归一化
hidden_states = self.norm_in(hidden_states)
# 如果存在分块大小,则使用分块前馈函数
if self._chunk_size is not None:
hidden_states = _chunked_feed_forward(self.ff_in, hidden_states, self._chunk_dim, self._chunk_size)
else:
# 否则,直接应用前馈函数
hidden_states = self.ff_in(hidden_states)
# 如果使用残差连接,则将残差添加回隐藏状态
if self.is_res:
hidden_states = hidden_states + residual
# 对隐藏状态进行归一化
norm_hidden_states = self.norm1(hidden_states)
# 计算自注意力输出
attn_output = self.attn1(norm_hidden_states, encoder_hidden_states=None)
# 将自注意力输出与隐藏状态相加
hidden_states = attn_output + hidden_states
# 3. 交叉注意力
# 如果存在第二个注意力层,则计算交叉注意力
if self.attn2 is not None:
norm_hidden_states = self.norm2(hidden_states)
attn_output = self.attn2(norm_hidden_states, encoder_hidden_states=encoder_hidden_states)
# 将交叉注意力输出与隐藏状态相加
hidden_states = attn_output + hidden_states
# 4. 前馈
# 对隐藏状态进行归一化
norm_hidden_states = self.norm3(hidden_states)
# 如果存在分块大小,则使用分块前馈函数
if self._chunk_size is not None:
ff_output = _chunked_feed_forward(self.ff, norm_hidden_states, self._chunk_dim, self._chunk_size)
else:
# 否则,直接应用前馈函数
ff_output = self.ff(norm_hidden_states)
# 如果使用残差连接,则将前馈输出与隐藏状态相加
if self.is_res:
hidden_states = ff_output + hidden_states
else:
# 否则,仅使用前馈输出
hidden_states = ff_output
# 调整隐藏状态形状以适应新批次大小和帧数
hidden_states = hidden_states[None, :].reshape(batch_size, seq_length, num_frames, channels)
# 改变维度顺序以便后续操作
hidden_states = hidden_states.permute(0, 2, 1, 3)
# 重新调整形状为(batch_size * num_frames, seq_length, channels)
hidden_states = hidden_states.reshape(batch_size * num_frames, seq_length, channels)
# 返回处理后的隐藏状态
return hidden_states
# 定义一个 SkipFFTransformerBlock 类,继承自 nn.Module
class SkipFFTransformerBlock(nn.Module):
# 初始化方法,接收多个参数来设置层的属性
def __init__(
self,
dim: int, # 输入的特征维度
num_attention_heads: int, # 注意力头的数量
attention_head_dim: int, # 每个注意力头的维度
kv_input_dim: int, # 键值对输入的维度
kv_input_dim_proj_use_bias: bool, # 是否在 KV 映射中使用偏置
dropout=0.0, # dropout 比率
cross_attention_dim: Optional[int] = None, # 交叉注意力的维度,可选
attention_bias: bool = False, # 是否使用注意力偏置
attention_out_bias: bool = True, # 是否使用输出的注意力偏置
):
# 调用父类的初始化方法
super().__init__()
# 如果 KV 输入维度与特征维度不一致,则定义 KV 映射层
if kv_input_dim != dim:
self.kv_mapper = nn.Linear(kv_input_dim, dim, kv_input_dim_proj_use_bias)
else:
self.kv_mapper = None # 否则不使用 KV 映射
# 定义第一个归一化层
self.norm1 = RMSNorm(dim, 1e-06)
# 定义第一个注意力层
self.attn1 = Attention(
query_dim=dim, # 查询的维度
heads=num_attention_heads, # 注意力头数量
dim_head=attention_head_dim, # 每个头的维度
dropout=dropout, # dropout 比率
bias=attention_bias, # 是否使用注意力偏置
cross_attention_dim=cross_attention_dim, # 交叉注意力的维度
out_bias=attention_out_bias, # 输出是否使用偏置
)
# 定义第二个归一化层
self.norm2 = RMSNorm(dim, 1e-06)
# 定义第二个注意力层
self.attn2 = Attention(
query_dim=dim, # 查询的维度
cross_attention_dim=cross_attention_dim, # 交叉注意力的维度
heads=num_attention_heads, # 注意力头数量
dim_head=attention_head_dim, # 每个头的维度
dropout=dropout, # dropout 比率
bias=attention_bias, # 是否使用注意力偏置
out_bias=attention_out_bias, # 输出是否使用偏置
)
# 前向传播方法
def forward(self, hidden_states, encoder_hidden_states, cross_attention_kwargs):
# 复制交叉注意力的参数,如果没有则初始化为空字典
cross_attention_kwargs = cross_attention_kwargs.copy() if cross_attention_kwargs is not None else {}
# 如果存在 KV 映射层,则对编码器的隐藏状态进行映射
if self.kv_mapper is not None:
encoder_hidden_states = self.kv_mapper(F.silu(encoder_hidden_states))
# 对输入的隐藏状态进行归一化
norm_hidden_states = self.norm1(hidden_states)
# 计算第一个注意力层的输出
attn_output = self.attn1(
norm_hidden_states, # 归一化后的隐藏状态
encoder_hidden_states=encoder_hidden_states, # 编码器的隐藏状态
**cross_attention_kwargs, # 其他交叉注意力的参数
)
# 更新隐藏状态
hidden_states = attn_output + hidden_states
# 对更新后的隐藏状态进行第二次归一化
norm_hidden_states = self.norm2(hidden_states)
# 计算第二个注意力层的输出
attn_output = self.attn2(
norm_hidden_states, # 归一化后的隐藏状态
encoder_hidden_states=encoder_hidden_states, # 编码器的隐藏状态
**cross_attention_kwargs, # 其他交叉注意力的参数
)
# 更新隐藏状态
hidden_states = attn_output + hidden_states
# 返回最终的隐藏状态
return hidden_states
# 定义一个 FreeNoiseTransformerBlock 类,继承自 nn.Module
@maybe_allow_in_graph
class FreeNoiseTransformerBlock(nn.Module):
r"""
A FreeNoise Transformer block. # FreeNoise Transformer 块的文档字符串
"""
# 初始化方法,设置模型的各种参数
def __init__(
# 模型的维度
self,
dim: int,
# 注意力头的数量
num_attention_heads: int,
# 每个注意力头的维度
attention_head_dim: int,
# dropout 概率,默认为 0.0
dropout: float = 0.0,
# 交叉注意力的维度,默认为 None
cross_attention_dim: Optional[int] = None,
# 激活函数的名称,默认为 "geglu"
activation_fn: str = "geglu",
# 自适应归一化的嵌入数量,默认为 None
num_embeds_ada_norm: Optional[int] = None,
# 是否使用注意力偏差,默认为 False
attention_bias: bool = False,
# 是否仅使用交叉注意力,默认为 False
only_cross_attention: bool = False,
# 是否使用双重自注意力,默认为 False
double_self_attention: bool = False,
# 是否上溯注意力,默认为 False
upcast_attention: bool = False,
# 归一化是否使用逐元素仿射,默认为 True
norm_elementwise_affine: bool = True,
# 归一化的类型,默认为 "layer_norm"
norm_type: str = "layer_norm",
# 归一化的 epsilon 值,默认为 1e-5
norm_eps: float = 1e-5,
# 最终是否使用 dropout,默认为 False
final_dropout: bool = False,
# 位置嵌入的类型,默认为 None
positional_embeddings: Optional[str] = None,
# 位置嵌入的数量,默认为 None
num_positional_embeddings: Optional[int] = None,
# 前馈网络内部维度,默认为 None
ff_inner_dim: Optional[int] = None,
# 前馈网络是否使用偏差,默认为 True
ff_bias: bool = True,
# 注意力输出是否使用偏差,默认为 True
attention_out_bias: bool = True,
# 上下文长度,默认为 16
context_length: int = 16,
# 上下文步幅,默认为 4
context_stride: int = 4,
# 权重方案,默认为 "pyramid"
weighting_scheme: str = "pyramid",
# 获取帧索引的方法,返回一对帧索引的列表
def _get_frame_indices(self, num_frames: int) -> List[Tuple[int, int]]:
# 初始化帧索引列表
frame_indices = []
# 遍历所有帧,步幅为上下文步幅
for i in range(0, num_frames - self.context_length + 1, self.context_stride):
# 当前窗口的起始帧
window_start = i
# 当前窗口的结束帧,确保不超过总帧数
window_end = min(num_frames, i + self.context_length)
# 将窗口索引添加到列表
frame_indices.append((window_start, window_end))
# 返回帧索引列表
return frame_indices
# 获取帧权重的方法,返回权重列表
def _get_frame_weights(self, num_frames: int, weighting_scheme: str = "pyramid") -> List[float]:
# 如果权重方案为 "pyramid"
if weighting_scheme == "pyramid":
# 判断帧数是否为偶数
if num_frames % 2 == 0:
# 生成偶数帧的权重列表
weights = list(range(1, num_frames // 2 + 1))
# 反转并连接权重列表
weights = weights + weights[::-1]
else:
# 生成奇数帧的权重列表
weights = list(range(1, num_frames // 2 + 1))
# 添加中间权重并反转连接
weights = weights + [num_frames // 2 + 1] + weights[::-1]
else:
# 抛出不支持的权重方案错误
raise ValueError(f"Unsupported value for weighting_scheme={weighting_scheme}")
# 返回权重列表
return weights
# 设置自由噪声属性的方法,无返回值
def set_free_noise_properties(
self, context_length: int, context_stride: int, weighting_scheme: str = "pyramid"
) -> None:
# 设置上下文长度
self.context_length = context_length
# 设置上下文步幅
self.context_stride = context_stride
# 设置权重方案
self.weighting_scheme = weighting_scheme
# 设置块前馈的方法,无返回值
def set_chunk_feed_forward(self, chunk_size: Optional[int], dim: int = 0) -> None:
# 设置块大小和维度
self._chunk_size = chunk_size
self._chunk_dim = dim
# 前向传播方法,处理输入隐藏状态
def forward(
self,
# 输入的隐藏状态张量
hidden_states: torch.Tensor,
# 可选的注意力掩码
attention_mask: Optional[torch.Tensor] = None,
# 可选的编码器隐藏状态
encoder_hidden_states: Optional[torch.Tensor] = None,
# 可选的编码器注意力掩码
encoder_attention_mask: Optional[torch.Tensor] = None,
# 交叉注意力的额外参数
cross_attention_kwargs: Dict[str, Any] = None,
# 可变参数
*args,
# 关键字参数
**kwargs,
# 定义一个前馈层类,继承自 nn.Module
class FeedForward(nn.Module):
r"""
前馈层。
参数:
dim (`int`): 输入的通道数。
dim_out (`int`, *可选*): 输出的通道数。如果未给定,默认为 `dim`。
mult (`int`, *可选*, 默认为 4): 用于隐藏维度的乘数。
dropout (`float`, *可选*, 默认为 0.0): 使用的 dropout 概率。
activation_fn (`str`, *可选*, 默认为 `"geglu"`): 前馈中使用的激活函数。
final_dropout (`bool` *可选*, 默认为 False): 是否应用最终的 dropout。
bias (`bool`, 默认为 True): 是否在线性层中使用偏置。
"""
# 初始化方法
def __init__(
self,
dim: int, # 输入通道数
dim_out: Optional[int] = None, # 输出通道数(可选)
mult: int = 4, # 隐藏维度乘数
dropout: float = 0.0, # dropout 概率
activation_fn: str = "geglu", # 激活函数类型
final_dropout: bool = False, # 是否应用最终 dropout
inner_dim=None, # 隐藏层维度(可选)
bias: bool = True, # 是否使用偏置
):
# 调用父类构造函数
super().__init__()
# 如果未指定 inner_dim,则计算为 dim 乘以 mult
if inner_dim is None:
inner_dim = int(dim * mult)
# 如果未指定 dim_out,则设置为 dim
dim_out = dim_out if dim_out is not None else dim
# 根据选择的激活函数创建对应的激活层
if activation_fn == "gelu":
act_fn = GELU(dim, inner_dim, bias=bias) # GELU 激活函数
if activation_fn == "gelu-approximate":
act_fn = GELU(dim, inner_dim, approximate="tanh", bias=bias) # 近似 GELU
elif activation_fn == "geglu":
act_fn = GEGLU(dim, inner_dim, bias=bias) # GEGLU 激活函数
elif activation_fn == "geglu-approximate":
act_fn = ApproximateGELU(dim, inner_dim, bias=bias) # 近似 GEGLU
elif activation_fn == "swiglu":
act_fn = SwiGLU(dim, inner_dim, bias=bias) # SwiGLU 激活函数
# 初始化一个模块列表,用于存储层
self.net = nn.ModuleList([])
# 添加激活函数层到网络
self.net.append(act_fn)
# 添加 dropout 层到网络
self.net.append(nn.Dropout(dropout))
# 添加线性层到网络,输入为 inner_dim,输出为 dim_out
self.net.append(nn.Linear(inner_dim, dim_out, bias=bias))
# 如果 final_dropout 为真,则添加最终的 dropout 层
if final_dropout:
self.net.append(nn.Dropout(dropout))
# 前向传播方法
def forward(self, hidden_states: torch.Tensor, *args, **kwargs) -> torch.Tensor:
# 检查是否有额外的参数,或是否传递了过期的 scale 参数
if len(args) > 0 or kwargs.get("scale", None) is not None:
deprecation_message = "The `scale` argument is deprecated and will be ignored. Please remove it, as passing it will raise an error in the future. `scale` should directly be passed while calling the underlying pipeline component i.e., via `cross_attention_kwargs`."
deprecate("scale", "1.0.0", deprecation_message) # 发出关于 scale 参数的弃用警告
# 遍历网络中的每一层,依次对 hidden_states 进行处理
for module in self.net:
hidden_states = module(hidden_states) # 将当前层应用于输入
# 返回处理后的隐藏状态
return hidden_states
标签:dim,None,int,self,diffusers,states,源码,hidden,解析
From: https://www.cnblogs.com/apachecn/p/18492373