首页 > 编程语言 > Yolov8-源码解析-三十六-

Yolov8-源码解析-三十六-

时间:2024-09-05 11:53:24 浏览次数:15
标签:三十六 __ nn self Yolov8 源码 c2 c1 def

Yolov8 源码解析(三十六)

.\yolov8\ultralytics\models\yolo\pose\__init__.py

# 导入模块 predict 中的 PosePredictor 类
# 导入模块 train 中的 PoseTrainer 类
# 导入模块 val 中的 PoseValidator 类
from .predict import PosePredictor
from .train import PoseTrainer
from .val import PoseValidator

# Declare the public API of this package: the names exported by `from <package> import *`.
__all__ = "PoseTrainer", "PoseValidator", "PosePredictor"

.\yolov8\ultralytics\models\yolo\segment\predict.py

# 导入必要的模块和类
from ultralytics.engine.results import Results
from ultralytics.models.yolo.detect.predict import DetectionPredictor
from ultralytics.utils import DEFAULT_CFG, ops

class SegmentationPredictor(DetectionPredictor):
    """
    Predictor for segmentation models, built on top of DetectionPredictor.

    Example:
        ```python
        from ultralytics.utils import ASSETS
        from ultralytics.models.yolo.segment import SegmentationPredictor

        args = dict(model='yolov8n-seg.pt', source=ASSETS)
        predictor = SegmentationPredictor(overrides=args)
        predictor.predict_cli()
        ```
    """

    def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None):
        """Initialize the parent predictor with the given arguments, then set the task to "segment"."""
        super().__init__(cfg, overrides, _callbacks)
        self.args.task = "segment"

    def postprocess(self, preds, img, orig_imgs):
        """
        Apply non-maximum suppression to the raw predictions and wrap each image's output in a Results object.

        Args:
            preds: Model output. preds[0] is handed to NMS; preds[1] supplies the mask prototypes.
            img: Preprocessed input batch. Only img.shape[2:] is read here.
            orig_imgs: Original images. Anything that is not a list is converted with
                ops.convert_torch2numpy_batch.

        Returns:
            (list): One Results object per image, holding the first six prediction columns as boxes and the
                computed masks (None when the image has no detections).
        """
        opts = self.args
        detections = ops.non_max_suppression(
            preds[0],
            opts.conf,
            opts.iou,
            agnostic=opts.agnostic_nms,
            max_det=opts.max_det,
            nc=len(self.model.names),
            classes=opts.classes,
        )

        # A non-list input is converted into a batch of numpy images
        if not isinstance(orig_imgs, list):
            orig_imgs = ops.convert_torch2numpy_batch(orig_imgs)

        # The second output is either the prototypes themselves or a tuple whose last item is the prototypes
        second = preds[1]
        proto = second[-1] if isinstance(second, tuple) else second

        results = []
        for idx, (det, image, path) in enumerate(zip(detections, orig_imgs, self.batch[0])):
            masks = None  # stays None when the image has no detections
            if len(det):
                net_hw = img.shape[2:]
                if opts.retina_masks:
                    # Rescale the boxes to the original image first, then build the masks at original image size
                    det[:, :4] = ops.scale_boxes(net_hw, det[:, :4], image.shape)
                    masks = ops.process_mask_native(proto[idx], det[:, 6:], det[:, :4], image.shape[:2])
                else:
                    # Build the masks at network input size first, then rescale the boxes to the original image
                    masks = ops.process_mask(proto[idx], det[:, 6:], det[:, :4], net_hw, upsample=True)
                    det[:, :4] = ops.scale_boxes(net_hw, det[:, :4], image.shape)
            results.append(Results(image, path=path, names=self.model.names, boxes=det[:, :6], masks=masks))
        return results

.\yolov8\ultralytics\models\yolo\segment\train.py

# 导入必要的模块和类
from copy import copy
from ultralytics.models import yolo
from ultralytics.nn.tasks import SegmentationModel
from ultralytics.utils import DEFAULT_CFG, RANK
from ultralytics.utils.plotting import plot_images, plot_results

# 定义一个继承自DetectionTrainer的类,用于分割模型的训练
class SegmentationTrainer(yolo.detect.DetectionTrainer):
    """
    A class extending the DetectionTrainer class for training based on a segmentation model.

    Example:
        ```python
        from ultralytics.models.yolo.segment import SegmentationTrainer

        args = dict(model='yolov8n-seg.pt', data='coco8-seg.yaml', epochs=3)
        trainer = SegmentationTrainer(overrides=args)
        trainer.train()
        ```
    """

    def __init__(self, cfg=DEFAULT_CFG, overrides=None, _callbacks=None):
        """
        Initialize a SegmentationTrainer object, forcing the task to "segment".

        Args:
            cfg: Base configuration forwarded to the parent trainer.
            overrides (dict | None): Optional configuration overrides. The mapping is copied, so the caller's
                object is never modified.
            _callbacks: Callbacks forwarded to the parent trainer.
        """
        # Build a fresh dict instead of writing the task into the caller's mapping.
        overrides = {**(overrides or {}), "task": "segment"}
        super().__init__(cfg, overrides, _callbacks)

    def get_model(self, cfg=None, weights=None, verbose=True):
        """
        Return a SegmentationModel initialized with the specified config and weights.

        Args:
            cfg: Model configuration passed to SegmentationModel.
            weights: Optional weights; loaded into the model when truthy.
            verbose (bool): Requested verbosity; only honoured when RANK == -1.

        Returns:
            (SegmentationModel): The constructed model, with weights loaded when provided.
        """
        model = SegmentationModel(cfg, ch=3, nc=self.data["nc"], verbose=verbose and RANK == -1)
        if weights:
            model.load(weights)

        return model

    def get_validator(self):
        """
        Return an instance of SegmentationValidator for validation of YOLO model.

        Also sets self.loss_names to the four loss labels used for segmentation training.
        """
        self.loss_names = "box_loss", "seg_loss", "cls_loss", "dfl_loss"
        # The validator receives a copy of the trainer args
        return yolo.segment.SegmentationValidator(
            self.test_loader, save_dir=self.save_dir, args=copy(self.args), _callbacks=self.callbacks
        )

    def plot_training_samples(self, batch, ni):
        """
        Create a plot of training sample images with labels, box coordinates and masks.

        Args:
            batch (dict): Batch providing the "img", "batch_idx", "cls", "bboxes", "masks" and "im_file" entries.
            ni: Value embedded in the output file name (train_batch{ni}.jpg).
        """
        plot_images(
            batch["img"],
            batch["batch_idx"],
            batch["cls"].squeeze(-1),
            batch["bboxes"],
            masks=batch["masks"],
            paths=batch["im_file"],
            fname=self.save_dir / f"train_batch{ni}.jpg",
            on_plot=self.on_plot,
        )

    def plot_metrics(self):
        """Plot training/val metrics from self.csv with the segment layout enabled."""
        plot_results(file=self.csv, segment=True, on_plot=self.on_plot)  # save results.png

.\yolov8\ultralytics\models\yolo\segment\val.py

# 导入所需模块
from multiprocessing.pool import ThreadPool
from pathlib import Path

# 导入 NumPy 和 PyTorch 库
import numpy as np
import torch
import torch.nn.functional as F

# 导入 Ultralytics 相关模块和函数
from ultralytics.models.yolo.detect import DetectionValidator
from ultralytics.utils import LOGGER, NUM_THREADS, ops
from ultralytics.utils.checks import check_requirements
from ultralytics.utils.metrics import SegmentMetrics, box_iou, mask_iou
from ultralytics.utils.plotting import output_to_target, plot_images

# 定义一个继承自 DetectionValidator 的 SegmentationValidator 类
class SegmentationValidator(DetectionValidator):
    """
    A class extending the DetectionValidator class for validation based on a segmentation model.

    Example:
        ```python
        from ultralytics.models.yolo.segment import SegmentationValidator

        args = dict(model='yolov8n-seg.pt', data='coco8-seg.yaml')
        validator = SegmentationValidator(args=args)
        validator()
        ```
    """

    def __init__(self, dataloader=None, save_dir=None, pbar=None, args=None, _callbacks=None):
        """Initialize SegmentationValidator and set task to 'segment', metrics to SegmentMetrics."""
        super().__init__(dataloader, save_dir, pbar, args, _callbacks)
        # Both attributes are assigned their real values in init_metrics()
        self.plot_masks = None
        self.process = None
        self.args.task = "segment"
        self.metrics = SegmentMetrics(save_dir=self.save_dir, on_plot=self.on_plot)

    def preprocess(self, batch):
        """Preprocess batch via the parent class, then move the masks to the device and cast them to float."""
        batch = super().preprocess(batch)
        batch["masks"] = batch["masks"].to(self.device).float()
        return batch

    def init_metrics(self, model):
        """Initialize metrics and select mask processing function based on the save_json / save_txt flags."""
        super().init_metrics(model)
        self.plot_masks = []
        if self.args.save_json:
            check_requirements("pycocotools>=2.0.6")
        # Use the native mask processor when results are written to JSON or TXT (the original comments
        # describe it as the more accurate variant); otherwise use the regular one.
        self.process = ops.process_mask_native if self.args.save_json or self.args.save_txt else ops.process_mask
        # Per-image statistics are appended to these lists by update_metrics()
        self.stats = dict(tp_m=[], tp=[], conf=[], pred_cls=[], target_cls=[], target_img=[])

    def get_desc(self):
        """Return the formatted header row (class, counts, box metrics, mask metrics) for the results table."""
        return ("%22s" + "%11s" * 10) % (
            "Class",
            "Images",
            "Instances",
            "Box(P",
            "R",
            "mAP50",
            "mAP50-95)",
            "Mask(P",
            "R",
            "mAP50",
            "mAP50-95)",
        )

    def postprocess(self, preds):
        """
        Post-processes YOLO predictions and returns output detections with proto.

        Args:
            preds (list): List of prediction outputs from YOLO model.

        Returns:
            tuple: A tuple containing processed predictions (p) and prototype data (proto).
        """
        # Perform non-maximum suppression on the first prediction output
        p = ops.non_max_suppression(
            preds[0],
            self.args.conf,
            self.args.iou,
            labels=self.lb,
            multi_label=True,
            agnostic=self.args.single_cls,
            max_det=self.args.max_det,
            nc=self.nc,
        )
        # Determine the prototype data from the second prediction output
        proto = preds[1][-1] if len(preds[1]) == 3 else preds[1]  # second output is len 3 if pt, but only 1 if exported
        return p, proto

    def _prepare_batch(self, si, batch):
        """
        Prepares a batch for training or inference by processing images and targets.

        Args:
            si (int): Index of the current sample in the batch.
            batch (dict): Dictionary containing batch data including images and targets.

        Returns:
            dict: A prepared batch dictionary with additional 'masks' data.
        """
        prepared_batch = super()._prepare_batch(si, batch)
        # With overlap_mask the masks are indexed by sample position; otherwise the rows whose
        # batch_idx equals this sample are selected
        midx = [si] if self.args.overlap_mask else batch["batch_idx"] == si
        prepared_batch["masks"] = batch["masks"][midx]
        return prepared_batch

    def _prepare_pred(self, pred, pbatch, proto):
        """
        Prepares predictions for training or inference by processing images and targets.

        Args:
            pred (Tensor): Predictions from the model.
            pbatch (dict): Prepared batch data.
            proto (Tensor): Prototype data for processing masks.

        Returns:
            tuple: A tuple containing processed predictions (predn) and processed masks (pred_masks).
        """
        predn = super()._prepare_pred(pred, pbatch)
        # Columns 6+ of the raw prediction are passed as mask inputs, columns 0-3 as the boxes
        pred_masks = self.process(proto, pred[:, 6:], pred[:, :4], shape=pbatch["imgsz"])
        return predn, pred_masks

    def update_metrics(self, preds, batch):
        """
        Accumulate box and mask statistics for every image of a batch.

        For each image the method records confidences, predicted classes, target classes and the
        true-positive matrices for boxes ("tp") and masks ("tp_m") in self.stats, optionally feeds the
        confusion matrix, keeps a few masks for plotting, and writes JSON / TXT results when requested.

        Args:
            preds (tuple): Output of postprocess(): per-image detections and the matching prototypes.
            batch (dict): The current batch; "im_file" and "ratio_pad" are read here in addition to
                whatever _prepare_batch() consumes.
        """
        for si, (pred, proto) in enumerate(zip(preds[0], preds[1])):
            self.seen += 1
            npr = len(pred)  # number of predictions for this image
            stat = dict(
                conf=torch.zeros(0, device=self.device),
                pred_cls=torch.zeros(0, device=self.device),
                tp=torch.zeros(npr, self.niou, dtype=torch.bool, device=self.device),
                tp_m=torch.zeros(npr, self.niou, dtype=torch.bool, device=self.device),
            )
            pbatch = self._prepare_batch(si, batch)
            cls, bbox = pbatch.pop("cls"), pbatch.pop("bbox")
            nl = len(cls)  # number of ground-truth labels for this image
            stat["target_cls"] = cls
            stat["target_img"] = cls.unique()

            # No predictions: record the (empty) stats only when there are labels to account for
            if npr == 0:
                if nl:
                    for k in self.stats:
                        self.stats[k].append(stat[k])
                    if self.args.plots:
                        self.confusion_matrix.process_batch(detections=None, gt_bboxes=bbox, gt_cls=cls)
                continue

            # Masks
            gt_masks = pbatch.pop("masks")

            # Predictions
            if self.args.single_cls:
                pred[:, 5] = 0
            predn, pred_masks = self._prepare_pred(pred, pbatch, proto)
            stat["conf"] = predn[:, 4]
            stat["pred_cls"] = predn[:, 5]

            # Evaluate against the labels, once on boxes and once on masks
            if nl:
                stat["tp"] = self._process_batch(predn, bbox, cls)
                stat["tp_m"] = self._process_batch(
                    predn, bbox, cls, pred_masks, gt_masks, self.args.overlap_mask, masks=True
                )
                if self.args.plots:
                    self.confusion_matrix.process_batch(predn, bbox, cls)

            for k in self.stats:
                self.stats[k].append(stat[k])

            pred_masks = torch.as_tensor(pred_masks, dtype=torch.uint8)
            # Keep at most 15 masks per image, and only for the first three batches, for plotting
            if self.args.plots and self.batch_i < 3:
                self.plot_masks.append(pred_masks[:15].cpu())

            # Save
            if self.args.save_json:
                self.pred_to_json(
                    predn,
                    batch["im_file"][si],
                    ops.scale_image(
                        pred_masks.permute(1, 2, 0).contiguous().cpu().numpy(),
                        pbatch["ori_shape"],
                        ratio_pad=batch["ratio_pad"][si],
                    ),
                )
            if self.args.save_txt:
                self.save_one_txt(
                    predn,
                    pred_masks,
                    self.args.save_conf,
                    pbatch["ori_shape"],
                    self.save_dir / "labels" / f'{Path(batch["im_file"][si]).stem}.txt',
                )

    def finalize_metrics(self, *args, **kwargs):
        """Copy the measured speed and the confusion matrix onto the metrics object; arguments are ignored."""
        self.metrics.speed = self.speed
        self.metrics.confusion_matrix = self.confusion_matrix

    def _process_batch(self, detections, gt_bboxes, gt_cls, pred_masks=None, gt_masks=None, overlap=False, masks=False):
        """
        Compute correct prediction matrix for a batch based on bounding boxes and optional masks.

        Args:
            detections (torch.Tensor): Tensor of shape (N, 6) representing detected bounding boxes and
                associated confidence scores and class indices. Each row is of the format [x1, y1, x2, y2, conf, class].
            gt_bboxes (torch.Tensor): Tensor of shape (M, 4) representing ground truth bounding box coordinates.
                Each row is of the format [x1, y1, x2, y2].
            gt_cls (torch.Tensor): Tensor of shape (M,) representing ground truth class indices.
            pred_masks (torch.Tensor | None): Tensor representing predicted masks, if available. The shape should
                match the ground truth masks.
            gt_masks (torch.Tensor | None): Tensor of shape (M, H, W) representing ground truth masks, if available.
            overlap (bool): Flag indicating if overlapping masks should be considered.
            masks (bool): Flag indicating if the batch contains mask data.

        Returns:
            (torch.Tensor): A correct prediction matrix of shape (N, 10), where 10 represents different IoU levels.

        Note:
            - If `masks` is True, the function computes IoU between predicted and ground truth masks.
            - If `overlap` is True and `masks` is True, overlapping masks are taken into account when computing IoU.

        Example:
            ```python
            detections = torch.tensor([[25, 30, 200, 300, 0.8, 1], [50, 60, 180, 290, 0.75, 0]])
            gt_bboxes = torch.tensor([[24, 29, 199, 299], [55, 65, 185, 295]])
            gt_cls = torch.tensor([1, 0])
            correct_preds = validator._process_batch(detections, gt_bboxes, gt_cls)
            ```
        """
        if masks:
            if overlap:
                # Expand the overlap-encoded mask into one binary mask per label: label i owns the
                # pixels whose value equals i + 1.
                # NOTE(review): assumes gt_masks arrives here as a single (1, H, W) index map - confirm
                nl = len(gt_cls)
                index = torch.arange(nl, device=gt_masks.device).view(nl, 1, 1) + 1
                gt_masks = gt_masks.repeat(nl, 1, 1)
                gt_masks = torch.where(gt_masks == index, 1.0, 0.0)
            if gt_masks.shape[1:] != pred_masks.shape[1:]:
                # Resize the ground-truth masks to the predicted mask resolution, then re-binarize in place
                gt_masks = F.interpolate(gt_masks[None], pred_masks.shape[1:], mode="bilinear", align_corners=False)[0]
                gt_masks = gt_masks.gt_(0.5)
            # Flatten every mask to one row before computing the pairwise mask IoU
            iou = mask_iou(gt_masks.view(gt_masks.shape[0], -1), pred_masks.view(pred_masks.shape[0], -1))
        else:  # boxes
            iou = box_iou(gt_bboxes, detections[:, :4])

        return self.match_predictions(detections[:, 5], gt_cls, iou)

    def plot_val_samples(self, batch, ni):
        """
        Plot validation samples with their labels, bounding boxes and masks.

        Args:
            batch (dict): Batch providing the "img", "batch_idx", "cls", "bboxes", "masks" and "im_file" entries.
            ni: Value embedded in the output file name (val_batch{ni}_labels.jpg).
        """
        plot_images(
            batch["img"],
            batch["batch_idx"],
            batch["cls"].squeeze(-1),
            batch["bboxes"],
            masks=batch["masks"],
            paths=batch["im_file"],
            fname=self.save_dir / f"val_batch{ni}_labels.jpg",
            names=self.names,
            on_plot=self.on_plot,
        )

    def plot_predictions(self, batch, preds, ni):
        """
        Plot batch predictions with masks and bounding boxes, then clear the stored plot masks.

        Args:
            batch (dict): Batch providing the "img" and "im_file" entries.
            preds (tuple): Predictions; only preds[0] is converted with output_to_target.
            ni: Value embedded in the output file name (val_batch{ni}_pred.jpg).
        """
        plot_images(
            batch["img"],
            *output_to_target(preds[0], max_det=15),  # not set to self.args.max_det due to slow plotting speed
            # Concatenate the masks collected by update_metrics(); an empty list is passed through unchanged
            torch.cat(self.plot_masks, dim=0) if len(self.plot_masks) else self.plot_masks,
            paths=batch["im_file"],
            fname=self.save_dir / f"val_batch{ni}_pred.jpg",
            names=self.names,
            on_plot=self.on_plot,
        )  # pred
        self.plot_masks.clear()

    def save_one_txt(self, predn, pred_masks, save_conf, shape, file):
        """
        Save YOLO detections to a txt file by delegating to Results.save_txt.

        Args:
            predn: Predictions; the first six columns are used as boxes.
            pred_masks: Predicted masks passed to Results.
            save_conf: Forwarded to Results.save_txt as `save_conf`.
            shape: Image shape; shape[0] and shape[1] size the placeholder image.
            file: Destination path of the txt file.
        """
        from ultralytics.engine.results import Results

        Results(
            np.zeros((shape[0], shape[1]), dtype=np.uint8),  # all-zero placeholder image of the given size
            path=None,
            names=self.names,
            boxes=predn[:, :6],
            masks=pred_masks,
        ).save_txt(file, save_conf=save_conf)

    def pred_to_json(self, predn, filename, pred_masks):
        """
        Save one JSON result.

        Appends one dictionary per prediction to self.jdict, each holding the image id, category id,
        bounding box, score and RLE-encoded segmentation.

        Args:
            predn: Predictions; columns 0-3 are the box, column 4 the score and column 5 the class.
            filename: Image file name; its stem becomes the image id (as int when numeric).
            pred_masks: Predicted masks, transposed from axis order (0, 1, 2) to (2, 0, 1) before encoding.

        Examples:
             >>> result = {"image_id": 42, "category_id": 18, "bbox": [258.15, 41.29, 348.26, 243.78], "score": 0.236}
        """
        from pycocotools.mask import encode  # noqa

        def single_encode(x):
            """Encode one predicted mask as RLE and return it with `counts` decoded to a str."""
            rle = encode(np.asarray(x[:, :, None], order="F", dtype="uint8"))[0]
            rle["counts"] = rle["counts"].decode("utf-8")
            return rle

        stem = Path(filename).stem
        image_id = int(stem) if stem.isnumeric() else stem
        box = ops.xyxy2xywh(predn[:, :4])  # xywh
        box[:, :2] -= box[:, 2:] / 2  # xy center to top-left corner
        pred_masks = np.transpose(pred_masks, (2, 0, 1))
        # Encode the masks in parallel, one task per mask
        with ThreadPool(NUM_THREADS) as pool:
            rles = pool.map(single_encode, pred_masks)
        for i, (p, b) in enumerate(zip(predn.tolist(), box.tolist())):
            self.jdict.append(
                {
                    "image_id": image_id,
                    "category_id": self.class_map[int(p[5])],
                    "bbox": [round(x, 3) for x in b],
                    "score": round(p[4], 5),
                    "segmentation": rles[i],
                }
            )

    def eval_json(self, stats):
        """
        Return COCO-style object detection evaluation metrics.

        When JSON saving is enabled, the dataset is COCO and predictions were collected, pycocotools is run
        for both "bbox" and "segm" and the resulting mAP50-95 / mAP50 values overwrite the matching entries
        of `stats`. Any failure in that process is logged as a warning and `stats` is returned as-is.

        Args:
            stats (dict): Statistics dictionary to update.

        Returns:
            (dict): The same `stats` object, possibly updated in place.
        """
        if self.args.save_json and self.is_coco and len(self.jdict):
            anno_json = self.data["path"] / "annotations/instances_val2017.json"  # annotations
            pred_json = self.save_dir / "predictions.json"  # predictions
            LOGGER.info(f"\nEvaluating pycocotools mAP using {pred_json} and {anno_json}...")
            try:  # https://github.com/cocodataset/cocoapi/blob/master/PythonAPI/pycocoEvalDemo.ipynb
                check_requirements("pycocotools>=2.0.6")
                from pycocotools.coco import COCO  # noqa
                from pycocotools.cocoeval import COCOeval  # noqa

                for x in anno_json, pred_json:
                    assert x.is_file(), f"{x} file not found"
                anno = COCO(str(anno_json))  # init annotations api
                pred = anno.loadRes(str(pred_json))  # init predictions api (must pass string, not Path)
                # `coco_eval` rather than `eval`, so the builtin is not shadowed
                for i, coco_eval in enumerate([COCOeval(anno, pred, "bbox"), COCOeval(anno, pred, "segm")]):
                    # The outer guard already guarantees is_coco, so the image ids are always restricted
                    coco_eval.params.imgIds = [int(Path(x).stem) for x in self.dataloader.dataset.im_files]
                    coco_eval.evaluate()
                    coco_eval.accumulate()
                    coco_eval.summarize()
                    # The i-th evaluator's mAP50 / mAP50-95 keys sit at offsets i * 4 + 2 and i * 4 + 3
                    idx = i * 4 + 2
                    stats[self.metrics.keys[idx + 1]], stats[self.metrics.keys[idx]] = coco_eval.stats[
                        :2
                    ]  # update mAP50-95 and mAP50
            except Exception as e:
                LOGGER.warning(f"pycocotools unable to run: {e}")
        return stats

.\yolov8\ultralytics\models\yolo\segment\__init__.py

# 导入模块:从当前包中导入 SegmentationPredictor、SegmentationTrainer 和 SegmentationValidator 类
from .predict import SegmentationPredictor
from .train import SegmentationTrainer
from .val import SegmentationValidator

# Declare the public API of this package: the names exported by `from <package> import *`.
__all__ = "SegmentationPredictor", "SegmentationTrainer", "SegmentationValidator"

.\yolov8\ultralytics\models\yolo\world\train.py

# Ultralytics YOLO 

标签:三十六,__,nn,self,Yolov8,源码,c2,c1,def
From: https://www.cnblogs.com/apachecn/p/18398121

相关文章