Spark Core解析 2:Scheduler 調(diào)度體系

Spark Core解析 2:Scheduler 調(diào)度體系

Overview

調(diào)度系統(tǒng),是貫穿整個(gè)Spark應(yīng)用的主心骨,從調(diào)度系統(tǒng)開始入手了解Spark Core,比較容易理清頭緒。

Spark的資源調(diào)度采用的是常見的兩層調(diào)度,底層資源的管理和分配是第一層調(diào)度,交給YARN、Mesos或者Spark的Standalone集群處理,Application從第一層調(diào)度拿到資源后,還要進(jìn)行內(nèi)部的任務(wù)和資源調(diào)度,將任務(wù)和資源進(jìn)行匹配,這是第二層調(diào)度,本文講的就是這第二層調(diào)度。

Spark的調(diào)度體系涉及的任務(wù)包括3個(gè)粒度,分別是Job、Stage、Task。
Job代表用戶提交的一系列操作的總體,一個(gè)具體的計(jì)算任務(wù),有明確的輸入輸出,一個(gè)Job由多個(gè)Stage組成;
一個(gè)Stage代表Job計(jì)算流程的一個(gè)組成部分,一個(gè)階段,包含多個(gè)Task;
一個(gè)Task代表對一個(gè)分區(qū)的數(shù)據(jù)進(jìn)行計(jì)算的具體任務(wù)。

層級(jí)關(guān)系:Job > Stage > Task

Spark Core 解析:RDD 彈性分布式數(shù)據(jù)集中,已經(jīng)解釋了RDD之間的依賴,以及如何組成RDD血緣圖。

所以本文主要目的就是解釋清楚:Scheduler將RDD血緣圖轉(zhuǎn)變成Stage DAG,然后生成Task,最后提交給Executor去執(zhí)行的過程。

20191212230626.png

Stage

Job的不同分區(qū)的計(jì)算通??梢圆⑿?,但是有些計(jì)算需要將數(shù)據(jù)進(jìn)行重新分區(qū),這個(gè)過程稱作shuffle(混洗)。Shuffle的過程是沒法完全并行的,這時(shí)候就會(huì)出現(xiàn)task之間的等待,task的數(shù)量也可能發(fā)生變化,所以Spark中以shuffle為邊界,對task進(jìn)行劃分,劃分出來的每段稱為Stage。

Stage代表一組可以并行的執(zhí)行相同計(jì)算的task,每個(gè)任務(wù)必須有相同的分區(qū)規(guī)則,這樣一個(gè)stage中是沒有shuffle的。

在一個(gè)Spark App中,stage有一個(gè)全局唯一ID,stage id是自增的。

20191028171155.png

Stage分為兩種:

  • ResultStage:最后執(zhí)行的stage,負(fù)責(zé)Job最終的結(jié)果輸出,每個(gè)Job有且僅有一個(gè)ResultStage
  • ShuffleMapStage:該stage的輸出不是最終結(jié)果,而是其他stage的輸入數(shù)據(jù),通常涉及一次shuffle計(jì)算。

stage創(chuàng)建流程:

  • 從最終執(zhí)行action的RDD開始,沿著RDD依賴關(guān)系遍歷,
    一旦發(fā)現(xiàn)某個(gè)RDD的dependency是ShuffleDependency,就創(chuàng)建一個(gè)ShuffleMapStage。
  • 最后創(chuàng)建ResultStage。

example 1

val rg=sc.parallelize(List((1,10),(2,20)))
rg.reduceByKey(_+_).collect
stages-simple.png

這里reduceByKey操作引起了一次shuffle,所以job被切分成了2個(gè)stage。

example 2

val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddA.join(rddB).collect
stages-join.png

join操作導(dǎo)致rddA和rddB都進(jìn)行了一次shuffle,所以有3個(gè)stage。

example 3

import org.apache.spark.HashPartitioner
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(3))
val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddA.join(rddB).collect
stages-co-join.png

WHAT ?

因?yàn)閞ddA已經(jīng)定義了Partitioner,這里join操作會(huì)保留rddA的分區(qū)方式,所以對rddA的依賴是OneToOneDepenency,而對于rddB則是ShuffleDependency。

stage-example-3-2.png

探索:一個(gè)RDD被依賴多次,會(huì)如何

val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
rddA join rddA collect
rdd use twice.png
rdd-used-twice.png

一個(gè)RDD被兩個(gè)stage使用了。

小結(jié)

綜上,stage的劃分一定是依據(jù)shuffle即ShuffleDependency,跟算子和RDD變量的定義沒有很強(qiáng)的關(guān)系,example2和3中的join操作rddA.join(rddB).collect看起來一模一樣,但實(shí)際產(chǎn)生的stage劃分卻差別很大。

Task

與stage對應(yīng),task也分為兩種:

  • ShuffleMapTask:即ShuffleMapStage中的task,主要完成map、shuffle計(jì)算。
  • ResultTask:ResultStage中的task,主要完成最終結(jié)果輸出或者返回結(jié)果給driver的任務(wù)。

一個(gè)stage有多少個(gè)partition就會(huì)創(chuàng)建多少個(gè)task,比如一個(gè)ShuffleMapStage有10個(gè)partition,那么就會(huì)創(chuàng)建10個(gè)ShuffleMapTask。

一個(gè)Stage中的所有task組成一個(gè)TaskSet。

Job Submit

graph TB
R(RDD.action)-->S(SparkContext.runJob)-- RDD -->D(DAGScheduler.runJob)
-- TaskSet -->T(TaskScheduler.submitTasks)-- TaskDescription -->E(Executor.launchTask)

RDD在action操作中通過SparkContext.runJob方法觸發(fā)Job執(zhí)行流程,該方法將調(diào)用DagScheduler.runJob方法,將RDD傳入DagScheduler。然后,DAGScheduler創(chuàng)建TaskSet提交給TaskScheduler,TaskScheduler再將TaskSet封裝成TaskDescription發(fā)送給Executor,最后Executor會(huì)將TaskDescription提交給線程池來運(yùn)行。

Stage Scheduler(high-level)

DagScheduler

Stage級(jí)別的調(diào)度是DagScheduler負(fù)責(zé)的,也是Spark調(diào)度體系的核心。

DagScheduler的工作模式

sequenceDiagram
    participant M as main thread
    participant L as eventProcessLoop
    participant E as event thread
    M-->>L: post event
    E-->>L: handle event

DagScheduler內(nèi)部維護(hù)了一個(gè)事件消息總線eventProcessLoop(類型為DAGSchedulerEventProcessLoop),其實(shí)就是一個(gè)用來存儲(chǔ)DAGSchedulerEvent類型數(shù)據(jù)的隊(duì)列。

當(dāng)DagScheduler的一些方法被調(diào)用的時(shí)候(如submitJob方法),并不會(huì)在主線程中處理該任務(wù),而是post一個(gè)event(如JobSubmitted)到eventProcessLoop。eventProcessLoop中有一個(gè)守護(hù)線程,會(huì)不斷的依次從隊(duì)列中取出event,然后調(diào)用對應(yīng)的handle(如handleJobSubmitted)方法來執(zhí)行具體的任務(wù)。

Stage調(diào)度流程

  • 1.submit job

    DagScheduler.runJob方法會(huì)調(diào)用submitJob方法,向eventProcessLoop發(fā)送一個(gè)JobSubmitted類型的消息,其中包含了RDD等信息。當(dāng)eventProcessLoop接收到JobSubmitted類型的消息,會(huì)調(diào)用DagScheduler.handleJobSubmitted方法來處理消息。

sequenceDiagram
    participant M as main thread(runJob)
    participant L as eventProcessLoop
    participant E as event thread(handleJobSubmitted)
    M-->>L: post JobSubmitted event
    E-->>L: handle JobSubmitted event
  • 2.create stage

    • DagScheduler在它的handleJobSubmitted方法中開始創(chuàng)建ResultStage。ResultStage中包含了最終執(zhí)行action的finalRDD,以及計(jì)算函數(shù)func。

    • ResultStage有個(gè)parents屬性,這個(gè)屬性是個(gè)列表,也就是說可以有多個(gè)parent stage。創(chuàng)建ResultStage時(shí)需要先創(chuàng)建它的parent stage來填充這個(gè)屬性,也就是說要?jiǎng)?chuàng)建ResultStage直接依賴的所有ShuffleMapStage。

    • 通過stage.rdd.dependencies屬性,采用寬度優(yōu)先遍歷,一旦發(fā)現(xiàn)某個(gè)RDD(假設(shè)叫rddA)的dependency是ShuffleDependency,就創(chuàng)建一個(gè)ShuffleMapStage,ShuffleMapStage中包含的關(guān)鍵信息與ResultStage不同,是rddA的ShuffleDependency和rddA的ShuffleDependency.rdd,也就是說新創(chuàng)建的ShuffleMapStage持有的信息是他自身的最后一個(gè)RDD和該RDD的子RDD的dependency。

    • 創(chuàng)建一個(gè)ShuffleMapStage的過程同理會(huì)需要?jiǎng)?chuàng)建它的parent stage,也是若干ShuffleMapStage。如此遞歸下去,直到創(chuàng)建完所有的ShuffleMapStage,最后才完成ResultStage的創(chuàng)建。最后創(chuàng)建出來的這些Stage(若干ShuffleMapStage加一個(gè)ResultStage),通過parent屬性串起來,就像這樣

      graph TD
      A[ResultStage]-- parent -->B[ShuffleMapStage 1]
      A-- parent -->C[ShuffleMapStage 2]
      B-- parent -->D[ShuffleMapStage 3]
      

      這就生成了所謂的DAG圖,但是這個(gè)圖的指向跟執(zhí)行順序是反過來的,如果按執(zhí)行順序來畫DAG圖,就是常見的形式了:

      graph TD
      D[ShuffleMapStage 3]-->C[ShuffleMapStage 2]
      C[ShuffleMapStage 2]-->A[ResultStage]
      B[ShuffleMapStage 1]-->A[ResultStage]
      
  • 3.submit stage

    DagScheduler.handleJobSubmitted方法創(chuàng)建好ResultStage后會(huì)提交這個(gè)stage(submitStage方法),在提交一個(gè)stage的時(shí)候,會(huì)要先提交它的parent stage,也是通過遞歸的形式,直到一個(gè)stage的所有parent stage都被提交了,它自己才能被提交,如果一個(gè)stage的parent還沒有完成,則會(huì)把這個(gè)stage加入waitingStages。也就是說,DAG圖中前面的stage會(huì)被先提交。當(dāng)一個(gè)stage的parent都準(zhǔn)備好了,也就是執(zhí)行完了,它才會(huì)進(jìn)入submitMissingTasks的環(huán)節(jié)。

  • 4.submit task

    Task是在DagScheduler(不是TaskScheduler)的submitMissingTasks方法中創(chuàng)建的,包括ShuffleMapTask和ResultTask,與Stage對應(yīng)。歸屬于同一個(gè)stage的這批Task組成一個(gè)TaskSet集合,最后提交給TaskScheduler的就是這個(gè)TaskSet集合。

20191029095005.png

Task Scheduler(low-level)

Task的調(diào)度工作是由TaskScheduler與SchedulerBackend緊密合作,共同完成的。

TaskScheduler是task級(jí)別的調(diào)度器,主要作用是管理task的調(diào)度和提交,是Spark底層的調(diào)度器。

SchedulerBackend是TaskScheduler的后端服務(wù),有獨(dú)立的線程,所有的Executor都會(huì)注冊到SchedulerBackend,主要作用是進(jìn)行資源分配、將task分配給executor等。

Task調(diào)度流程

spark task scheduler.png

第一個(gè)線程是DAGScheduler的事件處理線程,在其中,Task先經(jīng)過DAGScheduler(藍(lán)色箭頭表示)封裝成TaskSet,再由TaskScheduler(綠色箭頭)封裝成TaskSetManager,并加入調(diào)度隊(duì)列中。

SchedulerBackend在收到ReviveOffers消息時(shí),會(huì)從線程池取一個(gè)線程進(jìn)行makeOffers操作,WorkerOffer創(chuàng)建后傳遞給TaskScheduler進(jìn)行分配。

圖中第二個(gè)線程就是SchedulerBackend的一個(gè)事件分發(fā)線程,從Pool中取出最優(yōu)先的TaskSetManager,然后將WorkerOffer與其中的Task進(jìn)行配對,生成TaskDescription,發(fā)送給WorkerOffer指定的Executor去執(zhí)行。

工作流程

TaskScheduler.png
  • 1 DAGScheduler(submitMissingTasks方法中)調(diào)用TaskScheduler.submitTasks()創(chuàng)建并提交TaskSet給TaskScheduler;
  • 2 TaskScheduler拿到TaskSet后會(huì)創(chuàng)建一個(gè)TaskSetManager來管理它,并且把TaskSetManager添加到rootPool調(diào)度池中;
  • 3 調(diào)用SchedulerBackend.reviveOffers()方法;
  • 4 SchedulerBackend發(fā)送ReviveOffers消息給DriverEndpoint;
  • 5 DriverEndpoint收到ReviveOffers消息后,會(huì)調(diào)用makeOffers()方法創(chuàng)建WorkerOffer,并通過TaskScheduler.resourceOffers()返回offer;
  • 6 TaskScheduler從rootPool獲取按調(diào)度算法排序后的TaskSetManager列表,取第一個(gè)TaskSetManager,逐個(gè)給TaskSet的Task分配WorkerOffer,生成TaskDescription(包含offer信息);
  • 7 調(diào)用SchedulerBackend.DriverEndpoint的launchTasks方法,將TaskDescription序列化并封裝在LaunchTask消息中,發(fā)送給offer指定的executor。LaunchTask消息被ExecutorBackend收到后,會(huì)將Task信息反序列化,傳給Executor.launchTask(),最后使用Executor的線程池中的線程來執(zhí)行這個(gè)Task。

梳理

Stage,TaskSet,TaskSetManager是一一對應(yīng)的,數(shù)量相等,都是只存在driver上的。
Parition,Task,TaskDescription是一一對應(yīng),數(shù)量相同,Task和TaskDescription是會(huì)被發(fā)到executor上的。

TaskScheduler的調(diào)度池

與DAGScheduler不同的是TaskScheduler有調(diào)度池,有兩種調(diào)度實(shí)體,Pool和TaskSetManager。
與YARN的調(diào)度隊(duì)列類似,采用了層級(jí)隊(duì)列的方式,Pool是TaskSetManager的容器,起到將TaskSetManager分組的作用。

Schedulable

Schedulable是調(diào)度實(shí)體的基類,有兩個(gè)子類Pool和TaskSetManager。

要理解調(diào)度規(guī)則,必須知道下面幾個(gè)屬性:

  • parent:所屬調(diào)度池,頂層的調(diào)度池為root pool;
  • schedulableQueue:包含的調(diào)度對象組成的隊(duì)列;
  • schedulingMode:調(diào)度模式,F(xiàn)IFO or FAIR;
  • weight:權(quán)重
  • minShare:最小分配額(CPU核數(shù))
  • runningTasks:運(yùn)行中task數(shù)
  • priority:優(yōu)先級(jí)
  • stageId:就是stageId
  • name:名稱

Pool和TaskSetManager對于這些屬性的取值有所不同,從而導(dǎo)致了他們的調(diào)度行為也不一樣。

properties Pool TaskSetManager
weight config 1
minShare config 0
priority 0 jobId
stageId -1 stageId
name config TaskSet_{taskSet.id}
runningTasks Pool所含TaskSetManager的runningTasks和 TaskSetManager運(yùn)行中task數(shù)

Pools創(chuàng)建流程

TaskScheduler有個(gè)屬性schedulingMode,值取決于配置項(xiàng)spark.scheduler.mode,默認(rèn)為FIFO。這個(gè)屬性會(huì)導(dǎo)致TaskScheduler使用不同的SchedulableBuilder,即FIFOSchedulableBuilder和FairSchedulableBuilder。

TaskScheduler在初始化的時(shí)候,就會(huì)創(chuàng)建root pool,根調(diào)度池,是所有pool的祖先。
它的屬性取值為:

name: "" (空字符串)
schedulingMode: 同TaskScheduler的schedulingMode屬性
weight: 0
minShare: 0

注意root pool的調(diào)度模式確定了。

接下來會(huì)執(zhí)行schedulableBuilder.buildPools()方法,

  • 如果是FIFOSchedulableBuilder,則什么都不會(huì)發(fā)生。

  • 若是FairSchedulableBuilder

    • 1 依據(jù)scheduler配置文件(后面會(huì)說),開始創(chuàng)建pool(可以是多個(gè)pool,F(xiàn)IFO,F(xiàn)AIR都有可能,取決于配置文件),并都加入root pool中。
    • 2 如果現(xiàn)在root pool中沒有名為"default"的pool(即配置文件中沒有定義一個(gè)叫default的pool),創(chuàng)建default pool,并加入root pool中。
      這時(shí)default pool它的屬性取值是固定的:
    name: "default"
    schedulingMode: FIFO
    weight: 1
    minShare: 0
    

Task加入pool流程

當(dāng)TaskScheduler提交task的時(shí)候,會(huì)先創(chuàng)建TaskSetManager,然后通過schedulableBuilder添加到pool中。

  • 如果是FIFOSchedulableBuilder,則會(huì)直接把TaskSetManager加入root pool隊(duì)列中。

  • 若是FairSchedulableBuilder

    • 1 從spark.scheduler.pool配置獲取pool name,沒有定義則用'default';
    • 2 從root pool遍歷找到對應(yīng)名稱的pool,把TaskSetManager加入pool的隊(duì)列。如果沒有找到,則創(chuàng)建一個(gè)該名稱的pool,采用與default pool相同的屬性配置,并加入root pool。

調(diào)度池結(jié)構(gòu)

經(jīng)過上面兩部分,最終得到的調(diào)度池結(jié)構(gòu)如下:

spark.scheduler.mode=FIFO

20191128210416.png

spark.scheduler.mode=FAIR

20191128210432.png

Fair Scheduler pools配置

Fair Scheduler Pool的劃分依賴于配置文件,默認(rèn)的配置文件為'fairscheduler.xml',也可以通過配置項(xiàng)"spark.scheduler.allocation.file"指定配置文件。

煮個(gè)栗子,文件內(nèi)容如下:

<?xml version="1.0"?>
<allocations>
  <pool name="prod">
    <schedulingMode>FAIR</schedulingMode>
    <weight>1</weight>
    <minShare>2</minShare>
  </pool>
  <pool name="test">
    <schedulingMode>FIFO</schedulingMode>
    <weight>2</weight>
    <minShare>3</minShare>
  </pool>
</allocations>

這里配置了兩個(gè)pool,prod和test,并且配置了相關(guān)屬性,這兩個(gè)pool都會(huì)添加到root pool中。

調(diào)度算法

以SchedulingAlgorithm為基類,內(nèi)置實(shí)現(xiàn)的調(diào)度算法有兩種FIFOSchedulingAlgorithm和FairSchedulingAlgorithm,其邏輯如下:

  • FIFO: 先進(jìn)先出,優(yōu)先級(jí)比較算法如下,

    • 1.比較priority,小的優(yōu)先;
    • 2.priority相同則比較StageId,小的優(yōu)先。
  • FAIR:公平調(diào)度,優(yōu)先級(jí)比較算法如下,

    • 1.runningTasks小于minShare的優(yōu)先級(jí)比不小于的優(yōu)先級(jí)要高。
    • 2.若兩者運(yùn)行的runningTasks都比minShare小,則比較minShare使用率(runningTasks/max(minShare,1)),使用率越低優(yōu)先級(jí)越高。
    • 3.若兩者的minShare使用率相同,則比較權(quán)重使用率(runningTasks/weight),使用率越低優(yōu)先級(jí)越高。
    • 4.若權(quán)重也相同,則比較name,小的優(yōu)先。
Pool為FIFO模式下的幾種情形

TaskSetManager之間的比較,其實(shí)就是先比較jobId再比較stageId,誰小誰優(yōu)先,意味著就是誰先提交誰優(yōu)先。

Pool之間的比較,不存在!FIFO的pool隊(duì)列中是不會(huì)有pool的。

Pool為FAIR模式下的幾種情形

TaskSetManager之間的比較,因?yàn)閙inShare=0,weight=1,F(xiàn)AIR算法變成了:

  • 1 runningTasks小的優(yōu)先
  • 2 runningTasks相同則比較name

Pool之間的比較,就是標(biāo)準(zhǔn)的FAIR算法。

當(dāng)root pool為FAIR模式,先取最優(yōu)先的pool,再從pool中,按pool的調(diào)度模式取優(yōu)先的TaskSetManager。

開始使用FAIR mode

啟用FAIR模式:

  • 1 準(zhǔn)備好fairscheduler.xml文件
  • 2 啟動(dòng)參數(shù)添加 --conf spark.scheduler.mode=FAIR
  • 3 運(yùn)行啟動(dòng)命令,如spark-shell --master yarn --deploy-mode client --conf spark.scheode=FAIR
ui-fair.png

啟動(dòng)后如果直接運(yùn)行Job會(huì)自動(dòng)提交到default pool,那么如何提交Job到指定pool?
SparkContext.setLocalProperty("spark.scheduler.pool","poolName")

如果每次只運(yùn)行一個(gè)Job,開啟FAIR模式的意義不大,那么如何同時(shí)運(yùn)行多個(gè)Job?
要異步提交Job,需要用到RDD的async action,目前有如下幾個(gè):

countAsync
collectAsync
takeAsync
foreachAsync
foreachPartitionAsync

舉個(gè)例子:

sc.setLocalProperty("spark.scheduler.pool","test")
b.foreachAsync(_=>Thread.sleep(100))
sc.setLocalProperty("spark.scheduler.pool","production")
b.foreachAsync(_=>Thread.sleep(100))

這樣就會(huì)有兩個(gè)任務(wù)在不同的pool同時(shí)運(yùn)行:

pools.png

FAIR mode應(yīng)用場景

場景1:Spark SQL thrift server
作用:讓離線任務(wù)和交互式查詢?nèi)蝿?wù)分配到不同的pool,給交互式查詢?nèi)蝿?wù)更高的優(yōu)先級(jí),這樣長時(shí)間運(yùn)行的離線任務(wù)就不會(huì)一直占用所有資源,阻塞交互式查詢?nèi)蝿?wù)。

場景2:Streaming job與Batch job同時(shí)運(yùn)行
作用:比如用Streaming接數(shù)據(jù)寫入HDFS,可能產(chǎn)生很多小文件,可以在低優(yōu)先級(jí)的pool定時(shí)運(yùn)行batch job合并小文件。

另外可以參考Spark Summit 2017的分享:Continuous Application with FAIR Scheduler

參考

Spark內(nèi)核設(shè)計(jì)的藝術(shù)

spark任務(wù)調(diào)度FIFO和FAIR的詳解

Job Scheduling

轉(zhuǎn)載請注明原文地址:
https://liam-blog.ml/2019/11/07/spark-core-scheduler/

查看更多博主文章

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容