Spark 運(yùn)行基本流程

[TOC]

一、Spark中的基本概念

(1)Application:表示你的應(yīng)用程序

(2)Driver:表示main()函數(shù),創(chuàng)建SparkContext。由SparkContext負(fù)責(zé)與ClusterManager通信,進(jìn)行資源的申請(qǐng),任務(wù)的分配和監(jiān)控等。程序執(zhí)行完畢后關(guān)閉SparkContext

(3)Executor:某個(gè)Application運(yùn)行在Worker節(jié)點(diǎn)上的一個(gè)進(jìn)程,該進(jìn)程負(fù)責(zé)運(yùn)行某些task,并且負(fù)責(zé)將數(shù)據(jù)存在內(nèi)存或者磁盤(pán)上。在Spark on Yarn模式下,其進(jìn)程名稱為 CoarseGrainedExecutor Backend,一個(gè)CoarseGrainedExecutor Backend進(jìn)程有且僅有一個(gè)executor對(duì)象,它負(fù)責(zé)將Task包裝成taskRunner,并從線程池中抽取出一個(gè)空閑線程運(yùn)行Task,這樣,每個(gè)CoarseGrainedExecutorBackend能并行運(yùn)行Task的數(shù)據(jù)就取決于分配給它的CPU的個(gè)數(shù)。

(4)Worker:集群中可以運(yùn)行Application代碼的節(jié)點(diǎn)。在Standalone模式中指的是通過(guò)slave文件配置的worker節(jié)點(diǎn),在Spark on Yarn模式中指的就是NodeManager節(jié)點(diǎn)。

(5)Task:在Executor進(jìn)程中執(zhí)行任務(wù)的工作單元,多個(gè)Task組成一個(gè)Stage

(6)Job:包含多個(gè)Task組成的并行計(jì)算,是由Action行為觸發(fā)的

(7)Stage:每個(gè)Job會(huì)被拆分很多組Task,作為一個(gè)TaskSet,其名稱為Stage

(8)DAGScheduler:根據(jù)Job構(gòu)建基于Stage的DAG,并提交Stage給TaskScheduler,其劃分Stage的依據(jù)是RDD之間的依賴關(guān)系

(9)TaskScheduler:將TaskSet提交給Worker(集群)運(yùn)行,每個(gè)Executor運(yùn)行什么Task就是在此處分配的。

image.png

二、Spark的運(yùn)行流程

2.1 Spark的基本運(yùn)行流程

(1)構(gòu)建Spark Application的運(yùn)行環(huán)境(啟動(dòng)SparkContext),SparkContext向資源管理器(可以是Standalone、Mesos或YARN)注冊(cè)并申請(qǐng)運(yùn)行Executor資源;
(2)資源管理器分配Executor資源并啟動(dòng)StandaloneExecutorBackend,Executor運(yùn)行情況將隨著心跳發(fā)送到資源管理器上;
(3)SparkContext構(gòu)建成DAG圖,將DAG圖分解成Stage,并把Taskset發(fā)送給Task Scheduler。Executor向SparkContext申請(qǐng)Task
(4)Task Scheduler將Task發(fā)放給Executor運(yùn)行同時(shí)SparkContext將應(yīng)用程序代碼發(fā)放給Executor。

(5)Task在Executor上運(yùn)行,運(yùn)行完畢釋放所有資源。

image.png
3、Spark運(yùn)行架構(gòu)特點(diǎn)

(1)每個(gè)Application獲取專(zhuān)屬的executor進(jìn)程,該進(jìn)程在Application期間一直駐留,并以多線程方式運(yùn)行tasks。這種Application隔離機(jī)制有其優(yōu)勢(shì)的,無(wú)論是從調(diào)度角度看(每個(gè)Driver調(diào)度它自己的任務(wù)),還是從運(yùn)行角度看(來(lái)自不同Application的Task運(yùn)行在不同的JVM中)。當(dāng)然,這也意味著Spark Application不能跨應(yīng)用程序共享數(shù)據(jù),除非將數(shù)據(jù)寫(xiě)入到外部存儲(chǔ)系統(tǒng)。

(2)Spark與資源管理器無(wú)關(guān),只要能夠獲取executor進(jìn)程,并能保持相互通信就可以了。

(3)提交SparkContext的Client應(yīng)該靠近Worker節(jié)點(diǎn)(運(yùn)行Executor的節(jié)點(diǎn)),最好是在同一個(gè)Rack里,因?yàn)镾park Application運(yùn)行過(guò)程中SparkContext和Executor之間有大量的信息交換;如果想在遠(yuǎn)程集群中運(yùn)行,最好使用RPC將SparkContext提交給集群,不要遠(yuǎn)離Worker運(yùn)行SparkContext。

(4)Task采用了數(shù)據(jù)本地性和推測(cè)執(zhí)行的優(yōu)化機(jī)制。

4、DAGScheduler

Job=多個(gè)stage,Stage=多個(gè)同種task, Task分為ShuffleMapTask和ResultTask,Dependency分為ShuffleDependency和NarrowDependency

  1. 面向stage的切分,切分依據(jù)為寬依賴

  2. 維護(hù)waiting jobs和active jobs,維護(hù)waiting stages、active stages和failed stages,以及與jobs的映射關(guān)系

主要職能
1、接收提交Job的主入口,submitJob(rdd, ...)或runJob(rdd, ...)。在SparkContext里會(huì)調(diào)用這兩個(gè)方法。

  • 生成一個(gè)Stage并提交,接著判斷Stage是否有父Stage未完成,若有,提交并等待父Stage,以此類(lèi)推。結(jié)果是:DAGScheduler里增加了一些waiting stage和一個(gè)running stage。
  • running stage提交后,分析stage里Task的類(lèi)型,生成一個(gè)Task描述,即TaskSet。
  • 調(diào)用TaskScheduler.submitTask(taskSet, ...)方法,把Task描述提交給TaskScheduler。TaskScheduler依據(jù)資源量和觸發(fā)分配條件,會(huì)為這個(gè)TaskSet分配資源并觸發(fā)執(zhí)行。
  • DAGScheduler提交job后,異步返回JobWaiter對(duì)象,能夠返回job運(yùn)行狀態(tài),能夠cancel job,執(zhí)行成功后會(huì)處理并返回結(jié)果

2、處理TaskCompletionEvent

  • 如果task執(zhí)行成功,對(duì)應(yīng)的stage里減去這個(gè)task,做一些計(jì)數(shù)工作:
    如果task是ResultTask,計(jì)數(shù)器Accumulator加一,在job里為該task置true,job finish總數(shù)加一。加完后如果finish數(shù)目與partition數(shù)目相等,說(shuō)明這個(gè)stage完成了,標(biāo)記stage完成,從running stages里減去這個(gè)stage,做一些stage移除的清理工作
    如果task是ShuffleMapTask,計(jì)數(shù)器Accumulator加一,在stage里加上一個(gè)output location,里面是一個(gè)MapStatus類(lèi)。MapStatus是ShuffleMapTask執(zhí)行完成的返回,包含location信息和block size(可以選擇壓縮或未壓縮)。同時(shí)檢查該stage完成,向MapOutputTracker注冊(cè)本stage里的shuffleId和location信息。然后檢查stage的output location里是否存在空,若存在空,說(shuō)明一些task失敗了,整個(gè)stage重新提交;否則,繼續(xù)從waiting stages里提交下一個(gè)需要做的stage
  • 如果task是重提交,對(duì)應(yīng)的stage里增加這個(gè)task
  • 如果task是fetch失敗,馬上標(biāo)記對(duì)應(yīng)的stage完成,從running stages里減去。如果不允許retry,abort整個(gè)stage;否則,重新提交整個(gè)stage。另外,把這個(gè)fetch相關(guān)的location和map任務(wù)信息,從stage里剔除,從MapOutputTracker注銷(xiāo)掉。最后,如果這次fetch的blockManagerId對(duì)象不為空,做一次ExecutorLost處理,下次shuffle會(huì)換在另一個(gè)executor上去執(zhí)行。
  • 其他task狀態(tài)會(huì)由TaskScheduler處理,如Exception, TaskResultLost, commitDenied等。

3、其他與job相關(guān)的操作還包括:cancel job, cancel stage, resubmit failed stage等

5、TaskScheduler

維護(hù)task和executor對(duì)應(yīng)關(guān)系,executor和物理資源對(duì)應(yīng)關(guān)系,在排隊(duì)的task和正在跑的task。

內(nèi)部維護(hù)一個(gè)任務(wù)隊(duì)列,根據(jù)FIFO或Fair策略,調(diào)度任務(wù)。

TaskScheduler本身是個(gè)接口,spark里只實(shí)現(xiàn)了一個(gè)TaskSchedulerImpl,理論上任務(wù)調(diào)度可以定制。

主要功能:

1、submitTasks(taskSet),接收DAGScheduler提交來(lái)的tasks

  • 為tasks創(chuàng)建一個(gè)TaskSetManager,添加到任務(wù)隊(duì)列里。TaskSetManager跟蹤每個(gè)task的執(zhí)行狀況,維護(hù)了task的許多具體信息。
  • 觸發(fā)一次資源的索要。
    • 首先,TaskScheduler對(duì)照手頭的可用資源和Task隊(duì)列,進(jìn)行executor分配(考慮優(yōu)先級(jí)、本地化等策略),符合條件的executor會(huì)被分配給TaskSetManager。
    • 然后,得到的Task描述交給SchedulerBackend,調(diào)用launchTask(tasks),觸發(fā)executor上task的執(zhí)行。task描述被序列化后發(fā)給executor,executor提取task信息,調(diào)用task的run()方法執(zhí)行計(jì)算。
  1. cancelTasks(stageId),取消一個(gè)stage的tasks

    1. 調(diào)用SchedulerBackend的killTask(taskId, executorId, ...)方法。taskId和executorId在TaskScheduler里一直維護(hù)著。
  2. resourceOffer(offers: Seq[Workers]),這是非常重要的一個(gè)方法,調(diào)用者是SchedulerBacnend,用途是底層資源SchedulerBackend把空余的workers資源交給TaskScheduler,讓其根據(jù)調(diào)度策略為排隊(duì)的任務(wù)分配合理的cpu和內(nèi)存資源,然后把任務(wù)描述列表傳回給SchedulerBackend

    1. 從worker offers里,搜集executor和host的對(duì)應(yīng)關(guān)系、active executors、機(jī)架信息等等
    2. worker offers資源列表進(jìn)行隨機(jī)洗牌,任務(wù)隊(duì)列里的任務(wù)列表依據(jù)調(diào)度策略進(jìn)行一次排序
    3. 遍歷每個(gè)taskSet,按照進(jìn)程本地化、worker本地化、機(jī)器本地化、機(jī)架本地化的優(yōu)先級(jí)順序,為每個(gè)taskSet提供可用的cpu核數(shù),看是否滿足
    4. 默認(rèn)一個(gè)task需要一個(gè)cpu,設(shè)置參數(shù)為"spark.task.cpus=1"
    5. 為taskSet分配資源,校驗(yàn)是否滿足的邏輯,最終在TaskSetManager的resourceOffer(execId, host, maxLocality)方法里
    6. 滿足的話,會(huì)生成最終的任務(wù)描述,并且調(diào)用DAGScheduler的taskStarted(task, info)方法,通知DAGScheduler,這時(shí)候每次會(huì)觸發(fā)DAGScheduler做一次submitMissingStage的嘗試,即stage的tasks都分配到了資源的話,馬上會(huì)被提交執(zhí)行
  3. statusUpdate(taskId, taskState, data),另一個(gè)非常重要的方法,調(diào)用者是SchedulerBacnend,用途是SchedulerBacnend會(huì)將task執(zhí)行的狀態(tài)匯報(bào)給TaskScheduler做一些決定

    • 若TaskLost,找到該task對(duì)應(yīng)的executor,從active executor里移除,避免這個(gè)executor被分配到其他task繼續(xù)失敗下去。
    • task finish包括四種狀態(tài):finished, killed, failed, lost。只有finished是成功執(zhí)行完成了。其他三種是失敗。
    • task成功執(zhí)行完,調(diào)用TaskResultGetter.enqueueSuccessfulTask(taskSet, tid, data),否則調(diào)用TaskResultGetter.enqueueFailedTask(taskSet, tid, state, data)。TaskResultGetter內(nèi)部維護(hù)了一個(gè)線程池,負(fù)責(zé)異步fetch task執(zhí)行結(jié)果并反序列化。默認(rèn)開(kāi)四個(gè)線程做這件事,可配參數(shù)"spark.resultGetter.threads"=4。

TaskResultGetter取task result的邏輯

1、對(duì)于success task,如果taskResult里的數(shù)據(jù)是直接結(jié)果數(shù)據(jù),直接把data反序列出來(lái)得到結(jié)果;如果不是,會(huì)調(diào)用blockManager.getRemoteBytes(blockId)從遠(yuǎn)程獲取。如果遠(yuǎn)程取回的數(shù)據(jù)是空的,那么會(huì)調(diào)用TaskScheduler.handleFailedTask,告訴它這個(gè)任務(wù)是完成了的但是數(shù)據(jù)是丟失的。否則,取到數(shù)據(jù)之后會(huì)通知BlockManagerMaster移除這個(gè)block信息,調(diào)用TaskScheduler.handleSuccessfulTask,告訴它這個(gè)任務(wù)是執(zhí)行成功的,并且把result data傳回去。

2、對(duì)于failed task,從data里解析出fail的理由,調(diào)用TaskScheduler.handleFailedTask,告訴它這個(gè)任務(wù)失敗了,理由是什么。

6、SchedulerBackend

在TaskScheduler下層,用于對(duì)接不同的資源管理系統(tǒng),SchedulerBackend是個(gè)接口,需要實(shí)現(xiàn)的主要方法如下:

def start(): Unit
def stop(): Unit
def reviveOffers(): Unit // 重要方法:SchedulerBackend把自己手頭上的可用資源交給TaskScheduler,TaskScheduler根據(jù)調(diào)度策略分配給排隊(duì)的任務(wù)嗎,返回一批可執(zhí)行的任務(wù)描述,SchedulerBackend負(fù)責(zé)launchTask,即最終把task塞到了executor模型上,executor里的線程池會(huì)執(zhí)行task的run()
def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit =
    throw new UnsupportedOperationException

粗粒度:進(jìn)程常駐的模式,典型代表是standalone模式,mesos粗粒度模式,yarn

細(xì)粒度:mesos細(xì)粒度模式

這里討論粗粒度模式,更好理解:CoarseGrainedSchedulerBackend。

維護(hù)executor相關(guān)信息(包括executor的地址、通信端口、host、總核數(shù),剩余核數(shù)),手頭上executor有多少被注冊(cè)使用了,有多少剩余,總共還有多少核是空的等等。

主要職能

  1. Driver端主要通過(guò)actor監(jiān)聽(tīng)和處理下面這些事件:

    1. RegisterExecutor(executorId, hostPort, cores, logUrls)。這是executor添加的來(lái)源,通常worker拉起、重啟會(huì)觸發(fā)executor的注冊(cè)。CoarseGrainedSchedulerBackend把這些executor維護(hù)起來(lái),更新內(nèi)部的資源信息,比如總核數(shù)增加。最后調(diào)用一次makeOffer(),即把手頭資源丟給TaskScheduler去分配一次,返回任務(wù)描述回來(lái),把任務(wù)launch起來(lái)。這個(gè)makeOffer()的調(diào)用會(huì)出現(xiàn)在任何與資源變化相關(guān)的事件中,下面會(huì)看到。
    2. StatusUpdate(executorId, taskId, state, data)。task的狀態(tài)回調(diào)。首先,調(diào)用TaskScheduler.statusUpdate上報(bào)上去。然后,判斷這個(gè)task是否執(zhí)行結(jié)束了,結(jié)束了的話把executor上的freeCore加回去,調(diào)用一次makeOffer()。
    3. ReviveOffers。這個(gè)事件就是別人直接向SchedulerBackend請(qǐng)求資源,直接調(diào)用makeOffer()。
    4. KillTask(taskId, executorId, interruptThread)。這個(gè)killTask的事件,會(huì)被發(fā)送給executor的actor,executor會(huì)處理KillTask這個(gè)事件。
    5. StopExecutors。通知每一個(gè)executor,處理StopExecutor事件。
    6. RemoveExecutor(executorId, reason)。從維護(hù)信息中,那這堆executor涉及的資源數(shù)減掉,然后調(diào)用TaskScheduler.executorLost()方法,通知上層我這邊有一批資源不能用了,你處理下吧。TaskScheduler會(huì)繼續(xù)把executorLost的事件上報(bào)給DAGScheduler,原因是DAGScheduler關(guān)心shuffle任務(wù)的output location。DAGScheduler會(huì)告訴BlockManager這個(gè)executor不可用了,移走它,然后把所有的stage的shuffleOutput信息都遍歷一遍,移走這個(gè)executor,并且把更新后的shuffleOutput信息注冊(cè)到MapOutputTracker上,最后清理下本地的CachedLocationsMap。
  2. reviveOffers()方法的實(shí)現(xiàn)。直接調(diào)用了makeOffers()方法,得到一批可執(zhí)行的任務(wù)描述,調(diào)用launchTasks。

  3. launchTasks(tasks: Seq[Seq[TaskDescription]])方法。

    1. 遍歷每個(gè)task描述,序列化成二進(jìn)制,然后發(fā)送給每個(gè)對(duì)應(yīng)的executor這個(gè)任務(wù)信息
      1. 如果這個(gè)二進(jìn)制信息太大,超過(guò)了9.2M(默認(rèn)的akkaFrameSize 10M 減去 默認(rèn) 為akka留空的200K),會(huì)出錯(cuò),abort整個(gè)taskSet,并打印提醒增大akka frame size
      2. 如果二進(jìn)制數(shù)據(jù)大小可接受,發(fā)送給executor的actor,處理LaunchTask(serializedTask)事件。
7、Executor

Executor是spark里的進(jìn)程模型,可以套用到不同的資源管理系統(tǒng)上,與SchedulerBackend配合使用。

內(nèi)部有個(gè)線程池,有個(gè)running tasks map,有個(gè)actor(2.0以后就不是actor了),接收上面提到的由SchedulerBackend發(fā)來(lái)的事件。

事件處理

  1. launchTask。根據(jù)task描述,生成一個(gè)TaskRunner線程,丟盡running tasks map里,用線程池執(zhí)行這個(gè)TaskRunner
  2. killTask。從running tasks map里拿出線程對(duì)象,調(diào)它的kill方法。

三、Spark在不同集群中的運(yùn)行架構(gòu)

Spark注重建立良好的生態(tài)系統(tǒng),它不僅支持多種外部文件存儲(chǔ)系統(tǒng),提供了多種多樣的集群運(yùn)行模式。部署在單臺(tái)機(jī)器上時(shí),既可以用本地(Local)模式運(yùn)行,也可以使用偽分布式模式來(lái)運(yùn)行;當(dāng)以分布式集群部署的時(shí)候,可以根據(jù)自己集群的實(shí)際情況選擇Standalone模式(Spark自帶的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各種運(yùn)行模式雖然在啟動(dòng)方式、運(yùn)行位置、調(diào)度策略上各有不同,但它們的目的基本都是一致的,就是在合適的位置安全可靠的根據(jù)用戶的配置和Job的需要運(yùn)行和管理Task。

3.1 Spark on Standalone運(yùn)行過(guò)程

Standalone模式是Spark實(shí)現(xiàn)的資源調(diào)度框架,其主要的節(jié)點(diǎn)有Client節(jié)點(diǎn)、Master節(jié)點(diǎn)和Worker節(jié)點(diǎn)。其中Driver既可以運(yùn)行在Master節(jié)點(diǎn)上中,也可以運(yùn)行在本地Client端。當(dāng)用spark-shell交互式工具提交Spark的Job時(shí),Driver在Master節(jié)點(diǎn)上運(yùn)行;當(dāng)使用spark-submit工具提交Job或者在Eclips、IDEA等開(kāi)發(fā)平臺(tái)上使用”new SparkConf().setMaster(“spark://master:7077”)”方式運(yùn)行Spark任務(wù)時(shí),Driver是運(yùn)行在本地Client端上的。

  1. 我們提交一個(gè)任務(wù),任務(wù)就叫Application
  2. 初始化程序的入口SparkContext,
    1. 初始化DAG Scheduler
    2. 初始化Task Scheduler
      3、Task Scheduler向master去進(jìn)行注冊(cè)并申請(qǐng)資源(CPU Core和Memory)
      4、Master根據(jù)SparkContext的資源申請(qǐng)要求和Worker心跳周期內(nèi)報(bào)告的信息決定在哪個(gè)Worker上分配資源,然后在該Worker上獲取資源,然后啟動(dòng)StandaloneExecutorBackend;順便初始化好了一個(gè)線程池
  3. StandaloneExecutorBackend向Driver(SparkContext)注冊(cè),這樣Driver就知道哪些Executor為他進(jìn)行服務(wù)了。
    到這個(gè)時(shí)候其實(shí)我們的初始化過(guò)程基本完成了,我們開(kāi)始執(zhí)行transformation的代碼,但是代碼并不會(huì)真正的運(yùn)行,直到我們遇到一個(gè)action操作。生產(chǎn)一個(gè)job任務(wù),進(jìn)行stage的劃分

6、SparkContext將Applicaiton代碼發(fā)送給StandaloneExecutorBackend;并且SparkContext解析Applicaiton代碼,構(gòu)建DAG圖,并提交給DAG Scheduler分解成Stage(當(dāng)碰到Action操作 時(shí),就會(huì)催生Job;每個(gè)Job中含有1個(gè)或多個(gè)Stage,Stage一般在獲取外部數(shù)據(jù)和shuffle之前產(chǎn)生)。

7、將Stage(或者稱為T(mén)askSet)提交給Task Scheduler。Task Scheduler負(fù)責(zé)將Task分配到相應(yīng)的Worker,最后提交給StandaloneExecutorBackend執(zhí)行;
8、對(duì)task進(jìn)行序列化,并根據(jù)task的分配算法,分配task
9、executor對(duì)接收過(guò)來(lái)的task進(jìn)行反序列化,把task封裝成一個(gè)線程
10、開(kāi)始執(zhí)行Task,并向SparkContext報(bào)告,直至Task完成。
11、資源注銷(xiāo)

運(yùn)行過(guò)程圖形說(shuō)明

image.png

3.2 Spark on YARN運(yùn)行過(guò)程

YARN是一種統(tǒng)一資源管理機(jī)制,在其上面可以運(yùn)行多套計(jì)算框架。目前的大數(shù)據(jù)技術(shù)世界,大多數(shù)公司除了使用Spark來(lái)進(jìn)行數(shù)據(jù)計(jì)算,由于歷史原因或者單方面業(yè)務(wù)處理的性能考慮而使用著其他的計(jì)算框架,比如MapReduce、Storm等計(jì)算框架。Spark基于此種情況開(kāi)發(fā)了Spark on YARN的運(yùn)行模式,由于借助了YARN良好的彈性資源管理機(jī)制,不僅部署Application更加方便,而且用戶在YARN集群中運(yùn)行的服務(wù)和Application的資源也完全隔離,更具實(shí)踐應(yīng)用價(jià)值的是YARN可以通過(guò)隊(duì)列的方式,管理同時(shí)運(yùn)行在集群中的多個(gè)服務(wù)。

Spark on YARN模式根據(jù)Driver在集群中的位置分為兩種模式:一種是YARN-Client模式,另一種是YARN-Cluster(或稱為YARN-Standalone模式)。

3.2.2 YARN-Client

Yarn-Client模式中,Driver在客戶端本地運(yùn)行,這種模式可以使得Spark Application和客戶端進(jìn)行交互,因?yàn)镈river在客戶端,所以可以通過(guò)webUI訪問(wèn)Driver的狀態(tài),默認(rèn)是http://hadoop1:4040訪問(wèn),而YARN通過(guò)http:// hadoop1:8088訪問(wèn)。

YARN-client的工作流程分為以下幾個(gè)步驟:

  1. Spark Yarn Client向YARN的ResourceManager申請(qǐng)啟動(dòng)Application Master。同時(shí)在SparkContent初始化中將創(chuàng)建DAGScheduler和TASKScheduler等,由于我們選擇的是Yarn-Client模式,程序會(huì)選擇YarnClientClusterScheduler和YarnClientSchedulerBackend;

  2. ResourceManager收到請(qǐng)求后,在集群中選擇一個(gè)NodeManager,為該應(yīng)用程序分配第一個(gè)Container,要求它在這個(gè)Container中啟動(dòng)應(yīng)用程序的ApplicationMaster,與YARN-Cluster區(qū)別的是在該ApplicationMaster不運(yùn)行SparkContext,只與SparkContext進(jìn)行聯(lián)系進(jìn)行資源的分派;

  3. Client中的SparkContext初始化完畢后,與ApplicationMaster建立通訊,向ResourceManager注冊(cè),根據(jù)任務(wù)信息向ResourceManager申請(qǐng)資源(Container);

  4. 一旦ApplicationMaster申請(qǐng)到資源(也就是Container)后,便與對(duì)應(yīng)的NodeManager通信,要求它在獲得的Container中啟動(dòng)啟動(dòng)CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動(dòng)后會(huì)向Client中的SparkContext注冊(cè)并申請(qǐng)Task;

  5. Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執(zhí)行,CoarseGrainedExecutorBackend運(yùn)行Task并向Driver匯報(bào)運(yùn)行的狀態(tài)和進(jìn)度,以讓Client隨時(shí)掌握各個(gè)任務(wù)的運(yùn)行狀態(tài),從而可以在任務(wù)失敗時(shí)重新啟動(dòng)任務(wù);

  6. 應(yīng)用程序運(yùn)行完成后,Client的SparkContext向ResourceManager申請(qǐng)注銷(xiāo)并關(guān)閉自己。

image.png

3.2.3 YARN-Cluster

在YARN-Cluster模式中,當(dāng)用戶向YARN中提交一個(gè)應(yīng)用程序后,YARN將分兩個(gè)階段運(yùn)行該應(yīng)用程序:第一個(gè)階段是把Spark的Driver作為一個(gè)ApplicationMaster在YARN集群中先啟動(dòng);第二個(gè)階段是由ApplicationMaster創(chuàng)建應(yīng)用程序,然后為它向ResourceManager申請(qǐng)資源,并啟動(dòng)Executor來(lái)運(yùn)行Task,同時(shí)監(jiān)控它的整個(gè)運(yùn)行過(guò)程,直到運(yùn)行完成。

YARN-cluster的工作流程分為以下幾個(gè)步驟:

  1. Spark Yarn Client向YARN中提交應(yīng)用程序,包括ApplicationMaster程序、啟動(dòng)ApplicationMaster的命令、需要在Executor中運(yùn)行的程序等;

  2. ResourceManager收到請(qǐng)求后,在集群中選擇一個(gè)NodeManager,為該應(yīng)用程序分配第一個(gè)Container,要求它在這個(gè)Container中啟動(dòng)應(yīng)用程序的ApplicationMaster,其中ApplicationMaster進(jìn)行SparkContext等的初始化;

  3. ApplicationMaster向ResourceManager注冊(cè),這樣用戶可以直接通過(guò)ResourceManage查看應(yīng)用程序的運(yùn)行狀態(tài),然后它將采用輪詢的方式通過(guò)RPC協(xié)議為各個(gè)任務(wù)申請(qǐng)資源,并監(jiān)控它們的運(yùn)行狀態(tài)直到運(yùn)行結(jié)束;

  4. 一旦ApplicationMaster申請(qǐng)到資源(也就是Container)后,便與對(duì)應(yīng)的NodeManager通信,要求它在獲得的Container中啟動(dòng)啟動(dòng)CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend啟動(dòng)后會(huì)向ApplicationMaster中的SparkContext注冊(cè)并申請(qǐng)Task。這一點(diǎn)和Standalone模式一樣,只不過(guò)SparkContext在Spark Application中初始化時(shí),使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進(jìn)行任務(wù)的調(diào)度,其中YarnClusterScheduler只是對(duì)TaskSchedulerImpl的一個(gè)簡(jiǎn)單包裝,增加了對(duì)Executor的等待邏輯等;

  5. ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執(zhí)行,CoarseGrainedExecutorBackend運(yùn)行Task并向ApplicationMaster匯報(bào)運(yùn)行的狀態(tài)和進(jìn)度,以讓ApplicationMaster隨時(shí)掌握各個(gè)任務(wù)的運(yùn)行狀態(tài),從而可以在任務(wù)失敗時(shí)重新啟動(dòng)任務(wù);

  6. 應(yīng)用程序運(yùn)行完成后,ApplicationMaster向ResourceManager申請(qǐng)注銷(xiāo)并關(guān)閉自己。

image.png
YARN-Client 與 YARN-Cluster 區(qū)別

理解YARN-Client和YARN-Cluster深層次的區(qū)別之前先清楚一個(gè)概念:Application Master。在YARN中,每個(gè)Application實(shí)例都有一個(gè)ApplicationMaster進(jìn)程,它是Application啟動(dòng)的第一個(gè)容器。它負(fù)責(zé)和ResourceManager打交道并請(qǐng)求資源,獲取資源之后告訴NodeManager為其啟動(dòng)Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區(qū)別其實(shí)就是ApplicationMaster進(jìn)程的區(qū)別。

1、YARN-Cluster模式下,Driver運(yùn)行在AM(Application Master)中,它負(fù)責(zé)向YARN申請(qǐng)資源,并監(jiān)督作業(yè)的運(yùn)行狀況。當(dāng)用戶提交了作業(yè)之后,就可以關(guān)掉Client,作業(yè)會(huì)繼續(xù)在YARN上運(yùn)行,因而YARN-Cluster模式不適合運(yùn)行交互類(lèi)型的作業(yè);

2、YARN-Client模式下,Application Master僅僅向YARN請(qǐng)求Executor,Client會(huì)和請(qǐng)求的Container通信來(lái)調(diào)度他們工作,也就是說(shuō)Client不能離開(kāi)。

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容