Spark 2.0 中已經(jīng)移除 Hash Based Shuffle,但作為曾經(jīng)的默認 Shuffle 機制,還是值得進行分析
Spark 最開始只有 Hash Based Shuffle,因為在很多場景中并不需要排序,在這些場景中多余的排序反而會損耗性能。
Hash Based Shuffle Write
該過程實現(xiàn)的核心是在 HashShuffleWriter#write(records: Iterator[Product2[K, V]]): Unit 其主要流程如下:

該函數(shù)的輸入是一個 Shuffle Map Task 計算得到的結果(對應的迭代器),若在寬依賴中定義了 map 端的聚合則會先進行聚合,隨后對于迭代器(若要聚合則為聚合后的迭代器)的每一項先通過計算 key 的 hash 值來確定要寫到哪個文件,然后將 key、value 寫入文件。
寫入的文件名的格式是:shuffle_$shuffleId_$mapId_$reduceId。寫入時,若文件已存在會刪除會創(chuàng)建新文件。
上圖描述了如何處理一個 Shuffle Map Task 計算結果,在實際應用中,往往有很多 Shuffle Map Tasks 及下游 tasks,即如下情況(圖摘自:JerryLead/SparkInternals-Shuffle 過程):

存在的問題
這種簡單的實現(xiàn)會有幾個問題,為說明方便,這里設 M = Shuffle Map Task 數(shù)量,R = 下游 tasks 數(shù)量:
- 產(chǎn)生過多文件:由于每個 Shuffle Map Task 需要為每個下游的 Task 創(chuàng)建一個單獨的文件,因此文件的數(shù)量就是
M * R。如果 Shuffle Map Tasks 數(shù)量是 1000,下游的 tasks 數(shù)是 800,那么理論上會產(chǎn)生 80w 個文件(對于 size 為 0的文件會特殊處理) - 打開多個文件對于系統(tǒng)來說意味著隨機寫,尤其是每個文件較小且文件特別多的情況。機械硬盤在隨機讀寫方面的性能很差,如果是固態(tài)硬盤,會改善很多
- 緩沖區(qū)占用內(nèi)存空間大:每個 Shuffle Map Task 需要開 R 個 bucket(為減少寫文件次數(shù)的緩沖區(qū)),N 個 Shuffle Map Task 就會產(chǎn)生
N * R個 bucket。雖然一個 Shuffle Map Task,對應的 buckets 會被回收,但一個節(jié)點上的 bucket 個數(shù)最多可以達到cores * R個,每個 bucket 默認為 32KB。對于 24 核 1000 個 reducer 來說,占用內(nèi)存就是 750MB
改進:Shuffle Consolidate Writer
在上面提到的幾個問題,Spark 提供了 Shuffle Consolidate Files 機制進行優(yōu)化。該機制的手段是減少 Shuffle 過程產(chǎn)生的文件,若使用這個功能,則需要置 spark.shuffle.consolidateFiles 為 true,其實現(xiàn)可用下圖來表示(圖摘自:JerryLead/SparkInternals-Shuffle 過程)

即:對于運行在同一個 core 的 Shuffle Map Tasks,對于將要被同一個 reducer read 的數(shù)據(jù),第一個 Shuffle Map Task 會創(chuàng)建一個文件,之后的就會將數(shù)據(jù)追加到這個文件而不是新建一個文件(相當于同一個 core 上的 Shuffle Map Task 寫了文件不同的部分)。因此文件數(shù)就從原來的 M * R 個變成了 cores * R 個。當 M / cores 的值越大,減少文件數(shù)的效果越顯著。需要注意的是,該機制雖然在很多時候能緩解上述的幾個問題,但是并不能徹底解決。
參考
- 《Spark 技術內(nèi)幕》
- JerryLead/SparkInternals - Shuffle 過程