手動(dòng)實(shí)現(xiàn)一個(gè)時(shí)間輪

手動(dòng)實(shí)現(xiàn)一個(gè)時(shí)間輪

// 包 timewheel 提供了一個(gè)簡(jiǎn)單的時(shí)間輪調(diào)度器的實(shí)現(xiàn)。
package timewheel

import (
    "container/list" // 引入 list 包來(lái)使用雙向鏈表
    "context"        // 引入 context 包來(lái)進(jìn)行上下文控制
    "time"           // 引入 time 包來(lái)處理時(shí)間相關(guān)的功能
)

// Task 接口定義了需要時(shí)間輪執(zhí)行的任務(wù),它必須實(shí)現(xiàn) Execute 方法。
type Task interface {
    Execute()
}

// TimeNode 結(jié)構(gòu)體表示時(shí)間輪上的一個(gè)節(jié)點(diǎn)。
type TimeNode struct {
    task    Task      // 任務(wù)實(shí)例
    excTime time.Time // 預(yù)定的執(zhí)行時(shí)間
    cycle   int       // 時(shí)間輪需要轉(zhuǎn)動(dòng)多少圈后執(zhí)行任務(wù)
}

// TimeWheel 結(jié)構(gòu)體表示時(shí)間輪。
type TimeWheel struct {
    startTime  time.Time     // 時(shí)間輪的啟動(dòng)時(shí)間
    interval   time.Duration // 時(shí)間輪的時(shí)間間隔
    currentPos int           // 當(dāng)前的槽位置
    ticker     *time.Ticker  // 定時(shí)觸發(fā)器
    slots      []*list.List  // 時(shí)間輪的槽數(shù)組,每個(gè)槽點(diǎn)是一個(gè)鏈表
    tasks      chan Task     // 任務(wù)channel,用于執(zhí)行任務(wù)
    stop       chan struct{} // 停止channel,用于停止時(shí)間輪
    cancel     context.CancelFunc // 上下文取消函數(shù)
}

// NewTimeWheel 創(chuàng)建并初始化一個(gè)時(shí)間輪實(shí)例。
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
    slots := make([]*list.List, slotNum)
    for i := range slots {
        slots[i] = list.New()
    }
    return &TimeWheel{
        startTime:  time.Now().UTC(),
        interval:   interval,
        currentPos: 0,
        ticker:     time.NewTicker(interval),
        slots:      slots,
        tasks:      make(chan Task, 1000),
        stop:       make(chan struct{}),
    }
}

// DefaultTimeWheel 創(chuàng)建一個(gè)默認(rèn)配置的時(shí)間輪實(shí)例。
func DefaultTimeWheel() *TimeWheel {
    return NewTimeWheel(time.Second, 60)
}

// Start 啟動(dòng)時(shí)間輪,使其開始調(diào)度任務(wù)。
func (t *TimeWheel) Start() {
    ctx := context.Background()
    ctx, cancelFunc := context.WithCancel(ctx)
    t.cancel = cancelFunc
    go t.Loop(ctx)
    go func() {
        for {
            select {
            case task := <-t.tasks: // 當(dāng)任務(wù)channel中有任務(wù)時(shí),執(zhí)行任務(wù)
                task.Execute()
            case <-t.stop: // 當(dāng)接收到停止信號(hào)時(shí),停止定時(shí)器并退出
                t.ticker.Stop()
                return
            }
        }

    }()
}

// Stop 停止時(shí)間輪,取消所有調(diào)度。
func (t *TimeWheel) Stop() {
    t.stop <- struct{}{} // 發(fā)送停止信號(hào)
    t.cancel()           // 調(diào)用上下文的取消函數(shù)
}

// Loop 是時(shí)間輪的核心調(diào)度循環(huán)。
func (t *TimeWheel) Loop(ctx context.Context) {
    for {
        select {
        case <-ctx.Done(): // 當(dāng)上下文被取消時(shí)退出循環(huán)
            return
        case <-t.ticker.C: // 定時(shí)器觸發(fā)時(shí)執(zhí)行
            t.currentPos = (t.currentPos + 1) % len(t.slots) // 計(jì)算當(dāng)前槽位置
            l := t.slots[t.currentPos]
            for e := l.Front(); e != nil; {
                node := e.Value.(*TimeNode)
                if node.cycle > 0 {
                    node.cycle-- // 減少圈數(shù)
                    e = e.Next()
                    continue
                }
                t.tasks <- node.task // 將任務(wù)發(fā)送到任務(wù)channel
                next := e.Next()
                l.Remove(e) // 移除已執(zhí)行的任務(wù)
                e = next
            }

        }
    }
}

// AddTask 添加任務(wù)到時(shí)間輪。
func (t *TimeWheel) AddTask(task Task, delay time.Duration) {
    pos := (t.currentPos + int(delay/t.interval)) % len(t.slots) // 計(jì)算任務(wù)應(yīng)該放入的槽位置
    cycle := int(delay/t.interval) / len(t.slots) // 計(jì)算任務(wù)需要等待的圈數(shù)

    if t.slots[pos] == nil {
        t.slots[pos] = list.New()
    }
    t.slots[pos].PushBack(
        &TimeNode{
            task:    task,
            excTime: time.Now().Add(delay),
            cycle:   cycle,
        },
    )
}

說(shuō)明文檔

1. 概述

時(shí)間輪(TimeWheel)是一個(gè)用于任務(wù)調(diào)度的數(shù)據(jù)結(jié)構(gòu),它允許你以固定的時(shí)間間隔調(diào)度任務(wù)。這段Go代碼提供了一個(gè)簡(jiǎn)單的時(shí)間輪實(shí)現(xiàn)。

2. 結(jié)構(gòu)體和接口

  • Task: 一個(gè)接口,所有希望被時(shí)間輪調(diào)度的任務(wù)都應(yīng)該實(shí)現(xiàn)這個(gè)接口的 Execute 方法。
  • TimeNode: 代表時(shí)間輪上的節(jié)點(diǎn),持有一個(gè)任務(wù)和任務(wù)的執(zhí)行時(shí)間以及剩余圈數(shù)。
  • TimeWheel: 時(shí)間輪的主體,包含了時(shí)間輪的各項(xiàng)配置,如時(shí)間間隔、槽數(shù)組等,并且可以開始和停止任務(wù)調(diào)度。

3. 方法

  • NewTimeWheel: 創(chuàng)建并返回一個(gè)新的時(shí)間輪實(shí)例。
  • DefaultTimeWheel: 創(chuàng)建一個(gè)擁有默認(rèn)配置(一秒鐘的間隔,60個(gè)槽)的時(shí)間輪實(shí)例。
  • Start: 啟動(dòng)時(shí)間輪,開始調(diào)度任務(wù)。
  • Stop: 停止時(shí)間輪,取消所有調(diào)度任務(wù)。
  • Loop: 作為時(shí)間輪的核心調(diào)度循環(huán),負(fù)責(zé)移動(dòng)槽位置,執(zhí)行到期的任務(wù)。
  • AddTask: 向時(shí)間輪添加任務(wù),可以指定延遲時(shí)間。

4. 使用方式

創(chuàng)建時(shí)間輪實(shí)例,通過(guò) AddTask 方法添加任務(wù),然后調(diào)用 Start 方法開始調(diào)度。當(dāng)不再需要時(shí)間輪時(shí),可以調(diào)用 Stop 方法來(lái)停止所有調(diào)度。

5. 注意事項(xiàng)

  • 時(shí)間輪初始化時(shí)會(huì)創(chuàng)建一個(gè)任務(wù)通道 tasks,其大小為1000,因此它可以在不阻塞的情況下緩存多達(dá)1000個(gè)待執(zhí)行的任務(wù)。
  • 時(shí)間輪的 Loop 方法使用了Go的 select 語(yǔ)句來(lái)高效的處理任務(wù)的執(zhí)行和時(shí)間輪的停止。
  • 在停止時(shí)間輪時(shí),應(yīng)確保所有的任務(wù)都已經(jīng)完成或不需要執(zhí)行,以避免資源泄露或未完成的任務(wù)。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容