Callable
我們知道線程運(yùn)行任務(wù)其中的一個(gè)方法就是創(chuàng)建一個(gè)實(shí)現(xiàn)Runnable接口的類,然后通過Thread的構(gòu)造方法設(shè)置進(jìn)去,線程啟動(dòng)后就可以執(zhí)行Runnalbe的邏輯。
可以回顧一樣Runnable接口的定義:
java.lang.Runnable
public interface Runnable {
public abstract void run();
}
可以看到Runnable接口只定義了一個(gè)方法,而且這個(gè)方法沒有返回值。
現(xiàn)在如果我們執(zhí)行一個(gè)任務(wù)需要它返回給我們運(yùn)行結(jié)果,該怎么做?
Java為我們提供了另外一個(gè)接口Callable,貌似有這樣的功能,相比于Runnable,它允許有返回值,并且可以拋出檢查型異常信息。
java.util.concurrent.Callable
@FunctionalInterface
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
可以看到它還是一個(gè)泛型接口,方法的返回值可以泛型化。
于是,我們類似這樣定義我們的任務(wù),提供返回值。
public class MyTask implements Callable<Integer> {
@Override
public Integer call() throws Exception {
return 1 * 1;
}
}
當(dāng)你這樣定義后,你會(huì)發(fā)現(xiàn)了一個(gè)問題,任務(wù)是需要線程來運(yùn)行了,但是看了一下Thread類,發(fā)現(xiàn)沒有類似構(gòu)造方法或者是普通方法來將Callable類型的對(duì)象設(shè)置進(jìn)去,Thread只能運(yùn)行Runnable類型的任務(wù)對(duì)象。我想JDK API的制作者將這個(gè)接口的名字定義為"Callable"是有特殊含義: 使用實(shí)現(xiàn)它接口的類創(chuàng)造出來的對(duì)象只是一個(gè)可被調(diào)用的對(duì)象,而不是一個(gè)可運(yùn)行的對(duì)象。
線程運(yùn)行提供返回值,現(xiàn)在這個(gè)需求我們卡住了,先暫時(shí)放在一邊,我們繼續(xù)。
Runnable和Callable都可以理解為任務(wù)具體業(yè)務(wù)邏輯封裝接口。Java是面向?qū)ο蟮?,一切都是?duì)象,一段具體的業(yè)務(wù)邏輯代碼當(dāng)然也就需要有某種東西封裝起來,實(shí)現(xiàn)了Runnable或Callable接口的類創(chuàng)建的對(duì)象就是業(yè)務(wù)邏輯代碼的載體。
Future
有時(shí)候線程執(zhí)行任務(wù)是耗時(shí)的,這種情況下我們希望不要浪費(fèi)時(shí)間一直等待它返回結(jié)果,而是希望利用等待的時(shí)間處理其他任務(wù),只要知道有一個(gè)線程在跑,并且它在某個(gè)時(shí)刻會(huì)返回結(jié)果就行了。也就是說希望我們的執(zhí)行是異步的。
異步:當(dāng)一個(gè)異步過程調(diào)用發(fā)出后,調(diào)用者不能立刻得到結(jié)果。實(shí)際處理這個(gè)調(diào)用的部件在完成后,通過狀態(tài)、通知和回調(diào)來通知調(diào)用者。
Java提供了Future接口來實(shí)現(xiàn)對(duì)異步調(diào)用的支持。下面是接口的定義:
java.util.concurrent.Future
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
A Future represents the result of an asynchronous computation.
API對(duì)這個(gè)接口的描述是:Future代表異步計(jì)算的結(jié)果。這個(gè)接口提供了get方法,用于獲取異步執(zhí)行后的結(jié)果。
現(xiàn)在我們通過實(shí)現(xiàn)這個(gè)接口來定義我們的任務(wù)類,跟上面類似你會(huì)發(fā)現(xiàn),先不說異步獲取結(jié)果,首先這個(gè)定義出來的任務(wù)類創(chuàng)建的對(duì)象必須可以被線程調(diào)用,但是僅僅執(zhí)行這個(gè)接口的任務(wù)類不滿足。
也就是說我們需要一個(gè)既能被線程運(yùn)行又能異步獲取結(jié)果的類存在。
Java定義了RunnableFuture接口來支持這種需求。RunnableFuture接口的定義如下:
java.util.concurrent.RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture接口繼承了Runnable接口和Future接口,這樣實(shí)現(xiàn)了這個(gè)接口的任務(wù)類能被線程調(diào)用,然后也能異步獲取結(jié)果。
不過這里要注意:Future只是表示可異步獲取結(jié)果的接口類,它只是一個(gè)規(guī)定,實(shí)現(xiàn)它接口的類要實(shí)現(xiàn)它的這種語義。當(dāng)然我們可以自己實(shí)現(xiàn)如何異步調(diào)用,又如何返回異步調(diào)用的結(jié)果。不過JDK為我們提供了FutureTask接口,我們直接使用它就可以了。
FutureTask
public class FutureTask<V> implements RunnableFuture<V>
上面是FutureTask的定義,可以看到實(shí)現(xiàn)了RunnableFuture接口,這樣通過它創(chuàng)建的對(duì)象就是可被線程運(yùn)行的和可異步獲取結(jié)果的。
可以簡單地從字面上理解這個(gè)類:未來任務(wù)類。也就是說創(chuàng)建這個(gè)任務(wù)之后直接交給線程執(zhí)行,什么時(shí)候運(yùn)行不用關(guān)心了,相信你運(yùn)行后會(huì)帶回來某些東西。
##FutureTask類概覽

上面不是說過可返回結(jié)果的Callable對(duì)象無法被線程運(yùn)行嘛,從上面可以看出FutureTask可接收Callable類型的對(duì)象,并且FutureTask還是可運(yùn)行的,那么在FutureTask的run方法運(yùn)行Callable的call方法,就到達(dá)了線程運(yùn)行Callable代碼邏輯的效果。
##可能的狀態(tài)轉(zhuǎn)移
在我們之前使用Runnable的時(shí)候,實(shí)現(xiàn)了Runnable接口的類創(chuàng)建的對(duì)象就是一個(gè)任務(wù),創(chuàng)建這個(gè)任務(wù)后交給線程,之后這個(gè)任務(wù)對(duì)我們來說就是不可控了,但是我們的"FutureTask"必須是可控的,因?yàn)闃I(yè)務(wù)邏輯執(zhí)行完成后,需要得到計(jì)算的結(jié)果。
正如上面FutureTask類概覽所示,state用來表示FutureTask的內(nèi)部狀態(tài),所擁有的狀態(tài)如上圖所示,可能的狀態(tài)轉(zhuǎn)移為:
NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED
##FutureTask執(zhí)行原理
1. 結(jié)果的載體
上面我們已經(jīng)說了Callable對(duì)象封裝了業(yè)務(wù)邏輯代碼,結(jié)果是業(yè)務(wù)邏輯代碼執(zhí)行后產(chǎn)生的,雖然說Callable對(duì)象提供的call方法可以返回執(zhí)行后的結(jié)果,但是我們無法直接從這對(duì)象中獲取,因?yàn)檫@個(gè)對(duì)象不能被線程直接調(diào)用,能被線程直接調(diào)用的是實(shí)現(xiàn)了Runnable接口的類的對(duì)象,執(zhí)行的代碼在run方法體中,也就是Callable封裝的業(yè)務(wù)邏輯代碼必須以某種方式被放到實(shí)現(xiàn)了Runnable接口的類的對(duì)象的run方法中執(zhí)行。而FutureTask是一個(gè)實(shí)現(xiàn)了Runnable接口的類,它可以用來執(zhí)行Callable封裝的邏輯??梢钥吹紽utureTask內(nèi)部有:
private Callable<V> callable;
這個(gè)屬性,它用來接收封裝了業(yè)務(wù)邏輯代碼的Callable對(duì)象。也就是線程通過調(diào)用FutureTask就間接地調(diào)用了Callable對(duì)象封裝的邏輯代碼,這個(gè)調(diào)用發(fā)生在run方法中。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
Runnable接口的run方法是被線程調(diào)用的,而且上面我們說過這個(gè)方法是沒有返回值的,就算我們持有了FutureTask對(duì)象的引用,并且Callable對(duì)象的邏輯已經(jīng)在run方法內(nèi)執(zhí)行,我們也沒法獲取到Callable對(duì)象執(zhí)行返回的值,不過既然我們有FutureTask對(duì)象的持有權(quán),而且Callable對(duì)象方法的執(zhí)行在FutureTask對(duì)象的run方法內(nèi)部,那么我們就可以在FutureTask內(nèi)部定義一個(gè)屬性來接收Callable對(duì)象方法的返回值。FutureTask的outcome屬性正是啟到這個(gè)作用:
private Object outcome; // non-volatile, protected by state reads/writes
這個(gè)值的設(shè)置就是在run方法的內(nèi)部:
if (ran)
set(result);
2. 結(jié)果的獲取
FutureTask實(shí)現(xiàn)類Future接口,F(xiàn)uture定義的get方法提供了獲取異步任務(wù)的執(zhí)行結(jié)果的作用,這個(gè)執(zhí)行結(jié)果就是我們上面說的outcome的值。
Callable對(duì)象封裝的業(yè)務(wù)邏輯代碼執(zhí)行操作可能是非常耗時(shí)的,也就是說cal
l方法一時(shí)半會(huì)執(zhí)行不完,執(zhí)行不完就無法將執(zhí)行的結(jié)果設(shè)置到outcome中,如果FutureTask的方法只是簡單的像下面這樣返回outcome:
public V get() throws InterruptedException, ExecutionException {
return (V) outcome;
}
那么返回的這個(gè)outcome就毫無意義,因?yàn)闃I(yè)務(wù)邏輯代碼還沒執(zhí)行完,根本就還沒設(shè)置結(jié)果。
如何解決呢?解決的辦法就是如果業(yè)務(wù)邏輯代碼沒有執(zhí)行完,那么當(dāng)線程調(diào)用get方法獲取結(jié)果的時(shí)候就讓他掛起,讓它知道處理還沒執(zhí)行完,你這時(shí)候要獲取這個(gè)值的話得到的也是無用的數(shù)據(jù),所以你要等等。
現(xiàn)在就有兩個(gè)問題了:如何判斷業(yè)務(wù)邏輯是否執(zhí)行完?如何掛起?什么喚醒?
@@如何判斷業(yè)務(wù)邏輯是否執(zhí)行完
邏輯邏輯代碼執(zhí)行完就是FutureTask的run方法正確無誤地調(diào)用了Callable對(duì)象的call方法,然后成功的設(shè)置了outcome值,也就是說如果到了這個(gè)時(shí)間點(diǎn)那么就可以說任務(wù)執(zhí)行完了。也就是說到這個(gè)點(diǎn)之后設(shè)置一個(gè)狀態(tài)變量來表示任務(wù)完成,F(xiàn)utureTask的state就是這樣的一個(gè)狀態(tài)變量,判斷這個(gè)狀態(tài)具有某值之后那么就表明任務(wù)已經(jīng)完成了。
所以當(dāng)線程調(diào)用get方法的時(shí)候首先就應(yīng)該判斷這個(gè)值是否就是表明任務(wù)完成的特定值,如果不是那么當(dāng)前線程就要?jiǎng)e掛起。
@@如何掛起
上面我們已經(jīng)說過了當(dāng)調(diào)用get方法,線程看到狀態(tài)變量的值不是表示任務(wù)完成的值時(shí),它要暫停執(zhí)行,想想我們學(xué)過的知識(shí)使一個(gè)線程掛起有兩種(不考慮sleep)方式:Object的wait方法和LockSupport的park方法。Object的wait方法需要同步機(jī)制的支持,而我們這里沒有共享資源訪問同步的問題(state是共享資源,但是會(huì)采用無鎖的機(jī)制),所以不適合。FutureTask內(nèi)部采用的是LockSupport的park方法來掛起線程。
LockSupport.park(this);
這里還有一個(gè)問題就是會(huì)有多個(gè)線程請(qǐng)求get方法,那么它們都需要掛起,而后面要重新喚醒它們,那么就需要有個(gè)地方暫存它們,F(xiàn)utureTask使用
Treiber Stack棧這種數(shù)據(jù)結(jié)構(gòu)來暫存掛起的線程。
Treiber Stack是一個(gè)可擴(kuò)展的無鎖棧,利用細(xì)粒度的并發(fā)原語CAS來實(shí)現(xiàn)的。
FutureTask內(nèi)部定義了代表當(dāng)前線程的WaitNode節(jié)點(diǎn):
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
通過CAS操作來實(shí)現(xiàn)節(jié)點(diǎn)的入棧:
UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q)
@@什么時(shí)候喚醒
當(dāng)業(yè)務(wù)邏輯代碼執(zhí)行完畢,并且成功設(shè)置了outcome值后就要喚醒之前棧中被掛起的線程。
FutureTask代碼中,當(dāng)run方法內(nèi)部Callable調(diào)用call執(zhí)行業(yè)務(wù)邏輯操作無誤后,就會(huì)執(zhí)行設(shè)置outcome的操作,outcome操作設(shè)置成功后,會(huì)做喚醒操作。
run
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
set
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
finishCompletion
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
3. 關(guān)于取消
Future接口定義了取消任務(wù)的方法,可提供取消任務(wù)的功能:
boolean cancel(boolean mayInterruptIfRunning)
這里,我們首先要弄明白一點(diǎn),取消任務(wù)是取消什么?
FutureTask說到底還是像Runnable一樣被線程調(diào)用,線程調(diào)用就是執(zhí)行run方法,所以取消任務(wù)就是中斷執(zhí)行run方法的那個(gè)線程,哪個(gè)線程在執(zhí)行FutureTask的run方法,F(xiàn)utureTask是有記錄的:
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
這個(gè)屬性就是用來記錄執(zhí)行FutureTask的線程對(duì)象的。這個(gè)屬性在run方法中設(shè)置:
public void run() {
if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread()))
return;
}
cancel邏輯就是中斷這個(gè)線程:
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
總結(jié)
FutureTask說白了就是一個(gè)實(shí)現(xiàn)了Runnable接口的類,線程可以調(diào)用它的run方法。但是它跟普通實(shí)現(xiàn)Runnable接口類不一樣,它又實(shí)現(xiàn)了Future接口,內(nèi)部通過包裝Callable這個(gè)能執(zhí)行業(yè)務(wù)邏輯代碼又能返回結(jié)果的對(duì)象使得它具有了返回處理結(jié)果的能力。同時(shí)通過其內(nèi)部定義的任務(wù)處理狀態(tài)的屬性的支持,使得它能感知任務(wù)處理狀態(tài),通過這個(gè)狀態(tài)來掛起和喚醒想獲取它處理結(jié)果的線程。
FutureTask是一種主動(dòng)感知狀態(tài)的異步調(diào)用模式。
其他的異步模式有:通知和回調(diào)。FutureTask是主動(dòng)去詢問狀態(tài)。