Notes on integrating Kafka with Spring Boot 2

Automatic topic creation

When kafkaTemplate.send targets a topic that does not exist yet, the topic is created automatically, with the broker defaults of one partition and a replication factor of 1.

Topic: llc2021  PartitionCount: 1   ReplicationFactor: 1    Configs: 
    Topic: llc2021  Partition: 0    Leader: 3   Replicas: 3 Isr: 3
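
To avoid depending on auto-creation, the topic can be declared explicitly. The following is a minimal sketch, assuming Spring Boot's auto-configured KafkaAdmin is active so that NewTopic beans are created at startup when missing. The class name TopicConfig and the counts 4 and 1 are illustrative assumptions, not values from the setup above.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicConfig {

    // Declares llc2021 up front instead of letting the first send() create it.
    // 4 partitions and replication factor 1 are assumed values for illustration.
    @Bean
    public NewTopic llc2021Topic() {
        return new NewTopic("llc2021", 4, (short) 1);
    }
}
```

A topic that already exists is not recreated, so this mainly helps on a fresh cluster.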

auto-offset-reset on the consumer side

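earliest: if the topic already contains messages and the consumer group has no committed offset, a newly started consumer reads from the beginning.

latest: if the topic already contains messages and the consumer group has no committed offset, a newly started consumer skips them and only reads messages produced afterwards.

Either setting only applies when no committed offset exists; a group that has already committed offsets resumes from them.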
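
The rule can be sketched as a small plain-Java model. This is not Kafka client code; the names OffsetResetModel and startingOffset are hypothetical, and the out-of-range case is deliberately left out.

```java
import java.util.OptionalLong;

final class OffsetResetModel {

    enum ResetPolicy { EARLIEST, LATEST }

    /**
     * Returns the offset a consumer starts reading from in this simplified model.
     * A committed offset always wins; the reset policy is only a fallback.
     */
    static long startingOffset(OptionalLong committed, long beginningOffset,
                               long logEndOffset, ResetPolicy policy) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == ResetPolicy.EARLIEST ? beginningOffset : logEndOffset;
    }

    public static void main(String[] args) {
        // New group, partition holds offsets 0..9 (log end offset 10).
        System.out.println(startingOffset(OptionalLong.empty(), 0, 10, ResetPolicy.EARLIEST)); // 0
        System.out.println(startingOffset(OptionalLong.empty(), 0, 10, ResetPolicy.LATEST));   // 10
        // Group that already committed offset 4: the policy is ignored.
        System.out.println(startingOffset(OptionalLong.of(4), 0, 10, ResetPolicy.LATEST));     // 4
    }
}
```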

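@KafkaListener in detail
id: the listener id

①. Consumer thread naming
With an id: Thread[haha-2-C-1,5,main]; the id determines the thread name prefix
Without an id: Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1,5,main]

②. Listener ids must be unique within the same container
Otherwise startup fails with: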

Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id

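③. The id overrides the consumer factory's group id
Suppose the configuration file sets the consumer group with kafka.consumer.group-id=BASE-DEMO.
Normally that is the default consumer group in the container.
But with @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"})
the consumer group of this listener becomes consumer-id7.

If you do not want the id to be used as the group id, set idIsGroup = false; the default group id is then used.

④. An explicit groupId attribute has the highest priority
With @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test"), the consumer group of this listener is "groupId-test".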
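
The precedence described in ③ and ④ can be written down as a plain-Java model. It only mirrors the rules stated here and is not Spring's implementation; GroupIdResolver and resolve are hypothetical names.

```java
final class GroupIdResolver {

    /**
     * Picks the effective consumer group:
     * 1. an explicit groupId attribute,
     * 2. otherwise the listener id when idIsGroup is true,
     * 3. otherwise the consumer factory's default group.
     */
    static String resolve(String groupId, String id, boolean idIsGroup, String factoryGroupId) {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        return factoryGroupId;
    }

    public static void main(String[] args) {
        System.out.println(resolve("", "consumer-id7", true, "BASE-DEMO"));              // consumer-id7
        System.out.println(resolve("", "consumer-id7", false, "BASE-DEMO"));             // BASE-DEMO
        System.out.println(resolve("groupId-test", "consumer-id5", false, "BASE-DEMO")); // groupId-test
    }
}
```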

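groupId: the consumer group name

Sets the consumer group for this listener; how it interacts with the listener id is described under id above.

topics: the topics to listen to (use exactly one of topics, topicPattern, topicPartitions)

Several topics can be listened to at once:
topics = {"SHI_TOPIC3","SHI_TOPIC4"}

topicPattern: listen to topics matching a pattern (use exactly one of topics, topicPattern, topicPartitions)

topicPartitions: explicit partition assignment

A listener can be given explicit topics and partitions (and, optionally, initial offsets).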

@KafkaListener(id = "thing2", topicPartitions =
        { @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
          @TopicPartition(topic = "topic2", partitions = "0",
             partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
        })
public void listen(ConsumerRecord<?, ?> record) {
    ...
}

In the example above, the listener is assigned partitions 0 and 1 of topic1 and partition 0 of topic2, plus partition 1 of topic2 starting from offset 100.

errorHandler: exception handling

Implement KafkaListenerErrorHandler and handle the exception there:

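import org.apache.kafka.clients.consumer.Consumer;
import org.springframework.kafka.listener.KafkaListenerErrorHandler;
import org.springframework.kafka.listener.ListenerExecutionFailedException;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        return null;
    }

    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
        // do something with the failed message, e.g. log it
        return null;
    }
}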

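To use it, pass the bean name, for example errorHandler="kafkaDefaultListenerErrorHandler".

containerFactory: the listener container factory

Names the factory bean used to create the listener container.

A factory for batch consumption: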

    /**
     * Listener container factory for batch consumption.
     * @return the batch listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        // Enable batch consumption; the size of each batch is capped by ConsumerConfig.MAX_POLL_RECORDS_CONFIG in the consumer configuration
        factory.setBatchListener(true);
        return factory;
    }

Use it with containerFactory = "batchFactory".
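
A listener method that consumes through this factory might look like the sketch below. The class name BatchConsumer and the id "batch-consumer" are hypothetical; the topic is reused from the earlier examples, and the key and value types follow the factory's Integer and String.

```java
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class BatchConsumer {

    // With a batch factory the method receives a list of records per invocation
    // instead of a single record.
    @KafkaListener(id = "batch-consumer", topics = "SHI_TOPIC3", containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<Integer, String>> records) {
        for (ConsumerRecord<Integer, String> record : records) {
            System.out.printf("partition=%d offset=%d value=%s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}
```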

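clientIdPrefix: client id prefix

Overrides the consumer factory's kafka.consumer.client-id property. It is used as a prefix followed by -n, where n is a number; if it is not set, the client id is consumer-GROUP_ID-n.

With clientIdPrefix: [Consumer clientId=orion-2, groupId=sadan] Adding newly assigned partitions: test-3

Without clientIdPrefix: [Consumer clientId=consumer-sadan_fake-3, groupId=sadan_fake] Adding newly assigned partitions: test-3

concurrency: number of consumers

Overrides the concurrency set on the container factory. Concurrency here means multi-threaded consumption: on a single node, setting it to 3 starts 3 clients, that is 3 consumers, which share the partitions between them.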

    /**
     * Listener container factory with an explicit concurrency.
     * @return the listener container factory
     */
    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(kafkaConsumerFactory());
        factory.setConcurrency(6);
        return factory;
    }

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1")

Although the listener uses concurrencyFactory (configured with concurrency 6), the number of consumers actually created is 1.

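  • Note:

The total number of consumer threads in the application is concurrency * the number of @KafkaListener methods (each listener subscribes to all partitions by default).

  1. concurrency < number of partitions: consumption is uneven; one consumer thread may handle several partitions.
  2. concurrency = number of partitions: the ideal case; each consumer thread handles one partition.
  3. concurrency > number of partitions: some consumer threads get no partition, which wastes resources.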
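
properties: other consumer properties

The available Kafka properties are listed in org.apache.kafka.clients.consumer.ConsumerConfig;
any property with the same name can be overridden here.

Usage: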

    @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
            , clientIdPrefix = "myClientId5",groupId = "groupId-test",
            properties = {
                    "enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")

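Configuring concurrency

Single-node environment:

When listening to a single topic, concurrency should generally equal the number of partitions, so that each partition has its own consumer thread; this is the most efficient setup. If the numbers differ, the partition assignment strategy (RoundRobinAssignor or RangeAssignor) gives some consumers more partitions and therefore more load. With too many threads, resources are wasted because some consumers have no partition to consume.

Listening to one topic (llc2021) with 4 partitions, using 3 consumers: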
@KafkaListener(id = "haha",groupId = "sadan",topics = "llc2021",clientIdPrefix = "orion",concurrency = "3")
 
orion-0-6337a10a-5d00-47c2-8c47-c2f79fcd2d9d=Assignment(partitions=[llc2021-0, llc2021-1]),
orion-1-dbc3ad53-deac-47cb-aa18-587f4bf1d884=Assignment(partitions=[llc2021-2]), 
orion-2-66fb68a5-4fda-4a9c-8201-20eaf5b617d6=Assignment(partitions=[llc2021-3])

Listening to 2 topics, one (llc2021) with 4 partitions and one (llc2022) with 3 partitions, using 5 consumers:
@KafkaListener(groupId = "sadan_fake",topics = {"llc2021","llc2022"},concurrency = "5")

consumer-sadan_fake-5-1b672bd3-e75f-43ee-9828-b60ef7a006cf=Assignment(partitions=[]), 
consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969=Assignment(partitions=[llc2021-1, llc2022-1]),
consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890=Assignment(partitions=[llc2021-2, llc2022-2]),
consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036=Assignment(partitions=[llc2021-3]), 
consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068=Assignment(partitions=[llc2021-0, llc2022-0])}
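
Both assignments above are consistent with range-style assignment, where each topic's partitions are split into contiguous ranges over the sorted consumers. The sketch below is a plain-Java model of that idea, not the Kafka RangeAssignor class itself; RangeAssignmentSketch is a hypothetical name, the short consumer names stand in for the full member ids, and every consumer is assumed to subscribe to every topic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RangeAssignmentSketch {

    /**
     * Assigns each topic's partitions to the sorted consumers in contiguous ranges;
     * the first (partitions % consumers) consumers receive one extra partition.
     */
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted);
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (sorted.isEmpty()) {
            return assignment;
        }
        // Topics are processed independently, in name order.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int perConsumer = topic.getValue() / sorted.size();
            int extra = topic.getValue() % sorted.size();
            int next = 0;
            for (int i = 0; i < sorted.size(); i++) {
                int count = perConsumer + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    assignment.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
                next += count;
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One topic, 4 partitions, 3 consumers.
        System.out.println(assign(Arrays.asList("orion-0", "orion-1", "orion-2"),
                Collections.singletonMap("llc2021", 4)));
        // {orion-0=[llc2021-0, llc2021-1], orion-1=[llc2021-2], orion-2=[llc2021-3]}

        // Two topics (4 and 3 partitions), 5 consumers: the fifth stays idle.
        Map<String, Integer> topics = new TreeMap<>();
        topics.put("llc2021", 4);
        topics.put("llc2022", 3);
        System.out.println(assign(Arrays.asList("c1", "c2", "c3", "c4", "c5"), topics));
    }
}
```

Because each topic is split on its own, the consumers that sort first collect a partition from every topic while the last one gets nothing, which is the pattern seen in the sadan_fake group.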

Inspecting the consumer groups
/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
sadan           llc2021         0          0               0               0               orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106  orion-0
sadan           llc2021         1          0               0               0               orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106  orion-0
sadan           llc2021         2          1               1               0               orion-1-7867333b-dd71-49e9-97af-054d93b2a183 /192.168.1.106  orion-1
sadan           llc2021         3          4               4               0               orion-2-fc7be1ef-f620-4e17-b48c-96c85986e5ee /192.168.1.106  orion-2


/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan_fake --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                HOST            CLIENT-ID
sadan_fake      llc2021         1          0               0               0               consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106  consumer-sadan_fake-2
sadan_fake      llc2022         1          -               0               -               consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106  consumer-sadan_fake-2
sadan_fake      llc2022         0          -               0               -               consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106  consumer-sadan_fake-1
sadan_fake      llc2021         0          0               0               0               consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106  consumer-sadan_fake-1
sadan_fake      llc2022         2          -               0               -               consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106  consumer-sadan_fake-3
sadan_fake      llc2021         2          1               1               0               consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106  consumer-sadan_fake-3
sadan_fake      llc2021         3          4               4               0               consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036 /192.168.1.106  consumer-sadan_fake-4

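Cluster environment (4 partitions, concurrency 3 on each node)

When several nodes listen to the same topic, each node that starts triggers a rebalance, as shown below.
Node A starts; all partitions are assigned to the consumers of the group according to the strategy: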

orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[llc2021-3]), 
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[llc2021-2]), 
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-0, llc2021-1])

/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
sadan           llc2021         0          0               0               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106  orion-0
sadan           llc2021         1          0               0               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106  orion-0
sadan           llc2021         3          5               5               0               orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3 /192.168.1.106  orion-2
sadan           llc2021         2          2               2               0               orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05 /192.168.1.106  orion-1

After node B starts, the group has more consumers, so a rebalance takes place:

INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-1, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-1, groupId=sadan] Revoke previously assigned partitions llc2021-2
INFO 24168 --- [     haha-1-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-2]
INFO 24168 --- [     haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-1, groupId=sadan] (Re-)joining group
INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-0, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-0, groupId=sadan] Revoke previously assigned partitions llc2021-1, llc2021-0
INFO 24168 --- [     haha-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-1, llc2021-0]
INFO 24168 --- [     haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-0, groupId=sadan] (Re-)joining group
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Revoke previously assigned partitions llc2021-3
INFO 24168 --- [     haha-2-C-1] o.s.k.l.KafkaMessageListenerContainer    : sadan: partitions revoked: [llc2021-3]
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=orion-2, groupId=sadan] (Re-)joining group
INFO 24168 --- [     haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=orion-2, groupId=sadan] Finished assignment for group at generation 60: {orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]), orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]), consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]), orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]), consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}

// Note the finished assignment:
{orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]),
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), 
consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]),
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]),
consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}

/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
sadan           llc2021         1          0               0               0               consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377 /192.168.1.106  consumer-sadan-2
sadan           llc2021         2          2               2               0               consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e /192.168.1.106  consumer-sadan-3
sadan           llc2021         3          5               5               0               orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a          /192.168.1.106  orion-0
sadan           llc2021         0          0               0               0               consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b /192.168.1.106  consumer-sadan-1

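The 2 nodes provide 6 consumers in total, so 2 consumers are assigned no partition, which wastes resources. Work out the assignment strategy first and then set the number of consumer threads accordingly; a reasonable value is concurrency = partitions / nodeNum.

With concurrency=2 on each node no consumer is wasted; each one owns a partition: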
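
The sizing rule can be checked with a few lines of plain Java. This is a sketch for a single topic; ConcurrencySizing is a hypothetical name, and rounding up when the partitions do not divide evenly is an assumption that goes beyond the formula above.

```java
final class ConcurrencySizing {

    /** Concurrency per node so that all nodes together cover every partition (rounded up). */
    static int perNodeConcurrency(int partitions, int nodes) {
        return (partitions + nodes - 1) / nodes;
    }

    /** Number of consumers that end up without a partition for a single topic. */
    static int idleConsumers(int partitions, int nodes, int concurrencyPerNode) {
        return Math.max(0, nodes * concurrencyPerNode - partitions);
    }

    public static void main(String[] args) {
        System.out.println(idleConsumers(4, 2, 3));   // 2
        System.out.println(perNodeConcurrency(4, 2)); // 2
        System.out.println(idleConsumers(4, 2, 2));   // 0
    }
}
```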

{orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1=Assignment(partitions=[llc2021-1]),
orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a=Assignment(partitions=[llc2021-3]), 
consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba=Assignment(partitions=[llc2021-0])}

/usr/local/kafka/bin/kafka-consumer-groups.sh  --bootstrap-server localhost:9092 --group sadan --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
sadan           llc2021         3          5               5               0               orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a          /192.168.1.106  orion-1
sadan           llc2021         2          2               2               0               orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116          /192.168.1.106  orion-0
sadan           llc2021         0          0               0               0               consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba /192.168.1.106  consumer-sadan-1
sadan           llc2021         1          0               0               0               consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1 /192.168.1.106  consumer-sadan-2

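Exceptions while receiving messages

The container can be given a global error handler; separate errorHandler beans can also be created and configured on individual @KafkaListener methods. If the handler throws an exception, the offset is not committed (assuming offsets are committed manually), and the command output shows CURRENT-OFFSET lagging behind LOG-END-OFFSET.

Here there is 1 topic with 4 partitions and 2 nodes, one of which is deliberately made to throw:

Note the CONSUMER-ID column: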
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID
sadan           llc2021         3          795             801             6               orion-1-1a4f3ef9-6716-4829-ab43-70adbaf3713e          /192.168.1.106  orion-1
sadan           llc2021         2          790             791             1               orion-0-d6d8f3cf-29fd-4c1a-99d4-d34b617cf434          /192.168.1.106  orion-0
sadan           llc2021         1          570             570             0               consumer-sadan-2-b1113c48-9f8c-462a-8bfa-f3a474283c57 /192.168.1.106  consumer-sadan-2
sadan           llc2021         0          1089            1089            0               consumer-sadan-1-d98a21da-b9d9-43e6-ab1e-13347ba241e2 /192.168.1.106  consumer-sadan-1
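
The LAG column is LOG-END-OFFSET minus CURRENT-OFFSET. As a sketch, a data row of the --describe output can be checked with plain Java; ConsumerGroupLag is a hypothetical helper, it assumes the whitespace-separated column layout shown above, and it is meant for data rows only, not the header.

```java
import java.util.OptionalLong;

final class ConsumerGroupLag {

    /**
     * Computes LOG-END-OFFSET minus CURRENT-OFFSET for one data row.
     * Returns empty when an offset is shown as "-" (nothing committed yet).
     */
    static OptionalLong lag(String row) {
        // Columns: GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG ...
        String[] cols = row.trim().split("\\s+");
        if (cols.length < 5 || "-".equals(cols[3]) || "-".equals(cols[4])) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(Long.parseLong(cols[4]) - Long.parseLong(cols[3]));
    }

    public static void main(String[] args) {
        String behind = "sadan llc2021 3 795 801 6 "
                + "orion-1-1a4f3ef9-6716-4829-ab43-70adbaf3713e /192.168.1.106 orion-1";
        String uncommitted = "sadan_fake llc2022 1 - 0 - "
                + "consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106 consumer-sadan_fake-2";
        System.out.println(lag(behind));      // OptionalLong[6]
        System.out.println(lag(uncommitted)); // OptionalLong.empty
    }
}
```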

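When the node that deliberately throws is shut down, a rebalance occurs and all partitions go to the other node, which then consumes the lagging messages. When the healthy node is shut down instead, a rebalance is also triggered and the faulty node tries to consume the lagging messages again, but it fails once more and makes no progress.

Note the CONSUMER-ID column: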
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                  HOST            CLIENT-ID
sadan           llc2021         1          572             572             0               orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106  orion-1
sadan           llc2021         3          801             807             6               orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106  orion-1
sadan           llc2021         0          1090            1090            0               orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106  orion-0
sadan           llc2021         2          791             792             1               orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106  orion-0
