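Automatic topic creation
When you call send on kafkaTemplate and the target topic does not exist, the broker creates it automatically (this requires auto.create.topics.enable=true on the broker, which is the default). The new topic takes the broker defaults for num.partitions and default.replication.factor, so out of the box it has one partition and one replica.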
Topic: llc2021 PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: llc2021 Partition: 0 Leader: 3 Replicas: 3 Isr: 3
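auto-offset-reset on the consumer side
This setting only applies when the consumer group has no committed offset for a partition (or the committed offset is out of range). If the group already has a committed offset, consumption resumes from there.
earliest: with no committed offset, a newly started consumer reads from the beginning of the partition, so it consumes the messages that are already in the topic.
latest: with no committed offset, a newly started consumer skips the messages that are already in the topic and only consumes messages produced afterwards.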
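The rule above can be modelled in a few lines. This is a minimal sketch, not Kafka client code: the class OffsetResetSketch, the Reset enum and the startPosition method are hypothetical names invented for illustration.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {
    // Hypothetical enum mirroring the two auto-offset-reset values discussed above.
    enum Reset { EARLIEST, LATEST }

    /**
     * Returns the offset a consumer would start reading from.
     * A committed offset always wins; the reset policy is only a fallback.
     */
    static long startPosition(OptionalLong committed, Reset policy,
                              long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        return policy == Reset.EARLIEST ? beginningOffset : endOffset;
    }

    public static void main(String[] args) {
        // A partition holding offsets 0..4, so the end offset is 5.
        System.out.println(startPosition(OptionalLong.empty(), Reset.EARLIEST, 0, 5)); // 0
        System.out.println(startPosition(OptionalLong.empty(), Reset.LATEST, 0, 5));   // 5
        System.out.println(startPosition(OptionalLong.of(3), Reset.LATEST, 0, 5));     // 3
    }
}
```

The third call shows why changing auto-offset-reset has no visible effect on a group that has already committed offsets.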
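@KafkaListener in detail
id: the listener's id
①. Consumer thread naming
With an id: Thread[haha-2-C-1,5,main]; the id determines the thread name prefix.
Without an id: Thread[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1,5,main]
②. Listener ids must be unique within the same container
Otherwise startup fails with: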
Caused by: java.lang.IllegalStateException: Another endpoint is already registered with id
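③. The id overrides the consumer factory's group id
Suppose the configuration file sets the consumer group with kafka.consumer.group-id=BASE-DEMO.
Normally that is the default consumer group for the listeners in the container.
But if you declare @KafkaListener(id = "consumer-id7", topics = {"SHI_TOPIC3"}),
the consumer group of this listener becomes consumer-id7.
If you do not want the id to be used as the group id, set idIsGroup = false and the default group id is used again.
④. If groupId is set, it has the highest priority
With @KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3",groupId = "groupId-test"), the consumer group of this listener ends up as "groupId-test".
groupId: the consumer group name
Sets the consumer group of this listener. For how it interacts with the other settings, see the id section above.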
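The precedence described in points ③ and ④ can be summarised as a small decision function. This is a sketch of the rule as stated in this article, not Spring's actual implementation; GroupIdResolution and resolveGroupId are hypothetical names.

```java
public class GroupIdResolution {
    /**
     * Picks the effective consumer group:
     * 1. the groupId attribute, if set;
     * 2. otherwise the listener id, if idIsGroup is true;
     * 3. otherwise the default group id from the consumer factory.
     */
    static String resolveGroupId(String groupIdAttr, String id,
                                 boolean idIsGroup, String factoryGroupId) {
        if (groupIdAttr != null && !groupIdAttr.isEmpty()) {
            return groupIdAttr;
        }
        if (idIsGroup && id != null && !id.isEmpty()) {
            return id;
        }
        return factoryGroupId;
    }

    public static void main(String[] args) {
        // Values taken from the examples in the text above.
        System.out.println(resolveGroupId("", "consumer-id7", true, "BASE-DEMO"));              // consumer-id7
        System.out.println(resolveGroupId("", "consumer-id7", false, "BASE-DEMO"));             // BASE-DEMO
        System.out.println(resolveGroupId("groupId-test", "consumer-id5", false, "BASE-DEMO")); // groupId-test
    }
}
```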
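topics: the topics to listen to (use exactly one of topics, topicPattern and topicPartitions)
Several topics can be listened to at once:
topics = {"SHI_TOPIC3","SHI_TOPIC4"}
topicPattern: listen to every topic whose name matches a pattern (use exactly one of topics, topicPattern and topicPartitions)
topicPartitions: explicit partition assignment
You can give the listener explicit topics and partitions, optionally with an initial offset: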
@KafkaListener(id = "thing2", topicPartitions =
{ @TopicPartition(topic = "topic1", partitions = { "0", "1" }),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
})
public void listen(ConsumerRecord<?, ?> record) {
...
}
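In the example above, the listener is assigned partitions 0 and 1 of topic1 and partition 0 of topic2; partition 1 of topic2 is also assigned and starts consuming from offset 100.
errorHandler: exception handling
Implement KafkaListenerErrorHandler and handle the exception there: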
@Component
public class KafkaDefaultListenerErrorHandler implements KafkaListenerErrorHandler {
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
return null;
}
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) {
// handle the failure here, for example log the message and the exception
return null;
}
}
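Reference it by bean name on the listener, for example errorHandler="kafkaDefaultListenerErrorHandler".
containerFactory: the listener container factory
Specifies the factory bean that creates the listener container.
A factory for batch consumption: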
/**
* Listener container factory for batch consumption
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> batchFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
// Enable batch consumption; the size of each batch is capped by the Kafka consumer setting ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
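Use it with containerFactory = "batchFactory".
clientIdPrefix: client id prefix
Overrides the consumer factory's kafka.consumer.client-id property. The value is used as a prefix followed by -n, where n is a number. If it is not set, the client id has the form consumer-GROUP_ID-n.
With a prefix: [Consumer clientId=orion-2, groupId=sadan] Adding newly assigned partitions: test-3
Without a prefix: [Consumer clientId=consumer-sadan_fake-3, groupId=sadan_fake] Adding newly assigned partitions: test-3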
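The naming patterns seen in the log lines and thread names above can be written down as string templates. This is only a sketch inferred from those samples (the trailing "-C-1" is copied from the thread names shown earlier); ConsumerNaming and its methods are hypothetical and do not call any Spring or Kafka API.

```java
public class ConsumerNaming {
    /** Thread name for the consumer at the given index of a listener with an explicit id. */
    static String threadName(String listenerId, int index) {
        return listenerId + "-" + index + "-C-1";
    }

    /** Client id for the consumer at the given index when clientIdPrefix is set. */
    static String clientId(String clientIdPrefix, int index) {
        return clientIdPrefix + "-" + index;
    }

    public static void main(String[] args) {
        // Third consumer (index 2) of the listener id = "haha", clientIdPrefix = "orion".
        System.out.println(threadName("haha", 2)); // haha-2-C-1
        System.out.println(clientId("orion", 2));  // orion-2
    }
}
```

Matching the index in a thread name with the index in a client id is a quick way to correlate application logs with the CLIENT-ID column of kafka-consumer-groups.sh.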
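concurrency: number of concurrent consumers
Overrides the concurrency set on the container factory. Concurrency here means multi-threaded consumption: on a single machine, a value of 3 starts three clients, that is three consumers, which share the partitions among themselves.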
/**
* Listener container factory with a default concurrency
* @return
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> concurrencyFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(kafkaConsumerFactory());
factory.setConcurrency(6);
return factory;
}
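@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1")
Although the factory in use is concurrencyFactory (configured with a concurrency of 6), this listener ends up with only 1 consumer.
- Points to note:
The total number of consumer threads in the application is concurrency * the number of @KafkaListener methods (each listens to all partitions by default).
- concurrency < number of partitions: consumption is uneven, and one consumer thread may handle several partitions.
- concurrency = number of partitions: the ideal case, each consumer thread handles exactly one partition.
- concurrency > number of partitions: some consumer threads get no partition, which wastes resources.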
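The three cases above come down to simple arithmetic. The sketch below is a hypothetical helper (ConcurrencySizing is not part of spring-kafka) and assumes a single topic whose partitions are spread as evenly as possible over the consumer threads.

```java
public class ConcurrencySizing {
    /** Total consumer threads when every listener uses the same concurrency. */
    static int totalConsumerThreads(int concurrency, int listenerCount) {
        return concurrency * listenerCount;
    }

    /** Consumer threads that receive no partition at all. */
    static int idleThreads(int concurrency, int partitions) {
        return Math.max(0, concurrency - partitions);
    }

    /** Largest number of partitions any single thread has to handle (rounded up). */
    static int maxPartitionsPerThread(int concurrency, int partitions) {
        return (partitions + concurrency - 1) / concurrency;
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (int concurrency : new int[] {3, 4, 5}) {
            System.out.println("concurrency=" + concurrency
                    + " idle=" + idleThreads(concurrency, partitions)
                    + " maxPerThread=" + maxPartitionsPerThread(concurrency, partitions));
        }
        // concurrency=3 idle=0 maxPerThread=2
        // concurrency=4 idle=0 maxPerThread=1
        // concurrency=5 idle=1 maxPerThread=1
    }
}
```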
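properties: other consumer properties
The available Kafka properties are listed in org.apache.kafka.clients.consumer.ConsumerConfig;
a property set here replaces the consumer factory's property of the same name.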
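Each entry of the properties attribute is a key-value pair written in Java properties-file syntax, so both key:value and key=value are accepted. The sketch below only demonstrates that syntax with java.util.Properties from the JDK; ListenerPropertiesSketch and parse are hypothetical names, and the assumption is that the annotation entries are read the same way.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ListenerPropertiesSketch {
    /** Parses entries such as "enable.auto.commit:false" using properties-file syntax. */
    static Properties parse(String... entries) {
        Properties props = new Properties();
        for (String entry : entries) {
            try {
                props.load(new StringReader(entry));
            } catch (IOException e) {
                // A StringReader does not fail in practice; rethrow unchecked to keep callers simple.
                throw new UncheckedIOException(e);
            }
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse("enable.auto.commit:false", "max.poll.interval.ms=6000");
        System.out.println(props.getProperty("enable.auto.commit"));   // false
        System.out.println(props.getProperty("max.poll.interval.ms")); // 6000
    }
}
```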
Usage:
@KafkaListener(id = "consumer-id5",idIsGroup = false,topics = "SHI_TOPIC3", containerFactory = "concurrencyFactory",concurrency = "1"
, clientIdPrefix = "myClientId5",groupId = "groupId-test",
properties = {
"enable.auto.commit:false","max.poll.interval.ms:6000" },errorHandler="kafkaDefaultListenerErrorHandler")
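Configuring concurrency
Single-node environment:
When listening to a single topic, concurrency should generally equal the number of partitions, so that every partition has its own consumer thread, which is the most efficient layout. If the numbers differ, the partition assignment strategy (for example RangeAssignor or RoundRobinAssignor) gives some consumers more partitions than others, so those consumers carry a heavier load. If there are more threads than partitions, resources are wasted because some consumers get no partition.
Listening to one topic (llc2021) with 4 partitions and 3 consumers: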
@KafkaListener(id = "haha",groupId = "sadan",topics = "llc2021",clientIdPrefix = "orion",concurrency = "3")
orion-0-6337a10a-5d00-47c2-8c47-c2f79fcd2d9d=Assignment(partitions=[llc2021-0, llc2021-1]),
orion-1-dbc3ad53-deac-47cb-aa18-587f4bf1d884=Assignment(partitions=[llc2021-2]),
orion-2-66fb68a5-4fda-4a9c-8201-20eaf5b617d6=Assignment(partitions=[llc2021-3])
Listening to 2 topics, llc2021 with 4 partitions and llc2022 with 3 partitions, using 5 consumers:
@KafkaListener(groupId = "sadan_fake",topics = {"llc2021","llc2022"},concurrency = "5")
consumer-sadan_fake-5-1b672bd3-e75f-43ee-9828-b60ef7a006cf=Assignment(partitions=[]),
consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969=Assignment(partitions=[llc2021-1, llc2022-1]),
consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890=Assignment(partitions=[llc2021-2, llc2022-2]),
consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036=Assignment(partitions=[llc2021-3]),
consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068=Assignment(partitions=[llc2021-0, llc2022-0])}
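Both assignments above are consistent with range-style assignment: for each topic, the consumers are sorted by member id and each gets a contiguous block of partitions, with the first consumers receiving one extra partition when the division is not even. The following is a simplified re-implementation for illustration, not Kafka's RangeAssignor class; the short consumer names stand in for the full member ids.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class RangeAssignSketch {
    /** Assigns partitions topic by topic, giving each consumer a contiguous range. */
    static Map<String, List<String>> assign(List<String> consumers,
                                            Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(consumers);
        Collections.sort(sorted); // range assignment orders members lexicographically
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String consumer : sorted) {
            result.put(consumer, new ArrayList<>());
        }
        // TreeMap only makes the topic order (and so the printed output) deterministic.
        for (Map.Entry<String, Integer> topic : new TreeMap<>(partitionsPerTopic).entrySet()) {
            int numPartitions = topic.getValue();
            int perConsumer = numPartitions / sorted.size();
            int extra = numPartitions % sorted.size(); // the first `extra` consumers get one more
            for (int i = 0; i < sorted.size(); i++) {
                int start = perConsumer * i + Math.min(i, extra);
                int length = perConsumer + (i < extra ? 1 : 0);
                for (int p = start; p < start + length; p++) {
                    result.get(sorted.get(i)).add(topic.getKey() + "-" + p);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(assign(List.of("orion-0", "orion-1", "orion-2"), Map.of("llc2021", 4)));
        // {orion-0=[llc2021-0, llc2021-1], orion-1=[llc2021-2], orion-2=[llc2021-3]}
        System.out.println(assign(List.of("c-1", "c-2", "c-3", "c-4", "c-5"),
                Map.of("llc2021", 4, "llc2022", 3)));
        // {c-1=[llc2021-0, llc2022-0], c-2=[llc2021-1, llc2022-1], c-3=[llc2021-2, llc2022-2], c-4=[llc2021-3], c-5=[]}
    }
}
```

Because the ranges are computed per topic, the same leading consumers pick up the extra partitions of every topic, which is why the fifth consumer stays empty in the two-topic case.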
Checking the consumer groups
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 0 0 0 0 orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106 orion-0
sadan llc2021 1 0 0 0 orion-0-4399d21b-5ca7-4702-9d34-62db1d6957da /192.168.1.106 orion-0
sadan llc2021 2 1 1 0 orion-1-7867333b-dd71-49e9-97af-054d93b2a183 /192.168.1.106 orion-1
sadan llc2021 3 4 4 0 orion-2-fc7be1ef-f620-4e17-b48c-96c85986e5ee /192.168.1.106 orion-2
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan_fake --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan_fake llc2021 1 0 0 0 consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106 consumer-sadan_fake-2
sadan_fake llc2022 1 - 0 - consumer-sadan_fake-2-99e0c292-fea8-4bc7-9c0c-325332c46969 /192.168.1.106 consumer-sadan_fake-2
sadan_fake llc2022 0 - 0 - consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106 consumer-sadan_fake-1
sadan_fake llc2021 0 0 0 0 consumer-sadan_fake-1-489f4e77-eada-4782-8543-7fa0ba06c068 /192.168.1.106 consumer-sadan_fake-1
sadan_fake llc2022 2 - 0 - consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106 consumer-sadan_fake-3
sadan_fake llc2021 2 1 1 0 consumer-sadan_fake-3-f0eb1b2c-fd6b-4f2d-bf3d-937e8772d890 /192.168.1.106 consumer-sadan_fake-3
sadan_fake llc2021 3 4 4 0 consumer-sadan_fake-4-6c961d9d-e955-4356-885d-939161bfd036 /192.168.1.106 consumer-sadan_fake-4
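Cluster environment (4 partitions, concurrency of 3 on each node)
When several nodes listen to the same topic, every node that starts triggers a rebalance, as shown below.
Node A starts; all partitions are assigned to the consumers of the group according to the strategy: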
orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[llc2021-3]),
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[llc2021-2]),
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-0, llc2021-1])
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 0 0 0 0 orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106 orion-0
sadan llc2021 1 0 0 0 orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106 orion-0
sadan llc2021 3 5 5 0 orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3 /192.168.1.106 orion-2
sadan llc2021 2 2 2 0 orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05 /192.168.1.106 orion-1
After node B starts, the group has more consumers, so a rebalance takes place:
INFO 24168 --- [ haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-1, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [ haha-1-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-1, groupId=sadan] Revoke previously assigned partitions llc2021-2
INFO 24168 --- [ haha-1-C-1] o.s.k.l.KafkaMessageListenerContainer : sadan: partitions revoked: [llc2021-2]
INFO 24168 --- [ haha-1-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-1, groupId=sadan] (Re-)joining group
INFO 24168 --- [ haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-0, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [ haha-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-0, groupId=sadan] Revoke previously assigned partitions llc2021-1, llc2021-0
INFO 24168 --- [ haha-0-C-1] o.s.k.l.KafkaMessageListenerContainer : sadan: partitions revoked: [llc2021-1, llc2021-0]
INFO 24168 --- [ haha-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-0, groupId=sadan] (Re-)joining group
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-2, groupId=sadan] Attempt to heartbeat failed since group is rebalancing
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-2, groupId=sadan] Revoke previously assigned partitions llc2021-3
INFO 24168 --- [ haha-2-C-1] o.s.k.l.KafkaMessageListenerContainer : sadan: partitions revoked: [llc2021-3]
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=orion-2, groupId=sadan] (Re-)joining group
INFO 24168 --- [ haha-2-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=orion-2, groupId=sadan] Finished assignment for group at generation 60: {orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]), orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]), consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]), consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]), orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]), consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}
// Note the finished assignment:
{orion-2-0dd0f850-ee2c-4d1e-b803-ce7cf45c99e3=Assignment(partitions=[]),
orion-1-129ea70f-1d73-49a9-b0a5-8b7053339e05=Assignment(partitions=[]),
consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377=Assignment(partitions=[llc2021-1]),
orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a=Assignment(partitions=[llc2021-3]),
consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b=Assignment(partitions=[llc2021-0])}
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 1 0 0 0 consumer-sadan-2-2093eff9-21dc-44fa-9d5f-4113ab737377 /192.168.1.106 consumer-sadan-2
sadan llc2021 2 2 2 0 consumer-sadan-3-312b273e-315f-4308-87fe-c8c486bcd55e /192.168.1.106 consumer-sadan-3
sadan llc2021 3 5 5 0 orion-0-8a9c50e7-f15b-40b8-88a7-1e369f9b2c7a /192.168.1.106 orion-0
sadan llc2021 0 0 0 0 consumer-sadan-1-94f91f10-6926-4b21-80c2-54e840e62d6b /192.168.1.106 consumer-sadan-1
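The 2 nodes have 6 consumers in total, so 2 of them are assigned no partition and are wasted. Plan the number of consumer threads around the assignment strategy; a sensible value is concurrency = partitions / nodeNum.
With concurrency=2 on each node, no consumer is wasted and each one owns exactly one partition: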
{orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116=Assignment(partitions=[llc2021-2]),
consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1=Assignment(partitions=[llc2021-1]),
orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a=Assignment(partitions=[llc2021-3]),
consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba=Assignment(partitions=[llc2021-0])}
/usr/local/kafka/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group sadan --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 3 5 5 0 orion-1-4aadcb82-f0ef-4dad-b7ed-f4d86d70df8a /192.168.1.106 orion-1
sadan llc2021 2 2 2 0 orion-0-ccd8e8dc-a52c-4b41-8263-9a4f2e82e116 /192.168.1.106 orion-0
sadan llc2021 0 0 0 0 consumer-sadan-1-00dd7348-b854-40a5-8607-b85f8944eaba /192.168.1.106 consumer-sadan-1
sadan llc2021 1 0 0 0 consumer-sadan-2-6abf945b-310e-4cd4-8402-55d4fa7a17b1 /192.168.1.106 consumer-sadan-2
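The sizing rule used in this cluster example can be checked with a little arithmetic. ClusterConcurrency is a hypothetical helper; rounding up when partitions is not divisible by the node count is an assumption added here, since the text only covers the evenly divisible case.

```java
public class ClusterConcurrency {
    /** Consumers across all nodes that end up without any partition. */
    static int idleConsumers(int partitions, int nodes, int concurrencyPerNode) {
        return Math.max(0, nodes * concurrencyPerNode - partitions);
    }

    /** concurrency = partitions / nodeNum, rounded up (the rounding is an assumption). */
    static int concurrencyPerNode(int partitions, int nodes) {
        return (partitions + nodes - 1) / nodes;
    }

    public static void main(String[] args) {
        System.out.println(idleConsumers(4, 2, 3));   // 2
        System.out.println(idleConsumers(4, 2, 2));   // 0
        System.out.println(concurrencyPerNode(4, 2)); // 2
    }
}
```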
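Exceptions while receiving messages
The container can have a global error handler, and you can also create separate errorHandler beans and reference them from individual @KafkaListener methods. If the handler throws an exception, the offset is not committed (assuming manual offset commits are configured), and kafka-consumer-groups.sh --describe then shows CURRENT-OFFSET falling behind LOG-END-OFFSET, that is a non-zero LAG.
The setup here is 1 topic, 4 partitions and 2 nodes, one of which deliberately throws an exception.
Note the CONSUMER-ID column: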
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 3 795 801 6 orion-1-1a4f3ef9-6716-4829-ab43-70adbaf3713e /192.168.1.106 orion-1
sadan llc2021 2 790 791 1 orion-0-d6d8f3cf-29fd-4c1a-99d4-d34b617cf434 /192.168.1.106 orion-0
sadan llc2021 1 570 570 0 consumer-sadan-2-b1113c48-9f8c-462a-8bfa-f3a474283c57 /192.168.1.106 consumer-sadan-2
sadan llc2021 0 1089 1089 0 consumer-sadan-1-d98a21da-b9d9-43e6-ab1e-13347ba241e2 /192.168.1.106 consumer-sadan-1
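The LAG column is simply LOG-END-OFFSET minus CURRENT-OFFSET. The sketch below models, in a deliberately simplified way, why a listener that always throws leaves the committed offset where it was. ManualCommitLagSketch is hypothetical; the real behaviour of spring-kafka also depends on the ack mode and on the container's error handler.

```java
import java.util.function.LongConsumer;

public class ManualCommitLagSketch {
    /** LAG as reported by kafka-consumer-groups.sh. */
    static long lag(long currentOffset, long logEndOffset) {
        return logEndOffset - currentOffset;
    }

    /**
     * Processes records one by one and advances the committed offset only
     * after a record succeeds. Returns the committed offset at the end.
     */
    static long consume(long committed, long logEndOffset, LongConsumer handler) {
        long position = committed;
        while (position < logEndOffset) {
            try {
                handler.accept(position);
            } catch (RuntimeException e) {
                break; // failed record: nothing is committed, the offset stays put
            }
            position++; // success: the committed offset now points at the next record
        }
        return position;
    }

    public static void main(String[] args) {
        long healthy = consume(790, 791, offset -> { });
        long failing = consume(795, 801, offset -> {
            throw new IllegalStateException("simulated listener failure");
        });
        System.out.println(lag(healthy, 791)); // 0
        System.out.println(lag(failing, 801)); // 6
    }
}
```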
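If you shut down the node that deliberately throws, a rebalance gives all partitions to the other node, which then consumes the lagging messages. If you shut down the healthy node instead, a rebalance is triggered as well, and the failing node tries the lagging messages again, only to fail again and make no progress.
Note the CONSUMER-ID column: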
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
sadan llc2021 1 572 572 0 orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106 orion-1
sadan llc2021 3 801 807 6 orion-1-f2d5eca2-b18f-4274-acc3-af2cd76f600c /192.168.1.106 orion-1
sadan llc2021 0 1090 1090 0 orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106 orion-0
sadan llc2021 2 791 792 1 orion-0-8431e37d-8e46-4f2a-a658-ba84fdbe5b69 /192.168.1.106 orion-0
spring.kafka.listener.concurrency
Setting the spring-kafka concurrency correctly when the application is deployed as a cluster (multiple nodes), with the Kafka Manager monitoring results attached