kafka簡(jiǎn)介

簡(jiǎn)介

Apache kafka 是一個(gè)分布式的基于push-subscribe的消息系統(tǒng),它具備快速、可擴(kuò)展、可持久化的特點(diǎn)。它現(xiàn)在是Apache旗下的一個(gè)開源系統(tǒng),作為hadoop生態(tài)系統(tǒng)的一部分,被各種商業(yè)公司廣泛應(yīng)用。它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)、storm/spark流式處理引擎。

kafka簡(jiǎn)介

特性

  • 高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒

  • 可擴(kuò)展性:kafka集群支持熱擴(kuò)展

  • 持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失

  • 容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/p>

  • 高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫


架構(gòu)組件

Kafka中發(fā)布訂閱的對(duì)象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個(gè)topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時(shí)從多個(gè)topic讀寫數(shù)據(jù)。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成,它負(fù)責(zé)持久化和備份具體的kafka消息。

  • Topic:消息存放的主題

  • Producer:生產(chǎn)者,負(fù)責(zé)push消息到指定的Broker的Topic中

  • Consumer:消費(fèi)者

  • Broker:Kafka的服務(wù)實(shí)例,主要負(fù)責(zé)創(chuàng)建Topic,存儲(chǔ)Producer所發(fā)布的消息,記錄消息處理過程,先將消息保存到內(nèi)存中,再落地到磁盤

  • Replication-factor:復(fù)制因子(kafka容錯(cuò)機(jī)制),即副本數(shù)量,一般與Broker數(shù)量保持一致

  • Partitions:分區(qū)(每個(gè)分區(qū)只能被同一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者消費(fèi))

  • Leaders:選舉partition中的副本為leader,負(fù)責(zé)處理讀寫

  • Follower: 未被選舉為leader的副本為follower,負(fù)責(zé)備份數(shù)據(jù)

  • Zookeeper:注冊(cè)消息,用于保存broker節(jié)點(diǎn)信息,維護(hù)ISR和OSR隊(duì)列,partition的offset等(ZK在kafka2.8版本之后移除,offset保存在本地)


副本同步機(jī)制

kafka 0.8以后,提供了HA機(jī)制,就是replica副本機(jī)制。每個(gè)partition的數(shù)據(jù)都會(huì)同步到其他機(jī)器上,形成自己的多個(gè)replica副本。然后所有replica會(huì)選舉一個(gè)leader出來,那么生產(chǎn)和消費(fèi)都跟這個(gè)leader打交道,然后其他replica就是follower。寫的時(shí)候,leader會(huì)負(fù)責(zé)把數(shù)據(jù)同步到所有follower上去,讀的時(shí)候就直接讀leader上數(shù)據(jù)即可。

[圖片上傳失敗...(image-6f7b38-1743417387765)]

Kafka 生產(chǎn)者ack機(jī)制

  • ack=0:

    • 意義:生產(chǎn)者在成功將消息發(fā)送給服務(wù)端之后不等待任何確認(rèn)

    • 結(jié)果:生產(chǎn)者不知道消息是否成功到達(dá)服務(wù)端,容易造成消息丟失

  • ack=1:

    • 意義:生產(chǎn)者在成功將消息發(fā)送給服務(wù)端之后,等待leader的確認(rèn)

    • 結(jié)果:意味著leader節(jié)點(diǎn)成功接收數(shù)據(jù),但是不一定同步了其他副本

  • ack=all 或 ack=-1:

    • 意義:生產(chǎn)者在成功將消息發(fā)送給服務(wù)端之后,等待所有partition的副本確認(rèn)

    • 結(jié)果:當(dāng)所有副本都接收到數(shù)據(jù)之后,才認(rèn)為消息被成功提交

Kafka 如果設(shè)置了ack(ack!=0),需要設(shè)置超時(shí)時(shí)間(retry.backoff.ms)和重試次數(shù)(retries)


ack過程中出現(xiàn)網(wǎng)絡(luò)問題,導(dǎo)致生產(chǎn)者重復(fù)發(fā)送消息怎么處理?

重復(fù)產(chǎn)生的核心原因
  • 超時(shí)與重試機(jī)制 當(dāng) Producer 發(fā)送消息后,若在 request.timeout.ms(默認(rèn) 30 秒)內(nèi)未收到 Broker 的 ack 確認(rèn),Producer 會(huì)認(rèn)為消息發(fā)送失敗并觸發(fā)重試(retries 參數(shù)控制重試次數(shù))。 風(fēng)險(xiǎn)場(chǎng)景

    • Broker 實(shí)際已成功寫入消息,但 ack 響應(yīng)因網(wǎng)絡(luò)延遲未及時(shí)到達(dá) Producer。

    • Producer 重試時(shí),Broker 會(huì)再次寫入相同的消息(若未啟用冪等性),導(dǎo)致消息重復(fù)。

  • Broker 的寫入機(jī)制 Kafka 的 Partition 日志是僅追加(append-only)的,即使消息內(nèi)容相同,重試時(shí)也會(huì)被當(dāng)作新消息寫入(除非開啟冪等性)。

避免重復(fù)的解決方案

(1) 啟用 Producer 冪等性

  • 配置方式: 在 Producer 中設(shè)置 enable.idempotence=true(需同時(shí)滿足 acks=allretries > 0)。

  • 原理

    • Producer 為每個(gè)消息分配唯一 PID(Producer ID)序列號(hào)。

    • Broker 根據(jù) PID + 分區(qū)號(hào) + 序列號(hào) 去重,拒絕重復(fù)消息。

  • 適用場(chǎng)景: 單 Producer 實(shí)例內(nèi)保證 Exactly-Once 語義,避免因重試導(dǎo)致的重復(fù)。

(2) 使用 Kafka 事務(wù)

  • 配置方式

    • 設(shè)置 transactional.id 并調(diào)用 initTransactions()、beginTransaction()、commitTransaction()。
  • 原理

    • 事務(wù)機(jī)制結(jié)合冪等性,確保跨多個(gè) Partition 和 Consumer 的原子性操作。

    • Broker 會(huì)記錄事務(wù)狀態(tài),中斷的事務(wù)自動(dòng)回滾。

  • 適用場(chǎng)景: 需要跨多個(gè)消息或“讀-處理-寫”鏈路的 Exactly-Once 語義(如 Kafka Streams)。

(3) 業(yè)務(wù)端去重

  • 實(shí)現(xiàn)方式

    • 在消息中攜帶唯一標(biāo)識(shí)(如 UUID 或業(yè)務(wù)主鍵)。

    • 消費(fèi)者端根據(jù)唯一標(biāo)識(shí)去重(如寫入數(shù)據(jù)庫時(shí)檢查主鍵沖突)。

  • 適用場(chǎng)景: 無法啟用冪等性或事務(wù)的舊版本 Kafka,或需要兼容其他消息系統(tǒng)的場(chǎng)景。


ISR 機(jī)制

Kafka 服務(wù)端根據(jù)副本同步的情況,分成了三個(gè)集合:

AR(Assigned Replicas):包括ISR和OSR

ISR(In-sync Replicas):和leader保持同步的副本集合,可以認(rèn)為是可靠數(shù)據(jù)(leader故障之后會(huì)在這個(gè)集合重新選舉)

OSR(Out-sync Replicas):和leader副本同步失效的副本集合(副本還會(huì)繼續(xù)同步數(shù)據(jù),同步成功之后會(huì)進(jìn)入ISR)


Kafka 數(shù)據(jù)持久化機(jī)制

Kafka 默認(rèn)使用操作系統(tǒng)的頁緩存(page cache)機(jī)制,將消息寫入內(nèi)存中的頁緩存,而非立即刷盤(高吞吐的原因)

默認(rèn)情況下,Kafka 通過以下兩個(gè)參數(shù)控制主動(dòng)刷盤:

  • log.flush.interval.messages:累計(jì)多少條消息后觸發(fā)刷盤(默認(rèn) Long.MAX_VALUE,即不主動(dòng)觸發(fā))

  • log.flush.interval.ms:間隔多少毫秒后觸發(fā)刷盤(默認(rèn) null,即不主動(dòng)觸發(fā)) 實(shí)際默認(rèn)行為:依賴操作系統(tǒng)后臺(tái)線程(如 Linux 的 pdflush)自動(dòng)刷盤,通常延遲約 30 秒

Broker 端配置

  • 強(qiáng)制刷盤 修改 log.flush.interval.messageslog.flush.interval.ms,例如:

    • log.flush.interval.messages=1 # 每條消息都刷盤(極端情況,性能極低)

    • log.flush.interval.ms=1000 # 每秒刷盤一次

此配置會(huì)顯著降低吞吐量,僅適用于對(duì)數(shù)據(jù)丟失零容忍的場(chǎng)景。


Kafka 數(shù)據(jù)模型與消息存儲(chǔ)機(jī)制

[圖片上傳失敗...(image-4d351-1743417387765)]

在同一個(gè)Topic下,一個(gè)partition對(duì)應(yīng)一個(gè)唯一的文件夾,在文件夾下,kafka 消息是采用 Segment File的存儲(chǔ)方式進(jìn)行存儲(chǔ)

Segment File:Segment由.index文件和.log文件組成,將大文件拆分成小文件來存儲(chǔ)(加快IO加載

.index文件:索引文件,以key-value格式存儲(chǔ),key-表示索引文件的第幾條消息;value-表示這條消息在log file中的物理偏移量

.log文件:數(shù)據(jù)存儲(chǔ)文件

如何通過 offset 找到 某一條消息呢?

  1. 首先會(huì)根據(jù) offset 值去查找 Segment 中的 index 文件,因?yàn)?index 文件是以上個(gè)文件的最大 offset 偏移命名的所以可以通過二分法快速定位到索引文件。

  2. 找到索引文件后,索引文件中保存的是 offset 和對(duì)應(yīng)的消息行在 log 日志中的存儲(chǔ)行號(hào),因?yàn)?Kafka 采用稀疏矩陣的方式來存儲(chǔ)索引信息,并不是每一條索引都存儲(chǔ),所以這里只是查到文件中符合當(dāng)前 offset 范圍的索引。

  3. 拿到 當(dāng)前查到的范圍索引對(duì)應(yīng)的行號(hào)之后再去對(duì)應(yīng)的 log 文件中從 當(dāng)前 Position 位置開始查找 offset 對(duì)應(yīng)的消息,直到找到該 offset 為止。

每一條消息的組成內(nèi)容有如下字段:

Copy
offset: 4964(邏輯偏移量) 
position: 75088(物理偏移量) 
CreateTime: 1545203239308(創(chuàng)建時(shí)間) 
isvalid: true(是否有效)
keysize: -1(鍵大小) 
valuesize: 9(值大小) 
magic: 2 
compresscodec: NONE(壓縮編碼) 
producerId: -1
producerEpoch: -1(epoch號(hào)) 
sequence: -1(序號(hào)) 
isTransactional: false(是否事務(wù)) 
headerKeys: []
payload: message_0(消息的具體內(nèi)容)

Kafka pull & push

pull是指consumer來拉取消息(默認(rèn));push是指kafka主動(dòng)推送消息給consumer

pull模式:根據(jù)consumer的消費(fèi)能力進(jìn)行數(shù)據(jù)拉取,可以批量拉或者單條拉;可以設(shè)置不同的提交方式實(shí)現(xiàn)不同的傳輸語義

缺點(diǎn):如果kafka沒有數(shù)據(jù)會(huì)導(dǎo)致consumer空循環(huán)(可以通過設(shè)置拉取數(shù)據(jù)為空或沒達(dá)到一定數(shù)量阻塞來解決)

push模式:不會(huì)導(dǎo)致consumer循環(huán)等待

缺點(diǎn):速率固定、忽略consumer的消費(fèi)能力,可能導(dǎo)致拒絕服務(wù)或網(wǎng)絡(luò)擁堵等情況


kafka常見問題

1、消息冪等性(重復(fù)消費(fèi))

重復(fù)消費(fèi)只針對(duì)消費(fèi)者端而言,消費(fèi)者要保證消息的冪等性,一般要結(jié)合業(yè)務(wù)場(chǎng)景進(jìn)行,主要有兩種解決方案:

1)redis:消費(fèi)數(shù)據(jù)后把消息的唯一鍵存到redis中,每次消費(fèi)的時(shí)候去redis查一下key是否存在

2)mysql:消費(fèi)數(shù)據(jù)后把消息的唯一鍵存到mysql中,每次消費(fèi)的時(shí)候查mysql

2、數(shù)據(jù)丟失

  • 消費(fèi)者:消費(fèi)者自動(dòng)提交offset的時(shí)候會(huì)有數(shù)據(jù)丟失的情況,改為手動(dòng)提交offset可解決,極端情況下數(shù)據(jù)處理完后提交offset的時(shí)候掛了,可通過加冪等性操作解決

  • kafka:kafka某個(gè)broker宕機(jī),重新選舉leader時(shí)有數(shù)據(jù)未同步時(shí)會(huì)有消息丟失的情況,可通過設(shè)置參數(shù)(保證數(shù)據(jù)寫入到每個(gè)副本后才算寫入成功)解決

  • 生產(chǎn)者:設(shè)置acks=all即可避免丟失數(shù)據(jù)

3、順序性

  • 生產(chǎn)者:生產(chǎn)者沒有順序性問題

  • kafka:partition內(nèi)部有序,多個(gè)partition時(shí)無法保證順序,對(duì)需要順序消費(fèi)的數(shù)據(jù)指定到同一個(gè)partition即可保證順序

  • 消費(fèi)者:同一個(gè)partition時(shí),當(dāng)消費(fèi)者內(nèi)部啟用多線程時(shí)會(huì)導(dǎo)致順序錯(cuò)亂,可在消費(fèi)者內(nèi)部啟用內(nèi)存隊(duì)列來保證多線程的順序

4、kafka 高性能高吞吐原因

  • 磁盤順序讀寫:

    • Kafka 將消息按順序追加(Append)到 Partition 日志文件末尾,避免機(jī)械硬盤磁頭頻繁尋道

    • 消費(fèi)者按偏移量(Offset)順序讀取消息,充分利用操作系統(tǒng)的磁盤預(yù)讀

    • 頁緩存(page cache):消息直接寫入內(nèi)存頁緩存,由操作系統(tǒng)異步刷盤,讀操作優(yōu)先訪問緩存,減少物理磁盤 I/O

  • 零拷貝

    • 傳統(tǒng)數(shù)據(jù)拷貝流程: 磁盤文件 → 內(nèi)核緩沖區(qū) → 用戶空間緩沖區(qū) → Socket 緩沖區(qū) → 網(wǎng)卡 4 次上下文切換 + 2 次 CPU 拷貝

    • Kafka 優(yōu)化(sendfile系統(tǒng)調(diào)用): 磁盤文件 → 內(nèi)核緩沖區(qū) → 網(wǎng)卡 2 次上下文切換 + 0 次 CPU 拷貝,減少 50% 以上 CPU 消耗

  • 分區(qū)分段+索引:分區(qū)是指partition,分段是指segment文件,索引是指segment里的.index索引

    • 分區(qū)(Partition)機(jī)制

      • 數(shù)據(jù)分片:每個(gè) Topic 劃分為多個(gè) Partition,分布在不同 Broker 上。

      • 并行讀寫:生產(chǎn)者和消費(fèi)者可同時(shí)向多個(gè) Partition 寫入/讀取數(shù)據(jù),充分利用集群資源。

      • 擴(kuò)展性:通過增加 Partition 和 Broker,線性提升吞吐量。

    • 生產(chǎn)者負(fù)載均衡

      • 分區(qū)策略

        • 默認(rèn)輪詢(Round Robin)或按 Key 哈希,確保數(shù)據(jù)均勻分布。

        • 自定義策略支持業(yè)務(wù)特化路由。

    • 消費(fèi)者組(Consumer Group)

      • 并行消費(fèi):同一消費(fèi)者組內(nèi)多個(gè)消費(fèi)者實(shí)例分別消費(fèi)不同 Partition,提升消費(fèi)速度。

      • 水平擴(kuò)容:增加消費(fèi)者數(shù)量即可擴(kuò)展消費(fèi)能力(需 Partition 數(shù)量 ≥ 消費(fèi)者數(shù)量)。

  • 批量處理(Batching)

    • 生產(chǎn)者端

      • 消息積累到 batch.size(默認(rèn) 16KB)或 linger.ms(默認(rèn) 0ms)后批量發(fā)送,減少網(wǎng)絡(luò)請(qǐng)求次數(shù)。

      • 壓縮(Compression):支持 GZIP、Snappy、LZ4、Zstandard,降低網(wǎng)絡(luò)傳輸量(尤其文本類消息)。

    • Broker 端

      • 批量寫入磁盤,減少磁盤尋址次數(shù)。
    • 消費(fèi)者端

      • 批量拉取消息(max.poll.records 控制單次拉取數(shù)量)。
  • 直接操作頁緩存(page cache):消息直接寫入內(nèi)存頁緩存,由操作系統(tǒng)異步刷盤,讀操作優(yōu)先訪問緩存,減少物理磁盤 I/O

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 一、簡(jiǎn)介 Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息隊(duì)列(Message Queue),主要應(yīng)用于 大數(shù)...
    brayden_yang閱讀 344評(píng)論 0 1
  • 1,消息引擎系統(tǒng) 1)Kafka是消息引擎系統(tǒng)。兩個(gè)重要因素: 消息設(shè)計(jì)、傳輸協(xié)議設(shè)計(jì)。2)Kafka消息是結(jié)構(gòu)化...
    沐兮_d64c閱讀 2,066評(píng)論 0 13
  • Kafka的主要特點(diǎn):1. 為發(fā)布和訂閱提供高吞吐量,每秒可產(chǎn)生25萬消息(50MB),每秒可處理55萬消息(11...
    zi萱閱讀 781評(píng)論 0 2
  • 1.kafka 集群的架構(gòu) Kafka是最初由Linkedin公司開發(fā),是一個(gè)分布式、支持分區(qū)的(partitio...
    本能帥閱讀 295評(píng)論 0 1
  • 夜鶯2517閱讀 128,186評(píng)論 1 9

友情鏈接更多精彩內(nèi)容