1. 前言
在前面的文章中,我們介紹了定時(shí)任務(wù)類(lèi) Timer ,他是 JDK 1.3 中出現(xiàn)的,位于 java.util 包下。而今天說(shuō)的 ScheduledThreadPoolExecutor的是在 JUC 包下,是 JDK1.5 新增的。
今天就來(lái)說(shuō)說(shuō)這個(gè)類(lèi)。
2. API 介紹
該類(lèi)內(nèi)部結(jié)構(gòu)和 Timer還是有點(diǎn)類(lèi)似的,也是 3 個(gè)類(lèi):
-
ScheduledThreadPoolExecutor:程序員使用的接口。 -
DelayedWorkQueue: 存儲(chǔ)任務(wù)的隊(duì)列。 -
ScheduledFutureTask: 執(zhí)行任務(wù)的線程。
構(gòu)造方法介紹:
// 使用給定核心池大小創(chuàng)建一個(gè)新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 使用給定初始參數(shù)創(chuàng)建一個(gè)新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, RejectedExecutionHandler handler)
// 使用給定的初始參數(shù)創(chuàng)建一個(gè)新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory)
// 使用給定初始參數(shù)創(chuàng)建一個(gè)新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize, ThreadFactory threadFactory, RejectedExecutionHandler handler)
ScheduledThreadPoolExecutor最多支持 3 個(gè)參數(shù):核心線程數(shù)量,線程工廠,拒絕策略。
為什么沒(méi)有最大線程數(shù)量?由于 ScheduledThreadPoolExecutor 內(nèi)部是個(gè)無(wú)界隊(duì)列,maximumPoolSize 也就沒(méi)有意思了。
再介紹一下他的 API 方法,請(qǐng)?jiān)徫覍?JDK 文檔照抄過(guò)來(lái)了,就當(dāng)是備忘吧,如下:
protected <V> RunnableScheduledFuture<V> decorateTask(Callable<V> callable, RunnableScheduledFuture<V> task) // 修改或替換用于執(zhí)行 callable 的任務(wù)。
protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) // 修改或替換用于執(zhí)行 runnable 的任務(wù)。
void execute(Runnable command) // 使用所要求的零延遲執(zhí)行命令。
boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() // 獲取有關(guān)在此執(zhí)行程序已 shutdown 的情況下、是否繼續(xù)執(zhí)行現(xiàn)有定期任務(wù)的策略。
boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() // 獲取有關(guān)在此執(zhí)行程序已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有延遲任務(wù)的策略。
BlockingQueue<Runnable> getQueue() // 返回此執(zhí)行程序使用的任務(wù)隊(duì)列。
boolean remove(Runnable task) // 從執(zhí)行程序的內(nèi)部隊(duì)列中移除此任務(wù)(如果存在),從而如果尚未開(kāi)始,則其不再運(yùn)行。
<V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) // 創(chuàng)建并執(zhí)行在給定延遲后啟用的 ScheduledFuture。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) // 創(chuàng)建并執(zhí)行在給定延遲后啟用的一次性操作。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) // 創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期;也就是將在 initialDelay 后開(kāi)始執(zhí)行,然后在 initialDelay+period 后執(zhí)行,接著在 initialDelay + 2 * period 后執(zhí)行,依此類(lèi)推。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) // 創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,隨后,在每一次執(zhí)行終止和下一次執(zhí)行開(kāi)始之間都存在給定的延遲。
void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) // 設(shè)置有關(guān)在此執(zhí)行程序已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有定期任務(wù)的策略。
void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) // 設(shè)置有關(guān)在此執(zhí)行程序已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有延遲任務(wù)的策略。
void shutdown() // 在以前已提交任務(wù)的執(zhí)行中發(fā)起一個(gè)有序的關(guān)閉,但是不接受新任務(wù)。
List<Runnable> shutdownNow() // 嘗試停止所有正在執(zhí)行的任務(wù)、暫停等待任務(wù)的處理,并返回等待執(zhí)行的任務(wù)列表。
<T> Future<T> submit(Callable<T> task) // 提交一個(gè)返回值的任務(wù)用于執(zhí)行,返回一個(gè)表示任務(wù)的未決結(jié)果的 Future。
Future<?> submit(Runnable task) // 提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。
<T> Future<T> submit(Runnable task, T result) // 提交一個(gè) Runnable 任務(wù)用于執(zhí)行,并返回一個(gè)表示該任務(wù)的 Future。
最經(jīng)常使用的幾個(gè)方法如下:
// 使用給定核心池大小創(chuàng)建一個(gè)新 ScheduledThreadPoolExecutor。
ScheduledThreadPoolExecutor(int corePoolSize)
// 創(chuàng)建并執(zhí)行在給定延遲后啟用的一次性操作。
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
// 創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期;也就是將在 initialDelay 后開(kāi)始執(zhí)行,然后在 initialDelay+period 后執(zhí)行,接著在 initialDelay + 2 * period 后執(zhí)行,依此類(lèi)推。
ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
// 創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,隨后,在每一次執(zhí)行終止和下一次執(zhí)行開(kāi)始之間都存在給定的延遲。
ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
除了默認(rèn)的構(gòu)造方法,還有 3 個(gè) schedule 方法。我們將分析他們內(nèi)部的實(shí)現(xiàn)。
3. 構(gòu)造方法內(nèi)部實(shí)現(xiàn)
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
我們感興趣的就是這個(gè) DelayedWorkQueue 隊(duì)列了。他也是一個(gè)阻塞隊(duì)列。這個(gè)隊(duì)列的數(shù)據(jù)結(jié)構(gòu)是堆。同時(shí),這個(gè) queue 也是可比較的,比較什么呢?任務(wù)必須實(shí)現(xiàn) compareTo 方法,這個(gè)方法的比較邏輯是:比較任務(wù)的執(zhí)行時(shí)間,如果任務(wù)的執(zhí)行時(shí)間相同,則比較任務(wù)的加入時(shí)間。
因此,ScheduledFutureTask 有 2 個(gè)變量:
-
time: 任務(wù)的執(zhí)行時(shí)間。 -
sequenceNumber:任務(wù)的加入時(shí)間。
這兩個(gè)變量就是用來(lái)比較任務(wù)的執(zhí)行順序的。整個(gè)調(diào)度的順序就是這個(gè)邏輯。
4. 幾個(gè) schedule 方法的的區(qū)別
剛剛說(shuō)了,有 3 個(gè) schedule 方法:
ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit)
創(chuàng)建并執(zhí)行在給定延遲后啟用的一次性操作。ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)
創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,后續(xù)操作具有給定的周期;也就是將在initialDelay后開(kāi)始執(zhí)行,然后在initialDelay+period后執(zhí)行,接著在initialDelay + 2 * period后執(zhí)行,依此類(lèi)推。ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)
創(chuàng)建并執(zhí)行一個(gè)在給定初始延遲后首次啟用的定期操作,隨后,在每一次執(zhí)行終止和下一次執(zhí)行開(kāi)始之間都存在給定的延遲。
第一個(gè)方法執(zhí)行在給定的時(shí)間后,執(zhí)行一次就結(jié)束。
有點(diǎn)意思的地方是 第二個(gè)方法和 第三個(gè)方法,他們直接的區(qū)別。
這兩個(gè)方法都可以重復(fù)的調(diào)用。但是,重復(fù)調(diào)用的邏輯有所區(qū)別,這里就是比 Timer 好用的地方。
他們的共同點(diǎn)在于:必須等待上個(gè)任務(wù)執(zhí)行完畢才能執(zhí)行下個(gè)任務(wù)。
不同點(diǎn)在于:他們調(diào)度的時(shí)間粗略是不同的。
scheduleAtFixedRate 方法的執(zhí)行周期都是固定的,也就是,他是以上一個(gè)任務(wù)的開(kāi)始執(zhí)行時(shí)間作為起點(diǎn),加上之后的 period 時(shí)間,調(diào)度下次任務(wù)。
scheduleWithFixedDelay 方法則是以上一個(gè)任務(wù)的結(jié)束時(shí)間作為起點(diǎn),加上之后的 period 時(shí)間,調(diào)度下次任務(wù)。
有什么區(qū)別呢?
如何任務(wù)執(zhí)行時(shí)間很短,那就沒(méi)上面區(qū)別。但是,如果任務(wù)執(zhí)行時(shí)間很長(zhǎng),超過(guò)了 period 時(shí)間,那么區(qū)別就出來(lái)了。
我們假設(shè)一下。
我們?cè)O(shè)置 period 時(shí)間為 2 秒,而任務(wù)耗時(shí) 5 秒。
這個(gè)兩個(gè)方法的區(qū)別就體現(xiàn)出來(lái)了。
scheduleAtFixedRate 方法將會(huì)在上一個(gè)任務(wù)結(jié)束完畢立刻執(zhí)行,他和上一個(gè)任務(wù)的開(kāi)始執(zhí)行時(shí)間的間隔是 5 秒(因?yàn)楸仨毜却弦粋€(gè)任務(wù)執(zhí)行完畢)。
scheduleWithFixedDelay 方法將會(huì)在上一個(gè)任務(wù)結(jié)束后,注意:再等待 2 秒,才開(kāi)始執(zhí)行,那么他和上一個(gè)任務(wù)的開(kāi)始執(zhí)行時(shí)間的間隔是 7 秒。
所以,我們?cè)谑褂?ScheduledThreadPoolExecutor 的過(guò)程中需要注意任務(wù)的執(zhí)行時(shí)間不能超過(guò)間隔時(shí)間,如果超過(guò)了,最好使用scheduleAtFixedRate 方法,防止任務(wù)堆積。
當(dāng)然,也和具體的業(yè)務(wù)有關(guān)。不能一概而論。但一定要注意這兩個(gè)方法的區(qū)別。
5. scheduled 方法的實(shí)現(xiàn)
我們看看 scheduleAtFixedRate 方法的內(nèi)部實(shí)現(xiàn)。
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
創(chuàng)建一個(gè) ScheduledFutureTask 對(duì)象,然后裝飾一個(gè)這個(gè)Future ,該類(lèi)實(shí)現(xiàn)是直接返回,子類(lèi)可以有自己的實(shí)現(xiàn),在這個(gè)任務(wù)外裝飾一層。
然后執(zhí)行 delayedExecute 方法,最后返回 Future。
這個(gè) ScheduledFutureTask 實(shí)現(xiàn)了很多接口,比如 Future,Runnable, Comparable,Delayed 等。
ScheduledFutureTask 的構(gòu)造方法如下:
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
層層遞進(jìn),該類(lèi)首先通過(guò)一個(gè)原子靜態(tài) int對(duì)象這只任務(wù)的入隊(duì)編號(hào),然后創(chuàng)建一個(gè) Callable,這個(gè) Callable 是一個(gè)適配器,適配了Runnable和Callable,也就是將Runnable包裝成 callabe, 他的 call方法就是調(diào)用給定任務(wù)的 run 方法。當(dāng)然,這里的 result是沒(méi)有什么作用的。
如果你傳遞的是一個(gè) callable ,那么,就調(diào)用 FutureTask 的 run方法,設(shè)置真正的返回值。
這里使用了適配器模式,還是挺有趣的。
總的來(lái)說(shuō),這個(gè) ScheduledFutureTask 基于 FutureTask, 關(guān)于 FutureTask 我們之前從源碼介紹過(guò)了。
而他自己重寫(xiě)了幾個(gè)方法:compareTo, getDelay, run,isPeriodic4 個(gè)方法。
我們還是要看看 delayedExecute 的實(shí)現(xiàn)。
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
// 添加進(jìn)隊(duì)列。
super.getQueue().add(task);
// 如果線程池關(guān)閉了,且不可以在當(dāng)前狀態(tài)下運(yùn)行任務(wù),且從隊(duì)列刪除任務(wù)成功,就給任務(wù)打上取消標(biāo)記。
// 第二個(gè)判斷是由兩個(gè)變量控制的(下面是默認(rèn)值):
// continueExistingPeriodicTasksAfterShutdown = false 表示關(guān)閉的時(shí)候應(yīng)取消周期性任務(wù)。默認(rèn)關(guān)閉
// executeExistingDelayedTasksAfterShutdown = true。表示關(guān)閉的時(shí)候應(yīng)取消非周期性的任務(wù)。默認(rèn)不關(guān)閉。
// running 狀態(tài)下,canRunInCurrentRunState 必定返回 ture。
// 非 running 狀態(tài)下,canRunInCurrentRunState 根據(jù)上面的兩個(gè)值返回。
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 開(kāi)始執(zhí)行任務(wù)
ensurePrestart();
}
}
說(shuō)說(shuō)上面的方法。
- 判斷是否關(guān)閉,關(guān)閉則拒絕任務(wù)。
- 如果不是 關(guān)閉狀態(tài),則添加進(jìn)隊(duì)列,而添加隊(duì)列的順序我們之前講過(guò)了,根據(jù)
ScheduledFutureTask的compareTo方法來(lái)的,先比較執(zhí)行時(shí)間,再比較添加順序。 - 如果這個(gè)過(guò)程中線程池關(guān)閉了,則判斷此時(shí)是否應(yīng)該取消任務(wù),根據(jù)兩個(gè)變量來(lái)的,注釋里面寫(xiě)了。默認(rèn)的策略是,如果是周期性的任務(wù),就取消,反之不取消。
- 如果沒(méi)有關(guān)閉線程池。就調(diào)用線程池里的線程執(zhí)行任務(wù)。
整體的過(guò)程如圖:

注意上面的圖,如果是周期性的任務(wù),則會(huì)在執(zhí)行完畢后,歸還隊(duì)列。
從哪里可以看出來(lái)呢?
ScheduledFutureTask 的 run 方法:
public void run() {
// 是否是周期性任務(wù)
boolean periodic = isPeriodic();
// 如果不可以在當(dāng)前狀態(tài)下運(yùn)行,就取消任務(wù)(將這個(gè)任務(wù)的狀態(tài)設(shè)置為CANCELLED)。
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性的任務(wù),調(diào)用 FutureTask # run 方法
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性的。
// 執(zhí)行任務(wù),但不設(shè)置返回值,成功后返回 true。(callable 不可以重復(fù)執(zhí)行)
else if (ScheduledFutureTask.super.runAndReset()) {
// 設(shè)置下次執(zhí)行時(shí)間
setNextRunTime();
// 再次將任務(wù)添加到隊(duì)列中
reExecutePeriodic(outerTask);
}
}
邏輯如下:
- 如果不能再當(dāng)前狀態(tài)下運(yùn)行了,就取消這個(gè)任務(wù)。
- 如果不是周期性的任務(wù),就執(zhí)行
FutureTask的run方法。 - 如果是周期性的任務(wù),就需要執(zhí)行
runAndReset方法。 - 執(zhí)行完畢后,重寫(xiě)設(shè)置當(dāng)前任務(wù)的下次執(zhí)行時(shí)間,然后添加進(jìn)隊(duì)列中。
而管理整個(gè)執(zhí)行過(guò)程的就是ScheduledThreadPoolExecutor的父類(lèi) ThreadPoolExecutor的runWorker方法。其中,該方法會(huì)從隊(duì)列中取出數(shù)據(jù),也就是調(diào)用隊(duì)列的 take 方法。
關(guān)于DelayedWorkQueue 的take方法,其中有個(gè) leader 變量,如果 leader 不是空,說(shuō)明已經(jīng)有線程在等待了,那就阻塞當(dāng)前線程,如果是空,說(shuō)明,隊(duì)列的第一個(gè)元素已經(jīng)被更新了,就設(shè)置當(dāng)前線程為 leader.
這是一個(gè) Leader-Follower 模式,Doug Lea 說(shuō)的。
當(dāng)然,take方法整體的邏輯還是不變的。從隊(duì)列的頭部拿數(shù)據(jù)。使用 Condition 做線程之間的協(xié)調(diào)。
5. 總結(jié)
關(guān)于 ScheduledThreadPoolExecutor 調(diào)度類(lèi),我們分析的差不多了,總結(jié)一下。
ScheduledThreadPoolExecutor 是個(gè)定時(shí)任務(wù)線程池,類(lèi)似 Timer,但是比 Timer強(qiáng)大,健壯。
比如不會(huì)像 Timer 那樣,任務(wù)異常了,整個(gè)調(diào)度系統(tǒng)就徹底無(wú)用了。
也比Timer 多了Rate 模式(Rate 和 Delay)。
這兩種模式的區(qū)別就是任務(wù)執(zhí)行的起點(diǎn)時(shí)間不同,Rate是從上一個(gè)任務(wù)的開(kāi)始執(zhí)行時(shí)間開(kāi)始計(jì)算;Delay 是從上一個(gè)任務(wù)的結(jié)束時(shí)間開(kāi)始計(jì)算。
因此,如果任務(wù)本身的時(shí)間超過(guò)了間隔時(shí)間,那么這兩種模式的間隔時(shí)間將會(huì)不一致。
而任務(wù)的排序是通過(guò) ScheduledFutureTask的 compareTo方法排序的,規(guī)則是先比較執(zhí)行時(shí)間,如果時(shí)間相同,再比較加入時(shí)間。
還要注意一點(diǎn)就是:如果任務(wù)執(zhí)行過(guò)程中異常了,那么將不會(huì)再次重復(fù)執(zhí)行。因?yàn)?ScheduledFutureTask的 run方法沒(méi)有做catch處理。所以程序員需要手動(dòng)處理,相對(duì)于 Timer 異常就直接費(fèi)了調(diào)度系統(tǒng)來(lái)說(shuō),要好很多。
ScheduledThreadPoolExecutor 的是實(shí)現(xiàn)基于 ThreadPoolExecutor,大部分功能都是重用的父類(lèi),只是自己在執(zhí)行完畢之后,重新設(shè)置時(shí)間,并再次將任務(wù)還到了隊(duì)列中,形成了定時(shí)任務(wù)。