上次在做內(nèi)部培訓(xùn)的時(shí)候,我講了這么一句:
一個(gè)Job里的Stage都是串行的,前一個(gè)Stage完成后下一個(gè)Stage才會(huì)進(jìn)行。
顯然上面的話是不嚴(yán)謹(jǐn)?shù)摹?/p>
看如下的代碼:

這里的話,我們構(gòu)建了兩個(gè)輸入(input1,input2),input2帶有一個(gè)reduceByKey,所以會(huì)產(chǎn)生一次Shuffle,接著進(jìn)行Join,會(huì)產(chǎn)生第二次Shuffle(值得注意的是,join 不一定產(chǎn)生新的Stage,我通過強(qiáng)制變更join后的分區(qū)數(shù)讓其發(fā)生Shuffle ,然后進(jìn)行Stage的切分)。
所以這里一共有兩次Shuffle,產(chǎn)生了四個(gè)Stage。 下圖是Spark UI上呈現(xiàn)的。那這四個(gè)Stage的執(zhí)行順序是什么呢?

再次看Spark UI上的截圖:

我們仔細(xì)分析下我們看到現(xiàn)象:
首先我們看到 Stage0,Stage 1 是同時(shí)提交的。
Stage0 只有兩條記錄,并且設(shè)置了兩個(gè)Partition,所以一次性就能執(zhí)行完,也就是3s就完成了。
Stage1 有四個(gè)分區(qū),六條記錄,記錄數(shù)最多的分區(qū)是兩條,也就是需要執(zhí)行10秒,如果完全能并行執(zhí)行,也就是最多10s。但是這里消耗了13秒,為什么呢?點(diǎn)擊這個(gè)13秒進(jìn)去看看:

我們看到有兩個(gè)task 延遲了3秒后才并行執(zhí)行的。 根據(jù)上面的代碼,我們只有四顆核供Spark使用,Stage0 里的兩個(gè)任務(wù)因?yàn)檎谶\(yùn)行,所以Stage1 只能運(yùn)行兩個(gè)任務(wù),等Stage0 運(yùn)行完成后,Stage1剩下的兩個(gè)任務(wù)才接著運(yùn)行。
之后Stage2 是在Stage1 執(zhí)行完成之后才開始執(zhí)行,而Stage3是在Stage2 執(zhí)行完成才開始執(zhí)行。
現(xiàn)在我們可以得出結(jié)論了:
- Stage 可以并行執(zhí)行的
- 存在依賴的Stage 必須在依賴的Stage執(zhí)行完成后才能執(zhí)行下一個(gè)Stage
- Stage的并行度取決于資源數(shù)
我么也可以從源碼的角度解釋這個(gè)現(xiàn)象:

我們看到如果一個(gè)Stage有多個(gè)依賴,會(huì)深度便利,直到到了根節(jié)點(diǎn),如果有多個(gè)根節(jié)點(diǎn),都會(huì)通過submitMissingTasks 提交上去運(yùn)行。當(dāng)然Spark只是嘗試提交你的Tasks,能不能完全并行運(yùn)行取決于你的資源數(shù)了。
這里再貢獻(xiàn)一張畫了很久的示意圖,體現(xiàn)了partition,shuffle,stage,RDD,transformation,action,source 等多個(gè)概念。
