Spark Shuffle基礎(chǔ)

Shuffle 基本概念

概述:

Shuffle描述著數(shù)據(jù)從map task輸出到reduce task 輸入的這段過程。在分布式情況下,reduce task需要跨節(jié)點拉取其它節(jié)點上的map task結(jié)果。
當Map的輸出結(jié)果要被Reduce使用時,輸出結(jié)果需要按key哈希,并且分發(fā)到每一個Reducer上去,這個過程就是shuffle。
由于shuffle涉及到了磁盤的讀寫和網(wǎng)絡(luò)的傳輸,因此shuffle性能的高低直接影響到了整個程序的運行效率。

Spark 的Shuffle 分為 Write,Read 兩階段

  • Write 對應(yīng)的是ShuffleMapTask,具體的寫操作ExternalSorter來負責(zé)
  • Read 階段由ShuffleRDD里的HashShuffleReader來完成。如果拉來的數(shù)據(jù)如果過大,需要落地,則也由ExternalSorter來完成的
  • 所有Write 寫完后,才會執(zhí)行Read。 他們被分成了兩個不同的Stage階段。
    Shuffle Write ,Shuffle Read 兩階段都可能需要落磁盤,并且通過Disk Merge 來完成最后的Sort歸并排序。

Spark的Shuffle機制

Spark中的Shuffle是把一組無規(guī)則的數(shù)據(jù)盡量轉(zhuǎn)換成一組具有一定規(guī)則的數(shù)據(jù)。

Shuffle就是包裹在各種需要重分區(qū)的算子之下的一個對數(shù)據(jù)進行重新組合的過程。
Shuffle將數(shù)據(jù)進行收集分配到指定Reduce分區(qū),Reduce階段根據(jù)函數(shù)對相應(yīng)的分區(qū)做Reduce所需的函數(shù)處理。

Shuffle的基本流程

bucket是一個抽象概念,在實現(xiàn)中每個bucket可以對應(yīng)一個文件,可以對應(yīng)文件的一部分或是其他等

shuffle-write-no-consolidation.png
  • 首先每一個Mapper會根據(jù)Reducer的數(shù)量創(chuàng)建出相應(yīng)的bucket,bucket的數(shù)量是M×R,其中M是Map的個數(shù),R是Reduce的個數(shù)。
  • 其次Mapper產(chǎn)生的結(jié)果會根據(jù)設(shè)置的partition算法填充到每個bucket中去。這里的partition算法是可以自定義的,當然默認的算法是根據(jù)key哈希到不同的bucket中去。
  • 當Reducer啟動時,它會根據(jù)自己task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應(yīng)的bucket作為Reducer的輸入進行處理。

Spark中Shuffle類型

Hash Shuffle:

第一版是每個map產(chǎn)生r個文件,一共產(chǎn)生mr個文件,但是產(chǎn)生的中間文件太大影響擴展性。而后進行修改,讓一個core上的map共用文件,減少文件數(shù)目,這樣共產(chǎn)生core個文件,但中間文件數(shù)目仍隨任務(wù)數(shù)線性增加,仍然難以對應(yīng)大作業(yè)。

Sort Shuffle

每個map產(chǎn)生一個文件,同時也優(yōu)化了性能,減少網(wǎng)絡(luò)IO,徹底解決了擴展性問題。

本文只是對Shuffle作了初步的描述,了解基本概念

問題

今天遇到如下問題,特來了解一下。

17/02/06 11:50:21 ERROR Executor: Exception in task 0.0 in stage 857456.0 (TID 437542)
java.io.FileNotFoundException: /tmp/spark-be115c66-a319-4931-a2ca-81ae9e7a6198/executor-54de96d2-5256-4637-b474-4342b00e755a/blockmgr-0c1c3d9f-c5d7-4b1c-bc12-7773083fa181/18/shuffle_426055_0_0.data.5874ce88-94f5-4c34-b56a-f729d4d4e393 (No such file or directory)
     at java.io.FileOutputStream.open(Native Method)
     at java.io.FileOutputStream.<init>(FileOutputStream.java:212)
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.writePartitionedFile(BypassMergeSortShuffleWriter.java:182)
     at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:159)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:79)
     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:47)
     at org.apache.spark.scheduler.Task.run(Task.scala:85)
     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
     at java.lang.Thread.run(Thread.java:722)

參考網(wǎng)上的解決方案:
一般造成此問題的是系統(tǒng)資源不夠用
修改啟動參數(shù):

  • 添加:--conf spark.shuffle.manager=SORT
    Spark默認的shuffle采用Hash模式,會產(chǎn)生相當規(guī)模的文件,與此同時帶來了大量的內(nèi)存開銷
  • 是因為一個excutor給分配的內(nèi)存不夠,此時,減少excutor-core的數(shù)量,加大excutor-memory的值應(yīng)該就沒有問題。

參考:
http://blog.jasonding.top/2015/07/14/Spark/【Spark】Spark的Shuffle機制/
http://www.itdecent.cn/p/c83bb237caa8
https://github.com/JerryLead/SparkInternals/blob/master/markdown/4-shuffleDetails.md

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容