本文基于Golang Crontab 實現(xiàn)了一個Crontab Job Manager。更加容易使用,同時也能夠滿足更加復雜的場景。
倉儲地址, 如果有用,歡迎點贊,歡迎討論,歡迎找茬。
需求
在開發(fā)中,經(jīng)常遇到一些需要定時任務的場景。各個語言都有定時語言的庫,Golang Cron 提供了Crontab Golang語言版本。這個庫非常不錯,提供最基本的定時任務編排的功能。但是一些復雜需求無法滿足,比如
- 任何定時任務都有可能失敗,失敗了就panic了,這樣非常不友好。最起碼能夠讓我控制,失敗是重試還是停止
- 某些任務執(zhí)行周期要10s, 而用戶設置的5s一執(zhí)行,我能不能保證任何時間這個任務只執(zhí)行一次
- 我想實時的看到任務的狀態(tài),比如是不是在運行?下次運行時間?上次運行時間?
- 我想看到任務執(zhí)行了多少次,成功了多少次
- 我想要限制最大任務數(shù)量,比如超過10個任務在執(zhí)行,不運行新的任務執(zhí)行
- 任務執(zhí)行完了可以告訴我邏輯上有錯誤,還是有結果。我還可以加上一些鉤子函數(shù)來處理任務執(zhí)行的結果
以上的需求都非常常見,可惜這個庫都不支持^_^.
完全沒用的例子
復雜定義任務的場景模型抽象出來大概也就是下面幾個功能點,這個沒用的例子可以很好的體現(xiàn)出來
- 用戶通過接口,告訴后臺我要做一個什么定時工作,schedule是什么
- 查看所有定時任務的狀態(tài)
- 查看所有定時任務的工作結果
本地運行
通過以下命令本地運行
go get -u "github.com/OhBonsai/croner"
go get -u "github.com/gin-gonic/gin"
go get -u "github.com/gorilla/websocket"
cd $GOPATH/src/github.com/OhBonsai/croner/example
go run server.go
# 打開localhost:8000

前端解釋
原諒我的狗屎前端。怕大家看不懂,我還是解釋一下前端各個部分什么意思。
- 圖中①的區(qū)域,是計劃定義區(qū),可以設置一些參數(shù),表示
誰多久往聊天室說一句什么話。第二個表單可以輸入1-10的數(shù)字,表示每隔幾秒說話。當然cron支持六位的crontab周期定義。 - 圖中②的區(qū)域,是執(zhí)行任務狀態(tài)區(qū),每秒刷新一次
- 圖中3的區(qū)域,就是我們的聊天室啦。后臺定時任務鉤子函數(shù)會定時把消息推到
channel中,如果websocket服務端收到消息就發(fā)送到瀏覽器
后端邏輯
- 實現(xiàn)定時計劃接口
func Run() croner.JobRunReturn
type JobS struct {
Duration int `json:"duration"`
Who string `json:"who"`
What string `json:"what"`
}
func (j JobS) Run() croner.JobRunReturn {
return croner.JobRunReturn{
Value: fmt.Sprintf("[%s] %s: %s", time.Now().Format(time.RFC850), j.Who, j.What),
}
}
- 初始化設置
var manager = croner.NewCronManager(croner.CronManagerConfig{
true, false, 0, 0,
})
- 加上鉤子函數(shù),如果接收到任務執(zhí)行結果,將結果傳到
chchannel
croner.OnJobReturn(func(runReturn *croner.JobRunReturnWithEid) {
say := runReturn.Value.(string)
ch <- say
})
- 每當接受到post請求,就創(chuàng)建一個任務
_, err = manager.Add(fmt.Sprintf("@every %ds", curJob.Duration), curJob, nil)
- 輪詢獲區(qū)
ch傳過來的值,通過websocket傳到前端
for {
select {
case msg := <-ch:
conn.WriteMessage(websocket.TextMessage, []byte(msg))
default:
continue
}
}
實現(xiàn)
詳細的使用可以查看測試文件,
任務接口
任務只要實現(xiàn)run()函數(shù)就行啦。這樣我就可以包裝你這個函數(shù)
type JobRunReturn struct {
Value interface{}
Error error
}
type JobInf interface {
Run() JobRunReturn
}
任務失敗控制
Cron沒有失敗控制,通過包裝run()函數(shù)來實現(xiàn)cron的job接口來增加一些邏輯。加上一個defer來恢復panic, 通過設置配置ignorePanic來控制是否忽略錯誤繼續(xù)執(zhí)行,還是發(fā)生錯誤就是STOP
defer func() {
j.TotalCount += 1
if err := recover(); err != nil {
errString := fmt.Sprintf("WrappedJob-%d %s execute fail. error is %s", j.Id, j.Name, err)
println(errString)
atomic.StoreUint32(&j.status, FAIL)
if !j.father.ignorePanic {
j.father.DisActive(j.Id)
}
j.father.jobReturnsWithEid <- JobRunReturnWithEid{
JobRunReturn{nil, JobRunError{errString}},
j.Id,
}
}
return
}()
單任務周期時間只執(zhí)行一次
這個主要靠鎖來實現(xiàn),任務運行時就鎖住,直到完成之后才釋放
j.running.Lock()
defer j.running.Unlock()
任務狀態(tài)變更
通過原子操作來變更任務狀態(tài)
atomic.StoreUint32(&(j.status), RUNNING)
defer atomic.StoreUint32(&(j.status), IDLE)
最大任務數(shù)量
通過buffered channel來實現(xiàn)最大任務數(shù)量
permit = make(chan struct{}, c.PoolSize)
permit <- struct{}{}
defer func() { <-permit }()
鉤子
不斷獲取任務回傳結果,然后遍歷執(zhí)行鉤子函數(shù)
go func(){
for {
select {
case value := <-r.jobReturnsWithEid:
jobReturnHooks.Run(&value)
case <-r.stop:
return
}
}
}()
缺陷
超時停止,本來嘗試做的,配置里面都預留了這個字段。結果發(fā)現(xiàn)有問題。這個貌似要修改croner的源碼,我不想這么做,但又想不出其他實現(xiàn)方案,我畢竟剛使用golang編程。如果有讀者碰到類似問題或者有想法留言提醒我呀
OnlyOne 單次執(zhí)行的時候,下次執(zhí)行的時間就無法預測了。這個時候把任務的Next設置為一個不可能的值,比如1970-0-0。但如果在周期內(nèi)執(zhí)行完了,下次執(zhí)行時間就準了...這貌似沒辦法解決。我也不知道任務什么時候執(zhí)行完。
學習強大的APScheduler, Quartz