我有一个经过训练的模型和 4 个 GPU。我有一个数据集,我想将其分割为 4 个(并在每个 GPU 上使用相同的批量大小进行处理),并且基本上将我从每个 GPU 获得的结果相加。一般来说,我对并行处理很陌生,我对 torch.multiprocessing 等的谷歌搜索没有帮助。
我的问题的一个最小示例如下: 我尝试使用 $python 和 $torchrun 运行以下脚本。
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torch.distributed as dist
import torch.multiprocessing as mp
net = nn.Sequential(nn.Linear(1, 10), nn.Tanh(), nn.Linear(10,1), nn.Flatten(0)) # some model
x = torch.linspace(0, 1, 100).unsqueeze(1) # the full dataset
s0 = net(x).sum() # some action of the model on the full data
# I want to preform the same action on the data in parallel and get the same result
def run_inference(rank, world_size):
# create default process group
dist.init_process_group("gloo", rank=rank, world_size=world_size)
net.to(rank)
x = torch.linspace(0, 1, 100).unsqueeze(1) # re-define data here? (Based on google results)
data = TensorDataset(x)
world_size = torch.cuda.device_count()
batch_size = 10
data_loader = DataLoader(data, batch_size=batch_size, drop_last=False, num_workers=world_size)
sum_i = 0
for idx, data in enumerate(data_loader):
sum_i += net(data[0]).sum() # how to store and add results from different GPUs?
world_size = torch.cuda.device_count() # 4 in my case
mp.spawn(run_inference,
args=(world_size,),
nprocs=world_size,
join=True)
以下是使用 PyTorch 在 4 个 GPU 上并行进行推理并对结果求和的细分方法:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
import torch.distributed as dist
import torch.multiprocessing as mp
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.linear1 = nn.Linear(1, 10)
self.tanh = nn.Tanh()
self.linear2 = nn.Linear(10, 1)
def forward(self, x):
x = self.linear1(x)
x = self.tanh(x)
x = self.linear2(x)
return x.squeeze()
def run_inference(rank, world_size):
# 初始化进程组
dist.init_process_group("nccl", rank=rank, world_size=world_size)
# 设置设备
torch.cuda.set_device(rank)
# 模型和数据
net = Net().to(rank)
net = nn.parallel.DistributedDataParallel(net, device_ids=[rank])
x = torch.linspace(0, 1, 100).unsqueeze(1).to(rank)
dataset = TensorDataset(x)
# 创建分布式采样器
sampler = torch.utils.data.distributed.DistributedSampler(
dataset,
num_replicas=world_size,
rank=rank,
shuffle=False
)
data_loader = DataLoader(
dataset,
batch_size=10,
shuffle=False,
num_workers=4,
pin_memory=True,
sampler=sampler
)
# 推理循环
sum_i = torch.tensor(0.0).to(rank)
for data in data_loader:
input_data = data[0]
output = net(input_data)
sum_i += output.sum()
# 将所有 GPU 上的总和相加
dist.all_reduce(sum_i, op=dist.ReduceOp.SUM)
# 在主进程上打印结果
if rank == 0:
print(f"总和: {sum_i.item()}")
if __name__ == '__main__':
world_size = torch.cuda.device_count()
mp.spawn(run_inference, args=(world_size,), nprocs=world_size, join=True)
说明:
-
导入必要的库: 我们需要
torch
、torch.nn
、torch.utils.data
、torch.distributed
和torch.multiprocessing
。 -
定义模型: 我们将相同的模型架构 (
net
) 定义为之前。 -
run_inference
函数: -
此函数现在接受
rank
(进程的 ID)和world_size
(进程总数)作为参数。 -
初始化进程组:
使用
nccl
后端初始化dist.init_process_group
,该后端通常适用于多 GPU 通信。 -
设置设备:
torch.cuda.set_device(rank)
确保每个进程在其指定的 GPU 上运行。 -
将模型移至设备:
将
net
实例移至当前设备。 -
创建分布式数据加载器:
- 我们将数据划分为多个工作进程。
-
使用
DistributedSampler
确保每个进程获得唯一的数据子集。
-
执行推理:
对每个批次运行模型,并累积输出的总和(
sum_i
)。 -
使用
dist.all_reduce
减少总和: 此函数将所有进程上的sum_i
值相加,并将结果存储在所有进程的sum_i
中。 -
打印结果: 在
rank 0
进程(主进程)上打印最终的总和。 -
if __name__ == '__main__':
块: -
获取可用的 GPU 数量 (
world_size
)。 -
使用
mp.spawn
函数生成多个进程,每个进程运行run_inference
函数。
如何运行:
- 确保有 4 个可用的 GPU。
-
使用
python -m torch.distributed.launch --nproc_per_node=4 your_script.py
命令运行脚本,其中your_script.py
是的 Python 文件名。
此脚本将在 4 个 GPU 上并行运行推理,并将结果聚合以计算最终的总和。 每个 GPU 处理数据的不同部分,并且
dist.all_reduce
确保最终结果是所有部分结果的总和。