Spark Streaming (2): JobScheduler and JobGenerator

This article is based on Spark 2.x (Scala 2.11 build).

1. Introduction

Spark Streaming (1) mentioned that JobScheduler uses JobGenerator to periodically build an RDD DAG from the DStream DAG and submit jobs. This article looks at JobScheduler in detail.

2. JobScheduler

JobScheduler is started when StreamingContext#start is called; the startup sequence is:

StreamingContext#start
    -> JobScheduler#start
          -> ReceiverTracker#start
          -> JobGenerator#start

JobScheduler has the following members:

  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  private val jobGenerator = new JobGenerator(this)

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null
  1. jobSets
    Maps a job-generation time to the jobs of that batch. JobGenerator calls DStreamGraph to generate a job for each output DStream of the DAG it holds; JobGenerator then hands the time and the generated jobs back to JobScheduler, which stores them in jobSets. JobGenerator does not submit the jobs itself; submission is done by JobScheduler.
  2. numConcurrentJobs
    Controls how many jobs can run at the same time.
  3. jobExecutor
    A thread pool whose thread count is numConcurrentJobs; jobs are submitted inside jobExecutor and their results awaited there. Because waiting on the result is a blocking operation, each thread can only be running one job at a time.
  4. jobGenerator
    The JobGenerator to which JobScheduler delegates job generation.
  5. receiverTracker
    Started by JobScheduler; receives the batch/block information reported by Receivers.
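The interplay of these members can be sketched in a standalone snippet. The names `Time`, `JobSet`, and `newDaemonFixedThreadPool` below are illustrative stand-ins, not Spark's real classes; the pool construction mirrors what `ThreadUtils.newDaemonFixedThreadPool` amounts to under the hood:

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, ExecutorService, ThreadFactory, TimeUnit}

object SchedulerMembersSketch {
  // Illustrative stand-ins for Spark's Time and JobSet (not the real classes)
  case class Time(millis: Long)
  case class JobSet(time: Time, jobs: Seq[() => Unit])

  // jobSets: batch time -> the jobs generated for that batch
  val jobSets = new ConcurrentHashMap[Time, JobSet]()

  // A fixed pool of daemon threads, as ThreadUtils.newDaemonFixedThreadPool builds
  def newDaemonFixedThreadPool(n: Int, prefix: String): ExecutorService =
    Executors.newFixedThreadPool(n, new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix)
        t.setDaemon(true)
        t
      }
    })

  def main(args: Array[String]): Unit = {
    val jobExecutor = newDaemonFixedThreadPool(1, "streaming-job-executor")
    val batch = JobSet(Time(1000L), Seq(() => println("job for batch 1000 ran")))
    jobSets.put(batch.time, batch)
    // as in submitJobSet: each job of the batch is handed to the pool for execution
    batch.jobs.foreach(job => jobExecutor.execute(new Runnable { def run(): Unit = job() }))
    jobExecutor.shutdown()
    jobExecutor.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```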

3. JobGenerator: generating jobs

As noted above, JobScheduler delegates job generation to JobGenerator.
Below are JobGenerator's core members:

  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  private var eventLoop: EventLoop[JobGeneratorEvent] = null

  // last batch whose completion, checkpointing and metadata cleanup has been completed
  private var lastProcessedBatch: Time = null
  1. timer
    A timer with which JobGenerator generates jobs periodically. The interval, batchDuration, is the one passed in when the StreamingContext was created; every batchDuration the timer posts a job-generation message to eventLoop.
  2. eventLoop
    Runs continuously, receiving messages and handling them. The message types it receives include:
  • GenerateJobs: generate jobs via DStreamGraph
  • DoCheckpoint: submit a new job to perform a checkpoint
  • ClearMetadata / ClearCheckpointData: clean up metadata and checkpoint data after a job completes
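The timer-plus-event-loop pattern can be reduced to a self-contained sketch (heavily simplified; Spark's RecurringTimer and EventLoop are more elaborate, and the 20 ms interval here is an arbitrary stand-in for batchDuration):

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object TimerEventLoopSketch {
  sealed trait Event
  case class GenerateJobs(time: Long) extends Event

  val handled = new AtomicInteger(0)

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[Event]()
    val ticks = 3
    val done = new CountDownLatch(ticks)

    // event loop: one thread drains the queue and dispatches on message type
    val loop = new Thread(new Runnable {
      def run(): Unit = while (done.getCount > 0) {
        queue.poll(1, TimeUnit.SECONDS) match {
          case GenerateJobs(_) => handled.incrementAndGet(); done.countDown()
          case null            => // poll timed out, keep waiting
        }
      }
    })
    loop.setDaemon(true)
    loop.start()

    // "timer": post a GenerateJobs message every batch interval (here 20 ms)
    for (i <- 1 to ticks) { queue.put(GenerateJobs(i * 20L)); Thread.sleep(20) }
    done.await(5, TimeUnit.SECONDS)
  }
}
```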

Generating jobs
Every batchDuration the timer posts a GenerateJobs event to eventLoop; handling GenerateJobs in eventLoop's main event loop leads to the following calls:

JobGenerator.eventLoop#onReceive
   -> JobGenerator#processEvent
         -> JobGenerator#generateJobs

Below is JobGenerator's generateJobs:

 private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        // hand the generated jobs back to JobScheduler to await scheduling
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
  1. receiverTracker.allocateBlocksToBatch(time): allocates the blocks reported so far to the batch identified by time; when RDDs are later generated from the DStreams, the blocks for this batch are looked up by that same time.
  2. graph.generateJobs: generates the jobs.
  3. jobScheduler.submitJobSet: hands the jobs back to JobScheduler to await task scheduling.
  4. eventLoop.post: posts DoCheckpoint to create a job that performs the checkpoint.
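The Success/Failure branching above is plain scala.util.Try; a small self-contained example of the same pattern (the strings and the `fail` flag are illustrative, not Spark code):

```scala
import scala.util.{Failure, Success, Try}

object GenerateJobsPatternSketch {
  // Mirrors generateJobs: wrap the fallible steps in Try, then branch on the result
  def generate(fail: Boolean): String =
    Try {
      if (fail) throw new RuntimeException("block allocation failed")
      Seq("job-1", "job-2") // stand-in for graph.generateJobs(time)
    } match {
      case Success(jobs) => s"submitted ${jobs.size} jobs" // -> jobScheduler.submitJobSet
      case Failure(e)    => s"error: ${e.getMessage}"      // -> jobScheduler.reportError
    }
}
```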

Step 2 above generates the jobs through the following call sequence:

DStreamGraph#generateJobs
    ->DStream#generateJob

// DStream#generateJob
private[streaming] def generateJob(time: Time): Option[Job] = {
    // materialize this DStream into an RDD for the given batch time
    getOrCompute(time) match {
      case Some(rdd) =>
        // a function is created here that submits a job over the current RDD;
        // it executes when JobScheduler schedules the job on the jobExecutor pool
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

A Job is created from time and the jobFunc function; jobFunc executes when the job is scheduled.
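The deferral is easy to see in isolation: constructing the job does nothing, and the closure only runs when the scheduler invokes it (`SimpleJob` here is an illustrative stand-in, not Spark's Job class):

```scala
object DeferredJobSketch {
  // Illustrative stand-in for Spark's Job: a batch time plus a deferred function
  class SimpleJob(val time: Long, func: () => Unit) {
    def run(): Unit = func()
  }

  var runs = 0

  def main(args: Array[String]): Unit = {
    // constructing the job does NOT execute jobFunc
    val job = new SimpleJob(1000L, () => runs += 1)
    assert(runs == 0)
    // only when the scheduler later invokes run() does jobFunc execute
    job.run()
    assert(runs == 1)
  }
}
```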

4. JobScheduler: scheduling jobs

Section 3 showed that JobGenerator generates jobs and hands them back to JobScheduler, and Section 2 noted that JobScheduler schedules jobs on jobExecutor.

Below is JobScheduler's submitJobSet method:

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
      logInfo("No jobs added for time " + jobSet.time)
    } else {
      listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
      logInfo("Added jobs for time " + jobSet.time)
    }
  }

In the code above, jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job))) wraps each job handed over by JobGenerator in a JobHandler, which is then scheduled and executed on the jobExecutor thread pool.

JobHandler implements the Runnable interface so that it can run in the thread pool; its run method is as follows:

 def run() {
      val oldProps = ssc.sparkContext.getLocalProperties
      try {
        ssc.sparkContext.setLocalProperties(SerializationUtils.clone(ssc.savedProperties.get()))
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)
        // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
        // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
        ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sparkContext.setLocalProperties(oldProps)
      }
    }

It calls Job#run, and run executes jobFunc, which submits the job.

Controlling job parallelism
JobScheduler's member numConcurrentJobs controls how many streaming jobs can run at the same time; it is read from the spark.streaming.concurrentJobs configuration and defaults to 1. numConcurrentJobs sets the number of threads in the jobExecutor pool and thereby limits how many JobHandlers run concurrently (each JobHandler wraps one job).
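The effect of the pool size can be demonstrated directly: with one thread, blocking jobs run strictly one after another, while a larger pool lets them overlap. This standalone sketch (the 100 ms sleep is an arbitrary stand-in for a blocking job) measures the peak number of jobs running at once:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object ConcurrencySketch {
  // Measure how many blocking "jobs" are inside run() at the same moment
  def maxConcurrent(poolSize: Int, jobs: Int): Int = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val running = new AtomicInteger(0)
    val peak = new AtomicInteger(0)
    (1 to jobs).foreach { _ =>
      pool.execute(new Runnable {
        def run(): Unit = {
          val now = running.incrementAndGet()
          peak.getAndUpdate(p => math.max(p, now)) // record the high-water mark
          Thread.sleep(100) // a blocking job, like waiting on runJob's result
          running.decrementAndGet()
        }
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    peak.get
  }
}
```

With poolSize = 1 (the default spark.streaming.concurrentJobs) the peak is always 1; raising the pool size lets batches overlap, at the cost of out-of-order batch completion.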
