在使用時(shí)Kafka時(shí),經(jīng)常遇到大批量消息在隊(duì)列中,如果一個(gè)消息一個(gè)消息的消費(fèi)的話效率太低下了,所以批量消費(fèi)消息是很有必要的,廢話不多數(shù),直接上代碼。
批量監(jiān)聽(tīng)器
從版本1.1開(kāi)始,@KafkaListener可以被配置為批量接收從Kafka話題隊(duì)列中的Message。要配置監(jiān)聽(tīng)器容器工廠以創(chuàng)建批處理偵聽(tīng)器,需要設(shè)置batchListener屬性為true,代碼如下:
@Bean
KafkaListenerContainerFactory<?> batchFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new
ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
factory.setBatchListener(true); // 開(kāi)啟批量監(jiān)聽(tīng)
return factory;
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); //設(shè)置每次接收Message的數(shù)量
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}
批量接收
在@KafkaListener注解中聲明工廠為batchFactory().
@KafkaListener(topics = "teemo", id = "consumer", containerFactory = "batchFactory")
public void listen(List<ConsumerRecord<?, ?>> list) {
List<String> messages = new ArrayList<>();
for (ConsumerRecord<?, ?> record : list) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
// 獲取消息
kafkaMessage.ifPresent(o -> messages.add(o.toString()));
}
if (messages.size() > 0) {
// 更新索引
updateES(messages);
}
}