Spark運(yùn)行原理

前兩篇我們講了spark的基礎(chǔ)知識(shí),包括spark的體系結(jié)構(gòu)、執(zhí)行框架、spark的基本數(shù)據(jù)類(lèi)型以及spark中stage的劃分等等。本篇要介紹spark運(yùn)行的原理。包括spark的內(nèi)部執(zhí)行機(jī)制,spark的基本數(shù)據(jù)類(lèi)型RDD的執(zhí)行流程。

1. Spark內(nèi)部執(zhí)行機(jī)制

1.1 內(nèi)部執(zhí)行流程

spark的內(nèi)部執(zhí)行機(jī)制在《spark基礎(chǔ)?下篇》已有介紹,此處再簡(jiǎn)單介紹下。
  如下圖1為分布式集群上spark應(yīng)用程序的一般執(zhí)行框架。主要由sparkcontext(spark上下文)、cluster manager(資源管理器)和?executor(單個(gè)節(jié)點(diǎn)的執(zhí)行進(jìn)程)。其中cluster manager負(fù)責(zé)整個(gè)集群的統(tǒng)一資源管理。executor是應(yīng)用執(zhí)行的主要進(jìn)程,內(nèi)部含有多個(gè)task線程以及內(nèi)存空間。

圖1 spark分布式部署圖

  詳細(xì)流程圖如下圖2:
圖2 詳細(xì)流程圖

  • (1) 應(yīng)用程序在使用spark-submit提交后,根據(jù)提交時(shí)的參數(shù)設(shè)置(deploy mode)在相應(yīng)位置初始化sparkcontext,即spark的運(yùn)行環(huán)境,并創(chuàng)建DAG Scheduler和Task Scheduer,Driver根據(jù)應(yīng)用程序執(zhí)行代碼,將整個(gè)程序根據(jù)action算子劃分成多個(gè)job,每個(gè)job內(nèi)部構(gòu)建DAG圖,DAG Scheduler將DAG圖劃分為多個(gè)stage,同時(shí)每個(gè)stage內(nèi)部劃分為多個(gè)task,DAG Scheduler將taskset傳給Task Scheduer,Task Scheduer負(fù)責(zé)集群上task的調(diào)度。至于stage和task的關(guān)系以及是如何劃分的我們后面再詳細(xì)講。
  • (2) Driver根據(jù)sparkcontext中的資源需求向resource manager申請(qǐng)資源,包括executor數(shù)及內(nèi)存資源。
  • (3) 資源管理器收到請(qǐng)求后在滿足條件的work node節(jié)點(diǎn)上創(chuàng)建executor進(jìn)程。
  • (4) Executor創(chuàng)建完成后會(huì)向driver反向注冊(cè),以便driver可以分配task給他執(zhí)行。
  • (5) 當(dāng)程序執(zhí)行完后,driver向resource manager注銷(xiāo)所申請(qǐng)的資源。

1.2 job、stage、task的關(guān)系

Job、stage和task是spark任務(wù)執(zhí)行流程中的三個(gè)基本單位。其中job是最大的單位,Job是spark應(yīng)用的action算子催生的;stage是由job拆分,在單個(gè)job內(nèi)是根據(jù)shuffle算子來(lái)拆分stage的,單個(gè)stage內(nèi)部可根據(jù)操作數(shù)據(jù)的分區(qū)數(shù)劃分成多個(gè)task。如下圖3所示


圖3 job、stage和task的關(guān)系圖

2. RDD 的執(zhí)行流程

上一節(jié)我們介紹了spark應(yīng)用程序的大概執(zhí)行流程,由于spark應(yīng)用程序中的數(shù)據(jù)塊基本都是RDD,本節(jié)我們來(lái)看下應(yīng)用程序中RDD的執(zhí)行流程。

2.1 RDD 從創(chuàng)建到執(zhí)行

RDD從創(chuàng)建到執(zhí)行的流程如下圖4所示


圖4 RDD執(zhí)行流程
  • (1) 首先針對(duì)一段應(yīng)用代碼,driver會(huì)以action算子為邊界生成響應(yīng)的DAG圖
  • (2) DAG Scheduler從DAG圖的末端開(kāi)始,以圖中的shuffle算子為邊界來(lái)劃分stage,stage劃分完成后,將每個(gè)stage劃分為多個(gè)task,DAG Scheduler將taskSet傳給Task Scheduler來(lái)調(diào)用
  • (3) Task Scheduler根據(jù)一定的調(diào)度算法,將接收到的task池中的task分給work node節(jié)點(diǎn)中的executor執(zhí)行
    這里我們看到RDD的執(zhí)行流程中,DAG Scheduler和Task Scheduler起到非常關(guān)鍵的作用個(gè),因此下面我們來(lái)看下DAG Scheduer和Task Scheduler的工作流程。

2.2 DAG Scheduler工作流程

DAG Scheduler是一個(gè)高級(jí)的scheduler 層,他實(shí)現(xiàn)了基于stage的調(diào)度,他為每一個(gè)job劃分stage,并將單個(gè)stage分成多個(gè)task,然后他會(huì)將stage作為taskSet提交給底層的Task Scheduler,由Task Scheduler執(zhí)行。
DAG的工作原理如下圖5:


圖5 DAG Scheduler工作流程

  針對(duì)左邊的一段代碼,DAG Scheduler根據(jù)collect(action算子)將其劃分到一個(gè)job中,在此job內(nèi)部,劃分stage,如上右圖所示。DAG Scheduler在DAG圖中從末端開(kāi)始查找shuffle算子,上圖中將reduceByKey為stage的分界,shuffle算子只有一個(gè),因此分成兩個(gè)stage。前一個(gè)stage中,RDD在map完成以后執(zhí)行shuffle write將結(jié)果寫(xiě)到內(nèi)存或磁盤(pán)上,后一個(gè)stage首先執(zhí)行shuffle read讀取數(shù)據(jù)在執(zhí)行reduceByKey,即shuffle操作。

2.3 TASK Scheduler工作流程

Task Scheduler是sparkContext中除了DAG Scheduler的另一個(gè)非常重要的調(diào)度器,task Scheduler負(fù)責(zé)將DAGS cheduer產(chǎn)生的task調(diào)度到executor中執(zhí)行。如下圖6所示,Task Scheduler 負(fù)責(zé)將TaskSetPool中的task調(diào)度到executor中執(zhí)行,一般的調(diào)度模式是FIFO(先進(jìn)先出),也可以按照FAIR(公平調(diào)度)的調(diào)度模式,具體根據(jù)配置而定。其中FIFO:顧名思義是先進(jìn)先出隊(duì)列的調(diào)度模式,而FAIR則是根據(jù)權(quán)重來(lái)判斷,權(quán)重可以根據(jù)資源的占用率來(lái)分,如可設(shè)占用較少資源的task的權(quán)重較高。這樣就可以在資源較少時(shí),調(diào)用后來(lái)的權(quán)重較高的task先執(zhí)行了。至于每個(gè)executor中同時(shí)執(zhí)行的task數(shù)則是由分配給每個(gè)executor中cpu的核數(shù)決定的。


圖6 TaskScheduler的工作流程
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容