Kafka(生產者篇)

流程

image.png

流程講解

  1. 在我們通過代碼send消息之后,這條消息就會發(fā)往攔截器Interceptor

  2. Interceptor會對數據做處理

    • 加解密/脫敏
    • 過濾不滿足條件的數據(ip白名單、錯誤編碼、臟數據或者殘缺數據)
    • 統(tǒng)計消息投遞成功率或結合第三方工具計算消息在Kafka存儲的時間
    • 在消息的header里放一個唯一標識,方便下游做去重
      針對舊版本,新版本Kafka引入了冪等性來保證Once Exactly(剛好一次)
  3. 對數據進行序列化

    • 無論是否存在key,都必須給key和value指定序列化方式
    • 可通過實現Serializer自定義序列化規(guī)則
  4. 對數據進行分區(qū)
    分區(qū)策略很重要,好的分區(qū)策略可以解決數據傾斜的問題
    可通過實現Partitioner接口來自定義分區(qū)規(guī)則,否則規(guī)則如下

    • 如果發(fā)送send的時候指定了分區(qū),則使用指定分區(qū)
    • 如未指定,則根據key進行hash,然后對分區(qū)數取模
    • 如未指定且沒key,則輪詢發(fā)送給分區(qū)(低版本采用隨機)
  5. 臨時存儲
    RecordAccumulator采用了雙端隊列數據結構Deque來臨時存儲
    目的:提高發(fā)送數據的吞吐量

    • 確定消息發(fā)送的分區(qū)后,會在RecordAccumulator尋找對應的Deque
      找不到對應的Deque則新建
    • 從對應的Deque的尾巴中取出最后一個RecordBatch進行判斷
      如果該Batch加上當前消息的大小小于batch.size,則追加進去;
      否則創(chuàng)建新的Batch、將當前消息放進去并將Batch放到Deque隊列
    • 注:RecordBatch是寫Kafka的最小單位
  6. Sender拉取數據
    當滿足linger.msbuffer.memory任一個條件時,會進行數據的拉取

  7. 排隊發(fā)送
    每一個Deque的數據都有一個對應的ClientRequest,負責攜帶RecordBatch
    排隊等待前一個RecordBatch的響應

  8. 包裝
    將ClientRequest扔到KafkaChannel中,等到Selector的發(fā)送

  9. 寫Kafka
    這一步驟是真正的往Kafka的Broker中寫數據,回應的規(guī)則是

    • ack=0:發(fā)送出去就立馬執(zhí)行第10步,不等待響應
      典型的 fire and forget , 性能最好,但也最容易丟數據
    • ack=1:發(fā)送出去,等到那批數據被寫到主副本上時,就成功響應
      由于只是寫到主副本的頁緩存,因此存在丟數據的可能
    • ack=-1:發(fā)送出去,直到ISR隊列中包括主副本在內的min.insync.replicas個副本被寫成功,才成功響應
      • ack=-1搭配min.insync.replicas的結果
        讓kafka的副本復制策略游離在同步復制和異步復制之間
        既避免了同步復制拖慢性能,又提高了異步復制的可靠性
  10. 回復NetworkClient,開始下一個RecordBatch的發(fā)送

  11. NetworkClient回復RecordAccumulator

概念

Kafka的生產者就是往Kafka寫消息的程序
比如flume、spark、filebeat等,可以是一個進程也可以是一個線程

壓縮

  • Kafka的壓縮也是比較有意思的,特別是2.1版本引入的 ZStandard
    在CPU相對空閑的情況可通過設置compression.type來開啟
    使用壓縮要注意以下幾點:

    • 消息的格式需要保證一致(V0、V1和V2不要搭配使用,否則會導致Broker端多一次解壓縮)
    • Broker端不要設置跟compression.type不用的壓縮類型,否則也會多一次解壓縮甚至喪失零拷貝特性

    好處:減小網絡傳輸壓力以及Broker存儲數據的磁盤占用量

生產環(huán)境注意問題

  1. kafka在運行期間可增加分區(qū)數,在增加分區(qū)數前,需注意以下幾點:

    • 數據亂序
      由于消息一般都是進行hash然后對分區(qū)數取模,增加分區(qū)數會導致原來該放到1分區(qū)的消息被放到了2,從而無法保證數據的有序
    • 數據丟失
      分為兩種情況討論
      • 消費者指定分區(qū)消費
        加入消費者A只消費1分區(qū)的數據,而分區(qū)數增加導致原本應該放到1分區(qū)的數據被放到了其他分區(qū),從而導致消費者A無法消費到該條消息
      • 消費者拉取的策略為last
        當產生分區(qū)3時,如果生產者先感知到并往里邊投遞消息;
        消費者隔一段時間后才感知到并且由于配置了last,只能從最新的消息進行拉取,那么分區(qū)3里面就會被一部分消息不會被消費導致丟數據
  2. 調整消息大小
    這個要重點提一下,生產環(huán)境下經常會遇到的坑
    由于kafka默認消息大小message.max.bytes還不到1M,因此會經常調整該值

    但是要切記?。?!
    調整message.max.bytes之前,請先調整
    replica.fetch.max.bytes:確保副本之間能正常復制
    fetch.message.max.bytes:確保消息可以被消費者正常消費

  3. 生產者日志里面存在以下異常

NetworkException 表示網絡異常,這個有可能是由于網絡瞬時故障而導致的異常,可以通過重試解決
LeaderNotAvailableException表示分區(qū)的leader副本不可用,leader副本下線而新的leader副本選舉完成之前,重試之后可以重新恢復
UnknownTopicOrPartitionException
NotEnoughReplicasException
NotCoordinatorException

解釋:以上異常都是可通過重試機制來解決的,因此可通過設置以下兩個參數來解決
retries:遇到以上異常不會直接拋,而是嘗試重試該參數設置的次數,若都不成功再拋異常
retry.backoff.ms:表示兩次重試間隔,一般根據異常情況來調整,網絡不好情況可適當延長

既然有可重試的異常,自然也有不可重試的異常。如:
RecordTooLargeException異常,表示所發(fā)送的消息太大,KafkaProducer對此不會進行任何重試,直接拋出異常
像序列化反序列化失敗、數據格式不對等異常也是不可重試的

注意
使用重試的話消息可能也會亂序,可通過設置以下參數進行避免
max.in.flight.requests.per.connection設置為1 但對性能會有一定的影響

疑問

  1. 新版生產者客戶端(0.9以后引入的)相比舊版都有哪些優(yōu)勢?
  2. Kafka的消息為什么換了那么多種格式?(V0、V1、V2甚至V0之前還有其他的格式)
  3. KafkaProducer是怎么保證線程安全的?

小結

本篇博客一開始以圖的形式,通過給大家描述消息寫到Kafka的流程引入了生產者相關的角色和概念;
之后是簡單介紹Kafka生產者的一些相關概念,最后是列出了一些生產環(huán)境需要注意的問題。

希望讀者能夠喜歡這種描述方式,同時相信讀者也有不少的疑惑或者有覺得不對的地方,歡迎在下方進行留言討論

寫在最后

讀者有沒有曾經好奇過,Kafka為什么這么快 ???
在該篇內容中已埋下了一些伏筆,之后的博客中會跟大家一起進行探討

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容