Netty 中大量 I/O 操作都是異步執(zhí)行,本篇博文來(lái)聊聊 Netty 中的異步編程。
Java Future 提供的異步模型
JDK 5 引入了 Future 模式。Future 接口是 Java 多線程 Future 模式的實(shí)現(xiàn),在 java.util.concurrent包中,可以來(lái)進(jìn)行異步計(jì)算。
對(duì)于異步編程,我們想要的實(shí)現(xiàn)是:提交一個(gè)任務(wù),在任務(wù)執(zhí)行期間提交者可以做別的事情,這個(gè)任務(wù)是在異步執(zhí)行的,當(dāng)任務(wù)執(zhí)行完畢通知提交者任務(wù)完成獲取結(jié)果。
那么在 Future 中是怎么實(shí)現(xiàn)的呢?我們先看接口定義:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
我們看一個(gè)示例:
public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
Future<Integer> submit = executorService.submit(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
});
Integer value = null;
try {
value = submit.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(value);
System.out.println("end");
}
}
Futrue 的使用方式是:投遞一個(gè)任務(wù)到 Future 中執(zhí)行,操作完之后調(diào)用 Future#get() 或者 Future#isDone() 方法判斷是否執(zhí)行完畢。從這個(gè)邏輯上看, Future 提供的功能是:用戶線程需要主動(dòng)輪詢 Future 線程是否完成當(dāng)前任務(wù),如果不通過(guò)輪詢是否完成而是同步等待獲取則會(huì)阻塞直到執(zhí)行完畢為止。所以從這里看,F(xiàn)uture并不是真正的異步,因?yàn)樗倭艘粋€(gè)回調(diào),充其量只能算是一個(gè)同步非阻塞模式。
executorService.submit()方法獲取帶返回值的 Future 結(jié)果有兩種方式:
- 一種是通過(guò)實(shí)現(xiàn)
Callable接口; - 第二種是中間變量返回。繼承 Future 的子類: FutureTask,通過(guò) FutureTask 返回異步結(jié)果而不是在主線程中獲?。‵utureTask 本質(zhì)也是使用
Callable進(jìn)行創(chuàng)建)。
上面兩種方式的代碼就變?yōu)檫@樣:
public class FutureTest {
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(2);
System.out.println("start");
//方式1 通過(guò) executorService 提交一個(gè)異步線程
//Future<Integer> submit = executorService.submit(new NewCallableTask());
//方式2 通過(guò) FutureTask 包裝異步線程的返回,返回結(jié)果在 FutureTask 中獲取而不是 在提交線程中
FutureTask<Integer> task = new FutureTask<>(new NewCallableTask());
executorService.submit(task);
//-------------方式2--------------
Integer value = null;
try {
value = task.get();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(value);
System.out.println("end");
}
/**
* 通過(guò)實(shí)現(xiàn) Callable 接口
*/
static class NewCallableTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
}
}
一般在使用線程池創(chuàng)建線程執(zhí)行任務(wù)的時(shí)候會(huì)有兩種方式,要么實(shí)現(xiàn) Runnable 接口,要么實(shí)現(xiàn) Callable 接口,它們的區(qū)別在于:
- Callable 可以在任務(wù)結(jié)束的時(shí)候提供一個(gè)返回值,Runnable 無(wú)法提供這個(gè)功能;
- Callable 的 call 方法分可以拋出異常,而 Runnable 的 run 方法不能拋出異常。
而我們的異步返回自然是使用 Callable 方式。那么 Callable 是如何實(shí)現(xiàn)的呢?
從 Callable 被提交的地方入手:executorService.submit(task), ExecutorService 是一個(gè)接口,他的默認(rèn)實(shí)現(xiàn)類是:AbstractExecutorService,我們看這里的 submit()實(shí)現(xiàn)方式:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
可以看到將 Callable 又包裝成了 RunnableFuture。而這個(gè) RunnableFuture 就比較神奇,它同時(shí)繼承了 Runnable 和 Future ,既有線程的能力又有可攜帶返回值的功能。
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
所以再看 submit()方法,其實(shí)是將 RunnableFuture 線程送入線程池執(zhí)行,執(zhí)行是一個(gè)新線程,只是這個(gè)執(zhí)行的對(duì)象提供了 get()方法來(lái)獲取執(zhí)行結(jié)果。
那么 Callable 優(yōu)勢(shì)如何變?yōu)?RunnableFuture 的呢?我們看 newTaskFor(task)方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
將 Callable 包裝為 FutureTask 對(duì)象,看到這里又關(guān)聯(lián)到 FutureTask , :
public class FutureTask<V> implements RunnableFuture<V> {
}
可以看到 FutureTask 是 RunnableFuture 的子類,這也就解釋了上面的示例為什么在線程池中可以提交 FutureTask 實(shí)例。
更詳細(xì)的執(zhí)行過(guò)程這里就不再分析,重點(diǎn)剖析 Future 的實(shí)現(xiàn)過(guò)程,它并不是真正的異步,沒(méi)有實(shí)現(xiàn)回調(diào)。所以在Java8 中又新增了一個(gè)真正的異步函數(shù):CompletableFuture。
CompletableFuture 非阻塞異步編程模型
Java 8 中新增加了一個(gè)類:CompletableFuture,它提供了非常強(qiáng)大的 Future 的擴(kuò)展功能,最重要的是實(shí)現(xiàn)了回調(diào)的功能。
使用示例:
public class CallableFutureTest {
public static void main(String[] args) {
System.out.println("start");
/**
* 異步非阻塞
*/
CompletableFuture.runAsync(() -> {
try {
Thread.sleep(3000);
System.out.println("sleep done");
} catch (InterruptedException e) {
e.printStackTrace();
}
});
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("done");
}
}
CompletableFuture.runAsync()方法提供了異步執(zhí)行無(wú)返回值任務(wù)的功能。
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// do something
return "result";
}, executorService);
CompletableFuture.supplyAsync()方法提供了異步執(zhí)行有返回值任務(wù)的功能。
CompletableFuture源碼中有四個(gè)靜態(tài)方法用來(lái)執(zhí)行異步任務(wù):
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}
public static CompletableFuture<Void> runAsync(Runnable runnable){..}
public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}
前面兩個(gè)可以看到是帶返回值的方法,后面兩個(gè)是不帶返回值的方法。同時(shí)支持傳入自定義的線程池,如果不傳入線程池的話默認(rèn)是使用 ForkJoinPool.commonPool()作為它的線程池執(zhí)行異步代碼。
合并兩個(gè)異步任務(wù)
如果有兩個(gè)任務(wù)需要異步執(zhí)行,且后面需要對(duì)這兩個(gè)任務(wù)的結(jié)果進(jìn)行合并處理,CompletableFuture 也支持這種處理:
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
return task1 + task2; // return "Task1Task2" String
});
通過(guò) CompletableFuture.thenCombineAsync()方法獲取兩個(gè)任務(wù)的結(jié)果然后進(jìn)行相應(yīng)的操作。
下一個(gè)依賴上一個(gè)的結(jié)果
如果第二個(gè)任務(wù)依賴第一個(gè)任務(wù)的結(jié)果:
ExecutorService executorService = Executors.newFixedThreadPool(100);
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
return CompletableFuture.supplyAsync(() -> {
return task1 + "Task2"; // return "Task1Task2" String
});
}, executorService);
CompletableFuture.thenComposeAsync()支持將第一個(gè)任務(wù)的結(jié)果傳入第二個(gè)任務(wù)中。
常用 API 介紹
- 拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作,上一個(gè)任務(wù)完成后的動(dòng)作
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
上面四個(gè)方法表示在當(dāng)前階段任務(wù)完成之后下一步要做什么。whenComplete 表示在當(dāng)前線程內(nèi)繼續(xù)做下一步,帶 Async 后綴的表示使用新線程去執(zhí)行。
-
拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作,使用 handler 來(lái)處理邏輯,可以返回與第一階段處理的返回類型不一樣的返回類型。
public <U> CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn) public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)Handler 與 whenComplete 的區(qū)別是 handler 是可以返回一個(gè)新的 CompletableFuture 類型的。
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> { return "hahaha"; }).handle((r, e) -> { return 1; }); -
拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作, thenApply方法
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn) public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)注意到 thenApply 方法的參數(shù)中是沒(méi)有 Throwable,這就意味著如有有異常就會(huì)立即失敗,不能在處理邏輯內(nèi)處理。且 thenApply 返回的也是新的 CompletableFuture。 這就是它與前面兩個(gè)的區(qū)別。
-
拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作,可以不返回任何值,thenAccept方法
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)看這里的示例:
CompletableFuture.supplyAsync(() -> { return "result"; }).thenAccept(r -> { System.out.println(r); }).thenAccept(r -> { System.out.println(r); });執(zhí)行完畢是不會(huì)返回任何值的。
CompletableFuture 的特性提現(xiàn)在執(zhí)行完 runAsync 或者 supplyAsync 之后的操作上。CompletableFuture 能夠?qū)⒒卣{(diào)放到與任務(wù)不同的線程中執(zhí)行,也能將回調(diào)作為繼續(xù)執(zhí)行的同步函數(shù),在與任務(wù)相同的線程中執(zhí)行。它避免了傳統(tǒng)回調(diào)最大的問(wèn)題,那就是能夠?qū)⒖刂屏鞣蛛x到不同的事件處理器中。
另外當(dāng)你依賴 CompletableFuture 的計(jì)算結(jié)果才能進(jìn)行下一步的時(shí)候,無(wú)需手動(dòng)判斷當(dāng)前計(jì)算是否完成,可以通過(guò) CompletableFuture 的事件監(jiān)聽(tīng)自動(dòng)去完成。
Netty 中的異步編程
說(shuō) Netty 中的異步編程之前先說(shuō)一個(gè)異步編程模型:Future/Promise異步模型。
future和promise起源于函數(shù)式編程和相關(guān)范例(如邏輯編程 ),目的是將值(future)與其計(jì)算方式(promise)分離,從而允許更靈活地進(jìn)行計(jì)算,特別是通過(guò)并行化。
Future 表示目標(biāo)計(jì)算的返回值,Promise 表示計(jì)算的方式,這個(gè)模型將返回結(jié)果和計(jì)算邏輯分離,目的是為了讓計(jì)算邏輯不影響返回結(jié)果,從而抽象出一套異步編程模型。那計(jì)算邏輯如何與結(jié)果關(guān)聯(lián)呢?它們之間的紐帶就是 callback。
在 Netty 中的異步編程就是基于該模型來(lái)實(shí)現(xiàn)。Netty 中非常多的異步調(diào)用,最簡(jiǎn)單的例子就是我們 Server 和 Client 端啟動(dòng)的例子:
Server:

Client:

Netty 中使用了一個(gè) ChannelFuture 來(lái)實(shí)現(xiàn)異步操作,看似與 Java 中的 Future 相似,我們看一下代碼:
public interface ChannelFuture extends Future<Void> {
}
這里 ChannelFuture 繼承了一個(gè) Future,這是 Java 中的 Future 嗎?跟下去發(fā)現(xiàn)并不是 JDK 的,而是 Netty 自己實(shí)現(xiàn)的。該類位于:io.netty.util.concurrent包中:
public interface Future<V> extends java.util.concurrent.Future<V> {
// 只有IO操作完成時(shí)才返回true
boolean isSuccess();
// 只有當(dāng)cancel(boolean)成功取消時(shí)才返回true
boolean isCancellable();
// IO操作發(fā)生異常時(shí),返回導(dǎo)致IO操作以此的原因,如果沒(méi)有異常,返回null
Throwable cause();
// 向Future添加事件,future完成時(shí),會(huì)執(zhí)行這些事件,如果add時(shí)future已經(jīng)完成,會(huì)立即執(zhí)行監(jiān)聽(tīng)事件
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 移除監(jiān)聽(tīng)事件,future完成時(shí),不會(huì)觸發(fā)
Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
// 等待future done
Future<V> sync() throws InterruptedException;
// 等待future done,不可打斷
Future<V> syncUninterruptibly();
// 等待future完成
Future<V> await() throws InterruptedException;
// 等待future 完成,不可打斷
Future<V> awaitUninterruptibly();
boolean await(long timeout, TimeUnit unit) throws InterruptedException;
boolean await(long timeoutMillis) throws InterruptedException;
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
boolean awaitUninterruptibly(long timeoutMillis);
// 立刻獲得結(jié)果,如果沒(méi)有完成,返回null
V getNow();
// 如果成功取消,future會(huì)失敗,導(dǎo)致CancellationException
@Override
boolean cancel(boolean mayInterruptIfRunning);
}
Netty 自己實(shí)現(xiàn)的 Future 繼承了 JDK 的 Future,新增了 sync() 和await() 用于阻塞等待,還加了 Listeners,只要任務(wù)結(jié)束去回調(diào) Listener 就可以了,那么我們就不一定要主動(dòng)調(diào)用 isDone()來(lái)獲取狀態(tài),或通過(guò) get()阻塞方法來(lái)獲取值。
Netty的 Future 與 Java 的 Future 雖然類名相同,但功能上略有不同,Netty 中引入了 Promise 機(jī)制。在 Java 的 Future 中,業(yè)務(wù)邏輯為一個(gè) Callable 或 Runnable 實(shí)現(xiàn)類,該類的 call()或 run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié),在 Promise 機(jī)制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。
public interface Promise<V> extends Future<V> {
// 設(shè)置future執(zhí)行結(jié)果為成功
Promise<V> setSuccess(V result);
// 嘗試設(shè)置future執(zhí)行結(jié)果為成功,返回是否設(shè)置成功
boolean trySuccess(V result);
// 設(shè)置失敗
Promise<V> setFailure(Throwable cause);
// 嘗試設(shè)置future執(zhí)行結(jié)果為失敗,返回是否設(shè)置成功
boolean tryFailure(Throwable cause);
// 設(shè)置為不能取消
boolean setUncancellable();
// 源碼中,以下為覆蓋了Future的方法,例如;
Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
@Override
Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}
Promise 接口繼承自 Future 接口,重點(diǎn)添加了上述幾個(gè)方法,可以人工設(shè)置 future 的執(zhí)行成功與失敗,并通知所有監(jiān)聽(tīng)的 listener。
從 Future 和 Promise 提供的方法來(lái)看,F(xiàn)uture 都是 get 類型的方法,主要用來(lái)判斷當(dāng)前任務(wù)的狀態(tài)。而 Promise 中是 set 類型的方法,主要來(lái)對(duì)任務(wù)的狀態(tài)來(lái)進(jìn)行操作。這里就體現(xiàn)出來(lái)將 結(jié)果和操作過(guò)程分離的設(shè)計(jì)。
Promise 實(shí)現(xiàn)類是DefaultPromise類,該類十分重要,F(xiàn)uture 的 listener 機(jī)制也是由它實(shí)現(xiàn)的,所以我們先來(lái)分析一下該類。先來(lái)看一下它的重要屬性:
// 可以嵌套的Listener的最大層數(shù),可見(jiàn)最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
// 異步操作不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
// 異步操作失敗時(shí)保存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
new CancellationException(), DefaultPromise.class, "cancel(...)"));
第一個(gè)套 listener,是指在 listener 的 operationComplete() 方法中,可以再次使用 future.addListener() 繼續(xù)添加 listener,Netty 限制的最大層數(shù)是8,用戶可使用系統(tǒng)變量io.netty.defaultPromise.maxListenerStackDepth設(shè)置。
為了更好的說(shuō)明,先寫了一個(gè)示例,Netty 中的 Future/Promise模型是可以單獨(dú)拿出來(lái)使用的。
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;
import java.util.concurrent.TimeUnit;
/**
* @author rickiyang
* @date 2020-04-19
* @Desc TODO
*/
public class PromiseTest {
public static void main(String[] args) {
PromiseTest testPromise = new PromiseTest();
Promise<String> promise = testPromise.doSomething("哈哈");
promise.addListener(future -> System.out.println(promise.get()+", something is done"));
}
/**
* 創(chuàng)建一個(gè)DefaultPromise并返回,將業(yè)務(wù)邏輯放入線程池中執(zhí)行
* @param value
* @return
*/
private Promise<String> doSomething(String value) {
NioEventLoopGroup loop = new NioEventLoopGroup();
DefaultPromise<String> promise = new DefaultPromise<>(loop.next());
loop.schedule(() -> {
try {
Thread.sleep(1000);
promise.setSuccess("執(zhí)行成功。" + value);
return promise;
} catch (InterruptedException ignored) {
promise.setFailure(ignored);
}
return promise;
}, 0, TimeUnit.SECONDS);
return promise;
}
}
通過(guò)這個(gè)例子可以看到,Promise 能夠在業(yè)務(wù)邏輯線程中通知 Future 成功或失敗,由于 Promise 繼承了 Netty 的 Future,因此可以加入監(jiān)聽(tīng)事件。而 Future 和 Promise 的好處在于,獲取到 Promise 對(duì)象后可以為其設(shè)置異步調(diào)用完成后的操作,然后立即繼續(xù)去做其他任務(wù)。
來(lái)看一下 addListener() 方法:
@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
checkNotNull(listener, "listener");
//并發(fā)控制,保證多線程情況下只有一個(gè)線程執(zhí)行添加操作
synchronized (this) {
addListener0(listener);
}
// 操作完成,通知監(jiān)聽(tīng)者
if (isDone()) {
notifyListeners();
}
return this;
}
private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
if (listeners == null) {
listeners = listener;
} else if (listeners instanceof DefaultFutureListeners) {
// 如果當(dāng)前Promise實(shí)例持有l(wèi)isteners的是DefaultFutureListeners類型,則調(diào)用它的add()方法進(jìn)行添加
((DefaultFutureListeners) listeners).add(listener);
} else {
// 步入這里說(shuō)明當(dāng)前Promise實(shí)例持有l(wèi)isteners為單個(gè)GenericFutureListener實(shí)例,需要轉(zhuǎn)換為DefaultFutureListeners實(shí)例
listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
}
}
這里看到有一個(gè)全局變量listeners,我們看到他的定義:
private Object listeners;
為啥會(huì)是一個(gè) Object 類型的對(duì)象呢,不是應(yīng)該是 List 或者是數(shù)組才對(duì)嘛。Netty之所以這樣設(shè)計(jì),是因?yàn)榇蠖鄶?shù)情況下 listener 只有一個(gè),用集合和數(shù)組都會(huì)造成浪費(fèi)。當(dāng)只有一個(gè) listener 時(shí),該字段為一個(gè) GenericFutureListener 對(duì)象;當(dāng)多于一個(gè) listener 時(shí),該字段為 DefaultFutureListeners ,可以儲(chǔ)存多個(gè) listener。
我們?cè)賮?lái)看 notifyListeners() 方法:
private void notifyListeners() {
EventExecutor executor = executor();
//當(dāng)前EventLoop線程需要檢查listener嵌套
if (executor.inEventLoop()) {
final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
//這里是當(dāng)前l(fā)istener的嵌套層數(shù)
final int stackDepth = threadLocals.futureListenerStackDepth();
if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
threadLocals.setFutureListenerStackDepth(stackDepth + 1);
try {
notifyListenersNow();
} finally {
threadLocals.setFutureListenerStackDepth(stackDepth);
}
return;
}
}
//外部線程直接提交給新線程執(zhí)行
safeExecute(executor, new Runnable() {
@Override
public void run() {
notifyListenersNow();
}
});
}
這里有個(gè)疑問(wèn)就是為什么要設(shè)置當(dāng)前的調(diào)用棧深度+1。
接著看真正執(zhí)行通知的方法:
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// 正在通知或已沒(méi)有監(jiān)聽(tīng)者(外部線程刪除)直接返回
if (notifyingListeners || this.listeners == null) {
return;
}
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
//只有一個(gè)listener
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
//有多個(gè)listener
notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// 執(zhí)行完畢且外部線程沒(méi)有再添加監(jiān)聽(tīng)者
notifyingListeners = false;
return;
}
//外部線程添加了新的監(jiān)聽(tīng)者繼續(xù)執(zhí)行
listeners = this.listeners;
this.listeners = null;
}
}
}
Netty 中 DefalutPromise 是一個(gè)非常常用的類,這是 Promise 實(shí)現(xiàn)的基礎(chǔ)。DefaultChannelPromise DefalutPromise 的子類,加入了 channel 這個(gè)屬性。
Promise 目前支持兩種類型的監(jiān)聽(tīng)器:
- GenericFutureListener:支持泛型的 Future ;
- GenericProgressiveFutureListener:它是
GenericFutureListener的子類,支持進(jìn)度表示和支持泛型的Future 監(jiān)聽(tīng)器(有些場(chǎng)景需要多個(gè)步驟實(shí)現(xiàn),類似于進(jìn)度條那樣)。
為了讓 Promise 支持多個(gè)監(jiān)聽(tīng)器,Netty 添加了一個(gè)默認(rèn)修飾符修飾的DefaultFutureListeners類用于保存監(jiān)聽(tīng)器實(shí)例數(shù)組:
final class DefaultFutureListeners {
private GenericFutureListener<? extends Future<?>>[] listeners;
private int size;
private int progressiveSize; // the number of progressive listeners
// 這個(gè)構(gòu)造相對(duì)特別,是為了讓Promise中的listeners(Object類型)實(shí)例由單個(gè)GenericFutureListener實(shí)例轉(zhuǎn)換為DefaultFutureListeners類型
@SuppressWarnings("unchecked")
DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
listeners = new GenericFutureListener[2];
listeners[0] = first;
listeners[1] = second;
size = 2;
if (first instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
if (second instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void add(GenericFutureListener<? extends Future<?>> l) {
GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
final int size = this.size;
// 注意這里,每次擴(kuò)容數(shù)組長(zhǎng)度是原來(lái)的2倍
if (size == listeners.length) {
this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
}
// 把當(dāng)前的GenericFutureListener加入數(shù)組中
listeners[size] = l;
// 監(jiān)聽(tīng)器總數(shù)量加1
this.size = size + 1;
// 如果為GenericProgressiveFutureListener,則帶進(jìn)度指示的監(jiān)聽(tīng)器總數(shù)量加1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize ++;
}
}
public void remove(GenericFutureListener<? extends Future<?>> l) {
final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
int size = this.size;
for (int i = 0; i < size; i ++) {
if (listeners[i] == l) {
// 計(jì)算需要需要移動(dòng)的監(jiān)聽(tīng)器的下標(biāo)
int listenersToMove = size - i - 1;
if (listenersToMove > 0) {
// listenersToMove后面的元素全部移動(dòng)到數(shù)組的前端
System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
}
// 當(dāng)前監(jiān)聽(tīng)器總量的最后一個(gè)位置設(shè)置為null,數(shù)量減1
listeners[-- size] = null;
this.size = size;
// 如果監(jiān)聽(tīng)器是GenericProgressiveFutureListener,則帶進(jìn)度指示的監(jiān)聽(tīng)器總數(shù)量減1
if (l instanceof GenericProgressiveFutureListener) {
progressiveSize --;
}
return;
}
}
}
// 返回監(jiān)聽(tīng)器實(shí)例數(shù)組
public GenericFutureListener<? extends Future<?>>[] listeners() {
return listeners;
}
// 返回監(jiān)聽(tīng)器總數(shù)量
public int size() {
return size;
}
// 返回帶進(jìn)度指示的監(jiān)聽(tīng)器總數(shù)量
public int progressiveSize() {
return progressiveSize;
}
}
以上就是關(guān)于 Promise 和監(jiān)聽(tīng)器相關(guān)的實(shí)現(xiàn)分析,再回到之前的啟動(dòng)類,是不是還有一個(gè) sync() 方法:
@Override
public Promise<V> sync() throws InterruptedException {
await();
rethrowIfFailed();
return this;
}
public Promise<V> await() throws InterruptedException {
// 異步操作已經(jīng)完成,直接返回
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
// 死鎖檢測(cè)
checkDeadLock();
// 同步使修改waiters的線程只有一個(gè)
synchronized (this) {
while (!isDone()) { // 等待直到異步操作完成
incWaiters(); // ++waiters;
try {
wait(); // JDK方法
} finally {
decWaiters(); // --waiters
}
}
}
return this;
}
這里其實(shí)就是一個(gè)同步檢測(cè)當(dāng)前事件是否完成的過(guò)程。
以上就是 Netty 中實(shí)現(xiàn)的 Future/Promise 異步回調(diào)機(jī)制。實(shí)現(xiàn)并不是很難懂,代碼很值得學(xué)習(xí)。除了 Netty 中實(shí)現(xiàn)了 Future/Promise模型,在Guava中也有相關(guān)的實(shí)現(xiàn),大家日常使用可以看習(xí)慣引用相關(guān)的包。
Guava實(shí)現(xiàn):
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>21.0</version>
</dependency>
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(5);
return 100;
}
});
Futures.addCallback(future, new FutureCallback<Integer>() {
public void onSuccess(Integer result) {
System.out.println("success:" + result);
}
public void onFailure(Throwable throwable) {
System.out.println("fail, e = " + throwable);
}
});
Thread.currentThread().join();