Spark RPC 框架源碼分析(二)運(yùn)行時(shí)序

前情提要:

一. Spark RPC 概述概述

上一篇我們已經(jīng)說明了 Spark RPC 框架的一個(gè)簡(jiǎn)單例子,以及一些基本概念的說明。這一篇我們主要講述其運(yùn)行時(shí)序,從而揭露 Spark RPC 框架的運(yùn)行原理。我們將分為兩部分,分別從服務(wù)端和客戶端來看。

所用 spark 版本:spark 2.1.0

二. Spark RPC 服務(wù)端

我們以上一篇 HelloworldServer 為線索,深入到 Spark RPC 框架來看看啟動(dòng)一個(gè)服務(wù)時(shí)都做了些什么。

HelloworldServer{
  ......
  def main(args: Array[String]): Unit = {
    //val host = args(0)
    val host = "localhost"
    val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }
  ......
}
image

這段代碼中有兩個(gè)主要流程,我們分別來說

2.1 Spark RPC 服務(wù)端 NettyRpcEnvFactory.create(config)

首先是下面這條代碼的運(yùn)行流程:

val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)

其實(shí)就是通過 NettyRpcEnvFactory 創(chuàng)建出一個(gè) RPC Environment ,其具體類是 NettyRpcEnv 。

我們?cè)賮砜纯磩?chuàng)建過程中會(huì)發(fā)生什么。

object NettyRpcEnvFactory extends RpcEnvFactory {
    ......
    def create(config: RpcEnvConfig): RpcEnv = {
        val conf = config.conf
    
        // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
        // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
        val javaSerializerInstance =
        new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
        //根據(jù)配置以及地址,new 一個(gè) NettyRpcEnv ,
        val nettyEnv =
        new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress)
        //如果是服務(wù)端創(chuàng)建的,那么會(huì)啟動(dòng)服務(wù)。服務(wù)端和客戶端都會(huì)通過這個(gè)方法創(chuàng)建一個(gè) NettyRpcEnv ,但區(qū)別就在這里了。
        if (!config.clientMode) {
        val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            //啟動(dòng)服務(wù)的方法,下一步就是調(diào)用這個(gè)方法了
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
        }
        try {
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1
        } catch {
            case NonFatal(e) =>
            nettyEnv.shutdown()
            throw e
        }
        }
        nettyEnv
    }
    ......
}

還沒完,如果是服務(wù)端調(diào)用這段代碼,那么主要的功能是創(chuàng)建 RPCEnv ,即 NettyRpcEnv(客戶端在后面說) 。以及通過下面這行代碼,

nettyEnv.startServer(config.bindAddress, actualPort)

去調(diào)用相應(yīng)的方法啟動(dòng)服務(wù)端的服務(wù)。下面進(jìn)入到這個(gè)方法中去看看。

class NettyRpcEnv(
                   val conf: RpcConf,
                   javaSerializerInstance: JavaSerializerInstance,
                   host: String) extends RpcEnv(conf) {
  ......
  def startServer(bindAddress: String, port: Int): Unit = {
    // here disable security
    val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList()
    //TransportContext 屬于 spark.network 中的部分,負(fù)責(zé) RPC 消息在網(wǎng)絡(luò)中的傳輸
    server = transportContext.createServer(bindAddress, port, bootstraps)
    //在每個(gè) RpcEndpoint 注冊(cè)的時(shí)候都會(huì)注冊(cè)一個(gè)默認(rèn)的 RpcEndpointVerifier,它的作用是客戶端調(diào)用的時(shí)候先用它來詢問 Endpoint 是否存在。
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }
  ......
}

執(zhí)行完畢之后這個(gè) create 方法就結(jié)束。這個(gè)流程主要就是開啟一些服務(wù),然后返回一個(gè)新的 NettyRpcEnv 。

2.2 Spark RPC 服務(wù)端 rpcEnv.setupEndpoint("hello-service", helloEndpoint)

這條代碼會(huì)去調(diào)用 NettyRpcEnv 中相應(yīng)的方法

class NettyRpcEnv(
                   val conf: RpcConf,
                   javaSerializerInstance: JavaSerializerInstance,
                   host: String) extends RpcEnv(conf) {
  ......
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }
  ......
}

我們看到,這個(gè)方法主要是調(diào)用 dispatcher 進(jìn)行注冊(cè)的。dispatcher 的功能上一節(jié)已經(jīng)說了,

Dispatcher 的主要作用是保存注冊(cè)的RpcEndpoint、分發(fā)相應(yīng)的Message到RpcEndPoint中進(jìn)行處理。Dispatcher 即是上圖中 ThreadPool的角色。它同時(shí)也維系一個(gè) threadpool,用來處理每次接受到的 InboxMessage 。而這里處理 InboxMessage 是通過 inbox 實(shí)現(xiàn)的。

,這里我們就說一說 dispatcher 的流程。

dispatcher

dispatcher 在 NettyRpcEnv 被創(chuàng)建的時(shí)候創(chuàng)建出來。

class NettyRpcEnv(
                   val conf: RpcConf,
                   javaSerializerInstance: JavaSerializerInstance,
                   host: String) extends RpcEnv(conf) {
    ......
    //初始化時(shí)創(chuàng)建 dispatcher
    private val dispatcher: Dispatcher = new Dispatcher(this)
    ......
}

dispatcher 類被創(chuàng)建的時(shí)候也有幾個(gè)屬性需要注意:

private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) {
    ......
    //每個(gè) RpcEndpoint 其實(shí)都會(huì)被整合成一個(gè) EndpointData 。并且每個(gè) RpcEndpoint 都會(huì)有一個(gè) inbox。
    private class EndpointData(
                                val name: String,
                                val endpoint: RpcEndpoint,
                                val ref: NettyRpcEndpointRef) {
        val inbox = new Inbox(ref, endpoint)
    }
    
    //一個(gè)阻塞隊(duì)列,當(dāng)有 RpcEndpoint 相關(guān)請(qǐng)求(InboxMessage)的時(shí)候,就會(huì)將請(qǐng)求塞到這個(gè)隊(duì)列中,然后被線程池處理。
    private val receivers = new LinkedBlockingQueue[EndpointData]
    
    //初始化便創(chuàng)建出來的線程池,當(dāng)上面的 receivers 隊(duì)列中沒內(nèi)容時(shí),會(huì)阻塞。當(dāng)有 RpcEndpoint 相關(guān)請(qǐng)求(即 InboxMessage )的時(shí)候就會(huì)立刻執(zhí)行。
    //這里處理 InboxMessage 本質(zhì)上是調(diào)用相應(yīng) RpcEndpoint 的 inbox 去處理。
    private val threadpool: ThreadPoolExecutor = {
        val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
        math.max(2, Runtime.getRuntime.availableProcessors()))
        val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
        for (i <- 0 until numThreads) {
            pool.execute(new MessageLoop)
        }
        pool
    }
    ......
}

了解一些 Dispatcher 的邏輯流程后,我們來正式看看 Dispatcher 的 registerRpcEndpoint 方法。

顧名思義,這個(gè)方法就是將 RpcEndpoint 注冊(cè)到 Dispatcher 中去。當(dāng)有 Message 到來的時(shí)候,便會(huì)分發(fā) Message 到相應(yīng)的 RpcEndPoint 中進(jìn)行處理。

private[netty] class Dispatcher(nettyEnv: NettyRpcEnv) {
  ......
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    //注冊(cè) RpcEndpoint 時(shí)需要的是 上面的 EndpointData ,其中就包含 endpointRef ,這個(gè)主要是供客戶端使用的。
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    //多線程環(huán)境下,注冊(cè)一個(gè) RpcEndpoint 需要判斷現(xiàn)在是否處于 stop 狀態(tài)。
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      //新建 EndpointData 并存儲(chǔ)到一個(gè) ConcurrentMap 中。
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      val data = endpoints.get(name)
      endpointRefs.put(data.endpoint, data.ref)
      //將 這個(gè) EndpointData 加入到 receivers 隊(duì)列中,此時(shí) dispatcher 中的 threadpool 會(huì)去處理這個(gè)加進(jìn)來的 EndpointData 
      //處理過程是調(diào)用它的 inbox 的 process()方法。然后 inbox 會(huì)等待消息到來。
      receivers.offer(data) // for the OnStart message
    }
    endpointRef
  }
  ......
}

Spark RPC 服務(wù)端邏輯小結(jié):我們說明了 Spark RPC 服務(wù)端啟動(dòng)的邏輯流程,分為兩個(gè)部分,第一個(gè)是 Spark RPC env ,即 NettyRpcEnv 的創(chuàng)建過程,第二個(gè)則是 RpcEndpoint 注冊(cè)到 dispatcher 的流程。
1. NettyRpcEnvFactory 創(chuàng)建 NettyRpcEnv

  • 根據(jù)地址創(chuàng)建 NettyRpcEnv。
  • NettyRpcEnv 開始啟動(dòng)服務(wù),包括 TransportContext 根據(jù)地址開啟監(jiān)聽服務(wù),向 Dispacther 注冊(cè)一個(gè) RpcEndpointVerifier 等待。

2. Dispatcher 注冊(cè) RpcEndpoint

  • Dispatcher 初始化時(shí)便創(chuàng)建一個(gè)線程池并阻塞等待 receivers 隊(duì)列中加入新的 EndpointData
  • 一旦新加入 EndpointData 便會(huì)調(diào)用該 EndpointData 的 inbox 去處理消息。比如 OnStart 消息,或是 RPCMessage 等等。

三.Spark RPC 客戶端

依舊是以上一節(jié) Spark RPC 客戶端 HelloWorld 的為線索,我們來逐層深入 Spark RPC 客戶端 HelloworldClient 的 asyncCall() 方法。

object HelloworldClient {
  ......
  def asyncCall() = {
    val rpcConf = new RpcConf()
    val config = RpcEnvClientConfig(rpcConf, "hello-client")
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val future: Future[String] = endPointRef.ask[String](SayHi("neo"))
    future.onComplete {
      case scala.util.Success(value) => println(s"Got the result = $value")
      case scala.util.Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("30s"))
    rpcEnv.shutdown()
  }
  ......
}

image

創(chuàng)建 Spark RPC 客戶端 Env(即 NettyRpcEnvFactory ) 部分和 Spark RPC 服務(wù)端是一樣的,只是不會(huì)開啟監(jiān)聽服務(wù),這里就不詳細(xì)展開。

我們從這一句開始看,這也是 Spark RPC 客戶端和服務(wù)端區(qū)別的地方所在。

val endPointRef: RpcEndpointRef = rpcEnv.setupEndpointRef(RpcAddress("localhost", 52345), "hello-service")

setupEndpointRef()

上面的的 setupEndpointRef 最終會(huì)去調(diào)用下面 setupEndpointRef() 這個(gè)方法,這個(gè)方法中又進(jìn)行一次跳轉(zhuǎn),跳轉(zhuǎn)去 setupEndpointRefByURI 這個(gè)方法中 。需要注意的是這兩個(gè)方法都是 RpcEnv 里面的,而 RpcEnv 是抽象類,它里面只實(shí)現(xiàn)部分方法,而 NettyRpcEnv 繼承了它,實(shí)現(xiàn)了全部方法。

abstract class RpcEnv(conf: RpcConf) {
  ......
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    //會(huì)跳轉(zhuǎn)去調(diào)用下面的方法
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
  }
  
  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
    //其中 asyncSetupEndpointRefByURI() 返回的是 Future[RpcEndpointRef]。 這里就是阻塞,等待返回一個(gè) RpcEndpointRef。
    // defaultLookupTimeout.awaitResult 底層調(diào)用 Await.result 阻塞 直到結(jié)果返回或返回異常
    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
  }
  ......
}  

這里最主要的代碼其實(shí)就一句,

defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))

這一段可以分為兩部分, 第一部分的 defaultLookupTimeout.awaitResult 其實(shí)底層是調(diào)用 Await.result 阻塞等待一個(gè)異步操作,直到結(jié)果返回。

而asyncSetupEndpointRefByURI(uri) 則是根據(jù)給定的 uri 去返回一個(gè) RpcEndpointRef ,它是在 NettyRpcEnv 中實(shí)現(xiàn)的:

class NettyRpcEnv(
                   val conf: RpcConf,
                   javaSerializerInstance: JavaSerializerInstance,
                   host: String) extends RpcEnv(conf) {
  ......
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef] = {
    //獲取地址
    val addr = RpcEndpointAddress(uri)
    //根據(jù)地址等信息新建一個(gè) NettyRpcEndpointRef 。
    val RpcendpointRef = new NettyRpcEndpointRef(conf, addr, this) 
    //每個(gè)新建的 RpcendpointRef 都有先有一個(gè)對(duì)應(yīng)的verifier 去檢查服務(wù)端存不存在對(duì)應(yīng)的 Rpcendpoint 。  
    val verifier = new NettyRpcEndpointRef(
      conf, RpcEndpointAddress(addr.rpcAddress, RpcEndpointVerifier.NAME), this)
    //向服務(wù)端發(fā)送請(qǐng)求判斷是否存在對(duì)應(yīng)的 Rpcendpoint。
    verifier.ask[Boolean](RpcEndpointVerifier.createCheckExistence(endpointRef.name)).flatMap { find =>
      if (find) {
        Future.successful(endpointRef)
      } else {
        Future.failed(new RpcEndpointNotFoundException(uri))
      }
    }(ThreadUtils.sameThread)
  }
  ......
}
  

asyncSetupEndpointRefByURI() 這個(gè)方法實(shí)現(xiàn)兩個(gè)功能,第一個(gè)就是新建一個(gè) RpcEndpointRef 。第二個(gè)是新建一個(gè) verifier ,這個(gè) verifier 的作用就是先給服務(wù)端發(fā)送一個(gè)請(qǐng)求判斷是否存在 RpcEndpointRef 對(duì)應(yīng)的 RpcEndpoint 。

這段代碼中最重要的就是 verifiter.ask[Boolean](...) 了。如果有找到之后就會(huì)調(diào)用 Future.successful 這個(gè)方法,反之則會(huì) 通過 Future.failed 拋出一個(gè)異常。

ask 可以算是比較核心的一個(gè)方法,我們可以到 ask 方法中去看看。

class NettyRpcEnv{
    ......
    private[netty] def ask[T: ClassTag](message: RequestMessage, timeout: RpcTimeout): Future[T] = {
      val promise = Promise[Any]()
      val remoteAddr = message.receiver.address
      //
      def onFailure(e: Throwable): Unit = {
  //      println("555");
        if (!promise.tryFailure(e)) {
          log.warn(s"Ignored failure: $e")
        }
      }
  
      def onSuccess(reply: Any): Unit = reply match {
        case RpcFailure(e) => onFailure(e)
        case rpcReply =>
          println("666");
          if (!promise.trySuccess(rpcReply)) {
            log.warn(s"Ignored message: $reply")
          }
      }
  
      try {
        if (remoteAddr == address) {
          val p = Promise[Any]()
          p.future.onComplete {
            case Success(response) => onSuccess(response)
            case Failure(e) => onFailure(e)
          }(ThreadUtils.sameThread)
          dispatcher.postLocalMessage(message, p)
        } else {
          //跳轉(zhuǎn)到這里執(zhí)行
          //封裝一個(gè) RpcOutboxMessage ,同時(shí) onSuccess 方法也是在這里注冊(cè)的。
          val rpcMessage = RpcOutboxMessage(serialize(message),
            onFailure,
            (client, response) => onSuccess(deserialize[Any](client, response)))
          postToOutbox(message.receiver, rpcMessage)
          promise.future.onFailure {
            case _: TimeoutException =>  println("111");rpcMessage.onTimeout()
  //          case _ => println("222");
          }(ThreadUtils.sameThread)
        }
        
        val timeoutCancelable = timeoutScheduler.schedule(new Runnable {
          override def run(): Unit = {
  //          println("333");
            onFailure(new TimeoutException(s"Cannot receive any reply in ${timeout.duration}"))
          }
        }, timeout.duration.toNanos, TimeUnit.NANOSECONDS)
        //promise 對(duì)應(yīng)的 future onComplete時(shí)會(huì)去調(diào)用,但當(dāng) successful 的時(shí)候,上面的 run 并不會(huì)被調(diào)用。
        promise.future.onComplete { v =>
  //        println("4444");
          timeoutCancelable.cancel(true)
        }(ThreadUtils.sameThread)
  
      } catch {
        case NonFatal(e) =>
          onFailure(e)
      }
  
      promise.future.mapTo[T].recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
    }
    ......
}

這里涉及到使用一些 scala 多線程的高級(jí)用法,包括 Promise 和 Future。如果想要對(duì)這些有更加深入的了解,可以參考這篇文章

這個(gè)函數(shù)的作用從名字中就可以看得出,其實(shí)就是將要 發(fā)送的消息封裝成一個(gè) RpcOutboxMessage ,然后交給 OutBox 去發(fā)送,OutBox 和前面所說的 InBox 對(duì)應(yīng),對(duì)應(yīng) Actor 模型中的 MailBox(信箱)。用于發(fā)送和接收消息。

其中使用到了 Future 和 Promise 進(jìn)行異步并發(fā)以及錯(cuò)誤處理,比如當(dāng)發(fā)送時(shí)間超時(shí)的時(shí)候 Promise 就會(huì)返回一個(gè) TimeoutException ,而我們就可以設(shè)置自己的 onFailure 函數(shù)去處理這些異常。

OK,注冊(cè)完 RpcEndpointRef 后我們便可以用它來向服務(wù)端發(fā)送消息了,而其實(shí) RpcEndpointRef 發(fā)送消息還是調(diào)用 ask 方法,就是上面的那個(gè) ask 方法。上面也有介紹,本質(zhì)上就是通過 OutBox 進(jìn)行處理。

我們來梳理一下客戶端的發(fā)送流程。

客戶端邏輯小結(jié):客戶端和服務(wù)端比較類似,都是需要?jiǎng)?chuàng)建一個(gè) NettyRpcEnv 。不同的是接下來客戶端創(chuàng)建的是 RpcEndpointRef ,并用之向服務(wù)端對(duì)應(yīng)的 RpcEndpoint 發(fā)送消息。

1. NettyRpcEnvFactory 創(chuàng)建 NettyRpcEnv

  • 根據(jù)地址創(chuàng)建 NettyRpcEnv。 根據(jù)地址開啟監(jiān)聽服務(wù),向 Dispacther 注冊(cè)一個(gè) RpcEndpointVerifier 等待。

2. 創(chuàng)建 RpcEndpointRef

  • 創(chuàng)建一個(gè)新的 RpcEndpointRef
  • 創(chuàng)建對(duì)應(yīng)的 verifier ,使用 verifier 向服務(wù)端發(fā)送請(qǐng)求,判斷對(duì)應(yīng)的 RpcEndpoint 是否存在。若存在,返回該 RpcEndpointRef ,否則拋出異常。

3. RpcEndpointRef 使用同步或者異步的方式發(fā)送請(qǐng)求。

OK,以上就是 Spark RPC 時(shí)序的源碼分析。下一篇會(huì)將一個(gè)實(shí)際的例子,Spark 的心跳機(jī)制和代碼。喜歡的話就關(guān)注一波吧

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容