基于spark1.6
創(chuàng)建完SparkContext,然后執(zhí)行Action算子
當(dāng)RDD執(zhí)行Action算子時(形成一個job),會將代碼提交到Master上運(yùn)行,
例如wordcount的action 算子 collect方法? def collect(): Array[T] = {
? ? val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
? ? Array.concat(results: _*)
? }
sc是SparkContext對象,上面 runJob 如下
? def runJob[T, U: ClassTag](
? ? ? rdd: RDD[T],
? ? ? func: (TaskContext, Iterator[T]) => U,
? ? ? partitions: Seq[Int],
? ? ? allowLocal: Boolean,
? ? ? resultHandler: (Int, U) => Unit) {
........................
? ? }
? ? //該方法調(diào)用多次重載的方法后,最終會調(diào)用dagScheduler的runJob,形成和切分stage
def runJob[T, U: ClassTag](
? ? ? rdd: RDD[T],
? ? ? func: (TaskContext, Iterator[T]) => U,
? ? ? partitions: Seq[Int],
? ? ? allowLocal: Boolean,
? ? ? resultHandler: (Int, U) => Unit) {
? 。。。。。。。
? ? //dagScheduler出現(xiàn)了,可以切分stage
? ? dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,
? ? ? resultHandler, localProperties.get)
? ? progressBar.foreach(_.finishAll())
? ? rdd.doCheckpoint()
? }
dagScheduler的runJob 是我們比較關(guān)心的
def runJob[T, U: ClassTag](
? ? 。。。。。
? ? val waiter = submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)
? }
這里面的我們主要看的是submitJob(rdd, func, partitions, callSite, allowLocal, resultHandler, properties)提交任務(wù),括號里面的是任務(wù)信息
def submitJob[T, U](。。。): JobWaiter[U] = {
? ? //在這兒才封裝任務(wù)提交事件,把該事件對象加入到任務(wù)隊(duì)列里面? ? eventProcessLoop.post(JobSubmitted(
? ? ? jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties))
? }
JobSubmitted:// 封裝job事件對象,放入DAGScheduler阻塞的事件隊(duì)列,例如:任務(wù)id,數(shù)據(jù)RDD,fun,jobId(可見一個action就是一個job)
從隊(duì)列中取出事件對象,調(diào)用 onReceive方法,即調(diào)用子類 DAGSchedulerEventProcessLoop 的onReceive方法,該方法的匹配模式如下:
(1)先生成finalStage。
? ? case JobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties) =>
? ? ? //調(diào)用dagScheduler來出來提交任務(wù)
? ? ? dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, allowLocal, callSite, listener, properties)
調(diào)用了handleJobSubmitted方法,接下來查看該方法
private[scheduler] def handleJobSubmitted(jobId: Int,
? ? ? finalRDD: RDD[_],
? ? ? func: (TaskContext, Iterator[_]) => _,
? ? ? partitions: Array[Int],
? ? ? allowLocal: Boolean,
? ? ? callSite: CallSite,? ? ? listener: JobListener,
? ? ? properties: Properties) {
? ? ? ? var finalStage: Stage = null
? ? ? ? //最終的stage,從后往前劃分
? ? ? ? finalStage = newResultStage(finalRDD, partitions.size, None, jobId, callSite)
? ? ? ? 。。。。
? ? ? ? submitStage(finalStage)
? //?提交其他正在等待的stage??
?? submitWaitingStages()?}
}
/**??
???*?創(chuàng)建一個 ResultStage?,形成有向無環(huán)圖
???*/??
??private?def?newResultStage(??
rdd:?RDD[_],
??????func:?(TaskContext,?Iterator[_])?=>?_,??
??????partitions:?Array[Int],??
??????jobId:?Int,??
??????callSite:?CallSite):?ResultStage?=?{??
?//下面這個函數(shù)會生成我們的DAG,需重點(diǎn)關(guān)注??
????val?(parentStages:?List[Stage],?id:?Int)?=?getParentStagesAndId(rdd,?jobId)??
val?stage?=?new?ResultStage(id,rdd,?func,?partitions,parentStages,?jobId,?callSite)
????stageIdToStage(id)?=?stage?//將Stage的id放入stageIdToStage結(jié)構(gòu)中。??
????updateJobIdStageIdMaps(jobId,?stage)?//更新JobIdStageIdMaps??
????stage??
?}
??}?
上面的代碼中,調(diào)用了newResultStage方法,該方法是劃分任務(wù)的核心方法,任務(wù)劃分是根據(jù)最后一個依賴關(guān)系作為開始,通過遞歸,將每個寬依賴做為切分Stage的依據(jù),切分Stage的過程是流程中的一環(huán)(詳見 day29_spark-源碼-Stage劃分算法,并最終得到了DAG圖中的Result Stage(final Stage)),但在這里不詳細(xì)闡述,當(dāng)任務(wù)切分完畢后,代碼繼續(xù)執(zhí)行來到submitStage(finalStage)這里開始進(jìn)行任務(wù)提交
(2)提交resultStage
//提交Stage,如果有未提交的ParentStage,則會遞歸提交這些ParentStage,只有所有ParentStage都計(jì)算完了,才能提交當(dāng)前Stage?
? private def submitStage(stage: Stage) { // 此stage是 result stage
? //?根據(jù)stage獲取jobId??
? ? val jobId = activeJobForStage(stage)? //查找該Stage的所有激活的job
? if (jobId.isDefined) {? //?jobId 存在就執(zhí)行,如果不存在就停止??
//?記錄Debug日志信息:submitStage(stage)??
? ? ? logDebug("submitStage(" + stage + ")")?
? ? //如果當(dāng)前Stage沒有在等待parent Stage的返回,也不是正在運(yùn)行的Stage,并且也沒有提 示提交失敗,說明未處理,那么我們就嘗試提交Stage?
? ? ? if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {?
? ? ? ? //得到還未執(zhí)行的父 stage
val missing = getMissingParentStages(stage).sortBy(_.id)?
? ? ? ? logDebug("missing: " + missing)?
? ? ? ? if (missing.isEmpty) {? ? //如果沒有父 Stage?
? ? ? ? //當(dāng)前stage 拆分成task,形成taskSet 并提交
? ? ? ? ? submitMissingTasks(stage, jobId.get) // 注意這個stage會是兩種類型 1、shufflerMapStage 2、resultStage
? ? ? ? ? ? } else {?
? ? ? ? ? //有父Stage沒進(jìn)行計(jì)算,就遞歸提交這些父Stage?
? ? ? ? ? for (parent <- missing) {? // 該stage的所有父stage
? ? ? ? ? ? submitStage(parent)// 遞歸調(diào)用本身?
? ? }?
? ? ? ? ? ? waitingStages += stage?
? ? ? ? }?
? ? ? }?
? ? } else {//無效作業(yè),停止它。?
? ? ? abortStage(stage, "No active job for stage " + stage.id, None)?
? ? }?
}
********************getMissingParentStages方法如下****************
針對 stage的執(zhí)行要記住2個判斷點(diǎn) 1、getmissingParentStages()方法為核心方法。這里我們要懂得這樣一個邏輯:我們都知道,Stage是通過shuffle劃分的,所以,每一Stage都是以shuffle開始的,若一個RDD是寬依賴,則必然說明該RDD的父RDD在另一個Stage中,若一個RDD是窄依賴,則該RDD所依賴的父RDD還在同一個Stage中,我們可以根據(jù)這個邏輯,找到該Stage的父Stage。
// DAGScheduler.scala
private def getMissingParentStages(stage: Stage): List[Stage] = {
? ? val missing = new HashSet[Stage] //用于存放父Stage
? ? val visited = new HashSet[RDD[_]] //用于存放已訪問過的RDD
? ? val waitingForVisit = new Stack[RDD[_]]
? ? def visit(rdd: RDD[_]) {
? ? ? if (!visited(rdd)) { //如果RDD沒有被訪問過,則進(jìn)行訪問
? ? ? ? visited += rdd //添加到已訪問RDD的HashSet中
? ? ? ? val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
? ? ? ? if (rddHasUncachedPartitions) {
? ? ? ? ? for (dep <- rdd.dependencies) { //獲取該RDD的依賴
? ? ? ? ? ? dep match {
? ? ? ? ? ? ? case shufDep: ShuffleDependency[_, _, _] =>//若為寬依賴,則該RDD依賴的RDD所在的stage必為父stage
? ? ? ? ? ? ? ? val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)//生成父Stage
? ? ? ? ? ? ? ? if (!mapStage.isAvailable) {//若父Stage task沒有完全執(zhí)行,則添加到父Stage的HashSET中
? ? ? ? ? ? ? ? ? missing += mapStage // 如果是寬依賴,那么就表示找到了,不存在寬依賴前還有寬依賴
? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? case narrowDep: NarrowDependency[_] =>//若為窄依賴,則需要再判斷,其父有無寬依賴
? ? ? ? ? ? ? ? waitingForVisit.push(narrowDep.rdd)
? ? ? ? ? ? }
? ? ? ? ? }
? ? ? ? }
? ? ? }
? ? }
? ? waitingForVisit.push(stage.rdd)
? ? while (waitingForVisit.nonEmpty) {//循環(huán)遍歷所有RDD
? ? ? visit(waitingForVisit.pop())
? ? }
? ? missing.toList
}
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
針對 stage的執(zhí)行要記住2個判斷點(diǎn) 2、每當(dāng)執(zhí)行完一個Task會對變量_numAvailableOutputs加1,直至所有Task執(zhí)行完,_numAvailableOutputs等于分區(qū)數(shù)。
(3)提交MissingTask
stage根據(jù) parition 拆分成task(決定每個Task的最佳位置)生成TaskSet,并提交到TaskScheduler
private def submitMissingTasks(stage: Stage, jobId: Int) {
? //首先根據(jù)stage所依賴的RDD的partition的分布,會產(chǎn)生出與partition數(shù)量相等的task
? var tasks = ArrayBuffer[Task[_]]()
? //對于resultStage或是shufflerMapStage會產(chǎn)生不同的task。
? //檢查該stage時是否ShuffleMapStage,如果是則生成ShuffleMapTask
? if (stage.isShuffleMapStage) { //生成ShuffleMapStage
? ? for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
? ? ? //task根據(jù)partition的locality進(jìn)行分布
? ? ? val locs = getPreferredLocs(stage.rdd, p)
? ? ? tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
? ? }
? } else { //resultStage:該類型stage直接輸出結(jié)果生成ResultTask
? val job = resultStageToJob(stage)
? ? for (id <- 0 until job.numPartitions if !job.finished(id)) {
? ? ? val partition = job.partitions(id)
? ? ? val locs = getPreferredLocs(stage.rdd, partition)
? ? ? //由于是ResultTask,因此需要傳入定義的func,也就是處理結(jié)果返回
? ? ? tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
? ? }
? }
? //向TaskSchuduler提交任務(wù),以stage為單位,一個stage對應(yīng)一個TaskSet
? taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
}
Task任務(wù)調(diào)度
taskScheduler.submitTasks方法比較重要,主要將任務(wù)加入調(diào)度池(taskschduler 創(chuàng)建時初始一個調(diào)度池),最后調(diào)用了CoarseGrainedSchedulerBackend.reviveOffers()
override def submitTasks(taskSet: TaskSet) {
? ? val tasks = taskSet.tasks
? ? this.synchronized {
? ? ? ? //將TaskSet 封裝成TaskSetManger? ? ? val manager = createTaskSetManager(taskSet, maxTaskFailures)
? ? ? activeTaskSets(taskSet.id) = manager
? ? //用 schedulableBuilder去添加TaskSetManager到隊(duì)列中
//schedulableBuilder有兩種形態(tài):FIFOSchedulableBuilder: 單一pool ,F(xiàn)airSchedulableBuilder: ? 多個pool? ? ? schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
。。。。。。。。。。。
? ? //在TaskSchedulerImpl在submitTasks添加TaskSetManager到pool后,調(diào)用了? ? backend.reviveOffers()? ? ?
//fifo 直接將可調(diào)度對象TaskSetManager加入SchedulerQueue的尾端。?
override def addSchedulable(schedulable: Schedulable) {?
? ? require(schedulable != null)?
? ? schedulableQueue.add(schedulable)?
? ? schedulableNameToSchedulable.put(schedulable.name, schedulable)?
? ? schedulable.parent = this?
? }
? override def reviveOffers() {
? ? //自己給自己發(fā)消息(告訴它我要提交task)
? ? driverActor ! ReviveOffers? }
這里用了內(nèi)部的DriverActor對象發(fā)送了一個內(nèi)部消息給自己,接下來查看receiver方法接受的消息
收到消息后調(diào)用了makeOffers()方法
? ? ? case ReviveOffers =>
? ? ? ? makeOffers()
? def makeOffers() {
? ? ? launchTasks(scheduler.resourceOffers(executorDataMap.map {
? case (id, executorData) =>
? ? ? ? new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
? ? ? }.toSeq))
? ? }
makeOffers方法中,將Executor的信息集合與調(diào)度池中的Tasks封裝成WokerOffers,調(diào)用
launchTasks
? ? def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
? ? ? for (task <- tasks.flatten) {
? ? ? ? 。。。。。。
? ? ? ? //把task序列化
? ? ? ? val serializedTask = ser.serialize(task)
? ? ? ? ? ? 。。。。。
//向executor進(jìn)程 發(fā)送創(chuàng)建TaskRunner(extends Runnable)
? ? ? ? ? val executorData = executorDataMap(task.executorId)(這是之前注冊過了的)
? ? ? ? ? executorData.freeCores -= scheduler.CPUS_PER_TASK
? ? ? ? ? //把序列化好的task發(fā)送給Executor
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))? ? ? ? ?
? ? ? ? }
? ? ? }
? ? }
會由CoarseGrainedSchedulerBackend來接受執(zhí)行指令,內(nèi)部封裝DriverActor
launchTasks方法將遍歷Tasks集合,每個Task任務(wù)序列化,發(fā)送啟動Task執(zhí)行消息的給Executor