Netflix Conductor框架是典型的服務(wù)編排框架,通過Conductor還可以實現(xiàn)工作流和分布式調(diào)度,性能非常卓越。
關(guān)于Conductor的基本概念在 https://netflix.github.io/conductor/intro/ 文中已經(jīng)有深入介紹,本篇將以實戰(zhàn)案例為出發(fā)點深入介紹Conductor的使用。
一、Conductor的功能全景圖

在正式使用之前我們先來了解Conductor都有哪些功能,通過流程、任務(wù)、歷史、監(jiān)控、客戶端、通信和管理后臺幾個層面來做了功能歸類。
- 流程
流程引擎默認是用DSL來編寫流程定義文件,這是一種JSON格式的文件,我們的工作流案例就是以這個定義文件為驅(qū)動的,但是很可惜目前Conductor只支持手寫定義,無法通過界面生成,這塊就需要后面通過改造Conductor來增加相應功能。 - 任務(wù)
這里面包括的主要是和任務(wù)相關(guān)的功能,通過這個功能可以進行簡單工作流的實現(xiàn),還可以進行并行計算。 - 歷史
如果想要查看之前進行過的(完成,失敗等終態(tài))歷史任務(wù),通過這個功能就可以實現(xiàn)。 - 監(jiān)控
當工作流任務(wù)流程非常冗長的時候,我們對每個節(jié)點的任務(wù)運行情況并不了解,這時候就需要有一個任務(wù)監(jiān)控功能及時知道流程的狀態(tài)方便我們做出相應決策。同時還有一個重要功能是任務(wù)調(diào)度,通過這個功能可以實現(xiàn)類似xxl-job的功能,滿足分布式定時調(diào)度的需求。 - 客戶端和通信
這二個功能本是一體的,既然Conductor是分布式的任務(wù)流程那么核心原理就是通過Server+Worker的方式,利用核心狀態(tài)機發(fā)消息的方式來驅(qū)動客戶端的任務(wù)執(zhí)行,而Worker的實現(xiàn)是跨語言的,可以用JAVA、Python、go等語言實現(xiàn),而Worker需要長輪詢Server端的狀態(tài)來判斷當然是否有自己的任務(wù)來執(zhí)行。 - 管理后臺
通過管理后臺可以查看任務(wù)和工作流的元數(shù)據(jù)定義,工作流的執(zhí)行狀態(tài)等。
二、Conductor的架構(gòu)圖

其中:Task Queues使用Dyno-queues做任務(wù)延遲。
三、實戰(zhàn)案例
通過命令行將Netflix Conductor Sever端啟動之后( https://netflix.github.io/conductor/intro/#installing-and-running 介紹了如何安裝Conductor),訪問localhost:8080地址顯示如下頁面:

這個頁面主要負責的是關(guān)于Conductor的任務(wù)、工作流的元數(shù)據(jù)管理,提供了很多http接口可供使用,如下圖所示:

我們可以直接調(diào)用默認提供的接口頁面通過傳遞參數(shù)來進行任務(wù)和工作流的定義,當然也可以自己寫頁面調(diào)用相應的URL來進行。首先我們要先進行任務(wù)文件的定義,如下圖所示:

在這個截圖中,我們定義了二個任務(wù),分別是leaderRatify和managerRatify,截圖中的原始定義文件如下:
[
{
"name": "leaderRatify",
"retryCount": 3,
"timeoutSeconds": 1200,
"inputKeys": [
"staffName",
"staffDepartment"
],
"outputKeys": [
"leaderAgree",
"leaderDisagree"
],
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 600,
"responseTimeoutSeconds": 3600
},
{
"name": "managerRatify",
"retryCount": 3,
"timeoutSeconds": 1200,
"inputKeys": [
"managerName",
"managerDeparment"
],
"outputKeys": [
"managerAgree",
"managerDisagree"
],
"timeoutPolicy": "TIME_OUT_WF",
"retryLogic": "FIXED",
"retryDelaySeconds": 600,
"responseTimeoutSeconds": 3600
}
]
任務(wù)定義好之后,接下來需要通過任務(wù)建立工作流定義,如下圖所示:

工作流定義文件就是我們整個流程所走的路徑,將流程文件轉(zhuǎn)換成流程圖如下所示:

流程定義文件的原始文件內(nèi)容如下:
{
"updateTime": 1540448903202,
"name": "Leave process",
"description": "a demo for workflow",
"version": 1,
"tasks": [
{
"name": "leaderRatify",
"taskReferenceName": "node1",
"inputParameters": {
"staffName": "${workflow.input.staffName}",
"staffDepartment": "${workflow.input.staffDepartment}"
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "managerRatify",
"taskReferenceName": "node2",
"inputParameters": {
"managerName": "${node1.output.leaderName}",
"managerDepartment": "${node1.output.leaderDepartment}"
},
"type": "SIMPLE",
"startDelay": 0
}
],
"outputParameters": {
"leaderName": "${node1.output.leaderName}",
"leaderDepartment": "${node1.output.leaderDepartment}",
"managerAgree": "${node2.output.managerAgree}",
"managerDisagree": "${node2.output.managerDisagree}"
},
"restartable": true,
"schemaVersion": 2
}
上面的流程主要介紹了Task任務(wù)定義文件、工作流流程文件如何定義和上傳的,這二個文件主要是提供給Conductor的狀態(tài)機使用,而我們真正的任務(wù)Worker則需要自己寫java代碼來實現(xiàn),然后通過長輪詢Conductor Server來獲取自己的狀態(tài)以及任務(wù)步驟,Worker代碼如下所示:
class LeaderRatifyWorker implements Worker {
private String taskDefName;
public SampleWorker(String taskDefName) {
this.taskDefName = taskDefName;
}
@Override
public String getTaskDefName() {
return taskDefName;
}
@Override
public TaskResult execute(Task task) {
System.out.printf("Executing %s%n", taskDefName);
System.out.println("staffName:" + task.getInputData().get("staffName"));
System.out.println("staffDepartment:" + task.getInputData().get("staffDepartment"));
TaskResult result = new TaskResult(task);
result.setStatus(TaskResult.Status.COMPLETED);
//Register the output of the task
result.getOutputData().put("outputKey1", "value");
result.getOutputData().put("oddEven", 1);
result.getOutputData().put("mod", 4);
result.getOutputData().put("leaderAgree", "yes");
result.getOutputData().put("leaderDisagree", "no");
return result;
}
}
class ManagerRatifyWorker implements Worker {
private String taskDefName;
public SampleWorker2(String taskDefName) {
this.taskDefName = taskDefName;
}
@Override
public String getTaskDefName() {
return taskDefName;
}
@Override
public TaskResult execute(Task task) {
System.out.printf("Executing %s\n", taskDefName);
System.out.println("managerName:" + task.getInputData().get("managerName"));
System.out.println("managerDepartment:" + task.getInputData().get("managerDepartment"));
TaskResult result = new TaskResult(task);
result.setStatus(TaskResult.Status.COMPLETED);
//Register the output of the task
result.getOutputData().put("managerAgree", String.valueOf(task.getInputData().get("managerName")));
result.getOutputData().put("managerDisagree", String.valueOf(task.getInputData().get("managerDepartment")));
return result;
}
}
//在main方法中創(chuàng)建工作Worker以及設(shè)置需要訪問的Conductor Server端api地址,并將流程進入初始化
public static void main(String[] args) {
TaskClient taskClient = new TaskClient();
taskClient.setRootURI("http://localhost:8080/api/"); //Point this to the server API
int threadCount = 2; //number of threads used to execute workers. To avoid starvation, should be same or more than number of workers
Worker worker1 = new LeaderRatifyWorker("leaderRatify");
Worker worker2 = new ManagerRatifyWorker("managerRatify");
//Create WorkflowTaskCoordinator
WorkflowTaskCoordinator.Builder builder = new WorkflowTaskCoordinator.Builder();
WorkflowTaskCoordinator coordinator = builder.withWorkers(worker1, worker2).withThreadCount(threadCount).withTaskClient(taskClient).build();
//Start for polling and execution of the tasks
coordinator.init();
}
而后通過如下界面啟動工作流,并傳入工作流輸入?yún)?shù):

當流程執(zhí)行完以后,我們來訪問Conductor的Admin管理界面,通過localhost:5000端口訪問,看到如下圖所示界面:

選擇左邊菜單的All選項,右側(cè)出現(xiàn)所有任務(wù)的列表:

可以看到目前所有工作流的狀態(tài)均已經(jīng)是執(zhí)行完畢,通過Status狀態(tài)通過看到每個工作流當前的執(zhí)行狀態(tài),分別是Running、Completed、Timed out、Terminated等狀態(tài)。點擊右側(cè)Workflow列表中第一條workflowID顯示如下界面:

界面中的流程圖節(jié)點顯示為綠色,表示工作流正常的執(zhí)行完畢沒有報任何故障,而右上角紅框的Restart表示可以重啟工作流。
四、小結(jié)
通過使用Netflix Conductor后,我們首先來看一下Conductor到底能干什么:
- 以藍圖為主,基于JSON DSL的藍圖定義了執(zhí)行流程;
- 跟蹤和管理工作流;
- 能夠暫停,恢復和重新啟動流程;
- 用戶界面可視化流程;
- 能夠在需要時同步處理所有任務(wù);
- 能夠擴展到數(shù)百萬個同時運行的流程;
- 由客戶抽象的排隊服務(wù)提供后端支持;
- 能夠通過HTTP或其他傳輸操作,例如gRPC;
但如果要大規(guī)模使用還需要進行一些定制化開發(fā)才能使框架的功效發(fā)揮到最大:
- 流程定義文件需要自己手寫DSL,需要改造成通過流程設(shè)計器界面來生成。
- 無人員和權(quán)限管理功能,需要改造增加。