網(wǎng)絡(luò)通信模塊是分布式系統(tǒng)的底層基礎(chǔ),支撐上層分布式環(huán)境下復(fù)雜的進(jìn)程間通信。遠(yuǎn)程過程調(diào)用(RPC,Remote Procedure Call)是一種常見的分布式網(wǎng)絡(luò)通信協(xié)議。RPC 允許運(yùn)行與一臺機(jī)器的程序調(diào)用另一臺機(jī)器的子程序,同時將網(wǎng)絡(luò)細(xì)節(jié)屏蔽起來,大大簡化了分布式程序的開發(fā)。
Hadoop RPC 簡介
Hadoop 實(shí)現(xiàn)了自己的 RPC 通信協(xié)議,是上層分布式子系統(tǒng)(HDFS、MapReduce、HBase 等)公用的網(wǎng)絡(luò)通信模塊。Hadoop RPC 具有以下特點(diǎn):
- 透明性。所有RPC框架的基本特性,對用戶屏蔽了網(wǎng)絡(luò)通信過程。
- 高性能。Hadoop 各個子系統(tǒng)均采用 Master/Slave 架構(gòu),Master 作為一個 RPC Server,負(fù)責(zé)處理所有 Slave 發(fā)送的請求,需要能夠高效的處理多個并發(fā) RPC 請求。
- 可控性。JDK 自帶的 RPC 框架(RMI)過于重量級,用戶可控之處太少,如:網(wǎng)絡(luò)連接、超時和緩存等難以修改。因此 Hadoop 實(shí)現(xiàn)了輕量級的可控性更強(qiáng)的 RPC 框架。
Hadoop RPC 架構(gòu)
Hadoop RPC 與其他 RPC 框架一樣主要由四個部分組成:序列化層、函數(shù)調(diào)用層、網(wǎng)絡(luò)傳輸層、服務(wù)端處理框架。
- 序列化層,將結(jié)構(gòu)化數(shù)據(jù)轉(zhuǎn)換為字節(jié)流,便于通過網(wǎng)絡(luò)傳輸或進(jìn)行持久化。
- 函數(shù)調(diào)用層,定位要調(diào)用的函數(shù)并執(zhí)行函數(shù)。Hadoop RPC 采用 Java 反射和動態(tài)代理實(shí)現(xiàn)函數(shù)調(diào)用。
- 網(wǎng)絡(luò)傳輸層,描述了 Client 和 Server 之間消息傳輸?shù)姆绞?。Hadoop RPC 采用基于 TCP/IP 的 Socket 機(jī)制。
- 服務(wù)端處理框架,可以抽象為網(wǎng)絡(luò)I/O模型,直接決定了服務(wù)器端的并發(fā)處理能力。常見的有:阻塞式I/O、非阻塞式I/O、事件驅(qū)動I/O等,Hadoop RPC 采用基于 Reactor 設(shè)計模式的事件驅(qū)動I/O模型。
Hadoop RPC 總體架構(gòu)如圖,自下而上分為兩層:第一層是基于 Java NIO 實(shí)現(xiàn)的 Client/Server 通信模型;第二層是供上層應(yīng)用調(diào)用的 RPC 接口。

Hadoop RPC 使用
首先定義 RPC 協(xié)議,RPC 協(xié)議是客戶端和服務(wù)端的通信接口,定義了服務(wù)器端對外提供的服務(wù)接口。
interface ClientProtocol extends org.apache.hadoop.ipc.VersionedProtocol {
// 版本號,默認(rèn)情況下不通版本號的 Client 和 Server 不能通信
public static final long versionId = 1L;
String echo(String value) throws IOException;
int add(int v1, int v2) throwd IOException;
}
實(shí)現(xiàn) RPC 協(xié)議,Hadoop RPC 協(xié)議通常是一個 Java 接口。
public static class ClientProtocolImpl implemenets ClientProtocol {
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
return ClientProtocol.versionId;
}
public String echo(String value) throws IOException {
return value;
}
public int add(int v1, int v2) throwd IOException {
return v1 + v2;
}
}
構(gòu)造并啟動 RPC Server,使用 getServer() 方法構(gòu)造 RPC Server,并啟動
// serverHost 和 serverPort 表示服務(wù)器的 host 和端口
// numHandlers 表示服務(wù)器端處理請求的線程數(shù)
server = RPC.getServer(new ClientProtocolImpl(), serverHost, serverPort,
numHandlers, false, conf);
server.start();
構(gòu)造 RPC CLient,并發(fā)送 RPC 請求。使用 getProxy() 方法構(gòu)造客戶端代理,通過代理對象調(diào)用遠(yuǎn)程服務(wù)器端方法
proxy = (ClientProtocol) RPC.getProxy(ClientProtocol.class, new ClientProtocolImpl(),
ClientProtocol.versionId, addr, conf);
int result = proxy.add(5, 6);
String echoResult = proxy.echo("hello");
經(jīng)過上面四個步驟,便利用 Hadoop RPC 構(gòu)建了一個簡單的 Client/Server 網(wǎng)絡(luò)模型。
深入理解 Hadoop RPC
Hadoop RPC 主要由三個類組成:RPC、Client、Server,分別對應(yīng)接口、客戶端實(shí)現(xiàn)、服務(wù)器端實(shí)現(xiàn)。
ipc.RPC
RPC 類是對底層客戶機(jī)/服務(wù)器網(wǎng)絡(luò)模型的封裝,以便為開發(fā)人員提供方便簡潔的編程接口。
RPC 類定義了一個內(nèi)部類 RPC.Server,繼承 Server 抽象,并利用反射機(jī)制實(shí)現(xiàn)了 call 接口。RPC 類包含一個 ClientCache 類型的成員根據(jù)用戶提供的 SocketFactory 緩存 Client(重用 Client)。

與本地執(zhí)行反射調(diào)用不通的是,RPC 函數(shù)調(diào)用時(執(zhí)行 invock 方法),需要將函數(shù)調(diào)用信息(函數(shù)名、參數(shù)列表等)打包成可序列化對象 Invocation,通過網(wǎng)絡(luò)發(fā)送給服務(wù)器端,服務(wù)器端接收到后,根據(jù)這些信息在利用反射機(jī)制完成函數(shù)調(diào)用。
ipc.Client
Client 類主要完成的功能是發(fā)送遠(yuǎn)程過程調(diào)用信息,并接收執(zhí)行結(jié)果。Client 類內(nèi)部有兩個重要的類:Call 和 Connection。
Call 封裝了一個 RPC 請求,包含唯一標(biāo)識Id、函數(shù)調(diào)用信息、函數(shù)返回信息、錯誤信息、執(zhí)行完成標(biāo)識。Connection 封裝了 Client 與每個 Server 之前的連接信息。包括通信連接唯一標(biāo)識RemoteId、Socket、網(wǎng)絡(luò)輸入輸出數(shù)據(jù)流、RPC請求等。

ipc.Server
Hadoop 采用 Master/Slave 結(jié)構(gòu),Master(NameNode、JobTracker)是整個系統(tǒng)的單點(diǎn),是系統(tǒng)的性能和擴(kuò)展瓶頸之一。Master 通過 ipc.Server 接收并處理所有 Slave 發(fā)送的請求,這要求 ipc.Server 將高并發(fā)和可擴(kuò)展性作為設(shè)計目標(biāo)。
ipc.Server 采用了多種提高并發(fā)處理能力的技術(shù),包括:線程池、事件驅(qū)動和 Reactor 設(shè)計模式等。均采用 JDK 自帶的庫實(shí)現(xiàn)。下面著重介紹 Reactor 設(shè)計模式如何提高整體性能。
Reactor 是并發(fā)編程中一種基于事件驅(qū)動的設(shè)計模式,具有以下兩個特點(diǎn):
- 通過派發(fā)、分離I/O操作事件提高系統(tǒng)的并發(fā)性能
- 提高粗粒度的并發(fā)控制,單線程實(shí)現(xiàn),避免復(fù)雜的同步處理
典型的 Reactor 實(shí)現(xiàn)原理圖如下:

主要包括以下幾個角色:
- Reactor,IO事件的派發(fā)者
- Acceptor,接收來自 Client 的連接,建立與 Client 對應(yīng)的 Handler,并向 Reactor 注冊 Handler
- Handler,與 Client 通信的實(shí)體,實(shí)現(xiàn)業(yè)務(wù)的處理,內(nèi)部會進(jìn)一步劃分為:read、decode、compute、encode、send 等過程,
- Reader/Sender,為了加速處理速度,通過構(gòu)建線程池,存放數(shù)據(jù)處理線程,數(shù)據(jù)讀出后在線程池中等待后續(xù)處理即可。因此一般會分離 Handler 的讀和寫的過程,非別注冊為讀和寫事件由 Reader 和 Sender 處理。
ipc.Server 實(shí)現(xiàn)了一個典型的 Reactor 模式,整體架構(gòu)基本與上述一致。ipc.Server 被劃分為三個階段:接收請求、處理請求和返回結(jié)果

接收請求
接收來自各個客戶端的請求,封裝為 Call 對象,放入共享隊列(callQueue)。其中 Listener 負(fù)責(zé)監(jiān)聽請求,整個 Server 只有一個 Listener。一旦有新的請求到達(dá),會輪詢的方式從線程池中選擇一個 Reader 處理。Selector 對象負(fù)責(zé)監(jiān)聽相關(guān)事件。
處理請求
從共享隊列中獲取 Call 對象,執(zhí)行對應(yīng)的函數(shù)調(diào)用,由多個 Handler 并行完成。Handler 會嘗試將結(jié)果返回給客戶端,但是考慮到某些函數(shù)調(diào)用返回的結(jié)果很大或網(wǎng)絡(luò)慢等原因,可能很難一次性將結(jié)果發(fā)送到客戶端,Handler 會嘗試將后續(xù)發(fā)送任務(wù)交給 Responder 處理。
返回結(jié)果
Server 端只有一個 Responder,負(fù)責(zé)處理 Handler 的結(jié)果返回給客戶端,Selector 對象負(fù)責(zé)監(jiān)聽相關(guān)事件。
Hadoop RPC 參數(shù)
Hadoop RPC 提供了一些可配置參數(shù):
| 參數(shù) | 說明 |
|---|---|
| ipc.server.read.threadpool.size | Reader 線程數(shù) 默認(rèn)值:1 |
| ipc.server.handler.queue.size | 每個 Handler 對應(yīng)最大 Call 數(shù)量,會影響 Call 隊列長度 默認(rèn)值:100 |
| mapred.job.tracker.handler.count dfs.namenode.service.handler.count |
JobTracker 和 NameNode 中 Handler 數(shù)量 默認(rèn)值:10 |
| ipc.client.connect.max.retries | 客戶端最大重試次數(shù),間隔1s 默認(rèn)值:10 |
《Hadoop技術(shù)內(nèi)幕:深入解析MapReduce架構(gòu)設(shè)計與實(shí)現(xiàn)原理》