spark streaming程序中kafka rebalance原因排查

1.Rebalance 的觸發(fā)條件有3個。

  • 組成員個數(shù)發(fā)生變化。例如有新的 consumer 實例加入該消費組或者離開組
  • 訂閱的 Topic 個數(shù)發(fā)生變化
  • 訂閱的 Topic 的分區(qū)數(shù)發(fā)生變化

2. 問題排查

問題一:使用的spark streaming消費kafka,batch interval設置為30s和180時,沒有出現(xiàn)問題。當設置到300s后,發(fā)現(xiàn)streaming每隔300s都會進行一次rebalance

原因:max.poll.interval.ms 設置問題。
max.poll.interval.ms是設置consumer每次拉去間隔時長,默認300s。
也是說,consumer兩次提交offset的間隔不能大于300s,如果大于這個值,kafka會在300s到達時,直接進行Rebalance。

解決辦法:修改max.poll.interval.ms值為600s

問題二:多個spark streaming程序,每個消費不同topic,當有streaming程序重啟后,其中的kafka consumer沒有立馬消費數(shù)據,而是等待幾分鐘后才會入組,然后開始消費數(shù)據

問題原因:不同的streaming程序中的kafka 共用了一個group id。
啟動streaming程序后,consumer 要入組,但是consumerGroup并不會立刻進行Rebalance,而是等組內的其他消費組提交一次offset后,再一起進行Rebalance。

解決方法:不同的streaming程序,使用不同的group id

3. 補充:

1. 實際工作中,為了保證kafka消費的穩(wěn)定性,一般會適當修改以下參數(shù)

request.timeout.ms 60 * 1000 * 4
max.poll.interval.ms 60 * 1000 * 6
session.timeout.ms 60 * 1000 * 3
heartbeat.interval.ms 60 * 1000 * 2

2. spark streaming中offset提交時機

spark streaming中的consumer提交offset的間隔,其實是固定的,即batch interval。
例如batch interval為300s,但數(shù)據處理在10s時就處理完成了,但steaming會等到300s時,再提交偏移量。
同樣,當處理時間大于batch interval,kafka也會在3
00s時進行一次offset的提交,只不過此時提交的offset是拉取的開始值。

所以,對于spark streaming來說,不需要擔心處理時間過短和無限長的問題,只要保證batch interval 小于 max.poll.interval.ms即可。

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容