FutureTask介紹
FutureTask是一種可以取消的異步的計算任務(wù)。它的計算是通過Callable實現(xiàn)的,可以把它理解為是可以返回結(jié)果的Runnable。
使用FutureTask的優(yōu)勢有:
- 可以獲取線程執(zhí)行后的返回結(jié)果;
- 提供了超時控制功能。
它實現(xiàn)了Runnable接口和Future接口:

什么是異步計算呢?也就是說,在讓該任務(wù)執(zhí)行時,不需要一直等待其運行結(jié)束返回結(jié)果,而是可以先去處理其他的事情,然后再獲取返回結(jié)果。例如你想下載一個很大的文件,這時很耗時的操作,沒必要一直等待著文件下載完,你可以先去吃個飯,然后再回來看下文件是否下載完成,如果下載完成就可以使用了,否則還需要繼續(xù)等待。
FutureTask的實現(xiàn)
FutureTask的狀態(tài)
FutureTask內(nèi)部有這樣幾種狀態(tài):
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
看名字應(yīng)該很好理解了,當創(chuàng)建一個FutureTask對象是,初始的狀態(tài)是NEW,在運行時狀態(tài)會轉(zhuǎn)換,有4中狀態(tài)的轉(zhuǎn)換過程:
- NEW -> COMPLETING -> NORMAL:正常執(zhí)行并返回;
- NEW -> COMPLETING -> EXCEPTIONAL:執(zhí)行過程中出現(xiàn)了異常;
- NEW -> CANCELLED;執(zhí)行前被取消;
- NEW -> INTERRUPTING -> INTERRUPTED:取消時被中斷。
使用FutureTask
下面看一下具體的使用過程:
public class FutureTaskTest {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ExecutorService executor = Executors.newSingleThreadExecutor();
FutureTask<Integer> future = new FutureTask<>(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
int result = 0;
for (int i = 0; i < 100; i++) {
result += i;
}
return result;
}
});
executor.execute(future);
System.out.println(future.get());
}
}
FutureTask內(nèi)部結(jié)構(gòu)
public class FutureTask<V> implements RunnableFuture<V> {
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
private Object outcome; // non-volatile, protected by state reads/writes
/** 執(zhí)行callable的線程 **/
private volatile Thread runner;
/**
* Treiber stack of waiting threads
* 使用Treiber算法實現(xiàn)的無阻塞的Stack,
* 用于存放等待的線程
*/
private volatile WaitNode waiters;
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
public V get() throws InterruptedException, ExecutionException {
...
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
...
}
...
這里的waiters理解為一個stack,因為在調(diào)用get方法時任務(wù)可能還沒有執(zhí)行完,這時需要將調(diào)用get方法的線程放入waiters中。
最重要的兩個get方法,用于獲取返回結(jié)果,第二種提供了超時控制功能。
FutureTask構(gòu)造方法
FutureTask有兩個構(gòu)造方法:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
第二種構(gòu)造方法傳入一個Runnable對象和一個返回值對象,因為Runnable是沒有返回值的,所以要通過result參數(shù)在執(zhí)行完之后返回結(jié)果。
run方法
FutureTask實現(xiàn)了Runnable接口,所以需要實現(xiàn)run方法,代碼如下:
public void run() {
/*
* 首先判斷狀態(tài),如果不是初始狀態(tài),說明任務(wù)已經(jīng)被執(zhí)行或取消;
* runner是FutureTask的一個屬性,用于保存執(zhí)行任務(wù)的線程,
* 如果不為空則表示已經(jīng)有線程正在執(zhí)行,這里用CAS來設(shè)置,失敗則返回。
*/
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 只有初始狀態(tài)才會執(zhí)行
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執(zhí)行任務(wù)
result = c.call();
// 如果沒出現(xiàn)異常,則說明執(zhí)行成功了
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 設(shè)置異常
setException(ex);
}
// 如果執(zhí)行成功,則設(shè)置返回結(jié)果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 無論是否執(zhí)行成功,把runner設(shè)置為null
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 如果被中斷,則說明調(diào)用的cancel(true),
// 這里要保證在cancel方法中把state設(shè)置為INTERRUPTED
// 否則可能在cancel方法中還沒執(zhí)行中斷,造成中斷的泄露
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
總結(jié)一下run方法的執(zhí)行過程
- 只有state為NEW的時候才執(zhí)行任務(wù);
- 執(zhí)行前要設(shè)置runner為當前線程,使用CAS來設(shè)置是為了防止競爭;
- 如果任務(wù)執(zhí)行成功,任務(wù)狀態(tài)從NEW轉(zhuǎn)換為COMPLETING,如果執(zhí)行正常,設(shè)置最終狀態(tài)為NORMAL;如果執(zhí)行中出現(xiàn)了異常,設(shè)置最終狀態(tài)為EXCEPTIONAL;
- 喚醒并刪除Treiber Stack中的所有節(jié)點;
- 如果調(diào)用了cancel(true)方法進行了中斷,要確保在run方法執(zhí)行結(jié)束前的狀態(tài)是INTERRUPTED。
這里涉及到3個比較重要的方法:setException,set和handlePossibleCancellationInterrupt。
setException方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}
如果在執(zhí)行過程中(也就是調(diào)用call方法時)出現(xiàn)了異常,則要把狀態(tài)先設(shè)置為COMPLETING,如果成功,設(shè)置outcome = t,outcome對象是Object類型的,用來保存異常或者返回結(jié)果對象,也就是說,在正常的執(zhí)行過程中(沒有異常,沒有調(diào)用cancel方法),outcome保存著返回結(jié)果對象,會被返回,如果出現(xiàn)了異?;蛘咧袛?,則不會返回并拋出異常,這個在介紹report方法時會講到。
接著設(shè)置狀態(tài)為EXCEPTIONAL,這也是最終的狀態(tài)。
finishCompletion方法稍后再分析。
set方法
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
很簡單,與setException類似,只不過這里的outcome是返回結(jié)果對象,狀態(tài)先設(shè)置為COMPLETING,然后再設(shè)置為MORMAL。
handlePossibleCancellationInterrupt方法
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
handlePossibleCancellationInterrupt方法要確保cancel(true)產(chǎn)生的中斷發(fā)生在run或runAndReset方法執(zhí)行的過程中。這里會循環(huán)的調(diào)用Thread.yield()來確保狀態(tài)在cancel方法中被設(shè)置為INTERRUPTED。
這里不能夠清除中斷標記,因為不能確定中斷一定來自于cancel方法。
finishCompletion方法
private void finishCompletion() {
// assert state > COMPLETING;
// 執(zhí)行該方法時state必須大于COMPLETING
// 逐個喚醒waiters中的線程
for (WaitNode q; (q = waiters) != null;) {
// 設(shè)置棧頂節(jié)點為null
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
// 喚醒線程
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
// 如果next為空,說明??樟?,跳出循環(huán)
WaitNode next = q.next;
if (next == null)
break;
// 方便gc回收
q.next = null; // unlink to help gc
// 重新設(shè)置棧頂node
q = next;
}
break;
}
}
// 鉤子方法
done();
callable = null; // to reduce footprint
}
在調(diào)用get方法時,如果任務(wù)還沒有執(zhí)行結(jié)束,則會阻塞調(diào)用的線程,然后把調(diào)用的線程放入waiters中,這時,如果任務(wù)執(zhí)行完畢,也就是調(diào)用了finishCompletion方法,waiters會依次出棧并逐個喚醒對應(yīng)的線程。
由此可以想到,WaitNode一定是在get方法中被添加到棧中的,下面來看下get方法的實現(xiàn)。
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
這兩個方法類似,首先判斷狀態(tài),如果s <= COMPLETING,說明任務(wù)已經(jīng)執(zhí)行完畢,但set方法或setException方法還未執(zhí)行結(jié)束(還未設(shè)置狀態(tài)為NORMAL或EXCEPTIONAL),這時需要將當前線程添加到waiters中并阻塞。
第二種get提供了超時功能,如果在規(guī)定時間內(nèi)任務(wù)還未執(zhí)行完畢或者狀態(tài)還是COMPLETING,則獲取結(jié)果超時,拋出TimeoutException。而第一種get會一直阻塞直到state > COMPLETING。
awaitDone方法
awaitDone方法的工作是根據(jù)狀態(tài)來判斷是否能夠返回結(jié)果,如果任務(wù)還未執(zhí)行完畢,要添加到waiters中并阻塞,否則返回狀態(tài)。代碼如下:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 計算到期時間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 如果被中斷,刪除節(jié)點,拋出異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果任務(wù)執(zhí)行完畢并且設(shè)置了最終狀態(tài)或者被取消,則返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// s == COMPLETING時通過Thread.yield();讓步其他線程執(zhí)行,
// 主要是為了讓狀態(tài)改變
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 創(chuàng)建一個WaitNode
else if (q == null)
q = new WaitNode();
// CAS設(shè)置棧頂節(jié)點
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 如果設(shè)置了超時,則計算是否已經(jīng)到了開始設(shè)置的到期時間
else if (timed) {
nanos = deadline - System.nanoTime();
// 如果已經(jīng)到了到期時間,刪除節(jié)點,返回狀態(tài)
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞到到期時間
LockSupport.parkNanos(this, nanos);
}
// 如果沒有設(shè)置超時,會一直阻塞,直到被中斷或者被喚醒
else
LockSupport.park(this);
}
}
removeWaiter方法
private void removeWaiter(WaitNode node) {
if (node != null) {
// 將thread設(shè)置為null是因為下面要根據(jù)thread是否為null判斷是否要把node移出
node.thread = null;
// 這里自旋保證刪除成功
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
// q.thread != null說明該q節(jié)點不需要移除
if (q.thread != null)
pred = q;
// 如果q.thread == null,且pred != null,需要刪除q節(jié)點
else if (pred != null) {
// 刪除q節(jié)點
pred.next = s;
// pred.thread == null時說明在并發(fā)情況下被其他線程修改了;
// 返回第一個for循環(huán)重試
if (pred.thread == null) // check for race
continue retry;
}
// 如果q.thread != null且pred == null,說明q是棧頂節(jié)點
// 設(shè)置棧頂元素為s節(jié)點,如果失敗則返回重試
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}
cancel方法
cancel方法用于取消任務(wù),這里可能有兩種情況,一種是任務(wù)已經(jīng)執(zhí)行了,另一種是還未執(zhí)行,代碼如下:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
// mayInterruptIfRunning參數(shù)表示是否要進行中斷
if (mayInterruptIfRunning) {
try {
// runner保存著當前執(zhí)行任務(wù)的線程
Thread t = runner;
// 中斷線程
if (t != null)
t.interrupt();
} finally { // final state
// 設(shè)置最終狀態(tài)為INTERRUPTED
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
第一個if判斷可能有些不好理解,其實等價于如下代碼:
if (!state == NEW ||
!UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED))
如果狀態(tài)不是NEW,或者設(shè)置狀態(tài)為INTERRUPTING或CANCELLED失敗,則取消失敗,返回false。
簡單來說有一下兩種情況:
- 如果當前任務(wù)還沒有執(zhí)行,那么state == NEW,那么會嘗試設(shè)置狀態(tài),如果設(shè)置狀態(tài)失敗會返回false,表示取消失??;
- 如果當前任務(wù)已經(jīng)被執(zhí)行了,那么state > NEW,也就是!state == NEW為true,直接返回false。
也就是說,如果任務(wù)一旦開始執(zhí)行了(state != NEW),那么就不能被取消。
如果mayInterruptIfRunning為true,要中斷當前執(zhí)行任務(wù)的線程。
report方法
get方法在調(diào)用awaitDone方法后,會調(diào)用report方法進行返回:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
很簡單,可以看到有3中執(zhí)行情況:
- 如果
s == NORMAL為true,說明是正常執(zhí)行結(jié)束,那么根據(jù)上述的分析,在正常執(zhí)行結(jié)束時outcome存放的是返回結(jié)果,把outcome返回; - 如果
s >= CANCELLED為true,說明是被取消了,拋出CancellationException; - 如果
s < CANCELLED,那么狀態(tài)只能是是EXCEPTIONAL,表示在執(zhí)行過程中出現(xiàn)了異常,拋出ExecutionException。
runAndReset方法
該方法和run方法類似,區(qū)別在于這個方法不會設(shè)置任務(wù)的執(zhí)行結(jié)果值,所以在正常執(zhí)行時,不會修改state,除非發(fā)生了異常或者中斷,最后返回是否正確的執(zhí)行并復(fù)位:
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
// 不獲取和設(shè)置返回值
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
// 是否正確的執(zhí)行并復(fù)位
return ran && s == NEW;
}
總結(jié)
本文分析了FutureTask的執(zhí)行過程和獲取返回值的過程,要注意以下幾個地方:
- FutureTask是線程安全的,在多線程下任務(wù)也只會被執(zhí)行一次;
- 注意在執(zhí)行時各種狀態(tài)的切換;
- get方法調(diào)用時,如果任務(wù)沒有結(jié)束,要阻塞當前線程,法阻塞的線程會保存在一個Treiber Stack中;
- get方法超時功能如果超時未獲取成功,會拋出TimeoutException;
- 注意在取消時的線程中斷,在run方法中一定要保證結(jié)束時的狀態(tài)是INTERRUPTED,否則在cancel方法中可能沒有執(zhí)行interrupt,造成中斷的泄露。