MQ系列:
kafka系列一: kafka簡介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:動態(tài)添加監(jiān)聽器
kafka系列五:失敗后重試機制
前言
上次介紹過,在springboot中,使用Kafka時可以使用@KafkaListener很方便的對topic進行監(jiān)聽。但是對于在項目啟動時,動態(tài)增加topic的監(jiān)聽,這種方式就無法實現(xiàn),因此需要一種動態(tài)添加監(jiān)聽topic的方式。
明顯,我們的目標在于如何通過代碼實現(xiàn)和@KafkaListener同樣的效果。要做到這樣,那就必須要了解@KafkaListener的原理。
@KafkaListener的原理
通過閱讀Spring-kafka代碼可以了解到@KafkaListener的工作原理如下圖所示

從圖中不難理解@KafkaListener從啟動到拉取消息的過程,可以看到最終是調(diào)用KafkaMessageListenerContainer的start()方法,啟動線程調(diào)用kafkaConsumer的poll()方法和被注解的方法。
動態(tài)方案
從上面已經(jīng)可以看出最終是調(diào)用KafkaMessageListenerContainer的start()方法進行監(jiān)聽kafka topic的消息,那么我們將動態(tài)變化的kafka配置生成一個KafkaMessageListenerContainer,并啟動即可。
以下源碼是KafkaMessageListenerContainer的構(gòu)造函數(shù)
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties) {
this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
}
因此我們需要構(gòu)建ConsumerFactory和ContainerProperties,對于ConsumerFactory,其實現(xiàn)類為DefaultKafkaConsumerFactory,構(gòu)造函數(shù)為:
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
@Nullable Deserializer<K> keyDeserializer,
@Nullable Deserializer<V> valueDeserializer) {
this.configs = new HashMap<>(configs);
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
定義consumerFactory
通過kafka的屬性和序列化方式即可初始化DefaultKafkaConsumerFactory。
因此,定義以下消費者工廠方法
/***
* 消費者工廠
* @param groupId
* @return
*/
private DefaultKafkaConsumerFactory<String, String> consumerFactory(String groupId) {
// consumer配置
Map<String, Object> configMap = new HashMap();
// 采用手動提交的方式
configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
configMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
// 序列化
Deserializer<String> stringDeserializer = new StringDeserializer();
return new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);
}
定義addKafkaListener
ContainerProperties存放了kafka監(jiān)聽器運行時的相關屬性,因此在初始化后,還需要將kafka的相關屬性賦值進去。
public void addKafkaListener(String topic, String groupId) {
// kafka 消費者
DefaultKafkaConsumerFactory<String, String> factory = consumerFactory(groupId);
// 相關屬性
ContainerProperties props = new ContainerProperties(topic);
// 設置監(jiān)聽器
props.setMessageListener(new CustomerMsgHandler());
props.setGroupId(groupId);
props.setAckMode(ContainerProperties.AckMode.MANUAL);
// 構(gòu)造 ListenerContainer
ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory, props);
// 啟動
container.start();
}
定義CustomerMsgHandler
另外,還需要實現(xiàn)AcknowledgingMessageListener接口(onMessage方法),定義自己處理消息的類:
@Slf4j
public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> {
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
// TODO doSomething
// 前面設置了手動提交ack的方式,這里需要在消息處理完成后提交ack
acknowledgment.acknowledge();
}
}
以上,可以通過讀取配置,實例化KafkaMessageListenerContainer并調(diào)用其start()方法,即可實現(xiàn)動態(tài)kafka topic的監(jiān)聽。
總結(jié)
要實現(xiàn)動態(tài)添加新Topic的監(jiān)聽器,實例化KafkaMessageListenerContainer并調(diào)用其start()方法,即可實現(xiàn)動態(tài)kafka topic的監(jiān)聽。
另外還需要在配置文件中配置kafka的相關屬性。