前兩篇我們講了spark的基礎(chǔ)知識(shí),包括spark的體系結(jié)構(gòu)、執(zhí)行框架、spark的基本數(shù)據(jù)類(lèi)型以及spark中stage的劃分等等。本篇要介紹spark運(yùn)行的原理。包括spark的內(nèi)部執(zhí)行機(jī)制,spark的基本數(shù)據(jù)類(lèi)型RDD的執(zhí)行流程。
1. Spark內(nèi)部執(zhí)行機(jī)制
1.1 內(nèi)部執(zhí)行流程
spark的內(nèi)部執(zhí)行機(jī)制在《spark基礎(chǔ)?下篇》已有介紹,此處再簡(jiǎn)單介紹下。
如下圖1為分布式集群上spark應(yīng)用程序的一般執(zhí)行框架。主要由sparkcontext(spark上下文)、cluster manager(資源管理器)和?executor(單個(gè)節(jié)點(diǎn)的執(zhí)行進(jìn)程)。其中cluster manager負(fù)責(zé)整個(gè)集群的統(tǒng)一資源管理。executor是應(yīng)用執(zhí)行的主要進(jìn)程,內(nèi)部含有多個(gè)task線程以及內(nèi)存空間。

詳細(xì)流程圖如下圖2:

- (1) 應(yīng)用程序在使用spark-submit提交后,根據(jù)提交時(shí)的參數(shù)設(shè)置(deploy mode)在相應(yīng)位置初始化sparkcontext,即spark的運(yùn)行環(huán)境,并創(chuàng)建DAG Scheduler和Task Scheduer,Driver根據(jù)應(yīng)用程序執(zhí)行代碼,將整個(gè)程序根據(jù)action算子劃分成多個(gè)job,每個(gè)job內(nèi)部構(gòu)建DAG圖,DAG Scheduler將DAG圖劃分為多個(gè)stage,同時(shí)每個(gè)stage內(nèi)部劃分為多個(gè)task,DAG Scheduler將taskset傳給Task Scheduer,Task Scheduer負(fù)責(zé)集群上task的調(diào)度。至于stage和task的關(guān)系以及是如何劃分的我們后面再詳細(xì)講。
- (2) Driver根據(jù)sparkcontext中的資源需求向resource manager申請(qǐng)資源,包括executor數(shù)及內(nèi)存資源。
- (3) 資源管理器收到請(qǐng)求后在滿足條件的work node節(jié)點(diǎn)上創(chuàng)建executor進(jìn)程。
- (4) Executor創(chuàng)建完成后會(huì)向driver反向注冊(cè),以便driver可以分配task給他執(zhí)行。
- (5) 當(dāng)程序執(zhí)行完后,driver向resource manager注銷(xiāo)所申請(qǐng)的資源。
1.2 job、stage、task的關(guān)系
Job、stage和task是spark任務(wù)執(zhí)行流程中的三個(gè)基本單位。其中job是最大的單位,Job是spark應(yīng)用的action算子催生的;stage是由job拆分,在單個(gè)job內(nèi)是根據(jù)shuffle算子來(lái)拆分stage的,單個(gè)stage內(nèi)部可根據(jù)操作數(shù)據(jù)的分區(qū)數(shù)劃分成多個(gè)task。如下圖3所示

2. RDD 的執(zhí)行流程
上一節(jié)我們介紹了spark應(yīng)用程序的大概執(zhí)行流程,由于spark應(yīng)用程序中的數(shù)據(jù)塊基本都是RDD,本節(jié)我們來(lái)看下應(yīng)用程序中RDD的執(zhí)行流程。
2.1 RDD 從創(chuàng)建到執(zhí)行
RDD從創(chuàng)建到執(zhí)行的流程如下圖4所示

- (1) 首先針對(duì)一段應(yīng)用代碼,driver會(huì)以action算子為邊界生成響應(yīng)的DAG圖
- (2) DAG Scheduler從DAG圖的末端開(kāi)始,以圖中的shuffle算子為邊界來(lái)劃分stage,stage劃分完成后,將每個(gè)stage劃分為多個(gè)task,DAG Scheduler將taskSet傳給Task Scheduler來(lái)調(diào)用
- (3) Task Scheduler根據(jù)一定的調(diào)度算法,將接收到的task池中的task分給work node節(jié)點(diǎn)中的executor執(zhí)行
這里我們看到RDD的執(zhí)行流程中,DAG Scheduler和Task Scheduler起到非常關(guān)鍵的作用個(gè),因此下面我們來(lái)看下DAG Scheduer和Task Scheduler的工作流程。
2.2 DAG Scheduler工作流程
DAG Scheduler是一個(gè)高級(jí)的scheduler 層,他實(shí)現(xiàn)了基于stage的調(diào)度,他為每一個(gè)job劃分stage,并將單個(gè)stage分成多個(gè)task,然后他會(huì)將stage作為taskSet提交給底層的Task Scheduler,由Task Scheduler執(zhí)行。
DAG的工作原理如下圖5:

針對(duì)左邊的一段代碼,DAG Scheduler根據(jù)collect(action算子)將其劃分到一個(gè)job中,在此job內(nèi)部,劃分stage,如上右圖所示。DAG Scheduler在DAG圖中從末端開(kāi)始查找shuffle算子,上圖中將reduceByKey為stage的分界,shuffle算子只有一個(gè),因此分成兩個(gè)stage。前一個(gè)stage中,RDD在map完成以后執(zhí)行shuffle write將結(jié)果寫(xiě)到內(nèi)存或磁盤(pán)上,后一個(gè)stage首先執(zhí)行shuffle read讀取數(shù)據(jù)在執(zhí)行reduceByKey,即shuffle操作。
2.3 TASK Scheduler工作流程
Task Scheduler是sparkContext中除了DAG Scheduler的另一個(gè)非常重要的調(diào)度器,task Scheduler負(fù)責(zé)將DAGS cheduer產(chǎn)生的task調(diào)度到executor中執(zhí)行。如下圖6所示,Task Scheduler 負(fù)責(zé)將TaskSetPool中的task調(diào)度到executor中執(zhí)行,一般的調(diào)度模式是FIFO(先進(jìn)先出),也可以按照FAIR(公平調(diào)度)的調(diào)度模式,具體根據(jù)配置而定。其中FIFO:顧名思義是先進(jìn)先出隊(duì)列的調(diào)度模式,而FAIR則是根據(jù)權(quán)重來(lái)判斷,權(quán)重可以根據(jù)資源的占用率來(lái)分,如可設(shè)占用較少資源的task的權(quán)重較高。這樣就可以在資源較少時(shí),調(diào)用后來(lái)的權(quán)重較高的task先執(zhí)行了。至于每個(gè)executor中同時(shí)執(zhí)行的task數(shù)則是由分配給每個(gè)executor中cpu的核數(shù)決定的。
