Go定時任務(wù)源碼 - robfig/cron

介紹

robfig/cron是Go語言實現(xiàn)的開源定時任務(wù)調(diào)度框架,核心代碼是巧妙的使用chan + select + for實現(xiàn)了一個輕量級調(diào)度協(xié)程,不但語法簡潔,而且具有很好的性能。

設(shè)計

任務(wù)抽象(業(yè)務(wù)隔離):任務(wù)抽象成一個Job接口,業(yè)務(wù)邏輯類只需實現(xiàn)該接口

type Job interface {
  Run()
}

計劃接口:通過當(dāng)前時間計算任務(wù)的下次執(zhí)行執(zhí)行時間,具體實現(xiàn)類可以根據(jù)實際需求實現(xiàn)

type Schedule interface {
    Next(time.Time) time.Time
}

定時任務(wù)對象:保存執(zhí)行的任務(wù)Job、計算執(zhí)行時間

type Entry struct {
    ID       EntryID   // id
    Schedule Schedule  // 計劃
    Next     time.Time // 下次執(zhí)行時間
    Job      Job       // 任務(wù)
}

任務(wù)調(diào)度管理:保存定時任務(wù)對象(Entry),調(diào)度任務(wù)執(zhí)行,提供新增、刪除接口(涉及關(guān)聯(lián)資源競爭)

// 任務(wù)管理類
type Cron struct {
    nextID  int64        // 生成entry自增ID
    entries []*Entry     // 保存Entry
    add     chan *Entry  // 添加
    remove  chan EntryID // 刪除
}
// 刪除
func (c *Cron) Remove(id EntryID) { 
    c.remove <- id
}
// 新增
func (c *Cron) Add(spec string, cmd Job) EntryID  { 
    entry := &Entry{
        ID:         EntryID(atomic.AddInt64(&c.nextID, 1)),
        Schedule:   ParseStandard(spec),
        Job:        cmd,
    }
    c.add <- entry
    return entry.ID
}

核心調(diào)度:計算下次執(zhí)行時間 -> 排序 -> 取最早執(zhí)行數(shù)據(jù) -> timer 等待,因為只有一個協(xié)程在執(zhí)行這個run的調(diào)度,所以不存在資源競爭,不需要加鎖,另外考慮到執(zhí)行任務(wù)可能涉及阻塞,例如:IO操作,所以一般startJob方法會開啟協(xié)程執(zhí)行

func (c *Cron) run() {
    now := time.Now()
    for _, entry := range c.entries {
        entry.Next = entry.Schedule.Next(now) // 計算下次執(zhí)行時間
    }
    for {
        sort.Sort(byTime(c.entries)) // 時間排序
        timer := time.NewTimer(c.entries[0].Next.Sub(now))
        select {
        case now = <-timer.C:
            for _, e := range c.entries {
                if e.Next.After(now) || e.Next.IsZero() {
                    break
                }
                c.startJob(e.Job) // 開協(xié)程執(zhí)行
                e.Next = e.Schedule.Next(now) // 計算下次執(zhí)行時間
            }
        case newEntry := <-c.add: // 新增
            timer.Stop()
            newEntry.Next = newEntry.Schedule.Next(now)
            c.entries = append(c.entries, newEntry)
        }
    ...
    }
}
// 執(zhí)行任務(wù)
func (c *Cron) startJob(j Job) {
    go func() {
        j.Run()
    }()
}

啟動時會開啟唯一協(xié)程執(zhí)行run方法,計算任務(wù)執(zhí)行時間,執(zhí)行,任務(wù)管理等

func New() *Cron {
    c := &Cron{
        entries: nil,
        add:     make(chan *Entry),
        remove:  make(chan EntryID),
    }
    return c
}
func (c *Cron) Start() {
    go c.run()
}

總結(jié)

  1. 共享資源(定時任務(wù))的管理和調(diào)度由唯一協(xié)程管理
  2. 通過for + select + channel來循環(huán)計算執(zhí)行時間,監(jiān)聽任務(wù)到期、增刪事件
  3. 執(zhí)行任務(wù)會新啟協(xié)程執(zhí)行,不阻塞調(diào)度
  4. 采用扇入/扇出原理,多協(xié)程添加、增刪任務(wù)調(diào)度協(xié)程(Fan In),調(diào)度啟動新協(xié)程執(zhí)行任務(wù)(Fan Out)
  5. 調(diào)度協(xié)程使用的是CSP并發(fā)模型思想

我的博客:https://itart.cn
原文地址:https://itart.cn/blogs/2022/explore/cron-source-code.html

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容