什么是shuffer
- 寬依賴之間會劃分stage,而Stage之間就是Shuffle
Spark在DAG調(diào)度階段會將一個Job劃分為多個Stage,上游Stage做map工作,下游Stage做reduce工作,其本質(zhì)上還是MapReduce計算框架。Shuffle是連接map和reduce之間的橋梁,它將map的輸出對應(yīng)到reduce輸入中,這期間涉及到序列化反序列化、跨節(jié)點網(wǎng)絡(luò)IO以及磁盤讀寫IO等,所以說Shuffle是整個應(yīng)用程序運行過程中非常昂貴的一個階段,理解Spark Shuffle原理有助于優(yōu)化Spark應(yīng)用程序。
在DAG階段以shuffle為界,劃分stage,上游stage做map task,每個map task將計算結(jié)果數(shù)據(jù)分成多份,每一份對應(yīng)到下游stage的每個partition中,并將其臨時寫到磁盤,該過程叫做shuffle write;下游stage做reduce task,每個reduce task通過網(wǎng)絡(luò)拉取上游stage中所有map task的指定分區(qū)結(jié)果數(shù)據(jù),該過程叫做shuffle read,最后完成reduce的業(yè)務(wù)邏輯。
- 基于Hash的Shuffle實現(xiàn)

- 基于Sort的Shuffle實現(xiàn)(現(xiàn)在采用的機制)

1.2之前是HashShuffleManager
1.2版本之前這時候每一個Mapper會根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket,bucket的數(shù)量是M R ,其中M是Map的個數(shù),R是Reduce的個數(shù)。這樣會產(chǎn)生大量的小文件,對文件系統(tǒng)壓力很大,而且也不利于IO吞吐量。后面忍不了了就做了優(yōu)化,把在同一core上運行的多個Mapper 輸出的合并到同一個文件,這樣文件數(shù)目就變成了 cores R 個了
2.1版本 分為三種writer機制, 分為 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter。

-上面是使用哪種 writer 的判斷依據(jù), 是否開啟 mapSideCombine 這個判斷,是因為有些算子會在 map 端先進行一次 combine, 減少傳輸數(shù)據(jù)。
-因為 BypassMergeSortShuffleWriter 會臨時輸出Reducer個(分區(qū)數(shù)目)小文件,所以分區(qū)數(shù)必須要小于一個閥值,默認(rèn)是小于200。
-UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation:原始數(shù)據(jù)首先被序列化處理,并且再也不需要反序列,在其對應(yīng)的元數(shù)據(jù)被排序后,需要Serializer支持relocation,在指定位置讀取對應(yīng)數(shù)據(jù)。
BypassMergeSortShuffleWriter 實現(xiàn)細(xì)節(jié)
BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter實現(xiàn)基本一致, 唯一的區(qū)別在于,map端的多個輸出文件會被匯總為一個文件。 所有分區(qū)的數(shù)據(jù)會合并為同一個文件,會生成一個索引文件,是為了索引到每個分區(qū)的起始地址,可以隨機 access 某個partition的所有數(shù)據(jù)。
SortShuffleWriter 實現(xiàn)細(xì)節(jié)
我們可以先考慮一個問題,假如我有 100億條數(shù)據(jù),但是我們的內(nèi)存只有1M,但是我們磁盤很大, 我們現(xiàn)在要對這100億條數(shù)據(jù)進行排序,是沒法把所有的數(shù)據(jù)一次性的load進行內(nèi)存進行排序的,這就涉及到一個外部排序的問題,我們的1M內(nèi)存只能裝進1億條數(shù)據(jù),每次都只能對這 1億條數(shù)據(jù)進行排序,排好序后輸出到磁盤,總共輸出100個文件,最后怎么把這100個文件進行merge成一個全局有序的大文件。我們可以每個文件(有序的)都取一部分頭部數(shù)據(jù)最為一個 buffer, 并且把這 100個 buffer放在一個堆里面,進行堆排序,比較方式就是對所有堆元素(buffer)的head元素進行比較大小, 然后不斷的把每個堆頂?shù)?buffer 的head 元素 pop 出來輸出到最終文件中, 然后繼續(xù)堆排序,繼續(xù)輸出。如果哪個buffer 空了,就去對應(yīng)的文件中繼續(xù)補充一部分?jǐn)?shù)據(jù)。最終就得到一個全局有序的大文件。
如果你能想通我上面舉的例子,就差不多搞清楚sortshufflewirter的實現(xiàn)原理了,因為解決的是同一個問題。
SortShuffleWriter 中的處理步驟就是
使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在內(nèi)存中進行排序, 排序的 K 是(partitionId, hash(key)) 這樣一個元組。
如果超過內(nèi)存 limit, 我 spill 到一個文件中,這個文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根據(jù) hash(key)進行比較排序
如果需要輸出全局有序的文件的時候,就需要對之前所有的輸出文件 和 當(dāng)前內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)中的數(shù)據(jù)進行 merge sort, 進行全局排序
和我們開始提的那個問題基本類似,不同的地方在于,需要對 Key 相同的元素進行 aggregation, 就是使用定義的 func 進行聚合, 比如你的算子是 reduceByKey(+), 這個func 就是加法運算, 如果兩個key 相同, 就會先找到所有相同的key 進行 reduce(+) 操作,算出一個總結(jié)果 Result,然后輸出數(shù)據(jù)(K,Result)元素。
SortShuffleWriter 中使用 ExternalSorter 來對內(nèi)存中的數(shù)據(jù)進行排序,ExternalSorter內(nèi)部維護了兩個集合PartitionedAppendOnlyMap、PartitionedPairBuffer, 兩者都是使用了 hash table 數(shù)據(jù)結(jié)構(gòu), 如果需要進行 aggregation, 就使用 PartitionedAppendOnlyMap(支持 lookup 某個Key,如果之前存儲過相同key的K-V 元素,就需要進行 aggregation,然后再存入aggregation后的 K-V), 否則使用 PartitionedPairBuffer(只進行添K-V 元素),
1.2之后是SortShuffleManager
創(chuàng)建磁盤文件上的區(qū)別在SHUFFER上游階段進行兩個機制從而減少臨時文件的產(chǎn)生。
shuffleFileGroup機制在上游的每一個Group磁盤文件的數(shù)量與下游stage的task數(shù)量是相同的
consolidate機制允許不同的task復(fù)用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合并,從而大幅度減少磁盤文件的數(shù)量
主要區(qū)別: SortShuffleManager
- 在上游通過兩個機制改變產(chǎn)生的臨時文件數(shù).
Shuffle階段劃分:
shuffle write:mapper階段,上一個stage得到最后的結(jié)果寫出
- SortShuffle-write普通機制
- 排序,先寫入5M的內(nèi)存模型(Map or Aaary),不夠在申請,排序是根據(jù)kyro序列化支持排序的序列化
根據(jù)公式
applyMemory=nowMenory*2-oldMemory
申請的內(nèi)存=當(dāng)前的數(shù)據(jù)內(nèi)存情況*2-上一次的內(nèi)嵌情況
溢寫,通過JAVA的流寫入磁盤BufferedOutputStream, 默認(rèn)默認(rèn)的batch數(shù)量是10000條.
merge,一個task將所有數(shù)據(jù)寫入內(nèi)存數(shù)據(jù)結(jié)構(gòu)的過程中,會發(fā)生多次磁盤溢寫操作,也就會產(chǎn)生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合并成1個磁盤文件,這就是merge過程,此時會將之前所有臨時磁盤文件中的數(shù)據(jù)讀取出來,然后依次寫入最終的磁盤文件之中。此外,由于一個task就只對應(yīng)一個磁盤文件,也就意味著該task為Reduce端的stage的task準(zhǔn)備的數(shù)據(jù)都在這一個文件中. 還包括一個索引文件 記錄這
標(biāo)識了下游各個task的數(shù)據(jù)在文件中的start offset與end offset。
- bypassShuffle-write機制(不排序)
shuffle read :reduce階段,下一個stage拉取上一個stage進行合并
觸發(fā)條件:
shuffle map task數(shù)量小于spark.shuffle.sort.bypassMergeThreshold=200參數(shù)的值。
不是聚合類的shuffle算子(比如reduceByKey)。
也是根據(jù)KEY 的hash取余來寫入磁盤,跟未經(jīng)優(yōu)化的HashShuffleManager是一樣,只是最后做了帶索引的文件跟合并文件, 也是有 溢寫跟merge
磁盤寫機制不同,第二不會進行排序
UnsafeShuffle-write機制滿足條件.
- 先序列化在排序機制,可以序列化排序,
- kryo排序& partition數(shù)量不能大于指定闕值2的24次方.因為partition number使用的是24bit.
-
一般滿足上述條件才會執(zhí)行,
1617505599.png
1617505971(1).png

