詳解Dubbo(四):消費端請求發(fā)送Exchanger

前言

前兩篇文章講了消費端代理的生成,最終到請求發(fā)送操作由Invoker來完成。Invoker同時集成了集群服務(wù)發(fā)現(xiàn)和路由功能,還集成了調(diào)用過程中的自定義擴展Filter。Invoker是業(yè)務(wù)對象的分水嶺,請求到達Invoker之前,都是以業(yè)務(wù)接口和方法的方式調(diào)用,就是說調(diào)用方要拿到接口定義的api。Invoker之后就是Exchanger和Transporter層,只存在Request/Response了,在這里接口和方法變成了Request的一個參數(shù)。這篇文章先看一下Dubbo是怎么初始化遠程通信Client并發(fā)送和接收請求的。下一篇將解析通信協(xié)議以及序列化等操作的實現(xiàn)。

Client初始化

回顧之前的白話Dubbo系列,Invoker調(diào)用的是Exchanger。Exchange層針對消費端和服務(wù)提供端分布封裝成ExchangeClientExchangeServer。它們大部分接口都是一樣的,只是對于Client來說,支持connect()操作來和提供端建立連接;而對于Server端,需要通過bind操作來監(jiān)聽端口,來接收消費端的連接請求。其它的數(shù)據(jù)發(fā)送和接收對于兩端來說其實是一樣的。

Dubbo Client初始化

還是以Dubbo協(xié)議為例,回顧下上一篇DubboProtocol的Invoker初始化操作:

@Override
    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        ...
        ...
        // 創(chuàng)建Invoker
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
    }

在構(gòu)造Invoker的時候,需要傳入一個Client的列表,之后Invoker通過client發(fā)送請求和接收返回結(jié)果,至于請求協(xié)議打包、連接建立、異步io的結(jié)果接收等操作留給ExchangeClient來實現(xiàn)。下面看下getClients()方法的實現(xiàn):

private ExchangeClient[] getClients(URL url) {
        boolean useShareConnect = false;
        //連接數(shù)配置
        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        // 如果沒設(shè)置連接數(shù),說明Consumer希望使用共享連接
        if (connections == 0) {
            useShareConnect = true;
            //共享連接數(shù)配置
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            //獲取共享Client
            shareClients = getSharedClient(url, connections);
        }
        //根據(jù)配置的連接個數(shù)初始化Client
        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                //使用共享client
                clients[i] = shareClients.get(i);
            } else {
                //初始化Client
                clients[i] = initClient(url);
            }
        }
        return clients;
    }

上面的邏輯中涉及到共享Client的概念,因為Dubbo的服務(wù)是以接口為粒度的,每個Invoker對應(yīng)了一個遠程接口的調(diào)用封裝。在實際應(yīng)用中,一個應(yīng)用會包含多個接口,如果對應(yīng)同一個應(yīng)用的多個Invoker每個都初始化一個Client的話,萬一接口過多,會造成每個Consumer和Provider之間建立多個Connection,而且連接數(shù)隨著consumer的個數(shù)增加而成倍數(shù)的增加。所以shareClient的意思就是對于同一個ip+port,所有invoker共享client,有點類似于連接池的概念,達到節(jié)約資源的目的。共享client最終初始化client的方式和普通的是一樣的,所以這里直接看下initClient()方法是如何實現(xiàn)的。

private ExchangeClient initClient(URL url) {
        // client通信框架,默認netty
        String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
        //使用Dubbo通信協(xié)議
        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
        // 設(shè)置發(fā)送心跳的間隔
        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
        // 檢查傳輸層是否支持該框架
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported client type: " + str + "," +
                    " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
        }

        ExchangeClient client;
        try {
            // client延遲建立連接,即第一次調(diào)用時才connect,已經(jīng)不推薦使用
            if (url.getParameter(LAZY_CONNECT_KEY, false)) {
                client = new LazyConnectExchangeClient(url, requestHandler);
            } else {
                //初始化并連接server
                client = Exchangers.connect(url, requestHandler);
            }
        } catch (RemotingException e) {
            throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
        }
        return client;
    }

上面的初始化過程除了初始化參數(shù)外,就是調(diào)用工具類Exchangers來初始化client。這里面除了url之外還有一個ExchangeHandler參數(shù),這個是用來處理服務(wù)端主動發(fā)送來的消息用的,對于Consumer發(fā)送請求的場景這里涉及不到。看下Exchangers.connect()是怎么實現(xiàn)的。

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

具體實現(xiàn)就是根據(jù)url參數(shù)獲取對應(yīng)的Exchanger,然后調(diào)用它的connect方法。Dubbo中默認實現(xiàn)類是HeaderExchanger

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    @Override
    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    @Override
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

上面實現(xiàn)中,通過調(diào)用connect()方法獲取到ExchangeClient的實現(xiàn)HeaderExchangeClient,這里會通過工具類Transporters獲取到一個傳輸層的Client對象。獲取Client對象時,對傳入的handler,又做了兩層封裝,DecodeHandler用來對收到的Response中的數(shù)據(jù)部分解碼成對象;而HeaderExchangeHandler作用是對于異步返回的Response找到當時的Request,將結(jié)果返給當初的調(diào)用方。

HeaderExchangeClient初始化

public HeaderExchangeClient(Client client, boolean startTimer) {
        Assert.notNull(client, "Client can't be null");
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);

        if (startTimer) {
            URL url = client.getUrl();
            startReconnectTask(url);
            startHeartBeatTask(url);
        }
    }

由上面Client的構(gòu)造函數(shù)可以看出主要做了兩件事,首先將傳入的client實現(xiàn)封裝了一層,client的方法比如request()都是直接調(diào)用的channel的方法;其次啟動了兩個定時任務(wù),一個是在和server端的connection斷開后重連,一個是定時發(fā)送心跳。下面看下HeaderExchangeChannel對client做了一層封裝后,主要干了什么。

    @Override
    public void send(Object message, boolean sent) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
        }
        if (message instanceof Request
                || message instanceof Response
                || message instanceof String) {
            channel.send(message, sent);
        } else {
            Request request = new Request();
            request.setVersion(Version.getProtocolVersion());
            request.setTwoWay(false);
            request.setData(message);
            channel.send(request, sent);
        }
    }
    @Override
    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

這個類在發(fā)送前,將調(diào)用參數(shù)封裝成一個request,設(shè)置版本號和請求類型。如果調(diào)用的是request()方法,即需要接收response,以異步請求的方式發(fā)出,返回給調(diào)用方(也就是Invoker)一個future。

請求發(fā)送過程

現(xiàn)在理一下一次請求的發(fā)送過程。在系統(tǒng)初始化階段,創(chuàng)建Invoker的時候會初始化一個和服務(wù)端交互的ExchangeClient,對于Dubbo協(xié)議來說,就是DubboInvoker中包含一個或者多個HeaderExchangeClient。當代理調(diào)用Invoker的invoke()方法發(fā)送請求時,invoker選擇一個Client發(fā)送request。如果是OneWay的請求,調(diào)用send方法,直接返回成功或者失敗的結(jié)果。如果是TwoWay的請求,Client會返回一個Future給invoker,然后client在收到response后,會將收到的結(jié)果set到Future中,調(diào)用方就可以拿到遠程接口的返回值了。

接收請求響應(yīng)

在上面Invoker初始化Client的時候,需要傳入一個ExchangeHandler用來接收異步響應(yīng)回調(diào)。在HeaderExchangeClient初始化的時候,又在handler上面套了兩層,所以最終的關(guān)系圖大概時下面這樣:

請求流轉(zhuǎn)

在底層的Transporter收到Server端的數(shù)據(jù)并處理后,會將數(shù)據(jù)給到DecodeHandler,這個handler判斷返回的數(shù)據(jù)是否實現(xiàn)了Decodeable接口,是的話就調(diào)用decode()方法并把解碼出的value設(shè)置到Response中。

@Override
    public void received(Channel channel, Object message) throws RemotingException {
        if (message instanceof Decodeable) {
            decode(message);
        }
        //provider端解碼Request
        if (message instanceof Request) {
            decode(((Request) message).getData());
        }
       //consumer端解碼Response
        if (message instanceof Response) {
            decode(((Response) message).getResult());
        }
       //調(diào)用下一個handler
        handler.received(channel, message);
    }

第二個handler是HeaderExchangeHandler,如果是Server端返回的請求響應(yīng),最終會到handleResponse()方法中:

static void handleResponse(Channel channel, Response response) throws RemotingException {
        if (response != null && !response.isHeartbeat()) {
            DefaultFuture.received(channel, response);
        }
    }

還記得上面的請求發(fā)送過程嗎?對于TwoWay的請求,request發(fā)出后,調(diào)用方會得到一個Future,這個Future就是在這個Handler這里填充結(jié)果的。這樣一次完整的請求就完成了。

總結(jié)

Invoker將一次遠程方法的調(diào)用封裝成Request后,通過ExchangeClient發(fā)送出去,并通過傳入的ExchangeHandler參數(shù)處理異步返回的Response并和之前的Request關(guān)聯(lián),返回給調(diào)用方。Dubbo這里對于Exchange和Transporter的劃分使用了MEP設(shè)計模式(Message Exchange Pattern),感興趣的話可以看下這個模式的定義。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容