首页 > 其他分享 >deepspeed流水线并行

deepspeed流水线并行

时间:2023-08-28 14:14:51浏览次数:43  
标签:layers deepspeed nn self 并行 train forward 流水线

docs/_tutorials/pipeline.md

https://gitee.com/qzl66/DeepSpeed/blob/master/docs/_tutorials/pipeline.md

 

1、重构管道模型  Expressing Pipeline Models

 流水线并行要求模型被表示为一系列层。在前向传播中,每一层输入为上一层的输出。其实管道并行模型是不需要指定forward()的!管道并行模型的正向传递隐式采用以下形式:
def forward(self, inputs):
    x = inputs
    for layer in self.layers:
        x = layer(x)
    return x
PyTorch的torch.nn.Sequential是一个用于表达流水线并行模型的方便容器,可以通过DeepSpeed并行化,无需修改:
net = nn.Sequential(
    nn.Linear(in_features, hidden_dim),
    nn.ReLU(inplace=True),
    nn.Linear(hidden_dim, out_features)
)
from deepspeed.pipe import PipelineModule
net = PipelineModule(layers=net, num_stages=2)
PipelineModule使用其layers参数作为组成模型的层序列。在初始化之后,net被分成两个流水线阶段,其层被移动到相应的GPU。如果存在两个以上的GPU,DeepSpeed还将使用混合数据并行。 注意:GPU总数必须能被流水线阶段数整除。

Note: For large model training, see memory-efficient model construction. {: .notice--info}

 

让我们来看看torchvision的AlexNet的一个简化实现:

 

class AlexNet(nn.Module):
    def __init__(self, num_classes=1000):
        super(AlexNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2),
            ...
            nn.MaxPool2d(kernel_size=3, stride=2),
        )
        self.avgpool = nn.AdaptiveAvgPool2d((6, 6))
        self.classifier = nn.Sequential(
            nn.Dropout(),
            ...
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        x = self.features(x)
        x = self.avgpool(x)
        x = torch.flatten(x, 1)
        x = self.classifier(x)
        return x
AlexNet主要由几个连续的子模块组成。我们可以将它的子模块展平成一系列层,从而将它变成一个PipelineModule:  
class AlexNetPipe(AlexNet):
    def to_layers(self):
        layers = [
            *self.features,
            self.avgpool,
            lambda x: torch.flatten(x, 1),
            *self.classifier
        ]
        return layers

from deepspeed.pipe import PipelineModule
net = AlexNetPipe()
net = PipelineModule(layers=net.to_layers(), num_stages=2)
注意:上面各层中间的lambda不是torch.nn.Module类型。任何实现__call__()的对象都可以是PipelineModule中的一层:这允许在管道中进行方便的数据转换。  

2、输入和输出:

按照torch.nn.Sequential,每层的输入和输出必须是单个torch。张量或一组张量。在实践中,一些模型可能需要修改它们的前向传递,将参数打包和解包为forward()。考虑transformer模块堆栈的简化实现:
class TransformerBlock(nn.Module)
    ...
    def forward(self, hidden, mask):
        output = self.compute(hidden, mask)
        return output
    ...

stack = [ TransformerBlock() for _ in range(num_layers) ]
需要对TransformerBlock进行两处修改: 1、必须将参数收集到一个元组中。 2、mask也必须从forward()返回才能传递到下一层。 这些修改可以通过一个简短的子类来完成:
class TransformerBlockPipe(TransformerBlock)
    def forward(self, inputs):
        hidden, mask = inputs
        output = super().forward(hidden, mask)
        return (output, mask)
stack = [ TransformerBlockPipe() for _ in range(num_layers) ]
 

训练循环Training Loops

 流水线并行将前向传播和后向传播交叉进行,因此训练循环不能被分成前向()、后向()和步进()的独立阶段。相反,DeepSpeed的管道引擎提供了一个train_batch()方法,该方法使管道引擎前进,直到下一批训练数据被使用并且模型权重被更新。
train_iter = iter(train_loader)
loss = engine.train_batch(data_iter=train_iter)
上面的train_batch()示例相当于下面的传统数据并行DeepSpeed:
train_iter = iter(train_loader)
for micro_batch in engine.gradient_accumulation_steps():
    batch = next(data_iter)
    loss = engine(batch)
    engine.backward(loss)
    engine.step()
 

数据处理

 

     

标签:layers,deepspeed,nn,self,并行,train,forward,流水线
From: https://www.cnblogs.com/q-zl/p/17661843.html

相关文章

  • 分享生产项目DevOps CICD流水线解决方案
    一、前言每家互联网业务迭代更新都会有自己的一套DevOps发布上线技术架构体系,不管是采用什么工具都离不开编译、打包、发布、部署等几个环境,随着互联网快速的发展,为了满足企业业务上线需求,大批的技术人员都研发出各种有意思的工具,像我们熟知的Jenkins、Spug等,都为我们互联网公司业......
  • 并行求解器基础知识学习
      1.数字化工具的新特征    。。。。物理机-->虚拟化-->容器化   2.分布式并行编程基础(1)传相关并行编程框架:MPI(消息传递接口)——一种典型的并行编程框架OpenCL   CUDA(2)HDFS分布式文件系统下的MapReduce并行模式shuffle调度  ......
  • 并发和并行,线程和进程
     ......
  • 为什么会有 GIL?如何释放 GIL 实现并行?
    https://mp.weixin.qq.com/s?__biz=Mzg3NTczMDU2Mg==&mid=2247503319&idx=1&sn=7dd1c7c05ccb319501eb0457a1f4c9b7&chksm=cf3f8e3af848072c0ef585787bf2b4359f3d63e43a927f93eaef81c407efff73233ffc00a2c7&scene=178&cur_album_id=251396376434648678......
  • node前端的流水线pipline案例
    pipeline{agent{kubernetes{cloud'kubernetes-dev'slaveConnectTimeout1200workspaceVolumehostPathWorkspaceVolume(hostPath:"/opt/jenkins/workspace",readOnly:false)yaml'''apiVersi......
  • jdk后端的流水线pipline案例
    pipeline{agent{kubernetes{cloud'kubernetes-dev'slaveConnectTimeout1200workspaceVolumehostPathWorkspaceVolume(hostPath:"/opt/jenkins/workspace",readOnly:false)yaml'''apiVersi......
  • Oracle行列操作--合并行与按字段拆分
    1、在实际工作中遇到根据某一字段将多行合并成一行的情况,我们下面以选修课的例子进行说明:--createtablecreatetableXXK(idNUMBER,rymcNVARCHAR2(50),xxkmcNVARCHAR2(50))---inserttestdatainsertintoXXKvalues(1,'小明','编程');insertintoXXKv......
  • 并发和并行
     并发VS并行1、多线程程序在单核上运行,就是并发2、多线程程序在多核上运行,就是并行并发:因为是在一个cPU上,比如有10个线程,每个线程执行10毫秒(进行轮询操作),从人的角度看,好像这10个线程都在运行,但是从微观上看,在某一时间点看,其实只有一个线程在执行,这就是并发并行:因为是在多个c......
  • C#生产流程控制(串行,并行混合执行)
    开源框架CsGohttps://gitee.com/hamasm/CsGo?_from=gitee_search 文档资料:https://blog.csdn.net/aa2528877987/article/details/132139337 实现效果 usingGo;usingSystem;usingSystem.Collections.Generic;usingSystem.Threading.Tasks;usingSystem.Windo......
  • Pytorch 并行:DistributedDataParallel
    Pytorch并行:DistributedDataParallel一个节点上往往有多个GPU(单机多卡),一旦有多个GPU空闲(当然得赶紧都占着),就需要合理利用多GPU资源,这与并行化训练是分不开的。O、数据并行化按《深入浅出Pytorch》的话来说,pytorch模型的并行化,主要分为两类:模型并行:一个GPU容纳不了一......