一、ScheduledThreadPoolExecutor的爺爺類AbstractExecutorService
本篇把上一篇漏掉的一些如submit等方法的解析補回來,盡量給大家構(gòu)建一個完整的體系。想要了解submit首先得了解FutureTask!
1、FutureTask類
1.1、FutureTask實現(xiàn)的接口,頂層是Runnable和Future
public class FutureTask<V> implements RunnableFuture<V> {···}
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
1.2、FutureTask重要的成員變量
private volatile int state; //任務(wù)的狀態(tài)
/*可能的任務(wù)狀態(tài)切換
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private static final int NEW = 0; //新建狀態(tài)
private static final int COMPLETING = 1; //正在執(zhí)行狀態(tài)
private static final int NORMAL = 2; //正??扇〗Y(jié)果狀態(tài)
private static final int EXCEPTIONAL = 3; //例外狀態(tài)
private static final int CANCELLED = 4; //取消狀態(tài)
private static final int INTERRUPTING = 5; //正在中斷
private static final int INTERRUPTED = 6; //中斷完成
/** 內(nèi)部組合了可返回結(jié)果的狀態(tài)類 */
private Callable<V> callable;
/** 任務(wù)執(zhí)行結(jié)果 */
private Object outcome;
/** 執(zhí)行callable的線程*/
private volatile Thread runner;
/**等待任務(wù)執(zhí)行完成的線程隊列*/
private volatile WaitNode waiters;
1.3、FutureTask重要的成員方法
//用于獲取任務(wù)結(jié)果,多線程競爭無所謂,只是讀取
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
/**取消任務(wù)方法,改變?nèi)蝿?wù)狀態(tài),中斷執(zhí)行任務(wù)的線程,喚醒等待隊列中的線程并把等待隊列線程節(jié)點設(shè)置為空*/
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
/** 獲取結(jié)果,如果當前任務(wù)還沒完成,調(diào)用awaitDone等待*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**定時等待任務(wù)完成,內(nèi)部排隊掛起*/
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
//如果任務(wù)完成則把線程從阻塞隊列中移除
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING)
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
/**任務(wù)執(zhí)行入口*/
public void run() {
//這里的cas操作設(shè)置了runner線程為當前線程
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))。
return;
try {
//構(gòu)造方法里設(shè)置的Runnable或者Callable
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//真正執(zhí)行任務(wù)的call方法,返回執(zhí)行結(jié)果,阻塞調(diào)用
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
runner = null;
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
在認識了FutureTask本質(zhì)是Runnable和Callable任務(wù)之后感覺FutureTask真是沒啥東西,所謂阻塞獲取無非就是死循環(huán)取Callable.call()方法的結(jié)果,等待一定時間獲取也是調(diào)用了LockSupport掛起線程一定時間而已。
2、最重要的方法submit:
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
2.1、submit方法解析:
submit前兩種都調(diào)用了newTaskFor方法,該方法返回一個FutureTask<T>,其內(nèi)部實現(xiàn)實際上調(diào)用了Executors的callable方法,Executor給Runnable和Callable提供了一個適配器,讓Runnable有結(jié)果可以返回,這里暫且知道FutureTask內(nèi)部是由Callable任務(wù)類型實現(xiàn)的就可以了。而真正執(zhí)行任務(wù)的方法還是execute(ftask),該方法的調(diào)用鏈是這樣的
execute(ftask)-->ThreadPoolExecutor.execute(ftask)-->ThreadPoolExecutor.addWorker()
-->t.start()-->(FutureTask)ftask.run()-->Callable.call()-->Executors.RunnableAdapter.call()-->(Runnable)task.run
這個調(diào)用鏈很長,最終的最終還是調(diào)用了task的run方法,中間過程為了保存線程執(zhí)行結(jié)果使用了FutureTask的一些方法。
submit的第三種方法我們是很常用的,直接提交一個Callable任務(wù)給線程池執(zhí)行,并返回一個可以獲取線程執(zhí)行結(jié)果的FutureTask。
以上就是對上一篇的補充,接著分析定時線程池
3、ScheduledThreadPoolExecutor類:
3.1、該類能夠執(zhí)行定時任務(wù)主要依賴于一個內(nèi)部類:ScheduledFutureTask
//該類繼承了FutureTask并實現(xiàn)了RunnableScheduledFuture(這個接口內(nèi)部又繼承了Delayed接口),接口不了解不要緊,只要知道必須實現(xiàn)兩個接口方法isPeriodic和getDelayed即可
private class ScheduledFutureTask<V>
extends FutureTask<V> implements RunnableScheduledFuture<V> {
//任務(wù)編號
private final long sequenceNumber;
//任務(wù)可以開始執(zhí)行的時間
private long time;
//任務(wù)重復(fù)執(zhí)行的時間間隔
private final long period;
/** reExecutePeriodic方法調(diào)用后重新入隊的任務(wù) */
RunnableScheduledFuture<V> outerTask = this;
// 支持快速取消,Delayed隊列的索引值
int heapIndex;
//這個比較方法是任務(wù)入隊的規(guī)則,即隊列按照這個比較方法對任務(wù)排序,實際上就是按照任務(wù)設(shè)定的開始時間創(chuàng)建的任務(wù)隊列
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//任務(wù)的run方法,
public void run() {
//是否可重復(fù)執(zhí)行
boolean periodic = isPeriodic();
//如果任務(wù)當前狀態(tài)不可執(zhí)行,那么取消該任務(wù)
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
//如果任務(wù)可重復(fù)執(zhí)行并且狀態(tài)也是可執(zhí)行的,那么直接執(zhí)行該任務(wù)并設(shè)置下一次要執(zhí)行的時間
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
//重新周期執(zhí)行該任務(wù),內(nèi)部把任務(wù)放到待執(zhí)行的隊列中(如果可執(zhí)行)或者取消任務(wù)
reExecutePeriodic(outerTask);
}
}
//取消任務(wù)方法,調(diào)用了ThreadPoolExecutor的取消方法
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = super.cancel(mayInterruptIfRunning);
//如果任務(wù)取消成功那么從任務(wù)隊列中移除任務(wù)
if (cancelled && removeOnCancel && heapIndex >= 0)
remove(this);
return cancelled;
}
3.1小結(jié):
定時線程池能夠定時執(zhí)行任務(wù)全靠內(nèi)部類ScheduledFutureTask,該類維護了一個DelayedWorkQueue(按照自己指定順序加入元素的隊列),任務(wù)在隊列中的順序?qū)嶋H上是任務(wù)開始執(zhí)行的時間的順序。當任務(wù)執(zhí)行的時候會調(diào)用ThreadPoolExecutor的執(zhí)行方法runAndReset(線程執(zhí)行完交出執(zhí)行權(quán)),執(zhí)行完會設(shè)置下一次將要執(zhí)行的時間并把任務(wù)重新放到任務(wù)隊列等待其他線程執(zhí)行。判斷當前任務(wù)是否可執(zhí)行要看當前線程池是否處于SHUTDOWN并有殘留任務(wù)或者處于RUNNING狀態(tài),如果線程池處于其他狀態(tài),那么當前任務(wù)是要取消的。
3.2、定時線程池最重要的方法schedule
//該方法對普通任務(wù)進行了裝飾,即裝飾城ScheduledFutureTask,真正執(zhí)行任務(wù)的是delayedExecute
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
//本方法真正執(zhí)行定時任務(wù)
private void delayedExecute(RunnableScheduledFuture<?> task) {
//如果線程池關(guān)閉,拒絕接受任務(wù)
if (isShutdown())
reject(task);
else {
//任務(wù)入隊
super.getQueue().add(task);
//如果線程池關(guān)閉或者不能繼續(xù)執(zhí)行任務(wù),從隊列里移除任務(wù)并取消任務(wù)
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
//如果可以執(zhí)行任務(wù)那么確保線程池各方面狀態(tài)正常
else
ensurePrestart();
}
}
//該方法在線程池中啟動了一定數(shù)量的工作線程,addWorker方法我們前面講過,該方法會自己取任務(wù)執(zhí)行
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
3.2小結(jié):
分析至此我們可以明白定時任務(wù)的執(zhí)行原理了,
首先:任務(wù)創(chuàng)建成功之后根據(jù)任務(wù)開始執(zhí)行的時間先后順序把任務(wù)加到任務(wù)隊列里(比如隊列里原來有個任務(wù)10點執(zhí)行,你這個任務(wù)11點執(zhí)行,那你就在它后面,如果再來一個任務(wù)10點半執(zhí)行,那么10點半的任務(wù)在你倆中間)
然后:啟動線程池,線程池的工人們來這個時序隊列里取任務(wù)執(zhí)行,如果在線程池關(guān)閉的時候恰好來了一個任務(wù),那么丟棄該任務(wù)。線程池的線程執(zhí)行完任務(wù)后會交出任務(wù)的使用權(quán)(runner=null),并把任務(wù)下一次執(zhí)行時間設(shè)置成你創(chuàng)建任務(wù)時設(shè)置的時間間隔+原來的執(zhí)行時間,并再一次把任務(wù)加到任務(wù)隊列(如果線程池沒關(guān)閉的話)。