定時任務(wù)——時間輪算法

背景

在實(shí)際的業(yè)務(wù)場景中,我們常常需要周期性執(zhí)行一些任務(wù),比如巡查系統(tǒng)資源,處理過期數(shù)據(jù)等等。這些事情如果人工去執(zhí)行的話,無疑是對人力資源的浪費(fèi)。因此我們就開發(fā)出了定時任務(wù)。目前業(yè)界已有許多出色的定時任務(wù)框架,如quartz,elastic-job,包括SpringBoot也提供了定時任務(wù),當(dāng)然JDK本身也提供了定時任務(wù)功能。
那么我們在用這些框架的時候,有沒有想過它們是怎么實(shí)現(xiàn)定時任務(wù)的呢?時間輪算法就是這樣一種實(shí)現(xiàn)定時任務(wù)的方法。

一、概述

時間輪算法是通過一個時間輪去維護(hù)定時任務(wù),按照一定的時間單位對時間輪進(jìn)行劃分刻度。然后根據(jù)任務(wù)的延時計算任務(wù)該落在時間輪的第幾個刻度,如果任務(wù)時長超出了時間輪的刻度數(shù)量,則增加一個參數(shù)記錄時間輪需要轉(zhuǎn)動的圈數(shù)。

時間輪每轉(zhuǎn)動一次就檢查當(dāng)前刻度下的任務(wù)圈數(shù)是否為0,如果為0說明時間到了就執(zhí)行任務(wù),否則就減少任務(wù)的圈數(shù)。這樣看起來已經(jīng)很好了,可以滿足基本的定時任務(wù)需求了,但是我們還能不能繼續(xù)優(yōu)化一下呢?答案是可以的。想想我們家里的水表,它是不是有多個輪子在轉(zhuǎn)動,時間輪是不是也可以改造成多級聯(lián)動呢?建立3個時間輪,月輪、周輪、日輪,月輪存儲每個月份需要執(zhí)行定時任務(wù),轉(zhuǎn)動時將當(dāng)月份的任務(wù)拋到周輪,周輪轉(zhuǎn)動時將當(dāng)天的任務(wù)拋到日輪中,日輪轉(zhuǎn)動時直接執(zhí)行當(dāng)前刻度下的定時任務(wù)。

1.1 絕對時間和相對時間

定時任務(wù)一般有兩種:

  • 1、約定一段時間后執(zhí)行。
  • 2、約定某個時間點(diǎn)執(zhí)行。

其實(shí)這兩者是可以互相轉(zhuǎn)換的,比如現(xiàn)在有一個定時任務(wù)是12點(diǎn)執(zhí)行,當(dāng)前時間是9點(diǎn),那就可以認(rèn)為這個任務(wù)是3小時后執(zhí)行。同樣,現(xiàn)在又有一個任務(wù),是3小時后執(zhí)行,那也可以認(rèn)為這個任務(wù)12點(diǎn)執(zhí)行。

假設(shè)我們現(xiàn)在有3個定時任務(wù)A、B、C,分別需要在3點(diǎn)、4點(diǎn)和9點(diǎn)執(zhí)行,我們把它們都轉(zhuǎn)換成絕對時間。

只需要把任務(wù)放到它需要被執(zhí)行的時刻,然后等到時針轉(zhuǎn)到相應(yīng)的位置時,取出該時刻放置的任務(wù),執(zhí)行就可以了。這就是時間輪算法的核心思想。

1.2 重復(fù)執(zhí)行

多數(shù)定時任務(wù)是需要重復(fù)執(zhí)行,比如每天上午9點(diǎn)執(zhí)行生成報表的任務(wù)。對于重復(fù)執(zhí)行的任務(wù),其實(shí)我們需要關(guān)心的只是下次執(zhí)行時間,并不關(guān)心這個任務(wù)需要循環(huán)多少次,還是那每天上午9點(diǎn)的這個任務(wù)來說。

  • 1、比如現(xiàn)在是下午4點(diǎn)鐘,我把這個任務(wù)加入到時間輪,并設(shè)定當(dāng)時針轉(zhuǎn)到明天上午九點(diǎn)(該任務(wù)下次執(zhí)行的時間)時執(zhí)行。
  • 2、時間來到了第二天上午九點(diǎn),時間輪也轉(zhuǎn)到了9點(diǎn)鐘的位置,發(fā)現(xiàn)該位置有一個生成報表的任務(wù),拿出來執(zhí)行。
  • 3、同時時間輪發(fā)現(xiàn)這是一個循環(huán)執(zhí)行的任務(wù),于是把該任務(wù)重新放回到9點(diǎn)鐘的位置。
  • 4、重復(fù)步驟2和步驟3。

如果哪一天這個任務(wù)不需要再執(zhí)行了,那么直接通知時間輪,找到這個任務(wù)的位置刪除掉就可以了。由上面的過程我們可以看到,時間輪至少需要提供4個功能:

  • 1、加入任務(wù)
  • 2、執(zhí)行任務(wù)
  • 3、刪除任務(wù)
  • 4、沿著時間刻度前進(jìn)

1.3 時間輪的數(shù)據(jù)結(jié)構(gòu)

時鐘可以使用數(shù)組來表示,那么時鐘的每一個刻度就是一個槽,槽用來存在該刻度需要被執(zhí)行的定時任務(wù)。正常業(yè)務(wù)中,同一時刻中是會存在多個定時任務(wù)的,所以每個槽中放一個鏈表或者隊列就可以了,執(zhí)行的時候遍歷一遍即可。

同一時刻存在多個任務(wù)時,只要把該刻度對應(yīng)的鏈表全部遍歷一遍,執(zhí)行(扔到線程池中異步執(zhí)行)其中的任務(wù)即可。

1.4 時間刻度不夠用

增加時間輪的刻度

現(xiàn)在有我有2個定時任務(wù),一個任務(wù)每周一上午9點(diǎn)執(zhí)行,另一個任務(wù)每周三上午9點(diǎn)執(zhí)行。最簡單的辦法就是增大時間輪的長度,可以從12個加到168 (一天24小時,一周就是168小時),那么下周一上午九點(diǎn)就是時間輪的第9個刻度,這周三上午九點(diǎn)就是時間輪的第57個刻度。

這樣做的缺點(diǎn):

  • 1、時間刻度太多會導(dǎo)致時間輪走到的多數(shù)刻度沒有任務(wù)執(zhí)行,比如一個月就2個任務(wù),我得移動720次,其中718次是無用功。
  • 2、時間刻度太多會導(dǎo)致存儲空間變大,利用率變低,比如一個月就2個任務(wù),我得需要大小是720的數(shù)組,如果我的執(zhí)行時間的粒度精確到秒,那就更恐怖了。

1.5 任務(wù)增加round屬性

現(xiàn)在時間輪的刻度還沿用24,但是槽中的每個任務(wù)增加一個round屬性,代表時鐘轉(zhuǎn)過第幾圈之后再次轉(zhuǎn)到這個槽的時候執(zhí)行。

上圖代表任務(wù)三在指針下一圈移動時執(zhí)行,整體流程就是時間輪沒移動一個刻度的時候都要遍歷槽中所有任務(wù),對每個任務(wù)的round屬性減1,并取出round為0的任務(wù)調(diào)度,這樣可以解決增加時間輪帶來的空間浪費(fèi)。但是這樣帶來的問題時,每次移動刻度的耗時會增加,當(dāng)時間刻度很小(秒級甚至毫秒級),任務(wù)列表有很長,這種方案是不能接受的。

1.6 分層時間輪

分層時間輪是這樣一種思想:

  • 1、針對時間復(fù)雜度的問題:不做遍歷計算round,凡是任務(wù)列表中的都應(yīng)該是應(yīng)該被執(zhí)行的,直接全部取出來執(zhí)行。
  • 2、針對空間復(fù)雜度的問題:分層,每個時間粒度對應(yīng)一個時間輪,多個時間輪之間進(jìn)行級聯(lián)協(xié)作。

假設(shè)現(xiàn)在有3個定時任務(wù):

    1. 任務(wù)一每天上午9點(diǎn)執(zhí)行
    1. 任務(wù)二每周2上午9點(diǎn)執(zhí)行
    1. 任務(wù)三每月12號上午9點(diǎn)執(zhí)行。

根據(jù)這三個任務(wù)的調(diào)度粒度,可以劃分為3個時間輪,月輪、周輪和天輪,初始添加任務(wù)時,任務(wù)一被添加到天輪上,任務(wù)二被添加到周輪,任務(wù)三被添加到月輪上。三個時間輪按各自的刻度運(yùn)轉(zhuǎn),當(dāng)周輪移動到刻度2時,取出任務(wù)二丟到天輪上,當(dāng)天輪移動到刻度9時執(zhí)行。同樣任務(wù)三在移動到刻度12時,取出任務(wù)三丟給月輪。以此類推。

1.7 round時間輪和分層時間輪的一點(diǎn)比較

相比于round時間輪思想,采用分層時間輪算法的優(yōu)點(diǎn)在于:只需要多耗費(fèi)極少的空間(從1個時間輪到3個時間輪),就能實(shí)現(xiàn)多線程在效率上的提高(一個時間輪是一個線程去行走,3個時間輪可以3個線程行走)。當(dāng)然這是相對的,若提交的任務(wù)都是每隔幾個小時重復(fù)執(zhí)行,那顯然小時時間輪比月、周、小時時間輪組合的耗費(fèi)空間少,且執(zhí)行時間還相同。

1.8 時間輪的應(yīng)用

時間輪的思想應(yīng)用范圍非常廣泛,各種操作系統(tǒng)的定時任務(wù)調(diào)度,Crontab、Dubbo、新版的XXL-JOB、還有基于java的通信框架Netty中也有時間輪的實(shí)現(xiàn),幾乎所有的時間任務(wù)調(diào)度系統(tǒng)采用的都是時間輪的思想。

至于采用round型的時間輪還是采用分層時間輪,看實(shí)際需要吧,時間復(fù)雜度和實(shí)現(xiàn)復(fù)雜度的取舍。

二、時間輪定時使用方式

用Netty的HashedWheelTimer來實(shí)現(xiàn),給Pom加上下面的依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.75.Final</version>
</dependency>

使用測試:

@RunWith(SpringRunner.class)
@SpringBootTest
public class Test {

    @Test
    public void test() throws InterruptedException {
        DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        HashedWheelTimer timer = new HashedWheelTimer(new NamedThreadFactory("timer-task"), 1, TimeUnit.MILLISECONDS,8);
        TimerTask timerTask = new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                System.out.println("hello world " + LocalDateTime.now().format(formatter));
                //執(zhí)行完成之后再次加入調(diào)度
                timer.newTimeout(this, 4, TimeUnit.SECONDS);
            }
        };
        //將定時任務(wù)放入時間輪
        timer.newTimeout(timerTask, 4, TimeUnit.SECONDS);
        Thread.currentThread().join();
    }
}

在這里使用的是 netty 使用時間輪算法實(shí)現(xiàn)的HashedWheelTimer來做的每隔 4s 的定時調(diào)度。

public HashedWheelTimer(
        ThreadFactory threadFactory,
        long tickDuration, TimeUnit unit, int ticksPerWheel) {
    this(threadFactory, tickDuration, unit, ticksPerWheel, true);
}

使用方式比較簡單,創(chuàng)建一個HashedWheelTimer時間輪定時器對象,threadFactory:創(chuàng)建線程的線程工廠

  • tickDuration:一個間隔時間(步長)
  • tickDuration:間隔時間的單位
  • ticksPerWheel:時間輪的大小

輸出如下:

hello world 2022-04-12 18:46:36
hello world 2022-04-12 18:46:40
hello world 2022-04-12 18:46:44
hello world 2022-04-12 18:46:48
hello world 2022-04-12 18:46:52
hello world 2022-04-12 18:46:56
hello world 2022-04-12 18:47:00
hello world 2022-04-12 18:47:04
hello world 2022-04-12 18:47:08
hello world 2022-04-12 18:47:12
hello world 2022-04-12 18:47:16
hello world 2022-04-12 18:47:20

三、時間輪定時內(nèi)部原理

時間輪定時器原理基本都是如下圖:

時間輪算法可以簡單的看成一個循環(huán)數(shù)組+雙向鏈表的數(shù)據(jù)結(jié)構(gòu)實(shí)現(xiàn)的。
循環(huán)數(shù)組構(gòu)成一個環(huán)形結(jié)構(gòu),指針每隔 tickDuration 時間走一步,每個數(shù)組上掛載一個雙向鏈表結(jié)構(gòu)的定時任務(wù)列表。

雙向鏈表上的任務(wù)有個屬性為 remainingRounds,即當(dāng)前任務(wù)剩下的輪次是多少,每當(dāng)指針走到該任務(wù)的位置時,remainingRounds 減 1,直到remainingRounds 為 0 時,定時任務(wù)觸發(fā)。

通過時間輪算法的原理圖我們可以知道,tickDuration 越小,定時任務(wù)越精確。

3.1 時間輪定時源碼剖析

3.1.1 構(gòu)造方法

首先從 HashedWheelTimer 的構(gòu)造方法分析

public class HashedWheelTimer implements Timer {

    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {
        //線程工廠非null判斷
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        //時間單位非null判斷
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        //時間間隔(步長)大于0判斷
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        //循環(huán)數(shù)組長度大于0判斷
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
        // 將ticksPerWheel修改為2的整數(shù)次冪 并且新建數(shù)組
        wheel = createWheel(ticksPerWheel);
        // 數(shù)組長度-1,其二進(jìn)制均為1. 通過指針tick&mask 獲取當(dāng)前的數(shù)組下標(biāo),類似于hashmap的 hashcode&(len -1)
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        long duration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (duration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }

        if (duration < MILLISECOND_NANOS) {
            if (logger.isWarnEnabled()) {
                logger.warn("Configured tickDuration %d smaller then %d, using 1ms.",
                            tickDuration, MILLISECOND_NANOS);
            }
            this.tickDuration = MILLISECOND_NANOS;
        } else {
            this.tickDuration = duration;
        }
        //創(chuàng)建工作線程,該線程會定期的移動指針,掃描鏈表任務(wù),后面再分析
        workerThread = threadFactory.newThread(worker);

        leak = leakDetection || !workerThread.isDaemon() ? leakDetector.track(this) : null;

        this.maxPendingTimeouts = maxPendingTimeouts;
        //判斷HashedWheelTimer實(shí)例是否創(chuàng)建太多,如果是就輸出一個日志
        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
            WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }
}

構(gòu)造方法比較簡單明了,主要是做一些初始化工作,比如數(shù)組的長度控制為2的整數(shù)次冪,新建數(shù)組,新建工作線程等。

3.2 添加任務(wù)

繼續(xù)往下看如何向時間輪定時器添加一個定時任務(wù)。

public class HashedWheelTimer implements Timer {

    @Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        //一個計數(shù)器,表示當(dāng)前在隊列中等待的任務(wù)數(shù)量
        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();
        //默認(rèn)maxPendingTimeouts為-1,如果該值>0.添加新任務(wù)時會進(jìn)行判斷,如果當(dāng)前任務(wù)大于maxPendingTimeouts,則跑出拒絕異常
        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                + "timeouts (" + maxPendingTimeouts + ")");
        }
        //檢測工作線程掃描是否啟動,如果未啟動,啟動下
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        //startTime為工作線程啟動的時間,deadline為:System.nanoTime()+任務(wù)延遲時間-工作線程的啟動時間
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        //溢出判斷,因?yàn)閟tartTime是在start()方法中啟動工作線程后賦值的,
        //在delay大于0的情況下,deadline是不可能小于0,除非溢出了。如果溢出了為deadline賦值一個最大值
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        //創(chuàng)建HashedWheelTimeout對象
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        //將任務(wù)加入timeouts隊列
        timeouts.add(timeout);
        return timeout;
    }
}

該方法主要執(zhí)行以下幾個工作

  • 1.參數(shù)非空校驗(yàn)
  • 2.任務(wù)數(shù)量最大值檢測
  • 3.工作線程啟動
  • 4.獲取任務(wù)的 deadline,將任務(wù)封裝為 HashedWheelTimeout 對象
  • 5.將 HashedWheelTimeout 對象放入任務(wù)隊列 timeouts

3.3 工作線程啟動

下面簡單看下 start 方法

public class HashedWheelTimer implements Timer {

    public void start() {
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    //如果發(fā)現(xiàn)當(dāng)前工作線程的狀態(tài)為WORKER_STATE_INIT 初始化狀態(tài),則設(shè)置線程狀態(tài)為 WORKER_STATE_STARTED并 啟動工作線程
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        //startTime 初始值為0,并且在工作線程啟動后設(shè)置。startTimeInitialized是一個CountDownLatch鎖,在工作線程啟動后釋放
        while (startTime == 0) {
            try {
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }
}

該方法主要是啟動工作線程并等待工作線程啟動完成。
繼續(xù)看工作線程的 run 方法做什么事情

3.4 工作線程run方法

public class HashedWheelTimer implements Timer {

    private final class Worker implements Runnable {
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        private long tick;

        @Override
        public void run() {
            // Initialize the startTime.
            //線程啟動后初始化startTime 時間為System.nanoTime()
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            //釋放start方法中的CountDownLatch鎖
            startTimeInitialized.countDown();
            //在當(dāng)前工作線程狀態(tài)一直為 WORKER_STATE_STARTED 時循環(huán)執(zhí)行
            do {
                //waitForNextTick 主要是指針跳動,內(nèi)部使用Thread.sleep實(shí)現(xiàn)
                final long deadline = waitForNextTick();
                //小于0表示收到了關(guān)閉的信號
                if (deadline > 0) {
                    //tick和mask進(jìn)行按位與操作獲取到當(dāng)前數(shù)組下標(biāo)位置
                    int idx = (int) (tick & mask);
                    //從時間輪中移除所有已經(jīng)取消的定時任務(wù)
                    processCancelledTasks();
                    //獲取到下標(biāo)對應(yīng)的鏈表頭
                    HashedWheelBucket bucket =
                            wheel[idx];
                    //將隊列中的定時任務(wù)放入到時間輪中
                    transferTimeoutsToBuckets();
                    //遍歷鏈表任務(wù),將達(dá)到執(zhí)行時間的任務(wù)觸發(fā)執(zhí)行
                    bucket.expireTimeouts(deadline);
                    //指針+1
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            //工作線程停止后,將時間輪上的所有任務(wù)放入unprocessedTimeouts集合
            for (HashedWheelBucket bucket: wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            //將任務(wù)隊列中的任務(wù)也放入unprocessedTimeouts集合
            for (;;) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            //移除所有的未處理的定時任務(wù)
            processCancelledTasks();
        }
    }
}

該部分代碼主要分為以下幾個部分

  • 設(shè)置線程的啟動時間 startTime
  • 在工作線程啟動的狀態(tài)下
    • 根據(jù)用戶配置的 tickDuration 指針每次跳動一下
    • 從時間輪中移除所有已經(jīng)取消的定時任務(wù)
    • 將隊列中的定時任務(wù)放入到時間輪中
    • 遍歷鏈表任務(wù),將達(dá)到執(zhí)行時間的任務(wù)觸發(fā)執(zhí)行
  • 工作線程停止后的清理工作
    • 下面看一下指針跳動的代碼

3.5 指針跳動

public class HashedWheelTimer implements Timer {

    private final class Worker implements Runnable {
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        private long tick;

        private long waitForNextTick() {
            //獲取下一個指針的deadline時間
            long deadline = tickDuration * (tick + 1);

            for (;;) {
                //當(dāng)前工作線程的活動時間
                final long currentTime = System.nanoTime() - startTime;
                //計算還需要多久達(dá)到deadline  。
                //這里加上999999的原因是因?yàn)?只會取整數(shù)部分,并且是使用Thread.sleep時間的,參數(shù)為毫秒。
                //為了保證任務(wù)不被提前執(zhí)行,加上999999后就能夠向上取整1ms。
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
                //sleepTimeMs 小于0表示達(dá)到了任務(wù)的觸發(fā)時間
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }

                // Check if we run on windows, as if thats the case we will need
                // to round the sleepTime as workaround for a bug that only affect
                // the JVM if it runs on windows.
                //
                // See https://github.com/netty/netty/issues/356
                if (PlatformDependent.isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                }

                try {
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }
    }
}

通過源碼分析我們可以看到時間輪算法實(shí)現(xiàn)的指針跳動是通過Thread.sleep 實(shí)現(xiàn)的,難以理解的就是 (deadline - currentTime + 999999) / 1000000;

3.6 將隊列任務(wù)放入時間輪中

在工作線程的 run 方法中會調(diào)用 transferTimeoutsToBuckets方法,該方法會將用戶提交到隊列中的定時任務(wù)移動到時間輪中,下面具體分析下

public class HashedWheelTimer implements Timer {

    private final class Worker implements Runnable {
        private final Set<Timeout> unprocessedTimeouts = new HashSet<Timeout>();

        private long tick;

        private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
            // adds new timeouts in a loop.
            //每次最多只遷移 10W 個定時任務(wù),主要是為了防止遷移時間過長,導(dǎo)致時間輪中的任務(wù)延遲執(zhí)行
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                //如果任務(wù)已經(jīng)被取消,就跳過
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }
                //計算任務(wù)需要放入的數(shù)組位置
                long calculated = timeout.deadline / tickDuration;
                //由于時間輪中的數(shù)組是循環(huán)數(shù)組,計算還需要幾個輪次
                timeout.remainingRounds = (calculated - tick) / wheel.length;
                //calculated 和tick 取最大,主要是為了保證過時的任務(wù)能夠被調(diào)度。
                //正常情況下calculated是大于tick的,
                //如果某些任務(wù)執(zhí)行時間過長,導(dǎo)致tick大于calculated,此時直接把過時的任務(wù)放到當(dāng)前鏈表隊列
                final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
                //按位與獲取任務(wù)的執(zhí)行位置
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                //將任務(wù)放入當(dāng)前數(shù)組上的鏈表
                bucket.addTimeout(timeout);
            }
        }
    }
}

transferTimeoutsToBuckets 方法很簡單,我們主要要記住兩點(diǎn)

  • 1.每次最多會遷移10W 個隊列中的任務(wù)到時間輪中,為了保證不影響工作線程的指針跳動
  • 2.并且我們發(fā)現(xiàn)取消的任務(wù)會直接跳過,過時的任務(wù)會直接放到當(dāng)前位置。

3.7 鏈表任務(wù)遍歷

public class HashedWheelTimer implements Timer {

    private static final class HashedWheelBucket {
        // Used for the linked-list datastructure
        private HashedWheelTimeout head;
        private HashedWheelTimeout tail;

        /**
         * Expire all {@link HashedWheelTimeout}s for the given {@code deadline}.
         */
        public void expireTimeouts(long deadline) {
            HashedWheelTimeout timeout = head;

            // process all timeouts
            //遍歷鏈表的所有任務(wù)
            while (timeout != null) {
                HashedWheelTimeout next = timeout.next;
                //如果剩下的輪次<=0
                if (timeout.remainingRounds <= 0) {
                    //從雙向鏈表中移除該任務(wù)
                    next = remove(timeout);
                    //如果當(dāng)前任務(wù)的deadline小于目前時間輪的deadline,表示任務(wù)已經(jīng)可以被觸發(fā)
                    if (timeout.deadline <= deadline) {
                        //任務(wù)執(zhí)行
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    //任務(wù)取消也從鏈表中移除
                    next = remove(timeout);
                } else {
                    // 任務(wù)的剩余輪次-1
                    timeout.remainingRounds --;
                }
                //鏈表遍歷
                timeout = next;
            }
        }
    }
}

該方法主要是遍歷鏈表上的定時任務(wù)

  • 任務(wù)所剩輪次為 0 并且任務(wù)的 deadline 小于目前時間輪的 deadline,任務(wù)觸發(fā)執(zhí)行
  • 任務(wù)被取消,從鏈表中移除
  • 任務(wù)輪次大于 0 并且還未取消,輪次 -1
  • 遍歷下個定時任務(wù)

3.8 定時任務(wù)執(zhí)行

public class HashedWheelTimer implements Timer {

    private static final class HashedWheelTimeout implements Timeout {

        public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }
    }
}

定時任務(wù)執(zhí)行代碼,看著很簡單,首先將任務(wù)的狀態(tài)設(shè)置為ST_EXPIRED,然后直接調(diào)用 run方法執(zhí)行任務(wù),這里說明任務(wù)是在工作線程中執(zhí)行的,也就是說如果任務(wù)執(zhí)行時間過長,會影響其它定時任務(wù)的觸發(fā)。

參考:
https://blog.csdn.net/qq_34772568/article/details/105534389

https://blog.csdn.net/su20145104009/article/details/115636136

https://blog.csdn.net/code_geek/article/details/113133327

https://blog.csdn.net/beslet/article/details/119974430

https://blog.csdn.net/qq_34039868/article/details/105384808

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容