Future
在java 8之前,我們可以使用Callable+Future來異步執(zhí)行任務和獲取結果,比如
ExecutorService service = new ThreadPoolExecutor(5,5,0, TimeUnit.SECONDS,new ArrayBlockingQueue<>(100));
Future<String> f = service.submit(()->{
Thread.sleep(200);
return "helloWorld";
}
);
System.out.println(f.get(300,TimeUnit.MILLISECONDS));
其獲取結果,get方法實現(xiàn)本質是輪詢校驗結果狀態(tài)積,阻塞實現(xiàn)依賴的是LockSupport.park()方法。
那么在dubbo交給Apache進行孵化之前的版本中,比如2.6.1版本中,其異步調用機制ResponseFuture的實現(xiàn)就借鑒了jdk的Future的模式,以DubboInvoker#doInvoke方法為例
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
可以看到,同步與異步的本質區(qū)別就是調用get()方法的時機不同,同步調用的話,請求的同時由dubbo線程直接調用get方法阻塞,獲取結果;而異步調用,dubbo直接返回RpcResult,后續(xù)由業(yè)務線程再來調用get方法獲取結果。
dubbo雖然借鑒了jdk的Future,但是代碼全部是自己寫的,以DefaultFuture#get()為例
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
done.await(timeout, TimeUnit.MILLISECONDS);
if (isDone() || System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
可以看到,dubbo的DefaultFuture實現(xiàn),主要依賴lock+condition的模式,不是jdk Future的LockSupport.park()模式。
這種模式的缺點有很多,最大的缺點就是結果獲取是阻塞的。
CompletableFuture
在java 8之后,jdk引入了CompletableFuture類,可以看到其實現(xiàn)了Future和CompletionStage,所以我們可以繼續(xù)像使用Future一樣使用CompletableFuture。
public class CompletableFuture<T> implements Future<T>, CompletionStage<T>
那么CompletionStage 是做什么的呢,用類文件注釋的第一句話說,其代表一種異步階段,執(zhí)行一些行為或者計算,執(zhí)行完畢后,會觸發(fā)其他CompletionStage的執(zhí)行。
A stage of a possibly asynchronous computation, that performs an
action or computes a value when another CompletionStage completes.
相較于Future,CompletableFuture提供的很多新特性都依賴與這個CompletionStage,這里主要介紹其在dubbo異步調用中的應用,其他特性不多介紹,重點介紹下其回調機制,先看用法
CompletableFuture<String> f = new CompletableFuture();
try {
f.whenComplete((v,t)->{
if(t!=null){
System.out.println("Exception");
}else{
System.out.println(v);
}
});
f.complete("HelloWorld");
當CompletableFuture拿到結果的時候,會回調whenComplete方法注冊的回調邏輯,其核心實現(xiàn)見CompletableFuture#postComplete, 用注釋的話說,每一步,這個stack會pop and run?;卣{也是基于此實現(xiàn)(Doug Lea大神的作品不是簡單能說明白的,后續(xù)再開一文研究)
/**
* Pops and tries to trigger all reachable dependents. Call only
* when known to be done.
*/
final void postComplete() {
/*
* On each step, variable f holds current dependents to pop
* and run. It is extended along only one path at a time,
* pushing others to avoid unbounded recursion.
*/
CompletableFuture<?> f = this; Completion h;
while ((h = f.stack) != null ||
(f != this && (h = (f = this).stack) != null)) {
CompletableFuture<?> d; Completion t;
if (f.casStack(h, t = h.next)) {
if (t != null) {
if (f != this) {
pushStack(h);
continue;
}
h.next = null; // detach
}
f = (d = h.tryFire(NESTED)) == null ? this : d;
}
}
}
那么dubbo的異步調用是怎么利用這個回調機制的呢?見DubboInvoker#doInvoke (2.7.3版本)
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
AsyncRpcResult asyncRpcResult = new AsyncRpcResult(inv);
CompletableFuture<Object> responseFuture = currentClient.request(inv, timeout);
asyncRpcResult.subscribeTo(responseFuture);
// save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
FutureContext.getContext().setCompatibleFuture(responseFuture);
return asyncRpcResult;
}
之前2.6.1版本中,同步異步的區(qū)別是誰來調get()方法,那么在2.7.3版本,DubboInvoker對同步異步調用的處理直接統(tǒng)一了,都會返回一個AsyncRpcResult, 這個AsyncRpcResult本身就繼承自CompletableFuture,同時其會subscribe一個響應的CompletableFuture,這里就有了兩個CompletableFuture;那么subscribe做了什么呢?
public void subscribeTo(CompletableFuture<?> future) {
future.whenComplete((obj, t) -> {
if (t != null) {
this.completeExceptionally(t);
} else {
this.complete((Result) obj);
}
});
}
subscribe會對響應CompletableFuture注冊了一個回調,響應完成時,觸發(fā)這個回調;這個回調邏輯就是執(zhí)行AsyncRpcResult自身的complete方法,那么如果AsyncRpcResult也有注冊回調,此時就會被鏈式觸發(fā)。
新版本的dubbo既然在DubboInvoker這里對于同步異步的處理是一樣的,都是直接返回一個AsyncRpcResult,那么對于我們使用者來說,怎么來區(qū)別同步和異步呢?其實關鍵就在于怎么用這個AsyncRpcResult。如果我們拿到AsyncRpcResult直接get,可以認為這就是同步調用,如果我們拿到AsyncRpcResult,不去調用get,而是去注冊一個回調函數(shù),等待鏈式觸發(fā),用回調的方式拿結果,那么這就是異步。
總結:老版本dubbo的異步調用可以認為是假異步,因為結果的獲取是阻塞的,新版本隨著jdk引入CompletableFuture,由于回調機制的存在,我們業(yè)務代碼使用dubbo時候,也可以注冊回調,實現(xiàn)真正的異步非阻塞。