上一篇我們提到 Flink 基于 checkpoint 機(jī)制進(jìn)行容錯,保障了精確一次(exactly-once)語義。那么業(yè)界的其他分布式計(jì)算框架,如 spark streaming、storm 它們的容錯思路與 Flink 相同嗎?容錯到底是在做什么呢?本篇將從更高的角度了解容錯機(jī)制,深入理解容錯的本質(zhì)和核心思想。
#2 主流分布式實(shí)時計(jì)算框架是如何進(jìn)行容錯的?
目前業(yè)界主流的實(shí)時計(jì)算框架包括 Flink、Spark Streaming、Storm。相比于 Batch,Stream 的容錯則需要考慮更多。Batch 數(shù)據(jù)通?;诜€(wěn)定性較高的分布式存儲進(jìn)行數(shù)據(jù)的讀寫(如 HDFS、S3 等),當(dāng)數(shù)據(jù)計(jì)算出現(xiàn)異常時可以通過重新計(jì)算的方式保證最終結(jié)果的一致性,Spark 就是基于這樣的思路設(shè)計(jì)的,它通過 lineage 機(jī)制來重新計(jì)算。并且 Batch 計(jì)算往往不需要過多的考慮數(shù)據(jù)的時效性,而且不需要做到 7×24 小時的運(yùn)行。但相對于 Stream 而言則會更加復(fù)雜。
對于 Stream 而言需要面對不同的流式數(shù)據(jù)源,可以是 File Stream、隊(duì)列(如 Kafka),甚至可能是某個服務(wù)發(fā)來的消息。數(shù)據(jù)源的多樣性就注定了 Stream 的容錯需要重新進(jìn)行考慮。并且 Stream 數(shù)據(jù)的容錯需要在短時間內(nèi)進(jìn)行恢復(fù),否則將可能會導(dǎo)致大量的數(shù)據(jù)積壓甚至丟失,因?yàn)?Stream 數(shù)據(jù)鏈路不會因?yàn)橄掠翁幚砣蝿?wù)的異常而停止數(shù)據(jù)的產(chǎn)出。
讓我們將問題描述的更具體一些,方便更清楚的了解 Stream 的容錯思想。對于分布式計(jì)算而言,目前主流的思路都是采用 Master-Slave 架構(gòu)。Master 主要用于進(jìn)行 Slave 節(jié)點(diǎn)的管理(比如檢測 Slave 是否存活,狀態(tài)管理等),Slave 主要是擔(dān)當(dāng)數(shù)據(jù)計(jì)算的職責(zé)。因此,從容錯角度而言分為:
- Master 節(jié)點(diǎn)容錯(如 Flink JobManager、Spark Streaming Driver)
- Slave 節(jié)點(diǎn)容錯(如 Flink TaskManager、Spark Streaming Executor)
Master 節(jié)點(diǎn)容錯
Master 容錯相對而言較為簡單,因?yàn)椴恍枰苯訁⑴c數(shù)據(jù)計(jì)算。主要分為有狀態(tài)的 Master 和無狀態(tài)的 Master 兩類。
像 Storm 這類無狀態(tài)的實(shí)時計(jì)算框架,Master(即 Storm 的 Nimbus 節(jié)點(diǎn))的異常往往不影響 Slave(即 Storm worker 節(jié)點(diǎn))的數(shù)據(jù)計(jì)算,只需要重新啟動一個 Master 即可,這個過程中不需要進(jìn)行 Master 狀態(tài)的恢復(fù),也不會影響實(shí)時數(shù)據(jù)的處理。甚至 Slave 節(jié)點(diǎn)在無感知的情況下就完成了 Master 的恢復(fù)。但是這種方式會犧牲一定的功能,實(shí)時計(jì)算框架本身無法支持狀態(tài)流的處理。
像 Flink、Spark Streaming 這類包含狀態(tài)的實(shí)時計(jì)算框架,需要恢復(fù) Master 節(jié)點(diǎn)的同時還需要對其狀態(tài)進(jìn)行恢復(fù),Master 狀態(tài)信息包含一些必要的配置、以及對 Slave 節(jié)點(diǎn)狀態(tài)管理的信息(如“某個 Slave 節(jié)點(diǎn)的狀態(tài)快照所在的 HDFS 路徑”)。Spark Streaming、Flink 的做法都是基于 checkpoint 機(jī)制對 Master 節(jié)點(diǎn)的狀態(tài)進(jìn)行備份,異常發(fā)生時需要基于上一次的狀態(tài)備份進(jìn)行恢復(fù)。
Flink 還提供了 HA 機(jī)制,即同時運(yùn)行至少 2 個 JobManager 節(jié)點(diǎn),但是只有其中一個真正的處理管理事務(wù)(稱為主節(jié)點(diǎn)——Leader),其他的僅僅保持狀態(tài)信息的同步(稱為從節(jié)點(diǎn)——Standby)。一旦 Leader 發(fā)生異常,其中一個 Standby 將會代替異常節(jié)點(diǎn)繼續(xù)進(jìn)行任務(wù)的管理。更多關(guān)于 Flink HA 可以參考官方文檔。這種機(jī)制是犧牲更多的資源來換取任務(wù)的穩(wěn)定性,主從切換的成本相比于從狀態(tài)備份中恢復(fù)速度更快。
Slave 節(jié)點(diǎn)容錯
Stream 數(shù)據(jù)處理整體而言可以分為 3 部分:
- 數(shù)據(jù)接收:從數(shù)據(jù)源獲取數(shù)據(jù),如 Flink Source
- 數(shù)據(jù)處理:對數(shù)據(jù)進(jìn)行計(jì)算和處理,如 Flink Transform 算子
- 數(shù)據(jù)寫入:將數(shù)據(jù)寫入外部系統(tǒng)中,如 Flink Sink
根據(jù)不同的保障級別,Stream 數(shù)據(jù)容錯級別又分為 3 種語義:
- at most once:最多一次,數(shù)據(jù)最多被處理一次,即允許數(shù)據(jù)的丟失
- at least once:最少一次,數(shù)據(jù)最少被處理一次,該語義要比 at most once 保障級別更高,因?yàn)樗辉试S數(shù)據(jù)丟失,但是數(shù)據(jù)可能發(fā)生重復(fù)
- exactly once:精確一次,即數(shù)據(jù)僅被處理一次。該語義是最高級別的語義,它不允許數(shù)據(jù)的丟失,也不允許數(shù)據(jù)被重復(fù)的處理
我們之所以將數(shù)據(jù)接收和寫入單獨(dú)拿出來,是因?yàn)樵诿鎸Σ煌臄?shù)據(jù)源時,實(shí)時框架的容錯機(jī)制與最高語義保障級別是不同的。如 Flink 而言,它的 exactly-once 語義總的來說是針對于數(shù)據(jù)處理階段而言,即只有框架內(nèi)數(shù)據(jù)的處理可以保障 exactly-once,而數(shù)據(jù)的接收、寫入是否是 exactly-once 語義取決于數(shù)據(jù)源本身與 Source、Sink 算子的實(shí)現(xiàn)邏輯。通常來說,我們將能夠保障數(shù)據(jù)接收、數(shù)據(jù)處理、數(shù)據(jù)接入整體數(shù)據(jù)一致性稱為端到端(end-to-end)的數(shù)據(jù)一致性。
端到端的數(shù)據(jù)一致性保障相對而言是很復(fù)雜的,因?yàn)閿?shù)據(jù)源的種類眾多,這些一般都不是分布式實(shí)時框架中的一部分,數(shù)據(jù)的發(fā)送與接收邏輯不受實(shí)時框架的控制。
對于 Storm 而言,框架內(nèi)僅提供了相關(guān)的接口用于用戶自己實(shí)現(xiàn)一致性語義,并沒有直接提供各種存儲的一致性 Spouts,數(shù)據(jù)寫入也是如此。數(shù)據(jù)處理過程提供 at-least-once 語義保障(exactly-once 語義由 Storm Trident 提供了保障,本篇暫不做討論)。Storm 通過 ACK 的機(jī)制保證 at-least-once 語義。簡單來說,當(dāng) Storm 接收到一條數(shù)據(jù)后將會給這條數(shù)據(jù)唯一的 id,數(shù)據(jù)被下游 Bolts 處理但是處理后的 id 不會發(fā)生改變,當(dāng)且僅當(dāng)該 id 的數(shù)據(jù)經(jīng)過的 Bolts 全部 ACK 后才認(rèn)定該數(shù)據(jù)被徹底處理(fully processed),否則 Spout 將再次發(fā)送該數(shù)據(jù)直到數(shù)據(jù)被徹底處理。
Spark Streaming 的數(shù)據(jù)接收通過預(yù)寫入日志的機(jī)制保障了 at-least-once 語義。簡單來說,就是將接收到的數(shù)據(jù)以日志的形式寫入到穩(wěn)定的存儲中(存儲位置基于 checkpoint 配置獲?。?,這樣一來就與數(shù)據(jù)源解耦,可以基于預(yù)寫入日志實(shí)現(xiàn)數(shù)據(jù)重發(fā)的能力,從而保障 at-least-once 語義。數(shù)據(jù)處理過程中基于 RDD 的容錯機(jī)制進(jìn)行恢復(fù),提供了精確一次的語義。數(shù)據(jù)寫入需要用戶自己實(shí)現(xiàn),Spark Streaming 提供了兩種思路:冪等寫入和事務(wù)性寫入。
Flink 全局基于 checkpoint 進(jìn)行容錯,通過向流數(shù)據(jù)中插入特殊的事件——checkpoint barrier 來觸發(fā)各個算子制作狀態(tài)快照,快照會寫入到持久化的存儲中。Flink Source、Sink 的語義保障需要依賴數(shù)據(jù)源以及自身的實(shí)現(xiàn)邏輯。但是 Flink 提供了多種狀態(tài)接口,如 ListState、MapState,用于進(jìn)行算子狀態(tài)的記錄,狀態(tài)容錯可以保障 exactly-once 語義。這也是與 Spark Streaming 的不同之處。
容錯的本質(zhì)和核心思想
到這里我們大致了解了各個框架的容錯機(jī)制,我們不禁想回味一下:分布式實(shí)時計(jì)算框架的容錯機(jī)制的本質(zhì)是什么?容錯到底在保障什么?
從本質(zhì)上講,容錯在保障數(shù)據(jù)可以被正確的處理,即使在發(fā)生異常的情況下。實(shí)時流處理的正確性又體現(xiàn)在處理過程的完整性與時序的正確性。即一條數(shù)據(jù)要被所有的邏輯完整的處理一次(根據(jù)語義的不同也可能是多次),且多條數(shù)據(jù)之間的處理的時序不發(fā)生改變。
舉個例子,如下圖所示的數(shù)據(jù)流 DAG 圖中,流數(shù)據(jù)序列 [1, 2, 3, …, n] 被輸入到 A 中,然后最終流向 D。完整性即每一個事件都被完整的 DAG 路徑處理,即 A -> B -> D 或 A -> C -> D,時序性即事件 1 永遠(yuǎn)先于事件 2 被處理,即使在發(fā)生了異常后恢復(fù)的情況下也是如此。

從整體來看,實(shí)時分布式計(jì)算框架的容錯機(jī)制核心思想是確認(rèn)與重試,但是不同的框架重試過程中回滾的數(shù)據(jù)量不同。
Storm 通過 ACK 機(jī)制判斷數(shù)據(jù)是否完整處理,否則將重發(fā)數(shù)據(jù),重新進(jìn)行計(jì)算。這種單條數(shù)據(jù)粒度的 ACK 與重試機(jī)制即可以保障時序性,也可以保障處理過程的完整性。但是這樣細(xì)的粒度犧牲了一定的性能。Storm 并沒有將數(shù)據(jù)流進(jìn)行冗余存儲來保障容錯,從這個角度而言它的容錯是輕量級的。
Spark Streaming 通過微批次的方式將數(shù)據(jù)進(jìn)行截?cái)?,以批次為單位進(jìn)行容錯。這種方式一旦發(fā)生了異常,可以從上一個批次中恢復(fù)繼續(xù)執(zhí)行。這種機(jī)制從一定程度上提升了性能,但是對時效性有損。因?yàn)槲⑴蔚乃悸穼?shù)據(jù)流進(jìn)行了截?cái)?,時間語義上的單位時間也只能根據(jù)批次的大小來界定。Spark Streaming 提供了數(shù)據(jù)流的冗余(預(yù)寫入日志)可以真正做到與數(shù)據(jù)源解耦,對于所有的數(shù)據(jù)源均可以保障容錯的語義,但是這類的容錯是重量級的。
Flink 的思路也是對數(shù)據(jù)進(jìn)行截?cái)?,從而可以分段治之。相比?Spark Streaming 而言這種截?cái)嗖]有改變數(shù)據(jù)流的連續(xù)性,時間語義上的單位時間仍然是以事件粒度來界定。并且 Flink 不會對數(shù)據(jù)流進(jìn)行冗余(雖然 unaligned-checkpoint 會產(chǎn)生一部分的數(shù)據(jù)冗余,但是與 Spark Streaming 這種全部數(shù)據(jù)冗余的思路是不同的),只關(guān)注計(jì)算中的狀態(tài)容錯。這種思路較為輕量級,并且能夠保障 exactly-once 語義。但是這種思路無法應(yīng)對所有的數(shù)據(jù)源場景,需要強(qiáng)依賴數(shù)據(jù)源的實(shí)現(xiàn)與 Source、Sink 算子的邏輯。
總結(jié)
| 框架 | 截?cái)嗔6?/th> | 最高語義支持 | 端到端語義 |
|---|---|---|---|
| Storm | 單條數(shù)據(jù) | at-least-once | 依賴數(shù)據(jù)源和算子具體實(shí)現(xiàn) |
| Spark Streaming | 微批次數(shù)據(jù),影響數(shù)據(jù)連續(xù)性 | at-least-once | at-least-once |
| Flink | 微批次數(shù)據(jù),不影響數(shù)據(jù)連續(xù)性 | exactly-once | 依賴數(shù)據(jù)源和算子具體實(shí)現(xiàn) |
總體而言,實(shí)時流的容錯核心是基于數(shù)據(jù)截?cái)?/strong>和重試機(jī)制。Storm 的數(shù)據(jù)截?cái)嗔6仁菃螚l數(shù)據(jù)級別的,通過 ACK 的機(jī)制實(shí)現(xiàn)的重試機(jī)制,此截?cái)嗔6炔粫绊憯?shù)據(jù)的時效性。Spark Streaming 的截?cái)嗔6仁俏⑴蔚?,截?cái)鄷绊憯?shù)據(jù)的時效性,然后通過數(shù)據(jù)冗余的方式保障了重試機(jī)制,這種冗余數(shù)據(jù)的方式可以面對任何數(shù)據(jù)源時都能夠保證數(shù)據(jù)一致性。Flink 是基于 checkpoint barrier 將數(shù)據(jù)流截?cái)?,barrier 會隨著數(shù)據(jù)流而流動,避免了流量截?cái)鄮淼臅r效性影響,并且 Flink 容錯只關(guān)注狀態(tài),借助狀態(tài)的回滾來保證數(shù)據(jù)一致性。
從容錯實(shí)現(xiàn)來看,三種框架的側(cè)重點(diǎn)有所不同。Storm 作為無狀態(tài)計(jì)算框架,采用的是非常簡單有效的機(jī)制保障容錯。Spark Streaming 更注重?cái)?shù)據(jù)的可恢復(fù)性,希望通過備份原始數(shù)據(jù)能夠在任何情況下、面對任何數(shù)據(jù)源都能夠保證數(shù)據(jù)一致性。Flink 相對而言更加輕量,更注重?cái)?shù)據(jù)的時效性,不希望容錯機(jī)制帶來太多的時效性損失(例如 unaligned-checkpoint)。
感謝你讀到這里,希望你現(xiàn)在對 Flink 容錯機(jī)制和其他的實(shí)時計(jì)算框架的容錯機(jī)制有了一個基本的了解,對其容錯思路和本質(zhì)有了不同的想法。下一篇我們將討論 Flink checkpoint 的數(shù)據(jù)結(jié)構(gòu),探索它究竟是如何存儲的?都存儲了哪些內(nèi)容?基于這些備份數(shù)據(jù)如何在異常中恢復(fù)?