Dubbo2.7的泛化調(diào)用和超時(shí)機(jī)制

路途雖遙遠(yuǎn),將來(lái)更美好
微信公號(hào)號(hào):九點(diǎn)半的馬拉

在傳統(tǒng)模式下,Dubbo消費(fèi)端需要調(diào)用某一遠(yuǎn)程服務(wù)器端的方法時(shí),消費(fèi)端也需要額外導(dǎo)入服務(wù)類接口API,Dubbo也由此實(shí)現(xiàn)了面向接口代理的高性能RPC調(diào)用。

但是當(dāng)服務(wù)消費(fèi)端沒有服務(wù)接口或方法參數(shù)類型時(shí),無(wú)法使用上述的方式進(jìn)行服務(wù)調(diào)用,針對(duì)該場(chǎng)景,Dubbo使用泛化調(diào)用方法進(jìn)行服務(wù)調(diào)用。

Dubbo在進(jìn)行泛化調(diào)用時(shí),將相關(guān)信息封裝到Map對(duì)象中,并利用GenericService接口處理。

舉個(gè)例子

服務(wù)器端配置:

<bean id="helloserviceimpl" class="org.apache.dubbo.samples.generic.call.impl.HelloServiceImpl"/>

<dubbo:service interface="org.apache.dubbo.samples.generic.call.api.HelloService" ref="helloserviceimpl"/>

服務(wù)器端服務(wù)具體實(shí)現(xiàn)類:

public class HelloServiceImpl implements HelloService {
    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            future.complete("sayHelloAsync: " + name);
        }).start();
        return future;
    }

消費(fèi)端配置:

public class GenericCallConsumer {
    private static GenericService genericService;
    public static void main(String[] args) throws Exception {
        ApplicationConfig applicationConfig = new ApplicationConfig();
        applicationConfig.setName("generic-call-consumer");
        RegistryConfig registryConfig = new RegistryConfig();
        registryConfig.setAddress("zookeeper://127.0.0.1:2181");
        ReferenceConfig<GenericService> referenceConfig = new ReferenceConfig<>();
    referenceConfig.setInterface("org.apache.dubbo.samples.generic.call.api.HelloService");
        applicationConfig.setRegistry(registryConfig);
        referenceConfig.setApplication(applicationConfig);
        // 開啟泛化
        referenceConfig.setGeneric(true);
        referenceConfig.setAsync(true);
        referenceConfig.setTimeout(7000);
        genericService = referenceConfig.get();
        invokeAsyncSayHelloAsync();
public static void invokeAsyncSayHelloAsync() throws Exception {
        CompletableFuture<Object> future = genericService.$invokeAsync("sayHelloAsync",
                new String[]{"java.lang.String"}, new Object[]{"world"});
        CountDownLatch latch = new CountDownLatch(1);
        future.whenComplete((value, t) -> {
           System.err.println("invokeAsyncSayHelloAsync(whenComplete): " + value);
            latch.countDown();
        });
        latch.await();
}

原理分析

消費(fèi)端通過(guò)一個(gè)代理對(duì)象進(jìn)行服務(wù)調(diào)用,

1)執(zhí)行InvokerInvocationHandler#invoke方法,

2)調(diào)用MockClusterInvoker#invoke方法

3)調(diào)用AbstractCluster$InterceptorInvokerNode#invoke方法

code1.png

這里新增了一個(gè)ClusterInterceptor,與Filter不同,它在一個(gè)特定的地址或invoker被選擇之前的較外層執(zhí)行邏輯,在服務(wù)發(fā)現(xiàn)之前攔截請(qǐng)求。

3.1)調(diào)用ConsumerContextClusterInterceptor#before方法。
從RpcContext設(shè)置invocation,并設(shè)置localAddress和invoker(默認(rèn)FailoverClusterInvoker),清除RpcContext內(nèi)部的SERVER_LOCAL上下文內(nèi)容。

3.2)調(diào)用ClusterInterceptor#intercept方法

code2.png

默認(rèn)調(diào)用FailoverClusterInvoker#doinvoke方法。

在該方法中從RegistryDirectory中獲取invoker列表,然后獲取負(fù)載均衡LoadBalance(默認(rèn)RandomLoadBalance), 選擇一個(gè)invoker,進(jìn)行服務(wù)調(diào)用。

4)調(diào)用InvokerWrapper#invoke方法,之后執(zhí)行一個(gè)Filter鏈

4.1)調(diào)用ConsumerContextFilter#invoke方法,

設(shè)置RpcContext中的invoker變量問當(dāng)前invoker(默認(rèn)是ProtocolFilterWrapper),設(shè)置invocation。

從RpcContext中獲取timeout-countdown變量,如果存在,則轉(zhuǎn)化為TimeCountDown對(duì)象,判斷該調(diào)用是否超時(shí),如果超時(shí),則返回一個(gè)AsyncRpcResult對(duì)象,記錄一個(gè)異常。

4.2)調(diào)用FutureFilter#invoke方法和MonitorFilter#invoke方法。

4.3)調(diào)用GenericImplFilter#invoke方法,這里是泛化調(diào)用在客戶端的主要核心步驟

4.3.1)從url中獲取generic字段,調(diào)用方法不是$invoke,也不是$invokeAysnc時(shí):

4.3.1.1)重新創(chuàng)建一個(gè)RpcInvocation,在attributes變量中添加GENERIC_IMPL_MARKER值,設(shè)置為true,其中:attributes變量參數(shù)類型為Map<Object,Object>,并且該變量只在調(diào)用者端,不會(huì)出現(xiàn)在線路上。

4.3.1.2)獲取調(diào)用的方法名,調(diào)用的參數(shù)類型和參數(shù)值,對(duì)參數(shù)類型進(jìn)行解析修改,效果如下:

java.lang.Object[][].class => "java.lang.Object[][]"

4.3.1.3)如果泛化調(diào)用方式為bean方式,遍歷參數(shù)值,并序列化為JavaBeanDescriptor類型數(shù)據(jù);

如果是其他調(diào)用方式,深入對(duì)象,將復(fù)雜類型轉(zhuǎn)化為簡(jiǎn)單類型。

4.3.1.4)如果方法返回類型是CompletableFuture,則設(shè)置方法名為$invokeAysnc;其他情況設(shè)置方法名為$invoke

4.3.1.5)將參數(shù)類型設(shè)置為new Class<?>[]{String.class, String[].class, Object[].class};,這樣轉(zhuǎn)化為傳統(tǒng)的泛化調(diào)用方式,并將參數(shù)值設(shè)置為類似new Object[]{methodName, types, args}的格式。

4.3.2)當(dāng)調(diào)用方法為$invoke或者$invokeAysnc,并且方法參數(shù)變量數(shù)量為3個(gè)時(shí),首先獲取泛化參數(shù),然后判斷泛化調(diào)用方式:

4.3.2.1)如果是nativejava方式,判斷參數(shù)是否為byte[]類型;如果不是,則說(shuō)明參數(shù)傳遞異常。

4.3.2.2)如果是bean方式,則判斷參數(shù)是否為JavaBeanDescriptor類型;如果不是,則說(shuō)明參數(shù)傳遞異常。

4.3.3)在RpcInvocation中的attachment中設(shè)置是否泛化調(diào)用。

4.4) 調(diào)用invoker#invoke方法

當(dāng)遠(yuǎn)程調(diào)用返回結(jié)果時(shí),會(huì)觸發(fā)onResponse方法。

從url中獲取generic參數(shù)值,從invocation中獲取方法名,方法參數(shù)類型,參數(shù)值GENERIC_IMPL是否存在。

如果參數(shù)值GENERIC_IMPL存在,并且為true:
從invoker中獲取接口類型,當(dāng)方法不是$invoke也不是$invokeAysnc,并且接口父類型為GenericService時(shí),從invoker中的interface參數(shù)中獲取真實(shí)的interface,并轉(zhuǎn)化為Class類型。

之后,是所有不同調(diào)用方式的統(tǒng)一處理。

獲取調(diào)用的方法Method,如果調(diào)用方式是Bean方式:
判斷appReponse的value是否為JavaBeanDescriptor類型,如果是,將該value進(jìn)行反序列化,重新賦值;如果不是,則拋出異常。

如果是其他調(diào)用方式,則使用PojoUtils工具類進(jìn)行反序列化。

5)調(diào)用AsyncToSyncInvoker#invoke方法

6)調(diào)用DubboInvoker#invoke方法,發(fā)起遠(yuǎn)程調(diào)用。

服務(wù)端收到請(qǐng)求后,在最終調(diào)用AbstractProxyInvoker#invoke方法之前,會(huì)先執(zhí)行一個(gè)過(guò)濾器鏈,和上述的消費(fèi)端的類似,其中會(huì)經(jīng)過(guò)一個(gè)GenericFilter,該類是服務(wù)端實(shí)現(xiàn)泛化調(diào)用功能的重要步驟。

GenericFilter中,首先判斷方法名是否為$invoke或者$invokeAsync,由此來(lái)判定是否為泛化調(diào)用,如果方法名不是這兩個(gè),則直接調(diào)用下一個(gè)invoker;如果是,則執(zhí)行下面的邏輯。

獲取參數(shù)名稱、參數(shù)類型、參數(shù)值,通過(guò)反射獲取調(diào)用方法,根據(jù)不同的調(diào)用方式進(jìn)行反序列化,獲取實(shí)際調(diào)用方法的相關(guān)信息,然后將RpcInvocation中的相關(guān)信息進(jìn)行替換:

code3.png

Dubbo2.7下的超時(shí)機(jī)制

在上述中的ConsumerContextFilter#invoke中涉及到了超時(shí)情況的處理,使用了TimeoutCountDown類,是2020.5.1日提交的信息。

Dubbo2.6版本中,在HeaderExchangeChannel中進(jìn)行遠(yuǎn)程調(diào)用前,會(huì)創(chuàng)建一個(gè)DefaultFuture對(duì)象,里面有一個(gè)靜態(tài)代碼塊,創(chuàng)建一個(gè)線程,執(zhí)行RemotingInvocationTimeoutScan任務(wù),輪詢FUTURES集合,通過(guò)DefaultFuture記錄的開始時(shí)間與當(dāng)前時(shí)間進(jìn)行計(jì)算,判斷是否超時(shí),如果超時(shí),則直接創(chuàng)建一個(gè)超時(shí)的Response,并將該DefaultFuture從FUTURES集合中移除。

當(dāng)服務(wù)端在一定時(shí)間內(nèi)執(zhí)行完邏輯后,會(huì)發(fā)送給客戶端,在此之前,客戶端通過(guò)定時(shí)任務(wù)已經(jīng)將相關(guān)信息從FUTURES集合中移除,所以這次服務(wù)端發(fā)送過(guò)來(lái)的信息在FUTURES集合中查找不到,所以不做處理,服務(wù)端的這次發(fā)送顯得有些多余,對(duì)于客戶端來(lái)說(shuō)是無(wú)用的。

所以在2020.5.1日,提交了上述代碼來(lái)解決這一問題。

code4.png

DubboInvoker#doInvoke方法中進(jìn)行遠(yuǎn)程遠(yuǎn)程調(diào)用前,會(huì)計(jì)算timeout。

code5.png

首先從RpcContext中獲取timeout-countdown變量值

如果為空:
----》 從url中通過(guò)timeout參數(shù)獲取超時(shí)時(shí)間,默認(rèn)是1000,
----》從url中獲取enable-timeout-countdown參數(shù)值,默認(rèn)是false,通過(guò)該參數(shù)開啟新的超時(shí)機(jī)制(使用了上述的TimeoutCountDown)
----》如果開啟了,在attachments變量里添加_TO變量,值為計(jì)算后的timeout

如果不為空:
將其轉(zhuǎn)化為 TimeoutCountDown對(duì)象,計(jì)算剩余的有效時(shí)間,將其設(shè)置為新的timeout,并將其添加到_TO變量,值為計(jì)算后的timeout。

code6.png

那TimeoutCountDown對(duì)象在什么時(shí)候被創(chuàng)建的呢?

在服務(wù)端的ContextFilter。

從RpcInvocation中獲取“_TO”變量的值,如果不為-1,則在RpcContext中創(chuàng)建一個(gè)TimeoutCountDown。

code7.png

在后續(xù)的TimeoutFilter中,從RpcContext中獲取TimeoutCountDown,如果超時(shí)了,則清空處理的結(jié)果。

code8.png

在消費(fèi)端的ConsumerContextFilter中,在進(jìn)行遠(yuǎn)程調(diào)用前,同樣從RpcContext中獲取TimeOutCountDown,當(dāng)過(guò)期時(shí),直接返回一個(gè)異常,而不再進(jìn)行遠(yuǎn)程調(diào)用。

code10.png

疑問點(diǎn)

但是最后有一個(gè)疑問,即當(dāng)超時(shí)后,服務(wù)端仍然會(huì)發(fā)送給客戶端,雖然結(jié)果已經(jīng)被清空,(可能自己的理解問題)。

code11.png

下面的這個(gè)建議感覺挺好的,但是在Dubbo在沒有發(fā)現(xiàn)類似的機(jī)制。


code12.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容