上次在做內(nèi)部培訓(xùn)的時候,我講了這么一句:
一個Job里的Stage都是串行的,前一個Stage完成后下一個Stage才會進(jìn)行。
顯然上面的話是不嚴(yán)謹(jǐn)?shù)摹?/p>
看如下的代碼:

這里的話,我們構(gòu)建了兩個輸入(input1,input2),input2帶有一個reduceByKey,所以會產(chǎn)生一次Shuffle,接著進(jìn)行Join,會產(chǎn)生第二次Shuffle(值得注意的是,join 不一定產(chǎn)生新的Stage,我通過強(qiáng)制變更join后的分區(qū)數(shù)讓其發(fā)生Shuffle ,然后進(jìn)行Stage的切分)。
所以這里一共有兩次Shuffle,產(chǎn)生了四個Stage。 下圖是Spark UI上呈現(xiàn)的。那這四個Stage的執(zhí)行順序是什么呢?

再次看Spark UI上的截圖:

我們仔細(xì)分析下我們看到現(xiàn)象:
首先我們看到 Stage0,Stage 1 是同時提交的。
Stage0 只有兩條記錄,并且設(shè)置了兩個Partition,所以一次性就能執(zhí)行完,也就是3s就完成了。
Stage1 有四個分區(qū),六條記錄,記錄數(shù)最多的分區(qū)是兩條,也就是需要執(zhí)行10秒,如果完全能并行執(zhí)行,也就是最多10s。但是這里消耗了13秒,為什么呢?點(diǎn)擊這個13秒進(jìn)去看看:

我們看到有兩個task 延遲了3秒后才并行執(zhí)行的。 根據(jù)上面的代碼,我們只有四顆核供Spark使用,Stage0 里的兩個任務(wù)因?yàn)檎谶\(yùn)行,所以Stage1 只能運(yùn)行兩個任務(wù),等Stage0 運(yùn)行完成后,Stage1剩下的兩個任務(wù)才接著運(yùn)行。
之后Stage2 是在Stage1 執(zhí)行完成之后才開始執(zhí)行,而Stage3是在Stage2 執(zhí)行完成才開始執(zhí)行。
現(xiàn)在我們可以得出結(jié)論了:
- Stage 可以并行執(zhí)行的
- 存在依賴的Stage 必須在依賴的Stage執(zhí)行完成后才能執(zhí)行下一個Stage
- Stage的并行度取決于資源數(shù)
我么也可以從源碼的角度解釋這個現(xiàn)象:

我們看到如果一個Stage有多個依賴,會深度便利,直到到了根節(jié)點(diǎn),如果有多個根節(jié)點(diǎn),都會通過submitMissingTasks 提交上去運(yùn)行。當(dāng)然Spark只是嘗試提交你的Tasks,能不能完全并行運(yùn)行取決于你的資源數(shù)了。
這里再貢獻(xiàn)一張畫了很久的示意圖,體現(xiàn)了partition,shuffle,stage,RDD,transformation,action,source 等多個概念。
