本文主要是為了介紹多線程中使用的幾種任務(wù):Runnable、Callable、FutureTask等,是對(duì)前面多線程系列的最后一個(gè)補(bǔ)充了,接下來兩篇就是相當(dāng)于實(shí)戰(zhàn)練習(xí)了。
Runnable 和 Callable的區(qū)別
Runnable和Callable都是定義了接口,可以用在線程池中異步執(zhí)行,區(qū)別是:
- Runnable可以直接被Thread執(zhí)行,但是沒有返回值
- Callable執(zhí)行之后有返回值,但是只能提交給線程池執(zhí)行。
Future 和 FutureTask
Future是一個(gè)接口,主要是線程池中任務(wù)執(zhí)行之后用于返回結(jié)果的獲取,定義了
- boolean cancel(boolean mayInterruptIfRunning); 取消任務(wù)
- boolean isCancelled(); 任務(wù)是否取消
- boolean isDone(); 任務(wù)是否執(zhí)行完畢
- V get(); 獲取任務(wù)執(zhí)行的結(jié)果,注意這個(gè)方法阻塞線程
- V get(long timeout, TimeUnit unit); 同上,只是增加了一個(gè)超時(shí)時(shí)間
Future有一個(gè)直接繼承接口RunnableFuture,RunnableFuture有一個(gè)實(shí)現(xiàn)的子類FutureTask,RunnableFuture這個(gè)接口同時(shí)還繼承了Runnable接口,這意味著FutureTask可以作為Future或者Runnable使用。
再來看一下FutureTask的實(shí)現(xiàn),最終內(nèi)部保存了一個(gè)Callable對(duì)象,也就是提交的任務(wù)
先看構(gòu)造函數(shù)
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
...
/**
* Creates a {@code FutureTask} that will, upon running, execute the
* given {@code Runnable}, and arrange that {@code get} will return the
* given result on successful completion.
*
* @param runnable the runnable task
* @param result the result to return on successful completion. If
* you don't need a particular result, consider using
* constructions of the form:
* {@code Future<?> f = new FutureTask<Void>(runnable, null)}
* @throws NullPointerException if the runnable is null
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
一共2個(gè)構(gòu)造函數(shù),一個(gè)是接受Callable,一個(gè)是接受Runnable和默認(rèn)返回值。
詳細(xì)看一下第二個(gè)構(gòu)造參數(shù),注釋很清楚的說明,當(dāng)你需要runnable可取消同時(shí)不關(guān)心返回值時(shí),可以這樣構(gòu)建
Future<?> f = new FutureTask<Void>(runnable, null);
同時(shí)構(gòu)造函數(shù)里,Runnable被適配成了一個(gè)Callable,看一下里面的實(shí)現(xiàn):
/**
* Returns a {@link Callable} object that, when
* called, runs the given task and returns the given result. This
* can be useful when applying methods requiring a
* {@code Callable} to an otherwise resultless action.
* @param task the task to run
* @param result the result to return
* @param <T> the type of the result
* @return a callable object
* @throws NullPointerException if task null
*/
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
...
/**
* A callable that runs given task and returns given result
*/
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
上面兩個(gè)函數(shù)將一個(gè)Runnable適配成了一個(gè)Callable,是Executors中提供的靜態(tài)方法。
再看一下FutureTask對(duì)Runnable的實(shí)現(xiàn)
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
拋開其他的判斷條件,其實(shí)就是對(duì)內(nèi)部保存的Callable調(diào)用了call方法,進(jìn)行執(zhí)行并保存結(jié)果。這就是FutureTask主要的幾個(gè)方法,下面有用。
ExecutorService中Future的應(yīng)用
上面2點(diǎn)主要是為了給這點(diǎn)做伏筆,現(xiàn)在我們來看為什么ExecutorService中的submit()既可以提交Runnable又可以提交Callable并返回結(jié)果,同時(shí)看看直接execute() Runnable會(huì)有什么不同。
Future submit(Runnable task)
先來看一下這個(gè)方法的實(shí)現(xiàn)
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
代碼上可以很直觀的看到,提交的Runnable被newTaskFor()適配成了RunnableFuture。來看一下newTaskFor()這個(gè)方法的實(shí)現(xiàn)。
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
直接是new了一個(gè)FutureTask對(duì)象,上面我們分析過這種情況,runnable其實(shí)是會(huì)被適配成一個(gè)Callable的。
Future<T> submit(Callable<T> task)
再來看一下這個(gè)方法
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
跟上面的代碼簡直一摸一樣,都是適配成了RunnableFuture。
看到這里可以明白,提交Runnable時(shí)是將Runnable適配成了Callable,也就是submit方法最終都會(huì)調(diào)用的的是Callable對(duì)象。
上面我們說過RunnableFuture實(shí)現(xiàn)了Runnable接口,當(dāng)他被execute時(shí),肯定是被當(dāng)作Runnable使用的,看一下兩個(gè)submit方法最終都是通過execute來執(zhí)行的。
上面介紹FutureTask時(shí)我們知道,對(duì)Runnable的實(shí)現(xiàn)FutureTask最后調(diào)用的是Callable的call方法。
到這里可以知道了,
- 當(dāng)我們提交一個(gè)Runnable的任務(wù)時(shí),首先通過FutureTask的構(gòu)造函數(shù)被適配成了一個(gè)Callable對(duì)象被保存FutureTask中。
- 當(dāng)任務(wù)被執(zhí)行時(shí),F(xiàn)utureTask又被當(dāng)作一個(gè)Runnable使用,調(diào)用了保存在內(nèi)部的Callable的call方法,任務(wù)被執(zhí)行并返回了結(jié)果。
- Runnable被適配成Callable時(shí)最終調(diào)用的還是自己的run方法。
ExecutorService中execute()
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
上面注釋的意思是:
- 當(dāng)前核心線程數(shù)少于corePoolSize是,嘗試直接新建Thread用來執(zhí)行任務(wù)。同時(shí)校驗(yàn)添加的過程,防止出錯(cuò)。
- 任務(wù)入隊(duì)時(shí)二次校驗(yàn)是否需要新建線程,判斷是否需要回滾等。
- 如果任務(wù)不能入隊(duì)則新建非核心線程處理,如果失敗那么就拒絕任務(wù)。
這個(gè)就是任務(wù)具體執(zhí)行的過程,同時(shí)也可以知道為什么上一篇博客中通過反射獲取workers的size就能知道當(dāng)前線程的數(shù)量。
總結(jié)
又到總結(jié)時(shí)間,博文主要講了幾個(gè)概念Runnable、Callable、Future以及相關(guān)的子類,總結(jié)如下:
- Runnable可以直接被Thread執(zhí)行,但是沒有返回值
- Callable執(zhí)行之后有返回值,但是只能提交給線程池執(zhí)行。
- Future定義了一系列關(guān)于任務(wù)取消的接口方法
- FutureTask是Future唯一實(shí)現(xiàn)類,它也實(shí)現(xiàn)了Runnable接口
- 線程池submit Callable和Runnable時(shí)最終都會(huì)轉(zhuǎn)換成FutureTask
- FutureTask被執(zhí)行時(shí)是被當(dāng)成Runnable使用的,執(zhí)行了內(nèi)部保存的Callable的call方法