目錄
前言
又是很久沒有連載,萬分抱歉。今天(注:其實(shí)也包含昨天)需要盯著雙11各個實(shí)時任務(wù)的運(yùn)行,目前仍然無事發(fā)生,抽空來寫幾筆吧。
在前面的文章中,我們了解了塊管理器BlockManager管理下的讀寫流程。并且已經(jīng)知道,BlockManager讀取塊時,如果塊在本地找不到,就會去集群內(nèi)的遠(yuǎn)端節(jié)點(diǎn)去獲取。同理,如果BlockManager寫入塊時需要復(fù)制,那么除了在本地寫入之外,也要再寫一份到遠(yuǎn)端節(jié)點(diǎn)。BlockManager與遠(yuǎn)端節(jié)點(diǎn)的交互就得依賴塊傳輸服務(wù)BlockTransferService。但是BlockTransferService需要依賴之前偷懶沒有講過的RPC底層組件,所以現(xiàn)在得把這個坑填上,計劃用3篇文章來填。
由于Spark 2.x的RPC環(huán)境是完全基于Netty搞的,所以如果看官對Netty有基本的了解的話,讀起來會順暢一點(diǎn)。
RPC底層概覽
在系列的文章#8中,我們講到了RPC環(huán)境——即NettyRpcEnv的構(gòu)建和屬性成員。來復(fù)習(xí)一下。
代碼#B1.1 - o.a.s.rpc.netty.NettyRpcEnv中的部分屬性成員
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
"rpc",
conf.getInt("spark.rpc.io.threads", 0))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
@volatile private var server: TransportServer = _
- TransportConf:傳輸配置,作用在RPC環(huán)境中類似于SparkConf,負(fù)責(zé)管理與RPC相關(guān)的各種配置。
- Dispatcher:調(diào)度器,或者叫分發(fā)器,用于將消息路由到正確的RPC端點(diǎn),它的邏輯在文章#9中已經(jīng)講過,就不再提了。
- StreamManager:流式管理器,用于處理RPC環(huán)境中的文件,如自定義的配置文件或者JAR包。
- TransportContext:傳輸上下文,作用在RPC環(huán)境中類似于SparkContext,負(fù)責(zé)管理RPC的服務(wù)端(TransportServer)與客戶端(TransportClient),和它們之間的Netty Channel和Pipeline。
- TransportClientFactory:創(chuàng)建RPC客戶端TransportClient的工廠。
- TransportServer:RPC環(huán)境中的服務(wù)端,負(fù)責(zé)提供基礎(chǔ)且高效的流式服務(wù)。
這些東西就是Spark RPC底層主要的組成部分,之前并沒有了解過,下面我們從TransportConf、TransportContext這兩樣開始探究。
傳輸配置TransportConf
在Spark源碼中并不會顯式地創(chuàng)建TransportConf實(shí)例,而是通過SparkTransportConf對象代為實(shí)現(xiàn)。該對象的源碼很短,如下。
代碼#B1.2 - o.a.s.network.netty.SparkTransportConf對象
object SparkTransportConf {
private val MAX_DEFAULT_NETTY_THREADS = 8
def fromSparkConf(_conf: SparkConf, module: String, numUsableCores: Int = 0): TransportConf = {
val conf = _conf.clone
val numThreads = defaultNumThreads(numUsableCores)
conf.setIfMissing(s"spark.$module.io.serverThreads", numThreads.toString)
conf.setIfMissing(s"spark.$module.io.clientThreads", numThreads.toString)
new TransportConf(module, new ConfigProvider {
override def get(name: String): String = conf.get(name)
override def get(name: String, defaultValue: String): String = conf.get(name, defaultValue)
override def getAll(): java.lang.Iterable[java.util.Map.Entry[String, String]] = {
conf.getAll.toMap.asJava.entrySet()
}
})
}
private def defaultNumThreads(numUsableCores: Int): Int = {
val availableCores =
if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
math.min(availableCores, MAX_DEFAULT_NETTY_THREADS)
}
}
可見,SparkTransportConf.fromSparkConf()方法負(fù)責(zé)從SparkConf持有的參數(shù)創(chuàng)建TransportConf。TransportConf接受的構(gòu)造參數(shù)有二:
- module:該RPC配置所屬的模塊名,比如代碼#B.1中創(chuàng)建NettyRpcEnv時,模塊名就是"rpc"。
- ConfigProvider:一個簡單的負(fù)責(zé)數(shù)據(jù)類型轉(zhuǎn)化的類,能夠讀取SparkConf中與RPC相關(guān)的配置,并傳遞給TransportConf。在如上的代碼中,是直接初始化了一個匿名類。
這樣,我們就可以通過SparkConf持有、并通過ConfigProvider獲取Spark RPC的所有配置參數(shù)。換句話說,TransportConf就是SparkConf的一個子集,SparkConf仍然是配置的唯一入口,方便統(tǒng)一管理。
上面代碼中需要初始化的參數(shù)如下:
- spark.rpc.io.numConnectionsPerPeer:在每對RPC實(shí)體間建立的連接數(shù)量,默認(rèn)是1;
- spark.rpc.io.threads:RPC服務(wù)端和客戶端的線程數(shù),默認(rèn)為0,即不設(shè)置。在此值不設(shè)置的情況下,最終會采用可用的核心數(shù)與MAX_DEFAULT_NETTY_THREADS常量(為8)之間的較小值。
TransportConf類的實(shí)現(xiàn)就非常簡單了,主要由很多get方法組成。以下是部分代碼,不再贅述。
代碼#B1.3 - o.a.s.network.netty.TransportConf類的部分代碼
public TransportConf(String module, ConfigProvider conf) {
this.module = module;
this.conf = conf;
SPARK_NETWORK_IO_MODE_KEY = getConfKey("io.mode");
SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY = getConfKey("io.preferDirectBufs");
SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY = getConfKey("io.connectionTimeout");
// ...略...
}
public String ioMode() { return conf.get(SPARK_NETWORK_IO_MODE_KEY, "NIO").toUpperCase(Locale.ROOT); }
public boolean preferDirectBufs() { return conf.getBoolean(SPARK_NETWORK_IO_PREFERDIRECTBUFS_KEY, true); }
public int connectionTimeoutMs() {
long defaultNetworkTimeoutS = JavaUtils.timeStringAsSec(
conf.get("spark.network.timeout", "120s"));
long defaultTimeoutMs = JavaUtils.timeStringAsSec(
conf.get(SPARK_NETWORK_IO_CONNECTIONTIMEOUT_KEY, defaultNetworkTimeoutS + "s")) * 1000;
return (int) defaultTimeoutMs;
}
public int numConnectionsPerPeer() { return conf.getInt(SPARK_NETWORK_IO_NUMCONNECTIONSPERPEER_KEY, 1); }
public int serverThreads() { return conf.getInt(SPARK_NETWORK_IO_SERVERTHREADS_KEY, 0); }
public int clientThreads() { return conf.getInt(SPARK_NETWORK_IO_CLIENTTHREADS_KEY, 0); }
public int receiveBuf() { return conf.getInt(SPARK_NETWORK_IO_RECEIVEBUFFER_KEY, -1); }
public int sendBuf() { return conf.getInt(SPARK_NETWORK_IO_SENDBUFFER_KEY, -1); }
傳輸上下文TransportContext
既然SparkContext是Spark Core功能的主要入口,那么TransportContext自然就是Spark RPC環(huán)境的入口了。它比前面講過的NettyRpcEnv更加底層,如果沒有它,RPC環(huán)境也就無從談起了。
成員屬性與構(gòu)造方法
代碼#B1.4 - o.a.s.network.TransportContext類的成員屬性與構(gòu)造方法
private final TransportConf conf;
private final RpcHandler rpcHandler;
private final boolean closeIdleConnections;
private static final MessageEncoder ENCODER = MessageEncoder.INSTANCE;
private static final MessageDecoder DECODER = MessageDecoder.INSTANCE;
public TransportContext(TransportConf conf, RpcHandler rpcHandler) {
this(conf, rpcHandler, false);
}
public TransportContext(
TransportConf conf,
RpcHandler rpcHandler,
boolean closeIdleConnections) {
this.conf = conf;
this.rpcHandler = rpcHandler;
this.closeIdleConnections = closeIdleConnections;
}
以下3個成員屬性同時也是TransportContext構(gòu)造方法的參數(shù):
- conf:即RPC傳輸配置TransportConf;
- rpcHandler:RPC消息處理器RpcHandler,是一個抽象類。顧名思義,它其中包含了所有RPC消息的具體處理邏輯;
- closeIdleConnections:表示是否關(guān)閉空閑連接的標(biāo)志。
另外還有兩個常量ENCODER和DECODER。前者是消息編碼器MessageEncoder的實(shí)例,由Netty提供的MessageToMessageEncoder派生而來,RPC服務(wù)端使用它來編碼向客戶端發(fā)送的消息;后者是消息解碼器MessageDecoder的實(shí)例,由Netty提供的MessageToMessageDecoder派生而來,RPC客戶端使用它來解碼從服務(wù)端收到的消息。
下面具體看看TransportContext提供的方法,通過這些方法,我們能見識到更多其他由TransportContext創(chuàng)建的RPC組件。
創(chuàng)建傳輸客戶端工廠TransportClientFactory
代碼#B1.5 - o.a.s.network.TransportContext.createClientFactory()方法
public TransportClientFactory createClientFactory(List<TransportClientBootstrap> bootstraps) {
return new TransportClientFactory(this, bootstraps);
}
public TransportClientFactory createClientFactory() {
return createClientFactory(new ArrayList<>());
}
createClientFactory()方法負(fù)責(zé)創(chuàng)建傳輸客戶端工廠TransportClientFactory,由TransportClientFactory進(jìn)而可以創(chuàng)建更多的傳輸客戶端TransportClient。其參數(shù)中的TransportClientBootstrap表示傳輸客戶端的初始化邏輯(通常是一些一次性的工作,比如讓它們攜帶SASL認(rèn)證的token等)。
createClientFactory()方法是直接調(diào)用了TransportClientFactory的構(gòu)造方法,關(guān)于它的邏輯,下篇文章詳細(xì)講。
創(chuàng)建傳輸服務(wù)端TransportServer
代碼#B1.6 - o.a.s.network.TransportContext.createServer()方法
public TransportServer createServer(int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, null, port, rpcHandler, bootstraps);
}
public TransportServer createServer(
String host, int port, List<TransportServerBootstrap> bootstraps) {
return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
public TransportServer createServer(List<TransportServerBootstrap> bootstraps) {
return createServer(0, bootstraps);
}
public TransportServer createServer() {
return createServer(0, new ArrayList<>());
}
createServer()方法有4個重載,可以指定RPC服務(wù)端要綁定到的主機(jī)地址和端口號,以及初始化邏輯TransportServerBootstrap。TransportServer的構(gòu)造方法和相關(guān)細(xì)節(jié)會與TransportClientFactory一起講。
初始化Netty ChannelPipeline與ChannelHandler
先來看initializePipeline()方法。
代碼#B1.7 - o.a.s.network.TransportContext.initializePipeline()方法
public TransportChannelHandler initializePipeline(SocketChannel channel) {
return initializePipeline(channel, rpcHandler);
}
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
}
這段代碼看起來可能有些sophisticated,我們先回憶一下Netty的基礎(chǔ)知識。
在Netty架構(gòu)中,Channel(通道)是消息通信的載體,ChannelHandler(通道處理器)則負(fù)責(zé)Channel中消息通信具體邏輯的實(shí)現(xiàn)。而ChannelPipeline(通道管線)將多個ChannelHandler按順序組織起來,ChannelEvent(消息體)就按照ChannelPipeline規(guī)定的順序流轉(zhuǎn)。下面用一幅簡圖來表示它們之間的關(guān)系。

在ChannelPipeline內(nèi)部用雙鏈表來維護(hù)ChannelHandler以及它對應(yīng)的上下文實(shí)例ChannelHandlerContext,另外還有特殊的頭節(jié)點(diǎn)HeadContext和尾節(jié)點(diǎn)TailContext。Netty的這種設(shè)計可以讓用戶專注于實(shí)現(xiàn)ChannelHandler的邏輯細(xì)節(jié),這大概也是Spark開發(fā)者們所看重的優(yōu)點(diǎn)之一吧。
從上文代碼中的initializePipeline()方法可以看出,通過鏈?zhǔn)秸{(diào)用ChannelPipeline.addLast()方法,按順序添加了以下ChannelHandler:
- 消息編碼器MessageEncoder;
- 幀解碼器TransportFrameDecoder;
- 消息解碼器MessageDecoder;
- 空閑狀態(tài)處理器IdleStateHandler;
- 真正的RPC處理邏輯TransportChannelHandler。它是Spark RPC環(huán)境專用的ChannelHandler。
它們的類圖如下所示。

其中,實(shí)現(xiàn)了ChannelInboundHandler接口的處理器用于處理請求消息,而實(shí)現(xiàn)了ChannelOutboundHandler接口的處理器用于處理響應(yīng)消息,并且它們的順序是相反的。因此,處理請求的流程是:TransportFrameDecoder→MessageDecoder→IdleStateHandler→TransportChannelHandler,處理響應(yīng)的流程是:IdleStateHandler→MessageEncoder。
那么TransportChannelHandler是哪里來的呢?它是由createChannelHandler()方法創(chuàng)建的。
代碼#B1.8 - o.a.s.network.TransportContext.createChannelHandler()方法
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
由代碼可見,一個TransportChannelHandler實(shí)際上由三個組件組成:
- TransportResponseHandler,是客戶端處理服務(wù)端發(fā)回的響應(yīng)消息的處理器;
- TransportClient,單獨(dú)的RPC客戶端,不受TransportClientFactory的管理;
- TransportRequestHandler,是服務(wù)端處理客戶端發(fā)來的請求消息的處理器。
注意,雖然TransportResponseHandler和TransportRequestHandler的名稱里都有“Handler”,但它們不是Netty層面上的東西,僅僅是Spark內(nèi)置的MessageHandler抽象類的實(shí)現(xiàn)而已,它規(guī)定了處理請求和響應(yīng)的一些基本規(guī)范,后文會講解到。

總結(jié)
本文講解了RPC環(huán)境中的傳輸配置TransportConf與傳輸上下文TransportContext的細(xì)節(jié),探究了由TransportContext初始化的傳輸客戶端工廠TransportClientFactory、傳輸服務(wù)端TransportServer,最后結(jié)合Netty的部分知識講解了ChannelPipeline與ChannelHandler的初始化邏輯。
民那晚安。