diffusers 源码解析(三十)
.\diffusers\pipelines\deprecated\versatile_diffusion\pipeline_versatile_diffusion.py
# 导入检查模块,用于获取对象的成员信息
import inspect
# 从 typing 模块导入常用类型,方便类型注解
from typing import Callable, List, Optional, Union
# 导入 PIL 库中的 Image 模块,用于图像处理
import PIL.Image
# 导入 PyTorch 库
import torch
# 从 transformers 库中导入用于处理和生成图像的模型和处理器
from transformers import CLIPImageProcessor, CLIPTextModel, CLIPTokenizer, CLIPVisionModel
# 从本地模块导入模型
from ....models import AutoencoderKL, UNet2DConditionModel
# 从本地模块导入调度器
from ....schedulers import KarrasDiffusionSchedulers
# 从本地模块导入日志记录工具
from ....utils import logging
# 从本地模块导入扩散管道的工具
from ...pipeline_utils import DiffusionPipeline
# 从本地模块导入多种引导的扩散管道
from .pipeline_versatile_diffusion_dual_guided import VersatileDiffusionDualGuidedPipeline
from .pipeline_versatile_diffusion_image_variation import VersatileDiffusionImageVariationPipeline
from .pipeline_versatile_diffusion_text_to_image import VersatileDiffusionTextToImagePipeline
# 获取当前模块的日志记录器
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个名为 VersatileDiffusionPipeline 的类,继承自 DiffusionPipeline
class VersatileDiffusionPipeline(DiffusionPipeline):
r"""
使用稳定扩散进行文本到图像生成的管道。
该模型继承自 [`DiffusionPipeline`]。有关所有管道的通用方法的文档(下载、保存、在特定设备上运行等),请查看超类文档。
参数:
vae ([`AutoencoderKL`]):
用于编码和解码图像与潜在表示的变分自编码器(VAE)模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器([clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14))。
tokenizer ([`~transformers.CLIPTokenizer`]):
一个 `CLIPTokenizer` 用于对文本进行分词。
unet ([`UNet2DConditionModel`]):
用于对编码后的图像潜在数据进行去噪的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
与 `unet` 结合使用的调度器,用于对编码的图像潜在数据进行去噪。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
safety_checker ([`StableDiffusionSafetyChecker`]):
分类模块,估计生成的图像是否可能被视为冒犯或有害。
有关模型潜在危害的更多详细信息,请参阅 [model card](https://huggingface.co/runwayml/stable-diffusion-v1-5)。
feature_extractor ([`~transformers.CLIPImageProcessor`]):
一个 `CLIPImageProcessor` 用于从生成的图像中提取特征;用于作为 `safety_checker` 的输入。
"""
# 定义类属性,用于存储不同组件的实例
tokenizer: CLIPTokenizer # 用于文本分词的 CLIPTokenizer 实例
image_feature_extractor: CLIPImageProcessor # 用于从图像中提取特征的 CLIPImageProcessor 实例
text_encoder: CLIPTextModel # 文本编码器的 CLIPTextModel 实例
image_encoder: CLIPVisionModel # 图像编码器的 CLIPVisionModel 实例
image_unet: UNet2DConditionModel # 用于图像去噪的 UNet2DConditionModel 实例
text_unet: UNet2DConditionModel # 用于文本去噪的 UNet2DConditionModel 实例
vae: AutoencoderKL # 变分自编码器的实例
scheduler: KarrasDiffusionSchedulers # 调度器的实例
# 初始化类的构造函数,接收多个模型组件作为参数
def __init__(
self,
tokenizer: CLIPTokenizer, # 文本分词器,用于处理文本输入
image_feature_extractor: CLIPImageProcessor, # 图像特征提取器,用于处理图像输入
text_encoder: CLIPTextModel, # 文本编码器,将文本转换为向量表示
image_encoder: CLIPVisionModel, # 图像编码器,将图像转换为向量表示
image_unet: UNet2DConditionModel, # 图像生成的UNet模型
text_unet: UNet2DConditionModel, # 文本生成的UNet模型
vae: AutoencoderKL, # 变分自编码器,用于图像重建
scheduler: KarrasDiffusionSchedulers, # 调度器,控制生成过程的时间步
):
# 调用父类的构造函数
super().__init__()
# 注册各个模块,使其可用
self.register_modules(
tokenizer=tokenizer,
image_feature_extractor=image_feature_extractor,
text_encoder=text_encoder,
image_encoder=image_encoder,
image_unet=image_unet,
text_unet=text_unet,
vae=vae,
scheduler=scheduler,
)
# 计算变分自编码器的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 装饰器,禁止梯度计算,提高推理速度
@torch.no_grad()
def image_variation(
self,
image: Union[torch.Tensor, PIL.Image.Image], # 输入图像,可以是张量或PIL图像
height: Optional[int] = None, # 可选的输出图像高度
width: Optional[int] = None, # 可选的输出图像宽度
num_inference_steps: int = 50, # 推理步骤的数量
guidance_scale: float = 7.5, # 引导比例,控制生成效果
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选的负面提示
num_images_per_prompt: Optional[int] = 1, # 每个提示生成的图像数量
eta: float = 0.0, # 控制噪声的参数
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 随机数生成器
latents: Optional[torch.Tensor] = None, # 先验潜在向量
output_type: Optional[str] = "pil", # 输出类型,默认为PIL图像
return_dict: bool = True, # 是否返回字典格式的结果
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, # 可选的回调函数
callback_steps: int = 1, # 回调的步数
# 装饰器,禁止梯度计算,提高推理速度
@torch.no_grad()
def text_to_image(
self,
prompt: Union[str, List[str]], # 输入提示,可以是单个字符串或字符串列表
height: Optional[int] = None, # 可选的输出图像高度
width: Optional[int] = None, # 可选的输出图像宽度
num_inference_steps: int = 50, # 推理步骤的数量
guidance_scale: float = 7.5, # 引导比例,控制生成效果
negative_prompt: Optional[Union[str, List[str]]] = None, # 可选的负面提示
num_images_per_prompt: Optional[int] = 1, # 每个提示生成的图像数量
eta: float = 0.0, # 控制噪声的参数
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 随机数生成器
latents: Optional[torch.Tensor] = None, # 先验潜在向量
output_type: Optional[str] = "pil", # 输出类型,默认为PIL图像
return_dict: bool = True, # 是否返回字典格式的结果
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, # 可选的回调函数
callback_steps: int = 1, # 回调的步数
# 装饰器,禁止梯度计算,提高推理速度
@torch.no_grad()
def dual_guided(
self,
prompt: Union[PIL.Image.Image, List[PIL.Image.Image]], # 输入提示,可以是图像或图像列表
image: Union[str, List[str]], # 输入图像路径,可以是单个字符串或字符串列表
text_to_image_strength: float = 0.5, # 文本到图像的强度
height: Optional[int] = None, # 可选的输出图像高度
width: Optional[int] = None, # 可选的输出图像宽度
num_inference_steps: int = 50, # 推理步骤的数量
guidance_scale: float = 7.5, # 引导比例,控制生成效果
num_images_per_prompt: Optional[int] = 1, # 每个提示生成的图像数量
eta: float = 0.0, # 控制噪声的参数
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 随机数生成器
latents: Optional[torch.Tensor] = None, # 先验潜在向量
output_type: Optional[str] = "pil", # 输出类型,默认为PIL图像
return_dict: bool = True, # 是否返回字典格式的结果
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, # 可选的回调函数
callback_steps: int = 1, # 回调的步数
.\diffusers\pipelines\deprecated\versatile_diffusion\pipeline_versatile_diffusion_dual_guided.py
# 版权声明,标识文件归 HuggingFace 团队所有,保留所有权利
# 使用 Apache 2.0 许可证,要求遵守许可证条款
# 许可证的获取地址
# http://www.apache.org/licenses/LICENSE-2.0
# 除非适用的法律或书面协议另有约定,软件按 "原样" 分发
# 不提供任何形式的明示或暗示的担保或条件
# 详细信息见许可证中的具体权限和限制条款
# 导入 inspect 模块,用于获取活跃的对象信息
import inspect
# 从 typing 模块导入类型提示,用于类型注释
from typing import Callable, List, Optional, Tuple, Union
# 导入 numpy 库,用于数值计算
import numpy as np
# 导入 PIL.Image,用于图像处理
import PIL.Image
# 导入 torch 库,提供深度学习功能
import torch
# 导入 torch.utils.checkpoint,用于模型检查点功能
import torch.utils.checkpoint
# 从 transformers 库导入 CLIP 相关类,用于图像和文本处理
from transformers import (
CLIPImageProcessor, # 图像处理器
CLIPTextModelWithProjection, # 带投影的文本模型
CLIPTokenizer, # 文本分词器
CLIPVisionModelWithProjection, # 带投影的视觉模型
)
# 从本地模块导入图像处理和模型相关的类
from ....image_processor import VaeImageProcessor # VAE 图像处理器
from ....models import AutoencoderKL, DualTransformer2DModel, Transformer2DModel, UNet2DConditionModel # 各种模型
from ....schedulers import KarrasDiffusionSchedulers # 调度器
from ....utils import deprecate, logging # 工具函数和日志记录
from ....utils.torch_utils import randn_tensor # 随机张量生成
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput # 管道相关工具
from .modeling_text_unet import UNetFlatConditionModel # 文本条件模型
# 创建日志记录器实例,用于当前模块
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义一个多功能扩散双重引导管道类
class VersatileDiffusionDualGuidedPipeline(DiffusionPipeline):
r"""
使用多功能扩散的图像-文本双重引导生成的管道。
该模型继承自 [`DiffusionPipeline`]。查阅超类文档以获取所有管道的通用方法
(下载、保存、在特定设备上运行等)。
参数:
vqvae ([`VQModel`]):
向量量化(VQ)模型,用于将图像编码和解码为潜在表示。
bert ([`LDMBertModel`]):
基于 [`~transformers.BERT`] 的文本编码器模型。
tokenizer ([`~transformers.BertTokenizer`]):
用于文本分词的 `BertTokenizer`。
unet ([`UNet2DConditionModel`]):
用于去噪编码图像潜在的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
用于与 `unet` 结合使用的调度器,以去噪编码的图像潜在。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`] 中的任何一个。
"""
# 定义模型在 CPU 上的卸载顺序
model_cpu_offload_seq = "bert->unet->vqvae"
# 定义类属性,包含不同模型组件
tokenizer: CLIPTokenizer # 文本分词器
image_feature_extractor: CLIPImageProcessor # 图像特征提取器
text_encoder: CLIPTextModelWithProjection # 文本编码器
image_encoder: CLIPVisionModelWithProjection # 图像编码器
image_unet: UNet2DConditionModel # 图像去噪模型
text_unet: UNetFlatConditionModel # 文本条件去噪模型
vae: AutoencoderKL # 自编码器
scheduler: KarrasDiffusionSchedulers # 调度器
# 定义可选组件
_optional_components = ["text_unet"] # 可选的文本去噪模型组件
# 初始化方法,设置模型的基本组件
def __init__(
self,
tokenizer: CLIPTokenizer, # 用于文本的分词器
image_feature_extractor: CLIPImageProcessor, # 图像特征提取器
text_encoder: CLIPTextModelWithProjection, # 文本编码器,带有投影层
image_encoder: CLIPVisionModelWithProjection, # 图像编码器,带有投影层
image_unet: UNet2DConditionModel, # 用于图像处理的 UNet 模型
text_unet: UNetFlatConditionModel, # 用于文本处理的 UNet 模型
vae: AutoencoderKL, # 变分自编码器
scheduler: KarrasDiffusionSchedulers, # 调度器,用于控制训练过程
):
# 调用父类的初始化方法
super().__init__()
# 注册各个模块,便于管理和调用
self.register_modules(
tokenizer=tokenizer,
image_feature_extractor=image_feature_extractor,
text_encoder=text_encoder,
image_encoder=image_encoder,
image_unet=image_unet,
text_unet=text_unet,
vae=vae,
scheduler=scheduler,
)
# 计算 VAE 的缩放因子,用于图像处理
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器实例,使用计算出的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 检查文本 UNet 是否存在且图像 UNet 配置中不包含双交叉注意力
if self.text_unet is not None and (
"dual_cross_attention" not in self.image_unet.config or not self.image_unet.config.dual_cross_attention
):
# 如果从通用检查点加载而非保存的双引导管道,转换为双注意力
self._convert_to_dual_attention()
# 移除未使用的权重
def remove_unused_weights(self):
# 将 text_unet 注册为 None,以释放资源
self.register_modules(text_unet=None)
# 定义一个私有方法,用于将图像的 UNet 转换为双重注意力机制
def _convert_to_dual_attention(self):
"""
替换 image_unet 的 `Transformer2DModel` 块为包含来自 `image_unet` 和 `text_unet` 的 transformer 块的 `DualTransformer2DModel`
"""
# 遍历 image_unet 中的所有命名模块
for name, module in self.image_unet.named_modules():
# 检查当前模块是否为 Transformer2DModel 的实例
if isinstance(module, Transformer2DModel):
# 分割模块名称,获取父级名称和索引
parent_name, index = name.rsplit(".", 1)
index = int(index)
# 获取图像和文本的 transformer 模块
image_transformer = self.image_unet.get_submodule(parent_name)[index]
text_transformer = self.text_unet.get_submodule(parent_name)[index]
# 获取图像 transformer 的配置
config = image_transformer.config
# 创建双重 transformer 模型
dual_transformer = DualTransformer2DModel(
num_attention_heads=config.num_attention_heads,
attention_head_dim=config.attention_head_dim,
in_channels=config.in_channels,
num_layers=config.num_layers,
dropout=config.dropout,
norm_num_groups=config.norm_num_groups,
cross_attention_dim=config.cross_attention_dim,
attention_bias=config.attention_bias,
sample_size=config.sample_size,
num_vector_embeds=config.num_vector_embeds,
activation_fn=config.activation_fn,
num_embeds_ada_norm=config.num_embeds_ada_norm,
)
# 将图像 transformer 和文本 transformer 分别赋值给双重 transformer
dual_transformer.transformers[0] = image_transformer
dual_transformer.transformers[1] = text_transformer
# 替换原有的模块为双重 transformer 模块
self.image_unet.get_submodule(parent_name)[index] = dual_transformer
# 注册配置,启用双重交叉注意力
self.image_unet.register_to_config(dual_cross_attention=True)
# 定义一个私有方法,用于将双重注意力机制还原为图像 UNet 的标准 transformer
def _revert_dual_attention(self):
"""
将 image_unet 的 `DualTransformer2DModel` 块还原为带有 image_unet 权重的 `Transformer2DModel`
如果在另一个管道中重用 `image_unet`,例如 `VersatileDiffusionPipeline`,请调用此函数
"""
# 遍历 image_unet 中的所有命名模块
for name, module in self.image_unet.named_modules():
# 检查当前模块是否为 DualTransformer2DModel 的实例
if isinstance(module, DualTransformer2DModel):
# 分割模块名称,获取父级名称和索引
parent_name, index = name.rsplit(".", 1)
index = int(index)
# 将双重 transformer 的第一个 transformer 还原到原有模块
self.image_unet.get_submodule(parent_name)[index] = module.transformers[0]
# 注册配置,禁用双重交叉注意力
self.image_unet.register_to_config(dual_cross_attention=False)
# 定义一个私有方法用于将提示编码为文本编码器的隐藏状态
def _encode_image_prompt(self, prompt, device, num_images_per_prompt, do_classifier_free_guidance):
r"""
将提示编码为文本编码器的隐藏状态。
参数:
prompt (`str` 或 `List[str]`):
要编码的提示
device: (`torch.device`):
PyTorch 设备
num_images_per_prompt (`int`):
每个提示生成的图像数量
do_classifier_free_guidance (`bool`):
是否使用无分类器引导
"""
# 定义一个私有方法用于标准化嵌入
def normalize_embeddings(encoder_output):
# 对编码器输出进行层归一化
embeds = self.image_encoder.vision_model.post_layernorm(encoder_output.last_hidden_state)
# 进行视觉投影以获得嵌入
embeds = self.image_encoder.visual_projection(embeds)
# 取第一个嵌入进行池化
embeds_pooled = embeds[:, 0:1]
# 归一化嵌入
embeds = embeds / torch.norm(embeds_pooled, dim=-1, keepdim=True)
return embeds
# 根据提示类型确定批大小
batch_size = len(prompt) if isinstance(prompt, list) else 1
# 获取提示文本的嵌入
image_input = self.image_feature_extractor(images=prompt, return_tensors="pt")
# 将像素值移动到指定设备并转换为相应的数据类型
pixel_values = image_input.pixel_values.to(device).to(self.image_encoder.dtype)
# 通过图像编码器获取图像嵌入
image_embeddings = self.image_encoder(pixel_values)
# 对图像嵌入进行标准化处理
image_embeddings = normalize_embeddings(image_embeddings)
# 复制图像嵌入以适应每个提示的生成,采用适合 MPS 的方法
bs_embed, seq_len, _ = image_embeddings.shape
image_embeddings = image_embeddings.repeat(1, num_images_per_prompt, 1)
# 重塑图像嵌入的形状以适应批处理
image_embeddings = image_embeddings.view(bs_embed * num_images_per_prompt, seq_len, -1)
# 获取无条件的嵌入以用于无分类器引导
if do_classifier_free_guidance:
# 创建一个形状为 (512, 512, 3) 的全零图像数组,值为0.5
uncond_images = [np.zeros((512, 512, 3)) + 0.5] * batch_size
# 获取无条件图像的特征
uncond_images = self.image_feature_extractor(images=uncond_images, return_tensors="pt")
# 将无条件图像的像素值移动到指定设备并转换为相应的数据类型
pixel_values = uncond_images.pixel_values.to(device).to(self.image_encoder.dtype)
# 通过图像编码器获取负提示嵌入
negative_prompt_embeds = self.image_encoder(pixel_values)
# 对负提示嵌入进行标准化处理
negative_prompt_embeds = normalize_embeddings(negative_prompt_embeds)
# 复制无条件嵌入以适应每个提示的生成,采用适合 MPS 的方法
seq_len = negative_prompt_embeds.shape[1]
negative_prompt_embeds = negative_prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 重塑无条件嵌入的形状以适应批处理
negative_prompt_embeds = negative_prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
# 为了进行无分类器引导,需要进行两次前向传递
# 这里将无条件嵌入和条件嵌入连接成一个批次
# 以避免进行两次前向传递
image_embeddings = torch.cat([negative_prompt_embeds, image_embeddings])
# 返回最终的图像嵌入
return image_embeddings
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.decode_latents 复制
def decode_latents(self, latents):
# 定义弃用信息,说明该方法将在 1.0.0 版本中移除,并提供替代方法
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用弃用函数,发出警告
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 配置的缩放因子对潜在向量进行缩放
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在向量,获取图像数据
image = self.vae.decode(latents, return_dict=False)[0]
# 将图像数据规范化到 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,以确保兼容性且不增加显著开销
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_extra_step_kwargs 复制
def prepare_extra_step_kwargs(self, generator, eta):
# 准备额外的参数供调度器步骤使用,不同调度器的参数签名可能不同
# eta(η)仅在 DDIMScheduler 中使用,其他调度器将忽略它
# eta 对应于 DDIM 论文中的 η,应在 [0, 1] 范围内
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外步骤参数字典
extra_step_kwargs = {}
# 如果接受 eta,则将其添加到额外步骤参数中
if accepts_eta:
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 如果接受 generator,则将其添加到额外步骤参数中
if accepts_generator:
extra_step_kwargs["generator"] = generator
# 返回额外步骤参数字典
return extra_step_kwargs
def check_inputs(self, prompt, image, height, width, callback_steps):
# 检查 prompt 类型,必须为 str、PIL.Image 或 list
if not isinstance(prompt, str) and not isinstance(prompt, PIL.Image.Image) and not isinstance(prompt, list):
raise ValueError(f"`prompt` has to be of type `str` `PIL.Image` or `list` but is {type(prompt)}")
# 检查 image 类型,必须为 str、PIL.Image 或 list
if not isinstance(image, str) and not isinstance(image, PIL.Image.Image) and not isinstance(image, list):
raise ValueError(f"`image` has to be of type `str` `PIL.Image` or `list` but is {type(image)}")
# 检查 height 和 width 是否为 8 的倍数
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查 callback_steps 是否为正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制
# 准备潜在变量,定义形状和相关参数
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,包括批量大小、通道数和缩放后的高度和宽度
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器是否为列表且长度与批量大小不匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
# 抛出错误,提示生成器数量与批量大小不匹配
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有提供潜在变量,随机生成潜在变量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在变量,将其移动到指定设备
latents = latents.to(device)
# 将初始噪声按调度器要求的标准差进行缩放
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在变量
return latents
# 设置变换器的参数,包括混合比例和条件类型
def set_transformer_params(self, mix_ratio: float = 0.5, condition_types: Tuple = ("text", "image")):
# 遍历命名模块,查找 DualTransformer2DModel 模块
for name, module in self.image_unet.named_modules():
if isinstance(module, DualTransformer2DModel):
# 设置模块的混合比例
module.mix_ratio = mix_ratio
# 遍历条件类型,设置每种条件的参数
for i, type in enumerate(condition_types):
if type == "text":
# 为文本条件设置长度和变换器索引
module.condition_lengths[i] = self.text_encoder.config.max_position_embeddings
module.transformer_index_for_condition[i] = 1 # 使用第二个(文本)变换器
else:
# 为图像条件设置长度和变换器索引
module.condition_lengths[i] = 257
module.transformer_index_for_condition[i] = 0 # 使用第一个(图像)变换器
# 不计算梯度的调用方法,处理输入参数
@torch.no_grad()
def __call__(
# 输入的提示,可以是单张或多张图像
prompt: Union[PIL.Image.Image, List[PIL.Image.Image]],
# 输入的图像文件路径
image: Union[str, List[str]],
# 文本到图像的强度
text_to_image_strength: float = 0.5,
# 可选的图像高度
height: Optional[int] = None,
# 可选的图像宽度
width: Optional[int] = None,
# 推理步骤的数量
num_inference_steps: int = 50,
# 指导比例
guidance_scale: float = 7.5,
# 每个提示生成的图像数量
num_images_per_prompt: Optional[int] = 1,
# 超参数
eta: float = 0.0,
# 随机数生成器
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 可选的潜在变量
latents: Optional[torch.Tensor] = None,
# 输出类型,默认为 PIL 图像
output_type: Optional[str] = "pil",
# 是否返回字典格式的结果
return_dict: bool = True,
# 回调函数,用于推理过程中的处理
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调步骤
callback_steps: int = 1,
# 其他可选参数
**kwargs,
.\diffusers\pipelines\deprecated\versatile_diffusion\pipeline_versatile_diffusion_image_variation.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证第 2.0 版(“许可证”)进行授权;
# 除非遵循许可证,否则您不得使用此文件。
# 您可以在以下地址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面协议另有约定,软件
# 以“原样”基础分发,不提供任何形式的保证或条件,
# 明示或暗示。请参阅许可证以获取有关权限的特定语言
# 和限制。
import inspect # 导入 inspect 模块以获取对象的内部信息
from typing import Callable, List, Optional, Union # 从 typing 导入用于类型注释的各种类型
import numpy as np # 导入 numpy 作为数值计算库
import PIL.Image # 导入 PIL.Image 用于处理图像
import torch # 导入 PyTorch 框架以进行深度学习
import torch.utils.checkpoint # 导入 checkpoint 以进行内存优化的反向传播
from transformers import CLIPImageProcessor, CLIPVisionModelWithProjection # 从 transformers 导入图像处理和视觉模型
from ....image_processor import VaeImageProcessor # 从相对路径导入 VaeImageProcessor
from ....models import AutoencoderKL, UNet2DConditionModel # 导入自动编码器和 UNet 模型
from ....schedulers import KarrasDiffusionSchedulers # 导入 Karras Diffusion 调度器
from ....utils import deprecate, logging # 导入 deprecate 和 logging 工具
from ....utils.torch_utils import randn_tensor # 从 torch_utils 导入 randn_tensor 函数
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput # 从 pipeline_utils 导入 DiffusionPipeline 和 ImagePipelineOutput
logger = logging.get_logger(__name__) # 初始化日志记录器,使用当前模块名称
class VersatileDiffusionImageVariationPipeline(DiffusionPipeline): # 定义一个用于图像变换的管道类,继承自 DiffusionPipeline
r""" # 开始文档字符串,描述此类的作用
Pipeline for image variation using Versatile Diffusion. # 声明这是一个用于图像变换的管道
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.). # 说明该模型继承自 DiffusionPipeline,并可查看其文档
Parameters: # 参数说明
vqvae ([`VQModel`]): # vqvae 参数,类型为 VQModel
Vector-quantized (VQ) model to encode and decode images to and from latent representations. # 描述 vqvae 的功能
bert ([`LDMBertModel`]): # bert 参数,类型为 LDMBertModel
Text-encoder model based on [`~transformers.BERT`]. # 描述 bert 的功能
tokenizer ([`~transformers.BertTokenizer`]): # tokenizer 参数,类型为 BertTokenizer
A `BertTokenizer` to tokenize text. # 描述 tokenizer 的功能
unet ([`UNet2DConditionModel`]): # unet 参数,类型为 UNet2DConditionModel
A `UNet2DConditionModel` to denoise the encoded image latents. # 描述 unet 的功能
scheduler ([`SchedulerMixin`]): # scheduler 参数,类型为 SchedulerMixin
A scheduler to be used in combination with `unet` to denoise the encoded image latents. Can be one of
[`DDIMScheduler`], [`LMSDiscreteScheduler`], or [`PNDMScheduler`]. # 描述 scheduler 的功能
"""
model_cpu_offload_seq = "bert->unet->vqvae" # 定义模型在 CPU 上的卸载顺序
image_feature_extractor: CLIPImageProcessor # 声明图像特征提取器的类型
image_encoder: CLIPVisionModelWithProjection # 声明图像编码器的类型
image_unet: UNet2DConditionModel # 声明 UNet 的类型
vae: AutoencoderKL # 声明变分自编码器的类型
scheduler: KarrasDiffusionSchedulers # 声明调度器的类型
def __init__( # 定义初始化方法
self, # 当前实例
image_feature_extractor: CLIPImageProcessor, # 图像特征提取器参数
image_encoder: CLIPVisionModelWithProjection, # 图像编码器参数
image_unet: UNet2DConditionModel, # UNet 参数
vae: AutoencoderKL, # 变分自编码器参数
scheduler: KarrasDiffusionSchedulers, # 调度器参数
):
# 调用父类的初始化方法
super().__init__()
# 注册模块,包括图像特征提取器、图像编码器、图像 UNet、VAE 和调度器
self.register_modules(
image_feature_extractor=image_feature_extractor,
image_encoder=image_encoder,
image_unet=image_unet,
vae=vae,
scheduler=scheduler,
)
# 计算 VAE 的缩放因子,基于输出通道的数量
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建 VAE 图像处理器实例,传入缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 从 StableDiffusionPipeline 复制的解码潜在变量的方法
def decode_latents(self, latents):
# 警告消息,表明该方法已弃用
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 调用弃用警告函数,通知用户该方法即将被移除
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 配置缩放潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,获取生成的图像
image = self.vae.decode(latents, return_dict=False)[0]
# 归一化图像数据,确保其在 [0, 1] 范围内
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像数据转换为 float32 格式,并将维度顺序调整为 (批量, 高, 宽, 通道)
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 从 StableDiffusionPipeline 复制的准备额外步骤参数的方法
def prepare_extra_step_kwargs(self, generator, eta):
# 为调度器步骤准备额外的参数,因为并非所有调度器的参数签名相同
# eta 仅在 DDIMScheduler 中使用,其他调度器将忽略该参数
# eta 在 DDIM 论文中的值应在 [0, 1] 范围内
# 检查调度器步骤是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
# 初始化额外步骤参数字典
extra_step_kwargs = {}
if accepts_eta:
# 如果接受 eta,则将其添加到额外参数中
extra_step_kwargs["eta"] = eta
# 检查调度器步骤是否接受生成器参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
# 如果接受生成器,则将其添加到额外参数中
extra_step_kwargs["generator"] = generator
# 返回准备好的额外步骤参数
return extra_step_kwargs
# 从 StableDiffusionImageVariationPipeline 复制的输入检查方法
# 检查输入参数的有效性
def check_inputs(self, image, height, width, callback_steps):
# 确保 image 是 torch.Tensor、PIL.Image.Image 或者列表类型
if (
not isinstance(image, torch.Tensor)
and not isinstance(image, PIL.Image.Image)
and not isinstance(image, list)
):
# 如果类型不符合,抛出值错误
raise ValueError(
"`image` has to be of type `torch.Tensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is"
f" {type(image)}"
)
# 确保 height 和 width 是 8 的倍数
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查 callback_steps 是否为正整数
if (callback_steps is None) or (
callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0)
):
# 如果不符合,抛出值错误
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制的
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在张量的形状
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与 batch_size 匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果 latents 为 None,则随机生成潜在张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 否则将 latents 转移到指定设备
latents = latents.to(device)
# 按调度器所需的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回处理后的潜在张量
return latents
# 禁用梯度计算
@torch.no_grad()
def __call__(
# 定义输入参数,包括图像、尺寸、推理步数等
image: Union[PIL.Image.Image, List[PIL.Image.Image], torch.Tensor],
height: Optional[int] = None,
width: Optional[int] = None,
num_inference_steps: int = 50,
guidance_scale: float = 7.5,
negative_prompt: Optional[Union[str, List[str]]] = None,
num_images_per_prompt: Optional[int] = 1,
eta: float = 0.0,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.Tensor] = None,
output_type: Optional[str] = "pil",
return_dict: bool = True,
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
callback_steps: int = 1,
**kwargs,
.\diffusers\pipelines\deprecated\versatile_diffusion\pipeline_versatile_diffusion_text_to_image.py
# 版权所有 2024 HuggingFace 团队。保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)进行许可;
# 除非遵守许可证,否则您不得使用此文件。
# 您可以在以下网址获取许可证的副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律或书面同意,否则根据许可证分发的软件在“按原样”基础上分发,
# 不提供任何形式的明示或暗示的担保或条件。
# 有关特定语言所管辖的权限和限制,请参阅许可证。
# 导入 inspect 模块以进行代码检查
import inspect
# 导入用于类型注释的相关类型
from typing import Callable, List, Optional, Union
# 导入 PyTorch 库
import torch
# 导入 PyTorch 的检查点工具
import torch.utils.checkpoint
# 从 transformers 库导入 CLIP 相关模型和处理器
from transformers import CLIPImageProcessor, CLIPTextModelWithProjection, CLIPTokenizer
# 从相对路径导入自定义图像处理器
from ....image_processor import VaeImageProcessor
# 从相对路径导入自定义模型
from ....models import AutoencoderKL, Transformer2DModel, UNet2DConditionModel
# 从相对路径导入调度器
from ....schedulers import KarrasDiffusionSchedulers
# 从相对路径导入工具函数和日志记录
from ....utils import deprecate, logging
from ....utils.torch_utils import randn_tensor
# 从相对路径导入 DiffusionPipeline 和 ImagePipelineOutput
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput
# 从相对路径导入文本 UNet 模型
from .modeling_text_unet import UNetFlatConditionModel
# 初始化日志记录器,使用当前模块名称
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
# 定义 VersatileDiffusionTextToImagePipeline 类,继承自 DiffusionPipeline
class VersatileDiffusionTextToImagePipeline(DiffusionPipeline):
r"""
用于文本到图像生成的管道,使用 Versatile Diffusion。
该模型继承自 [`DiffusionPipeline`]。请查看超类文档,以获取所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
参数:
vqvae ([`VQModel`]):
向量量化(VQ)模型,用于将图像编码和解码为潜在表示。
bert ([`LDMBertModel`]):
基于 [`~transformers.BERT`] 的文本编码器模型。
tokenizer ([`~transformers.BertTokenizer`]):
用于标记文本的 `BertTokenizer`。
unet ([`UNet2DConditionModel`]):
用于对编码的图像潜在数据进行去噪的 `UNet2DConditionModel`。
scheduler ([`SchedulerMixin`]):
用于与 `unet` 一起去噪编码图像潜在数据的调度器。可以是
[`DDIMScheduler`], [`LMSDiscreteScheduler`] 或 [`PNDMScheduler`]。
"""
# 定义模型的 CPU 卸载顺序
model_cpu_offload_seq = "bert->unet->vqvae"
# 声明 tokenizer 类型为 CLIPTokenizer
tokenizer: CLIPTokenizer
# 声明图像特征提取器类型为 CLIPImageProcessor
image_feature_extractor: CLIPImageProcessor
# 声明文本编码器类型为 CLIPTextModelWithProjection
text_encoder: CLIPTextModelWithProjection
# 声明图像 UNet 类型为 UNet2DConditionModel
image_unet: UNet2DConditionModel
# 声明文本 UNet 类型为 UNetFlatConditionModel
text_unet: UNetFlatConditionModel
# 声明 VAE 类型为 AutoencoderKL
vae: AutoencoderKL
# 声明调度器类型为 KarrasDiffusionSchedulers
scheduler: KarrasDiffusionSchedulers
# 定义可选组件列表,包含文本 UNet
_optional_components = ["text_unet"]
# 初始化方法,接受多个参数
def __init__(
# 初始化 tokenizer,类型为 CLIPTokenizer
self,
tokenizer: CLIPTokenizer,
# 初始化文本编码器,类型为 CLIPTextModelWithProjection
text_encoder: CLIPTextModelWithProjection,
# 初始化图像 UNet,类型为 UNet2DConditionModel
image_unet: UNet2DConditionModel,
# 初始化文本 UNet,类型为 UNetFlatConditionModel
text_unet: UNetFlatConditionModel,
# 初始化 VAE,类型为 AutoencoderKL
vae: AutoencoderKL,
# 初始化调度器,类型为 KarrasDiffusionSchedulers
scheduler: KarrasDiffusionSchedulers,
):
# 调用父类的初始化方法
super().__init__()
# 注册各个模块,提供必要的组件
self.register_modules(
tokenizer=tokenizer, # 注册分词器
text_encoder=text_encoder, # 注册文本编码器
image_unet=image_unet, # 注册图像 UNet 模型
text_unet=text_unet, # 注册文本 UNet 模型
vae=vae, # 注册变分自编码器
scheduler=scheduler, # 注册调度器
)
# 计算 VAE 的缩放因子
self.vae_scale_factor = 2 ** (len(self.vae.config.block_out_channels) - 1)
# 创建图像处理器,使用计算得到的缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 如果文本 UNet 模型存在,则交换其注意力块
if self.text_unet is not None:
self._swap_unet_attention_blocks()
def _swap_unet_attention_blocks(self):
"""
在图像和文本 UNet 之间交换 `Transformer2DModel` 块
"""
# 遍历图像 UNet 的所有命名模块
for name, module in self.image_unet.named_modules():
# 如果模块是 Transformer2DModel 类型
if isinstance(module, Transformer2DModel):
# 分离父模块名称和索引
parent_name, index = name.rsplit(".", 1)
index = int(index) # 将索引转换为整数
# 交换图像 UNet 和文本 UNet 的相应模块
self.image_unet.get_submodule(parent_name)[index], self.text_unet.get_submodule(parent_name)[index] = (
self.text_unet.get_submodule(parent_name)[index],
self.image_unet.get_submodule(parent_name)[index],
)
def remove_unused_weights(self):
# 注册文本 UNet 为 None,以移除未使用的权重
self.register_modules(text_unet=None)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 复制的 decode_latents 方法
def decode_latents(self, latents):
# 设置弃用警告信息
deprecation_message = "The decode_latents method is deprecated and will be removed in 1.0.0. Please use VaeImageProcessor.postprocess(...) instead"
# 发出弃用警告
deprecate("decode_latents", "1.0.0", deprecation_message, standard_warn=False)
# 根据 VAE 的缩放因子调整潜在变量
latents = 1 / self.vae.config.scaling_factor * latents
# 解码潜在变量,得到图像
image = self.vae.decode(latents, return_dict=False)[0]
# 对图像进行归一化处理
image = (image / 2 + 0.5).clamp(0, 1)
# 将图像转换为 float32 格式,兼容 bfloat16
image = image.cpu().permute(0, 2, 3, 1).float().numpy()
# 返回处理后的图像
return image
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline 复制的 prepare_extra_step_kwargs 方法
def prepare_extra_step_kwargs(self, generator, eta):
# 准备调度器步骤的额外参数,因为不同调度器的签名不同
# eta (η) 仅在 DDIMScheduler 中使用,其他调度器将被忽略。
# eta 对应于 DDIM 论文中的 η: https://arxiv.org/abs/2010.02502
# 应该在 [0, 1] 之间
# 检查调度器是否接受 eta 参数
accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys())
extra_step_kwargs = {} # 初始化额外参数字典
if accepts_eta:
extra_step_kwargs["eta"] = eta # 如果接受,添加 eta 参数
# 检查调度器是否接受 generator 参数
accepts_generator = "generator" in set(inspect.signature(self.scheduler.step).parameters.keys())
if accepts_generator:
extra_step_kwargs["generator"] = generator # 如果接受,添加 generator 参数
# 返回准备好的额外参数字典
return extra_step_kwargs
# 定义一个检查输入参数的函数,确保传入的参数符合要求
def check_inputs(
self, # 类的实例对象
prompt, # 文本提示,可能是字符串或列表
height, # 图像高度
width, # 图像宽度
callback_steps, # 回调的步骤数
negative_prompt=None, # 负面提示,可选参数
prompt_embeds=None, # 提示的嵌入向量,可选参数
negative_prompt_embeds=None, # 负面提示的嵌入向量,可选参数
callback_on_step_end_tensor_inputs=None, # 在步骤结束时的回调输入,可选参数
):
# 检查高度和宽度是否是8的倍数
if height % 8 != 0 or width % 8 != 0:
# 如果不是,抛出值错误,提示高度和宽度的要求
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查回调步骤的类型和有效性
if callback_steps is not None and (not isinstance(callback_steps, int) or callback_steps <= 0):
# 如果回调步骤无效,抛出值错误
raise ValueError(
f"`callback_steps` has to be a positive integer but is {callback_steps} of type"
f" {type(callback_steps)}."
)
# 检查给定的回调输入是否在允许的输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 如果有不在允许输入中的项,抛出值错误
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查提示和提示嵌入的互斥性
if prompt is not None and prompt_embeds is not None:
# 如果两者都提供,抛出值错误
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查是否至少提供一个提示
elif prompt is None and prompt_embeds is None:
# 如果两个都未提供,抛出值错误
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查提示的类型是否有效
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 如果类型无效,抛出值错误
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查负面提示和负面提示嵌入的互斥性
if negative_prompt is not None and negative_prompt_embeds is not None:
# 如果两者都提供,抛出值错误
raise ValueError(
f"Cannot forward both `negative_prompt`: {negative_prompt} and `negative_prompt_embeds`:"
f" {negative_prompt_embeds}. Please make sure to only forward one of the two."
)
# 检查提示嵌入和负面提示嵌入的形状是否一致
if prompt_embeds is not None and negative_prompt_embeds is not None:
if prompt_embeds.shape != negative_prompt_embeds.shape:
# 如果形状不一致,抛出值错误
raise ValueError(
"`prompt_embeds` and `negative_prompt_embeds` must have the same shape when passed directly, but"
f" got: `prompt_embeds` {prompt_embeds.shape} != `negative_prompt_embeds`"
f" {negative_prompt_embeds.shape}."
)
# 从 diffusers.pipelines.stable_diffusion.pipeline_stable_diffusion.StableDiffusionPipeline.prepare_latents 复制而来
# 准备潜在变量,设置其形状与参数
def prepare_latents(self, batch_size, num_channels_latents, height, width, dtype, device, generator, latents=None):
# 定义潜在变量的形状,考虑到 VAE 的缩放因子
shape = (
batch_size,
num_channels_latents,
int(height) // self.vae_scale_factor,
int(width) // self.vae_scale_factor,
)
# 检查生成器列表的长度是否与批处理大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 如果没有传入潜在变量,则生成随机张量
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
# 如果提供了潜在变量,则将其移动到指定设备
latents = latents.to(device)
# 根据调度器要求的标准差缩放初始噪声
latents = latents * self.scheduler.init_noise_sigma
# 返回准备好的潜在变量
return latents
# 禁用梯度计算,以节省内存和提高性能
@torch.no_grad()
def __call__(
# 提示信息,可以是字符串或字符串列表
prompt: Union[str, List[str]],
# 图像高度,可选
height: Optional[int] = None,
# 图像宽度,可选
width: Optional[int] = None,
# 推理步骤数量,默认为50
num_inference_steps: int = 50,
# 引导比例,默认为7.5
guidance_scale: float = 7.5,
# 负提示信息,可选
negative_prompt: Optional[Union[str, List[str]]] = None,
# 每个提示生成的图像数量,默认为1
num_images_per_prompt: Optional[int] = 1,
# eta 值,默认为0.0
eta: float = 0.0,
# 生成器,可选
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在变量,可选
latents: Optional[torch.Tensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式的输出,默认为 True
return_dict: bool = True,
# 回调函数,可选
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None,
# 回调步骤间隔,默认为1
callback_steps: int = 1,
# 其他关键字参数
**kwargs,
.\diffusers\pipelines\deprecated\versatile_diffusion\__init__.py
# 从类型检查模块导入 TYPE_CHECKING,用于静态类型检查
from typing import TYPE_CHECKING
# 从上级模块的 utils 导入所需的功能和常量
from ....utils import (
DIFFUSERS_SLOW_IMPORT, # 表示是否进行慢速导入
OptionalDependencyNotAvailable, # 表示可选依赖项不可用的异常
_LazyModule, # 用于延迟加载模块的工具
is_torch_available, # 检查 PyTorch 是否可用的函数
is_transformers_available, # 检查 Transformers 是否可用的函数
is_transformers_version, # 检查 Transformers 版本的函数
)
# 初始化一个空字典,用于存放假对象
_dummy_objects = {}
# 初始化一个空字典,用于存放模块的导入结构
_import_structure = {}
# 尝试检查必要的依赖项是否可用
try:
# 如果 Transformers 和 PyTorch 不可用,或者 Transformers 版本不符合要求,抛出异常
if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
# 从 utils 中导入假对象以避免缺失依赖时的错误
from ....utils.dummy_torch_and_transformers_objects import (
VersatileDiffusionDualGuidedPipeline, # 导入双重引导管道
VersatileDiffusionImageVariationPipeline, # 导入图像变化管道
VersatileDiffusionPipeline, # 导入通用扩散管道
VersatileDiffusionTextToImagePipeline, # 导入文本到图像管道
)
# 更新假对象字典
_dummy_objects.update(
{
"VersatileDiffusionDualGuidedPipeline": VersatileDiffusionDualGuidedPipeline,
"VersatileDiffusionImageVariationPipeline": VersatileDiffusionImageVariationPipeline,
"VersatileDiffusionPipeline": VersatileDiffusionPipeline,
"VersatileDiffusionTextToImagePipeline": VersatileDiffusionTextToImagePipeline,
}
)
# 如果依赖项可用,则设置导入结构
else:
_import_structure["modeling_text_unet"] = ["UNetFlatConditionModel"] # 设置文本 UNet 模型的导入结构
_import_structure["pipeline_versatile_diffusion"] = ["VersatileDiffusionPipeline"] # 设置通用扩散管道的导入结构
_import_structure["pipeline_versatile_diffusion_dual_guided"] = ["VersatileDiffusionDualGuidedPipeline"] # 设置双重引导管道的导入结构
_import_structure["pipeline_versatile_diffusion_image_variation"] = ["VersatileDiffusionImageVariationPipeline"] # 设置图像变化管道的导入结构
_import_structure["pipeline_versatile_diffusion_text_to_image"] = ["VersatileDiffusionTextToImagePipeline"] # 设置文本到图像管道的导入结构
# 检查是否为类型检查或是否需要慢速导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 如果 Transformers 和 PyTorch 不可用,或者 Transformers 版本不符合要求,抛出异常
if not (is_transformers_available() and is_torch_available() and is_transformers_version(">=", "4.25.0")):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项不可用的异常
except OptionalDependencyNotAvailable:
# 从 utils 中导入假对象以避免缺失依赖时的错误
from ....utils.dummy_torch_and_transformers_objects import (
VersatileDiffusionDualGuidedPipeline, # 导入双重引导管道
VersatileDiffusionImageVariationPipeline, # 导入图像变化管道
VersatileDiffusionPipeline, # 导入通用扩散管道
VersatileDiffusionTextToImagePipeline, # 导入文本到图像管道
)
# 如果依赖项可用,则导入实际的管道实现
else:
from .pipeline_versatile_diffusion import VersatileDiffusionPipeline # 导入通用扩散管道
from .pipeline_versatile_diffusion_dual_guided import VersatileDiffusionDualGuidedPipeline # 导入双重引导管道
from .pipeline_versatile_diffusion_image_variation import VersatileDiffusionImageVariationPipeline # 导入图像变化管道
from .pipeline_versatile_diffusion_text_to_image import VersatileDiffusionTextToImagePipeline # 导入文本到图像管道
# 如果不是类型检查或慢速导入
else:
import sys # 导入系统模块
# 使用延迟加载模块的方式替换当前模块
sys.modules[__name__] = _LazyModule(
__name__, # 模块名称
globals()["__file__"], # 当前文件路径
_import_structure, # 导入结构
module_spec=__spec__, # 模块的规范
)
# 将假对象的属性设置到当前模块
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value) # 设置假对象的属性
.\diffusers\pipelines\deprecated\vq_diffusion\pipeline_vq_diffusion.py
# 版权所有 2024 Microsoft 和 The HuggingFace Team。保留所有权利。
#
# 根据 Apache 许可证,版本 2.0(“许可证”)授权;
# 除非遵守许可证,否则不得使用此文件。
# 可以在以下网址获取许可证副本:
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,否则根据许可证分发的软件
# 是按“原样”基础分发的,没有任何明示或暗示的担保或条件。
# 有关许可证下特定语言的权限和限制,请参阅许可证。
# 从 typing 模块导入所需的类型
from typing import Callable, List, Optional, Tuple, Union
# 导入 PyTorch 库
import torch
# 从 transformers 库导入 CLIP 相关模型和分词器
from transformers import CLIPTextModel, CLIPTokenizer
# 从本地模块导入配置和模型相关的类
from ....configuration_utils import ConfigMixin, register_to_config
from ....models import ModelMixin, Transformer2DModel, VQModel
from ....schedulers import VQDiffusionScheduler
from ....utils import logging
from ...pipeline_utils import DiffusionPipeline, ImagePipelineOutput
# 创建一个 logger 实例,用于记录日志信息,禁用 pylint 的命名检查
logger = logging.get_logger(__name__) # pylint: disable=invalid-name
class LearnedClassifierFreeSamplingEmbeddings(ModelMixin, ConfigMixin):
"""
存储用于分类器自由采样的学习文本嵌入的实用类
"""
@register_to_config
# 初始化方法,接受可学习标志和可选的隐藏层大小及长度
def __init__(self, learnable: bool, hidden_size: Optional[int] = None, length: Optional[int] = None):
# 调用父类初始化方法
super().__init__()
# 设置可学习标志
self.learnable = learnable
# 如果可学习,检查隐藏层大小和长度是否被设置
if self.learnable:
# 确保在可学习时隐藏层大小不为空
assert hidden_size is not None, "learnable=True requires `hidden_size` to be set"
# 确保在可学习时长度不为空
assert length is not None, "learnable=True requires `length` to be set"
# 创建一个形状为 (length, hidden_size) 的全零张量作为嵌入
embeddings = torch.zeros(length, hidden_size)
else:
# 如果不可学习,嵌入设为 None
embeddings = None
# 将嵌入转换为可学习参数
self.embeddings = torch.nn.Parameter(embeddings)
class VQDiffusionPipeline(DiffusionPipeline):
r"""
使用 VQ Diffusion 进行文本到图像生成的管道。
此模型继承自 [`DiffusionPipeline`]。查看超类文档以获取所有管道实现的通用方法
(下载、保存、在特定设备上运行等)。
参数:
vqvae ([`VQModel`]):
用于编码和解码图像到潜在表示的向量量化变分自编码器(VAE)模型。
text_encoder ([`~transformers.CLIPTextModel`]):
冻结的文本编码器([clip-vit-base-patch32](https://huggingface.co/openai/clip-vit-base-patch32))。
tokenizer ([`~transformers.CLIPTokenizer`]):
用于分词的 `CLIPTokenizer`。
transformer ([`Transformer2DModel`]):
用于去噪编码图像潜在的条件 `Transformer2DModel`。
scheduler ([`VQDiffusionScheduler`]):
用于与 `transformer` 一起去噪编码图像潜在的调度器。
"""
# 定义类属性 vqvae,类型为 VQModel
vqvae: VQModel
# 定义类属性 text_encoder,类型为 CLIPTextModel
text_encoder: CLIPTextModel
# 定义类属性 tokenizer,类型为 CLIPTokenizer
tokenizer: CLIPTokenizer
# 定义一个包含多个模型和调度器的类
transformer: Transformer2DModel # 2D 变换器模型
learned_classifier_free_sampling_embeddings: LearnedClassifierFreeSamplingEmbeddings # 学习的分类器自由采样嵌入
scheduler: VQDiffusionScheduler # VQ 扩散调度器
# 初始化方法,接受多个模型和调度器作为参数
def __init__(
self,
vqvae: VQModel, # VQ-VAE 模型
text_encoder: CLIPTextModel, # 文本编码器模型
tokenizer: CLIPTokenizer, # 分词器
transformer: Transformer2DModel, # 2D 变换器模型
scheduler: VQDiffusionScheduler, # VQ 扩散调度器
learned_classifier_free_sampling_embeddings: LearnedClassifierFreeSamplingEmbeddings, # 学习的分类器自由采样嵌入
):
super().__init__() # 调用父类的初始化方法
# 注册多个模块,使其在模型中可用
self.register_modules(
vqvae=vqvae, # 注册 VQ-VAE 模型
transformer=transformer, # 注册 2D 变换器
text_encoder=text_encoder, # 注册文本编码器
tokenizer=tokenizer, # 注册分词器
scheduler=scheduler, # 注册调度器
learned_classifier_free_sampling_embeddings=learned_classifier_free_sampling_embeddings, # 注册自由采样嵌入
)
# 禁用梯度计算,优化内存使用
@torch.no_grad()
def __call__(
self,
prompt: Union[str, List[str]], # 输入的提示,可以是字符串或字符串列表
num_inference_steps: int = 100, # 推理步骤数,默认为100
guidance_scale: float = 5.0, # 引导比例,默认为5.0
truncation_rate: float = 1.0, # 截断比例,默认为1.0
num_images_per_prompt: int = 1, # 每个提示生成的图像数量,默认为1
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, # 随机数生成器,默认为None
latents: Optional[torch.Tensor] = None, # 潜在变量,默认为None
output_type: Optional[str] = "pil", # 输出类型,默认为"pil"
return_dict: bool = True, # 是否返回字典,默认为True
callback: Optional[Callable[[int, int, torch.Tensor], None]] = None, # 回调函数,默认为None
callback_steps: int = 1, # 回调步骤,默认为1
# 定义一个截断方法,用于调整概率分布
def truncate(self, log_p_x_0: torch.Tensor, truncation_rate: float) -> torch.Tensor:
"""
截断 `log_p_x_0`,使每列向量的总累积概率等于 `truncation_rate`
低于该比例的概率将被设置为零。
"""
# 对 log 概率进行排序,并获取索引
sorted_log_p_x_0, indices = torch.sort(log_p_x_0, 1, descending=True)
# 计算排序后概率的指数值
sorted_p_x_0 = torch.exp(sorted_log_p_x_0)
# 创建掩码,标记哪些概率的累积和低于截断比例
keep_mask = sorted_p_x_0.cumsum(dim=1) < truncation_rate
# 确保至少保留最大概率
all_true = torch.full_like(keep_mask[:, 0:1, :], True) # 创建全为 True 的张量
keep_mask = torch.cat((all_true, keep_mask), dim=1) # 在掩码前添加全 True 的行
keep_mask = keep_mask[:, :-1, :] # 移除最后一列
# 根据原始索引排序掩码
keep_mask = keep_mask.gather(1, indices.argsort(1))
rv = log_p_x_0.clone() # 复制输入的 log 概率
rv[~keep_mask] = -torch.inf # 将未保留的概率设置为负无穷(即 log(0))
return rv # 返回调整后的概率
.\diffusers\pipelines\deprecated\vq_diffusion\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,以便在类型检查时使用
from typing import TYPE_CHECKING
# 从上层模块的 utils 导入一些工具和常量
from ....utils import (
DIFFUSERS_SLOW_IMPORT, # 导入标志,指示是否慢速导入
OptionalDependencyNotAvailable, # 导入异常,用于处理可选依赖未满足的情况
_LazyModule, # 导入懒加载模块的类
is_torch_available, # 导入函数,检查 PyTorch 是否可用
is_transformers_available, # 导入函数,检查 transformers 是否可用
)
# 初始化一个空字典,用于存储虚拟对象
_dummy_objects = {}
# 初始化一个空字典,用于存储模块导入结构
_import_structure = {}
# 尝试检查依赖是否可用
try:
# 如果 transformers 和 torch 都不可用,抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从 dummy_torch_and_transformers_objects 模块导入虚拟对象
from ....utils.dummy_torch_and_transformers_objects import (
LearnedClassifierFreeSamplingEmbeddings, # 导入虚拟的学习分类器对象
VQDiffusionPipeline, # 导入虚拟的 VQ 扩散管道对象
)
# 更新 _dummy_objects 字典,添加导入的虚拟对象
_dummy_objects.update(
{
"LearnedClassifierFreeSamplingEmbeddings": LearnedClassifierFreeSamplingEmbeddings, # 添加学习分类器对象
"VQDiffusionPipeline": VQDiffusionPipeline, # 添加 VQ 扩散管道对象
}
)
# 如果没有抛出异常,执行以下代码
else:
# 更新 _import_structure 字典,添加实际模块的路径
_import_structure["pipeline_vq_diffusion"] = ["LearnedClassifierFreeSamplingEmbeddings", "VQDiffusionPipeline"] # 指定管道模块的导入
# 如果类型检查或慢速导入标志为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 尝试检查依赖是否可用
try:
# 如果 transformers 和 torch 都不可用,抛出异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖不可用的异常
except OptionalDependencyNotAvailable:
# 从 dummy_torch_and_transformers_objects 模块导入虚拟对象
from ....utils.dummy_torch_and_transformers_objects import (
LearnedClassifierFreeSamplingEmbeddings, # 导入虚拟的学习分类器对象
VQDiffusionPipeline, # 导入虚拟的 VQ 扩散管道对象
)
# 如果没有抛出异常,执行以下代码
else:
# 从 pipeline_vq_diffusion 模块导入实际对象
from .pipeline_vq_diffusion import LearnedClassifierFreeSamplingEmbeddings, VQDiffusionPipeline # 导入实际的学习分类器和 VQ 扩散管道对象
# 如果不在类型检查或慢速导入模式
else:
import sys # 导入 sys 模块,用于操作 Python 运行时环境
# 使用懒加载模块创建一个新的模块
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块名称
globals()["__file__"], # 当前模块文件路径
_import_structure, # 导入结构字典
module_spec=__spec__, # 模块的规格
)
# 遍历 _dummy_objects 字典,将虚拟对象设置到当前模块中
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value) # 设置虚拟对象到模块属性中
.\diffusers\pipelines\deprecated\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从上层模块的 utils 中导入多个工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 慢速导入标识
OptionalDependencyNotAvailable, # 可选依赖未找到异常
_LazyModule, # 懒加载模块
get_objects_from_module, # 从模块获取对象的工具函数
is_librosa_available, # 检查 librosa 库是否可用
is_note_seq_available, # 检查 note_seq 库是否可用
is_torch_available, # 检查 torch 库是否可用
is_transformers_available, # 检查 transformers 库是否可用
)
# 用于存储虚拟对象的字典
_dummy_objects = {}
# 用于存储导入结构的字典
_import_structure = {}
try:
# 检查 torch 是否可用,若不可用则引发异常
if not is_torch_available():
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
# 从 utils 中导入虚拟 PyTorch 对象
from ...utils import dummy_pt_objects
# 更新虚拟对象字典,以包含从虚拟对象模块获取的对象
_dummy_objects.update(get_objects_from_module(dummy_pt_objects))
# 如果 torch 可用,则定义导入结构
else:
_import_structure["latent_diffusion_uncond"] = ["LDMPipeline"] # 添加 LDM 管道
_import_structure["pndm"] = ["PNDMPipeline"] # 添加 PNDM 管道
_import_structure["repaint"] = ["RePaintPipeline"] # 添加 RePaint 管道
_import_structure["score_sde_ve"] = ["ScoreSdeVePipeline"] # 添加 ScoreSDE VE 管道
_import_structure["stochastic_karras_ve"] = ["KarrasVePipeline"] # 添加 Karras VE 管道
try:
# 检查 transformers 和 torch 是否都可用,若不可用则引发异常
if not (is_transformers_available() and is_torch_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
# 从 utils 中导入虚拟 torch 和 transformers 对象
from ...utils import dummy_torch_and_transformers_objects
# 更新虚拟对象字典,以包含从虚拟对象模块获取的对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
# 如果两者都可用,则定义导入结构
else:
_import_structure["alt_diffusion"] = [ # 添加替代扩散相关的管道
"AltDiffusionImg2ImgPipeline",
"AltDiffusionPipeline",
"AltDiffusionPipelineOutput",
]
_import_structure["versatile_diffusion"] = [ # 添加多功能扩散相关的管道
"VersatileDiffusionDualGuidedPipeline",
"VersatileDiffusionImageVariationPipeline",
"VersatileDiffusionPipeline",
"VersatileDiffusionTextToImagePipeline",
]
_import_structure["vq_diffusion"] = ["VQDiffusionPipeline"] # 添加 VQ 扩散管道
_import_structure["stable_diffusion_variants"] = [ # 添加稳定扩散变体相关的管道
"CycleDiffusionPipeline",
"StableDiffusionInpaintPipelineLegacy",
"StableDiffusionPix2PixZeroPipeline",
"StableDiffusionParadigmsPipeline",
"StableDiffusionModelEditingPipeline",
]
try:
# 检查 torch 和 librosa 是否都可用,若不可用则引发异常
if not (is_torch_available() and is_librosa_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
# 从 utils 中导入虚拟 torch 和 librosa 对象
from ...utils import dummy_torch_and_librosa_objects # noqa F403
# 更新虚拟对象字典,以包含从虚拟对象模块获取的对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_librosa_objects))
else:
_import_structure["audio_diffusion"] = ["AudioDiffusionPipeline", "Mel"] # 添加音频扩散相关的管道
try:
# 检查 transformers、torch 和 note_seq 是否都可用,若不可用则引发异常
if not (is_transformers_available() and is_torch_available() and is_note_seq_available()):
raise OptionalDependencyNotAvailable()
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
# 从 utils 中导入虚拟 transformers、torch 和 note_seq 对象
from ...utils import dummy_transformers_and_torch_and_note_seq_objects # noqa F403
# 更新虚拟对象字典,以包含从虚拟对象模块获取的对象
_dummy_objects.update(get_objects_from_module(dummy_transformers_and_torch_and_note_seq_objects))
else:
_import_structure["spectrogram_diffusion"] = ["MidiProcessor", "SpectrogramDiffusionPipeline"] # 添加频谱扩散相关的管道
# 如果正在进行类型检查或设置为慢速导入,则执行检查
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 检查 torch 是否可用,若不可用则引发异常
if not is_torch_available():
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未找到的异常
except OptionalDependencyNotAvailable:
# 从虚拟模块导入占位符对象
from ...utils.dummy_pt_objects import *
# 如果没有抛出异常,导入以下管道类
else:
from .latent_diffusion_uncond import LDMPipeline
from .pndm import PNDMPipeline
from .repaint import RePaintPipeline
from .score_sde_ve import ScoreSdeVePipeline
from .stochastic_karras_ve import KarrasVePipeline
# 尝试检查是否可用所需的库
try:
if not (is_transformers_available() and is_torch_available()):
# 如果库不可用,抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未找到的异常
except OptionalDependencyNotAvailable:
# 从虚拟模块导入占位符对象
from ...utils.dummy_torch_and_transformers_objects import *
# 如果没有抛出异常,导入以下管道和类
else:
from .alt_diffusion import AltDiffusionImg2ImgPipeline, AltDiffusionPipeline, AltDiffusionPipelineOutput
from .audio_diffusion import AudioDiffusionPipeline, Mel
from .spectrogram_diffusion import SpectrogramDiffusionPipeline
from .stable_diffusion_variants import (
CycleDiffusionPipeline,
StableDiffusionInpaintPipelineLegacy,
StableDiffusionModelEditingPipeline,
StableDiffusionParadigmsPipeline,
StableDiffusionPix2PixZeroPipeline,
)
from .stochastic_karras_ve import KarrasVePipeline
from .versatile_diffusion import (
VersatileDiffusionDualGuidedPipeline,
VersatileDiffusionImageVariationPipeline,
VersatileDiffusionPipeline,
VersatileDiffusionTextToImagePipeline,
)
from .vq_diffusion import VQDiffusionPipeline
# 尝试检查是否可用所需的音频库
try:
if not (is_torch_available() and is_librosa_available()):
# 如果库不可用,抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未找到的异常
except OptionalDependencyNotAvailable:
# 从虚拟模块导入占位符对象
from ...utils.dummy_torch_and_librosa_objects import *
# 如果没有抛出异常,导入音频相关类
else:
from .audio_diffusion import AudioDiffusionPipeline, Mel
# 尝试检查是否可用所有必要的库
try:
if not (is_transformers_available() and is_torch_available() and is_note_seq_available()):
# 如果库不可用,抛出异常
raise OptionalDependencyNotAvailable()
# 捕获可选依赖项未找到的异常
except OptionalDependencyNotAvailable:
# 从虚拟模块导入占位符对象
from ...utils.dummy_transformers_and_torch_and_note_seq_objects import * # noqa F403
# 如果没有抛出异常,导入音频频谱相关类
else:
from .spectrogram_diffusion import (
MidiProcessor,
SpectrogramDiffusionPipeline,
)
else:
# 导入 sys 模块以便访问 Python 的模块系统
import sys
# 将当前模块名(__name__)的 sys.modules 条目替换为一个懒加载模块对象
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块的名称
globals()["__file__"], # 当前模块的文件路径
_import_structure, # 用于指定模块的导入结构
module_spec=__spec__, # 模块的规格,提供模块的元数据
)
# 遍历 _dummy_objects 字典,将每个对象的名称和值设置到当前模块中
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value) # 为当前模块设置属性
.\diffusers\pipelines\dit\pipeline_dit.py
# Attribution-NonCommercial 4.0 International (CC BY-NC 4.0) # 版权声明,说明此文件的使用限制
# William Peebles and Saining Xie # 贡献者姓名
#
# Copyright (c) 2021 OpenAI # 表示 OpenAI 对该文件的版权
# MIT License # 说明该文件遵循 MIT 许可证
#
# Copyright 2024 The HuggingFace Team. All rights reserved. # HuggingFace 团队对文件的版权声明
#
# Licensed under the Apache License, Version 2.0 (the "License"); # 说明文件遵循 Apache 2.0 许可证
# you may not use this file except in compliance with the License. # 使用文件的前提条件
# You may obtain a copy of the License at # 提供许可证获取链接
#
# http://www.apache.org/licenses/LICENSE-2.0 # 许可证链接
#
# Unless required by applicable law or agreed to in writing, software # 说明在特定情况下的免责条款
# distributed under the License is distributed on an "AS IS" BASIS, # 文件按现状提供,不提供任何担保
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # 不承担任何明示或暗示的责任
# See the License for the specific language governing permissions and # 提供查看许可证的建议
# limitations under the License. # 说明许可证的限制
from typing import Dict, List, Optional, Tuple, Union # 从 typing 模块导入类型提示功能
import torch # 导入 PyTorch 库
from ...models import AutoencoderKL, DiTTransformer2DModel # 从模型模块导入相关类
from ...schedulers import KarrasDiffusionSchedulers # 从调度器模块导入 KarrasDiffusionSchedulers 类
from ...utils.torch_utils import randn_tensor # 从工具模块导入随机张量生成函数
from ..pipeline_utils import DiffusionPipeline, ImagePipelineOutput # 从管道工具模块导入相关类
class DiTPipeline(DiffusionPipeline): # 定义一个继承自 DiffusionPipeline 的类 DiTPipeline
r""" # 文档字符串,描述此类的功能和参数
Pipeline for image generation based on a Transformer backbone instead of a UNet. # 说明此管道用于基于 Transformer 的图像生成,而非 UNet
This model inherits from [`DiffusionPipeline`]. Check the superclass documentation for the generic methods # 指出该模型继承自 DiffusionPipeline,并建议查看父类文档
implemented for all pipelines (downloading, saving, running on a particular device, etc.). # 指出可用的通用方法
Parameters: # 说明类的参数
transformer ([`DiTTransformer2DModel`]): # transformer 参数,类型为 DiTTransformer2DModel
A class conditioned `DiTTransformer2DModel` to denoise the encoded image latents. # 用于去噪图像潜在表示的 transformer 类
vae ([`AutoencoderKL`]): # vae 参数,类型为 AutoencoderKL
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations. # 用于图像编码和解码的变分自编码器模型
scheduler ([`DDIMScheduler`]): # scheduler 参数,类型为 DDIMScheduler
A scheduler to be used in combination with `transformer` to denoise the encoded image latents. # 与 transformer 结合使用以去噪的调度器
"""
model_cpu_offload_seq = "transformer->vae" # 定义模型 CPU 负载卸载的顺序
def __init__( # 定义初始化方法
self,
transformer: DiTTransformer2DModel, # transformer 参数
vae: AutoencoderKL, # vae 参数
scheduler: KarrasDiffusionSchedulers, # scheduler 参数
id2label: Optional[Dict[int, str]] = None, # 可选的 id2label 参数,默认为 None
):
super().__init__() # 调用父类的初始化方法
self.register_modules(transformer=transformer, vae=vae, scheduler=scheduler) # 注册传入的模块
# create a imagenet -> id dictionary for easier use # 创建一个方便使用的 imagenet 到 id 的字典
self.labels = {} # 初始化标签字典
if id2label is not None: # 如果 id2label 参数不为 None
for key, value in id2label.items(): # 遍历 id2label 的键值对
for label in value.split(","): # 分割每个值为多个标签
self.labels[label.lstrip().rstrip()] = int(key) # 去除标签两端空白并存储对应的 id
self.labels = dict(sorted(self.labels.items())) # 对标签字典进行排序
# 获取标签字符串对应的类 ID 的方法
def get_label_ids(self, label: Union[str, List[str]]) -> List[int]:
r"""
将 ImageNet 的标签字符串映射到对应的类 ID。
参数:
label (`str` 或 `dict` of `str`):
要映射到类 ID 的标签字符串。
返回:
`list` of `int`:
要被管道处理的类 ID 列表。
"""
# 检查输入的 label 是否为列表,如果不是则将其转换为列表
if not isinstance(label, list):
label = list(label)
# 遍历标签列表中的每个标签
for l in label:
# 检查标签是否在已知标签列表中,不存在则抛出错误
if l not in self.labels:
raise ValueError(
f"{l} 不存在。请确保选择以下标签之一: \n {self.labels}."
)
# 返回标签对应的类 ID 列表
return [self.labels[l] for l in label]
# 无梯度计算装饰器,禁止计算梯度以节省内存和计算资源
@torch.no_grad()
def __call__(
# 输入的类标签列表
class_labels: List[int],
# 指导比例的默认值
guidance_scale: float = 4.0,
# 可选的随机数生成器或生成器列表
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 推理步骤的默认次数
num_inference_steps: int = 50,
# 输出类型的默认值
output_type: Optional[str] = "pil",
# 是否返回字典的默认值
return_dict: bool = True,
.\diffusers\pipelines\dit\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于静态类型检查
from typing import TYPE_CHECKING
# 从父级目录的 utils 模块导入 DIFFUSERS_SLOW_IMPORT 和 _LazyModule
from ...utils import DIFFUSERS_SLOW_IMPORT, _LazyModule
# 定义一个字典,描述模块结构,包含 pipeline_dit 模块及其下的 DiTPipeline
_import_structure = {"pipeline_dit": ["DiTPipeline"]}
# 检查是否在类型检查阶段或需要慢速导入
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
# 从 pipeline_dit 模块导入 DiTPipeline 类
from .pipeline_dit import DiTPipeline
else:
# 导入 sys 模块
import sys
# 使用 _LazyModule 动态加载模块,并替换当前模块
sys.modules[__name__] = _LazyModule(
__name__, # 当前模块名
globals()["__file__"], # 当前文件路径
_import_structure, # 模块结构字典
module_spec=__spec__, # 模块的规格信息
)
.\diffusers\pipelines\flux\pipeline_flux.py
# 版权声明,标明所有权和许可信息
# Copyright 2024 Black Forest Labs and The HuggingFace Team. All rights reserved.
#
# 依据 Apache License, Version 2.0 进行授权
# 如果不遵循该许可协议,则不得使用此文件
# 可在以下网址获取许可证的副本
# http://www.apache.org/licenses/LICENSE-2.0
#
# 除非适用法律要求或书面同意,软件在“按原样”基础上分发,
# 不提供任何明示或暗示的保证或条件
# 查看许可证以获取特定的权限和限制
import inspect # 导入inspect模块以进行对象的检测
from typing import Any, Callable, Dict, List, Optional, Union # 导入类型注解
import numpy as np # 导入numpy库以进行数值计算
import torch # 导入PyTorch库以进行深度学习
from transformers import CLIPTextModel, CLIPTokenizer, T5EncoderModel, T5TokenizerFast # 导入transformers库中的模型和分词器
from ...image_processor import VaeImageProcessor # 从图像处理模块导入变分自编码器图像处理器
from ...loaders import FluxLoraLoaderMixin # 导入FluxLoraLoaderMixin以处理数据加载
from ...models.autoencoders import AutoencoderKL # 导入KL自编码器模型
from ...models.transformers import FluxTransformer2DModel # 导入Flux 2D变换器模型
from ...schedulers import FlowMatchEulerDiscreteScheduler # 导入调度器以处理时间步进
from ...utils import ( # 导入工具函数
USE_PEFT_BACKEND, # PEFT后端的使用标志
is_torch_xla_available, # 检查Torch XLA可用性
logging, # 导入日志记录模块
replace_example_docstring, # 替换示例文档字符串的工具函数
scale_lora_layers, # 缩放LoRA层的工具函数
unscale_lora_layers, # 取消缩放LoRA层的工具函数
)
from ...utils.torch_utils import randn_tensor # 导入用于生成随机张量的工具函数
from ..pipeline_utils import DiffusionPipeline # 导入扩散管道
from .pipeline_output import FluxPipelineOutput # 导入Flux管道输出
# 检查Torch XLA是否可用,并相应地导入和设置标志
if is_torch_xla_available():
import torch_xla.core.xla_model as xm # 导入XLA模型以支持分布式训练
XLA_AVAILABLE = True # 设置XLA可用标志为True
else:
XLA_AVAILABLE = False # 设置XLA可用标志为False
logger = logging.get_logger(__name__) # 创建日志记录器,以当前模块名作为标识
EXAMPLE_DOC_STRING = """ # 示例文档字符串
Examples:
```py
>>> import torch # 导入PyTorch库
>>> from diffusers import FluxPipeline # 从diffusers导入Flux管道
>>> pipe = FluxPipeline.from_pretrained("black-forest-labs/FLUX.1-schnell", torch_dtype=torch.bfloat16) # 加载预训练的Flux管道
>>> pipe.to("cuda") # 将管道移动到CUDA设备
>>> prompt = "A cat holding a sign that says hello world" # 设置生成图像的提示
>>> # 根据使用的变体,管道调用会略有不同
>>> # 有关更多详细信息,请参阅管道文档
>>> image = pipe(prompt, num_inference_steps=4, guidance_scale=0.0).images[0] # 生成图像
>>> image.save("flux.png") # 保存生成的图像
```py
""" # 示例文档字符串结束
def calculate_shift( # 定义计算图像序列长度的偏移函数
image_seq_len, # 输入参数:图像序列长度
base_seq_len: int = 256, # 基本序列长度,默认为256
max_seq_len: int = 4096, # 最大序列长度,默认为4096
base_shift: float = 0.5, # 基本偏移量,默认为0.5
max_shift: float = 1.16, # 最大偏移量,默认为1.16
):
m = (max_shift - base_shift) / (max_seq_len - base_seq_len) # 计算斜率
b = base_shift - m * base_seq_len # 计算截距
mu = image_seq_len * m + b # 计算偏移量
return mu # 返回计算得到的偏移量
# 从稳定扩散的管道中复制的检索时间步的函数
def retrieve_timesteps( # 定义检索时间步的函数
scheduler, # 输入参数:调度器对象
num_inference_steps: Optional[int] = None, # 可选参数:推理步骤数
device: Optional[Union[str, torch.device]] = None, # 可选参数:设备类型
timesteps: Optional[List[int]] = None, # 可选参数:时间步列表
sigmas: Optional[List[float]] = None, # 可选参数:标准差列表
**kwargs, # 其他关键字参数
):
"""
调用调度器的 `set_timesteps` 方法,并在调用后从调度器中检索时间步
``` # 函数文档字符串开始
```py # 文档字符串结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py # 代码块结束
``` # 代码块结束
```py #
# 函数文档字符串,描述参数和返回值的用途
"""
custom timesteps. Any kwargs will be supplied to `scheduler.set_timesteps`.
Args:
scheduler (`SchedulerMixin`): # 定义一个调度器类的实例,用于获取时间步
The scheduler to get timesteps from.
num_inference_steps (`int`): # 定义推理步骤的数量,用于生成样本
The number of diffusion steps used when generating samples with a pre-trained model. If used, `timesteps`
must be `None`. # 如果使用此参数,`timesteps`必须为None
device (`str` or `torch.device`, *optional*): # 指定将时间步移动到的设备
The device to which the timesteps should be moved to. If `None`, the timesteps are not moved.
timesteps (`List[int]`, *optional*): # 定义自定义时间步以覆盖调度器的时间步策略
Custom timesteps used to override the timestep spacing strategy of the scheduler. If `timesteps` is passed,
`num_inference_steps` and `sigmas` must be `None`. # 如果传递此参数,`num_inference_steps`和`sigmas`必须为None
sigmas (`List[float]`, *optional*): # 定义自定义sigma以覆盖调度器的时间步策略
Custom sigmas used to override the timestep spacing strategy of the scheduler. If `sigmas` is passed,
`num_inference_steps` and `timesteps` must be `None`. # 如果传递此参数,`num_inference_steps`和`timesteps`必须为None
Returns:
`Tuple[torch.Tensor, int]`: A tuple where the first element is the timestep schedule from the scheduler and the
second element is the number of inference steps. # 返回一个包含时间步调度和推理步骤数量的元组
"""
# 检查是否同时提供了自定义时间步和sigma
if timesteps is not None and sigmas is not None:
# 抛出错误,提示只能传递一个参数
raise ValueError("Only one of `timesteps` or `sigmas` can be passed. Please choose one to set custom values")
# 如果提供了自定义时间步
if timesteps is not None:
# 检查调度器是否接受自定义时间步
accepts_timesteps = "timesteps" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出错误
if not accepts_timesteps:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" timestep schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的自定义时间步
scheduler.set_timesteps(timesteps=timesteps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果提供了自定义sigma
elif sigmas is not None:
# 检查调度器是否接受自定义sigma
accept_sigmas = "sigmas" in set(inspect.signature(scheduler.set_timesteps).parameters.keys())
# 如果不接受,抛出错误
if not accept_sigmas:
raise ValueError(
f"The current scheduler class {scheduler.__class__}'s `set_timesteps` does not support custom"
f" sigmas schedules. Please check whether you are using the correct scheduler."
)
# 设置调度器的自定义sigma
scheduler.set_timesteps(sigmas=sigmas, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 计算推理步骤的数量
num_inference_steps = len(timesteps)
# 如果没有提供自定义时间步或sigma
else:
# 根据推理步骤设置调度器的时间步
scheduler.set_timesteps(num_inference_steps, device=device, **kwargs)
# 获取调度器的时间步
timesteps = scheduler.timesteps
# 返回时间步和推理步骤数量的元组
return timesteps, num_inference_steps
# 定义一个名为 FluxPipeline 的类,继承自 DiffusionPipeline 和 FluxLoraLoaderMixin
class FluxPipeline(DiffusionPipeline, FluxLoraLoaderMixin):
r"""
Flux 管道用于文本到图像生成。
参考文献: https://blackforestlabs.ai/announcing-black-forest-labs/
参数:
transformer ([`FluxTransformer2DModel`]):
条件变换器 (MMDiT) 架构,用于去噪编码的图像潜在。
scheduler ([`FlowMatchEulerDiscreteScheduler`]):
与 `transformer` 结合使用的调度器,用于去噪编码的图像潜在。
vae ([`AutoencoderKL`]):
变分自编码器 (VAE) 模型,用于将图像编码和解码为潜在表示。
text_encoder ([`CLIPTextModel`]):
[CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel),特别是
[clip-vit-large-patch14](https://huggingface.co/openai/clip-vit-large-patch14) 变体。
text_encoder_2 ([`T5EncoderModel`]):
[T5](https://huggingface.co/docs/transformers/en/model_doc/t5#transformers.T5EncoderModel),特别是
[google/t5-v1_1-xxl](https://huggingface.co/google/t5-v1_1-xxl) 变体。
tokenizer (`CLIPTokenizer`):
类的分词器
[CLIPTokenizer](https://huggingface.co/docs/transformers/en/model_doc/clip#transformers.CLIPTokenizer)。
tokenizer_2 (`T5TokenizerFast`):
类的第二个分词器
[T5TokenizerFast](https://huggingface.co/docs/transformers/en/model_doc/t5#transformers.T5TokenizerFast)。
"""
# 定义一个序列,用于 CPU 卸载模型组件的顺序
model_cpu_offload_seq = "text_encoder->text_encoder_2->transformer->vae"
# 定义可选组件的空列表
_optional_components = []
# 定义用于回调的张量输入名称
_callback_tensor_inputs = ["latents", "prompt_embeds"]
# 初始化方法,接收多个参数以设置对象
def __init__(
self,
scheduler: FlowMatchEulerDiscreteScheduler, # 调度器参数
vae: AutoencoderKL, # VAE 参数
text_encoder: CLIPTextModel, # 文本编码器参数
tokenizer: CLIPTokenizer, # 第一个分词器参数
text_encoder_2: T5EncoderModel, # 第二个文本编码器参数
tokenizer_2: T5TokenizerFast, # 第二个分词器参数
transformer: FluxTransformer2DModel, # 转换器参数
):
# 调用父类的初始化方法
super().__init__()
# 注册多个模块以供使用
self.register_modules(
vae=vae, # 注册 VAE
text_encoder=text_encoder, # 注册文本编码器
text_encoder_2=text_encoder_2, # 注册第二个文本编码器
tokenizer=tokenizer, # 注册第一个分词器
tokenizer_2=tokenizer_2, # 注册第二个分词器
transformer=transformer, # 注册转换器
scheduler=scheduler, # 注册调度器
)
# 计算 VAE 的缩放因子,默认值为 16
self.vae_scale_factor = (
2 ** (len(self.vae.config.block_out_channels)) if hasattr(self, "vae") and self.vae is not None else 16
)
# 创建 VAE 图像处理器,使用 VAE 缩放因子
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
# 设置分词器的最大长度,默认值为 77
self.tokenizer_max_length = (
self.tokenizer.model_max_length if hasattr(self, "tokenizer") and self.tokenizer is not None else 77
)
# 默认样本大小设置为 64
self.default_sample_size = 64
# 定义获取 T5 模型提示嵌入的私有方法
def _get_t5_prompt_embeds(
self,
prompt: Union[str, List[str]] = None, # 提示文本,可以是字符串或字符串列表
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
max_sequence_length: int = 512, # 提示的最大序列长度
device: Optional[torch.device] = None, # 可选的设备,默认为 None
dtype: Optional[torch.dtype] = None, # 可选的数据类型,默认为 None
):
# 如果未指定设备,则使用类中的执行设备
device = device or self._execution_device
# 如果未指定数据类型,则使用文本编码器的默认数据类型
dtype = dtype or self.text_encoder.dtype
# 如果提示是字符串,则将其转换为单元素列表
prompt = [prompt] if isinstance(prompt, str) else prompt
# 获取提示的批处理大小
batch_size = len(prompt)
# 使用分词器将提示转换为张量,进行填充、截断等处理
text_inputs = self.tokenizer_2(
prompt,
padding="max_length", # 填充到最大长度
max_length=max_sequence_length, # 最大长度限制
truncation=True, # 允许截断
return_length=False, # 不返回长度
return_overflowing_tokens=False, # 不返回溢出的标记
return_tensors="pt", # 返回 PyTorch 张量
)
# 获取输入的 ID
text_input_ids = text_inputs.input_ids
# 获取未截断的 ID
untruncated_ids = self.tokenizer_2(prompt, padding="longest", return_tensors="pt").input_ids
# 检查未截断的 ID 是否长于输入 ID,并且内容不相等
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码被截断的文本并记录警告
removed_text = self.tokenizer_2.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
logger.warning(
"The following part of your input was truncated because `max_sequence_length` is set to "
f" {max_sequence_length} tokens: {removed_text}" # 日志记录被截断的文本
)
# 获取文本嵌入,不输出隐藏状态
prompt_embeds = self.text_encoder_2(text_input_ids.to(device), output_hidden_states=False)[0]
# 更新数据类型
dtype = self.text_encoder_2.dtype
# 将嵌入转换为指定的数据类型和设备
prompt_embeds = prompt_embeds.to(dtype=dtype, device=device)
# 获取嵌入的序列长度
_, seq_len, _ = prompt_embeds.shape
# 为每个提示生成的图像复制文本嵌入,使用与 MPS 兼容的方法
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
# 重塑嵌入以适应新的批处理大小
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
# 返回生成的提示嵌入
return prompt_embeds
# 定义获取 CLIP 模型提示嵌入的私有方法
def _get_clip_prompt_embeds(
self,
prompt: Union[str, List[str]], # 提示文本,可以是字符串或字符串列表
num_images_per_prompt: int = 1, # 每个提示生成的图像数量
device: Optional[torch.device] = None, # 可选的设备,默认为 None
# 开始一个函数定义,使用括号表示参数
):
# 如果未指定设备,则使用实例的执行设备
device = device or self._execution_device
# 如果 prompt 是字符串,则将其转换为列表形式
prompt = [prompt] if isinstance(prompt, str) else prompt
# 获取 prompt 的批处理大小
batch_size = len(prompt)
# 使用 tokenizer 对 prompt 进行编码,生成张量格式的输入
text_inputs = self.tokenizer(
prompt,
padding="max_length", # 填充到最大长度
max_length=self.tokenizer_max_length, # 最大长度设置
truncation=True, # 允许截断
return_overflowing_tokens=False, # 不返回溢出令牌
return_length=False, # 不返回长度信息
return_tensors="pt", # 返回 PyTorch 张量
)
# 获取编码后的输入 ID
text_input_ids = text_inputs.input_ids
# 使用最长填充对原始 prompt 进行编码,获取未截断的 ID
untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
# 如果未截断的 ID 长度大于等于输入 ID,且不相等,则处理截断警告
if untruncated_ids.shape[-1] >= text_input_ids.shape[-1] and not torch.equal(text_input_ids, untruncated_ids):
# 解码并记录被截断的文本
removed_text = self.tokenizer.batch_decode(untruncated_ids[:, self.tokenizer_max_length - 1 : -1])
# 记录警告信息,提示用户部分输入已被截断
logger.warning(
"The following part of your input was truncated because CLIP can only handle sequences up to"
f" {self.tokenizer_max_length} tokens: {removed_text}"
)
# 使用文本编码器生成 prompt 的嵌入
prompt_embeds = self.text_encoder(text_input_ids.to(device), output_hidden_states=False)
# 使用 CLIPTextModel 的池化输出
prompt_embeds = prompt_embeds.pooler_output
# 将嵌入转换为指定的数据类型和设备
prompt_embeds = prompt_embeds.to(dtype=self.text_encoder.dtype, device=device)
# 为每个 prompt 生成重复的文本嵌入,使用适合 mps 的方法
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt)
# 调整张量形状以适应批处理大小和图像数量
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, -1)
# 返回处理后的文本嵌入
return prompt_embeds
# 定义一个新的函数,名为 encode_prompt,接收多个参数
def encode_prompt(
self,
# 定义 prompt,支持字符串或字符串列表
prompt: Union[str, List[str]],
# 定义第二个 prompt,支持字符串或字符串列表
prompt_2: Union[str, List[str]],
# 可选设备参数,默认值为 None
device: Optional[torch.device] = None,
# 每个 prompt 生成的图像数量,默认为 1
num_images_per_prompt: int = 1,
# 可选的文本嵌入参数,默认值为 None
prompt_embeds: Optional[torch.FloatTensor] = None,
# 可选的池化文本嵌入参数,默认值为 None
pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
# 最大序列长度,默认值为 512
max_sequence_length: int = 512,
# 可选的 LoRA 比例,默认值为 None
lora_scale: Optional[float] = None,
# 定义另一个函数 check_inputs,接收多个参数
def check_inputs(
self,
# 定义第一个 prompt 参数
prompt,
# 定义第二个 prompt 参数
prompt_2,
# 定义高度参数
height,
# 定义宽度参数
width,
# 可选的文本嵌入参数,默认值为 None
prompt_embeds=None,
# 可选的池化文本嵌入参数,默认值为 None
pooled_prompt_embeds=None,
# 可选的回调参数,默认值为 None
callback_on_step_end_tensor_inputs=None,
# 可选的最大序列长度,默认值为 None
max_sequence_length=None,
):
# 检查高度和宽度是否都能被 8 整除
if height % 8 != 0 or width % 8 != 0:
# 如果不满足条件,抛出值错误,说明高度和宽度的具体值
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
# 检查是否提供了回调输入,并确保它们都在预定义的回调输入中
if callback_on_step_end_tensor_inputs is not None and not all(
k in self._callback_tensor_inputs for k in callback_on_step_end_tensor_inputs
):
# 如果有未在预定义输入中的回调,抛出值错误,显示具体未找到的回调
raise ValueError(
f"`callback_on_step_end_tensor_inputs` has to be in {self._callback_tensor_inputs}, but found {[k for k in callback_on_step_end_tensor_inputs if k not in self._callback_tensor_inputs]}"
)
# 检查 prompt 和 prompt_embeds 是否同时提供
if prompt is not None and prompt_embeds is not None:
# 如果都提供了,抛出值错误,提醒只能提供其中一个
raise ValueError(
f"Cannot forward both `prompt`: {prompt} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt_2 和 prompt_embeds 是否同时提供
elif prompt_2 is not None and prompt_embeds is not None:
# 如果都提供了,抛出值错误,提醒只能提供其中一个
raise ValueError(
f"Cannot forward both `prompt_2`: {prompt_2} and `prompt_embeds`: {prompt_embeds}. Please make sure to"
" only forward one of the two."
)
# 检查 prompt 和 prompt_embeds 是否都未提供
elif prompt is None and prompt_embeds is None:
# 如果都未提供,抛出值错误,提醒必须提供一个
raise ValueError(
"Provide either `prompt` or `prompt_embeds`. Cannot leave both `prompt` and `prompt_embeds` undefined."
)
# 检查 prompt 的类型是否为字符串或列表
elif prompt is not None and (not isinstance(prompt, str) and not isinstance(prompt, list)):
# 如果类型不符合,抛出值错误,显示实际类型
raise ValueError(f"`prompt` has to be of type `str` or `list` but is {type(prompt)}")
# 检查 prompt_2 的类型是否为字符串或列表
elif prompt_2 is not None and (not isinstance(prompt_2, str) and not isinstance(prompt_2, list)):
# 如果类型不符合,抛出值错误,显示实际类型
raise ValueError(f"`prompt_2` has to be of type `str` or `list` but is {type(prompt_2)}")
# 检查是否提供了 prompt_embeds 但未提供 pooled_prompt_embeds
if prompt_embeds is not None and pooled_prompt_embeds is None:
# 如果未提供 pooled_prompt_embeds,抛出值错误,说明需要从相同的文本编码器生成
raise ValueError(
"If `prompt_embeds` are provided, `pooled_prompt_embeds` also have to be passed. Make sure to generate `pooled_prompt_embeds` from the same text encoder that was used to generate `prompt_embeds`."
)
# 检查最大序列长度是否大于 512
if max_sequence_length is not None and max_sequence_length > 512:
# 如果大于 512,抛出值错误,说明具体的最大序列长度
raise ValueError(f"`max_sequence_length` cannot be greater than 512 but is {max_sequence_length}")
@staticmethod
# 准备潜在图像 ID
def _prepare_latent_image_ids(batch_size, height, width, device, dtype):
# 创建一个零的张量,形状为 (height // 2, width // 2, 3)
latent_image_ids = torch.zeros(height // 2, width // 2, 3)
# 为第二维度增加行索引,形成潜在图像 ID 的位置
latent_image_ids[..., 1] = latent_image_ids[..., 1] + torch.arange(height // 2)[:, None]
# 为第三维度增加列索引
latent_image_ids[..., 2] = latent_image_ids[..., 2] + torch.arange(width // 2)[None, :]
# 获取潜在图像 ID 的高度、宽度和通道数
latent_image_id_height, latent_image_id_width, latent_image_id_channels = latent_image_ids.shape
# 将潜在图像 ID 复制到 batch_size 的维度上
latent_image_ids = latent_image_ids[None, :].repeat(batch_size, 1, 1, 1)
# 重塑张量为 (batch_size, height_id * width_id, channels)
latent_image_ids = latent_image_ids.reshape(
batch_size, latent_image_id_height * latent_image_id_width, latent_image_id_channels
)
# 返回设备和数据类型调整后的潜在图像 ID
return latent_image_ids.to(device=device, dtype=dtype)
@staticmethod
# 打包潜在张量
def _pack_latents(latents, batch_size, num_channels_latents, height, width):
# 重塑张量为特定形状
latents = latents.view(batch_size, num_channels_latents, height // 2, 2, width // 2, 2)
# 调整维度顺序
latents = latents.permute(0, 2, 4, 1, 3, 5)
# 再次重塑张量
latents = latents.reshape(batch_size, (height // 2) * (width // 2), num_channels_latents * 4)
# 返回打包后的潜在张量
return latents
@staticmethod
# 解包潜在张量
def _unpack_latents(latents, height, width, vae_scale_factor):
# 获取批量大小、补丁数量和通道数
batch_size, num_patches, channels = latents.shape
# 根据 VAE 缩放因子调整高度和宽度
height = height // vae_scale_factor
width = width // vae_scale_factor
# 重塑潜在张量为特定形状
latents = latents.view(batch_size, height, width, channels // 4, 2, 2)
# 调整维度顺序
latents = latents.permute(0, 3, 1, 4, 2, 5)
# 再次重塑张量为最终形状
latents = latents.reshape(batch_size, channels // (2 * 2), height * 2, width * 2)
# 返回解包后的潜在张量
return latents
# 准备潜在张量
def prepare_latents(
self,
batch_size,
num_channels_latents,
height,
width,
dtype,
device,
generator,
latents=None,
):
# 根据 VAE 缩放因子调整高度和宽度
height = 2 * (int(height) // self.vae_scale_factor)
width = 2 * (int(width) // self.vae_scale_factor)
# 定义张量形状
shape = (batch_size, num_channels_latents, height, width)
# 如果提供了潜在张量,则准备潜在图像 ID
if latents is not None:
latent_image_ids = self._prepare_latent_image_ids(batch_size, height, width, device, dtype)
return latents.to(device=device, dtype=dtype), latent_image_ids
# 验证生成器列表的长度与批量大小匹配
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}. Make sure the batch size matches the length of the generators."
)
# 创建随机潜在张量
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
# 打包潜在张量
latents = self._pack_latents(latents, batch_size, num_channels_latents, height, width)
# 准备潜在图像 ID
latent_image_ids = self._prepare_latent_image_ids(batch_size, height, width, device, dtype)
# 返回打包后的潜在张量和潜在图像 ID
return latents, latent_image_ids
# 获取引导比例
@property
def guidance_scale(self):
return self._guidance_scale
# 属性定义结束
# 定义获取联合注意力参数的方法
def joint_attention_kwargs(self):
# 返回存储的联合注意力参数
return self._joint_attention_kwargs
# 定义 num_timesteps 属性
@property
def num_timesteps(self):
# 返回存储的时间步数
return self._num_timesteps
# 定义 interrupt 属性
@property
def interrupt(self):
# 返回存储的中断状态
return self._interrupt
# 禁用梯度计算,以节省内存和加快计算速度
@torch.no_grad()
# 替换示例文档字符串
@replace_example_docstring(EXAMPLE_DOC_STRING)
# 定义调用方法,接受多种参数
def __call__(
# 提示信息,可以是字符串或字符串列表
prompt: Union[str, List[str]] = None,
# 第二个提示信息,可以是字符串或字符串列表,默认为 None
prompt_2: Optional[Union[str, List[str]]] = None,
# 图像的高度,默认为 None
height: Optional[int] = None,
# 图像的宽度,默认为 None
width: Optional[int] = None,
# 推理步骤的数量,默认为 28
num_inference_steps: int = 28,
# 时间步列表,默认为 None
timesteps: List[int] = None,
# 指导比例,默认为 7.0
guidance_scale: float = 7.0,
# 每个提示生成的图像数量,默认为 1
num_images_per_prompt: Optional[int] = 1,
# 随机数生成器,可以是单个或多个生成器,默认为 None
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
# 潜在变量,默认为 None
latents: Optional[torch.FloatTensor] = None,
# 提示嵌入,默认为 None
prompt_embeds: Optional[torch.FloatTensor] = None,
# 池化的提示嵌入,默认为 None
pooled_prompt_embeds: Optional[torch.FloatTensor] = None,
# 输出类型,默认为 "pil"
output_type: Optional[str] = "pil",
# 是否返回字典格式,默认为 True
return_dict: bool = True,
# 联合注意力参数,默认为 None
joint_attention_kwargs: Optional[Dict[str, Any]] = None,
# 每个步骤结束时的回调函数,默认为 None
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
# 在步骤结束时的张量输入名称列表,默认为 ["latents"]
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
# 最大序列长度,默认为 512
max_sequence_length: int = 512,
.\diffusers\pipelines\flux\pipeline_output.py
# 从 dataclasses 模块导入 dataclass 装饰器
from dataclasses import dataclass
# 导入 List 和 Union 类型注解
from typing import List, Union
# 导入 numpy 库并简化为 np
import numpy as np
# 导入 PIL.Image 模块用于图像处理
import PIL.Image
# 从上级目录的 utils 模块导入 BaseOutput 类
from ...utils import BaseOutput
# 定义 FluxPipelineOutput 类,继承自 BaseOutput
@dataclass
class FluxPipelineOutput(BaseOutput):
"""
Stable Diffusion 流水线的输出类。
Args:
images (`List[PIL.Image.Image]` 或 `np.ndarray`)
长度为 `batch_size` 的去噪 PIL 图像列表或形状为 `(batch_size, height, width,
num_channels)` 的 numpy 数组。PIL 图像或 numpy 数组表示扩散流水线的去噪图像。
"""
# 定义 images 属性,可以是 PIL 图像列表或 numpy 数组
images: Union[List[PIL.Image.Image], np.ndarray]
.\diffusers\pipelines\flux\__init__.py
# 从 typing 模块导入 TYPE_CHECKING,用于类型检查
from typing import TYPE_CHECKING
# 从 utils 模块导入所需的工具函数和常量
from ...utils import (
DIFFUSERS_SLOW_IMPORT, # 导入慢加载的标志
OptionalDependencyNotAvailable, # 导入可选依赖未找到的异常
_LazyModule, # 导入延迟模块加载的类
get_objects_from_module, # 导入从模块获取对象的函数
is_torch_available, # 导入检查 PyTorch 是否可用的函数
is_transformers_available, # 导入检查 Transformers 是否可用的函数
)
# 定义一个空字典,用于存储虚拟对象
_dummy_objects = {}
# 定义一个空字典,用于存储额外导入的对象
_additional_imports = {}
# 定义模块的导入结构,初始化 pipeline_output
_import_structure = {"pipeline_output": ["FluxPipelineOutput"]}
# 尝试检查 Transformers 和 PyTorch 是否可用
try:
if not (is_transformers_available() and is_torch_available()): # 如果不可用
raise OptionalDependencyNotAvailable() # 抛出异常
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
from ...utils import dummy_torch_and_transformers_objects # noqa F403 # 导入虚拟对象模块
# 更新 _dummy_objects 字典,获取虚拟对象
_dummy_objects.update(get_objects_from_module(dummy_torch_and_transformers_objects))
else:
# 如果可用,更新导入结构以包含 FluxPipeline
_import_structure["pipeline_flux"] = ["FluxPipeline"]
# 如果进行类型检查或慢加载标志为真
if TYPE_CHECKING or DIFFUSERS_SLOW_IMPORT:
try:
# 检查 Transformers 和 PyTorch 是否可用
if not (is_transformers_available() and is_torch_available()): # 如果不可用
raise OptionalDependencyNotAvailable() # 抛出异常
# 捕获可选依赖未找到的异常
except OptionalDependencyNotAvailable:
from ...utils.dummy_torch_and_transformers_objects import * # noqa F403 # 导入虚拟对象
else:
# 如果可用,从 pipeline_flux 导入 FluxPipeline
from .pipeline_flux import FluxPipeline
else:
import sys # 导入 sys 模块
# 将当前模块替换为一个延迟加载的模块
sys.modules[__name__] = _LazyModule(
__name__,
globals()["__file__"],
_import_structure,
module_spec=__spec__,
)
# 将 _dummy_objects 中的对象设置为当前模块的属性
for name, value in _dummy_objects.items():
setattr(sys.modules[__name__], name, value)
# 将 _additional_imports 中的对象设置为当前模块的属性
for name, value in _additional_imports.items():
setattr(sys.modules[__name__], name, value)
标签:None,prompt,image,导入,diffusers,源码,import,解析,self
From: https://www.cnblogs.com/apachecn/p/18492374