一、核心成員變量
1. task狀態(tài)
private volatile int state;
// 任務(wù)的初始狀態(tài)都是NEW, 這一點(diǎn)是構(gòu)造函數(shù)保證的
private static final int NEW = 0;
// 任務(wù)結(jié)束,正在設(shè)置任務(wù)結(jié)果,這是任務(wù)可能正常執(zhí)行完成,可能拋異常了,可能被取消了,反正任務(wù)已經(jīng)不在跑了
private static final int COMPLETING = 1;
// 任務(wù)正常執(zhí)行完畢
private static final int NORMAL = 2;
// 任務(wù)執(zhí)行過程中發(fā)生異常
private static final int EXCEPTIONAL = 3;
// 任務(wù)被取消
private static final int CANCELLED = 4;
// 正在中斷運(yùn)行任務(wù)的線程
private static final int INTERRUPTING = 5;
// 任務(wù)被中斷
private static final int INTERRUPTED = 6;
- 總共有7種狀態(tài):包括了1個(gè)初始態(tài),2個(gè)中間態(tài)和4個(gè)終止態(tài)
- 任務(wù)的初始狀態(tài)都是NEW, 這一點(diǎn)是構(gòu)造函數(shù)保證的
- 任務(wù)的中間狀態(tài)有2種
- COMPLETING 正在設(shè)置任務(wù)結(jié)果
- INTERRUPTING 正在中斷運(yùn)行任務(wù)的線程
- 任務(wù)的終止?fàn)顟B(tài)有4種
- NORMAL:任務(wù)正常執(zhí)行完畢
- EXCEPTIONAL:任務(wù)執(zhí)行過程中發(fā)生異常
- CANCELLED:任務(wù)被取消
- INTERRUPTED:任務(wù)被中斷
- 任務(wù)的中間態(tài)并不代表任務(wù)正在執(zhí)行,而是任務(wù)已經(jīng)執(zhí)行完了,正在設(shè)置最終的返回結(jié)果,其實(shí)只要state不處于 NEW 狀態(tài),就說明任務(wù)已經(jīng)執(zhí)行完畢
- 可能的狀態(tài)變化
- NEW -> COMPLETING -> NORMAL:任務(wù)正常完成
- NEW -> COMPLETING -> EXCEPTIONAL:執(zhí)行任務(wù)時(shí)拋出了異常
- NEW -> CANCELLED: 執(zhí)行任務(wù)時(shí)被取消且mayInterruptIfRunning參數(shù)為false
- NEW -> INTERRUPTING -> INTERRUPTED:執(zhí)行任務(wù)時(shí)被取消且mayInterruptIfRunning參數(shù)為true
2. 其它
// 任務(wù)本身
private Callable<V> callable;
// 任務(wù)的結(jié)果
private Object outcome; // non-volatile, protected by state reads/writes
// 任務(wù)的執(zhí)行者,指得是線程
private volatile Thread runner;
- callable屬性代表了要執(zhí)行的任務(wù)本身,即FutureTask中的“Task”部分,為Callable類型,這里之所以用Callable而不用Runnable是因?yàn)镕utureTask實(shí)現(xiàn)了Future接口,需要獲取任務(wù)的執(zhí)行結(jié)果
- outcome屬性代表了任務(wù)的執(zhí)行結(jié)果或者拋出的異常,為Object類型,也就是說outcome可以是任意類型的對(duì)象,所以當(dāng)我們將正常的執(zhí)行結(jié)果返回給調(diào)用者時(shí),需要進(jìn)行強(qiáng)制類型轉(zhuǎn)換,返回由Callable定義的V類型
- runner屬性代表執(zhí)行任務(wù)的線程,在執(zhí)行run方法時(shí)會(huì)賦值
二、內(nèi)部類WaitNode
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
- WaitNode是個(gè)單向鏈表,用來保存所有等待任務(wù)執(zhí)行完畢的線程的集合
- WaitNode只包含了一個(gè)記錄線程的thread屬性和指向下一個(gè)節(jié)點(diǎn)的next屬性
- WaitNode是當(dāng)做Treiber棧來使用的,同一時(shí)刻可能有多個(gè)線程都在獲取任務(wù)的執(zhí)行結(jié)果,如果任務(wù)還在執(zhí)行過程中,則這些線程就要被包裝成WaitNode扔到Treiber棧的棧頂,即完成入棧操作,這樣就有可能出現(xiàn)多個(gè)線程同時(shí)入棧的情況,因此需要使用CAS操作保證入棧的線程安全,對(duì)于出棧的情況也是同理
三、重要方法
1. run方法
public void run() {
// state != NEW 表示任務(wù)已經(jīng)跑完了;
// !RUNNER.compareAndSet(this, null, Thread.currentThread())表示已經(jīng)有線程在執(zhí)行這個(gè)任務(wù)了
// 滿足上面其中一個(gè)條件就沒必要再執(zhí)行這個(gè)任務(wù)了,直接返回
if (state != NEW ||
!RUNNER.compareAndSet(this, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 任務(wù)不為空,且狀態(tài)是NEW,那么就執(zhí)行任務(wù)
// 這里又判斷了state == NEW是防止任務(wù)被外部線程中斷或者取消
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執(zhí)行任務(wù)并返回結(jié)果
result = c.call();
// 執(zhí)行完成
ran = true;
// 執(zhí)行任務(wù)時(shí)拋出了異常
} catch (Throwable ex) {
// 執(zhí)行結(jié)果為空
result = null;
// 執(zhí)行失敗
ran = false;
// 設(shè)置異常
setException(ex);
}
if (ran)
// 如果執(zhí)行成功,設(shè)置執(zhí)行結(jié)果
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 最后將執(zhí)行者置為空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果線程是中間狀態(tài)正在被打斷,將while循環(huán)至中間狀態(tài)被變成最終狀態(tài)
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 將runner屬性設(shè)置成當(dāng)前正在執(zhí)行run方法的線程
- 調(diào)用callable成員變量的call方法來執(zhí)行任務(wù)
- 設(shè)置執(zhí)行結(jié)果outcome, 如果執(zhí)行成功, 則outcome保存的就是執(zhí)行結(jié)果;如果執(zhí)行過程中發(fā)生了異常, 則outcome中保存的就是異常,設(shè)置結(jié)果之前,先將state狀態(tài)設(shè)為中間態(tài)
- 對(duì)outcome的賦值完成后,設(shè)置state狀態(tài)為終止態(tài)(NORMAL或者EXCEPTIONAL)
- 喚醒Treiber棧中所有等待的線程
- 善后清理(waiters, callable,runner設(shè)為null)
- 檢查是否有遺漏的中斷,如果有,等待中斷狀態(tài)完成
protected void set(V v) {
// 如果是NEW狀態(tài),將狀態(tài)改為COMPLETING
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 設(shè)置返回結(jié)果
outcome = v;
// 將狀態(tài)改為NORMAL
STATE.setRelease(this, NORMAL); // final state
// 主要是喚醒waiters里的線程
finishCompletion();
}
}
protected void setException(Throwable t) {
if (STATE.compareAndSet(this, NEW, COMPLETING)) {
// 這里是把異常對(duì)象賦給outcome
outcome = t;
// 將狀態(tài)變成EXCEPTIONAL
STATE.setRelease(this, EXCEPTIONAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
// 遍歷等待獲取任務(wù)執(zhí)行結(jié)果的線程
for (WaitNode q; (q = waiters) != null;) {
// 將棧頂元素先置為空
if (WAITERS.weakCompareAndSet(this, q, null)) {
for (;;) {
// 獲取等待的線程
Thread t = q.thread;
// 如果線程不為空,將線程置為空,并unpark
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 獲取下一個(gè)結(jié)點(diǎn)
WaitNode next = q.next;
// 如果下一個(gè)結(jié)點(diǎn)為空,直接退出里面的循環(huán),
// 由于隊(duì)列的當(dāng)前結(jié)點(diǎn)已經(jīng)置為空,所以外面的循環(huán)也退出了
if (next == null)
break;
// 將當(dāng)前結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)置為空
q.next = null; // unlink to help gc
// 下一個(gè)結(jié)點(diǎn)作為新的棧頂結(jié)點(diǎn)
q = next;
}
break;
}
}
// 空方法,留給子類實(shí)現(xiàn)
done();
// 任務(wù)置為空
callable = null; // to reduce footprint
}
2. get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
// 返回任務(wù)運(yùn)行的狀態(tài)
s = awaitDone(false, 0L);
// 返回任務(wù)運(yùn)行的結(jié)果
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
// 代表即將被放入棧中的結(jié)點(diǎn)
WaitNode q = null;
// 是否放入棧中,一開始為false
boolean queued = false;
for (;;) {
// 獲取任務(wù)的狀態(tài)
int s = state;
// 大于COMPLETING代表任務(wù)執(zhí)行完成
if (s > COMPLETING) {
// 如果已經(jīng)創(chuàng)建了WaitNode結(jié)點(diǎn),將等待的線程置為空,最后返回任務(wù)運(yùn)行狀態(tài)
if (q != null)
q.thread = null;
return s;
}
// 任務(wù)是COMPLETING狀態(tài),說明正在處理結(jié)果,讓出線程調(diào)度,等下一次循環(huán)再判斷狀態(tài)
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
// 如果等待的線程被打斷了,從等待隊(duì)列里刪除該隊(duì)列,并拋出異常
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 走到這,說明任務(wù)還未完成,再判斷是否有等待時(shí)間,如果等待時(shí)間到了,返回狀態(tài),如果沒到,創(chuàng)建結(jié)點(diǎn)
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
// 走到這,說明任務(wù)還是未完成,且等待結(jié)點(diǎn)已經(jīng)創(chuàng)建了,那么將結(jié)點(diǎn)放入棧中
else if (!queued)
queued = WAITERS.weakCompareAndSet(this, q.next = waiters, q);
// 如果有等待時(shí)間
else if (timed) {
final long parkNanos;
// 還沒初始化等待的一開始時(shí)間
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
// 防止虛假喚醒,重新計(jì)算park時(shí)間
} else {
long elapsed = System.nanoTime() - startTime;
// 如果等待時(shí)間到了,那么將結(jié)點(diǎn)從棧中移除
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
// park指定時(shí)長
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
// 將當(dāng)前線程阻塞
else
LockSupport.park(this);
}
}
- 如果任務(wù)已經(jīng)進(jìn)入終止態(tài)(s > COMPLETING),我們就直接返回任務(wù)的狀態(tài);
- 否則,如果任務(wù)正在設(shè)置執(zhí)行結(jié)果(s == COMPLETING),我們就讓出當(dāng)前線程的CPU資源繼續(xù)等待
- 否則,就說明任務(wù)還沒有執(zhí)行,或者任務(wù)正在執(zhí)行過程中,那么這時(shí),如果q現(xiàn)在還為null, 說明當(dāng)前線程還沒有進(jìn)入等待隊(duì)列,于是我們新建了一個(gè)WaitNode, WaitNode的構(gòu)造函數(shù)我們之前已經(jīng)看過了,就是生成了一個(gè)記錄了當(dāng)前線程的節(jié)點(diǎn);
- 如果q不為null,說明代表當(dāng)前線程的WaitNode已經(jīng)被創(chuàng)建出來了,則接下來如果queued=false,表示當(dāng)前線程還沒有入隊(duì),執(zhí)行入棧操作
- 如果以上的條件都不滿足,則再接下來因?yàn)楝F(xiàn)在是不帶超時(shí)機(jī)制的get,timed為false,則else if代碼塊跳過,然后來到最后一個(gè)else, 把當(dāng)前線程掛起,此時(shí)線程就處于阻塞等待的狀態(tài)
- 什么時(shí)候喚醒被掛起的線程
- 任務(wù)執(zhí)行完畢了,在finishCompletion方法中會(huì)喚醒所有在Treiber棧中等待的線程
- 等待的線程自身因?yàn)楸恢袛嗟仍蚨粏拘?/li>
private void removeWaiter(WaitNode node) {
if (node != null) {
// 現(xiàn)將線程置為空
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// 因?yàn)橐呀?jīng)將待刪除結(jié)點(diǎn)的thread置為空了,這里不為空說明還沒遍歷到待刪除的結(jié)點(diǎn)
if (q.thread != null)
pred = q;
// 走到這里,說明找到了待刪除結(jié)點(diǎn),且待刪除結(jié)點(diǎn)不在棧頂
else if (pred != null) {
// 將待刪除結(jié)點(diǎn)的前一個(gè)結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)指向待刪除結(jié)點(diǎn)的下一個(gè)結(jié)點(diǎn)
pred.next = s;
// removeWaiter是可以并發(fā)執(zhí)行的,如果有其它線程將pred結(jié)點(diǎn)的thread置為空,再往后遍歷已經(jīng)無意義了,所以再重頭遍歷
if (pred.thread == null) // check for race
continue retry;
}
// 說明刪除的結(jié)點(diǎn)是在棧頂,將棧頂替換成下一個(gè)結(jié)點(diǎn)
else if (!WAITERS.compareAndSet(this, q, s))
continue retry;
}
break;
}
}
}
- 傳入一個(gè)需要移除的節(jié)點(diǎn),我們會(huì)將這個(gè)節(jié)點(diǎn)的thread屬性設(shè)置成null,以標(biāo)記該節(jié)點(diǎn)
- 無論如何,會(huì)遍歷整個(gè)鏈表,清除那些被標(biāo)記的節(jié)點(diǎn)(只是簡(jiǎn)單的將節(jié)點(diǎn)從鏈表中剔除)
- 如果要清除的節(jié)點(diǎn)就位于棧頂,則還需要注意重新設(shè)置waiters的值,指向新的棧頂節(jié)點(diǎn)
- 雖說removeWaiter方法傳入了需要剔除的節(jié)點(diǎn),但是事實(shí)上它可能剔除的不止是傳入的節(jié)點(diǎn),而是所有已經(jīng)被標(biāo)記了的節(jié)點(diǎn),這樣不僅清除操作容易了些(不需要專門去定位傳入的node在哪里),而且提升了效率(可以同時(shí)清除所有已經(jīng)被標(biāo)記的節(jié)點(diǎn))
private V report(int s) throws ExecutionException {
Object x = outcome;
// 任務(wù)正常完成,返回結(jié)果值
if (s == NORMAL)
return (V)x;
// 任務(wù)未正常完成,拋出異常
if (s >= CANCELLED)
throw new CancellationException();
// 拋出異常
throw new ExecutionException((Throwable)x);
}
3. cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
// 如果任務(wù)狀態(tài)不是NEW,說明任務(wù)完成,直接返回false,無法取消
if (!(state == NEW && STATE.compareAndSet
(this, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// 如果mayInterruptIfRunning為真,打斷執(zhí)行任務(wù)的線程
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
STATE.setRelease(this, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
- 任務(wù)完成了,也就是非NEW狀態(tài),或者狀態(tài)更新失敗,那么返回false,也就是取消失敗,也就是說如果任務(wù)完成了或者任務(wù)未完成但是無法終止,就會(huì)取消失敗
- 其它情況返回true,也就是取消成功,但是cancel操作返回true并不代表任務(wù)真的就是被取消了
- 如果發(fā)起cancel時(shí)任務(wù)還沒有開始運(yùn)行,則隨后任務(wù)就不會(huì)被執(zhí)行;
- 如果發(fā)起cancel時(shí)任務(wù)已經(jīng)在運(yùn)行了,則這時(shí)就需要看mayInterruptIfRunning參數(shù)了:
- 如果mayInterruptIfRunning 為true, 則當(dāng)前在執(zhí)行的任務(wù)會(huì)被中斷
- 如果mayInterruptIfRunning 為false, 則可以允許正在執(zhí)行的任務(wù)繼續(xù)運(yùn)行,直到它執(zhí)行完
- cancel方法實(shí)際上完成以下兩種狀態(tài)轉(zhuǎn)換之一
- NEW -> CANCELLED (對(duì)應(yīng)于mayInterruptIfRunning=false),這條路徑,雖說cancel方法最終返回了true,但它只是簡(jiǎn)單的把state狀態(tài)設(shè)為CANCELLED,并不會(huì)中斷線程的執(zhí)行。但是這樣帶來的后果是,任務(wù)即使執(zhí)行完畢了,也無法設(shè)置任務(wù)的執(zhí)行結(jié)果,因?yàn)榍懊娣治鰎un方法的時(shí)候我們知道,設(shè)置任務(wù)結(jié)果有一個(gè)中間態(tài),而這個(gè)中間態(tài)的設(shè)置,是以當(dāng)前state狀態(tài)為NEW為前提的
- NEW -> INTERRUPTING -> INTERRUPTED (對(duì)應(yīng)于mayInterruptIfRunning=true)