Spark Structured Streaming 2.4 踩的一些坑

最近參與一個公司大數(shù)據(jù)項目開始入坑Spark,Spark從2.0開始從RDD 的底層API轉(zhuǎn)向了面向Dataset/Dataframe 的高級API,Spark Streaming 也換成了Structured Streaming,而我們用的是2.4,帶上了watermark 功能,對流媒體的處理算是圓滿了。

如果希望從框架結構方面了解1.x 和2.x 的差異,我比較推薦下面這個騰訊團隊寫的wiki

CoolplaySpark

另外下面這篇總結對于想了解2.4 特性也是非常不錯

《Spark Structured Streaming》 官方文檔解讀

而從實際的項目來說,實踐了一段時間,還是遇到了幾個坑,這里逐一分享

dropDuplicates 算子

這里我就不累贅舉例這個算子怎么用了,西面直接說說用這個算子的話小心3點

1) 如果你的項目是用了window機制的話,那么去重的語義一般可能就是說在同一個window下去重,這個語義是比較合理的,那么如果你是需要在同一個window下對數(shù)據(jù)去重的話,參數(shù)是提供window 列就可以了,注意有些人是提供 $timestamp 列是不太對的,但是還有一點要注意,調(diào)用這個算子并不是每次只操作一個window,而是hold在你內(nèi)存的所有消息,因此自己去衡量一下性能。我最終沒有用這個算子主要是因為下面這個點

2)dropDuplicates目前只能做keepFirst,而我們的業(yè)務需要可能需要做的是keepLask,目前我沒有發(fā)現(xiàn)有辦法可以實現(xiàn)keepLask。

3)看它源碼可以知道,其實里面就是做了一次的repartition + reduceByKey, 這里最最最要命的就是它需要shuffle,而我們的業(yè)務其實在每個kafka 的partition已經(jīng)做過一次了,這樣的話其實每個分區(qū)自己來做去重就好了,而Spark 又沒有諸如 dropDuplicatesWithinPartition() 的方法,因此該算子滿足不了需求,目前我還沒找到很完美的方法,配合用Apache Collection API 基本可以最少限度的破壞數(shù)據(jù)流處理的語義,可以參考下面的代碼

                todayDs.mapPartitions(it -> {
                    Set<Long> gidSet = new HashSet<>(3000);
                    return IteratorUtils.filteredIterator(it,
                            item -> {
                                if(gidSet.contains(item.getGlobalId())) {
                                    return false;
                                } else {
                                    gidSet.add(item.getGlobalId());
                                    return true;
                                }
                            });
                },
                ......

在Structured Streaming with window watermark 下如何提交consumed Kafka 的offset

可以說這個是我遇到最頭疼的問題,google了非常多的材料,并且也咨詢過一些大廠玩 spark streaming的團隊,也沒有一個比較好的方案,可能也是由于我們業(yè)務本身的特點,無法提供到一個冪等的sink() 語義,因此就使得很難找到一個合理的點去submit Kafka的 offset。

從Spark 2.x Structured Streaming 提供了一些API,可以幫助用戶實現(xiàn)這種 end-to-end exactly-once fault-tolerance guarantees 的語義,這個可以參考一下SSS的官方文檔,上面有些介紹,而我大概總結了一下,可以這樣去理解:

  1. check point 機制,我理解這個應該是一個不down機的機制,當需要去替換一些 worker 或者rolling restart 一些worker的時候,Spark應該是可以從內(nèi)部的一些checkpoint 恢復出當前工作的taskId或者說是batchId,從而可以恢復之前中斷的作業(yè),因此這個狀態(tài)下是不需要考慮作業(yè)的中斷或者重哪里開始重新消費的問題

  2. 而有一些場景比如我們發(fā)版時往往是需要整個集群restart的時候,就需要一個機制來界定上次消費的offset了,這個問題在下面這個 Stack Overflow 是討論的比較全面的,而結論很不幸是無法達到我想要的目的

How to get Kafka offsets for structured query for manual and reliable offset management?

其中一個解決方案是采用2.x 提供的Listener機制,在processing 的時候自行去commit 一下offset 到外部持久化地方

這里有兩個致命的問題

  • 因為啟用了watermark, 消費了地方并不是這次sink 的批次,簡單來說,假設window 是1分鐘,watermark 是1分鐘,那么其實我們洗的數(shù)據(jù)是2分鐘之前的數(shù)據(jù),如果我們在上面的listener 上提交了最新的offset,其實重啟后我們將會丟失大概2分鐘的數(shù)據(jù)

  • 我假設上面的1 理論上是可以解決,那么下一個問題,還是watermark,因為用了watermark,消息就變相變成了“亂序”,這個怎么理解呢,我們借下面這個圖看看


我們一般的預期是從管道的左邊往右邊順序消費的,但是watermark破壞了這個規(guī)則,假設當前window正在清洗藍色框的數(shù)據(jù),那么我們期望應該是有個機制來標記 j,k 的offset,那么下次就算有問題,我們從這里開始消費就可以了,然而假設現(xiàn)在來了一個遲到的數(shù)據(jù)m,那么其實它是會歸并到了藍色的window下一起來洗數(shù)據(jù)的,而這時你很可能記錄的是m的下標,那么這時如果是down機的話我們就會丟失了 j 到 m 中間的所有數(shù)據(jù)了

因此這個問題我想到最終要實現(xiàn)的話必須要滿足2點

  • 我們要記錄window的起始位置而不是末端位置
  • 這個window的 sink 必須是冪等的

如果大家還有什么可以實現(xiàn)的方案麻煩告知

foreachBatch 是單線程的

這里我們重新溫習一下 Structured Steaming的整個流程

  • 從source 源源不斷地去撈數(shù)據(jù)進來
  • 在processing 里其實不管你用了什么算子也好,shuffle也好,SQL view 也好,你看到的其實都是整個內(nèi)存中所有沒有expired 的數(shù)據(jù)
  • 最后sink 階段拿到的source,其實是滿足了 watermark 后expired了洗出來的結果集

那么foreachBatch 其實是把所有的partition的結果集再匯總回去driver,由driver來處理,而我們項目由于sink的數(shù)據(jù)比較大,并且是非常重的DB操作,因此導致最后這個階段非常耗時,上到幾十秒都有,而這個東西又阻塞了下一次JOB 的啟動,因此就造成了我們的JOB的調(diào)度頻率非常低下


最后就是拋棄了這個方法,仍然采用 foreach 算子,在里面匯總了一批 partition的數(shù)據(jù)一個批次提交,實測性能提升了好幾倍。

最后

  • 剩下要注意就是一些編程原則問題了,比如
  • 盡量減少RDD 之間轉(zhuǎn)換時一些大對象的大量創(chuàng)建
  • 盡量減少破壞流式處理的編程風格
  • 想辦法減少串行處理的耗時,盡量并行化

這些在Spark UI 上可視化信息都非常直觀,照著來優(yōu)化就可以了。

最后關于 Spark 2 非常值得推薦的一本書就是 《Spark - The Definitive Guide - Big data processing made simple》 可惜現(xiàn)在只有英文版,看起來非常費勁,希望早日有大蝦翻譯成中文。
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容