消息中間件的背景分析
場(chǎng)景分析
前面跟著我看過 zk 的源碼,學(xué)過并發(fā)編程的同學(xué)應(yīng)該知道,我們可以使用阻塞隊(duì)列+線程池來實(shí)現(xiàn)生產(chǎn)者消費(fèi)者模式.比如說在一個(gè)應(yīng)用中,A 方法調(diào)用 B 方法去執(zhí)行一些任務(wù)處理.我們可以同步調(diào)用.但是如果這個(gè)時(shí)候請(qǐng)求比較多的情況下,同步調(diào)用比較耗時(shí)會(huì)導(dǎo)致請(qǐng)求阻塞.我們會(huì)使用阻塞隊(duì)列加線程池來實(shí)現(xiàn)異步任務(wù)的處理
那么,問題來了,如果是在分布式系統(tǒng)中,兩個(gè)服務(wù)之間需要通過這種異步隊(duì)列的方式來處理任務(wù),那單進(jìn)程級(jí)別的隊(duì)列就無法解決這個(gè)問題了
因此,引入了消息中間件,也就是把消息處理交給第三方的服務(wù),這個(gè)服務(wù)能夠?qū)崿F(xiàn)數(shù)據(jù)的存儲(chǔ)以及傳輸,使得在分布式架構(gòu)下實(shí)現(xiàn)跨進(jìn)程的遠(yuǎn)程消息通信
所以,簡(jiǎn)單來說: 消息中間件是指利用高效可靠的消息傳輸機(jī)制進(jìn)行平臺(tái)無關(guān)的數(shù)據(jù)交流,并且基于數(shù)據(jù)通信來進(jìn)行分布式系統(tǒng)的集成
思考一下消息中間件的設(shè)計(jì)
可以先從基本的需求開始思考
- 最基本的是要能支持消息的發(fā)送和接收,需要涉及到網(wǎng)絡(luò)通信就一定會(huì)涉及到 NIO
- 消息中心的消息存儲(chǔ)(持久化/非持久化)
- 消息的序列化和反序列化
- 是否跨語言
- 消息的確認(rèn)機(jī)制,如何避免消息重發(fā)
高級(jí)功能
- 消息的有序性
- 是否支持事務(wù)消息
- 消息收發(fā)的性能,對(duì)高并發(fā)大數(shù)據(jù)量的支持
- 是否支持集群
- 消息的可靠性存儲(chǔ)
- 是否支持多協(xié)議
這個(gè)思考的過程其實(shí)就是做需求的整理,然后在使用已有的技術(shù)體系進(jìn)行技術(shù)的實(shí)現(xiàn).而我們所目前階段所去了解的,無非就是別人根據(jù)實(shí)際需求進(jìn)行實(shí)現(xiàn)之后,我們?nèi)绾问褂盟麄兲峁┑?api 進(jìn)行應(yīng)用而已.但是有了這樣一個(gè)全局的思考,那么對(duì)于后續(xù)學(xué)習(xí)這個(gè)技術(shù)本身而言,也顯得很容易了
發(fā)展過程
實(shí)際上消息中間件的發(fā)展也是挺有意思的,我們知道任何一個(gè)技術(shù)的出現(xiàn)都是為了解決實(shí)際問題,這個(gè)問題是 通過一種通用的軟件“總線”也就是一種通信系統(tǒng),解決應(yīng)用程序之間繁重的信息通信工作.最早的小白鼠就是金融交易領(lǐng)域,因?yàn)樵诋?dāng)時(shí)這個(gè)領(lǐng)域中,交易員需要通過不同的終端完成交易,每臺(tái)終端顯示不同的信息.如果接入消息總線,那么交易員只需要在一臺(tái)終端上操作,然后訂閱其他終端感興趣的消息.于是就誕生了發(fā)布訂閱模型(pubsub),同時(shí)誕生了世界上第一個(gè)現(xiàn)代消息隊(duì)列軟件(TIB)The information Bus,TIB 允許開發(fā)者建立一系列規(guī)則去描述消息內(nèi)容,只要消息按照這些規(guī)則發(fā)布出去,任何消費(fèi)者應(yīng)用都能訂閱感興趣的消息.隨著 TIB 帶來的甜頭被廣泛應(yīng)用在各大領(lǐng)域,IBM 也開始研究開發(fā)自己的消息中間件,3 年后 IBM 的消息隊(duì)列 IBM MQ 產(chǎn)品系列發(fā)布,之后的一段時(shí)間 MQ 系列進(jìn)化成了 WebSphere MQ 統(tǒng)治商業(yè)消息隊(duì)列平臺(tái)市場(chǎng)
包括后期微軟也研發(fā)了自己的消息隊(duì)列(MSMQ)
各大廠商紛紛研究自己的 MQ,但是他們是以商業(yè)化模式運(yùn)營(yíng)自己的 MQ 軟件,商業(yè) MQ 想要解決的是應(yīng)用互通的問題,而不是創(chuàng)建標(biāo)準(zhǔn)接口來允許不同 MQ 產(chǎn)品互通.所以有些大型的金融公司可能會(huì)使用來自多個(gè)供應(yīng)商的 MQ 產(chǎn)品,來服務(wù)企業(yè)內(nèi)部不同的應(yīng)用.那么問題來了,如果應(yīng)用已經(jīng)訂閱了 TIB MQ 的消息然后突然需要消費(fèi) IBM MQ 的消息,那么整個(gè)實(shí)現(xiàn)過程會(huì)很麻煩.為了解決這個(gè)問題,在 2001 年誕生了 Java Message Service(JMS),JMS 通過提供公共的 Java API 方式,隱藏單獨(dú) MQ 產(chǎn)品供應(yīng)商的實(shí)現(xiàn)接口,從而跨越了不同 MQ 消費(fèi)和解決互通問題.從技術(shù)層面來說,Java 應(yīng)用程序只需要針對(duì) JMS API 編程,選擇合適的 MQ 驅(qū)動(dòng)即可.JMS 會(huì)處理其他部分.這種方案實(shí)際上是通過單獨(dú)標(biāo)準(zhǔn)化接口來整合很多不同的接口,效果還是不錯(cuò)的,但是碰到了互用性的問題.兩套使用兩種不同編程語言的程序如何通過它們的異步消息傳遞機(jī)制相互通信呢.這個(gè)時(shí)候就需要定義一個(gè)異步消息傳遞的通用標(biāo)準(zhǔn)
所以 AMQP(Advanced Message Queuing Protocol)高級(jí)消息隊(duì)列協(xié)議產(chǎn)生了,它使用了一套標(biāo)準(zhǔn)的底層協(xié)議,加入了許多其他特征來支持互用性,為現(xiàn)代應(yīng)用豐富了消息傳遞需求,針對(duì)標(biāo)準(zhǔn)編碼的任何人都可以和任意 AMQP 供應(yīng)商提供的 MQ 服務(wù)器進(jìn)行交互
除了 JMS 和 AMQP 規(guī)范以外,還有一種 MQTT(Message Queueing Telemetry[特萊米缺] Transport),它是專門為小設(shè)備設(shè)計(jì)的.因?yàn)橛?jì)算性能不高的設(shè)備不能適應(yīng) AMQP 上的復(fù)雜操作,它們需要一種簡(jiǎn)單而且可互用的方式進(jìn)行通信.這是 MQTT 的基本要求,而如今,MQTT 是物聯(lián)網(wǎng)(IOT)生態(tài)系統(tǒng)中主要成分之一
今天要講解的 Kafka,它并沒有遵循上面所說的協(xié)議規(guī)范,注重吞吐量,類似 udp 和 tcp
kafka 的介紹
本期講解的 kafka 是基于 2.0 來講解,所以有些內(nèi)容會(huì)和之前課程講的不太一樣
什么是 Kafka
Kafka 是一款分布式消息發(fā)布和訂閱系統(tǒng),它的特點(diǎn)是高性能,高吞吐量
最早設(shè)計(jì)的目的是作為 LinkedIn 的活動(dòng)流和運(yùn)營(yíng)數(shù)據(jù)的處理管道.這些數(shù)據(jù)主要是用來對(duì)用戶做用戶畫像分析以及服務(wù)器性能數(shù)據(jù)的一些監(jiān)控
所以 kafka 一開始設(shè)計(jì)的目標(biāo)就是作為一個(gè)分布式,高吞吐量的消息系統(tǒng),所以適合運(yùn)用在大數(shù)據(jù)傳輸場(chǎng)景.所以 kafka 在我們大數(shù)據(jù)的課程里面也有講解,但是在 Java 的課程中,我們?nèi)匀恢饕侵v解 kafka 作為分布式消息中間件來講解.不會(huì)去講解到數(shù)據(jù)流的處理這塊
Kafka 的應(yīng)用場(chǎng)景
由于 kafka 具有更好的吞吐量,內(nèi)置分區(qū),冗余及容錯(cuò)性的優(yōu)點(diǎn)(kafka 每秒可以處理幾十萬消息),讓 kafka 成為了一個(gè)很好的大規(guī)模消息處理應(yīng)用的解決方案.所以在企業(yè)級(jí)應(yīng)用長(zhǎng),主要會(huì)應(yīng)用于如下幾個(gè)方面
行為跟蹤
kafka 可以用于跟蹤用戶瀏覽頁面,搜索及其他行為.通過發(fā)布-訂閱模式實(shí)時(shí)記錄到對(duì)應(yīng)的 topic 中,通過后端大數(shù)據(jù)平臺(tái)接入處理分析,并做更進(jìn)一步的實(shí)時(shí)處理和監(jiān)控
日志收集
日志收集方面,有很多比較優(yōu)秀的產(chǎn)品,比如 Apache Flume,很多公司使用 kafka 代理日志聚合.日志聚合表示從服務(wù)器上收集日志文件,然后放到一個(gè)集中的平臺(tái)(文件服務(wù)器)進(jìn)行處理.在實(shí)際應(yīng)用開發(fā)中,我們應(yīng)用程序的 log 都會(huì)輸出到本地的磁盤上,排查問題的話通過 linux 命令來搞定,如果應(yīng)用程序組成了負(fù)載均衡集群,并且集群的機(jī)器有幾十臺(tái)以上,那么想通過日志快速定位到問題,就是很麻煩的事情了.所以一般都會(huì)做一個(gè)日志統(tǒng)一收集平臺(tái)管理 log 日志用來快速查詢重要應(yīng)用的問題.所以很多公司的套路都是把應(yīng)用日志集中到 kafka 上,然后分別導(dǎo)入到 es 和 hdfs 上,用來做實(shí)時(shí)檢索分析和離線統(tǒng)計(jì)數(shù)據(jù)備份等.而另一方面,kafka 本身又提供了很好的 api 來集成日志并且做日志收集
Kafka 本身的架構(gòu)
一個(gè)典型的 kafka 集群包含若干 Producer(可以是應(yīng)用節(jié)點(diǎn)產(chǎn)生的消息,也可以是通過 Flume 收集日志產(chǎn)生的事件),若干個(gè) Broker(kafka 支持水平擴(kuò)展),若干個(gè) Consumer Group,以及一個(gè) zookeeper 集群.kafka 通過 zookeeper 管理集群配置及服務(wù)協(xié)同.Producer 使用 push 模式將消息發(fā)布到 broker,consumer 通過監(jiān)聽使用 pull 模式從 broker 訂閱并消費(fèi)消息
多個(gè) broker 協(xié)同工作,producer 和 consumer 部署在各個(gè)業(yè)務(wù)邏輯中.三者通過 zookeeper 管理協(xié)調(diào)請(qǐng)求和轉(zhuǎn)發(fā).這樣就組成了一個(gè)高性能的分布式消息發(fā)布和訂閱系統(tǒng)圖上有一個(gè)細(xì)節(jié)是和其他 mq 中間件不同的點(diǎn),producer 發(fā)送消息到 broker 的過程是 push,而 consumer 從 broker 消費(fèi)消息的過程是 pull,主動(dòng)去拉數(shù)據(jù).而不是 broker 把數(shù)據(jù)主動(dòng)發(fā)送給 consumer
名詞解釋
Broker
Kafka 集群包含一個(gè)或多個(gè)服務(wù)器,這種服務(wù)器被稱為 broker.broker 端不維護(hù)數(shù)據(jù)的消費(fèi)狀態(tài),提升了性能.直接使用磁盤進(jìn)行存儲(chǔ),線性讀寫,速度快:避免了數(shù)據(jù)在 JVM 內(nèi)存和系統(tǒng)內(nèi)存之間的復(fù)制,減少耗性能的創(chuàng)建對(duì)象和垃圾回收
Producer
負(fù)責(zé)發(fā)布消息到 Kafka broker
Consumer
消息消費(fèi)者,向 Kafka broker 讀取消息的客戶端,consumer 從 broker 拉取(pull)數(shù)據(jù)并進(jìn)行處理
Topic
每條發(fā)布到 Kafka 集群的消息都有一個(gè)類別,這個(gè)類別被稱為 Topic.(物理上不同 Topic 的消息分開存儲(chǔ),邏輯上一個(gè) Topic 的消息雖然保存于一個(gè)或多個(gè) broker 上但用戶只需指定消息的 Topic 即可生產(chǎn)或消費(fèi)數(shù)據(jù)而不必關(guān)心數(shù)據(jù)存于何處)
Partition
Parition 是物理上的概念,每個(gè) Topic 包含一個(gè)或多個(gè) Partition
Consumer Group
每個(gè) Consumer 屬于一個(gè)特定的 Consumer Group(可為每個(gè) Consumer 指定 group name,若不指定 group name 則屬于默認(rèn)的 group)
Topic & Partition
Topic 在邏輯上可以被認(rèn)為是一個(gè) queue,每條消費(fèi)都必須指定它的 Topic,可以簡(jiǎn)單理解為必須指明把這條消息放進(jìn)哪個(gè) queue 里.為了使得 Kafka 的吞吐率可以線性提高,物理上把 Topic 分成一個(gè)或多個(gè) Partition,每個(gè) Partition 在物理上對(duì)應(yīng)一個(gè)文件夾,該文件夾下存儲(chǔ)這個(gè) Partition 的所有消息和索引文件.若創(chuàng)建 topic1 和 topic2 兩個(gè) topic,且分別有 13 個(gè)和 19 個(gè)分區(qū),則整個(gè)集群上會(huì)相應(yīng)會(huì)生成共 32 個(gè)文件夾(本文所用集群共 8 個(gè)節(jié)點(diǎn),此處 topic1 和 topic2 replication-factor 均為 1)
Kafka 的安裝部署
下載 kafka
https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
安裝過程
安裝過程非常簡(jiǎn)單,只需要解壓就行,因?yàn)檫@個(gè)是編譯好之后的可執(zhí)行程序
tar -zxvf kafka_2.11-2.0.0.tgz 解壓
配置 zookeeper
因?yàn)?kafka 依賴于 zookeeper 來做 master 選舉一起其他數(shù)據(jù)的維護(hù),所以需要先啟動(dòng) zookeeper 節(jié)點(diǎn)
kafka 內(nèi)置了 zookeeper 的服務(wù),所以在 bin 目錄下提供了這些腳本
zookeeper-server-start.sh
zookeeper-server-stop.sh
在 config 目錄下,存在一些配置文件
zookeeper.properties server.properties
所以我們可以通過下面的腳本來啟動(dòng) zk 服務(wù),當(dāng)然,也可以自己搭建 zk 的集群來實(shí)現(xiàn)
sh zookeeper-server-start.sh -daemon ../config/zookeeper.properties
啟動(dòng)和停止 kafka
修改 server.properties,增加 zookeeper 的配置
zookeeper.connect=localhost:2181
啟動(dòng) kafka
sh kafka-server-start.sh -damoen config/server.properties
停止 kafka
sh kafka-server-stop.sh -daemon config/server.properties
kafka 的基本操作
創(chuàng)建 topic
sh kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 -- partitions 1 --topic test
Replication-factor 表示該 topic 需要在不同的 broker 中保存幾份,這里設(shè)置成 1,表示在兩個(gè) broker 中保存兩份 Partitions 分區(qū)數(shù)
查看 topic
sh kafka-topics.sh --list --zookeeper localhost:2181
查看 topic 屬性
sh kafka-topics.sh --describe --zookeeper localhost:2181 --topic first_topic
消費(fèi)消息
sh kafka-console-consumer.sh --bootstrap-server 192.168.13.106:9092 --topic test --from-beginning
發(fā)送消息
sh kafka-console-producer.sh --broker-list 192.168.244.128:9092 --topic first_topic
集群環(huán)境安裝
環(huán)境準(zhǔn)備
- 準(zhǔn)備三臺(tái)虛擬機(jī)
- 分別把 kafka 的安裝包部署在三臺(tái)機(jī)器上
修改配置
以下配置修改均為 server.properties
- 分別修改三臺(tái)機(jī)器的 server.properties 配置,同一個(gè)集群中的每個(gè)機(jī)器的 id 必須唯一
broker.id=0
broker.id=1
broker.id=2
- 修改 zookeeper 的連接配置
zookeeper.connect=192.168.13.106:2181
- 修改 listeners 配置
如果配置了 listeners,那么消息生產(chǎn)者和消費(fèi)者會(huì)使用 listeners 的配置來進(jìn)行消息的收發(fā),否則,會(huì)使用 localhost
PLAINTEXT 表示協(xié)議,默認(rèn)是明文,可以選擇其他加密協(xié)議
listeners=PLAINTEXT://192.168.13.102:9092
- 分別啟動(dòng)三臺(tái)服務(wù)器
sh kafka-server-start.sh -daemon ../config/server.properties
消息中間件能做什么
消息中間件主要解決的就是分布式系統(tǒng)之間消息傳遞的問題,它能夠屏蔽各種平臺(tái)以及協(xié)議之間的特性,實(shí)現(xiàn)應(yīng)用程序之間的協(xié)同.舉個(gè)非常簡(jiǎn)單的例子,就拿一個(gè)電商平臺(tái)的注冊(cè)功能來簡(jiǎn)單分析下,用戶注冊(cè)這一個(gè)服務(wù),不單單只是 insert 一條數(shù)據(jù)到數(shù)據(jù)庫里面就完事了,還需要發(fā)送激活郵件,發(fā)送新人紅包或者積分,發(fā)送營(yíng)銷短信等一系列操作.假如說這里面的每一個(gè)操作,都需要消耗 1s,那么整個(gè)注冊(cè)過程就需要耗時(shí) 4s 才能響應(yīng)給用戶
但是我們從注冊(cè)這個(gè)服務(wù)可以看到,每一個(gè)子操作都是相對(duì)獨(dú)立的,同時(shí),基于領(lǐng)域劃分以后,發(fā)送激活郵件,發(fā)送營(yíng)銷短信,贈(zèng)送積分及紅包都屬于不同的子域.所以我們可以對(duì)這些子操作進(jìn)行來實(shí)現(xiàn)異步化執(zhí)行,類似于多線程并行處理的概念
如何實(shí)現(xiàn)異步化呢?用多線程能實(shí)現(xiàn)嗎?多線程當(dāng)然可以實(shí)現(xiàn),只是,消息的持久化,消息的重發(fā)這些條件,多線程并不能滿足.所以需要借助一些開源中間件來解決.而分布式消息隊(duì)列就是一個(gè)非常好的解決辦法,引入分布式消息隊(duì)列以后,架構(gòu)圖就變成這樣了(下圖是異步消息隊(duì)列的場(chǎng)景).通過引入分布式隊(duì)列,就能夠大大提升程序的處理效率,并且還解決了各個(gè)模塊之間的耦合問題
-
這個(gè)是分布式消息隊(duì)列的第一個(gè)解決場(chǎng)景【異步處理】
我們?cè)賮碚归_一種場(chǎng)景,通過分布式消息隊(duì)列來實(shí)現(xiàn)流量整形,比如在電商平臺(tái)的秒殺場(chǎng)景下,流量會(huì)
非常大.通過消息隊(duì)列的方式可以很好的緩解高流量的問題
- 用戶提交過來的請(qǐng)求,先寫入到消息隊(duì)列.消息隊(duì)列是有長(zhǎng)度的,如果消息隊(duì)列長(zhǎng)度超過指定長(zhǎng)度,直接拋棄
- 秒殺的具體核心處理業(yè)務(wù),接收消息隊(duì)列中消息進(jìn)行處理,這里的消息處理能力取決于消費(fèi)端本身的吞吐量
當(dāng)然,消息中間件還有更多應(yīng)用場(chǎng)景,比如在弱一致性事務(wù)模型中,可以采用分布式消息隊(duì)列的實(shí)現(xiàn)最大能力通知方式來實(shí)現(xiàn)數(shù)據(jù)的最終一致性等等
Java 中使用 kafka 進(jìn)行通信
依賴
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>
發(fā)送端代碼
public class Producer extends Thread{
private final KafkaProducer<Integer,String> producer;
private final String topic;
public Producer(String topic){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192 .168.13.103:9092,192.168.13.104:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"practice-producer");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producer = new KafkaProducer<Integer, String>(properties);
this.topic = topic;
}
@Override
public void run(){
int num=0;
while(num < 50){
String msg = "pratice test message:" + num;
try {
producer.send(new ProducerRecord<Integer, String> (topic,msg)).get();
TimeUnit.SECONDS.sleep(2);
num++;
} catch (InterruptedException e){
e.printStackTrace();
} catch (ExecutionException e){
e.printStackTrace();
}
}
}
public static void main(String[] args){
new Producer("test").start();
}
}
消費(fèi)端代碼
public class Consumer extends Thread{
private final KafkaConsumer<Integer,String> consumer;
private final String topic;
public Consumer(String topic){
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.13.102:9092,192 .168.13.103:9092,192.168.13.104:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "practice-consumer");
//設(shè)置 offset 自動(dòng)提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 自動(dòng)提交間隔時(shí)間
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
//對(duì)于 當(dāng)前 groupid 來說,消息的 offset 從最早的消息開始消費(fèi)
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
consumer = new KafkaConsumer<>(properties);
this.topic=topic;
}
@Override
public void run(){
while(true){
consumer.subscribe(Collections.singleton(this.topic));
ConsumerRecords<Integer, String> records = consumer.poll(Duration.ofSeconds(1));
records.forEach(record -> {
System.out.println(record.key()+ " " + record.value()+ " -> offset:" + record.offset());
});
}
}
public static void main(String[] args){
new Consumer("test").start();
}
}
異步發(fā)送
kafka 對(duì)于消息的發(fā)送,可以支持同步和異步,前面演示的案例中,我們是基于同步發(fā)送消息.同步會(huì)需要阻塞,而異步不需要等待阻塞的過程
從本質(zhì)上來說,kafka 都是采用異步的方式來發(fā)送消息到 broker,但是 kafka 并不是每次發(fā)送消息都會(huì)直接發(fā)送到 broker 上,而是把消息放到了一個(gè)發(fā)送隊(duì)列中,然后通過一個(gè)后臺(tái)線程不斷從隊(duì)列取出消息進(jìn)行發(fā)送,發(fā)送成功后會(huì)觸發(fā) callback.kafka 客戶端會(huì)積累一定量的消息統(tǒng)一組裝成一個(gè)批量消息發(fā)送出去,觸發(fā)條件是前面提到的 batch.size 和 linger.ms
而同步發(fā)送的方法,無非就是通過 future.get()來等待消息的發(fā)送返回結(jié)果,但是這種方法會(huì)嚴(yán)重影響消息發(fā)送的性能
batch.size
public void run(){
int num = 0;
while(num < 50){
String msg = "pratice test message:" + num;
try {
producer.send(new ProducerRecord<>(topic, msg), new Callback(){
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e){
System.out.println("callback: " + recordMetadata.offset()+ "->" + recordMetadata.partition());
}
});
TimeUnit.SECONDS.sleep(2);
num++;
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
生產(chǎn)者發(fā)送多個(gè)消息到 broker 上的同一個(gè)分區(qū)時(shí),為了減少網(wǎng)絡(luò)請(qǐng)求帶來的性能開銷,通過批量的方式來提交消息,可以通過這個(gè)參數(shù)來控制批量提交的字節(jié)數(shù)大小,默認(rèn)大小是 16384byte,也就是 16kb,意味著當(dāng)一批消息大小達(dá)到指定的 batch.size 的時(shí)候會(huì)統(tǒng)一發(fā)送
linger.ms
Producer 默認(rèn)會(huì)把兩次發(fā)送時(shí)間間隔內(nèi)收集到的所有 Requests 進(jìn)行一次聚合然后再發(fā)送,以此提高吞吐量,而 linger.ms 就是為每次發(fā)送到 broker 的請(qǐng)求增加一些 delay,以此來聚合更多的 Message 請(qǐng)求.這個(gè)有點(diǎn)想 TCP 里面的 Nagle 算法,在 TCP 協(xié)議的傳輸中,為了減少大量小數(shù)據(jù)包的發(fā)送,采用了 Nagle 算法,也就是基于小包的等-停協(xié)議
batch.size 和 linger.ms 這兩個(gè)參數(shù)是 kafka 性能優(yōu)化的關(guān)鍵參數(shù),很多同學(xué)會(huì)發(fā)現(xiàn) batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個(gè)都配置了,那么怎么工作的呢?實(shí)際上,當(dāng)二者都配置的時(shí)候,只要滿足其中一個(gè)要求,就會(huì)發(fā)送請(qǐng)求到 broker 上
一些基礎(chǔ)配置分析
group.id
consumer group 是 kafka 提供的可擴(kuò)展且具有容錯(cuò)性的消費(fèi)者機(jī)制.既然是一個(gè)組,那么組內(nèi)必然可以有多個(gè)消費(fèi)者或消費(fèi)者實(shí)例(consumer instance),它們共享一個(gè)公共的 ID,即 group ID.組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題(subscribed topics)的所有分區(qū)(partition).當(dāng)然,每個(gè)分區(qū)只能由同一個(gè)消費(fèi)組內(nèi)的一個(gè) consumer 來消費(fèi).如下圖所示,分別有三個(gè)消費(fèi)者,屬于兩個(gè)不同的 group,那么對(duì)于 firstTopic 這個(gè) topic 來說,這兩個(gè)組的消費(fèi)者都能同時(shí)消費(fèi)這個(gè) topic 中的消息,對(duì)于此事的架構(gòu)來說,這個(gè) firstTopic 就類似于 ActiveMQ 中的 topic 概念.如右圖所示,如果 3 個(gè)消費(fèi)者都屬于同一個(gè) group,那么此事 firstTopic 就是一個(gè) Queue 的概念
enable.auto.commit
消費(fèi)者消費(fèi)消息以后自動(dòng)提交,只有當(dāng)消息提交以后,該消息才不會(huì)被再次接收到,還可以配合 auto.commit.interval.ms 控制自動(dòng)提交的頻率
當(dāng)然,我們也可以通過 consumer.commitSync()的方式實(shí)現(xiàn)手動(dòng)提交
auto.offset.reset
這個(gè)參數(shù)是針對(duì)新的 groupid 中的消費(fèi)者而言的,當(dāng)有新 groupid 的消費(fèi)者來消費(fèi)指定的 topic 時(shí),對(duì)于該參數(shù)的配置,會(huì)有不同的語義
auto.offset.reset=latest 情況下,新的消費(fèi)者將會(huì)從其他消費(fèi)者最后消費(fèi)的 offset 處開始消費(fèi) Topic 下的消息
auto.offset.reset= earliest 情況下,新的消費(fèi)者會(huì)從該 topic 最早的消息開始消費(fèi)
auto.offset.reset=none 情況下,新的消費(fèi)者加入以后,由于之前不存在 offset,則會(huì)直接拋出異常
max.poll.records
此設(shè)置限制每次調(diào)用 poll 返回的消息數(shù),這樣可以更容易的預(yù)測(cè)每次 poll 間隔要處理的最大值.通過調(diào)整此值,可以減少 poll 間隔
Springboot + kafka
springboot 的版本和 kafka 的版本,有一個(gè)對(duì)照表格,如果沒有按照正確的版本來引入,那么會(huì)存在版本問題導(dǎo)致 ClassNotFound 的問題,具體請(qǐng)參考
https://spring.io/projects/spring-kafka
jar 包依賴
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.0.RELEASE</version>
</dependency>
KafkaProducer
@Component
public class KafkaProducer {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
public void send(){
kafkaTemplate.send("test","msgKey","msgData");
}
}
KafkaConsumer
@Component
public class KafkaConsumer {
@KafkaListener(topics = {"test"})
public void listener(ConsumerRecord record){
Optional<?> msg = Optional.ofNullable(record.value());
if (msg.isPresent()){
System.out.println(msg.get());
}
}
}
application 配置
spring.kafka.bootstrap-servers=192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.group-id=test-consumer-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
測(cè)試
public static void main(String[] args){
ConfigurableApplicationContext context = SpringApplication.run(KafkaDemoApplication.class, args);
KafkaProducer kafkaProducer = context.getBean(KafkaProducer.class);
for (int i = 0; i < 3; i++){
kafkaProducer.send();
try {
Thread.sleep(3000);
} catch (InterruptedException e){
e.printStackTrace();
}
}
}
原理分析
從前面的整個(gè)演示過程來看,只要不是超大規(guī)模的使用 kafka,那么基本上沒什么大問題,否則,對(duì)于 kafka 本身的運(yùn)維的挑戰(zhàn)會(huì)很大,同時(shí),針對(duì)每一個(gè)參數(shù)的調(diào)優(yōu)也顯得很重要
據(jù)我了解,快手在使用 kafka 集群規(guī)模是挺大的,他們?cè)?19 年的開發(fā)者大會(huì)上有提到,總機(jī)器數(shù)大概 2000 臺(tái);30 多個(gè)集群;topic 12000 個(gè);一共大概 20 萬 TP(topic partition);每天總處理的消息 數(shù)超過 4 萬億條;峰值超過 1 億條
技術(shù)的使用是最簡(jiǎn)單的,要想掌握核心價(jià)值,就勢(shì)必要了解一些原理,在設(shè)計(jì)這個(gè)課程的時(shí)候,我想了很久應(yīng)該從哪個(gè)地方著手,最后還是選擇從最基礎(chǔ)的消息通訊的原理著手
關(guān)于 Topic 和 Partition
Topic
在 kafka 中,topic 是一個(gè)存儲(chǔ)消息的邏輯概念,可以認(rèn)為是一個(gè)消息集合.每條消息發(fā)送到 kafka 集群的消息都有一個(gè)類別.物理上來說,不同的 topic 的消息是分開存儲(chǔ)的,每個(gè) topic 可以有多個(gè)生產(chǎn)者向它發(fā)送消息,也可以有多個(gè)消費(fèi)者去消費(fèi)其中的消息
Partition
每個(gè) topic 可以劃分多個(gè)分區(qū)(每個(gè) Topic 至少有一個(gè)分區(qū)),同一 topic 下的不同分區(qū)包含的消息是不同的.每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè) offset(稱之為偏移量),它是消息在此分區(qū)中的唯一編號(hào),kafka 通過 offset 保證消息在分區(qū)內(nèi)的順序,offset 的順序不跨分區(qū),即 kafka 只保證在同一個(gè)分區(qū)內(nèi)的消息是有序的
下圖中,對(duì)于名字為 test 的 topic,做了 3 個(gè)分區(qū),分別是 p0,p1,p2
-
每一條消息發(fā)送到 broker 時(shí),會(huì)根據(jù) partition 的規(guī)則選擇存儲(chǔ)到哪一個(gè) partition.如果 partition 規(guī)則設(shè)置合理,那么所有的消息會(huì)均勻的分布在不同的 partition 中,這樣就有點(diǎn)類似數(shù)據(jù)庫的分庫分表的概念,把數(shù)據(jù)做了分片處理
Topic&Partition 的存儲(chǔ)
Partition 是以文件的形式存儲(chǔ)在文件系統(tǒng)中,比如創(chuàng)建一個(gè)名為 firstTopic 的 topic,其中有 3 個(gè) partition,那么在 kafka 的數(shù)據(jù)目錄(/tmp/kafka-log)中就有 3 個(gè)目錄,firstTopic-0~3,命名規(guī)則是<topic_name>-<partition_id>
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 1 --partitions 3 --topic firstTopic
關(guān)于消息分發(fā)
kafka 消息分發(fā)策略
消息是 kafka 中最基本的數(shù)據(jù)單元,在 kafka 中,一條消息由 key,value 兩部分構(gòu)成,在發(fā)送一條消息時(shí),我們可以指定這個(gè) key,那么 producer 會(huì)根據(jù) key 和 partition 機(jī)制來判斷當(dāng)前這條消息應(yīng)該發(fā)送并存儲(chǔ)到哪個(gè) partition 中.我們可以根據(jù)需要進(jìn)行擴(kuò)展 producer 的 partition 機(jī)制
代碼演示
自定義 Partitioner
public class MyPartitioner implements Partitioner {
private Random random = new Random();
@Override
public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster){
//獲取集群中指定 topic 的所有分區(qū)信息
List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(s);
int numOfPartition = partitionInfos.size();
int partitionNum = 0;
if (o == null){
//key 沒有設(shè)置
partitionNum = random.nextInt(numOfPartition);
//隨機(jī)指定分區(qū)
} else {
partitionNum = Math.abs((o1.hashCode()))% numOfPartition;
}
System.out.println("key->" + o + ",value->" + o1 + "->send to partition:" + partitionNum);
return partitionNum;
}
}
發(fā)送端代碼添加自定義分區(qū)
public KafkaProducerDemo(String topic,boolean isAysnc){
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092");
properties.put(ProducerConfig.CLIENT_ID_CONFIG,"KafkaProducerDemo");
properties.put(ProducerConfig.ACKS_CONFIG,"-1");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.gupaoedu.kafka.MyPa rtitioner");
producer = new KafkaProducer<Integer, String>(properties);
this.topic=topic;
this.isAysnc=isAysnc;
}
消息默認(rèn)的分發(fā)機(jī)制
默認(rèn)情況下,kafka 采用的是 hash 取模的分區(qū)算法.如果 Key 為 null,則會(huì)隨機(jī)分配一個(gè)分區(qū).這個(gè)隨機(jī)是在這個(gè)參數(shù)”metadata.max.age.ms”的時(shí)間范圍內(nèi)隨機(jī)選擇一個(gè).對(duì)于這個(gè)時(shí)間段內(nèi),如果 key 為 null,則只會(huì)發(fā)送到唯一的分區(qū).這個(gè)值值哦默認(rèn)情況下是 10 分鐘更新一次
關(guān)于 Metadata,這個(gè)之前沒講過,簡(jiǎn)單理解就是 Topic/Partition 和 broker 的映射關(guān)系,每一個(gè) topic 的每一個(gè) partition,需要知道對(duì)應(yīng)的 broker 列表是什么,leader 是誰,follower 是誰.這些信息都是存儲(chǔ)在 Metadata 這個(gè)類里面
消費(fèi)端如何消費(fèi)指定的分區(qū)
通過下面的代碼,就可以消費(fèi)指定該 topic 下的 0 號(hào)分區(qū).其他分區(qū)的數(shù)據(jù)就無法接收
//消費(fèi)指定分區(qū)的時(shí)候,不需要再訂閱
//kafkaConsumer.subscribe(Collections.singletonList(topic));
//消費(fèi)指定的分區(qū)
TopicPartition topicPartition = new TopicPartition(topic,0);
kafkaConsumer.assign(Arrays.asList(topicPartition));
消息的消費(fèi)原理
kafka 消息消費(fèi)原理演示
在實(shí)際生產(chǎn)過程中,每個(gè) topic 都會(huì)有多個(gè) partitions,多個(gè) partitions 的好處在于,一方面能夠?qū)?broker 上的數(shù)據(jù)進(jìn)行分片有效減少了消息的容量從而提升 io 性能.另外一方面,為了提高消費(fèi)端的消費(fèi)能力,一般會(huì)通過多個(gè) consumer 去消費(fèi)同一個(gè) topic ,也就是消費(fèi)端的負(fù)載均衡機(jī)制,也就是我們接下來要了解的,在多個(gè) partition 以及多個(gè) consumer 的情況下,消費(fèi)者是如何消費(fèi)消息的
同時(shí),在上一節(jié)課,我們講了,kafka 存在 consumer group 的概念,也就是 group.id 一樣的 consumer,這些 consumer 屬于一個(gè) consumer group,組內(nèi)的所有消費(fèi)者協(xié)調(diào)在一起來消費(fèi)訂閱主題的所有分區(qū).當(dāng)然每一個(gè)分區(qū)只能由同一個(gè)消費(fèi)組內(nèi)的 consumer 來消費(fèi),那么同一個(gè) consumer group 里面的 consumer 是怎么去分配該消費(fèi)哪個(gè)分區(qū)里的數(shù)據(jù)的呢?如下圖所示,3 個(gè)分區(qū),3 個(gè)消費(fèi)者,那么哪個(gè)消費(fèi)者消分哪個(gè)分區(qū)?
對(duì)于上面這個(gè)圖來說,這 3 個(gè)消費(fèi)者會(huì)分別消費(fèi) test 這個(gè) topic 的 3 個(gè)分區(qū),也就是每個(gè) consumer 消費(fèi)一個(gè) partition
代碼演示(3 個(gè) partiton 對(duì)應(yīng) 3 個(gè) consumer)
- 創(chuàng)建一個(gè)帶 3 個(gè)分區(qū)的 topic
- 啟動(dòng) 3 個(gè)消費(fèi)者消費(fèi)同一個(gè) topic,并且這 3 個(gè) consumer 屬于同一個(gè)組
- 啟動(dòng)發(fā)送者進(jìn)行消息發(fā)送
演示結(jié)果:consumer1 會(huì)消費(fèi) partition0 分區(qū),consumer2 會(huì)消費(fèi) partition1 分區(qū),consumer3 會(huì)消費(fèi) partition2 分區(qū)
如果是 2 個(gè) consumer 消費(fèi) 3 個(gè) partition 呢?會(huì)是怎么樣的結(jié)果?
代碼演示(3 個(gè) partiton 對(duì)應(yīng) 2 個(gè) consumer)
- 基于上面演示的案例的 topic 不變
- 啟動(dòng) 2 個(gè)消費(fèi)這消費(fèi)該 topic
- 啟動(dòng)發(fā)送者進(jìn)行消息發(fā)送
演示結(jié)果:consumer1 會(huì)消費(fèi) partition0/partition1 分區(qū),consumer2 會(huì)消費(fèi) partition2 分區(qū)
代碼演示(3 個(gè) partition 對(duì)應(yīng) 4 個(gè)或以上 consumer)
演示結(jié)果:仍然只有 3 個(gè) consumer 對(duì)應(yīng) 3 個(gè) partition,其他的 consumer 無法消費(fèi)消息
通過這個(gè)演示的過程,我希望引出接下來需要了解的 kafka 的分區(qū)分配策略(Partition Assignment Strategy)
consumer 和 partition 的數(shù)量建議
- 如果 consumer 比 partition 多,是浪費(fèi),因?yàn)?kafka 的設(shè)計(jì)是在一個(gè) partition 上是不允許并發(fā)的,所以 consumer 數(shù)不要大于 partition 數(shù)
- 如果 consumer 比 partition 少,一個(gè) consumer 會(huì)對(duì)應(yīng)于多個(gè) partitions,這里主要合理分配 consumer 數(shù)和 partition 數(shù),否則會(huì)導(dǎo)致 partition 里面的數(shù)據(jù)被取的不均勻.最好 partiton 數(shù)目是 consumer 數(shù)目的整數(shù)倍,所以 partition 數(shù)目很重要,比如取 24,就很容易設(shè)定 consumer 數(shù)目
- 如果 consumer 從多個(gè) partition 讀到數(shù)據(jù),不保證數(shù)據(jù)間的順序性,kafka 只保證在一個(gè) partition 上數(shù)據(jù)是有序的,但多個(gè) partition,根據(jù)你讀的順序會(huì)有不同
- 增減 consumer,broker,partition 會(huì)導(dǎo)致 rebalance,所以 rebalance 后 consumer 對(duì)應(yīng)的 partition 會(huì)發(fā)生變化
什么時(shí)候會(huì)觸發(fā)這個(gè)策略呢?
當(dāng)出現(xiàn)以下幾種情況時(shí),kafka 會(huì)進(jìn)行一次分區(qū)分配操作,也就是 kafka consumer 的 rebalance
- 同一個(gè) consumer group 內(nèi)新增了消費(fèi)者
- 消費(fèi)者離開當(dāng)前所屬的 consumer group,比如主動(dòng)停機(jī)或者宕機(jī)
- topic 新增了分區(qū)(也就是分區(qū)數(shù)量發(fā)生了變化)
kafka consuemr 的 rebalance 機(jī)制規(guī)定了一個(gè) consumer group 下的所有 consumer 如何達(dá)成一致來分配訂閱 topic 的每個(gè)分區(qū).而具體如何執(zhí)行分區(qū)策略,就是前面提到過的兩種內(nèi)置的分區(qū)策略.而 kafka 對(duì)于分配策略這塊,提供了可插拔的實(shí)現(xiàn)方式,也就是說,除了這兩種之外,我們還可以創(chuàng)建自己的分配機(jī)制
什么是分區(qū)分配策略
通過前面的案例演示,我們應(yīng)該能猜到,同一個(gè) group 中的消費(fèi)者對(duì)于一個(gè) topic 中的多個(gè) partition,存在一定的分區(qū)分配策略
在 kafka 中,存在三種分區(qū)分配策略,一種是 Range(默認(rèn)),另一種是 RoundRobin(輪詢),StickyAssignor(粘性).在消費(fèi)端中的 ConsumerConfig 中,通過這個(gè)屬性來指定分區(qū)分配策略
public static final String PARTITION_ASSIGNMENT_STRATEGY_CONFIG = "partition.assignment.strategy";
RangeAssignor(范圍分區(qū))
Range 策略是對(duì)每個(gè)主題而言的,首先對(duì)同一個(gè)主題里面的分區(qū)按照序號(hào)進(jìn)行排序,并對(duì)消費(fèi)者按照字母順序進(jìn)行排序
假設(shè) n = 分區(qū)數(shù)/消費(fèi)者數(shù)量
m= 分區(qū)數(shù)%消費(fèi)者數(shù)量
那么前 m 個(gè)消費(fèi)者每個(gè)分配 n+l 個(gè)分區(qū),后面的(消費(fèi)者數(shù)量-m)個(gè)消費(fèi)者每個(gè)分配 n 個(gè)分區(qū)
假設(shè)我們有 10 個(gè)分區(qū),3 個(gè)消費(fèi)者,排完序的分區(qū)將會(huì)是 0,1,2,3,4,5,6,7,8,9;消費(fèi)者線程排完序?qū)?huì)是 C1-0,C2-0,C3-0.然后將 partitions 的個(gè)數(shù)除于消費(fèi)者線程的總數(shù)來決定每個(gè)消費(fèi)者線程消費(fèi)幾個(gè)分區(qū).如果除不盡,那么前面幾個(gè)消費(fèi)者線程將會(huì)多消費(fèi)一個(gè)分區(qū).在我們的例子里面,我們有 10 個(gè)分區(qū),3 個(gè)消費(fèi)者線程,10 / 3 = 3,而且除不盡,那么消費(fèi)者線程 C1-0 將會(huì)多消費(fèi)一個(gè)分區(qū)的結(jié)果看起來是這樣的:
C1-0 將消費(fèi) 0,1,2,3 分區(qū)
C2-0 將消費(fèi) 4,5,6 分區(qū)
C3-0 將消費(fèi) 7,8,9 分區(qū)
假如我們有11個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:
C1-0 將消費(fèi) 0,1,2,3 分區(qū)
C2-0 將消費(fèi) 4,5,6,7 分區(qū)
C3-0 將消費(fèi) 8,9,10 分區(qū)
假如我們有 2 個(gè)主題(T1 和 T2),分別有 10 個(gè)分區(qū),那么最后分區(qū)分配的結(jié)果看起來是這樣的:
C1-0 將消費(fèi) T1 主題的 0,1,2,3 分區(qū)以及 T2 主題的 0,1,2,3 分區(qū)
C2-0 將消費(fèi) T1 主題的 4,5,6 分區(qū)以及 T2 主題的 4,5,6 分區(qū)
C3-0 將消費(fèi) T1 主題的 7,8,9 分區(qū)以及 T2 主題的 7,8,9 分區(qū)
可以看出,C1-0 消費(fèi)者線程比其他消費(fèi)者線程多消費(fèi)了 2 個(gè)分區(qū),這就是 Range strategy 的一個(gè)很明顯的弊端
RoundRobinAssignor(輪詢分區(qū))
輪詢分區(qū)策略是把所有 partition 和所有 consumer 線程都列出來,然后按照 hashcode 進(jìn)行排序.最后通過輪詢算法分配 partition 給消費(fèi)線程.如果所有 consumer 實(shí)例的訂閱是相同的,那么 partition 會(huì)均勻分布
在我們的例子里面,假如按照 hashCode 排序完的 topic-partitions 組依次為 T1-5,T1-3,T1-0,T1-8,T1-2,T1-1,T1-4,T1-7,T1-6,T1-9,我們的消費(fèi)者線程排序?yàn)?C1-0,C1-1,C2-0,C2-1,最后分區(qū)分配的結(jié)果為:
C1-0 將消費(fèi) T1-5,T1-2,T1-6 分區(qū);
C1-1 將消費(fèi) T1-3,T1-1,T1-9 分區(qū);
C2-0 將消費(fèi) T1-0,T1-4 分區(qū);
C2-1 將消費(fèi) T1-8,T1-7 分區(qū);
使用輪詢分區(qū)策略必須滿足兩個(gè)條件
- 每個(gè)主題的消費(fèi)者實(shí)例具有相同數(shù)量的流
- 每個(gè)消費(fèi)者訂閱的主題必須是相同的
StrickyAssignor 分配策略
kafka 在 0.11.x 版本支持了 StrickyAssignor,翻譯過來叫粘滯策略,它主要有兩個(gè)目的
- 分區(qū)的分配盡可能的均勻
- 分區(qū)的分配盡可能和上次分配保持相同
當(dāng)兩者發(fā)生沖突時(shí),第一個(gè)目標(biāo)優(yōu)先于第二個(gè)目標(biāo).鑒于這兩個(gè)目標(biāo),StickyAssignor 分配策略的具體實(shí)現(xiàn)要比 RangeAssignor 和 RoundRobinAssi gn or 這兩種分配策略要復(fù)雜得多,假設(shè)我們有這樣一個(gè)場(chǎng)景
假設(shè)消費(fèi)組有 3 個(gè)消費(fèi)者:C0,C1,C2,它們分別訂閱了 4 個(gè) Topic(t0,t1,t2,t3),并且每個(gè)主題有兩個(gè)分區(qū)(p0,p1),也就是說,整個(gè)消費(fèi)組訂閱了 8 個(gè)分區(qū):tOpO ,tOpl ,tlpO ,tlpl ,t2p0 ,t2pl ,t3p0 ,t3pl
那么最終的分配場(chǎng)景結(jié)果為
CO: tOpO,tlpl ,t3p0
Cl: tOpl,t2p0 ,t3pl
C2: tlpO,t2pl
這種分配方式有點(diǎn)類似于輪詢策略,但實(shí)際上并不是,因?yàn)榧僭O(shè)這個(gè)時(shí)候,C1 這個(gè)消費(fèi)者掛了,就勢(shì)必會(huì)造成重新分區(qū)(reblance),如果是輪詢,那么結(jié)果應(yīng)該是
CO: tOpO,tlpO,t2p0,t3p0
C2: tOpl,tlpl,t2pl,t3pl
然后,strickyAssignor 它是一種粘滯策略,所以它會(huì)滿足分區(qū)的分配盡可能和上次分配保持相同,所以分配結(jié)果應(yīng)該是
消費(fèi)者 CO: tOpO,tlpl ,t3p0,t2p0
消費(fèi)者 C2: tlpO,t2pl,tOpl,t3pl
也就是說,C0 和 C2 保留了上一次是的分配結(jié)果,并且把原來 C1 的分區(qū)分配給了 C0 和 C2.這種策略的好處是 使得分區(qū)發(fā)生變化時(shí),由于分區(qū)的“粘性,減少了不必要的分區(qū)移動(dòng)
誰來執(zhí)行 Rebalance 以及管理 consumer 的 group 呢?
Kafka 提供了一個(gè)角色:coordinator 來執(zhí)行對(duì)于 consumer group 的管理,Kafka 提供了一個(gè)角色:coordinator 來執(zhí)行對(duì)于 consumer group 的管理,當(dāng) consumer group 的第一個(gè) consumer 啟動(dòng)的時(shí)候,它會(huì)去和 kafka server 確定誰是它們組的 coordinator.之后該 group 內(nèi)的所有成員都會(huì)和該 coordinator 進(jìn)行協(xié)調(diào)通信
如何確定 coordinator
consumer group 如何確定自己的 coordinator 是誰呢,消費(fèi)者向 kafka 集群中的任意一個(gè) broker 發(fā)送一個(gè) GroupCoordinatorRequest 請(qǐng)求,服務(wù)端會(huì)返回一個(gè)負(fù)載最小的 broker 節(jié)點(diǎn)的 id,并將該 broker 設(shè)置為 coordinator
JoinGroup 的過程
在 rebalance 之前,需要保證 coordinator 是已經(jīng)確定好了的,整個(gè) rebalance 的過程分為兩個(gè)步驟,Join 和 Sync
-
join: 表示加入到 consumer group 中,在這一步中,所有的成員都會(huì)向 coordinator 發(fā)送 joinGroup 的請(qǐng)求.一旦所有成員都發(fā)送了 joinGroup 請(qǐng)求,那么 coordinator 會(huì)選擇一個(gè) consumer 擔(dān)任 leader 角色,并把組成員信息和訂閱信息發(fā)送消費(fèi)者
leader 選舉算法比較簡(jiǎn)單,如果消費(fèi)組內(nèi)沒有 leader,那么第一個(gè)加入消費(fèi)組的消費(fèi)者就是消費(fèi)者 leader,如果這個(gè)時(shí)候 leader 消費(fèi)者退出了消費(fèi)組,那么重新選舉一個(gè) leader,這個(gè)選舉很隨意,類似于隨機(jī)算法
protocol_metadata: 序列化后的消費(fèi)者的訂閱信息
leader_id: 消費(fèi)組中的消費(fèi)者,coordinator 會(huì)選擇一個(gè)座位 leader,對(duì)應(yīng)的就是 member_id
member_metadata 對(duì)應(yīng)消費(fèi)者的訂閱信息
members:consumer group 中全部的消費(fèi)者的訂閱信息
generation_id: 年代信息,類似于之前講解 zookeeper 的時(shí)候的 epoch 是一樣的,對(duì)于每一輪 rebalance,generation_id 都會(huì)遞增.主要用來保護(hù) consumer group.隔離無效的 offset 提交.也就是上一輪的 consumer 成員無法提交 offset 到新的 consumer group 中
每個(gè)消費(fèi)者都可以設(shè)置自己的分區(qū)分配策略,對(duì)于消費(fèi)組而言,會(huì)從各個(gè)消費(fèi)者上報(bào)過來的分區(qū)分配策略中選舉一個(gè)彼此都贊同的策略來實(shí)現(xiàn)整體的分區(qū)分配,這個(gè)"贊同"的規(guī)則是,消費(fèi)組內(nèi)的各個(gè)消費(fèi)者會(huì)通過投票來決定
- 在 joingroup 階段,每個(gè) consumer 都會(huì)把自己支持的分區(qū)分配策略發(fā)送到 coordinator
- coordinator 手機(jī)到所有消費(fèi)者的分配策略,組成一個(gè)候選集
- 每個(gè)消費(fèi)者需要從候選集里找出一個(gè)自己支持的策略,并且為這個(gè)策略投票
- 最終計(jì)算候選集中各個(gè)策略的選票數(shù),票數(shù)最多的就是當(dāng)前消費(fèi)組的分配策略
Synchronizing Group State 階段
完成分區(qū)分配之后,就進(jìn)入了 Synchronizing Group State 階段,主要邏輯是向 GroupCoordinator 發(fā)送 SyncGroupRequest 請(qǐng)求,并且處理 SyncGroupResponse 響應(yīng),簡(jiǎn)單來說,就是 leader 將消費(fèi)者對(duì)應(yīng)的 partition 分配方案同步給 consumer group 中的所有 consumer
每個(gè)消費(fèi)者都會(huì)向 coordinator 發(fā)送 syncgroup 請(qǐng)求,不過只有 leader 節(jié)點(diǎn)會(huì)發(fā)送分配方案,其他消費(fèi)者只是打打醬油而已.當(dāng) leader 把方案發(fā)給 coordinator 以后,coordinator 會(huì)把結(jié)果設(shè)置到 SyncGroupResponse 中.這樣所有成員都知道自己應(yīng)該消費(fèi)哪個(gè)分區(qū)
consumer group 的分區(qū)分配方案是在客戶端執(zhí)行的!Kafka 將這個(gè)權(quán)利下放給客戶端主要是因?yàn)檫@樣做可以有更好的靈活性
總結(jié)
我們?cè)賮砜偨Y(jié)一下 consumer group rebalance 的過程
- 對(duì)于每個(gè) consumer group 子集,都會(huì)在服務(wù)端對(duì)應(yīng)一個(gè) GroupCoordinator 進(jìn)行管理,GroupCoordinator 會(huì)在 zookeeper 上添加 watcher,當(dāng)消費(fèi)者加入或者退出 consumer group 時(shí),會(huì)修改 zookeeper 上保存的數(shù)據(jù),從而觸發(fā) GroupCoordinator 開始 Rebalance 操作
- 當(dāng)消費(fèi)者準(zhǔn)備加入某個(gè) Consumer group 或者 GroupCoordinator 發(fā)生故障轉(zhuǎn)移時(shí),消費(fèi)者并不知道 GroupCoordinator 的在網(wǎng)絡(luò)中的位置,這個(gè)時(shí)候就需要確定 GroupCoordinator,消費(fèi)者會(huì)向集群中的任意一個(gè) Broker 節(jié)點(diǎn)發(fā)送 ConsumerMetadataRequest 請(qǐng)求,收到請(qǐng)求的 broker 會(huì)返回一個(gè) response 作為響應(yīng),其中包含管理當(dāng)前 ConsumerGroup 的 GroupCoordinator
- 消費(fèi)者會(huì)根據(jù) broker 的返回信息,連接到 groupCoordinator,并且發(fā)送 HeartbeatRequest,發(fā)送心跳的目的是要要奧噶蘇 GroupCoordinator 這個(gè)消費(fèi)者是正常在線的.當(dāng)消費(fèi)者在指定時(shí)間內(nèi)沒有發(fā)送心跳請(qǐng)求,則 GroupCoordinator 會(huì)觸發(fā) Rebalance 操作
- 發(fā)起 join group 請(qǐng)求,兩種情況
- 如果 GroupCoordinator 返回的心跳包數(shù)據(jù)包含異常,說明 GroupCoordinator 因?yàn)榍懊嬲f的幾種情況導(dǎo)致了 Rebalance 操作,那這個(gè)時(shí)候,consumer 會(huì)發(fā)起 join group 請(qǐng)求
- 新加入到 consumer group 的 consumer 確定好了 GroupCoordinator 以后消費(fèi)者會(huì)向 GroupCoordinator 發(fā)起 join group 請(qǐng)求,GroupCoordinator 會(huì)收集全部消費(fèi)者信息之后,來確認(rèn)可用的消費(fèi)者,并從中選取一個(gè)消費(fèi)者成為 group_leader.并把相應(yīng)的信息(分區(qū)分配策略,leader_id,…)封裝成 response 返回給所有消費(fèi)者,但是只有 group leader 會(huì)收到當(dāng)前 consumer group 中的所有消費(fèi)者信息.當(dāng)消費(fèi)者確定自己是 group leader 以后,會(huì)根據(jù)消費(fèi)者的信息以及選定分區(qū)分配策略進(jìn)行分區(qū)分配
- 接著進(jìn)入 Synchronizing Group State 階段,每個(gè)消費(fèi)者會(huì)發(fā)送 SyncGroupRequest 請(qǐng)求到 GroupCoordinator,但是只有 Group Leader 的請(qǐng)求會(huì)存在分區(qū)分配結(jié)果,GroupCoordinator 會(huì)根據(jù) Group Leader 的分區(qū)分配結(jié)果形成 SyncGroupResponse 返回給所有的 Consumer
- consumer 根據(jù)分配結(jié)果,執(zhí)行相應(yīng)的操作
到這里為止,我們已經(jīng)知道了消息的發(fā)送分區(qū)策略,以及消費(fèi)者的分區(qū)消費(fèi)策略和 rebalance.對(duì)于應(yīng)用層面來說,還有一個(gè)最重要的東西沒有講解,就是 offset,他類似一個(gè)游標(biāo),表示當(dāng)前消費(fèi)的消息的位置
如何保存消費(fèi)端的消費(fèi)位置
什么是 offset
前面在講解 partition 的時(shí)候,提到過 offset,每個(gè) topic 可以劃分多個(gè)分區(qū)(每個(gè) Topic 至少有一個(gè)分區(qū)),同一 topic 下的不同分區(qū)包含的消息是不同的.每個(gè)消息在被添加到分區(qū)時(shí),都會(huì)被分配一個(gè) offset(稱之為偏移量),它是消息在此分區(qū)中的唯一編號(hào),kafka 通過 offset 保證消息在分區(qū)內(nèi)的順序,offset 的順序不跨分區(qū),即 kafka 只保證在同一個(gè)分區(qū)內(nèi)的消息是有序的; 對(duì)于應(yīng)用層的消費(fèi)來說,每次消費(fèi)一個(gè)消息并且提交以后,會(huì)保存當(dāng)前消費(fèi)到的最近的一個(gè) offset.那么 offset 保存在哪里?
offset 在哪里維護(hù)?
在 kafka 中,提供了一個(gè) consumer_offsets_* 的一個(gè) topic,把 offset 信息寫入到這個(gè) topic 中
consumer_offsets——按保存了每個(gè) consumer group 某一時(shí)刻提交的 offset 信息.__consumer_offsets 默認(rèn)有 50 個(gè)分區(qū)
根據(jù)前面我們演示的案例,我們?cè)O(shè)置了一個(gè) KafkaConsumerDemo 的 groupid.首先我們需要找到這個(gè) consumer_group 保存在哪個(gè)分區(qū)中
properties.put(ConsumerConfig.GROUP_ID_CONFIG,"KafkaConsumerDemo");
計(jì)算公式
- Math.abs(“groupid”.hashCode())%groupMetadataTopicPartitionCount ; 由于默認(rèn)情況下 groupMetadataTopicPartitionCount 有 50 個(gè)分區(qū),計(jì)算得到的結(jié)果為:35,意味著當(dāng)前的 consumer_group 的位移信息保存在__consumer_offsets 的第 35 個(gè)分區(qū)
- 執(zhí)行如下命令,可以查看當(dāng)前 consumer_goup 中的 offset 位移提交的信息
kafka-console-consumer.sh --topic __consumer_offsets --partition 15 -- bootstrap-server 192.168.13.102:9092,192.168.13.103:9092,192.168.13.104:9092 --formatter 'kafka.coordinator.group.GroupMetadataManager$OffsetsMessageFormatter'
從輸出結(jié)果中,我們就可以看到 test 這個(gè) topic 的 offset 的位移日志
分區(qū)的副本機(jī)制
我們已經(jīng)知道 Kafka 的每個(gè) topic 都可以分為多個(gè) Partition,并且多個(gè) partition 會(huì)均勻分布在集群的各個(gè)節(jié)點(diǎn)下.雖然這種方式能夠有效的對(duì)數(shù)據(jù)進(jìn)行分片,但是對(duì)于每個(gè) partition 來說,都是單點(diǎn)的,當(dāng)其中一個(gè) partition 不可用的時(shí)候,那么這部分消息就沒辦法消費(fèi).所以 kafka 為了提高 partition 的可靠性而提供了副本的概念(Replica),通過副本機(jī)制來實(shí)現(xiàn)冗余備份
每個(gè)分區(qū)可以有多個(gè)副本,并且在副本集合中會(huì)存在一個(gè) leader 的副本,所有的讀寫請(qǐng)求都是由 leader 副本來進(jìn)行處理.剩余的其他副本都做為 follower 副本,follower 副本會(huì)從 leader 副本同步消息日志.這個(gè)有點(diǎn)類似 zookeeper 中 leader 和 follower 的概念,但是具體的時(shí)間方式還是有比較大的差異.所以我們可以認(rèn)為,副本集會(huì)存在一主多從的關(guān)系
一般情況下,同一個(gè)分區(qū)的多個(gè)副本會(huì)被均勻分配到集群中的不同 broker 上,當(dāng) leader 副本所在的 broker 出現(xiàn)故障后,可以重新選舉新的 leader 副本繼續(xù)對(duì)外提供服務(wù).通過這樣的副本機(jī)制來提高 kafka 集群的可用性
創(chuàng)建一個(gè)帶副本機(jī)制的 topic
通過下面的命令去創(chuàng)建帶 2 個(gè)副本的 topic
sh kafka-topics.sh --create --zookeeper 192.168.11.156:2181 --replication-factor 3 --partitions 3 --topic secondTopic
然后我們可以在/tmp/kafka-log 路徑下看到對(duì)應(yīng) topic 的副本信息了.我們通過一個(gè)圖形的方式來表達(dá)
針對(duì) secondTopic 這個(gè) topic 的 3 個(gè)分區(qū)對(duì)應(yīng)的 3 個(gè)副本
如何知道那個(gè)各個(gè)分區(qū)中對(duì)應(yīng)的 leader 是誰呢?
在 zookeeper 服務(wù)器上,通過如下命令去獲取對(duì)應(yīng)分區(qū)的信息,比如下面這個(gè)是獲取 secondTopic 第 1 個(gè)分區(qū)的狀態(tài)信息
get /brokers/topics/secondTopic/partitions/1/state
{"controller_epoch":12,"leader":0,"version":1,"leader_epoch":0,"isr":[0,1]}
或通過這個(gè)命令 sh kafka-topics.sh --zookeeper 192.168.13.106:2181 --describe --topic test_partition
leader 表示當(dāng)前分區(qū)的 leader 是那個(gè) broker-id.下圖中.綠色線條的表示該分區(qū)中的 leader 節(jié)點(diǎn).其他節(jié)點(diǎn)就為 follower
需要注意的是,kafka 集群中的一個(gè) broker 中最多只能有一個(gè)副本,leader 副本所在的 broker 節(jié)點(diǎn)的分區(qū)叫 leader 節(jié)點(diǎn),follower 副本所在的 broker 節(jié)點(diǎn)的分區(qū)叫 follower 節(jié)點(diǎn)
副本的 leader 選舉
Kafka 提供了數(shù)據(jù)復(fù)制算法保證,如果 leader 副本所在的 broker 節(jié)點(diǎn)宕機(jī)或者出現(xiàn)故障,或者分區(qū)的 leader 節(jié)點(diǎn)發(fā)生故障,這個(gè)時(shí)候怎么處理呢?
那么,kafka 必須要保證從 follower 副本中選擇一個(gè)新的 leader 副本.那么 kafka 是如何實(shí)現(xiàn)選舉的呢?要了解 leader 選舉,我們需要了解幾個(gè)概念
Kafka 分區(qū)下有可能有很多個(gè)副本(replica)用于實(shí)現(xiàn)冗余,從而進(jìn)一步實(shí)現(xiàn)高可用.副本根據(jù)角色的不同可分為 3 類:
- leader 副本:響應(yīng) clients 端讀寫請(qǐng)求的副本
- follower 副本:被動(dòng)地備份 leader 副本中的數(shù)據(jù),不能響應(yīng) clients 端讀寫請(qǐng)求.
- ISR 副本:包含了 leader 副本和所有與 leader 副本保持同步的 follower 副本——如何判定是否與 leader 同步后面會(huì)提到每個(gè) Kafka 副本對(duì)象都有兩個(gè)重要的屬性:LEO 和 HW.注意是所有的副本,而不只是 leader 副本
- LEO:即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值.注意是下一條消息!也就是說,如果 LEO=10,那么表示該副本保存了 10 條消息,位移值范圍是[0,9].另外,leader LEO 和 follower LEO 的更新是有區(qū)別的.我們后面會(huì)詳細(xì)說
- HW:即上面提到的水位值.對(duì)于同一個(gè)副本對(duì)象而言,其 HW 值不會(huì)大于 LEO 值.小于等于 HW 值的所有消息都被認(rèn)為是“已備份”的(replicated).同理,leader 副本和 follower 副本的 HW 更新是有區(qū)別的
從生產(chǎn)者發(fā)出的 一 條消息首先會(huì)被寫入分區(qū)的 leader 副本,不過還需要等待 ISR 集合中的所有 follower 副本都同步完之后才能被認(rèn)為已經(jīng)提交,之后才會(huì)更新分區(qū)的 HW,進(jìn)而消費(fèi)者可以消費(fèi)到這條消息
副本協(xié)同機(jī)制
剛剛提到了,消息的讀寫操作都只會(huì)由 leader 節(jié)點(diǎn)來接收和處理.follower 副本只負(fù)責(zé)同步數(shù)據(jù)以及當(dāng) leader 副本所在的 broker 掛了以后,會(huì)從 follower 副本中選取新的 leader
寫請(qǐng)求首先由 Leader 副本處理,之后 follower 副本會(huì)從 leader 上拉取寫入的消息,這個(gè)過程會(huì)有一定的延遲,導(dǎo)致 follower 副本中保存的消息略少于 leader 副本,但是只要沒有超出閾值都可以容忍.但是如果一個(gè) follower 副本出現(xiàn)異常,比如宕機(jī),網(wǎng)絡(luò)斷開等原因長(zhǎng)時(shí)間沒有同步到消息,那這個(gè)時(shí)候,leader 就會(huì)把它踢出去.kafka 通過 ISR 集合來維護(hù)一個(gè)分區(qū)副本信息
一個(gè)新 leader 被選舉并被接受客戶端的消息成功寫入.Kafka 確保從同步副本列表中選舉一個(gè)副本為 leader;leader 負(fù)責(zé)維護(hù)和跟蹤 ISR(in-Sync replicas ,副本同步隊(duì)列)中所有 follower 滯后的狀態(tài).當(dāng) producer 發(fā)送一條消息到 broker 后,leader 寫入消息并復(fù)制到所有 follower.消息提交之后才被成功復(fù)制到所有的同步副本
ISR
ISR 表示目前“可用且消息量與 leader 相差不多的副本集合,這是整個(gè)副本集合的一個(gè)子集”.怎么去理解可用和相差不多這兩個(gè)詞呢?具體來說,ISR 集合中的副本必須滿足兩個(gè)條件
- 副本所在節(jié)點(diǎn)必須維持著與 zookeeper 的連接
- 副本最后一條消息的 offset 與 leader 副本的最后一條消息的 offset 之間的差值不能超過指定的閾值(replica.lag.time.max.ms)replica.lag.time.max.ms:如果該 follower 在此時(shí)間間隔內(nèi)一直沒有追上過 leader 的所有消息,則該 follower 就會(huì)被剔除 isr 列表
- ISR 數(shù)據(jù)保存在 Zookeeper 的 /brokers/topics/<topic>/partitions/<partitionId>/state 節(jié)點(diǎn)中
follower 副本把 leader 副本 LEO 之前的日志全部同步完成時(shí),則認(rèn)為 follower 副本已經(jīng)追趕上了 leader 副本,這個(gè)時(shí)候會(huì)更新這個(gè)副本的 lastCaughtUpTimeMs 標(biāo)識(shí),kafk 副本管理器會(huì)啟動(dòng)一個(gè)副本過期檢查的定時(shí)任務(wù),這個(gè)任務(wù)會(huì)定期檢查當(dāng)前時(shí)間與副本的 lastCaughtUpTimeMs 的差值是否大于參數(shù) replica.lag.time.max.ms 的值,如果大于,則會(huì)把這個(gè)副本踢出 ISR 集合
如何處理所有的 Replica 不工作的情況
在 ISR 中至少有一個(gè) follower 時(shí),Kafka 可以確保已經(jīng) commit 的數(shù)據(jù)不丟失,但如果某個(gè) Partition 的所有 Replica 都宕機(jī)了,就無法保證數(shù)據(jù)不丟失了
- 等待 ISR 中的任一個(gè) Replica“活”過來,并且選它作為 Leader
- 選擇第一個(gè)“活”過來的 Replica(不一定是 ISR 中的)作為 Leader
這就需要在可用性和一致性當(dāng)中作出一個(gè)簡(jiǎn)單的折衷
如果一定要等待 ISR 中的 Replica“活”過來,那不可用的時(shí)間就可能會(huì)相對(duì)較長(zhǎng).而且如果 ISR 中的所有 Replica 都無法“活”過來了,或者數(shù)據(jù)都丟失了,這個(gè) Partition 將永遠(yuǎn)不可用
選擇第一個(gè)“活”過來的 Replica 作為 Leader,而這個(gè) Replica 不是 ISR 中的 Replica,那即使它并不保證已經(jīng)包含了所有已 commit 的消息,它也會(huì)成為 Leader 而作為 consumer 的數(shù)據(jù)源(前文有說明,所有讀寫都由 Leader 完成).在我們課堂講的版本中,使用的是第一種策略
副本數(shù)據(jù)同步原理
了解了副本的協(xié)同過程以后,還有一個(gè)最重要的機(jī)制,就是數(shù)據(jù)的同步過程.它需要解決
- 怎么傳播消息
- 在向消息發(fā)送端返回 ack 之前需要保證多少個(gè) Replica 已經(jīng)接收到這個(gè)消息
數(shù)據(jù)的處理過程是下圖中,深紅色部分表示 test_replica 分區(qū)的 leader 副本,另外兩個(gè)節(jié)點(diǎn)上淺色部分表示 follower 副本
Producer 在發(fā)布消息到某個(gè) Partition 時(shí)
- 先通過 ZooKeeper 找到該 Partition 的 Leader get /brokers/topics/<topic>/partitions/2/state ,然后無論該 Topic 的 Replication Factor 為多少(也即該 Partition 有多少個(gè) Replica),Producer 只將該消息發(fā)送到該 Partition 的 Leader.
- Leader 會(huì)將該消息寫入其本地 Log.每個(gè) Follower 都從 Leader pull 數(shù)據(jù).這種方式上,Follower 存儲(chǔ)的數(shù)據(jù)順序與 Leader 保持一致
- Follower 在收到該消息并寫入其 Log 后,向 Leader 發(fā)送 ACK
- 一旦 Leader 收到了 ISR 中的所有 Replica 的 ACK,該消息就被認(rèn)為已經(jīng) commit 了,Leader 將增加 HW(HighWatermark)并且向 Producer 發(fā)送 ACK
LEO
即日志末端位移(log end offset),記錄了該副本底層日志(log)中下一條消息的位移值.注意是下一條消息!也就是說,如果 LEO=10,那么表示該副本保存了 10 條消息,位移值范圍是[0,9].另外,leader LEO 和 follower LEO 的更新是有區(qū)別的.我們后面會(huì)詳細(xì)說
HW
即上面提到的水位值(Hight Water).對(duì)于同一個(gè)副本對(duì)象而言,其 HW 值不會(huì)大于 LEO 值.小于等于 HW 值的所有消息都被認(rèn)為是“已備份”的(replicated).同理,leader 副本和 follower 副本的 HW 更新是有區(qū)別的
通過下面這幅圖來表達(dá) LEO,HW 的含義,隨著 follower 副本不斷和 leader 副本進(jìn)行數(shù)據(jù)同步,follower 副本的 LEO 會(huì)主鍵后移并且追趕到 leader 副本,這個(gè)追趕上的判斷標(biāo)準(zhǔn)是當(dāng)前副本的 LEO 是否大于或者等于 leader 副本的 HW,這個(gè)追趕上也會(huì)使得被踢出的 follower 副本重新加入到 ISR 集合中
另外,假如說下圖中的最右側(cè)的 follower 副本被踢出 ISR 集合,也會(huì)導(dǎo)致這個(gè)分區(qū)的 HW 發(fā)生變化,變成了 3
初始狀態(tài)
初始狀態(tài)下,leader 和 follower 的 HW 和 LEO 都是 0,leader 副本會(huì)保存 remote LEO,表示所有 follower LEO,也會(huì)被初始化為 0,這個(gè)時(shí)候,producer 沒有發(fā)送消息.follower 會(huì)不斷地個(gè) leader 發(fā)送 FETCH 請(qǐng)求,但是因?yàn)闆]有數(shù)據(jù),這個(gè)請(qǐng)求會(huì)被 leader 寄存,當(dāng)在指定的時(shí)間之后會(huì)強(qiáng)制完成請(qǐng)求,這個(gè)時(shí)間配置是(replica.fetch.wait.max.ms),如果在指定時(shí)間內(nèi) producer 有消息發(fā)送過來,那么 kafka 會(huì)喚醒 fetch 請(qǐng)求,讓 leader 繼續(xù)處理
數(shù)據(jù)的同步處理會(huì)分兩種情況,這兩種情況下處理方式是不一樣的
- 第一種是 leader 處理完 producer 請(qǐng)求之后,follower 發(fā)送一個(gè) fetch 請(qǐng)求過來
- 第二種是 follower 阻塞在 leader 指定時(shí)間之內(nèi),leader 副本收到 producer 的請(qǐng)求
第一種情況
生產(chǎn)者發(fā)送一條消息
leader 處理完 producer 請(qǐng)求之后,follower 發(fā)送一個(gè) fetch 請(qǐng)求過來 .狀態(tài)圖如下
leader 副本收到請(qǐng)求以后,會(huì)做幾件事情
- 把消息追加到 log 文件,同時(shí)更新 leader 副本的 LEO
- 嘗試更新 leader HW 值.這個(gè)時(shí)候由于 follower 副本還沒有發(fā)送 fetch 請(qǐng)求,那么 leader 的 remote LEO 仍然是 0.leader 會(huì)比較自己的 LEO 以及 remote LEO 的值發(fā)現(xiàn)最小值是 0,與 HW 的值相同,所以不會(huì)更新 HW
follower fetch 消息
follower 發(fā)送 fetch 請(qǐng)求,leader 副本的處理邏輯是:
- 讀取 log 數(shù)據(jù),更新 remote LEO=0(follower 還沒有寫入這條消息,這個(gè)值是根據(jù) follower 的 fetch 請(qǐng)求中的 offset 來確定的)
- 嘗試更新 HW,因?yàn)檫@個(gè)時(shí)候 LEO 和 remoteLEO 還是不一致,所以仍然是 HW=0
- 把消息內(nèi)容和當(dāng)前分區(qū)的 HW 值發(fā)送給 follower 副本
follower 副本收到 response 以后
- 將消息寫入到本地 log,同時(shí)更新 follower 的 LEO
- 更新 follower HW,本地的 LEO 和 leader 返回的 HW 進(jìn)行比較取小的值,所以仍然是 0
第一次交互結(jié)束以后,HW 仍然還是 0,這個(gè)值會(huì)在下一次 follower 發(fā)起 fetch 請(qǐng)求時(shí)被更新
follower 發(fā)第二次 fetch 請(qǐng)求,leader 收到請(qǐng)求以后
- 讀取 log 數(shù)據(jù)
- 更新 remote LEO=1,因?yàn)檫@次 fetch 攜帶的 offset 是 1.
- 更新當(dāng)前分區(qū)的 HW,這個(gè)時(shí)候 leader LEO 和 remote LEO 都是 1,所以 HW 的值也更新為 1 4.把數(shù)據(jù)和當(dāng)前分區(qū)的 HW 值返回給 follower 副本,這個(gè)時(shí)候如果沒有數(shù)據(jù),則返回為空
follower 副本收到 response 以后
- 如果有數(shù)據(jù)則寫本地日志,并且更新 LEO
- 更新 follower 的 HW 值
到目前為止,數(shù)據(jù)的同步就完成了,意味著消費(fèi)端能夠消費(fèi) offset=1 這條消息
第二種情況
前面說過,由于 leader 副本暫時(shí)沒有數(shù)據(jù)過來,所以 follower 的 fetch 會(huì)被阻塞,直到等待超時(shí)或者 leader 接收到新的數(shù)據(jù).當(dāng) leader 收到請(qǐng)求以后會(huì)喚醒處于阻塞的 fetch 請(qǐng)求.處理過程基本上和前面說的一致
- leader 將消息寫入本地日志,更新 Leader 的 LEO
- 喚醒 follower 的 fetch 請(qǐng)求
- 更新 HW
kafka 使用 HW 和 LEO 的方式來實(shí)現(xiàn)副本數(shù)據(jù)的同步,本身是一個(gè)好的設(shè)計(jì),但是在這個(gè)地方會(huì)存在一個(gè)數(shù)據(jù)丟失的問題,當(dāng)然這個(gè)丟失只出現(xiàn)在特定的背景下.我們回想一下,HW 的值是在新的一輪 FETCH 中才會(huì)被更新.我們分析下這個(gè)過程為什么會(huì)出現(xiàn)數(shù)據(jù)丟失
數(shù)據(jù)丟失的問題
前提
min.insync.replicas=1 //設(shè)定 ISR 中的最小副本數(shù)是多少,默認(rèn)值為 1(在 server.properties 中配
置),并且 acks 參數(shù)設(shè)置為-1(表示需要所有副本確認(rèn))時(shí),此參數(shù)才生效
表達(dá)的含義是,至少需要多少個(gè)副本同步才能表示消息是提交的,所以,當(dāng) min.insync.replicas=1 的時(shí)候,一旦消息被寫入 leader 端 log 即被認(rèn)為是“已提交”,而延遲一輪 FETCH RPC 更新 HW 值的設(shè)計(jì)使得 follower HW 值是異步延遲更新的,倘若在這個(gè)過程中 leader 發(fā)生變更,那么成為新 leader 的 follower 的 HW 值就有可能是過期的,使得 clients 端認(rèn)為是成功提交的消息被刪除
producer 的 ack
acks 配置表示 producer 發(fā)送消息到 broker 上以后的確認(rèn)值.有三個(gè)可選項(xiàng)
- 0:表示 producer 不需要等待 broker 的消息確認(rèn).這個(gè)選項(xiàng)時(shí)延最小但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng) server 宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)
- 1:表示 producer 只需要獲得 kafka 集群中的 leader 節(jié)點(diǎn)確認(rèn)即可,這個(gè)選擇時(shí)延較小同時(shí)確保了 leader 節(jié)點(diǎn)確認(rèn)接收成功
- all(-1):需要 ISR 中所有的 Replica 給予接收確認(rèn),速度最慢,安全性最高,但是由于 ISR 可能會(huì)縮小到僅包含一個(gè) Replica,所以設(shè)置參數(shù)為 all 并不能一定避免數(shù)據(jù)丟失
數(shù)據(jù)丟失的解決方案
在 kafka0.11.0.0 版本之后,引入了一個(gè) leader epoch 來解決這個(gè)問題,所謂的 leader epoch 實(shí)際上是一對(duì)值(epoch,offset),epoch 代表 leader 的版本號(hào),從 0 開始遞增,當(dāng) leader 發(fā)生過變更,epoch 就+1,而 offset 則是對(duì)應(yīng)這個(gè) epoch 版本的 leader 寫入第一條消息的 offset,比如 (0,0),(1,50),表示第一個(gè) leader 從 offset=0 開始寫消息,一共寫了 50 條.第二個(gè) leader 版本號(hào)是 1,從 offset=50 開始寫,這個(gè)信息會(huì)持久化在對(duì)應(yīng)的分區(qū)的本地磁盤上,文件名是 /tmp/kafka- log/topic/leader-epoch-checkpoint
leader broker 中會(huì)保存這樣一個(gè)緩存,并且定期寫入到 checkpoint 文件中
當(dāng) leader 寫 log 時(shí)它會(huì)嘗試更新整個(gè)緩存: 如果這個(gè) leader 首次寫消息,則會(huì)在緩存中增加一個(gè)條目;否則就不做更新.而每次副本重新成為 leader 時(shí)會(huì)查詢這部分緩存,獲取出對(duì)應(yīng) leader 版本的 offset
我們基于同樣的情況來分析,follower 宕機(jī)并且恢復(fù)之后,有兩種情況,如果這個(gè)時(shí)候 leader 副本沒有掛,也就是意味著沒有發(fā)生 leader 選舉,那么 follower 恢復(fù)之后并不會(huì)去截?cái)嘧约旱娜罩?而是先發(fā)送一個(gè) OffsetsForLeaderEpochRequest 請(qǐng)求給到 leader 副本,leader 副本收到請(qǐng)求之后返回當(dāng)前的 LEO
如果 follower 副本的 leaderEpoch 和 leader 副本的 epoch 相同,leader 的 leo 只可能大于或者等于 follower 副本的 leo 值,所以這個(gè)時(shí)候不會(huì)發(fā)生截?cái)?/p>
如果 follower 副本和 leader 副本的 epoch 值不同,那么 leader 副本會(huì)查找 follower 副本傳過來的 epoch+1 在本地文件中存儲(chǔ)的 StartOffset 返回給 follower 副本,也就是新 leader 副本的 LEO.這樣也避免了數(shù)據(jù)丟失的問題
如果 leader 副本宕機(jī)了重新選舉新的 leader,那么原本的 follower 副本就會(huì)變成 leader,意味著 epoch 從 0 變成 1,使得原本 follower 副本中 LEO 的值的到了保留
Leader 副本的選舉過程
- KafkaController 會(huì)監(jiān)聽 ZooKeeper 的/brokers/ids 節(jié)點(diǎn)路徑,一旦發(fā)現(xiàn)有 broker 掛了,執(zhí)行下面的邏輯.這里暫時(shí)先不考慮 KafkaController 所在 broker 掛了的情況,KafkaController 掛了,各個(gè) broker 會(huì)重新 leader 選舉出新的 KafkaController
- leader 副本在該 broker 上的分區(qū)就要重新進(jìn)行 leader 選舉,目前的選舉策略是
- 優(yōu)先從 isr 列表中選出第一個(gè)作為 leader 副本,這個(gè)叫優(yōu)先副本,理想情況下有限副本就是該分區(qū)的 leader 副本
- 如果 isr 列表為空,則查看該 topic 的 unclean.leader.election.enable 配置.unclean.leader.election.enable:為 true 則代表允許選用非 isr 列表的副本作為 leader,那么此時(shí)就意味著數(shù)據(jù)可能丟失,為 false 的話,則表示不允許,直接拋出 NoReplicaOnlineException 異常,造成 leader 副本選舉失敗
- 如果上述配置為 true,則從其他副本中選出一個(gè)作為 leader 副本,并且 isr 列表只包含該 leader 副本.一旦選舉成功,則將選舉后的 leader 和 isr 和其他副本信息寫入到該分區(qū)的對(duì)應(yīng)的 zk 路徑上
消息的存儲(chǔ)
消息發(fā)送端發(fā)送消息到 broker 上以后,消息是如何持久化的呢?那么接下來去分析下消息的存儲(chǔ)
首先我們需要了解的是,kafka 是使用日志文件的方式來保存生產(chǎn)者和發(fā)送者的消息,每條消息都有一個(gè) offset 值來表示它在分區(qū)中的偏移量.Kafka 中存儲(chǔ)的一般都是海量的消息數(shù)據(jù),為了避免日志文件過大,Log 并不是直接對(duì)應(yīng)在一個(gè)磁盤上的日志文件,而是對(duì)應(yīng)磁盤上的一個(gè)目錄,這個(gè)目錄的命名規(guī)則是<topic_name>_<partition_id>
消息的文件存儲(chǔ)機(jī)制
一個(gè) topic 的多個(gè) partition 在物理磁盤上的保存路徑,路徑保存在 /tmp/kafka-logs/topic_partition,包含日志文件,索引文件和時(shí)間索引文件
kafka 是通過分段的方式將 Log 分為多個(gè) LogSegment,LogSegment 是一個(gè)邏輯上的概念,一個(gè) LogSegment 對(duì)應(yīng)磁盤上的一個(gè)日志文件和一個(gè)索引文件,其中日志文件是用來記錄消息的.索引文件是用來保存消息的索引.那么這個(gè) LogSegment 是什么呢?
LogSegment
假設(shè) kafka 以 partition 為最小存儲(chǔ)單位,那么我們可以想象當(dāng) kafka producer 不斷發(fā)送消息,必然會(huì)引起 partition 文件的無線擴(kuò)張,這樣對(duì)于消息文件的維護(hù)以及被消費(fèi)的消息的清理帶來非常大的挑戰(zhàn),所以 kafka 以 segment 為單位又把 partition 進(jìn)行細(xì)分.每個(gè) partition 相當(dāng)于一個(gè)巨型文件被平均分配到多個(gè)大小相等的 segment 數(shù)據(jù)文件中(每個(gè) segment 文件中的消息不一定相等),這種特性方便已經(jīng)被消費(fèi)的消息的清理,提高磁盤的利用率
- log.segment.bytes=107370 (設(shè)置分段大小),默認(rèn)是 1gb,我們把這個(gè)值調(diào)小以后,可以看到日志分段的效果
-
抽取其中3個(gè)分段來進(jìn)行分析
segment file 由 2 大部分組成,分別為 index file 和 data file,此 2 個(gè)文件一一對(duì)應(yīng),成對(duì)出現(xiàn),后綴".index"和“.log”分別表示為 segment 索引文件,數(shù)據(jù)文件
segment 文件命名規(guī)則:partion 全局的第一個(gè) segment 從 0 開始,后續(xù)每個(gè) segment 文件名為上一個(gè) segment 文件最后一條消息的 offset 值進(jìn)行遞增.數(shù)值最大為 64 位 long 大小,20 位數(shù)字字符長(zhǎng)度,沒有數(shù)字用 0 填充
查看 segment 文件命名規(guī)則
通過下面這條命令可以看到 kafka 消息日志的內(nèi)容
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test- 0/00000000000000000000.log --print-data-log
假如第一個(gè) log 文件的最后一個(gè) offset 為:5376,所以下一個(gè) segment 的文件命名為:00000000000000005376.log.對(duì)應(yīng)的 index 為 00000000000000005376.index
segment 中 index 和 log 的對(duì)應(yīng)關(guān)系
從所有分段中,找一個(gè)分段進(jìn)行分析
為了提高查找消息的性能,為每一個(gè)日志文件添加 2 個(gè)索引索引文件:OffsetIndex 和 TimeIndex,分別對(duì)應(yīng).index 以及.timeindex,TimeIndex 索引文件格式:它是映射時(shí)間戳和相對(duì) offset
查看索引內(nèi)容:
sh kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test- 0/00000000000000000000.index --print-data-log
如圖所示,index 中存儲(chǔ)了索引以及物理偏移量.log 存儲(chǔ)了消息的內(nèi)容.索引文件的元數(shù)據(jù)執(zhí)行對(duì)應(yīng)數(shù)據(jù)文件中 message 的物理偏移地址.舉個(gè)簡(jiǎn)單的案例來說,以[4053,80899]為例,在 log 文件中,對(duì)應(yīng)的是第 4053 條記錄,物理偏移量(position)為 80899.position 是 ByteBuffer 的指針位置
在 partition 中如何通過 offset 查找 message
查找的算法是
- 根據(jù) offset 的值,查找 segment 段中的 index 索引文件.由于索引文件命名是以上一個(gè)文件的最后一個(gè) offset 進(jìn)行命名的,所以,使用二分查找算法能夠根據(jù) offset 快速定位到指定的索引文件.
- 找到索引文件后,根據(jù) offset 進(jìn)行定位,找到索引文件中的符合范圍的索引.(kafka 采用稀疏索引的方式來提高查找性能)
- 得到 position 以后,再到對(duì)應(yīng)的 log 文件中,從 position 出開始查找 offset 對(duì)應(yīng)的消息,將每條消息的 offset 與目標(biāo) offset 進(jìn)行比較,直到找到消息
比如說,我們要查找 offset=2490 這條消息,那么先找到 00000000000000000000.index,然后找到[2487,49111]這個(gè)索引,再到 log 文件中,根據(jù) 49111 這個(gè) position 開始查找,比較每條消息的 offset 是否大于等于 2490.最后查找到對(duì)應(yīng)的消息以后返回
Log 文件的消息內(nèi)容分析
前面我們通過 kafka 提供的命令,可以查看二進(jìn)制的日志文件信息,一條消息,會(huì)包含很多的字段
offset: 5371 position: 102124 CreateTime: 1531477349286
isvalid: true keysize: -1 valuesize: 12 magic: 2 compresscodec: NONE
producerId: -1 producerEpoch: -1 sequence: -1 isTransactional: false headerKeys: [] payload: message_5371
offset 和 position 這兩個(gè)前面已經(jīng)講過了,createTime 表示創(chuàng)建時(shí)間,keysize 和 valuesize 表示 key 和 value 的大小,compresscodec 表示壓縮編碼,payload:表示消息的具體內(nèi)容
日志的清除策略以及壓縮策略
日志清除策略
前面提到過,日志的分段存儲(chǔ),一方面能夠減少單個(gè)文件內(nèi)容的大小,另一方面,方便 kafka 進(jìn)行日志清理.日志的清理策略有兩個(gè)
- 根據(jù)消息的保留時(shí)間,當(dāng)消息在 kafka 中保存的時(shí)間超過了指定的時(shí)間,就會(huì)觸發(fā)清理過程
- 根據(jù) topic 存儲(chǔ)的數(shù)據(jù)大小,當(dāng) topic 所占的日志文件大小大于一定的閥值,則可以開始刪除最舊的消息.kafka 會(huì)啟動(dòng)一個(gè)后臺(tái)線程,定期檢查是否存在可以刪除的消息
通過 log.retention.bytes 和 log.retention.hours 這兩個(gè)參數(shù)來設(shè)置,當(dāng)其中任意一個(gè)達(dá)到要求,都會(huì)執(zhí)行刪除
默認(rèn)的保留時(shí)間是:7天
日志壓縮策略
Kafka 還提供了“日志壓縮(Log Compaction)”功能,通過這個(gè)功能可以有效的減少日志文件的大小,緩解磁盤緊張的情況,在很多實(shí)際場(chǎng)景中,消息的 key 和 value 的值之間的對(duì)應(yīng)關(guān)系是不斷變化的,就像數(shù)據(jù)庫中的數(shù)據(jù)會(huì)不斷被修改一樣,消費(fèi)者只關(guān)心 key 對(duì)應(yīng)的最新的 value.因此,我們可以開啟 kafka 的日志壓縮功能,服務(wù)端會(huì)在后臺(tái)啟動(dòng)啟動(dòng) Cleaner 線程池,定期將相同的 key 進(jìn)行合并,只保留最新的 value 值.日志的壓縮原理是
磁盤存儲(chǔ)的性能問題
磁盤存儲(chǔ)的性能優(yōu)化
我們現(xiàn)在大部分企業(yè)仍然用的是機(jī)械結(jié)構(gòu)的磁盤,如果把消息以隨機(jī)的方式寫入到磁盤,那么磁盤首先要做的就是尋址,也就是定位到數(shù)據(jù)所在的物理地址,在磁盤上就要找到對(duì)應(yīng)的柱面,磁頭以及對(duì)應(yīng)的扇區(qū);這個(gè)過程相對(duì)內(nèi)存來說會(huì)消耗大量時(shí)間,為了規(guī)避隨機(jī)讀寫帶來的時(shí)間消耗,kafka 采用順序?qū)懙姆绞酱鎯?chǔ)數(shù)據(jù).即使是這樣,但是頻繁的 I/O 操作仍然會(huì)造成磁盤的性能瓶頸
零拷貝
消息從發(fā)送到落地保存,broker 維護(hù)的消息日志本身就是文件目錄,每個(gè)文件都是二進(jìn)制保存,生產(chǎn)者和消費(fèi)者使用相同的格式來處理.在消費(fèi)者獲取消息時(shí),服務(wù)器先從硬盤讀取數(shù)據(jù)到內(nèi)存,然后把內(nèi)存中的數(shù)據(jù)原封不動(dòng)的通過 socket 發(fā)送給消費(fèi)者.雖然這個(gè)操作描述起來很簡(jiǎn)單,但實(shí)際上經(jīng)歷了很多步驟
操作系統(tǒng)將數(shù)據(jù)從磁盤讀入到內(nèi)核空間的頁緩存
- 應(yīng)用程序?qū)?shù)據(jù)從內(nèi)核空間讀入到用戶空間緩存中
- 應(yīng)用程序?qū)?shù)據(jù)寫回到內(nèi)核空間到 socket 緩存中
- 操作系統(tǒng)將數(shù)據(jù)從 socket 緩沖區(qū)復(fù)制到網(wǎng)卡緩沖區(qū),以便將數(shù)據(jù)經(jīng)網(wǎng)絡(luò)發(fā)出
通過“零拷貝”技術(shù),可以去掉這些沒必要的數(shù)據(jù)復(fù)制操作,同時(shí)也會(huì)減少上下文切換次數(shù).現(xiàn)代的 unix 操作系統(tǒng)提供一個(gè)優(yōu)化的代碼路徑,用于將數(shù)據(jù)從頁緩存?zhèn)鬏數(shù)?socket;在 Linux 中,是通過 sendfile 系統(tǒng)調(diào)用來完成的.Java 提供了訪問這個(gè)系統(tǒng)調(diào)用的方法:FileChannel.transferTo API 使用 sendfile,只需要一次拷貝就行,允許操作系統(tǒng)將數(shù)據(jù)直接從頁緩存發(fā)送到網(wǎng)絡(luò)上.所以在這個(gè)優(yōu)化的路徑中,只有最后一步將數(shù)據(jù)拷貝到網(wǎng)卡緩存中是需要的
頁緩存
頁緩存是操作系統(tǒng)實(shí)現(xiàn)的一種主要的磁盤緩存,但凡設(shè)計(jì)到緩存的,基本都是為了提升 i/o 性能,所以頁緩存是用來減少磁盤 I/O 操作的
磁盤高速緩存有兩個(gè)重要因素:
- 訪問磁盤的速度要遠(yuǎn)低于訪問內(nèi)存的速度,若從處理器 L1 和 L2 高速緩存訪問則速度更快.
- 數(shù)據(jù)一旦被訪問,就很有可能短時(shí)間內(nèi)再次訪問.正是由于基于訪問內(nèi)存比磁盤快的多,所以磁盤的內(nèi)存緩存將給系統(tǒng)存儲(chǔ)性能帶來質(zhì)的飛越
當(dāng)一個(gè)進(jìn)程準(zhǔn)備讀取磁盤上的文件內(nèi)容時(shí),操作系統(tǒng)會(huì)先查看待讀取的數(shù)據(jù)所在的頁(page)是否在頁緩存(pagecache)中,如果存在(命中)則直接返回?cái)?shù)據(jù),從而避免了對(duì)物理磁盤的 I/0 操作;如果沒有命中,則操作系統(tǒng)會(huì)向磁盤發(fā)起讀取請(qǐng)求并將讀取的數(shù)據(jù)頁存入頁緩存,之后再將數(shù)據(jù)返回給進(jìn)程.同樣,如果一個(gè)進(jìn)程需要將數(shù)據(jù)寫入磁盤,那么操作系統(tǒng)也會(huì)檢測(cè)數(shù)據(jù)對(duì)應(yīng)的頁是否在頁緩存中,如果不存在,則會(huì)先在頁緩存中添加相應(yīng)的頁,最后將數(shù)據(jù)寫入對(duì)應(yīng)的頁.被修改過后的頁也就變成了臟頁,操作系統(tǒng)會(huì)在合適的時(shí)間把臟頁中的數(shù)據(jù)寫入磁盤,以保持?jǐn)?shù)據(jù)的一致性
Kafka 中大量使用了頁緩存,這是 Kafka 實(shí)現(xiàn)高吞吐的重要因素之一.雖然消息都是先被寫入頁緩存,然后由操作系統(tǒng)負(fù)責(zé)具體的刷盤任務(wù)的,但在 Kafka 中同樣提供了同步刷盤及間斷性強(qiáng)制刷盤(fsync),可以通過 log.flush.interval.messages 和 log.flush.interval.ms 參數(shù)來控制
同步刷盤能夠保證消息的可靠性,避免因?yàn)殄礄C(jī)導(dǎo)致頁緩存數(shù)據(jù)還未完成同步時(shí)造成的數(shù)據(jù)丟失.但是實(shí)際使用上,我們沒必要去考慮這樣的因素以及這種問題帶來的損失,消息可靠性可以由多副本來解決,同步刷盤會(huì)帶來性能的影響.刷盤的操作由操作系統(tǒng)去完成即可
Kafka 消息的可靠性
沒有一個(gè)中間件能夠做到百分之百的完全可靠,可靠性更多的還是基于幾個(gè) 9 的衡量指標(biāo),比如 4 個(gè) 9,5 個(gè) 9.軟件系統(tǒng)的可靠性只能夠無限去接近 100%,但不可能達(dá)到 100%.所以 kafka 如何是實(shí)現(xiàn)最大可能的可靠性呢?
- 分區(qū)副本,你可以創(chuàng)建更多的分區(qū)來提升可靠性,但是分區(qū)數(shù)過多也會(huì)帶來性能上的開銷,一般來說,3個(gè)副本就能滿足對(duì)大部分場(chǎng)景的可靠性要求
- acks,生產(chǎn)者發(fā)送消息的可靠性,也就是我要保證我這個(gè)消息一定是到了 broker 并且完成了多副本的持久化,但這種要求也同樣會(huì)帶來性能上的開銷.它有幾個(gè)可選項(xiàng)
- 1 ,生產(chǎn)者把消息發(fā)送到 leader 副本,leader 副本在成功寫入到本地日志之后就告訴生產(chǎn)者消息提交成功,但是如果 isr 集合中的 follower 副本還沒來得及同步 leader 副本的消息,leader 掛了,就會(huì)造成消息丟失
- -1 ,消息不僅僅寫入到 leader 副本,并且被 ISR 集合中所有副本同步完成之后才告訴生產(chǎn)者已經(jīng)提交成功,這個(gè)時(shí)候即使 leader 副本掛了也不會(huì)造成數(shù)據(jù)丟失
- 0:表示 producer 不需要等待 broker 的消息確認(rèn).這個(gè)選項(xiàng)時(shí)延最小但同時(shí)風(fēng)險(xiǎn)最大(因?yàn)楫?dāng) server 宕機(jī)時(shí),數(shù)據(jù)將會(huì)丟失)
- 保障消息到了 broker 之后,消費(fèi)者也需要有一定的保證,因?yàn)橄M(fèi)者也可能出現(xiàn)某些問題導(dǎo)致消息沒有消費(fèi)到
- enable.auto.commit 默認(rèn)為 true,也就是自動(dòng)提交 offset,自動(dòng)提交是批量執(zhí)行的,有一個(gè)時(shí)間窗口,這種方式會(huì)帶來重復(fù)提交或者消息丟失的問題,所以對(duì)于高可靠性要求的程序,要使用手動(dòng)提交.對(duì)于高可靠要求的應(yīng)用來說,寧愿重復(fù)消費(fèi)也不應(yīng)該因?yàn)橄M(fèi)異常而導(dǎo)致消息丟失