目錄
前言
在之前的文章中,我們由SparkContext的初始化提到了事件總線LiveListenerBus與執(zhí)行環(huán)境SparkEnv。在講解SparkEnv的過程中,RPC環(huán)境RpcEnv又是首先被初始化的重要組件。做個不怎么恰當(dāng)?shù)谋容^,SparkEnv之于SparkContext,正如RpcEnv之于SparkEnv。
由于RPC環(huán)境負(fù)責(zé)著Spark體系內(nèi)幾乎所有內(nèi)部及外部通信,內(nèi)容很多,所以一篇文章必然講不完。本文還是從基礎(chǔ)開始看起。
RPC端點及其引用
RpcEnv抽象類是Spark RPC環(huán)境的通用表示,它其中定義的setupEndpoint()方法用來向RPC環(huán)境注冊一個RPC端點(RpcEndpoint),并返回其引用(RpcEndpointRef)。如果客戶端要對一個RpcEndpoint發(fā)送消息,那么必須首先獲得其對應(yīng)的RpcEndpointRef。它們之間的關(guān)系可以用如下簡圖表示。

既然RpcEndpoint和RpcEndpointRef是RPC環(huán)境中的基礎(chǔ)組件,我們先來研究它們的源碼。
RpcEndpoint
RpcEndpoint是一個特征,其代碼如下。
代碼#8.1 - o.a.s.rpc.RpcEndpoint特征
private[spark] trait RpcEndpoint {
val rpcEnv: RpcEnv
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
def onError(cause: Throwable): Unit = {
throw cause
}
def onConnected(remoteAddress: RpcAddress): Unit = { }
def onDisconnected(remoteAddress: RpcAddress): Unit = { }
def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = { }
def onStart(): Unit = { }
def onStop(): Unit = { }
final def stop(): Unit = {
val _self = self
if (_self != null) {
rpcEnv.stop(_self)
}
}
}
其中定義了如下方法,這些相當(dāng)于是RPC端點在RPC環(huán)境中的“行為準(zhǔn)則”。
- self():取得當(dāng)前RpcEndpoint對應(yīng)的RpcEndpointRef。
- receive()/receiveAndReply():接收其他RpcEndpointRef傳來的消息并進(jìn)行處理,receiveAndReply()方法還會發(fā)送回復(fù)。
- onError():消息處理出現(xiàn)異常時調(diào)用的方法。
- onConnected()/onDisconnected():當(dāng)前RPC端點建立連接或斷開連接時調(diào)用的方法。
- onNetworkError():RPC端點的連接出現(xiàn)網(wǎng)絡(luò)錯誤時調(diào)用的方法。
- onStart()/onStop():RPC端點初始化與關(guān)閉時調(diào)用的方法。
- stop():停止當(dāng)前RpcEndpoint。
RpcEndpoint繼承體系
RpcEndpoint的主要繼承體系如下圖所示。

圖中可以看到不少之前出現(xiàn)過的RPC端點,如文章#2中的HeartbeatReceiver,文章#7中的MapOutputTrackerMasterEndpoint、BlockManagerMasterEndpoint等。在今后涉及到它們時,會專門進(jìn)行講解。
另外,圖中的ThreadSafeRpcEndpoint是直接繼承自RpcEndpoint的特征。顧名思義,它要求RPC端點對消息的處理必須是線程安全的,用文檔中的話說,線程安全RPC端點處理消息必須滿足happens-before原則。
RpcEndpointRef
RpcEndpointRef是一個抽象類,其代碼如下。
代碼#8.2 - o.a.s.rpc.RpcEndpointRef抽象類
private[spark] abstract class RpcEndpointRef(conf: SparkConf)
extends Serializable with Logging {
private[this] val maxRetries = RpcUtils.numRetries(conf)
private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
def address: RpcAddress
def name: String
def send(message: Any): Unit
def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
val future = ask[T](message, timeout)
timeout.awaitResult(future)
}
}
這個抽象類的開頭有三個屬性,都是通過RpcUtils工具類從Spark配置項中取出來的,如下。
- maxRetries:最大重連次數(shù),對應(yīng)配置項為spark.rpc.numRetries,默認(rèn)值3次。
- retryWaitMs:每次重連之前等待的時長,對應(yīng)配置項為spark.rpc.retry.wait,默認(rèn)值3秒。
- defaultAskTimeout:對RPC端點進(jìn)行ask()操作(下面會講到)的默認(rèn)超時時長,對應(yīng)配置項為spark.rpc.askTimeout與spark.network.timeout(前者優(yōu)先級高于后者),默認(rèn)值120秒。
值得注意的是,maxRetries與retryWaitMs兩個屬性在當(dāng)前的2.3.3版本中都沒有用到,而在之前的版本中還是有用到的,證明Spark官方取消了RPC重試機(jī)制,也就是統(tǒng)一為消息傳遞語義中的at most once語義了。當(dāng)然,我們也可以自己實現(xiàn)帶有重試機(jī)制的RPC端點引用。
address和name方法分別返回RPC端點引用對應(yīng)的地址和名稱,不必多講。下面幾個方法的含義如下。
- send()方法:異步發(fā)送一條單向的消息,并且“發(fā)送即忘記”(fire-and-forget),不需要回復(fù)。
- ask()方法:異步發(fā)送一條消息,并在規(guī)定的超時時間內(nèi)等待RPC端點的回復(fù)。RPC端點會調(diào)用receiveAndReply()方法來處理。
- askSync()方法:是ask()方法的同步實現(xiàn)。由于它是阻塞操作,有可能會消耗大量時間,因此必須慎用。
RpcEndpointRef只有一個子類,即NettyRpcEndpointRef。它對send()和ask()兩個方法的實現(xiàn)如下。
代碼#8.3 - o.a.s.rpc.netty.NettyRpcEndPointRef.send()與ask()方法
override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
nettyEnv.ask(new RequestMessage(nettyEnv.address, this, message), timeout)
}
override def send(message: Any): Unit = {
require(message != null, "Message is null")
nettyEnv.send(new RequestMessage(nettyEnv.address, this, message))
}
可見是依賴于NettyRpcEnv類的,下面來看一下它是如何創(chuàng)建出來的。
NettyRpcEnv概況
創(chuàng)建NettyRpcEnv
在文章#7的代碼#7.4~#7.5中,通過工廠類NettyRpcEnvFactory的create()方法創(chuàng)建出了NettyRpcEnv,它是目前Spark官方提供的RPC環(huán)境的唯一實現(xiàn)。該方法的代碼如下。
代碼#8.4 - o.a.s.rpc.netty.NettyRpcEnvFactory.create()方法
def create(config: RpcEnvConfig): RpcEnv = {
val sparkConf = config.conf
val javaSerializerInstance =
new JavaSerializer(sparkConf).newInstance().asInstanceOf[JavaSerializerInstance]
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
可見,這個方法先創(chuàng)建了JavaSerializer序列化器,用于RPC傳輸中的序列化。然后通過NettyRpcEnv的構(gòu)造方法創(chuàng)建NettyRpcEnv,這其中也會涉及到一些RPC基礎(chǔ)組件的初始化,后面會講解到。最后定義偏函數(shù)startNettyRpcEnv,并調(diào)用通用工具類Utils中的startServiceOnPort()方法來啟動NettyRpcEnv。
NettyRpcEnv中的屬性成員
我們暫時先不看NettyRpcEnv類的細(xì)節(jié),而是先來看看它內(nèi)部包含了哪些組件。
代碼#8.5 - o.a.s.rpc.netty.NettyRpcEnv中的屬性成員
private[netty] val transportConf = SparkTransportConf.fromSparkConf(
conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
"rpc",
conf.getInt("spark.rpc.io.threads", 0))
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
private val streamManager = new NettyStreamManager(this)
private val transportContext = new TransportContext(transportConf,
new NettyRpcHandler(dispatcher, this, streamManager))
private val clientFactory = transportContext.createClientFactory(createClientBootstraps())
@volatile private var fileDownloadFactory: TransportClientFactory = _
val timeoutScheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("netty-rpc-env-timeout")
private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
"netty-rpc-connection",
conf.getInt("spark.rpc.connect.threads", 64))
@volatile private var server: TransportServer = _
private val stopped = new AtomicBoolean(false)
private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
- TransportConf:傳輸配置,作用在RPC環(huán)境中類似于SparkConf,負(fù)責(zé)管理與RPC相關(guān)的各種配置。
- Dispatcher:調(diào)度器,或者叫分發(fā)器,用于將消息路由到正確的RPC端點。
- NettyStreamManager:流式管理器,用于處理RPC環(huán)境中的文件,如自定義的配置文件或者JAR包。
- TransportContext:傳輸上下文,作用在RPC環(huán)境中類似于SparkContext,負(fù)責(zé)管理RPC的服務(wù)端(TransportServer)與客戶端(TransportClient),與它們之間的Netty傳輸管道。
- TransportClientFactory:創(chuàng)建RPC客戶端TransportClient的工廠。
- TransportServer:RPC環(huán)境中的服務(wù)端,負(fù)責(zé)提供基礎(chǔ)且高效的流式服務(wù)。
TransportConf和TransportContext提供底層的基于Netty的RPC機(jī)制,TransportClient和TransportServer則是RPC端點的最低級別抽象。
總結(jié)
本文講解了RPC環(huán)境的基本組成部分RpcEndpoint、RpcEndpointRef的細(xì)節(jié)實現(xiàn),并初步了解了NettyRpcEnv的創(chuàng)建過程,以及它內(nèi)部包含的主要組件。雖然TransportConf和TransportContext更為基礎(chǔ),但為了避免嵌套太深出不來,下一篇文章暫時不準(zhǔn)備講它們,而主要來研究NettyRpcEnv內(nèi)的調(diào)度器Dispatcher,它是整個RPC環(huán)境高效運轉(zhuǎn)的基礎(chǔ)。