producer開發(fā)

本章著重討論kafka的producer設(shè)計(jì)以及基于java版本producer的開發(fā)與使用

producer概覽

kafka producer就是負(fù)責(zé)向kafka寫入數(shù)據(jù)的應(yīng)用程序。
子0.9.0.0版本起,kafka發(fā)布了java版本producer供用戶使用,但作為一個(gè)比較完善的生態(tài)系統(tǒng),kafka必然要支持多種語言的producer,這其中比較著名的當(dāng)屬C/C++平臺(tái)上的producer庫librkafka(準(zhǔn)確的說,這些庫也同時(shí)包含了對(duì)consumer的支持,統(tǒng)稱他們是clients似乎更加合理一些),而向python,go或.net這種主流的編程語言也有對(duì)應(yīng)的producer庫。當(dāng)前apache kafka支持的第三方ciients庫的完整列表如下:

  • C/C++
  • python
  • golang
  • erlang
  • .NET
  • clojure
  • ruby
  • Node.js
  • proxy(HTTP REST等)
  • perl
  • stdin/stdout
  • PHP
  • rust
  • alternative java
  • storm
  • scala DSL
  • ...

值得注意的是,上面這些第三方庫基本上都是由非Apache Kafka社區(qū)的人維護(hù)的,其中一些比較大的庫已經(jīng)由Confluent公司的人參與研發(fā)和維護(hù)(前面提到過的Confluent公司發(fā)布的企業(yè)級(jí)產(chǎn)品包含了librdkafka庫支持C++等平臺(tái)),但如果用戶下載的是Apache Kafka,默認(rèn)是不包含這些庫的,需要額外單獨(dú)下載對(duì)應(yīng)的庫。關(guān)于這些第三方庫的詳細(xì)信息以及下載地址,請(qǐng)?jiān)L問https://cwiki.apache.org/confluence/display/KAFKA/Clients。

另外,Apache Kafka還封裝了一套二進(jìn)制通信協(xié)議,用于對(duì)外提供各種各樣的服務(wù)。對(duì)于producer而言,用戶幾乎可以直接使用任意編程語言按照該協(xié)議的格式進(jìn)行編程,從而實(shí)現(xiàn)向Kafka發(fā)送消息。實(shí)際上內(nèi)置的Java版本producer和上面列出的所有第三方庫在底層都是相同的實(shí)現(xiàn)原理,只是在易用性和性能方面有所差別而已。這組協(xié)議本質(zhì)上為不同的協(xié)議類型分別定義了專屬的緊湊二進(jìn)制字節(jié)數(shù)組格式,然后通過Socket發(fā)送給合適的broker,之后等待broker處理完成后返還響應(yīng)(response)給producer。這樣設(shè)計(jì)的好處在于具有良好的統(tǒng)一性——即所有的協(xié)議類型都是統(tǒng)一格式的,并且由于是自定義的二進(jìn)制格式,這套協(xié)議并不依賴任何外部序列化框架,從而顯得非常輕量級(jí),而且也有很好的擴(kuò)展性。

第6章將詳細(xì)討論這套二進(jìn)制通信協(xié)議的設(shè)計(jì)與使用。在這里只需要知道producer底層是由它們實(shí)現(xiàn)的即可。

Kafka producer在設(shè)計(jì)上要比consumer簡(jiǎn)單一些,因?yàn)樗簧婕皬?fù)雜的組管理操作,即每個(gè)producer都是獨(dú)立進(jìn)行工作的,與其他producer實(shí)例之間沒有關(guān)聯(lián),因此它受到的牽絆自然也要少得多,實(shí)現(xiàn)起來也要簡(jiǎn)單得多。目前producer的首要功能就是向某個(gè)topic的某個(gè)分區(qū)發(fā)送一條消息,所以它首先需要確認(rèn)到底要向topic的哪個(gè)分區(qū)寫入消息——這就是分區(qū)器(partitioner)要做的事情。Kafka producer提供了一個(gè)默認(rèn)的分區(qū)器。對(duì)于每條待發(fā)送的消息而言,如果該消息指定了key,那么該partitioner會(huì)根據(jù)key的哈希值來選擇目標(biāo)分區(qū);若這條消息沒有指定key,則partitioner使用輪詢的方式確認(rèn)目標(biāo)分區(qū)——這樣可以最大限度地確保消息在所有分區(qū)上的均勻性。當(dāng)然producer的API賦予了用戶自行指定目標(biāo)分區(qū)的權(quán)力,即用戶可以在消息發(fā)送時(shí)跳過partitioner直接指定要發(fā)送到的分區(qū)。

另外,producer也允許用戶實(shí)現(xiàn)自定義的分區(qū)策略而非使用默認(rèn)的partitioner,這樣用戶可以很靈活地根據(jù)自身的業(yè)務(wù)需求確定不同的分區(qū)策略。后面章節(jié)中會(huì)詳細(xì)討論如何自定義分區(qū)策略。

有了partitioner的幫助,我們就可以確信具有相同key的所有消息都會(huì)被路由到相同的分區(qū)中。這有助于實(shí)現(xiàn)一些特定的業(yè)務(wù)需求,比如可以利用局部性原理,將某些producer發(fā)送的消息固定地發(fā)送到相同機(jī)架上的分區(qū)從而減少網(wǎng)絡(luò)傳輸?shù)拈_銷等。當(dāng)然了,如前所述,如果沒有指定key,那么所有消息會(huì)被均勻地發(fā)送到所有分區(qū),而這通常也是最合理的分區(qū)策略。

在確認(rèn)了目標(biāo)分區(qū)后,producer要做的第二件事情就是要尋找這個(gè)分區(qū)對(duì)應(yīng)的leader,也就是該分區(qū)leader副本所在的Kafka broker。前面章節(jié)中提到了每個(gè)topic分區(qū)都由若干個(gè)副本組成,其中的一個(gè)副本充當(dāng)leader的角色,也只有l(wèi)eader才能夠響應(yīng)clients發(fā)送過來的請(qǐng)求,而剩下的副本中有一部分副本會(huì)與leader副本保持同步,即所謂的ISR。因此在發(fā)送消息時(shí),producer也就有了多種選擇來實(shí)現(xiàn)消息發(fā)送。比如不等待任何副本的響應(yīng)便返回成功,或者只是等待leader副本響應(yīng)寫入操作之后再返回成功等。不同的選擇也有著不同的優(yōu)缺點(diǎn),我們會(huì)在后續(xù)章節(jié)中討論如何選擇不同的策略。

Java版本producer的工作原理如圖4.1所示。


producer首先使用一個(gè)線程(用戶主線程,也就是用戶啟動(dòng)producer的線程)將待發(fā)送的消息封裝進(jìn)一個(gè)ProducerRecord類實(shí)例,然后將其序列化之后發(fā)送給partitioner,再由后者確定了目標(biāo)分區(qū)后一同發(fā)送到位于producer程序中的一塊內(nèi)存緩沖區(qū)中。而producer的另一個(gè)工作線程(I/O發(fā)送線程,也稱Sender線程)則負(fù)責(zé)實(shí)時(shí)地從該緩沖區(qū)中提取出準(zhǔn)備就緒的消息封裝進(jìn)一個(gè)批次(batch),統(tǒng)一發(fā)送給對(duì)應(yīng)的broker。整個(gè)producer的工作流程大概就是這樣的。

構(gòu)造producer

producer程序?qū)嵗?/h3>

首先,下面給出了一份可運(yùn)行的producer程序代碼清單。這份代碼實(shí)現(xiàn)了最簡(jiǎn)單的功能:構(gòu)造一條消息,然后發(fā)送給Kafka。在運(yùn)行這個(gè)producer程序之前,要保證啟動(dòng)一個(gè)最小規(guī)模的Kafka單機(jī)或集群環(huán)境。Kafka運(yùn)行環(huán)境搭建方法請(qǐng)參考第3章。

package com.huxi.kafkaapi;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTest {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // 必須指定
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 必須指定
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 必須指定

        props.put("acks", "-1");
        props.put("retries", 3);
        props.put("batch.size", 323840);
        props.put("linger.ms", 10);
        props.put("buffer.memory", 33554432);
        props.put("max.block.ms", 3000);

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 0; i < 100; i++)
            producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));

        producer.close();
    }
}

構(gòu)造一個(gè)producer實(shí)例大致需要以下5個(gè)步驟。

  1. 構(gòu)造一個(gè)java.util.Properties對(duì)象,然后至少指定bootstrap.servers、key.serializer和value.serializer這3個(gè)屬性。在上面的代碼清單中這3個(gè)屬性后面都追加了注釋,表明這是必須要指定的參數(shù),它們沒有默認(rèn)值。

  2. 使用上一步中創(chuàng)建的Properties實(shí)例構(gòu)造KafkaProducer對(duì)象。

  3. 構(gòu)造待發(fā)送的消息對(duì)象ProducerRecord,指定消息要被發(fā)送到的topic、分區(qū)以及對(duì)應(yīng)的key和value。注意,分區(qū)和key信息可以不用指定,由Kafka自行確定目標(biāo)分區(qū)。

  4. 調(diào)用KafkaProducer的send方法發(fā)送消息。

  5. 關(guān)閉KafkaProducer。

1. 構(gòu)造Properties對(duì)象

下面將詳細(xì)展開每一步要做的事情。首先要構(gòu)造一個(gè)Properties對(duì)象,在這一步中有3個(gè)參數(shù)或?qū)傩允潜仨氁付ǖ?。如果我們翻開Kafka官網(wǎng)中producer的參數(shù)列表(詳見https://kafka.apache.org/documentation/#producerconfigs)會(huì)發(fā)現(xiàn)這3個(gè)參數(shù)是沒有默認(rèn)值的。它們分別如下。

bootstrap.servers

該參數(shù)指定了一組host:port對(duì),用于創(chuàng)建向Kafka broker服務(wù)器的連接,比如k1:9092,k2:9092,k3:9092。上面的代碼清單中指定了localhost:9092,producer使用時(shí)需要替換成實(shí)際的broker列表。如果Kafka集群中機(jī)器數(shù)很多,那么只需要指定部分broker即可,不需要列出所有的機(jī)器。因?yàn)椴还苤付◣着_(tái)機(jī)器,producer都會(huì)通過該參數(shù)找到并發(fā)現(xiàn)集群中所有的broker。為該參數(shù)指定多臺(tái)機(jī)器只是為了故障轉(zhuǎn)移使用。這樣即使某一臺(tái)broker掛掉了,producer重啟后依然可以通過該參數(shù)指定的其他broker連入Kafka集群。

另外,如果broker端沒有顯式配置listeners使用IP地址,那么最好將該參數(shù)也配置成主機(jī)名,而不是IP地址。因?yàn)镵afka內(nèi)部使用的就是FQDN(Fully Qualified Domain Name)。

key.serializer

被發(fā)送到broker端的任何消息的格式都必須是字節(jié)數(shù)組,因此消息的各個(gè)組件必須首先做序列化,然后才能發(fā)送到broker。該參數(shù)就是為消息的key做序列化之用的。這個(gè)參數(shù)指定的是實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類的全限定名稱。Kafka為大部分的初始類型(primitive type)默認(rèn)提供了現(xiàn)成的序列化器。上面的代碼清單中使用了org.apache.kafka.common.serialization.StringSerializer,該類會(huì)將一個(gè)字符串類型轉(zhuǎn)換成字節(jié)數(shù)組。這個(gè)參數(shù)也揭示了一個(gè)事實(shí),那就是用戶可以自定義序列化器,只要實(shí)現(xiàn)Serializer接口即可。

需要注意的是,即使producer程序在發(fā)送消息時(shí)不指定key,這個(gè)參數(shù)也是必須要設(shè)置的,否則程序會(huì)拋出ConfigException異常,提示“key.serializer”參數(shù)無默認(rèn)值,必須要配置。

value.serializer

和key.serializer類似,只是它被用來對(duì)消息體(即消息value)部分做序列化,將消息value部分轉(zhuǎn)換成字節(jié)數(shù)組。上面的代碼清單中該參數(shù)指定了與key.serializer相同的值,即都使用StringSerializer。當(dāng)然了,value.serializer也可以設(shè)置成與key.serializer不同的值。
一定要注意的是,這兩個(gè)參數(shù)都必須是全限定類名。

2. 構(gòu)造KafkaProducer對(duì)象

設(shè)置了這3個(gè)屬性之后,下面就要構(gòu)造KafkaProducer對(duì)象了。KafkaProducer是producer的主入口,所有的功能基本上都是由KafkaProducer來提供的。創(chuàng)建KafkaProducer實(shí)例很簡(jiǎn)單,只需要下面一句命令即可:


image.png

創(chuàng)建producer時(shí)也可以同時(shí)指定key和value的序列化類,比如這樣:


image.png

如果采用這樣的方式創(chuàng)建producer,那么就不需要顯式地在properties中指定key和value的序列化類了。

構(gòu)造ProduceRecord對(duì)象

構(gòu)造好kafkaProdcer實(shí)例后,下一步就是構(gòu)造消息實(shí)例。java版本producer使用ProducerRecord類來表示每條消息。創(chuàng)建prodducerRecord也很簡(jiǎn)答,最簡(jiǎn)單的形式就是指定topic和value,如下:

image.png

當(dāng)然,producerRecord還支持指定更多的消息消息,比如可以控制該消息直接被發(fā)往的分布以及消息的時(shí)間戳,
具體API格式請(qǐng)參見https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html

不過這里要注意的是,一定要謹(jǐn)慎指定時(shí)間戳,因?yàn)樵谀壳暗膋afka設(shè)計(jì)中,時(shí)間戳索引文件中的索引都是按照時(shí)間戳順序排列的,所以如果在producer端隨意指定時(shí)間戳,會(huì)導(dǎo)致該信息的時(shí)間序列混亂,這樣在根據(jù)時(shí)間戳查詢位移的功能時(shí)不會(huì)找到這條消息。同時(shí)kafka的消息留存策略也會(huì)受到影響,因此最好還是然讓kafka自行來指定戳比較好。

發(fā)送消息

發(fā)送消息的主方法是send,producer在地城完全的實(shí)現(xiàn)了異步化,并且通過java的tuture同時(shí)實(shí)現(xiàn)了同步和異步發(fā)送 + 回調(diào)兩種發(fā)送方式。

異步發(fā)送

實(shí)際上所有的寫入操作都是異步的。java版本的producer的send方法會(huì)返回一個(gè)java future對(duì)象供用戶稍后獲取發(fā)送結(jié)果,這就是所謂的回調(diào)機(jī)制。代碼如下:

producer.send(record,new CallBack(){
    @Override
    public void onCompletion(RecordMetadata metadata,Exception exception){
            if(exception == null){
            //消息發(fā)送成功
            }else{
            //執(zhí)行錯(cuò)誤處理邏輯
            }
    }
});

上面代碼中的CallBack就是發(fā)送消息后的回調(diào)類,實(shí)現(xiàn)方法是onCompletion。該方法的兩個(gè)輸入?yún)?shù)metadata,和exception會(huì)同時(shí)非空,也就是說至少一個(gè)是null。當(dāng)消息發(fā)送成功時(shí),exception是null;繁殖,若消息發(fā)送失敗,metadata就是null。因此在寫producer程序時(shí),最好寫if判斷。

另外,上面的Callbacl實(shí)際上是一個(gè)java接口,用戶可以創(chuàng)建自定義的CallBack實(shí)現(xiàn)類來處理消息發(fā)送后的邏輯,只需要該具體類實(shí)現(xiàn)org.apache.kafka.clients.producer.CallBack接口即可

同步發(fā)送

同步發(fā)送和異步發(fā)送其實(shí)就是通過java的future來區(qū)分的,調(diào)用Future.get()無限等待結(jié)果返回,即實(shí)現(xiàn)同步發(fā)送的效果

ProduceRecord<String,String> record = new ProducerRecord<>("test",Integer.toString(i));
producer.send(record).get();

使用Future.get會(huì)一直等待下去直至kafka broker將發(fā)送結(jié)果返回給producer程序。當(dāng)結(jié)果從broker處返回時(shí)gett方法要么返回發(fā)送結(jié)果要么跑出異常交由producer自行處理。如果沒有錯(cuò)誤,get方法將返回對(duì)應(yīng)的RecordMetadata實(shí)例(包含了已發(fā)送消息的所有元數(shù)據(jù)信息)包括消息發(fā)送的topic、分區(qū)以及該消息在對(duì)應(yīng)分區(qū)的位移信息。

不管是同步發(fā)送還是異步發(fā)送,發(fā)送都有可能失敗,導(dǎo)致返回異常錯(cuò)誤。當(dāng)前kafka的錯(cuò)誤類型包含了兩類:可重試異常和不可重試異常。常見的可重試異常吐下。

  • LeaderNotAvailableException: 分區(qū)的副本不可用,這通常出現(xiàn)在leader換屆選舉期間,因此通常是順時(shí)的異常,重試之后可以自行恢復(fù)

  • NotControllerException :controller當(dāng)前不可用(controller是kafka集群中非常重要的角色)。這通常表名controller在經(jīng)歷新一輪的選舉,這也是可以通過重試機(jī)制進(jìn)行恢復(fù)的。

  • NetworkException:網(wǎng)絡(luò)瞬時(shí)故障導(dǎo)致的異常,可重試

對(duì)于可重試的異常,如果在producer程序中配置了重試次數(shù),那么只要在規(guī)定的重試次數(shù)內(nèi)自行恢復(fù)了,便不會(huì)出現(xiàn)在onCompletion的exception中。不過若超過了重試次數(shù)仍舊沒有成功,則會(huì)被封裝進(jìn)exception中。此時(shí)就需要producer程序自行處理這種異常。

所有可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類。理論上講所有未繼承自RetriableException類的其他異常都屬于不可重試異常,這類異常通常都表明了一些非常嚴(yán)重或kafka無法處理的問題,與producer相關(guān)的如下。

  • RecordTooLargeException: 發(fā)送的消息尺寸過大,超過了規(guī)定的大小上限。顯然這種異常無論如何重試都是無法成功的。

  • SerializationException:序列化失敗異常,這也是無法恢復(fù)的。

  • KafkaException:其他類型的異常

所有這些不可重試異常一旦被捕獲都會(huì)被封裝進(jìn)Future的計(jì)算結(jié)果并返回給producer程序,用戶需要自行處理這些異常。由于不可重試異常和可重試異常在producer程序端可能有不同的處理邏輯,因此可以使用下面的代碼進(jìn)行區(qū)分:

producer.send(record,new Callback()){
     @Override
     puvlic void onCompletion(RecordMetadata metadata,Exception exception){
            if(exception == null){
                  //消息發(fā)送成
            }else {
                  if(exception instanceof RetriableException){
                        //處理可重試異常
                  }else{
                        //處理不可重試異常
                  }
            }
      }
}

5.關(guān)閉producer

producer程序結(jié)束時(shí)一定要關(guān)閉producer!,這一點(diǎn)怎么強(qiáng)調(diào)都不為過。畢竟producer程序運(yùn)行時(shí)占用了系統(tǒng)資源(比如創(chuàng)建了額外的線程、申請(qǐng)了很多內(nèi)存以及創(chuàng)建了多個(gè)socket連接等)因此必須要顯式地調(diào)用KafkaProducer.close方法關(guān)閉producer。不管發(fā)送是成功還是失敗,只要producer程序完成了既定的工作,就應(yīng)該被關(guān)閉。

如果是調(diào)用普通的無參數(shù)close方法,producer會(huì)被允許先處理之前的發(fā)送請(qǐng)求后再關(guān)閉,即所謂的“優(yōu)雅”關(guān)閉退出(graceful shutdown);同時(shí),kafkaProducer還提供了一個(gè)帶超時(shí)的參數(shù)close(timeout)。如果調(diào)用此方法,prodcuer會(huì)等待timeout時(shí)間來完成所有處理中的請(qǐng)求,然后強(qiáng)行退出。這就是說,若timeout超時(shí),則producer會(huì)強(qiáng)制結(jié)束,并立即丟棄所有未發(fā)送以及應(yīng)答的發(fā)送請(qǐng)求。在某種程度上,這會(huì)給用戶一種錯(cuò)覺:仿佛producer端的程序丟失了重要的消息。因此在實(shí)際場(chǎng)景中一定要謹(jǐn)慎使用帶超時(shí)的close方法。

producer 主要參數(shù)

除了前面的boostrap.servers , key.serializer 和value.serializer之外,java版本producer還提供了很多其他重要的參數(shù)。詳細(xì)的參數(shù)列表以及含義和默認(rèn)值可以訪問https://kafka.apache.org/documentation/#producer-configs

acks

acks參數(shù)用于控制producer生產(chǎn)消息的持久性(durability)。對(duì)于producer而言,kafka在乎的 是“已提交”消息的持久性。一旦消息成功提交,那么只要有任何一個(gè)保存了該消息的副本存活,這條消息就會(huì)被視為不可丟失的。經(jīng)常碰到用戶抱怨kafka的producer會(huì)丟消息,其實(shí)這里混淆了一個(gè)概念,即那些所謂的已丟失的消息其實(shí)并沒有被成功寫入kafka。換句話說,他們并沒有被成功提交,因此kafka對(duì)這些消息的持久性不做任何保證——當(dāng)然,producer API確實(shí)提供了毀掉機(jī)制供用戶處理發(fā)送失敗的情況。
具體來說,當(dāng)producer發(fā)送一條消息給kafka集群時(shí),這條消息會(huì)被發(fā)送到指定topic分區(qū)leader所在的broker上,producer等待從該leader broker返回消息的寫入結(jié)果(當(dāng)然并不是無限等待,是由超時(shí)時(shí)間的)以確定消息被成功提交。這一切完成后producer可以繼續(xù)發(fā)送新的消息。kafka能夠保證的是consumer永遠(yuǎn)不會(huì)讀取到尚未提交完成的消息——這和關(guān)系型數(shù)據(jù)庫類似,即在大部分情況下,某個(gè)事物的SQL查詢都不會(huì)看到另一個(gè)事物中尚未提交的數(shù)據(jù)。
顯然,leader broker何時(shí)發(fā)送寫入結(jié)果返回給producer就是一個(gè)需要仔細(xì)考慮的問題了,
它也會(huì)直接影響到消息的持久性甚至是producer端的吞吐量:producer端越快的接收到leader broker響應(yīng),它就越快地發(fā)送下一條消息,即吞吐量也就越大。producer端的acls參數(shù)就是用來控制做這件事的。acks指定了在給producer發(fā)送響應(yīng)前,leader broker必須要確保已成功寫入消息的副本數(shù)。當(dāng)前acks有3個(gè)取值:0、1和all

  • acks = 0:表示producer完全不理睬leader端的處理結(jié)果,此時(shí),producer發(fā)送消息后立即開啟下一條消息的發(fā)送,根本不等待leaderr broker端返回結(jié)果。由于不接收發(fā)送結(jié)果,因此在這種情況下producer.send的回調(diào)也就完全失去了作用,即用戶無法通過回調(diào)機(jī)制感知任何發(fā)送過程中的失敗,所以acks = 0時(shí)producer并不保證消息會(huì)被成功發(fā)送。但凡事有利有弊,由于不需要等待響應(yīng)結(jié)果,通常這種設(shè)置下producer的吞吐量是最高的。

  • acks = all或者-1:表示當(dāng)消息發(fā)送時(shí),leader broker不僅會(huì)將消息寫入本地日志,同時(shí)還會(huì)等待ISR中所有其他副本都成功寫入他們各自的本地日志后,才發(fā)送響應(yīng)結(jié)果給producer。顯然當(dāng)設(shè)置acks=all時(shí),只要ISR中至少有一個(gè)副本處于存活的狀態(tài),那么這條消息就肯定不會(huì)丟失,因而可以達(dá)到最高的消息持久性,但通常這種設(shè)置下producer的吞吐量也是最低的。

  • acks = 1:是0和all的折中方案,也就是默認(rèn)的參數(shù)值。producer發(fā)送消息后leader boker僅將該消息寫入本地日志,然后便發(fā)送響應(yīng)結(jié)果給producer,而無需等待ISR中其他副本寫入該消息。那么此時(shí)只要該leader broker一直存活,kafka就能夠保證這條消息的持久性,同時(shí)也保證了producer的吞吐量。

總結(jié)一下,acks參數(shù)控制producer實(shí)現(xiàn)不同程度的消息持久性,他有3個(gè)取值,對(duì)應(yīng)的優(yōu)缺點(diǎn)以及使用的場(chǎng)景如下:


image.png

在producer程序中設(shè)置acks非常簡(jiǎn)單,只需要在構(gòu)造kafkaProducer的Properties對(duì)象中增加“acks”屬性即可:

props.put("acks","1");
//或者
props.put(ProducerConfig.ACKS_CONFIG,"1");

值得注意的是,該參數(shù)的類型是字符串,因此必須要寫成“1”而不是1,否則程序會(huì)報(bào)錯(cuò),提示你沒有指定正確的參數(shù)類型。

buffer.memory

...待續(xù)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容