首页 > 其他分享 >一种简单的自编码器PyTorch代码实现

一种简单的自编码器PyTorch代码实现

时间:2023-12-23 20:32:20浏览次数:37  
标签:layers loss 编码器 nn 代码 use PyTorch model self

1. 引言

对于许多新接触深度学习爱好者来说,玩AutoEncoder总是很有趣的,因为它具有简单的处理逻辑、简易的网络架构,方便可视化潜在的特征空间。在本文中,我将从头开始介绍一个简单的AutoEncoder模型,以及一些可视化潜在特征空间的一些的方法,以便使本文变得生动有趣。

闲话少说,我们直接开始吧!

2. 数据集介绍

在本文中,我们使用FashionMNIST数据集来完成此任务。

一种简单的自编码器PyTorch代码实现_自编码器

以下是Kaggle上数据集的链接:戳我。 该数据集已在torchvision库中集成;我们可以通过几行代码直接导入和处理该数据集。

为此,首先需要是编写一个collate_fn函数,将数据集从PIL图像转换为torch张量,并进行相应的pad操作:

# This function convert the PIL images to tensors then pad them
def collate_fn(batch):
    process = transforms.Compose([
                transforms.ToTensor(),
                transforms.Pad([2])]
                )
    # x - images; we process each image in the batch
    x = [process(data[0]) for data in batch]
    x = torch.concat(x).unsqueeze(1)
    # y - labels, note that we should convert the labels to LongTensor
    y = torch.LongTensor([data[1] for data in batch])
    return x, y

3. 实现DataLoader

接着,我们就可以使用以下代码来完成相应的DataLoader的实现:

labels = ["T-shirt/top", "Trouser", "Pullover", "Dress","Coat", 
          "Sandla", "Shirt", "Sneaker", "Bag", "Ankle boot"]

# download/load dataset
train_data = FashionMNIST("./MNIST_DATA", train=True, download=True)
valid_data = FashionMNIST("./MNIST_DATA", train=False, download=True)

# put datasets into dataloaders
train_loader = DataLoader(train_data, batch_size=config["batch_size"], 
                          shuffle=True, collate_fn=collate_fn)
valid_loader = DataLoader(valid_data, batch_size=config["batch_size"], 
                           shuffle=False, collate_fn=collate_fn)

接着我们可以使用以下代码来检验上述代码是否符合我们的预期,测试代码如下:

print("Inspecting train data: ")
for _, data in enumerate(train_loader):
    print("Batch shape: ", data[0].shape)
    fig, ax = plt.subplots(1, 4, figsize=(10, 4))
    for i in range(4):
        # Ture 3D tensor to 2D tensor due to image's single channel
        ax[i].imshow(data[0][i].squeeze(), cmap="gray")
        ax[i].axis("off")
        ax[i].set_title(labels[data[1][i]])
    plt.show()
    # And don't forget to break
    break

运行结果如下:

一种简单的自编码器PyTorch代码实现_自编码器_02

观察上图,图像和标签一一对应关系正常,接着我们就可以进入我们的网络设计部分。

4. 实现encoder

我们知道自编码器是由编码器encoder和解码器decoder实现的,其中编码器的作用为将输入的图像编码为特征空间的特征向量,解码器的作用相反,尽可能的将上述特征向量结果恢复为原图。基于此,我们首先来一步步实现编码器。首先,我们来定义模型的基本超参数如下:

# Model parameters:
LAYERS = 3
KERNELS = [3, 3, 3]
CHANNELS = [32, 64, 128]
STRIDES = [2, 2, 2]
LINEAR_DIM = 2048

同时相应的编码器的网络结构设计如下:

class Encoder(nn.Module):
    def __init__(self, output_dim=2, use_batchnorm=False, use_dropout=False):
        super(Encoder, self).__init__()
        # bottleneck dimentionality
        self.output_dim = output_dim
        # variables deciding if using dropout and batchnorm in model
        self.use_dropout = use_dropout
        self.use_batchnorm = use_batchnorm
        # convolutional layer hyper parameters
        self.layers = LAYERS
        self.kernels = KERNELS
        self.channels = CHANNELS
        self.strides = STRIDES
        self.conv = self.get_convs()
        # layers for latent space projection
        self.fc_dim = LINEAR_DIM
        self.flatten = nn.Flatten()
        self.linear = nn.Linear(self.fc_dim, self.output_dim)
    def get_convs(self):
        """
        generating convolutional layers based on model's hyper parameters
        """
        conv_layers = nn.Sequential()
        for i in range(self.layers):
            # The input channel of the first layer is 1
            if i == 0: conv_layers.append(nn.Conv2d(1, 
                                              self.channels[i], 
                                              kernel_size=self.kernels[i],
                                              stride=self.strides[i],
                                              padding=1))
            
            else: conv_layers.append(nn.Conv2d(self.channels[i-1], 
                                         self.channels[i],
                                         kernel_size=self.kernels[i],
                                         stride=self.strides[i],
                                         padding=1))
            
            if self.use_batchnorm:
                conv_layers.append(nn.BatchNorm2d(self.channels[i]))
            
            # Here we use GELU as activation function
            conv_layers.append(nn.GELU()) 
            if self.use_dropout:
                conv_layers.append(nn.Dropout2d(0.15))
        return conv_layers
  
    def forward(self, x):
        x = self.conv(x)
        x = self.flatten(x)
        return self.linear(x)

在Pytorch中torchsummary是一个非常方便的工具,用于检查和调试模型的网络结构;我们可以检查层、每层中的张量形状以及模型的参数。代码如下:

from torchsummary import summary
# Get the summary of autoencoder architecture
encoder = Encoder(use_batchnorm=True, use_dropout=True).to(DEVICE)
summary(encoder, (1, 32, 32))
pass

得到输出如下:

一种简单的自编码器PyTorch代码实现_自编码器_03

5. 实现decoder

在我们的例子中,解码器层decoder是编码器的反向操作;确保每一层的输入和输出形状是很重要的。此外,我们应该调整转置卷积层中的paddingoutput_pading参数,以确保输出图像和输入图像的维度相同。代码实现如下:

class Decoder(nn.Module):
    def __init__(self, input_dim=2, use_batchnorm=False, use_dropout=False):
        super(Decoder, self).__init__()
        # variables deciding if using dropout and batchnorm in model
        self.use_dropout = use_dropout
        self.use_batchnorm = use_batchnorm
        self.fc_dim = LINEAR_DIM
        self.input_dim = input_dim
        # Conv layer hypyer parameters
        self.layers = LAYERS
        self.kernels = KERNELS
        self.channels = CHANNELS[::-1] # flip the channel dimensions
        self.strides = STRIDES
        
        # In decoder, we first do fc project, then conv layers
        self.linear = nn.Linear(self.input_dim, self.fc_dim)
        self.conv =  self.get_convs()
        self.output = nn.Conv2d(self.channels[-1], 1, kernel_size=1, stride=1)

    def get_convs(self):
        conv_layers = nn.Sequential()
        for i in range(self.layers):
            if i == 0: conv_layers.append(
                            nn.ConvTranspose2d(self.channels[i],
                                               self.channels[i],
                                               kernel_size=self.kernels[i],
                                               stride=self.strides[i],
                                               padding=1,
                                               output_padding=1)
                            )
            
            else: conv_layers.append(
                            nn.ConvTranspose2d(self.channels[i-1], 
                                               self.channels[i],
                                               kernel_size=self.kernels[i],
                                               stride=self.strides[i],
                                               padding=1,
                                               output_padding=1
                                              )
                            )
            if self.use_batchnorm and i != self.layers - 1:
                conv_layers.append(nn.BatchNorm2d(self.channels[i]))
            conv_layers.append(nn.GELU())
            if self.use_dropout:
                conv_layers.append(nn.Dropout2d(0.15))
        return conv_layers
   
    def forward(self, x):
        x = self.linear(x)
        # reshape 3D tensor to 4D tensor
        x = x.reshape(x.shape[0], 128, 4, 4)
        x = self.conv(x)
        return self.output(x)

相应的解码器实现如下:

decoder = Decoder(use_batchnorm=True, use_dropout=True).to(DEVICE)
summary(decoder, (1, 2))
pass

运行后,结果如下:

一种简单的自编码器PyTorch代码实现_自编码器_04

6. 实现自编码器

接着,我们将上述编码器和解码器串联起来,代码实现如下:

class AutoEncoder(nn.Module):
    
    def __init__(self):
        super(AutoEncoder, self).__init__()
        self.encoder = Encoder(output_dim=2, use_batchnorm=True, use_dropout=False)
        self.decoder = Decoder(input_dim=2, use_batchnorm=True, use_dropout=False)
        
    def forward(self, x):
        return self.decoder(self.encoder(x))

model = AutoEncoder().to(DEVICE)
summary(model, (1, 32, 32))
pass

得到结果如下:

一种简单的自编码器PyTorch代码实现_深度学习_05

7. 可视化函数

在进入训练部分之前,让我们花一些时间编写一个函数来可视化我们模型的潜在特征空间,即编码后二维特征向量的可视化表示。

def plotting(step:int=0, show=False):
    model.eval() # Switch the model to evaluation mode
    points = []
    label_idcs = []
    path = "./ScatterPlots"
    if not os.path.exists(path): os.mkdir(path)
    for i, data in enumerate(valid_loader):
        img, label = [d.to(DEVICE) for d in data]
        # We only need to encode the validation images
        proj = model.encoder(img)
        points.extend(proj.detach().cpu().numpy())
        label_idcs.extend(label.detach().cpu().numpy())
        del img, label
    
    points = np.array(points)
    # Creating a scatter plot
    fig, ax = plt.subplots(figsize=(10, 10) if not show else (8, 8))
    scatter = ax.scatter(x=points[:, 0], y=points[:, 1], s=2.0, 
                c=label_idcs, cmap='tab10', alpha=0.9, zorder=2)
    
    ax.spines["right"].set_visible(False)
    ax.spines["top"].set_visible(False)
    
    if show: 
        ax.grid(True, color="lightgray", alpha=1.0, zorder=0)
        plt.show()
    else: 
        # Do not show but only save the plot in training
        plt.savefig(f"{path}/Step_{step:03d}.png", bbox_inches="tight")
        plt.close() # don't forget to close the plot, or it is always in memory
        model.train()

以下是训练过程中生成的图;该过程显示了模型的潜在空间随时间的分布,可以看出尽管有个别离群点,整体不同类别的数据在特征空间呈现出聚类趋势:

一种简单的自编码器PyTorch代码实现_计算机视觉_06

8. 损失函数

在编写训练和验证函数之前,还有一个步骤是定义目标函数和优化方法。由于自动编码器是一个自监督模型,输入也是网络输出重建图像逼近的对象,因此我们可以使用MSE(均方误差)损失来评估输入和重建图像之间的逐像素损失。当然有很多优化器可供选择,这里我选择的是AdamW,因为我在过去几个月里经常使用它。

criterion = nn.MSELoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=config["lr"], weight_decay=1e-5)

# For mixed precision training
scaler = torch.cuda.amp.GradScaler()
steps = 0 # tracking the training steps

9. 训练函数

接着我们来定义训练一个epoch的函数,代码实现如下:

def train(model, dataloader, criterion, optimizer, save_distrib=False):
    # steps is used to track training progress, purely for latent space plots
    global steps 
    model.train()
    train_loss = 0.0
    
    # Process tqdm bar, helpful for monitoring training process
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True, 
                     leave=False, position=0, desc="Train")
    for i, batch in enumerate(dataloader):
        optimizer.zero_grad()
        x = batch[0].to(DEVICE)
        
        # Here we implement the mixed precision training
        with torch.cuda.amp.autocast():
            y_recons = model(x)
            loss = criterion(y_recons, x)
        
        train_loss += loss.item()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        
        batch_bar.set_postfix(
            loss=f"{train_loss/(i+1):.4f}",
            lr = f"{optimizer.param_groups[0]['lr']:.4f}"
        )
        batch_bar.update()        

        # Saving latent space plots
        if steps % 10 == 0 and save_distrib and steps <= 400: plotting(steps)
        steps += 1        
        
        # remove unnecessary cache in CUDA memory
        torch.cuda.empty_cache()
        del x, y_recons
    
    batch_bar.close()
    train_loss /= len(dataloader)

    return train_loss

10.验证函数

想要的验证函数的实现稍微简单一点,代码如下:

def validate(model, dataloader, criterion):
    model.eval() # Don't forget to turn the model to eval mode
    valid_loss = 0.0
    # Progress tqdm bar
    batch_bar = tqdm(total=len(dataloader), dynamic_ncols=True,
                     leave=False, position=0, desc="Validation")
    
    for i, batch in enumerate(dataloader):
        x = batch[0].to(DEVICE)
        with torch.no_grad(): # we don't need gradients in validation
            y_recons = model(x)
            loss = criterion(y_recons, x)
        valid_loss += loss.item()
        batch_bar.set_postfix(
            loss=f"{valid_loss/(i+1):.4f}",
            lr = f"{optimizer.param_groups[0]['lr']:.4f}"
        )
        batch_bar.update()
        torch.cuda.empty_cache()
        del x, y_recons
    
    batch_bar.close()
    valid_loss /= len(dataloader)
    return valid_loss

11.训练过程

接着,我们将上述代码串起来,来实现我们模型的训练,由于FashionMNIST是一个很小的数据集,我们实际上不需要大量训练;初始训练和验证损失非常低,并且在三个epoch之后没有太大的改进空间。

for i in range(config["epochs"]):

    curr_lr = float(optimizer.param_groups[0]["lr"])
    train_loss = train(model, train_loader, criterion, 
                       optimizer, save_distrib=True)
    valid_loss = validate(model, valid_loader, criterion)

    print(f"Epoch {i+1}/{config['epochs']}\nTrain loss: {train_loss:.4f}\t Validation loss: {valid_loss:.4f}\tlr: {curr_lr:.4f}")

输出如下:

一种简单的自编码器PyTorch代码实现_自编码器_07

12.结果可视化

我们现在可以再次绘制和检查收敛后的特征空间,可视化输出如下:

一种简单的自编码器PyTorch代码实现_自编码器_08

观察上图可知,相应的聚类后的效果比训练过程中的要好,但有些个别类混合在同一集群中。这个问题可以通过增加编码器输出的特征向量的维度或使用其他损失函数函数来解决。

13.预测效果可视化

为了验证我们的解码器确实学到了东西,我们可以在随机绘制一些离散点来观察解码器重建图像的效果,代码如下:

# randomly sample x and y values
xs = [random.uniform(-6.0, 8.0) for i in range(8)]
ys = [random.uniform(-7.5, 10.0) for i in range(8)]

points = list(zip(xs, ys))
coords = torch.tensor(points).unsqueeze(1).to(DEVICE)
nrows, ncols = 2, 4
fig, axes = plt.subplots(nrows, ncols, figsize=(10, 5))
model.eval()
with torch.no_grad():
    generates = [model.decoder(coord) for coord in coords]
# plot points
idx = 0
for row in range(0, nrows):
    for col in range(0, ncols):
        ax = axes[row, col]
        im = generates[idx].squeeze().detach().cpu()
        ax.imshow(im, cmap="gray")
        ax.axis("off")
        coord = coords[idx].detach().cpu().numpy()[0]
        ax.set_title(f"({coord[0]:.3f}, {coord[1]:.3f})")
        idx += 1

plt.show()

代码输出如下:

一种简单的自编码器PyTorch代码实现_计算机视觉_09

14. 总结

本文重点介绍了如何利用Pytorch来实现自编码器,从数据集,到搭建网络结构,以及特征可视化和网络预测输出几个方面,分别进行了详细的阐述,并给出了相应的代码示例。

您学废了吗?

完整代码链接:戳我

标签:layers,loss,编码器,nn,代码,use,PyTorch,model,self
From: https://blog.51cto.com/u_15506603/8946144

相关文章

  • 数字医院HIS系统源代码,采用前后端分离架构,SaaS云部署,支持电子病历四级
    本HIS项目采用前后端分离架构,SaaS云部署模式前端:Angular+Nginx+JavaScript后端:Java+Spring,SpringBoot,SpringMVC等数据库:MySQL+MyCat1、自主研发+应用实例,整合电子病历系统、LIS系统,支持电子病历四级。2、功能:预约挂号、门诊费用、住院管理、药房管理、药库管理、门诊医生站、门诊......
  • VUE框架底层源代码解读------VUE框架
    <!DOCTYPEhtml><htmllang="en"><head><metacharset="UTF-8"><metaname="viewport"content="width=device-width,initial-scale=1.0"><title>Document</title>......
  • Qt代码片段
    DrawText写竖排文字painter.drawText(10,50,30,150,Qt::AlignTop|Qt::TextSingleLine|Qt::TextWordWrap,"测试文字");painter.save();//显将画笔属性进行保存painter.translate(100,100);//将原点移动到要画文字的地方painter.rotate(90);//将画笔翻转90度painter.draw......
  • 如何使用深度学习技术探测代码逻辑死循环 —— 浪潮集团的“公开号CN117271314A”专利
    新闻链接:https://mbd.baidu.com/newspage/data/landingsuper?context={"nid"%3A"news_10054958188888757354"}&n_type=-1&p_from=-1国家专利局查询:https://pss-system.cponline.cnipa.gov.cn/conventionalSearch......
  • 短视频app开发,集群容错策略的代码分析
    短视频app开发,集群容错策略的代码分析1FailoverFailover故障转移策略作为默认策略,当短视频app开发中的消费发生异常时通过负载均衡策略再选择一个生产者节点进行调用,直到达到重试次数。即使业务代码没有显示重试,也有可能多次执行消费逻辑从而造成重复数据:publicclass......
  • 一款基于.NET Core的快速开发框架、支持多种前端UI、内置代码生成器
    前言经常看到有小伙伴在技术群里问有没有什么好用且快速的开发框架推荐的,今天就给大家分享一款基于MITLicense协议开源、免费的.NETCore快速开发框架、支持多种前端UI、内置代码生成器、一款高效开发的利器:WalkingTec.Mvvm框架(简称WTM)。官方项目介绍WalkingTec.Mvvm框架(简称W......
  • 慢调用链诊断利器-ARMS 代码热点
    作者:铖朴、义泊可观测技术背景从最早的Google发表的一篇名为《Dapper,aLarge-ScaleDistributedSystemsTracingInfrastructure》的论文开始,到后来以:Metrics(指标)、Tracing(链路追踪)以及Logging(日志)三大方向互为补充的可观测解决方案逐渐被业界所接受并成为事实标准。基......
  • Github Copilot生成代码和单元测试并执行
    ChatGPTPrompts整理总结 最近一直在学习ChatGPTPrompt的编写技巧,做了一些验证和整理,分享给大家ActasaLinuxTerminal英文PromptIwantyoutoactasalinuxterminal.Iwilltypecommandsandyouwillreplywithwhattheterminalshouldshow.Iwantyouto......
  • 可控核聚变的物联网代码
    可控核聚变的物联网代码涉及到多个领域的知识,包括物理学、工程学、计算机科学等。这里我给出一个简单的Python示例,用于模拟核聚变过程。请注意,这个示例仅用于演示目的,实际的可控核聚变系统需要更复杂的设计和实现。importrandomimporttimeclassNuclearReactor:def__ini......
  • 应用程序内部的代码级别的读写分离CQRS(Command Query Responsibility Segregation)直
    产品代码都给你看了,可别再说不会DDD(十):CQRS  这是一个讲解DDD落地的文章系列,作者是《实现领域驱动设计》的译者滕云。本文章系列以一个真实的并已成功上线的软件项目——码如云(https://www.mryqr.com)为例,系统性地讲解DDD在落地实施过程中的各种典型实践,以及在面临实际业务场......