首页 > 其他分享 >Numpy手撸神经网络实现线性回归

Numpy手撸神经网络实现线性回归

时间:2023-10-06 09:33:05浏览次数:51  
标签:__ parameters decay data self 神经网络 线性 Numpy def

Numpy手撸神经网络实现线性回归

简介

在深度学习理论学习之后,我们常常会直接使用深度学习框架(如PaddlePaddle、PyTorch或TensorFlow)来构建模型,而忽略了底层各种层结构的实现。但对于深度学习的学习者来说,是否能够亲手编写一个简单的模型呢?本文将介绍如何使用NumPy手动实现一个神经网络模型来进行线性回归任务。

目标

本文的目标是使用手动实现的神经网络模型来拟合目标曲线,其中目标曲线由函数f(x) = sin(x)生成。

data

拟合结果如下图所示:

拟合结果图

实现思路

在深度学习框架中,数据通常以张量(tensor)的形式进行处理,但为了简化起见,我们将数据的输入和输出都使用NumPy的ndarray格式传递。本节将包含以下主要类的实现:

1. Tensor和初始化

首先,我们需要定义一个名为Tensor的类,用于保存数据和梯度。此类具有data属性用于存储数据和grad属性用于存储梯度。

import numpy as np

class Tensor:
    def __init__(self, shape):
        self.data = np.zeros(shape=shape, dtype=np.float32) # 用于存放数据
        self.grad = np.zeros(shape=shape, dtype=np.float32) # 用于存放梯度

    def clear_grad(self):
        self.grad = np.zeros_like(self.grad)

    def __str__(self):
        return "Tensor shape: {}, data: {}".format(self.data.shape, self.data)

我们还定义了一个初始化器(Initializer)基类,以及两种初始化器:ConstantNormal。这些初始化器用于初始化层的参数。

2. Layer

在深度学习中,层是神经网络的基本组件。我们实现了两种层:全连接层(Linear)和ReLU激活函数(ReLU)。

# 为了使层能够组建起来,实现前向传播和反向传播,首先定义层的基类Layer
# Layer的几个主要方法说明:
#   forward: 实现前向传播
#   backward: 实现反向传播
#   parameters: 返回该层的参数,传入优化器进行优化

class Layer:
    def __init__(self, name='layer', *args, **kwargs):
        self.name = name

    def forward(self, *args, **kwargs):
        raise NotImplementedError

    def backward(self):
        raise NotImplementedError

    def parameters(self):
        return []

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    def __str__(self):
        return self.name


class Linear(Layer):
    """
    input X, shape: [N, C]
    output Y, shape: [N, O]
    weight W, shape: [C, O]
    bias b, shape: [1, O]
    grad dY, shape: [N, O]
    forward formula:
        Y = X @ W + b   # @表示矩阵乘法
    backward formula:
        dW = X.T @ dY
        db = sum(dY, axis=0)
        dX = dY @ W.T
    """
    def __init__(
        self,
        in_features,
        out_features,
        name='linear',
        weight_attr=Normal(),
        bias_attr=Constant(),
        *args,
        **kwargs
        ):
        super().__init__(name=name, *args, **kwargs)
        self.weights = Tensor((in_features, out_features))
        self.weights.data = weight_attr(self.weights.data.shape)
        self.bias = Tensor((1, out_features))
        self.bias.data = bias_attr(self.bias.data.shape)
        self.input = None

    def forward(self, x):
        self.input = x
        output = np.dot(x, self.weights.data) + self.bias.data
        return output

    def backward(self, gradient):
        self.weights.grad += np.dot(self.input.T, gradient)  # dy / dw
        self.bias.grad += np.sum(gradient, axis=0, keepdims=True)  # dy / db 
        input_grad = np.dot(gradient, self.weights.data.T)  # dy / dx
        return input_grad

    def parameters(self):
        return [self.weights, self.bias]

    def __str__(self):
        string = "linear layer, weight shape: {}, bias shape: {}".format(self.weights.data.shape, self.bias.data.shape)
        return string


class ReLU(Layer):
    """
    forward formula:
        relu = x if x >= 0
             = 0 if x < 0
    backwawrd formula:
        grad = gradient * (x > 0)
    """
    def __init__(self, name='relu', *args, **kwargs):
        super().__init__(name=name, *args, **kwargs)
        self.activated = None

    def forward(self, x):
        x[x < 0] = 0             
        self.activated = x
        return self.activated

    def backward(self, gradient):
        return gradient * (self.activated > 0)  

这些层具有前向传播和反向传播的功能,以及参数的存储。

3. 模型组网

在这一部分,我们定义了一个名为Sequential的类,用于将多个层按顺序组成神经网络模型。该类允许我们逐层前向传播和反向传播。

# 模型组网的功能是将层串起来,实现数据的前向传播和梯度的反向传播
# 添加层的时候,按照顺序添加层的参数
# Sequential方法说明:
#   add: 向组网中添加层
#   forward: 按照组网构建的层顺序,依次前向传播
#   backward: 接收损失函数的梯度,按照层的逆序反向传播
class Sequential:
    def __init__(self, *args, **kwargs):
        self.graphs = []
        self._parameters = []
        for arg_layer in args:
            if isinstance(arg_layer, Layer):
                self.graphs.append(arg_layer)
                self._parameters += arg_layer.parameters()

    def add(self, layer):
        assert isinstance(layer, Layer), "The type of added layer must be Layer, but got {}.".format(type(layer))
        self.graphs.append(layer)
        self._parameters += layer.parameters()

    def forward(self, x):
        for graph in self.graphs:
            x = graph(x)
        return x

    def backward(self, grad):
        # grad backward in inverse order of graph
        for graph in self.graphs[::-1]:
            grad = graph.backward(grad)

    def __call__(self, *args, **kwargs):
        return self.forward(*args, **kwargs)

    def __str__(self):
        string = 'Sequential:\n'
        for graph in self.graphs:
            string += graph.__str__() + '\n'
        return string

    def parameters(self):
        return self._parameters

4. 优化器

优化器用于根据梯度来更新模型的参数。我们实现了带有动量的随机梯度下降优化器(SGD)。

# 优化器主要完成根据梯度来优化参数的任务,其主要参数有学习率和正则化类型和正则化系数
# Optimizer主要方法:
#   step: 梯度反向传播后调用,该方法根据计算出的梯度,对参数进行优化
#   clear_grad: 模型调用backward后,梯度会进行累加,如果已经调用step优化过参数,需要将使用过的梯度清空
#   get_decay: 根据不同的正则化方法,计算出正则化惩罚值
class Optimizer:
    """
    optimizer base class.
    Args:
        parameters (Tensor): parameters to be optimized.
        learning_rate (float): learning rate. Default: 0.001.
        weight_decay (float): The decay weight of parameters. Defaylt: 0.0.
        decay_type (str): The type of regularizer. Default: l2.
    """
    def __init__(self, parameters, learning_rate=0.001, weight_decay=0.0, decay_type='l2'):
        assert decay_type in ['l1', 'l2'], "only support decay_type 'l1' and 'l2', but got {}.".format(decay_type)
        self.parameters = parameters
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.decay_type = decay_type
        
    def step(self):
        raise NotImplementedError

    def clear_grad(self):
        for p in self.parameters:
            p.clear_grad()

    def get_decay(self, g):
        if self.decay_type == 'l1':
            return self.weight_decay
        elif self.decay_type == 'l2':
            return self.weight_decay * g

# 基本的梯度下降法为(不带正则化):
# W = W - learn_rate * dW
# 带动量的梯度计算方法(减弱的梯度的随机性):
# dW = (momentum * v) + (1 - momentum) * dW
class SGD(Optimizer):
    def __init__(self, momentum=0.9, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.momentum = momentum
        self.velocity = []
        for p in self.parameters:
            self.velocity.append(np.zeros_like(p.grad))

    def step(self):
        for p, v in zip(self.parameters, self.velocity):
            decay = self.get_decay(p.grad)
            v = self.momentum * v + p.grad + decay # 动量计算
            p.data = p.data - self.learning_rate * v

5. 损失函数

我们定义了均方误差损失函数(MSE),用于衡量模型预测和真实值之间的差异。

# 优化器主要完成根据梯度来优化参数的任务,其主要参数有学习率和正则化类型和正则化系数
# Optimizer主要方法:
#   step: 梯度反向传播后调用,该方法根据计算出的梯度,对参数进行优化
#   clear_grad: 模型调用backward后,梯度会进行累加,如果已经调用step优化过参数,需要将使用过的梯度清空
#   get_decay: 根据不同的正则化方法,计算出正则化惩罚值
class Optimizer:
    """
    optimizer base class.
    Args:
        parameters (Tensor): parameters to be optimized.
        learning_rate (float): learning rate. Default: 0.001.
        weight_decay (float): The decay weight of parameters. Defaylt: 0.0.
        decay_type (str): The type of regularizer. Default: l2.
    """
    def __init__(self, parameters, learning_rate=0.001, weight_decay=0.0, decay_type='l2'):
        assert decay_type in ['l1', 'l2'], "only support decay_type 'l1' and 'l2', but got {}.".format(decay_type)
        self.parameters = parameters
        self.learning_rate = learning_rate
        self.weight_decay = weight_decay
        self.decay_type = decay_type
        
    def step(self):
        raise NotImplementedError

    def clear_grad(self):
        for p in self.parameters:
            p.clear_grad()

    def get_decay(self, g):
        if self.decay_type == 'l1':
            return self.weight_decay
        elif self.decay_type == 'l2':
            return self.weight_decay * g

# 基本的梯度下降法为(不带正则化):
# W = W - learn_rate * dW
# 带动量的梯度计算方法(减弱的梯度的随机性):
# dW = (momentum * v) + (1 - momentum) * dW
class SGD(Optimizer):
    def __init__(self, momentum=0.9, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.momentum = momentum
        self.velocity = []
        for p in self.parameters:
            self.velocity.append(np.zeros_like(p.grad))

    def step(self):
        for p, v in zip(self.parameters, self.velocity):
            decay = self.get_decay(p.grad)
            v = self.momentum * v + p.grad + decay # 动量计算
            p.data = p.data - self.learning_rate * v

6. 数据集和数据加载

我们还实现了DatasetBatchSamplerDataLoader类,用于加载和处理数据。

# 这里仿照PaddlePaddle,Dataset需要实现__getitem__和__len__方法
class Dataset:
    def __init__(self, *args, **kwargs):
        pass

    def __getitem__(self, idx):
        raise NotImplementedError("'{}' not implement in class {}"
                                  .format('__getitem__', self.__class__.__name__))

    def __len__(self):
        raise NotImplementedError("'{}' not implement in class {}"
                                  .format('__len__', self.__class__.__name__))


# 根据dataset和一些设置,生成每个batch在dataset中的索引
class BatchSampler:
    def __init__(self, dataset=None, shuffle=False, batch_size=1, drop_last=False):
        self.batch_size = batch_size
        self.drop_last = drop_last
        self.shuffle = shuffle

        self.num_data = len(dataset)
        if self.drop_last or (self.num_data % batch_size == 0):
            self.num_samples = self.num_data // batch_size
        else:
            self.num_samples = self.num_data // batch_size + 1
        indices = np.arange(self.num_data)
        if shuffle:
            np.random.shuffle(indices)
        if drop_last:
            indices = indices[:self.num_samples * batch_size]
        self.indices = indices

    def __len__(self):
        return self.num_samples

    def __iter__(self):
        batch_indices = []
        for i in range(self.num_samples):
            if (i + 1) * self.batch_size <= self.num_data:
                for idx in range(i * self.batch_size, (i + 1) * self.batch_size):
                    batch_indices.append(self.indices[idx])
                yield batch_indices
                batch_indices = []
            else:
                for idx in range(i * self.batch_size, self.num_data):
                    batch_indices.append(self.indices[idx])
        if not self.drop_last and len(batch_indices) > 0:
            yield batch_indices


# 根据sampler生成的索引,从dataset中取数据,并组合成一个batch
class DataLoader:
    def __init__(self, dataset, sampler=BatchSampler, shuffle=False, batch_size=1, drop_last=False):
        self.dataset = dataset
        self.sampler = sampler(dataset, shuffle, batch_size, drop_last)

    def __len__(self):
        return len(self.sampler)

    def __call__(self):
        self.__iter__()

    def __iter__(self):
        for sample_indices in self.sampler:
            data_list = []
            label_list = []
            for indice in sample_indices:
                data, label = self.dataset[indice]
                data_list.append(data)
                label_list.append(label)
            yield np.stack(data_list, axis=0), np.stack(label_list, axis=0)

线性回归示例

在本节中,我们使用上述定义的类来构建一个简单的神经网络模型,并进行线性回归示例。

1. 提取数据

首先,我们从数据集中提取训练数据,这里使用了一个预先生成的包含目标函数f(x) = sin(x) + 噪声的数据集。

# 提取训练数据
!unzip -oq ~/data/data119921/sin_data.zip

2. 查看数据分布

我们绘制了原始数据的分布图。

import matplotlib.pyplot as plt
%matplotlib inline

x_path = "x.npy"
y_path = "y.npy"

X = np.load(x_path)
Y = np.load(y_path)

plt.scatter(X, Y)

3. 搭建模型,设置超参数

我们定义了一个简单的神经网络模型,包括线性层和ReLU激活函数,并设置了超参数。

# 定义超参数
epoches = 1000
batch_size = 4
learning_rate = 0.01
weight_decay = 0.0
train_number = 100  # 选择的训练数据数量,总共200,这里仅挑选一部分训练,以避免过拟合

# 创建线性回归模型
model = Sequential(
    Linear(1, 16, name='linear1'),
    ReLU(name='relu1'),
    Linear(16, 64, name='linear2'),
    ReLU(name='relu2'),
    Linear(64, 16, name='linear3'),
    Re

LU(name='relu3'),
    Linear(16, 1, name='linear4'),
)
opt = SGD(parameters=model.parameters(), learning_rate=learning_rate, weight_decay=weight_decay, decay_type='l2')
loss_fn = MSE()

print(model)

4. 训练

我们使用训练数据集对模型进行训练。

# 挑选部分数据进行训练,绘制数据分布图
indexes = np.arange(X.shape[0])
train_indexes = np.random.choice(indexes, train_number)
X = X[train_indexes]
Y = Y[train_indexes]
plt.scatter(X, Y)

# 构建数据集和数据加载器,开始训练
train_dataset = LinearDataset(X, Y)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size, drop_last=True)

for epoch in range(1, epoches):
    losses = []
    for x, y in train_dataloader:
        pred = model(x)
        loss = loss_fn(pred, y)
        losses.append(loss)

        grad = loss_fn.backward()
        model.backward(grad)

        opt.step()
        opt.clear_grad()
    print("epoch: {}. loss: {}".format(epoch, np.array(losses).mean()))

5. 验证效果

训练结束后,我们生成一组密集的验证点,绘制曲线以查看模型效果。

# 生成验证点
val_number = 500
X_val = np.linspace(-np.pi, np.pi, val_number).reshape(val_number, 1)
Y_val = np.sin(X_val) * 2
val_dataset = LinearDataset(X_val, Y_val)
val_dataloader = DataLoader(val_dataset, shuffle=False, batch_size=2, drop_last=False)
all_pred = []
for x, y in val_dataloader:
    pred = model(x)
    all_pred.append(pred)
all_pred = np.vstack(all_pred)

# 绘制真实曲线和模型预测曲线
plt.plot(X_val, Y_val, color='green', label='true')
plt.plot(X_val, all_pred, color='red', label='predict')
plt.legend()
plt.show()

# 打印模型权重
for g in model.graphs:
    try:
        print(g.name, "  weights: ", g.weights.data)
        print(g.name, "  bias: ", g.bias.data)
    except:
        # ReLU层没有参数
        pass

希望这篇文章对您有所帮助,让您更好地理解深度学习模型的构建和训练过程。如果您有任何问题或需要进一步的解释,请随时提出。

本文由mdnice多平台发布

标签:__,parameters,decay,data,self,神经网络,线性,Numpy,def
From: https://www.cnblogs.com/haidao09/p/17744254.html

相关文章

  • numpy手搓卷积
    numpy实现卷积1卷积本质设计这样的一个滤波器(filter,也称为kernel),用这个filter,往我们的图片上“盖”,覆盖一块跟filter一样大的区域之后,对应元素相乘,然后求和。计算一个区域之后,就向其他区域挪动,接着计算,直到把原图片的每一个角落都覆盖到了为止。这个过程就是“卷积”。可以......
  • [学习笔记] 线性基
    线性基是向量空间的一组基,通常可以解决有关异或的一些题目。——OIWiki线性基就是从初始集合中选出的一个子集,它满足一些性质,可以处理一些问题(屁话)。性质线性基中每个元素二进制下最高位是不同的。线性基中没有异或和为\(0\)的子集。线性基中任意子集中元素异或和的值......
  • 实验1 线性拟合 冷却
    '''目标:拟合物料冷却规律分类变量:物料规格,冷却方式连续变量:温度,时间其他因素:车间温度现实因素:初始温度,初始时间需求因素:目标温度的时间,目标温度的时长(时间-初始时间),当前时间的温度不加入分类变量则为单个线性模型''''''实验1只有温度和时间每个物料的初始温......
  • 线性混合模型为什么是多元高斯分布
    如何建立高斯模型,可以看图根据每类数据做一个高斯函数,然后做一个混合高斯密度函数。如果提取目标的话得把目标的概率函数提取出来。 ......
  • 线性方程组求解
    下面是运用MATLAB写的一个代码,可用来求解线性方程组。functionx=ch2_gauss(A,b)n1=size(A,1);n2=size(A,2);n3=length(b);if(n1~=n2)  disp("Aisnotasquarenmatrix");  return;endif(n2~=n3)  disp("dimensionofAandbisnotequal");  returnendn=n1;L=z......
  • 附录A NumPy高级应用
    在这篇附录中,我会深入NumPy库的数组计算。这会包括ndarray更内部的细节,和更高级的数组操作和算法。本章包括了一些杂乱的章节,不需要仔细研究。A.1ndarray对象的内部机理NumPy的ndarray提供了一种将同质数据块(可以是连续或跨越)解释为多维数组对象的方式。正如你之前所看到的那......
  • 第04章 NumPy基础:数组和矢量计算
    NumPy(NumericalPython的简称)是Python数值计算最重要的基础包。大多数提供科学计算的包都是用NumPy的数组作为构建基础。NumPy的部分功能如下:ndarray,一个具有矢量算术运算和复杂广播能力的快速且节省空间的多维数组。用于对整组数据进行快速运算的标准数学函数(无需编写循环)。......
  • 【数据结构】线性表
    线性表顺序表链式存储单链表双链表知识目录顺序表概念:用一组地址连续的存储单元依次存储线性表的数据元素,这种存储结构的线性表称为顺序表。特点:逻辑上相邻的数据元素,物理次序也是相邻的。只要确定好了存储线性表的起始位置,线性表中任一数据元素都可以随机存取,所以线性表的顺序存......
  • python numpy 稀疏矩阵与密集矩阵
    在NumPy中,稀疏矩阵和密集矩阵是两种不同的数据表示方式,用于存储矩阵数据。它们之间的主要区别在于存储元素的方式和内存占用。稀疏矩阵(SparseMatrix):区别:存储方式:稀疏矩阵只存储非零元素的位置和数值,而忽略零元素,从而节省内存。内存占用:由于只存储非零元素,稀疏矩阵在处理大规模......
  • AttributeError: module 'numpy' has no attribute 'int'.
    AttributeError:module'numpy'hasnoattribute'int'.numpy                    1.24.1                  pypi_0   pypiscikit-learn             1.2.2                   pypi_0   p......