In the previous article we saw that, after DAGScheduler splits a Job into a DAG of Stages, it creates ShuffleMapTasks or ResultTasks according to each Stage's type, wraps them in a TaskSet, and finally submits that TaskSet via TaskScheduler's submitTasks method; in practice this calls TaskSchedulerImpl's submitTasks. In this article we analyze how the Tasks are actually submitted.
Let's go straight to TaskSchedulerImpl's submitTasks method:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  // synchronized ensures task sets are submitted one at a time
  this.synchronized {
    // First create a TaskSetManager, which is responsible for scheduling the tasks in this TaskSet
    // The default maximum number of task failures is 4, configurable via spark.task.maxFailures
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Register the TaskSetManager and the TaskSet's properties with the schedulableBuilder
    // The SchedulableBuilder determines whether TaskSetManagers are scheduled FIFO or FAIR; FIFO is the default
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
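For reference, the maxTaskFailures passed to createTaskSetManager above comes from spark.task.maxFailures, and the per-task CPU demand used later in scheduling comes from spark.task.cpus. A minimal configuration sketch (the values are arbitrary examples, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.task.maxFailures", "8")  // give up on the TaskSet after 8 failures of the same task (default 4)
  .set("spark.task.cpus", "1")         // cores each task claims; surfaces as CPUS_PER_TASK in the scheduler (default 1)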
Now let's step into the backend's reviveOffers() method:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
As you can see, this simply sends a ReviveOffers message to the DriverEndpoint (recall from our earlier analysis that the DriverEndpoint also sends itself a ReviveOffers message when it registers). When the DriverEndpoint receives this message it handles it as follows:
case ReviveOffers =>
  makeOffers()
Next, let's look at makeOffers():
private def makeOffers() {
  // Filter out executors under killing
  // i.e. keep only the executors that are still alive
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  // Build a collection of WorkerOffers; a WorkerOffer represents the compute resources available on an executor
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  // Hand the offers to the scheduler and launch the resulting tasks on the executors
  launchTasks(scheduler.resourceOffers(workOffers))
}
In makeOffers() we first prepare the workOffers that can be used for computation; they describe the free cores available on every alive ExecutorBackend.
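To make the shape of these offers concrete, here is a tiny stand-alone sketch. Spark's own WorkerOffer is a package-private case class with the same three fields; the executor ids and hosts below are made up:

// Simplified stand-in for Spark's WorkerOffer: one entry per alive executor,
// carrying the cores it currently has free.
case class Offer(executorId: String, host: String, freeCores: Int)

val workOffers = Seq(
  Offer("executor-1", "host-a", 4),
  Offer("executor-2", "host-b", 2)
)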
Before moving on to launchTasks, let's focus on what scheduler.resourceOffers(workOffers) actually does:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    executorIdToHost(o.executorId) = o.host
    executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
    if (!executorsByHost.contains(o.host)) {
      executorsByHost(o.host) = new HashSet[String]()
      executorAdded(o.executorId, o.host)
      newExecAvail = true
    }
    // Rack-awareness: we do not consider the cross-rack case in this article
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }
  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(offers)
  // Build a list of tasks to assign to each worker.
  // Note how the number of task slots per executor is bounded by that executor's available cores
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  // Array holding the number of free cores on each executor
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }
  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  var launchedTask = false
  for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTask = resourceOfferSingleTaskSet(
        taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
    } while (launchedTask)
  }
  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
As you can see, the main job of TaskSchedulerImpl's resourceOffers method is to assign concrete compute resources to each Task. Its input is the set of available resources, and its output is a two-dimensional collection of TaskDescriptions (Seq[Seq[TaskDescription]]); each TaskDescription records the executorId, i.e. which ExecutorBackend that Task will run on. Let's now walk through the method step by step.
Mark the offered resources as alive, record their hostnames, and check whether a new executor has joined
This is the following part of the resourceOffers method:
var newExecAvail = false
for (o <- offers) {
  executorIdToHost(o.executorId) = o.host
  executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
  if (!executorsByHost.contains(o.host)) {
    executorsByHost(o.host) = new HashSet[String]()
    executorAdded(o.executorId, o.host)
    newExecAvail = true
  }
  for (rack <- getRackForHost(o.host)) {
    hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
  }
}
This part is not the focus of our analysis, so it is enough to know what it does.
Randomly shuffle all of the available compute resources
The corresponding source code:
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)
Random.shuffle "reshuffles" all of the compute resources so that tasks are not always placed on the same set of workers, which gives better load balancing.
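A quick illustration of what Random.shuffle does here (the executor ids are made up):

import scala.util.Random

val offers = Seq("executor-1", "executor-2", "executor-3", "executor-4")
// Returns a new, randomly permuted copy, so the first executor offered tasks
// is not always the same one across scheduling rounds.
val shuffled = Random.shuffle(offers)  // e.g. Seq(executor-3, executor-1, executor-4, executor-2)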
Declare an ArrayBuffer of TaskDescriptions for each ExecutorBackend, sized by its number of cores
The corresponding source code:
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
Note that at this point the buffers are only declared; no TaskDescription has been added to them yet.
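A small sketch of the difference between the capacity hint and the actual contents:

import scala.collection.mutable.ArrayBuffer

val slots = new ArrayBuffer[String](4)  // 4 is only an initial capacity hint
println(slots.length)                   // 0: nothing has been added yet
slots += "task-0"
println(slots.length)                   // 1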
Obtain the TaskSets sorted according to the scheduling policy
The corresponding source code:
val sortedTaskSets = rootPool.getSortedTaskSetQueue
The scheduling policy is implemented by the SchedulableBuilder, whose concrete implementations are FIFOSchedulableBuilder and FairSchedulableBuilder; FIFO is the default.
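For reference, switching to the FAIR policy is purely a matter of configuration. A hedged sketch, in which the application name, master, pool name and file path are hypothetical:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("fair-scheduling-demo")
  .setMaster("local[4]")
  .set("spark.scheduler.mode", "FAIR")  // default is FIFO
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")  // optional pool definitions
val sc = new SparkContext(conf)

// Jobs submitted from this thread go into the named pool; without this they use the default pool.
sc.setLocalProperty("spark.scheduler.pool", "production")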
If a new Executor has joined, call executorAdded on each TaskSetManager so that it can recompute its locality levels against the newly available resources
The corresponding source code:
for (taskSet <- sortedTaskSets) {
  logDebug("parentName: %s, name: %s, runningTasks: %s".format(
    taskSet.parent.name, taskSet.name, taskSet.runningTasks))
  if (newExecAvail) {
    taskSet.executorAdded()
  }
}
Offer the resources to each TaskSet at every locality level, from the most local to the least local
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
  do {
    launchedTask = resourceOfferSingleTaskSet(
      taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
  } while (launchedTask)
}
The locality levels here, from best to worst, are PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL and ANY. Note that NO_PREF does not name a location: it means the task has no locality preference at all (its data is not tied to any particular executor or node), so it can run equally well anywhere.
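The scheduler also does not fall back to a worse level immediately: delay scheduling makes it wait a configurable amount of time at each level first. The relevant settings are shown below; the values are just examples, 3s being the default:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // base wait before falling back one locality level
  .set("spark.locality.wait.process", "3s")  // override for giving up on PROCESS_LOCAL
  .set("spark.locality.wait.node", "3s")     // override for giving up on NODE_LOCAL
  .set("spark.locality.wait.rack", "3s")     // override for giving up on RACK_LOCAL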
Now let's look at the resourceOfferSingleTaskSet method:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          executorsByHost(host) += execId
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
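To isolate the control flow from Spark's bookkeeping, here is a stripped-down, self-contained model of the same idea: walk over the shuffled offers, and whenever an executor still has at least cpusPerTask free cores and a task is pending, assign it and deduct the cores; repeat until a whole round launches nothing. This is only a sketch of the pattern, not Spark's implementation, and every name in it is made up:

import scala.collection.mutable.ArrayBuffer

case class Offer(executorId: String, cores: Int)  // simplified stand-in for WorkerOffer

// One offer round for a single "task set": returns true if at least one task was launched.
def offerRound(offers: Seq[Offer],
               availableCpus: Array[Int],
               pending: Iterator[String],
               assigned: Seq[ArrayBuffer[String]],
               cpusPerTask: Int = 1): Boolean = {
  var launched = false
  for (i <- offers.indices if availableCpus(i) >= cpusPerTask && pending.hasNext) {
    assigned(i) += pending.next()        // "launch" one task on this executor
    availableCpus(i) -= cpusPerTask
    launched = true
  }
  launched
}

val offers = Seq(Offer("executor-1", 2), Offer("executor-2", 1))
val availableCpus = offers.map(_.cores).toArray
val assigned = offers.map(_ => new ArrayBuffer[String]())
val pending = Iterator("task-0", "task-1", "task-2", "task-3")

while (offerRound(offers, availableCpus, pending, assigned)) {}  // mirrors the do/while loop above
// assigned: executor-1 -> [task-0, task-2], executor-2 -> [task-1]; task-3 stays pending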
resourceOfferSingleTaskSet first loops over the shuffled offers and checks whether the free cores on each Executor are enough for one more Task; CPUS_PER_TASK defaults to 1 and can be changed via spark.task.cpus. For each eligible Executor it then calls the TaskSetManager's resourceOffer method, which finally decides whether one of this TaskSet's tasks will run on that ExecutorBackend at the given locality level:
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  if (!isZombie) {
    val curTime = clock.getTimeMillis()
    var allowedLocality = maxLocality
    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }
    dequeueTask(execId, host, allowedLocality) match {
      case Some((index, taskLocality, speculative)) => {
        // Found a task; do some bookkeeping and return a task description
        val task = tasks(index)
        val taskId = sched.newTaskId()
        // Do various bookkeeping
        copiesRunning(index) += 1
        val attemptNum = taskAttempts(index).size
        val info = new TaskInfo(taskId, index, attemptNum, curTime,
          execId, host, taskLocality, speculative)
        taskInfos(taskId) = info
        taskAttempts(index) = info :: taskAttempts(index)
        // Update our locality level for delay scheduling
        // NO_PREF will not affect the variables related to delay scheduling
        if (maxLocality != TaskLocality.NO_PREF) {
          currentLocalityIndex = getLocalityIndex(taskLocality)
          lastLaunchTime = curTime
        }
        // Serialize and return the task
        val startTime = clock.getTimeMillis()
        val serializedTask: ByteBuffer = try {
          Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
        } catch {
          // If the task cannot be serialized, then there's no point to re-attempt the task,
          // as it will always fail. So just abort the whole task-set.
          case NonFatal(e) =>
            val msg = s"Failed to serialize task $taskId, not attempting to retry it."
            logError(msg, e)
            abort(s"$msg Exception during serialization: $e")
            throw new TaskNotSerializableException(e)
        }
        // Warn if the serialized task exceeds the recommended size (TASK_SIZE_TO_WARN_KB, 100 KB by default)
        if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
          emittedTaskSizeWarning = true
          logWarning(s"Stage ${task.stageId} contains a task of very large size " +
            s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
            s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
        }
        addRunningTask(taskId)
        // We used to log the time it takes to serialize the task, but task size is already
        // a good proxy to task serialization time.
        // val timeTaken = clock.getTime() - startTime
        val taskName = s"task ${info.id} in stage ${taskSet.id}"
        logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
          s"$taskLocality, ${serializedTask.limit} bytes)")
        // Tell the DAGScheduler that the task has started
        sched.dagScheduler.taskStarted(task, info)
        // Return the TaskDescription
        return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
          taskName, index, serializedTask))
      }
      case _ =>
    }
  }
  None
}
The maxLocality passed in here is the one supplied by the for loop in TaskSchedulerImpl's resourceOffers shown above; that loop walks the locality levels from best to worst, so the most desirable level is PROCESS_LOCAL. The method returns Some(TaskDescription) to resourceOfferSingleTaskSet, which then updates its bookkeeping structures and subtracts the cores just consumed from the available resources (by default each Task uses one core). The assigned tasks (Seq[ArrayBuffer[TaskDescription]]) are returned to TaskSchedulerImpl's resourceOffers, which in turn hands the final result (Seq[Seq[TaskDescription]]) back to CoarseGrainedSchedulerBackend's makeOffers, which finally calls launchTasks:
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
One point worth adding: as analyzed earlier, DAGScheduler reasons about locality at the data level, i.e. at the level of RDDs and their preferredLocations, so it supplies the locations where a Task's data already lives; TaskScheduler, by contrast, reasons about locality from the perspective of actually running the Task, deciding how close the computation can get to that data (PROCESS_LOCAL, NODE_LOCAL and so on). This again illustrates that DAGScheduler is responsible for the high-level scheduling while TaskScheduler handles the low-level scheduling.
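If you want to see the data-level preference that DAGScheduler starts from, the RDD API exposes it directly. A small sketch, assuming sc is an existing SparkContext and the HDFS path is hypothetical:

// Preferred locations are a property of the data and are reported per partition.
val rdd = sc.textFile("hdfs:///path/to/input")
val firstPartition = rdd.partitions(0)
// For an HDFS-backed RDD this is typically the set of hosts holding that block's replicas;
// the TaskScheduler then tries to honour it when picking a locality level.
println(rdd.preferredLocations(firstPartition))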
Next, let's step into the launchTasks method:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    // Note that it is the TaskDescription that gets serialized here
    val serializedTask = ser.serialize(task)
    // Check whether the serialized size exceeds the allowed frame size
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      // Look up the information of the executor this TaskDescription is destined for
      val executorData = executorDataMap(task.executorId)
      // Deduct the cores this task will occupy
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // Send the task to the chosen executor
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
Here akkaFrameSize defaults to 128 MB in Spark 1.6.x (configurable via spark.akka.frameSize), and AkkaUtils.reservedSizeBytes is 200 KB, so the serialized task must not exceed 128 MB minus 200 KB; if it stays under that limit, the task is sent to the chosen ExecutorBackend.
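Two practical takeaways follow from this limit. You can raise the frame size, but a task that large usually means a big object was captured in the task closure, and the cleaner fix is to broadcast it. A hedged sketch, assuming sc is an existing SparkContext; loadLookupTable, someRdd and the chosen value are hypothetical:

import org.apache.spark.SparkConf

// Raising the limit treats the symptom (the value is in MB for Spark 1.6.x).
val conf = new SparkConf().set("spark.akka.frameSize", "256")

// Treating the cause: ship a large read-only structure once per executor as a
// broadcast variable, so the serialized task itself stays small.
val bigLookup: Map[String, Int] = loadLookupTable()   // hypothetical helper
val lookupBc = sc.broadcast(bigLookup)
val enriched = someRdd.map(key => (key, lookupBc.value.getOrElse(key, 0)))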
When the ExecutorBackend (CoarseGrainedExecutorBackend in Standalone mode) receives this message, it calls its Executor's launchTask method to actually run the task:
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
So the component that actually sends the tasks is not the TaskSetManager but CoarseGrainedSchedulerBackend; the TaskSetManager is only responsible for tracking and monitoring the tasks it owns.
This completes the task submission process. In the next article we will analyze how the tasks are executed.
This article is based on the Spark 1.6.3 source code; a link to the Spark 2.1.0 version:
This is an original article. Reposting is welcome, but please credit the source and the author. Thank you!