Apache Kafka 是什么?
Kafka 是基于發(fā)布與訂閱的消息系統(tǒng)。它最初由 LinkedIn 公司開(kāi)發(fā),之后成為 Apache 項(xiàng)目的一部分。Kafka 是一個(gè)分布式的,可分區(qū)的,冗余備份的持久性的日志服務(wù)。它主要用于處理活躍的流式數(shù)據(jù)。
在大數(shù)據(jù)系統(tǒng)中,常常會(huì)碰到一個(gè)問(wèn)題,整個(gè)大數(shù)據(jù)是由各個(gè)子系統(tǒng)組成,數(shù)據(jù)需要在各個(gè)子系統(tǒng)中高性能、低延遲的不停流轉(zhuǎn)。傳統(tǒng)的企業(yè)消息系統(tǒng)并不是非常適合大規(guī)模的數(shù)據(jù)處理。為了同時(shí)搞定在線應(yīng)用(消息)和離線應(yīng)用(數(shù)據(jù)文件、日志),Kafka 就出現(xiàn)了。Kafka 可以起到兩個(gè)作用:
- 降低系統(tǒng)組網(wǎng)復(fù)雜度。
- 降低編程復(fù)雜度,各個(gè)子系統(tǒng)不在是相互協(xié)商接口,各個(gè)子系統(tǒng)類似插口插在插座上,Kafka 承擔(dān)高速數(shù)據(jù)總線的作用。
Kafka 的主要特點(diǎn)?
1、同時(shí)為發(fā)布和訂閱提供高吞吐量。據(jù)了解,Kafka 每秒可以生產(chǎn)約 25 萬(wàn)消息(50MB),每秒處理 55 萬(wàn)消息(110MB)。
2、可進(jìn)行持久化操作。將消息持久化到磁盤,因此可用于批量消費(fèi),例如 ETL ,以及實(shí)時(shí)應(yīng)用程序。通過(guò)將數(shù)據(jù)持久化到硬盤,以及replication ,可以防止數(shù)據(jù)丟失。
3、分布式系統(tǒng),易于向外擴(kuò)展。所有的 Producer、Broker 和Consumer 都會(huì)有多個(gè),均為分布式的。并且,無(wú)需停機(jī)即可擴(kuò)展機(jī)器。
-
4、消息被處理的狀態(tài)是在 Consumer 端維護(hù),而不是由 Broker 端維護(hù)。當(dāng)失敗時(shí),能自動(dòng)平衡。
這段是從網(wǎng)絡(luò)上找來(lái)的。感覺(jué)想要表達(dá)的意思是
- 消息是否被處理完成,是通過(guò) Consumer 提交消費(fèi)進(jìn)度給 Broker ,而不是 Broker 消息被 Consumer 拉取后,就標(biāo)記為已消費(fèi)。
- 當(dāng) Consumer 異常崩潰時(shí),可以重新分配消息分區(qū)到其它的 Consumer 們,然后繼續(xù)消費(fèi)。
5、支持 online 和 offline 的場(chǎng)景。
聊聊 Kafka 的設(shè)計(jì)要點(diǎn)?
1)吞吐量
高吞吐是 Kafka 需要實(shí)現(xiàn)的核心目標(biāo)之一,為此 kafka 做了以下一些設(shè)計(jì):
-
1、數(shù)據(jù)磁盤持久化:消息不在內(nèi)存中 Cache ,直接寫入到磁盤,充分利用磁盤的順序讀寫性能。
直接使用 Linux 文件系統(tǒng)的 Cache ,來(lái)高效緩存數(shù)據(jù)。
-
2、zero-copy:減少 IO 操作步驟
采用 Linux Zero-Copy 提高發(fā)送性能。
- 傳統(tǒng)的數(shù)據(jù)發(fā)送需要發(fā)送 4 次上下文切換。
- 采用 sendfile 系統(tǒng)調(diào)用之后,數(shù)據(jù)直接在內(nèi)核態(tài)交換,系統(tǒng)上下文切換減少為 2 次。《為什么Kafka這么快》http://www.itdecent.cn/p/99cc19dde7df
3、數(shù)據(jù)批量發(fā)送
4、數(shù)據(jù)壓縮
-
5、Topic 劃分為多個(gè) Partition ,提高并行度。
數(shù)據(jù)在磁盤上存取代價(jià)為
O(1)。- Kafka 以 Topic 來(lái)進(jìn)行消息管理,每個(gè) Topic 包含多個(gè) Partition ,每個(gè) Partition 對(duì)應(yīng)一個(gè)邏輯 log ,有多個(gè) segment 文件組成。
- 每個(gè) segment 中存儲(chǔ)多條消息(見(jiàn)下圖),消息 id 由其邏輯位置決定,即從消息 id 可直接定位到消息的存儲(chǔ)位置,避免 id 到位置的額外映射。
- 每個(gè) Partition 在內(nèi)存中對(duì)應(yīng)一個(gè) index ,記錄每個(gè) segment 中的第一條消息偏移。
發(fā)布者發(fā)到某個(gè) Topic 的消息會(huì)被均勻的分布到多個(gè) Partition 上(隨機(jī)或根據(jù)用戶指定的回調(diào)函數(shù)進(jìn)行分布),Broker 收到發(fā)布消息往對(duì)應(yīng) Partition 的最后一個(gè) segment 上添加該消息。
當(dāng)某個(gè) segment上 的消息條數(shù)達(dá)到配置值或消息發(fā)布時(shí)間超過(guò)閾值時(shí),segment上 的消息會(huì)被 flush 到磁盤,只有 flush 到磁盤上的消息訂閱者才能訂閱到,segment 達(dá)到一定的大小后將不會(huì)再往該 segment 寫數(shù)據(jù),Broker 會(huì)創(chuàng)建新的 segment 文件。
2)負(fù)載均衡
- 1、Producer 根據(jù)用戶指定的算法,將消息發(fā)送到指定的 Partition 中。
- 2、Topic 存在多個(gè) Partition ,每個(gè) Partition 有自己的replica ,每個(gè) replica 分布在不同的 Broker 節(jié)點(diǎn)上。多個(gè)Partition 需要選取出 Leader partition ,Leader Partition 負(fù)責(zé)讀寫,并由 Zookeeper 負(fù)責(zé) fail over 。
- 3、相同 Topic 的多個(gè) Partition 會(huì)分配給不同的 Consumer 進(jìn)行拉取消息,進(jìn)行消費(fèi)。
3)拉取系統(tǒng)
由于 Kafka Broker 會(huì)持久化數(shù)據(jù),Broker 沒(méi)有內(nèi)存壓力,因此, Consumer 非常適合采取 pull 的方式消費(fèi)數(shù)據(jù),具有以下幾點(diǎn)好處:
- 1、簡(jiǎn)化 Kafka 設(shè)計(jì)。
- 2、Consumer 根據(jù)消費(fèi)能力自主控制消息拉取速度。
- 3、Consumer 根據(jù)自身情況自主選擇消費(fèi)模式,例如批量,重復(fù)消費(fèi),從尾端開(kāi)始消費(fèi)等。
4)可擴(kuò)展性
通過(guò) Zookeeper 管理 Broker 與 Consumer 的動(dòng)態(tài)加入與離開(kāi)。
- 當(dāng)需要增加 Broker 節(jié)點(diǎn)時(shí),新增的 Broker 會(huì)向 Zookeeper 注冊(cè),而 Producer 及 Consumer 會(huì)根據(jù)注冊(cè)在 Zookeeper 上的 watcher 感知這些變化,并及時(shí)作出調(diào)整。
- 當(dāng)新增和刪除 Consumer 節(jié)點(diǎn)時(shí),相同 Topic 的多個(gè) Partition 會(huì)分配給剩余的 Consumer 們。
Kafka 的架構(gòu)是怎么樣的?
Kafka 的整體架構(gòu)非常簡(jiǎn)單,是分布式架構(gòu),Producer、Broker 和Consumer 都可以有多個(gè)。
- Producer,Consumer 實(shí)現(xiàn) Kafka 注冊(cè)的接口。
- 數(shù)據(jù)從 Producer 發(fā)送到 Broker 中,Broker 承擔(dān)一個(gè)中間緩存和分發(fā)的作用。
- Broker 分發(fā)注冊(cè)到系統(tǒng)中的 Consumer。Broker 的作用類似于緩存,即活躍的數(shù)據(jù)和離線處理系統(tǒng)之間的緩存。
- 客戶端和服務(wù)器端的通信,是基于簡(jiǎn)單,高性能,且與編程語(yǔ)言無(wú)關(guān)的 TCP 協(xié)議。
幾個(gè)重要的基本概念:
Topic:特指 Kafka 處理的消息源(feeds of messages)的不同分類。
-
Partition:Topic 物理上的分組(分區(qū)),一個(gè) Topic 可以分為多個(gè) Partition 。每個(gè) Partition 都是一個(gè)有序的隊(duì)列。Partition 中的每條消息都會(huì)被分配一個(gè)有序的 id(offset)。
- replicas:Partition 的副本集,保障 Partition 的高可用。
- leader:replicas 中的一個(gè)角色,Producer 和 Consumer 只跟 Leader 交互。
- follower:replicas 中的一個(gè)角色,從 leader 中復(fù)制數(shù)據(jù),作為副本,一旦 leader 掛掉,會(huì)從它的 followers 中選舉出一個(gè)新的 leader 繼續(xù)提供服務(wù)。
Message:消息,是通信的基本單位,每個(gè) Producer 可以向一個(gè)Topic(主題)發(fā)布一些消息。
Producers:消息和數(shù)據(jù)生產(chǎn)者,向 Kafka 的一個(gè) Topic 發(fā)布消息的過(guò)程,叫做 producers 。
-
Consumers:消息和數(shù)據(jù)消費(fèi)者,訂閱 Topic ,并處理其發(fā)布的消息的過(guò)程,叫做 consumers 。
Consumer group:每個(gè) Consumer 都屬于一個(gè) Consumer group,每條消息只能被 Consumer group 中的一個(gè) Consumer 消費(fèi),但可以被多個(gè) Consumer group 消費(fèi)。
-
Broker:緩存代理,Kafka 集群中的一臺(tái)或多臺(tái)服務(wù)器統(tǒng)稱為 broker 。
Controller:Kafka 集群中,通過(guò) Zookeeper 選舉某個(gè) Broker 作為 Controller ,用來(lái)進(jìn)行 leader election 以及 各種 failover 。
ZooKeeper:Kafka 通過(guò) ZooKeeper 來(lái)存儲(chǔ)集群的 Topic、Partition 等元信息等。
單純角色來(lái)說(shuō),Kafka 和 RocketMQ 是基本一致的。比較明顯的差異是:
RocketMQ 從 Kafka 演化而來(lái)。
1、Kafka 使用 Zookeeper 作為命名服務(wù);RocketMQ 自己實(shí)現(xiàn)了一個(gè)輕量級(jí)的 Namesrv 。
-
2、Kafka Broker 的每個(gè)分區(qū)都有一個(gè)首領(lǐng)分區(qū);RocketMQ 每個(gè)分區(qū)的“首領(lǐng)”分區(qū),都在 Broker Master 節(jié)點(diǎn)上。
RocketMQ 沒(méi)有首領(lǐng)分區(qū)一說(shuō),所以打上了引號(hào)。
-
3、Kafka Consumer 使用 poll 的方式拉取消息;RocketMQ Consumer 提供 poll 的方式的同時(shí),封裝了一個(gè) push 的方式。
RocketMQ 的 push 的方式,也是基于 poll 的方式的封裝。
… 當(dāng)然還有其它 …
Kafka 為什么要將 Topic 進(jìn)行分區(qū)?
為了負(fù)載均衡,從而能夠水平拓展。
- Topic 只是邏輯概念,面向的是 Producer 和 Consumer ,而 Partition 則是物理概念。如果 Topic 不進(jìn)行分區(qū),而將 Topic 內(nèi)的消息存儲(chǔ)于一個(gè) Broker,那么關(guān)于該 Topic 的所有讀寫請(qǐng)求都將由這一個(gè) Broker 處理,吞吐量很容易陷入瓶頸,這顯然是不符合高吞吐量應(yīng)用場(chǎng)景的。
- 有了 Partition 概念以后,假設(shè)一個(gè) Topic 被分為 10 個(gè) Partitions ,Kafka 會(huì)根據(jù)一定的算法將 10 個(gè) Partition 盡可能均勻的分布到不同的 Broker(服務(wù)器)上。
- 當(dāng) Producer 發(fā)布消息時(shí),Producer 客戶端可以采用 random、key-hash 及輪詢等算法選定目標(biāo) Partition ,若不指定,Kafka 也將根據(jù)一定算法將其置于某一分區(qū)上。
- 當(dāng) Consumer 拉取消息時(shí),Consumer 客戶端可以采用 Range、輪詢 等算法分配 Partition ,從而從不同的 Broker 拉取對(duì)應(yīng)的 Partition 的 leader 分區(qū)。
所以,Partiton 機(jī)制可以極大的提高吞吐量,并且使得系統(tǒng)具備良好的水平擴(kuò)展能力。
Kafka 的應(yīng)用場(chǎng)景有哪些?
1)消息隊(duì)列
比起大多數(shù)的消息系統(tǒng)來(lái)說(shuō),Kafka 有更好的吞吐量,內(nèi)置的分區(qū),冗余及容錯(cuò)性,這讓 Kafka 成為了一個(gè)很好的大規(guī)模消息處理應(yīng)用的解決方案。消息系統(tǒng)一般吞吐量相對(duì)較低,但是需要更小的端到端延時(shí),并常常依賴于 Kafka 提供的強(qiáng)大的持久性保障。在這個(gè)領(lǐng)域,Kafka 足以媲美傳統(tǒng)消息系統(tǒng),如 ActiveMQ 或 RabbitMQ 。
2)行為跟蹤
Kafka 的另一個(gè)應(yīng)用場(chǎng)景,是跟蹤用戶瀏覽頁(yè)面、搜索及其他行為,以發(fā)布訂閱的模式實(shí)時(shí)記錄到對(duì)應(yīng)的 Topic 里。那么這些結(jié)果被訂閱者拿到后,就可以做進(jìn)一步的實(shí)時(shí)處理,或?qū)崟r(shí)監(jiān)控,或放到 Hadoop / 離線數(shù)據(jù)倉(cāng)庫(kù)里處理。
3)元信息監(jiān)控
作為操作記錄的監(jiān)控模塊來(lái)使用,即匯集記錄一些操作信息,可以理解為運(yùn)維性質(zhì)的數(shù)據(jù)監(jiān)控吧。
4)日志收集
日志收集方面,其實(shí)開(kāi)源產(chǎn)品有很多,包括 Scribe、Apache Flume 。很多人使用 Kafka 代替日志聚合(log aggregation)。日志聚合一般來(lái)說(shuō)是從服務(wù)器上收集日志文件,然后放到一個(gè)集中的位置(文件服務(wù)器或 HDFS)進(jìn)行處理。
然而, Kafka 忽略掉文件的細(xì)節(jié),將其更清晰地抽象成一個(gè)個(gè)日志或事件的消息流。這就讓 Kafka 處理過(guò)程延遲更低,更容易支持多數(shù)據(jù)源和分布式數(shù)據(jù)處理。比起以日志為中心的系統(tǒng)比如 Scribe 或者 Flume 來(lái)說(shuō),Kafka 提供同樣高效的性能和因?yàn)閺?fù)制導(dǎo)致的更高的耐用性保證,以及更低的端到端延遲。
5)流處理
這個(gè)場(chǎng)景可能比較多,也很好理解。保存收集流數(shù)據(jù),以提供之后對(duì)接的 Storm 或其他流式計(jì)算框架進(jìn)行處理。很多用戶會(huì)將那些從原始 Topic 來(lái)的數(shù)據(jù)進(jìn)行階段性處理,匯總,擴(kuò)充或者以其他的方式轉(zhuǎn)換到新的 Topic 下再繼續(xù)后面的處理。
例如一個(gè)文章推薦的處理流程,可能是先從 RSS 數(shù)據(jù)源中抓取文章的內(nèi)容,然后將其丟入一個(gè)叫做“文章”的 Topic 中。后續(xù)操作可能是需要對(duì)這個(gè)內(nèi)容進(jìn)行清理,比如回復(fù)正常數(shù)據(jù)或者刪除重復(fù)數(shù)據(jù),最后再將內(nèi)容匹配的結(jié)果返還給用戶。這就在一個(gè)獨(dú)立的 Topic 之外,產(chǎn)生了一系列的實(shí)時(shí)數(shù)據(jù)處理的流程。Strom 和 Samza 是非常著名的實(shí)現(xiàn)這種類型數(shù)據(jù)轉(zhuǎn)換的框架。
6)事件源
事件源,是一種應(yīng)用程序設(shè)計(jì)的方式。該方式的狀態(tài)轉(zhuǎn)移被記錄為按時(shí)間順序排序的記錄序列。Kafka 可以存儲(chǔ)大量的日志數(shù)據(jù),這使得它成為一個(gè)對(duì)這種方式的應(yīng)用來(lái)說(shuō)絕佳的后臺(tái)。比如動(dòng)態(tài)匯總(News feed)。
7)持久性日志(Commit Log)
Kafka 可以為一種外部的持久性日志的分布式系統(tǒng)提供服務(wù)。這種日志可以在節(jié)點(diǎn)間備份數(shù)據(jù),并為故障節(jié)點(diǎn)數(shù)據(jù)回復(fù)提供一種重新同步的機(jī)制。Kafka 中日志壓縮功能為這種用法提供了條件。在這種用法中,Kafka 類似于 Apache BookKeeper 項(xiàng)目。
Kafka 消息發(fā)送和消費(fèi)的簡(jiǎn)化流程是什么?

- 1、Producer ,根據(jù)指定的 partition 方法(round-robin、hash等),將消息發(fā)布到指定 Topic 的 Partition 里面。
- 2、Kafka 集群,接收到 Producer 發(fā)過(guò)來(lái)的消息后,將其持久化到硬盤,并保留消息指定時(shí)長(zhǎng)(可配置),而不關(guān)注消息是否被消費(fèi)。
- 3、Consumer ,從 Kafka 集群 pull 數(shù)據(jù),并控制獲取消息的 offset 。至于消費(fèi)的進(jìn)度,可手動(dòng)或者自動(dòng)提交給 Kafka 集群。
1)Producer 發(fā)送消息
Producer 采用 push 模式將消息發(fā)布到 Broker,每條消息都被 append 到 Patition 中,屬于順序?qū)懘疟P(順序?qū)懘疟P效率比隨機(jī)寫內(nèi)存要高,保障 Kafka 吞吐率)。Producer 發(fā)送消息到 Broker 時(shí),會(huì)根據(jù)分區(qū)算法選擇將其存儲(chǔ)到哪一個(gè) Partition 。
其路由機(jī)制為:
- 1、指定了 Partition ,則直接使用。
- 2、未指定 Partition 但指定 key ,通過(guò)對(duì) key 進(jìn)行 hash 選出一個(gè) Partition 。
- 3、Partition 和 key 都未指定,使用輪詢選出一個(gè) Partition 。
寫入流程:
1、Producer 先從 ZooKeeper 的
"/brokers/.../state"節(jié)點(diǎn)找到該 Partition 的 leader 。注意噢,Producer 只和 Partition 的 leader 進(jìn)行交互。2、Producer 將消息發(fā)送給該 leader 。
3、leader 將消息寫入本地 log 。
4、followers 從 leader pull 消息,寫入本地 log 后 leader 發(fā)送 ACK 。
5、leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark ,最后 commit 的 offset) 并向 Producer 發(fā)送 ACK 。
2)Broker 存儲(chǔ)消息
物理上把 Topic 分成一個(gè)或多個(gè) Patition,每個(gè) Patition 物理上對(duì)應(yīng)一個(gè)文件夾(該文件夾存儲(chǔ)該 Patition 的所有消息和索引文件)。
3)Consumer 消費(fèi)消息
high-level Consumer API 提供了 consumer group 的語(yǔ)義,一個(gè)消息只能被 group 內(nèi)的一個(gè) Consumer 所消費(fèi),且 Consumer 消費(fèi)消息時(shí)不關(guān)注 offset ,最后一個(gè) offset 由 ZooKeeper 保存(下次消費(fèi)時(shí),該 group 中的 Consumer 將從 offset 記錄的位置開(kāi)始消費(fèi))。
注意:
- 1、如果消費(fèi)線程大于 Patition 數(shù)量,則有些線程將收不到消息。
- 2、如果 Patition 數(shù)量大于消費(fèi)線程數(shù),則有些線程多收到多個(gè) Patition 的消息。
- 3、如果一個(gè)線程消費(fèi)多個(gè) Patition,則無(wú)法保證你收到的消息的順序,而一個(gè) Patition 內(nèi)的消息是有序的。
Consumer 采用 pull 模式從 Broker 中讀取數(shù)據(jù)。
- push 模式,很難適應(yīng)消費(fèi)速率不同的消費(fèi)者,因?yàn)橄l(fā)送速率是由 Broker 決定的。它的目標(biāo)是盡可能以最快速度傳遞消息,但是這樣很容易造成 Consumer 來(lái)不及處理消息,典型的表現(xiàn)就是拒絕服務(wù)以及網(wǎng)絡(luò)擁塞。而 pull 模式,則可以根據(jù) Consumer 的消費(fèi)能力以適當(dāng)?shù)乃俾氏M(fèi)消息。
- 對(duì)于 Kafka 而言,pull 模式更合適,它可簡(jiǎn)化 Broker 的設(shè)計(jì),Consumer 可自主控制消費(fèi)消息的速率,同時(shí) Consumer 可以自己控制消費(fèi)方式——即可批量消費(fèi)也可逐條消費(fèi),同時(shí)還能選擇不同的提交方式從而實(shí)現(xiàn)不同的傳輸語(yǔ)義。
Kafka Producer 有哪些發(fā)送模式?
Kafka 的發(fā)送模式由 Producer 端的配置參數(shù) producer.type來(lái)設(shè)置。
- 這個(gè)參數(shù)指定了在后臺(tái)線程中消息的發(fā)送方式是同步的還是異步的,默認(rèn)是同步的方式,即
producer.type=sync。 - 如果設(shè)置成異步的模式,即
producer.type=async,可以是 Producer 以 batch 的形式 push 數(shù)據(jù),這樣會(huì)極大的提高 Broker的性能,但是這樣會(huì)增加丟失數(shù)據(jù)的風(fēng)險(xiǎn)。 - 如果需要確保消息的可靠性,必須要將
producer.type設(shè)置為 sync 。
對(duì)于異步模式,還有 4 個(gè)配套的參數(shù),如下:

- 以 batch 的方式推送數(shù)據(jù)可以極大的提高處理效率,Kafka Producer 可以將消息在內(nèi)存中累計(jì)到一定數(shù)量后作為一個(gè) batch 發(fā)送請(qǐng)求。batch 的數(shù)量大小可以通過(guò) Producer 的參數(shù)(
batch.num.messages)控制。通過(guò)增加 batch 的大小,可以減少網(wǎng)絡(luò)請(qǐng)求和磁盤 IO 的次數(shù),當(dāng)然具體參數(shù)設(shè)置需要在效率和時(shí)效性方面做一個(gè)權(quán)衡。 - 在比較新的版本中還有
batch.size這個(gè)參數(shù)。Producer 會(huì)嘗試批量發(fā)送屬于同一個(gè) Partition 的消息以減少請(qǐng)求的數(shù)量. 這樣可以提升客戶端和服務(wù)端的性能。默認(rèn)大小是 16348 byte (16k).- 發(fā)送到 Broker 的請(qǐng)求可以包含多個(gè) batch ,每個(gè) batch 的數(shù)據(jù)屬于同一個(gè) Partition
- 太小的 batch 會(huì)降低吞吐. 太大會(huì)浪費(fèi)內(nèi)存。
Kafka Consumer 是否可以消費(fèi)指定的分區(qū)消息?
Consumer 消費(fèi)消息時(shí),向 Broker 發(fā)出“fetch”請(qǐng)求去消費(fèi)特定分區(qū)的消息,Consumer 指定消息在日志中的偏移量(offset),就可以消費(fèi)從這個(gè)位置開(kāi)始的消息,Consumer 擁有了 offset 的控制權(quán),可以向后回滾去重新消費(fèi)之前的消息,這是很有意義的。
Kafka 的 high-level API 和 low-level API 的區(qū)別?
High Level API
- 屏蔽了每個(gè) Topic 的每個(gè) Partition 的 offset 管理(自動(dòng)讀取Zookeeper 中該 Consumer group 的 last offset)、Broker 失敗轉(zhuǎn)移、以及增減 Partition 時(shí) Consumer 時(shí)的負(fù)載均衡(Kafka 自動(dòng)進(jìn)行負(fù)載均衡)。
- 如果 Consumer 比 Partition 多,是一種浪費(fèi)。一個(gè) Partition 上是不允許并發(fā)的,所以 Consumer 數(shù)不要大于 Partition 數(shù)。
Low Level API
Low-level API 也就是 Simple Consumer API ,實(shí)際上非常復(fù)雜。
- API 控制更靈活,例如消息重復(fù)讀取,消息 offset 跳讀,Exactly Once 原語(yǔ)。
- API 更復(fù)雜,offset 不再透明,需要自己管理,Broker 自動(dòng)失敗轉(zhuǎn)移需要處理,增加 Consumer、Partition、Broker 需要自己做負(fù)載均衡。
Kafka 的NIO網(wǎng)絡(luò)通信模型
Kafka的網(wǎng)絡(luò)通信模型是基于NIO的Reactor多線程模型來(lái)設(shè)計(jì)的。這里先引用Kafka源碼中注釋的一段話:
An NIO socket server. The threading model is
1 Acceptor thread that handles new connections.
Acceptor has N Processor threads that each have their own selector and read requests from sockets.
M Handler threads that handle requests and produce responses back to the processor threads for writing.
Kafka的網(wǎng)絡(luò)通信層模型,主要采用了1(1個(gè)Acceptor線程)+N(N個(gè)Processor線程)+M(M個(gè)業(yè)務(wù)處理線程)。下面的表格簡(jiǎn)要的列舉了下
| 線程數(shù) | 線程名 | 線程具體說(shuō)明 |
|---|---|---|
| 1 | kafka-socket-acceptor_%x | Acceptor線程,負(fù)責(zé)監(jiān)聽(tīng)Client端發(fā)起的請(qǐng)求 |
| N | kafka-network-thread_%d | Processor線程,負(fù)責(zé)對(duì)Socket進(jìn)行讀寫 |
| M | kafka-request-handler-_%d | Worker線程,處理具體的業(yè)務(wù)邏輯并生成Response返回 |
Kafka網(wǎng)絡(luò)通信層的完整框架圖如下圖所示:

Kafka的網(wǎng)絡(luò)通信層框架結(jié)構(gòu)有幾個(gè)重要概念:
(1)Acceptor:1個(gè)接收線程,負(fù)責(zé)監(jiān)聽(tīng)新的連接請(qǐng)求,同時(shí)注冊(cè)O(shè)P_ACCEPT 事件,將新的連接按照"round robin"方式交給對(duì)應(yīng)的 Processor 線程處理;
(2)Processor:N個(gè)處理器線程,其中每個(gè) Processor 都有自己的 selector,它會(huì)向 Acceptor 分配的 SocketChannel 注冊(cè)相應(yīng)的 OP_READ 事件,N 的大小由“num.networker.threads”決定;
(3)KafkaRequestHandler:M個(gè)請(qǐng)求處理線程,包含在線程池—KafkaRequestHandlerPool內(nèi)部,從RequestChannel的全局請(qǐng)求隊(duì)列—requestQueue中獲取請(qǐng)求數(shù)據(jù)并交給KafkaApis處理,M的大小由“num.io.threads”決定;
(4)RequestChannel:其為Kafka服務(wù)端的請(qǐng)求通道,該數(shù)據(jù)結(jié)構(gòu)中包含了一個(gè)全局的請(qǐng)求隊(duì)列 requestQueue和多個(gè)與Processor處理器相對(duì)應(yīng)的響應(yīng)隊(duì)列responseQueue,提供給Processor與請(qǐng)求處理線程KafkaRequestHandler和KafkaApis交換數(shù)據(jù)的地方。
(5)NetworkClient:其底層是對(duì) Java NIO 進(jìn)行相應(yīng)的封裝,位于Kafka的網(wǎng)絡(luò)接口層。Kafka消息生產(chǎn)者對(duì)象—KafkaProducer的send方法主要調(diào)用NetworkClient完成消息發(fā)送;
(6)SocketServer:其是一個(gè)NIO的服務(wù),它同時(shí)啟動(dòng)一個(gè)Acceptor接收線程和多個(gè)Processor處理器線程。提供了一種典型的Reactor多線程模式,將接收客戶端請(qǐng)求和處理請(qǐng)求相分離;
(7)KafkaServer:代表了一個(gè)Kafka Broker的實(shí)例;其startup方法為實(shí)例啟動(dòng)的入口;
(8)KafkaApis:Kafka的業(yè)務(wù)邏輯處理Api,負(fù)責(zé)處理不同類型的請(qǐng)求;比如“發(fā)送消息”、“獲取消息偏移量—offset”和“處理心跳請(qǐng)求”等;
Kafka網(wǎng)絡(luò)通信層的設(shè)計(jì)與具體實(shí)現(xiàn)
結(jié)合Kafka網(wǎng)絡(luò)通信層的源碼來(lái)分析其設(shè)計(jì)與實(shí)現(xiàn),這里主要詳細(xì)介紹網(wǎng)絡(luò)通信層的幾個(gè)重要元素—SocketServer、Acceptor、Processor、RequestChannel和KafkaRequestHandler。本文分析的源碼部分均基于Kafka的0.11.0版本。
1、SocketServer
SocketServer是接收客戶端Socket請(qǐng)求連接、處理請(qǐng)求并返回處理結(jié)果的核心類,Acceptor及Processor的初始化、處理邏輯都是在這里實(shí)現(xiàn)的。在KafkaServer實(shí)例啟動(dòng)時(shí)會(huì)調(diào)用其startup的初始化方法,會(huì)初始化1個(gè) Acceptor和N個(gè)Processor線程(每個(gè)EndPoint都會(huì)初始化,一般來(lái)說(shuō)一個(gè)Server只會(huì)設(shè)置一個(gè)端口),其實(shí)現(xiàn)如下:
def startup() {
this.synchronized {
connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
val sendBufferSize = config.socketSendBufferBytes
val recvBufferSize = config.socketReceiveBufferBytes
val brokerId = config.brokerId
var processorBeginIndex = 0
// 一個(gè)broker一般只設(shè)置一個(gè)端口
config.listeners.foreach { endpoint =>
val listenerName = endpoint.listenerName
val securityProtocol = endpoint.securityProtocol
val processorEndIndex = processorBeginIndex + numProcessorThreads
//N 個(gè) processor
for (i <- processorBeginIndex until processorEndIndex)
processors(i) = newProcessor(i, connectionQuotas, listenerName, securityProtocol, memoryPool)
//1個(gè) Acceptor
val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId,
processors.slice(processorBeginIndex, processorEndIndex), connectionQuotas)
acceptors.put(endpoint, acceptor)
KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
acceptor.awaitStartup()
processorBeginIndex = processorEndIndex
}
}
2、Acceptor
Acceptor是一個(gè)繼承自抽象類AbstractServerThread的線程類。Acceptor的主要任務(wù)是監(jiān)聽(tīng)并且接收客戶端的請(qǐng)求,同時(shí)建立數(shù)據(jù)傳輸通道—SocketChannel,然后以輪詢的方式交給一個(gè)后端的Processor線程處理(具體的方式是添加socketChannel至并發(fā)隊(duì)列并喚醒Processor線程處理)。
在該線程類中主要可以關(guān)注以下兩個(gè)重要的變量:
(1),nioSelector:通過(guò)NSelector.open()方法創(chuàng)建的變量,封裝了JAVA NIO Selector的相關(guān)操作;
(2),serverChannel:用于監(jiān)聽(tīng)端口的服務(wù)端Socket套接字對(duì)象;
下面來(lái)看下Acceptor主要的run方法的源碼:
def run() {
//首先注冊(cè)O(shè)P_ACCEPT事件
serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
startupComplete()
try {
var currentProcessor = 0
//以輪詢方式查詢并等待關(guān)注的事件發(fā)生
while (isRunning) {
try {
val ready = nioSelector.select(500)
if (ready > 0) {
val keys = nioSelector.selectedKeys()
val iter = keys.iterator()
while (iter.hasNext && isRunning) {
try {
val key = iter.next
iter.remove()
if (key.isAcceptable)
//如果事件發(fā)生則調(diào)用accept方法對(duì)OP_ACCEPT事件處理
accept(key, processors(currentProcessor))
else
throw new IllegalStateException("Unrecognized key state for acceptor thread.")
//輪詢算法
// round robin to the next processor thread
currentProcessor = (currentProcessor + 1) % processors.length
} catch {
case e: Throwable => error("Error while accepting connection", e)
}
}
}
}
//代碼省略
}
def accept(key: SelectionKey, processor: Processor) {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
try {
connectionQuotas.inc(socketChannel.socket().getInetAddress)
socketChannel.configureBlocking(false)
socketChannel.socket().setTcpNoDelay(true)
socketChannel.socket().setKeepAlive(true)
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socketChannel.socket().setSendBufferSize(sendBufferSize)
processor.accept(socketChannel)
} catch {
//省略部分代碼
}
}
def accept(socketChannel: SocketChannel) {
newConnections.add(socketChannel)
wakeup()
}
在上面源碼中可以看到,Acceptor線程啟動(dòng)后,首先會(huì)向用于監(jiān)聽(tīng)端口的服務(wù)端套接字對(duì)象—ServerSocketChannel上注冊(cè)O(shè)P_ACCEPT 事件。然后以輪詢的方式等待所關(guān)注的事件發(fā)生。如果該事件發(fā)生,則調(diào)用accept()方法對(duì)OP_ACCEPT事件進(jìn)行處理。這里,Processor是通過(guò)round robin方法選擇的,這樣可以保證后面多個(gè)Processor線程的負(fù)載基本均勻。
Acceptor的accept()方法的作用主要如下:
(1)通過(guò)SelectionKey取得與之對(duì)應(yīng)的serverSocketChannel實(shí)例,并調(diào)用它的accept()方法與客戶端建立連接;
(2)調(diào)用connectionQuotas.inc()方法增加連接統(tǒng)計(jì)計(jì)數(shù);并同時(shí)設(shè)置第(1)步中創(chuàng)建返回的socketChannel屬性(如sendBufferSize、KeepAlive、TcpNoDelay、configureBlocking等)
(3)將socketChannel交給processor.accept()方法進(jìn)行處理。這里主要是將socketChannel加入Processor處理器的并發(fā)隊(duì)列newConnections隊(duì)列中,然后喚醒Processor線程從隊(duì)列中獲取socketChannel并處理。其中,newConnections會(huì)被Acceptor線程和Processor線程并發(fā)訪問(wèn)操作,所以newConnections是ConcurrentLinkedQueue隊(duì)列(一個(gè)基于鏈接節(jié)點(diǎn)的無(wú)界線程安全隊(duì)列)
3、Processor
Processor同Acceptor一樣,也是一個(gè)線程類,繼承了抽象類AbstractServerThread。其主要是從客戶端的請(qǐng)求中讀取數(shù)據(jù)和將KafkaRequestHandler處理完響應(yīng)結(jié)果返回給客戶端。在該線程類中主要關(guān)注以下幾個(gè)重要的變量:
(1)newConnections:在上面的Acceptor一節(jié)中已經(jīng)提到過(guò),它是一種ConcurrentLinkedQueue[SocketChannel]類型的隊(duì)列,用于保存新連接交由Processor處理的socketChannel;
(2)inflightResponses:是一個(gè)Map[String, RequestChannel.Response]類型的集合,用于記錄尚未發(fā)送的響應(yīng);
(3)selector:是一個(gè)類型為KSelector變量,用于管理網(wǎng)絡(luò)連接;
下面先給出Processor處理器線程run方法執(zhí)行的流程圖:

從上面的流程圖中能夠可以看出Processor處理器線程在其主流程中主要完成了這樣子幾步操作:
(1)處理newConnections隊(duì)列中的socketChannel。遍歷取出隊(duì)列中的每個(gè)socketChannel并將其在selector上注冊(cè)O(shè)P_READ事件;
(2)處理RequestChannel中與當(dāng)前Processor對(duì)應(yīng)響應(yīng)隊(duì)列中的Response。在這一步中會(huì)根據(jù)responseAction的類型(NoOpAction/SendAction/CloseConnectionAction)進(jìn)行判斷,若為“NoOpAction”,表示該連接對(duì)應(yīng)的請(qǐng)求無(wú)需響應(yīng);若為“SendAction”,表示該Response需要發(fā)送給客戶端,則會(huì)通過(guò)“selector.send”注冊(cè)O(shè)P_WRITE事件,并且將該Response從responseQueue響應(yīng)隊(duì)列中移至inflightResponses集合中;“CloseConnectionAction”,表示該連接是要關(guān)閉的;
(3)調(diào)用selector.poll()方法進(jìn)行處理。該方法底層即為調(diào)用nioSelector.select()方法進(jìn)行處理。
(4)處理已接受完成的數(shù)據(jù)包隊(duì)列—completedReceives。在processCompletedReceives方法中調(diào)用“requestChannel.sendRequest”方法將請(qǐng)求Request添加至requestChannel的全局請(qǐng)求隊(duì)列—requestQueue中,等待KafkaRequestHandler來(lái)處理。同時(shí),調(diào)用“selector.mute”方法取消與該請(qǐng)求對(duì)應(yīng)的連接通道上的OP_READ事件;
(5)處理已發(fā)送完的隊(duì)列—completedSends。當(dāng)已經(jīng)完成將response發(fā)送給客戶端,則將其從inflightResponses移除,同時(shí)通過(guò)調(diào)用“selector.unmute”方法為對(duì)應(yīng)的連接通道重新注冊(cè)O(shè)P_READ事件;
(6)處理斷開(kāi)連接的隊(duì)列。將該response從inflightResponses集合中移除,同時(shí)將connectionQuotas統(tǒng)計(jì)計(jì)數(shù)減1;
4、RequestChannel
在Kafka的網(wǎng)絡(luò)通信層中,RequestChannel為Processor處理器線程與KafkaRequestHandler線程之間的數(shù)據(jù)交換提供了一個(gè)數(shù)據(jù)緩沖區(qū),是通信過(guò)程中Request和Response緩存的地方。因此,其作用就是在通信中起到了一個(gè)數(shù)據(jù)緩沖隊(duì)列的作用。Processor線程將讀取到的請(qǐng)求添加至RequestChannel的全局請(qǐng)求隊(duì)列—requestQueue中;KafkaRequestHandler線程從請(qǐng)求隊(duì)列中獲取并處理,處理完以后將Response添加至RequestChannel的響應(yīng)隊(duì)列—responseQueue中,并通過(guò)responseListeners喚醒對(duì)應(yīng)的Processor線程,最后Processor線程從響應(yīng)隊(duì)列中取出后發(fā)送至客戶端。
5、KafkaRequestHandler
KafkaRequestHandler也是一種線程類,在KafkaServer實(shí)例啟動(dòng)時(shí)候會(huì)實(shí)例化一個(gè)線程池—KafkaRequestHandlerPool對(duì)象(包含了若干個(gè)KafkaRequestHandler線程),這些線程以守護(hù)線程的方式在后臺(tái)運(yùn)行。在KafkaRequestHandler的run方法中會(huì)循環(huán)地從RequestChannel中阻塞式讀取request,讀取后再交由KafkaApis來(lái)具體處理。
6、KafkaApis
KafkaApis是用于處理對(duì)通信網(wǎng)絡(luò)傳輸過(guò)來(lái)的業(yè)務(wù)消息請(qǐng)求的中心轉(zhuǎn)發(fā)組件。該組件反映出Kafka Broker Server可以提供哪些服務(wù)。
Kafka 的數(shù)據(jù)存儲(chǔ)模型是怎么樣的?
Kafka 每個(gè) Topic 下面的所有消息都是以 Partition 的方式分布式的存儲(chǔ)在多個(gè)節(jié)點(diǎn)上。同時(shí)在 Kafka 的機(jī)器上,每個(gè) Partition 其實(shí)都會(huì)對(duì)應(yīng)一個(gè)日志目錄,在目錄下面會(huì)對(duì)應(yīng)多個(gè)日志分段(LogSegment)。
下面先介紹一下partition中的segment file的組成:
- segment file 組成:由2部分組成,分別為index file和data file,這兩個(gè)文件是一一對(duì)應(yīng)的,后綴”.index”和”.log”分別表示索引文件和數(shù)據(jù)文件;
- segment file 命名規(guī)則:partition的第一個(gè)segment從0開(kāi)始,后續(xù)每個(gè)segment文件名為上一個(gè)segment文件最后一條消息的offset,ofsset的數(shù)值最大為64位(long類型),20位數(shù)字字符長(zhǎng)度,沒(méi)有數(shù)字用0填充。如下圖所示:
關(guān)于segment file中index與data file對(duì)應(yīng)關(guān)系圖,這里我們選用網(wǎng)上的一個(gè)圖片,如下所示:
segment的索引文件中存儲(chǔ)著大量的元數(shù)據(jù),數(shù)據(jù)文件中存儲(chǔ)著大量消息,索引文件中的元數(shù)據(jù)指向?qū)?yīng)數(shù)據(jù)文件中的message的物理偏移地址。以索引文件中的3,497為例,在數(shù)據(jù)文件中表示第3個(gè)message(在全局partition表示第368772個(gè)message),以及該消息的物理偏移地址為497。
注:Partition中的每條message由offset來(lái)表示它在這個(gè)partition中的偏移量,這個(gè)offset并不是該Message在partition中實(shí)際存儲(chǔ)位置,而是邏輯上的一個(gè)值(如上面的3),但它卻唯一確定了partition中的一條Message(可以認(rèn)為offset是partition中Message的id)。
Kafka 的消息格式是怎么樣的?
message中的物理結(jié)構(gòu)為:
參數(shù)說(shuō)明:
| 關(guān)鍵字 | 解釋說(shuō)明 |
|---|---|
| 8 byte offset | 在parition(分區(qū))內(nèi)的每條消息都有一個(gè)有序的id號(hào),這個(gè)id號(hào)被稱為偏移(offset),它可以唯一確定每條消息在parition(分區(qū))內(nèi)的位置。即offset表示partiion的第多少message |
| 4 byte message size | message大小 |
| 4 byte CRC32 | 用crc32校驗(yàn)message |
| 1 byte “magic” | 表示本次發(fā)布Kafka服務(wù)程序協(xié)議版本號(hào) |
| 1 byte “attributes” | 表示為獨(dú)立版本、或標(biāo)識(shí)壓縮類型、或編碼類型 |
| 4 byte key length | 表示key的長(zhǎng)度,當(dāng)key為-1時(shí),K byte key字段不填 |
| K byte key | 可選 |
| value bytes payload | 表示實(shí)際消息數(shù)據(jù) |
3.3.通過(guò)offset查找message
假如我們想要讀取offset=368776的message,需要通過(guò)下面2個(gè)步驟查找。
1). 查找segment file
00000000000000000000.index表示最開(kāi)始的文件,起始偏移量(offset)為0.第二個(gè)文件00000000000000368769.index的消息量起始偏移量為368770 = 368769 + 1.同樣,第三個(gè)文件00000000000000737337.index的起始偏移量為737338=737337 + 1,其他后續(xù)文件依次類推,以起始偏移量命名并排序這些文件,只要根據(jù)offset 二分查找文件列表,就可以快速定位到具體文件。
當(dāng)offset=368776時(shí)定位到00000000000000368769.index|log
2). 通過(guò)segment file查找message
通過(guò)第一步定位到segment file,當(dāng)offset=368776時(shí),依次定位到00000000000000368769.index的元數(shù)據(jù)物理位置和00000000000000368769.log的物理偏移地址,然后再通過(guò)00000000000000368769.log順序查找直到offset=368776為止。
segment index file并沒(méi)有為數(shù)據(jù)文件中的每條message建立索引,而是采取稀疏索引存儲(chǔ)方式,每隔一定字節(jié)的數(shù)據(jù)建立一條索引,它減少了索引文件大小,通過(guò)map可以直接內(nèi)存操作,稀疏索引為數(shù)據(jù)文件的每個(gè)對(duì)應(yīng)message設(shè)置一個(gè)元數(shù)據(jù)指針,它比稠密索引節(jié)省了更多的存儲(chǔ)空間,但查找起來(lái)需要消耗更多的時(shí)間。
Kafka高效文件存儲(chǔ)設(shè)計(jì)特點(diǎn):
- Kafka把topic中一個(gè)parition大文件分成多個(gè)小文件段,通過(guò)多個(gè)小文件段,就容易定期清除或刪除已經(jīng)消費(fèi)完文件,減少磁盤占用。
- 通過(guò)索引信息可以快速定位message和確定response的最大大小。
- 通過(guò)index元數(shù)據(jù)全部映射到memory,可以避免segment file的IO磁盤操作。
- 通過(guò)索引文件稀疏存儲(chǔ),可以大幅降低index文件元數(shù)據(jù)占用空間大小。
為什么不能以 Partition 作為存儲(chǔ)單位?
如果就以 Partition 為最小存儲(chǔ)單位,可以想象,當(dāng) Kafka Producer 不斷發(fā)送消息,必然會(huì)引起 Partition 文件的無(wú)限擴(kuò)張,將對(duì)消息文件的維護(hù)以及已消費(fèi)的消息的清理帶來(lái)嚴(yán)重的影響,因此,需以 segment 為單位將 Partition 進(jìn)一步細(xì)分。
每個(gè) Partition(目錄)相當(dāng)于一個(gè)巨型文件,被平均分配到多個(gè)大小相等的 segment(段)數(shù)據(jù)文件中(每個(gè) segment 文件中消息數(shù)量不一定相等),這種特性也方便 old segment 的刪除,即方便已被消費(fèi)的消息的清理,提高磁盤的利用率。每個(gè) Partition 只需要支持順序讀寫就行,segment 的文件生命周期由服務(wù)端配置參數(shù)(log.segment.bytes,log.roll.{ms,hours} 等若干參數(shù))決定。
Kafka 的副本機(jī)制是怎么樣的?
Kafka 的副本機(jī)制,是多個(gè) Broker 節(jié)點(diǎn)對(duì)其他節(jié)點(diǎn)的 Topic 分區(qū)的日志進(jìn)行復(fù)制。當(dāng)集群中的某個(gè)節(jié)點(diǎn)出現(xiàn)故障,訪問(wèn)故障節(jié)點(diǎn)的請(qǐng)求會(huì)被轉(zhuǎn)移到其他正常節(jié)點(diǎn)(這一過(guò)程通常叫 Reblance),Kafka 每個(gè)主題的每個(gè)分區(qū)都有一個(gè)主副本以及 0 個(gè)或者多個(gè)副本,副本保持和主副本的數(shù)據(jù)同步,當(dāng)主副本出故障時(shí)就會(huì)被替代。
注意哈,下面說(shuō)的 Leader 指的是每個(gè) Topic 的某個(gè)分區(qū)的 Leader ,而不是 Broker 集群中的【集群控制器】。
在 Kafka 中并不是所有的副本都能被拿來(lái)替代主副本,所以在 Kafka 的Leader 節(jié)點(diǎn)中維護(hù)著一個(gè) ISR(In sync Replicas)集合,翻譯過(guò)來(lái)也叫正在同步中集合,在這個(gè)集合中的需要滿足兩個(gè)條件:
- 1、節(jié)點(diǎn)必須和 Zookeeper 保持連接。
- 2、在同步的過(guò)程中這個(gè)副本不能落后主副本太多。
另外還有個(gè) AR(Assigned Replicas)用來(lái)標(biāo)識(shí)副本的全集,OSR 用來(lái)表示由于落后被剔除的副本集合,所以公式如下:
- ISR = Leader + 沒(méi)有落后太多的副本。
- AR = OSR + ISR 。
這里先要說(shuō)下兩個(gè)名詞:HW 和 LEO 。
- HW(高水位 HighWatermark),是 Consumer 能夠看到的此 Partition 的位置。
- LEO(logEndOffset),是每個(gè) Partition 的 log 最后一條 Message 的位置。
- HW 能保證 Leader 所在的 Broker 失效,該消息仍然可以從新選舉的Leader 中獲取,不會(huì)造成消息丟失。
當(dāng) Producer 向 Leader 發(fā)送數(shù)據(jù)時(shí),可以通過(guò)request.required.acks 參數(shù)來(lái)設(shè)置數(shù)據(jù)可靠性的級(jí)別:
- 1(默認(rèn)):這意味著 Producer 在 ISR 中的 Leader 已成功收到的數(shù)據(jù)并得到確認(rèn)后發(fā)送下一條 message 。如果 Leader 宕機(jī)了,則會(huì)丟失數(shù)據(jù)。
- 0:這意味著 Producer 無(wú)需等待來(lái)自 Broker 的確認(rèn)而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的。
- -1:Producer 需要等待 ISR 中的所有 Follower 都確認(rèn)接收到數(shù)據(jù)后才算一次發(fā)送完成,可靠性最高。但是這樣也不能保證數(shù)據(jù)不丟失,比如當(dāng) ISR 中只有 Leader 時(shí)(其他節(jié)點(diǎn)都和 Zookeeper 斷開(kāi)連接,或者都沒(méi)追上),這樣就變成了
acks=1的情況。
ZooKeeper 在 Kafka 中起到什么作用?
在基于 Kafka 的分布式消息隊(duì)列中,ZooKeeper 的作用有:
1、Broker 在 ZooKeeper 中的注冊(cè)。
2、Topic 在 ZooKeeper 中的注冊(cè)。
3、Consumer 在 ZooKeeper 中的注冊(cè)。
4、Producer 負(fù)載均衡。主要指的是,Producer 從 Zookeeper 拉取 Topic 元數(shù)據(jù),從而能夠?qū)⑾l(fā)送負(fù)載均衡到對(duì)應(yīng) Topic 的分區(qū)中。
5、Consumer 負(fù)載均衡。
6、記錄消費(fèi)進(jìn)度 Offset 。Kafka 已推薦將 consumer 的 Offset 信息保存在 Kafka 內(nèi)部的 Topic 中。
7、記錄 Partition 與 Consumer 的關(guān)系。
Kafka 如何實(shí)現(xiàn)高可用?
Zookeeper 部署 2N+1 節(jié)點(diǎn),形成 Zookeeper 集群,保證高可用。
Kafka Broker 部署集群。每個(gè) Topic 的 Partition ,基于【副本機(jī)制】,在 Broker 集群中復(fù)制,形成 replica 副本,保證消息存儲(chǔ)的可靠性。每個(gè) replica 副本,都會(huì)選擇出一個(gè) leader 分區(qū)(Partition),提供給客戶端(Producer 和 Consumer)進(jìn)行讀寫。
Kafka Producer 無(wú)需考慮集群,因?yàn)楹蜆I(yè)務(wù)服務(wù)部署在一起。Producer 從 Zookeeper 拉取到 Topic 的元數(shù)據(jù)后,選擇對(duì)應(yīng)的 Topic 的 leader 分區(qū),進(jìn)行消息發(fā)送寫入。而 Broker 根據(jù) Producer 的
request.required.acks配置,是寫入自己完成就響應(yīng)給 Producer 成功,還是寫入所有 Broker 完成再響應(yīng)。這個(gè),就是胖友自己對(duì)消息的可靠性的選擇。-
Kafka Consumer 部署集群。每個(gè) Consumer 分配其對(duì)應(yīng)的 Topic Partition ,根據(jù)對(duì)應(yīng)的分配策略。并且,Consumer 只從 leader 分區(qū)(Partition)拉取消息。另外,當(dāng)有新的 Consumer 加入或者老的 Consumer 離開(kāi),都會(huì)將 Topic Partition 再均衡,重新分配給 Consumer 。
注意噢,此處說(shuō)的都是同一個(gè) Kafka Consumer group 。
總的來(lái)說(shuō),Kafka 和 RocketMQ 的高可用方式是比較類似的,主要的差異在 Kafka Broker 的副本機(jī)制,和 RocketMQ Broker 的主從復(fù)制,兩者的差異,以及差異帶來(lái)的生產(chǎn)和消費(fèi)不同。當(dāng)然,實(shí)際上,都是和“主” Broker 做消息的發(fā)送和讀取不是!
Kafka 是否會(huì)弄丟數(shù)據(jù)?
消費(fèi)端弄丟了數(shù)據(jù)
唯一可能導(dǎo)致消費(fèi)者弄丟數(shù)據(jù)的情況,就是說(shuō),你消費(fèi)到了這個(gè)消息,然后消費(fèi)者那邊自動(dòng)提交了 offset,讓 Kafka 以為你已經(jīng)消費(fèi)好了這個(gè)消息,但其實(shí)你才剛準(zhǔn)備處理這個(gè)消息,你還沒(méi)處理,你自己就掛了,此時(shí)這條消息就丟咯。
這不是跟 RabbitMQ 差不多嗎,大家都知道 Kafka 會(huì)自動(dòng)提交 offset,那么只要關(guān)閉自動(dòng)提交 offset,在處理完之后自己手動(dòng)提交 offset,就可以保證數(shù)據(jù)不會(huì)丟。但是此時(shí)確實(shí)還是可能會(huì)有重復(fù)消費(fèi),比如你剛處理完,還沒(méi)提交 offset,結(jié)果自己掛了,此時(shí)肯定會(huì)重復(fù)消費(fèi)一次,自己保證冪等性就好了。
生產(chǎn)環(huán)境碰到的一個(gè)問(wèn)題,就是說(shuō)我們的 Kafka 消費(fèi)者消費(fèi)到了數(shù)據(jù)之后是寫到一個(gè)內(nèi)存的 queue 里先緩沖一下,結(jié)果有的時(shí)候,你剛把消息寫入內(nèi)存 queue,然后消費(fèi)者會(huì)自動(dòng)提交 offset。然后此時(shí)我們重啟了系統(tǒng),就會(huì)導(dǎo)致內(nèi)存 queue 里還沒(méi)來(lái)得及處理的數(shù)據(jù)就丟失了。
Kafka 弄丟了數(shù)據(jù)
這塊比較常見(jiàn)的一個(gè)場(chǎng)景,就是 Kafka 某個(gè) broker 宕機(jī),然后重新選舉 partition 的 leader。大家想想,要是此時(shí)其他的 follower 剛好還有些數(shù)據(jù)沒(méi)有同步,結(jié)果此時(shí) leader 掛了,然后選舉某個(gè) follower 成 leader 之后,不就少了一些數(shù)據(jù)?這就丟了一些數(shù)據(jù)啊。
生產(chǎn)環(huán)境也遇到過(guò),我們也是,之前 Kafka 的 leader 機(jī)器宕機(jī)了,將 follower 切換為 leader 之后,就會(huì)發(fā)現(xiàn)說(shuō)這個(gè)數(shù)據(jù)就丟了。
所以此時(shí)一般是要求起碼設(shè)置如下 4 個(gè)參數(shù):
- 給 topic 設(shè)置
replication.factor參數(shù):這個(gè)值必須大于 1,要求每個(gè) partition 必須有至少 2 個(gè)副本。 - 在 Kafka 服務(wù)端設(shè)置
min.insync.replicas參數(shù):這個(gè)值必須大于 1,這個(gè)是要求一個(gè) leader 至少感知到有至少一個(gè) follower 還跟自己保持聯(lián)系,沒(méi)掉隊(duì),這樣才能確保 leader 掛了還有一個(gè) follower 吧。 - 在 producer 端設(shè)置
acks=all:這個(gè)是要求每條數(shù)據(jù),必須是寫入所有 replica 之后,才能認(rèn)為是寫成功了。 - 在 producer 端設(shè)置
retries=MAX(很大很大很大的一個(gè)值,無(wú)限次重試的意思):這個(gè)是要求一旦寫入失敗,就無(wú)限重試,卡在這里了。
我們生產(chǎn)環(huán)境就是按照上述要求配置的,這樣配置之后,至少在 Kafka broker 端就可以保證在 leader 所在 broker 發(fā)生故障,進(jìn)行 leader 切換時(shí),數(shù)據(jù)不會(huì)丟失。
生產(chǎn)者會(huì)不會(huì)弄丟數(shù)據(jù)?
如果按照上述的思路設(shè)置了 acks=all ,一定不會(huì)丟,要求是,你的 leader 接收到消息,所有的 follower 都同步到了消息之后,才認(rèn)為本次寫成功了。如果沒(méi)滿足這個(gè)條件,生產(chǎn)者會(huì)自動(dòng)不斷的重試,重試無(wú)限次。
Kafka 如何保證消息的順序性?
Kafka 本身,并不像 RocketMQ 一樣,提供順序性的消息。所以,提供的方案,都是相對(duì)有損的。如下:
這里的順序消息,我們更多指的是,單個(gè) Partition 的消息,被順序消費(fèi)。
方式一,Consumer ,對(duì)每個(gè) Partition 內(nèi)部單線程消費(fèi),單線程吞吐量太低,一般不會(huì)用這個(gè)。
方式二,Consumer ,拉取到消息后,寫到 N 個(gè)內(nèi)存 queue,具有相同 key 的數(shù)據(jù)都到同一個(gè)內(nèi)存 queue 。然后,對(duì)于 N 個(gè)線程,每個(gè)線程分別消費(fèi)一個(gè)內(nèi)存 queue 即可,這樣就能保證順序性。
這種方式,相當(dāng)于對(duì)【方式一】的改進(jìn),將相同 Partition 的消息進(jìn)一步拆分,保證相同 key 的數(shù)據(jù)消費(fèi)是順序的。
不過(guò)這種方式,消費(fèi)進(jìn)度的更新會(huì)比較麻煩。
當(dāng)然,實(shí)際情況也不太需要考慮消息的順序性,基本沒(méi)有業(yè)務(wù)需要。
Leader 選舉
為了保證可靠性,對(duì)于任意一條消息,只有它被 ISR 中的所有 follower 都從 leader 復(fù)制過(guò)去才會(huì)被認(rèn)為已提交,并返回信息給 producer。如此,可以避免因部分?jǐn)?shù)據(jù)被寫進(jìn) leader,而尚未被任何 follower 復(fù)制就宕機(jī)的情況下而造成數(shù)據(jù)丟失。對(duì)于 producer 而言,它可以選擇是否等待消息 commit,這可以通過(guò)參數(shù) request.required.acks 來(lái)設(shè)置。這種機(jī)制可以確保:只要 ISR 中有一個(gè)或者以上的 follower,一條被 commit 的消息就不會(huì)丟失。
問(wèn)題 1:如何在保證可靠性的前提下避免吞吐量下降?
有一個(gè)很重要的問(wèn)題是當(dāng) leader 宕機(jī)了,怎樣在 follower 中選舉出新的 leader,因?yàn)?follower 可能落后很多或者直接 crash 了,所以必須確保選擇 “最新” 的 follower 作為新的 leader。一個(gè)基本的原則就是,如果 leader 掛掉,新的 leader 必須擁有原來(lái)的 leader 已經(jīng) commit 的所有消息,這不就是 ISR 中副本的特征嗎?
但是,存在一個(gè)問(wèn)題,ISR 列表維持多大的規(guī)模合適呢?換言之,leader 在一個(gè)消息被 commit 前需要等待多少個(gè) follower 確認(rèn)呢?等待 follower 的數(shù)量越多,與 leader 保持同步的 follower 就越多,可靠性就越高,但這也會(huì)造成吞吐率的下降。
少數(shù)服從多數(shù)的選舉原則
一種常用的選舉 leader 的策略是 “少數(shù)服從多數(shù)” ,不過(guò),Kafka 并不是采用這種方式。這種模式下,如果有 2f+1 個(gè)副本,那么在 commit 之前必須保證有 f+1 個(gè) replica 復(fù)制完消息,同時(shí)為了保證能正確選舉出新的 leader,失敗的副本數(shù)不能超過(guò) f 個(gè)。這種方式有個(gè)很大的優(yōu)勢(shì),系統(tǒng)的延遲取決于最快的幾臺(tái)機(jī)器,也就是說(shuō)比如副本數(shù)為 3,那么延遲就取決于最快的那個(gè) follower 而不是最慢的那個(gè)。
“少數(shù)服從多數(shù)” 的策略也有一些劣勢(shì),為了保證 leader 選舉的正常進(jìn)行,它所能容忍的失敗的 follower 數(shù)比較少,如果要容忍 1 個(gè) follower 掛掉,那么至少要 3 個(gè)以上的副本,如果要容忍 2 個(gè) follower 掛掉,必須要有 5 個(gè)以上的副本。也就是說(shuō),在生產(chǎn)環(huán)境下為了保證較高的容錯(cuò)率,必須要有大量的副本,而大量的副本又會(huì)在大數(shù)據(jù)量下導(dǎo)致性能的急劇下降。這種算法更多用在 ZooKeeper 這種共享集群配置的系統(tǒng)中,而很少在需要大量數(shù)據(jù)的系統(tǒng)中使用。
Kafka 選舉 leader 的策略是怎樣的?
實(shí)際上,leader 選舉的算法非常多,比如 ZooKeeper 的 Zab、Raft 以及 Viewstamped Replication。而 Kafka 所使用的 leader 選舉算法更像是微軟的 PacificA 算法。
Kafka 在 ZooKeeper 中為每一個(gè) partition 動(dòng)態(tài)的維護(hù)了一個(gè) ISR,這個(gè) ISR 里的所有 replica 都與 leader 保持同步,只有 ISR 里的成員才能有被選為 leader 的可能(通過(guò)參數(shù)配置:unclean.leader.election.enable=false)。在這種模式下,對(duì)于 f+1 個(gè)副本,一個(gè) Kafka topic 能在保證不丟失已經(jīng) commit 消息的前提下容忍 f 個(gè)副本的失敗,在大多數(shù)使用場(chǎng)景下,這種模式是十分有利的。事實(shí)上,對(duì)于任意一條消息,只有它被 ISR 中的所有 follower 都從 leader 復(fù)制過(guò)去才會(huì)被認(rèn)為已提交,并返回信息給 producer,從而保證可靠性。但與 “少數(shù)服從多數(shù)” 策略不同的是,Kafka ISR 列表中副本的數(shù)量不需要超過(guò)副本總數(shù)的一半,即不需要滿足 “多數(shù)派” 原則,通常,ISR 列表副本數(shù)大于等于 2 即可,如此,便在可靠性和吞吐量方面取得平衡。
極端情況下的 leader 選舉策略
前已述及,當(dāng) ISR 中至少有一個(gè) follower 時(shí)(ISR 包括 leader),Kafka 可以確保已經(jīng) commit 的消息不丟失,但如果某一個(gè) partition 的所有 replica 都掛了,自然就無(wú)法保證數(shù)據(jù)不丟失了。這種情況下如何進(jìn)行 leader 選舉呢?通常有兩種方案:
- 等待 ISR 中任意一個(gè) replica 恢復(fù)過(guò)來(lái),并且選它作為 leader;
- 選擇第一個(gè)恢復(fù)過(guò)來(lái)的 replica(并不一定是在 ISR 中)作為leader。
如何選擇呢?這就需要在可用性和一致性當(dāng)中作出抉擇。如果一定要等待 ISR 中的 replica 恢復(fù)過(guò)來(lái),不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng)。而且如果 ISR 中所有的 replica 都無(wú)法恢復(fù)了,或者數(shù)據(jù)丟失了,這個(gè) partition 將永遠(yuǎn)不可用。
選擇第一個(gè)恢復(fù)過(guò)來(lái)的 replica 作為 leader,如果這個(gè) replica 不是 ISR 中的 replica,那么,它可能并不具備所有已經(jīng) commit 的消息,從而造成消息丟失。默認(rèn)情況下,Kafka 采用第二種策略,即 unclean.leader.election.enable=true,也可以將此參數(shù)設(shè)置為 false 來(lái)啟用第一種策略。
unclean.leader.election.enable 這個(gè)參數(shù)對(duì)于 leader 的選舉、系統(tǒng)的可用性以及數(shù)據(jù)的可靠性都有至關(guān)重要的影響。生產(chǎn)環(huán)境中應(yīng)慎重權(quán)衡。


