前言
前兩篇文章講了消費端代理的生成,最終到請求發(fā)送操作由Invoker來完成。Invoker同時集成了集群服務(wù)發(fā)現(xiàn)和路由功能,還集成了調(diào)用過程中的自定義擴展Filter。Invoker是業(yè)務(wù)對象的分水嶺,請求到達Invoker之前,都是以業(yè)務(wù)接口和方法的方式調(diào)用,就是說調(diào)用方要拿到接口定義的api。Invoker之后就是Exchanger和Transporter層,只存在Request/Response了,在這里接口和方法變成了Request的一個參數(shù)。這篇文章先看一下Dubbo是怎么初始化遠程通信Client并發(fā)送和接收請求的。下一篇將解析通信協(xié)議以及序列化等操作的實現(xiàn)。
Client初始化
回顧之前的白話Dubbo系列,Invoker調(diào)用的是Exchanger。Exchange層針對消費端和服務(wù)提供端分布封裝成ExchangeClient和ExchangeServer。它們大部分接口都是一樣的,只是對于Client來說,支持connect()操作來和提供端建立連接;而對于Server端,需要通過bind操作來監(jiān)聽端口,來接收消費端的連接請求。其它的數(shù)據(jù)發(fā)送和接收對于兩端來說其實是一樣的。
Dubbo Client初始化
還是以Dubbo協(xié)議為例,回顧下上一篇DubboProtocol的Invoker初始化操作:
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
...
...
// 創(chuàng)建Invoker
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
在構(gòu)造Invoker的時候,需要傳入一個Client的列表,之后Invoker通過client發(fā)送請求和接收返回結(jié)果,至于請求協(xié)議打包、連接建立、異步io的結(jié)果接收等操作留給ExchangeClient來實現(xiàn)。下面看下getClients()方法的實現(xiàn):
private ExchangeClient[] getClients(URL url) {
boolean useShareConnect = false;
//連接數(shù)配置
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// 如果沒設(shè)置連接數(shù),說明Consumer希望使用共享連接
if (connections == 0) {
useShareConnect = true;
//共享連接數(shù)配置
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
//獲取共享Client
shareClients = getSharedClient(url, connections);
}
//根據(jù)配置的連接個數(shù)初始化Client
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
//使用共享client
clients[i] = shareClients.get(i);
} else {
//初始化Client
clients[i] = initClient(url);
}
}
return clients;
}
上面的邏輯中涉及到共享Client的概念,因為Dubbo的服務(wù)是以接口為粒度的,每個Invoker對應(yīng)了一個遠程接口的調(diào)用封裝。在實際應(yīng)用中,一個應(yīng)用會包含多個接口,如果對應(yīng)同一個應(yīng)用的多個Invoker每個都初始化一個Client的話,萬一接口過多,會造成每個Consumer和Provider之間建立多個Connection,而且連接數(shù)隨著consumer的個數(shù)增加而成倍數(shù)的增加。所以shareClient的意思就是對于同一個ip+port,所有invoker共享client,有點類似于連接池的概念,達到節(jié)約資源的目的。共享client最終初始化client的方式和普通的是一樣的,所以這里直接看下initClient()方法是如何實現(xiàn)的。
private ExchangeClient initClient(URL url) {
// client通信框架,默認netty
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
//使用Dubbo通信協(xié)議
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// 設(shè)置發(fā)送心跳的間隔
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// 檢查傳輸層是否支持該框架
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// client延遲建立連接,即第一次調(diào)用時才connect,已經(jīng)不推薦使用
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
//初始化并連接server
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
上面的初始化過程除了初始化參數(shù)外,就是調(diào)用工具類Exchangers來初始化client。這里面除了url之外還有一個ExchangeHandler參數(shù),這個是用來處理服務(wù)端主動發(fā)送來的消息用的,對于Consumer發(fā)送請求的場景這里涉及不到。看下Exchangers.connect()是怎么實現(xiàn)的。
public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
if (url == null) {
throw new IllegalArgumentException("url == null");
}
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
具體實現(xiàn)就是根據(jù)url參數(shù)獲取對應(yīng)的Exchanger,然后調(diào)用它的connect方法。Dubbo中默認實現(xiàn)類是HeaderExchanger:
public class HeaderExchanger implements Exchanger {
public static final String NAME = "header";
@Override
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}
@Override
public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}
}
上面實現(xiàn)中,通過調(diào)用connect()方法獲取到ExchangeClient的實現(xiàn)HeaderExchangeClient,這里會通過工具類Transporters獲取到一個傳輸層的Client對象。獲取Client對象時,對傳入的handler,又做了兩層封裝,DecodeHandler用來對收到的Response中的數(shù)據(jù)部分解碼成對象;而HeaderExchangeHandler作用是對于異步返回的Response找到當時的Request,將結(jié)果返給當初的調(diào)用方。
HeaderExchangeClient初始化
public HeaderExchangeClient(Client client, boolean startTimer) {
Assert.notNull(client, "Client can't be null");
this.client = client;
this.channel = new HeaderExchangeChannel(client);
if (startTimer) {
URL url = client.getUrl();
startReconnectTask(url);
startHeartBeatTask(url);
}
}
由上面Client的構(gòu)造函數(shù)可以看出主要做了兩件事,首先將傳入的client實現(xiàn)封裝了一層,client的方法比如request()都是直接調(diào)用的channel的方法;其次啟動了兩個定時任務(wù),一個是在和server端的connection斷開后重連,一個是定時發(fā)送心跳。下面看下HeaderExchangeChannel對client做了一層封裝后,主要干了什么。
@Override
public void send(Object message, boolean sent) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send message " + message + ", cause: The channel " + this + " is closed!");
}
if (message instanceof Request
|| message instanceof Response
|| message instanceof String) {
channel.send(message, sent);
} else {
Request request = new Request();
request.setVersion(Version.getProtocolVersion());
request.setTwoWay(false);
request.setData(message);
channel.send(request, sent);
}
}
@Override
public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
// create request.
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
這個類在發(fā)送前,將調(diào)用參數(shù)封裝成一個request,設(shè)置版本號和請求類型。如果調(diào)用的是request()方法,即需要接收response,以異步請求的方式發(fā)出,返回給調(diào)用方(也就是Invoker)一個future。
請求發(fā)送過程
現(xiàn)在理一下一次請求的發(fā)送過程。在系統(tǒng)初始化階段,創(chuàng)建Invoker的時候會初始化一個和服務(wù)端交互的ExchangeClient,對于Dubbo協(xié)議來說,就是DubboInvoker中包含一個或者多個HeaderExchangeClient。當代理調(diào)用Invoker的invoke()方法發(fā)送請求時,invoker選擇一個Client發(fā)送request。如果是OneWay的請求,調(diào)用send方法,直接返回成功或者失敗的結(jié)果。如果是TwoWay的請求,Client會返回一個Future給invoker,然后client在收到response后,會將收到的結(jié)果set到Future中,調(diào)用方就可以拿到遠程接口的返回值了。
接收請求響應(yīng)
在上面Invoker初始化Client的時候,需要傳入一個ExchangeHandler用來接收異步響應(yīng)回調(diào)。在HeaderExchangeClient初始化的時候,又在handler上面套了兩層,所以最終的關(guān)系圖大概時下面這樣:

在底層的Transporter收到Server端的數(shù)據(jù)并處理后,會將數(shù)據(jù)給到DecodeHandler,這個handler判斷返回的數(shù)據(jù)是否實現(xiàn)了Decodeable接口,是的話就調(diào)用decode()方法并把解碼出的value設(shè)置到Response中。
@Override
public void received(Channel channel, Object message) throws RemotingException {
if (message instanceof Decodeable) {
decode(message);
}
//provider端解碼Request
if (message instanceof Request) {
decode(((Request) message).getData());
}
//consumer端解碼Response
if (message instanceof Response) {
decode(((Response) message).getResult());
}
//調(diào)用下一個handler
handler.received(channel, message);
}
第二個handler是HeaderExchangeHandler,如果是Server端返回的請求響應(yīng),最終會到handleResponse()方法中:
static void handleResponse(Channel channel, Response response) throws RemotingException {
if (response != null && !response.isHeartbeat()) {
DefaultFuture.received(channel, response);
}
}
還記得上面的請求發(fā)送過程嗎?對于TwoWay的請求,request發(fā)出后,調(diào)用方會得到一個Future,這個Future就是在這個Handler這里填充結(jié)果的。這樣一次完整的請求就完成了。
總結(jié)
Invoker將一次遠程方法的調(diào)用封裝成Request后,通過ExchangeClient發(fā)送出去,并通過傳入的ExchangeHandler參數(shù)處理異步返回的Response并和之前的Request關(guān)聯(lián),返回給調(diào)用方。Dubbo這里對于Exchange和Transporter的劃分使用了MEP設(shè)計模式(Message Exchange Pattern),感興趣的話可以看下這個模式的定義。