首页 > 其他分享 >Yuan2.0代码主要结构概览及三种并行方式实现

Yuan2.0代码主要结构概览及三种并行方式实现

时间:2024-01-19 17:56:24浏览次数:27  
标签:pipeline group ranks 概览 并行 groups model parallel Yuan2.0

该代码结构如下图所示:

在initialize_megatron初始化megatron的过程中,有关于数据并行、流水线并行、张量并行的实现,简介及其实现如下:

模型分布式环境初始化:

以两台分别有8个GPU服务器为例,训练具有12层的transformer layers,

图一

图二

   本示例将模型纵向切割为4部分,每部分3个layers,实现pipeline-parallel(流水线并行),将模型横向切割实现tensor-parallel(向量并行),把图二中的“1,2,3层”切割成两部分。

 

图三

上图说明了以model1为例,如何切割一个模型为八个部分,分别放入八个gpu的过程。

一个完整的模型model1的含义:

纵向三刀,把transformer layers的一共12层,切割成了四个部分,每个部分3个layers,其目的是实现pipeline-parallel;【需要pipeline_model_parallel_size=4】

横向的一刀,代表了tensor-parallel,是把(1,2,3)直到(10,11,12)这样的每三层layers,都切割成上下两个部分。【需要tensor_model_parallel_size=2】

tensor model-parallel groups:代表有多少个包含向量并行的groups,由图可知:

model1:[0, 1; 8, 9; 4, 5; 12, 13]

Model2:[2, 3; 10, 11; 6, 7; 14, 15]

对应代码示例中的:

8 tensor model-parallel groups:
    [g0, g1], [g2, g3], [g4, g5], [g6, g7], [g8, g9], [g10, g11], [g12, g13], [g14, g15]

 

pipeline model-parallel groups:代表有多少个包含流水线并行的模型,由图可知:

模型model1先纵向切割为4份为流水线并行关系,然后横向切分,故有两个groups,第一个,[0,4,8,12],第二个:[1,5,9,13]

同理model2。

 

data_parallel groups:数据并行groups,数据并行,是”含有相同参数的模型的子块“之间进行数据并行,有图可以看到两台服务器中的模型结构,(0、2相同),(1、3相同),(4、6相同),对应代码示例中的:

8 data_parallel groups:
    [g0, g2], [g1, g3], [g4, g6], [g5, g7], [g8, g10], [g9, g11], [g12, g14], [g13, g15]

代码实现:

initialize_model_parallel(
    tensor_model_parallel_size: int = 1,
    pipeline_model_parallel_size: int = 1,
    virtual_pipeline_model_parallel_size: Optional[int] = None,
    pipeline_model_parallel_split_rank: Optional[int] = None,
    use_fp8: bool = False,
)
tensor_model_parallel_size = 4
pipeline_model_parallel_size = 2
world_size = 16
data_parallel_size: int = world_size // (tensor_model_parallel_size * pipeline_model_parallel_size)
num_tensor_model_parallel_groups: int = world_size // tensor_model_parallel_size = 4
num_pipeline_model_parallel_groups: int = world_size // pipeline_model_parallel_size = 8

# Build the data-parallel groups.
#构建数据并行groups
all_data_parallel_group_ranks = []
for i in range(pipeline_model_parallel_size):
    start_rank = i * num_pipeline_model_parallel_groups
    end_rank = (i + 1) * num_pipeline_model_parallel_groups
for i in range(pipeline_model_parallel_size):
    start_rank = i * num_pipeline_model_parallel_groups
    end_rank = (i + 1) * num_pipeline_model_parallel_groups
    for j in range(tensor_model_parallel_size):
        ranks = range(start_rank + j, end_rank, tensor_model_parallel_size)
        all_data_parallel_group_ranks.append(list(ranks))
        group = torch.distributed.new_group(ranks)
        group_gloo = torch.distributed.new_group(ranks, backend="gloo")
        if rank in ranks:
            _DATA_PARALLEL_GROUP = group
            _DATA_PARALLEL_GROUP_GLOO = group_gloo
            _DATA_PARALLEL_GLOBAL_RANKS = ranks
print(all_data_parallel_group_ranks)

all_data_parallel_group_ranks
[[0, 2], [1, 3], [4, 6], [5, 7], [8, 10], [9, 11], [12, 14], [13, 15]]
# Build the model-parallel groups.
#构建模型并行占用groups,即模型占用了哪些GPU
global _MODEL_PARALLEL_GROUP
assert _MODEL_PARALLEL_GROUP is None, 'model parallel group is already initialized'
for i in range(data_parallel_size):
    ranks = [data_parallel_group_ranks[i] for data_parallel_group_ranks in all_data_parallel_group_ranks]
    group = torch.distributed.new_group(ranks)
    print(ranks)
    if rank in ranks:
        _MODEL_PARALLEL_GROUP = group

ranks
[0, 1, 4, 5, 8, 9, 12, 13]
[2, 3, 6, 7, 10, 11, 14, 15]
# Build the tensor model-parallel groups.
#构建张量并行groups
global _TENSOR_MODEL_PARALLEL_GROUP
assert _TENSOR_MODEL_PARALLEL_GROUP is None, 'tensor model parallel group is already initialized'
for i in range(num_tensor_model_parallel_groups):
    ranks = range(i * tensor_model_parallel_size, (i + 1) * tensor_model_parallel_size)
    group = torch.distributed.new_group(ranks)
    print(ranks)
    if rank in ranks:
        _TENSOR_MODEL_PARALLEL_GROUP = group

[0, 1]
[2, 3]
[4, 5]
[6, 7]
[8, 9]
[10, 11]
[12, 13]
[14, 15]
# Build the pipeline model-parallel groups and embedding groups
#构建流水线并行groups和embedding groups
for i in range(num_pipeline_model_parallel_groups):
    ranks = range(i, world_size, num_pipeline_model_parallel_groups)
    print(ranks)
    group = torch.distributed.new_group(ranks)
    if rank in ranks:
        _PIPELINE_MODEL_PARALLEL_GROUP = group
        _PIPELINE_GLOBAL_RANKS = ranks
    # Setup embedding group (to exchange gradients between
    # first and last stages).
    if len(ranks) > 1:
        embedding_ranks = [ranks[0], ranks[-1]]
        position_embedding_ranks = [ranks[0]]
        print(embedding_ranks)
        print(position_embedding_ranks)
        if pipeline_model_parallel_split_rank is not None:
            if ranks[pipeline_model_parallel_split_rank] not in embedding_ranks:
                embedding_ranks = [ranks[0], ranks[pipeline_model_parallel_split_rank], ranks[-1]]
            if ranks[pipeline_model_parallel_split_rank] not in position_embedding_ranks:
                position_embedding_ranks = [ranks[0], ranks[pipeline_model_parallel_split_rank]]
    else:
        embedding_ranks = ranks
        position_embedding_ranks = ranks

    group = torch.distributed.new_group(embedding_ranks)
    if rank in embedding_ranks:
        _EMBEDDING_GROUP = group
    if rank in ranks:
        _EMBEDDING_GLOBAL_RANKS = embedding_ranks

    group = torch.distributed.new_group(position_embedding_ranks)
    if rank in position_embedding_ranks:
        _POSITION_EMBEDDING_GROUP = group
    if rank in ranks:
        _POSITION_EMBEDDING_GLOBAL_RANKS = position_embedding_ranks
运行结果:
[0, 4, 8, 12]
[0, 12]
[0]
[1, 5, 9, 13]
[1, 13]
[1]
[2, 6, 10, 14]
[2, 14]
[2]
[3, 7, 11, 15]
[3, 15]
[3]

参考:

https://zhuanlan.zhihu.com/p/470279673

 

模型分布式环境初始化:

以两台分别有8个GPU服务器为例,训练具有12层的transformer layers,

标签:pipeline,group,ranks,概览,并行,groups,model,parallel,Yuan2.0
From: https://www.cnblogs.com/wangzhilun/p/17975268

相关文章

  • MySQL并行复制死锁源码解析
    最近一个MySQL5.7.21备库告警当天的备份失败,登录上去看的时候发现前一天的备份任务还没有结束,通过查看日志发现无法备份成功的原因是一直无法获取FTWRL锁,登录MySQL查看会话状态发现其中几个复制worker一致处于异常状态,下发STOPSLAVE命令时命令也一直被卡住,当时的会话状态如下:......
  • java8并行处理能力
    java8并行处理能力当使用Java8的StreamAPI进行并行处理时,可以通过调用parallel()方法将流转换为并行流。下面是一些示例代码,展示了如何使用Java8的并行处理能力:并行处理集合元素求和:List<Integer>numbers=Arrays.asList(1,2,3,4,5,6,7,8,9,10);intsu......
  • Selenium Grid4.0 - 多台计算机上并行运行
    前言当你希望在多台计算机上并行运行测试?SeleniumGrid可以帮你实现。官方文档原文:https://www.selenium.dev/documentation/grid/getting_started/SeleniumGrid允许通过将客户端发送的命令路由到远程浏览器实例,在远程机器上执行WebDriver脚本。Grid可以做那些事?1.提供一......
  • 51、Flink的管理执行(执行配置、程序打包和并行执行)的介绍及示例
    文章目录Flink系列文章一、执行配置二、程序打包和分布式运行1、打包程序2、总结三、并行执行1、设置并行度1)、算子层次2)、执行环境层次3)、客户端层次4)、系统层次2、设置最大并行度本文介绍了Flink的管理执行的三个内容,即执行配置、打包和分布式运行以及并行执行(设置并行度的几......
  • Dating Java8系列之并行数据处理
    翎野君/文  分支合并框架 分支合并框架介绍分支/合并框架的目的是以递归的方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。把任务提交......
  • 并行 sha256sum 命令
    之前为文件夹里的文件生成SHA-256摘要时,我使用的是sha256sum*.mp4*.xml*.jpg>sha256sums.txt这个命令是逐个生成哈希值的,在计算完成1.mp4之前并不会开始计算2.mp4,不能很好得利用多核性能。解决办法也很简单,利用“百闻不如一见”的xargs即可:echo*.mp4*.xml*.jp......
  • 计算机体系结构之并行机制知识点总结
    cpu支持哪些指令集,是硬件结构决定的,还是软件方式实现的?CPU支持的指令集是由其硬件结构决定的。指令集是一组与硬件交互的底层机器指令,它定义了CPU能够执行的基本操作,包括算术运算、逻辑操作、内存访问等。不同的CPU架构具有不同的指令集。在硬件层面,CPU的设计决定了它支持......
  • Vue3 深入解析:原理与核心功能概览
    引言Vue.js,作为当今最流行的前端框架之一,以其声明式编程、响应式设计和组件化开发等特性深受开发者喜爱。Vue3(也称Vue.jsNext)作为Vue.js的重大更新版本,不仅在性能上有显著提升,还在架构设计上进行了深度优化。本文将深入探讨Vue3的核心原理及其改进之处。一、CompositionAPIVu......
  • el-table 设置合并行或列时,显示错乱问题
    1.需求效果图:2.接口数据格式:点击查看代码constlist=[{contractNo:"CAI-20220801001",contractItem:"用户质量指数",count:15234,customerItems:[{contractNo:null,contractItem:"反欺诈分......
  • 科普:嵌入式多核并行仿真
    ​自信息技术革命以来,计算机一直被应用在各种复杂的数据处理中,如火箭弹道,高能物理和生物学数据等。随着嵌入式领域的多样化需求的不断丰富,多核CPU的应用也越来越广泛:嵌入式系统通常需要同时处理多个任务和实时数据,并在有限的资源和功耗限制下提供高性能和可靠性。多核技术为这些需......