7.1 Dubbo調(diào)用介紹
如果我們手動(dòng)寫一個(gè)簡單的RPC調(diào)用,一般需要把服務(wù)調(diào)用的信息傳遞給服務(wù)端,包括每次服務(wù)調(diào)用的一些共用信息包括服務(wù)調(diào)用接口、方法名、方法參數(shù)類型和方法參數(shù)值等,在傳遞方法參數(shù)值時(shí)需要先序列化對(duì)象并經(jīng)過網(wǎng)絡(luò)傳輸?shù)椒?wù)端,在服務(wù)端接受后再按照客戶端序列化的順序再做一次反序列化,然后拼裝成請(qǐng)求對(duì)象進(jìn)行服務(wù)反射調(diào)用,最終將調(diào)用結(jié)果傳給客戶端。Dubbo的實(shí)現(xiàn)也基本是相同的原理,下圖是Dubbo一次完整RPC調(diào)用中經(jīng)過的步驟:

首先在客戶端啟動(dòng)時(shí),會(huì)從注冊(cè)中心拉取和訂閱對(duì)應(yīng)的服務(wù)列表,Cluster會(huì)把拉取的服務(wù)列表聚合成一個(gè)Invoker,每次RPC調(diào)用前會(huì)通過Directory#list獲取providers地址(已經(jīng)生成好的Invoker地址),獲取這些服務(wù)列表給后續(xù)路由和負(fù)載均衡使用。對(duì)應(yīng)上圖①中將多個(gè)服務(wù)提供者做聚合。在框架內(nèi)部實(shí)現(xiàn)Directory接口的是RegistryDirectory類,它和接口名是一對(duì)一的關(guān)系(每一個(gè)接口都有一個(gè)RegistryDirectory實(shí)例),主要負(fù)責(zé)拉取和訂閱服務(wù)提供者、動(dòng)態(tài)配置和路由項(xiàng)。
在Dubbo發(fā)起服務(wù)調(diào)用時(shí),所有路由和負(fù)載均衡都是在客戶端實(shí)現(xiàn)的??蛻舳朔?wù)調(diào)用首先會(huì)觸發(fā)路由操作,然后將路由結(jié)果得到的服務(wù)列表作為負(fù)載均衡參數(shù),經(jīng)過負(fù)載均衡后會(huì)選出一臺(tái)機(jī)器進(jìn)行RPC調(diào)用,這3個(gè)步驟一次對(duì)應(yīng)圖中②③④??蛻舳私?jīng)過路由和負(fù)載均衡后,會(huì)將請(qǐng)求交給底層IO線程池(如Netty)進(jìn)行處理,IO線程池主要處理讀寫、序列化和反序列化等邏輯,因此這里一定不能阻塞操作,Dubbo也提供參數(shù)控制(decode.in.io)參數(shù),在處理反序列化對(duì)象時(shí)會(huì)在業(yè)務(wù)線程池中處理。在⑤中包含兩種類似的線程池,一種是IO線程池(Netty),另一種是Dubbo業(yè)務(wù)線程池(承載業(yè)務(wù)方法調(diào)用)。
目前Dubbo將服務(wù)調(diào)用和Telnet調(diào)用做了端口復(fù)用,子啊編解碼層面也做了適配。在Telnet調(diào)用時(shí),會(huì)新建立一個(gè)TCP連接,傳遞接口、方法和json格式的參數(shù)進(jìn)行服務(wù)調(diào)用,在編解碼層面簡單讀取流中的字符串(因?yàn)椴皇荄ubbo標(biāo)準(zhǔn)頭報(bào)文),最終交給Telnet對(duì)應(yīng)的Handler去解析方法調(diào)用。如果不是Telnet調(diào)用,則服務(wù)提供方會(huì)根據(jù)傳遞過來的接口、分組和版本信息查找Invoker對(duì)應(yīng)的實(shí)例進(jìn)行反射調(diào)用。在⑦中進(jìn)行了端口復(fù)用,Telnet和正常RPC調(diào)用不一樣的地方是序列化和反序列化使用的不是Hessian方式,而是直接使用fastjson進(jìn)行處理。
講解完主要調(diào)用原理,接下來開始探討細(xì)節(jié),比如Dubbo協(xié)議、編解碼實(shí)現(xiàn)和線程模型等,本篇重點(diǎn)主要放在⑤⑥⑦。
7.2 Dubbo協(xié)議
Dubbo協(xié)議參考了現(xiàn)有的TCP/IP協(xié)議,每一次RPC調(diào)用包括協(xié)議頭和協(xié)議體兩部分。16字節(jié)長的報(bào)文頭部主要包含魔數(shù)(0xdabb),以及當(dāng)前請(qǐng)求報(bào)文是否是Request、Response、心跳和事件的信息,請(qǐng)求時(shí)也會(huì)攜帶當(dāng)前報(bào)文體內(nèi)序列化協(xié)議編號(hào)。除此之外,報(bào)文頭還攜帶了請(qǐng)求狀態(tài),以及請(qǐng)求唯一標(biāo)識(shí)和報(bào)文體長度。

| 偏移比特位 | 字段 | 描述 |
|---|---|---|
| 0 ~ 7 | 魔數(shù)高位 | 0xda00 |
| 8 ~15 | 魔數(shù)低位 | 0xbb |
| 16 | 數(shù)據(jù)包類型 | 0為Response,1為Request |
| 17 | 調(diào)用方式 | 第16為1時(shí)有效,0表示單向調(diào)用,1表示雙向調(diào)用,即有無返回值 |
| 18 | 事件標(biāo)識(shí) | 0表示當(dāng)前數(shù)據(jù)包是請(qǐng)求或響應(yīng),1表示心跳包 |
| 19 ~ 23 | 序列化器編號(hào) | 2為Hessian2Serialization,3為JavaSerialization,4為CompactedJavaSerialization,6為FastJsonSerialization,7為NativeJavaSerialization,8為KryoSerialization,9為FstSerialization |
| 24 ~ 31 | 狀態(tài) | 20為OK,30為CLIENT_TIMEOUT,31為SERVER_TIMEOUT,40為BAD_REQUEST,50為BAD_RESPONSE,60為SERVICE_NOT_FOUND,70為SERVICE_ERROR,80為SERVER_ERROR,90為CLIENT_ERROR,100為SERVER_THREADPOOL_EXHAUSTED_ERROR |
| 32 ~ 95 | 請(qǐng)求編號(hào) | 這8個(gè)字節(jié)存儲(chǔ)RPC請(qǐng)求的唯一ID,用來將請(qǐng)求和響應(yīng)做關(guān)聯(lián) |
| 96 ~ 127 | 消息體長度 | 4個(gè)字節(jié)存儲(chǔ)消息題長度 |
在消息體中,客戶端嚴(yán)格按照序列化順序?qū)懭胂ⅲ?wù)端也會(huì)遵循相同的順序讀取消息,客戶端發(fā)起的請(qǐng)求消息體一次依次保存下列內(nèi)容:Dubbo版本號(hào)、服務(wù)接口名、服務(wù)接口版本、方法名、參數(shù)類型、方法參數(shù)值和請(qǐng)求額外參數(shù)(attachment)。
服務(wù)端返回的響應(yīng)消息體主要包含回值狀態(tài)標(biāo)記和返回值,其中回值狀態(tài)標(biāo)記包含6中:
| 狀態(tài)值 | 狀態(tài)符號(hào) | 描述 |
|---|---|---|
| 5 | RESPONSE_NULL_VALUE_WITH_ATTACHMENTS | 響應(yīng)空值包含隱藏參數(shù) |
| 4 | RESPONSE_VALUE_WITH_ATTACHMENTS | 響應(yīng)結(jié)果包含隱藏參數(shù) |
| 3 | RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS | 異常返回包含隱藏參數(shù) |
| 2 | RESPONSE_NULL_VALUE | 響應(yīng)空值 |
| 1 | RESPONSE_VALUE | 響應(yīng)結(jié)果 |
| 0 | RESPONSE_WITH_EXCEPTION_ | 異常返回 |
我們知道在網(wǎng)絡(luò)通信中(TCP)需要解決網(wǎng)絡(luò)粘包/解包的問題,常用的方法比如用回車、換行、固定長度和特殊分隔符進(jìn)行處理,而Dubbo是使用特殊符號(hào)0xdabb魔法數(shù)來分割處理粘包問題。
在實(shí)際場景中,客戶端會(huì)使用多線程并發(fā)調(diào)用服務(wù),Dubbo如何做到正確響應(yīng)調(diào)用線程呢?關(guān)鍵在于協(xié)議頭的全局請(qǐng)求id標(biāo)識(shí),先看原理圖:

當(dāng)客戶端多個(gè)線程并發(fā)請(qǐng)求時(shí),框架內(nèi)部會(huì)調(diào)用DefaultFuture對(duì)象的get方法進(jìn)行等待。在請(qǐng)求發(fā)起時(shí),框架內(nèi)部會(huì)創(chuàng)建Request對(duì)象,這時(shí)候會(huì)被分配一個(gè)唯一id,DefaultFuture可以從Request中獲取id,并將關(guān)聯(lián)關(guān)系存儲(chǔ)到靜態(tài)HashMap中,就是上圖中的Futures集合。當(dāng)客戶端收到響應(yīng)時(shí),會(huì)根據(jù)Response對(duì)象中的id,從Futures集合中查找對(duì)應(yīng)DefaultFuture對(duì)象,最終會(huì)喚醒對(duì)應(yīng)的線程并通知結(jié)果。客戶端也會(huì)啟動(dòng)一個(gè)定時(shí)掃描線程去探測超時(shí)沒有返回的請(qǐng)求。
6.3 編解碼器的原理
先了解一下編解碼器的類關(guān)系圖:

如上,AbstractCodec主要提供基礎(chǔ)能力,比如校驗(yàn)報(bào)文長度和查找具體編解碼器等。TransportCodec主要抽象編解碼實(shí)現(xiàn),自動(dòng)幫我們?nèi)フ{(diào)用序列化、反序列化實(shí)現(xiàn)和自動(dòng)cleanup流。我們通過Dubbo編解碼繼承結(jié)構(gòu)可以清晰看到,DubboCodec繼承自ExchageCodec,它又再次繼承了TelnetCodec實(shí)現(xiàn)。我們前面說過Telnet實(shí)現(xiàn)復(fù)用了Dubbo協(xié)議端口,其實(shí)就是在這層編解碼做了通用處理。因?yàn)榱髦锌赡馨鄠€(gè)RPC請(qǐng)求,Dubbo框架嘗試一次性讀取更多完整報(bào)文編解碼生成對(duì)象,也就是圖中的DubboCountCodec,它的實(shí)現(xiàn)思想比較簡單,依次調(diào)用DubboCodec去解碼,如果能解碼成完整報(bào)文,則加入消息列表,然后觸發(fā)下一個(gè)Handler方法調(diào)用。
6.3.1 Dubbo協(xié)議編碼器
編碼器的作用是將Java對(duì)象轉(zhuǎn)成字節(jié)流,主要分兩部分,構(gòu)造報(bào)文頭部,和對(duì)消息體進(jìn)行序列化處理。所有編輯碼層實(shí)現(xiàn)都應(yīng)該繼承自ExchangeCodec,當(dāng)Dubbo協(xié)議編碼請(qǐng)求對(duì)象時(shí),會(huì)調(diào)用ExchangeCodec#encode方法。我們來看下這個(gè)方法是如何對(duì)請(qǐng)求對(duì)象進(jìn)行編碼的:

如上,是Dubbo將請(qǐng)求對(duì)象轉(zhuǎn)成字節(jié)流的過程,其中encodeRequestData方法是對(duì)RpcInvocation調(diào)用對(duì)象的編碼,主要是對(duì)接口、方法、方法參數(shù)類型、方法參數(shù)等進(jìn)行編碼,在DubboCodec#encodeRequestData中對(duì)此方法進(jìn)行了重寫:

注意隱式參數(shù)是個(gè)HashMap,timeout和group等動(dòng)態(tài)參數(shù)就放在這里。
處理完編碼請(qǐng)求對(duì)象后,我們繼續(xù)分析編碼響應(yīng)對(duì)象,在ExchangeCodec#encodeResponse中:
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
// 獲取指定或默認(rèn)的序列化協(xié)議(Hessian2)
Serialization serialization = getSerialization(channel);
// 構(gòu)造16字節(jié)頭
byte[] header = new byte[HEADER_LENGTH];
// 設(shè)置2字節(jié)魔數(shù)
Bytes.short2bytes(MAGIC, header);
// 第3個(gè)字節(jié)(19~23)設(shè)置序列化編號(hào)
header[2] = serialization.getContentTypeId();
// 18設(shè)置事件標(biāo)識(shí),沒有設(shè)置17調(diào)用方式應(yīng)該是因?yàn)轫憫?yīng)用不上
if (res.isHeartbeat()) {
header[2] |= FLAG_EVENT;
}
// 第4個(gè)字節(jié)設(shè)置響應(yīng)狀態(tài)
byte status = res.getStatus();
header[3] = status;
// 第5個(gè)字節(jié)設(shè)置id
Bytes.long2bytes(res.getId(), header, 4);
// 空出16字節(jié)頭部,后面存儲(chǔ)響應(yīng)體報(bào)文
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
encodeEventData(channel, out, res.getResult());
} else {
// 序列化響應(yīng),data一般是Result對(duì)象
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else {
out.writeUTF(res.getErrorMessage());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
int len = bos.writtenBytes();
// 檢查是否超過默認(rèn)8MB大小
checkPayload(channel, len);
// 寫入消息長度到第13個(gè)字節(jié)
Bytes.int2bytes(len, header, 12);
// 定位指針到報(bào)文頭部開始位置
buffer.writerIndex(savedWriteIndex);
// 寫入完整報(bào)文頭部到buffer
buffer.writeBytes(header);
// 設(shè)置writeIndex到消息體結(jié)束的位置
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// 如果編碼失敗,則復(fù)位buffer
buffer.writerIndex(savedWriteIndex);
// send error message to Consumer, otherwise, Consumer will wait till timeout.
// 將編碼響應(yīng)異常返送給consumer,否則客戶端只能等待到超時(shí)
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
// 數(shù)據(jù)包長度超過限制異常
if (t instanceof ExceedPayloadLimitException) {
logger.warn(t.getMessage(), t);
try {
r.setErrorMessage(t.getMessage());
// 告知客戶端超限的異常信息
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
}
} else {
// FIXME log error message in Codec and handle in caught() of IoHanndler?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
try {
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
}
}
}
// Rethrow exception
if (t instanceof IOException) {
throw (IOException) t;
} else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t.getMessage(), t);
}
}
}
如上,響應(yīng)編碼與請(qǐng)求編碼的邏輯基本大同小異,在編碼出現(xiàn)異常時(shí),會(huì)將異常響應(yīng)返回給客戶端,防止客戶端只能一直等到超時(shí)。為了防止報(bào)錯(cuò)對(duì)象無法在客戶端反序列化,在服務(wù)端會(huì)將異常信息轉(zhuǎn)成字符串處理。對(duì)于響應(yīng)體的編碼,在DubboCodec#encodeResponseData方法中實(shí)現(xiàn):

注意不管什么樣的響應(yīng),都會(huì)先寫入1個(gè)字節(jié)的標(biāo)識(shí)符,具體的值和含義前面已經(jīng)講過。
6.3.2 Dubbo協(xié)議解碼器
解碼相對(duì)更復(fù)雜一些,分為2部分,第一部分是解碼報(bào)文的頭部,第二部分是解碼報(bào)文體內(nèi)容并將其轉(zhuǎn)換成RpcInvocation對(duì)象。我們先看服務(wù)端接受到請(qǐng)求后的解碼過程,具體解碼實(shí)現(xiàn)在ExchangeCodec#decode方法:

protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// 如果加入流的起始處不是Dubbo魔數(shù),
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
// 如果還有數(shù)據(jù)可以讀
if (header.length < readable) {
// 為header分配空間
header = Bytes.copyOf(header, readable);
// 將剩余的可讀字節(jié)讀到header中
buffer.readBytes(header, length, readable - length);
}
// 遍歷讀出來的header看有沒有魔數(shù)
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
// 如果發(fā)現(xiàn)魔數(shù),就把buffer讀索引挪到這個(gè)魔數(shù)的位置,header只保留魔術(shù)之前的數(shù)據(jù)
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 交給父類TelnetCodec來解碼
return super.decode(channel, buffer, readable, header);
}
// 如果可讀數(shù)據(jù)小于16字節(jié),期待更多數(shù)據(jù)
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// 提取頭部存儲(chǔ)的報(bào)問體長度,并校驗(yàn)是否超過限制
int len = Bytes.bytes2int(header, 12);
checkPayload(channel, len);
// 驗(yàn)證是否可以讀取完整報(bào)文體,不完整則期待更多數(shù)據(jù)
int tt = len + HEADER_LENGTH;
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 對(duì)報(bào)文體進(jìn)行解碼,此處is是完整的RPC調(diào)用報(bào)文體
return decodeBody(channel, is, header);
} finally {
// 如果解碼過程又問題,則跳過這次RPC調(diào)用報(bào)文
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
可以看出,解碼過程中需要解決粘包和半包問題。接下來我們看一下DubboCodec對(duì)消息題解碼的實(shí)現(xiàn):
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// 從頭部讀取ID
long id = Bytes.bytes2long(header, 4);
if ((flag & FLAG_REQUEST) == 0) {
// 如果是響應(yīng)體
...
} else {
// 如果是請(qǐng)求體
// 創(chuàng)建Request對(duì)象
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
if (req.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
// 直接在IO線程進(jìn)行解碼
inv = new DecodeableRpcInvocation(channel, req, is, proto);
inv.decode();
} else {
// 交給Dubbo業(yè)務(wù)線程池解碼
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
// 把RpcInvocation數(shù)據(jù)放進(jìn)Request
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// 解碼失敗,標(biāo)記并存儲(chǔ)異常
req.setBroken(true);
req.setData(t);
}
return req;
}
如上,如果默認(rèn)配置在IO線程解碼,直接調(diào)用decode方法;否則不做解碼,延遲到業(yè)務(wù)線程池中解碼。這里沒有提到的是心跳和事件的解碼,其實(shí)很簡單,心跳報(bào)文是沒有消息體的,事件又消息體,在使用Hessian2協(xié)議的情況下默認(rèn)會(huì)傳遞字符R,當(dāng)優(yōu)雅停機(jī)時(shí)會(huì)通過發(fā)送readonly事件來通知客戶端當(dāng)前服務(wù)端不可用。
接下來,我們分析一下如何把消息體轉(zhuǎn)換成RpcInvocation對(duì)象,具體在DecodeableRpcInvocation#decode方法中:
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 讀取Dubbo版本
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(DUBBO_VERSION_KEY, dubboVersion);
// 讀取調(diào)用接口
String path = in.readUTF();
setAttachment(PATH_KEY, path);
// 讀取接口版本
setAttachment(VERSION_KEY, in.readUTF());
// 讀取方法名稱
setMethodName(in.readUTF());
// 讀取方法參數(shù)類型
String desc = in.readUTF();
setParameterTypesDesc(desc);
try {
Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
if (desc.length() > 0) {
// if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) {
// pts = ReflectUtils.desc2classArray(desc);
// } else {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.lookupService(path);
if (serviceDescriptor != null) {
MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
if (methodDescriptor != null) {
pts = methodDescriptor.getParameterClasses();
this.setReturnTypes(methodDescriptor.getReturnTypes());
}
}
if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
if (!RpcUtils.isGenericCall(path, getMethodName()) && !RpcUtils.isEcho(path, getMethodName())) {
throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
}
pts = ReflectUtils.desc2classArray(desc);
}
// }
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 循環(huán)讀取方法參數(shù)值
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
setParameterTypes(pts);
// 讀取隱式參數(shù)
Map<String, Object> map = in.readAttachments();
if (map != null && map.size() > 0) {
Map<String, Object> attachment = getObjectAttachments();
if (attachment == null) {
attachment = new HashMap<>();
}
attachment.putAll(map);
setObjectAttachments(attachment);
}
// 處理異步參數(shù)回調(diào),如果有則在服務(wù)端創(chuàng)建reference代理實(shí)例
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
setArguments(args);
String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
getAttachment(GROUP_KEY),
getAttachment(VERSION_KEY));
setTargetServiceUniqueName(targetServiceName);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
解碼請(qǐng)求時(shí),嚴(yán)格按照客戶端寫數(shù)據(jù)的順序處理。
解碼響應(yīng)和解碼請(qǐng)求類似,調(diào)用的同樣是DubboCodec#decodeBody,就是上面省略的部分,這里就不贅述了,重點(diǎn)看下響應(yīng)體的解碼,即DecodeableRpcResult#decode方法:

6.4 ChannelHandler
如果讀者熟悉Netty,就很容易理解Dubbo內(nèi)部使用的ChannelHandler組件的原理,Dubbo內(nèi)部使用了大量的Handler組成類似鏈表,依次處理具體邏輯,包括編解碼、心跳時(shí)間戳和方法調(diào)用Handler等。因?yàn)镹ettty每次創(chuàng)建Handler都會(huì)經(jīng)過ChannelPipeline,大量的事件經(jīng)過很多Pipeline會(huì)有較多開銷,因此Dubbo會(huì)將多個(gè)Handler聚合成一個(gè)Handler。(個(gè)人表示,這簡直bullshit)
6.5.1 核心Handler和線程模型
Dubbo的Channelhandler有5中狀態(tài):
- connected Channel已經(jīng)被創(chuàng)建
- disconnected Channel已經(jīng)被斷開
- sent 消息被發(fā)送
- received 消息被接收
- caught 捕獲到異常
Dubbo針對(duì)每個(gè)特性都會(huì)實(shí)現(xiàn)對(duì)應(yīng)的ChannelHandler,在講解Handler的指責(zé)之前,我們Dubbo有哪些常用的Handler:
- ExchangeHandlerAdapter 用于查找服務(wù)方法并調(diào)用
- HeaderExchangeHandler 封裝處理Request/Response和Telnet調(diào)用能力
- DecodeHandler 支持在Dubbo業(yè)務(wù)線程池中做解碼
- ChannelHandlerDispatcher 封裝多Handler廣播調(diào)用
- AllChannelHandler 支持Dubbo業(yè)務(wù)線程池調(diào)用業(yè)務(wù)方法
- HeartbeatHandler 支持心跳處理
- MultiMessageHandler 支持流中多消息報(bào)文批處理
- ConnectionOrderedChannelHandler 單線程池處理TCP的連接和斷開
- MessageOnlyChannelHandler 僅在線程池處理接受報(bào)文,其他事件在IO線程處理。
- WrappedChannelHandler 基于內(nèi)存key-value存儲(chǔ)封裝和共享線程池能力。
- NettyServerHandler 封裝Netty服務(wù)端事件,處理連接、斷開、讀取、寫入和異常等。
- NettyClientHandler 封裝Netty客戶端事件,處理連接、斷開、讀取、寫入和異常等。
Dubbo提供了大量的Handler去承載特性和擴(kuò)展,這些Handler最終會(huì)和底層通信框架做關(guān)聯(lián),比如Netty等。一次完整的RPC調(diào)用貫穿了一系列的Handler,如果直接掛載到底層通信框架(Netty),因?yàn)檎麄€(gè)鏈路比較長,則需要大量鏈?zhǔn)讲檎液褪录?,不僅低效,而且浪費(fèi)資源。
下圖展示了同時(shí)具有入站和出站ChannelHandler的布局,如果一個(gè)入站事件被觸發(fā),比如連接或數(shù)據(jù)讀取,那么它會(huì)從ChannelPipeline頭部一直傳播到ChannelPipeline的尾端。出站的IO事件將從ChannelPipeline最右邊開始,然后向左傳播。當(dāng)然ChannelPipeline傳播時(shí),會(huì)檢測入站的是否實(shí)現(xiàn)了ChannelInboundHandler,出站會(huì)檢測是否實(shí)現(xiàn)了ChannelOutboundHandler,如果沒有實(shí)現(xiàn),則自動(dòng)跳過。Dubbo框架中實(shí)現(xiàn)這兩個(gè)接口類主要是NettyServerHandler和NettyClientHandler。Dubbo通過裝飾者模式包裝Handler,從而不需要將每個(gè)Handler都追加到Pipeline中。因此NettyServer和NettyClient中最多有3個(gè)Handler,分別是編碼、解碼和NettyHandler。
注意區(qū)分兩種ChannelHandler,一種是netty的inbound和outBoundHandler,只需要3個(gè)編碼、解碼和NettyHandler(這個(gè)應(yīng)該是dubbo使用netty3的實(shí)現(xiàn),netty4的實(shí)現(xiàn)分開NettyServerHandler和NettyClientHandler);另一種是Dubbo定義的SPI接口也叫ChannelHandler,上面羅列的這么多Dubbo定義的ChannelHandler就是實(shí)現(xiàn)的spi接口,包括NettyClient這個(gè)類也實(shí)現(xiàn)了ChannelHandler這個(gè)SPI接口,它利用SPI包裝擴(kuò)展的方式將多個(gè)handler都包裝起來。然后NettyHandler會(huì)持有NettyClient的引用,所以真正的處理邏輯都有NettyClient來處理,進(jìn)而一層層的調(diào)用到每一層包裝的handler,所以整個(gè)調(diào)用鏈看起來會(huì)相當(dāng)?shù)膹?fù)雜。

講完Handler的流轉(zhuǎn)機(jī)制后,我們?cè)賮硖接慠PC調(diào)用Provider方處理Handler的邏輯,在DubboProtocol中通過內(nèi)部類繼承自ExchangeHandlerAdapter,完成服務(wù)提供方Invoker實(shí)例的查找并進(jìn)行服務(wù)的真實(shí)調(diào)用。

如上是觸發(fā)業(yè)務(wù)方法調(diào)用的關(guān)鍵,在服務(wù)暴露時(shí)服務(wù)端已經(jīng)按照特定規(guī)則(端口、接口名、接口版本和接口分組)把實(shí)例Invoker存儲(chǔ)到HashMap中,客戶端調(diào)用過來時(shí)必須攜帶相同信息構(gòu)造的key,找到對(duì)應(yīng)Exporter(里面持有Invoker)然后調(diào)用。
我們先跟蹤getInvoker的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)服務(wù)端唯一標(biāo)識(shí)的服務(wù)由4部分組成:端口、接口名、接口版本和接口分組。

Dubbo為了編織這些Handler,適應(yīng)不同的場景,提供了一套可以定制的線程模型。為了使概念更清晰,我們描述的IO線程是指底層直接負(fù)責(zé)讀寫報(bào)文,比如Netty線程池。Dubbo中提供的線程池負(fù)責(zé)業(yè)務(wù)方法調(diào)用,我們稱為業(yè)務(wù)線程。如果一些事件邏輯可以很快執(zhí)行完成,比如做個(gè)標(biāo)記這種簡單操作,則可以直接在IO線程中處理。如果是比較耗時(shí)的處理,比如讀寫數(shù)據(jù)庫等操作,則應(yīng)該將耗時(shí)或者阻塞的任務(wù)轉(zhuǎn)到業(yè)務(wù)線程上執(zhí)行。因?yàn)镮O線程用于接受請(qǐng)求,如果IO線程飽和,則不會(huì)接受新的請(qǐng)求。
我們先看一下Dubbo中是如何實(shí)現(xiàn)線程派發(fā)的:

如上,Dispatcher是線程池的派發(fā)器。這里需要注意的是,Dispatcher真實(shí)的職責(zé)是創(chuàng)建有線程派發(fā)能力的ChannelHandler,比如AllChannelHandler、MessageOnlyChannelHandler和ExecutionChannelHanlder,其本身并不具備線程派發(fā)能力。
Dispatcher屬于Dubbo中的擴(kuò)展點(diǎn),這個(gè)擴(kuò)展點(diǎn)用來動(dòng)態(tài)產(chǎn)生Handler,以滿足不同的場景,目前Dubbo支持一下6種策略調(diào)用:
| 分發(fā)策略 | 分發(fā)實(shí)現(xiàn) | 描述 |
|---|---|---|
| all | AllDispatcher | 將所有IO事件交給業(yè)務(wù)線程池,Dubbo默認(rèn)啟用 |
| connection | ConnectionOrderedDispatcher | 單獨(dú)線程池處理連接斷開事件,和業(yè)務(wù)線程池分開 |
| direct | DirectDispatcher | 所有方法調(diào)用和事件處理在IO線程池,不推薦 |
| execution | ExecutionDispatcher | 只在業(yè)務(wù)線程池處理接受請(qǐng)求,其他都在IO線程池 |
| message | MessageOnlyChannelHandler | 只在業(yè)務(wù)線程池處理請(qǐng)求和響應(yīng),其他在IO線程池 |
| mockdispatcher | MockDispatcher | 默認(rèn)返回null |
具體需要按照使用場景不同啟用不同的策略,建議使用默認(rèn)策略,如果在TCP連接中需要做安全或校驗(yàn),則可以使用ConnectionOrderedDispatcher策略。如果引入新的線程池,則不可避免的導(dǎo)致額外的線程切換,用戶可在Dubbo配置中指定dispatcher屬性讓具體策略生效。
6.4.2 Dubbo請(qǐng)求響應(yīng)Hanlder
在Dubbo內(nèi)部,所有方法調(diào)用都被抽象成Request/Response,每次調(diào)用都會(huì)創(chuàng)建一個(gè)Request,如果是方法調(diào)用則返回一個(gè)Response對(duì)象。HeaderExceptionExchangeHandler就是用了處理這種場景,主要負(fù)責(zé)4中事情:
(1) 更新發(fā)送和讀取請(qǐng)求時(shí)間戳。
(2) 判斷請(qǐng)求格式或編解碼是否有錯(cuò),并響應(yīng)客戶端失敗的具體原因。
(3) 處理Request請(qǐng)求和Response正常響應(yīng)。
(4) 支持Telnet調(diào)用。
我們先來看一下HeaderExchangeHandler#received實(shí)現(xiàn):

筆記記到這里,發(fā)現(xiàn)原書中這章就快結(jié)束了,但仍然對(duì)遠(yuǎn)程調(diào)用過程稀里糊涂的。。。就放棄繼續(xù)記錄了,還是期待下一篇看看官網(wǎng)的對(duì)遠(yuǎn)程調(diào)用的講述吧。