TensorFlow 学习手册(四)
译者:飞龙
第八章:队列、线程和读取数据
在本章中,我们介绍了在 TensorFlow 中使用队列和线程的方法,主要目的是简化读取输入数据的过程。我们展示了如何编写和读取 TFRecords,这是高效的 TensorFlow 文件格式。然后我们演示了队列、线程和相关功能,并在一个完整的工作示例中连接所有要点,展示了一个包括预处理、批处理和训练的图像数据的多线程输入管道。
输入管道
当处理可以存储在内存中的小数据集时,比如 MNIST 图像,将所有数据加载到内存中,然后使用 feeding 将数据推送到 TensorFlow 图中是合理的。然而,对于更大的数据集,这可能变得难以管理。处理这种情况的一个自然范式是将数据保留在磁盘上,并根据需要加载其中的块(比如用于训练的小批量),这样唯一的限制就是硬盘的大小。
此外,在实践中,一个典型的数据管道通常包括诸如读取具有不同格式的输入文件、更改输入的形状或结构、归一化或进行其他形式的预处理、对输入进行洗牌等步骤,甚至在训练开始之前。
这个过程的很多部分可以轻松地解耦并分解为模块化组件。例如,预处理不涉及训练,因此可以一次性对输入进行预处理,然后将其馈送到训练中。由于我们的训练无论如何都是批量处理示例,原则上我们可以在运行时处理输入批次,从磁盘中读取它们,应用预处理,然后将它们馈送到计算图中进行训练。
然而,这种方法可能是浪费的。因为预处理与训练无关,等待每个批次进行预处理会导致严重的 I/O 延迟,迫使每个训练步骤(急切地)等待加载和处理数据的小批量。更具可扩展性的做法是预取数据,并使用独立的线程进行加载和处理以及训练。但是,当需要重复读取和洗牌许多保存在磁盘上的文件时,这种做法可能变得混乱,并且需要大量的簿记和技术性来无缝运行。
值得注意的是,即使不考虑预处理,使用在前几章中看到的标准馈送机制(使用feed_dict
)本身也是浪费的。feed_dict
会将数据从 Python 运行时单线程复制到 TensorFlow 运行时,导致进一步的延迟和减速。我们希望通过某种方式直接将数据读取到本机 TensorFlow 中,避免这种情况。
为了让我们的生活更轻松(和更快),TensorFlow 提供了一套工具来简化这个输入管道过程。主要的构建模块是标准的 TensorFlow 文件格式,用于编码和解码这种格式的实用工具,数据队列和多线程。
我们将逐一讨论这些关键组件,探索它们的工作原理,并构建一个端到端的多线程输入管道。我们首先介绍 TFRecords,这是 TensorFlow 推荐的文件格式,以后会派上用场。
TFRecords
数据集当然可以采用许多格式,有时甚至是混合的(比如图像和音频文件)。将输入文件转换为一个统一的格式,无论其原始格式如何,通常是方便和有用的。TensorFlow 的默认标准数据格式是 TFRecord。TFRecord 文件只是一个包含序列化输入数据的二进制文件。序列化基于协议缓冲区(protobufs),简单地说,它通过使用描述数据结构的模式将数据转换为存储,独立于所使用的平台或语言(就像 XML 一样)。
在我们的设置中,使用 TFRecords(以及 protobufs/二进制文件)相比仅使用原始数据文件有许多优势。这种统一格式允许整洁地组织输入数据,所有相关属性都保持在一起,避免了许多目录和子目录的需求。TFRecord 文件实现了非常快速的处理。所有数据都保存在一个内存块中,而不是分别存储每个输入文件,从而减少了从内存读取数据所需的时间。还值得注意的是,TensorFlow 自带了许多针对 TFRecords 进行优化的实现和工具,使其非常适合作为多线程输入管道的一部分使用。
使用 TFRecordWriter 进行写入
我们首先将输入文件写入 TFRecord 格式,以便我们可以处理它们(在其他情况下,我们可能已经将数据存储在这种格式中)。在这个例子中,我们将 MNIST 图像转换为这种格式,但是相同的思想也适用于其他类型的数据。
首先,我们将 MNIST 数据下载到 save_dir
,使用来自 tensorflow.contrib.learn
的实用函数:
from__future__importprint_functionimportosimporttensorflowastffromtensorflow.contrib.learn.python.learn.datasetsimportmnistsave_dir="*`path/to/mnist`*"# Download data to save_dirdata_sets=mnist.read_data_sets(save_dir,dtype=tf.uint8,reshape=False,validation_size=1000)
我们下载的数据包括训练、测试和验证图像,每个都在一个单独的 拆分 中。我们遍历每个拆分,将示例放入适当的格式,并使用 TFRecordWriter()
写入磁盘:
data_splits = ["train","test","validation"]
for d in range(len(data_splits)):
print("saving " + data_splits[d])
data_set = data_sets[d]
filename = os.path.join(save_dir, data_splits[d] + '.tfrecords')
writer = tf.python_io.TFRecordWriter(filename)
for index in range(data_set.images.shape[0]):
image = data_set.images[index].tostring()
example = tf.train.Example(features=tf.train.Features(feature={
'height': tf.train.Feature(int64_list=
tf.train.Int64List(value=
[data_set.images.shape[1]])),
'width': tf.train.Feature(int64_list=
tf.train.Int64List(value =
[data_set.images.shape[2]])),
'depth': tf.train.Feature(int64_list=
tf.train.Int64List(value =
[data_set.images.shape[3]])),
'label': tf.train.Feature(int64_list=
tf.train.Int64List(value =
[int(data_set.labels[index])])),
'image_raw': tf.train.Feature(bytes_list=
tf.train.BytesList(value =
[image]))}))
writer.write(example.SerializeToString())
writer.close()
让我们分解这段代码,以理解不同的组件。
我们首先实例化一个 TFRecordWriter
对象,给它一个对应数据拆分的路径:
filename = os.path.join(save_dir, data_splits[d] + '.tfrecords')
writer = tf.python_io.TFRecordWriter(filename)
然后我们遍历每个图像,将其从 NumPy 数组转换为字节字符串:
image = data_set.images[index].tostring()
接下来,我们将图像转换为它们的 protobuf 格式。tf.train.Example
是用于存储我们的数据的结构。Example
对象包含一个 Features
对象,它又包含一个从属性名称到 Feature
的映射。Feature
可以包含一个 Int64List
、一个 BytesList
或一个 FloatList
(这里没有使用)。例如,在这里我们对图像的标签进行编码:
tf.train.Feature(int64_list=tf.train.Int64List(value =
[int(data_set.labels[index])]))
这里是实际原始图像的编码:
tf.train.Feature(bytes_list=tf.train.BytesList(value =[image]))
让我们看看我们保存的数据是什么样子。我们使用 tf.python_io.tf_record_iterator
来实现这一点,这是一个从 TFRecords 文件中读取记录的迭代器:
filename = os.path.join(save_dir, 'train.tfrecords')
record_iterator = tf.python_io.tf_record_iterator(filename)
seralized_img_example= next(record_iterator)
serialized_img
是一个字节字符串。为了恢复保存图像到 TFRecord 时使用的结构,我们解析这个字节字符串,使我们能够访问我们之前存储的所有属性:
example = tf.train.Example()
example.ParseFromString(seralized_img_example)
image = example.features.feature['image_raw'].bytes_list.value
label = example.features.feature['label'].int64_list.value[0]
width = example.features.feature['width'].int64_list.value[0]
height = example.features.feature['height'].int64_list.value[0]
我们的图像也保存为字节字符串,因此我们将其转换回 NumPy 数组,并将其重新整形为形状为 (28,28,1) 的张量:
img_flat = np.fromstring(image[0], dtype=np.uint8)
img_reshaped = img_flat.reshape((height, width, -1))
这个基本示例应该让您了解 TFRecords 以及如何写入和读取它们。在实践中,我们通常希望将 TFRecords 读入一个预取数据队列作为多线程过程的一部分。在下一节中,我们首先介绍 TensorFlow 队列,然后展示如何将它们与 TFRecords 一起使用。
队列
TensorFlow 队列类似于普通队列,允许我们入队新项目,出队现有项目等。与普通队列的重要区别在于,就像 TensorFlow 中的任何其他内容一样,队列是计算图的一部分。它的操作像往常一样是符号化的,图中的其他节点可以改变其状态(就像变量一样)。这一点一开始可能会有点困惑,所以让我们通过一些示例来了解基本队列功能。
入队和出队
在这里,我们创建一个字符串的 先进先出(FIFO)队列,最多可以存储 10 个元素。由于队列是计算图的一部分,它们在会话中运行。在这个例子中,我们使用了一个 tf.InteractiveSession()
:
import tensorflow as tf
sess= tf.InteractiveSession()
queue1 = tf.FIFOQueue(capacity=10,dtypes=[tf.string])
在幕后,TensorFlow 为存储这 10 个项目创建了一个内存缓冲区。
就像 TensorFlow 中的任何其他操作一样,要向队列添加项目,我们创建一个操作:
enque_op = queue1.enqueue(["F"])
由于您现在已经熟悉了 TensorFlow 中计算图的概念,因此定义enque_op
并不会向队列中添加任何内容——我们需要运行该操作。因此,如果我们在运行操作之前查看queue1
的大小,我们会得到这个结果:
sess.run(queue1.size())
Out:
0
运行操作后,我们的队列现在有一个项目在其中:
enque_op.run()
sess.run(queue1.size())
Out:
1
让我们向queue1
添加更多项目,并再次查看其大小:
enque_op = queue1.enqueue(["I"])
enque_op.run()
enque_op = queue1.enqueue(["F"])
enque_op.run()
enque_op = queue1.enqueue(["O"])
enque_op.run()
sess.run(queue1.size())
Out:
4
接下来,我们出队项目。出队也是一个操作,其输出评估为对应于出队项目的张量:
x = queue1.dequeue()
x.eval()
Out: b'F'
x.eval()
Out: b'I'
x.eval()
Out: b'F'
x.eval()
Out: b'O'
请注意,如果我们再次对空队列运行x.eval()
,我们的主线程将永远挂起。正如我们将在本章后面看到的,实际上我们使用的代码知道何时停止出队并避免挂起。
另一种出队的方法是一次检索多个项目,使用dequeue_many()
操作。此操作要求我们提前指定项目的形状:
queue1 = tf.FIFOQueue(capacity=10,dtypes=[tf.string],shapes=[()])
在这里,我们像以前一样填充队列,然后一次出队四个项目:
inputs = queue1.dequeue_many(4)
inputs.eval()
Out:
array([b'F', b'I', b'F', b'O'], dtype=object)
多线程
TensorFlow 会话是多线程的——多个线程可以使用同一个会话并行运行操作。单个操作具有并行实现,默认情况下使用多个 CPU 核心或 GPU 线程。然而,如果单个对sess.run()
的调用没有充分利用可用资源,可以通过进行多个并行调用来提高吞吐量。例如,在典型情况下,我们可能有多个线程对图像进行预处理并将其推送到队列中,而另一个线程则从队列中拉取预处理后的图像进行训练(在下一章中,我们将讨论分布式训练,这在概念上是相关的,但有重要的区别)。
让我们通过一些简单的示例来介绍在 TensorFlow 中引入线程以及与队列的自然互动,然后在 MNIST 图像的完整示例中将所有内容连接起来。
我们首先创建一个容量为 100 个项目的 FIFO 队列,其中每个项目是使用tf.random_normal()
生成的随机浮点数:
from __future__ import print_function
import threading
import time
gen_random_normal = tf.random_normal(shape=())
queue = tf.FIFOQueue(capacity=100,dtypes=[tf.float32],shapes=())
enque = queue.enqueue(gen_random_normal)
def add():
for i in range(10):
sess.run(enque)
再次注意,enque
操作实际上并没有将随机数添加到队列中(它们尚未生成)在图执行之前。项目将使用我们创建的add()
函数进行入队,该函数通过多次调用sess.run()
向队列中添加 10 个项目。
接下来,我们创建 10 个线程,每个线程并行运行add()
,因此每个线程异步地向队列中添加 10 个项目。我们可以(暂时)将这些随机数视为添加到队列中的训练数据:
threads = [threading.Thread(target=add, args=()) for i in range(10)]
threads
Out:
[<Thread(Thread-77, initial)>,
<Thread(Thread-78, initial)>,
<Thread(Thread-79, initial)>,
<Thread(Thread-80, initial)>,
<Thread(Thread-81, initial)>,
<Thread(Thread-82, initial)>,
<Thread(Thread-83, initial)>,
<Thread(Thread-84, initial)>,
<Thread(Thread-85, initial)>,
<Thread(Thread-86, initial)>]
我们已经创建了一个线程列表,现在我们执行它们,以短间隔打印队列的大小,从 0 增长到 100:
for t in threads:
t.start()
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
Out:
10
84
100
最后,我们使用dequeue_many()
一次出队 10 个项目,并检查结果:
x = queue.dequeue_many(10)
print(x.eval())
sess.run(queue.size())
Out:
[ 0.05863889 0.61680967 1.05087686 -0.29185265 -0.44238046 0.53796548
-0.24784896 0.40672767 -0.88107938 0.24592835]
90
协调器和 QueueRunner
在现实场景中(正如我们将在本章后面看到的),有效地运行多个线程可能会更加复杂。线程应该能够正确停止(例如,避免“僵尸”线程,或者在一个线程失败时一起关闭所有线程),在停止后需要关闭队列,并且还有其他需要解决的技术但重要的问题。
TensorFlow 配备了一些工具来帮助我们进行这个过程。其中最重要的是tf.train.Coordinator
,用于协调一组线程的终止,以及tf.train.QueueRunner
,它简化了让多个线程与无缝协作地将数据入队的过程。
tf.train.Coordinator
我们首先演示如何在一个简单的玩具示例中使用tf.train.Coordinator
。在下一节中,我们将看到如何将其作为真实输入管道的一部分使用。
我们使用与上一节类似的代码,修改add()
函数并添加一个协调器:
gen_random_normal = tf.random_normal(shape=())
queue = tf.FIFOQueue(capacity=100,dtypes=[tf.float32],shapes=())
enque = queue.enqueue(gen_random_normal)
def add(coord,i):
while not coord.should_stop():
sess.run(enque)
if i == 11:
coord.request_stop()
coord = tf.train.Coordinator()
threads = [threading.Thread(target=add, args=(coord,i)) for i in range(10)]
coord.join(threads)
for t in threads:
t.start()
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
10
100
100
任何线程都可以调用coord.request_stop()
来让所有其他线程停止。线程通常运行循环来检查是否停止,使用coord.should_stop()
。在这里,我们将线程索引i
传递给add()
,并使用一个永远不满足的条件(i==11
)来请求停止。因此,我们的线程完成了它们的工作,将全部 100 个项目添加到队列中。但是,如果我们将add()
修改如下:
def add(coord,i):
while not coord.should_stop():
sess.run(enque)
if i == 1:
coord.request_stop()
然后线程i=1
将使用协调器请求所有线程停止,提前停止所有入队操作:
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
time.sleep(0.01)
print(sess.run(queue.size()))
Out:
10
17
17
tf.train.QueueRunner 和 tf.RandomShuffleQueue
虽然我们可以创建多个重复运行入队操作的线程,但最好使用内置的tf.train.QueueRunner
,它正是这样做的,同时在异常发生时关闭队列。
在这里,我们创建一个队列运行器,将并行运行四个线程以入队项目:
gen_random_normal = tf.random_normal(shape=())
queue = tf.RandomShuffleQueue(capacity=100,dtypes=[tf.float32],
min_after_dequeue=1)
enqueue_op = queue.enqueue(gen_random_normal)
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)
coord = tf.train.Coordinator()
enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
coord.request_stop()
coord.join(enqueue_threads)
请注意,qr.create_threads()
将我们的会话作为参数,以及我们的协调器。
在这个例子中,我们使用了tf.RandomShuffleQueue
而不是 FIFO 队列。RandomShuffleQueue
只是一个带有以随机顺序弹出项目的出队操作的队列。这在训练使用随机梯度下降优化的深度神经网络时非常有用,这需要对数据进行洗牌。min_after_dequeue
参数指定在调用出队操作后队列中将保留的最小项目数,更大的数字意味着更好的混合(随机抽样),但需要更多的内存。
一个完整的多线程输入管道
现在,我们将所有部分组合在一起,使用 MNIST 图像的工作示例,从将数据写入 TensorFlow 的高效文件格式,通过数据加载和预处理,到训练模型。我们通过在之前演示的排队和多线程功能的基础上构建,并在此过程中介绍一些更有用的组件来读取和处理 TensorFlow 中的数据。
首先,我们将 MNIST 数据写入 TFRecords,使用与本章开头使用的相同代码:
from__future__importprint_functionimportosimporttensorflowastffromtensorflow.contrib.learn.python.learn.datasetsimportmnistimportnumpyasnpsave_dir="*`path/to/mnist`*"# Download data to save_dirdata_sets=mnist.read_data_sets(save_dir,dtype=tf.uint8,reshape=False,validation_size=1000)data_splits=["train","test","validation"]fordinrange(len(data_splits)):print("saving "+data_splits[d])data_set=data_sets[d]filename=os.path.join(save_dir,data_splits[d]+'.tfrecords')writer=tf.python_io.TFRecordWriter(filename)forindexinrange(data_set.images.shape[0]):image=data_set.images[index].tostring()example=tf.train.Example(features=tf.train.Features(feature={'height':tf.train.Feature(int64_list=tf.train.Int64List(value=[data_set.images.shape[1]])),'width':tf.train.Feature(int64_list=tf.train.Int64List(value=[data_set.images.shape[2]])),'depth':tf.train.Feature(int64_list=tf.train.Int64List(value=[data_set.images.shape[3]])),'label':tf.train.Feature(int64_list=tf.train.Int64List(value=[int(data_set.labels[index])])),'image_raw':tf.train.Feature(bytes_list=tf.train.BytesList(value=[image]))}))writer.write(example.SerializeToString())writer.close()
tf.train.string_input_producer()和 tf.TFRecordReader()
tf.train.string_input_producer()
只是在幕后创建一个QueueRunner
,将文件名字符串输出到我们的输入管道的队列中。这个文件名队列将在多个线程之间共享:
filename = os.path.join(save_dir ,"train.tfrecords")
filename_queue = tf.train.string_input_producer(
[filename], num_epochs=10)
num_epochs
参数告诉string_input_producer()
将每个文件名字符串生成num_epochs
次。
接下来,我们使用TFRecordReader()
从这个队列中读取文件,该函数接受一个文件名队列并从filename_queue
中逐个出队文件名。在内部,TFRecordReader()
使用图的状态来跟踪正在读取的 TFRecord 的位置,因为它从磁盘加载输入数据的“块之后的块”:
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
features = tf.parse_single_example(
serialized_example,
features={
'image_raw': tf.FixedLenFeature([], tf.string),
'label': tf.FixedLenFeature([], tf.int64),
})
tf.train.shuffle_batch()
我们解码原始字节字符串数据,进行(非常)基本的预处理将像素值转换为浮点数,然后使用tf.train.shuffle_batch()
将图像实例洗牌并收集到batch_size
批次中,该函数内部使用RandomShuffleQueue
并累积示例,直到包含batch_size
+ min_after_dequeue
个元素:
image = tf.decode_raw(features['image_raw'], tf.uint8)
image.set_shape([784])
image = tf.cast(image, tf.float32) * (1. / 255) - 0.5
label = tf.cast(features['label'], tf.int32)
# Randomly collect instances into batches
images_batch, labels_batch = tf.train.shuffle_batch(
[image, label], batch_size=128,
capacity=2000,
min_after_dequeue=1000)
capacity
和min_after_dequeue
参数的使用方式与之前讨论的相同。由shuffle_batch()
返回的小批次是在内部创建的RandomShuffleQueue
上调用dequeue_many()
的结果。
tf.train.start_queue_runners()和总结
我们将简单的 softmax 分类模型定义如下:
W = tf.get_variable("W", [28*28, 10])
y_pred = tf.matmul(images_batch, W)
loss = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=y_pred,
labels=labels_batch)
loss_mean = tf.reduce_mean(loss)
train_op = tf.train.AdamOptimizer().minimize(loss)
sess = tf.Session()
init = tf.global_variables_initializer()
sess.run(init)
init = tf.local_variables_initializer()
sess.run(init)
最后,我们通过调用tf.train.start_queue_runners()
创建线程将数据入队到队列中。与其他调用不同,这个调用不是符号化的,实际上创建了线程(因此需要在初始化之后完成):
from __future__ import print_function
# Coordinator
coord = tf.train.Coordinator()
threads = tf.train.start_queue_runners(sess=sess,coord=coord)
让我们看一下创建的线程列表:
threads
Out:
[<Thread(Thread-483, stopped daemon 13696)>,
<Thread(Thread-484, started daemon 16376)>,
<Thread(Thread-485, started daemon 4320)>,
<Thread(Thread-486, started daemon 13052)>,
<Thread(Thread-487, started daemon 7216)>,
<Thread(Thread-488, started daemon 4332)>,
<Thread(Thread-489, started daemon 16820)>]
一切就绪后,我们现在准备运行多线程过程,从读取和预处理批次到将其放入队列再到训练模型。重要的是要注意,我们不再使用熟悉的feed_dict
参数——这样可以避免数据复制并提供加速,正如本章前面讨论的那样:
try:
step = 0
while not coord.should_stop():
step += 1
sess.run([train_op])
if step%500==0:
loss_mean_val = sess.run([loss_mean])
print(step)
print(loss_mean_val)
except tf.errors.OutOfRangeError:
print('Done training for %d epochs, %d steps.' % (NUM_EPOCHS, step))
finally:
# When done, ask the threads to stop
coord.request_stop()
# Wait for threads to finish
coord.join(threads)
sess.close()
直到抛出tf.errors.OutOfRangeError
错误,表示队列为空,我们已经完成训练:
Out:
Done training for 10 epochs, 2299500 steps.
未来的输入管道
在 2017 年中期,TensorFlow 开发团队宣布了 Dataset API,这是一个新的初步输入管道抽象,提供了一些简化和加速。本章介绍的概念,如 TFRecords 和队列,是 TensorFlow 及其输入管道过程的基础,仍然处于核心地位。TensorFlow 仍然在不断发展中,自然会不时发生令人兴奋和重要的变化。请参阅问题跟踪器进行持续讨论。
总结
在本章中,我们看到了如何在 TensorFlow 中使用队列和线程,以及如何创建一个多线程输入管道。这个过程可以帮助增加吞吐量和资源利用率。在下一章中,我们将进一步展示如何在分布式环境中使用 TensorFlow,在多个设备和机器之间进行工作。
第九章:分布式 TensorFlow
在本章中,我们讨论了使用 TensorFlow 进行分布式计算。我们首先简要调查了在机器学习中分布模型训练的不同方法,特别是深度学习。然后介绍了为支持分布式计算而设计的 TensorFlow 元素,最后通过一个端到端的示例将所有内容整合在一起。
分布式计算
分布式 计算,在最一般的术语中,意味着利用多个组件来执行所需的计算或实现目标。在我们的情况下,这意味着使用多台机器来加快深度学习模型的训练。
这背后的基本思想是通过使用更多的计算能力,我们应该能够更快地训练相同的模型。尽管通常情况下确实如此,但实际上更快多少取决于许多因素(即,如果您期望使用 10 倍资源并获得 10 倍加速,您很可能会感到失望!)。
在机器学习环境中有许多分布计算的方式。您可能希望利用多个设备,无论是在同一台机器上还是跨集群。在训练单个模型时,您可能希望在集群上计算梯度以加快训练速度,无论是同步还是异步。集群也可以用于同时训练多个模型,或者为单个模型搜索最佳参数。
在接下来的小节中,我们将详细介绍并行化的许多方面。
并行化发生在哪里?
在并行化类型的分类中,第一个分割是位置。我们是在单台机器上使用多个计算设备还是跨集群?
在一台机器上拥有强大的硬件与多个设备变得越来越普遍。云服务提供商(如亚马逊网络服务)现在提供这种类型的平台设置并准备就绪。
无论是在云端还是本地,集群配置在设计和演进方面提供了更多的灵活性,设置可以扩展到目前在同一板上使用多个设备所不可行的程度(基本上,您可以使用任意大小的集群)。
另一方面,虽然同一板上的几个设备可以使用共享内存,但集群方法引入了节点之间通信的时间成本。当需要共享的信息量很大且通信相对缓慢时,这可能成为一个限制因素。
并行化的目标是什么?
第二个分割是实际目标。我们是想使用更多硬件使相同的过程更快,还是为了并行化多个模型的训练?
在开发阶段经常需要训练多个模型,需要在模型或超参数之间做出选择。在这种情况下,通常会运行几个选项并选择表现最佳的一个。这样做是很自然的。
另一方面,当训练单个(通常是大型)模型时,可以使用集群来加快训练速度。在最常见的方法中,称为数据并行,每个计算设备上都存在相同的模型结构,每个副本上运行的数据是并行化的。
例如,当使用梯度下降训练深度学习模型时,该过程由以下步骤组成:
-
计算一批训练样本的梯度。
-
对梯度求和。
-
相应地对模型参数应用更新。
很明显,这个模式的第 1 步适合并行化。简单地使用多个设备计算梯度(针对不同的训练样本),然后在第 2 步中聚合结果并求和,就像常规情况下一样。
同步与异步数据并行
在刚才描述的过程中,来自不同训练示例的梯度被聚合在一起,以对模型参数进行单次更新。这就是所谓的同步训练,因为求和步骤定义了一个流必须等待所有节点完成梯度计算的点。
有一种情况可能更好地避免这种情况,即当异构计算资源一起使用时,因为同步选项意味着等待节点中最慢的节点。
异步选项是在每个节点完成为其分配的训练示例的梯度计算后独立应用更新步骤。
TensorFlow 元素
在本节中,我们将介绍在并行计算中使用的 TensorFlow 元素和概念。这不是完整的概述,主要作为本章结束的并行示例的介绍。
tf.app.flags
我们从一个与并行计算完全无关但对本章末尾的示例至关重要的机制开始。实际上,在 TensorFlow 示例中广泛使用flags
机制,值得讨论。
实质上,tf.app.flags
是 Python argparse
模块的包装器,通常用于处理命令行参数,具有一些额外和特定的功能。
例如,考虑一个具有典型命令行参数的 Python 命令行程序:
'python distribute.py --job_name="ps" --task_index=0'
程序distribute.py传递以下内容:
job_name="ps"
task_index=0
然后在 Python 脚本中提取这些信息,使用:
tf.app.flags.DEFINE_string("job_name", "", "name of job")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")
参数(字符串和整数)由命令行中的名称、默认值和参数描述定义。
flags
机制允许以下类型的参数:
-
tf.app.flags.DEFINE_string
定义一个字符串值。 -
tf.app.flags.DEFINE_boolean
定义一个布尔值。 -
tf.app.flags.DEFINE_float
定义一个浮点值。 -
tf.app.flags.DEFINE_integer
定义一个整数值。
最后,tf.app.flags.FLAGS
是一个结构,包含从命令行输入解析的所有参数的值。参数可以通过FLAGS.arg
访问,或者在必要时通过字典FLAGS.__flags
访问(然而,强烈建议使用第一种选项——它设计的方式)。
集群和服务器
一个 TensorFlow 集群只是参与计算图并行处理的节点(也称为任务)的集合。每个任务由其可以访问的网络地址定义。例如:
parameter_servers = ["localhost:2222"]
workers = ["localhost:2223",
"localhost:2224",
"localhost:2225"]
cluster = tf.train.ClusterSpec({"parameter_server": parameter_servers,
"worker": workers})
在这里,我们定义了四个本地任务(请注意,localhost:*XXXX*
指向当前机器上端口XXXX,在多台计算机设置中,localhost
将被 IP 地址替换)。任务分为一个参数服务器和三个工作节点。参数服务器/工作节点分配被称为作业。我们稍后在本章中进一步描述这些在训练期间的作用。
每个任务必须运行一个 TensorFlow 服务器,以便既使用本地资源进行实际计算,又与集群中的其他任务通信,以促进并行化。
基于集群定义,第一个工作节点上的服务器(即localhost:2223
)将通过以下方式启动:
server = tf.train.Server(cluster,
job_name="worker",
task_index=0)
由Server()
接收的参数让它知道自己的身份,以及集群中其他成员的身份和地址。
一旦我们有了集群和服务器,我们就构建计算图,这将使我们能够继续进行并行计算。
在设备之间复制计算图
如前所述,有多种方法可以进行并行训练。在“设备放置”中,我们简要讨论如何直接将操作放置在集群中特定任务上。在本节的其余部分,我们将介绍对于图间复制所必需的内容。
图间 复制 指的是常见的并行化模式,其中在每个 worker 任务上构建一个单独但相同的计算图。在训练期间,每个 worker 计算梯度,并由参数服务器组合,参数服务器还跟踪参数的当前版本,以及可能是训练的其他全局元素(如全局步骤计数器等)。
我们使用tf.train.replica_device_setter()
来在每个任务上复制模型(计算图)。worker_device
参数应该指向集群中当前任务。例如,在第一个 worker 上我们运行这个:
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % 0,
cluster=cluster)):
# Build model...
例外是参数服务器,我们不在其上构建计算图。为了使进程不终止,我们使用:
server.join()
这将在并行计算的过程中保持参数服务器的运行。
管理的会话
在这一部分,我们将介绍我们将在模型的并行训练中使用的机制。首先,我们定义一个Supervisor
:
sv = tf.train.Supervisor(is_chief=...,
logdir=...,
global_step=...,
init_op=...)
正如其名称所示,Supervisor
用于监督训练,在并行设置中提供一些必要的实用程序。
传递了四个参数:
is_chief
(布尔值)
必须有一个单一的chief,负责初始化等任务。
logdir
(字符串)
存储日志的位置。
global_step
一个 TensorFlow 变量,将在训练期间保存当前的全局步骤。
init_op
一个用于初始化模型的 TensorFlow 操作,比如tf.global_variables_initializer()
。
然后启动实际会话:
with sv.managed_session(server.target) as sess:
# Train ...
在这一点上,chief 将初始化变量,而所有其他任务等待这个过程完成。
设备放置
在本节中我们讨论的最终 TensorFlow 机制是设备放置。虽然这个主题的全部内容超出了本章的范围,但概述中没有提到这种能力是不完整的,这在工程高级系统时非常有用。
在具有多个计算设备(CPU、GPU 或这些组合)的环境中操作时,控制计算图中每个操作将发生的位置可能是有用的。这可能是为了更好地利用并行性,利用不同设备的不同能力,并克服某些设备的内存限制等限制。
即使您没有明确选择设备放置,TensorFlow 也会在需要时输出所使用的放置。这是在构建会话时启用的:
tf.Session(config=tf.ConfigProto(log_device_placement=True))
为了明确选择一个设备,我们使用:
with tf.device('/gpu:0'):
op = ...
'/gpu:0'
指向系统上的第一个 GPU;同样,我们可以使用'/cpu:0'
将操作放置在 CPU 上,或者在具有多个 GPU 设备的系统上使用'/gpu:X'
,其中X
是我们想要使用的 GPU 的索引。
最后,跨集群的放置是通过指向特定任务来完成的。例如:
with tf.device("/job:worker/task:2"):
op = ...
这将分配给集群规范中定义的第二个worker
任务。
跨 CPU 的放置
默认情况下,TensorFlow 使用系统上所有可用的 CPU,并在内部处理线程。因此,设备放置'/cpu:0'
是完整的 CPU 功率,'/cpu:1'
默认情况下不存在,即使在多 CPU 环境中也是如此。
为了手动分配到特定的 CPU(除非您有非常充分的理由这样做,否则让 TensorFlow 处理),必须使用指令定义一个会话来分离 CPU:
config = tf.ConfigProto(device_count={"CPU": 8},
inter_op_parallelism_threads=8,
intra_op_parallelism_threads=1)
sess = tf.Session(config=config)
在这里,我们定义了两个参数:
-
inter_op_parallelism_threads=8
,意味着我们允许八个线程用于不同的操作 -
intra_op_parallelism_threads=1
,表示每个操作都有一个线程
这些设置对于一个 8-CPU 系统是有意义的。
分布式示例
在本节中,我们将所有内容整合在一起,以端到端的方式展示了我们在第四章中看到的 MNIST CNN 模型的分布式训练示例。我们将使用一个参数服务器和三个工作任务。为了使其易于重现,我们将假设所有任务都在单台机器上本地运行(通过将localhost
替换为 IP 地址,如前所述,可以轻松适应多机设置)。像往常一样,我们首先呈现完整的代码,然后将其分解为元素并加以解释:
import tensorflow as tf
from tensorflow.contrib import slim
from tensorflow.examples.tutorials.mnist import input_data
BATCH_SIZE = 50
TRAINING_STEPS = 5000
PRINT_EVERY = 100
LOG_DIR = "/tmp/log"
parameter_servers = ["localhost:2222"]
workers = ["localhost:2223",
"localhost:2224",
"localhost:2225"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})
tf.app.flags.DEFINE_string("job_name", "", "'ps' / 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")
FLAGS = tf.app.flags.FLAGS
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
def net(x):
x_image = tf.reshape(x, [-1, 28, 28, 1])
net = slim.layers.conv2d(x_image, 32, [5, 5], scope='conv1')
net = slim.layers.max_pool2d(net, [2, 2], scope='pool1')
net = slim.layers.conv2d(net, 64, [5, 5], scope='conv2')
net = slim.layers.max_pool2d(net, [2, 2], scope='pool2')
net = slim.layers.flatten(net, scope='flatten')
net = slim.layers.fully_connected(net, 500, scope='fully_connected')
net = slim.layers.fully_connected(net, 10, activation_fn=None,
scope='pred')
return net
if FLAGS.job_name == "ps":
server.join()
elif FLAGS.job_name == "worker":
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
global_step = tf.get_variable('global_step', [],
initializer=tf.constant_initializer(0),
trainable=False)
x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
y = net(x)
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(y, y_))
train_step = tf.train.AdamOptimizer(1e-4)\
.minimize(cross_entropy, global_step=global_step)
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
init_op = tf.global_variables_initializer()
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
logdir=LOG_DIR,
global_step=global_step,
init_op=init_op)
with sv.managed_session(server.target) as sess:
step = 0
while not sv.should_stop() and step <= TRAINING_STEPS:
batch_x, batch_y = mnist.train.next_batch(BATCH_SIZE)
_, acc, step = sess.run([train_step, accuracy, global_step],
feed_dict={x: batch_x, y_: batch_y})
if step % PRINT_EVERY == 0:
print "Worker : {}, Step: {}, Accuracy (batch): {}".\
format(FLAGS.task_index, step, acc)
test_acc = sess.run(accuracy, feed_dict={x: mnist.test.images,
y_: mnist.test.labels})
print "Test-Accuracy: {}".format(test_acc)
sv.stop()
为了运行这个分布式示例,我们从四个不同的终端执行四个命令来分派每个任务(我们将很快解释这是如何发生的):
python distribute.py --job_name="ps" --task_index=0
python distribute.py --job_name="worker" --task_index=0
python distribute.py --job_name="worker" --task_index=1
python distribute.py --job_name="worker" --task_index=2
或者,以下将自动分派四个任务(取决于您使用的系统,输出可能全部发送到单个终端或四个单独的终端):
import subprocess
subprocess.Popen('python distribute.py --job_name="ps" --task_index=0',
shell=True)
subprocess.Popen('python distribute.py --job_name="worker" --task_index=0',
shell=True)
subprocess.Popen('python distribute.py --job_name="worker" --task_index=1',
shell=True)
subprocess.Popen('python distribute.py --job_name="worker" --task_index=2',
shell=True)
接下来,我们将检查前面示例中的代码,并突出显示这与我们迄今在书中看到的示例有何不同。
第一个块处理导入和常量:
import tensorflow as tf
from tensorflow.contrib import slim
from tensorflow.examples.tutorials.mnist import input_data
BATCH_SIZE = 50
TRAINING_STEPS = 5000
PRINT_EVERY = 100
LOG_DIR = "/tmp/log"
在这里我们定义:
BATCH_SIZE
在每个小批次训练中要使用的示例数。
TRAINING_STEPS
我们将在训练中使用的小批次总数。
PRINT_EVERY
打印诊断信息的频率。由于在我们使用的分布式训练中,所有任务都有一个当前步骤的计数器,因此在某个步骤上的print
只会从一个任务中发生。
LOG_DIR
训练监督员将把日志和临时信息保存到此位置。在程序运行之间应该清空,因为旧信息可能导致下一个会话崩溃。
接下来,我们定义集群,如本章前面讨论的:
parameter_servers = ["localhost:2222"]
workers = ["localhost:2223",
"localhost:2224",
"localhost:2225"]
cluster = tf.train.ClusterSpec({"ps": parameter_servers, "worker": workers})
我们在本地运行所有任务。为了使用多台计算机,将localhost
替换为正确的 IP 地址。端口 2222-2225 也是任意的,当然(但在使用单台机器时必须是不同的):在分布式设置中,您可能会在所有机器上使用相同的端口。
在接下来的内容中,我们使用tf.app.flags
机制来定义两个参数,我们将通过命令行在每个任务调用程序时提供:
tf.app.flags.DEFINE_string("job_name", "", "'ps' / 'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task")
FLAGS = tf.app.flags.FLAGS
参数如下:
job_name
这将是'ps'
表示单参数服务器,或者对于每个工作任务将是'worker'
。
task_index
每种类型工作中任务的索引。因此,参数服务器将使用task_index = 0
,而对于工作任务,我们将有0
,1
和2
。
现在我们准备使用我们在本章中定义的集群中当前任务的身份来定义此当前任务的服务器。请注意,这将在我们运行的四个任务中的每一个上发生。这四个任务中的每一个都知道自己的身份(job_name
,task_index
),以及集群中其他每个人的身份(由第一个参数提供):
server = tf.train.Server(cluster,
job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
在开始实际训练之前,我们定义我们的网络并加载要使用的数据。这类似于我们在以前的示例中所做的,所以我们不会在这里再次详细说明。为了简洁起见,我们使用 TF-Slim:
mnist = input_data.read_data_sets('MNIST_data', one_hot=True)
def net(x):
x_image = tf.reshape(x, [-1, 28, 28, 1])
net = slim.layers.conv2d(x_image, 32, [5, 5], scope='conv1')
net = slim.layers.max_pool2d(net, [2, 2], scope='pool1')
net = slim.layers.conv2d(net, 64, [5, 5], scope='conv2')
net = slim.layers.max_pool2d(net, [2, 2], scope='pool2')
net = slim.layers.flatten(net, scope='flatten')
net = slim.layers.fully_connected(net, 500, scope='fully_connected')
net = slim.layers.fully_connected(net, 10, activation_fn=None, scope='pred')
return net
在训练期间要执行的实际处理取决于任务的类型。对于参数服务器,我们希望机制主要是为参数提供服务。这包括等待请求并处理它们。要实现这一点,只需要这样做:
if FLAGS.job_name == "ps":
server.join()
服务器的.join()
方法即使在所有其他任务终止时也不会终止,因此一旦不再需要,必须在外部终止此进程。
在每个工作任务中,我们定义相同的计算图:
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
global_step = tf.get_variable('global_step', [],
initializer=tf.constant_initializer(0),
trainable=False)
x = tf.placeholder(tf.float32, shape=[None, 784], name="x-input")
y_ = tf.placeholder(tf.float32, shape=[None, 10], name="y-input")
y = net(x)
cross_entropy = tf.reduce_mean(
tf.nn.softmax_cross_entropy_with_logits(y, y_))
train_step = tf.train.AdamOptimizer(1e-4)\
.minimize(cross_entropy, global_step=global_step)
correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_, 1))
accuracy = tf.reduce_mean(tf.cast(correct_prediction, tf.float32))
init_op = tf.global_variables_initializer()
我们使用tf.train.replica_device_setter()
来指定这一点,这意味着 TensorFlow 变量将通过参数服务器进行同步(这是允许我们进行分布式计算的机制)。
global_step
变量将保存跨任务训练期间的总步数(每个步骤索引只会出现在一个任务上)。这样可以创建一个时间线,以便我们始终知道我们在整个计划中的位置,从每个任务分开。
其余的代码是我们在整本书中已经看过的许多示例中看到的标准设置。
接下来,我们设置一个Supervisor
和一个managed_session
:
sv = tf.train.Supervisor(is_chief=(FLAGS.task_index == 0),
logdir=LOG_DIR,
global_step=global_step,
init_op=init_op)
with sv.managed_session(server.target) as sess:
这类似于我们在整个过程中使用的常规会话,只是它能够处理分布式的一些方面。变量的初始化将仅在一个任务中完成(通过is_chief
参数指定的首席任务;在我们的情况下,这将是第一个工作任务)。所有其他任务将等待这个任务完成,然后继续。
会话开启后,我们开始训练:
while not sv.should_stop() and step <= TRAINING_STEPS:
batch_x, batch_y = mnist.train.next_batch(BATCH_SIZE)
_, acc, step = sess.run([train_step, accuracy, global_step],
feed_dict={x: batch_x, y_: batch_y})
if step % PRINT_EVERY == 0:
print "Worker : {}, Step: {}, Accuracy (batch): {}".\
format(FLAGS.task_index, step, acc)
每隔PRINT_EVERY
步,我们打印当前小批量的当前准确率。这将很快达到 100%。例如,前两行可能是:
Worker : 1, Step: 0.0, Accuracy (batch): 0.140000000596
Worker : 0, Step: 100.0, Accuracy (batch): 0.860000014305
最后,我们运行测试准确率:
test_acc = sess.run(accuracy,
feed_dict={x: mnist.test.images, y_: mnist.test.labels})
print "Test-Accuracy: {}".format(test_acc)
请注意,这将在每个工作任务上执行,因此相同的输出将出现三次。为了节省计算资源,我们可以只在一个任务中运行这个(例如,只在第一个工作任务中)。
总结
在本章中,我们涵盖了关于深度学习和机器学习中并行化的主要概念,并以一个关于数据并行化集群上分布式训练的端到端示例结束。
分布式训练是一个非常重要的工具,既可以加快训练速度,也可以训练那些否则不可行的模型。在下一章中,我们将介绍 TensorFlow 的 Serving 功能,允许训练好的模型在生产环境中被利用。
标签:lrn,merge,train,线程,tf,TensorFlow,data,我们 From: https://www.cnblogs.com/apachecn/p/18011873