kafka 一篇文章就夠了

一、Kafka 簡介

Apache Kafka 是一個分布式發(fā)布-訂閱消息系統(tǒng)。是大數(shù)據(jù)領(lǐng)域消息隊列中唯一的王者。最初由 linkedin 公司使用 scala 語言開發(fā),在2010年貢獻給了Apache基金會并成為頂級開源項目。至今已有十余年,仍然是大數(shù)據(jù)領(lǐng)域不可或缺的并且是越來越重要的一個組件。

Kafka 適合離線和在線消息,消息保留在磁盤上,并在集群內(nèi)復制以防止數(shù)據(jù)丟失。kafka構(gòu)建在zookeeper同步服務之上。它與 Flink 和 Spark 有非常好的集成,應用于實時流式數(shù)據(jù)分析。

Kafka特點:

  1. 可靠性:具有副本及容錯機制。
  2. 可擴展性:kafka無需停機即可擴展節(jié)點及節(jié)點上線。
  3. 持久性:數(shù)據(jù)存儲到磁盤上,持久性保存。
  4. 性能:kafka具有高吞吐量。達到TB級的數(shù)據(jù),也有非常穩(wěn)定的性能。
  5. 速度快:順序?qū)懭牒土憧截惣夹g(shù)使得kafka延遲控制在毫秒級。

二、Kafka 主要組件

先看下 Kafka 系統(tǒng)的架構(gòu)

kafka架構(gòu)

kafka支持消息持久化,消費端是主動拉取數(shù)據(jù),消費狀態(tài)和訂閱關(guān)系由客戶端負責維護,消息消費完后,不會立即刪除,會保留歷史消息。因此支持多訂閱時,消息只會存儲一份就可以。

  1. broker:kafka集群中包含一個或者多個服務實例(節(jié)點),這種服務實例被稱為broker(一個broker就是一個節(jié)點/一個服務器);
  2. topic:每條發(fā)布到kafka集群的消息都屬于某個類別,這個類別就叫做topic;
  3. partition:partition是一個物理上的概念,每個topic包含一個或者多個partition;
  4. segment:一個partition當中存在多個segment文件段,每個segment分為兩部分,.log文件和 .index 文件,其中 .index 文件是索引文件,主要用于快速查詢, .log 文件當中數(shù)據(jù)的偏移量位置;
  5. producer:消息的生產(chǎn)者,負責發(fā)布消息到 kafka 的 broker 中;
  6. consumer:消息的消費者,向 kafka 的 broker 中讀取消息的客戶端;
  7. consumer group:消費者組,每一個 consumer 屬于一個特定的 consumer group(可以為每個consumer指定 groupName);
  8. .log:存放數(shù)據(jù)文件;
  9. .index:存放.log文件的索引數(shù)據(jù)。

2.1 producer(生產(chǎn)者)

producer主要是用于生產(chǎn)消息,是kafka當中的消息生產(chǎn)者,生產(chǎn)的消息通過topic進行歸類,保存到kafka的broker里面去。


2.2 topic(主題)

  1. kafka將消息以topic為單位進行歸類;
  2. topic特指kafka處理的消息源(feeds of messages)的不同分類;
  3. topic是一種分類或者發(fā)布的一些列記錄的名義上的名字。kafka主題始終是支持多用戶訂閱的;也就是說,一 個主題可以有零個,一個或者多個消費者訂閱寫入的數(shù)據(jù);
  4. 在kafka集群中,可以有無數(shù)的主題;
  5. 生產(chǎn)者和消費者消費數(shù)據(jù)一般以主題為單位。更細粒度可以到分區(qū)級別。

2.3 partition(分區(qū))

kafka當中,topic是消息的歸類,一個topic可以有多個分區(qū)(partition),每個分區(qū)保存部分topic的數(shù)據(jù),所有的partition當中的數(shù)據(jù)全部合并起來,就是一個topic當中的所有的數(shù)據(jù)。

一個broker服務下,可以創(chuàng)建多個分區(qū),broker數(shù)與分區(qū)數(shù)沒有關(guān)系; 在kafka中,每一個分區(qū)會有一個編號:編號從0開始。 每一個分區(qū)內(nèi)的數(shù)據(jù)是有序的,但全局的數(shù)據(jù)不能保證是有序的。(有序是指生產(chǎn)什么樣順序,消費時也是什么樣的順序)

2.4 consumer(消費者)

consumer是kafka當中的消費者,主要用于消費kafka當中的數(shù)據(jù),消費者一定是歸屬于某個消費組中的。

2.5 consumer group(消費者組)

消費者組由一個或者多個消費者組成,同一個組中的消費者對于同一條消息只消費一次

每個消費者都屬于某個消費者組,如果不指定,那么所有的消費者都屬于默認的組。

每個消費者組都有一個ID,即group ID。組內(nèi)的所有消費者協(xié)調(diào)在一起來消費一個訂閱主題( topic)的所有分區(qū)(partition)。當然,每個分區(qū)只能由同一個消費組內(nèi)的一個消費者(consumer)來消費,可以由不同的消費組來消費。
partition數(shù)量決定了每個consumer group中并發(fā)消費者的最大數(shù)量。如下圖:

如上面左圖所示,如果只有兩個分區(qū),即使一個組內(nèi)的消費者有4個,也會有兩個空閑的。 如上面右圖所示,有4個分區(qū),每個消費者消費一個分區(qū),并發(fā)量達到最大4。

在來看如下一幅圖:

如上圖所示,不同的消費者組消費同一個topic,這個topic有4個分區(qū),分布在兩個節(jié)點上。左邊的 消費組1有兩個消費者,每個消費者就要消費兩個分區(qū)才能把消息完整的消費完,右邊的 消費組2有四個消費者,每個消費者消費一個分區(qū)即可。

總結(jié)下kafka中分區(qū)與消費組的關(guān)系

消費組: 由一個或者多個消費者組成,同一個組中的消費者對于同一條消息只消費一次。 某一個主題下的分區(qū)數(shù),對于消費該主題的同一個消費組下的消費者數(shù)量,應該小于等于該主題下的分區(qū)數(shù)。

如:某一個主題有4個分區(qū),那么消費組中的消費者應該小于等于4,而且最好與分區(qū)數(shù)成整數(shù)倍 1 2 4 這樣。同一個分區(qū)下的數(shù)據(jù),在同一時刻,不能同一個消費組的不同消費者消費

總結(jié):分區(qū)數(shù)越多,同一時間可以有越多的消費者來進行消費,消費數(shù)據(jù)的速度就會越快,提高消費的性能

2.6 partition replicas(分區(qū)副本)

kafka 中的分區(qū)副本如下圖所示:

副本數(shù)(replication-factor):控制消息保存在幾個broker(服務器)上,一般情況下副本數(shù)等于broker的個數(shù)。

一個broker服務下,不可以創(chuàng)建多個副本因子。創(chuàng)建主題時,副本因子應該小于等于可用的broker數(shù)。

副本因子操作以分區(qū)為單位的。每個分區(qū)都有各自的主副本和從副本;

主副本叫做leader,從副本叫做 follower(在有多個副本的情況下,kafka會為同一個分區(qū)下的所有分區(qū),設定角色關(guān)系:一個leader和N個 follower),處于同步狀態(tài)的副本叫做in-sync-replicas(ISR);

follower通過拉的方式從leader同步數(shù)據(jù)。 消費者和生產(chǎn)者都是從leader讀寫數(shù)據(jù),不與follower交互。

副本因子的作用:讓kafka讀取數(shù)據(jù)和寫入數(shù)據(jù)時的可靠性。

副本因子是包含本身,同一個副本因子不能放在同一個broker中。

如果某一個分區(qū)有三個副本因子,就算其中一個掛掉,那么只會剩下的兩個中,選擇一個leader,但不會在其他的broker中,另啟動一個副本(因為在另一臺啟動的話,存在數(shù)據(jù)傳遞,只要在機器之間有數(shù)據(jù)傳遞,就會長時間占用網(wǎng)絡IO,kafka是一個高吞吐量的消息系統(tǒng),這個情況不允許發(fā)生)所以不會在另一個broker中啟動。

如果所有的副本都掛了,生產(chǎn)者如果生產(chǎn)數(shù)據(jù)到指定分區(qū)的話,將寫入不成功。

lsr表示:當前可用的副本。

2.7 segment文件

一個partition當中由多個segment文件組成,每個segment文件,包含兩部分,一個是 .log 文件,另外一個是 .index 文件,其中 .log 文件包含了我們發(fā)送的數(shù)據(jù)存儲,.index 文件,記錄的是我們.log文件的數(shù)據(jù)索引值,以便于我們加快數(shù)據(jù)的查詢速度。

索引文件與數(shù)據(jù)文件的關(guān)系

既然它們是一一對應成對出現(xiàn),必然有關(guān)系。索引文件中元數(shù)據(jù)指向?qū)獢?shù)據(jù)文件中message的物理偏移地址。

比如索引文件中 3,497 代表:數(shù)據(jù)文件中的第三個message,它的偏移地址為497。

再來看數(shù)據(jù)文件中,Message 368772表示:在全局partiton中是第368772個message。

注:segment index file 采取稀疏索引存儲方式,減少索引文件大小,通過mmap(內(nèi)存映射)可以直接內(nèi)存操作,稀疏索引為數(shù)據(jù)文件的每個對應message設置一個元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲空間,但查找起來需要消耗更多的時間。

.index 與 .log 對應關(guān)系如下:

上圖左半部分是索引文件,里面存儲的是一對一對的key-value,其中key是消息在數(shù)據(jù)文件(對應的log文件)中的編號,比如“1,3,6,8……”, 分別表示在log文件中的第1條消息、第3條消息、第6條消息、第8條消息……

那么為什么在index文件中這些編號不是連續(xù)的呢? 這是因為index文件中并沒有為數(shù)據(jù)文件中的每條消息都建立索引,而是采用了稀疏存儲的方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引。 這樣避免了索引文件占用過多的空間,從而可以將索引文件保留在內(nèi)存中。 但缺點是沒有建立索引的Message也不能一次定位到其在數(shù)據(jù)文件的位置,從而需要做一次順序掃描,但是這次順序掃描的范圍就很小了。

value 代表的是在全局partiton中的第幾個消息。

以索引文件中元數(shù)據(jù) 3,497 為例,其中3代表在右邊log數(shù)據(jù)文件中從上到下第3個消息, 497表示該消息的物理偏移地址(位置)為497(也表示在全局partiton表示第497個消息-順序?qū)懭胩匦?。

log日志目錄及組成
kafka在我們指定的log.dir目錄下,會創(chuàng)建一些文件夾;名字是 (主題名字-分區(qū)名) 所組成的文件夾。 在(主題名字-分區(qū)名)的目錄下,會有兩個文件存在,如下所示:

#索引文件
00000000000000000000.index
#日志內(nèi)容
00000000000000000000.log

在目錄下的文件,會根據(jù)log日志的大小進行切分,.log文件的大小為1G的時候,就會進行切分文件;如下:

-rw-r--r--. 1 root root 389k  1月  17  18:03   00000000000000000000.index
-rw-r--r--. 1 root root 1.0G  1月  17  18:03   00000000000000000000.log
-rw-r--r--. 1 root root  10M  1月  17  18:03   00000000000000077894.index
-rw-r--r--. 1 root root 127M  1月  17  18:03   00000000000000077894.log

在kafka的設計中,將offset值作為了文件名的一部分。

segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個全局 partion的最大offset(偏移message數(shù))。數(shù)值最大為64位long大小,20位數(shù)字字符長度,沒有數(shù)字就用 0 填充。

通過索引信息可以快速定位到message。通過index元數(shù)據(jù)全部映射到內(nèi)存,可以避免segment File的IO磁盤操作;

通過索引文件稀疏存儲,可以大幅降低index文件元數(shù)據(jù)占用空間大小。

稀疏索引:為了數(shù)據(jù)創(chuàng)建索引,但范圍并不是為每一條創(chuàng)建,而是為某一個區(qū)間創(chuàng)建; 好處:就是可以減少索引值的數(shù)量。 不好的地方:找到索引區(qū)間之后,要得進行第二次處理。

2.8 message的物理結(jié)構(gòu)

生產(chǎn)者發(fā)送到kafka的每條消息,都被kafka包裝成了一個message

message 的物理結(jié)構(gòu)如下圖所示:

所以生產(chǎn)者發(fā)送給kafka的消息并不是直接存儲起來,而是經(jīng)過kafka的包裝,每條消息都是上圖這個結(jié)構(gòu),只有最后一個字段才是真正生產(chǎn)者發(fā)送的消息數(shù)據(jù)。

三、Kafka 生產(chǎn)者

3.1 消息發(fā)送方式

生產(chǎn)者發(fā)送給kafka數(shù)據(jù),可以采用同步方式或異步方式。

3.1.1 同步方式

發(fā)送一批數(shù)據(jù)給kafka后,等待kafka返回結(jié)果:

  • 生產(chǎn)者等待10s,如果broker沒有給出ack響應,就認為失敗。
  • 生產(chǎn)者重試3次,如果還沒有響應,就報錯
    這類錯誤可以通過重發(fā)消息來解決。比如連接的錯誤,可以通過再次建立連接來解決;無主錯誤則可以通過重新為分區(qū)選舉首領(lǐng)來解決。

3.1.2 異步方式

同步發(fā)送消息都有個問題,那就是同一時間只能有一個消息在發(fā)送,這會造成許多消息無法直接發(fā)送,造成消息滯后,無法發(fā)揮效益最大化。

比如消息在應用程序和 Kafka 集群之間一個來回需要 10ms。如果發(fā)送完每個消息后都等待響應的話,那么發(fā)送100個消息需要 1 秒,但是如果是異步方式的話,發(fā)送 100 條消息所需要的時間就會少很多很多。大多數(shù)時候,雖然Kafka 會返回 RecordMetadata 消息,但是我們并不需要等待響應。

注:如果broker遲遲不給ack,而buffer又滿了,開發(fā)者可以設置是否直接清空buffer中的數(shù)據(jù)。

3.1.3 ack機制(確認機制)

生產(chǎn)者數(shù)據(jù)發(fā)送出去,需要服務端返回一個確認碼,即ack響應碼;ack的響應有三個狀態(tài)值0,1,-1

  • 0:生產(chǎn)者只負責發(fā)送數(shù)據(jù),不關(guān)心數(shù)據(jù)是否丟失,丟失的數(shù)據(jù),需要再次發(fā)送
  • 1:partition的leader收到數(shù)據(jù),不管follow是否同步完數(shù)據(jù),響應的狀態(tài)碼為1
  • -1:所有的從節(jié)點都收到數(shù)據(jù),響應的狀態(tài)碼為-1

如果broker端一直不返回ack狀態(tài),producer永遠不知道是否成功;producer可以設置一個超時時間10s,超過時間認為失敗。

3.2 Kafka 生產(chǎn)者分區(qū)機制

Kafka 對于數(shù)據(jù)的讀寫是以分區(qū)為粒度的,分區(qū)可以分布在多個主機(Broker)中,這樣每個節(jié)點能夠?qū)崿F(xiàn)獨立的數(shù)據(jù)寫入和讀取,并且能夠通過增加新的節(jié)點來增加 Kafka 集群的吞吐量,通過分區(qū)部署在多個 Broker 來實現(xiàn)負載均衡的效果。

3.2.1 分區(qū)策略

Kafka 的分區(qū)策略指的就是將生產(chǎn)者發(fā)送到哪個分區(qū)的算法。Kafka 為我們提供了默認的分區(qū)策略,同時它也支持你自定義分區(qū)策略。分區(qū)策略有下面這幾種:
順序輪詢

順序分配,消息是均勻的分配給每個 partition,即每個分區(qū)存儲一次消息。就像下面這樣

上圖表示的就是輪詢策略,輪訓策略是 Kafka Producer 提供的默認策略,如果你不使用指定的輪訓策略的話,Kafka 默認會使用順序輪訓策略的方式。

隨機輪詢

隨機輪詢簡而言之就是隨機的向 partition 中保存消息,如下圖所示


按照 key 進行消息保存

這個策略也叫做 key-ordering 策略,Kafka 中每條消息都會有自己的key,一旦消息被定義了 Key,那么你就可以保證同一個 Key 的所有消息都進入到相同的分區(qū)里面,由于每個分區(qū)下的消息處理都是有順序的,故這個策略被稱為按消息鍵保序策略,如下圖所示

3.3 Kafka 生產(chǎn)者壓縮機制

Kafka 的消息分為兩層:消息集合 和 消息。一個消息集合中包含若干條日志項,而日志項才是真正封裝消息的地方。Kafka 底層的消息日志由一系列消息集合日志項組成。Kafka 通常不會直接操作具體的一條條消息,它總是在消息集合這個層面上進行寫入操作。
Kafka Producer 中使用 compression.type 來開啟壓縮

private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");

Producer<String,String> producer = new KafkaProducer<String, String>(properties);

ProducerRecord<String,String> record =
  new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");

上面代碼表明該 Producer 的壓縮算法使用的是 GZIP

四、Kafka 消費者

4.1 分區(qū)重平衡

消費者演變過程大致如下:最初是一個消費者訂閱一個主題并消費其全部分區(qū)的消息,后來有一個消費者加入群組,隨后又有更多的消費者加入群組,而新加入的消費者實例分攤了最初消費者的部分消息,這種把分區(qū)的所有權(quán)通過一個消費者轉(zhuǎn)到其他消費者的行為稱為重平衡,英文名也叫做Rebalance` 。如下圖所示

重平衡非常重要,它為消費者群組帶來了高可用性伸縮性,我們可以放心的添加消費者或移除消費者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費者無法讀取消息,造成整個消費者組在重平衡的期間都不可用。另外,當分區(qū)被重新分配給另一個消費者時,消息當前的讀取狀態(tài)會丟失,它有可能還需要去刷新緩存,在它重新恢復狀態(tài)之前會拖慢應用程序。

消費者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護自己是消費者組的一員并確認其擁有的分區(qū)。對于不同不的消費群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費者定期發(fā)送心跳,就會認為消費者是存活的并處理其分區(qū)中的消息。當消費者檢索記錄或者提交它所消費的記錄時就會發(fā)送心跳。

如果過了一段時間 Kafka 停止發(fā)送心跳了,會話(Session)就會過期,組織協(xié)調(diào)者就會認為這個 Consumer 已經(jīng)死亡,就會觸發(fā)一次重平衡。如果消費者宕機并且停止發(fā)送消息,組織協(xié)調(diào)者會等待幾秒鐘,確認它死亡了才會觸發(fā)重平衡。在這段時間里,死亡的消費者將不處理任何消息。在清理消費者時,消費者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會觸發(fā)一次重平衡,盡量降低處理停頓。

重平衡是一把雙刃劍,它為消費者群組帶來高可用性和伸縮性的同時,還有有一些明顯的缺點(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。

重平衡的過程對消費者組有極大的影響。因為每次重平衡過程中都會導致萬物靜止,參考 JVM 中的垃圾回收機制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機》中 p76 關(guān)于 Serial 收集器的描述):

更重要的是它在進行垃圾收集時,必須暫停其他所有的工作線程。直到它收集結(jié)束。Stop The World 這個名字聽起來很帥,但這項工作實際上是由虛擬機在后臺自動發(fā)起并完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對很多應用來說都是難以接受的。

也就是說,在重平衡期間,消費者組中的消費者實例都會停止消費,等待重平衡的完成。而且重平衡這個過程很慢......

4.2 分區(qū)重平衡流程

下面我們真正開始介紹 Rebalance 的過程。重平衡過程可以從兩個方面去看:消費者端和協(xié)調(diào)者端,首先我們先看一下消費者端

從消費者看重平衡

從消費者看重平衡有兩個步驟:分別是 消費者加入組等待領(lǐng)導者分配方案。這兩個步驟后分別對應的請求是 JoinGroupSyncGroup

新的消費者加入群組時,這個消費者會向協(xié)調(diào)器發(fā)送 JoinGroup 請求。在該請求中,每個消費者成員都需要將自己消費的 topic 進行提交,我們上面描述群組協(xié)調(diào)器中說過,這么做的目的就是為了讓協(xié)調(diào)器收集足夠的元數(shù)據(jù)信息,來選取消費者組的領(lǐng)導者。通常情況下,第一個發(fā)送 JoinGroup 請求的消費者會自動稱為領(lǐng)導者。領(lǐng)導者的任務是收集所有成員的訂閱信息,然后根據(jù)這些信息,制定具體的分區(qū)消費分配方案。如圖

在所有的消費者都加入進來并把元數(shù)據(jù)信息提交給領(lǐng)導者后,領(lǐng)導者做出分配方案并發(fā)送 SyncGroup請求給協(xié)調(diào)者,協(xié)調(diào)者負責下發(fā)群組中的消費策略。下圖描述了 SyncGroup 請求的過程

當所有成員都成功接收到分配方案后,消費者組進入到 Stable 狀態(tài),即開始正常的消費工作。

從協(xié)調(diào)者來看重平衡

從協(xié)調(diào)者角度來看重平衡主要有下面這幾種觸發(fā)條件,

  • 新成員加入組

  • 組成員主動離開

  • 組成員崩潰離開

  • 組成員提交位移

我們分別來描述一下,先從新成員加入組開始

新成員加入組

我們討論的場景消費者集群狀態(tài)處于Stable 等待分配的過程,這時候如果有新的成員加入組的話,重平衡的過程

從這個角度來看,協(xié)調(diào)者的過程和消費者類似,只是剛剛從消費者的角度去看,現(xiàn)在從領(lǐng)導者的角度去看

組成員離開

組成員離開消費者群組指的是消費者實例調(diào)用 close() 方法主動通知協(xié)調(diào)者它要退出。這里又會有一個新的請求出現(xiàn) LeaveGroup()請求 。如下圖所示

組成員崩潰

組成員崩潰是指消費者實例出現(xiàn)嚴重故障,宕機或者一段時間未響應,協(xié)調(diào)者接收不到消費者的心跳,就會被認為是組成員崩潰,崩潰離組是被動的,協(xié)調(diào)者通常需要等待一段時間才能感知到,這段時間一般是由消費者端參數(shù) session.timeout.ms 控制的。如下圖所示

五、Kafka 高可用

5.1 副本機制

復制功能是 Kafka 架構(gòu)的核心功能,在 Kafka 文檔里面 Kafka 把自己描述為 一個分布式的、可分區(qū)的、可復制的提交日志服務。復制之所以這么關(guān)鍵,是因為消息的持久存儲非常重要,這能夠保證在主節(jié)點宕機后依舊能夠保證 Kafka 高可用。副本機制也可以稱為備份機制(Replication),通常指分布式系統(tǒng)在多臺網(wǎng)絡交互的機器上保存有相同的數(shù)據(jù)備份/拷貝。

Kafka 使用主題來組織數(shù)據(jù),每個主題又被分為若干個分區(qū),分區(qū)會部署在一到多個 broker 上,每個分區(qū)都會有多個副本,所以副本也會被保存在 broker 上,每個 broker 可能會保存成千上萬個副本。下圖是一個副本復制示意圖

如上圖所示,為了簡單我只畫出了兩個 broker ,每個 broker 指保存了一個 Topic 的消息,在 broker1 中分區(qū) 0 是 Leader,它負責進行分區(qū)的復制工作,把 broker1 中的分區(qū) 0 復制一個副本到 broker2 的主題 A 的分區(qū) 0。同理,主題 A 的分區(qū) 1 也是一樣的道理。

副本類型分為兩種:一種是 Leader(領(lǐng)導者) 副本,一種是Follower(跟隨者)副本。

Leader 副本

Kafka 在創(chuàng)建分區(qū)的時候都要選舉一個副本,這個選舉出來的副本就是 Leader 領(lǐng)導者副本。

Follower 副本

除了 Leader 副本以外的副本統(tǒng)稱為 Follower 副本,F(xiàn)ollower 不對外提供服務。下面是 Leader 副本的工作方式

這幅圖需要注意以下幾點

  • Kafka 中,F(xiàn)ollower 副本也就是追隨者副本是不對外提供服務的。這就是說,任何一個追隨者副本都不能響應消費者和生產(chǎn)者的請求。所有的請求都是由領(lǐng)導者副本來處理?;蛘哒f,所有的請求都必須發(fā)送到 Leader 副本所在的 broker 中,F(xiàn)ollower 副本只是用做數(shù)據(jù)拉取,采用異步拉取的方式,并寫入到自己的提交日志中,從而實現(xiàn)與 Leader 的同步

  • 當 Leader 副本所在的 broker 宕機后,Kafka 依托于 ZooKeeper 提供的監(jiān)控功能能夠?qū)崟r感知到,并開啟新一輪的選舉,從追隨者副本中選一個作為 Leader。如果宕機的 broker 重啟完成后,該分區(qū)的副本會作為 Follower 重新加入。

首領(lǐng)的另一個任務是搞清楚哪個跟隨者的狀態(tài)與自己是一致的。跟隨者為了保證與領(lǐng)導者的狀態(tài)一致,在有新消息到達之前先嘗試從領(lǐng)導者那里復制消息。為了與領(lǐng)導者保持一致,跟隨者向領(lǐng)導者發(fā)起獲取數(shù)據(jù)的請求,這種請求與消費者為了讀取消息而發(fā)送的信息是一樣的。

跟隨者向領(lǐng)導者發(fā)送消息的過程是這樣的,先請求消息 1,然后再接收到消息 1,在時候到請求 1 之后,發(fā)送請求 2,在收到領(lǐng)導者給發(fā)送給跟隨者之前,跟隨者是不會繼續(xù)發(fā)送消息的。這個過程如下

跟隨者副本在收到響應消息前,是不會繼續(xù)發(fā)送消息,這一點很重要。通過查看每個跟隨者請求的最新偏移量,首領(lǐng)就會知道每個跟隨者復制的進度。如果跟隨者在 10s 內(nèi)沒有請求任何消息,或者雖然跟隨者已經(jīng)發(fā)送請求,但是在 10s 內(nèi)沒有收到消息,就會被認為是不同步的。如果一個副本沒有與領(lǐng)導者同步,那么在領(lǐng)導者掉線后,這個副本將不會稱為領(lǐng)導者,因為這個副本的消息不是全部的。

與之相反的,如果跟隨者同步的消息和領(lǐng)導者副本的消息一致,那么這個跟隨者副本又被稱為同步的副本。也就是說,如果領(lǐng)導者掉線,那么只有同步的副本能夠稱為領(lǐng)導者。

關(guān)于副本機制我們說了這么多,那么副本機制的好處是什么呢?

  • 能夠立刻看到寫入的消息,就是你使用生產(chǎn)者 API 成功向分區(qū)寫入消息后,馬上使用消費者就能讀取剛才寫入的消息

  • 能夠?qū)崿F(xiàn)消息的冪等性,啥意思呢?就是對于生產(chǎn)者產(chǎn)生的消息,在消費者進行消費的時候,它每次都會看到消息存在,并不會存在消息不存在的情況

同步復制和異步復制

我在學習副本機制的時候,有個疑問,既然領(lǐng)導者副本和跟隨者副本是發(fā)送 - 等待機制的,這是一種同步的復制方式,那么為什么說跟隨者副本同步領(lǐng)導者副本的時候是一種異步操作呢?

我認為是這樣的,跟隨者副本在同步領(lǐng)導者副本后會把消息保存在本地 log 中,這個時候跟隨者會給領(lǐng)導者副本一個響應消息,告訴領(lǐng)導者自己已經(jīng)保存成功了,同步復制的領(lǐng)導者會等待所有的跟隨者副本都寫入成功后,再返回給 producer 寫入成功的消息。而異步復制是領(lǐng)導者副本不需要關(guān)心跟隨者副本是否寫入成功,只要領(lǐng)導者副本自己把消息保存到本地 log ,就會返回給 producer 寫入成功的消息。下面是同步復制和異步復制的過程

同步復制

  • producer 通知 ZooKeeper 識別領(lǐng)導者

  • producer 向領(lǐng)導者寫入消息

  • 領(lǐng)導者收到消息后會把消息寫入到本地 log

  • 跟隨者會從領(lǐng)導者那里拉取消息

  • 跟隨者向本地寫入 log

  • 跟隨者向領(lǐng)導者發(fā)送寫入成功的消息

  • 領(lǐng)導者會收到所有的跟隨者發(fā)送的消息

  • 領(lǐng)導者向 producer 發(fā)送寫入成功的消息

異步復制

和同步復制的區(qū)別在于,領(lǐng)導者在寫入本地 log 之后,直接向客戶端發(fā)送寫入成功消息,不需要等待所有跟隨者復制完成。

ISR

Kafka 動態(tài)維護了一個同步狀態(tài)的副本的集合(a set of In-Sync Replicas),簡稱ISR,ISR 也是一個很重要的概念,我們之前說過,追隨者副本不提供服務,只是定期的異步拉取領(lǐng)導者副本的數(shù)據(jù)而已,拉取這個操作就相當于是復制,ctrl-c + ctrl-v大家肯定用的熟。那么是不是說 ISR 集合中的副本消息的數(shù)量都會與領(lǐng)導者副本消息數(shù)量一樣呢?那也不一定,判斷的依據(jù)是 broker 中參數(shù) replica.lag.time.max.ms 的值,這個參數(shù)的含義就是跟隨者副本能夠落后領(lǐng)導者副本最長的時間間隔。

replica.lag.time.max.ms 參數(shù)默認的時間是 10 秒,如果跟隨者副本落后領(lǐng)導者副本的時間不超過 10 秒,那么 Kafka 就認為領(lǐng)導者和跟隨者是同步的。即使此時跟隨者副本中存儲的消息要小于領(lǐng)導者副本。如果跟隨者副本要落后于領(lǐng)導者副本 10 秒以上的話,跟隨者副本就會從 ISR 被剔除。倘若該副本后面慢慢地追上了領(lǐng)導者的進度,那么它是能夠重新被加回 ISR 的。這也表明,ISR 是一個動態(tài)調(diào)整的集合,而非靜態(tài)不變的。

Unclean 副本領(lǐng)導者選舉

既然 ISR 是可以動態(tài)調(diào)整的,那么必然會出現(xiàn) ISR 集合中為空的情況,由于領(lǐng)導者副本是一定出現(xiàn)在 ISR 集合中的,那么 ISR 集合為空必然說明領(lǐng)導者副本也掛了,所以此時 Kafka 需要重新選舉一個新的領(lǐng)導者,那么該如何選舉呢?現(xiàn)在你需要轉(zhuǎn)變一下思路,我們上面說 ISR 集合中一定是與領(lǐng)導者同步的副本,那么不再 ISR 集合中的副本一定是不與領(lǐng)導者同步的副本了,也就是不再 ISR 列表中的跟隨者副本會丟失一些消息。如果你開啟 broker 端參數(shù) unclean.leader.election.enable的話,下一個領(lǐng)導者就會在這些非同步的副本中選舉。這種選舉也叫做Unclean 領(lǐng)導者選舉。

如果你接觸過分布式項目的話你一定知道 CAP 理論,那么這種 Unclean 領(lǐng)導者選舉其實是犧牲了數(shù)據(jù)一致性,保證了 Kafka 的高可用性。

你可以根據(jù)你的實際業(yè)務場景決定是否開啟 Unclean 領(lǐng)導者選舉,一般不建議開啟這個參數(shù),因為數(shù)據(jù)的一致性要比可用性重要的多。

5.2 控制器機制

broker 之間也有一個控制器組件(Controller),它是 Kafka 的核心組件。它的主要作用是在 ZooKeeper 的幫助下管理和協(xié)調(diào)整個 Kafka 集群,集群中的每個 broker 都可以稱為 controller。

控制器的選舉

Kafka 當前選舉控制器的規(guī)則是:Kafka 集群中第一個啟動的 broker 通過在 ZooKeeper 里創(chuàng)建一個臨時節(jié)點 /controller 讓自己成為 controller 控制器。其他 broker 在啟動時也會嘗試創(chuàng)建這個節(jié)點,但是由于這個節(jié)點已存在,所以后面想要創(chuàng)建 /controller 節(jié)點時就會收到一個 節(jié)點已存在 的異常。然后其他 broker 會在這個控制器上注冊一個 ZooKeeper 的 watch 對象,/controller節(jié)點發(fā)生變化時,其他 broker 就會收到節(jié)點變更通知。這種方式可以確保只有一個控制器存在。那么只有單獨的節(jié)點一定是有個問題的,那就是單點問題。

如果控制器關(guān)閉或者與 ZooKeeper 斷開鏈接,ZooKeeper 上的臨時節(jié)點就會消失。集群中的其他節(jié)點收到 watch 對象發(fā)送控制器下線的消息后,其他 broker 節(jié)點都會嘗試讓自己去成為新的控制器。其他節(jié)點的創(chuàng)建規(guī)則和第一個節(jié)點的創(chuàng)建原則一致,都是第一個在 ZooKeeper 里成功創(chuàng)建控制器節(jié)點的 broker 會成為新的控制器,那么其他節(jié)點就會收到節(jié)點已存在的異常,然后在新的控制器節(jié)點上再次創(chuàng)建 watch 對象進行監(jiān)聽。

broker controller 故障轉(zhuǎn)移

broker controller 故障轉(zhuǎn)移主要依賴于zookeeper。一開始,broker1 會搶先注冊成功成為 controller,然后由于網(wǎng)絡抖動或者其他原因致使 broker1 掉線,ZooKeeper 通過 Watch 機制覺察到 broker1 的掉線,之后所有存活的 brokers 開始競爭成為 controller,這時 broker3 搶先注冊成功,此時 ZooKeeper 存儲的 controller 信息由 broker1 -> broker3,之后,broker3 會從 ZooKeeper 中讀取元數(shù)據(jù)信息,并初始化到自己的緩存中。

六、Kafka 為什么這么快

6.1 利用 Partition 實現(xiàn)并行處理

我們都知道 Kafka 是一個 Pub-Sub 的消息系統(tǒng),無論是發(fā)布還是訂閱,都要指定 Topic。

Topic 只是一個邏輯的概念。每個 Topic 都包含一個或多個 Partition,不同 Partition 可位于不同節(jié)點。

一方面,由于不同 Partition 可位于不同機器,因此可以充分利用集群優(yōu)勢,實現(xiàn)機器間的并行處理。另一方面,由于 Partition 在物理上對應一個文件夾,即使多個 Partition 位于同一個節(jié)點,也可通過配置讓同一節(jié)點上的不同 Partition 置于不同的磁盤上,從而實現(xiàn)磁盤間的并行處理,充分發(fā)揮多磁盤的優(yōu)勢。

能并行處理,速度肯定會有提升,多個工人肯定比一個工人干的快。

6.2 順序?qū)懘疟P

Kafka 中每個分區(qū)是一個有序的,不可變的消息序列,新的消息不斷追加到 partition 的末尾,這個就是順序?qū)憽?/p>

由于磁盤有限,不可能保存所有數(shù)據(jù),實際上作為消息系統(tǒng) Kafka 也沒必要保存所有數(shù)據(jù),需要刪除舊的數(shù)據(jù)。又由于順序?qū)懭氲脑?,所?Kafka 采用各種刪除策略刪除數(shù)據(jù)的時候,并非通過使用“讀 - 寫”模式去修改文件,而是將 Partition 分為多個 Segment,每個 Segment 對應一個物理文件,通過刪除整個文件的方式去刪除 Partition 內(nèi)的數(shù)據(jù)。這種方式清除舊數(shù)據(jù)的方式,也避免了對文件的隨機寫操作。

6.3 充分利用 Page Cache

引入 Cache 層的目的是為了提高 Linux 操作系統(tǒng)對磁盤訪問的性能。Cache 層在內(nèi)存中緩存了磁盤上的部分數(shù)據(jù)。當數(shù)據(jù)的請求到達時,如果在 Cache 中存在該數(shù)據(jù)且是最新的,則直接將數(shù)據(jù)傳遞給用戶程序,免除了對底層磁盤的操作,提高了性能。Cache 層也正是磁盤 IOPS 為什么能突破 200 的主要原因之一。

在 Linux 的實現(xiàn)中,文件 Cache 分為兩個層面,一是 Page Cache,另一個 Buffer Cache,每一個 Page Cache 包含若干 Buffer Cache。Page Cache 主要用來作為文件系統(tǒng)上的文件數(shù)據(jù)的緩存來用,尤其是針對當進程對文件有 read/write 操作的時候。Buffer Cache 則主要是設計用來在系統(tǒng)對塊設備進行讀寫的時候,對塊進行數(shù)據(jù)緩存的系統(tǒng)來使用。

使用 Page Cache 的好處:

  • I/O Scheduler 會將連續(xù)的小塊寫組裝成大塊的物理寫從而提高性能
  • I/O Scheduler 會嘗試將一些寫操作重新按順序排好,從而減少磁盤頭的移動時間
  • 充分利用所有空閑內(nèi)存(非 JVM 內(nèi)存)。如果使用應用層 Cache(即 JVM 堆內(nèi)存),會增加 GC 負擔
  • 讀操作可直接在 Page Cache 內(nèi)進行。如果消費和生產(chǎn)速度相當,甚至不需要通過物理磁盤(直接通過 Page Cache)交換數(shù)據(jù)
  • 如果進程重啟,JVM 內(nèi)的 Cache 會失效,但 Page Cache 仍然可用

Broker 收到數(shù)據(jù)后,寫磁盤時只是將數(shù)據(jù)寫入 Page Cache,并不保證數(shù)據(jù)一定完全寫入磁盤。從這一點看,可能會造成機器宕機時,Page Cache 內(nèi)的數(shù)據(jù)未寫入磁盤從而造成數(shù)據(jù)丟失。但是這種丟失只發(fā)生在機器斷電等造成操作系統(tǒng)不工作的場景,而這種場景完全可以由 Kafka 層面的 Replication 機制去解決。如果為了保證這種情況下數(shù)據(jù)不丟失而強制將 Page Cache 中的數(shù)據(jù) Flush 到磁盤,反而會降低性能。也正因如此,Kafka 雖然提供了 flush.messages 和 flush.ms 兩個參數(shù)將 Page Cache 中的數(shù)據(jù)強制 Flush 到磁盤,但是 Kafka 并不建議使用。

6.4 零拷貝技術(shù)

Kafka 中存在大量的網(wǎng)絡數(shù)據(jù)持久化到磁盤(Producer 到 Broker)和磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)的過程。這一過程的性能直接影響 Kafka 的整體吞吐量。

操作系統(tǒng)的核心是內(nèi)核,獨立于普通的應用程序,可以訪問受保護的內(nèi)存空間,也有訪問底層硬件設備的權(quán)限。

為了避免用戶進程直接操作內(nèi)核,保證內(nèi)核安全,操作系統(tǒng)將虛擬內(nèi)存劃分為兩部分,一部分是內(nèi)核空間(Kernel-space),一部分是用戶空間(User-space)。

傳統(tǒng)的 Linux 系統(tǒng)中,標準的 I/O 接口(例如 read,write)都是基于數(shù)據(jù)拷貝操作的,即 I/O 操作會導致數(shù)據(jù)在內(nèi)核地址空間的緩沖區(qū)和用戶地址空間的緩沖區(qū)之間進行拷貝,所以標準 I/O 也被稱作緩存 I/O。這樣做的好處是,如果所請求的數(shù)據(jù)已經(jīng)存放在內(nèi)核的高速緩沖存儲器中,那么就可以減少實際的 I/O 操作,但壞處就是數(shù)據(jù)拷貝的過程,會導致 CPU 開銷。

我們把 Kafka 的生產(chǎn)和消費簡化成如下兩個過程來看

  1. 網(wǎng)絡數(shù)據(jù)持久化到磁盤 (Producer 到 Broker)
  2. 磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)

6.4.1 網(wǎng)絡數(shù)據(jù)持久化到磁盤 (Producer 到 Broker)

傳統(tǒng)模式下,數(shù)據(jù)從網(wǎng)絡傳輸?shù)轿募枰?4 次數(shù)據(jù)拷貝、4 次上下文切換和兩次系統(tǒng)調(diào)用。

data = socket.read()// 讀取網(wǎng)絡數(shù)據(jù) 
File file = new File() 
file.write(data)// 持久化到磁盤 
file.flush() 

這一過程實際上發(fā)生了四次數(shù)據(jù)拷貝:

  1. 首先通過 DMA copy 將網(wǎng)絡數(shù)據(jù)拷貝到內(nèi)核態(tài) Socket Buffer

  2. 然后應用程序?qū)?nèi)核態(tài) Buffer 數(shù)據(jù)讀入用戶態(tài)(CPU copy)

  3. 接著用戶程序?qū)⒂脩魬B(tài) Buffer 再拷貝到內(nèi)核態(tài)(CPU copy)

  4. 最后通過 DMA copy 將數(shù)據(jù)拷貝到磁盤文件

DMA(Direct Memory Access):直接存儲器訪問。DMA 是一種無需 CPU 的參與,讓外設和系統(tǒng)內(nèi)存之間進行雙向數(shù)據(jù)傳輸?shù)挠布C制。使用 DMA 可以使系統(tǒng) CPU 從實際的 I/O 數(shù)據(jù)傳輸過程中擺脫出來,從而大大提高系統(tǒng)的吞吐率。

同時,還伴隨著四次上下文切換,如下圖所示

數(shù)據(jù)落盤通常都是非實時的,kafka 生產(chǎn)者數(shù)據(jù)持久化也是如此。Kafka 的數(shù)據(jù)并不是實時的寫入硬盤,它充分利用了現(xiàn)代操作系統(tǒng)分頁存儲來利用內(nèi)存提高 I/O 效率,就是上一節(jié)提到的 Page Cache。

對于 kafka 來說,Producer 生產(chǎn)的數(shù)據(jù)存到 broker,這個過程讀取到 socket buffer 的網(wǎng)絡數(shù)據(jù),其實可以直接在內(nèi)核空間完成落盤。并沒有必要將 socket buffer 的網(wǎng)絡數(shù)據(jù),讀取到應用進程緩沖區(qū);在這里應用進程緩沖區(qū)其實就是 broker,broker 收到生產(chǎn)者的數(shù)據(jù),就是為了持久化。

在此特殊場景下:接收來自 socket buffer 的網(wǎng)絡數(shù)據(jù),應用進程不需要中間處理、直接進行持久化時??梢允褂?mmap 內(nèi)存文件映射。

Memory Mapped Files:簡稱 mmap,也有叫 MMFile 的,使用 mmap 的目的是將內(nèi)核中讀緩沖區(qū)(read buffer)的地址與用戶空間的緩沖區(qū)(user buffer)進行映射。從而實現(xiàn)內(nèi)核緩沖區(qū)與應用程序內(nèi)存的共享,省去了將數(shù)據(jù)從內(nèi)核讀緩沖區(qū)(read buffer)拷貝到用戶緩沖區(qū)(user buffer)的過程。它的工作原理是直接利用操作系統(tǒng)的 Page 來實現(xiàn)文件到物理內(nèi)存的直接映射。完成映射之后你對物理內(nèi)存的操作會被同步到硬盤上。

使用這種方式可以獲取很大的 I/O 提升,省去了用戶空間到內(nèi)核空間復制的開銷。

mmap 也有一個很明顯的缺陷——不可靠,寫到 mmap 中的數(shù)據(jù)并沒有被真正的寫到硬盤,操作系統(tǒng)會在程序主動調(diào)用 flush 的時候才把數(shù)據(jù)真正的寫到硬盤。Kafka 提供了一個參數(shù)——producer.type 來控制是不是主動 flush;如果 Kafka 寫入到 mmap 之后就立即 flush 然后再返回 Producer 叫同步(sync);寫入 mmap 之后立即返回 Producer 不調(diào)用 flush 就叫異步(async),默認是 sync。

零拷貝(Zero-copy)技術(shù)指在計算機執(zhí)行操作時,CPU 不需要先將數(shù)據(jù)從一個內(nèi)存區(qū)域復制到另一個內(nèi)存區(qū)域,從而可以減少上下文切換以及 CPU 的拷貝時間。

它的作用是在數(shù)據(jù)報從網(wǎng)絡設備到用戶程序空間傳遞的過程中,減少數(shù)據(jù)拷貝次數(shù),減少系統(tǒng)調(diào)用,實現(xiàn) CPU 的零參與,徹底消除 CPU 在這方面的負載。

目前零拷貝技術(shù)主要有三種類型[3]

  • 直接 I/O:數(shù)據(jù)直接跨過內(nèi)核,在用戶地址空間與 I/O 設備之間傳遞,內(nèi)核只是進行必要的虛擬存儲配置等輔助工作;
  • 避免內(nèi)核和用戶空間之間的數(shù)據(jù)拷貝:當應用程序不需要對數(shù)據(jù)進行訪問時,則可以避免將數(shù)據(jù)從內(nèi)核空間拷貝到用戶空間
    a. mmap
    b. sendfile
    c. splice && tee
    d. sockmap
  • copy on write:寫時拷貝技術(shù),數(shù)據(jù)不需要提前拷貝,而是當需要修改的時候再進行部分拷貝。

6.4.2 磁盤文件通過網(wǎng)絡發(fā)送(Broker 到 Consumer)

傳統(tǒng)方式實現(xiàn):先讀取磁盤、再用 socket 發(fā)送,實際也是進過四次 copy

buffer = File.read 
Socket.send(buffer)

這一過程可以類比上邊的生產(chǎn)消息:

  1. 首先通過系統(tǒng)調(diào)用將文件數(shù)據(jù)讀入到內(nèi)核態(tài) Buffer(DMA 拷貝)
  2. 然后應用程序?qū)?nèi)存態(tài) Buffer 數(shù)據(jù)讀入到用戶態(tài) Buffer(CPU 拷貝)
  3. 接著用戶程序通過 Socket 發(fā)送數(shù)據(jù)時將用戶態(tài) Buffer 數(shù)據(jù)拷貝到內(nèi)核態(tài) Buffer(CPU 拷貝)
  4. 最后通過 DMA 拷貝將數(shù)據(jù)拷貝到 NIC Buffer

Linux 2.4+ 內(nèi)核通過 sendfile 系統(tǒng)調(diào)用,提供了零拷貝。數(shù)據(jù)通過 DMA 拷貝到內(nèi)核態(tài) Buffer 后,直接通過 DMA 拷貝到 NIC Buffer,無需 CPU 拷貝。這也是零拷貝這一說法的來源。除了減少數(shù)據(jù)拷貝外,因為整個讀文件 - 網(wǎng)絡發(fā)送由一個 sendfile 調(diào)用完成,整個過程只有兩次上下文切換,因此大大提高了性能。

Kafka 在這里采用的方案是通過 NIO 的 transferTo/transferFrom 調(diào)用操作系統(tǒng)的 sendfile 實現(xiàn)零拷貝??偣舶l(fā)生 2 次內(nèi)核數(shù)據(jù)拷貝、2 次上下文切換和一次系統(tǒng)調(diào)用,消除了 CPU 數(shù)據(jù)拷貝。

6.5 數(shù)據(jù)壓縮

Producer 可將數(shù)據(jù)壓縮后發(fā)送給 broker,從而減少網(wǎng)絡傳輸代價,目前支持的壓縮算法有:Snappy、Gzip、LZ4。數(shù)據(jù)壓縮一般都是和批處理配套使用來作為優(yōu)化手段的。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容