【細(xì)談Java并發(fā)】談?wù)凢utureTask

1、簡(jiǎn)介

FutureTask是一種異步任務(wù)(或異步計(jì)算),舉個(gè)栗子,主線程的邏輯中需要使用某個(gè)值,但這個(gè)值需要負(fù)責(zé)的運(yùn)算得來(lái),那么主線程可以提前建立一個(gè)異步任務(wù)來(lái)計(jì)算這個(gè)值(在其他的線程中計(jì)算),然后去做其他事情,當(dāng)需要這個(gè)值的時(shí)候再通過(guò)剛才建立的異步任務(wù)來(lái)獲取這個(gè)值,有點(diǎn)并行的意思,這樣可以縮短整個(gè)主線程邏輯的執(zhí)行時(shí)間。

與1.6版本不同,1.7的FutureTask不再基于AQS來(lái)構(gòu)建,而是在內(nèi)部采用簡(jiǎn)單的Treiber Stack來(lái)保存等待線程。

2、框架

我們先來(lái)看看類(lèi)圖:

image

可以看到FutureTask實(shí)現(xiàn)了Runnable接口和Future接口,因此FutureTask可以傳遞到線程對(duì)象Thread或Excutor(線程池)來(lái)執(zhí)行。

如果在當(dāng)前線程中需要執(zhí)行比較耗時(shí)的操作,但又不想阻塞當(dāng)前線程時(shí),可以把這些作業(yè)交給FutureTask,另開(kāi)一個(gè)線程在后臺(tái)完成,當(dāng)當(dāng)前線程將來(lái)需要時(shí),就可以通過(guò)FutureTask對(duì)象獲得后臺(tái)作業(yè)的計(jì)算結(jié)果或者執(zhí)行狀態(tài)。

我們來(lái)看看它的構(gòu)造方法

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

可見(jiàn),構(gòu)造一個(gè)FutureTask很簡(jiǎn)單,可以通過(guò)一個(gè)Callable來(lái)構(gòu)建,也可以通過(guò)一個(gè)Runnable和一個(gè)result來(lái)構(gòu)建。

這里要注意的是必須把state的寫(xiě)放到最后,因?yàn)閟tate本身由volatile修飾,所以可以保證callable的可見(jiàn)性。(因?yàn)楹罄m(xù)讀callable之前會(huì)先讀state,還記得這個(gè)volatile寫(xiě)讀的HB規(guī)則吧)。

接下來(lái)我們看一下它的內(nèi)部結(jié)構(gòu):

public class FutureTask<V> implements RunnableFuture<V> {  
      
    /** 
     * 內(nèi)部狀態(tài)可能得遷轉(zhuǎn)過(guò)程: 
     * NEW -> COMPLETING -> NORMAL //正常完成 
     * NEW -> COMPLETING -> EXCEPTIONAL //發(fā)生異常 
     * NEW -> CANCELLED //取消 
     * NEW -> INTERRUPTING -> INTERRUPTED //中斷 
     */  
    private volatile int state;  
    private static final int NEW          = 0;  
    private static final int COMPLETING   = 1;  
    private static final int NORMAL       = 2;  
    private static final int EXCEPTIONAL  = 3;  
    private static final int CANCELLED    = 4;  
    private static final int INTERRUPTING = 5;  
    private static final int INTERRUPTED  = 6;  
    /** 內(nèi)部的callable,運(yùn)行完成后設(shè)置為null */  
    private Callable<V> callable;  
    /** 如果正常完成,就是執(zhí)行結(jié)果,通過(guò)get方法獲取;如果發(fā)生異常,就是具體的異常對(duì)象,通過(guò)get方法拋出。 */  
    private Object outcome; // 本身沒(méi)有volatile修飾, 依賴state的讀寫(xiě)來(lái)保證可見(jiàn)性。  
    /** 執(zhí)行內(nèi)部callable的線程。 */  
    private volatile Thread runner;  
    /** 存放等待線程的Treiber Stack*/  
    private volatile WaitNode waiters;  
}

內(nèi)部結(jié)構(gòu)很明確,重點(diǎn)看下WaitNode的結(jié)構(gòu)吧:

static final class WaitNode {  
    volatile Thread thread;  
    volatile WaitNode next;  
    WaitNode() { thread = Thread.currentThread(); }  
} 

這個(gè)也很簡(jiǎn)單,就是包含了當(dāng)前線程對(duì)象,并有指向下一個(gè)WaitNode的指針,所謂的Treiber Stack就是由WaitNode組成的(一個(gè)單向鏈表)。

經(jīng)常使用FutureTask的話一定會(huì)非常熟悉它的運(yùn)行過(guò)程:

  1. 創(chuàng)建任務(wù),實(shí)際使用時(shí),一般會(huì)結(jié)合線程池(ThreadPoolExecutor)使用,所以是在線程池內(nèi)部創(chuàng)建FutureTask。
  2. 執(zhí)行任務(wù),一般會(huì)有由工作線程(對(duì)于我們當(dāng)前線程來(lái)說(shuō)的其他線程)調(diào)用FutureTask的run方法,完成執(zhí)行。
  3. 獲取結(jié)果,一般會(huì)有我們的當(dāng)前線程去調(diào)用get方法來(lái)獲取執(zhí)行結(jié)果,如果獲取時(shí),任務(wù)并沒(méi)有被執(zhí)行完畢,當(dāng)前線程就會(huì)被阻塞,直到任務(wù)被執(zhí)行完畢,然后獲取結(jié)果。
  4. 取消任務(wù),某些情況下會(huì)放棄任務(wù)的執(zhí)行,進(jìn)行任務(wù)取消。

接下來(lái)我們從源碼的角度看下執(zhí)行任務(wù)過(guò)程,也就是運(yùn)行相關(guān)方法吧 。

3、源碼分析

run()

public void run() {
    // 如果state是NEW,設(shè)置線程為當(dāng)前線程
    if (state != NEW || 
        !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 調(diào)用Callable的call方法,得到結(jié)果
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                // 處理異常狀態(tài)和結(jié)果
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                // 正常處理設(shè)置狀態(tài)和結(jié)果
                set(result);
        }
    } finally {
        // runner必須在設(shè)置了state之后再置空,避免run方法出現(xiàn)并發(fā)問(wèn)題。
        runner = null;
        // 這里還必須再讀一次state,避免丟失中斷。
        int s = state;
        if (s >= INTERRUPTING)
            // 處理可能發(fā)生的取消中斷(cancel(true))。  
            handlePossibleCancellationInterrupt(s);
    }
}

看下run過(guò)程中,正常完成后調(diào)用的set方法:

/**
 * 設(shè)置結(jié)果,狀態(tài)從 NEW 變?yōu)?COMPLETING
 * 設(shè)置返回結(jié)果為t(正常結(jié)果)
 * 改變狀態(tài)從 COMPLETING 到 NORMAL
 * 調(diào)用finishCompletion完成收尾工作
 */
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

set過(guò)程中,首先嘗試將當(dāng)前任務(wù)狀態(tài)state從NEW改為COMPLETING。如果成功的話,再設(shè)置執(zhí)行結(jié)果到outcome。然后將state再次設(shè)置為NORMAL,注意這次使用的是putOrderedInt,其實(shí)就是原子量的LazySet內(nèi)部使用的方法。為什么使用這個(gè)方法?首先LazySet相對(duì)于Volatile-Write來(lái)說(shuō)更廉價(jià),因?yàn)樗鼪](méi)有昂貴的Store/Load屏障,只有Store/Store屏障(x86下Store/Store屏障是一個(gè)空操作),其次,后續(xù)線程不會(huì)及時(shí)的看到state從COMPLETING變?yōu)镹ORMAL,但這沒(méi)什么關(guān)系,而且NORMAL是state的最終狀態(tài)之一,以后不會(huì)在變化了。

上述過(guò)程最后還調(diào)用了一個(gè)finishCompletion方法:

/**
 * 遍歷waiters的next節(jié)點(diǎn),喚醒節(jié)點(diǎn)的線程并把引用變?yōu)閚ull,等待GC
 */
private void finishCompletion() {
    for (WaitNode q; (q = waiters) != null;) {
        // 嘗試將waiters設(shè)置為null。  
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 然后將waiters中的等待線程全部喚醒。  
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);  // 喚醒線程
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
    // 回調(diào)下鉤子方法。  
    done();
    // 置空callable,減少內(nèi)存占用  
    callable = null; 
}

可見(jiàn),finishCompletion主要就是在任務(wù)執(zhí)行完畢后,移除Treiber Stack,并將Treiber Stack中所有等待獲取任務(wù)結(jié)果的線程喚醒,然后回調(diào)下done鉤子方法。

看完了set,再看下run過(guò)程中如果發(fā)生異常,調(diào)用的setException方法:

/**
 * 發(fā)生異常,狀態(tài)從 NEW 變?yōu)?COMPLETING
 * 設(shè)置返回結(jié)果為t(異常結(jié)果)
 * 改變狀態(tài)從 COMPLETING 到 EXCEPTIONAL
 * 調(diào)用finishCompletion完成收尾工作
 */
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}

和set方法一個(gè)套路。

最后看下run過(guò)程中最后調(diào)用的handlePossibleCancellationInterrupt方法:

/**
 * 確保cancel(true)產(chǎn)生的中斷發(fā)生在run或runAndReset方法過(guò)程中。 
 */
private void handlePossibleCancellationInterrupt(int s) {
    // 如果當(dāng)前正在中斷過(guò)程中,自旋等待一下,等中斷完成。 
    if (s == INTERRUPTING)
        while (state == INTERRUPTING)
            Thread.yield(); // wait out pending interrupt
    // 這里的state狀態(tài)一定是INTERRUPTED;  
    // 這里不能清除中斷標(biāo)記,因?yàn)闆](méi)辦法區(qū)分來(lái)自cancel(true)的中斷。  
    // Thread.interrupted();  
}

這里總結(jié)一下run方法:

  1. 只有state為NEW的時(shí)候才執(zhí)行任務(wù)(調(diào)用內(nèi)部callable的run方法)。執(zhí)行前會(huì)原子的設(shè)置執(zhí)行線程(runner),防止競(jìng)爭(zhēng)。
  2. 如果任務(wù)執(zhí)行成功,設(shè)置執(zhí)行結(jié)果,狀態(tài)變更:NEW -> COMPLETING -> NORMAL。
  3. 如果任務(wù)執(zhí)行發(fā)生異常,設(shè)置異常結(jié)果,狀態(tài)變更:NEW -> COMPLETING -> EXCEPTIONAL。
  4. 將Treiber Stack中等待當(dāng)前任務(wù)執(zhí)行結(jié)果的等待節(jié)點(diǎn)中的線程全部喚醒,同時(shí)刪除這些等待節(jié)點(diǎn),將整個(gè)Treiber Stack置空。
  5. 最后別忘了等一下可能發(fā)生的cancel(true)中引起的中斷,讓這些中斷發(fā)生在執(zhí)行任務(wù)過(guò)程中(別泄露出去)。

runAndReset()

protected boolean runAndReset() {
    if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                    null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

該方法和run方法的區(qū)別是,run方法只能被運(yùn)行一次任務(wù),而該方法可以多次運(yùn)行任務(wù)。而runAndReset這個(gè)方法不會(huì)設(shè)置任務(wù)的執(zhí)行結(jié)果值,如果該任務(wù)成功執(zhí)行完成后,不修改state的狀態(tài),還是可運(yùn)行(NEW)狀態(tài),如果取消任務(wù)或出現(xiàn)異常,則不會(huì)再次執(zhí)行。

get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);   // 如果任務(wù)還沒(méi)執(zhí)行完畢,等待任務(wù)執(zhí)行完畢。  
    return report(s);   // 如果任務(wù)執(zhí)行完畢,獲取執(zhí)行結(jié)果。  
}

看下awaitDone方法

private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
    // 先算出到期時(shí)間。  
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            // 如果當(dāng)前線程被中斷,移除等待節(jié)點(diǎn)q,然后拋出中斷異常。
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            // 如果任務(wù)已經(jīng)執(zhí)行完畢  
            if (q != null)
                q.thread = null;    // 如果q不為null,將q中的thread置空。
            return s;   // 返回任務(wù)狀態(tài)。 
        }
        else if (s == COMPLETING) // cannot time out yet
            Thread.yield(); // 如果當(dāng)前正在完成過(guò)程中,出讓CPU。
        else if (q == null)
            q = new WaitNode(); // 創(chuàng)建一個(gè)等待節(jié)點(diǎn)。
        else if (!queued)
            // 將q(包含當(dāng)前線程的等待節(jié)點(diǎn))入隊(duì)。  
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                    q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                //如果超時(shí),移除等待節(jié)點(diǎn)q
                removeWaiter(q);
                //返回任務(wù)狀態(tài)。  
                return state;
            }
            //超時(shí)的話,就阻塞給定時(shí)間。  
            LockSupport.parkNanos(this, nanos);
        }
        else
            //沒(méi)設(shè)置超時(shí)的話,就阻塞當(dāng)前線程。
            LockSupport.park(this);
    }
}

再看下awaitDone方法中調(diào)用的removeWaiter:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        //將node的thread域置空。 
        node.thread = null;
        //下面過(guò)程中會(huì)將node從等待隊(duì)列中移除,以thread域?yàn)閚ull為依據(jù),  
        //如果過(guò)程中發(fā)生了競(jìng)爭(zhēng),重試。  
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                if (q.thread != null)
                    pred = q;
                else if (pred != null) {
                    pred.next = s;
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                        q, s))
                    continue retry;
            }
            break;
        }
    }
}

再看下get方法中獲取結(jié)果時(shí)調(diào)用的report:

private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

report如果是正常狀態(tài),就返回結(jié)果。否則拋出異常。

看完了get方法,再看下get(long timeout, TimeUnit unit)方法:

public V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
            (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

小結(jié)一下get方法:

  1. 首先檢查當(dāng)前任務(wù)的狀態(tài),如果狀態(tài)表示執(zhí)行完成,進(jìn)入第2步。
  2. 獲取執(zhí)行結(jié)果,也可能得到取消或者執(zhí)行異常,get過(guò)程結(jié)束。
  3. 如果當(dāng)前任務(wù)狀態(tài)表示未執(zhí)行或者正在執(zhí)行,那么當(dāng)前線程放入一個(gè)新建的等待節(jié)點(diǎn),然后進(jìn)入Treiber Stack進(jìn)行阻塞等待。
  4. 如果任務(wù)被工作線程(對(duì)當(dāng)前線程來(lái)說(shuō)是其他線程)執(zhí)行完畢,執(zhí)行完畢時(shí)工作線程會(huì)喚醒Treiber Stack上等待的所有線程,所以當(dāng)前線程被喚醒,清空當(dāng)前等待節(jié)點(diǎn)上的線程域,然后進(jìn)入第2步。
  5. 當(dāng)前線程在阻塞等待結(jié)果過(guò)程中可能被中斷,如果被中斷,那么會(huì)移除當(dāng)前線程在Treiber Stack上對(duì)應(yīng)的等待節(jié)點(diǎn),然后拋出中斷異常,get過(guò)程結(jié)束。
  6. 當(dāng)前線程也可能執(zhí)行帶有超時(shí)時(shí)間的阻塞等待,如果超時(shí)時(shí)間過(guò)了,還沒(méi)得到執(zhí)行結(jié)果,那么會(huì)除當(dāng)前線程在Treiber Stack上對(duì)應(yīng)的等待節(jié)點(diǎn),然后拋出超時(shí)異常,get過(guò)程結(jié)束。

cancel(boolean)

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
            UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                    mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {   
        // mayInterruptIfRunning并且有正在運(yùn)行的線程,調(diào)用interrupt中斷,最后設(shè)置狀態(tài)為INTERRUPTED
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}

cancel分為兩種情況:

  1. mayInterruptIfRunning == true。這個(gè)時(shí)候狀態(tài)從 NEW 變?yōu)?INTERRUPTING ,如果有正在運(yùn)行的線程,調(diào)用interrupt中斷,最后把狀態(tài)從 INTERRUPTING 變?yōu)?INTERRUPTED。
  2. mayInterruptIfRunning == false。這個(gè)時(shí)候狀態(tài)從 NEW 變?yōu)?CANCELLED。
  3. 最后都會(huì)執(zhí)行finishCompletion方法,完成結(jié)束的收尾工作。喚醒所有在get()方法等待的線程。

4、jdk1.6不同的地方

為什么jdk 1.6以后的FutureTask不像1.6那樣基于AQS構(gòu)建了?

首先,前面貼代碼了時(shí)候故意去掉了一些注釋,避免讀代碼的時(shí)候受影響,現(xiàn)在我們來(lái)看一下關(guān)鍵的一段:

/* 
 * Revision notes: This differs from previous versions of this 
 * class that relied on AbstractQueuedSynchronizer, mainly to 
 * avoid surprising users about retaining interrupt status during 
 * cancellation races.  
 */  

主要是這句:mainly to avoid surprising users about retaining interrupt status during cancellation races。

大概意思是:使用AQS的方式,可能會(huì)在取消發(fā)生競(jìng)爭(zhēng)過(guò)程中詭異的保留了中斷狀態(tài)。這里之所以沒(méi)有采用這種方式,是為了避免這種情況的發(fā)生。

具體什么情況下會(huì)發(fā)生呢?

ThreadPoolExecutor executor = ...;  
executor.submit(task1).cancel(true);  
executor.submit(task2);  

看上面的代碼,雖然中斷的是task1,但可能task2得到中斷信號(hào)。

原因是什么呢?看下JDK1.6的FutureTask的中斷代碼:

boolean innerCancel(boolean mayInterruptIfRunning) {  
 for (;;) {  
  int s = getState();  
  if (ranOrCancelled(s))  
      return false;  
  if (compareAndSetState(s, CANCELLED))  
      break;  
 }  
    if (mayInterruptIfRunning) {  
        Thread r = runner;  
        if (r != null)  //第1行  
            r.interrupt(); //第2行  
    }  
    releaseShared(0);  
    done();  
    return true;  
}  

結(jié)合上面代碼例子看一下,如果主線程執(zhí)行到第1行的時(shí)候,線程池可能會(huì)認(rèn)為task1已經(jīng)執(zhí)行結(jié)束(被取消),然后讓之前執(zhí)行task1工作線程去執(zhí)行task2,工作線程開(kāi)始執(zhí)行task2之后,然后主線程執(zhí)行第2行(我們會(huì)發(fā)現(xiàn)并沒(méi)有任何同步機(jī)制來(lái)阻止這種情況的發(fā)生),這樣就會(huì)導(dǎo)致task2被中斷了。更多的相關(guān)信息參考這個(gè)Bug說(shuō)明

所以現(xiàn)在就能更好的理解JDK1.7 FutureTask的handlePossibleCancellationInterrupt中為什么要將cancel(true)中的中斷保留在當(dāng)前run方法運(yùn)行范圍內(nèi)了吧!

JDK1.7的FutureTask的代碼解析完畢!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容