博客鏈接:http://www.ideabuffer.cn/2017/04/14/深入理解Java線程池:ScheduledThreadPoolExecutor/
介紹
自JDK1.5開始,JDK提供了ScheduledThreadPoolExecutor類來支持周期性任務的調(diào)度。在這之前的實現(xiàn)需要依靠Timer和TimerTask或者其它第三方工具來完成。但Timer有不少的缺陷:
- Timer是單線程模式;
- 如果在執(zhí)行任務期間某個TimerTask耗時較久,那么就會影響其它任務的調(diào)度;
- Timer的任務調(diào)度是基于絕對時間的,對系統(tǒng)時間敏感;
- Timer不會捕獲執(zhí)行TimerTask時所拋出的異常,由于Timer是單線程,所以一旦出現(xiàn)異常,則線程就會終止,其他任務也得不到執(zhí)行。
ScheduledThreadPoolExecutor繼承ThreadPoolExecutor來重用線程池的功能,它的實現(xiàn)方式如下:
- 將任務封裝成ScheduledFutureTask對象,ScheduledFutureTask基于相對時間,不受系統(tǒng)時間的改變所影響;
- ScheduledFutureTask實現(xiàn)了
java.lang.Comparable接口和java.util.concurrent.Delayed接口,所以有兩個重要的方法:compareTo和getDelay。compareTo方法用于比較任務之間的優(yōu)先級關系,如果距離下次執(zhí)行的時間間隔較短,則優(yōu)先級高;getDelay方法用于返回距離下次任務執(zhí)行時間的時間間隔; - ScheduledThreadPoolExecutor定義了一個DelayedWorkQueue,它是一個有序隊列,會通過每個任務按照距離下次執(zhí)行時間間隔的大小來排序;
- ScheduledFutureTask繼承自FutureTask,可以通過返回Future對象來獲取執(zhí)行的結(jié)果。
通過如上的介紹,可以對比一下Timer和ScheduledThreadPoolExecutor:
| Timer | ScheduledThreadPoolExecutor |
|---|---|
| 單線程 | 多線程 |
| 單個任務執(zhí)行時間影響其他任務調(diào)度 | 多線程,不會影響 |
| 基于絕對時間 | 基于相對時間 |
| 一旦執(zhí)行任務出現(xiàn)異常不會捕獲,其他任務得不到執(zhí)行 | 多線程,單個任務的執(zhí)行不會影響其他線程 |
所以,在JDK1.5之后,應該沒什么理由繼續(xù)使用Timer進行任務調(diào)度了。
ScheduledThreadPoolExecutor的使用
下面用一個具體的例子來說明ScheduledThreadPoolExecutor的使用:
public class ScheduledThreadPoolTest {
public static void main(String[] args) throws InterruptedException {
// 創(chuàng)建大小為5的線程池
ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
for (int i = 0; i < 3; i++) {
Task worker = new Task("task-" + i);
// 只執(zhí)行一次
// scheduledThreadPool.schedule(worker, 5, TimeUnit.SECONDS);
// 周期性執(zhí)行,每5秒執(zhí)行一次
scheduledThreadPool.scheduleAtFixedRate(worker, 0,5, TimeUnit.SECONDS);
}
Thread.sleep(10000);
System.out.println("Shutting down executor...");
// 關閉線程池
scheduledThreadPool.shutdown();
boolean isDone;
// 等待線程池終止
do {
isDone = scheduledThreadPool.awaitTermination(1, TimeUnit.DAYS);
System.out.println("awaitTermination...");
} while(!isDone);
System.out.println("Finished all threads");
}
}
class Task implements Runnable {
private String name;
public Task(String name) {
this.name = name;
}
@Override
public void run() {
System.out.println("name = " + name + ", startTime = " + new Date());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("name = " + name + ", endTime = " + new Date());
}
}
下面就來具體分析一下ScheduledThreadPoolExecutor的實現(xiàn)過程。
ScheduledThreadPoolExecutor的實現(xiàn)
ScheduledThreadPoolExecutor的類結(jié)構(gòu)
看下ScheduledThreadPoolExecutor內(nèi)部的類圖:

不要被這么多類嚇到,這里只不過是為了更清楚的了解ScheduledThreadPoolExecutor有關調(diào)度和隊列的接口。
ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,實現(xiàn)了ScheduledExecutorService接口,該接口定義了schedule等任務調(diào)度的方法。
同時ScheduledThreadPoolExecutor有兩個重要的內(nèi)部類:DelayedWorkQueue和ScheduledFutureTask??梢钥吹?,DelayeddWorkQueue是一個阻塞隊列,而ScheduledFutureTask繼承自FutureTask,并且實現(xiàn)了Delayed接口。有關FutureTask的介紹請參考另一篇文章:FutureTask源碼解析。
ScheduledThreadPoolExecutor的構(gòu)造方法
ScheduledThreadPoolExecutor有3中構(gòu)造方法:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), handler);
}
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
因為ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以這里都是調(diào)用的ThreadPoolExecutor類的構(gòu)造方法。有關ThreadPoolExecutor可以參考深入理解Java線程池:ThreadPoolExecutor。
這里注意傳入的阻塞隊列是DelayedWorkQueue類型的對象。后面會詳細介紹。
schedule方法
在上文的例子中,使用了schedule方法來進行任務調(diào)度,schedule方法的代碼如下:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
首先,這里的兩個重載的schedule方法只是傳入的第一個參數(shù)不同,可以是Runnable對象或者Callable對象。會把傳入的任務封裝成一個RunnableScheduledFuture對象,其實也就是ScheduledFutureTask對象,decorateTask默認什么功能都沒有做,子類可以重寫該方法:
/**
* 修改或替換用于執(zhí)行 runnable 的任務。此方法可重寫用于管理內(nèi)部任務的具體類。默認實現(xiàn)只返回給定任務。
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Runnable runnable, RunnableScheduledFuture<V> task) {
return task;
}
/**
* 修改或替換用于執(zhí)行 callable 的任務。此方法可重寫用于管理內(nèi)部任務的具體類。默認實現(xiàn)只返回給定任務。
*/
protected <V> RunnableScheduledFuture<V> decorateTask(
Callable<V> callable, RunnableScheduledFuture<V> task) {
return task;
}
然后,通過調(diào)用delayedExecute方法來延時執(zhí)行任務。
最后,返回一個ScheduledFuture對象。
scheduleAtFixedRate方法
該方法設置了執(zhí)行周期,下一次執(zhí)行時間相當于是上一次的執(zhí)行時間加上period,它是采用已固定的頻率來執(zhí)行任務:
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
scheduleWithFixedDelay方法
該方法設置了執(zhí)行周期,與scheduleAtFixedRate方法不同的是,下一次執(zhí)行時間是上一次任務執(zhí)行完的系統(tǒng)時間加上period,因而具體執(zhí)行時間不是固定的,但周期是固定的,是采用相對固定的延遲來執(zhí)行任務:
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
注意這里的unit.toNanos(-delay));,這里把周期設置為負數(shù)來表示是相對固定的延遲執(zhí)行。
scheduleAtFixedRate和scheduleWithFixedDelay的區(qū)別在setNextRunTime方法中就可以看出來:
private void setNextRunTime() {
long p = period;
// 固定頻率,上次執(zhí)行時間加上周期時間
if (p > 0)
time += p;
// 相對固定延遲執(zhí)行,使用當前系統(tǒng)時間加上周期時間
else
time = triggerTime(-p);
}
setNextRunTime方法會在run方法中執(zhí)行完任務后調(diào)用。
triggerTime方法
triggerTime方法用于獲取下一次執(zhí)行的具體時間:
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
這里的delay < (Long.MAX_VALUE >> 1是為了判斷是否要防止Long類型溢出,如果delay的值小于Long類型最大值的一半,則直接返回delay,否則需要進行防止溢出處理。
overflowFree方法
該方法的作用是限制隊列中所有節(jié)點的延遲時間在Long.MAX_VALUE之內(nèi),防止在compareTo方法中溢出。
private long overflowFree(long delay) {
// 獲取隊列中的第一個節(jié)點
Delayed head = (Delayed) super.getQueue().peek();
if (head != null) {
// 獲取延遲時間
long headDelay = head.getDelay(NANOSECONDS);
// 如果延遲時間小于0,并且 delay - headDelay 超過了Long.MAX_VALUE
// 將delay設置為 Long.MAX_VALUE + headDelay 保證delay小于Long.MAX_VALUE
if (headDelay < 0 && (delay - headDelay < 0))
delay = Long.MAX_VALUE + headDelay;
}
return delay;
}
當一個任務已經(jīng)可以執(zhí)行出隊操作,但還沒有執(zhí)行,可能由于線程池中的工作線程不是空閑的。具體分析一下這種情況:
- 為了方便說明,假設Long.MAX_VALUE=1023,也就是11位,并且當前的時間是100,調(diào)用triggerTime時并沒有對delay進行判斷,而是直接返回了
now() + delay,也就是相當于100 + 1023,這肯定是溢出了,那么返回的時間是-925; - 如果頭節(jié)點已經(jīng)可以出隊但是還沒有執(zhí)行出隊,那么頭節(jié)點的執(zhí)行時間應該是小于當前時間的,假設是95;
- 這時調(diào)用offer方法向隊列中添加任務,在offer方法中會調(diào)用siftUp方法來排序,在siftUp方法執(zhí)行時又會調(diào)用ScheduledFutureTask中的compareTo方法來比較執(zhí)行時間;
- 這時如果執(zhí)行到了compareTo方法中的
long diff = time - x.time;時,那么計算后的結(jié)果就是-925 - 95 = -1020,那么將返回-1,而正常情況應該是返回1,因為新加入的任務的執(zhí)行時間要比頭結(jié)點的執(zhí)行時間要晚,這就不是我們想要的結(jié)果了,這會導致隊列中的順序不正確。 - 同理也可以算一下在執(zhí)行compareTo方法中的
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);時也會有這種情況; - 所以在triggerTime方法中對delay的大小做了判斷,就是為了防止這種情況發(fā)生。
如果執(zhí)行了overflowFree方法呢,這時headDelay = 95 - 100 = -5,然后執(zhí)行delay = 1023 + (-5) = 1018,那么triggerTime會返回100 + 1018 = -930,再執(zhí)行compareTo方法中的long diff = time - x.time;時,diff = -930 - 95 = -930 - 100 + 5 = 1018 + 5 = 1023,沒有溢出,符合正常的預期。
所以,overflowFree方法中把已經(jīng)超時的部分時間給減去,就是為了避免在compareTo方法中出現(xiàn)溢出情況。
(說實話,這段代碼看的很痛苦,一般情況下也不會發(fā)生這種情況,誰會傳一個Long.MAX_VALUE呢。要知道Long.MAX_VALUE的納秒數(shù)換算成年的話是292年,誰會這么無聊。。。)
ScheduledFutureTask的getDelay方法
public long getDelay(TimeUnit unit) {
// 執(zhí)行時間減去當前系統(tǒng)時間
return unit.convert(time - now(), NANOSECONDS);
}
ScheduledFutureTask的構(gòu)造方法
ScheduledFutureTask繼承自FutureTask并實現(xiàn)了RunnableScheduledFuture接口,具體可以參考上文的類圖,構(gòu)造方法如下:
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* Creates a periodic action with given nano time and period.
*/
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
/**
* Creates a one-shot action with given nanoTime-based trigger time.
*/
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
這里面有幾個重要的屬性,下面來解釋一下:
- time:下次任務執(zhí)行時的時間;
- period:執(zhí)行周期;
- sequenceNumber:保存任務被添加到ScheduledThreadPoolExecutor中的序號。
在schedule方法中,創(chuàng)建完ScheduledFutureTask對象之后,會執(zhí)行delayedExecute方法來執(zhí)行任務。
delayedExecute方法
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果線程池已經(jīng)關閉,使用拒絕策略拒絕任務
if (isShutdown())
reject(task);
else {
// 添加到阻塞隊列中
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 確保線程池中至少有一個線程啟動,即使corePoolSize為0
// 該方法在ThreadPoolExecutor中實現(xiàn)
ensurePrestart();
}
}
說一下這里的第二個if判斷:
- 如果不是SHUTDOWN狀態(tài),執(zhí)行else,否則執(zhí)行步驟2;
- 如果在當前線程池運行狀態(tài)下可以執(zhí)行任務,執(zhí)行else,否則執(zhí)行步驟3;
- 從阻塞隊列中刪除任務,如果失敗,執(zhí)行else,否則執(zhí)行步驟4;
- 取消任務,但不中斷執(zhí)行中的任務。
對于步驟2,可以通過setContinueExistingPeriodicTasksAfterShutdownPolicy方法設置在線程池關閉時,周期任務繼續(xù)執(zhí)行,默認為false,也就是線程池關閉時,不再執(zhí)行周期任務。
ensurePrestart方法在ThreadPoolExecutor中定義:
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
調(diào)用了addWorker方法,可以在深入理解Java線程池:ThreadPoolExecutor中查看addWorker方法的介紹,線程池中的工作線程是通過該方法來啟動并執(zhí)行任務的。
ScheduledFutureTask的run方法
回顧一下線程池的執(zhí)行過程:當線程池中的工作線程啟動時,不斷地從阻塞隊列中取出任務并執(zhí)行,當然,取出的任務實現(xiàn)了Runnable接口,所以是通過調(diào)用任務的run方法來執(zhí)行任務的。
這里的任務類型是ScheduledFutureTask,所以下面看一下ScheduledFutureTask的run方法:
public void run() {
// 是否是周期性任務
boolean periodic = isPeriodic();
// 當前線程池運行狀態(tài)下如果不可以執(zhí)行任務,取消該任務
if (!canRunInCurrentRunState(periodic))
cancel(false);
// 如果不是周期性任務,調(diào)用FutureTask中的run方法執(zhí)行
else if (!periodic)
ScheduledFutureTask.super.run();
// 如果是周期性任務,調(diào)用FutureTask中的runAndReset方法執(zhí)行
// runAndReset方法不會設置執(zhí)行結(jié)果,所以可以重復執(zhí)行任務
else if (ScheduledFutureTask.super.runAndReset()) {
// 計算下次執(zhí)行該任務的時間
setNextRunTime();
// 重復執(zhí)行任務
reExecutePeriodic(outerTask);
}
}
有關FutureTask的run方法和runAndReset方法,可以參考FutureTask源碼解析。
分析一下執(zhí)行過程:
- 如果當前線程池運行狀態(tài)不可以執(zhí)行任務,取消該任務,然后直接返回,否則執(zhí)行步驟2;
- 如果不是周期性任務,調(diào)用FutureTask中的run方法執(zhí)行,會設置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟3;
- 如果是周期性任務,調(diào)用FutureTask中的runAndReset方法執(zhí)行,不會設置執(zhí)行結(jié)果,然后直接返回,否則執(zhí)行步驟4和步驟5;
- 計算下次執(zhí)行該任務的具體時間;
- 重復執(zhí)行任務。
ScheduledFutureTask的reExecutePeriodic方法
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
該方法和delayedExecute方法類似,不同的是:
- 由于調(diào)用reExecutePeriodic方法時已經(jīng)執(zhí)行過一次周期性任務了,所以不會reject當前任務;
- 傳入的任務一定是周期性任務。
onShutdown方法
onShutdown方法是ThreadPoolExecutor中的鉤子方法,在ThreadPoolExecutor中什么都沒有做,參考深入理解Java線程池:ThreadPoolExecutor,該方法是在執(zhí)行shutdown方法時被調(diào)用:
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
// 獲取在線程池已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有延遲任務
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy();
// 獲取在線程池已 shutdown 的情況下是否繼續(xù)執(zhí)行現(xiàn)有定期任務
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy();
// 如果在線程池已 shutdown 的情況下不繼續(xù)執(zhí)行延遲任務和定期任務
// 則依次取消任務,否則則根據(jù)取消狀態(tài)來判斷
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// Traverse snapshot to avoid iterator exceptions
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
// 如果有在 shutdown 后不繼續(xù)的延遲任務或周期任務,則從隊列中刪除并取消任務
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t))
t.cancel(false);
}
}
}
}
tryTerminate();
}
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己實現(xiàn)阻塞的工作隊列,是因為ScheduledThreadPoolExecutor要求的工作隊列有些特殊。
DelayedWorkQueue是一個基于堆的數(shù)據(jù)結(jié)構(gòu),類似于DelayQueue和PriorityQueue。在執(zhí)行定時任務的時候,每個任務的執(zhí)行時間都不同,所以DelayedWorkQueue的工作就是按照執(zhí)行時間的升序來排列,執(zhí)行時間距離當前時間越近的任務在隊列的前面(注意:這里的順序并不是絕對的,堆中的排序只保證了子節(jié)點的下次執(zhí)行時間要比父節(jié)點的下次執(zhí)行時間要大,而葉子節(jié)點之間并不一定是順序的,下文中會說明)。
堆結(jié)構(gòu)如下圖所示:

可見,DelayedWorkQueue是一個基于最小堆結(jié)構(gòu)的隊列。堆結(jié)構(gòu)可以使用數(shù)組表示,可以轉(zhuǎn)換成如下的數(shù)組:

在這種結(jié)構(gòu)中,可以發(fā)現(xiàn)有如下特性:
假設,索引值從0開始,子節(jié)點的索引值為k,父節(jié)點的索引值為p,則:
- 一個節(jié)點的左子節(jié)點的索引為:k = p * 2 + 1;
- 一個節(jié)點的右子節(jié)點的索引為:k = (p + 1) * 2;
- 一個節(jié)點的父節(jié)點的索引為:p = (k - 1) / 2。
為什么要使用DelayedWorkQueue呢?
定時任務執(zhí)行時需要取出最近要執(zhí)行的任務,所以任務在隊列中每次出隊時一定要是當前隊列中執(zhí)行時間最靠前的,所以自然要使用優(yōu)先級隊列。
DelayedWorkQueue是一個優(yōu)先級隊列,它可以保證每次出隊的任務都是當前隊列中執(zhí)行時間最靠前的,由于它是基于堆結(jié)構(gòu)的隊列,堆結(jié)構(gòu)在執(zhí)行插入和刪除操作時的最壞時間復雜度是 O(logN)。
DelayedWorkQueue的屬性
// 隊列初始容量
private static final int INITIAL_CAPACITY = 16;
// 根據(jù)初始容量創(chuàng)建RunnableScheduledFuture類型的數(shù)組
private RunnableScheduledFuture<?>[] queue =
new RunnableScheduledFuture<?>[INITIAL_CAPACITY];
private final ReentrantLock lock = new ReentrantLock();
private int size = 0;
// leader線程
private Thread leader = null;
// 當較新的任務在隊列的頭部可用時,或者新線程可能需要成為leader,則通過該條件發(fā)出信號
private final Condition available = lock.newCondition();
注意這里的leader,它是Leader-Follower模式的變體,用于減少不必要的定時等待。什么意思呢?對于多線程的網(wǎng)絡模型來說:
所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態(tài):proccesser。它的基本原則就是,永遠最多只有一個leader。而所有follower都在等待成為leader。線程池啟動時會自動產(chǎn)生一個Leader負責等待網(wǎng)絡IO事件,當有一個事件產(chǎn)生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個網(wǎng)絡事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強CPU高速緩存相似性,及消除動態(tài)內(nèi)存分配和線程間的數(shù)據(jù)交換。
參考自:http://blog.csdn.net/goldlevi/article/details/7705180
具體leader的作用在分析take方法時再詳細介紹。
offer方法
既然是阻塞隊列,入隊的操作如add和put方法都調(diào)用了offer方法,下面查看一下offer方法:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture<?> e = (RunnableScheduledFuture<?>)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
// queue是一個RunnableScheduledFuture類型的數(shù)組,如果容量不夠需要擴容
if (i >= queue.length)
grow();
size = i + 1;
// i == 0 說明堆中還沒有數(shù)據(jù)
if (i == 0) {
queue[0] = e;
setIndex(e, 0);
} else {
// i != 0 時,需要對堆進行重新排序
siftUp(i, e);
}
// 如果傳入的任務已經(jīng)是隊列的第一個節(jié)點了,這時available需要發(fā)出信號
if (queue[0] == e) {
// leader設置為null為了使在take方法中的線程在通過available.signal();后會執(zhí)行available.awaitNanos(delay);
leader = null;
available.signal();
}
} finally {
lock.unlock();
}
return true;
}
有關Condition的介紹請參考深入理解AbstractQueuedSynchronizer(三)
這里的重點是siftUp方法。
siftUp方法
private void siftUp(int k, RunnableScheduledFuture<?> key) {
while (k > 0) {
// 找到父節(jié)點的索引
int parent = (k - 1) >>> 1;
// 獲取父節(jié)點
RunnableScheduledFuture<?> e = queue[parent];
// 如果key節(jié)點的執(zhí)行時間大于父節(jié)點的執(zhí)行時間,不需要再排序了
if (key.compareTo(e) >= 0)
break;
// 如果key.compareTo(e) < 0,說明key節(jié)點的執(zhí)行時間小于父節(jié)點的執(zhí)行時間,需要把父節(jié)點移到后面
queue[k] = e;
// 設置索引為k
setIndex(e, k);
k = parent;
}
// key設置為排序后的位置中
queue[k] = key;
setIndex(key, k);
}
代碼很好理解,就是循環(huán)的根據(jù)key節(jié)點與它的父節(jié)點來判斷,如果key節(jié)點的執(zhí)行時間小于父節(jié)點,則將兩個節(jié)點交換,使執(zhí)行時間靠前的節(jié)點排列在隊列的前面。
假設新入隊的節(jié)點的延遲時間(調(diào)用getDelay()方法獲得)是5,執(zhí)行過程如下:
-
先將新的節(jié)點添加到數(shù)組的尾部,這時新節(jié)點的索引k為7:
-
計算新父節(jié)點的索引:parent = (k - 1) >>> 1,parent = 3,那么queue[3]的時間間隔值為8,因為 5 < 8 ,將執(zhí)行queue[7] = queue[3]::
-
這時將k設置為3,繼續(xù)循環(huán),再次計算parent為1,queue[1]的時間間隔為3,因為 5 > 3 ,這時退出循環(huán),最終k為3:
可見,每次新增節(jié)點時,只是根據(jù)父節(jié)點來判斷,而不會影響兄弟節(jié)點。
另外,setIndex方法只是設置了ScheduledFutureTask中的heapIndex屬性:
private void setIndex(RunnableScheduledFuture<?> f, int idx) {
if (f instanceof ScheduledFutureTask)
((ScheduledFutureTask)f).heapIndex = idx;
}
take方法
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null)
available.await();
else {
// 計算當前時間到執(zhí)行時間的時間間隔
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// leader不為空,阻塞線程
if (leader != null)
available.await();
else {
// leader為空,則把leader設置為當前線程,
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 阻塞到執(zhí)行時間
available.awaitNanos(delay);
} finally {
// 設置leader = null,讓其他線程執(zhí)行available.awaitNanos(delay);
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// 如果leader不為空,則說明leader的線程正在執(zhí)行available.awaitNanos(delay);
// 如果queue[0] == null,說明隊列為空
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
take方法是什么時候調(diào)用的呢?在深入理解Java線程池:ThreadPoolExecutor中,介紹了getTask方法,工作線程會循環(huán)地從workQueue中取任務。但定時任務卻不同,因為如果一旦getTask方法取出了任務就開始執(zhí)行了,而這時可能還沒有到執(zhí)行的時間,所以在take方法中,要保證只有在到指定的執(zhí)行時間的時候任務才可以被取走。
再來說一下leader的作用,這里的leader是為了減少不必要的定時等待,當一個線程成為leader時,它只等待下一個節(jié)點的時間間隔,但其它線程無限期等待。 leader線程必須在從take()或poll()返回之前signal其它線程,除非其他線程成為了leader。
舉例來說,如果沒有l(wèi)eader,那么在執(zhí)行take時,都要執(zhí)行available.awaitNanos(delay),假設當前線程執(zhí)行了該段代碼,這時還沒有signal,第二個線程也執(zhí)行了該段代碼,則第二個線程也要被阻塞。多個這時執(zhí)行該段代碼是沒有作用的,因為只能有一個線程會從take中返回queue[0](因為有l(wèi)ock),其他線程這時再返回for循環(huán)執(zhí)行時取的queue[0],已經(jīng)不是之前的queue[0]了,然后又要繼續(xù)阻塞。
所以,為了不讓多個線程頻繁的做無用的定時等待,這里增加了leader,如果leader不為空,則說明隊列中第一個節(jié)點已經(jīng)在等待出隊,這時其它的線程會一直阻塞,減少了無用的阻塞(注意,在finally中調(diào)用了signal()來喚醒一個線程,而不是signalAll())。
poll方法
下面看下poll方法,與take類似,但這里要提供超時功能:
public RunnableScheduledFuture<?> poll(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
if (first == null) {
if (nanos <= 0)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
long delay = first.getDelay(NANOSECONDS);
// 如果delay <= 0,說明已經(jīng)到了任務執(zhí)行的時間,返回。
if (delay <= 0)
return finishPoll(first);
// 如果nanos <= 0,說明已經(jīng)超時,返回null
if (nanos <= 0)
return null;
first = null; // don't retain ref while waiting
// nanos < delay 說明需要等待的時間小于任務要執(zhí)行的延遲時間
// leader != null 說明有其它線程正在對任務進行阻塞
// 這時阻塞當前線程nanos納秒
if (nanos < delay || leader != null)
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 這里的timeLeft表示delay減去實際的等待時間
long timeLeft = available.awaitNanos(delay);
// 計算剩余的等待時間
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
finishPoll方法
當調(diào)用了take或者poll方法能夠獲取到任務時,會調(diào)用該方法進行返回:
private RunnableScheduledFuture<?> finishPoll(RunnableScheduledFuture<?> f) {
// 數(shù)組長度-1
int s = --size;
// 取出最后一個節(jié)點
RunnableScheduledFuture<?> x = queue[s];
queue[s] = null;
// 長度不為0,則從第一個元素開始排序,目的是要把最后一個節(jié)點放到合適的位置上
if (s != 0)
siftDown(0, x);
setIndex(f, -1);
return f;
}
siftDown方法
siftDown方法使堆從k開始向下調(diào)整:
private void siftDown(int k, RunnableScheduledFuture<?> key) {
// 根據(jù)二叉樹的特性,數(shù)組長度除以2,表示取有子節(jié)點的索引
int half = size >>> 1;
// 判斷索引為k的節(jié)點是否有子節(jié)點
while (k < half) {
// 左子節(jié)點的索引
int child = (k << 1) + 1;
RunnableScheduledFuture<?> c = queue[child];
// 右子節(jié)點的索引
int right = child + 1;
// 如果有右子節(jié)點并且左子節(jié)點的時間間隔大于右子節(jié)點,取時間間隔最小的節(jié)點
if (right < size && c.compareTo(queue[right]) > 0)
c = queue[child = right];
// 如果key的時間間隔小于等于c的時間間隔,跳出循環(huán)
if (key.compareTo(c) <= 0)
break;
// 設置要移除索引的節(jié)點為其子節(jié)點
queue[k] = c;
setIndex(c, k);
k = child;
}
// 將key放入索引為k的位置
queue[k] = key;
setIndex(key, k);
}
siftDown方法執(zhí)行時包含兩種情況,一種是沒有子節(jié)點,一種是有子節(jié)點(根據(jù)half判斷)。例如:
沒有子節(jié)點的情況:
假設初始的堆如下:

假設 k = 3 ,那么 k = half ,沒有子節(jié)點,在執(zhí)行siftDown方法時直接把索引為3的節(jié)點設置為數(shù)組的最后一個節(jié)點:

有子節(jié)點的情況:
假設 k = 0 ,那么執(zhí)行以下步驟:
-
獲取左子節(jié)點,child = 1 ,獲取右子節(jié)點, right = 2 :
- 由于
right < size,這時比較左子節(jié)點和右子節(jié)點時間間隔的大小,這里 3 < 7 ,所以 c = queue[child] ; -
比較key的時間間隔是否小于c的時間間隔,這里不滿足,繼續(xù)執(zhí)行,把索引為k的節(jié)點設置為c,然后將k設置為child,;
-
因為 half = 3 ,k = 1 ,繼續(xù)執(zhí)行循環(huán),這時的索引變?yōu)椋?/p>
-
這時再經(jīng)過如上判斷后,將k的值為3,最終的結(jié)果如下:
-
最后,如果在finishPoll方法中調(diào)用的話,會把索引為0的節(jié)點的索引設置為-1,表示已經(jīng)刪除了該節(jié)點,并且size也減了1,最后的結(jié)果如下:
可見,siftdown方法在執(zhí)行完并不是有序的,但可以發(fā)現(xiàn),子節(jié)點的下次執(zhí)行時間一定比父節(jié)點的下次執(zhí)行時間要大,由于每次都會取左子節(jié)點和右子節(jié)點中下次執(zhí)行時間最小的節(jié)點,所以還是可以保證在take和poll時出隊是有序的。
remove方法
public boolean remove(Object x) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = indexOf(x);
if (i < 0)
return false;
setIndex(queue[i], -1);
int s = --size;
RunnableScheduledFuture<?> replacement = queue[s];
queue[s] = null;
if (s != i) {
// 從i開始向下調(diào)整
siftDown(i, replacement);
// 如果queue[i] == replacement,說明i是葉子節(jié)點
// 如果是這種情況,不能保證子節(jié)點的下次執(zhí)行時間比父節(jié)點的大
// 這時需要進行一次向上調(diào)整
if (queue[i] == replacement)
siftUp(i, replacement);
}
return true;
} finally {
lock.unlock();
}
}
假設初始的堆結(jié)構(gòu)如下:

這時要刪除8的節(jié)點,那么這時 k = 1,key為最后一個節(jié)點:

這時通過上文對siftDown方法的分析,siftDown方法執(zhí)行后的結(jié)果如下:

這時會發(fā)現(xiàn),最后一個節(jié)點的值比父節(jié)點還要小,所以這里要執(zhí)行一次siftUp方法來保證子節(jié)點的下次執(zhí)行時間要比父節(jié)點的大,所以最終結(jié)果如下:

總結(jié)
本文詳細分析了ScheduedThreadPoolExecutor的實現(xiàn),主要介紹了以下方面:
- 與Timer執(zhí)行定時任務的比較,相比Timer,ScheduedThreadPoolExecutor有什么優(yōu)點;
- ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor,所以它也是一個線程池,也有coorPoolSize和workQueue,ScheduledThreadPoolExecutor特殊的地方在于,自己實現(xiàn)了優(yōu)先工作隊列DelayedWorkQueue;
- ScheduedThreadPoolExecutor實現(xiàn)了ScheduledExecutorService,所以就有了任務調(diào)度的方法,如schedule,scheduleAtFixedRate和scheduleWithFixedDelay,同時注意他們之間的區(qū)別;
- 內(nèi)部類ScheduledFutureTask繼承自FutureTask,實現(xiàn)了任務的異步執(zhí)行并且可以獲取返回結(jié)果。同時也實現(xiàn)了Delayed接口,可以通過getDelay方法獲取將要執(zhí)行的時間間隔;
- 周期任務的執(zhí)行其實是調(diào)用了FutureTask類中的runAndReset方法,每次執(zhí)行完不設置結(jié)果和狀態(tài)。參考FutureTask源碼解析;
- 詳細分析了DelayedWorkQueue的數(shù)據(jù)結(jié)構(gòu),它是一個基于最小堆結(jié)構(gòu)的優(yōu)先隊列,并且每次出隊時能夠保證取出的任務是當前隊列中下次執(zhí)行時間最小的任務。同時注意一下優(yōu)先隊列中堆的順序,堆中的順序并不是絕對的,但要保證子節(jié)點的值要比父節(jié)點的值要大,這樣就不會影響出隊的順序。
總體來說,ScheduedThreadPoolExecutor的重點是要理解下次執(zhí)行時間的計算,以及優(yōu)先隊列的出隊、入隊和刪除的過程,這兩個是理解ScheduedThreadPoolExecutor的關鍵。







