ResNet

Date: 2022-08-28 22:59:56

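ResNet Explained

ResNet was proposed by Microsoft Research in 2015. It took first place in both the classification and the object detection tasks of that year's ImageNet competition, and first place in object detection and image segmentation on the COCO dataset.
Highlights of the model:

  • A very deep network structure
  • The Residual module
  • Batch Normalization to speed up training (Dropout is dropped)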

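Simply stacking more layers on a plain network runs into two problems:

  • Vanishing or exploding gradients
  • The degradation problem: beyond a certain depth, a deeper plain network reaches a higher training error than a shallower one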

Residual architecture

The \(1\times1\) convolutions in the bottleneck block first reduce and then restore the channel dimension.
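
To make the benefit of the \(1\times1\) convolutions concrete, the sketch below counts the weights of a bias-free convolution as in_channels × out_channels × kernel². The 256 → 64 → 256 channel sizes are an illustrative assumption (they match the usual first-stage bottleneck), and `conv_params` is a hypothetical helper, not part of the original code.

```python
def conv_params(in_ch: int, out_ch: int, kernel: int) -> int:
    """Weight count of a bias-free 2D convolution with groups=1."""
    return in_ch * out_ch * kernel * kernel


# Two stacked 3x3 convolutions working directly on 256 channels.
plain = conv_params(256, 256, 3) + conv_params(256, 256, 3)

# Bottleneck: 1x1 reduces 256 -> 64, 3x3 works on 64 channels, 1x1 restores 64 -> 256.
bottleneck = (
    conv_params(256, 64, 1)
    + conv_params(64, 64, 3)
    + conv_params(64, 256, 1)
)

print("plain:", plain)
print("bottleneck:", bottleneck)
```

Under these assumptions the bottleneck needs far fewer weights than the plain stack, which is why it is preferred for the deeper variants.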

Batch Normalization

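The goal of Batch Normalization is to make every feature map (channel) of a batch follow a distribution with mean 0 and variance 1.

  • \(\mu\) and \(\sigma^2\) are computed from the batch statistics during the forward pass
  • \(\gamma\) and \(\beta\) are learned during backpropagation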

Input: values of \(x\) over a mini-batch: \(\mathcal{B}=\{x_{1\dots m}\}\)
Parameters to be learned: \(\gamma,\beta\)
\[\begin{aligned}
\mu_{\mathcal{B}}&\leftarrow\frac1m\sum_{i=1}^m x_i &&\text{// mini-batch mean}\\
\sigma_{\mathcal{B}}^2&\leftarrow\frac1m\sum_{i=1}^m(x_i-\mu_{\mathcal{B}})^2 &&\text{// mini-batch variance}\\
\widehat{x_i}&\leftarrow\frac{x_i-\mu_{\mathcal{B}}}{\sqrt{\sigma_{\mathcal{B}}^2+\epsilon}} &&\text{// normalize}\\
y_i&\leftarrow\gamma\widehat{x_i}+\beta=BN_{\gamma,\beta}(x_i) &&\text{// scale and shift}
\end{aligned}\]
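
The four steps above can be sketched in plain Python for a single channel. This is a minimal illustration of the training-time forward pass only: `batch_norm` is a hypothetical helper, the input values are made up, and running statistics for inference are left out.

```python
import math
from typing import List


def batch_norm(xs: List[float], gamma: float = 1.0, beta: float = 0.0,
               eps: float = 1e-5) -> List[float]:
    """Batch-normalize one channel of a mini-batch (training-mode forward pass)."""
    m = len(xs)
    mean = sum(xs) / m                               # mini-batch mean
    var = sum((x - mean) ** 2 for x in xs) / m       # mini-batch (biased) variance
    x_hat = [(x - mean) / math.sqrt(var + eps) for x in xs]  # normalize
    return [gamma * v + beta for v in x_hat]         # scale and shift


batch = [1.0, 2.0, 3.0, 4.0]  # made-up activations
out = batch_norm(batch)
out_mean = sum(out) / len(out)
out_var = sum((v - out_mean) ** 2 for v in out) / len(out)
print(out_mean, out_var)
```

With \(\gamma=1\) and \(\beta=0\) the output has mean close to 0 and variance close to 1; the variance falls slightly short of 1 because of \(\epsilon\).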

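Building ResNet with PyTorch

Building the residual block

When the shortcut input cannot be added directly to the main-branch output because the channel count or spatial size differs, a downsample branch reshapes it. This branch is a \(1\times1\) convolution (with stride 2 when the spatial size is halved), drawn as the dashed shortcut in the block diagram, and it makes the shortcut output match the main branch so that the two can be added.
ResNets shallower than ResNet50 (ResNet18 and ResNet34) use the BasicBlock.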
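
A quick size check shows why the stride-2 \(1\times1\) shortcut lines up with the main branch. The sketch uses the standard output-size formula for a convolution, floor((n + 2p − d(k−1) − 1) / s) + 1; the 56-pixel input and the `conv_out_size` helper are assumptions made for illustration.

```python
def conv_out_size(size: int, kernel: int, stride: int = 1,
                  padding: int = 0, dilation: int = 1) -> int:
    """Spatial output size of a convolution along one dimension."""
    return (size + 2 * padding - dilation * (kernel - 1) - 1) // stride + 1


# Main branch of a downsampling BasicBlock: 3x3 stride-2 conv, then 3x3 stride-1 conv.
main = conv_out_size(56, kernel=3, stride=2, padding=1)
main = conv_out_size(main, kernel=3, stride=1, padding=1)

# Shortcut branch: 1x1 stride-2 conv without padding.
shortcut = conv_out_size(56, kernel=1, stride=2)

print(main, shortcut)
```

Both branches end at the same spatial size, so the element-wise addition is well defined once the channel counts also agree.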

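from typing import Callable, List, Optional, Type, Union

import torch
import torch.nn as nn
from torch import Tensor


# 3x3 convolution helper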
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """3x3 convolution with padding"""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        groups=groups,
        bias=False,
        dilation=dilation,
    )


# 1x1 convolution helper
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


# Residual block used by ResNet18 and ResNet34
class BasicBlock(nn.Module):
    # Ratio of the block's output channels to `planes`
    expansion: int = 1

    def __init__(
            self,
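            inplanes: int,  # number of input channels
            planes: int,  # number of output channels
            stride: int = 1,  # stride of the first 3x3 convolution
            downsample: Optional[nn.Module] = None,  # applied to a dashed shortcut so its shape matches the main branch
            groups: int = 1,  # number of groups for grouped convolution
            base_width: int = 64,  # base channel width per group
            dilation: int = 1,  # dilation rate: kernel taps are sampled `dilation` pixels apart
            norm_layer: Optional[Callable[..., nn.Module]] = None,  # normalization layer constructor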
    ) -> None:
        super().__init__()
        # Default to batch normalization when no normalization layer is given
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        # BasicBlock supports neither grouped convolution nor a base width other than 64
        if groups != 1 or base_width != 64:
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        # Nor does it support dilation > 1
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        # First 3x3 convolution
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        # Second 3x3 convolution
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        # Downsample module for the dashed shortcut
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out

Building the Bottleneck block

ResNet50 and deeper models use the Bottleneck block, which consists of two \(1\times1\) convolutions, one \(3\times3\) convolution, and a shortcut connection.

class Bottleneck(nn.Module):
    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion: int = 4  # the block outputs planes * 4 channels

    def __init__(
            self,
            inplanes: int,
            planes: int,
            stride: int = 1,
            downsample: Optional[nn.Module] = None,
            groups: int = 1,
            base_width: int = 64,
            dilation: int = 1,
            norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
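        # Normalization layer (BatchNorm2d by default)
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        width = int(planes * (base_width / 64.0)) * groups
        # Both self.conv2 and self.downsample layers downsample the input when stride != 1
        # 1x1 convolution: reduces the channel count to `width`
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        # 3x3 convolution: carries the stride, groups and dilation
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        # 1x1 convolution: expands the channel count to planes * expansion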
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out
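
The expression `width = int(planes * (base_width / 64.0)) * groups` is what lets the same Bottleneck class serve grouped and wide variants. The sketch below evaluates it for three settings. `bottleneck_width` is a hypothetical helper, and the `groups=32, base_width=4` and `base_width=128` settings are the values torchvision uses for ResNeXt-50 32x4d and Wide ResNet-50-2, quoted here as background rather than taken from this article.

```python
def bottleneck_width(planes: int, base_width: int = 64, groups: int = 1) -> int:
    """Channel count of the inner 1x1 -> 3x3 convolutions of a Bottleneck block."""
    return int(planes * (base_width / 64.0)) * groups


default_width = bottleneck_width(64)                            # plain ResNet50 setting
grouped_width = bottleneck_width(64, base_width=4, groups=32)   # grouped-convolution setting
wide_width = bottleneck_width(64, base_width=128)               # doubled-width setting

print(default_width, grouped_width, wide_width)
```

In every case the block still outputs `planes * expansion` channels; only the inner width changes.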

Building the ResNet class

class ResNet(nn.Module):
    def __init__(
            self,
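            block: Type[Union[BasicBlock, Bottleneck]],  # residual block class to use
            layers: List[int],  # number of residual blocks in each of the four stages
            num_classes: int = 1000,  # number of output classes
            zero_init_residual: bool = False,  # zero-initialize the last BN weight of every residual block
            groups: int = 1,  # number of groups for grouped convolution
            width_per_group: int = 64,  # base channel width per group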
            replace_stride_with_dilation: Optional[List[bool]] = None,
            norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
        # Default to batch normalization
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        # conv1 outputs 64 channels, which become the input channels of the first stage
        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                f"or a 3-element tuple, got {replace_stride_with_dilation}"
            )
        # Number of groups for grouped convolution
        self.groups = groups
        # Base width per group, used to compute the inner width of Bottleneck blocks
        self.base_width = width_per_group
        # conv1 in the architecture table
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        # Max pooling layer after conv1
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # conv2_x: planes=64
        self.layer1 = self._make_layer(block, 64, layers[0])
        # conv3_x: planes=128
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        # conv4_x: planes=256
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        # conv5_x: planes=512
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        # Adaptive average pooling after conv5_x, producing a 1x1 output
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # Final fully connected layer; it outputs logits, which softmax turns into probabilities
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # Convolution layers: Kaiming normal initialization of the weights
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                # Normalization layers:
                # weight initialized to 1,
                # bias initialized to 0
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        # In short: the weight of the last BN layer in every residual block starts at 0
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck) and m.bn3.weight is not None:
                    nn.init.constant_(m.bn3.weight, 0)  # type: ignore[arg-type]
                elif isinstance(m, BasicBlock) and m.bn2.weight is not None:
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

    # Build one stage (conv2_x to conv5_x) as a sequence of residual blocks
    def _make_layer(
            self,
            block: Type[Union[BasicBlock, Bottleneck]],
            planes: int,
            blocks: int,
            stride: int = 1,
            dilate: bool = False,
    ) -> nn.Sequential:
        # Normalization layer chosen in __init__
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
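        # If the stride is not 1 (the spatial size changes) or the input and output channel counts differ,
        # the shortcut needs a 1x1 convolution plus a normalization layer to match shapes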
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(
            block(
                self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
            )
        )
        # The output channels of this block become the input channels of the following blocks
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.inplanes,
                    planes,
                    groups=self.groups,
                    base_width=self.base_width,
                    dilation=self.dilation,
                    norm_layer=norm_layer,
                )
            )

        return nn.Sequential(*layers)

    # Forward pass implementation
    def _forward_impl(self, x: Tensor) -> Tensor:
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    # Forward pass
    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)
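
To see what the class above does to a tensor's shape, the following sketch traces the channel count and spatial size through the stem and the four stages using only arithmetic. It is an approximation under stated assumptions: `conv_out_size` and `trace_resnet` are hypothetical helpers, the input is square, and each stage is represented by its first 3x3 convolution (the only layer in a stage that can change the spatial size). Because the blocks use padding equal to dilation, a dilated 3x3 convolution yields the same size as the undilated one, so dilation is modelled simply as stride 1.

```python
from typing import List, Optional, Tuple


def conv_out_size(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Spatial output size of a convolution or pooling layer (dilation 1)."""
    return (size + 2 * padding - (kernel - 1) - 1) // stride + 1


def trace_resnet(size: int, expansion: int,
                 replace_stride_with_dilation: Optional[List[bool]] = None
                 ) -> List[Tuple[str, int, int]]:
    """Return (layer name, channels, spatial size) after the stem and each stage."""
    if replace_stride_with_dilation is None:
        replace_stride_with_dilation = [False, False, False]
    trace = []
    size = conv_out_size(size, kernel=7, stride=2, padding=3)
    trace.append(("conv1", 64, size))
    size = conv_out_size(size, kernel=3, stride=2, padding=1)
    trace.append(("maxpool", 64, size))
    stages = zip([64, 128, 256, 512], [1, 2, 2, 2],
                 [False] + list(replace_stride_with_dilation))
    for index, (planes, stride, dilate) in enumerate(stages, start=1):
        if dilate:
            stride = 1  # the stride is traded for dilation, so the size is kept
        size = conv_out_size(size, kernel=3, stride=stride, padding=1)
        trace.append((f"layer{index}", planes * expansion, size))
    return trace


for row in trace_resnet(224, expansion=1):
    print(row)
```

Passing `expansion=4` gives the channel counts of the Bottleneck variants, and `replace_stride_with_dilation=[False, True, True]` keeps the last two stages at the resolution of `layer2`, which is the setting commonly used for dense prediction.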

Model script file

from typing import Any, Callable, List, Optional, Type, Union

import torch
import torch.nn as nn
from torch import Tensor


# 3x3 convolution helper
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
    """3x3 convolution with padding"""
    return nn.Conv2d(
        in_planes,
        out_planes,
        kernel_size=3,
        stride=stride,
        padding=dilation,
        groups=groups,
        bias=False,
        dilation=dilation,
    )


# 1x1 convolution helper
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)


# Residual block used by ResNet18 and ResNet34
class BasicBlock(nn.Module):
    # Ratio of the block's output channels to `planes`
    expansion: int = 1

    def __init__(
            self,
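            inplanes: int,  # number of input channels
            planes: int,  # number of output channels
            stride: int = 1,  # stride of the first 3x3 convolution
            downsample: Optional[nn.Module] = None,  # applied to a dashed shortcut so its shape matches the main branch
            groups: int = 1,  # number of groups for grouped convolution
            base_width: int = 64,  # base channel width per group
            dilation: int = 1,  # dilation rate: kernel taps are sampled `dilation` pixels apart
            norm_layer: Optional[Callable[..., nn.Module]] = None,  # normalization layer constructor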
    ) -> None:
        super().__init__()
        # Default to batch normalization when no normalization layer is given
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        # BasicBlock supports neither grouped convolution nor a base width other than 64
        if groups != 1 or base_width != 64:
            raise ValueError("BasicBlock only supports groups=1 and base_width=64")
        # Nor does it support dilation > 1
        if dilation > 1:
            raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
        # Both self.conv1 and self.downsample layers downsample the input when stride != 1
        # First 3x3 convolution
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = norm_layer(planes)
        self.relu = nn.ReLU(inplace=True)
        # Second 3x3 convolution
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = norm_layer(planes)
        # Downsample module for the dashed shortcut
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out


class Bottleneck(nn.Module):
    # Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
    # while original implementation places the stride at the first 1x1 convolution(self.conv1)
    # according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
    # This variant is also known as ResNet V1.5 and improves accuracy according to
    # https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.

    expansion: int = 4  # the block outputs planes * 4 channels

    def __init__(
            self,
            inplanes: int,
            planes: int,
            stride: int = 1,
            downsample: Optional[nn.Module] = None,
            groups: int = 1,
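            base_width: int = 64,  # base width per group; with the defaults conv1 outputs `planes` channels
            # Dilation is applied only to the 3x3 convolution (conv2) of Bottleneck;
            # BasicBlock rejects dilation > 1. Because padding == dilation, a stride-1 conv keeps the feature map size
            dilation: int = 1,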
            norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
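        # Normalization layer (BatchNorm2d by default)
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        # Inner width of the bottleneck
        width = int(planes * (base_width / 64.0)) * groups
        # Both self.conv2 and self.downsample layers downsample the input when stride != 1
        # 1x1 convolution: reduces the channel count to `width`
        self.conv1 = conv1x1(inplanes, width)
        self.bn1 = norm_layer(width)
        # 3x3 convolution: carries the stride, groups and dilation
        self.conv2 = conv3x3(width, width, stride, groups, dilation)
        self.bn2 = norm_layer(width)
        # 1x1 convolution: expands the channel count to planes * expansion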
        self.conv3 = conv1x1(width, planes * self.expansion)
        self.bn3 = norm_layer(planes * self.expansion)
        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x: Tensor) -> Tensor:
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        out = self.conv3(out)
        out = self.bn3(out)
        # For a dashed shortcut, apply the 1x1 convolution so the shortcut matches the main branch's shape
        if self.downsample is not None:
            identity = self.downsample(x)

        out += identity
        out = self.relu(out)

        return out


class ResNet(nn.Module):
    def __init__(
            self,
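            block: Type[Union[BasicBlock, Bottleneck]],  # residual block class to use
            layers: List[int],  # number of residual blocks in each of the four stages
            num_classes: int = 1000,  # number of output classes
            zero_init_residual: bool = False,  # zero-initialize the last BN weight of every residual block
            groups: int = 1,  # number of groups for grouped convolution
            width_per_group: int = 64,  # base channel width per group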
            replace_stride_with_dilation: Optional[List[bool]] = None,
            norm_layer: Optional[Callable[..., nn.Module]] = None,
    ) -> None:
        super().__init__()
        # Default to batch normalization
        if norm_layer is None:
            norm_layer = nn.BatchNorm2d
        self._norm_layer = norm_layer

        # conv1 outputs 64 channels, which become the input channels of the first stage
        self.inplanes = 64
        self.dilation = 1
        if replace_stride_with_dilation is None:
            # each element in the tuple indicates if we should replace
            # the 2x2 stride with a dilated convolution instead
            replace_stride_with_dilation = [False, False, False]
        if len(replace_stride_with_dilation) != 3:
            raise ValueError(
                "replace_stride_with_dilation should be None "
                f"or a 3-element tuple, got {replace_stride_with_dilation}"
            )
        # Number of groups for grouped convolution
        self.groups = groups
        # Base width per group, used to compute the inner width of Bottleneck blocks
        self.base_width = width_per_group
        # conv1 in the architecture table
        self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = norm_layer(self.inplanes)
        self.relu = nn.ReLU(inplace=True)
        # Max pooling layer after conv1
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        # conv2_x: planes=64
        self.layer1 = self._make_layer(block, 64, layers[0])
        # conv3_x: planes=128
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
        # conv4_x: planes=256
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
        # conv5_x: planes=512
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
        # Adaptive average pooling after conv5_x, producing a 1x1 output
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        # Final fully connected layer; it outputs logits, which softmax turns into probabilities
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                # Convolution layers: Kaiming normal initialization of the weights
                nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                # Normalization layers:
                # weight initialized to 1,
                # bias initialized to 0
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

        # Zero-initialize the last BN in each residual branch,
        # so that the residual branch starts with zeros, and each residual block behaves like an identity.
        # This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
        # In short: the weight of the last BN layer in every residual block starts at 0
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck) and m.bn3.weight is not None:
                    nn.init.constant_(m.bn3.weight, 0)  # type: ignore[arg-type]
                elif isinstance(m, BasicBlock) and m.bn2.weight is not None:
                    nn.init.constant_(m.bn2.weight, 0)  # type: ignore[arg-type]

    # Build one stage (conv2_x to conv5_x) as a sequence of residual blocks
    def _make_layer(
            self,
            block: Type[Union[BasicBlock, Bottleneck]],
            planes: int,
            blocks: int,
            stride: int = 1,
            dilate: bool = False,
    ) -> nn.Sequential:
        # Normalization layer chosen in __init__
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if dilate:
            self.dilation *= stride
            stride = 1
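        # If the stride is not 1 (the spatial size changes) or the input and output channel counts differ,
        # the shortcut needs a 1x1 convolution plus a normalization layer to match shapes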
        if stride != 1 or self.inplanes != planes * block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * block.expansion, stride),
                norm_layer(planes * block.expansion),
            )

        layers = []
        layers.append(
            block(
                self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
            )
        )
        # The output channels of this block become the input channels of the following blocks
        self.inplanes = planes * block.expansion
        for _ in range(1, blocks):
            layers.append(
                block(
                    self.inplanes,
                    planes,
                    groups=self.groups,
                    base_width=self.base_width,
                    dilation=self.dilation,
                    norm_layer=norm_layer,
                )
            )

        return nn.Sequential(*layers)

    # Forward pass implementation
    def _forward_impl(self, x: Tensor) -> Tensor:
        # See note [TorchScript super()]
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)

        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.fc(x)

        return x

    # Forward pass
    def forward(self, x: Tensor) -> Tensor:
        return self._forward_impl(x)


def _resnet(
        block: Type[Union[BasicBlock, Bottleneck]],
        layers: List[int],
        **kwargs: Any,
) -> ResNet:
    model = ResNet(block, layers, **kwargs)

    return model


def resnet18(**kwargs: Any) -> ResNet:
    return _resnet(BasicBlock, [2, 2, 2, 2], **kwargs)


def resnet34(**kwargs: Any) -> ResNet:
    return _resnet(BasicBlock, [3, 4, 6, 3], **kwargs)


def resnet50(**kwargs: Any) -> ResNet:
    return _resnet(Bottleneck, [3, 4, 6, 3], **kwargs)


def resnet101(**kwargs: Any) -> ResNet:
    return _resnet(Bottleneck, [3, 4, 23, 3], **kwargs)


def resnet152(**kwargs: Any) -> ResNet:
    return _resnet(Bottleneck, [3, 8, 36, 3], **kwargs)
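
The numbers in the model names can be recovered from the block lists above. As a rough sketch, count the stem convolution, the convolutions on the main branch of every block (2 per BasicBlock, 3 per Bottleneck), and the final fully connected layer; the shortcut convolutions are not counted. The `CONFIGS` table and `weighted_depth` helper are hypothetical names used only for this illustration.

```python
from typing import Dict, List, Tuple

# Block type and per-stage block counts, copied from the factory functions.
CONFIGS: Dict[str, Tuple[str, List[int]]] = {
    "resnet18": ("BasicBlock", [2, 2, 2, 2]),
    "resnet34": ("BasicBlock", [3, 4, 6, 3]),
    "resnet50": ("Bottleneck", [3, 4, 6, 3]),
    "resnet101": ("Bottleneck", [3, 4, 23, 3]),
    "resnet152": ("Bottleneck", [3, 8, 36, 3]),
}

# Convolutions on the main branch of each block type.
CONVS_PER_BLOCK = {"BasicBlock": 2, "Bottleneck": 3}


def weighted_depth(block: str, layers: List[int]) -> int:
    """conv1 + main-branch convolutions of all blocks + the final fc layer."""
    return 1 + CONVS_PER_BLOCK[block] * sum(layers) + 1


depths = {name: weighted_depth(block, layers) for name, (block, layers) in CONFIGS.items()}
print(depths)
```

ResNet34 and ResNet50 share the same block counts; the difference in depth comes entirely from the block type.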

Training script file

import os
import sys
import json

import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm

# Import resnet18 from the model script
from model_office import resnet18 as Net


def main():
    # Use the GPU if available, otherwise the CPU
    device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
    print('using {} device.'.format(device))
    # Data augmentation (training) and preprocessing (validation)
    data_transform = {
        'train': transforms.Compose([transforms.RandomResizedCrop(224),
                                     transforms.RandomHorizontalFlip(),
                                     transforms.ToTensor(),
                                     transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
        'val': transforms.Compose([transforms.Resize(256),
                                   transforms.CenterCrop(224),
                                   transforms.ToTensor(),
                                   transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
    }

    # Dataset root directory
    image_path = r"D:\卷积神经网络PPT\AlexNet\Alex_Torch\data"
    # Training set
    train_dataset = datasets.ImageFolder(root=os.path.join(image_path, 'train'),
                                         transform=data_transform['train'])
    # Number of training images
    train_num = len(train_dataset)

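    # Get the class-name-to-index mapping that ImageFolder derives from the class folders,
    # invert it, and write the index-to-class mapping to a JSON file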
    flower_list = train_dataset.class_to_idx
    cla_dict = dict((val, key) for key, val in flower_list.items())

    json_str = json.dumps(cla_dict, indent=4)
    with open('./class_indices.json', 'w') as json_file:
        json_file.write(json_str)
    # Batch size
    batch_size = 32
    # Number of worker processes used for data loading
    nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])
    print('using {} dataloader workers every process'.format(nw))
    # Training data loader
    train_loader = torch.utils.data.DataLoader(train_dataset,
                                               batch_size=batch_size,
                                               shuffle=True,
                                               num_workers=nw,
                                               pin_memory=True)
    # Validation set
    valid_dataset = datasets.ImageFolder(root=os.path.join(image_path, 'val'),
                                         transform=data_transform['val'])
    # Number of validation images
    val_num = len(valid_dataset)
    # Validation data loader
    valid_loader = torch.utils.data.DataLoader(valid_dataset,
                                               batch_size=batch_size,
                                               shuffle=False,
                                               num_workers=nw,
                                               pin_memory=True)
    print('using {} images for training, {} images for validation'.format(train_num, val_num))
    # Instantiate resnet18 with 5 output classes
    net = Net(num_classes=5).to(device)

    loss_function = nn.CrossEntropyLoss().to(device)

    params = [p for p in net.parameters() if p.requires_grad]
    optimizer = optim.AdamW(params, lr=1e-4, weight_decay=5e-2)

    epochs = 30
    best_acc = 0.0
    save_path = './resNet34.pth'
    for epoch in range(epochs):
        # train
        net.train()
        running_loss = 0.0
        train_bar = tqdm(train_loader, file=sys.stdout)
        for step, data in enumerate(train_bar):
            images, labels = data
            optimizer.zero_grad(set_to_none=True)
            logits = net(images.to(device))
            loss = loss_function(logits, labels.to(device))
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            train_bar.desc = 'train epoch[{}/{}] loss:{:.3f}'.format(epoch, epochs, loss.item())

        # validate
        net.eval()
        acc = 0.0
        sample_num = 0
        with torch.no_grad():
            val_bar = tqdm(valid_loader, file=sys.stdout)
            for val_data in val_bar:
                val_images, val_labels = val_data
                sample_num += val_images.shape[0]
                outputs = net(val_images.to(device))
                # loss=loss_function(outputs,labels.to(device))
                predict = torch.argmax(outputs, dim=1)
                acc += torch.eq(predict, val_labels.to(device)).sum().item()

                val_bar.desc = 'valid epoch[{}/{}] accuracy: {:.4f}'.format(epoch, epochs, acc / sample_num)
        if acc / sample_num > best_acc:
            best_acc = acc / sample_num
            torch.save(net.state_dict(), save_path)
    print('Finished Training')


if __name__ == '__main__':
    main()
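
One detail of the training script is easy to miss: the class mapping is inverted before it is written, and JSON turns the integer keys into strings. The sketch below reproduces that round trip in memory. The class names are hypothetical stand-ins for whatever folders `ImageFolder` finds.

```python
import json

# Hypothetical mapping of the kind ImageFolder exposes as `class_to_idx`.
class_to_idx = {"daisy": 0, "roses": 1, "tulips": 2}

# Invert it so that a predicted index can be turned back into a class name.
idx_to_class = {idx: name for name, idx in class_to_idx.items()}

# JSON object keys are always strings, so the integer keys are converted on dump.
json_str = json.dumps(idx_to_class, indent=4)
loaded = json.loads(json_str)

print(loaded)
```

This is why the prediction script looks classes up with `class_indict[str(i)]` rather than with the integer index.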

Prediction script file

import os
import json

import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt

from model_office import resnet18

def main():
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    # Preprocessing, identical to the validation transform
    data_transform = transforms.Compose(
        [transforms.Resize(256),
         transforms.CenterCrop(224),
         transforms.ToTensor(),
         transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])

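    # Load the image
    img_path = "./img.png"
    assert os.path.exists(img_path), "file: '{}' does not exist.".format(img_path)
    # Convert to RGB so that PNGs with an alpha channel still yield 3 channels
    img = Image.open(img_path).convert("RGB")
    plt.imshow(img)
    # Preprocess the image
    img = data_transform(img)
    # Add a batch dimension
    img = torch.unsqueeze(img, dim=0)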

    # Load the index-to-class dictionary
    json_path = './class_indices.json'
    assert os.path.exists(json_path), "file: '{}' does not exist.".format(json_path)

    with open(json_path, "r") as f:
        class_indict = json.load(f)

    # Create the model
    model = resnet18(num_classes=5).to(device)

    # Load the trained weights
    weights_path = "./resNet34.pth"
    assert os.path.exists(weights_path), "file: '{}' does not exist.".format(weights_path)
    model.load_state_dict(torch.load(weights_path, map_location=device))

    # Run prediction on the image
    model.eval()
    with torch.no_grad():
        # predict class
        output = torch.squeeze(model(img.to(device))).cpu()
        predict = torch.softmax(output, dim=0)
        predict_cla = torch.argmax(predict).numpy()

    print_res = "class: {}   prob: {:.4}".format(class_indict[str(predict_cla)],
                                                 predict[predict_cla].numpy())
    plt.title(print_res)
    for i in range(len(predict)):
        print("class: {:10}   prob: {:.4}".format(class_indict[str(i)],
                                                  predict[i].numpy()))
    plt.show()


if __name__ == '__main__':
    main()
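
The last step of the prediction script turns the network's raw outputs (logits) into probabilities with softmax and picks the largest one. A minimal plain-Python sketch of that step follows; the logits are made-up values, and `softmax` here is a hand-written stand-in for `torch.softmax`.

```python
import math
from typing import List


def softmax(logits: List[float]) -> List[float]:
    """Numerically stable softmax: subtract the maximum before exponentiating."""
    peak = max(logits)
    exps = [math.exp(value - peak) for value in logits]
    total = sum(exps)
    return [e / total for e in exps]


logits = [2.0, 0.5, -1.0]  # made-up network outputs for three classes
probs = softmax(logits)
predicted = max(range(len(probs)), key=probs.__getitem__)  # argmax

print(probs, predicted)
```

Softmax preserves the ordering of the logits, so the argmax of the probabilities is the same as the argmax of the logits; the softmax is only needed to report a probability.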


Building ResNet with TensorFlow

Building the residual block

class BasicBlock(layers.Layer):
    expansion = 1

    def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__(**kwargs)
        self.conv1 = layers.Conv2D(out_channel,
                                   kernel_size=3,
                                   strides=strides,
                                   padding='same',
                                   use_bias=False)
        self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.conv2 = layers.Conv2D(out_channel,
                                   kernel_size=3,
                                   strides=1,
                                   padding='same',
                                   use_bias=False)
        self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.downsample = downsample
        self.relu = layers.ReLU()
        self.add = layers.Add()

    def call(self, inputs, training=False):
        identity = inputs

        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        if self.downsample is not None:
            identity = self.downsample(inputs)

        x = self.add([identity, x])
        x = self.relu(x)

        return x

Building the Bottleneck block

class Bottleneck(layers.Layer):
    expansion = 4

    def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
        super(Bottleneck, self).__init__(**kwargs)
        self.conv1 = layers.Conv2D(out_channel, kernel_size=1, use_bias=False, name='conv1')
        self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')

        self.conv2 = layers.Conv2D(out_channel, kernel_size=3, use_bias=False, strides=strides,
                                   padding='same', name='conv2')
        self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv2/BatchNorm')

        self.conv3 = layers.Conv2D(out_channel * self.expansion, kernel_size=1, use_bias=False, name='conv3')
        self.bn3 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv3/BatchNorm')

        self.relu = layers.ReLU()
        self.downsample = downsample
        self.add = layers.Add()

    def call(self, inputs, training=False):
        identity = inputs
        if self.downsample is not None:
            identity = self.downsample(inputs)
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn3(x, training=training)

        x = self.add([x, identity])
        x = self.relu(x)

        return x


def _make_layer(block, in_channel, channel, block_num, name, strides=1):
    downsample = None
    if strides != 1 or in_channel != channel * block.expansion:
        downsample = Sequential([
            layers.Conv2D(channel * block.expansion, kernel_size=1, strides=strides,
                          use_bias=False, name='conv1'),
            layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='BatchNorm')
        ], name='shortcut')

    layers_list = []
    layers_list.append(block(channel, downsample=downsample, strides=strides, name='unit_1'))
    for index in range(1, block_num):
        layers_list.append(block(channel, name='unit_' + str(index + 1)))

    return Sequential(layers_list, name=name)
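
To make the shortcut condition in `_make_layer` concrete, the following is a minimal pure-Python sketch that evaluates the same test for each stage. It assumes the usual four-stage layout (widths 64/128/256/512, 64 channels leaving the stem, stride 2 from the second stage on); the helper name `stage_plan` is hypothetical and not part of the original script.

```python
def stage_plan(expansion, widths=(64, 128, 256, 512), stem_channels=64):
    """Return (in_channel, out_channel, strides, needs_shortcut) for each stage."""
    plan = []
    in_channel = stem_channels  # assumed number of channels leaving the stem
    for i, channel in enumerate(widths):
        strides = 1 if i == 0 else 2  # assumed: only stages 2..4 halve the resolution
        out_channel = channel * expansion
        # same condition as in _make_layer
        needs_shortcut = strides != 1 or in_channel != out_channel
        plan.append((in_channel, out_channel, strides, needs_shortcut))
        in_channel = out_channel  # the next stage sees the expanded channel count
    return plan


basic_plan = stage_plan(expansion=1)       # BasicBlock
bottleneck_plan = stage_plan(expansion=4)  # Bottleneck
for block_name, plan in (("BasicBlock", basic_plan), ("Bottleneck", bottleneck_plan)):
    for stage, row in enumerate(plan, start=1):
        print(block_name, "block%d" % stage, row)
```

Under these assumptions the first stage of a BasicBlock network needs no projection (64 -> 64, stride 1), while the first stage of a Bottleneck network does, because the channel count grows from 64 to 256 even though the stride is 1.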

Building the model with the functional API

def _resnet(block, blocks_num, im_width=224, im_height=224, num_classes=1000, include_top=True):
    # Keras expects (height, width, channels) for channels-last inputs
    input_image = layers.Input(shape=(im_height, im_width, 3), dtype='float32')
    x = layers.Conv2D(filters=64, kernel_size=7, strides=2,
                      padding='same', use_bias=False, name='conv1')(input_image)
    x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')(x)
    x = layers.ReLU()(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)

    x = _make_layer(block, x.shape[-1], 64, blocks_num[0], name="block1")(x)
    x = _make_layer(block, x.shape[-1], 128, blocks_num[1], strides=2, name="block2")(x)
    x = _make_layer(block, x.shape[-1], 256, blocks_num[2], strides=2, name="block3")(x)
    x = _make_layer(block, x.shape[-1], 512, blocks_num[3], strides=2, name="block4")(x)

    if include_top:
        x = layers.GlobalAvgPool2D()(x)
        x = layers.Dense(num_classes, name='logits')(x)
        predict = layers.Softmax()(x)
    else:
        predict = x
    model = Model(inputs=input_image, outputs=predict)

    return model
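
As a quick check on the strides used in `_resnet`, the sketch below traces one spatial dimension through the network. It assumes that a layer with `padding='same'` and stride s maps a length n to ceil(n / s); the helper names `same_padding_out` and `feature_map_sizes` are hypothetical.

```python
import math


def same_padding_out(size, stride):
    """Output length of a 'same'-padded conv/pool layer: ceil(size / stride)."""
    return math.ceil(size / stride)


def feature_map_sizes(im_size=224):
    """Trace one spatial dimension through the stem and the four stages."""
    sizes = {}
    size = im_size
    # (layer name, stride) in the order they are applied in _resnet
    for name, stride in [("conv1", 2), ("maxpool", 2), ("block1", 1),
                         ("block2", 2), ("block3", 2), ("block4", 2)]:
        size = same_padding_out(size, stride)
        sizes[name] = size
    return sizes


sizes = feature_map_sizes(224)
print(sizes)
```

For a 224x224 input this gives a 7x7 feature map after `block4`, which is what the global average pooling layer then collapses to a single vector per image.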

Model script

# import everything from tensorflow.keras so the standalone keras package is not mixed in
from tensorflow.keras import layers, Model, Sequential


class BasicBlock(layers.Layer):
    expansion = 1

    def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
        super(BasicBlock, self).__init__(**kwargs)
        self.conv1 = layers.Conv2D(out_channel,
                                   kernel_size=3,
                                   strides=strides,
                                   padding='same',
                                   use_bias=False)
        self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.conv2 = layers.Conv2D(out_channel,
                                   kernel_size=3,
                                   strides=1,
                                   padding='same',
                                   use_bias=False)
        self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)

        self.downsample = downsample
        self.relu = layers.ReLU()
        self.add = layers.Add()

    def call(self, inputs, training=False):
        identity = inputs

        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        if self.downsample is not None:
            identity = self.downsample(inputs)

        x = self.add([identity, x])
        x = self.relu(x)

        return x


class Bottleneck(layers.Layer):
    expansion = 4

    def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
        super(Bottleneck, self).__init__(**kwargs)
        self.conv1 = layers.Conv2D(out_channel, kernel_size=1, use_bias=False, name='conv1')
        self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')

        self.conv2 = layers.Conv2D(out_channel, kernel_size=3, use_bias=False, strides=strides,
                                   padding='same', name='conv2')
        self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv2/BatchNorm')

        self.conv3 = layers.Conv2D(out_channel * self.expansion, kernel_size=1, use_bias=False, name='conv3')
        self.bn3 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv3/BatchNorm')

        self.relu = layers.ReLU()
        self.downsample = downsample
        self.add = layers.Add()

    def call(self, inputs, training=False):
        identity = inputs
        if self.downsample is not None:
            identity = self.downsample(inputs)
        x = self.conv1(inputs)
        x = self.bn1(x, training=training)
        x = self.relu(x)

        x = self.conv2(x)
        x = self.bn2(x, training=training)
        x = self.relu(x)

        x = self.conv3(x)
        x = self.bn3(x, training=training)

        x = self.add([x, identity])
        x = self.relu(x)

        return x


def _make_layer(block, in_channel, channel, block_num, name, strides=1):
    downsample = None
    if strides != 1 or in_channel != channel * block.expansion:
        downsample = Sequential([
            layers.Conv2D(channel * block.expansion, kernel_size=1, strides=strides,
                          use_bias=False, name='conv1'),
            layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='BatchNorm')
        ], name='shortcut')

    layers_list = []
    layers_list.append(block(channel, downsample=downsample, strides=strides, name='unit_1'))
    for index in range(1, block_num):
        layers_list.append(block(channel, name='unit_' + str(index + 1)))

    return Sequential(layers_list, name=name)


def _resnet(block, blocks_num, im_width=224, im_height=224, num_classes=1000, include_top=True):
    # Keras expects (height, width, channels) for channels-last inputs
    input_image = layers.Input(shape=(im_height, im_width, 3), dtype='float32')
    x = layers.Conv2D(filters=64, kernel_size=7, strides=2,
                      padding='same', use_bias=False, name='conv1')(input_image)
    x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')(x)
    x = layers.ReLU()(x)
    x = layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)

    x = _make_layer(block, x.shape[-1], 64, blocks_num[0], name="block1")(x)
    x = _make_layer(block, x.shape[-1], 128, blocks_num[1], strides=2, name="block2")(x)
    x = _make_layer(block, x.shape[-1], 256, blocks_num[2], strides=2, name="block3")(x)
    x = _make_layer(block, x.shape[-1], 512, blocks_num[3], strides=2, name="block4")(x)

    if include_top:
        x = layers.GlobalAvgPool2D()(x)
        x = layers.Dense(num_classes, name='logits')(x)
        predict = layers.Softmax()(x)
    else:
        predict = x
    model = Model(inputs=input_image, outputs=predict)

    return model


def resnet18(im_width=224, im_height=224, num_classes=1000, include_top=True):
    return _resnet(BasicBlock, [2, 2, 2, 2], im_width, im_height, num_classes, include_top)


def resnet34(im_width=224, im_height=224, num_classes=1000, include_top=True):
    return _resnet(BasicBlock, [3, 4, 6, 3], im_width, im_height, num_classes, include_top)


def resnet50(im_width=224, im_height=224, num_classes=1000, include_top=True):
    return _resnet(Bottleneck, [3, 4, 6, 3], im_width, im_height, num_classes, include_top)


def resnet101(im_width=224, im_height=224, num_classes=1000, include_top=True):
    return _resnet(Bottleneck, [3, 4, 23, 3], im_width, im_height, num_classes, include_top)
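
The numbers in the function names can be recovered from the block lists. The sketch below assumes the usual counting convention (the stem convolution, the convolutions on the main path of every block, and the final Dense layer; shortcut convolutions are not counted); the helper name `resnet_depth` is hypothetical.

```python
def resnet_depth(convs_per_block, blocks_num):
    """Count weighted layers: stem conv + main-path convs of all blocks + Dense."""
    return 1 + convs_per_block * sum(blocks_num) + 1


# (convolutions per block, blocks per stage): BasicBlock has 2 convs, Bottleneck has 3
configs = {
    "resnet18": (2, [2, 2, 2, 2]),
    "resnet34": (2, [3, 4, 6, 3]),
    "resnet50": (3, [3, 4, 6, 3]),
    "resnet101": (3, [3, 4, 23, 3]),
}
depths = {name: resnet_depth(convs, blocks) for name, (convs, blocks) in configs.items()}
print(depths)
```

Note that `resnet34` and `resnet50` share the same block list and differ only in the block type.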

Training script

import glob
import json
import os
import random
import sys

import tensorflow as tf
from tqdm import tqdm

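# train the same architecture that the prediction script builds (resnet34),
# otherwise the saved weights cannot be loaded there
from model import resnet34 as Net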

os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '0'


def main():
    gpus = tf.config.list_physical_devices('GPU')
    if gpus:
        try:
            for gpu in gpus:
                tf.config.experimental.set_memory_growth(gpu, True)
        except RuntimeError as e:
            print(e)
            sys.exit(-1)
    image_path = r"D:\卷积神经网络PPT\AlexNet\Alex_Torch\data"

    train_dir = os.path.join(image_path, 'train')
    valid_dir = os.path.join(image_path, 'val')

    os.makedirs('weights', exist_ok=True)

    im_height = 224
    im_width = 224

    batch_size = 32
    epochs = 30

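    # one sub-directory per class; sort so the class indices are reproducible
    data_class = sorted(cla for cla in os.listdir(train_dir)
                        if os.path.isdir(os.path.join(train_dir, cla)))
    class_num = len(data_class)
    # class name -> index, used to label the images
    cla_dict = dict((val, key) for key, val in enumerate(data_class))

    # index -> class name, written to disk for the prediction script
    inverse_dict = dict((val, key) for key, val in cla_dict.items())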

    json_str = json.dumps(inverse_dict, indent=4)
    with open('class_indices.json', 'w') as json_file:
        json_file.write(json_str)
    # build the list of training images
    random.seed(42)
    train_image_list = glob.glob(train_dir + '/*/*.jpg')
    random.shuffle(train_image_list)
    train_num = len(train_image_list)
    train_label_list = [cla_dict[path.split(os.path.sep)[-2]] for path in train_image_list]

    # build the list of validation images
    valid_image_list = glob.glob(valid_dir + '/*/*.jpg')
    valid_num = len(valid_image_list)
    valid_label_list = [cla_dict[path.split(os.path.sep)[-2]] for path in valid_image_list]

    print('using {} images for training, {} images for validation.'.format(train_num, valid_num))

    @tf.function
    def process_train_img(img_path, label):
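        # convert the integer label to a one-hot vector
        label = tf.one_hot(label, depth=class_num)
        image = tf.io.read_file(img_path)
        # force 3 channels so grayscale JPEGs still match the model input
        image = tf.image.decode_jpeg(image, channels=3)
        # image = tf.image.convert_image_dtype(image, tf.float32)
        image = tf.cast(image, tf.float32)
        image = tf.image.resize(image, [im_height, im_width])
        image = tf.image.random_flip_left_right(image)
        # NOTE: pixels are still in [0, 255] here (no division by 255);
        # the prediction script applies the same transform
        image = (image - 0.5) / 0.5
        # image = image - [_R_MEAN, _G_MEAN, _B_MEAN]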
        return image, label

    @tf.function
    def process_valid_img(img_path, label):
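        label = tf.one_hot(label, depth=class_num)
        image = tf.io.read_file(img_path)
        # force 3 channels so grayscale JPEGs still match the model input
        image = tf.image.decode_jpeg(image, channels=3)
        image = tf.cast(image, tf.float32)
        image = tf.image.resize(image, [im_height, im_width])
        # same normalization as the training pipeline, without augmentation
        image = (image - 0.5) / 0.5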

        return image, label

    AUTOTUNE = tf.data.AUTOTUNE

    # load train dataset
    train_ds = tf.data.Dataset.from_tensor_slices((train_image_list, train_label_list))
    train_ds = train_ds.shuffle(buffer_size=train_num) \
        .map(process_train_img, num_parallel_calls=AUTOTUNE).batch(batch_size) \
        .prefetch(AUTOTUNE)

    # load valid dataset
    valid_ds = tf.data.Dataset.from_tensor_slices((valid_image_list, valid_label_list))
    valid_ds = valid_ds.map(process_valid_img, num_parallel_calls=AUTOTUNE).batch(batch_size)

    # instantiate the model; the output size follows the number of class folders
    model = Net(num_classes=class_num, include_top=True)
    model.summary()

    # using keras low level api for training
    loss_object = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
    optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)

    train_loss = tf.keras.metrics.Mean(name='train_loss')
    train_acc = tf.keras.metrics.CategoricalAccuracy(name='train_acc')

    valid_loss = tf.keras.metrics.Mean(name='valid_loss')
    valid_acc = tf.keras.metrics.CategoricalAccuracy(name='valid_acc')

    @tf.function
    def train_step(images, labels):
        with tf.GradientTape() as tape:
            outputs = model(images, training=True)
            loss = loss_object(labels, outputs)
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))

        train_loss(loss)
        train_acc(labels, outputs)

    @tf.function
    def valid_step(images, labels):
        outputs = model(images, training=False)
        v_loss = loss_object(labels, outputs)

        valid_loss(v_loss)
        valid_acc(labels, outputs)

    best_valid_loss = float('inf')
    for epoch in range(1, epochs + 1):
        train_loss.reset_states()
        train_acc.reset_states()
        valid_loss.reset_states()
        valid_acc.reset_states()

        train_bar = tqdm(train_ds, file=sys.stdout)
        for step, (images, labels) in enumerate(train_bar):
            train_step(images, labels)

            # print train process
            train_bar.desc = 'train epoch[{}/{}] loss:{:.4f}, acc:{:.4f}'.format(epoch, epochs, train_loss.result(),
                                                                                 train_acc.result())
        # validate
        val_bar = tqdm(valid_ds, file=sys.stdout)
        for images, labels in val_bar:
            valid_step(images, labels)

            # print val process
            val_bar.desc = 'valid epoch[{}/{}] loss:{:.4f}, acc:{:.4f}'.format(epoch, epochs, valid_loss.result(),
                                                                               valid_acc.result())
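        # keep only the checkpoint with the lowest validation loss so far
        if valid_loss.result() < best_valid_loss:
            best_valid_loss = valid_loss.result()
            model.save_weights('./weights/myResNet.ckpt', save_format='tf')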


if __name__ == '__main__':
    main()
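
One detail of the class-index file is easy to miss: JSON object keys are always strings, so the integer indices written by the training script come back as `"0"`, `"1"`, ... when the file is read. The following is a minimal sketch of that round trip; the class names are hypothetical placeholders, not the folders of the actual dataset.

```python
import json

data_class = ["daisy", "roses", "tulips"]  # hypothetical class folder names
# class name -> index, then index -> class name, as in the training script
cla_dict = {name: idx for idx, name in enumerate(data_class)}
inverse_dict = {idx: name for name, idx in cla_dict.items()}

json_str = json.dumps(inverse_dict, indent=4)
loaded = json.loads(json_str)  # the integer keys come back as strings

predict_class = 1  # stands in for the argmax of a model output
print(loaded)
print(loaded[str(predict_class)])
```

This is why any code that reads `class_indices.json` has to index the mapping with `str(index)` rather than with the integer itself.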

Prediction script

import json

import numpy as np

from PIL import Image
import tensorflow as tf
import matplotlib.pyplot as plt
from model import resnet34


def main():
    im_height = 224
    im_width = 224
    num_classes = 5

    # load image
    img_path = r'./img.png'
    img = Image.open(img_path).convert('RGB')

    # resize image to 224X224
    img = img.resize((im_width, im_height))
    plt.imshow(img)

    # same normalization as in the training script; note the pixels are NOT
    # rescaled to (0-1) first, they stay in the 0-255 range
    img = np.array(img).astype(np.float32)
    img = (img - 0.5) / 0.5

    # Add the image to a batch where it's the only member
    img = np.expand_dims(img, 0)

    # read the index -> class name mapping written by the training script
    json_path = './class_indices.json'

    with open(json_path, 'r') as f:
        cla_dict = json.load(f)

    # create model
    model = resnet34(num_classes=num_classes, include_top=True)

    # load weights
    weights_path = './weights/myResNet.ckpt'
    model.load_weights(weights_path)

    # prediction
    result = np.squeeze(model.predict(img))
    predict_class = np.argmax(result)

    print_res = "class: {}   prob: {:.3}".format(cla_dict[str(predict_class)],
                                                 result[predict_class])
    plt.title(print_res)
    for i in range(len(result)):
        print("class: {:10}   prob: {:.3}".format(cla_dict[str(i)],
                                                  result[i]))
    plt.show()


if __name__ == '__main__':
    main()
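
A note on the normalization `(img - 0.5) / 0.5` used in both scripts: it is applied to pixel values that are still in the 0-255 range. The sketch below compares the resulting range with the one obtained when pixels are first divided by 255. The `rescale` option is a hypothetical alternative, not what the scripts above do; if it were adopted, it would have to be applied identically in training and prediction.

```python
def normalize(pixel, rescale=False):
    """Apply (x - 0.5) / 0.5, optionally after mapping 0-255 to 0-1."""
    if rescale:
        pixel = pixel / 255.0  # hypothetical extra step, not in the original scripts
    return (pixel - 0.5) / 0.5


# range produced by the scripts as written (no rescaling)
raw_range = (normalize(0), normalize(255))
# range produced when pixels are rescaled to 0-1 first
scaled_range = (normalize(0, rescale=True), normalize(255, rescale=True))
print(raw_range, scaled_range)
```

Without rescaling the inputs span -1 to 509, and with rescaling they span -1 to 1. Either choice can work as long as training and prediction agree, since the first BatchNormalization layer re-centres the activations.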

image.png

From: https://www.cnblogs.com/Reion/p/16634329.html
