手動(dòng)實(shí)現(xiàn)一個(gè)時(shí)間輪
// 包 timewheel 提供了一個(gè)簡(jiǎn)單的時(shí)間輪調(diào)度器的實(shí)現(xiàn)。
package timewheel
import (
"container/list" // 引入 list 包來(lái)使用雙向鏈表
"context" // 引入 context 包來(lái)進(jìn)行上下文控制
"time" // 引入 time 包來(lái)處理時(shí)間相關(guān)的功能
)
// Task 接口定義了需要時(shí)間輪執(zhí)行的任務(wù),它必須實(shí)現(xiàn) Execute 方法。
type Task interface {
Execute()
}
// TimeNode 結(jié)構(gòu)體表示時(shí)間輪上的一個(gè)節(jié)點(diǎn)。
type TimeNode struct {
task Task // 任務(wù)實(shí)例
excTime time.Time // 預(yù)定的執(zhí)行時(shí)間
cycle int // 時(shí)間輪需要轉(zhuǎn)動(dòng)多少圈后執(zhí)行任務(wù)
}
// TimeWheel 結(jié)構(gòu)體表示時(shí)間輪。
type TimeWheel struct {
startTime time.Time // 時(shí)間輪的啟動(dòng)時(shí)間
interval time.Duration // 時(shí)間輪的時(shí)間間隔
currentPos int // 當(dāng)前的槽位置
ticker *time.Ticker // 定時(shí)觸發(fā)器
slots []*list.List // 時(shí)間輪的槽數(shù)組,每個(gè)槽點(diǎn)是一個(gè)鏈表
tasks chan Task // 任務(wù)channel,用于執(zhí)行任務(wù)
stop chan struct{} // 停止channel,用于停止時(shí)間輪
cancel context.CancelFunc // 上下文取消函數(shù)
}
// NewTimeWheel 創(chuàng)建并初始化一個(gè)時(shí)間輪實(shí)例。
func NewTimeWheel(interval time.Duration, slotNum int) *TimeWheel {
slots := make([]*list.List, slotNum)
for i := range slots {
slots[i] = list.New()
}
return &TimeWheel{
startTime: time.Now().UTC(),
interval: interval,
currentPos: 0,
ticker: time.NewTicker(interval),
slots: slots,
tasks: make(chan Task, 1000),
stop: make(chan struct{}),
}
}
// DefaultTimeWheel 創(chuàng)建一個(gè)默認(rèn)配置的時(shí)間輪實(shí)例。
func DefaultTimeWheel() *TimeWheel {
return NewTimeWheel(time.Second, 60)
}
// Start 啟動(dòng)時(shí)間輪,使其開始調(diào)度任務(wù)。
func (t *TimeWheel) Start() {
ctx := context.Background()
ctx, cancelFunc := context.WithCancel(ctx)
t.cancel = cancelFunc
go t.Loop(ctx)
go func() {
for {
select {
case task := <-t.tasks: // 當(dāng)任務(wù)channel中有任務(wù)時(shí),執(zhí)行任務(wù)
task.Execute()
case <-t.stop: // 當(dāng)接收到停止信號(hào)時(shí),停止定時(shí)器并退出
t.ticker.Stop()
return
}
}
}()
}
// Stop 停止時(shí)間輪,取消所有調(diào)度。
func (t *TimeWheel) Stop() {
t.stop <- struct{}{} // 發(fā)送停止信號(hào)
t.cancel() // 調(diào)用上下文的取消函數(shù)
}
// Loop 是時(shí)間輪的核心調(diào)度循環(huán)。
func (t *TimeWheel) Loop(ctx context.Context) {
for {
select {
case <-ctx.Done(): // 當(dāng)上下文被取消時(shí)退出循環(huán)
return
case <-t.ticker.C: // 定時(shí)器觸發(fā)時(shí)執(zhí)行
t.currentPos = (t.currentPos + 1) % len(t.slots) // 計(jì)算當(dāng)前槽位置
l := t.slots[t.currentPos]
for e := l.Front(); e != nil; {
node := e.Value.(*TimeNode)
if node.cycle > 0 {
node.cycle-- // 減少圈數(shù)
e = e.Next()
continue
}
t.tasks <- node.task // 將任務(wù)發(fā)送到任務(wù)channel
next := e.Next()
l.Remove(e) // 移除已執(zhí)行的任務(wù)
e = next
}
}
}
}
// AddTask 添加任務(wù)到時(shí)間輪。
func (t *TimeWheel) AddTask(task Task, delay time.Duration) {
pos := (t.currentPos + int(delay/t.interval)) % len(t.slots) // 計(jì)算任務(wù)應(yīng)該放入的槽位置
cycle := int(delay/t.interval) / len(t.slots) // 計(jì)算任務(wù)需要等待的圈數(shù)
if t.slots[pos] == nil {
t.slots[pos] = list.New()
}
t.slots[pos].PushBack(
&TimeNode{
task: task,
excTime: time.Now().Add(delay),
cycle: cycle,
},
)
}
說(shuō)明文檔
1. 概述
時(shí)間輪(TimeWheel)是一個(gè)用于任務(wù)調(diào)度的數(shù)據(jù)結(jié)構(gòu),它允許你以固定的時(shí)間間隔調(diào)度任務(wù)。這段Go代碼提供了一個(gè)簡(jiǎn)單的時(shí)間輪實(shí)現(xiàn)。
2. 結(jié)構(gòu)體和接口
-
Task: 一個(gè)接口,所有希望被時(shí)間輪調(diào)度的任務(wù)都應(yīng)該實(shí)現(xiàn)這個(gè)接口的Execute方法。 -
TimeNode: 代表時(shí)間輪上的節(jié)點(diǎn),持有一個(gè)任務(wù)和任務(wù)的執(zhí)行時(shí)間以及剩余圈數(shù)。 -
TimeWheel: 時(shí)間輪的主體,包含了時(shí)間輪的各項(xiàng)配置,如時(shí)間間隔、槽數(shù)組等,并且可以開始和停止任務(wù)調(diào)度。
3. 方法
-
NewTimeWheel: 創(chuàng)建并返回一個(gè)新的時(shí)間輪實(shí)例。 -
DefaultTimeWheel: 創(chuàng)建一個(gè)擁有默認(rèn)配置(一秒鐘的間隔,60個(gè)槽)的時(shí)間輪實(shí)例。 -
Start: 啟動(dòng)時(shí)間輪,開始調(diào)度任務(wù)。 -
Stop: 停止時(shí)間輪,取消所有調(diào)度任務(wù)。 -
Loop: 作為時(shí)間輪的核心調(diào)度循環(huán),負(fù)責(zé)移動(dòng)槽位置,執(zhí)行到期的任務(wù)。 -
AddTask: 向時(shí)間輪添加任務(wù),可以指定延遲時(shí)間。
4. 使用方式
創(chuàng)建時(shí)間輪實(shí)例,通過(guò) AddTask 方法添加任務(wù),然后調(diào)用 Start 方法開始調(diào)度。當(dāng)不再需要時(shí)間輪時(shí),可以調(diào)用 Stop 方法來(lái)停止所有調(diào)度。
5. 注意事項(xiàng)
- 時(shí)間輪初始化時(shí)會(huì)創(chuàng)建一個(gè)任務(wù)通道
tasks,其大小為1000,因此它可以在不阻塞的情況下緩存多達(dá)1000個(gè)待執(zhí)行的任務(wù)。 - 時(shí)間輪的
Loop方法使用了Go的select語(yǔ)句來(lái)高效的處理任務(wù)的執(zhí)行和時(shí)間輪的停止。 - 在停止時(shí)間輪時(shí),應(yīng)確保所有的任務(wù)都已經(jīng)完成或不需要執(zhí)行,以避免資源泄露或未完成的任務(wù)。