問(wèn)題1:闡述 Flink 提供的三種數(shù)據(jù)處理語(yǔ)義,解釋 Checkpoint 機(jī)制如何保證 Flink 程序結(jié)果的 Exactly-Once 語(yǔ)義?
問(wèn)題2:結(jié)合 Kafka 分析,如何通過(guò)兩階段提交協(xié)議,提供端到端的 Exactly-Once 處理?
1.Flink 提供的三種數(shù)據(jù)處理語(yǔ)義
① At-Most-Once:最多一次,如果產(chǎn)生故障,可能丟失數(shù)據(jù)。
② At-Least-Once:最少一次,如果產(chǎn)生故障,可能有重復(fù)數(shù)據(jù)。
③ Exactly-Once:精確一次,如果產(chǎn)生故障,也能保證數(shù)據(jù)不丟失不重復(fù)。
// At-Least-Once 最多一次
CheckpointingMode.AT_LEAST_ONCE
// Exactly-Once 精確一次
CheckpointingMode.AT_LEAST_ONCE
flink 新版本已經(jīng)不提供 At-Most-Once 語(yǔ)義。
2.Checkpoint 機(jī)制保證 Exactly-Once 語(yǔ)義
結(jié)論:Checkpoint Barrier 對(duì)齊機(jī)制實(shí)現(xiàn) Exactly-Once 語(yǔ)義。如果 Barrier 不對(duì)齊,即 At Least Once 語(yǔ)義。
Flink 分布式異步快照的核心是 Checkpoint 機(jī)制,其關(guān)鍵是采用標(biāo)記信號(hào) Barrier,使得數(shù)據(jù)流被切分成微批,進(jìn)行 Checkpoint 保存狀態(tài)數(shù)據(jù),如下圖所示。
Checkpoint Barrier 對(duì)齊機(jī)制,如下圖所示。當(dāng) ExecutionGraph 物理執(zhí)行圖中的 subtask 算子實(shí)例接收到 Barrier 的時(shí)候,subtask 會(huì)記錄它的狀態(tài)數(shù)據(jù)。如果 subtask 有2個(gè)上游(例如 KeyedProcessFunction、CoProcessFunction等),subtask 會(huì)收齊上游的2個(gè) Barrier 后再觸發(fā) Checkpoint(即 Barrier 對(duì)齊)。
說(shuō)明:Barrier 不對(duì)齊可能導(dǎo)致有重復(fù)數(shù)據(jù)。作業(yè)算子的全部并行度均為1,即默認(rèn) Barrier 對(duì)齊。
3.兩階段提交協(xié)議保證端到端 Exactly-Once 語(yǔ)義
3.1 Flink 兩階段提交協(xié)議-2PC
(1)兩階段提交協(xié)議原理
分布式系統(tǒng)利用兩階段提交協(xié)議實(shí)現(xiàn)事務(wù)性,保證數(shù)據(jù)的一致性。兩階段提交分為:預(yù)提交階段 per-commit 與提交階段 commit。通常包含兩個(gè)角色:協(xié)調(diào)者和執(zhí)行者,協(xié)調(diào)者管理所有執(zhí)行者的操作,執(zhí)行者用于執(zhí)行具體的提交操作。
工作流程:
① 協(xié)調(diào)者向執(zhí)行者發(fā)送 pre-commit 命令;
② 執(zhí)行者執(zhí)行預(yù)提交后,向協(xié)調(diào)者發(fā)送 ack;
③ 協(xié)調(diào)者收到 ack 后,向執(zhí)行者發(fā)送 commit 命令;
④ 執(zhí)行者執(zhí)行提交操作;
說(shuō)明:如果有執(zhí)行者的預(yù)提交操作失敗,返回失敗給協(xié)調(diào)者,則協(xié)調(diào)者向所有執(zhí)行者發(fā)送 rollback,執(zhí)行回滾操作,保證數(shù)據(jù)一致性。
兩階段提交原理.JPG
兩階段提交 2PC 的缺點(diǎn):
①單點(diǎn)故障導(dǎo)致阻塞:執(zhí)行者向協(xié)調(diào)者發(fā)送 ack,然后協(xié)調(diào)者發(fā)送故障,導(dǎo)致執(zhí)行者會(huì)一直阻塞下去。
② 數(shù)據(jù)不一致:當(dāng)協(xié)調(diào)者向執(zhí)行者發(fā)送部分 commit 請(qǐng)求之后,發(fā)生故障導(dǎo)致只有一部分執(zhí)行者接受到了 commit 請(qǐng)求。
(2)Flink 的兩階段提交流程。
工作流程:
① JobMaster 即 CheckpointCoordinator 會(huì)定期向每個(gè) source task發(fā)送命令 start checkpoint(trigger checkpoint);
② 當(dāng) source task 收到 trigger checkpoint 指令后,產(chǎn)生 barrier 并通過(guò)廣播的方式發(fā)送到下游。source task 同時(shí)會(huì)執(zhí)行本地 checkpoint n,當(dāng) checkpoint n 完成后,向 JobMaster 發(fā)送 ack;
③ 當(dāng)流圖的所有節(jié)點(diǎn)都完成 checkpoint n,JobMaster 會(huì)收到所有節(jié)點(diǎn)的 ack,那么就表示完成 checkpoint n;說(shuō)明:checkpoint 機(jī)制的調(diào)用流程實(shí)質(zhì)是 2PC。JobMaster 是協(xié)調(diào)者,所有operator task 是執(zhí)行者。start checkpoint 是 pre-commit 的開(kāi)始信號(hào),而每個(gè) operator task 的 checkpoint 是 pre-commit 過(guò)程,ack 是執(zhí)行者 operator task 反饋給協(xié)調(diào)者 JobMaster ,最后 callback 是 commit。
Flink兩階段提交.JPG
3.2 Flink + Kafka 實(shí)現(xiàn)端到端 Exactly-Once 語(yǔ)義
結(jié)論:端到端 Exactly-Once = Flink 內(nèi)部 Exactly-Once + 第三方存儲(chǔ)支持事務(wù)(flink-connector 支持事務(wù))
3.2.1 分析 TwoPhaseCommitSinkFunction
Sink 輸出支持兩階段提交,其流程抽象為 TwoPhaseCommitSinkFunction。
public abstract class TwoPhaseCommitSinkFunction<IN, TXN, CONTEXT>
extends RichSinkFunction<IN>
implements CheckpointedFunction, CheckpointListener {
/**
* Method that starts a new transaction.
*
* @return newly created transaction.
*/
protected abstract TXN beginTransaction() throws Exception;
/**
* Pre commit previously created transaction. Pre commit must make all of the necessary steps to prepare the
* transaction for a commit that might happen in the future. After this point the transaction might still be
* aborted, but underlying implementation must ensure that commit calls on already pre committed transactions
* will always succeed.
*
* <p>Usually implementation involves flushing the data.
*/
protected abstract void preCommit(TXN transaction) throws Exception;
/**
* Commit a pre-committed transaction. If this method fail, Flink application will be
* restarted and {@link TwoPhaseCommitSinkFunction#recoverAndCommit(Object)} will be called again for the
* same transaction.
*/
protected abstract void commit(TXN transaction);
/**
* Abort a transaction.
*/
protected abstract void abort(TXN transaction);
}
具體流程:
① 初始化快照 initializeState 或者執(zhí)行快照 snapshotState 的時(shí)候,都會(huì)創(chuàng)建事務(wù) beginTransaction;
② 執(zhí)行快照 snapshotState 的時(shí)候,預(yù)提交事務(wù) preCommit;
③ 完成快照 notifyCheckpointComplete 的時(shí)候,提交事務(wù) commit;
④ 初始化快照 initializeState 或者 算子 Function 關(guān)閉(異常故障),丟棄事務(wù) abort;
3.2.2 Flink + Kafka 的兩階段提交流程
(1)Start Checkpoint
JobManager(CheckpointCoordinator)向所有 kafka source 節(jié)點(diǎn) trigger Checkpoint,即注入 checkpoint barrier。

(2)Pre-Commit
kafka source task 把 checkpoint barrier 廣播到下游,同時(shí)會(huì)執(zhí)行本地 checkpoint。以此此類(lèi)推,流圖的所有節(jié)點(diǎn)都收到 barrier 和執(zhí)行 checkpoint。最后當(dāng) kafka sink task 執(zhí)行checkpoint 的時(shí)候,向 kafka 預(yù)提交事務(wù) pre-commit。

(3)Commit
當(dāng)流圖的所有節(jié)點(diǎn)都完成 checkpoint,JobManager(CheckpointCoordinator)通知所有operator task,已經(jīng)完成本次 checkpoint。此時(shí) kafka sink task 向 kafka 提交事務(wù) commit。
