路途雖遙遠(yuǎn),將來(lái)更美好
微信公號(hào)號(hào):九點(diǎn)半的馬拉
在傳統(tǒng)模式下,Dubbo消費(fèi)端需要調(diào)用某一遠(yuǎn)程服務(wù)器端的方法時(shí),消費(fèi)端也需要額外導(dǎo)入服務(wù)類接口API,Dubbo也由此實(shí)現(xiàn)了面向接口代理的高性能RPC調(diào)用。
但是當(dāng)服務(wù)消費(fèi)端沒有服務(wù)接口或方法參數(shù)類型時(shí),無(wú)法使用上述的方式進(jìn)行服務(wù)調(diào)用,針對(duì)該場(chǎng)景,Dubbo使用泛化調(diào)用方法進(jìn)行服務(wù)調(diào)用。
Dubbo在進(jìn)行泛化調(diào)用時(shí),將相關(guān)信息封裝到Map對(duì)象中,并利用GenericService接口處理。
舉個(gè)例子
服務(wù)器端配置:
<bean id="helloserviceimpl" class="org.apache.dubbo.samples.generic.call.impl.HelloServiceImpl"/>
<dubbo:service interface="org.apache.dubbo.samples.generic.call.api.HelloService" ref="helloserviceimpl"/>
服務(wù)器端服務(wù)具體實(shí)現(xiàn)類:
public class HelloServiceImpl implements HelloService {
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
CompletableFuture<String> future = new CompletableFuture<>();
new Thread(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
future.complete("sayHelloAsync: " + name);
}).start();
return future;
}
消費(fèi)端配置:
public class GenericCallConsumer {
private static GenericService genericService;
public static void main(String[] args) throws Exception {
ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("generic-call-consumer");
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
applicationConfig.setRegistry(registryConfig);
referenceConfig.setApplication(applicationConfig);
// 開啟泛化
referenceConfig.setGeneric(true);
referenceConfig.setAsync(true);
referenceConfig.setTimeout(7000);
genericService = referenceConfig.get();
invokeAsyncSayHelloAsync();
public static void invokeAsyncSayHelloAsync() throws Exception {
CompletableFuture<Object> future = genericService.$invokeAsync("sayHelloAsync",
new String[]{"java.lang.String"}, new Object[]{"world"});
CountDownLatch latch = new CountDownLatch(1);
future.whenComplete((value, t) -> {
System.err.println("invokeAsyncSayHelloAsync(whenComplete): " + value);
latch.countDown();
});
latch.await();
}
原理分析
消費(fèi)端通過(guò)一個(gè)代理對(duì)象進(jìn)行服務(wù)調(diào)用,
1)執(zhí)行InvokerInvocationHandler#invoke方法,
2)調(diào)用MockClusterInvoker#invoke方法
3)調(diào)用AbstractCluster$InterceptorInvokerNode#invoke方法

這里新增了一個(gè)ClusterInterceptor,與Filter不同,它在一個(gè)特定的地址或invoker被選擇之前的較外層執(zhí)行邏輯,在服務(wù)發(fā)現(xiàn)之前攔截請(qǐng)求。
3.1)調(diào)用ConsumerContextClusterInterceptor#before方法。
從RpcContext設(shè)置invocation,并設(shè)置localAddress和invoker(默認(rèn)FailoverClusterInvoker),清除RpcContext內(nèi)部的SERVER_LOCAL上下文內(nèi)容。
3.2)調(diào)用ClusterInterceptor#intercept方法

默認(rèn)調(diào)用FailoverClusterInvoker#doinvoke方法。
在該方法中從RegistryDirectory中獲取invoker列表,然后獲取負(fù)載均衡LoadBalance(默認(rèn)RandomLoadBalance), 選擇一個(gè)invoker,進(jìn)行服務(wù)調(diào)用。
4)調(diào)用InvokerWrapper#invoke方法,之后執(zhí)行一個(gè)Filter鏈
4.1)調(diào)用ConsumerContextFilter#invoke方法,
設(shè)置RpcContext中的invoker變量問當(dāng)前invoker(默認(rèn)是ProtocolFilterWrapper),設(shè)置invocation。
從RpcContext中獲取timeout-countdown變量,如果存在,則轉(zhuǎn)化為TimeCountDown對(duì)象,判斷該調(diào)用是否超時(shí),如果超時(shí),則返回一個(gè)AsyncRpcResult對(duì)象,記錄一個(gè)異常。
4.2)調(diào)用FutureFilter#invoke方法和MonitorFilter#invoke方法。
4.3)調(diào)用GenericImplFilter#invoke方法,這里是泛化調(diào)用在客戶端的主要核心步驟。
4.3.1)從url中獲取generic字段,調(diào)用方法不是$invoke,也不是$invokeAysnc時(shí):
4.3.1.1)重新創(chuàng)建一個(gè)RpcInvocation,在attributes變量中添加GENERIC_IMPL_MARKER值,設(shè)置為true,其中:attributes變量參數(shù)類型為Map<Object,Object>,并且該變量只在調(diào)用者端,不會(huì)出現(xiàn)在線路上。
4.3.1.2)獲取調(diào)用的方法名,調(diào)用的參數(shù)類型和參數(shù)值,對(duì)參數(shù)類型進(jìn)行解析修改,效果如下:
java.lang.Object[][].class => "java.lang.Object[][]"
4.3.1.3)如果泛化調(diào)用方式為bean方式,遍歷參數(shù)值,并序列化為JavaBeanDescriptor類型數(shù)據(jù);
如果是其他調(diào)用方式,深入對(duì)象,將復(fù)雜類型轉(zhuǎn)化為簡(jiǎn)單類型。
4.3.1.4)如果方法返回類型是CompletableFuture,則設(shè)置方法名為$invokeAysnc;其他情況設(shè)置方法名為$invoke
4.3.1.5)將參數(shù)類型設(shè)置為new Class<?>[]{String.class, String[].class, Object[].class};,這樣轉(zhuǎn)化為傳統(tǒng)的泛化調(diào)用方式,并將參數(shù)值設(shè)置為類似new Object[]{methodName, types, args}的格式。
4.3.2)當(dāng)調(diào)用方法為$invoke或者$invokeAysnc,并且方法參數(shù)變量數(shù)量為3個(gè)時(shí),首先獲取泛化參數(shù),然后判斷泛化調(diào)用方式:
4.3.2.1)如果是nativejava方式,判斷參數(shù)是否為byte[]類型;如果不是,則說(shuō)明參數(shù)傳遞異常。
4.3.2.2)如果是bean方式,則判斷參數(shù)是否為JavaBeanDescriptor類型;如果不是,則說(shuō)明參數(shù)傳遞異常。
4.3.3)在RpcInvocation中的attachment中設(shè)置是否泛化調(diào)用。
4.4) 調(diào)用invoker#invoke方法
當(dāng)遠(yuǎn)程調(diào)用返回結(jié)果時(shí),會(huì)觸發(fā)onResponse方法。
從url中獲取generic參數(shù)值,從invocation中獲取方法名,方法參數(shù)類型,參數(shù)值GENERIC_IMPL是否存在。
如果參數(shù)值GENERIC_IMPL存在,并且為true:
從invoker中獲取接口類型,當(dāng)方法不是$invoke也不是$invokeAysnc,并且接口父類型為GenericService時(shí),從invoker中的interface參數(shù)中獲取真實(shí)的interface,并轉(zhuǎn)化為Class類型。
之后,是所有不同調(diào)用方式的統(tǒng)一處理。
獲取調(diào)用的方法Method,如果調(diào)用方式是Bean方式:
判斷appReponse的value是否為JavaBeanDescriptor類型,如果是,將該value進(jìn)行反序列化,重新賦值;如果不是,則拋出異常。
如果是其他調(diào)用方式,則使用PojoUtils工具類進(jìn)行反序列化。
5)調(diào)用AsyncToSyncInvoker#invoke方法
6)調(diào)用DubboInvoker#invoke方法,發(fā)起遠(yuǎn)程調(diào)用。
服務(wù)端收到請(qǐng)求后,在最終調(diào)用AbstractProxyInvoker#invoke方法之前,會(huì)先執(zhí)行一個(gè)過(guò)濾器鏈,和上述的消費(fèi)端的類似,其中會(huì)經(jīng)過(guò)一個(gè)GenericFilter,該類是服務(wù)端實(shí)現(xiàn)泛化調(diào)用功能的重要步驟。
在GenericFilter中,首先判斷方法名是否為$invoke或者$invokeAsync,由此來(lái)判定是否為泛化調(diào)用,如果方法名不是這兩個(gè),則直接調(diào)用下一個(gè)invoker;如果是,則執(zhí)行下面的邏輯。
獲取參數(shù)名稱、參數(shù)類型、參數(shù)值,通過(guò)反射獲取調(diào)用方法,根據(jù)不同的調(diào)用方式進(jìn)行反序列化,獲取實(shí)際調(diào)用方法的相關(guān)信息,然后將RpcInvocation中的相關(guān)信息進(jìn)行替換:

Dubbo2.7下的超時(shí)機(jī)制
在上述中的ConsumerContextFilter#invoke中涉及到了超時(shí)情況的處理,使用了TimeoutCountDown類,是2020.5.1日提交的信息。
Dubbo2.6版本中,在HeaderExchangeChannel中進(jìn)行遠(yuǎn)程調(diào)用前,會(huì)創(chuàng)建一個(gè)DefaultFuture對(duì)象,里面有一個(gè)靜態(tài)代碼塊,創(chuàng)建一個(gè)線程,執(zhí)行RemotingInvocationTimeoutScan任務(wù),輪詢FUTURES集合,通過(guò)DefaultFuture記錄的開始時(shí)間與當(dāng)前時(shí)間進(jìn)行計(jì)算,判斷是否超時(shí),如果超時(shí),則直接創(chuàng)建一個(gè)超時(shí)的Response,并將該DefaultFuture從FUTURES集合中移除。
當(dāng)服務(wù)端在一定時(shí)間內(nèi)執(zhí)行完邏輯后,會(huì)發(fā)送給客戶端,在此之前,客戶端通過(guò)定時(shí)任務(wù)已經(jīng)將相關(guān)信息從FUTURES集合中移除,所以這次服務(wù)端發(fā)送過(guò)來(lái)的信息在FUTURES集合中查找不到,所以不做處理,服務(wù)端的這次發(fā)送顯得有些多余,對(duì)于客戶端來(lái)說(shuō)是無(wú)用的。
所以在2020.5.1日,提交了上述代碼來(lái)解決這一問題。

在DubboInvoker#doInvoke方法中進(jìn)行遠(yuǎn)程遠(yuǎn)程調(diào)用前,會(huì)計(jì)算timeout。

首先從RpcContext中獲取timeout-countdown變量值
如果為空:
----》 從url中通過(guò)timeout參數(shù)獲取超時(shí)時(shí)間,默認(rèn)是1000,
----》從url中獲取enable-timeout-countdown參數(shù)值,默認(rèn)是false,通過(guò)該參數(shù)開啟新的超時(shí)機(jī)制(使用了上述的TimeoutCountDown)
----》如果開啟了,在attachments變量里添加_TO變量,值為計(jì)算后的timeout
如果不為空:
將其轉(zhuǎn)化為 TimeoutCountDown對(duì)象,計(jì)算剩余的有效時(shí)間,將其設(shè)置為新的timeout,并將其添加到_TO變量,值為計(jì)算后的timeout。

那TimeoutCountDown對(duì)象在什么時(shí)候被創(chuàng)建的呢?
在服務(wù)端的ContextFilter。
從RpcInvocation中獲取“_TO”變量的值,如果不為-1,則在RpcContext中創(chuàng)建一個(gè)TimeoutCountDown。

在后續(xù)的TimeoutFilter中,從RpcContext中獲取TimeoutCountDown,如果超時(shí)了,則清空處理的結(jié)果。

在消費(fèi)端的ConsumerContextFilter中,在進(jìn)行遠(yuǎn)程調(diào)用前,同樣從RpcContext中獲取TimeOutCountDown,當(dāng)過(guò)期時(shí),直接返回一個(gè)異常,而不再進(jìn)行遠(yuǎn)程調(diào)用。

疑問點(diǎn)
但是最后有一個(gè)疑問,即當(dāng)超時(shí)后,服務(wù)端仍然會(huì)發(fā)送給客戶端,雖然結(jié)果已經(jīng)被清空,(可能自己的理解問題)。

下面的這個(gè)建議感覺挺好的,但是在Dubbo在沒有發(fā)現(xiàn)類似的機(jī)制。
