kafka系列四:動態(tài)添加監(jiān)聽器


MQ系列:
kafka系列一: kafka簡介
kafka系列二: kafka部署
kafka系列三: Spring kafka
kafka系列四:動態(tài)添加監(jiān)聽器
kafka系列五:失敗后重試機制


前言

上次介紹過,在springboot中,使用Kafka時可以使用@KafkaListener很方便的對topic進行監(jiān)聽。但是對于在項目啟動時,動態(tài)增加topic的監(jiān)聽,這種方式就無法實現(xiàn),因此需要一種動態(tài)添加監(jiān)聽topic的方式。

明顯,我們的目標在于如何通過代碼實現(xiàn)和@KafkaListener同樣的效果。要做到這樣,那就必須要了解@KafkaListener的原理。

@KafkaListener的原理

通過閱讀Spring-kafka代碼可以了解到@KafkaListener的工作原理如下圖所示

@KafkaListener的工作原理

從圖中不難理解@KafkaListener從啟動到拉取消息的過程,可以看到最終是調(diào)用KafkaMessageListenerContainer的start()方法,啟動線程調(diào)用kafkaConsumer的poll()方法和被注解的方法。

動態(tài)方案

從上面已經(jīng)可以看出最終是調(diào)用KafkaMessageListenerContainer的start()方法進行監(jiān)聽kafka topic的消息,那么我們將動態(tài)變化的kafka配置生成一個KafkaMessageListenerContainer,并啟動即可。
以下源碼是KafkaMessageListenerContainer的構(gòu)造函數(shù)

public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
            ContainerProperties containerProperties) {

        this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
    }

因此我們需要構(gòu)建ConsumerFactory和ContainerProperties,對于ConsumerFactory,其實現(xiàn)類為DefaultKafkaConsumerFactory,構(gòu)造函數(shù)為:

public DefaultKafkaConsumerFactory(Map<String, Object> configs,
            @Nullable Deserializer<K> keyDeserializer,
            @Nullable Deserializer<V> valueDeserializer) {
        this.configs = new HashMap<>(configs);
        this.keyDeserializer = keyDeserializer;
        this.valueDeserializer = valueDeserializer;
    }

定義consumerFactory

通過kafka的屬性和序列化方式即可初始化DefaultKafkaConsumerFactory。
因此,定義以下消費者工廠方法

    /***
     * 消費者工廠
     * @param groupId
     * @return
     */
    private DefaultKafkaConsumerFactory<String, String> consumerFactory(String groupId) {
        // consumer配置
        Map<String, Object> configMap = new HashMap();
        // 采用手動提交的方式
        configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enable_auto_commit);
        configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
        configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, auto_commit_interval);
        configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        configMap.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());

        // 序列化
        Deserializer<String> stringDeserializer = new StringDeserializer();

        return new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);
    }

定義addKafkaListener

ContainerProperties存放了kafka監(jiān)聽器運行時的相關屬性,因此在初始化后,還需要將kafka的相關屬性賦值進去。

public void addKafkaListener(String topic, String groupId) {
        // kafka 消費者
        DefaultKafkaConsumerFactory<String, String> factory = consumerFactory(groupId);
        
        // 相關屬性
        ContainerProperties props = new ContainerProperties(topic);

        // 設置監(jiān)聽器
        props.setMessageListener(new CustomerMsgHandler());
        props.setGroupId(groupId);
        props.setAckMode(ContainerProperties.AckMode.MANUAL);

        // 構(gòu)造 ListenerContainer
        ConcurrentMessageListenerContainer<String, String> container = new ConcurrentMessageListenerContainer<>(factory, props);
        // 啟動
        container.start();
    }

定義CustomerMsgHandler

另外,還需要實現(xiàn)AcknowledgingMessageListener接口(onMessage方法),定義自己處理消息的類:

@Slf4j
public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> {

    @Override
    public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
        // TODO doSomething

        // 前面設置了手動提交ack的方式,這里需要在消息處理完成后提交ack
        acknowledgment.acknowledge();
    }
}

以上,可以通過讀取配置,實例化KafkaMessageListenerContainer并調(diào)用其start()方法,即可實現(xiàn)動態(tài)kafka topic的監(jiān)聽。

總結(jié)

要實現(xiàn)動態(tài)添加新Topic的監(jiān)聽器,實例化KafkaMessageListenerContainer并調(diào)用其start()方法,即可實現(xiàn)動態(tài)kafka topic的監(jiān)聽。

另外還需要在配置文件中配置kafka的相關屬性。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容