Shuffle 基本概念
概述:
Shuffle描述著數(shù)據(jù)從map task輸出到reduce task 輸入的這段過程。在分布式情況下,reduce task需要跨節(jié)點拉取其它節(jié)點上的map task結(jié)果。
當Map的輸出結(jié)果要被Reduce使用時,輸出結(jié)果需要按key哈希,并且分發(fā)到每一個Reducer上去,這個過程就是shuffle。
由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。
Spark 的Shuffle 分為 Write,Read 兩階段
- Write 對應(yīng)的是ShuffleMapTask,具體的寫操作ExternalSorter來負責(zé)
- Read 階段由ShuffleRDD里的HashShuffleReader來完成。如果拉來的數(shù)據(jù)如果過大,需要落地,則也由ExternalSorter來完成的
- 所有Write 寫完后,才會執(zhí)行Read。 他們被分成了兩個不同的Stage階段。
Shuffle Write ,Shuffle Read 兩階段都可能需要落磁盤,并且通過Disk Merge 來完成最后的Sort歸并排序。
Spark的Shuffle機制
Spark中的Shuffle是把一組無規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組具有一定規(guī)則的數(shù)據(jù)。
Shuffle就是包裹在各種需要重分區(qū)的算子之下的一個對數(shù)據(jù)進行重新組合的過程。
Shuffle將數(shù)據(jù)進行收集分配到指定Reduce分區(qū),Reduce階段根據(jù)函數(shù)對相應(yīng)的分區(qū)做Reduce所需的函數(shù)處理。
Shuffle的基本流程
bucket是一個抽象概念,在實現(xiàn)中每個bucket可以對應(yīng)一個文件,可以對應(yīng)文件的一部分或是其他等

- 首先每一個Mapper會根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket,bucket的數(shù)量是M×R,其中M是Map的個數(shù),R是Reduce的個數(shù)。
- 其次Mapper產(chǎn)生的結(jié)果會根據(jù)設(shè)置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據(jù)key哈希到不同的bucket中去。
- 當Reducer啟動時,它會根據(jù)自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進行處理。
Spark中Shuffle類型
Hash Shuffle:
第一版是每個map產(chǎn)生r個文件,一共產(chǎn)生mr個文件,但是產(chǎn)生的中間文件太大影響擴展性。而后進行修改,讓一個core上的map共用文件,減少文件數(shù)目,這樣共產(chǎn)生core個文件,但中間文件數(shù)目仍隨任務(wù)數(shù)線性增加,仍然難以對應(yīng)大作業(yè)。
Sort Shuffle
每個map產(chǎn)生一個文件,同時也優(yōu)化了性能,減少網(wǎng)絡(luò)IO,徹底解決了擴展性問題。
本文只是對Shuffle作了初步的描述,了解基本概念
問題
今天遇到如下問題,特來了解一下。
17/02/06 11:50:21 ERROR Executor: Exception in task 0.0 in stage 857456.0 (TID 437542)
java.io.FileNotFoundException: /tmp/spark-be115c66-a319-4931-a2ca-81ae9e7a6198/executor-54de96d2-5256-4637-b474-4342b00e755a/blockmgr-0c1c3d9f-c5d7-4b1c-bc12-7773083fa181/18/shuffle_426055_0_0.data.5874ce88-94f5-4c34-b56a-f729d4d4e393 (No such file or directory)
at java.io.FileOutputStream.open(Native Method)
at java.io.FileOutputStream.<init>(FileOutputStream.java:212)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.writePartitionedFile(BypassMergeSortShuffleWriter.java:182)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:159)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
at org.apache.spark.scheduler.Task.run(Task.scala:85)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:722)
參考網(wǎng)上的解決方案:
一般造成此問題的是系統(tǒng)資源不夠用
修改啟動參數(shù):
- 添加:--conf spark.shuffle.manager=SORT
Spark默認的shuffle采用Hash模式,會產(chǎn)生相當規(guī)模的文件,與此同時帶來了大量的內(nèi)存開銷 - 是因為一個excutor給分配的內(nèi)存不夠,此時,減少excutor-core的數(shù)量,加大excutor-memory的值應(yīng)該就沒有問題。
參考:
http://blog.jasonding.top/2015/07/14/Spark/【Spark】Spark的Shuffle機制/
http://www.itdecent.cn/p/c83bb237caa8
https://github.com/JerryLead/SparkInternals/blob/master/markdown/4-shuffleDetails.md