問題描述
我們最近有個(gè)項(xiàng)目,需要實(shí)時(shí)消費(fèi)訂單成單的消息,提煉完數(shù)據(jù)后把結(jié)果寫入HDFS,因此checkpointConfiguration 自然而然也采用默認(rèn)配置寫到HDFS中去(其實(shí)按照分布式集群模式的Spark,是必須配置到HDFS 兼容的目錄)
項(xiàng)目采用Spark Structured Streaming 2.4,啟用 window + watermark, 上游source 是Kafka,目前的watermark 是1分鐘,抓取消息并沒有采用triggerInterval,而是實(shí)時(shí)不斷抓取,因此在水位沒有到的時(shí)候一個(gè)Job 的平均duration 大概 1~3s,總共32個(gè)executors,也就是說期間將會(huì)有32個(gè)線程不斷去HDFS寫 checkpoint文件。
現(xiàn)在發(fā)現(xiàn)一個(gè)問題,就是:在平均1~3s 的作業(yè)當(dāng)中,時(shí)不時(shí)會(huì)有耗時(shí)20s+ 的作業(yè),查看結(jié)果并非是寫結(jié)果導(dǎo)致,而是會(huì)有個(gè)別的executor 寫checkpoint 到HDFS異常的慢,如下圖

當(dāng)我們開啟了hadoop包路徑的INFO日志級別時(shí),我們會(huì)看到類似的日志

從日志上很清洗能看到,卡頓源于一個(gè)executor 在往HDFS 的tmp 目錄寫checkpoint 文件
checkpoint 機(jī)制科普
我們都知道,Spark 2.x 的checkpoint 機(jī)制是Spark 在運(yùn)行過程中使用的一個(gè)中間狀態(tài)保存的機(jī)制,在啟用了諸如 window 功能時(shí),是必須開啟的,其歸根的實(shí)現(xiàn)是Spark StateStore;
它主要實(shí)現(xiàn)了2中很重要的功能
- 它需要保存每次RDD轉(zhuǎn)換時(shí)的中間狀態(tài)數(shù)據(jù),官方的名詞叫
增量式持續(xù)查詢的實(shí)現(xiàn)簡單描述就是一個(gè)流式處理,必須標(biāo)注每個(gè)executor當(dāng)前處理到哪里了?必須把這個(gè)中間的結(jié)果先保存了(根據(jù)不同的配置可能是一個(gè) 內(nèi)存+file 的組合) - 每個(gè)executor當(dāng)前完成作業(yè)的進(jìn)度,比如一個(gè)executor 掛了,它需要從這個(gè)地方來恢復(fù)

在這里我打算詳細(xì)介紹Spark 的StateStore了,關(guān)于它的設(shè)計(jì)機(jī)制和接口,我推薦大家閱讀這篇介紹
Structured Streaming 之狀態(tài)存儲(chǔ)解析
那好,我們現(xiàn)在只需要簡單知道,Spark 在處理流式程序的時(shí)候,它是需要有個(gè)地方來寫這個(gè)checkpoint 信息的,并且是每個(gè)executor并發(fā)來寫,那我們現(xiàn)在來研究一下它的文件長什么樣,我們回到Spark 的官方文檔,去看看checkpointing章節(jié)
http://spark.apache.org/docs/latest/streaming-programming-guide.html#checkpointing
從官方文檔介紹看,一類文件名叫 metadata,主要是記錄了一些 physical plan的一些信息,字面理解就是萬一executor 掛了,它重啟后需要從一個(gè)地方恢復(fù) driver需要它干些什么活,而另外一類就是 delta 文件,這就是上面說的每次job 的中間RDD 的中間結(jié)果數(shù)據(jù)。
問題分析
那我們可以直接去HDFS上看看我們寫的這些 文件

題外話這里再科普一下 snapshot 和delta的區(qū)別,spark會(huì)每個(gè)job每個(gè)executor都寫一個(gè)delta,而到一定個(gè)數(shù)個(gè)delta就會(huì)匯總成一個(gè)snapshot,恢復(fù)的時(shí)候會(huì)先找snapshot,再匯聚這個(gè)snapshot之后的delta,這樣來恢復(fù)一個(gè)現(xiàn)場
從這個(gè)圖上可以看到目前我們的程序的幾個(gè)特點(diǎn)
- 現(xiàn)在每個(gè)executor 寫的數(shù)據(jù)其實(shí)很小,最大不過幾十K,但是HDFS block 是128M,這樣32個(gè)線程算下來,小文件是挺多的
- 寫入比較頻繁,平均每3s就寫一次了
其實(shí)對于一個(gè)繁忙的大HDFS集群來說,這兩點(diǎn)都不是一個(gè)好事情
解決的一些思路
雖然我目前并無實(shí)質(zhì)解決這個(gè)抖動(dòng)問題,但是從分析來看我們會(huì)有一些思路,也在這里分享出來一起討論:
- 首先,我覺得這應(yīng)該不是寫入文件的瓶頸,因?yàn)榫退闳绾晤l繁,而且除了我們業(yè)務(wù),還有非常多的業(yè)務(wù)同樣有寫入,如果我們寫入都瓶頸那么整個(gè)集群早就到瓶頸了。那我先猜測卡頓應(yīng)該是發(fā)生在 name node 的RPC 調(diào)用,是否在同一個(gè) hdfs client 下對于并發(fā)調(diào)用是會(huì)有一定的資源競爭,那么我嘗試去調(diào)大下面的參數(shù)
'dfs.namenode.handler.count': '64'
'dfs.datanode.handler.count': '64'
發(fā)現(xiàn)效果不明顯
我們的業(yè)務(wù)其實(shí)對數(shù)據(jù)的
有且僅有一次消費(fèi)保障要求不明顯,因?yàn)槲覀兦逑措娚唐脚_(tái)的成單消息只是對里面的一些收益做一些粗略預(yù)算,就算中間掛掉機(jī)器,這部分?jǐn)?shù)據(jù)我們是可以忽略,況且這種情況風(fēng)險(xiǎn)還是非常非常低,對比的話我們希望的是去掉這種寫HDFS的延遲而盡可能加大消費(fèi)的速度,所以第二個(gè)思路我們有想過寫入宿主機(jī)mount的本地目錄(我們的spark 已經(jīng)上了k8s集群);這個(gè)方案我覺得在對數(shù)據(jù)不敏感的流式處理場景應(yīng)該效率還是蠻高的,比如我們一個(gè)宿主機(jī)大概可以處理超過一般的executors,當(dāng)executor 掛掉的時(shí)候,就算分配不到同樣的宿主機(jī),應(yīng)該也是有機(jī)制可以獲取到mount的目錄(我對k8s了解不深,這只是我的猜測)最壞的結(jié)果是我不需要這批次的數(shù)據(jù)了。把整個(gè)checkpoint 機(jī)制寫到外部存儲(chǔ)去,其實(shí)這樣對整個(gè)計(jì)算環(huán)境也是有好處的,比如節(jié)省大量的內(nèi)存,加個(gè)寫的速度等等,我永遠(yuǎn)相信,我肯定不是第一個(gè)吐糟這個(gè)機(jī)制的人,總有人比你吐糟的早,解決的早,因此還是有不少的人開源了一些實(shí)現(xiàn)機(jī)制,比如下面兄弟實(shí)現(xiàn)了寫入 rocketDB的一個(gè)方案
- 終極大招,自己去實(shí)現(xiàn)
--conf spark.sql.streaming.stateStore.providerClass來覆蓋Spark的默認(rèn)實(shí)現(xiàn)org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider
上面的第3個(gè)點(diǎn)其實(shí)就是自己實(shí)現(xiàn)了一套
--conf spark.sql.streaming.stateStore.providerClass="ru.chermenin.spark.sql.execution.streaming.state.RocksDbStateStoreProvider"
只要照著這個(gè)來做一個(gè),在spark啟動(dòng)是指定這個(gè)實(shí)現(xiàn),即可搞一套完全自定義的checkpoint 機(jī)制,里面的具體接口大家可以直接看源代碼,或者參考我上面的那篇文章,其實(shí)寫的非常詳細(xì)
好了,目前我只想到這幾種思路,如果大家也有遇到同樣的問題,或者你已經(jīng)有更好的辦法的,歡迎推薦。
