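Background
When a complex task (one made up of several subtasks) is executed through an API, an asynchronous interface is usually a good choice. It does, however, raise several problems that have to be solved:
- If the host crashes or restarts while a task is halfway through, how is reliability guaranteed (that is, how does the task continue to completion)?
- The client should be able to query the task's progress through the API at any time (the status is refreshed as each subtask completes).
- When a subtask fails, the subtasks before it must be rolled back (FailRollBack), and the rollback runs in the reverse order of the subtasks.
  Example: for the task flow subtask 1 -> subtask 2 -> subtask 3 -> subtask 4, if subtask 3 fails, the rollback runs in the order subtask 3 -> subtask 2 -> subtask 1 (the rollback flow can be configured flexibly).
- When a subtask succeeds, running a SuccessCallBack must be supported.
  Example: after a subtask succeeds, the task's progress is refreshed to show that the current subtask has succeeded; this can be done in the SuccessCallBack.
- When a subtask fails, running a FailCallBack must be supported.
  Example: when a subtask fails, the task status has to be refreshed to failed; this can be done in the FailCallBack.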
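Solution
1. Deploy the API service in active/standby mode (reliability).
The service is written in Go with the Gin framework, which can handle several thousand concurrent requests. Goroutines take care of concurrent task processing inside a single instance. If the task volume is so large that several instances have to process tasks in parallel, an event-based master/worker model is recommended (not discussed in this article).
The active/standby mode is modeled on the mutual-exclusion lock (leader election) mechanism used by Kubernetes controllers: when the host crashes or restarts, the Kubernetes infrastructure promotes the standby instance to active.
Code: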
import (
	...
	"k8s.io/client-go/tools/leaderelection/resourcelock"
	...
)

func Run(ctx context.Context, config *appconfig.Config, stopCh <-chan struct{}) error {
	// Start watch config file change.
	if config.ComponentConfig.EnableWatch {
		go config.ComponentConfig.Watch(ctx)
	}
	ctrl := controller.New(config, stopCh)
	name, err := os.Hostname()
	if err != nil {
		// Run already returns an error, so report the failure instead of panicking.
		return err
	}
	// Compete for the leader (active) role.
	// Depending on the client-go version, the Endpoints-based lock types may not be
	// available; resourcelock.LeasesResourceLock is the lease-only alternative.
	rl, err := resourcelock.NewFromKubeconfig(
		resourcelock.EndpointsLeasesResourceLock,
		config.KubeConfig.Leaderelection.Namespace,
		config.KubeConfig.Leaderelection.Name,
		resourcelock.ResourceLockConfig{
			// Hostname plus a UUID keeps the identity unique per process.
			Identity: name + "_" + string(uuid.NewUUID()),
		},
		config.KubeConfig.KubeRestConfig,
		15*time.Second)
	if err != nil {
		return err
	}
	// start the leader election code loop
	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock: rl,
		// IMPORTANT: you MUST ensure that any code you have that
		// is protected by the lease must terminate **before**
		// you call cancel. Otherwise, you could have a background
		// loop still running and another process could
		// get elected before your background loop finished, violating
		// the stated goal of the lease.
		ReleaseOnCancel: true,
		LeaseDuration:   60 * time.Second,
		RenewDeadline:   15 * time.Second,
		RetryPeriod:     5 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(ctx context.Context) {
				// we're notified when we start - this is where you would
				// usually put your code
				if err := ctrl.Run(ctx.Done()); err != nil {
					panic(err)
				}
			},
			OnStoppedLeading: func() {
				// we can do cleanup here
				// Exit so that the process restarts and rejoins the election as a standby.
				log.Warn("leaderelection lost")
				os.Exit(1)
			},
		},
	})
	log.Info("Stop Server With Graceful.")
	return nil
}
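2. Workflow framework (splitting a task into subtasks)
There are open-source workflow frameworks worth considering, such as Conductor. In my experience, however, they are fairly heavyweight: they cost performance, they produced failures when processing concurrent tasks, and problems were hard to troubleshoot. Below is an embedded workflow framework that I developed. It orchestrates the main flow of the subtasks together with each subtask's failure handling (FailCallBack), status refresh (SuccessCallBack), retry and rollback (RollBack), and it can be configured flexibly for each business scenario. The framework code can be included directly in the business code, performs well, is comparatively easy to debug, and has been verified to solve the problems above effectively.
Code
(The Metadata used below as the connecting context, which synchronizes state between subtasks, has not yet been designed as a generic structure. Each business scenario can design its own model.)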
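type AppWorkFlow struct {
	Metadata *v1.WorkflowAppParams
	Works    []AppWork
}

// AppWork describes one subtask and its optional hooks.
type AppWork struct {
	WorkName        string
	Work            WorkFlowFn // the subtask itself
	RollBack        WorkFlowFn // optional: undoes this subtask when it or a later subtask fails
	SuccessCallBack WorkFlowFn // optional: runs after Work succeeds
	FailCallBack    WorkFlowFn // optional: runs after Work fails
}

type WorkFlowFn func(*v1.WorkflowAppParams) error

func (workFlow *AppWorkFlow) Start() {
	metadata := workFlow.Metadata
	rollBackWorks := []WorkFlowFn{}
	for _, work := range workFlow.Works {
		// Register the rollback before running the work, so that a failed
		// subtask is rolled back as well.
		if work.RollBack != nil {
			rollBackWorks = append(rollBackWorks, work.RollBack)
		}
		if err := work.Work(metadata); err != nil {
			metadata.Logger.Infof("app %s process %s do err,%+v", metadata.AppRecord.Name, work.WorkName, err)
			// Update the failed status. FailCallBack is optional, so guard against nil.
			if work.FailCallBack != nil {
				if err := work.FailCallBack(metadata); err != nil {
					metadata.Logger.Warnf("app %s process %s FailCallBack err,%v", metadata.AppRecord.Name, work.WorkName, err)
					// Note: the rollback is skipped when FailCallBack itself fails.
					return
				}
			}
			// Roll back in reverse order: the most recently registered subtask first.
			for i := len(rollBackWorks) - 1; i >= 0; i-- {
				if err := rollBackWorks[i](metadata); err != nil {
					metadata.Logger.Warnf("rollback %s process %s do err,%v", metadata.AppRecord.Name, work.WorkName, err)
				}
			}
			// The task exits once the rollback is done.
			return
		}
		// Refresh the progress after a successful subtask, if a callback is configured.
		if work.SuccessCallBack != nil {
			if err := work.SuccessCallBack(metadata); err != nil {
				metadata.Logger.Warnf("app %s process %s SuccessCallBack err,%v", metadata.AppRecord.Name, work.WorkName, err)
			}
		}
	}
}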
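In some error scenarios a subtask needs to be retried. The workflow framework provides a retry wrapper that can be configured directly during orchestration:
// WorkforTimeout wraps work so that it is retried every `period` seconds while it
// returns ErrNeedRetry, for at most roughly `timeOut` seconds of waiting in total.
func WorkforTimeout(workName string, period int64, timeOut int64, work WorkFlowFn) WorkFlowFn {
	return func(app *v1.WorkflowAppParams) error {
		// Use a local budget: decrementing the captured timeOut would leave nothing
		// for a second call of the returned function.
		remaining := timeOut
		// Guard against a non-positive period, which would otherwise never use up the budget.
		step := period
		if step <= 0 {
			step = 1
		}
		for remaining > 0 {
			if err := work(app); err != nil {
				if !errors.Is(err, ErrNeedRetry) {
					log.Errorf("%s err, %v, break work", workName, err)
					return err
				}
				log.Infof("%s err, %v, continue work", workName, err)
				time.Sleep(time.Duration(step) * time.Second)
				// Charge the time actually slept, so that timeOut is measured in seconds
				// rather than in number of attempts.
				remaining -= step
				continue
			}
			return nil
		}
		app.AppRecord.Reason = fmt.Sprintf("%s until timeout: [%s]", workName, app.AppRecord.Reason)
		return errors.Errorf("%s until timeout", workName)
	}
}

// ErrNeedRetry is the sentinel a subtask returns (or wraps) to ask for another attempt.
var ErrNeedRetry = errors.New("need retry")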
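Business code
Each subtask only has to update the state of the context model. A FailCallBack can be configured to persist the failed status, and a SuccessCallBack can be configured to refresh the task's progress. If a subtask has to be undone when a later subtask fails (for example, to release resources), a RollBack can be configured as well. (The flow below does not include an example of SuccessCallBack or RollBack.)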
const (
	WORK_CREATE_CREATEINSTANCE       = "create app instance"
	WORK_CREATE_UPDATECREATESTATUS   = "update creating status"
	WORK_CREATE_CHECKAPPCREATESTATUS = "check app create status"
	WORK_CREATE_UPDATERUNNIGSTATUS   = "update running status"
)

func (ac *AppController) CreateApp(params *v1.WorkflowAppParams) (*appv1.CreateAppResponse, error) {
	createAppWorkFlow := AppWorkFlow{
		Metadata: params,
		Works: []AppWork{
			{
				WorkName:     WORK_CREATE_CREATEINSTANCE,
				Work:         ac.createAppInstance,
				FailCallBack: ac.updateAppStatus,
			},
			{
				WorkName: WORK_CREATE_UPDATECREATESTATUS,
				Work:     ac.updateAppStatus,
			},
			{
				WorkName: WORK_CREATE_CHECKAPPCREATESTATUS,
				// Poll every 3 seconds, with a timeout of 5*60.
				Work:         WorkforTimeout(WORK_CREATE_CHECKAPPCREATESTATUS, 3, 5*60, ac.checkAppStatus),
				FailCallBack: ac.updateAppStatus,
			},
			{
				WorkName: WORK_CREATE_UPDATERUNNIGSTATUS,
				Work:     ac.updateAppStatus,
			},
		},
	}
	// Run the workflow in the background and answer the API call immediately.
	go createAppWorkFlow.Start()
	return &appv1.CreateAppResponse{
		Status: 200,
		Data:   params.AppRecord.Id,
	}, nil
}
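The function below is what a subtask looks like. On an error it simply returns err; the deferred function then records the failure in the metadata. When the main loop of the workflow sees that the subtask returned an error, it runs FailCallBack to persist the status and RollBack to undo earlier work.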
func (ac *AppController) createAppInstance(params *v1.WorkflowAppParams) (err error) {
	// err is a named result, so this deferred function sees whatever error is returned.
	defer func() {
		if err != nil {
			params.AppRecord.Reason = err.Error()
			params.AppRecord.Status = v1.APP_STATUS_FAILED
		}
	}()
	// The subtask's actual work starts here.
	logHeader := deepcopyMap(params.HttpHeader)
	logHeader["X-Access-Token"] = utils.MaskToken(logHeader["X-Access-Token"])
	var reqBody []byte
	var logBody []byte
	if params.AppRecord.ManageBy == "vcluster" {
		reqBody, err = json.Marshal(params.CreateAppReq)
		if err != nil {
			return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.CreateAppReq)
		}
		logBody = reqBody
	} else {
		// Marshal the real request first, so that masking the kubeconfig for the log
		// cannot leak into the body that is sent (this matters if the copy below
		// shares memory with params.AppRequestData).
		reqBody, err = json.Marshal(params.AppRequestData)
		if err != nil {
			return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.AppRequestData)
		}
		formatReqBody := params.AppRequestData
		formatReqBody.Spec.Kubeconfig = utils.MaskToken(formatReqBody.Spec.Kubeconfig)
		logBody, _ = json.Marshal(formatReqBody)
	}
	httpLogPrint := v1.HttpLogPrint{
		Url:    params.AppConfig.Url,
		Header: logHeader,
		Method: "POST",
		Body:   string(logBody),
	}
	params.Logger.Infof("http create app instance type %s, url %s", params.AppRecord.ManageBy, params.AppConfig.Url)
	resp, err := adapter.AppPostRestRequest(params.HttpHeader, params.AppConfig.Url, reqBody, httpLogPrint)
	if err != nil {
		// Wrapf already appends the cause, so err is not repeated in the message.
		return errors.Wrapf(err, "[Failed to create app: %s]", params.AppRecord.Name)
	}
	if resp.Code != 0 {
		return errors.Errorf("create app: %s Http Post err, errCode: %d, msg: [%s]", params.AppRecord.Name, resp.Code, resp.Msg)
	}
	if resp.Data.AppId != "" {
		// The underlying application chose to use its own appId.
		params.AppRecord.AppId = resp.Data.AppId
	}
	params.AppRecord.Status = v1.APP_STATUS_CREATING
	return nil
}
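The pattern this subtask relies on is a named error result combined with a deferred function: whatever return path is taken, the deferred function sees the final err and can write the failure into the shared context. A stripped-down sketch follows, with a hypothetical record type and createInstance function standing in for the real model and subtask.

```go
package main

import (
	"errors"
	"fmt"
)

// record is a hypothetical stand-in for the shared app record.
type record struct {
	Status string
	Reason string
}

// createInstance uses a named result so that the deferred function can
// inspect the error that is about to be returned.
func createInstance(r *record, fail bool) (err error) {
	defer func() {
		if err != nil {
			r.Status = "failed"
			r.Reason = err.Error()
		}
	}()
	if fail {
		return errors.New("backend rejected the request")
	}
	r.Status = "creating"
	return nil
}

func main() {
	r := &record{}
	_ = createInstance(r, true)
	fmt.Println(r.Status, r.Reason) // failed backend rejected the request
}
```

The subtask itself never persists anything. It only mutates the context, and the workflow decides through FailCallBack when that state is written to storage.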
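In this example, the status-refresh function serves both as a FailCallBack and as a subtask of its own that persists the status after the previous subtask has succeeded.
In short, subtasks can be configured flexibly, and they synchronize state with each other through the metadata that acts as their shared context.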
func (ac *AppController) updateAppStatus(params *v1.WorkflowAppParams) error {
	// Normalize the failure reason, if there is one, before persisting it.
	if params.AppRecord.Reason != "" {
		params.AppRecord.Reason = reasonDetail(params.AppRecord.Reason)
	}
	// Persist the record so that clients polling the API see the new status.
	if err := ac.controller.AppRepo().Update(params.AppRecord); err != nil {
		return errors.Errorf("update app %s status %s err, %v", params.AppRecord.Name, params.AppRecord.Status, err)
	}
	return nil
}