目录
Pytorch分布式并行训练原理
PyTorch分布式训练采用数据并行架构。
分布式数据并行原理
- 每个rank都有一份模型的拷贝(一个rank就是进程),不同rank可以相同机器的不同GPU卡上,也可以分布在不同的机器上;
- 将数据的不同Batch的数据传给不同的rank;
- 每个rank执行forward pass;
- 每个rank执行backward pass,因为模型输入的数据不同,所以不同rank上的梯度不同;
- 通过
allreduce
算法同步梯度,最终每个rank的梯度保持一致;
PyTorch分布式数据并行Demo
分布式数据并行训练Demo:
def ddp_demo(rank, world_size, accum_grad=4):
assert dist.is_gloo_available(), "Gloo is not available!"
print(f"world_size: {world_size}, rank: {rank}, is_gloo_available: {dist.is_gloo_available()}")
# 1. 初始化进程组
dist.init_process_group("gloo", world_size=world_size, rank=rank)
model = nn.Sequential(nn.Linear(10, 100), nn.ReLU(), nn.Linear(100, 20))
# 2. 分布式数据并行封装模型
ddp_model = DistributedDataParallel(model)
criterion = nn.MSELoss()
optimizer = optim.SGD(ddp_model.parameters(), lr=1e-3)
dataset = TensorDataset(torch.randn(1000, 10))
# 3. 数据并行
sampler = DistributedSampler(dataset=dataset, num_replicas=world_size, shuffle=True)
dataloader = DataLoader(dataset=dataset, batch_size=24, sampler=sampler, collate_fn=transform)
for epoch in range(1):
for step, batch in enumerate(dataloader):
output = ddp_model(batch)
label = torch.rand_like(output)
if step % accum_grad == 0:
# 同步参数
context = contextlib.nullcontext
else:
# 4. 梯度累计,不同步参数
context = ddp_model.no_sync
with context():
time.sleep(random.random())
loss = criterion(output, label)
loss.backward()
if step % accum_grad == 0:
optimizer.step()
optimizer.zero_grad()
print(f"epoch: {epoch}, step: {step}, rank: {rank} update parameters.")
# 5. 销毁进程组上下文数据(一些全局变量)
dist.destroy_process_group()
本地没有Nvidia环境,使用
gloo
替代nccl
做通信后端。
PyTorch分布式训练Demo链接:https://gist.github.com/hotbaby/15950bbb43d052cd835b0f18c997f67c
模型转换成分布式训练步骤:
- 初始化进程组
dist.init_process_group("gloo", world_size=world_size, rank=rank)
; - 分布式数据并行封装模型
DistributedDataParallel(model)
; - 数据分布式并行,将数据分成
world_size
份,根据rank
采样DistributedSampler(dataset=dataset, num_replicas=world_size, shuffle=True)
; - 训练过程中梯度累计,降低训练进程间的参数同步频率,提升通信效率【可选】;
- 销毁进程组
dist.destroy_process_group()
。
PyTorch分布式数据并行内部设计
一个典型神经网络训练流程:
- 创建神经网络模型、定义目标函数和优化器;
- 迭代输入数据;
- 前向传播,计算loss;
- 反向传播,计算梯度;
- 根据梯度和学习率,更新参数
weight = weight - learning_rate * weight
; - 重复2 ~ 5步骤。
分布式数据并行DDP通过嵌入到上述训练流程的不同阶段,实现训练加速:
- 初始化
- 进程组
ProcessGroup
,解决rank间通信问题;
- 进程组
- 构建神经模型
- rank0参数广播给其他rank,保证所有模型副本的参数一致;
- 初始化
Reducer
,用于后续反向传播阶段梯度累计和同步;
- 前向传播
require_forward_param_sync
为True时,同步参数;
- 反向传播
- DDP通过注册
autograd hook
注册梯度同步函数,调用Reducer
的allreduce
算法在rank间同步梯度;
- DDP通过注册
- 更新参数
PyTorch分布式数据并行源代码分析
图片来源:https://pytorch.org/docs/master/notes/ddp.html
distributed.py
是分布式数据并行DDP Python程序入口,DistributedDataParallel
封装了神经网络模型,重写了Module的forward()
函数,调用底层C++库common.h
接口同步模型参数,调用C++库reducer.h
的allreduce
实现梯度同步;common.h
实现了broadcast_coalesced
广播等工具函数;reducer.h
Reducer
构造函数被distributed.py
调用,注册了autograd_hook()
函数实现梯度累计;autograd_hook()
被PyTorch的自动微分引擎autograd engine
调用;prepare_for_backward()
被distributed.py
的前向传播阶段调用;
ProcessGroup.hpp
抽象了allreducer()
、gather()
、scatter()
、broadcast()
等接口,NCCL
、Gloo
等Backend实现了ProcessGroup.hpp
定义的接口。
Ring AllReduce算法
为什么需要Ring AllReducer算法?
假设模型大小为128MB,网络带宽为1000Mb/s,8张GPU卡并行计算。一个训练step,rank0将参数同步到其他rank的时间是8秒,随着GPU数量的增长,参数同步的时间也线性增长,最终网络通信的时间抵消了GPU多卡训练时间,多卡训练加速不明显。
ring allreduce算法原理:
参考https://zhuanlan.zhihu.com/p/69797852
ring allreduce算法网络通信代价分析:
假设模型大小为M,GPU数量为N,在scatter-reducer
和all-gather
阶段,发送的数据大小为M/N
,数据传输总量为2(N - 1)* M / N
,数据通信总量是确定,不会随着GPU数量增长而增长。
参考文献
- PyTorch分布式数据并行官方文档【重要】
- PyTorch分布式数据并行概览
- PyTorch分布式数据并行后端
- PyTorch分布式集合通信
- Python实现ring allreduce算法
- 分布式训练allreduce算法
- ring allredue算法
- https://www.cnblogs.com/rossiXYZ/p/15172816.html
- https://www.autodl.com/docs/distributed_training/
- https://pytorch.org/tutorials/beginner/blitz/neural_networks_tutorial.html
- https://dev-discuss.pytorch.org/t/torchdynamo-update-9-making-ddp-work-with-torchdynamo/860