前言
單個 Hdfs 集群中可能存在成百上千個 DataNode ,但默認(rèn)情況下 NameNode 只有一個 , 各個節(jié)點(diǎn)不斷的進(jìn)行內(nèi)部通信,如果不能快速的處理掉通信消息,可能會導(dǎo)致掉節(jié)點(diǎn),或者數(shù)據(jù)傳輸緩慢等問題。因此Hdfs內(nèi)部集群對內(nèi)部RPC通信具有較高的性能要求。
本文會對 Hdfs 集群的RPC通信框架進(jìn)行分析,看看它是如何保證節(jié)點(diǎn)通信的效率。
Protobuf 簡介
在 Hdfs 中,為了提升內(nèi)部通信的傳輸效率,整個RPC通信框架使用 Google 的 Protobuf 序列化框架進(jìn)行數(shù)據(jù)傳輸。為了方便后續(xù)理解,這里先對 Protobuf 進(jìn)行簡單介紹。
數(shù)據(jù)傳輸
Protobuf 首先是一個跨語言的數(shù)據(jù)傳輸框架。把它和 XML 和 JSON進(jìn)行對比可以看出:
| 語言 | 特點(diǎn) | 可讀性 | 數(shù)據(jù)Size | 解析效率 |
|---|---|---|---|---|
| Protobuf | 將數(shù)據(jù)內(nèi)容解析成純字節(jié)形式傳輸 | 數(shù)據(jù)以字節(jié)形式存在,不具備可讀性 | 占用數(shù)據(jù)量少 | 直接讀取數(shù)據(jù)內(nèi)容,效率高 |
| XML和JSON | 引入額外文本構(gòu)造出格式化數(shù)據(jù) | 額外文本使得數(shù)據(jù)具備良好的可讀性 | 數(shù)據(jù)以字符形式存在,且額外文本占用大量空間 | 需要解析剔除額外數(shù)據(jù),效率低 |
XML和Json都是將數(shù)據(jù)封裝成一個格式化文本,因此在必要的傳輸數(shù)據(jù)之外,還有大量的額外文本進(jìn)行狀態(tài)描述。而 Protobuf 通過將數(shù)據(jù)字段序列化成為一串不可讀的字節(jié)碼,同XML和Json相比,對于同樣的數(shù)據(jù),它所需要傳輸?shù)臄?shù)據(jù)量更小,解析的速度更快。
Protobuf 也是一門天生的跨語言數(shù)據(jù)傳輸框架。 對于不同的語言,都用同一個 .proto 的文件進(jìn)行數(shù)據(jù)描述,如下:
message Person {
string name = 1;
int32 id = 2; // Unique ID number for this person.
string email = 3;
}
代碼中的 Person 數(shù)據(jù),可以通過 Google 或者三方的 protobuf 處理工具,被轉(zhuǎn)化為特定編程語言下的數(shù)據(jù)對象。
例如,在Java代碼中,通過 .proto 文件生成一個 AddressBook 數(shù)據(jù)類,那么生成的 Java 文件中會自帶 mergeFrom 和 writeTo 方法如下:
// 從輸入流中反序列化數(shù)據(jù)
AddressBook.Builder addressBook = AddressBook.newBuilder();
addressBook.mergeFrom(new FileInputStream(args[0]));
// 序列化數(shù)據(jù)到輸入流
FileOutputStream output = new FileOutputStream(args[0]);
addressBook.build().writeTo(output);
通過Protobuf內(nèi)部的IO邏輯,我們可以將指定的數(shù)據(jù)轉(zhuǎn)化為少量的字節(jié)碼進(jìn)行傳輸,從而提升整體的傳輸效率。
對于任意語言,只要以同樣的方式記錄和讀取同一份字節(jié)碼數(shù)據(jù)就可以得到同樣的數(shù)據(jù)對象,從而保證序列化數(shù)據(jù)的可還原性。同時,在數(shù)據(jù)的序列化過程中,由于沒有額外文本的參與,也不需要保持?jǐn)?shù)據(jù)在傳輸過程中的可讀性,因此對于同一個數(shù)據(jù),Protobuf擁有比XML和Json更小的數(shù)據(jù)量和更快的解析速度。
RPC 調(diào)用
Protobuf 除了實(shí)現(xiàn)數(shù)據(jù)的傳輸作用以外,還實(shí)現(xiàn)了一套RPC遠(yuǎn)程調(diào)用框架。
定義一個 .proto 文件如下
option java_generic_services = true;
service ReconfigurationProtocolService {
rpc getReconfigurationStatus(GetReconfigurationStatusRequestProto)
returns(GetReconfigurationStatusResponseProto);
}
使用Protobuf編譯工具進(jìn)行處理之后,可以得到一個 ReconfigurationProtocolService 接口,例如上方代碼對應(yīng)的接口中會有一個叫做 getReconfigurationStatus,參數(shù)類型為GetReconfigurationStatusRequestProto, 返回值為GetReconfigurationStatusResponseProto 的方法。
// 構(gòu)造BlockingService
ReconfigurationProtocolServerSideTranslatorPB reconfigurationProtocolXlator
= new ReconfigurationProtocolServerSideTranslatorPB(this);
BlockingService reconfigurationPbService = ReconfigurationProtocolService
.newReflectiveBlockingService(reconfigurationProtocolXlator);
// 調(diào)用BlockingService
service.callBlockingMethod(methodDescriptor, null, param);
在 Java 文件中,通過動態(tài)代理得到一個BlockingService對象,內(nèi)部包裹一個實(shí)現(xiàn)了 ReconfigurationProtocolService.BlockingInterface 接口的對象。
當(dāng)需要使用RPC服務(wù)時,系統(tǒng)通過傳輸需要調(diào)用的方法名和相關(guān)的調(diào)用參數(shù),使用 BlockingService::callBlockingMethod,就可以在Server端解析調(diào)用邏輯,實(shí)現(xiàn)RPC遠(yuǎn)程調(diào)用。
RPC通信的邏輯實(shí)現(xiàn)
總覽
言歸正傳,我們回到 Hdfs 的內(nèi)部通信機(jī)制本身。

如上圖中,Proxy和Impl是對同一個RPC調(diào)用接口的實(shí)現(xiàn)類,當(dāng)Proxy中的接口被調(diào)用時,通過Client發(fā)送消息到 Server ,Server 會按照標(biāo)準(zhǔn)數(shù)據(jù)格式進(jìn)行解析,再調(diào)用Server側(cè)的 Impl方法進(jìn)行執(zhí)行,并返回結(jié)果數(shù)據(jù)。Client 發(fā)送消息到 Server 的過程對于接口訪問而言是透明的,對于使用者來說,他在本地執(zhí)行 Proxy 的接口,會得到具有相同接口的 Impl 的調(diào)用結(jié)果。
不同的RPC框架的具體實(shí)現(xiàn)邏輯不盡相同,在Hdfs中,RPC.Server類扮演RPC框架中的 Server 角色,處理響應(yīng)內(nèi)部通信請求; Client 類扮演RPC框架中的 Client 角色,負(fù)責(zé)調(diào)用消息的發(fā)送和結(jié)果數(shù)據(jù)接收。
接下來會針對 Server 和 Client 的進(jìn)行代碼邏輯的走讀。
Server
RPC.Server的源碼路徑是 $src/hadooop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java。
在 RPC 類中有一個 Builder 類負(fù)責(zé)構(gòu)造 RPC.Server,在構(gòu)造方法中我們看到:
public Server build() throws IOException, HadoopIllegalArgumentException {
return getProtocolEngine(this.protocol, this.conf).getServer(
this.protocol, this.instance, this.bindAddress, this.port,
this.numHandlers, this.numReaders, this.queueSizePerHandler,
this.verbose, this.conf, this.secretManager, this.portRangeConfig);
}
默認(rèn)情況下,通過 getProtocolEngine 都是得到一個 ProtobufRpcEngine 對象,再通過ProtobufRpcEngine::getServer構(gòu)造出 ProtobufRpcEngine.Server 對象。
ProtobufRpcEngine.Server 是 Server 的子類,整個內(nèi)部通信機(jī)制在 Server 類中就已經(jīng)實(shí)現(xiàn)了,下面是 Server 中的數(shù)據(jù)處理流程。

Server類中使用了四種類型的線程類,分別是Listener,Reader,Handler和Responder。如上圖所示,為了方便表示各個線程間的通信邏輯,使用泳道代表著對應(yīng)類型的線程類操作時鎖使用的關(guān)鍵方法。
Listener
Listener 作為單線程任務(wù)負(fù)責(zé)監(jiān)聽指定端口的socket的 ACCEPT 請求,當(dāng)新的 socket鏈接到來時,將其封裝成一個 Connection 對象,通過addConnection添加Reader的處理隊(duì)列中。
Server 中只有一個 Listener 線程負(fù)責(zé)接收新的socket請求,但有多個 Reader 線程,在Listener::doAccept 中會根據(jù)以下代碼盡可能將 Connection 平均分配到各個 Reader中,讓多個線程可以同時讀取不同的 socket 數(shù)據(jù),從而避免Listener單線程引起的性能瓶頸。
Reader getReader() {
currentReader = (currentReader + 1) % readers.length;
return readers[currentReader];
}
Reader
Reader負(fù)責(zé)內(nèi)部通信數(shù)據(jù)的解析工作,它不斷嘗試從Connection所包裝的socket對象中讀取數(shù)據(jù)。當(dāng)發(fā)現(xiàn)某個 socket 可讀時,通過 readAndProcess-> processOneRpc 處理到來的消息。
private void processOneRpc(ByteBuffer bb) throws IOException, WrappedRpcServerException, InterruptedException {
final RpcWritable.Buffer buffer = RpcWritable.Buffer.wrap(bb);
final RpcRequestHeaderProto header = getMessage(RpcRequestHeaderProto.getDefaultInstance(), buffer);
callId = header.getCallId();
if (callId < 0) {
processRpcOutOfBandRequest(header, buffer);
} else if(!connectionContextRead) {
throw new WrappedRpcServerException();
} else {
processRpcRequest(header, buffer);
}
}
從上面的代碼可以看出,每次從 socket 請求傳來的數(shù)據(jù)請求都必然帶著一個 RpcRequestHeaderProto 對象,這個對象中封裝著后續(xù)參數(shù)的相關(guān)信息,就像 Http 協(xié)議中的頭信息。
當(dāng) socket 初次建立鏈接時,需要通過 procesRpcOutOfBandRequest 進(jìn)行鏈接初始化,初始化時的 callId < 0。初始化完成之后,后續(xù)請求通過 processRpcRequest 進(jìn)行消費(fèi)。
private void processRpcRequest(RpcRequestHeaderProto header,
RpcWritable.Buffer buffer) throws WrappedRpcServerException,
InterruptedException {
Class<? extends Writable> rpcRequestClass = getRpcRequestWrapper(header.getRpcKind());
Writable rpcRequest;
rpcRequest = buffer.newInstance(rpcRequestClass, conf);
RpcCall call = new RpcCall(this, header.getCallId(),
header.getRetryCount(), rpcRequest,
ProtoUtil.convert(header.getRpcKind()),
header.getClientId().toByteArray(), traceScope, callerContext);
queueCall(call);
}
這里根據(jù)RpcRequestHeaderProto中包含的body類型解析出對應(yīng)的數(shù)據(jù)類,將其封裝成一個 RpcCall 對象,放入 Handler 的消費(fèi)隊(duì)列中。
Handler
Handler 線程負(fù)責(zé)具體指令的執(zhí)行工作。
final Call call = callQueue.take(); // pop the queue; maybe blocked here
CurCall.set(call);
// always update the current call context
CallerContext.setCurrent(call.callerContext);
UserGroupInformation remoteUser = call.getRemoteUser();
if (remoteUser != null) {
remoteUser.doAs(call);
} else {
call.run();
}
在 Handler 的循環(huán)隊(duì)列中,不斷從 callQueue 中獲取需要消費(fèi)的任務(wù)信息,然后通過 call.run() 進(jìn)行任務(wù)執(zhí)行。
@Override
public Void run() throws Exception {
Writable value = null;
ResponseParams responseParams = new ResponseParams();
value = call(rpcKind, connection.protocolName, rpcRequest, timestamp);
if (!isResponseDeferred()) {
setupResponse(this, responseParams.returnStatus, responseParams.detailedErr, value, responseParams.errorClass, responseParams.error);
sendResponse();
}
}
在 RpcCall::run 中我們看到,系統(tǒng)實(shí)際上是通過Server::call方法執(zhí)行的,這個方法在 RPC.Server 中被實(shí)現(xiàn)。
static { // Register the rpcRequest deserializer for ProtobufRpcEngine
org.apache.hadoop.ipc.Server.registerProtocolEngine(
RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcProtobufRequest.class,
new Server.ProtoBufRpcInvoker());
}
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol,
Writable rpcRequest, long receiveTime) throws Exception {
return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest,
receiveTime);
}
// Server.ProtoBufRpcInvoker
public Writable call(RPC.Server server, String connectionProtocolName,
Writable writableRequest, long receiveTime) throws Exception {
RpcProtobufRequest request = (RpcProtobufRequest) writableRequest;
RequestHeaderProto rpcRequest = request.getRequestHeader();
String methodName = rpcRequest.getMethodName();
String declaringClassProtoName =
rpcRequest.getDeclaringClassProtocolName();
long clientVersion = rpcRequest.getClientProtocolVersion();
ProtoClassProtoImpl protocolImpl = getProtocolImpl(server,
declaringClassProtoName, clientVersion);
BlockingService service = (BlockingService) protocolImpl.protocolImpl;
result = service.callBlockingMethod(methodDescriptor, null, param);
}
從源碼中可以看到,RPC.Server::call經(jīng)過層層路徑,最終在Server.ProtoBufRpcInvoker 根據(jù)傳入的數(shù)據(jù)找到對應(yīng)的BlockingService,利用 Protobuf (這里沒有使用Protobuf內(nèi)置的RpcChannel,而是自己手動調(diào)用BlockingService::callBlockingMethod)實(shí)現(xiàn)方法的調(diào)用。
Responder
在 Reponder 線程的 while 循環(huán)中,我們看到當(dāng)socket可寫時,會嘗試調(diào)用 doAsyncWrite->processResponse 進(jìn)行寫入操作
private boolean processResponse(LinkedList<RpcCall> responseQueue,
boolean inHandler) throws IOException {
call = responseQueue.removeFirst();
SocketChannel channel = call.connection.channel;
int numBytes = channelWrite(channel, call.rpcResponse);
if (numBytes < 0) {
return true;
}
if (!call.rpcResponse.hasRemaining()) {
...
} else {
call.connection.responseQueue.addFirst(call);
}
return done;
}
private int channelWrite(WritableByteChannel channel,
ByteBuffer buffer) throws IOException {
int count = (buffer.remaining() <= NIO_BUFFER_LIMIT) ?
channel.write(buffer) : channelIO(null, channel, buffer);
if (count > 0) {
rpcMetrics.incrSentBytes(count);
}
return count;
}
Responder會將得到的 response 寫入socket 的輸出流中,返回給Client。
Client
Client 的源碼路徑是 $src/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java 。
@Override
@SuppressWarnings("unchecked")
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Invoker invoker = new Invoker(protocol, addr, ticket, conf, factory,
rpcTimeout, connectionRetryPolicy, fallbackToSimpleAuth);
return new ProtocolProxy<T>(protocol, (T) Proxy.newProxyInstance(
protocol.getClassLoader(), new Class[]{protocol}, invoker), false);
}
Client 端通過 ProtobufRpcEngine::getProxy 構(gòu)建出一個動態(tài)代理的接口對象。當(dāng) Client 訪問接口時,通過 Invoker 類通知 Client 發(fā)送請求給 Server。
public Message invoke(Object proxy, final Method method, Object[] args) throws ServiceException {
RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method);
final Message theRequest = (Message) args[1];
final RpcWritable.Buffer val;
val = (RpcWritable.Buffer) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER,
new RpcProtobufRequest(rpcRequestHeader, theRequest), remoteId,
fallbackToSimpleAuth);
return getReturnMessage(method, val);
}
Invoker 會根據(jù)訪問接口的簽名信息構(gòu)造出一個 RequestHeaderProto 對象,在上一小節(jié)中,我們看到當(dāng) Server 接收到 socket 信息時,會先讀取這個 RequestHeaderProto,了解當(dāng)前調(diào)用的方法名稱,然后進(jìn)行后續(xù)分發(fā)。
RequestHeaderProto 對象隨著 Message 對象一起被封裝成一個 Call 對象傳遞給 Client 進(jìn)行發(fā)送,每一個 Call 對象會有一個唯一的 callId, 便于在接收到返回信息中,返回給指定的 Call。
Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass,
AtomicBoolean fallbackToSimpleAuth) throws IOException {
final Connection connection = getConnection(remoteId, call, serviceClass,
fallbackToSimpleAuth);
connection.sendRpcRequest(call);
}
private Connection getConnection(ConnectionId remoteId,
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException {
connection = connections.get(remoteId);
Connection existing = connections.putIfAbsent(remoteId, connection);
if (connection == null) {
connection = new Connection(remoteId, serviceClass);
}
connection.setupIOstreams(fallbackToSimpleAuth);
return connection;
}
Client 有一個 connections 的 Connection 隊(duì)列負(fù)責(zé)同各個節(jié)點(diǎn)的NameNode 進(jìn)行通信,首次構(gòu)造 Connection 對象后,通過 setupIOstreams初始化鏈接信息,同時發(fā)送相關(guān)的設(shè)置信息到 Server::processRpcOutOfBandRequest 中進(jìn)行Server側(cè)的初始化。
當(dāng)有一個可用的Connection 后,通過 connection::sendRpcRequest將請求發(fā)送給對應(yīng)的Server。
同時Connection 也是一個線程類,在 setupIOstreams 的時候會啟動接收線程。接收線程在收到消息之后,根據(jù)消息中的唯一callId將返回?cái)?shù)據(jù)返回給指定的 Call 對象,完成整個 Client 的通信流程。
NameNode 和 DataNode的心跳邏輯
接下來,以 NameNode 和 DataNode的心跳發(fā)送機(jī)制為例,舉例說明內(nèi)部通信的流程。
在 Hdfs 中,心跳是單向的,總是由DataNode主動上報(bào)當(dāng)前狀態(tài)到NameNode中,因此對于心跳而言,NameNode是Server,DataNode是Client。
DataNode
在前一篇文章中,我介紹了DataNode 在啟動的時候,會構(gòu)造一個 BlockPoolManager 對象,在 BlockPoolManager 中有一個 BPOfferService的集合對象。
BPOfferService(List<InetSocketAddress> nnAddrs, List<InetSocketAddress> lifelineNnAddrs, DataNode dn) {
for (int i = 0; i < nnAddrs.size(); ++i) {
this.bpServices.add(new BPServiceActor(nnAddrs.get(i),
lifelineNnAddrs.get(i), this));
}
}
void start() {
for (BPServiceActor actor : bpServices) {
actor.start();
}
}
每一個BPOfferService對應(yīng)著一個 NameService , 對于 NameService 的每一個 NameNode 節(jié)點(diǎn),會對應(yīng) BPServiceActor 的Runnable類。在啟動BPOfferService的時候,其實(shí)就是啟動每一個BPServiceActor類。
void start() {
bpThread = new Thread(this, formatThreadName("heartbeating", nnAddr));
bpThread.start();
}
@Override
public void run() {
connectToNNAndHandshake();
while (shouldRun()) {
offerService();
}
}
private void offerService() throws Exception {
while (shouldRun()) {
final long startTime = scheduler.monotonicNow();
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
HeartbeatResponse resp = null;
if (sendHeartbeat) {
resp = sendHeartBeat(requestBlockReportLease);
}
....
}
}
BPServiceActor類本身是一個Runnable的實(shí)現(xiàn)類,在線程循環(huán)中,先鏈接到NameNode ,再在 while 循環(huán)中不斷offerService。
在offerService中,通過 sendHeartBeat 進(jìn)行周期性的心跳發(fā)送。
private void connectToNNAndHandshake() throws IOException {
// get NN proxy
bpNamenode = dn.connectToNN(nnAddr);
// First phase of the handshake with NN - get the namespace
// info.
NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are
// the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
// Second phase of the handshake with the NN.
register(nsInfo);
}
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
throws IOException {
scheduler.scheduleNextHeartbeat();
scheduler.updateLastHeartbeatTime(monotonicNow());
return bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
dn.getFSDataset().getCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease);
}
// DatanodeProtocolClientSideTranslatorPB.java
@Override
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
.setFailedVolumes(failedVolumes)
.setRequestFullBlockReportLease(requestFullBlockReportLease);
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());
return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
rollingUpdateStatus, resp.getFullBlockReportLeaseId());
}
在connectToNNAndHandshake中,通過ProtobufRpcEngine::getProxy 獲得一個bpNamenode 的RPC代理類,調(diào)用 bpNamenode.sendHeartbeat時,通過動態(tài)代理將消息通過 Client 發(fā)送出去。
NameNode
DataNode發(fā)送了心跳之后,對應(yīng)的NameNode會接收到一條對應(yīng)的請求信息。
通過走讀代碼,我們找到了同樣實(shí)現(xiàn) DatanodeProtocolService 接口的是DatanodeProtocolServerSideTranslatorPB 類。
public HeartbeatResponseProto sendHeartbeat(RpcController controller,
HeartbeatRequestProto request) throws ServiceException {
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
}
在 DatanodeProtocolServerSideTranslatorPB::sendHeartbeat 中通過事件分發(fā)將心跳事件交給 FSNamesystem 進(jìn)行消費(fèi),從而完成了 DataNode 和 NameNode 的心跳事件。