Spark Core源碼精讀計劃#8:SparkEnv中RPC環(huán)境的基礎(chǔ)構(gòu)建

目錄

前言

在之前的文章中,我們由SparkContext的初始化提到了事件總線LiveListenerBus與執(zhí)行環(huán)境SparkEnv。在講解SparkEnv的過程中,RPC環(huán)境RpcEnv又是首先被初始化的重要組件。做個不怎么恰當(dāng)?shù)谋容^,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。

由于RPC環(huán)境負(fù)責(zé)著Spark體系內(nèi)幾乎所有內(nèi)部及外部通信,內(nèi)容很多,所以一篇文章必然講不完。本文還是從基礎(chǔ)開始看起。

RPC端點及其引用

RpcEnv抽象類是Spark RPC環(huán)境的通用表示,它其中定義的setupEndpoint()方法用來向RPC環(huán)境注冊一個RPC端點(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客戶端要對一個RpcEndpoint發(fā)送消息,那么必須首先獲得其對應(yīng)的RpcEndpointRef。它們之間的關(guān)系可以用如下簡圖表示。


圖#8.1 - RPC環(huán)境與RPC端點

既然RpcEndpoint和RpcEndpointRef是RPC環(huán)境中的基礎(chǔ)組件,我們先來研究它們的源碼。

RpcEndpoint

RpcEndpoint是一個特征,其代碼如下。

代碼#8.1 - o.a.s.rpc.RpcEndpoint特征

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv

  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  def onError(cause: Throwable): Unit = {
    throw cause
  }

  def onConnected(remoteAddress: RpcAddress): Unit = { }

  def onDisconnected(remoteAddress: RpcAddress): Unit = { }

  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }

  def onStart(): Unit = { }

  def onStop(): Unit = { }

  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

其中定義了如下方法,這些相當(dāng)于是RPC端點在RPC環(huán)境中的“行為準(zhǔn)則”。

  • self():取得當(dāng)前RpcEndpoint對應(yīng)的RpcEndpointRef。
  • receive()/receiveAndReply():接收其他RpcEndpointRef傳來的消息并進(jìn)行處理,receiveAndReply()方法還會發(fā)送回復(fù)。
  • onError():消息處理出現(xiàn)異常時調(diào)用的方法。
  • onConnected()/onDisconnected():當(dāng)前RPC端點建立連接或斷開連接時調(diào)用的方法。
  • onNetworkError():RPC端點的連接出現(xiàn)網(wǎng)絡(luò)錯誤時調(diào)用的方法。
  • onStart()/onStop():RPC端點初始化與關(guān)閉時調(diào)用的方法。
  • stop():停止當(dāng)前RpcEndpoint。

RpcEndpoint繼承體系

RpcEndpoint的主要繼承體系如下圖所示。

#圖8.2 - RpcEndpoint的主要繼承體系

圖中可以看到不少之前出現(xiàn)過的RPC端點,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它們時,會專門進(jìn)行講解。

另外,圖中的ThreadSafeRpcEndpoint是直接繼承自RpcEndpoint的特征。顧名思義,它要求RPC端點對消息的處理必須是線程安全的,用文檔中的話說,線程安全RPC端點處理消息必須滿足happens-before原則。

RpcEndpointRef

RpcEndpointRef是一個抽象類,其代碼如下。

代碼#8.2 - o.a.s.rpc.RpcEndpointRef抽象類

private[spark] abstract class RpcEndpointRef(conf: SparkConf)
  extends Serializable with Logging {
  private[this] val maxRetries = RpcUtils.numRetries(conf)
  private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
  private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)

  def address: RpcAddress

  def name: String

  def send(message: Any): Unit

  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
}

這個抽象類的開頭有三個屬性,都是通過RpcUtils工具類從Spark配置項中取出來的,如下。

  • maxRetries:最大重連次數(shù),對應(yīng)配置項為spark.rpc.numRetries,默認(rèn)值3次。
  • retryWaitMs:每次重連之前等待的時長,對應(yīng)配置項為spark.rpc.retry.wait,默認(rèn)值3秒。
  • defaultAskTimeout:對RPC端點進(jìn)行ask()操作(下面會講到)的默認(rèn)超時時長,對應(yīng)配置項為spark.rpc.askTimeout與spark.network.timeout(前者優(yōu)先級高于后者),默認(rèn)值120秒。

值得注意的是,maxRetries與retryWaitMs兩個屬性在當(dāng)前的2.3.3版本中都沒有用到,而在之前的版本中還是有用到的,證明Spark官方取消了RPC重試機(jī)制,也就是統(tǒng)一為消息傳遞語義中的at most once語義了。當(dāng)然,我們也可以自己實現(xiàn)帶有重試機(jī)制的RPC端點引用。

address和name方法分別返回RPC端點引用對應(yīng)的地址和名稱,不必多講。下面幾個方法的含義如下。

  • send()方法:異步發(fā)送一條單向的消息,并且“發(fā)送即忘記”(fire-and-forget),不需要回復(fù)。
  • ask()方法:異步發(fā)送一條消息,并在規(guī)定的超時時間內(nèi)等待RPC端點的回復(fù)。RPC端點會調(diào)用receiveAndReply()方法來處理。
  • askSync()方法:是ask()方法的同步實現(xiàn)。由于它是阻塞操作,有可能會消耗大量時間,因此必須慎用。

RpcEndpointRef只有一個子類,即NettyRpcEndpointRef。它對send()和ask()兩個方法的實現(xiàn)如下。

代碼#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()與ask()方法

  override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
    nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
  }

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
  }

可見是依賴于NettyRpcEnv類的,下面來看一下它是如何創(chuàng)建出來的。

NettyRpcEnv概況

創(chuàng)建NettyRpcEnv

在文章#7的代碼#7.4~#7.5中,通過工廠類NettyRpcEnvFactory的create()方法創(chuàng)建出了NettyRpcEnv,它是目前Spark官方提供的RPC環(huán)境的唯一實現(xiàn)。該方法的代碼如下。

代碼#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法

  def create(config: RpcEnvConfig): RpcEnv = {
    val sparkConf = config.conf
    val javaSerializerInstance =
      new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      try {
        Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
      } catch {
        case NonFatal(e) =>
          nettyEnv.shutdown()
          throw e
      }
    }
    nettyEnv
  }

可見,這個方法先創(chuàng)建了JavaSerializer序列化器,用于RPC傳輸中的序列化。然后通過NettyRpcEnv的構(gòu)造方法創(chuàng)建NettyRpcEnv,這其中也會涉及到一些RPC基礎(chǔ)組件的初始化,后面會講解到。最后定義偏函數(shù)startNettyRpcEnv,并調(diào)用通用工具類Utils中的startServiceOnPort()方法來啟動NettyRpcEnv。

NettyRpcEnv中的屬性成員

我們暫時先不看NettyRpcEnv類的細(xì)節(jié),而是先來看看它內(nèi)部包含了哪些組件。

代碼#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的屬性成員

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var fileDownloadFactory: TransportClientFactory = _

  val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")

  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))

  @volatile private var server: TransportServer = _

  private val stopped = new AtomicBoolean(false)

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  • TransportConf:傳輸配置,作用在RPC環(huán)境中類似于SparkConf,負(fù)責(zé)管理與RPC相關(guān)的各種配置。
  • Dispatcher:調(diào)度器,或者叫分發(fā)器,用于將消息路由到正確的RPC端點。
  • NettyStreamManager:流式管理器,用于處理RPC環(huán)境中的文件,如自定義的配置文件或者JAR包。
  • TransportContext:傳輸上下文,作用在RPC環(huán)境中類似于SparkContext,負(fù)責(zé)管理RPC的服務(wù)端(TransportServer)與客戶端(TransportClient),與它們之間的Netty傳輸管道。
  • TransportClientFactory:創(chuàng)建RPC客戶端TransportClient的工廠。
  • TransportServer:RPC環(huán)境中的服務(wù)端,負(fù)責(zé)提供基礎(chǔ)且高效的流式服務(wù)。

TransportConf和TransportContext提供底層的基于Netty的RPC機(jī)制,TransportClient和TransportServer則是RPC端點的最低級別抽象。

總結(jié)

本文講解了RPC環(huán)境的基本組成部分RpcEndpoint、RpcEndpointRef的細(xì)節(jié)實現(xiàn),并初步了解了NettyRpcEnv的創(chuàng)建過程,以及它內(nèi)部包含的主要組件。雖然TransportConf和TransportContext更為基礎(chǔ),但為了避免嵌套太深出不來,下一篇文章暫時不準(zhǔn)備講它們,而主要來研究NettyRpcEnv內(nèi)的調(diào)度器Dispatcher,它是整個RPC環(huán)境高效運轉(zhuǎn)的基礎(chǔ)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容