1. Producer
Producers 負(fù)責(zé)生產(chǎn)消息。
消息成功寫到 topic 后,broker 會返回 producer 消息的 topic, partition & the offset of the record within the partition。
1.1 Send
發(fā)送方式有兩種:
- Synchronous send
- Asynchronous send
1.2 Retry
消息可能因為一些異常原因?qū)懯。惓7譃閮深悾?/p>
- Retriable errors: KafkaProducer 針對這種異常,可以自動的發(fā)起重試。全部邏輯隱藏在 send 方法中,開發(fā)人員不需要人工干預(yù)
- a connection error can be resolved because the connection may get reestablished.
- A “no leader” error can be resolved when a new leader is elected for the partition.
- Nonretriable errors: 這種錯誤沒法通過重試修復(fù),會直接拋異常,需要開發(fā)人員處理
- message size too large error
1.3 Acks
acks 參數(shù)控制 producer 認(rèn)為 message 寫成功之前必須接收到 partition 成功寫入的副本數(shù)(針對 replicas)??梢园?strong>acks 理解為用來控制數(shù)據(jù)備份時的一致性強(qiáng)弱的。
當(dāng)配置 acks 為:
- 0: producer 不會等待 broker 的寫成功回復(fù),producer 發(fā)完 request 直接 return,把操作權(quán)交給 application developer,這種配置可能造成消息丟失
- 1: leader replica 成功寫消息后,broker 響應(yīng)寫成功通知
如果 partition leader 所在的 broker crash 了,而新的 leader 還沒有選舉出來,則 producer 會收到 error 的 response 并發(fā)起 retry。
如果 partition leader 寫成功(響應(yīng) producer 成功)后 crash 了,而一個還沒有同步到 message 的 replica 被選為新的 leader,那這條消息就丟失了。 - all: request 將存在 buffer 中,直到 leader 觀察到所有的 follower replicas 都備份完 messages,才響應(yīng) producer。優(yōu)點(diǎn):數(shù)據(jù)一致性強(qiáng);缺點(diǎn):性能差。
2. Brokers and Clusters
一個 Kafka 的服務(wù)器叫做一個 broker。broker 接受 producer 傳遞過來的 messages,store messages 到指定的 partition 中,并分配 offsets。它還接受 consumer 發(fā)過來的 poll messages request & heartbeats request
broker 的 metadata 在 zookeeper 中維護(hù)。每一個 broker 都配置有自身唯一的 id,當(dāng) broker start 時,broker 將自身的 id 注冊到 zk 中(通過寫一個 ephemeral node),如果已經(jīng)存在一個相同 ID 的 ephemeral node,則會報錯。
當(dāng) broker 和 zk 斷掉連接后(broker stop / network partition / long garbage-collection pause),broker 啟動時創(chuàng)建的 ephemeral node 將自動被 zk 刪除。
當(dāng)完全丟失 broker & 刪除對應(yīng)的 ephemeral node 后,重啟一個具有相同 id 的新 broker,該 broker 將替代丟失的舊 broker,接受原 broker 相同的 partitions & topics。
2.1 The Controller
一組 brokers 可以搭建成一個 cluster。在 cluster 中有一個 broker 擔(dān)任 cluster controller,一般是第一個加入 cluster 的 broker 擔(dān)任 controller,它會在 zk 中創(chuàng)建一個名叫 /controller 的 ephemeral node。
當(dāng) controller broker stop or loses connectivity to zk 時,它創(chuàng)建的 /controller node 會被 zk 刪除。cluster 中別的 brokers 將被通知 controller 丟失,剩下的 brokers 繼續(xù)搶占 /controller node,第一個寫成功的成為新的 controller。
controller broker 除了承擔(dān)普通的 broker 功能外,還負(fù)責(zé) partition leaders 的選舉。如果 controller broker 發(fā)現(xiàn)有別的 broker 離開 cluster(通過監(jiān)聽 zk 的相關(guān)路徑 node)時,那么所有存在于丟失 broker 上的 leader partitions 需要新的 leader,controller 負(fù)責(zé)選擇一個 partition 作為 leader,并通知給各個 brokers partitions。新的 partition leaders 明確自己的職責(zé),followers 則明確自己需要同步的 new leader。
The controller uses the epoch number to prevent a “split brain” scenario where two nodes believe each is the current controller.
2.2 Multiple Clusters
The replication mechanisms within the Kafka clusters are designed only to work within a single cluster, not between multiple clusters.
The Kafka project includes a tool called MirrorMaker, used for this purpose.
2.3 Zookeeper

Kafka 用 Zookeeper 來維護(hù) broker cluster,存儲 brokers, topics, partitions 的 metadata。
Consumer 的 metadata 在 kafka v0.9 之前的版本中,是通過 Zookeeper 維護(hù)。但是在 v0.9 之后,可以選擇通過 zookeeper 管理,也可以選擇通過 kafka brokers 管理,因為頻繁的讀寫 offsets 對 zk 的壓力較大,所以推薦通過 Kafka broker 管理。
3. Replication
前段時間一直在系統(tǒng)性的學(xué)習(xí)分布式存儲的知識,某種程度上來說,Kafka, Redis, MySQL Cluster, Zookeeper 等都可以理解為分布式存儲的實現(xiàn)。而分布式存儲中的核心實現(xiàn)方法就是 partition & replication。前面我們介紹了 Kafka 的 topic partition,現(xiàn)在來了解下 replication。
為了實現(xiàn)系統(tǒng)的可靠性(availability) & 耐用性(durability),我們可以對 partition 做 replicated。kafka 中的 data 按 topics 組織,每個 topic 可以做 partition,每個 partition 可以有多個 replicas。
replicas 有兩種角色:
- leader replica: 每個 partition 有一個 leader
- 對該 partition 的所有讀寫操作(producer, follower replicas, consumer)都是在 leader 節(jié)點(diǎn)上完成的,這也是保持 consistency 的一種方式
- leader 還知道每個 followers 的同步進(jìn)度
- follower replica: 除 leader 外的,都是 followers,followers 不直接承擔(dān) client 的讀寫任務(wù)。它們的唯一工作就是通過向 leader 發(fā) fetch request 備份 messages。
consumer.poll / replica fetch messages 時,會把自身已有的最大 offset 帶給 leader 來獲得準(zhǔn)確的 messages。當(dāng) leader crashes 時,其中一個擁有最多消息(最大 offsets)的 followers 將變成 leader。
4. Consumer
consumers 消費(fèi)消息。同時 consumer 會跟蹤上報自己已消費(fèi)消息的 offset(kafka 的每個消息在 topic 的 partition 中都有一個唯一的 offset)。
consumers 是以 consumer group 的形式工作的。group 保證每個 partition 只能被一個 consumer 消費(fèi),換言之,group 中的 consumer 消費(fèi)互不相同的 partition。
一個 consumer group/ consumers 可以(通過正則表達(dá)式)訂閱多個 topics,當(dāng)新增滿足正則表達(dá)式的 topic 時,能自動讀取到該 topic 的 msg 。
4.1 Consumer Groups
consumer groups 的可能組織結(jié)構(gòu)有:




由于 consumer 經(jīng)常會做一些高延遲的操作,例如寫數(shù)據(jù)庫、分析數(shù)據(jù)等,consumer 的消費(fèi)能力可能會小于 producer 的生成能力。
分析以下場景:
- 如果 partition count > consumer count,可以往 consumer group 中加更多的 consumers 來分擔(dān)負(fù)載,提升消費(fèi)能力
- 如果 partition count < consumer count,多出來的 consumers 會閑置,需要增加 partition 的數(shù)量。但是這帶來的問題是:相同 key 的 messages 在增減 partition count 前后可能分配到不同的 partition 中
所以,為 topic 創(chuàng)建 partitions 時要預(yù)留足夠的個數(shù)。這樣當(dāng)將來負(fù)載變大時,可以方便的通過添加 consumers 來分流。
上面我們提到的都是一個 consumer group 對 topic 的消費(fèi)。很多情況下,同一個 topic 的消息會有多個不同的應(yīng)用(user cases)感興趣(每一個 use case 都能拿到該 topic 的所有 messages),這時就需要為不同的 user case 創(chuàng)建不同的 consumer group,即有多個 consumer groups 消費(fèi)同一個 topic。

4.2 Partition Rebalance
發(fā)生一下情況時,需要對 consumer group 進(jìn)行 partition rebalance:
- 當(dāng) consumer group 的消費(fèi)能力不足時,增加 consumer
- 當(dāng) consumer crash /network failure 時,移除 consumer
- 管理員增加新的 partition
通過對 partition rebalance 的支持,Kafka 具備了 high availability & scalability。但是正常情況下,盡量避免 partition rebalance。因為:
- consumer 不能消費(fèi)消息,consumer group 會有一個短暫的不可用期
- 當(dāng) partition 從原先的 consumer 移到新的 consumer 時,原先 consumer 丟失了它當(dāng)前處理的狀態(tài)
設(shè)計階段,就要把由于 rebalance 引發(fā)的潛在的消息重復(fù)處理的情況考慮進(jìn)去。
4.3 Group coordinator & Group leader
關(guān)于 consumers 的維護(hù),有兩個重要的概念:group coordinator & group leader:
- group coordinator: 是特殊的 broker。consumers poll 消息 & commit 消費(fèi)消息記錄時,會發(fā)送 heartbeats 到 group coordinator 來同時告知自己的健康狀況。
- group leader: 第一個加入 consumer group 的 consumer 就是該 group 的 group leader。group coordinator 會把 consumers 列表發(fā)給 group leader 來維護(hù)。group leader 負(fù)責(zé) assign & reassign partitions。
如果 consumer crashed/network failure,長時間沒有發(fā)送 heartbeats 到 group coordinator 時,group coordinator 會認(rèn)為該 consumer 失聯(lián),并通知 group leader rebalance partition,group leader 將 rebalance 的結(jié)果通知 group coordinator,由 coordinator 來通知 consumers 新的 partitions 關(guān)系。
4.4 Poll loop
consumer 中的核心功能幾乎都在 consumer.poll() 方法中。poll(timeout) 通過 timeout 參數(shù)控制 poll 的阻塞等待數(shù)據(jù)時間。如果 timeout = 0,則立即返回,無論是否有新消息。timeout 的值是需要根據(jù)自身業(yè)務(wù)設(shè)置的。但是它不僅僅是從 broker 中讀取消息:
- 初始調(diào)用 poll() 時,會去找 groupCoordinator & 加入 consumer group & 接收 partition assignment
- poll 內(nèi)部負(fù)責(zé)處理 partition rebalance
- 發(fā)送 heartbeat: 當(dāng) consumer 停止 poll() 時,會停止發(fā) heartbeat,被 group coordinator 認(rèn)為 fail,把分配給它的 partitions 分配給 consumer group 中別的 consumer。
所以:
- consumer 需要持續(xù) poll 數(shù)據(jù)
- consumer 處理數(shù)據(jù)的過程要越快越好,避免由于長時間不發(fā) heartbeat,引起宕機(jī)誤判
4.5 Commits and Offsets
Kafka 不像大多數(shù) JMS queue 那樣,broker 不主動跟蹤 consumer 的 ack,而是通過 consumer 發(fā)起 commit 來更新最新的 offset 到 _consumer_offsets topic 中
consumer.poll()時, broker 會返回還沒有消費(fèi)的消息記錄,消息中帶有自身的 offset。
consumer 通過 commit 動作發(fā)送一個帶有 partition offset 的 message 到 kafka broker 的特殊 topic(__consumer_offsets topic) 來更新 offset。
consumer 什么時候 commit 該消息呢?
Automatic Commit
配置 enable.auto.commit=true,consumer 會每隔一個 interval (默認(rèn)每隔5s)自動提交 consumer 通過 poll() 收到的最大的 offset。
automatic commits 也是通過 poll loop 來實現(xiàn)的。每次 poll, consumer 都會自動檢查是否到時間執(zhí)行一次 commit 來提交最近一次 poll() 獲得的最大的 offsets。
當(dāng) consumer crashes / new consumer 加入 consumer group,會觸發(fā) rebalance。在 rebalance 后,每個 consumer 被分配一組新的 partitions,并獲取到最新的 committed offset of each partition 來繼續(xù)工作。
但是考慮以下情況:假設(shè)配置每隔 5s commit the latest offset,上次提交 2s 后,發(fā)生了 rebalance,所有 consumers 獲取到之前最近的 offsets,但這個 offset 其實是 2s 前的,這 2s 間到達(dá) consumers 的消息將會被處理兩次。
可以配置較小的 interval 來減少重復(fù)消費(fèi)的消息,但是本質(zhì)上無法完全避免。
可以看出,操作 committed offset 的位置,是可能發(fā)生以下情況:
- 重復(fù)讀取 & 消費(fèi)消息
- 遺漏處理消息
Commit Current Offset
如果想對 offset 的控制更準(zhǔn)確,配置 auto.commit.offset=false,手動 commit offset。
需要開發(fā)人員手動調(diào)用 commitSync(),將把 poll() 返回的最新的 offset,建議:
- 在 client 處理完 poll 回的所有數(shù)據(jù)后,再執(zhí)行該方法,否則會冒著丟失消息的風(fēng)險。
- 面對 rebalance 時,仍然存在重復(fù)消費(fèi)消息的情況
Asynchronous Commit
commitSync() 會阻塞應(yīng)用,直到收到 broker 的明確響應(yīng)。這將很大的影響系統(tǒng)吞吐??梢钥紤]使用異步 consumer.commitAsync()
commitSync() 內(nèi)部有 retry,如果遇到 retriable failure,會持續(xù)重試,影響性能,如果遇到 nonretriable failure,則會直接 commit fail。// todo retry count
commitAsync() 沒有 retry,之所以不支持 retry,是因為它本身是 async 的、非阻塞的。如果失敗了又重試,可能會把這段時間發(fā)生的更新的 commit 的數(shù)據(jù)修改回去。當(dāng)然了,如果真想重試,是可以找到解決方案的,如記錄一個全局單調(diào)遞增的 sequence number,重試前檢查如果 offset 小于該 number,則取消 retry。
Combining Synchronous and Asynchronous Commits
對于手動控制 offset 的情況,commitAsync() & commitSync() 可以結(jié)合使用。正常情況下使用 async,提高性能,并且偶爾由于網(wǎng)絡(luò)原因發(fā)生的失敗也不需要 retry,一般都會在接下來的 commit 中成功,等待服務(wù)停止消費(fèi)時,調(diào)用 sync,確保最終正確提交 offset。
Commit Specified Offset
commitSync() & commitAsync() 存在一個問題,只能在對 batch messages 全部處理完后,將最大的 offset 提交,無法做到更細(xì)粒度的控制。當(dāng)處理 batch messages 的耗時很長,或者 batch 的消息個數(shù)很多時,如果在消費(fèi)過程中發(fā)生了 rebalance,這次 poll 獲取的所有 messages 都需要重新處理一次。
commitSync() & commitAsync() 都提供了帶參數(shù)的方法,允許我們根據(jù)業(yè)務(wù)在消費(fèi) batch messages 的過程中按需要提交 offset。
4.6 Rebalance Listeners
在發(fā)生 partition rebalance 時(可能處在 processing batch messages 間隙),consumer 需要做一些 cleanup work,包括對正在處理的消息的收尾工作,對文件、數(shù)據(jù)庫連接等的管理。我們可以通過在調(diào)用 consumer.subscribe() 方法中傳入自定義的 ConsumerRebalanceListener 來實現(xiàn)。
ConsumerRebalanceListener 有兩個方法需要實現(xiàn):
- onPartitionsRevoked: 執(zhí)行 rebalance 前的收尾工作,consumer 停止處理舊 messages 后 & rebalance 發(fā)生前系統(tǒng)調(diào)用該方法??梢酝ㄟ^該方法來 commit client 已處理的 offsets,這樣能 commit 準(zhǔn)確的 offsets
- onPartitionsAssigned: 執(zhí)行 rebalance 后的初始化工作,partitions reassigned 后 & consumer 開始處理新 messages 前調(diào)用該方法
Consuming Records with Specific Offsets
seek(TopicPartition partition, long offset)
seekToBeginning(TopicPartition tp)
seekToEnd(TopicPartition tp)
Standalone Consumer: Use a Consumer Without a Group
有些情況下,consumer 會需要指定消費(fèi)某些具體的 partitions, 而不是 join consumer group,由 consumer group 分配 partition & rebalance??梢哉{(diào)用 consumer.assign() 來實現(xiàn)該需求。