FutureTask是java中一個(gè)用來實(shí)現(xiàn)可取消的同步計(jì)算的類??扇∠且?yàn)檫@個(gè)類可以調(diào)用cancel方法取消計(jì)算(其實(shí)也是有條件的取消),同步是因?yàn)檎{(diào)用get方法獲取計(jì)算結(jié)果的時(shí)候需要等待計(jì)算完成。
1.簡(jiǎn)單使用
Callable<Integer> callable = new Callable<Integer>()
{
@Override
public Integer call() throws Exception
{
int sum = 0;
int value = 10;
while (value-- >= 0)
{
sum += value;
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
System.out.println("done");
return sum;
}
};
FutureTask<Integer> task = new FutureTask<Integer>(callable);
new Thread(task).start();
try
{
System.out.println(task.get());
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (ExecutionException e)
{
e.printStackTrace();
}
FutureTask的使用比較簡(jiǎn)單,通過構(gòu)造方法傳入Callable對(duì)象或者是Runnable對(duì)象和返回值,新開一個(gè)線程執(zhí)行這個(gè)Callable對(duì)象,最后通過get方法獲取計(jì)算結(jié)果。
2.原理
從FutureTask的繼承關(guān)系上來看,F(xiàn)utureTask實(shí)現(xiàn)了RunnableFuture接口,RunnbleFuture接口又繼承了Runnable和Future接口,所以FutureTask可以看做是一個(gè)Runnable和Future的組合體。
public FutureTask(Callable<V> callable)
{
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
從構(gòu)造方法上看,僅僅是設(shè)置了兩個(gè)成員變量。當(dāng)放到線程中去執(zhí)行的時(shí)候,看看run方法。
public void run()
{
//1.CAS設(shè)置runner為當(dāng)前的線程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try
{
Callable<V> c = callable;
if (c != null && state == NEW)
{
V result;
boolean ran;
try
{
//2.執(zhí)行call的內(nèi)容
result = c.call();
ran = true;
}
catch (Throwable ex)
{
result = null;
ran = false;
setException(ex);
}
//3.設(shè)置執(zhí)行結(jié)果
if (ran)
set(result);
}
}
finally
{
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
代碼比較簡(jiǎn)單,首先通過CAS的方式設(shè)置runner變量為當(dāng)前的線程,然后執(zhí)行callable中的call方法,執(zhí)行完成之后調(diào)用set方法,并且在fianlly中重置runner為null。再看下set方法。
protected void set(V v)
{
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING))
{
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion()
{
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;)
{
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null))
{
for (;;)
{
Thread t = q.thread;
if (t != null)
{
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
set方法中,首先通過CAS的方式設(shè)置state為Completing,然后將結(jié)果設(shè)置給outcome,在此通過CAS將state設(shè)置為NORMAL.finishCompletion方法是將通過調(diào)用get方法而導(dǎo)致阻塞的線程喚醒。
調(diào)用get方法的時(shí)候會(huì)同步等待,看看get方法。
public V get() throws InterruptedException, ExecutionException
{
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException
{
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;)
{
if (Thread.interrupted())
{
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
if (s > COMPLETING)
{
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed)
{
nanos = deadline - System.nanoTime();
if (nanos <= 0L)
{
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
調(diào)用get的時(shí)機(jī),如果是在COMPLETING狀態(tài)之后,直接調(diào)用report方法返回。如果是在任務(wù)完成之前,則會(huì)調(diào)用awaitDone方法。awaitDone方法采用一個(gè)自旋的方式來確定state的狀態(tài),調(diào)用步奏如下:
- 新建一個(gè)
WaitNode對(duì)象 - 通過CAS的方式將waitNode放到等待鏈表中
- 掛起當(dāng)前的線程
當(dāng)當(dāng)前的任務(wù)執(zhí)行完成之后,也就是調(diào)用set方法之后,state狀態(tài)被重置,并且通過finishCompletion方法喚醒通過get方法而阻塞的線程,此時(shí)get方法繼續(xù)執(zhí)行,執(zhí)行report方法。
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
report方法直接將set方法中設(shè)置的outcome返回即可。
再來看下cancel方法
public boolean cancel(boolean mayInterruptIfRunning)
{
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try // in case call to interrupt throws exception
{
if (mayInterruptIfRunning)
{
try
{
Thread t = runner;
if (t != null)
t.interrupt();
}
finally // final state
{
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
}
finally
{
finishCompletion();
}
return true;
}
調(diào)用cancel的時(shí)機(jī)決定了它的行為,如果調(diào)用的時(shí)候,任務(wù)還沒有結(jié)束,首先通過CAS的方式設(shè)置state的狀態(tài)為INTERRUPTING或者是CANCELLED,然后通過finishCompletion方法喚醒所有掛起的線程,那么在上面介紹的awaitDone方法將會(huì)跳出循環(huán),進(jìn)入report方法,拋出異常。如果任務(wù)已經(jīng)結(jié)束,那么cancel方法直接返回false,表示cancel失敗。所有cancel方法只是取消所有get方法的掛起,立刻返回當(dāng)前的值或者拋出異常,而不是關(guān)閉當(dāng)前的正在運(yùn)行的線程。