深入淺出Netflix Conductor使用

Netflix Conductor框架是典型的服務(wù)編排框架,通過Conductor還可以實(shí)現(xiàn)工作流和分布式調(diào)度,性能非常卓越。

關(guān)于Conductor的基本概念在 https://netflix.github.io/conductor/intro/ 文中已經(jīng)有深入介紹,本篇將以實(shí)戰(zhàn)案例為出發(fā)點(diǎn)深入介紹Conductor的使用。

一、Conductor的功能全景圖

image.png

在正式使用之前我們先來(lái)了解Conductor都有哪些功能,通過流程、任務(wù)、歷史、監(jiān)控、客戶端、通信和管理后臺(tái)幾個(gè)層面來(lái)做了功能歸類。

  • 流程
    流程引擎默認(rèn)是用DSL來(lái)編寫流程定義文件,這是一種JSON格式的文件,我們的工作流案例就是以這個(gè)定義文件為驅(qū)動(dòng)的,但是很可惜目前Conductor只支持手寫定義,無(wú)法通過界面生成,這塊就需要后面通過改造Conductor來(lái)增加相應(yīng)功能。
  • 任務(wù)
    這里面包括的主要是和任務(wù)相關(guān)的功能,通過這個(gè)功能可以進(jìn)行簡(jiǎn)單工作流的實(shí)現(xiàn),還可以進(jìn)行并行計(jì)算。
  • 歷史
    如果想要查看之前進(jìn)行過的(完成,失敗等終態(tài))歷史任務(wù),通過這個(gè)功能就可以實(shí)現(xiàn)。
  • 監(jiān)控
    當(dāng)工作流任務(wù)流程非常冗長(zhǎng)的時(shí)候,我們對(duì)每個(gè)節(jié)點(diǎn)的任務(wù)運(yùn)行情況并不了解,這時(shí)候就需要有一個(gè)任務(wù)監(jiān)控功能及時(shí)知道流程的狀態(tài)方便我們做出相應(yīng)決策。同時(shí)還有一個(gè)重要功能是任務(wù)調(diào)度,通過這個(gè)功能可以實(shí)現(xiàn)類似xxl-job的功能,滿足分布式定時(shí)調(diào)度的需求。
  • 客戶端和通信
    這二個(gè)功能本是一體的,既然Conductor是分布式的任務(wù)流程那么核心原理就是通過Server+Worker的方式,利用核心狀態(tài)機(jī)發(fā)消息的方式來(lái)驅(qū)動(dòng)客戶端的任務(wù)執(zhí)行,而Worker的實(shí)現(xiàn)是跨語(yǔ)言的,可以用JAVA、Python、go等語(yǔ)言實(shí)現(xiàn),而Worker需要長(zhǎng)輪詢Server端的狀態(tài)來(lái)判斷當(dāng)然是否有自己的任務(wù)來(lái)執(zhí)行。
  • 管理后臺(tái)
    通過管理后臺(tái)可以查看任務(wù)和工作流的元數(shù)據(jù)定義,工作流的執(zhí)行狀態(tài)等。

二、Conductor的架構(gòu)圖

image.png

其中:Task Queues使用Dyno-queues做任務(wù)延遲。

三、實(shí)戰(zhàn)案例

通過命令行將Netflix Conductor Sever端啟動(dòng)之后( https://netflix.github.io/conductor/intro/#installing-and-running 介紹了如何安裝Conductor),訪問localhost:8080地址顯示如下頁(yè)面:

image.png

這個(gè)頁(yè)面主要負(fù)責(zé)的是關(guān)于Conductor的任務(wù)、工作流的元數(shù)據(jù)管理,提供了很多http接口可供使用,如下圖所示:

image.png

我們可以直接調(diào)用默認(rèn)提供的接口頁(yè)面通過傳遞參數(shù)來(lái)進(jìn)行任務(wù)和工作流的定義,當(dāng)然也可以自己寫頁(yè)面調(diào)用相應(yīng)的URL來(lái)進(jìn)行。首先我們要先進(jìn)行任務(wù)文件的定義,如下圖所示:

image.png

在這個(gè)截圖中,我們定義了二個(gè)任務(wù),分別是leaderRatify和managerRatify,截圖中的原始定義文件如下:

[
 
{
 
  "name": "leaderRatify",
 
  "retryCount": 3,
 
  "timeoutSeconds": 1200,
 
  "inputKeys": [
 
    "staffName",
 
    "staffDepartment"
 
  ],
 
  "outputKeys": [
 
    "leaderAgree",
 
    "leaderDisagree"
 
  ],
 
  "timeoutPolicy": "TIME_OUT_WF",
 
  "retryLogic": "FIXED",
 
  "retryDelaySeconds": 600,
 
  "responseTimeoutSeconds": 3600
 
},
 
{
 
  "name": "managerRatify",
 
  "retryCount": 3,
 
  "timeoutSeconds": 1200,
 
  "inputKeys": [
 
    "managerName",
 
    "managerDeparment"
 
  ],
 
  "outputKeys": [
 
    "managerAgree",
 
    "managerDisagree"
 
  ],
 
  "timeoutPolicy": "TIME_OUT_WF",
 
  "retryLogic": "FIXED",
 
  "retryDelaySeconds": 600,
 
  "responseTimeoutSeconds": 3600
 
}
 
]

任務(wù)定義好之后,接下來(lái)需要通過任務(wù)建立工作流定義,如下圖所示:


image.png

工作流定義文件就是我們整個(gè)流程所走的路徑,將流程文件轉(zhuǎn)換成流程圖如下所示:

image.png

流程定義文件的原始文件內(nèi)容如下:

{
  "updateTime": 1540448903202,
  "name": "Leave process",
  "description": "a demo for workflow",
  "version": 1,
  "tasks": [
    {
      "name": "leaderRatify",
      "taskReferenceName": "node1",
      "inputParameters": {
        "staffName": "${workflow.input.staffName}",
        "staffDepartment": "${workflow.input.staffDepartment}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    },
    {
      "name": "managerRatify",
      "taskReferenceName": "node2",
      "inputParameters": {
        "managerName": "${node1.output.leaderName}",
        "managerDepartment": "${node1.output.leaderDepartment}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    }
  ],
  "outputParameters": {
    "leaderName": "${node1.output.leaderName}",
    "leaderDepartment": "${node1.output.leaderDepartment}",
    "managerAgree": "${node2.output.managerAgree}",
    "managerDisagree": "${node2.output.managerDisagree}"
  },
  "restartable": true,
  "schemaVersion": 2
}

上面的流程主要介紹了Task任務(wù)定義文件、工作流流程文件如何定義和上傳的,這二個(gè)文件主要是提供給Conductor的狀態(tài)機(jī)使用,而我們真正的任務(wù)Worker則需要自己寫java代碼來(lái)實(shí)現(xiàn),然后通過長(zhǎng)輪詢Conductor Server來(lái)獲取自己的狀態(tài)以及任務(wù)步驟,Worker代碼如下所示:

class LeaderRatifyWorker implements Worker {
    private String taskDefName;
    public SampleWorker(String taskDefName) {
        this.taskDefName = taskDefName;
    }
    @Override
    public String getTaskDefName() {
        return taskDefName;
    }
    @Override
    public TaskResult execute(Task task) {
        System.out.printf("Executing %s%n", taskDefName);
        System.out.println("staffName:" + task.getInputData().get("staffName"));
        System.out.println("staffDepartment:" + task.getInputData().get("staffDepartment"));
        TaskResult result = new TaskResult(task);
        result.setStatus(TaskResult.Status.COMPLETED);
        //Register the output of the task
        result.getOutputData().put("outputKey1", "value");
        result.getOutputData().put("oddEven", 1);
        result.getOutputData().put("mod", 4);
        result.getOutputData().put("leaderAgree", "yes");
        result.getOutputData().put("leaderDisagree", "no");
        return result;
    }
}
class ManagerRatifyWorker implements Worker {
    private String taskDefName;
    public SampleWorker2(String taskDefName) {
        this.taskDefName = taskDefName;
    }
    @Override
    public String getTaskDefName() {
        return taskDefName;
    }
    @Override
    public TaskResult execute(Task task) {
        System.out.printf("Executing %s\n", taskDefName);
        System.out.println("managerName:" + task.getInputData().get("managerName"));
        System.out.println("managerDepartment:" + task.getInputData().get("managerDepartment"));
        TaskResult result = new TaskResult(task);
        result.setStatus(TaskResult.Status.COMPLETED);
        //Register the output of the task
        result.getOutputData().put("managerAgree", String.valueOf(task.getInputData().get("managerName")));
        result.getOutputData().put("managerDisagree", String.valueOf(task.getInputData().get("managerDepartment")));
 
        return result;
    }
}
  
//在main方法中創(chuàng)建工作Worker以及設(shè)置需要訪問的Conductor Server端api地址,并將流程進(jìn)入初始化
 public static void main(String[] args) {
        TaskClient taskClient = new TaskClient();
        taskClient.setRootURI("http://localhost:8080/api/");       //Point this to the server API
        int threadCount = 2;         //number of threads used to execute workers.  To avoid starvation, should be same or more than number of workers
        Worker worker1 = new LeaderRatifyWorker("leaderRatify");
        Worker worker2 = new ManagerRatifyWorker("managerRatify");
        //Create WorkflowTaskCoordinator
        WorkflowTaskCoordinator.Builder builder = new WorkflowTaskCoordinator.Builder();
        WorkflowTaskCoordinator coordinator = builder.withWorkers(worker1, worker2).withThreadCount(threadCount).withTaskClient(taskClient).build();
        //Start for polling and execution of the tasks
        coordinator.init();
}

而后通過如下界面啟動(dòng)工作流,并傳入工作流輸入?yún)?shù):


image.png

當(dāng)流程執(zhí)行完以后,我們來(lái)訪問Conductor的Admin管理界面,通過localhost:5000端口訪問,看到如下圖所示界面:

image.png

選擇左邊菜單的All選項(xiàng),右側(cè)出現(xiàn)所有任務(wù)的列表:


image.png

可以看到目前所有工作流的狀態(tài)均已經(jīng)是執(zhí)行完畢,通過Status狀態(tài)通過看到每個(gè)工作流當(dāng)前的執(zhí)行狀態(tài),分別是Running、Completed、Timed out、Terminated等狀態(tài)。點(diǎn)擊右側(cè)Workflow列表中第一條workflowID顯示如下界面:


image.png

界面中的流程圖節(jié)點(diǎn)顯示為綠色,表示工作流正常的執(zhí)行完畢沒有報(bào)任何故障,而右上角紅框的Restart表示可以重啟工作流。

四、小結(jié)

通過使用Netflix Conductor后,我們首先來(lái)看一下Conductor到底能干什么:

  • 以藍(lán)圖為主,基于JSON DSL的藍(lán)圖定義了執(zhí)行流程;
  • 跟蹤和管理工作流;
  • 能夠暫停,恢復(fù)和重新啟動(dòng)流程;
  • 用戶界面可視化流程;
  • 能夠在需要時(shí)同步處理所有任務(wù);
  • 能夠擴(kuò)展到數(shù)百萬(wàn)個(gè)同時(shí)運(yùn)行的流程;
  • 由客戶抽象的排隊(duì)服務(wù)提供后端支持;
  • 能夠通過HTTP或其他傳輸操作,例如gRPC;

但如果要大規(guī)模使用還需要進(jìn)行一些定制化開發(fā)才能使框架的功效發(fā)揮到最大:

  • 流程定義文件需要自己手寫DSL,需要改造成通過流程設(shè)計(jì)器界面來(lái)生成。
  • 無(wú)人員和權(quán)限管理功能,需要改造增加。
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容