本篇我們先從理論的角度聊聊在Spark Streaming集成Kafka時的offset狀態(tài)如何管理。
- spark streaming 版本 2.1
- kafka 版本0.9.0.0
在這之前,先重述下spark streaming里面管理偏移量的策略,默認(rèn)的spark streaming它自帶管理的offset的方式是通過checkpoint來記錄每個批次的狀態(tài)持久化到HDFS中,如果機器發(fā)生故障,或者程序故障停止,下次啟動時候,仍然可以從checkpoint的目錄中讀取故障時候rdd的狀態(tài),便能接著上次處理的數(shù)據(jù)繼續(xù)處理,但checkpoint方式最大的弊端是如果代碼升級,新版本的jar不能復(fù)用舊版本的序列化狀態(tài),導(dǎo)致兩個版本不能平滑過渡,結(jié)果就是要么丟數(shù)據(jù),要么數(shù)據(jù)重復(fù),所以官網(wǎng)搞的這個東西,幾乎沒有人敢在生產(chǎn)環(huán)境運行非常重要的流式項目。
所以比較通用的解決辦法就是自己寫代碼管理spark streaming集成kafka時的offset,自己寫代碼管理offset,其實就是把每批次offset存儲到一個外部的存儲系統(tǒng)里面包括(Hbase,HDFS,Zookeeper,Kafka,DB等等),不管用什么存儲系統(tǒng), 都需要考慮到三種時刻的offset的狀態(tài),否則offset的狀態(tài)不完整,就可能導(dǎo)致一些bug出現(xiàn)。
場景一:
當(dāng)一個新的spark streaming+kafka的流式項目第一次啟動的時候,這個時候發(fā)現(xiàn)外部的存儲系統(tǒng)并沒有記錄任何有關(guān)這個topic所有分區(qū)的偏移量,所以就從 KafkaUtils.createDirectStream直接創(chuàng)建InputStream流,默認(rèn)是從最新的偏移量消費,如果是第一次其實最新和最舊的偏移量時相等的都是0,然后在以后的每個批次中都會把最新的offset給存儲到外部存儲系統(tǒng)中,不斷的做更新。
場景二:
當(dāng)流式項目停止后再次啟動,會首先從外部存儲系統(tǒng)讀取是否記錄的有偏移量,如果有的話,就讀取這個偏移量,然后把偏移量集合傳入到KafkaUtils.createDirectStream中進(jìn)行構(gòu)建InputSteam,這樣的話就可以接著上次停止后的偏移量繼續(xù)處理,然后每個批次中仍然的不斷更新外部存儲系統(tǒng)的偏移量,這樣以來就能夠無縫銜接了,無論是故障停止還是升級應(yīng)用,都是透明的處理。
場景三:
對正在運行的一個spark streaming+kafka的流式項目,我們在程序運行期間增加了kafka的分區(qū)個數(shù),請注意:這個時候新增的分區(qū)是不能被正在運行的流式項目感應(yīng)到的,如果想要程序能夠識別新增的分區(qū),那么spark streaming應(yīng)用程序必須得重啟,同時如果你還使用的是自己寫代碼管理的offset就千萬要注意,對已經(jīng)存儲的分區(qū)偏移量,也要把新增的分區(qū)插入進(jìn)去,否則你運行的程序仍然讀取的是原來的分區(qū)偏移量,這樣就會丟失一部分?jǐn)?shù)據(jù)。
總結(jié):
如果自己管理kafka的偏移量,一定要注意上面的三個場景,如果考慮不全,就有可能出現(xiàn)詭異的問題。