Copyright notice: this is an original article and may not be reproduced without permission.
Background reading:
Source-code walkthrough of Task submission in Spark: http://www.itdecent.cn/p/9e75c11a5081
SchedulerBackend is a trait that works together with TaskSchedulerImpl to handle task scheduling, task execution and resource allocation. Its subclasses are listed below; each subclass is responsible for a different Spark resource-allocation / scheduling (deployment) mode. See Figure 1.
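For orientation, the hierarchy behind Figure 1 can be sketched as follows. This is a toy, compilable outline: the class names mirror the Spark 1.x classes, but the bodies are placeholders, not the real implementations.
<code>
// Toy outline of the SchedulerBackend hierarchy; names mirror Spark 1.x, bodies are placeholders.
trait SchedulerBackend {
  def reviveOffers(): Unit
}

class LocalBackend extends SchedulerBackend {                  // local / local[N] / local[N, M]
  def reviveOffers(): Unit = println("offer the cores of the in-process executor")
}

class CoarseGrainedSchedulerBackend extends SchedulerBackend { // shared parent of the cluster modes
  def reviveOffers(): Unit = println("send ReviveOffers to the driver endpoint")
}

class SparkDeploySchedulerBackend extends CoarseGrainedSchedulerBackend  // Standalone, local-cluster
class YarnClusterSchedulerBackend extends CoarseGrainedSchedulerBackend  // YARN
class CoarseMesosSchedulerBackend extends CoarseGrainedSchedulerBackend  // coarse-grained Mesos

class MesosSchedulerBackend extends SchedulerBackend {                   // fine-grained Mesos
  def reviveOffers(): Unit = println("handled via MesosSchedulerDriver resource offers")
}
</code>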

In each of Spark's (cluster) deployment modes, resources are offered to tasks by calling backend.reviveOffers(). The backend subclasses and the run modes they are responsible for are listed below (a small dispatch sketch follows this list):
LocalBackend
(1) local single-threaded mode, master of the form local
(2) local multi-threaded mode, matching local[N] and local[*]
(3) matching local[*, M] and local[N, M]
SparkDeploySchedulerBackend
(4) matches the Spark Standalone mode
(5) matches the local-cluster mode, i.e. pseudo-distributed mode
YarnClusterSchedulerBackend
(6) the "yarn-standalone" or "yarn-cluster" mode
(7) the yarn-client mode (handled by YarnClientSchedulerBackend)
CoarseMesosSchedulerBackend (coarse-grained) and MesosSchedulerBackend (fine-grained)
(8) match the Mesos modes; Mesos offers both coarse-grained and fine-grained scheduling.
Note: fine-grained scheduling is currently supported only on Mesos.
In coarse-grained mode, each Executor keeps the resources it acquires for the whole lifetime of the application and only releases them when the application exits. Advantage: the time spent on resource scheduling is reduced. Disadvantage: the allocated resources stay tied to one application for a long time, which can waste them.
In fine-grained mode, resources are scheduled dynamically according to the needs of each task and are returned to Mesos as soon as the task finishes, so no resources are wasted, but scheduling latency is higher.
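As promised above, here is a minimal dispatch sketch. It only imitates the master-string matching that SparkContext.createTaskScheduler performs when it picks a backend; the regexes and returned strings are illustrative, not the actual Spark code.
<code>
// Minimal sketch of mapping a master string to the backend that serves it.
// Loosely modeled on the matching in SparkContext.createTaskScheduler; illustrative only.
object MasterDispatch {
  private val LocalN        = """local\[([0-9]+|\*)\]""".r
  private val LocalNRetries = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
  private val LocalCluster  = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*\]""".r
  private val SparkUrl      = """spark://(.*)""".r
  private val MesosUrl      = """mesos://(.*)""".r

  def backendFor(master: String): String = master match {
    case "local"                   => "LocalBackend (single thread)"
    case LocalN(n)                 => s"LocalBackend ($n threads)"
    case LocalNRetries(n, retries) => s"LocalBackend ($n threads, maxFailures=$retries)"
    case SparkUrl(_) | LocalCluster(_, _, _)                => "SparkDeploySchedulerBackend"
    case "yarn-standalone" | "yarn-cluster" | "yarn-client" => "YARN scheduler backends"
    case MesosUrl(_)               => "CoarseMesosSchedulerBackend or MesosSchedulerBackend"
    case other                     => sys.error(s"Unrecognized master URL: $other")
  }
}

// For example: MasterDispatch.backendFor("local[4]")          == "LocalBackend (4 threads)"
//              MasterDispatch.backendFor("spark://host:7077") == "SparkDeploySchedulerBackend"
</code>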
1. LocalBackend
reviveOffers() requests resources by sending to a remote endpoint reference; that reference was assigned in the start method:
<code>
override def reviveOffers() {
  localEndpoint.send(ReviveOffers)
}
</code>
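The localEndpoint used here is an RpcEndpointRef that start registers with the driver's RpcEnv and keeps around. The following dependency-free toy shows just that pattern; the names are stand-ins, not Spark's actual classes.
<code>
// Toy illustration of the pattern: start() creates and stores an endpoint reference,
// reviveOffers() merely sends a message through it. Stand-in types, not Spark code.
case object ReviveOffers

trait EndpointRef { def send(msg: Any): Unit }

class ToyLocalBackend {
  private var localEndpoint: EndpointRef = _

  def start(): Unit = {
    // In Spark this is roughly rpcEnv.setupEndpoint(name, new LocalEndpoint(...)).
    localEndpoint = new EndpointRef {
      def send(msg: Any): Unit = println(s"endpoint got: $msg")
    }
  }

  def reviveOffers(): Unit = localEndpoint.send(ReviveOffers)
}

// Usage: val backend = new ToyLocalBackend; backend.start(); backend.reviveOffers()
</code>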
The ReviveOffers message is matched in the endpoint's receive method, where resources are then allocated, as shown below:
<code>
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    reviveOffers()  // see (1) below

  case StatusUpdate(taskId, state, serializedData) =>
    scheduler.statusUpdate(taskId, state, serializedData)
    if (TaskState.isFinished(state)) {
      freeCores += scheduler.CPUS_PER_TASK
      reviveOffers()
    }

  case KillTask(taskId, interruptThread) =>
    executor.killTask(taskId, interruptThread)
}
</code>
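The StatusUpdate branch above is what returns a finished task's cores to freeCores and immediately triggers another round of offers. In Spark 1.x those messages originate from LocalBackend.statusUpdate, which, if memory serves, is essentially a one-line forward to the same endpoint (shown as a rough sketch, not verified against a specific version):
<code>
// Rough sketch of LocalBackend.statusUpdate in Spark 1.x: the executor's status reports are
// forwarded to the endpoint, where the receive method above picks them up.
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
  localEndpoint.send(StatusUpdate(taskId, state, serializedData))
}
</code>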
(1) The reviveOffers() method is shown below:
<code>
def reviveOffers() {
  val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
  for (task <- scheduler.resourceOffers(offers).flatten) {
    freeCores -= scheduler.CPUS_PER_TASK
    // launch the task on the (local) executor
    executor.launchTask(executorBackend, taskId = task.taskId, attemptNumber = task.attemptNumber,
      taskName = task.name, serializedTask = task.serializedTask)
  }
}
</code>
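Two details worth noting here: a WorkerOffer is just a small value object describing one executor's spare capacity, and resourceOffers returns one inner sequence of TaskDescriptions per offer, which is why the result is flattened before launching. In Spark 1.x the offer type is essentially:
<code>
// WorkerOffer in Spark 1.x is essentially this small case class:
// one free-resource advertisement per executor.
private[spark] case class WorkerOffer(executorId: String, host: String, cores: Int)
</code>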
2. SparkDeploySchedulerBackend
From the SchedulerBackend subclass hierarchy in Figure 1 we know that SparkDeploySchedulerBackend is a subclass of CoarseGrainedSchedulerBackend, i.e. it uses coarse-grained scheduling. All subclasses of CoarseGrainedSchedulerBackend go through its reviveOffers method for scheduling, since they all belong to the coarse-grained model.
Resources are requested through the remote (driver) endpoint reference:
<code>
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
</code>
The ReviveOffers message is likewise matched in the receive method, which then calls makeOffers():
<code>
override def receive: PartialFunction[Any, Unit] = {
  case ReviveOffers =>
    makeOffers()  // see (1) below
  // ... other message cases omitted ...
}
</code>
(1) The makeOffers() method is shown below:
<code>
private def makeOffers() {
  // filter out executors that are in the process of being killed
  val activeExecutors = executorDataMap.filterKeys(!executorsPendingToRemove.contains(_))
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // hand the offers to the scheduler and launch the returned tasks -- see (2) and (3)
  launchTasks(scheduler.resourceOffers(workOffers))
}
</code>
(2) The scheduler.resourceOffers(workOffers) method is shown below. It is called on TaskSchedulerImpl to offer the resources available on the slaves. The scheduler responds by asking its active task sets for tasks in order of priority, and fills the nodes in a round-robin fashion so that tasks are balanced across the cluster.
<code>
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  var newExecAvail = false
  // for every offer, record the executor-to-host mapping and track newly seen executors
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    activeExecutorIds += o.executorId
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // randomly shuffle the offers to avoid always placing tasks on the same set of workers
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription])
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  // obtain the TaskSetManagers in the order dictated by the scheduling policy
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // take each TaskSet in scheduling order and try the five locality levels
  // PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY in turn,
  // so that each task runs at the best locality it can get
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
</code>
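To see why the do/while loop above balances tasks across executors, here is a self-contained toy model of the same round-robin fill: each pass places at most one more task on every offer, and the loop stops once a full pass launches nothing. The numbers and names are made up for illustration.
<code>
// Toy model of the round-robin fill; not Spark code.
import scala.collection.mutable.ArrayBuffer

val cpusPerTask = 1
val offers = Seq(("exec-1", 3), ("exec-2", 2))          // (executorId, free cores)
val availableCpus = offers.map(_._2).toArray
val assigned = offers.map(_ => ArrayBuffer.empty[Int])  // task ids assigned to each offer
var pendingTasks = (1 to 4).toList

var launchedTask = false
do {
  launchedTask = false
  for (i <- offers.indices if pendingTasks.nonEmpty && availableCpus(i) >= cpusPerTask) {
    assigned(i) += pendingTasks.head                    // "launch" one task on offer i
    pendingTasks = pendingTasks.tail
    availableCpus(i) -= cpusPerTask
    launchedTask = true
  }
} while (launchedTask)
// assigned ends up as ArrayBuffer(1, 3) for exec-1 and ArrayBuffer(2, 4) for exec-2,
// i.e. tasks are spread across executors instead of filling exec-1 first.
</code>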
(3) The launchTasks method is shown below:
<code>
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // serialize the task
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      // the serialized task is too large to fit in one Akka frame: abort its TaskSetManager
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // CoarseGrainedExecutorBackend deserializes the task on the executor and launches it
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
</code>
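The abort message above already hints at the usual fix for oversized tasks: do not capture large objects in the task closure, broadcast them instead. A minimal illustration, assuming an existing SparkContext sc, an RDD[String] rdd, and a large in-memory map lookup:
<code>
// Assumed to exist: SparkContext `sc` and RDD[String] `rdd`; `lookup` stands in for a large object.
val lookup: Map[String, Int] = Map("a" -> 1, "b" -> 2)   // imagine this being hundreds of MB

// Captured directly, `lookup` is serialized into every task and inflates serializedTask.limit:
val direct = rdd.map(key => lookup.getOrElse(key, 0))

// Broadcast once: tasks only carry a small handle and read the value on the executor side.
val lookupBc = sc.broadcast(lookup)
val viaBroadcast = rdd.map(key => lookupBc.value.getOrElse(key, 0))
</code>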
3. MesosSchedulerBackend
In the fine-grained Mesos mode, scheduling is driven through Mesos's MesosSchedulerDriver class. Interested readers can explore it on their own; it is not covered here.