Table of Contents
- 1. Creating a Dataset
- 1.1 dict input
- 1.2 Handling tuple input
- 2. Creating an iterator
- 2.1 Dataset.make_one_shot_iterator()
- 2.2 Dataset.make_initializable_iterator()
- 3. Transformation
- 3.1 map
- 3.2 batch
- 3.3 shuffle
- 3.4 repeat
- Worked example
This article is mainly based on:
TensorFlow学习笔记(4): Tensorflow tf.data.Dataset
Tensorflow中API------tf.data.Dataset使用
The Dataset API mainly consists of the Dataset base class, the three subclasses listed below, and the Iterator class used to consume a dataset.
Dataset is the base class and represents a sequence of elements, where each element (think of it as one training sample, or one batch) contains one or more Tensor objects. For example, in an image pipeline an element can be a single training sample consisting of a pair of tensors: one holding the image data and one holding the label. The class provides methods for creating and transforming datasets, and it also allows a dataset to be initialized from in-memory data. Three subclasses read data from files:
- TextLineDataset: reads lines from text files.
- TFRecordDataset: reads records from TFRecord files.
- FixedLengthRecordDataset: reads fixed-length records from binary files.
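As a rough sketch of how each of these is constructed in TF 1.x (the file names below are placeholders, not files used in this article):
# Hypothetical file paths, for illustration only
text_dataset = tf.data.TextLineDataset(["data_a.txt", "data_b.txt"])  # one element per line
record_dataset = tf.data.TFRecordDataset(["train.tfrecord"])  # one element per serialized record
binary_dataset = tf.data.FixedLengthRecordDataset(["images.bin"], record_bytes=28 * 28)  # fixed-size byte records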
Iterator
provides the means to extract elements from a dataset. The op returned by Iterator.get_next() yields the next element of the Dataset each time it is run, and it serves as the interface between the input pipeline and the model.
1. Creating a Dataset
from_tensor_slices(tensors)
The tensors argument must be one or more tensors whose sizes all match in dimension 0. What tf.data.Dataset.from_tensor_slices actually does is slice the incoming tensor(s) along the first dimension and build a dataset from the slices. For example, if we pass in a (100, 2) matrix, tf.data.Dataset.from_tensor_slices slices along the first dimension of its shape, so the resulting dataset contains 100 elements, each of shape (2,), i.e. each element is one row of the matrix, as shown below.
Note: this runs in graph (non-Eager) mode, so one_element is just a tensor; a session must be run to obtain its value.
@staticmethod
from_tensor_slices(tensors)
import numpy as np
import tensorflow as tf

# Create a Dataset from in-memory data
a = np.random.uniform(size=(100, 2))
dataset = tf.data.Dataset.from_tensor_slices(a)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(10):
        print(sess.run([one_element]))
********Output***********
[array([0.11397362, 0.68389881])]
[array([0.33010397, 0.01920068])]
[array([0.99258612, 0.30668152])]
[array([0.62999354, 0.96661998])]
[array([0.26922582, 0.29277836])]
[array([0.70142808, 0.82017049])]
[array([0.08068107, 0.37464286])]
[array([0.70070917, 0.62077841])]
[array([0.36669648, 0.8481603 ])]
[array([0.45951399, 0.79220773])]
In Eager mode, you create the Iterator directly with tfe.Iterator(dataset) and iterate over it. The values come out directly during iteration, with no need for sess.run():
import tensorflow.contrib.eager as tfe
tfe.enable_eager_execution()  # must be called at program startup, before any graph is built

dataset = tf.data.Dataset.from_tensor_slices(np.array([1.0, 2.0, 3.0, 4.0, 5.0]))
for one_element in tfe.Iterator(dataset):
    print(one_element)
1.1 dict input
dict inputs are supported as well. For example, in an image-recognition problem an element can take the form {"image": image_tensor, "label": label_tensor}, where image_tensor holds the data for a list of images and label_tensor holds their labels. tf.data.Dataset.from_tensor_slices then produces a dataset in which each element is a dict, as shown below:
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
dataset = tf.data.Dataset.from_tensor_slices(b)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(5):
        print(sess.run([one_element]))
********Output***********
[{'a': 1.0, 'b': array([0.17629646, 0.98159967])}]
[{'a': 2.0, 'b': array([0.62656944, 0.41537445])}]
[{'a': 3.0, 'b': array([0.94459501, 0.09661302])}]
[{'a': 4.0, 'b': array([0.66029436, 0.40497688])}]
[{'a': 5.0, 'b': array([0.67671157, 0.95346658])}]
1.2 Handling tuple input
Tuples are also accepted as input; in the code below each element of the resulting dataset is a (scalar, row) pair:
dataset = tf.data.Dataset.from_tensor_slices(
(np.array([1.0, 2.0, 3.0, 4.0, 5.0]), np.random.uniform(size=(5, 2)))
)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(5):
        print(sess.run([one_element]))
********Output***********
[(1.0, array([0.31577073, 0.21829554]))]
[(2.0, array([0.1872871 , 0.56726053]))]
[(3.0, array([0.32354807, 0.2709601 ]))]
[(4.0, array([0.61253432, 0.55664856]))]
[(5.0, array([0.75801247, 0.34546886]))]
2. Creating an iterator
Once you have built a Dataset to represent your input data, the next step is to create an Iterator to access that Dataset's elements. The Dataset API currently supports four kinds of iterator, in increasing order of sophistication (only the first two are demonstrated in this article; a brief sketch of the other two follows the list):
- one-shot
- initializable
- reinitializable
- feedable
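As a minimal sketch using the standard TF 1.x Iterator APIs (not code from the original article): a reinitializable iterator is defined by a structure rather than a specific dataset, and a feedable iterator selects between iterators through a string handle fed at run time:
# Reinitializable: one iterator definition shared by several datasets with the same structure
train_dataset = tf.data.Dataset.range(100)
val_dataset = tf.data.Dataset.range(50)
iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                           train_dataset.output_shapes)
next_element = iterator.get_next()
train_init_op = iterator.make_initializer(train_dataset)  # run this op to (re)start on training data
val_init_op = iterator.make_initializer(val_dataset)      # run this op to switch to validation data

# Feedable: choose the active iterator via a handle passed through feed_dict
handle = tf.placeholder(tf.string, shape=[])
feedable_iterator = tf.data.Iterator.from_string_handle(
    handle, train_dataset.output_types, train_dataset.output_shapes)
next_element = feedable_iterator.get_next()
# train_handle = sess.run(train_dataset.make_one_shot_iterator().string_handle())
# sess.run(next_element, feed_dict={handle: train_handle})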
2.1 Dataset.make_one_shot_iterator()
A one-shot iterator is the simplest kind: it supports iterating over a dataset exactly once and needs no explicit initialization. One-shot iterators handle almost every case that the existing queue-based input pipelines support, but they do not support parameterization.
a = np.random.uniform(size=(100,2))
dataset = tf.data.Dataset.from_tensor_slices(a)
iterator = dataset.make_one_shot_iterator() # one-shot
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(10):
        print(sess.run([one_element]))
********Output***********
[array([0.11397362, 0.68389881])]
[array([0.33010397, 0.01920068])]
[array([0.99258612, 0.30668152])]
[array([0.62999354, 0.96661998])]
[array([0.26922582, 0.29277836])]
[array([0.70142808, 0.82017049])]
[array([0.08068107, 0.37464286])]
[array([0.70070917, 0.62077841])]
[array([0.36669648, 0.8481603 ])]
[array([0.45951399, 0.79220773])]
2.2 Dataset.make_initializable_iterator()
When tf.data.Dataset.from_tensor_slices(array) is used, what actually happens is that array is stored in the computation graph as a tf.constant. When array is large, the graph becomes very large too, which makes it inconvenient to ship and to save. In that case we can replace the array with a placeholder and use an initializable iterator, feeding the array in only when it is needed. This keeps the large array out of the graph, as shown below:
Example 1
max_value = tf.placeholder(tf.int64, shape=[])
dataset = tf.data.Dataset.range(max_value)
iterator = dataset.make_initializable_iterator()
next_element = iterator.get_next()
sess = tf.Session()
sess.run(tf.global_variables_initializer())
# Initialize an iterator over a dataset with 10 elements.
sess.run(iterator.initializer, feed_dict={max_value: 10})
for i in range(10):
    value = sess.run(next_element)
    assert i == value
    print(value)

# Initialize the same iterator over a dataset with 100 elements.
sess.run(iterator.initializer, feed_dict={max_value: 100})
for i in range(100):
    value = sess.run(next_element)
    assert i == value
    print(value)
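The loops above rely on knowing the dataset size in advance. An alternative sketch (not part of the original example) keeps calling get_next() until the iterator is exhausted and raises tf.errors.OutOfRangeError:
sess.run(iterator.initializer, feed_dict={max_value: 10})
while True:
    try:
        print(sess.run(next_element))  # consume one element per run call
    except tf.errors.OutOfRangeError:
        break  # iterator exhausted: every element has been read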
Example 2
# Assumes the file stores arrays named "features" and "labels"
# (e.g. an archive saved with np.savez); the path is only illustrative.
with np.load("/var/data/training_data.npy") as data:
    features = data["features"]
    labels = data["labels"]

features_placeholder = tf.placeholder(features.dtype, features.shape)
labels_placeholder = tf.placeholder(labels.dtype, labels.shape)
dataset = tf.data.Dataset.from_tensor_slices((features_placeholder, labels_placeholder))
iterator = dataset.make_initializable_iterator()
sess.run(iterator.initializer, feed_dict={features_placeholder: features,
                                          labels_placeholder: labels})
3. Transformation
A Dataset supports transformations: applying a transformation to a dataset produces a new dataset. Transformations are typically how we convert the data, shuffle it, group it into batches, and generate epochs.
The most commonly used transformations are:
- map
- batch
- shuffle
- repeat
3.1 map
map takes a function; every element of the Dataset is passed to that function, and the return values make up the new Dataset. For example, we can add 1 to the 'a' entry of every dict element, as shown below:
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
# 创建dataset
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.map(lambda x:{'a':x['a']+1,'b':x['b']})
# 创建Iterator读取数据
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
*************Output*************
[{'a': 2.0, 'b': array([0.02247996, 0.11312282])}]
[{'a': 3.0, 'b': array([0.31083596, 0.81514463])}]
[{'a': 4.0, 'b': array([0.27203468, 0.19826087])}]
[{'a': 5.0, 'b': array([0.43270765, 0.36494948])}]
[{'a': 6.0, 'b': array([0.36024733, 0.92946233])}]
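map can also take a named function, and since TF 1.4 it accepts a num_parallel_calls argument to process several elements concurrently. A small sketch equivalent to the lambda above:
def add_one(x):
    # Same transformation as the lambda, written as a named function
    return {'a': x['a'] + 1, 'b': x['b']}

dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.map(add_one, num_parallel_calls=4)  # apply add_one to 4 elements in parallel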
3.2 batch
batch corresponds to the batch in batch gradient descent (Batch Gradient Descent, BGD): each gradient step averages over batch-size samples to estimate the descent direction. If we set the batch size to 2, each call to the iterator returns 2 elements at a time, as shown below.
The loop prints exactly two batches of data.
b = {"a":np.array([1.0,2.0,3.0,4.0]),
"b": np.random.uniform(size=(4,2))}
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.batch(2)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(2):
        print(sess.run([one_element]))
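batch stacks elements along a new leading dimension, so each of the two batches above contains 'a' with shape (2,) and 'b' with shape (2, 2). When the dataset size is not a multiple of the batch size, the last batch comes out smaller; as a small sketch (assuming TF 1.10 or later, where the drop_remainder argument exists), incomplete batches can be dropped:
dataset = tf.data.Dataset.from_tensor_slices(np.arange(5))
dataset = dataset.batch(2, drop_remainder=True)  # yields [0 1] and [2 3]; the lone element 4 is dropped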
3.3 shuffle
shuffle randomly reorders the elements of the dataset: it maintains a fixed-size buffer and picks the next element uniformly at random from that buffer.
shuffle(
    buffer_size,  # size of the shuffle buffer, i.e. how many elements of the dataset are sampled from at a time
    seed=None,  # random seed
    reshuffle_each_iteration=None  # boolean; whether to reshuffle at the end of each iteration (defaults to True)
)
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.shuffle(4)
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
    for i in range(5):
        print(sess.run([one_element]))
*************Output*************
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
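How well shuffle mixes the data depends on buffer_size: with buffer_size=1 the order is unchanged, while a buffer at least as large as the dataset gives a uniform shuffle over all elements. An illustrative sketch (not from the original):
dataset = tf.data.Dataset.from_tensor_slices(b)
no_shuffle = dataset.shuffle(1)    # buffer of 1 element: output keeps the original order
full_shuffle = dataset.shuffle(5)  # buffer holds all 5 elements: uniform shuffle of the whole set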
3.4 repeat
repeat repeats the whole sequence a given number of times and is mainly used to produce epochs: if the original data makes up one epoch, repeat(5) turns it into 5 epochs (repeat() with no argument repeats indefinitely).
The code below prints the data for 5 epochs.
b = {"a":np.array([1.0,2.0,3.0,4.0,5.0]),
"b": np.random.uniform(size=(5,2))}
# 创建dataset
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.repeat(5)
dataset = dataset.shuffle(4)
# 创建Iterator读取数据
iterator = dataset.make_one_shot_iterator()
one_element = iterator.get_next()
with tf.Session() as sess:
for i in range(5):
print(sess.run([one_element]))
*************Output*************
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 4.0, 'b': array([0.82844146, 0.15107684])}]
[{'a': 2.0, 'b': array([0.64863353, 0.42703828])}]
[{'a': 5.0, 'b': array([0.60242078, 0.44079629])}]
[{'a': 1.0, 'b': array([0.70024193, 0.72232312])}]
[{'a': 3.0, 'b': array([0.02262048, 0.19707233])}]
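The order of transformations matters: shuffling after repeat (as above) lets elements mix across epoch boundaries, whereas shuffling before repeat reshuffles each epoch on its own. A common arrangement, sketched here rather than taken from the original code:
dataset = tf.data.Dataset.from_tensor_slices(b)
dataset = dataset.shuffle(buffer_size=5)  # shuffle within each epoch
dataset = dataset.repeat(5)               # then repeat for 5 epochs
dataset = dataset.batch(2)                # finally group into batches of 2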
Worked example
Read the iris flower sample data and build training data from it. The data has 5 columns: the first 4 describe properties of the flower and the last one is the label. The code follows the example in tensorflow/models.
import os
import six.moves.urllib.request as request
import tensorflow as tf
from distutils.version import StrictVersion
# Check that we have correct TensorFlow version installed
tf_version = tf.__version__
print("TensorFlow version: {}".format(tf_version))
assert StrictVersion("1.4") <= StrictVersion(tf_version), "TensorFlow r1.4 or later is needed"
# Windows users: You only need to change PATH, rest is platform independent
PATH = "/tmp/tf_dataset_and_estimator_apis"
# Fetch and store Training and Test dataset files
PATH_DATASET = PATH + os.sep + "dataset"
FILE_TRAIN = PATH_DATASET + os.sep + "iris_training.csv"
FILE_TEST = PATH_DATASET + os.sep + "iris_test.csv"
URL_TRAIN = "http://download.tensorflow.org/data/iris_training.csv"
URL_TEST = "http://download.tensorflow.org/data/iris_test.csv"
def download_dataset(url, file):
    if not os.path.exists(PATH_DATASET):
        os.makedirs(PATH_DATASET)
    if not os.path.exists(file):
        data = request.urlopen(url).read()
        with open(file, "wb") as f:
            f.write(data)
download_dataset(URL_TRAIN, FILE_TRAIN)
download_dataset(URL_TEST, FILE_TEST)
tf.logging.set_verbosity(tf.logging.INFO)
# The CSV features in our training & test data
feature_names = [
'SepalLength',
'SepalWidth',
'PetalLength',
'PetalWidth']
# Create an input function reading a file using the Dataset API
# Then provide the results to the Estimator API
def my_input_fn(file_path, perform_shuffle=False, repeat_count=1):
    def decode_csv(line):
        parsed_line = tf.decode_csv(line, [[0.], [0.], [0.], [0.], [0]])
        label = parsed_line[-1]  # Last element is the label
        del parsed_line[-1]  # Delete last element
        features = parsed_line  # Everything but the last element are the features
        d = dict(zip(feature_names, features)), label
        return d

    dataset = (tf.data.TextLineDataset(file_path)  # Read text file
               .skip(1)  # Skip header row
               .map(decode_csv))  # Transform each elem by applying decode_csv fn
    if perform_shuffle:
        # Randomizes input using a window of 256 elements (read into memory)
        dataset = dataset.shuffle(buffer_size=256)
    dataset = dataset.repeat(repeat_count)  # Repeats dataset this # times
    dataset = dataset.batch(32)  # Batch size to use
    iterator = dataset.make_one_shot_iterator()
    batch_features, batch_labels = iterator.get_next()
    return batch_features, batch_labels
next_batch = my_input_fn(FILE_TRAIN, True) # Will return 32 random elements
# Create the feature_columns, which specifies the input to our model
# All our input features are numeric, so use numeric_column for each one
feature_columns = [tf.feature_column.numeric_column(k) for k in feature_names]
# Create a deep neural network regression classifier
# Use the DNNClassifier pre-made estimator
classifier = tf.estimator.DNNClassifier(
feature_columns=feature_columns, # The input features to our model
hidden_units=[10, 10], # Two layers, each with 10 neurons
n_classes=3,
model_dir=PATH) # Path to where checkpoints etc are stored
# Train our model, use the previously defined function my_input_fn
# Input to training is a file with training examples
# Stop training after 8 iterations of train data (epochs)
classifier.train(
input_fn=lambda: my_input_fn(FILE_TRAIN, True, 8))
# Evaluate our model using the examples contained in FILE_TEST
# Return value will contain evaluation_metrics such as: loss & average_loss
evaluate_result = classifier.evaluate(
input_fn=lambda: my_input_fn(FILE_TEST, False, 4))
print("Evaluation results")
for key in evaluate_result:
    print("  {}, was: {}".format(key, evaluate_result[key]))
# Predict the type of some Iris flowers.
# Let's predict the examples in FILE_TEST, repeat only once.
predict_results = classifier.predict(
input_fn=lambda: my_input_fn(FILE_TEST, False, 1))
print("Predictions on test file")
for prediction in predict_results:
    # Will print the predicted class, i.e: 0, 1, or 2 if the prediction
    # is Iris Setosa, Versicolor, or Virginica, respectively.
    print(prediction["class_ids"][0])
# Let's create a dataset for prediction
# We've taken the first 3 examples in FILE_TEST
prediction_input = [[5.9, 3.0, 4.2, 1.5],  # -> 1, Iris Versicolor
                    [6.9, 3.1, 5.4, 2.1],  # -> 2, Iris Virginica
                    [5.1, 3.3, 1.7, 0.5]]  # -> 0, Iris Setosa
def new_input_fn():
    def decode(x):
        x = tf.split(x, 4)  # Need to split into our 4 features
        return dict(zip(feature_names, x))  # To build a dict of them

    dataset = tf.data.Dataset.from_tensor_slices(prediction_input)
    dataset = dataset.map(decode)
    iterator = dataset.make_one_shot_iterator()
    next_feature_batch = iterator.get_next()
    return next_feature_batch, None  # In prediction, we have no labels
# Predict all our prediction_input
predict_results = classifier.predict(input_fn=new_input_fn)
# Print results
print("Predictions:")
for idx, prediction in enumerate(predict_results):
    pred_class = prediction["class_ids"][0]  # Get the predicted class (index); avoids shadowing the built-in "type"
    if pred_class == 0:
        print("  I think: {}, is Iris Setosa".format(prediction_input[idx]))
    elif pred_class == 1:
        print("  I think: {}, is Iris Versicolor".format(prediction_input[idx]))
    else:
        print("  I think: {}, is Iris Virginica".format(prediction_input[idx]))