
0. Creating and Starting the Server
The previous part covered how the executor obtains the SparkAppConfig through the rpcEnv: how the client is created and connected, how messages are sent through the rpcEnv, and how the return value finally reaches the caller.
Next, let's look at how the server, after receiving a request, delivers the message to the correct RpcEndpoint and how the response data is written back.
The previous part also introduced TransportContext, which encapsulates the handler setup for the Netty server and client. On the client side, responses are handled by TransportResponseHandler; on the server side, requests are handled by TransportRequestHandler together with NettyRpcHandler.
As part of SparkContext initialization, NettyRpcEnv starts a TransportServer.
The server is the entry point for RPC requests: after receiving a request it hands the message to the Dispatcher, which routes it into the message queue of the correct RpcEndpoint.
NettyRpcEnv Initialization
[spark-core] org.apache.spark.rpc.netty.NettyRpcEnvFactory#create
def create(config: RpcEnvConfig): RpcEnv = {
  ...
  // create the NettyRpcEnv
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager, config.numUsableCores)
  // server mode: start a TransportServer on the configured port
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    ...
    Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
  }
  nettyEnv
}
[spark-core] org.apache.spark.rpc.netty.NettyRpcEnv#startServer
def startServer(bindAddress: String, port: Int): Unit = {
  ...
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
transportContext is a field of NettyRpcEnv and is already created when NettyRpcEnv is instantiated. Immediately after the server starts, an RpcEndpoint is registered; we will come back to registration after tracing the startup process.
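As a quick preview of that endpoint: RpcEndpointVerifier is itself a small RpcEndpoint that answers "does an endpoint with this name exist on this server?" before a remote caller builds a ref to it. A simplified sketch of what it looks like (reconstructed from memory of the Spark 3.0 source, so treat the details as approximate):
private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // a client asks whether an endpoint with this name is registered and gets a Boolean back
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"
  /** A message used to ask the remote [[RpcEndpointVerifier]] if an RpcEndpoint exists. */
  case class CheckExistence(name: String)
}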
[common/network-common] org.apache.spark.network.TransportContext#createServer
/** Create a server which will attempt to bind to a specific host and port. */
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
Let's step into the creation of TransportServer.
TransportServer
[common/network-common] org.apache.spark.network.server.TransportServer
public class TransportServer implements Closeable {

  // the only constructor
  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    ...
    init(hostToBind, portToBind);
    ...
  }

  private void init(String hostToBind, int portToBind) {
    ...
    // standard Netty server bootstrap
    EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
      conf.getModuleName() + "-server");
    bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, pooledAllocator)
      .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
    ...
    // the handler setup deserves a longer discussion of its own; for now we focus on getting the server up
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      ...
    });
    InetSocketAddress address = hostToBind == null ?
      new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    ...
  }
}
Right after NettyRpcEnv starts the server, it registers an RpcEndpoint. This introduces two new concepts: Dispatcher and RpcEndpoint.
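Before diving into Dispatcher, it is worth recalling what an RpcEndpoint looks like from the user's perspective. Below is a minimal, purely hypothetical endpoint (the name EchoEndpoint is made up for this illustration; RpcEndpoint is Spark-internal, so such code would live inside the org.apache.spark namespace). It implements receive for one-way messages and receiveAndReply for ask-style messages, and is registered under a name:
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint, for illustration only.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {

  // One-way messages (RpcEndpointRef#send) are handled here; no reply is expected.
  override def receive: PartialFunction[Any, Unit] = {
    case msg => println(s"received fire-and-forget message: $msg")
  }

  // Ask-style messages (RpcEndpointRef#ask / askSync) arrive with a context to reply through.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg => context.reply(s"echo: $msg")
  }
}

// Registration binds the endpoint to a name and returns the ref callers use to address it:
// val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))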
Dispatcher
[spark-core] org.apache.spark.rpc.netty.Dispatcher
// A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  private val endpoints: ConcurrentMap[String, MessageLoop]            // all registered MessageLoops, keyed by endpoint name
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] // mapping from endpoint to its endpointRef
  private lazy val sharedLoop: SharedMessageLoop                       // a MessageLoop shared by multiple endpoints
  ...

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    // this address uniquely identifies an RpcEndpoint; an RpcEndpointRef also uses it to locate the RpcEndpoint
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      ...
      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox to be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is called.
      endpointRefs.put(endpoint, endpointRef)
      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            // shared MessageLoop: endpoints share one thread pool, and the loop controls what may run concurrently
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {...}
    }
    endpointRef
  }
}
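Ordinary endpoints go through exactly the same path: NettyRpcEnv#setupEndpoint, the public registration API, simply delegates to the Dispatcher, roughly:
// NettyRpcEnv (simplified): registering an endpoint is just a Dispatcher call.
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  dispatcher.registerRpcEndpoint(name, endpoint)
}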
Next, look at how TransportServer sets up its channel handler.
[common/network-common] org.apache.spark.network.server.TransportServer#init
private void init(String hostToBind, int portToBind) {
  ...
  // this is very similar to how the client side configures its handlers
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      // both server and client delegate the handler wiring to TransportContext
      context.initializePipeline(ch, rpcHandler);
    }
  });
  ...
}
Server Netty Handler
Looking back at the TransportServer startup process, a channel handler was registered to handle socket connections: TransportContext creates a TransportChannelHandler and registers it as ServerBootstrap's childHandler. In other words, TransportChannelHandler is responsible for handling external requests.
[common/network-common] org.apache.spark.network.TransportContext#createChannelHandler
/**
 * Creates the server- and client-side handler which is used to handle both RequestMessages and
 * ResponseMessages. The channel is expected to have been successfully created, though certain
 * properties (such as the remoteAddress()) may not be available yet.
 */
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  // the client side handles responses
  TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  TransportClient client = new TransportClient(channel, responseHandler);
  boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
  ChunkFetchRequestHandler chunkFetchRequestHandler = null;
  if (!separateChunkFetchRequest) {
    chunkFetchRequestHandler = new ChunkFetchRequestHandler(
      client, rpcHandler.getStreamManager(),
      conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
  }
  // the server side handles requests
  TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
  return new TransportChannelHandler(client, responseHandler, requestHandler,
    conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
}
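TransportChannelHandler itself is thin: for every inbound message it only decides whether it is a request or a response and forwards it to the matching handler. A rough Scala rendering of that dispatch (the real class is Java; this is a sketch of the logic, not the exact code):
// Sketch of TransportChannelHandler's dispatch: requests go to the server-side handler,
// responses go to the client-side handler living on the same channel.
def channelRead0(ctx: ChannelHandlerContext, message: Message): Unit = message match {
  case req: RequestMessage   => requestHandler.handle(req)
  case resp: ResponseMessage => responseHandler.handle(resp)
  case other                 => ctx.fireChannelRead(other)
}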
1. Receiving a Request
When a request arrives at the server, it is delegated from TransportChannelHandler to TransportRequestHandler. Many kinds of requests are handled there; we only follow RpcRequest, the other kinds will come up in other modules.
TransportRequestHandler
[common/network-common] org.apache.spark.network.server.TransportRequestHandler
/**
 * A handler that processes requests from clients and writes chunk data back. Each handler is
 * attached to a single Netty channel, and keeps track of which streams have been fetched via this
 * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
 *
 * The messages should have been processed by the pipeline setup by {@link TransportServer}.
 */
public class TransportRequestHandler extends MessageHandler<RequestMessage> {

  /** The Netty channel that this handler is associated with. */
  private final Channel channel;

  /** Client on the same channel allowing us to talk back to the requester. */
  private final TransportClient reverseClient;

  /** Handles all RPC messages. */
  private final RpcHandler rpcHandler;
  ...

  @Override
  public void handle(RequestMessage request) throws Exception {
    if (request instanceof ChunkFetchRequest) {
      chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
    // here we are interested in remote RPC requests
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
      processStreamUpload((UploadStream) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

  // handle an RpcRequest
  private void processRpcRequest(final RpcRequest req) {
    try {
      // delegate to the rpcHandler
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        // callback that writes out the final response
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
        ...
      });
    } catch (Exception e) {...}
  }
}
The rpcHandler is created by NettyRpcEnv together with the TransportContext and passed in as a constructor argument; TransportContext in turn passes it to TransportRequestHandler's constructor. The rpcHandler here is NettyRpcHandler, which lives in the same file as NettyRpcEnv.
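Concretely, NettyRpcEnv builds the handler and the context together when it is constructed, roughly like this (simplified from the Spark 3.0 source):
// NettyRpcEnv (simplified): the NettyRpcHandler holds a reference to the Dispatcher,
// so every RpcRequest the server receives ends up in dispatcher.postRemoteMessage.
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
private val transportContext = new TransportContext(transportConf,
  new NettyRpcHandler(dispatcher, this, streamManager))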
[spark-core] org.apache.spark.rpc.netty.NettyRpcHandler#receive
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  // hand it to the Dispatcher, which routes it to the correct endpoint
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
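internalReceive turns the raw bytes back into a RequestMessage and, if the sender did not advertise its own RPC address, falls back to the socket address of the connecting client. A simplified sketch (reconstructed from the Spark 3.0 source, details approximate):
// Simplified sketch of NettyRpcHandler#internalReceive.
private def internalReceive(client: TransportClient, message: ByteBuffer): RequestMessage = {
  val addr = client.getChannel.remoteAddress().asInstanceOf[InetSocketAddress]
  val clientAddr = RpcAddress(addr.getHostString, addr.getPort)
  val requestMessage = RequestMessage(nettyEnv, client, message)
  if (requestMessage.senderAddress == null) {
    // the sender runs in client mode (no listening server), so identify it by its socket address
    new RequestMessage(clientAddr, requestMessage.receiver, requestMessage.content)
  } else {
    requestMessage
  }
}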
Identifying the RpcEndpoint
How does the server find the correct RpcEndpoint? From the previous part we know that sending a request to an endpoint requires its RpcEndpointRef, and every RpcEndpoint is bound to an endpoint name. The request body therefore has to carry the endpoint's identification so that the server can locate the corresponding RpcEndpoint.
[spark-core] org.apache.spark.rpc.netty.RequestMessage#serialize
/** Manually serialize [[RequestMessage]] to minimize the size. */
def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = new DataOutputStream(bos)
  try {
    // write the sender's address
    writeRpcAddress(out, senderAddress)
    // write the server-side (receiver) address
    writeRpcAddress(out, receiver.address)
    // write the endpoint name
    out.writeUTF(receiver.name)
    val s = nettyEnv.serializeStream(out)
    try {
      // the payload is a case class; in our example it is RetrieveSparkAppConfig
      s.writeObject(content)
    } finally {
      s.close()
    }
  } finally {
    out.close()
  }
  bos.toByteBuffer
}
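On the server side the same layout is read back in the RequestMessage companion object: the two addresses and the endpoint name are parsed first, a NettyRpcEndpointRef is rebuilt for the receiver, and the remaining bytes are deserialized as the payload. A simplified sketch (details approximate):
// Simplified sketch of RequestMessage.apply, the deserializing counterpart of serialize.
def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
  val in = new DataInputStream(new ByteBufferInputStream(bytes))
  try {
    val senderAddress = readRpcAddress(in)                                     // who sent it
    val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF()) // server address + endpoint name
    val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
    ref.client = client
    // what is left in the buffer is the payload, e.g. RetrieveSparkAppConfig
    new RequestMessage(senderAddress, ref, nettyEnv.deserialize(client, bytes))
  } finally {
    in.close()
  }
}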
2. Message Delivery: Dispatcher
Back in the Dispatcher, let's trace how a message is delivered to the endpoint.
[spark-core] org.apache.spark.rpc.netty.Dispatcher
/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  // endpointName -> MessageLoop
  private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  ...

  /** Posts a message sent by a remote endpoint. */
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    val rpcCallContext =
      new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
  }

  /**
   * Posts a message to a specific endpoint.
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    ...
    // look up the MessageLoop bound to the endpoint, using the endpointName carried in the request body
    val loop = endpoints.get(endpointName)
    ...
    // deliver the message
    loop.post(endpointName, message)
    ...
  }
}
At this point the message has been delivered into the message queue of the correct endpoint. Next we follow how the message is processed and how the response is produced.
3. The Endpoint Consumes the Message
Recall the server startup described above: DriverEndpoint is registered with the Dispatcher, and during registration the Dispatcher also creates a MessageLoop for it. The design of MessageLoop is analyzed in more detail later; for now we keep following the request.
DriverEndpoint uses a DedicatedMessageLoop, a subclass of MessageLoop that serves a single endpoint exclusively. A MessageLoop can also be shared: multiple endpoints share one loop, but each endpoint still has its own Inbox. The Inbox can be thought of as a mailbox; the Dispatcher's routing boils down to finding the right Inbox and appending the message to its queue.
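For contrast with the dedicated loop shown next, here is a simplified sketch of the shared variant: many endpoints, one thread pool, one Inbox per endpoint (approximate, based on the Spark 3.0 source):
// Simplified sketch of SharedMessageLoop.
private class SharedMessageLoop(conf: SparkConf, dispatcher: Dispatcher, numUsableCores: Int)
  extends MessageLoop(dispatcher) {

  // one Inbox per registered endpoint, all served by the same thread pool
  private val endpoints = new ConcurrentHashMap[String, Inbox]()

  // (the shared thread pool, sized from numUsableCores, is omitted here)
  ...

  def register(name: String, endpoint: RpcEndpoint): Unit = {
    val inbox = new Inbox(name, endpoint)
    endpoints.put(name, inbox)
    // mark active so the shared workers deliver the OnStart message
    setActive(inbox)
  }

  override def post(endpointName: String, message: InboxMessage): Unit = {
    val inbox = endpoints.get(endpointName)
    inbox.post(message)
    setActive(inbox)
  }
}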
[spark-core] org.apache.spark.rpc.netty.DedicatedMessageLoop
/**
 * A message loop that is dedicated to a single RPC endpoint.
 */
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher) extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)  // the Inbox bound to this endpoint

  // worker threads for the inbox
  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }

  // the worker threads start running as soon as the loop is created
  (1 to endpoint.threadCount()).foreach { _ =>
    threadpool.submit(receiveLoopRunnable)
  }

  // also mark the inbox as active at construction time;
  // worker threads take active inboxes off the queue and process their messages
  // Mark active to handle the OnStart message.
  setActive(inbox)

  // deliver a message into the inbox's queue
  override def post(endpointName: String, message: InboxMessage): Unit = {
    require(endpointName == name)
    inbox.post(message)
    setActive(inbox)
  }
}
At this point the message sits in the Inbox and the Inbox is marked active, so the Inbox's worker threads will process it. That work is done by receiveLoopRunnable, a field of the MessageLoop class: a Runnable whose run method executes MessageLoop's receiveLoop.
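The parts of MessageLoop that both subclasses rely on are small: a queue of inboxes that currently have pending messages, and the Runnable that every worker thread runs. A simplified sketch (approximate):
// Simplified sketch of the MessageLoop base class.
private sealed abstract class MessageLoop(dispatcher: Dispatcher) extends Logging {

  // inboxes that currently have messages waiting to be processed
  private val active = new LinkedBlockingQueue[Inbox]()

  // the thread pool is provided by the subclasses
  protected val threadpool: ExecutorService

  // the body that each worker thread runs; receiveLoop() is shown below
  protected val receiveLoopRunnable = new Runnable() {
    override def run(): Unit = receiveLoop()
  }

  def post(endpointName: String, message: InboxMessage): Unit

  protected final def setActive(inbox: Inbox): Unit = active.offer(inbox)
  ...
}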
[spark-core] org.apache.spark.rpc.netty.MessageLoop#receiveLoop
private def receiveLoop(): Unit = {
  try {
    while (true) {
      try {
        // `active` is a queue of the inboxes that currently have messages
        val inbox = active.take()
        // PoisonPill is the shutdown signal
        if (inbox == MessageLoop.PoisonPill) {
          // Put PoisonPill back so that other threads can see it.
          setActive(MessageLoop.PoisonPill)
          return
        }
        // process the messages
        inbox.process(dispatcher)
      } catch {
        case NonFatal(e) => logError(e.getMessage, e)
      }
    }
  } catch {...}
}
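Inside inbox.process, the message type decides which endpoint method gets called. A simplified sketch of that dispatch (error handling and thread bookkeeping omitted, details approximate):
// Simplified sketch of the core of Inbox#process.
message match {
  case RpcMessage(sender, content, context) =>
    // ask-style request: the endpoint answers through the RpcCallContext
    endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content,
      _ => throw new SparkException(s"Unsupported message $content from $sender"))
  case OneWayMessage(sender, content) =>
    // fire-and-forget: no reply expected
    endpoint.receive.applyOrElse[Any, Unit](content,
      _ => throw new SparkException(s"Unsupported message $content from $sender"))
  case OnStart => endpoint.onStart()   // delivered once, when the inbox is first activated
  case OnStop  => endpoint.onStop()
  // other lifecycle messages (e.g. remote connection events) are omitted here
}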
The Inbox thus selects which endpoint method to invoke based on the message type. For our RetrieveSparkAppConfig request, the call lands in DriverEndpoint's receiveAndReply method.
[spark-core] org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  ...
  // fetch the SparkAppConfig
  case RetrieveSparkAppConfig(resourceProfileId) =>
    // note this will be updated in later prs to get the ResourceProfile from a
    // ResourceProfileManager that matches the resource profile id
    // for now just use default profile
    val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
    val reply = SparkAppConfig(
      sparkProperties,
      SparkEnv.get.securityManager.getIOEncryptionKey(),
      Option(delegationTokens.get()),
      rp)
    context.reply(reply)
}
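This is the server half of the exchange from the previous part; on the executor side, CoarseGrainedExecutorBackend drives the same round trip with an ask-style call, roughly:
// Executor side (simplified): ask the driver endpoint and block for the SparkAppConfig reply.
val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(resourceProfileId))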
The context variable here is a RemoteNettyRpcCallContext: a wrapper around the RpcResponseCallback that TransportRequestHandler created when it processed the request.
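When the endpoint calls context.reply, the remote variant serializes the reply and hands it straight to that callback. A simplified sketch (approximate):
// Simplified sketch of RemoteNettyRpcCallContext: reply() ends up here, serializing the
// response and passing it to the RpcResponseCallback created in processRpcRequest.
private[netty] class RemoteNettyRpcCallContext(
    nettyEnv: NettyRpcEnv,
    callback: RpcResponseCallback,
    senderAddress: RpcAddress)
  extends NettyRpcCallContext(senderAddress) {

  override protected def send(message: Any): Unit = {
    val reply = nettyEnv.serialize(message)
    callback.onSuccess(reply)
  }
}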
4. Returning the Response
We have now seen several layers of objects wrapping other objects that seem to express the same idea. The point of this wrapping is to hide from the upper layers whether a call is local or remote: usage stays transparent to the caller while the implementation handles the different cases underneath.
With that in place, we know the final response is produced by triggering the callback defined in TransportRequestHandler.
[common/network-common] org.apache.spark.network.server.TransportRequestHandler#processRpcRequest
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    // the endpoint has finished processing; write the response back
    respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
  }
  ...
});
[common/network-common] org.apache.spark.network.server.TransportRequestHandler#respond
/**
 * Responds to a single message with some Encodable object. If a failure occurs while sending,
 * it will be logged and the channel closed.
 */
private ChannelFuture respond(Encodable result) {
  SocketAddress remoteAddress = channel.remoteAddress();
  // write the result back to the client through the Netty channel
  return channel.writeAndFlush(result).addListener(future -> {
    if (future.isSuccess()) {
      logger.trace("Sent result {} to client {}", result, remoteAddress);
    } else {
      logger.error(String.format("Error sending result %s to %s; closing connection",
        result, remoteAddress), future.cause());
      channel.close();
    }
  });
}
Note: based on Apache Spark 3.0
Author: pokerwu
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.