源文件放在github,如有謬誤之處,歡迎指正。原文鏈接https://github.com/jacksu/utils4s/blob/master/spark-knowledge/md/hash-shuffle.md
正如你所知,spark實(shí)現(xiàn)了多種shuffle方法,通過 spark.shuffle.manager來確定。暫時總共有三種:hash shuffle、sort shuffle和tungsten-sort shuffle,從1.2.0開始默認(rèn)為sort shuffle。本節(jié)主要介紹hash shuffle。
spark在1.2前默認(rèn)為hash shuffle(spark.shuffle.manager = hash),但hash shuffle也經(jīng)歷了兩個發(fā)展階段。
第一階段
上圖有 4 個 ShuffleMapTask 要在同一個 worker node 上運(yùn)行,CPU core 數(shù)為 2,可以同時運(yùn)行兩個 task。每個 task 的執(zhí)行結(jié)果(該 stage 的 finalRDD 中某個 partition 包含的 records)被逐一寫到本地磁盤上。每個 task 包含 R 個緩沖區(qū),R = reducer 個數(shù)(也就是下一個 stage 中 task 的個數(shù)),緩沖區(qū)被稱為 bucket,其大小為spark.shuffle.file.buffer.kb ,默認(rèn)是 32KB(Spark 1.1 版本以前是 100KB)。
第二階段
這樣的實(shí)現(xiàn)很簡單,但有幾個問題:
1 產(chǎn)生的 FileSegment 過多。每個 ShuffleMapTask 產(chǎn)生 R(reducer 個數(shù))個 FileSegment,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個文件。一般 Spark job 的 M 和 R 都很大,因此磁盤上會存在大量的數(shù)據(jù)文件。
2 緩沖區(qū)占用內(nèi)存空間大。每個 ShuffleMapTask 需要開 R 個 bucket,M 個 ShuffleMapTask 就會產(chǎn)生 M * R 個 bucket。雖然一個 ShuffleMapTask 結(jié)束后,對應(yīng)的緩沖區(qū)可以被回收,但一個 worker node 上同時存在的 bucket 個數(shù)可以達(dá)到 cores R 個(一般 worker 同時可以運(yùn)行 cores 個 ShuffleMapTask),占用的內(nèi)存空間也就達(dá)到了cores * R * 32 KB。對于 8 核 1000 個 reducer 來說,占用內(nèi)存就是 256MB。
spark.shuffle.consolidateFiles默認(rèn)為false,如果為true,shuffleMapTask輸出文件可以被合并。如圖
可以明顯看出,在一個 core 上連續(xù)執(zhí)行的 ShuffleMapTasks 可以共用一個輸出文件 ShuffleFile。先執(zhí)行完的 ShuffleMapTask 形成 ShuffleBlock i,后執(zhí)行的 ShuffleMapTask 可以將輸出數(shù)據(jù)直接追加到 ShuffleBlock i 后面,形成 ShuffleBlock i',每個 ShuffleBlock 被稱為 FileSegment。下一個 stage 的 reducer 只需要 fetch 整個 ShuffleFile 就行了。這樣,每個 worker 持有的文件數(shù)降為 cores * R。但是緩存空間占用大還沒有解決。
總結(jié)
優(yōu)點(diǎn)
- 快-不需要排序,也不需要維持hash表
- 不需要額外空間用作排序
- 不需要額外IO-數(shù)據(jù)寫入磁盤只需一次,讀取也只需一次
缺點(diǎn)
- 當(dāng)partitions大時,輸出大量的文件(cores * R),性能開始降低
- 大量的文件寫入,使文件系統(tǒng)開始變?yōu)殡S機(jī)寫,性能比順序?qū)懸档?00倍
- 緩存空間占用比較大
當(dāng)然,數(shù)據(jù)經(jīng)過序列化、壓縮寫入文件,讀取的時候,需要反序列化、解壓縮。reduce fetch的時候有一個非常重要的參數(shù)spark.reducer.maxSizeInFlight,這里用 softBuffer 表示,默認(rèn)大小為 48MB。一個 softBuffer 里面一般包含多個 FileSegment,但如果某個 FileSegment 特別大的話,這一個就可以填滿甚至超過 softBuffer 的界限。如果增大,reduce請求的chunk就會變大,可以提高性能,但是增加了reduce的內(nèi)存使用量。
如果排序在reduce不強(qiáng)制執(zhí)行,那么reduce只返回一個依賴于map的迭代器。如果需要排序, 那么在reduce端,調(diào)用ExternalSorter。