I. Introduction
Spark's job and task scheduling system is its core. A Spark job is built from a series of operations on RDDs and runs on Executors. These operators fall into two groups, transformations and actions: transformations are lazy, meaning their evaluation is deferred, and a job is only submitted once an action appears. The two most important schedulers are the DAGScheduler and the TaskScheduler. The DAGScheduler does the logical scheduling, splitting a job into Stages, i.e. task sets with dependencies between them, while the TaskScheduler schedules and executes the concrete tasks.
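As a minimal illustration (a standalone sketch; the app name and local master here are made up for the example), the map below only records lineage, while foreach triggers the actual job submission:
import org.apache.spark.{SparkConf, SparkContext}

object LazyVsActionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("lazy-vs-action").setMaster("local[*]"))
    val nums = sc.parallelize(1 to 10)
    val doubled = nums.map(_ * 2) // transformation: nothing runs yet, only the lineage is recorded
    doubled.foreach(println)      // action: calls SparkContext.runJob and submits a job
    sc.stop()
  }
}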
Some related terminology first:
- 1. Job: one or more Stages generated when an action operator is triggered on an RDD
- 2. Stage: each job is split into groups of tasks according to the dependencies between its RDDs; such a group is called a Stage, also known as a TaskSet. Stages are created by the DAGScheduler and come in two kinds, ShuffleMapStage and ResultStage
- 3. DAGScheduler: the Stage-oriented scheduler. For each submitted job it splits the DAG into Stages based on the RDD dependencies, derives the number of tasks from the partitions of the last RDD in a Stage, determines each task's preferred locations, and wraps them into a TaskSet that is handed to the TaskScheduler
- 4. TaskScheduler: receives the Stages submitted by the DAGScheduler and dispatches their tasks to the Executors on the worker nodes for execution
SparkContext holds three important components: the DAGScheduler, the TaskScheduler and the SchedulerBackend.
1. DAGScheduler
The DAGScheduler splits the DAG of the user application into Stages, where each Stage consists of a group of tasks that can run concurrently; these tasks share exactly the same execution logic and only differ in the data they operate on.
2. TaskScheduler
Responsible for scheduling and executing the concrete tasks: it receives the tasks of each Stage from the DAGScheduler, assigns them, following the scheduling algorithm, to the Executors allocated to the application, and launches backup (speculative) tasks for tasks that run exceptionally slowly.
3. SchedulerBackend
Allocates the currently available resources: concretely, it offers computing resources (Executors) to the tasks waiting for them and launches the tasks on the allocated Executors, completing the scheduling of the computation. It drives this task scheduling through reviveOffers.
Next we analyze the job and task scheduling flow by following the source code.
II. Job and Task Scheduling Flow Analysis
1. Stage division
We start from an action operator to trace the whole job flow.
For example, foreach in the RDD class calls SparkContext's runJob; following the calls layer by layer shows that DAGScheduler's runJob is invoked in the end (a rough sketch of foreach is shown below).
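In 2.x, RDD.foreach looks roughly like this, cleaning the user closure and delegating to SparkContext.runJob (a sketch, not the full source):
def foreach(f: T => Unit): Unit = withScope {
  val cleanF = sc.clean(f)
  sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
SparkContext.runJob, shown next, then forwards the call to dagScheduler.runJob.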
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
In DAGScheduler's runJob method, submitJob is called; the call then blocks until the job's completion or failure result is returned.
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
throw exception
}
}
Then, in submitJob, a JobWaiter object is created that wraps the job-related information, and via the internal message loop it is posted to DAGScheduler's nested class DAGSchedulerEventProcessLoop for processing.
def submitJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): JobWaiter[U] = {
// Check to make sure we are not launching a task on a partition that does not exist.
val maxPartitions = rdd.partitions.length
partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
throw new IllegalArgumentException(
"Attempting to access a non-existent partition: " + p + ". " +
"Total number of partitions: " + maxPartitions)
}
val jobId = nextJobId.getAndIncrement()
if (partitions.size == 0) {
// Return immediately if the job is running 0 tasks
return new JobWaiter[U](this, jobId, 0, resultHandler)
}
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
//submit the job; eventProcessLoop is the DAGSchedulerEventProcessLoop instance
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
waiter
}
Finally, in DAGSchedulerEventProcessLoop's message handling, onReceive calls doOnReceive, which pattern-matches the received JobSubmitted case class and calls DAGScheduler's handleJobSubmitted to submit the job; stage splitting takes place in that method.
The source of handleJobSubmitted:
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// New stage creation may throw an exception if, for example, jobs are run on a
// HadoopRDD whose underlying HDFS files have been deleted.
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
.... (rest omitted)
Here createResultStage is called to obtain the last stage.
private def createResultStage(
rdd: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
jobId: Int,
callSite: CallSite): ResultStage = {
val parents = getOrCreateParentStages(rdd, jobId)
val id = nextStageId.getAndIncrement()
val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stage
}
createResultStage above calls getOrCreateParentStages. The main flow is to check, starting from the final RDD, whether any ancestor RDD it depends on involves a shuffle operation. If there is no shuffle, the job consists of a single ResultStage without parent stages; if there is a shuffle, the job has one ResultStage and at least one ShuffleMapStage. The check is implemented with two HashSets and a stack, and it only returns the direct parent Stages of the current Stage, without tracing further back. The resulting ResultStage is called the finalStage. A sketch of getOrCreateParentStages follows.
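Roughly, in 2.3, getOrCreateParentStages collects the direct shuffle dependencies of the given RDD and turns each one into a ShuffleMapStage:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}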
The core getShuffleDependencies code:
private[scheduler] def getShuffleDependencies(
rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
val parents = new HashSet[ShuffleDependency[_, _, _]]
val visited = new HashSet[RDD[_]]
val waitingForVisit = new ArrayStack[RDD[_]]
waitingForVisit.push(rdd)
while (waitingForVisit.nonEmpty) {
val toVisit = waitingForVisit.pop()
if (!visited(toVisit)) {
visited += toVisit
toVisit.dependencies.foreach {
case shuffleDep: ShuffleDependency[_, _, _] =>
parents += shuffleDep
case dependency =>
waitingForVisit.push(dependency.rdd)
}
}
}
parents
}
The process above can be summarized as follows:
The DAGScheduler starts from the last RDD and walks back through the entire dependency graph (iteratively, using an explicit stack) to divide the Stages. The division criterion is the wide dependency (ShuffleDependency): whenever an RDD operation involves a shuffle, that shuffle becomes the boundary that splits the graph into two Stages.
Once all Stages are divided they form a dependency structure; the parent Stages of each Stage are stored in a List called parents, through which all parents of the current Stage can be obtained. Stages are the essential unit into which a Spark job's execution is divided.
At this point the Stage division phase is complete.
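As a concrete illustration (the input path is hypothetical), a word count job is split into exactly two Stages because reduceByKey introduces a ShuffleDependency:
// ShuffleMapStage: textFile -> flatMap -> map (up to the shuffle boundary)
// ResultStage:     reduceByKey -> collect
val counts = sc.textFile("hdfs:///path/to/input")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)
counts.collect() // the action that triggers handleJobSubmitted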
2. Stage submission
From the finalStage produced in the previous phase a job instance is created; while this job instance runs, the job and Stage progress is obtained through the listener bus.
The source of handleJobSubmitted (continued):
private[scheduler] def handleJobSubmitted(jobId: Int,
finalRDD: RDD[_],
func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
....
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
submitStage(finalStage)
}
submitStage then calls getMissingParentStages to obtain the parent stages of finalStage. If there are no parent stages, the Stage is submitted for execution via submitMissingTasks; if parent Stages exist, the Stage is put into the waitingStages list.
The source of submitStage:
//recursively look for stages to submit
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
val missing: List[Stage] = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
//eventually submitMissingTasks is executed
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
waitingStages += stage
}
}
} else {
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
If there are missing parent Stages, submitStage is called recursively: every Stage that still has unfinished parents is added to the waitingStages list, while Stages without parents become the entry points of the job's execution. This is a recursive process whose exit is reaching the first Stages at the root of finalStage's stage tree.
The source of getMissingParentStages:
private def getMissingParentStages(stage: Stage): List[Stage] = {
val missing = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
val waitingForVisit = new ArrayStack[RDD[_]]
def visit(rdd: RDD[_]) {
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
missing += mapStage
}
case narrowDep: NarrowDependency[_] =>
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
waitingForVisit.push(stage.rdd)
while (waitingForVisit.nonEmpty) {
visit(waitingForVisit.pop())
}
missing.toList
}
This algorithm implements a "big recursion, small loop" pattern: the traversal inside getMissingParentStages uses an explicit stack instead of recursion, which avoids the StackOverflowError that deep recursion could cause when tracing all the way back to the first Stage. In the end, every Stage passed to submitMissingTasks is a source Stage of the Stage dependency tree.
3. Task submission
Once Stage splitting has finished, DAGScheduler's submitMissingTasks creates as many tasks as the last RDD of the Stage has partitions.
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
....
}
/** Returns the sequence of partition ids that are missing (i.e. needs to be computed). */
override def findMissingPartitions(): Seq[Int] = {
mapOutputTrackerMaster
.findMissingPartitions(shuffleDep.shuffleId)
.getOrElse(0 until numPartitions)
}
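For comparison, the ResultStage variant (roughly, in 2.3) simply asks the active job which output partitions have not finished yet:
override def findMissingPartitions(): Seq[Int] = {
  val job = activeJob.get
  (0 until job.numPartitions).filter(id => !job.finished(id))
}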
submitMissingTasks then computes the preferred locations for every task by calling getPreferredLocs:
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
getPreferredLocsInternal first checks whether the partition is cached and, if so, returns its cached locations. Otherwise it returns the RDD's own preferred-location list, obtained through the getPreferredLocations implementation of the concrete RDD. If an RDD does not implement that method, rddPrefs is empty, and the method then checks whether the RDD's dependencies are narrow and recurses into the parent RDDs of the first narrow dependency that yields any placement preference.
The source of getPreferredLocsInternal:
private def getPreferredLocsInternal(
rdd: RDD[_],
partition: Int,
visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = {
// If the partition has already been visited, no need to re-visit.
// This avoids exponential path exploration. SPARK-695
if (!visited.add((rdd, partition))) {
// Nil has already been returned for previously visited partitions.
return Nil
}
// If the partition is cached, return the cache locations
val cached = getCacheLocs(rdd)(partition)
if (cached.nonEmpty) {
return cached
}
// If the RDD has some placement preferences (as is the case for input RDDs), get those
val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
if (rddPrefs.nonEmpty) {
return rddPrefs.map(TaskLocation(_))
}
// If the RDD has narrow dependencies, pick the first partition of the first narrow dependency
// that has any placement preferences. Ideally we would choose based on transfer sizes,
// but this will do for now.
rdd.dependencies.foreach {
case n: NarrowDependency[_] =>
for (inPart <- n.getParents(partition)) {
val locs = getPreferredLocsInternal(n.rdd, inPart, visited)
if (locs != Nil) {
return locs
}
}
case _ =>
}
Nil
}
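The rddPrefs above come from each RDD's getPreferredLocations. A hypothetical custom RDD can advertise placement preferences through the same hook that HadoopRDD uses for HDFS block locations (a sketch; the class and field names are made up):
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class HostAwareRDD(sc: SparkContext, hosts: Array[String]) extends RDD[Int](sc, Nil) {
  override protected def getPartitions: Array[Partition] =
    hosts.indices.map(i => new Partition { override def index: Int = i }).toArray

  override def compute(split: Partition, context: TaskContext): Iterator[Int] =
    Iterator(split.index)

  // getPreferredLocsInternal ends up here via rdd.preferredLocations(partition)
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq(hosts(split.index))
}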
submitMissingTasks then builds the task set. Depending on the kind of Stage, different tasks are created: a ResultStage produces ResultTasks and a ShuffleMapStage produces ShuffleMapTasks. These tasks are assembled into a TaskSet and submitted to the TaskScheduler for processing.
A TaskSet contains all tasks of the Stage; the tasks share exactly the same processing logic and only differ in the data they process, each task corresponding to one data partition. Note that the task binary is serialized and broadcast in binary form; a rough sketch of that step is shown below.
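The serialize-and-broadcast step looks roughly like this inside submitMissingTasks (2.x; the checkpoint-related synchronization is omitted here): for a ShuffleMapStage the pair (rdd, shuffleDep) is broadcast, for a ResultStage the pair (rdd, func).
var taskBinary: Broadcast[Array[Byte]] = null
val taskBinaryBytes: Array[Byte] = stage match {
  case stage: ShuffleMapStage =>
    JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
  case stage: ResultStage =>
    JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
}
taskBinary = sc.broadcast(taskBinaryBytes) // each task later deserializes this in runTask
The task objects themselves are then created as follows: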
val tasks: Seq[Task[_]] = try {
val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
stage match {
case stage: ShuffleMapStage =>
stage.pendingPartitions.clear()
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = partitions(id)
stage.pendingPartitions += id
new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
Option(sc.applicationId), sc.applicationAttemptId)
}
case stage: ResultStage =>
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptNumber,
taskBinary, part, locs, id, properties, serializedTaskMetrics,
Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${Utils.exceptionString(e)}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
//submit the tasks as a TaskSet
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
The DAGScheduler above hands the task set to the TaskScheduler for submission. When the TaskScheduler receives the task set, its submitTasks method builds a TaskSetManager instance that manages the life cycle of this task set, and then adds the manager to the system scheduling pool (schedulableBuilder.addTaskSetManager), where it is scheduled centrally. This scheduler is application level and supports two scheduling modes, FIFO and FAIR; a minimal configuration sketch follows.
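The pool's scheduling mode is controlled by spark.scheduler.mode (FIFO by default); a minimal sketch of switching to FAIR (the app name is made up):
val conf = new SparkConf()
  .setAppName("fair-pool-demo")
  .set("spark.scheduler.mode", "FAIR") // schedulableBuilder then becomes a FairSchedulableBuilder
val sc = new SparkContext(conf)
The source of submitTasks: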
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}
Afterwards the scheduler backend's reviveOffers allocates resources and runs the tasks. Here the schedulerBackend is the coarse-grained CoarseGrainedSchedulerBackend, and the method sends a message to the driverEndpoint.
override def reviveOffers() {
//submit the tasks to the driver; the DriverEndpoint inner class of this class receives the message in its receive method
driverEndpoint.send(ReviveOffers)
}
The DriverEndpoint above is an inner class of CoarseGrainedSchedulerBackend. It is not invoked directly through the inner-class reference; instead, the message-passing convention is followed and the call goes through the rpcEnv, that is, messages are handled by the dispatcher's producer-consumer pattern, which keeps message processing uniform.
Eventually DriverEndpoint's receive method is called, matches ReviveOffers and invokes makeOffers.
// Make fake resource offers on all executors
private def makeOffers() {
// Make sure no executor is killed while some task is launching on it
val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map {
case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toIndexedSeq
scheduler.resourceOffers(workOffers)
}
if (!taskDescs.isEmpty) {
//launch the tasks on the Executors
launchTasks(taskDescs)
}
}
DriverEndpoint's makeOffers first collects the Executors currently usable in the cluster and hands them to TaskSchedulerImpl, which assigns resources to the tasks of the task sets. During allocation, resourceOffers sorts the TaskSetManagers according to the scheduling policy and then offers resources to them by data locality, in the order PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY; how long it waits before relaxing the locality level is configurable, as sketched below.
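How long the scheduler waits at a locality level before relaxing to a less local one is governed by the spark.locality.wait settings (3s by default); for example:
// fall back from PROCESS_LOCAL/NODE_LOCAL to less local levels sooner
conf.set("spark.locality.wait", "1s")
// per-level overrides also exist: spark.locality.wait.process / .node / .rack
The source of resourceOffers: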
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
....
val shuffledOffers = shuffleOffers(filteredOffers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores / CPUS_PER_TASK))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
for (taskSet <- sortedTaskSets) {
var launchedAnyTask = false
var launchedTaskAtCurrentMaxLocality = false
for (currentMaxLocality <- taskSet.myLocalityLevels) {
do {
launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
launchedAnyTask |= launchedTaskAtCurrentMaxLocality
} while (launchedTaskAtCurrentMaxLocality)
}
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
The tasks with their assigned resources are finally handed to the launchTasks method, which sends the tasks one by one to the CoarseGrainedExecutorBackend on the worker nodes, where the embedded Executor runs them.
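launchTasks itself is short; a simplified sketch (2.x; the real method also aborts the TaskSet when a serialized task exceeds spark.rpc.message.maxSize):
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = TaskDescription.encode(task)
    val executorData = executorDataMap(task.executorId)
    executorData.freeCores -= scheduler.CPUS_PER_TASK
    // received by CoarseGrainedExecutorBackend on the worker as a LaunchTask message
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
  }
}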
4. Task execution
When CoarseGrainedExecutorBackend receives a LaunchTask message, it calls the Executor's launchTask method, which creates a TaskRunner that wraps the task and manages the details of its execution, and then submits the TaskRunner to the thread pool.
//launch the task
case LaunchTask(data) =>
if (executor == null) {
exitExecutor(1, "Received LaunchTask command but executor was null")
} else {
val taskDesc = TaskDescription.decode(data.value)
logInfo("Got assigned task " + taskDesc.taskId)
//the Executor launches the task
executor.launchTask(this, taskDesc)
}
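executor.launchTask roughly just wraps the TaskDescription in a TaskRunner and hands it to the thread pool (2.x):
def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
  val tr = new TaskRunner(context, taskDescription)
  runningTasks.put(taskDescription.taskId, tr)
  threadPool.execute(tr) // TaskRunner.run deserializes the task and calls task.run -> runTask
}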
TaskRunner's run method first deserializes the task itself and the jars and other files it depends on, and then calls the deserialized task's runTask method. Since Task itself is an abstract class, the concrete runTask is implemented by its two subclasses, ShuffleMapTask and ResultTask.
For ShuffleMapTask:
runTask first deserializes the RDD and its dependencies from the broadcast variable, then, through the shuffleManager (SortShuffleManager is the only one in 2.3.1), takes the ShuffleHandle from the dependency and calls getWriter to create the corresponding ShuffleWriter.
getWriter decides which kind of shuffle write to use based on the handle's type. Spark has three shuffle writers: BypassMergeSortShuffleWriter, UnsafeShuffleWriter and SortShuffleWriter.
They correspond to the ShuffleHandles as follows (a sketch of how the handle is chosen comes after the list):
- UnsafeShuffleWriter: SerializedShuffleHandle
- BypassMergeSortShuffleWriter: BypassMergeSortShuffleHandle
- SortShuffleWriter: BaseShuffleHandle
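The handle itself is chosen earlier, when the ShuffleDependency is registered with the shuffle manager; roughly (2.3), SortShuffleManager.registerShuffle decides as follows:
override def registerShuffle[K, V, C](
    shuffleId: Int,
    numMaps: Int,
    dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
  if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
    // no map-side combine and few reduce partitions: write one file per partition, then concatenate
    new BypassMergeSortShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
    // serializer supports relocation and there is no aggregation: the unsafe (tungsten) path
    new SerializedShuffleHandle[K, V](
      shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
  } else {
    // everything else goes through the generic sort-based path
    new BaseShuffleHandle(shuffleId, numMaps, dependency)
  }
}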
The runTask method of ShuffleMapTask:
override def runTask(context: TaskContext): MapStatus = {
// Deserialize the RDD using the broadcast variable.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
//deserialize the RDD and its shuffle dependency
val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
var writer: ShuffleWriter[Any, Any] = null
try {
val manager = SparkEnv.get.shuffleManager
/**
 * Take the ShuffleHandle from the dependency and call getWriter to create the matching ShuffleWriter
 */
writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
//write to disk
writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
} catch {
case e: Exception =>
try {
if (writer != null) {
writer.stop(success = false)
}
} catch {
case e: Exception =>
log.debug("Could not stop writer", e)
}
throw e
}
}
Afterwards the write method of the chosen ShuffleWriter is called. Through the iterator of the last RDD in the Stage, data is pulled from the data source, and the chained iterator calls form a pipeline. The computed results are stored in the local BlockManager, and what is ultimately returned to the DAGScheduler is a MapStatus object. That object holds the storage information describing where the ShuffleMapTask's output lives in the BlockManager, not the results themselves; this storage information is what the tasks of the next stage use to locate their input data.
Taking BypassMergeSortShuffleWriter as an example, we can see that the RDD's iterator is invoked here, forming the pipeline that pulls data from upstream.
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new DiskBlockObjectWriter[numPartitions];
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);
while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
//partitioner.getPartition(key) gives the partition id this record is written to
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}
for (int i = 0; i < numPartitions; i++) {
final DiskBlockObjectWriter writer = partitionWriters[i];
partitionWriterSegments[i] = writer.commitAndGet();
writer.close();
}
File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
try {
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}
BypassMergeSortShuffleWriter's write method first obtains a serializer instance and, through the blockManager, a disk writer (DiskBlockObjectWriter, the IO stream of the disk file) per partition, each with a 32 KB buffer. For every record it calls partitioner.getPartition(key) to determine the partition id and writes the record with the corresponding partition's writer. These per-partition files are finally merged into one large file that is ordered by partition.
This process needs neither a large amount of memory, nor repeated serialization and deserialization, nor comparisons, which are relatively resource-hungry operations; a rough sketch of when this path is chosen follows.
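The condition for taking this bypass path is checked by SortShuffleWriter.shouldBypassMergeSort, roughly (2.3): no map-side combine and no more reduce partitions than spark.shuffle.sort.bypassMergeThreshold (default 200):
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // sorting cannot be bypassed when map-side aggregation is required
  if (dep.mapSideCombine) {
    require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
    false
  } else {
    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}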
For ResultTask:
Its final return value is simply the result computed by the func function.
override def runTask(context: TaskContext): U = {
// Deserialize the RDD and the func using the broadcast variables.
val threadMXBean = ManagementFactory.getThreadMXBean
val deserializeStartTime = System.currentTimeMillis()
val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime
} else 0L
val ser = SparkEnv.get.closureSerializer.newInstance()
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
_executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
_executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
} else 0L
func(context, rdd.iterator(partition, context))
}
5. Retrieving execution results
The Executor handles its computed result with different strategies depending on the result size (see the configuration sketch after this list):
1. If the result is larger than 1 GB, it is dropped outright; the threshold is configurable via spark.driver.maxResultSize.
2. If the result size lies between min(1 MB, 128 MB) and 1 GB, the result is stored in the BlockManager under the taskId, and that id is sent to the driverEndpoint over Netty; the lower threshold is the minimum of the Netty max frame size (128 MB) and the configured maxDirectResultSize (1 MB by default).
3. Results between 0 and 1 MB are sent directly to the driverEndpoint over Netty.
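Expressed as configuration, the two thresholds are (assumed defaults shown):
// results larger than this are dropped; only their size is reported back to the driver
conf.set("spark.driver.maxResultSize", "1g")
// results larger than min(this, the RPC max message size of ~128MB) go through the BlockManager
conf.set("spark.task.maxDirectResultSize", "1m")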
After a task finishes, its run method handles the Executor's computed result as follows:
// directSend = sending directly back to the driver
val serializedResult: ByteBuffer = {
if (maxResultSize > 0 && resultSize > maxResultSize) {
logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
s"dropping it.")
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize > maxDirectResultSize) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
blockId,
new ChunkedByteBuffer(serializedDirectResult.duplicate()),
StorageLevel.MEMORY_AND_DISK_SER)
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
} else {
logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
serializedDirectResult
}
}
.... (middle part omitted)
setTaskFinishedAndClearInterruptStatus()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
After the task has finished, the TaskRunner sends the task's execution result to the driverEndpoint, which forwards it to the TaskScheduler's statusUpdate method; that method handles the different task states differently.
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
var failedExecutor: Option[String] = None
var reason: Option[ExecutorLossReason] = None
synchronized {
try {
taskIdToTaskSetManager.get(tid) match {
case Some(taskSet) =>
if (state == TaskState.LOST) {
// TaskState.LOST is only used by the deprecated Mesos fine-grained scheduling mode,
// where each executor corresponds to a single task, so mark the executor as failed.
val execId = taskIdToExecutorId.getOrElse(tid, throw new IllegalStateException(
"taskIdToTaskSetManager.contains(tid) <=> taskIdToExecutorId.contains(tid)"))
if (executorIdToRunningTaskIds.contains(execId)) {
reason = Some(
SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
removeExecutor(execId, reason.get)
failedExecutor = Some(execId)
}
}
if (TaskState.isFinished(state)) {
cleanupTaskState(tid)
taskSet.removeRunningTask(tid)
if (state == TaskState.FINISHED) {
taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
}
}
case None =>
logError(
("Ignoring update with state %s for TID %s because its task set is gone (this is " +
"likely the result of receiving duplicate task finished status updates) or its " +
"executor has been marked as failed.")
.format(state, tid))
}
} catch {
case e: Exception => logError("Exception in statusUpdate", e)
}
}
// Update the DAGScheduler without holding a lock on this, since that can deadlock
if (failedExecutor.isDefined) {
assert(reason.isDefined)
dagScheduler.executorLost(failedExecutor.get, reason.get)
backend.reviveOffers()
}
}
statusUpdate handles the different task states as follows:
1. If the state is TaskState.FINISHED, TaskResultGetter's enqueueSuccessfulTask is called. It mainly reacts to how the TaskResult was delivered: for an IndirectTaskResult the actual result is fetched via its blockId; for a DirectTaskResult no remote fetch is needed, because the result was sent directly to the driverEndpoint and does not have to be pulled from the BlockManager.
2. If the state is TaskState.FAILED, TaskState.KILLED or TaskState.LOST, TaskResultGetter's enqueueFailedTask is called; for TaskState.LOST the executor the task ran on is additionally marked as failed, and scheduling is redone against the updated set of executors.
enqueueSuccessfulTask eventually calls the TaskScheduler's handleSuccessfulTask, which in turn calls DAGScheduler's handleTaskCompletion.
Recall that during the shuffle write phase, when the output was written to the BlockManager, what was returned to the DAGScheduler was a MapStatus object. That object is serialized into the IndirectTaskResult or DirectTaskResult; handleTaskCompletion extracts it and registers the MapStatus with the MapOutputTrackerMaster, which completes the handling of the ShuffleMapTask.
The ShuffleMapTask branch of DAGScheduler's handleTaskCompletion:
case smt: ShuffleMapTask =>
val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (stageIdToStage(task.stageId).latestInfo.attemptNumber == task.stageAttemptId) {
shuffleStage.pendingPartitions -= task.partitionId
}
if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
} else {
mapOutputTracker.registerMapOutput(
shuffleStage.shuffleDep.shuffleId, smt.partitionId, status)
shuffleStage.pendingPartitions -= task.partitionId
}
if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
markStageAsFinished(shuffleStage)
logInfo("looking for newly runnable stages")
logInfo("running: " + runningStages)
logInfo("waiting: " + waitingStages)
logInfo("failed: " + failedStages)
mapOutputTracker.incrementEpoch()
clearCacheLocs()
if (!shuffleStage.isAvailable) {
logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
") because some of its tasks had failed: " +
shuffleStage.findMissingPartitions().mkString(", "))
submitStage(shuffleStage)
} else {
markMapStageJobsAsFinished(shuffleStage)
submitWaitingChildStages(shuffleStage)
}
}
}
If the task is a ResultTask instead, the method checks whether the whole job has finished. If it has, the job is marked as completed, the resources it depends on are cleaned up, and a message is posted to the listener bus announcing that the job is done.
III. Summary
To recap: an action on an RDD triggers SparkContext.runJob; the DAGScheduler splits the job into Stages at shuffle boundaries and submits them as TaskSets; the TaskScheduler and the SchedulerBackend assign Executor resources by locality and launch the tasks; the Executors run ShuffleMapTasks and ResultTasks through TaskRunner; and the results, MapStatus objects or the final func results, flow back through statusUpdate to the DAGScheduler, which drives the remaining Stages until the job completes.
