kafka原理

一、為什么需要消息系統(tǒng)

1.解耦:
  允許你獨立的擴展或修改兩邊的處理過程,只要確保它們遵守同樣的接口約束。
2.冗余:
  消息隊列把數(shù)據(jù)進行持久化直到它們已經(jīng)被完全處理,通過這一方式規(guī)避了數(shù)據(jù)丟失風險。許多消息隊列所采用的"插入-獲取-刪除"范式中,在把一個消息從隊列中刪除之前,需要你的處理系統(tǒng)明確的指出該消息已經(jīng)被處理完畢,從而確保你的數(shù)據(jù)被安全的保存直到你使用完畢。
3.擴展性:
  因為消息隊列解耦了你的處理過程,所以增大消息入隊和處理的頻率是很容易的,只要另外增加處理過程即可。
4.靈活性 & 峰值處理能力:
  在訪問量劇增的情況下,應用仍然需要繼續(xù)發(fā)揮作用,但是這樣的突發(fā)流量并不常見。如果為以能處理這類峰值訪問為標準來投入資源隨時待命無疑是巨大的浪費。使用消息隊列能夠使關鍵組件頂住突發(fā)的訪問壓力,而不會因為突發(fā)的超負荷的請求而完全崩潰。
5.可恢復性:
  系統(tǒng)的一部分組件失效時,不會影響到整個系統(tǒng)。消息隊列降低了進程間的耦合度,所以即使一個處理消息的進程掛掉,加入隊列中的消息仍然可以在系統(tǒng)恢復后被處理。
6.順序保證:
  在大多使用場景下,數(shù)據(jù)處理的順序都很重要。大部分消息隊列本來就是排序的,并且能保證數(shù)據(jù)會按照特定的順序來處理。(Kafka 保證一個 Partition 內(nèi)的消息的有序性)
7.緩沖:
  有助于控制和優(yōu)化數(shù)據(jù)流經(jīng)過系統(tǒng)的速度,解決生產(chǎn)消息和消費消息的處理速度不一致的情況。
8.異步通信:
  很多時候,用戶不想也不需要立即處理消息。消息隊列提供了異步處理機制,允許用戶把一個消息放入隊列,但并不立即處理它。想向隊列中放入多少消息就放多少,然后在需要的時候再去處理它們。

二、kafka 架構(gòu)

2.1 拓撲結(jié)構(gòu)
如下圖:

image.png

2.2 相關概念
如圖.1中,kafka 相關名詞解釋如下:

1.producer:
  消息生產(chǎn)者,發(fā)布消息到 kafka 集群的終端或服務。
2.broker:
  kafka 集群中包含的服務器。
3.topic:
  每條發(fā)布到 kafka 集群的消息屬于的類別,即 kafka 是面向 topic 的。
4.partition:
  partition 是物理上的概念,每個 topic 包含一個或多個 partition。kafka 分配的單位是 partition。
5.consumer:
  從 kafka 集群中消費消息的終端或服務。
6.Consumer group:
  high-level consumer API 中,每個 consumer 都屬于一個 consumer group,每條消息只能被 consumer group 中的一個 Consumer 消費,但可以被多個 consumer group 消費。
7.replica:
  partition 的副本,保障 partition 的高可用。
8.leader:
  replica 中的一個角色, producer 和 consumer 只跟 leader 交互。
9.follower:
  replica 中的一個角色,從 leader 中復制數(shù)據(jù)。
10.controller:
  kafka 集群中的其中一個服務器,用來進行 leader election 以及 各種 failover。
12.zookeeper:
  kafka 通過 zookeeper 來存儲集群的 meta 信息。

2.3 zookeeper 節(jié)點
kafka 在 zookeeper 中的存儲結(jié)構(gòu)如下圖所示:

image.png

2.4 kafka controller

Kakfa Broker Leader的選舉:Kakfa Broker集群受Zookeeper管理。所有的Kafka Broker節(jié)點一起去Zookeeper上注冊一個臨時節(jié)點,因為只有一個Kafka
Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節(jié)點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。(這個過程叫Controller在ZooKeeper注冊Watch)。這個Controller會監(jiān)聽其他的Kafka Broker的所有信息,如果這個kafka broker controller宕機了,在zookeeper上面的那個臨時節(jié)點就會消失,此時所有的kafka
broker又會一起去Zookeeper上注冊一個臨時節(jié)點,因為只有一個Kafka Broker會注冊成功,其他的都會失敗,所以這個成功在Zookeeper上注冊臨時節(jié)點的這個Kafka Broker會成為Kafka Broker Controller,其他的Kafka broker叫Kafka Broker follower。

三、producer 發(fā)布消息

3.1 寫入方式
producer 采用 push 模式將消息發(fā)布到 broker,每條消息都被 append 到 patition 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機寫內(nèi)存要高,保障 kafka 吞吐率)。

3.2 消息路由
producer 發(fā)送消息到 broker 時,會根據(jù)分區(qū)算法選擇將其存儲到哪一個 partition。其路由機制為:

1\. 指定了 patition,則直接使用;
2\. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition
3\. patition 和 key 都未指定,使用輪詢選出一個 patition。

3.3 寫入流程
producer 寫入消息序列圖如下所示:

image.png
1\. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點找到該 partition 的 leader
2\. producer 將消息發(fā)送給該 leader
3\. leader 將消息寫入本地 log
4\. followers 從 leader pull 消息,寫入本地 log 后 leader 發(fā)送 ACK
5\. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK

3.4 producer delivery guarantee
一般情況下存在三種情況:

1\. At most once 消息可能會丟,但絕不會重復傳輸
2\. At least one 消息絕不會丟,但可能會重復傳輸
3\. Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次

當 producer 向 broker 發(fā)送消息時,一旦這條消息被 commit,由于 replication 的存在,它就不會丟。但是如果 producer 發(fā)送數(shù)據(jù)給 broker 后,遇到網(wǎng)絡問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經(jīng) commit。雖然 Kafka 無法確定網(wǎng)絡故障期間發(fā)生了什么,但是 producer 可以生成一種類似于主鍵的東西,發(fā)生故障時冪等性的重試多次,這樣就做到了 Exactly once,但目前還并未實現(xiàn)。所以目前默認情況下一條消息從 producer 到 broker 是確保了 At least once,可通過設置 producer 異步發(fā)送實現(xiàn)At most once。

四、broker 保存消息

4.1 存儲方式
物理上把 topic 分成一個或多個 patition(對應 server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件),如下:

image.png

4.2 存儲策略
無論消息是否被消費,kafka 都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

  1. 基于時間:log.retention.hours=168
  2. 基于大?。簂og.retention.bytes=1073741824
    需要注意的是,因為Kafka讀取特定消息的時間復雜度為O(1),即與文件大小無關,所以這里刪除過期文件與提高 Kafka 性能無關。

4.3 topic 創(chuàng)建與刪除
4.3.1 創(chuàng)建 topic
創(chuàng)建 topic 的序列圖如下所示:

image.png
  1. controller 在 ZooKeeper 的 /brokers/topics 節(jié)點上注冊 watcher,當 topic 被創(chuàng)建,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。

  2. controller從 /brokers/ids 讀取當前所有可用的 broker 列表,對于 set_p 中的每一個 partition:
    2.1 從分配給該 partition 的所有 replica(稱為AR)中任選一個可用的 broker 作為新的 leader,并將AR設置為新的 ISR
    2.2 將新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state

  3. controller 通過 RPC 向相關的 broker 發(fā)送 LeaderAndISRRequest。
    4.3.2 刪除 topic
    刪除 topic 的序列圖如下所示:

image.png
流程說明:
  1. controller 在 zooKeeper 的 /brokers/topics 節(jié)點上注冊 watcher,當 topic 被刪除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。

  2. 若 delete.topic.enable=false,結(jié)束;否則 controller 注冊在 /admin/delete_topics 上的 watch 被 fire,controller 通過回調(diào)向?qū)?broker 發(fā)送 StopReplicaRequest。

五、kafka 高可用機制

5.1 replication
如圖.1所示,同一個 partition 可能會有多個 replica(對應 server.properties 配置中的 default.replication.factor=N)。沒有 replica 的情況下,一旦 broker 宕機,其上所有 patition 的數(shù)據(jù)都不可被消費,同時 producer 也不能再將數(shù)據(jù)存于其上的 patition。引入replication 之后,同一個 partition 可能會有多個 replica,而這時需要在這些 replica 之間選出一個 leader,producer 和 consumer 只與這個 leader 交互,其它 replica 作為 follower 從 leader 中復制數(shù)據(jù)。

Kafka 分配 Replica 的算法如下:

  1. 將所有 broker(假設共 n 個 broker)和待分配的 partition 排序
  2. 將第 i 個 partition 分配到第(i mod n)個 broker 上
  3. 將第 i 個 partition 的第 j 個 replica 分配到第((i + j) mode n)個 broker上

5.2 leader failover
當 partition 對應的 leader 宕機時,需要從 follower 中選舉出新 leader。在選舉新leader時,一個基本的原則是,新的 leader 必須擁有舊 leader commit 過的所有消息。

kafka 在 zookeeper 中(/brokers/.../state)動態(tài)維護了一個 ISR(in-sync replicas),由3.3節(jié)的寫入流程可知 ISR 里面的所有 replica 都跟上了 leader,只有 ISR 里面的成員才能選為 leader。對于 f+1 個 replica,一個 partition 可以在容忍 f 個 replica 失效的情況下保證消息不丟失。

當所有 replica 都不工作時,有兩種可行的方案:

  1. 等待 ISR 中的任一個 replica 活過來,并選它作為 leader。可保障數(shù)據(jù)不丟失,但時間可能相對較長。
  2. 選擇第一個活過來的 replica(不一定是 ISR 成員)作為 leader。無法保障數(shù)據(jù)不丟失,但相對不可用時間較短。
    kafka 0.8.* 使用第二種方式。

kafka 通過 Controller 來選舉 leader,流程請參考5.3節(jié)。

5.3 broker failover
kafka broker failover 序列圖如下所示:

image.png
  1. controller 在 zookeeper 的 /brokers/ids/[brokerId] 節(jié)點注冊 Watcher,當 broker 宕機時 zookeeper 會 fire watch
  2. controller 從 /brokers/ids 節(jié)點讀取可用broker
  3. controller決定set_p,該集合包含宕機 broker 上的所有 partition
  4. 對 set_p 中的每一個 partition
    4.1 從/brokers/topics/[topic]/partitions/[partition]/state 節(jié)點讀取 ISR
    4.2 決定新 leader(如4.3節(jié)所描述)
    4.3 將新 leader、ISR、controller_epoch 和 leader_epoch 等信息寫入 state 節(jié)點
  5. 通過 RPC 向相關 broker 發(fā)送 leaderAndISRRequest 命令

6. consumer 消費消息

6.1 consumer API
kafka 提供了兩套 consumer API:

  1. The high-level Consumer API
  2. The SimpleConsumer API
    其中 high-level consumer API 提供了一個從 kafka 消費數(shù)據(jù)的高層抽象,而 SimpleConsumer API 則需要開發(fā)人員更多地關注細節(jié)。

以下針對 high-level Consumer API 進行說明。

6.2 consumer group
如 2.2 節(jié)所說, kafka 的分配單位是 patition。每個 consumer 都屬于一個 group,一個 partition 只能被同一個 group 內(nèi)的一個 consumer 所消費(也就保障了一個消息只能被 group 內(nèi)的一個 consuemr 所消費),但是多個 group 可以同時消費這個 partition。

kafka 的設計目標之一就是同時實現(xiàn)離線處理和實時處理,根據(jù)這一特性,可以使用 spark/Storm 這些實時處理系統(tǒng)對消息在線處理,同時使用 Hadoop 批處理系統(tǒng)進行離線處理,還可以將數(shù)據(jù)備份到另一個數(shù)據(jù)中心,只需要保證這三者屬于不同的 consumer group。如下圖所示:

image.png

6.3 消費方式
consumer 采用 pull 模式從 broker 中讀取數(shù)據(jù)。

push 模式很難適應消費速率不同的消費者,因為消息發(fā)送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞消息,但是這樣很容易造成 consumer 來不及處理消息,典型的表現(xiàn)就是拒絕服務以及網(wǎng)絡擁塞。而 pull 模式則可以根據(jù) consumer 的消費能力以適當?shù)乃俾氏M消息。

kafka文件的存儲機制

(1)、kafka文件存儲基本結(jié)構(gòu)

  • 在Kafka文件存儲中,同一個topic下有多個不同partition,每個partition為一個目錄,partiton命名規(guī)則為topic名稱+有序序號,第一個partiton序號從0開始,序號最大值為partitions數(shù)量減1。
    如,前篇里面的orderMq這個topic對應的partitions在三臺機器上名稱分別為
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-2

drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-1
drwxr-xr-x. 2 root root 4096 11月 14 18:45 orderMq-2

drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-0
drwxr-xr-x. 2 root root 4096 11月 21 22:25 orderMq-1

注:重復的是副本,partition是為orderMq-0,orderMq-1,orderMq-2

  • 每個partion(目錄)相當于一個巨型文件被平均分配到多個大小相等segment(段)數(shù)據(jù)文件中。但每個段segment file消息數(shù)量不一定相等,這種特性方便old segment file快速被刪除。默認保留7天的數(shù)據(jù)。
    如orderMq-0目錄下(index和log為后綴名的文件合稱就是segment 文件)
[root@mini3 orderMq-0]# ll
總用量 4
-rw-r--r--. 1 root root 10485760 11月 21 22:31 00000000000000000000.index
-rw-r--r--. 1 root root      219 11月 22 05:22 00000000000000000000.log

image.png

-每個partiton只需要支持順序讀寫就行了,segment文件生命周期由服務端配置參數(shù)決定。(什么時候創(chuàng)建,什么時候刪除)

(2)kafka的segment 文件

  • Segment file組成:由2大部分組成,分別為index file和data file,此2個文件一一對應,成對出現(xiàn),后綴”.index”和“.log”分別表示為segment索引文件、數(shù)據(jù)文件。
image.png
  • Segment文件命名規(guī)則:partion全局的第一個segment從0開始,后續(xù)每個segment文件名為上一個segment文件最后一條消息的offset值。數(shù)值最大為64位long大小,19位數(shù)字字符長度,沒有數(shù)字用0填充。

  • 索引文件存儲大量元數(shù)據(jù),數(shù)據(jù)文件存儲大量消息,索引文件中元數(shù)據(jù)指向?qū)獢?shù)據(jù)文件中message的物理偏移地址。

image.png
3,497:當前l(fā)og文件中的第幾條信息,存放在磁盤上的那個地方 
上述圖中索引文件存儲大量元數(shù)據(jù),數(shù)據(jù)文件存儲大量消息,索引文件中元數(shù)據(jù)指向?qū)獢?shù)據(jù)文件中message的物理偏移地址。 
其中以索引文件中元數(shù)據(jù)3,497為例,依次在數(shù)據(jù)文件中表示第3個message(在全局partiton表示第368772個message)、以及該消息的物理偏移地址為497。 
(3)、kafka查找message 
讀取offset=368776的message,需要通過下面2個步驟查找。 

image.png
第一步:查找segment file 
00000000000000000000.index表示最開始的文件,起始偏移量(offset)為0 
00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1 
00000000000000737337.index的起始偏移量為737338=737337 + 1 
其他后續(xù)文件依次類推。 
以起始偏移量命名并排序這些文件,只要根據(jù)offset **二分查找**文件列表,就可以快速定位到具體文件。當offset=368776時定位到00000000000000368769.index和對應log文件。

第二步:通過segment file查找message
當offset=368776時,依次定位到00000000000000368769.index的元數(shù)據(jù)物理位置和00000000000000368769.log的物理偏移地址
然后再通過00000000000000368769.log順序查找直到offset=368776為止。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容