最近參與一個公司大數(shù)據(jù)項目開始入坑Spark,Spark從2.0開始從RDD 的底層API轉(zhuǎn)向了面向Dataset/Dataframe 的高級API,Spark Streaming 也換成了Structured Streaming,而我們用的是2.4,帶上了watermark 功能,對流媒體的處理算是圓滿了。
如果希望從框架結構方面了解1.x 和2.x 的差異,我比較推薦下面這個騰訊團隊寫的wiki
另外下面這篇總結對于想了解2.4 特性也是非常不錯
而從實際的項目來說,實踐了一段時間,還是遇到了幾個坑,這里逐一分享
dropDuplicates 算子
這里我就不累贅舉例這個算子怎么用了,西面直接說說用這個算子的話小心3點
1) 如果你的項目是用了window機制的話,那么去重的語義一般可能就是說在同一個window下去重,這個語義是比較合理的,那么如果你是需要在同一個window下對數(shù)據(jù)去重的話,參數(shù)是提供window 列就可以了,注意有些人是提供 $timestamp 列是不太對的,但是還有一點要注意,調(diào)用這個算子并不是每次只操作一個window,而是hold在你內(nèi)存的所有消息,因此自己去衡量一下性能。我最終沒有用這個算子主要是因為下面這個點
2)dropDuplicates目前只能做keepFirst,而我們的業(yè)務需要可能需要做的是keepLask,目前我沒有發(fā)現(xiàn)有辦法可以實現(xiàn)keepLask。
3)看它源碼可以知道,其實里面就是做了一次的repartition + reduceByKey, 這里最最最要命的就是它需要shuffle,而我們的業(yè)務其實在每個kafka 的partition已經(jīng)做過一次了,這樣的話其實每個分區(qū)自己來做去重就好了,而Spark 又沒有諸如 dropDuplicatesWithinPartition() 的方法,因此該算子滿足不了需求,目前我還沒找到很完美的方法,配合用Apache Collection API 基本可以最少限度的破壞數(shù)據(jù)流處理的語義,可以參考下面的代碼
todayDs.mapPartitions(it -> {
Set<Long> gidSet = new HashSet<>(3000);
return IteratorUtils.filteredIterator(it,
item -> {
if(gidSet.contains(item.getGlobalId())) {
return false;
} else {
gidSet.add(item.getGlobalId());
return true;
}
});
},
......
在Structured Streaming with window watermark 下如何提交consumed Kafka 的offset
可以說這個是我遇到最頭疼的問題,google了非常多的材料,并且也咨詢過一些大廠玩 spark streaming的團隊,也沒有一個比較好的方案,可能也是由于我們業(yè)務本身的特點,無法提供到一個冪等的sink() 語義,因此就使得很難找到一個合理的點去submit Kafka的 offset。
從Spark 2.x Structured Streaming 提供了一些API,可以幫助用戶實現(xiàn)這種 end-to-end exactly-once fault-tolerance guarantees 的語義,這個可以參考一下SSS的官方文檔,上面有些介紹,而我大概總結了一下,可以這樣去理解:
check point 機制,我理解這個應該是一個不down機的機制,當需要去替換一些 worker 或者rolling restart 一些worker的時候,Spark應該是可以從內(nèi)部的一些checkpoint 恢復出當前工作的taskId或者說是batchId,從而可以恢復之前中斷的作業(yè),因此這個狀態(tài)下是不需要考慮作業(yè)的中斷或者重哪里開始重新消費的問題
而有一些場景比如我們發(fā)版時往往是需要整個集群restart的時候,就需要一個機制來界定上次消費的offset了,這個問題在下面這個 Stack Overflow 是討論的比較全面的,而結論很不幸是無法達到我想要的目的
How to get Kafka offsets for structured query for manual and reliable offset management?
其中一個解決方案是采用2.x 提供的Listener機制,在processing 的時候自行去commit 一下offset 到外部持久化地方
這里有兩個致命的問題
因為啟用了watermark, 消費了地方并不是這次sink 的批次,簡單來說,假設window 是1分鐘,watermark 是1分鐘,那么其實我們洗的數(shù)據(jù)是2分鐘之前的數(shù)據(jù),如果我們在上面的listener 上提交了最新的offset,其實重啟后我們將會丟失大概2分鐘的數(shù)據(jù)
-
我假設上面的1 理論上是可以解決,那么下一個問題,還是watermark,因為用了watermark,消息就變相變成了“亂序”,這個怎么理解呢,我們借下面這個圖看看
我們一般的預期是從管道的左邊往右邊順序消費的,但是watermark破壞了這個規(guī)則,假設當前window正在清洗藍色框的數(shù)據(jù),那么我們期望應該是有個機制來標記 j,k 的offset,那么下次就算有問題,我們從這里開始消費就可以了,然而假設現(xiàn)在來了一個遲到的數(shù)據(jù)m,那么其實它是會歸并到了藍色的window下一起來洗數(shù)據(jù)的,而這時你很可能記錄的是m的下標,那么這時如果是down機的話我們就會丟失了 j 到 m 中間的所有數(shù)據(jù)了
因此這個問題我想到最終要實現(xiàn)的話必須要滿足2點
- 我們要記錄window的起始位置而不是末端位置
- 這個window的 sink 必須是冪等的
如果大家還有什么可以實現(xiàn)的方案麻煩告知
foreachBatch 是單線程的
這里我們重新溫習一下 Structured Steaming的整個流程
- 從source 源源不斷地去撈數(shù)據(jù)進來
- 在processing 里其實不管你用了什么算子也好,shuffle也好,SQL view 也好,你看到的其實都是整個內(nèi)存中所有沒有expired 的數(shù)據(jù)
- 最后sink 階段拿到的source,其實是滿足了 watermark 后expired了洗出來的結果集
那么foreachBatch 其實是把所有的partition的結果集再匯總回去driver,由driver來處理,而我們項目由于sink的數(shù)據(jù)比較大,并且是非常重的DB操作,因此導致最后這個階段非常耗時,上到幾十秒都有,而這個東西又阻塞了下一次JOB 的啟動,因此就造成了我們的JOB的調(diào)度頻率非常低下

最后就是拋棄了這個方法,仍然采用 foreach 算子,在里面匯總了一批 partition的數(shù)據(jù)一個批次提交,實測性能提升了好幾倍。
最后
- 剩下要注意就是一些編程原則問題了,比如
- 盡量減少RDD 之間轉(zhuǎn)換時一些大對象的大量創(chuàng)建
- 盡量減少破壞流式處理的編程風格
- 想辦法減少串行處理的耗時,盡量并行化
這些在Spark UI 上可視化信息都非常直觀,照著來優(yōu)化就可以了。
