????????接著上一篇,我們接著來分析下一個非常重要的組建DAGScheduler的運行原理是怎么實現(xiàn)的;通過之前對Spark的分析講解,我們的Spark作業(yè)是在遇到一個action算子以后并以此為界限,劃分出一個Job出來,也就是在這個時候,Spark作業(yè)向集群提交一個Job任務(wù);下面我們看看源碼是如何實現(xiàn)的;

????????通過在任何一個action操作的算子中追蹤發(fā)現(xiàn),最終提交一個Job是調(diào)用了SparkContext的runJob方法實現(xiàn)的,在該方法中通過dagSchedualer.runJob()正式向集群提交一個Job任務(wù),接下來重點來了,我們來看看DAGScheduler是如何對一個Job進(jìn)行stage劃分的;

這里通過eventProcessLoop對象將Job進(jìn)行提交,下面我們看看在eventProcessLoop中具體發(fā)生了什么;



1.首先,創(chuàng)建出與partition數(shù)量相等的task;
2.由觸發(fā)Job提交的那個RDD算子作為作為起點,創(chuàng)建第一個stage并命名為finalStage;
3.對于if條件成立的內(nèi)容,是針對于本地模式運行的,我們主要來分析一下集群模式下的工作模式,在else邏輯中,我們可以看到調(diào)用了submitStage的方法,該方法就是實現(xiàn)stage劃分的重要實現(xiàn);


1.在該方法中我們調(diào)用了getMissingParentStages()方法,并將其RDD壓入一個棧中;
2.在這個方法中,首先彈棧獲得棧頂?shù)腞DD,并使用循環(huán)反復(fù)調(diào)用當(dāng)前RDD所依賴的父RDD,并判斷其父RDD是寬依賴還是窄依賴;
3.如果是寬依賴,則創(chuàng)建一個新的stage,并將其加入到missingStage緩存中;如果是窄依賴的話,則將當(dāng)前的RDD在壓入棧中;
4.如此往復(fù),直到一個stage遍歷完成;
5.運行完以上動作之后,接著使用遞歸操作,重復(fù)調(diào)用submitStage()方法,直到?jīng)]有父Stage的時候,即方法返回結(jié)果為Nil的時候,開始調(diào)用submitMissingTask將一個stage(即一個Taskset)提交給TaskScheduler去;
總結(jié):至此,我們的DAGScheduler的stage劃分算法基本上就介紹完了,下篇文章我們來接著介紹當(dāng)一個Taskset被提交給TaskScheduler后,TaskScheduler是如何對一個Taskset集合中的每個Task進(jìn)行合理分配的,即我們的Task分配算法是如何實現(xiàn)的,歡迎關(guān)注。
如需轉(zhuǎn)載,請注明: