注:文章中的坑出現(xiàn)在2.5.4版本之前,這個(gè)坑在2.5.4版本已經(jīng)得到修復(fù)。
一、問(wèn)題描述

場(chǎng)景描述,如上圖所示:
客戶端遠(yuǎn)程異步調(diào)用服務(wù)A,服務(wù)A在處理客戶端請(qǐng)求的過(guò)程中需要遠(yuǎn)程同步調(diào)用服務(wù)B,服務(wù)A從服務(wù)B的響應(yīng)中取數(shù)據(jù)時(shí),得到的是 null !!!
二、原因分析

2.1 Client的請(qǐng)求發(fā)送過(guò)程
1)Client在發(fā)起RPC調(diào)用請(qǐng)求前,將請(qǐng)求參數(shù)構(gòu)建成 RpcInvocation ;
2)Client在發(fā)起RPC調(diào)用請(qǐng)求前,會(huì)經(jīng)過(guò)Filter處理:
-
ConsumerContextFilter會(huì)將請(qǐng)求信息,如invoker、invocation、Address等,寫(xiě)入RpcContext;
@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setLocalAddress(NetUtils.getLocalHost(), 0)
.setRemoteAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
if (invocation instanceof RpcInvocation) {
((RpcInvocation) invocation).setInvoker(invoker);
}
try {
return invoker.invoke(invocation);
} finally {
RpcContext.getContext().clearAttachments();
}
}
}
3)Client在發(fā)起RPC調(diào)用請(qǐng)求前,會(huì)經(jīng)過(guò)AbstractInvoker:
-
AbstractInvoker會(huì)將RpcContext中的attachments內(nèi)容寫(xiě)入到RpcInvocation,以實(shí)現(xiàn)附加參數(shù)的傳遞;
Map<String, String> context = RpcContext.getContext().getAttachments();
if (context != null) {
invocation.addAttachmentsIfAbsent(context);
}
-
AbstractInvoker會(huì)從RPC請(qǐng)求參數(shù)URL中ASYNC_KEY的值,并設(shè)置到RpcInvocation的attachment中;
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
4)Client在發(fā)起RPC調(diào)用請(qǐng)求時(shí),會(huì)經(jīng)過(guò)DubboInvoker:
- DubboInvoker會(huì)優(yōu)先從
RpcInvocation的attachment中獲取并判斷ASYNC_KEY是否為true,以實(shí)現(xiàn)消費(fèi)端的異步調(diào)用;
public static boolean isAsync(URL url, Invocation inv) {
boolean isAsync;
//如果Java代碼中設(shè)置優(yōu)先.
if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {
isAsync = true;
} else {
isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);
}
return isAsync;
}
5)Client在發(fā)起RPC調(diào)用請(qǐng)求時(shí),會(huì)將RpcInvocation作為調(diào)用參數(shù)傳遞給服務(wù)提供方:
-
RpcInvocation中的擴(kuò)展屬性attachments,實(shí)現(xiàn)了請(qǐng)求調(diào)用擴(kuò)展信息傳遞的功能;
2.2 服務(wù)A的請(qǐng)求接收和執(zhí)行過(guò)程
1)服務(wù)端在接收到RPC請(qǐng)求,調(diào)用真正實(shí)現(xiàn)接口前,會(huì)經(jīng)過(guò)ContextFilter。
-
ContextFilter會(huì)將請(qǐng)求信息,如invoker、invocation、Address等,寫(xiě)入RpcContext; -
ContextFilter會(huì)將請(qǐng)求參數(shù)RpcInvocation的attachments擴(kuò)展信息取出,過(guò)濾掉某些特定KEY之后,將其余擴(kuò)展屬性設(shè)置到當(dāng)前RpcContext的attachments中;
Map<String, String> attachments = invocation.getAttachments();
if (attachments != null) {
attachments = new HashMap<String, String>(attachments);
attachments.remove(Constants.PATH_KEY);
attachments.remove(Constants.GROUP_KEY);
attachments.remove(Constants.VERSION_KEY);
attachments.remove(Constants.DUBBO_VERSION_KEY);
attachments.remove(Constants.TOKEN_KEY);
attachments.remove(Constants.TIMEOUT_KEY);
attachments.remove(Constants.ASYNC_KEY);//清空消費(fèi)端的異步參數(shù),2.5.4版本才新加進(jìn)去的
}
RpcContext.getContext()
.setInvoker(invoker)
.setInvocation(invocation)
.setAttachments(attachments)
.setLocalAddress(invoker.getUrl().getHost(),
invoker.getUrl().getPort());
其中attachments.remove(Constants.ASYNC_KEY);//清空消費(fèi)端的異步參數(shù) 這行代碼是在dubbo的2.5.4版本才加進(jìn)去的,也就是之前的版本中并沒(méi)有這行代碼。
2)在2.5.4版本之前,對(duì)于Client發(fā)來(lái)的異步調(diào)用請(qǐng)求,其RpcInvocation參數(shù)中包含了ASYNC=true的attachment擴(kuò)展信息:
- 此時(shí)
ASYNC=true的這個(gè)擴(kuò)展信息就會(huì)被設(shè)置到服務(wù)A的RpcContext的擴(kuò)展屬性中; - 在
服務(wù)A處理RPC調(diào)用,執(zhí)行實(shí)際接口實(shí)現(xiàn)類(lèi)的邏輯時(shí),因?yàn)橐蕾嚨?code>服務(wù)B,所以會(huì)繼續(xù)發(fā)送RPC調(diào)用請(qǐng)求給服務(wù)B; -
服務(wù)A調(diào)用服務(wù)B時(shí),服務(wù)A的RpcContext的擴(kuò)展屬性會(huì)被寫(xiě)入到A -> B的RpcInvocation參數(shù)中,這就導(dǎo)致ASYNC=true的擴(kuò)展屬性參數(shù)被誤傳到A -> B的RpcInvocation參數(shù)中,進(jìn)而導(dǎo)致在服務(wù)A發(fā)起RPC請(qǐng)求調(diào)用時(shí)觸發(fā)了錯(cuò)誤的異步調(diào)用邏輯; - 此時(shí)
服務(wù)A獲取到的RPC執(zhí)行結(jié)果RpcResult的內(nèi)容當(dāng)然是個(gè)空;
else if (isAsync) { //1. 異步,有返回值
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else { //3. 異步->同步(默認(rèn)的通信方式)
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
以上就是這個(gè)坑的產(chǎn)生原因
三、解決方法
自己寫(xiě)了個(gè)Filter,添加到Dubbo服務(wù)提供方接收請(qǐng)求后、實(shí)際處理請(qǐng)求前的Filter執(zhí)行鏈中。
從請(qǐng)求參數(shù)URL中解析出ASYNC擴(kuò)展參數(shù)標(biāo)識(shí),而不依賴RpcInvocation中的值。
@Activate(group = {Constants.PROVIDER}, order = -999)
public class DubboSyncFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
//避免RpcContext透?jìng)?使用配置文件的async
boolean isAsync = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false);
RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, String.valueOf(isAsync));
return invoker.invoke(invocation);
}
}