導(dǎo)語(yǔ):Kafka 是一個(gè)分布式的基于發(fā)布/訂閱模式的消息引擎系統(tǒng)。其高可靠、高性能、高吞吐的優(yōu)勢(shì)使其廣泛應(yīng)用于各個(gè)業(yè)務(wù)場(chǎng)景。所以這里我們有必要對(duì)它的特性做一個(gè)全面分析。
Kafka 基礎(chǔ)

架構(gòu)圖

搞定術(shù)語(yǔ)
生產(chǎn)者:Producer。向主題發(fā)布新消息的應(yīng)用程序。 消費(fèi)者:Consumer。從主題訂閱新消息的應(yīng)用程序。
消息:Record。Kafka是消息引擎嘛,這里的消息就是指Kafka處理的主要對(duì)象。
主題:Topic。主題是承載消息的邏輯容器,在實(shí)際使用中多用來(lái)區(qū)分具體的業(yè)務(wù)。
分區(qū):Partition。一個(gè)有序不變的消息序列。每個(gè)主題下可以有多個(gè)分區(qū)。 消息位移:Offset。表示分區(qū)中每條消息的位置信息,是一個(gè)單調(diào)遞增且不變的值。
副本:Replica。Kafka中同一條消息能夠被拷貝到多個(gè)地方以提供數(shù)據(jù)冗余\容災(zāi),這些地方就是所謂的副本。副本還分為領(lǐng)導(dǎo)者副本和追隨者副本,各自有不同的角色劃分。副本是在分區(qū)層級(jí)下的,即每個(gè)分區(qū)可配置多個(gè)副本實(shí)現(xiàn)高可用。
消費(fèi)者位移:Consumer Offset。表征消費(fèi)者消費(fèi)進(jìn)度,每個(gè)消費(fèi)者都有自己的消費(fèi)者位移。
消費(fèi)者組:Consumer Group。多個(gè)消費(fèi)者實(shí)例共同組成的一個(gè)組,同時(shí)消費(fèi)多個(gè)分區(qū)以實(shí)現(xiàn)高吞吐。
重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過(guò)程。Rebalance是Kafka消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。
什么情況下發(fā)生重平衡?
- consumer 發(fā)生變化
- partition 發(fā)生變化
-
topic 數(shù)量發(fā)生變化
image.png
核心參數(shù)
? Broker參數(shù)
? 存儲(chǔ)類(lèi)
? log.dirs=/home/kafka1,/home/kafka2,/home/kafka3
? ZooKeeper相關(guān)
? zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1
? 連接類(lèi)
Ssl sasl(PLAINTEX \sasl-PLAINTEX、scram)
? listeners=CONTROLLER: //localhost:9092
? listener.security.protocol.map=CONTROLLER:PLAINTEX
? Topic 管理
? auto.create.topics.enable: 自動(dòng)創(chuàng)建topic
? unclean.leader.election.enable: false, ar=isr + 非isr
? auto.leader.rebalance.enable:
? 數(shù)據(jù)留存
? log.retention.{hours|minutes|ms} :數(shù)據(jù)壽命 hours=168h
? log.rentention.bytes: -1 表示沒(méi)限制
? message.max.bytes: broker接受的最大數(shù)據(jù)大小
生產(chǎn)消費(fèi)
- requet.require.ack=0 -1 1 用來(lái)保證是否讓所有的isr同步數(shù)據(jù)
kafka特性深入剖析
#日志存儲(chǔ)
Kafka 中的消息是以主題為基本單位進(jìn)行歸類(lèi)的,各個(gè)主題在邏輯上相互獨(dú)立。每個(gè)主題又可以分為一個(gè)或多個(gè)分區(qū),分區(qū)的數(shù)量可以在主題創(chuàng)建的時(shí)候指定,也可以在之后修改。每條消息在發(fā)送的時(shí)候會(huì)根據(jù)分區(qū)規(guī)則被追加到指定的分區(qū)中,分區(qū)中的每條消息都會(huì)被分配一個(gè)唯一的序列號(hào),也就是通常所說(shuō)的偏移量(offset)。

不考慮多副本的情況,一個(gè)分區(qū)對(duì)應(yīng)一個(gè)日志(Log)。為了防止 Log 過(guò)大,Kafka 又引入了日志分段(LogSegment)的概念,將 Log 切分為多個(gè) LogSegment,相當(dāng)于一個(gè)巨型文件被平均分配為多個(gè)相對(duì)較小的文件,這樣也便于消息的維護(hù)和清理。

向主題topic-log中發(fā)送一定量的消息,某一時(shí)刻topic-log-0目錄中的布局:

#分區(qū)副本剖析
Kafka 通過(guò)多副本機(jī)制實(shí)現(xiàn)故障自動(dòng)轉(zhuǎn)移,在 Kafka 集群中某個(gè) broker 節(jié)點(diǎn)失效的情況下仍然保證服務(wù)可用。

我們?cè)撊绾未_保副本中所有的數(shù)據(jù)都是一致的呢?特別是對(duì)Kafka而言,當(dāng)生產(chǎn)者發(fā)送消息到某個(gè)主題后,消息是如何同步到對(duì)應(yīng)的所有副本中的呢?針對(duì)這個(gè)問(wèn)題,最常見(jiàn)的解決方案就是采用基于領(lǐng)導(dǎo)者(Leader-based)的副本機(jī)制。

第一,副本分成兩類(lèi):領(lǐng)導(dǎo)者(Leader Replica)和追隨者(Follower Replica)。
第二,F(xiàn)ollower副本是不對(duì)外提供服務(wù)的。這就是說(shuō),任何一個(gè)追隨者副本都不能響應(yīng)消費(fèi)者和生產(chǎn)者的讀寫(xiě)請(qǐng)求。所有的請(qǐng)求都必須由領(lǐng)導(dǎo)者副本來(lái)處理,或者說(shuō),所有的讀寫(xiě)請(qǐng)求都必須發(fā)往領(lǐng)導(dǎo)者副本所在的Broker,由該Broker負(fù)責(zé)處理。
第三,當(dāng)領(lǐng)導(dǎo)者副本掛掉了,或者說(shuō)領(lǐng)導(dǎo)者副本所在的Broker宕機(jī)時(shí),Kafka依托于ZooKeeper提供的監(jiān)控功能能夠?qū)崟r(shí)感知到,并立即開(kāi)啟新一輪的領(lǐng)導(dǎo)者選舉,從追隨者副本中選一個(gè)作為新的領(lǐng)導(dǎo)者。老Leader副本重啟回來(lái)后,只能作為追隨者副本加入到集群中。
ISR AR
- 分區(qū)中的所有副本統(tǒng)稱(chēng)為 AR,而 ISR 是指與 leader 副本保持同步狀態(tài)的副本集合,當(dāng)然 leader 副本本身也是這個(gè)集合中的一員。

失效副本
正常情況下,分區(qū)的所有副本都處于 ISR 集合中,但是難免會(huì)有異常情況發(fā)生,從而某些副本被剝離出 ISR 集合中。在 ISR 集合之外,也就是處于同步失效或功能失效(比如副本處于非存活狀態(tài))的副本統(tǒng)稱(chēng)為失效副本,失效副本對(duì)應(yīng)的分區(qū)也就稱(chēng)為同步失效分區(qū),即 under-replicated 分區(qū)。
bin/kafka-topics.sh --zookeeper localhost: 2181/kafka --describe --topic topic-partitions --under-replicated-partitions
[root@node1 kafka_2.11-2.0.0]# bin/kafka-topics.sh --zookeeper localhost:2181/ kafka --describe --topic topic-partitions ]
Topic: topic-partitions Partition: 0 Leader: 1 Replicas: 1,2,0 Isr: 1,0
Topic: topic-partitions Partition: 1 Leader: 0 Replicas: 2,0,1 Isr: 0,1
Topic: topic-partitions Partition: 2 Leader: 0 Replicas: 0,1,2 Isr: 0,1
分析:

LEO與HW
LEO 標(biāo)識(shí)每個(gè)分區(qū)中最后一條消息的下一個(gè)位置,分區(qū)的每個(gè)副本都有自己的 LEO,ISR 中最小的 LEO 即為 HW,俗稱(chēng)高水位,消費(fèi)者只能拉取到 HW 之前的消息。

分析在拉去數(shù)據(jù)過(guò)程中各個(gè)副本 LEO 和 HW 的變化情況:

#可靠性ack分析
僅依靠副本數(shù)來(lái)支撐可靠性是遠(yuǎn)遠(yuǎn)不夠的,大多數(shù)人還會(huì)想到生產(chǎn)者客戶(hù)端參數(shù) request.required.acks。
- 對(duì)于 acks = 1 的配置,生產(chǎn)者將消息發(fā)送到 leader 副本,leader 副本在成功寫(xiě)入本地日志之后會(huì)告知生產(chǎn)者已經(jīng)成功提交,如下圖所示。如果此時(shí) ISR 集合的 follower 副本還沒(méi)來(lái)得及拉取到 leader 中新寫(xiě)入的消息,leader 就宕機(jī)了,那么此次發(fā)送的消息就會(huì)丟失。

- 對(duì)于 ack = -1(all) 的配置,生產(chǎn)者將消息發(fā)送到 leader 副本,leader 副本在成功寫(xiě)入本地日志之后還要等待 ISR 中的 follower 副本全部同步完成才能夠告知生產(chǎn)者已經(jīng)成功提交,即使此時(shí) leader 副本宕機(jī),消息也不會(huì)丟失。

- 對(duì)于 ack = 0 的配置這意味著producer無(wú)需等待來(lái)自broker的確認(rèn)而繼續(xù)發(fā)送下一批消息。這種情況下數(shù)據(jù)傳輸效率最高,但是數(shù)據(jù)可靠性確是最低的。
#消息交付可靠性保障
至少一次(at least once):消息不會(huì)丟失,但有可能被重復(fù)發(fā)送。
最多一次(at most once):消息可能會(huì)丟失,但絕不會(huì)被重復(fù)發(fā)送。
精確一次(exactly once):消息不會(huì)丟失,也不會(huì)被重復(fù)發(fā)送。
at least once
如果producer收到來(lái)自Kafka broker的確認(rèn)(ack)或者acks = all,
則表示該消息已經(jīng)寫(xiě)入到Kafka。
但如果producer ack超時(shí)或收到錯(cuò)誤,則可能會(huì)重試發(fā)送消息,客戶(hù)端會(huì)認(rèn)為該消息未寫(xiě)入Kafka。
At most once
如果在ack超時(shí)或返回錯(cuò)誤時(shí)producer不重試,
則該消息可能最終不會(huì)寫(xiě)入Kafka,因此不會(huì)傳遞給consumer。
requet.request.ack=0
exactly once
produce1. kafka 0.11.0.0版本引入了idempotent producer機(jī)制,在這個(gè)機(jī)制中同一消息可能被producer發(fā)送多次,但是在broker端只會(huì)寫(xiě)入一次.
2. 冪等producer 能保證單分區(qū)上無(wú)重復(fù)消息;
props.put(“enable.idempotence”, ture)
Producer 需要做兩件事:
1)初始化時(shí)像向 Broker 申請(qǐng)一個(gè) ProducerID
2)為每條消息綁定一個(gè) SequenceNumber
3) broker 保存SequenceNumber。
自動(dòng)ack=-1 && 單分區(qū)去重
#磁盤(pán)順序讀寫(xiě)
kafak采用的是磁盤(pán)順序讀寫(xiě)方式,極大提升了讀寫(xiě)性能。

#高性能頁(yè)緩存
一般磁盤(pán) I/O 的場(chǎng)景有以下幾種:
用戶(hù)調(diào)用標(biāo)準(zhǔn) C 庫(kù)進(jìn)行 I/O 操作,數(shù)據(jù)流為:應(yīng)用程序 buffer→C 庫(kù)標(biāo)準(zhǔn) IObuffer→文件系統(tǒng)頁(yè)緩存→通過(guò)具體文件系統(tǒng)到磁盤(pán)。
用戶(hù)調(diào)用文件 I/O,數(shù)據(jù)流為:應(yīng)用程序 buffer→文件系統(tǒng)頁(yè)緩存→通過(guò)具體文件系統(tǒng)到磁盤(pán)。
用戶(hù)打開(kāi)文件時(shí)使用 O_DIRECT,繞過(guò)頁(yè)緩存直接讀寫(xiě)磁盤(pán)。
臟頁(yè)

#零拷貝
所謂的零拷貝是指將數(shù)據(jù)直接從磁盤(pán)文件復(fù)制到網(wǎng)卡設(shè)備中,而不需要經(jīng)由應(yīng)用程序之手。零拷貝大大提高了應(yīng)用程序的性能,減少了內(nèi)核和用戶(hù)模式之間的上下文切換。

調(diào)用 read() 時(shí),文件 A 中的內(nèi)容被復(fù)制到了內(nèi)核模式下的 Read Buffer 中。
CPU 控制將內(nèi)核模式數(shù)據(jù)復(fù)制到用戶(hù)模式下。
調(diào)用 write() 時(shí),將用戶(hù)模式下的內(nèi)容復(fù)制到內(nèi)核模式下的 Socket Buffer 中。
將內(nèi)核模式下的 Socket Buffer 的數(shù)據(jù)復(fù)制到網(wǎng)卡設(shè)備中傳送。
#零拷貝

常用命令
# zk
cd /usr/local/Cellar/zookeeper/3.4.10/bin
- 安裝、解壓
* 下載后解壓,Zookeeper 的配置文件在 conf 目錄下,有 zoo_sample.cfg 和 log4j.properties,將zoo_sample.cfg 重命名成zoo.cfg,因?yàn)?Zookeeper 在啟動(dòng)時(shí)會(huì)找這個(gè)文件作為默認(rèn)配置文件。
- 啟動(dòng)
`./zkServer start`
- 客戶(hù)端
`./zkCli`
#kafka
cd /Users/vking/tools/kafka_2.11-1.0.1/bin
./kafka-server-start.sh ../config/server.properties
log /tmp/kafka-logs
#創(chuàng)建topic
./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test00
# 查看topic 列表
./kafka-topics.sh --zookeeper 127.0.0.1:2181 --list
__consumer_offsets 這個(gè)是kafka內(nèi)置的topic,保存consumer消費(fèi)的offset的
# 生產(chǎn)
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test_1
#消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test_1 --group group_1 --from-beginning
#查看kafka配置
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type brokers --entity-default --describe
./kafka-configs.sh --zookeeper 127.0.0.1:2181 --entity-type brokers --entity-name 0 --describe
總結(jié)
到這里Kafka 的核心特性基本剖析完了,當(dāng)然kafka也還有其他優(yōu)秀的特性 這里限于篇幅沒(méi)法一一剖析,有疑問(wèn)的同學(xué)我們可以評(píng)論區(qū)交流。
看完文章后,同學(xué)們可以試著去回答一個(gè)面試題:kafka的高可靠、高性能分別是怎么保證的?
