Spark Core解析 2:Scheduler 調(diào)度體系
Overview
調(diào)度系統(tǒng),是貫穿整個(gè)Spark應(yīng)用的主心骨,從調(diào)度系統(tǒng)開始入手了解Spark Core,比較容易理清頭緒。
Spark的資源調(diào)度采用的是常見的兩層調(diào)度,底層資源的管理和分配是第一層調(diào)度,交給YARN、Mesos或者Spark的Standalone集群處理,Application從第一層調(diào)度拿到資源后,還要進(jìn)行內(nèi)部的任務(wù)和資源調(diào)度,將任務(wù)和資源進(jìn)行匹配,這是第二層調(diào)度,本文講的就是這第二層調(diào)度。
Spark的調(diào)度體系涉及的任務(wù)包括3個(gè)粒度,分別是Job、Stage、Task。
Job代表用戶提交的一系列操作的總體,一個(gè)具體的計(jì)算任務(wù),有明確的輸入輸出,一個(gè)Job由多個(gè)Stage組成;
一個(gè)Stage代表Job計(jì)算流程的一個(gè)組成部分,一個(gè)階段,包含多個(gè)Task;
一個(gè)Task代表對一個(gè)分區(qū)的數(shù)據(jù)進(jìn)行計(jì)算的具體任務(wù)。
層級(jí)關(guān)系:Job > Stage > Task
在Spark Core 解析:RDD 彈性分布式數(shù)據(jù)集中,已經(jīng)解釋了RDD之間的依賴,以及如何組成RDD血緣圖。
所以本文主要目的就是解釋清楚:Scheduler將RDD血緣圖轉(zhuǎn)變成Stage DAG,然后生成Task,最后提交給Executor去執(zhí)行的過程。

Stage
Job的不同分區(qū)的計(jì)算通??梢圆⑿?,但是有些計(jì)算需要將數(shù)據(jù)進(jìn)行重新分區(qū),這個(gè)過程稱作shuffle(混洗)。Shuffle的過程是沒法完全并行的,這時(shí)候就會(huì)出現(xiàn)task之間的等待,task的數(shù)量也可能發(fā)生變化,所以Spark中以shuffle為邊界,對task進(jìn)行劃分,劃分出來的每段稱為Stage。
Stage代表一組可以并行的執(zhí)行相同計(jì)算的task,每個(gè)任務(wù)必須有相同的分區(qū)規(guī)則,這樣一個(gè)stage中是沒有shuffle的。
在一個(gè)Spark App中,stage有一個(gè)全局唯一ID,stage id是自增的。

Stage分為兩種:
- ResultStage:最后執(zhí)行的stage,負(fù)責(zé)Job最終的結(jié)果輸出,每個(gè)Job有且僅有一個(gè)ResultStage;
- ShuffleMapStage:該stage的輸出不是最終結(jié)果,而是其他stage的輸入數(shù)據(jù),通常涉及一次shuffle計(jì)算。
stage創(chuàng)建流程:
- 從最終執(zhí)行action的RDD開始,沿著RDD依賴關(guān)系遍歷,
一旦發(fā)現(xiàn)某個(gè)RDD的dependency是ShuffleDependency,就創(chuàng)建一個(gè)ShuffleMapStage。 - 最后創(chuàng)建ResultStage。
example 1
val rg=sc.parallelize(List((1,10),(2,20)))
rg.reduceByKey(_+_).collect

這里reduceByKey操作引起了一次shuffle,所以job被切分成了2個(gè)stage。
example 2
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddA.join(rddB).collect

join操作導(dǎo)致rddA和rddB都進(jìn)行了一次shuffle,所以有3個(gè)stage。
example 3
import org.apache.spark.HashPartitioner
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(3))
val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C")))
rddA.join(rddB).collect

WHAT ?
因?yàn)閞ddA已經(jīng)定義了Partitioner,這里join操作會(huì)保留rddA的分區(qū)方式,所以對rddA的依賴是OneToOneDepenency,而對于rddB則是ShuffleDependency。

探索:一個(gè)RDD被依賴多次,會(huì)如何
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c")))
rddA join rddA collect


一個(gè)RDD被兩個(gè)stage使用了。
小結(jié)
綜上,stage的劃分一定是依據(jù)shuffle即ShuffleDependency,跟算子和RDD變量的定義沒有很強(qiáng)的關(guān)系,example2和3中的join操作rddA.join(rddB).collect看起來一模一樣,但實(shí)際產(chǎn)生的stage劃分卻差別很大。
Task
與stage對應(yīng),task也分為兩種:
- ShuffleMapTask:即ShuffleMapStage中的task,主要完成map、shuffle計(jì)算。
- ResultTask:ResultStage中的task,主要完成最終結(jié)果輸出或者返回結(jié)果給driver的任務(wù)。
一個(gè)stage有多少個(gè)partition就會(huì)創(chuàng)建多少個(gè)task,比如一個(gè)ShuffleMapStage有10個(gè)partition,那么就會(huì)創(chuàng)建10個(gè)ShuffleMapTask。
一個(gè)Stage中的所有task組成一個(gè)TaskSet。
Job Submit
graph TB
R(RDD.action)-->S(SparkContext.runJob)-- RDD -->D(DAGScheduler.runJob)
-- TaskSet -->T(TaskScheduler.submitTasks)-- TaskDescription -->E(Executor.launchTask)
RDD在action操作中通過SparkContext.runJob方法觸發(fā)Job執(zhí)行流程,該方法將調(diào)用DagScheduler.runJob方法,將RDD傳入DagScheduler。然后,DAGScheduler創(chuàng)建TaskSet提交給TaskScheduler,TaskScheduler再將TaskSet封裝成TaskDescription發(fā)送給Executor,最后Executor會(huì)將TaskDescription提交給線程池來運(yùn)行。
Stage Scheduler(high-level)
DagScheduler
Stage級(jí)別的調(diào)度是DagScheduler負(fù)責(zé)的,也是Spark調(diào)度體系的核心。
DagScheduler的工作模式
sequenceDiagram
participant M as main thread
participant L as eventProcessLoop
participant E as event thread
M-->>L: post event
E-->>L: handle event
DagScheduler內(nèi)部維護(hù)了一個(gè)事件消息總線eventProcessLoop(類型為DAGSchedulerEventProcessLoop),其實(shí)就是一個(gè)用來存儲(chǔ)DAGSchedulerEvent類型數(shù)據(jù)的隊(duì)列。
當(dāng)DagScheduler的一些方法被調(diào)用的時(shí)候(如submitJob方法),并不會(huì)在主線程中處理該任務(wù),而是post一個(gè)event(如JobSubmitted)到eventProcessLoop。eventProcessLoop中有一個(gè)守護(hù)線程,會(huì)不斷的依次從隊(duì)列中取出event,然后調(diào)用對應(yīng)的handle(如handleJobSubmitted)方法來執(zhí)行具體的任務(wù)。
Stage調(diào)度流程
-
1.submit job
DagScheduler.runJob方法會(huì)調(diào)用submitJob方法,向eventProcessLoop發(fā)送一個(gè)JobSubmitted類型的消息,其中包含了RDD等信息。當(dāng)eventProcessLoop接收到JobSubmitted類型的消息,會(huì)調(diào)用DagScheduler.handleJobSubmitted方法來處理消息。
sequenceDiagram
participant M as main thread(runJob)
participant L as eventProcessLoop
participant E as event thread(handleJobSubmitted)
M-->>L: post JobSubmitted event
E-->>L: handle JobSubmitted event
-
2.create stage
DagScheduler在它的handleJobSubmitted方法中開始創(chuàng)建ResultStage。ResultStage中包含了最終執(zhí)行action的finalRDD,以及計(jì)算函數(shù)func。
ResultStage有個(gè)parents屬性,這個(gè)屬性是個(gè)列表,也就是說可以有多個(gè)parent stage。創(chuàng)建ResultStage時(shí)需要先創(chuàng)建它的parent stage來填充這個(gè)屬性,也就是說要?jiǎng)?chuàng)建ResultStage直接依賴的所有ShuffleMapStage。
通過stage.rdd.dependencies屬性,采用寬度優(yōu)先遍歷,一旦發(fā)現(xiàn)某個(gè)RDD(假設(shè)叫rddA)的dependency是ShuffleDependency,就創(chuàng)建一個(gè)ShuffleMapStage,ShuffleMapStage中包含的關(guān)鍵信息與ResultStage不同,是rddA的ShuffleDependency和rddA的ShuffleDependency.rdd,也就是說新創(chuàng)建的ShuffleMapStage持有的信息是他自身的最后一個(gè)RDD和該RDD的子RDD的dependency。
-
創(chuàng)建一個(gè)ShuffleMapStage的過程同理會(huì)需要?jiǎng)?chuàng)建它的parent stage,也是若干ShuffleMapStage。如此遞歸下去,直到創(chuàng)建完所有的ShuffleMapStage,最后才完成ResultStage的創(chuàng)建。最后創(chuàng)建出來的這些Stage(若干ShuffleMapStage加一個(gè)ResultStage),通過parent屬性串起來,就像這樣
graph TD A[ResultStage]-- parent -->B[ShuffleMapStage 1] A-- parent -->C[ShuffleMapStage 2] B-- parent -->D[ShuffleMapStage 3]這就生成了所謂的DAG圖,但是這個(gè)圖的指向跟執(zhí)行順序是反過來的,如果按執(zhí)行順序來畫DAG圖,就是常見的形式了:
graph TD D[ShuffleMapStage 3]-->C[ShuffleMapStage 2] C[ShuffleMapStage 2]-->A[ResultStage] B[ShuffleMapStage 1]-->A[ResultStage]
-
3.submit stage
DagScheduler.handleJobSubmitted方法創(chuàng)建好ResultStage后會(huì)提交這個(gè)stage(submitStage方法),在提交一個(gè)stage的時(shí)候,會(huì)要先提交它的parent stage,也是通過遞歸的形式,直到一個(gè)stage的所有parent stage都被提交了,它自己才能被提交,如果一個(gè)stage的parent還沒有完成,則會(huì)把這個(gè)stage加入waitingStages。也就是說,DAG圖中前面的stage會(huì)被先提交。當(dāng)一個(gè)stage的parent都準(zhǔn)備好了,也就是執(zhí)行完了,它才會(huì)進(jìn)入submitMissingTasks的環(huán)節(jié)。
-
4.submit task
Task是在DagScheduler(不是TaskScheduler)的submitMissingTasks方法中創(chuàng)建的,包括ShuffleMapTask和ResultTask,與Stage對應(yīng)。歸屬于同一個(gè)stage的這批Task組成一個(gè)TaskSet集合,最后提交給TaskScheduler的就是這個(gè)TaskSet集合。

Task Scheduler(low-level)
Task的調(diào)度工作是由TaskScheduler與SchedulerBackend緊密合作,共同完成的。
TaskScheduler是task級(jí)別的調(diào)度器,主要作用是管理task的調(diào)度和提交,是Spark底層的調(diào)度器。
SchedulerBackend是TaskScheduler的后端服務(wù),有獨(dú)立的線程,所有的Executor都會(huì)注冊到SchedulerBackend,主要作用是進(jìn)行資源分配、將task分配給executor等。
Task調(diào)度流程

第一個(gè)線程是DAGScheduler的事件處理線程,在其中,Task先經(jīng)過DAGScheduler(藍(lán)色箭頭表示)封裝成TaskSet,再由TaskScheduler(綠色箭頭)封裝成TaskSetManager,并加入調(diào)度隊(duì)列中。
SchedulerBackend在收到ReviveOffers消息時(shí),會(huì)從線程池取一個(gè)線程進(jìn)行makeOffers操作,WorkerOffer創(chuàng)建后傳遞給TaskScheduler進(jìn)行分配。
圖中第二個(gè)線程就是SchedulerBackend的一個(gè)事件分發(fā)線程,從Pool中取出最優(yōu)先的TaskSetManager,然后將WorkerOffer與其中的Task進(jìn)行配對,生成TaskDescription,發(fā)送給WorkerOffer指定的Executor去執(zhí)行。
工作流程

- 1 DAGScheduler(submitMissingTasks方法中)調(diào)用TaskScheduler.submitTasks()創(chuàng)建并提交TaskSet給TaskScheduler;
- 2 TaskScheduler拿到TaskSet后會(huì)創(chuàng)建一個(gè)TaskSetManager來管理它,并且把TaskSetManager添加到rootPool調(diào)度池中;
- 3 調(diào)用SchedulerBackend.reviveOffers()方法;
- 4 SchedulerBackend發(fā)送ReviveOffers消息給DriverEndpoint;
- 5 DriverEndpoint收到ReviveOffers消息后,會(huì)調(diào)用makeOffers()方法創(chuàng)建WorkerOffer,并通過TaskScheduler.resourceOffers()返回offer;
- 6 TaskScheduler從rootPool獲取按調(diào)度算法排序后的TaskSetManager列表,取第一個(gè)TaskSetManager,逐個(gè)給TaskSet的Task分配WorkerOffer,生成TaskDescription(包含offer信息);
- 7 調(diào)用SchedulerBackend.DriverEndpoint的launchTasks方法,將TaskDescription序列化并封裝在LaunchTask消息中,發(fā)送給offer指定的executor。LaunchTask消息被ExecutorBackend收到后,會(huì)將Task信息反序列化,傳給Executor.launchTask(),最后使用Executor的線程池中的線程來執(zhí)行這個(gè)Task。
梳理
Stage,TaskSet,TaskSetManager是一一對應(yīng)的,數(shù)量相等,都是只存在driver上的。
Parition,Task,TaskDescription是一一對應(yīng),數(shù)量相同,Task和TaskDescription是會(huì)被發(fā)到executor上的。
TaskScheduler的調(diào)度池
與DAGScheduler不同的是TaskScheduler有調(diào)度池,有兩種調(diào)度實(shí)體,Pool和TaskSetManager。
與YARN的調(diào)度隊(duì)列類似,采用了層級(jí)隊(duì)列的方式,Pool是TaskSetManager的容器,起到將TaskSetManager分組的作用。
Schedulable
Schedulable是調(diào)度實(shí)體的基類,有兩個(gè)子類Pool和TaskSetManager。
要理解調(diào)度規(guī)則,必須知道下面幾個(gè)屬性:
- parent:所屬調(diào)度池,頂層的調(diào)度池為root pool;
- schedulableQueue:包含的調(diào)度對象組成的隊(duì)列;
- schedulingMode:調(diào)度模式,F(xiàn)IFO or FAIR;
- weight:權(quán)重
- minShare:最小分配額(CPU核數(shù))
- runningTasks:運(yùn)行中task數(shù)
- priority:優(yōu)先級(jí)
- stageId:就是stageId
- name:名稱
Pool和TaskSetManager對于這些屬性的取值有所不同,從而導(dǎo)致了他們的調(diào)度行為也不一樣。
| properties | Pool | TaskSetManager |
|---|---|---|
| weight | config | 1 |
| minShare | config | 0 |
| priority | 0 | jobId |
| stageId | -1 | stageId |
| name | config | TaskSet_{taskSet.id} |
| runningTasks | Pool所含TaskSetManager的runningTasks和 | TaskSetManager運(yùn)行中task數(shù) |
Pools創(chuàng)建流程
TaskScheduler有個(gè)屬性schedulingMode,值取決于配置項(xiàng)spark.scheduler.mode,默認(rèn)為FIFO。這個(gè)屬性會(huì)導(dǎo)致TaskScheduler使用不同的SchedulableBuilder,即FIFOSchedulableBuilder和FairSchedulableBuilder。
TaskScheduler在初始化的時(shí)候,就會(huì)創(chuàng)建root pool,根調(diào)度池,是所有pool的祖先。
它的屬性取值為:
name: "" (空字符串)
schedulingMode: 同TaskScheduler的schedulingMode屬性
weight: 0
minShare: 0
注意root pool的調(diào)度模式確定了。
接下來會(huì)執(zhí)行schedulableBuilder.buildPools()方法,
如果是FIFOSchedulableBuilder,則什么都不會(huì)發(fā)生。
-
若是FairSchedulableBuilder
- 1 依據(jù)scheduler配置文件(后面會(huì)說),開始創(chuàng)建pool(可以是多個(gè)pool,F(xiàn)IFO,F(xiàn)AIR都有可能,取決于配置文件),并都加入root pool中。
- 2 如果現(xiàn)在root pool中沒有名為"default"的pool(即配置文件中沒有定義一個(gè)叫default的pool),創(chuàng)建default pool,并加入root pool中。
這時(shí)default pool它的屬性取值是固定的:
name: "default" schedulingMode: FIFO weight: 1 minShare: 0
Task加入pool流程
當(dāng)TaskScheduler提交task的時(shí)候,會(huì)先創(chuàng)建TaskSetManager,然后通過schedulableBuilder添加到pool中。
如果是FIFOSchedulableBuilder,則會(huì)直接把TaskSetManager加入root pool隊(duì)列中。
-
若是FairSchedulableBuilder
- 1 從
spark.scheduler.pool配置獲取pool name,沒有定義則用'default'; - 2 從root pool遍歷找到對應(yīng)名稱的pool,把TaskSetManager加入pool的隊(duì)列。如果沒有找到,則創(chuàng)建一個(gè)該名稱的pool,采用與default pool相同的屬性配置,并加入root pool。
- 1 從
調(diào)度池結(jié)構(gòu)
經(jīng)過上面兩部分,最終得到的調(diào)度池結(jié)構(gòu)如下:
spark.scheduler.mode=FIFO

spark.scheduler.mode=FAIR

Fair Scheduler pools配置
Fair Scheduler Pool的劃分依賴于配置文件,默認(rèn)的配置文件為'fairscheduler.xml',也可以通過配置項(xiàng)"spark.scheduler.allocation.file"指定配置文件。
煮個(gè)栗子,文件內(nèi)容如下:
<?xml version="1.0"?>
<allocations>
<pool name="prod">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
這里配置了兩個(gè)pool,prod和test,并且配置了相關(guān)屬性,這兩個(gè)pool都會(huì)添加到root pool中。
調(diào)度算法
以SchedulingAlgorithm為基類,內(nèi)置實(shí)現(xiàn)的調(diào)度算法有兩種FIFOSchedulingAlgorithm和FairSchedulingAlgorithm,其邏輯如下:
-
FIFO: 先進(jìn)先出,優(yōu)先級(jí)比較算法如下,
- 1.比較priority,小的優(yōu)先;
- 2.priority相同則比較StageId,小的優(yōu)先。
-
FAIR:公平調(diào)度,優(yōu)先級(jí)比較算法如下,
- 1.runningTasks小于minShare的優(yōu)先級(jí)比不小于的優(yōu)先級(jí)要高。
- 2.若兩者運(yùn)行的runningTasks都比minShare小,則比較minShare使用率(runningTasks/max(minShare,1)),使用率越低優(yōu)先級(jí)越高。
- 3.若兩者的minShare使用率相同,則比較權(quán)重使用率(runningTasks/weight),使用率越低優(yōu)先級(jí)越高。
- 4.若權(quán)重也相同,則比較name,小的優(yōu)先。
Pool為FIFO模式下的幾種情形
TaskSetManager之間的比較,其實(shí)就是先比較jobId再比較stageId,誰小誰優(yōu)先,意味著就是誰先提交誰優(yōu)先。
Pool之間的比較,不存在!FIFO的pool隊(duì)列中是不會(huì)有pool的。
Pool為FAIR模式下的幾種情形
TaskSetManager之間的比較,因?yàn)閙inShare=0,weight=1,F(xiàn)AIR算法變成了:
- 1 runningTasks小的優(yōu)先
- 2 runningTasks相同則比較name
Pool之間的比較,就是標(biāo)準(zhǔn)的FAIR算法。
當(dāng)root pool為FAIR模式,先取最優(yōu)先的pool,再從pool中,按pool的調(diào)度模式取優(yōu)先的TaskSetManager。
開始使用FAIR mode
啟用FAIR模式:
- 1 準(zhǔn)備好
fairscheduler.xml文件 - 2 啟動(dòng)參數(shù)添加
--conf spark.scheduler.mode=FAIR - 3 運(yùn)行啟動(dòng)命令,如
spark-shell --master yarn --deploy-mode client --conf spark.scheode=FAIR

啟動(dòng)后如果直接運(yùn)行Job會(huì)自動(dòng)提交到default pool,那么如何提交Job到指定pool?
SparkContext.setLocalProperty("spark.scheduler.pool","poolName")
如果每次只運(yùn)行一個(gè)Job,開啟FAIR模式的意義不大,那么如何同時(shí)運(yùn)行多個(gè)Job?
要異步提交Job,需要用到RDD的async action,目前有如下幾個(gè):
countAsync
collectAsync
takeAsync
foreachAsync
foreachPartitionAsync
舉個(gè)例子:
sc.setLocalProperty("spark.scheduler.pool","test")
b.foreachAsync(_=>Thread.sleep(100))
sc.setLocalProperty("spark.scheduler.pool","production")
b.foreachAsync(_=>Thread.sleep(100))
這樣就會(huì)有兩個(gè)任務(wù)在不同的pool同時(shí)運(yùn)行:

FAIR mode應(yīng)用場景
場景1:Spark SQL thrift server
作用:讓離線任務(wù)和交互式查詢?nèi)蝿?wù)分配到不同的pool,給交互式查詢?nèi)蝿?wù)更高的優(yōu)先級(jí),這樣長時(shí)間運(yùn)行的離線任務(wù)就不會(huì)一直占用所有資源,阻塞交互式查詢?nèi)蝿?wù)。
場景2:Streaming job與Batch job同時(shí)運(yùn)行
作用:比如用Streaming接數(shù)據(jù)寫入HDFS,可能產(chǎn)生很多小文件,可以在低優(yōu)先級(jí)的pool定時(shí)運(yùn)行batch job合并小文件。
另外可以參考Spark Summit 2017的分享:Continuous Application with FAIR Scheduler
參考
Spark內(nèi)核設(shè)計(jì)的藝術(shù)
spark任務(wù)調(diào)度FIFO和FAIR的詳解
轉(zhuǎn)載請注明原文地址:
https://liam-blog.ml/2019/11/07/spark-core-scheduler/