【Dubbo】網(wǎng)絡(luò)通信

provider、consumer通信原理

未命名文件 (7).png

Consumer發(fā)送原理

-->Result result = invoker.invoke(invocation)
--------------------------------------------------------------------------擴(kuò)展點(diǎn)----------------
                -->InvokerWrapper.invoke
                  -->ProtocolFilterWrapper.invoke
                    -->ConsumerContextFilter.invoke
                      -->ProtocolFilterWrapper.invoke
                        -->MonitorFilter.invoke
                          -->ProtocolFilterWrapper.invoke
                            -->FutureFilter.invoke
                              -->ListenerInvokerWrapper.invoke
                                -->AbstractInvoker.invoke
---------------------------------------------------------------------------擴(kuò)展點(diǎn)---------------
                                  -->doInvoke(invocation)
                                    -->DubboInvoker.doInvoke//為什么DubboInvoker是個(gè)protocol? 因?yàn)镽egistryDirectory.refreshInvoker.toInvokers: protocol.refer
                                      -->ReferenceCountExchangeClient.request
                                        -->HeaderExchangeClient.request
                                          -->HeaderExchangeChannel.request
                                            -->NettyClient.send
                                            -->AbstractPeer.send
                                              -->NettyChannel.send
                                                -->ChannelFuture future = channel.write(message);//最終的目的:通過netty的channel發(fā)送網(wǎng)絡(luò)數(shù)據(jù)

consumer的RegistryDirectory創(chuàng)建DubboInvoker是在zk配置發(fā)生變化回調(diào)RegistryDirectory.notify的時(shí)候。創(chuàng)建DubboInvoker使用的是protocol.refer(serviceType, url)方法,Protocol$Adpative會(huì)有一些Wapper給DubboInvoker添加很多包裝類,所以在調(diào)用鏈中會(huì)有一些filter,這和服務(wù)發(fā)布的時(shí)候是一樣的

#com.alibaba.dubbo.registry.integration.RegistryDirectory#toInvokers
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
                        invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
......
}
#com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
......
ResponseFuture future = currentClient.request(inv, timeout) ;
......
}

provider接收、處理、返回

NettyHandler.messageReceived
  -->AbstractPeer.received
    -->MultiMessageHandler.received
      -->HeartbeatHandler.received
        -->AllChannelHandler.received
          -->ChannelEventRunnable.run //線程池 執(zhí)行線程
            -->DecodeHandler.received
              -->HeaderExchangeHandler.received
                -->handleRequest(exchangeChannel, request)//網(wǎng)絡(luò)通信接收處理 
                  -->DubboProtocol.reply
                    -->getInvoker
                      -->exporterMap.get(serviceKey)//從服務(wù)暴露里面提取 
                      -->DubboExporter.getInvoker()//最終得到一個(gè)invoker
-------------------------------------------------------------------------擴(kuò)展點(diǎn)--------------
                    -->ProtocolFilterWrapper.invoke
                      -->EchoFilter.invoke
                        -->ClassLoaderFilter.invoke
                          -->GenericFilter.invoke
                            -->TraceFilter.invoke
                              -->MonitorFilter.invoke
                                -->TimeoutFilter.invoke
                                  -->ExceptionFilter.invoke
                                    -->InvokerWrapper.invoke
-------------------------------------------------------------------------擴(kuò)展點(diǎn)--------------
                                      -->AbstractProxyInvoker.invoke
                                        -->JavassistProxyFactory.AbstractProxyInvoker.doInvoke
                                          --> 進(jìn)入真正執(zhí)行的實(shí)現(xiàn)類   DemoServiceImpl.sayHello
                                        ....................................
                -->channel.send(response);//把接收處理的結(jié)果,發(fā)送回去 
                  -->AbstractPeer.send
                    -->NettyChannel.send
                      -->ChannelFuture future = channel.write(message);//數(shù)據(jù)發(fā)回consumer

Consumer的接收原理

//consumer的接收原理 
NettyHandler.messageReceived
  -->AbstractPeer.received
    -->MultiMessageHandler.received
      -->HeartbeatHandler.received
        -->AllChannelHandler.received
          -->ChannelEventRunnable.run //線程池 執(zhí)行線程
            -->DecodeHandler.received
              -->HeaderExchangeHandler.received
                -->handleResponse(channel, (Response) message);
                  -->HeaderExchangeHandler.handleResponse
                    -->DefaultFuture.received
                      -->DefaultFuture.doReceived
                        private void doReceived(Response res) {
                            lock.lock();
                            try {
                                response = res;
                                if (done != null) {
                                    done.signal();
                                }
                            } finally {
                                lock.unlock();
                            }
                            if (callback != null) {
                                invokeCallback(callback);
                            }
                        }

發(fā)送數(shù)據(jù)是異步的,可以參考netty的案例,通過channel發(fā)送數(shù)據(jù)不能直接拿到結(jié)果,必須通過epoll中的等待回調(diào)事件再取得數(shù)據(jù)

數(shù)據(jù)通信的基本類

  • NettyChannel
    包含一個(gè)send(Object message, boolean sent)方法,內(nèi)置了一個(gè)netty自己的channel可直接發(fā)送數(shù)據(jù)

  • NettyHandler
    與原生netty的事件交互,獲取到netty的事件后會(huì)回調(diào)上層hook的handler

  • ChannelEventRunnable
    接收到的數(shù)據(jù)的處理

  • DecodeHandler
    數(shù)據(jù)的解碼器

  • HeaderExchangeHandler
    是一個(gè)中間層,負(fù)責(zé)將解析好的數(shù)據(jù)與上層業(yè)務(wù)的邏輯的流轉(zhuǎn)
    handleRequest, handleResponse

image.png

異步轉(zhuǎn)同步

dubbo的consumer發(fā)送請求是非阻塞的,不會(huì)等待返回值。provider接收是阻塞的會(huì)等待provider調(diào)用invoker處理完直接返回給consumer。

    public void start() throws Exception {
        for (int i = 0; i < Integer.MAX_VALUE; i ++) {
            try {
                String hello = demoService.sayHello("world" + i);
                System.out.println("[" + new SimpleDateFormat("HH:mm:ss").format(new Date()) + "] " + hello);
            } catch (Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(2000);
        }
    }

dubbo 是基于netty NIO的非阻塞并行調(diào)用通信。(阻塞 非阻塞 異步 同步 區(qū)別 )dubbo 的通信方式 有3類類型:

  1. 異步,有返回值
    <dubbo:method name="sayHello" async="true"> </dubbo:method>
    Future<String> temp= RpcContext.getContext().getFuture(); hello=temp.get();

  2. 異步,無返回值
    <dubbo:method name="sayHello" return="false"></dubbo:method>

  3. 異步,變同步(默認(rèn)的通信方式)
    A.當(dāng)前線程怎么讓它 “暫停,等結(jié)果回來后,再執(zhí)行”?
    B.socket是一個(gè)全雙工的通信方式,那么在多線程的情況下,如何知道那個(gè)返回結(jié)果對應(yīng)原先那條線程的調(diào)用?
    通過一個(gè)全局唯一的ID來做consumer 和 provider 來回傳輸。

單工 全雙工 半雙工 區(qū)別
  • 單工
    在同一時(shí)間只允許一方向另一方傳送信息,而另一方不能向一方傳送
  • 全雙工
    是指在發(fā)送數(shù)據(jù)的同事也能夠接收數(shù)據(jù),兩者同步進(jìn)行,這好像我們平時(shí)打電話一樣,說話的同事也能夠聽到對方的聲音。目前的網(wǎng)卡一般都支持全雙工
  • 半雙工
    所謂的半雙工就是指一個(gè)時(shí)間段內(nèi)只有一個(gè)動(dòng)作發(fā)生,舉個(gè)簡單例子,一條窄窄的馬路,同事只能有一輛車通過,當(dāng)目前有兩輛車對開,這種情況下就只能一輛車先過,到頭后另一輛車再開,這個(gè)例子就形象的說明了半雙工的原理。
同步、異步實(shí)現(xiàn)

異步.有返回值時(shí)會(huì)將能獲取結(jié)果的future放在上下文變量中,如果需要取結(jié)果直接從future中阻塞獲取就可以了;異步,無返回值,則不再處理;同步,有返回值會(huì)直接調(diào)用future.get

#com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker#doInvoke
    protected Result doInvoke(final Invocation invocation) throws Throwable {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            //是否有返回值
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                //2.異步,無返回值
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                //ReferenceCountExchangeClient
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                //1.異步.有返回值
                //ReferenceCountExchangeClient 發(fā)送請求
                ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {//
                //3. 異步,變同步(默認(rèn)的通信方式)
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
}
  • 生成future
#com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel#request(java.lang.Object, int)
public ResponseFuture request(Object request, int timeout) throws RemotingException {
        // create request.
        Request req = new Request();
        req.setVersion("2.0.0");
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try{
            //AbstractPeer
            channel.send(req);
        }catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }
  • 阻塞
    可以看到所有的DefaultFuture都會(huì)被維護(hù)到FUTURES這個(gè)map中,而且request.getId()為key。當(dāng)我們調(diào)用future.get獲取結(jié)果的時(shí),會(huì)循環(huán)判斷當(dāng)前response是否為空,如果為空就當(dāng)前線程一直等待在done上,直到超時(shí)為止。
public class DefaultFuture implements ResponseFuture {
    private static final Map<Long, DefaultFuture> FUTURES   = new ConcurrentHashMap<Long, DefaultFuture>();
    private final Lock                            lock = new ReentrantLock();
    private final Condition                       done = lock.newCondition();
    private volatile Response                     response;
   public DefaultFuture(Channel channel, Request request, int timeout){
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (! isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (! isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (! isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }
    public boolean isDone() {
        return response != null;
    }
}
  • 喚醒
    當(dāng)NettyHandler獲取到數(shù)據(jù)之后傳遞給HeaderExchangeHandler后,會(huì)調(diào)用DefaultFuture的靜態(tài)方法received來設(shè)置response。會(huì)根據(jù)response的id從FUTURES中獲取創(chuàng)建時(shí)緩存的future實(shí)例。如果獲取到了就調(diào)用 future.doReceived(response)來設(shè)置返回值,并且通知等待在done上的線程,這時(shí)之前阻塞在future.get()方法上的線程就會(huì)立即返回。
#com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeHandler#handleResponse
   static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }
#com.alibaba.dubbo.remoting.exchange.support.DefaultFuture#received
public static void received(Channel channel, Response response) {
        try {
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at " 
                            + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) 
                            + ", response " + response 
                            + (channel == null ? "" : ", channel: " + channel.getLocalAddress() 
                                + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    private void doReceived(Response res) {
        lock.lock();
        try {
            response = res;
            if (done != null) {
                done.signal();
            }
        } finally {
            lock.unlock();
        }
        if (callback != null) {
            invokeCallback(callback);
        }
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容