Pytorch 并行:DistributedDataParallel
一个节点上往往有多个 GPU(单机多卡),一旦有多个 GPU 空闲(当然得赶紧都占着),就需要合理利用多 GPU 资源,这与并行化训练是分不开的。
O、数据并行化
按 《深入浅出Pytorch》的话来说,pytorch模型的并行化,主要分为两类:
- 模型并行:一个 GPU 容纳不了一个模型,需要多个 GPU 分别承载模型的一部分
- 数据并行:将训练数据分配到各个 GPU 上,在不同 GPU 上分别独立地训练相同模型,最终将并行的训练结果归约到一个 GPU 上
Pytorch并行也主要支持后者,即数据并行。一般而言,训练的时候都需要较大 batch size,才能保持训练过程的稳定性,而 batch size 直接与训练所需显存大小挂钩,设置大了,很容易爆显存。而并行化却能很好的解决这个问题,能够显式的提高逻辑 batch size 大小。
Pytorch 较早的并行化,实现在 torch.nn.DataParalell
中,但该实现是基于线程的(只使用了一个线程),由于 CPython 解释器有着全局线程锁(Global Interpreter Lock,GIL)的特性,导致任何时候CPU上的一个进程(对应一个 python 解释器)中,仅能有一个线程能真正运行 python 代码。这就大大限制了并行的实现,很难实现真正的并行,除此之外,DataParalell 还有其他诸多弊端,这里就不再赘述,本文主要阐述单机多卡条件下 DistributedDataParallel 的使用,等“富有”了再研究多机多卡。
一、DistributedDataParallel
1、基本原理
请允许我用几句话说明一下 torch.nn.DistributedDataParallel
的基本原理
- 开启多个进程,每个进程控制一个 GPU
- 每个 GPU 上都存储模型,执行相同的任务(虽然训练数据不同)
- GPU 间只传递梯度(不像 DataParallel 还要传递整个模型参数)
有两种使用办法,一种是在代码内部,手动启动各个 GPU 对应的进程;另一种是用 Pytorch 官方命令 torchrun
启动多线程,这种的话代码内部实现倒是更方便。这里主要对后者的使用方法加以说明。在此之前先阐述一下基本训练代码的结构,一般而言,Pytorch 训练代码的结构如下:
定义模型和训练类
当然这里的 Trainer 类在不复杂的情况下,还是可以简化的,一个 train()
函数就能搞定。
class Model(nn.module):
def __init__(self, **kwargs) ...
def forward(self, **kwargs) ...
class Trainer():
def __init__(self,
model: nn.Module,
optim: torch.optim.Optimizer,
train_dataloader,
test_dataloader,
**kwargs
) ...
def train(self, Epochs, **kwargs) ...
获取参数与实例化 Model、Trainer 类
from torch.optim import Optimizer
from torch.utils.data import Dataset, DataLoader
def parser() ...
def main(args):
device = torch.cuda(f'cuda:{args.rank}')
model = getModel().to(device)
optim = Adam(model.parameters(), lr=1e-5)
trainer_dataloader, test_dataloader = DataLoader(...)
trainer = Trainer(model, optim, trainer_dataloader, test_dataloader)
trainer.train(args.epochs)
if __name__=='__main__':
args = parser()
main(args)
2、torchrun 命令启动方式
torchrun
其实是早期 torch.distributed.launch
启动方式的优化,后者已经被 deprecated
,其启动方式更为繁杂且智能性更低:
python -m torch.distributed.launch --use-env train_script.py
如果想要调用 torchrun
运行,需要在上述代码中添加如下部分:
import torch.distributed as dist
import torch
def main(args)
############################################################################
# 1.获取并行过程 GPU 需要知道的参数,并初始化控制该 GPU 的进程
local_rank = int(os.environ["LOCAL_RANK"])
world_size = int(os.environ["WORLD_SIZE"])
dist.init_process_group(
backend='nccl',
init_method='env://', # 采用环境变量的值来初始化
world_size=world_size, # 总 GPU 数量
rank=local_rank # 本进程对应 GPU 编号
)
##############################################################################
device = torch.cuda(f'cuda:{args.rank}')
model = getModel().to(device)
##############################################################################
# 2.包装 model,pytorch 会自行将模型复制到各个 GPU 上
model = torch.nn.parallel.DistributedDataParallel(model,
device_ids=[local_rank],
find_unused_parameters=True,
output_device=local_rank)
##############################################################################
optim = Adam(model.parameters(), lr=1e-5)
# dataset = Dataset()
# trainer_dataloader, test_dataloader = DataLoader(...)
##############################################################################
# 3.设置并行的数据分发器,使得各 GPU 共享一个 dataloader
train_dataset = MyDataset(args.data_file, args.max_length)
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset,
num_replicas=world_size,
rank=local_rank
)
mini_batch_size=8
# 一般是分布 train,不需要分布 test,所以这里只给出 train_dataloader
train_dataloader = torch.utils.data.DataLoader(
dataset=train_dataset,
batch_size=mini_batch_size,
shuffle=False, # 这里必须设置为 False
num_workers=0,
pin_memory=True,
sampler=train_sampler)
##############################################################################
trainer = Trainer(model, optim, trainer_dataloader, test_dataloader)
trainer.train(args.epochs)
设置了这三部分,就可以通过 torchrun 命令 来运行程序了。单机多卡(一般的情况,即一个计算机上多个GPU)的命令如下:
torchrun \
--nnodes 1\ # 单机,一个 node 节点(计算机数量)
--nproc-per-node 3\ # 多卡, GPU 数量
--train.py --rank 3 --epochs 50 ... # 训练脚本 train.py 的参数附在后面即可
torchrun 命令优点在于进程初始化所需要的变量 local_rank
、world_size
都会自动设置,而如果用 multiprocess 则需要根据 nodes 数、gpu 数手动计算,显然 torchrun 更为官方一点。但另一种方式有助于我们理解并行代码中,诸多变量的含义,还可以设置一些更灵活的变量,具体参见文末的 参考文章(1),或者其中文版本 参考文章(3)。
三、Tricks
这里记录一下未来分布式训练中的一些 trick,待更新......
1、指定并行 GPU 列表
有些时候并不是所有 GPU 都空着的,但上述运行方式会自动从 rank=0
开始依次调用当前节点上的 gpu。一旦有 GPU 并非空闲,很可能会报 out of memory
的错。这个时候就要指定参与并行训练的 GPUs 了。
一个较为方便的方法,是设置环境变量 CUDA_VISIBLE_DEVICES
,Pytorch 根据该变量来判断哪些 GPU 可以用。具体而言,可以在自己程序中设置一个参数:
def parser():
......
parser.add_argument('-v', '--visible', type=str, help='the rank of visible GPUs')
if args.visible is not None:
os.environ['CUDA_VISIBLE_DEVICES'] = args.visible # -v "0,3"
......
运行过程中,虽然 local_rank
依然是自然数顺序,但是 pytorch 会自动将 local_rank
视作可见 GPU 的索引,而不会使用 “非空闲” 的 GPUs。
参考文章
- Distributed data parallel training in Pytorch (yangkky.github.io)
- torchrun (Elastic Launch) — PyTorch 2.0 documentation
- PyTorch分布式训练简明教程(2022更新版) - 知乎 (zhihu.com)
- DataParallel 和 DistributedDataParallel 的区别和使用方法_Golden-sun的博客-CSDN博客