不管是把 Kafka 作為消息隊(duì)列、消息、總線還是數(shù)據(jù)存儲(chǔ)平臺(tái)來(lái)使用 ,總是需要有一個(gè)可以往 Kafka 寫(xiě)入數(shù)據(jù)的生產(chǎn)者和一個(gè)可以從 Kafka讀取數(shù)據(jù)的消費(fèi)者,或者一個(gè)兼具兩種角 色的應(yīng)用程序。
例如,在一個(gè)信用卡事務(wù)處理系統(tǒng)里,有一個(gè)客戶端應(yīng)用程序,它可能是一個(gè)在線商店, 每當(dāng)有支付行為發(fā)生時(shí),它負(fù)責(zé)把事務(wù)發(fā)送到 Kafka上。另一個(gè)應(yīng)用程序根據(jù)規(guī)則引擎檢 查這個(gè)事務(wù),決定是批準(zhǔn)還是拒絕。 批準(zhǔn)或拒絕的響應(yīng)消息被寫(xiě)回 Kafka,然后發(fā)送給發(fā)起事務(wù)的在線商店。第三個(gè)應(yīng)用程序從 Kafka上讀取事務(wù)和審核狀態(tài),把它們保存到數(shù)據(jù) 庫(kù), 隨后分析師可以對(duì)這些結(jié)果進(jìn)行分析,或許還能借此改進(jìn)規(guī)則引擎 。
開(kāi)發(fā)者們可以使用 Kafka 內(nèi)置的客戶端 API開(kāi)發(fā) Kafka應(yīng)用程序。
在這一章,我們將從 Kafra生產(chǎn)者的設(shè)計(jì)和組件講起,學(xué)習(xí)如何使用 Kafka生產(chǎn)者。我們將展示如何創(chuàng)建 KafkaProducer和 ProducerRecords對(duì)象、如何將記錄發(fā)送給 Kafka,以及如何處理從 Kafka 返回的錯(cuò)誤,然后介紹用干控制生產(chǎn)者行為的重要配置選項(xiàng),最后深入 探討如何使用不同的分區(qū)方法和序列化器,以及如何自定義序列化器和分區(qū)器 。
在下一章,我們將會(huì)介紹 Kafra的悄費(fèi)者客戶端,以及如何從 Kafka讀取消息。
生產(chǎn)者概覽
一個(gè)應(yīng)用程序在很多情況下需要往 Kafka 寫(xiě)入消息 : 記錄用戶的活動(dòng)(用于審計(jì)和分析 )、 記錄度量指標(biāo)、保存日志、消息、記錄智能家電的信息、與其他應(yīng)用程序進(jìn)行異步通信、 緩沖即將寫(xiě)入到數(shù)據(jù)庫(kù)的數(shù)據(jù),等等。
多樣的使用場(chǎng)景意味著多樣的需求:是否每個(gè)消息都很重要?是否允許丟失 一 小部分消息?偶爾出現(xiàn)重復(fù)消息是否可以接受?是否有嚴(yán)格的延遲和吞吐量要求?
在之前提到的信用卡事務(wù)處理系統(tǒng)里,消息丟失或消息重復(fù)是不允許的,可以接受的延遲最大為 500ms,對(duì)吞吐量要求較高,我們希望每秒鐘可以處理一百萬(wàn)個(gè)消息。
保存網(wǎng)站的點(diǎn)擊信息是另 一種使用場(chǎng)景。在這個(gè)場(chǎng)景里,允許丟失少量的消息或出現(xiàn)少量 的消息重復(fù),延遲可以高一些,只要不影響用戶體驗(yàn)就行。換句話說(shuō),只要用戶點(diǎn)擊鏈接 后可以馬上加載頁(yè)面,那么我們并不介意消息要在幾秒鐘之后才能到達(dá) Kafka 服務(wù)器。 吞 吐量則取決于網(wǎng)站用戶使用網(wǎng)站的頻度。
不同的使用場(chǎng)景對(duì)生產(chǎn)者 API 的使用和配置會(huì)有直接的影響。
盡管生產(chǎn)者 API 使用起來(lái)很簡(jiǎn)單 ,但消息的發(fā)送過(guò)程還是有點(diǎn)復(fù)雜的。下圖展示 了向Kafka 發(fā)送消息的主要步驟。
Kafka 生產(chǎn)者組件圖
我們從創(chuàng)建 一個(gè) ProducerRecord 對(duì)象開(kāi)始, ProducerRecord 對(duì)象需要包含目標(biāo)主題和要發(fā)送的內(nèi)容。我們還可以指定鍵或分區(qū)。在發(fā)送 ProducerRecord對(duì)象時(shí),生產(chǎn)者要先把鍵和 值對(duì)象序列化成字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡(luò)上傳輸 。
接下來(lái),數(shù)據(jù)被傳給分區(qū)器。如果之前在 ProducerRecord對(duì)象里指定了分區(qū),那么分區(qū)器就不會(huì)再做任何事情,直接把指定的分區(qū)返回。如果沒(méi)有指定分區(qū) ,那么分區(qū)器會(huì)根據(jù) ProducerRecord對(duì)象的鍵來(lái)選擇一個(gè)分區(qū) 。選好分區(qū)以后 ,生產(chǎn)者就知道該往哪個(gè)主題和分區(qū)發(fā)送這條記錄了。緊接著,這條記錄被添加到一個(gè)記錄批次里,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。有一個(gè)獨(dú)立的線程負(fù)責(zé)把這些記錄批次發(fā)送到相應(yīng)的 broker 上。
服務(wù)器在收到這些消息時(shí)會(huì)返回一個(gè)響應(yīng)。如果消息成功寫(xiě)入 Kafka,就返回 一 個(gè) RecordMetaData 對(duì)象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量。如果寫(xiě)入 失敗, 就會(huì)返回 一個(gè)錯(cuò)誤 。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,幾次之后如果還是失敗,就返回錯(cuò)誤信息。
創(chuàng)建Kafka生產(chǎn)者
要往 Kafka寫(xiě)入消息,首先要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者對(duì)象,井設(shè)置一些屬性。
下面的代碼片段展示了如何創(chuàng)建一個(gè)新的生產(chǎn)者,這里只指定了必要的屬性,其他使用默認(rèn)設(shè)置。
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers","broker1:9092,broker2:9092");
kafkaProps.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.seializer","org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
Kafka生產(chǎn)者有 3個(gè)必選的屬性
bootstrap.servers
該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的broker地址,生產(chǎn)者會(huì)從給定的 broker里查找到其他 broker的信息。不過(guò)建議至少要提供兩個(gè) broker的信息, 一旦其中一個(gè)宕機(jī),生產(chǎn)者仍然能夠連接到集群上。
key.serializer
broker希望接收到的消息的鍵和值都是字節(jié)數(shù)組。生產(chǎn)者接口允許使用參數(shù)化類型,因此可以把 Java對(duì)象作為鍵和值發(fā)送給 broker。這樣的代碼具有良好的可讀性,不過(guò)生產(chǎn)者需要知道如何把這些 Java對(duì)象轉(zhuǎn)換成字節(jié)數(shù)組。 key.serializer必須被設(shè)置為一個(gè)實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer接口的類,生產(chǎn)者會(huì)使用這個(gè)類把鍵對(duì)象序列化成字節(jié)數(shù)組。 Kafka 客戶端默認(rèn)提供了ByteArraySerializer(這個(gè)只做很少的事情)、 StringSerializer和 IntegerSerializer,因此,如果你只使用常見(jiàn)的幾種 Java對(duì)象類型,那么就沒(méi)必要實(shí)現(xiàn)自己的序列化器 。要注意, key.serializer是必須設(shè)置的,就算你打算只發(fā)送值內(nèi)容。
value.serializer
與 key.serializer一樣, value.serializer指定的類會(huì)將值序列化。如果鍵和值都是字符串,可以使用與 key.serializer 一樣的序列化器。如果鍵是整數(shù)類型而值是字符扇 , 那么需要使用不同的序列化器。
發(fā)送消息主要有3種方式:
1、發(fā)送并忘記( fire-and-forget):我們把消息發(fā)送給服務(wù)器,但井不關(guān)心它是否正常到達(dá)。大多數(shù)情況下,消息會(huì)正常到達(dá),因?yàn)?Kafka是高可用的,而且生產(chǎn)者會(huì)自動(dòng)嘗試重發(fā)。不過(guò),使用這種方式有時(shí)候也會(huì)丟失一些消息。
2、同步發(fā)送:我們使用send()方怯發(fā)送消息, 它會(huì)返回一個(gè)Future對(duì)象,調(diào)用get()方法進(jìn)行等待, 就可以知道悄息是否發(fā)送成功。
3、異步發(fā)送:我們調(diào)用 send() 方怯,并指定一個(gè)回調(diào)函數(shù), 服務(wù)器在返回響應(yīng)時(shí)調(diào)用該函數(shù)。
在下面的幾個(gè)例子中 , 我們會(huì)介紹如何使用上述幾種方式來(lái)發(fā)送消息,以及如何處理可能 發(fā)生的異常情況。
本章的所有例子都使用單線程,但其實(shí)生產(chǎn)者是可以使用多線程來(lái)發(fā)送消息的。剛開(kāi)始的 時(shí)候可以使用單個(gè)消費(fèi)者和單個(gè)線程。如果需要更高的吞吐量,可以在生產(chǎn)者數(shù)量不變的 前提下增加線程數(shù)量。如果這樣做還不夠 , 可以增加生產(chǎn)者數(shù)量。
發(fā)送消息到Kafka
最簡(jiǎn)單的同步發(fā)送消息方式如下所示 :
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record);
} catch(Exception e) {
e.printStack();
}
生產(chǎn)者的 send() 方住將 ProducerRecord對(duì)象作為參數(shù),它需要目標(biāo)主題的名字和要發(fā)送的鍵和值對(duì)象,它們都是字符串。鍵和值對(duì)象的類型必須與序列化器和生產(chǎn)者對(duì)象相匹配。
我們使用生產(chǎn)者的 send() 方越發(fā)送 ProducerRecord對(duì)象。從生產(chǎn)者的架構(gòu)圖里可以看到,消息先是被放進(jìn)緩沖區(qū),然后使用單獨(dú)的線程發(fā)送到服務(wù)器端。 send() 方法會(huì)返回一個(gè)包含 RecordMetadata 的 Future對(duì)象,不過(guò)因?yàn)槲覀儠?huì)忽略返回值,所以無(wú)法知道消息是否發(fā)送成功。如果不關(guān)心發(fā)送結(jié)果,那么可以使用這種發(fā)送方式。比如,記錄 Twitter 消息日志,或記錄不太重要的應(yīng)用程序日志。
我們可以忽略發(fā)送消息時(shí)可能發(fā)生的錯(cuò)誤或在服務(wù)器端可能發(fā)生的錯(cuò)誤,但在發(fā)送消息之前,生產(chǎn)者還是有可能發(fā)生其他的異常。這些異常有可能是 SerializationException (說(shuō)明序列化消息失敗)、 BufferExhaustedException 或 TimeoutException (說(shuō)明緩沖區(qū)已滿),又或者是 InterruptException (說(shuō)明發(fā)送線程被中斷)。
同步發(fā)送消息
ProducerRecord<String, String> record = new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try{
producer.send(record).get();
} catch(Exception e) {
e.printStack();
}
在這里, producer.send() 方住先返回一個(gè) Future對(duì)象,然后調(diào)用 Future對(duì)象的 get() 方法等待 Kafka 響應(yīng)。如果服務(wù)器返回錯(cuò)誤, get()方怯會(huì)拋出異常。如果沒(méi)有發(fā)生錯(cuò)誤,我們會(huì)得到一個(gè) RecordMetadata對(duì)象,可以用它獲取消息的偏移量。如果在發(fā)送數(shù)據(jù)之前或者在發(fā)送過(guò)程中發(fā)生了任何錯(cuò)誤 ,比如 broker返回 了一個(gè)不允許重發(fā)消息的異?;蛘咭呀?jīng)超過(guò)了重發(fā)的次數(shù) ,那么就會(huì)拋出異常。我們只是簡(jiǎn)單地把異常信息打印出來(lái)。
如何處理從Kafka生產(chǎn)者返回的錯(cuò)誤
KafkaProducer一般會(huì)發(fā)生兩類錯(cuò)誤。其中一類是可重試錯(cuò)誤 ,這類錯(cuò)誤可以通過(guò)重發(fā)消息來(lái)解決。比如對(duì)于連接錯(cuò)誤,可以通過(guò)再次建立連接來(lái)解決,“無(wú)主(noleader)” 錯(cuò)誤則可 以通過(guò)重新為分區(qū)選舉首領(lǐng)來(lái)解決。 KafkaProducer可以被配置成自動(dòng)重試,如果在多次重試后仍無(wú)能解決問(wèn)題,應(yīng)用程序會(huì)收到一個(gè)重試異常。另一類錯(cuò)誤無(wú)出通過(guò)重試解決 ,比如“消息太大”異常。對(duì)于這類錯(cuò)誤, KafkaProducer不會(huì)進(jìn)行任何重試,直接拋出異常。
異步發(fā)送消息
假設(shè)消息在應(yīng)用程序和 Kafka集群之間一個(gè)來(lái)回需要 10ms。如果在發(fā)送完每個(gè)消息后都等待回應(yīng),那么發(fā)送 100個(gè)消息需要 1秒。但如果只發(fā)送消息而不等待響應(yīng),那么發(fā)送100個(gè)消息所需要的時(shí)間會(huì)少很多。大多數(shù)時(shí)候,我們并不需要等待響應(yīng)——盡管 Kafka 會(huì)把目標(biāo)主題、分區(qū)信息和消息的偏移量發(fā)送回來(lái),但對(duì)于發(fā)送端的應(yīng)用程序來(lái)說(shuō)不是必需的。不過(guò)在遇到消息發(fā)送失敗時(shí),我們需要拋出異常、記錄錯(cuò)誤日志,或者把消息寫(xiě)入 “錯(cuò)誤消息”文件以便日后分析。
為了在異步發(fā)送消息的同時(shí)能夠?qū)Ξ惓G闆r進(jìn)行處理,生產(chǎn)者提供了回調(diào)支持 。下面是使用異步發(fā)送消息、回調(diào)的一個(gè)例子。
生產(chǎn)者的配置
到目前為止 , 我們只介紹了生產(chǎn)者的幾個(gè)必要配置參數(shù)——bootstrap.servers API 以及序列化器。
生產(chǎn)者還有很多可配置的參數(shù),在 Kafka文檔里都有說(shuō)明,它們大部分都有合理的默認(rèn)值 , 所以沒(méi)有必要去修改它們 。不過(guò)有幾個(gè)參數(shù)在內(nèi)存使用、性能和可靠性方面對(duì)生產(chǎn)者影響比較大,接下來(lái)我們會(huì)一一說(shuō)明。
1. acks
acks 參數(shù)指定了必須要有多少個(gè)分區(qū)副本收到消息,生產(chǎn)者才會(huì)認(rèn)為消息寫(xiě)入是成功的。
這個(gè)參數(shù)對(duì)消息丟失的可能性有重要影響。 該參數(shù)有如下選項(xiàng)。
? 如果 acks=0, 生產(chǎn)者在成功寫(xiě)入悄息之前不會(huì)等待任何來(lái)自服務(wù)器的響應(yīng)。也就是說(shuō), 如果當(dāng)中出現(xiàn)了問(wèn)題 , 導(dǎo)致服務(wù)器沒(méi)有收到消息,那么生產(chǎn)者就無(wú)從得知,消息也就丟 失了。不過(guò),因?yàn)樯a(chǎn)者不需要等待服務(wù)器的響應(yīng),所以它可以以網(wǎng)絡(luò)能夠支持的最大 速度發(fā)送消息,從而達(dá)到很高的吞吐量。
? 如果 acks=1,只要集群的首領(lǐng)節(jié)點(diǎn)收到消息,生產(chǎn)者就會(huì)收到 一個(gè)來(lái)自服務(wù)器的成功 響應(yīng)。如果消息無(wú)撞到達(dá)首領(lǐng)節(jié)點(diǎn)(比如首領(lǐng)節(jié)點(diǎn)崩憤,新的首領(lǐng)還沒(méi)有被選舉出來(lái)), 生產(chǎn)者會(huì)收到一個(gè)錯(cuò)誤響應(yīng),為了避免數(shù)據(jù)丟失,生產(chǎn)者會(huì)重發(fā)消息。不過(guò),如果一個(gè) 沒(méi)有收到消息的節(jié)點(diǎn)成為新首領(lǐng),消息還是會(huì)丟失。這個(gè)時(shí)候的吞吐量取決于使用的是 同步發(fā)送還是異步發(fā)送。如果讓發(fā)送客戶端等待服務(wù)器的響應(yīng)(通過(guò)調(diào)用 Future對(duì)象 的 get()方法),顯然會(huì)增加延遲(在網(wǎng)絡(luò)上傳輸一個(gè)來(lái)回的延遲)。如果客戶端使用異步回調(diào),延遲問(wèn)題就可以得到緩解,不過(guò)吞吐量還是會(huì)受發(fā)送中消息數(shù)量的限制(比如,生 產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少個(gè)消息)。
? 如果 acks=all,只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)全部收到消息時(shí),生產(chǎn)者才會(huì)收到一個(gè)來(lái)自服務(wù)器的成功響應(yīng)。這種模式是最安全的,它可以保證不止一個(gè)服務(wù)器收到消息,就算有服務(wù)器發(fā)生崩潰,整個(gè)集群仍然可以運(yùn)行(第 5 章將討論更多的細(xì)節(jié))。不過(guò),它的延遲比 acks=1時(shí)更高,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。
2. buffer.memory
該參數(shù)用來(lái)設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果 應(yīng)用程序發(fā)送消息的速度超過(guò)發(fā)送到服務(wù)器的速度,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候, send()方法調(diào)用要么被阻塞,要么拋出異常,取決于如何設(shè)置 block.on.buffe.full 參數(shù) (在0.9.0.0版本里被替換成了max.block.ms,表示在拋出異常之前可以阻塞一段時(shí)間)。
3. compression.type
默認(rèn)情況下,消息發(fā)送時(shí)不會(huì)被壓縮。該參數(shù)可以設(shè)置為 snappy、 gzip 或 lz4,它指定了消息被發(fā)送給 broker之前使用哪一種壓縮算法進(jìn)行壓縮。 snappy 壓縮算怯由 Google巳發(fā)明, 它占用較少 的 CPU,卻能提供較好的性能和相當(dāng)可觀的壓縮比,如果比較關(guān)注性能和網(wǎng)絡(luò)帶寬,可以使用這種算法。 gzip壓縮算法一般會(huì)占用較多的 CPU,但會(huì)提供更高的壓縮比,所以如果網(wǎng)絡(luò)帶寬比較有限,可以使用這種算法。使用壓縮可以降低網(wǎng)絡(luò)傳輸開(kāi)銷和存儲(chǔ)開(kāi)銷,而這往往是向 Kafka發(fā)送消息的瓶頸所在。
4. retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng))。在這種情況下, retries參數(shù)的值決定了生產(chǎn)者可以重發(fā)消息的次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者會(huì)在每次重試之間等待 1OOms,不過(guò)可以通過(guò) retries.backoff.ms 參數(shù)來(lái)改變這個(gè)時(shí)間間隔。建議在設(shè)置重試次數(shù)和重試時(shí)間間隔之前, 先測(cè)試一下恢復(fù)一個(gè)崩潰節(jié)點(diǎn)需要多少時(shí)間(比如所有分區(qū)選舉出首領(lǐng)需要多長(zhǎng)時(shí)間), 讓總的重試時(shí)間比 Kafka集群從崩潰中恢復(fù)的時(shí)間長(zhǎng),否則生產(chǎn)者會(huì)過(guò)早地放棄重試。不過(guò)有些錯(cuò)誤不是臨時(shí)性錯(cuò)誤,沒(méi)辦怯通過(guò)重試來(lái)解決(比如“悄息太大”錯(cuò)誤)。一般情 況下,因?yàn)樯a(chǎn)者會(huì)自動(dòng)進(jìn)行重試,所以就沒(méi)必要在代碼邏輯里處理那些可重試的錯(cuò)誤。 你只需要處理那些不可重試的錯(cuò)誤或重試次數(shù)超出上限的情況。
5. batch.size
當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在放一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算(而不是消息個(gè)數(shù))。當(dāng)批次被填滿,批次里的所有消息會(huì)被發(fā)送出去。不過(guò)生產(chǎn)者井不一定都會(huì)等到批次被填滿才發(fā)送,半捕 的批次,甚至只包含一個(gè)消息的批次也有可能被發(fā)送。所以就算把批次大小設(shè)置得很大, 也不會(huì)造成延遲,只是會(huì)占用更多的內(nèi)存而已。但如果設(shè)置得太小,因?yàn)樯a(chǎn)者需要更頻繁地發(fā)送消息,會(huì)增加一些額外的開(kāi)銷。
6. linger.ms
該參數(shù)指定了生產(chǎn)者在發(fā)送批次之前等待更多消息加入批次的時(shí)間。 KafkaProducer 會(huì)在批次填滿或 linger.ms達(dá)到上限時(shí)把批次發(fā)送出去。默認(rèn)情況下,只要有可用的線程, 生產(chǎn)者就會(huì)把消息發(fā)送出去,就算批次里只有一個(gè)消息。把 linger.ms設(shè)置成比0大的數(shù), 讓生產(chǎn)者在發(fā)送批次之前等待一會(huì)兒,使更多的消息加入到這個(gè)批次 。雖然這樣會(huì)增加延遲,但也會(huì)提升吞吐量(因?yàn)橐淮涡园l(fā)送更多的消息,每個(gè)消息的開(kāi)銷就變小了)。
7. client.id
該參數(shù)可以是任意的字符串,服務(wù)器會(huì)用它來(lái)識(shí)別消息的來(lái)源,還可以用在日志和配額指標(biāo)里。
8. max.in.flight.requests.per.connection
該參數(shù)指定了生產(chǎn)者在收到服務(wù)器晌應(yīng)之前可以發(fā)送多少個(gè)消息。它的值越高,就會(huì)占用越多的內(nèi)存,不過(guò)也會(huì)提升吞吐量。 把它設(shè)為 1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器的,即使發(fā)生了重試。
9. timeout.ms、 request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間,metadata.fetch.timeout.ms指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的首領(lǐng)是誰(shuí))時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間。如果等待響應(yīng)超時(shí),那么生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回 一個(gè)錯(cuò)誤 (拋出異常或執(zhí)行回調(diào))。timeout.ms 指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間,與 asks 的配置相匹配一一如果在指定時(shí)間內(nèi)沒(méi)有收到同步副本的確認(rèn),那么 broker就會(huì)返回 一個(gè)錯(cuò)誤 。
10. max.block.ms
該參數(shù)指定了在調(diào)用 send() 方法或使用 parttitionFor() 方能獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞 時(shí)間。當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已捕,或者沒(méi)有可用的元數(shù)據(jù)時(shí),這些方屈就會(huì)阻塞。在阻塞時(shí)間達(dá)到 max.block.ms時(shí),生產(chǎn)者會(huì)拋出超時(shí)異常。
11 . max.request.size
該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小。它可以指能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請(qǐng)求里所有消息總的大小。例如,假設(shè)這個(gè)值為 1MB,那么可以發(fā)送的單個(gè)最大消息為 1MB,或者生產(chǎn)者可以在單個(gè)請(qǐng)求里發(fā)送一個(gè)批次,該批次包含了 1000個(gè)消息,每個(gè)消息大小為 1KB 。另外, broker對(duì)可接收的消息最大值也有自己的限制( message.max.bytes),所以兩邊的配置最好可以匹配,避免生產(chǎn)者發(fā)送的消息被 broker拒絕 。
12. receive.buffer.bytes 和 send.buffer.bytes
這兩個(gè)參數(shù)分別指定了 TCP socket接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)大小 。 如果它們被設(shè)為 -1 , 就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker處于不同的數(shù)據(jù)中心,那么可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
順序保證
Kafka可以保證同一個(gè)分區(qū)里的消息是有序的。也就是說(shuō),如果生產(chǎn)者按照一定的順序發(fā)送消息, broker就會(huì)按照這個(gè)順序把它們寫(xiě)入分區(qū),消費(fèi)者也會(huì)按照同樣的順序讀取它們。在某些情況下 , 順序是非常重要的。如果把retries 設(shè)為非零整數(shù),同時(shí)把 max.in.flight.requests.per.connection 設(shè)為比 1大的數(shù),那么,如果第一個(gè)批次消息寫(xiě)入失敗,而第二個(gè)批次寫(xiě)入成功, broker會(huì)重試寫(xiě)入第一個(gè)批次。如果此時(shí)第一個(gè)批次也寫(xiě)入成功,那 么兩個(gè)批次的順序就反過(guò)來(lái)了。
一般來(lái)說(shuō),如果某些場(chǎng)景要求消息是有序的,那么消息是否寫(xiě)入成功也是 很關(guān)鍵的,所以不建議把順序是非常重要的。如果把retries 設(shè)為 0??梢园?max.in.flight.requests.per.connection設(shè)為 1,這樣在生產(chǎn)者嘗試發(fā)送第一批消息時(shí),就不會(huì)有其他的消息發(fā)送給 broker。不過(guò)這樣會(huì)嚴(yán)重影響生產(chǎn)者的吞吐量 ,所以只有在 對(duì)消息的順序有嚴(yán)格要求的情況下才能這么做。