Spark源碼解析(二):SparkContext內(nèi)部執(zhí)行流程

SparkContext內(nèi)部執(zhí)行的時(shí)序圖

對(duì)于這個(gè)時(shí)序圖的具體描述如下:

1.SparkSubmit在main()方法中執(zhí)行,然后根據(jù)提交的類(lèi)型調(diào)用相應(yīng)的方法,這里是"Submit",調(diào)用submit()方法,submit()里面進(jìn)行一些判斷后,

使用反射Class.forName(childMainClass, true, loader),然后調(diào)用invoke()方法來(lái)調(diào)用程序員自己寫(xiě)的類(lèi),我們這里是WordCount。

2.在WordCount類(lèi)中,main()方法里有調(diào)用SparkContext,SparkContext構(gòu)造器使用createSparkEnv()方法,

這個(gè)方法使用SparkEnv.createDriverEnv(conf, isLocal, listenerBus)方法創(chuàng)建SparkEnv對(duì)象;

在SparkEnv類(lèi),調(diào)用create()方法來(lái)進(jìn)行創(chuàng)建SparkEnv,在這個(gè)方法內(nèi)部,有一個(gè)

AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf, securityManager)的調(diào)用過(guò)程,

主要用來(lái)產(chǎn)生Akka中的ActorSystem以及得到綁定的端口號(hào)。

3.在創(chuàng)建SparkEnv對(duì)象后,SparkContext構(gòu)造器使用代碼SparkContext.createTaskScheduler(this, master)創(chuàng)建TaskScheduler對(duì)象,

這里根據(jù)實(shí)際的提交模式來(lái)進(jìn)行創(chuàng)建TaskScheduler對(duì)象,提交模式有:local、Mesos、Zookeeper、Simr、Spark,

這里模們主要分析Spark集群下的模式;然后還需要?jiǎng)?chuàng)建一個(gè)SparkDeploySchedulerBackend對(duì)象;

在創(chuàng)建TaskScheduler對(duì)象調(diào)用initialize()方法,這里選擇調(diào)度模式,主要有兩種模式,F(xiàn)IFO和FAIR,默認(rèn)的調(diào)度模式;

最后調(diào)用taskScheduler的start()方法,里面主要調(diào)用SparkDeploySchedulerBackend對(duì)象的start()方法,

首先調(diào)用父類(lèi)的start()方法產(chǎn)生一個(gè)用于和Executor通信的DriverActor對(duì)象,然后里面主要?jiǎng)?chuàng)建一個(gè)AppClient對(duì)象內(nèi)部有ClientActor類(lèi)對(duì)象,

用于Driver和Master進(jìn)行RPC通信。

SparkContext源碼分析流程

1.SparkSubmit半生對(duì)象的源碼

1.1SparkSubmit的main()函數(shù)在SparkSubmit半生對(duì)象的104行左右,這個(gè)是程序的主要入口:

接下來(lái)主要進(jìn)入submit()方法,下面是submit()方法

1.2SparkSubmit的submit()方法,代碼大約在142行左右, 這個(gè)方法的主要作用是根據(jù)不同的模式使用runMain()方法:

1.3SparkSubmit的runMain()方法,代碼大約在505行左右,這個(gè)方法主要的主要作用是通過(guò)反射獲取自定義類(lèi),這里我們主要的是WordCount,然后通過(guò)invoke方法調(diào)用main?這里是方法的重要代碼:

調(diào)用WordCount的main()方法后,接下來(lái)就要看SparkContext的內(nèi)部了。

2.SparkContext內(nèi)部源碼分析

很重要:SparkContext是Spark提交任務(wù)到集群的入口

我們看一下SparkContext的主構(gòu)造器

1.調(diào)用createSparkEnv方法創(chuàng)建SparkEnv,里面有一個(gè)非常重要的對(duì)象ActorSystem

2.創(chuàng)建TaskScheduler -> 根據(jù)提交任務(wù)的URL進(jìn)行匹配 -> TaskSchedulerImpl -> SparkDeploySchedulerBackend(里面有兩個(gè)Actor)

3.創(chuàng)建DAGScheduler

2.1創(chuàng)建SparkEnv獲取ActorSystem,代碼大約在275行左右,這一步的主要的作用是創(chuàng)建ActorSystem對(duì)象以后根據(jù)這個(gè)對(duì)象來(lái)創(chuàng)建相應(yīng)的Actor

主要調(diào)用SparkEnv類(lèi)的createDriverEnv()方法獲取SparkEnv對(duì)象,createDriverEnv()主要調(diào)用SparkEnv的create()方法,這里代碼大約

SparkEnv的154行,代碼具體如下:

createDriverEnv()內(nèi)部主要調(diào)用create()方法,代碼大約在202行,重要的代碼如下:

這個(gè)方法的主要作用是調(diào)用AkkaUtils這個(gè)工具類(lèi)創(chuàng)建ActorSystem。

2.2創(chuàng)建TaskScheduler,代碼大約在374行,重要的代碼如下:

這里調(diào)用createTaskScheduler()方法,這個(gè)類(lèi)主要的作用是根據(jù)提交的類(lèi)型創(chuàng)建相應(yīng)的TaskScheduler(),這里主要分析Spark集群下,主要的代碼如下:

這里進(jìn)行模式匹配,以上代碼大約在SparkContext的2159行,主要的作用是創(chuàng)建TaskSchedulerImpl對(duì)象,然后初始化調(diào)度器這里,需要看的是initialize(),主要的實(shí)現(xiàn)是

TaskSchedulerImpl類(lèi),這里我們將會(huì)深入TaskSchedulerImpl類(lèi)的initialize()方法,下面是該方法的實(shí)現(xiàn):

主要用于調(diào)度的模式,調(diào)度模式主要分為FIFO和FAIR。在進(jìn)行創(chuàng)建了TaskScheduler對(duì)象后,我們?cè)賮?lái)看一下主要的代碼:

上述代碼中,這里主要用于創(chuàng)建一個(gè)HeartbeatReceiver對(duì)象來(lái)進(jìn)行心跳,用于Executors和DriverActor的心跳。

然后創(chuàng)建DAGScheduler對(duì)象,這個(gè)對(duì)象的主要作用是用來(lái)劃分Stage。

2.3TaskScheduler進(jìn)行啟動(dòng),代碼大約在395行,重要的代碼如下:

由于這里是TaskScheduler的主要的實(shí)現(xiàn)類(lèi)是TaskScheduler是TaskSchedulerImpl類(lèi),我們要進(jìn)入的源碼是:

主要調(diào)用了SparkDeploySchedulerBackend的start()方法,接下來(lái)我們需要看SparkDeploySchedulerBackend內(nèi)部實(shí)現(xiàn)。

以下是SparkDeploySchedulerBackend的構(gòu)造器函數(shù),這個(gè)代碼大約在SparkDeploySchedulerBackend的45行重要的代碼如下:

從上面的代碼可以看出首先調(diào)用父類(lèi)(CoarseGrainedSchedulerBackend)的start()方法,然后對(duì)于一些重要的參數(shù)進(jìn)行封裝,這里最重要的參數(shù)是

CoarseGrainedExecutorBackend類(lèi),還有一些driverUrl和WORKER_URL等參數(shù)的封裝,將CoarseGrainedExecutorBackend

封裝成Command,這是一個(gè)樣例類(lèi),不知道樣例類(lèi)請(qǐng)點(diǎn)擊這里,將這個(gè)參數(shù)封裝成為一個(gè)

ApplicationDescription對(duì)象,創(chuàng)建一個(gè)AppClient對(duì)象,這個(gè)對(duì)象主要用于Driver和Master之間的通信,以下我們分析start()方法后再分析client.start()。

可以從上面的代碼看出,?這里主要?jiǎng)?chuàng)建一個(gè)DriverActor,這個(gè)Actor的主要的作用是Driver進(jìn)程和Executor進(jìn)程進(jìn)行RPC通信

在分析完以上的CoarseGrainedSchedulerBackend的start()方法后,這里主要進(jìn)行的源碼分析是client.start()方法這里創(chuàng)建一個(gè)ClientActor,準(zhǔn)確來(lái)說(shuō)是這個(gè)

ClientActor來(lái)和Master通信。

現(xiàn)在,這里就調(diào)用ClientActor的生命周期方法,對(duì)于Akka通信不了解的,請(qǐng)點(diǎn)擊這里進(jìn)行了解Actor的生命周期方法。

Akka的Actor的生命周期方法主要從preStart()方法開(kāi)始,這行代碼大約在67行左右,主要的內(nèi)容如下:

在preStart()方法中主要做的事情是向Master注冊(cè),registerWithMaster()的主要內(nèi)容是:

這個(gè)方法主要是向活著的Master進(jìn)行注冊(cè),將以前所有的參數(shù)封裝的appDescription發(fā)送給Master,然后啟動(dòng)定時(shí)調(diào)度器進(jìn)行和Master的注冊(cè),因?yàn)橛锌赡苓M(jìn)行多次通信。

Master收到通過(guò)樣例類(lèi)的模式匹配,對(duì)于Driver向Master注冊(cè)Application,主要的作用是持久化Driver傳遞的參數(shù)和進(jìn)行資源調(diào)度.

主要的代碼大約在Master類(lèi)的315行,主要的代碼如下:

這段代碼的意義是:持久化信息,告知ClientActor發(fā)送注冊(cè)成功的信息,然后適使用schedule()進(jìn)行資源的調(diào)度。

對(duì)于schedule()方法,代碼大約在533行,這里的主要作用是進(jìn)行資源調(diào)度,主要的是兩種資源調(diào)度的方法,一種是盡量打散的分配資源,還有一種是盡量集中

對(duì)于盡量打散的方式:將每個(gè)app分配到盡可能多的worker中執(zhí)行

App調(diào)度時(shí)會(huì)為app分配滿足條件的資源-----Worker(State是Alive,其上并沒(méi)有該Application的executor,可用內(nèi)存滿足要求(spark.executor.memory指定,默認(rèn)512),

核滿足要求(spark.cores.max, 最大可用core數(shù),若未指定,則為全部資源)),然后通知Woker啟動(dòng)Excutor. 及向AppClient發(fā)送ExecutorAdded消息。

進(jìn)行調(diào)度時(shí),調(diào)度程序會(huì)根據(jù)配制SpreadOutApps = spark.deploy.spreadOut情況決定資源分配方式。

1 從列表中取下一app,根據(jù)CPU情況找出合適的woker,按核從小到大排序

2 如果worker節(jié)點(diǎn)存在可以分配的core 則進(jìn)行預(yù)分配處理(輪循一次分一個(gè)直至滿足app需求),并在分配列表(assigned = ArrayInt)中計(jì)數(shù)。

3根據(jù)assinged列表中的預(yù)分配信息,進(jìn)行分配Executor(真實(shí)分配)

4 啟動(dòng)Executor并設(shè)置app.state = ApplicationState.RUNNING

盡情集中的方式: 將每個(gè)app分配到盡可能少的worker中執(zhí)行。?1 從可用的worker列表中取下一work. (worker <- workers if worker.coresFree > 0)

2 遍歷waitingApps 找到滿足app運(yùn)行條件的app,進(jìn)行分配

3啟動(dòng)Executor(launchExecutor(w,e))并設(shè)置app.state = ApplicationState.RUNNING

其中:launchExcutor(worker, exec) 具體內(nèi)容如下:

向executor分配給worker

通知worker啟動(dòng)executor

由分配過(guò)程可知, 分配的Excutor個(gè)數(shù)與CPU核心數(shù)有關(guān)。當(dāng)指定完Worker節(jié)點(diǎn)后,會(huì)在Worker節(jié)點(diǎn)創(chuàng)建ExecutorRunner,并啟動(dòng),執(zhí)行App中的Command

去創(chuàng)建并啟動(dòng)CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend啟動(dòng)后,會(huì)首先通過(guò)傳入的driverUrl這個(gè)參數(shù)向

在CoarseGrainedSchedulerBackend::DriverActor(用于與Master通信,及調(diào)度任務(wù))發(fā)送RegisterExecutor(executorId, hostPort, cores),

DriverActor會(huì)創(chuàng)建executorData(executor信息)加入executorDataMap供后續(xù)task使用,并回復(fù)RegisteredExecutor,

此時(shí)CoarseGrainedExecutorBackend會(huì)創(chuàng)建一個(gè)org.apache.spark.executor.Executor。至此,Executor創(chuàng)建完畢。

Executor是直接用于task執(zhí)行, 是集群中的直接勞動(dòng)者。至此,資源分配結(jié)束。

百度腦圖關(guān)于作業(yè)提交以及SparkContext的示意圖



注意:這里的SparkContext和Master是兩個(gè)獨(dú)立的類(lèi),由于Baidu腦圖不能獨(dú)立劃分,所以看起來(lái)像父子類(lèi)關(guān)系。

總結(jié)

在SparkContext(這里基于Spark的版本是1.3.1)主要做的工作是:

1.創(chuàng)建SparkEnv,里面有一個(gè)很重要的對(duì)象ActorSystem

2.創(chuàng)建TaskScheduler,這里是根據(jù)提交的集群來(lái)創(chuàng)建相應(yīng)的TaskScheduler

3.對(duì)于TaskScheduler,主要的任務(wù)調(diào)度模式有FIFO和FAIR

4.在SparkContext中創(chuàng)建了兩個(gè)Actor,一個(gè)是DriverActor,這里主要用于Driver和Executor之間的通信;還有一個(gè)是ClientActor,主要用于Driver和Master之間的通信。

5.創(chuàng)建DAGScheduler,其實(shí)這個(gè)是用于Stage的劃分

6.調(diào)用taskScheduler.start()方法啟動(dòng),進(jìn)行資源調(diào)度,有兩種資源分配方法,一種是盡量打散;一種是盡量集中

7.Driver向Master注冊(cè),發(fā)送了一些信息,其中一個(gè)重要的類(lèi)是CoarseGrainedExecutorBackend,這個(gè)類(lèi)以后用于創(chuàng)建Executor進(jìn)程。


?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Apache Spark 是專(zhuān)為大規(guī)模數(shù)據(jù)處理而設(shè)計(jì)的快速通用的計(jì)算引擎。Spark是UC Berkeley AM...
    大佛愛(ài)讀書(shū)閱讀 2,975評(píng)論 0 20
  • 通過(guò)文章“Spark核心概念RDD”我們知道,Spark的核心是根據(jù)RDD來(lái)實(shí)現(xiàn)的,Spark Scheduler...
    尼小摩閱讀 717評(píng)論 1 9
  • 轉(zhuǎn)載:Spark的運(yùn)行架構(gòu)分析(一) 1:Spark的運(yùn)行模式 2:Spark中的一些名詞解釋 3:Spark的運(yùn)...
    小小少年Boy閱讀 546評(píng)論 0 1
  • 可達(dá)性跟便攜性: 拿美圖秀秀來(lái)說(shuō),針對(duì)的人群不同,所以對(duì)應(yīng)不同的需求,目標(biāo)用戶是不會(huì)ps的,就是可達(dá)性,目標(biāo)是會(huì)p...
    一條愛(ài)分享閱讀 879評(píng)論 3 3
  • 昨晚,和媽媽在網(wǎng)上聊天,她問(wèn)我有沒(méi)有在學(xué)校談男朋友,我回答沒(méi)有,媽媽說(shuō)還在等那個(gè)兵娃子嗎?我說(shuō)是啊。媽媽說(shuō)沒(méi)感覺(jué)...
    潼潼閱讀 197評(píng)論 0 1

友情鏈接更多精彩內(nèi)容