Spring-Kafka (2): Quick Start in Just Three Seconds

Creating the Project

One thing to note here: we import Spring-Kafka 2.1.8 and the Spring Boot 2.0.4 GA release. Please keep your versions consistent with these.




Alright, that's three seconds. You can look away from the screen and calm down now.


Writing the First Demo

Implementation steps

  1. Create the Map configurations for the consumer and the producer
  2. From those Map configurations, create the corresponding consumer factory (consumerFactory) and producer factory (producerFactory)
  3. From the consumerFactory, create the listener container factory
  4. From the producerFactory, create a KafkaTemplate (the Kafka operations class)
  5. Create the listener container

Here is a quick look at the project structure first. Remember to start Kafka...


Project structure

Creating the KafkaConfiguration class

These are all configuration parameters, and their purposes are documented in the code. One thing worth noting is that the KafkaTemplate is typed as <Integer, String>. If you look at kafkaTemplate's send method, you'll find several overloads; one of them is shown below, where the key and data parameters are generic and correspond exactly to KafkaTemplate<Integer, String>. What is this for? Recall that a Topic can contain multiple Partitions. If we don't want to specify the target partition by hand, we can use the key instead. Here our key is an Integer, and the template routes the record to a partition derived from the key: records with the same key are deterministically sent to the same partition, while records without a key have their partition chosen by the partitioner's own algorithm.

    public ListenableFuture<SendResult<K, V>> send(String topic, K key, V data) {
        ProducerRecord<K, V> producerRecord = new ProducerRecord<>(topic, key, data);
        return this.doSend(producerRecord);
    }
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
@EnableKafka
public class KafkaConfiguration {

    //ConcurrentKafkaListenerContainerFactory is the factory class that creates Kafka listener containers; only the consumer side is configured here
    @Bean
    public ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    //Create the consumer factory from the parameters in consumerProps
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerProps());
    }

    //Create the producer factory from the parameters in senderProps
    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(senderProps());
    }

    //KafkaTemplate provides the Kafka send (and receive) operations
    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    //Consumer configuration parameters
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        //Broker address
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //Consumer group id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bootKafka");
        //Whether offsets are committed automatically
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        //Auto-commit interval, in milliseconds
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        //Session timeout
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        //Key deserializer
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        //Value deserializer
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    //Producer configuration
    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        //Broker address
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //Number of retries; 0 disables the retry mechanism
        props.put(ProducerConfig.RETRIES_CONFIG, 1);
        //Batch size, in bytes
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //Linger for 1 ms before sending a batch; this cuts down the number of requests the producer makes and improves throughput
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        //Total memory, in bytes, the producer may use to buffer records waiting to be sent to the server
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
        //Key serializer
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        //Value serializer
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

}


Creating the DemoListener consumer

The consumer here is really just a listener class: it listens on the Topic named topic.quick.demo, with consumer id demo.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class DemoListener {

    private static final Logger log = LoggerFactory.getLogger(DemoListener.class);

    //Declare a listener with id "demo" on the Topic named topic.quick.demo
    @KafkaListener(id = "demo", topics = "topic.quick.demo")
    public void listen(String msgData) {
        log.info("demo receive : {}", msgData);
    }
}


Creating the test class

The first parameter of the send method here is the topic name, and the second is the data to send.

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest
@RunWith(SpringRunner.class)
public class DemoTest {

    @Autowired
    private KafkaTemplate<Integer, String> kafkaTemplate;

    @Test
    public void testDemo() throws InterruptedException {
        kafkaTemplate.send("topic.quick.demo", "this is my first demo");
        //Sleep for 5 seconds so the listener has enough time to pick the message up from the topic
        Thread.sleep(5000);
    }
}
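The fixed Thread.sleep is a blunt wait. A common alternative pattern (a sketch of the idea, not code from this project) is to have the listener count down a CountDownLatch, so the test returns as soon as the message actually arrives instead of always waiting the full timeout. The simulated-listener thread below stands in for the @KafkaListener callback; wiring a real latch into DemoListener would be an app-specific change.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch received = new CountDownLatch(1);

        // Simulated listener: counts down when the "message" arrives.
        new Thread(() -> {
            // ... in real code this would be the @KafkaListener method body ...
            received.countDown();
        }).start();

        // Wait up to 5 seconds, but return immediately once the message is in.
        boolean ok = received.await(5, TimeUnit.SECONDS);
        System.out.println(ok ? "message received" : "timed out");
    }
}
```

With this pattern the test also fails fast (await returns false) when no message shows up, rather than passing silently after a sleep.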

Now run this test method directly. The log output shows the message we sent, which means the message sent by the test method was successfully consumed.

2018-09-06 17:26:20.850  INFO 6232 --- [     demo-0-C-1] com.viu.kafka.listen.DemoListener        : demo receive : this is my first demo


Starting the Project

Note carefully that this time we start the application itself, not the test class. Let's look at the console output.

First comes the KafkaConsumer configuration block; every consumer prints it. There are too many settings to walk through here.

2018-09-06 17:40:15.258  INFO 9944 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 100
    auto.offset.reset = latest
    bootstrap.servers = [localhost:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = demo
    heartbeat.interval.ms = 3000
    interceptor.classes = null
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.IntegerDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 15000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer

2018-09-06 17:40:15.274  INFO 9944 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.2
2018-09-06 17:40:15.274  INFO 9944 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : 2a121f7b1d402825



These log lines show that the Consumer was created successfully. Since no concurrency was configured, it runs in single-consumer mode, and a partition assignment takes place, i.e. each partition is designated to a specific consumer. One thing to note here:
the log prints [Consumer clientId=consumer-1, groupId=demo]. We configured id = demo in the @KafkaListener annotation earlier, so how did that become groupId = demo? Because when the groupId attribute of @KafkaListener is not set, the id is used as the groupId by default.

2018-09-06 17:40:15.287  INFO 9944 --- [     demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=demo] Discovered group coordinator admin-PC:9092 (id: 2147483647 rack: null)
2018-09-06 17:40:15.290  INFO 9944 --- [     demo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=demo] Revoking previously assigned partitions []
2018-09-06 17:40:15.290  INFO 9944 --- [     demo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: []
2018-09-06 17:40:15.290  INFO 9944 --- [     demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=demo] (Re-)joining group
2018-09-06 17:40:15.301  INFO 9944 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2018-09-06 17:40:15.302  INFO 9944 --- [     demo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=demo] Successfully joined group with generation 33
2018-09-06 17:40:15.303  INFO 9944 --- [     demo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=demo] Setting newly assigned partitions [topic.quick.demo-0]




Wrap-up

Spring Boot 2.0 already provides Kafka auto-configuration, which can be set up in the application.properties file. Don't ask why I wrote all this code to create the factories by hand: compared with the properties approach, I prefer Java Config for these settings because it is explicit, even if a bit more verbose.
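For reference, a roughly equivalent setup via auto-configuration might look like this in application.properties. The property names follow Spring Boot's spring.kafka.* keys, and the values mirror the Java config above; treat this as a sketch rather than a drop-in replacement, since defaults for anything not listed may differ.

```properties
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=bootKafka
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.producer.retries=1
spring.kafka.producer.batch-size=16384
spring.kafka.producer.buffer-memory=1024000
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
```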


For more articles, follow the Spring-Kafka getting-started tutorial series.

The author's blog: http://blog.seasedge.cn/archives/9.html
