Spring Boot 自定義kafka 消費(fèi)者配置 ContainerFactory最佳實(shí)踐

Spring Boot 自定義kafka 消費(fèi)者配置 ContainerFactory最佳實(shí)踐

本篇博文主要提供一個(gè)在 SpringBoot 中自定義 kafka配置的實(shí)踐,想象這樣一個(gè)場(chǎng)景:你的系統(tǒng)需要監(jiān)聽(tīng)多個(gè)不同集群的消息,在不同的集群中topic沖突了,所以你需要分別定義kafka消息配置。

此篇文章會(huì)在SpringBoot 提供的默認(rèn)模板上提供擴(kuò)展,不會(huì)因?yàn)槟阕远x了消費(fèi)者配置,而導(dǎo)致原生SpringBoot的Kakfa模板配置失效。

引入 MAVEN 依賴(lài)

版本需要你自己指定

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>xxx</version>
</dependency>

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>xxx</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>xxx</version>
</dependency>

引入Java配置類(lèi)

/**
 * 手動(dòng)自定義 kafka 消費(fèi)者 ContainerFactory 配置demo
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConsumerConfig {

    @Autowired
    private KafkaProperties properties;

    @Value("${監(jiān)聽(tīng)服務(wù)地址}")
    private List<String> myServers;

    @Bean("myKafkaContainerFactory")
    @ConditionalOnBean(ConcurrentKafkaListenerContainerFactoryConfigurer.class)
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, consumerFactory());
        return factory;
    }

    //獲得創(chuàng)建消費(fèi)者工廠
    public ConsumerFactory<Object, Object> consumerFactory() {
        KafkaProperties myKafkaProperties = JSON.parseObject(JSON.toJSONString(this.properties), KafkaProperties.class);
        //對(duì)模板 properties 進(jìn)行定制化
        //....
        //例如:定制servers
        myKafkaProperties.setBootstrapServers(myServers);
        return new DefaultKafkaConsumerFactory<>(myKafkaProperties.buildConsumerProperties());
    }

}

yml模板

#kafka配置,更多配置請(qǐng)參考:KafkaProperties
spring.kafka:
  #公共參數(shù),其他的timeout.ms, request.timeout.ms, metadata.fetch.timeout.ms保持默認(rèn)值
  properties:
    #這個(gè)參數(shù)指定producer在發(fā)送批量消息前等待的時(shí)間,當(dāng)設(shè)置此參數(shù)后,即便沒(méi)有達(dá)到批量消息的指定大小(batch-size),到達(dá)時(shí)間后生產(chǎn)者也會(huì)發(fā)送批量消息到broker。默認(rèn)情況下,生產(chǎn)者的發(fā)送消息線(xiàn)程只要空閑了就會(huì)發(fā)送消息,即便只有一條消息。設(shè)置這個(gè)參數(shù)后,發(fā)送線(xiàn)程會(huì)等待一定的時(shí)間,這樣可以批量發(fā)送消息增加吞吐量,但同時(shí)也會(huì)增加延遲。
    linger.ms: 50 #默認(rèn)值:0毫秒,當(dāng)消息發(fā)送比較頻繁時(shí),增加一些延遲可增加吞吐量和性能。
    #這個(gè)參數(shù)指定producer在一個(gè)TCP connection可同時(shí)發(fā)送多少條消息到broker并且等待broker響應(yīng),設(shè)置此參數(shù)較高的值可以提高吞吐量,但同時(shí)也會(huì)增加內(nèi)存消耗。另外,如果設(shè)置過(guò)高反而會(huì)降低吞吐量,因?yàn)榕肯⑿式档?。設(shè)置為1,可以保證發(fā)送到broker的順序和調(diào)用send方法順序一致,即便出現(xiàn)失敗重試的情況也是如此。
    #注意:當(dāng)前消息符合at-least-once,自kafka1.0.0以后,為保證消息有序以及exactly once,這個(gè)配置可適當(dāng)調(diào)大為5。
    max.in.flight.requests.per.connection: 1 #默認(rèn)值:5,設(shè)置為1即表示producer在connection上發(fā)送一條消息,至少要等到這條消息被broker確認(rèn)收到才繼續(xù)發(fā)送下一條,因此是有序的。

  #生產(chǎn)者的配置,可參考o(jì)rg.apache.kafka.clients.producer.ProducerConfig
  producer:
    #這個(gè)參數(shù)可以是任意字符串,它是broker用來(lái)識(shí)別消息是來(lái)自哪個(gè)客戶(hù)端的。在broker進(jìn)行打印日志、衡量指標(biāo)或者配額限制時(shí)會(huì)用到。
    clientId: ${spring.application.name} #方便kafkaserver打印日志定位請(qǐng)求來(lái)源
    bootstrap-servers: 127.0.0.1:8080 #kafka服務(wù)器地址,多個(gè)以逗號(hào)隔開(kāi)
    #acks=0:生產(chǎn)者把消息發(fā)送到broker即認(rèn)為成功,不等待broker的處理結(jié)果。這種方式的吞吐最高,但也是最容易丟失消息的。
    #acks=1:生產(chǎn)者會(huì)在該分區(qū)的leader寫(xiě)入消息并返回成功后,認(rèn)為消息發(fā)送成功。如果群首寫(xiě)入消息失敗,生產(chǎn)者會(huì)收到錯(cuò)誤響應(yīng)并進(jìn)行重試。這種方式能夠一定程度避免消息丟失,但如果leader宕機(jī)時(shí)該消息沒(méi)有復(fù)制到其他副本,那么該消息還是會(huì)丟失。另外,如果我們使用同步方式來(lái)發(fā)送,延遲會(huì)比前一種方式大大增加(至少增加一個(gè)網(wǎng)絡(luò)往返時(shí)間);如果使用異步方式,應(yīng)用感知不到延遲,吞吐量則會(huì)受異步正在發(fā)送中的數(shù)量限制。
    #acks=all:生產(chǎn)者會(huì)等待所有副本成功寫(xiě)入該消息,這種方式是最安全的,能夠保證消息不丟失,但是延遲也是最大的。
    #如果是發(fā)送日志之類(lèi)的,允許部分丟失,可指定acks=0,如果想不丟失消息,可配置為all,但需密切關(guān)注性能和吞吐量。
    acks: all #默認(rèn)值:1
    #當(dāng)生產(chǎn)者發(fā)送消息收到一個(gè)可恢復(fù)異常時(shí),會(huì)進(jìn)行重試,這個(gè)參數(shù)指定了重試的次數(shù)。在實(shí)際情況中,這個(gè)參數(shù)需要結(jié)合retry.backoff.ms(重試等待間隔)來(lái)使用,建議總的重試時(shí)間比集群重新選舉leader的時(shí)間長(zhǎng),這樣可以避免生產(chǎn)者過(guò)早結(jié)束重試導(dǎo)致失敗。
    #另外需注意,當(dāng)開(kāi)啟重試時(shí),若未設(shè)置max.in.flight.requests.per.connection=1,則可能出現(xiàn)發(fā)往同一個(gè)分區(qū)的兩批消息的順序出錯(cuò),比如,第一批發(fā)送失敗了,第二批成功了,然后第一批重試成功了,此時(shí)兩者的順序就顛倒了。
    retries: 2  #發(fā)送失敗時(shí)重試多少次,0=禁用重試(默認(rèn)值)
    #默認(rèn)情況下消息是不壓縮的,此參數(shù)可指定采用何種算法壓縮消息,可取值:none,snappy,gzip,lz4。snappy壓縮算法由Google研發(fā),這種算法在性能和壓縮比取得比較好的平衡;相比之下,gzip消耗更多的CPU資源,但是壓縮效果也是最好的。通過(guò)使用壓縮,我們可以節(jié)省網(wǎng)絡(luò)帶寬和Kafka存儲(chǔ)成本。
    compressionType: "none" #如果不開(kāi)啟壓縮,可設(shè)置為none(默認(rèn)值),比較大的消息可開(kāi)啟。
    #當(dāng)多條消息發(fā)送到一個(gè)分區(qū)時(shí),Producer會(huì)進(jìn)行批量發(fā)送,這個(gè)參數(shù)指定了批量消息大小的上限(以字節(jié)為單位)。當(dāng)批量消息達(dá)到這個(gè)大小時(shí),Producer會(huì)一起發(fā)送到broker;但即使沒(méi)有達(dá)到這個(gè)大小,生產(chǎn)者也會(huì)有定時(shí)機(jī)制來(lái)發(fā)送消息,避免消息延遲過(guò)大。
    batch-size: 16384 #默認(rèn)16K,值越小延遲越低,但是吞吐量和性能會(huì)降低。0=禁用批量發(fā)送
    #這個(gè)參數(shù)設(shè)置Producer暫存待發(fā)送消息的緩沖區(qū)內(nèi)存的大小,如果應(yīng)用調(diào)用send方法的速度大于Producer發(fā)送的速度,那么調(diào)用會(huì)阻塞一定(max.block.ms)時(shí)間后拋出異常。
    buffer-memory: 33554432 #緩沖區(qū)默認(rèn)大小32M
  #消費(fèi)者的配置,可參考:org.apache.kafka.clients.consumer.ConsumerConfig
  consumer:
    #這個(gè)參數(shù)可以為任意值,用來(lái)指明消息從哪個(gè)客戶(hù)端發(fā)出,一般會(huì)在打印日志、衡量指標(biāo)、分配配額時(shí)使用。
    #暫不用提供clientId,2.x版本可放出來(lái),1.x有多個(gè)topic且concurrency>1會(huì)出現(xiàn)JMX注冊(cè)時(shí)異常
    #clientId: ${spring.application.name} #方便kafkaserver打印日志定位請(qǐng)求來(lái)源
    # 簽中kafka集群
    bootstrap-servers: 127.0.0.1:8080 #kafka服務(wù)器地址,多個(gè)以逗號(hào)隔開(kāi)
    #這個(gè)參數(shù)指定了當(dāng)消費(fèi)者第一次讀取分區(qū)或者無(wú)offset時(shí)拉取那個(gè)位置的消息,可以取值為latest(從最新的消息開(kāi)始消費(fèi)),earliest(從最老的消息開(kāi)始消費(fèi)),none(如果無(wú)offset就拋出異常)
    autoOffsetReset: latest #默認(rèn)值:latest
    #這個(gè)參數(shù)指定了消費(fèi)者是否自動(dòng)提交消費(fèi)位移,默認(rèn)為true。如果需要減少重復(fù)消費(fèi)或者數(shù)據(jù)丟失,你可以設(shè)置為false,然后手動(dòng)提交。如果為true,你可能需要關(guān)注自動(dòng)提交的時(shí)間間隔,該間隔由auto.commit.interval.ms設(shè)置。
    enable-auto-commit: false
    #周期性自動(dòng)提交的間隔,單位毫秒
    auto-commit-interval: 2000 #默認(rèn)值:5000
    #這個(gè)參數(shù)允許消費(fèi)者指定從broker讀取消息時(shí)最小的Payload的字節(jié)數(shù)。當(dāng)消費(fèi)者從broker讀取消息時(shí),如果數(shù)據(jù)字節(jié)數(shù)小于這個(gè)閾值,broker會(huì)等待直到有足夠的數(shù)據(jù),然后才返回給消費(fèi)者。對(duì)于寫(xiě)入量不高的主題來(lái)說(shuō),這個(gè)參數(shù)可以減少broker和消費(fèi)者的壓力,因?yàn)闇p少了往返的時(shí)間。而對(duì)于有大量消費(fèi)者的主題來(lái)說(shuō),則可以明顯減輕broker壓力。
    fetchMinSize: 1 #默認(rèn)值: 1
    #上面的fetch.min.bytes參數(shù)指定了消費(fèi)者讀取的最小數(shù)據(jù)量,而這個(gè)參數(shù)則指定了消費(fèi)者讀取時(shí)最長(zhǎng)等待時(shí)間,從而避免長(zhǎng)時(shí)間阻塞。這個(gè)參數(shù)默認(rèn)為500ms。
    fetchMaxWait: 500 #默認(rèn)值:500毫秒
    #這個(gè)參數(shù)控制一個(gè)poll()調(diào)用返回的記錄數(shù),即consumer每次批量拉多少條數(shù)據(jù)。
    maxPollRecords: 500 #默認(rèn)值:500
  listener:
    #創(chuàng)建多少個(gè)consumer,值必須小于等于Kafk Topic的分區(qū)數(shù)。
    ack-mode: MANUAL_IMMEDIATE
    concurrency: 1  #推薦設(shè)置為topic的分區(qū)數(shù)

配置釋義

點(diǎn)開(kāi) KafkaProperties 這個(gè)類(lèi),可以看到這個(gè)是SpringBoot 自動(dòng)配置kafka的配置類(lèi),引入這個(gè)實(shí)例,就相當(dāng)于你拿到了SpringBoot kafka配置模板的參數(shù),就是上述貼的配置,然后再此基礎(chǔ)上重新定義你需要改變的配置,這里主要講消費(fèi)者配置。

代碼中舉了個(gè)重寫(xiě)監(jiān)聽(tīng)servers的例子:

//例如:定制servers
myKafkaProperties.setBootstrapServers(myServers);

@KafkaListener 使用 containerFactory

@Slf4j
@Component
public class ConsumerDemo {

    //聲明consumerID為demo,監(jiān)聽(tīng)topicName為topic.quick.demo的Topic
    //這個(gè)消費(fèi)者的 containerFactory 是SpringBoot 提供的 kafkaListenerContainerFactory 這個(gè)bean
    @KafkaListener(id = "demo", topics = "topic.quick.demo")
    public void listen(String msgData) {
        log.info("demo receive : " + msgData);
    }

    @KafkaListener(topics = "k010", containerFactory = "myKafkaContainerFactory")
    public void listen(String msgData, Acknowledgment ack) {
        log.info("demo receive : " + msgData);
        //手動(dòng)提交
        //enable.auto.commit參數(shù)設(shè)置成false。那么就是Spring來(lái)替為我們做人工提交,從而簡(jiǎn)化了人工提交的方式。
        //所以kafka和springboot結(jié)合中的enable.auto.commit為false為spring的人工提交模式。
        //enable.auto.commit為true是采用kafka的默認(rèn)提交模式。
        ack.acknowledge();
    }
}

如果在@KafkaListener屬性中沒(méi)有指定 containerFactory 那么Spring Boot 會(huì)默認(rèn)注入 name 為“kafkaListenerContainerFactory” 的 containerFactory。具體源碼可跟蹤:KafkaListenerAnnotationBeanPostProcessor中的常量:

public static final String DEFAULT_KAFKA_LISTENER_CONTAINER_FACTORY_BEAN_NAME = "kafkaListenerContainerFactory";
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容