源碼篇-FutureTask

一、核心成員變量

1. task狀態(tài)
private volatile int state;
// 任務(wù)的初始狀態(tài)都是NEW, 這一點(diǎn)是構(gòu)造函數(shù)保證的
private static final int NEW          = 0;
// 任務(wù)結(jié)束,正在設(shè)置任務(wù)結(jié)果,這是任務(wù)可能正常執(zhí)行完成,可能拋異常了,可能被取消了,反正任務(wù)已經(jīng)不在跑了
private static final int COMPLETING   = 1;
// 任務(wù)正常執(zhí)行完畢
private static final int NORMAL       = 2;
// 任務(wù)執(zhí)行過程中發(fā)生異常
private static final int EXCEPTIONAL  = 3;
// 任務(wù)被取消
private static final int CANCELLED    = 4;
// 正在中斷運(yùn)行任務(wù)的線程
private static final int INTERRUPTING = 5;
// 任務(wù)被中斷
private static final int INTERRUPTED  = 6;
  • 總共有7種狀態(tài):包括了1個(gè)初始態(tài),2個(gè)中間態(tài)和4個(gè)終止態(tài)
    • 任務(wù)的初始狀態(tài)都是NEW, 這一點(diǎn)是構(gòu)造函數(shù)保證的
    • 任務(wù)的中間狀態(tài)有2種
      • COMPLETING 正在設(shè)置任務(wù)結(jié)果
      • INTERRUPTING 正在中斷運(yùn)行任務(wù)的線程
    • 任務(wù)的終止?fàn)顟B(tài)有4種
      • NORMAL:任務(wù)正常執(zhí)行完畢
      • EXCEPTIONAL:任務(wù)執(zhí)行過程中發(fā)生異常
      • CANCELLED:任務(wù)被取消
      • INTERRUPTED:任務(wù)被中斷
  • 任務(wù)的中間態(tài)并不代表任務(wù)正在執(zhí)行,而是任務(wù)已經(jīng)執(zhí)行完了,正在設(shè)置最終的返回結(jié)果,其實(shí)只要state不處于 NEW 狀態(tài),就說明任務(wù)已經(jīng)執(zhí)行完畢
  • 可能的狀態(tài)變化
    • NEW -> COMPLETING -> NORMAL:任務(wù)正常完成
    • NEW -> COMPLETING -> EXCEPTIONAL:執(zhí)行任務(wù)時(shí)拋出了異常
    • NEW -> CANCELLED: 執(zhí)行任務(wù)時(shí)被取消且mayInterruptIfRunning參數(shù)為false
    • NEW -> INTERRUPTING -> INTERRUPTED:執(zhí)行任務(wù)時(shí)被取消且mayInterruptIfRunning參數(shù)為true
2. 其它
// 任務(wù)本身
private Callable<V> callable;

// 任務(wù)的結(jié)果
private Object outcome; // non-volatile, protected by state reads/writes

// 任務(wù)的執(zhí)行者,指得是線程
private volatile Thread runner;
  • callable屬性代表了要執(zhí)行的任務(wù)本身,即FutureTask中的“Task”部分,為Callable類型,這里之所以用Callable而不用Runnable是因?yàn)镕utureTask實(shí)現(xiàn)了Future接口,需要獲取任務(wù)的執(zhí)行結(jié)果
  • outcome屬性代表了任務(wù)的執(zhí)行結(jié)果或者拋出的異常,為Object類型,也就是說outcome可以是任意類型的對(duì)象,所以當(dāng)我們將正常的執(zhí)行結(jié)果返回給調(diào)用者時(shí),需要進(jìn)行強(qiáng)制類型轉(zhuǎn)換,返回由Callable定義的V類型
  • runner屬性代表執(zhí)行任務(wù)的線程,在執(zhí)行run方法時(shí)會(huì)賦值

二、內(nèi)部類WaitNode

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}
  • WaitNode是個(gè)單向鏈表,用來保存所有等待任務(wù)執(zhí)行完畢的線程的集合
  • WaitNode只包含了一個(gè)記錄線程的thread屬性和指向下一個(gè)節(jié)點(diǎn)的next屬性
  • WaitNode是當(dāng)做Treiber棧來使用的,同一時(shí)刻可能有多個(gè)線程都在獲取任務(wù)的執(zhí)行結(jié)果,如果任務(wù)還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現(xiàn)多個(gè)線程同時(shí)入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對(duì)于出棧的情況也是同理

三、重要方法

1. run方法
public void run() {
    // state != NEW 表示任務(wù)已經(jīng)跑完了;
     // !RUNNER.compareAndSet(this, null, Thread.currentThread())表示已經(jīng)有線程在執(zhí)行這個(gè)任務(wù)了
    // 滿足上面其中一個(gè)條件就沒必要再執(zhí)行這個(gè)任務(wù)了,直接返回
    if (state != NEW ||
        !RUNNER.compareAndSet(this, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        // 任務(wù)不為空,且狀態(tài)是NEW,那么就執(zhí)行任務(wù)
        // 這里又判斷了state == NEW是防止任務(wù)被外部線程中斷或者取消
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 執(zhí)行任務(wù)并返回結(jié)果
                result = c.call();
                // 執(zhí)行完成
                ran = true;
            // 執(zhí)行任務(wù)時(shí)拋出了異常   
            } catch (Throwable ex) {
                // 執(zhí)行結(jié)果為空
                result = null;
                // 執(zhí)行失敗
                ran = false;
                // 設(shè)置異常
                setException(ex);
            }
            if (ran)
                // 如果執(zhí)行成功,設(shè)置執(zhí)行結(jié)果
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        
        // 最后將執(zhí)行者置為空
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        // 如果線程是中間狀態(tài)正在被打斷,將while循環(huán)至中間狀態(tài)被變成最終狀態(tài)
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}
  • 將runner屬性設(shè)置成當(dāng)前正在執(zhí)行run方法的線程
  • 調(diào)用callable成員變量的call方法來執(zhí)行任務(wù)
  • 設(shè)置執(zhí)行結(jié)果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結(jié)果;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常,設(shè)置結(jié)果之前,先將state狀態(tài)設(shè)為中間態(tài)
  • 對(duì)outcome的賦值完成后,設(shè)置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL)
  • 喚醒Treiber棧中所有等待的線程
  • 善后清理(waiters, callable,runner設(shè)為null)
  • 檢查是否有遺漏的中斷,如果有,等待中斷狀態(tài)完成
protected void set(V v) {
    // 如果是NEW狀態(tài),將狀態(tài)改為COMPLETING
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        // 設(shè)置返回結(jié)果
        outcome = v;
        // 將狀態(tài)改為NORMAL
        STATE.setRelease(this, NORMAL); // final state
        // 主要是喚醒waiters里的線程
        finishCompletion();
    }
}
protected void setException(Throwable t) {
    if (STATE.compareAndSet(this, NEW, COMPLETING)) {
        // 這里是把異常對(duì)象賦給outcome
        outcome = t;
        // 將狀態(tài)變成EXCEPTIONAL
        STATE.setRelease(this, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
private void finishCompletion() {
    // assert state > COMPLETING;
    
    // 遍歷等待獲取任務(wù)執(zhí)行結(jié)果的線程
    for (WaitNode q; (q = waiters) != null;) {
        // 將棧頂元素先置為空
        if (WAITERS.weakCompareAndSet(this, q, null)) {
            for (;;) {
                // 獲取等待的線程
                Thread t = q.thread;
                // 如果線程不為空,將線程置為空,并unpark
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                // 獲取下一個(gè)結(jié)點(diǎn)
                WaitNode next = q.next;
                // 如果下一個(gè)結(jié)點(diǎn)為空,直接退出里面的循環(huán),
                // 由于隊(duì)列的當(dāng)前結(jié)點(diǎn)已經(jīng)置為空,所以外面的循環(huán)也退出了
                if (next == null)
                    break;
                // 將當(dāng)前結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)置為空
                q.next = null; // unlink to help gc
                // 下一個(gè)結(jié)點(diǎn)作為新的棧頂結(jié)點(diǎn)
                q = next;
            }
            break;
        }
    }

    // 空方法,留給子類實(shí)現(xiàn)
    done();

    // 任務(wù)置為空
    callable = null;        // to reduce footprint
}
2. get方法
public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        // 返回任務(wù)運(yùn)行的狀態(tài)
        s = awaitDone(false, 0L);
    // 返回任務(wù)運(yùn)行的結(jié)果
    return report(s);
}
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    // The code below is very delicate, to achieve these goals:
    // - call nanoTime exactly once for each call to park
    // - if nanos <= 0L, return promptly without allocation or nanoTime
    // - if nanos == Long.MIN_VALUE, don't underflow
    // - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
    //   and we suffer a spurious wakeup, we will do no worse than
    //   to park-spin for a while
    long startTime = 0L;    // Special value 0L means not yet parked
    
    // 代表即將被放入棧中的結(jié)點(diǎn)
    WaitNode q = null;
    
    // 是否放入棧中,一開始為false
    boolean queued = false;
    for (;;) {
        // 獲取任務(wù)的狀態(tài)
        int s = state;
        // 大于COMPLETING代表任務(wù)執(zhí)行完成
        if (s > COMPLETING) {
            // 如果已經(jīng)創(chuàng)建了WaitNode結(jié)點(diǎn),將等待的線程置為空,最后返回任務(wù)運(yùn)行狀態(tài)
            if (q != null)
                q.thread = null;
            return s;
        }
        // 任務(wù)是COMPLETING狀態(tài),說明正在處理結(jié)果,讓出線程調(diào)度,等下一次循環(huán)再判斷狀態(tài)
        else if (s == COMPLETING)
            // We may have already promised (via isDone) that we are done
            // so never return empty-handed or throw InterruptedException
            Thread.yield();
        // 如果等待的線程被打斷了,從等待隊(duì)列里刪除該隊(duì)列,并拋出異常
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        // 走到這,說明任務(wù)還未完成,再判斷是否有等待時(shí)間,如果等待時(shí)間到了,返回狀態(tài),如果沒到,創(chuàng)建結(jié)點(diǎn)
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        // 走到這,說明任務(wù)還是未完成,且等待結(jié)點(diǎn)已經(jīng)創(chuàng)建了,那么將結(jié)點(diǎn)放入棧中
        else if (!queued)
            queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
        // 如果有等待時(shí)間
        else if (timed) {
            final long parkNanos;
            // 還沒初始化等待的一開始時(shí)間
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            // 防止虛假喚醒,重新計(jì)算park時(shí)間
            } else {
                long elapsed = System.nanoTime() - startTime;
                // 如果等待時(shí)間到了,那么將結(jié)點(diǎn)從棧中移除
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // nanoTime may be slow; recheck before parking
            
            // park指定時(shí)長
            if (state < COMPLETING)
                LockSupport.parkNanos(this, parkNanos);
        }
        // 將當(dāng)前線程阻塞
        else
            LockSupport.park(this);
    }
}
  • 如果任務(wù)已經(jīng)進(jìn)入終止態(tài)(s > COMPLETING),我們就直接返回任務(wù)的狀態(tài);
  • 否則,如果任務(wù)正在設(shè)置執(zhí)行結(jié)果(s == COMPLETING),我們就讓出當(dāng)前線程的CPU資源繼續(xù)等待
  • 否則,就說明任務(wù)還沒有執(zhí)行,或者任務(wù)正在執(zhí)行過程中,那么這時(shí),如果q現(xiàn)在還為null, 說明當(dāng)前線程還沒有進(jìn)入等待隊(duì)列,于是我們新建了一個(gè)WaitNode, WaitNode的構(gòu)造函數(shù)我們之前已經(jīng)看過了,就是生成了一個(gè)記錄了當(dāng)前線程的節(jié)點(diǎn);
  • 如果q不為null,說明代表當(dāng)前線程的WaitNode已經(jīng)被創(chuàng)建出來了,則接下來如果queued=false,表示當(dāng)前線程還沒有入隊(duì),執(zhí)行入棧操作
  • 如果以上的條件都不滿足,則再接下來因?yàn)楝F(xiàn)在是不帶超時(shí)機(jī)制的get,timed為false,則else if代碼塊跳過,然后來到最后一個(gè)else, 把當(dāng)前線程掛起,此時(shí)線程就處于阻塞等待的狀態(tài)
  • 什么時(shí)候喚醒被掛起的線程
    • 任務(wù)執(zhí)行完畢了,在finishCompletion方法中會(huì)喚醒所有在Treiber棧中等待的線程
    • 等待的線程自身因?yàn)楸恢袛嗟仍蚨粏拘?/li>
private void removeWaiter(WaitNode node) {
    if (node != null) {
        // 現(xiàn)將線程置為空
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next;
                // 因?yàn)橐呀?jīng)將待刪除結(jié)點(diǎn)的thread置為空了,這里不為空說明還沒遍歷到待刪除的結(jié)點(diǎn)
                if (q.thread != null)
                    pred = q;
                // 走到這里,說明找到了待刪除結(jié)點(diǎn),且待刪除結(jié)點(diǎn)不在棧頂
                else if (pred != null) {
                    // 將待刪除結(jié)點(diǎn)的前一個(gè)結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)指向待刪除結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)
                    pred.next = s;
                    // removeWaiter是可以并發(fā)執(zhí)行的,如果有其它線程將pred結(jié)點(diǎn)的thread置為空,再往后遍歷已經(jīng)無意義了,所以再重頭遍歷
                    if (pred.thread == null) // check for race
                        continue retry;
                }
                // 說明刪除的結(jié)點(diǎn)是在棧頂,將棧頂替換成下一個(gè)結(jié)點(diǎn)
                else if (!WAITERS.compareAndSet(this, q, s))
                    continue retry;
            }
            break;
        }
    }
}
  • 傳入一個(gè)需要移除的節(jié)點(diǎn),我們會(huì)將這個(gè)節(jié)點(diǎn)的thread屬性設(shè)置成null,以標(biāo)記該節(jié)點(diǎn)
  • 無論如何,會(huì)遍歷整個(gè)鏈表,清除那些被標(biāo)記的節(jié)點(diǎn)(只是簡(jiǎn)單的將節(jié)點(diǎn)從鏈表中剔除)
  • 如果要清除的節(jié)點(diǎn)就位于棧頂,則還需要注意重新設(shè)置waiters的值,指向新的棧頂節(jié)點(diǎn)
  • 雖說removeWaiter方法傳入了需要剔除的節(jié)點(diǎn),但是事實(shí)上它可能剔除的不止是傳入的節(jié)點(diǎn),而是所有已經(jīng)被標(biāo)記了的節(jié)點(diǎn),這樣不僅清除操作容易了些(不需要專門去定位傳入的node在哪里),而且提升了效率(可以同時(shí)清除所有已經(jīng)被標(biāo)記的節(jié)點(diǎn))
private V report(int s) throws ExecutionException {
    Object x = outcome;
    // 任務(wù)正常完成,返回結(jié)果值
    if (s == NORMAL)
        return (V)x;
    // 任務(wù)未正常完成,拋出異常 
    if (s >= CANCELLED)
        throw new CancellationException();
    // 拋出異常
    throw new ExecutionException((Throwable)x);
}
3. cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
    // 如果任務(wù)狀態(tài)不是NEW,說明任務(wù)完成,直接返回false,無法取消
    if (!(state == NEW && STATE.compareAndSet
          (this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        // 如果mayInterruptIfRunning為真,打斷執(zhí)行任務(wù)的線程
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                STATE.setRelease(this, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  • 任務(wù)完成了,也就是非NEW狀態(tài),或者狀態(tài)更新失敗,那么返回false,也就是取消失敗,也就是說如果任務(wù)完成了或者任務(wù)未完成但是無法終止,就會(huì)取消失敗
  • 其它情況返回true,也就是取消成功,但是cancel操作返回true并不代表任務(wù)真的就是被取消了
    • 如果發(fā)起cancel時(shí)任務(wù)還沒有開始運(yùn)行,則隨后任務(wù)就不會(huì)被執(zhí)行;
    • 如果發(fā)起cancel時(shí)任務(wù)已經(jīng)在運(yùn)行了,則這時(shí)就需要看mayInterruptIfRunning參數(shù)了:
      • 如果mayInterruptIfRunning 為true, 則當(dāng)前在執(zhí)行的任務(wù)會(huì)被中斷
      • 如果mayInterruptIfRunning 為false, 則可以允許正在執(zhí)行的任務(wù)繼續(xù)運(yùn)行,直到它執(zhí)行完
  • cancel方法實(shí)際上完成以下兩種狀態(tài)轉(zhuǎn)換之一
    • NEW -> CANCELLED (對(duì)應(yīng)于mayInterruptIfRunning=false),這條路徑,雖說cancel方法最終返回了true,但它只是簡(jiǎn)單的把state狀態(tài)設(shè)為CANCELLED,并不會(huì)中斷線程的執(zhí)行。但是這樣帶來的后果是,任務(wù)即使執(zhí)行完畢了,也無法設(shè)置任務(wù)的執(zhí)行結(jié)果,因?yàn)榍懊娣治鰎un方法的時(shí)候我們知道,設(shè)置任務(wù)結(jié)果有一個(gè)中間態(tài),而這個(gè)中間態(tài)的設(shè)置,是以當(dāng)前state狀態(tài)為NEW為前提的
    • NEW -> INTERRUPTING -> INTERRUPTED (對(duì)應(yīng)于mayInterruptIfRunning=true)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容