I. Task Background
Upstream Kafka volume: 600K messages/s, 64 partitions
Streaming resources: 32 executors (1 GB on-heap, 2 GB off-heap), 64 threads
Batch window: 300 s
II. Resolution Process
1. Fault Description
The Spark Streaming job failed with the following error log:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:999)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
2. Fault Analysis
Meaning of the error above: the consumer hit this exception while synchronously committing offsets to the broker after finishing a batch of polled messages. Preliminary reading of the log: the partitions this consumer thread had been consuming were already revoked by the broker, i.e. the group had rebalanced and reassigned them to another member.
Kafka consumer configuration:
max.poll.interval.ms = 600000
Spark Streaming configuration:
spark.streaming.kafka.maxRatePerPartition=50000
任務(wù)停止后再啟動(dòng),因上游積壓了大量數(shù)據(jù),安裝上述配置啟動(dòng)時(shí)每批最大的數(shù)據(jù)量為960,000,000【maxRatePerPartition的值 * kafka分區(qū)數(shù) * 窗口時(shí)間】,某次重啟運(yùn)行圖如下:

That batch took roughly 11 minutes. Meanwhile, this job set max.poll.interval.ms = 600000 (this property is the maximum delay between successive poll() calls by the Kafka consumer, i.e. an upper bound on how long the consumer may sit idle before fetching more records; if poll() is not called again before the timeout expires, the consumer is considered failed and the group rebalances). Combining these settings, any batch whose processing time exceeds Kafka's maximum poll wait drives the Kafka cluster into a rebalance, which matches the fault observed here.
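The timing mismatch can be checked directly (a sketch; the 11-minute batch time is read off the run shown above):

```python
# Observed batch processing time vs. the consumer's poll deadline.
batch_processing_ms = 11 * 60 * 1000   # ~11 minutes for the backlogged batch
max_poll_interval_ms = 600_000         # Kafka consumer max.poll.interval.ms

# If a batch takes longer than max.poll.interval.ms, the consumer misses its
# poll() deadline, the broker deems it failed, and the group rebalances,
# revoking its partitions -- hence the CommitFailedException on commit.
will_rebalance = batch_processing_ms > max_poll_interval_ms
print(will_rebalance)  # True
```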
3. Solution
(a) Extend the poll wait time
(b) Reduce the maximum amount of data processed per batch
The relevant parameters were adjusted as follows:
max.poll.interval.ms = 1200000
spark.streaming.kafka.maxRatePerPartition=15000
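A rough sanity check of the tuned values (a sketch; it assumes the throughput implied by the earlier run, ~960M records in ~660 s, stays roughly constant):

```python
# Estimated effect of the tuned parameters.
observed_rate = 960_000_000 / 660            # ~1.45M records/s from the 11-minute run

new_max_records = 15_000 * 64 * 300          # new per-batch cap: maxRatePerPartition
                                             # x partitions x window
est_batch_time_ms = new_max_records / observed_rate * 1000

print(new_max_records)                       # 288000000
# Estimated batch time (~198 s) now fits inside the 300 s window and sits
# far below the new max.poll.interval.ms of 1,200,000 ms.
print(est_batch_time_ms < 1_200_000)         # True
```

Both knobs work together: the larger max.poll.interval.ms gives slow batches more headroom before a rebalance, while the smaller maxRatePerPartition keeps backlogged restart batches from growing that slow in the first place.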
The result after tuning:
參數(shù)優(yōu)化后效果.png
