diffusers 源码解析(二十四)
.\diffusers\pipelines\controlnet_sd3\pipeline_stable_diffusion_3_controlnet.py
# 版权声明,指出版权所有者及相关信息
# Copyright 2024 Stability AI, The HuggingFace Team and The InstantX Team. All rights reserved.
#
# 按照 Apache 2.0 许可证授权使用此文件
# Licensed under the Apache License, Version 2.0 (the "License");
# 只有在遵守许可证的情况下,才可使用此文件
# you may not use this file except in compliance with the License.
# 可以通过以下网址获取许可证的副本
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非在适用法律下另有规定或以书面形式达成协议,否则按“原样”分发的软件不提供任何明示或暗示的保证
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 查看许可证以获取特定权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
# 导入 inspect 模块,用于检查活跃的对象和模块
import inspect
# 导入类型提示相关的模块
from typing import Any, Callable, Dict, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库中导入多个模型和分词器
from transformers import (
CLIPTextModelWithProjection, # 导入带有投影的 CLIP 文本模型
CLIPTokenizer, # 导入 CLIP 分词器
T5EncoderModel, # 导入 T5 编码器模型
T5TokenizerFast, # 导入快速 T5 分词器
)
# 从本地模块中导入图像处理和加载器相关的类
from ...image_processor import PipelineImageInput, VaeImageProcessor
from ...loaders import FromSingleFileMixin, SD3LoraLoaderMixin
from ...models.autoencoders import AutoencoderKL # 导入自编码器模型
from ...models.controlnet_sd3 import SD3ControlNetModel, SD3MultiControlNetModel # 导入控制网络模型
from ...models.transformers import SD3Transformer2DModel # 导入 2D 变压器模型
from ...schedulers import FlowMatchEulerDiscreteScheduler # 导入调度器
from ...utils import (
USE_PEFT_BACKEND, # 导入使用 PEFT 后端的标志
is_torch_xla_available, # 导入检查 XLA 是否可用的函数
logging, # 导入日志记录模块
replace_example_docstring, # 导入替换示例文档字符串的函数
scale_lora_layers, # 导入缩放 LoRA 层的函数
unscale_lora_layers, # 导入反缩放 LoRA 层的函数
)
from ...utils.torch_utils import randn_tensor # 导入生成随机张量的函数
from ..pipeline_utils import DiffusionPipeline # 导入扩散管道类
from ..stable_diffusion_3.pipeline_output import StableDiffusion3PipelineOutput # 导入扩散管道输出类
# 检查是否可用 XLA 支持
if is_torch_xla_available():
import torch_xla.core.xla_model as xm # 导入 XLA 模型相关的模块
XLA_AVAILABLE = True # 设置 XLA 可用的标志
else:
XLA_AVAILABLE = False # 设置 XLA 不可用的标志
# 创建日志记录器,用于当前模块
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 示例文档字符串,展示如何使用管道
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> import torch
>>> from diffusers import StableDiffusion3ControlNetPipeline # 导入稳定扩散控制网络管道
>>> from diffusers.models import SD3ControlNetModel, SD3MultiControlNetModel # 导入控制网络模型
>>> from diffusers.utils import load_image # 导入加载图像的工具函数
>>> controlnet = SD3ControlNetModel.from_pretrained("InstantX/SD3-Controlnet-Canny", torch_dtype=torch.float16) # 加载预训练的控制网络模型
>>> pipe = StableDiffusion3ControlNetPipeline.from_pretrained( # 从预训练模型创建管道
... "stabilityai/stable-diffusion-3-medium-diffusers", controlnet=controlnet, torch_dtype=torch.float16
... )
>>> pipe.to("cuda") # 将管道移动到 GPU
>>> control_image = load_image("https://huggingface.co/InstantX/SD3-Controlnet-Canny/resolve/main/canny.jpg") # 加载控制图像
>>> prompt = "A girl holding a sign that says InstantX" # 设置生成图像的提示
>>> image = pipe(prompt, control_image=control_image, controlnet_conditioning_scale=0.7).images[0] # 生成图像
>>> image.save("sd3.png") # 保存生成的图像
```py
"""
# 从稳定扩散管道中复制的函数,用于检索时间步
# Copied from diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.retrieve_timesteps
def retrieve_timesteps( # 定义函数以检索时间步
scheduler, # 调度器对象
num_inference_steps: Optional[int] = None, # 可选的推理步骤数
device: Optional[Union[str, torch.device]] = None, # 可选的设备参数
# 可选参数,用于指定时间步的列表
timesteps: Optional[List[int]] = None,
# 可选参数,用于指定sigma值的列表
sigmas: Optional[List[float]] = None,
# 接受其他任意关键字参数
**kwargs,
):
"""
调用调度器的 `set_timesteps` 方法,并在调用后从调度器中检索时间步。处理
自定义时间步。任何额外参数将被传递给 `scheduler.set_timesteps`。
参数:
scheduler (`SchedulerMixin`):
获取时间步的调度器。
num_inference_steps (`int`):
用于生成样本的扩散步骤数。如果使用,则 `timesteps`
必须为 `None`。
device (`str` 或 `torch.device`, *可选*):
时间步应移动到的设备。如果为 `None`,则不移动时间步。
timesteps (`List[int]`, *可选*):
用于覆盖调度器的时间步间隔策略的自定义时间步。如果传入 `timesteps`,
`num_inference_steps` 和 `sigmas` 必须为 `None`。
sigmas (`List[float]`, *可选*):
用于覆盖调度器的时间步间隔策略的自定义 sigma。如果传入 `sigmas`,
`num_inference_steps` 和 `timesteps` 必须为 `None`。
返回:
`Tuple[torch.Tensor, int]`: 一个元组,其中第一个元素是来自调度器的时间步计划,
第二个元素是推理步骤的数量。
"""
# 检查是否同时传入了 timesteps 和 sigmas
if timesteps is not None and sigmas is not None:
# 如果同时传入,抛出值错误,提示只能选择一个自定义值
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 检查是否传入了 timesteps
if timesteps is not None:
# 检查调度器的 set_timesteps 方法是否接受 timesteps 参数
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出值错误,提示使用的调度器不支持自定义时间步
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 调用调度器的 set_timesteps 方法,传入 timesteps 和 device,以及其他关键字参数
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 从调度器获取时间步
timesteps = scheduler.timesteps
# 计算时间步的数量
num_inference_steps = len(timesteps)
# 检查是否传入了 sigmas
elif sigmas is not None:
# 检查调度器的 set_timesteps 方法是否接受 sigmas 参数
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出值错误,提示使用的调度器不支持自定义 sigma
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 调用调度器的 set_timesteps 方法,传入 sigmas 和 device,以及其他关键字参数
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 从调度器获取时间步
timesteps = scheduler.timesteps
# 计算时间步的数量
num_inference_steps = len(timesteps)
# 如果都没有传入,使用 num_inference_steps 作为参数调用 set_timesteps
else:
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 从调度器获取时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步骤的数量
return timesteps, num_inference_steps
class StableDiffusion3ControlNetPipeline(DiffusionPipeline, SD3LoraLoaderMixin, FromSingleFileMixin):
r"""
# 函数的参数说明部分
Args:
transformer ([`SD3Transformer2DModel`]):
# 用于去噪编码图像潜变量的条件变换器(MMDiT)架构。
Conditional Transformer (MMDiT) architecture to denoise the encoded image latents.
scheduler ([`FlowMatchEulerDiscreteScheduler`]):
# 与 `transformer` 结合使用的调度器,用于去噪编码图像潜变量。
A scheduler to be used in combination with `transformer` to denoise the encoded image latents.
vae ([`AutoencoderKL`]):
# 用于编码和解码图像到潜在表示的变分自编码器(VAE)模型。
Variational Auto-Encoder (VAE) Model to encode and decode images to and from latent representations.
text_encoder ([`CLIPTextModelWithProjection`]):
# 特定于 CLIP 的模型,增加了一个投影层,该层的初始化为与 `hidden_size` 维度相同的对角矩阵。
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the [clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) variant,
with an additional added projection layer that is initialized with a diagonal matrix with the `hidden_size`
as its dimension.
text_encoder_2 ([`CLIPTextModelWithProjection`]):
# 特定于 CLIP 的第二个模型。
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModelWithProjection),
specifically the
[laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)
variant.
text_encoder_3 ([`T5EncoderModel`]):
# 冻结的文本编码器,使用 T5 模型的特定变体。
Frozen text-encoder. Stable Diffusion 3 uses
[T5](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5EncoderModel), specifically the
[t5-v1_1-xxl](https://huggingface.co/google/t5-v1_1-xxl) variant.
tokenizer (`CLIPTokenizer`):
# CLIPTokenizer 类的分词器。
Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_2 (`CLIPTokenizer`):
# 第二个 CLIPTokenizer 类的分词器。
Second Tokenizer of class
[CLIPTokenizer](https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer).
tokenizer_3 (`T5TokenizerFast`):
# T5Tokenizer 类的分词器。
Tokenizer of class
[T5Tokenizer](https://huggingface.co/docs/transformers/model_doc/t5#transformers.T5Tokenizer).
controlnet ([`SD3ControlNetModel`] or `List[SD3ControlNetModel]` or [`SD3MultiControlNetModel`]):
# 在去噪过程中为 `unet` 提供额外的条件。如果将多个 ControlNet 设置为列表,则每个 ControlNet 的输出将相加以创建一个组合的附加条件。
Provides additional conditioning to the `unet` during the denoising process. If you set multiple
ControlNets as a list, the outputs from each ControlNet are added together to create one combined
additional conditioning.
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->text_encoder_3->transformer->vae"
# 定义可选组件的列表,初始化为空
_optional_components = []
# 定义回调张量输入的名称列表
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds", "negative_pooled_prompt_embeds"]
# 初始化类的构造函数,设置各种模型和调度器
def __init__(
# 接收的变换器模型
self,
transformer: SD3Transformer2DModel,
# 调度器,用于控制生成过程
scheduler: FlowMatchEulerDiscreteScheduler,
# 自动编码器模型
vae: AutoencoderKL,
# 文本编码器模型
text_encoder: CLIPTextModelWithProjection,
# 文本标记器
tokenizer: CLIPTokenizer,
# 第二个文本编码器模型
text_encoder_2: CLIPTextModelWithProjection,
# 第二个文本标记器
tokenizer_2: CLIPTokenizer,
# 第三个文本编码器模型
text_encoder_3: T5EncoderModel,
# 第三个文本标记器
tokenizer_3: T5TokenizerFast,
# 控制网络模型,可以是单个或多个
controlnet: Union[
SD3ControlNetModel, List[SD3ControlNetModel], Tuple[SD3ControlNetModel], SD3MultiControlNetModel
],
):
# 调用父类的构造函数
super().__init__()
# 注册各种模块以便后续使用
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
text_encoder_3=text_encoder_3,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
tokenizer_3=tokenizer_3,
transformer=transformer,
scheduler=scheduler,
controlnet=controlnet,
)
# 计算 VAE 的缩放因子,根据配置的块输出通道数
self.vae_scale_factor = (
2 ** (len(self.vae.config.block_out_channels) - 1) if hasattr(self, "vae") and self.vae is not None else 8
)
# 创建图像处理器,使用计算的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 获取标记器的最大长度,如果不存在则默认值为 77
self.tokenizer_max_length = (
self.tokenizer.model_max_length if hasattr(self, "tokenizer") and self.tokenizer is not None else 77
)
# 获取变换器的默认样本大小,如果不存在则默认值为 128
self.default_sample_size = (
self.transformer.config.sample_size
if hasattr(self, "transformer") and self.transformer is not None
else 128
)
# 从现有管道复制的函数,用于获取 T5 的提示嵌入
def _get_t5_prompt_embeds(
self,
# 提示文本,可以是字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 每个提示生成的图像数量
num_images_per_prompt: int = 1,
# 最大序列长度
max_sequence_length: int = 256,
# 设备,默认为 None
device: Optional[torch.device] = None,
# 数据类型,默认为 None
dtype: Optional[torch.dtype] = None,
# 设备默认为执行设备,如果未指定则使用默认设备
):
device = device or self._execution_device
# 数据类型默认为文本编码器的数据类型,如果未指定则使用默认数据类型
dtype = dtype or self.text_encoder.dtype
# 如果输入是字符串,则将其转换为列表形式
prompt = [prompt] if isinstance(prompt, str) else prompt
# 获取批次大小,即提示的数量
batch_size = len(prompt)
# 如果文本编码器未定义,返回一个零张量作为占位符
if self.text_encoder_3 is None:
return torch.zeros(
(
batch_size * num_images_per_prompt, # 每个提示的图像数量乘以批次大小
self.tokenizer_max_length, # 最大的序列长度
self.transformer.config.joint_attention_dim, # 联合注意力维度
),
device=device, # 指定设备
dtype=dtype, # 指定数据类型
)
# 使用 tokenizer 对提示进行编码,返回张量格式
text_inputs = self.tokenizer_3(
prompt,
padding="max_length", # 填充到最大长度
max_length=max_sequence_length, # 最大序列长度限制
truncation=True, # 启用截断
add_special_tokens=True, # 添加特殊标记
return_tensors="pt", # 返回 PyTorch 张量
)
# 获取编码后的输入 ID
text_input_ids = text_inputs.input_ids
# 获取未截断的输入 ID
untruncated_ids = self.tokenizer_3(prompt, padding="longest", return_tensors="pt").input_ids
# 检查未截断的 ID 是否比截断的 ID 更长,并且两者是否不相等
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被截断的文本并记录警告
removed_text = self.tokenizer_3.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because `max_sequence_length` is set to "
f" {max_sequence_length} tokens: {removed_text}" # 显示被截断的文本
)
# 获取文本嵌入
prompt_embeds = self.text_encoder_3(text_input_ids.to(device))[0]
# 确保嵌入的数据类型与文本编码器一致
dtype = self.text_encoder_3.dtype
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
# 获取嵌入的形状信息
_, seq_len, _ = prompt_embeds.shape
# 为每个提示生成重复文本嵌入和注意力掩码,使用适合 mps 的方法
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 调整张量的形状,以适应批次大小和图像数量
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
# 返回处理后的嵌入
return prompt_embeds
# 从稳定扩散管道复制的函数,用于获取 CLIP 提示嵌入
def _get_clip_prompt_embeds(
self,
prompt: Union[str, List[str]], # 输入提示,支持字符串或字符串列表
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
device: Optional[torch.device] = None, # 设备选择,默认为 None
clip_skip: Optional[int] = None, # 可选的跳过步骤
clip_model_index: int = 0, # CLIP 模型的索引
):
# 如果未指定设备,则使用默认执行设备
device = device or self._execution_device
# 定义 CLIP 模型的两个分词器
clip_tokenizers = [self.tokenizer, self.tokenizer_2]
# 定义 CLIP 模型的两个文本编码器
clip_text_encoders = [self.text_encoder, self.text_encoder_2]
# 根据索引选择对应的分词器
tokenizer = clip_tokenizers[clip_model_index]
# 根据索引选择对应的文本编码器
text_encoder = clip_text_encoders[clip_model_index]
# 将提示转换为列表,如果是字符串则单独处理
prompt = [prompt] if isinstance(prompt, str) else prompt
# 计算提示的批处理大小
batch_size = len(prompt)
# 对提示进行编码,返回的张量将填充到最大长度
text_inputs = tokenizer(
prompt,
padding="max_length",
max_length=self.tokenizer_max_length,
truncation=True,
return_tensors="pt",
)
# 提取编码后的输入 ID
text_input_ids = text_inputs.input_ids
# 不截断的输入 ID,使用最长填充方式
untruncated_ids = tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
# 检查是否有输入被截断
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码并记录被截断的文本
removed_text = tokenizer.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f" {self.tokenizer_max_length} tokens: {removed_text}"
)
# 使用文本编码器生成提示的嵌入,输出隐藏状态
prompt_embeds = text_encoder(text_input_ids.to(device), output_hidden_states=True)
# 获取池化后的提示嵌入
pooled_prompt_embeds = prompt_embeds[0]
# 根据 clip_skip 的值选择不同的隐藏状态
if clip_skip is None:
prompt_embeds = prompt_embeds.hidden_states[-2]
else:
prompt_embeds = prompt_embeds.hidden_states[-(clip_skip + 2)]
# 将提示嵌入转换为指定的数据类型并放置到正确的设备上
prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)
# 获取提示嵌入的形状信息
_, seq_len, _ = prompt_embeds.shape
# 为每个提示的生成重复文本嵌入,使用适合 MPS 的方法
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 重塑为 (批处理大小 * 每个提示生成的图像数, 序列长度, 嵌入维度)
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
# 为池化的提示嵌入重复,调整形状
pooled_prompt_embeds = pooled_prompt_embeds.repeat(1, num_images_per_prompt, 1)
pooled_prompt_embeds = pooled_prompt_embeds.view(batch_size * num_images_per_prompt, -1)
# 返回生成的提示嵌入和池化的提示嵌入
return prompt_embeds, pooled_prompt_embeds
# 从 diffusers.pipelines.stable_diffusion_3.pipeline_stable_diffusion_3.StableDiffusion3Pipeline.encode_prompt 复制的代码
# 定义编码提示的函数,接受多个提示和相关参数
def encode_prompt(
self, # self 参数,表示类的实例
prompt: Union[str, List[str]], # 第一个提示,可以是字符串或字符串列表
prompt_2: Union[str, List[str]], # 第二个提示,可以是字符串或字符串列表
prompt_3: Union[str, List[str]], # 第三个提示,可以是字符串或字符串列表
device: Optional[torch.device] = None, # 可选参数,指定设备(如 CPU 或 GPU)
num_images_per_prompt: int = 1, # 每个提示生成的图像数量,默认为 1
do_classifier_free_guidance: bool = True, # 是否执行无分类器引导,默认为 True
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选负提示,可以是字符串或字符串列表
negative_prompt_2: Optional[Union[str, List[str]]] = None, # 第二个可选负提示
negative_prompt_3: Optional[Union[str, List[str]]] = None, # 第三个可选负提示
prompt_embeds: Optional[torch.FloatTensor] = None, # 可选提示嵌入,类型为浮点张量
negative_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选负提示嵌入
pooled_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选池化的提示嵌入
negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None, # 可选池化的负提示嵌入
clip_skip: Optional[int] = None, # 可选参数,指定剪切的层数
max_sequence_length: int = 256, # 最大序列长度,默认为 256
lora_scale: Optional[float] = None, # 可选参数,指定 LoRA 的缩放因子
# 定义检查输入的函数,验证输入参数的有效性
def check_inputs(
self, # self 参数,表示类的实例
prompt, # 第一个提示
prompt_2, # 第二个提示
prompt_3, # 第三个提示
height, # 高度参数
width, # 宽度参数
negative_prompt=None, # 可选负提示
negative_prompt_2=None, # 第二个可选负提示
negative_prompt_3=None, # 第三个可选负提示
prompt_embeds=None, # 可选提示嵌入
negative_prompt_embeds=None, # 可选负提示嵌入
pooled_prompt_embeds=None, # 可选池化的提示嵌入
negative_pooled_prompt_embeds=None, # 可选池化的负提示嵌入
callback_on_step_end_tensor_inputs=None, # 可选参数,用于回调
max_sequence_length=None, # 可选最大序列长度
# 定义准备潜在空间的函数,用于生成潜在表示
def prepare_latents(
self, # self 参数,表示类的实例
batch_size, # 批次大小
num_channels_latents, # 潜在张量的通道数
height, # 高度
width, # 宽度
dtype, # 数据类型
device, # 设备类型
generator, # 随机数生成器
latents=None, # 可选潜在张量
):
# 如果提供了潜在张量,则将其转移到指定设备和数据类型
if latents is not None:
return latents.to(device=device, dtype=dtype)
# 定义潜在张量的形状,包括批次大小和通道数
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor, # 根据 VAE 缩放因子调整高度
int(width) // self.vae_scale_factor, # 根据 VAE 缩放因子调整宽度
)
# 检查生成器的类型,确保其与批次大小一致
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 生成随机潜在张量
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 返回生成的潜在张量
return latents
# 定义准备图像的函数,接受图像及相关参数
def prepare_image(
self, # self 参数,表示类的实例
image, # 输入的图像
width, # 图像的宽度
height, # 图像的高度
batch_size, # 批次大小
num_images_per_prompt, # 每个提示生成的图像数量
device, # 设备类型
dtype, # 数据类型
do_classifier_free_guidance=False, # 是否执行无分类器引导,默认为 False
guess_mode=False, # 是否启用猜测模式,默认为 False
):
# 检查输入的图像是否为 PyTorch 张量
if isinstance(image, torch.Tensor):
# 如果是张量,则不进行处理,直接跳过
pass
else:
# 如果不是张量,则使用图像处理器进行预处理,调整高度和宽度
image = self.image_processor.preprocess(image, height=height, width=width)
# 获取图像批次的大小
image_batch_size = image.shape[0]
# 如果图像批次大小为 1,则根据批次大小重复图像
if image_batch_size == 1:
repeat_by = batch_size
else:
# 否则,图像批次大小与提示批次大小相同
repeat_by = num_images_per_prompt
# 根据 repeat_by 在第 0 维度重复图像
image = image.repeat_interleave(repeat_by, dim=0)
# 将图像移动到指定设备并转换数据类型
image = image.to(device=device, dtype=dtype)
# 如果启用分类器自由引导且不处于猜测模式,则对图像进行复制
if do_classifier_free_guidance and not guess_mode:
image = torch.cat([image] * 2)
# 返回处理后的图像
return image
@property
# 获取指导缩放比例
def guidance_scale(self):
return self._guidance_scale
@property
# 获取剪切跳过的参数
def clip_skip(self):
return self._clip_skip
# `guidance_scale` 定义类似于 Imagen 论文中方程 (2) 的指导权重 `w`
# `guidance_scale = 1` 表示不进行分类器自由引导。
@property
# 检查是否启用分类器自由引导
def do_classifier_free_guidance(self):
return self._guidance_scale > 1
@property
# 获取联合注意力的参数
def joint_attention_kwargs(self):
return self._joint_attention_kwargs
@property
# 获取时间步数的参数
def num_timesteps(self):
return self._num_timesteps
@property
# 获取中断标志
def interrupt(self):
return self._interrupt
# 禁用梯度计算的装饰器
@torch.no_grad()
# 用于替换示例文档字符串的装饰器
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义可调用对象的 __call__ 方法
def __call__(
# 提示文本,支持字符串或字符串列表
self,
prompt: Union[str, List[str]] = None,
# 第二个提示文本,支持字符串或字符串列表,默认为 None
prompt_2: Optional[Union[str, List[str]]] = None,
# 第三个提示文本,支持字符串或字符串列表,默认为 None
prompt_3: Optional[Union[str, List[str]]] = None,
# 输出图像的高度,默认为 None
height: Optional[int] = None,
# 输出图像的宽度,默认为 None
width: Optional[int] = None,
# 推理步骤的数量,默认为 28
num_inference_steps: int = 28,
# 自定义时间步的列表,默认为 None
timesteps: List[int] = None,
# 指导尺度,默认为 7.0
guidance_scale: float = 7.0,
# 控制引导开始的尺度,支持浮点数或浮点数列表,默认为 0.0
control_guidance_start: Union[float, List[float]] = 0.0,
# 控制引导结束的尺度,支持浮点数或浮点数列表,默认为 1.0
control_guidance_end: Union[float, List[float]] = 1.0,
# 控制图像的输入,默认为 None
control_image: PipelineImageInput = None,
# 控制网络条件缩放,支持浮点数或浮点数列表,默认为 1.0
controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
# 控制网络池化投影,默认为 None
controlnet_pooled_projections: Optional[torch.FloatTensor] = None,
# 负提示文本,支持字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 第二个负提示文本,支持字符串或字符串列表,默认为 None
negative_prompt_2: Optional[Union[str, List[str]]] = None,
# 第三个负提示文本,支持字符串或字符串列表,默认为 None
negative_prompt_3: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 随机数生成器,支持生成器或生成器列表,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在向量,默认为 None
latents: Optional[torch.FloatTensor] = None,
# 提示嵌入,默认为 None
prompt_embeds: Optional[torch.FloatTensor] = None,
# 负提示嵌入,默认为 None
negative_prompt_embeds: Optional[torch.FloatTensor] = None,
# 池化提示嵌入,默认为 None
pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
# 负池化提示嵌入,默认为 None
negative_pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式的输出,默认为 True
return_dict: bool = True,
# 联合注意力的参数,默认为 None
joint_attention_kwargs: Optional[Dict[str, Any]] = None,
# 跳过的剪辑层数,默认为 None
clip_skip: Optional[int] = None,
# 步骤结束时的回调函数,默认为 None
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 步骤结束时的张量输入回调,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 最大序列长度,默认为 256
max_sequence_length: int = 256,
.\diffusers\pipelines\controlnet_sd3\__init__.py
# 导入类型检查相关的模块
from typing import TYPE_CHECKING
# 从工具模块导入所需的依赖和功能
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 导入慢导入的标志
OptionalDependencyNotAvailable, # 导入可选依赖不可用的异常
_LazyModule, # 导入延迟加载模块的类
get_objects_from_module, # 导入从模块获取对象的函数
is_flax_available, # 导入检查 Flax 可用性的函数
is_torch_available, # 导入检查 PyTorch 可用性的函数
is_transformers_available, # 导入检查 Transformers 可用性的函数
)
# 初始化一个空字典用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典用于存储导入结构
_import_structure = {}
# 尝试检查所需依赖
try:
# 如果 Transformers 和 PyTorch 不可用,则抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新虚拟对象字典以包含占位符对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果依赖可用,更新导入结构以包含 Stable Diffusion 管道
_import_structure["pipeline_stable_diffusion_3_controlnet"] = ["StableDiffusion3ControlNetPipeline"]
# 检查是否进行类型检查或慢导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 再次检查 Transformers 和 PyTorch 的可用性
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403
else:
# 如果依赖可用,从指定模块导入管道
from .pipeline_stable_diffusion_3_controlnet import StableDiffusion3ControlNetPipeline
try:
# 检查 Transformers 和 Flax 的可用性
if not (is_transformers_available() and is_flax_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
from ...utils.dummy_flax_and_transformers_objects import * # noqa F403
# 如果不是类型检查或慢导入
else:
import sys
# 使用延迟加载模块更新当前模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
# 将虚拟对象添加到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\controlnet_xs\pipeline_controlnet_xs.py
# 版权声明,表示该代码的版权所有者及其保留权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证 2.0 版本许可该文件,只有在遵守许可证的情况下才能使用
# Licensed under the Apache License, Version 2.0 (the "License");
# 许可证可在以下网址获取
# you may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件按“原样”分发,不提供任何明示或暗示的担保
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 参见许可证中规定的特定语言的权限和限制
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect # 导入 inspect 模块,用于获取活跃对象的信息
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型提示,提供可选类型注解
import numpy as np # 导入 NumPy 库,用于数值计算
import PIL.Image # 导入 PIL 库,用于图像处理
import torch # 导入 PyTorch 库,进行深度学习相关操作
import torch.nn.functional as F # 导入 PyTorch 的功能性神经网络模块
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer # 导入 Hugging Face 的 CLIP 相关类
from ...callbacks import MultiPipelineCallbacks, PipelineCallback # 导入回调相关的类
from ...image_processor import PipelineImageInput, VaeImageProcessor # 导入图像处理相关的类
from ...loaders import FromSingleFileMixin, StableDiffusionLoraLoaderMixin, TextualInversionLoaderMixin # 导入文件加载相关的类
from ...models import AutoencoderKL, ControlNetXSAdapter, UNet2DConditionModel, UNetControlNetXSModel # 导入模型相关的类
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 Lora 文本编码器比例的函数
from ...schedulers import KarrasDiffusionSchedulers # 导入 Karras 扩散调度器
from ...utils import ( # 导入多个工具函数和变量
USE_PEFT_BACKEND, # 指示是否使用 PEFT 后端的标志
deprecate, # 用于标记过时功能的装饰器
logging, # 导入日志记录模块
replace_example_docstring, # 替换示例文档字符串的函数
scale_lora_layers, # 缩放 Lora 层的函数
unscale_lora_layers, # 反缩放 Lora 层的函数
)
from ...utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor # 导入与 PyTorch 相关的工具函数
from ..pipeline_utils import DiffusionPipeline, StableDiffusionMixin # 导入扩散管道相关的类
from ..stable_diffusion.pipeline_output import StableDiffusionPipelineOutput # 导入稳定扩散管道输出的类
from ..stable_diffusion.safety_checker import StableDiffusionSafetyChecker # 导入稳定扩散安全检查器的类
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,禁用 Pylint 对命名的警告
# 示例用法
Examples:
```py
>>> # !pip install opencv-python transformers accelerate
# 导入所需的库和模块
>>> from diffusers import StableDiffusionControlNetXSPipeline, ControlNetXSAdapter
>>> from diffusers.utils import load_image
>>> import numpy as np
>>> import torch
>>> import cv2
>>> from PIL import Image
# 设置生成图像的提示词
>>> prompt = "aerial view, a futuristic research complex in a bright foggy jungle, hard lighting"
# 设置反向提示词,限制不想要的特征
>>> negative_prompt = "low quality, bad quality, sketches"
# 下载一张图像
>>> image = load_image(
... "https://hf.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/hf-logo.png"
... )
# 初始化模型和管道
>>> controlnet_conditioning_scale = 0.5
# 从预训练模型加载 ControlNetXS 适配器
>>> controlnet = ControlNetXSAdapter.from_pretrained(
... "UmerHA/Testing-ConrolNetXS-SD2.1-canny", torch_dtype=torch.float16
... )
# 从预训练模型加载稳定扩散管道
>>> pipe = StableDiffusionControlNetXSPipeline.from_pretrained(
... "stabilityai/stable-diffusion-2-1-base", controlnet=controlnet, torch_dtype=torch.float16
... )
# 启用模型 CPU 卸载,以节省内存
>>> pipe.enable_model_cpu_offload()
# 获取 Canny 边缘图像
>>> image = np.array(image) # 将图像转换为 NumPy 数组
>>> image = cv2.Canny(image, 100, 200) # 应用 Canny 边缘检测
>>> image = image[:, :, None] # 添加一个维度以适应图像格式
>>> image = np.concatenate([image, image, image], axis=2) # 将单通道图像转换为三通道
>>> canny_image = Image.fromarray(image) # 将 NumPy 数组转换为 PIL 图像
# 生成图像
>>> image = pipe(
... prompt, controlnet_conditioning_scale=controlnet_conditioning_scale, image=canny_image
... ).images[0] # 通过管道生成图像,并提取第一张生成的图像
# 定义一个名为 StableDiffusionControlNetXSPipeline 的类,继承自多个基类
class StableDiffusionControlNetXSPipeline(
# 继承 DiffusionPipeline 类,提供扩散管道的基本功能
DiffusionPipeline,
# 继承 StableDiffusionMixin 类,包含与稳定扩散相关的混合功能
StableDiffusionMixin,
# 继承 TextualInversionLoaderMixin 类,用于加载文本反转嵌入
TextualInversionLoaderMixin,
# 继承 StableDiffusionLoraLoaderMixin 类,负责加载 LoRA 权重
StableDiffusionLoraLoaderMixin,
# 继承 FromSingleFileMixin 类,用于从单个文件加载模型
FromSingleFileMixin,
):
r"""
用于文本到图像生成的管道,使用 Stable Diffusion 和 ControlNet-XS 指导。
该模型从 [`DiffusionPipeline`] 继承。请查看父类文档以获取所有管道实现的通用方法(下载、保存、在特定设备上运行等)。
该管道还继承以下加载方法:
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] 用于加载文本反转嵌入
- [`~loaders.StableDiffusionLoraLoaderMixin.load_lora_weights`] 用于加载 LoRA 权重
- [`~loaders.StableDiffusionLoraLoaderMixin.save_lora_weights`] 用于保存 LoRA 权重
- [`loaders.FromSingleFileMixin.from_single_file`] 用于加载 `.ckpt` 文件
参数:
vae ([`AutoencoderKL`]):
用于将图像编码和解码为潜在表示的变分自编码器 (VAE) 模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器 ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于对文本进行分词的 `CLIPTokenizer`。
unet ([`UNet2DConditionModel`]):
用于创建 UNetControlNetXSModel 的 [`UNet2DConditionModel`],用于去噪编码后的图像潜变量。
controlnet ([`ControlNetXSAdapter`]):
[`ControlNetXSAdapter`] 与 `unet` 结合使用以去噪编码的图像潜变量。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用以去噪编码的图像潜变量的调度器。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 之一。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,用于估计生成的图像是否可能被视为冒犯或有害。
请参阅 [模型卡](https://huggingface.co/runwayml/stable-diffusion-v1-5) 了解模型潜在危害的更多细节。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
用于提取生成图像特征的 `CLIPImageProcessor`;作为输入提供给 `safety_checker`。
"""
# 定义模型 CPU 离线处理顺序
model_cpu_offload_seq = "text_encoder->unet->vae"
# 定义可选组件列表
_optional_components = ["safety_checker", "feature_extractor"]
# 定义不参与 CPU 离线处理的组件
_exclude_from_cpu_offload = ["safety_checker"]
# 定义需要回调的张量输入
_callback_tensor_inputs = ["latents", "prompt_embeds", "negative_prompt_embeds"]
# 初始化方法,设置模型的各项参数
def __init__(
self,
vae: AutoencoderKL, # 变分自编码器模型
text_encoder: CLIPTextModel, # 文本编码器模型
tokenizer: CLIPTokenizer, # 用于文本处理的分词器
unet: Union[UNet2DConditionModel, UNetControlNetXSModel], # U-Net 模型,用于生成图像
controlnet: ControlNetXSAdapter, # 控制网络适配器
scheduler: KarrasDiffusionSchedulers, # 扩散调度器
safety_checker: StableDiffusionSafetyChecker, # 安全检查器
feature_extractor: CLIPImageProcessor, # 特征提取器
requires_safety_checker: bool = True, # 是否需要安全检查器
):
# 调用父类的初始化方法
super().__init__()
# 如果传入的 UNet 是 UNet2DConditionModel,则转换为 UNetControlNetXSModel
if isinstance(unet, UNet2DConditionModel):
unet = UNetControlNetXSModel.from_unet(unet, controlnet)
# 如果安全检查器为 None 且要求安全检查,则发出警告
if safety_checker is None and requires_safety_checker:
logger.warning(
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the Stable Diffusion license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 如果安全检查器不为 None,但特征提取器为 None,则抛出错误
if safety_checker is not None and feature_extractor is None:
raise ValueError(
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块,设置其属性
self.register_modules(
vae=vae,
text_encoder=text_encoder,
tokenizer=tokenizer,
unet=unet,
controlnet=controlnet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,用于处理 VAE 输出
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
# 创建控制图像处理器,用于处理控制网络的图像
self.control_image_processor = VaeImageProcessor(
vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
)
# 注册配置参数,是否需要安全检查器
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 从 StableDiffusionPipeline 复制的编码提示的方法
def _encode_prompt(
self,
prompt, # 输入的提示文本
device, # 设备类型,例如 CPU 或 GPU
num_images_per_prompt, # 每个提示生成的图像数量
do_classifier_free_guidance, # 是否执行无分类器引导
negative_prompt=None, # 负面提示文本
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入
lora_scale: Optional[float] = None, # 可选的 LoRA 缩放因子
**kwargs, # 其他可选参数
):
# 定义弃用警告信息,提示用户该方法已被弃用
deprecation_message = "`_encode_prompt()` is deprecated and it will be removed in a future version. Use `encode_prompt()` instead. Also, be aware that the output format changed from a concatenated tensor to a tuple."
# 调用 deprecate 函数记录弃用信息,标记版本号为 "1.0.0"
deprecate("_encode_prompt()", "1.0.0", deprecation_message, standard_warn=False)
# 调用 encode_prompt 方法,获取提示嵌入元组
prompt_embeds_tuple = self.encode_prompt(
# 传入提示文本
prompt=prompt,
# 设备参数
device=device,
# 每个提示生成的图像数量
num_images_per_prompt=num_images_per_prompt,
# 是否进行无分类引导
do_classifier_free_guidance=do_classifier_free_guidance,
# 负提示文本
negative_prompt=negative_prompt,
# 提示嵌入
prompt_embeds=prompt_embeds,
# 负提示嵌入
negative_prompt_embeds=negative_prompt_embeds,
# LORA 缩放因子
lora_scale=lora_scale,
# 额外参数
**kwargs,
)
# 将提示嵌入元组中的两个部分连接,兼容以前的实现
prompt_embeds = torch.cat([prompt_embeds_tuple[1], prompt_embeds_tuple[0]])
# 返回合并后的提示嵌入
return prompt_embeds
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的 encode_prompt 方法
def encode_prompt(
# 提示文本
self,
prompt,
# 设备参数
device,
# 每个提示生成的图像数量
num_images_per_prompt,
# 是否进行无分类引导
do_classifier_free_guidance,
# 可选的负提示文本
negative_prompt=None,
# 可选的提示嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LORA 缩放因子
lora_scale: Optional[float] = None,
# 可选的剪切跳过参数
clip_skip: Optional[int] = None,
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的 run_safety_checker 方法
def run_safety_checker(self, image, device, dtype):
# 如果安全检查器未定义,设置 nsfw 概念为 None
if self.safety_checker is None:
has_nsfw_concept = None
else:
# 如果输入图像为张量,则后处理为 PIL 格式
if torch.is_tensor(image):
feature_extractor_input = self.image_processor.postprocess(image, output_type="pil")
else:
# 如果输入图像为 numpy 格式,则转换为 PIL 格式
feature_extractor_input = self.image_processor.numpy_to_pil(image)
# 使用特征提取器生成安全检查器输入
safety_checker_input = self.feature_extractor(feature_extractor_input, return_tensors="pt").to(device)
# 调用安全检查器进行图像检查,并获取 nsfw 概念的状态
image, has_nsfw_concept = self.safety_checker(
images=image, clip_input=safety_checker_input.pixel_values.to(dtype)
)
# 返回处理后的图像和 nsfw 概念的状态
return image, has_nsfw_concept
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 中复制的 decode_latents 方法
# 解码潜在变量的函数
def decode_latents(self, latents):
# 生成弃用警告信息,提示用户该方法将被移除
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用弃用函数,传递方法名、版本和警告信息
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据配置的缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,返回的第一个元素为图像
image = self.vae.decode(latents, return_dict=False)[0]
# 对图像进行归一化处理,并限制值在 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,适配 bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回解码后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制的函数
def prepare_extra_step_kwargs(self, generator, eta):
# 准备额外的参数,以便调度器的步骤可以接受不同的签名
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
# eta 对应于 DDIM 论文中的 η,应在 [0, 1] 之间
# 检查调度器的步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外参数字典
extra_step_kwargs = {}
# 如果接受 eta,添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回额外参数字典
return extra_step_kwargs
# 检查输入参数的函数
def check_inputs(
self,
prompt,
image,
negative_prompt=None,
prompt_embeds=None,
negative_prompt_embeds=None,
controlnet_conditioning_scale=1.0,
control_guidance_start=0.0,
control_guidance_end=1.0,
callback_on_step_end_tensor_inputs=None,
# 检查输入图像和提示的类型和大小
def check_image(self, image, prompt, prompt_embeds):
# 检查图像是否为 PIL 图像类型
image_is_pil = isinstance(image, PIL.Image.Image)
# 检查图像是否为 PyTorch 张量类型
image_is_tensor = isinstance(image, torch.Tensor)
# 检查图像是否为 NumPy 数组类型
image_is_np = isinstance(image, np.ndarray)
# 检查图像是否为 PIL 图像列表
image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
# 检查图像是否为 PyTorch 张量列表
image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
# 检查图像是否为 NumPy 数组列表
image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)
# 如果图像不是上述任一类型,则引发类型错误
if (
not image_is_pil
and not image_is_tensor
and not image_is_np
and not image_is_pil_list
and not image_is_tensor_list
and not image_is_np_list
):
raise TypeError(
f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
)
# 如果图像是 PIL 图像,设置图像批次大小为 1
if image_is_pil:
image_batch_size = 1
else:
# 否则,批次大小为图像的长度
image_batch_size = len(image)
# 如果提示不为空且为字符串,设置提示批次大小为 1
if prompt is not None and isinstance(prompt, str):
prompt_batch_size = 1
# 如果提示为列表,设置提示批次大小为列表的长度
elif prompt is not None and isinstance(prompt, list):
prompt_batch_size = len(prompt)
# 如果提示嵌入不为空,设置提示批次大小为嵌入的第一个维度大小
elif prompt_embeds is not None:
prompt_batch_size = prompt_embeds.shape[0]
# 如果图像批次大小不为 1 且与提示批次大小不相同,则引发值错误
if image_batch_size != 1 and image_batch_size != prompt_batch_size:
raise ValueError(
f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
)
# 准备图像以进行进一步处理
def prepare_image(
self,
image,
width,
height,
batch_size,
num_images_per_prompt,
device,
dtype,
do_classifier_free_guidance=False,
):
# 使用控制图像处理器预处理图像,并转换为 float32 类型
image = self.control_image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)
# 获取图像批次大小
image_batch_size = image.shape[0]
# 如果图像批次大小为 1,则按批次大小重复图像
if image_batch_size == 1:
repeat_by = batch_size
else:
# 否则,按每个提示的图像数量重复
repeat_by = num_images_per_prompt
# 在第 0 维重复图像
image = image.repeat_interleave(repeat_by, dim=0)
# 将图像移动到指定设备和数据类型
image = image.to(device=device, dtype=dtype)
# 如果启用无分类器自由引导,将图像重复两次
if do_classifier_free_guidance:
image = torch.cat([image] * 2)
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的代码
# 准备潜在变量,设置形状和初始化参数
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 计算潜在变量的形状,考虑批量大小、通道数和缩放因子
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出值错误,提示生成器长度与批量大小不匹配
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在变量,则生成新的潜在变量
if latents is None:
# 使用随机张量生成潜在变量,指定形状、生成器、设备和数据类型
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在变量,则将其转移到指定设备
latents = latents.to(device)
# 将初始噪声按调度器要求的标准差进行缩放
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 属性装饰器,获取引导比例
@property
# 从稳定扩散管道复制的引导比例属性
def guidance_scale(self):
# 返回当前的引导比例值
return self._guidance_scale
# 属性装饰器,获取剪切跳过设置
@property
# 从稳定扩散管道复制的剪切跳过属性
def clip_skip(self):
# 返回剪切跳过的配置值
return self._clip_skip
# 属性装饰器,判断是否执行无分类器自由引导
@property
# 从稳定扩散管道复制的无分类器自由引导属性
def do_classifier_free_guidance(self):
# 判断条件:引导比例大于1且时间条件投影维度为空
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
# 属性装饰器,获取交叉注意力参数
@property
# 从稳定扩散管道复制的交叉注意力参数属性
def cross_attention_kwargs(self):
# 返回交叉注意力的参数设置
return self._cross_attention_kwargs
# 属性装饰器,获取时间步数
@property
# 从稳定扩散管道复制的时间步数属性
def num_timesteps(self):
# 返回当前的时间步数
return self._num_timesteps
# 无梯度上下文装饰器,禁止梯度计算
@torch.no_grad()
# 替换示例文档字符串的装饰器
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义调用方法,允许对象被调用并执行某些操作
def __call__(
# 提示文本,可以是单个字符串或字符串列表,默认为 None
self,
prompt: Union[str, List[str]] = None,
# 输入图像,类型为 PipelineImageInput,默认为 None
image: PipelineImageInput = None,
# 输出图像高度,类型为可选整型,默认为 None
height: Optional[int] = None,
# 输出图像宽度,类型为可选整型,默认为 None
width: Optional[int] = None,
# 推理步骤数量,默认为 50
num_inference_steps: int = 50,
# 指导比例,默认为 7.5
guidance_scale: float = 7.5,
# 负提示文本,可以是单个字符串或字符串列表,默认为 None
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 噪声参数,默认为 0.0
eta: float = 0.0,
# 随机数生成器,可以是单个或多个 torch.Generator,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在变量,可以是 torch.Tensor 类型,默认为 None
latents: Optional[torch.Tensor] = None,
# 提示嵌入,可以是 torch.Tensor 类型,默认为 None
prompt_embeds: Optional[torch.Tensor] = None,
# 负提示嵌入,可以是 torch.Tensor 类型,默认为 None
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典形式的结果,默认为 True
return_dict: bool = True,
# 交叉注意力参数的可选字典,默认为 None
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
# ControlNet 的条件比例,默认为 1.0
controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
# ControlNet 指导开始比例,默认为 0.0
control_guidance_start: float = 0.0,
# ControlNet 指导结束比例,默认为 1.0
control_guidance_end: float = 1.0,
# 可选的跳过剪辑参数,默认为 None
clip_skip: Optional[int] = None,
# 步骤结束时的回调函数,默认为 None
callback_on_step_end: Optional[
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
# 回调函数结束时张量输入的列表,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
.\diffusers\pipelines\controlnet_xs\pipeline_controlnet_xs_sd_xl.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)授权;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下网址获得许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,否则根据许可证分发的软件
# 是按“原样”基础提供的,不附带任何明示或暗示的担保或条件。
# 请参见许可证以获取有关权限和限制的特定语言。
import inspect # 导入 inspect 模块,用于获取对象的信息
from typing import Any, Callable, Dict, List, Optional, Tuple, Union # 从 typing 模块导入类型注解
import numpy as np # 导入 numpy 库,通常用于数值计算
import PIL.Image # 导入 PIL 的 Image 模块,用于图像处理
import torch # 导入 PyTorch 库,用于深度学习
import torch.nn.functional as F # 导入 PyTorch 的功能性神经网络模块,提供各种函数
from transformers import ( # 从 transformers 库导入所需的类和函数
CLIPImageProcessor, # 导入 CLIP 图像处理器
CLIPTextModel, # 导入 CLIP 文本模型
CLIPTextModelWithProjection, # 导入带投影的 CLIP 文本模型
CLIPTokenizer, # 导入 CLIP 分词器
)
from diffusers.utils.import_utils import is_invisible_watermark_available # 导入检查水印可用性的工具
from ...callbacks import MultiPipelineCallbacks, PipelineCallback # 导入回调相关的类
from ...image_processor import PipelineImageInput, VaeImageProcessor # 导入图像处理相关的类
from ...loaders import FromSingleFileMixin, StableDiffusionXLLoraLoaderMixin, TextualInversionLoaderMixin # 导入加载器相关的混合类
from ...models import AutoencoderKL, ControlNetXSAdapter, UNet2DConditionModel, UNetControlNetXSModel # 导入模型类
from ...models.attention_processor import ( # 从注意力处理器模块导入类
AttnProcessor2_0, # 导入版本 2.0 的注意力处理器
XFormersAttnProcessor, # 导入 XFormers 注意力处理器
)
from ...models.lora import adjust_lora_scale_text_encoder # 导入调整 LoRA 规模的文本编码器函数
from ...schedulers import KarrasDiffusionSchedulers # 导入 Karras 扩散调度器
from ...utils import ( # 从 utils 模块导入工具函数和常量
USE_PEFT_BACKEND, # 导入用于 PEFT 后端的常量
logging, # 导入日志记录模块
replace_example_docstring, # 导入替换示例文档字符串的函数
scale_lora_layers, # 导入缩放 LoRA 层的函数
unscale_lora_layers, # 导入取消缩放 LoRA 层的函数
)
from ...utils.torch_utils import is_compiled_module, is_torch_version, randn_tensor # 导入与 PyTorch 相关的工具函数
from ..pipeline_utils import DiffusionPipeline # 从管道工具导入 DiffusionPipeline 类
from ..stable_diffusion_xl.pipeline_output import StableDiffusionXLPipelineOutput # 从稳定扩散 XL 模块导入管道输出类
if is_invisible_watermark_available(): # 如果隐形水印可用
from ..stable_diffusion_xl.watermark import StableDiffusionXLWatermarker # 导入 StableDiffusionXLWatermarker 类
logger = logging.get_logger(__name__) # 获取当前模块的日志记录器,命名为模块名
EXAMPLE_DOC_STRING = """ # 示例文档字符串,通常用于说明函数或类的用法
``` # 文档字符串结束标志
# 示例代码
Examples:
```py
>>> # !pip install opencv-python transformers accelerate # 安装所需的库
>>> from diffusers import StableDiffusionXLControlNetXSPipeline, ControlNetXSAdapter, AutoencoderKL # 导入所需的类
>>> from diffusers.utils import load_image # 导入加载图像的工具
>>> import numpy as np # 导入 NumPy 库以进行数组操作
>>> import torch # 导入 PyTorch 库以进行深度学习
>>> import cv2 # 导入 OpenCV 库以进行计算机视觉处理
>>> from PIL import Image # 导入 PIL 库以处理图像
>>> prompt = "aerial view, a futuristic research complex in a bright foggy jungle, hard lighting" # 设置生成图像的提示语
>>> negative_prompt = "low quality, bad quality, sketches" # 设置生成图像的负面提示
>>> # 下载一张图像
>>> image = load_image( # 使用工具加载指定 URL 的图像
... "https://hf.co/datasets/hf-internal-testing/diffusers-images/resolve/main/sd_controlnet/hf-logo.png"
... )
>>> # 初始化模型和管道
>>> controlnet_conditioning_scale = 0.5 # 设置 ControlNet 的条件缩放因子
>>> vae = AutoencoderKL.from_pretrained("madebyollin/sdxl-vae-fp16-fix", torch_dtype=torch.float16) # 加载预训练的自编码器模型
>>> controlnet = ControlNetXSAdapter.from_pretrained( # 加载预训练的 ControlNet 模型
... "UmerHA/Testing-ConrolNetXS-SDXL-canny", torch_dtype=torch.float16
... )
>>> pipe = StableDiffusionXLControlNetXSPipeline.from_pretrained( # 加载 Stable Diffusion 管道
... "stabilityai/stable-diffusion-xl-base-1.0", controlnet=controlnet, torch_dtype=torch.float16
... )
>>> pipe.enable_model_cpu_offload() # 启用 CPU 内存卸载以优化内存使用
>>> # 获取 Canny 图像
>>> image = np.array(image) # 将图像转换为 NumPy 数组
>>> image = cv2.Canny(image, 100, 200) # 应用 Canny 边缘检测
>>> image = image[:, :, None] # 在第三维添加一个维度以便于后续处理
>>> image = np.concatenate([image, image, image], axis=2) # 复制图像以创建 RGB 格式
>>> canny_image = Image.fromarray(image) # 将数组转换为 PIL 图像
>>> # 生成图像
>>> image = pipe( # 使用管道生成图像
... prompt, controlnet_conditioning_scale=controlnet_conditioning_scale, image=canny_image
... ).images[0] # 获取生成的图像
# 定义一个名为 StableDiffusionXLControlNetXSPipeline 的类,继承多个基类
class StableDiffusionXLControlNetXSPipeline(
DiffusionPipeline, # 从 DiffusionPipeline 继承基本功能
TextualInversionLoaderMixin, # 从 TextualInversionLoaderMixin 继承加载文本反演的功能
StableDiffusionXLLoraLoaderMixin, # 从 StableDiffusionXLLoraLoaderMixin 继承加载 LoRA 权重的功能
FromSingleFileMixin, # 从 FromSingleFileMixin 继承从单个文件加载的功能
):
r""" # 文档字符串,描述该管道的功能
Pipeline for text-to-image generation using Stable Diffusion XL with ControlNet-XS guidance. # 说明管道用于文本到图像的生成
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods # 指出该模型继承自 DiffusionPipeline
implemented for all pipelines (downloading, saving, running on a particular device, etc.). # 说明该模型实现了一些通用方法
The pipeline also inherits the following loading methods: # 说明该管道还继承了以下加载方法
- [`~loaders.TextualInversionLoaderMixin.load_textual_inversion`] for loading textual inversion embeddings # 指出用于加载文本反演嵌入的方法
- [`loaders.StableDiffusionXLLoraLoaderMixin.load_lora_weights`] for loading LoRA weights # 指出用于加载 LoRA 权重的方法
- [`loaders.FromSingleFileMixin.from_single_file`] for loading `.ckpt` files # 指出用于加载 `.ckpt` 文件的方法
Args: # 参数说明部分
vae ([`AutoencoderKL`]): # VAE 模型参数,表示变分自编码器
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations. # 描述 VAE 的功能
text_encoder ([`~transformers.CLIPTextModel`]): # 文本编码器参数,使用 CLIP 模型
Frozen text-encoder ([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14)). # 描述使用的具体文本编码器
text_encoder_2 ([`~transformers.CLIPTextModelWithProjection`]): # 第二个文本编码器参数
Second frozen text-encoder # 描述第二个文本编码器
([laion/CLIP-ViT-bigG-14-laion2B-39B-b160k](https://huggingface.co/laion/CLIP-ViT-bigG-14-laion2B-39B-b160k)). # 指出具体的模型
tokenizer ([`~transformers.CLIPTokenizer`]): # 第一个分词器参数
A `CLIPTokenizer` to tokenize text. # 描述分词器的功能
tokenizer_2 ([`~transformers.CLIPTokenizer`]): # 第二个分词器参数
A `CLIPTokenizer` to tokenize text. # 描述第二个分词器的功能
unet ([`UNet2DConditionModel`]): # UNet 模型参数,用于去噪图像
A [`UNet2DConditionModel`] used to create a UNetControlNetXSModel to denoise the encoded image latents. # 描述 UNet 的用途
controlnet ([`ControlNetXSAdapter`]): # ControlNet 参数,用于图像去噪
A [`ControlNetXSAdapter`] to be used in combination with `unet` to denoise the encoded image latents. # 描述 ControlNet 的用途
scheduler ([`SchedulerMixin`]): # 调度器参数,用于去噪处理
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of # 说明调度器的功能
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`]. # 列出可用的调度器类型
force_zeros_for_empty_prompt (`bool`, *optional*, defaults to `"True"`): # 参数,控制空提示时的处理
Whether the negative prompt embeddings should always be set to 0. Also see the config of # 描述该参数的功能
`stabilityai/stable-diffusion-xl-base-1-0`. # 指出相关的配置
add_watermarker (`bool`, *optional*): # 参数,控制是否使用水印
Whether to use the [invisible_watermark](https://github.com/ShieldMnt/invisible-watermark/) library to # 描述水印的功能
watermark output images. If not defined, it defaults to `True` if the package is installed; otherwise no # 说明默认值及安装条件
watermarker is used. # 说明无水印的条件
""" # 文档字符串结束
model_cpu_offload_seq = "text_encoder->text_encoder_2->unet->vae" # 定义模型在 CPU 上的卸载顺序
_optional_components = [ # 定义可选组件列表
"tokenizer", # 第一个分词器
"tokenizer_2", # 第二个分词器
"text_encoder", # 第一个文本编码器
"text_encoder_2", # 第二个文本编码器
"feature_extractor", # 特征提取器
] # 可选组件列表结束
# 定义一个包含输入张量名称的列表,用于回调处理
_callback_tensor_inputs = [
# 潜在变量
"latents",
# 提示词嵌入
"prompt_embeds",
# 负面提示词嵌入
"negative_prompt_embeds",
# 额外文本嵌入
"add_text_embeds",
# 额外时间 ID
"add_time_ids",
# 负面池化提示词嵌入
"negative_pooled_prompt_embeds",
# 负面额外时间 ID
"negative_add_time_ids",
]
# 初始化方法,设置类的属性
def __init__(
# 变分自编码器
self,
vae: AutoencoderKL,
# 文本编码器
text_encoder: CLIPTextModel,
# 第二个文本编码器
text_encoder_2: CLIPTextModelWithProjection,
# 第一个分词器
tokenizer: CLIPTokenizer,
# 第二个分词器
tokenizer_2: CLIPTokenizer,
# U-Net模型,支持两种类型
unet: Union[UNet2DConditionModel, UNetControlNetXSModel],
# 控制网适配器
controlnet: ControlNetXSAdapter,
# 调度器
scheduler: KarrasDiffusionSchedulers,
# 是否为空提示强制使用零
force_zeros_for_empty_prompt: bool = True,
# 可选水印参数
add_watermarker: Optional[bool] = None,
# 可选特征提取器
feature_extractor: CLIPImageProcessor = None,
):
# 调用父类的初始化方法
super().__init__()
# 检查 U-Net 类型,如果是 UNet2DConditionModel,则转换为 UNetControlNetXSModel
if isinstance(unet, UNet2DConditionModel):
unet = UNetControlNetXSModel.from_unet(unet, controlnet)
# 注册模型模块,便于管理和使用
self.register_modules(
vae=vae,
text_encoder=text_encoder,
text_encoder_2=text_encoder_2,
tokenizer=tokenizer,
tokenizer_2=tokenizer_2,
unet=unet,
controlnet=controlnet,
scheduler=scheduler,
feature_extractor=feature_extractor,
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,用于处理 VAE 输出的图像
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True)
# 创建控制图像处理器,用于控制网图像处理
self.control_image_processor = VaeImageProcessor(
vae_scale_factor=self.vae_scale_factor, do_convert_rgb=True, do_normalize=False
)
# 根据用户输入或默认设置决定是否添加水印
add_watermarker = add_watermarker if add_watermarker is not None else is_invisible_watermark_available()
# 如果需要添加水印,则初始化水印处理器
if add_watermarker:
self.watermark = StableDiffusionXLWatermarker()
else:
# 否则将水印设置为 None
self.watermark = None
# 注册配置参数,以便后续使用
self.register_to_config(force_zeros_for_empty_prompt=force_zeros_for_empty_prompt)
# 从其他模块复制的提示编码方法
def encode_prompt(
# 输入的提示词
self,
prompt: str,
# 可选的第二个提示词
prompt_2: Optional[str] = None,
# 可选的设备参数
device: Optional[torch.device] = None,
# 每个提示生成的图像数量
num_images_per_prompt: int = 1,
# 是否进行无分类器引导
do_classifier_free_guidance: bool = True,
# 可选的负面提示词
negative_prompt: Optional[str] = None,
# 可选的第二个负面提示词
negative_prompt_2: Optional[str] = None,
# 可选的提示词嵌入
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面提示词嵌入
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的池化提示词嵌入
pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负面池化提示词嵌入
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
# 可选的 LoRA 缩放因子
lora_scale: Optional[float] = None,
# 可选的跳过处理的 clip 层
clip_skip: Optional[int] = None,
# 从其他模块复制的额外步骤参数准备方法
# 准备调度器步骤的额外参数,因为并不是所有调度器都有相同的参数签名
def prepare_extra_step_kwargs(self, generator, eta):
# eta(η)仅在 DDIMScheduler 中使用,其他调度器会忽略它
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应该在 [0, 1] 的范围内
# 检查调度器的步骤方法是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 创建一个空字典用于存放额外参数
extra_step_kwargs = {}
# 如果调度器接受 eta 参数,则将其添加到字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器的步骤方法是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果调度器接受 generator 参数,则将其添加到字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 检查输入的有效性,包括多个参数
def check_inputs(
self,
prompt, # 主要的提示文本
prompt_2, # 第二个提示文本
image, # 输入的图像
negative_prompt=None, # 可选的负面提示文本
negative_prompt_2=None, # 第二个可选的负面提示文本
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
pooled_prompt_embeds=None, # 可选的池化提示嵌入
negative_pooled_prompt_embeds=None, # 可选的池化负面提示嵌入
controlnet_conditioning_scale=1.0, # 控制网络的条件缩放
control_guidance_start=0.0, # 控制指导的起始值
control_guidance_end=1.0, # 控制指导的结束值
callback_on_step_end_tensor_inputs=None, # 步骤结束时的回调函数输入
# 从 diffusers.pipelines.controlnet.pipeline_controlnet.StableDiffusionControlNetPipeline.check_image 复制
# 检查输入的图像类型及其与提示的匹配情况
def check_image(self, image, prompt, prompt_embeds):
# 判断图像是否为 PIL 图像类型
image_is_pil = isinstance(image, PIL.Image.Image)
# 判断图像是否为 Torch 张量类型
image_is_tensor = isinstance(image, torch.Tensor)
# 判断图像是否为 NumPy 数组类型
image_is_np = isinstance(image, np.ndarray)
# 判断图像是否为 PIL 图像列表
image_is_pil_list = isinstance(image, list) and isinstance(image[0], PIL.Image.Image)
# 判断图像是否为 Torch 张量列表
image_is_tensor_list = isinstance(image, list) and isinstance(image[0], torch.Tensor)
# 判断图像是否为 NumPy 数组列表
image_is_np_list = isinstance(image, list) and isinstance(image[0], np.ndarray)
# 检查图像是否为有效类型,如果无效则抛出类型错误
if (
not image_is_pil
and not image_is_tensor
and not image_is_np
and not image_is_pil_list
and not image_is_tensor_list
and not image_is_np_list
):
raise TypeError(
f"image must be passed and be one of PIL image, numpy array, torch tensor, list of PIL images, list of numpy arrays or list of torch tensors, but is {type(image)}"
)
# 如果图像是 PIL 类型,设置批量大小为 1
if image_is_pil:
image_batch_size = 1
else:
# 否则,批量大小为图像列表的长度
image_batch_size = len(image)
# 检查提示的类型,设置对应的批量大小
if prompt is not None and isinstance(prompt, str):
prompt_batch_size = 1
elif prompt is not None and isinstance(prompt, list):
prompt_batch_size = len(prompt)
elif prompt_embeds is not None:
prompt_batch_size = prompt_embeds.shape[0]
# 检查图像批量大小与提示批量大小是否匹配
if image_batch_size != 1 and image_batch_size != prompt_batch_size:
raise ValueError(
f"If image batch size is not 1, image batch size must be same as prompt batch size. image batch size: {image_batch_size}, prompt batch size: {prompt_batch_size}"
)
# 准备图像,调整其大小和批量
def prepare_image(
self,
image,
width,
height,
batch_size,
num_images_per_prompt,
device,
dtype,
do_classifier_free_guidance=False,
):
# 预处理图像并转换为浮点数张量
image = self.control_image_processor.preprocess(image, height=height, width=width).to(dtype=torch.float32)
# 获取图像批量大小
image_batch_size = image.shape[0]
# 如果批量大小为 1,则重复次数为 batch_size
if image_batch_size == 1:
repeat_by = batch_size
else:
# 否则,重复次数为每个提示的图像数量
repeat_by = num_images_per_prompt
# 根据重复次数扩展图像张量
image = image.repeat_interleave(repeat_by, dim=0)
# 将图像移动到指定设备和数据类型
image = image.to(device=device, dtype=dtype)
# 如果使用无分类器自由引导,则将图像复制两次
if do_classifier_free_guidance:
image = torch.cat([image] * 2)
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
# 准备潜在向量,设置批处理大小、通道数、高度、宽度等参数
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在向量的形状,考虑 VAE 的缩放因子
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器的数量是否与批处理大小一致
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在向量,则生成新的随机潜在向量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在向量,则将其移动到指定设备
latents = latents.to(device)
# 将初始噪声按调度器所需的标准差进行缩放
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在向量
return latents
# 获取附加时间ID,包含原始大小、裁剪坐标和目标大小
def _get_add_time_ids(
self, original_size, crops_coords_top_left, target_size, dtype, text_encoder_projection_dim=None
):
# 计算附加时间ID,由原始大小、裁剪坐标和目标大小组成
add_time_ids = list(original_size + crops_coords_top_left + target_size)
# 计算传入的附加嵌入维度
passed_add_embed_dim = (
self.unet.config.addition_time_embed_dim * len(add_time_ids) + text_encoder_projection_dim
)
# 获取期望的附加嵌入维度
expected_add_embed_dim = self.unet.base_add_embedding.linear_1.in_features
# 检查实际的附加嵌入维度是否与期望一致
if expected_add_embed_dim != passed_add_embed_dim:
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."
)
# 将附加时间ID转换为张量
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
# 返回附加时间ID
return add_time_ids
# 从 StableDiffusionUpscalePipeline 复制的函数,用于上升 VAE
def upcast_vae(self):
# 获取 VAE 的数据类型
dtype = self.vae.dtype
# 将 VAE 转换为浮点32类型
self.vae.to(dtype=torch.float32)
# 检查是否使用了 Torch 2.0 或 Xformers 处理器
use_torch_2_0_or_xformers = isinstance(
self.vae.decoder.mid_block.attentions[0].processor,
(
AttnProcessor2_0,
XFormersAttnProcessor,
),
)
# 如果使用了 Xformers 或 Torch 2.0,注意力块不需要为浮点32,这样可以节省大量内存
if use_torch_2_0_or_xformers:
self.vae.post_quant_conv.to(dtype)
self.vae.decoder.conv_in.to(dtype)
self.vae.decoder.mid_block.to(dtype)
@property
# 从 StableDiffusionPipeline 复制的属性,返回引导比例
def guidance_scale(self):
return self._guidance_scale
@property
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.clip_skip 复制
def clip_skip(self):
# 返回剪辑跳过的设置
return self._clip_skip
@property
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.do_classifier_free_guidance 复制
def do_classifier_free_guidance(self):
# 判断是否启用分类器自由引导
return self._guidance_scale > 1 and self.unet.config.time_cond_proj_dim is None
@property
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.cross_attention_kwargs 复制
def cross_attention_kwargs(self):
# 返回交叉注意力的关键字参数
return self._cross_attention_kwargs
@property
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.num_timesteps 复制
def num_timesteps(self):
# 返回时间步数
return self._num_timesteps
@torch.no_grad()
@replace_example_docstring(EXAMPLE_DOC_STRING)
def __call__(
# 定义可调用方法,接收多个参数用于图像生成
self,
prompt: Union[str, List[str]] = None,
prompt_2: Optional[Union[str, List[str]]] = None,
image: PipelineImageInput = None,
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
guidance_scale: float = 5.0,
negative_prompt: Optional[Union[str, List[str]]] = None,
negative_prompt_2: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
prompt_embeds: Optional[torch.Tensor] = None,
negative_prompt_embeds: Optional[torch.Tensor] = None,
pooled_prompt_embeds: Optional[torch.Tensor] = None,
negative_pooled_prompt_embeds: Optional[torch.Tensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
controlnet_conditioning_scale: Union[float, List[float]] = 1.0,
control_guidance_start: float = 0.0,
control_guidance_end: float = 1.0,
original_size: Tuple[int, int] = None,
crops_coords_top_left: Tuple[int, int] = (0, 0),
target_size: Tuple[int, int] = None,
negative_original_size: Optional[Tuple[int, int]] = None,
negative_crops_coords_top_left: Tuple[int, int] = (0, 0),
negative_target_size: Optional[Tuple[int, int]] = None,
clip_skip: Optional[int] = None,
callback_on_step_end: Optional[
# 定义回调函数在步骤结束时执行
Union[Callable[[int, int, Dict], None], PipelineCallback, MultiPipelineCallbacks]
] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
.\diffusers\pipelines\controlnet_xs\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从相对路径导入多个工具和依赖项
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 导入 DIFFUSERS_SLOW_IMPORT 常量
OptionalDependencyNotAvailable, # 导入可选依赖不可用异常
_LazyModule, # 导入延迟加载模块
get_objects_from_module, # 导入从模块获取对象的函数
is_flax_available, # 导入检查 Flax 是否可用的函数
is_torch_available, # 导入检查 PyTorch 是否可用的函数
is_transformers_available, # 导入检查 Transformers 是否可用的函数
)
# 初始化空字典,用于存放虚拟对象
_dummy_objects = {}
# 初始化空字典,用于存放导入结构
_import_structure = {}
# 尝试检查 Transformers 和 Torch 是否可用
try:
if not (is_transformers_available() and is_torch_available()):
# 如果不可用,则抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从工具中导入虚拟对象以避免错误
from ...utils import dummy_torch_and_transformers_objects # noqa F403
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果可用,更新导入结构以包含相关管道
_import_structure["pipeline_controlnet_xs"] = ["StableDiffusionControlNetXSPipeline"]
_import_structure["pipeline_controlnet_xs_sd_xl"] = ["StableDiffusionXLControlNetXSPipeline"]
# 尝试检查 Transformers 和 Flax 是否可用
try:
if not (is_transformers_available() and is_flax_available()):
# 如果不可用,则抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从工具中导入虚拟对象以避免错误
from ...utils import dummy_flax_and_transformers_objects # noqa F403
# 更新虚拟对象字典
_dummy_objects.update(get_objects_from_module(dummy_flax_and_transformers_objects))
else:
# 如果可用,不执行任何操作(留空)
pass # _import_structure["pipeline_flax_controlnet"] = ["FlaxStableDiffusionControlNetPipeline"]
# 如果正在进行类型检查或慢导入,则执行以下代码
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 尝试检查 Transformers 和 Torch 是否可用
try:
if not (is_transformers_available() and is_torch_available()):
# 如果不可用,则抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从工具中导入虚拟对象以避免错误
from ...utils.dummy_torch_and_transformers_objects import *
else:
# 如果可用,导入相关的管道
from .pipeline_controlnet_xs import StableDiffusionControlNetXSPipeline
from .pipeline_controlnet_xs_sd_xl import StableDiffusionXLControlNetXSPipeline
# 尝试检查 Transformers 和 Flax 是否可用
try:
if not (is_transformers_available() and is_flax_available()):
# 如果不可用,则抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从工具中导入虚拟对象以避免错误
from ...utils.dummy_flax_and_transformers_objects import * # noqa F403
else:
# 如果可用,不执行任何操作(留空)
pass # from .pipeline_flax_controlnet import FlaxStableDiffusionControlNetPipeline
# 如果不进行类型检查或慢导入,执行以下代码
else:
import sys # 导入 sys 模块
# 将当前模块替换为延迟加载模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"], # 当前文件的全局变量
_import_structure, # 导入结构
module_spec=__spec__, # 模块规格
)
# 遍历虚拟对象字典,设置模块属性
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
.\diffusers\pipelines\dance_diffusion\pipeline_dance_diffusion.py
# 版权声明,标识该文件的版权所有者及保留权利
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache License, Version 2.0 ("许可证")授权;
# 除非遵守该许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,软件在许可证下分发是基于 "现状" 的基础,
# 不提供任何形式的保证或条件,无论是明示或暗示的。
# 有关许可证的特定语言,治理权限和限制,请参阅许可证。
# 从 typing 模块导入所需的类型提示
from typing import List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 utils 模块导入日志记录工具
from ...utils import logging
# 从 torch_utils 模块导入随机张量生成工具
from ...utils.torch_utils import randn_tensor
# 从 pipeline_utils 模块导入音频管道输出和扩散管道类
from ..pipeline_utils import AudioPipelineOutput, DiffusionPipeline
# 创建一个日志记录器,用于记录当前模块的日志
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个音频生成的扩散管道类,继承自 DiffusionPipeline
class DanceDiffusionPipeline(DiffusionPipeline):
r"""
用于音频生成的管道。
此模型继承自 [`DiffusionPipeline`]。有关所有管道实现的通用方法(下载、保存、在特定设备上运行等),请查看父类文档。
参数:
unet ([`UNet1DModel`]):
用于去噪编码音频的 `UNet1DModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于去噪编码音频的潜变量。可以是 [`IPNDMScheduler`] 的一种。
"""
# 定义模型在 CPU 上的卸载顺序,当前为 "unet"
model_cpu_offload_seq = "unet"
# 初始化函数,接受 UNet 模型和调度器作为参数
def __init__(self, unet, scheduler):
# 调用父类的初始化方法
super().__init__()
# 注册 UNet 模型和调度器模块
self.register_modules(unet=unet, scheduler=scheduler)
# 禁用梯度计算的上下文管理器,避免计算梯度以节省内存
@torch.no_grad()
def __call__(
self,
batch_size: int = 1, # 每次生成的音频样本数量,默认为 1
num_inference_steps: int = 100, # 进行推理的步骤数量,默认为 100
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 随机数生成器,默认为 None
audio_length_in_s: Optional[float] = None, # 生成音频的时长(秒),默认为 None
return_dict: bool = True, # 是否以字典形式返回结果,默认为 True
.\diffusers\pipelines\dance_diffusion\__init__.py
# 导入类型检查相关的模块
from typing import TYPE_CHECKING
# 从上层目录导入慢速导入标志和懒加载模块工具
from ...utils import DIFFUSERS_SLOW_IMPORT, _LazyModule
# 定义要导入的结构,包括 'pipeline_dance_diffusion' 模块和其中的 'DanceDiffusionPipeline' 类
_import_structure = {"pipeline_dance_diffusion": ["DanceDiffusionPipeline"]}
# 如果是类型检查或慢速导入标志为真,则直接导入 DanceDiffusionPipeline
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
from .pipeline_dance_diffusion import DanceDiffusionPipeline
else:
# 否则,导入 sys 模块以处理懒加载
import sys
# 将当前模块替换为懒加载模块,并提供必要的参数
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
.\diffusers\pipelines\ddim\pipeline_ddim.py
# 版权声明,注明该代码的版权信息及使用许可
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 根据 Apache 许可证 2.0 版进行许可(“许可证”);
# 除非遵守许可证,否则不得使用此文件。
# 可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有规定,按“原样”基础分发软件,
# 不提供任何形式的明示或暗示的担保或条件。
# 请参阅许可证以获取有关权限和
# 限制的具体语言。
# 从 typing 模块导入必要的类型
from typing import List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从调度器模块导入 DDIMScheduler 类
from ...schedulers import DDIMScheduler
# 从工具模块导入 randn_tensor 函数
from ...utils.torch_utils import randn_tensor
# 从管道工具模块导入 DiffusionPipeline 和 ImagePipelineOutput 类
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
# 定义 DDIMPipeline 类,继承自 DiffusionPipeline
class DDIMPipeline(DiffusionPipeline):
r"""
用于图像生成的管道。
该模型继承自 [`DiffusionPipeline`]。有关为所有管道实现的通用方法的文档,请查看超类文档(下载、保存、在特定设备上运行等)。
参数:
unet ([`UNet2DModel`]):
用于去噪编码图像潜变量的 `UNet2DModel`。
scheduler ([`SchedulerMixin`]):
用于与 `unet` 结合使用以去噪编码图像的调度器。可以是 [`DDPMScheduler`] 或 [`DDIMScheduler`] 之一。
"""
# 定义模型在 CPU 上的卸载序列
model_cpu_offload_seq = "unet"
# 初始化方法,接受 unet 和 scheduler 参数
def __init__(self, unet, scheduler):
# 调用父类的初始化方法
super().__init__()
# 确保调度器始终可以转换为 DDIM
scheduler = DDIMScheduler.from_config(scheduler.config)
# 注册 unet 和 scheduler 模块
self.register_modules(unet=unet, scheduler=scheduler)
# 禁用梯度计算的上下文管理器
@torch.no_grad()
def __call__(
# 定义批处理大小,默认为 1
batch_size: int = 1,
# 可选的随机数生成器,可以是单个生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# eta 参数,默认为 0.0
eta: float = 0.0,
# 进行推理的步骤数,默认为 50
num_inference_steps: int = 50,
# 可选的布尔值,用于指示是否使用裁剪的模型输出
use_clipped_model_output: Optional[bool] = None,
# 可选的输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 返回字典的布尔值,默认为 True
return_dict: bool = True,
.\diffusers\pipelines\ddim\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从相对路径导入 DIFFUSERS_SLOW_IMPORT 和 _LazyModule
from ...utils import DIFFUSERS_SLOW_IMPORT, _LazyModule
# 定义模块导入结构,包含 "pipeline_ddim" 和其下的 "DDIMPipeline"
_import_structure = {"pipeline_ddim": ["DDIMPipeline"]}
# 如果在类型检查模式下或需要慢速导入,直接导入 DDIMPipeline
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
from .pipeline_ddim import DDIMPipeline
else:
# 否则,导入 sys 模块
import sys
# 将当前模块替换为 _LazyModule 实例,以实现延迟加载
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
.\diffusers\pipelines\ddpm\pipeline_ddpm.py
# 版权声明,表明版权所有者和版权年份
# Copyright 2024 The HuggingFace Team. All rights reserved.
#
# 许可声明,说明根据 Apache 2.0 许可证使用该文件的条件
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# 说明如何获取许可证的链接
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 免责声明,声明在适用法律下软件以“按现状”方式分发,不提供任何形式的担保
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# 说明许可证的具体条款和条件
# See the License for the specific language governing permissions and
# limitations under the License.
# 从 typing 模块导入需要的类型注解
from typing import List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 utils.torch_utils 模块导入 randn_tensor 函数
from ...utils.torch_utils import randn_tensor
# 从 pipeline_utils 模块导入 DiffusionPipeline 和 ImagePipelineOutput 类
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput
# 定义 DDPMPipeline 类,继承自 DiffusionPipeline 类
class DDPMPipeline(DiffusionPipeline):
r"""
用于图像生成的管道。
该模型继承自 [`DiffusionPipeline`]。请查看超类文档以了解为所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
参数:
unet ([`UNet2DModel`]):
用于去噪编码图像潜变量的 `UNet2DModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于去噪编码图像。可以是
[`DDPMScheduler`] 或 [`DDIMScheduler`] 的其中之一。
"""
# 定义一个类变量,表示 CPU 卸载的模型序列
model_cpu_offload_seq = "unet"
# 初始化方法,接受 unet 和 scheduler 作为参数
def __init__(self, unet, scheduler):
# 调用父类的初始化方法
super().__init__()
# 注册 unet 和 scheduler 模块
self.register_modules(unet=unet, scheduler=scheduler)
# 使用装饰器,表示该方法在执行时不需要计算梯度
@torch.no_grad()
def __call__(
# 定义批量大小,默认为 1
batch_size: int = 1,
# 可选参数,生成器,可以是单个或多个 torch.Generator 实例
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 定义推理步骤的数量,默认为 1000
num_inference_steps: int = 1000,
# 定义输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 定义是否返回字典格式的输出,默认为 True
return_dict: bool = True,
.\diffusers\pipelines\ddpm\__init__.py
# 导入类型检查功能
from typing import TYPE_CHECKING
# 从上层模块导入工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 导入用于慢速导入的常量
_LazyModule, # 导入懒加载模块的类
)
# 定义模块导入结构,指定将要导入的子模块
_import_structure = {"pipeline_ddpm": ["DDPMPipeline"]}
# 如果在类型检查模式或需要慢速导入,则导入 DDPMPipeline
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
from .pipeline_ddpm import DDPMPipeline # 从子模块导入 DDPMPipeline
# 否则,进行懒加载模块的处理
else:
import sys # 导入系统模块
# 将当前模块替换为懒加载模块,延迟实际导入
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块名
globals()["__file__"], # 当前文件路径
_import_structure, # 导入结构
module_spec=__spec__, # 模块规范
)
.\diffusers\pipelines\deepfloyd_if\pipeline_if.py
# 导入 html 模块,用于处理 HTML 文本
import html
# 导入 inspect 模块,用于获取对象的信息
import inspect
# 导入 re 模块,用于正则表达式操作
import re
# 从 urllib.parse 导入模块,作为 ul,用于处理 URL
import urllib.parse as ul
# 导入类型提示所需的类型
from typing import Any, Callable, Dict, List, Optional, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库导入 CLIP 图像处理器、T5 编码器模型和 T5 标记器
from transformers import CLIPImageProcessor, T5EncoderModel, T5Tokenizer
# 从 loaders 模块导入 StableDiffusionLoraLoaderMixin 类
from ...loaders import StableDiffusionLoraLoaderMixin
# 从 models 模块导入 UNet2DConditionModel 类
from ...models import UNet2DConditionModel
# 从 schedulers 模块导入 DDPMScheduler 类
from ...schedulers import DDPMScheduler
# 从 utils 模块导入多个工具函数和常量
from ...utils import (
BACKENDS_MAPPING, # 后端映射
is_bs4_available, # 检查 bs4 是否可用
is_ftfy_available, # 检查 ftfy 是否可用
logging, # 日志记录工具
replace_example_docstring, # 替换示例文档字符串的函数
)
# 从 torch_utils 模块导入 randn_tensor 函数
from ...utils.torch_utils import randn_tensor
# 从 pipeline_utils 模块导入 DiffusionPipeline 类
from ..pipeline_utils import DiffusionPipeline
# 从 pipeline_output 模块导入 IFPipelineOutput 类
from .pipeline_output import IFPipelineOutput
# 从 safety_checker 模块导入 IFSafetyChecker 类
from .safety_checker import IFSafetyChecker
# 从 watermark 模块导入 IFWatermarker 类
from .watermark import IFWatermarker
# 创建日志记录器,使用当前模块的名称
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 如果 bs4 可用,导入 BeautifulSoup 类
if is_bs4_available():
from bs4 import BeautifulSoup
# 如果 ftfy 可用,导入该模块
if is_ftfy_available():
import ftfy
# 定义示例文档字符串,包含示例代码块
EXAMPLE_DOC_STRING = """
Examples:
```py
>>> from diffusers import IFPipeline, IFSuperResolutionPipeline, DiffusionPipeline
>>> from diffusers.utils import pt_to_pil
>>> import torch
>>> pipe = IFPipeline.from_pretrained("DeepFloyd/IF-I-XL-v1.0", variant="fp16", torch_dtype=torch.float16)
>>> pipe.enable_model_cpu_offload()
>>> prompt = 'a photo of a kangaroo wearing an orange hoodie and blue sunglasses standing in front of the eiffel tower holding a sign that says "very deep learning"'
>>> prompt_embeds, negative_embeds = pipe.encode_prompt(prompt)
>>> image = pipe(prompt_embeds=prompt_embeds, negative_prompt_embeds=negative_embeds, output_type="pt").images
>>> # save intermediate image
>>> pil_image = pt_to_pil(image)
>>> pil_image[0].save("./if_stage_I.png")
>>> super_res_1_pipe = IFSuperResolutionPipeline.from_pretrained(
... "DeepFloyd/IF-II-L-v1.0", text_encoder=None, variant="fp16", torch_dtype=torch.float16
... )
>>> super_res_1_pipe.enable_model_cpu_offload()
>>> image = super_res_1_pipe(
... image=image, prompt_embeds=prompt_embeds, negative_prompt_embeds=negative_embeds, output_type="pt"
... ).images
>>> # save intermediate image
>>> pil_image = pt_to_pil(image)
>>> pil_image[0].save("./if_stage_I.png")
>>> safety_modules = {
... "feature_extractor": pipe.feature_extractor,
... "safety_checker": pipe.safety_checker,
... "watermarker": pipe.watermarker,
... }
>>> super_res_2_pipe = DiffusionPipeline.from_pretrained(
... "stabilityai/stable-diffusion-x4-upscaler", **safety_modules, torch_dtype=torch.float16
... )
>>> super_res_2_pipe.enable_model_cpu_offload()
>>> image = super_res_2_pipe(
... prompt=prompt,
... image=image,
... ).images
>>> image[0].save("./if_stage_II.png")
```py
"""
# 定义一个名为 IFPipeline 的类,继承自 DiffusionPipeline 和 StableDiffusionLoraLoaderMixin
class IFPipeline(DiffusionPipeline, StableDiffusionLoraLoaderMixin):
# 定义 tokenizer 属性,类型为 T5Tokenizer
tokenizer: T5Tokenizer
# 定义 text_encoder 属性,类型为 T5EncoderModel
text_encoder: T5EncoderModel
# 定义 unet 属性,类型为 UNet2DConditionModel
unet: UNet2DConditionModel
# 定义 scheduler 属性,类型为 DDPMScheduler
scheduler: DDPMScheduler
# 定义 feature_extractor 属性,类型为可选的 CLIPImageProcessor
feature_extractor: Optional[CLIPImageProcessor]
# 定义 safety_checker 属性,类型为可选的 IFSafetyChecker
safety_checker: Optional[IFSafetyChecker]
# 定义 watermarker 属性,类型为可选的 IFWatermarker
watermarker: Optional[IFWatermarker]
# 编译一个正则表达式,用于匹配不良标点符号
bad_punct_regex = re.compile(
r"["
+ "#®•©™&@·º½¾¿¡§~"
+ r"\)"
+ r"\("
+ r"\]"
+ r"\["
+ r"\}"
+ r"\{"
+ r"\|"
+ "\\"
+ r"\/"
+ r"\*"
+ r"]{1,}"
) # noqa
# 定义一个可选组件的列表
_optional_components = ["tokenizer", "text_encoder", "safety_checker", "feature_extractor", "watermarker"]
# 定义模型 CPU 卸载的顺序
model_cpu_offload_seq = "text_encoder->unet"
# 定义需要从 CPU 卸载中排除的组件
_exclude_from_cpu_offload = ["watermarker"]
# 初始化方法,定义类的构造函数
def __init__(
self,
# 接收 tokenizer 参数,类型为 T5Tokenizer
tokenizer: T5Tokenizer,
# 接收 text_encoder 参数,类型为 T5EncoderModel
text_encoder: T5EncoderModel,
# 接收 unet 参数,类型为 UNet2DConditionModel
unet: UNet2DConditionModel,
# 接收 scheduler 参数,类型为 DDPMScheduler
scheduler: DDPMScheduler,
# 接收可选的 safety_checker 参数,类型为 IFSafetyChecker
safety_checker: Optional[IFSafetyChecker],
# 接收可选的 feature_extractor 参数,类型为 CLIPImageProcessor
feature_extractor: Optional[CLIPImageProcessor],
# 接收可选的 watermarker 参数,类型为 IFWatermarker
watermarker: Optional[IFWatermarker],
# 接收一个布尔值,表示是否需要安全检查器,默认为 True
requires_safety_checker: bool = True,
):
# 调用父类的初始化方法
super().__init__()
# 如果安全检查器为 None 且需要安全检查器,发出警告
if safety_checker is None and requires_safety_checker:
logger.warning(
f"You have disabled the safety checker for {self.__class__} by passing `safety_checker=None`. Ensure"
" that you abide to the conditions of the IF license and do not expose unfiltered"
" results in services or applications open to the public. Both the diffusers team and Hugging Face"
" strongly recommend to keep the safety filter enabled in all public facing circumstances, disabling"
" it only for use-cases that involve analyzing network behavior or auditing its results. For more"
" information, please have a look at https://github.com/huggingface/diffusers/pull/254 ."
)
# 如果安全检查器不为 None 但特征提取器为 None,抛出值错误
if safety_checker is not None and feature_extractor is None:
raise ValueError(
"Make sure to define a feature extractor when loading {self.__class__} if you want to use the safety"
" checker. If you do not want to use the safety checker, you can pass `'safety_checker=None'` instead."
)
# 注册各个模块
self.register_modules(
tokenizer=tokenizer,
text_encoder=text_encoder,
unet=unet,
scheduler=scheduler,
safety_checker=safety_checker,
feature_extractor=feature_extractor,
watermarker=watermarker,
)
# 将需要的安全检查器配置注册到配置中
self.register_to_config(requires_safety_checker=requires_safety_checker)
# 采用 torch.no_grad() 装饰器,表示在此装饰的函数中不计算梯度
@torch.no_grad()
# 定义编码提示的函数,接受多个参数以配置行为
def encode_prompt(
self,
prompt: Union[str, List[str]], # 提示内容,可以是字符串或字符串列表
do_classifier_free_guidance: bool = True, # 是否启用无分类器引导
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
device: Optional[torch.device] = None, # 指定设备(如CPU或GPU)
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选的负面提示
prompt_embeds: Optional[torch.Tensor] = None, # 可选的提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None, # 可选的负面提示嵌入张量
clean_caption: bool = False, # 是否清理提示文本
# 定义运行安全检查器的函数,检查生成的图像是否安全
def run_safety_checker(self, image, device, dtype):
# 检查是否存在安全检查器
if self.safety_checker is not None:
# 提取图像特征并转换为张量,移动到指定设备
safety_checker_input = self.feature_extractor(self.numpy_to_pil(image), return_tensors="pt").to(device)
# 使用安全检查器处理图像,返回处理后的图像和检测结果
image, nsfw_detected, watermark_detected = self.safety_checker(
images=image,
clip_input=safety_checker_input.pixel_values.to(dtype=dtype),
)
else:
# 如果没有安全检查器,设置检测结果为None
nsfw_detected = None
watermark_detected = None
# 返回处理后的图像和检测结果
return image, nsfw_detected, watermark_detected
# 从StableDiffusionPipeline类复制的函数,用于准备额外的调度器步骤参数
# 准备调度器步骤的额外参数,因不同调度器的签名可能不同
def prepare_extra_step_kwargs(self, generator, eta):
# eta(η)仅在DDIMScheduler中使用,对于其他调度器将被忽略
# eta对应于DDIM论文中的η,范围应在[0, 1]之间
# 检查调度器是否接受eta参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {} # 初始化额外参数字典
# 如果接受eta,添加到额外参数字典中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器是否接受generator参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受generator,添加到额外参数字典中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回包含额外参数的字典
return extra_step_kwargs
# 定义检查输入的函数,确保参数有效
def check_inputs(
self,
prompt, # 提示内容
callback_steps, # 回调步骤,用于调度
negative_prompt=None, # 可选的负面提示
prompt_embeds=None, # 可选的提示嵌入
negative_prompt_embeds=None, # 可选的负面提示嵌入
):
# 检查 callback_steps 是否为 None 或者不是正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
# 抛出错误,确保 callback_steps 是一个正整数
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查是否同时提供了 prompt 和 prompt_embeds
if prompt is not None and prompt_embeds is not None:
# 抛出错误,提示只能提供其中一个
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都未定义
elif prompt is None and prompt_embeds is None:
# 抛出错误,要求提供至少一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 的类型是否有效
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 抛出错误,提示 prompt 必须是字符串或列表
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查是否同时提供了 negative_prompt 和 negative_prompt_embeds
if negative_prompt is not None and negative_prompt_embeds is not None:
# 抛出错误,提示只能提供其中一个
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查 prompt_embeds 和 negative_prompt_embeds 是否都不为 None
if prompt_embeds is not None and negative_prompt_embeds is not None:
# 检查这两个张量的形状是否一致
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 抛出错误,提示形状不匹配
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 准备中间图像的函数,接受多个参数
def prepare_intermediate_images(self, batch_size, num_channels, height, width, dtype, device, generator):
# 定义图像的形状
shape = (batch_size, num_channels, height, width)
# 检查生成器的类型和长度是否符合要求
if isinstance(generator, list) and len(generator) != batch_size:
# 抛出错误,提示生成器长度与批量大小不匹配
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 生成随机张量作为初始图像
intermediate_images = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 按调度器要求的标准差缩放初始噪声
intermediate_images = intermediate_images * self.scheduler.init_noise_sigma
# 返回处理后的中间图像
return intermediate_images
# 定义一个文本预处理的方法,接收文本和一个可选的清理标志
def _text_preprocessing(self, text, clean_caption=False):
# 如果清理标志为真且 BeautifulSoup 库不可用
if clean_caption and not is_bs4_available():
# 记录警告,提示用户该库不可用
logger.warning(BACKENDS_MAPPING["bs4"][-1].format("Setting `clean_caption=True`"))
# 记录另一个警告,说明清理标志将被设置为假
logger.warning("Setting `clean_caption` to False...")
# 将清理标志设置为假
clean_caption = False
# 如果清理标志为真且 ftfy 库不可用
if clean_caption and not is_ftfy_available():
# 记录警告,提示用户该库不可用
logger.warning(BACKENDS_MAPPING["ftfy"][-1].format("Setting `clean_caption=True`"))
# 记录另一个警告,说明清理标志将被设置为假
logger.warning("Setting `clean_caption` to False...")
# 将清理标志设置为假
clean_caption = False
# 如果文本不是元组或列表类型
if not isinstance(text, (tuple, list)):
# 将文本转换为单元素列表
text = [text]
# 定义一个内部处理函数,接收文本字符串
def process(text: str):
# 如果清理标志为真
if clean_caption:
# 清理文本标题
text = self._clean_caption(text)
# 再次清理文本标题(可能是多次清理)
text = self._clean_caption(text)
else:
# 将文本转换为小写并去除首尾空格
text = text.lower().strip()
# 返回处理后的文本
return text
# 对文本列表中的每个文本应用处理函数,并返回处理结果的列表
return [process(t) for t in text]
# 装饰器,指示在此方法内不需要计算梯度
@torch.no_grad()
# 装饰器,替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义调用方法,接收多个参数
def __call__(
# 接收一个可选的字符串或字符串列表作为提示
prompt: Union[str, List[str]] = None,
# 指定推理步骤的数量,默认为100
num_inference_steps: int = 100,
# 接收一个可选的时间步列表
timesteps: List[int] = None,
# 指导缩放的浮点数,默认为7.0
guidance_scale: float = 7.0,
# 可选的负提示字符串或字符串列表
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为1
num_images_per_prompt: Optional[int] = 1,
# 可选的图像高度
height: Optional[int] = None,
# 可选的图像宽度
width: Optional[int] = None,
# eta的浮点值,默认为0.0
eta: float = 0.0,
# 可选的生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选的提示嵌入张量
prompt_embeds: Optional[torch.Tensor] = None,
# 可选的负提示嵌入张量
negative_prompt_embeds: Optional[torch.Tensor] = None,
# 输出类型的可选字符串,默认为“pil”
output_type: Optional[str] = "pil",
# 返回字典的布尔值,默认为真
return_dict: bool = True,
# 可选的回调函数,接收三个参数
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调步骤的整数,默认为1
callback_steps: int = 1,
# 清理标志,默认为真
clean_caption: bool = True,
# 可选的交叉注意力参数字典
cross_attention_kwargs: Optional[Dict[str, Any]] = None,
标签:None,prompt,image,torch,diffusers,源码,import,解析,self
From: https://www.cnblogs.com/apachecn/p/18492366