在Tensorflow中,隊列是一類非常特殊的元素,其涉及到的內(nèi)容也十分廣泛,使用的方法也是多種多樣.但是在本文中僅僅介紹一下使用隊列進行并行的輸入的方法.不過需要首先記住一點,使用隊列的方法進行輸入的本質(zhì)是從隊列取數(shù)據(jù)和加數(shù)據(jù)的過程是并行的,并且加數(shù)據(jù)的過程是可以采用多線程的.本文不同于其他的教程,將會以使用queue實現(xiàn)mini-batch數(shù)據(jù)輸入作為講述的核心.主要參考了這篇官網(wǎng)的教程
基本思路
??Mini-batch的數(shù)據(jù)輸入方式是很熟悉的套路.但是怎么生成這樣的mini-batch是難點.并且生成完1個epoch的所有mini-batch之后,將會回過頭去重新生成一遍所有的mini-batch.我們分析一下這樣的思路,可以發(fā)現(xiàn)如果采用隊列的方式對生成的batch進行存儲.每次取出來一個隊列元素(一個batch),順手在把這個batch 加回到隊列中,并且利用這個batch進行模型的訓練.訓練結(jié)束了之后將會再取一個batch ,循環(huán)往復.這樣就可以保證有源源不斷的batch可以取出來.
實現(xiàn)方法之隊列簡介

??上面的這個圖,就是最最最簡單的一種隊列啦.但是呢麻雀雖小,五臟俱全.這么一小段代碼,就涵蓋了我們用的到的幾個op.tf.FIFOQueue()可以創(chuàng)建一個最簡單的先進先出的隊列.q.enqueue_many()可以n入隊多個元素.q.dequeue()可以出隊一個元素.q.enqueue()則很明顯可以入隊一個元素。
QueueRunner和Coordinator的使用
??好像利用上一小節(jié)的內(nèi)容就可以實現(xiàn)我們上述的生成mini-batch的過程啦,但是這樣的話我這篇總結(jié)就太短啦~立即推:還沒完!
??其實確實,上一部分的內(nèi)容已經(jīng)足夠。想想,只需要首先把所有的batch都放入到隊列當中,然后每次出隊一個元素,在放入隊尾就行了。但是呢,Tensorflow比較貼心。給我們提供了另外一個工具QueueRunner.它的作用是用來幫我們往隊列里填數(shù)據(jù),沒有了就填沒有了就填。而且能夠多線程的填。用法如下:
def simple_shuffle_batch(source, capacity, batch_size=10):
# Create a random shuffle queue.
queue = tf.RandomShuffleQueue(capacity=capacity,
min_after_dequeue=int(0.9*capacity),
shapes=source.shape, dtypes=source.dtype)
# Create an op to enqueue one item.
enqueue = queue.enqueue(source)
# Create a queue runner that, when started, will launch 4 threads applying
# that enqueue op.
num_threads = 4
qr = tf.train.QueueRunner(queue, [enqueue] * num_threads)
# Register the queue runner so it can be found and started by
# <a href="../../api_docs/python/tf/train/start_queue_runners"><code>tf.train.start_queue_runners</code></a> later (the threads are not launched yet).
tf.train.add_queue_runner(qr)
# Create an op to dequeue a batch
return queue.dequeue_many(batch_size)
??從代碼里可以看出來,使用queuerunner需要三個步驟
????1.需要創(chuàng)建一個queuerunner,即qr = tf.train.QueueRunner(queue, [enqueue] * num_threads)
????2.并且添加到訓練中去tf.train.add_queue_runner(qr)。
????3.代碼中其實還少一步,就是要使用tf.train.start_queue_runners()這個函數(shù)來啟動我們的自動化小助手queue runner。否則它不會干活幫我們填充數(shù)據(jù)的。
??還沒完!Tensorflow還提供了一個東西叫Coordinator這個東西呢,只有在我們使用多線程版本的queue runner的時候才有用,它的作用是負責統(tǒng)一規(guī)劃。等到所有的線程都結(jié)束了,就做個記錄,說好啦可以收工啦,可以匯合數(shù)據(jù)啦。用法也是極其簡單的。
# Using Python's threading library.
import threading
# Thread body: loop until the coordinator indicates a stop was requested.
# If some condition becomes true, ask the coordinator to stop.
def MyLoop(coord):
while not coord.should_stop():
...do something...
if ...some condition...:
coord.request_stop()
# Main thread: create a coordinator.
coord = tf.train.Coordinator()
# Create 10 threads that run 'MyLoop()'
threads = [threading.Thread(target=MyLoop, args=(coord,)) for i in xrange(10)]
# Start the threads and wait for all of them to stop.
for t in threads:
t.start()
coord.join(threads)
Supervisor的使用
??有了上面這些東西呢,就可以比較完美的實現(xiàn)我們的mini-batch的數(shù)據(jù)的生成過程了。但是還需要多提一個看起來不相關(guān)的話題。就是如果你恰好在看官網(wǎng)的教程,我們可以發(fā)現(xiàn)在序列的rnn教程的讀取數(shù)據(jù)的代碼里和我上文講的過程是有出入的。主要集中在以下兩點:
????1.該代碼沒有把每一個batch 放進queue當中,而是呀把epoch的編號放進了queue當中,使用的是tf.train.range_input_producer()這樣一個方法,本質(zhì)是建立了一個fifo隊列。這里首先需要解釋一下,在rnn解決序列問題的時候,epoch并不是我們理解的整個文本,而是一個unrolling的rnn的時間跨度。比如說我們的rnn模型使用了10個cell作為中間層,那么一個epoch就是由10個mini-batch組成的。理解了這個,我們也就很容易理解代碼中的做法,代碼把所有epoch編號,每次出隊一個編號,這個編號重新入隊之后,再根據(jù)編號去找對應的batch們,然后利用這些batch進行訓練。和我前面提到的直接將batch存到queue中的想法,多了一個查找的過程。
????2.另外一個重要區(qū)別是代碼里沒有用到start_queue_runners和coordinator。是我說錯了嗎!不!是它沒有直接用,可以觀察到,它使用了supervisor這么個東西。如果仔細研究過,就會知道這是一個用來長期進行訓練的可以取代普通session的東東。它的主要作用是模型訓練了一段時間(通常是半分鐘)之后,就會存儲一下。這樣的話如果訓練的時間很長,斷電啦,或者咋的啦。就會自動的從之前存儲的模型中讀取信息,實現(xiàn)了‘斷點續(xù)傳’。這個supervisor還順手把我們的queue_runners給啟動啦,并且自動的管理了多線程。真是多才多藝的小監(jiān)督員。