Spark executor中task的數量與最大并發(fā)數

關于executor和task的概念可以參考官方文檔
本文使用的源碼是spark 2.0.0版本

Task的數量

根據類DAGScheduler中的submitMissingTasks方法可以知道,在stage中會為每個需要計算的partition生成一個task,換句話說也就是每個task處理一個partition。

//From submitMissingTasks
......   
val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.latestInfo.taskMetrics, properties)
          }

        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, properties, stage.latestInfo.taskMetrics)
          }
      }
    }
......

Task的最大并發(fā)數

當task被提交到executor之后,會根據executor可用的cpu核數,決定一個executor中最多同時運行多少個task。在類TaskSchedulerImplresourceOfferSingleTaskSet方法中,CPUS_PER_TASK的定義為val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1),也就是說默認情況下一個task對應cpu的一個核。如果一個executor可用cpu核數為8,那么一個executor中最多同是并發(fā)執(zhí)行8個task;假如設置spark.task.cpus為2,那么同時就只能運行4個task。

//From resourceOfferSingleTaskSet
......
      if (availableCpus(i) >= CPUS_PER_TASK) {
        try {
          for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
            tasks(i) += task
            val tid = task.taskId
            taskIdToTaskSetManager(tid) = taskSet
            taskIdToExecutorId(tid) = execId
            executorIdToTaskCount(execId) += 1
            executorsByHost(host) += execId
            availableCpus(i) -= CPUS_PER_TASK
            assert(availableCpus(i) >= 0)
            launchedTask = true
          }
        } catch {
          case e: TaskNotSerializableException =>
            logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
            // Do not offer resources for this task, but don't throw an error to allow other
            // task sets to be submitted.
            return launchedTask
        }
      }
......

Yarn的task與Spark中task的區(qū)別

在Yarn的NodeManager節(jié)點上啟動一個map task或者reduce task,在物理上啟動的是一個jvm進程;而Spark的task是Executor進程中的一個線程

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容