kafka基礎(chǔ)原理總結(jié)

1 目標(biāo)

kafka在數(shù)據(jù)緩沖、異步通信、匯集日志、系統(tǒng)解耦等方面有廣泛的應(yīng)用,旨在了解kafka基礎(chǔ)原理。

2 ** 基本概念**

2.1 生產(chǎn)者

Producer:用于往Broker中發(fā)送/生產(chǎn)Message。

2.2 消費者

Consumer消息消費者,向 Kafka broker 讀取消息的客戶端。

Consumer Group每個 Consumer 屬于一個特定的 Consumer Group(可為每個 Consumer 指定 group name,若不指定 group name 則屬于默認(rèn)的 group)。

Offset:消息者的消費進度,在Partition中的編號,編號順序不跨Partition。

2.3 隊列及備份

Broker:已發(fā)布的消息保存在一組服務(wù)器中,它們被稱為代理(Broker)或Kafka集群。

Topic:用于劃分Message的邏輯概念,一個Topic可以分布在多個Broker上。

Partition:是Kafka中橫向擴展和一切并行化的基礎(chǔ),每個Topic都至少被切分為1個Partition。

邏輯log:一個Partition對應(yīng)一個邏輯log

Segment:一個partition有多個segment ,寫總是對應(yīng)最新的segment,刪總是對應(yīng)最舊的segment,讀哪一個segment根據(jù)consumer的offset來決定

Replication:Kafka支持以Partition為單位對Message進行冗余備份,每個Partition都可以配置至少1個Replication(當(dāng)僅1個Replication時即僅該Partition本身)。

Leader:每個Replication集合中的Partition都會選出一個唯一的Leader,所有的讀寫請求都由Leader處理。其他Replicas從Leader處把數(shù)據(jù)更新同步到本地,過程類似大家熟悉的MySQL中的Binlog同步。

follower:Replication中的一個角色,從 leader 中復(fù)制數(shù)據(jù)。

controller:kafka 集群中的其中一個服務(wù)器,用來進行 leader election 以及 各種 failover。

3 生產(chǎn)機制及策略

3.1 生產(chǎn)策略

生產(chǎn)者直接向某topic的某partition發(fā)送數(shù)據(jù)。leader負(fù)責(zé)主備策略,寫入數(shù)據(jù),發(fā)送ack。

3.2 消息路由

1. 指定了 patition,則直接使用;

2. 未指定 patition 但指定 key,通過對 key 的 value 進行hash 選出一個 patition

3. patition 和 key 都未指定,使用輪詢選出一個 patition。

3.3 寫入流程

1. producer 先從 zookeeper 的 "/brokers/.../state" 節(jié)點找到該 partition 的 leader

2. producer 將消息發(fā)送給該 leader

3. leader 將消息寫入本地 log

4. followers 從 leader pull 消息,寫入本地 log 后 leader 發(fā)送 ACK

5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向 producer 發(fā)送 ACK

producer 寫入消息序列圖如下所示:

image.png

3.4 存儲

物理上把 topic 分成一個或多個 patition(對應(yīng) server.properties 中的 num.partitions=3 配置),每個 patition 物理上對應(yīng)一個文件夾(該文件夾存儲該 patition 的所有消息和索引文件),如下:

image.png

無論消息是否被消費,kafka 都會保留所有消息。有兩種策略可以刪除舊數(shù)據(jù):

1. 基于時間:log.retention.hours=168

2. 基于大?。簂og.retention.bytes=1073741824

需要注意的是,因為Kafka讀取特定消息的時間復(fù)雜度為O(1),即與文件大小無關(guān),所以這里刪除過期文件與提高 Kafka 性能無關(guān)。

3.5 存儲結(jié)構(gòu)

每個日志文件都是一個 log entrie 序列,每個 log entrie 包含一個 4 字節(jié)整型數(shù)值(值為 N+5),1 個字節(jié)的 "magic value",4 個字節(jié)的 CRC 校驗碼,其后跟 N 個字節(jié)的消息體。每條消息都有一個當(dāng)前 Partition 下唯一的 64 字節(jié)的 offset,它指明了這條消息的起始位置。
這個 log entries 并非由一個文件構(gòu)成,而是分成多個 segment,每個 segment 以該 segment 第一條消息的 offset 命名并以“.kafka”為后綴。另外會有一個索引文件,它標(biāo)明了每個 segment 下包含的 log entry 的 offset 范圍,如下圖所示。


image.png

4 消費機制及策略

4.1 消費策略

一個topic 可以配置幾個partition,produce發(fā)送的消息分發(fā)到不同的partition中,consumer接受數(shù)據(jù)的時候是按照group來接受,kafka確保每個partition只能同一個group中的同一個consumer消費,如果想要重復(fù)消費,那么需要其他的組來消費。Zookeerper中保存這每個topic下的每個partition在每個group中消費的offset

新版kafka把這個offsert保存到了一個__consumer_offsert的topic下

這個__consumer_offsert 有50個分區(qū),通過將group的id哈希值%50的值來確定要保存到那一個分區(qū). 這樣也是為了考慮到zookeeper不擅長大量讀寫的原因。

所以,如果要一個group用幾個consumer來同時讀取的話,需要多線程來讀取,一個線程相當(dāng)于一個consumer實例。當(dāng)consumer的數(shù)量大于分區(qū)的數(shù)量的時候,有的consumer線程會讀取不到數(shù)據(jù)。

假設(shè)一個topic test 被groupA消費了,現(xiàn)在啟動另外一個新的groupB來消費test,默認(rèn)test-groupB的offset不是0,而是沒有新建立,除非當(dāng)test有數(shù)據(jù)的時候,groupB會收到該數(shù)據(jù),該條數(shù)據(jù)也是第一條數(shù)據(jù),groupB的offset也是剛初始化的ofsert, 除非用顯式的用–from-beginnging 來獲取從0開始數(shù)據(jù)

image.png

消費者:消費者使用fetch的方式拉取數(shù)據(jù)。kafkaServer不直接負(fù)責(zé)每個consumer的當(dāng)前消費到了哪里,所以需要client端和zk聯(lián)合維護每個partition讀到了哪里,即Offset。

所以這樣看上去,kafkaServer在一定程度上更像是一個大部分為順序讀取的,基于文件的日志系統(tǒng)。

消費語義:對比其他MQ的多播,等語義,Kafka看上去略顯單薄,其主要通過User Group的概念實現(xiàn)消費語義。而UserGroup實際對應(yīng)的就是Offset的更改策略。

User1,User2同屬一個userGroup時,即表示二者共用一套Offset。因每個partition 的offset只能由一個線程維護,因此注定了每個UserGroup里只能有一個消費線程對一個partition進行消費。

同樣,如果希望實現(xiàn)多播,那就User1和User2用兩個userGroup。

4.2 高吞吐率

Kafka 會為每一個 Consumer Group 保留一些 metadata 信息——當(dāng)前消費的消息的 position,也即 offset。這個 offset 由 Consumer 控制。正常情況下 Consumer 會在消費完一條消息后遞增該 offset。當(dāng)然,Consumer 也可將 offset 設(shè)成一個較小的值,重新消費一些消息。因為 offet 由 Consumer 控制,所以 Kafka broker 是無狀態(tài)的,它不需要標(biāo)記哪些消息被哪些消費過,也不需要通過 broker 去保證同一個 Consumer Group 只有一個 Consumer 能消費某一條消息,因此也就不需要鎖機制,這也為 Kafka 的高吞吐率提供了有力保障。

5 ** 控制**

Broker:Kafka中使用Broker來接受Producer和Consumer的請求,并把Message持久化到本地磁盤。每個Cluster當(dāng)中會選舉出一個Broker來擔(dān)任Controller,負(fù)責(zé)處理Partition的Leader選舉,協(xié)調(diào)Partition遷移等工作。

ISR(In-Sync Replica):是Replicas的一個子集,表示目前Alive且與Leader能夠“Catch-up”的Replicas集合。由于讀寫都是首先落到Leader上,所以一般來說通過同步機制從Leader上拉取數(shù)據(jù)的Replica都會和Leader有一些延遲(包括了延遲時間和延遲條數(shù)兩個維度),任意一個超過閾值都會把該Replica踢出ISR。每個Partition都有它自己獨立的ISR。

6 傳遞擔(dān)保

At most once 消息可能會丟,但絕不會重復(fù)傳輸

At least one 消息絕不會丟,但可能會重復(fù)傳輸

Exactly once 每條消息肯定會被傳輸一次且僅傳輸一次,很多時候這是用戶所想要的。

當(dāng) Producer 向 broker 發(fā)送消息時,一旦這條消息被 commit,因數(shù) replication 的存在,它就不會丟。但是如果 Producer 發(fā)送數(shù)據(jù)給 broker 后,遇到網(wǎng)絡(luò)問題而造成通信中斷,那 Producer 就無法判斷該條消息是否已經(jīng) commit。雖然 Kafka 無法確定網(wǎng)絡(luò)故障期間發(fā)生了什么,但是 Producer 可以生成一種類似于主鍵的東西,發(fā)生故障時冪等性的重試多次,這樣就做到了 Exactly once。截止到目前 (Kafka 0.8.2 版本,2015-03-04),這一 Feature 還并未實現(xiàn),有希望在 Kafka 未來的版本中實現(xiàn)。(所以目前默認(rèn)情況下一條消息從 Producer 到 broker 是確保了 At least once,可通過設(shè)置 Producer 異步發(fā)送實現(xiàn) At most once)。

接下來討論的是消息從 broker 到 Consumer 的 delivery guarantee 語義。(僅針對 Kafka consumer high level API)。Consumer 在從 broker 讀取消息后,可以選擇 commit,該操作會在 Zookeeper 中保存該 Consumer 在該 Partition 中讀取的消息的 offset。該 Consumer 下一次再讀該 Partition 時會從下一條開始讀取。如未 commit,下一次讀取的開始位置會跟上一次 commit 之后的開始位置相同。當(dāng)然可以將 Consumer 設(shè)置為 autocommit,即 Consumer 一旦讀到數(shù)據(jù)立即自動 commit。如果只討論這一讀取消息的過程,那 Kafka 是確保了 Exactly once。但實際使用中應(yīng)用程序并非在 Consumer 讀取完數(shù)據(jù)就結(jié)束了,而是要進行進一步處理,而數(shù)據(jù)處理與 commit 的順序在很大程度上決定了消息從 broker 和 consumer 的 delivery guarantee semantic。

讀完消息先 commit 再處理消息。這種模式下,如果 Consumer 在 commit 后還沒來得及處理消息就 crash 了,下次重新開始工作后就無法讀到剛剛已提交而未處理的消息,這就對應(yīng)于 At most once

讀完消息先處理再 commit。這種模式下,如果在處理完消息之后 commit 之前 Consumer crash 了,下次重新開始工作時還會處理剛剛未 commit 的消息,實際上該消息已經(jīng)被處理過了。這就對應(yīng)于 At least once。在很多使用場景下,消息都有一個主鍵,所以消息的處理往往具有冪等性,即多次處理這一條消息跟只處理一次是等效的,那就可以認(rèn)為是 Exactly once。(筆者認(rèn)為這種說法比較牽強,畢竟它不是 Kafka 本身提供的機制,主鍵本身也并不能完全保證操作的冪等性。而且實際上我們說 delivery guarantee 語義是討論被處理多少次,而非處理結(jié)果怎樣,因為處理方式多種多樣,我們不應(yīng)該把處理過程的特性——如是否冪等性,當(dāng)成 Kafka 本身的 Feature)

如果一定要做到 Exactly once,就需要協(xié)調(diào) offset 和實際操作的輸出。精典的做法是引入兩階段提交。如果能讓 offset 和操作輸入存在同一個地方,會更簡潔和通用。這種方式可能更好,因為許多輸出系統(tǒng)可能不支持兩階段提交。比如,Consumer 拿到數(shù)據(jù)后可能把數(shù)據(jù)放到 HDFS,如果把最新的 offset 和數(shù)據(jù)本身一起寫到 HDFS,那就可以保證數(shù)據(jù)的輸出和 offset 的更新要么都完成,要么都不完成,間接實現(xiàn) Exactly once。(目前就 high level API 而言,offset 是存于 Zookeeper 中的,無法存于 HDFS,而 low level API 的 offset 是由自己去維護的,可以將之存于 HDFS 中)

總之,Kafka 默認(rèn)保證 At least once,并且允許通過設(shè)置 Producer 異步提交來實現(xiàn) At most once。而 Exactly once 要求與外部存儲系統(tǒng)協(xié)作,幸運的是 Kafka 提供的 offset 可以非常直接非常容易得使用這種方式。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容