Netty源碼分析1 - Promise 異步框架的設(shè)計(jì)

promise-framework是一個(gè)從netty抽取出來(lái)的通用promise異步框架,并簡(jiǎn)化了Listener架構(gòu)。

github:https://github.com/zhaojigang/concurrent-framework
  • 一、使用姿勢(shì)
  • 二、代碼架構(gòu)
  • 三、代碼分析
  • 附、bug記錄

一、使用姿勢(shì)

1.1、回調(diào)方式(推薦 - 完全異步)

    @Test
    public void testListenerNotifyLater() {
        int numListenersBefore = 2; // 設(shè)置結(jié)果前設(shè)置兩個(gè)listener
        int numListenersAfter = 3; // 設(shè)置結(jié)果后設(shè)置三個(gè)listener

        CountDownLatch latch = new CountDownLatch(numListenersBefore + numListenersAfter);
        DefaultPromise<Void> promise = new DefaultPromise<>();

        for (int i = 0; i < numListenersBefore; i++) {
            promise.addListener(new FutureListener<Void>() {
                @Override
                public void operationComplete(Future<Void> future) throws Exception {
                    latch.countDown();
                }
            });
        }

        new Thread(new Runnable() {
            @Override
            public void run() {
                promise.setSuccess(null);

                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < numListenersAfter; i++) {
                            promise.addListener(future -> {
                                latch.countDown();
                            });
                        }
                    }
                }).start();
            }
        }).start();

        try {
            Assert.assertTrue(latch.await(100, TimeUnit.SECONDS), "expect notify " + (numListenersBefore + numListenersAfter) + " listeners");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

流程(非常重要):

  • 首先在main線程中為DefaultPromise實(shí)例添加了兩個(gè)listener
  • 之后啟動(dòng)另外一個(gè)線程A去設(shè)置值(此時(shí)就會(huì)回調(diào)已經(jīng)加入到當(dāng)前的DefaultPromise實(shí)例中的兩個(gè)listener#operationComplete(Future<Void> future),然后刪除這兩個(gè)listener,也就是說(shuō)一個(gè)listener只能被通知一遍)
  • 之后線程A又啟動(dòng)了另外的一條線程B為當(dāng)前的DefaultPromise實(shí)例添加了3個(gè)listener,注意,此時(shí)每添加一個(gè)listener,就會(huì)立即回調(diào)其operationComplete方法,因?yàn)楫?dāng)前的DefaultPromise.isDone()==true了,就是說(shuō)當(dāng)前的DefaultPromise實(shí)例已經(jīng)完成了。

1.2、阻塞get方式 - (不推薦 - 可能阻塞)

    @Test
    private void testFutureStyleWithWaitNotifyAll() throws ExecutionException, InterruptedException {
        Promise<Model> promise = new DefaultPromise<>();

        /**
         * 一個(gè)線程在執(zhí)行g(shù)et(),進(jìn)行wait()
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Object result = promise.get();// 等待條件
                    // 之后做相應(yīng)的業(yè)務(wù)邏輯
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        // sleep 2s 使第一個(gè)線程先等待著
        Thread.sleep(2000);

        /**
         * 另外一個(gè)線程在設(shè)置值,notifyAll喚醒wait()線程
         */
        new Thread(new Runnable() {
            @Override
            public void run() {
                promise.setSuccess(new Model(1L));
            }
        }).start();
    }

步驟:

  • 標(biāo)準(zhǔn)的future阻塞姿勢(shì):一個(gè)線程執(zhí)行g(shù)et(),進(jìn)行wait()阻塞;另外一個(gè)線程設(shè)置值,執(zhí)行notifyAll()

二、代碼架構(gòu)

DefaultPromise.png

說(shuō)明:

  • java.util.concurrent.Future:Java并發(fā)包提供的Future類。
  • io.hulk.promise.framework.Future:繼承java.util.concurrent.Future,增強(qiáng)功能。
  • AbstractFuture:實(shí)現(xiàn)了java.util.concurrent.Future的get()和get(long timeout, TimeUnit unit)阻塞等待模式,使用模板模式搭建這兩個(gè)方法的基本骨架。
  • Promise:可寫的Future,提供setSuccess()等接口方法。
  • DefaultPromise:最終的實(shí)現(xiàn)類,該實(shí)現(xiàn)類實(shí)現(xiàn)了觀察者模式。

三、代碼分析

3.1 java.util.concurrent.Future

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

方法較為簡(jiǎn)陋,例如只有isDone(),而沒(méi)有isSuccess()這樣的方法,沒(méi)有添加listener的接口,也沒(méi)有設(shè)置是否可以取消的接口。所以使用io.hulk.promise.framework.Future增強(qiáng)java.util.concurrent.Future。

3.2 io.hulk.promise.framework.Future

/**
 * from netty4.1
 */
public interface Future<V> extends java.util.concurrent.Future<V> {
    /**
     * Returns {@code true} if and only if the I/O operation was completed successfully.
     */
    boolean isSuccess();

    /**
     * returns {@code true} if and only if the operation can be cancelled via {@link #cancel(boolean)}.
     */
    boolean isCancellable();

    /**
     * Returns the cause of the failed I/O operation if the I/O operation has failed.
     *
     * @return the cause of the failure. {@code null} if succeeded or this future is not completed yet.
     */
    Throwable cause();

    /**
     * Adds the specified listener to this future.
     * The specified listener is notified when this future is {@linkplain #isDone() done}.
     * If this future is already completed, the specified listener is notified immediately.
     */
    Future<V> addListener(FutureListener<V> listener);

    /**
     * Adds the specified listeners to this future.
     * The specified listeners is notified when this future is {@linkplain #isDone() done}.
     * If this future is already completed, the specified listeners is notified immediately.
     */
    Future<V> addListeners(List<FutureListener<V>> listeners);

    /**
     * Removes the first occurrence of the specified listener from this future.
     * The specified listener is no longer notified when this future is {@linkplain #isDone() done}.
     * If the specified listener is not associated with this future, this method does nothing and returns silently.
     */
    Future<V> removeListener(FutureListener<V> listener);

    /**
     * Removes the first occurrence for each of the listeners from this future.
     * The specified listeners is no longer notified when this future is {@linkplain #isDone() done}.
     * If the specified listeners is not associated with this future, this method does nothing and returns silently.
     */
    Future<V> removeListeners(List<FutureListener<V>> listeners);

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> sync() throws InterruptedException;

    /**
     * Waits for this future until it is done, and rethrows the cause of the failure if this future failed.
     * This method catches an {@link InterruptedException} and discards it silently.
     * 即不響應(yīng)中斷
     */
    Future<V> syncUninterruptibly();

    /**
     * Waits for this future to be completed.
     *
     * @throws InterruptedException
     *         if the current thread was interrupted
     */
    Future<V> await() throws InterruptedException;

    /**
     * Waits for this future to be completed without interruption.
     * This method catches an {@link InterruptedException} and discards it silently.
     */
    Future<V> awaitUninterruptibly();

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit
     * @throws InterruptedException if the current thread was interrupted
     */
    boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException;

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit without interruption.
     * This method catches an {@link InterruptedException} and discards it silently.
     */
    boolean awaitUninterruptibly(long timeout, TimeUnit timeUnit);

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit
     * @throws InterruptedException if the current thread was interrupted
     */
    boolean await(long timeoutMillis) throws InterruptedException;

    /**
     * Waits for this future to be completed within the specified time limit.
     *
     * @return {@code true} if and only if the future was completed within the specified time limit without interruption.
     * This method catches an {@link InterruptedException} and discards it silently.
     */
    boolean awaitUninterruptibly(long timeoutMillis);

    /**
     * Return the result without blocking. If the future is not done yet this will return {@code null}.
     *
     * As it is possible that a {@code null} value is used to mark the future as successful you also need to check
     * if the future is really done with {@link #isDone()} and not relay on the returned {@code null} value.
     */
    V getNow();

    /**
     * {@inheritDoc}
     *
     * If the cancellation was successful it will fail the future with an {@link java.util.concurrent.CancellationException}.
     *
     */
    @Override
    boolean cancel(boolean mayInterruptIfRunning); // {@inheritDoc} 用在一個(gè)@Override的方法上,表示為父類的方法添加詳細(xì)的注釋
}

3.3 FutureListener

/**
 * Listens to the result of a {@link Future}.
 * The result of the asynchronous operation is notified once
 * this listener is added by calling {@link Future#addListener(FutureListener)}.
 *
 * @author zhaojigang
 * @date 2018/7/16
 */
public interface FutureListener<V> {
    /**
     * Invoked when the operation associated with the {@link Future} has been completed.
     *
     * @param future the source {@link Future} which called this callback
     */
    void operationComplete(Future<V> future) throws Exception;
}

注意:operationComplete(Future<V> future)中的future參數(shù)就是調(diào)用該方法的那個(gè)Future實(shí)例,在DefaultPromise中會(huì)有體現(xiàn)。

3.4 AbstractFuture

/**
 * from netty4.1
 */
public abstract class AbstractFuture<V> implements Future<V>{
    @Override
    public V get() throws InterruptedException, ExecutionException {
        /**
         * 阻塞等到await()調(diào)用完成,即失敗或返回結(jié)果
         */
        await();
        /**
         * 獲取失敗異常信息
         */
        Throwable cause = cause();
        /**
         * 如果異常信息為null,直接獲取響應(yīng)結(jié)果
         */
        if (cause == null) {
            return getNow();
        }
        /**
         * 如果返回結(jié)果result == CancellationException(即執(zhí)行了cancel()),則拋出該異常
         * 否則,拋出ExecutionException
         */
        if (cause instanceof CancellationException) {
            throw (CancellationException)cause;
        }
        throw new ExecutionException(cause);
    }

    @Override
    public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        if(await(timeout, unit)) {
            Throwable cause = cause();
            if (cause == null) {
                return getNow();
            }
            if (cause instanceof CancellationException) {
                throw (CancellationException)cause;
            }
            throw new ExecutionException(cause);
        }
        /**
         * 如果沒(méi)有在指定的時(shí)間內(nèi)await沒(méi)有完成,拋出超時(shí)異常
         */
        throw new TimeoutException();

    }
}

使用模板模式,定義好get()和get(long timeout, TimeUnit unit)的基本框架,至于具體await()/await(timeout, unit)/cause()/getNow()等方法就由具體的類來(lái)實(shí)現(xiàn)了。

值得注意的是:

  • 如果一個(gè)task被成功的cancel()了,會(huì)直接拋出CancellationException。
  • get()/get(long timeout, TimeUnit unit)是阻塞獲取結(jié)果的,所以netty不推薦使用這種方式。

3.5 Promise

/**
 * from netty4.1
 * Special {@link Future} which is writable.
 *
 * 添加設(shè)置操作
 * 將Future中返回值為Future的全部override為Promise
 * @author zhaojigang
 * @date 2018/7/16
 */
public interface Promise<V> extends Future<V> {
    /**
     * Marks this future as a success and notifies all listeners.
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setSuccess(V result);

    /**
     * Marks this future as a success and notifies all listeners.
     *
     * @return {@code true} if and only if successfully marked this future as a success.
     *         Otherwise {@code false} because this future is already marked as either a success or a failure.
     */
    boolean trySuccess(V result);

    /**
     * Marks this future as a failure and notifies all listeners.
     * If it is success or failed already it will throw an {@link IllegalStateException}.
     */
    Promise<V> setFailure(Throwable cause);

    /**
     * Marks this future as a failure and notifies all listeners.
     *
     * @return {@code true} if and only if successfully marked this future as a failure.
     *         {@code false} because this future is already marked as either a success or a failure.
     */
    boolean tryFailure(Throwable cause);

    /**
     * Make this future impossible to cancel.
     *
     * @return {@code true} if and only if successfully marked this future as uncancellable
     *                      or it is already done without being cancelled.
     *         {@code false} if this future has been cancelled already.
     */
    boolean setUncancellable();

    @Override
    Promise<V> addListener(FutureListener<V> listener);

    @Override
    Promise<V> addListeners(List<FutureListener<V>> listeners);

    @Override
    Promise<V> removeListener(FutureListener<V> listener);

    @Override
    Promise<V> removeListeners(List<FutureListener<V>> listeners);

    @Override
    Promise<V> sync() throws InterruptedException;

    @Override
    Promise<V> syncUninterruptibly();

    @Override
    Promise<V> await() throws InterruptedException;

    @Override
    Promise<V> awaitUninterruptibly();
}

Promise是一種可進(jìn)行寫的Future,具有設(shè)置成功結(jié)果,設(shè)置失敗結(jié)果等功能,這樣可以在成功或失敗的時(shí)候回調(diào)注冊(cè)到當(dāng)前Promise實(shí)例的listeners了。就是一種完全異步的方式了,而AbstractFuture#get可能需要阻塞,所以netty推薦我們使用listener回調(diào)模式。

3.6 DefaultPromise

/**
 * from netty4.1
 * <p>
 * 一、DefaultPromise狀態(tài)轉(zhuǎn)換圖:
 * A {@link DefaultPromise} is either <em>uncompleted</em> or <em>completed</em>.
 * When an I/O operation begins, a new future object is created.
 * The new future is uncompleted initially - it is neither succeeded, failed, nor cancelled
 * because the I/O operation is not finished yet.
 * If the I/O operation is finished either successfully, with failure, or by cancellation,
 * the future is marked as completed with more specific information, such as the cause of the
 * failure.
 * Please note that even failure and cancellation belong to the completed state.
 * <pre>
 *                                      +---------------------------+
 *                                      | Completed successfully    |
 *                                      +---------------------------+
 *                                 +---->      isDone() = true      |
 * +--------------------------+    |    |   isSuccess() = true      |
 * |        Uncompleted       |    |    +===========================+
 * +--------------------------+    |    | Completed with failure    |
 * |      isDone() = false    |    |    +---------------------------+
 * |   isSuccess() = false    |----+---->      isDone() = true      |
 * | isCancelled() = false    |    |    |       cause() = non-null  |
 * |       cause() = null     |    |    +===========================+
 * +--------------------------+    |    | Completed by cancellation |
 *                                 |    +---------------------------+
 *                                 +---->      isDone() = true      |
 *                                      | isCancelled() = true      |
 *                                      +---------------------------+
 * </pre>
 * <p>
 * <p>
 * 二、DefaultPromise實(shí)現(xiàn)了兩種執(zhí)行機(jī)制:
 * 1、future:wait/notify實(shí)現(xiàn),可能要阻塞,使用方最終調(diào)用到DefaultPromise父類AbstractFuture#get或者DefaultPromise#syncXxx
 * 2、listener:其實(shí)就是callback實(shí)現(xiàn),不需要阻塞,當(dāng)setSuccess/trySuccess/setFailure/tryFailure/cancel會(huì)直接調(diào)用listener(回調(diào)函數(shù))當(dāng)然如果有等待條件的其他線程,也會(huì)notifyAll
 * <p>
 * 推薦使用第二種,完全異步的。
 */
public class DefaultPromise<V> extends AbstractFuture<V> implements Promise<V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(DefaultPromise.class);
    /**
     * 返回結(jié)果result的原子更新器
     */
    private static final AtomicReferenceFieldUpdater<DefaultPromise, Object> RESULT_UPDATER
            = AtomicReferenceFieldUpdater.newUpdater(DefaultPromise.class, Object.class, "result");
    /**
     * 返回結(jié)果
     */
    private volatile Object result;
    /**
     * 成功的返回標(biāo)記
     */
    private static final Object SUCCESS = new Object();
    /**
     * 不可取消的標(biāo)記
     */
    private static final Object UNCANCELLABLE = new Object();
    /**
     * wait線程的數(shù)量,注意該參數(shù)的修改要進(jìn)行同步(恰好該參數(shù)的所有修改地方都在一個(gè)synchronized中)
     */
    private short waiters;
    /**
     * cancel()時(shí)要將此項(xiàng)異常塞入result
     */
    private static final Throwable CANCELLATION_CAUSE = new CancellationException(DefaultPromise.class.getName() + " invoked cancel()");
    /**
     * Threading - synchronized(this) 事件監(jiān)聽器列表
     * If {@code empty}, it means either 1) no listeners were added yet or 2) all listeners were notified.
     * 也就是說(shuō) 一個(gè)listener通知過(guò)一次就會(huì)被刪除,不會(huì)再通知第二次
     */
    private List<FutureListener<V>> listeners;
    /**
     * Threading - synchronized(this). We must prevent concurrent notification and FIFO listener notification.
     */
    private boolean notifyingListeners;

    /**
     * future返回是否成功
     *
     * @return {@code true} 結(jié)果不為空 && 結(jié)果不是Throwable(失?。?amp;& 結(jié)果不是UNCANCELLABLE(不能取消)
     */
    @Override
    public boolean isSuccess() {
        /**
         * 將成員變量result轉(zhuǎn)換為局部變量進(jìn)行操作的原因?
         *
         * 一、是因?yàn)?在下面的代碼中會(huì)多次調(diào)用this.result,當(dāng)外界的this.result引用發(fā)生變化時(shí),由于this.result是被volatile修飾的,如果直接使用this.result將會(huì)導(dǎo)致多次獲取的result不同,
         * 但是this.result引用發(fā)生變化時(shí),局部變量result不會(huì)發(fā)生變化(注意修改的是this.result引用的值,而不是this.result指向的地址的值,類似下邊的程序)
         * <pre>
         *         public static void main(String[] args) {
         *          DefaultPromiseTest test = new DefaultPromiseTest();
         *
         *          Model m2 = test.m;
         *          System.out.println(m2);
         *
         *          test.m = new Model(200L); // 注意:這里不是this.m.setId(300),所以下面的m2不變
         *          System.out.println(m2);
         *        }
         *
         * </pre>
         *
         * 二、由于this.result是被volatile修飾的,每次獲取都要強(qiáng)制從主存中獲取,無(wú)法從工作線程直接獲取,所以代價(jià)較大,而且將頻繁操作的成員變量局部化更方便JIT優(yōu)化
         * https://blog.csdn.net/shaomingliang499/article/details/50549306
         */
        Object result = this.result;
        return result != null && !(result instanceof Throwable) && result != UNCANCELLABLE;
    }

    /**
     * 等待線程是否可取消
     *
     * @return {@code true} 如果返回結(jié)果result為null,表示沒(méi)有返回成功,也沒(méi)有返回失敗,也沒(méi)有設(shè)置不可取消,此時(shí)可以取消
     */
    @Override
    public boolean isCancellable() {
        return result == null;
    }

    /**
     * 查詢cause:如果result instanceof Throwable,那么表示返回結(jié)果出錯(cuò)了,否則 cause = null,表示一定沒(méi)有錯(cuò)誤
     *
     * @return
     */
    @Override
    public Throwable cause() {
        Object result = this.result;
        return result instanceof Throwable ? (Throwable) result : null;
    }

    @Override
    public Promise<V> setSuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean trySuccess(V result) {
        if (setSuccess0(result)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public Promise<V> setFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return this;
        }
        throw new IllegalStateException("complete already: " + this);
    }

    @Override
    public boolean tryFailure(Throwable cause) {
        if (setFailure0(cause)) {
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public boolean setUncancellable() {
        Object result = this.result;
        /**
         * 從uncompleted設(shè)置為UNCANCELLABLE,如果設(shè)置成功,直接返回
         */
        if (result == UNCANCELLABLE || RESULT_UPDATER.compareAndSet(this, null, UNCANCELLABLE)) {
            return true;
        }

        /**
         * 如果completed 而且又沒(méi)被cancel(),此時(shí)返回true
         */
        return isDone0(result) && !isCancelled();
    }

    @Override
    public Promise<V> addListener(FutureListener<V> listener) {
        ObjectUtil.checkNotNull(listener, "listener");
        /**
         * 防止多個(gè)線程同時(shí)操作listeners隊(duì)列
         */
        synchronized (this) {
            if (listeners == null) {
                listeners = new ArrayList<>();
            }
            listeners.add(listener);
        }

        /**
         * 如果該listener是后加入的,則直接喚醒
         */
        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> addListeners(List<FutureListener<V>> listeners) {
        ObjectUtil.checkNotNull(listeners, "listeners");
        synchronized (this) {
            if (this.listeners == null) {
                listeners = new ArrayList<>();
            }
            this.listeners.addAll(listeners);
        }

        if (isDone()) {
            notifyListeners();
        }

        return this;
    }

    @Override
    public Promise<V> removeListener(FutureListener<V> listener) {
        ObjectUtil.checkNotNull(listeners, "listeners");
        ObjectUtil.checkNotNull(listener, "listener");
        synchronized (this) {
            listeners.remove(listener);
        }
        return this;
    }

    @Override
    public Promise<V> removeListeners(List<FutureListener<V>> listeners) {
        ObjectUtil.checkNotNull(this.listeners, "listeners");
        ObjectUtil.checkNotNull(listeners, "listeners");
        synchronized (this) {
            this.listeners.removeAll(listeners);
        }
        return this;
    }

    @Override
    public Promise<V> sync() throws InterruptedException {
        /**
         * 如果await()發(fā)生了異常,這里正好直接拋出
         */
        await();
        /**
         * 如果await()返回了錯(cuò)誤,也直接拋出
         */
        rethrowIfFailed();
        return this;
    }

    private void rethrowIfFailed() {
        Throwable cause = cause();
        if (cause == null) {
            return;
        }
        throw (RuntimeException) cause;
    }

    @Override
    public Promise<V> syncUninterruptibly() {
        awaitUninterruptibly();
        rethrowIfFailed();
        return this;
    }

    @Override
    public Promise<V> await() throws InterruptedException {
        if (isDone()) {
            return this;
        }

        if (Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        /**
         * wait()/notify()機(jī)制:
         * 前提:每個(gè)對(duì)象都有一個(gè)鎖 + 一個(gè)鎖等待隊(duì)列 + 一個(gè)條件等待隊(duì)列。
         * 線程協(xié)作:wait()/notify()通常都是由兩個(gè)線程來(lái)協(xié)作的,一個(gè)wait()等待條件,另一個(gè)notify()喚醒等待線程
         * 為什么加鎖:wait()/notify()是必須加鎖執(zhí)行的(內(nèi)部執(zhí)行機(jī)制),否則會(huì)拋出異常IllegalMonitorStateException,鎖對(duì)象是當(dāng)前實(shí)例。
         *
         * wait內(nèi)部執(zhí)行機(jī)制:
         * 1、把當(dāng)前線程放入鎖對(duì)象的條件等待隊(duì)列,之后釋放鎖(注意:一定會(huì)釋放鎖,否則notify的線程將無(wú)法獲取該對(duì)象鎖),進(jìn)入阻塞狀態(tài)WAITING或TIMED_WAITING
         * 2、當(dāng)?shù)却龝r(shí)間到了或者被其他線程notify/notifyAll了,則等待的當(dāng)前線程從條件等待隊(duì)列中移除出來(lái),之后再嘗試獲取鎖,如果沒(méi)有獲取鎖,進(jìn)入鎖等待隊(duì)列,線程狀態(tài)改為BLOCKED;如果獲取了鎖,從wait調(diào)用中返回
         *
         * 為什么要寫成:
         * <pre>
         *    while (!isDone()) {
         *      wait();
         *    }
         * </pre>
         * 而不是
         * <pre>
         *     if(!isDone()) {
         *         wait();
         *     }
         * </pre>
         *
         * wait()表示阻塞等待,正常情況下while和if形式是等價(jià)的,但是為了防止wait()被意外喚醒,所以需要在wait()之后繼續(xù)進(jìn)行判斷
         */
        synchronized (this) {
            while (!isDone()) {
                /**
                 * 執(zhí)行wait()之前:waiters加1
                 */
                incWaiters();
                try {
                    wait();
                } finally {
                    /**
                     * wait()結(jié)束之后,waiters減1
                     */
                    decWaiters();
                }
            }
        }
        return this;
    }

    @Override
    public Promise<V> awaitUninterruptibly() {
        if (isDone()) {
            return this;
        }

        boolean interrupted = false;
        synchronized (this) {
            while (!isDone()) {
                incWaiters();
                try {
                    wait();
                } catch (InterruptedException e) {
                    // Interrupted while waiting.
                    interrupted = true;
                } finally {
                    decWaiters();
                }
            }
        }

        /**
         * 捕獲了中斷異常,默默執(zhí)行中斷
         */
        if (interrupted) {
            Thread.currentThread().interrupt();
        }

        return this;
    }

    @Override
    public boolean await(long timeout, TimeUnit timeUnit) throws InterruptedException {
        return await0(timeUnit.toNanos(timeout), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeout, TimeUnit timeUnit) {
        try {
            return await0(timeUnit.toNanos(timeout), false);
        } catch (InterruptedException e) {
            throw new InternalError();
        }
    }

    @Override
    public boolean await(long timeoutMillis) throws InterruptedException {
        return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), true);
    }

    @Override
    public boolean awaitUninterruptibly(long timeoutMillis) {
        try {
            return await0(TimeUnit.MILLISECONDS.toNanos(timeoutMillis), false);
        } catch (InterruptedException e) {
            throw new InternalError();
        }
    }

    @Override
    public V getNow() {
        Object result = this.result;
        if (result instanceof Throwable || result == SUCCESS || result == UNCANCELLABLE) {
            return null;
        }
        return (V) result;
    }

    /**
     * 查看java.util.concurrent.Future#cancel()的注釋,
     * This attempt will fail if the task has already completed(成功 || 失敗 || 已被取消), has already been cancelled,
     * or could not be cancelled for some other reason
     *
     * @param mayInterruptIfRunning this value has no effect in this implementation.
     * @return
     */
    @Override
    public boolean cancel(boolean mayInterruptIfRunning) {
        if (RESULT_UPDATER.compareAndSet(this, null, CANCELLATION_CAUSE)) {
            checkNotifyWaiters();
            notifyListeners();
            return true;
        }
        return false;
    }

    @Override
    public boolean isCancelled() {
        return result instanceof CancellationException;
    }

    @Override
    public boolean isDone() {
        return isDone0(this.result);
    }

    /**
     * 分析并發(fā)問(wèn)題:
     * 1、假設(shè)沒(méi)有notifyingListeners:
     * 當(dāng)前線程A執(zhí)行到while(true)的時(shí)候,假設(shè)另一條線程B也添加了FutureListener并進(jìn)入了第一個(gè)同步塊,此時(shí)線程B也進(jìn)入了while(true),
     * B開始執(zhí)行后來(lái)的這些FutureListeners,之后A才開始執(zhí)行一開始的FutureListeners,這樣就不能保證FIFO的執(zhí)行FutureListener
     * <p>
     * 2、加入notifyingListeners:
     * 在線程A執(zhí)行到第二個(gè)synchronized塊中的if (this.listeners == null)中之前,線程B進(jìn)入第一個(gè)同步塊,由于notifyingListeners = true,則直接返回了,
     * 而B后來(lái)添加的FutureListeners,A會(huì)在第二個(gè)同步快判斷的時(shí)候發(fā)現(xiàn)當(dāng)前的this.listeners.size>0,會(huì)繼續(xù)賦值給本地變量繼續(xù)第二輪循環(huán).
     * <p>
     * 這里有一個(gè)疑問(wèn):當(dāng)外界的this.listeners發(fā)生變化時(shí),temListeners是否變化,假設(shè)A執(zhí)行到while(true),B執(zhí)行了addListener,則此時(shí)外界的this.listener改變了值,temListener是否發(fā)生變化
     */
    private void notifyListeners() {
        List<FutureListener<V>> temListeners;
        synchronized (this) {
            if (notifyingListeners || this.listeners == null) {
                return;
            }
            notifyingListeners = true;
            temListeners = this.listeners;
            this.listeners = null; // 通知完之后就置空,不再通知第二次
        }

        while (true) {
            notifyListeners0(temListeners);
            synchronized (this) {
                if (this.listeners == null) {
                    notifyingListeners = false;
                    return;
                }
                temListeners = this.listeners;
                this.listeners = null;
            }
        }
    }

    private void notifyListeners0(List<FutureListener<V>> listeners) {
        for (FutureListener<V> listener : listeners) {
            try {
                listener.operationComplete(this);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 設(shè)置成功標(biāo)志
     * result != null ? result : SUCCESS
     */
    private boolean setSuccess0(V result) {
        return setValue0(result != null ? result : SUCCESS);
    }

    /**
     * 設(shè)置失敗標(biāo)志
     * result == cause
     */
    private boolean setFailure0(Throwable cause) {
        return setValue0(ObjectUtil.checkNotNull(cause, "cause"));
    }

    private boolean setValue0(Object result) {
        /**
         * 更新result結(jié)果,喚醒所有阻塞線程
         * 將result從null置為result 或者 從UNCANCELLABLE置為result(因?yàn)橛锌赡苁窍葘esult置為UNCANCELLABLE的)
         */
        if (RESULT_UPDATER.compareAndSet(this, null, result) ||
                RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, result)) {
            checkNotifyWaiters();
            return true;
        }
        return false;
    }

    private void checkNotifyWaiters() {
        synchronized (this) {
            if (waiters > 0) {
                notifyAll();
            }
        }
    }

    public boolean isDone0(Object result) {
        return result != null && result != UNCANCELLABLE;
    }

    private void incWaiters() {
        if (++waiters > Short.MAX_VALUE) {
            throw new IllegalStateException("too many waiters :" + this);
        }
    }

    private void decWaiters() {
        --waiters;
    }

    /**
     * 關(guān)于中斷:
     * <p>
     * 1、前提:線程有六種狀態(tài),{@link Thread#getState()}
     * NEW:A thread that has not yet started is in this state.
     * RUNNABLE:A thread executing in the Java virtual machine is in this state.
     * it may be waiting for other resources from the operating system such as processor.
     * BLOCKED:A thread that is blocked waiting for a monitor lock is in this state.
     * A thread in the blocked state is waiting for a monitor lock
     * to enter a synchronized block/method or
     * reenter a synchronized block/method after calling {@link Object#wait() Object.wait}.
     * WAITING:A thread that is waiting indefinitely for another thread to perform a particular action is in this state.
     * TIMED_WAITING:A thread that is waiting for another thread to perform an action for up to a specified waiting time is in this state.
     * TERMINATED:A thread that has exited is in this state.
     * <p>
     * 2、在不同階段調(diào)用中斷Thread.currentThread().interrupt()
     * NEW/TERMINATED:interrupt()中斷沒(méi)有任何效果,中斷位isInterrupted=false
     * RUNNABLE: interrupt()中斷沒(méi)有效果,中斷位isInterrupted=true,在run()方法中自己選擇合適的點(diǎn)去處理
     * BLOCKED:interrupt()中斷位isInterrupted=true,不會(huì)使當(dāng)前線程跳出鎖等待隊(duì)列,也就是說(shuō)依然在等待鎖
     * WAITING/TIMED_WAITING: interrupt()拋出InterruptedException,設(shè)置isInterrupted=false,所以根據(jù)需要,需要自己去設(shè)置中斷位
     *
     * @param timeoutNanos  納秒級(jí)別的超時(shí)時(shí)間
     * @param interruptable 是否可中斷
     * @return
     * @throws InterruptedException
     */
    private boolean await0(long timeoutNanos, boolean interruptable) throws InterruptedException {
        /**
         * 如果completed,直接返回
         */
        if (isDone()) {
            return true;
        }

        /**
         * 如果傳入的超時(shí)時(shí)間<=0,直接result
         */
        if (timeoutNanos <= 0) {
            return isDone();
        }

        /**
         * 如果可中斷 && 線程已被中斷,拋出中斷異常
         */
        if (interruptable && Thread.interrupted()) {
            throw new InterruptedException(toString());
        }

        boolean interrupted = false;
        long startTimeNanos = System.nanoTime();
        try {
            while (true) {
                synchronized (this) {
                    if (isDone()) {
                        return true;
                    }
                    incWaiters();
                    try {
                        wait(timeoutNanos / 1000000, (int) timeoutNanos % 1000000);
                    } catch (InterruptedException e) {
                        if (interruptable) {
                            throw e;
                        } else {
                            /**
                             * 對(duì)于中斷來(lái)講,拋出了中斷異常時(shí),Thread.currentThread().isInterrupted() == false,即不會(huì)設(shè)置中斷標(biāo)志位。
                             * 需要通過(guò)Thread.currentThread().interrupt()來(lái)設(shè)置中斷標(biāo)志位,來(lái)使外界自己根據(jù)中斷位去做一些事
                             * Waits for this future to be completed without interruption. 所以在最后的finally才會(huì)中斷
                             */
                            interrupted = true;
                        }
                    } finally {
                        decWaiters();
                    }
                }

                if (isDone()) {
                    return true;
                }

                /**
                 * 防護(hù)性判斷
                 */
                if (System.nanoTime() - startTimeNanos >= timeoutNanos) {
                    return isDone();
                }
            }
        } finally {
            if (interrupted) {
                /**
                 * 此時(shí)線程處于RUNNABLE狀態(tài),執(zhí)行interrupt()設(shè)置中斷標(biāo)志位
                 */
                Thread.currentThread().interrupt();
            }
        }
    }

    @Override
    public String toString() {
        StringBuilder builder = new StringBuilder();
        builder.append(this.getClass().getSimpleName()).append("@").append(Integer.toHexString(hashCode()));

        Object result = this.result;
        if (result == SUCCESS) {
            builder.append("success");
        } else if (result == UNCANCELLABLE) {
            builder.append("uncancellable");
        } else if (result instanceof Throwable) {
            builder.append(result);
        } else if (result != null) {
            builder.append("success " + result);
        } else {
            builder.append("incompleted");
        }

        return builder.toString();
    }
}

說(shuō)明:

  • notifyListeners0(List<FutureListener<V>> listeners)方法中調(diào)用listener.operationComplete(this);而this就是當(dāng)前的DefaultPromise實(shí)例。
  • 通過(guò)使用notifyingListeners屬性來(lái)實(shí)現(xiàn)監(jiān)聽器的先入先出。
  • 實(shí)現(xiàn)機(jī)制:在業(yè)務(wù)邏輯執(zhí)行前添加監(jiān)聽器addListener(FutureListener<V> listener)在執(zhí)行完業(yè)務(wù)邏輯之后,執(zhí)行setSuccess/trySuccess/setFailure/tryFailure等方法,此時(shí)會(huì)執(zhí)行notifyAll()并回調(diào)添加進(jìn)來(lái)的監(jiān)聽器。假設(shè)有線程阻塞在get()方法上時(shí),在此時(shí)會(huì)做喚醒。

注意:

  • DefaultPromise的狀態(tài)機(jī)流轉(zhuǎn)圖:見類注釋。
  • DefaultPromise可以實(shí)現(xiàn)的兩種使用機(jī)制:見類注釋。再?gòu)?qiáng)調(diào)一點(diǎn),建議使用回調(diào)方式。
  • 學(xué)習(xí)使用AtomicReferenceFieldUpdater來(lái)實(shí)現(xiàn)屬性的cas更新。
  • 學(xué)習(xí)成員變量局部化的做法:不只是防止引用的并發(fā)修改,還是優(yōu)化性能的一種方式。
  • 學(xué)習(xí)wait/notify的實(shí)現(xiàn)機(jī)制:最佳實(shí)踐見《Effective Java 中文版 第2版》的“第69條”
  • 學(xué)習(xí)線程的六種狀態(tài)與中斷對(duì)各種狀態(tài)的影響。

附、Bug記錄

  • netty的getNow有bug,我 這里 提了個(gè)issue,netty也將在4.1.28版本修復(fù)該bug。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 主要內(nèi)容 1.什么是異步 程序或系統(tǒng)中關(guān)于異步的概念使用的比較多,那么什么是異步呢?下面舉個(gè)生活中最常見的情景來(lái)進(jìn)...
    topgunviper閱讀 2,363評(píng)論 0 12
  • 譯序 本指南根據(jù) Jakob Jenkov 最新博客翻譯,請(qǐng)隨時(shí)關(guān)注博客更新:http://tutorials.j...
    高廣超閱讀 5,474評(píng)論 1 68
  • 季文子三思而后行。子聞之曰:“再,斯可矣?!比级笮?,一思再思就好了,思那么多干嘛? 是的“思”超過(guò)三次就會(huì)褪去...
    家悅hideyuki閱讀 634評(píng)論 0 0
  • 直播算時(shí)間總是算不會(huì),從癥結(jié)來(lái)看是把之前的經(jīng)驗(yàn)生搬硬套,用A的經(jīng)驗(yàn)解決B的情況。比如第四版塊時(shí)間不夠了,那前面第三...
    話木閱讀 204評(píng)論 0 0
  • 當(dāng)我每次拿起筆來(lái),準(zhǔn)備寫作的時(shí)候,腦海里也像大多數(shù)寫作者一樣,常常打幾個(gè)問(wèn)號(hào)。 我為什么喜歡寫作文?我想寫些什么?...
    鄉(xiāng)村奶奶閱讀 154評(píng)論 3 3

友情鏈接更多精彩內(nèi)容