軟件篇-kafka(over)-研究-深入研究生產(chǎn)消費(fèi)模型

1.生產(chǎn)者

1.1 架構(gòu)

當(dāng)需要往broker發(fā)送消息時(shí),則需要?jiǎng)?chuàng)建一個(gè)或者多個(gè)生產(chǎn)者往broker發(fā)布消息,雖然借助SpringBootbroker里面發(fā)送消息的API比較簡(jiǎn)單

如果借助SprinBoot發(fā)送消息,后面章節(jié)會(huì)闡述到

但是很多時(shí)候不同的業(yè)務(wù)場(chǎng)景會(huì)出現(xiàn)不同的問(wèn)題,例如:

  • 允不允許消息重復(fù)
  • 允不允許消息延遲
  • 允不允許消息的丟失

不同的場(chǎng)景下,光是知道API的使用肯定是滿(mǎn)足不了的,因此在使用API之前還需了解發(fā)送消息的原理。

發(fā)送消息原理圖如下:

image-20210513100249431

關(guān)于上述原理圖解釋如下:

  • 生產(chǎn)者源源不斷生產(chǎn)消息,一般消息為key-value形式,當(dāng)然也可以不指定key

  • 消息需要不能直接發(fā)送給broker,而是需要經(jīng)過(guò)序列化成為一段字節(jié)序列才可以傳輸

  • 序列完成需要經(jīng)過(guò)分區(qū)器,分區(qū)器會(huì)根據(jù)分區(qū)分配策略去決定這個(gè)消息發(fā)往哪個(gè)分區(qū)

    分區(qū)策略分為兩種情況,消息有key值和消息沒(méi)有key值

    • 消息有key值時(shí),會(huì)根據(jù)key值得hash值然后對(duì)分區(qū)數(shù)進(jìn)行取模決定消息發(fā)送給哪個(gè)partition
    • 消息沒(méi)有key值,會(huì)隨機(jī)發(fā)送給某個(gè)分區(qū)(不同的版本,策略不一樣,有的時(shí)輪詢(xún),有的隨機(jī),有的則是一段時(shí)間內(nèi)只發(fā)送給某個(gè)分區(qū),隔了一段時(shí)間發(fā)送給另外一個(gè)分區(qū))
  • 分區(qū)數(shù)確定以后真正發(fā)送具體的broker上,brokerleader會(huì)把消息寫(xiě)入文件中

    寫(xiě)入成功則發(fā)送元數(shù)據(jù)給生產(chǎn)者,如果失敗則根據(jù)配置是否重試機(jī)制進(jìn)行重試

1.2 topic

當(dāng)消息發(fā)送到topic時(shí),其實(shí)消息是發(fā)送到topicpartition上,而在物理上一個(gè)partition就是對(duì)應(yīng)的就是一個(gè)目錄

例如:在kafka-eagle上創(chuàng)建wangzhtopic,且分區(qū)數(shù)為3,副本數(shù)為 1,如下:

image-20210513102003513

查看該topic詳情可知,三個(gè)分區(qū)其中131上的partition-0leader,其他的如下:

image-20210513102046056

同時(shí)取查看131機(jī)器上的數(shù)據(jù)/var/data/kafka(這個(gè)目錄是當(dāng)時(shí)安裝時(shí)指定的數(shù)據(jù)存儲(chǔ)目錄)

image-20210513102509413

所以一個(gè)分區(qū)在物理上對(duì)應(yīng)的就是一個(gè)目錄

1.3 存儲(chǔ)

當(dāng)發(fā)送消息時(shí)到topicpartitions上,分區(qū)會(huì)消息寫(xiě)入segment文件上,一個(gè)partitions由多個(gè)segment文件組成,如下:

image-20210513110017402

每個(gè)segment文件默認(rèn)存儲(chǔ)數(shù)據(jù)大小為1G,當(dāng)然也可以通過(guò)修改kafka參數(shù)調(diào)整

# 單個(gè)segment存儲(chǔ)數(shù)據(jù)大小
log.segment.bytes=具體內(nèi)容

# 當(dāng)超過(guò)一定的時(shí)間(默認(rèn)七天),寫(xiě)入segment文件的數(shù)據(jù)還沒(méi)有達(dá)到1G(默認(rèn)大小)
# 也會(huì)重新創(chuàng)建新的segment文件
log.segment.ms=時(shí)間

從上圖中看出,第一個(gè)segment文件的偏移量一定是從0開(kāi)始的,而下一個(gè)segment文件則是從上個(gè)segment文件偏移量開(kāi)始的


同時(shí)segment文件分為.index.log文件,如下:

image-20210513110632406

其中.log用來(lái)存儲(chǔ)真正的數(shù)據(jù),.index是索引文件

假如如果想要消費(fèi)偏移量為197的文件,如果沒(méi)有索引則需要從頭到位去尋找,而有了索引文件就完全可以提高查詢(xún)速度

其中前面一大串代表文件名,第一個(gè)segment文件肯定是從0開(kāi)始,第二個(gè)segment文件命名則是以上個(gè)文件偏移量+1命名,如下:

第一個(gè)segment文件命名

0000000000000000.index

0000000000000000.log

當(dāng)上一個(gè)文件偏移量為1679898是,那么下個(gè)segment文件命名為

00000000001679899.index

00000000001679899.log

以此類(lèi)推

1.4 發(fā)送

經(jīng)過(guò)上面的消息,已經(jīng)知道生產(chǎn)者發(fā)送原理,接下來(lái)就借助SpringBootbroker發(fā)送消息。如下:

1.4.1 創(chuàng)建

先創(chuàng)建springboot項(xiàng)目kafka-springboot-test,并且導(dǎo)入kafka依賴(lài),其pom.xml內(nèi)容如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

application.yml增加以下配置

spring:
  kafka:
   # kafka集群地址
    bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
    listener:
    # 如果沒(méi)有至少一個(gè)配置的主題,則容器是否應(yīng)無(wú)法啟動(dòng)
    # false 代表關(guān)閉此功能
      missing-topics-fatal: false
    producer:
    # 發(fā)布消息時(shí),key的序列化器,這里是kafka提供的序列化器
    # 當(dāng)發(fā)送消息的key值不是字符串時(shí),需要自己寫(xiě)自定義序列化器
    # 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
    # 后面會(huì)講述如何自定義序列化器
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
    
    # 發(fā)布消息時(shí),value的序列化器,這里是kafka提供的序列化器
    # 當(dāng)發(fā)送消息的key不是字符串時(shí),需要自己寫(xiě)自定義序列化器
    # 一般來(lái)說(shuō)發(fā)布消息大多數(shù)都不是字符串,因此還是需要發(fā)送消息
    # 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

1.4.2 發(fā)布

當(dāng)配置完成即可發(fā)布消息,發(fā)消息先創(chuàng)建topic,上文中已經(jīng)創(chuàng)建了test_topic這里就不再創(chuàng)建了

發(fā)布消息則是借助org.springframework.kafka.core.KafkaTemplate發(fā)布消息,直接注入即可,代碼如下:

package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest
@Slf4j
class DemoApplicationTests {

    /**
     * 第一個(gè)泛型為 key值的數(shù)據(jù)類(lèi)型
     * 第二個(gè)泛型為 value值的數(shù)據(jù)類(lèi)型
     */
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate;

       @Test
    void contextLoads() throws Exception {
        ListenableFuture<SendResult<String, String>> resultListenableFuture =
                kafkaTemplate.send("wangzh", "test-key", "test-topic");
        
        log.info("元數(shù)據(jù)信息:" + resultListenableFuture.get());

        log.info("發(fā)送消息完畢");

       // 關(guān)閉連接
       kafkaTemplate.destroy();
    }

}

執(zhí)行代碼成功后,就在kafka-eagle看到消息發(fā)送結(jié)果,如下

image-20210513113951216

當(dāng)然也可以指定partition發(fā)送消息

1.4.3 acks

acks 參數(shù) 規(guī)則定了必須須要有多少分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫(xiě)入是成功的 這個(gè)參數(shù)對(duì)消息丟失 可能性有重要影響,目前該參數(shù)配置如下:

acks = 0

生產(chǎn)者在成功寫(xiě)入消息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)

意味著生產(chǎn)者不知道消息有沒(méi)有把消息發(fā)送到broker,只要生產(chǎn)者將消息添加到Socket緩沖區(qū)就認(rèn)為消息發(fā)送成功,不需要等待服務(wù)器 的響應(yīng)。因此這種方式也可以支持很高的吞吐量

acks=1

只要集群的leader節(jié)點(diǎn)收到消息并寫(xiě)入到segment文件,生產(chǎn)者就會(huì)收到來(lái)自服務(wù)器的成功響應(yīng),視為發(fā)送成功

假如leader數(shù)據(jù)寫(xiě)入成功,然后宕機(jī),此時(shí)所有的副本還沒(méi)來(lái)的及同步數(shù)據(jù),那么

剛寫(xiě)入的數(shù)據(jù)就會(huì)丟失

acks=all

集群的leader收到消息并寫(xiě)入到segment中,同時(shí)等待所有的副本同步消息成功后才認(rèn)為消息發(fā)送成功

這種模式是最安全的,及時(shí)有的leader發(fā)生奔潰,那還是可以重新選舉leader進(jìn)行通信

在配置文件的producer里面設(shè)置acks即可

image-20210513133303905

2.消費(fèi)者

2.1 架構(gòu)

消費(fèi)者如果訂閱了某個(gè)主題消息,那么就可以去進(jìn)行消費(fèi),同時(shí)一個(gè)消費(fèi)者屬于一個(gè)消費(fèi)組,一個(gè)消費(fèi)組里面所有的消費(fèi)者都訂閱同一個(gè)主題.如下:

image-20210513141948856

當(dāng)消費(fèi)組里面只有一個(gè)消費(fèi)者時(shí),那么這個(gè)消費(fèi)就回去消費(fèi)所有分區(qū)的消息,當(dāng)然一般開(kāi)發(fā)也就足夠了。

但有時(shí)候生產(chǎn)者生產(chǎn)消息過(guò)快,而消費(fèi)者消費(fèi)消息過(guò)慢,就會(huì)很容易導(dǎo)致消息堆積,從而阻塞,那么就可以在消費(fèi)者組里面多增加幾個(gè)消費(fèi)者,如下:

image-20210513143001268

注意:同一組的消費(fèi)者是不會(huì)消費(fèi)同一主題的同一分區(qū)消息

當(dāng)然如果消費(fèi)者的數(shù)量超過(guò)了分區(qū)數(shù),那么超過(guò)的消費(fèi)者就會(huì)處于空閑狀態(tài)

image-20210513143347884

因此不要讓消費(fèi)者的數(shù)量超過(guò)分區(qū)數(shù)

一個(gè)消息只能被一個(gè)組消費(fèi)一次,例如上圖中consumer-1消費(fèi)了消息A,那么其他的消費(fèi)者就不能夠再次消費(fèi)A

如果在消費(fèi)時(shí),手動(dòng)指定了偏移量,那么就會(huì)重復(fù)消費(fèi)消息,這種情況特殊

當(dāng)然同一個(gè)消息可以被多個(gè)消費(fèi)組進(jìn)行消費(fèi),如下圖所示:

image-20210513145040997

2.2 分配

如下圖,消費(fèi)組中可以增加,當(dāng)增加一個(gè)消費(fèi)者,就會(huì)分?jǐn)傊跋M(fèi)者的消費(fèi)壓力,那么當(dāng)新增一個(gè)消費(fèi)者是如何將分區(qū)分配給消費(fèi)者的呢

image-20210513143001268

當(dāng)消費(fèi)者新增一個(gè)消費(fèi)者時(shí),會(huì)提高消費(fèi)者的高可用和伸縮性,且當(dāng)加入到消費(fèi)組之后就會(huì)

給新增的消費(fèi)者分配一個(gè)partition,這種操作稱(chēng)為再分配

注意:在再分配期間,消費(fèi)者會(huì)暫停消費(fèi)消息,直到分配分區(qū)完成才會(huì)繼續(xù)消費(fèi)消息

且當(dāng)分區(qū)分配給再次分配給某個(gè)消費(fèi)者時(shí),消費(fèi)者的消息可能丟失讀取狀態(tài)

同理當(dāng)consumer-2消費(fèi)者退出消費(fèi)者組時(shí),那么partition-2就會(huì)分配到consumer-1,讓他去進(jìn)行消費(fèi)

那么kafka是如何知道消費(fèi)組里面需要再分配呢?這主要是借助于組協(xié)調(diào)器,每個(gè)消費(fèi)組都會(huì)由屬于自己的組協(xié)調(diào)器。

每隔消費(fèi)者都會(huì)發(fā)送心跳到協(xié)調(diào)器,用來(lái)維護(hù)群組關(guān)系和分區(qū)關(guān)系,如下圖所示:

image-20210513151828938

這樣kafka就知道了每個(gè)消費(fèi)者屬于哪個(gè)消費(fèi)組,以及如何去分配partition

協(xié)調(diào)器就類(lèi)似于spring cloud里面的注冊(cè)中心

當(dāng)消費(fèi)者因?yàn)槟承┮蛩赝蝗煌V瓜M(fèi),也就是說(shuō)協(xié)調(diào)器收不到消費(fèi)者的心跳,那么協(xié)調(diào)器會(huì)等待幾秒,幾秒期間還是沒(méi)有收到心跳,那么協(xié)調(diào)器就會(huì)把該消費(fèi)者剔除出組,然后實(shí)現(xiàn)再分配。

2.3 消費(fèi)

這里同樣借助SpringBoot去消費(fèi)消息,消費(fèi)者配置如下:

spring:
  kafka:
    # kafka集群地址
    bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
    listener:
      # 如果沒(méi)有至少一個(gè)配置的主題,則容器是否應(yīng)無(wú)法啟動(dòng)
      # false 代表關(guān)閉此功能
      missing-topics-fatal: false
      producer:
        # 發(fā)布消息時(shí),key的序列化器,這里是kafka提供的序列化器
        # 當(dāng)發(fā)送消息的key值不是字符串時(shí),需要自己寫(xiě)自定義序列化器
        # 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
        # 后面會(huì)講述如何自定義序列化器
        key-serializer: org.apache.kafka.common.serialization.StringSerializer

        # 發(fā)布消息時(shí),value的序列化器,這里是kafka提供的序列化器
        # 當(dāng)發(fā)送消息的key不是字符串時(shí),需要自己寫(xiě)自定義序列化器
        # 一般來(lái)說(shuō)發(fā)布消息大多數(shù)都不是字符串,因此還是需要發(fā)送消息
        # 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      # 消費(fèi)者組id
      group-id: wangzh-group

      # 是否允許自動(dòng)提交offset
      # 每當(dāng)消費(fèi)者消費(fèi)一個(gè)消息就會(huì)產(chǎn)生一個(gè)偏移量
      # 偏移量是消費(fèi)者提交到kafka中,保存在`__consumer_offsets` topic中
      enable-auto-commit: true

      # 提交偏移量間隔時(shí)間數(shù) 100ms提交一次
      auto-commit-interval: 100

      # 消費(fèi)消息時(shí)的反序列器
      # 消費(fèi)消息時(shí)會(huì)將字節(jié)序列反序列化為字符串
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 消費(fèi)消息時(shí)的反序列化器
      # 消費(fèi)消息時(shí)會(huì)將字節(jié)序反序列化為字符串
      # 如果消息不是字符串時(shí),需要自己寫(xiě)反序列話(huà)器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 偏移量配置
      # latest 當(dāng)各個(gè)分區(qū)有已提交的偏移量是,就從提交的偏移量后開(kāi)始消費(fèi),如果沒(méi)有則消費(fèi)該分區(qū)最新產(chǎn)生的數(shù)據(jù)
      # none 各個(gè)分區(qū)都提交了偏移量后,才從偏移量后開(kāi)始消費(fèi),只要存在一個(gè)分區(qū)沒(méi)有提交偏移
      # 量那么拋出異常
      # earlist 當(dāng)各個(gè)分區(qū)有已提交的偏移量時(shí),則從提交的偏移量開(kāi)始消費(fèi),如果沒(méi)有偏移量則
      # 從頭開(kāi)始消費(fèi)
      auto-offset-reset: latest

消費(fèi)消,利用org.springframework.kafka.annotation.KafkaListener注解即可消費(fèi)消息,如下:

package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class Consumer {

    /**
     * topics 為 topic名字,可以填寫(xiě)多個(gè)topic的名字
     * ConsumerRecord 為 消息記錄,包含了一條消息大部分?jǐn)?shù)據(jù)
     */
    @KafkaListener(topics = {"wangzh"})
    public void consumer(ConsumerRecord<String,String> record) {
      log.info("消息key:" + record.key());
      log.info("消息value:" + record.value());
      log.info("消息偏移量:" + record.offset());
      log.info("消息topic" + record.topic());
    }
}

啟動(dòng)項(xiàng)目即可看到消費(fèi)的消息,如下:

image-20210513154435765

2.4 批量

上次消費(fèi)消息時(shí)一條一條消費(fèi),也就是當(dāng)一條消息消費(fèi)完成,才會(huì)去消費(fèi)下一條,這肯定不大合理,因此在數(shù)據(jù)量大的情況下需要去進(jìn)行批量消費(fèi)

批量消費(fèi)設(shè)置如下:

spring:
  kafka:
    # kafka集群地址
    bootstrap-servers: 192.168.80.130:9092,192.168.80.131:9092,192.168.80.132:9092
    listener:
      # 如果沒(méi)有至少一個(gè)配置的主題,則容器是否應(yīng)無(wú)法啟動(dòng)
      # false 代表關(guān)閉此功能
      missing-topics-fatal: false
      producer:
        # 發(fā)布消息時(shí),key的序列化器,這里是kafka提供的序列化器
        # 當(dāng)發(fā)送消息的key值不是字符串時(shí),需要自己寫(xiě)自定義序列化器
        # 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
        # 后面會(huì)講述如何自定義序列化器
        key-serializer: org.apache.kafka.common.serialization.StringSerializer

        # 發(fā)布消息時(shí),value的序列化器,這里是kafka提供的序列化器
        # 當(dāng)發(fā)送消息的key不是字符串時(shí),需要自己寫(xiě)自定義序列化器
        # 一般來(lái)說(shuō)發(fā)布消息大多數(shù)都不是字符串,因此還是需要發(fā)送消息
        # 生產(chǎn)者通過(guò)該序列化器將消息的key值序列化為字節(jié)數(shù)組
        value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 設(shè)置為批量消費(fèi),默認(rèn)為單條消費(fèi)
      type: batch
    consumer:
      # 消費(fèi)者組id
      group-id: wangzh-group

      # 是否允許自動(dòng)提交offset
      # 每當(dāng)消費(fèi)者消費(fèi)一個(gè)消息就會(huì)產(chǎn)生一個(gè)偏移量
      # 偏移量是消費(fèi)者提交到kafka中,保存在`__consumer_offsets` topic中
      enable-auto-commit: true

      # 提交偏移量間隔時(shí)間數(shù) 100ms提交一次
      auto-commit-interval: 100

      # 消費(fèi)消息時(shí)的反序列器
      # 消費(fèi)消息時(shí)會(huì)將字節(jié)序列反序列化為字符串
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 消費(fèi)消息時(shí)的反序列化器
      # 消費(fèi)消息時(shí)會(huì)將字節(jié)序反序列化為字符串
      # 如果消息不是字符串時(shí),需要自己寫(xiě)反序列話(huà)器
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

      # 偏移量配置
      # latest 當(dāng)各個(gè)分區(qū)有已提交的偏移量是,就從提交的偏移量后開(kāi)始消費(fèi),如果沒(méi)有則消費(fèi)該分區(qū)最新產(chǎn)生的數(shù)據(jù)
      # none 各個(gè)分區(qū)都提交了偏移量后,才從偏移量后開(kāi)始消費(fèi),只要存在一個(gè)分區(qū)沒(méi)有提交偏移
      # 量那么拋出異常
      # earlist 當(dāng)各個(gè)分區(qū)有已提交的偏移量時(shí),則從提交的偏移量開(kāi)始消費(fèi),如果沒(méi)有偏移量則
      # 從頭開(kāi)始消費(fèi)
      auto-offset-reset: latest

      # 批量消費(fèi)時(shí),最多一次消費(fèi)多少條數(shù)據(jù)
      max-poll-records: 1000

image-20210513155124595
image-20210513155139439

同時(shí)還需要修改接受消息的參數(shù),修改如下:

@KafkaListener(topics = {"wangzh"})
public void consumer(List<ConsumerRecord<String,String>> records) {
    records.forEach(record -> {
        log.info("消息key:" + record.key());
        log.info("消息value:" + record.value());
        log.info("消息偏移量:" + record.offset());
        log.info("消息topic" + record.topic());
    });
}
image-20210513155347649

2.5 指定

通過(guò)之前的學(xué)習(xí)知道,消費(fèi)者每消費(fèi)一條消息就會(huì)提交一次偏移量,下次消費(fèi)時(shí)從偏移量后面開(kāi)始消費(fèi),這樣保證消息不會(huì)重復(fù)消費(fèi)。

有時(shí)候有一種特殊情況,需要指定偏移量去進(jìn)行消費(fèi),那么之前普通消費(fèi)并不能滿(mǎn)足,因此需要自定義操作


package com.example.demo;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;

@Component
@Slf4j
public class Consumer {

    /**
     * 每次消費(fèi) {"0","1","2"} 消息偏移量從1開(kāi)始消費(fèi)
     * @param records
     */
    @KafkaListener(topicPartitions = {
            @TopicPartition(topic = "wangzh",partitions = {"0","1","2"},partitionOffsets = @PartitionOffset(initialOffset =  "1",partition = "*"))
    })
    public void consumer(List<ConsumerRecord<String,String>> records) {
        records.forEach(record -> {
            log.info("消息key:" + record.key());
            log.info("消息value:" + record.value());
            log.info("消息偏移量:" + record.offset());
            log.info("消息topic" + record.topic());
        });
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容