docs/_tutorials/pipeline.md
https://gitee.com/qzl66/DeepSpeed/blob/master/docs/_tutorials/pipeline.md
1、重构管道模型 Expressing Pipeline Models
流水线并行要求模型被表示为一系列层。在前向传播中,每一层输入为上一层的输出。其实管道并行模型是不需要指定forward()的!管道并行模型的正向传递隐式采用以下形式:def forward(self, inputs): x = inputs for layer in self.layers: x = layer(x) return xPyTorch的torch.nn.Sequential是一个用于表达流水线并行模型的方便容器,可以通过DeepSpeed并行化,无需修改:
net = nn.Sequential( nn.Linear(in_features, hidden_dim), nn.ReLU(inplace=True), nn.Linear(hidden_dim, out_features) ) from deepspeed.pipe import PipelineModule net = PipelineModule(layers=net, num_stages=2)PipelineModule使用其layers参数作为组成模型的层序列。在初始化之后,net被分成两个流水线阶段,其层被移动到相应的GPU。如果存在两个以上的GPU,DeepSpeed还将使用混合数据并行。 注意:GPU总数必须能被流水线阶段数整除。
Note: For large model training, see memory-efficient model construction. {: .notice--info}
让我们来看看torchvision的AlexNet的一个简化实现:
class AlexNet(nn.Module): def __init__(self, num_classes=1000): super(AlexNet, self).__init__() self.features = nn.Sequential( nn.Conv2d(3, 64, kernel_size=11, stride=4, padding=2), ... nn.MaxPool2d(kernel_size=3, stride=2), ) self.avgpool = nn.AdaptiveAvgPool2d((6, 6)) self.classifier = nn.Sequential( nn.Dropout(), ... nn.Linear(4096, num_classes), ) def forward(self, x): x = self.features(x) x = self.avgpool(x) x = torch.flatten(x, 1) x = self.classifier(x) return xAlexNet主要由几个连续的子模块组成。我们可以将它的子模块展平成一系列层,从而将它变成一个PipelineModule:
class AlexNetPipe(AlexNet): def to_layers(self): layers = [ *self.features, self.avgpool, lambda x: torch.flatten(x, 1), *self.classifier ] return layers from deepspeed.pipe import PipelineModule net = AlexNetPipe() net = PipelineModule(layers=net.to_layers(), num_stages=2)注意:上面各层中间的lambda不是torch.nn.Module类型。任何实现__call__()的对象都可以是PipelineModule中的一层:这允许在管道中进行方便的数据转换。
2、输入和输出:
按照torch.nn.Sequential,每层的输入和输出必须是单个torch。张量或一组张量。在实践中,一些模型可能需要修改它们的前向传递,将参数打包和解包为forward()。考虑transformer模块堆栈的简化实现:class TransformerBlock(nn.Module) ... def forward(self, hidden, mask): output = self.compute(hidden, mask) return output ... stack = [ TransformerBlock() for _ in range(num_layers) ]需要对TransformerBlock进行两处修改: 1、必须将参数收集到一个元组中。 2、mask也必须从forward()返回才能传递到下一层。 这些修改可以通过一个简短的子类来完成:
class TransformerBlockPipe(TransformerBlock) def forward(self, inputs): hidden, mask = inputs output = super().forward(hidden, mask) return (output, mask) stack = [ TransformerBlockPipe() for _ in range(num_layers) ]
训练循环Training Loops
流水线并行将前向传播和后向传播交叉进行,因此训练循环不能被分成前向()、后向()和步进()的独立阶段。相反,DeepSpeed的管道引擎提供了一个train_batch()方法,该方法使管道引擎前进,直到下一批训练数据被使用并且模型权重被更新。train_iter = iter(train_loader) loss = engine.train_batch(data_iter=train_iter)上面的train_batch()示例相当于下面的传统数据并行DeepSpeed:
train_iter = iter(train_loader) for micro_batch in engine.gradient_accumulation_steps(): batch = next(data_iter) loss = engine(batch) engine.backward(loss) engine.step()
数据处理
标签:layers,deepspeed,nn,self,并行,train,forward,流水线 From: https://www.cnblogs.com/q-zl/p/17661843.html