tensorflow:tf.data.Dataset用法解析和模型實戰(zhàn)

關鍵詞:tensorflow,tf.estimator

內(nèi)容目錄

  • tf.data.Dataset簡介
  • tf.data.Dataset.from_tensor_slices的使用
  • shuffle,repeat,batch的順序搭配
  • 使用from_tensor_slices和from_structure管道進行訓練和測試
  • 使用from_tensor_slices管道和tf.estimator評估器進行訓練和測試

tf.data.Dataset簡介

tf.data.Dataset支持將內(nèi)存中的訓練數(shù)據(jù)(列表,元組,字典)輸入為tensor對象,且可以使用各種API完成對數(shù)據(jù)進行映射,亂序,批次,復制,另外它采用輸入管道的方式進行數(shù)據(jù)輸入,不再使用占位符和feed_dict將Python對象在每個批次中傳遞到靜態(tài)圖,而是直接在管道內(nèi)部轉化為tensor對象直接輸入到圖,降低了整體由于等待數(shù)據(jù)輸入導致的計算資源閑置時間,簡單而言使用tf.data.Dataset管道進行訓練數(shù)據(jù)輸入使得訓練效率更高。


快速開始

import tensorflow as tf

x = [[2.0, 3.3], [1.2, 3.2]]
y = [1, 0]
data = tf.data.Dataset.from_tensor_slices((x, y))  # 以元組進行輸入
data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y})  # 以字典進行輸入
iters = data.make_one_shot_iterator()  # 轉化為迭代器
iters2 = data2.make_one_shot_iterator()

with tf.Session() as sess:
    for i in range(2):
        one = iters.get_next()
        a = sess.run(one)
        print(a)
        two = iters2.get_next()
        b = sess.run(two)
        print(b)

打印結果如下

(array([2. , 3.3], dtype=float32), 1)
{'x': array([2. , 3.3], dtype=float32), 'y': 1}
(array([1.2, 3.2], dtype=float32), 0)
{'x': array([1.2, 3.2], dtype=float32), 'y': 0}

管道每次輸入分別為訓練的x和y的各一行,其中以元組輸入以下標獲得對應的特征或者標簽,以字典輸入以key獲得特征或者標簽,管道的輸入是一個tensor需要在Session里面run出來。


tf.data.Dataset.from_tensor_slices的含義和輸入要求

該函數(shù)是把內(nèi)存中的Python數(shù)據(jù)輸入管道,slices的含義是針對列表形式的向量,以最外邊的那一維(向量的第一維)進行切割,作為樣本和樣本之間分割條件(新的一行),例如輸入x是一個三維向量(3,2,2),y是一個一維向量

x = [[[2.0, 3.3], [1.2, 3.2]], [[1.0, -2.3], [1.0, 2.1]], [[-1.5, 0.7], [1.9, -0.2]]]
y = [1, 0, 1]

data = tf.data.Dataset.from_tensor_slices((x, y))
data_iter = data.make_one_shot_iterator()

with tf.Session() as sess:
    try:
        for i in range(3):
            one = data_iter.get_next()
            a = sess.run(one)
            print(a)
    except tf.errors.OutOfRangeError:
        print("已經(jīng)沒有數(shù)據(jù)")

輸入如下

(array([[2. , 3.3],
       [1.2, 3.2]], dtype=float32), 1)
(array([[ 1. , -2.3],
       [ 1. ,  2.1]], dtype=float32), 0)
(array([[-1.5,  0.7],
       [ 1.9, -0.2]], dtype=float32), 1)

對于字典的形式,只是給數(shù)據(jù)增加了一個自定義的key,而value也是遵守同元組一樣的切分規(guī)則,只需把代碼改成

data = tf.data.Dataset.from_tensor_slices({"x": x, "y": y})

輸出如下

{'x': array([[2. , 3.3],
       [1.2, 3.2]], dtype=float32), 'y': 1}
{'x': array([[ 1. , -2.3],
       [ 1. ,  2.1]], dtype=float32), 'y': 0}
{'x': array([[-1.5,  0.7],
       [ 1.9, -0.2]], dtype=float32), 'y': 1}

定義元組和字典給from_tensor_slices是在告訴它輸入的是不同的列,每個列必須是列表元素。


獲取管道數(shù)據(jù)

將tf.data.Dataset創(chuàng)建的DatasetV1Adapter對象通過make_one_shot_iterator,make_initializable_iterator轉化為Iterator,通過迭代器的get_next方法獲取數(shù)據(jù),數(shù)據(jù)是tensor類型

make_one_shot_iterator

一次迭代,不需要顯式初始化,它自動初始化,不支持參數(shù)化,例如

x = [[2.0, 3.3], [1.2, 3.2], [1.0, -2.3], [1.0, 2.1], [-1.5, 0.7], [1.9, -0.2], [1.9, -0.3]]
y = [1, 0, 1, 1, 0, 1, 0]

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y})
iters2 = data2.make_one_shot_iterator()

with tf.Session() as sess:
    try:
        for i in range(7):
            one = iters2.get_next()
            a = sess.run(one)
    except tf.errors.OutOfRangeError:
        print("已經(jīng)沒有數(shù)據(jù)")

get_next的結果要傳遞給Session,在Session中不需要對迭代器做初始化,另外get_next隨便放在Session內(nèi)還是外都可以,比如下面效果是一樣的

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y})
iters2 = data2.make_one_shot_iterator()
one = iters2.get_next()

with tf.Session() as sess:
    try:
        for i in range(7):
            a = sess.run(one)
    except tf.errors.OutOfRangeError:
        print("已經(jīng)沒有數(shù)據(jù)")
make_initializable_iterator

需要首先運行初始化指令iterator.initializer(),支持參數(shù)化,使用tf.placeholder()可以在管道內(nèi)傳參

x = [[2.0, 3.3], [1.2, 3.2], [1.0, -2.3], [1.0, 2.1], [-1.5, 0.7], [1.9, -0.2], [1.9, -0.3]]
y = [1, 0, 1, 1, 0, 1, 0]
z = tf.placeholder(tf.float32, shape=[])

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).map(lambda x: {"x": x["x"] + z, "y": x["y"]})
iters2 = data2.make_initializable_iterator()

with tf.Session() as sess:
    sess.run(iters2.initializer, feed_dict={z: -10.0})
    try:
        for i in range(7):
            one = iters2.get_next()
            a = sess.run(one)
            print(a)
    except tf.errors.OutOfRangeError:
        print("已經(jīng)沒有數(shù)據(jù)")

打印如下

{'x': array([-8. , -6.7], dtype=float32), 'y': 1}
{'x': array([-8.8, -6.8], dtype=float32), 'y': 0}
{'x': array([ -9. , -12.3], dtype=float32), 'y': 1}
{'x': array([-9. , -7.9], dtype=float32), 'y': 1}
{'x': array([-11.5,  -9.3], dtype=float32), 'y': 0}
{'x': array([ -8.1, -10.2], dtype=float32), 'y': 1}
{'x': array([ -8.1, -10.3], dtype=float32), 'y': 0}

在Session中調(diào)用了迭代器的initializer,同時將占位符傳遞到管道內(nèi)部,作用是給管道的map函數(shù)作為參數(shù)使用,本例中是給x每個元素減10。


對管道數(shù)據(jù)進行操作

tf.data.Dataset創(chuàng)建的管道數(shù)據(jù)支持訓練需要的數(shù)據(jù)復制,打亂,批次生成等操作

repeat操作

將數(shù)據(jù)進行復制,類似epoch進行循環(huán)

x = [[2.0, 3.3], [1.2, 3.2]]
y = [1, 0]

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).repeat(2)
iters2 = data2.make_one_shot_iterator()
one = iters2.get_next()

with tf.Session() as sess:
    for i in range(4):
        a = sess.run(one)
        print(a)

打印如下,整個數(shù)據(jù)被重復讀取了1次

{'x': array([2. , 3.3], dtype=float32), 'y': 1}
{'x': array([1.2, 3.2], dtype=float32), 'y': 0}
{'x': array([2. , 3.3], dtype=float32), 'y': 1}
{'x': array([1.2, 3.2], dtype=float32), 'y': 0}

如果直接調(diào)用repeat()的話,生成的序列就會無限重復下去,沒有結束,因此也不會拋出tf.errors.OutOfRangeError異常。


batch操作

迭代器每次返回一個小批次而不是整個數(shù)據(jù)集

x = [[2.0, 3.3], [1.2, 3.2], [1.0, -2.3], [1.0, 2.1], [-1.5, 0.7], [1.9, -0.2]]
y = [1, 0, 1, 1, 0, 1]

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).batch(3)
iters2 = data2.make_one_shot_iterator()
one = iters2.get_next()

with tf.Session() as sess:
    for i in range(2):
        a = sess.run(one)
        print(a)

以三個為一組對整個數(shù)據(jù)集進行切分,輸出如下

{'x': array([[ 2. ,  3.3],
       [ 1.2,  3.2],
       [ 1. , -2.3]], dtype=float32), 'y': array([1, 0, 1], dtype=int32)}
{'x': array([[ 1. ,  2.1],
       [-1.5,  0.7],
       [ 1.9, -0.2]], dtype=float32), 'y': array([1, 0, 1], dtype=int32)}

如果batch不能剛好整除樣本數(shù),會在最后一個批次有不足batch的一組,例如改為4個一組

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).batch(4)

輸出如下最后一組數(shù)據(jù)量不足4

{'x': array([[ 2. ,  3.3],
       [ 1.2,  3.2],
       [ 1. , -2.3],
       [ 1. ,  2.1]], dtype=float32), 'y': array([1, 0, 1, 1], dtype=int32)}
{'x': array([[-1.5,  0.7],
       [ 1.9, -0.2]], dtype=float32), 'y': array([0, 1], dtype=int32)}

可以加入drop_remainder參數(shù)刪除不足batch的批次,同時可迭代次數(shù)也因此減1,因為刪除了最后一個批次

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).batch(4, drop_remainder=True)

設置batch之后對應的可迭代數(shù)量變少,同樣的如果調(diào)用迭代數(shù)大于batch除以總樣本數(shù)后的值,也會報錯 End of sequence,通過異常捕獲可以在沒有數(shù)據(jù)的停止下來

x = [[2.0, 3.3], [1.2, 3.2], [1.0, -2.3], [1.0, 2.1], [-1.5, 0.7], [1.9, -0.2], [1.9, -0.3]]
y = [1, 0, 1, 1, 0, 1, 0]

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).repeat(2).batch(3, drop_remainder=True)
iters2 = data2.make_one_shot_iterator()
one = iters2.get_next()

with tf.Session() as sess:
    try:
        for i in range(14):
            a = sess.run(one)
    except tf.errors.OutOfRangeError:
        print("已經(jīng)沒有數(shù)據(jù)")

shuffle操作

打亂整個數(shù)據(jù)集的順序,參數(shù)buffsize的大小越大,數(shù)據(jù)的混亂程度越高

x = [[2.0, 3.3], [1.2, 3.2], [1.0, -2.3], [1.0, 2.1], [-1.5, 0.7], [1.9, -0.2]]
y = [1, 0, 1, 1, 0, 1]

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).shuffle(100000)
iters2 = data2.make_one_shot_iterator()
one = iters2.get_next()

with tf.Session() as sess:
    for i in range(6):
        a = sess.run(one)
        print(a)

輸出如下,整體亂序,但是 元素都有輸出

{'x': array([ 1.9, -0.2], dtype=float32), 'y': 1}
{'x': array([1.2, 3.2], dtype=float32), 'y': 0}
{'x': array([1. , 2.1], dtype=float32), 'y': 1}
{'x': array([2. , 3.3], dtype=float32), 'y': 1}
{'x': array([-1.5,  0.7], dtype=float32), 'y': 0}
{'x': array([ 1. , -2.3], dtype=float32), 'y': 1}

repeat,batch,shuffle的順序要求

三者聯(lián)合使用的正確順序是先shuffle再repeat最后batch,例如

x = [[2.0, 3.3], [1.2, 3.2], [1.0, -2.3], [1.0, 2.1], [-1.5, 0.7], [1.9, -0.2], [1.9, -0.3]]
y = [1, 0, 1, 1, 0, 1, 0]

data2 = tf.data.Dataset.from_tensor_slices({"x": x, "y": y}).shuffle(1000).repeat(2).batch(3, drop_remainder=True)
iters2 = data2.make_one_shot_iterator()
one = iters2.get_next()

with tf.Session() as sess:
    for i in range(4):
        a = sess.run(one)
  • 先shuffle:保證一個epoch先shuffle,如果先repeat則整體shuffle,可能在一個epoch/batch之內(nèi)一個樣本輸出多條
  • 先repeat再batch:如果先batch再repeat,相當于對batch的結果再repeat,如果epoch不能被batch整除,就會出現(xiàn)每個epoch都會出現(xiàn)剩余的batch,這種情況被repeat之后導致訓練的時候動不動就出現(xiàn)樣本不足的batch

map操作

類似于Python的map,可以對管道的數(shù)據(jù)進行映射處理,此處不做展開


管道數(shù)據(jù)流轉總結

以一個特征和標簽數(shù)據(jù)輸入為例

x = [[[2.0, 3.3], [1.2, 3.2]], [[1.0, -2.3], [1.0, 2.1]], [[-1.5, 0.7], [1.9, -0.2]]]
y = [1, 0, 1]
管道數(shù)據(jù)流轉

通過from_tensor_slices將python的元組,字段對象轉化為DatasetV1Adapter對象,batch操作將數(shù)據(jù)拓展一維,make_one_shot_iterator將DatasetV1Adapter轉化為tensorflow可迭代對象,通過get_next獲取管道數(shù)據(jù),輸出是一個元組或者字典形式的tensorflow的Tensor。

使用tf.data.Dataset.from_tensor_slices進行模型訓練

由于管道的輸出直接是tensor,因此可以直接輸入網(wǎng)絡而不需要feed_dict,如果不使用管道,一個簡單的模型網(wǎng)絡代碼如下

class Model(object):
    def __init__(self, num_class, feature_size, learning_rate=0.05, weight_decay=0.01, decay_learning_rate=0.99):
        self.input_x = tf.placeholder(tf.float32, [None, feature_size], name="input_x")
        self.input_y = tf.placeholder(tf.float32, [None, num_class], name="input_y")
        self.dropout_keep_prob = tf.placeholder(tf.float32, name="dropout_keep_prob")
        self.global_step = tf.Variable(0, name="global_step", trainable=False)

        with tf.name_scope('layer_1'):
            dense_out_1 = tf.layers.dense(self.input_x, 64)
            dense_out_act_1 = tf.nn.relu(dense_out_1)

        with tf.name_scope('layer_2'):
            dense_out_2 = tf.layers.dense(dense_out_act_1, 32)
            dense_out_act_2 = tf.nn.relu(dense_out_2)

        with tf.name_scope('layer_out'):
            self.output = tf.layers.dense(dense_out_act_2, 2)
            self.probs = tf.nn.softmax(self.output, dim=1, name="probs")

        with tf.name_scope('loss'):
            self.loss = tf.reduce_mean(
                tf.nn.softmax_cross_entropy_with_logits_v2(logits=self.output, labels=self.input_y))
            vars = tf.trainable_variables()
            loss_l2 = tf.add_n([tf.nn.l2_loss(v) for v in vars if
                                v.name not in ['bias', 'gamma', 'b', 'g', 'beta']]) * weight_decay
            self.loss += loss_l2

        with tf.name_scope("optimizer"):
            if decay_learning_rate:
                learning_rate = tf.train.exponential_decay(learning_rate, self.global_step, 100, decay_learning_rate)
            optimizer = tf.train.AdamOptimizer(learning_rate=learning_rate)
            self.train_step = optimizer.minimize(self.loss, global_step=self.global_step)

        with tf.name_scope("metrics"):
            self.accuracy = tf.reduce_mean(
                tf.cast(tf.equal(tf.arg_max(self.probs, 1), tf.arg_max(self.input_y, 1)), dtype=tf.float32))

需要手動用yield實現(xiàn)一個迭代器,完成復制,批次,打亂的操作

def get_batch(epochs, batch_size, features, labels):
    for epoch in range(epochs):
        tmp = list(zip(features, labels))
        shuffle(tmp)
        features, labels = zip(*tmp)
        for batch in range(0, len(features), batch_size):
            if batch + batch_size < len(features):
                batch_features = features[batch: (batch + batch_size)]
                batch_labels = labels[batch: (batch + batch_size)]
            else:
                batch_features = features[batch: len(features)]
                batch_labels = labels[batch: len(features)]

            yield epoch, batch_features, batch_labels

然后在Session使用feed_dict傳入數(shù)據(jù)

feed_dict = {model.input_x: batch_x, model.input_y: batch_y, model.dropout_keep_prob: 0.8}
            _, step, loss_train, acc_train = sess.run([model.train_step, model.global_step, model.loss, model.accuracy], feed_dict=feed_dict)

使用管道數(shù)據(jù)的場景下代碼修改如下

# 導入管道數(shù)據(jù)
    train_data = tf.data.Dataset.from_tensor_slices({"feature": train_x, "label": train_y}).shuffle(1000).repeat(20).batch(128, drop_remainder=True)
    test_data = tf.data.Dataset.from_tensor_slices({"feature": test_x, "label": test_y}).batch(len(test_x))
    data = tf.data.Iterator.from_structure(train_data.output_types, train_data.output_shapes)
    next_one = data.get_next()
    train_init_op = data.make_initializer(train_data)
    test_init_op = data.make_initializer(test_data)

    # 構建網(wǎng)絡
    dense_out_1 = tf.layers.dense(next_one["feature"], 64)
    dense_out_act_1 = tf.nn.relu(dense_out_1)
    dense_out_2 = tf.layers.dense(dense_out_act_1, 32)
    dense_out_act_2 = tf.nn.relu(dense_out_2)
    output = tf.layers.dense(dense_out_act_2, 2)
    probs = tf.nn.softmax(output, dim=1, name="probs")
    loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits_v2(logits=output, labels=next_one["label"]))
    vars = tf.trainable_variables()
    loss_l2 = tf.add_n([tf.nn.l2_loss(v) for v in vars if
                        v.name not in ['bias', 'gamma', 'b', 'g', 'beta']]) * 0.001
    loss += loss_l2
    optimizer = tf.train.AdamOptimizer(learning_rate=0.005)
    global_step = tf.Variable(0, name="global_step", trainable=False)
    train_step = optimizer.minimize(loss, global_step=global_step)
    accuracy = tf.reduce_mean(
        tf.cast(tf.equal(tf.arg_max(probs, 1), tf.arg_max(next_one["label"], 1)), dtype=tf.float32))

    saver = tf.train.Saver(tf.global_variables(), max_to_keep=1)

    with tf.Session() as sess:
        init_op = tf.group(tf.global_variables_initializer())
        sess.run(init_op)
        train_loss_list = []
        steps = []
        acc_list = []
        train_acc_list = []
        sess.run(train_init_op)
        while True:
            try:
                _, step, loss_val, acc_val = sess.run([train_step, global_step, loss, accuracy])
                train_loss_list.append(loss_val)
                steps.append(step)
                train_acc_list.append(acc_val)
                if step % 10 == 0:
                    print("step:", step, "loss:", loss_val)
                    # ckpt
                    saver.save(sess, os.path.join(BASIC_PATH, "./ckpt1/ckpt"))
            except tf.errors.OutOfRangeError:
                print("已經(jīng)沒有數(shù)據(jù)")
                break
        # 測試
        sess.run(test_init_op)
        loss_val, acc_val = sess.run([loss, accuracy])
        print("{:-^30}".format("evaluation"))
        print("[evaluation]", "loss:", loss_val, "acc", acc_val)

其中需要使用tf.data.Iterator.from_structure將訓練集和測試集一起輸入,通過make_initializer切換狀態(tài),在訓練的時候使用訓練,測試的時候使用測試,但是在代碼上共享一個變量


使用tf.data.Dataset+tf.estimator.Estimator訓練模型

tf.data.Dataset最常見的是和評估器tf.estimator.Estimator一起使用,將以上代碼改為如下格式,先定義輸入數(shù)據(jù)的函數(shù),包含訓練,測試和預測

def train_input_fn(train_x, train_y, batch_size):
    dataset = tf.data.Dataset.from_tensor_slices((train_x, train_y))
    dataset = dataset.shuffle(1000).repeat().batch(batch_size)

    return dataset


def eval_input_fn(data, label, batch=None):
    if label is None:
        return tf.data.Dataset.from_tensor_slices(data).batch(batch)
    else:
        return tf.data.Dataset.from_tensor_slices((data, label)).batch(batch)

網(wǎng)絡結構函數(shù)定義如下,將特征和標簽直接以tensor的形式輸入

def model(features: tf.Tensor, labels: tf.Tensor, mode: str, params: dict):
    # 定義網(wǎng)絡結構
    dense_out_1 = tf.layers.dense(features, params["hidden_1_dim"])
    dense_out_act_1 = tf.nn.relu(dense_out_1)
    dense_out_2 = tf.layers.dense(dense_out_act_1, params["hidden_2_dim"])
    dense_out_act_2 = tf.nn.relu(dense_out_2)
    output = tf.layers.dense(dense_out_act_2, params["output_dim"])
    probs = tf.nn.softmax(output, dim=1, name="probs")

    if mode == tf.estimator.ModeKeys.PREDICT:
        return tf.estimator.EstimatorSpec(mode, predictions=probs)

    accuracy = tf.metrics.accuracy(tf.arg_max(probs, 1), tf.arg_max(labels, 1))
    metrics = {"acc": accuracy}
    loss = tf.reduce_mean(
        tf.nn.softmax_cross_entropy_with_logits_v2(logits=output, labels=labels))
    vars = tf.trainable_variables()
    loss_l2 = tf.add_n([tf.nn.l2_loss(v) for v in vars if
                        v.name not in ['bias', 'gamma', 'b', 'g', 'beta']]) * params["weight_decay"]
    loss += loss_l2

    if mode == tf.estimator.ModeKeys.EVAL:
        return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)

    assert mode == tf.estimator.ModeKeys.TRAIN
    optimizer = tf.train.AdamOptimizer(learning_rate=params["learning_rate"])
    train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
    return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op)

訓練,測試,預測過程如下

params = {
        "learning_rate": 0.01,
        "weight_decay": 0.001,
        "hidden_1_dim": 64,
        "hidden_2_dim": 32,
        "output_dim": 2
    }
    config = tf.estimator.RunConfig()
    # 定義評估器
    estimator = tf.estimator.Estimator(model_fn=model, model_dir="./tf_estimator", params=params, config=config)
    # 訓練
    estimator.train(lambda: train_input_fn(train_x, train_y, 128), steps=200)
    # 測試
    train_metrics = estimator.evaluate(input_fn=lambda: eval_input_fn(test_x, test_y, len(test_x)))
    print(train_metrics)
    # 預測
    predictins = estimator.predict(input_fn=lambda: eval_input_fn(test_x, None, len(test_x)))

注意estimator的train,evaluate,predict接收的input_fn都要時無參數(shù)的函數(shù),而train_input_fn,eval_input_fn都是有參數(shù)的,因此使用匿名函數(shù)再包一層。

最后編輯于
?著作權歸作者所有,轉載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內(nèi)容