什么是Exactly-Once一致性語義
Apache Spark的Exactly-once機制
Apache Flink的Exactly-once機制
Exactly-Once一致性語義
當任意條數(shù)據(jù)流轉(zhuǎn)到某分布式系統(tǒng)中,如果系統(tǒng)在整個處理過程中對該任意條數(shù)據(jù)都僅精確處理一次,且處理結(jié)果正確,則被認為該系統(tǒng)滿足Exactly-Once一致性。
以上僅是我個人對Exactly-once一致性語義的解釋,相較于官方定義,顯得更加通俗點,主要方便大家的理解。正如我的解釋中描述的場景,在數(shù)據(jù)分析過程中需要滿足精確一次處理的條件,這對于很多分布式多系統(tǒng)來說其實是個很大的考驗。
因為分布式系統(tǒng)天生具有跨網(wǎng)絡、多節(jié)點、高并發(fā)、高可用等特性,難免會出現(xiàn)節(jié)點異常、線程死亡、網(wǎng)絡傳輸失敗、并發(fā)阻塞等非可控情況,從而導致數(shù)據(jù)丟失、重復發(fā)送、多次處理等異常接踵而至。如何保持系統(tǒng)高效運行且數(shù)據(jù)僅被精確處理一次是很大的挑戰(zhàn)。
分布式系統(tǒng)Exactly-Once的一致性保障,不是依靠某個環(huán)節(jié)的強一致性,而是要求系統(tǒng)的全流程均保持Exactly-Once一致性??!
Apache Spark的Exactly-Once機制
Apache Spark是一個高性能、內(nèi)存級的分布式計算框架,在大數(shù)據(jù)領域中被廣泛應用于離線分析、實時計算、數(shù)據(jù)挖掘等場景,因其采用獨特的RDD數(shù)據(jù)模型及內(nèi)存式計算,是海量數(shù)據(jù)分析和計算的利器之一。
實時場景下,Spark在整個流式處理中如何保證Exactly-Once一致性是重中之重。這需要整個系統(tǒng)的各環(huán)節(jié)均保持強一致性,包括可靠的數(shù)據(jù)源端(數(shù)據(jù)可重復讀取、不丟失) 、可靠的消費端(Spark內(nèi)部精確一次消費)、可靠的輸出端(冪等性、事務)。
1. 數(shù)據(jù)源端
** 支持可靠的數(shù)據(jù)源接入(例如Kafka), 源數(shù)據(jù)可重讀 **
- Spark Streaming內(nèi)置的Kafka Direct API (KafkaUtils.createDirectStream)。實現(xiàn)精確Exactly-Once一致性語義。
Spark Streaming 自己管理offset(手動提交offset),并保持到checkpoint中
Spark Streaming此時直接調(diào)用Kafka Consumer的API,自己管理維護offset(包括同步提交offset、保存checkpoint),所以即使在重啟情況下數(shù)據(jù)也不會重復。
val ssc = new StreamingContext(sc, Seconds(5))
// val topics = Map("spark" -> 2)
val kafkaParams = Map[String,String]{
"bootstrap.servers" -> "m1:9092,m2:9092,m3:9092",
"group.id" -> "spark",
"auto.offset.reset" -> "smallest"
}
// 直連方式拉取數(shù)據(jù),這種方式不會修改數(shù)據(jù)的偏移量,需要手動的更新
val lines = kafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams)
Driver進程保持與Kafka通信,定期獲取最新offset range范圍,Executor進程根據(jù)offset range拉取kafka消息。因為Kafka本身offset就具有唯一特性,且Spark Streaming此時作為唯一的消費者,故全過程保持Exactly-once的一致性狀態(tài)。注意: 如果程序崩潰,整個流可能會從earliest/latest處恢復重讀,需考慮其他后續(xù)處理
(Spark-Kafka Direct API 流程示意圖)
- Spark Streaming 基于Receiver的Kafka高級API,實現(xiàn)At least Once語義
基于Spark Streaming的Receiver模式,在Executor持續(xù)拉取kafka數(shù)據(jù)流
kafka數(shù)據(jù)存儲到Executor內(nèi)存和WAL(預寫日志)中
WAL(預先日志)寫入完成后,自動更新offset至zookeeper上
利用Spark本身的Receivers線程接收數(shù)據(jù),內(nèi)部調(diào)用Kafka高級消費API,不斷觸發(fā)batch消息拉取。獲取的kafka數(shù)據(jù)在Executor本地存儲,也可以啟用WAL預寫文件,將數(shù)據(jù)存儲到第三方介質(zhì)(HDFS)中。
val sparkConf = new SparkConf().setAppName("kafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")
val topicMap = topics.split(",").map((_, numThreamds, toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
此過程僅可實現(xiàn)At least once(至少一次)*,也就是說數(shù)據(jù)可能會被重復讀取。即使理論上WAL機制可確保數(shù)據(jù)不丟失, 但是會存在消息寫入WAL完成,但因其他原因無法及時更新offset至zookeeper的情況。此時kafka會重新發(fā)送offset,造成數(shù)據(jù)在Executor中多存儲一份。
(Spark-Kafka 高級消費***者API 流程***示意圖)
- 總結(jié)
(1) 高級消費者API需要啟用Receiver線程消費Kafka數(shù)據(jù),相較于第一種增加了開銷,且無法直接實現(xiàn)并行讀取,需要使用多個Kafka Dtstream 消費同一組然后union。
(2) 高級消費API在Executor本地和WAL存儲兩份數(shù)據(jù)<開啟WAL不丟失機制>,而第一種Direct API僅在Executor中存儲數(shù)據(jù)<offset存儲到checkpoint中>
(3) 基于Kafka Direct API的方式,因Spark集成Kafka API直接管理offset,同時依托于Kafka自身特性,實現(xiàn)了Exactly-Once一致性語義。因此在生產(chǎn)中建議使用此種方式??!
2. Spark消費端
Spark的基本數(shù)據(jù)單元是一種被稱作是RDD(分布式彈性數(shù)據(jù)集)的數(shù)據(jù)結(jié)構(gòu),Spark內(nèi)部程序通過對RDD的進行一系列的transform和action操作,完成數(shù)據(jù)的分析處理。
基于RDD內(nèi)存模型,啟用多種一致性策略,實現(xiàn)Exactly-Once一致性。
-
RDD特性
(1) Spark的RDD是分布式、容錯、不可變的數(shù)據(jù)集。其本身是只讀的,不存儲真實的數(shù)據(jù),當結(jié)構(gòu)更新或者丟失時可對RDD進行重建,RDD不會發(fā)生變化。
(2) 每個RDD都會有自己的Dependency RDD,即RDD的血脈機制。在開啟 Checkpoint機制下,可以將RDD依賴保存到HDFS中。當RDD丟失或者程序出現(xiàn)問題,可以快速從血緣關系中恢復。因為記錄了RDD的所有依賴過程,通過血脈追溯可重構(gòu)計算過程且保證多次計算結(jié)果相同。
-
Checkpoint持久化機制 + WAL機制
(1) Spark的Checkpoint機制會在當前job執(zhí)行完成后,再重新啟動一個job,將程序中需要Checkpoint的RDD標記為MarkedForCheckpoint RDD, 且重新執(zhí)行一遍RDD前面的依賴,完成后將結(jié)果保存到checkpoint中,并刪除原先Dependency RDD依賴的血緣關系。同時可以將此次Checkpoint結(jié)果持久化到緩存中,便于后期快速恢復。利用Checkpoint的特性和高可用存儲,保證RDD數(shù)據(jù)結(jié)果不丟失。
(2) 啟用WAL預寫文件機制。如果存在Driver或者Executor異常掛掉的場景,RDD結(jié)果或者jobs信息就會丟失,因此很有必要將此類信息持久化到WAL預寫日志中,通過對元數(shù)據(jù)和中間數(shù)據(jù)存儲備份,WAL機制可以防止數(shù)據(jù)丟失且提供數(shù)據(jù)恢復功能。
程序代碼去重
如果實時流進入到Spark消費端已經(jīng)存在重復數(shù)據(jù),可以編寫Spark程序代碼進行****去重操作,實現(xiàn)Exactly-Once一致性。
(1) 內(nèi)存去重。采用Hashset等數(shù)據(jù)結(jié)構(gòu),讀取數(shù)據(jù)中類似主鍵等唯一性標識字段,在內(nèi)存中存儲并進行去重判斷。
(2) 使用Redis Key去重。借助Redis的Hset等特殊數(shù)據(jù)類型,自動完成Key去重。
(3) DataFrame/SQL場景,使用group by/ over() window開窗等SQL函數(shù)去重
(4) 利用groupByKey等聚合算子去重
(5) 其他方法。。
3. 輸出端
輸出端保持Exactly-Once一致性,其輸出源需要滿足一定條件:
支持冪等寫入、事務寫入機制
- 冪等寫入
首先解釋一下冪等性,先看下百度百科上的定義:“ 冪等是一個數(shù)學與計算機學概念,常見于抽象代數(shù)中。在編程中一個冪等操作的特點******是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。*****”*
結(jié)合語義可知,冪等寫入就是多次寫入會產(chǎn)生相同的結(jié)果,結(jié)果具有不可變性。在Spark中saveAsTextFile算子就是一種比較典型的冪等寫入,也經(jīng)常被用來作為數(shù)據(jù)的輸出源。
此類型的寫入方式,如果在消息中包含唯一主鍵,那么即使源頭存在多條重復數(shù)據(jù),在主鍵約束條件下也不會重復寫入,從而實現(xiàn)Exactly-Once語義。
- 事務寫入
相信大家對事務的概念都不陌生,在一個處理過程中的所有操作均需要滿足一致性,即要不都發(fā)生,要不都不發(fā)生,常見于業(yè)務性、安全性要求比較高的場景,例如銀行卡賬戶金額存取行為等,具有原子性、一致性、隔離性、持久性等四大特征。Spark讀取Kafka數(shù)據(jù)需滿足輸出端的事務寫入,則一般需生成一個唯一ID(可由批次號、時間、分區(qū)、offset等組合),之后將該ID結(jié)合計算結(jié)果在同一個事務中寫入目標源,提交和寫入操作保持原子性,實現(xiàn)輸出端的Exactly-Once語義。