SpringKafka常用組件與配置

kafka模塊提供了一些開箱即用的功能,但大部分特性均來自spring-kafka。spring-kafka本身其實(shí)已經(jīng)可以做足夠多的事情。
spring-kafka針對(duì)kafka的操作做了高級(jí)抽象,本文進(jìn)行簡(jiǎn)要介紹一些常用功能,以方便開發(fā)人員熟知。

發(fā)送消息

spring-kafka提供了兩種發(fā)送操作類:KafkaTemplate及ReplyingKafkaTemplate。前者為普通發(fā)送者,后者可以同步接收消費(fèi)者的回復(fù)消息。

消費(fèi)消息

初始消費(fèi)

kafka的auto.offset.reset允許我們?cè)O(shè)置kafka中沒有初始o(jì)ffset或當(dāng)前offset沒有存儲(chǔ)在服務(wù)器時(shí)的初始消費(fèi)方式,一般我們?cè)O(shè)置為earliest:當(dāng)分區(qū)中存在已消費(fèi)的offset時(shí),從當(dāng)前位置開始消費(fèi);不存在時(shí),從頭拉取,此種方式防止我們晚于生產(chǎn)者進(jìn)入消費(fèi),從而導(dǎo)致丟失消息。
但是此種模式針對(duì)同一個(gè)topic的不同分組,消費(fèi)者將會(huì)從頭開始消費(fèi)(比如某天有人興起換了個(gè)分組名稱)。
故開發(fā)者在消費(fèi)消息,處理業(yè)務(wù)時(shí)必須需要做冪等性處理。(如根據(jù)狀態(tài)處理,根據(jù)唯一鍵進(jìn)行寫入等)

提交方式

此處所指的提交即為修改offset。
spring-kafka的消費(fèi)者可以指定其提交方式,默認(rèn)為自動(dòng)提交??梢酝ㄟ^修改enable-auto-commit配置來開啟手動(dòng)提交。
需注意的是,僅開啟此選項(xiàng),listener的默認(rèn)應(yīng)答方式ack-mode為batch,仍是自動(dòng)提交,只不過變成了每批記錄傳遞給監(jiān)聽器之后批量提交。
如果需要變?yōu)轱@示提交,需要設(shè)置ack-mode為MANUAL或MANUAL_IMMEDIATE,并在監(jiān)聽器中加入Acknowledgment參數(shù),調(diào)用其acknowledgment方法進(jìn)行手動(dòng)提交。
大體代碼如下:

application.yml

spring:
  kafka:
    consumer:
      enable-auto-commit: false
    listener:
      ack-mode: manual_immediate

listener

    @KafkaListener(topics = "acknowledgment-test")
    public void listen(UserModel userModel, Acknowledgment acknowledgment) {
        System.out.println(userModel);
        acknowledgment.acknowledge();
    }

批量消費(fèi)

可以通過spring.kafka.listener.type=batch開啟批量消費(fèi)。
可以通過spring.kafka.consumer.max-poll-recrods來修改每次拉取的消息。默認(rèn)為500。
可以通過spring.kafka.listener.concurrency來修改同時(shí)消費(fèi)的監(jiān)聽器。建議與分區(qū)數(shù)量一致。

事務(wù)

kafka在早在0.11版本便引入了事務(wù)機(jī)制及消息僅發(fā)送一次(但唯一消息只有在同一分區(qū)才有效)。但是spring-kafa并沒有提供此方式的配置。(可能只針對(duì)一個(gè)分區(qū)的唯一消息比較雞肋?)

spring-kafka提供了事務(wù)機(jī)制。允許我們可以使用kafka提供的事務(wù)機(jī)制回滾提交。

消息事務(wù)的場(chǎng)景

數(shù)據(jù)庫(kù)事務(wù)的是針對(duì)于數(shù)據(jù)庫(kù)的ACID屬性。而消息系統(tǒng)的事務(wù)則是針對(duì)消息生產(chǎn)/消費(fèi)的原則操作。兩者為不同的數(shù)據(jù)源,不可混為一談。

我們使用kafka消息事務(wù)的場(chǎng)景有以下兩種:

  1. 在一次業(yè)務(wù)中,存在消費(fèi)消息,又存在生產(chǎn)消息。此時(shí)如果消息生產(chǎn)失敗,那么消費(fèi)者需要回滾。這種情況稱為consumer-transform-producer
  2. 在一次業(yè)務(wù)中,存在多次生產(chǎn)消息,其中后續(xù)生產(chǎn)的消息拋出異常,前置生產(chǎn)的消息需要回滾。

同步事務(wù)

spring-kafka提供了一種同步鏈?zhǔn)聞?wù),可以允許kafka數(shù)據(jù)源事務(wù)與其他數(shù)據(jù)源結(jié)合,要么一起成功,要么一起失敗??梢酝ㄟ^ChainedKafkaTransactionManager來實(shí)現(xiàn)。

開啟方式

spring-kafka提供了spring.kafka.producer.transaction-id-prefix屬性開啟事務(wù),僅需要配置事務(wù)前綴,并且在所有涉及到kafka操作及監(jiān)聽的方法上增加@Transcational注解。
注意:spring-kafka的事務(wù)是針對(duì)單示例的,即每個(gè)@Transcational所標(biāo)識(shí)的方法均會(huì)創(chuàng)建一個(gè)事務(wù),并且生成一個(gè)事務(wù)id。這種方式比較適合于真實(shí)場(chǎng)景。

大體代碼如下:

application.yml

spring:
  kafka:
    producer:
      transaction-id-prefix: test-transacation 

快速開發(fā)框架
高質(zhì)量圖片壓縮工具

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容