Spark 相關(guān)概念介紹、架構(gòu)原理、作業(yè)執(zhí)行流程簡介

這篇文章主要介紹 spark 的相關(guān)名詞概念,和作業(yè)的執(zhí)行流程,任務(wù)分配,希望通過這篇文章可以幫助大家對 spark 有一個(gè)更深層次的的理解。

名詞解釋:

1. Standalone模式下存在的角色。
Client:客戶端進(jìn)程,負(fù)責(zé)提交作業(yè)到Master。
Master:Standalone模式中主控節(jié)點(diǎn),負(fù)責(zé)接收Client提交的作業(yè),管理Worker,并命令Worker啟動Driver和Executor。
Worker:Standalone模式中slave節(jié)點(diǎn)上的守護(hù)進(jìn)程,負(fù)責(zé)管理本節(jié)點(diǎn)的資源,定期向Master匯報(bào)心跳,接收Master的命令,啟動Driver和Executor。
Driver: 一個(gè)Spark作業(yè)運(yùn)行時(shí)包括一個(gè)Driver進(jìn)程,也是作業(yè)的主進(jìn)程,負(fù)責(zé)作業(yè)的解析、生成Stage并調(diào)度Task到Executor上。包括DAGScheduler,TaskScheduler。
Executor:即真正執(zhí)行作業(yè)的地方,一個(gè)集群一般包含多個(gè)Executor,每個(gè)Executor接收Driver的命令Launch Task,一個(gè)Executor可以執(zhí)行一到多個(gè)Task。

2.作業(yè)相關(guān)的名詞解釋

Job: 在用戶程序中, 每次調(diào)用Action函數(shù)都會產(chǎn)生一個(gè)新的job, 也就是說一個(gè)Action都會生成一個(gè)job.

Stage:一個(gè)Spark作業(yè)一般包含一到多個(gè)Stage。
Task:一個(gè)Stage包含一到多個(gè)Task,通過多個(gè)Task實(shí)現(xiàn)并行運(yùn)行的功能。
DAGScheduler: 實(shí)現(xiàn)將Spark作業(yè)分解成一到多個(gè)Stage,每個(gè)Stage根據(jù)RDD的Partition個(gè)數(shù)決定Task的個(gè)數(shù),然后生成相應(yīng)的Task set放到TaskScheduler中。

TaskScheduler:實(shí)現(xiàn)Task分配到Executor上執(zhí)行。

3、spark 根據(jù) job 劃分 stage 的過程
構(gòu)建RDD之間的依賴關(guān)系. 具體來說依賴有寬窄之分, 如果子RDD中的每個(gè)分區(qū)依賴常數(shù)個(gè)父RDD中的分區(qū), 我們把這種依賴叫做窄依賴; 如果子RDD中的每個(gè)數(shù)據(jù)分片依賴父RDD的所有分片, 我們把這種依賴叫做寬依賴.

 在這兒我們在引入一個(gè)新的詞匯lineage

, 在spark中每個(gè)RDD都攜帶自己的lineage. 而lineage就是通過RDD之間的依賴來表示的.



wide-narrow-dependency

我們通過這幅圖可以大概看一下寬窄依賴到底是這么回事. 圖中矩形框圍住的部分是RDD, 實(shí)心小矩形是Partition.
接下來我們看一下Spark是如何構(gòu)建DAG的. 當(dāng)用戶調(diào)用Action函數(shù)時(shí), 調(diào)度器會逆向的遍歷該RDD的lineage, 每個(gè)stage會嘗試盡可能多包含那些連續(xù)的窄依賴. 如果當(dāng)前的Stage向上回溯的過程中遇到了寬依賴, 則當(dāng)前Stage結(jié)束, 一個(gè)新的Stage被構(gòu)建. 第二個(gè)Stage是第一個(gè)Stage的parent. 還有一種情況也會結(jié)束當(dāng)前Stage, 那就是那個(gè)partition已經(jīng)被計(jì)算出來, 換存在內(nèi)存中, 這種情況下我們就不必作多余的計(jì)算了.

 簡單的說就是遇到寬依賴, 就生成新的Stage. 寬依賴會觸發(fā)shuffle. 我們來看上邊代碼的visit函數(shù): 拿到RDD的所有的dependency, 如果是窄依賴那么繼續(xù)查找依賴的RDD的parent; 如果是寬依賴, 則調(diào)用getShuffleMapStage把生成的Stage加到當(dāng)前stage的parents中. 該函數(shù)執(zhí)行完畢, 則整個(gè)DAG就構(gòu)建完成.

兩種方式的作業(yè)運(yùn)行原理

Driver運(yùn)行在Worker上
通過org.apache.spark.deploy.Client類執(zhí)行作業(yè),作業(yè)運(yùn)行命令如下:
./bin/spark-class org.apache.spark.deploy.Client launch spark://host:port file:///jar_url org.apache.spark.examples.SparkPi spark://host:port
作業(yè)執(zhí)行流如圖1所示。


作業(yè)執(zhí)行流程描述:

  1. 客戶端提交作業(yè)給Master

  2. Master讓一個(gè)Worker啟動Driver,即SchedulerBackend。Worker創(chuàng)建一個(gè)DriverRunner線程,DriverRunner啟動SchedulerBackend進(jìn)程。

  3. 另外Master還會讓其余Worker啟動Exeuctor,即ExecutorBackend。Worker創(chuàng)建一個(gè)ExecutorRunner線程,ExecutorRunner會啟動ExecutorBackend進(jìn)程。

  4. ExecutorBackend啟動后會向Driver的SchedulerBackend注冊。SchedulerBackend進(jìn)程中包含DAGScheduler,它會根據(jù)用戶程序,生成執(zhí)行計(jì)劃,并調(diào)度執(zhí)行。對于每個(gè)stage的task,都會被存放到TaskScheduler中,ExecutorBackend向SchedulerBackend匯報(bào)的時(shí)候把TaskScheduler中的task調(diào)度到ExecutorBackend執(zhí)行。

  5. 所有stage都完成后作業(yè)結(jié)束。

Driver運(yùn)行在客戶端
直接執(zhí)行Spark作業(yè),作業(yè)運(yùn)行命令如下(示例):
./bin/run-example org.apache.spark.examples.SparkPi spark://host:port
作業(yè)執(zhí)行流如圖2所示。


圖2
作業(yè)執(zhí)行流程描述:

  1. 客戶端啟動后直接運(yùn)行用戶程序,啟動Driver相關(guān)的工作:DAGScheduler和BlockManagerMaster等。

  2. 客戶端的Driver向Master注冊。

  3. Master還會讓W(xué)orker啟動Exeuctor。Worker創(chuàng)建一個(gè)ExecutorRunner線程,ExecutorRunner會啟動ExecutorBackend進(jìn)程。

  4. ExecutorBackend啟動后會向Driver的SchedulerBackend注冊。Driver的DAGScheduler解析作業(yè)并生成相應(yīng)的Stage,每個(gè)Stage包含的Task通過TaskScheduler分配給Executor執(zhí)行。

  5. 所有stage都完成后作業(yè)結(jié)束。
    基于Yarn的Spark架構(gòu)與作業(yè)執(zhí)行流程


    這里Spark AppMaster相當(dāng)于Standalone模式下的SchedulerBackend,Executor相當(dāng)于standalone的ExecutorBackend,spark AppMaster中包括DAGScheduler和YarnClusterScheduler。
    Spark on Yarn的執(zhí)行流程可以參考http://www.csdn.net/article/2013-12-04/2817706--YARN spark on Yarn部分。
    這里主要介紹一下Spark ApplicationMaster的主要工作。代碼參考Apache Spark 0.9.0版本ApplicationMaster.scala中的run()方法。
    步驟如下:

  6. 設(shè)置環(huán)境變量spark.local.dir和spark.ui.port。NodeManager啟動ApplicationMaster的時(shí)候會傳遞LOCAL_DIRS(YARN_LOCAL_DIRS)變量,這個(gè)變量會被設(shè)置為spark.local.dir的值。后續(xù)臨時(shí)文件會存放在此目錄下。

  7. 獲取NodeManager傳遞給ApplicationMaster的appAttemptId。

  8. 創(chuàng)建AMRMClient,即ApplicationMaster與ResourceManager的通信連接。

  9. 啟動用戶程序,startUserClass(),使用一個(gè)線程通過發(fā)射調(diào)用用戶程序的main方法。這時(shí)候,用戶程序中會初始化SparkContext,它包含DAGScheduler和TaskScheduler。

  10. 向ResourceManager注冊。

  11. 向ResourceManager申請containers,它根據(jù)輸入數(shù)據(jù)和請求的資源,調(diào)度Executor到相應(yīng)的NodeManager上,這里的調(diào)度算法會考慮輸入數(shù)據(jù)的locality。

文章如有錯(cuò)漏還望多批評指正。

文章參考鏈接:
Spark Job執(zhí)行流程源碼解析
Spark架構(gòu)與作業(yè)執(zhí)行流程簡介

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容