(I) Topic background:
I have always disliked smoking, a habit that harms health.
For this course design project I chose the task of recognizing whether a pedestrian is smoking. The task can be applied in places where smoking is prohibited, in order to monitor the people there.
(II) Machine learning case design scheme:
1. Source of the machine learning dataset (training and test sets)
The dataset comes from the Alibaba Cloud Tianchi platform.
2. Description of the machine learning framework used
MobileNetV2, a lightweight network, is adopted as the backbone. Its design uses an expansion layer and a projection layer. The projection layer is a 1×1 convolution whose purpose is to map high-dimensional features down into a low-dimensional space; the expansion layer does the opposite, using a 1×1 convolution to map the low-dimensional space up into a high-dimensional one. The expansion layer has one hyperparameter, the expansion factor, which sets how many times the dimensionality is expanded; it can be adjusted as needed, and its default value is 6, i.e. a six-fold expansion. The structure of the whole module is shown in Figure 1.
Figure 1. Structure of the MobileNetV2 module
In the figure the input is 24-dimensional and the final output is also 24-dimensional, but in between the number of channels is expanded six-fold and a depthwise separable convolution is applied. The whole block is therefore wide in the middle and narrow at both ends, like a spindle. The bottleneck residual block in the ResNet paper is the opposite, narrow in the middle and wide at both ends, so the MobileNetV2 paper calls this structure an inverted residual. Note that the residual connection joins the input and the output of the block. Because mapping from a high-dimensional space to a low-dimensional one through a ReLU activation can lose or destroy information, the projection convolution uses a linear activation instead of ReLU (i.e. no non-linear activation function is applied there).
The block first expands the dimensionality through the expansion layer, then extracts features with a depthwise separable convolution, and finally compresses the data again with the projection layer so the representation becomes small once more. Since both the expansion layer and the projection layer contain learnable parameters, the whole structure can learn how best to expand the data and then re-compress it.
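For illustration only, below is a minimal PyTorch sketch of one inverted residual block following the description above. It is not the network code actually used in this project (the project loads MobileNetV2 through get_model_from_name); the class name InvertedResidual and the expand_ratio default of 6 are assumptions made for the example.

import torch
import torch.nn as nn

class InvertedResidual(nn.Module):
    # A minimal sketch of a MobileNetV2-style inverted residual block.
    def __init__(self, in_channels, out_channels, stride=1, expand_ratio=6):
        super().__init__()
        hidden = in_channels * expand_ratio
        # The residual connection is only used when input and output shapes match.
        self.use_residual = (stride == 1 and in_channels == out_channels)
        self.block = nn.Sequential(
            # Expansion layer: 1x1 conv maps the low-dimensional input to a high-dimensional space.
            nn.Conv2d(in_channels, hidden, 1, bias=False),
            nn.BatchNorm2d(hidden),
            nn.ReLU6(inplace=True),
            # Depthwise separable convolution extracts features in the expanded space.
            nn.Conv2d(hidden, hidden, 3, stride=stride, padding=1, groups=hidden, bias=False),
            nn.BatchNorm2d(hidden),
            nn.ReLU6(inplace=True),
            # Projection layer: 1x1 conv back to the low-dimensional space, linear activation (no ReLU).
            nn.Conv2d(hidden, out_channels, 1, bias=False),
            nn.BatchNorm2d(out_channels),
        )

    def forward(self, x):
        out = self.block(x)
        return x + out if self.use_residual else out

# Example: a block with 24 input and 24 output channels, expanded six-fold internally.
if __name__ == "__main__":
    block = InvertedResidual(24, 24)
    y = block(torch.randn(1, 24, 56, 56))
    print(y.shape)  # torch.Size([1, 24, 56, 56])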
(III) Implementation steps
First, download the dataset.
(1) Generate the dataset annotation files
import os
from os import getcwd

from utils.utils import get_classes

classes_path  = 'model_data/cls_classes.txt'
datasets_path = 'datasets'
sets          = ["train", "test"]
classes, _    = get_classes(classes_path)

if __name__ == "__main__":
    for se in sets:
        list_file = open('cls_' + se + '.txt', 'w')

        datasets_path_t = os.path.join(datasets_path, se)
        types_name      = os.listdir(datasets_path_t)
        for type_name in types_name:
            if type_name not in classes:
                continue
            cls_id = classes.index(type_name)

            photos_path = os.path.join(datasets_path_t, type_name)
            photos_name = os.listdir(photos_path)
            for photo_name in photos_name:
                _, postfix = os.path.splitext(photo_name)
                if postfix not in ['.jpg', '.png', '.jpeg']:
                    continue
                list_file.write(str(cls_id) + ";" + '%s' % (os.path.join(photos_path, photo_name)))
                list_file.write('\n')
        list_file.close()
(2) Data augmentation
Data augmentation, also called data expansion, means making limited data yield the value of a larger dataset without substantially adding new data. Augmentation methods can be divided into supervised and unsupervised approaches: supervised augmentation further splits into single-sample and multi-sample methods, while unsupervised augmentation covers two directions, generating new data and learning augmentation policies.
Because the dataset is fairly large, only simple augmentations such as random cropping and image flipping are used. To save disk space, the augmentation is performed online, that is, the augmented data are fed directly into the network for training and are not saved locally.
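For comparison only, the same kind of online augmentation could also be expressed with torchvision.transforms; this is a hypothetical sketch rather than the project's code, and the crop size of 224 is an assumption. The project's own implementation follows under "Code implementation".

import torchvision.transforms as T

# A minimal sketch of online augmentation: transforms are applied on the fly
# inside the Dataset, so augmented images are never written to disk.
train_transform = T.Compose([
    T.RandomResizedCrop(224),        # resize and randomly crop
    T.RandomHorizontalFlip(p=0.5),   # randomly flip the image
    T.ToTensor(),
])

# Typical usage inside a Dataset's __getitem__:
#     image = train_transform(pil_image)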
Code implementation:
import math
import numbers
import random
import warnings

from PIL import Image

# Note: crop and resized_crop are helper functions defined elsewhere in the
# project's augmentation utilities.

def AutoAugment(self, image, random=True):
    if not random:
        image = self.resize(image)
        image = self.center_crop(image)
        return image

    #------------------------------------------#
    #   Resize and randomly crop
    #------------------------------------------#
    image = self.resize_crop(image)

    #------------------------------------------#
    #   Flip the image
    #------------------------------------------#
    flip = self.rand() < .5
    if flip:
        image = image.transpose(Image.FLIP_LEFT_RIGHT)

    #------------------------------------------#
    #   Random augmentation
    #------------------------------------------#
    image = self.policy(image)
    return image

def center_crop(img, output_size):
    if isinstance(output_size, numbers.Number):
        output_size = (int(output_size), int(output_size))
    w, h = img.size
    th, tw = output_size
    i = int(round((h - th) / 2.))
    j = int(round((w - tw) / 2.))
    return crop(img, i, j, th, tw)

class RandomResizedCrop(object):
    """Crop the given PIL Image to random size and aspect ratio.

    A crop of random size (default: of 0.08 to 1.0) of the original size and a random
    aspect ratio (default: of 3/4 to 4/3) of the original aspect ratio is made. This crop
    is finally resized to given size.
    This is popularly used to train the Inception networks.

    Args:
        size: expected output size of each edge
        scale: range of size of the origin size cropped
        ratio: range of aspect ratio of the origin aspect ratio cropped
        interpolation: Default: PIL.Image.BILINEAR
    """

    def __init__(self, size, scale=(0.08, 1.0), ratio=(3. / 4., 4. / 3.), interpolation=Image.BILINEAR):
        self.size = size
        if (scale[0] > scale[1]) or (ratio[0] > ratio[1]):
            warnings.warn("range should be of kind (min, max)")

        self.interpolation = interpolation
        self.scale = scale
        self.ratio = ratio

    @staticmethod
    def get_params(img, scale, ratio):
        """Get parameters for ``crop`` for a random sized crop.

        Args:
            img (PIL Image): Image to be cropped.
            scale (tuple): range of size of the origin size cropped
            ratio (tuple): range of aspect ratio of the origin aspect ratio cropped

        Returns:
            tuple: params (i, j, h, w) to be passed to ``crop`` for a random sized crop.
        """
        area = img.size[0] * img.size[1]

        for attempt in range(10):
            target_area = random.uniform(*scale) * area
            log_ratio = (math.log(ratio[0]), math.log(ratio[1]))
            aspect_ratio = math.exp(random.uniform(*log_ratio))

            w = int(round(math.sqrt(target_area * aspect_ratio)))
            h = int(round(math.sqrt(target_area / aspect_ratio)))

            if w <= img.size[0] and h <= img.size[1]:
                i = random.randint(0, img.size[1] - h)
                j = random.randint(0, img.size[0] - w)
                return i, j, h, w

        # Fallback to central crop
        in_ratio = img.size[0] / img.size[1]
        if (in_ratio < min(ratio)):
            w = img.size[0]
            h = int(round(w / min(ratio)))
        elif (in_ratio > max(ratio)):
            h = img.size[1]
            w = int(round(h * max(ratio)))
        else:  # whole image
            w = img.size[0]
            h = img.size[1]
        i = (img.size[1] - h) // 2
        j = (img.size[0] - w) // 2
        return i, j, h, w

    def __call__(self, img):
        """
        Args:
            img (PIL Image): Image to be cropped and resized.

        Returns:
            PIL Image: Randomly cropped and resized image.
        """
        i, j, h, w = self.get_params(img, self.scale, self.ratio)
        return resized_crop(img, i, j, h, w, self.size, self.interpolation)
(3) Display the network structure
import torch
from thop import clever_format, profile
from torchsummary import summary

from nets import get_model_from_name

if __name__ == "__main__":
    input_shape = [224, 224]
    num_classes = 1000
    backbone    = "mobilenetv2"

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model  = get_model_from_name[backbone](num_classes=num_classes, pretrained=False).to(device)

    summary(model, (3, input_shape[0], input_shape[1]))

    dummy_input   = torch.randn(1, 3, input_shape[0], input_shape[1]).to(device)
    flops, params = profile(model.to(device), (dummy_input, ), verbose=False)

    flops         = flops * 2
    flops, params = clever_format([flops, params], "%.3f")
    print('Total GFLOPS: %s' % (flops))
    print('Total params: %s' % (params))
(4) Training workflow
First, set the parameters needed during training, such as whether to use a GPU, whether to train on multiple cards, the backbone network, whether to use pretrained weights, the number of epochs, the batch size, the learning rate, the optimizer, and the learning-rate decay strategy.
Before training starts, the training and validation sets are loaded with DataLoader and the constructed network is instantiated; the model is then trained. In each iteration the gradients are first zeroed, then a forward pass is performed, the loss is computed and backpropagated, the optimizer updates the parameters, and the accuracy is computed; the weight files are saved at the end of each epoch.
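To make these steps concrete, here is a condensed, hypothetical sketch of the core per-iteration loop; the names model, optimizer, train_loader and device are assumptions of the example. The project's full fit_one_epoch and training script follow below.

import torch
import torch.nn as nn

criterion = nn.CrossEntropyLoss()

def train_one_epoch(model, optimizer, train_loader, device):
    model.train()
    total_loss, total_correct, total_samples = 0.0, 0, 0
    for images, targets in train_loader:
        images, targets = images.to(device), targets.to(device)
        optimizer.zero_grad()               # 1. zero the gradients
        outputs = model(images)             # 2. forward pass
        loss = criterion(outputs, targets)  # 3. compute the loss
        loss.backward()                     # 4. backpropagate
        optimizer.step()                    # 5. update the weights
        total_loss    += loss.item() * images.size(0)
        total_correct += (outputs.argmax(dim=1) == targets).sum().item()
        total_samples += images.size(0)
    # Return average loss and accuracy for the epoch.
    return total_loss / total_samples, total_correct / total_samples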
The code is as follows:
import os
from threading import local

import torch
import torch.nn.functional as F
from torch import nn
from tqdm import tqdm

from .utils import get_lr


def fit_one_epoch(model_train, model, loss_history, optimizer, epoch, epoch_step, epoch_step_val,
                  gen, gen_val, Epoch, cuda, fp16, scaler, save_period, save_dir, local_rank=0):
    total_loss      = 0
    total_accuracy  = 0

    val_loss        = 0
    val_accuracy    = 0

    if local_rank == 0:
        print('Start Train')
        pbar = tqdm(total=epoch_step, desc=f'Epoch {epoch + 1}/{Epoch}', postfix=dict, mininterval=0.3)
    model_train.train()
    for iteration, batch in enumerate(gen):
        if iteration >= epoch_step:
            break
        images, targets = batch
        with torch.no_grad():
            if cuda:
                images  = images.cuda(local_rank)
                targets = targets.cuda(local_rank)

        optimizer.zero_grad()
        if not fp16:
            outputs     = model_train(images)
            loss_value  = nn.CrossEntropyLoss()(outputs, targets)
            loss_value.backward()
            optimizer.step()
        else:
            from torch.cuda.amp import autocast
            with autocast():
                outputs     = model_train(images)
                loss_value  = nn.CrossEntropyLoss()(outputs, targets)
            scaler.scale(loss_value).backward()
            scaler.step(optimizer)
            scaler.update()

        total_loss += loss_value.item()
        with torch.no_grad():
            accuracy = torch.mean((torch.argmax(F.softmax(outputs, dim=-1), dim=-1) == targets).type(torch.FloatTensor))
            total_accuracy += accuracy.item()

        if local_rank == 0:
            pbar.set_postfix(**{'total_loss': total_loss / (iteration + 1),
                                'accuracy'  : total_accuracy / (iteration + 1),
                                'lr'        : get_lr(optimizer)})
            pbar.update(1)

    if local_rank == 0:
        pbar.close()
        print('Finish Train')
        print('Start Validation')
        pbar = tqdm(total=epoch_step_val, desc=f'Epoch {epoch + 1}/{Epoch}', postfix=dict, mininterval=0.3)
    model_train.eval()
    for iteration, batch in enumerate(gen_val):
        if iteration >= epoch_step_val:
            break
        images, targets = batch
        with torch.no_grad():
            if cuda:
                images  = images.cuda(local_rank)
                targets = targets.cuda(local_rank)

            optimizer.zero_grad()
            outputs     = model_train(images)
            loss_value  = nn.CrossEntropyLoss()(outputs, targets)

            val_loss += loss_value.item()
            accuracy = torch.mean((torch.argmax(F.softmax(outputs, dim=-1), dim=-1) == targets).type(torch.FloatTensor))
            val_accuracy += accuracy.item()

        if local_rank == 0:
            pbar.set_postfix(**{'total_loss': val_loss / (iteration + 1),
                                'accuracy'  : val_accuracy / (iteration + 1),
                                'lr'        : get_lr(optimizer)})
            pbar.update(1)

    if local_rank == 0:
        pbar.close()
        print('Finish Validation')
        loss_history.append_loss(epoch + 1, total_loss / epoch_step, val_loss / epoch_step_val)
        print('Epoch:' + str(epoch + 1) + '/' + str(Epoch))
        print('Total Loss: %.3f || Val Loss: %.3f ' % (total_loss / epoch_step, val_loss / epoch_step_val))

        if (epoch + 1) % save_period == 0 or epoch + 1 == Epoch:
            torch.save(model.state_dict(), os.path.join(save_dir, "ep%03d-loss%.3f-val_loss%.3f.pth"
                       % (epoch + 1, total_loss / epoch_step, val_loss / epoch_step_val)))

        if len(loss_history.val_loss) <= 1 or (val_loss / epoch_step_val) <= min(loss_history.val_loss):
            print('Save best model to best_epoch_weights.pth')
            torch.save(model.state_dict(), os.path.join(save_dir, "best_epoch_weights.pth"))

        torch.save(model.state_dict(), os.path.join(save_dir, "last_epoch_weights.pth"))


# Train the network
import os

import numpy as np
import torch
import torch.backends.cudnn as cudnn
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader

from nets import get_model_from_name
from utils.callbacks import LossHistory
from utils.dataloader import DataGenerator, detection_collate
from utils.utils import (download_weights, get_classes, get_lr_scheduler,
                         set_optimizer_lr, show_config, weights_init)
from utils.utils_fit import fit_one_epoch

if __name__ == "__main__":
    Cuda            = True
    distributed     = False
    sync_bn         = False
    fp16            = False

    classes_path    = 'model_data/cls_classes.txt'
    input_shape     = [224, 224]
    backbone        = "mobilenetv2"
    pretrained      = True
    model_path      = ""

    Init_Epoch          = 0
    Freeze_Epoch        = 50
    Freeze_batch_size   = 32
    UnFreeze_Epoch      = 200
    Unfreeze_batch_size = 32
    Freeze_Train        = True

    Init_lr             = 1e-2
    Min_lr              = Init_lr * 0.01
    optimizer_type      = "sgd"
    momentum            = 0.9
    weight_decay        = 5e-4
    lr_decay_type       = "cos"

    save_period         = 10
    save_dir            = 'logs'
    num_workers         = 4

    train_annotation_path   = "cls_train.txt"
    test_annotation_path    = 'cls_test.txt'

    ngpus_per_node  = torch.cuda.device_count()
    if distributed:
        dist.init_process_group(backend="nccl")
        local_rank  = int(os.environ["LOCAL_RANK"])
        rank        = int(os.environ["RANK"])
        device      = torch.device("cuda", local_rank)
        if local_rank == 0:
            print(f"[{os.getpid()}] (rank = {rank}, local_rank = {local_rank}) training...")
            print("Gpu Device Count : ", ngpus_per_node)
    else:
        device      = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        local_rank  = 0
        rank        = 0

    if pretrained:
        if distributed:
            if local_rank == 0:
                download_weights(backbone)
            dist.barrier()
        else:
            download_weights(backbone)

    class_names, num_classes = get_classes(classes_path)

    if backbone not in ['vit_b_16', 'swin_transformer_tiny', 'swin_transformer_small', 'swin_transformer_base']:
        model = get_model_from_name[backbone](num_classes=num_classes, pretrained=pretrained)
    else:
        model = get_model_from_name[backbone](input_shape=input_shape, num_classes=num_classes, pretrained=pretrained)

    if not pretrained:
        weights_init(model)
    if model_path != "":
        if local_rank == 0:
            print('Load weights {}.'.format(model_path))

        model_dict      = model.state_dict()
        pretrained_dict = torch.load(model_path, map_location=device)
        load_key, no_load_key, temp_dict = [], [], {}
        for k, v in pretrained_dict.items():
            if k in model_dict.keys() and np.shape(model_dict[k]) == np.shape(v):
                temp_dict[k] = v
                load_key.append(k)
            else:
                no_load_key.append(k)
        model_dict.update(temp_dict)
        model.load_state_dict(model_dict)

        if local_rank == 0:
            print("\nSuccessful Load Key:", str(load_key)[:500], "……\nSuccessful Load Key Num:", len(load_key))
            print("\nFail To Load Key:", str(no_load_key)[:500], "……\nFail To Load Key num:", len(no_load_key))
            print("\n\033[1;33;44m温馨提示,head部分没有载入是正常现象,Backbone部分没有载入是错误的。\033[0m")

    if local_rank == 0:
        loss_history = LossHistory(save_dir, model, input_shape=input_shape)
    else:
        loss_history = None

    if fp16:
        from torch.cuda.amp import GradScaler as GradScaler
        scaler = GradScaler()
    else:
        scaler = None

    model_train = model.train()

    if sync_bn and ngpus_per_node > 1 and distributed:
        model_train = torch.nn.SyncBatchNorm.convert_sync_batchnorm(model_train)
    elif sync_bn:
        print("Sync_bn is not support in one gpu or not distributed.")

    if Cuda:
        if distributed:
            #----------------------------#
            #   Multi-GPU parallel training
            #----------------------------#
            model_train = model_train.cuda(local_rank)
            model_train = torch.nn.parallel.DistributedDataParallel(model_train, device_ids=[local_rank], find_unused_parameters=True)
        else:
            model_train = torch.nn.DataParallel(model)
            cudnn.benchmark = True
            model_train = model_train.cuda()

    with open(train_annotation_path, encoding='utf-8') as f:
        train_lines = f.readlines()
    with open(test_annotation_path, encoding='utf-8') as f:
        val_lines   = f.readlines()
    num_train   = len(train_lines)
    num_val     = len(val_lines)
    np.random.seed(10101)
    np.random.shuffle(train_lines)
    np.random.seed(None)

    if local_rank == 0:
        show_config(
            num_classes=num_classes, backbone=backbone, model_path=model_path, input_shape=input_shape,
            Init_Epoch=Init_Epoch, Freeze_Epoch=Freeze_Epoch, UnFreeze_Epoch=UnFreeze_Epoch,
            Freeze_batch_size=Freeze_batch_size, Unfreeze_batch_size=Unfreeze_batch_size, Freeze_Train=Freeze_Train,
            Init_lr=Init_lr, Min_lr=Min_lr, optimizer_type=optimizer_type, momentum=momentum, lr_decay_type=lr_decay_type,
            save_period=save_period, save_dir=save_dir, num_workers=num_workers, num_train=num_train, num_val=num_val
        )

        wanted_step = 3e4 if optimizer_type == "sgd" else 1e4
        total_step  = num_train // Unfreeze_batch_size * UnFreeze_Epoch
        if total_step <= wanted_step:
            wanted_epoch = wanted_step // (num_train // Unfreeze_batch_size) + 1
            print("\n\033[1;33;44m[Warning] 使用%s优化器时,建议将训练总步长设置到%d以上。\033[0m" % (optimizer_type, wanted_step))
            print("\033[1;33;44m[Warning] 本次运行的总训练数据量为%d,Unfreeze_batch_size为%d,共训练%d个Epoch,计算出总训练步长为%d。\033[0m" % (num_train, Unfreeze_batch_size, UnFreeze_Epoch, total_step))
            print("\033[1;33;44m[Warning] 由于总训练步长为%d,小于建议总步长%d,建议设置总世代为%d。\033[0m" % (total_step, wanted_step, wanted_epoch))

    if True:
        UnFreeze_flag = False
        if Freeze_Train:
            model.freeze_backbone()

        batch_size = Freeze_batch_size if Freeze_Train else Unfreeze_batch_size

        nbs             = 64
        lr_limit_max    = 1e-3 if optimizer_type == 'adam' else 1e-1
        lr_limit_min    = 1e-4 if optimizer_type == 'adam' else 5e-4
        if backbone in ['vit_b_16', 'swin_transformer_tiny', 'swin_transformer_small', 'swin_transformer_base']:
            nbs             = 256
            lr_limit_max    = 1e-3 if optimizer_type == 'adam' else 1e-1
            lr_limit_min    = 1e-5 if optimizer_type == 'adam' else 5e-4
        Init_lr_fit = min(max(batch_size / nbs * Init_lr, lr_limit_min), lr_limit_max)
        Min_lr_fit  = min(max(batch_size / nbs * Min_lr, lr_limit_min * 1e-2), lr_limit_max * 1e-2)

        optimizer = {
            'adam'  : optim.Adam(model_train.parameters(), Init_lr_fit, betas=(momentum, 0.999), weight_decay=weight_decay),
            'sgd'   : optim.SGD(model_train.parameters(), Init_lr_fit, momentum=momentum, nesterov=True)
        }[optimizer_type]

        lr_scheduler_func = get_lr_scheduler(lr_decay_type, Init_lr_fit, Min_lr_fit, UnFreeze_Epoch)

        epoch_step      = num_train // batch_size
        epoch_step_val  = num_val // batch_size

        if epoch_step == 0 or epoch_step_val == 0:
            raise ValueError("数据集过小,无法继续进行训练,请扩充数据集。")

        train_dataset   = DataGenerator(train_lines, input_shape, True)
        val_dataset     = DataGenerator(val_lines, input_shape, False)

        if distributed:
            train_sampler   = torch.utils.data.distributed.DistributedSampler(train_dataset, shuffle=True,)
            val_sampler     = torch.utils.data.distributed.DistributedSampler(val_dataset, shuffle=False,)
            batch_size      = batch_size // ngpus_per_node
            shuffle         = False
        else:
            train_sampler   = None
            val_sampler     = None
            shuffle         = True

        gen     = DataLoader(train_dataset, shuffle=shuffle, batch_size=batch_size, num_workers=num_workers, pin_memory=True,
                             drop_last=True, collate_fn=detection_collate, sampler=train_sampler)
        gen_val = DataLoader(val_dataset, shuffle=shuffle, batch_size=batch_size, num_workers=num_workers, pin_memory=True,
                             drop_last=True, collate_fn=detection_collate, sampler=val_sampler)

        for epoch in range(Init_Epoch, UnFreeze_Epoch):
            if epoch >= Freeze_Epoch and not UnFreeze_flag and Freeze_Train:
                batch_size = Unfreeze_batch_size

                nbs             = 64
                lr_limit_max    = 1e-3 if optimizer_type == 'adam' else 1e-1
                lr_limit_min    = 1e-4 if optimizer_type == 'adam' else 5e-4
                if backbone in ['vit_b_16', 'swin_transformer_tiny', 'swin_transformer_small', 'swin_transformer_base']:
                    nbs             = 256
                    lr_limit_max    = 1e-3 if optimizer_type == 'adam' else 1e-1
                    lr_limit_min    = 1e-5 if optimizer_type == 'adam' else 5e-4
                Init_lr_fit = min(max(batch_size / nbs * Init_lr, lr_limit_min), lr_limit_max)
                Min_lr_fit  = min(max(batch_size / nbs * Min_lr, lr_limit_min * 1e-2), lr_limit_max * 1e-2)

                lr_scheduler_func = get_lr_scheduler(lr_decay_type, Init_lr_fit, Min_lr_fit, UnFreeze_Epoch)

                model.Unfreeze_backbone()

                epoch_step      = num_train // batch_size
                epoch_step_val  = num_val // batch_size

                if epoch_step == 0 or epoch_step_val == 0:
                    raise ValueError("数据集过小,无法继续进行训练,请扩充数据集。")

                if distributed:
                    batch_size = batch_size // ngpus_per_node

                gen     = DataLoader(train_dataset, shuffle=shuffle, batch_size=batch_size, num_workers=num_workers, pin_memory=True,
                                     drop_last=True, collate_fn=detection_collate, sampler=train_sampler)
                gen_val = DataLoader(val_dataset, shuffle=shuffle, batch_size=batch_size, num_workers=num_workers, pin_memory=True,
                                     drop_last=True, collate_fn=detection_collate, sampler=val_sampler)

                UnFreeze_flag = True

            if distributed:
                train_sampler.set_epoch(epoch)

            set_optimizer_lr(optimizer, lr_scheduler_func, epoch)

            fit_one_epoch(model_train, model, loss_history, optimizer, epoch, epoch_step, epoch_step_val, gen,
                          gen_val, UnFreeze_Epoch, Cuda, fp16, scaler, save_period, save_dir, local_rank)

        if local_rank == 0:
            loss_history.writer.close()
(5) Evaluation on the test set
Code:
import os

import numpy as np
import torch

from classification import (Classification, cvtColor, letterbox_image,
                            preprocess_input)
from utils.utils import letterbox_image
from utils.utils_metrics import evaluteTop1_5

test_annotation_path    = 'cls_test.txt'
metrics_out_path        = "metrics_out"

class Eval_Classification(Classification):
    def detect_image(self, image):
        image       = cvtColor(image)
        image_data  = letterbox_image(image, [self.input_shape[1], self.input_shape[0]], self.letterbox_image)
        image_data  = np.transpose(np.expand_dims(preprocess_input(np.array(image_data, np.float32)), 0), (0, 3, 1, 2))

        with torch.no_grad():
            photo = torch.from_numpy(image_data).type(torch.FloatTensor)
            if self.cuda:
                photo = photo.cuda()
            preds = torch.softmax(self.model(photo)[0], dim=-1).cpu().numpy()

        return preds

if __name__ == "__main__":
    if not os.path.exists(metrics_out_path):
        os.makedirs(metrics_out_path)

    classfication = Eval_Classification()

    with open("./cls_test.txt", "r") as f:
        lines = f.readlines()
    top1, top5, Recall, Precision = evaluteTop1_5(classfication, lines, metrics_out_path)
    print("top-1 accuracy = %.2f%%" % (top1 * 100))
    print("top-5 accuracy = %.2f%%" % (top5 * 100))
    print("mean Recall = %.2f%%" % (np.mean(Recall) * 100))
    print("mean Precision = %.2f%%" % (np.mean(Precision) * 100))
To verify the model's performance, precision and recall are used as evaluation metrics. The experimental results are shown in the figure below.
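As a reminder of how these two metrics are defined, the following generic sketch (not the project's utils_metrics implementation; the labels are made up for the example) computes precision and recall with scikit-learn:

import numpy as np
from sklearn.metrics import precision_score, recall_score

# Hypothetical example labels: 1 = smoking, 0 = not smoking.
y_true = np.array([1, 0, 1, 1, 0, 0, 1, 0])
y_pred = np.array([1, 0, 1, 0, 0, 0, 1, 1])

# precision = TP / (TP + FP), recall = TP / (TP + FN)
print("precision = %.2f" % precision_score(y_true, y_pred))  # 0.75
print("recall    = %.2f" % recall_score(y_true, y_pred))     # 0.75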
(6) Classifying images and displaying prediction results
Code:
# Classification
import matplotlib.pyplot as plt
import numpy as np
import torch
from torch import nn

from nets import get_model_from_name
from utils.utils import (cvtColor, get_classes, letterbox_image,
                         preprocess_input, show_config)


class Classification(object):
    _defaults = {
        "model_path"        : 'logs/best_epoch_weights.pth',
        "classes_path"      : 'model_data/cls_classes.txt',
        "input_shape"       : [224, 224],
        "backbone"          : 'mobilenetv2',
        "letterbox_image"   : False,
        "cuda"              : True
    }

    @classmethod
    def get_defaults(cls, n):
        if n in cls._defaults:
            return cls._defaults[n]
        else:
            return "Unrecognized attribute name '" + n + "'"

    def __init__(self, **kwargs):
        self.__dict__.update(self._defaults)
        for name, value in kwargs.items():
            setattr(self, name, value)

        self.class_names, self.num_classes = get_classes(self.classes_path)
        self.generate()

        show_config(**self._defaults)

    def generate(self):
        if self.backbone not in ['vit_b_16', 'swin_transformer_tiny', 'swin_transformer_small', 'swin_transformer_base']:
            self.model = get_model_from_name[self.backbone](num_classes=self.num_classes, pretrained=False)
        else:
            self.model = get_model_from_name[self.backbone](input_shape=self.input_shape, num_classes=self.num_classes, pretrained=False)
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        self.model.load_state_dict(torch.load(self.model_path, map_location=device))
        self.model = self.model.eval()
        print('{} model, and classes loaded.'.format(self.model_path))

        if self.cuda:
            self.model = nn.DataParallel(self.model)
            self.model = self.model.cuda()

    def detect_image(self, image):
        image       = cvtColor(image)
        image_data  = letterbox_image(image, [self.input_shape[1], self.input_shape[0]], self.letterbox_image)
        image_data  = np.transpose(np.expand_dims(preprocess_input(np.array(image_data, np.float32)), 0), (0, 3, 1, 2))

        with torch.no_grad():
            photo = torch.from_numpy(image_data)
            if self.cuda:
                photo = photo.cuda()
            preds = torch.softmax(self.model(photo)[0], dim=-1).cpu().numpy()

        class_name  = self.class_names[np.argmax(preds)]
        probability = np.max(preds)

        plt.rcParams['font.sans-serif'] = ['SimHei']
        plt.subplot(1, 1, 1)
        plt.imshow(np.array(image))
        if class_name == "smoking_images":
            plt.title('他正在吸烟的概率为:%.3f' % (probability))
            plt.show()
            return class_name
        else:
            plt.title('他没有在吸烟的概率为:%.3f' % (probability))
            plt.show()
            return class_name
# Prediction
import os

from PIL import Image

from classification import Classification

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

classfication = Classification()

while True:
    img = input('Input image filename:')
    try:
        image = Image.open(img)
    except:
        print('Open Error! Try again!')
        continue
    else:
        class_name = classfication.detect_image(image)
        print(class_name)
(7) Complete program source code: the full program consists of the code listings already given in steps (1) through (6) above.
IV. Summary
In this project I studied the MobileNetV2 network and used it to recognize smoking and phone-use behaviors in images, obtaining fairly good results: mean precision reached 99.61% and mean recall reached 99.71%. In future study I will learn more classic network architectures, such as ResNet50 and VGG16, as well as currently popular methods such as attention mechanisms, and try to tackle harder datasets next time.