Spark Streaming Receiver Startup Analysis

----------------------------------------
Spark Streaming Overview
Spark Streaming Initialization Process
Spark Streaming Receiver Startup Analysis
Spark Streaming Data Preparation Stage (Receiver Mode)
Spark Streaming Data Computation Stage
Spark Streaming Backpressure Analysis
Spark Streaming Executor DynamicAllocation Analysis
----------------------------------------

The Receiver is a core component of the data preparation stage: it is responsible for ingesting data from external sources, and its lifecycle is managed by the ReceiverTracker.

Starting the Receiver

1. Receiver Extraction and Executor Preparation

As noted in "Spark Streaming Initialization Process", the JobScheduler creates and starts the ReceiverTracker when it starts up.
When the ReceiverTracker is created, it extracts the ReceiverInputDStreams from the DStreamGraph, so that when launching receivers it can extract a Receiver from each of them and start them one by one.

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()

When the ReceiverTracker starts, it mainly does two things:

  • create a ReceiverTrackerEndpoint, used to receive information reported by Receivers
  • start the Receivers.

ReceiverTracker's start() method is shown below:

  /** Start the endpoint and receiver execution thread. */
  def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

The launchReceivers() method starts the Receivers; its code is as follows:

 /**
   * Get the receivers from the ReceiverInputDStreams, distributes them to the
   * worker nodes as a parallel collection, and runs them.
   */
  private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    }

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    // Send the start command to the tracker endpoint
    endpoint.send(StartAllReceivers(receivers)) 
  }

The main operations of this method are:

  • extract the Receiver from each ReceiverInputDStream, using the stream's id as the Receiver's id.
  • run runDummySparkJob(): a trivial Spark job whose purpose is to ensure that the application holds its minimum share of requested Executors. The minimum share is determined jointly by "spark.cores.max" and "spark.scheduler.minRegisteredResourcesRatio", defaulting to all requested Executors. While the number of Executors obtained is below this minimum, jobs block and wait for Executors to register until the minimum needed to run is reached.
    The code of runDummySparkJob() is as follows:
  /**
   * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
   * receivers to be scheduled on the same node.
   *
   * TODO Should poll the executor number and wait for executors according to
   * "spark.scheduler.minRegisteredResourcesRatio" and
   * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
   */
  private def runDummySparkJob(): Unit = {
    if (!ssc.sparkContext.isLocal) {
      ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
    }
    assert(getExecutors.nonEmpty)
  }

The logic is deliberately simple: at a small resource cost, it ensures that by the time Receivers are scheduled a large number of Executors have already registered, so that the Receivers can be spread as evenly as possible across different Executors.

  • send the StartAllReceivers command to the ReceiverTrackerEndpoint
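The minimum-share check that runDummySparkJob() effectively waits on can be illustrated with a small sketch. Note that `sufficientResources` is a hypothetical name for illustration; in Spark the comparison happens inside the scheduler backend and is not exposed as an API:

```scala
object MinShareSketch {
  // Sketch of the minimum-resource check: a job may start once the number of
  // registered cores reaches
  //   spark.cores.max * spark.scheduler.minRegisteredResourcesRatio.
  def sufficientResources(registeredCores: Int, maxCores: Int, ratio: Double): Boolean =
    registeredCores >= maxCores * ratio

  def main(args: Array[String]): Unit = {
    // e.g. spark.cores.max = 20, ratio = 0.75 -> at least 15 cores must register
    println(sufficientResources(14, 20, 0.75)) // false
    println(sufficientResources(15, 20, 0.75)) // true
  }
}
```

Until this condition holds, submitted jobs (including the dummy job) stay blocked waiting for Executor registration.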

After the ReceiverTrackerEndpoint receives the StartAllReceivers command, it will:

  • schedule the Receivers: compute launch location information for each Receiver
  • start the Receivers

The implementation logic is as follows:

  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors)
    }

2. Receiver Scheduling

Receiver scheduling is handled by ReceiverSchedulingPolicy and can be divided into two phases:

  • Global scheduling phase
    Occurs when the Receivers are scheduled for the first time. This phase spreads the receivers as evenly as possible across the Executors, and assigns every Receiver a launch location.
  • Local scheduling phase
    Occurs when a Receiver is restarted; only the failed Receiver needs to be rescheduled.

The global scheduling phase always takes place, so it is used below to describe Receiver scheduling in detail. The scheduling proceeds as follows:

  • obtain the host addresses of all Executors
  • create numReceiversOnExecutor to track how many Receivers have been assigned to each Executor
  • create scheduledLocations to record the candidate locations assigned to each Receiver
  • schedule the Receivers that specify preferredLocation information: for each such Receiver, among the Executors on the preferred host choose the one currently running the fewest Receivers as the launch location, then update scheduledLocations and numReceiversOnExecutor
  • schedule the Receivers that do not specify a preferredLocation:
    pick the Executor with the fewest assigned Receivers and assign it to the Receiver
  • if idle Executors remain, add each of them to the candidate list of the Receiver that has the fewest candidates

At this point the association between Receivers and Executors has been established.
The scheduling implementation is shown below:

 /**
   * Try our best to schedule receivers with evenly distributed. However, if the
   * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
   * because we have to respect them.
   *
   * Here is the approach to schedule executors:
   * <ol>
   *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
   *       executors running on those host.</li>
   *   <li>Then, schedule all other receivers evenly among all the executors such that overall
   *       distribution over all the receivers is even.</li>
   * </ol>
   *
   * This method is called when we start to launch receivers at the first time.
   *
   * @return a map for receivers and their scheduled locations
   */
  def scheduleReceivers(
      receivers: Seq[Receiver[_]],
      executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
    if (receivers.isEmpty) {
      return Map.empty
    }

    if (executors.isEmpty) {
      return receivers.map(_.streamId -> Seq.empty).toMap
    }

    val hostToExecutors = executors.groupBy(_.host)
    val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
    val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
    // Set the initial value to 0
    executors.foreach(e => numReceiversOnExecutor(e) = 0)

    // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
    // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
    for (i <- 0 until receivers.length) {
      // Note: preferredLocation is host but executors are host_executorId
      receivers(i).preferredLocation.foreach { host =>
        hostToExecutors.get(host) match {
          case Some(executorsOnHost) =>
            // preferredLocation is a known host. Select an executor that has the least receivers in
            // this host
            val leastScheduledExecutor =
              executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
            scheduledLocations(i) += leastScheduledExecutor
            numReceiversOnExecutor(leastScheduledExecutor) =
              numReceiversOnExecutor(leastScheduledExecutor) + 1
          case None =>
            // preferredLocation is an unknown host.
            // Note: There are two cases:
            // 1. This executor is not up. But it may be up later.
            // 2. This executor is dead, or it's not a host in the cluster.
            // Currently, simply add host to the scheduled executors.

            // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
            // this case
            scheduledLocations(i) += TaskLocation(host)
        }
      }
    }

    // For those receivers that don't have preferredLocation, make sure we assign at least one
    // executor to them.
    for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
      // Select the executor that has the least receivers
      val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
      scheduledLocationsForOneReceiver += leastScheduledExecutor
      numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
    }

    // Assign idle executors to receivers that have less executors
    val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
    for (executor <- idleExecutors) {
      // Assign an idle executor to the receiver that has least candidate executors.
      val leastScheduledExecutors = scheduledLocations.minBy(_.size)
      leastScheduledExecutors += executor
    }

    receivers.map(_.streamId).zip(scheduledLocations).toMap
  }
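The core of the scheduling above can be simulated with plain Scala collections, with no Spark dependency. This is a simplified sketch: `Exec` is a hypothetical stand-in for ExecutorCacheTaskLocation (host plus executor id), receivers are modeled as (streamId, preferredHost) pairs, and the unknown-preferred-host branch is omitted:

```scala
import scala.collection.mutable

object SchedulingSketch {
  final case class Exec(host: String, id: String)

  // Simplified version of ReceiverSchedulingPolicy.scheduleReceivers.
  def schedule(
      receivers: Seq[(Int, Option[String])],
      executors: Seq[Exec]): Map[Int, Seq[Exec]] = {
    val byHost = executors.groupBy(_.host)
    val candidates = Array.fill(receivers.length)(mutable.ArrayBuffer.empty[Exec])
    val load = mutable.HashMap(executors.map(_ -> 0): _*)

    // Pass 1: respect preferredLocation by picking the least-loaded executor
    // on the preferred host.
    for (((_, pref), i) <- receivers.zipWithIndex; host <- pref; execs <- byHost.get(host)) {
      val least = execs.minBy(load)
      candidates(i) += least
      load(least) += 1
    }
    // Pass 2: give every receiver still without a candidate the least-loaded executor.
    for (c <- candidates.filter(_.isEmpty)) {
      val (least, n) = load.minBy(_._2)
      c += least
      load(least) = n + 1
    }
    // Pass 3: hand each idle executor to the receiver with the fewest candidates.
    for (e <- load.collect { case (exec, 0) => exec }) {
      candidates.minBy(_.size) += e
    }
    receivers.map(_._1).zip(candidates.map(_.toSeq)).toMap
  }

  def main(args: Array[String]): Unit = {
    val execs = Seq(Exec("a", "1"), Exec("a", "2"), Exec("b", "3"))
    val plan = schedule(Seq((0, Some("a")), (1, None)), execs)
    // Receiver 0's first candidate is the least-loaded executor on its preferred host "a".
    println(plan(0).map(_.id))
    println(plan(1).map(_.id))
  }
}
```

With two executors on host "a" and one on host "b", receiver 0 (preferring "a") gets an executor on "a" as its first candidate, receiver 1 gets the least-loaded remaining executor, and the idle executor is appended to receiver 0's candidate list.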

One caveat of this scheduleReceivers implementation: even when a Receiver specifies a preferredLocation and that host runs one of the application's Executors, the Receiver is still not guaranteed to end up on that Executor. The returned locations are only locality preferences for the receiver task: idle Executors on other hosts may be appended to the candidate list, and the task scheduler chooses among the candidates.

3. Receiver Startup

After a launch location has been computed for the Receiver, startReceiver() is called to launch it. The launch proceeds as follows:

  • wrap the Receiver into an RDD, using the scheduled locations as the RDD's preferred locations
 // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
  • submit it as a Spark job; the Receiver runs as a Task, inside a thread
 val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
  • when the Task executes, it runs the startReceiverFunc function, which creates and starts a ReceiverSupervisorImpl (the job and task scheduling path is the same as for batch jobs and is not detailed here)
    // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }
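The keep-restarting behaviour wired up through future.onComplete above can be sketched as a synchronous loop. Here `supervise`, `runOnce`, and `shouldRestart` are hypothetical stand-ins for the submitted receiver job and the shouldStartReceiver check; the real mechanism is asynchronous, resending RestartReceiver to the endpoint each time the job's future completes:

```scala
object RestartLoopSketch {
  // Re-run `runOnce` as long as `shouldRestart` returns true, mirroring how
  // ReceiverTracker resubmits the receiver job whenever it ends while the
  // tracker is still running. Returns how many times the job ran.
  def supervise(runOnce: () => Unit, shouldRestart: () => Boolean): Int = {
    var runs = 0
    var continue = true
    while (continue) {
      runOnce()          // the receiver job body
      runs += 1
      continue = shouldRestart() // tracker still running?
    }
    runs
  }

  def main(args: Array[String]): Unit = {
    var remaining = 3 // pretend the receiver job ends twice before the tracker stops
    val runs = supervise(() => (), () => { remaining -= 1; remaining > 0 })
    println(runs) // 3
  }
}
```

This is why a crashed receiver keeps coming back until the ReceiverTracker itself is stopped.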

ReceiverSupervisorImpl provides all the methods needed to handle the data received by the Receiver. It also creates the BlockGenerator, which slices the Receiver's input stream into blocks.
The start() method of ReceiverSupervisorImpl is implemented as follows:

/** Start the supervisor */
 def start() {
   onStart()
   startReceiver()
 }

onStart() creates and starts the BlockGenerator.
startReceiver() first registers the Receiver with the ReceiverTracker to verify that it is allowed to run. If so, it calls the Receiver's onStart() method to begin receiving data. Its implementation logic is as follows:
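The slicing that BlockGenerator performs can be sketched as follows: records accumulate in a current buffer, and a timer (driven by spark.streaming.blockInterval, 200 ms by default) periodically seals the buffer into a block. In this sketch the timer is replaced by an explicit tick() call, and the names are illustrative rather than Spark's actual API:

```scala
import scala.collection.mutable

object BlockGeneratorSketch {
  final class Blocks[T] {
    private var current = mutable.ArrayBuffer.empty[T]
    private val sealedBlocks = mutable.ArrayBuffer.empty[Seq[T]]

    // Called for each record the receiver stores.
    def addData(record: T): Unit = current += record

    // In Spark this is driven by a RecurringTimer firing every
    // spark.streaming.blockInterval.
    def tick(): Unit = {
      if (current.nonEmpty) {
        sealedBlocks += current.toSeq
        current = mutable.ArrayBuffer.empty[T]
      }
    }

    def blocks: Seq[Seq[T]] = sealedBlocks.toSeq
  }

  def main(args: Array[String]): Unit = {
    val gen = new Blocks[String]
    Seq("a", "b").foreach(gen.addData)
    gen.tick()               // first block sealed: records "a" and "b"
    gen.addData("c")
    gen.tick()               // second block: record "c"
    println(gen.blocks.size) // 2
  }
}
```

Each sealed block later becomes one storage unit pushed to the BlockManager, which is why blockInterval controls the granularity of tasks in the downstream batch.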

 /** Start receiver */
  def startReceiver(): Unit = synchronized {
    try {
      if (onReceiverStart()) {
        logInfo(s"Starting receiver $streamId")
        receiverState = Started
        receiver.onStart()
        logInfo(s"Called receiver $streamId onStart")
      } else {
        // The driver refused us
        stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
      }
    } catch {
      case NonFatal(t) =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }

The SocketReceiver used by SocketInputDStream in the WordCount example serves as a concrete illustration; its onStart() method is implemented as follows:

def onStart() {

    logInfo(s"Connecting to $host:$port")
    try {
      socket = new Socket(host, port)
    } catch {
      case e: ConnectException =>
        restart(s"Error connecting to $host:$port", e)
        return
    }
    logInfo(s"Connected to $host:$port")

    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

 /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next())
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
}
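The bytesToObjects function used above converts the raw socket stream into records; for socketTextStream it is a bytes-to-lines converter. A minimal sketch of that conversion (Spark's real implementation is built on a NextIterator, but the effect is similar):

```scala
import java.io.{BufferedReader, ByteArrayInputStream, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

object BytesToLinesSketch {
  // Roughly what SocketInputDStream's bytes-to-lines converter does: wrap the
  // socket's InputStream in a BufferedReader and expose its lines lazily.
  def bytesToLines(in: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
    Iterator.continually(reader.readLine()).takeWhile(_ != null)
  }

  def main(args: Array[String]): Unit = {
    val data = new ByteArrayInputStream("hello\nworld\n".getBytes(StandardCharsets.UTF_8))
    println(bytesToLines(data).toList) // List(hello, world)
  }
}
```

Each element the iterator yields is handed to store(), which routes it into the BlockGenerator described earlier.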

As the above shows, this receiver ingests data over a socket connection.
This concludes the Receiver startup flow. Once started, the Receiver continuously receives the incoming data stream, slices it into blocks, and replicates and distributes them, preparing for the computation stage. The next article analyzes the data preparation stage.
