Spring-KafkaListener使用

1.導(dǎo)包

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.2.7.RELEASE</version>
    </dependency>

    <dependency>
        <groupId>io.confluent</groupId>
        <artifactId>kafka-avro-serializer</artifactId>
        <version>5.3.1</version>
    </dependency>

2.Kafka配置類

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import lvye.java.datacenter.mq.KafkaConsumer;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PropertiesLoaderUtils;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.*;

import java.io.IOException;
import java.util.*;


@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;
    @Value("${kafka.session.timeout.ms}")
    private Integer sessionTimeoutMs;
    @Value("${kafka.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.auto.commit.interval.ms}")
    private Integer autoCommitIntervalMs;
    @Value("${kafka.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.group.id}")
    private String groupId;
    @Value("${kafka.avro.schema.registry.url}")
    private String schemaRegistryUrl;
    @Value("${kafka.max_poll_record}")
    private String maxPollRecord;

    // Topic name -> mapped entity class; populated once by loadTopic() at startup
    // and read by consumers when converting CDC payloads.
    public static Map<String, Class<?>> config;

    /**
     * Batch listener container factory with MANUAL_IMMEDIATE acks, so the listener
     * decides exactly when offsets are committed.
     *
     * @param batchErrorHandler project-supplied handler invoked when a batch fails
     */
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>> kafkaListenerContainerFactory(@Autowired KafkaBatchExceptionHandler batchErrorHandler) {
        ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // NOTE(review): the original called factory.setConcurrency(null), which is a
        // no-op (null means "use the default of 1 container"); removed for clarity.
        factory.setBatchListener(true);
        factory.getContainerProperties()
                .setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(30000);
        factory.setBatchErrorHandler(batchErrorHandler);
        return factory;
    }

    /** Consumer factory backed by the raw properties from {@link #consumerConfigs()}. */
    @Bean
    public ConsumerFactory<String, GenericRecord> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    /**
     * Raw Kafka consumer properties: String keys, Avro {@code GenericRecord} values
     * decoded via the Confluent schema registry, round-robin partition assignment.
     */
    @Bean(name = "topicConfigs")
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitIntervalMs);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeoutMs);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        // BUG FIX: was hard-coded to 6, silently ignoring the injected
        // kafka.max_poll_record property (configured as 1000 below).
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecord);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "org.apache.kafka.clients.consumer.RoundRobinAssignor");
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    /**
     * Loads the topic -> entity-class mapping from kafka-topic-mapping.properties on
     * the classpath and caches it in the static {@link #config} field.
     *
     * @throws RuntimeException if the file cannot be read or a mapped class cannot be
     *         loaded. Failing fast here surfaces configuration errors at startup; the
     *         original only printed the ClassNotFoundException stack trace and
     *         continued with a silently incomplete mapping.
     */
    @Bean
    public static HashMap<String, Class<?>> loadTopic() {
        Properties properties;
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            // BUG FIX: preserve the cause instead of dropping it
            throw new RuntimeException("load kafka-topic-mapping file error", e);
        }
        HashMap<String, Class<?>> topics = new HashMap<>();
        for (Map.Entry<Object, Object> kv : properties.entrySet()) {
            try {
                topics.put((String) kv.getKey(), Class.forName((String) kv.getValue()));
            } catch (ClassNotFoundException e) {
                // BUG FIX: was e.printStackTrace() — a swallowed error left some
                // topics unmapped with no visible failure.
                throw new RuntimeException("unknown entity class in kafka-topic-mapping: " + kv.getValue(), e);
            }
        }
        config = topics;
        return topics;
    }

    /**
     * Registers the message consumer bean.
     * NOTE(review): KafkaConsumer is also annotated @Component("DcKafkaConsumer");
     * keeping both registrations creates two beans — confirm which one is intended.
     */
    @Bean
    public KafkaConsumer consumer() {
        return new KafkaConsumer();
    }

}

3.配置文件(application.properties)

kafka.bootstrap.servers=121.102.172.42:9092,121.102.172.43:9092
kafka.avro.schema.registry.url=http://121.102.218.111:8081
kafka.session.timeout.ms=300000
kafka.enable.auto.commit=false
kafka.auto.commit.interval.ms=60000
kafka.auto.offset.reset=earliest
kafka.max_poll_record=1000
kafka.group.id=DmSync

3.定義Consumer類

@Component("DcKafkaConsumer")
public class KafkaConsumer {

    // NOTE(review): these CDC-envelope constants were referenced but never defined in
    // the original snippet. The values assume a Debezium-style change event ("op"
    // field with c=create, u=update, d=delete; snapshot reads arrive as "r") — the
    // snapshot check below already compares against "r", which supports this reading.
    // Confirm against the actual message schema.
    private static final String operate = "op";
    private static final String insert = "c";
    private static final String update = "u";
    private static final String delete = "d";

    /**
     * Returns the topics to listen on: the keys of kafka-topic-mapping.properties.
     *
     * @throws RuntimeException if the mapping file cannot be read from the classpath
     */
    public String[] getTopics() {
        Properties properties;
        try {
            properties = PropertiesLoaderUtils.loadAllProperties("kafka-topic-mapping.properties");
        } catch (IOException e) {
            // BUG FIX: preserve the cause instead of dropping it
            throw new RuntimeException("load kafka-topic-mapping file error", e);
        }
        Set<Object> keys = properties.keySet();
        return keys.toArray(new String[keys.size()]);
    }

    /**
     * Batch listener: handles every record in the poll, then acknowledges once
     * (the container uses AckMode.MANUAL_IMMEDIATE, so the commit happens here).
     *
     * BUG FIX: the SpEL bean reference requires the '@' prefix — the original
     * "#{DcKafkaConsumer.getTopics()}" cannot resolve the bean.
     */
    @KafkaListener(topics = "#{@DcKafkaConsumer.getTopics()}")
    public void listen(List<ConsumerRecord<String, GenericRecord>> msgs, Acknowledgment ack) {
        // BUG FIX: original called dealMsg(p, session) — two arguments for a
        // one-argument method, referencing an undefined 'session'; did not compile.
        msgs.forEach(this::dealMsg);
        ack.acknowledge();
    }

    /** Routes one CDC record to the matching branch by operation type. */
    private void dealMsg(ConsumerRecord<String, GenericRecord> msg) {
        Object o;
        String opt = msg.value().get(operate).toString();
        String snapshot = "r";
        if (insert.equals(opt) || update.equals(opt)) {
            // insert/update: the new row image lives in the "after" field
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (delete.equals(opt)) {
            // delete: only the old row image ("before") is available
            o = JSON.parseObject(msg.value().get("before").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        } else if (snapshot.equals(opt)) {
            // snapshot read: carries the row in "after", handled like an insert
            o = JSON.parseObject(msg.value().get("after").toString(), KafkaConfig.config.get(msg.topic()));
            //todo
        }
    }

}

4.定義kafka-topic與實(shí)體類映射文件kafka-topic-mapping.properties
內(nèi)容

xxx-topic1=類1全路徑
xxx-topic2=類2全路徑
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Spring Cloud為開發(fā)人員提供了快速構(gòu)建分布式系統(tǒng)中一些常見模式的工具(例如配置管理,服務(wù)發(fā)現(xiàn),斷路器,智...
    卡卡羅2017閱讀 136,506評論 19 139
  • 1. 概念 介紹 Kafka是一個(gè)分布式的、可分區(qū)的、可復(fù)制的消息系統(tǒng)。它提供了普通消息系統(tǒng)的功能,但具有自己獨(dú)特...
    EmmaQin閱讀 6,045評論 0 1
  • 之前自己寫過一篇入門文章kafka簡單入門及與spring boot整合,主要是結(jié)合kafka官方的文檔入門,學(xué)習(xí)...
    非典型_程序員閱讀 3,072評論 0 3
  • 我是黑夜里大雨紛飛的人啊 1 “又到一年六月,有人笑有人哭,有人歡樂有人憂愁,有人驚喜有人失落,有的覺得收獲滿滿有...
    陌忘宇閱讀 8,814評論 28 54
  • 信任包括信任自己和信任他人 很多時(shí)候,很多事情,失敗、遺憾、錯(cuò)過,源于不自信,不信任他人 覺得自己做不成,別人做不...
    吳氵晃閱讀 6,355評論 4 8

友情鏈接更多精彩內(nèi)容