1.Rebalance 的觸發(fā)條件有3個。
- 組成員個數(shù)發(fā)生變化。例如有新的 consumer 實例加入該消費組或者離開組
- 訂閱的 Topic 個數(shù)發(fā)生變化
- 訂閱的 Topic 的分區(qū)數(shù)發(fā)生變化
2. 問題排查
問題一:使用的spark streaming消費kafka,batch interval設置為30s和180時,沒有出現(xiàn)問題。當設置到300s后,發(fā)現(xiàn)streaming每隔300s都會進行一次rebalance
原因:max.poll.interval.ms 設置問題。
max.poll.interval.ms是設置consumer每次拉去間隔時長,默認300s。
也是說,consumer兩次提交offset的間隔不能大于300s,如果大于這個值,kafka會在300s到達時,直接進行Rebalance。
解決辦法:修改max.poll.interval.ms值為600s
問題二:多個spark streaming程序,每個消費不同topic,當有streaming程序重啟后,其中的kafka consumer沒有立馬消費數(shù)據,而是等待幾分鐘后才會入組,然后開始消費數(shù)據
問題原因:不同的streaming程序中的kafka 共用了一個group id。
啟動streaming程序后,consumer 要入組,但是consumerGroup并不會立刻進行Rebalance,而是等組內的其他消費組提交一次offset后,再一起進行Rebalance。
解決方法:不同的streaming程序,使用不同的group id
3. 補充:
1. 實際工作中,為了保證kafka消費的穩(wěn)定性,一般會適當修改以下參數(shù)
request.timeout.ms 60 * 1000 * 4
max.poll.interval.ms 60 * 1000 * 6
session.timeout.ms 60 * 1000 * 3
heartbeat.interval.ms 60 * 1000 * 2
2. spark streaming中offset提交時機
spark streaming中的consumer提交offset的間隔,其實是固定的,即batch interval。
例如batch interval為300s,但數(shù)據處理在10s時就處理完成了,但steaming會等到300s時,再提交偏移量。
同樣,當處理時間大于batch interval,kafka也會在3
00s時進行一次offset的提交,只不過此時提交的offset是拉取的開始值。
所以,對于spark streaming來說,不需要擔心處理時間過短和無限長的問題,只要保證batch interval 小于 max.poll.interval.ms即可。