Spark Shuffle之Hash Shuffle

源文件放在github,如有謬誤之處,歡迎指正。原文鏈接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/hash-shuffle.md

正如你所知,spark實(shí)現(xiàn)了多種shuffle方法,通過 spark.shuffle.manager來確定。暫時總共有三種:hash shuffle、sort shuffle和tungsten-sort shuffle,從1.2.0開始默認(rèn)為sort shuffle。本節(jié)主要介紹hash shuffle。

spark在1.2前默認(rèn)為hash shuffle(spark.shuffle.manager = hash),但hash shuffle也經(jīng)歷了兩個發(fā)展階段。

第一階段

上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運(yùn)行,CPU core 數(shù)為 2,可以同時運(yùn)行兩個 task。每個 task 的執(zhí)行結(jié)果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每個 task 包含 R 個緩沖區(qū),R = reducer 個數(shù)(也就是下一個 stage 中 task 的個數(shù)),緩沖區(qū)被稱為 bucket,其大小為spark.shuffle.file.buffer.kb ,默認(rèn)是 32KB(Spark 1.1 版本以前是 100KB)。

第二階段

這樣的實(shí)現(xiàn)很簡單,但有幾個問題:

1 產(chǎn)生的 FileSegment 過多。每個 ShuffleMapTask 產(chǎn)生 R(reducer 個數(shù))個 FileSegment,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數(shù)據(jù)文件。

2 緩沖區(qū)占用內(nèi)存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個 bucket。雖然一個 ShuffleMapTask 結(jié)束后,對應(yīng)的緩沖區(qū)可以被回收,但一個 worker node 上同時存在的 bucket 個數(shù)可以達(dá)到 cores R 個(一般 worker 同時可以運(yùn)行 cores 個 ShuffleMapTask),占用的內(nèi)存空間也就達(dá)到了cores * R * 32 KB。對于 8 核 1000 個 reducer 來說,占用內(nèi)存就是 256MB。

spark.shuffle.consolidateFiles默認(rèn)為false,如果為true,shuffleMapTask輸出文件可以被合并。如圖

可以明顯看出,在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數(shù)降為 cores * R。但是緩存空間占用大還沒有解決。

總結(jié)

優(yōu)點(diǎn)

  1. 快-不需要排序,也不需要維持hash表
  2. 不需要額外空間用作排序
  3. 不需要額外IO-數(shù)據(jù)寫入磁盤只需一次,讀取也只需一次

缺點(diǎn)

  1. 當(dāng)partitions大時,輸出大量的文件(cores * R),性能開始降低
  2. 大量的文件寫入,使文件系統(tǒng)開始變?yōu)殡S機(jī)寫,性能比順序?qū)懸档?00倍
  3. 緩存空間占用比較大

當(dāng)然,數(shù)據(jù)經(jīng)過序列化、壓縮寫入文件,讀取的時候,需要反序列化、解壓縮。reduce fetch的時候有一個非常重要的參數(shù)spark.reducer.maxSizeInFlight,這里用 softBuffer 表示,默認(rèn)大小為 48MB。一個 softBuffer 里面一般包含多個 FileSegment,但如果某個 FileSegment 特別大的話,這一個就可以填滿甚至超過 softBuffer 的界限。如果增大,reduce請求的chunk就會變大,可以提高性能,但是增加了reduce的內(nèi)存使用量。

如果排序在reduce不強(qiáng)制執(zhí)行,那么reduce只返回一個依賴于map的迭代器。如果需要排序, 那么在reduce端,調(diào)用ExternalSorter。

參考文獻(xiàn)

spark Architecture:Shuffle

shuffle 過程

sort shuffle

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容