Spark——Exactly-Once

  • 什么是Exactly-Once一致性語義

  • Apache Spark的Exactly-once機制

  • Apache Flink的Exactly-once機制

Exactly-Once一致性語義

當任意條數(shù)據(jù)流轉(zhuǎn)到某分布式系統(tǒng)中,如果系統(tǒng)在整個處理過程中對該任意條數(shù)據(jù)都僅精確處理一次,且處理結(jié)果正確,則被認為該系統(tǒng)滿足Exactly-Once一致性。

以上僅是我個人對Exactly-once一致性語義的解釋,相較于官方定義,顯得更加通俗點,主要方便大家的理解。正如我的解釋中描述的場景,在數(shù)據(jù)分析過程中需要滿足精確一次處理的條件,這對于很多分布式多系統(tǒng)來說其實是個很大的考驗。

因為分布式系統(tǒng)天生具有跨網(wǎng)絡、多節(jié)點、高并發(fā)、高可用等特性,難免會出現(xiàn)節(jié)點異常、線程死亡、網(wǎng)絡傳輸失敗、并發(fā)阻塞等非可控情況,從而導致數(shù)據(jù)丟失、重復發(fā)送、多次處理等異常接踵而至。如何保持系統(tǒng)高效運行且數(shù)據(jù)僅被精確處理一次是很大的挑戰(zhàn)。

分布式系統(tǒng)Exactly-Once的一致性保障,不是依靠某個環(huán)節(jié)的強一致性,而是要求系統(tǒng)的全流程均保持Exactly-Once一致性??!

Apache Spark的Exactly-Once機制

Apache Spark是一個高性能、內(nèi)存級的分布式計算框架,在大數(shù)據(jù)領域中被廣泛應用于離線分析、實時計算、數(shù)據(jù)挖掘等場景,因其采用獨特的RDD數(shù)據(jù)模型及內(nèi)存式計算,是海量數(shù)據(jù)分析和計算的利器之一。

實時場景下,Spark在整個流式處理中如何保證Exactly-Once一致性是重中之重。這需要整個系統(tǒng)的各環(huán)節(jié)均保持強一致性,包括可靠的數(shù)據(jù)源端(數(shù)據(jù)可重復讀取、不丟失) 、可靠的消費端(Spark內(nèi)部精確一次消費)、可靠的輸出端(冪等性、事務)。

1. 數(shù)據(jù)源端

** 支持可靠的數(shù)據(jù)源接入(例如Kafka), 源數(shù)據(jù)可重讀 **

  • Spark Streaming內(nèi)置的Kafka Direct API (KafkaUtils.createDirectStream)。實現(xiàn)精確Exactly-Once一致性語義。
Spark Streaming 自己管理offset(手動提交offset),并保持到checkpoint中

Spark Streaming此時直接調(diào)用Kafka Consumer的API,自己管理維護offset(包括同步提交offset、保存checkpoint),所以即使在重啟情況下數(shù)據(jù)也不會重復。

val ssc = new StreamingContext(sc, Seconds(5))

// val topics = Map("spark" -> 2)

val kafkaParams = Map[String,String]{
  "bootstrap.servers" -> "m1:9092,m2:9092,m3:9092",
  "group.id" -> "spark",
  "auto.offset.reset" -> "smallest"
}
// 直連方式拉取數(shù)據(jù),這種方式不會修改數(shù)據(jù)的偏移量,需要手動的更新
val lines = kafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams)

Driver進程保持與Kafka通信,定期獲取最新offset range范圍,Executor進程根據(jù)offset range拉取kafka消息。因為Kafka本身offset就具有唯一特性,且Spark Streaming此時作為唯一的消費者,故全過程保持Exactly-once的一致性狀態(tài)。注意: 如果程序崩潰,整個流可能會從earliest/latest處恢復重讀,需考慮其他后續(xù)處理

圖片
                                       (Spark-Kafka Direct API 流程示意圖)
  • Spark Streaming 基于Receiver的Kafka高級API,實現(xiàn)At least Once語義
基于Spark Streaming的Receiver模式,在Executor持續(xù)拉取kafka數(shù)據(jù)流
kafka數(shù)據(jù)存儲到Executor內(nèi)存和WAL(預寫日志)中
WAL(預先日志)寫入完成后,自動更新offset至zookeeper上

利用Spark本身的Receivers線程接收數(shù)據(jù),內(nèi)部調(diào)用Kafka高級消費API,不斷觸發(fā)batch消息拉取。獲取的kafka數(shù)據(jù)在Executor本地存儲,也可以啟用WAL預寫文件,將數(shù)據(jù)存儲到第三方介質(zhì)(HDFS)中。

val sparkConf = new SparkConf().setAppName("kafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

val topicMap = topics.split(",").map((_, numThreamds, toInt)).toMap
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

此過程僅可實現(xiàn)At least once(至少一次)*,也就是說數(shù)據(jù)可能會被重復讀取。即使理論上WAL機制可確保數(shù)據(jù)不丟失, 但是會存在消息寫入WAL完成,但因其他原因無法及時更新offset至zookeeper的情況。此時kafka會重新發(fā)送offset,造成數(shù)據(jù)在Executor中多存儲一份。

圖片
                                (Spark-Kafka 高級消費***者API 流程***示意圖)
  • 總結(jié)

(1) 高級消費者API需要啟用Receiver線程消費Kafka數(shù)據(jù),相較于第一種增加了開銷,且無法直接實現(xiàn)并行讀取,需要使用多個Kafka Dtstream 消費同一組然后union。

(2) 高級消費API在Executor本地和WAL存儲兩份數(shù)據(jù)<開啟WAL不丟失機制>,而第一種Direct API僅在Executor中存儲數(shù)據(jù)<offset存儲到checkpoint中>

(3) 基于Kafka Direct API的方式,因Spark集成Kafka API直接管理offset,同時依托于Kafka自身特性,實現(xiàn)了Exactly-Once一致性語義。因此在生產(chǎn)中建議使用此種方式??!

2. Spark消費端

Spark的基本數(shù)據(jù)單元是一種被稱作是RDD(分布式彈性數(shù)據(jù)集)的數(shù)據(jù)結(jié)構(gòu),Spark內(nèi)部程序通過對RDD的進行一系列的transform和action操作,完成數(shù)據(jù)的分析處理。

基于RDD內(nèi)存模型,啟用多種一致性策略,實現(xiàn)Exactly-Once一致性。

  • RDD特性

    (1) Spark的RDD是分布式、容錯、不可變的數(shù)據(jù)集。其本身是只讀的,不存儲真實的數(shù)據(jù),當結(jié)構(gòu)更新或者丟失時可對RDD進行重建,RDD不會發(fā)生變化。

圖片

(2) 每個RDD都會有自己的Dependency RDD,即RDD的血脈機制。在開啟 Checkpoint機制下,可以將RDD依賴保存到HDFS中。當RDD丟失或者程序出現(xiàn)問題,可以快速從血緣關系中恢復。因為記錄了RDD的所有依賴過程,通過血脈追溯可重構(gòu)計算過程且保證多次計算結(jié)果相同

圖片

  • Checkpoint持久化機制 + WAL機制

    (1) Spark的Checkpoint機制會在當前job執(zhí)行完成后,再重新啟動一個job,將程序中需要Checkpoint的RDD標記為MarkedForCheckpoint RDD, 且重新執(zhí)行一遍RDD前面的依賴,完成后將結(jié)果保存到checkpoint中,并刪除原先Dependency RDD依賴的血緣關系。同時可以將此次Checkpoint結(jié)果持久化到緩存中,便于后期快速恢復。利用Checkpoint的特性和高可用存儲,保證RDD數(shù)據(jù)結(jié)果不丟失。

    (2) 啟用WAL預寫文件機制。如果存在Driver或者Executor異常掛掉的場景,RDD結(jié)果或者jobs信息就會丟失,因此很有必要將此類信息持久化到WAL預寫日志中,通過對元數(shù)據(jù)和中間數(shù)據(jù)存儲備份,WAL機制可以防止數(shù)據(jù)丟失且提供數(shù)據(jù)恢復功能

  • 程序代碼去重

如果實時流進入到Spark消費端已經(jīng)存在重復數(shù)據(jù),可以編寫Spark程序代碼進行****去重操作,實現(xiàn)Exactly-Once一致性。
(1) 內(nèi)存去重。采用Hashset等數(shù)據(jù)結(jié)構(gòu),讀取數(shù)據(jù)中類似主鍵等唯一性標識字段,在內(nèi)存中存儲并進行去重判斷。
(2) 使用Redis Key去重。借助Redis的Hset等特殊數(shù)據(jù)類型,自動完成Key去重。
(3) DataFrame/SQL場景,使用group by/ over() window開窗等SQL函數(shù)去重
(4) 利用groupByKey等聚合算子去重
(5) 其他方法。。

3. 輸出端

 輸出端保持Exactly-Once一致性,其輸出源需要滿足一定條件:

支持冪等寫入、事務寫入機制

  • 冪等寫入

首先解釋一下冪等性,先看下百度百科上的定義: 冪等是一個數(shù)學與計算機學概念,常見于抽象代數(shù)中。在編程中一個冪等操作的特點******是其任意多次執(zhí)行所產(chǎn)生的影響均與一次執(zhí)行的影響相同。*****”*

圖片

結(jié)合語義可知,冪等寫入就是多次寫入會產(chǎn)生相同的結(jié)果,結(jié)果具有不可變性。在Spark中saveAsTextFile算子就是一種比較典型的冪等寫入,也經(jīng)常被用來作為數(shù)據(jù)的輸出源。

此類型的寫入方式,如果在消息中包含唯一主鍵,那么即使源頭存在多條重復數(shù)據(jù),在主鍵約束條件下也不會重復寫入,從而實現(xiàn)Exactly-Once語義。

  • 事務寫入

相信大家對事務的概念都不陌生,在一個處理過程中的所有操作均需要滿足一致性,即要不都發(fā)生,要不都不發(fā)生,常見于業(yè)務性、安全性要求比較高的場景,例如銀行卡賬戶金額存取行為等,具有原子性、一致性、隔離性、持久性等四大特征。Spark讀取Kafka數(shù)據(jù)需滿足輸出端的事務寫入,則一般需生成一個唯一ID(可由批次號、時間、分區(qū)、offset等組合),之后將該ID結(jié)合計算結(jié)果在同一個事務中寫入目標源,提交和寫入操作保持原子性,實現(xiàn)輸出端的Exactly-Once語義。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容