6.4.5 视频数据的预处理和增强
文件video_transforms.py主要实现了视频数据的预处理和增强功能,包括对视频帧的随机裁剪、中心裁剪、尺度调整、归一化、随机水平翻转等操作。通过自定义的多种预处理类,例如 RandomCropVideo、CenterCropVideo 和 NormalizeVideo,可以对输入的视频数据进行灵活的空间裁剪、调整和归一化。同时,代码还提供了时间维度上的随机采样功能,通过 TemporalRandomCrop 实现对视频帧的时序抽样,适用于深度学习模型的视频输入预处理。
(1)函数 _is_tensor_video_clip 的功能是验证输入是否为 4D 的 PyTorch 张量视频剪辑。如果输入不是 PyTorch 张量或不是 4D 张量,会抛出相应的错误提示。
def _is_tensor_video_clip(clip):
if not torch.is_tensor(clip):
raise TypeError("clip should be Tensor. Got %s" % type(clip))
if not clip.ndimension() == 4:
raise ValueError("clip should be 4D. Got %dD" % clip.dim())
return True
(2)函数 center_crop_arr 的功能是对输入的 PIL 图像进行中心裁剪,将其调整为指定的目标大小 image_size。该函数先通过缩放保证图像最小边与目标尺寸一致,然后居中裁剪图像以生成目标大小的结果图像。
def center_crop_arr(pil_image, image_size):
"""
Center cropping implementation from ADM.
来源链接:
https://github.com/openai/guided-diffusion/blob/8fb3ad9197f16bbc40620447b2742e13458d2831/guided_diffusion/image_datasets.py#L126
函数功能:
- 对输入的 PIL 图像进行中心裁剪,调整为指定的目标大小 image_size。
"""
# 如果图像的最小边比目标大小的两倍还大,逐步缩小图像
while min(*pil_image.size) >= 2 * image_size:
pil_image = pil_image.resize(
tuple(x // 2 for x in pil_image.size), resample=Image.BOX
)
# 计算比例使最小边缩放至目标大小
scale = image_size / min(*pil_image.size)
pil_image = pil_image.resize(
tuple(round(x * scale) for x in pil_image.size), resample=Image.BICUBIC
)
# 将图像转换为数组
arr = np.array(pil_image)
# 计算中心裁剪的起始位置
crop_y = (arr.shape[0] - image_size) // 2
crop_x = (arr.shape[1] - image_size) // 2
# 裁剪并转换回 PIL 图像
return Image.fromarray(arr[crop_y: crop_y + image_size, crop_x: crop_x + image_size])
(3)函数 crop 的功能是对输入的视频片段(4D Tensor)进行裁剪,裁剪出从位置 (i, j) 开始、高度为 h、宽度为 w 的子区域。
def crop(clip, i, j, h, w):
"""
对输入的视频片段进行裁剪。
Args:
clip (torch.tensor): 需要裁剪的视频片段,大小为 (T, C, H, W),
其中 T 是时间帧数,C 是通道数,H 是高度,W 是宽度。
i (int): 裁剪区域的起始高度坐标。
j (int): 裁剪区域的起始宽度坐标。
h (int): 裁剪区域的高度。
w (int): 裁剪区域的宽度。
Returns:
torch.tensor: 裁剪后的子区域,大小为 (T, C, h, w)。
Raises:
ValueError: 如果输入 `clip` 不是 4D 张量,则抛出异常。
"""
if len(clip.size()) != 4:
raise ValueError("clip should be a 4D tensor")
return clip[..., i : i + h, j : j + w]
(4)函数 resize 的功能是将输入的视频片段(4D Tensor)按照指定的目标大小进行缩放,支持指定插值模式。
def resize(clip, target_size, interpolation_mode):
if len(target_size) != 2:
raise ValueError(f"target size should be tuple (height, width), instead got {target_size}")
return torch.nn.functional.interpolate(clip, size=target_size, mode=interpolation_mode, align_corners=False)
(5)函数 resize_scale 的功能是根据输入的视频片段的最小维度与目标大小的最小维度之间的比例,来缩放视频片段。
def resize_scale(clip, target_size, interpolation_mode):
if len(target_size) != 2:
raise ValueError(f"target size should be tuple (height, width), instead got {target_size}")
H, W = clip.size(-2), clip.size(-1)
scale_ = target_size[0] / min(H, W)
return torch.nn.functional.interpolate(clip, scale_factor=scale_, mode=interpolation_mode, align_corners=False)
(6)函数 resized_crop 的功能是对视频片段进行空间裁剪并调整大小,首先通过给定的坐标和裁剪尺寸对视频进行裁剪,然后再根据目标尺寸对裁剪后的视频进行缩放。
def resized_crop(clip, i, j, h, w, size, interpolation_mode="bilinear"):
"""
对视频片段进行空间裁剪和调整大小。
Args:
clip (torch.tensor): 需要裁剪的视频片段,大小为 (T, C, H, W),
其中 T 是时间帧数,C 是通道数,H 是高度,W 是宽度。
i (int): 裁剪区域左上角的 i 坐标。
j (int): 裁剪区域左上角的 j 坐标。
h (int): 裁剪区域的高度。
w (int): 裁剪区域的宽度。
size (tuple(int, int)): 目标大小,应为一个包含两个整数的元组 (height, width)。
interpolation_mode (str): 插值模式,默认为 "bilinear",可以是 "nearest", "bilinear", "bicubic" 等。
Returns:
torch.tensor: 裁剪并调整大小后的片段,大小为 (T, C, H, W)。
Raises:
ValueError: 如果 `clip` 不是 4D 的 torch.tensor,抛出异常。
"""
if not _is_tensor_video_clip(clip):
raise ValueError("clip should be a 4D torch.tensor")
# 先进行裁剪
clip = crop(clip, i, j, h, w)
# 然后调整大小
clip = resize(clip, size, interpolation_mode)
return clip
(7)函数 center_crop 的功能是对视频片段进行中心裁剪,裁剪区域的尺寸为 crop_size,并确保裁剪区域在视频片段的中间。函数 center_crop 会从视频片段中提取一个中心区域,裁剪区域的尺寸由 crop_size 决定。如果视频的高度或宽度小于裁剪尺寸,会抛出异常。
def center_crop(clip, crop_size):
"""
对视频片段进行中心裁剪。
Args:
clip (torch.tensor): 需要裁剪的视频片段,大小为 (T, C, H, W),
其中 T 是时间帧数,C 是通道数,H 是高度,W 是宽度。
crop_size (tuple(int, int)): 裁剪区域的大小,应为一个包含两个整数的元组 (height, width)。
Returns:
torch.tensor: 中心裁剪后的片段,大小为 (T, C, H, W),裁剪区域大小为 `crop_size`。
Raises:
ValueError: 如果 `clip` 不是 4D 的 torch.tensor,或者视频的高度和宽度小于裁剪区域的大小,抛出异常。
"""
if not _is_tensor_video_clip(clip):
raise ValueError("clip should be a 4D torch.tensor")
h, w = clip.size(-2), clip.size(-1)
th, tw = crop_size
if h < th or w < tw:
raise ValueError("height and width must be no smaller than crop_size")
# 计算裁剪区域的左上角坐标
i = int(round((h - th) / 2.0))
j = int(round((w - tw) / 2.0))
return crop(clip, i, j, th, tw)
(8)函数 center_crop_using_short_edge 的功能是根据视频片段的短边进行中心裁剪,裁剪区域的尺寸与短边相同。函数 center_crop_using_short_edge 会根据视频片段的短边(高度或宽度)进行裁剪,使得裁剪后的区域的大小与短边相等,并将裁剪区域居中。
def center_crop_using_short_edge(clip):
if not _is_tensor_video_clip(clip):
raise ValueError("clip should be a 4D torch.tensor")
h, w = clip.size(-2), clip.size(-1)
if h < w:
th, tw = h, h
i = 0
j = int(round((w - tw) / 2.0))
else:
th, tw = w, w
i = int(round((h - th) / 2.0))
j = 0
return crop(clip, i, j, th, tw)
(9)函数 random_shift_crop 的功能是根据视频片段的长边进行滑动,裁剪出一个短边大小的区域。裁剪的位置是随机选择的,确保裁剪区域能够适应视频的长边和短边。
def random_shift_crop(clip):
'''
沿着长边进行滑动,裁剪出短边大小的区域。
Args:
clip (torch.tensor): 需要裁剪的视频片段,大小为 (T, C, H, W),
其中 T 是时间帧数,C 是通道数,H 是高度,W 是宽度。
Returns:
torch.tensor: 随机滑动裁剪后的片段,大小为 (T, C, H, W),裁剪区域的尺寸为短边的大小。
Raises:
ValueError: 如果 `clip` 不是 4D 的 torch.tensor,抛出异常。
'''
if not _is_tensor_video_clip(clip):
raise ValueError("clip should be a 4D torch.tensor")
h, w = clip.size(-2), clip.size(-1)
# 判断长边和短边
if h <= w:
long_edge = w
short_edge = h
else:
long_edge = h
short_edge = w
th, tw = short_edge, short_edge
# 随机选择裁剪区域的起始位置
i = torch.randint(0, h - th + 1, size=(1,)).item()
j = torch.randint(0, w - tw + 1, size=(1,)).item()
return crop(clip, i, j, th, tw)
(10)函数 to_tensor 的功能是将视频片段的 uint8 类型数据转换为 float 类型,并将像素值除以 255 进行归一化。
def to_tensor(clip):
"""
将张量数据类型从 uint8 转换为 float,并将像素值除以 255.0 进行归一化,同时调整张量的维度顺序。
参数:
clip (torch.tensor, dtype=torch.uint8): 大小为 (T, C, H, W) 的视频片段张量,数据类型为 uint8。
返回:
clip (torch.tensor, dtype=torch.float): 大小为 (T, C, H, W) 的视频片段张量,数据类型为 float。
"""
_is_tensor_video_clip(clip)
if not clip.dtype == torch.uint8:
raise TypeError("clip tensor should have data type uint8. Got %s" % str(clip.dtype))
# 返回经过 float 转换、除以 255.0 归一化的张量
return clip.float() / 255.0
(11)函数 normalize 的功能是对视频片段进行标准化处理,即通过减去均值并除以标准差来对像素进行归一化。函数 normalize 将输入的视频片段(clip)进行标准化。它通过减去指定的均值并除以标准差来进行处理,通常用于图像预处理以提高模型的表现。如果 inplace 为 True,则在原始张量上直接修改,否则返回一个新的标准化后的张量。
def normalize(clip, mean, std, inplace=False):
"""
对视频片段进行标准化处理,通过减去均值并除以标准差进行归一化。
参数:
clip (torch.tensor): 要进行标准化的视频片段。大小为 (T, C, H, W)。
mean (tuple): 每个像素的 RGB 均值。大小为 (3)。
std (tuple): 每个像素的标准差。大小为 (3)。
inplace (bool, optional): 是否直接在原始张量上进行操作,默认为 False。
返回:
normalized clip (torch.tensor): 标准化后的视频片段。大小为 (T, C, H, W)。
"""
if not _is_tensor_video_clip(clip):
raise ValueError("clip should be a 4D torch.tensor")
if not inplace:
clip = clip.clone()
mean = torch.as_tensor(mean, dtype=clip.dtype, device=clip.device)
std = torch.as_tensor(std, dtype=clip.dtype, device=clip.device)
clip.sub_(mean[:, None, None, None]).div_(std[:, None, None, None])
return clip
(12)函数 hflip 的功能是对视频片段进行水平翻转,即将每一帧图像的内容在水平方向上翻转。函数 hflip 对输入的视频片段(clip)进行水平翻转。它通过调用 flip 方法对视频片段的最后一个维度(宽度维度)进行翻转,生成每一帧的镜像版本。
def hflip(clip):
"""
对视频片段进行水平翻转。
参数:
clip (torch.tensor): 要进行翻转的视频片段。大小为 (T, C, H, W)。
返回:
flipped clip (torch.tensor): 水平翻转后的视频片段。大小为 (T, C, H, W)。
"""
if not _is_tensor_video_clip(clip):
raise ValueError("clip should be a 4D torch.tensor")
return clip.flip(-1)
(13)类 RandomCropVideo 的功能是对视频片段进行随机裁剪。它接受一个裁剪尺寸,并通过调用 __call__ 方法对输入的视频片段进行随机裁剪。
import numbers
import torch
class RandomCropVideo:
def __init__(self, size):
if isinstance(size, numbers.Number):
self.size = (int(size), int(size))
else:
self.size = size
def __call__(self, clip):
"""
对视频片段进行随机裁剪。
参数:
clip (torch.tensor): 要进行裁剪的视频片段。大小为 (T, C, H, W)。
返回:
torch.tensor: 随机裁剪后的视频片段。大小为 (T, C, OH, OW)。
"""
i, j, h, w = self.get_params(clip)
return crop(clip, i, j, h, w)
def get_params(self, clip):
"""
获取裁剪的起始坐标和裁剪区域的尺寸。
参数:
clip (torch.tensor): 要裁剪的视频片段。大小为 (T, C, H, W)。
返回:
tuple: 裁剪的起始坐标 (i, j) 和裁剪区域的高度和宽度 (h, w)。
"""
h, w = clip.shape[-2:]
th, tw = self.size
if h < th or w < tw:
raise ValueError(f"Required crop size {(th, tw)} is larger than input image size {(h, w)}")
if w == tw and h == th:
return 0, 0, h, w
i = torch.randint(0, h - th + 1, size=(1,)).item()
j = torch.randint(0, w - tw + 1, size=(1,)).item()
return i, j, th, tw
def __repr__(self) -> str:
return f"{self.__class__.__name__}(size={self.size})"
对上述代码的具体说明如下所示:
- __init__:初始化裁剪尺寸,可以传入一个数字或一个尺寸元组 (height, width)。
- __call__:当类实例被调用时,执行视频片段的随机裁剪。
- get_params:根据输入的视频片段尺寸,计算裁剪的起始位置和裁剪区域的尺寸。
- __repr__:返回类的字符串表示。
(14)类 CenterCropResizeVideo 的功能是对视频片段进行中心裁剪,并使用视频的短边进行裁剪,最后将裁剪后的视频片段缩放到指定的大小。
class CenterCropResizeVideo:
def __init__(
self,
size,
interpolation_mode="bilinear",
):
if isinstance(size, tuple):
if len(size) != 2:
raise ValueError(f"size should be tuple (height, width), instead got {size}")
self.size = size
else:
self.size = (size, size)
self.interpolation_mode = interpolation_mode
def __call__(self, clip):
clip_center_crop = center_crop_using_short_edge(clip)
clip_center_crop_resize = resize(clip_center_crop, target_size=self.size, interpolation_mode=self.interpolation_mode)
return clip_center_crop_resize
def __repr__(self) -> str:
return f"{self.__class__.__name__}(size={self.size}, interpolation_mode={self.interpolation_mode}"
对上述代码的具体说明如下所示:
- __init__:初始化类实例时,指定裁剪和缩放的目标大小 size,以及插值方法(默认使用 "bilinear" 双线性插值)。
- __call__:当类实例被调用时,执行中心裁剪并缩放视频片段。
- __repr__:返回类的字符串表示,展示其裁剪大小和插值方法。
(15)类 UCFCenterCropVideo 的功能是首先按比例缩放视频片段,使短边达到指定大小,然后对缩放后的视频片段进行中心裁剪。
class UCFCenterCropVideo:
def __init__(
self,
size,
interpolation_mode="bilinear",
):
if isinstance(size, tuple):
if len(size) != 2:
raise ValueError(f"size should be tuple (height, width), instead got {size}")
self.size = size
else:
self.size = (size, size)
self.interpolation_mode = interpolation_mode
def __call__(self, clip):
clip_resize = resize_scale(clip=clip, target_size=self.size, interpolation_mode=self.interpolation_mode)
clip_center_crop = center_crop(clip_resize, self.size)
return clip_center_crop
def __repr__(self) -> str:
return f"{self.__class__.__name__}(size={self.size}, interpolation_mode={self.interpolation_mode}"
对上述代码的具体说明如下所示:
- __init__:初始化类实例时,指定裁剪和缩放的目标大小 size,以及插值方法(默认使用 "bilinear" 双线性插值)。
- __call__:当类实例被调用时,执行按比例缩放和中心裁剪视频片段。
- __repr__:返回类的字符串表示,展示其裁剪大小和插值方法。
(16)类 KineticsRandomCropResizeVideo 的功能是对视频片段进行随机裁剪,然后调整大小到指定的目标尺寸。
class KineticsRandomCropResizeVideo:
def __init__(
self,
size,
interpolation_mode="bilinear",
):
if isinstance(size, tuple):
if len(size) != 2:
raise ValueError(f"size should be tuple (height, width), instead got {size}")
self.size = size
else:
self.size = (size, size)
self.interpolation_mode = interpolation_mode
def __call__(self, clip):
clip_random_crop = random_shift_crop(clip)
clip_resize = resize(clip_random_crop, self.size, self.interpolation_mode)
return clip_resize
(17)类 CenterCropVideo 的功能是对视频片段进行中心裁剪,裁剪后的区域大小与初始化时指定的目标大小一致。
class CenterCropVideo:
def __init__(
self,
size,
interpolation_mode="bilinear",
):
if isinstance(size, tuple):
if len(size) != 2:
raise ValueError(f"size should be tuple (height, width), instead got {size}")
self.size = size
else:
self.size = (size, size)
self.interpolation_mode = interpolation_mode
def __call__(self, clip):
clip_center_crop = center_crop(clip, self.size)
return clip_center_crop
def __repr__(self) -> str:
return f"{self.__class__.__name__}(size={self.size}, interpolation_mode={self.interpolation_mode}"
对上述代码的具体说明如下所示:
- __init__:初始化类实例时,指定裁剪的目标大小 size,以及插值方法(默认使用 "bilinear" 双线性插值)。如果只指定一个数字,则认为该数字为边长,并将裁剪大小设置为正方形 (size, size)。
- __call__:当类实例被调用时,执行中心裁剪操作。
- __repr__:返回类的字符串表示,展示其裁剪大小和插值方法。
(18)类 NormalizeVideo 的功能是对视频片段进行标准化处理,包括减去均值并除以标准差,可以选择是否进行原地处理。
class NormalizeVideo:
def __init__(self, mean, std, inplace=False):
self.mean = mean
self.std = std
self.inplace = inplace
def __call__(self, clip):
return normalize(clip, self.mean, self.std, self.inplace)
def __repr__(self) -> str:
return f"{self.__class__.__name__}(mean={self.mean}, std={self.std}, inplace={self.inplace})"
对上述代码的具体说明如下所示:
- __init__:初始化类实例时,传入用于标准化的均值 mean 和标准差 std,以及是否进行原地操作的标志 inplace(默认为 False)。
- __call__:当类实例被调用时,执行标准化操作。标准化方式是通过 normalize 函数,利用传入的均值和标准差对视频片段进行标准化。
- __repr__:返回类的字符串表示形式,显示均值、标准差和是否进行原地标准化的参数。
(19)类 ToTensorVideo 的功能是将视频片段的 uint8 数据类型转换为 float,并将其值除以 255.0,同时变换张量的维度顺序。
class ToTensorVideo:
def __init__(self):
pass
def __call__(self, clip):
return to_tensor(clip)
def __repr__(self) -> str:
return self.__class__.__name__
对上述代码的具体说明如下所示:
- __init__:初始化类实例时不需要额外的参数。
- __call__:当类实例被调用时,将视频片段 clip 从 uint8 类型转换为 float 类型,并除以 255.0,返回转换后的结果。
- __repr__:返回类的字符串表示形式,即类名 ToTensorVideo。
(20)类RandomHorizontalFlipVideo通过指定的概率 p 对视频片段进行随机水平翻转,若随机数小于 p,则执行翻转。
class RandomHorizontalFlipVideo:
def __init__(self, p=0.5):
self.p = p
def __call__(self, clip):
if random.random() < self.p:
clip = hflip(clip)
return clip
def __repr__(self) -> str:
return f"{self.__class__.__name__}(p={self.p})"
对上述代码的具体说明如下所示:
- __init__:初始化时设置水平翻转的概率 p,默认值为 0.5。
- __call__:在调用该类实例时,根据给定的概率 p 对视频片段进行水平翻转(通过 hflip 函数),如果随机数小于 p,则执行翻转操作。
- __repr__:返回类的字符串表示形式,包含翻转的概率 p。
(21)类TemporalRandomCrop通过指定的帧数大小 size,在给定的总帧数 total_frames 中随机选择一个时间窗口进行裁剪,返回裁剪开始和结束的帧索引。
class TemporalRandomCrop(object):
def __init__(self, size):
self.size = size
def __call__(self, total_frames):
rand_end = max(0, total_frames - self.size - 1)
begin_index = random.randint(0, rand_end)
end_index = min(begin_index + self.size, total_frames)
return begin_index, end_index
(22)下面代码用于从视频中随机采样指定数量的帧,对帧进行一系列图像变换(包括转为张量、随机水平翻转、中心裁剪和归一化),然后保存变换后的视频片段以及每帧的图像。整个流程可以分为以下步骤:
- 读取视频:加载视频帧信息并提取视频元数据。
- 随机时间裁剪:从视频中随机选择一段指定长度的帧。
- 帧变换处理:对选中的帧进行预定义的图像变换处理。
- 保存结果:保存处理后的视频以及视频帧图像。
if __name__ == '__main__':
from torchvision import transforms
import torchvision.io as io
import numpy as np
from torchvision.utils import save_image
import os
# 读取视频
# 参数:
# filename:输入视频文件路径
# pts_unit:时间戳单位,设置为秒
# output_format:视频帧输出格式为 (T, C, H, W)
vframes, aframes, info = io.read_video(
filename='./v_Archery_g01_c03.avi',
pts_unit='sec',
output_format='TCHW'
)
# 定义帧变换
trans = transforms.Compose([
ToTensorVideo(), # 转为张量格式
RandomHorizontalFlipVideo(), # 随机水平翻转
UCFCenterCropVideo(512), # 使用短边等比例裁剪,然后居中裁剪为 512x512
transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5], inplace=True) # 归一化
])
# 指定目标视频长度和帧间隔
target_video_len = 32
frame_interval = 1
# 获取总帧数
total_frames = len(vframes)
print(total_frames)
# 定义时间随机裁剪器
temporal_sample = TemporalRandomCrop(target_video_len * frame_interval)
# 从视频中采样帧
start_frame_ind, end_frame_ind = temporal_sample(total_frames)
assert end_frame_ind - start_frame_ind >= target_video_len # 确保采样帧数足够
frame_indice = np.linspace(start_frame_ind, end_frame_ind - 1, target_video_len, dtype=int)
print(frame_indice)
# 提取选中的帧
select_vframes = vframes[frame_indice]
print(select_vframes.shape)
print(select_vframes.dtype)
# 应用变换处理
select_vframes_trans = trans(select_vframes)
print(select_vframes_trans.shape)
print(select_vframes_trans.dtype)
# 转换帧为 uint8 格式用于保存视频
select_vframes_trans_int = ((select_vframes_trans * 0.5 + 0.5) * 255).to(dtype=torch.uint8)
print(select_vframes_trans_int.dtype)
print(select_vframes_trans_int.permute(0, 2, 3, 1).shape)
# 保存处理后的视频
io.write_video('./test.avi', select_vframes_trans_int.permute(0, 2, 3, 1), fps=8)
# 保存每帧的处理后图像
for i in range(target_video_len):
save_image(select_vframes_trans[i], os.path.join('./test000', '%04d.png' % i), normalize=True, value_range=(-1, 1))
标签:Diffusion,视频,clip,文生,self,裁剪,__,size
From: https://blog.csdn.net/asd343442/article/details/145210720