7.Dubbo遠(yuǎn)程調(diào)用(要配合下一篇一起看)

7.1 Dubbo調(diào)用介紹

如果我們手動(dòng)寫一個(gè)簡單的RPC調(diào)用,一般需要把服務(wù)調(diào)用的信息傳遞給服務(wù)端,包括每次服務(wù)調(diào)用的一些共用信息包括服務(wù)調(diào)用接口、方法名、方法參數(shù)類型和方法參數(shù)值等,在傳遞方法參數(shù)值時(shí)需要先序列化對(duì)象并經(jīng)過網(wǎng)絡(luò)傳輸?shù)椒?wù)端,在服務(wù)端接受后再按照客戶端序列化的順序再做一次反序列化,然后拼裝成請(qǐng)求對(duì)象進(jìn)行服務(wù)反射調(diào)用,最終將調(diào)用結(jié)果傳給客戶端。Dubbo的實(shí)現(xiàn)也基本是相同的原理,下圖是Dubbo一次完整RPC調(diào)用中經(jīng)過的步驟:


Dubbo調(diào)用流程

首先在客戶端啟動(dòng)時(shí),會(huì)從注冊(cè)中心拉取和訂閱對(duì)應(yīng)的服務(wù)列表,Cluster會(huì)把拉取的服務(wù)列表聚合成一個(gè)Invoker,每次RPC調(diào)用前會(huì)通過Directory#list獲取providers地址(已經(jīng)生成好的Invoker地址),獲取這些服務(wù)列表給后續(xù)路由和負(fù)載均衡使用。對(duì)應(yīng)上圖①中將多個(gè)服務(wù)提供者做聚合。在框架內(nèi)部實(shí)現(xiàn)Directory接口的是RegistryDirectory類,它和接口名是一對(duì)一的關(guān)系(每一個(gè)接口都有一個(gè)RegistryDirectory實(shí)例),主要負(fù)責(zé)拉取和訂閱服務(wù)提供者、動(dòng)態(tài)配置和路由項(xiàng)。
在Dubbo發(fā)起服務(wù)調(diào)用時(shí),所有路由和負(fù)載均衡都是在客戶端實(shí)現(xiàn)的??蛻舳朔?wù)調(diào)用首先會(huì)觸發(fā)路由操作,然后將路由結(jié)果得到的服務(wù)列表作為負(fù)載均衡參數(shù),經(jīng)過負(fù)載均衡后會(huì)選出一臺(tái)機(jī)器進(jìn)行RPC調(diào)用,這3個(gè)步驟一次對(duì)應(yīng)圖中②③④??蛻舳私?jīng)過路由和負(fù)載均衡后,會(huì)將請(qǐng)求交給底層IO線程池(如Netty)進(jìn)行處理,IO線程池主要處理讀寫、序列化和反序列化等邏輯,因此這里一定不能阻塞操作,Dubbo也提供參數(shù)控制(decode.in.io)參數(shù),在處理反序列化對(duì)象時(shí)會(huì)在業(yè)務(wù)線程池中處理。在⑤中包含兩種類似的線程池,一種是IO線程池(Netty),另一種是Dubbo業(yè)務(wù)線程池(承載業(yè)務(wù)方法調(diào)用)。
目前Dubbo將服務(wù)調(diào)用和Telnet調(diào)用做了端口復(fù)用,子啊編解碼層面也做了適配。在Telnet調(diào)用時(shí),會(huì)新建立一個(gè)TCP連接,傳遞接口、方法和json格式的參數(shù)進(jìn)行服務(wù)調(diào)用,在編解碼層面簡單讀取流中的字符串(因?yàn)椴皇荄ubbo標(biāo)準(zhǔn)頭報(bào)文),最終交給Telnet對(duì)應(yīng)的Handler去解析方法調(diào)用。如果不是Telnet調(diào)用,則服務(wù)提供方會(huì)根據(jù)傳遞過來的接口、分組和版本信息查找Invoker對(duì)應(yīng)的實(shí)例進(jìn)行反射調(diào)用。在⑦中進(jìn)行了端口復(fù)用,Telnet和正常RPC調(diào)用不一樣的地方是序列化和反序列化使用的不是Hessian方式,而是直接使用fastjson進(jìn)行處理。
講解完主要調(diào)用原理,接下來開始探討細(xì)節(jié),比如Dubbo協(xié)議、編解碼實(shí)現(xiàn)和線程模型等,本篇重點(diǎn)主要放在⑤⑥⑦。

7.2 Dubbo協(xié)議

Dubbo協(xié)議參考了現(xiàn)有的TCP/IP協(xié)議,每一次RPC調(diào)用包括協(xié)議頭和協(xié)議體兩部分。16字節(jié)長的報(bào)文頭部主要包含魔數(shù)(0xdabb),以及當(dāng)前請(qǐng)求報(bào)文是否是Request、Response、心跳和事件的信息,請(qǐng)求時(shí)也會(huì)攜帶當(dāng)前報(bào)文體內(nèi)序列化協(xié)議編號(hào)。除此之外,報(bào)文頭還攜帶了請(qǐng)求狀態(tài),以及請(qǐng)求唯一標(biāo)識(shí)和報(bào)文體長度。


Dubbo協(xié)議
偏移比特位 字段 描述
0 ~ 7 魔數(shù)高位 0xda00
8 ~15 魔數(shù)低位 0xbb
16 數(shù)據(jù)包類型 0為Response,1為Request
17 調(diào)用方式 第16為1時(shí)有效,0表示單向調(diào)用,1表示雙向調(diào)用,即有無返回值
18 事件標(biāo)識(shí) 0表示當(dāng)前數(shù)據(jù)包是請(qǐng)求或響應(yīng),1表示心跳包
19 ~ 23 序列化器編號(hào) 2為Hessian2Serialization,3為JavaSerialization,4為CompactedJavaSerialization,6為FastJsonSerialization,7為NativeJavaSerialization,8為KryoSerialization,9為FstSerialization
24 ~ 31 狀態(tài) 20為OK,30為CLIENT_TIMEOUT,31為SERVER_TIMEOUT,40為BAD_REQUEST,50為BAD_RESPONSE,60為SERVICE_NOT_FOUND,70為SERVICE_ERROR,80為SERVER_ERROR,90為CLIENT_ERROR,100為SERVER_THREADPOOL_EXHAUSTED_ERROR
32 ~ 95 請(qǐng)求編號(hào) 這8個(gè)字節(jié)存儲(chǔ)RPC請(qǐng)求的唯一ID,用來將請(qǐng)求和響應(yīng)做關(guān)聯(lián)
96 ~ 127 消息體長度 4個(gè)字節(jié)存儲(chǔ)消息題長度

在消息體中,客戶端嚴(yán)格按照序列化順序?qū)懭胂ⅲ?wù)端也會(huì)遵循相同的順序讀取消息,客戶端發(fā)起的請(qǐng)求消息體一次依次保存下列內(nèi)容:Dubbo版本號(hào)、服務(wù)接口名、服務(wù)接口版本、方法名、參數(shù)類型、方法參數(shù)值和請(qǐng)求額外參數(shù)(attachment)。
服務(wù)端返回的響應(yīng)消息體主要包含回值狀態(tài)標(biāo)記和返回值,其中回值狀態(tài)標(biāo)記包含6中:

狀態(tài)值 狀態(tài)符號(hào) 描述
5 RESPONSE_NULL_VALUE_WITH_ATTACHMENTS 響應(yīng)空值包含隱藏參數(shù)
4 RESPONSE_VALUE_WITH_ATTACHMENTS 響應(yīng)結(jié)果包含隱藏參數(shù)
3 RESPONSE_WITH_EXCEPTION_WITH_ATTACHMENTS 異常返回包含隱藏參數(shù)
2 RESPONSE_NULL_VALUE 響應(yīng)空值
1 RESPONSE_VALUE 響應(yīng)結(jié)果
0 RESPONSE_WITH_EXCEPTION_ 異常返回

我們知道在網(wǎng)絡(luò)通信中(TCP)需要解決網(wǎng)絡(luò)粘包/解包的問題,常用的方法比如用回車、換行、固定長度和特殊分隔符進(jìn)行處理,而Dubbo是使用特殊符號(hào)0xdabb魔法數(shù)來分割處理粘包問題。
在實(shí)際場景中,客戶端會(huì)使用多線程并發(fā)調(diào)用服務(wù),Dubbo如何做到正確響應(yīng)調(diào)用線程呢?關(guān)鍵在于協(xié)議頭的全局請(qǐng)求id標(biāo)識(shí),先看原理圖:


Dubbo請(qǐng)求響應(yīng)

當(dāng)客戶端多個(gè)線程并發(fā)請(qǐng)求時(shí),框架內(nèi)部會(huì)調(diào)用DefaultFuture對(duì)象的get方法進(jìn)行等待。在請(qǐng)求發(fā)起時(shí),框架內(nèi)部會(huì)創(chuàng)建Request對(duì)象,這時(shí)候會(huì)被分配一個(gè)唯一id,DefaultFuture可以從Request中獲取id,并將關(guān)聯(lián)關(guān)系存儲(chǔ)到靜態(tài)HashMap中,就是上圖中的Futures集合。當(dāng)客戶端收到響應(yīng)時(shí),會(huì)根據(jù)Response對(duì)象中的id,從Futures集合中查找對(duì)應(yīng)DefaultFuture對(duì)象,最終會(huì)喚醒對(duì)應(yīng)的線程并通知結(jié)果。客戶端也會(huì)啟動(dòng)一個(gè)定時(shí)掃描線程去探測超時(shí)沒有返回的請(qǐng)求。

6.3 編解碼器的原理

先了解一下編解碼器的類關(guān)系圖:


Dubbo編解碼關(guān)系

如上,AbstractCodec主要提供基礎(chǔ)能力,比如校驗(yàn)報(bào)文長度和查找具體編解碼器等。TransportCodec主要抽象編解碼實(shí)現(xiàn),自動(dòng)幫我們?nèi)フ{(diào)用序列化、反序列化實(shí)現(xiàn)和自動(dòng)cleanup流。我們通過Dubbo編解碼繼承結(jié)構(gòu)可以清晰看到,DubboCodec繼承自ExchageCodec,它又再次繼承了TelnetCodec實(shí)現(xiàn)。我們前面說過Telnet實(shí)現(xiàn)復(fù)用了Dubbo協(xié)議端口,其實(shí)就是在這層編解碼做了通用處理。因?yàn)榱髦锌赡馨鄠€(gè)RPC請(qǐng)求,Dubbo框架嘗試一次性讀取更多完整報(bào)文編解碼生成對(duì)象,也就是圖中的DubboCountCodec,它的實(shí)現(xiàn)思想比較簡單,依次調(diào)用DubboCodec去解碼,如果能解碼成完整報(bào)文,則加入消息列表,然后觸發(fā)下一個(gè)Handler方法調(diào)用。

6.3.1 Dubbo協(xié)議編碼器

編碼器的作用是將Java對(duì)象轉(zhuǎn)成字節(jié)流,主要分兩部分,構(gòu)造報(bào)文頭部,和對(duì)消息體進(jìn)行序列化處理。所有編輯碼層實(shí)現(xiàn)都應(yīng)該繼承自ExchangeCodec,當(dāng)Dubbo協(xié)議編碼請(qǐng)求對(duì)象時(shí),會(huì)調(diào)用ExchangeCodec#encode方法。我們來看下這個(gè)方法是如何對(duì)請(qǐng)求對(duì)象進(jìn)行編碼的:


ExchangeCodec#encodeRequest

如上,是Dubbo將請(qǐng)求對(duì)象轉(zhuǎn)成字節(jié)流的過程,其中encodeRequestData方法是對(duì)RpcInvocation調(diào)用對(duì)象的編碼,主要是對(duì)接口、方法、方法參數(shù)類型、方法參數(shù)等進(jìn)行編碼,在DubboCodec#encodeRequestData中對(duì)此方法進(jìn)行了重寫:


DubboCodec#encodeRequestData

注意隱式參數(shù)是個(gè)HashMap,timeout和group等動(dòng)態(tài)參數(shù)就放在這里。
處理完編碼請(qǐng)求對(duì)象后,我們繼續(xù)分析編碼響應(yīng)對(duì)象,在ExchangeCodec#encodeResponse中:
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
        int savedWriteIndex = buffer.writerIndex();
        try {
            // 獲取指定或默認(rèn)的序列化協(xié)議(Hessian2)
            Serialization serialization = getSerialization(channel);
            // 構(gòu)造16字節(jié)頭
            byte[] header = new byte[HEADER_LENGTH];
            // 設(shè)置2字節(jié)魔數(shù)
            Bytes.short2bytes(MAGIC, header);
            // 第3個(gè)字節(jié)(19~23)設(shè)置序列化編號(hào)
            header[2] = serialization.getContentTypeId();
            // 18設(shè)置事件標(biāo)識(shí),沒有設(shè)置17調(diào)用方式應(yīng)該是因?yàn)轫憫?yīng)用不上
            if (res.isHeartbeat()) {
                header[2] |= FLAG_EVENT;
            }
            // 第4個(gè)字節(jié)設(shè)置響應(yīng)狀態(tài)
            byte status = res.getStatus();
            header[3] = status;
            // 第5個(gè)字節(jié)設(shè)置id
            Bytes.long2bytes(res.getId(), header, 4);

            // 空出16字節(jié)頭部,后面存儲(chǔ)響應(yīng)體報(bào)文
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            // encode response data or error message.
            if (status == Response.OK) {
                if (res.isHeartbeat()) {
                    encodeEventData(channel, out, res.getResult());
                } else {
                    // 序列化響應(yīng),data一般是Result對(duì)象
                    encodeResponseData(channel, out, res.getResult(), res.getVersion());
                }
            } else {
                out.writeUTF(res.getErrorMessage());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();

            int len = bos.writtenBytes();
            // 檢查是否超過默認(rèn)8MB大小
            checkPayload(channel, len);
            // 寫入消息長度到第13個(gè)字節(jié)
            Bytes.int2bytes(len, header, 12);
            // 定位指針到報(bào)文頭部開始位置
            buffer.writerIndex(savedWriteIndex);
            // 寫入完整報(bào)文頭部到buffer
            buffer.writeBytes(header);
            // 設(shè)置writeIndex到消息體結(jié)束的位置
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        } catch (Throwable t) {
            // 如果編碼失敗,則復(fù)位buffer
            buffer.writerIndex(savedWriteIndex);
            // send error message to Consumer, otherwise, Consumer will wait till timeout.
            // 將編碼響應(yīng)異常返送給consumer,否則客戶端只能等待到超時(shí)
            if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
                Response r = new Response(res.getId(), res.getVersion());
                r.setStatus(Response.BAD_RESPONSE);
                // 數(shù)據(jù)包長度超過限制異常
                if (t instanceof ExceedPayloadLimitException) {
                    logger.warn(t.getMessage(), t);
                    try {
                        r.setErrorMessage(t.getMessage());
                        // 告知客戶端超限的異常信息
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
                    }
                } else {
                    // FIXME log error message in Codec and handle in caught() of IoHanndler?
                    logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
                    try {
                        r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                        channel.send(r);
                        return;
                    } catch (RemotingException e) {
                        logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
                    }
                }
            }

            // Rethrow exception
            if (t instanceof IOException) {
                throw (IOException) t;
            } else if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else if (t instanceof Error) {
                throw (Error) t;
            } else {
                throw new RuntimeException(t.getMessage(), t);
            }
        }
    }

如上,響應(yīng)編碼與請(qǐng)求編碼的邏輯基本大同小異,在編碼出現(xiàn)異常時(shí),會(huì)將異常響應(yīng)返回給客戶端,防止客戶端只能一直等到超時(shí)。為了防止報(bào)錯(cuò)對(duì)象無法在客戶端反序列化,在服務(wù)端會(huì)將異常信息轉(zhuǎn)成字符串處理。對(duì)于響應(yīng)體的編碼,在DubboCodec#encodeResponseData方法中實(shí)現(xiàn):


DubboCodec#encodeResponseData

注意不管什么樣的響應(yīng),都會(huì)先寫入1個(gè)字節(jié)的標(biāo)識(shí)符,具體的值和含義前面已經(jīng)講過。

6.3.2 Dubbo協(xié)議解碼器

解碼相對(duì)更復(fù)雜一些,分為2部分,第一部分是解碼報(bào)文的頭部,第二部分是解碼報(bào)文體內(nèi)容并將其轉(zhuǎn)換成RpcInvocation對(duì)象。我們先看服務(wù)端接受到請(qǐng)求后的解碼過程,具體解碼實(shí)現(xiàn)在ExchangeCodec#decode方法:


ExchangeCodec#decode
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
        // 如果加入流的起始處不是Dubbo魔數(shù),
        if (readable > 0 && header[0] != MAGIC_HIGH
                || readable > 1 && header[1] != MAGIC_LOW) {
            int length = header.length;
            // 如果還有數(shù)據(jù)可以讀
            if (header.length < readable) {
                // 為header分配空間
                header = Bytes.copyOf(header, readable);
                // 將剩余的可讀字節(jié)讀到header中
                buffer.readBytes(header, length, readable - length);
            }
            // 遍歷讀出來的header看有沒有魔數(shù)
            for (int i = 1; i < header.length - 1; i++) {
                if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                    // 如果發(fā)現(xiàn)魔數(shù),就把buffer讀索引挪到這個(gè)魔數(shù)的位置,header只保留魔術(shù)之前的數(shù)據(jù)
                    buffer.readerIndex(buffer.readerIndex() - header.length + i);
                    header = Bytes.copyOf(header, i);
                    break;
                }
            }
            // 交給父類TelnetCodec來解碼
            return super.decode(channel, buffer, readable, header);
        }

        // 如果可讀數(shù)據(jù)小于16字節(jié),期待更多數(shù)據(jù)
        if (readable < HEADER_LENGTH) {
            return DecodeResult.NEED_MORE_INPUT;
        }
        // 提取頭部存儲(chǔ)的報(bào)問體長度,并校驗(yàn)是否超過限制
        int len = Bytes.bytes2int(header, 12);
        checkPayload(channel, len);
        // 驗(yàn)證是否可以讀取完整報(bào)文體,不完整則期待更多數(shù)據(jù)
        int tt = len + HEADER_LENGTH;
        if (readable < tt) {
            return DecodeResult.NEED_MORE_INPUT;
        }

        // limit input stream.
        ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

        try {
            // 對(duì)報(bào)文體進(jìn)行解碼,此處is是完整的RPC調(diào)用報(bào)文體
            return decodeBody(channel, is, header);
        } finally {
            // 如果解碼過程又問題,則跳過這次RPC調(diào)用報(bào)文
            if (is.available() > 0) {
                try {
                    if (logger.isWarnEnabled()) {
                        logger.warn("Skip input stream " + is.available());
                    }
                    StreamUtils.skipUnusedStream(is);
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }

可以看出,解碼過程中需要解決粘包和半包問題。接下來我們看一下DubboCodec對(duì)消息題解碼的實(shí)現(xiàn):

    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
        byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
        // 從頭部讀取ID
        long id = Bytes.bytes2long(header, 4);
        if ((flag & FLAG_REQUEST) == 0) {
            // 如果是響應(yīng)體
            ...
        } else {
            // 如果是請(qǐng)求體
            // 創(chuàng)建Request對(duì)象
            Request req = new Request(id);
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay((flag & FLAG_TWOWAY) != 0);
            if ((flag & FLAG_EVENT) != 0) {
                req.setEvent(true);
            }
            try {
                Object data;
                if (req.isEvent()) {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    data = decodeEventData(channel, in);
                } else {
                    DecodeableRpcInvocation inv;
                    if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                        // 直接在IO線程進(jìn)行解碼
                        inv = new DecodeableRpcInvocation(channel, req, is, proto);
                        inv.decode();
                    } else {
                        // 交給Dubbo業(yè)務(wù)線程池解碼
                        inv = new DecodeableRpcInvocation(channel, req,
                                new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                    }
                    data = inv;
                }
                // 把RpcInvocation數(shù)據(jù)放進(jìn)Request
                req.setData(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode request failed: " + t.getMessage(), t);
                }
                // 解碼失敗,標(biāo)記并存儲(chǔ)異常
                req.setBroken(true);
                req.setData(t);
            }

            return req;
        }
    

如上,如果默認(rèn)配置在IO線程解碼,直接調(diào)用decode方法;否則不做解碼,延遲到業(yè)務(wù)線程池中解碼。這里沒有提到的是心跳和事件的解碼,其實(shí)很簡單,心跳報(bào)文是沒有消息體的,事件又消息體,在使用Hessian2協(xié)議的情況下默認(rèn)會(huì)傳遞字符R,當(dāng)優(yōu)雅停機(jī)時(shí)會(huì)通過發(fā)送readonly事件來通知客戶端當(dāng)前服務(wù)端不可用。
接下來,我們分析一下如何把消息體轉(zhuǎn)換成RpcInvocation對(duì)象,具體在DecodeableRpcInvocation#decode方法中:

public Object decode(Channel channel, InputStream input) throws IOException {
        ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
                .deserialize(channel.getUrl(), input);

        // 讀取Dubbo版本
        String dubboVersion = in.readUTF();
        request.setVersion(dubboVersion);
        setAttachment(DUBBO_VERSION_KEY, dubboVersion);

        // 讀取調(diào)用接口
        String path = in.readUTF();
        setAttachment(PATH_KEY, path);
        // 讀取接口版本
        setAttachment(VERSION_KEY, in.readUTF());

        // 讀取方法名稱
        setMethodName(in.readUTF());

        // 讀取方法參數(shù)類型
        String desc = in.readUTF();
        setParameterTypesDesc(desc);

        try {
            Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
            Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
            if (desc.length() > 0) {
//                if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) {
//                    pts = ReflectUtils.desc2classArray(desc);
//                } else {
                ServiceRepository repository = ApplicationModel.getServiceRepository();
                ServiceDescriptor serviceDescriptor = repository.lookupService(path);
                if (serviceDescriptor != null) {
                    MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
                    if (methodDescriptor != null) {
                        pts = methodDescriptor.getParameterClasses();
                        this.setReturnTypes(methodDescriptor.getReturnTypes());
                    }
                }
                if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
                    if (!RpcUtils.isGenericCall(path, getMethodName()) && !RpcUtils.isEcho(path, getMethodName())) {
                        throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
                    }
                    pts = ReflectUtils.desc2classArray(desc);
                }
//                }

                args = new Object[pts.length];
                for (int i = 0; i < args.length; i++) {
                    try {
                        // 循環(huán)讀取方法參數(shù)值
                        args[i] = in.readObject(pts[i]);
                    } catch (Exception e) {
                        if (log.isWarnEnabled()) {
                            log.warn("Decode argument failed: " + e.getMessage(), e);
                        }
                    }
                }
            }
            setParameterTypes(pts);

            // 讀取隱式參數(shù)
            Map<String, Object> map = in.readAttachments();
            if (map != null && map.size() > 0) {
                Map<String, Object> attachment = getObjectAttachments();
                if (attachment == null) {
                    attachment = new HashMap<>();
                }
                attachment.putAll(map);
                setObjectAttachments(attachment);
            }

            // 處理異步參數(shù)回調(diào),如果有則在服務(wù)端創(chuàng)建reference代理實(shí)例
            for (int i = 0; i < args.length; i++) {
                args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
            }

            setArguments(args);
            String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
                    getAttachment(GROUP_KEY),
                    getAttachment(VERSION_KEY));
            setTargetServiceUniqueName(targetServiceName);
        } catch (ClassNotFoundException e) {
            throw new IOException(StringUtils.toString("Read invocation data failed.", e));
        } finally {
            if (in instanceof Cleanable) {
                ((Cleanable) in).cleanup();
            }
        }
        return this;

解碼請(qǐng)求時(shí),嚴(yán)格按照客戶端寫數(shù)據(jù)的順序處理。
解碼響應(yīng)和解碼請(qǐng)求類似,調(diào)用的同樣是DubboCodec#decodeBody,就是上面省略的部分,這里就不贅述了,重點(diǎn)看下響應(yīng)體的解碼,即DecodeableRpcResult#decode方法:


DecodeableRpcResult#decode

6.4 ChannelHandler

如果讀者熟悉Netty,就很容易理解Dubbo內(nèi)部使用的ChannelHandler組件的原理,Dubbo內(nèi)部使用了大量的Handler組成類似鏈表,依次處理具體邏輯,包括編解碼、心跳時(shí)間戳和方法調(diào)用Handler等。因?yàn)镹ettty每次創(chuàng)建Handler都會(huì)經(jīng)過ChannelPipeline,大量的事件經(jīng)過很多Pipeline會(huì)有較多開銷,因此Dubbo會(huì)將多個(gè)Handler聚合成一個(gè)Handler。(個(gè)人表示,這簡直bullshit)

6.5.1 核心Handler和線程模型

Dubbo的Channelhandler有5中狀態(tài):

  • connected Channel已經(jīng)被創(chuàng)建
  • disconnected Channel已經(jīng)被斷開
  • sent 消息被發(fā)送
  • received 消息被接收
  • caught 捕獲到異常

Dubbo針對(duì)每個(gè)特性都會(huì)實(shí)現(xiàn)對(duì)應(yīng)的ChannelHandler,在講解Handler的指責(zé)之前,我們Dubbo有哪些常用的Handler:

  • ExchangeHandlerAdapter 用于查找服務(wù)方法并調(diào)用
  • HeaderExchangeHandler 封裝處理Request/Response和Telnet調(diào)用能力
  • DecodeHandler 支持在Dubbo業(yè)務(wù)線程池中做解碼
  • ChannelHandlerDispatcher 封裝多Handler廣播調(diào)用
  • AllChannelHandler 支持Dubbo業(yè)務(wù)線程池調(diào)用業(yè)務(wù)方法
  • HeartbeatHandler 支持心跳處理
  • MultiMessageHandler 支持流中多消息報(bào)文批處理
  • ConnectionOrderedChannelHandler 單線程池處理TCP的連接和斷開
  • MessageOnlyChannelHandler 僅在線程池處理接受報(bào)文,其他事件在IO線程處理。
  • WrappedChannelHandler 基于內(nèi)存key-value存儲(chǔ)封裝和共享線程池能力。
  • NettyServerHandler 封裝Netty服務(wù)端事件,處理連接、斷開、讀取、寫入和異常等。
  • NettyClientHandler 封裝Netty客戶端事件,處理連接、斷開、讀取、寫入和異常等。

Dubbo提供了大量的Handler去承載特性和擴(kuò)展,這些Handler最終會(huì)和底層通信框架做關(guān)聯(lián),比如Netty等。一次完整的RPC調(diào)用貫穿了一系列的Handler,如果直接掛載到底層通信框架(Netty),因?yàn)檎麄€(gè)鏈路比較長,則需要大量鏈?zhǔn)讲檎液褪录?,不僅低效,而且浪費(fèi)資源。
下圖展示了同時(shí)具有入站和出站ChannelHandler的布局,如果一個(gè)入站事件被觸發(fā),比如連接或數(shù)據(jù)讀取,那么它會(huì)從ChannelPipeline頭部一直傳播到ChannelPipeline的尾端。出站的IO事件將從ChannelPipeline最右邊開始,然后向左傳播。當(dāng)然ChannelPipeline傳播時(shí),會(huì)檢測入站的是否實(shí)現(xiàn)了ChannelInboundHandler,出站會(huì)檢測是否實(shí)現(xiàn)了ChannelOutboundHandler,如果沒有實(shí)現(xiàn),則自動(dòng)跳過。Dubbo框架中實(shí)現(xiàn)這兩個(gè)接口類主要是NettyServerHandler和NettyClientHandler。Dubbo通過裝飾者模式包裝Handler,從而不需要將每個(gè)Handler都追加到Pipeline中。因此NettyServer和NettyClient中最多有3個(gè)Handler,分別是編碼、解碼和NettyHandler。

注意區(qū)分兩種ChannelHandler,一種是netty的inbound和outBoundHandler,只需要3個(gè)編碼、解碼和NettyHandler(這個(gè)應(yīng)該是dubbo使用netty3的實(shí)現(xiàn),netty4的實(shí)現(xiàn)分開NettyServerHandler和NettyClientHandler);另一種是Dubbo定義的SPI接口也叫ChannelHandler,上面羅列的這么多Dubbo定義的ChannelHandler就是實(shí)現(xiàn)的spi接口,包括NettyClient這個(gè)類也實(shí)現(xiàn)了ChannelHandler這個(gè)SPI接口,它利用SPI包裝擴(kuò)展的方式將多個(gè)handler都包裝起來。然后NettyHandler會(huì)持有NettyClient的引用,所以真正的處理邏輯都有NettyClient來處理,進(jìn)而一層層的調(diào)用到每一層包裝的handler,所以整個(gè)調(diào)用鏈看起來會(huì)相當(dāng)?shù)膹?fù)雜。

ChannelHandler

講完Handler的流轉(zhuǎn)機(jī)制后,我們?cè)賮硖接慠PC調(diào)用Provider方處理Handler的邏輯,在DubboProtocol中通過內(nèi)部類繼承自ExchangeHandlerAdapter,完成服務(wù)提供方Invoker實(shí)例的查找并進(jìn)行服務(wù)的真實(shí)調(diào)用。


DubboProtocol內(nèi)部實(shí)現(xiàn)類

如上是觸發(fā)業(yè)務(wù)方法調(diào)用的關(guān)鍵,在服務(wù)暴露時(shí)服務(wù)端已經(jīng)按照特定規(guī)則(端口、接口名、接口版本和接口分組)把實(shí)例Invoker存儲(chǔ)到HashMap中,客戶端調(diào)用過來時(shí)必須攜帶相同信息構(gòu)造的key,找到對(duì)應(yīng)Exporter(里面持有Invoker)然后調(diào)用。
我們先跟蹤getInvoker的實(shí)現(xiàn),會(huì)發(fā)現(xiàn)服務(wù)端唯一標(biāo)識(shí)的服務(wù)由4部分組成:端口、接口名、接口版本和接口分組。


DubboProtocol#getInvoker

Dubbo為了編織這些Handler,適應(yīng)不同的場景,提供了一套可以定制的線程模型。為了使概念更清晰,我們描述的IO線程是指底層直接負(fù)責(zé)讀寫報(bào)文,比如Netty線程池。Dubbo中提供的線程池負(fù)責(zé)業(yè)務(wù)方法調(diào)用,我們稱為業(yè)務(wù)線程。如果一些事件邏輯可以很快執(zhí)行完成,比如做個(gè)標(biāo)記這種簡單操作,則可以直接在IO線程中處理。如果是比較耗時(shí)的處理,比如讀寫數(shù)據(jù)庫等操作,則應(yīng)該將耗時(shí)或者阻塞的任務(wù)轉(zhuǎn)到業(yè)務(wù)線程上執(zhí)行。因?yàn)镮O線程用于接受請(qǐng)求,如果IO線程飽和,則不會(huì)接受新的請(qǐng)求。
我們先看一下Dubbo中是如何實(shí)現(xiàn)線程派發(fā)的:
Dubbo線程模型

如上,Dispatcher是線程池的派發(fā)器。這里需要注意的是,Dispatcher真實(shí)的職責(zé)是創(chuàng)建有線程派發(fā)能力的ChannelHandler,比如AllChannelHandler、MessageOnlyChannelHandler和ExecutionChannelHanlder,其本身并不具備線程派發(fā)能力。
Dispatcher屬于Dubbo中的擴(kuò)展點(diǎn),這個(gè)擴(kuò)展點(diǎn)用來動(dòng)態(tài)產(chǎn)生Handler,以滿足不同的場景,目前Dubbo支持一下6種策略調(diào)用:

分發(fā)策略 分發(fā)實(shí)現(xiàn) 描述
all AllDispatcher 將所有IO事件交給業(yè)務(wù)線程池,Dubbo默認(rèn)啟用
connection ConnectionOrderedDispatcher 單獨(dú)線程池處理連接斷開事件,和業(yè)務(wù)線程池分開
direct DirectDispatcher 所有方法調(diào)用和事件處理在IO線程池,不推薦
execution ExecutionDispatcher 只在業(yè)務(wù)線程池處理接受請(qǐng)求,其他都在IO線程池
message MessageOnlyChannelHandler 只在業(yè)務(wù)線程池處理請(qǐng)求和響應(yīng),其他在IO線程池
mockdispatcher MockDispatcher 默認(rèn)返回null

具體需要按照使用場景不同啟用不同的策略,建議使用默認(rèn)策略,如果在TCP連接中需要做安全或校驗(yàn),則可以使用ConnectionOrderedDispatcher策略。如果引入新的線程池,則不可避免的導(dǎo)致額外的線程切換,用戶可在Dubbo配置中指定dispatcher屬性讓具體策略生效。

6.4.2 Dubbo請(qǐng)求響應(yīng)Hanlder

在Dubbo內(nèi)部,所有方法調(diào)用都被抽象成Request/Response,每次調(diào)用都會(huì)創(chuàng)建一個(gè)Request,如果是方法調(diào)用則返回一個(gè)Response對(duì)象。HeaderExceptionExchangeHandler就是用了處理這種場景,主要負(fù)責(zé)4中事情:
(1) 更新發(fā)送和讀取請(qǐng)求時(shí)間戳。
(2) 判斷請(qǐng)求格式或編解碼是否有錯(cuò),并響應(yīng)客戶端失敗的具體原因。
(3) 處理Request請(qǐng)求和Response正常響應(yīng)。
(4) 支持Telnet調(diào)用。
我們先來看一下HeaderExchangeHandler#received實(shí)現(xiàn):


HeaderExchangeHandler#receive

筆記記到這里,發(fā)現(xiàn)原書中這章就快結(jié)束了,但仍然對(duì)遠(yuǎn)程調(diào)用過程稀里糊涂的。。。就放棄繼續(xù)記錄了,還是期待下一篇看看官網(wǎng)的對(duì)遠(yuǎn)程調(diào)用的講述吧。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容