簡(jiǎn)介
Apache kafka 是一個(gè)分布式的基于push-subscribe的消息系統(tǒng),它具備快速、可擴(kuò)展、可持久化的特點(diǎn)。它現(xiàn)在是Apache旗下的一個(gè)開源系統(tǒng),作為hadoop生態(tài)系統(tǒng)的一部分,被各種商業(yè)公司廣泛應(yīng)用。它的最大的特性就是可以實(shí)時(shí)的處理大量數(shù)據(jù)以滿足各種需求場(chǎng)景:比如基于hadoop的批處理系統(tǒng)、低延遲的實(shí)時(shí)系統(tǒng)、storm/spark流式處理引擎。

特性
高吞吐量、低延遲:kafka每秒可以處理幾十萬條消息,它的延遲最低只有幾毫秒
可擴(kuò)展性:kafka集群支持熱擴(kuò)展
持久性、可靠性:消息被持久化到本地磁盤,并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失敗(若副本數(shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失?。?/p>
高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
架構(gòu)組件
Kafka中發(fā)布訂閱的對(duì)象是topic。我們可以為每類數(shù)據(jù)創(chuàng)建一個(gè)topic,把向topic發(fā)布消息的客戶端稱作producer,從topic訂閱消息的客戶端稱作consumer。Producers和consumers可以同時(shí)從多個(gè)topic讀寫數(shù)據(jù)。一個(gè)kafka集群由一個(gè)或多個(gè)broker服務(wù)器組成,它負(fù)責(zé)持久化和備份具體的kafka消息。
Topic:消息存放的主題
Producer:生產(chǎn)者,負(fù)責(zé)push消息到指定的Broker的Topic中
Consumer:消費(fèi)者
Broker:Kafka的服務(wù)實(shí)例,主要負(fù)責(zé)創(chuàng)建Topic,存儲(chǔ)Producer所發(fā)布的消息,記錄消息處理過程,先將消息保存到內(nèi)存中,再落地到磁盤
Replication-factor:復(fù)制因子(kafka容錯(cuò)機(jī)制),即副本數(shù)量,一般與Broker數(shù)量保持一致
Partitions:分區(qū)(每個(gè)分區(qū)只能被同一個(gè)消費(fèi)者組內(nèi)的一個(gè)消費(fèi)者消費(fèi))
Leaders:選舉partition中的副本為leader,負(fù)責(zé)處理讀寫
Follower: 未被選舉為leader的副本為follower,負(fù)責(zé)備份數(shù)據(jù)
Zookeeper:注冊(cè)消息,用于保存broker節(jié)點(diǎn)信息,維護(hù)ISR和OSR隊(duì)列,partition的offset等(ZK在kafka2.8版本之后移除,offset保存在本地)
副本同步機(jī)制
kafka 0.8以后,提供了HA機(jī)制,就是replica副本機(jī)制。每個(gè)partition的數(shù)據(jù)都會(huì)同步到其他機(jī)器上,形成自己的多個(gè)replica副本。然后所有replica會(huì)選舉一個(gè)leader出來,那么生產(chǎn)和消費(fèi)都跟這個(gè)leader打交道,然后其他replica就是follower。寫的時(shí)候,leader會(huì)負(fù)責(zé)把數(shù)據(jù)同步到所有follower上去,讀的時(shí)候就直接讀leader上數(shù)據(jù)即可。
[圖片上傳失敗...(image-6f7b38-1743417387765)]
Kafka 生產(chǎn)者ack機(jī)制
-
ack=0:
意義:生產(chǎn)者在成功將消息發(fā)送給服務(wù)端之后不等待任何確認(rèn)
結(jié)果:生產(chǎn)者不知道消息是否成功到達(dá)服務(wù)端,容易造成消息丟失
-
ack=1:
意義:生產(chǎn)者在成功將消息發(fā)送給服務(wù)端之后,等待leader的確認(rèn)
結(jié)果:意味著leader節(jié)點(diǎn)成功接收數(shù)據(jù),但是不一定同步了其他副本
-
ack=all 或 ack=-1:
意義:生產(chǎn)者在成功將消息發(fā)送給服務(wù)端之后,等待所有partition的副本確認(rèn)
結(jié)果:當(dāng)所有副本都接收到數(shù)據(jù)之后,才認(rèn)為消息被成功提交
Kafka 如果設(shè)置了ack(ack!=0),需要設(shè)置超時(shí)時(shí)間(retry.backoff.ms)和重試次數(shù)(retries)
ack過程中出現(xiàn)網(wǎng)絡(luò)問題,導(dǎo)致生產(chǎn)者重復(fù)發(fā)送消息怎么處理?
重復(fù)產(chǎn)生的核心原因
-
超時(shí)與重試機(jī)制 當(dāng) Producer 發(fā)送消息后,若在
request.timeout.ms(默認(rèn) 30 秒)內(nèi)未收到 Broker 的ack確認(rèn),Producer 會(huì)認(rèn)為消息發(fā)送失敗并觸發(fā)重試(retries參數(shù)控制重試次數(shù))。 風(fēng)險(xiǎn)場(chǎng)景:Broker 實(shí)際已成功寫入消息,但
ack響應(yīng)因網(wǎng)絡(luò)延遲未及時(shí)到達(dá) Producer。Producer 重試時(shí),Broker 會(huì)再次寫入相同的消息(若未啟用冪等性),導(dǎo)致消息重復(fù)。
Broker 的寫入機(jī)制 Kafka 的 Partition 日志是僅追加(append-only)的,即使消息內(nèi)容相同,重試時(shí)也會(huì)被當(dāng)作新消息寫入(除非開啟冪等性)。
避免重復(fù)的解決方案
(1) 啟用 Producer 冪等性
配置方式: 在 Producer 中設(shè)置
enable.idempotence=true(需同時(shí)滿足acks=all和retries > 0)。-
原理:
Producer 為每個(gè)消息分配唯一
PID(Producer ID)和序列號(hào)。Broker 根據(jù)
PID + 分區(qū)號(hào) + 序列號(hào)去重,拒絕重復(fù)消息。
適用場(chǎng)景: 單 Producer 實(shí)例內(nèi)保證 Exactly-Once 語義,避免因重試導(dǎo)致的重復(fù)。
(2) 使用 Kafka 事務(wù)
-
配置方式:
- 設(shè)置
transactional.id并調(diào)用initTransactions()、beginTransaction()、commitTransaction()。
- 設(shè)置
-
原理:
事務(wù)機(jī)制結(jié)合冪等性,確保跨多個(gè) Partition 和 Consumer 的原子性操作。
Broker 會(huì)記錄事務(wù)狀態(tài),中斷的事務(wù)自動(dòng)回滾。
適用場(chǎng)景: 需要跨多個(gè)消息或“讀-處理-寫”鏈路的 Exactly-Once 語義(如 Kafka Streams)。
(3) 業(yè)務(wù)端去重
-
實(shí)現(xiàn)方式:
在消息中攜帶唯一標(biāo)識(shí)(如 UUID 或業(yè)務(wù)主鍵)。
消費(fèi)者端根據(jù)唯一標(biāo)識(shí)去重(如寫入數(shù)據(jù)庫時(shí)檢查主鍵沖突)。
適用場(chǎng)景: 無法啟用冪等性或事務(wù)的舊版本 Kafka,或需要兼容其他消息系統(tǒng)的場(chǎng)景。
ISR 機(jī)制
Kafka 服務(wù)端根據(jù)副本同步的情況,分成了三個(gè)集合:
AR(Assigned Replicas):包括ISR和OSR
ISR(In-sync Replicas):和leader保持同步的副本集合,可以認(rèn)為是可靠數(shù)據(jù)(leader故障之后會(huì)在這個(gè)集合重新選舉)
OSR(Out-sync Replicas):和leader副本同步失效的副本集合(副本還會(huì)繼續(xù)同步數(shù)據(jù),同步成功之后會(huì)進(jìn)入ISR)
Kafka 數(shù)據(jù)持久化機(jī)制
Kafka 默認(rèn)使用操作系統(tǒng)的頁緩存(page cache)機(jī)制,將消息寫入內(nèi)存中的頁緩存,而非立即刷盤(高吞吐的原因)
默認(rèn)情況下,Kafka 通過以下兩個(gè)參數(shù)控制主動(dòng)刷盤:
log.flush.interval.messages:累計(jì)多少條消息后觸發(fā)刷盤(默認(rèn)Long.MAX_VALUE,即不主動(dòng)觸發(fā))log.flush.interval.ms:間隔多少毫秒后觸發(fā)刷盤(默認(rèn)null,即不主動(dòng)觸發(fā)) 實(shí)際默認(rèn)行為:依賴操作系統(tǒng)后臺(tái)線程(如 Linux 的pdflush)自動(dòng)刷盤,通常延遲約 30 秒
Broker 端配置
-
強(qiáng)制刷盤 修改
log.flush.interval.messages和log.flush.interval.ms,例如:log.flush.interval.messages=1 # 每條消息都刷盤(極端情況,性能極低)
log.flush.interval.ms=1000 # 每秒刷盤一次
此配置會(huì)顯著降低吞吐量,僅適用于對(duì)數(shù)據(jù)丟失零容忍的場(chǎng)景。
Kafka 數(shù)據(jù)模型與消息存儲(chǔ)機(jī)制
[圖片上傳失敗...(image-4d351-1743417387765)]
在同一個(gè)Topic下,一個(gè)partition對(duì)應(yīng)一個(gè)唯一的文件夾,在文件夾下,kafka 消息是采用 Segment File的存儲(chǔ)方式進(jìn)行存儲(chǔ)
Segment File:Segment由.index文件和.log文件組成,將大文件拆分成小文件來存儲(chǔ)(加快IO加載)
.index文件:索引文件,以key-value格式存儲(chǔ),key-表示索引文件的第幾條消息;value-表示這條消息在log file中的物理偏移量
.log文件:數(shù)據(jù)存儲(chǔ)文件
如何通過 offset 找到 某一條消息呢?
首先會(huì)根據(jù) offset 值去查找 Segment 中的 index 文件,因?yàn)?index 文件是以上個(gè)文件的最大 offset 偏移命名的所以可以通過二分法快速定位到索引文件。
找到索引文件后,索引文件中保存的是 offset 和對(duì)應(yīng)的消息行在 log 日志中的存儲(chǔ)行號(hào),因?yàn)?Kafka 采用稀疏矩陣的方式來存儲(chǔ)索引信息,并不是每一條索引都存儲(chǔ),所以這里只是查到文件中符合當(dāng)前 offset 范圍的索引。
拿到 當(dāng)前查到的范圍索引對(duì)應(yīng)的行號(hào)之后再去對(duì)應(yīng)的 log 文件中從 當(dāng)前 Position 位置開始查找 offset 對(duì)應(yīng)的消息,直到找到該 offset 為止。
每一條消息的組成內(nèi)容有如下字段:
Copy
offset: 4964(邏輯偏移量)
position: 75088(物理偏移量)
CreateTime: 1545203239308(創(chuàng)建時(shí)間)
isvalid: true(是否有效)
keysize: -1(鍵大小)
valuesize: 9(值大小)
magic: 2
compresscodec: NONE(壓縮編碼)
producerId: -1
producerEpoch: -1(epoch號(hào))
sequence: -1(序號(hào))
isTransactional: false(是否事務(wù))
headerKeys: []
payload: message_0(消息的具體內(nèi)容)
Kafka pull & push
pull是指consumer來拉取消息(默認(rèn));push是指kafka主動(dòng)推送消息給consumer
pull模式:根據(jù)consumer的消費(fèi)能力進(jìn)行數(shù)據(jù)拉取,可以批量拉或者單條拉;可以設(shè)置不同的提交方式實(shí)現(xiàn)不同的傳輸語義
缺點(diǎn):如果kafka沒有數(shù)據(jù)會(huì)導(dǎo)致consumer空循環(huán)(可以通過設(shè)置拉取數(shù)據(jù)為空或沒達(dá)到一定數(shù)量阻塞來解決)
push模式:不會(huì)導(dǎo)致consumer循環(huán)等待
缺點(diǎn):速率固定、忽略consumer的消費(fèi)能力,可能導(dǎo)致拒絕服務(wù)或網(wǎng)絡(luò)擁堵等情況
kafka常見問題
1、消息冪等性(重復(fù)消費(fèi))
重復(fù)消費(fèi)只針對(duì)消費(fèi)者端而言,消費(fèi)者要保證消息的冪等性,一般要結(jié)合業(yè)務(wù)場(chǎng)景進(jìn)行,主要有兩種解決方案:
1)redis:消費(fèi)數(shù)據(jù)后把消息的唯一鍵存到redis中,每次消費(fèi)的時(shí)候去redis查一下key是否存在
2)mysql:消費(fèi)數(shù)據(jù)后把消息的唯一鍵存到mysql中,每次消費(fèi)的時(shí)候查mysql
2、數(shù)據(jù)丟失
消費(fèi)者:消費(fèi)者自動(dòng)提交offset的時(shí)候會(huì)有數(shù)據(jù)丟失的情況,改為手動(dòng)提交offset可解決,極端情況下數(shù)據(jù)處理完后提交offset的時(shí)候掛了,可通過加冪等性操作解決
kafka:kafka某個(gè)broker宕機(jī),重新選舉leader時(shí)有數(shù)據(jù)未同步時(shí)會(huì)有消息丟失的情況,可通過設(shè)置參數(shù)(保證數(shù)據(jù)寫入到每個(gè)副本后才算寫入成功)解決
生產(chǎn)者:設(shè)置acks=all即可避免丟失數(shù)據(jù)
3、順序性
生產(chǎn)者:生產(chǎn)者沒有順序性問題
kafka:partition內(nèi)部有序,多個(gè)partition時(shí)無法保證順序,對(duì)需要順序消費(fèi)的數(shù)據(jù)指定到同一個(gè)partition即可保證順序
消費(fèi)者:同一個(gè)partition時(shí),當(dāng)消費(fèi)者內(nèi)部啟用多線程時(shí)會(huì)導(dǎo)致順序錯(cuò)亂,可在消費(fèi)者內(nèi)部啟用內(nèi)存隊(duì)列來保證多線程的順序
4、kafka 高性能高吞吐原因
-
磁盤順序讀寫:
Kafka 將消息按順序追加(Append)到 Partition 日志文件末尾,避免機(jī)械硬盤磁頭頻繁尋道
消費(fèi)者按偏移量(Offset)順序讀取消息,充分利用操作系統(tǒng)的磁盤預(yù)讀
頁緩存(page cache):消息直接寫入內(nèi)存頁緩存,由操作系統(tǒng)異步刷盤,讀操作優(yōu)先訪問緩存,減少物理磁盤 I/O
-
零拷貝:
傳統(tǒng)數(shù)據(jù)拷貝流程: 磁盤文件 → 內(nèi)核緩沖區(qū) → 用戶空間緩沖區(qū) → Socket 緩沖區(qū) → 網(wǎng)卡 4 次上下文切換 + 2 次 CPU 拷貝
Kafka 優(yōu)化(
sendfile系統(tǒng)調(diào)用): 磁盤文件 → 內(nèi)核緩沖區(qū) → 網(wǎng)卡 2 次上下文切換 + 0 次 CPU 拷貝,減少 50% 以上 CPU 消耗
-
分區(qū)分段+索引:分區(qū)是指partition,分段是指segment文件,索引是指segment里的.index索引
-
分區(qū)(Partition)機(jī)制
數(shù)據(jù)分片:每個(gè) Topic 劃分為多個(gè) Partition,分布在不同 Broker 上。
并行讀寫:生產(chǎn)者和消費(fèi)者可同時(shí)向多個(gè) Partition 寫入/讀取數(shù)據(jù),充分利用集群資源。
擴(kuò)展性:通過增加 Partition 和 Broker,線性提升吞吐量。
-
生產(chǎn)者負(fù)載均衡
-
分區(qū)策略:
默認(rèn)輪詢(Round Robin)或按 Key 哈希,確保數(shù)據(jù)均勻分布。
自定義策略支持業(yè)務(wù)特化路由。
-
-
消費(fèi)者組(Consumer Group)
并行消費(fèi):同一消費(fèi)者組內(nèi)多個(gè)消費(fèi)者實(shí)例分別消費(fèi)不同 Partition,提升消費(fèi)速度。
水平擴(kuò)容:增加消費(fèi)者數(shù)量即可擴(kuò)展消費(fèi)能力(需 Partition 數(shù)量 ≥ 消費(fèi)者數(shù)量)。
-
-
批量處理(Batching)
-
生產(chǎn)者端:
消息積累到
batch.size(默認(rèn) 16KB)或linger.ms(默認(rèn) 0ms)后批量發(fā)送,減少網(wǎng)絡(luò)請(qǐng)求次數(shù)。壓縮(Compression):支持 GZIP、Snappy、LZ4、Zstandard,降低網(wǎng)絡(luò)傳輸量(尤其文本類消息)。
-
Broker 端:
- 批量寫入磁盤,減少磁盤尋址次數(shù)。
-
消費(fèi)者端:
- 批量拉取消息(
max.poll.records控制單次拉取數(shù)量)。
- 批量拉取消息(
-
直接操作頁緩存(page cache):消息直接寫入內(nèi)存頁緩存,由操作系統(tǒng)異步刷盤,讀操作優(yōu)先訪問緩存,減少物理磁盤 I/O