Apache Spark RPC (Part 2)

(Figure: SparkNetty.png)

0. Server Creation and Startup

Part 1 covered how an executor retrieves the SparkAppConfig through the rpcEnv: how the client is created and connected, how messages are sent through the rpcEnv, and how the return value finally makes its way back to the caller.

Next, let's look at how the server, after receiving a request, delivers the message to the correct RpcEndpoint, and how the response data is written back.

Part 1 also introduced TransportContext, which encapsulates the handler setup for both the Netty server and client. On the client side, responses are handled by TransportResponseHandler; on the server side, requests are handled by TransportRequestHandler together with NettyRpcHandler.

As part of SparkContext initialization, NettyRpcEnv starts a TransportServer.

The server is the entry point for RPC requests. When a request arrives, the server hands the message to the Dispatcher, which routes it into the message queue of the correct RpcEndpoint.

NettyRpcEnv Initialization

NettyRpcEnv#create

def create(config: RpcEnvConfig): RpcEnv = {
    ...
  // create the NettyRpcEnv
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
  // in server mode, start a TransportServer
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      ...
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
     }
    nettyEnv
  }
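
Note that startNettyRpcEnv is a closure that receives the actual port to try: Utils.startServiceOnPort calls it and retries on successive ports when the requested one is taken. A minimal sketch of that retry behavior (simplified and renamed; the real helper in org.apache.spark.util.Utils also honors spark.port.maxRetries and wraps ports around 65535):

import java.net.BindException

def startServiceOnPortSketch[T](
    startPort: Int,
    startService: Int => (T, Int), // returns (service, actual bound port)
    maxRetries: Int = 16): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    // port 0 means "let the OS pick a free port"; otherwise probe successive ports
    val tryPort = if (startPort == 0) 0 else startPort + offset
    try {
      return startService(tryPort)
    } catch {
      case _: BindException => // port already in use; try the next one
    }
  }
  throw new BindException(s"Failed to start service after $maxRetries retries")
}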

NettyRpcEnv#startServer

def startServer(bindAddress: String, port: Int): Unit = {
    ...
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

transportContext is a field of NettyRpcEnv and is already created when the env is instantiated. Notice that as soon as the server starts, an RpcEndpoint named RpcEndpointVerifier is registered with the Dispatcher; registration itself is traced below, but the verifier is small enough to show right away.
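
RpcEndpointVerifier lets a remote RpcEnv ask whether an endpoint with a given name exists on this side, which is what setupEndpointRef relies on. Roughly, condensed from the Spark 3.0 source:

private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // reply true/false depending on whether an endpoint with that name is registered
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"
  /** A message used to ask the remote [[RpcEndpointVerifier]] if an RpcEndpoint exists. */
  case class CheckExistence(name: String)
}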

[common/network-common] org.apache.spark.network.TransportContext#createServer

 /** Create a server which will attempt to bind to a specific host and port. */
  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

Let's step further into the creation of TransportServer.

TransportServer

[common/network-common] org.apache.spark.network.server.TransportServer

public class TransportServer implements Closeable {
  // the only constructor
  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    ...
    init(hostToBind, portToBind);
    ...
  }
  
 private void init(String hostToBind, int portToBind) {
   ...
   // a standard Netty server bootstrap
   EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
      conf.getModuleName() + "-server");
   bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, pooledAllocator)
      .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
   ...
   // this block deserves a longer discussion; skip it for now and focus on getting the server up
   bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      ...
    });
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
   ...
 }
}

After NettyRpcEnv starts the server, it immediately registers an RpcEndpoint. This brings in two new concepts: Dispatcher and RpcEndpoint.

Dispatcher

[spark-core] org.apache.spark.rpc.netty.Dispatcher

//A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  private val endpoints: ConcurrentMap[String, MessageLoop] // all registered MessageLoops, keyed by endpoint name
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] // mapping from RpcEndpoint to its RpcEndpointRef
  private lazy val sharedLoop: SharedMessageLoop // a MessageLoop shared by multiple endpoints
  ...
  
 def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
   // this address uniquely identifies an RpcEndpoint; an RpcEndpointRef locates its RpcEndpoint through it
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      ...
      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is called.
      endpointRefs.put(endpoint, endpointRef)
      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
          // shared MessageLoop: endpoints share one thread pool; internally it controls which operations run exclusively vs. in parallel
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {...}
    }
    endpointRef
  }
}
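
From user code, registration goes through RpcEnv#setupEndpoint, which delegates to Dispatcher#registerRpcEndpoint. A minimal sketch with a made-up endpoint (EchoEndpoint is ours, not Spark's; note that org.apache.spark.rpc is private[spark], so this only compiles inside Spark itself):

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

// hypothetical endpoint, for illustration only
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg") // reply flows back through the callback
  }
}

// given an existing rpcEnv, registration returns the ref used to address the endpoint
val echoRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))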

Next, look again at how TransportServer sets up its handlers.

[common/network-common] org.apache.spark.network.server.TransportServer#init

private void init(String hostToBind, int portToBind) {
  ...
  // this code closely mirrors the client-side handler setup
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());

        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        // both server and client route their handler logic through TransportContext
        context.initializePipeline(ch, rpcHandler);
      }
    });
  ...
}

Server Netty Handler

Continuing with the TransportServer startup: a channel handler was registered to handle socket connections. TransportContext creates a TransportChannelHandler and registers it as the ServerBootstrap's childHandler; in other words, TransportChannelHandler is responsible for handling external requests.

[common/network-common] org.apache.spark.network.TransportContext#createChannelHandler

/**
   * Creates the server- and client-side handler which is used to handle both RequestMessages and
   * ResponseMessages. The channel is expected to have been successfully created, though certain
   * properties (such as the remoteAddress()) may not be available yet.
   */
  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    // client side: handles responses
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
    ChunkFetchRequestHandler chunkFetchRequestHandler = null;
    if (!separateChunkFetchRequest) {
      chunkFetchRequestHandler = new ChunkFetchRequestHandler(
        client, rpcHandler.getStreamManager(),
        conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
    }
    // server side: handles requests
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
  }

1. Receiving the Request

When a request reaches the server, it is delegated from TransportChannelHandler to TransportRequestHandler. Many request types are handled here; we only follow RpcRequest, as the other types come up in other modules.

TransportRequestHandler

[common/network-common] org.apache.spark.network.server.TransportRequestHandler

/**
 * A handler that processes requests from clients and writes chunk data back. Each handler is
 * attached to a single Netty channel, and keeps track of which streams have been fetched via this
 * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
 *
 * The messages should have been processed by the pipeline setup by {@link TransportServer}.
 */
public class TransportRequestHandler extends MessageHandler<RequestMessage> {
  /** The Netty channel that this handler is associated with. */
  private final Channel channel;
  /** Client on the same channel allowing us to talk back to the requester. */
  private final TransportClient reverseClient;
  /** Handles all RPC messages. */
  private final RpcHandler rpcHandler;
  ...
    
  @Override
  public void handle(RequestMessage request) throws Exception {
    if (request instanceof ChunkFetchRequest) {
      chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
      // here we focus on remote RPC requests (RpcRequest)
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
      processStreamUpload((UploadStream) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }
  // handle an RpcRequest
  private void processRpcRequest(final RpcRequest req) {
    try {
      // delegate to the rpcHandler
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        // callback that writes the final response back
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
        ...
      });
    } catch (Exception e) {...}
  }
}

The rpcHandler here was created by NettyRpcEnv along with the TransportContext and passed in as a constructor argument; TransportContext in turn passes it to TransportRequestHandler's constructor. Concretely, it is NettyRpcHandler, which lives in the same file as NettyRpcEnv.

[spark-core] org.apache.spark.rpc.netty.NettyRpcHandler#receive

 override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    val messageToDispatch = internalReceive(client, message)
   // hand off to the Dispatcher, which routes to the correct endpoint
    dispatcher.postRemoteMessage(messageToDispatch, callback)
  }
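
internalReceive deserializes the wire bytes back into a RequestMessage and fills in the sender's address. Roughly, condensed from NettyRpcHandler in the Spark 3.0 source (connection bookkeeping omitted):

private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
  val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  // parse sender address, target endpoint name and payload (see the next section)
  val requestMessage = RequestMessage(nettyEnv, client, message)
  if (requestMessage.senderAddress == null) {
    // the sender runs no server of its own, so replies must go back over this channel
    new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
  } else {
    requestMessage
  }
}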

Identifying the RpcEndpoint

How does the server find the right RpcEndpoint? From Part 1 we know that a request to an endpoint goes through its RpcEndpointRef, and every RpcEndpoint is bound to an endpoint name. The request body therefore has to carry the endpoint's identity so the server can look up the corresponding RpcEndpoint.

[spark-core] org.apache.spark.rpc.netty.RequestMessage#serialize

/** Manually serialize [[RequestMessage]] to minimize the size. */
  def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
    val bos = new ByteBufferOutputStream()
    val out = new DataOutputStream(bos)
    try {
      // write the sender (local) address
      writeRpcAddress(out, senderAddress)
      // write the receiver (server) address
      writeRpcAddress(out, receiver.address)
      // write the endpoint name
      out.writeUTF(receiver.name)
      val s = nettyEnv.serializeStream(out)
      try {
        // the payload case class; in our example this is RetrieveSparkAppConfig
        s.writeObject(content)
      } finally {
        s.close()
      }
    } finally {
      out.close()
    }
    bos.toByteBuffer
  }
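
On the receiving side, RequestMessage#apply mirrors this format exactly; condensed from the Spark 3.0 source (error handling trimmed):

def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
  val bis = new ByteBufferInputStream(bytes)
  val in = new DataInputStream(bis)
  try {
    val senderAddress = readRpcAddress(in) // who sent the request
    // receiver address + endpoint name: exactly what the Dispatcher needs for routing
    val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF())
    val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
    ref.client = client
    // the remainder of the buffer is the payload, e.g. RetrieveSparkAppConfig
    new RequestMessage(senderAddress, ref, nettyEnv.deserialize(client, bytes))
  } finally {
    in.close()
  }
}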

2. Message Delivery: the Dispatcher

Back in the Dispatcher, let's trace how a message is delivered to the endpoint.

[spark-core] org.apache.spark.rpc.netty.Dispatcher

/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  // endpointName -> MessageLoop
  private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  ...
  
  /** Posts a message sent by a remote endpoint. */
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    val rpcCallContext =
      new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
  }
  /**
   * Posts a message to a specific endpoint.
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    ... 
    // use the endpointName carried in the request body to find the MessageLoop bound to that endpoint
    val loop = endpoints.get(endpointName)
    ...
    // deliver the message
    loop.post(endpointName, message)
    ... 
  }
}

At this point the message has been delivered into the right endpoint's message queue. Next we follow the processing of the message and how the response is produced.

3. The Endpoint Consumes the Message

Recall the server-side startup described above: DriverEndpoint is registered with the Dispatcher, and during registration the Dispatcher also creates a MessageLoop for it. The design of MessageLoop is analyzed in detail later; for now, let's keep following the request.

DriverEndpoint uses DedicatedMessageLoop, a subclass of MessageLoop reserved for a single endpoint. A MessageLoop can also be shared: multiple endpoints reuse one loop, but each endpoint still gets its own Inbox. Think of the Inbox as a mailbox: the Dispatcher's routing step finds the right Inbox and then appends the message to that Inbox's queue (a sketch of the shared variant follows the DedicatedMessageLoop code below).

[spark-core] org.apache.spark.rpc.netty.DedicatedMessageLoop

/**
 * A message loop that is dedicated to a single RPC endpoint.
 */
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher) extends MessageLoop(dispatcher) { 
  private val inbox = new Inbox(name, endpoint) // the Inbox bound to this endpoint
  // worker threads that drain the inbox
  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }
  // the worker threads start as soon as the instance is created
  (1 to endpoint.threadCount()).foreach { _ =>
    threadpool.submit(receiveLoopRunnable)
  }
  // likewise, the inbox is marked active at construction time;
  // worker threads take active inboxes off the `active` queue and process their messages
  // Mark active to handle the OnStart message.
  setActive(inbox)
  // deliver a message into the inbox's queue
  override def post(endpointName: String, message: InboxMessage): Unit = {
    require(endpointName == name)
    inbox.post(message)
    setActive(inbox)
  }
} 
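
For comparison, the shared variant keeps one Inbox per endpoint but a single fixed-size thread pool for all of them; condensed from org.apache.spark.rpc.netty.SharedMessageLoop (Spark 3.0, details omitted):

private class SharedMessageLoop(conf: SparkConf, dispatcher: Dispatcher, numUsableCores: Int)
  extends MessageLoop(dispatcher) {

  // one Inbox per endpoint name, all drained by the shared pool below
  private val endpoints = new ConcurrentHashMap[String, Inbox]()

  override protected val threadpool: ThreadPoolExecutor = {
    val numThreads = getNumOfThreads(conf)
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(receiveLoopRunnable)
    }
    pool
  }

  def register(name: String, endpoint: RpcEndpoint): Unit = {
    val inbox = new Inbox(name, endpoint)
    endpoints.put(name, inbox)
    // mark active so the OnStart message gets processed
    setActive(inbox)
  }

  override def post(endpointName: String, message: InboxMessage): Unit = {
    val inbox = endpoints.get(endpointName)
    inbox.post(message)
    setActive(inbox)
  }
}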

At this point the message has been placed into the Inbox and the Inbox is marked active, so one of the worker threads will pick it up and process it via receiveLoopRunnable. That field belongs to the MessageLoop class; it is a Runnable whose run method executes MessageLoop's receiveLoop.

[spark-core] org.apache.spark.rpc.netty.MessageLoop#receiveLoop

  private def receiveLoop(): Unit = {
    try {
      while (true) {
        try {
          // `active` is a queue holding the inboxes that currently have messages
          val inbox = active.take()
          // PoisonPill is the shutdown signal
          if (inbox == MessageLoop.PoisonPill) {
            // Put PoisonPill back so that other threads can see it.
            setActive(MessageLoop.PoisonPill)
            return
          }
          // process this inbox's messages
          inbox.process(dispatcher)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {...}
}

The Inbox then picks which endpoint method to call based on the message type.
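
That type-based dispatch lives in Inbox#process; condensed from the Spark 3.0 source (synchronization and the safelyCall error handling stripped):

def process(dispatcher: Dispatcher): Unit = {
  var message: InboxMessage = messages.poll() // inbox-internal queue, filled by post()
  while (message != null) {
    message match {
      case RpcMessage(sender, content, context) =>
        // two-way message: the endpoint answers through the RpcCallContext
        endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content,
          _ => throw new SparkException(s"Unsupported message $message from $sender"))
      case OneWayMessage(sender, content) =>
        // fire-and-forget: no reply expected
        endpoint.receive.applyOrElse[Any, Unit](content,
          _ => throw new SparkException(s"Unsupported message $message from $sender"))
      case OnStart => endpoint.onStart() // lifecycle hooks
      case OnStop  => endpoint.onStop()
      case _ => // RemoteProcess(Connected|Disconnected|ConnectionError), omitted here
    }
    message = messages.poll()
  }
}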

For our request type, this lands in DriverEndpoint's receiveAndReply method.

[spark-core] org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  ...
  // fetch the SparkAppConfig
  case RetrieveSparkAppConfig(resourceProfileId) =>
    // note this will be updated in later prs to get the ResourceProfile from a
    // ResourceProfileManager that matches the resource profile id
    // for now just use default profile
    val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
    val reply = SparkAppConfig(
          sparkProperties,
          SparkEnv.get.securityManager.getIOEncryptionKey(),
          Option(delegationTokens.get()),
          rp)
    context.reply(reply)
}

The context parameter here wraps the RpcResponseCallback that TransportRequestHandler created when processing the request; concretely, it is a RemoteNettyRpcCallContext.

4. Returning the Response

This is deep layering: throughout the stack we keep seeing objects wrapping other objects that seem to express the same thing. The point is to hide the local-vs-remote distinction from the upper layers: callers stay oblivious while the implementation wraps across several dimensions.
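
Concretely, the server-side context.reply just serializes the response and fires the Netty-level callback; condensed from org.apache.spark.rpc.netty.NettyRpcCallContext (Spark 3.0):

private[netty] abstract class NettyRpcCallContext(override val senderAddress: RpcAddress)
  extends RpcCallContext {

  protected def send(message: Any): Unit
  override def reply(response: Any): Unit = send(response)
  override def sendFailure(e: Throwable): Unit = send(RpcFailure(e))
}

// used when the caller is remote: the reply travels back over the network
private[netty] class RemoteNettyRpcCallContext(
    nettyEnv: NettyRpcEnv,
    callback: RpcResponseCallback,
    senderAddress: RpcAddress) extends NettyRpcCallContext(senderAddress) {

  override protected def send(message: Any): Unit = {
    // serialize and invoke the callback created in processRpcRequest
    // -> onSuccess -> respond(new RpcResponse(...))
    callback.onSuccess(nettyEnv.serialize(message))
  }
}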

So by this point we know: the response is ultimately produced by triggering the callback defined in TransportRequestHandler.

[common/network-common] org.apache.spark.network.server.TransportRequestHandler#processRpcRequest

   rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          // the endpoint has finished processing; write the response back
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
      ...
      });

[common/network-common] org.apache.spark.network.server.TransportRequestHandler#respond

/**
   * Responds to a single message with some Encodable object. If a failure occurs while sending,
   * it will be logged and the channel closed.
   */
  private ChannelFuture respond(Encodable result) {
    SocketAddress remoteAddress = channel.remoteAddress();
    // write the result back to the client over the Netty channel
    return channel.writeAndFlush(result).addListener(future -> {
      if (future.isSuccess()) {
        logger.trace("Sent result {} to client {}", result, remoteAddress);
      } else {
        logger.error(String.format("Error sending result %s to %s; closing connection",
          result, remoteAddress), future.cause());
        channel.close();
      }
    });
   }

Note: based on Apache Spark 3.0.

Author: pokerwu
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.

