基于dubbo實(shí)現(xiàn)異步調(diào)用(1)

1.前言

Java中常見的實(shí)現(xiàn)異步調(diào)用的方式:
1.ThreadPool
2.CompletableFuture
3.MQ
4.BlockingQueue
5.Fork/Join

那么作為一款優(yōu)秀的RPC框架,dubbo是如何實(shí)現(xiàn)異步調(diào)用的呢?本文將介紹2.6.x版本以來dubbo異步調(diào)用方式的演進(jìn)。
1.增加consumer配置
2.參數(shù)回調(diào)(2.7.0已廢棄,本文將不展開)
3.事件通知
4.直接定義返回CompletableFuture的服務(wù)接口
5.利用AsyncFor注解實(shí)現(xiàn)客戶端的同步轉(zhuǎn)異步
6.利用RpcContext.startAsync()實(shí)現(xiàn)服務(wù)端的同步轉(zhuǎn)異步
其中前面3種方式在2.6.x版本中就已支持,但參數(shù)回調(diào)在2.7.0版本中已廢棄,后面3種則是在2.7.0版本中新增的方式。

2.基于dubbo實(shí)現(xiàn)異步調(diào)用

2.1 增加consumer配置

這種方式很簡單,只需要在服務(wù)引用時增加<dubbo:method>配置即可,如下所示,其中name為需要異步調(diào)用的方法名,async表示是否啟用異步調(diào)用。

<dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880">
    <dubbo:method name="sayHello" async="true" />
</dubbo:reference>

此時consumer端有3種調(diào)用方式:

  • 由于配置了異步調(diào)用,因此此時直接調(diào)用將返回null:
String result = asyncService.sayHello("world");
  • 通過RpcContext獲取Future對象,調(diào)用get方法時阻塞知道返回結(jié)果:
asyncService.sayHello("world");
Future<String> future = RpcContext.getContext().getFuture();
String result = future.get();
  • 通過ResponseFuture設(shè)置回調(diào),執(zhí)行完成會回調(diào)done方法,拋異常則會回調(diào)caught方法:
asyncService.sayHello("world");
ResponseFuture responseFuture = ((FutureAdapter)RpcContext.getContext().getFuture()).getFuture();
responseFuture.setCallback(new ResponseCallback() {
    @Override
    public void done(Object response) {
        System.out.println("done");
    }

    @Override
    public void caught(Throwable exception) {
        System.out.println("caught");
    }
});

try {
    System.out.println("result = " + responseFuture.get());
} catch (RemotingException e) {
    e.printStackTrace();
}

如果只想異步調(diào)用,不需要返回值,則可以配置 return="false",這樣可以避免Future對象的創(chuàng)建,此時RpcContext.getContext().getFuture()將返回null;

2.2 直接定義返回CompletableFuture的服務(wù)接口

在上述方式中,想獲取異步調(diào)用的結(jié)果,需要從RpcContext中獲取,使用起來不是很方便?;趈ava 8中引入的CompletableFuture,dubbo在2.7.0版本中也增加了對CompletableFuture的支持,我們可以直接定義一個返回CompletableFuture類型的接口。

public interface AsyncService {

    String sayHello(String name);

    CompletableFuture<String> sayHelloAsync(String name);
}

服務(wù)端實(shí)現(xiàn)如下:

public class AsyncServiceImpl implements AsyncService {

    @Override
    public String sayHello(String name) {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return name;
    }


    @Override
    public CompletableFuture<String> sayHelloAsync(String name) {
        return CompletableFuture.supplyAsync(() -> name);
    }
}

如此一來,我們就實(shí)現(xiàn)了服務(wù)端的異步,客戶端直接調(diào)用接口即可,不需要再從RpcContext中獲取返回值:

CompletableFuture<String> completableFuture = asyncService.sayHelloAsync("async");
String result = completableFuture.get();
2.3 事件通知

dubbo允許consumer 端在調(diào)用之前、調(diào)用之后或出現(xiàn)異常時,觸發(fā) oninvoke、onreturn、onthrow 三個事件。類似于Spring中的前置增強(qiáng)、后置增強(qiáng)和異常拋出增強(qiáng)。只需要在服務(wù)引用時,增加以下配置指定事件通知的方法即可:

<dubbo:reference id="asyncService" check="false" interface="com.alibaba.dubbo.demo.AsyncService" url="localhost:20880">
    <dubbo:method name="sayHello" 
                  oninvoke="notifyServiceImpl.onInvoke" 
                  onreturn="notifyServiceImpl.onReturn" 
                  onthrow="notifyServiceImpl.onThrow" />
</dubbo:reference>

事件通知服務(wù)如下:

public class NotifyServiceImpl implements NotifyService {

    // 方法參數(shù)與調(diào)用方法參數(shù)相同
    @Override
    public void onInvoke(String name) {
        System.out.println("onInvoke: " + name);
    }

    // 第一個參數(shù)為調(diào)用方法的返回值,其余為調(diào)用方法的參數(shù)
    @Override
    public void onReturn(String retName, String name) {
        System.out.println("onReturn: " + name);
    }

    // 第一個參數(shù)為調(diào)用異常,其余為調(diào)用方法的參數(shù)
    @Override
    public void onThrow(Throwable ex, String name) {
        System.out.println("onThrow: " + name);
    }
}

與Spring增強(qiáng)不同的是,dubbo中的事件通知也可以是異步,只需要將調(diào)用方法配置為async="true"即可,但oninvoke方法無法異步執(zhí)行。

2.4 異步調(diào)用源碼分析

dubbo中的異步調(diào)用實(shí)際上是通過引入一個FutureFilter來實(shí)現(xiàn)的,關(guān)鍵源碼如下。

2.4.1 調(diào)用前獲取方法信息
@Activate(group = Constants.CONSUMER)
public class FutureFilter implements PostProcessFilter {

    protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

    @Override
    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        fireInvokeCallback(invoker, invocation);
        // need to configure if there's return value before the invocation in order to help invoker to judge if it's
        // necessary to return future.
        return postProcessResult(invoker.invoke(invocation), invoker, invocation);
    }
    ...
}

在fireInvokeCallback()方法中,會首先調(diào)用getAsyncMethodInfo()獲取目標(biāo)方法的方法信息,看是否有配置事件通知:

private ConsumerMethodModel.AsyncMethodInfo getAsyncMethodInfo(Invoker<?> invoker, Invocation invocation) {
    // 首先獲取消費(fèi)者信息
    final ConsumerModel consumerModel = ApplicationModel.getConsumerModel(invoker.getUrl().getServiceKey());
    if (consumerModel == null) {
        return null;
    }

    // 獲取消費(fèi)者對應(yīng)的方法信息
    ConsumerMethodModel methodModel = consumerModel.getMethodModel(invocation.getMethodName());
    if (methodModel == null) {
        return null;
    }

    // 獲取消費(fèi)者對應(yīng)方法的事件信息,即是否有配置事件通知
    final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = methodModel.getAsyncInfo();
    if (asyncMethodInfo == null) {
        return null;
    }
    return asyncMethodInfo;
}
2.4.2 同步觸發(fā)oninvoke事件

獲取到調(diào)用方法對應(yīng)的信息后,回到fireInvokeCallback()方法:

private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
    final ConsumerMethodModel.AsyncMethodInfo asyncMethodInfo = getAsyncMethodInfo(invoker, invocation);
    if (asyncMethodInfo == null) {
        return;
    }

    // 獲取事件配置信息
    final Method onInvokeMethod = asyncMethodInfo.getOninvokeMethod();
    final Object onInvokeInst = asyncMethodInfo.getOninvokeInstance();

    if (onInvokeMethod == null && onInvokeInst == null) {
        return;
    }
    if (onInvokeMethod == null || onInvokeInst == null) {
        throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a oninvoke callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl());
    }
    if (!onInvokeMethod.isAccessible()) {
        onInvokeMethod.setAccessible(true);
    }

    // 獲取方法參數(shù)
    Object[] params = invocation.getArguments();
    try {
        // 觸發(fā)oninvoke事件
        onInvokeMethod.invoke(onInvokeInst, params);
    } catch (InvocationTargetException e) {
        // 觸發(fā)onthrow事件
        fireThrowCallback(invoker, invocation, e.getTargetException());
    } catch (Throwable e) {
        fireThrowCallback(invoker, invocation, e);
    }
}
2.4.3 調(diào)用結(jié)果處理

方法調(diào)用完成后,會回到postProcessResult()方法:

@Override
public Result postProcessResult(Result result, Invoker<?> invoker, Invocation invocation) {

    // 如果是異步調(diào)用,返回結(jié)果會被封裝成AsyncRpcResult類型的對象,具體在哪里封裝的,后面會講到
    if (result instanceof AsyncRpcResult) {
        AsyncRpcResult asyncResult = (AsyncRpcResult) result;
        asyncResult.thenApplyWithContext(r -> {
            asyncCallback(invoker, invocation, r);
            return r;
        });
        return asyncResult;
    } else {
        syncCallback(invoker, invocation, result);
        return result;
    }
}

syncCallback和asyncCallback里面的邏輯比較簡單,就是根據(jù)方法是正常返回還是拋異常,觸發(fā)對應(yīng)的事件??梢钥吹?,如果被調(diào)用方法是同步的,則這兩個事件也是同步的,反之亦然。

2.4.4 方法調(diào)用核心過程

在postProcessResult()方法中,第一個參數(shù)是invoker.invoke(invocation),這里就會走到下一個Filter鏈完成filter鏈的處理,最終調(diào)到原始服務(wù),走到DubboInvoker#doInvoke方法:

protected Result doInvoke(final Invocation invocation) throws Throwable {
    ...
    try {
        // 讀取async配置
        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
        // 讀取future_generated/future_returntype配置,還沒搞明白是干啥的
        boolean isAsyncFuture = RpcUtils.isGeneratedFuture(inv) || RpcUtils.isFutureReturnType(inv);
        // 讀取return配置
        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        if (isOneway) {
            // 如果配置return="true",future對象就直接設(shè)置為null
            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
            currentClient.send(inv, isSent);
            RpcContext.getContext().setFuture(null);
            return new RpcResult();
        } else if (isAsync) {
            // 如果配置async="true",構(gòu)建future對象
            ResponseFuture future = currentClient.request(inv, timeout);
            // For compatibility
            FutureAdapter<Object> futureAdapter = new FutureAdapter<>(future);
            RpcContext.getContext().setFuture(futureAdapter);

            // 同時將返回結(jié)果包裝為AsyncResult對象
            Result result;
            if (isAsyncFuture) {
                // register resultCallback, sometimes we need the asyn result being processed by the filter chain.
                result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
            } else {
                result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false);
            }
            return result;
        } else {
            // 否則就是同步調(diào)用,future當(dāng)然也是null
            RpcContext.getContext().setFuture(null);
            return (Result) currentClient.request(inv, timeout).get();
        }
    }
    ...
}

通過這個過程不難發(fā)現(xiàn),不管是同步調(diào)用還是異步調(diào)用,最終都會走到ExchangeClient#send方法,再往下會走到HeaderExchangeChannel#request方法,這個一個異步方法,返回ResponseFuture對象。

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

看到這里我才恍然大悟,原來dubbo中同步調(diào)用也是通過異步調(diào)用來實(shí)現(xiàn),只是同步調(diào)用發(fā)起后,直接調(diào)用future#get的方法來同步等待結(jié)果的返回,而異步調(diào)用只返回Future Response,在用戶需要關(guān)心其結(jié)果時才調(diào)用get方法。

參考:
http://dubbo.apache.org/zh-cn/blog/dubbo-invoke.html
http://dubbo.apache.org/zh-cn/blog/dubbo-new-async.html
https://mp.weixin.qq.com/s?__biz=MzUzNTY4NTYxMA==&mid=2247484959&idx=5&sn=654b4ae76e76ac1a436f2ceb1774f4a6&chksm=fa80f69acdf77f8c5371d4a929557d6ec7fba3020c3bbb1490f0835218e8aef7af285e91d097&scene=21#wechat_redirect

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,663評論 19 139
  • 先看官網(wǎng)兩張圖【引用來自官網(wǎng)】:image.png 官網(wǎng)說明: 1.首先 ReferenceConfig 類的 i...
    致慮閱讀 1,094評論 0 2
  • 1.ios高性能編程 (1).內(nèi)層 最小的內(nèi)層平均值和峰值(2).耗電量 高效的算法和數(shù)據(jù)結(jié)構(gòu)(3).初始化時...
    歐辰_OSR閱讀 30,265評論 8 265
  • 山外青天天愈藍(lán), 風(fēng)追落花花漸遠(yuǎn)。 玉手輕裙翩翩舞, 疑是天降采花仙。
    遠(yuǎn)山一夢閱讀 435評論 2 2
  • 今天呢我想跟大家說說我在工作中遇見的三個人。這三個人呢在我職業(yè)生涯的不同階段,以不同的方式給予了極大的鼓勵和啟示。...
    天涯咫尺之間閱讀 284評論 0 0

友情鏈接更多精彩內(nèi)容