Golang 定時任務管理

本文基于Golang Crontab 實現(xiàn)了一個Crontab Job Manager。更加容易使用,同時也能夠滿足更加復雜的場景。

倉儲地址, 如果有用,歡迎點贊,歡迎討論,歡迎找茬。

需求

在開發(fā)中,經(jīng)常遇到一些需要定時任務的場景。各個語言都有定時語言的庫,Golang Cron 提供了Crontab Golang語言版本。這個庫非常不錯,提供最基本的定時任務編排的功能。但是一些復雜需求無法滿足,比如

  • 任何定時任務都有可能失敗,失敗了就panic了,這樣非常不友好。最起碼能夠讓我控制,失敗是重試還是停止
  • 某些任務執(zhí)行周期要10s, 而用戶設置的5s一執(zhí)行,我能不能保證任何時間這個任務只執(zhí)行一次
  • 我想實時的看到任務的狀態(tài),比如是不是在運行?下次運行時間?上次運行時間?
  • 我想看到任務執(zhí)行了多少次,成功了多少次
  • 我想要限制最大任務數(shù)量,比如超過10個任務在執(zhí)行,不運行新的任務執(zhí)行
  • 任務執(zhí)行完了可以告訴我邏輯上有錯誤,還是有結果。我還可以加上一些鉤子函數(shù)來處理任務執(zhí)行的結果

以上的需求都非常常見,可惜這個庫都不支持^_^.

完全沒用的例子

復雜定義任務的場景模型抽象出來大概也就是下面幾個功能點,這個沒用的例子可以很好的體現(xiàn)出來

  • 用戶通過接口,告訴后臺我要做一個什么定時工作,schedule是什么
  • 查看所有定時任務的狀態(tài)
  • 查看所有定時任務的工作結果

本地運行

通過以下命令本地運行

go get -u "github.com/OhBonsai/croner"
go get -u "github.com/gin-gonic/gin"
go get -u "github.com/gorilla/websocket"
cd $GOPATH/src/github.com/OhBonsai/croner/example

go run server.go 
# 打開localhost:8000
完全沒用的例子

前端解釋

原諒我的狗屎前端。怕大家看不懂,我還是解釋一下前端各個部分什么意思。

  1. 圖中①的區(qū)域,是計劃定義區(qū),可以設置一些參數(shù),表示誰多久往聊天室說一句什么話。第二個表單可以輸入1-10的數(shù)字,表示每隔幾秒說話。當然cron支持六位的crontab周期定義。
  2. 圖中②的區(qū)域,是執(zhí)行任務狀態(tài)區(qū),每秒刷新一次
  3. 圖中3的區(qū)域,就是我們的聊天室啦。后臺定時任務鉤子函數(shù)會定時把消息推到channel中,如果websocket服務端收到消息就發(fā)送到瀏覽器

后端邏輯

  1. 實現(xiàn)定時計劃接口func Run() croner.JobRunReturn
type JobS struct {
    Duration int    `json:"duration"`
    Who      string `json:"who"`
    What     string `json:"what"`
}

func (j JobS) Run() croner.JobRunReturn {
    return croner.JobRunReturn{
        Value: fmt.Sprintf("[%s] %s: %s", time.Now().Format(time.RFC850), j.Who, j.What),
    }
}

  1. 初始化設置
var manager = croner.NewCronManager(croner.CronManagerConfig{
    true, false, 0, 0,
})
  1. 加上鉤子函數(shù),如果接收到任務執(zhí)行結果,將結果傳到ch channel
croner.OnJobReturn(func(runReturn *croner.JobRunReturnWithEid) {
    say := runReturn.Value.(string)
    ch <- say
})
  1. 每當接受到post請求,就創(chuàng)建一個任務
_, err = manager.Add(fmt.Sprintf("@every %ds", curJob.Duration), curJob, nil)
  1. 輪詢獲區(qū)ch傳過來的值,通過websocket傳到前端
for {
    select {
    case msg := <-ch:
        conn.WriteMessage(websocket.TextMessage, []byte(msg))
    default:
        continue
    }
}

實現(xiàn)

詳細的使用可以查看測試文件,

任務接口

任務只要實現(xiàn)run()函數(shù)就行啦。這樣我就可以包裝你這個函數(shù)

type JobRunReturn struct {
    Value interface{}
    Error error
}

type JobInf interface {
    Run() JobRunReturn
}

任務失敗控制

Cron沒有失敗控制,通過包裝run()函數(shù)來實現(xiàn)cron的job接口來增加一些邏輯。加上一個defer來恢復panic, 通過設置配置ignorePanic來控制是否忽略錯誤繼續(xù)執(zhí)行,還是發(fā)生錯誤就是STOP

    defer func() {
        j.TotalCount += 1
        if err := recover(); err != nil {
            errString := fmt.Sprintf("WrappedJob-%d %s  execute fail. error is %s", j.Id, j.Name, err)
            println(errString)
            atomic.StoreUint32(&j.status, FAIL)
            if !j.father.ignorePanic {
                j.father.DisActive(j.Id)
            }
            j.father.jobReturnsWithEid <- JobRunReturnWithEid{
                JobRunReturn{nil, JobRunError{errString}},
                j.Id,
            }
        }
        return
    }()

單任務周期時間只執(zhí)行一次

這個主要靠鎖來實現(xiàn),任務運行時就鎖住,直到完成之后才釋放

j.running.Lock()
defer j.running.Unlock()

任務狀態(tài)變更

通過原子操作來變更任務狀態(tài)

atomic.StoreUint32(&(j.status), RUNNING)
defer atomic.StoreUint32(&(j.status), IDLE)

最大任務數(shù)量

通過buffered channel來實現(xiàn)最大任務數(shù)量

permit = make(chan struct{}, c.PoolSize)
permit <- struct{}{}
defer func() { <-permit }()

鉤子

不斷獲取任務回傳結果,然后遍歷執(zhí)行鉤子函數(shù)

    go func(){
        for {
            select {
            case value := <-r.jobReturnsWithEid:
                jobReturnHooks.Run(&value)
            case <-r.stop:
                return
            }
        }
    }()

缺陷

超時停止,本來嘗試做的,配置里面都預留了這個字段。結果發(fā)現(xiàn)有問題。這個貌似要修改croner的源碼,我不想這么做,但又想不出其他實現(xiàn)方案,我畢竟剛使用golang編程。如果有讀者碰到類似問題或者有想法留言提醒我呀

OnlyOne 單次執(zhí)行的時候,下次執(zhí)行的時間就無法預測了。這個時候把任務的Next設置為一個不可能的值,比如1970-0-0。但如果在周期內(nèi)執(zhí)行完了,下次執(zhí)行時間就準了...這貌似沒辦法解決。我也不知道任務什么時候執(zhí)行完。

學習強大的APScheduler, Quartz

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容