3.6 Shuffle機(jī)制
在MapReduce框架中,Shuffle是連接Map和Reduce之間的橋梁,Map的輸出要用到Reduce中必須經(jīng)過(guò)Shuffle這個(gè)環(huán)節(jié),Shuffle的性能高低直接影響了整個(gè)程序的性能和吞吐量。Spark作為MapReduce框架的一種實(shí)現(xiàn),自然也實(shí)現(xiàn)了Shuffle的邏輯。對(duì)于大數(shù)據(jù)計(jì)算框架而言,Shuffle階段的效率是決定性能好壞的關(guān)鍵因素之一。
3.6.1 什么是Shuffle
Shuffle是MapReduce框架中的一個(gè)特定的階段,介于Map階段和Reduce階段之間,當(dāng)Map的輸出結(jié)果要被Reduce使用時(shí),輸出結(jié)果需要按關(guān)鍵字值(key)哈希,并且分發(fā)到每一個(gè)Reducer上,這個(gè)過(guò)程就是Shuffle。直觀來(lái)講,Spark Shuffle機(jī)制是將一組無(wú)規(guī)則的數(shù)據(jù)轉(zhuǎn)換為一組具有一定規(guī)則數(shù)據(jù)的過(guò)程。由于Shuffle涉及了磁盤(pán)的讀寫(xiě)和網(wǎng)絡(luò)的傳輸,因此Shuffle性能的高低直接影響整個(gè)程序的運(yùn)行效率。
在MapReduce計(jì)算框架中,Shuffle連接了Map階段和Reduce階段,即每個(gè)Reduce Task從每個(gè)Map Task產(chǎn)生的數(shù)據(jù)中讀取一片數(shù)據(jù),極限情況下可能觸發(fā)M*R個(gè)數(shù)據(jù)拷貝通道(M是Map Task數(shù)目,R是Reduce Task數(shù)目)。通常Shuffle分為兩部分:Map階段的數(shù)據(jù)準(zhǔn)備和Reduce階段的數(shù)據(jù)拷貝。首先,Map階段需根據(jù)Reduce階段的Task數(shù)量決定每個(gè)Map Task輸出的數(shù)據(jù)分片數(shù)目,有多種方式存放這些數(shù)據(jù)分片:
1)保存在內(nèi)存中或者磁盤(pán)上(Spark和MapReduce都存放在磁盤(pán)上)。
2)每個(gè)分片對(duì)應(yīng)一個(gè)文件(現(xiàn)在Spark采用的方式,以及以前MapReduce采用的方式),或者所有分片放到一個(gè)數(shù)據(jù)文件中,外加一個(gè)索引文件記錄每個(gè)分片在數(shù)據(jù)文件中的偏移量(現(xiàn)在MapReduce采用的方式)。
因此可以認(rèn)為Spark Shuffle與Mapreduce Shuffle的設(shè)計(jì)思想相同,但在實(shí)現(xiàn)細(xì)節(jié)和優(yōu)化方式上不同。
在Spark中,任務(wù)通常分為兩種,Shuffle mapTask和reduceTask,具體邏輯如圖3-11所示:
[插圖]
圖3-11 Spark Shuffl e
圖3-11中的主要邏輯如下:
1)首先每一個(gè)MapTask會(huì)根據(jù)ReduceTask的數(shù)量創(chuàng)建出相應(yīng)的bucket, bucket的數(shù)量是M×R,其中M是Map的個(gè)數(shù),R是Reduce的個(gè)數(shù)。
2)其次MapTask產(chǎn)生的結(jié)果會(huì)根據(jù)設(shè)置的partition算法填充到每個(gè)bucket中。這里的partition算法是可以自定義的,當(dāng)然默認(rèn)的算法是根據(jù)key哈希到不同的bucket中。
當(dāng)ReduceTask啟動(dòng)時(shí),它會(huì)根據(jù)自己task的id和所依賴(lài)的Mapper的id從遠(yuǎn)端或本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進(jìn)行處理。
這里的bucket是一個(gè)抽象概念,在實(shí)現(xiàn)中每個(gè)bucket可以對(duì)應(yīng)一個(gè)文件,可以對(duì)應(yīng)文件的一部分或是其他等。Spark shuffle可以分為兩部分:
1)將數(shù)據(jù)分成bucket,并將其寫(xiě)入磁盤(pán)的過(guò)程稱(chēng)為Shuffle Write。
2)在存儲(chǔ)Shuffle數(shù)據(jù)的節(jié)點(diǎn)Fetch數(shù)據(jù),并執(zhí)行用戶(hù)定義的聚集操作,這個(gè)過(guò)程稱(chēng)為Shuffle Fetch。
3.6.2 Shuffle歷史及細(xì)節(jié)
下面介紹Shuffle Write與Fetch。
1. Shuffle Write
在Spark的早期版本實(shí)現(xiàn)中,Spark在每一個(gè)MapTask中為每個(gè)ReduceTask創(chuàng)建一個(gè)bucket,并將RDD計(jì)算結(jié)果放進(jìn)bucket中。
但早期的Shuffle Write有兩個(gè)比較大的問(wèn)題。
1)Map的輸出必須先全部存儲(chǔ)到內(nèi)存中,然后寫(xiě)入磁盤(pán)。這對(duì)內(nèi)存是非常大的開(kāi)銷(xiāo),當(dāng)內(nèi)存不足以存儲(chǔ)所有的Map輸出時(shí)就會(huì)出現(xiàn)OOM(Out of Memory)。
2)每個(gè)MapTask會(huì)產(chǎn)生與ReduceTask數(shù)量一致的Shuffle文件,如果MapTask個(gè)數(shù)是1k, ReduceTask個(gè)數(shù)也是1k,就會(huì)產(chǎn)生1M個(gè)Shuffle文件。這對(duì)于文件系統(tǒng)是比較大的壓力,同時(shí)在Shuffle數(shù)據(jù)量不大而Shuffle文件又非常多的情況下,隨機(jī)寫(xiě)也會(huì)嚴(yán)重降低IO的性能。
后來(lái)到了Spark 0.8版實(shí)現(xiàn)時(shí),顯著減少了Shuffle的內(nèi)存壓力,現(xiàn)在Map輸出不需要先全部存儲(chǔ)在內(nèi)存中,再flush到硬盤(pán),而是record-by-record寫(xiě)入磁盤(pán)中。對(duì)于Shuffle文件的管理也獨(dú)立出新的ShuffleBlockManager進(jìn)行管理,而不是與RDD cache文件在一起了。
但是Spark 0.8版的Shuffle Write仍然有兩個(gè)大的問(wèn)題沒(méi)有解決。
1)Shuffle文件過(guò)多的問(wèn)題。這會(huì)導(dǎo)致文件系統(tǒng)的壓力過(guò)大并降低IO的吞吐量。
2)雖然Map輸出數(shù)據(jù)不再需要預(yù)先存儲(chǔ)在內(nèi)存中然后寫(xiě)入磁盤(pán),從而顯著減少了內(nèi)存壓力。但是新引入的DiskObjectWriter所帶來(lái)的buffer開(kāi)銷(xiāo)也是不容小視的內(nèi)存開(kāi)銷(xiāo)。假定有1k個(gè)MapTask和1k個(gè)ReduceTask,就會(huì)有1M個(gè)bucket,相應(yīng)地就會(huì)有1M個(gè)write handler,而每一個(gè)write handler默認(rèn)需要100KB內(nèi)存,那么總共需要100GB內(nèi)存。這樣僅僅是buffer就需要這么多的內(nèi)存。因此當(dāng)ReduceTask數(shù)量很多時(shí),內(nèi)存開(kāi)銷(xiāo)會(huì)很大。
為了解決shuffle文件過(guò)多的情況,Spark后來(lái)引入了新的Shuffle consolidation,以期顯著減少Shuffle文件的數(shù)量。
Shuffle consolidation的原理如圖3-12所示:
[插圖]
圖3-12 Shuffl e consolidation
在圖3-12中,假定該job有4個(gè)Mapper和4個(gè)Reducer,有2個(gè)core能并行運(yùn)行兩個(gè)task??梢运愠鯯park的Shuffle Write共需要16個(gè)bucket,也就有了16個(gè)write handler。在之前的Spark版本中,每個(gè)bucket對(duì)應(yīng)一個(gè)文件,因此在這里會(huì)產(chǎn)生16個(gè)shuffle文件。
而在Shuffle consolidation中,每個(gè)bucket并非對(duì)應(yīng)一個(gè)文件,而是對(duì)應(yīng)文件中的一個(gè)segment。同時(shí)Shuffle consolidation產(chǎn)生的Shuffle文件數(shù)量與Spark core的個(gè)數(shù)也有關(guān)系。在圖3-12中,job中的4個(gè)Mapper分為兩批運(yùn)行,在第一批2個(gè)Mapper運(yùn)行時(shí)會(huì)申請(qǐng)8個(gè)bucket,產(chǎn)生8個(gè)Shuffle文件;而在第二批Mapper運(yùn)行時(shí),申請(qǐng)的8個(gè)bucket并不會(huì)再產(chǎn)生8個(gè)新的文件,而是追加寫(xiě)到之前的8個(gè)文件后面,這樣一共就只有8個(gè)Shuffle文件,而在文件內(nèi)部共有16個(gè)不同的segment。因此從理論上講Shuffle consolidation產(chǎn)生的Shuffle文件數(shù)量為C×R,其中C是Spark集群的core number, R是Reducer的個(gè)數(shù)。
很顯然,當(dāng)M=C時(shí),Shuffle consolidation產(chǎn)生的文件數(shù)和之前的實(shí)現(xiàn)相同。
Shuffle consolidation顯著減少了Shuffle文件的數(shù)量,解決了Spark之前實(shí)現(xiàn)中一個(gè)比較嚴(yán)重的問(wèn)題。但是Writer handler的buffer開(kāi)銷(xiāo)過(guò)大依然沒(méi)有減少,若要減少Writer handler的buffer開(kāi)銷(xiāo),只能減少Reducer的數(shù)量,但是這又會(huì)引入新的問(wèn)題。
2. Shuffle Fetch與Aggregator
Shuffle Write寫(xiě)出去的數(shù)據(jù)要被Reducer使用,就需要Shuffle Fetch將所需的數(shù)據(jù)Fetch過(guò)來(lái)。這里的Fetch操作包括本地和遠(yuǎn)端,因?yàn)镾huffle數(shù)據(jù)有可能一部分是存儲(chǔ)在本地的。在早期版本中,Spark對(duì)Shuffle Fetcher實(shí)現(xiàn)了兩套不同的框架:NIO通過(guò)socket連接Fetch數(shù)據(jù);OIO通過(guò)netty server去fetch數(shù)據(jù)。分別對(duì)應(yīng)的類(lèi)是Basic-BlockFetcherIterator和NettyBlockFetcherIterator。
目前在Spark1.5.0中做了優(yōu)化。新版本定義了類(lèi)ShuffleBlockFetcherIterator來(lái)完成數(shù)據(jù)的fetch。對(duì)于local的數(shù)據(jù),ShuffleBlockFetcherIterator會(huì)通過(guò)local的BlockMan-ager來(lái)fetch。對(duì)于遠(yuǎn)端的數(shù)據(jù)塊,它通過(guò)BlockTransferService類(lèi)來(lái)完成。具體實(shí)現(xiàn)參見(jiàn)如下代碼:
? ? ? ? ? ? ? [ShuffleBlockFetcherIterator.scala]
/* fetch local數(shù)據(jù)塊 */
private[this] def fetchLocalBlocks() {
在MapReduce的Shuffle過(guò)程中,Shuffle fetch過(guò)來(lái)的數(shù)據(jù)會(huì)進(jìn)行歸并排序(merge sort),使得相同key下的不同value按序歸并到一起供Reducer使用,這個(gè)過(guò)程如圖3-13所示:
[插圖]
圖3-13 Fetch merge
這些歸并排序都是在磁盤(pán)上進(jìn)行的,這樣做雖然有效地控制了內(nèi)存使用,但磁盤(pán)IO卻大幅增加了。雖然Spark屬于MapReduce體系,但是對(duì)傳統(tǒng)的MapReduce算法進(jìn)行了一定的改變。Spark假定在大多數(shù)應(yīng)用場(chǎng)景下,Shuffle數(shù)據(jù)的排序不是必須的,如word count。強(qiáng)制進(jìn)行排序只會(huì)使性能變差,因此Spark并不在Reducer端做歸并排序。既然沒(méi)有歸并排序,那Spark是如何進(jìn)行reduce的呢?這就涉及下面要講的Shuffle Aggregator了。
Aggregator本質(zhì)上是一個(gè)hashmap,它是以map output的key為key,以任意所要combine的類(lèi)型為value的hashmap。
在做word count reduce計(jì)算count值時(shí),它會(huì)將Shuffle fetch到的每一個(gè)key-value對(duì)更新或是插入hashmap中(若在hashmap中沒(méi)有查找到,則插入其中;若查找到,則更新value值)。這樣就不需要預(yù)先把所有的key-value進(jìn)行merge sort,而是來(lái)一個(gè)處理一個(gè),省去了外部排序這一步驟。但同時(shí)需要注意的是,reducer的內(nèi)存必須足以存放這個(gè)partition的所有key和count值,因此對(duì)內(nèi)存有一定的要求。
在上面word count的例子中,因?yàn)関alue會(huì)不斷地更新,而不需要將其全部記錄在內(nèi)存中,因此內(nèi)存的使用還是比較少的。考慮一下如果是groupByKey這樣的操作,Reducer需要得到key對(duì)應(yīng)的所有value。在Hadoop MapReduce中,由于有了歸并排序,因此給予Reducer的數(shù)據(jù)已經(jīng)是group by key了,而Spark沒(méi)有這一步,因此需要將key和對(duì)應(yīng)的value全部存放在hashmap中,并將value合并成一個(gè)array??梢韵胂鬄榱四軌虼娣潘袛?shù)據(jù),用戶(hù)必須確保每一個(gè)partition小到內(nèi)存能夠容納,這對(duì)于內(nèi)存是非常嚴(yán)峻的考驗(yàn)。因此在Spark文檔中,建議用戶(hù)涉及這類(lèi)操作時(shí)盡量增加partition,也就是增加Mapper和Reducer的數(shù)量。
增加Mapper和Reducer的數(shù)量固然可以減小partition的大小,使內(nèi)存可以容納這個(gè)partition。但是在Shuffle write中提到,bucket和對(duì)應(yīng)于bucket的write handler是由Mapper和Reducer的數(shù)量決定的,task越多,bucket就會(huì)增加得更多,由此帶來(lái)write handler所需的buffer也會(huì)更多。在一方面我們?yōu)榱藴p少內(nèi)存的使用采取了增加task數(shù)量的策略,另一方面task數(shù)量增多又會(huì)帶來(lái)buffer開(kāi)銷(xiāo)更大的問(wèn)題,因此陷入了內(nèi)存使用的兩難境地。
為了減少內(nèi)存的使用,只能將Aggregator的操作從內(nèi)存移到磁盤(pán)上進(jìn)行,因此Spark新版本中提供了外部排序的實(shí)現(xiàn),以解決這個(gè)問(wèn)題。
Spark將需要聚集的數(shù)據(jù)分為兩類(lèi):不需要?dú)w并排序和需要?dú)w并排序的數(shù)據(jù)。對(duì)于前者,在內(nèi)存中的AppendOnlyMap中對(duì)數(shù)據(jù)聚集。對(duì)于需要?dú)w并排序的數(shù)據(jù),現(xiàn)在內(nèi)存中進(jìn)行聚集,當(dāng)內(nèi)存數(shù)據(jù)達(dá)到閾值時(shí),將數(shù)據(jù)排序后寫(xiě)入磁盤(pán)。事實(shí)上,磁盤(pán)上的數(shù)據(jù)只是全部數(shù)據(jù)的一部分,最后將磁盤(pán)數(shù)據(jù)全部進(jìn)行歸并排序和聚集。具體Aggregator的邏輯可以參見(jiàn)Aggregator類(lèi)的實(shí)現(xiàn)。