ResNet in Detail
ResNet was proposed by Microsoft Research in 2015. It took first place in both the classification and detection tasks of that year's ImageNet competition, as well as first place in detection and segmentation on the COCO dataset.
Highlights of the model:
- A very deep network structure
- The Residual module
- Batch Normalization to accelerate training (replacing Dropout)
Together, these address the two problems that previously made very deep plain networks hard to train:
- Vanishing or exploding gradients
- The degradation problem (accuracy saturates and then drops as plain networks grow deeper)
Residual architecture
In the bottleneck block, the \(1\times1\) convolutions are used to reduce and then restore the channel dimension, which keeps the \(3\times3\) convolution in between cheap.
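A rough parameter count (a back-of-the-envelope sketch; the channel numbers are illustrative and BN parameters are ignored) shows why this saves computation:

# two plain 3x3 convolutions at 256 channels vs. a 1x1 -> 3x3 -> 1x1 bottleneck
plain = 3 * 3 * 256 * 256 * 2                                        # ~1.18M weights
bottleneck = 1 * 1 * 256 * 64 + 3 * 3 * 64 * 64 + 1 * 1 * 64 * 256  # ~70K weights
print(f"plain: {plain:,}  bottleneck: {bottleneck:,}")  # plain: 1,179,648  bottleneck: 69,632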
Batch Normalization
The goal of Batch Normalization is to make the feature maps of a batch of data follow a distribution with mean 0 and variance 1.
- \(\mu\) and \(\sigma^2\) are statistics computed during the forward pass
- \(\gamma\) and \(\beta\) are parameters learned during backpropagation
Input: values of \(x\) over a mini-batch: \(\mathcal{B}=\{x_{1\dots m}\}\)
Parameters to be learned: \(\gamma, \beta\)
$$\begin{align}
\mu_{\mathcal{B}} &\leftarrow \frac{1}{m}\sum_{i=1}^{m} x_i &&\text{// mini-batch mean}\\
\sigma_{\mathcal{B}}^{2} &\leftarrow \frac{1}{m}\sum_{i=1}^{m}\left(x_i-\mu_{\mathcal{B}}\right)^{2} &&\text{// mini-batch variance}\\
\widehat{x}_i &\leftarrow \frac{x_i-\mu_{\mathcal{B}}}{\sqrt{\sigma_{\mathcal{B}}^{2}+\epsilon}} &&\text{// normalize}\\
y_i &\leftarrow \gamma\,\widehat{x}_i+\beta = \mathrm{BN}_{\gamma,\beta}(x_i) &&\text{// scale and shift}
\end{align}$$
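A minimal NumPy sketch of this training-time forward pass (assumed shapes: a batch of m feature vectors; the real nn.BatchNorm2d additionally keeps running statistics for inference):

import numpy as np

def batch_norm_train(x, gamma, beta, eps=1e-5):
    # x: (m, num_features); gamma, beta: (num_features,)
    mu = x.mean(axis=0)                    # mini-batch mean
    var = x.var(axis=0)                    # mini-batch variance
    x_hat = (x - mu) / np.sqrt(var + eps)  # normalize
    return gamma * x_hat + beta            # scale and shift

x = np.random.randn(32, 64)
y = batch_norm_train(x, np.ones(64), np.zeros(64))
print(y.mean(axis=0)[:3], y.std(axis=0)[:3])  # per-feature mean ~0, std ~1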
Building a ResNet model with PyTorch
Building the Residual block
When a block's output channels or spatial size differ from its input, the shortcut cannot be added directly. A downsample branch, i.e. a \(1\times1\) convolution with stride 2 (the dashed branch in the architecture figure), reshapes the input so that it can be added to the main branch.
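A quick shape check of this stride-2 \(1\times1\) shortcut (a sketch; the tensor sizes are illustrative):

import torch
import torch.nn as nn

x = torch.randn(1, 64, 56, 56)  # input to a stage that halves the resolution
downsample = nn.Sequential(
    nn.Conv2d(64, 128, kernel_size=1, stride=2, bias=False),
    nn.BatchNorm2d(128),
)
print(downsample(x).shape)  # torch.Size([1, 128, 28, 28]), matching the main branch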
Networks below ResNet-50 (ResNet-18 and ResNet-34) use the BasicBlock.
# 3x3 convolution block
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
"""3x3 convolution with padding"""
return nn.Conv2d(
in_planes,
out_planes,
kernel_size=3,
stride=stride,
padding=dilation,
groups=groups,
bias=False,
dilation=dilation,
)
# 1x1 convolution block
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
# residual block used by ResNet-18 and ResNet-34
class BasicBlock(nn.Module):
# ratio of the block's output channels to its base channels
expansion: int = 1
def __init__(
self,
inplanes: int, # number of input channels
planes: int, # base number of output channels
stride: int = 1, # stride
downsample: Optional[nn.Module] = None, # reshapes the dashed shortcut so it matches the main branch output
groups: int = 1, # number of groups for grouped convolution
base_width: int = 64, # base channel width
dilation: int = 1, # sample every `dilation` positions instead of consecutive ones (dilated convolution)
norm_layer: Optional[Callable[..., nn.Module]] = None, # normalization layer
) -> None:
super().__init__()
# default to batch normalization when no normalization layer is given
if norm_layer is None:
norm_layer = nn.BatchNorm2d
# BasicBlock supports neither grouped convolution nor a base width other than 64
if groups != 1 or base_width != 64:
raise ValueError("BasicBlock only supports groups=1 and base_width=64")
# nor does it support dilation > 1
if dilation > 1:
raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
# Both self.conv1 and self.downsample layers downsample the input when stride != 1
# first 3x3 convolution
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = norm_layer(planes)
self.relu = nn.ReLU(inplace=True)
# second 3x3 convolution
self.conv2 = conv3x3(planes, planes)
self.bn2 = norm_layer(planes)
# downsample layer for the dashed shortcut
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
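A quick usage sketch for the block above (illustrative shapes; it assumes the imports listed in the full model script later in this post):

import torch

block = BasicBlock(inplanes=64, planes=64)  # identity shortcut, stride 1
y = block(torch.randn(1, 64, 56, 56))
print(y.shape)  # torch.Size([1, 64, 56, 56])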
Building the Bottleneck block
ResNet-50 and deeper use the Bottleneck residual block, which contains two \(1\times1\) convolutions, one \(3\times3\) convolution, and a shortcut branch.
class Bottleneck(nn.Module):
# Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
# while original implementation places the stride at the first 1x1 convolution(self.conv1)
# according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
# This variant is also known as ResNet V1.5 and improves accuracy according to
# https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.
expansion: int = 4 # i.e. the output channels are 4 times the base channels
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64,
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
# normalization layer
if norm_layer is None:
norm_layer = nn.BatchNorm2d
width = int(planes * (base_width / 64.0)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
# 1x1 convolution
self.conv1 = conv1x1(inplanes, width)
self.bn1 = norm_layer(width)
# 3x3 convolution
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.bn2 = norm_layer(width)
# 1x1 convolution
self.conv3 = conv1x1(width, planes * self.expansion)
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
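A usage sketch for the Bottleneck (illustrative shapes; note that the shortcut must produce planes * expansion channels, so a downsample branch is required here even at stride 1):

import torch
import torch.nn as nn

downsample = nn.Sequential(conv1x1(64, 256), nn.BatchNorm2d(256))
block = Bottleneck(inplanes=64, planes=64, downsample=downsample)
y = block(torch.randn(1, 64, 56, 56))
print(y.shape)  # torch.Size([1, 256, 56, 56])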
Building the ResNet class
class ResNet(nn.Module):
def __init__(
self,
block: Type[Union[BasicBlock, Bottleneck]], # residual block type to use
layers: List[int], # list of ints: the number of residual blocks in each stage
num_classes: int = 1000, # number of classification categories
zero_init_residual: bool = False, # zero-init the last BN of each residual branch
groups: int = 1, # grouped convolution
width_per_group: int = 64, # base channel width per group
replace_stride_with_dilation: Optional[List[bool]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
# set up the normalization layer
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
# conv1 outputs 64 channels by default
self.inplanes = 64
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
replace_stride_with_dilation = [False, False, False]
if len(replace_stride_with_dilation) != 3:
raise ValueError(
"replace_stride_with_dilation should be None "
f"or a 3-element tuple, got {replace_stride_with_dilation}"
)
# number of groups for grouped convolution
self.groups = groups
# output channels after the first convolution inside each block
self.base_width = width_per_group
# corresponds to conv1 in the architecture table
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = norm_layer(self.inplanes)
self.relu = nn.ReLU(inplace=True)
# max-pooling layer after conv1
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
# corresponds to conv2_x, base channel count 64
self.layer1 = self._make_layer(block, 64, layers[0])
# corresponds to conv3_x, base channel count 128
self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
# corresponds to conv4_x, base channel count 256
self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
# corresponds to conv5_x, base channel count 512
self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
# adaptive average pooling after conv5_x, producing a 1x1 output
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
# final fully connected layer; combined with softmax it gives a probability distribution
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
# initialize convolution weights with kaiming_normal_
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
# for normalization layers, initialize the weight to 1 and the bias to 0
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
# zero-initializing the last BN of each residual block improves accuracy by 0.2~0.3%
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck) and m.bn3.weight is not None:
nn.init.constant_(m.bn3.weight, 0) # type: ignore[arg-type]
elif isinstance(m, BasicBlock) and m.bn2.weight is not None:
nn.init.constant_(m.bn2.weight, 0) # type: ignore[arg-type]
# builds the conv2_x ~ conv5_x stages
def _make_layer(
self,
block: Type[Union[BasicBlock, Bottleneck]],
planes: int,
blocks: int,
stride: int = 1,
dilate: bool = False,
) -> nn.Sequential:
# normalization layer
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if dilate:
self.dilation *= stride
stride = 1
# if the block's input and output channel counts differ, or stride != 1 changes the spatial size,
# the shortcut needs a 1x1 convolution plus a normalization layer to match shapes
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * block.expansion, stride),
norm_layer(planes * block.expansion),
)
layers = []
layers.append(
block(
self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
)
)
# update the channel count: this block's output channels become the input channels of the following blocks
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(
block(
self.inplanes,
planes,
groups=self.groups,
base_width=self.base_width,
dilation=self.dilation,
norm_layer=norm_layer,
)
)
return nn.Sequential(*layers)
# forward-pass implementation
def _forward_impl(self, x: Tensor) -> Tensor:
# See note [TorchScript super()]
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
# forward pass
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
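Putting the class together, a quick shape check (a sketch; [2, 2, 2, 2] is the ResNet-18 configuration):

import torch

net = ResNet(BasicBlock, [2, 2, 2, 2], num_classes=5)
out = net(torch.randn(1, 3, 224, 224))
print(out.shape)  # torch.Size([1, 5])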
The model script file
from typing import Any, Callable, List, Optional, Type, Union
import torch
import torch.nn as nn
from torch import Tensor
# 3x3 convolution block
def conv3x3(in_planes: int, out_planes: int, stride: int = 1, groups: int = 1, dilation: int = 1) -> nn.Conv2d:
"""3x3 convolution with padding"""
return nn.Conv2d(
in_planes,
out_planes,
kernel_size=3,
stride=stride,
padding=dilation,
groups=groups,
bias=False,
dilation=dilation,
)
# 1x1 convolution block
def conv1x1(in_planes: int, out_planes: int, stride: int = 1) -> nn.Conv2d:
"""1x1 convolution"""
return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
# residual block used by ResNet-18 and ResNet-34
class BasicBlock(nn.Module):
# ratio of the block's output channels to its base channels
expansion: int = 1
def __init__(
self,
inplanes: int, # number of input channels
planes: int, # base number of output channels
stride: int = 1, # stride
downsample: Optional[nn.Module] = None, # reshapes the dashed shortcut so it matches the main branch output
groups: int = 1, # number of groups for grouped convolution
base_width: int = 64, # base channel width
dilation: int = 1, # sample every `dilation` positions instead of consecutive ones (dilated convolution)
norm_layer: Optional[Callable[..., nn.Module]] = None, # normalization layer
) -> None:
super().__init__()
# default to batch normalization when no normalization layer is given
if norm_layer is None:
norm_layer = nn.BatchNorm2d
# BasicBlock supports neither grouped convolution nor a base width other than 64
if groups != 1 or base_width != 64:
raise ValueError("BasicBlock only supports groups=1 and base_width=64")
# nor does it support dilation > 1
if dilation > 1:
raise NotImplementedError("Dilation > 1 not supported in BasicBlock")
# Both self.conv1 and self.downsample layers downsample the input when stride != 1
# first 3x3 convolution
self.conv1 = conv3x3(inplanes, planes, stride)
self.bn1 = norm_layer(planes)
self.relu = nn.ReLU(inplace=True)
# second 3x3 convolution
self.conv2 = conv3x3(planes, planes)
self.bn2 = norm_layer(planes)
# downsample layer for the dashed shortcut
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class Bottleneck(nn.Module):
# Bottleneck in torchvision places the stride for downsampling at 3x3 convolution(self.conv2)
# while original implementation places the stride at the first 1x1 convolution(self.conv1)
# according to "Deep residual learning for image recognition"https://arxiv.org/abs/1512.03385.
# This variant is also known as ResNet V1.5 and improves accuracy according to
# https://ngc.nvidia.com/catalog/model-scripts/nvidia:resnet_50_v1_5_for_pytorch.
expansion: int = 4 # i.e. the output channels are 4 times the base channels
def __init__(
self,
inplanes: int,
planes: int,
stride: int = 1,
downsample: Optional[nn.Module] = None,
groups: int = 1,
base_width: int = 64, # output channels after conv1, 64 by default
# dilated convolution is only used in the 3x3 convolution (conv2) of the Bottleneck, with
# padding == dilation so the feature-map size is unchanged (BasicBlock rejects dilation > 1)
dilation: int = 1,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
# normalization layer
if norm_layer is None:
norm_layer = nn.BatchNorm2d
# compute the channel width used inside the bottleneck
width = int(planes * (base_width / 64.0)) * groups
# Both self.conv2 and self.downsample layers downsample the input when stride != 1
# 1x1 convolution
self.conv1 = conv1x1(inplanes, width)
self.bn1 = norm_layer(width)
# 3x3 convolution
self.conv2 = conv3x3(width, width, stride, groups, dilation)
self.bn2 = norm_layer(width)
# 1x1 convolution
self.conv3 = conv1x1(width, planes * self.expansion)
self.bn3 = norm_layer(planes * self.expansion)
self.relu = nn.ReLU(inplace=True)
self.downsample = downsample
self.stride = stride
def forward(self, x: Tensor) -> Tensor:
identity = x
out = self.conv1(x)
out = self.bn1(out)
out = self.relu(out)
out = self.conv2(out)
out = self.bn2(out)
out = self.relu(out)
out = self.conv3(out)
out = self.bn3(out)
# when the shortcut is the dashed kind, a 1x1 convolution reshapes it to match the main branch output
if self.downsample is not None:
identity = self.downsample(x)
out += identity
out = self.relu(out)
return out
class ResNet(nn.Module):
def __init__(
self,
block: Type[Union[BasicBlock, Bottleneck]], # residual block type to use
layers: List[int], # list of ints: the number of residual blocks in each stage
num_classes: int = 1000, # number of classification categories
zero_init_residual: bool = False, # zero-init the last BN of each residual branch
groups: int = 1, # grouped convolution
width_per_group: int = 64, # base channel width per group
replace_stride_with_dilation: Optional[List[bool]] = None,
norm_layer: Optional[Callable[..., nn.Module]] = None,
) -> None:
super().__init__()
# set up the normalization layer
if norm_layer is None:
norm_layer = nn.BatchNorm2d
self._norm_layer = norm_layer
# conv1 outputs 64 channels by default
self.inplanes = 64
self.dilation = 1
if replace_stride_with_dilation is None:
# each element in the tuple indicates if we should replace
# the 2x2 stride with a dilated convolution instead
replace_stride_with_dilation = [False, False, False]
if len(replace_stride_with_dilation) != 3:
raise ValueError(
"replace_stride_with_dilation should be None "
f"or a 3-element tuple, got {replace_stride_with_dilation}"
)
# number of groups for grouped convolution
self.groups = groups
# output channels after the first convolution inside each block
self.base_width = width_per_group
# corresponds to conv1 in the architecture table
self.conv1 = nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False)
self.bn1 = norm_layer(self.inplanes)
self.relu = nn.ReLU(inplace=True)
# max-pooling layer after conv1
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
# corresponds to conv2_x, base channel count 64
self.layer1 = self._make_layer(block, 64, layers[0])
# corresponds to conv3_x, base channel count 128
self.layer2 = self._make_layer(block, 128, layers[1], stride=2, dilate=replace_stride_with_dilation[0])
# corresponds to conv4_x, base channel count 256
self.layer3 = self._make_layer(block, 256, layers[2], stride=2, dilate=replace_stride_with_dilation[1])
# corresponds to conv5_x, base channel count 512
self.layer4 = self._make_layer(block, 512, layers[3], stride=2, dilate=replace_stride_with_dilation[2])
# adaptive average pooling after conv5_x, producing a 1x1 output
self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
# final fully connected layer; combined with softmax it gives a probability distribution
self.fc = nn.Linear(512 * block.expansion, num_classes)
for m in self.modules():
if isinstance(m, nn.Conv2d):
# initialize convolution weights with kaiming_normal_
nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu")
elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
# for normalization layers, initialize the weight to 1 and the bias to 0
nn.init.constant_(m.weight, 1)
nn.init.constant_(m.bias, 0)
# Zero-initialize the last BN in each residual branch,
# so that the residual branch starts with zeros, and each residual block behaves like an identity.
# This improves the model by 0.2~0.3% according to https://arxiv.org/abs/1706.02677
# zero-initializing the last BN of each residual block improves accuracy by 0.2~0.3%
if zero_init_residual:
for m in self.modules():
if isinstance(m, Bottleneck) and m.bn3.weight is not None:
nn.init.constant_(m.bn3.weight, 0) # type: ignore[arg-type]
elif isinstance(m, BasicBlock) and m.bn2.weight is not None:
nn.init.constant_(m.bn2.weight, 0) # type: ignore[arg-type]
# builds the conv2_x ~ conv5_x stages
def _make_layer(
self,
block: Type[Union[BasicBlock, Bottleneck]],
planes: int,
blocks: int,
stride: int = 1,
dilate: bool = False,
) -> nn.Sequential:
# normalization layer
norm_layer = self._norm_layer
downsample = None
previous_dilation = self.dilation
if dilate:
self.dilation *= stride
stride = 1
# if the block's input and output channel counts differ, or stride != 1 changes the spatial size,
# the shortcut needs a 1x1 convolution plus a normalization layer to match shapes
if stride != 1 or self.inplanes != planes * block.expansion:
downsample = nn.Sequential(
conv1x1(self.inplanes, planes * block.expansion, stride),
norm_layer(planes * block.expansion),
)
layers = []
layers.append(
block(
self.inplanes, planes, stride, downsample, self.groups, self.base_width, previous_dilation, norm_layer
)
)
# update the channel count: this block's output channels become the input channels of the following blocks
self.inplanes = planes * block.expansion
for _ in range(1, blocks):
layers.append(
block(
self.inplanes,
planes,
groups=self.groups,
base_width=self.base_width,
dilation=self.dilation,
norm_layer=norm_layer,
)
)
return nn.Sequential(*layers)
# forward-pass implementation
def _forward_impl(self, x: Tensor) -> Tensor:
# See note [TorchScript super()]
x = self.conv1(x)
x = self.bn1(x)
x = self.relu(x)
x = self.maxpool(x)
x = self.layer1(x)
x = self.layer2(x)
x = self.layer3(x)
x = self.layer4(x)
x = self.avgpool(x)
x = torch.flatten(x, 1)
x = self.fc(x)
return x
# forward pass
def forward(self, x: Tensor) -> Tensor:
return self._forward_impl(x)
def _resnet(
block: Type[Union[BasicBlock, Bottleneck]],
layers: List[int],
**kwargs: Any,
) -> ResNet:
model = ResNet(block, layers, **kwargs)
return model
def resnet18(**kwargs: Any) -> ResNet:
return _resnet(BasicBlock, [2, 2, 2, 2], **kwargs)
def resnet34(**kwargs: Any) -> ResNet:
return _resnet(BasicBlock, [3, 4, 6, 3], **kwargs)
def resnet50(**kwargs: Any) -> ResNet:
return _resnet(Bottleneck, [3, 4, 6, 3], **kwargs)
def resnet101(**kwargs: Any) -> ResNet:
return _resnet(Bottleneck, [3, 4, 23, 3], **kwargs)
def resnet152(**kwargs: Any) -> ResNet:
return _resnet(Bottleneck, [3, 8, 36, 3], **kwargs)
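Since this file mirrors torchvision's resnet.py, a minimal smoke test (a sketch) can be appended to it:

if __name__ == "__main__":
    net = resnet50(num_classes=5)
    x = torch.randn(2, 3, 224, 224)
    print(net(x).shape)  # torch.Size([2, 5])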
The training script file
import os
import sys
import json
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets
from tqdm import tqdm
# import the resnet18 model
from model_office import resnet18 as Net
def main():
# select GPU if available, otherwise CPU
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
print('using {} device.'.format(device))
# data augmentation
data_transform = {
'train': transforms.Compose([transforms.RandomResizedCrop(224),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])]),
'val': transforms.Compose([transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
}
# image root directory
image_path = r"D:\卷积神经网络PPT\AlexNet\Alex_Torch\data"
# build the training dataset
train_dataset = datasets.ImageFolder(root=os.path.join(image_path, 'train'),
transform=data_transform['train'])
# number of training images
train_num = len(train_dataset)
# get the class-to-index mapping that ImageFolder built from the class folders
# and write the inverted (index-to-class) mapping to a json file
flower_list = train_dataset.class_to_idx
cla_dict = dict((val, key) for key, val in flower_list.items())
json_str = json.dumps(cla_dict, indent=4)
with open('./class_indices.json', 'w') as json_file:
json_file.write(json_str)
# training batch size
batch_size = 32
# number of worker processes for data loading
nw = min([os.cpu_count(), batch_size if batch_size > 1 else 0, 8])
print('using {} dataloader workers'.format(nw))
# wrap the training set in a DataLoader (DataLoader2 belongs to torchdata, not torch.utils.data)
train_loader = torch.utils.data.DataLoader(train_dataset,
batch_size=batch_size,
shuffle=True,
num_workers=nw,
pin_memory=True)
# build the validation dataset
valid_dataset = datasets.ImageFolder(root=os.path.join(image_path, 'val'),
transform=data_transform['val'])
# number of validation images
val_num = len(valid_dataset)
# wrap the validation set in a DataLoader
valid_loader = torch.utils.data.DataLoader(valid_dataset,
batch_size=batch_size,
shuffle=False,
num_workers=nw,
pin_memory=True)
print('using {} images for training, {} images for validation'.format(train_num, val_num))
# instantiate resnet18 with the number of classes set to 5
net = Net(num_classes=5).to(device)
loss_function = nn.CrossEntropyLoss().to(device)
params = [p for p in net.parameters() if p.requires_grad]
optimizer = optim.AdamW(params, lr=1e-4, weight_decay=5e-2)
epochs = 30
best_acc = 0.0
save_path = './resNet18.pth'  # the model is resnet18, so name the checkpoint accordingly
for epoch in range(epochs):
# train
net.train()
running_loss = 0.0
train_bar = tqdm(train_loader, file=sys.stdout)
for step, data in enumerate(train_bar):
images, labels = data
optimizer.zero_grad(set_to_none=True)
logits = net(images.to(device))
loss = loss_function(logits, labels.to(device))
loss.backward()
optimizer.step()
running_loss += loss.item()
train_bar.desc = 'train epoch[{}/{}] loss:{:.3f}'.format(epoch + 1, epochs, loss.item())
# validate
net.eval()
acc = 0.0
sample_num = 0
with torch.no_grad():
val_bar = tqdm(valid_loader, file=sys.stdout)
for val_data in val_bar:
val_images, val_labels = val_data
sample_num += val_images.shape[0]
outputs = net(val_images.to(device))
# loss=loss_function(outputs,labels.to(device))
predict = torch.argmax(outputs, dim=1)
acc += torch.eq(predict, val_labels.to(device)).sum().item()
val_bar.desc = 'valid epoch[{}/{}] accuracy: {:.4f}'.format(epoch + 1, epochs, acc / sample_num)
if acc / sample_num > best_acc:
best_acc = acc / sample_num
torch.save(net.state_dict(), save_path)
print('Finished Training')
if __name__ == '__main__':
main()
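One optional refinement (a hedged sketch, not part of the original script) is a cosine learning-rate schedule on top of the AdamW optimizer used above; scheduler.step() is called once per epoch at the end of the training loop:

import torch.optim as optim
from model_office import resnet18 as Net

net = Net(num_classes=5)
optimizer = optim.AdamW(net.parameters(), lr=1e-4, weight_decay=5e-2)
epochs = 30
scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=epochs)
for epoch in range(epochs):
    # ... run the train/validate steps from the script above ...
    scheduler.step()  # decay the learning rate once per epoch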
The prediction script file
import os
import json
import torch
from PIL import Image
from torchvision import transforms
import matplotlib.pyplot as plt
from model_office import resnet18
def main():
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# validation-style preprocessing for the input image
data_transform = transforms.Compose(
[transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5])])
# load the image
img_path = "./img.png"
assert os.path.exists(img_path), "file: '{}' does not exist.".format(img_path)
img = Image.open(img_path)
plt.imshow(img)
# preprocess the image
img = data_transform(img)
# add a batch dimension to the image
img = torch.unsqueeze(img, dim=0)
# read the class dictionary
json_path = './class_indices.json'
assert os.path.exists(json_path), "file: '{}' does not exist.".format(json_path)
with open(json_path, "r") as f:
class_indict = json.load(f)
# create the model
model = resnet18(num_classes=5).to(device)
# load the model weights
weights_path = "./resNet18.pth"
assert os.path.exists(weights_path), "file: '{}' does not exist.".format(weights_path)
model.load_state_dict(torch.load(weights_path, map_location=device))
# run prediction on the image
model.eval()
with torch.no_grad():
# predict class
output = torch.squeeze(model(img.to(device))).cpu()
predict = torch.softmax(output, dim=0)
predict_cla = torch.argmax(predict).numpy()
print_res = "class: {} prob: {:.4}".format(class_indict[str(predict_cla)],
predict[predict_cla].numpy())
plt.title(print_res)
for i in range(len(predict)):
print("class: {:10} prob: {:.4}".format(class_indict[str(i)],
predict[i].numpy()))
plt.show()
if __name__ == '__main__':
main()
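To classify a whole folder instead of a single image, a sketch along these lines works, reusing data_transform, model, device and class_indict from main() above (the folder path is hypothetical):

import glob
import torch
from PIL import Image

img_paths = glob.glob('./test_images/*.png')  # hypothetical test folder
batch = torch.stack([data_transform(Image.open(p).convert('RGB')) for p in img_paths])
with torch.no_grad():
    probs = torch.softmax(model(batch.to(device)), dim=1).cpu()
for path, prob in zip(img_paths, probs):
    idx = torch.argmax(prob).item()
    print(path, class_indict[str(idx)], float(prob[idx]))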
Building a ResNet model with TensorFlow
Building the Residual block
class BasicBlock(layers.Layer):
expansion = 1
def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
super(BasicBlock, self).__init__(**kwargs)
self.conv1 = layers.Conv2D(out_channel,
kernel_size=3,
strides=strides,
padding='same',
use_bias=False)
self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)
self.conv2 = layers.Conv2D(out_channel,
kernel_size=3,
strides=1,
padding='same',
use_bias=False)
self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)
self.downsample = downsample
self.relu = layers.ReLU()
self.add = layers.Add()
def call(self, inputs, training=False):
identity = inputs
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
if self.downsample is not None:
identity = self.downsample(inputs)
x = self.add([identity, x])
x = self.relu(x)
return x
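A quick call sketch (illustrative shapes; it assumes the imports from the model script below, and note TensorFlow uses channels-last layout):

import tensorflow as tf

block = BasicBlock(out_channel=64)
y = block(tf.random.normal((1, 56, 56, 64)), training=False)
print(y.shape)  # (1, 56, 56, 64)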
Building the Bottleneck block
class Bottleneck(layers.Layer):
expansion = 4
def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
super(Bottleneck, self).__init__(**kwargs)
self.conv1 = layers.Conv2D(out_channel, kernel_size=1, use_bias=False, name='conv1')
self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')
self.conv2 = layers.Conv2D(out_channel, kernel_size=3, use_bias=False, strides=strides,
padding='same', name='conv2')
self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv2/BatchNorm')
self.conv3 = layers.Conv2D(out_channel * self.expansion, kernel_size=1, use_bias=False, name='conv3')
self.bn3 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv3/BatchNorm')
self.relu = layers.ReLU()
self.downsample = downsample
self.add = layers.Add()
def call(self, inputs, training=False):
identity = inputs
if self.downsample is not None:
identity = self.downsample(inputs)
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
x = self.relu(x)
x = self.conv3(x)
x = self.bn3(x, training=training)
x = self.add([x, identity])
x = self.relu(x)
return x
def _make_layer(block, in_channel, channel, block_num, name, strides=1):
downsample = None
if strides != 1 or in_channel != channel * block.expansion:
downsample = Sequential([
layers.Conv2D(channel * block.expansion, kernel_size=1, strides=strides,
use_bias=False, name='conv1'),
layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='BatchNorm')
], name='shortcut')
layers_list = []
layers_list.append(block(channel, downsample=downsample, strides=strides, name='unit_1'))
for index in range(1, block_num):
layers_list.append(block(channel, name='unit_' + str(index + 1)))
return Sequential(layers_list, name=name)
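A sketch of building one stage with _make_layer (channel numbers illustrative; the shortcut is created because in_channel differs from channel * Bottleneck.expansion):

import tensorflow as tf

stage = _make_layer(Bottleneck, in_channel=64, channel=64, block_num=3, name='block1')
y = stage(tf.random.normal((1, 56, 56, 64)), training=False)
print(y.shape)  # (1, 56, 56, 256)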
Building the model with the functional API
def _resnet(block, blocks_num, im_width=224, im_height=224, num_classes=1000, include_top=True):
input_image = layers.Input(shape=(im_width, im_height, 3), dtype='float32')
x = layers.Conv2D(filters=64, kernel_size=7, strides=2,
padding='same', use_bias=False, name='conv1')(input_image)
x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')(x)
x = layers.ReLU()(x)
x = layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)
x = _make_layer(block, x.shape[-1], 64, blocks_num[0], name="block1")(x)
x = _make_layer(block, x.shape[-1], 128, blocks_num[1], strides=2, name="block2")(x)
x = _make_layer(block, x.shape[-1], 256, blocks_num[2], strides=2, name="block3")(x)
x = _make_layer(block, x.shape[-1], 512, blocks_num[3], strides=2, name="block4")(x)
if include_top:
x = layers.GlobalAvgPool2D()(x)
x = layers.Dense(num_classes, name='logits')(x)
predict = layers.Softmax()(x)
else:
predict = x
model = Model(inputs=input_image, outputs=predict)
return model
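For example, building what amounts to a ResNet-18 (a sketch):

model = _resnet(BasicBlock, [2, 2, 2, 2], num_classes=5)
model.summary()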
Model script
from tensorflow.keras import layers, Model, Sequential
class BasicBlock(layers.Layer):
expansion = 1
def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
super(BasicBlock, self).__init__(**kwargs)
self.conv1 = layers.Conv2D(out_channel,
kernel_size=3,
strides=strides,
padding='same',
use_bias=False)
self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)
self.conv2 = layers.Conv2D(out_channel,
kernel_size=3,
strides=1,
padding='same',
use_bias=False)
self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5)
self.downsample = downsample
self.relu = layers.ReLU()
self.add = layers.Add()
def call(self, inputs, training=False):
identity = inputs
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
if self.downsample is not None:
identity = self.downsample(inputs)
x = self.add([identity, x])
x = self.relu(x)
return x
class Bottleneck(layers.Layer):
expansion = 4
def __init__(self, out_channel, strides=1, downsample=None, **kwargs):
super(Bottleneck, self).__init__(**kwargs)
self.conv1 = layers.Conv2D(out_channel, kernel_size=1, use_bias=False, name='conv1')
self.bn1 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')
self.conv2 = layers.Conv2D(out_channel, kernel_size=3, use_bias=False, strides=strides,
padding='same', name='conv2')
self.bn2 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv2/BatchNorm')
self.conv3 = layers.Conv2D(out_channel * self.expansion, kernel_size=1, use_bias=False, name='conv3')
self.bn3 = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv3/BatchNorm')
self.relu = layers.ReLU()
self.downsample = downsample
self.add = layers.Add()
def call(self, inputs, training=False):
identity = inputs
if self.downsample is not None:
identity = self.downsample(inputs)
x = self.conv1(inputs)
x = self.bn1(x, training=training)
x = self.relu(x)
x = self.conv2(x)
x = self.bn2(x, training=training)
x = self.relu(x)
x = self.conv3(x)
x = self.bn3(x, training=training)
x = self.add([x, identity])
x = self.relu(x)
return x
def _make_layer(block, in_channel, channel, block_num, name, strides=1):
downsample = None
if strides != 1 or in_channel != channel * block.expansion:
downsample = Sequential([
layers.Conv2D(channel * block.expansion, kernel_size=1, strides=strides,
use_bias=False, name='conv1'),
layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='BatchNorm')
], name='shortcut')
layers_list = []
layers_list.append(block(channel, downsample=downsample, strides=strides, name='unit_1'))
for index in range(1, block_num):
layers_list.append(block(channel, name='unit_' + str(index + 1)))
return Sequential(layers_list, name=name)
def _resnet(block, blocks_num, im_width=224, im_height=224, num_classes=1000, include_top=True):
input_image = layers.Input(shape=(im_width, im_height, 3), dtype='float32')
x = layers.Conv2D(filters=64, kernel_size=7, strides=2,
padding='same', use_bias=False, name='conv1')(input_image)
x = layers.BatchNormalization(momentum=0.9, epsilon=1e-5, name='conv1/BatchNorm')(x)
x = layers.ReLU()(x)
x = layers.MaxPool2D(pool_size=3, strides=2, padding='same')(x)
x = _make_layer(block, x.shape[-1], 64, blocks_num[0], name="block1")(x)
x = _make_layer(block, x.shape[-1], 128, blocks_num[1], strides=2, name="block2")(x)
x = _make_layer(block, x.shape[-1], 256, blocks_num[2], strides=2, name="block3")(x)
x = _make_layer(block, x.shape[-1], 512, blocks_num[3], strides=2, name="block4")(x)
if include_top:
x = layers.GlobalAvgPool2D()(x)
x = layers.Dense(num_classes, name='logits')(x)
predict = layers.Softmax()(x)
else:
predict = x
model = Model(inputs=input_image, outputs=predict)
return model
def resnet18(im_width=224, im_height=224, num_classes=1000, include_top=True):
return _resnet(BasicBlock, [2, 2, 2, 2], im_width, im_height, num_classes, include_top)
def resnet34(im_width=224, im_height=224, num_classes=1000, include_top=True):
return _resnet(BasicBlock, [3, 4, 6, 3], im_width, im_height, num_classes, include_top)
def resnet50(im_width=224, im_height=224, num_classes=1000, include_top=True):
return _resnet(Bottleneck, [3, 4, 6, 3], im_width, im_height, num_classes, include_top)
def resnet101(im_width=224, im_height=224, num_classes=1000, include_top=True):
return _resnet(Bottleneck, [3, 4, 23, 3], im_width, im_height, num_classes, include_top)
Training script
import glob
import json
import os
import random
import sys
import tensorflow as tf
from tqdm import tqdm
from model import resnet18 as Net
os.environ['CUDA_DEVICE_ORDER'] = 'PCI_BUS_ID'
os.environ['CUDA_VISIBLE_DEVICES'] = '0'
def main():
gpus = tf.config.list_physical_devices('GPU')
if gpus:
try:
for gpu in gpus:
tf.config.experimental.set_memory_growth(gpu, True)
except RuntimeError as e:
print(e)
exit(-1)
image_path = r"D:\卷积神经网络PPT\AlexNet\Alex_Torch\data"
train_dir = os.path.join(image_path, 'train')
valid_dir = os.path.join(image_path, 'val')
if not os.path.exists('weights'):
os.mkdir('weights')
im_height = 224
im_width = 224
batch_size = 32
epochs = 30
data_class = [cla for cla in os.listdir(train_dir)]
class_num = len(data_class)
cla_dict = dict((val, key) for key, val in enumerate(data_class))
inverse_dict = dict((val, key) for key, val in cla_dict.items())
json_str = json.dumps(inverse_dict, indent=4)
with open('class_indices.json', 'w') as json_file:
json_file.write(json_str)
# build the list of training images
random.seed(42)
train_image_list = glob.glob(train_dir + '/*/*.jpg')
random.shuffle(train_image_list)
train_num = len(train_image_list)
train_label_list = [cla_dict[path.split(os.path.sep)[-2]] for path in train_image_list]
# build the list of validation images
valid_image_list = glob.glob(valid_dir + '/*/*.jpg')
valid_num = len(valid_image_list)
valid_label_list = [cla_dict[path.split(os.path.sep)[-2]] for path in valid_image_list]
print('using {} images for training, {} images for validation.'.format(train_num, valid_num))
@tf.function
def process_train_img(img_path, label):
# convert the label to a one-hot vector
label = tf.one_hot(label, depth=class_num)
image = tf.io.read_file(img_path)
image = tf.image.decode_jpeg(image)
image = tf.cast(image, tf.float32) / 255.0  # scale pixels to [0, 1] so the normalization below maps to [-1, 1]
image = tf.image.resize(image, [im_height, im_width])
image = tf.image.random_flip_left_right(image)
image = (image - 0.5) / 0.5
# image = image - [_R_MEAN, _G_MEAN, _B_MEAN]
return image, label
@tf.function
def process_valid_img(img_path, label):
label = tf.one_hot(label, depth=class_num)
image = tf.io.read_file(img_path)
image = tf.image.decode_jpeg(image)
image = tf.cast(image, tf.float32) / 255.0  # scale pixels to [0, 1]
image = tf.image.resize(image, [im_height, im_width])
image = (image - 0.5) / 0.5
return image, label
AUTOTUNE = tf.data.AUTOTUNE
# load train dataset
train_ds = tf.data.Dataset.from_tensor_slices((train_image_list, train_label_list))
train_ds = train_ds.shuffle(buffer_size=train_num) \
.map(process_train_img, num_parallel_calls=AUTOTUNE).batch(batch_size) \
.prefetch(AUTOTUNE)
# load valid dataset
valid_ds = tf.data.Dataset.from_tensor_slices((valid_image_list, valid_label_list))
valid_ds = valid_ds.map(process_valid_img, num_parallel_calls=AUTOTUNE).batch(batch_size)
# instantiate the model
model = Net(num_classes=5, include_top=True)
model.summary()
# using keras low level api for training
loss_object = tf.keras.losses.CategoricalCrossentropy(from_logits=False)
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
train_loss = tf.keras.metrics.Mean(name='train_loss')
train_acc = tf.keras.metrics.CategoricalAccuracy(name='train_acc')
valid_loss = tf.keras.metrics.Mean(name='valid_loss')
valid_acc = tf.keras.metrics.CategoricalAccuracy(name='valid_acc')
@tf.function
def train_step(images, labels):
with tf.GradientTape() as tape:
outputs = model(images, training=True)
loss = loss_object(labels, outputs)
gradients = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(gradients, model.trainable_variables))
train_loss(loss)
train_acc(labels, outputs)
@tf.function
def valid_step(images, labels):
outputs = model(images, training=False)
v_loss = loss_object(labels, outputs)
valid_loss(v_loss)
valid_acc(labels, outputs)
best_valid_loss = float('inf')
for epoch in range(1, epochs + 1):
train_loss.reset_states()
train_acc.reset_states()
valid_loss.reset_states()
valid_acc.reset_states()
train_bar = tqdm(train_ds, file=sys.stdout)
for step, (images, labels) in enumerate(train_bar):
train_step(images, labels)
# print train process
train_bar.desc = 'train epoch[{}/{}] loss:{:.4f}, acc:{:.4f}'.format(epoch, epochs, train_loss.result(),
train_acc.result())
# validate
val_bar = tqdm(valid_ds, file=sys.stdout)
for images, labels in val_bar:
valid_step(images, labels)
# print val process
val_bar.desc = 'valid epoch[{}/{}] loss:{:.4f}, acc:{:.4f}'.format(epoch, epochs, valid_loss.result(),
valid_acc.result())
if valid_loss.result() < best_valid_loss:
best_valid_loss = valid_loss.result()  # remember the best loss, otherwise the check never tightens
model.save_weights('./weights/myResNet.ckpt', save_format='tf')
if __name__ == '__main__':
main()
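As a simpler alternative to the custom loop above, the same training can be expressed with Keras's high-level API (a sketch; it assumes the model, train_ds, valid_ds and epochs defined in the script, and that the model ends in a Softmax as built here):

model.compile(optimizer=tf.keras.optimizers.Adam(learning_rate=1e-4),
              loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
              metrics=['accuracy'])
ckpt = tf.keras.callbacks.ModelCheckpoint('./weights/myResNet.ckpt',
                                          save_weights_only=True,
                                          save_best_only=True,
                                          monitor='val_loss')
model.fit(train_ds, validation_data=valid_ds, epochs=epochs, callbacks=[ckpt])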
Prediction script
import os
import json
import glob
import numpy as np
from PIL import Image
import tensorflow as tf
import matplotlib.pyplot as plt
from model import resnet18  # the training script saved resnet18 weights, so load the matching architecture
def main():
im_height = 224
im_width = 224
num_classes = 5
# load image
img_path = r'./img.png'
img = Image.open(img_path).convert('RGB')
# resize image to 224X224
img = img.resize((im_width, im_height))
plt.imshow(img)
# scale pixel values to [0, 1], then normalize to [-1, 1] to match training
img = np.array(img).astype(np.float32) / 255.0
img = (img - 0.5) / 0.5
# Add the image to a batch where it's the only member
img = np.expand_dims(img, 0)
# read class_indict
json_path = './class_indices.json'
with open(json_path, 'r') as f:
cla_dict = json.load(f)
# create model
model = resnet18(num_classes=num_classes, include_top=True)  # must match the architecture whose weights were saved
# load weights
weights_path = './weights/myResNet.ckpt'
model.load_weights(weights_path)
# prediction
result = np.squeeze(model.predict(img))
predict_class = np.argmax(result)
print_res = "class: {} prob: {:.3}".format(cla_dict[str(predict_class)],
result[predict_class])
plt.title(print_res)
for i in range(len(result)):
print("class: {:10} prob: {:.3}".format(cla_dict[str(i)],
result[i]))
plt.show()
if __name__ == '__main__':
main()