Spark Core源碼精讀計劃 番外篇B-1:重回Spark RPC環(huán)境

目錄

前言

又是很久沒有連載,萬分抱歉。今天(注:其實(shí)也包含昨天)需要盯著雙11各個實(shí)時任務(wù)的運(yùn)行,目前仍然無事發(fā)生,抽空來寫幾筆吧。

在前面的文章中,我們了解了塊管理器BlockManager管理下的讀寫流程。并且已經(jīng)知道,BlockManager讀取塊時,如果塊在本地找不到,就會去集群內(nèi)的遠(yuǎn)端節(jié)點(diǎn)去獲取。同理,如果BlockManager寫入塊時需要復(fù)制,那么除了在本地寫入之外,也要再寫一份到遠(yuǎn)端節(jié)點(diǎn)。BlockManager與遠(yuǎn)端節(jié)點(diǎn)的交互就得依賴塊傳輸服務(wù)BlockTransferService。但是BlockTransferService需要依賴之前偷懶沒有講過的RPC底層組件,所以現(xiàn)在得把這個坑填上,計劃用3篇文章來填。

由于Spark 2.x的RPC環(huán)境是完全基于Netty搞的,所以如果看官對Netty有基本的了解的話,讀起來會順暢一點(diǎn)。

RPC底層概覽

在系列的文章#8中,我們講到了RPC環(huán)境——即NettyRpcEnv的構(gòu)建和屬性成員。來復(fù)習(xí)一下。

代碼#B1.1 - o.a.s.rpc.netty.NettyRpcEnv中的部分屬性成員

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", 0))

  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  private val streamManager = new NettyStreamManager(this)

  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))

  private val clientFactory = transportContext.createClientFactory(createClientBootstraps())

  @volatile private var server: TransportServer = _
  • TransportConf:傳輸配置,作用在RPC環(huán)境中類似于SparkConf,負(fù)責(zé)管理與RPC相關(guān)的各種配置。
  • Dispatcher:調(diào)度器,或者叫分發(fā)器,用于將消息路由到正確的RPC端點(diǎn),它的邏輯在文章#9中已經(jīng)講過,就不再提了。
  • StreamManager:流式管理器,用于處理RPC環(huán)境中的文件,如自定義的配置文件或者JAR包。
  • TransportContext:傳輸上下文,作用在RPC環(huán)境中類似于SparkContext,負(fù)責(zé)管理RPC的服務(wù)端(TransportServer)與客戶端(TransportClient),和它們之間的Netty Channel和Pipeline。
  • TransportClientFactory:創(chuàng)建RPC客戶端TransportClient的工廠。
  • TransportServer:RPC環(huán)境中的服務(wù)端,負(fù)責(zé)提供基礎(chǔ)且高效的流式服務(wù)。

這些東西就是Spark RPC底層主要的組成部分,之前并沒有了解過,下面我們從TransportConf、TransportContext這兩樣開始探究。

傳輸配置TransportConf

在Spark源碼中并不會顯式地創(chuàng)建TransportConf實(shí)例,而是通過SparkTransportConf對象代為實(shí)現(xiàn)。該對象的源碼很短,如下。

代碼#B1.2 - o.a.s.network.netty.SparkTransportConf對象

object SparkTransportConf {
  private val MAX_DEFAULT_NETTY_THREADS = 8

  def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
    val conf = _conf.clone

    val numThreads = defaultNumThreads(numUsableCores)
    conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
    conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)

    new TransportConf(module, new ConfigProvider {
      override def get(name: String): String = conf.get(name)
      override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
      override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
        conf.getAll.toMap.asJava.entrySet()
      }
    })
  }

  private def defaultNumThreads(numUsableCores: Int): Int = {
    val availableCores =
      if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
    math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
  }
}

可見,SparkTransportConf.fromSparkConf()方法負(fù)責(zé)從SparkConf持有的參數(shù)創(chuàng)建TransportConf。TransportConf接受的構(gòu)造參數(shù)有二:

  • module:該RPC配置所屬的模塊名,比如代碼#B.1中創(chuàng)建NettyRpcEnv時,模塊名就是"rpc"。
  • ConfigProvider:一個簡單的負(fù)責(zé)數(shù)據(jù)類型轉(zhuǎn)化的類,能夠讀取SparkConf中與RPC相關(guān)的配置,并傳遞給TransportConf。在如上的代碼中,是直接初始化了一個匿名類。

這樣,我們就可以通過SparkConf持有、并通過ConfigProvider獲取Spark RPC的所有配置參數(shù)。換句話說,TransportConf就是SparkConf的一個子集,SparkConf仍然是配置的唯一入口,方便統(tǒng)一管理。

上面代碼中需要初始化的參數(shù)如下:

  • spark.rpc.io.numConnectionsPerPeer:在每對RPC實(shí)體間建立的連接數(shù)量,默認(rèn)是1;
  • spark.rpc.io.threads:RPC服務(wù)端和客戶端的線程數(shù),默認(rèn)為0,即不設(shè)置。在此值不設(shè)置的情況下,最終會采用可用的核心數(shù)與MAX_DEFAULT_NETTY_THREADS常量(為8)之間的較小值。

TransportConf類的實(shí)現(xiàn)就非常簡單了,主要由很多get方法組成。以下是部分代碼,不再贅述。

代碼#B1.3 - o.a.s.network.netty.TransportConf類的部分代碼

  public TransportConf(String module, ConfigProvider conf) {
    this.module = module;
    this.conf = conf;
    SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
    SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
    SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
    // ...略...
  }

  public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT); }

  public boolean preferDirectBufs() { return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true); }

  public int connectionTimeoutMs() {
    long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
      conf.get("spark.network.timeout", "120s"));
    long defaultTimeoutMs = JavaUtils.timeStringAsSec(
      conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
    return (int) defaultTimeoutMs;
  }

  public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); }

  public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }

  public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }

  public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }

  public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }

傳輸上下文TransportContext

既然SparkContext是Spark Core功能的主要入口,那么TransportContext自然就是Spark RPC環(huán)境的入口了。它比前面講過的NettyRpcEnv更加底層,如果沒有它,RPC環(huán)境也就無從談起了。

成員屬性與構(gòu)造方法

代碼#B1.4 - o.a.s.network.TransportContext類的成員屬性與構(gòu)造方法

  private final TransportConf conf;
  private final RpcHandler rpcHandler;
  private final boolean closeIdleConnections;

  private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
  private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;

  public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
    this(conf, rpcHandler, false);
  }

  public TransportContext(
      TransportConf conf,
      RpcHandler rpcHandler,
      boolean closeIdleConnections) {
    this.conf = conf;
    this.rpcHandler = rpcHandler;
    this.closeIdleConnections = closeIdleConnections;
  }

以下3個成員屬性同時也是TransportContext構(gòu)造方法的參數(shù):

  • conf:即RPC傳輸配置TransportConf;
  • rpcHandler:RPC消息處理器RpcHandler,是一個抽象類。顧名思義,它其中包含了所有RPC消息的具體處理邏輯;
  • closeIdleConnections:表示是否關(guān)閉空閑連接的標(biāo)志。

另外還有兩個常量ENCODER和DECODER。前者是消息編碼器MessageEncoder的實(shí)例,由Netty提供的MessageToMessageEncoder派生而來,RPC服務(wù)端使用它來編碼向客戶端發(fā)送的消息;后者是消息解碼器MessageDecoder的實(shí)例,由Netty提供的MessageToMessageDecoder派生而來,RPC客戶端使用它來解碼從服務(wù)端收到的消息。

下面具體看看TransportContext提供的方法,通過這些方法,我們能見識到更多其他由TransportContext創(chuàng)建的RPC組件。

創(chuàng)建傳輸客戶端工廠TransportClientFactory

代碼#B1.5 - o.a.s.network.TransportContext.createClientFactory()方法

  public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
    return new TransportClientFactory(this, bootstraps);
  }

  public TransportClientFactory createClientFactory() {
    return createClientFactory(new ArrayList<>());
  }

createClientFactory()方法負(fù)責(zé)創(chuàng)建傳輸客戶端工廠TransportClientFactory,由TransportClientFactory進(jìn)而可以創(chuàng)建更多的傳輸客戶端TransportClient。其參數(shù)中的TransportClientBootstrap表示傳輸客戶端的初始化邏輯(通常是一些一次性的工作,比如讓它們攜帶SASL認(rèn)證的token等)。

createClientFactory()方法是直接調(diào)用了TransportClientFactory的構(gòu)造方法,關(guān)于它的邏輯,下篇文章詳細(xì)講。

創(chuàng)建傳輸服務(wù)端TransportServer

代碼#B1.6 - o.a.s.network.TransportContext.createServer()方法

  public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, null, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

  public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
    return createServer(0, bootstraps);
  }

  public TransportServer createServer() {
    return createServer(0, new ArrayList<>());
  }

createServer()方法有4個重載,可以指定RPC服務(wù)端要綁定到的主機(jī)地址和端口號,以及初始化邏輯TransportServerBootstrap。TransportServer的構(gòu)造方法和相關(guān)細(xì)節(jié)會與TransportClientFactory一起講。

初始化Netty ChannelPipeline與ChannelHandler

先來看initializePipeline()方法。

代碼#B1.7 - o.a.s.network.TransportContext.initializePipeline()方法

  public TransportChannelHandler initializePipeline(SocketChannel channel) {
    return initializePipeline(channel, rpcHandler);
  }

  public TransportChannelHandler initializePipeline(
      SocketChannel channel,
      RpcHandler channelRpcHandler) {
    try {
      TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
      channel.pipeline()
        .addLast("encoder", ENCODER)
        .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
        .addLast("decoder", DECODER)
        .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
        .addLast("handler", channelHandler);
      return channelHandler;
    } catch (RuntimeException e) {
      logger.error("Error while initializing Netty pipeline", e);
      throw e;
    }
  }

這段代碼看起來可能有些sophisticated,我們先回憶一下Netty的基礎(chǔ)知識。

在Netty架構(gòu)中,Channel(通道)是消息通信的載體,ChannelHandler(通道處理器)則負(fù)責(zé)Channel中消息通信具體邏輯的實(shí)現(xiàn)。而ChannelPipeline(通道管線)將多個ChannelHandler按順序組織起來,ChannelEvent(消息體)就按照ChannelPipeline規(guī)定的順序流轉(zhuǎn)。下面用一幅簡圖來表示它們之間的關(guān)系。

圖#B1.1 - Netty Channel、ChannelPipeline、ChannelHandler之間的關(guān)系

在ChannelPipeline內(nèi)部用雙鏈表來維護(hù)ChannelHandler以及它對應(yīng)的上下文實(shí)例ChannelHandlerContext,另外還有特殊的頭節(jié)點(diǎn)HeadContext和尾節(jié)點(diǎn)TailContext。Netty的這種設(shè)計可以讓用戶專注于實(shí)現(xiàn)ChannelHandler的邏輯細(xì)節(jié),這大概也是Spark開發(fā)者們所看重的優(yōu)點(diǎn)之一吧。

從上文代碼中的initializePipeline()方法可以看出,通過鏈?zhǔn)秸{(diào)用ChannelPipeline.addLast()方法,按順序添加了以下ChannelHandler:

  • 消息編碼器MessageEncoder;
  • 幀解碼器TransportFrameDecoder;
  • 消息解碼器MessageDecoder;
  • 空閑狀態(tài)處理器IdleStateHandler;
  • 真正的RPC處理邏輯TransportChannelHandler。它是Spark RPC環(huán)境專用的ChannelHandler。

它們的類圖如下所示。

圖#B1.2 - Spark RPC ChannelPipeline內(nèi)的組件類圖

其中,實(shí)現(xiàn)了ChannelInboundHandler接口的處理器用于處理請求消息,而實(shí)現(xiàn)了ChannelOutboundHandler接口的處理器用于處理響應(yīng)消息,并且它們的順序是相反的。因此,處理請求的流程是:TransportFrameDecoder→MessageDecoder→IdleStateHandler→TransportChannelHandler,處理響應(yīng)的流程是:IdleStateHandler→MessageEncoder。

那么TransportChannelHandler是哪里來的呢?它是由createChannelHandler()方法創(chuàng)建的。

代碼#B1.8 - o.a.s.network.TransportContext.createChannelHandler()方法

  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred());
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), closeIdleConnections);
  }

由代碼可見,一個TransportChannelHandler實(shí)際上由三個組件組成:

  • TransportResponseHandler,是客戶端處理服務(wù)端發(fā)回的響應(yīng)消息的處理器;
  • TransportClient,單獨(dú)的RPC客戶端,不受TransportClientFactory的管理;
  • TransportRequestHandler,是服務(wù)端處理客戶端發(fā)來的請求消息的處理器。

注意,雖然TransportResponseHandler和TransportRequestHandler的名稱里都有“Handler”,但它們不是Netty層面上的東西,僅僅是Spark內(nèi)置的MessageHandler抽象類的實(shí)現(xiàn)而已,它規(guī)定了處理請求和響應(yīng)的一些基本規(guī)范,后文會講解到。

圖#B1.3 - MessageHandler類圖

總結(jié)

本文講解了RPC環(huán)境中的傳輸配置TransportConf與傳輸上下文TransportContext的細(xì)節(jié),探究了由TransportContext初始化的傳輸客戶端工廠TransportClientFactory、傳輸服務(wù)端TransportServer,最后結(jié)合Netty的部分知識講解了ChannelPipeline與ChannelHandler的初始化邏輯。

民那晚安。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容