本章著重討論kafka的producer設(shè)計(jì)以及基于java版本producer的開發(fā)與使用
producer概覽
kafka producer就是負(fù)責(zé)向kafka寫入數(shù)據(jù)的應(yīng)用程序。
子0.9.0.0版本起,kafka發(fā)布了java版本producer供用戶使用,但作為一個(gè)比較完善的生態(tài)系統(tǒng),kafka必然要支持多種語言的producer,這其中比較著名的當(dāng)屬C/C++平臺(tái)上的producer庫librkafka(準(zhǔn)確的說,這些庫也同時(shí)包含了對(duì)consumer的支持,統(tǒng)稱他們是clients似乎更加合理一些),而向python,go或.net這種主流的編程語言也有對(duì)應(yīng)的producer庫。當(dāng)前apache kafka支持的第三方ciients庫的完整列表如下:
- C/C++
- python
- golang
- erlang
- .NET
- clojure
- ruby
- Node.js
- proxy(HTTP REST等)
- perl
- stdin/stdout
- PHP
- rust
- alternative java
- storm
- scala DSL
- ...
值得注意的是,上面這些第三方庫基本上都是由非Apache Kafka社區(qū)的人維護(hù)的,其中一些比較大的庫已經(jīng)由Confluent公司的人參與研發(fā)和維護(hù)(前面提到過的Confluent公司發(fā)布的企業(yè)級(jí)產(chǎn)品包含了librdkafka庫支持C++等平臺(tái)),但如果用戶下載的是Apache Kafka,默認(rèn)是不包含這些庫的,需要額外單獨(dú)下載對(duì)應(yīng)的庫。關(guān)于這些第三方庫的詳細(xì)信息以及下載地址,請(qǐng)?jiān)L問https://cwiki.apache.org/confluence/display/KAFKA/Clients。
另外,Apache Kafka還封裝了一套二進(jìn)制通信協(xié)議,用于對(duì)外提供各種各樣的服務(wù)。對(duì)于producer而言,用戶幾乎可以直接使用任意編程語言按照該協(xié)議的格式進(jìn)行編程,從而實(shí)現(xiàn)向Kafka發(fā)送消息。實(shí)際上內(nèi)置的Java版本producer和上面列出的所有第三方庫在底層都是相同的實(shí)現(xiàn)原理,只是在易用性和性能方面有所差別而已。這組協(xié)議本質(zhì)上為不同的協(xié)議類型分別定義了專屬的緊湊二進(jìn)制字節(jié)數(shù)組格式,然后通過Socket發(fā)送給合適的broker,之后等待broker處理完成后返還響應(yīng)(response)給producer。這樣設(shè)計(jì)的好處在于具有良好的統(tǒng)一性——即所有的協(xié)議類型都是統(tǒng)一格式的,并且由于是自定義的二進(jìn)制格式,這套協(xié)議并不依賴任何外部序列化框架,從而顯得非常輕量級(jí),而且也有很好的擴(kuò)展性。
第6章將詳細(xì)討論這套二進(jìn)制通信協(xié)議的設(shè)計(jì)與使用。在這里只需要知道producer底層是由它們實(shí)現(xiàn)的即可。
Kafka producer在設(shè)計(jì)上要比consumer簡(jiǎn)單一些,因?yàn)樗簧婕皬?fù)雜的組管理操作,即每個(gè)producer都是獨(dú)立進(jìn)行工作的,與其他producer實(shí)例之間沒有關(guān)聯(lián),因此它受到的牽絆自然也要少得多,實(shí)現(xiàn)起來也要簡(jiǎn)單得多。目前producer的首要功能就是向某個(gè)topic的某個(gè)分區(qū)發(fā)送一條消息,所以它首先需要確認(rèn)到底要向topic的哪個(gè)分區(qū)寫入消息——這就是分區(qū)器(partitioner)要做的事情。Kafka producer提供了一個(gè)默認(rèn)的分區(qū)器。對(duì)于每條待發(fā)送的消息而言,如果該消息指定了key,那么該partitioner會(huì)根據(jù)key的哈希值來選擇目標(biāo)分區(qū);若這條消息沒有指定key,則partitioner使用輪詢的方式確認(rèn)目標(biāo)分區(qū)——這樣可以最大限度地確保消息在所有分區(qū)上的均勻性。當(dāng)然producer的API賦予了用戶自行指定目標(biāo)分區(qū)的權(quán)力,即用戶可以在消息發(fā)送時(shí)跳過partitioner直接指定要發(fā)送到的分區(qū)。
另外,producer也允許用戶實(shí)現(xiàn)自定義的分區(qū)策略而非使用默認(rèn)的partitioner,這樣用戶可以很靈活地根據(jù)自身的業(yè)務(wù)需求確定不同的分區(qū)策略。后面章節(jié)中會(huì)詳細(xì)討論如何自定義分區(qū)策略。
有了partitioner的幫助,我們就可以確信具有相同key的所有消息都會(huì)被路由到相同的分區(qū)中。這有助于實(shí)現(xiàn)一些特定的業(yè)務(wù)需求,比如可以利用局部性原理,將某些producer發(fā)送的消息固定地發(fā)送到相同機(jī)架上的分區(qū)從而減少網(wǎng)絡(luò)傳輸?shù)拈_銷等。當(dāng)然了,如前所述,如果沒有指定key,那么所有消息會(huì)被均勻地發(fā)送到所有分區(qū),而這通常也是最合理的分區(qū)策略。
在確認(rèn)了目標(biāo)分區(qū)后,producer要做的第二件事情就是要尋找這個(gè)分區(qū)對(duì)應(yīng)的leader,也就是該分區(qū)leader副本所在的Kafka broker。前面章節(jié)中提到了每個(gè)topic分區(qū)都由若干個(gè)副本組成,其中的一個(gè)副本充當(dāng)leader的角色,也只有l(wèi)eader才能夠響應(yīng)clients發(fā)送過來的請(qǐng)求,而剩下的副本中有一部分副本會(huì)與leader副本保持同步,即所謂的ISR。因此在發(fā)送消息時(shí),producer也就有了多種選擇來實(shí)現(xiàn)消息發(fā)送。比如不等待任何副本的響應(yīng)便返回成功,或者只是等待leader副本響應(yīng)寫入操作之后再返回成功等。不同的選擇也有著不同的優(yōu)缺點(diǎn),我們會(huì)在后續(xù)章節(jié)中討論如何選擇不同的策略。
Java版本producer的工作原理如圖4.1所示。

producer首先使用一個(gè)線程(用戶主線程,也就是用戶啟動(dòng)producer的線程)將待發(fā)送的消息封裝進(jìn)一個(gè)ProducerRecord類實(shí)例,然后將其序列化之后發(fā)送給partitioner,再由后者確定了目標(biāo)分區(qū)后一同發(fā)送到位于producer程序中的一塊內(nèi)存緩沖區(qū)中。而producer的另一個(gè)工作線程(I/O發(fā)送線程,也稱Sender線程)則負(fù)責(zé)實(shí)時(shí)地從該緩沖區(qū)中提取出準(zhǔn)備就緒的消息封裝進(jìn)一個(gè)批次(batch),統(tǒng)一發(fā)送給對(duì)應(yīng)的broker。整個(gè)producer的工作流程大概就是這樣的。
構(gòu)造producer
producer程序?qū)嵗?/h3>
首先,下面給出了一份可運(yùn)行的producer程序代碼清單。這份代碼實(shí)現(xiàn)了最簡(jiǎn)單的功能:構(gòu)造一條消息,然后發(fā)送給Kafka。在運(yùn)行這個(gè)producer程序之前,要保證啟動(dòng)一個(gè)最小規(guī)模的Kafka單機(jī)或集群環(huán)境。Kafka運(yùn)行環(huán)境搭建方法請(qǐng)參考第3章。
package com.huxi.kafkaapi;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class ProducerTest {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // 必須指定
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 必須指定
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 必須指定
props.put("acks", "-1");
props.put("retries", 3);
props.put("batch.size", 323840);
props.put("linger.ms", 10);
props.put("buffer.memory", 33554432);
props.put("max.block.ms", 3000);
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
}
}
構(gòu)造一個(gè)producer實(shí)例大致需要以下5個(gè)步驟。
構(gòu)造一個(gè)java.util.Properties對(duì)象,然后至少指定bootstrap.servers、key.serializer和value.serializer這3個(gè)屬性。在上面的代碼清單中這3個(gè)屬性后面都追加了注釋,表明這是必須要指定的參數(shù),它們沒有默認(rèn)值。
使用上一步中創(chuàng)建的Properties實(shí)例構(gòu)造KafkaProducer對(duì)象。
構(gòu)造待發(fā)送的消息對(duì)象ProducerRecord,指定消息要被發(fā)送到的topic、分區(qū)以及對(duì)應(yīng)的key和value。注意,分區(qū)和key信息可以不用指定,由Kafka自行確定目標(biāo)分區(qū)。
調(diào)用KafkaProducer的send方法發(fā)送消息。
關(guān)閉KafkaProducer。
1. 構(gòu)造Properties對(duì)象
下面將詳細(xì)展開每一步要做的事情。首先要構(gòu)造一個(gè)Properties對(duì)象,在這一步中有3個(gè)參數(shù)或?qū)傩允潜仨氁付ǖ?。如果我們翻開Kafka官網(wǎng)中producer的參數(shù)列表(詳見https://kafka.apache.org/documentation/#producerconfigs)會(huì)發(fā)現(xiàn)這3個(gè)參數(shù)是沒有默認(rèn)值的。它們分別如下。
bootstrap.servers
該參數(shù)指定了一組host:port對(duì),用于創(chuàng)建向Kafka broker服務(wù)器的連接,比如k1:9092,k2:9092,k3:9092。上面的代碼清單中指定了localhost:9092,producer使用時(shí)需要替換成實(shí)際的broker列表。如果Kafka集群中機(jī)器數(shù)很多,那么只需要指定部分broker即可,不需要列出所有的機(jī)器。因?yàn)椴还苤付◣着_(tái)機(jī)器,producer都會(huì)通過該參數(shù)找到并發(fā)現(xiàn)集群中所有的broker。為該參數(shù)指定多臺(tái)機(jī)器只是為了故障轉(zhuǎn)移使用。這樣即使某一臺(tái)broker掛掉了,producer重啟后依然可以通過該參數(shù)指定的其他broker連入Kafka集群。
另外,如果broker端沒有顯式配置listeners使用IP地址,那么最好將該參數(shù)也配置成主機(jī)名,而不是IP地址。因?yàn)镵afka內(nèi)部使用的就是FQDN(Fully Qualified Domain Name)。
key.serializer
被發(fā)送到broker端的任何消息的格式都必須是字節(jié)數(shù)組,因此消息的各個(gè)組件必須首先做序列化,然后才能發(fā)送到broker。該參數(shù)就是為消息的key做序列化之用的。這個(gè)參數(shù)指定的是實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類的全限定名稱。Kafka為大部分的初始類型(primitive type)默認(rèn)提供了現(xiàn)成的序列化器。上面的代碼清單中使用了org.apache.kafka.common.serialization.StringSerializer,該類會(huì)將一個(gè)字符串類型轉(zhuǎn)換成字節(jié)數(shù)組。這個(gè)參數(shù)也揭示了一個(gè)事實(shí),那就是用戶可以自定義序列化器,只要實(shí)現(xiàn)Serializer接口即可。
需要注意的是,即使producer程序在發(fā)送消息時(shí)不指定key,這個(gè)參數(shù)也是必須要設(shè)置的,否則程序會(huì)拋出ConfigException異常,提示“key.serializer”參數(shù)無默認(rèn)值,必須要配置。
value.serializer
和key.serializer類似,只是它被用來對(duì)消息體(即消息value)部分做序列化,將消息value部分轉(zhuǎn)換成字節(jié)數(shù)組。上面的代碼清單中該參數(shù)指定了與key.serializer相同的值,即都使用StringSerializer。當(dāng)然了,value.serializer也可以設(shè)置成與key.serializer不同的值。
一定要注意的是,這兩個(gè)參數(shù)都必須是全限定類名。
2. 構(gòu)造KafkaProducer對(duì)象
設(shè)置了這3個(gè)屬性之后,下面就要構(gòu)造KafkaProducer對(duì)象了。KafkaProducer是producer的主入口,所有的功能基本上都是由KafkaProducer來提供的。創(chuàng)建KafkaProducer實(shí)例很簡(jiǎn)單,只需要下面一句命令即可:

創(chuàng)建producer時(shí)也可以同時(shí)指定key和value的序列化類,比如這樣:

如果采用這樣的方式創(chuàng)建producer,那么就不需要顯式地在properties中指定key和value的序列化類了。
構(gòu)造ProduceRecord對(duì)象
構(gòu)造好kafkaProdcer實(shí)例后,下一步就是構(gòu)造消息實(shí)例。java版本producer使用ProducerRecord類來表示每條消息。創(chuàng)建prodducerRecord也很簡(jiǎn)答,最簡(jiǎn)單的形式就是指定topic和value,如下:

當(dāng)然,producerRecord還支持指定更多的消息消息,比如可以控制該消息直接被發(fā)往的分布以及消息的時(shí)間戳,
具體API格式請(qǐng)參見https://kafka.apache.org/10/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html
不過這里要注意的是,一定要謹(jǐn)慎指定時(shí)間戳,因?yàn)樵谀壳暗膋afka設(shè)計(jì)中,時(shí)間戳索引文件中的索引都是按照時(shí)間戳順序排列的,所以如果在producer端隨意指定時(shí)間戳,會(huì)導(dǎo)致該信息的時(shí)間序列混亂,這樣在根據(jù)時(shí)間戳查詢位移的功能時(shí)不會(huì)找到這條消息。同時(shí)kafka的消息留存策略也會(huì)受到影響,因此最好還是然讓kafka自行來指定戳比較好。
發(fā)送消息
發(fā)送消息的主方法是send,producer在地城完全的實(shí)現(xiàn)了異步化,并且通過java的tuture同時(shí)實(shí)現(xiàn)了同步和異步發(fā)送 + 回調(diào)兩種發(fā)送方式。
異步發(fā)送
實(shí)際上所有的寫入操作都是異步的。java版本的producer的send方法會(huì)返回一個(gè)java future對(duì)象供用戶稍后獲取發(fā)送結(jié)果,這就是所謂的回調(diào)機(jī)制。代碼如下:
producer.send(record,new CallBack(){
@Override
public void onCompletion(RecordMetadata metadata,Exception exception){
if(exception == null){
//消息發(fā)送成功
}else{
//執(zhí)行錯(cuò)誤處理邏輯
}
}
});
上面代碼中的CallBack就是發(fā)送消息后的回調(diào)類,實(shí)現(xiàn)方法是onCompletion。該方法的兩個(gè)輸入?yún)?shù)metadata,和exception會(huì)同時(shí)非空,也就是說至少一個(gè)是null。當(dāng)消息發(fā)送成功時(shí),exception是null;繁殖,若消息發(fā)送失敗,metadata就是null。因此在寫producer程序時(shí),最好寫if判斷。
另外,上面的Callbacl實(shí)際上是一個(gè)java接口,用戶可以創(chuàng)建自定義的CallBack實(shí)現(xiàn)類來處理消息發(fā)送后的邏輯,只需要該具體類實(shí)現(xiàn)org.apache.kafka.clients.producer.CallBack接口即可
同步發(fā)送
同步發(fā)送和異步發(fā)送其實(shí)就是通過java的future來區(qū)分的,調(diào)用Future.get()無限等待結(jié)果返回,即實(shí)現(xiàn)同步發(fā)送的效果
ProduceRecord<String,String> record = new ProducerRecord<>("test",Integer.toString(i));
producer.send(record).get();
使用Future.get會(huì)一直等待下去直至kafka broker將發(fā)送結(jié)果返回給producer程序。當(dāng)結(jié)果從broker處返回時(shí)gett方法要么返回發(fā)送結(jié)果要么跑出異常交由producer自行處理。如果沒有錯(cuò)誤,get方法將返回對(duì)應(yīng)的RecordMetadata實(shí)例(包含了已發(fā)送消息的所有元數(shù)據(jù)信息)包括消息發(fā)送的topic、分區(qū)以及該消息在對(duì)應(yīng)分區(qū)的位移信息。
不管是同步發(fā)送還是異步發(fā)送,發(fā)送都有可能失敗,導(dǎo)致返回異常錯(cuò)誤。當(dāng)前kafka的錯(cuò)誤類型包含了兩類:可重試異常和不可重試異常。常見的可重試異常吐下。
LeaderNotAvailableException: 分區(qū)的副本不可用,這通常出現(xiàn)在leader換屆選舉期間,因此通常是順時(shí)的異常,重試之后可以自行恢復(fù)
NotControllerException :controller當(dāng)前不可用(controller是kafka集群中非常重要的角色)。這通常表名controller在經(jīng)歷新一輪的選舉,這也是可以通過重試機(jī)制進(jìn)行恢復(fù)的。
NetworkException:網(wǎng)絡(luò)瞬時(shí)故障導(dǎo)致的異常,可重試
對(duì)于可重試的異常,如果在producer程序中配置了重試次數(shù),那么只要在規(guī)定的重試次數(shù)內(nèi)自行恢復(fù)了,便不會(huì)出現(xiàn)在onCompletion的exception中。不過若超過了重試次數(shù)仍舊沒有成功,則會(huì)被封裝進(jìn)exception中。此時(shí)就需要producer程序自行處理這種異常。
所有可重試異常都繼承自org.apache.kafka.common.errors.RetriableException抽象類。理論上講所有未繼承自RetriableException類的其他異常都屬于不可重試異常,這類異常通常都表明了一些非常嚴(yán)重或kafka無法處理的問題,與producer相關(guān)的如下。
RecordTooLargeException: 發(fā)送的消息尺寸過大,超過了規(guī)定的大小上限。顯然這種異常無論如何重試都是無法成功的。
SerializationException:序列化失敗異常,這也是無法恢復(fù)的。
KafkaException:其他類型的異常
所有這些不可重試異常一旦被捕獲都會(huì)被封裝進(jìn)Future的計(jì)算結(jié)果并返回給producer程序,用戶需要自行處理這些異常。由于不可重試異常和可重試異常在producer程序端可能有不同的處理邏輯,因此可以使用下面的代碼進(jìn)行區(qū)分:
producer.send(record,new Callback()){
@Override
puvlic void onCompletion(RecordMetadata metadata,Exception exception){
if(exception == null){
//消息發(fā)送成
}else {
if(exception instanceof RetriableException){
//處理可重試異常
}else{
//處理不可重試異常
}
}
}
}
5.關(guān)閉producer
producer程序結(jié)束時(shí)一定要關(guān)閉producer!,這一點(diǎn)怎么強(qiáng)調(diào)都不為過。畢竟producer程序運(yùn)行時(shí)占用了系統(tǒng)資源(比如創(chuàng)建了額外的線程、申請(qǐng)了很多內(nèi)存以及創(chuàng)建了多個(gè)socket連接等)因此必須要顯式地調(diào)用KafkaProducer.close方法關(guān)閉producer。不管發(fā)送是成功還是失敗,只要producer程序完成了既定的工作,就應(yīng)該被關(guān)閉。
如果是調(diào)用普通的無參數(shù)close方法,producer會(huì)被允許先處理之前的發(fā)送請(qǐng)求后再關(guān)閉,即所謂的“優(yōu)雅”關(guān)閉退出(graceful shutdown);同時(shí),kafkaProducer還提供了一個(gè)帶超時(shí)的參數(shù)close(timeout)。如果調(diào)用此方法,prodcuer會(huì)等待timeout時(shí)間來完成所有處理中的請(qǐng)求,然后強(qiáng)行退出。這就是說,若timeout超時(shí),則producer會(huì)強(qiáng)制結(jié)束,并立即丟棄所有未發(fā)送以及應(yīng)答的發(fā)送請(qǐng)求。在某種程度上,這會(huì)給用戶一種錯(cuò)覺:仿佛producer端的程序丟失了重要的消息。因此在實(shí)際場(chǎng)景中一定要謹(jǐn)慎使用帶超時(shí)的close方法。
producer 主要參數(shù)
除了前面的boostrap.servers , key.serializer 和value.serializer之外,java版本producer還提供了很多其他重要的參數(shù)。詳細(xì)的參數(shù)列表以及含義和默認(rèn)值可以訪問https://kafka.apache.org/documentation/#producer-configs
acks
acks參數(shù)用于控制producer生產(chǎn)消息的持久性(durability)。對(duì)于producer而言,kafka在乎的 是“已提交”消息的持久性。一旦消息成功提交,那么只要有任何一個(gè)保存了該消息的副本存活,這條消息就會(huì)被視為不可丟失的。經(jīng)常碰到用戶抱怨kafka的producer會(huì)丟消息,其實(shí)這里混淆了一個(gè)概念,即那些所謂的已丟失的消息其實(shí)并沒有被成功寫入kafka。換句話說,他們并沒有被成功提交,因此kafka對(duì)這些消息的持久性不做任何保證——當(dāng)然,producer API確實(shí)提供了毀掉機(jī)制供用戶處理發(fā)送失敗的情況。
具體來說,當(dāng)producer發(fā)送一條消息給kafka集群時(shí),這條消息會(huì)被發(fā)送到指定topic分區(qū)leader所在的broker上,producer等待從該leader broker返回消息的寫入結(jié)果(當(dāng)然并不是無限等待,是由超時(shí)時(shí)間的)以確定消息被成功提交。這一切完成后producer可以繼續(xù)發(fā)送新的消息。kafka能夠保證的是consumer永遠(yuǎn)不會(huì)讀取到尚未提交完成的消息——這和關(guān)系型數(shù)據(jù)庫類似,即在大部分情況下,某個(gè)事物的SQL查詢都不會(huì)看到另一個(gè)事物中尚未提交的數(shù)據(jù)。
顯然,leader broker何時(shí)發(fā)送寫入結(jié)果返回給producer就是一個(gè)需要仔細(xì)考慮的問題了,
它也會(huì)直接影響到消息的持久性甚至是producer端的吞吐量:producer端越快的接收到leader broker響應(yīng),它就越快地發(fā)送下一條消息,即吞吐量也就越大。producer端的acls參數(shù)就是用來控制做這件事的。acks指定了在給producer發(fā)送響應(yīng)前,leader broker必須要確保已成功寫入消息的副本數(shù)。當(dāng)前acks有3個(gè)取值:0、1和all
acks = 0:表示producer完全不理睬leader端的處理結(jié)果,此時(shí),producer發(fā)送消息后立即開啟下一條消息的發(fā)送,根本不等待leaderr broker端返回結(jié)果。由于不接收發(fā)送結(jié)果,因此在這種情況下producer.send的回調(diào)也就完全失去了作用,即用戶無法通過回調(diào)機(jī)制感知任何發(fā)送過程中的失敗,所以acks = 0時(shí)producer并不保證消息會(huì)被成功發(fā)送。但凡事有利有弊,由于不需要等待響應(yīng)結(jié)果,通常這種設(shè)置下producer的吞吐量是最高的。
acks = all或者-1:表示當(dāng)消息發(fā)送時(shí),leader broker不僅會(huì)將消息寫入本地日志,同時(shí)還會(huì)等待ISR中所有其他副本都成功寫入他們各自的本地日志后,才發(fā)送響應(yīng)結(jié)果給producer。顯然當(dāng)設(shè)置acks=all時(shí),只要ISR中至少有一個(gè)副本處于存活的狀態(tài),那么這條消息就肯定不會(huì)丟失,因而可以達(dá)到最高的消息持久性,但通常這種設(shè)置下producer的吞吐量也是最低的。
acks = 1:是0和all的折中方案,也就是默認(rèn)的參數(shù)值。producer發(fā)送消息后leader boker僅將該消息寫入本地日志,然后便發(fā)送響應(yīng)結(jié)果給producer,而無需等待ISR中其他副本寫入該消息。那么此時(shí)只要該leader broker一直存活,kafka就能夠保證這條消息的持久性,同時(shí)也保證了producer的吞吐量。
總結(jié)一下,acks參數(shù)控制producer實(shí)現(xiàn)不同程度的消息持久性,他有3個(gè)取值,對(duì)應(yīng)的優(yōu)缺點(diǎn)以及使用的場(chǎng)景如下:

在producer程序中設(shè)置acks非常簡(jiǎn)單,只需要在構(gòu)造kafkaProducer的Properties對(duì)象中增加“acks”屬性即可:
props.put("acks","1");
//或者
props.put(ProducerConfig.ACKS_CONFIG,"1");
值得注意的是,該參數(shù)的類型是字符串,因此必須要寫成“1”而不是1,否則程序會(huì)報(bào)錯(cuò),提示你沒有指定正確的參數(shù)類型。
buffer.memory
...待續(xù)