最近在做的工作比較需要一個支持任務編排工作流的框架或者平臺,這里記錄下實現(xiàn)上的一些思路。
任務編排工作流
任務編排是什么意思呢,顧名思義就是可以把"任務"這個原子單位按照自己的方式進行編排,任務之間可能互相依賴。復雜一點的編排之后就能形成一個 workflow 工作流了。我們希望這個工作流按照我們編排的方式去執(zhí)行每個原子 task 任務。如下圖所示,我們希望先并發(fā)運行 Task A 和 Task C,Task A 執(zhí)行完后串行運行 Task B,在并發(fā)等待 Task B 和 C 都結束后運行 Task D,這樣就完成了一個典型的任務編排工作流。

DAG 有向無環(huán)圖
首先我們了解圖這個數(shù)據(jù)結構,每個元素稱為頂點 vertex,頂點之間的連線稱為邊 edge。像我們畫的這種帶箭頭關系的稱為有向圖,箭頭關系之間能形成一個環(huán)的成為有環(huán)圖,反之稱為無環(huán)圖。顯然運用在我們任務編排工作流上,最合適的是 DAG 有向無環(huán)圖。
我們在代碼里怎么存儲圖呢,有兩種數(shù)據(jù)結構:鄰接矩陣和鄰接表。
下圖表示一個有向圖的鄰接矩陣,例如 x->y 的邊,只需將 Array[x][y]標識為 1 即可。

此外我們也可以使用鄰接表來存儲,這種存儲方式較好地彌補了鄰接矩陣浪費空間的缺點,但相對來說鄰接矩陣能更快地判斷連通性。

一般在代碼實現(xiàn)上,我們會選擇鄰接矩陣,這樣我們在判斷兩點之間是否有邊更方便點。
一個任務編排框架
了解了 DAG 的基本知識后我們可以來簡單實現(xiàn)一下。首先是存儲結構,我們的 Dag 表示一整個圖,Node 表示各個頂點,每個頂點有其 parents 和 children:
//Dag
public final class DefaultDag<T, R> implements Dag<T, R> {
private Map<T, Node<T, R>> nodes = new HashMap<T, Node<T, R>>();
...
}
//Node
public final class Node<T, R> {
/**
* incoming dependencies for this node
*/
private Set<Node<T, R>> parents = new LinkedHashSet<Node<T, R>>();
/**
* outgoing dependencies for this node
*/
private Set<Node<T, R>> children = new LinkedHashSet<Node<T, R>>();
...
}
畫兩個頂點,以及為這兩個頂點連邊操作如下:
public void addDependency(final T evalFirstNode, final T evalLaterNode) {
Node<T, R> firstNode = createNode(evalFirstNode);
Node<T, R> afterNode = createNode(evalLaterNode);
addEdges(firstNode, afterNode);
}
private Node<T, R> createNode(final T value) {
Node<T, R> node = new Node<T, R>(value);
return node;
}
private void addEdges(final Node<T, R> firstNode, final Node<T, R> afterNode) {
if (!firstNode.equals(afterNode)) {
firstNode.getChildren().add(afterNode);
afterNode.getParents().add(firstNode);
}
}
到現(xiàn)在我們其實已經(jīng)把基礎數(shù)據(jù)結構寫好了,但我們作為一個任務編排框架最終是需要線程去執(zhí)行的,我們把它和線程池一起給包裝一下。
//任務編排線程池
public class DefaultDexecutor <T, R> {
//執(zhí)行線程,和2種重試線程
private final ExecutorService<T, R> executionEngine;
private final ExecutorService immediatelyRetryExecutor;
private final ScheduledExecutorService scheduledRetryExecutor;
//執(zhí)行狀態(tài)
private final ExecutorState<T, R> state;
...
}
//執(zhí)行狀態(tài)
public class DefaultExecutorState<T, R> {
//底層圖數(shù)據(jù)結構
private final Dag<T, R> graph;
//已完成
private final Collection<Node<T, R>> processedNodes;
//未完成
private final Collection<Node<T, R>> unProcessedNodes;
//錯誤task
private final Collection<ExecutionResult<T, R>> erroredTasks;
//執(zhí)行結果
private final Collection<ExecutionResult<T, R>> executionResults;
}
可以看到我們的線程包括執(zhí)行線程池,2 種重試線程池。我們使用 ExecutorState 來保存一些整個任務工作流執(zhí)行過程中的一些狀態(tài)記錄,包括已完成和未完成的 task,每個 task 執(zhí)行的結果等。同時它也依賴我們底層的圖數(shù)據(jù)結構 DAG。
接下來我們要做的事其實很簡單,就是 BFS 這整個 DAG 數(shù)據(jù)結構,然后提交到線程池中去執(zhí)行就可以了,過程中注意一些節(jié)點狀態(tài)的保持,結果的保存即可。

還是以上圖為例,值得說的一點是在 Task D 這個點需要有一個并發(fā)等待的操作,即 Task D 需要依賴 Task B 和 Task C 執(zhí)行結束后再往下執(zhí)行。這里有很多辦法,我選擇了共享變量的方式來完成并發(fā)等待。遍歷工作流中被遞歸的方法的偽代碼如下:
private void doProcessNodes(final Set<Node<T, R>> nodes) {
for (Node<T, R> node : nodes) {
//共享變量 并發(fā)等待
if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents())) {
Task<T, R> task = newTask(node);
this.executionEngine.submit(task);
...
ExecutionResult<T, R> executionResult = this.executionEngine.processResult();
if (executionResult.isSuccess()) {
state.markProcessingDone(processedNode);
}
//繼續(xù)執(zhí)行孩子節(jié)點
doExecute(processedNode.getChildren());
...
}
}
}
這樣我們基本完成了這個任務編排框架的工作,現(xiàn)在我們可以如下來進行示例圖中的任務編排以及執(zhí)行:
DefaultExecutor<String, String> executor = newTaskExecutor();
executor.addDependency("A", "B");
executor.addDependency("B", "D");
executor.addDependency("C", "D");
executor.execute();
任務編排平臺化
好了現(xiàn)在我們已經(jīng)有一款任務編排框架了,但很多時候我們想要可視化、平臺化,讓使用者更加無腦。
框架與平臺最大的區(qū)別在哪里?是可拖拽的可視化輸入么?我覺得這個的復雜度更多在前端。而對于后端平臺來講,與框架最大的區(qū)別是數(shù)據(jù)的持久化。
對于 DAG 的頂點來說,我們需要將每個節(jié)點 Task 的信息給持久化到關系數(shù)據(jù)庫中,包括 Task 的狀態(tài)、輸出結果等。而對于 DAG 的邊來說,我們也得用數(shù)據(jù)庫來存儲各 Task 之間的方向關系。此外,在遍歷執(zhí)行 DAG 的整個過程中的中間狀態(tài)數(shù)據(jù),我們也得搬運到數(shù)據(jù)庫中。
首先我們可以設計一個 workflow 表,來表示一個工作流。接著我們設計一個 task 表,來表示一個執(zhí)行單元。task 表主要字段如下,這里主要是 task_parents 的設計,它是一個 string,存儲 parents 的 taskId,多個由分隔符分隔。
task_id
workflow_id
task_name
task_status
result
task_parents

依賴是上圖這個例子,對比框架來說,我們首先得將其存儲到數(shù)據(jù)庫中去,最終可能得到如下數(shù)據(jù):
task_id workflow_id task_name task_status result task_parents
1 1 A 0 -1
2 1 B 0 1
3 1 C 0 -1
4 1 D 0 2,3
可以看到,這樣也能很好地存儲 DAG 數(shù)據(jù),和框架中代碼的輸入方式差別并不是很大。
接下來我們要做的是遍歷執(zhí)行整個 workflow,這邊和框架的差別也不大。首先我們可以利用select * from task where workflow_id = 1 and task_parents = -1來獲取初始化節(jié)點 Task A 和 Task C,將其提交到我們的線程池中。
接著對應框架代碼中的doExecute(processedNode.getChildren());,我們使用select * from task where task_parents like %3%,就可以得到 Task C 的孩子節(jié)點 Task D,這里使用了模糊查詢是因為我們的 task_parents 可能是由多個父親的 taskId 與分隔號組合而成的字符串。查詢到孩子節(jié)點后,繼續(xù)提交到線程池即可。
別忘了我們在 Task D 這邊還有一個并發(fā)等待的操作,對應框架代碼中的if (!processedNodes.contains(node) && processedNodes.containsAll(node.getParents()))。這邊我們只要判斷select count(1) from task where task_id in (2,3) and status != 1的個數(shù)為 0 即可,即保證 parents task 全部成功。
另外值得注意的是 task 的重試。在框架中,失敗 task 的重試可以是立即使用當前線程重試或者放到一個定時線程池中去重試。而在平臺上,我們的重試基本上來自于用戶在界面上的點擊,即主線程。
至此,我們已經(jīng)將任務編排框架的功能基本平臺化了。作為一個任務編排平臺,可拖拽編排的可視化輸入、整個工作流狀態(tài)的可視化展示、任務的可人工重試都是其優(yōu)點。