diffusers 源码解析(二十九)
.\diffusers\pipelines\deprecated\stable_diffusion_variants\pipeline_stable_diffusion_model_editing.py
# 版权信息,声明版权和许可协议
# Copyright 2024 TIME Authors and The HuggingFace Team. All rights reserved."
# 根据 Apache License 2.0 许可协议进行许可
# 此文件只能在遵守许可证的情况下使用
# 可通过以下网址获取许可证副本
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则软件按“现状”分发,不提供任何形式的担保或条件
# 具体许可条款和限制请参见许可证
# 导入复制模块,用于对象复制
import copy
# 导入检查模块,用于检查对象的信息
import inspect
# 导入类型提示相关的模块
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库中导入图像处理器、文本模型和标记器
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer
# 从相对路径导入自定义图像处理器
from ....image_processor import VaeImageProcessor
# 从相对路径导入自定义加载器混合类
from ....loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 从相对路径导入自定义模型类
from ....models import AutoencoderKL, UNet2DConditionModel
# 从相对路径导入调整 LoRA 规模的函数
from ....models.lora import adjust_lora_scale_text_encoder
# 从相对路径导入调度器类
from ....schedulers import PNDMScheduler
# 从调度器工具导入调度器混合类
from ....schedulers.scheduling_utils import SchedulerMixin
# 从工具库中导入多个功能模块
from ....utils import USE_PEFT_BACKEND, deprecate, logging, scale_lora_layers, unscale_lora_layers
# 从自定义的 PyTorch 工具库中导入随机张量生成函数
from ....utils.torch_utils import randn_tensor
# 从管道工具导入扩散管道和稳定扩散混合类
from ...pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 从稳定扩散相关模块导入管道输出类
from ...stable_diffusion.pipeline_output import StableDiffusionPipelineOutput
# 从稳定扩散安全检查器模块导入安全检查器类
from ...stable_diffusion.safety_checker import StableDiffusionSafetyChecker
# 创建一个日志记录器,用于记录当前模块的信息
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个常量列表,包含不同的图像描述前缀
AUGS_CONST = ["A photo of ", "An image of ", "A picture of "]
# 定义一个稳定扩散模型编辑管道类,继承自多个基类
class StableDiffusionModelEditingPipeline(
DiffusionPipeline, StableDiffusionMixin, TextualInversionLoaderMixin, StableDiffusionLoraLoaderMixin
):
r"""
文本到图像模型编辑的管道。
该模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
该管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
# 文档字符串,描述类或方法的参数
Args:
vae ([`AutoencoderKL`]):
# Variational Auto-Encoder (VAE) 模型,用于将图像编码和解码为潜在表示
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
text_encoder ([`~transformers.CLIPTextModel`]):
# 冻结的文本编码器,使用 CLIP 的大型视觉变换模型
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)).
tokenizer ([`~transformers.CLIPTokenizer`]):
# 用于文本标记化的 CLIPTokenizer
A `CLIPTokenizer` to tokenize text.
unet ([`UNet2DConditionModel`]):
# 用于去噪编码图像潜在的 UNet2DConditionModel
A `UNet2DConditionModel` to denoise the encoded image latents.
scheduler ([`SchedulerMixin`]):
# 调度器,与 unet 一起用于去噪编码的图像潜在,可以是 DDIMScheduler、LMSDiscreteScheduler 或 PNDMScheduler
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`].
safety_checker ([`StableDiffusionSafetyChecker`]):
# 分类模块,用于估计生成的图像是否可能被视为冒犯或有害
Classification module that estimates whether generated images could be considered offensive or harmful.
# 参阅模型卡以获取有关模型潜在危害的更多详细信息
Please refer to the [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) for more details
about a model's potential harms.
feature_extractor ([`~transformers.CLIPImageProcessor`]):
# 用于从生成的图像中提取特征的 CLIPImageProcessor;作为输入传递给 safety_checker
A `CLIPImageProcessor` to extract features from generated images; used as inputs to the `safety_checker`.
with_to_k ([`bool`]):
# 是否在编辑文本到图像模型时编辑键投影矩阵与值投影矩阵
Whether to edit the key projection matrices along with the value projection matrices.
with_augs ([`list`]):
# 在编辑文本到图像模型时应用的文本增强,设置为 [] 表示不进行增强
Textual augmentations to apply while editing the text-to-image model. Set to `[]` for no augmentations.
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor"]
# 定义不参与 CPU 卸载的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法,设置模型和参数
def __init__(
self,
vae: AutoencoderKL,
text_encoder: CLIPTextModel,
tokenizer: CLIPTokenizer,
unet: UNet2DConditionModel,
scheduler: SchedulerMixin,
safety_checker: StableDiffusionSafetyChecker,
feature_extractor: CLIPImageProcessor,
requires_safety_checker: bool = True,
with_to_k: bool = True,
with_augs: list = AUGS_CONST,
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的代码
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
lora_scale: Optional[float] = None,
**kwargs,
):
# 定义一个弃用消息,提示用户 `_encode_prompt()` 已弃用,建议使用 `encode_prompt()`,并说明输出格式变化
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数记录弃用警告,指定版本和警告信息,标准警告设置为 False
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,将参数传入以获取提示嵌入的元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 用户输入的提示文本
device=device, # 指定运行设备
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=negative_prompt, # 负面提示文本
prompt_embeds=prompt_embeds, # 现有的提示嵌入(如果有的话)
negative_prompt_embeds=negative_prompt_embeds, # 负面提示嵌入(如果有的话)
lora_scale=lora_scale, # Lora 缩放参数
**kwargs, # 额外参数
)
# 连接提示嵌入元组的两个部分,适配旧版本的兼容性
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回连接后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.encode_prompt 复制
def encode_prompt(
self,
prompt, # 用户输入的提示文本
device, # 指定运行设备
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器的引导
negative_prompt=None, # 负面提示文本,默认为 None
prompt_embeds: Optional[torch.Tensor] = None, # 现有的提示嵌入,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None, # 负面提示嵌入,默认为 None
lora_scale: Optional[float] = None, # Lora 缩放参数,默认为 None
clip_skip: Optional[int] = None, # 跳过的剪辑层,默认为 None
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制
def run_safety_checker(self, image, device, dtype): # 定义安全检查函数,接收图像、设备和数据类型
# 检查安全检查器是否存在
if self.safety_checker is None:
has_nsfw_concept = None # 如果没有安全检查器,则 NSFW 概念为 None
else:
# 如果图像是张量类型,进行后处理,转换为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果图像是 numpy 数组,直接转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 获取安全检查器的输入,转换为张量并移至指定设备
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 使用安全检查器检查图像,返回处理后的图像和 NSFW 概念
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 NSFW 概念
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
# 解码潜在向量的方法
def decode_latents(self, latents):
# 警告信息,提示此方法已弃用,将在 1.0.0 中移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用 deprecate 函数记录弃用信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据配置的缩放因子调整潜在向量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在向量并返回图像数据
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像数据归一化到 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 始终将图像数据转换为 float32 类型,以确保兼容性并降低开销
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像数据
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,不同调度器的签名不同
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器会忽略
# eta 对应于 DDIM 论文中的 η,范围应在 [0, 1] 之间
# 检查调度器步骤的参数是否接受 eta
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外步骤关键字参数字典
extra_step_kwargs = {}
# 如果调度器接受 eta,则将其添加到字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤的参数是否接受 generator
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator,则将其添加到字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回额外步骤关键字参数字典
return extra_step_kwargs
# 检查输入参数的方法
def check_inputs(
self,
prompt,
height,
width,
callback_steps,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
callback_on_step_end_tensor_inputs=None,
):
# 检查高度和宽度是否能被8整除
if height % 8 != 0 or width % 8 != 0:
# 抛出异常,给出高度和宽度的信息
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步数是否为正整数
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 抛出异常,给出回调步数的信息
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查回调输入是否在预期的输入列表中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 抛出异常,列出不在预期列表中的输入
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了提示和提示嵌入
if prompt is not None and prompt_embeds is not None:
# 抛出异常,提示只能提供一个
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查是否同时未提供提示和提示嵌入
elif prompt is None and prompt_embeds is None:
# 抛出异常,提示至少要提供一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示类型是否合法
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 抛出异常,提示类型不符合要求
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了负提示和负提示嵌入
if negative_prompt is not None and negative_prompt_embeds is not None:
# 抛出异常,提示只能提供一个
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负提示嵌入的形状是否一致
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 抛出异常,给出形状不一致的信息
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
# 准备潜在变量的函数,接受多个参数以控制形状和生成方式
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,包括批量大小、通道数和调整后的高度宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与批量大小一致
if isinstance(generator, list) and len(generator) != batch_size:
# 如果不一致,则抛出值错误并提供相关信息
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果潜在变量为空,则生成新的随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果已提供潜在变量,则将其转移到指定设备
latents = latents.to(device)
# 根据调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回准备好的潜在变量
return latents
# 装饰器,指示该函数不需要计算梯度
@torch.no_grad()
def edit_model(
self,
source_prompt: str,
destination_prompt: str,
lamb: float = 0.1,
restart_params: bool = True,
# 装饰器,指示该函数不需要计算梯度
@torch.no_grad()
def __call__(
self,
# 允许输入字符串或字符串列表作为提示
prompt: Union[str, List[str]] = None,
# 可选参数,指定生成图像的高度
height: Optional[int] = None,
# 可选参数,指定生成图像的宽度
width: Optional[int] = None,
# 设置推理步骤的默认数量为50
num_inference_steps: int = 50,
# 设置引导比例的默认值为7.5
guidance_scale: float = 7.5,
# 可选参数,允许输入负面提示
negative_prompt: Optional[Union[str, List[str]]] = None,
# 可选参数,指定每个提示生成的图像数量,默认值为1
num_images_per_prompt: Optional[int] = 1,
# 设置eta的默认值为0.0
eta: float = 0.0,
# 可选参数,允许输入生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选参数,允许输入潜在变量的张量
latents: Optional[torch.Tensor] = None,
# 可选参数,允许输入提示嵌入的张量
prompt_embeds: Optional[torch.Tensor] = None,
# 可选参数,允许输入负面提示嵌入的张量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选参数,指定输出类型,默认为"pil"
output_type: Optional[str] = "pil",
# 可选参数,控制是否返回字典形式的输出,默认为True
return_dict: bool = True,
# 可选回调函数,用于处理生成过程中每一步的信息
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 可选参数,指定回调的步骤数,默认为1
callback_steps: int = 1,
# 可选参数,允许传入交叉注意力的关键字参数
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 可选参数,允许指定跳过的clip层数
clip_skip: Optional[int] = None,
.\diffusers\pipelines\deprecated\stable_diffusion_variants\pipeline_stable_diffusion_paradigms.py
# 版权所有 2024 ParaDiGMS 作者和 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,依据许可证分发的软件
# 是按“原样”提供的,没有任何形式的明示或暗示的担保或条件。
# 有关许可证的特定权限和限制,请参见许可证。
# 导入 inspect 模块以进行对象检查
import inspect
# 从 typing 模块导入类型提示相关的类
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 PyTorch 库以进行深度学习操作
import torch
# 从 transformers 库导入 CLIP 图像处理器、文本模型和分词器
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer
# 导入自定义的图像处理器
from ....image_processor import VaeImageProcessor
# 导入与加载相关的混合类
from ....loaders import FromSingleFileMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin
# 导入用于自动编码器和条件 UNet 的模型
from ....models import AutoencoderKL, UNet2DConditionModel
# 从 lora 模块导入调整 lora 规模的函数
from ....models.lora import adjust_lora_scale_text_encoder
# 导入 Karras 扩散调度器
from ....schedulers import KarrasDiffusionSchedulers
# 导入实用工具模块中的各种功能
from ....utils import (
USE_PEFT_BACKEND, # 用于 PEFT 后端的标志
deprecate, # 用于标记弃用功能的装饰器
logging, # 日志记录功能
replace_example_docstring, # 替换示例文档字符串的功能
scale_lora_layers, # 调整 lora 层规模的功能
unscale_lora_layers, # 反调整 lora 层规模的功能
)
# 从 torch_utils 模块导入生成随机张量的功能
from ....utils.torch_utils import randn_tensor
# 导入扩散管道和稳定扩散混合类
from ...pipeline_utils import DiffusionPipeline, StableDiffusionMixin
# 导入稳定扩散管道输出类
from ...stable_diffusion.pipeline_output import StableDiffusionPipelineOutput
# 导入稳定扩散安全检查器
from ...stable_diffusion.safety_checker import StableDiffusionSafetyChecker
# 创建日志记录器,用于当前模块的日志
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,展示用法示例
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import DDPMParallelScheduler
>>> from diffusers import StableDiffusionParadigmsPipeline
>>> scheduler = DDPMParallelScheduler.from_pretrained("runwayml/stable-diffusion-v1-5", subfolder="scheduler")
>>> pipe = StableDiffusionParadigmsPipeline.from_pretrained(
... "runwayml/stable-diffusion-v1-5", scheduler=scheduler, torch_dtype=torch.float16
... )
>>> pipe = pipe.to("cuda")
>>> ngpu, batch_per_device = torch.cuda.device_count(), 5
>>> pipe.wrapped_unet = torch.nn.DataParallel(pipe.unet, device_ids=[d for d in range(ngpu)])
>>> prompt = "a photo of an astronaut riding a horse on mars"
>>> image = pipe(prompt, parallel=ngpu * batch_per_device, num_inference_steps=1000).images[0]
```py
"""
# 定义 StableDiffusionParadigmsPipeline 类,继承多个混合类以实现功能
class StableDiffusionParadigmsPipeline(
DiffusionPipeline, # 从扩散管道继承
StableDiffusionMixin, # 从稳定扩散混合类继承
TextualInversionLoaderMixin, # 从文本反转加载混合类继承
StableDiffusionLoraLoaderMixin, # 从稳定扩散 lora 加载混合类继承
FromSingleFileMixin, # 从单文件加载混合类继承
):
r"""
用于文本到图像生成的管道,使用稳定扩散的并行化版本。
此模型继承自 [`DiffusionPipeline`]。有关通用方法的文档,请查看超类文档
# 实现所有管道的功能(下载、保存、在特定设备上运行等)。
# 管道还继承以下加载方法:
# - [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
# - [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
# - [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
# - [`~loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
# 参数说明:
# vae ([`AutoencoderKL`]):
# 变分自编码器(VAE)模型,用于将图像编码和解码为潜在表示。
# text_encoder ([`~transformers.CLIPTextModel`]):
# 冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
# tokenizer ([`~transformers.CLIPTokenizer`]):
# 一个 `CLIPTokenizer` 用于对文本进行标记化。
# unet ([`UNet2DConditionModel`]):
# 一个 `UNet2DConditionModel` 用于去噪编码的图像潜在。
# scheduler ([`SchedulerMixin`]):
# 用于与 `unet` 结合使用的调度器,用于去噪编码的图像潜在。可以是
# [`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的一个。
# safety_checker ([`StableDiffusionSafetyChecker`]):
# 分类模块,估计生成的图像是否可能被认为是冒犯或有害的。
# 请参考 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) 获取更多关于模型潜在危害的详细信息。
# feature_extractor ([`~transformers.CLIPImageProcessor`]):
# 一个 `CLIPImageProcessor` 用于从生成的图像中提取特征;作为 `safety_checker` 的输入。
# 定义模型的 CPU 离线加载顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor"]
# 定义排除在 CPU 离线加载之外的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法,接受多个参数
def __init__(
self,
vae: AutoencoderKL, # 变分自编码器模型
text_encoder: CLIPTextModel, # 文本编码器
tokenizer: CLIPTokenizer, # 文本标记器
unet: UNet2DConditionModel, # UNet2D 条件模型
scheduler: KarrasDiffusionSchedulers, # 调度器
safety_checker: StableDiffusionSafetyChecker, # 安全检查模块
feature_extractor: CLIPImageProcessor, # 特征提取器
requires_safety_checker: bool = True, # 是否需要安全检查器
):
# 调用父类的初始化方法
super().__init__()
# 检查是否禁用安全检查器,并且需要安全检查器
if safety_checker is None and requires_safety_checker:
# 记录警告信息,提醒用户禁用安全检查器的风险
logger.warning(
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 检查是否提供了安全检查器但未提供特征提取器
if safety_checker is not None and feature_extractor is None:
# 抛出错误,提示用户需要定义特征提取器
raise ValueError(
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块到当前实例
self.register_modules(
vae=vae, # 变分自编码器
text_encoder=text_encoder, # 文本编码器
tokenizer=tokenizer, # 分词器
unet=unet, # U-Net 模型
scheduler=scheduler, # 调度器
safety_checker=safety_checker, # 安全检查器
feature_extractor=feature_extractor, # 特征提取器
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 初始化图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 将是否需要安全检查器的配置注册到当前实例
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 用于在多个 GPU 上运行多个去噪步骤时,将 unet 包装为 torch.nn.DataParallel
self.wrapped_unet = self.unet
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline._encode_prompt 复制的函数
def _encode_prompt(
self,
prompt, # 输入的提示文本
device, # 设备类型(CPU或GPU)
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 可选的负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 LoRA 缩放因子
**kwargs, # 其他可选参数
):
# 设置弃用信息,提醒用户该方法即将被移除,建议使用新方法
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数,传递弃用信息和版本号
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,获取提示的嵌入元组
prompt_embeds_tuple = self.encode_prompt(
prompt=prompt, # 输入提示文本
device=device, # 计算设备
num_images_per_prompt=num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance=do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=negative_prompt, # 负提示文本
prompt_embeds=prompt_embeds, # 提示嵌入
negative_prompt_embeds=negative_prompt_embeds, # 负提示嵌入
lora_scale=lora_scale, # LORA 缩放因子
**kwargs, # 其他额外参数
)
# 将返回的嵌入元组进行拼接,以支持向后兼容
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回拼接后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的 encode_prompt 方法
def encode_prompt(
self,
prompt, # 输入的提示文本
device, # 计算设备
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否使用无分类器引导
negative_prompt=None, # 负提示文本,默认为 None
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负提示嵌入
lora_scale: Optional[float] = None, # 可选的 LORA 缩放因子
clip_skip: Optional[int] = None, # 可选的跳过剪辑参数
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.run_safety_checker 复制的代码
def run_safety_checker(self, image, device, dtype):
# 检查是否存在安全检查器
if self.safety_checker is None:
has_nsfw_concept = None # 如果没有安全检查器,则设置无 NSFW 概念为 None
else:
# 如果输入是张量格式,则进行后处理为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入不是张量,转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 使用特征提取器处理图像,返回张量形式
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 调用安全检查器,检查图像是否包含 NSFW 概念
image, has_nsfw_concept = self.safety_checker(
images=image, # 输入图像
clip_input=safety_checker_input.pixel_values.to(dtype) # 安全检查的特征输入
)
# 返回处理后的图像及是否存在 NSFW 概念
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的代码
# 定义一个方法,用于准备额外的参数以供调度器步骤使用
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的关键字参数,因为并非所有调度器都有相同的签名
# eta (η) 仅在 DDIMScheduler 中使用,对于其他调度器将被忽略
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 其值应在 [0, 1] 之间
# 检查调度器的 step 方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化一个字典以存储额外的步骤参数
extra_step_kwargs = {}
# 如果接受 eta 参数,则将其添加到字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的 step 方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator 参数,则将其添加到字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外步骤参数字典
return extra_step_kwargs
# 定义一个方法,用于检查输入参数的有效性
def check_inputs(
self,
prompt, # 输入的提示文本
height, # 图像的高度
width, # 图像的宽度
callback_steps, # 回调步骤的频率
negative_prompt=None, # 可选的负面提示文本
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
callback_on_step_end_tensor_inputs=None, # 可选的在步骤结束时的回调张量输入
):
# 检查高度和宽度是否为 8 的倍数
if height % 8 != 0 or width % 8 != 0:
# 抛出错误,如果高度或宽度不符合要求
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步数是否为正整数
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 抛出错误,如果回调步数不符合要求
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查回调结束时的张量输入是否在允许的输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 抛出错误,如果不在允许的输入中
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查是否同时提供了提示和提示嵌入
if prompt is not None and prompt_embeds is not None:
# 抛出错误,不能同时提供提示和提示嵌入
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查提示和提示嵌入是否均未定义
elif prompt is None and prompt_embeds is None:
# 抛出错误,必须提供一个提示或提示嵌入
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示的类型
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 抛出错误,如果提示不是字符串或列表
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了负提示和负提示嵌入
if negative_prompt is not None and negative_prompt_embeds is not None:
# 抛出错误,不能同时提供负提示和负提示嵌入
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负提示嵌入的形状是否相同
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 抛出错误,如果形状不一致
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
# 准备潜在变量,设置其形状和属性
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,考虑批量大小、通道数和缩放因子
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果潜在变量未提供,则生成随机潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 将提供的潜在变量移动到指定设备
latents = latents.to(device)
# 根据调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 计算输入张量在指定维度上的累积和
def _cumsum(self, input, dim, debug=False):
# 如果调试模式开启,则在CPU上执行累积和以确保可重复性
if debug:
# cumsum_cuda_kernel没有确定性实现,故在CPU上执行
return torch.cumsum(input.cpu().float(), dim=dim).to(input.device)
else:
# 在指定维度上直接计算累积和
return torch.cumsum(input, dim=dim)
# 调用方法,接受多种参数以生成输出
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
self,
prompt: Union[str, List[str]] = None,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
parallel: int = 10,
tolerance: float = 0.1,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
callback_steps: int = 1,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
debug: bool = False,
clip_skip: int = None,
.\diffusers\pipelines\deprecated\stable_diffusion_variants\pipeline_stable_diffusion_pix2pix_zero.py
# 版权声明,说明版权信息及持有者
# Copyright 2024 Pix2Pix Zero Authors and The HuggingFace Team. All rights reserved.
#
# 使用 Apache License 2.0 许可协议
# Licensed under the Apache License, Version 2.0 (the "License");
# 该文件仅在遵循许可协议的情况下使用
# you may not use this file except in compliance with the License.
# 许可协议的获取链接
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用的法律要求或书面协议,否则软件按“原样”分发
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# 不提供任何明示或暗示的保证或条件
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可协议中关于权限和限制的详细信息
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect # 导入 inspect 模块,用于获取对象信息
from dataclasses import dataclass # 从 dataclasses 模块导入 dataclass 装饰器
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示
import numpy as np # 导入 numpy 模块,常用于数值计算
import PIL.Image # 导入 PIL.Image 模块,用于图像处理
import torch # 导入 PyTorch 库,主要用于深度学习
import torch.nn.functional as F # 导入 PyTorch 的功能性神经网络模块
from transformers import ( # 从 transformers 模块导入以下类
BlipForConditionalGeneration, # 导入用于条件生成的 Blip 模型
BlipProcessor, # 导入 Blip 处理器
CLIPImageProcessor, # 导入 CLIP 图像处理器
CLIPTextModel, # 导入 CLIP 文本模型
CLIPTokenizer, # 导入 CLIP 分词器
)
from ....image_processor import PipelineImageInput, VaeImageProcessor # 从自定义模块导入图像处理相关类
from ....loaders import StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入稳定扩散和文本反转加载器混合类
from ....models import AutoencoderKL, UNet2DConditionModel # 导入自动编码器和 UNet 模型
from ....models.attention_processor import Attention # 导入注意力处理器
from ....models.lora import adjust_lora_scale_text_encoder # 导入调整 Lora 文本编码器规模的函数
from ....schedulers import DDIMScheduler, DDPMScheduler, EulerAncestralDiscreteScheduler, LMSDiscreteScheduler # 导入多种调度器
from ....schedulers.scheduling_ddim_inverse import DDIMInverseScheduler # 导入 DDIM 反向调度器
from ....utils import ( # 从自定义工具模块导入实用函数和常量
PIL_INTERPOLATION, # 导入 PIL 图像插值方法
USE_PEFT_BACKEND, # 导入是否使用 PEFT 后端的常量
BaseOutput, # 导入基础输出类
deprecate, # 导入废弃标记装饰器
logging, # 导入日志记录模块
replace_example_docstring, # 导入替换示例文档字符串的函数
scale_lora_layers, # 导入缩放 Lora 层的函数
unscale_lora_layers, # 导入反缩放 Lora 层的函数
)
from ....utils.torch_utils import randn_tensor # 从 PyTorch 工具模块导入生成随机张量的函数
from ...pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 从管道工具模块导入扩散管道和稳定扩散混合类
from ...stable_diffusion.pipeline_output import StableDiffusionPipelineOutput # 导入稳定扩散管道输出类
from ...stable_diffusion.safety_checker import StableDiffusionSafetyChecker # 导入稳定扩散安全检查器
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器
@dataclass # 将下面的类定义为数据类
class Pix2PixInversionPipelineOutput(BaseOutput, TextualInversionLoaderMixin): # 定义输出类,继承基础输出和文本反转加载器混合类
"""
输出类用于稳定扩散管道。
参数:
latents (`torch.Tensor`)
反转的潜在张量
images (`List[PIL.Image.Image]` or `np.ndarray`)
长度为 `batch_size` 的去噪 PIL 图像列表或形状为 `(batch_size, height, width,
num_channels)` 的 numpy 数组。PIL 图像或 numpy 数组呈现扩散管道的去噪图像。
"""
latents: torch.Tensor # 定义潜在张量属性
images: Union[List[PIL.Image.Image], np.ndarray] # 定义图像属性,可以是图像列表或 numpy 数组
EXAMPLE_DOC_STRING = """ # 定义示例文档字符串的初始部分
``` # 示例文档字符串的开始
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
```py # 示例文档字符串的结束
``` # 示例文档字符串的结束
# 示例代码展示如何使用 Diffusers 库进行图像生成
Examples:
```py
# 导入所需的库
>>> import requests # 用于发送 HTTP 请求
>>> import torch # 用于处理张量和深度学习模型
# 从 Diffusers 库导入必要的类
>>> from diffusers import DDIMScheduler, StableDiffusionPix2PixZeroPipeline
# 定义下载嵌入文件的函数
>>> def download(embedding_url, local_filepath):
... # 发送 GET 请求获取嵌入文件
... r = requests.get(embedding_url)
... # 以二进制模式打开本地文件并写入获取的内容
... with open(local_filepath, "wb") as f:
... f.write(r.content)
# 定义模型检查点的名称
>>> model_ckpt = "CompVis/stable-diffusion-v1-4"
# 从预训练模型加载管道并设置数据类型为 float16
>>> pipeline = StableDiffusionPix2PixZeroPipeline.from_pretrained(model_ckpt, torch_dtype=torch.float16)
# 根据管道配置创建 DDIM 调度器
>>> pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config)
# 将模型移动到 GPU
>>> pipeline.to("cuda")
# 定义文本提示
>>> prompt = "a high resolution painting of a cat in the style of van gough"
# 定义源和目标嵌入文件的 URL
>>> source_emb_url = "https://hf.co/datasets/sayakpaul/sample-datasets/resolve/main/cat.pt"
>>> target_emb_url = "https://hf.co/datasets/sayakpaul/sample-datasets/resolve/main/dog.pt"
# 遍历源和目标嵌入 URL 进行下载
>>> for url in [source_emb_url, target_emb_url]:
... # 调用下载函数,将文件保存到本地
... download(url, url.split("/")[-1])
# 从本地加载源嵌入
>>> src_embeds = torch.load(source_emb_url.split("/")[-1])
# 从本地加载目标嵌入
>>> target_embeds = torch.load(target_emb_url.split("/")[-1])
# 使用管道生成图像
>>> images = pipeline(
... prompt, # 输入的文本提示
... source_embeds=src_embeds, # 源嵌入
... target_embeds=target_embeds, # 目标嵌入
... num_inference_steps=50, # 推理步骤数
... cross_attention_guidance_amount=0.15, # 跨注意力引导的强度
... ).images # 生成的图像
# 保存生成的第一张图像
>>> images[0].save("edited_image_dog.png") # 将图像保存为 PNG 文件
"""
# 示例文档字符串,提供了使用示例和说明
EXAMPLE_INVERT_DOC_STRING = """
Examples:
```py
>>> import torch # 导入 PyTorch 库
>>> from transformers import BlipForConditionalGeneration, BlipProcessor # 从 transformers 导入模型和处理器
>>> from diffusers import DDIMScheduler, DDIMInverseScheduler, StableDiffusionPix2PixZeroPipeline # 从 diffusers 导入调度器和管道
>>> import requests # 导入 requests 库,用于发送网络请求
>>> from PIL import Image # 从 PIL 导入 Image 类,用于处理图像
>>> captioner_id = "Salesforce/blip-image-captioning-base" # 定义图像说明生成模型的 ID
>>> processor = BlipProcessor.from_pretrained(captioner_id) # 从预训练模型加载处理器
>>> model = BlipForConditionalGeneration.from_pretrained( # 从预训练模型加载图像说明生成模型
... captioner_id, torch_dtype=torch.float16, low_cpu_mem_usage=True # 指定数据类型和低内存使用模式
... )
>>> sd_model_ckpt = "CompVis/stable-diffusion-v1-4" # 定义稳定扩散模型的检查点 ID
>>> pipeline = StableDiffusionPix2PixZeroPipeline.from_pretrained( # 从预训练模型加载 Pix2Pix 零管道
... sd_model_ckpt, # 指定检查点
... caption_generator=model, # 指定图像说明生成器
... caption_processor=processor, # 指定图像说明处理器
... torch_dtype=torch.float16, # 指定数据类型
... safety_checker=None, # 关闭安全检查器
... )
>>> pipeline.scheduler = DDIMScheduler.from_config(pipeline.scheduler.config) # 使用调度器配置初始化 DDIM 调度器
>>> pipeline.inverse_scheduler = DDIMInverseScheduler.from_config(pipeline.scheduler.config) # 使用调度器配置初始化 DDIM 反向调度器
>>> pipeline.enable_model_cpu_offload() # 启用模型的 CPU 卸载
>>> img_url = "https://github.com/pix2pixzero/pix2pix-zero/raw/main/assets/test_images/cats/cat_6.png" # 定义要处理的图像 URL
>>> raw_image = Image.open(requests.get(img_url, stream=True).raw).convert("RGB").resize((512, 512)) # 从 URL 加载图像并调整大小
>>> # 生成说明
>>> caption = pipeline.generate_caption(raw_image) # 生成图像的说明
>>> # "a photography of a cat with flowers and dai dai daie - daie - daie kasaii" # 生成的说明示例
>>> inv_latents = pipeline.invert(caption, image=raw_image).latents # 根据说明和原始图像进行反向处理,获取潜变量
>>> # 我们需要生成源和目标嵌入
>>> source_prompts = ["a cat sitting on the street", "a cat playing in the field", "a face of a cat"] # 定义源提示列表
>>> target_prompts = ["a dog sitting on the street", "a dog playing in the field", "a face of a dog"] # 定义目标提示列表
>>> source_embeds = pipeline.get_embeds(source_prompts) # 获取源提示的嵌入表示
>>> target_embeds = pipeline.get_embeds(target_prompts) # 获取目标提示的嵌入表示
>>> # 潜变量可以用于编辑真实图像
>>> # 在使用稳定扩散 2 或其他使用 v-prediction 的模型时
>>> # 将 `cross_attention_guidance_amount` 设置为 0.01 或更低,以避免输入潜变量梯度爆炸
>>> image = pipeline( # 使用管道生成新的图像
... caption, # 使用生成的说明
... source_embeds=source_embeds, # 传递源嵌入
... target_embeds=target_embeds, # 传递目标嵌入
... num_inference_steps=50, # 指定推理步骤数量
... cross_attention_guidance_amount=0.15, # 指定交叉注意力指导量
... generator=generator, # 使用指定的生成器
... latents=inv_latents, # 传递潜变量
... negative_prompt=caption, # 使用生成的说明作为负提示
... ).images[0] # 获取生成的图像
>>> image.save("edited_image.png") # 保存生成的图像
```py
"""
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion_img2img 导入的 preprocess 函数
def preprocess(image): # 定义 preprocess 函数,接受图像作为参数
# 设置一个警告信息,提示用户 preprocess 方法已被弃用,并将在 diffusers 1.0.0 中移除
deprecation_message = "The preprocess method is deprecated and will be removed in diffusers 1.0.0. Please use VaeImageProcessor.preprocess(...) instead"
# 调用 deprecate 函数,记录弃用信息,设定标准警告为 False
deprecate("preprocess", "1.0.0", deprecation_message, standard_warn=False)
# 检查输入的 image 是否是一个 Torch 张量
if isinstance(image, torch.Tensor):
# 如果是,直接返回该张量
return image
# 检查输入的 image 是否是一个 PIL 图像
elif isinstance(image, PIL.Image.Image):
# 如果是,将其封装为一个单元素列表
image = [image]
# 检查列表中的第一个元素是否为 PIL 图像
if isinstance(image[0], PIL.Image.Image):
# 获取第一个图像的宽度和高度
w, h = image[0].size
# 将宽度和高度调整为 8 的整数倍
w, h = (x - x % 8 for x in (w, h)) # resize to integer multiple of 8
# 对每个图像进行调整大小,转换为 numpy 数组,并在新维度上增加一维
image = [np.array(i.resize((w, h), resample=PIL_INTERPOLATION["lanczos"]))[None, :] for i in image]
# 将所有图像在第 0 维上连接成一个大的数组
image = np.concatenate(image, axis=0)
# 将数据转换为 float32 类型并归一化到 [0, 1] 范围
image = np.array(image).astype(np.float32) / 255.0
# 调整数组维度顺序为 (batch_size, channels, height, width)
image = image.transpose(0, 3, 1, 2)
# 将像素值范围从 [0, 1] 转换到 [-1, 1]
image = 2.0 * image - 1.0
# 将 numpy 数组转换为 Torch 张量
image = torch.from_numpy(image)
# 检查列表中的第一个元素是否为 Torch 张量
elif isinstance(image[0], torch.Tensor):
# 将多个张量在第 0 维上连接成一个大的张量
image = torch.cat(image, dim=0)
# 返回处理后的图像
return image
# 准备 UNet 模型以执行 Pix2Pix Zero 优化
def prepare_unet(unet: UNet2DConditionModel):
# 初始化一个空字典,用于存储 Pix2Pix Zero 注意力处理器
pix2pix_zero_attn_procs = {}
# 遍历 UNet 的注意力处理器的键
for name in unet.attn_processors.keys():
# 将处理器名称中的 ".processor" 替换为空
module_name = name.replace(".processor", "")
# 获取 UNet 中对应的子模块
module = unet.get_submodule(module_name)
# 如果名称包含 "attn2"
if "attn2" in name:
# 将处理器设置为 Pix2Pix Zero 模式
pix2pix_zero_attn_procs[name] = Pix2PixZeroAttnProcessor(is_pix2pix_zero=True)
# 允许该模块进行梯度更新
module.requires_grad_(True)
else:
# 设置为非 Pix2Pix Zero 模式
pix2pix_zero_attn_procs[name] = Pix2PixZeroAttnProcessor(is_pix2pix_zero=False)
# 不允许该模块进行梯度更新
module.requires_grad_(False)
# 设置 UNet 的注意力处理器为修改后的处理器字典
unet.set_attn_processor(pix2pix_zero_attn_procs)
# 返回修改后的 UNet 模型
return unet
class Pix2PixZeroL2Loss:
# 初始化损失类
def __init__(self):
# 设置初始损失值为 0
self.loss = 0.0
# 计算损失的方法
def compute_loss(self, predictions, targets):
# 更新损失值为预测值与目标值之间的均方差
self.loss += ((predictions - targets) ** 2).sum((1, 2)).mean(0)
class Pix2PixZeroAttnProcessor:
"""注意力处理器类,用于存储注意力权重。
在 Pix2Pix Zero 中,该过程发生在交叉注意力块的计算中。"""
# 初始化注意力处理器
def __init__(self, is_pix2pix_zero=False):
# 记录是否为 Pix2Pix Zero 模式
self.is_pix2pix_zero = is_pix2pix_zero
# 如果是 Pix2Pix Zero 模式,初始化参考交叉注意力映射
if self.is_pix2pix_zero:
self.reference_cross_attn_map = {}
# 定义调用方法
def __call__(
self,
attn: Attention,
hidden_states,
encoder_hidden_states=None,
attention_mask=None,
timestep=None,
loss=None,
):
# 获取隐藏状态的批次大小和序列长度
batch_size, sequence_length, _ = hidden_states.shape
# 准备注意力掩码
attention_mask = attn.prepare_attention_mask(attention_mask, sequence_length, batch_size)
# 将隐藏状态转换为查询向量
query = attn.to_q(hidden_states)
# 如果没有编码器隐藏状态,则使用隐藏状态本身
if encoder_hidden_states is None:
encoder_hidden_states = hidden_states
# 如果需要进行交叉规范化
elif attn.norm_cross:
encoder_hidden_states = attn.norm_encoder_hidden_states(encoder_hidden_states)
# 将编码器隐藏状态转换为键和值
key = attn.to_k(encoder_hidden_states)
value = attn.to_v(encoder_hidden_states)
# 将查询、键和值转换为批次维度
query = attn.head_to_batch_dim(query)
key = attn.head_to_batch_dim(key)
value = attn.head_to_batch_dim(value)
# 计算注意力分数
attention_probs = attn.get_attention_scores(query, key, attention_mask)
# 如果是 Pix2Pix Zero 模式且时间步不为 None
if self.is_pix2pix_zero and timestep is not None:
# 新的记录以保存注意力权重
if loss is None:
self.reference_cross_attn_map[timestep.item()] = attention_probs.detach().cpu()
# 计算损失
elif loss is not None:
# 获取之前的注意力概率
prev_attn_probs = self.reference_cross_attn_map.pop(timestep.item())
# 计算损失
loss.compute_loss(attention_probs, prev_attn_probs.to(attention_probs.device))
# 将注意力概率与值相乘以获得新的隐藏状态
hidden_states = torch.bmm(attention_probs, value)
# 将隐藏状态转换回头维度
hidden_states = attn.batch_to_head_dim(hidden_states)
# 线性变换
hidden_states = attn.to_out[0](hidden_states)
# 应用 dropout
hidden_states = attn.to_out[1](hidden_states)
# 返回新的隐藏状态
return hidden_states
# 定义一个用于像素级图像编辑的管道类,基于 Stable Diffusion
class StableDiffusionPix2PixZeroPipeline(DiffusionPipeline, StableDiffusionMixin):
r"""
使用 Pix2Pix Zero 进行像素级图像编辑的管道。基于 Stable Diffusion。
该模型继承自 [`DiffusionPipeline`]。请查阅超类文档以获取库为所有管道实现的通用方法(例如下载或保存、在特定设备上运行等)。
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码到潜在表示的变分自编码器(VAE)模型。
text_encoder ([`CLIPTextModel`]):
冻结的文本编码器。Stable Diffusion 使用
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel) 的文本部分,
特别是 [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
tokenizer (`CLIPTokenizer`):
类的分词器
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer)。
unet ([`UNet2DConditionModel`]): 用于去噪编码图像潜在的条件 U-Net 架构。
scheduler ([`SchedulerMixin`]):
与 `unet` 一起使用以去噪编码图像潜在的调度器。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`], [`EulerAncestralDiscreteScheduler`] 或 [`DDPMScheduler`] 的之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
估计生成图像是否可能被视为攻击性或有害的分类模块。
请参阅 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5) 以获取详细信息。
feature_extractor ([`CLIPImageProcessor`]):
从生成图像中提取特征的模型,以便作为 `safety_checker` 的输入。
requires_safety_checker (bool):
管道是否需要安全检查器。如果您公开使用该管道,我们建议将其设置为 True。
"""
# 定义 CPU 卸载的模型组件顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 可选组件列表
_optional_components = [
"safety_checker",
"feature_extractor",
"caption_generator",
"caption_processor",
"inverse_scheduler",
]
# 从 CPU 卸载中排除的组件列表
_exclude_from_cpu_offload = ["safety_checker"]
# 初始化方法,定义管道的参数
def __init__(
self,
vae: AutoencoderKL, # 变分自编码器模型
text_encoder: CLIPTextModel, # 文本编码器模型
tokenizer: CLIPTokenizer, # 分词器模型
unet: UNet2DConditionModel, # 条件 U-Net 模型
scheduler: Union[DDPMScheduler, DDIMScheduler, EulerAncestralDiscreteScheduler, LMSDiscreteScheduler], # 调度器类型
feature_extractor: CLIPImageProcessor, # 特征提取器模型
safety_checker: StableDiffusionSafetyChecker, # 安全检查器模型
inverse_scheduler: DDIMInverseScheduler, # 反向调度器
caption_generator: BlipForConditionalGeneration, # 描述生成器
caption_processor: BlipProcessor, # 描述处理器
requires_safety_checker: bool = True, # 是否需要安全检查器的标志
# 定义一个构造函数
):
# 调用父类的构造函数
super().__init__()
# 如果没有提供安全检查器且需要安全检查器,发出警告
if safety_checker is None and requires_safety_checker:
logger.warning(
# 输出关于禁用安全检查器的警告信息
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 如果提供了安全检查器但没有提供特征提取器,抛出错误
if safety_checker is not None and feature_extractor is None:
raise ValueError(
# 提示用户必须定义特征提取器以使用安全检查器
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册模块,设置各个组件
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
caption_processor=caption_processor,
caption_generator=caption_generator,
inverse_scheduler=inverse_scheduler,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 将配置项注册到当前实例
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 类复制的编码提示的方法
def _encode_prompt(
self,
prompt,
device,
num_images_per_prompt,
do_classifier_free_guidance,
negative_prompt=None,
# 可选参数,表示提示的嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选参数,表示负面提示的嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选参数,表示 LORA 的缩放因子
lora_scale: Optional[float] = None,
# 接收任意额外参数
**kwargs,
# 开始定义一个方法,处理已弃用的编码提示功能
):
# 定义弃用信息,说明该方法将被移除,并推荐使用新方法
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用弃用函数,记录弃用警告
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用新的编码提示方法,获取结果元组
prompt_embeds_tuple = self.encode_prompt(
# 传入提示文本
prompt=prompt,
# 设备类型(CPU/GPU)
device=device,
# 每个提示生成的图像数量
num_images_per_prompt=num_images_per_prompt,
# 是否进行分类器自由引导
do_classifier_free_guidance=do_classifier_free_guidance,
# 负面提示文本
negative_prompt=negative_prompt,
# 提示嵌入
prompt_embeds=prompt_embeds,
# 负面提示嵌入
negative_prompt_embeds=negative_prompt_embeds,
# Lora缩放因子
lora_scale=lora_scale,
# 其他可选参数
**kwargs,
)
# 将返回的元组中的两个嵌入连接起来以兼容旧版
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回最终的提示嵌入
return prompt_embeds
# 从指定的管道中复制的 encode_prompt 方法定义
def encode_prompt(
# 提示文本
self,
prompt,
# 设备类型
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否进行分类器自由引导
do_classifier_free_guidance,
# 负面提示文本(可选)
negative_prompt=None,
# 提示嵌入(可选)
prompt_embeds: Optional[torch.Tensor] = None,
# 负面提示嵌入(可选)
negative_prompt_embeds: Optional[torch.Tensor] = None,
# Lora缩放因子(可选)
lora_scale: Optional[float] = None,
# 跳过的clip层数(可选)
clip_skip: Optional[int] = None,
# 从指定的管道中复制的 run_safety_checker 方法定义
def run_safety_checker(self, image, device, dtype):
# 检查是否存在安全检查器
if self.safety_checker is None:
# 如果没有安全检查器,标记为无概念
has_nsfw_concept = None
else:
# 如果图像是张量,则进行后处理以转换为PIL格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果不是张量,则将其转换为PIL格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 将处理后的图像提取特征,准备进行安全检查
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 使用安全检查器检查图像,返回图像及其概念状态
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回检查后的图像及其概念状态
return image, has_nsfw_concept
# 从指定的管道中复制的 decode_latents 方法
# 解码潜在向量并返回生成的图像
def decode_latents(self, latents):
# 警告用户该方法已过时,将在1.0.0版本中删除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用deprecate函数记录该方法的弃用信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 使用配置的缩放因子对潜在向量进行缩放
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在向量,返回生成的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像值从[-1, 1]映射到[0, 1]并限制范围
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为float32格式以确保兼容性,并将其转换为numpy数组
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 从diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs复制
def prepare_extra_step_kwargs(self, generator, eta):
# 准备额外的参数以供调度器步骤使用,因调度器的参数签名可能不同
# eta (η) 仅在DDIMScheduler中使用,其他调度器将忽略它。
# eta在DDIM论文中对应于η:https://arxiv.org/abs/2010.02502
# eta的值应在[0, 1]之间
# 检查调度器步骤是否接受eta参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果接受eta,则将其添加到额外参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受generator参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受generator,则将其添加到额外参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回准备好的额外参数
return extra_step_kwargs
def check_inputs(
self,
prompt,
source_embeds,
target_embeds,
callback_steps,
prompt_embeds=None,
):
# 检查callback_steps是否为正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 确保source_embeds和target_embeds不能同时未定义
if source_embeds is None and target_embeds is None:
raise ValueError("`source_embeds` and `target_embeds` cannot be undefined.")
# 检查prompt和prompt_embeds不能同时被定义
if prompt is not None and prompt_embeds is not None:
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查prompt和prompt_embeds不能同时未定义
elif prompt is None and prompt_embeds is None:
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 确保prompt的类型为str或list
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 从 StableDiffusionPipeline 的 prepare_latents 方法复制的内容
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在张量的形状,包括批次大小、通道数、高度和宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器是否为列表且其长度与批次大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果未提供潜在张量,则生成随机张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在张量,则将其移动到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在张量
return latents
@torch.no_grad()
def generate_caption(self, images):
"""为给定图像生成标题。"""
# 初始化生成标题的文本
text = "a photography of"
# 保存当前设备
prev_device = self.caption_generator.device
# 获取执行设备
device = self._execution_device
# 处理输入图像并转换为张量
inputs = self.caption_processor(images, text, return_tensors="pt").to(
device=device, dtype=self.caption_generator.dtype
)
# 将标题生成器移动到指定设备
self.caption_generator.to(device)
# 生成标题输出
outputs = self.caption_generator.generate(**inputs, max_new_tokens=128)
# 将标题生成器移回先前设备
self.caption_generator.to(prev_device)
# 解码输出以获取标题
caption = self.caption_processor.batch_decode(outputs, skip_special_tokens=True)[0]
# 返回生成的标题
return caption
def construct_direction(self, embs_source: torch.Tensor, embs_target: torch.Tensor):
"""构造用于引导图像生成过程的编辑方向。"""
# 返回目标和源嵌入的均值之差,并增加一个维度
return (embs_target.mean(0) - embs_source.mean(0)).unsqueeze(0)
@torch.no_grad()
def get_embeds(self, prompt: List[str], batch_size: int = 16) -> torch.Tensor:
# 获取提示的数量
num_prompts = len(prompt)
# 初始化嵌入列表
embeds = []
# 分批处理提示
for i in range(0, num_prompts, batch_size):
prompt_slice = prompt[i : i + batch_size]
# 将提示转换为输入 ID,进行填充和截断
input_ids = self.tokenizer(
prompt_slice,
padding="max_length",
max_length=self.tokenizer.model_max_length,
truncation=True,
return_tensors="pt",
).input_ids
# 将输入 ID 移动到文本编码器设备
input_ids = input_ids.to(self.text_encoder.device)
# 获取嵌入并追加到列表
embeds.append(self.text_encoder(input_ids)[0])
# 将所有嵌入拼接并计算均值
return torch.cat(embeds, dim=0).mean(0)[None]
# 准备图像的潜在表示,接收图像和其他参数,返回潜在向量
def prepare_image_latents(self, image, batch_size, dtype, device, generator=None):
# 检查输入图像的类型是否为有效类型
if not isinstance(image, (torch.Tensor, PIL.Image.Image, list)):
# 抛出类型错误,提示用户输入类型不正确
raise ValueError(
f"`image` has to be of type `torch.Tensor`, `PIL.Image.Image` or list but is {type(image)}"
)
# 将图像转换到指定的设备和数据类型
image = image.to(device=device, dtype=dtype)
# 如果图像有四个通道,直接将其作为潜在表示
if image.shape[1] == 4:
latents = image
else:
# 检查生成器列表的长度是否与批次大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出错误,提示生成器列表长度与批次大小不匹配
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果生成器是列表,逐个图像编码并生成潜在表示
if isinstance(generator, list):
latents = [
self.vae.encode(image[i : i + 1]).latent_dist.sample(generator[i]) for i in range(batch_size)
]
# 将潜在表示合并到一个张量中
latents = torch.cat(latents, dim=0)
else:
# 使用单个生成器编码图像并生成潜在表示
latents = self.vae.encode(image).latent_dist.sample(generator)
# 根据配置的缩放因子调整潜在表示
latents = self.vae.config.scaling_factor * latents
# 检查潜在表示的批次大小是否与请求的匹配
if batch_size != latents.shape[0]:
# 如果可以整除,则扩展潜在表示以匹配批次大小
if batch_size % latents.shape[0] == 0:
# 构建弃用消息,提示用户行为即将被移除
deprecation_message = (
f"You have passed {batch_size} text prompts (`prompt`), but only {latents.shape[0]} initial"
" images (`image`). Initial images are now duplicating to match the number of text prompts. Note"
" that this behavior is deprecated and will be removed in a version 1.0.0. Please make sure to update"
" your script to pass as many initial images as text prompts to suppress this warning."
)
# 触发弃用警告,提醒用户修改代码
deprecate("len(prompt) != len(image)", "1.0.0", deprecation_message, standard_warn=False)
# 计算每个图像需要复制的次数
additional_latents_per_image = batch_size // latents.shape[0]
# 将潜在表示按需重复以匹配批次大小
latents = torch.cat([latents] * additional_latents_per_image, dim=0)
else:
# 抛出错误,提示无法复制图像以匹配批次大小
raise ValueError(
f"Cannot duplicate `image` of batch size {latents.shape[0]} to {batch_size} text prompts."
)
else:
# 将潜在表示封装为一个张量
latents = torch.cat([latents], dim=0)
# 返回最终的潜在表示
return latents
# 定义一个获取epsilon的函数,输入为模型输出、样本和时间步
def get_epsilon(self, model_output: torch.Tensor, sample: torch.Tensor, timestep: int):
# 获取反向调度器的预测类型配置
pred_type = self.inverse_scheduler.config.prediction_type
# 计算在当前时间步的累积alpha值
alpha_prod_t = self.inverse_scheduler.alphas_cumprod[timestep]
# 计算beta值为1减去alpha值
beta_prod_t = 1 - alpha_prod_t
# 根据预测类型返回相应的结果
if pred_type == "epsilon":
return model_output
elif pred_type == "sample":
# 根据样本和模型输出计算返回值
return (sample - alpha_prod_t ** (0.5) * model_output) / beta_prod_t ** (0.5)
elif pred_type == "v_prediction":
# 根据alpha和beta值结合模型输出和样本计算返回值
return (alpha_prod_t**0.5) * model_output + (beta_prod_t**0.5) * sample
else:
# 如果预测类型无效,抛出异常
raise ValueError(
f"prediction_type given as {pred_type} must be one of `epsilon`, `sample`, or `v_prediction`"
)
# 定义一个自动相关损失计算的函数,输入为隐藏状态和可选生成器
def auto_corr_loss(self, hidden_states, generator=None):
# 初始化正则化损失为0
reg_loss = 0.0
# 遍历隐藏状态的每一个维度
for i in range(hidden_states.shape[0]):
for j in range(hidden_states.shape[1]):
# 选取当前噪声
noise = hidden_states[i : i + 1, j : j + 1, :, :]
# 进行循环,直到噪声尺寸小于等于8
while True:
# 随机生成滚动的位移量
roll_amount = torch.randint(noise.shape[2] // 2, (1,), generator=generator).item()
# 计算并累加正则化损失
reg_loss += (noise * torch.roll(noise, shifts=roll_amount, dims=2)).mean() ** 2
reg_loss += (noise * torch.roll(noise, shifts=roll_amount, dims=3)).mean() ** 2
# 如果噪声尺寸小于等于8,跳出循环
if noise.shape[2] <= 8:
break
# 对噪声进行2x2的平均池化
noise = F.avg_pool2d(noise, kernel_size=2)
# 返回计算得到的正则化损失
return reg_loss
# 定义一个计算KL散度的函数,输入为隐藏状态
def kl_divergence(self, hidden_states):
# 计算隐藏状态的均值
mean = hidden_states.mean()
# 计算隐藏状态的方差
var = hidden_states.var()
# 返回KL散度的计算结果
return var + mean**2 - 1 - torch.log(var + 1e-7)
# 定义调用函数,使用@torch.no_grad()装饰器禁止梯度计算
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 输入参数包括提示、源和目标嵌入、图像的高和宽、推理步骤等
prompt: Optional[Union[str, List[str]]] = None,
source_embeds: torch.Tensor = None,
target_embeds: torch.Tensor = None,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
cross_attention_guidance_amount: float = 0.1,
output_type: Optional[str] = "pil",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
callback_steps: Optional[int] = 1,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
clip_skip: Optional[int] = None,
# 使用@torch.no_grad()和装饰器替换文档字符串
@torch.no_grad()
@replace_example_docstring(EXAMPLE_INVERT_DOC_STRING)
# 定义一个名为 invert 的方法,包含多个可选参数
def invert(
# 输入提示,默认为 None
self,
prompt: Optional[str] = None,
# 输入图像,默认为 None
image: PipelineImageInput = None,
# 推理步骤的数量,默认为 50
num_inference_steps: int = 50,
# 指导比例,默认为 1
guidance_scale: float = 1,
# 随机数生成器,可以是单个或多个生成器,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在变量,默认为 None
latents: Optional[torch.Tensor] = None,
# 提示嵌入,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 跨注意力引导量,默认为 0.1
cross_attention_guidance_amount: float = 0.1,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典,默认为 True
return_dict: bool = True,
# 回调函数,默认为 None
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调步骤,默认为 1
callback_steps: Optional[int] = 1,
# 跨注意力参数,默认为 None
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# 自动相关的权重,默认为 20.0
lambda_auto_corr: float = 20.0,
# KL 散度的权重,默认为 20.0
lambda_kl: float = 20.0,
# 正则化步骤的数量,默认为 5
num_reg_steps: int = 5,
# 自动相关滚动的数量,默认为 5
num_auto_corr_rolls: int = 5,
.\diffusers\pipelines\deprecated\stable_diffusion_variants\__init__.py
# 从类型检查模块导入类型检查相关功能
from typing import TYPE_CHECKING
# 从 utils 模块导入所需的工具和常量
from ....utils import (
DIFFUSERS_SLOW_IMPORT, # 导入用于延迟导入的常量
OptionalDependencyNotAvailable, # 导入可选依赖不可用的异常类
_LazyModule, # 导入延迟模块加载的工具
get_objects_from_module, # 导入从模块获取对象的函数
is_torch_available, # 导入检查 Torch 库是否可用的函数
is_transformers_available, # 导入检查 Transformers 库是否可用的函数
)
# 初始化一个空字典用于存放虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存放模块导入结构
_import_structure = {}
try:
# 检查 Transformers 和 Torch 库是否都可用
if not (is_transformers_available() and is_torch_available()):
# 如果不可用,则抛出异常
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
# 导入虚拟对象以避免在依赖不可用时导致错误
from ....utils import dummy_torch_and_transformers_objects
# 更新虚拟对象字典,填充从虚拟对象模块获取的对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果依赖可用,添加相关的管道到导入结构字典
_import_structure["pipeline_cycle_diffusion"] = ["CycleDiffusionPipeline"]
_import_structure["pipeline_stable_diffusion_inpaint_legacy"] = ["StableDiffusionInpaintPipelineLegacy"]
_import_structure["pipeline_stable_diffusion_model_editing"] = ["StableDiffusionModelEditingPipeline"]
_import_structure["pipeline_stable_diffusion_paradigms"] = ["StableDiffusionParadigmsPipeline"]
_import_structure["pipeline_stable_diffusion_pix2pix_zero"] = ["StableDiffusionPix2PixZeroPipeline"]
# 根据类型检查标志或慢速导入标志进行条件判断
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 再次检查依赖是否可用
if not (is_transformers_available() and is_torch_available()):
# 如果不可用,则抛出异常
raise OptionalDependencyNotAvailable()
except OptionalDependencyNotAvailable:
# 导入虚拟对象以避免在依赖不可用时导致错误
from ....utils.dummy_torch_and_transformers_objects import *
else:
# 导入具体的管道类,确保它们在依赖可用时被加载
from .pipeline_cycle_diffusion import CycleDiffusionPipeline
from .pipeline_stable_diffusion_inpaint_legacy import StableDiffusionInpaintPipelineLegacy
from .pipeline_stable_diffusion_model_editing import StableDiffusionModelEditingPipeline
from .pipeline_stable_diffusion_paradigms import StableDiffusionParadigmsPipeline
from .pipeline_stable_diffusion_pix2pix_zero import StableDiffusionPix2PixZeroPipeline
else:
# 如果不是类型检查或慢速导入,则进行懒加载处理
import sys
# 使用懒加载模块构造当前模块,指定导入结构和模块规格
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
# 遍历虚拟对象字典,将对象属性设置到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\deprecated\stochastic_karras_ve\pipeline_stochastic_karras_ve.py
# 版权声明,表明此文件的版权所有者及保留权利
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行许可;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,软件在许可证下分发,按“原样”基础,
# 不提供任何形式的保证或条件,无论是明示或暗示的。
# 请参见许可证以获取有关权限和
# 限制的具体规定。
# 从 typing 模块导入所需的类型提示
from typing import List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从相对路径导入 UNet2DModel 模型
from ....models import UNet2DModel
# 从相对路径导入调度器 KarrasVeScheduler
from ....schedulers import KarrasVeScheduler
# 从相对路径导入随机张量生成工具
from ....utils.torch_utils import randn_tensor
# 从相对路径导入扩散管道和图像输出
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput
# 定义 KarrasVePipeline 类,继承自 DiffusionPipeline
class KarrasVePipeline(DiffusionPipeline):
r"""
无条件图像生成的管道。
参数:
unet ([`UNet2DModel`]):
用于去噪编码图像的 `UNet2DModel`。
scheduler ([`KarrasVeScheduler`]):
用于与 `unet` 结合去噪编码图像的调度器。
"""
# 为 linting 添加类型提示
unet: UNet2DModel # 定义 unet 类型为 UNet2DModel
scheduler: KarrasVeScheduler # 定义 scheduler 类型为 KarrasVeScheduler
# 初始化函数,接受 UNet2DModel 和 KarrasVeScheduler 作为参数
def __init__(self, unet: UNet2DModel, scheduler: KarrasVeScheduler):
# 调用父类的初始化函数
super().__init__()
# 注册模块,将 unet 和 scheduler 注册到当前实例中
self.register_modules(unet=unet, scheduler=scheduler)
# 装饰器,表明此函数不需要梯度计算
@torch.no_grad()
def __call__(
self,
batch_size: int = 1, # 定义批处理大小,默认为 1
num_inference_steps: int = 50, # 定义推理步骤数量,默认为 50
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 可选生成器
output_type: Optional[str] = "pil", # 可选输出类型,默认为 "pil"
return_dict: bool = True, # 是否返回字典,默认为 True
**kwargs, # 允许额外的关键字参数
.\diffusers\pipelines\deprecated\stochastic_karras_ve\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从相对路径导入工具模块中的 DIFFUSERS_SLOW_IMPORT 和 _LazyModule
from ....utils import DIFFUSERS_SLOW_IMPORT, _LazyModule
# 定义一个字典,描述要导入的模块及其对应的类
_import_structure = {"pipeline_stochastic_karras_ve": ["KarrasVePipeline"]}
# 如果正在进行类型检查或 DIFFUSERS_SLOW_IMPORT 为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 从 pipeline_stochastic_karras_ve 模块导入 KarrasVePipeline 类
from .pipeline_stochastic_karras_ve import KarrasVePipeline
# 否则
else:
# 导入 sys 模块,用于动态修改模块
import sys
# 使用 _LazyModule 创建懒加载模块,并将其赋值给当前模块名称
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块的名称
globals()["__file__"], # 当前模块的文件路径
_import_structure, # 模块结构字典
module_spec=__spec__, # 当前模块的规格
)
.\diffusers\pipelines\deprecated\versatile_diffusion\modeling_text_unet.py
# 从 typing 模块导入各种类型注解
from typing import Any, Dict, List, Optional, Tuple, Union
# 导入 numpy 库,用于数组和矩阵操作
import numpy as np
# 导入 PyTorch 库,进行深度学习模型的构建和训练
import torch
# 导入 PyTorch 的神经网络模块
import torch.nn as nn
# 导入 PyTorch 的功能性模块,提供常用操作
import torch.nn.functional as F
# 从 diffusers.utils 模块导入 deprecate 函数,用于处理弃用警告
from diffusers.utils import deprecate
# 导入配置相关的类和函数
from ....configuration_utils import ConfigMixin, register_to_config
# 导入模型相关的基类
from ....models import ModelMixin
# 导入激活函数获取工具
from ....models.activations import get_activation
# 导入注意力处理器相关组件
from ....models.attention_processor import (
ADDED_KV_ATTENTION_PROCESSORS, # 额外键值注意力处理器
CROSS_ATTENTION_PROCESSORS, # 交叉注意力处理器
Attention, # 注意力机制类
AttentionProcessor, # 注意力处理器基类
AttnAddedKVProcessor, # 额外键值注意力处理器类
AttnAddedKVProcessor2_0, # 版本 2.0 的额外键值注意力处理器
AttnProcessor, # 基础注意力处理器
)
# 导入嵌入层相关组件
from ....models.embeddings import (
GaussianFourierProjection, # 高斯傅里叶投影类
ImageHintTimeEmbedding, # 图像提示时间嵌入类
ImageProjection, # 图像投影类
ImageTimeEmbedding, # 图像时间嵌入类
TextImageProjection, # 文本图像投影类
TextImageTimeEmbedding, # 文本图像时间嵌入类
TextTimeEmbedding, # 文本时间嵌入类
TimestepEmbedding, # 时间步嵌入类
Timesteps, # 时间步类
)
# 导入 ResNet 相关组件
from ....models.resnet import ResnetBlockCondNorm2D
# 导入 2D 双重变换器模型
from ....models.transformers.dual_transformer_2d import DualTransformer2DModel
# 导入 2D 变换器模型
from ....models.transformers.transformer_2d import Transformer2DModel
# 导入 2D 条件 UNet 输出类
from ....models.unets.unet_2d_condition import UNet2DConditionOutput
# 导入工具函数和常量
from ....utils import USE_PEFT_BACKEND, is_torch_version, logging, scale_lora_layers, unscale_lora_layers
# 导入 PyTorch 相关工具函数
from ....utils.torch_utils import apply_freeu
# 创建日志记录器实例
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义获取下采样块的函数
def get_down_block(
down_block_type, # 下采样块类型
num_layers, # 层数
in_channels, # 输入通道数
out_channels, # 输出通道数
temb_channels, # 时间嵌入通道数
add_downsample, # 是否添加下采样
resnet_eps, # ResNet 中的 epsilon 值
resnet_act_fn, # ResNet 激活函数
num_attention_heads, # 注意力头数量
transformer_layers_per_block, # 每个块中的变换器层数
attention_type, # 注意力类型
attention_head_dim, # 注意力头维度
resnet_groups=None, # ResNet 组数(可选)
cross_attention_dim=None, # 交叉注意力维度(可选)
downsample_padding=None, # 下采样填充(可选)
dual_cross_attention=False, # 是否使用双重交叉注意力
use_linear_projection=False, # 是否使用线性投影
only_cross_attention=False, # 是否仅使用交叉注意力
upcast_attention=False, # 是否上升注意力
resnet_time_scale_shift="default", # ResNet 时间缩放偏移
resnet_skip_time_act=False, # ResNet 是否跳过时间激活
resnet_out_scale_factor=1.0, # ResNet 输出缩放因子
cross_attention_norm=None, # 交叉注意力归一化(可选)
dropout=0.0, # dropout 概率
):
# 如果下采样块类型以 "UNetRes" 开头,则去掉前缀
down_block_type = down_block_type[7:] if down_block_type.startswith("UNetRes") else down_block_type
# 如果下采样块类型为 "DownBlockFlat",则返回相应的块实例
if down_block_type == "DownBlockFlat":
return DownBlockFlat(
num_layers=num_layers, # 层数
in_channels=in_channels, # 输入通道数
out_channels=out_channels, # 输出通道数
temb_channels=temb_channels, # 时间嵌入通道数
dropout=dropout, # dropout 概率
add_downsample=add_downsample, # 是否添加下采样
resnet_eps=resnet_eps, # ResNet 中的 epsilon 值
resnet_act_fn=resnet_act_fn, # ResNet 激活函数
resnet_groups=resnet_groups, # ResNet 组数(可选)
downsample_padding=downsample_padding, # 下采样填充(可选)
resnet_time_scale_shift=resnet_time_scale_shift, # ResNet 时间缩放偏移
)
# 检查下采样块类型是否为 CrossAttnDownBlockFlat
elif down_block_type == "CrossAttnDownBlockFlat":
# 如果没有指定 cross_attention_dim,则抛出值错误
if cross_attention_dim is None:
raise ValueError("cross_attention_dim must be specified for CrossAttnDownBlockFlat")
# 创建并返回 CrossAttnDownBlockFlat 实例,传入所需参数
return CrossAttnDownBlockFlat(
# 设置网络层数
num_layers=num_layers,
# 设置输入通道数
in_channels=in_channels,
# 设置输出通道数
out_channels=out_channels,
# 设置时间嵌入通道数
temb_channels=temb_channels,
# 设置 dropout 比率
dropout=dropout,
# 设置是否添加下采样层
add_downsample=add_downsample,
# 设置 ResNet 中的 epsilon 参数
resnet_eps=resnet_eps,
# 设置 ResNet 激活函数
resnet_act_fn=resnet_act_fn,
# 设置 ResNet 组的数量
resnet_groups=resnet_groups,
# 设置下采样的填充参数
downsample_padding=downsample_padding,
# 设置交叉注意力维度
cross_attention_dim=cross_attention_dim,
# 设置注意力头的数量
num_attention_heads=num_attention_heads,
# 设置是否使用双交叉注意力
dual_cross_attention=dual_cross_attention,
# 设置是否使用线性投影
use_linear_projection=use_linear_projection,
# 设置是否仅使用交叉注意力
only_cross_attention=only_cross_attention,
# 设置 ResNet 的时间尺度偏移
resnet_time_scale_shift=resnet_time_scale_shift,
)
# 如果下采样块类型不被支持,则抛出值错误
raise ValueError(f"{down_block_type} is not supported.")
# 根据给定参数创建上采样块的函数
def get_up_block(
# 上采样块类型
up_block_type,
# 网络层数
num_layers,
# 输入通道数
in_channels,
# 输出通道数
out_channels,
# 上一层输出通道数
prev_output_channel,
# 条件嵌入通道数
temb_channels,
# 是否添加上采样
add_upsample,
# ResNet 的 epsilon 值
resnet_eps,
# ResNet 的激活函数
resnet_act_fn,
# 注意力头数
num_attention_heads,
# 每个块的 Transformer 层数
transformer_layers_per_block,
# 分辨率索引
resolution_idx,
# 注意力类型
attention_type,
# 注意力头维度
attention_head_dim,
# ResNet 组数,可选参数
resnet_groups=None,
# 跨注意力维度,可选参数
cross_attention_dim=None,
# 是否使用双重跨注意力
dual_cross_attention=False,
# 是否使用线性投影
use_linear_projection=False,
# 是否仅使用跨注意力
only_cross_attention=False,
# 是否上溯注意力
upcast_attention=False,
# ResNet 时间尺度偏移,默认为 "default"
resnet_time_scale_shift="default",
# ResNet 是否跳过时间激活
resnet_skip_time_act=False,
# ResNet 输出缩放因子
resnet_out_scale_factor=1.0,
# 跨注意力归一化类型,可选参数
cross_attention_norm=None,
# dropout 概率
dropout=0.0,
):
# 如果上采样块类型以 "UNetRes" 开头,去掉前缀
up_block_type = up_block_type[7:] if up_block_type.startswith("UNetRes") else up_block_type
# 如果块类型是 "UpBlockFlat",则返回相应的实例
if up_block_type == "UpBlockFlat":
return UpBlockFlat(
# 传入各个参数
num_layers=num_layers,
in_channels=in_channels,
out_channels=out_channels,
prev_output_channel=prev_output_channel,
temb_channels=temb_channels,
dropout=dropout,
add_upsample=add_upsample,
resnet_eps=resnet_eps,
resnet_act_fn=resnet_act_fn,
resnet_groups=resnet_groups,
resnet_time_scale_shift=resnet_time_scale_shift,
)
# 如果块类型是 "CrossAttnUpBlockFlat"
elif up_block_type == "CrossAttnUpBlockFlat":
# 检查跨注意力维度是否指定
if cross_attention_dim is None:
raise ValueError("cross_attention_dim must be specified for CrossAttnUpBlockFlat")
# 返回相应的跨注意力上采样块实例
return CrossAttnUpBlockFlat(
# 传入各个参数
num_layers=num_layers,
in_channels=in_channels,
out_channels=out_channels,
prev_output_channel=prev_output_channel,
temb_channels=temb_channels,
dropout=dropout,
add_upsample=add_upsample,
resnet_eps=resnet_eps,
resnet_act_fn=resnet_act_fn,
resnet_groups=resnet_groups,
cross_attention_dim=cross_attention_dim,
num_attention_heads=num_attention_heads,
dual_cross_attention=dual_cross_attention,
use_linear_projection=use_linear_projection,
only_cross_attention=only_cross_attention,
resnet_time_scale_shift=resnet_time_scale_shift,
)
# 如果块类型不支持,抛出异常
raise ValueError(f"{up_block_type} is not supported.")
# 定义一个 Fourier 嵌入器类,继承自 nn.Module
class FourierEmbedder(nn.Module):
# 初始化方法,设置频率和温度
def __init__(self, num_freqs=64, temperature=100):
# 调用父类构造函数
super().__init__()
# 保存频率数
self.num_freqs = num_freqs
# 保存温度
self.temperature = temperature
# 计算频率带
freq_bands = temperature ** (torch.arange(num_freqs) / num_freqs)
# 扩展维度以便后续操作
freq_bands = freq_bands[None, None, None]
# 注册频率带为缓冲区,设为非持久性
self.register_buffer("freq_bands", freq_bands, persistent=False)
# 定义调用方法,用于处理输入
def __call__(self, x):
# 将输入与频率带相乘
x = self.freq_bands * x.unsqueeze(-1)
# 返回处理后的结果,包含正弦和余弦
return torch.stack((x.sin(), x.cos()), dim=-1).permute(0, 1, 3, 4, 2).reshape(*x.shape[:2], -1)
# 定义 GLIGEN 文本边界框投影类,继承自 nn.Module
class GLIGENTextBoundingboxProjection(nn.Module):
# 初始化方法,设置对象的基本参数
def __init__(self, positive_len, out_dim, feature_type, fourier_freqs=8):
# 调用父类的初始化方法
super().__init__()
# 存储正样本的长度
self.positive_len = positive_len
# 存储输出的维度
self.out_dim = out_dim
# 初始化傅里叶嵌入器,设置频率数量
self.fourier_embedder = FourierEmbedder(num_freqs=fourier_freqs)
# 计算位置特征的维度,包含 sin 和 cos
self.position_dim = fourier_freqs * 2 * 4 # 2: sin/cos, 4: xyxy
# 如果输出维度是元组,取第一个元素
if isinstance(out_dim, tuple):
out_dim = out_dim[0]
# 根据特征类型设置线性层
if feature_type == "text-only":
self.linears = nn.Sequential(
# 第一层线性变换,输入为正样本长度加位置维度
nn.Linear(self.positive_len + self.position_dim, 512),
# 激活函数使用 SiLU
nn.SiLU(),
# 第二层线性变换
nn.Linear(512, 512),
# 激活函数使用 SiLU
nn.SiLU(),
# 输出层
nn.Linear(512, out_dim),
)
# 定义一个全为零的参数,用于文本特征的空值处理
self.null_positive_feature = torch.nn.Parameter(torch.zeros([self.positive_len]))
# 处理文本和图像的特征类型
elif feature_type == "text-image":
self.linears_text = nn.Sequential(
# 第一层线性变换
nn.Linear(self.positive_len + self.position_dim, 512),
# 激活函数使用 SiLU
nn.SiLU(),
# 第二层线性变换
nn.Linear(512, 512),
# 激活函数使用 SiLU
nn.SiLU(),
# 输出层
nn.Linear(512, out_dim),
)
self.linears_image = nn.Sequential(
# 第一层线性变换
nn.Linear(self.positive_len + self.position_dim, 512),
# 激活函数使用 SiLU
nn.SiLU(),
# 第二层线性变换
nn.Linear(512, 512),
# 激活函数使用 SiLU
nn.SiLU(),
# 输出层
nn.Linear(512, out_dim),
)
# 定义文本特征的空值处理参数
self.null_text_feature = torch.nn.Parameter(torch.zeros([self.positive_len]))
# 定义图像特征的空值处理参数
self.null_image_feature = torch.nn.Parameter(torch.zeros([self.positive_len]))
# 定义位置特征的空值处理参数
self.null_position_feature = torch.nn.Parameter(torch.zeros([self.position_dim]))
# 前向传播方法定义
def forward(
self,
boxes,
masks,
positive_embeddings=None,
phrases_masks=None,
image_masks=None,
phrases_embeddings=None,
image_embeddings=None,
):
# 在最后一维增加一个维度,便于后续操作
masks = masks.unsqueeze(-1)
# 通过傅里叶嵌入函数生成 boxes 的嵌入表示
xyxy_embedding = self.fourier_embedder(boxes)
# 获取空白位置的特征,并调整形状为 (1, 1, -1)
xyxy_null = self.null_position_feature.view(1, 1, -1)
# 计算加权嵌入,结合 masks 和空白位置特征
xyxy_embedding = xyxy_embedding * masks + (1 - masks) * xyxy_null
# 如果存在正样本嵌入
if positive_embeddings:
# 获取正样本的空白特征,并调整形状为 (1, 1, -1)
positive_null = self.null_positive_feature.view(1, 1, -1)
# 计算正样本嵌入的加权,结合 masks 和空白特征
positive_embeddings = positive_embeddings * masks + (1 - masks) * positive_null
# 将正样本嵌入与 xyxy 嵌入连接并通过线性层处理
objs = self.linears(torch.cat([positive_embeddings, xyxy_embedding], dim=-1))
else:
# 在最后一维增加一个维度,便于后续操作
phrases_masks = phrases_masks.unsqueeze(-1)
image_masks = image_masks.unsqueeze(-1)
# 获取文本和图像的空白特征,并调整形状为 (1, 1, -1)
text_null = self.null_text_feature.view(1, 1, -1)
image_null = self.null_image_feature.view(1, 1, -1)
# 计算文本嵌入的加权,结合 phrases_masks 和空白特征
phrases_embeddings = phrases_embeddings * phrases_masks + (1 - phrases_masks) * text_null
# 计算图像嵌入的加权,结合 image_masks 和空白特征
image_embeddings = image_embeddings * image_masks + (1 - image_masks) * image_null
# 将文本嵌入与 xyxy 嵌入连接并通过文本线性层处理
objs_text = self.linears_text(torch.cat([phrases_embeddings, xyxy_embedding], dim=-1))
# 将图像嵌入与 xyxy 嵌入连接并通过图像线性层处理
objs_image = self.linears_image(torch.cat([image_embeddings, xyxy_embedding], dim=-1))
# 将文本和图像的处理结果在维度 1 上连接
objs = torch.cat([objs_text, objs_image], dim=1)
# 返回最终的对象结果
return objs
# 定义一个名为 UNetFlatConditionModel 的类,继承自 ModelMixin 和 ConfigMixin
class UNetFlatConditionModel(ModelMixin, ConfigMixin):
r"""
一个条件 2D UNet 模型,它接收一个有噪声的样本、条件状态和时间步,并返回一个样本形状的输出。
该模型继承自 [`ModelMixin`]。请查看父类文档以了解其为所有模型实现的通用方法(例如下载或保存)。
"""
# 设置该模型支持梯度检查点
_supports_gradient_checkpointing = True
# 定义不进行拆分的模块名称列表
_no_split_modules = ["BasicTransformerBlock", "ResnetBlockFlat", "CrossAttnUpBlockFlat"]
# 注册到配置的装饰器
@register_to_config
# 初始化方法,设置类的基本参数
def __init__(
# 样本大小,可选参数
self,
sample_size: Optional[int] = None,
# 输入通道数,默认为4
in_channels: int = 4,
# 输出通道数,默认为4
out_channels: int = 4,
# 是否将输入样本居中,默认为False
center_input_sample: bool = False,
# 是否将正弦函数翻转为余弦函数,默认为True
flip_sin_to_cos: bool = True,
# 频率偏移量,默认为0
freq_shift: int = 0,
# 向下采样块的类型,默认为三个CrossAttnDownBlockFlat和一个DownBlockFlat
down_block_types: Tuple[str] = (
"CrossAttnDownBlockFlat",
"CrossAttnDownBlockFlat",
"CrossAttnDownBlockFlat",
"DownBlockFlat",
),
# 中间块的类型,默认为UNetMidBlockFlatCrossAttn
mid_block_type: Optional[str] = "UNetMidBlockFlatCrossAttn",
# 向上采样块的类型,默认为一个UpBlockFlat和三个CrossAttnUpBlockFlat
up_block_types: Tuple[str] = (
"UpBlockFlat",
"CrossAttnUpBlockFlat",
"CrossAttnUpBlockFlat",
"CrossAttnUpBlockFlat",
),
# 是否仅使用交叉注意力,默认为False
only_cross_attention: Union[bool, Tuple[bool]] = False,
# 块输出通道数,默认为320, 640, 1280, 1280
block_out_channels: Tuple[int] = (320, 640, 1280, 1280),
# 每个块的层数,默认为2
layers_per_block: Union[int, Tuple[int]] = 2,
# 向下采样时的填充大小,默认为1
downsample_padding: int = 1,
# 中间块的缩放因子,默认为1
mid_block_scale_factor: float = 1,
# dropout比例,默认为0.0
dropout: float = 0.0,
# 激活函数类型,默认为silu
act_fn: str = "silu",
# 归一化的组数,可选参数,默认为32
norm_num_groups: Optional[int] = 32,
# 归一化的epsilon值,默认为1e-5
norm_eps: float = 1e-5,
# 交叉注意力的维度,默认为1280
cross_attention_dim: Union[int, Tuple[int]] = 1280,
# 每个块的变换器层数,默认为1
transformer_layers_per_block: Union[int, Tuple[int], Tuple[Tuple]] = 1,
# 反向变换器层数的可选配置
reverse_transformer_layers_per_block: Optional[Tuple[Tuple[int]]] = None,
# 编码器隐藏维度的可选参数
encoder_hid_dim: Optional[int] = None,
# 编码器隐藏维度类型的可选参数
encoder_hid_dim_type: Optional[str] = None,
# 注意力头的维度,默认为8
attention_head_dim: Union[int, Tuple[int]] = 8,
# 注意力头数量的可选参数
num_attention_heads: Optional[Union[int, Tuple[int]]] = None,
# 是否使用双交叉注意力,默认为False
dual_cross_attention: bool = False,
# 是否使用线性投影,默认为False
use_linear_projection: bool = False,
# 类嵌入类型的可选参数
class_embed_type: Optional[str] = None,
# 附加嵌入类型的可选参数
addition_embed_type: Optional[str] = None,
# 附加时间嵌入维度的可选参数
addition_time_embed_dim: Optional[int] = None,
# 类嵌入数量的可选参数
num_class_embeds: Optional[int] = None,
# 是否向上投射注意力,默认为False
upcast_attention: bool = False,
# ResNet时间缩放偏移的默认值
resnet_time_scale_shift: str = "default",
# ResNet跳过时间激活的设置,默认为False
resnet_skip_time_act: bool = False,
# ResNet输出缩放因子,默认为1.0
resnet_out_scale_factor: int = 1.0,
# 时间嵌入类型,默认为positional
time_embedding_type: str = "positional",
# 时间嵌入维度的可选参数
time_embedding_dim: Optional[int] = None,
# 时间嵌入激活函数的可选参数
time_embedding_act_fn: Optional[str] = None,
# 时间步后激活的可选参数
timestep_post_act: Optional[str] = None,
# 时间条件投影维度的可选参数
time_cond_proj_dim: Optional[int] = None,
# 输入卷积核的大小,默认为3
conv_in_kernel: int = 3,
# 输出卷积核的大小,默认为3
conv_out_kernel: int = 3,
# 投影类嵌入输入维度的可选参数
projection_class_embeddings_input_dim: Optional[int] = None,
# 注意力类型,默认为default
attention_type: str = "default",
# 类嵌入是否连接,默认为False
class_embeddings_concat: bool = False,
# 中间块是否仅使用交叉注意力的可选参数
mid_block_only_cross_attention: Optional[bool] = None,
# 交叉注意力的归一化类型的可选参数
cross_attention_norm: Optional[str] = None,
# 附加嵌入类型的头数量,默认为64
addition_embed_type_num_heads=64,
# 声明该方法为属性
@property
# 定义一个返回注意力处理器字典的方法
def attn_processors(self) -> Dict[str, AttentionProcessor]:
r"""
返回值:
`dict` 的注意力处理器: 一个字典,包含模型中使用的所有注意力处理器,以其权重名称为索引。
"""
# 初始化一个空字典以递归存储处理器
processors = {}
# 定义一个递归函数来添加处理器
def fn_recursive_add_processors(name: str, module: torch.nn.Module, processors: Dict[str, AttentionProcessor]):
# 如果模块有获取处理器的方法,则添加到字典中
if hasattr(module, "get_processor"):
processors[f"{name}.processor"] = module.get_processor()
# 遍历模块的子模块,递归调用函数
for sub_name, child in module.named_children():
fn_recursive_add_processors(f"{name}.{sub_name}", child, processors)
# 返回处理器字典
return processors
# 遍历当前模块的子模块,并调用递归函数
for name, module in self.named_children():
fn_recursive_add_processors(name, module, processors)
# 返回所有注意力处理器的字典
return processors
# 定义一个设置注意力处理器的方法
def set_attn_processor(self, processor: Union[AttentionProcessor, Dict[str, AttentionProcessor]]):
r"""
设置用于计算注意力的处理器。
参数:
processor (`dict` of `AttentionProcessor` or only `AttentionProcessor`):
实例化的处理器类或处理器类的字典,将被设置为所有 `Attention` 层的处理器。
如果 `processor` 是字典,则键需要定义对应的交叉注意力处理器的路径。
在设置可训练的注意力处理器时,强烈推荐这种做法。
"""
# 计算当前注意力处理器的数量
count = len(self.attn_processors.keys())
# 如果传入的是字典且数量不匹配,则引发错误
if isinstance(processor, dict) and len(processor) != count:
raise ValueError(
f"传入的是处理器字典,但处理器的数量 {len(processor)} 与注意力层的数量 {count} 不匹配。"
f" 请确保传入 {count} 个处理器类。"
)
# 定义一个递归函数来设置注意力处理器
def fn_recursive_attn_processor(name: str, module: torch.nn.Module, processor):
# 如果模块有设置处理器的方法,则根据传入的处理器设置
if hasattr(module, "set_processor"):
if not isinstance(processor, dict):
module.set_processor(processor)
else:
module.set_processor(processor.pop(f"{name}.processor"))
# 遍历模块的子模块,递归调用函数
for sub_name, child in module.named_children():
fn_recursive_attn_processor(f"{name}.{sub_name}", child, processor)
# 遍历当前模块的子模块,并调用递归函数
for name, module in self.named_children():
fn_recursive_attn_processor(name, module, processor)
# 设置默认的注意力处理器
def set_default_attn_processor(self):
"""
禁用自定义注意力处理器,并设置默认的注意力实现。
"""
# 检查所有注意力处理器是否属于已添加的 KV 注意力处理器
if all(proc.__class__ in ADDED_KV_ATTENTION_PROCESSORS for proc in self.attn_processors.values()):
# 使用 AttnAddedKVProcessor 作为处理器
processor = AttnAddedKVProcessor()
# 检查所有注意力处理器是否属于交叉注意力处理器
elif all(proc.__class__ in CROSS_ATTENTION_PROCESSORS for proc in self.attn_processors.values()):
# 使用 AttnProcessor 作为处理器
processor = AttnProcessor()
else:
# 如果处理器类型不匹配,则引发值错误
raise ValueError(
f"当注意力处理器的类型为 {next(iter(self.attn_processors.values()))} 时,无法调用 `set_default_attn_processor`"
)
# 设置选定的注意力处理器
self.set_attn_processor(processor)
# 设置梯度检查点
def _set_gradient_checkpointing(self, module, value=False):
# 如果模块具有 gradient_checkpointing 属性,则设置其值
if hasattr(module, "gradient_checkpointing"):
module.gradient_checkpointing = value
# 启用 FreeU 机制
def enable_freeu(self, s1, s2, b1, b2):
r"""启用来自 https://arxiv.org/abs/2309.11497 的 FreeU 机制。
缩放因子的后缀表示应用的阶段块。
请参考 [官方库](https://github.com/ChenyangSi/FreeU) 以获取已知在不同管道(如 Stable Diffusion v1、v2 和 Stable Diffusion XL)中表现良好的值组合。
参数:
s1 (`float`):
阶段 1 的缩放因子,用于减弱跳过特征的贡献。这是为了减轻增强去噪过程中的“过平滑效应”。
s2 (`float`):
阶段 2 的缩放因子,用于减弱跳过特征的贡献。这是为了减轻增强去噪过程中的“过平滑效应”。
b1 (`float`): 阶段 1 的缩放因子,用于增强主干特征的贡献。
b2 (`float`): 阶段 2 的缩放因子,用于增强主干特征的贡献。
"""
# 遍历上采样块并设置相应的缩放因子
for i, upsample_block in enumerate(self.up_blocks):
setattr(upsample_block, "s1", s1) # 设置阶段 1 的缩放因子
setattr(upsample_block, "s2", s2) # 设置阶段 2 的缩放因子
setattr(upsample_block, "b1", b1) # 设置阶段 1 的主干特征缩放因子
setattr(upsample_block, "b2", b2) # 设置阶段 2 的主干特征缩放因子
# 禁用 FreeU 机制
def disable_freeu(self):
"""禁用 FreeU 机制。"""
freeu_keys = {"s1", "s2", "b1", "b2"} # FreeU 机制的关键字集合
# 遍历上采样块并将关键字的值设置为 None
for i, upsample_block in enumerate(self.up_blocks):
for k in freeu_keys:
# 如果上采样块具有该属性或属性值不为 None,则将其设置为 None
if hasattr(upsample_block, k) or getattr(upsample_block, k, None) is not None:
setattr(upsample_block, k, None)
# 定义一个用于融合 QKV 投影的函数
def fuse_qkv_projections(self):
# 文档字符串,描述该函数的作用及实验性质
"""
Enables fused QKV projections. For self-attention modules, all projection matrices (i.e., query, key, value)
are fused. For cross-attention modules, key and value projection matrices are fused.
<Tip warning={true}>
This API is
标签:None,torch,self,attention,resnet,diffusers,channels,二十九,源码
From: https://www.cnblogs.com/apachecn/p/18492362