go語(yǔ)言API完成復(fù)雜任務(wù)流程工作實(shí)踐

背景介紹

通過(guò)API執(zhí)行一個(gè)復(fù)雜的任務(wù)(包含多個(gè)子任務(wù))時(shí),做成異步接口是一個(gè)比較好的選擇。但是這樣做有幾個(gè)問(wèn)題就要解決:

  1. 任務(wù)執(zhí)行到一半,宿主機(jī)宕機(jī)或重啟,任務(wù)如何保證可靠性(任務(wù)繼續(xù)執(zhí)行完成)。
  2. 客戶端通過(guò)API可以隨時(shí)查看任務(wù)的執(zhí)行進(jìn)展(子任務(wù)完成時(shí)刷新狀態(tài))。
  3. 執(zhí)行子任務(wù)后失敗后要支持對(duì)前面的子任務(wù)進(jìn)行回滾(FailRollBack), 回滾的流程按照子任務(wù)的逆向流程執(zhí)行。
    舉例:任務(wù)流程:子任務(wù)1 -> 子任務(wù)2 -> 子任務(wù)3 -> 子任務(wù)4,在子任務(wù)3失敗時(shí),按照 子任務(wù)3 -> 子任務(wù)2 -> 子任務(wù)1順序進(jìn)行回滾(回滾流程支持靈活配置)
  4. 子任務(wù)執(zhí)行成功要支持執(zhí)行SuccessCallBack
    舉例: 執(zhí)行子任務(wù)成功后,刷新任務(wù)的執(zhí)行的進(jìn)展為當(dāng)前子任務(wù)執(zhí)行成功,可以放在SuccessCallBack執(zhí)行
  5. 子任務(wù)執(zhí)行失敗要支持執(zhí)行FailCallBack
    舉例: 執(zhí)行子任務(wù)失敗時(shí),需要刷新任務(wù)狀態(tài)為failed,可以放在FailCallBack執(zhí)行

解決方案

1. API服務(wù)通過(guò)主備的方式部署(可靠性)。

go使用Gin框架,能夠支持幾千的并發(fā)訪問(wèn)。go協(xié)程可以支持本實(shí)例并發(fā)任務(wù)處理。要是任務(wù)量過(guò)大,需要多實(shí)例并行處理,建議使用基于事件的主從模式(本文不做討論)。
主備模式的實(shí)現(xiàn)參考k8s的控制器互斥鎖機(jī)制(宿主機(jī)宕機(jī)或重啟時(shí),通過(guò)K8s基礎(chǔ)設(shè)施完成備實(shí)例切換成主實(shí)例)
代碼:

import
(
...
"k8s.io/client-go/tools/leaderelection/resourcelock"
...
)

func Run(ctx context.Context, config *appconfig.Config, stopCh <-chan struct{}) error {
    // Start watch config file change.
    if config.ComponentConfig.EnableWatch {
        go config.ComponentConfig.Watch(ctx)
    }

    ctrl := controller.New(config, stopCh)

    name, err := os.Hostname()
    if err != nil {
        panic("get hostname failed")
    }
    // 搶占主實(shí)例
    rl, err := resourcelock.NewFromKubeconfig(
        resourcelock.EndpointsLeasesResourceLock,
        config.KubeConfig.Leaderelection.Namespace,
        config.KubeConfig.Leaderelection.Name,
        resourcelock.ResourceLockConfig{
            Identity: name + "_" + string(uuid.NewUUID()),
        },
        config.KubeConfig.KubeRestConfig,
        15*time.Second)

    if err != nil {
        return err
    }

    // start the leader election code loop
    leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
        Lock: rl,
        // IMPORTANT: you MUST ensure that any code you have that
        // is protected by the lease must terminate **before**
        // you call cancel. Otherwise, you could have a background
        // loop still running and another process could
        // get elected before your background loop finished, violating
        // the stated goal of the lease.
        ReleaseOnCancel: true,
        LeaseDuration:   60 * time.Second,
        RenewDeadline:   15 * time.Second,
        RetryPeriod:     5 * time.Second,
        Callbacks: leaderelection.LeaderCallbacks{
            OnStartedLeading: func(ctx context.Context) {
                // we're notified when we start - this is where you would
                // usually put your code
                if err := ctrl.Run(ctx.Done()); err != nil {
                    panic(err)
                }
            },
            OnStoppedLeading: func() {
                // we can do cleanup here
                log.Warn("leaderelection lost")
                os.Exit(1)
            },
        },
    })

    log.Info("Stop Server With Graceful.")

    return nil
}

2. 工作流框架(子任務(wù)拆分)

有一些開源的工作流框架可以考慮,如:Conductor,但是總體使用下來(lái)比較重,性能有影響,在處理并發(fā)任務(wù)時(shí)會(huì)有故障,問(wèn)題也不太好排查。如下是我開發(fā)的直接嵌入的工作流處理框架:通過(guò)編排方式對(duì)子任務(wù)主流程,子任務(wù)的失敗處理FailCallBack,子任務(wù)的狀態(tài)刷新SuccessCallBack,子任務(wù)的重試,子任務(wù)的回滾RollBack進(jìn)行編排,可以根據(jù)業(yè)務(wù)場(chǎng)景靈活配置。工作流框架代碼可以直接引入到業(yè)務(wù)代碼中,性能較高,出問(wèn)題也比較容易排查,已驗(yàn)證可以有效解決問(wèn)題。

代碼
(如下連接上下文的Metadata(子任務(wù)上下文狀態(tài)同步),還沒(méi)有設(shè)計(jì)為通用型的結(jié)構(gòu),各業(yè)務(wù)場(chǎng)景可以設(shè)計(jì)自己的模型結(jié)構(gòu))

type AppWorkFlow struct {
    Metadata *v1.WorkflowAppParams
    Works    []AppWork
}

type AppWork struct {
    WorkName        string
    Work            WorkFlowFn
    RollBack        WorkFlowFn
    SuccessCallBack WorkFlowFn
    FailCallBack    WorkFlowFn
}

type WorkFlowFn func(*v1.WorkflowAppParams) error

func (workFlow *AppWorkFlow) Start() {
    rollBackWorks := []WorkFlowFn{}
    for _, work := range workFlow.Works {
        if work.RollBack != nil {
            rollBackWorks = append(rollBackWorks, work.RollBack)
        }
        metadata := workFlow.Metadata
        if err := work.Work(metadata); err != nil {
            metadata.Logger.Infof("app %s process %s do err,%+v", workFlow.Metadata.AppRecord.Name, work.WorkName, err)
            // 更新失敗狀態(tài)
            if err := work.FailCallBack(metadata); err != nil {
                metadata.Logger.Warnf("app %s process %s FailCallBack err,%v", workFlow.Metadata.AppRecord.Name, work.WorkName, err)
                return
            }
            for _, rollBackWork := range rollBackWorks {
                if err := rollBackWork(workFlow.Metadata); err != nil {
                    metadata.Logger.Warnf("rollback %s process %s do err,%v", workFlow.Metadata.AppRecord.Name, work.WorkName, err)
                }
            }
            // 回滾完成即任務(wù)退出
            return
        }
    }
}

在一些錯(cuò)誤場(chǎng)景,子任務(wù)需要重試,工作流框架提供重試子任務(wù)的功能可以在編排時(shí)直接配置

func WorkforTimeout(workName string, period int64, timeOut int64, work WorkFlowFn) WorkFlowFn {
    return func(app *v1.WorkflowAppParams) error {
        for timeOut > 0 {
            if err := work(app); err != nil {
                if !errors.Is(err, ErrNeedRetry) {
                    log.Errorf("%s err, %v, break work", workName, err)
                    return err
                }
                log.Infof("%s err, %v, continue work", workName, err)
                time.Sleep(time.Duration(period) * time.Second)
                timeOut--
                continue
            }
            return nil
        }
        app.AppRecord.Reason = fmt.Sprintf("%s until timeout: [%s]", workName, app.AppRecord.Reason)
        return errors.Errorf("%s until timeout", workName)
    }
}

var ErrNeedRetry = errors.New("need retry")

業(yè)務(wù)代碼

每個(gè)子任務(wù)流程中只需更新上下文模型的狀態(tài),可以通過(guò)配置FailCallBack更新失敗的狀態(tài),也可以也通過(guò)配置SuccsessCallBack更新任務(wù)的執(zhí)行進(jìn)展,若是需要在后面的子任務(wù)失敗時(shí)被回滾(一些資源釋放的場(chǎng)景)則可以配置RollBack(SuccsessCallBack和RollBack下面的流程沒(méi)有舉例)

const (
    WORK_CREATE_CREATEINSTANCE       = "create app instance"
    WORK_CREATE_UPDATECREATESTATUS   = "update creating status"
    WORK_CREATE_CHECKAPPCREATESTATUS = "check app create status"
    WORK_CREATE_UPDATERUNNIGSTATUS   = "update running status"
)

func (ac *AppController) CreateApp(params *v1.WorkflowAppParams) (*appv1.CreateAppResponse, error) {
    createAppWorkFlow := AppWorkFlow{
        Metadata: params,
        Works: []AppWork{
            {
                WorkName:     WORK_CREATE_CREATEINSTANCE,
                Work:         ac.createAppInstance,
                FailCallBack: ac.updateAppStatus,
            },
            {
                WorkName: WORK_CREATE_UPDATECREATESTATUS,
                Work:     ac.updateAppStatus,
            },
            {
                WorkName:     WORK_CREATE_CHECKAPPCREATESTATUS,
                Work:         WorkforTimeout(WORK_CREATE_CHECKAPPCREATESTATUS, 3, 5*60, ac.checkAppStatus), 
                FailCallBack: ac.updateAppStatus,
            },
            {
                WorkName: WORK_CREATE_UPDATERUNNIGSTATUS,
                Work:     ac.updateAppStatus,
            },
        },
    }
    go createAppWorkFlow.Start()
    return &appv1.CreateAppResponse{
        Status: 200,
        Data:   params.AppRecord.Id,
    }, nil
}

如下流程即執(zhí)行子任務(wù)的過(guò)程,錯(cuò)誤時(shí)返回err即可,會(huì)在defer中通過(guò)metadata刷新狀態(tài),工作流的主流程檢查到子任務(wù)發(fā)生錯(cuò)誤會(huì)執(zhí)行FailCallBack或者RollBack刷新狀態(tài)或回滾

func (ac *AppController) createAppInstance(params *v1.WorkflowAppParams) (err error) {
    defer func() {
        if err != nil {
            params.AppRecord.Reason = err.Error()
            params.AppRecord.Status = v1.APP_STATUS_FAILED
        }
    }()
        // 如下流程即執(zhí)行子任務(wù)的過(guò)程
    logHeader := deepcopyMap(params.HttpHeader)
    logHeader["X-Access-Token"] = utils.MaskToken(logHeader["X-Access-Token"])
    var reqBody []byte
    var logBody []byte
    if params.AppRecord.ManageBy == "vcluster" {
        reqBody, err = json.Marshal(params.CreateAppReq)
        if err != nil {
            return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.CreateAppReq)
        }
        logBody = reqBody
    } else {
        formatReqBody := params.AppRequestData
        formatReqBody.Spec.Kubeconfig = utils.MaskToken(formatReqBody.Spec.Kubeconfig)
        logBody, _ = json.Marshal(formatReqBody)
        reqBody, err = json.Marshal(params.AppRequestData)
        if err != nil {
            return errors.Wrapf(err, "[Failed to marshal request data, params: %v]", params.AppRequestData)
        }
    }
    httpLogPrint := v1.HttpLogPrint{
        Url:    params.AppConfig.Url,
        Header: logHeader,
        Method: "POST",
        Body:   string(logBody),
    }
    params.Logger.Infof("http create app instance type %s, url %s", params.AppRecord.ManageBy, params.AppConfig.Url)
    resp, err := adapter.AppPostRestRequest(params.HttpHeader, params.AppConfig.Url, reqBody, httpLogPrint)
    if err != nil {
        return errors.Wrapf(err, "[Failed to create app: %s, err, %v]", params.AppRecord.Name, err)
    }
    if resp.Code != 0 {
        return errors.Errorf("create app: %s Http Post err, errCode: %d, msg: [%s]", params.AppRecord.Name, resp.Code, resp.Msg)
    }
    if resp.Data.AppId != "" {
        // 下層應(yīng)用選擇用自己的appId
        params.AppRecord.AppId = resp.Data.AppId
    }
    params.AppRecord.Status = v1.APP_STATUS_CREATING
    return nil
}

本示例中刷新狀態(tài)的函數(shù)作為FailCallBack,也作為子任務(wù)在上一個(gè)子任務(wù)成功時(shí)刷新狀態(tài)。
總之子任務(wù)可以靈活配置,子任務(wù)之間通過(guò)metadata作為上下文進(jìn)行狀態(tài)同步。

func (ac *AppController) updateAppStatus(params *v1.WorkflowAppParams) error {
    if params.AppRecord.Reason != "" {
        params.AppRecord.Reason = reasonDetail(params.AppRecord.Reason)
    }
    if err := ac.controller.AppRepo().Update(params.AppRecord); err != nil {
        return errors.Errorf("update app %s status %s err, %v", params.AppRecord.Name, params.AppRecord.Status, err)
    }
    return nil
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容