KafkaListener 批量消費異常時回退偏移量

1.實現(xiàn)BatchErrorHandler

@Component
public  class KafkaBatchExceptionHandler implements BatchErrorHandler {

    @Override
    public void handle(Exception e, ConsumerRecords<?, ?> consumerRecords) {
    }

    @Override
    public void handle(Exception thrownException, ConsumerRecords<?, ?> consumerRecords, Consumer<?, ?> consumer) {
        Map<TopicPartition, Long> offsetsToReset = new HashMap<>();
        consumerRecords.forEach((record) -> {
            offsetsToReset.compute(new TopicPartition(record.topic(), record.partition()),
                    (k, v) -> v == null ? record.offset() : Math.min(v, record.offset()));

        });
        offsetsToReset.forEach((k, v) -> consumer.seek(k, v));

    }


}

2.注冊進KafkaListenerContainerFactory中

   @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaListenerContainerFactory(@Autowired KafkaBatchExceptionHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(null);
        factory.setBatchListener(true);
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(30000);
        factory.setBatchErrorHandler(batchErrorHandler);
        return factory;
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容