Spring Boot 中使用@KafkaListener并發(fā)批量接收消息

轉(zhuǎn)自:https://blog.csdn.net/russle/article/details/81258590

kakfa是我們?cè)陧?xiàng)目開發(fā)中經(jīng)常使用的消息中間件。由于它的寫性能非常高,因此,經(jīng)常會(huì)碰到Kafka消息隊(duì)列擁堵的情況。遇到這種情況時(shí),有時(shí)我們不能直接清理整個(gè)topic,因?yàn)檫€有別的服務(wù)正在使用該topic。因此只能額外啟動(dòng)一個(gè)相同名稱的consumer-group來加快消息消費(fèi)(經(jīng)測(cè)試,如果該topic只有一個(gè)分區(qū),實(shí)際上再啟動(dòng)一個(gè)新的消費(fèi)者,沒有作用)。

具體代碼在這里,歡迎加星號(hào)、fork。

官方文檔在https://docs.spring.io/spring-kafka/reference/html/_reference.html

第一步,并發(fā)消費(fèi)

先看代碼,重點(diǎn)是這我們使用的是ConcurrentKafkaListenerContainerFactory并且設(shè)置了factory.setConcurrency(4); (我的topic有4個(gè)分區(qū),為了加快消費(fèi)將并發(fā)設(shè)置為4,也就是有4個(gè)KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

第二步,批量消費(fèi)

然后是批量消費(fèi)。重點(diǎn)是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一個(gè)設(shè)啟用批量消費(fèi),一個(gè)設(shè)置批量消費(fèi)每次最多消費(fèi)多少條消息記錄。

重點(diǎn)說明一下,我們?cè)O(shè)置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是說如果沒有達(dá)到50條消息,我們就一直等待。官方的解釋是”The maximum number of records returned in a single call to poll().”, 也就是50表示的是一次poll最多返回的記錄數(shù)。

從啟動(dòng)日志中可以看到還有個(gè) max.poll.interval.ms = 300000, 也就說每間隔max.poll.interval.ms我們就調(diào)用一次poll。每次poll最多返回50條記錄。

max.poll.interval.ms官方解釋是”The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. “;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

啟動(dòng)日志截圖


startInfo.png

關(guān)于max.poll.records和max.poll.interval.ms官方解釋截圖:


maxPoll.png

第三步,分區(qū)消費(fèi)

對(duì)于只有一個(gè)分區(qū)的topic,不需要分區(qū)消費(fèi),因?yàn)闆]有意義。下面的例子是針對(duì)有2個(gè)分區(qū)的情況(我的完整代碼中有4個(gè)listenPartitionX方法,我的topic設(shè)置了4個(gè)分區(qū)),讀者可以根據(jù)自己的情況進(jìn)行調(diào)整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

關(guān)于分區(qū)和消費(fèi)者關(guān)系,后面會(huì)補(bǔ)充,先摘錄如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,總結(jié),如果我們的topic有多個(gè)分區(qū),經(jīng)過以上步驟可以很好的加快消息消費(fèi)。如果只有一個(gè)分區(qū),因?yàn)橐呀?jīng)有一個(gè)同名group id在消費(fèi)了,新啟動(dòng)的一個(gè)基本上沒有作用(本人測(cè)試結(jié)果)。

具體代碼在這里,歡迎加星號(hào),fork。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • * A client that consumes records from a Kafka cluster. * ...
    ljglwz閱讀 3,667評(píng)論 0 0
  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,569評(píng)論 19 139
  • Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,152評(píng)論 0 43
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,586評(píng)論 0 34
  • 為誰而來 又為誰而存在 這個(gè)世界有多少崎嶇 又有多少無奈 茫茫人海,我在午夜里徘徊 一切的一切在夜色中淡去 唯有那...
    詩酒人生閱讀 318評(píng)論 2 0

友情鏈接更多精彩內(nèi)容