TaskSetManager conflict causes the SparkContext to shut down unexpectedly

### Background

While I was leisurely writing code one day, colleagues on the business side reported a flood of alert SMS messages from Spark Streaming jobs running in production. The application's web UI showed that the Spark application had already exited. The first priority was to restart the production job; tracking down the root cause came afterwards. The code discussed in this article is based on Spark 1.6.1.

### Problem diagnosis

Logging in to the production machine and inspecting the error log, I found the application repeatedly reporting `Cannot call methods on a stopped SparkContext.`. The full log entry is:

```
[ERROR][JobScheduler][2017-03-08+15:56:00.067][org.apache.spark.streaming.scheduler.JobScheduler]Error running job streaming job 1488959760000 ms.0
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext.
This stopped SparkContext was created at:

org.apache.spark.SparkContext.<init>(SparkContext.scala:82)
org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:874)
org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81)
com.xxxx.xxxx.MainApp$.createStreamingContext(MainApp.scala:46)
com.xxxx.xxxx.MainApp$$anonfun$15.apply(MainApp.scala:126)
com.xxxx.xxxx.MainApp$$anonfun$15.apply(MainApp.scala:126)
scala.Option.getOrElse(Option.scala:120)
org.apache.spark.streaming.StreamingContext$.getOrCreate(StreamingContext.scala:864)
com.xxxx.xxxx.MainApp$.main(MainApp.scala:125)
com.xxxx.xxxx.MainApp.main(MainApp.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
```

At this point the immediate cause is clear: the SparkContext had been stopped. Next we need to find out what stopped it, starting from the shutdown log. The SparkContext code shows that after a shutdown completes, it logs a confirmation message:

logInfo("Successfully stopped SparkContext")

Locating that message in our logs with grep gives:

```
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:30.841][org.apache.spark.SparkContext]Successfully stopped SparkContext
```

So the `dag-scheduler-event-loop` thread is what stopped the SparkContext. That thread's log contains the following exception:

```
java.lang.IllegalStateException: more than one active taskSet for stage 4571114: 4571114.2,4571114.1
        at org.apache.spark.scheduler.TaskSchedulerImpl.submitTasks(TaskSchedulerImpl.scala:173)
        at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1052)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:921)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:1214)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1637)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1599)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1588)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
```

This means a single stage had two active TaskSetManagers at the same time. The check lives in `TaskSchedulerImpl.submitTasks`:

```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    // A conflict exists if another attempt of this stage is still active (not a zombie)
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // ...
  }
}
```
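To make the check concrete, here is a minimal, self-contained sketch that reproduces the exception message from our logs. `FakeTaskSet`, `FakeManager`, and `ConflictDemo` are hypothetical stand-ins, not Spark's actual classes: submitting a second attempt for a stage whose first attempt is not yet a zombie trips the same condition.

```scala
import scala.collection.mutable.HashMap

// Hypothetical, simplified stand-ins for Spark's TaskSet / TaskSetManager
case class FakeTaskSet(stageId: Int, stageAttemptId: Int) {
  val id = s"$stageId.$stageAttemptId"
}
class FakeManager(val taskSet: FakeTaskSet, var isZombie: Boolean = false)

object ConflictDemo extends App {
  val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, FakeManager]]

  def submitTasks(taskSet: FakeTaskSet): Unit = {
    val stageTaskSets = taskSetsByStageIdAndAttempt
      .getOrElseUpdate(taskSet.stageId, new HashMap[Int, FakeManager])
    stageTaskSets(taskSet.stageAttemptId) = new FakeManager(taskSet)
    // Same condition as Spark's check: another live (non-zombie) attempt conflicts
    val conflicting = stageTaskSets.exists { case (_, m) =>
      m.taskSet != taskSet && !m.isZombie
    }
    if (conflicting) {
      throw new IllegalStateException(
        s"more than one active taskSet for stage ${taskSet.stageId}:" +
        s" ${stageTaskSets.values.map(_.taskSet.id).mkString(",")}")
    }
  }

  submitTasks(FakeTaskSet(4571114, 1))
  // Attempt 1 was never marked zombie, so submitting attempt 2 throws,
  // just like the exception in the dag-scheduler-event-loop log above.
  submitTasks(FakeTaskSet(4571114, 2))
}
```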

Seeing this was surprising: how could there be two? Scanning the earlier log entries shows that stage 4571114 had been resubmitted:

```
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.547][org.apache.spark.scheduler.DAGScheduler]Resubmitting ShuffleMapStage 4571114 (map at MainApp.scala:73) because some of its tasks had failed: 0
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.547][org.apache.spark.scheduler.DAGScheduler]Submitting ShuffleMapStage 4571114 (MapPartitionsRDD[3719544] at map at MainApp.scala:73), which has no missing parents
```

The stage-resubmission logic lives in `DAGScheduler.handleTaskCompletion`; the relevant excerpt follows:

```scala
case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  updateAccumulators(event)
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }

  if (runningStages.contains(shuffleStage) && shuffleStage.pendingPartitions.isEmpty) {
    markStageAsFinished(shuffleStage)
    logInfo("looking for newly runnable stages")
    logInfo("running: " + runningStages)
    logInfo("waiting: " + waitingStages)
    logInfo("failed: " + failedStages)

    // We supply true to increment the epoch number here in case this is a
    // recomputation of the map outputs. In that case, some nodes may have cached
    // locations with holes (from when we detected the error) and will need the
    // epoch incremented to refetch them.
    // TODO: Only increment the epoch number if this is not the first time
    //       we registered these map outputs.
    mapOutputTracker.registerMapOutputs(
      shuffleStage.shuffleDep.shuffleId,
      shuffleStage.outputLocInMapOutputTrackerFormat(),
      changeEpoch = true)

    clearCacheLocs()

    if (!shuffleStage.isAvailable) {
      // Some tasks had failed; let's resubmit this shuffleStage
      // TODO: Lower-level scheduler should also deal with this
      logInfo("Resubmitting " + shuffleStage + " (" + shuffleStage.name +
        ") because some of its tasks had failed: " +
        shuffleStage.findMissingPartitions().mkString(", "))
      submitStage(shuffleStage)
    } else {
      // Mark any map-stage jobs waiting on this stage as finished
      if (shuffleStage.mapStageJobs.nonEmpty) {
        val stats = mapOutputTracker.getStatistics(shuffleStage.shuffleDep)
        for (job <- shuffleStage.mapStageJobs) {
          markMapStageJobAsFinished(job, stats)
        }
      }
    }
  }
```

A resubmit is triggered only when `shuffleStage.pendingPartitions` is empty while `shuffleStage.isAvailable` is still false, so let's see when each piece of state is updated. `pendingPartitions` tracks the partitions currently being processed; a partition is removed from it when its task finishes:

```scala
val stage = stageIdToStage(task.stageId)
event.reason match {
  case Success =>
    listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
      event.reason, event.taskInfo, event.taskMetrics))
    // Remove the partition from the set of partitions still being processed
    stage.pendingPartitions -= task.partitionId
```

`isAvailable` checks whether the number of partitions whose shuffle output locations have been reported to the driver equals the total number of partitions:

```scala
def isAvailable: Boolean = _numAvailableOutputs == numPartitions
```
That count is also updated when a ShuffleMapTask finishes, but note that the output location is registered only if the executor holding the shuffle data is still alive. If the executor that ran the shuffle task has died, its on-disk shuffle data cannot be fetched from it anyway:

```scala
case smt: ShuffleMapTask =>
  val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
  updateAccumulators(event)
  val status = event.result.asInstanceOf[MapStatus]
  val execId = status.location.executorId
  logDebug("ShuffleMapTask finished on " + execId)
  if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
    // Completions from executors already marked failed are ignored,
    // so no output location is registered for this partition
    logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
  } else {
    shuffleStage.addOutputLoc(smt.partitionId, status)
  }
```
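Putting the two pieces of state together, a minimal sketch (hypothetical names such as `ResubmitConditionDemo` and `handleShuffleMapSuccess`, not Spark code) shows why an ignored "bogus" completion leads straight to a resubmit: the partition leaves `pendingPartitions`, but its output location is never registered, so the stage looks finished yet unavailable.

```scala
import scala.collection.mutable

object ResubmitConditionDemo extends App {
  val numPartitions = 1
  val pendingPartitions = mutable.Set(0)              // partitions still running
  val outputLocs = mutable.Map.empty[Int, String]     // partition -> executor with its output
  val failedEpoch = Map("executor-4" -> 10L)          // executor 4 already marked failed

  def isAvailable: Boolean = outputLocs.size == numPartitions

  def handleShuffleMapSuccess(partition: Int, execId: String, taskEpoch: Long): Unit = {
    pendingPartitions -= partition                    // removed on every Success
    if (failedEpoch.get(execId).exists(taskEpoch <= _)) {
      println(s"Ignoring possibly bogus completion from executor $execId")
    } else {
      outputLocs(partition) = execId                  // only if the executor is still alive
    }
  }

  // The task finished, but its executor had already been marked failed
  handleShuffleMapSuccess(partition = 0, execId = "executor-4", taskEpoch = 5L)

  // pendingPartitions is empty but the stage is not available -> resubmit
  if (pendingPartitions.isEmpty && !isAvailable) {
    println("Resubmitting stage because some of its tasks had failed: " +
      (0 until numPartitions).filterNot(outputLocs.contains).mkString(", "))
  }
}
```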

That branch is the only thing that can cause the rescheduling, so let's search the logs for its message:

```
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:27.427][org.apache.spark.scheduler.DAGScheduler]Ignoring possibly bogus ShuffleMapTask(4571114, 0) completion from executor 4
```

But even if the executor that had just finished the shuffle task died and triggered a stage resubmit, that by itself should not produce a TaskSetManager conflict: by then `taskSet.isZombie` should already be true, because `TaskSetManager.handleSuccessfulTask` runs before `DAGScheduler.handleTaskCompletion`. Recall the conflict condition:

```scala
ts.taskSet != taskSet && !ts.isZombie
```

`TaskSetManager.handleSuccessfulTask` looks like this:

```scala
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  // Ultimately posts a CompletionEvent to the DAGScheduler's event queue for processing
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
}
```

Some readers may already see the problem. To spell it out, here is a sequence diagram of a successful task run:

![task執(zhí)行成功時(shí)序圖](http://upload-images.jianshu.io/upload_images/4906916-cba16d0fd30b808a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
Combining the sequence diagram with the code, we can see that `DAGScheduler.handleTaskCompletion` executed before `isZombie` was set to true in `TaskSetManager.handleSuccessfulTask`. Since `handleSuccessfulTask` runs on a `task-result-getter` thread, `isZombie` was still false when the DAGScheduler triggered the stage resubmission, which ultimately produced the TaskSetManager conflict.
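The race can be illustrated with a small sketch (plain threads and a queue, not Spark's actual EventLoop; `ZombieRaceSketch` is a hypothetical name): if the completion event is posted before the `isZombie` flag is flipped, the consumer thread can act on the event while the flag still reads false, which is exactly the window the DAGScheduler hit here.

```scala
import java.util.concurrent.LinkedBlockingQueue

object ZombieRaceSketch extends App {
  @volatile var isZombie = false
  val events = new LinkedBlockingQueue[String]()

  // Plays the role of dag-scheduler-event-loop
  val eventLoop = new Thread(() => {
    events.take()                  // CompletionEvent arrives
    if (!isZombie)                 // the old attempt still looks active...
      println("resubmitting -> second non-zombie TaskSetManager -> conflict")
  })
  eventLoop.start()

  // Plays the role of task-result-getter, in the buggy order:
  events.put("CompletionEvent")    // taskEnded(...) posted first
  Thread.sleep(100)                // give the event loop time to run
  isZombie = true                  // flag flipped too late
  eventLoop.join()
}
```

With the fixed ordering (flip the flag first, then post the event), the consumer can never observe the event while `isZombie` is still false.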
The following log lines give the time of the resubmit and the completion time of `handleSuccessfulTask`. Because setting `isZombie` to true is not logged immediately, this is only indirect evidence, but it does show that the resubmit happened before `handleSuccessfulTask` finished.

Completion time of `handleSuccessfulTask`:

```
[INFO][task-result-getter-2][2017-03-03+22:16:29.999][org.apache.spark.scheduler.TaskSchedulerImpl]Removed TaskSet 4571114.1, whose tasks have all completed, from pool
```

Time the resubmitted task set for the stage was added:

```
[INFO][dag-scheduler-event-loop][2017-03-03+22:16:29.549][org.apache.spark.scheduler.TaskSchedulerImpl]Adding task set 4571114.2 with 1 tasks
```

Timeline of the events:

![事件時(shí)間](http://upload-images.jianshu.io/upload_images/4906916-e8dba924eaa8a86a.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

### The fix
The fix is straightforward: change `TaskSetManager.handleSuccessfulTask` so that the CompletionEvent is sent only after `isZombie` has been updated:

```scala
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
      info.id, taskSet.id, info.taskId, info.duration, info.host, tasksSuccessful, numTasks))
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  failedExecutors.remove(index)
  maybeFinishTaskSet()
  // The CompletionEvent is posted only after isZombie has been updated, so a
  // resubmit triggered by this event can no longer see a non-zombie old attempt.
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
}
```

This issue has been reported to the Spark community; see the [spark patch](https://github.com/apache/spark/pull/17208) for details.