多線程-源碼解析RunnableFuture

/**
 * A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.
 * @see FutureTask
 * @see Executor
 * @since 1.6
 * @author Doug Lea
 * @param <V> The result type returned by this Future's {@code get} method
 */
public interface RunnableFuture<V> extends Runnable, Future<V> {
    /**
     * Sets this Future to the result of its computation
     * unless it has been cancelled.
     */
    void run();
}

A {@link Future} that is {@link Runnable}. Successful execution of
 * the {@code run} method causes completion of the {@code Future}
 * and allows access to its results.

譯文:可運行的Future。 成功執(zhí)行run方法會導(dǎo)致Future的完成,并允許訪問其結(jié)果。

從源碼可知,RunnableFuture繼承了Future和Runnable,所以其就具有了可以運行在線程中,能夠取消并且能夠異步獲取到運行結(jié)果。接下來學(xué)習(xí)一下它的一個具體實現(xiàn)類FutureTask

概論

/**
 * A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).
 *
 * <p>A {@code FutureTask} can be used to wrap a {@link Callable} or
 * {@link Runnable} object.  Because {@code FutureTask} implements
 * {@code Runnable}, a {@code FutureTask} can be submitted to an
 * {@link Executor} for execution.
 *
 * <p>In addition to serving as a standalone class, this class provides
 * {@code protected} functionality that may be useful when creating
 * customized task classes.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <V> The result type returned by this FutureTask's {@code get} methods
 */
public class FutureTask<V> implements RunnableFuture<V>
* A cancellable asynchronous computation.  This class provides a base
 * implementation of {@link Future}, with methods to start and cancel
 * a computation, query to see if the computation is complete, and
 * retrieve the result of the computation.  The result can only be
 * retrieved when the computation has completed; the {@code get}
 * methods will block if the computation has not yet completed.  Once
 * the computation has completed, the computation cannot be restarted
 * or cancelled (unless the computation is invoked using
 * {@link #runAndReset}).

譯文:可取消的異步計算。 此類提供Future的基本實現(xiàn),其中包含啟動和取消計算,查詢計算是否完成以及返回計算結(jié)果的方法。 只有在計算完成后才能返回結(jié)果; 如果計算尚未完成,則get方法將阻塞。 一旦計算完成,除非使用runAndReset調(diào)用計算,否則無法重新啟動或取消計算。

FutureTask主要成員

public class FutureTask<V> implements RunnableFuture<V> {
     /*
     * FutureTask中定義了一個state變量,用于記錄任務(wù)執(zhí)行的相關(guān)狀態(tài) ,狀態(tài)的變化過程如下
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    //主流程狀態(tài)
    private static final int NEW = 0; //當(dāng)FutureTask實例剛剛創(chuàng)建到callbale的call方法執(zhí)行完成前,處于此狀態(tài)
    private static final int COMPLETING  = 1; //callable的call方法執(zhí)行完成或出現(xiàn)異常時,首先進行此狀態(tài)
    private static final int NORMAL    = 2;//callable的call方法正常結(jié)束時,進入此狀態(tài),將outcom設(shè)置為正常結(jié)果
    private static final int EXCEPTIONAL = 3;//callable的call方法異常結(jié)束時,進入此狀態(tài),將outcome設(shè)置為拋出的異常
    //取消任務(wù)執(zhí)行時可能處于的狀態(tài)
    private static final int CANCELLED= 4;// FutureTask任務(wù)尚未執(zhí)行,即還在任務(wù)隊列的時候,調(diào)用了cancel方法,進入此狀態(tài)
    private static final int INTERRUPTING = 5;// FutureTask的run方法已經(jīng)在執(zhí)行,收到中斷信號,進入此狀態(tài)
    private static final int INTERRUPTED  = 6;// 任務(wù)成功中斷后,進入此狀態(tài)
    
    private Callable<V> callable;//需要執(zhí)行的任務(wù),提示:如果提交的是Runnable對象,會先轉(zhuǎn)換為Callable對象,這是構(gòu)造方法參數(shù)
    private Object outcome; //任務(wù)運行的結(jié)果
    private volatile Thread runner;//執(zhí)行此任務(wù)的線程
  
    //等待該FutureTask的線程鏈表,對于同一個FutureTask,如果多個線程調(diào)用了get方法,對應(yīng)的線程都會加入到waiters鏈表中,同時當(dāng)FutureTask執(zhí)行完成后,也會告知所有waiters中的線程
    private volatile WaitNode waiters;
    ......
}

FutureTask的成員變量并不復(fù)雜,主要記錄以下幾部分信息:
1、狀態(tài)
2、任務(wù)(callable)
3、結(jié)果(outcome)
4、等待線程(waiters)

構(gòu)造方法

Future 是一個接口,而 FutureTask 是一個實實在在的工具類,這個工具類有兩個構(gòu)造函數(shù),它們的參數(shù)和前面介紹的 submit() 方法類似。


FutureTask(Callable<V> callable);
FutureTask(Runnable runnable, V result);

FutureTask的方法

run()方法

FutureTask執(zhí)行任務(wù)的方法當(dāng)然還是run方法:

public void run() {
        if (state != NEW ||
            !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //執(zhí)行任務(wù)
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //設(shè)置結(jié)果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
               //判斷該任務(wù)是否正在響應(yīng)中斷,如果中斷沒有完成,則等待中斷操作完成
                handlePossibleCancellationInterrupt(s);
        }
    }

run方法的大概邏輯如下:
1、如果狀態(tài)不為new或者運行線程runner失敗,說明當(dāng)前任務(wù)已經(jīng)被其他線程啟動或者已經(jīng)被執(zhí)行過,直接返回false
2、調(diào)用call方法執(zhí)行核心任務(wù)邏輯。如果調(diào)用成功則執(zhí)行set(result)方法,將state狀態(tài)設(shè)置成NORMAL。如果調(diào)用失敗拋出異常則執(zhí)行setException(ex)方法,將state狀態(tài)設(shè)置成EXCEPTIONAL,喚醒所有在get()方法上等待的線程
3、如果當(dāng)前狀態(tài)為INTERRUPTING(步驟2已CAS失敗),則一直調(diào)用Thread.yield()直至狀態(tài)不為INTERRUPTING

set()方法

除非已經(jīng)設(shè)置或取消了該Future,否則將此Future的結(jié)果設(shè)置為給定值

/**
     * Sets the result of this future to the given value unless
     * this future has already been set or has been cancelled.
     *
     * <p>This method is invoked internally by the {@link #run} method
     * upon successful completion of the computation.
     *
     * @param v the value
     */
    protected void set(V v) {
        if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
            outcome = v;
            U.putOrderedInt(this, STATE, NORMAL); // final state
            finishCompletion();
        }
    }

finishCompletion()

刪除并發(fā)送所有等待線程的信號,調(diào)用done(),并使callable無效。

 /**
     * Removes and signals all waiting threads, invokes done(), and
     * nulls out callable.
     */
    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
             ////通過CAS把棧頂?shù)脑刂脼閚ull,相當(dāng)于彈出棧頂元素
            if (U.compareAndSwapObject(this, WAITERS, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                          //喚醒等待的線程
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

finishCompletion的邏輯也比較簡單:
1、遍歷waiters鏈表,取出每一個節(jié)點:每個節(jié)點都代表一個正在等待該FutureTask結(jié)果(即調(diào)用過get方法)的線程
2、通過 LockSupport.unpark(t)喚醒每一個節(jié)點,通知每個線程,該任務(wù)執(zhí)行完成

get()方法

get方法很簡單,主要就是調(diào)用awaitDone方法:

/**
     * @throws CancellationException {@inheritDoc}
     */
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

    /**
     * @throws CancellationException {@inheritDoc}
     */
    public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (unit == null)
            throw new NullPointerException();
        int s = state;
        if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
            throw new TimeoutException();
        return report(s);
    }

awaitDone()

/**
     * Awaits completion or aborts on interrupt or timeout.
     *
     * @param timed true if use timed waits
     * @param nanos time to wait, if timed
     * @return state upon completion or at timeout
     */
    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        // The code below is very delicate, to achieve these goals:
        // - call nanoTime exactly once for each call to park
        // - if nanos <= 0L, return promptly without allocation or nanoTime
        // - if nanos == Long.MIN_VALUE, don't underflow
        // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
        //   and we suffer a spurious wakeup, we will do no worse than
        //   to park-spin for a while
        long startTime = 0L;    // Special value 0L means not yet parked
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            int s = state;
            if (s > COMPLETING) { //如果state狀態(tài)大于COMPLETING 則說明任務(wù)執(zhí)行完成,或取消
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING)//如果state=COMPLETING,則使用yield,因為此狀態(tài)的時間特別短,通過yield比掛起響應(yīng)更快。
                // We may have already promised (via isDone) that we are done
                // so never return empty-handed or throw InterruptedException
                Thread.yield();
            else if (Thread.interrupted()) { //如果該線程執(zhí)行interrupt()方法,則從隊列中移除該節(jié)點,并拋出異常
                removeWaiter(q);
                throw new InterruptedException();
            }
            else if (q == null) { //構(gòu)建節(jié)點
                if (timed && nanos <= 0L)
                    return s;
                q = new WaitNode();
            }
            else if (!queued)//把當(dāng)前節(jié)點入棧
                queued = U.compareAndSwapObject(this, WAITERS,
                                                q.next = waiters, q);
             //如果需要阻塞指定時間,則使用LockSupport.parkNanos阻塞指定時間
            //如果到指定時間還沒執(zhí)行完,則從隊列中移除該節(jié)點,并返回當(dāng)前狀態(tài)
            else if (timed) {
                final long parkNanos;
                if (startTime == 0L) { // first time
                    startTime = System.nanoTime();
                    if (startTime == 0L)
                        startTime = 1L;
                    parkNanos = nanos;
                } else {
                    long elapsed = System.nanoTime() - startTime;
                    if (elapsed >= nanos) {
                        removeWaiter(q);
                        return state;
                    }
                    parkNanos = nanos - elapsed;
                }
                // nanoTime may be slow; recheck before parking
                if (state < COMPLETING)
                    LockSupport.parkNanos(this, parkNanos);//阻塞當(dāng)前線程nanos秒
            }
            else
                LockSupport.park(this);//阻塞當(dāng)前線程
        }
    }

整個方法的大致邏輯主要分為以下幾步:
1>如果當(dāng)前狀態(tài)值大于COMPLETING,說明已經(jīng)執(zhí)行完成或者取消,直接返回
2>如果state=COMPLETING,則使用yield,因為此狀態(tài)的時間特別短,通過yield比掛起響應(yīng)更快
3>如果當(dāng)前線程是首次進入循環(huán),為當(dāng)前線程創(chuàng)建wait節(jié)點加入到waiters鏈表中
4>根據(jù)是否定時將當(dāng)前線程掛起(LockSupport.parkNanos LockSupport.park)來阻塞當(dāng)前線程,直到超時或者線程被finishCompletion方法喚醒
5>當(dāng)線程掛起超時或者被喚醒后,重新循環(huán)執(zhí)行上述邏輯

cancel()

 public boolean cancel(boolean mayInterruptIfRunning) {
        //根據(jù)mayInterruptIfRunning是否為true,CAS設(shè)置狀態(tài)為INTERRUPTING或CANCELLED,設(shè)置成功,繼續(xù)第二步,否則返回false
        if (!(state == NEW &&
              U.compareAndSwapInt(this, STATE, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {//如果mayInterruptIfRunning為true,調(diào)用runner.interupt(),設(shè)置狀態(tài)為INTERRUPTED
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    U.putOrderedInt(this, STATE, INTERRUPTED);
                }
            }
        } finally {
            //喚醒所有在get()方法等待的線程
            finishCompletion();
        }
        return true;
    }
public class FutureTaskDemo {
    public static void main(String[] args) {
        FutureTask<String> futureTask=new FutureTask<>(new Runnable(){

            @Override
            public void run() {
                try {
                    Thread.sleep(20*1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },"Hello world");
        Thread thread = new Thread(futureTask,"Thread Future");
        thread.start();
        try {
            String result=futureTask.get();
            System.out.println(result);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }

}

運行時線程堆棧信息

D:\android\progect\IdeaDemo>jps
10624 Launcher
1296 FutureTaskDemo
5360 Jps
12872

D:\android\progect\IdeaDemo>jstack 1296
2021-05-13 00:03:51
Full thread dump Java HotSpot(TM) Client VM (25.202-b08 mixed mode, sharing):

"Thread Future" #9 prio=5 os_prio=0 tid=0x15a1d800 nid=0x2040 waiting on condition [0x15cef000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
        at java.lang.Thread.sleep(Native Method)
        at com.idea.future.FutureTaskDemo$1.run(FutureTaskDemo.java:13)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:748)
"main" #1 prio=5 os_prio=0 tid=0x02a0dc00 nid=0x18a8 waiting on condition [0x00d4f000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x04cfd7e8> (a java.util.concurrent.FutureTask)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.FutureTask.awaitDone(FutureTask.java:429)
        at java.util.concurrent.FutureTask.get(FutureTask.java:191)
        at com.idea.future.FutureTaskDemo.main(FutureTaskDemo.java:22)

由于我在run方法里面設(shè)置了相當(dāng)于耗時的操作,從堆棧信息可以得知,"Thread Future"的狀態(tài)是TIMED_WAITING,而主線程"main"則處于WAITING的狀態(tài)

實現(xiàn)了 Runnable 接口,所以可以將 FutureTask 對象作為任務(wù)提交給 ThreadPoolExecutor 去執(zhí)行,也可以直接被 Thread 執(zhí)行;

又因為實現(xiàn)了 Future 接口,所以也能用來獲得任務(wù)的執(zhí)行結(jié)果。

提交給 ThreadPoolExecutor 去執(zhí)行

下面的示例代碼是將 FutureTask 對象提交給 ThreadPoolExecutor 去執(zhí)行。


// 創(chuàng)建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 創(chuàng)建線程池
ExecutorService es = 
  Executors.newCachedThreadPool();
// 提交FutureTask 
es.submit(futureTask);
// 獲取計算結(jié)果
Integer result = futureTask.get();

直接被 Thread 執(zhí)行

FutureTask 對象直接被 Thread 執(zhí)行的示例代碼如下所示。相信你已經(jīng)發(fā)現(xiàn)了,利用 FutureTask 對象可以很容易獲取子線程的執(zhí)行結(jié)果。


// 創(chuàng)建FutureTask
FutureTask<Integer> futureTask
  = new FutureTask<>(()-> 1+2);
// 創(chuàng)建并啟動線程
Thread T1 = new Thread(futureTask);
T1.start();
// 獲取計算結(jié)果
Integer result = futureTask.get();

總結(jié)

利用 Java 并發(fā)包提供的 Future 可以很容易獲得異步任務(wù)的執(zhí)行結(jié)果,無論異步任務(wù)是通過線程池 ThreadPoolExecutor 執(zhí)行的,還是通過手工創(chuàng)建子線程來執(zhí)行的。

利用多線程可以快速將一些串行的任務(wù)并行化,從而提高性能;如果任務(wù)之間有依賴關(guān)系,比如當(dāng)前任務(wù)依賴前一個任務(wù)的執(zhí)行結(jié)果,這種問題基本上都可以用 Future 來解決。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容