dubbo剖析:記一個(gè)異步方法調(diào)用的坑

注:文章中的坑出現(xiàn)在2.5.4版本之前,這個(gè)坑在2.5.4版本已經(jīng)得到修復(fù)。

一、問(wèn)題描述

問(wèn)題描述

場(chǎng)景描述,如上圖所示:
客戶端遠(yuǎn)程異步調(diào)用服務(wù)A,服務(wù)A在處理客戶端請(qǐng)求的過(guò)程中需要遠(yuǎn)程同步調(diào)用服務(wù)B服務(wù)A服務(wù)B的響應(yīng)中取數(shù)據(jù)時(shí),得到的是 null !!!

二、原因分析

RPC請(qǐng)求響應(yīng)參數(shù)傳遞過(guò)程

2.1 Client的請(qǐng)求發(fā)送過(guò)程

1)Client在發(fā)起RPC調(diào)用請(qǐng)求前,將請(qǐng)求參數(shù)構(gòu)建成 RpcInvocation ;
2)Client在發(fā)起RPC調(diào)用請(qǐng)求前,會(huì)經(jīng)過(guò)Filter處理:

  • ConsumerContextFilter會(huì)將請(qǐng)求信息,如invoker、invocation、Address等,寫(xiě)入 RpcContext;
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}

3)Client在發(fā)起RPC調(diào)用請(qǐng)求前,會(huì)經(jīng)過(guò)AbstractInvoker:

  • AbstractInvoker會(huì)將RpcContext中的attachments內(nèi)容寫(xiě)入到 RpcInvocation,以實(shí)現(xiàn)附加參數(shù)的傳遞;
        Map<String, String> context = RpcContext.getContext().getAttachments();
        if (context != null) {
            invocation.addAttachmentsIfAbsent(context);
        }
  • AbstractInvoker會(huì)從RPC請(qǐng)求參數(shù)URLASYNC_KEY的值,并設(shè)置到RpcInvocationattachment中;
       if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
           invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
       }

4)Client在發(fā)起RPC調(diào)用請(qǐng)求時(shí),會(huì)經(jīng)過(guò)DubboInvoker:

  • DubboInvoker會(huì)優(yōu)先從RpcInvocationattachment中獲取并判斷ASYNC_KEY是否為true,以實(shí)現(xiàn)消費(fèi)端的異步調(diào)用;
    public static boolean isAsync(URL url, Invocation inv) {
        boolean isAsync;
        //如果Java代碼中設(shè)置優(yōu)先.
        if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {
            isAsync = true;
        } else {
            isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);
        }
        return isAsync;
    }

5)Client在發(fā)起RPC調(diào)用請(qǐng)求時(shí),會(huì)將RpcInvocation作為調(diào)用參數(shù)傳遞給服務(wù)提供方:

  • RpcInvocation中的擴(kuò)展屬性attachments,實(shí)現(xiàn)了請(qǐng)求調(diào)用擴(kuò)展信息傳遞的功能;

2.2 服務(wù)A的請(qǐng)求接收和執(zhí)行過(guò)程

1)服務(wù)端在接收到RPC請(qǐng)求,調(diào)用真正實(shí)現(xiàn)接口前,會(huì)經(jīng)過(guò)ContextFilter

  • ContextFilter會(huì)將請(qǐng)求信息,如invoker、invocation、Address等,寫(xiě)入 RpcContext;
  • ContextFilter會(huì)將請(qǐng)求參數(shù)RpcInvocationattachments擴(kuò)展信息取出,過(guò)濾掉某些特定KEY之后,將其余擴(kuò)展屬性設(shè)置到當(dāng)前RpcContextattachments中;
        Map<String, String> attachments = invocation.getAttachments();
        if (attachments != null) {
            attachments = new HashMap<String, String>(attachments);
            attachments.remove(Constants.PATH_KEY);
            attachments.remove(Constants.GROUP_KEY);
            attachments.remove(Constants.VERSION_KEY);
            attachments.remove(Constants.DUBBO_VERSION_KEY);
            attachments.remove(Constants.TOKEN_KEY);
            attachments.remove(Constants.TIMEOUT_KEY);
            attachments.remove(Constants.ASYNC_KEY);//清空消費(fèi)端的異步參數(shù),2.5.4版本才新加進(jìn)去的
        }
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setAttachments(attachments)
                .setLocalAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());

其中attachments.remove(Constants.ASYNC_KEY);//清空消費(fèi)端的異步參數(shù) 這行代碼是在dubbo的2.5.4版本才加進(jìn)去的,也就是之前的版本中并沒(méi)有這行代碼。

2)在2.5.4版本之前,對(duì)于Client發(fā)來(lái)的異步調(diào)用請(qǐng)求,其RpcInvocation參數(shù)中包含了ASYNC=trueattachment擴(kuò)展信息:

  • 此時(shí)ASYNC=true的這個(gè)擴(kuò)展信息就會(huì)被設(shè)置到服務(wù)A的RpcContext的擴(kuò)展屬性中;
  • 服務(wù)A處理RPC調(diào)用,執(zhí)行實(shí)際接口實(shí)現(xiàn)類(lèi)的邏輯時(shí),因?yàn)橐蕾嚨?code>服務(wù)B,所以會(huì)繼續(xù)發(fā)送RPC調(diào)用請(qǐng)求給服務(wù)B;
  • 服務(wù)A調(diào)用服務(wù)B時(shí),服務(wù)ARpcContext的擴(kuò)展屬性會(huì)被寫(xiě)入到A -> BRpcInvocation參數(shù)中,這就導(dǎo)致ASYNC=true的擴(kuò)展屬性參數(shù)被誤傳到A -> BRpcInvocation參數(shù)中,進(jìn)而導(dǎo)致在服務(wù)A發(fā)起RPC請(qǐng)求調(diào)用時(shí)觸發(fā)了錯(cuò)誤的異步調(diào)用邏輯;
  • 此時(shí)服務(wù)A獲取到的RPC執(zhí)行結(jié)果RpcResult的內(nèi)容當(dāng)然是個(gè)空;
else if (isAsync) {   //1. 異步,有返回值
                ResponseFuture future = currentClient.request(inv, timeout);
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {    //3. 異步->同步(默認(rèn)的通信方式)
                RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }

以上就是這個(gè)坑的產(chǎn)生原因

三、解決方法

自己寫(xiě)了個(gè)Filter,添加到Dubbo服務(wù)提供方接收請(qǐng)求后、實(shí)際處理請(qǐng)求前的Filter執(zhí)行鏈中。
從請(qǐng)求參數(shù)URL中解析出ASYNC擴(kuò)展參數(shù)標(biāo)識(shí),而不依賴RpcInvocation中的值。

@Activate(group = {Constants.PROVIDER}, order = -999)
public class DubboSyncFilter implements Filter {
    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        //避免RpcContext透?jìng)?使用配置文件的async
        boolean isAsync = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false);
        RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, String.valueOf(isAsync));
        return invoker.invoke(invocation);
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容