《联邦学习实战》第3章阅读笔记
第3章 用Python从零实现横向联邦图像分类
1. 环境配置
1.1 Windows如何安装miniconda
1.2 如何安装深度学习框架
a. 首先创建一个新的 Conda 环境,为了隔离不同的项目及其依赖,建议创建一个新的 Conda 环境。你可以使用以下命令创建一个名为 myenv 的环境(你也可以自己取一个更有意义的名字):
# 创建虚拟环境myenv并指定Python版本为3.10
# 这样就不用单独下载python了
conda create --name myenv python=3.10
b. 激活 Conda 环境:创建完成后,使用以下命令激活环境:
conda activate myenv
c. 安装pytorch
前往pytorch官网查看适合自己的pytorch版本,如下图所示
然后复制指令安装,如图中所示
pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
安装时间可能会比较久,因为我们是从官网下载,读者也可以检索Windows下载pytorch很慢怎么办,进行提速
d. 检测是否安装成功
如果能成功输出torch版本号,就表示安装成功
import torch
print(torch.__version__)
2. PyTorch基础操作
笔者虚拟环境如下,代码和书中版本可能略有不同,但皆可正常运行
python 3.10.13
torch 2.1.1+cu118
torchaudio 2.1.1+cu118
torchvision 0.16.1+cu118
笔者使用的开发环境是jupyter notebook,推荐大家使用jupyter notebook或者jupyter lab进行深度学习和机器学习的开发。
Jupyter Notebook 是一种交互式的开发环境,它允许你在浏览器中编写和运行代码。
2.1 创建Tensor
import torch
torch.IntTensor(2, 3) # 生成整型张量,大小为(2,3)
torch.FloatTensor(2, 3) # 生成浮点型张量,大小为(2,3)
torch.empty(2, 3) # 生成空的张量,大小为(2,3)
torch.rand(2, 3) # 生成均匀分布的随机张量,大小为(2,3)
torch.randn(2, 3) # 生成标准正态分布的随机张量,大小为(2,3)
mean = 2
stddev = 3
size = (2, 3)
# 使用位置参数传递均值、标准差和大小
normal_tensor = torch.normal(mean, stddev, size) # 生成均值为2,标准差为3,大小为(2,3)的正态分布
torch.full((2, 3), 7) #生成大小为(2,3)的张量,全部值填充为7
torch.ones(2, 3) #生成大小为(2,3)的张量,全部值置为1
torch.zeros(2, 3) #生成大小为(2,3)的张量,全部值置为0
2.2 Tensor与Python数据结构转换
import numpy as np
arr = np.array([1, 2, 3, 4, 5]) # np.array将传入的参数转换为numpy数组类型,即ndarray
a1 = torch.tensor(arr) # tensor([1, 2, 3, 4, 5], dtype=torch.int32)
a2 = torch.as_tensor(arr) # tensor([1, 2, 3, 4, 5], dtype=torch.int32)
a3 = torch.from_numpy(arr) # tensor([1, 2, 3, 4, 5], dtype=torch.int32)
# 需要注意的是,as_tensor和from_numpy会复用原数据的内存空间
# 也就是说,原数据或者Tensor的任意一方改变,都会导致另一方的数据改变。
arr[0] = 10
print(a1) # tensor([1, 2, 3, 4, 5], dtype=torch.int32)
print(a2) # tensor([10, 2, 3, 4, 5], dtype=torch.int32)
print(a3) # tensor([10, 2, 3, 4, 5], dtype=torch.int32)
2.3 数据操作
print(a1) # tensor([1, 2, 3, 4, 5], dtype=torch.int32)
print(a2) # tensor([10, 2, 3, 4, 5], dtype=torch.int32)
a1 + a2 # tensor([11, 4, 6, 8, 10], dtype=torch.int32)
torch.add(a1, a2) # tensor([11, 4, 6, 8, 10], dtype=torch.int32)
x = torch.tensor([1, 2, 3])
y = torch.tensor([4, 5, 6])
x.add_(y) # add_为就地加法,即直接加载x张量上,执行操作以后x为tensor([5, 7, 9])
x=torch.tensor([1,2,3])
x.device # 初始时,张量x的数据默认存放在CPU上
if torch.cuda.is_available():
x = x.cuda()
x.device # 如果系统有GPU设备,张量x的数据将存放在GPU上
2.4 自动求导
自动求导功能是PyTorch进行模型训练的核心模块,PyTorch的自动求导功能通过autograd包实现。
autoqrad包求导时,首先要求Tensor将requires grad属性设置为True。
随后,PyTorch将自动跟踪该Tensor的所有操作。
当调用backward()进行反向计算时,将自动计算梯度值并保存在grad属性中。
import torch
x = torch.ones(2,2,requires_grad=True)
y = x + 2
z = y * y * 3
out = z.mean()
out.backward()
x.grad #求取x的梯度值为(4.5,4.5,4.5,4.5)
3. 用python实现横向联邦学习图像分类
本节我们使用Python从零开始实现一个简单的横向联邦学习模型。具体来说,我们将用横向联邦来实现对cifar10图像数据集的分类,模型使用的是ResNet-18。
我们将分别从服务端、客户端和配置文件三个角度详细讲解设计一个横向联邦所需要的基本操作。
需要注意的是,为了方便实现,本章没有采用网络通信的方式来模拟客户端和服务端的通信,而是在本地以循环的方式来模拟。
3.1 配置信息
联邦学习在模型训练之前,会将配置信息分别发送到服务端和客户端中保存,如果配 置信息发生改变,也会同时对所有参与方进行同步,以保证各参与方的配置信息一致。
# 导包
from torchvision import transforms, models, datasets
import random
conf = {
"model_name": "resnet18", # 模型名称
"no_models": 10, # 客户端数量
"type": "cifar", # 数据集名称
"global_epochs": 20, # 全局epoch
"local_epochs": 3, # 局部训练迭代次数
"k": 6, # 随机从所有客户端中选取k个做训练
"batch_size": 32, # 局部训练批处理大小
"lr": 0.001, # 学习率
"momentum": 0.0001, # 优化器动量,加速优化
"lambda":0.1 # 在聚合过程中使用的系数,用于控制从客户端收集的更新对全局模型的影响程度
}
3.2 训练数据集
我们使用torchvision的 datasets模块内置的cifar10数据集
def get_dataset(dir, name):
# 根据指定的数据集名称,加载并返回训练和评估数据集
if name == 'mnist':
# 加载MNIST数据集
train_dataset = datasets.MNIST(dir, train=True, download=True, transform=transforms.ToTensor())
eval_dataset = datasets.MNIST(dir, train=False, transform=transforms.ToTensor())
elif name == 'cifar':
# 定义CIFAR10训练数据的预处理步骤
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4), # 随机裁剪
transforms.RandomHorizontalFlip(), # 随机水平翻转
transforms.ToTensor(), # 转换为Tensor
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.3023, 0.1994, 0.2010)), # 归一化
])
# 定义CIFAR10测试数据的预处理步骤
transform_test = transforms.Compose([
transforms.ToTensor(), # 转换为Tensor
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.3023, 0.1994, 0.2010)), # 归一化
])
# 加载CIFAR10数据集
train_dataset = datasets.CIFAR10(dir, train=True, download=True, transform=transform_train)
eval_dataset = datasets.CIFAR10(dir, train=False, transform=transform_test)
return train_dataset, eval_dataset
3.3 服务端
横向联邦学习的服务端的主要功能是将被选择的客户端上传的本地模型进行模型聚合。但这里需要特别注意的是,事实上,对于一个功能完善的联邦学习框架,服务端的功能要复杂得多,比如服务端需要对各个客户端节点进行网络监控、对失败节点发出重连信号等。
本章由于是模拟,不涉及网络通信细节和失败故障等处理,因此不讨论这些功能细节,仅涉及模型聚合功能。
class Server(object):
def __init__(self, conf, eval_dataset):
# 初始化服务器配置和全局模型
self.conf = conf
self.global_model = models.get_model(self.conf["model_name"])
if torch.cuda.is_available():
self.global_model = self.global_model.cuda() # 如果GPU可用,将模型转移到GPU
# 创建用于模型评估的数据加载器
self.eval_loader = torch.utils.data.DataLoader(eval_dataset, batch_size=self.conf["batch_size"], shuffle=True)
def model_aggregate(self, gradient_accumulator):
# 聚合从客户端收集的梯度
for name, data in self.global_model.state_dict().items():
update_per_layer = gradient_accumulator[name] * self.conf["lambda"]
data.add_(update_per_layer.to(data.device)) # 确保梯度更新操作在正确的设备上执行
def model_eval(self):
# 评估全局模型的性能
self.global_model.eval() # 设置模型为评估模式
total_loss = 0.0
correct = 0
dataset_size = 0
for batch_id, batch in enumerate(self.eval_loader):
data, target = batch
dataset_size += data.size()[0]
if torch.cuda.is_available():
data = data.cuda() # 将数据和标签转移到GPU
target = target.cuda()
output = self.global_model(data)
total_loss += torch.nn.functional.cross_entropy(output, target, reduction='sum').item()
pred = output.data.max(1)[1] # 获取预测结果
correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()
acc = 100.0 * (float(correct) / float(dataset_size))
total_loss /= dataset_size
return acc, total_loss
3.4 客户端
横向联邦学习的客户端主要功能是接收服务端的下发指令和全局模型,并利用本地数据进行局部模型训练。
class Client:
def __init__(self, conf, model, train_dataset, id=1):
# 初始化客户端对象的属性
self.conf = conf # 存储客户端配置信息的字典
self.local_model = models.get_model(self.conf["model_name"]) # 获取客户端本地模型
if torch.cuda.is_available(): # 检查CUDA是否可用
self.local_model = self.local_model.cuda() # 如果CUDA可用,则将模型移动到GPU
self.client_id = id # 客户端ID
self.train_dataset = train_dataset # 用于本地训练的数据集
all_range = list(range(len(self.train_dataset))) # 获取数据集的索引范围
data_len = int(len(self.train_dataset) / self.conf["no_models"]) # 计算每个模型的数据集长度
indices = all_range[id * data_len: (id + 1) * data_len] # 计算当前客户端的数据集索引
# 创建数据加载器,使用随机子集采样器从数据集中加载数据
self.train_loader = torch.utils.data.DataLoader(self.train_dataset,
batch_size=conf["batch_size"],
sampler=torch.utils.data.sampler.SubsetRandomSampler(indices))
def local_train(self, model):
# 在本地训练客户端模型
# 首先通过克隆参数张量来避免参数共享问题
# 在这段代码中,首先调用了 clone() 方法来创建参数张量的一个克隆。
# 这是因为 PyTorch 中的张量是可变对象,直接赋值或复制它们只会创建一个引用,而不会创建新的张量。
# 这样做的问题是,如果不使用克隆,那么这两个模型的参数张量将共享内存,这意味着如果其中一个对象的参数值发生改变,另一个对象的参数值也会相应地改变。
for name, param in model.state_dict().items():
self.local_model.state_dict()[name].copy_(param.clone())
# 设置优化器,这里使用随机梯度下降(SGD)
optimizer = torch.optim.SGD(self.local_model.parameters(), lr=self.conf["lr"], momentum=self.conf["momentum"])
# 将本地模型设置为训练模式
self.local_model.train()
# 循环执行本地训练的epoch数
for e in range(self.conf["local_epochs"]):
# 遍历训练数据加载器中的每个batch
for batch_id, batch in enumerate(self.train_loader):
data, target = batch
if torch.cuda.is_available():
data = data.cuda()
target = target.cuda()
# 梯度清零
optimizer.zero_grad()
# 前向传播计算模型输出
output = self.local_model(data)
# 计算损失函数值
loss = torch.nn.functional.cross_entropy(output, target)
# 反向传播计算梯度
loss.backward()
# 根据梯度更新模型参数
optimizer.step()
# 输出当前epoch完成的信息
print("Client %d, Epoch %d done." %(self.client_id, e))
# 计算本地模型参数与全局模型参数的差异
diff = dict()
for name, data in self.local_model.state_dict().items():
diff[name] = (data - model.state_dict()[name]) # 计算差异
return diff
3.5 模型训练
train_datasets, eval_datasets = get_dataset("./data/", conf["type"]) # 加载数据集
server = Server(conf, eval_datasets) # 创建服务端
clients = [] # 用于存储创建的客户端
# 创建多个客户端
for c in range(conf["no_models"]):
clients.append(Client(conf, server.global_model, train_datasets, c))
# 在训练循环开始前初始化记录列表
accuracies = [] # 存储每个 epoch 的准确率
losses = [] # 存储每个 epoch 的损失函数值
for e in range(conf["global_epochs"]): # 遍历全局训练轮数
# 随机选择 k 个客户端参与联邦训练
candidates = random.sample(clients, conf["k"])
gradient_accumulator = {} # 初始化梯度累加器,用于存储梯度的累加结果
for name, params in server.global_model.state_dict().items():
gradient_accumulator[name] = torch.zeros_like(params) # 初始化梯度累加器为与模型参数同形状的零张量
# 对每个选定的客户端执行本地训练,并将梯度累加到梯度累加器中
for c in candidates:
diff = c.local_train(server.global_model) # 调用客户端对象的本地训练方法,返回参数梯度差异
for name, params in server.global_model.state_dict().items():
gradient_accumulator[name].add_(diff[name]) # 将每个客户端的参数梯度差异累加到梯度累加器中
server.model_aggregate(gradient_accumulator) # 聚合客户端的参数梯度
acc, loss = server.model_eval() # 在服务器端评估模型性能,返回准确率和损失
accuracies.append(acc) # 将准确率记录到列表中
losses.append(loss) # 将损失函数值记录到列表中
print("Epoch %d, acc: %f, loss: %f\n" %(e, acc, loss)) # 打印当前 epoch 的准确率和损失函数值
3.6 联邦训练效果
【注】由于笔者是在自己电脑上训练,因此简化了训练过程,模型未能达到理想精度。此处仅做一个演示。
如果读者想要达到良好的训练效果,可以修改conf配置表,增加全局和局部的epoch数量,甚至将k修改为10,让所有客户端参与训练。
import matplotlib.pyplot as plt
size = (10, 5)
# 绘制损失和准确率图
plt.figure(figsize=size)
# 绘制准确率曲线
plt.plot(accuracies, label='Accuracy', color='blue')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
# 绘制损失曲线
plt.plot(losses, label='Loss', color='red')
plt.ylabel('Loss')
plt.title('Accuracy and Loss vs. Epoch')
# 设置 x 轴的刻度为 [0, 1, 2, 3, ..., 20]
plt.xticks(range(21))
plt.legend()
plt.show()
标签:实战,self,torch,笔记,conf,联邦,model,data,客户端 From: https://blog.csdn.net/qq1161964247/article/details/137042896《联邦学习实战》杨强等著