前 言
深度學(xué)習(xí)在各個(gè)領(lǐng)域?qū)崿F(xiàn)突破的一部分原因是我們使用了更多的數(shù)據(jù)(大數(shù)據(jù))來(lái)訓(xùn)練更復(fù)雜的模型(深度神經(jīng)網(wǎng)絡(luò)),并且可以利用一些高性能并行計(jì)算設(shè)備如GPU和FPGA來(lái)加速模型訓(xùn)練。但是有時(shí)候,模型之大或者訓(xùn)練數(shù)據(jù)量之多可能超出我們的想象,這個(gè)時(shí)候就需要分布式訓(xùn)練系統(tǒng),利用分布式系統(tǒng)我們可以訓(xùn)練更加復(fù)雜的模型(單機(jī)無(wú)法裝載),還可以加速我們的訓(xùn)練過(guò)程,這對(duì)于研究者實(shí)現(xiàn)模型的超參數(shù)優(yōu)化是非常有意義的。2017年6月,F(xiàn)acebook發(fā)布了他們的論文Accurate, Large Minibatch SGD:Training ImageNet in 1 Hour,文中指出他們采用分布在32個(gè)服務(wù)器上的256塊GPUs將Resnet-50模型在ImageNet數(shù)據(jù)集上的訓(xùn)練時(shí)間從兩周縮短為1個(gè)小時(shí)。在軟件層面,他們使用了很大的minibatch(8192)來(lái)訓(xùn)練模型,并且使學(xué)習(xí)速率正比于minibatch的大小。這意味著,采用分布式系統(tǒng)可以實(shí)現(xiàn)模型在成百個(gè)GPUs上的訓(xùn)練,從而大大減少訓(xùn)練時(shí)間,你也將有更多的機(jī)會(huì)去嘗試各種各樣的超參數(shù)組合。作為使用人數(shù)最多的深度學(xué)習(xí)框架,TensorFlow從version 0.8開(kāi)始支持模型的分布式訓(xùn)練,現(xiàn)在的TensorFlow支持模型的多機(jī)多卡(GPUs和 CPUs)訓(xùn)練。在這篇文章里面,我將簡(jiǎn)單介紹分布式TensorFlow的基礎(chǔ)知識(shí),并通過(guò)實(shí)例來(lái)講解如何使用分布式TensorFlow來(lái)訓(xùn)練模型。
Methods that scale with computation are the future of AI. —Rich Sutton, 強(qiáng)化學(xué)習(xí)之父
在開(kāi)始之前,有必要先簡(jiǎn)單介紹一下深度學(xué)習(xí)的分布式訓(xùn)練策略以及分布式架構(gòu)。這有助于理解分布式TensorFlow系統(tǒng)。
分布式訓(xùn)練策略
1.模型并行
所謂模型并行指的是將模型部署到很多設(shè)備上(設(shè)備可能分布在不同機(jī)器上,下同)運(yùn)行,比如多個(gè)機(jī)器的GPUs。當(dāng)神經(jīng)網(wǎng)絡(luò)模型很大時(shí),由于顯存限制,它是難以在跑在單個(gè)GPU上,這個(gè)時(shí)候就需要模型并行。比如Google的神經(jīng)機(jī)器翻譯系統(tǒng),其可能采用深度LSTM模型,如下圖所示,此時(shí)模型的不同部分需要分散到許多設(shè)備上進(jìn)行并行訓(xùn)練。深度學(xué)習(xí)模型一般包含很多層,如果要采用模型并行策略,一般需要將不同的層運(yùn)行在不同的設(shè)備上,但是實(shí)際上層與層之間的運(yùn)行是存在約束的:前向運(yùn)算時(shí),后面的層需要等待前面層的輸出作為輸入,而在反向傳播時(shí),前面的層又要受限于后面層的計(jì)算結(jié)果。所以除非模型本身很大,一般不會(huì)采用模型并行,因?yàn)槟P蛯优c層之間存在串行邏輯。但是如果模型本身存在一些可以并行的單元,那么也是可以利用模型并行來(lái)提升訓(xùn)練速度,比如GoogLeNet的Inception模塊。
模型并行訓(xùn)練
2.數(shù)據(jù)并行
深度學(xué)習(xí)模型最常采用的分布式訓(xùn)練策略是數(shù)據(jù)并行,因?yàn)橛?xùn)練費(fèi)時(shí)的一個(gè)重要原因是訓(xùn)練數(shù)據(jù)量很大。數(shù)據(jù)并行就是在很多設(shè)備上放置相同的模型,并且各個(gè)設(shè)備采用不同的訓(xùn)練樣本對(duì)模型訓(xùn)練。訓(xùn)練深度學(xué)習(xí)模型常采用的是batch SGD方法,采用數(shù)據(jù)并行,可以每個(gè)設(shè)備都訓(xùn)練不同的batch,然后收集這些梯度用于模型參數(shù)更新。前面所說(shuō)的Facebook訓(xùn)練Resnet50就是采用數(shù)據(jù)并行策略,使用256個(gè)GPUs,每個(gè)GPU讀取32個(gè)圖片進(jìn)行訓(xùn)練,如下圖所示,這樣相當(dāng)于采用非常大的batch(256 × 32 = 8192)來(lái)訓(xùn)練模型。
數(shù)據(jù)并行可以是同步的(synchronous),也可以是異步的(asynchronous)。所謂同步指的是所有的設(shè)備都是采用相同的模型參數(shù)來(lái)訓(xùn)練,等待所有設(shè)備的mini-batch訓(xùn)練完成后,收集它們的梯度然后取均值,然后執(zhí)行模型的一次參數(shù)更新。這相當(dāng)于通過(guò)聚合很多設(shè)備上的mini-batch形成一個(gè)很大的batch來(lái)訓(xùn)練模型,F(xiàn)acebook就是這樣做的,但是他們發(fā)現(xiàn)當(dāng)batch大小增加時(shí),同時(shí)線性增加學(xué)習(xí)速率會(huì)取得不錯(cuò)的效果。同步訓(xùn)練看起來(lái)很不錯(cuò),但是實(shí)際上需要各個(gè)設(shè)備的計(jì)算能力要均衡,而且要求集群的通信也要均衡,類(lèi)似于木桶效應(yīng),一個(gè)拖油瓶會(huì)嚴(yán)重拖慢訓(xùn)練進(jìn)度,所以同步訓(xùn)練方式相對(duì)來(lái)說(shuō)訓(xùn)練速度會(huì)慢一些。異步訓(xùn)練中,各個(gè)設(shè)備完成一個(gè)mini-batch訓(xùn)練之后,不需要等待其它節(jié)點(diǎn),直接去更新模型的參數(shù),這樣總體會(huì)訓(xùn)練速度會(huì)快很多。但是異步訓(xùn)練的一個(gè)很?chē)?yán)重的問(wèn)題是梯度失效問(wèn)題(stale gradients),剛開(kāi)始所有設(shè)備采用相同的參數(shù)來(lái)訓(xùn)練,但是異步情況下,某個(gè)設(shè)備完成一步訓(xùn)練后,可能發(fā)現(xiàn)模型參數(shù)其實(shí)已經(jīng)被其它設(shè)備更新過(guò)了,此時(shí)這個(gè)梯度就過(guò)期了,因?yàn)楝F(xiàn)在的模型參數(shù)和訓(xùn)練前采用的參數(shù)是不一樣的。由于梯度失效問(wèn)題,異步訓(xùn)練雖然速度快,但是可能陷入次優(yōu)解(sub-optimal training performance)。異步訓(xùn)練和同步訓(xùn)練在TensorFlow中不同點(diǎn)如下圖所示:
數(shù)據(jù)并行中的同步方式和異步方式
為了解決異步訓(xùn)練出現(xiàn)的梯度失效問(wèn)題,微軟提出了一種Asynchronous Stochastic Gradient Descent方法,主要是通過(guò)梯度補(bǔ)償來(lái)提升訓(xùn)練效果。應(yīng)該還有其他類(lèi)似的研究,感興趣的可以深入了解一下。
分布式訓(xùn)練架構(gòu)
前面說(shuō)的是分布式訓(xùn)練策略,這里要談的是系統(tǒng)架構(gòu)層,包括兩種架構(gòu):Parameter server architecture(就是常見(jiàn)的PS架構(gòu),參數(shù)服務(wù)器)和Ring-allreduce architecture。這里主要參考Distributed TensorFlow,完全是拿來(lái)主義了。
1.Parameter server架構(gòu)
在Parameter server架構(gòu)(PS架構(gòu))中,集群中的節(jié)點(diǎn)被分為兩類(lèi):parameter server和worker。其中parameter server存放模型的參數(shù),而worker負(fù)責(zé)計(jì)算參數(shù)的梯度。在每個(gè)迭代過(guò)程,worker從parameter sever中獲得參數(shù),然后將計(jì)算的梯度返回給parameter server,parameter server聚合從worker傳回的梯度,然后更新參數(shù),并將新的參數(shù)廣播給worker。采用同步SGD方式的PS架構(gòu)如下圖所示:
PS架構(gòu)中的同步SGD訓(xùn)練方式
在TensorFlow之前,Google采用的是DistBelief框架,其支持PS架構(gòu)。TensorFlow從DistBelief借鑒了它的很多分布式訓(xùn)練模式,所以TensorFlow也支持PS架構(gòu)。Mxnet的主要?jiǎng)?chuàng)建者李沐在前人基礎(chǔ)上開(kāi)發(fā)了更加通用的輕量級(jí)ps-lite,如果想深入理解PS架構(gòu),可以看一下沐神的講解。PS架構(gòu)是深度學(xué)習(xí)最常采用的分布式訓(xùn)練架構(gòu)。
2.Ring-allreduce架構(gòu)
在Ring-allreduce架構(gòu)中,各個(gè)設(shè)備都是worker,并且形成一個(gè)環(huán),如下圖所示,沒(méi)有中心節(jié)點(diǎn)來(lái)聚合所有worker計(jì)算的梯度。在一個(gè)迭代過(guò)程,每個(gè)worker完成自己的mini-batch訓(xùn)練,計(jì)算出梯度,并將梯度傳遞給環(huán)中的下一個(gè)worker,同時(shí)它也接收從上一個(gè)worker的梯度。對(duì)于一個(gè)包含N個(gè)worker的環(huán),各個(gè)worker需要收到其它N-1個(gè)worker的梯度后就可以更新模型參數(shù)。其實(shí)這個(gè)過(guò)程需要兩個(gè)部分:scatter-reduce和allgather,百度的教程對(duì)這個(gè)過(guò)程給出了詳細(xì)的圖文解釋。百度開(kāi)發(fā)了自己的allreduce框架,并將其用在了深度學(xué)習(xí)的分布式訓(xùn)練中。
Ring-allreduce架構(gòu)示意圖
相比PS架構(gòu),Ring-allreduce架構(gòu)是帶寬優(yōu)化的,因?yàn)榧褐忻總€(gè)節(jié)點(diǎn)的帶寬都被充分利用。此外,在深度學(xué)習(xí)訓(xùn)練過(guò)程中,計(jì)算梯度采用BP算法,其特點(diǎn)是后面層的梯度先被計(jì)算,而前面層的梯度慢于前面層,Ring-allreduce架構(gòu)可以充分利用這個(gè)特點(diǎn),在前面層梯度計(jì)算的同時(shí)進(jìn)行后面層梯度的傳遞,從而進(jìn)一步減少訓(xùn)練時(shí)間。在百度的實(shí)驗(yàn)中,他們發(fā)現(xiàn)訓(xùn)練速度基本上線性正比于GPUs數(shù)目(worker數(shù))。
分布式TensorFlow簡(jiǎn)介
好了,言歸正傳,現(xiàn)在開(kāi)始介紹分布式TensorFlow的基礎(chǔ)知識(shí)。在分布式TensorFlow中,參與分布式系統(tǒng)的所有節(jié)點(diǎn)或者設(shè)備被總稱(chēng)為一個(gè)集群(cluster),一個(gè)cluster中包含很多服務(wù)器(server),每個(gè)server去執(zhí)行一項(xiàng)任務(wù)(task),server和task是一一對(duì)應(yīng)的。所以,cluster可以看成是server的集合,也可以看成是task的集合。TensorFlow為各個(gè)task又增加了一個(gè)抽象層,將一系列相似的task集合稱(chēng)為一個(gè)job,比如在PS架構(gòu)中,習(xí)慣稱(chēng)parameter server的task集合為ps,而稱(chēng)執(zhí)行梯度計(jì)算的task集合為worker。所以cluster又可以看成是job的集合,不過(guò)這只是邏輯上的意義,具體還要看這個(gè)server真正干什么。在TensorFlow中,job用name(字符串)標(biāo)識(shí),而task用index(整數(shù)索引)標(biāo)識(shí),那么cluster中的每個(gè)task可以用job的name加上task的index來(lái)唯一標(biāo)識(shí)。在分布式系統(tǒng)中,一般情況下各個(gè)task在不同的節(jié)點(diǎn)或者設(shè)備上執(zhí)行。TensorFlow中用tf.train.ClusterSpec創(chuàng)建一個(gè)cluster:
<pre data-anchor-id="qq27" style="margin: 0px; padding: 10px 15px; max-width: 100%; box-sizing: border-box !important; word-wrap: break-word !important;">
cluster = tf.train.ClusterSpec({ "worker": [ "worker0.example.com:2222", "worker1.example.com:2222", "worker2.example.com:2222" ], "ps": [ "ps0.example.com:2222", "ps1.example.com:2222" ]})
</pre>
可以看出,cluster接收的其實(shí)就是一個(gè)字典,字典里面包含了各個(gè)task所在host的主機(jī)地址,這個(gè)cluster共包含兩類(lèi)job:ps和worker,共5個(gè)task:
/job:worker/task:0
/job:worker/task:1
/job:worker/task:2
/job:ps/task:0
/job:ps/task:1
創(chuàng)建好cluster,需要?jiǎng)?chuàng)建各個(gè)task的server,使用tf.train.Server函數(shù),比如創(chuàng)建第一個(gè)worker的server:
server = tf.train.Server(cluster, job_name="worker", task_index=0)
在創(chuàng)建sever時(shí)必須要傳入cluster,這樣每個(gè)server才可以知道自己所在的cluster包含哪些hosts,然后server與server之間才可以通信。sever的創(chuàng)建需要在自己所在host上,一旦所有的server在各自的host上創(chuàng)建好了,整個(gè)集群就搭建好了,cluster之間的各個(gè)server可以互相通信。具體來(lái)說(shuō),每個(gè)server包含兩個(gè)組件:master和worker。其中master提供master service,其主要可以提供對(duì)cluster中各個(gè)設(shè)備的遠(yuǎn)程訪問(wèn)(RPC協(xié)議),同時(shí)它的另外一個(gè)重要功能是作為創(chuàng)建tf.Session的target。而worker提供worker service,可以用本地設(shè)備執(zhí)行TF中的計(jì)算子圖。這兩個(gè)東西并不好理解,這里我們先講TensorFlow中的另外一個(gè)重要概念:client,先拋出官方英文解釋?zhuān)?/p>
A client is typically a program that builds a TensorFlow graph and constructs a tensorflow::Session to interact with a cluster. Clients are typically written in Python or C++. A single client process can directly interact with multiple TensorFlow servers (see "Replicated training" above), and a single server can serve multiple clients.
這個(gè)client是個(gè)很重要的概念,簡(jiǎn)單來(lái)說(shuō)就是一個(gè)程序,它創(chuàng)建了TF的計(jì)算圖,并通過(guò)建立Session與cluster中的設(shè)備進(jìn)行交互。說(shuō)白了前面創(chuàng)建的cluster與server只是搭建分布式環(huán)境,真正要執(zhí)行計(jì)算需要?jiǎng)?chuàng)建client。對(duì)于tf.Session這個(gè)類(lèi),其第一個(gè)參數(shù)是target,一般情況下大家確實(shí)用不到,因?yàn)椴恢付ㄟ@個(gè)參數(shù)的話,Session就默認(rèn)調(diào)用本地設(shè)備,但是在分布式環(huán)境就需要指定了,這就是server里面的master(server.target提供這個(gè)參數(shù))。實(shí)際上,TensorFlow的完整執(zhí)行邏輯如下圖所示:
TensorFlow計(jì)算圖在單機(jī)和分布式系統(tǒng)的執(zhí)行流程圖
就是說(shuō)client要跑計(jì)算時(shí),其實(shí)要先要把計(jì)算圖以及要執(zhí)行的節(jié)點(diǎn)(Graph中的Node)發(fā)給master,master負(fù)責(zé)資源調(diào)度(就是這個(gè)計(jì)算該怎么執(zhí)行,在哪些設(shè)備執(zhí)行),最終的執(zhí)行需要各個(gè)worker進(jìn)程(使用本地設(shè)備執(zhí)行計(jì)算),所以每個(gè)server會(huì)包含master和worker兩個(gè)部分。關(guān)于master的具體作用,可以參考一下TF教程中的TensorFlow Architecture,不過(guò)這里貼一張圖,大家意淫一下:
分布式TensorFlow中的master劃分子圖
上面簡(jiǎn)單解釋了一下server和client相關(guān)的一些重要概念,幫助大家理解分布式TensorFlow的執(zhí)行邏輯。那么,在構(gòu)建Graph時(shí)如何調(diào)用cluster中的各個(gè)server呢?很簡(jiǎn)單,使用tf.device,只需要指定task的詳細(xì)信息即可:
with tf.device("/job:ps/task:0"):
weights_1 = tf.Variable(...)
biases_1 = tf.Variable(...)
with tf.device("/job:ps/task:1"):
weights_2 = tf.Variable(...)
biases_2 = tf.Variable(...)
with tf.device("/job:worker/task:7"):
input, labels = ...
layer_1 = tf.nn.relu(tf.matmul(input, weights_1) + biases_1)
logits = tf.nn.relu(tf.matmul(layer_1, weights_2) + biases_2)
# ...
train_op = ...
很簡(jiǎn)單,就如同把cluster里面的設(shè)備當(dāng)成本機(jī)設(shè)備一樣使用,至于怎么真正執(zhí)行,那就是系統(tǒng)層面的事了。構(gòu)建了Graph后,我們需要?jiǎng)?chuàng)建Session來(lái)執(zhí)行計(jì)算圖:
with tf.Session("grpc://worker7.example.com:2222") as sess:
for _ in range(10000):
sess.run(train_op)
注意由于是分布式系統(tǒng),需要指定Session的target參數(shù),或者采用grpc+主機(jī)地址,或者直接利用sever.target,兩個(gè)是完全一樣的。下面我們通過(guò)一個(gè)簡(jiǎn)單的實(shí)例來(lái)理解上面過(guò)程,這個(gè)例子的cluster共包含3個(gè)task:1個(gè)ps和2個(gè)worker。
import tensorflow as tf
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps hosts")
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker hosts")
tf.app.flags.DEFINE_string("job_name", "worker", "'ps' or'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
FLAGS = tf.app.flags.FLAGS
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# create cluster
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# create the server
server = tf.train.Server(cluster, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
server.join()
if __name__ == "__main__":
tf.app.run()
注意這里用單機(jī)環(huán)境模擬多機(jī)環(huán)境,然后分別執(zhí)行下面三個(gè)命令行來(lái)創(chuàng)建三個(gè)server:
python example.py --job_name=ps --task_index=0
python example.py --job_name=worker --task_index=0
python example.py --job_name=worker --task_index=1
執(zhí)行完畢后,三個(gè)server都處在等待狀態(tài),現(xiàn)在我們?cè)趧?chuàng)建一個(gè)client來(lái)執(zhí)行一個(gè)計(jì)算圖,并且采用/job:worker/task:0這個(gè)server所對(duì)應(yīng)的master,即grpc://localhost:2223來(lái)創(chuàng)建Session,如下所示:
import tensorflow as tf
if __name__ == "__main__":
with tf.device("/job:ps/task:0"):
x = tf.Variable(tf.ones([2, 2]))
y = tf.Variable(tf.ones([2, 2]))
with tf.device("/job:worker/task:0"):
z = tf.matmul(x, y) + x
with tf.device("/job:worker/task:1"):
z = tf.matmul(z, x) + x
with tf.Session("grpc://localhost:2223") as sess:
sess.run(tf.global_variables_initializer())
val = sess.run(z)
print(val)
其實(shí)這個(gè)client就是一個(gè)進(jìn)程,但是其在計(jì)算時(shí)需要依靠cluster中的device來(lái)執(zhí)行部分計(jì)算子圖。值得注意的是上面的程序我們遵循了PS架構(gòu),參數(shù)放置在ps,而worker執(zhí)行計(jì)算。但是在TensorFlow中,其實(shí)每個(gè)task所屬的job只是一個(gè)概念,并沒(méi)有什么差別,就是說(shuō)對(duì)于上面的程序,你完全可以把參數(shù)放置在worker上。所以說(shuō),TensorFlow的分布式架構(gòu)支持PS模式,并且也往往采用這種方式,但是TensorFlow并不完全與PS架構(gòu)對(duì)等。
復(fù)制訓(xùn)練(Replicated training)
前面已經(jīng)說(shuō)過(guò)了,深度學(xué)習(xí)模型分布式訓(xùn)練最常用的是數(shù)據(jù)并行策略,在TensorFlow中稱(chēng)這為復(fù)制訓(xùn)練(Replicated training),就是說(shuō)多個(gè)worker使用不同的mini-batch訓(xùn)練相同的模型,計(jì)算出的梯度用于更新放置在ps的模型參數(shù)。由于復(fù)制訓(xùn)練是一種最常用的模式,TensorFlow也增加了一些庫(kù)函數(shù)來(lái)簡(jiǎn)化復(fù)制訓(xùn)練的實(shí)現(xiàn)。在TensorFlow中共有四種不同的方式來(lái)實(shí)現(xiàn)復(fù)制訓(xùn)練:
In-graph replication:只構(gòu)建一個(gè)client,這個(gè)client構(gòu)建一個(gè)Graph,Graph中包含一套模型參數(shù),放置在ps上,同時(shí)Graph中包含模型計(jì)算部分的多個(gè)副本,每個(gè)副本都放置在一個(gè)worker上,這樣多個(gè)worker可以同時(shí)訓(xùn)練復(fù)制的模型。TensorFlow教程中的使用多個(gè)GPUs訓(xùn)練cifar10分類(lèi)模型就屬于這個(gè)類(lèi)型,每個(gè)GPUs上的計(jì)算子圖是相同的,但是屬于同一個(gè)Graph。這種方法很少使用,因?yàn)橐坏ヽlient掛了,整個(gè)系統(tǒng)就全崩潰了,容錯(cuò)能力差。
Between-graph replication:每個(gè)worker都創(chuàng)建一個(gè)client,這個(gè)client一般還與task的主程序在同一進(jìn)程中。各個(gè)client構(gòu)建相同的Graph,但是參數(shù)還是放置在ps上。這種方式就比較好,一個(gè)worker的client掛掉了,系統(tǒng)還可以繼續(xù)跑。
Asynchronous training:異步方式訓(xùn)練,各個(gè)worker自己干自己的,不需要與其它worker來(lái)協(xié)調(diào),前面也已經(jīng)詳細(xì)介紹了異步訓(xùn)練,上面兩種方式都可以采用異步訓(xùn)練。
Synchronous training:同步訓(xùn)練,各個(gè)worker要統(tǒng)一步伐,計(jì)算出的梯度要先聚合才可以執(zhí)行一次模型更新,對(duì)于In-graph replication方法,由于各個(gè)worker的計(jì)算子圖屬于同一個(gè)Graph,很容易實(shí)現(xiàn)同步訓(xùn)練。但是對(duì)于Between-graph replication方式,各個(gè)worker都有自己的client,這就需要系統(tǒng)上的設(shè)計(jì)了,TensorFlow提供了tf.train.SyncReplicasOptimizer來(lái)實(shí)現(xiàn)Between-graph replication的同步訓(xùn)練。
由于在TensorFlow中最常用的是Between-graph replication方式,這里著重講一下如何實(shí)現(xiàn)這種方式。在Between-graph replication中,各個(gè)worker都包含一個(gè)client,它們構(gòu)建相同的計(jì)算圖,然后把參數(shù)放在ps上,TensorFlow提供了一個(gè)專(zhuān)門(mén)的函數(shù)tf.train.replica_device_setter來(lái)方便Graph構(gòu)建,先看代碼:
# cluster包含兩個(gè)ps 和三個(gè) worker
cluster_spec = {
"ps": ["ps0:2222", "ps1:2222"],
"worker": ["worker0:2222", "worker1:2222", "worker2:2222"]}
cluster = tf.train.ClusterSpec(cluster_spec)
with tf.device(tf.train.replica_device_setter(
worker_device="/job:worker/task:%d" % FLAGS.task_index,
cluster=cluster)):
# Build your graph
v1 = tf.Variable(...) # assigned to /job:ps/task:0
v2 = tf.Variable(...) # assigned to /job:ps/task:1
v3 = tf.Variable(...) # assigned to /job:ps/task:0
# Run compute
使用tf.train.replica_device_setter可以自動(dòng)把Graph中的Variables放到ps上,而同時(shí)將Graph的計(jì)算部分放置在當(dāng)前worker上,省去了很多麻煩。由于ps往往不止一個(gè),這個(gè)函數(shù)在為各個(gè)Variable分配ps時(shí)默認(rèn)采用簡(jiǎn)單的round-robin方式,就是按次序?qū)?shù)挨個(gè)放到各個(gè)ps上,但這個(gè)方式可能不能使ps負(fù)載均衡,如果需要更加合理,可以采用tf.contrib.training.GreedyLoadBalancingStrategy策略。
采用Between-graph replication方式的另外一個(gè)問(wèn)題,由于各個(gè)worker都獨(dú)立擁有自己的client,但是對(duì)于一些公共操作比如模型參數(shù)初始化與checkpoint文件保存等,如果每個(gè)client都獨(dú)立進(jìn)行這些操作,顯然是對(duì)資源的浪費(fèi)。為了解決這個(gè)問(wèn)題,一般會(huì)指定一個(gè)worker為chief worker,它將作為各個(gè)worker的管家,協(xié)調(diào)它們之間的訓(xùn)練,并且完成模型初始化和模型保存和恢復(fù)等公共操作。在TensorFlow中,可以使用tf.train.MonitoredTrainingSession創(chuàng)建client的Session,并且其可以指定哪個(gè)worker是chief worker。關(guān)于這些方面,想深入理解可以看一下2017 TensorFlow 開(kāi)發(fā)峰會(huì)的官方講解,其中也對(duì)分布式TensorFlow的容錯(cuò)機(jī)制做了簡(jiǎn)單介紹。
Between-graph replication方式的編程結(jié)構(gòu)圖
MNIST分布式訓(xùn)練實(shí)例
最后,我們給出MNIST的分布式訓(xùn)練實(shí)例,采用Between-graph replication方式,并且同步訓(xùn)練和異步訓(xùn)練都支持。在這個(gè)例子中,cluster共包含2個(gè)ps和2個(gè)worker,其中worker1為chief worker。代碼如下:
import tensorflow as tf
from tensorflow.contrib.learn.python.learn.datasets.mnist import read_data_sets
tf.app.flags.DEFINE_string("ps_hosts", "localhost:2222", "ps hosts")
tf.app.flags.DEFINE_string("worker_hosts", "localhost:2223,localhost:2224", "worker hosts")
tf.app.flags.DEFINE_string("job_name", "worker", "'ps' or'worker'")
tf.app.flags.DEFINE_integer("task_index", 0, "Index of task within the job")
tf.app.flags.DEFINE_integer("num_workers", 2, "Number of workers")
tf.app.flags.DEFINE_boolean("is_sync", False, "using synchronous training or not")
FLAGS = tf.app.flags.FLAGS
def model(images):
"""Define a simple mnist classifier"""
net = tf.layers.dense(images, 500, activation=tf.nn.relu)
net = tf.layers.dense(net, 500, activation=tf.nn.relu)
net = tf.layers.dense(net, 10, activation=None)
return net
def main(_):
ps_hosts = FLAGS.ps_hosts.split(",")
worker_hosts = FLAGS.worker_hosts.split(",")
# create the cluster configured by `ps_hosts' and 'worker_hosts'
cluster = tf.train.ClusterSpec({"ps": ps_hosts, "worker": worker_hosts})
# create a server for local task
server = tf.train.Server(cluster, job_name=FLAGS.job_name,
task_index=FLAGS.task_index)
if FLAGS.job_name == "ps":
server.join() # ps hosts only join
elif FLAGS.job_name == "worker":
# workers perform the operation
# ps_strategy = tf.contrib.training.GreedyLoadBalancingStrategy(FLAGS.num_ps)
# Note: tf.train.replica_device_setter automatically place the paramters (Variables)
# on the ps hosts (default placement strategy: round-robin over all ps hosts, and also
# place multi copies of operations to each worker host
with tf.device(tf.train.replica_device_setter(worker_device="/job:worker/task:%d" % (FLAGS.task_index),
cluster=cluster)):
# load mnist dataset
mnist = read_data_sets("./dataset", one_hot=True)
# the model
images = tf.placeholder(tf.float32, [None, 784])
labels = tf.placeholder(tf.int32, [None, 10])
logits = model(images)
loss = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=logits, labels=labels))
# The StopAtStepHook handles stopping after running given steps.
hooks = [tf.train.StopAtStepHook(last_step=2000)]
global_step = tf.train.get_or_create_global_step()
optimizer = tf.train.AdamOptimizer(learning_rate=1e-04)
if FLAGS.is_sync:
# asynchronous training
# use tf.train.SyncReplicasOptimizer wrap optimizer
# ref: https://www.tensorflow.org/api_docs/python/tf/train/SyncReplicasOptimizer
optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=FLAGS.num_workers,
total_num_replicas=FLAGS.num_workers)
# create the hook which handles initialization and queues
hooks.append(optimizer.make_session_run_hook((FLAGS.task_index==0)))
train_op = optimizer.minimize(loss, global_step=global_step,
aggregation_method=tf.AggregationMethod.ADD_N)
# The MonitoredTrainingSession takes care of session initialization,
# restoring from a checkpoint, saving to a checkpoint, and closing when done
# or an error occurs.
with tf.train.MonitoredTrainingSession(master=server.target,
is_chief=(FLAGS.task_index == 0),
checkpoint_dir="./checkpoint_dir",
hooks=hooks) as mon_sess:
while not mon_sess.should_stop():
# mon_sess.run handles AbortedError in case of preempted PS.
img_batch, label_batch = mnist.train.next_batch(32)
_, ls, step = mon_sess.run([train_op, loss, global_step],
feed_dict={images: img_batch, labels: label_batch})
if step % 100 == 0:
print("Train step %d, loss: %f" % (step, ls))
if __name__ == "__main__":
tf.app.run()
異步執(zhí)行時(shí),分別執(zhí)行下面四條語(yǔ)句:
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=0
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=1
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=0
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=1
此時(shí)你會(huì)看到兩個(gè)worker打印出的step是交叉的,說(shuō)明此時(shí)是異步執(zhí)行的,每個(gè)worker執(zhí)行一次梯度計(jì)算后,立即將梯度發(fā)給ps完成參數(shù)更新。
對(duì)于同步執(zhí)行,采用tf.train.SyncReplicasOptimizer,分別執(zhí)行下面四條語(yǔ)句:
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=0 --is_sync=True
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=ps --task_index=1 --is_sync=True
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=0 --is_sync=True
python distributed_mnist.py --ps_hosts=localhost:2222,localhost:2223 --worker_hosts=localhost:2224,localhost:2225 --job_name=worker --task_index=1 --is_sync=True
此時(shí)你可以看到兩個(gè)worker基本上同時(shí)打印相同的step(但是loss是不一樣的),說(shuō)明是同步執(zhí)行。值得注意的是,TensorFlow中的同步訓(xùn)練可能與你想象中不同,它只是收集足夠的梯度(N個(gè)step的梯度結(jié)果)就聚合這些梯度值然后執(zhí)行一次參數(shù)更新。但是它不管這N個(gè)結(jié)果是從哪里來(lái)的,如果其中某個(gè)worker速度很慢,可能這N個(gè)結(jié)果都是從其他worker計(jì)算出的。言外之意就是chief worker聚合的梯度不一定是從全部worker中收集而來(lái)的(參考這個(gè)issues)。這個(gè)機(jī)制很怪異,我想是為了容錯(cuò)機(jī)制,不至于一個(gè)worker死掉了而終止整個(gè)訓(xùn)練過(guò)程。所以,在同步訓(xùn)練過(guò)程中,最好每個(gè)worker的能力都差不多,要不然很難得到想要的加速效果(某個(gè)worker慢的話,它計(jì)算的梯度可能過(guò)期,那么只能被丟棄,這種情況下這個(gè)worker做的就是無(wú)用功)。
走得更遠(yuǎn)
TensorFlow可以與Hadoop和Spark等工具結(jié)合,感興趣的話可以自己深入去學(xué)習(xí):
TensorFlow官方教程:How to run TensorFlow on Hadoop.
Yahho: Open Sourcing TensorFlowOnSpark: Distributed Deep Learning on Big-Data Clusters.
總結(jié)
最近打算學(xué)習(xí)一下分布式TensorFlow,所以系統(tǒng)地看了官方文檔以及一些國(guó)外的博客,然后就把其中一些講解得比較好的地方以及自己的學(xué)習(xí)心得總結(jié)了一下,所以就有了此文。但是網(wǎng)上的資料并不是很多,所以文中有錯(cuò)誤之處在所難免,也懇請(qǐng)各位大佬斧正。很偶然地看到一篇最新的綜述文章Demystifying Parallel and Distributed Deep Learning: An In-Depth Concurrency Analysis,60頁(yè)的paper系統(tǒng)總結(jié)了深度學(xué)習(xí)的并行化策略,想深入學(xué)習(xí)的可以讀讀這個(gè)paper。
參考文獻(xiàn)
Distributed TensorFlow.
How to write distributed TensorFlow code?—?with an example on Clusterone.
MNIST實(shí)例:Distributing TensorFlow.
Google TF 官網(wǎng):Distributed TensorFlow.
Distributed TensorFlow(2017 TensorFlow 開(kāi)發(fā)峰會(huì)).
Baidu Research: Bringing HPC Techniques to Deep Learning.
TensorFlow學(xué)習(xí)筆記(9):分布式TensorFlow.
Distributed TensorFlow Example.
(注:文中圖片原始來(lái)源均可以在文中或者參考文章的鏈接中找到)
個(gè)人技術(shù)博客:https://blog.csdn.net/u013709270
微信:TonyJeemy520 加個(gè)人微信拉你進(jìn)機(jī)器學(xué)習(xí)、深度學(xué)習(xí)交流群,請(qǐng)備注 : 來(lái)自簡(jiǎn)書(shū)
QQ交流群:651616387 請(qǐng)備注 : 來(lái)自簡(jiǎn)書(shū)
微信公眾號(hào):機(jī)器學(xué)習(xí)算法工程師 ----二維碼見(jiàn)下圖
掃碼關(guān)注微信號(hào):機(jī)器學(xué)習(xí)算法工程師,更多干貨分享, 或加個(gè)人微信,拉你進(jìn)機(jī)器學(xué)習(xí)、深度學(xué)習(xí)交流群