[Spark Streaming] Dynamically generating Jobs and submitting them for execution

Preface

Spark Streaming jobs are generated dynamically by JobGenerator, once every batchDuration. Each batch is submitted as one JobSet rather than as a single job, because one batch may have several output operations.

Overview of the flow:

  • A timer periodically posts a job-generation request to eventLoop
  • receiverTracker allocates blocks to the current batch
  • The Jobs for the current batch are generated
  • The Jobs are wrapped in a JobSet and submitted for execution

Entry point

A timer is created when JobGenerator is initialized:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
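
The hand-off from the timer to the event loop can be modelled without Spark. The following is a minimal sketch, not Spark's RecurringTimer: TimerSketch, firstTick and postTicks are hypothetical names, Time and GenerateJobs are simplified stand-ins, and the alignment of ticks to multiples of the period is an assumption made for the illustration. A manual clock replaces the background thread so the behaviour is deterministic.

```scala
import scala.collection.mutable

object TimerSketch {
  // Simplified stand-ins for Spark's Time and GenerateJobs (not the real classes).
  final case class Time(millis: Long)
  final case class GenerateJobs(time: Time)

  // Assumption: the first tick is the next multiple of `period` strictly after `now`.
  def firstTick(now: Long, period: Long): Long = (now / period + 1) * period

  // Posts one GenerateJobs event per period between `start` and `end` (inclusive),
  // driven by a manual clock instead of a timer thread.
  def postTicks(start: Long, end: Long, period: Long,
                eventQueue: mutable.Queue[GenerateJobs]): Unit = {
    var next = firstTick(start, period)
    while (next <= end) {
      eventQueue.enqueue(GenerateJobs(Time(next)))
      next += period
    }
  }

  def main(args: Array[String]): Unit = {
    val queue = mutable.Queue.empty[GenerateJobs]
    postTicks(start = 1050L, end = 4000L, period = 1000L, eventQueue = queue)
    queue.foreach(println) // one event per batch interval
  }
}
```

In this model the queue plays the role of eventLoop: the timer only enqueues events, and whoever drains the queue decides what a GenerateJobs event means.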

Every batchDuration, the timer posts a GenerateJobs(new Time(longTime)) message to eventLoop. The event handler of eventLoop then calls generateJobs(time):

      case GenerateJobs(time) => generateJobs(time)

private def generateJobs(time: Time) {
    // Checkpoint all RDDs marked for checkpointing to ensure their lineages are
    // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
    ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true")
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }
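
The control flow of generateJobs is easier to see in isolation: the two steps that may throw are wrapped in Try, success leads to a submission, failure leads to an error report, and the checkpoint event is posted on both paths. The sketch below assumes nothing about Spark; GenerateJobsSketch, generate and the string-based log are hypothetical and only record what would have been requested.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  // Hypothetical record of the actions the scheduler was asked to perform.
  val log: ArrayBuffer[String] = ArrayBuffer.empty[String]

  // `buildJobs` stands in for block allocation plus graph.generateJobs(time).
  def generate(time: Long, buildJobs: Long => Seq[String]): Unit = {
    Try {
      buildJobs(time) // may throw
    } match {
      case Success(jobs) => log += s"submit($time, ${jobs.size} jobs)"
      case Failure(e)    => log += s"error($time, ${e.getMessage})"
    }
    // Runs after either branch, mirroring the DoCheckpoint post in the original method.
    log += s"checkpoint($time)"
  }
}
```

Calling generate with a function that throws still records a checkpoint entry for that time, which matches the original method posting DoCheckpoint outside the match.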

Allocating blocks to the current batchTime

First, receiverTracker.allocateBlocksToBatch(time) is called to allocate blocks to the current batchTime. The call is ultimately delegated to the allocateBlocksToBatch method of receivedBlockTracker, the component that manages blocks on behalf of receiverTracker:

def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
    if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
      val streamIdToBlocks = streamIds.map { streamId =>
          (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
      }.toMap
      val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
      if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
        timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
        lastAllocatedBatchTime = batchTime
      } else {
        logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
      }
    } else {
      logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
    }
  }

private def getReceivedBlockQueue(streamId: Int): ReceivedBlockQueue = {
    streamIdToUnallocatedBlockQueues.getOrElseUpdate(streamId, new ReceivedBlockQueue)
  }

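As the code shows, the unallocated blocks of every streamId are taken from streamIdToUnallocatedBlockQueues. The entries in these queues are the block information that the supervisor reports to receiverTracker after it has stored a block; for details, see the article "ReceiverTracker: data generation and storage".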

Once the unallocated blockInfos of every streamId have been collected, they are put into timeToAllocatedBlocks: Map[Time, AllocatedBlocks], which is read later when the RDD is generated.
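
The allocation step can be modelled with plain collections. This is a minimal sketch under simplifying assumptions: block ids are strings, batch times are Long values, and the write-ahead-log check (writeToLog) is left out entirely. BlockAllocationSketch and addBlock are hypothetical names; the other names are borrowed from the code above only to make the correspondence visible.

```scala
import scala.collection.mutable

object BlockAllocationSketch {
  type StreamId = Int
  type BlockId = String

  // Blocks reported by receivers but not yet assigned to any batch.
  private val unallocated = mutable.HashMap.empty[StreamId, mutable.Queue[BlockId]]
  // Blocks assigned to each batch time, keyed by stream id.
  private val timeToAllocatedBlocks =
    mutable.HashMap.empty[Long, Map[StreamId, Seq[BlockId]]]
  private var lastAllocatedBatchTime: Option[Long] = None

  def addBlock(streamId: StreamId, block: BlockId): Unit = synchronized {
    unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[BlockId]).enqueue(block)
  }

  // Drains every queue into the batch; batches at or before the last one are ignored.
  def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
    if (lastAllocatedBatchTime.forall(batchTime > _)) {
      val drained = unallocated.map { case (id, queue) =>
        id -> queue.dequeueAll(_ => true).toList
      }.toMap
      timeToAllocatedBlocks.put(batchTime, drained)
      lastAllocatedBatchTime = Some(batchTime)
    }
  }

  def getBlocksOfBatch(batchTime: Long): Map[StreamId, Seq[BlockId]] = synchronized {
    timeToAllocatedBlocks.getOrElse(batchTime, Map.empty)
  }
}
```

Because the queues are drained on each allocation, a block belongs to exactly one batch: the first batch allocated after the block was reported.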

Generating Jobs for the current batchTime

The generateJobs method of DStreamGraph is called to generate the jobs for the current batchTime:

 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }
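
The flatMap in generateJobs relies on each output stream returning an Option: streams that produce no job for this time simply drop out of the result. A minimal sketch of that behaviour, with hypothetical OutputStream and Job stand-ins that are not the Spark classes:

```scala
object OutputStreamSketch {
  final case class Job(time: Long, name: String)

  // Hypothetical output stream: yields a job only when it has data for `time`.
  final case class OutputStream(name: String, hasData: Long => Boolean) {
    def generateJob(time: Long): Option[Job] =
      if (hasData(time)) Some(Job(time, name)) else None
  }

  // flatMap over Option keeps the Some values and silently drops the None values.
  def generateJobs(streams: Seq[OutputStream], time: Long): Seq[Job] =
    streams.flatMap(_.generateJob(time))
}
```

The number of jobs in a batch is therefore at most the number of output streams, and may be smaller for a given time.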

Each outputStream corresponds to one job. The method iterates over all outputStreams and generates a job for each of them:

// ForEachDStream
override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

generateJob first obtains the RDD for time and then passes it as an argument to foreachFunc. foreachFunc is supplied through the constructor, so let's look at what it is for the print() output operation:

def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println(s"Time: $time")
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }

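The foreachFunc constructed here is the function that is eventually submitted for execution together with the rdd: it calls take() on the rdd and prints the result. The action is actually triggered inside this function.

Now let's see how the rdd is obtained. Every DStream has a generatedRDDs: Map[Time, RDD[T]] variable that stores the RDD for each time. If no RDD is found for a time, it is computed by the compute() method. For a ReceiverInputDStream, which needs to start a Receiver on an executor to receive data, compute() looks like this: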

 override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

The blocks for this batch are obtained through receiverTracker. As analyzed earlier, the unallocated blocks of every streamId were allocated to the batch and stored in timeToAllocatedBlocks: Map[Time, AllocatedBlocks]; under the hood, the blockInfos here are read from that same timeToAllocatedBlocks. createBlockRDD(validTime, blockInfos) is then called to create the RDD from the blockIds.

Finally, jobFunc is built from this RDD and foreachFunc, and a Job is created and returned.
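
The lookup-then-compute behaviour around generatedRDDs can be sketched as a small cache keyed by batch time. This is an illustration only: CachedStream and computeCalls are hypothetical, the cached value is a generic T rather than an RDD, and checks that a real DStream may perform before computing (for example validating the time) are omitted.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stand-in for a DStream: caches one value per batch time.
  class CachedStream[T](compute: Long => Option[T]) {
    private val generated = mutable.HashMap.empty[Long, T]
    // Counts how often `compute` actually ran, to make the caching observable.
    var computeCalls: Int = 0

    def getOrCompute(time: Long): Option[T] =
      generated.get(time).orElse {
        computeCalls += 1
        val result = compute(time)
        result.foreach(value => generated.put(time, value)) // remember for later lookups
        result
      }
  }
}
```

With this shape, several output operations that share a parent stream see the same value for a given time, because only the first lookup triggers compute.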

Wrapping the jobs in a JobSet and submitting it for execution

Each outputStream corresponds to one Job, so the result is a sequence of jobs. A JobSet is created for these jobs and submitted through jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)):

jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))

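The jobs are then run by jobExecutor. jobExecutor is a thread pool whose parallelism defaults to 1 and can be configured with spark.streaming.concurrentJobs, which sets how many jobs can run at the same time. With the default of 1, jobs run one after another, so a batch does not start until the jobs of the previous batch have finished.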

The handler class JobHandler calls Job.run(), which executes the jobFunc built earlier.
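
The effect of the pool size can be illustrated with a plain java.util.concurrent thread pool. This is a sketch, not the JobScheduler code: JobExecutorSketch and startOrder are hypothetical, Job and JobHandler are simplified stand-ins, and the 10-second wait is an arbitrary choice for the example.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object JobExecutorSketch {
  // Simplified stand-in for a streaming Job: an id plus the function to run.
  final class Job(val id: Int, func: () => Unit) { def run(): Unit = func() }

  // Plays the role of JobHandler: a Runnable that just calls job.run().
  final class JobHandler(job: Job) extends Runnable {
    override def run(): Unit = job.run()
  }

  // Submits `numJobs` jobs to a fixed-size pool and returns the ids in the
  // order in which the jobs ran.
  def startOrder(numJobs: Int, concurrentJobs: Int): List[Int] = {
    val started = ArrayBuffer.empty[Int]
    val jobs = (1 to numJobs).map { i =>
      new Job(i, () => started.synchronized { started += i; () })
    }
    val jobExecutor = Executors.newFixedThreadPool(concurrentJobs)
    jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    jobExecutor.shutdown() // accept no new work, let queued jobs finish
    jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
    started.synchronized(started.toList)
  }
}
```

With a pool of one thread the jobs run strictly in submission order. With a larger pool every job still runs, but the order between them is no longer guaranteed, which is worth keeping in mind before raising the concurrency of a streaming application whose output operations depend on each other.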
