1、簡(jiǎn)介
FutureTask是一種異步任務(wù)(或異步計(jì)算),舉個(gè)栗子,主線程的邏輯中需要使用某個(gè)值,但這個(gè)值需要負(fù)責(zé)的運(yùn)算得來(lái),那么主線程可以提前建立一個(gè)異步任務(wù)來(lái)計(jì)算這個(gè)值(在其他的線程中計(jì)算),然后去做其他事情,當(dāng)需要這個(gè)值的時(shí)候再通過(guò)剛才建立的異步任務(wù)來(lái)獲取這個(gè)值,有點(diǎn)并行的意思,這樣可以縮短整個(gè)主線程邏輯的執(zhí)行時(shí)間。
與1.6版本不同,1.7的FutureTask不再基于AQS來(lái)構(gòu)建,而是在內(nèi)部采用簡(jiǎn)單的Treiber Stack來(lái)保存等待線程。
2、框架
我們先來(lái)看看類(lèi)圖:

可以看到FutureTask實(shí)現(xiàn)了Runnable接口和Future接口,因此FutureTask可以傳遞到線程對(duì)象Thread或Excutor(線程池)來(lái)執(zhí)行。
如果在當(dāng)前線程中需要執(zhí)行比較耗時(shí)的操作,但又不想阻塞當(dāng)前線程時(shí),可以把這些作業(yè)交給FutureTask,另開(kāi)一個(gè)線程在后臺(tái)完成,當(dāng)當(dāng)前線程將來(lái)需要時(shí),就可以通過(guò)FutureTask對(duì)象獲得后臺(tái)作業(yè)的計(jì)算結(jié)果或者執(zhí)行狀態(tài)。
我們來(lái)看看它的構(gòu)造方法
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可見(jiàn),構(gòu)造一個(gè)FutureTask很簡(jiǎn)單,可以通過(guò)一個(gè)Callable來(lái)構(gòu)建,也可以通過(guò)一個(gè)Runnable和一個(gè)result來(lái)構(gòu)建。
這里要注意的是必須把state的寫(xiě)放到最后,因?yàn)閟tate本身由volatile修飾,所以可以保證callable的可見(jiàn)性。(因?yàn)楹罄m(xù)讀callable之前會(huì)先讀state,還記得這個(gè)volatile寫(xiě)讀的HB規(guī)則吧)。
接下來(lái)我們看一下它的內(nèi)部結(jié)構(gòu):
public class FutureTask<V> implements RunnableFuture<V> {
/**
* 內(nèi)部狀態(tài)可能得遷轉(zhuǎn)過(guò)程:
* NEW -> COMPLETING -> NORMAL //正常完成
* NEW -> COMPLETING -> EXCEPTIONAL //發(fā)生異常
* NEW -> CANCELLED //取消
* NEW -> INTERRUPTING -> INTERRUPTED //中斷
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** 內(nèi)部的callable,運(yùn)行完成后設(shè)置為null */
private Callable<V> callable;
/** 如果正常完成,就是執(zhí)行結(jié)果,通過(guò)get方法獲取;如果發(fā)生異常,就是具體的異常對(duì)象,通過(guò)get方法拋出。 */
private Object outcome; // 本身沒(méi)有volatile修飾, 依賴state的讀寫(xiě)來(lái)保證可見(jiàn)性。
/** 執(zhí)行內(nèi)部callable的線程。 */
private volatile Thread runner;
/** 存放等待線程的Treiber Stack*/
private volatile WaitNode waiters;
}
內(nèi)部結(jié)構(gòu)很明確,重點(diǎn)看下WaitNode的結(jié)構(gòu)吧:
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
這個(gè)也很簡(jiǎn)單,就是包含了當(dāng)前線程對(duì)象,并有指向下一個(gè)WaitNode的指針,所謂的Treiber Stack就是由WaitNode組成的(一個(gè)單向鏈表)。
經(jīng)常使用FutureTask的話一定會(huì)非常熟悉它的運(yùn)行過(guò)程:
- 創(chuàng)建任務(wù),實(shí)際使用時(shí),一般會(huì)結(jié)合線程池(ThreadPoolExecutor)使用,所以是在線程池內(nèi)部創(chuàng)建FutureTask。
- 執(zhí)行任務(wù),一般會(huì)有由工作線程(對(duì)于我們當(dāng)前線程來(lái)說(shuō)的其他線程)調(diào)用FutureTask的run方法,完成執(zhí)行。
- 獲取結(jié)果,一般會(huì)有我們的當(dāng)前線程去調(diào)用get方法來(lái)獲取執(zhí)行結(jié)果,如果獲取時(shí),任務(wù)并沒(méi)有被執(zhí)行完畢,當(dāng)前線程就會(huì)被阻塞,直到任務(wù)被執(zhí)行完畢,然后獲取結(jié)果。
- 取消任務(wù),某些情況下會(huì)放棄任務(wù)的執(zhí)行,進(jìn)行任務(wù)取消。
接下來(lái)我們從源碼的角度看下執(zhí)行任務(wù)過(guò)程,也就是運(yùn)行相關(guān)方法吧 。
3、源碼分析
run()
public void run() {
// 如果state是NEW,設(shè)置線程為當(dāng)前線程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 調(diào)用Callable的call方法,得到結(jié)果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 處理異常狀態(tài)和結(jié)果
result = null;
ran = false;
setException(ex);
}
if (ran)
// 正常處理設(shè)置狀態(tài)和結(jié)果
set(result);
}
} finally {
// runner必須在設(shè)置了state之后再置空,避免run方法出現(xiàn)并發(fā)問(wèn)題。
runner = null;
// 這里還必須再讀一次state,避免丟失中斷。
int s = state;
if (s >= INTERRUPTING)
// 處理可能發(fā)生的取消中斷(cancel(true))。
handlePossibleCancellationInterrupt(s);
}
}
看下run過(guò)程中,正常完成后調(diào)用的set方法:
/**
* 設(shè)置結(jié)果,狀態(tài)從 NEW 變?yōu)?COMPLETING
* 設(shè)置返回結(jié)果為t(正常結(jié)果)
* 改變狀態(tài)從 COMPLETING 到 NORMAL
* 調(diào)用finishCompletion完成收尾工作
*/
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
set過(guò)程中,首先嘗試將當(dāng)前任務(wù)狀態(tài)state從NEW改為COMPLETING。如果成功的話,再設(shè)置執(zhí)行結(jié)果到outcome。然后將state再次設(shè)置為NORMAL,注意這次使用的是putOrderedInt,其實(shí)就是原子量的LazySet內(nèi)部使用的方法。為什么使用這個(gè)方法?首先LazySet相對(duì)于Volatile-Write來(lái)說(shuō)更廉價(jià),因?yàn)樗鼪](méi)有昂貴的Store/Load屏障,只有Store/Store屏障(x86下Store/Store屏障是一個(gè)空操作),其次,后續(xù)線程不會(huì)及時(shí)的看到state從COMPLETING變?yōu)镹ORMAL,但這沒(méi)什么關(guān)系,而且NORMAL是state的最終狀態(tài)之一,以后不會(huì)在變化了。
上述過(guò)程最后還調(diào)用了一個(gè)finishCompletion方法:
/**
* 遍歷waiters的next節(jié)點(diǎn),喚醒節(jié)點(diǎn)的線程并把引用變?yōu)閚ull,等待GC
*/
private void finishCompletion() {
for (WaitNode q; (q = waiters) != null;) {
// 嘗試將waiters設(shè)置為null。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 然后將waiters中的等待線程全部喚醒。
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t); // 喚醒線程
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 回調(diào)下鉤子方法。
done();
// 置空callable,減少內(nèi)存占用
callable = null;
}
可見(jiàn),finishCompletion主要就是在任務(wù)執(zhí)行完畢后,移除Treiber Stack,并將Treiber Stack中所有等待獲取任務(wù)結(jié)果的線程喚醒,然后回調(diào)下done鉤子方法。
看完了set,再看下run過(guò)程中如果發(fā)生異常,調(diào)用的setException方法:
/**
* 發(fā)生異常,狀態(tài)從 NEW 變?yōu)?COMPLETING
* 設(shè)置返回結(jié)果為t(異常結(jié)果)
* 改變狀態(tài)從 COMPLETING 到 EXCEPTIONAL
* 調(diào)用finishCompletion完成收尾工作
*/
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
和set方法一個(gè)套路。
最后看下run過(guò)程中最后調(diào)用的handlePossibleCancellationInterrupt方法:
/**
* 確保cancel(true)產(chǎn)生的中斷發(fā)生在run或runAndReset方法過(guò)程中。
*/
private void handlePossibleCancellationInterrupt(int s) {
// 如果當(dāng)前正在中斷過(guò)程中,自旋等待一下,等中斷完成。
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// 這里的state狀態(tài)一定是INTERRUPTED;
// 這里不能清除中斷標(biāo)記,因?yàn)闆](méi)辦法區(qū)分來(lái)自cancel(true)的中斷。
// Thread.interrupted();
}
這里總結(jié)一下run方法:
- 只有state為NEW的時(shí)候才執(zhí)行任務(wù)(調(diào)用內(nèi)部callable的run方法)。執(zhí)行前會(huì)原子的設(shè)置執(zhí)行線程(runner),防止競(jìng)爭(zhēng)。
- 如果任務(wù)執(zhí)行成功,設(shè)置執(zhí)行結(jié)果,狀態(tài)變更:NEW -> COMPLETING -> NORMAL。
- 如果任務(wù)執(zhí)行發(fā)生異常,設(shè)置異常結(jié)果,狀態(tài)變更:NEW -> COMPLETING -> EXCEPTIONAL。
- 將Treiber Stack中等待當(dāng)前任務(wù)執(zhí)行結(jié)果的等待節(jié)點(diǎn)中的線程全部喚醒,同時(shí)刪除這些等待節(jié)點(diǎn),將整個(gè)Treiber Stack置空。
- 最后別忘了等一下可能發(fā)生的cancel(true)中引起的中斷,讓這些中斷發(fā)生在執(zhí)行任務(wù)過(guò)程中(別泄露出去)。
runAndReset()
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
該方法和run方法的區(qū)別是,run方法只能被運(yùn)行一次任務(wù),而該方法可以多次運(yùn)行任務(wù)。而runAndReset這個(gè)方法不會(huì)設(shè)置任務(wù)的執(zhí)行結(jié)果值,如果該任務(wù)成功執(zhí)行完成后,不修改state的狀態(tài),還是可運(yùn)行(NEW)狀態(tài),如果取消任務(wù)或出現(xiàn)異常,則不會(huì)再次執(zhí)行。
get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L); // 如果任務(wù)還沒(méi)執(zhí)行完畢,等待任務(wù)執(zhí)行完畢。
return report(s); // 如果任務(wù)執(zhí)行完畢,獲取執(zhí)行結(jié)果。
}
看下awaitDone方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 先算出到期時(shí)間。
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
// 如果當(dāng)前線程被中斷,移除等待節(jié)點(diǎn)q,然后拋出中斷異常。
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING) {
// 如果任務(wù)已經(jīng)執(zhí)行完畢
if (q != null)
q.thread = null; // 如果q不為null,將q中的thread置空。
return s; // 返回任務(wù)狀態(tài)。
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield(); // 如果當(dāng)前正在完成過(guò)程中,出讓CPU。
else if (q == null)
q = new WaitNode(); // 創(chuàng)建一個(gè)等待節(jié)點(diǎn)。
else if (!queued)
// 將q(包含當(dāng)前線程的等待節(jié)點(diǎn))入隊(duì)。
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
//如果超時(shí),移除等待節(jié)點(diǎn)q
removeWaiter(q);
//返回任務(wù)狀態(tài)。
return state;
}
//超時(shí)的話,就阻塞給定時(shí)間。
LockSupport.parkNanos(this, nanos);
}
else
//沒(méi)設(shè)置超時(shí)的話,就阻塞當(dāng)前線程。
LockSupport.park(this);
}
}
再看下awaitDone方法中調(diào)用的removeWaiter:
private void removeWaiter(WaitNode node) {
if (node != null) {
//將node的thread域置空。
node.thread = null;
//下面過(guò)程中會(huì)將node從等待隊(duì)列中移除,以thread域?yàn)閚ull為依據(jù),
//如果過(guò)程中發(fā)生了競(jìng)爭(zhēng),重試。
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
再看下get方法中獲取結(jié)果時(shí)調(diào)用的report:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report如果是正常狀態(tài),就返回結(jié)果。否則拋出異常。
看完了get方法,再看下get(long timeout, TimeUnit unit)方法:
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
小結(jié)一下get方法:
- 首先檢查當(dāng)前任務(wù)的狀態(tài),如果狀態(tài)表示執(zhí)行完成,進(jìn)入第2步。
- 獲取執(zhí)行結(jié)果,也可能得到取消或者執(zhí)行異常,get過(guò)程結(jié)束。
- 如果當(dāng)前任務(wù)狀態(tài)表示未執(zhí)行或者正在執(zhí)行,那么當(dāng)前線程放入一個(gè)新建的等待節(jié)點(diǎn),然后進(jìn)入Treiber Stack進(jìn)行阻塞等待。
- 如果任務(wù)被工作線程(對(duì)當(dāng)前線程來(lái)說(shuō)是其他線程)執(zhí)行完畢,執(zhí)行完畢時(shí)工作線程會(huì)喚醒Treiber Stack上等待的所有線程,所以當(dāng)前線程被喚醒,清空當(dāng)前等待節(jié)點(diǎn)上的線程域,然后進(jìn)入第2步。
- 當(dāng)前線程在阻塞等待結(jié)果過(guò)程中可能被中斷,如果被中斷,那么會(huì)移除當(dāng)前線程在Treiber Stack上對(duì)應(yīng)的等待節(jié)點(diǎn),然后拋出中斷異常,get過(guò)程結(jié)束。
- 當(dāng)前線程也可能執(zhí)行帶有超時(shí)時(shí)間的阻塞等待,如果超時(shí)時(shí)間過(guò)了,還沒(méi)得到執(zhí)行結(jié)果,那么會(huì)除當(dāng)前線程在Treiber Stack上對(duì)應(yīng)的等待節(jié)點(diǎn),然后拋出超時(shí)異常,get過(guò)程結(jié)束。
cancel(boolean)
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try {
// mayInterruptIfRunning并且有正在運(yùn)行的線程,調(diào)用interrupt中斷,最后設(shè)置狀態(tài)為INTERRUPTED
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
cancel分為兩種情況:
- mayInterruptIfRunning == true。這個(gè)時(shí)候狀態(tài)從 NEW 變?yōu)?INTERRUPTING ,如果有正在運(yùn)行的線程,調(diào)用interrupt中斷,最后把狀態(tài)從 INTERRUPTING 變?yōu)?INTERRUPTED。
- mayInterruptIfRunning == false。這個(gè)時(shí)候狀態(tài)從 NEW 變?yōu)?CANCELLED。
- 最后都會(huì)執(zhí)行finishCompletion方法,完成結(jié)束的收尾工作。喚醒所有在get()方法等待的線程。
4、jdk1.6不同的地方
為什么jdk 1.6以后的FutureTask不像1.6那樣基于AQS構(gòu)建了?
首先,前面貼代碼了時(shí)候故意去掉了一些注釋,避免讀代碼的時(shí)候受影響,現(xiàn)在我們來(lái)看一下關(guān)鍵的一段:
/*
* Revision notes: This differs from previous versions of this
* class that relied on AbstractQueuedSynchronizer, mainly to
* avoid surprising users about retaining interrupt status during
* cancellation races.
*/
主要是這句:mainly to avoid surprising users about retaining interrupt status during cancellation races。
大概意思是:使用AQS的方式,可能會(huì)在取消發(fā)生競(jìng)爭(zhēng)過(guò)程中詭異的保留了中斷狀態(tài)。這里之所以沒(méi)有采用這種方式,是為了避免這種情況的發(fā)生。
具體什么情況下會(huì)發(fā)生呢?
ThreadPoolExecutor executor = ...;
executor.submit(task1).cancel(true);
executor.submit(task2);
看上面的代碼,雖然中斷的是task1,但可能task2得到中斷信號(hào)。
原因是什么呢?看下JDK1.6的FutureTask的中斷代碼:
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false;
if (compareAndSetState(s, CANCELLED))
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null) //第1行
r.interrupt(); //第2行
}
releaseShared(0);
done();
return true;
}
結(jié)合上面代碼例子看一下,如果主線程執(zhí)行到第1行的時(shí)候,線程池可能會(huì)認(rèn)為task1已經(jīng)執(zhí)行結(jié)束(被取消),然后讓之前執(zhí)行task1工作線程去執(zhí)行task2,工作線程開(kāi)始執(zhí)行task2之后,然后主線程執(zhí)行第2行(我們會(huì)發(fā)現(xiàn)并沒(méi)有任何同步機(jī)制來(lái)阻止這種情況的發(fā)生),這樣就會(huì)導(dǎo)致task2被中斷了。更多的相關(guān)信息參考這個(gè)Bug說(shuō)明。
所以現(xiàn)在就能更好的理解JDK1.7 FutureTask的handlePossibleCancellationInterrupt中為什么要將cancel(true)中的中斷保留在當(dāng)前run方法運(yùn)行范圍內(nèi)了吧!
JDK1.7的FutureTask的代碼解析完畢!