1. Kafka Core APIs
A diagram in the official documentation illustrates the types of clients that can integrate with Kafka.

Kafka exposes five categories of client APIs:
- AdminClient API: manage and inspect topics, brokers, and other Kafka objects, much like the scripts that ship with Kafka (see the sketch after this list).
- Producer API: publish messages to one or more topics; the API used by producers (publishers).
- Consumer API: subscribe to one or more topics and process the messages produced to them; the API used by consumers (subscribers).
- Streams API: efficiently transform input streams into output streams; typically used in stream-processing scenarios.
- Connector API: pull data into Kafka from source systems or applications, such as the DB in the diagram.
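As a quick taste of the AdminClient API, here is a minimal sketch that lists the topics in a cluster (the broker address matches the examples later in this article; a real application would add proper error handling):

import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;

public class AdminClientSample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.174.128:9092");
        //AdminClient is AutoCloseable, so try-with-resources releases its resources
        try (AdminClient admin = AdminClient.create(props)) {
            //listTopics() is asynchronous and returns KafkaFuture instances
            ListTopicsResult topics = admin.listTopics();
            System.out.println(topics.names().get());
        }
    }
}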
In this article, we focus on the Consumer API.
2. Consumer API
Reliability on the consuming side is relatively easy to guarantee: data is persisted in Kafka, so there is no need to worry about losing the data itself.
However, a consumer can fail mid-consumption (a crash, a power outage) and must resume from where it left off after recovery, so it needs to keep a running record of the offset it has consumed up to.
Offset management is therefore an unavoidable concern when consuming data.
2.1 Adding the dependency
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.6.0</version>
</dependency>
The classes we will use:
- KafkaConsumer: the consumer object used to consume data.
- ConsumerConfig: provides the names of the required configuration parameters.
- ConsumerRecord: every record is delivered wrapped in a ConsumerRecord object.
Parameters related to automatic offset commits:
- enable.auto.commit: whether automatic offset commits are enabled
- auto.commit.interval.ms: the interval between automatic offset commits
2.2 Automatic offset commits
//imports shared by the consumer snippets in this section
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

private static final String TOPIC_NAME = "yibo_topic";

/**
 * This pattern does appear in production code, but it is not recommended.
 */
public static void helloWorld(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //commit offsets automatically
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
    //interval between automatic commits
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    //deserializers for the key and the value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        //poll for messages, waiting up to the given timeout
        ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                    record.partition(),record.offset(),record.key(),record.value());
        }
    }
}
Although automatic offset commits are simple and convenient, they are time-based, which makes it hard for developers to control exactly when an offset is committed. Kafka therefore also provides APIs for committing offsets manually.
There are two ways to commit offsets manually: commitSync (synchronous, blocking) and commitAsync (asynchronous). Both commit the highest offset of the batch returned by the current poll. The difference is that commitSync blocks the current thread until the commit succeeds and retries automatically on failure (commits can still fail for reasons outside the client's control), whereas commitAsync has no retry mechanism and may therefore fail to commit.
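A common compromise is to use commitAsync with a callback during normal processing and fall back to one final blocking commitSync on shutdown. A minimal sketch, assuming a consumer configured as in the examples below and a running flag controlled elsewhere:

try {
    while (running) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> record : records) {
            //process the record
        }
        //non-blocking commit; the callback only logs failures, it does not retry
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("async commit failed for " + offsets + ": " + exception);
            }
        });
    }
} finally {
    try {
        //one final blocking commit (with retries) before closing
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}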
2.3 Committing offsets manually
- Although synchronous commits are more reliable, they block the current thread until the commit succeeds, which can hurt throughput considerably. In most situations, asynchronous commits are therefore preferred.
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually.
 */
public static void committedOffset(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    //auto-commit interval (no effect here because auto-commit is disabled)
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        try {
            //poll for messages, waiting up to the given timeout
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                //consume the message
                System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(),record.offset(),record.key(),record.value());
                //if business processing fails, do not commit
                //throw new RuntimeException("business processing failed");
            }
            //commit the offset manually
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
2.4 Manual commits, resetting the offset
auto.offset.reset determines where consumption starts when the group has no committed offset for a partition (for example, when the consumer joins under a brand-new group id) or when the committed offset is out of range.
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually, with the offset reset policy set explicitly.
 */
public static void resetCommittedOffset(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    //reset policy: only applies when the group has no committed offset (e.g. a new group)
    properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        try {
            //poll for messages, waiting up to the given timeout
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            for (ConsumerRecord<String, String> record : records) {
                //consume the message
                System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                        record.partition(),record.offset(),record.key(),record.value());
                //if business processing fails, do not commit
                //throw new RuntimeException("business processing failed");
            }
            //commit the offset manually
            consumer.commitAsync();
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
2.5 Manual commits, per partition
- Since synchronous commits retry on failure, they are more reliable than asynchronous ones.
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually, handling each partition on its own.
 */
public static void committedOffsetWithPartition(){
    Properties properties = new Properties();
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    //subscribe to one or more topics
    consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        try {
            //poll for messages, waiting up to the given timeout
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {
                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());
                    //if business processing fails, do not commit
                    //throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //build the offset map for this single partition; commit lastOffset + 1,
                //i.e. the next offset to read
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //commit the offset for this partition on its own
                consumer.commitSync(offset);
                System.out.println("--------------------- partition - " + partition + " end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
2.6 Manual commits, with manually assigned partitions
- Manually assign one or more specific partitions, and commit their offsets manually.
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Commit offsets manually, consuming manually assigned partitions.
 */
public static void committedOffsetWithPartition2(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    //deserializers for the key and the value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);
    //assign specific partitions of the topic (only p0 here; add p1 to consume both)
    consumer.assign(Arrays.asList(p0));
    //subscribing to whole topics would be the alternative:
    // consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        try {
            //poll for messages, waiting up to the given timeout
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {
                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());
                    //if business processing fails, do not commit
                    //throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //build the offset map for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //commit the offset for this partition on its own
                consumer.commitSync(offset);
                System.out.println("--------------------- partition - " + partition + " end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
2.7 Manually choosing the starting offset, with manual commits
private static final String TOPIC_NAME = "yibo_topic";

/**
 * Choose the starting offset manually, and commit offsets manually.
 */
public static void controlOffset(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    //deserializers for the key and the value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    //assign a specific partition of the topic
    consumer.assign(Arrays.asList(p0));
    while(true){
        try {
            /**
             * Goals:
             * 1. Control the starting offset ourselves
             * 2. If processing fails, the batch is consumed again
             *
             * A typical production pattern:
             * 1. The first run consumes from offset 0
             * 2. After consuming, say, 100 records, store offset 101 in Redis
             * 3. Before each poll, read the latest offset from Redis
             * 4. Start consuming from that position
             */
            //fixed starting offset for demonstration; note that seeking inside the loop
            //re-reads from offset 30 on every poll. In a real application the position
            //would come from external storage (see the sketch after this example)
            consumer.seek(p0,30);
            //poll for messages, waiting up to the given timeout
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {
                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());
                    //if business processing fails, do not commit
                    //throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //build the offset map for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //commit the offset for this partition on its own
                consumer.commitSync(offset);
                System.out.println("--------------------- partition - " + partition + " end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
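The Redis-based pattern described in the comments above could look like the following sketch. It assumes a local Redis instance and the Jedis client; the key layout (topic:partition) is illustrative, and consumer and p0 are configured as in the example above, with Redis acting as the source of truth for the position:

//read the next offset to consume from Redis before each poll
try (Jedis jedis = new Jedis("localhost", 6379)) {
    String key = TOPIC_NAME + ":" + p0.partition();
    while (true) {
        String stored = jedis.get(key);
        long nextOffset = stored == null ? 0L : Long.parseLong(stored);
        consumer.seek(p0, nextOffset);
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
        long lastOffset = -1;
        for (ConsumerRecord<String, String> record : records.records(p0)) {
            //business processing goes here
            lastOffset = record.offset();
        }
        if (lastOffset >= 0) {
            //store the next offset to read; if the process dies before this line,
            //the batch is simply consumed again (at-least-once semantics)
            jedis.set(key, String.valueOf(lastOffset + 1));
        }
    }
}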
2.8 Flow control (rate limiting)
Add the Guava dependency:
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>29.0-jre</version>
</dependency>
private static final String TOPIC_NAME = "yibo_topic";

/*** token generation rate, per second */
public static final int permitsPerSecond = 1;

/*** rate limiter */
private static final RateLimiter LIMITER = RateLimiter.create(permitsPerSecond);

/**
 * Flow control (rate limiting).
 */
public static void controlPause(){
    Properties properties = new Properties();
    //Kafka cluster
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
    //consumer group: consumers with the same group.id belong to the same group
    properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
    //disable automatic offset commits
    properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");
    properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
    //deserializers for the key and the value
    properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);
    TopicPartition p0 = new TopicPartition(TOPIC_NAME,0);
    TopicPartition p1 = new TopicPartition(TOPIC_NAME,1);
    //assign specific partitions of the topic
    consumer.assign(Arrays.asList(p0,p1));
    //subscribing to whole topics would be the alternative:
    // consumer.subscribe(Arrays.asList(TOPIC_NAME));
    while(true){
        try {
            /**
             * 1. Before processing a batch, try to take a token from the bucket
             * 2. If a token is available, resume the partitions (if paused) and process
             * 3. If not, pause both partitions and wait for the bucket to refill
             * Note: pause() only affects future poll() calls, so this check must run on
             * every loop iteration, even when poll() returns nothing; otherwise a paused
             * consumer would never be resumed. This limits at batch granularity.
             */
            if (!LIMITER.tryAcquire()) {
                System.out.println("no token available, pausing consumption");
                consumer.pause(Arrays.asList(p0, p1));
            } else {
                System.out.println("token acquired, resuming consumption");
                consumer.resume(Arrays.asList(p0, p1));
            }
            //poll for messages, waiting up to the given timeout
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(10000));
            //handle each partition separately
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> recordList = records.records(partition);
                for (ConsumerRecord<String, String> record : recordList) {
                    System.out.printf("partition=%d, offset=%d, key=%s, value=%s%n",
                            record.partition(),record.offset(),record.key(),record.value());
                    //if business processing fails, do not commit
                    // throw new RuntimeException("business processing failed");
                }
                long lastOffset = recordList.get(recordList.size() - 1).offset();
                //build the offset map for this single partition
                Map<TopicPartition, OffsetAndMetadata> offset = new HashMap<>();
                offset.put(partition,new OffsetAndMetadata(lastOffset+1));
                //commit the offset for this partition on its own
                consumer.commitSync(offset);
                System.out.println("--------------------- partition - " + partition + " end ---------------------");
            }
        } catch (Exception e) {
            log.error("consumer offset error",e);
        }
    }
}
2.9 Multi-threaded consumers
- The classic pattern: each thread creates its own KafkaConsumer, since KafkaConsumer itself is not thread-safe.
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class ConsumerThreadSample {
    private static final String TOPIC_NAME = "yibo_topic";

    /**
     * The classic pattern: each thread creates its own KafkaConsumer to stay thread-safe.
     */
    public static void main(String[] args) throws InterruptedException {
        KafkaConsumerRunner r1 = new KafkaConsumerRunner();
        Thread t1 = new Thread(r1);
        t1.start();
        Thread.sleep(15000);
        r1.shutdown();
    }

    public static class KafkaConsumerRunner implements Runnable{
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer<String,String> consumer;

        public KafkaConsumerRunner() {
            Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.174.128:9092");
            props.put("group.id", "test");
            props.put("enable.auto.commit", "false");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            TopicPartition p0 = new TopicPartition(TOPIC_NAME, 0);
            TopicPartition p1 = new TopicPartition(TOPIC_NAME, 1);
            consumer.assign(Arrays.asList(p0, p1));
        }

        @Override
        public void run() {
            try {
                while(!closed.get()){
                    //poll for messages
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                    for (TopicPartition partition : records.partitions()) {
                        List<ConsumerRecord<String, String>> pRecord = records.records(partition);
                        //process the messages of one partition
                        for (ConsumerRecord<String, String> record : pRecord) {
                            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.partition(),record.offset(), record.key(), record.value());
                        }
                        //report the new offset back to Kafka; note the + 1 (next offset to read)
                        long lastOffset = pRecord.get(pRecord.size() - 1).offset();
                        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                    }
                }
            } catch (Exception e) {
                //wakeup() makes poll() throw a WakeupException; ignore it when shutting down
                if(!closed.get()) {
                    throw e;
                }
            } finally {
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    }
}
2.10 One consumer, with multi-threaded message processing
- Records are handed to a thread pool for asynchronous processing. With this approach offsets cannot be committed manually per record (auto-commit may commit records whose processing later fails), so end-to-end message consistency cannot be guaranteed.
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class ConsumerRecordThreadSample {
    private static final String TOPIC_NAME = "yibo_topic";

    public static void main(String[] args) throws InterruptedException {
        String brokerList = "192.168.174.128:9092";
        String groupId = "test";
        int workerNum = 5;
        ConsumerExecutor consumers = new ConsumerExecutor(brokerList, groupId, TOPIC_NAME);
        //execute() blocks in its poll loop, so run it on its own thread
        new Thread(() -> consumers.execute(workerNum)).start();
        Thread.sleep(1000000);
        consumers.shutdown();
    }

    //polls records and hands them to a worker pool
    public static class ConsumerExecutor {
        private final KafkaConsumer<String, String> consumer;
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private ExecutorService executors;

        public ConsumerExecutor(String brokerList, String groupId, String topic) {
            Properties props = new Properties();
            props.put("bootstrap.servers", brokerList);
            props.put("group.id", groupId);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
        }

        public void execute(int workerNum) {
            executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());
            try {
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(200));
                    for (final ConsumerRecord<String, String> record : records) {
                        executors.submit(new ConsumerRecordWorker(record));
                    }
                }
            } catch (WakeupException e) {
                //expected when shutdown() wakes up the blocking poll()
                if (!closed.get()) {
                    throw e;
                }
            } finally {
                //KafkaConsumer is not thread-safe, so close it on the polling thread
                consumer.close();
            }
        }

        public void shutdown() {
            closed.set(true);
            //interrupt the blocking poll() so the polling thread can exit and close the consumer
            consumer.wakeup();
            if (executors != null) {
                executors.shutdown();
                try {
                    if (!executors.awaitTermination(10, TimeUnit.SECONDS)) {
                        System.out.println("Timeout.... Ignore for this case");
                    }
                } catch (InterruptedException ignored) {
                    System.out.println("Other thread interrupted this shutdown, ignore for this case.");
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    //processes a single record
    public static class ConsumerRecordWorker implements Runnable {
        private final ConsumerRecord<String, String> record;

        public ConsumerRecordWorker(ConsumerRecord<String, String> record) {
            this.record = record;
        }

        @Override
        public void run() {
            //business logic goes here, e.g. writing the record to a database
            System.out.println("Thread - " + Thread.currentThread().getName());
            System.out.printf("partition = %d, offset = %d, key = %s, value = %s%n",
                    record.partition(), record.offset(), record.key(), record.value());
        }
    }
}
2.11 Missed and duplicate consumption
Whether offsets are committed synchronously or asynchronously, both missed and duplicate consumption are possible. Committing the offset before processing the data can lead to missed messages; processing the data before committing the offset can lead to messages being consumed more than once.
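The difference is just the order of two calls. A minimal sketch of both variants (processRecord stands in for the business logic):

//commit first, then process: a crash after the commit loses the batch (at-most-once)
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
consumer.commitSync();
for (ConsumerRecord<String, String> record : records) {
    processRecord(record);
}

//process first, then commit: a crash before the commit replays the batch (at-least-once)
ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : batch) {
    processRecord(record);
}
consumer.commitSync();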
2.12 Custom offset storage
Before Kafka 0.9, offsets were stored in ZooKeeper; from 0.9 onward they are stored by default in an internal Kafka topic (__consumer_offsets). Beyond these options, Kafka also lets you store offsets yourself.
Maintaining offsets yourself is fairly involved, because you have to account for consumer rebalances.
When a new consumer joins the group, an existing consumer leaves the group, or the partitions of the subscribed topics change, the partitions are reassigned across the group; this reassignment process is called a rebalance.
After a rebalance, each consumer may own a different set of partitions, so a consumer must first find out which partitions it has been assigned and seek each of them to the most recently committed offset before continuing to consume.
Custom offset storage is implemented with a ConsumerRebalanceListener. In the example below, the methods that commit and fetch offsets must be implemented against whatever storage system you choose.
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

public class CustomConsumer {
    private static Map<TopicPartition, Long> currentOffset = new HashMap<>();

    public static void main(String[] args) {
        //create the configuration
        Properties props = new Properties();
        //Kafka cluster
        props.put("bootstrap.servers","192.168.174.128:9092");
        //consumer group: consumers with the same group.id belong to the same group
        props.put("group.id", "test");
        //disable automatic offset commits
        props.put("enable.auto.commit", "false");
        //deserializers for the key and the value
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //create a consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //subscribe to the topic
        consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
            //called before a rebalance
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                commitOffset(currentOffset);
            }

            //called after a rebalance
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                currentOffset.clear();
                for (TopicPartition partition : partitions) {
                    //seek to the most recently committed offset and continue from there
                    consumer.seek(partition, getOffset(partition));
                }
            }
        });
        while (true) {
            //poll for data
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                currentOffset.put(new TopicPartition(record.topic(), record.partition()), record.offset());
            }
            //commit to the custom store
            commitOffset(currentOffset);
        }
    }

    //fetch the latest committed offset of a partition from the custom store
    private static long getOffset(TopicPartition partition) {
        return 0;
    }

    //commit the offsets of all partitions this consumer owns to the custom store
    private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    }
}
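For concreteness, the two stubs could be backed by a relational table. The sketch below assumes a MySQL table consumer_offsets(group_id, topic, `partition`, `offset`) and java.sql imports; the JDBC URL, credentials, and table layout are all illustrative:

private static final String JDBC_URL = "jdbc:mysql://localhost:3306/kafka_demo";
private static final String DB_USER = "root";
private static final String DB_PASSWORD = "password";

//fetch the stored offset, defaulting to 0 for a partition we have never seen
private static long getOffset(TopicPartition partition) {
    String sql = "SELECT `offset` FROM consumer_offsets WHERE group_id=? AND topic=? AND `partition`=?";
    try (Connection conn = DriverManager.getConnection(JDBC_URL, DB_USER, DB_PASSWORD);
         PreparedStatement ps = conn.prepareStatement(sql)) {
        ps.setString(1, "test");
        ps.setString(2, partition.topic());
        ps.setInt(3, partition.partition());
        try (ResultSet rs = ps.executeQuery()) {
            return rs.next() ? rs.getLong(1) : 0L;
        }
    } catch (SQLException e) {
        throw new RuntimeException("failed to fetch offset", e);
    }
}

//upsert the next offset to read for every partition this consumer owns
private static void commitOffset(Map<TopicPartition, Long> currentOffset) {
    String sql = "REPLACE INTO consumer_offsets (group_id, topic, `partition`, `offset`) VALUES (?, ?, ?, ?)";
    try (Connection conn = DriverManager.getConnection(JDBC_URL, DB_USER, DB_PASSWORD);
         PreparedStatement ps = conn.prepareStatement(sql)) {
        for (Map.Entry<TopicPartition, Long> entry : currentOffset.entrySet()) {
            ps.setString(1, "test");
            ps.setString(2, entry.getKey().topic());
            ps.setInt(3, entry.getKey().partition());
            //store the next offset to read, mirroring Kafka's own convention
            ps.setLong(4, entry.getValue() + 1);
            ps.addBatch();
        }
        ps.executeBatch();
    } catch (SQLException e) {
        throw new RuntimeException("failed to commit offsets", e);
    }
}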
3. Integrating Kafka with Spring Boot
3.1 Adding the Maven dependency
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.6.1</version>
</dependency>
3.2 Configuring application.properties
# Kafka server addresses; for a cluster, separate multiple addresses with commas
spring.kafka.bootstrap-servers=192.168.174.128:9092
#=============== producer =======================
# Number of retries on a failed send. When a leader fails, a replica takes over as the
# new leader, and sends can fail during the transition. With retries=0 the producer never
# resends (no duplicates, but messages can be lost); with retries>0 the send is repeated
# once the new leader is in place, so the message is not lost.
spring.kafka.producer.retries=0
# Batch size in bytes; the producer accumulates records and sends them in one batch
spring.kafka.producer.batch-size=16384
# Total memory the producer may use to buffer records waiting to be sent
spring.kafka.producer.buffer-memory=33554432
# acks: the number of acknowledgments the producer requires before considering a request
# complete; controls the durability of sent records. Possible values:
# acks=0   the producer does not wait for any acknowledgment from the server; the record is
#          added to the socket buffer and considered sent. There is no guarantee the server
#          received it, retries do not take effect (the client generally will not learn of
#          failures), and the offset returned for each record is always -1.
# acks=1   the leader writes the record to its local log and responds without waiting for
#          all replicas; if the leader fails right after acknowledging but before the
#          replicas have copied the record, the record is lost.
# acks=all the leader waits for the full set of in-sync replicas to acknowledge; the record
#          is not lost as long as at least one in-sync replica stays alive. This is the
#          strongest guarantee and is equivalent to acks=-1.
# Allowed values: all, -1, 0, 1
spring.kafka.producer.acks=1
# Serializers for the message key and the message body
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
#=============== consumer =======================
# Default consumer group id; within a group, each message is consumed by only one consumer.
# group.id sets the group name.
spring.kafka.consumer.group-id=testGroup
# Where to start when the group has no committed offset: earliest reads from the beginning
# of the partition, latest from the current end. (The old consumer called these values
# smallest and largest.) earliest is the usual choice.
spring.kafka.consumer.auto-offset-reset=earliest
# enable.auto.commit: whether offsets are committed automatically
spring.kafka.consumer.enable-auto-commit=false
# If enable.auto.commit is true, the frequency in milliseconds at which offsets are
# auto-committed (default 5000)
spring.kafka.consumer.auto-commit-interval=100
# Deserializers for the message key and the message body
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
#=============== listener =======================
# Number of threads running in the listener containers
spring.kafka.listener.concurrency=5
# The listener acks manually; each acknowledge() call commits immediately
spring.kafka.listener.ack-mode=manual_immediate
spring.kafka.listener.missing-topics-fatal=false
3.3 Creating a consumer
@Component
@Slf4j
public class KafkaDemoConsumer {
    private static final String TOPIC_NAME = "yibo_topic";
    private static final String TOPIC_GROUP1 = "topic_group1";
    private static final String TOPIC_GROUP2 = "topic_group2";

    @KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP1)
    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test consumed: Topic:" + topic + ",Message:" + msg);
            //manual ack; commits immediately under ack-mode=manual_immediate
            ack.acknowledge();
        }
    }

    @KafkaListener(topics = TOPIC_NAME, groupId = TOPIC_GROUP2)
    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        Optional<?> message = Optional.ofNullable(record.value());
        if (message.isPresent()) {
            Object msg = message.get();
            log.info("topic_test1 consumed: Topic:" + topic + ",Message:" + msg);
            ack.acknowledge();
        }
    }
}
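To exercise the two listeners, something has to produce to yibo_topic. Here is a minimal producer sketch using Spring's KafkaTemplate (the REST endpoint is illustrative, not part of the original setup); because the listeners use different group ids, each message sent here is consumed by both:

@RestController
public class KafkaDemoProducer {
    private static final String TOPIC_NAME = "yibo_topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    //GET /send/hello publishes "hello" to yibo_topic
    @GetMapping("/send/{msg}")
    public String send(@PathVariable String msg) {
        kafkaTemplate.send(TOPIC_NAME, msg);
        return "sent: " + msg;
    }
}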