CogVideo & CogVideoX 微调代码源码解析(二)
.\cogvideo-finetune\inference\cli_demo.py
# 脚本说明:演示如何使用 CogVideoX 模型通过 Hugging Face `diffusers` 管道生成视频
"""
This script demonstrates how to generate a video using the CogVideoX model with the Hugging Face `diffusers` pipeline.
The script supports different types of video generation, including text-to-video (t2v), image-to-video (i2v),
and video-to-video (v2v), depending on the input data and different weight.
- text-to-video: THUDM/CogVideoX-5b or THUDM/CogVideoX-2b
- video-to-video: THUDM/CogVideoX-5b or THUDM/CogVideoX-2b
- image-to-video: THUDM/CogVideoX-5b-I2V
Running the Script:
To run the script, use the following command with appropriate arguments:
$ python cli_demo.py --prompt "A girl riding a bike." --model_path THUDM/CogVideoX-5b --generate_type "t2v"
Additional options are available to specify the model path, guidance scale, number of inference steps, video generation type, and output paths.
"""
# 导入命令行参数解析模块
import argparse
# 从 typing 模块导入 Literal 类型,用于限制参数值
from typing import Literal
# 导入 PyTorch 库
import torch
# 从 diffusers 模块导入所需的类
from diffusers import (
CogVideoXPipeline, # 导入 CogVideoX 管道用于视频生成
CogVideoXDDIMScheduler, # 导入 DDIM 调度器
CogVideoXDPMScheduler, # 导入 DPMS 调度器
CogVideoXImageToVideoPipeline, # 导入图像到视频的管道
CogVideoXVideoToVideoPipeline, # 导入视频到视频的管道
)
# 从 diffusers.utils 模块导入辅助函数
from diffusers.utils import export_to_video, load_image, load_video
# 定义生成视频的函数,接受多个参数
def generate_video(
prompt: str, # 视频描述
model_path: str, # 预训练模型的路径
lora_path: str = None, # LoRA 权重的路径(可选)
lora_rank: int = 128, # LoRA 权重的秩
output_path: str = "./output.mp4", # 生成视频的保存路径
image_or_video_path: str = "", # 输入图像或视频的路径
num_inference_steps: int = 50, # 推理过程中的步骤数
guidance_scale: float = 6.0, # 指导尺度
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量
dtype: torch.dtype = torch.bfloat16, # 计算的数据类型
generate_type: str = Literal["t2v", "i2v", "v2v"], # 生成类型限制
seed: int = 42, # 可重复性的种子
):
"""
Generates a video based on the given prompt and saves it to the specified path.
Parameters:
- prompt (str): The description of the video to be generated.
- model_path (str): The path of the pre-trained model to be used.
- lora_path (str): The path of the LoRA weights to be used.
- lora_rank (int): The rank of the LoRA weights.
- output_path (str): The path where the generated video will be saved.
- num_inference_steps (int): Number of steps for the inference process. More steps can result in better quality.
- guidance_scale (float): The scale for classifier-free guidance. Higher values can lead to better alignment with the prompt.
- num_videos_per_prompt (int): Number of videos to generate per prompt.
- dtype (torch.dtype): The data type for computation (default is torch.bfloat16).
- generate_type (str): The type of video generation (e.g., 't2v', 'i2v', 'v2v').·
- seed (int): The seed for reproducibility.
"""
# 1. 加载预训练的 CogVideoX 管道,使用指定的精度(bfloat16)。
# 添加 device_map="balanced" 到 from_pretrained 函数中,移除 enable_model_cpu_offload() 函数以使用多 GPU。
# 初始化图像变量
image = None
# 初始化视频变量
video = None
# 根据生成类型选择合适的管道
if generate_type == "i2v":
# 从预训练模型路径加载图像到视频管道,指定数据类型
pipe = CogVideoXImageToVideoPipeline.from_pretrained(model_path, torch_dtype=dtype)
# 加载输入的图像
image = load_image(image=image_or_video_path)
elif generate_type == "t2v":
# 从预训练模型路径加载文本到视频管道,指定数据类型
pipe = CogVideoXPipeline.from_pretrained(model_path, torch_dtype=dtype)
else:
# 从预训练模型路径加载视频到视频管道,指定数据类型
pipe = CogVideoXVideoToVideoPipeline.from_pretrained(model_path, torch_dtype=dtype)
# 加载输入的视频
video = load_video(image_or_video_path)
# 如果使用 lora,添加以下代码
if lora_path:
# 加载 lora 权重,指定权重文件和适配器名称
pipe.load_lora_weights(lora_path, weight_name="pytorch_lora_weights.safetensors", adapter_name="test_1")
# 融合 lora 权重,设置缩放比例
pipe.fuse_lora(lora_scale=1 / lora_rank)
# 2. 设置调度器
# 可以更改为 `CogVideoXDPMScheduler` 或 `CogVideoXDDIMScheduler`
# 推荐使用 `CogVideoXDDIMScheduler` 适用于 CogVideoX-2B
# 使用 `CogVideoXDPMScheduler` 适用于 CogVideoX-5B / CogVideoX-5B-I2V
# 使用 DDIM 调度器设置
# pipe.scheduler = CogVideoXDDIMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
# 使用 DPM 调度器设置
pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
# 3. 为模型启用 CPU 卸载
# 如果有多个 GPU 或足够的 GPU 内存(如 H100),可以关闭此选项以减少推理时间
# 并启用到 CUDA 设备
# pipe.to("cuda")
# 启用顺序 CPU 卸载
pipe.enable_sequential_cpu_offload()
# 启用 VAE 切片
pipe.vae.enable_slicing()
# 启用 VAE 平铺
pipe.vae.enable_tiling()
# 4. 根据提示生成视频帧
# `num_frames` 是要生成的帧数
# 默认值适用于 6 秒视频和 8 fps,并会加 1 帧作为第一帧,总共 49 帧
if generate_type == "i2v":
# 调用管道生成视频
video_generate = pipe(
prompt=prompt, # 用于生成视频的提示文本
image=image, # 用作视频背景的图像路径
num_videos_per_prompt=num_videos_per_prompt, # 每个提示生成的视频数量
num_inference_steps=num_inference_steps, # 推理步骤数量
num_frames=49, # 要生成的帧数,版本 `0.30.3` 及之后更改为 49
use_dynamic_cfg=True, # 此 ID 用于 DPM 调度器,DDIM 调度器应为 False
guidance_scale=guidance_scale, # 指导尺度
generator=torch.Generator().manual_seed(seed), # 设置种子以确保可重复性
).frames[0] # 获取生成的第一个视频帧
elif generate_type == "t2v":
# 调用管道生成视频
video_generate = pipe(
prompt=prompt, # 用于生成视频的提示文本
num_videos_per_prompt=num_videos_per_prompt, # 每个提示生成的视频数量
num_inference_steps=num_inference_steps, # 推理步骤数量
num_frames=49, # 要生成的帧数
use_dynamic_cfg=True, # 此 ID 用于 DPM 调度器,DDIM 调度器应为 False
guidance_scale=guidance_scale, # 指导尺度
generator=torch.Generator().manual_seed(seed), # 设置种子以确保可重复性
).frames[0] # 获取生成的第一个视频帧
else: # 如果前面的条件不满足,执行以下代码
# 调用管道函数生成视频
video_generate = pipe(
prompt=prompt, # 传入生成视频所需的提示文本
video=video, # 用作视频背景的文件路径
num_videos_per_prompt=num_videos_per_prompt, # 每个提示生成的视频数量
num_inference_steps=num_inference_steps, # 推理步骤数量
# num_frames=49, # 可选参数:生成的帧数,当前被注释
use_dynamic_cfg=True, # 启用动态配置
guidance_scale=guidance_scale, # 控制生成内容的引导程度
generator=torch.Generator().manual_seed(seed), # 设置随机数生成器的种子,以确保结果可复现
).frames[0] # 获取生成的视频帧中的第一帧
# 将生成的帧导出为视频文件,fps必须设置为8以符合原视频要求
export_to_video(video_generate, output_path, fps=8) # 调用导出函数,传入生成的视频和输出路径
# 仅在直接运行此脚本时执行以下代码块
if __name__ == "__main__":
# 创建一个参数解析器,描述程序功能
parser = argparse.ArgumentParser(description="Generate a video from a text prompt using CogVideoX")
# 添加视频描述的命令行参数,必需
parser.add_argument("--prompt", type=str, required=True, help="The description of the video to be generated")
# 添加背景图片路径的命令行参数,默认值为 None
parser.add_argument(
"--image_or_video_path",
type=str,
default=None,
help="The path of the image to be used as the background of the video",
)
# 添加预训练模型路径的命令行参数,默认值为指定模型
parser.add_argument(
"--model_path", type=str, default="THUDM/CogVideoX-5b", help="The path of the pre-trained model to be used"
)
# 添加 LoRA 权重路径的命令行参数,默认值为 None
parser.add_argument("--lora_path", type=str, default=None, help="The path of the LoRA weights to be used")
# 添加 LoRA 权重秩的命令行参数,默认值为 128
parser.add_argument("--lora_rank", type=int, default=128, help="The rank of the LoRA weights")
# 添加生成视频保存路径的命令行参数,默认值为当前目录下的 output.mp4
parser.add_argument(
"--output_path", type=str, default="./output.mp4", help="The path where the generated video will be saved"
)
# 添加无分类器引导的缩放比例的命令行参数,默认值为 6.0
parser.add_argument("--guidance_scale", type=float, default=6.0, help="The scale for classifier-free guidance")
# 添加推理过程中的步骤数的命令行参数,默认值为 50
parser.add_argument(
"--num_inference_steps", type=int, default=50, help="Number of steps for the inference process"
)
# 添加每个提示生成视频数量的命令行参数,默认值为 1
parser.add_argument("--num_videos_per_prompt", type=int, default=1, help="Number of videos to generate per prompt")
# 添加视频生成类型的命令行参数,默认值为 't2v'
parser.add_argument(
"--generate_type", type=str, default="t2v", help="The type of video generation (e.g., 't2v', 'i2v', 'v2v')"
)
# 添加计算使用的数据类型的命令行参数,默认值为 'bfloat16'
parser.add_argument(
"--dtype", type=str, default="bfloat16", help="The data type for computation (e.g., 'float16' or 'bfloat16')"
)
# 添加用于重现性的随机种子的命令行参数,默认值为 42
parser.add_argument("--seed", type=int, default=42, help="The seed for reproducibility")
# 解析命令行参数并将结果存储在 args 中
args = parser.parse_args()
# 根据指定的数据类型设置相应的 PyTorch 数据类型
dtype = torch.float16 if args.dtype == "float16" else torch.bfloat16
# 调用生成视频的函数,传递解析后的参数
generate_video(
prompt=args.prompt,
model_path=args.model_path,
lora_path=args.lora_path,
lora_rank=args.lora_rank,
output_path=args.output_path,
image_or_video_path=args.image_or_video_path,
num_inference_steps=args.num_inference_steps,
guidance_scale=args.guidance_scale,
num_videos_per_prompt=args.num_videos_per_prompt,
dtype=dtype,
generate_type=args.generate_type,
seed=args.seed,
)
.\cogvideo-finetune\inference\cli_demo_quantization.py
# 该脚本演示如何使用 CogVideoX 通过文本提示生成视频,并使用量化功能。
# 注意事项:
# 必须从源代码安装 `torchao`,`torch`,`diffusers`,`accelerate` 库以使用量化功能。
# 仅支持 H100 或更高版本的 NVIDIA GPU 进行 FP-8 量化。
# 所有量化方案必须在 NVIDIA GPU 上使用。
# 运行脚本的示例命令:
# python cli_demo_quantization.py --prompt "A girl riding a bike." --model_path THUDM/CogVideoX-2b --quantization_scheme fp8 --dtype float16
# python cli_demo_quantization.py --prompt "A girl riding a bike." --model_path THUDM/CogVideoX-5b --quantization_scheme fp8 --dtype bfloat16
import argparse # 导入命令行参数解析库
import os # 导入操作系统相关的功能库
import torch # 导入 PyTorch 库
import torch._dynamo # 导入 PyTorch 动态编译库
from diffusers import AutoencoderKLCogVideoX, CogVideoXTransformer3DModel, CogVideoXPipeline, CogVideoXDPMScheduler # 从 diffusers 库导入多个模型和调度器
from diffusers.utils import export_to_video # 导入用于导出视频的工具
from transformers import T5EncoderModel # 从 transformers 库导入 T5 编码器模型
from torchao.quantization import quantize_, int8_weight_only # 导入量化相关函数
from torchao.float8.inference import ActivationCasting, QuantConfig, quantize_to_float8 # 导入 FP8 量化相关工具
os.environ["TORCH_LOGS"] = "+dynamo,output_code,graph_breaks,recompiles" # 设置环境变量以启用 Torch 日志
torch._dynamo.config.suppress_errors = True # 配置动态编译以抑制错误
torch.set_float32_matmul_precision("high") # 设置浮点32矩阵乘法的精度为高
torch._inductor.config.conv_1x1_as_mm = True # 配置 1x1 卷积为矩阵乘法
torch._inductor.config.coordinate_descent_tuning = True # 启用坐标下降调优
torch._inductor.config.epilogue_fusion = False # 禁用后处理融合
torch._inductor.config.coordinate_descent_check_all_directions = True # 检查所有方向的坐标下降
def quantize_model(part, quantization_scheme): # 定义量化模型的函数
if quantization_scheme == "int8": # 如果量化方案为 int8
quantize_(part, int8_weight_only()) # 对模型进行 int8 量化
elif quantization_scheme == "fp8": # 如果量化方案为 fp8
quantize_to_float8(part, QuantConfig(ActivationCasting.DYNAMIC)) # 对模型进行 fp8 量化
return part # 返回量化后的模型
def generate_video( # 定义生成视频的函数
prompt: str, # 视频描述的文本提示
model_path: str, # 预训练模型的路径
output_path: str = "./output.mp4", # 生成视频的保存路径,默认为 ./output.mp4
num_inference_steps: int = 50, # 推理过程的步骤数,更多步骤可能导致更高质量
guidance_scale: float = 6.0, # 无分类器引导的尺度,较高的值可以更好地对齐提示
num_videos_per_prompt: int = 1, # 每个提示生成的视频数量
quantization_scheme: str = "fp8", # 使用的量化方案('int8','fp8')
dtype: torch.dtype = torch.bfloat16, # 计算的数据类型(默认为 torch.bfloat16)
):
"""
根据给定提示生成视频并保存到指定路径。
参数:
- prompt (str): 要生成视频的描述。
- model_path (str): 要使用的预训练模型的路径。
- output_path (str): 生成的视频保存路径。
- num_inference_steps (int): 推理过程的步骤数。更多步骤可以获得更好的质量。
- guidance_scale (float): 无分类器引导的尺度。较高的值可能导致更好地对齐提示。
- num_videos_per_prompt (int): 每个提示生成的视频数量。
- quantization_scheme (str): 要使用的量化方案('int8','fp8')。
- dtype (torch.dtype): 计算的数据类型(默认为 torch.bfloat16)。
"""
text_encoder = T5EncoderModel.from_pretrained(model_path, subfolder="text_encoder", torch_dtype=dtype) # 从预训练模型加载文本编码器
# 对文本编码器进行量化处理,以减少模型的内存占用和加速推理
text_encoder = quantize_model(part=text_encoder, quantization_scheme=quantization_scheme)
# 从预训练模型加载 3D Transformer 模型,指定模型路径和数据类型
transformer = CogVideoXTransformer3DModel.from_pretrained(model_path, subfolder="transformer", torch_dtype=dtype)
# 对 Transformer 模型进行量化处理
transformer = quantize_model(part=transformer, quantization_scheme=quantization_scheme)
# 从预训练模型加载 VAE(变分自编码器),指定模型路径和数据类型
vae = AutoencoderKLCogVideoX.from_pretrained(model_path, subfolder="vae", torch_dtype=dtype)
# 对 VAE 进行量化处理
vae = quantize_model(part=vae, quantization_scheme=quantization_scheme)
# 创建视频生成管道,传入预训练的组件和数据类型
pipe = CogVideoXPipeline.from_pretrained(
model_path,
text_encoder=text_encoder,
transformer=transformer,
vae=vae,
torch_dtype=dtype,
)
# 使用调度器配置初始化调度器,设置时间步间距为 "trailing"
pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
# 使用 compile 会加快推理速度。第一次推理约需 30 分钟进行编译。
# pipe.transformer.to(memory_format=torch.channels_last)
# 对于 FP8 应该移除 CPU 卸载功能
pipe.enable_model_cpu_offload()
# 这对于 FP8 和 INT8 不是必要的,应移除此行
# pipe.enable_sequential_cpu_offload()
# 启用 VAE 的切片功能,以便进行高效处理
pipe.vae.enable_slicing()
# 启用 VAE 的平铺功能,以支持生成视频的高效处理
pipe.vae.enable_tiling()
# 调用管道生成视频,指定提示、每个提示的视频数量、推理步骤、帧数等参数
video = pipe(
prompt=prompt,
num_videos_per_prompt=num_videos_per_prompt,
num_inference_steps=num_inference_steps,
num_frames=49,
use_dynamic_cfg=True,
guidance_scale=guidance_scale,
generator=torch.Generator(device="cuda").manual_seed(42),
).frames[0]
# 将生成的视频导出到指定路径,设置每秒帧数为 8
export_to_video(video, output_path, fps=8)
# 当脚本被直接执行时,以下代码将会被运行
if __name__ == "__main__":
# 创建一个参数解析器,提供脚本的功能描述
parser = argparse.ArgumentParser(description="Generate a video from a text prompt using CogVideoX")
# 添加一个必需的参数,用于输入视频描述
parser.add_argument("--prompt", type=str, required=True, help="The description of the video to be generated")
# 添加一个可选参数,指定预训练模型的路径,默认为指定的路径
parser.add_argument(
"--model_path", type=str, default="THUDM/CogVideoX-5b", help="The path of the pre-trained model to be used"
)
# 添加一个可选参数,指定生成视频的输出路径,默认为当前目录下的 output.mp4
parser.add_argument(
"--output_path", type=str, default="./output.mp4", help="The path where the generated video will be saved"
)
# 添加一个可选参数,指定推理过程中的步骤数量,默认为 50
parser.add_argument(
"--num_inference_steps", type=int, default=50, help="Number of steps for the inference process"
)
# 添加一个可选参数,指定无分类器引导的比例,默认为 6.0
parser.add_argument("--guidance_scale", type=float, default=6.0, help="The scale for classifier-free guidance")
# 添加一个可选参数,指定每个提示生成的视频数量,默认为 1
parser.add_argument("--num_videos_per_prompt", type=int, default=1, help="Number of videos to generate per prompt")
# 添加一个可选参数,指定计算的数据类型,默认为 bfloat16
parser.add_argument(
"--dtype", type=str, default="bfloat16", help="The data type for computation (e.g., 'float16', 'bfloat16')"
)
# 添加一个可选参数,指定量化方案,默认为 bf16,选择范围为 int8 和 fp8
parser.add_argument(
"--quantization_scheme",
type=str,
default="bf16",
choices=["int8", "fp8"],
help="The quantization scheme to use (int8, fp8)",
)
# 解析命令行参数并将结果存储在 args 中
args = parser.parse_args()
# 根据输入的数据类型设置 PyTorch 数据类型,默认为 bfloat16
dtype = torch.float16 if args.dtype == "float16" else torch.bfloat16
# 调用生成视频的函数,传入所有解析后的参数
generate_video(
prompt=args.prompt,
model_path=args.model_path,
output_path=args.output_path,
num_inference_steps=args.num_inference_steps,
guidance_scale=args.guidance_scale,
num_videos_per_prompt=args.num_videos_per_prompt,
quantization_scheme=args.quantization_scheme,
dtype=dtype,
)
.\cogvideo-finetune\inference\cli_vae_demo.py
"""
此脚本旨在演示如何使用 CogVideoX-2b VAE 模型进行视频编码和解码。
它允许将视频编码为潜在表示,解码回视频,或顺序执行这两项操作。
在运行脚本之前,请确保克隆了 CogVideoX Hugging Face 模型仓库,并将
`{your local diffusers path}` 参数设置为克隆仓库的路径。
命令 1:编码视频
使用 CogVideoX-5b VAE 模型编码位于 ../resources/videos/1.mp4 的视频。
内存使用量:编码时大约需要 ~18GB 的 GPU 内存。
如果您没有足够的 GPU 内存,我们在资源文件夹中提供了一个预编码的张量文件(encoded.pt),
您仍然可以运行解码命令。
$ python cli_vae_demo.py --model_path {your local diffusers path}/CogVideoX-2b/vae/ --video_path ../resources/videos/1.mp4 --mode encode
命令 2:解码视频
将存储在 encoded.pt 中的潜在表示解码回视频。
内存使用量:解码时大约需要 ~4GB 的 GPU 内存。
$ python cli_vae_demo.py --model_path {your local diffusers path}/CogVideoX-2b/vae/ --encoded_path ./encoded.pt --mode decode
命令 3:编码和解码视频
编码位于 ../resources/videos/1.mp4 的视频,然后立即解码。
内存使用量:编码 需要 34GB + 解码需要 19GB(顺序执行)。
$ python cli_vae_demo.py --model_path {your local diffusers path}/CogVideoX-2b/vae/ --video_path ../resources/videos/1.mp4 --mode both
"""
# 导入 argparse 模块用于处理命令行参数
import argparse
# 导入 torch 库用于深度学习相关的操作
import torch
# 导入 imageio 库用于读取视频文件
import imageio
# 从 diffusers 库导入 AutoencoderKLCogVideoX 模型
from diffusers import AutoencoderKLCogVideoX
# 从 torchvision 库导入 transforms,用于数据转换
from torchvision import transforms
# 导入 numpy 库用于数值计算
import numpy as np
def encode_video(model_path, video_path, dtype, device):
"""
加载预训练的 AutoencoderKLCogVideoX 模型并编码视频帧。
参数:
- model_path (str): 预训练模型的路径。
- video_path (str): 视频文件的路径。
- dtype (torch.dtype): 计算所用的数据类型。
- device (str): 计算所用的设备(例如,"cuda" 或 "cpu")。
返回:
- torch.Tensor: 编码后的视频帧。
"""
# 从指定路径加载预训练的模型,并将其移动到指定设备
model = AutoencoderKLCogVideoX.from_pretrained(model_path, torch_dtype=dtype).to(device)
# 启用切片功能以优化内存使用
model.enable_slicing()
# 启用平铺功能以处理大图像
model.enable_tiling()
# 使用 ffmpeg 读取视频文件
video_reader = imageio.get_reader(video_path, "ffmpeg")
# 将视频的每一帧转换为张量并存储在列表中
frames = [transforms.ToTensor()(frame) for frame in video_reader]
# 关闭视频读取器
video_reader.close()
# 将帧列表转换为张量,调整维度,并将其移动到指定设备和数据类型
frames_tensor = torch.stack(frames).to(device).permute(1, 0, 2, 3).unsqueeze(0).to(dtype)
# 在不计算梯度的上下文中进行编码
with torch.no_grad():
# 使用模型编码帧,并从中获取样本
encoded_frames = model.encode(frames_tensor)[0].sample()
# 返回编码后的帧
return encoded_frames
def decode_video(model_path, encoded_tensor_path, dtype, device):
"""
加载预训练的 AutoencoderKLCogVideoX 模型并解码编码的视频帧。
参数:
- model_path (str): 预训练模型的路径。
- encoded_tensor_path (str): 编码张量文件的路径。
# dtype 参数指定计算时使用的数据类型
- dtype (torch.dtype): The data type for computation.
# device 参数指定用于计算的设备(例如,“cuda”或“cpu”)
- device (str): The device to use for computation (e.g., "cuda" or "cpu").
# 返回解码后的视频帧
Returns:
- torch.Tensor: The decoded video frames.
"""
# 从预训练模型加载 AutoencoderKLCogVideoX,并将其转移到指定设备上,设置数据类型
model = AutoencoderKLCogVideoX.from_pretrained(model_path, torch_dtype=dtype).to(device)
# 从指定路径加载编码的张量,并将其转移到设备和指定的数据类型上
encoded_frames = torch.load(encoded_tensor_path, weights_only=True).to(device).to(dtype)
# 在不计算梯度的上下文中解码编码的帧
with torch.no_grad():
# 调用模型解码编码的帧,并获取解码后的样本
decoded_frames = model.decode(encoded_frames).sample
# 返回解码后的帧
return decoded_frames
# 定义一个函数,用于保存视频帧到视频文件
def save_video(tensor, output_path):
"""
保存视频帧到视频文件。
参数:
- tensor (torch.Tensor): 视频帧的张量。
- output_path (str): 输出视频的保存路径。
"""
# 将张量转换为浮点32位类型
tensor = tensor.to(dtype=torch.float32)
# 将张量的第一个维度去掉,重新排列维度,并转移到 CPU,再转换为 NumPy 数组
frames = tensor[0].squeeze(0).permute(1, 2, 3, 0).cpu().numpy()
# 将帧的值裁剪到 0 到 1 之间,并乘以 255 以转换为像素值
frames = np.clip(frames, 0, 1) * 255
# 将帧的数据类型转换为无符号 8 位整数
frames = frames.astype(np.uint8)
# 创建一个视频写入对象,设置输出路径和帧率为 8
writer = imageio.get_writer(output_path + "/output.mp4", fps=8)
# 遍历每一帧,将其添加到视频写入对象中
for frame in frames:
writer.append_data(frame)
# 关闭视频写入对象,完成写入
writer.close()
# 如果当前脚本是主程序,则执行以下代码
if __name__ == "__main__":
# 创建一个参数解析器,用于处理命令行参数
parser = argparse.ArgumentParser(description="CogVideoX encode/decode demo")
# 添加一个参数,用于指定模型的路径
parser.add_argument("--model_path", type=str, required=True, help="The path to the CogVideoX model")
# 添加一个参数,用于指定视频文件的路径(用于编码)
parser.add_argument("--video_path", type=str, help="The path to the video file (for encoding)")
# 添加一个参数,用于指定编码的张量文件的路径(用于解码)
parser.add_argument("--encoded_path", type=str, help="The path to the encoded tensor file (for decoding)")
# 添加一个参数,用于指定输出文件的保存路径,默认为当前目录
parser.add_argument("--output_path", type=str, default=".", help="The path to save the output file")
# 添加一个参数,指定模式:编码、解码或两者
parser.add_argument(
"--mode", type=str, choices=["encode", "decode", "both"], required=True, help="Mode: encode, decode, or both"
)
# 添加一个参数,指定计算的数据类型,默认为 'bfloat16'
parser.add_argument(
"--dtype", type=str, default="bfloat16", help="The data type for computation (e.g., 'float16' or 'bfloat16')"
)
# 添加一个参数,指定用于计算的设备,默认为 'cuda'
parser.add_argument(
"--device", type=str, default="cuda", help="The device to use for computation (e.g., 'cuda' or 'cpu')"
)
# 解析命令行参数
args = parser.parse_args()
# 根据指定的设备创建一个设备对象
device = torch.device(args.device)
# 根据指定的数据类型设置数据类型,默认为 bfloat16
dtype = torch.float16 if args.dtype == "float16" else torch.bfloat16
# 根据模式选择编码、解码或两者的操作
if args.mode == "encode":
# 确保提供了视频路径用于编码
assert args.video_path, "Video path must be provided for encoding."
# 调用编码函数,将视频编码为张量
encoded_output = encode_video(args.model_path, args.video_path, dtype, device)
# 将编码后的张量保存到指定路径
torch.save(encoded_output, args.output_path + "/encoded.pt")
# 打印完成编码的消息
print(f"Finished encoding the video to a tensor, save it to a file at {encoded_output}/encoded.pt")
elif args.mode == "decode":
# 确保提供了编码张量的路径用于解码
assert args.encoded_path, "Encoded tensor path must be provided for decoding."
# 调用解码函数,将编码的张量解码为视频帧
decoded_output = decode_video(args.model_path, args.encoded_path, dtype, device)
# 调用保存视频的函数,将解码后的输出保存为视频文件
save_video(decoded_output, args.output_path)
# 打印完成解码的消息
print(f"Finished decoding the video and saved it to a file at {args.output_path}/output.mp4")
elif args.mode == "both":
# 确保提供了视频路径用于编码
assert args.video_path, "Video path must be provided for encoding."
# 调用编码函数,将视频编码为张量
encoded_output = encode_video(args.model_path, args.video_path, dtype, device)
# 将编码后的张量保存到指定路径
torch.save(encoded_output, args.output_path + "/encoded.pt")
# 调用解码函数,将保存的张量解码为视频帧
decoded_output = decode_video(args.model_path, args.output_path + "/encoded.pt", dtype, device)
# 调用保存视频的函数,将解码后的输出保存为视频文件
save_video(decoded_output, args.output_path)
.\cogvideo-finetune\inference\convert_demo.py
"""
该CogVideoX模型旨在根据详细且高度描述性的提示生成高质量的视频。
当提供精细的、细致的提示时,模型表现最佳,这能提高视频生成的质量。
该脚本旨在帮助将简单的用户输入转换为适合CogVideoX的详细提示。
它可以处理文本到视频(t2v)和图像到视频(i2v)的转换。
- 对于文本到视频,只需提供提示。
- 对于图像到视频,提供图像文件的路径和可选的用户输入。
图像将被编码并作为请求的一部分发送给Azure OpenAI。
### 如何运行:
运行脚本进行**文本到视频**:
$ python convert_demo.py --prompt "一个女孩骑自行车。" --type "t2v"
运行脚本进行**图像到视频**:
$ python convert_demo.py --prompt "猫在跑" --type "i2v" --image_path "/path/to/your/image.jpg"
"""
# 导入argparse库以处理命令行参数
import argparse
# 从openai库导入OpenAI和AzureOpenAI类
from openai import OpenAI, AzureOpenAI
# 导入base64库以进行数据编码
import base64
# 从mimetypes库导入guess_type函数以推测文件类型
from mimetypes import guess_type
# 定义文本到视频的系统提示
sys_prompt_t2v = """您是一个创建视频的机器人团队的一部分。您与一个助手机器人合作,助手会绘制您所说的方括号中的任何内容。
例如,输出“一个阳光穿过树木的美丽清晨”将触发您的伙伴机器人输出如描述的森林早晨的视频。您将被希望创建详细、精彩视频的人所提示。完成此任务的方法是将他们的简短提示转化为极其详细和描述性的内容。
需要遵循一些规则:
您每次用户请求只能输出一个视频描述。
当请求修改时,您不应简单地将描述变得更长。您应重构整个描述,以整合建议。
有时用户不想要修改,而是希望得到一个新图像。在这种情况下,您应忽略与用户的先前对话。
视频描述必须与以下示例的单词数量相同。多余的单词将被忽略。
"""
# 定义图像到视频的系统提示
sys_prompt_i2v = """
**目标**:**根据输入图像和用户输入给出高度描述性的视频说明。** 作为专家,深入分析图像,运用丰富的创造力和细致的思考。在描述图像的细节时,包含适当的动态信息,以确保视频说明包含合理的动作和情节。如果用户输入不为空,则说明应根据用户的输入进行扩展。
**注意**:输入图像是视频的第一帧,输出视频说明应描述从当前图像开始的运动。用户输入是可选的,可以为空。
**注意**:不要包含相机转场!!!不要包含画面切换!!!不要包含视角转换!!!
**回答风格**:
# 定义将图像文件转换为 URL 的函数
def image_to_url(image_path):
# 根据图像路径猜测其 MIME 类型,第二个返回值忽略
mime_type, _ = guess_type(image_path)
# 如果无法猜测 MIME 类型,则设置为通用二进制流类型
if mime_type is None:
mime_type = "application/octet-stream"
# 以二进制模式打开图像文件
with open(image_path, "rb") as image_file:
# 读取图像文件内容并进行 Base64 编码,解码为 UTF-8 格式
base64_encoded_data = base64.b64encode(image_file.read()).decode("utf-8")
# 返回格式化的 Base64 数据 URL 字符串
return f"data:{mime_type};base64,{base64_encoded_data}"
# 定义将提示转换为可用于模型推理的格式的函数
def convert_prompt(prompt: str, retry_times: int = 3, type: str = "t2v", image_path: str = None):
"""
将提示转换为可用于模型推理的格式
"""
# 创建 OpenAI 客户端实例
client = OpenAI()
## 如果使用 Azure OpenAI,请取消注释下面一行并注释上面一行
# client = AzureOpenAI(
# api_key="",
# api_version="",
# azure_endpoint=""
# )
# 去除提示字符串两端的空白
text = prompt.strip()
# 返回未处理的提示
return prompt
# 如果当前脚本是主程序
if __name__ == "__main__":
# 创建命令行参数解析器
parser = argparse.ArgumentParser()
# 添加提示参数,类型为字符串,必需
parser.add_argument("--prompt", type=str, required=True, help="Prompt to convert")
# 添加重试次数参数,类型为整数,默认值为 3
parser.add_argument("--retry_times", type=int, default=3, help="Number of times to retry the conversion")
# 添加转换类型参数,类型为字符串,默认值为 "t2v"
parser.add_argument("--type", type=str, default="t2v", help="Type of conversion (t2v or i2v)")
# 添加图像路径参数,类型为字符串,默认值为 None
parser.add_argument("--image_path", type=str, default=None, help="Path to the image file")
# 解析命令行参数并存储在 args 中
args = parser.parse_args()
# 调用 convert_prompt 函数进行提示转换
converted_prompt = convert_prompt(args.prompt, args.retry_times, args.type, args.image_path)
# 打印转换后的提示
print(converted_prompt)
.\cogvideo-finetune\inference\gradio_composite_demo\app.py
# 这是 Gradio 网页演示的主文件,使用 CogVideoX-5B 模型生成视频并进行增强
"""
THis is the main file for the gradio web demo. It uses the CogVideoX-5B model to generate videos gradio web demo.
set environment variable OPENAI_API_KEY to use the OpenAI API to enhance the prompt.
Usage:
OpenAI_API_KEY=your_openai_api_key OPENAI_BASE_URL=https://api.openai.com/v1 python inference/gradio_web_demo.py
"""
# 导入数学库
import math
# 导入操作系统相关的库
import os
# 导入随机数生成库
import random
# 导入多线程库
import threading
# 导入时间库
import time
# 导入计算机视觉库
import cv2
# 导入临时文件处理库
import tempfile
# 导入视频处理库
import imageio_ffmpeg
# 导入 Gradio 库用于构建界面
import gradio as gr
# 导入 PyTorch 库
import torch
# 导入图像处理库
from PIL import Image
# 导入 Diffusers 库中的模型和调度器
from diffusers import (
CogVideoXPipeline,
CogVideoXDPMScheduler,
CogVideoXVideoToVideoPipeline,
CogVideoXImageToVideoPipeline,
CogVideoXTransformer3DModel,
)
# 导入视频和图像加载工具
from diffusers.utils import load_video, load_image
# 导入日期和时间处理库
from datetime import datetime, timedelta
# 导入图像处理器
from diffusers.image_processor import VaeImageProcessor
# 导入 OpenAI 库
from openai import OpenAI
# 导入视频编辑库
import moviepy.editor as mp
# 导入实用工具模块
import utils
# 导入 RIFE 模型的加载和推断工具
from rife_model import load_rife_model, rife_inference_with_latents
# 导入 Hugging Face Hub 的下载工具
from huggingface_hub import hf_hub_download, snapshot_download
# 检查是否可以使用 GPU,并设置设备类型
device = "cuda" if torch.cuda.is_available() else "cpu"
# 定义模型的名称
MODEL = "THUDM/CogVideoX-5b"
# 从 Hugging Face Hub 下载 Real-ESRGAN 模型权重
hf_hub_download(repo_id="ai-forever/Real-ESRGAN", filename="RealESRGAN_x4.pth", local_dir="model_real_esran")
# 从 Hugging Face Hub 下载 RIFE 模型快照
snapshot_download(repo_id="AlexWortega/RIFE", local_dir="model_rife")
# 从预训练模型中加载视频生成管道,并将其转移到指定设备
pipe = CogVideoXPipeline.from_pretrained(MODEL, torch_dtype=torch.bfloat16).to(device)
# 设置调度器,使用配置文件中的参数
pipe.scheduler = CogVideoXDPMScheduler.from_config(pipe.scheduler.config, timestep_spacing="trailing")
# 创建视频到视频的生成管道
pipe_video = CogVideoXVideoToVideoPipeline.from_pretrained(
MODEL,
transformer=pipe.transformer,
vae=pipe.vae,
scheduler=pipe.scheduler,
tokenizer=pipe.tokenizer,
text_encoder=pipe.text_encoder,
torch_dtype=torch.bfloat16,
).to(device)
# 创建图像到视频的生成管道
pipe_image = CogVideoXImageToVideoPipeline.from_pretrained(
MODEL,
transformer=CogVideoXTransformer3DModel.from_pretrained(
MODEL, subfolder="transformer", torch_dtype=torch.bfloat16
),
vae=pipe.vae,
scheduler=pipe.scheduler,
tokenizer=pipe.tokenizer,
text_encoder=pipe.text_encoder,
torch_dtype=torch.bfloat16,
).to(device)
# 下面的行被注释掉,用于内存优化
# pipe.transformer.to(memory_format=torch.channels_last)
# pipe.transformer = torch.compile(pipe.transformer, mode="max-autotune", fullgraph=True)
# pipe_image.transformer.to(memory_format=torch.channels_last)
# pipe_image.transformer = torch.compile(pipe_image.transformer, mode="max-autotune", fullgraph=True)
# 创建输出目录,如果不存在则创建
os.makedirs("./output", exist_ok=True)
# 创建临时文件夹用于 Gradio
os.makedirs("./gradio_tmp", exist_ok=True)
# 加载超分辨率模型
upscale_model = utils.load_sd_upscale("model_real_esran/RealESRGAN_x4.pth", device)
# 加载帧插值模型
frame_interpolation_model = load_rife_model("model_rife")
# 系统提示,用于指导生成视频的助手
sys_prompt = """You are part of a team of bots that creates videos. You work with an assistant bot that will draw anything you say in square brackets.
# 检查输入视频的尺寸是否适合要求,若不适合则进行处理
def resize_if_unfit(input_video, progress=gr.Progress(track_tqdm=True)):
# 获取输入视频的宽度和高度
width, height = get_video_dimensions(input_video)
# 如果视频尺寸为720x480,直接使用原视频
if width == 720 and height == 480:
processed_video = input_video
# 否则进行中心裁剪和调整大小
else:
processed_video = center_crop_resize(input_video)
# 返回处理后的视频
return processed_video
# 获取输入视频的尺寸信息
def get_video_dimensions(input_video_path):
# 读取视频帧
reader = imageio_ffmpeg.read_frames(input_video_path)
# 获取视频元数据
metadata = next(reader)
# 返回视频尺寸
return metadata["size"]
# 对视频进行中心裁剪和调整大小
def center_crop_resize(input_video_path, target_width=720, target_height=480):
# 打开输入视频
cap = cv2.VideoCapture(input_video_path)
# 获取原视频的宽度、高度和帧率
orig_width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
orig_height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
orig_fps = cap.get(cv2.CAP_PROP_FPS)
# 获取视频的总帧数
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
# 计算宽度和高度的缩放因子
width_factor = target_width / orig_width
height_factor = target_height / orig_height
# 选择较大的缩放因子进行调整
resize_factor = max(width_factor, height_factor)
# 计算中间宽度和高度
inter_width = int(orig_width * resize_factor)
inter_height = int(orig_height * resize_factor)
# 设置目标帧率
target_fps = 8
# 计算理想跳过的帧数
ideal_skip = max(0, math.ceil(orig_fps / target_fps) - 1)
# 限制跳过的帧数最大为5
skip = min(5, ideal_skip) # Cap at 5
# 调整跳过的帧数,以确保足够的帧数
while (total_frames / (skip + 1)) < 49 and skip > 0:
skip -= 1
processed_frames = [] # 存储处理后的帧
frame_count = 0 # 记录已处理帧数
total_read = 0 # 记录已读取帧数
# 读取帧并进行处理,直到处理49帧或读取完成
while frame_count < 49 and total_read < total_frames:
ret, frame = cap.read() # 读取一帧
if not ret: # 如果未成功读取,退出循环
break
# 只处理指定间隔的帧
if total_read % (skip + 1) == 0:
# 调整帧的大小
resized = cv2.resize(frame, (inter_width, inter_height), interpolation=cv2.INTER_AREA)
# 计算裁剪区域的起始位置
start_x = (inter_width - target_width) // 2
start_y = (inter_height - target_height) // 2
# 裁剪帧
cropped = resized[start_y : start_y + target_height, start_x : start_x + target_width]
processed_frames.append(cropped) # 将裁剪后的帧添加到列表
frame_count += 1 # 更新处理帧数
total_read += 1 # 更新已读取帧数
cap.release() # 释放视频捕获对象
# 使用临时文件创建一个后缀为 .mp4 的文件,不会在关闭时删除
with tempfile.NamedTemporaryFile(suffix=".mp4", delete=False) as temp_file:
# 获取临时视频文件的路径
temp_video_path = temp_file.name
# 指定视频编码格式为 mp4v
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
# 初始化视频写入对象,设置输出路径、编码格式、帧率和帧大小
out = cv2.VideoWriter(temp_video_path, fourcc, target_fps, (target_width, target_height))
# 遍历处理过的帧
for frame in processed_frames:
# 将每一帧写入视频文件
out.write(frame)
# 释放视频写入对象,完成文件写入
out.release()
# 返回临时视频文件的路径
return temp_video_path
# 定义一个转换提示的函数,接受提示字符串和重试次数(默认为3)
def convert_prompt(prompt: str, retry_times: int = 3) -> str:
# 检查环境变量中是否存在 OPENAI_API_KEY
if not os.environ.get("OPENAI_API_KEY"):
# 如果没有 API 密钥,返回原始提示
return prompt
# 创建 OpenAI 客户端
client = OpenAI()
# 去掉提示字符串两端的空白字符
text = prompt.strip()
# 返回处理后的提示字符串
return prompt
# 定义一个推断函数,接受多种参数
def infer(
prompt: str,
image_input: str,
video_input: str,
video_strenght: float,
num_inference_steps: int,
guidance_scale: float,
seed: int = -1,
progress=gr.Progress(track_tqdm=True),
):
# 如果种子为 -1,随机生成一个种子
if seed == -1:
seed = random.randint(0, 2**8 - 1)
# 如果有视频输入
if video_input is not None:
# 加载视频并限制为49帧
video = load_video(video_input)[:49] # Limit to 49 frames
# 通过管道处理视频
video_pt = pipe_video(
video=video,
prompt=prompt,
num_inference_steps=num_inference_steps,
num_videos_per_prompt=1,
strength=video_strenght,
use_dynamic_cfg=True,
output_type="pt",
guidance_scale=guidance_scale,
generator=torch.Generator(device="cpu").manual_seed(seed),
).frames
# 如果有图像输入
elif image_input is not None:
# 将输入图像转换为PIL格式并调整大小
image_input = Image.fromarray(image_input).resize(size=(720, 480)) # Convert to PIL
# 加载图像
image = load_image(image_input)
# 通过管道处理图像
video_pt = pipe_image(
image=image,
prompt=prompt,
num_inference_steps=num_inference_steps,
num_videos_per_prompt=1,
use_dynamic_cfg=True,
output_type="pt",
guidance_scale=guidance_scale,
generator=torch.Generator(device="cpu").manual_seed(seed),
).frames
# 如果没有图像或视频输入
else:
# 通过管道直接处理提示生成视频
video_pt = pipe(
prompt=prompt,
num_videos_per_prompt=1,
num_inference_steps=num_inference_steps,
num_frames=49,
use_dynamic_cfg=True,
output_type="pt",
guidance_scale=guidance_scale,
generator=torch.Generator(device="cpu").manual_seed(seed),
).frames
# 返回生成的视频和种子
return (video_pt, seed)
# 定义一个将视频转换为GIF的函数
def convert_to_gif(video_path):
# 加载视频文件
clip = mp.VideoFileClip(video_path)
# 设置视频帧率为8
clip = clip.set_fps(8)
# 调整视频高度为240
clip = clip.resize(height=240)
# 创建GIF文件的路径
gif_path = video_path.replace(".mp4", ".gif")
# 将视频写入GIF文件
clip.write_gif(gif_path, fps=8)
# 返回生成的GIF文件路径
return gif_path
# 定义一个删除旧文件的函数
def delete_old_files():
# 无限循环以持续检查旧文件
while True:
# 获取当前时间
now = datetime.now()
# 计算10分钟前的时间
cutoff = now - timedelta(minutes=10)
# 定义要检查的目录
directories = ["./output", "./gradio_tmp"]
# 遍历每个目录
for directory in directories:
# 遍历目录中的每个文件
for filename in os.listdir(directory):
# 生成文件的完整路径
file_path = os.path.join(directory, filename)
# 检查是否为文件
if os.path.isfile(file_path):
# 获取文件的最后修改时间
file_mtime = datetime.fromtimestamp(os.path.getmtime(file_path))
# 如果文件的修改时间早于截止时间,则删除文件
if file_mtime < cutoff:
os.remove(file_path)
# 每600秒(10分钟)休眠一次
time.sleep(600)
# 启动一个线程来执行删除旧文件的函数,设置为守护线程
threading.Thread(target=delete_old_files, daemon=True).start()
# 定义示例视频列表
examples_videos = [["example_videos/horse.mp4"], ["example_videos/kitten.mp4"], ["example_videos/train_running.mp4"]]
# 创建一个包含示例图片路径的列表,每个子列表包含一个图片路径
examples_images = [["example_images/beach.png"], ["example_images/street.png"], ["example_images/camping.png"]]
# 使用 Gradio 库创建一个块结构的界面
with gr.Blocks() as demo:
# 添加一个 Markdown 组件,用于显示标题和链接
gr.Markdown("""
<div style="text-align: center; font-size: 32px; font-weight: bold; margin-bottom: 20px;">
# 在页面中居中显示的标题,字体大小为 32px,粗体,并有底部间距
CogVideoX-5B Huggingface Space
标签:视频,CogVideoX,self,torch,flow,---,源码,video,path
From: https://www.cnblogs.com/apachecn/p/18494403