1.生產(chǎn)者
1.1 架構(gòu)
當(dāng)需要往broker發(fā)送消息時(shí),則需要?jiǎng)?chuàng)建一個(gè)或者多個(gè)生產(chǎn)者往broker發(fā)布消息,雖然借助SpringBoot往broker里面發(fā)送消息的API比較簡(jiǎn)單
如果借助
SprinBoot發(fā)送消息,后面章節(jié)會(huì)闡述到
但是很多時(shí)候不同的業(yè)務(wù)場(chǎng)景會(huì)出現(xiàn)不同的問(wèn)題,例如:
- 允不允許消息重復(fù)
- 允不允許消息延遲
- 允不允許消息的丟失
不同的場(chǎng)景下,光是知道API的使用肯定是滿(mǎn)足不了的,因此在使用API之前還需了解發(fā)送消息的原理。
發(fā)送消息原理圖如下:

關(guān)于上述原理圖解釋如下:
生產(chǎn)者源源不斷生產(chǎn)消息,一般消息為
key-value形式,當(dāng)然也可以不指定key消息需要不能直接發(fā)送給
broker,而是需要經(jīng)過(guò)序列化成為一段字節(jié)序列才可以傳輸-
序列完成需要經(jīng)過(guò)分區(qū)器,分區(qū)器會(huì)根據(jù)分區(qū)分配策略去決定這個(gè)消息發(fā)往哪個(gè)分區(qū)
分區(qū)策略分為兩種情況,消息有key值和消息沒(méi)有key值
- 消息有key值時(shí),會(huì)根據(jù)key值得
hash值然后對(duì)分區(qū)數(shù)進(jìn)行取模決定消息發(fā)送給哪個(gè)partition - 消息沒(méi)有key值,會(huì)隨機(jī)發(fā)送給某個(gè)分區(qū)(不同的版本,策略不一樣,有的時(shí)輪詢(xún),有的隨機(jī),有的則是一段時(shí)間內(nèi)只發(fā)送給某個(gè)分區(qū),隔了一段時(shí)間發(fā)送給另外一個(gè)分區(qū))
- 消息有key值時(shí),會(huì)根據(jù)key值得
-
分區(qū)數(shù)確定以后真正發(fā)送具體的
broker上,broker的leader會(huì)把消息寫(xiě)入文件中寫(xiě)入成功則發(fā)送元數(shù)據(jù)給生產(chǎn)者,如果失敗則根據(jù)配置是否重試機(jī)制進(jìn)行重試
1.2 topic
當(dāng)消息發(fā)送到topic時(shí),其實(shí)消息是發(fā)送到topic的partition上,而在物理上一個(gè)partition就是對(duì)應(yīng)的就是一個(gè)目錄
例如:在kafka-eagle上創(chuàng)建wangzh的topic,且分區(qū)數(shù)為3,副本數(shù)為 1,如下:

查看該topic詳情可知,三個(gè)分區(qū)其中131上的partition-0為leader,其他的如下:

同時(shí)取查看131機(jī)器上的數(shù)據(jù)/var/data/kafka(這個(gè)目錄是當(dāng)時(shí)安裝時(shí)指定的數(shù)據(jù)存儲(chǔ)目錄)

所以一個(gè)分區(qū)在物理上對(duì)應(yīng)的就是一個(gè)目錄
1.3 存儲(chǔ)
當(dāng)發(fā)送消息時(shí)到topic的partitions上,分區(qū)會(huì)消息寫(xiě)入segment文件上,一個(gè)partitions由多個(gè)segment文件組成,如下:

每個(gè)segment文件默認(rèn)存儲(chǔ)數(shù)據(jù)大小為1G,當(dāng)然也可以通過(guò)修改kafka參數(shù)調(diào)整
# 單個(gè)segment存儲(chǔ)數(shù)據(jù)大小
log.segment.bytes=具體內(nèi)容
# 當(dāng)超過(guò)一定的時(shí)間(默認(rèn)七天),寫(xiě)入segment文件的數(shù)據(jù)還沒(méi)有達(dá)到1G(默認(rèn)大小)
# 也會(huì)重新創(chuàng)建新的segment文件
log.segment.ms=時(shí)間
從上圖中看出,第一個(gè)segment文件的偏移量一定是從0開(kāi)始的,而下一個(gè)segment文件則是從上個(gè)segment文件偏移量開(kāi)始的
同時(shí)segment文件分為.index和.log文件,如下:

其中.log用來(lái)存儲(chǔ)真正的數(shù)據(jù),.index是索引文件
假如如果想要消費(fèi)偏移量為197的文件,如果沒(méi)有索引則需要從頭到位去尋找,而有了索引文件就完全可以提高查詢(xún)速度
其中前面一大串代表文件名,第一個(gè)segment文件肯定是從0開(kāi)始,第二個(gè)segment文件命名則是以上個(gè)文件偏移量+1命名,如下:
第一個(gè)segment文件命名
0000000000000000.index
0000000000000000.log當(dāng)上一個(gè)文件偏移量為
1679898是,那么下個(gè)segment文件命名為
00000000001679899.index
00000000001679899.log以此類(lèi)推
1.4 發(fā)送
經(jīng)過(guò)上面的消息,已經(jīng)知道生產(chǎn)者發(fā)送原理,接下來(lái)就借助SpringBoot往broker發(fā)送消息。如下:
1.4.1 創(chuàng)建
先創(chuàng)建springboot項(xiàng)目kafka-springboot-test,并且導(dǎo)入kafka依賴(lài),其pom.xml內(nèi)容如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.10.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>demo</name>
<description>Demo project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<excludes>
<exclude>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</build>
</project>
在application.yml增加以下配置
spring:
kafka:
# kafka集群地址
bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
listener:
# 如果沒(méi)有至少一個(gè)配置的主題,則容器是否應(yīng)無(wú)法啟動(dòng)
# false 代表關(guān)閉此功能
missing-topics-fatal: false
producer:
# 發(fā)布消息時(shí),key的序列化器,這里是kafka提供的序列化器
# 當(dāng)發(fā)送消息的key值不是字符串時(shí),需要自己寫(xiě)自定義序列化器
# 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
# 后面會(huì)講述如何自定義序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 發(fā)布消息時(shí),value的序列化器,這里是kafka提供的序列化器
# 當(dāng)發(fā)送消息的key不是字符串時(shí),需要自己寫(xiě)自定義序列化器
# 一般來(lái)說(shuō)發(fā)布消息大多數(shù)都不是字符串,因此還是需要發(fā)送消息
# 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
value-serializer: org.apache.kafka.common.serialization.StringSerializer
1.4.2 發(fā)布
當(dāng)配置完成即可發(fā)布消息,發(fā)消息先創(chuàng)建topic,上文中已經(jīng)創(chuàng)建了test_topic這里就不再創(chuàng)建了
發(fā)布消息則是借助org.springframework.kafka.core.KafkaTemplate發(fā)布消息,直接注入即可,代碼如下:
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
@SpringBootTest
@Slf4j
class DemoApplicationTests {
/**
* 第一個(gè)泛型為 key值的數(shù)據(jù)類(lèi)型
* 第二個(gè)泛型為 value值的數(shù)據(jù)類(lèi)型
*/
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Test
void contextLoads() throws Exception {
ListenableFuture<SendResult<String, String>> resultListenableFuture =
kafkaTemplate.send("wangzh", "test-key", "test-topic");
log.info("元數(shù)據(jù)信息:" + resultListenableFuture.get());
log.info("發(fā)送消息完畢");
// 關(guān)閉連接
kafkaTemplate.destroy();
}
}
執(zhí)行代碼成功后,就在kafka-eagle看到消息發(fā)送結(jié)果,如下

當(dāng)然也可以指定
partition發(fā)送消息
1.4.3 acks
acks 參數(shù) 規(guī)則定了必須須要有多少分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫(xiě)入是成功的 這個(gè)參數(shù)對(duì)消息丟失 可能性有重要影響,目前該參數(shù)配置如下:
acks = 0
生產(chǎn)者在成功寫(xiě)入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)
意味著生產(chǎn)者不知道消息有沒(méi)有把消息發(fā)送到
broker,只要生產(chǎn)者將消息添加到Socket緩沖區(qū)就認(rèn)為消息發(fā)送成功,不需要等待服務(wù)器 的響應(yīng)。因此這種方式也可以支持很高的吞吐量
acks=1
只要集群的
leader節(jié)點(diǎn)收到消息并寫(xiě)入到segment文件,生產(chǎn)者就會(huì)收到來(lái)自服務(wù)器的成功響應(yīng),視為發(fā)送成功假如
leader數(shù)據(jù)寫(xiě)入成功,然后宕機(jī),此時(shí)所有的副本還沒(méi)來(lái)的及同步數(shù)據(jù),那么剛寫(xiě)入的數(shù)據(jù)就會(huì)丟失
acks=all
集群的
leader收到消息并寫(xiě)入到segment中,同時(shí)等待所有的副本同步消息成功后才認(rèn)為消息發(fā)送成功這種模式是最安全的,及時(shí)有的
leader發(fā)生奔潰,那還是可以重新選舉leader進(jìn)行通信
在配置文件的producer里面設(shè)置acks即可

2.消費(fèi)者
2.1 架構(gòu)
消費(fèi)者如果訂閱了某個(gè)主題消息,那么就可以去進(jìn)行消費(fèi),同時(shí)一個(gè)消費(fèi)者屬于一個(gè)消費(fèi)組,一個(gè)消費(fèi)組里面所有的消費(fèi)者都訂閱同一個(gè)主題.如下:

當(dāng)消費(fèi)組里面只有一個(gè)消費(fèi)者時(shí),那么這個(gè)消費(fèi)就回去消費(fèi)所有分區(qū)的消息,當(dāng)然一般開(kāi)發(fā)也就足夠了。
但有時(shí)候生產(chǎn)者生產(chǎn)消息過(guò)快,而消費(fèi)者消費(fèi)消息過(guò)慢,就會(huì)很容易導(dǎo)致消息堆積,從而阻塞,那么就可以在消費(fèi)者組里面多增加幾個(gè)消費(fèi)者,如下:

注意:同一組的消費(fèi)者是不會(huì)消費(fèi)同一主題的同一分區(qū)消息
當(dāng)然如果消費(fèi)者的數(shù)量超過(guò)了分區(qū)數(shù),那么超過(guò)的消費(fèi)者就會(huì)處于空閑狀態(tài)

因此不要讓消費(fèi)者的數(shù)量超過(guò)分區(qū)數(shù)
一個(gè)消息只能被一個(gè)組消費(fèi)一次,例如上圖中consumer-1消費(fèi)了消息A,那么其他的消費(fèi)者就不能夠再次消費(fèi)A了
如果在消費(fèi)時(shí),手動(dòng)指定了偏移量,那么就會(huì)重復(fù)消費(fèi)消息,這種情況特殊
當(dāng)然同一個(gè)消息可以被多個(gè)消費(fèi)組進(jìn)行消費(fèi),如下圖所示:

2.2 分配
如下圖,消費(fèi)組中可以增加,當(dāng)增加一個(gè)消費(fèi)者,就會(huì)分?jǐn)傊跋M(fèi)者的消費(fèi)壓力,那么當(dāng)新增一個(gè)消費(fèi)者是如何將分區(qū)分配給消費(fèi)者的呢

當(dāng)消費(fèi)者新增一個(gè)消費(fèi)者時(shí),會(huì)提高消費(fèi)者的高可用和伸縮性,且當(dāng)加入到消費(fèi)組之后就會(huì)
給新增的消費(fèi)者分配一個(gè)partition,這種操作稱(chēng)為再分配
注意:在再分配期間,消費(fèi)者會(huì)暫停消費(fèi)消息,直到分配分區(qū)完成才會(huì)繼續(xù)消費(fèi)消息
且當(dāng)分區(qū)分配給再次分配給某個(gè)消費(fèi)者時(shí),消費(fèi)者的消息可能丟失讀取狀態(tài)
同理當(dāng)consumer-2消費(fèi)者退出消費(fèi)者組時(shí),那么partition-2就會(huì)分配到consumer-1,讓他去進(jìn)行消費(fèi)
那么kafka是如何知道消費(fèi)組里面需要再分配呢?這主要是借助于組協(xié)調(diào)器,每個(gè)消費(fèi)組都會(huì)由屬于自己的組協(xié)調(diào)器。
每隔消費(fèi)者都會(huì)發(fā)送心跳到協(xié)調(diào)器,用來(lái)維護(hù)群組關(guān)系和分區(qū)關(guān)系,如下圖所示:

這樣kafka就知道了每個(gè)消費(fèi)者屬于哪個(gè)消費(fèi)組,以及如何去分配partition
協(xié)調(diào)器就類(lèi)似于
spring cloud里面的注冊(cè)中心
當(dāng)消費(fèi)者因?yàn)槟承┮蛩赝蝗煌V瓜M(fèi),也就是說(shuō)協(xié)調(diào)器收不到消費(fèi)者的心跳,那么協(xié)調(diào)器會(huì)等待幾秒,幾秒期間還是沒(méi)有收到心跳,那么協(xié)調(diào)器就會(huì)把該消費(fèi)者剔除出組,然后實(shí)現(xiàn)再分配。
2.3 消費(fèi)
這里同樣借助SpringBoot去消費(fèi)消息,消費(fèi)者配置如下:
spring:
kafka:
# kafka集群地址
bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
listener:
# 如果沒(méi)有至少一個(gè)配置的主題,則容器是否應(yīng)無(wú)法啟動(dòng)
# false 代表關(guān)閉此功能
missing-topics-fatal: false
producer:
# 發(fā)布消息時(shí),key的序列化器,這里是kafka提供的序列化器
# 當(dāng)發(fā)送消息的key值不是字符串時(shí),需要自己寫(xiě)自定義序列化器
# 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
# 后面會(huì)講述如何自定義序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 發(fā)布消息時(shí),value的序列化器,這里是kafka提供的序列化器
# 當(dāng)發(fā)送消息的key不是字符串時(shí),需要自己寫(xiě)自定義序列化器
# 一般來(lái)說(shuō)發(fā)布消息大多數(shù)都不是字符串,因此還是需要發(fā)送消息
# 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
# 消費(fèi)者組id
group-id: wangzh-group
# 是否允許自動(dòng)提交offset
# 每當(dāng)消費(fèi)者消費(fèi)一個(gè)消息就會(huì)產(chǎn)生一個(gè)偏移量
# 偏移量是消費(fèi)者提交到kafka中,保存在`__consumer_offsets` topic中
enable-auto-commit: true
# 提交偏移量間隔時(shí)間數(shù) 100ms提交一次
auto-commit-interval: 100
# 消費(fèi)消息時(shí)的反序列器
# 消費(fèi)消息時(shí)會(huì)將字節(jié)序列反序列化為字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消費(fèi)消息時(shí)的反序列化器
# 消費(fèi)消息時(shí)會(huì)將字節(jié)序反序列化為字符串
# 如果消息不是字符串時(shí),需要自己寫(xiě)反序列話(huà)器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 偏移量配置
# latest 當(dāng)各個(gè)分區(qū)有已提交的偏移量是,就從提交的偏移量后開(kāi)始消費(fèi),如果沒(méi)有則消費(fèi)該分區(qū)最新產(chǎn)生的數(shù)據(jù)
# none 各個(gè)分區(qū)都提交了偏移量后,才從偏移量后開(kāi)始消費(fèi),只要存在一個(gè)分區(qū)沒(méi)有提交偏移
# 量那么拋出異常
# earlist 當(dāng)各個(gè)分區(qū)有已提交的偏移量時(shí),則從提交的偏移量開(kāi)始消費(fèi),如果沒(méi)有偏移量則
# 從頭開(kāi)始消費(fèi)
auto-offset-reset: latest
消費(fèi)消,利用org.springframework.kafka.annotation.KafkaListener注解即可消費(fèi)消息,如下:
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class Consumer {
/**
* topics 為 topic名字,可以填寫(xiě)多個(gè)topic的名字
* ConsumerRecord 為 消息記錄,包含了一條消息大部分?jǐn)?shù)據(jù)
*/
@KafkaListener(topics = {"wangzh"})
public void consumer(ConsumerRecord<String,String> record) {
log.info("消息key:" + record.key());
log.info("消息value:" + record.value());
log.info("消息偏移量:" + record.offset());
log.info("消息topic" + record.topic());
}
}
啟動(dòng)項(xiàng)目即可看到消費(fèi)的消息,如下:

2.4 批量
上次消費(fèi)消息時(shí)一條一條消費(fèi),也就是當(dāng)一條消息消費(fèi)完成,才會(huì)去消費(fèi)下一條,這肯定不大合理,因此在數(shù)據(jù)量大的情況下需要去進(jìn)行批量消費(fèi)
批量消費(fèi)設(shè)置如下:
spring:
kafka:
# kafka集群地址
bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
listener:
# 如果沒(méi)有至少一個(gè)配置的主題,則容器是否應(yīng)無(wú)法啟動(dòng)
# false 代表關(guān)閉此功能
missing-topics-fatal: false
producer:
# 發(fā)布消息時(shí),key的序列化器,這里是kafka提供的序列化器
# 當(dāng)發(fā)送消息的key值不是字符串時(shí),需要自己寫(xiě)自定義序列化器
# 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
# 后面會(huì)講述如何自定義序列化器
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 發(fā)布消息時(shí),value的序列化器,這里是kafka提供的序列化器
# 當(dāng)發(fā)送消息的key不是字符串時(shí),需要自己寫(xiě)自定義序列化器
# 一般來(lái)說(shuō)發(fā)布消息大多數(shù)都不是字符串,因此還是需要發(fā)送消息
# 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 設(shè)置為批量消費(fèi),默認(rèn)為單條消費(fèi)
type: batch
consumer:
# 消費(fèi)者組id
group-id: wangzh-group
# 是否允許自動(dòng)提交offset
# 每當(dāng)消費(fèi)者消費(fèi)一個(gè)消息就會(huì)產(chǎn)生一個(gè)偏移量
# 偏移量是消費(fèi)者提交到kafka中,保存在`__consumer_offsets` topic中
enable-auto-commit: true
# 提交偏移量間隔時(shí)間數(shù) 100ms提交一次
auto-commit-interval: 100
# 消費(fèi)消息時(shí)的反序列器
# 消費(fèi)消息時(shí)會(huì)將字節(jié)序列反序列化為字符串
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 消費(fèi)消息時(shí)的反序列化器
# 消費(fèi)消息時(shí)會(huì)將字節(jié)序反序列化為字符串
# 如果消息不是字符串時(shí),需要自己寫(xiě)反序列話(huà)器
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 偏移量配置
# latest 當(dāng)各個(gè)分區(qū)有已提交的偏移量是,就從提交的偏移量后開(kāi)始消費(fèi),如果沒(méi)有則消費(fèi)該分區(qū)最新產(chǎn)生的數(shù)據(jù)
# none 各個(gè)分區(qū)都提交了偏移量后,才從偏移量后開(kāi)始消費(fèi),只要存在一個(gè)分區(qū)沒(méi)有提交偏移
# 量那么拋出異常
# earlist 當(dāng)各個(gè)分區(qū)有已提交的偏移量時(shí),則從提交的偏移量開(kāi)始消費(fèi),如果沒(méi)有偏移量則
# 從頭開(kāi)始消費(fèi)
auto-offset-reset: latest
# 批量消費(fèi)時(shí),最多一次消費(fèi)多少條數(shù)據(jù)
max-poll-records: 1000


同時(shí)還需要修改接受消息的參數(shù),修改如下:
@KafkaListener(topics = {"wangzh"})
public void consumer(List<ConsumerRecord<String,String>> records) {
records.forEach(record -> {
log.info("消息key:" + record.key());
log.info("消息value:" + record.value());
log.info("消息偏移量:" + record.offset());
log.info("消息topic" + record.topic());
});
}

2.5 指定
通過(guò)之前的學(xué)習(xí)知道,消費(fèi)者每消費(fèi)一條消息就會(huì)提交一次偏移量,下次消費(fèi)時(shí)從偏移量后面開(kāi)始消費(fèi),這樣保證消息不會(huì)重復(fù)消費(fèi)。
有時(shí)候有一種特殊情況,需要指定偏移量去進(jìn)行消費(fèi),那么之前普通消費(fèi)并不能滿(mǎn)足,因此需要自定義操作
package com.example.demo;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;
import java.util.List;
@Component
@Slf4j
public class Consumer {
/**
* 每次消費(fèi) {"0","1","2"} 消息偏移量從1開(kāi)始消費(fèi)
* @param records
*/
@KafkaListener(topicPartitions = {
@TopicPartition(topic = "wangzh",partitions = {"0","1","2"},partitionOffsets = @PartitionOffset(initialOffset = "1",partition = "*"))
})
public void consumer(List<ConsumerRecord<String,String>> records) {
records.forEach(record -> {
log.info("消息key:" + record.key());
log.info("消息value:" + record.value());
log.info("消息偏移量:" + record.offset());
log.info("消息topic" + record.topic());
});
}
}