MindSpore 25-Day Learning Camp, Day 16 | ShuffleNet Image Classification

Date: 2024-07-05 13:02:27

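Introduction to ShuffleNet

        ShuffleNetV1 is a computationally efficient convolutional neural network (CNN) proposed by Megvii (旷视科技), aimed mainly at mobile devices. Like MobileNet and SqueezeNet, it is designed to reach the best possible accuracy within a limited computation budget. Its core design introduces two operations, Pointwise Group Convolution and Channel Shuffle, which greatly reduce the model's computational cost while preserving accuracy.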

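Model Architecture

        ShuffleNet's most distinctive feature is that it builds on ResNet and rearranges channels to overcome the drawback of Group Convolution. Specifically, ShuffleNet modifies ResNet's Bottleneck unit and achieves high accuracy at a small computational cost.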

Pointwise Group Convolution

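        Group convolution splits the convolution kernels into groups, which reduces both the number of parameters and the amount of computation. Each kernel processes only part of the input feature map's channels, so the parameter count drops, but the approach also restricts the exchange of information between groups. Pointwise group convolution applies this grouping to 1×1 convolutions.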
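
The saving can be checked with simple arithmetic. The sketch below is only an illustration: the helper name `conv_params` and the channel counts are assumptions chosen for the example, not values taken from the model code. It counts the weights of a bias-free convolution with and without grouping.

```python
def conv_params(in_channels, out_channels, kernel_size, groups=1):
    """Weight count of a 2-D convolution without bias (hypothetical helper)."""
    if in_channels % groups or out_channels % groups:
        raise ValueError("channels must be divisible by groups")
    # Each of the `groups` convolutions maps in/g channels to out/g channels.
    per_group = (in_channels // groups) * (out_channels // groups) * kernel_size ** 2
    return groups * per_group


standard = conv_params(240, 240, kernel_size=1)           # ordinary 1x1 convolution
grouped = conv_params(240, 240, kernel_size=1, groups=3)  # pointwise group convolution
print(standard, grouped, standard // grouped)  # 57600 19200 3
```

With g groups the weight count, and likewise the multiply-add count, shrinks by a factor of g.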

Channel Shuffle

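        A major problem with group convolution is that channels in different groups cannot exchange information. To solve this, ShuffleNet introduces the Channel Shuffle mechanism, which rearranges the channels so that information can flow between groups.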
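
To make the rearrangement concrete, here is a minimal pure-Python sketch that shuffles a flat list of channel labels. It follows the common reshape, transpose, flatten formulation; the function and the labels are illustrative assumptions. The `channel_shuffle` method in the model code works on 4-D tensors and uses a different axis order, but the interleaving idea is the same.

```python
def channel_shuffle(channels, groups):
    """Interleave a flat list of channels that is laid out group by group."""
    if len(channels) % groups:
        raise ValueError("channel count must be divisible by groups")
    per_group = len(channels) // groups
    # "Reshape" to (groups, per_group): one row per group.
    rows = [channels[g * per_group:(g + 1) * per_group] for g in range(groups)]
    # "Transpose" to (per_group, groups) and flatten back to a flat list.
    return [rows[g][i] for i in range(per_group) for g in range(groups)]


# Six channels produced by 3 groups: a* from group 0, b* from group 1, c* from group 2.
before = ["a0", "a1", "b0", "b1", "c0", "c1"]
after = channel_shuffle(before, groups=3)
print(after)  # ['a0', 'b0', 'c0', 'a1', 'b1', 'c1']
```

If the next group convolution again uses 3 groups of 2 channels, it now sees `['a0', 'b0']`, `['c0', 'a1']` and `['b1', 'c1']`, so every group mixes outputs from different groups of the previous layer.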

Model Construction

        The ShuffleNet network structure is built as follows, taking a 224×224 input image and 3 groups (g = 3) as an example:

import mindspore as ms
from mindspore import nn, ops, Tensor

class GroupConv(nn.Cell):
    """Group convolution built from `groups` independent nn.Conv2d layers."""

    def __init__(self, in_channels, out_channels, kernel_size, stride, pad_mode="pad", pad=0, groups=1, has_bias=False):
        super(GroupConv, self).__init__()
        self.groups = groups
        self.convs = nn.CellList()
        # One convolution per group; each one sees only in_channels // groups input channels.
        for _ in range(groups):
            self.convs.append(nn.Conv2d(in_channels // groups, out_channels // groups, kernel_size=kernel_size, stride=stride, has_bias=has_bias, padding=pad, pad_mode=pad_mode, group=1, weight_init='xavier_uniform'))

    def construct(self, x):
        # Split the input along the channel axis into `groups` equal chunks.
        features = ops.split(x, split_size_or_sections=x.shape[1] // self.groups, axis=1)
        outputs = ()
        for i in range(self.groups):
            outputs = outputs + (self.convs[i](features[i].astype("float32")),)
        # Concatenate the per-group results back along the channel axis.
        out = ops.cat(outputs, axis=1)
        return out

class ShuffleV1Block(nn.Cell):
    def __init__(self, inp, oup, group, first_group, mid_channels, ksize, stride):
        super(ShuffleV1Block, self).__init__()
        self.stride = stride
        pad = ksize // 2
        self.group = group
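        if stride == 2:
            # The shortcut (inp channels) is concatenated with the main branch,
            # so the main branch only has to produce oup - inp channels.
            outputs = oup - inp
        else:
            outputs = oup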
        self.relu = nn.ReLU()
        branch_main_1 = [
            GroupConv(in_channels=inp, out_channels=mid_channels, kernel_size=1, stride=1, pad_mode="pad", pad=0, groups=1 if first_group else group),
            nn.BatchNorm2d(mid_channels),
            nn.ReLU(),
        ]
        branch_main_2 = [
            nn.Conv2d(mid_channels, mid_channels, kernel_size=ksize, stride=stride, pad_mode='pad', padding=pad, group=mid_channels, weight_init='xavier_uniform', has_bias=False),
            nn.BatchNorm2d(mid_channels),
            GroupConv(in_channels=mid_channels, out_channels=outputs, kernel_size=1, stride=1, pad_mode="pad", pad=0, groups=group),
            nn.BatchNorm2d(outputs),
        ]
        self.branch_main_1 = nn.SequentialCell(branch_main_1)
        self.branch_main_2 = nn.SequentialCell(branch_main_2)
        if self.stride == 2:
            self.branch_proj = nn.AvgPool2d(kernel_size=3, stride=2, pad_mode='same')

    def construct(self, old_x):
        left = old_x
        right = old_x
        out = old_x
        right = self.branch_main_1(right)
        if self.group > 1:
            right = self.channel_shuffle(right)
        right = self.branch_main_2(right)
        if self.stride == 1:
            out = self.relu(left + right)
        elif self.stride == 2:
            left = self.branch_proj(left)
            out = ops.cat((left, right), 1)
            out = self.relu(out)
        return out

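    def channel_shuffle(self, x):
        batchsize, num_channels, height, width = ops.shape(x)
        group_channels = num_channels // self.group
        # View the channel axis as (group_channels, group), swap the two axes and
        # flatten again, so that each group of the next group convolution receives
        # channels drawn from across the previous groups.
        x = ops.reshape(x, (batchsize, group_channels, self.group, height, width))
        x = ops.transpose(x, (0, 2, 1, 3, 4))
        x = ops.reshape(x, (batchsize, num_channels, height, width))
        return x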

class ShuffleNetV1(nn.Cell):
    def __init__(self, n_class=1000, model_size='2.0x', group=3):
        super(ShuffleNetV1, self).__init__()
        self.stage_repeats = [4, 8, 4]
        self.model_size = model_size
        if group == 3:
            if model_size == '0.5x':
                self.stage_out_channels = [-1, 12, 120, 240, 480]
            elif model_size == '1.0x':
                self.stage_out_channels = [-1, 24, 240, 480, 960]
            elif model_size == '1.5x':
                self.stage_out_channels = [-1, 24, 360, 720, 1440]
            elif model_size == '2.0x':
                self.stage_out_channels = [-1, 48, 480, 960, 1920]
            else:
                raise NotImplementedError
        elif group == 8:
            if model_size == '0.5x':
                self.stage_out_channels = [-1, 16, 192, 384, 768]
            elif model_size == '1.0x':
                self.stage_out_channels = [-1, 24, 384, 768, 1536]
            elif model_size == '1.5x':
                self.stage_out_channels = [-1, 24, 576, 1152, 2304]
            elif model_size == '2.0x':
                self.stage_out_channels = [-1, 48, 768, 1536, 3072]
            else:
                raise NotImplementedError
        else:
            # Channel configurations are only defined for group=3 and group=8.
            raise NotImplementedError
        input_channel = self.stage_out_channels[1]
        self.first_conv = nn.SequentialCell(
            nn.Conv2d(3, input_channel, 3, 2, 'pad', 1, weight_init='xavier_uniform', has_bias=False),
            nn.BatchNorm2d(input_channel),
            nn.ReLU(),
        )
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, pad_mode='same')
        features = []
        for idxstage in range(len(self.stage_repeats)):
            numrepeat = self.stage_repeats[idxstage]
            output_channel = self.stage_out_channels[idxstage + 2]
            for i in range(numrepeat):
                stride = 2 if i == 0 else 1
                first_group = idxstage == 0 and i == 0
                features.append(ShuffleV1Block(input_channel, output_channel, group=group, first_group=first_group, mid_channels=output_channel // 4, ksize=3, stride=stride))
                input_channel = output_channel
        self.features = nn.SequentialCell(features)
        self.globalpool = nn.AvgPool2d(7)
        self.classifier = nn.Dense(self.stage_out_channels[-1], n_class)

    def construct(self, x):
        x = self.first_conv(x)
        x = self.maxpool(x)
        x = self.features(x)
        x = self.globalpool(x)
        x = ops.reshape(x, (-1, self.stage_out_channels[-1]))
        x = self.classifier(x)
        return x
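
The fixed `nn.AvgPool2d(7)` at the end relies on the feature map being 7×7 when it arrives there. The following sketch traces the spatial size through the downsampling steps for a 224×224 input, using the 2.0x, g = 3 channel list from the code above. The helper `trace_shapes` is a hypothetical name, and the size formulas are the usual ones for a padded 3×3 stride-2 convolution and for 'same' pooling, stated here as assumptions.

```python
def trace_shapes(input_size=224, stage_out_channels=(-1, 48, 480, 960, 1920)):
    """Return (name, channels, spatial size) after each downsampling step."""
    shapes = []
    # first_conv: 3x3 kernel, stride 2, padding 1
    size = (input_size + 2 * 1 - 3) // 2 + 1
    shapes.append(("first_conv", stage_out_channels[1], size))
    # maxpool: stride 2 with 'same' padding gives ceil(size / 2)
    size = -(-size // 2)
    shapes.append(("maxpool", stage_out_channels[1], size))
    # The first block of every stage uses a 3x3 depthwise conv, stride 2, padding 1.
    for idx, channels in enumerate(stage_out_channels[2:], start=2):
        size = (size + 2 * 1 - 3) // 2 + 1
        shapes.append((f"stage{idx}", channels, size))
    return shapes


for name, channels, size in trace_shapes():
    print(f"{name:10s} {channels:5d} x {size} x {size}")
```

The last entry is 1920 channels at 7×7, which is why the global pooling reduces it to one value per channel and the classifier takes `stage_out_channels[-1]` inputs.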

Model Training

        The model is trained on the CIFAR-10 dataset. First, prepare the dataset and apply data augmentation.

from mindspore.dataset import Cifar10Dataset
from mindspore.dataset import vision, transforms

def get_dataset(train_dataset_path, batch_size, usage):
    image_trans = []
    if usage == "train":
        image_trans = [
            vision.RandomCrop((32, 32), (4, 4, 4, 4)),
            vision.RandomHorizontalFlip(prob=0.5),
            vision.Resize((224, 224)),
            vision.Rescale(1.0 / 255.0, 0.0),
            vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
            vision.HWC2CHW()
        ]
    elif usage == "test":
        image_trans = [
            vision.Resize((224, 224)),
            vision.Rescale(1.0 / 255.0, 0.0),
            vision.Normalize([0.4914, 0.4822, 0.4465], [0.2023, 0.1994, 0.2010]),
            vision.HWC2CHW()
        ]
    label_trans = transforms.TypeCast(ms.int32)
    # num_samples=2000 limits the run to 2000 samples from the chosen split.
    dataset = Cifar10Dataset(train_dataset_path, usage=usage, shuffle=True, num_samples=2000)
    dataset = dataset.map(image_trans, 'image')
    dataset = dataset.map(label_trans, 'label')
    dataset = dataset.batch(batch_size, drop_remainder=True)
    return dataset

train_dataset = get_dataset("./dataset/cifar-10-batches-bin", 32, "train")
batches_per_epoch = train_dataset.get_dataset_size()
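
For reference, the value of `batches_per_epoch` follows directly from the sample count and batch size. The helper below is a hypothetical stand-in for that arithmetic, not a MindSpore API.

```python
def steps_per_epoch(num_samples, batch_size, drop_remainder=True):
    """Number of batches produced from num_samples items (hypothetical helper)."""
    full, rest = divmod(num_samples, batch_size)
    # With drop_remainder=True the incomplete last batch is discarded.
    return full if drop_remainder or rest == 0 else full + 1


print(steps_per_epoch(2000, 32))  # 62
```

With the settings used here (2000 samples, batch size 32, `drop_remainder=True`) one epoch has 62 steps, which also determines the step number that appears in the checkpoint file name.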

Define the training process, including the loss function, the optimizer, and the training step.

import time
from mindspore import Model, nn
from mindspore.train import ModelCheckpoint, CheckpointConfig, TimeMonitor, LossMonitor
from mindspore.nn import Momentum

def train():
    ms.set_context(mode=ms.PYNATIVE_MODE, device_target="CPU")
    net = ShuffleNetV1(model_size="2.0x", n_class=10)
    # Cross-entropy loss with label smoothing of 0.1.
    loss = nn.CrossEntropyLoss(weight=None, reduction='mean', label_smoothing=0.1)
    min_lr = 0.0005
    base_lr = 0.05
    lr_scheduler = nn.cosine_decay_lr(min_lr, base_lr, batches_per_epoch * 2, batches_per_epoch, decay_epoch=2)
    # Note: only the last value of the schedule is used, so the learning rate stays constant during training.
    lr = Tensor(lr_scheduler[-1])
    # loss_scale must match the value passed to FixedLossScaleManager below.
    optimizer = Momentum(params=net.trainable_params(), learning_rate=lr, momentum=0.9, weight_decay=0.00004, loss_scale=1024)
    loss_scale_manager = ms.amp.FixedLossScaleManager(1024, drop_overflow_update=False)
    model = Model(net, loss_fn=loss, optimizer=optimizer, amp_level="O3", loss_scale_manager=loss_scale_manager)
    callback = [TimeMonitor(), LossMonitor()]
    save_ckpt_path = "./"
    config_ckpt = CheckpointConfig(save_checkpoint_steps=batches_per_epoch, keep_checkpoint_max=5)
    ckpt_callback = ModelCheckpoint("shufflenetv1", directory=save_ckpt_path, config=config_ckpt)
    callback += [ckpt_callback]

    print("============== Starting Training ==============")
    start_time = time.time()
    model.train(1, train_dataset, callbacks=callback)
    use_time = time.time() - start_time
    hour = str(int(use_time // 60 // 60))
    minute = str(int(use_time // 60 % 60))
    second = str(int(use_time % 60))
    print("total time:" + hour + "h " + minute + "m " + second + "s")
    print("============== Train Success ==============")

if __name__ == '__main__':
    train()
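
To see which learning rate `lr_scheduler[-1]` actually picks, the schedule can be reproduced in plain Python. The function below is a re-implementation written from the cosine decay formula as I understand it from the MindSpore documentation, so treat it as an assumption rather than the library source. The value of 62 steps per epoch is likewise an assumption derived from 2000 samples and batch size 32.

```python
import math


def cosine_decay_lr(min_lr, max_lr, total_step, step_per_epoch, decay_epoch):
    """Per-step learning rates; the rate changes once per epoch."""
    delta = 0.5 * (max_lr - min_lr)
    lrs = []
    for step in range(total_step):
        epoch = min(step // step_per_epoch, decay_epoch)
        lrs.append(min_lr + delta * (1 + math.cos(math.pi * epoch / decay_epoch)))
    return lrs


steps = 62  # assumed steps per epoch (2000 samples, batch size 32)
schedule = cosine_decay_lr(0.0005, 0.05, steps * 2, steps, decay_epoch=2)
print(len(schedule), schedule[0], round(schedule[-1], 5))
```

Under these assumptions the schedule starts at 0.05 and its last entry is about 0.02525, which is the single constant rate the optimizer receives. Passing the whole list instead of the last element would make the rate follow the schedule step by step.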

Model Evaluation

        Evaluate the model on the CIFAR-10 test set.

from mindspore import train as ms_train
from mindspore import load_checkpoint, load_param_into_net

def test():
    ms.set_context(mode=ms.PYNATIVE_MODE, device_target="CPU")
    test_dataset = get_dataset("./dataset/cifar-10-batches-bin", 32, "test")
    net = ShuffleNetV1(model_size="2.0x", n_class=10)
    # Checkpoints are named <prefix>-<epoch>_<step>.ckpt. With 2000 samples and batch size 32
    # one epoch has 62 steps; adjust the name if your training run produced a different file.
    ckpt_path = "./shufflenetv1-1_62.ckpt"
    param_dict = load_checkpoint(ckpt_path)
    load_param_into_net(net, param_dict)
    net.set_train(False)
    loss = nn.CrossEntropyLoss(weight=None, reduction='mean', label_smoothing=0.1)
    # In MindSpore 2.x the metric classes are exposed under mindspore.train.
    eval_metrics = {'Loss': ms_train.Loss(), 'Top_1_Acc': ms_train.Top1CategoricalAccuracy(), 'Top_5_Acc': ms_train.Top5CategoricalAccuracy()}
    model = Model(net, loss_fn=loss, metrics=eval_metrics)
    start_time = time.time()
    res = model.eval(test_dataset, dataset_sink_mode=False)
    use_time = time.time() - start_time
    hour = str(int(use_time // 60 // 60))
    minute = str(int(use_time // 60 % 60))
    second = str(int(use_time % 60))
    log = "result:" + str(res) + ", ckpt:'" + ckpt_path + "', time: " + hour + "h " + minute + "m " + second + "s"
    print(log)
    with open('./eval_log.txt', 'a') as file_object:
        file_object.write(log + '\n')

if __name__ == '__main__':
    test()
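
The two accuracy metrics reported above can be illustrated without any framework. This is a minimal sketch of the usual top-k definition, with made-up scores and labels; it is not the MindSpore implementation.

```python
def top_k_accuracy(scores, labels, k=1):
    """Fraction of samples whose true label is among the k highest scores."""
    hits = 0
    for row, label in zip(scores, labels):
        # Indices of the k largest scores in this row.
        top_k = sorted(range(len(row)), key=lambda c: row[c], reverse=True)[:k]
        hits += label in top_k
    return hits / len(labels)


# Made-up scores for 4 samples and 3 classes.
scores = [
    [0.1, 0.7, 0.2],    # highest score: class 1
    [0.5, 0.3, 0.2],    # highest score: class 0
    [0.2, 0.3, 0.5],    # highest score: class 2
    [0.4, 0.35, 0.25],  # highest score: class 0
]
labels = [1, 1, 2, 2]
print(top_k_accuracy(scores, labels, k=1))  # 0.5
print(top_k_accuracy(scores, labels, k=2))  # 0.75
```

Top-5 accuracy on CIFAR-10 is computed the same way with k = 5 over the 10 class scores, so it is always at least as high as top-1.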

Model Prediction

        Run predictions on the CIFAR-10 test set and visualize the results.

import matplotlib.pyplot as plt
import numpy as np

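def predict():
    net = ShuffleNetV1(model_size="2.0x", n_class=10)
    # Adjust the name if your training run produced a different checkpoint file.
    param_dict = load_checkpoint("./shufflenetv1-1_62.ckpt")
    load_param_into_net(net, param_dict)
    model = Model(net)
    predict_dataset = get_dataset("./dataset/cifar-10-batches-bin", 32, "test")

    class_dict = {0: "airplane", 1: "automobile", 2: "bird", 3: "cat", 4: "deer", 5: "dog", 6: "frog", 7: "horse", 8: "ship", 9: "truck"}
    # Same statistics as vision.Normalize in get_dataset; used to undo the normalization for display.
    mean = np.array([0.4914, 0.4822, 0.4465])
    std = np.array([0.2023, 0.1994, 0.2010])
    num_batches = 3      # show only the first three batches
    images_per_row = 8   # show the first 8 images of each batch
    plt.figure(figsize=(16, 5))
    for i, data in enumerate(predict_dataset.create_dict_iterator()):
        if i == num_batches:
            break
        images = data['image']
        output = model.predict(images)
        pred = np.argmax(output.asnumpy(), axis=1)
        for j in range(images_per_row):
            # One row per batch; the subplot index must stay within num_batches * images_per_row.
            plt.subplot(num_batches, images_per_row, i * images_per_row + j + 1)
            plt.title(class_dict[int(pred[j])])
            img = images[j].asnumpy().transpose(1, 2, 0) * std + mean
            plt.imshow(np.clip(img, 0, 1))
            plt.axis("off")
    plt.show()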

if __name__ == '__main__':
    predict()

Results

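Learning notes: Studying ShuffleNet gave me a deeper understanding of computationally efficient convolutional neural networks. As a lightweight model, ShuffleNet achieves efficient image classification under limited computing resources by introducing Pointwise Group Convolution and Channel Shuffle. I looked closely at group convolution and the channel shuffle mechanism. Group convolution reduces computation by splitting the kernels into groups, but it also hinders the exchange of information between groups. Channel Shuffle rearranges the channels to restore that exchange, so the model stays efficient while still extracting image features effectively.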

From: https://blog.csdn.net/ljd939952281/article/details/140193443
