背景
在實(shí)際的業(yè)務(wù)場景中,我們常常需要周期性執(zhí)行一些任務(wù),比如巡查系統(tǒng)資源,處理過期數(shù)據(jù)等等。這些事情如果人工去執(zhí)行的話,無疑是對人力資源的浪費(fèi)。因此我們就開發(fā)出了定時任務(wù)。目前業(yè)界已有許多出色的定時任務(wù)框架,如quartz,elastic-job,包括SpringBoot也提供了定時任務(wù),當(dāng)然JDK本身也提供了定時任務(wù)功能。
那么我們在用這些框架的時候,有沒有想過它們是怎么實(shí)現(xiàn)定時任務(wù)的呢?時間輪算法就是這樣一種實(shí)現(xiàn)定時任務(wù)的方法。
一、概述
時間輪算法是通過一個時間輪去維護(hù)定時任務(wù),按照一定的時間單位對時間輪進(jìn)行劃分刻度。然后根據(jù)任務(wù)的延時計算任務(wù)該落在時間輪的第幾個刻度,如果任務(wù)時長超出了時間輪的刻度數(shù)量,則增加一個參數(shù)記錄時間輪需要轉(zhuǎn)動的圈數(shù)。
時間輪每轉(zhuǎn)動一次就檢查當(dāng)前刻度下的任務(wù)圈數(shù)是否為0,如果為0說明時間到了就執(zhí)行任務(wù),否則就減少任務(wù)的圈數(shù)。這樣看起來已經(jīng)很好了,可以滿足基本的定時任務(wù)需求了,但是我們還能不能繼續(xù)優(yōu)化一下呢?答案是可以的。想想我們家里的水表,它是不是有多個輪子在轉(zhuǎn)動,時間輪是不是也可以改造成多級聯(lián)動呢?建立3個時間輪,月輪、周輪、日輪,月輪存儲每個月份需要執(zhí)行定時任務(wù),轉(zhuǎn)動時將當(dāng)月份的任務(wù)拋到周輪,周輪轉(zhuǎn)動時將當(dāng)天的任務(wù)拋到日輪中,日輪轉(zhuǎn)動時直接執(zhí)行當(dāng)前刻度下的定時任務(wù)。
1.1 絕對時間和相對時間
定時任務(wù)一般有兩種:
- 1、約定一段時間后執(zhí)行。
- 2、約定某個時間點(diǎn)執(zhí)行。
其實(shí)這兩者是可以互相轉(zhuǎn)換的,比如現(xiàn)在有一個定時任務(wù)是12點(diǎn)執(zhí)行,當(dāng)前時間是9點(diǎn),那就可以認(rèn)為這個任務(wù)是3小時后執(zhí)行。同樣,現(xiàn)在又有一個任務(wù),是3小時后執(zhí)行,那也可以認(rèn)為這個任務(wù)12點(diǎn)執(zhí)行。
假設(shè)我們現(xiàn)在有3個定時任務(wù)A、B、C,分別需要在3點(diǎn)、4點(diǎn)和9點(diǎn)執(zhí)行,我們把它們都轉(zhuǎn)換成絕對時間。

只需要把任務(wù)放到它需要被執(zhí)行的時刻,然后等到時針轉(zhuǎn)到相應(yīng)的位置時,取出該時刻放置的任務(wù),執(zhí)行就可以了。這就是時間輪算法的核心思想。
1.2 重復(fù)執(zhí)行
多數(shù)定時任務(wù)是需要重復(fù)執(zhí)行,比如每天上午9點(diǎn)執(zhí)行生成報表的任務(wù)。對于重復(fù)執(zhí)行的任務(wù),其實(shí)我們需要關(guān)心的只是下次執(zhí)行時間,并不關(guān)心這個任務(wù)需要循環(huán)多少次,還是那每天上午9點(diǎn)的這個任務(wù)來說。
- 1、比如現(xiàn)在是下午4點(diǎn)鐘,我把這個任務(wù)加入到時間輪,并設(shè)定當(dāng)時針轉(zhuǎn)到明天上午九點(diǎn)(該任務(wù)下次執(zhí)行的時間)時執(zhí)行。
- 2、時間來到了第二天上午九點(diǎn),時間輪也轉(zhuǎn)到了9點(diǎn)鐘的位置,發(fā)現(xiàn)該位置有一個生成報表的任務(wù),拿出來執(zhí)行。
- 3、同時時間輪發(fā)現(xiàn)這是一個循環(huán)執(zhí)行的任務(wù),于是把該任務(wù)重新放回到9點(diǎn)鐘的位置。
- 4、重復(fù)步驟2和步驟3。
如果哪一天這個任務(wù)不需要再執(zhí)行了,那么直接通知時間輪,找到這個任務(wù)的位置刪除掉就可以了。由上面的過程我們可以看到,時間輪至少需要提供4個功能:
- 1、加入任務(wù)
- 2、執(zhí)行任務(wù)
- 3、刪除任務(wù)
- 4、沿著時間刻度前進(jìn)
1.3 時間輪的數(shù)據(jù)結(jié)構(gòu)
時鐘可以使用數(shù)組來表示,那么時鐘的每一個刻度就是一個槽,槽用來存在該刻度需要被執(zhí)行的定時任務(wù)。正常業(yè)務(wù)中,同一時刻中是會存在多個定時任務(wù)的,所以每個槽中放一個鏈表或者隊列就可以了,執(zhí)行的時候遍歷一遍即可。

同一時刻存在多個任務(wù)時,只要把該刻度對應(yīng)的鏈表全部遍歷一遍,執(zhí)行(扔到線程池中異步執(zhí)行)其中的任務(wù)即可。
1.4 時間刻度不夠用
增加時間輪的刻度
現(xiàn)在有我有2個定時任務(wù),一個任務(wù)每周一上午9點(diǎn)執(zhí)行,另一個任務(wù)每周三上午9點(diǎn)執(zhí)行。最簡單的辦法就是增大時間輪的長度,可以從12個加到168 (一天24小時,一周就是168小時),那么下周一上午九點(diǎn)就是時間輪的第9個刻度,這周三上午九點(diǎn)就是時間輪的第57個刻度。

這樣做的缺點(diǎn):
- 1、時間刻度太多會導(dǎo)致時間輪走到的多數(shù)刻度沒有任務(wù)執(zhí)行,比如一個月就2個任務(wù),我得移動720次,其中718次是無用功。
- 2、時間刻度太多會導(dǎo)致存儲空間變大,利用率變低,比如一個月就2個任務(wù),我得需要大小是720的數(shù)組,如果我的執(zhí)行時間的粒度精確到秒,那就更恐怖了。
1.5 任務(wù)增加round屬性
現(xiàn)在時間輪的刻度還沿用24,但是槽中的每個任務(wù)增加一個round屬性,代表時鐘轉(zhuǎn)過第幾圈之后再次轉(zhuǎn)到這個槽的時候執(zhí)行。

上圖代表任務(wù)三在指針下一圈移動時執(zhí)行,整體流程就是時間輪沒移動一個刻度的時候都要遍歷槽中所有任務(wù),對每個任務(wù)的round屬性減1,并取出round為0的任務(wù)調(diào)度,這樣可以解決增加時間輪帶來的空間浪費(fèi)。但是這樣帶來的問題時,每次移動刻度的耗時會增加,當(dāng)時間刻度很小(秒級甚至毫秒級),任務(wù)列表有很長,這種方案是不能接受的。
1.6 分層時間輪
分層時間輪是這樣一種思想:
- 1、針對時間復(fù)雜度的問題:不做遍歷計算round,凡是任務(wù)列表中的都應(yīng)該是應(yīng)該被執(zhí)行的,直接全部取出來執(zhí)行。
- 2、針對空間復(fù)雜度的問題:分層,每個時間粒度對應(yīng)一個時間輪,多個時間輪之間進(jìn)行級聯(lián)協(xié)作。
假設(shè)現(xiàn)在有3個定時任務(wù):
- 任務(wù)一每天上午9點(diǎn)執(zhí)行
- 任務(wù)二每周2上午9點(diǎn)執(zhí)行
- 任務(wù)三每月12號上午9點(diǎn)執(zhí)行。
根據(jù)這三個任務(wù)的調(diào)度粒度,可以劃分為3個時間輪,月輪、周輪和天輪,初始添加任務(wù)時,任務(wù)一被添加到天輪上,任務(wù)二被添加到周輪,任務(wù)三被添加到月輪上。三個時間輪按各自的刻度運(yùn)轉(zhuǎn),當(dāng)周輪移動到刻度2時,取出任務(wù)二丟到天輪上,當(dāng)天輪移動到刻度9時執(zhí)行。同樣任務(wù)三在移動到刻度12時,取出任務(wù)三丟給月輪。以此類推。

1.7 round時間輪和分層時間輪的一點(diǎn)比較
相比于round時間輪思想,采用分層時間輪算法的優(yōu)點(diǎn)在于:只需要多耗費(fèi)極少的空間(從1個時間輪到3個時間輪),就能實(shí)現(xiàn)多線程在效率上的提高(一個時間輪是一個線程去行走,3個時間輪可以3個線程行走)。當(dāng)然這是相對的,若提交的任務(wù)都是每隔幾個小時重復(fù)執(zhí)行,那顯然小時時間輪比月、周、小時時間輪組合的耗費(fèi)空間少,且執(zhí)行時間還相同。
1.8 時間輪的應(yīng)用
時間輪的思想應(yīng)用范圍非常廣泛,各種操作系統(tǒng)的定時任務(wù)調(diào)度,Crontab、Dubbo、新版的XXL-JOB、還有基于java的通信框架Netty中也有時間輪的實(shí)現(xiàn),幾乎所有的時間任務(wù)調(diào)度系統(tǒng)采用的都是時間輪的思想。
至于采用round型的時間輪還是采用分層時間輪,看實(shí)際需要吧,時間復(fù)雜度和實(shí)現(xiàn)復(fù)雜度的取舍。
二、時間輪定時使用方式
用Netty的HashedWheelTimer來實(shí)現(xiàn),給Pom加上下面的依賴
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.75.Final</version>
</dependency>
使用測試:
@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {
@Test
public void test() throws InterruptedException {
DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
HashedWheelTimer timer = new HashedWheelTimer(new NamedThreadFactory("timer-task"), 1, TimeUnit.MILLISECONDS,8);
TimerTask timerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
System.out.println("hello world " + LocalDateTime.now().format(formatter));
//執(zhí)行完成之后再次加入調(diào)度
timer.newTimeout(this, 4, TimeUnit.SECONDS);
}
};
//將定時任務(wù)放入時間輪
timer.newTimeout(timerTask, 4, TimeUnit.SECONDS);
Thread.currentThread().join();
}
}
在這里使用的是 netty 使用時間輪算法實(shí)現(xiàn)的HashedWheelTimer來做的每隔 4s 的定時調(diào)度。
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel) {
this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}
使用方式比較簡單,創(chuàng)建一個HashedWheelTimer時間輪定時器對象,threadFactory:創(chuàng)建線程的線程工廠
- tickDuration:一個間隔時間(步長)
- tickDuration:間隔時間的單位
- ticksPerWheel:時間輪的大小
輸出如下:
hello world 2022-04-12 18:46:36
hello world 2022-04-12 18:46:40
hello world 2022-04-12 18:46:44
hello world 2022-04-12 18:46:48
hello world 2022-04-12 18:46:52
hello world 2022-04-12 18:46:56
hello world 2022-04-12 18:47:00
hello world 2022-04-12 18:47:04
hello world 2022-04-12 18:47:08
hello world 2022-04-12 18:47:12
hello world 2022-04-12 18:47:16
hello world 2022-04-12 18:47:20
三、時間輪定時內(nèi)部原理
時間輪定時器原理基本都是如下圖:

時間輪算法可以簡單的看成一個循環(huán)數(shù)組+雙向鏈表的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的。
循環(huán)數(shù)組構(gòu)成一個環(huán)形結(jié)構(gòu),指針每隔 tickDuration 時間走一步,每個數(shù)組上掛載一個雙向鏈表結(jié)構(gòu)的定時任務(wù)列表。
雙向鏈表上的任務(wù)有個屬性為 remainingRounds,即當(dāng)前任務(wù)剩下的輪次是多少,每當(dāng)指針走到該任務(wù)的位置時,remainingRounds 減 1,直到remainingRounds 為 0 時,定時任務(wù)觸發(fā)。
通過時間輪算法的原理圖我們可以知道,tickDuration 越小,定時任務(wù)越精確。
3.1 時間輪定時源碼剖析
3.1.1 構(gòu)造方法
首先從 HashedWheelTimer 的構(gòu)造方法分析
public class HashedWheelTimer implements Timer {
public HashedWheelTimer(
ThreadFactory threadFactory,
long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
long maxPendingTimeouts) {
//線程工廠非null判斷
if (threadFactory == null) {
throw new NullPointerException("threadFactory");
}
//時間單位非null判斷
if (unit == null) {
throw new NullPointerException("unit");
}
//時間間隔(步長)大于0判斷
if (tickDuration <= 0) {
throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
}
//循環(huán)數(shù)組長度大于0判斷
if (ticksPerWheel <= 0) {
throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
}
// Normalize ticksPerWheel to power of two and initialize the wheel.
// 將ticksPerWheel修改為2的整數(shù)次冪 并且新建數(shù)組
wheel = createWheel(ticksPerWheel);
// 數(shù)組長度-1,其二進(jìn)制均為1. 通過指針tick&mask 獲取當(dāng)前的數(shù)組下標(biāo),類似于hashmap的 hashcode&(len -1)
mask = wheel.length - 1;
// Convert tickDuration to nanos.
long duration = unit.toNanos(tickDuration);
// Prevent overflow.
if (duration >= Long.MAX_VALUE / wheel.length) {
throw new IllegalArgumentException(String.format(
"tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
tickDuration, Long.MAX_VALUE / wheel.length));
}
if (duration < MILLISECOND_NANOS) {
if (logger.isWarnEnabled()) {
logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
tickDuration, MILLISECOND_NANOS);
}
this.tickDuration = MILLISECOND_NANOS;
} else {
this.tickDuration = duration;
}
//創(chuàng)建工作線程,該線程會定期的移動指針,掃描鏈表任務(wù),后面再分析
workerThread = threadFactory.newThread(worker);
leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;
this.maxPendingTimeouts = maxPendingTimeouts;
//判斷HashedWheelTimer實(shí)例是否創(chuàng)建太多,如果是就輸出一個日志
if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
reportTooManyInstances();
}
}
}
構(gòu)造方法比較簡單明了,主要是做一些初始化工作,比如數(shù)組的長度控制為2的整數(shù)次冪,新建數(shù)組,新建工作線程等。
3.2 添加任務(wù)
繼續(xù)往下看如何向時間輪定時器添加一個定時任務(wù)。
public class HashedWheelTimer implements Timer {
@Override
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
if (task == null) {
throw new NullPointerException("task");
}
if (unit == null) {
throw new NullPointerException("unit");
}
//一個計數(shù)器,表示當(dāng)前在隊列中等待的任務(wù)數(shù)量
long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
//默認(rèn)maxPendingTimeouts為-1,如果該值>0.添加新任務(wù)時會進(jìn)行判斷,如果當(dāng)前任務(wù)大于maxPendingTimeouts,則跑出拒絕異常
if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
pendingTimeouts.decrementAndGet();
throw new RejectedExecutionException("Number of pending timeouts ("
+ pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
+ "timeouts (" + maxPendingTimeouts + ")");
}
//檢測工作線程掃描是否啟動,如果未啟動,啟動下
start();
// Add the timeout to the timeout queue which will be processed on the next tick.
// During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
//startTime為工作線程啟動的時間,deadline為:System.nanoTime()+任務(wù)延遲時間-工作線程的啟動時間
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
// Guard against overflow.
//溢出判斷,因?yàn)閟tartTime是在start()方法中啟動工作線程后賦值的,
//在delay大于0的情況下,deadline是不可能小于0,除非溢出了。如果溢出了為deadline賦值一個最大值
if (delay > 0 && deadline < 0) {
deadline = Long.MAX_VALUE;
}
//創(chuàng)建HashedWheelTimeout對象
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
//將任務(wù)加入timeouts隊列
timeouts.add(timeout);
return timeout;
}
}
該方法主要執(zhí)行以下幾個工作
- 1.參數(shù)非空校驗(yàn)
- 2.任務(wù)數(shù)量最大值檢測
- 3.工作線程啟動
- 4.獲取任務(wù)的 deadline,將任務(wù)封裝為 HashedWheelTimeout 對象
- 5.將 HashedWheelTimeout 對象放入任務(wù)隊列 timeouts
3.3 工作線程啟動
下面簡單看下 start 方法
public class HashedWheelTimer implements Timer {
public void start() {
switch (WORKER_STATE_UPDATER.get(this)) {
case WORKER_STATE_INIT:
if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
//如果發(fā)現(xiàn)當(dāng)前工作線程的狀態(tài)為WORKER_STATE_INIT 初始化狀態(tài),則設(shè)置線程狀態(tài)為 WORKER_STATE_STARTED并 啟動工作線程
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
// Wait until the startTime is initialized by the worker.
//startTime 初始值為0,并且在工作線程啟動后設(shè)置。startTimeInitialized是一個CountDownLatch鎖,在工作線程啟動后釋放
while (startTime == 0) {
try {
startTimeInitialized.await();
} catch (InterruptedException ignore) {
// Ignore - it will be ready very soon.
}
}
}
}
該方法主要是啟動工作線程并等待工作線程啟動完成。
繼續(xù)看工作線程的 run 方法做什么事情
3.4 工作線程run方法
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
@Override
public void run() {
// Initialize the startTime.
//線程啟動后初始化startTime 時間為System.nanoTime()
startTime = System.nanoTime();
if (startTime == 0) {
// We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
startTime = 1;
}
// Notify the other threads waiting for the initialization at start().
//釋放start方法中的CountDownLatch鎖
startTimeInitialized.countDown();
//在當(dāng)前工作線程狀態(tài)一直為 WORKER_STATE_STARTED 時循環(huán)執(zhí)行
do {
//waitForNextTick 主要是指針跳動,內(nèi)部使用Thread.sleep實(shí)現(xiàn)
final long deadline = waitForNextTick();
//小于0表示收到了關(guān)閉的信號
if (deadline > 0) {
//tick和mask進(jìn)行按位與操作獲取到當(dāng)前數(shù)組下標(biāo)位置
int idx = (int) (tick & mask);
//從時間輪中移除所有已經(jīng)取消的定時任務(wù)
processCancelledTasks();
//獲取到下標(biāo)對應(yīng)的鏈表頭
HashedWheelBucket bucket =
wheel[idx];
//將隊列中的定時任務(wù)放入到時間輪中
transferTimeoutsToBuckets();
//遍歷鏈表任務(wù),將達(dá)到執(zhí)行時間的任務(wù)觸發(fā)執(zhí)行
bucket.expireTimeouts(deadline);
//指針+1
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
// Fill the unprocessedTimeouts so we can return them from stop() method.
//工作線程停止后,將時間輪上的所有任務(wù)放入unprocessedTimeouts集合
for (HashedWheelBucket bucket: wheel) {
bucket.clearTimeouts(unprocessedTimeouts);
}
//將任務(wù)隊列中的任務(wù)也放入unprocessedTimeouts集合
for (;;) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
break;
}
if (!timeout.isCancelled()) {
unprocessedTimeouts.add(timeout);
}
}
//移除所有的未處理的定時任務(wù)
processCancelledTasks();
}
}
}
該部分代碼主要分為以下幾個部分
- 設(shè)置線程的啟動時間 startTime
- 在工作線程啟動的狀態(tài)下
- 根據(jù)用戶配置的 tickDuration 指針每次跳動一下
- 從時間輪中移除所有已經(jīng)取消的定時任務(wù)
- 將隊列中的定時任務(wù)放入到時間輪中
- 遍歷鏈表任務(wù),將達(dá)到執(zhí)行時間的任務(wù)觸發(fā)執(zhí)行
- 工作線程停止后的清理工作
- 下面看一下指針跳動的代碼
3.5 指針跳動
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
private long waitForNextTick() {
//獲取下一個指針的deadline時間
long deadline = tickDuration * (tick + 1);
for (;;) {
//當(dāng)前工作線程的活動時間
final long currentTime = System.nanoTime() - startTime;
//計算還需要多久達(dá)到deadline 。
//這里加上999999的原因是因?yàn)?只會取整數(shù)部分,并且是使用Thread.sleep時間的,參數(shù)為毫秒。
//為了保證任務(wù)不被提前執(zhí)行,加上999999后就能夠向上取整1ms。
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
//sleepTimeMs 小于0表示達(dá)到了任務(wù)的觸發(fā)時間
if (sleepTimeMs <= 0) {
if (currentTime == Long.MIN_VALUE) {
return -Long.MAX_VALUE;
} else {
return currentTime;
}
}
// Check if we run on windows, as if thats the case we will need
// to round the sleepTime as workaround for a bug that only affect
// the JVM if it runs on windows.
//
// See https://github.com/netty/netty/issues/356
if (PlatformDependent.isWindows()) {
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try {
Thread.sleep(sleepTimeMs);
} catch (InterruptedException ignored) {
if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
return Long.MIN_VALUE;
}
}
}
}
}
}
通過源碼分析我們可以看到時間輪算法實(shí)現(xiàn)的指針跳動是通過Thread.sleep 實(shí)現(xiàn)的,難以理解的就是 (deadline - currentTime + 999999) / 1000000;
3.6 將隊列任務(wù)放入時間輪中
在工作線程的 run 方法中會調(diào)用 transferTimeoutsToBuckets方法,該方法會將用戶提交到隊列中的定時任務(wù)移動到時間輪中,下面具體分析下
public class HashedWheelTimer implements Timer {
private final class Worker implements Runnable {
private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();
private long tick;
private void transferTimeoutsToBuckets() {
// transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
// adds new timeouts in a loop.
//每次最多只遷移 10W 個定時任務(wù),主要是為了防止遷移時間過長,導(dǎo)致時間輪中的任務(wù)延遲執(zhí)行
for (int i = 0; i < 100000; i++) {
HashedWheelTimeout timeout = timeouts.poll();
if (timeout == null) {
// all processed
break;
}
//如果任務(wù)已經(jīng)被取消,就跳過
if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
// Was cancelled in the meantime.
continue;
}
//計算任務(wù)需要放入的數(shù)組位置
long calculated = timeout.deadline / tickDuration;
//由于時間輪中的數(shù)組是循環(huán)數(shù)組,計算還需要幾個輪次
timeout.remainingRounds = (calculated - tick) / wheel.length;
//calculated 和tick 取最大,主要是為了保證過時的任務(wù)能夠被調(diào)度。
//正常情況下calculated是大于tick的,
//如果某些任務(wù)執(zhí)行時間過長,導(dǎo)致tick大于calculated,此時直接把過時的任務(wù)放到當(dāng)前鏈表隊列
final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
//按位與獲取任務(wù)的執(zhí)行位置
int stopIndex = (int) (ticks & mask);
HashedWheelBucket bucket = wheel[stopIndex];
//將任務(wù)放入當(dāng)前數(shù)組上的鏈表
bucket.addTimeout(timeout);
}
}
}
}
transferTimeoutsToBuckets 方法很簡單,我們主要要記住兩點(diǎn)
- 1.每次最多會遷移10W 個隊列中的任務(wù)到時間輪中,為了保證不影響工作線程的指針跳動
- 2.并且我們發(fā)現(xiàn)取消的任務(wù)會直接跳過,過時的任務(wù)會直接放到當(dāng)前位置。
3.7 鏈表任務(wù)遍歷
public class HashedWheelTimer implements Timer {
private static final class HashedWheelBucket {
// Used for the linked-list datastructure
private HashedWheelTimeout head;
private HashedWheelTimeout tail;
/**
* Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
*/
public void expireTimeouts(long deadline) {
HashedWheelTimeout timeout = head;
// process all timeouts
//遍歷鏈表的所有任務(wù)
while (timeout != null) {
HashedWheelTimeout next = timeout.next;
//如果剩下的輪次<=0
if (timeout.remainingRounds <= 0) {
//從雙向鏈表中移除該任務(wù)
next = remove(timeout);
//如果當(dāng)前任務(wù)的deadline小于目前時間輪的deadline,表示任務(wù)已經(jīng)可以被觸發(fā)
if (timeout.deadline <= deadline) {
//任務(wù)執(zhí)行
timeout.expire();
} else {
// The timeout was placed into a wrong slot. This should never happen.
throw new IllegalStateException(String.format(
"timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
}
} else if (timeout.isCancelled()) {
//任務(wù)取消也從鏈表中移除
next = remove(timeout);
} else {
// 任務(wù)的剩余輪次-1
timeout.remainingRounds --;
}
//鏈表遍歷
timeout = next;
}
}
}
}
該方法主要是遍歷鏈表上的定時任務(wù)
- 任務(wù)所剩輪次為 0 并且任務(wù)的 deadline 小于目前時間輪的 deadline,任務(wù)觸發(fā)執(zhí)行
- 任務(wù)被取消,從鏈表中移除
- 任務(wù)輪次大于 0 并且還未取消,輪次 -1
- 遍歷下個定時任務(wù)
3.8 定時任務(wù)執(zhí)行
public class HashedWheelTimer implements Timer {
private static final class HashedWheelTimeout implements Timeout {
public void expire() {
if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
return;
}
try {
task.run(this);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
}
}
}
}
}
定時任務(wù)執(zhí)行代碼,看著很簡單,首先將任務(wù)的狀態(tài)設(shè)置為ST_EXPIRED,然后直接調(diào)用 run方法執(zhí)行任務(wù),這里說明任務(wù)是在工作線程中執(zhí)行的,也就是說如果任務(wù)執(zhí)行時間過長,會影響其它定時任務(wù)的觸發(fā)。
參考:
https://blog.csdn.net/qq_34772568/article/details/105534389
https://blog.csdn.net/su20145104009/article/details/115636136
https://blog.csdn.net/code_geek/article/details/113133327