1. shuffle原理
概述:Shuffle描述著數(shù)據(jù)從map task輸出到reduce task輸入的這段過(guò)程。在分布式情況下,reduce task需要跨節(jié)點(diǎn)取拉取其他節(jié)點(diǎn)的map task記過(guò)。這一過(guò)程將會(huì)產(chǎn)生網(wǎng)絡(luò)資源、內(nèi)存、磁盤(pán)IO的消耗。
1.1 mapreduce的shuffle原理
1.1.1 map task端操作
每個(gè)map task都有一個(gè)內(nèi)存緩沖區(qū)(默認(rèn)是100MB),存儲(chǔ)著map的輸出結(jié)果,當(dāng)緩沖區(qū)塊滿的時(shí)候需要將緩沖區(qū)的數(shù)據(jù)以一個(gè)臨時(shí)文件的方式存放到磁盤(pán),當(dāng)整個(gè)map task結(jié)束后再對(duì)磁盤(pán)中這個(gè)map task產(chǎn)生的所有臨時(shí)文件做合并,生成最終的正式輸出文件,然后等待reduce task來(lái)拉取數(shù)據(jù)。
Spill過(guò)程:這個(gè)從內(nèi)存往磁盤(pán)寫(xiě)數(shù)據(jù)的過(guò)程被成為Spill(溢寫(xiě))。整個(gè)緩沖區(qū)有個(gè)溢寫(xiě)的比例spill.percent(默認(rèn)是0.8),當(dāng)達(dá)到閾值時(shí)map task可以繼續(xù)往剩余的memory寫(xiě),同時(shí)溢寫(xiě)線程鎖定已用memory,先對(duì)key(序列化的字節(jié))做排序,若果client程序設(shè)置了Combiner,那么在溢寫(xiě)的過(guò)程中就會(huì)進(jìn)行局部聚合。
Merge過(guò)程:每次溢寫(xiě)都會(huì)生成一個(gè)臨時(shí)文件,在map task真正完成時(shí)會(huì)將這些文件歸并成一個(gè)文件,這個(gè)過(guò)程叫做Merge。
1.1.2 reduce task端操作
當(dāng)所有的map task執(zhí)行完成,對(duì)應(yīng)節(jié)點(diǎn)的reduce task開(kāi)始啟動(dòng),簡(jiǎn)單地說(shuō),此階段就是不斷拉?。‵etcher)每個(gè)map task所在節(jié)點(diǎn)的最終結(jié)果,然后不斷地merge形成reduce task的輸入文件。
Copy階段:Reduce進(jìn)程啟動(dòng)一些數(shù)據(jù)copy線程(Fetch)通過(guò)HTTP協(xié)議拉取map階段輸出文件。
Merge過(guò)程: Copy過(guò)來(lái)的數(shù)據(jù)會(huì)先放到內(nèi)存緩沖區(qū)(基于JVM的heap size設(shè)置),如果內(nèi)存緩沖區(qū)不足也會(huì)發(fā)生map task的spill(sort 默認(rèn), combine可選),多個(gè)溢寫(xiě)文件時(shí)也會(huì)發(fā)生map task的merge
關(guān)于排序方法:
在Map階段,k-v溢寫(xiě)時(shí),采用的正是快排;而溢出文件的合并使用的則是歸并;在Reduce階段,通過(guò)shuffle從Map獲取的文件合并的時(shí)候采用的也是歸并。
1.2 spark現(xiàn)在的SortShuffleManager
SortShuffleManager運(yùn)行原理
SortShuffleManager運(yùn)行機(jī)制主要分成兩種:
- 一種是普通運(yùn)行機(jī)制
- 另一種是bypass運(yùn)行機(jī)制
當(dāng)shuffle read task的數(shù)量小于等于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值時(shí)(默認(rèn)為200),就會(huì)起用bypass機(jī)制
普通運(yùn)行機(jī)制
下圖說(shuō)明了普通的SortShuffleManager原理。在該模式下,數(shù)據(jù)會(huì)先寫(xiě)入一個(gè)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中,此時(shí)根據(jù)不同的shuffle算子,可能選用不同的數(shù)據(jù)結(jié)構(gòu)。如果是reduceByKey這種聚合類(lèi)的shuffle算子,那么會(huì)選用Map數(shù)據(jù)結(jié)構(gòu),如果是join就使用Array,Map是邊聚合邊寫(xiě)內(nèi)存,array是直接寫(xiě)內(nèi)存。接著,每寫(xiě)一條數(shù)據(jù)進(jìn)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)之后,就會(huì)判斷一下,是否達(dá)到了某個(gè)臨界閾值。如果達(dá)到臨界閾值的話,那么就會(huì)嘗試將內(nèi)存數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)溢寫(xiě)到磁盤(pán),然后清空數(shù)據(jù)結(jié)構(gòu)。
在溢寫(xiě)到磁盤(pán)文件之前,會(huì)先根據(jù)key對(duì)內(nèi)存數(shù)據(jù)結(jié)構(gòu)中已有的數(shù)據(jù)進(jìn)行排序。排序過(guò)后,會(huì)分批將數(shù)據(jù)寫(xiě)入磁盤(pán)文件。默認(rèn)batch的數(shù)量是10000條,也就是說(shuō),排序號(hào)的數(shù)據(jù),會(huì)以每批1萬(wàn)條數(shù)據(jù)的形式分批寫(xiě)入磁盤(pán)文件。寫(xiě)入磁盤(pán)文件是通過(guò)Java的BufferedOutputStream(緩沖輸出流)實(shí)現(xiàn)的。首先會(huì)將數(shù)據(jù)緩沖在內(nèi)存中,當(dāng)內(nèi)存緩沖滿溢之后,再一次寫(xiě)入磁盤(pán)文件中。
一個(gè)task將所有數(shù)據(jù)寫(xiě)入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過(guò)程中,會(huì)發(fā)生多次磁盤(pán)溢寫(xiě)操作,也就會(huì)產(chǎn)生多個(gè)臨時(shí)文件。最終會(huì)將之前所有的臨時(shí)文件進(jìn)行合并,這就是merge過(guò)程,一個(gè)task就只對(duì)應(yīng)一個(gè)磁盤(pán)文件,也就意味著該task為下游stage的task準(zhǔn)備的數(shù)據(jù)都在這一個(gè)文件中,因此還會(huì)單獨(dú)寫(xiě)一份索引文件,其中標(biāo)識(shí)了下游各個(gè)task的數(shù)據(jù)在文件中的start offset與end offset。
SortShuffleManager由于有一個(gè)磁盤(pán)文件merge過(guò)程,因此大大減少了文件數(shù)量。

bypass運(yùn)行機(jī)制
bypass運(yùn)行機(jī)制觸發(fā)條件:
- shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold參數(shù)的值(默認(rèn)為200)
- 不是排序類(lèi)的shuffle算子
此時(shí)task會(huì)為每個(gè)下游task都創(chuàng)建一個(gè)臨時(shí)磁盤(pán)文件,并將數(shù)據(jù)按key進(jìn)行hash后根據(jù)key的hash值,將key寫(xiě)如對(duì)應(yīng)的磁盤(pán)文件中。當(dāng)然,寫(xiě)入磁盤(pán)文件時(shí)也是先寫(xiě)入內(nèi)存緩沖,緩沖寫(xiě)滿之后再溢寫(xiě)到磁盤(pán)文件。最后同樣會(huì)將所有的臨時(shí)磁盤(pán)文件合并成一個(gè)磁盤(pán)文件,并創(chuàng)建一個(gè)單獨(dú)的索引文件。
該過(guò)程的磁盤(pán)寫(xiě)機(jī)制其實(shí)跟未經(jīng)優(yōu)化的HashShuffleManager是一模一樣的,因?yàn)槎家獎(jiǎng)?chuàng)建數(shù)量驚人的磁盤(pán)文件,只是在最后會(huì)做一個(gè)磁盤(pán)文件合并而已。因此少量的最終磁盤(pán)文件,也讓該機(jī)制相對(duì)未經(jīng)優(yōu)化的HashShuffleManager來(lái)說(shuō),shuffle read的性能更高。
該機(jī)制與普通SortShuffleManager運(yùn)行機(jī)制不同在于: - 第一,磁盤(pán)寫(xiě)機(jī)制不同
- 第二,不會(huì)進(jìn)行排序
也就是說(shuō),啟用該機(jī)制的最大好處在于:shuffle write過(guò)程中,不需要進(jìn)行數(shù)據(jù)的排序操作,也就是節(jié)省掉了這部分性能開(kāi)銷(xiāo)。