The parent class EventLoop starts a thread that takes events from a LinkedBlockingDeque and hands each one to onReceive. In DAGSchedulerEventProcessLoop, onReceive delegates to doOnReceive, which pattern-matches on the concrete event type and dispatches it to the appropriate handler.
/**
 * Event-processing loop of the DAG scheduler: events posted by other threads
 * are dequeued by the parent [[EventLoop]]'s thread and dispatched here, one
 * at a time, to the matching `DAGScheduler` handler.
 */
private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
  extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {

  // Metrics timer recording how long each event takes to process.
  private[this] val timer = dagScheduler.metricsSource.messageProcessingTimer

  /**
   * The main event loop of the DAG scheduler. Times every event, including
   * ones whose handler throws (the timer context is stopped in `finally`).
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val ctx = timer.time()
    try doOnReceive(event)
    finally ctx.stop()
  }

  /** Dispatches a single event to the corresponding DAGScheduler handler. */
  private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // A new action job was submitted.
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    // A map stage was submitted on its own (adaptive execution / shuffle reuse).
    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
    // A single stage was cancelled.
    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)
    // A single job was cancelled.
    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)
    // Every job in a job group was cancelled.
    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)
    // All active jobs were cancelled.
    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()
    // A new executor registered on a host.
    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)
    // An executor was lost; a SlaveLost with its flag set means the worker
    // itself died, so shuffle files on that host are gone as well.
    case ExecutorLost(execId, reason) =>
      val filesAlsoLost = reason match {
        case SlaveLost(_, workerLost) => workerLost
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesAlsoLost)
    // A task began running.
    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)
    // A task is fetching its (indirect) result.
    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)
    // A task finished (successfully or not).
    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)
    // A whole task set failed.
    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
    // Re-submit stages that failed, typically after a fetch failure.
    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

  /**
   * Called when event processing throws. The scheduler cannot make progress
   * anymore, so cancel everything and stop the SparkContext (in a separate
   * thread, since we are on the event-loop thread being torn down).
   */
  override def onError(e: Throwable): Unit = {
    logError("DAGSchedulerEventProcessLoop failed; shutting down SparkContext", e)
    try {
      dagScheduler.doCancelAllJobs()
    } catch {
      case cancelFailure: Throwable =>
        logError("DAGScheduler failed to cancel all jobs.", cancelFailure)
    }
    dagScheduler.sc.stopInNewThread()
  }

  /** Cancels any active jobs when the loop is stopped (post-stop hook). */
  override def onStop(): Unit = {
    dagScheduler.cleanUpAfterSchedulerStop()
  }
}