Java并發(fā)編程源碼分析系列:
- 分析Java線程池的創(chuàng)建
- 分析Java線程池執(zhí)行原理
- 分析Java線程池Callable任務(wù)執(zhí)行原理
- 分析ReentrantLock的實現(xiàn)原理
- 分析CountDownLatch的實現(xiàn)原理
- 分析同步工具Semaphore和CyclicBarrier的實現(xiàn)原理
延遲或周期執(zhí)行任務(wù)可以使用Timer或者ScheduledThreadPoolExecutor,前者可以拋棄,后者是今天的主角。
ScheduledThreadPoolExecutor繼承了ThreadPoolExecutor,對應(yīng)執(zhí)行任務(wù)變成ScheduledFutureTask。本文會在前三篇分析線程池原理的基礎(chǔ)上,分析ScheduledThreadPoolExecutor的實現(xiàn)原理,最后介紹下為什么不用Timer了。
ScheduledThreadPoolExecutor的創(chuàng)建
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(10);
ScheduledExecutorService singleScheduledThreadPool = Executors.newSingleThreadScheduledExecutor();
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
ScheduledThreadPoolExecutor的創(chuàng)建可以使用Executors,也可以自己傳參構(gòu)建。上面的構(gòu)造函數(shù)是參數(shù)最全的版本,可以設(shè)置線程目標(biāo)數(shù)量、線程工廠和飽和策略。至于等待隊列,使用內(nèi)部類DelayedWorkQueue,看后文分析。
ScheduledFutureTask
ScheduledFutureTask的構(gòu)造函數(shù)沒什么特別,保存了三個參數(shù)。
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
- time:任務(wù)執(zhí)行時間;
- period:任務(wù)周期執(zhí)行間隔;
- sequenceNumber:自增的任務(wù)序號。
Callable默認(rèn)period=0,表示任務(wù)不是周期執(zhí)行,因為只有Runnable可以周期執(zhí)行。想想也是,Callable目的是獲得執(zhí)行結(jié)果,沒有必要重復(fù)調(diào)用。

ScheduledFutureTask繼承了我們熟悉的FutureTask,這個不用多說。圖1是它實現(xiàn)的接口,比較陌生的是Delayed,而Delayed又繼承了Comparable。
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
這兩個接口的存在很容易理解,ScheduledFutureTask在等待隊列里調(diào)度不再按照FIFO,而是按照執(zhí)行時間,誰即將執(zhí)行,誰就排在前面。在這里也可以看到sequenceNumber的作用,當(dāng)執(zhí)行時間相同時,按照序號排序。
添加延遲任務(wù)
對ScheduledThreadPoolExecutor使用通用的execute或者submit提交任務(wù),最終調(diào)用schedule方法,默認(rèn)馬上執(zhí)行。如果需要延遲執(zhí)行,需要直接使用schedule,傳遞時間參數(shù)。
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
Runnable和Callable包裝成ScheduledFutureTask實例,保存了延遲信息,然后執(zhí)行delayedExecute。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
如果線程池已經(jīng)關(guān)閉,直接調(diào)用飽和策略,否則將任務(wù)加入等待隊列。加入之后,需要再判斷線程池的狀態(tài),和當(dāng)前任務(wù)是否能運行。如果不能繼續(xù)執(zhí)行,將任務(wù)移出隊列并取消任務(wù)。
canRunInCurrentRunState處理任務(wù)加入等待隊列后,又未執(zhí)行就發(fā)生線程池關(guān)閉的情況,它通過預(yù)設(shè)的兩個變量判斷任務(wù)到底能不能執(zhí)行。
- 延遲任務(wù)用executeExistingDelayedTasksAfterShutdown
- 周期任務(wù)用continueExistingPeriodicTasksAfterShutdown
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
最后調(diào)用到ensurePrestart,使用addWorkder增加工作線程,這在ThreadPoolExecutor解釋過了
添加周期任務(wù)
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,long period,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,null,triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
執(zhí)行周期任務(wù)有上面兩個方法,具體作用方法名寫得很清楚:
- scheduleAtFixedRate:按固定的頻率執(zhí)行,不受執(zhí)行時長影響,到點就執(zhí)行;
- scheduleWithFixedDelay:任務(wù)執(zhí)行完后,按固定的延后時間再執(zhí)行。
兩個方法幾乎一樣,不同的是構(gòu)建ScheduledFutureTask時,period一個傳正數(shù),另一個傳負(fù)數(shù)。不用懷疑,區(qū)分兩種情況就是用正負(fù)。
等待隊列
線程池的等待隊列使用了內(nèi)部類DelayedWorkQueue,和普通線程池等待隊列最大的不同是它的任務(wù)是按照目標(biāo)執(zhí)行時間進(jìn)行排序。
入隊的offer被重寫了,add和put方法也是調(diào)用offer,具體BlockingQueue的實現(xiàn)邏輯不在這里討論,重點是看offer里的siftUp方法。
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
int parent = (k - 1) >>> 1;
RunnableScheduledFuture<?> e = queue[parent];
if (key.compareTo(e) >= 0)
break;
queue[k] = e;
setIndex(e, k);
k = parent;
}
queue[k] = key;
setIndex(key, k);
}
siftUp根據(jù)任務(wù)的compareTo,將任務(wù)移動到隊列中指定的位置,就是這樣。
對應(yīng)地,出隊take方法,根據(jù)任務(wù)的delay時間,小于等于0時將任務(wù)出隊,否則等待。
任務(wù)執(zhí)行
當(dāng)線程池從等待隊列取出一個任務(wù)時,會執(zhí)行它的run方法。
public void run() {
boolean periodic = isPeriodic();
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {
setNextRunTime();
reExecutePeriodic(outerTask);
}
}
方法有三個分支,第一個if判斷任務(wù)在當(dāng)前線程池狀態(tài)下是否能執(zhí)行,canRunInCurrentRunState已經(jīng)講解過。第二個if是判斷是否周期任務(wù),不是的話直接執(zhí)行,不需要多余的操作。重點來看第三個if,也就是周期執(zhí)行任務(wù)。
- runAndReset:任務(wù)執(zhí)行完重置為初始狀態(tài),等待下一次執(zhí)行;
- setNextRunTime:計算下次執(zhí)行時間;
- reExecutePeriodic:再調(diào)度任務(wù)。
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
計算下次執(zhí)行時間,period根據(jù)正負(fù)有不同的計算邏輯,負(fù)的時間也會先改正,很明顯對應(yīng)上文的scheduleAtFixedRate和scheduleWithFixedDelay兩個方法。
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
將任務(wù)重新加入等待隊列,中間幾個方法都解釋過了。
Timer的缺陷
自從知道ScheduledThreadPoolExecutor,再沒有使用Timer,因為它有幾個缺陷:
- 多任務(wù)在單線程里執(zhí)行,一個任務(wù)結(jié)束,另一個任務(wù)才能開始,時間間隔不準(zhǔn);
- 出現(xiàn)異常會導(dǎo)致全部任務(wù)停止;
- 絕對時間,受系統(tǒng)時間影響。
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);
Timer的代碼很簡單,主要數(shù)據(jù)結(jié)構(gòu)是一個任務(wù)隊列和一個執(zhí)行線程。新增的任務(wù)會加入任務(wù)隊列,到達(dá)時間后,由執(zhí)行線程執(zhí)行。只有一個線程,很容易理解上面講的缺陷。
ScheduledThreadPoolExecutor每個任務(wù)都有對應(yīng)的執(zhí)行線程,時間使用相對時間計算,也就沒有上面的缺陷,所以沒有理由使用Timer了。