本文要求讀者了解DAGScheduler如何劃分一個(gè)作業(yè)的stages。本文主要內(nèi)容是作者個(gè)人關(guān)于spark在提交多個(gè)作業(yè)時(shí),stage劃分的一些小思考。
假設(shè)我們有如下圖所示的rdd依賴圖:

spark-job-submit.png
NOTE:rdd3、rdd6、rdd5分別為job0、job1、job2的final rdd。我們以job0、job1、job2的順序依次提交這3個(gè)作業(yè),得到如圖所示的stage劃分。(或許你可以自己嘗試一下stage劃分,看看劃分的結(jié)果是不是和圖中一樣。)
關(guān)于這張圖,簡(jiǎn)單說(shuō)兩點(diǎn):
- 第一,圖中stage后面的數(shù)字表示這3個(gè)作業(yè)在spark上提交后的真實(shí)的stage id。但是,如果我們以job0、job2、job1的順序依次提交,則RDD1和RDD4的stage id并不可知(注意,我們所說(shuō)的可知是指告訴job提交的順序和rdd之間的依賴關(guān)系,能夠手動(dòng)劃分出和spark一樣的stage劃分,包括id順序)。這是因?yàn)?,spark在劃分stages時(shí),會(huì)先用一個(gè)HashSet來(lái)保存RDD的ShuffleDependency。如圖所示,RDD5有ShuffleDependency1和ShuffleDependency3。當(dāng)這兩個(gè)ShuffleDependency被存儲(chǔ)到HashSet中時(shí),則在通常情況下順序不可知(當(dāng)然,對(duì)于我們例子中的兩個(gè)shuffle,經(jīng)過(guò)hash結(jié)果的順序還是可知的)。這就導(dǎo)致之后創(chuàng)建的stage的id不可知。
- 第二,一般來(lái)說(shuō)在真實(shí)的集群環(huán)境中,shuffle時(shí)最耗時(shí)的。所以在我們提交了job0、job1、job2之后,理論上來(lái)說(shuō)最先執(zhí)行的3個(gè)stage依次會(huì)是:Stage0、Stage2、Stage4。假設(shè)此時(shí)它們正在運(yùn)行,且其它的stage還沒有開始運(yùn)行。則DAGScheduler的runningStages = (Stage0,Stage2,Stage4 )。且這三個(gè)stages又分別對(duì)應(yīng)3個(gè)TaskSet,比如TaskSet0, TaskSet1,TaskSet2。如果現(xiàn)在Stage0突然被abort了,那么,接下來(lái)執(zhí)行的是哪個(gè)stage呢?答案是Stage3。由于Job0和Job2依賴了Stage0,則Stage0的abort,會(huì)導(dǎo)致Job0和Job2被fail掉。而由于Job0和Job2的fail,又會(huì)導(dǎo)致正在running的Stage0和Stage4被fail。而Stage5和Stage1則再也沒有可執(zhí)行的機(jī)會(huì)了。對(duì)于Stage2,它被Job1和Job2共同依賴,雖然Job2 fail了,但Job2不會(huì)去fail Stage2,因?yàn)镾tage2還要被Job1所使用。
注:此文對(duì)stage abort的描述過(guò)于簡(jiǎn)單,具體abort過(guò)程請(qǐng)看我的另一篇文章《Spark之a(chǎn)bort stage》。