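In the previous post we analyzed in detail how DAGScheduler works. The DAG formed by RDDs is split by DAGScheduler into stages at shuffle boundaries, and the TaskScheduler then submits each stage's tasks to executors. Running those tasks is where the scheduler comes in: it decides which of the pending tasks gets resources first.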
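Spark's scheduler has two main modes, the familiar FIFO and FAIR modes. By default Spark uses FIFO (first in, first out): whatever is submitted first runs first. FAIR (fair scheduling) mode groups tasks into pools inside the scheduling pool. Each pool can have its own weight, and the weights decide how resources are shared and therefore in what order tasks run.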
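To make the difference concrete, here is a minimal standalone sketch of the two orderings. It is loosely modeled on the comparators in Spark's `SchedulingAlgorithm.scala` (`FIFOSchedulingAlgorithm` and `FairSchedulingAlgorithm`) but simplified. The `Entity` case class and the `Orderings` object are hypothetical stand-ins for Spark's `Schedulable` trait, not the real API.

```scala
// Hypothetical stand-in for Spark's Schedulable: only the fields the two
// orderings look at.
final case class Entity(
    name: String,
    priority: Int,     // FIFO: in Spark this is the job id; smaller = earlier
    stageId: Int,      // FIFO tie-breaker
    runningTasks: Int, // FAIR: tasks currently running
    minShare: Int,     // FAIR: guaranteed minimum number of tasks
    weight: Int        // FAIR: relative share once minShare is satisfied
)

object Orderings {
  // FIFO: earlier job first, then earlier stage.
  def fifoLessThan(a: Entity, b: Entity): Boolean =
    if (a.priority != b.priority) a.priority < b.priority
    else a.stageId < b.stageId

  // FAIR: entities below their minShare ("needy") go first. Among needy ones
  // the lowest runningTasks/minShare ratio wins; otherwise the lowest
  // runningTasks/weight ratio wins. Remaining ties are broken by name.
  def fairLessThan(a: Entity, b: Entity): Boolean = {
    val aNeedy = a.runningTasks < a.minShare
    val bNeedy = b.runningTasks < b.minShare
    if (aNeedy && !bNeedy) true
    else if (!aNeedy && bNeedy) false
    else {
      val cmp =
        if (aNeedy) {
          val ra = a.runningTasks.toDouble / math.max(a.minShare, 1)
          val rb = b.runningTasks.toDouble / math.max(b.minShare, 1)
          java.lang.Double.compare(ra, rb)
        } else {
          val ra = a.runningTasks.toDouble / a.weight
          val rb = b.runningTasks.toDouble / b.weight
          java.lang.Double.compare(ra, rb)
        }
      if (cmp != 0) cmp < 0 else a.name < b.name
    }
  }
}

val job1 = Entity("job1", priority = 1, stageId = 0, runningTasks = 8, minShare = 0, weight = 1)
val job2 = Entity("job2", priority = 2, stageId = 3, runningTasks = 2, minShare = 0, weight = 1)

// FIFO keeps serving job1 because it was submitted first.
println(List(job2, job1).sortWith(Orderings.fifoLessThan).map(_.name)) // List(job1, job2)
// FAIR prefers job2 because it runs fewer tasks per unit of weight.
println(List(job1, job2).sortWith(Orderings.fairLessThan).map(_.name)) // List(job2, job1)
```

With FIFO a long job can hold the cluster while later jobs wait. With FAIR the job that currently has the smallest share relative to its weight (or its `minShare`) is offered resources next.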
So how is this implemented in the source code?
First, once the stages have been divided, TaskSchedulerImpl.submitTasks() is called to hand each stage to the TaskScheduler as a TaskSet. It creates a TaskSetManager object for the TaskSet and adds it to the scheduling pool.
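override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  // ....
  this.synchronized {
    // Wrap the TaskSet in a TaskSetManager, which schedules and retries its tasks
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    // Index managers by stage id, then by stage attempt id
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    // .....
    // Hand the manager to the pool tree; where it lands depends on the scheduling mode
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    // ...
  }
  // ...
}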
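The `stageTaskSets` lines in submitTasks() keep a two-level index: stage id, then stage attempt id, then TaskSetManager. The rough standalone sketch below reproduces only that bookkeeping. `FakeManager` and `Registry` are hypothetical placeholders and do not exist in Spark.

```scala
import scala.collection.mutable.HashMap

// Hypothetical placeholder for TaskSetManager; it only identifies the attempt.
final case class FakeManager(stageId: Int, stageAttemptId: Int)

object Registry {
  // stageId -> (stageAttemptId -> manager), the same shape as
  // taskSetsByStageIdAndAttempt in submitTasks().
  val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, FakeManager]]

  def register(stageId: Int, stageAttemptId: Int): FakeManager = synchronized {
    val manager = FakeManager(stageId, stageAttemptId)
    // Create the inner map the first time a stage is seen, then reuse it.
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stageId, new HashMap[Int, FakeManager])
    stageTaskSets(stageAttemptId) = manager
    manager
  }
}

Registry.register(stageId = 3, stageAttemptId = 0)
Registry.register(stageId = 3, stageAttemptId = 1) // e.g. the stage is retried
Registry.register(stageId = 4, stageAttemptId = 0)
println(Registry.taskSetsByStageIdAndAttempt(3).keys.toList.sorted) // List(0, 1)
```

`getOrElseUpdate` means a retried stage reuses its existing inner map, so all attempts of one stage stay grouped together.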
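The schedulableBuilder that submitTasks() calls is a SchedulableBuilder instantiated in TaskSchedulerImpl.initialize(), which then calls SchedulableBuilder.buildPools(). Which builder is created, and therefore how the pools are built, depends on the schedulingMode the user chooses through spark.scheduler.mode (FIFO by default).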
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // Pick the builder that matches the scheduling mode; both operate on rootPool
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  // Build the pool tree under rootPool (a no-op for FIFO)
  schedulableBuilder.buildPools()
}
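This dispatch can be mimicked without Spark. In the sketch below, the `Mode` enumeration, the builder classes, and `Builders.forConf` are hypothetical. Only the configuration key `spark.scheduler.mode` and its FIFO default reflect real Spark behaviour. Spark itself parses the string into its `SchedulingMode` enumeration before the match shown in initialize().

```scala
import java.util.Locale

// Hypothetical mirror of Spark's SchedulingMode enumeration.
object Mode extends Enumeration {
  val FAIR, FIFO, NONE = Value
}

// Hypothetical builders: each one only reports which pools it would create.
trait Builder { def buildPools(): List[String] }
final class FifoBuilder extends Builder {
  def buildPools(): List[String] = Nil // FIFO: no sub-pools at all
}
final class FairBuilder extends Builder {
  def buildPools(): List[String] = List("default") // at least the default pool
}

object Builders {
  // Read the mode (default FIFO), normalise its case, then pick a builder.
  def forConf(conf: Map[String, String]): Builder = {
    val raw = conf.getOrElse("spark.scheduler.mode", "FIFO")
    Mode.withName(raw.toUpperCase(Locale.ROOT)) match { // throws on unknown names
      case Mode.FIFO => new FifoBuilder
      case Mode.FAIR => new FairBuilder
      case other =>
        throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $other")
    }
  }
}

println(Builders.forConf(Map.empty).buildPools())                              // List()
println(Builders.forConf(Map("spark.scheduler.mode" -> "fair")).buildPools()) // List(default)
```

As in initialize(), a value that parses but is neither FIFO nor FAIR (here `NONE`) is rejected explicitly rather than silently falling back to FIFO.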
Next, let's look at the buildPools() method of the two builders, starting with FIFOSchedulableBuilder:
override def buildPools() {
  // nothing
}
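The FIFO version does nothing at all. In FIFO mode there are no sub-pools, and FIFOSchedulableBuilder.addTaskSetManager() simply adds each TaskSetManager to rootPool. FairSchedulableBuilder.buildPools(), on the other hand, loads pool definitions from an allocation file and then creates the "default" pool: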
override def buildPools() {
  var fileData: Option[(InputStream, String)] = None
  try {
    // 1. Use the file set via spark.scheduler.allocation.file, if any
    fileData = schedulerAllocFile.map { f =>
      val fis = new FileInputStream(f)
      logInfo(s"Creating Fair Scheduler pools from $f")
      Some((fis, f))
    }.getOrElse {
      // 2. Otherwise look for the default file (fairscheduler.xml) on the classpath
      val is = Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
      if (is != null) {
        logInfo(s"Creating Fair Scheduler pools from default file: $DEFAULT_SCHEDULER_FILE")
        Some((is, DEFAULT_SCHEDULER_FILE))
      } else {
        // 3. No file at all: only the "default" pool will exist
        logWarning("Fair Scheduler configuration file not found so jobs will be scheduled in " +
          s"FIFO order. To use fair scheduling, configure pools in $DEFAULT_SCHEDULER_FILE or " +
          s"set $SCHEDULER_ALLOCATION_FILE_PROPERTY to a file that contains the configuration.")
        None
      }
    }
    // Parse the pool definitions from whichever file was found
    fileData.foreach { case (is, fileName) => buildFairSchedulerPool(is, fileName) }
  } catch {
    case NonFatal(t) =>
      val defaultMessage = "Error while building the fair scheduler pools"
      val message = fileData.map { case (is, fileName) => s"$defaultMessage from $fileName" }
        .getOrElse(defaultMessage)
      logError(message, t)
      throw t
  } finally {
    // Close the stream whether or not parsing succeeded
    fileData.foreach { case (is, fileName) => is.close() }
  }
  // finally create "default" pool
  buildDefaultPool()
}
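After buildPools() returns, rootPool holds the pools read from the file plus a pool named "default". When a TaskSetManager later arrives through the schedulableBuilder.addTaskSetManager() call in submitTasks(), the FAIR builder places it in the pool named by the job's `spark.scheduler.pool` local property. A missing property means "default", and an unknown name creates a new pool on the fly. The sketch below models only that two-level tree. Its `Pool` class and `FairTree` object are hypothetical simplifications, not Spark's `org.apache.spark.scheduler.Pool`. The settings for pools created on the fly (FIFO inside the pool, weight 1, minShare 0) are the defaults Spark documents for pools missing from the XML file.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical, simplified pool: a name, its settings and its children.
final class Pool(val name: String, val mode: String, val minShare: Int, val weight: Int) {
  val children = ListBuffer.empty[Any] // sub-pools or task set names
  def find(poolName: String): Option[Pool] =
    children.collectFirst { case p: Pool if p.name == poolName => p }
}

object FairTree {
  val DefaultPoolName = "default"

  // Route a task set to the pool named by the optional pool property,
  // creating an unconfigured pool with default settings if necessary.
  def addTaskSet(root: Pool, taskSetName: String, poolProperty: Option[String]): Pool = {
    val poolName = poolProperty.getOrElse(DefaultPoolName)
    val pool = root.find(poolName).getOrElse {
      val created = new Pool(poolName, "FIFO", minShare = 0, weight = 1)
      root.children += created
      created
    }
    pool.children += taskSetName
    pool
  }
}

val root = new Pool("", "FAIR", 0, 0)
root.children += new Pool("production", "FAIR", minShare = 2, weight = 1) // as if read from the file
root.children += new Pool(FairTree.DefaultPoolName, "FIFO", 0, 1)        // like buildDefaultPool()

FairTree.addTaskSet(root, "TaskSet_1.0", Some("production"))
FairTree.addTaskSet(root, "TaskSet_2.0", None)          // no property: "default"
FairTree.addTaskSet(root, "TaskSet_3.0", Some("adhoc")) // unknown name: new pool
```

Because the default pool itself runs FIFO internally, an application with no allocation file and no pool property effectively keeps FIFO behaviour even in FAIR mode. That is what the warning in buildPools() says.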