本学习笔记来源于B站:04-1 轻松学PyTorch 肺部感染识别 简介 中第04-1到04-6这六个视频。
在本预训练-微调代码中,重点要学习的内容包括:加载官方提供的经典网络架构resnet50和对应的预训练模型,对ResNet最后的两层网络(池化层和全连接层)进行修改,改为适合自己任务的网络架构。对网络内容改动较大,值得学习和借鉴的地方很多。代码中主要改动了三个位置。
改动位置一:
from torchvision import datasets, transforms, utils, models
改动位置二:
# 7 Transfer learning: take a mature pretrained model and fine-tune it
def get_model():
    """Build a ResNet-50 fine-tuning model with a new 2-way classification head.

    Loads ImageNet-pretrained weights, freezes the backbone, and replaces the
    final pooling and fully-connected layers. (Indentation restored — the
    original listing had it stripped and was not valid Python.)
    """
    model_pre = models.resnet50(pretrained=True)  # load the pretrained model
    # Freeze all pretrained parameters (they will not be trained)
    for param in model_pre.parameters():
        param.requires_grad = False
    # Fine-tune: replace ResNet's last two layers, return the new model
    model_pre.avgpool = AdaptiveConcatPool2d()  # replace the pooling layer
    model_pre.fc = nn.Sequential(  # replace the fully-connected head
        nn.Flatten(),  # flatten all trailing dimensions
        nn.BatchNorm1d(4096),  # 2048 backbone channels x 2 (concat pool) -> 4096
        nn.Dropout(0.5),  # drop half the activations for regularization
        nn.Linear(4096, 512),  # linear reduction
        nn.ReLU(),  # activation
        nn.BatchNorm1d(512),  # normalize before the classifier
        nn.Linear(512, 2),  # 2-way classification layer
        nn.LogSoftmax(dim=1),  # log-probabilities (pair with nn.NLLLoss), not a loss itself
    )
    return model_pre
改动位置三:
# 8 Model fine-tuning
# Pooling module used as a drop-in replacement for ResNet's avgpool.
class AdaptiveConcatPool2d(nn.Module):
    """Concatenate adaptive average- and max-pooling along the channel dim.

    Output has twice the input channel count; spatial size defaults to (1, 1).
    (Indentation restored — the original listing had it stripped.)
    """

    def __init__(self, size=None):
        super().__init__()
        size = size or (1, 1)  # target output size of the pooling, default (1, 1)
        self.pool_one = nn.AdaptiveAvgPool2d(size)  # pooling layer 1 (average)
        self.pool_two = nn.AdaptiveMaxPool2d(size)  # pooling layer 2 (max)

    def forward(self, x):
        # Concatenate the two pooled maps along dim 1 (channels)
        return torch.cat([self.pool_one(x), self.pool_two(x)], 1)
案例的主要流程为:
第一步:加载预训练模型ResNet,该模型已在ImageNet上训练过。
第二步:冻结预训练模型中低层卷积层的参数(权重)。
第三步:用可训练参数的多层替换分类层。
第四步:在训练集上训练分类层。
第五步:微调超参数,根据需要解冻更多层。
肺部感染识别案例的具体流程(数据加载、模型微调、训练与验证)在下面的完整代码中逐步体现。
完整代码如下:
# 1 加入必要的库
import torch
import torch.nn as nn
import numpy as np
import torch.optim as optim
from torchvision import datasets, transforms, utils, models # 改动位置一
import time
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader
from torch.utils.tensorboard.writer import SummaryWriter
import os
import torchvision
# 2 Load the dataset
# 2.1 Image transform settings (ImageNet normalization statistics)
_IMAGENET_MEAN = [0.485, 0.456, 0.406]
_IMAGENET_STD = [0.229, 0.224, 0.225]
data_transforms = {
    "train": transforms.Compose([
        transforms.RandomResizedCrop(300),
        transforms.RandomHorizontalFlip(),
        transforms.CenterCrop(256),
        transforms.ToTensor(),
        transforms.Normalize(_IMAGENET_MEAN, _IMAGENET_STD),
    ]),
    "val": transforms.Compose([
        transforms.Resize(300),
        transforms.CenterCrop(256),
        transforms.ToTensor(),
        transforms.Normalize(_IMAGENET_MEAN, _IMAGENET_STD),
    ]),
    "test": transforms.Compose([
        transforms.Resize(300),
        transforms.CenterCrop(256),
        transforms.ToTensor(),
        transforms.Normalize(_IMAGENET_MEAN, _IMAGENET_STD),
    ]),
}
# 3 Visualize a picture
def imshow(inp, title=None):
    """De-normalize a CHW image tensor and display it with matplotlib."""
    img = inp.numpy().transpose((1, 2, 0))  # CHW -> HWC for matplotlib
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])
    img = np.clip(img * std + mean, 0, 1)  # undo ImageNet normalization, clamp to [0, 1]
    plt.imshow(img)
    if title is not None:
        plt.title(title)
    plt.pause(0.001)
# 6 Visualize model predictions
def visualize_model(model, num_images=6):
    """Plot up to `num_images` validation images titled with the predicted class.

    Restores the model's previous train/eval mode before returning.
    Relies on module-level `dataloaders`, `device` and `class_names`.
    """
    was_training = model.training
    model.eval()
    shown = 0
    fig = plt.figure()
    with torch.no_grad():
        for batch, labels in dataloaders['val']:
            batch, labels = batch.to(device), labels.to(device)
            _, guesses = torch.max(model(batch), 1)  # most likely class per row
            for j in range(batch.size(0)):
                shown += 1
                axis = plt.subplot(num_images // 2, 2, shown)
                axis.axis('off')
                axis.set_title('predicted:{}'.format(class_names[guesses[j]]))
                imshow(batch.cpu().data[j])
                if shown == num_images:
                    model.train(mode=was_training)
                    return
    model.train(mode=was_training)
# 9 Training routine
def train(model, device, train_loader, criterion, optimizer, epoch, writer):
    """Run one training epoch; log and return the mean per-batch loss."""
    model.train()  # switch to training mode
    running_loss = 0.0
    # Iterate the training batches, updating the model parameters
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()  # clear accumulated gradients
        batch_loss = criterion(model(inputs), labels)  # forward + loss
        batch_loss.backward()  # backpropagate
        optimizer.step()  # update parameters
        running_loss += batch_loss.item()  # accumulate training loss
    mean_loss = running_loss / len(train_loader)
    # Log the average loss to TensorBoard
    writer.add_scalar('Train Loss', mean_loss, epoch)
    writer.flush()
    return mean_loss
# 10 Evaluation routine
def test(model, device, test_loader, criterion, epoch, writer):
    """Evaluate the model; log and return (average loss, accuracy).

    Returns:
        tuple[float, float]: mean per-batch loss and dataset accuracy,
        both plain Python floats.
    """
    model.eval()
    total_loss = 0.0
    correct = 0.0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            total_loss += criterion(output, target).item()
            _, preds = torch.max(output, dim=1)  # index of the max probability per row
            # FIX: .item() keeps `correct` a Python number; the original
            # accumulated a 0-dim Tensor, making `accuracy` a Tensor that was
            # then passed to add_scalar() and "{:.4f}".format().
            correct += torch.sum(preds == target.data).item()
    total_loss /= len(test_loader)
    accuracy = correct / len(test_loader.dataset)
    # Log to TensorBoard
    writer.add_scalar('Test Loss', total_loss, epoch)
    writer.add_scalar('Accuracy', accuracy, epoch)
    writer.flush()
    # Report ({:.4f} keeps 4 decimal places)
    print("Test Loss : {:.4f}, Accuracy : {:.4f}".format(total_loss, accuracy))
    return total_loss, accuracy
# Helper: obtain a TensorBoard writer with a timestamped log directory
def tb_writer():
    """Return a SummaryWriter logging under logdir/<timestamp>."""
    stamp = time.strftime("%Y%m%d_%H%M%S")
    return SummaryWriter('logdir/' + stamp)
# 8 Model fine-tuning  # change site three
# Pooling module used as a drop-in replacement for ResNet's avgpool.
class AdaptiveConcatPool2d(nn.Module):
    """Concatenate adaptive average- and max-pooling along the channel dim.

    The output carries twice the input channel count; spatial output size
    defaults to (1, 1).
    """

    def __init__(self, size=None):
        super().__init__()
        target = size or (1, 1)  # pooling output size, default (1, 1)
        self.pool_one = nn.AdaptiveAvgPool2d(target)  # pooling layer 1 (average)
        self.pool_two = nn.AdaptiveMaxPool2d(target)  # pooling layer 2 (max)

    def forward(self, x):
        # Stack the two pooled maps along dim 1, i.e. the channel axis
        return torch.cat([self.pool_one(x), self.pool_two(x)], 1)
# 7 Transfer learning: take a mature pretrained model and fine-tune it  # change site two
def get_model():
    """Build a ResNet-50 fine-tuning model for 2-class classification.

    Loads ImageNet-pretrained weights, freezes every backbone parameter, and
    replaces the final pooling and fully-connected layers with a trainable
    head ending in LogSoftmax (pair with nn.NLLLoss).
    (Indentation restored — the original listing had it stripped.)
    """
    model_pre = models.resnet50(pretrained=True)  # load the pretrained backbone
    # Freeze all pretrained parameters so only the new head trains
    for param in model_pre.parameters():
        param.requires_grad = False
    # Fine-tune: replace ResNet's last two layers, return the new model
    model_pre.avgpool = AdaptiveConcatPool2d()  # avg+max concat pooling (doubles channels)
    model_pre.fc = nn.Sequential(  # new fully-connected head
        nn.Flatten(),  # (N, 4096, 1, 1) -> (N, 4096)
        nn.BatchNorm1d(4096),  # 2048 ResNet-50 channels x 2 (concat pool) = 4096 features
        nn.Dropout(0.5),  # drop half the activations for regularization
        nn.Linear(4096, 512),  # linear reduction to 512 features
        nn.ReLU(),  # activation
        nn.BatchNorm1d(512),  # normalize before the classifier
        nn.Linear(512, 2),  # 2-way classification layer
        nn.LogSoftmax(dim=1),  # log-probabilities (NOT a loss; use with NLLLoss)
    )
    return model_pre
# Train across epochs and keep the best model (lowest validation loss) on disk
def train_epochs(model, device, dataloaders, criterion, optimizer, num_epochs, writer):
    """Train and validate for several epochs, checkpointing the best model.

    Args:
        num_epochs: number of epochs as an int, or an iterable of epoch
            indices (e.g. range(10)). The original only accepted an iterable
            despite the name; both now work (backward-compatible).
    Saves the best state_dict to the module-level `model_path`.
    """
    print("{0:>20} | {1:>20} | {2:>20} | {3:>20} |".format('Epoch', 'Training Loss', 'Test Loss', 'Accuracy'))
    best_score = np.inf  # best (lowest) validation loss seen so far
    start = time.time()
    # Accept both an epoch count and an explicit iterable of epoch indices
    epochs = range(num_epochs) if isinstance(num_epochs, int) else num_epochs
    # Loop over the data, training then validating each epoch
    for epoch in epochs:
        train_loss = train(model, device, dataloaders['train'], criterion, optimizer, epoch, writer)
        test_loss, accuracy = test(model, device, dataloaders['val'], criterion, epoch, writer)
        # Checkpoint whenever validation loss improves
        if test_loss < best_score:
            best_score = test_loss
            torch.save(model.state_dict(), model_path)
        print("{0:>20} | {1:>20} | {2:>20} | {3:>20.2f} |".format(epoch, train_loss, test_loss, accuracy))
        writer.flush()
    time_all = time.time() - start
    print("Training complete in {:.2f}m {:.2f}s".format(time_all // 60, time_all % 60))
# Log up to `count` misclassified images to TensorBoard for inspection.
# Relies on module-level `LABEL` (index -> class name) and `inv_normalize`.
def misclassified_images(pred, writer, target, data, output, epoch, count=10):
    wrong_mask = (pred != target.data)  # boolean mask of wrong predictions
    wrong_preds = pred[wrong_mask].tolist()
    wrong_truth = target.data[wrong_mask].tolist()
    for idx, img in enumerate(data[wrong_mask][:count]):
        # Tag encodes epoch, predicted label and ground-truth label
        tag = '{}->Predict-{}x{}-Actual'.format(
            epoch,
            LABEL[wrong_preds[idx]],
            LABEL[wrong_truth[idx]],
        )
        writer.add_image(tag, inv_normalize(img), epoch)
# 9 Training and validation (script driver)
# Hyper-parameters
model_path = 'model.pth'
batch_size = 16
device = torch.device("cuda:0" if torch.cuda.is_available() else 'cpu')
# 2.2 Load the data
data_path = "./chest_xray"
# 2.2.1 One ImageFolder dataset per split
image_datasets = {x: datasets.ImageFolder(os.path.join(data_path, x),
                                          data_transforms[x]) for x in ['train', 'val', 'test']}
# 2.2.2 One DataLoader (iterator) per split
dataloaders = {x: DataLoader(image_datasets[x], batch_size=batch_size, shuffle=True) for x in ['train', 'val', 'test']}
# 2.2.3 Split sizes
data_sizes = {x: len(image_datasets[x]) for x in ['train', 'val', 'test']}
# 2.2.4 Class labels of the training set (LABEL maps index -> class name)
class_names = image_datasets['train'].classes
LABEL = dict((v, k) for k, v in image_datasets['train'].class_to_idx.items())
print("-" * 50)
# 4 Fetch one batch from the training set
datas, targets = next(iter(dataloaders['train']))
# 5 Display that batch as a grid
out = torchvision.utils.make_grid(datas)
imshow(out, title=[class_names[x] for x in targets])
# Inverse of the ImageNet normalization, so tensors display as images.
# BUG FIX: the blue-channel std was mistyped as 0.255; the forward
# Normalize uses 0.225, so the inverse scale must be 1/0.225.
inv_normalize = transforms.Normalize(
    mean=[-0.485 / 0.229, -0.456 / 0.224, -0.406 / 0.225],
    std=[1 / 0.229, 1 / 0.224, 1 / 0.225]
)
writer = tb_writer()
images, labels = next(iter(dataloaders['train']))  # one batch for the image grid
grid = torchvision.utils.make_grid([inv_normalize(image) for image in images[:32]])  # up to 32 images
writer.add_image('X-Ray grid', grid, 0)
writer.flush()
model = get_model().to(device)
criterion = nn.NLLLoss()  # pairs with the LogSoftmax head built in get_model()
optimizer = optim.Adam(model.parameters())
train_epochs(model, device, dataloaders, criterion, optimizer, range(0, 10), writer)
writer.close()
标签:肺部,nn,writer,笔记,学习,train,transforms,model,data
From: https://blog.csdn.net/zly19980718/article/details/143773680