Springboot注解@KafkaListener實(shí)現(xiàn)Kafka批量消費(fèi)

在使用時(shí)Kafka時(shí),經(jīng)常遇到大批量消息在隊(duì)列中,如果一個(gè)消息一個(gè)消息的消費(fèi)的話效率太低下了,所以批量消費(fèi)消息是很有必要的,廢話不多數(shù),直接上代碼。

批量監(jiān)聽(tīng)器

從版本1.1開(kāi)始,@KafkaListener可以被配置為批量接收從Kafka話題隊(duì)列中的Message。要配置監(jiān)聽(tīng)器容器工廠以創(chuàng)建批處理偵聽(tīng)器,需要設(shè)置batchListener屬性為true,代碼如下:

@Bean
KafkaListenerContainerFactory<?> batchFactory() {
  ConcurrentKafkaListenerContainerFactory<String, String> factory = new 
        ConcurrentKafkaListenerContainerFactory<>();
  factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
  factory.setBatchListener(true); // 開(kāi)啟批量監(jiān)聽(tīng)
  return factory;
}

@Bean
public Map<String, Object> consumerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
  props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
  props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); //設(shè)置每次接收Message的數(shù)量
  props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
  props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
  props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
  props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
  return props;
}
批量接收

在@KafkaListener注解中聲明工廠為batchFactory().

@KafkaListener(topics = "teemo", id = "consumer", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> list) {
  List<String> messages = new ArrayList<>();
  for (ConsumerRecord<?, ?> record : list) {
    Optional<?> kafkaMessage = Optional.ofNullable(record.value());
    // 獲取消息
    kafkaMessage.ifPresent(o -> messages.add(o.toString()));
  }
  if (messages.size() > 0) {
    // 更新索引
    updateES(messages);
  }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,915評(píng)論 13 425
  • Spring Cloud為開(kāi)發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見(jiàn)模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,694評(píng)論 19 139
  • Kafka簡(jiǎn)介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計(jì)目標(biāo)如下: 以時(shí)間復(fù)雜度為O(1)的方...
    Alukar閱讀 3,155評(píng)論 0 43
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡(jiǎn)介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,592評(píng)論 0 34
  • 已記不得是什么情況下看到不寫(xiě)就出局的,當(dāng)初的想法也越來(lái)越模糊了 還好,有個(gè)機(jī)會(huì)讓自己靜靜地想想當(dāng)初...
    張婷_amy閱讀 344評(píng)論 0 0

友情鏈接更多精彩內(nèi)容