Netty 中的異步編程Future 和 promise

Netty 中大量 I/O 操作都是異步執(zhí)行,本篇博文來(lái)聊聊 Netty 中的異步編程。

Java Future 提供的異步模型

JDK 5 引入了 Future 模式。Future 接口是 Java 多線程 Future 模式的實(shí)現(xiàn),在 java.util.concurrent包中,可以來(lái)進(jìn)行異步計(jì)算。

對(duì)于異步編程,我們想要的實(shí)現(xiàn)是:提交一個(gè)任務(wù),在任務(wù)執(zhí)行期間提交者可以做別的事情,這個(gè)任務(wù)是在異步執(zhí)行的,當(dāng)任務(wù)執(zhí)行完畢通知提交者任務(wù)完成獲取結(jié)果。

那么在 Future 中是怎么實(shí)現(xiàn)的呢?我們先看接口定義:

public interface Future<V> {

    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

我們看一個(gè)示例:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        Future<Integer> submit = executorService.submit(() -> {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        });
        Integer value = null;
        try {
            value = submit.get();
        } catch (Exception e) {
            e.printStackTrace();
        } 
        System.out.println(value);
        System.out.println("end");
        
    }
}

Futrue 的使用方式是:投遞一個(gè)任務(wù)到 Future 中執(zhí)行,操作完之后調(diào)用 Future#get() 或者 Future#isDone() 方法判斷是否執(zhí)行完畢。從這個(gè)邏輯上看, Future 提供的功能是:用戶線程需要主動(dòng)輪詢 Future 線程是否完成當(dāng)前任務(wù),如果不通過(guò)輪詢是否完成而是同步等待獲取則會(huì)阻塞直到執(zhí)行完畢為止。所以從這里看,F(xiàn)uture并不是真正的異步,因?yàn)樗倭艘粋€(gè)回調(diào),充其量只能算是一個(gè)同步非阻塞模式。

executorService.submit()方法獲取帶返回值的 Future 結(jié)果有兩種方式:

  1. 一種是通過(guò)實(shí)現(xiàn) Callable接口;
  2. 第二種是中間變量返回。繼承 Future 的子類: FutureTask,通過(guò) FutureTask 返回異步結(jié)果而不是在主線程中獲?。‵utureTask 本質(zhì)也是使用 Callable 進(jìn)行創(chuàng)建)。

上面兩種方式的代碼就變?yōu)檫@樣:

public class FutureTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        System.out.println("start");
        //方式1 通過(guò) executorService 提交一個(gè)異步線程
        //Future<Integer> submit = executorService.submit(new NewCallableTask());

        //方式2 通過(guò) FutureTask 包裝異步線程的返回,返回結(jié)果在 FutureTask 中獲取而不是 在提交線程中
        FutureTask<Integer> task = new FutureTask<>(new NewCallableTask());
        executorService.submit(task);
        //-------------方式2--------------

        Integer value = null;
        try {
            value = task.get();
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println(value);
        System.out.println("end");

    }

    /**
     * 通過(guò)實(shí)現(xiàn) Callable 接口
     */
     static class NewCallableTask implements Callable<Integer> {

        @Override
        public Integer call() throws Exception {
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return 1;
        }
    }

}

一般在使用線程池創(chuàng)建線程執(zhí)行任務(wù)的時(shí)候會(huì)有兩種方式,要么實(shí)現(xiàn) Runnable 接口,要么實(shí)現(xiàn) Callable 接口,它們的區(qū)別在于:

  1. Callable 可以在任務(wù)結(jié)束的時(shí)候提供一個(gè)返回值,Runnable 無(wú)法提供這個(gè)功能;
  2. Callable 的 call 方法分可以拋出異常,而 Runnable 的 run 方法不能拋出異常。

而我們的異步返回自然是使用 Callable 方式。那么 Callable 是如何實(shí)現(xiàn)的呢?

從 Callable 被提交的地方入手:executorService.submit(task), ExecutorService 是一個(gè)接口,他的默認(rèn)實(shí)現(xiàn)類是:AbstractExecutorService,我們看這里的 submit()實(shí)現(xiàn)方式:

public <T> Future<T> submit(Callable<T> task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<T> ftask = newTaskFor(task);
  execute(ftask);
  return ftask;
}

可以看到將 Callable 又包裝成了 RunnableFuture。而這個(gè) RunnableFuture 就比較神奇,它同時(shí)繼承了 Runnable 和 Future ,既有線程的能力又有可攜帶返回值的功能。

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

所以再看 submit()方法,其實(shí)是將 RunnableFuture 線程送入線程池執(zhí)行,執(zhí)行是一個(gè)新線程,只是這個(gè)執(zhí)行的對(duì)象提供了 get()方法來(lái)獲取執(zhí)行結(jié)果。

那么 Callable 優(yōu)勢(shì)如何變?yōu)?RunnableFuture 的呢?我們看 newTaskFor(task)方法:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
  return new FutureTask<T>(callable);
}

將 Callable 包裝為 FutureTask 對(duì)象,看到這里又關(guān)聯(lián)到 FutureTask , :

public class FutureTask<V> implements RunnableFuture<V> {
  
}

可以看到 FutureTask 是 RunnableFuture 的子類,這也就解釋了上面的示例為什么在線程池中可以提交 FutureTask 實(shí)例。

更詳細(xì)的執(zhí)行過(guò)程這里就不再分析,重點(diǎn)剖析 Future 的實(shí)現(xiàn)過(guò)程,它并不是真正的異步,沒(méi)有實(shí)現(xiàn)回調(diào)。所以在Java8 中又新增了一個(gè)真正的異步函數(shù):CompletableFuture。

CompletableFuture 非阻塞異步編程模型

Java 8 中新增加了一個(gè)類:CompletableFuture,它提供了非常強(qiáng)大的 Future 的擴(kuò)展功能,最重要的是實(shí)現(xiàn)了回調(diào)的功能。

使用示例:

public class CallableFutureTest {

        
    public static void main(String[] args) {
        System.out.println("start");
        /**
         * 異步非阻塞
         */
        CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(3000);
                System.out.println("sleep done");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        
        try {
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        System.out.println("done");
    }
}

CompletableFuture.runAsync()方法提供了異步執(zhí)行無(wú)返回值任務(wù)的功能。

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    // do something
    return "result";
}, executorService);

CompletableFuture.supplyAsync()方法提供了異步執(zhí)行有返回值任務(wù)的功能。

CompletableFuture源碼中有四個(gè)靜態(tài)方法用來(lái)執(zhí)行異步任務(wù):

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier){..}

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor){..}

public static CompletableFuture<Void> runAsync(Runnable runnable){..}

public static CompletableFuture<Void> runAsync(Runnable runnable,
Executor executor){..}

前面兩個(gè)可以看到是帶返回值的方法,后面兩個(gè)是不帶返回值的方法。同時(shí)支持傳入自定義的線程池,如果不傳入線程池的話默認(rèn)是使用 ForkJoinPool.commonPool()作為它的線程池執(zhí)行異步代碼。

合并兩個(gè)異步任務(wù)

如果有兩個(gè)任務(wù)需要異步執(zhí)行,且后面需要對(duì)這兩個(gè)任務(wù)的結(jié)果進(jìn)行合并處理,CompletableFuture 也支持這種處理:

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Task1";
}, executorService);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
    return "Task2";
}, executorService);
CompletableFuture<String> future = future1.thenCombineAsync(future2, (task1, task2) -> {
    return task1 + task2; // return "Task1Task2" String
});

通過(guò) CompletableFuture.thenCombineAsync()方法獲取兩個(gè)任務(wù)的結(jié)果然后進(jìn)行相應(yīng)的操作。

下一個(gè)依賴上一個(gè)的結(jié)果

如果第二個(gè)任務(wù)依賴第一個(gè)任務(wù)的結(jié)果:

ExecutorService executorService = Executors.newFixedThreadPool(100);

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
    return "Task1";
}, executorService);
CompletableFuture<String> future = future1.thenComposeAsync(task1 -> {
    return CompletableFuture.supplyAsync(() -> {
        return task1 + "Task2"; // return "Task1Task2" String
    });
}, executorService);

CompletableFuture.thenComposeAsync()支持將第一個(gè)任務(wù)的結(jié)果傳入第二個(gè)任務(wù)中。

常用 API 介紹

  1. 拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作,上一個(gè)任務(wù)完成后的動(dòng)作
public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)

上面四個(gè)方法表示在當(dāng)前階段任務(wù)完成之后下一步要做什么。whenComplete 表示在當(dāng)前線程內(nèi)繼續(xù)做下一步,帶 Async 后綴的表示使用新線程去執(zhí)行。

  1. 拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作,使用 handler 來(lái)處理邏輯,可以返回與第一階段處理的返回類型不一樣的返回類型。

    public <U> CompletableFuture<U>  handle(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
    public <U> CompletableFuture<U>  handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)
    

    Handler 與 whenComplete 的區(qū)別是 handler 是可以返回一個(gè)新的 CompletableFuture 類型的。

    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        return "hahaha";
    }).handle((r, e) -> {
        return 1;
    });
    
  2. 拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作, thenApply方法

    public <U> CompletableFuture<U>  thenApply(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn)
    public <U> CompletableFuture<U>  thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
    

    注意到 thenApply 方法的參數(shù)中是沒(méi)有 Throwable,這就意味著如有有異常就會(huì)立即失敗,不能在處理邏輯內(nèi)處理。且 thenApply 返回的也是新的 CompletableFuture。 這就是它與前面兩個(gè)的區(qū)別。

  3. 拿到上一個(gè)任務(wù)的結(jié)果做后續(xù)操作,可以不返回任何值,thenAccept方法

    public CompletableFuture<Void>  thenAccept(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action)
    public CompletableFuture<Void>  thenAcceptAsync(Consumer<? super T> action, Executor executor)
    

    看這里的示例:

    CompletableFuture.supplyAsync(() -> {
      return "result";
    }).thenAccept(r -> {
      System.out.println(r);
    }).thenAccept(r -> {
      System.out.println(r);
    });
    

    執(zhí)行完畢是不會(huì)返回任何值的。

CompletableFuture 的特性提現(xiàn)在執(zhí)行完 runAsync 或者 supplyAsync 之后的操作上。CompletableFuture 能夠?qū)⒒卣{(diào)放到與任務(wù)不同的線程中執(zhí)行,也能將回調(diào)作為繼續(xù)執(zhí)行的同步函數(shù),在與任務(wù)相同的線程中執(zhí)行。它避免了傳統(tǒng)回調(diào)最大的問(wèn)題,那就是能夠?qū)⒖刂屏鞣蛛x到不同的事件處理器中。

另外當(dāng)你依賴 CompletableFuture 的計(jì)算結(jié)果才能進(jìn)行下一步的時(shí)候,無(wú)需手動(dòng)判斷當(dāng)前計(jì)算是否完成,可以通過(guò) CompletableFuture 的事件監(jiān)聽(tīng)自動(dòng)去完成。

Netty 中的異步編程

說(shuō) Netty 中的異步編程之前先說(shuō)一個(gè)異步編程模型:Future/Promise異步模型。

future和promise起源于函數(shù)式編程和相關(guān)范例(如邏輯編程 ),目的是將值(future)與其計(jì)算方式(promise)分離,從而允許更靈活地進(jìn)行計(jì)算,特別是通過(guò)并行化。

Future 表示目標(biāo)計(jì)算的返回值,Promise 表示計(jì)算的方式,這個(gè)模型將返回結(jié)果和計(jì)算邏輯分離,目的是為了讓計(jì)算邏輯不影響返回結(jié)果,從而抽象出一套異步編程模型。那計(jì)算邏輯如何與結(jié)果關(guān)聯(lián)呢?它們之間的紐帶就是 callback。

引用自:https://zh.wikipedia.org/wiki/Future%E4%B8%8Epromise

在 Netty 中的異步編程就是基于該模型來(lái)實(shí)現(xiàn)。Netty 中非常多的異步調(diào)用,最簡(jiǎn)單的例子就是我們 Server 和 Client 端啟動(dòng)的例子:

Server:

1.png

Client:

2.png

Netty 中使用了一個(gè) ChannelFuture 來(lái)實(shí)現(xiàn)異步操作,看似與 Java 中的 Future 相似,我們看一下代碼:

public interface ChannelFuture extends Future<Void> {
  
}

這里 ChannelFuture 繼承了一個(gè) Future,這是 Java 中的 Future 嗎?跟下去發(fā)現(xiàn)并不是 JDK 的,而是 Netty 自己實(shí)現(xiàn)的。該類位于:io.netty.util.concurrent包中:

public interface Future<V> extends java.util.concurrent.Future<V> {
  
  // 只有IO操作完成時(shí)才返回true
  boolean isSuccess();
  // 只有當(dāng)cancel(boolean)成功取消時(shí)才返回true
  boolean isCancellable();
  // IO操作發(fā)生異常時(shí),返回導(dǎo)致IO操作以此的原因,如果沒(méi)有異常,返回null
  Throwable cause();
  // 向Future添加事件,future完成時(shí),會(huì)執(zhí)行這些事件,如果add時(shí)future已經(jīng)完成,會(huì)立即執(zhí)行監(jiān)聽(tīng)事件
  Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
  Future<V> addListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  // 移除監(jiān)聽(tīng)事件,future完成時(shí),不會(huì)觸發(fā)
  Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> listener);
  Future<V> removeListeners(GenericFutureListener<? extends Future<? super V>>... listeners);
  // 等待future done
  Future<V> sync() throws InterruptedException;
  // 等待future done,不可打斷
  Future<V> syncUninterruptibly();
  // 等待future完成
  Future<V> await() throws InterruptedException;
  // 等待future 完成,不可打斷
  Future<V> awaitUninterruptibly();
  boolean await(long timeout, TimeUnit unit) throws InterruptedException;
  boolean await(long timeoutMillis) throws InterruptedException;
  boolean awaitUninterruptibly(long timeout, TimeUnit unit);
  boolean awaitUninterruptibly(long timeoutMillis);
  // 立刻獲得結(jié)果,如果沒(méi)有完成,返回null
  V getNow();
  // 如果成功取消,future會(huì)失敗,導(dǎo)致CancellationException
  @Override
  boolean cancel(boolean mayInterruptIfRunning);
  
}

Netty 自己實(shí)現(xiàn)的 Future 繼承了 JDK 的 Future,新增了 sync()await() 用于阻塞等待,還加了 Listeners,只要任務(wù)結(jié)束去回調(diào) Listener 就可以了,那么我們就不一定要主動(dòng)調(diào)用 isDone()來(lái)獲取狀態(tài),或通過(guò) get()阻塞方法來(lái)獲取值。

Netty的 Future 與 Java 的 Future 雖然類名相同,但功能上略有不同,Netty 中引入了 Promise 機(jī)制。在 Java 的 Future 中,業(yè)務(wù)邏輯為一個(gè) Callable 或 Runnable 實(shí)現(xiàn)類,該類的 call()run()執(zhí)行完畢意味著業(yè)務(wù)邏輯的完結(jié),在 Promise 機(jī)制中,可以在業(yè)務(wù)邏輯中人工設(shè)置業(yè)務(wù)邏輯的成功與失敗,這樣更加方便的監(jiān)控自己的業(yè)務(wù)邏輯。

public interface Promise<V> extends Future<V> {
    // 設(shè)置future執(zhí)行結(jié)果為成功
    Promise<V> setSuccess(V result);
    
    // 嘗試設(shè)置future執(zhí)行結(jié)果為成功,返回是否設(shè)置成功
    boolean trySuccess(V result);
    // 設(shè)置失敗
    Promise<V> setFailure(Throwable cause);
    // 嘗試設(shè)置future執(zhí)行結(jié)果為失敗,返回是否設(shè)置成功 
    boolean tryFailure(Throwable cause);
    // 設(shè)置為不能取消
    boolean setUncancellable();
    
    // 源碼中,以下為覆蓋了Future的方法,例如;
    
    Future<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
    
    @Override
    Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener);
}

Promise 接口繼承自 Future 接口,重點(diǎn)添加了上述幾個(gè)方法,可以人工設(shè)置 future 的執(zhí)行成功與失敗,并通知所有監(jiān)聽(tīng)的 listener。

從 Future 和 Promise 提供的方法來(lái)看,F(xiàn)uture 都是 get 類型的方法,主要用來(lái)判斷當(dāng)前任務(wù)的狀態(tài)。而 Promise 中是 set 類型的方法,主要來(lái)對(duì)任務(wù)的狀態(tài)來(lái)進(jìn)行操作。這里就體現(xiàn)出來(lái)將 結(jié)果和操作過(guò)程分離的設(shè)計(jì)。

Promise 實(shí)現(xiàn)類是DefaultPromise類,該類十分重要,F(xiàn)uture 的 listener 機(jī)制也是由它實(shí)現(xiàn)的,所以我們先來(lái)分析一下該類。先來(lái)看一下它的重要屬性:

// 可以嵌套的Listener的最大層數(shù),可見(jiàn)最大值為8
private static final int MAX_LISTENER_STACK_DEPTH = Math.min(8,
                                                           SystemPropertyUtil.getInt("io.netty.defaultPromise.maxListenerStackDepth", 8));
// result字段由使用RESULT_UPDATER更新
@SuppressWarnings("rawtypes")
private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER;
private static final Signal SUCCESS = Signal.valueOf(DefaultPromise.class, "SUCCESS");
// 異步操作不可取消
private static final Signal UNCANCELLABLE = Signal.valueOf(DefaultPromise.class, "UNCANCELLABLE");
// 異步操作失敗時(shí)保存異常原因
private static final CauseHolder CANCELLATION_CAUSE_HOLDER = new CauseHolder(ThrowableUtil.unknownStackTrace(
  new CancellationException(), DefaultPromise.class, "cancel(...)"));

第一個(gè)套 listener,是指在 listener 的 operationComplete() 方法中,可以再次使用 future.addListener() 繼續(xù)添加 listener,Netty 限制的最大層數(shù)是8,用戶可使用系統(tǒng)變量io.netty.defaultPromise.maxListenerStackDepth設(shè)置。

為了更好的說(shuō)明,先寫了一個(gè)示例,Netty 中的 Future/Promise模型是可以單獨(dú)拿出來(lái)使用的。


import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.util.concurrent.DefaultPromise;
import io.netty.util.concurrent.Promise;

import java.util.concurrent.TimeUnit;

/**
 * @author rickiyang
 * @date 2020-04-19
 * @Desc TODO
 */
public class PromiseTest {

    public static void main(String[] args) {
        PromiseTest testPromise = new PromiseTest();
        Promise<String> promise = testPromise.doSomething("哈哈");
        promise.addListener(future -> System.out.println(promise.get()+", something is done"));

    }

    /**
     * 創(chuàng)建一個(gè)DefaultPromise并返回,將業(yè)務(wù)邏輯放入線程池中執(zhí)行
     * @param value
     * @return
     */
    private Promise<String> doSomething(String value) {
        NioEventLoopGroup loop = new NioEventLoopGroup();
        DefaultPromise<String> promise = new DefaultPromise<>(loop.next());
        loop.schedule(() -> {
            try {
                Thread.sleep(1000);
                promise.setSuccess("執(zhí)行成功。" + value);
                return promise;
            } catch (InterruptedException ignored) {
                promise.setFailure(ignored);
            }
            return promise;
        }, 0, TimeUnit.SECONDS);
        return promise;
    }

}

通過(guò)這個(gè)例子可以看到,Promise 能夠在業(yè)務(wù)邏輯線程中通知 Future 成功或失敗,由于 Promise 繼承了 Netty 的 Future,因此可以加入監(jiān)聽(tīng)事件。而 Future 和 Promise 的好處在于,獲取到 Promise 對(duì)象后可以為其設(shè)置異步調(diào)用完成后的操作,然后立即繼續(xù)去做其他任務(wù)。

來(lái)看一下 addListener() 方法:

@Override
public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
  checkNotNull(listener, "listener");
    //并發(fā)控制,保證多線程情況下只有一個(gè)線程執(zhí)行添加操作
  synchronized (this) {
    addListener0(listener);
  }
    // 操作完成,通知監(jiān)聽(tīng)者
  if (isDone()) {
    notifyListeners();
  }

  return this;
}



private void addListener0(GenericFutureListener<? extends Future<? super V>> listener) {
  if (listeners == null) {
    listeners = listener;
  } else if (listeners instanceof DefaultFutureListeners) {
    // 如果當(dāng)前Promise實(shí)例持有l(wèi)isteners的是DefaultFutureListeners類型,則調(diào)用它的add()方法進(jìn)行添加
    ((DefaultFutureListeners) listeners).add(listener);
  } else {
    // 步入這里說(shuō)明當(dāng)前Promise實(shí)例持有l(wèi)isteners為單個(gè)GenericFutureListener實(shí)例,需要轉(zhuǎn)換為DefaultFutureListeners實(shí)例
    listeners = new DefaultFutureListeners((GenericFutureListener<? extends Future<V>>) listeners, listener);
  }
}

這里看到有一個(gè)全局變量listeners,我們看到他的定義:

private Object listeners;

為啥會(huì)是一個(gè) Object 類型的對(duì)象呢,不是應(yīng)該是 List 或者是數(shù)組才對(duì)嘛。Netty之所以這樣設(shè)計(jì),是因?yàn)榇蠖鄶?shù)情況下 listener 只有一個(gè),用集合和數(shù)組都會(huì)造成浪費(fèi)。當(dāng)只有一個(gè) listener 時(shí),該字段為一個(gè) GenericFutureListener 對(duì)象;當(dāng)多于一個(gè) listener 時(shí),該字段為 DefaultFutureListeners ,可以儲(chǔ)存多個(gè) listener。

我們?cè)賮?lái)看 notifyListeners() 方法:

private void notifyListeners() {
  EventExecutor executor = executor();
  //當(dāng)前EventLoop線程需要檢查listener嵌套
  if (executor.inEventLoop()) {
    final InternalThreadLocalMap threadLocals = InternalThreadLocalMap.get();
    //這里是當(dāng)前l(fā)istener的嵌套層數(shù)
    final int stackDepth = threadLocals.futureListenerStackDepth();
    if (stackDepth < MAX_LISTENER_STACK_DEPTH) {
      threadLocals.setFutureListenerStackDepth(stackDepth + 1);
      try {
        notifyListenersNow();
      } finally {
        threadLocals.setFutureListenerStackDepth(stackDepth);
      }
      return;
    }
  }
    //外部線程直接提交給新線程執(zhí)行
  safeExecute(executor, new Runnable() {
    @Override
    public void run() {
      notifyListenersNow();
    }
  });
}

這里有個(gè)疑問(wèn)就是為什么要設(shè)置當(dāng)前的調(diào)用棧深度+1。

接著看真正執(zhí)行通知的方法:

private void notifyListenersNow() {
  Object listeners;
  synchronized (this) {
     // 正在通知或已沒(méi)有監(jiān)聽(tīng)者(外部線程刪除)直接返回
    if (notifyingListeners || this.listeners == null) {
      return;
    }
    notifyingListeners = true;
    listeners = this.listeners;
    this.listeners = null;
  }
  for (;;) {
    //只有一個(gè)listener
    if (listeners instanceof DefaultFutureListeners) {
      notifyListeners0((DefaultFutureListeners) listeners);
    } else {
      //有多個(gè)listener
      notifyListener0(this, (GenericFutureListener<? extends Future<V>>) listeners);
    }
    synchronized (this) {
      if (this.listeners == null) {
        // 執(zhí)行完畢且外部線程沒(méi)有再添加監(jiān)聽(tīng)者
        notifyingListeners = false;
        return;
      }
      //外部線程添加了新的監(jiān)聽(tīng)者繼續(xù)執(zhí)行
      listeners = this.listeners;
      this.listeners = null;
    }
  }
}

Netty 中 DefalutPromise 是一個(gè)非常常用的類,這是 Promise 實(shí)現(xiàn)的基礎(chǔ)。DefaultChannelPromise DefalutPromise 的子類,加入了 channel 這個(gè)屬性。

Promise 目前支持兩種類型的監(jiān)聽(tīng)器:

  • GenericFutureListener:支持泛型的 Future ;
  • GenericProgressiveFutureListener:它是GenericFutureListener的子類,支持進(jìn)度表示和支持泛型的Future 監(jiān)聽(tīng)器(有些場(chǎng)景需要多個(gè)步驟實(shí)現(xiàn),類似于進(jìn)度條那樣)。

為了讓 Promise 支持多個(gè)監(jiān)聽(tīng)器,Netty 添加了一個(gè)默認(rèn)修飾符修飾的DefaultFutureListeners類用于保存監(jiān)聽(tīng)器實(shí)例數(shù)組:

final class DefaultFutureListeners {

    private GenericFutureListener<? extends Future<?>>[] listeners;
    private int size;
    private int progressiveSize; // the number of progressive listeners
    
    // 這個(gè)構(gòu)造相對(duì)特別,是為了讓Promise中的listeners(Object類型)實(shí)例由單個(gè)GenericFutureListener實(shí)例轉(zhuǎn)換為DefaultFutureListeners類型
    @SuppressWarnings("unchecked")
    DefaultFutureListeners(GenericFutureListener<? extends Future<?>> first, GenericFutureListener<? extends Future<?>> second) {
        listeners = new GenericFutureListener[2];
        listeners[0] = first;
        listeners[1] = second;
        size = 2;
        if (first instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
        if (second instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void add(GenericFutureListener<? extends Future<?>> l) {
        GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        final int size = this.size;
        // 注意這里,每次擴(kuò)容數(shù)組長(zhǎng)度是原來(lái)的2倍
        if (size == listeners.length) {
            this.listeners = listeners = Arrays.copyOf(listeners, size << 1);
        }
        // 把當(dāng)前的GenericFutureListener加入數(shù)組中
        listeners[size] = l;
        // 監(jiān)聽(tīng)器總數(shù)量加1
        this.size = size + 1;
        // 如果為GenericProgressiveFutureListener,則帶進(jìn)度指示的監(jiān)聽(tīng)器總數(shù)量加1
        if (l instanceof GenericProgressiveFutureListener) {
            progressiveSize ++;
        }
    }

    public void remove(GenericFutureListener<? extends Future<?>> l) {
        final GenericFutureListener<? extends Future<?>>[] listeners = this.listeners;
        int size = this.size;
        for (int i = 0; i < size; i ++) {
            if (listeners[i] == l) {
                // 計(jì)算需要需要移動(dòng)的監(jiān)聽(tīng)器的下標(biāo)
                int listenersToMove = size - i - 1;
                if (listenersToMove > 0) {
                    // listenersToMove后面的元素全部移動(dòng)到數(shù)組的前端
                    System.arraycopy(listeners, i + 1, listeners, i, listenersToMove);
                }
                // 當(dāng)前監(jiān)聽(tīng)器總量的最后一個(gè)位置設(shè)置為null,數(shù)量減1
                listeners[-- size] = null;
                this.size = size;
                // 如果監(jiān)聽(tīng)器是GenericProgressiveFutureListener,則帶進(jìn)度指示的監(jiān)聽(tīng)器總數(shù)量減1
                if (l instanceof GenericProgressiveFutureListener) {
                    progressiveSize --;
                }
                return;
            }
        }
    }
    
    // 返回監(jiān)聽(tīng)器實(shí)例數(shù)組
    public GenericFutureListener<? extends Future<?>>[] listeners() {
        return listeners;
    }
    
    // 返回監(jiān)聽(tīng)器總數(shù)量
    public int size() {
        return size;
    }
    
    // 返回帶進(jìn)度指示的監(jiān)聽(tīng)器總數(shù)量
    public int progressiveSize() {
        return progressiveSize;
    }
}

以上就是關(guān)于 Promise 和監(jiān)聽(tīng)器相關(guān)的實(shí)現(xiàn)分析,再回到之前的啟動(dòng)類,是不是還有一個(gè) sync() 方法:

@Override
public Promise<V> sync() throws InterruptedException {
  await();
  rethrowIfFailed();
  return this;
}


public Promise<V> await() throws InterruptedException {
  // 異步操作已經(jīng)完成,直接返回
  if (isDone()) {
    return this;    
  }
  if (Thread.interrupted()) {
    throw new InterruptedException(toString());
  }
  // 死鎖檢測(cè)
  checkDeadLock();
  // 同步使修改waiters的線程只有一個(gè)
  synchronized (this) {
    while (!isDone()) { // 等待直到異步操作完成
      incWaiters();   // ++waiters;
      try {
        wait(); // JDK方法
      } finally {
        decWaiters(); // --waiters
      }
    }
  }
  return this;
}

這里其實(shí)就是一個(gè)同步檢測(cè)當(dāng)前事件是否完成的過(guò)程。

以上就是 Netty 中實(shí)現(xiàn)的 Future/Promise 異步回調(diào)機(jī)制。實(shí)現(xiàn)并不是很難懂,代碼很值得學(xué)習(xí)。除了 Netty 中實(shí)現(xiàn)了 Future/Promise模型,在Guava中也有相關(guān)的實(shí)現(xiàn),大家日常使用可以看習(xí)慣引用相關(guān)的包。

Guava實(shí)現(xiàn):

<dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>21.0</version>
</dependency>
  
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newSingleThreadExecutor());
ListenableFuture<Integer> future = service.submit(new Callable<Integer>() {
    public Integer call() throws Exception {
        TimeUnit.SECONDS.sleep(5);
        return 100;
    }
});
Futures.addCallback(future, new FutureCallback<Integer>() {
    public void onSuccess(Integer result) {
        System.out.println("success:" + result);
    }

    public void onFailure(Throwable throwable) {
        System.out.println("fail, e = " + throwable);
    }
});

Thread.currentThread().join();
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容