Kafka(3)生產(chǎn)者客戶端

一、前言

生產(chǎn)者客戶端(Producer API)是負(fù)責(zé)發(fā)送消息流到Kafka集群不同Topics的應(yīng)用程序。
Kafka0.9版本之前,使用的是Scala語言編寫的客戶端,從Kafka 0.9x版本開始,官方推出了Java版客戶端。(雖然Kafka是用Java/Scala語言編寫的,但也有C/C++、Python、Go等其他語言客戶端,這些語言客戶端并非由Kafka社區(qū)維護(hù)),下面是對(duì)Kafka Java版生產(chǎn)者客戶端的相關(guān)介紹。

二、整體結(jié)構(gòu)及流程

生產(chǎn)者客戶端整體結(jié)構(gòu)圖
  • 上圖是生產(chǎn)者客戶端的整體結(jié)構(gòu)圖,主要包括客戶端核心類KafkaProducer、消息累加器類RecordAccumulator、處理發(fā)送請(qǐng)求器類Sender以及網(wǎng)絡(luò)I/O類Selector。
  • 客戶端核心類KafkaProducer是一個(gè)線程安全的類,構(gòu)建KafkaProducer同時(shí)會(huì)創(chuàng)建消息累加器RecordAccumulator、創(chuàng)建并啟動(dòng)請(qǐng)求發(fā)送Sender線程。
  • KafkaProducer也是發(fā)送信息的入口,消息首先經(jīng)過KafkaProducer,然后通過攔截器、序列化器、分區(qū)器、消息大小驗(yàn)證、事務(wù)處理等流程后追加到消息累加器RecordAccumulator中。
  • 消息累加器RecordAccumulator內(nèi)部為每一個(gè)主題分區(qū)維護(hù)著一個(gè)雙端隊(duì)列Deque,每個(gè)Deque存放的是ProducerBatch類型數(shù)據(jù),ProducerBatch是由多個(gè)較小ProducerRecord拼湊而成,這樣可以減少網(wǎng)絡(luò)請(qǐng)求次數(shù),提高傳輸速率。消息累加器存儲(chǔ)批量消息數(shù)據(jù)類型如下:
# 批量消息數(shù)據(jù)存儲(chǔ)容器,其中TopicPartition表示主題分區(qū)對(duì)象
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
  • RecordAccumulator 主要作用是緩存消息、組裝為消息批量對(duì)象,并壓縮消息,然后提供給Sender線程批量發(fā)送,進(jìn)而減少網(wǎng)絡(luò)傳輸資源,提升系統(tǒng)吞吐量。RecordAccumulator的緩存大小可以通過參數(shù)進(jìn)行配置,默認(rèn)值為32M。壓縮類型包括GZIP、SNAPPY、LZ4、ZSTD這4種。
  • Sender接收到消息之后,會(huì)將數(shù)據(jù)轉(zhuǎn)換為<NodeId, List<ProducerBatch>>的形式。其中NodeId代表broker節(jié)點(diǎn)Id,List<ProducerBatch>代表要發(fā)往這個(gè)節(jié)點(diǎn)的消息數(shù)據(jù)集合,但不是最終的請(qǐng)求對(duì)象。Sender還會(huì)進(jìn)一步封裝為ClientRequest,一個(gè)ClientRequest發(fā)往一個(gè)Node節(jié)點(diǎn),并且發(fā)送到集群之前,還會(huì)添加到InFlightRequests,最后通過Selector進(jìn)行網(wǎng)絡(luò)I/O層發(fā)送。
  • Selector是一個(gè)非阻塞支持多連接的網(wǎng)絡(luò)I/O類。包括NetworkSend和NetworkReceive兩部分,分別負(fù)責(zé)向Kafka集群發(fā)送網(wǎng)絡(luò)請(qǐng)求以及接收Kafka集群的響應(yīng)。

三、重要配置參數(shù)

bootstrap.servers
  • 描述:用于建立初始連接到Kafka集群(主機(jī)/端口對(duì))的配置列表。這個(gè)配置不需要包含整個(gè)集群的服務(wù)器。不論這個(gè)參數(shù)配置了哪些服務(wù)器來初始化連接,客戶端都是會(huì)均衡地與集群中的所有服務(wù)器建立連接。如果集群變化,元數(shù)據(jù)會(huì)動(dòng)態(tài)更新。為了避免單節(jié)點(diǎn)風(fēng)險(xiǎn),最好配置多臺(tái)主機(jī)。
  • 類型:list。
  • 默認(rèn)值:無。
max.request.size
  • 描述:請(qǐng)求消息最大值(單位byte),序列化的消息字節(jié)數(shù)大于這個(gè)配置參數(shù)時(shí),會(huì)拋出RecordTooLargeException。
  • 類型:int。
  • 默認(rèn)值:1048576。
buffer.memory
  • 描述:Producer 用來緩沖等待被發(fā)送到服務(wù)器的記錄的總字節(jié)數(shù)。如果記錄發(fā)送的速度比發(fā)送到服務(wù)器的速度快,Producer 就會(huì)阻塞,如果阻塞的時(shí)間超過 max.block.ms 配置的時(shí)長,則會(huì)拋出一個(gè)異常。這個(gè)配置與 Producer 的可用總內(nèi)存有一定的對(duì)應(yīng)關(guān)系,但并不是完全等價(jià)的關(guān)系,因?yàn)?Producer 的可用內(nèi)存并不是全部都用來緩存。一些額外的內(nèi)存可能會(huì)用于壓縮(如果啟用了壓縮),以及維護(hù)正在運(yùn)行的請(qǐng)求。 序列化的消息字節(jié)數(shù)大于這個(gè)配置參數(shù)時(shí),會(huì)拋出RecordTooLargeException
  • 類型:long。
  • 默認(rèn)值:33554432。
acks
  • 描述:此配置是 Producer 在確認(rèn)一個(gè)請(qǐng)求發(fā)送完成之前需要收到的反饋信息的數(shù)量。 這個(gè)參數(shù)是為了保證發(fā)送請(qǐng)求的可靠性。以下配置方式是允許的:
    (1) acks=0 如果設(shè)置為0,則 producer 不會(huì)等待服務(wù)器的反饋。該消息會(huì)被立刻添加到 socket buffer 中并認(rèn)為已經(jīng)發(fā)送完成。在這種情況下,服務(wù)器是否收到請(qǐng)求是沒法保證的,并且參數(shù)retries也不會(huì)生效(因?yàn)榭蛻舳藷o法獲得失敗信息)。每個(gè)記錄返回的 offset 總是被設(shè)置為-1。
    (2)acks=1 如果設(shè)置為1,leader節(jié)點(diǎn)會(huì)將記錄寫入本地日志,并且在所有 follower 節(jié)點(diǎn)反饋之前就先確認(rèn)成功。在這種情況下,如果 leader 節(jié)點(diǎn)在接收記錄之后,并且在 follower 節(jié)點(diǎn)復(fù)制數(shù)據(jù)完成之前產(chǎn)生錯(cuò)誤,則這條記錄會(huì)丟失。
    (3)acks=all 如果設(shè)置為all,這就意味著 leader 節(jié)點(diǎn)會(huì)等待所有同步中的副本確認(rèn)之后再確認(rèn)這條記錄是否發(fā)送完成。只要至少有一個(gè)同步副本存在,記錄就不會(huì)丟失。這種方式是對(duì)請(qǐng)求傳遞的最有效保證。acks=-1與acks=all是等效的。
  • 類型:string。
  • 默認(rèn)值:1。
compression.type
  • 描述:Producer 生成數(shù)據(jù)時(shí)可使用的壓縮類型。默認(rèn)值是none(即不壓縮)??膳渲玫膲嚎s類型包括:none, gzip, snappy, 或者 lz4 。壓縮是針對(duì)批處理的所有數(shù)據(jù),所以批處理的效果也會(huì)影響壓縮比(更多的批處理意味著更好的壓縮)。
  • 類型:string。
  • 默認(rèn)值:none。
retries
  • 描述:若設(shè)置大于0的值,則客戶端會(huì)將發(fā)送失敗的記錄重新發(fā)送,盡管這些記錄有可能是暫時(shí)性的錯(cuò)誤。請(qǐng)注意,這種 retry 與客戶端收到錯(cuò)誤信息之后重新發(fā)送記錄并無區(qū)別。允許 retries 并且沒有設(shè)置max.in.flight.requests.per.connection 為1時(shí),記錄的順序可能會(huì)被改變。比如:當(dāng)兩個(gè)批次都被發(fā)送到同一個(gè) partition ,第一個(gè)批次發(fā)生錯(cuò)誤并發(fā)生 retries 而第二個(gè)批次已經(jīng)成功,則第二個(gè)批次的記錄就會(huì)先于第一個(gè)批次出現(xiàn)。
  • 類型:int。
  • 默認(rèn)值:0。
batch.size
  • 描述:當(dāng)將多個(gè)記錄被發(fā)送到同一個(gè)分區(qū)時(shí), Producer 將嘗試將記錄組合到更少的請(qǐng)求中。這有助于提升客戶端和服務(wù)器端的性能。這個(gè)配置控制一個(gè)批次的默認(rèn)大?。ㄒ宰止?jié)為單位)。當(dāng)記錄的大小超過了配置的字節(jié)數(shù), Producer 將不再嘗試往批次增加記錄。發(fā)送到 broker 的請(qǐng)求會(huì)包含多個(gè)批次的數(shù)據(jù),每個(gè)批次對(duì)應(yīng)一個(gè) partition 的可用數(shù)據(jù)小的 batch.size 將減少批處理,并且可能會(huì)降低吞吐量(如果 batch.size = 0的話將完全禁用批處理)。 很大的 batch.size 可能造成內(nèi)存浪費(fèi),因?yàn)槲覀円话銜?huì)在 batch.size 的基礎(chǔ)上分配一部分緩存以應(yīng)付額外的記錄。
  • 類型:int。
  • 默認(rèn)值:16384。
client.id
  • 描述:發(fā)出請(qǐng)求時(shí)傳遞給服務(wù)器的 ID 字符串。這樣做的目的是為了在服務(wù)端的請(qǐng)求日志中能夠通過邏輯應(yīng)用名稱來跟蹤請(qǐng)求的來源,而不是只能通過IP和端口號(hào)跟進(jìn)。
  • 類型:string。
  • 默認(rèn)值:""。
connections.max.idle.ms
  • 描述:在此配置指定的毫秒數(shù)之后,關(guān)閉空閑連接。
  • 類型:long。
  • 默認(rèn)值:540000。
security.protocol
  • 描述:與 brokers 通訊的協(xié)議。可配置的值有: PLAINTEXT, SSL, SASLPLAINTEXT, SASLSSL。
  • 類型:string。
  • 默認(rèn)值:PLAINTEXT。

上面列出了優(yōu)先級(jí)較高的一些配置,更多配置可以參考Kafka官方網(wǎng)站。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請(qǐng)通過簡信或評(píng)論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容