1. Kafka工作流程及文件存儲(chǔ)機(jī)制

Kafka中消息是以topic進(jìn)行分類的,生產(chǎn)者生產(chǎn)消息,消費(fèi)者消費(fèi)消息,都是面向topic的。
topic是邏輯上的概念,而partition是物理上的概念,每個(gè)partition對(duì)應(yīng)于一個(gè)log文件,該log文件中存儲(chǔ)的就是producer生產(chǎn)的數(shù)據(jù)。Producer生產(chǎn)的數(shù)據(jù)會(huì)被不斷追加到該log文件末端,且每條數(shù)據(jù)都有自己的offset。消費(fèi)者組中的每個(gè)消費(fèi)者,都會(huì)實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset,以便出錯(cuò)恢復(fù)時(shí),從上次的位置繼續(xù)消費(fèi)。

由于生產(chǎn)者生產(chǎn)的消息會(huì)不斷追加到log文件末尾,為防止log文件過大導(dǎo)致數(shù)據(jù)定位效率低下,Kafka采取了分片和索引機(jī)制,將每個(gè)partition分為多個(gè)segment。每個(gè)segment對(duì)應(yīng)兩個(gè)文件——“.index”文件和“.log”文件。這些文件位于一個(gè)文件夾下,該文件夾的命名規(guī)則為:topic名稱+分區(qū)序號(hào)。例如,first這個(gè)topic有三個(gè)分區(qū),則其對(duì)應(yīng)的文件夾為first-0,first-1,first-2。
index和log文件以當(dāng)前segment的第一條消息的offset命名。下圖為index文件和log文件的結(jié)構(gòu)示意圖。

“.index”文件存儲(chǔ)大量的索引信息,“.log”文件存儲(chǔ)大量的數(shù)據(jù),索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中message的物理偏移地址。
2. Kafka生產(chǎn)者
2.1 分區(qū)策略
分區(qū)的原因
(1)方便在集群中擴(kuò)展,每個(gè)Partition可以通過調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
(2)可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫了。
分區(qū)的原則
我們需要將producer發(fā)送的數(shù)據(jù)封裝成一個(gè)ProducerRecord對(duì)象。

- 指明 partition 的情況下,直接將指明的值直接作為 partiton 值;
- 沒有指明 partition 值但有 key 的情況下,將 key 的 hash 值與 topic 的 partition 數(shù)進(jìn)行取余得到 partition 值;
- 既沒有 partition 值又沒有 key 值的情況下,第一次調(diào)用時(shí)隨機(jī)生成一個(gè)整數(shù)(后面每次調(diào)用在這個(gè)整數(shù)上自增),將這個(gè)值與 topic 可用的 partition 總數(shù)取余得到 partition 值,也就是常說的 round-robin 算法。
2.2 數(shù)據(jù)可靠性保證
為保證producer發(fā)送的數(shù)據(jù),能可靠的發(fā)送到指定的topic,topic的每個(gè)partition收到producer發(fā)送的數(shù)據(jù)后,都需要向producer發(fā)送ack(acknowledgement確認(rèn)收到),如果producer收到ack,就會(huì)進(jìn)行下一輪的發(fā)送,否則重新發(fā)送數(shù)據(jù)。

2.2.1 副本數(shù)據(jù)同步策略
| 方案 | 優(yōu)點(diǎn) | 缺點(diǎn) |
|---|---|---|
| 半數(shù)以上完成同步,就發(fā)送ack | 延遲低 | 選舉新的leader時(shí),容忍n臺(tái)節(jié)點(diǎn)的故障,需要2n+1個(gè)副本 |
| 全部完成同步,才發(fā)送ack | 選舉新的leader時(shí),容忍n臺(tái)節(jié)點(diǎn)的故障,需要n+1個(gè)副本 | 延遲高 |
Kafka選擇了第二種方案,原因如下:
- 同樣為了容忍n臺(tái)節(jié)點(diǎn)的故障,第一種方案需要2n+1個(gè)副本,而第二種方案只需要n+1個(gè)副本,而Kafka的每個(gè)分區(qū)都有大量的數(shù)據(jù),第一種方案會(huì)造成大量數(shù)據(jù)的冗余。
- 雖然第二種方案的網(wǎng)絡(luò)延遲會(huì)比較高,但網(wǎng)絡(luò)延遲對(duì)Kafka的影響較小。
2.2.2 ISR
采用第二種方案之后,設(shè)想以下情景:leader收到數(shù)據(jù),所有follower都開始同步數(shù)據(jù),但有一個(gè)follower,因?yàn)槟撤N故障,遲遲不能與leader進(jìn)行同步,那leader就要一直等下去,直到它完成同步,才能發(fā)送ack。這個(gè)問題怎么解決呢?
Leader維護(hù)了一個(gè)動(dòng)態(tài)的in-sync replica set (ISR),意為和leader保持同步的follower集合。當(dāng)ISR中的follower完成數(shù)據(jù)的同步之后,leader就會(huì)給follower發(fā)送ack。如果follower長時(shí)間未向leader同步數(shù)據(jù),則該follower將被踢出ISR,該時(shí)間閾值由replica.lag.time.max.ms參數(shù)設(shè)定。Leader發(fā)生故障之后,就會(huì)從ISR中選舉新的leader。
2.2.3 ack應(yīng)答機(jī)制
對(duì)于某些不太重要的數(shù)據(jù),對(duì)數(shù)據(jù)的可靠性要求不是很高,能夠容忍數(shù)據(jù)的少量丟失,所以沒必要等ISR中的follower全部接收成功。
所以Kafka為用戶提供了三種可靠性級(jí)別,用戶根據(jù)對(duì)可靠性和延遲的要求進(jìn)行權(quán)衡,選擇以下的配置。
acks參數(shù)配置:acks:
- 0:producer不等待broker的ack,這一操作提供了一個(gè)最低的延遲,broker一接收到還沒有寫入磁盤就已經(jīng)返回,當(dāng)broker故障時(shí)有可能丟失數(shù)據(jù);
- producer等待broker的ack,partition的leader落盤成功后返回ack,如果在follower同步成功之前l(fā)eader故障,那么將會(huì)丟失數(shù)據(jù);
- -1(all):producer等待broker的ack,partition的leader和follower全部落盤成功后才返回ack。但是如果在follower同步完成后,broker發(fā)送ack之前,leader發(fā)生故障,那么會(huì)造成數(shù)據(jù)重復(fù)。


2.2.4 故障處理細(xì)節(jié)

- follower故障
- follower發(fā)生故障后會(huì)被臨時(shí)踢出ISR,待該follower恢復(fù)后,follower會(huì)讀取本地磁盤記錄的上次的HW,并將log文件高于HW的部分截取掉,從HW開始向leader進(jìn)行同步。等該follower的LEO大于等于該P(yáng)artition的HW,即follower追上leader之后,就可以重新加入ISR了。
- leader故
- leader發(fā)生故障之后,會(huì)從ISR中選出一個(gè)新的leader,之后,為保證多個(gè)副本之間的數(shù)據(jù)一致性,其余的follower會(huì)先將各自的log文件高于HW的部分截掉,然后從新的leader同步數(shù)據(jù)。
3. Kafka消費(fèi)者
3.1 消費(fèi)方式
consumer采用pull(拉)模式從broker中讀取數(shù)據(jù)。
push(推)模式很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由broker決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成consumer來不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而pull模式則可以根據(jù)consumer的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
pull模式不足之處是,如果kafka沒有數(shù)據(jù),消費(fèi)者可能會(huì)陷入循環(huán)中,一直返回空數(shù)據(jù)。針對(duì)這一點(diǎn),Kafka的消費(fèi)者在消費(fèi)數(shù)據(jù)時(shí)會(huì)傳入一個(gè)時(shí)長參數(shù)timeout,如果當(dāng)前沒有數(shù)據(jù)可供消費(fèi),consumer會(huì)等待一段時(shí)間之后再返回,這段時(shí)長即為timeout。
3.2 分區(qū)分配策略
一個(gè)consumer group中有多個(gè)consumer,一個(gè) topic有多個(gè)partition,所以必然會(huì)涉及到partition的分配問題,即確定那個(gè)partition由哪個(gè)consumer來消費(fèi)。
Kafka有兩種分配策略,一是roundrobin,一是range。
roundrobin : 輪詢機(jī)制,動(dòng)態(tài)平均分配
range: 固定等額分配,容易產(chǎn)生分配不均
3.3 offset的維護(hù)
由于consumer在消費(fèi)過程中可能會(huì)出現(xiàn)斷電宕機(jī)等故障,consumer恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以consumer需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè)offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。
Kafka 0.9版本之前,consumer默認(rèn)將offset保存在Zookeeper中,從0.9版本開始,consumer默認(rèn)將offset保存在Kafka一個(gè)內(nèi)置的topic中,該topic為__consumer_offsets。
3.4 Kafka 高效讀寫數(shù)據(jù)
3.4.1 順序?qū)懘疟P
Kafka的producer生產(chǎn)數(shù)據(jù),要寫入到log文件中,寫的過程是一直追加到文件末端,為順序?qū)憽9倬W(wǎng)有數(shù)據(jù)表明,同樣的磁盤,順序?qū)懩艿降?00M/s,而隨機(jī)寫只有100k/s。這與磁盤的機(jī)械機(jī)構(gòu)有關(guān),順序?qū)懼钥?,是因?yàn)槠涫∪チ舜罅看蓬^尋址的時(shí)間。
3.4.2 零復(fù)制技術(shù)

免去了對(duì)用戶端的讀寫流程。
3.5 Zookeeper在Kafka中的作用
Kafka集群中有一個(gè)broker會(huì)被選舉為Controller,負(fù)責(zé)管理集群broker的上下線,所有topic的分區(qū)副本分配和leader選舉等工作。
Controller的管理工作都是依賴于Zookeeper的。
以下為partition的leader選舉過程:
