获得更多深度学习在NLP方面应用的经典论文、实践经验和最新消息,欢迎关注微信公众号“DeepLearning_NLP” 或者扫描头像二维码添加关注。
分布式tensorflow:
本文档将展示如何创建一个tensorflow服务的集群,如何在不同的集群之间分布式部署计算图。
你好,分布式tensorflow!
首先,简单实践一个简单的tensorflow集群的例子:
# Start a TensorFlow server as asingle-process "cluster".
$ python
>>> import tensorflowas tf
>>> c = tf.constant("Hello, distributed TensorFlow!")
>>> server = tf.train.Server.create_local_server()
>>> sess = tf.Session(server.target) # Create a session on the server.
>>> sess.run(c)
'Hello, distributed TensorFlow!'
tf.train.Server.create_local_server()方法创建了一个单进程的集群,包含一个正在执行的服务。
创建一个集群:
一个tensorflow集群cluster是一个任务tasks的集合,其中,每一个任务会参与一个tensorflow图的分布式执行。每一个任务与一个tensorflow服务server相关,每一个服务包含一个master:用于创建sessions,一个worker:用于执行图中的操作。一个集群也可以被分解为一个或者多个工作jobs,每个jobs包含一个或多个任务tasks。
为创建一个集群,需要为集群中的每个tasks创建一个tensorflow server。不同的tasks可以运行在不同的机器上,也可以运行在同一台机器上(比如,指出具体使用的gpu设备)。每一个task都做如下操作:
1、 创建一个tf.train.ClusterSpec
,
用以描述一个集群中的所有tasks(任务)。对于每一个task,这应该都一样。
2、 创建tf.train.Server
,传递
tf.train.ClusterSpec给构造函数,并通过job name和task index(工作名和任务id)来区分本地不同的任务。
创建一个tf.train.ClusterSpec去描述集群:
集群配置信息词典(cluster specificationdictionary)会把job name(工作名)映射到网络地址列表,把这个词典传递给tf.train.ClusterSpec
的构造函数,比如说:
在每一个task创建一个tf.train.Server实例:
一个tf.train.Server对象包括一个本地设备的集合,并在它的tf.train.ClusterSpec里面保存了与其他所有任务连接关系的集合;有助于tf.Session使用这些信息去执行一个分布式计算。每一个server是一个有具体名字的job,并在job内部有一个task id。一个server也可以与集群中的其他server通信。
比如说,在localhost:2222和localhost:2223执行一个包含两个server的cluster,在本地机器上运行下面两个不同的进程:
# In task 0:
cluster = tf.train.ClusterSpec({"local":["localhost:2222","localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=0)
# In task 1:
cluster = tf.train.ClusterSpec({"local":["localhost:2222","localhost:2223"]})
server = tf.train.Server(cluster, job_name="local", task_index=1)
注意:手动指定集群配置信息非常麻烦,特别是对大集群。我么正在开发工具用于通过编程部署任务,比如:使用集群管理工具Kubernetes。
在模型中指定具体的分布式设备:
使用tf.device函数,可以把ops操作指定到一个特定的进程中,比如说:
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
with tf.Session("grpc://worker7.example.com:2222")as sess:
for _ in range(10000):
sess.run(train_op)
在上面的例子中,变量在两个ps job的tasks中被创建,模型中计算密集的部分在worker job中创建。Tensorflow会插入合适的数据,这些数据会在不同的job间传递(比如,前向传递时,从ps到worker;运用梯度时,从worker到ps)
重复性训练:
数据并行化(data parallelism)是一种常见的训练模式,一个worker job里面多个任务(tasks),每个task对根据不同的mini_batch数据建模,更新共享的参数,这些参数由一个或者多个tasks共享,且存在于一个ps job里面。所有任务运行在不同的机器上。有很多种方式在tensorflow中展开这种结构,我们正在搭建库用以简化执行重复模型的工作。可选的途径包括:
1、 图内复制(In-graph replication)。在这种方式中,客户端建立一个单一的tf.Graph,它包含一个参数集合(比如,tf.Variable被指定到/job:ps),和多个模型中计算密集部分的副本,每一个都被指定到/job:worker里的不同task。
2、 图间复制(Between-graphreplication)。在这种方式中,对/job:worker里的每一个task有独立的client(客户端),特别是同一个进程。对每一个客户端建立相同的图,包括图的参数(在使用tf.train.replica_device_setter把这些参数确定性的映射到不同任务tasks中之前,先把参数指定到/job:ps),模型计算密集部分的一个副本,指定到一个处于/job:worker中本地task中。
3、 异步训练(Asynchronous training)。这种方式中,图的每一个副本拥有各自独立的训练loop,不需要执行一致性。对于以上两种复制方式都适用。
4、 同步训练(Synchronous training)。这种方式中,所有的副本读取相同的当前参数值,并行计算梯度,然后一起应用。适用于In-graphreplication(比如:CIFAR-10 multi-GPUtrainer的梯度平均)和Between-graphreplication(比如:使用tf.train.SyncReplicasOptimizer)
整合到一起:训练程序例子:
下面代码给出了一个分布式训练的框架,实现了between-graphreplication 和 asynchronoustraining。包括了参数服务器和worker tasks的代码:
import argparse
import sys
import tensorflow as tf
FLAGS = None
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# Create a cluster from the parameter server and worker hosts.
cluster = tf.train.ClusterSpec({"ps": ps_hosts,"worker": worker_hosts})
# Create and start a server for the local task.
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name =="ps":
server.join()
elif FLAGS.job_name =="worker":
# Assigns ops to the local worker by default.
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d"% FLAGS.task_index,
cluster=cluster)):
# Build model...
loss = ...
global_step = tf.contrib.framework.get_or_create_global_step()
train_op = tf.train.AdagradOptimizer(0.01).minimize(
loss, global_step=global_step)
# The StopAtStepHook handles stopping after running given steps.
hooks=[tf.train.StopAtStepHook(last_step=1000000)]
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="/tmp/train_logs",
hooks=hooks) as mon_sess:
whilenot mon_sess.should_stop():
# Run a training step asynchronously.
# See `tf.train.SyncReplicasOptimizer` for additional details on how to
# perform *synchronous* training.
# mon_sess.run handles AbortedError in case of preempted PS.
mon_sess.run(train_op)
if __name__ =="__main__":
parser = argparse.ArgumentParser()
parser.register("type","bool",lambda v: v.lower()=="true")
# Flags for defining the tf.train.ClusterSpec
parser.add_argument(
"--ps_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--worker_hosts",
type=str,
default="",
help="Comma-separated list of hostname:port pairs"
)
parser.add_argument(
"--job_name",
type=str,
default="",
help="One of 'ps', 'worker'"
)
# Flags for defining the tf.train.Server
parser.add_argument(
"--task_index",
type=int,
default=0,
help="Index of task within the job"
)
FLAGS, unparsed = parser.parse_known_args()
tf.app.run(main=main, argv=[sys.argv[0]]+ unparsed)
通过一个名为trainer.py的脚本来启动一个包含两个ps(parameter server)和两个wotker的trainer:
# On ps0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
--job_name=ps --task_index=0
# On ps1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
--job_name=ps --task_index=1
# On worker0.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
--job_name=worker --task_index=0
# On worker1.example.com:
$ python trainer.py \
--ps_hosts=ps0.example.com:2222,ps1.example.com:2222\
--worker_hosts=worker0.example.com:2222,worker1.example.com:2222\
--job_name=worker --task_index=1
参考文献:
https://www.tensorflow.org/deploy/distributed
标签:ps,task,worker,Distributed,job,train,原理,tf,tensorflow From: https://blog.51cto.com/u_13046751/6530895