Callable接口
Java中的子線程通常是通過Thread或者Runnable的方式實現(xiàn)的,但是這種方式只能通過回調(diào),或者共享變量等方式來傳遞數(shù)據(jù),而Callable則是可以獲取返回結(jié)果的一種子線程實現(xiàn)方式。
Callable是一個接口,源碼如下:
public interface Callable<V> {
V call() throws Exception;
}
非常簡單,只有一個方法,和一個泛型V,所以我們創(chuàng)建Callable對象的時候,也只需要指定返回類型并實現(xiàn)call方法就可以了。
Future接口
看完了Callable接口,會發(fā)現(xiàn)它非常簡單,沒有辦法在子線程中直接通過它來獲取到返回結(jié)果的,這時候就需要Future發(fā)揮作用了。源碼如下:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
-
boolean cancel(boolean mayInterruptIfRunning)在任務(wù)開始前取消,傳入值表示任務(wù)開始后是否允許打斷,返回值表示是否取消成功(任務(wù)已經(jīng)開始不允許打斷,已經(jīng)運行結(jié)束,已經(jīng)取消等等狀態(tài)會返回失敗) -
boolean isCancelled()是否已經(jīng)被取消 -
boolean isDone()是否完成任務(wù) -
V get()嘗試獲取返回結(jié)果,阻塞方法 -
V get(long timeout, TimeUnit unit)同上,可以指定超時時間
可以看到,Future實際上可以理解為Callable的管理類。
在線程池中執(zhí)行任務(wù)時,除了execute方法之外,還有一個submit方法:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
它返回的就是一個Future對象,可以通過它來回去Callable任務(wù)的執(zhí)行結(jié)果:
Callable<Integer> callable = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Sub Thread is calculating...");
Thread.sleep(10000);
return 10;
}
};
Future<Integer> future = Executors.newCachedThreadPool().submit(callable);
try {
System.out.println("Main Thread start waiting result... ");
int res = future.get();
System.out.println("Main Thread get result: " + res);
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//運行結(jié)果:
Main Thread start waiting result...
Sub Thread is calculating...
Main Thread get result: 10
FutureTask
如果不用Java提供的線程池,直接用Thread怎樣在子線程中運行Callable呢? 這時候就要用到FutureTask類了。
FutureTask實現(xiàn)了Future和Runnable接口,這就意味著,它既可以放在Thread中去運行,又能夠?qū)θ蝿?wù)進行管理,下面是源碼:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
可以看到,構(gòu)造函數(shù)中傳入了待運行的任務(wù)Callable對象或者Runnable對象和指定的返回結(jié)果,V result用來指定運行完成后的返回值,如果不想用指定值,可以用Future<?> f = new FutureTask<Void>(runnable, null)來返回null。采用Callable構(gòu)造方法創(chuàng)建的FutureTask對象,執(zhí)行完畢返回的是實際運算結(jié)果,而Runnable 構(gòu)造函數(shù)返回值是傳入的result。
task的狀態(tài)
FutureTask中持有的任務(wù)對象,有以下幾種狀態(tài):
private static final int NEW = 0; //新建或運行中
private static final int COMPLETING = 1;//任務(wù)運行結(jié)束,正在處理一些后續(xù)操作
private static final int NORMAL = 2;//任務(wù)已經(jīng)完成,COMPLETING的下一個狀態(tài)
private static final int EXCEPTIONAL = 3;//任務(wù)拋出異常,COMPLETING的下一個狀態(tài)
private static final int CANCELLED = 4;//任務(wù)被取消
private static final int INTERRUPTING = 5;//收到打斷指令,還沒有執(zhí)行interrupt
private static final int INTERRUPTED = 6;//收到打斷指令,也執(zhí)行了interrupt
可能的狀態(tài)變化主要有以下幾種:
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
task的執(zhí)行過程
FutureTask實現(xiàn)了Runnable接口,是可以直接放在Thread中執(zhí)行的,實際上運行的就是它的run方法:
public void run() {
//r如果當(dāng)前狀態(tài)不是NEW,說明任務(wù)已經(jīng)執(zhí)行完成了,直接返回
//如果當(dāng)前狀態(tài)是NEW,嘗試用CAS方式將當(dāng)前線程賦值給RUNNER,賦值前RUNNER的值應(yīng)該是null,否則賦值失敗
//賦值失敗表示已經(jīng)有線程執(zhí)行了run方法,直接返回
if (state != NEW ||
!U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
//運行中拋出異常
setException(ex);
}
//ran為true,說明正常運行結(jié)束,得到了返回結(jié)果
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
執(zhí)行結(jié)果其實是比較簡單的,通過RUNNER來記錄執(zhí)行任務(wù)的線程,從而保證只有一個線程可以執(zhí)行該任務(wù)。運行結(jié)束后有兩個出口:
-
setException(ex);運行中出錯,拋出異常 -
set(result);任務(wù)執(zhí)行完畢,獲取到返回值
protected void setException(Throwable t) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
U.putOrderedInt(this, STATE, EXCEPTIONAL); // final state
finishCompletion();
}
}
protected void set(V v) {
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
U.putOrderedInt(this, STATE, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (U.compareAndSwapObject(this, WAITERS, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//喚醒等待線程
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
done();
callable = null; // to reduce footprint
}
這兩個方法實際上是一樣的,都是將狀態(tài)賦值為COMPLETING,然后保存結(jié)果(運行結(jié)果或錯誤信息),再執(zhí)行finishCompletion方法,通知WAITERS里記錄的等待線程繼續(xù)執(zhí)行,并清空WAITERS。
獲取返回結(jié)果
獲取返回結(jié)果是通過get()方法:
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
// We may have already promised (via isDone) that we are done
// so never return empty-handed or throw InterruptedException
Thread.yield();
else if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
else if (q == null) {
if (timed && nanos <= 0L)
return s;
q = new WaitNode();
}
else if (!queued)
//cas機制,將新建節(jié)點q的next指向原來的節(jié)點workers,然后將workers更新為新建的節(jié)點。workers(WAITERS)實際上就是持有了所有等待線程的一個鏈表
queued = U.compareAndSwapObject(this, WAITERS,
q.next = waiters, q);
else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);
}
else
LockSupport.park(this);
}
}
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
get()方法比較簡單,先判斷當(dāng)前狀態(tài),如果狀態(tài) < COMPLETING,說明任務(wù)沒有執(zhí)行完畢,直接調(diào)用awaitDone方法。
awaitDone方法可以接受兩個參數(shù),用來指定是否設(shè)置超時時間。它內(nèi)部是一個無限for循環(huán)。下面是awaitDone方法的執(zhí)行步驟(忽略超時設(shè)置):
進入
awaitDone方法時,state一定是小于COMPLETING的,第一次會走else if (q == null)分支,創(chuàng)建一個WaitNode()對象用來保存當(dāng)前線程第二次循環(huán)q已經(jīng)不是null了,如果任務(wù)仍然沒有結(jié)束,會執(zhí)行
else if (!queued)分支,queued表示創(chuàng)建的WaitNode()是否已經(jīng)添加到鏈表里,如果沒有嘗試添加,直到添加成功為止。等待線程添加成功以后進入下一個循環(huán),此時如果任務(wù)仍然沒有結(jié)束,會走到else分支,掛起當(dāng)前線程(阻塞)
此處阻塞的是等待結(jié)果的線程,也就是調(diào)用
FutureTask的get()方法的線程,而不是執(zhí)行任務(wù)的線程。阻塞線程用的是LockSupport.park(this)方法,喚醒的方法是LockSupport.unpark(),該方法在上邊的finishCompletion()中出現(xiàn)了,也就是說,任務(wù)執(zhí)行結(jié)束(運行完,拋出異常,被取消)時,等待的線程才會被喚醒,繼續(xù)下一次循環(huán)。任務(wù)結(jié)束以后,如果state是
COMPLETING狀態(tài),說明一些清理任務(wù)還沒有執(zhí)行完,等待的線程會讓出cpu,讓其他線程優(yōu)先執(zhí)行直到state 大于
COMPLETING,說明FutureTask已經(jīng)完全結(jié)束了,此時會會執(zhí)行(s > COMPLETING)分支,把節(jié)點置空,并返回。
awaitDone返回以后,說明任務(wù)已經(jīng)執(zhí)行完成了,會進入report方法:
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
可以看到,如果是正常結(jié)束,或者拋出異常結(jié)束,會返回結(jié)果,而如果是被取消,則會拋出異常。
使用
Callable<Integer> call = new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("正在計算結(jié)果...");
Thread.sleep(3000);
return 1;
}
};
FutureTask<Integer> task = new FutureTask<>(call);
Thread thread = new Thread(task);
thread.start();
Integer result = task.get();
System.out.println("結(jié)果為:" + result);
總結(jié)
FutureTask可以視為一個管理Callable任務(wù)的工具類,執(zhí)行Callable任務(wù)的是FutureTask的run方法,所以,可以通過new Thread(futuretask)的方法來實現(xiàn)子線程執(zhí)行任務(wù)獲取執(zhí)行結(jié)果是通過
FutureTask的get方法,調(diào)用該方法后,如果線程會被掛起,知道任務(wù)結(jié)束為止獲取結(jié)果的線程數(shù)量沒有限定,可以是任意個線程
獲取結(jié)果的線程被掛起以后,可以通過取消,超時等方法在任務(wù)執(zhí)行完畢以前結(jié)束掛起狀態(tài)。