spark-源碼-action算子觸發(fā)

基于spark1.6

創(chuàng)建完SparkContext,然后執(zhí)行Action算子

當(dāng)RDD執(zhí)行Action算子時(形成一個job),會將代碼提交到Master上運(yùn)行,

例如wordcount的action 算子 collect方法? def collect(): Array[T] = {

? ? val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)

? ? Array.concat(results: _*)

? }

sc是SparkContext對象,上面 runJob 如下

? def runJob[T, U: ClassTag](

? ? ? rdd: RDD[T],

? ? ? func: (TaskContext, Iterator[T]) => U,

? ? ? partitions: Seq[Int],

? ? ? allowLocal: Boolean,

? ? ? resultHandler: (Int, U) => Unit) {

........................

? ? }

? ? //該方法調(diào)用多次重載的方法后,最終會調(diào)用dagScheduler的runJob,形成和切分stage

def runJob[T, U: ClassTag](

? ? ? rdd: RDD[T],

? ? ? func: (TaskContext, Iterator[T]) => U,

? ? ? partitions: Seq[Int],

? ? ? allowLocal: Boolean,

? ? ? resultHandler: (Int, U) => Unit) {

? 。。。。。。。

? ? //dagScheduler出現(xiàn)了,可以切分stage

? ? dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,

? ? ? resultHandler, localProperties.get)

? ? progressBar.foreach(_.finishAll())

? ? rdd.doCheckpoint()

? }


dagScheduler的runJob 是我們比較關(guān)心的

def runJob[T, U: ClassTag](

? ? 。。。。。

? ? val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)

? }

這里面的我們主要看的是submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)提交任務(wù),括號里面的是任務(wù)信息

def submitJob[T, U](。。。): JobWaiter[U] = {

? ? //在這兒才封裝任務(wù)提交事件,把該事件對象加入到任務(wù)隊(duì)列里面? ? eventProcessLoop.post(JobSubmitted(

? ? ? jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))

? }

JobSubmitted:// 封裝job事件對象,放入DAGScheduler阻塞的事件隊(duì)列,例如:任務(wù)id,數(shù)據(jù)RDD,fun,jobId(可見一個action就是一個job)

從隊(duì)列中取出事件對象,調(diào)用 onReceive方法,即調(diào)用子類 DAGSchedulerEventProcessLoop 的onReceive方法,該方法的匹配模式如下:

(1)先生成finalStage。

? ? case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>

? ? ? //調(diào)用dagScheduler來出來提交任務(wù)

? ? ? dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)

調(diào)用了handleJobSubmitted方法,接下來查看該方法

private[scheduler] def handleJobSubmitted(jobId: Int,

? ? ? finalRDD: RDD[_],

? ? ? func: (TaskContext, Iterator[_]) => _,

? ? ? partitions: Array[Int],

? ? ? allowLocal: Boolean,

? ? ? callSite: CallSite,? ? ? listener: JobListener,

? ? ? properties: Properties) {

? ? ? ? var finalStage: Stage = null

? ? ? ? //最終的stage,從后往前劃分

? ? ? ? finalStage = newResultStage(finalRDD, partitions.size, None, jobId, callSite)

? ? ? ? 。。。。

? ? ? ? submitStage(finalStage)

? //?提交其他正在等待的stage??

?? submitWaitingStages()?}

}

/**??

???*?創(chuàng)建一個 ResultStage?,形成有向無環(huán)圖

???*/??

??private?def?newResultStage(??

rdd:?RDD[_],

??????func:?(TaskContext,?Iterator[_])?=>?_,??

??????partitions:?Array[Int],??

??????jobId:?Int,??

??????callSite:?CallSite):?ResultStage?=?{??

?//下面這個函數(shù)會生成我們的DAG,需重點(diǎn)關(guān)注??

????val?(parentStages:?List[Stage],?id:?Int)?=?getParentStagesAndId(rdd,?jobId)??

val?stage?=?new?ResultStage(id,rdd,?func,?partitions,parentStages,?jobId,?callSite)

????stageIdToStage(id)?=?stage?//將Stage的id放入stageIdToStage結(jié)構(gòu)中。??

????updateJobIdStageIdMaps(jobId,?stage)?//更新JobIdStageIdMaps??

????stage??

?}

??}?

上面的代碼中,調(diào)用了newResultStage方法,該方法是劃分任務(wù)的核心方法,任務(wù)劃分是根據(jù)最后一個依賴關(guān)系作為開始,通過遞歸,將每個寬依賴做為切分Stage的依據(jù),切分Stage的過程是流程中的一環(huán)(詳見 day29_spark-源碼-Stage劃分算法,并最終得到了DAG圖中的Result Stage(final Stage)),但在這里不詳細(xì)闡述,當(dāng)任務(wù)切分完畢后,代碼繼續(xù)執(zhí)行來到submitStage(finalStage)這里開始進(jìn)行任務(wù)提交

(2)提交resultStage

//提交Stage,如果有未提交的ParentStage,則會遞歸提交這些ParentStage,只有所有ParentStage都計(jì)算完了,才能提交當(dāng)前Stage?

? private def submitStage(stage: Stage) { // 此stage是 result stage

? //?根據(jù)stage獲取jobId??

? ? val jobId = activeJobForStage(stage)? //查找該Stage的所有激活的job

? if (jobId.isDefined) {? //?jobId 存在就執(zhí)行,如果不存在就停止??

//?記錄Debug日志信息:submitStage(stage)??

? ? ? logDebug("submitStage(" + stage + ")")?

? ? //如果當(dāng)前Stage沒有在等待parent Stage的返回,也不是正在運(yùn)行的Stage,并且也沒有提 示提交失敗,說明未處理,那么我們就嘗試提交Stage?

? ? ? if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {?

? ? ? ? //得到還未執(zhí)行的父 stage

val missing = getMissingParentStages(stage).sortBy(_.id)?

? ? ? ? logDebug("missing: " + missing)?

? ? ? ? if (missing.isEmpty) {? ? //如果沒有父 Stage?

? ? ? ? //當(dāng)前stage 拆分成task,形成taskSet 并提交

? ? ? ? ? submitMissingTasks(stage, jobId.get) // 注意這個stage會是兩種類型 1、shufflerMapStage 2、resultStage

? ? ? ? ? ? } else {?

? ? ? ? ? //有父Stage沒進(jìn)行計(jì)算,就遞歸提交這些父Stage?

? ? ? ? ? for (parent <- missing) {? // 該stage的所有父stage

? ? ? ? ? ? submitStage(parent)// 遞歸調(diào)用本身?

? ? }?

? ? ? ? ? ? waitingStages += stage?

? ? ? ? }?

? ? ? }?

? ? } else {//無效作業(yè),停止它。?

? ? ? abortStage(stage, "No active job for stage " + stage.id, None)?

? ? }?

}

********************getMissingParentStages方法如下****************

針對 stage的執(zhí)行要記住2個判斷點(diǎn) 1、getmissingParentStages()方法為核心方法。這里我們要懂得這樣一個邏輯:我們都知道,Stage是通過shuffle劃分的,所以,每一Stage都是以shuffle開始的,若一個RDD是寬依賴,則必然說明該RDD的父RDD在另一個Stage中,若一個RDD是窄依賴,則該RDD所依賴的父RDD還在同一個Stage中,我們可以根據(jù)這個邏輯,找到該Stage的父Stage。

// DAGScheduler.scala

private def getMissingParentStages(stage: Stage): List[Stage] = {

? ? val missing = new HashSet[Stage] //用于存放父Stage

? ? val visited = new HashSet[RDD[_]] //用于存放已訪問過的RDD

? ? val waitingForVisit = new Stack[RDD[_]]

? ? def visit(rdd: RDD[_]) {

? ? ? if (!visited(rdd)) { //如果RDD沒有被訪問過,則進(jìn)行訪問

? ? ? ? visited += rdd //添加到已訪問RDD的HashSet中

? ? ? ? val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)

? ? ? ? if (rddHasUncachedPartitions) {

? ? ? ? ? for (dep <- rdd.dependencies) { //獲取該RDD的依賴

? ? ? ? ? ? dep match {

? ? ? ? ? ? ? case shufDep: ShuffleDependency[_, _, _] =>//若為寬依賴,則該RDD依賴的RDD所在的stage必為父stage

? ? ? ? ? ? ? ? val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)//生成父Stage

? ? ? ? ? ? ? ? if (!mapStage.isAvailable) {//若父Stage task沒有完全執(zhí)行,則添加到父Stage的HashSET中

? ? ? ? ? ? ? ? ? missing += mapStage // 如果是寬依賴,那么就表示找到了,不存在寬依賴前還有寬依賴

? ? ? ? ? ? ? ? }

? ? ? ? ? ? ? case narrowDep: NarrowDependency[_] =>//若為窄依賴,則需要再判斷,其父有無寬依賴

? ? ? ? ? ? ? ? waitingForVisit.push(narrowDep.rdd)

? ? ? ? ? ? }

? ? ? ? ? }

? ? ? ? }

? ? ? }

? ? }

? ? waitingForVisit.push(stage.rdd)

? ? while (waitingForVisit.nonEmpty) {//循環(huán)遍歷所有RDD

? ? ? visit(waitingForVisit.pop())

? ? }

? ? missing.toList

}

def isAvailable: Boolean = _numAvailableOutputs == numPartitions

針對 stage的執(zhí)行要記住2個判斷點(diǎn) 2、每當(dāng)執(zhí)行完一個Task會對變量_numAvailableOutputs加1,直至所有Task執(zhí)行完,_numAvailableOutputs等于分區(qū)數(shù)。

(3)提交MissingTask

stage根據(jù) parition 拆分成task(決定每個Task的最佳位置)生成TaskSet,并提交到TaskScheduler

private def submitMissingTasks(stage: Stage, jobId: Int) {

? //首先根據(jù)stage所依賴的RDD的partition的分布,會產(chǎn)生出與partition數(shù)量相等的task

? var tasks = ArrayBuffer[Task[_]]()

? //對于resultStage或是shufflerMapStage會產(chǎn)生不同的task。

? //檢查該stage時是否ShuffleMapStage,如果是則生成ShuffleMapTask

? if (stage.isShuffleMapStage) { //生成ShuffleMapStage

? ? for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {

? ? ? //task根據(jù)partition的locality進(jìn)行分布

? ? ? val locs = getPreferredLocs(stage.rdd, p)

? ? ? tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)

? ? }

? } else { //resultStage:該類型stage直接輸出結(jié)果生成ResultTask

? val job = resultStageToJob(stage)

? ? for (id <- 0 until job.numPartitions if !job.finished(id)) {

? ? ? val partition = job.partitions(id)

? ? ? val locs = getPreferredLocs(stage.rdd, partition)

? ? ? //由于是ResultTask,因此需要傳入定義的func,也就是處理結(jié)果返回

? ? ? tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)

? ? }

? }

? //向TaskSchuduler提交任務(wù),以stage為單位,一個stage對應(yīng)一個TaskSet

? taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))

}

Task任務(wù)調(diào)度

taskScheduler.submitTasks方法比較重要,主要將任務(wù)加入調(diào)度池(taskschduler 創(chuàng)建時初始一個調(diào)度池),最后調(diào)用了CoarseGrainedSchedulerBackend.reviveOffers()

override def submitTasks(taskSet: TaskSet) {

? ? val tasks = taskSet.tasks

? ? this.synchronized {

? ? ? ? //將TaskSet 封裝成TaskSetManger? ? ? val manager = createTaskSetManager(taskSet, maxTaskFailures)

? ? ? activeTaskSets(taskSet.id) = manager

? ? //用 schedulableBuilder去添加TaskSetManager到隊(duì)列中

//schedulableBuilder有兩種形態(tài):FIFOSchedulableBuilder: 單一pool ,F(xiàn)airSchedulableBuilder: ? 多個pool? ? ? schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

。。。。。。。。。。。

? ? //在TaskSchedulerImpl在submitTasks添加TaskSetManager到pool后,調(diào)用了? ? backend.reviveOffers()? ? ?

//fifo 直接將可調(diào)度對象TaskSetManager加入SchedulerQueue的尾端。?

override def addSchedulable(schedulable: Schedulable) {?

? ? require(schedulable != null)?

? ? schedulableQueue.add(schedulable)?

? ? schedulableNameToSchedulable.put(schedulable.name, schedulable)?

? ? schedulable.parent = this?

? }

? override def reviveOffers() {

? ? //自己給自己發(fā)消息(告訴它我要提交task)

? ? driverActor ! ReviveOffers? }

這里用了內(nèi)部的DriverActor對象發(fā)送了一個內(nèi)部消息給自己,接下來查看receiver方法接受的消息

收到消息后調(diào)用了makeOffers()方法

? ? ? case ReviveOffers =>

? ? ? ? makeOffers()

? def makeOffers() {

? ? ? launchTasks(scheduler.resourceOffers(executorDataMap.map {

? case (id, executorData) =>

? ? ? ? new WorkerOffer(id, executorData.executorHost, executorData.freeCores)

? ? ? }.toSeq))

? ? }

makeOffers方法中,將Executor的信息集合與調(diào)度池中的Tasks封裝成WokerOffers,調(diào)用

launchTasks

? ? def launchTasks(tasks: Seq[Seq[TaskDescription]]) {

? ? ? for (task <- tasks.flatten) {

? ? ? ? 。。。。。。

? ? ? ? //把task序列化

? ? ? ? val serializedTask = ser.serialize(task)

? ? ? ? ? ? 。。。。。

//向executor進(jìn)程 發(fā)送創(chuàng)建TaskRunner(extends Runnable)

? ? ? ? ? val executorData = executorDataMap(task.executorId)(這是之前注冊過了的)

? ? ? ? ? executorData.freeCores -= scheduler.CPUS_PER_TASK

? ? ? ? ? //把序列化好的task發(fā)送給Executor

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))? ? ? ? ?

? ? ? ? }

? ? ? }

? ? }

會由CoarseGrainedSchedulerBackend來接受執(zhí)行指令,內(nèi)部封裝DriverActor

launchTasks方法將遍歷Tasks集合,每個Task任務(wù)序列化,發(fā)送啟動Task執(zhí)行消息的給Executor

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容