一、卷积神经网络
卷积神经网络(Convolutional Neural Netword,CNN)是一种深度学习模型,它在图像识别、视频分析、自然语言处理等领域表现出色。CNN
的核心思想是利用卷积运算来提取输入数据的特征,并且能够保持空间层次结构。
卷积神经网络的架构如下:
我们今天的重点是利用卷积神经网络来实践一个花朵分类任务。首先我们来看一下我们的数据集:
我们将不同的花朵分类(102种)存入到不同的文件夹中,根据文件名来进行花朵分类,并且分为训练集和测试集两个数据部分,最后需要进行结果展示的时候,我们通过引入一个 json
文件来将不同文件名映射到对应的花朵名字:
然后我们训练的模型采用 ResNet
网络结构,它是一种深度卷积神经网络,旨在解决深度学习中的退化问题。该网络结构通过引入“快捷连接”(Shortcut connection)来优化训练过程,允许网络层之间的直接连接,从而有效的训练更深层的网络结构。
由于其较浅的网络结构和较少的参数,常用于需要较快速度和较少计算资源的场景。同时它也是许多计算机视觉任务的基础网络结构,如图像分类、目标检测和图像分割。在实际应用中,ResNet 可以通过预训练模型进行迁移学习,快速适应新的数据集和任务。
下面是 ResNet
的网络结构图:
常用的 ResNet
网络结构有 18、50、101、152 层等,出于演示效率考虑,我们本次训练中采用 18 层的 ResNet
网络结构,即 resnet18
,并采用迁移学习
的方法:
- 首先替换模型的全连接层,以适应我们自己的分类任务
- 在第一次训练中,我们冻结除全连接层外的所有其它层参数(不进行参数更新),利用其它层训练好的参数作为初始参数,随机化我们自己添加的全连接层参数,并对其进行训练
- 在第二次训练中,我们解冻除全连接层外的所以其他参数(进行参数更新),并结合我们第一次训练中训练好的参数一起作为初始参数,对整个模型的参数进行重新训练
二、搭建模型并训练
1 导入数据并对数据进行预处理
我们首先在代码中假如如下语句,如有 GPU 则用 GPU 训练,这样能大大提高训练的速度:
# 如有 GPU 则用 GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
导入图片数据的路径:
# 定义图片数据的路径
data_dir = './data/CNN/flower_data/'
train_dir = data_dir + '/train'
valid_dir = data_dir + '/valid'
接下来我们对图片先进行数据预处理:
# 定义需要对图片数据进行的预处理操作
data_transforms = {
'train':
transforms.Compose([ # 一个操作组合
transforms.Resize([96, 96]), # 将所有图片调整为 96*96 的大小(考虑性能的情况下越大越好)
# 根据实际情况设置大小,大部分经典网络用正方形结构
transforms.RandomRotation(45), # 随机旋转,表示从 -45 到 45 度之间随机选角度旋转
transforms.CenterCrop(64), # 从中心开始裁剪成 64*64 大小的图片(一般裁剪成 64、128、224、256 大小)
transforms.RandomHorizontalFlip(p=0.5), # 随机水平翻转,翻转概率为 50%
transforms.RandomVerticalFlip(p=0.5), # 随机垂直翻转
# transforms.ColorJitter(brightness=0.2, contrast=0.1, saturation=0.1, hue=0.1), # 亮度、对比度、饱和度、色相(用的少)
# transforms.RandomGrayscale(p=0.025), # 概率转换成灰度率,3 通道就是 R=G=B(用的少)
transforms.ToTensor(), # 将数据转换为 Tensor 格式
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]) # 设置均值,标准差(R,G,B)
]),
'valid':
transforms.Compose([
transforms.Resize([64, 64]),
transforms.ToTensor(),
# 测试数据和训练数据的均值和标准差需要一致
transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
]),
}
# 加载数据集并对不同数据集进行对应的预处理操作
image_datasets = {x: datasets.ImageFolder(os.path.join(data_dir, x),
data_transforms[x]) for x in ['train', 'valid']}
# 设置 DataLoader
batch_size = 128 # 由于输入图片较小,所以 batch 可以指定大一点
dataloaders = {x: torch.utils.data.DataLoader(image_datasets[x], batch_size=batch_size,
shuffle=True) for x in ['train', 'valid']}
在上面的代码中,data_transform 中制定了所有图像预处理操作(比如调整大小、数据增强),ImageFolder 假设所有的文件按文件夹保存好,每个文件夹下面存储同一类别的图片,文件夹的名字为分类的名字,对训练集和验证集都进行预处理操作,然后将二者设置好 batch 大小加载到 dataloader
里面。
2 构建模型
接下来我们开始构建模型:
# 模型初始化
params_to_update = [] # 保存需要更新的参数列表
def initialize_model(model_name, num_classes, feature_extract, use_pretrained=True):
global params_to_update # 在函数内部声明全局变量
# 从 models 模块中动态选择特定模型,并使用预训练权重
model_ft = getattr(models, model_name)(pretrained=True)
# model_ft = models.resnet18(pretrained=use_pretrained) # 选择模型,18 层的比较快(18,50,101,152)
# 如果需要进行迁移学习则先将所有参数设置为不更新
if feature_extract:
for param in model_ft.parameters():
param.requires_grad = False
# 该模型输出层是个 1000 分类,需要修改全连接层,所以需要获取模型中全连接层的输入特征数
num_ftrs = model_ft.fc.in_features
# 更改全连接层,输出类别数根据自己任务更改:102,更改后新层的 requires_grad 属性默认是 True
model_ft.fc = nn.Linear(num_ftrs, num_classes)
# 打印需要更新梯度的参数
params_to_update = [param for param in model_ft.parameters() if param.requires_grad]
print('Params to learn:')
for name, param in model_ft.named_parameters():
if param.requires_grad:
print("\t", name)
return model_ft
# 选择的模型名字
model_name = 'resnet18' # 可选 resnet,alexnet,vgg,squeezenet,densenet,inception 等
# 获取数据的总类别
num_classes = len(image_datasets['train'].classes) # 注意类别排序为 1,10,100,101...,非默认从零开始
# 只更新输出层权重,其他层权重冻住(指示是否仅提取特征,即冻住模型的某些层)
# 迁移学习:照搬别人的模型以及将训练好的权重作为我们的初始化,并根据数据量冻住一些层
feature_extract = True
# 模型初始化并传入 GPU 或 CPU
model_ft = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True).to(device)
这段代码的目的是根据给定的参数初始化一个预训练的模型,并对其进行必要的修改以适应新的分类任务。通过替换全连接层,模型可以输出正确数量的类别。同时,如果 feature_extract 为 True,则模型的参数在训练过程中不会更新,这在迁移学习中很常见,即利用预训练模型的特征提取能力,只训练顶层分类器。
3 设定模型的超参数
接下来我们需要为模型设定超参数,分别是迭代次数
、优化器
、学习率
和损失函数
:
epochs = 30 # 迭代次数
optimizer_ft = optim.Adam(params_to_update, lr=1e-2) # 设置优化器
# 设置学习率衰减(学习率每 10 个 epoch 衰减成原来的 1/10)
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)
criterion = nn.CrossEntropyLoss() # 设置损失函数
值得注意的是我们不是采用固定的学习率,而是使用学习率衰减(Learning Rate Decay)策略,设置学习率衰减有以下好处:
- 避免震荡:在训练初期,如果学习率过高,模型的权重更新可能会过大,导致训练过程中的损失函数值出现较大震荡,难以稳定下降。通过学习率衰减,可以逐步减小更新步长,使模型在训练后期更加稳定。
- 提高收敛速度:在训练初期,较大的学习率可以帮助模型快速逃离局部最小值,但随着训练的进行,需要更小的学习率来细致地逼近全局最小值。
4 训练模型并保存参数
接下来我们就可以开始进行训练了:
# 模型参数的保存路径
filename = './data/CNN/best.pt'
def train_model(model, dataloaders, criterion, optimizer, epochs, filename):
# 记录测试集中最高的准确率
best_acc = 0
# 记录训练中在测试集上准确率最高的模型参数
best_model_wts = {}
# 记录训练过程中训练集和验证集的历史损失和准确率
train_losses_history = []
train_acc_history = []
valid_losses_history = []
val_acc_history = []
# 初始化了一个列表 LRs,用于存储每个训练周期(epoch)开始时的学习率
# 该行代码获取的是优化器(optimizer)中第一个参数组的初始学习率
LRs = [optimizer.param_groups[0]['lr']]
# 获取 Unix 时间戳,记录开始训练的时间
since = time.time()
# 开始迭代训练,每个 epoch 先跑训练集再跑测试集
for epoch in range(epochs):
print('Epoch {}/{}'.format(epoch, epochs - 1))
print('-' * 10)
# 训练和验证
for phase in ['train', 'valid']:
if phase == 'train':
model.train() # 训练(默认行为,可以不需要显示的调用 model.train())
else:
model.eval() # 验证
running_loss = 0.0
running_corrects = 0
# 每次取一个 batch,直到跑完一个 epoch
for inputs, labels in dataloaders[phase]:
# 放到 CPU 或 GPU
inputs = inputs.to(device)
labels = labels.to(device)
# 清零
optimizer.zero_grad()
# 只有训练的时候计算和更新梯度
outputs = model(inputs)
loss = criterion(outputs, labels) # 注意这里的 loss 所有样本损失的平均值
# 获取模型输出中每个样本最可能属于的类别
_, preds = torch.max(outputs, 1)
# 训练阶段更新权重
if phase == 'train':
loss.backward()
optimizer.step()
# 对每个 batch 的损失和正确总数进行累加
running_loss += loss.item() * inputs.size(0) # 得到整个批次的总损失,其中 0 表示 batch 那个维度
running_corrects += torch.sum(preds == labels.data) # 预测结果最大的和真实值是否一致
# 计算每个 epoch 的平均损失和准确率
epoch_loss = running_loss / len(dataloaders[phase].dataset)
epoch_acc = running_corrects / len(dataloaders[phase].dataset)
# 到目前为止已经花费的时间
time_elapsed = time.time() - since
# 打印花费的时间和训练集或测试集的损失和准确率
print('Time elapsed {:0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))
# 保存验证集上准确率最高的模型参数和准确率
if phase == 'valid' and epoch_acc > best_acc:
best_acc = epoch_acc
best_model_wts = model.state_dict()
state = { # 字典里 Key 就是各层的名字,值就是训练好的权重
'state_dict': model.state_dict(),
'best_acc': best_acc,
# 为了在需要时能够完全恢复训练过程,包括模型参数和优化器内部的
# 状态,我们也需要保存优化器的状态
'optimizer': optimizer.state_dict(),
}
torch.save(state, filename)
# 保存训练集和验证集的历史损失和准确率
if phase == 'train':
train_acc_history.append(epoch_acc)
train_losses_history.append(epoch_loss)
if phase == 'valid':
val_acc_history.append(epoch_acc)
valid_losses_history.append(epoch_loss)
# 在每个训练周期结束时,将当前的学习率添加到 LRs 列表中,以便跟踪学习率的变化
print('Optimizer learning rate : {:.7f}'.format(optimizer.param_groups[0]['lr']))
LRs.append(optimizer.param_groups[0]['lr'])
# 调用学习率调度器,在每个训练周期结束后更新学习率
scheduler.step()
# 打印花费的总时间以及最高的准确率
time_elapsed = time.time() - since
print('Training compete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
print('Best val Acc: {:4f}'.format(best_acc))
# 训练完后用最好的一次当做模型的最终测试结果,等着一会测试
model.load_state_dict(best_model_wts)
return model, val_acc_history, train_acc_history, valid_losses_history, train_losses_history, LRs
# 开始训练
model_ft, val_acc_history, train_acc_history, valid_losses_history, train_losses_history, LRs = \
train_model(model_ft, dataloaders, criterion, optimizer_ft, epochs, filename)
训练完成后的结果如下:
迭代 30 个 epoch 后,在训练集上的准确率达到 67.55%,在验证集上的准确率达到 40.71%,模型训练中,在验证集上出现的最高准确率为 40.8313%,这是我们只训练自己定义的全连接层后的效果,接下来我们解冻其它层的所有参数,继续训练。
5 解冻其他层参数结合 fc 层训练好的参数一起重新训练
# 设置更新模型的所有参数
for param in model_ft.parameters():
param.requires_grad = True
# 设置模型的超参数
epochs = 30 # 迭代次数
optimizer_ft = optim.Adam(model_ft.parameters(), lr=1e-3) # 优化器
scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1) # 学习率调小一点
criterion = nn.CrossEntropyLoss() # 损失函数
# 加载之前训练好的权重参数
checkpoint = torch.load(filename)
best_acc = checkpoint['best_acc']
model_ft.load_state_dict(checkpoint['state_dict'])
# 再次开始重新训练
model_ft, val_acc_history, train_acc_history, valid_losses_history, train_losses_history, LRs = \
train_model(model_ft, dataloaders, criterion, optimizer_ft, epochs, filename)
训练的结果如下:
我们可以看到,在解冻其他层的所有参数后进行训练,模型的准确率有了较大的提升,在训练集上达到了 98.03%,测试集上达到了 72.25%,在测试集上最高的准确率达到了 73.3496%,最后我们加载训练好的模型参数用来进行预测。
6 加载模型和训练好的参数用于预测
接下来我们随机从 dataloader 中得到一个 batch 的测试数据用于预测:
# 如有 GPU 则用 GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
# 初始化模型并将模型加载到 GPU 或 CPU
model_ft = initialize_model(model_name, num_classes, feature_extract, use_pretrained=True).to(device)
# 加载模型参数
checkpoint = torch.load('./data/CNN/best.pt')
best_acc = checkpoint['best_acc']
model_ft.load_state_dict(checkpoint['state_dict'])
# 将模型设置为评估模式
model_ft.eval()
# 从 dataloader 中随机得到一个 batch 的测试数据
dataiter = iter(dataloaders['valid'])
images, labels = dataiter.next()
# 将数据传入到 GPU 或 CPU
train_on_gpu = torch.cuda.is_available()
if train_on_gpu:
output = model_ft(images.cuda())
else:
output = model_ft(images)
# 从预测值中获得每行最大值及其索引
_, preds_tensor = torch.max(output, 1)
# squeeze 用于去除 numpy 数组中所有长度为1的维度(这里将一个二维数组 (batch, 1) 转换为一个一维数组)
preds = np.squeeze(preds_tensor.numpy()) if not train_on_gpu \
else np.squeeze(preds_tensor.cpu().numpy())
# 图像张量转换函数
def im_convert(tensor):
# 将张量移动到 CPU,创建一个副本,并从计算图中分离,使其可以转换为 NumPy 数组
image = tensor.to("cpu").clone().detach()
# 将张量转换为 NumPy 数组,并去除长度为1的维度
image = image.numpy().squeeze()
# 调整数组的维度顺序,以匹配图像显示库的期望格式(通常是高度、宽度、通道)
image = image.transpose(1, 2, 0) # torch 中 3*64*64 是(0, 1, 2),其他工具包通道顺序不一样
# 对图像进行反标准化,将其转换回原始的像素值范围。这里使用的是 ImageNet 数据集的标准差和均值(标准化是RGB值-均值再除以标准差)
image = image * np.array((0.229, 0.224, 0.225)) + np.array((0.485, 0.456, 0.406))
# 将图像的像素值限制在 [0, 1] 范围内,确保它们是有效的像素值
image = image.clip(0, 1)
return image
# 创建一个大小为 20x20 英寸的图表
fig = plt.figure(figsize=(20, 20))
# 设置图表的列数和行数
columns = 4
rows = 2
# 读取标签对应的实际名字
with open('./data/CNN/cat_to_name.json', 'r') as f:
cat_to_name = json.load(f)
# 将 tensor 图像转换格式后添加到子图中
for idx in range (columns * rows):
# 在图表中添加一个子图,并移除 x 和 y 轴的刻度
ax = fig.add_subplot(rows, columns, idx+1, xticks=[], yticks=[])
# 使用 im_convert 函数将图像张量转换为显示格式,并显示在子图上
plt.imshow(im_convert(images[idx]))
# 设置子图的标题,显示预测的类别和实际的类别。如果预测正确,则标题颜色为绿色;如果预测错误,则标题颜色为红色
ax.set_title("{} ({})".format(cat_to_name[str(preds[idx])], cat_to_name[str(labels[idx].item())]),
color=("green" if cat_to_name[str(preds[idx])] == cat_to_name[str(labels[idx].item())] else "red"))
# 显示图表,展示所有图像及其预测和实际类别
plt.show()
预测的结果如下: