流程

流程講解
在我們通過代碼send消息之后,這條消息就會發(fā)往攔截器Interceptor
-
Interceptor會對數據做處理
- 加解密/脫敏
- 過濾不滿足條件的數據(ip白名單、錯誤編碼、臟數據或者殘缺數據)
- 統(tǒng)計消息投遞成功率或結合第三方工具計算消息在Kafka存儲的時間
- 在消息的header里放一個唯一標識,方便下游做去重
針對舊版本,新版本Kafka引入了冪等性來保證Once Exactly(剛好一次)
-
對數據進行序列化
- 無論是否存在key,都必須給key和value指定序列化方式
- 可通過實現Serializer自定義序列化規(guī)則
-
對數據進行分區(qū)
分區(qū)策略很重要,好的分區(qū)策略可以解決數據傾斜的問題
可通過實現Partitioner接口來自定義分區(qū)規(guī)則,否則規(guī)則如下- 如果發(fā)送send的時候指定了分區(qū),則使用指定分區(qū)
- 如未指定,則根據key進行hash,然后對分區(qū)數取模
- 如未指定且沒key,則輪詢發(fā)送給分區(qū)(低版本采用隨機)
-
臨時存儲
RecordAccumulator采用了雙端隊列數據結構Deque來臨時存儲
目的:提高發(fā)送數據的吞吐量- 確定消息發(fā)送的分區(qū)后,會在RecordAccumulator尋找對應的Deque
找不到對應的Deque則新建 - 從對應的Deque的尾巴中取出最后一個RecordBatch進行判斷
如果該Batch加上當前消息的大小小于batch.size,則追加進去;
否則創(chuàng)建新的Batch、將當前消息放進去并將Batch放到Deque隊列 - 注:RecordBatch是寫Kafka的最小單位
- 確定消息發(fā)送的分區(qū)后,會在RecordAccumulator尋找對應的Deque
Sender拉取數據
當滿足linger.ms和buffer.memory任一個條件時,會進行數據的拉取排隊發(fā)送
每一個Deque的數據都有一個對應的ClientRequest,負責攜帶RecordBatch
排隊等待前一個RecordBatch的響應包裝
將ClientRequest扔到KafkaChannel中,等到Selector的發(fā)送-
寫Kafka
這一步驟是真正的往Kafka的Broker中寫數據,回應的規(guī)則是- ack=0:發(fā)送出去就立馬執(zhí)行第10步,不等待響應
典型的fire and forget, 性能最好,但也最容易丟數據 - ack=1:發(fā)送出去,等到那批數據被寫到主副本上時,就成功響應
由于只是寫到主副本的頁緩存,因此存在丟數據的可能 - ack=-1:發(fā)送出去,直到ISR隊列中包括主副本在內的
min.insync.replicas個副本被寫成功,才成功響應-
ack=-1搭配min.insync.replicas的結果
讓kafka的副本復制策略游離在同步復制和異步復制之間
既避免了同步復制拖慢性能,又提高了異步復制的可靠性
-
- ack=0:發(fā)送出去就立馬執(zhí)行第10步,不等待響應
回復NetworkClient,開始下一個RecordBatch的發(fā)送
NetworkClient回復RecordAccumulator
概念
Kafka的生產者就是往Kafka寫消息的程序
比如flume、spark、filebeat等,可以是一個進程也可以是一個線程
壓縮
-
Kafka的壓縮也是比較有意思的,特別是2.1版本引入的 ZStandard
在CPU相對空閑的情況可通過設置compression.type來開啟
使用壓縮要注意以下幾點:- 消息的格式需要保證一致(V0、V1和V2不要搭配使用,否則會導致Broker端多一次解壓縮)
- Broker端不要設置跟
compression.type不用的壓縮類型,否則也會多一次解壓縮甚至喪失零拷貝特性
好處:減小網絡傳輸壓力以及Broker存儲數據的磁盤占用量
生產環(huán)境注意問題
-
kafka在運行期間可增加分區(qū)數,在增加分區(qū)數前,需注意以下幾點:
- 數據亂序
由于消息一般都是進行hash然后對分區(qū)數取模,增加分區(qū)數會導致原來該放到1分區(qū)的消息被放到了2,從而無法保證數據的有序 - 數據丟失
分為兩種情況討論- 消費者指定分區(qū)消費
加入消費者A只消費1分區(qū)的數據,而分區(qū)數增加導致原本應該放到1分區(qū)的數據被放到了其他分區(qū),從而導致消費者A無法消費到該條消息 - 消費者拉取的策略為last
當產生分區(qū)3時,如果生產者先感知到并往里邊投遞消息;
消費者隔一段時間后才感知到并且由于配置了last,只能從最新的消息進行拉取,那么分區(qū)3里面就會被一部分消息不會被消費導致丟數據
- 消費者指定分區(qū)消費
- 數據亂序
-
調整消息大小
這個要重點提一下,生產環(huán)境下經常會遇到的坑
由于kafka默認消息大小message.max.bytes還不到1M,因此會經常調整該值但是要切記?。?!
調整message.max.bytes之前,請先調整
replica.fetch.max.bytes:確保副本之間能正常復制
fetch.message.max.bytes:確保消息可以被消費者正常消費 生產者日志里面存在以下異常
NetworkException 表示網絡異常,這個有可能是由于網絡瞬時故障而導致的異常,可以通過重試解決
LeaderNotAvailableException表示分區(qū)的leader副本不可用,leader副本下線而新的leader副本選舉完成之前,重試之后可以重新恢復
UnknownTopicOrPartitionException
NotEnoughReplicasException
NotCoordinatorException
解釋:以上異常都是可通過重試機制來解決的,因此可通過設置以下兩個參數來解決
retries:遇到以上異常不會直接拋,而是嘗試重試該參數設置的次數,若都不成功再拋異常
retry.backoff.ms:表示兩次重試間隔,一般根據異常情況來調整,網絡不好情況可適當延長
既然有可重試的異常,自然也有不可重試的異常。如:
RecordTooLargeException異常,表示所發(fā)送的消息太大,KafkaProducer對此不會進行任何重試,直接拋出異常
像序列化反序列化失敗、數據格式不對等異常也是不可重試的
注意
使用重試的話消息可能也會亂序,可通過設置以下參數進行避免
max.in.flight.requests.per.connection設置為1 但對性能會有一定的影響
疑問
- 新版生產者客戶端(0.9以后引入的)相比舊版都有哪些優(yōu)勢?
- Kafka的消息為什么換了那么多種格式?(V0、V1、V2甚至V0之前還有其他的格式)
- KafkaProducer是怎么保證線程安全的?
小結
本篇博客一開始以圖的形式,通過給大家描述消息寫到Kafka的流程引入了生產者相關的角色和概念;
之后是簡單介紹Kafka生產者的一些相關概念,最后是列出了一些生產環(huán)境需要注意的問題。
希望讀者能夠喜歡這種描述方式,同時相信讀者也有不少的疑惑或者有覺得不對的地方,歡迎在下方進行留言討論
寫在最后
讀者有沒有曾經好奇過,Kafka為什么這么快 ???
在該篇內容中已埋下了一些伏筆,之后的博客中會跟大家一起進行探討