scheduledthreadpool是JDK自帶的一個(gè)定時(shí)調(diào)度任務(wù)的實(shí)現(xiàn),通過(guò)它可以實(shí)現(xiàn)定時(shí)的循環(huán)調(diào)度,最近在看線程池的源碼,順便也把它看了一下,發(fā)現(xiàn)里面的實(shí)現(xiàn)真的是很精彩,干貨很多。
首先,scheduledthreadpool是繼承自ThreadPoolExecutor,也就是說(shuō)它具有線程池的一些特性,它也正是利用線程池實(shí)現(xiàn)了任務(wù)的調(diào)度。
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given core pool size.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @throws IllegalArgumentException if {@code corePoolSize < 0}
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
可以發(fā)現(xiàn)ScheduledThreadPoolExecutor只是用了核心線程池,同時(shí)它的任務(wù)隊(duì)列是采用了DelayedWorkQueue去實(shí)現(xiàn)。對(duì)于ThreadPoolExecutor不熟悉的,可以翻翻我之前的筆記,有對(duì)線程池很詳細(xì)的解釋。
接下來(lái)看任務(wù)提交的代碼,以定時(shí)循環(huán)調(diào)度為例:
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
command表示要提交的線程。
initialDelay表示初始化時(shí)的延遲。
period表示調(diào)度周期。
unit為時(shí)間單位。
首先將command等參數(shù)包裝成ScheduledFutureTask類,這個(gè)類是任務(wù)調(diào)度的基本單位。
triggerTime方法主要是用來(lái)計(jì)算下次被調(diào)度的時(shí)間:
/**
* Returns the trigger time of a delayed action.
*/
private long triggerTime(long delay, TimeUnit unit) {
return triggerTime(unit.toNanos((delay < 0) ? 0 : delay));
}
/**
* Returns the trigger time of a delayed action.
*/
long triggerTime(long delay) {
return now() +
((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}
我們一會(huì)兒在回頭看ScheduledFutureTask,先看任務(wù)的提交delayedExecute(t):
/**
* Main execution method for delayed or periodic tasks. If pool
* is shut down, rejects the task. Otherwise adds task to queue
* and starts a thread, if necessary, to run it. (We cannot
* prestart the thread to run the task because the task (probably)
* shouldn't be run yet,) If the pool is shut down while the task
* is being added, cancel and remove it if required by state and
* run-after-shutdown parameters.
*
* @param task the task
*/
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown())
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
這里主要做了一個(gè)任務(wù)入隊(duì)的操作, super.getQueue().add(task);這個(gè)Queue就是我們剛才看到的DelayedWorkQueue,接下來(lái)ensurePrestart()是判斷當(dāng)前核心線程池的大小,如果過(guò)少,那么增加核心線程。保證任務(wù)及時(shí)運(yùn)行。
程序很短,但是我們并沒(méi)有看到任務(wù)是怎么跑起來(lái),怎么被調(diào)度的。那這里其實(shí)有個(gè)前提的知識(shí)就是,當(dāng)任務(wù)添加到Queue之中后,那么線程池中的線程會(huì)不斷的去Queue中獲取任務(wù),那么我們看一下Queue的offer方法和take方法,是怎么放入、怎么取出的:
public boolean offer(Runnable x) {
if (x == null)
throw new NullPointerException();
RunnableScheduledFuture e = (RunnableScheduledFuture)x;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int i = size;
if (i >= queue.length)
grow();//擴(kuò)容
size = i + 1;
if (i == 0) {//如果是第一個(gè)元素
queue[0] = e;
setIndex(e, 0);
} else {//如果不是,那么進(jìn)行上濾操作,保證堆有序
siftUp(i, e);
}
//如果當(dāng)前新增加的元素在堆頂,那么可能是最新要執(zhí)行的
if (queue[0] == e) {
leader = null;
available.signal();//發(fā)一個(gè)信號(hào),通知take的時(shí)候的線程,趕緊檢測(cè)新加入的task
}
} finally {
lock.unlock();
}
return true;
}
這是DelayedWorkQueue的offer方法,DelayedWorkQueue里面是使用數(shù)組去維護(hù)任務(wù)隊(duì)列的,那么數(shù)組是怎么保證任務(wù)有序呢?
其實(shí)仔細(xì)看代碼,我們能發(fā)現(xiàn),這里的實(shí)現(xiàn)是用一個(gè)二叉堆去對(duì)數(shù)組元素進(jìn)行排序。確切的說(shuō)是小頂堆。
首先判斷容量,如果容量不夠就擴(kuò)容,接著判斷是不是第一個(gè)元素,如果是,那么直接放在index為0的位置,不是的話進(jìn)行上濾操作。接下來(lái)判斷添加的元素是不是在堆頂,如果是那么需要進(jìn)行優(yōu)先調(diào)度,那么進(jìn)行signal。
其實(shí)這里又引申出一個(gè)問(wèn)題,那么就是是靠什么排序的?這個(gè)時(shí)候我們看一下任務(wù)的實(shí)體ScheduledFutureTask,它復(fù)寫了compareTo方法:
//比較方法
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
//根據(jù)time去比較,time是在創(chuàng)建任務(wù)的時(shí)候計(jì)算出來(lái)的,指下一次運(yùn)行的時(shí)間
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
里面用time去判斷大小,time便是下一次調(diào)度的時(shí)間點(diǎn),那么顯然越小的離現(xiàn)在越近,越要放在前面。
看了offer方法我們?cè)倏纯磘ake方法,這個(gè)方法是用來(lái)獲取任務(wù)的:
public RunnableScheduledFuture take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//阻塞
//隊(duì)列的存儲(chǔ)采用數(shù)組,優(yōu)先級(jí)排序采用二叉堆實(shí)現(xiàn)。
try {
for (;;) {
//最大的一個(gè)是最先應(yīng)該被執(zhí)行的
RunnableScheduledFuture first = queue[0];
if (first == null)
available.await();
else {
//獲取第一個(gè)任務(wù),是距離當(dāng)前最近的任務(wù),可能會(huì)有一點(diǎn)延遲
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay <= 0)//要立即執(zhí)行
return finishPoll(first);
else if (leader != null)
available.await();//拿不到leader的線程全部await
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);//如果此時(shí)線程喚醒了,那么其他線程將不能進(jìn)入同步塊
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();//喚醒所有的await線程
lock.unlock();
}
}
毫無(wú)疑問(wèn),take中直接獲取queue[0],它是距離目前最近的要被執(zhí)行的任務(wù),先檢測(cè)一下還有多長(zhǎng)時(shí)間,任務(wù)會(huì)被執(zhí)行,如果小于0,那么立刻彈出,并且做一個(gè)下濾操作,重新找出堆頂元素。如果不小于0,那么證明時(shí)間還沒(méi)到,那么available.awaitNanos(delay);等到delay時(shí)間后自動(dòng)喚醒,或者因?yàn)樘砑恿艘粋€(gè)更加緊急的任務(wù)即offer中的signal被調(diào)用了,那么喚醒,重新循環(huán)獲取最優(yōu)先執(zhí)行的任務(wù),如果delay小于0,那么直接彈出任務(wù)。
至此任務(wù)調(diào)度的邏輯分析完了,但是還有周期執(zhí)行是怎么實(shí)現(xiàn)的呢?其實(shí)是在ScheduledFutureTask的run中實(shí)現(xiàn)的:
/**
* Overrides FutureTask version so as to reset/requeue if periodic.
*/
public void run() {
boolean periodic = isPeriodic();//判斷是不是定時(shí)周期調(diào)度的
if (!canRunInCurrentRunState(periodic))
cancel(false);
else if (!periodic)
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) {//設(shè)置當(dāng)前狀態(tài)是可重復(fù)執(zhí)行的
setNextRunTime();//計(jì)算下一次執(zhí)行時(shí)期
reExecutePeriodic(outerTask);//加入隊(duì)列
}
}
判斷是不是周期調(diào)度的任務(wù),如果是等待執(zhí)行完畢之后,重新設(shè)置下一次執(zhí)行時(shí)間,并且將此任務(wù)重新offer到queue中,這樣就實(shí)現(xiàn)了周期調(diào)度。
問(wèn)題:
其實(shí)這里是有一個(gè)問(wèn)題的,就是如果當(dāng)核心線程池比較少,但是執(zhí)行的任務(wù)又有很多阻塞性的任務(wù),那么就會(huì)導(dǎo)致在任務(wù)到期改執(zhí)行的時(shí)候,而沒(méi)有線程去執(zhí)行任務(wù)。這樣就會(huì)導(dǎo)致任務(wù)調(diào)度時(shí)間不準(zhǔn),同時(shí)后面的任務(wù)也可能被影響,所以在設(shè)置的時(shí)候可以將自己的核心線程池調(diào)大一點(diǎn),避免這種問(wèn)題。
整體的流程可以參考下面這個(gè)圖,描述的十分清楚:
