Spark內(nèi)核分析之Shuffle操作流程(非常重要)

????????如題,我們來分析一下spark的shuffle操作原理;為什么說其非常重要,是因?yàn)閟huffle操作是我們?cè)赟park調(diào)優(yōu)中非常重要的一環(huán),對(duì)shuffle進(jìn)行了優(yōu)化,往往可以使得我們的spark程序運(yùn)行效率有極大的提升。依照慣例,我們先來看一張圖;

普通shuffle流程圖

上圖是一個(gè)普通的Shuffle操作流程原理圖,一個(gè)shuffle操作由三個(gè)RDD算子構(gòu)成,分別是mapPartitionsRDD,ShuffleRDD,mapPartitionsRDD;如上圖所示,

1.每個(gè)ShuffleMapTask都會(huì)為每個(gè)ResultTask創(chuàng)建一個(gè)Bucket緩存和一個(gè)對(duì)應(yīng)的ShuffleBlockFile磁盤文件;2.每個(gè)ShuffleMapTask的輸出相關(guān)信息封裝成一個(gè)MapStatus發(fā)送到DAGScheduler的MapOutputTracker中去;

3.ResultTask開始拉取該任務(wù)所需要的數(shù)據(jù),ResultTask通過向DAGScheduler的MapOutputTracker獲取MapStatus的信息,從而知道自己需要的數(shù)據(jù)所在的位置,然后去相應(yīng)的位置拉去數(shù)據(jù)到該任務(wù)所在節(jié)點(diǎn)的內(nèi)存中,如果內(nèi)存不夠,會(huì)將部分?jǐn)?shù)據(jù)寫入磁盤,完成這系列的操作是由ShufflerRDD算子完成的;

4.然后ResultTask對(duì)拉取到的數(shù)據(jù)進(jìn)行聚合操作,最后生成mapPartitionsRDD算子;

想想上面的這個(gè)Shuffler流程會(huì)有什么問題?

我們來做一個(gè)假設(shè),如果有100個(gè)ShuffleMapTask,2個(gè)CPU Core,100個(gè)ResultTask,那么這個(gè)shuffler操作將產(chǎn)生10000個(gè)文件,如此多的文件對(duì)于Spark作業(yè)的性能就是一個(gè)災(zāi)難;針對(duì)這個(gè)問題當(dāng)然有對(duì)應(yīng)的優(yōu)化策略,接著我們來看另外一張圖;

優(yōu)化的Shuffler流程圖

????????通過優(yōu)化的Shuffler操作如上圖所示,假設(shè)有100個(gè)ShufflerMapTask,2CPU core,100個(gè)Resulttask,優(yōu)化后產(chǎn)生的中間文件是200個(gè),是優(yōu)化之前的1/50;那么這是如何做到的,通過閱讀源碼可以知道,只要引入consolidation機(jī)制就可以實(shí)現(xiàn)了,其配置是通過在SparkConf中配置對(duì)應(yīng)的參數(shù)即可實(shí)現(xiàn);

????????來簡(jiǎn)單分析一下:一個(gè)ShuffleMapTask將數(shù)據(jù)寫入本地不變,但是當(dāng)這一批ShuffleMapTask運(yùn)行完成以后,下一批ShuffleMapTask開始運(yùn)行(一批ShuffleMapTask是指,同一時(shí)間有兩個(gè)Task并行執(zhí)行,因?yàn)橛袃蓚€(gè)CPU Core),它們產(chǎn)生的數(shù)據(jù)會(huì)直接寫入上一批ShuffleMapTask產(chǎn)生的本地文件中;上圖中左邊的一組可以稱為一組ShuffleGroup,每個(gè)文件中都存儲(chǔ)了多個(gè)ShuffleMapTask的數(shù)據(jù),每個(gè)ShuffleMapTask所產(chǎn)生的數(shù)據(jù)是一個(gè)segment,每個(gè)File中通過索引,偏移量來標(biāo)記每部分?jǐn)?shù)據(jù)來自不同的ShuffleMapTask。

下面我們來看看源碼是如何實(shí)現(xiàn)的;

ShuffleMapTask的runTask

1.首先通過Spark全局變量得到shuffleManager對(duì)象,并通過shuffleManager對(duì)象獲得Write對(duì)象;

2.接著,通過rdd.iterator方法對(duì)屬于自己的partition進(jìn)行計(jì)算,最后會(huì)調(diào)用我們自己編寫的RDD算子來計(jì)算partition;

3.接著Writer調(diào)用自己的write方法將RDD算子計(jì)算的結(jié)果寫入緩存;

HashShuffleWriter的write

1.判斷aggregator為true,并且是否設(shè)置了map端的combine操作;若成立,則進(jìn)行map端的數(shù)據(jù)合并(這里是一個(gè)spark優(yōu)化點(diǎn),在我之前關(guān)于spark優(yōu)化系列文章中有寫過);

2.對(duì)所有經(jīng)過合并操作之后的數(shù)據(jù)遍歷,根據(jù)每個(gè)元素獲得對(duì)應(yīng)的bucketId,然后將改元素寫入對(duì)應(yīng)的bucket緩存中;

這里我們來看看這個(gè)shuffle對(duì)象做了什么?

FileShuffleBlockManager的forMapTask

1.首先創(chuàng)建出一個(gè)ShuffleWriterGroup對(duì)象;

2.接著判斷Spark作業(yè)是否設(shè)置了consolidateShuffleFiles;如果設(shè)置其為true,首先得到一個(gè)fileGroup對(duì)象,然后使用shuffleId,mapId,BucketId來得到一個(gè)blockId,接著根據(jù)這個(gè)blockId寫數(shù)據(jù)到磁盤的對(duì)象;相反,如果沒有設(shè)置consolidateShuffleFiles為true,則直接為每個(gè)shuffleMapTask創(chuàng)建一個(gè)blockFile,然后得到一個(gè)寫數(shù)據(jù)到磁盤的對(duì)象;

3.執(zhí)行完這里后,接著調(diào)用write方法將數(shù)據(jù)寫入內(nèi)存緩沖bucket,然后再將數(shù)據(jù)寫入磁盤;

寫數(shù)據(jù)到這里就完成了,然后會(huì)將產(chǎn)生的數(shù)據(jù)位置等信息封裝成一個(gè)MapStatus對(duì)象發(fā)送給DAGSchedule的MapOutputTracker中;接下來ResultTask開始讀取數(shù)據(jù);

ShuffleRDD的compute
HashShuffleReader的read


BlockStoreShuffleFetcher的fetch
BlockStoreShuffleFetcher的fetch

總結(jié):到此shuffle的整個(gè)操作流程就分析完了,接下來會(huì)分析底層數(shù)據(jù)存儲(chǔ)的核心組件BlockManager的工作原理,,歡迎關(guān)注。

如需轉(zhuǎn)載,請(qǐng)注明:

本篇:Spark內(nèi)核分析之Shuffle操作流程(非常重要)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容