首页 > 其他分享 >Distributed tensorflow实现原理

Distributed tensorflow实现原理

时间:2023-06-21 19:33:11浏览次数:60  
标签:ps task worker Distributed job train 原理 tf tensorflow


获得更多深度学习在NLP方面应用的经典论文、实践经验和最新消息,欢迎关注微信公众号“DeepLearning_NLP” 或者扫描头像二维码添加关注。

分布式tensorflow:

本文档将展示如何创建一个tensorflow服务的集群,如何在不同的集群之间分布式部署计算图。

你好,分布式tensorflow!

首先,简单实践一个简单的tensorflow集群的例子:

# Start a TensorFlow server as asingle-process "cluster".
 $ python
 >>> import tensorflowas tf
 >>> c = tf.constant("Hello, distributed TensorFlow!")
 >>> server = tf.train.Server.create_local_server()
 >>> sess = tf.Session(server.target) # Create a session on the server.
 >>> sess.run(c)
'Hello, distributed TensorFlow!'

tf.train.Server.create_local_server()方法创建了一个单进程的集群,包含一个正在执行的服务。

创建一个集群:

一个tensorflow集群cluster是一个任务tasks的集合,其中,每一个任务会参与一个tensorflow图的分布式执行。每一个任务与一个tensorflow服务server相关,每一个服务包含一个master:用于创建sessions,一个worker:用于执行图中的操作。一个集群也可以被分解为一个或者多个工作jobs,每个jobs包含一个或多个任务tasks。

为创建一个集群,需要为集群中的每个tasks创建一个tensorflow server。不同的tasks可以运行在不同的机器上,也可以运行在同一台机器上(比如,指出具体使用的gpu设备)。每一个task都做如下操作:

1、         创建一个tf.train.ClusterSpec用以描述一个集群中的所有tasks(任务)。对于每一个task,这应该都一样。

2、         创建tf.train.Server,传递tf.train.ClusterSpec给构造函数,并通过job name和task index(工作名和任务id)来区分本地不同的任务。

创建一个tf.train.ClusterSpec去描述集群:

集群配置信息词典(cluster specificationdictionary)会把job name(工作名)映射到网络地址列表,把这个词典传递给tf.train.ClusterSpec的构造函数,比如说:

在每一个task创建一个tf.train.Server实例:

一个tf.train.Server对象包括一个本地设备的集合,并在它的tf.train.ClusterSpec里面保存了与其他所有任务连接关系的集合;有助于tf.Session使用这些信息去执行一个分布式计算。每一个server是一个有具体名字的job,并在job内部有一个task id。一个server也可以与集群中的其他server通信。

比如说,在localhost:2222和localhost:2223执行一个包含两个server的cluster,在本地机器上运行下面两个不同的进程:

# In task 0:
cluster = tf.train.ClusterSpec({"local":["localhost:2222","localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec({"local":["localhost:2222","localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)

注意:手动指定集群配置信息非常麻烦,特别是对大集群。我么正在开发工具用于通过编程部署任务,比如:使用集群管理工具Kubernetes。

在模型中指定具体的分布式设备:

使用tf.device函数,可以把ops操作指定到一个特定的进程中,比如说:

with tf.device("/job:ps/task:0"):
  weights_1 = tf.Variable(...)
  biases_1 = tf.Variable(...)

with tf.device("/job:ps/task:1"):
  weights_2 = tf.Variable(...)
  biases_2 = tf.Variable(...)

with tf.device("/job:worker/task:7"):
  input, labels = ...
  layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
  logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
  # ...
  train_op = ...

with tf.Session("grpc://worker7.example.com:2222")as sess:
  for _ in range(10000):
    sess.run(train_op)

在上面的例子中,变量在两个ps job的tasks中被创建,模型中计算密集的部分在worker job中创建。Tensorflow会插入合适的数据,这些数据会在不同的job间传递(比如,前向传递时,从ps到worker;运用梯度时,从worker到ps)

重复性训练:

数据并行化(data parallelism)是一种常见的训练模式,一个worker job里面多个任务(tasks),每个task对根据不同的mini_batch数据建模,更新共享的参数,这些参数由一个或者多个tasks共享,且存在于一个ps job里面。所有任务运行在不同的机器上。有很多种方式在tensorflow中展开这种结构,我们正在搭建库用以简化执行重复模型的工作。可选的途径包括:

1、           图内复制(In-graph replication)。在这种方式中,客户端建立一个单一的tf.Graph,它包含一个参数集合(比如,tf.Variable被指定到/job:ps),和多个模型中计算密集部分的副本,每一个都被指定到/job:worker里的不同task。

2、           图间复制(Between-graphreplication)。在这种方式中,对/job:worker里的每一个task有独立的client(客户端),特别是同一个进程。对每一个客户端建立相同的图,包括图的参数(在使用tf.train.replica_device_setter把这些参数确定性的映射到不同任务tasks中之前,先把参数指定到/job:ps),模型计算密集部分的一个副本,指定到一个处于/job:worker中本地task中。

3、           异步训练(Asynchronous training)。这种方式中,图的每一个副本拥有各自独立的训练loop,不需要执行一致性。对于以上两种复制方式都适用。

4、           同步训练(Synchronous training)。这种方式中,所有的副本读取相同的当前参数值,并行计算梯度,然后一起应用。适用于In-graphreplication(比如:CIFAR-10 multi-GPUtrainer的梯度平均)和Between-graphreplication(比如:使用tf.train.SyncReplicasOptimizer

整合到一起:训练程序例子:

下面代码给出了一个分布式训练的框架,实现了between-graphreplication 和 asynchronoustraining。包括了参数服务器和worker tasks的代码:

import argparse
import sys

import tensorflow as tf

FLAGS = None

def main(_):
  ps_hosts = FLAGS.ps_hosts.split(",")
  worker_hosts = FLAGS.worker_hosts.split(",")

  # Create a cluster from the parameter server and worker hosts.
  cluster = tf.train.ClusterSpec({"ps": ps_hosts,"worker": worker_hosts})

  # Create and start a server for the local task.
  server = tf.train.Server(cluster,
                           job_name=FLAGS.job_name,
                           task_index=FLAGS.task_index)

  if FLAGS.job_name =="ps":
    server.join()
  elif FLAGS.job_name =="worker":

    # Assigns ops to the local worker by default.
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:%d"% FLAGS.task_index,
        cluster=cluster)):

      # Build model...
      loss = ...
      global_step = tf.contrib.framework.get_or_create_global_step()

      train_op = tf.train.AdagradOptimizer(0.01).minimize(
          loss, global_step=global_step)

    # The StopAtStepHook handles stopping after running given steps.
    hooks=[tf.train.StopAtStepHook(last_step=1000000)]

    # The MonitoredTrainingSession takes care of session initialization,
    # restoring from a checkpoint, saving to a checkpoint, and closing when done
    # or an error occurs.
    with tf.train.MonitoredTrainingSession(master=server.target,
                                           is_chief=(FLAGS.task_index == 0),
                                           checkpoint_dir="/tmp/train_logs",
                                           hooks=hooks) as mon_sess:
      whilenot mon_sess.should_stop():
        # Run a training step asynchronously.
        # See `tf.train.SyncReplicasOptimizer` for additional details on how to
        # perform *synchronous* training.
        # mon_sess.run handles AbortedError in case of preempted PS.
        mon_sess.run(train_op)

if __name__ =="__main__":
  parser = argparse.ArgumentParser()
  parser.register("type","bool",lambda v: v.lower()=="true")
  # Flags for defining the tf.train.ClusterSpec
  parser.add_argument(
      "--ps_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--worker_hosts",
      type=str,
      default="",
      help="Comma-separated list of hostname:port pairs"
  )
  parser.add_argument(
      "--job_name",
      type=str,
      default="",
      help="One of 'ps', 'worker'"
  )
  # Flags for defining the tf.train.Server
  parser.add_argument(
      "--task_index",
      type=int,
      default=0,
      help="Index of task within the job"
  )
  FLAGS, unparsed = parser.parse_known_args()
  tf.app.run(main=main, argv=[sys.argv[0]]+ unparsed)

通过一个名为trainer.py的脚本来启动一个包含两个ps(parameter server)和两个wotker的trainer:

# On ps0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
     --job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
     --job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
     --job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
     --ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
     --worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
     --job_name=worker --task_index=1

参考文献:

https://www.tensorflow.org/deploy/distributed

标签:ps,task,worker,Distributed,job,train,原理,tf,tensorflow
From: https://blog.51cto.com/u_13046751/6530895

相关文章

  • NHC/ODO/INS组合原理
    毕业论文中非完整约束部分推导有误,所以更正一下! ......
  • 大型网站技术架构 核心原理与案例分析--阅读笔记
    第一章大型网站架构演化大型网站软件系统的特点大型网站软件系统的特点高并发、大流量高可用海量数据用户分布广法、网络情况复杂安全环境恶劣需求快速变更、发布频繁渐进式发展大型网站架构演化发展历程大型网站的技术挑战主要来自庞大的用户,高并发的访问和海量的数据,任何简单......
  • 并行计算中的线程和进程:原理与实践
    目录1.引言2.技术原理及概念2.1基本概念解释2.2技术原理介绍3.实现步骤与流程3.1准备工作:环境配置与依赖安装3.2核心模块实现3.3集成与测试4.应用示例与代码实现讲解4.1应用场景介绍4.2应用实例分析4.3核心代码实现4.4代码讲解说明5.优化与改进5.1性能优化并行计算......
  • ASIC加速技术原理与实践:从芯片设计到优化
    目录《ASIC加速技术原理与实践:从芯片设计到优化》背景介绍:随着数字电路技术的不断发展,ASIC(专门芯片)作为数字电路中的核心部分,逐渐成为芯片设计中的重要组成部分。ASIC加速技术作为数字电路技术的一种重要分支,为ASIC的性能优化提供了新的解决方案。本文将介绍ASIC加速技术的原理......
  • ASEMI代理光宝光耦LTV-61L的工作原理与应用探析
    编辑-Z本文将对光耦LTV-61L进行深入的探讨,主要从其工作原理、应用领域、使用注意事项以及市场前景四个方面进行详细的阐述。光耦LTV-61L是一种常用的光电器件,其工作原理简单,应用领域广泛,但在使用过程中也需要注意一些问题。同时,随着科技的发展,光耦LTV-61L的市场前景也将更加广阔......
  • Django与celery集成:异步任务原理和过程
    0.原理和架构a.客户发送请求到django;b.django产生任务(要执行的函数);c.django把任务丢给celery的brokerd.celery的worker从broker拿到任务并且执行;e.worker执行后保存结果到后端数据库;  1.在django里面配置celery的目录结构PSD:\djangotest\myrecrument>treeD:.├─.idea......
  • MGR原理解析
    MGR原理解析目录MGR原理解析一、MySQLMGR演化1.1MySQL异步复制1.2MySQL半同步复制1.3MySQL组复制(MGR)1.4MySQL组复制的特性和限制特性优点:限制:二、MGR原理2.1MGR集群中事务整个生命周期2.2transactionmessage和certificationinfotransactionmessagecertificationinfo2......
  • 一致性hash算法原理及实践
    大家好,我是蓝胖子,想起之前学算法的时候,常常只知表面,不得精髓,这个算法到底有哪些应用场景,如何应用在工作中,后来随着工作的深入,一些不懂的问题才慢慢被抽丝剥茧分解出来。今天我们就来看看工作和面试中经常被点名的算法,一致性hash算法,并且我会介绍它在实际的应用场景并用代码实现......
  • DDoS攻击和CC攻击的原理是什么?有何区别?
    近年来,随着移动互联网的高速发展,网络攻击的次数逐年增加,针对网站攻击也变得越来越频繁,而在众多攻击方式之中,最常见的应该就是CC攻击和DDoS攻击,尤其是一些防护能力欠佳的网站,一旦遭受攻击很容易瘫痪,造成不必要的麻烦。那么DDoS攻击和CC攻击的原理有何区别?本文为大家介绍一下。......
  • PCL:点云滤波汇总:算法原理 + 代码实现(转载)
    原文链接:https://blog.csdn.net/weixin_46098577/article/details/114385690PCL官方链接:https://pointclouds.org/documentation/group__filters.html目录1PassThrough直通滤波器1.1官网描述1.2算法原理1.3代码实现2VoxelGrid体素滤波器2.1官网描述2.2算法原理2.3代码......