一、Kafka 簡介
Apache Kafka 是一個分布式發(fā)布-訂閱消息系統(tǒng)。是大數(shù)據(jù)領(lǐng)域消息隊列中唯一的王者。最初由 linkedin 公司使用 scala 語言開發(fā),在2010年貢獻給了Apache基金會并成為頂級開源項目。至今已有十余年,仍然是大數(shù)據(jù)領(lǐng)域不可或缺的并且是越來越重要的一個組件。
Kafka 適合離線和在線消息,消息保留在磁盤上,并在集群內(nèi)復制以防止數(shù)據(jù)丟失。kafka構(gòu)建在zookeeper同步服務之上。它與 Flink 和 Spark 有非常好的集成,應用于實時流式數(shù)據(jù)分析。
Kafka特點:
- 可靠性:具有副本及容錯機制。
- 可擴展性:kafka無需停機即可擴展節(jié)點及節(jié)點上線。
- 持久性:數(shù)據(jù)存儲到磁盤上,持久性保存。
- 性能:kafka具有高吞吐量。達到TB級的數(shù)據(jù),也有非常穩(wěn)定的性能。
- 速度快:順序?qū)懭牒土憧截惣夹g(shù)使得kafka延遲控制在毫秒級。
二、Kafka 主要組件
先看下 Kafka 系統(tǒng)的架構(gòu)

kafka支持消息持久化,消費端是主動拉取數(shù)據(jù),消費狀態(tài)和訂閱關(guān)系由客戶端負責維護,消息消費完后,不會立即刪除,會保留歷史消息。因此支持多訂閱時,消息只會存儲一份就可以。
- broker:kafka集群中包含一個或者多個服務實例(節(jié)點),這種服務實例被稱為broker(一個broker就是一個節(jié)點/一個服務器);
- topic:每條發(fā)布到kafka集群的消息都屬于某個類別,這個類別就叫做topic;
- partition:partition是一個物理上的概念,每個topic包含一個或者多個partition;
- segment:一個partition當中存在多個segment文件段,每個segment分為兩部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查詢, .log 文件當中數(shù)據(jù)的偏移量位置;
- producer:消息的生產(chǎn)者,負責發(fā)布消息到 kafka 的 broker 中;
- consumer:消息的消費者,向 kafka 的 broker 中讀取消息的客戶端;
- consumer group:消費者組,每一個 consumer 屬于一個特定的 consumer group(可以為每個consumer指定 groupName);
- .log:存放數(shù)據(jù)文件;
- .index:存放.log文件的索引數(shù)據(jù)。
2.1 producer(生產(chǎn)者)
producer主要是用于生產(chǎn)消息,是kafka當中的消息生產(chǎn)者,生產(chǎn)的消息通過topic進行歸類,保存到kafka的broker里面去。

2.2 topic(主題)
- kafka將消息以topic為單位進行歸類;
- topic特指kafka處理的消息源(feeds of messages)的不同分類;
- topic是一種分類或者發(fā)布的一些列記錄的名義上的名字。kafka主題始終是支持多用戶訂閱的;也就是說,一 個主題可以有零個,一個或者多個消費者訂閱寫入的數(shù)據(jù);
- 在kafka集群中,可以有無數(shù)的主題;
- 生產(chǎn)者和消費者消費數(shù)據(jù)一般以主題為單位。更細粒度可以到分區(qū)級別。
2.3 partition(分區(qū))
kafka當中,topic是消息的歸類,一個topic可以有多個分區(qū)(partition),每個分區(qū)保存部分topic的數(shù)據(jù),所有的partition當中的數(shù)據(jù)全部合并起來,就是一個topic當中的所有的數(shù)據(jù)。
一個broker服務下,可以創(chuàng)建多個分區(qū),broker數(shù)與分區(qū)數(shù)沒有關(guān)系; 在kafka中,每一個分區(qū)會有一個編號:編號從0開始。 每一個分區(qū)內(nèi)的數(shù)據(jù)是有序的,但全局的數(shù)據(jù)不能保證是有序的。(有序是指生產(chǎn)什么樣順序,消費時也是什么樣的順序)
2.4 consumer(消費者)
consumer是kafka當中的消費者,主要用于消費kafka當中的數(shù)據(jù),消費者一定是歸屬于某個消費組中的。
2.5 consumer group(消費者組)
消費者組由一個或者多個消費者組成,同一個組中的消費者對于同一條消息只消費一次。
每個消費者都屬于某個消費者組,如果不指定,那么所有的消費者都屬于默認的組。
每個消費者組都有一個ID,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費一個訂閱主題( topic)的所有分區(qū)(partition)。當然,每個分區(qū)只能由同一個消費組內(nèi)的一個消費者(consumer)來消費,可以由不同的消費組來消費。
partition數(shù)量決定了每個consumer group中并發(fā)消費者的最大數(shù)量。如下圖:

如上面左圖所示,如果只有兩個分區(qū),即使一個組內(nèi)的消費者有4個,也會有兩個空閑的。 如上面右圖所示,有4個分區(qū),每個消費者消費一個分區(qū),并發(fā)量達到最大4。
在來看如下一幅圖:

如上圖所示,不同的消費者組消費同一個topic,這個topic有4個分區(qū),分布在兩個節(jié)點上。左邊的 消費組1有兩個消費者,每個消費者就要消費兩個分區(qū)才能把消息完整的消費完,右邊的 消費組2有四個消費者,每個消費者消費一個分區(qū)即可。
總結(jié)下kafka中分區(qū)與消費組的關(guān)系:
消費組: 由一個或者多個消費者組成,同一個組中的消費者對于同一條消息只消費一次。 某一個主題下的分區(qū)數(shù),對于消費該主題的同一個消費組下的消費者數(shù)量,應該小于等于該主題下的分區(qū)數(shù)。
如:某一個主題有4個分區(qū),那么消費組中的消費者應該小于等于4,而且最好與分區(qū)數(shù)成整數(shù)倍 1 2 4 這樣。同一個分區(qū)下的數(shù)據(jù),在同一時刻,不能同一個消費組的不同消費者消費。
總結(jié):分區(qū)數(shù)越多,同一時間可以有越多的消費者來進行消費,消費數(shù)據(jù)的速度就會越快,提高消費的性能。
2.6 partition replicas(分區(qū)副本)
kafka 中的分區(qū)副本如下圖所示:

副本數(shù)(replication-factor):控制消息保存在幾個broker(服務器)上,一般情況下副本數(shù)等于broker的個數(shù)。
一個broker服務下,不可以創(chuàng)建多個副本因子。創(chuàng)建主題時,副本因子應該小于等于可用的broker數(shù)。
副本因子操作以分區(qū)為單位的。每個分區(qū)都有各自的主副本和從副本;
主副本叫做leader,從副本叫做 follower(在有多個副本的情況下,kafka會為同一個分區(qū)下的所有分區(qū),設定角色關(guān)系:一個leader和N個 follower),處于同步狀態(tài)的副本叫做in-sync-replicas(ISR);
follower通過拉的方式從leader同步數(shù)據(jù)。 消費者和生產(chǎn)者都是從leader讀寫數(shù)據(jù),不與follower交互。
副本因子的作用:讓kafka讀取數(shù)據(jù)和寫入數(shù)據(jù)時的可靠性。
副本因子是包含本身,同一個副本因子不能放在同一個broker中。
如果某一個分區(qū)有三個副本因子,就算其中一個掛掉,那么只會剩下的兩個中,選擇一個leader,但不會在其他的broker中,另啟動一個副本(因為在另一臺啟動的話,存在數(shù)據(jù)傳遞,只要在機器之間有數(shù)據(jù)傳遞,就會長時間占用網(wǎng)絡IO,kafka是一個高吞吐量的消息系統(tǒng),這個情況不允許發(fā)生)所以不會在另一個broker中啟動。
如果所有的副本都掛了,生產(chǎn)者如果生產(chǎn)數(shù)據(jù)到指定分區(qū)的話,將寫入不成功。
lsr表示:當前可用的副本。
2.7 segment文件
一個partition當中由多個segment文件組成,每個segment文件,包含兩部分,一個是 .log 文件,另外一個是 .index 文件,其中 .log 文件包含了我們發(fā)送的數(shù)據(jù)存儲,.index 文件,記錄的是我們.log文件的數(shù)據(jù)索引值,以便于我們加快數(shù)據(jù)的查詢速度。
索引文件與數(shù)據(jù)文件的關(guān)系
既然它們是一一對應成對出現(xiàn),必然有關(guān)系。索引文件中元數(shù)據(jù)指向?qū)獢?shù)據(jù)文件中message的物理偏移地址。
比如索引文件中 3,497 代表:數(shù)據(jù)文件中的第三個message,它的偏移地址為497。
再來看數(shù)據(jù)文件中,Message 368772表示:在全局partiton中是第368772個message。
注:segment index file 采取稀疏索引存儲方式,減少索引文件大小,通過mmap(內(nèi)存映射)可以直接內(nèi)存操作,稀疏索引為數(shù)據(jù)文件的每個對應message設置一個元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲空間,但查找起來需要消耗更多的時間。
.index 與 .log 對應關(guān)系如下:

上圖左半部分是索引文件,里面存儲的是一對一對的key-value,其中key是消息在數(shù)據(jù)文件(對應的log文件)中的編號,比如“1,3,6,8……”, 分別表示在log文件中的第1條消息、第3條消息、第6條消息、第8條消息……
那么為什么在index文件中這些編號不是連續(xù)的呢? 這是因為index文件中并沒有為數(shù)據(jù)文件中的每條消息都建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。 這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。 但缺點是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。
value 代表的是在全局partiton中的第幾個消息。
以索引文件中元數(shù)據(jù) 3,497 為例,其中3代表在右邊log數(shù)據(jù)文件中從上到下第3個消息, 497表示該消息的物理偏移地址(位置)為497(也表示在全局partiton表示第497個消息-順序?qū)懭胩匦?。
log日志目錄及組成
kafka在我們指定的log.dir目錄下,會創(chuàng)建一些文件夾;名字是 (主題名字-分區(qū)名) 所組成的文件夾。 在(主題名字-分區(qū)名)的目錄下,會有兩個文件存在,如下所示:
#索引文件
00000000000000000000.index
#日志內(nèi)容
00000000000000000000.log
在目錄下的文件,會根據(jù)log日志的大小進行切分,.log文件的大小為1G的時候,就會進行切分文件;如下:
-rw-r--r--. 1 root root 389k 1月 17 18:03 00000000000000000000.index
-rw-r--r--. 1 root root 1.0G 1月 17 18:03 00000000000000000000.log
-rw-r--r--. 1 root root 10M 1月 17 18:03 00000000000000077894.index
-rw-r--r--. 1 root root 127M 1月 17 18:03 00000000000000077894.log
在kafka的設計中,將offset值作為了文件名的一部分。
segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個全局 partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字就用 0 填充。
通過索引信息可以快速定位到message。通過index元數(shù)據(jù)全部映射到內(nèi)存,可以避免segment File的IO磁盤操作;
通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。
稀疏索引:為了數(shù)據(jù)創(chuàng)建索引,但范圍并不是為每一條創(chuàng)建,而是為某一個區(qū)間創(chuàng)建; 好處:就是可以減少索引值的數(shù)量。 不好的地方:找到索引區(qū)間之后,要得進行第二次處理。
2.8 message的物理結(jié)構(gòu)
生產(chǎn)者發(fā)送到kafka的每條消息,都被kafka包裝成了一個message
message 的物理結(jié)構(gòu)如下圖所示:

所以生產(chǎn)者發(fā)送給kafka的消息并不是直接存儲起來,而是經(jīng)過kafka的包裝,每條消息都是上圖這個結(jié)構(gòu),只有最后一個字段才是真正生產(chǎn)者發(fā)送的消息數(shù)據(jù)。
三、Kafka 生產(chǎn)者
3.1 消息發(fā)送方式
生產(chǎn)者發(fā)送給kafka數(shù)據(jù),可以采用同步方式或異步方式。
3.1.1 同步方式
發(fā)送一批數(shù)據(jù)給kafka后,等待kafka返回結(jié)果:
- 生產(chǎn)者等待10s,如果broker沒有給出ack響應,就認為失敗。
- 生產(chǎn)者重試3次,如果還沒有響應,就報錯
這類錯誤可以通過重發(fā)消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無主錯誤則可以通過重新為分區(qū)選舉首領(lǐng)來解決。
3.1.2 異步方式
同步發(fā)送消息都有個問題,那就是同一時間只能有一個消息在發(fā)送,這會造成許多消息無法直接發(fā)送,造成消息滯后,無法發(fā)揮效益最大化。
比如消息在應用程序和 Kafka 集群之間一個來回需要 10ms。如果發(fā)送完每個消息后都等待響應的話,那么發(fā)送100個消息需要 1 秒,但是如果是異步方式的話,發(fā)送 100 條消息所需要的時間就會少很多很多。大多數(shù)時候,雖然Kafka 會返回 RecordMetadata 消息,但是我們并不需要等待響應。
注:如果broker遲遲不給ack,而buffer又滿了,開發(fā)者可以設置是否直接清空buffer中的數(shù)據(jù)。
3.1.3 ack機制(確認機制)
生產(chǎn)者數(shù)據(jù)發(fā)送出去,需要服務端返回一個確認碼,即ack響應碼;ack的響應有三個狀態(tài)值0,1,-1
- 0:生產(chǎn)者只負責發(fā)送數(shù)據(jù),不關(guān)心數(shù)據(jù)是否丟失,丟失的數(shù)據(jù),需要再次發(fā)送
- 1:partition的leader收到數(shù)據(jù),不管follow是否同步完數(shù)據(jù),響應的狀態(tài)碼為1
- -1:所有的從節(jié)點都收到數(shù)據(jù),響應的狀態(tài)碼為-1
如果broker端一直不返回ack狀態(tài),producer永遠不知道是否成功;producer可以設置一個超時時間10s,超過時間認為失敗。
3.2 Kafka 生產(chǎn)者分區(qū)機制
Kafka 對于數(shù)據(jù)的讀寫是以分區(qū)為粒度的,分區(qū)可以分布在多個主機(Broker)中,這樣每個節(jié)點能夠?qū)崿F(xiàn)獨立的數(shù)據(jù)寫入和讀取,并且能夠通過增加新的節(jié)點來增加 Kafka 集群的吞吐量,通過分區(qū)部署在多個 Broker 來實現(xiàn)負載均衡的效果。
3.2.1 分區(qū)策略
Kafka 的分區(qū)策略指的就是將生產(chǎn)者發(fā)送到哪個分區(qū)的算法。Kafka 為我們提供了默認的分區(qū)策略,同時它也支持你自定義分區(qū)策略。分區(qū)策略有下面這幾種:
順序輪詢
順序分配,消息是均勻的分配給每個 partition,即每個分區(qū)存儲一次消息。就像下面這樣
上圖表示的就是輪詢策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。
隨機輪詢
隨機輪詢簡而言之就是隨機的向 partition 中保存消息,如下圖所示
按照 key 進行消息保存
這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的key,一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如下圖所示
3.3 Kafka 生產(chǎn)者壓縮機制
Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日志項,而日志項才是真正封裝消息的地方。Kafka 底層的消息日志由一系列消息集合日志項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。
Kafka Producer 中使用 compression.type 來開啟壓縮
private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");
上面代碼表明該 Producer 的壓縮算法使用的是 GZIP
四、Kafka 消費者
4.1 分區(qū)重平衡
消費者演變過程大致如下:最初是一個消費者訂閱一個主題并消費其全部分區(qū)的消息,后來有一個消費者加入群組,隨后又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區(qū)的所有權(quán)通過一個消費者轉(zhuǎn)到其他消費者的行為稱為重平衡,英文名也叫做Rebalance` 。如下圖所示
重平衡非常重要,它為消費者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區(qū)被重新分配給另一個消費者時,消息當前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態(tài)之前會拖慢應用程序。
消費者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護自己是消費者組的一員并確認其擁有的分區(qū)。對于不同不的消費群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費者定期發(fā)送心跳,就會認為消費者是存活的并處理其分區(qū)中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發(fā)送心跳。
如果過了一段時間 Kafka 停止發(fā)送心跳了,會話(Session)就會過期,組織協(xié)調(diào)者就會認為這個 Consumer 已經(jīng)死亡,就會觸發(fā)一次重平衡。如果消費者宕機并且停止發(fā)送消息,組織協(xié)調(diào)者會等待幾秒鐘,確認它死亡了才會觸發(fā)重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會觸發(fā)一次重平衡,盡量降低處理停頓。
重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。
重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關(guān)于 Serial 收集器的描述):
更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結(jié)束。
Stop The World這個名字聽起來很帥,但這項工作實際上是由虛擬機在后臺自動發(fā)起并完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。
也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......
4.2 分區(qū)重平衡流程
下面我們真正開始介紹 Rebalance 的過程。重平衡過程可以從兩個方面去看:消費者端和協(xié)調(diào)者端,首先我們先看一下消費者端
從消費者看重平衡
從消費者看重平衡有兩個步驟:分別是 消費者加入組 和 等待領(lǐng)導者分配方案。這兩個步驟后分別對應的請求是 JoinGroup 和 SyncGroup。
新的消費者加入群組時,這個消費者會向協(xié)調(diào)器發(fā)送 JoinGroup 請求。在該請求中,每個消費者成員都需要將自己消費的 topic 進行提交,我們上面描述群組協(xié)調(diào)器中說過,這么做的目的就是為了讓協(xié)調(diào)器收集足夠的元數(shù)據(jù)信息,來選取消費者組的領(lǐng)導者。通常情況下,第一個發(fā)送 JoinGroup 請求的消費者會自動稱為領(lǐng)導者。領(lǐng)導者的任務是收集所有成員的訂閱信息,然后根據(jù)這些信息,制定具體的分區(qū)消費分配方案。如圖

在所有的消費者都加入進來并把元數(shù)據(jù)信息提交給領(lǐng)導者后,領(lǐng)導者做出分配方案并發(fā)送 SyncGroup請求給協(xié)調(diào)者,協(xié)調(diào)者負責下發(fā)群組中的消費策略。下圖描述了 SyncGroup 請求的過程

當所有成員都成功接收到分配方案后,消費者組進入到 Stable 狀態(tài),即開始正常的消費工作。
從協(xié)調(diào)者來看重平衡
從協(xié)調(diào)者角度來看重平衡主要有下面這幾種觸發(fā)條件,
新成員加入組
組成員主動離開
組成員崩潰離開
組成員提交位移
我們分別來描述一下,先從新成員加入組開始
新成員加入組
我們討論的場景消費者集群狀態(tài)處于Stable 等待分配的過程,這時候如果有新的成員加入組的話,重平衡的過程

從這個角度來看,協(xié)調(diào)者的過程和消費者類似,只是剛剛從消費者的角度去看,現(xiàn)在從領(lǐng)導者的角度去看
組成員離開
組成員離開消費者群組指的是消費者實例調(diào)用 close() 方法主動通知協(xié)調(diào)者它要退出。這里又會有一個新的請求出現(xiàn) LeaveGroup()請求 。如下圖所示

組成員崩潰
組成員崩潰是指消費者實例出現(xiàn)嚴重故障,宕機或者一段時間未響應,協(xié)調(diào)者接收不到消費者的心跳,就會被認為是組成員崩潰,崩潰離組是被動的,協(xié)調(diào)者通常需要等待一段時間才能感知到,這段時間一般是由消費者端參數(shù) session.timeout.ms 控制的。如下圖所示

五、Kafka 高可用
5.1 副本機制
復制功能是 Kafka 架構(gòu)的核心功能,在 Kafka 文檔里面 Kafka 把自己描述為 一個分布式的、可分區(qū)的、可復制的提交日志服務。復制之所以這么關(guān)鍵,是因為消息的持久存儲非常重要,這能夠保證在主節(jié)點宕機后依舊能夠保證 Kafka 高可用。副本機制也可以稱為備份機制(Replication),通常指分布式系統(tǒng)在多臺網(wǎng)絡交互的機器上保存有相同的數(shù)據(jù)備份/拷貝。
Kafka 使用主題來組織數(shù)據(jù),每個主題又被分為若干個分區(qū),分區(qū)會部署在一到多個 broker 上,每個分區(qū)都會有多個副本,所以副本也會被保存在 broker 上,每個 broker 可能會保存成千上萬個副本。下圖是一個副本復制示意圖

如上圖所示,為了簡單我只畫出了兩個 broker ,每個 broker 指保存了一個 Topic 的消息,在 broker1 中分區(qū) 0 是 Leader,它負責進行分區(qū)的復制工作,把 broker1 中的分區(qū) 0 復制一個副本到 broker2 的主題 A 的分區(qū) 0。同理,主題 A 的分區(qū) 1 也是一樣的道理。
副本類型分為兩種:一種是 Leader(領(lǐng)導者) 副本,一種是Follower(跟隨者)副本。
Leader 副本
Kafka 在創(chuàng)建分區(qū)的時候都要選舉一個副本,這個選舉出來的副本就是 Leader 領(lǐng)導者副本。
Follower 副本
除了 Leader 副本以外的副本統(tǒng)稱為 Follower 副本,F(xiàn)ollower 不對外提供服務。下面是 Leader 副本的工作方式

這幅圖需要注意以下幾點
Kafka 中,F(xiàn)ollower 副本也就是追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產(chǎn)者的請求。所有的請求都是由領(lǐng)導者副本來處理?;蛘哒f,所有的請求都必須發(fā)送到 Leader 副本所在的 broker 中,F(xiàn)ollower 副本只是用做數(shù)據(jù)拉取,采用
異步拉取的方式,并寫入到自己的提交日志中,從而實現(xiàn)與 Leader 的同步當 Leader 副本所在的 broker 宕機后,Kafka 依托于 ZooKeeper 提供的監(jiān)控功能能夠?qū)崟r感知到,并開啟新一輪的選舉,從追隨者副本中選一個作為 Leader。如果宕機的 broker 重啟完成后,該分區(qū)的副本會作為 Follower 重新加入。
首領(lǐng)的另一個任務是搞清楚哪個跟隨者的狀態(tài)與自己是一致的。跟隨者為了保證與領(lǐng)導者的狀態(tài)一致,在有新消息到達之前先嘗試從領(lǐng)導者那里復制消息。為了與領(lǐng)導者保持一致,跟隨者向領(lǐng)導者發(fā)起獲取數(shù)據(jù)的請求,這種請求與消費者為了讀取消息而發(fā)送的信息是一樣的。
跟隨者向領(lǐng)導者發(fā)送消息的過程是這樣的,先請求消息 1,然后再接收到消息 1,在時候到請求 1 之后,發(fā)送請求 2,在收到領(lǐng)導者給發(fā)送給跟隨者之前,跟隨者是不會繼續(xù)發(fā)送消息的。這個過程如下

跟隨者副本在收到響應消息前,是不會繼續(xù)發(fā)送消息,這一點很重要。通過查看每個跟隨者請求的最新偏移量,首領(lǐng)就會知道每個跟隨者復制的進度。如果跟隨者在 10s 內(nèi)沒有請求任何消息,或者雖然跟隨者已經(jīng)發(fā)送請求,但是在 10s 內(nèi)沒有收到消息,就會被認為是不同步的。如果一個副本沒有與領(lǐng)導者同步,那么在領(lǐng)導者掉線后,這個副本將不會稱為領(lǐng)導者,因為這個副本的消息不是全部的。
與之相反的,如果跟隨者同步的消息和領(lǐng)導者副本的消息一致,那么這個跟隨者副本又被稱為同步的副本。也就是說,如果領(lǐng)導者掉線,那么只有同步的副本能夠稱為領(lǐng)導者。
關(guān)于副本機制我們說了這么多,那么副本機制的好處是什么呢?
能夠立刻看到寫入的消息,就是你使用生產(chǎn)者 API 成功向分區(qū)寫入消息后,馬上使用消費者就能讀取剛才寫入的消息
能夠?qū)崿F(xiàn)消息的冪等性,啥意思呢?就是對于生產(chǎn)者產(chǎn)生的消息,在消費者進行消費的時候,它每次都會看到消息存在,并不會存在消息不存在的情況
同步復制和異步復制
我在學習副本機制的時候,有個疑問,既然領(lǐng)導者副本和跟隨者副本是發(fā)送 - 等待機制的,這是一種同步的復制方式,那么為什么說跟隨者副本同步領(lǐng)導者副本的時候是一種異步操作呢?
我認為是這樣的,跟隨者副本在同步領(lǐng)導者副本后會把消息保存在本地 log 中,這個時候跟隨者會給領(lǐng)導者副本一個響應消息,告訴領(lǐng)導者自己已經(jīng)保存成功了,同步復制的領(lǐng)導者會等待所有的跟隨者副本都寫入成功后,再返回給 producer 寫入成功的消息。而異步復制是領(lǐng)導者副本不需要關(guān)心跟隨者副本是否寫入成功,只要領(lǐng)導者副本自己把消息保存到本地 log ,就會返回給 producer 寫入成功的消息。下面是同步復制和異步復制的過程
同步復制
producer 通知 ZooKeeper 識別領(lǐng)導者
producer 向領(lǐng)導者寫入消息
領(lǐng)導者收到消息后會把消息寫入到本地 log
跟隨者會從領(lǐng)導者那里拉取消息
跟隨者向本地寫入 log
跟隨者向領(lǐng)導者發(fā)送寫入成功的消息
領(lǐng)導者會收到所有的跟隨者發(fā)送的消息
領(lǐng)導者向 producer 發(fā)送寫入成功的消息
異步復制
和同步復制的區(qū)別在于,領(lǐng)導者在寫入本地 log 之后,直接向客戶端發(fā)送寫入成功消息,不需要等待所有跟隨者復制完成。
ISR
Kafka 動態(tài)維護了一個同步狀態(tài)的副本的集合(a set of In-Sync Replicas),簡稱ISR,ISR 也是一個很重要的概念,我們之前說過,追隨者副本不提供服務,只是定期的異步拉取領(lǐng)導者副本的數(shù)據(jù)而已,拉取這個操作就相當于是復制,ctrl-c + ctrl-v大家肯定用的熟。那么是不是說 ISR 集合中的副本消息的數(shù)量都會與領(lǐng)導者副本消息數(shù)量一樣呢?那也不一定,判斷的依據(jù)是 broker 中參數(shù) replica.lag.time.max.ms 的值,這個參數(shù)的含義就是跟隨者副本能夠落后領(lǐng)導者副本最長的時間間隔。
replica.lag.time.max.ms 參數(shù)默認的時間是 10 秒,如果跟隨者副本落后領(lǐng)導者副本的時間不超過 10 秒,那么 Kafka 就認為領(lǐng)導者和跟隨者是同步的。即使此時跟隨者副本中存儲的消息要小于領(lǐng)導者副本。如果跟隨者副本要落后于領(lǐng)導者副本 10 秒以上的話,跟隨者副本就會從 ISR 被剔除。倘若該副本后面慢慢地追上了領(lǐng)導者的進度,那么它是能夠重新被加回 ISR 的。這也表明,ISR 是一個動態(tài)調(diào)整的集合,而非靜態(tài)不變的。
Unclean 副本領(lǐng)導者選舉
既然 ISR 是可以動態(tài)調(diào)整的,那么必然會出現(xiàn) ISR 集合中為空的情況,由于領(lǐng)導者副本是一定出現(xiàn)在 ISR 集合中的,那么 ISR 集合為空必然說明領(lǐng)導者副本也掛了,所以此時 Kafka 需要重新選舉一個新的領(lǐng)導者,那么該如何選舉呢?現(xiàn)在你需要轉(zhuǎn)變一下思路,我們上面說 ISR 集合中一定是與領(lǐng)導者同步的副本,那么不再 ISR 集合中的副本一定是不與領(lǐng)導者同步的副本了,也就是不再 ISR 列表中的跟隨者副本會丟失一些消息。如果你開啟 broker 端參數(shù) unclean.leader.election.enable的話,下一個領(lǐng)導者就會在這些非同步的副本中選舉。這種選舉也叫做Unclean 領(lǐng)導者選舉。
如果你接觸過分布式項目的話你一定知道 CAP 理論,那么這種 Unclean 領(lǐng)導者選舉其實是犧牲了數(shù)據(jù)一致性,保證了 Kafka 的高可用性。
你可以根據(jù)你的實際業(yè)務場景決定是否開啟 Unclean 領(lǐng)導者選舉,一般不建議開啟這個參數(shù),因為數(shù)據(jù)的一致性要比可用性重要的多。
5.2 控制器機制
broker 之間也有一個控制器組件(Controller),它是 Kafka 的核心組件。它的主要作用是在 ZooKeeper 的幫助下管理和協(xié)調(diào)整個 Kafka 集群,集群中的每個 broker 都可以稱為 controller。
控制器的選舉
Kafka 當前選舉控制器的規(guī)則是:Kafka 集群中第一個啟動的 broker 通過在 ZooKeeper 里創(chuàng)建一個臨時節(jié)點 /controller 讓自己成為 controller 控制器。其他 broker 在啟動時也會嘗試創(chuàng)建這個節(jié)點,但是由于這個節(jié)點已存在,所以后面想要創(chuàng)建 /controller 節(jié)點時就會收到一個 節(jié)點已存在 的異常。然后其他 broker 會在這個控制器上注冊一個 ZooKeeper 的 watch 對象,/controller節(jié)點發(fā)生變化時,其他 broker 就會收到節(jié)點變更通知。這種方式可以確保只有一個控制器存在。那么只有單獨的節(jié)點一定是有個問題的,那就是單點問題。

如果控制器關(guān)閉或者與 ZooKeeper 斷開鏈接,ZooKeeper 上的臨時節(jié)點就會消失。集群中的其他節(jié)點收到 watch 對象發(fā)送控制器下線的消息后,其他 broker 節(jié)點都會嘗試讓自己去成為新的控制器。其他節(jié)點的創(chuàng)建規(guī)則和第一個節(jié)點的創(chuàng)建原則一致,都是第一個在 ZooKeeper 里成功創(chuàng)建控制器節(jié)點的 broker 會成為新的控制器,那么其他節(jié)點就會收到節(jié)點已存在的異常,然后在新的控制器節(jié)點上再次創(chuàng)建 watch 對象進行監(jiān)聽。

broker controller 故障轉(zhuǎn)移

broker controller 故障轉(zhuǎn)移主要依賴于zookeeper。一開始,broker1 會搶先注冊成功成為 controller,然后由于網(wǎng)絡抖動或者其他原因致使 broker1 掉線,ZooKeeper 通過 Watch 機制覺察到 broker1 的掉線,之后所有存活的 brokers 開始競爭成為 controller,這時 broker3 搶先注冊成功,此時 ZooKeeper 存儲的 controller 信息由 broker1 -> broker3,之后,broker3 會從 ZooKeeper 中讀取元數(shù)據(jù)信息,并初始化到自己的緩存中。
六、Kafka 為什么這么快
6.1 利用 Partition 實現(xiàn)并行處理
我們都知道 Kafka 是一個 Pub-Sub 的消息系統(tǒng),無論是發(fā)布還是訂閱,都要指定 Topic。
Topic 只是一個邏輯的概念。每個 Topic 都包含一個或多個 Partition,不同 Partition 可位于不同節(jié)點。
一方面,由于不同 Partition 可位于不同機器,因此可以充分利用集群優(yōu)勢,實現(xiàn)機器間的并行處理。另一方面,由于 Partition 在物理上對應一個文件夾,即使多個 Partition 位于同一個節(jié)點,也可通過配置讓同一節(jié)點上的不同 Partition 置于不同的磁盤上,從而實現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢。
能并行處理,速度肯定會有提升,多個工人肯定比一個工人干的快。
6.2 順序?qū)懘疟P
Kafka 中每個分區(qū)是一個有序的,不可變的消息序列,新的消息不斷追加到 partition 的末尾,這個就是順序?qū)憽?/p>
由于磁盤有限,不可能保存所有數(shù)據(jù),實際上作為消息系統(tǒng) Kafka 也沒必要保存所有數(shù)據(jù),需要刪除舊的數(shù)據(jù)。又由于順序?qū)懭氲脑?,所?Kafka 采用各種刪除策略刪除數(shù)據(jù)的時候,并非通過使用“讀 - 寫”模式去修改文件,而是將 Partition 分為多個 Segment,每個 Segment 對應一個物理文件,通過刪除整個文件的方式去刪除 Partition 內(nèi)的數(shù)據(jù)。這種方式清除舊數(shù)據(jù)的方式,也避免了對文件的隨機寫操作。
6.3 充分利用 Page Cache
引入 Cache 層的目的是為了提高 Linux 操作系統(tǒng)對磁盤訪問的性能。Cache 層在內(nèi)存中緩存了磁盤上的部分數(shù)據(jù)。當數(shù)據(jù)的請求到達時,如果在 Cache 中存在該數(shù)據(jù)且是最新的,則直接將數(shù)據(jù)傳遞給用戶程序,免除了對底層磁盤的操作,提高了性能。Cache 層也正是磁盤 IOPS 為什么能突破 200 的主要原因之一。
在 Linux 的實現(xiàn)中,文件 Cache 分為兩個層面,一是 Page Cache,另一個 Buffer Cache,每一個 Page Cache 包含若干 Buffer Cache。Page Cache 主要用來作為文件系統(tǒng)上的文件數(shù)據(jù)的緩存來用,尤其是針對當進程對文件有 read/write 操作的時候。Buffer Cache 則主要是設計用來在系統(tǒng)對塊設備進行讀寫的時候,對塊進行數(shù)據(jù)緩存的系統(tǒng)來使用。
使用 Page Cache 的好處:
- I/O Scheduler 會將連續(xù)的小塊寫組裝成大塊的物理寫從而提高性能
- I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
- 充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存)。如果使用應用層 Cache(即 JVM 堆內(nèi)存),會增加 GC 負擔
- 讀操作可直接在 Page Cache 內(nèi)進行。如果消費和生產(chǎn)速度相當,甚至不需要通過物理磁盤(直接通過 Page Cache)交換數(shù)據(jù)
- 如果進程重啟,JVM 內(nèi)的 Cache 會失效,但 Page Cache 仍然可用
Broker 收到數(shù)據(jù)后,寫磁盤時只是將數(shù)據(jù)寫入 Page Cache,并不保證數(shù)據(jù)一定完全寫入磁盤。從這一點看,可能會造成機器宕機時,Page Cache 內(nèi)的數(shù)據(jù)未寫入磁盤從而造成數(shù)據(jù)丟失。但是這種丟失只發(fā)生在機器斷電等造成操作系統(tǒng)不工作的場景,而這種場景完全可以由 Kafka 層面的 Replication 機制去解決。如果為了保證這種情況下數(shù)據(jù)不丟失而強制將 Page Cache 中的數(shù)據(jù) Flush 到磁盤,反而會降低性能。也正因如此,Kafka 雖然提供了 flush.messages 和 flush.ms 兩個參數(shù)將 Page Cache 中的數(shù)據(jù)強制 Flush 到磁盤,但是 Kafka 并不建議使用。
6.4 零拷貝技術(shù)
Kafka 中存在大量的網(wǎng)絡數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)的過程。這一過程的性能直接影響 Kafka 的整體吞吐量。
操作系統(tǒng)的核心是內(nèi)核,獨立于普通的應用程序,可以訪問受保護的內(nèi)存空間,也有訪問底層硬件設備的權(quán)限。
為了避免用戶進程直接操作內(nèi)核,保證內(nèi)核安全,操作系統(tǒng)將虛擬內(nèi)存劃分為兩部分,一部分是內(nèi)核空間(Kernel-space),一部分是用戶空間(User-space)。
傳統(tǒng)的 Linux 系統(tǒng)中,標準的 I/O 接口(例如 read,write)都是基于數(shù)據(jù)拷貝操作的,即 I/O 操作會導致數(shù)據(jù)在內(nèi)核地址空間的緩沖區(qū)和用戶地址空間的緩沖區(qū)之間進行拷貝,所以標準 I/O 也被稱作緩存 I/O。這樣做的好處是,如果所請求的數(shù)據(jù)已經(jīng)存放在內(nèi)核的高速緩沖存儲器中,那么就可以減少實際的 I/O 操作,但壞處就是數(shù)據(jù)拷貝的過程,會導致 CPU 開銷。
我們把 Kafka 的生產(chǎn)和消費簡化成如下兩個過程來看:
- 網(wǎng)絡數(shù)據(jù)持久化到磁盤 (Producer 到 Broker)
- 磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)
6.4.1 網(wǎng)絡數(shù)據(jù)持久化到磁盤 (Producer 到 Broker)
傳統(tǒng)模式下,數(shù)據(jù)從網(wǎng)絡傳輸?shù)轿募枰?4 次數(shù)據(jù)拷貝、4 次上下文切換和兩次系統(tǒng)調(diào)用。
data = socket.read()// 讀取網(wǎng)絡數(shù)據(jù)
File file = new File()
file.write(data)// 持久化到磁盤
file.flush()
這一過程實際上發(fā)生了四次數(shù)據(jù)拷貝:
首先通過 DMA copy 將網(wǎng)絡數(shù)據(jù)拷貝到內(nèi)核態(tài) Socket Buffer
然后應用程序?qū)?nèi)核態(tài) Buffer 數(shù)據(jù)讀入用戶態(tài)(CPU copy)
接著用戶程序?qū)⒂脩魬B(tài) Buffer 再拷貝到內(nèi)核態(tài)(CPU copy)
最后通過 DMA copy 將數(shù)據(jù)拷貝到磁盤文件
DMA(Direct Memory Access):直接存儲器訪問。DMA 是一種無需 CPU 的參與,讓外設和系統(tǒng)內(nèi)存之間進行雙向數(shù)據(jù)傳輸?shù)挠布C制。使用 DMA 可以使系統(tǒng) CPU 從實際的 I/O 數(shù)據(jù)傳輸過程中擺脫出來,從而大大提高系統(tǒng)的吞吐率。
同時,還伴隨著四次上下文切換,如下圖所示

數(shù)據(jù)落盤通常都是非實時的,kafka 生產(chǎn)者數(shù)據(jù)持久化也是如此。Kafka 的數(shù)據(jù)并不是實時的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲來利用內(nèi)存提高 I/O 效率,就是上一節(jié)提到的 Page Cache。
對于 kafka 來說,Producer 生產(chǎn)的數(shù)據(jù)存到 broker,這個過程讀取到 socket buffer 的網(wǎng)絡數(shù)據(jù),其實可以直接在內(nèi)核空間完成落盤。并沒有必要將 socket buffer 的網(wǎng)絡數(shù)據(jù),讀取到應用進程緩沖區(qū);在這里應用進程緩沖區(qū)其實就是 broker,broker 收到生產(chǎn)者的數(shù)據(jù),就是為了持久化。
在此特殊場景下:接收來自 socket buffer 的網(wǎng)絡數(shù)據(jù),應用進程不需要中間處理、直接進行持久化時??梢允褂?mmap 內(nèi)存文件映射。
Memory Mapped Files:簡稱 mmap,也有叫 MMFile 的,使用 mmap 的目的是將內(nèi)核中讀緩沖區(qū)(read buffer)的地址與用戶空間的緩沖區(qū)(user buffer)進行映射。從而實現(xiàn)內(nèi)核緩沖區(qū)與應用程序內(nèi)存的共享,省去了將數(shù)據(jù)從內(nèi)核讀緩沖區(qū)(read buffer)拷貝到用戶緩沖區(qū)(user buffer)的過程。它的工作原理是直接利用操作系統(tǒng)的 Page 來實現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上。
使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內(nèi)核空間復制的開銷。
mmap 也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會在程序主動調(diào)用 flush 的時候才把數(shù)據(jù)真正的寫到硬盤。Kafka 提供了一個參數(shù)——producer.type 來控制是不是主動 flush;如果 Kafka 寫入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步(sync);寫入 mmap 之后立即返回 Producer 不調(diào)用 flush 就叫異步(async),默認是 sync。

零拷貝(Zero-copy)技術(shù)指在計算機執(zhí)行操作時,CPU 不需要先將數(shù)據(jù)從一個內(nèi)存區(qū)域復制到另一個內(nèi)存區(qū)域,從而可以減少上下文切換以及 CPU 的拷貝時間。
它的作用是在數(shù)據(jù)報從網(wǎng)絡設備到用戶程序空間傳遞的過程中,減少數(shù)據(jù)拷貝次數(shù),減少系統(tǒng)調(diào)用,實現(xiàn) CPU 的零參與,徹底消除 CPU 在這方面的負載。
目前零拷貝技術(shù)主要有三種類型[3]:
- 直接 I/O:數(shù)據(jù)直接跨過內(nèi)核,在用戶地址空間與 I/O 設備之間傳遞,內(nèi)核只是進行必要的虛擬存儲配置等輔助工作;
- 避免內(nèi)核和用戶空間之間的數(shù)據(jù)拷貝:當應用程序不需要對數(shù)據(jù)進行訪問時,則可以避免將數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間
a. mmap
b. sendfile
c. splice && tee
d. sockmap- copy on write:寫時拷貝技術(shù),數(shù)據(jù)不需要提前拷貝,而是當需要修改的時候再進行部分拷貝。
6.4.2 磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)
傳統(tǒng)方式實現(xiàn):先讀取磁盤、再用 socket 發(fā)送,實際也是進過四次 copy
buffer = File.read
Socket.send(buffer)
這一過程可以類比上邊的生產(chǎn)消息:
- 首先通過系統(tǒng)調(diào)用將文件數(shù)據(jù)讀入到內(nèi)核態(tài) Buffer(DMA 拷貝)
- 然后應用程序?qū)?nèi)存態(tài) Buffer 數(shù)據(jù)讀入到用戶態(tài) Buffer(CPU 拷貝)
- 接著用戶程序通過 Socket 發(fā)送數(shù)據(jù)時將用戶態(tài) Buffer 數(shù)據(jù)拷貝到內(nèi)核態(tài) Buffer(CPU 拷貝)
- 最后通過 DMA 拷貝將數(shù)據(jù)拷貝到 NIC Buffer
Linux 2.4+ 內(nèi)核通過 sendfile 系統(tǒng)調(diào)用,提供了零拷貝。數(shù)據(jù)通過 DMA 拷貝到內(nèi)核態(tài) Buffer 后,直接通過 DMA 拷貝到 NIC Buffer,無需 CPU 拷貝。這也是零拷貝這一說法的來源。除了減少數(shù)據(jù)拷貝外,因為整個讀文件 - 網(wǎng)絡發(fā)送由一個 sendfile 調(diào)用完成,整個過程只有兩次上下文切換,因此大大提高了性能。

Kafka 在這里采用的方案是通過 NIO 的 transferTo/transferFrom 調(diào)用操作系統(tǒng)的 sendfile 實現(xiàn)零拷貝??偣舶l(fā)生 2 次內(nèi)核數(shù)據(jù)拷貝、2 次上下文切換和一次系統(tǒng)調(diào)用,消除了 CPU 數(shù)據(jù)拷貝。
6.5 數(shù)據(jù)壓縮
Producer 可將數(shù)據(jù)壓縮后發(fā)送給 broker,從而減少網(wǎng)絡傳輸代價,目前支持的壓縮算法有:Snappy、Gzip、LZ4。數(shù)據(jù)壓縮一般都是和批處理配套使用來作為優(yōu)化手段的。