首页 > 编程语言 >diffusers-源码解析-三-

diffusers-源码解析-三-

时间:2024-10-22 12:37:07浏览次数:6  
标签:dim None int self diffusers states 源码 hidden 解析

diffusers 源码解析(三)

.\diffusers\loaders\textual_inversion.py

# 版权声明,表示该文件的所有权及使用条款
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 许可证信息,指明该文件遵循的开源许可证
# Licensed under the Apache License, Version 2.0 (the "License");
# 使用本文件需遵循许可证的规定
# you may not use this file except in compliance with the License.
# 获取许可证的链接
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 免责声明,表示在法律允许的范围内不承担任何责任
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 从 typing 模块导入所需类型,便于类型注解
from typing import Dict, List, Optional, Union

# 导入 safetensors 库,可能用于安全的张量处理
import safetensors
# 导入 PyTorch 库,便于深度学习模型的构建与训练
import torch
# 导入验证 Hugging Face Hub 参数的函数
from huggingface_hub.utils import validate_hf_hub_args
# 从 PyTorch 导入神经网络模块
from torch import nn

# 根据可用性导入 transformers 模块的预训练模型与分词器
from ..models.modeling_utils import load_state_dict
# 导入工具函数,处理模型文件、检查依赖等
from ..utils import _get_model_file, is_accelerate_available, is_transformers_available, logging

# 检查 transformers 库是否可用,如果可用则导入相关类
if is_transformers_available():
    from transformers import PreTrainedModel, PreTrainedTokenizer

# 检查 accelerate 库是否可用,如果可用则导入相关钩子
if is_accelerate_available():
    from accelerate.hooks import AlignDevicesHook, CpuOffload, remove_hook_from_module

# 获取当前模块的日志记录器
logger = logging.get_logger(__name__)

# 定义文本反转所需的文件名
TEXT_INVERSION_NAME = "learned_embeds.bin"
# 定义安全版本的文本反转文件名
TEXT_INVERSION_NAME_SAFE = "learned_embeds.safetensors"

# 装饰器,用于验证 Hugging Face Hub 的参数
@validate_hf_hub_args
# 定义加载文本反转状态字典的函数
def load_textual_inversion_state_dicts(pretrained_model_name_or_paths, **kwargs):
    # 从关键字参数中获取缓存目录,默认为 None
    cache_dir = kwargs.pop("cache_dir", None)
    # 从关键字参数中获取是否强制下载的标志,默认为 False
    force_download = kwargs.pop("force_download", False)
    # 从关键字参数中获取代理设置,默认为 None
    proxies = kwargs.pop("proxies", None)
    # 从关键字参数中获取本地文件是否仅使用的标志,默认为 None
    local_files_only = kwargs.pop("local_files_only", None)
    # 从关键字参数中获取访问令牌,默认为 None
    token = kwargs.pop("token", None)
    # 从关键字参数中获取版本号,默认为 None
    revision = kwargs.pop("revision", None)
    # 从关键字参数中获取子文件夹名称,默认为 None
    subfolder = kwargs.pop("subfolder", None)
    # 从关键字参数中获取权重文件名,默认为 None
    weight_name = kwargs.pop("weight_name", None)
    # 从关键字参数中获取是否使用 safetensors 的标志,默认为 None
    use_safetensors = kwargs.pop("use_safetensors", None)

    # 设置允许使用 pickle 的标志为 False
    allow_pickle = False
    # 如果未指定使用 safetensors,则默认启用,并允许使用 pickle
    if use_safetensors is None:
        use_safetensors = True
        allow_pickle = True

    # 设置用户代理信息,用于标识请求的类型和框架
    user_agent = {
        "file_type": "text_inversion",
        "framework": "pytorch",
    }
    # 初始化状态字典列表
    state_dicts = []
    # 遍历预训练模型名称或路径列表
    for pretrained_model_name_or_path in pretrained_model_name_or_paths:
        # 检查当前项是否不是字典或张量
        if not isinstance(pretrained_model_name_or_path, (dict, torch.Tensor)):
            # 初始化模型文件为 None
            model_file = None

            # 尝试加载 .safetensors 权重
            if (use_safetensors and weight_name is None) or (
                weight_name is not None and weight_name.endswith(".safetensors")
            ):
                try:
                    # 获取模型文件,提供相关参数
                    model_file = _get_model_file(
                        pretrained_model_name_or_path,
                        weights_name=weight_name or TEXT_INVERSION_NAME_SAFE,
                        cache_dir=cache_dir,
                        force_download=force_download,
                        proxies=proxies,
                        local_files_only=local_files_only,
                        token=token,
                        revision=revision,
                        subfolder=subfolder,
                        user_agent=user_agent,
                    )
                    # 从文件中加载状态字典到 CPU 上
                    state_dict = safetensors.torch.load_file(model_file, device="cpu")
                except Exception as e:
                    # 如果不允许 pickle,抛出异常
                    if not allow_pickle:
                        raise e

                    # 如果加载失败,设置模型文件为 None
                    model_file = None

            # 如果模型文件仍然是 None,则尝试加载其他格式
            if model_file is None:
                model_file = _get_model_file(
                    pretrained_model_name_or_path,
                    weights_name=weight_name or TEXT_INVERSION_NAME,
                    cache_dir=cache_dir,
                    force_download=force_download,
                    proxies=proxies,
                    local_files_only=local_files_only,
                    token=token,
                    revision=revision,
                    subfolder=subfolder,
                    user_agent=user_agent,
                )
                # 从文件中加载状态字典
                state_dict = load_state_dict(model_file)
        else:
            # 如果当前项是字典或张量,直接使用它作为状态字典
            state_dict = pretrained_model_name_or_path

        # 将状态字典添加到列表中
        state_dicts.append(state_dict)

    # 返回状态字典列表
    return state_dicts
# 定义一个混合类,用于加载文本反转的标记和嵌入到分词器和文本编码器中
class TextualInversionLoaderMixin:
    r"""
    加载文本反转标记和嵌入到分词器和文本编码器中。
    """

    # 定义一个方法,根据输入的提示和分词器可能进行转换
    def maybe_convert_prompt(self, prompt: Union[str, List[str]], tokenizer: "PreTrainedTokenizer"):  # noqa: F821
        r"""
        处理包含特殊标记的提示,这些标记对应于多向量文本反转嵌入,将其替换为多个
        特殊标记,每个对应一个向量。如果提示没有文本反转标记或文本反转标记是单个向量,
        则返回输入提示。

        参数:
            prompt (`str` 或 list of `str`):
                引导图像生成的提示。
            tokenizer (`PreTrainedTokenizer`):
                负责将提示编码为输入标记的分词器。

        返回:
            `str` 或 list of `str`: 转换后的提示
        """
        # 检查输入提示是否为列表,如果不是则将其转换为列表
        if not isinstance(prompt, List):
            prompts = [prompt]
        else:
            prompts = prompt

        # 对每个提示应用可能的转换
        prompts = [self._maybe_convert_prompt(p, tokenizer) for p in prompts]

        # 如果输入提示不是列表,则返回第一个转换后的提示
        if not isinstance(prompt, List):
            return prompts[0]

        # 返回转换后的提示列表
        return prompts

    # 定义一个私有方法,可能将提示转换为“多向量”兼容提示
    def _maybe_convert_prompt(self, prompt: str, tokenizer: "PreTrainedTokenizer"):  # noqa: F821
        r"""
        可能将提示转换为“多向量”兼容提示。如果提示包含一个与多向量文本反转嵌入
        对应的标记,该函数将处理提示,使特殊标记被多个特殊标记替换,每个对应一个向量。
        如果提示没有文本反转标记或文本反转标记是单个向量,则简单返回输入提示。

        参数:
            prompt (`str`):
                引导图像生成的提示。
            tokenizer (`PreTrainedTokenizer`):
                负责将提示编码为输入标记的分词器。

        返回:
            `str`: 转换后的提示
        """
        # 使用分词器对提示进行分词
        tokens = tokenizer.tokenize(prompt)
        # 创建一个唯一标记的集合
        unique_tokens = set(tokens)
        # 遍历唯一标记
        for token in unique_tokens:
            # 检查标记是否在添加的标记编码器中
            if token in tokenizer.added_tokens_encoder:
                replacement = token  # 初始化替换变量为当前标记
                i = 1  # 初始化计数器
                # 生成替换标记,直到没有更多的标记存在
                while f"{token}_{i}" in tokenizer.added_tokens_encoder:
                    replacement += f" {token}_{i}"  # 添加计数到替换变量
                    i += 1  # 增加计数器

                # 在提示中替换原始标记为生成的替换标记
                prompt = prompt.replace(token, replacement)

        # 返回最终的提示
        return prompt
    # 定义检查文本反转输入参数的私有方法
    def _check_text_inv_inputs(self, tokenizer, text_encoder, pretrained_model_name_or_paths, tokens):
        # 检查传入的 tokenizer 是否为 None,如果是,则抛出 ValueError
        if tokenizer is None:
            raise ValueError(
                # 报错信息,说明需要提供 tokenizer 参数
                f"{self.__class__.__name__} requires `self.tokenizer` or passing a `tokenizer` of type `PreTrainedTokenizer` for calling"
                f" `{self.load_textual_inversion.__name__}`"
            )

        # 检查传入的 text_encoder 是否为 None,如果是,则抛出 ValueError
        if text_encoder is None:
            raise ValueError(
                # 报错信息,说明需要提供 text_encoder 参数
                f"{self.__class__.__name__} requires `self.text_encoder` or passing a `text_encoder` of type `PreTrainedModel` for calling"
                f" `{self.load_textual_inversion.__name__}`"
            )

        # 检查预训练模型名称列表的长度与 tokens 列表的长度是否一致
        if len(pretrained_model_name_or_paths) > 1 and len(pretrained_model_name_or_paths) != len(tokens):
            raise ValueError(
                # 报错信息,说明模型列表与 tokens 列表的长度不匹配
                f"You have passed a list of models of length {len(pretrained_model_name_or_paths)}, and list of tokens of length {len(tokens)} "
                f"Make sure both lists have the same length."
            )

        # 过滤出有效的 tokens,即不为 None 的 tokens
        valid_tokens = [t for t in tokens if t is not None]
        # 检查有效 tokens 的集合长度是否小于有效 tokens 的列表长度,如果是,则说明有重复
        if len(set(valid_tokens)) < len(valid_tokens):
            raise ValueError(f"You have passed a list of tokens that contains duplicates: {tokens}")

    # 定义一个静态方法
    @staticmethod
    # 定义一个私有方法,用于检索 tokens 和 embeddings
        def _retrieve_tokens_and_embeddings(tokens, state_dicts, tokenizer):
            # 初始化空列表以存储所有 tokens 和 embeddings
            all_tokens = []
            all_embeddings = []
            # 同时遍历状态字典和 tokens
            for state_dict, token in zip(state_dicts, tokens):
                # 检查状态字典是否为 PyTorch 张量
                if isinstance(state_dict, torch.Tensor):
                    # 如果 token 为 None,抛出错误
                    if token is None:
                        raise ValueError(
                            "You are trying to load a textual inversion embedding that has been saved as a PyTorch tensor. Make sure to pass the name of the corresponding token in this case: `token=...`."
                        )
                    # 加载 token 和 embedding
                    loaded_token = token
                    embedding = state_dict
                # 检查状态字典是否只包含一个键
                elif len(state_dict) == 1:
                    # 处理 diffusers 格式
                    loaded_token, embedding = next(iter(state_dict.items()))
                # 检查状态字典是否包含 "string_to_param" 键
                elif "string_to_param" in state_dict:
                    # 处理 A1111 格式
                    loaded_token = state_dict["name"]
                    embedding = state_dict["string_to_param"]["*"]
                else:
                    # 抛出状态字典格式错误的错误
                    raise ValueError(
                        f"Loaded state dictionary is incorrect: {state_dict}. \n\n"
                        "Please verify that the loaded state dictionary of the textual embedding either only has a single key or includes the `string_to_param`"
                        " input key."
                    )
    
                # 如果 token 不为 None 且加载的 token 与当前 token 不同,记录日志
                if token is not None and loaded_token != token:
                    logger.info(f"The loaded token: {loaded_token} is overwritten by the passed token {token}.")
                else:
                    # 将加载的 token 赋值给当前 token
                    token = loaded_token
    
                # 检查 token 是否已经在 tokenizer 的词汇表中
                if token in tokenizer.get_vocab():
                    # 如果已存在,抛出错误
                    raise ValueError(
                        f"Token {token} already in tokenizer vocabulary. Please choose a different token name or remove {token} and embedding from the tokenizer and text encoder."
                    )
    
                # 将 token 和 embedding 添加到对应列表中
                all_tokens.append(token)
                all_embeddings.append(embedding)
    
            # 返回所有的 tokens 和 embeddings
            return all_tokens, all_embeddings
    
        # 声明该方法为静态方法
        @staticmethod
    # 扩展给定的令牌和嵌入,将多向量令牌和其嵌入整合到一起
    def _extend_tokens_and_embeddings(tokens, embeddings, tokenizer):
        # 初始化一个空列表以存储所有令牌
        all_tokens = []
        # 初始化一个空列表以存储所有嵌入
        all_embeddings = []
    
        # 遍历嵌入和令牌的配对
        for embedding, token in zip(embeddings, tokens):
            # 检查令牌是否已经在词汇表中
            if f"{token}_1" in tokenizer.get_vocab():
                # 如果令牌已经存在,初始化多向量令牌列表
                multi_vector_tokens = [token]
                # 初始化索引
                i = 1
                # 检查是否有后续的多向量令牌
                while f"{token}_{i}" in tokenizer.added_tokens_encoder:
                    # 将多向量令牌添加到列表中
                    multi_vector_tokens.append(f"{token}_{i}")
                    # 递增索引
                    i += 1
    
                # 抛出异常,提示多向量令牌已经存在
                raise ValueError(
                    f"Multi-vector Token {multi_vector_tokens} already in tokenizer vocabulary. Please choose a different token name or remove the {multi_vector_tokens} and embedding from the tokenizer and text encoder."
                )
    
            # 判断当前嵌入是否为多维向量
            is_multi_vector = len(embedding.shape) > 1 and embedding.shape[0] > 1
            if is_multi_vector:
                # 如果是多维向量,将令牌及其索引添加到列表中
                all_tokens += [token] + [f"{token}_{i}" for i in range(1, embedding.shape[0])]
                # 添加对应的所有嵌入到列表中
                all_embeddings += [e for e in embedding]  # noqa: C416
            else:
                # 如果不是多维向量,仅添加当前令牌
                all_tokens += [token]
                # 根据嵌入的维度添加嵌入
                all_embeddings += [embedding[0]] if len(embedding.shape) > 1 else [embedding]
    
        # 返回所有令牌和嵌入的列表
        return all_tokens, all_embeddings
    
    # 装饰器,用于验证 Hugging Face Hub 参数
    @validate_hf_hub_args
    # 加载文本反转(Textual Inversion)模型
    def load_textual_inversion(
        # 预训练模型的名称或路径,支持多种格式
        pretrained_model_name_or_path: Union[str, List[str], Dict[str, torch.Tensor], List[Dict[str, torch.Tensor]]],
        # 可选的令牌
        token: Optional[Union[str, List[str]]] = None,
        # 可选的预训练分词器
        tokenizer: Optional["PreTrainedTokenizer"] = None,  # noqa: F821
        # 可选的文本编码器
        text_encoder: Optional["PreTrainedModel"] = None,  # noqa: F821
        # 其他关键字参数
        **kwargs,
    ):
        # 省略具体实现
    
    # 卸载文本反转(Textual Inversion)模型
    def unload_textual_inversion(
        # 可选的令牌
        tokens: Optional[Union[str, List[str]]] = None,
        # 可选的预训练分词器
        tokenizer: Optional["PreTrainedTokenizer"] = None,
        # 可选的文本编码器
        text_encoder: Optional["PreTrainedModel"] = None,
    ):
        # 省略具体实现

.\diffusers\loaders\unet.py

# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行授权;
# 除非遵循该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件
# 在许可证下分发是按“原样”基础进行的,
# 不提供任何形式的明示或暗示的担保或条件。
# 有关许可证下特定语言管理权限和
# 限制的信息,请参阅许可证。
import os  # 导入操作系统模块,提供与操作系统交互的功能
from collections import defaultdict  # 导入默认字典,用于处理键不存在时的默认值
from contextlib import nullcontext  # 导入空上下文管理器,提供不做任何操作的上下文
from pathlib import Path  # 导入路径库,便于处理文件路径
from typing import Callable, Dict, Union  # 导入类型提示,提供函数、字典和联合类型的支持

import safetensors  # 导入 safetensors 库,处理安全张量
import torch  # 导入 PyTorch 库,深度学习框架
import torch.nn.functional as F  # 导入 PyTorch 的神经网络功能模块
from huggingface_hub.utils import validate_hf_hub_args  # 导入用于验证 Hugging Face Hub 参数的工具
from torch import nn  # 从 PyTorch 导入神经网络模块

from ..models.embeddings import (  # 从父级目录导入嵌入模型
    ImageProjection,  # 导入图像投影类
    IPAdapterFaceIDImageProjection,  # 导入人脸识别图像投影类
    IPAdapterFaceIDPlusImageProjection,  # 导入增强的人脸识别图像投影类
    IPAdapterFullImageProjection,  # 导入完整图像投影类
    IPAdapterPlusImageProjection,  # 导入增强图像投影类
    MultiIPAdapterImageProjection,  # 导入多种图像投影类
)
from ..models.modeling_utils import load_model_dict_into_meta, load_state_dict  # 导入模型加载工具
from ..utils import (  # 从父级目录导入工具模块
    USE_PEFT_BACKEND,  # 导入使用 PEFT 后端的标志
    _get_model_file,  # 导入获取模型文件的函数
    convert_unet_state_dict_to_peft,  # 导入转换 UNet 状态字典到 PEFT 的函数
    get_adapter_name,  # 导入获取适配器名称的函数
    get_peft_kwargs,  # 导入获取 PEFT 参数的函数
    is_accelerate_available,  # 导入检查加速可用性的函数
    is_peft_version,  # 导入检查 PEFT 版本的函数
    is_torch_version,  # 导入检查 PyTorch 版本的函数
    logging,  # 导入日志模块
)
from .lora_pipeline import LORA_WEIGHT_NAME, LORA_WEIGHT_NAME_SAFE, TEXT_ENCODER_NAME, UNET_NAME  # 从当前目录导入 LoRA 权重名称和模型名称
from .utils import AttnProcsLayers  # 从当前目录导入注意力处理层

if is_accelerate_available():  # 检查是否可以使用加速功能
    from accelerate.hooks import AlignDevicesHook, CpuOffload, remove_hook_from_module  # 导入加速库的钩子函数

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器

CUSTOM_DIFFUSION_WEIGHT_NAME = "pytorch_custom_diffusion_weights.bin"  # 定义自定义扩散权重的文件名
CUSTOM_DIFFUSION_WEIGHT_NAME_SAFE = "pytorch_custom_diffusion_weights.safetensors"  # 定义安全自定义扩散权重的文件名


class UNet2DConditionLoadersMixin:  # 定义一个混合类,用于加载 LoRA 层
    """
    将 LoRA 层加载到 [`UNet2DCondtionModel`] 中。
    """  # 类文档字符串,说明该类的作用

    text_encoder_name = TEXT_ENCODER_NAME  # 定义文本编码器名称
    unet_name = UNET_NAME  # 定义 UNet 名称

    @validate_hf_hub_args  # 使用装饰器验证 Hugging Face Hub 参数
    # 定义处理自定义扩散的方法,接收状态字典作为参数
    def _process_custom_diffusion(self, state_dict):
        # 从模块中导入自定义扩散注意力处理器
        from ..models.attention_processor import CustomDiffusionAttnProcessor

        # 初始化空字典,用于存储注意力处理器
        attn_processors = {}
        # 使用 defaultdict 初始化一个字典,用于分组自定义扩散数据
        custom_diffusion_grouped_dict = defaultdict(dict)
        
        # 遍历状态字典中的每一项
        for key, value in state_dict.items():
            # 如果当前值为空,设置分组字典的对应键为空字典
            if len(value) == 0:
                custom_diffusion_grouped_dict[key] = {}
            else:
                # 如果键中包含"to_out",则提取相应的处理器键和子键
                if "to_out" in key:
                    attn_processor_key, sub_key = ".".join(key.split(".")[:-3]), ".".join(key.split(".")[-3:])
                else:
                    # 否则,按另一种方式提取处理器键和子键
                    attn_processor_key, sub_key = ".".join(key.split(".")[:-2]), ".".join(key.split(".")[-2:])
                # 将值存储到分组字典中
                custom_diffusion_grouped_dict[attn_processor_key][sub_key] = value

        # 遍历分组字典中的每一项
        for key, value_dict in custom_diffusion_grouped_dict.items():
            # 如果值字典为空,初始化自定义扩散注意力处理器
            if len(value_dict) == 0:
                attn_processors[key] = CustomDiffusionAttnProcessor(
                    train_kv=False, train_q_out=False, hidden_size=None, cross_attention_dim=None
                )
            else:
                # 获取交叉注意力维度
                cross_attention_dim = value_dict["to_k_custom_diffusion.weight"].shape[1]
                # 获取隐藏层大小
                hidden_size = value_dict["to_k_custom_diffusion.weight"].shape[0]
                # 判断是否训练 q 输出
                train_q_out = True if "to_q_custom_diffusion.weight" in value_dict else False
                # 初始化自定义扩散注意力处理器并传入参数
                attn_processors[key] = CustomDiffusionAttnProcessor(
                    train_kv=True,
                    train_q_out=train_q_out,
                    hidden_size=hidden_size,
                    cross_attention_dim=cross_attention_dim,
                )
                # 加载状态字典到注意力处理器
                attn_processors[key].load_state_dict(value_dict)

        # 返回注意力处理器的字典
        return attn_processors

    # 类方法的装饰器
    @classmethod
    # 从 diffusers.loaders.lora_base.LoraBaseMixin 中复制的方法,用于选择性禁用卸载功能
    # 定义一个类方法,用于选择性地禁用模型的 CPU 离线
    def _optionally_disable_offloading(cls, _pipeline):
        """
        可选地移除离线处理,如果管道已经被顺序离线到 CPU。

        Args:
            _pipeline (`DiffusionPipeline`):
                需要禁用离线处理的管道。

        Returns:
            tuple:
                一个元组,指示 `is_model_cpu_offload` 或 `is_sequential_cpu_offload` 是否为 True。
        """
        # 初始化模型 CPU 离线标志为 False
        is_model_cpu_offload = False
        # 初始化顺序 CPU 离线标志为 False
        is_sequential_cpu_offload = False

        # 如果管道不为 None 且 hf_device_map 为 None
        if _pipeline is not None and _pipeline.hf_device_map is None:
            # 遍历管道中的每个组件
            for _, component in _pipeline.components.items():
                # 检查组件是否为 nn.Module 类型并且具有 _hf_hook 属性
                if isinstance(component, nn.Module) and hasattr(component, "_hf_hook"):
                    # 如果模型尚未 CPU 离线
                    if not is_model_cpu_offload:
                        # 检查组件的 _hf_hook 是否为 CpuOffload 类型
                        is_model_cpu_offload = isinstance(component._hf_hook, CpuOffload)
                    # 如果顺序离线尚未设置
                    if not is_sequential_cpu_offload:
                        # 检查 _hf_hook 是否为 AlignDevicesHook 类型,或者其 hooks 属性的第一个元素是否为 AlignDevicesHook
                        is_sequential_cpu_offload = (
                            isinstance(component._hf_hook, AlignDevicesHook)
                            or hasattr(component._hf_hook, "hooks")
                            and isinstance(component._hf_hook.hooks[0], AlignDevicesHook)
                        )

                    # 记录信息,指示检测到加速钩子并即将移除之前的钩子
                    logger.info(
                        "Accelerate hooks detected. Since you have called `load_lora_weights()`, the previous hooks will be first removed. Then the LoRA parameters will be loaded and the hooks will be applied again."
                    )
                    # 从组件中移除钩子,是否递归取决于顺序离线标志
                    remove_hook_from_module(component, recurse=is_sequential_cpu_offload)

        # 返回 CPU 离线标志的元组
        return (is_model_cpu_offload, is_sequential_cpu_offload)

    # 定义保存注意力处理器的方法
    def save_attn_procs(
        # 保存目录,支持字符串或路径对象
        save_directory: Union[str, os.PathLike],
        # 主进程标志,默认值为 True
        is_main_process: bool = True,
        # 权重名称,默认值为 None
        weight_name: str = None,
        # 保存功能,默认值为 None
        save_function: Callable = None,
        # 安全序列化标志,默认值为 True
        safe_serialization: bool = True,
        # 其他关键字参数
        **kwargs,
    ):
        # 定义获取自定义扩散状态字典的方法
        def _get_custom_diffusion_state_dict(self):
            # 从模型中导入自定义注意力处理器
            from ..models.attention_processor import (
                CustomDiffusionAttnProcessor,
                CustomDiffusionAttnProcessor2_0,
                CustomDiffusionXFormersAttnProcessor,
            )

            # 创建要保存的注意力处理器层
            model_to_save = AttnProcsLayers(
                {
                    # 过滤出类型为自定义注意力处理器的项
                    y: x
                    for (y, x) in self.attn_processors.items()
                    if isinstance(
                        x,
                        (
                            CustomDiffusionAttnProcessor,
                            CustomDiffusionAttnProcessor2_0,
                            CustomDiffusionXFormersAttnProcessor,
                        ),
                    )
                }
            )
            # 获取模型的状态字典
            state_dict = model_to_save.state_dict()
            # 遍历注意力处理器
            for name, attn in self.attn_processors.items():
                # 如果当前注意力处理器的状态字典为空
                if len(attn.state_dict()) == 0:
                    # 在状态字典中为该名称添加空字典
                    state_dict[name] = {}

            # 返回状态字典
            return state_dict
    # 加载 IP 适配器权重的私有方法
        def _load_ip_adapter_weights(self, state_dicts, low_cpu_mem_usage=False):
            # 检查 state_dicts 是否为列表,如果不是则转换为列表
            if not isinstance(state_dicts, list):
                state_dicts = [state_dicts]
    
            # 如果已有编码器隐藏投影且配置为文本投影,则赋值给文本编码器隐藏投影
            if (
                self.encoder_hid_proj is not None
                and self.config.encoder_hid_dim_type == "text_proj"
                and not hasattr(self, "text_encoder_hid_proj")
            ):
                self.text_encoder_hid_proj = self.encoder_hid_proj
    
            # 在加载 IP 适配器权重后将 encoder_hid_proj 设置为 None
            self.encoder_hid_proj = None
    
            # 将 IP 适配器的注意力处理器转换为 Diffusers 格式
            attn_procs = self._convert_ip_adapter_attn_to_diffusers(state_dicts, low_cpu_mem_usage=low_cpu_mem_usage)
            # 设置注意力处理器
            self.set_attn_processor(attn_procs)
    
            # 转换 IP 适配器图像投影层为 Diffusers 格式
            image_projection_layers = []
            # 遍历每个 state_dict,转换图像投影层
            for state_dict in state_dicts:
                image_projection_layer = self._convert_ip_adapter_image_proj_to_diffusers(
                    state_dict["image_proj"], low_cpu_mem_usage=low_cpu_mem_usage
                )
                # 将转换后的图像投影层添加到列表中
                image_projection_layers.append(image_projection_layer)
    
            # 创建多重 IP 适配器图像投影并赋值给 encoder_hid_proj
            self.encoder_hid_proj = MultiIPAdapterImageProjection(image_projection_layers)
            # 更新编码器隐藏维度类型为图像投影
            self.config.encoder_hid_dim_type = "ip_image_proj"
    
            # 将模型转移到指定的数据类型和设备
            self.to(dtype=self.dtype, device=self.device)
    # 加载 IP 适配器的 LoRA 权重,返回包含这些权重的字典
    def _load_ip_adapter_loras(self, state_dicts):
        # 初始化空字典以存储 LoRA 权重
        lora_dicts = {}
        # 遍历注意力处理器的键值对,获取索引和名称
        for key_id, name in enumerate(self.attn_processors.keys()):
            # 遍历每个状态字典
            for i, state_dict in enumerate(state_dicts):
                # 检查当前状态字典中是否包含特定的 LoRA 权重
                if f"{key_id}.to_k_lora.down.weight" in state_dict["ip_adapter"]:
                    # 如果该索引不在字典中,则初始化为空字典
                    if i not in lora_dicts:
                        lora_dicts[i] = {}
                    # 更新字典,添加 'to_k_lora.down.weight' 的权重
                    lora_dicts[i].update(
                        {
                            f"unet.{name}.to_k_lora.down.weight": state_dict["ip_adapter"][
                                f"{key_id}.to_k_lora.down.weight"
                            ]
                        }
                    )
                    # 更新字典,添加 'to_q_lora.down.weight' 的权重
                    lora_dicts[i].update(
                        {
                            f"unet.{name}.to_q_lora.down.weight": state_dict["ip_adapter"][
                                f"{key_id}.to_q_lora.down.weight"
                            ]
                        }
                    )
                    # 更新字典,添加 'to_v_lora.down.weight' 的权重
                    lora_dicts[i].update(
                        {
                            f"unet.{name}.to_v_lora.down.weight": state_dict["ip_adapter"][
                                f"{key_id}.to_v_lora.down.weight"
                            ]
                        }
                    )
                    # 更新字典,添加 'to_out_lora.down.weight' 的权重
                    lora_dicts[i].update(
                        {
                            f"unet.{name}.to_out_lora.down.weight": state_dict["ip_adapter"][
                                f"{key_id}.to_out_lora.down.weight"
                            ]
                        }
                    )
                    # 更新字典,添加 'to_k_lora.up.weight' 的权重
                    lora_dicts[i].update(
                        {f"unet.{name}.to_k_lora.up.weight": state_dict["ip_adapter"][f"{key_id}.to_k_lora.up.weight"]}
                    )
                    # 更新字典,添加 'to_q_lora.up.weight' 的权重
                    lora_dicts[i].update(
                        {f"unet.{name}.to_q_lora.up.weight": state_dict["ip_adapter"][f"{key_id}.to_q_lora.up.weight"]}
                    )
                    # 更新字典,添加 'to_v_lora.up.weight' 的权重
                    lora_dicts[i].update(
                        {f"unet.{name}.to_v_lora.up.weight": state_dict["ip_adapter"][f"{key_id}.to_v_lora.up.weight"]}
                    )
                    # 更新字典,添加 'to_out_lora.up.weight' 的权重
                    lora_dicts[i].update(
                        {
                            f"unet.{name}.to_out_lora.up.weight": state_dict["ip_adapter"][
                                f"{key_id}.to_out_lora.up.weight"
                            ]
                        }
                    )
        # 返回包含所有 LoRA 权重的字典
        return lora_dicts

.\diffusers\loaders\unet_loader_utils.py

# 版权声明,标识本代码的版权所有者及其保留的权利
# 
# 根据 Apache License, Version 2.0 进行许可;除非符合许可条款,否则不得使用此文件
# 可以在以下网址获取许可副本
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用的法律或书面协议另有规定,否则根据许可分发的软件以 "按现状" 基础提供,
# 不附加任何明示或暗示的担保或条件
# 请参阅许可协议以了解有关许可及其限制的详细信息
import copy  # 导入 copy 模块,用于对象的浅拷贝或深拷贝
from typing import TYPE_CHECKING, Dict, List, Union  # 导入类型注释支持

from ..utils import logging  # 从上级模块导入 logging 功能


if TYPE_CHECKING:
    # 在这里导入以避免循环导入问题
    from ..models import UNet2DConditionModel  # 从上级模块导入 UNet2DConditionModel

logger = logging.get_logger(__name__)  # 获取当前模块的日志记录器,禁用 pylint 的命名检查


def _translate_into_actual_layer_name(name):
    """将用户友好的名称(例如 'mid')转换为实际层名称(例如 'mid_block.attentions.0')"""
    if name == "mid":
        return "mid_block.attentions.0"  # 如果名称是 'mid',返回其对应的实际层名

    updown, block, attn = name.split(".")  # 将名称按 '.' 分割成上下文、块和注意力部分

    updown = updown.replace("down", "down_blocks").replace("up", "up_blocks")  # 替换上下文中的 'down' 和 'up'
    block = block.replace("block_", "")  # 去掉块名称中的 'block_' 前缀
    attn = "attentions." + attn  # 将注意力部分格式化为完整名称

    return ".".join((updown, block, attn))  # 将所有部分合并为实际层名并返回


def _maybe_expand_lora_scales(
    unet: "UNet2DConditionModel", weight_scales: List[Union[float, Dict]], default_scale=1.0
):
    # 可能扩展 LoRA 权重比例,接受 UNet 模型和权重比例列表作为参数
    blocks_with_transformer = {
        "down": [i for i, block in enumerate(unet.down_blocks) if hasattr(block, "attentions")],
        # 找到下层块中具有注意力层的块索引
        "up": [i for i, block in enumerate(unet.up_blocks) if hasattr(block, "attentions")]
        # 找到上层块中具有注意力层的块索引
    }
    transformer_per_block = {"down": unet.config.layers_per_block, "up": unet.config.layers_per_block + 1}
    # 创建字典,包含每个块的变换层数量

    expanded_weight_scales = [
        _maybe_expand_lora_scales_for_one_adapter(
            weight_for_adapter,
            blocks_with_transformer,
            transformer_per_block,
            unet.state_dict(),
            default_scale=default_scale,
        )
        # 对每个适配器的权重调用扩展函数,生成扩展后的权重比例列表
        for weight_for_adapter in weight_scales
    ]

    return expanded_weight_scales  # 返回扩展后的权重比例


def _maybe_expand_lora_scales_for_one_adapter(
    scales: Union[float, Dict],
    blocks_with_transformer: Dict[str, int],
    transformer_per_block: Dict[str, int],
    state_dict: None,
    default_scale: float = 1.0,
):
    """
    将输入扩展为更细粒度的字典。以下示例提供了更多细节。

    参数:
        scales (`Union[float, Dict]`):
            要扩展的比例字典。
        blocks_with_transformer (`Dict[str, int]`):
            包含 'up' 和 'down' 键的字典,显示哪些块具有变换层
        transformer_per_block (`Dict[str, int]`):
            包含 'up' 和 'down' 键的字典,显示每个块的变换层数量

    例如,转换
    ```python
    scales = {"down": 2, "mid": 3, "up": {"block_0": 4, "block_1": [5, 6, 7]}}
```py 
    # 定义一个字典,表示每个方向的块及其对应的编号
        blocks_with_transformer = {"down": [1, 2], "up": [0, 1]}
        # 定义一个字典,表示每个方向的块需要的变换器数量
        transformer_per_block = {"down": 2, "up": 3}
        # 如果 blocks_with_transformer 的键不是 "down" 和 "up",则抛出错误
        if sorted(blocks_with_transformer.keys()) != ["down", "up"]:
            raise ValueError("blocks_with_transformer needs to be a dict with keys `'down' and `'up'`")
        # 如果 transformer_per_block 的键不是 "down" 和 "up",则抛出错误
        if sorted(transformer_per_block.keys()) != ["down", "up"]:
            raise ValueError("transformer_per_block needs to be a dict with keys `'down' and `'up'`")
        # 如果 scales 不是字典类型,则直接返回其值
        if not isinstance(scales, dict):
            # don't expand if scales is a single number
            return scales
        # 复制 scales 的深拷贝,以避免修改原始数据
        scales = copy.deepcopy(scales)
        # 如果 scales 中没有 "mid",则赋予默认比例
        if "mid" not in scales:
            scales["mid"] = default_scale
        # 如果 "mid" 是列表类型且仅有一个元素,则将其转换为该元素
        elif isinstance(scales["mid"], list):
            if len(scales["mid"]) == 1:
                scales["mid"] = scales["mid"][0]
            # 如果 "mid" 列表元素个数不为 1,则抛出错误
            else:
                raise ValueError(f"Expected 1 scales for mid, got {len(scales['mid'])}.")
        # 遍历方向 "up" 和 "down"
        for updown in ["up", "down"]:
            # 如果当前方向不在 scales 中,则赋予默认比例
            if updown not in scales:
                scales[updown] = default_scale
            # 如果当前方向的比例不是字典,则将其转换为字典格式
            if not isinstance(scales[updown], dict):
                scales[updown] = {f"block_{i}": copy.deepcopy(scales[updown]) for i in blocks_with_transformer[updown]}
            # 遍历当前方向的每个块
            for i in blocks_with_transformer[updown]:
                block = f"block_{i}"
                # 如果当前块未赋值,则设置为默认比例
                if block not in scales[updown]:
                    scales[updown][block] = default_scale
                # 如果块的比例不是列表,则转换为列表格式
                if not isinstance(scales[updown][block], list):
                    scales[updown][block] = [scales[updown][block] for _ in range(transformer_per_block[updown])]
                # 如果块的比例列表仅有一个元素,则扩展为多个元素
                elif len(scales[updown][block]) == 1:
                    scales[updown][block] = scales[updown][block] * transformer_per_block[updown]
                # 如果块的比例列表长度不匹配,则抛出错误
                elif len(scales[updown][block]) != transformer_per_block[updown]:
                    raise ValueError(
                        f"Expected {transformer_per_block[updown]} scales for {updown}.{block}, got {len(scales[updown][block])}."
                    )
            # 将 scales 中的当前方向块转换为扁平格式
            for i in blocks_with_transformer[updown]:
                block = f"block_{i}"
                for tf_idx, value in enumerate(scales[updown][block]):
                    scales[f"{updown}.{block}.{tf_idx}"] = value
            # 删除 scales 中当前方向的条目
            del scales[updown]
    # 遍历 scales 字典中的每一层
        for layer in scales.keys():
            # 检查该层是否在 state_dict 中存在
            if not any(_translate_into_actual_layer_name(layer) in module for module in state_dict.keys()):
                # 如果不存在,抛出值错误,提示该层无法设置 lora 缩放
                raise ValueError(
                    f"Can't set lora scale for layer {layer}. It either doesn't exist in this unet or it has no attentions."
                )
    
        # 返回一个字典,键为实际层名,值为对应的权重
        return {_translate_into_actual_layer_name(name): weight for name, weight in scales.items()}

.\diffusers\loaders\utils.py

# 版权声明,表明此文件的所有权归 HuggingFace 团队所有
# 
# 根据 Apache 许可证第2.0版(“许可证”)许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# 除非适用法律要求或书面同意,否则根据许可证分发的软件是基于“按现状”提供的,
# 不附带任何明示或暗示的保证或条件。
# 请参阅许可证以了解与权限和
# 限制相关的特定语言。

# 导入类型字典
from typing import Dict

# 导入 PyTorch 库
import torch

# 定义一个名为 AttnProcsLayers 的类,继承自 torch.nn.Module
class AttnProcsLayers(torch.nn.Module):
    # 初始化方法,接受一个字典作为参数
    def __init__(self, state_dict: Dict[str, torch.Tensor]):
        # 调用父类的初始化方法
        super().__init__()
        # 将字典的值存储到一个 ModuleList 中
        self.layers = torch.nn.ModuleList(state_dict.values())
        # 创建一个映射字典,将索引与键关联
        self.mapping = dict(enumerate(state_dict.keys()))
        # 创建反向映射字典,将键与索引关联
        self.rev_mapping = {v: k for k, v in enumerate(state_dict.keys())}

        # 定义用于分割的关键字列表,分别用于处理器和自注意力
        self.split_keys = [".processor", ".self_attn"]

        # 定义一个将状态字典映射到模块的函数
        def map_to(module, state_dict, *args, **kwargs):
            new_state_dict = {}  # 创建一个新的状态字典
            # 遍历状态字典中的每个键值对
            for key, value in state_dict.items():
                num = int(key.split(".")[1])  # 提取数字部分,0 总是“layers”
                # 根据映射生成新的键
                new_key = key.replace(f"layers.{num}", module.mapping[num])
                new_state_dict[new_key] = value  # 存储到新字典中

            return new_state_dict  # 返回新状态字典

        # 定义一个用于重新映射键的函数
        def remap_key(key, state_dict):
            # 遍历分割关键字
            for k in self.split_keys:
                if k in key:  # 如果键包含分割关键字
                    return key.split(k)[0] + k  # 返回处理后的键

            # 如果没有找到匹配的分割关键字,抛出错误
            raise ValueError(
                f"There seems to be a problem with the state_dict: {set(state_dict.keys())}. {key} has to have one of {self.split_keys}."
            )

        # 定义一个将状态字典映射回模块的函数
        def map_from(module, state_dict, *args, **kwargs):
            all_keys = list(state_dict.keys())  # 获取所有键
            # 遍历所有键
            for key in all_keys:
                replace_key = remap_key(key, state_dict)  # 重新映射键
                # 生成新的键
                new_key = key.replace(replace_key, f"layers.{module.rev_mapping[replace_key]}")
                state_dict[new_key] = state_dict[key]  # 更新状态字典
                del state_dict[key]  # 删除旧的键

        # 注册状态字典钩子以实现映射
        self._register_state_dict_hook(map_to)
        # 注册加载状态字典前的钩子以实现映射
        self._register_load_state_dict_pre_hook(map_from, with_module=True)

.\diffusers\loaders\__init__.py

# 引入类型检查支持
from typing import TYPE_CHECKING

# 从上级目录导入工具函数和常量
from ..utils import DIFFUSERS_SLOW_IMPORT, _LazyModule, deprecate
from ..utils.import_utils import is_peft_available, is_torch_available, is_transformers_available


# 定义获取文本编码器 LORA 状态字典的函数
def text_encoder_lora_state_dict(text_encoder):
    # 发出关于函数即将弃用的警告
    deprecate(
        "text_encoder_load_state_dict in `models`",
        "0.27.0",
        "`text_encoder_lora_state_dict` is deprecated and will be removed in 0.27.0. Make sure to retrieve the weights using `get_peft_model`. See https://huggingface.co/docs/peft/v0.6.2/en/quicktour#peftmodel for more information.",
    )
    # 初始化状态字典
    state_dict = {}

    # 遍历文本编码器的注意力模块
    for name, module in text_encoder_attn_modules(text_encoder):
        # 获取 q_proj 线性层的状态字典并更新状态字典
        for k, v in module.q_proj.lora_linear_layer.state_dict().items():
            state_dict[f"{name}.q_proj.lora_linear_layer.{k}"] = v

        # 获取 k_proj 线性层的状态字典并更新状态字典
        for k, v in module.k_proj.lora_linear_layer.state_dict().items():
            state_dict[f"{name}.k_proj.lora_linear_layer.{k}"] = v

        # 获取 v_proj 线性层的状态字典并更新状态字典
        for k, v in module.v_proj.lora_linear_layer.state_dict().items():
            state_dict[f"{name}.v_proj.lora_linear_layer.{k}"] = v

        # 获取 out_proj 线性层的状态字典并更新状态字典
        for k, v in module.out_proj.lora_linear_layer.state_dict().items():
            state_dict[f"{name}.out_proj.lora_linear_layer.{k}"] = v

    # 返回构建的状态字典
    return state_dict


# 检查是否可用 Transformers 库
if is_transformers_available():

    # 定义获取文本编码器注意力模块的函数
    def text_encoder_attn_modules(text_encoder):
        # 发出关于函数即将弃用的警告
        deprecate(
            "text_encoder_attn_modules in `models`",
            "0.27.0",
            "`text_encoder_lora_state_dict` is deprecated and will be removed in 0.27.0. Make sure to retrieve the weights using `get_peft_model`. See https://huggingface.co/docs/peft/v0.6.2/en/quicktour#peftmodel for more information.",
        )
        # 从 Transformers 导入相关模型
        from transformers import CLIPTextModel, CLIPTextModelWithProjection

        # 初始化注意力模块列表
        attn_modules = []

        # 检查文本编码器的类型并获取相应的注意力模块
        if isinstance(text_encoder, (CLIPTextModel, CLIPTextModelWithProjection)):
            for i, layer in enumerate(text_encoder.text_model.encoder.layers):
                name = f"text_model.encoder.layers.{i}.self_attn"
                mod = layer.self_attn
                attn_modules.append((name, mod))
        else:
            # 如果不认识的编码器类型,抛出错误
            raise ValueError(f"do not know how to get attention modules for: {text_encoder.__class__.__name__}")

        # 返回注意力模块列表
        return attn_modules


# 初始化导入结构字典
_import_structure = {}

# 检查是否可用 PyTorch 库
if is_torch_available():
    # 更新导入结构以包含单文件模型
    _import_structure["single_file_model"] = ["FromOriginalModelMixin"]

    # 更新导入结构以包含 UNet
    _import_structure["unet"] = ["UNet2DConditionLoadersMixin"]
    # 更新导入结构以包含工具函数
    _import_structure["utils"] = ["AttnProcsLayers"]
    # 检查是否可以使用 transformers 库
        if is_transformers_available():
            # 将 "single_file" 模块的导入结构更新为包含 FromSingleFileMixin 类
            _import_structure["single_file"] = ["FromSingleFileMixin"]
            # 将 "lora_pipeline" 模块的导入结构更新为包含多个 LoraLoaderMixin 类
            _import_structure["lora_pipeline"] = [
                "AmusedLoraLoaderMixin",  # 包含 AmusedLoraLoaderMixin 类
                "StableDiffusionLoraLoaderMixin",  # 包含 StableDiffusionLoraLoaderMixin 类
                "SD3LoraLoaderMixin",  # 包含 SD3LoraLoaderMixin 类
                "StableDiffusionXLLoraLoaderMixin",  # 包含 StableDiffusionXLLoraLoaderMixin 类
                "LoraLoaderMixin",  # 包含 LoraLoaderMixin 类
                "FluxLoraLoaderMixin",  # 包含 FluxLoraLoaderMixin 类
            ]
            # 将 "textual_inversion" 模块的导入结构更新为包含 TextualInversionLoaderMixin 类
            _import_structure["textual_inversion"] = ["TextualInversionLoaderMixin"]
            # 将 "ip_adapter" 模块的导入结构更新为包含 IPAdapterMixin 类
            _import_structure["ip_adapter"] = ["IPAdapterMixin"]
# 将 "peft" 模块的结构信息映射到包含的类
_import_structure["peft"] = ["PeftAdapterMixin"]

# 检查是否在类型检查模式或慢导入标志被设置
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
    # 检查 PyTorch 是否可用
    if is_torch_available():
        # 从单文件模型导入混合类
        from .single_file_model import FromOriginalModelMixin
        # 从 UNet 导入混合类
        from .unet import UNet2DConditionLoadersMixin
        # 从工具模块导入 Attention 处理层类
        from .utils import AttnProcsLayers

        # 检查 Transformers 是否可用
        if is_transformers_available():
            # 从 IP 适配器模块导入混合类
            from .ip_adapter import IPAdapterMixin
            # 从 Lora 管道模块导入多个加载器混合类
            from .lora_pipeline import (
                AmusedLoraLoaderMixin,
                FluxLoraLoaderMixin,
                LoraLoaderMixin,
                SD3LoraLoaderMixin,
                StableDiffusionLoraLoaderMixin,
                StableDiffusionXLLoraLoaderMixin,
            )
            # 从单文件模块导入混合类
            from .single_file import FromSingleFileMixin
            # 从文本反演模块导入混合类
            from .textual_inversion import TextualInversionLoaderMixin

    # 从 PEFT 模块导入 PeftAdapterMixin 类
    from .peft import PeftAdapterMixin
else:
    # 如果不在类型检查或慢导入状态,导入系统模块
    import sys

    # 使用懒加载模块,将当前模块替换为 _LazyModule 实例
    sys.modules[__name__] = _LazyModule(__name__, globals()["__file__"], _import_structure, module_spec=__spec__)

.\diffusers\models\activations.py

# coding=utf-8  # 指定文件编码为 UTF-8
# Copyright 2024 HuggingFace Inc.  # 版权声明
#
# Licensed under the Apache License, Version 2.0 (the "License");  # 许可声明
# you may not use this file except in compliance with the License.  # 使用文件的合规性声明
# You may obtain a copy of the License at  # 许可证获取说明
#
#     http://www.apache.org/licenses/LICENSE-2.0  # 许可证的 URL
#
# Unless required by applicable law or agreed to in writing, software  # 免责声明
# distributed under the License is distributed on an "AS IS" BASIS,  # 以“现状”基础分发
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  # 不提供任何明示或暗示的保证
# See the License for the specific language governing permissions and  # 查看许可证了解权限
# limitations under the License.  # 许可证下的限制条款

import torch  # 导入 PyTorch 库
import torch.nn.functional as F  # 导入 PyTorch 的功能性 API
from torch import nn  # 从 PyTorch 导入神经网络模块

from ..utils import deprecate  # 从 utils 导入 deprecate 方法
from ..utils.import_utils import is_torch_npu_available  # 从 utils 导入检查 NPU 可用性的函数


if is_torch_npu_available():  # 如果 NPU 可用
    import torch_npu  # 导入 NPU 库

# 定义一个字典,映射激活函数名称到相应的 PyTorch 激活函数
ACTIVATION_FUNCTIONS = {
    "swish": nn.SiLU(),  # Swish 激活函数
    "silu": nn.SiLU(),  # SiLU 激活函数
    "mish": nn.Mish(),  # Mish 激活函数
    "gelu": nn.GELU(),  # GELU 激活函数
    "relu": nn.ReLU(),  # ReLU 激活函数
}

# 获取激活函数的帮助函数
def get_activation(act_fn: str) -> nn.Module:  # 定义函数,接受激活函数名称
    """Helper function to get activation function from string.  # 文档字符串,说明功能

    Args:  # 参数说明
        act_fn (str): Name of activation function.  # 激活函数名称

    Returns:  # 返回值说明
        nn.Module: Activation function.  # 返回对应的激活函数模块
    """

    act_fn = act_fn.lower()  # 将激活函数名称转换为小写
    if act_fn in ACTIVATION_FUNCTIONS:  # 如果激活函数在字典中
        return ACTIVATION_FUNCTIONS[act_fn]  # 返回对应的激活函数
    else:  # 否则
        raise ValueError(f"Unsupported activation function: {act_fn}")  # 抛出不支持的激活函数错误


class FP32SiLU(nn.Module):  # 定义 FP32SiLU 类,继承自 nn.Module
    r"""  # 文档字符串,描述该类
    SiLU activation function with input upcasted to torch.float32.  # SiLU 激活函数,输入转换为 float32
    """

    def __init__(self):  # 初始化方法
        super().__init__()  # 调用父类构造函数

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:  # 定义前向传播方法
        return F.silu(inputs.float(), inplace=False).to(inputs.dtype)  # 将输入转换为 float32,计算 SiLU,返回原数据类型


class GELU(nn.Module):  # 定义 GELU 类,继承自 nn.Module
    r"""  # 文档字符串,描述该类
    GELU activation function with tanh approximation support with `approximate="tanh"`.  # GELU 激活函数,支持 tanh 近似

    Parameters:  # 参数说明
        dim_in (`int`): The number of channels in the input.  # 输入通道数
        dim_out (`int`): The number of channels in the output.  # 输出通道数
        approximate (`str`, *optional*, defaults to `"none"`): If `"tanh"`, use tanh approximation.  # 是否使用 tanh 近似
        bias (`bool`, defaults to True): Whether to use a bias in the linear layer.  # 是否在线性层中使用偏置
    """

    def __init__(self, dim_in: int, dim_out: int, approximate: str = "none", bias: bool = True):  # 初始化方法
        super().__init__()  # 调用父类构造函数
        self.proj = nn.Linear(dim_in, dim_out, bias=bias)  # 创建线性层
        self.approximate = approximate  # 设置近似参数

    def gelu(self, gate: torch.Tensor) -> torch.Tensor:  # 定义 GELU 方法
        if gate.device.type != "mps":  # 如果设备不是 MPS
            return F.gelu(gate, approximate=self.approximate)  # 计算并返回 GELU
        # mps: gelu is not implemented for float16  # 对于 MPS,float16 不支持 GELU
        return F.gelu(gate.to(dtype=torch.float32), approximate=self.approximate).to(dtype=gate.dtype)  # 转换为 float32 计算 GELU,返回原数据类型

    def forward(self, hidden_states):  # 定义前向传播方法
        hidden_states = self.proj(hidden_states)  # 通过线性层处理隐藏状态
        hidden_states = self.gelu(hidden_states)  # 计算 GELU 激活
        return hidden_states  # 返回激活后的隐藏状态


class GEGLU(nn.Module):  # 定义 GEGLU 类,继承自 nn.Module
    r"""  # 文档字符串,描述该类
    A [variant](https://arxiv.org/abs/2002.05202) of the gated linear unit activation function.  # GEGLU 激活函数的变种
    # 参数说明
    Parameters:
        dim_in (`int`): 输入通道的数量。
        dim_out (`int`): 输出通道的数量。
        bias (`bool`, defaults to True): 是否在线性层中使用偏置。

    # 初始化方法,设置输入和输出通道,及偏置选项
    def __init__(self, dim_in: int, dim_out: int, bias: bool = True):
        # 调用父类的初始化方法
        super().__init__()
        # 创建一个线性层,输入通道为 dim_in,输出通道为 dim_out * 2
        self.proj = nn.Linear(dim_in, dim_out * 2, bias=bias)

    # GELU 激活函数的实现
    def gelu(self, gate: torch.Tensor) -> torch.Tensor:
        # 检查当前设备类型是否为 MPS
        if gate.device.type != "mps":
            # 如果不是 MPS,直接返回 GELU 的计算结果
            return F.gelu(gate)
        # 对于 MPS:GELU 未对 float16 实现
        # 将 gate 转换为 float32 计算 GELU,然后再转换回原始数据类型
        return F.gelu(gate.to(dtype=torch.float32)).to(dtype=gate.dtype)

    # 前向传播方法
    def forward(self, hidden_states, *args, **kwargs):
        # 如果传入额外参数或 kwargs 中包含 scale,给出弃用提示
        if len(args) > 0 or kwargs.get("scale", None) is not None:
            # 弃用提示信息,告知用户 scale 参数将被忽略
            deprecation_message = "The `scale` argument is deprecated and will be ignored. Please remove it, as passing it will raise an error in the future. `scale` should directly be passed while calling the underlying pipeline component i.e., via `cross_attention_kwargs`."
            # 调用弃用函数,显示警告
            deprecate("scale", "1.0.0", deprecation_message)
        # 将隐藏状态通过线性层进行变换
        hidden_states = self.proj(hidden_states)
        # 检查是否可用 NPU
        if is_torch_npu_available():
            # 使用 torch_npu.npu_geglu 可以在 NPU 上更快且节省内存
            return torch_npu.npu_geglu(hidden_states, dim=-1, approximate=1)[0]
        else:
            # 将隐藏状态分为两部分:hidden_states 和 gate
            hidden_states, gate = hidden_states.chunk(2, dim=-1)
            # 返回 hidden_states 与 gate 的 GELU 结果的乘积
            return hidden_states * self.gelu(gate)
# 定义一个名为 SwiGLU 的类,继承自 nn.Module
class SwiGLU(nn.Module):
    r"""
    A [variant](https://arxiv.org/abs/2002.05202) of the gated linear unit activation function. It's similar to `GEGLU`
    but uses SiLU / Swish instead of GeLU.

    Parameters:
        dim_in (`int`): The number of channels in the input.
        dim_out (`int`): The number of channels in the output.
        bias (`bool`, defaults to True): Whether to use a bias in the linear layer.
    """

    # 初始化方法,接受输入和输出的维度及偏置参数
    def __init__(self, dim_in: int, dim_out: int, bias: bool = True):
        # 调用父类构造函数
        super().__init__()
        # 定义一个线性层,将输入通道映射到输出通道的两倍
        self.proj = nn.Linear(dim_in, dim_out * 2, bias=bias)
        # 使用 SiLU 激活函数
        self.activation = nn.SiLU()

    # 前向传播方法
    def forward(self, hidden_states):
        # 通过线性层处理输入
        hidden_states = self.proj(hidden_states)
        # 将处理后的输出拆分为两部分
        hidden_states, gate = hidden_states.chunk(2, dim=-1)
        # 返回激活后的输出和门控的乘积
        return hidden_states * self.activation(gate)


# 定义一个名为 ApproximateGELU 的类,继承自 nn.Module
class ApproximateGELU(nn.Module):
    r"""
    The approximate form of the Gaussian Error Linear Unit (GELU). For more details, see section 2 of this
    [paper](https://arxiv.org/abs/1606.08415).

    Parameters:
        dim_in (`int`): The number of channels in the input.
        dim_out (`int`): The number of channels in the output.
        bias (`bool`, defaults to True): Whether to use a bias in the linear layer.
    """

    # 初始化方法,接受输入和输出的维度及偏置参数
    def __init__(self, dim_in: int, dim_out: int, bias: bool = True):
        # 调用父类构造函数
        super().__init__()
        # 定义一个线性层,将输入通道映射到输出通道
        self.proj = nn.Linear(dim_in, dim_out, bias=bias)

    # 前向传播方法
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # 通过线性层处理输入
        x = self.proj(x)
        # 返回经过 sigmoid 函数调节后的输出
        return x * torch.sigmoid(1.702 * x)

.\diffusers\models\adapter.py

# 版权所有 2022 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版("许可证")进行许可;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,依据许可证分发的软件
# 是以“原样”基础提供的,不附带任何形式的担保或条件,
# 无论是明示或暗示的。
# 有关许可证下特定权限和限制的具体信息,请参见许可证。
import os  # 导入操作系统模块以处理文件和目录
from typing import Callable, List, Optional, Union  # 导入类型注解以增强代码可读性

import torch  # 导入 PyTorch 库以进行张量运算和深度学习
import torch.nn as nn  # 导入 PyTorch 的神经网络模块以构建模型

from ..configuration_utils import ConfigMixin, register_to_config  # 从父目录导入配置相关工具
from ..utils import logging  # 从父目录导入日志记录工具
from .modeling_utils import ModelMixin  # 从当前目录导入模型混合工具

logger = logging.get_logger(__name__)  # 创建一个记录器实例,用于日志记录

class MultiAdapter(ModelMixin):  # 定义 MultiAdapter 类,继承自 ModelMixin
    r"""  # 类文档字符串,描述类的功能和用途
    MultiAdapter 是一个包装模型,包含多个适配器模型,并根据
    用户分配的权重合并它们的输出。

    该模型继承自 [`ModelMixin`]。有关库实现的所有模型的通用方法的文档
    (例如下载或保存等),请查看超类文档。

    参数:
        adapters (`List[T2IAdapter]`, *可选*, 默认为 None):
            一个 `T2IAdapter` 模型实例的列表。
    """
    # 初始化 MultiAdapter 类,接受一组适配器
        def __init__(self, adapters: List["T2IAdapter"]):
            # 调用父类的初始化方法
            super(MultiAdapter, self).__init__()
    
            # 计算适配器的数量
            self.num_adapter = len(adapters)
            # 将适配器列表转换为 PyTorch 的 ModuleList
            self.adapters = nn.ModuleList(adapters)
    
            # 检查适配器数量,至少需要一个
            if len(adapters) == 0:
                raise ValueError("Expecting at least one adapter")
    
            # 检查适配器数量,如果只有一个,建议使用 T2IAdapter
            if len(adapters) == 1:
                raise ValueError("For a single adapter, please use the `T2IAdapter` class instead of `MultiAdapter`")
    
            # 获取第一个适配器的总缩放因子
            first_adapter_total_downscale_factor = adapters[0].total_downscale_factor
            # 获取第一个适配器的缩放因子
            first_adapter_downscale_factor = adapters[0].downscale_factor
            # 遍历剩余的适配器,检查它们的缩放因子
            for idx in range(1, len(adapters)):
                if (
                    adapters[idx].total_downscale_factor != first_adapter_total_downscale_factor
                    or adapters[idx].downscale_factor != first_adapter_downscale_factor
                ):
                    # 如果缩放因子不一致,抛出错误
                    raise ValueError(
                        f"Expecting all adapters to have the same downscaling behavior, but got:\n"
                        f"adapters[0].total_downscale_factor={first_adapter_total_downscale_factor}\n"
                        f"adapters[0].downscale_factor={first_adapter_downscale_factor}\n"
                        f"adapter[`{idx}`].total_downscale_factor={adapters[idx].total_downscale_factor}\n"
                        f"adapter[`{idx}`].downscale_factor={adapters[idx].downscale_factor}"
                    )
    
            # 设置 MultiAdapter 的总缩放因子
            self.total_downscale_factor = first_adapter_total_downscale_factor
            # 设置 MultiAdapter 的缩放因子
            self.downscale_factor = first_adapter_downscale_factor
    # 定义前向传播方法,接受输入张量和可选的适配器权重
    def forward(self, xs: torch.Tensor, adapter_weights: Optional[List[float]] = None) -> List[torch.Tensor]:
        r"""
        Args:
            xs (`torch.Tensor`):
                (batch, channel, height, width) 输入的图像张量,多个适配器模型沿维度 1 连接,
                `channel` 应等于 `num_adapter` * "图像的通道数"。
            adapter_weights (`List[float]`, *optional*, defaults to None):
                表示在将每个适配器的输出相加之前,要乘以的权重的浮点数列表。
        """
        # 如果没有提供适配器权重,则初始化为每个适配器权重相等的张量
        if adapter_weights is None:
            adapter_weights = torch.tensor([1 / self.num_adapter] * self.num_adapter)
        else:
            # 将提供的适配器权重转换为张量
            adapter_weights = torch.tensor(adapter_weights)

        # 初始化累计状态为 None,用于存储加权特征
        accume_state = None
        # 遍历输入张量、适配器权重和适配器模型
        for x, w, adapter in zip(xs, adapter_weights, self.adapters):
            # 使用适配器模型处理输入张量以提取特征
            features = adapter(x)
            # 如果累计状态为空,初始化它
            if accume_state is None:
                accume_state = features
                # 根据当前适配器的权重调整累计状态的特征
                for i in range(len(accume_state)):
                    accume_state[i] = w * accume_state[i]
            else:
                # 如果累计状态已经存在,将新特征加到累计状态中
                for i in range(len(features)):
                    accume_state[i] += w * features[i]
        # 返回加权后的累计状态
        return accume_state

    # 定义保存预训练模型的方法,接收多个参数
    def save_pretrained(
        self,
        save_directory: Union[str, os.PathLike],
        is_main_process: bool = True,
        save_function: Callable = None,
        safe_serialization: bool = True,
        variant: Optional[str] = None,
    ):
        """
        保存模型及其配置文件到指定目录,以便后续通过 `[`~models.adapter.MultiAdapter.from_pretrained`]` 类方法重新加载。

        参数:
            save_directory (`str` 或 `os.PathLike`):
                要保存的目录。如果目录不存在,则会创建。
            is_main_process (`bool`, *可选*, 默认为 `True`):
                调用此函数的进程是否为主进程。在分布式训练(如 TPU)中有用,仅在主进程上设置 `is_main_process=True` 以避免竞争条件。
            save_function (`Callable`):
                用于保存状态字典的函数。在分布式训练(如 TPU)时,可以用其他方法替换 `torch.save`。可通过环境变量 `DIFFUSERS_SAVE_MODE` 配置。
            safe_serialization (`bool`, *可选*, 默认为 `True`):
                是否使用 `safetensors` 保存模型,或使用传统的 PyTorch 方法(使用 `pickle`)。
            variant (`str`, *可选*):
                如果指定,权重将以 pytorch_model.<variant>.bin 格式保存。
        """
        # 初始化索引为 0
        idx = 0
        # 设置保存模型的路径
        model_path_to_save = save_directory
        # 遍历所有适配器
        for adapter in self.adapters:
            # 调用适配器的保存方法,将模型及其配置保存到指定路径
            adapter.save_pretrained(
                model_path_to_save,
                is_main_process=is_main_process,
                save_function=save_function,
                safe_serialization=safe_serialization,
                variant=variant,
            )

            # 索引加一,用于下一个模型路径
            idx += 1
            # 更新模型保存路径,添加索引
            model_path_to_save = model_path_to_save + f"_{idx}"

    # 定义类方法装饰器
    @classmethod
# 定义一个 T2IAdapter 类,继承自 ModelMixin 和 ConfigMixin
class T2IAdapter(ModelMixin, ConfigMixin):
    r"""
    一个简单的类似 ResNet 的模型,接受包含控制信号(如关键姿态和深度)的图像。该模型
    生成多个特征图,作为 [`UNet2DConditionModel`] 的额外条件。模型的架构遵循
    [Adapter](https://github.com/TencentARC/T2I-Adapter/blob/686de4681515662c0ac2ffa07bf5dda83af1038a/ldm/modules/encoders/adapter.py#L97)
    和
    [AdapterLight](https://github.com/TencentARC/T2I-Adapter/blob/686de4681515662c0ac2ffa07bf5dda83af1038a/ldm/modules/encoders/adapter.py#L235) 的原始实现。

    该模型继承自 [`ModelMixin`]。有关库为所有模型实现的通用方法(如下载或保存等),请查看超类文档。

    参数:
        in_channels (`int`, *可选*, 默认为 3):
            Adapter 输入的通道数(*控制图像*)。如果使用灰度图像作为 *控制图像*,请将此参数设置为 1。
        channels (`List[int]`, *可选*, 默认为 `(320, 640, 1280, 1280)`):
            每个下采样块输出隐藏状态的通道数。`len(block_out_channels)` 还将决定 Adapter 中下采样块的数量。
        num_res_blocks (`int`, *可选*, 默认为 2):
            每个下采样块中的 ResNet 块数。
        downscale_factor (`int`, *可选*, 默认为 8):
            决定 Adapter 总体下采样因子的因素。
        adapter_type (`str`, *可选*, 默认为 `full_adapter`):
            要使用的 Adapter 类型。选择 `full_adapter`、`full_adapter_xl` 或 `light_adapter`。
    """

    # 注册初始化函数到配置中
    @register_to_config
    def __init__(
        self,
        in_channels: int = 3,  # 设置输入通道数,默认为 3
        channels: List[int] = [320, 640, 1280, 1280],  # 设置每个下采样块的输出通道数,默认为给定列表
        num_res_blocks: int = 2,  # 设置每个下采样块中的 ResNet 块数,默认为 2
        downscale_factor: int = 8,  # 设置下采样因子,默认为 8
        adapter_type: str = "full_adapter",  # 设置 Adapter 类型,默认为 'full_adapter'
    ):
        super().__init__()  # 调用父类的初始化方法

        # 根据 adapter_type 的值实例化相应的 Adapter
        if adapter_type == "full_adapter":
            self.adapter = FullAdapter(in_channels, channels, num_res_blocks, downscale_factor)  # 实例化 FullAdapter
        elif adapter_type == "full_adapter_xl":
            self.adapter = FullAdapterXL(in_channels, channels, num_res_blocks, downscale_factor)  # 实例化 FullAdapterXL
        elif adapter_type == "light_adapter":
            self.adapter = LightAdapter(in_channels, channels, num_res_blocks, downscale_factor)  # 实例化 LightAdapter
        else:
            raise ValueError(  # 如果 adapter_type 不合法,抛出异常
                f"Unsupported adapter_type: '{adapter_type}'. Choose either 'full_adapter' or "
                "'full_adapter_xl' or 'light_adapter'."  # 提示支持的 Adapter 类型
            )
    # 定义前向传播函数,接收一个张量 x,并返回特征张量列表
    def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
        # 文档字符串,描述该函数的功能和输出
        r"""
        该函数通过适配器模型处理输入张量 `x`,并返回特征张量列表,
        每个张量表示从输入中提取的不同尺度的信息。列表的长度由
        在初始化时指定的 `channels` 和 `num_res_blocks` 参数中的
        下采样块数量决定。
        """
        # 调用适配器的前向方法,处理输入张量并返回结果
        return self.adapter(x)
    
    # 定义属性 total_downscale_factor,返回适配器的总下采样因子
    @property
    def total_downscale_factor(self):
        # 返回适配器的总下采样因子
        return self.adapter.total_downscale_factor
    
    # 定义属性 downscale_factor,表示初始像素无序操作中的下采样因子
    @property
    def downscale_factor(self):
        # 文档字符串,描述下采样因子的作用和可能的异常情况
        """在 T2I-Adapter 的初始像素无序操作中应用的下采样因子。如果输入图像的维度
        不能被下采样因子整除,则会引发异常。
        """
        # 返回适配器无序操作中的下采样因子
        return self.adapter.unshuffle.downscale_factor
# 全适配器类
class FullAdapter(nn.Module):
    r"""
    详细信息请参见 [`T2IAdapter`]。
    """

    # 初始化方法,设置输入通道、通道列表、残差块数量和下采样因子
    def __init__(
        self,
        in_channels: int = 3,
        channels: List[int] = [320, 640, 1280, 1280],
        num_res_blocks: int = 2,
        downscale_factor: int = 8,
    ):
        # 调用父类初始化方法
        super().__init__()

        # 根据下采样因子计算输入通道数
        in_channels = in_channels * downscale_factor**2

        # 创建像素反混洗层
        self.unshuffle = nn.PixelUnshuffle(downscale_factor)
        # 创建输入卷积层
        self.conv_in = nn.Conv2d(in_channels, channels[0], kernel_size=3, padding=1)

        # 创建适配器块列表
        self.body = nn.ModuleList(
            [
                # 添加第一个适配器块
                AdapterBlock(channels[0], channels[0], num_res_blocks),
                # 添加后续适配器块,带下采样
                *[
                    AdapterBlock(channels[i - 1], channels[i], num_res_blocks, down=True)
                    for i in range(1, len(channels))
                ],
            ]
        )

        # 计算总的下采样因子
        self.total_downscale_factor = downscale_factor * 2 ** (len(channels) - 1)

    # 前向传播方法
    def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
        r"""
        该方法通过 FullAdapter 模型处理输入张量 `x`,执行像素反混洗、卷积和适配器块堆栈的操作。
        返回一个特征张量列表,每个张量在处理的不同阶段捕获信息。特征张量的数量由初始化时指定的下采样块数量决定。
        """
        # 反混洗输入张量
        x = self.unshuffle(x)
        # 通过输入卷积层处理
        x = self.conv_in(x)

        # 初始化特征列表
        features = []

        # 遍历适配器块并处理输入
        for block in self.body:
            x = block(x)
            # 将特征添加到列表中
            features.append(x)

        # 返回特征列表
        return features


# 全适配器 XL 类
class FullAdapterXL(nn.Module):
    r"""
    详细信息请参见 [`T2IAdapter`]。
    """

    # 初始化方法,设置输入通道、通道列表、残差块数量和下采样因子
    def __init__(
        self,
        in_channels: int = 3,
        channels: List[int] = [320, 640, 1280, 1280],
        num_res_blocks: int = 2,
        downscale_factor: int = 16,
    ):
        # 调用父类初始化方法
        super().__init__()

        # 根据下采样因子计算输入通道数
        in_channels = in_channels * downscale_factor**2

        # 创建像素反混洗层
        self.unshuffle = nn.PixelUnshuffle(downscale_factor)
        # 创建输入卷积层
        self.conv_in = nn.Conv2d(in_channels, channels[0], kernel_size=3, padding=1)

        # 初始化适配器块列表
        self.body = []
        # 遍历通道列表,创建适配器块
        for i in range(len(channels)):
            if i == 1:
                # 为第二个通道添加适配器块
                self.body.append(AdapterBlock(channels[i - 1], channels[i], num_res_blocks))
            elif i == 2:
                # 为第三个通道添加带下采样的适配器块
                self.body.append(AdapterBlock(channels[i - 1], channels[i], num_res_blocks, down=True))
            else:
                # 为其他通道添加适配器块
                self.body.append(AdapterBlock(channels[i], channels[i], num_res_blocks))

        # 将适配器块列表转换为 ModuleList
        self.body = nn.ModuleList(self.body)
        # XL 只有一个下采样适配器块
        self.total_downscale_factor = downscale_factor * 2
    # 定义一个前向传播方法,输入为张量 x,返回特征张量的列表
    def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
        r"""
        该方法接受张量 x 作为输入,并通过 FullAdapterXL 模型处理。包括像素解混淆、应用卷积层,并将每个块追加到特征张量列表中。
        """
        # 对输入张量进行像素解混淆操作
        x = self.unshuffle(x)
        # 将解混淆后的张量通过输入卷积层
        x = self.conv_in(x)
    
        # 初始化一个空列表以存储特征张量
        features = []
    
        # 遍历模型主体中的每个块
        for block in self.body:
            # 将当前张量通过块处理
            x = block(x)
            # 将处理后的张量追加到特征列表中
            features.append(x)
    
        # 返回特征张量的列表
        return features
# AdapterBlock 类是一个辅助模型,包含多个类似 ResNet 的模块,用于 FullAdapter 和 FullAdapterXL 模型
class AdapterBlock(nn.Module):
    r"""
    AdapterBlock 是一个包含多个 ResNet 样式块的辅助模型。它在 `FullAdapter` 和
    `FullAdapterXL` 模型中使用。

    参数:
        in_channels (`int`):
            AdapterBlock 输入的通道数。
        out_channels (`int`):
            AdapterBlock 输出的通道数。
        num_res_blocks (`int`):
            AdapterBlock 中 ResNet 块的数量。
        down (`bool`, *可选*, 默认为 `False`):
            是否对 AdapterBlock 的输入进行下采样。
    """

    # 初始化 AdapterBlock 类,接收输入输出通道数及 ResNet 块数量
    def __init__(self, in_channels: int, out_channels: int, num_res_blocks: int, down: bool = False):
        super().__init__()  # 调用父类的构造函数

        self.downsample = None  # 初始化下采样层为 None
        if down:  # 如果需要下采样
            self.downsample = nn.AvgPool2d(kernel_size=2, stride=2, ceil_mode=True)  # 创建平均池化层

        self.in_conv = None  # 初始化输入卷积层为 None
        if in_channels != out_channels:  # 如果输入通道与输出通道不相等
            self.in_conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)  # 创建 1x1 卷积层

        # 创建一系列的 ResNet 块,数量由 num_res_blocks 指定
        self.resnets = nn.Sequential(
            *[AdapterResnetBlock(out_channels) for _ in range(num_res_blocks)],
        )

    # 定义前向传播方法
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        r"""
        此方法接收张量 x 作为输入,执行下采样和卷积操作(如果指定了 self.downsample 和 self.in_conv)。
        然后,它对输入张量应用一系列残差块。
        """
        if self.downsample is not None:  # 如果存在下采样层
            x = self.downsample(x)  # 执行下采样操作

        if self.in_conv is not None:  # 如果存在输入卷积层
            x = self.in_conv(x)  # 执行卷积操作

        x = self.resnets(x)  # 将输入传递通过一系列 ResNet 块

        return x  # 返回处理后的张量


# AdapterResnetBlock 类是一个实现 ResNet 样式块的辅助模型
class AdapterResnetBlock(nn.Module):
    r"""
    `AdapterResnetBlock` 是一个实现 ResNet 样式块的辅助模型。

    参数:
        channels (`int`):
            AdapterResnetBlock 输入和输出的通道数。
    """

    # 初始化 AdapterResnetBlock 类,接收通道数
    def __init__(self, channels: int):
        super().__init__()  # 调用父类的构造函数
        self.block1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)  # 创建 3x3 卷积层
        self.act = nn.ReLU()  # 创建 ReLU 激活函数
        self.block2 = nn.Conv2d(channels, channels, kernel_size=1)  # 创建 1x1 卷积层

    # 定义前向传播方法
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        r"""
        此方法接收输入张量 x,并对其应用卷积层、ReLU 激活和另一个卷积层。返回与输入张量的相加结果。
        """

        h = self.act(self.block1(x))  # 先通过第一个卷积层并应用激活函数
        h = self.block2(h)  # 再通过第二个卷积层

        return h + x  # 返回卷积结果与输入的和


# LightAdapter 类是一个轻量适配器模型
class LightAdapter(nn.Module):
    r"""
    有关更多信息,请参阅 [`T2IAdapter`]。
    """

    # 初始化 LightAdapter 类,设置输入通道数、通道列表、ResNet 块数量及下采样因子
    def __init__(
        self,
        in_channels: int = 3,
        channels: List[int] = [320, 640, 1280],
        num_res_blocks: int = 4,
        downscale_factor: int = 8,
    # 初始化方法
        ):
            # 调用父类的初始化方法
            super().__init__()
    
            # 计算输入通道数,考虑下采样因子
            in_channels = in_channels * downscale_factor**2
    
            # 初始化像素反shuffle操作,依据下采样因子
            self.unshuffle = nn.PixelUnshuffle(downscale_factor)
    
            # 创建一个模块列表,包含多个 LightAdapterBlock
            self.body = nn.ModuleList(
                [
                    # 第一个 LightAdapterBlock,处理输入通道到第一个输出通道
                    LightAdapterBlock(in_channels, channels[0], num_res_blocks),
                    # 使用列表推导创建后续的 LightAdapterBlock,处理每对通道
                    *[
                        LightAdapterBlock(channels[i], channels[i + 1], num_res_blocks, down=True)
                        for i in range(len(channels) - 1)
                    ],
                    # 最后一个 LightAdapterBlock,处理最后一组通道
                    LightAdapterBlock(channels[-1], channels[-1], num_res_blocks, down=True),
                ]
            )
    
            # 计算总下采样因子
            self.total_downscale_factor = downscale_factor * (2 ** len(channels))
    
        # 前向传播方法
        def forward(self, x: torch.Tensor) -> List[torch.Tensor]:
            r"""
            此方法接收输入张量 x,进行下采样,并将结果存入特征张量列表中。每个特征张量对应于 LightAdapter 中的不同处理级别。
            """
            # 对输入张量进行反shuffle处理
            x = self.unshuffle(x)
    
            # 初始化特征列表
            features = []
    
            # 遍历模块列表中的每个块
            for block in self.body:
                # 通过当前块处理输入张量
                x = block(x)
                # 将处理后的张量添加到特征列表中
                features.append(x)
    
            # 返回特征列表
            return features
# LightAdapterBlock 类是一个帮助模型,包含多个 LightAdapterResnetBlocks,用于 LightAdapter 模型中
class LightAdapterBlock(nn.Module):
    r"""
    A `LightAdapterBlock` is a helper model that contains multiple `LightAdapterResnetBlocks`. It is used in the
    `LightAdapter` model.

    Parameters:
        in_channels (`int`):
            Number of channels of LightAdapterBlock's input.
        out_channels (`int`):
            Number of channels of LightAdapterBlock's output.
        num_res_blocks (`int`):
            Number of LightAdapterResnetBlocks in the LightAdapterBlock.
        down (`bool`, *optional*, defaults to `False`):
            Whether to perform downsampling on LightAdapterBlock's input.
    """

    # 初始化方法,接收输入输出通道数、残差块数量和是否下采样的标志
    def __init__(self, in_channels: int, out_channels: int, num_res_blocks: int, down: bool = False):
        super().__init__()  # 调用父类构造函数
        mid_channels = out_channels // 4  # 计算中间通道数

        self.downsample = None  # 初始化下采样层为 None
        if down:  # 如果需要下采样
            # 创建平均池化层,kernel_size为2,步幅为2,向上取整
            self.downsample = nn.AvgPool2d(kernel_size=2, stride=2, ceil_mode=True)

        # 定义输入卷积层,kernel_size为1
        self.in_conv = nn.Conv2d(in_channels, mid_channels, kernel_size=1)
        # 创建残差块序列,数量为 num_res_blocks
        self.resnets = nn.Sequential(*[LightAdapterResnetBlock(mid_channels) for _ in range(num_res_blocks)])
        # 定义输出卷积层,kernel_size为1
        self.out_conv = nn.Conv2d(mid_channels, out_channels, kernel_size=1)

    # 前向传播方法,接收输入张量 x
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        r"""
        This method takes tensor x as input and performs downsampling if required. Then it applies in convolution
        layer, a sequence of residual blocks, and out convolutional layer.
        """
        if self.downsample is not None:  # 如果定义了下采样层
            x = self.downsample(x)  # 对输入 x 进行下采样

        x = self.in_conv(x)  # 通过输入卷积层处理 x
        x = self.resnets(x)  # 通过残差块序列处理 x
        x = self.out_conv(x)  # 通过输出卷积层处理 x

        return x  # 返回处理后的结果


# LightAdapterResnetBlock 类是一个帮助模型,实现类似 ResNet 的块,具有与 AdapterResnetBlock 略微不同的架构
class LightAdapterResnetBlock(nn.Module):
    """
    A `LightAdapterResnetBlock` is a helper model that implements a ResNet-like block with a slightly different
    architecture than `AdapterResnetBlock`.

    Parameters:
        channels (`int`):
            Number of channels of LightAdapterResnetBlock's input and output.
    """

    # 初始化方法,接收通道数
    def __init__(self, channels: int):
        super().__init__()  # 调用父类构造函数
        # 定义第一个卷积层,kernel_size为3,padding为1
        self.block1 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)
        self.act = nn.ReLU()  # 定义 ReLU 激活函数
        # 定义第二个卷积层,kernel_size为3,padding为1
        self.block2 = nn.Conv2d(channels, channels, kernel_size=3, padding=1)

    # 前向传播方法,接收输入张量 x
    def forward(self, x: torch.Tensor) -> torch.Tensor:
        r"""
        This function takes input tensor x and processes it through one convolutional layer, ReLU activation, and
        another convolutional layer and adds it to input tensor.
        """
        h = self.act(self.block1(x))  # 通过第一个卷积层和 ReLU 激活处理 x
        h = self.block2(h)  # 通过第二个卷积层处理 h

        return h + x  # 将处理结果与输入 x 相加并返回

.\diffusers\models\attention.py

# 版权所有 2024 The HuggingFace Team. 保留所有权利。
#
# 根据 Apache 许可证,第 2.0 版(“许可证”)许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下位置获取许可证副本:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件
# 按“原样”分发,不附有任何明示或暗示的担保或条件。
# 请参见许可证以获取有关权限和
# 限制的具体语言。
from typing import Any, Dict, List, Optional, Tuple  # 导入所需的类型注解

import torch  # 导入 PyTorch 库
import torch.nn.functional as F  # 导入 PyTorch 的函数式 API
from torch import nn  # 从 PyTorch 导入神经网络模块

from ..utils import deprecate, logging  # 从上级目录导入工具函数和日志记录模块
from ..utils.torch_utils import maybe_allow_in_graph  # 导入可能允许图形计算的工具
from .activations import GEGLU, GELU, ApproximateGELU, FP32SiLU, SwiGLU  # 导入不同激活函数
from .attention_processor import Attention, JointAttnProcessor2_0  # 导入注意力处理器
from .embeddings import SinusoidalPositionalEmbedding  # 导入正弦位置嵌入
from .normalization import AdaLayerNorm, AdaLayerNormContinuous, AdaLayerNormZero, RMSNorm  # 导入各种归一化方法


logger = logging.get_logger(__name__)  # 创建一个模块级别的日志记录器


def _chunked_feed_forward(ff: nn.Module, hidden_states: torch.Tensor, chunk_dim: int, chunk_size: int):
    # 定义一个函数,按块处理前馈神经网络,以节省内存
    # 检查隐藏状态的维度是否能够被块大小整除
    if hidden_states.shape[chunk_dim] % chunk_size != 0:
        raise ValueError(
            f"`hidden_states` dimension to be chunked: {hidden_states.shape[chunk_dim]} has to be divisible by chunk size: {chunk_size}. Make sure to set an appropriate `chunk_size` when calling `unet.enable_forward_chunking`."
        )  # 如果不能整除,抛出一个值错误,提示块大小设置不正确

    num_chunks = hidden_states.shape[chunk_dim] // chunk_size  # 计算可以生成的块数量
    # 按块处理隐藏状态并将结果拼接成一个张量
    ff_output = torch.cat(
        [ff(hid_slice) for hid_slice in hidden_states.chunk(num_chunks, dim=chunk_dim)],  # 将每个块传入前馈模块进行处理
        dim=chunk_dim,  # 在指定维度上拼接结果
    )
    return ff_output  # 返回拼接后的输出


@maybe_allow_in_graph  # 可能允许在图形计算中使用此类
class GatedSelfAttentionDense(nn.Module):  # 定义一个门控自注意力密集层类,继承自 nn.Module
    r"""  # 类文档字符串,描述类的功能和参数

    A gated self-attention dense layer that combines visual features and object features.

    Parameters:
        query_dim (`int`): The number of channels in the query.  # 查询的通道数
        context_dim (`int`): The number of channels in the context.  # 上下文的通道数
        n_heads (`int`): The number of heads to use for attention.  # 注意力头的数量
        d_head (`int`): The number of channels in each head.  # 每个头的通道数
    """

    def __init__(self, query_dim: int, context_dim: int, n_heads: int, d_head: int):
        super().__init__()  # 调用父类构造函数

        # 因为需要拼接视觉特征和对象特征,所以需要一个线性投影
        self.linear = nn.Linear(context_dim, query_dim)  # 创建一个线性层,用于上下文到查询的映射

        self.attn = Attention(query_dim=query_dim, heads=n_heads, dim_head=d_head)  # 初始化注意力层
        self.ff = FeedForward(query_dim, activation_fn="geglu")  # 初始化前馈层,激活函数为 GEGLU

        self.norm1 = nn.LayerNorm(query_dim)  # 初始化第一个层归一化
        self.norm2 = nn.LayerNorm(query_dim)  # 初始化第二个层归一化

        self.register_parameter("alpha_attn", nn.Parameter(torch.tensor(0.0)))  # 注册注意力参数 alpha_attn
        self.register_parameter("alpha_dense", nn.Parameter(torch.tensor(0.0)))  # 注册密集参数 alpha_dense

        self.enabled = True  # 设置 enabled 属性为 True
    # 前向传播函数,接收输入张量 x 和对象张量 objs,返回处理后的张量
        def forward(self, x: torch.Tensor, objs: torch.Tensor) -> torch.Tensor:
            # 如果未启用该模块,直接返回输入张量 x
            if not self.enabled:
                return x
    
            # 获取输入张量的第二维大小,表示视觉特征的数量
            n_visual = x.shape[1]
            # 通过线性层处理对象张量
            objs = self.linear(objs)
    
            # 将输入张量和处理后的对象张量拼接,进行归一化后计算注意力,并调整输入张量
            x = x + self.alpha_attn.tanh() * self.attn(self.norm1(torch.cat([x, objs], dim=1)))[:, :n_visual, :]
            # 使用另一个归一化层处理 x,并通过前馈网络调整 x
            x = x + self.alpha_dense.tanh() * self.ff(self.norm2(x))
    
            # 返回处理后的张量 x
            return x
# 装饰器,可能允许在计算图中使用此类
@maybe_allow_in_graph
# 定义一个联合变换器块类,继承自 nn.Module
class JointTransformerBlock(nn.Module):
    r"""
    根据 MMDiT 架构定义的变换器块,介绍于 Stable Diffusion 3.

    参考文献: https://arxiv.org/abs/2403.03206

    参数:
        dim (`int`): 输入和输出中的通道数量.
        num_attention_heads (`int`): 用于多头注意力的头数.
        attention_head_dim (`int`): 每个头中的通道数.
        context_pre_only (`bool`): 布尔值,决定是否添加与处理 `context` 条件相关的一些块.
    """

    # 初始化函数,定义参数
    def __init__(self, dim, num_attention_heads, attention_head_dim, context_pre_only=False):
        # 调用父类构造函数
        super().__init__()

        # 记录是否仅处理上下文
        self.context_pre_only = context_pre_only
        # 根据上下文类型设置归一化类型
        context_norm_type = "ada_norm_continous" if context_pre_only else "ada_norm_zero"

        # 创建一个适应性层归一化对象
        self.norm1 = AdaLayerNormZero(dim)

        # 根据上下文归一化类型创建相应的归一化对象
        if context_norm_type == "ada_norm_continous":
            self.norm1_context = AdaLayerNormContinuous(
                dim, dim, elementwise_affine=False, eps=1e-6, bias=True, norm_type="layer_norm"
            )
        elif context_norm_type == "ada_norm_zero":
            self.norm1_context = AdaLayerNormZero(dim)
        else:
            # 如果归一化类型未知,抛出错误
            raise ValueError(
                f"Unknown context_norm_type: {context_norm_type}, currently only support `ada_norm_continous`, `ada_norm_zero`"
            )
        # 检查是否有缩放点积注意力函数
        if hasattr(F, "scaled_dot_product_attention"):
            processor = JointAttnProcessor2_0()  # 使用相应的处理器
        else:
            # 如果不支持,抛出错误
            raise ValueError(
                "The current PyTorch version does not support the `scaled_dot_product_attention` function."
            )
        # 初始化注意力模块
        self.attn = Attention(
            query_dim=dim,
            cross_attention_dim=None,
            added_kv_proj_dim=dim,
            dim_head=attention_head_dim,
            heads=num_attention_heads,
            out_dim=dim,
            context_pre_only=context_pre_only,
            bias=True,
            processor=processor,
        )

        # 创建层归一化对象
        self.norm2 = nn.LayerNorm(dim, elementwise_affine=False, eps=1e-6)
        # 创建前馈网络对象
        self.ff = FeedForward(dim=dim, dim_out=dim, activation_fn="gelu-approximate")

        # 如果不是仅处理上下文,则创建上下文的归一化和前馈网络
        if not context_pre_only:
            self.norm2_context = nn.LayerNorm(dim, elementwise_affine=False, eps=1e-6)
            self.ff_context = FeedForward(dim=dim, dim_out=dim, activation_fn="gelu-approximate")
        else:
            # 如果仅处理上下文,则设置为 None
            self.norm2_context = None
            self.ff_context = None

        # 将块大小默认设置为 None
        self._chunk_size = None
        # 设置块维度为 0
        self._chunk_dim = 0

    # 从基本变换器块复制的方法,设置块的前馈网络
    def set_chunk_feed_forward(self, chunk_size: Optional[int], dim: int = 0):
        # 设置块的前馈网络大小
        self._chunk_size = chunk_size
        # 设置块维度
        self._chunk_dim = dim
    # 定义前向传播函数,接受隐藏状态、编码器隐藏状态和时间嵌入作为输入
        def forward(
            self, hidden_states: torch.FloatTensor, encoder_hidden_states: torch.FloatTensor, temb: torch.FloatTensor
        ):
            # 对隐藏状态进行归一化,并计算门控、多头自注意力和MLP的偏移和缩放值
            norm_hidden_states, gate_msa, shift_mlp, scale_mlp, gate_mlp = self.norm1(hidden_states, emb=temb)
    
            # 判断是否仅使用上下文信息
            if self.context_pre_only:
                # 仅归一化编码器隐藏状态
                norm_encoder_hidden_states = self.norm1_context(encoder_hidden_states, temb)
            else:
                # 对编码器隐藏状态进行归一化,并计算相应的门控和偏移值
                norm_encoder_hidden_states, c_gate_msa, c_shift_mlp, c_scale_mlp, c_gate_mlp = self.norm1_context(
                    encoder_hidden_states, emb=temb
                )
    
            # 进行注意力计算
            attn_output, context_attn_output = self.attn(
                hidden_states=norm_hidden_states, encoder_hidden_states=norm_encoder_hidden_states
            )
    
            # 处理注意力输出以更新隐藏状态
            attn_output = gate_msa.unsqueeze(1) * attn_output  # 应用门控机制
            hidden_states = hidden_states + attn_output  # 更新隐藏状态
    
            # 对隐藏状态进行第二次归一化
            norm_hidden_states = self.norm2(hidden_states)
            # 结合缩放和偏移进行调整
            norm_hidden_states = norm_hidden_states * (1 + scale_mlp[:, None]) + shift_mlp[:, None]
            # 如果设置了分块大小,则进行分块前馈处理以节省内存
            if self._chunk_size is not None:
                ff_output = _chunked_feed_forward(self.ff, norm_hidden_states, self._chunk_dim, self._chunk_size)
            else:
                # 否则直接执行前馈操作
                ff_output = self.ff(norm_hidden_states)
            # 应用门控机制更新前馈输出
            ff_output = gate_mlp.unsqueeze(1) * ff_output
    
            # 更新隐藏状态
            hidden_states = hidden_states + ff_output
    
            # 处理编码器隐藏状态的注意力输出
            if self.context_pre_only:
                # 如果仅使用上下文,则编码器隐藏状态设为 None
                encoder_hidden_states = None
            else:
                # 应用门控机制更新上下文注意力输出
                context_attn_output = c_gate_msa.unsqueeze(1) * context_attn_output
                encoder_hidden_states = encoder_hidden_states + context_attn_output  # 更新编码器隐藏状态
    
                # 对编码器隐藏状态进行第二次归一化
                norm_encoder_hidden_states = self.norm2_context(encoder_hidden_states)
                # 结合缩放和偏移进行调整
                norm_encoder_hidden_states = norm_encoder_hidden_states * (1 + c_scale_mlp[:, None]) + c_shift_mlp[:, None]
                # 如果设置了分块大小,则进行分块前馈处理以节省内存
                if self._chunk_size is not None:
                    context_ff_output = _chunked_feed_forward(
                        self.ff_context, norm_encoder_hidden_states, self._chunk_dim, self._chunk_size
                    )
                else:
                    # 否则直接执行前馈操作
                    context_ff_output = self.ff_context(norm_encoder_hidden_states)
                # 应用门控机制更新编码器隐藏状态
                encoder_hidden_states = encoder_hidden_states + c_gate_mlp.unsqueeze(1) * context_ff_output
    
            # 返回更新后的编码器隐藏状态和隐藏状态
            return encoder_hidden_states, hidden_states
# 装饰器,可能允许在计算图中使用此类
@maybe_allow_in_graph
# 定义一个基本的 Transformer 块,继承自 nn.Module
class BasicTransformerBlock(nn.Module):
    r"""
    一个基本的 Transformer 块。

    参数:
        dim (`int`): 输入和输出中的通道数。
        num_attention_heads (`int`): 用于多头注意力的头数。
        attention_head_dim (`int`): 每个头的通道数。
        dropout (`float`, *可选*, 默认为 0.0): 使用的丢弃概率。
        cross_attention_dim (`int`, *可选*): 用于交叉注意力的 encoder_hidden_states 向量的大小。
        activation_fn (`str`, *可选*, 默认为 `"geglu"`): 在前馈中使用的激活函数。
        num_embeds_ada_norm (:
            obj: `int`, *可选*): 在训练期间使用的扩散步骤数量。参见 `Transformer2DModel`。
        attention_bias (:
            obj: `bool`, *可选*, 默认为 `False`): 配置注意力是否应该包含偏置参数。
        only_cross_attention (`bool`, *可选*):
            是否仅使用交叉注意力层。在这种情况下使用两个交叉注意力层。
        double_self_attention (`bool`, *可选*):
            是否使用两个自注意力层。在这种情况下不使用交叉注意力层。
        upcast_attention (`bool`, *可选*):
            是否将注意力计算上溯到 float32。这对于混合精度训练很有用。
        norm_elementwise_affine (`bool`, *可选*, 默认为 `True`):
            是否为归一化使用可学习的逐元素仿射参数。
        norm_type (`str`, *可选*, 默认为 `"layer_norm"`):
            要使用的归一化层。可以是 `"layer_norm"`、`"ada_norm"` 或 `"ada_norm_zero"`。
        final_dropout (`bool`, *可选*, 默认为 False):
            是否在最后的前馈层之后应用最终的丢弃。
        attention_type (`str`, *可选*, 默认为 `"default"`):
            要使用的注意力类型。可以是 `"default"`、`"gated"` 或 `"gated-text-image"`。
        positional_embeddings (`str`, *可选*, 默认为 `None`):
            要应用的位置嵌入的类型。
        num_positional_embeddings (`int`, *可选*, 默认为 `None`):
            要应用的最大位置嵌入数量。
    """
    # 初始化方法,设置模型参数
        def __init__(
            self,
            dim: int,  # 模型维度
            num_attention_heads: int,  # 注意力头的数量
            attention_head_dim: int,  # 每个注意力头的维度
            dropout=0.0,  # dropout 概率
            cross_attention_dim: Optional[int] = None,  # 交叉注意力维度,默认为 None
            activation_fn: str = "geglu",  # 激活函数类型,默认为 'geglu'
            num_embeds_ada_norm: Optional[int] = None,  # 自适应规范化的嵌入数量
            attention_bias: bool = False,  # 是否使用注意力偏置
            only_cross_attention: bool = False,  # 是否仅使用交叉注意力
            double_self_attention: bool = False,  # 是否双重自注意力
            upcast_attention: bool = False,  # 是否提升注意力精度
            norm_elementwise_affine: bool = True,  # 是否进行逐元素仿射规范化
            norm_type: str = "layer_norm",  # 规范化类型,支持多种类型
            norm_eps: float = 1e-5,  # 规范化的 epsilon 值
            final_dropout: bool = False,  # 最终层的 dropout 开关
            attention_type: str = "default",  # 注意力机制类型,默认为 'default'
            positional_embeddings: Optional[str] = None,  # 位置嵌入的类型
            num_positional_embeddings: Optional[int] = None,  # 位置嵌入的数量
            ada_norm_continous_conditioning_embedding_dim: Optional[int] = None,  # 自适应规范化连续条件嵌入维度
            ada_norm_bias: Optional[int] = None,  # 自适应规范化偏置
            ff_inner_dim: Optional[int] = None,  # 前馈网络的内部维度
            ff_bias: bool = True,  # 前馈网络是否使用偏置
            attention_out_bias: bool = True,  # 输出注意力是否使用偏置
        def set_chunk_feed_forward(self, chunk_size: Optional[int], dim: int = 0):  # 设置分块前馈网络的方法
            # 设置分块前馈网络的大小和维度
            self._chunk_size = chunk_size  # 存储分块大小
            self._chunk_dim = dim  # 存储维度
    
        def forward(  # 前向传播方法
            self,
            hidden_states: torch.Tensor,  # 输入的隐藏状态
            attention_mask: Optional[torch.Tensor] = None,  # 注意力掩码,默认为 None
            encoder_hidden_states: Optional[torch.Tensor] = None,  # 编码器的隐藏状态
            encoder_attention_mask: Optional[torch.Tensor] = None,  # 编码器的注意力掩码
            timestep: Optional[torch.LongTensor] = None,  # 时间步长
            cross_attention_kwargs: Dict[str, Any] = None,  # 交叉注意力的参数
            class_labels: Optional[torch.LongTensor] = None,  # 类别标签
            added_cond_kwargs: Optional[Dict[str, torch.Tensor]] = None,  # 添加的条件参数
# 定义一个前馈层的类,继承自 nn.Module
class LuminaFeedForward(nn.Module):
    r"""
    一个前馈层。

    参数:
        hidden_size (`int`):
            模型隐藏层的维度。该参数决定了模型隐藏表示的宽度。
        intermediate_size (`int`): 前馈层的中间维度。
        multiple_of (`int`, *optional*): 确保隐藏维度是该值的倍数。
        ffn_dim_multiplier (float, *optional*): 自定义的隐藏维度乘数。默认为 None。
    """

    # 初始化方法,接受多个参数
    def __init__(
        self,
        dim: int,
        inner_dim: int,
        multiple_of: Optional[int] = 256,
        ffn_dim_multiplier: Optional[float] = None,
    ):
        # 调用父类的初始化方法
        super().__init__()
        # 将 inner_dim 调整为原来的 2/3
        inner_dim = int(2 * inner_dim / 3)
        # 如果提供了 ffn_dim_multiplier,则调整 inner_dim
        if ffn_dim_multiplier is not None:
            inner_dim = int(ffn_dim_multiplier * inner_dim)
        # 将 inner_dim 调整为 multiple_of 的倍数
        inner_dim = multiple_of * ((inner_dim + multiple_of - 1) // multiple_of)

        # 创建第一个线性层,从 dim 到 inner_dim,不使用偏置
        self.linear_1 = nn.Linear(
            dim,
            inner_dim,
            bias=False,
        )
        # 创建第二个线性层,从 inner_dim 到 dim,不使用偏置
        self.linear_2 = nn.Linear(
            inner_dim,
            dim,
            bias=False,
        )
        # 创建第三个线性层,从 dim 到 inner_dim,不使用偏置
        self.linear_3 = nn.Linear(
            dim,
            inner_dim,
            bias=False,
        )
        # 初始化 SiLU 激活函数
        self.silu = FP32SiLU()

    # 前向传播方法,定义模型的前向计算逻辑
    def forward(self, x):
        # 依次通过线性层和激活函数计算输出
        return self.linear_2(self.silu(self.linear_1(x)) * self.linear_3(x))


# 用于图中可能允许的装饰器定义一个基本的变换器块类
@maybe_allow_in_graph
class TemporalBasicTransformerBlock(nn.Module):
    r"""
    针对视频数据的基本变换器块。

    参数:
        dim (`int`): 输入和输出中的通道数。
        time_mix_inner_dim (`int`): 用于时间注意力的通道数。
        num_attention_heads (`int`): 多头注意力使用的头数。
        attention_head_dim (`int`): 每个头中的通道数。
        cross_attention_dim (`int`, *optional*): 用于交叉注意力的 encoder_hidden_states 向量大小。
    """

    # 初始化方法,接受多个参数
    def __init__(
        self,
        dim: int,
        time_mix_inner_dim: int,
        num_attention_heads: int,
        attention_head_dim: int,
        cross_attention_dim: Optional[int] = None,
    ):
        # 初始化父类
        super().__init__()
        # 判断是否为时间混合内部维度,设置标志
        self.is_res = dim == time_mix_inner_dim

        # 创建输入层归一化层
        self.norm_in = nn.LayerNorm(dim)

        # 定义三个模块,每个模块都有自己的归一化层
        # 1. 自注意力模块
        # 创建前馈神经网络,输入维度和输出维度设置为时间混合内部维度,激活函数为 GEGLU
        self.ff_in = FeedForward(
            dim,
            dim_out=time_mix_inner_dim,
            activation_fn="geglu",
        )

        # 创建第一个归一化层
        self.norm1 = nn.LayerNorm(time_mix_inner_dim)
        # 创建自注意力层,设置查询维度、头数和头维度
        self.attn1 = Attention(
            query_dim=time_mix_inner_dim,
            heads=num_attention_heads,
            dim_head=attention_head_dim,
            cross_attention_dim=None,
        )

        # 2. 交叉注意力模块
        # 检查交叉注意力维度是否为 None
        if cross_attention_dim is not None:
            # 当前仅在自注意力中使用 AdaLayerNormZero
            # 第二个交叉注意力模块返回的调制块数量没有意义
            self.norm2 = nn.LayerNorm(time_mix_inner_dim)
            # 创建交叉注意力层,设置查询维度和交叉注意力维度
            self.attn2 = Attention(
                query_dim=time_mix_inner_dim,
                cross_attention_dim=cross_attention_dim,
                heads=num_attention_heads,
                dim_head=attention_head_dim,
            )  # 如果 encoder_hidden_states 为 None,则为自注意力
        else:
            # 如果没有交叉注意力,归一化层和注意力层设置为 None
            self.norm2 = None
            self.attn2 = None

        # 3. 前馈神经网络模块
        # 创建第二个归一化层
        self.norm3 = nn.LayerNorm(time_mix_inner_dim)
        # 创建前馈神经网络,输入维度为时间混合内部维度,激活函数为 GEGLU
        self.ff = FeedForward(time_mix_inner_dim, activation_fn="geglu")

        # 让块大小默认为 None
        self._chunk_size = None
        # 让块维度默认为 None
        self._chunk_dim = None

    # 设置块前馈的方法,接受可选的块大小和其他参数
    def set_chunk_feed_forward(self, chunk_size: Optional[int], **kwargs):
        # 设置块前馈的块大小
        self._chunk_size = chunk_size
        # 块维度硬编码为 1,以获得更好的速度与内存平衡
        self._chunk_dim = 1

    # 前向传播方法,接受隐藏状态和帧数以及可选的编码器隐藏状态
    def forward(
        self,
        hidden_states: torch.Tensor,
        num_frames: int,
        encoder_hidden_states: Optional[torch.Tensor] = None,
    ) -> torch.Tensor:
        # 注意归一化始终在后续计算之前应用
        # 0. 自注意力
        # 获取批次大小,通常是隐藏状态的第一个维度
        batch_size = hidden_states.shape[0]

        # 获取批次帧数、序列长度和通道数
        batch_frames, seq_length, channels = hidden_states.shape
        # 根据帧数计算新的批次大小
        batch_size = batch_frames // num_frames

        # 调整隐藏状态形状以适应新批次大小和帧数
        hidden_states = hidden_states[None, :].reshape(batch_size, num_frames, seq_length, channels)
        # 改变维度顺序以便后续操作
        hidden_states = hidden_states.permute(0, 2, 1, 3)
        # 重新调整形状为(batch_size * seq_length, num_frames, channels)
        hidden_states = hidden_states.reshape(batch_size * seq_length, num_frames, channels)

        # 保存残差以便后续使用
        residual = hidden_states
        # 对隐藏状态应用输入归一化
        hidden_states = self.norm_in(hidden_states)

        # 如果存在分块大小,则使用分块前馈函数
        if self._chunk_size is not None:
            hidden_states = _chunked_feed_forward(self.ff_in, hidden_states, self._chunk_dim, self._chunk_size)
        else:
            # 否则,直接应用前馈函数
            hidden_states = self.ff_in(hidden_states)

        # 如果使用残差连接,则将残差添加回隐藏状态
        if self.is_res:
            hidden_states = hidden_states + residual

        # 对隐藏状态进行归一化
        norm_hidden_states = self.norm1(hidden_states)
        # 计算自注意力输出
        attn_output = self.attn1(norm_hidden_states, encoder_hidden_states=None)
        # 将自注意力输出与隐藏状态相加
        hidden_states = attn_output + hidden_states

        # 3. 交叉注意力
        # 如果存在第二个注意力层,则计算交叉注意力
        if self.attn2 is not None:
            norm_hidden_states = self.norm2(hidden_states)
            attn_output = self.attn2(norm_hidden_states, encoder_hidden_states=encoder_hidden_states)
            # 将交叉注意力输出与隐藏状态相加
            hidden_states = attn_output + hidden_states

        # 4. 前馈
        # 对隐藏状态进行归一化
        norm_hidden_states = self.norm3(hidden_states)

        # 如果存在分块大小,则使用分块前馈函数
        if self._chunk_size is not None:
            ff_output = _chunked_feed_forward(self.ff, norm_hidden_states, self._chunk_dim, self._chunk_size)
        else:
            # 否则,直接应用前馈函数
            ff_output = self.ff(norm_hidden_states)

        # 如果使用残差连接,则将前馈输出与隐藏状态相加
        if self.is_res:
            hidden_states = ff_output + hidden_states
        else:
            # 否则,仅使用前馈输出
            hidden_states = ff_output

        # 调整隐藏状态形状以适应新批次大小和帧数
        hidden_states = hidden_states[None, :].reshape(batch_size, seq_length, num_frames, channels)
        # 改变维度顺序以便后续操作
        hidden_states = hidden_states.permute(0, 2, 1, 3)
        # 重新调整形状为(batch_size * num_frames, seq_length, channels)
        hidden_states = hidden_states.reshape(batch_size * num_frames, seq_length, channels)

        # 返回处理后的隐藏状态
        return hidden_states
# 定义一个 SkipFFTransformerBlock 类,继承自 nn.Module
class SkipFFTransformerBlock(nn.Module):
    # 初始化方法,接收多个参数来设置层的属性
    def __init__(
        self,
        dim: int,  # 输入的特征维度
        num_attention_heads: int,  # 注意力头的数量
        attention_head_dim: int,  # 每个注意力头的维度
        kv_input_dim: int,  # 键值对输入的维度
        kv_input_dim_proj_use_bias: bool,  # 是否在 KV 映射中使用偏置
        dropout=0.0,  # dropout 比率
        cross_attention_dim: Optional[int] = None,  # 交叉注意力的维度,可选
        attention_bias: bool = False,  # 是否使用注意力偏置
        attention_out_bias: bool = True,  # 是否使用输出的注意力偏置
    ):
        # 调用父类的初始化方法
        super().__init__()
        # 如果 KV 输入维度与特征维度不一致,则定义 KV 映射层
        if kv_input_dim != dim:
            self.kv_mapper = nn.Linear(kv_input_dim, dim, kv_input_dim_proj_use_bias)
        else:
            self.kv_mapper = None  # 否则不使用 KV 映射

        # 定义第一个归一化层
        self.norm1 = RMSNorm(dim, 1e-06)

        # 定义第一个注意力层
        self.attn1 = Attention(
            query_dim=dim,  # 查询的维度
            heads=num_attention_heads,  # 注意力头数量
            dim_head=attention_head_dim,  # 每个头的维度
            dropout=dropout,  # dropout 比率
            bias=attention_bias,  # 是否使用注意力偏置
            cross_attention_dim=cross_attention_dim,  # 交叉注意力的维度
            out_bias=attention_out_bias,  # 输出是否使用偏置
        )

        # 定义第二个归一化层
        self.norm2 = RMSNorm(dim, 1e-06)

        # 定义第二个注意力层
        self.attn2 = Attention(
            query_dim=dim,  # 查询的维度
            cross_attention_dim=cross_attention_dim,  # 交叉注意力的维度
            heads=num_attention_heads,  # 注意力头数量
            dim_head=attention_head_dim,  # 每个头的维度
            dropout=dropout,  # dropout 比率
            bias=attention_bias,  # 是否使用注意力偏置
            out_bias=attention_out_bias,  # 输出是否使用偏置
        )

    # 前向传播方法
    def forward(self, hidden_states, encoder_hidden_states, cross_attention_kwargs):
        # 复制交叉注意力的参数,如果没有则初始化为空字典
        cross_attention_kwargs = cross_attention_kwargs.copy() if cross_attention_kwargs is not None else {}

        # 如果存在 KV 映射层,则对编码器的隐藏状态进行映射
        if self.kv_mapper is not None:
            encoder_hidden_states = self.kv_mapper(F.silu(encoder_hidden_states))

        # 对输入的隐藏状态进行归一化
        norm_hidden_states = self.norm1(hidden_states)

        # 计算第一个注意力层的输出
        attn_output = self.attn1(
            norm_hidden_states,  # 归一化后的隐藏状态
            encoder_hidden_states=encoder_hidden_states,  # 编码器的隐藏状态
            **cross_attention_kwargs,  # 其他交叉注意力的参数
        )

        # 更新隐藏状态
        hidden_states = attn_output + hidden_states

        # 对更新后的隐藏状态进行第二次归一化
        norm_hidden_states = self.norm2(hidden_states)

        # 计算第二个注意力层的输出
        attn_output = self.attn2(
            norm_hidden_states,  # 归一化后的隐藏状态
            encoder_hidden_states=encoder_hidden_states,  # 编码器的隐藏状态
            **cross_attention_kwargs,  # 其他交叉注意力的参数
        )

        # 更新隐藏状态
        hidden_states = attn_output + hidden_states

        # 返回最终的隐藏状态
        return hidden_states


# 定义一个 FreeNoiseTransformerBlock 类,继承自 nn.Module
@maybe_allow_in_graph
class FreeNoiseTransformerBlock(nn.Module):
    r"""
    A FreeNoise Transformer block.  # FreeNoise Transformer 块的文档字符串

    """
    # 初始化方法,设置模型的各种参数
        def __init__(
            # 模型的维度
            self,
            dim: int,
            # 注意力头的数量
            num_attention_heads: int,
            # 每个注意力头的维度
            attention_head_dim: int,
            # dropout 概率,默认为 0.0
            dropout: float = 0.0,
            # 交叉注意力的维度,默认为 None
            cross_attention_dim: Optional[int] = None,
            # 激活函数的名称,默认为 "geglu"
            activation_fn: str = "geglu",
            # 自适应归一化的嵌入数量,默认为 None
            num_embeds_ada_norm: Optional[int] = None,
            # 是否使用注意力偏差,默认为 False
            attention_bias: bool = False,
            # 是否仅使用交叉注意力,默认为 False
            only_cross_attention: bool = False,
            # 是否使用双重自注意力,默认为 False
            double_self_attention: bool = False,
            # 是否上溯注意力,默认为 False
            upcast_attention: bool = False,
            # 归一化是否使用逐元素仿射,默认为 True
            norm_elementwise_affine: bool = True,
            # 归一化的类型,默认为 "layer_norm"
            norm_type: str = "layer_norm",
            # 归一化的 epsilon 值,默认为 1e-5
            norm_eps: float = 1e-5,
            # 最终是否使用 dropout,默认为 False
            final_dropout: bool = False,
            # 位置嵌入的类型,默认为 None
            positional_embeddings: Optional[str] = None,
            # 位置嵌入的数量,默认为 None
            num_positional_embeddings: Optional[int] = None,
            # 前馈网络内部维度,默认为 None
            ff_inner_dim: Optional[int] = None,
            # 前馈网络是否使用偏差,默认为 True
            ff_bias: bool = True,
            # 注意力输出是否使用偏差,默认为 True
            attention_out_bias: bool = True,
            # 上下文长度,默认为 16
            context_length: int = 16,
            # 上下文步幅,默认为 4
            context_stride: int = 4,
            # 权重方案,默认为 "pyramid"
            weighting_scheme: str = "pyramid",
        # 获取帧索引的方法,返回一对帧索引的列表
        def _get_frame_indices(self, num_frames: int) -> List[Tuple[int, int]]:
            # 初始化帧索引列表
            frame_indices = []
            # 遍历所有帧,步幅为上下文步幅
            for i in range(0, num_frames - self.context_length + 1, self.context_stride):
                # 当前窗口的起始帧
                window_start = i
                # 当前窗口的结束帧,确保不超过总帧数
                window_end = min(num_frames, i + self.context_length)
                # 将窗口索引添加到列表
                frame_indices.append((window_start, window_end))
            # 返回帧索引列表
            return frame_indices
    
        # 获取帧权重的方法,返回权重列表
        def _get_frame_weights(self, num_frames: int, weighting_scheme: str = "pyramid") -> List[float]:
            # 如果权重方案为 "pyramid"
            if weighting_scheme == "pyramid":
                # 判断帧数是否为偶数
                if num_frames % 2 == 0:
                    # 生成偶数帧的权重列表
                    weights = list(range(1, num_frames // 2 + 1))
                    # 反转并连接权重列表
                    weights = weights + weights[::-1]
                else:
                    # 生成奇数帧的权重列表
                    weights = list(range(1, num_frames // 2 + 1))
                    # 添加中间权重并反转连接
                    weights = weights + [num_frames // 2 + 1] + weights[::-1]
            else:
                # 抛出不支持的权重方案错误
                raise ValueError(f"Unsupported value for weighting_scheme={weighting_scheme}")
    
            # 返回权重列表
            return weights
    
        # 设置自由噪声属性的方法,无返回值
        def set_free_noise_properties(
            self, context_length: int, context_stride: int, weighting_scheme: str = "pyramid"
        ) -> None:
            # 设置上下文长度
            self.context_length = context_length
            # 设置上下文步幅
            self.context_stride = context_stride
            # 设置权重方案
            self.weighting_scheme = weighting_scheme
    
        # 设置块前馈的方法,无返回值
        def set_chunk_feed_forward(self, chunk_size: Optional[int], dim: int = 0) -> None:
            # 设置块大小和维度
            self._chunk_size = chunk_size
            self._chunk_dim = dim
    
        # 前向传播方法,处理输入隐藏状态
        def forward(
            self,
            # 输入的隐藏状态张量
            hidden_states: torch.Tensor,
            # 可选的注意力掩码
            attention_mask: Optional[torch.Tensor] = None,
            # 可选的编码器隐藏状态
            encoder_hidden_states: Optional[torch.Tensor] = None,
            # 可选的编码器注意力掩码
            encoder_attention_mask: Optional[torch.Tensor] = None,
            # 交叉注意力的额外参数
            cross_attention_kwargs: Dict[str, Any] = None,
            # 可变参数
            *args,
            # 关键字参数
            **kwargs,
# 定义一个前馈层类,继承自 nn.Module
class FeedForward(nn.Module):
    r"""
    前馈层。

    参数:
        dim (`int`): 输入的通道数。
        dim_out (`int`, *可选*): 输出的通道数。如果未给定,默认为 `dim`。
        mult (`int`, *可选*, 默认为 4): 用于隐藏维度的乘数。
        dropout (`float`, *可选*, 默认为 0.0): 使用的 dropout 概率。
        activation_fn (`str`, *可选*, 默认为 `"geglu"`): 前馈中使用的激活函数。
        final_dropout (`bool` *可选*, 默认为 False): 是否应用最终的 dropout。
        bias (`bool`, 默认为 True): 是否在线性层中使用偏置。
    """

    # 初始化方法
    def __init__(
        self,
        dim: int,  # 输入通道数
        dim_out: Optional[int] = None,  # 输出通道数(可选)
        mult: int = 4,  # 隐藏维度乘数
        dropout: float = 0.0,  # dropout 概率
        activation_fn: str = "geglu",  # 激活函数类型
        final_dropout: bool = False,  # 是否应用最终 dropout
        inner_dim=None,  # 隐藏层维度(可选)
        bias: bool = True,  # 是否使用偏置
    ):
        # 调用父类构造函数
        super().__init__()
        # 如果未指定 inner_dim,则计算为 dim 乘以 mult
        if inner_dim is None:
            inner_dim = int(dim * mult)
        # 如果未指定 dim_out,则设置为 dim
        dim_out = dim_out if dim_out is not None else dim

        # 根据选择的激活函数创建对应的激活层
        if activation_fn == "gelu":
            act_fn = GELU(dim, inner_dim, bias=bias)  # GELU 激活函数
        if activation_fn == "gelu-approximate":
            act_fn = GELU(dim, inner_dim, approximate="tanh", bias=bias)  # 近似 GELU
        elif activation_fn == "geglu":
            act_fn = GEGLU(dim, inner_dim, bias=bias)  # GEGLU 激活函数
        elif activation_fn == "geglu-approximate":
            act_fn = ApproximateGELU(dim, inner_dim, bias=bias)  # 近似 GEGLU
        elif activation_fn == "swiglu":
            act_fn = SwiGLU(dim, inner_dim, bias=bias)  # SwiGLU 激活函数

        # 初始化一个模块列表,用于存储层
        self.net = nn.ModuleList([])
        # 添加激活函数层到网络
        self.net.append(act_fn)
        # 添加 dropout 层到网络
        self.net.append(nn.Dropout(dropout))
        # 添加线性层到网络,输入为 inner_dim,输出为 dim_out
        self.net.append(nn.Linear(inner_dim, dim_out, bias=bias))
        # 如果 final_dropout 为真,则添加最终的 dropout 层
        if final_dropout:
            self.net.append(nn.Dropout(dropout))

    # 前向传播方法
    def forward(self, hidden_states: torch.Tensor, *args, **kwargs) -> torch.Tensor:
        # 检查是否有额外的参数,或是否传递了过期的 scale 参数
        if len(args) > 0 or kwargs.get("scale", None) is not None:
            deprecation_message = "The `scale` argument is deprecated and will be ignored. Please remove it, as passing it will raise an error in the future. `scale` should directly be passed while calling the underlying pipeline component i.e., via `cross_attention_kwargs`."
            deprecate("scale", "1.0.0", deprecation_message)  # 发出关于 scale 参数的弃用警告
        # 遍历网络中的每一层,依次对 hidden_states 进行处理
        for module in self.net:
            hidden_states = module(hidden_states)  # 将当前层应用于输入
        # 返回处理后的隐藏状态
        return hidden_states

标签:dim,None,int,self,diffusers,states,源码,hidden,解析
From: https://www.cnblogs.com/apachecn/p/18492373

相关文章

  • diffusers-源码解析-六-
    diffusers源码解析(六).\diffusers\models\autoencoders\autoencoder_oobleck.py#版权声明,表示该代码属于HuggingFace团队,所有权利保留#根据Apache2.0许可证进行授权#用户在合规的情况下可以使用该文件#许可证的获取地址#如果没有适用的法律或书面协议,软件是按“现......
  • diffusers-源码解析-九-
    diffusers源码解析(九).\diffusers\models\embeddings_flax.py#Copyright2024TheHuggingFaceTeam.Allrightsreserved.##LicensedundertheApacheLicense,Version2.0(the"License");#youmaynotusethisfileexceptincompliancewiththe......
  • diffusers-源码解析-二十一-
    diffusers源码解析(二十一).\diffusers\pipelines\controlnet\pipeline_controlnet.py#版权信息,指明该代码由HuggingFace团队版权所有##根据Apache2.0许可证授权,用户需遵循许可证规定使用该文件#许可证可以在以下网址获取##http://www.apache.org/licenses/L......
  • diffusers-源码解析-二十四-
    diffusers源码解析(二十四).\diffusers\pipelines\controlnet_sd3\pipeline_stable_diffusion_3_controlnet.py#版权声明,指出版权所有者及相关信息#Copyright2024StabilityAI,TheHuggingFaceTeamandTheInstantXTeam.Allrightsreserved.##按照Apache2.0许可......
  • diffusers-源码解析-二十三-
    diffusers源码解析(二十三).\diffusers\pipelines\controlnet\pipeline_controlnet_sd_xl_img2img.py#版权所有2024HuggingFace团队。保留所有权利。##根据Apache许可证第2.0版(“许可证”)许可;#除非遵守许可证,否则您不得使用此文件。#您可以在以下网址获得许可证副......
  • diffusers-源码解析-二十六-
    diffusers源码解析(二十六).\diffusers\pipelines\deepfloyd_if\pipeline_if_inpainting_superresolution.py#导入html模块,用于处理HTML文本importhtml#导入inspect模块,用于获取对象的信息importinspect#导入re模块,用于正则表达式匹配importre#导入urllib.......
  • diffusers-源码解析-二十九-
    diffusers源码解析(二十九).\diffusers\pipelines\deprecated\stable_diffusion_variants\pipeline_stable_diffusion_model_editing.py#版权信息,声明版权和许可协议#Copyright2024TIMEAuthorsandTheHuggingFaceTeam.Allrightsreserved."#根据ApacheLicense2.0......
  • diffusers-源码解析-十一-
    diffusers源码解析(十一).\diffusers\models\transformers\hunyuan_transformer_2d.py#版权所有2024HunyuanDiT作者,QixunWang和HuggingFace团队。保留所有权利。##根据Apache许可证第2.0版("许可证")进行许可;#除非符合许可证,否则您不得使用此文件。#您可以在以......
  • diffusers-源码解析-十五-
    diffusers源码解析(十五).\diffusers\models\unets\unet_3d_condition.py#版权声明,声明此代码的版权信息和所有权#Copyright2024AlibabaDAMO-VILABandTheHuggingFaceTeam.Allrightsreserved.#版权声明,声明此代码的版权信息和所有权#Copyright2024TheModelSco......
  • diffusers-源码解析-十四-
    diffusers源码解析(十四).\diffusers\models\unets\unet_2d_blocks_flax.py#版权声明,说明该文件的版权信息及相关许可协议#Copyright2024TheHuggingFaceTeam.Allrightsreserved.##许可信息,使用ApacheLicense2.0许可#LicensedundertheApacheLicense,Versi......