kafka編程應用

Kafka中的分區(qū)器、序列化器、攔截器是否了解?它們之間的處理順序是什么?

  • 序列化器:生產(chǎn)者需要用序列化器(Serializer)把對象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡發(fā)送給 Kafka。而在對側(cè),消費者需要用反序列化器(Deserializer)把從 Kafka 中收到的字節(jié)數(shù)組轉(zhuǎn)換成相應的對象。
  • 分區(qū)器:分區(qū)器的作用就是為消息分配分區(qū)。如果消息 ProducerRecord 中沒有指定 partition 字段,那么就需要依賴分區(qū)器,根據(jù) key 這個字段來計算 partition 的值。
  • Kafka 一共有兩種攔截器:生產(chǎn)者攔截器和消費者攔截器。
    1、生產(chǎn)者攔截器既可以用來在消息發(fā)送前做一些準備工作,比如按照某個規(guī)則過濾不符合要求的消息、修改消息的內(nèi)容等,也可以用來在發(fā)送回調(diào)邏輯前做一些定制化的需求,比如統(tǒng)計類工作。
    消費者攔截器主要在消費到消息或在提交消費位移時進行一些定制化的操作。
    2、消息在通過 send() 方法發(fā)往 broker 的過程中,有可能需要經(jīng)過攔截器(Interceptor)、序列化器(Serializer)和分區(qū)器(Partitioner)的一系列作用之后才能被真正地發(fā)往 broker。攔截器(下一章會詳細介紹)一般不是必需的,而序列化器是必需的。消息經(jīng)過序列化之后就需要確定它發(fā)往的分區(qū),如果消息 ProducerRecord 中指定了 partition 字段,那么就不需要分區(qū)器的作用,因為 partition 代表的就是所要發(fā)往的分區(qū)號。

處理順序 :攔截器->序列化器->分區(qū)器

KafkaProducer 在將消息序列化和計算分區(qū)之前會調(diào)用生產(chǎn)者攔截器的 onSend() 方法來對消息進行相應的定制化操作。
然后生產(chǎn)者需要用序列化器(Serializer)把對象轉(zhuǎn)換成字節(jié)數(shù)組才能通過網(wǎng)絡發(fā)送給 Kafka。
最后可能會被發(fā)往分區(qū)器為消息分配分區(qū)。

kafka 遠程連接

首先,我們需要下載與服務端相同的kafka版本地址
https://downloads.apache.org/kafka/2.4.0/kafka_2.12-2.4.0.tgz
服務端需要監(jiān)聽外網(wǎng)ip,需要暴露的外網(wǎng)

advertised.listeners=PLAINTEXT://xxx:9092

執(zhí)行命令

D:\Learn Study\springBoot\springBoot\simple\kafka>cd D:\opt\kafka_2.12-2.4.0\bin\windows

D:\opt\kafka_2.12-2.4.0\bin\windows>kafka-topics.bat --create --zookeeper 47.105.194.139:2181 --replication-factor 1 --partitions 1 --topic test5
Picked up _JAVA_OPTIONS: -Djava.net.preferIPv4Stack=true
Created topic test5.

說明已經(jīng)成功連接了。
我們可以基于程序編程了。

1、建立項目,引入pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.iva.study</groupId>
    <artifactId>spring-boot</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-boot-kafka</name>
    <description>spring-boot-kafka</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!--核心依賴-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <!--核心依賴End-->

        <!--輔助:可以修改不重啟-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <!--可以修改不重啟End-->

        <!--測試-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!--測試End-->

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2、添加配置
spring.kafka.bootstrap-servers=47.105.194.139:9092

3、開始編程

@SpringBootApplication
public class SpringBootKafkaApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);
    }


    private final Logger logger = LoggerFactory.getLogger(SpringBootKafkaApplication.class);

    @Autowired
    private KafkaTemplate<Object, Object> template;


    @KafkaListener(id = "webGroup", topics = "test5")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }

    @Override
    public void run(String... strings) throws Exception {
        this.template.send("test5", "haha");
    }
}

Spring-kafka-test嵌入式Kafka Server

不過上面的代碼能夠啟動成功,前提是你已經(jīng)有了Kafka Server的服務環(huán)境,我們知道Kafka是由Scala + Zookeeper構(gòu)建的,可以從官網(wǎng)下載部署包在本地部署。但是,我想告訴你,為了簡化開發(fā)環(huán)節(jié)驗證Kafka相關功能,Spring-Kafka-Test已經(jīng)封裝了Kafka-test提供了注解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文后面的所有測試用例的Kafka都是使用這種嵌入式服務提供的。

創(chuàng)建topic方法

  • bean自動創(chuàng)建【不方便,需要編碼】
@Configuration
public class KafkaInitialConfiguration {

    //創(chuàng)建TopicName為topic.quick.initial的Topic并設置分區(qū)數(shù)為8以及副本數(shù)為1
    @Bean
    public NewTopic initialTopic() {
        return new NewTopic("topic.quick.initial",8, (short) 1 );
    }
}
  • bean手動創(chuàng)建
    暴露對象想AdminClient
@Configuration
public class KafkaInitialConfiguration1 {
    @Autowired
    KafkaAdmin kafkaAdmin;

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin.getConfig());
    }
}

調(diào)用

@SpringBootApplication
public class SpringBootKafkaApplication  implements CommandLineRunner {
    public static void main(String[] args) {
        SpringApplication.run(SpringBootKafkaApplication.class, args);
    }

    @Autowired
    private AdminClient adminClient;

    @Override
    public void run(String... strings) throws Exception {
        NewTopic topic =  new NewTopic("topic7",2, (short) 1 );
        adminClient.createTopics(Arrays.asList(topic));
    }
}

查詢topic信息

  @Override
    public void run(String... strings) throws Exception {
        DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList("test"));
        result.all().get().forEach((k,v)->System.out.println("k: "+k+" ,v: "+v.toString()+"\n"));
    }

kafka生產(chǎn)者

  • 異步
    @Override
    public void run(String... strings) throws Exception {
        template.send("test5","k2").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("kafka執(zhí)行失敗");
            }

            @Override
            public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
                System.out.println("kafka執(zhí)行成功"+objectObjectSendResult.getProducerRecord().value());
            }
        });
    }
  • 同步
    @Override
    public void run(String... strings) throws Exception {
        ListenableFuture<SendResult<Object,Object>> future = template.send("test5","kl");
        try {
            SendResult<Object,Object> result = future.get();
            System.out.println(result.getProducerRecord().value());
        }catch (Throwable e){
            e.printStackTrace();
        }
    }

kafka消費者

    @KafkaListener(id = "webGroup", topics = "test5")
    public void listen(String input) {
        logger.info("input value: {}" , input);
    }

KAFKA事務消息

默認情況下,Spring-kafka自動生成的KafkaTemplate實例,是不具有事務消息發(fā)送能力的。需要使用如下配置激活事務特性。事務激活后,所有的消息發(fā)送只能在發(fā)生事務的方法內(nèi)執(zhí)行了,不然就會拋一個沒有事務交易的異常

spring.kafka.producer.transaction-id-prefix=kafka_tx.

當發(fā)送消息有事務要求時,比如,當所有消息發(fā)送成功才算成功,如下面的例子:假設第一條消費發(fā)送后,在發(fā)第二條消息前出現(xiàn)了異常,那么第一條已經(jīng)發(fā)送的消息也會回滾。而且正常情況下,假設在消息一發(fā)送后休眠一段時間,在發(fā)送第二條消息,消費端也只有在事務方法執(zhí)行完成后才會接收到消息

  • 方式一,不用注解
    @Override
    public void run(String... strings) throws Exception {
        template.executeInTransaction(t ->{
            t.send("test5","k4");
            if("error".equals("error")){
                throw new RuntimeException("failed");
            }
            t.send("test5","ckl");
            return true;
        });
    }
  • 方式二,注解
    @Transactional(rollbackFor = RuntimeException.class)
    @Override
    public void run(String... strings) throws Exception {
        template.send("test5","k4");
            if("error".equals("error")){
                throw new RuntimeException("failed");
            }
        template.send("test5","ckl");
    }

參考資料:
springboot集成kafka示例
spring boot集成kafka之spring-kafka深入探秘
Spring-Kafka(三)—— 操作Topic以及Kafka Tool 2的使用

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容