dubbo剖析:五 網(wǎng)絡(luò)通信之 -- 請求發(fā)送與接收

注:文章中使用的dubbo源碼版本為2.5.4

零、文章目錄

  • Consumer發(fā)送請求
  • Provider接收請求并發(fā)送響應(yīng)
  • Consumer接收響應(yīng)

一、Consumer發(fā)送請求

1.1 代碼入口

  • dubbo剖析:二 服務(wù)引用 中講到,服務(wù)引用方根據(jù)引用接口DemoService,使用dubbo的代理工廠類JavassistProxyFactory.getProxy()創(chuàng)建出該接口的動(dòng)態(tài)代理對象。
  • 當(dāng)用戶想調(diào)用DemoService的相關(guān)方法時(shí),實(shí)際是調(diào)用了代理對象的相關(guān)方法,從InvokerInvocationHandler.invoke()進(jìn)入Consumer請求發(fā)送流程。

1.2 整體流程

Consumer發(fā)送請求流程圖
  • 上圖從上往下展示了服務(wù)引用方發(fā)送一個(gè)RPC請求的關(guān)鍵步驟,經(jīng)歷了“代理層”、“集群層”、“過濾監(jiān)聽擴(kuò)展點(diǎn)”、“調(diào)用協(xié)議層”、“信息交換層”、“網(wǎng)絡(luò)傳輸層”。
  • 紫色實(shí)線條表示各層關(guān)鍵類的方法調(diào)用,藍(lán)色虛線表示關(guān)鍵類的初始化過程。

1)代理執(zhí)行(InvokerInvocationHandler.invoke):

  • 服務(wù)引用的過程中,由ReferenceConfig使用JavassistProxyFactory為引用接口創(chuàng)建了代理對象;
  • 服務(wù)引用方調(diào)用dubbo代理類DemoService.sayHello時(shí),實(shí)際執(zhí)行InvokerInvocationHandler.invoke()方法,即這是Consumer發(fā)送請求的起點(diǎn);
  • InvokerInvocationHandler內(nèi)包含一個(gè)Invoker,在JavassistProxyFactory.getProxy()過程中通過其構(gòu)造器注入,該Invoker為一個(gè)集群路由功能的AbstractClusterInvoker

2)集群容錯(cuò)+負(fù)載均衡(AbstractClusterInvoker.invoke):

  • 服務(wù)引用的過程中,由RegistryProtocol使用Cluster.join()創(chuàng)建集群Invoker,ClusterExtensionLoader.getExtensionLoader(Cluster.class).getExtension("mergeable")動(dòng)態(tài)生成;
  • 集群Invoker根據(jù)負(fù)載均衡算法有多種不同實(shí)現(xiàn)類(failover、failfast、failsafe、failback),具體使用哪一種由對應(yīng)的Cluster實(shí)現(xiàn)決定;
  • AbstractClusterInvoker通過Directory.list()方法獲取請求路徑對應(yīng)的Invoker列表;
  • AbstractClusterInvoker再通過LoadBalance.select()方法從多個(gè)Invoker中選取一個(gè)做本次調(diào)用,即負(fù)載均衡算法(Random、RoundRobin、LeastActive);

3)Filter鏈擴(kuò)展點(diǎn)(ProtocolFilterWrapper + ProtocolListenerWrapper):

  • ReferenceConfig進(jìn)行服務(wù)引用的過程中,通過refProtocol.refer()創(chuàng)建Invoker對象;
  • refprotocol.refer()先后經(jīng)過修飾類ProtocolFilterWrapper、ProtocolListenerWrapper,最后執(zhí)行RegistryProtocol;ProtocolFilterWrapperProtocolListenerWrapper就是Dubbo引入的擴(kuò)展點(diǎn);
  • 擴(kuò)展點(diǎn)對請求發(fā)送和接收的核心功能流程無影響,目的是以插件的方式進(jìn)行一些輔助功能處理,這里不再進(jìn)一步展開;

4)調(diào)用協(xié)議層執(zhí)行(AbstractInvoker.invoke):

  • 經(jīng)過集群路由和擴(kuò)展點(diǎn),現(xiàn)在將直接執(zhí)行AbstractInvoker.invoke方法,開始真正的遠(yuǎn)程調(diào)用了;
  • 服務(wù)引用的過程中,由RegistryDirectory使用Protocol.refer()創(chuàng)建遠(yuǎn)程執(zhí)行AbstractInvoker,Protocol默認(rèn)采用default實(shí)現(xiàn),即DubboProtocol;
  • AbstractInvoker有多種協(xié)議的具體實(shí)現(xiàn)(dubbo、rmi、hessian、http),具體使用哪一種協(xié)議由對應(yīng)的Protocol實(shí)現(xiàn)決定,默認(rèn)采用dubbo協(xié)議為DubboInvoker;
  • DubboInvoker中包含了ExchangeClient的引用,通過DubboInvoker的構(gòu)造器注入;

5)交換層執(zhí)行(ExchangeClient.request):

  • 遠(yuǎn)程執(zhí)行Invoker通過其引用的ExchangeClient.request完成遠(yuǎn)程調(diào)用請求的發(fā)送并得到ResponseFuture,然后調(diào)用ResponseFuture.get()得到 遠(yuǎn)程調(diào)用結(jié)果Result ;
  • 服務(wù)引用的過程中,由DubboProtocol使用Exchanger.connect()創(chuàng)建ExchangeClient
  • Exchanger的實(shí)現(xiàn)類為HeaderExchanger,由ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type)動(dòng)態(tài)生成;
  • ExchangeClientClient的基礎(chǔ)上封裝了請求響應(yīng)模式(其以Request、Response、ResponseFuture為核心,后續(xù)單獨(dú)文章講解),這也是交換層的核心功能;

6)網(wǎng)絡(luò)層執(zhí)行(Client.send):

  • 交換層ExchangeClient.request封裝請求響應(yīng)模式后,最終依賴網(wǎng)絡(luò)層Client.send將請求消息通過網(wǎng)絡(luò)發(fā)送給服務(wù)提供方;
  • 服務(wù)引用的過程中,由HeaderExchanger使用Transporter.connect()創(chuàng)建Client并完成初始連接操作,Client有多種網(wǎng)絡(luò)層實(shí)現(xiàn)(netty、mina...),具體使用哪一種由對應(yīng)的Transporter實(shí)現(xiàn)決定;
  • Transporter有多種網(wǎng)絡(luò)層實(shí)現(xiàn)(netty、mina...),由ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension()動(dòng)態(tài)生成,默認(rèn)為NettyTransporter;
  • 最后,NettyClient使用其包含的底層NettyChannel完成網(wǎng)絡(luò)消息發(fā)送的功能;

二、Provider接收請求并發(fā)送響應(yīng)

2.1 代碼入口

  • dubbo剖析:一 服務(wù)發(fā)布 中講到,服務(wù)提供方通過NettyServer完成服務(wù)端創(chuàng)建及監(jiān)聽工作。
  • NettyServerdoOpen()階段創(chuàng)建了網(wǎng)絡(luò)事件處理器NettyHandler,當(dāng)服務(wù)端收到客戶端消息時(shí),將觸發(fā)NettyHandlermessageReceived()方法。

2.2 整體流程

接收請求流程圖
  • 上圖從上往下表示了服務(wù)提供方接收到一個(gè)網(wǎng)絡(luò)請求時(shí)的處理步驟,經(jīng)歷了一個(gè)Handler處理器鏈,鏈中的每個(gè)Handler負(fù)責(zé)實(shí)現(xiàn)自己的處理功能。

1)Netty網(wǎng)絡(luò)事件處理器(NettyHandler):

  • 繼承自Netty的原生網(wǎng)絡(luò)時(shí)間處理器實(shí)現(xiàn)類SimpleChannelHandler,定義了網(wǎng)絡(luò)建連(channelConnected)、斷連(channelDisconnected)、消息接收(messageReceived)、異常(exceptionCaught)等事件處理方法;
  • 維護(hù)了<ip:port, channel>的對應(yīng)關(guān)系Map<String, Channel>channels,在網(wǎng)絡(luò)建連/斷連時(shí)進(jìn)行相應(yīng)put/remove操作,并暴露給NettyServer使用;
  • 接收到網(wǎng)絡(luò)消息時(shí),執(zhí)行messageReceived()方法,將Netty的原生Channel轉(zhuǎn)換為Dubbo封裝的NettyChannel,并將事件傳遞給其包含的ChannelHandler處理;

2)復(fù)合消息處理器(MultiMessageHandler):

    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof MultiMessage) {
            MultiMessage list = (MultiMessage) message;
            for (Object obj : list) {
                handler.received(channel, obj);
            }
        } else {
            handler.received(channel, message);
        }
    }
  • 處理MultiMessage,將其拆分成多個(gè)Message處理;

3)心跳消息處理器(HeartbeatHandler):

  • 消息收發(fā)時(shí)重置當(dāng)前通道的最新消息收發(fā)時(shí)間,用于配合HeaderExchangeServerHeaderExchangeClient中的心跳檢測任務(wù)HeartBeatTask;
  • 攔截并處理心跳請求/響應(yīng)消息。對心跳請求消息,構(gòu)建對應(yīng)的心跳響應(yīng)消息并通過Channel發(fā)送回去;對心跳響應(yīng)消息,僅記錄日志后返回,不做功能上的處理;

4)業(yè)務(wù)線程轉(zhuǎn)換處理器(AllChannelHandler):

  • Dubbo通過該處理器完成了 IO線程業(yè)務(wù)線程 的解耦!
  • 內(nèi)部封裝了業(yè)務(wù)線程池,默認(rèn)使用FixedThreadPool;
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

注意點(diǎn):
a)線程池默認(rèn)業(yè)務(wù)線程數(shù)為200
b)隊(duì)列默認(rèn)采用SynchronousQueue

  • 將接收到的網(wǎng)絡(luò)消息事件封裝成可執(zhí)行任務(wù)ChannelEventRunnable,交由業(yè)務(wù)線程池處理;

5)業(yè)務(wù)解碼處理器(DecodeHandler):

  • 進(jìn)行業(yè)務(wù)請求響應(yīng)的解碼工作;
  • RequestResponse中攜帶的消息體或結(jié)果體,如果其實(shí)現(xiàn)了Decodeable接口,則進(jìn)行一次解碼處理;

6)交換層請求響應(yīng)處理器(HeaderExchangeHandler):

  • 交換層真正完成請求響應(yīng)收發(fā)功能的處理器!
  • 將網(wǎng)絡(luò)層Channel轉(zhuǎn)換為交換層ExchangeChannel,為其增加了請求響應(yīng)方法request()
  • 判斷收到的網(wǎng)絡(luò)消息類型,根據(jù)類型分別執(zhí)行不同的處理邏輯;
            if (message instanceof Request) {
                Request request = (Request) message;
                if (request.isEvent()) {
                    handlerEvent(channel, request);
                } else {
                    //case a: 請求響應(yīng)模型的請求處理
                    if (request.isTwoWay()) {
                        Response response = handleRequest(exchangeChannel, request);
                        channel.send(response);
                    } 
                    //case b: 單向消息接收的處理
                    else {
                        handler.received(exchangeChannel, request.getData());
                    }
                }
            } else if (message instanceof Response) {
                //case c: 請求響應(yīng)模型的響應(yīng)處理
                handleResponse(channel, (Response) message);
            }

a)請求響應(yīng)模型的Request消息:調(diào)用ExchangeHandlerAdapter.reply()獲取執(zhí)行結(jié)果Result -->
將本地執(zhí)行結(jié)果Result封裝成RPC響應(yīng)Response --> 通過channel.send()發(fā)送RPC響應(yīng);

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
        Response res = new Response(req.getId(), req.getVersion());
        Object msg = req.getData();
        try {
            // 調(diào)用```ExchangeHandlerAdapter.reply()```獲取執(zhí)行結(jié)果```Result```
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        //將本地執(zhí)行結(jié)果```Result```封裝成RPC響應(yīng)```Response```
        return res;
    }

b)單向請求消息的處理:調(diào)用ExchangeHandlerAdapter.received()處理請求消息,如果該消息是Invocation則執(zhí)行reply()邏輯但不主動(dòng)發(fā)送RPC響應(yīng)Response;

        public void received(Channel channel, Object message) throws RemotingException {
            if (message instanceof Invocation) {
                reply((ExchangeChannel) channel, message);
            } else {
                super.received(channel, message);
            }
        }

c)請求響應(yīng)模型的Response消息:調(diào)用DefaultFuture.received()處理響應(yīng)消息。
...注:請求響應(yīng)模型(Request,Response,DufaultFuture)相關(guān)后續(xù)專門分析,此處不展開...

7)真正本地實(shí)現(xiàn)類方法的執(zhí)行(ExchangeHandlerAdapter):

  • ExchangeHandlerAdapterDubboProtocol創(chuàng)建,并實(shí)現(xiàn)了reply()方法;
  • reply()方法,實(shí)際通過RPC調(diào)用參數(shù)InvocationDubboProtocol.exporterMap中獲取到對應(yīng)的本地實(shí)現(xiàn)DubboExporter --> 進(jìn)而獲取到對應(yīng)的本地執(zhí)行AbstractProxyInvoker --> 最終通過AbstractProxyInvoker.invoke()方法,以反射的方式執(zhí)行真正實(shí)現(xiàn)類的對應(yīng)方法,完成RPC請求。

三、Consumer接收響應(yīng)

整體流程與 “Provider接收請求” 一樣,唯一的區(qū)別是在 交換層請求響應(yīng)處理器(HeaderExchangeHandler)步驟中會(huì)執(zhí)行 “分支c:請求響應(yīng)模型的Response消息”,將Response交由DefaultFuture處理。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • dubbo暴露服務(wù)有兩種情況,一種是設(shè)置了延遲暴露(比如delay="5000"),另外一種是沒有設(shè)置延遲暴露或者...
    加大裝益達(dá)閱讀 21,413評論 5 36
  • 去年10月Carol帶著三個(gè)小伙伴來上初級班時(shí),本來我也應(yīng)該一起來,可是那時(shí)候我和精油還沒有很好地鏈接上,我就不想...
    曉曉Akatsuki閱讀 811評論 0 0
  • 1. 時(shí)序圖簡介 ??時(shí)序圖(Sequence Diagram)是顯示對象之間交互的圖,這些對象是按時(shí)間順序排列的...
    GuoYuebo閱讀 1,645評論 0 1
  • 我想,最美的愛情是你在想著我的時(shí)候恰好我也在想你,放假這么多天了,每次看著視頻中的你,都會(huì)莫名的想笑??粗闵?..
    丶玩世不恭閱讀 630評論 0 1
  • 是否有一種目光 只有你我才有 在最淡然的相視里 感受最深沉的關(guān)切 是否有一條道路 只有我和你相依而行 在最平凡的腳...
    花倦琳瑯閱讀 203評論 0 2

友情鏈接更多精彩內(nèi)容