FutureTask是一個(gè)支持取消的異步處理器,一般在線程池中用于異步接受callable返回值。
主要實(shí)現(xiàn)分三部分:
1、封裝callable,然后放到線程池中去異步執(zhí)行->run。
2、獲取結(jié)果->get。
3、取消任務(wù)->cancel。
接下來(lái)主要學(xué)習(xí)下該模型如何實(shí)現(xiàn)
舉例說(shuō)明FutureTask在線程池中的應(yīng)用
// 第一步,定義線程池,
ExecutorService executor = new ThreadPoolExecutor(
minPoolSize,
maxPollSize,
keepAliveTime,
TimeUnit.SECONDS,
new SynchronousQueue<>());
// 第二步,放到線程池中執(zhí)行,返回FutureTask
FutureTask task = executor.submit(callable);
// 第三步,獲取返回值
T data = task.get();
學(xué)習(xí)下FutureTask實(shí)現(xiàn)
類屬性
//以下是FutureTask的各種狀態(tài)
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable; //執(zhí)行的任務(wù)
private Object outcome; //存儲(chǔ)結(jié)果或者異常
private volatile Thread runner;//執(zhí)行callable的線程
private volatile WaitNode waiters; //調(diào)用get方法等待獲取結(jié)果的線程棧
其中各種狀態(tài)存在 最終狀態(tài) status>COMPLETING
1)NEW -> COMPLETING -> NORMAL(有正常結(jié)果)
2) NEW -> COMPLETING -> EXCEPTIONAL(結(jié)果為異常)
3) NEW -> CANCELLED(無(wú)結(jié)果)
4) NEW -> INTERRUPTING -> INTERRUPTED(無(wú)結(jié)果)
類方法
從上面舉例說(shuō)明開(kāi)始分析
run()方法
FutureTask繼承runnable,ExecutorService submit把提交的任務(wù)封裝成FutureTask然后放到線程池ThreadPoolExecutor的execute執(zhí)行。
public void run() {
//如果不是初始狀態(tài)或者cas設(shè)置運(yùn)行線程是當(dāng)前線程不成功,直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 執(zhí)行callable任務(wù) 這里對(duì)異常進(jìn)行了catch
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex); // 封裝異常到outcome
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
// 這里如果是中斷中,設(shè)置成最終狀態(tài)
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
以上是run方法源碼實(shí)現(xiàn)很簡(jiǎn)單,解析如下:
1、如果不是始狀態(tài)或者cas設(shè)置運(yùn)行線程是當(dāng)前線程不成功,直接返回,防止多個(gè)線程重復(fù)執(zhí)行。
2、執(zhí)行callable的call(),即提交執(zhí)行任務(wù)(這里做了catch,會(huì)捕獲執(zhí)行任務(wù)的異常封裝到outcome中)
3、如果成功執(zhí)行set方法,封裝結(jié)果。
set方法
protected void set(V v) {
//cas方式設(shè)置成completing狀態(tài),防止多個(gè)線程同時(shí)處理
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 封裝結(jié)果
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // 最終設(shè)置成normal狀態(tài)
finishCompletion();
}
}
解析如下:
1、cas方式設(shè)置成completing狀態(tài),防止多個(gè)線程同時(shí)處理
2、封裝結(jié)果到outcome,然后設(shè)置到最終狀態(tài)normal
3、執(zhí)行finishCompletion方法。
finishCompletion方法
// state > COMPLETING; 不管異常,中斷,還是執(zhí)行完成,都需要執(zhí)行該方法來(lái)喚醒調(diào)用get方法阻塞的線程
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
// cas 設(shè)置waiters為null,防止多個(gè)線程執(zhí)行。
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 循環(huán)喚醒所有等待結(jié)果的線程
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//喚醒線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//該方法為空,可以被重寫(xiě)
done();
callable = null; // to reduce footprint
}
解析如下:
遍歷waiters中的等待節(jié)點(diǎn),并通過(guò) LockSupport喚醒每一個(gè)節(jié)點(diǎn),通知每個(gè)線程,該任務(wù)執(zhí)行完成(可能是執(zhí)行完成,也可能cancel,異常等)
以上就是執(zhí)行的過(guò)程,接下來(lái)分析獲取結(jié)果的過(guò)程->get
get方法
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
解析如下:
以上兩個(gè)方法,原理一樣,其中一個(gè)設(shè)置超時(shí)時(shí)間,支持最多阻塞多長(zhǎng)時(shí)間。
狀態(tài)如果小于COMPLETING,說(shuō)明還沒(méi)到最終狀態(tài),(不管是否是成功,還是異常,還是取消)
調(diào)用awaitDone方法阻塞線程,最終調(diào)用report方法返回結(jié)果。
awaitDone方法
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
//線程可中斷,如果當(dāng)前阻塞獲取結(jié)果線程執(zhí)行interrupt()方法,則從隊(duì)列中移除該節(jié)點(diǎn),并拋出中斷異常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果已經(jīng)是最終狀態(tài),退出返回
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//這里做了個(gè)優(yōu)化,competiting到最終狀態(tài)時(shí)間很短,通過(guò)yield比掛起響應(yīng)更快。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 初始化該阻塞節(jié)點(diǎn)
else if (q == null)
q = new WaitNode();
// cas方式寫(xiě)到阻塞waiters棧中
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 這里做阻塞時(shí)間處理。
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 阻塞線程,有超時(shí)時(shí)間
LockSupport.parkNanos(this, nanos);
}
else
// 阻塞線程
LockSupport.park(this);
}
}
解析如下:
整體流程已寫(xiě)到注解中,整體實(shí)現(xiàn)是放在一個(gè)死循環(huán)中,唯一出口,是達(dá)到最終狀態(tài)。
然后是構(gòu)建節(jié)點(diǎn)元素,并將該節(jié)點(diǎn)入棧,同時(shí)阻塞當(dāng)前線程等待運(yùn)行主任務(wù)的線程喚醒該節(jié)點(diǎn)。
report方法
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
然后是report方法,如果是正常結(jié)束,返回結(jié)果,如果不是正常結(jié)束,(取消,中斷)拋出異常。
最后分析下取消
cancel方法
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
解析如下:
mayInterruptIfRunning參數(shù)是是否允許運(yùn)行中被中斷取消。
1、根據(jù)入?yún)⑹欠駷閠rue,CAS設(shè)置狀態(tài)為INTERRUPTING或CANCELLED,設(shè)置成功,繼續(xù)第二步,否則直接返回false。
2、如果允許運(yùn)行中被中斷取消,調(diào)用runner.interupt()進(jìn)行中斷取消,設(shè)置狀態(tài)為INTERRUPTED
喚醒所有在get()方法等待的線程
此處有兩種狀態(tài)轉(zhuǎn)換
1)如果mayInterruptIfRunning為true
status狀態(tài)轉(zhuǎn)換為 new -> INTERRUPTING->INTERRUPTED
主動(dòng)去中斷執(zhí)行線程,然后喚醒所有等待結(jié)果的線程
2)如果mayInterruptIfRunning為false
status狀態(tài)轉(zhuǎn)換為 new -> CANCELLED。
不會(huì)去中斷執(zhí)行線程,直接喚醒所有等待結(jié)果的線程,從awaitDone方法中可以看到,喚醒等待線程后,直接從跳轉(zhuǎn)回get方法,然后把結(jié)果返回給獲取結(jié)果的線程,當(dāng)然此時(shí)的結(jié)果是null。
總結(jié),以上就是FutureTask的源碼簡(jiǎn)單解析,實(shí)現(xiàn)比較簡(jiǎn)單,F(xiàn)utureTask就是一個(gè)實(shí)現(xiàn)Future模式,支持取消的異步處理器。