kafka 作為一種開源的分布式消息隊列, 在生產(chǎn)環(huán)境中被大量的使用. 主要用于對系統(tǒng)的流量削峰填谷和做一些系統(tǒng)架構(gòu)上的解耦.
相關名詞
首先介紹一下kafka相關的名詞:
broker: 一般一個kafka集群有多個broker節(jié)點, broker集群組成了kafka集群
topic: 邏輯上存在的概念, 數(shù)據(jù)投遞到某個topic
partition: 一個topic的數(shù)據(jù)由一個或多個partition保存, 每個partition是一個有序的隊列, 同時每個partition都有備份的partition, 一組partition中會選舉出一個leader partition用來做讀寫的操作, 這個leader partition用來跟consumer 和 provider進行數(shù)據(jù)交互, 這也是leader-follower模式的好處, 能夠保證數(shù)據(jù)的有序性. follower partition會去leader partition中fetch數(shù)據(jù). 如果一旦leader partition出現(xiàn)故障, follower partition中還有備份數(shù)據(jù), 可以從follower partition中再選舉出一個leader partition繼續(xù)提供服務.當然這里還有一個問題就是, 如果leader partition發(fā)生故障以后, 其他follower partition沒能fetch到最新的數(shù)據(jù), 有一部分數(shù)據(jù)丟失的話這怎么辦? 我們先把問題拋出來, 后面在介紹.
segment: partition由多個segment組成. partition只是一個文件夾, 實際的數(shù)據(jù)保存在segment中. segment包含兩種文件, 索引文件(.index)和數(shù)據(jù)文件(.log), 在索引文件的命名是最后一條消息的偏移量, 數(shù)據(jù)文件的命名和索引文件保持一致.
offset: 消息消費進度的偏移值
consumer: 消息消費者
consumer group: 消息消費者組, 一條消息可以被多個consumer group消費, 一個consumer group只能有一個consumer消費消息
replicas: patition的副本, 保障partition的高可用
leader: replicas的一個角色, 一個partition只有一個leader, 且通過這個leader partition和生產(chǎn)者, 消費者進行交互
follower: replicas中的一個角色, follower partition是 leader partition的備份, 從leader partition中拉取數(shù)據(jù), 一旦leader partition宕機會從follower partition中選舉出一個leader partition繼續(xù)提供服務.
controller: 用來進行Leader partition的選舉和各種failover
Kafka 文件存儲
Partition
Topic是一個邏輯概念, 我們可以認為是一類消息的分組. Partition之于Topic是存儲數(shù)據(jù)的物理存在. 一個Topic可以進行分片成多個partition, 這是為了實現(xiàn)負載均衡和增強可擴展性, 從存儲層面來看, partition的數(shù)據(jù)是日志追加的方式. 當producer發(fā)送消息時根據(jù)一定的路由規(guī)則(隨機, key-hash, 輪詢算法等), 決定保存到具體的某一個partition中,并順序的寫入到磁盤中. 所以, 但對某個partition而言, 消息是有序的. partition文件包含兩部分, 一部分是索引文件, 一部分是數(shù)據(jù)文件. 索引文件記錄的是消息的偏移量, 能夠定位到具體的某一個消息保存在數(shù)據(jù)文件的哪個位置.

我們可以在server.properties中配置partition的數(shù)量, num.partition=1 .當指定了partition數(shù)量后, kafka會均勻的將partition分配到不同的broker中. partition的數(shù)量決定了這個topic的吞吐率
Segment
Partition并不是最終存儲數(shù)據(jù)的最小粒度, Segement才是最終保存數(shù)據(jù)的最小單元.在server.properties中, 我們可以指定log.dirs來指定partition的存儲位置.partition文件的命名規(guī)則是: topic name + 索引號, 索引范圍[0, num.partitions - 1].
我們來舉個例子, 下面是一個partition文件夾的示例:
00000000000000000000.index
00000000000000000000.log
00000000000000170410.index
00000000000000170410.log
00000000000000239430.index
00000000000000239430.log
前面我們提到segment文件包含兩部分, 一部分是索引文件(.index), 一部分是數(shù)據(jù)文件(.log), 索引文件保存的是元數(shù)據(jù). 在命名上也頗為講究, 文件名含有消息的偏移量, 以00000000000000170410.index和00000000000000170410.log為例

索引文件的元數(shù)據(jù)指向的是對應數(shù)據(jù)文件中消息的物理偏移地址, 有了消息的物理地址, 就可以訪問對應的消息了. 如元數(shù)據(jù)[2, 365]為例, 表示第二個消息,也是該partition的第170410+2=170412條消息的物理偏移地址為365.舉個例子, 假設我們要查找的170414條消息, 我們可以發(fā)現(xiàn)其落在索引文件00000000000000170410.index中, 并根據(jù)該文件中的[4, 666], 定位到數(shù)據(jù)文件666的位置讀取即可. 這里又會涉及到一個問題, 如何判斷完整的讀完消息了, 也就是定位到一條消息的終止位置. kafka的消息具有一定的格式, 會記錄消息的偏移量和消息體的長度, 所以讀取的時候可以定位到消息的終止位置. kafka的消息格式:
baseOffset: int64 #偏移量
batchLength: int32 #消息體長度
partitionLeaderEpoch: int32
magic: int8 (current magic value is 2)
crc: int32
attributes: int16
bit 0~2:
0: no compression
1: gzip
2: snappy
3: lz4
bit 3: timestampType
bit 4: isTransactional (0 means not transactional)
bit 5: isControlBatch (0 means not a control batch)
bit 6~15: unused
lastOffsetDelta: int32
firstTimestamp: int64
maxTimestamp: int64
producerId: int64
producerEpoch: int16
baseSequence: int32
records: [Record]
Kafka 存儲的可靠性
為了提高可用性, kafka也采用了副本機制. 每個partition都會配置一定的副本, 我們可以在broker的server.properties文件中設置default.replication.factor=n n >= 2 來設置復制因子.基于多副本機制, kafka可實現(xiàn)自動故障轉(zhuǎn)移.
- Leader: 響應client端讀寫請求的副本, producer和consumer只和Leader交互
- Follower: 被動地備份Leader數(shù)據(jù), 不能響應client端讀寫請求, 一旦Leader掛掉, 會從他的Follower中選舉一個新的Leader進行服務
- ISR: 包含了Leader在內(nèi)的其他與Leader數(shù)據(jù)保持同步的副本, ISR集合是一個動態(tài)集合, 可以將長時間(閾值)沒和Leader同步數(shù)據(jù)的Follower從集合中剔除.ISR由Leader進行維護跟蹤
AR & ISR & OSR
- AR: 全部副本的集合. AR = ISR + OSR
- ISR: Leader+可被選舉成為Leader的Follower動態(tài)集合
- OSR: 從ISR中剔除的Follower動態(tài)集合, 可以嘗試加入到ISR
假設我們將復制因子設置成2, 也就是有一個leader和一個follower. 在broker啟動時, 此時AR=ISR,OSR為空集合. 一旦Leader發(fā)生宕機或是服務不可用, kafka將會從ISR中選舉出一個新的Leader繼續(xù)提供服務. Leader partition會跟蹤Follower, 如果某個Follower宕機或是落后太多, 將會被移除ISR列表.落后太多的定義有兩種:
- follower復制的消息落后于leader預設的閾值,在server.properties配置中的
replica.lag.max.messages來配置 - 超過一定時間, leader沒有收到follower來pull消息的請求, 可以通過
replica.lag.time.max.ms來配置.
HW & LEO
- LEO: 日志末端位移(Log End Offset). 是收到的最新消息位移
- HW: 水位值, 對于同一個副本而言, HW值小于等于LEO, 小于等于HW的消息都是被認為已經(jīng)備份了, 對于一個partition而言, 其ISR中最小的LEO作為其HW, Consumer也最多只能消費到HW所在的位置. 每個Replica都有HW, 他們各自負責更新自己的HW.
我們來舉例說明HW和LEO.

Kafka的副本機制并不是完全采用同步機制和異步機制, 所謂同步機制就是指, 當Leader partition收到消息時, 需要保證所有的Follower partition都完成了復制, 才對producer響應成功, 這樣就依賴最慢的副本的HW. 這種方式看起來比較可靠, 但是會降低吞吐率, 影響到系統(tǒng)的性能. 異步機制是指只需要保證Leader partition收到消息后, 就響應producer成功(常見的一種主從模式), 如果此時Leader partition發(fā)生宕機, 則可能發(fā)生消息丟失, 這在一些系統(tǒng)上是不被允許的.
消息刷盤
kafka 通過持久化消息到磁盤來保障消息存儲的可靠性, 但是有個矛盾點, 頻繁的刷盤會導致性能降低, 但是刷盤時間間隔過長又存在丟消息的風險. kafka提供了log.flush.interval.ms 和 log.flush.interval.messages 兩個參數(shù)來控制 Broker 的刷盤時機.
-
log.flush.interval.ms: 默認null, 單位ms, 用于控制日志刷盤的時間間隔 -
log.flush.interval.messages: 用于控制日志刷盤的消息量, 即每累計多少條消息后寫到磁盤上.
推薦配置:
#每當producer寫入10000條消息時,刷數(shù)據(jù)到磁盤
log.flush.interval.messages=10000
#每間隔1秒鐘時間,刷數(shù)據(jù)到磁盤
log.flush.interval.ms=1000
Kafka 消息的可靠性
kafka消息的可靠性貫穿了消息的生產(chǎn), 發(fā)送, 存儲, 到消費的全鏈路.
消息生產(chǎn)可靠性
要保證不丟消息先從消息的源頭開始講起. Producer發(fā)送消息有三中方式: Sync(同步), Async(異步), OneWay.通過producer.type來配置, 默認是值sync.
同步對比異步
同步
一條消息發(fā)送到partition需要保證Leader partition收到消息, 并且所有的Follower partition完成消息同步, 能夠保證不丟消息, 但是吞吐率會下降.
異步
異步發(fā)送到一個緩沖區(qū), 緩存區(qū)的數(shù)據(jù)沒發(fā)送出去就返回給client了.
異步發(fā)送的方式可以批量(batch)發(fā)送, 這樣可以提高broker的性能和吞吐量, 減少網(wǎng)絡IO和磁盤請求次數(shù),但是可能會有丟數(shù)據(jù)的風險.
OneWay是異步方式的一種, 他不接收Broker返回的ack值, 只管發(fā), 不管異常.
request.required.acks
我們先來看看producer發(fā)消息的流程:
- producer往topic中發(fā)消息的時候首先通過zk找到Leader partition
- Leader partition收到消息以后會先將消息寫到log文件中, 數(shù)據(jù)還保存在內(nèi)存中
- Follower partition被動從Leader partition中pull消息后也將數(shù)據(jù)寫到其本地的log文件中, 這個時候數(shù)據(jù)也只保存在Follower partition的內(nèi)存中, 還沒寫到磁盤. 為了提高性能,立刻向leader發(fā)送一個ack. leader收到所有follower的ack消息, 則認為這條消息已經(jīng)commit了, 將會向producer發(fā)送一個ack消息.
上面的三個流程對應三個不同的ack時間節(jié)點, producer的ack值有三種配置方式, 我們可以通過request.required.acks屬性來配置:
- 0: producer不等待partition返回確認消息, 這樣可以得到最大的吞吐率, 但是可能丟消息
- 1: 等待Leader partition保存成功狀態(tài)便返回, 但是不會等待Follower partition同步完成, 有不錯的可靠性, 吞吐率也得到提升
- -1: 等待Follower partition都收到數(shù)據(jù), 這種方法看起來最可靠, 理論上不會丟消息
根據(jù)request.required.acks的三種設置方式, 我們發(fā)現(xiàn)request.required.acks=-1 看起來能夠保證不丟消息, 實際上也不能避免丟消息. 假設在如下場景中,我們設置 request.required.acks=-1 , 當Leader partition收到消息后, ISR中除去Leader partition以外的其他Follower副本都因為各種原因(宕機或是同步消息滯后超過閾值), 從ISR中被剔除. 此時ISR中僅剩下Leader partition, 這種情況并不能保證消息的可靠性.
因此, 我們需要對ISR列表中的最小副本數(shù)加以約束, 通過設置參數(shù)min.insync.replicas大于等于2, 來約束需要除去Leader partition以外的至少一個Follower partition同步了消息. 否則producer將收到異常:
org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。
綜上所述, 我們可以在producer中設置request.required.acks=-1和broker中設置min.insync.replicas=n n >= 2, 來提高消息的在生產(chǎn)過程中的可靠性.
producer的其他參數(shù)
-
batch.size: 當多條消息的partition相同時, producer會嘗試將他們組裝成一個批量消息, 一次性的發(fā)到partition中, 這樣可以減少請求次數(shù), 有助于提高性能.需要注意的是batch.size不建議設置的過大或過小, 過大會浪費內(nèi)存, 過小這可能降低吞吐量 -
linger.ms: 在批處理的場景下, 有可能消息生產(chǎn)的速率快于消息被發(fā)送到partition的速率, 可以通過設置linger.ms來延遲發(fā)送消息, 提高發(fā)送消息的效率.batch.size的優(yōu)先級高于linger.ms, 一旦消息積累的大小達到了batch.size的閾值, 即便沒有到linger.ms的時間消息也會被發(fā)送到partition. -
buffer.memory: producer用來緩沖等待發(fā)送給partition的消息的總字節(jié)數(shù), 也就是一個緩沖區(qū). 如果緩沖區(qū)被占滿, 之后producer將會被阻塞max.block.ms, 隨后拋出異常.buffer.memory的大小大致與producer可使用的總內(nèi)存相對應, 不是硬綁定, 因為并非producer使用的所有內(nèi)存都用于緩沖.一些額外的內(nèi)存用于壓縮和維護請求. -
retries: producer發(fā)送消息失敗后的重試次數(shù). 部分場景為了絕對可靠設置成Integer.MAX_VALUE -
max.in.flight.requests.per.connection: 在批量發(fā)送的場景下, 假設有兩個批量消息, 他們的目標partition相同, 順序分別為T1, T2. 當他們先后被發(fā)送到partition時, T1由于某些原因發(fā)送失敗了, 這樣就可能會導致消息的亂序. 這對于一些對時序很敏感的系統(tǒng)中是很致命的. 為了解決這種問題, 可以通過max.in.flight.requests.per.connection來設置單個鏈接上發(fā)送的未確認請求的最大數(shù)目. 如果設置大于1(默認是5), 并且發(fā)送失敗則存在由于重試而導致消息重新排序的風險. 所以, 在開啟重試的情況下(retries > 0), 為了規(guī)避消息被重新排序的風險, 建議修改配置max.in.flight.requests.per.connection=1. 這樣就能保證一個時間段內(nèi)只有個批量請求發(fā)到partition.
消費消息可靠性
consumer
Kafka的consumer和partition存在這么一種關系, consumer數(shù)量不應該多于partition的設置, 因為多出來的consumer并不能消費partition的消息, consumer也會固定消費某個或是某幾個partition的消息, 除非觸發(fā)rebalance后可能會導致consumer消費的partition發(fā)生變化. 此外, 一個partition只被被固定的consumer消費. consumer和partition的關系會被保存在zookeeper的節(jié)點中, 當初觸發(fā)rebalance時, 原有的關系節(jié)點將會被刪除, 保存新的綁定關系節(jié)點.
offset
Kafka offset記錄的是消息的消費進度, Kafka的offset有兩種保存方式:
通過配置參數(shù):
zookeeper.connect, 這種情況下, 消費進度會保存到zookeeper下的consumers/{group}/offsets/{topic}/{partition}目錄下.-
通過配置參數(shù):
bootstrap.servers, 這是通過kafka默認api的消費方式, offset會保存在kafka的一個默認topic__consumer_offsets. 查看當前group的消費進度, 要依靠kafka自帶的工具【kafka-consumer-offset-checker】.當前高級別版本的API接口都是通過第二種方式來記錄偏移量的
提交方式
消息commit的方式有兩種, 一種是自動提交enable.auto.commit=true, 一種是手動提交enable.auto.commit=false
自動提交
自動提交規(guī)則是, 消費者會每隔一定的時間間隔來自動提交一次該消費者進程的消費的所有partition的offset(由auto.commit.interval.ms指定).需要注意的是這種方式可能導致重復消費和丟消息的問題.
重復消費: 當consumer poll消息并消費后, 未到自動提交offset的時機,這時觸發(fā)了rebalance. rebalance后這個consumer依然消費原來的partition, 將從最后一個offset消費消息, 這個時候就出現(xiàn)了重復消費.
丟消息: 當consumer poll消息后, 業(yè)務上并沒有處理完, 但是觸發(fā)了commit的時機, 提交了offset, 恰巧的是出現(xiàn)了crash. 當rebalance以后, 這條消息就被丟掉了, 沒能成功消費.
所以, 最好還是采用手動提交的方式來避免rebalance帶來的重復消費和丟消息的問題.
手動提交
剛剛分析了自動提交的問題, 手動提交也不是沒有問題, 手動提交也可能導致一些異常.比如說, 一個consumer進程有兩個線程t1,t2 分別消費兩個partition分區(qū)p1, p2. 當t1消費完消息以后手動commit, 這個時候會提交consumer所有partition的offset, 也就是t2的offset也提交了, 但實際上t2這是可能只是poll了消息并沒有消費完成, 如果consumer進程這個時候發(fā)生crash, 那么t2消費的消息也會丟到. 目前的kafka控制offset粒度還是一個進程粒度.
那咋整呢?如果consumer數(shù)量和partition數(shù)量能夠一致, 那就不會有這個問題, 說白了就不會有一個consumer消費多個partition的情況. 或者, 我們可以poll消息以后放到一個隊列中, 將隊列中的所有消息消費掉以后提交一個批量的offset.
Kafka 全程解析
生產(chǎn)消息
- producer push消息: 基于負載均衡算法獲取到目標partition后, producer先從zk中獲取到該partition的Leader partition
- producer 將消息發(fā)給Leader partition
- Leader partition將消息追加到日志文件中
- Follower partition主動到Leader partition中poll 消息, 寫入本地log, 并向Leader partition發(fā)回一個ack消息
- 當Follower partition完成了同步, Leader partition更新HW, 并向Producer發(fā)送ack
- producer接收到ack, 確認發(fā)送成功
消息存儲
topic是一個邏輯概念, 而topic對應的partition是物理概念, partition并不是最小的存儲單元,其實是一個文件夾, 由多個segment組成,segment才是最小的存儲單元. segment文件分為 索引文件(.index)和數(shù)據(jù)文件(.log). 索引文件存儲元數(shù)據(jù)用來定位消息在數(shù)據(jù)文件的具體位置.
消息消費
consumer指定topic消費, 并將消息消費的偏移量記錄在一個固定的topic中. partition和consumer的關系是多對一的關系, 即一個partition只能被一個consumer消費, 一個consumer可以消費多個partition. 如果consumer的數(shù)據(jù)>partition數(shù)量, 多出來的consumer并不會消費任何的partition.