
NCCL Interface Testing


Prerequisites

Relevant APIs from the PyTorch distributed communication package:

  • torch.distributed.init_process_group(): initializes the process group.
  • torch.distributed.get_rank(): returns the rank of the current process; rank % torch.cuda.device_count() gives the local_rank of the process on its node.
  • torch.distributed.get_world_size(): returns the number of processes in the process group.
  • torch.distributed.barrier(): synchronizes the processes in the group, blocking each process until all of them have reached the call.

Before calling any collective communication API, the process group must be initialized with torch.distributed.init_process_group("nccl"); a minimal initialization sketch is shown below.
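A minimal sketch of that initialization, assuming the process was started by a launcher that exports LOCAL_RANK (the init_distributed name is illustrative, not from the original gist):

import os
import torch

def init_distributed():
    # Join the NCCL-backed process group; RANK, WORLD_SIZE, MASTER_ADDR and
    # MASTER_PORT are read from the environment prepared by the launcher.
    torch.distributed.init_process_group("nccl")

    # Bind this process to its own GPU so NCCL collectives use the right device.
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)

    # Block until every process in the group has finished initialization.
    torch.distributed.barrier()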

How do you run a multi-node, multi-GPU training program?

DeepSpeed and PyTorch ship command-line launchers: deepspeed and torchrun.

The launcher creates environment variables such as RANK and WORLD_SIZE for every worker process.

Taking deepspeed as an example:

deepspeed --num_gpus NUM_GPUS --num_nodes NUM_NODES --hostfile HOSTFILE program.py

For multi-node training, the NCCL_SOCKET_IFNAME environment variable must be configured so that NCCL knows which network interface to use between nodes; a sketch of the launcher-provided environment follows.
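As a rough illustration (the interface name eth0 is only a placeholder, and the exact set of variables depends on the launcher), a worker can inspect its environment and set NCCL_SOCKET_IFNAME from Python before initialization:

import os

# Variables typically exported by deepspeed/torchrun for each worker process.
for name in ("RANK", "WORLD_SIZE", "LOCAL_RANK", "MASTER_ADDR", "MASTER_PORT"):
    print(name, "=", os.environ.get(name))

# Tell NCCL which network interface to use for inter-node traffic.
# "eth0" is a placeholder; in practice this is usually exported in the shell.
os.environ.setdefault("NCCL_SOCKET_IFNAME", "eth0")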

Collective Communication Operations

NCCL supports the following collective operations:

  • AllReduce: every process in the group takes part in a reduction, and all processes end up with the same reduced Tensor.
  • ReduceScatter: the processes in the group first reduce and then scatter, so each process receives one slice of the reduced result.
  • AllGather: the Tensors from all processes in the group are gathered into a list of Tensors, and every process ends up with a copy of that list.
  • Broadcast: one process in the group broadcasts a Tensor to all the other processes.

The Python code below is kept in a GitHub gist; the shared setup it relies on is sketched after this paragraph.
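The snippets below use logging, ReduceOp and a print_rank_0 helper that are not shown in this post; a plausible reconstruction of that shared setup (the helper's exact implementation in the gist may differ) is:

import logging

import torch
from torch.distributed import ReduceOp

logging.basicConfig(level=logging.INFO)

def print_rank_0(message):
    # Assumed helper: log a message from rank 0 only, to avoid duplicate lines.
    if torch.distributed.get_rank() == 0:
        logging.info(message)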

AllReduce

The PyTorch torch.distributed.all_reduce() API:

Both the input and the output are a single Tensor, and the call is an in-place operation: it modifies the original Tensor's values directly instead of creating a new Tensor.

def dist_allreduce():
    print_rank_0("all_reduce:")
    torch.distributed.barrier()

    rank = torch.distributed.get_rank()
    world_size = torch.distributed.get_world_size()

    # NCCL collectives operate on CUDA tensors, so create the tensor on this rank's GPU.
    tensor = torch.tensor(rank, device="cuda")
    input_tensor = tensor.clone()
    # In-place sum reduction: every rank ends up with the sum of all ranks.
    torch.distributed.all_reduce(tensor)

    logging.info(f"all_reduce, rank: {rank}, before allreduce tensor: {repr(input_tensor)}, after allreduce tensor: {repr(tensor)}")
    torch.distributed.barrier()

Log (with four ranks, each rank ends up with 0 + 1 + 2 + 3 = 6):

INFO:root:all_reduce:
INFO:root:all_reduce, rank: 0, before allreduce tensor: tensor(0, device='cuda:0'), after allreduce tensor: tensor(6, device='cuda:0')
INFO:root:all_reduce, rank: 3, before allreduce tensor: tensor(3, device='cuda:3'), after allreduce tensor: tensor(6, device='cuda:3')
INFO:root:all_reduce, rank: 1, before allreduce tensor: tensor(1, device='cuda:1'), after allreduce tensor: tensor(6, device='cuda:1')
INFO:root:all_reduce, rank: 2, before allreduce tensor: tensor(2, device='cuda:2'), after allreduce tensor: tensor(6, device='cuda:2')
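all_reduce defaults to a sum, but it also accepts an op argument; a quick sketch under the same assumed setup, swapping in a max reduction:

# With ranks 0..3, every rank would end up with max(ranks) = 3 instead of the sum.
tensor = torch.tensor(rank, device="cuda")
torch.distributed.all_reduce(tensor, op=ReduceOp.MAX)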

ReduceScatter

The PyTorch torch.distributed.reduce_scatter() API:

Input: a list of Tensors whose length equals world_size.

Output: a single Tensor.

def dist_reducescatter():
    print_rank_0("reduce_scatter:")
    torch.distributed.barrier()

    rank = torch.distributed.get_rank()
    world_size = torch.distributed.get_world_size()

    # NCCL collectives operate on CUDA tensors; each rank contributes world_size copies of its own rank.
    output = torch.empty(1, dtype=torch.int64, device="cuda")
    input_list = [torch.tensor(rank, device="cuda") for _ in range(world_size)]
    # Reduce element-wise across ranks, then scatter: rank i receives the reduction
    # of every rank's i-th input, which here is the sum of all ranks.
    torch.distributed.reduce_scatter(output, input_list, op=ReduceOp.SUM)
    torch.distributed.barrier()
    logging.info(f"reduce_scatter, rank: {rank}, input_list: {input_list}, tensor: {repr(output)}")
    torch.distributed.barrier()

Log (rank i contributes copies of i, so each rank's output is 0 + 1 + 2 + 3 = 6):

INFO:root:reduce_scatter:
INFO:root:reduce_scatter, rank: 0, input_list: [tensor(0, device='cuda:0'), tensor(0, device='cuda:0'), tensor(0, device='cuda:0'), tensor(0, device='cuda:0')], tensor: tensor([6], device='cuda:0')
INFO:root:reduce_scatter, rank: 3, input_list: [tensor(3, device='cuda:3'), tensor(3, device='cuda:3'), tensor(3, device='cuda:3'), tensor(3, device='cuda:3')], tensor: tensor([6], device='cuda:3')
INFO:root:reduce_scatter, rank: 1, input_list: [tensor(1, device='cuda:1'), tensor(1, device='cuda:1'), tensor(1, device='cuda:1'), tensor(1, device='cuda:1')], tensor: tensor([6], device='cuda:1')
INFO:root:reduce_scatter, rank: 2, input_list: [tensor(2, device='cuda:2'), tensor(2, device='cuda:2'), tensor(2, device='cuda:2'), tensor(2, device='cuda:2')], tensor: tensor([6], device='cuda:2')

AllGather

The PyTorch torch.distributed.all_gather() API:

Input: a single Tensor.

Output: a list of Tensors whose length equals world_size.

def dist_allgather():
    print_rank_0("allgather:")
    torch.distributed.barrier()

    rank = torch.distributed.get_rank()
    world_size = torch.distributed.get_world_size()

    # NCCL collectives operate on CUDA tensors; pre-allocate one output slot per rank.
    input_tensor = torch.tensor(rank, device="cuda")
    tensor_list = [torch.zeros(1, dtype=torch.int64, device="cuda") for _ in range(world_size)]
    torch.distributed.all_gather(tensor_list, input_tensor)
    logging.info(f"allgather, rank: {rank}, input_tensor: {repr(input_tensor)}, output tensor_list: {tensor_list}")
    torch.distributed.barrier()

Log (every rank gathers the full list of ranks [0, 1, 2, 3]):

INFO:root:allgather:
INFO:root:allgather, rank: 3, input_tensor: tensor(3, device='cuda:3'), output tensor_list: [tensor([0], device='cuda:3'), tensor([1], device='cuda:3'), tensor([2], device='cuda:3'), tensor([3], device='cuda:3')]
INFO:root:allgather, rank: 2, input_tensor: tensor(2, device='cuda:2'), output tensor_list: [tensor([0], device='cuda:2'), tensor([1], device='cuda:2'), tensor([2], device='cuda:2'), tensor([3], device='cuda:2')]
INFO:root:allgather, rank: 1, input_tensor: tensor(1, device='cuda:1'), output tensor_list: [tensor([0], device='cuda:1'), tensor([1], device='cuda:1'), tensor([2], device='cuda:1'), tensor([3], device='cuda:1')]
INFO:root:allgather, rank: 0, input_tensor: tensor(0, device='cuda:0'), output tensor_list: [tensor([0], device='cuda:0'), tensor([1], device='cuda:0'), tensor([2], device='cuda:0'), tensor([3], device='cuda:0')]

Broadcast

The PyTorch torch.distributed.broadcast() API:

Input: a single Tensor.

Output: a Tensor. If src equals the current rank, the Tensor is sent to the other ranks; if src is a different rank, the Tensor sent by the src rank is received into it in place.

def dist_broadcast():
    print_rank_0("broadcast:")
    torch.distributed.barrier()

    rank = torch.distributed.get_rank()
    world_size = torch.distributed.get_world_size()

    # NCCL collectives operate on CUDA tensors; rank 0 holds the value to broadcast, the others hold zeros.
    tensor = torch.tensor(world_size, device="cuda") if rank == 0 else torch.zeros(1, dtype=torch.int64, device="cuda")
    before_tensor = tensor.clone()
    # Rank src sends its tensor; every other rank receives it in place.
    torch.distributed.broadcast(tensor, src=0)
    logging.info(f"broadcast, rank: {rank}, before broadcast tensor: {repr(before_tensor)} after broadcast tensor: {repr(tensor)}")
    torch.distributed.barrier()

Log:

INFO:root:broadcast:
INFO:root:broadcast, rank: 2, before broadcast tensor: tensor(4, device='cuda:2') after broadcast tensor: tensor(4, device='cuda:2')
INFO:root:broadcast, rank: 1, before broadcast tensor: tensor([0], device='cuda:1') after broadcast tensor: tensor([4], device='cuda:1')
INFO:root:broadcast, rank: 3, before broadcast tensor: tensor([0], device='cuda:3') after broadcast tensor: tensor([4], device='cuda:3')
INFO:root:broadcast, rank: 0, before broadcast tensor: tensor([0], device='cuda:0') after broadcast tensor: tensor([4], device='cuda:0')



    在Java中,一个完全没有方法定义的接口被称为标记接口(MarkerInterface)。标记接口不包含任何方法,但它们在代码中传达了某种含义或元信息。它们有以下几种主要用途:语义约定:通过实现一个特定的标记接口,类可以明确地表示它具有某种特性或行为,即使该接口没有定义任何方法。当你看到一个类......