Flink Exactly-Once 投遞實(shí)現(xiàn)淺析

隨著近來(lái)越來(lái)越多的業(yè)務(wù)遷移到 Flink 上,對(duì) Flink 作業(yè)的準(zhǔn)確性要求也隨之進(jìn)一步提高,其中最為關(guān)鍵的是如何在不同業(yè)務(wù)場(chǎng)景下保證 exactly-once 的投遞語(yǔ)義。雖然不少實(shí)時(shí)系統(tǒng)(e.g. 實(shí)時(shí)計(jì)算/消息隊(duì)列)都宣稱支持 exactly-once,exactly-once 投遞似乎是一個(gè)已被解決的問(wèn)題,但是其實(shí)它們更多是針對(duì)內(nèi)部模塊之間的信息投遞,比如 Kafka 生產(chǎn)(producer 到 Kafka broker)和消費(fèi)(broker 到 consumer)的 exactly-once。而 Flink 作為實(shí)時(shí)計(jì)算引擎,在實(shí)際場(chǎng)景業(yè)務(wù)會(huì)涉及到很多不同組件,由于組件特性和定位的不同,F(xiàn)link 并不是對(duì)所有組件都支持 exactly-once(見(jiàn)[1]),而且不同組件實(shí)現(xiàn) exactly-once 的方法也有所差異,有些實(shí)現(xiàn)或許會(huì)帶來(lái)副作用或者用法上的局限性,因此深入了解 Flink exactly-once 的實(shí)現(xiàn)機(jī)制對(duì)于設(shè)計(jì)穩(wěn)定可靠的架構(gòu)有十分重要的意義。

下文將基于 Flink 詳細(xì)分析 exactly-once 的難點(diǎn)所在以及實(shí)現(xiàn)方案,而這些結(jié)論也可以推廣到其他實(shí)時(shí)系統(tǒng),特別是流式計(jì)算系統(tǒng)。

Exactly-Once 難點(diǎn)分析

由于在分布式系統(tǒng)的進(jìn)程間協(xié)調(diào)需要通過(guò)網(wǎng)絡(luò),而網(wǎng)絡(luò)情況在很多情況下是不可預(yù)知的,通常發(fā)送消息要考慮三種情況:正常返回、錯(cuò)誤返回和超時(shí),其中錯(cuò)誤返回又可以分為可重試錯(cuò)誤返回(e.g. 數(shù)據(jù)庫(kù)維護(hù)暫時(shí)不可用)和不可重試錯(cuò)誤返回(e.g. 認(rèn)證錯(cuò)誤),而可重試錯(cuò)誤返回和超時(shí)都會(huì)導(dǎo)致重發(fā)消息,導(dǎo)致下游可能接收到重復(fù)的消息,也就是 at-least-once 的投遞語(yǔ)義。而 exactly-once 是在 at-least-once 的基礎(chǔ)之上加上了可以識(shí)別出重發(fā)數(shù)據(jù)或者將消息包裝為為冪等操作的機(jī)制。

其實(shí)消息的 exactly-once 投遞并不是一個(gè)分布式系統(tǒng)產(chǎn)生的新課題(雖然它一般特指分布式領(lǐng)域的 exactly-once),早在計(jì)算網(wǎng)絡(luò)發(fā)展初期的 TCP 協(xié)議已經(jīng)實(shí)現(xiàn)了網(wǎng)絡(luò)的可靠傳輸。TCP 協(xié)議的 exactly-once 實(shí)現(xiàn)方式是將消息傳遞變?yōu)橛袪顟B(tài)的:首先同步建立連接,然后發(fā)送的每個(gè)數(shù)據(jù)包加上遞增的序列號(hào)(sequence number),發(fā)送完畢后再同步釋放連接。由于發(fā)送端和接受端都保存了狀態(tài)信息(已發(fā)送數(shù)據(jù)包的序列號(hào)/已接收數(shù)據(jù)包的序列號(hào)),它們可以知道哪些數(shù)據(jù)包是缺失或重復(fù)的。

而在分布式環(huán)境下 exactly-once 則更為復(fù)雜,最大的不同點(diǎn)在于分布式系統(tǒng)需要容忍進(jìn)程崩潰和節(jié)點(diǎn)丟失,這會(huì)帶來(lái)許多問(wèn)題,比如下面常見(jiàn)的幾個(gè):

進(jìn)程狀態(tài)需要持續(xù)化到可靠的分布式存儲(chǔ),以防止節(jié)點(diǎn)丟失帶來(lái)狀態(tài)的丟失。

由于發(fā)送消息是一個(gè)兩階段的操作(即發(fā)送消息和收到對(duì)方的確認(rèn)),重啟之后的進(jìn)程沒(méi)有辦法判斷崩潰前是否已經(jīng)使用當(dāng)前序列號(hào)發(fā)送過(guò)消息,因此可能會(huì)導(dǎo)致重復(fù)使用序列號(hào)的問(wèn)題。

被認(rèn)為崩潰的進(jìn)程有可能并沒(méi)有退出,隨后再次連上來(lái)變?yōu)?zombie 進(jìn)程繼續(xù)發(fā)送數(shù)據(jù)。

第2點(diǎn)和第3點(diǎn)其實(shí)是同一個(gè)問(wèn)題,即需要區(qū)分出原本進(jìn)程和重啟后的進(jìn)程。對(duì)此業(yè)界已經(jīng)有比較成熟的解決方案: 引入 epoch 表示進(jìn)程的不同世代并用分布式協(xié)調(diào)系統(tǒng)來(lái)負(fù)責(zé)管理。雖然還有一些衍生的細(xì)節(jié)問(wèn)題,但總體來(lái)說(shuō)問(wèn)題都不大。但是第1點(diǎn)問(wèn)題造成了一個(gè)比較深遠(yuǎn)的影響,即為了減低 IO 成本,狀態(tài)的保存必然是微批量(micro-batching)的而不是流式的,這會(huì)導(dǎo)致?tīng)顟B(tài)的保存總是落后于流計(jì)算進(jìn)度,因而為了保證 exactly-once 流計(jì)算引擎需要實(shí)現(xiàn)事務(wù)回滾。

狀態(tài) Exactly-Once 和端到端 Exactly-Once

Flink 提供 exactly-once 的狀態(tài)(state)投遞語(yǔ)義,這為有狀態(tài)的(stateful)計(jì)算提供了準(zhǔn)確性保證。其中比較容易令人混淆的一點(diǎn)是狀態(tài)投遞語(yǔ)義和更加常見(jiàn)的端到端(end to end)投遞語(yǔ)義,而實(shí)現(xiàn)前者是實(shí)現(xiàn)后者的前置條件。

Flink 從 0.9 版本開(kāi)始提供 State API,標(biāo)志著 Flink 進(jìn)入了 Stateful Streaming 的時(shí)代。State API 簡(jiǎn)單來(lái)說(shuō)是“不受進(jìn)程重啟影響的“數(shù)據(jù)結(jié)構(gòu),其命名規(guī)范也與常見(jiàn)的數(shù)據(jù)結(jié)構(gòu)一致,比如 MapState、ListState。Flink 官方提供的算子(比如 KafkaSource)和用戶開(kāi)發(fā)的算子都可以使用 State API 來(lái)保存狀態(tài)信息。和大多數(shù)分布式系統(tǒng)一樣 Flink 采用快照的方式來(lái)將整個(gè)作業(yè)的狀態(tài)定期同步到外部存儲(chǔ),也就是將 State API 保存的信息以序列化的形式存儲(chǔ),作業(yè)恢復(fù)的時(shí)候只要讀取外部存儲(chǔ)即可將作業(yè)恢復(fù)到先前某個(gè)時(shí)間點(diǎn)的狀態(tài)。由于從快照恢復(fù)同時(shí)會(huì)回滾數(shù)據(jù)流的處理進(jìn)度,所以 State 是天然的 exactly-once 投遞。

而端到端的一致性則需要上下游的外部系統(tǒng)配合,因?yàn)?Flink 無(wú)法將它們的狀態(tài)也保存到快照并獨(dú)立地回滾它們,否則就不叫作外部系統(tǒng)了。通常來(lái)說(shuō) Flink 的上游是可以重復(fù)讀取或者消費(fèi)的 pull-based 持續(xù)化存儲(chǔ),所以要實(shí)現(xiàn) source 端的 exactly-once 只需要回滾 source 的讀取進(jìn)度即可(e.g. Kafka 的 offset)。而 sink 端的 exactly-once 則比較復(fù)雜,因?yàn)?sink 是 push-based 的。所謂覆水難收,要撤回發(fā)出去的消息是并不是容易的事情,因?yàn)檫@要求下游根據(jù)消息作出的一系列反應(yīng)都是可撤回的。這就需要用 State API 來(lái)保存已發(fā)出消息的元數(shù)據(jù),記錄哪些數(shù)據(jù)是重啟后需要回滾的。

下面將分析 Flink 是如何實(shí)現(xiàn) exactly-once Sink 的。

Exactly-Once Sink 原理

Flink 的 exactly-once sink 均基于快照機(jī)制,按照實(shí)現(xiàn)原理可以分為冪等(Idempotent) sink 和事務(wù)性(Transactional) sink 兩種。

冪等 Sink

冪等性是分布式領(lǐng)域里十分有用的特性,它意味著相同的操作執(zhí)行一次和執(zhí)行多次可以獲得相同的結(jié)果,因此 at-least-once 自然等同于 exactly-once。如此一來(lái),在從快照恢復(fù)的時(shí)候冪等 sink 便不需要對(duì)外部系統(tǒng)撤回已發(fā)消息,相當(dāng)于回避了外部系統(tǒng)的狀態(tài)回滾問(wèn)題。比如寫入 KV 數(shù)據(jù)庫(kù)的 sink,由于插入一行的操作是冪等的,因此 sink 可以無(wú)狀態(tài)的,在錯(cuò)誤恢復(fù)時(shí)也不需要關(guān)心外部系統(tǒng)的狀態(tài)。從某種意義來(lái)講,上文提到的 TCP 協(xié)議也是利用了發(fā)送數(shù)據(jù)包冪等性來(lái)保證 exactly-once。

然而冪等 sink 的適用場(chǎng)景依賴于業(yè)務(wù)邏輯,如果下游業(yè)務(wù)本來(lái)就無(wú)法保證冪等性,這時(shí)就需要應(yīng)用事務(wù)性 sink。

事務(wù)性 Sink

事務(wù)性 sink 顧名思義類似于傳統(tǒng) DBMS 的事務(wù),將一系列(一般是一個(gè) checkpoint 內(nèi))的所有輸出包裝為一個(gè)邏輯單元,理想的情況下提供 ACID 的事務(wù)保證。之所以說(shuō)是“理想的情況下”,主要是因?yàn)?sink 依賴于目標(biāo)輸出系統(tǒng)的事務(wù)保證,而分布式系統(tǒng)對(duì)于事務(wù)的支持并不一定很完整,比如 HBase 就不支持跨行事務(wù),再比如 HDFS 等文件系統(tǒng)是不提供事務(wù)的,這種情況下 sink 只可以在客戶端的基礎(chǔ)上再包裝一層來(lái)盡最大努力地提供事務(wù)保證。

然而僅有下游系統(tǒng)本身提供的事務(wù)保證對(duì)于 exactly-once sink 來(lái)說(shuō)是不夠的,因?yàn)橥粋€(gè) sink 的子任務(wù)(subtask)會(huì)有多個(gè),對(duì)于下游系統(tǒng)來(lái)說(shuō)它們是處在不同會(huì)話和事務(wù)中的,并不能保證操作的原子性,因此 exactly-once sink 還需要實(shí)現(xiàn)分布式事務(wù)來(lái)達(dá)到所有 subtask 的一致 commit 或 rollback。由于 sink 事務(wù)生命周期是與 checkpoint 一一對(duì)應(yīng)的,或者說(shuō) checkpoint 本來(lái)就是實(shí)現(xiàn)作業(yè)狀態(tài)持久化的分布式事務(wù),sink 的分布式事務(wù)也理所當(dāng)然可以通過(guò) checkpoint 機(jī)制提供的 hook 來(lái)實(shí)現(xiàn)。

Checkpoint 提供給算子的 hook 有 CheckpointedFunction 和 CheckpointListener 兩個(gè),前者在算子進(jìn)行 checkpoint 快照時(shí)被調(diào)用,后者在 checkpoint 成功后調(diào)用。為了簡(jiǎn)單起見(jiàn) Flink 結(jié)合上述兩個(gè)接口抽象出 exactly-once sink 的通用邏輯抽象 TwoPhaseCommitSinkFunction 接口,從命名即可看出這是對(duì)兩階段提交協(xié)議的一個(gè)實(shí)現(xiàn),其主要方法如下:

beginTransaction: 初始化一個(gè)事務(wù)。在有新數(shù)據(jù)到達(dá)并且當(dāng)前事務(wù)為空時(shí)調(diào)用。

preCommit: 預(yù)提交數(shù)據(jù),即不再寫入當(dāng)前事務(wù)并準(zhǔn)好提交當(dāng)前事務(wù)。在 sink 算子進(jìn)行快照的時(shí)候調(diào)用。

commit: 正式提交數(shù)據(jù),將準(zhǔn)備好的事務(wù)提交。在作業(yè)的 checkpoint 完成時(shí)調(diào)用。

abort: 放棄事務(wù)。在作業(yè) checkpoint 失敗的時(shí)候調(diào)用。

下面以 Bucketing File Sink 作為例子來(lái)說(shuō)明如何基于異步 checkpoint 來(lái)實(shí)現(xiàn)事務(wù)性 sink。

Bucketing File Sink 是 Flink 提供的一個(gè) FileSystem Connector,用于將數(shù)據(jù)流寫到固定大小的文件里。Bucketing File Sink 將文件分為三種狀態(tài),in-progress/pending/committed,分別表示正在寫的文件、寫完準(zhǔn)備提交的文件和已經(jīng)提交的文件。


運(yùn)行時(shí),Bucketing File Sink 首先會(huì)打開(kāi)一個(gè)臨時(shí)文件并不斷地將收到的數(shù)據(jù)寫入(相當(dāng)于事務(wù)的 beginTransaction 步驟),這時(shí)文件處于 in-progress。直到這個(gè)文件因?yàn)榇笮〕^(guò)閾值或者一段時(shí)間內(nèi)沒(méi)有新數(shù)據(jù)寫入,這時(shí)文件關(guān)閉并變?yōu)?pending 狀態(tài)(相當(dāng)于事務(wù)的 pre-commit 步驟)。由于 Flink checkpoint 是異步的,可能有多個(gè)并發(fā)的 checkpoint,Bucketing File Sink 會(huì)記錄 pending 文件對(duì)應(yīng)的 checkpoint epoch,當(dāng)某個(gè) epoch 的 checkpoint 完成后,Bucketing File Sink 會(huì)收到 callback 并將對(duì)應(yīng)的文件改為 committed 狀態(tài)。這是通過(guò)原子操作重命名來(lái)完成的,因此可以保證 pre-commit 的事務(wù)要么 commit 成功要么 commit 失敗,不會(huì)出現(xiàn)其他中間狀態(tài)。

Commit 出現(xiàn)錯(cuò)誤會(huì)導(dǎo)致作業(yè)自動(dòng)重啟,重啟后 Bucketing File Sink 本身已被恢復(fù)為上次 checkpoint 時(shí)的狀態(tài),不過(guò)仍需要將文件系統(tǒng)的狀態(tài)也恢復(fù)以保證一致性。從 checkpoint 恢復(fù)后對(duì)應(yīng)的事務(wù)會(huì)再次重試 commit,它會(huì)將記錄的 pending 文件改為 committed 狀態(tài),記錄的 in-progress 文件 truncate 到 checkpoint 記錄下來(lái)的 offset,而其余未被記錄的 pending 文件和 in-progress 文件都將被刪除。

上面主要圍繞事務(wù)保證的 AC 兩點(diǎn)(Atomicity 和 Consistency),而在 I(Isolation)上 Flink exactly-once sink 也有不同的實(shí)現(xiàn)方式。實(shí)際上由于 Flink 的流計(jì)算特性,當(dāng)前事務(wù)的未 commit 數(shù)據(jù)是一直在積累的,根據(jù)緩存未 commit 數(shù)據(jù)的地方的不同,可以將事務(wù)性 sink 分為兩種實(shí)現(xiàn)方式。

在 sink 端緩存未 commit 數(shù)據(jù),等 checkpoint 完成以后將緩存的數(shù)據(jù) flush 到下游。這種方式可以提供 read-committed 的事務(wù)隔離級(jí)別,但同時(shí)由于未 commit 的數(shù)據(jù)不會(huì)發(fā)往下游(與 checkpoint 同步),sink 端緩存會(huì)帶來(lái)一定的延遲,相當(dāng)于退化為與 checkpoint 同步的 micro-batching 模式。

在下游系統(tǒng)緩存未 commit 數(shù)據(jù),等 checkpoint 完成后通知下游 commit。這樣的好處是數(shù)據(jù)是流式發(fā)往下游的,不會(huì)在每次 checkpoint 完成后出現(xiàn)網(wǎng)絡(luò) IO 的高峰,并且事務(wù)隔離級(jí)別可以由下游設(shè)置,下游可以選擇低延遲弱一致性的 read-uncommitted 或高延遲強(qiáng)一致性的 read-committed。

在 Bucketing File Sink 的例子中,處于 in-progress 和 pending 狀態(tài)的文件默認(rèn)情況下都是隱藏文件(在實(shí)踐中是使用下劃線作為文件名前綴,HDFS 的 FileInputFormat 會(huì)將其過(guò)濾掉),只有 commit 成功后文件才對(duì)用戶是可見(jiàn)的,即提供了 read-committed 的事務(wù)隔離性。理想的情況下 exactly-once sink 都應(yīng)該使用在下游系統(tǒng)緩存未 commit 數(shù)據(jù)的方式,因?yàn)檫@最為符合流式計(jì)算的理念。最為典型的是下游系統(tǒng)本來(lái)就支持事務(wù),那么未 commit 的數(shù)據(jù)很自然地就是緩存在下游系統(tǒng)的,否則 sink 可以選擇像上例的 Bucketing File Sink 一樣在下游系統(tǒng)的用戶層面實(shí)現(xiàn)自己的事務(wù),或者 fallback 到等待數(shù)據(jù)變?yōu)?committed 再發(fā)出的 micro-batching 模式。

總結(jié)

Exactly-once 是實(shí)時(shí)系統(tǒng)最為關(guān)鍵的準(zhǔn)確性要求,也是當(dāng)前限制大部分分布式實(shí)時(shí)系統(tǒng)應(yīng)用到準(zhǔn)確性要求更高的業(yè)務(wù)場(chǎng)景(比如在線事務(wù)處理 OLTP)的問(wèn)題之一。目前來(lái)說(shuō)流式計(jì)算的 exactly-once 在理論上已經(jīng)有了很大的突破,而 Flink 社區(qū)也在積極汲取最先進(jìn)的思想和實(shí)踐經(jīng)驗(yàn)。隨著 Flink 在 exactly-once 上的技術(shù)愈發(fā)成熟,結(jié)合 Flink 本身的流處理特性,相信在不遠(yuǎn)的將來(lái),除了構(gòu)造數(shù)據(jù)分析、數(shù)據(jù)管道應(yīng)用, Flink 也可以在微服務(wù)領(lǐng)域占有一席之地。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 2017年12月Apache Flink社區(qū)發(fā)布了1.4版本。該版本正式引入了一個(gè)里程碑式的功能:兩階段提交Sin...
    香山上的麻雀閱讀 2,224評(píng)論 0 3
  • 目錄 一、一致性定義 Flink通過(guò)插入barrier將流分為邏輯上的批,用來(lái)保存狀態(tài)。因此一個(gè)checkpoin...
    拓荒者001閱讀 2,904評(píng)論 0 1
  • 雨中獨(dú)行 2009-6-28 21:15 生活應(yīng)該怎樣享受? 就是想自己想想的, 做自己最想做的。
    夕陽(yáng)在山閱讀 175評(píng)論 0 0
  • 悶。 四下里一片岑寂。 我探出手,撫摸到身旁女人的豐腴,山巒,平原,森林,川淵,這具身體我很陌生,陌生到如同撫摸燥...
    小天羊星閱讀 355評(píng)論 0 1
  • #本文參加‘青春’大賽,本人保證本文為本人原創(chuàng),如有問(wèn)題則與主辦方無(wú)關(guān),自愿放棄評(píng)優(yōu)評(píng)獎(jiǎng)資格。 李鈺 187389...
    一perdu閱讀 357評(píng)論 0 3

友情鏈接更多精彩內(nèi)容