1. 概述
shuffle 就是對數(shù)據(jù)進行重組,由于分布式計算的特性和要求,在實現(xiàn)細節(jié)上更加繁瑣和復(fù)雜.
在 MapReduce框架,Shuffle 是連接 Map 和 Reduce 之間的橋梁,Map 階段通過 shuffle 讀取數(shù)據(jù)并輸出到對應(yīng)的 Reduce ;而 Reduce 階段負責從 Map 端拉取數(shù)據(jù)并進行計算,在整個 shuffle 過程中,往往伴隨著大量的磁盤和網(wǎng)絡(luò) I/O.所以 shuffle 性能的高低也直接決定了整個程序的性能高低,Spark 也會有自己的 shuffle 實現(xiàn)過程.

2 .spark 中的shuffle介紹
在 DAG 調(diào)度的過程中,Stage 階段的劃分是根據(jù)是否有 shuffle 過程,也就是存在 ShuffleDependency 寬依賴的時候,需要進行 shuffle ,這時候會將作業(yè) job 劃分成多個 Stage ,并且在劃分 Stage 的時候,構(gòu)建 ShuffleDependency 的時候進行 shuffle 注冊,獲取后續(xù)數(shù)據(jù)讀取所需要的 ShuffleHandle ,最終每一個 Job 提交后都會生成一個 ResultStage 和 若干個 ShuffleMapStage ,其在中 ResultStage 表示生成作業(yè)的最終結(jié)果所在的Stage.
ResultStage 與 ShuffleMapStage 中的 task 分別對應(yīng)著 RedultTask 與 ShuffleMapTask.
一個作業(yè),除了最終的 ResultStage 外,其他若干 ShuffleMapStage 中各個 ShuffleMapTask 需要將最終的數(shù)據(jù)根據(jù)相應(yīng)的 Partitioner 對數(shù)據(jù)進行分組,然后持久化分區(qū)的數(shù)據(jù).
2.1 HashShuffle 機制
2.1.1 HashShuffle 概述
在 spark-1.6 版本之前,一直使用 HashShuffle, 在 Spark-1.6 版本之后使用 Sort-Base Shuffle,因為 HashShuffle 存在的不足所以就替換了 HashShuffle.
我們知道,Spark 的運行主要分為 2 部分,一部分是 驅(qū)動程序,其核心是 SparkContext; 另一部分是 Worker 節(jié)點上 Task ,它是運行 實際任務(wù)的,程序運行的時候,Driver 和 Executor 進程相互交互,運行什么任務(wù),即 Driver 會分配 Task 到 Executor, Driver 跟 Executor 進行網(wǎng)絡(luò)傳輸,任務(wù)數(shù)據(jù)從哪兒獲取,即 Task 要從 Driver 抓取其他上游的 Task 的數(shù)據(jù)結(jié)果,所以有這個過程中就不斷的產(chǎn)生網(wǎng)絡(luò)結(jié)果,其中,下一個 Stage 向上一個 Stage 要數(shù)據(jù)這個過程,我們就稱為 Shuffle.
2.1.2 沒有優(yōu)化之前的 HashShuffle 機制

在HashShuffle 沒有優(yōu)化之前,每一個 ShuffleMapTask 會為每一個 ReduceTask 創(chuàng)建一個 bucket 緩存,并且會為每一個 bucket 創(chuàng)建一個文件.這個bucket 存放的數(shù)據(jù)就是經(jīng)過 Partitioner 操作(默認是 HashPartitioner)之后,找到對應(yīng)的 bucket 然后放進去,最后將數(shù)據(jù)
刷新 bucket 緩存的數(shù)據(jù)到磁盤上, 即對應(yīng)的 block file.
然后 ShuffleMapTask 將輸出作為 MapStatus 發(fā)送到 DAGScheduler 的
MapOutPutTrackerMaster ,每一個 MapStatus 包含了每一個 ResultTask 要拉取的數(shù)據(jù)的位置和大小.
ResultTask 然后去利用 BlockStoreShuffleFetcher 向 MapOutPutTrackerMaster 獲取 MapStatus ,看哪一份數(shù)據(jù)是屬于自己的,然后底層通過 BlockManager 將數(shù)據(jù)拉取過來.
拉取過來的數(shù)據(jù)會組成一個 內(nèi)部的 ShuffleRDD,優(yōu)先放入內(nèi)存,內(nèi)存不夠則放入磁盤,然后 ResulTask 開始進行聚合,最后生成我們希望獲取的那個 MapPartitionRDD.
存在的問題
如圖所示: 在這里有 1 個worker, 2 個 executor ,每一個 executor 運行兩個 ShuffleMapTask,有三個 ReduceTask,所以總共就有 4 * 3 = 12 個bucket 和 12 個 block file.
如果數(shù)據(jù)量較大,將會生成 M * R 個小文件,比如 ShuffleMapTask 有 100 , ResultTask 有 100 個,這就會產(chǎn)生 100 * 100 = 1000000 個小文件.
bucket 緩存很重要,需要將 ShuffleMapTask 所有的數(shù)據(jù)都寫入 bucktet, 才會刷到 磁盤,那么如果 Map 端數(shù)據(jù)過多,這就很容易造成內(nèi)存溢出,盡管后面有優(yōu)化, bucket 寫入的數(shù)據(jù)達到刷新到磁盤的閥值之后,就會將數(shù)據(jù)一點點的刷新到磁盤,但是這樣 磁盤的 I/O 就多了.
$ 2.1.2 優(yōu)化的 HashShuffle

每一個 Executor 進程根據(jù)核數(shù),決定 Task 的并發(fā)數(shù)量,比如 executor 核心數(shù)是2 , 就是可以并發(fā)運行兩個 task ,如果是一個 則只能運行一個 task.
假設(shè) executor 核心數(shù)是1 , ShuffleMapTask 數(shù)量是 M, 那么塔依然會根據(jù) ReduceTask 的數(shù)量T , 創(chuàng)建 R 個 bucket 緩存,然后對 Key 進行 Hash ,數(shù)據(jù)進入不同的 bucket 中,每一個 bucket 對應(yīng)一個 block file ,用于刷新 bucket 緩存里的數(shù)據(jù)。
然后下一個 task 運行的時候,那么不會再創(chuàng)建新的 bucket 和 block file,而是復(fù)用之前的 task 已經(jīng)創(chuàng)建好的 bucket 和 block file 。即所謂的用一個 Executor 進程里所有的 Task 都會把相同的 Key 放入相同的 bucket 緩沖中。
這樣的話,生成文件的數(shù)量就是 ( 本地 worker 的 executor 數(shù)量 * executor 的 cores * ResultTask 數(shù)量) 如上如所示,。即 2 * 1 * 3 = 6 個文件,每一個 Executor 的 shuffleMapTask 數(shù)量 100, 那么 未優(yōu)化的 HasjShuffle 的文件數(shù) 2 * 1 * 100 100 = 2090000,優(yōu)化之后的數(shù)量是 2 1 * 100 = 200 文件,相當于少了100 倍.
存在的問題
如果 Reducer 端的并行任務(wù)或者是數(shù)據(jù)分片過多的話 則 Core * Reducer Task 依舊過大,也會產(chǎn)生很多小文件.
2.2 Sort-Based Shuffle 機制
2.2.1 概述
HashShuffle 回顧
HashShuffle 寫數(shù)據(jù)的時候,內(nèi)存有一個 bucket 緩沖區(qū),同時在本地磁盤有對應(yīng)的本地文本,如果本地有文件,那么在內(nèi)存應(yīng)該也有文件句柄也是需要消耗內(nèi)存的,也就是說,從內(nèi)存的角度考慮,即有一部分存儲數(shù)據(jù),一部分管理文件句柄,如果 Mapper 分片數(shù)量為 1000 , Reduce 分片數(shù)量為 1000 ,那么總共就需要 1000000 個小文件,所以就會有很多內(nèi)存的消耗,頻繁 IO 以及 GC 頻繁 或者 出現(xiàn)內(nèi)存溢出.
2.2.2 Sorted-Based Shuffle
為了緩解 Shuffle 過程產(chǎn)生文件過多 和 Writer 緩解開銷過大的問題, spark 引入了類似于 Hadoop Map-Reduce 的 Shuffle 機制,該機制每一個 ShuffleMapTask 不會為后續(xù)的任務(wù)創(chuàng)建單獨的文件,而是會將所有的 Task 結(jié)果寫入同一個文件,并且對應(yīng)生成一個 索引文件,以前的數(shù)據(jù)是放在內(nèi)存緩存中,等到數(shù)據(jù)完了在刷到磁盤,現(xiàn)在為了減少內(nèi)存的使用,在內(nèi)存不夠用的時候,可以將輸出溢寫到磁盤,結(jié)束的時候,再將這些不同的文件聯(lián)合內(nèi)存的數(shù)據(jù)一起進行歸并,從而減少內(nèi)存的使用量,一方面文件數(shù)量顯著減少,另一方面減少 Writer 緩存所占用的內(nèi)存大小,從而同時避免 GC 的風險和頻率.

####### 對于 BypassMergeSortShuffleWriter ,使用這個模式特點
主要用于處理不需要排序和聚合的 Shuffle 操作,所以數(shù)據(jù)是直接寫入文件,數(shù)據(jù)量較大的時候,網(wǎng)絡(luò) I/O 和內(nèi)存負擔較重.
主要適合處理 Reducer 任務(wù)數(shù)量較少的情況下
將每一個分區(qū)寫入一個單獨的文件,最后將這些文件合并,減少文件數(shù)量,但是這種方式需要并發(fā)打開多個文件,對內(nèi)存消耗比較大.
因為 BypassMergeSortShuffleWriter
補充
另外這個 Sort-Based Shuffle 跟 Executor 核心沒有關(guān)系,即跟并發(fā)度沒有關(guān)系,它是每一個 ShuffleMapTask 都會產(chǎn)生一個 data文件和 index 文件,所謂合并也只是將該 ShuffleMapTask 的各個 partition 對應(yīng)的分區(qū)文件合并到data文件而已,所以這個就需要優(yōu)化后的 HashShuffle 的區(qū)別開來.
比較適合數(shù)據(jù)量很大的場景或者集群規(guī)模很大
引入了外部排序器,可以支持在 Map 端進行本地聚合或者不聚合
如果外部排序器 enable 了 Spill 功能,如果內(nèi)存不夠,可以先將輸出溢寫 到本地磁盤, 最后將 內(nèi)存結(jié)果和本地磁盤的溢寫文件進行合并.
對于 UnsafeShuffleWriter 由于需要謹慎使用,我們暫時不做分析了.