通常我們會(huì)通過 Thread 類創(chuàng)建執(zhí)行線程并執(zhí)行提交的任務(wù)(Runnable)
new Thread(new Runnable() {
@Override
public void run() {
}
}).start();
但是這種方式有個(gè)缺陷,它沒有返回值,有時(shí)候,我們想知道查詢?nèi)蝿?wù)是否完成,或者我們想在任務(wù)執(zhí)行完畢后立即通知我們,很顯單純的 Thread 類做不到。 然而, Executor, Callable, FutureTask 可以幫我們實(shí)現(xiàn),不過,最終執(zhí)行任務(wù)的方式還是 Thread + Runnable,這些類只不過包裝了一些邏輯而已。
Callable
public interface Callable<V> {
V call() throws Exception;
}
通常我們說,如果需要返回任務(wù)執(zhí)行的結(jié)果,任務(wù)需要實(shí)現(xiàn) Callable 接口,而不是 Runnable 接口,這個(gè)說法并不是很精確,因?yàn)榫€程中執(zhí)行的任務(wù)永遠(yuǎn)是 Runnable,而 Callable 只是在 Runnable 的 run() 方法中調(diào)用而已,這個(gè)可以在后面的源碼分析中可知。
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Future 代表一個(gè)異步任務(wù)的結(jié)果,可以通過 get() 來查詢結(jié)果,但是會(huì)阻塞當(dāng)前線程,直到結(jié)果返回。也可以通過 cancel() 來取消一個(gè)任務(wù)。 也可以通過 isCancelled() 和 isDone() 查詢?nèi)蝿?wù)的狀態(tài)。
其中 cancel() 方法比較特殊,它會(huì)嘗試取消當(dāng)前任務(wù)的執(zhí)行。如果當(dāng)前任務(wù)已經(jīng)完成,或者已經(jīng)被取消,或者由于某些原因不能被取消,cancel() 方法就會(huì)返回 false。 而如果 cancel() 方法返回 true ,并且當(dāng)前任務(wù)還沒有啟動(dòng),這個(gè)任務(wù)將永遠(yuǎn)不會(huì)運(yùn)行。 而如果這個(gè)任務(wù)已經(jīng)啟動(dòng)了,cancel(boolean mayInterruptIfRunning) 的參數(shù) mayInterruptIfRunning 決定了執(zhí)行當(dāng)前任務(wù)的線程時(shí)候被中斷,從而來停止任務(wù)。
FutureTask
public class FutureTask<V> implements RunnableFuture<V> {}
Future 實(shí)現(xiàn)了 RunnableFuture 接口,而 RunnableFuture 正如它的名字一樣,繼承了 Runnable 和 Future 兩個(gè)接口。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
例子
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
Future<String> future = service.submit(new Task());
try {
String result = future.get();
System.out.println(result);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}
class Task implements Callable<String> {
@Override
public String call() throws Exception {
TimeUnit.SECONDS.sleep(10);
return "Task completed!";
}
}
執(zhí)行結(jié)果
Task completed!
執(zhí)行結(jié)果很簡單,但是 get() 方法是會(huì)阻塞主線程 10 秒的,有時(shí)候,例如在 Android 中,阻塞主線程這么長時(shí)間,這是不可接受的。那么有沒有一種方法,能夠在任務(wù)執(zhí)行完畢后,自動(dòng)通知我們呢? 當(dāng)然有,往下看。
原理
ExecutorService 接口的 submit(Callable) 方法是在 AbstractExecutorService 中實(shí)現(xiàn)的。
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
submit() 方法首先把 Callable 對象轉(zhuǎn)換為了 RunnableFuture 接口實(shí)現(xiàn)類對象,在這里用的是 FutureTask, 最終也是返回這個(gè) FutureTask 對象,只是最后被轉(zhuǎn)為了更為通用的 Future 接口,因此例子中的 submit() 返回結(jié)果也可以強(qiáng)制轉(zhuǎn)換為 FutureTask ,只是我們一般不這樣使用它,這個(gè)事情后面再說。從這里可以看出,最終執(zhí)行的任務(wù)還是 Runnable。
由于 RunnableFuture 繼承自 Runnable 接口,因此能被 Executor 的 execute(Runnable) 方法執(zhí)行。當(dāng)調(diào)用 execute(ftask) 提交任務(wù)后,就會(huì)執(zhí)行 FutureTask 的 run() 方法,主要源碼如下
public void run() {
// ...
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// STE1: 調(diào)用 Callable 接口的 call() 方法
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
// STEP2: 設(shè)置結(jié)果
set(result);
}
} finally {
// ...
}
}
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
// 這就是 get() 方法返回的結(jié)果
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
private void finishCompletion() {
// assert state > COMPLETING;
// ...
// STEP3: 任務(wù)執(zhí)行完畢后調(diào)用 FutureTask 的 done() 方法
done();
callable = null;
}
protected void done() { }
通過我對源碼的注釋,可以看到 FutureTask.run() 方法,實(shí)際做了三件事
- 調(diào)用
Callable的run()方法,并獲取結(jié)果。 - 用
outcome保存返回的結(jié)果。通過Future接口的get()方法就可以獲取這個(gè)結(jié)果 - 任務(wù)執(zhí)行完畢后,調(diào)用
FutureTask的done()方法。
FutureTask 的正確使用
從剛才的源碼實(shí)現(xiàn)中可以發(fā)現(xiàn),提交的是 Callable 對象,而最終執(zhí)行的是 FutureTask 對象。那么我們完全可以自己用 Callable 創(chuàng)建一個(gè) FutureTask 對象,然后用 Executor 的 execute(Runnable) 方法提交任務(wù)。那么,這么做有什么好處呢?因?yàn)槲覀兛梢詮?fù)寫 FutureTask 中的 done() 方法,從而在任務(wù)執(zhí)行完畢后回調(diào)。
public class Test {
public static void main(String[] args) {
ExecutorService service = Executors.newCachedThreadPool();
MyFutureTask futureTask = new MyFutureTask(new Task());
System.out.println(Thread.currentThread().getName() + ": start task");
service.execute(futureTask);
}
}
class MyFutureTask extends FutureTask<String> {
public MyFutureTask(Callable<String> callable) {
super(callable);
}
@Override
protected void done() {
try {
System.out.println(Thread.currentThread().getName() + ": " + get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
;
}
}
class Task implements Callable<String> {
@Override
public String call() throws Exception {
return "Task completed!";
}
}
執(zhí)行結(jié)果如下
main: start task
pool-1-thread-1: Task completed!
根據(jù)上面講的原理,這里的結(jié)果并不意外。 從這個(gè)例子可以看出,FutureTask 的 done() 方法能夠在任務(wù)執(zhí)行完畢后第一時(shí)間回調(diào),這就不需要去調(diào)用會(huì)阻塞當(dāng)前線程的get() 方法。
另外,從結(jié)果看 done() 方法并不是在主線程中執(zhí)行,如果是像在 Android 中,是不能在工作線程中直接更新 UI 的。
其實(shí)我們的代碼可以寫得再簡單點(diǎn),只是觀賞性并不好
FutureTask<String> futureTask = new FutureTask<String>(new Task()){
@Override
protected void done() {
try {
System.out.println(Thread.currentThread().getName() + ": " + get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
};
直接在創(chuàng)建 FutureTask 對象的時(shí)候,復(fù)寫它的 done() 方法。
ExecutorService.submit()
查看 API 發(fā)現(xiàn),ExecutorService 的 submit() 方法可以傳入一個(gè) Runnable 對象作為參數(shù)。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
通過上面的原理分析可知,submit(Runnable task) 方法的任務(wù)返回結(jié)果是 null,而 submit(Runnable task, T result) 返回的結(jié)果就是參數(shù) result.
Runnable 轉(zhuǎn) Callable
有時(shí)候代碼中只用了 Runnable 來執(zhí)行任務(wù),如果我們想給這個(gè)任務(wù)一個(gè)返回結(jié)果,就需要把 Runnable 轉(zhuǎn)為 Callable ,然后用 ExecutorService.submit() 來執(zhí)行。 Executors 提供了靜態(tài)方法用來轉(zhuǎn)換
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
如果看懂了上面的原理,這里就很簡單了 ~