初識(shí) Kafka
什么是 Kafka
Kafka 是由 Linkedin 公司開發(fā)的,它是一個(gè)分布式的,支持多分區(qū)、多副本,基于 Zookeeper 的分布式消息流平臺(tái),它同時(shí)也是一款開源的基于發(fā)布訂閱模式的消息引擎系統(tǒng)。
Kafka 的基本術(shù)語
消息:Kafka 中的數(shù)據(jù)單元被稱為消息,也被稱為記錄,可以把它看作數(shù)據(jù)庫表中某一行的記錄。
批次:為了提高效率, 消息會(huì)分批次寫入 Kafka,批次就代指的是一組消息。
主題:消息的種類稱為 主題(Topic),可以說一個(gè)主題代表了一類消息。相當(dāng)于是對(duì)消息進(jìn)行分類。主題就像是數(shù)據(jù)庫中的表。
分區(qū):主題可以被分為若干個(gè)分區(qū)(partition),同一個(gè)主題中的分區(qū)可以不在一個(gè)機(jī)器上,有可能會(huì)部署在多個(gè)機(jī)器上,由此來實(shí)現(xiàn) kafka 的伸縮性,單一主題中的分區(qū)有序,但是無法保證主題中所有的分區(qū)有序
生產(chǎn)者: 向主題發(fā)布消息的客戶端應(yīng)用程序稱為生產(chǎn)者(Producer),生產(chǎn)者用于持續(xù)不斷的向某個(gè)主題發(fā)送消息。
消費(fèi)者:訂閱主題消息的客戶端程序稱為消費(fèi)者(Consumer),消費(fèi)者用于處理生產(chǎn)者產(chǎn)生的消息。
消費(fèi)者群組:生產(chǎn)者與消費(fèi)者的關(guān)系就如同餐廳中的廚師和顧客之間的關(guān)系一樣,一個(gè)廚師對(duì)應(yīng)多個(gè)顧客,也就是一個(gè)生產(chǎn)者對(duì)應(yīng)多個(gè)消費(fèi)者,消費(fèi)者群組(Consumer Group)指的就是由一個(gè)或多個(gè)消費(fèi)者組成的群體。
偏移量:偏移量(Consumer Offset)是一種元數(shù)據(jù),它是一個(gè)不斷遞增的整數(shù)值,用來記錄消費(fèi)者發(fā)生重平衡時(shí)的位置,以便用來恢復(fù)數(shù)據(jù)。
broker: 一個(gè)獨(dú)立的 Kafka 服務(wù)器就被稱為 broker,broker 接收來自生產(chǎn)者的消息,為消息設(shè)置偏移量,并提交消息到磁盤保存。
broker 集群:broker 是集群 的組成部分,broker 集群由一個(gè)或多個(gè) broker 組成,每個(gè)集群都有一個(gè) broker 同時(shí)充當(dāng)了集群控制器的角色(自動(dòng)從集群的活躍成員中選舉出來)。
副本:Kafka 中消息的備份又叫做 副本(Replica),副本的數(shù)量是可以配置的,Kafka 定義了兩類副本:領(lǐng)導(dǎo)者副本(Leader Replica) 和 追隨者副本(Follower Replica),前者對(duì)外提供服務(wù),后者只是被動(dòng)跟隨。
重平衡:Rebalance。消費(fèi)者組內(nèi)某個(gè)消費(fèi)者實(shí)例掛掉后,其他消費(fèi)者實(shí)例自動(dòng)重新分配訂閱主題分區(qū)的過程。Rebalance 是 Kafka 消費(fèi)者端實(shí)現(xiàn)高可用的重要手段。
Kafka 的特性(設(shè)計(jì)原則)
- 高吞吐、低延遲:kakfa 最大的特點(diǎn)就是收發(fā)消息非???,kafka 每秒可以處理幾十萬條消息,它的最低延遲只有幾毫秒。
- 高伸縮性: 每個(gè)主題(topic) 包含多個(gè)分區(qū)(partition),主題中的分區(qū)可以分布在不同的主機(jī)(broker)中。
- 持久性、可靠性: Kafka 能夠允許數(shù)據(jù)的持久化存儲(chǔ),消息被持久化到磁盤,并支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失,Kafka 底層的數(shù)據(jù)存儲(chǔ)是基于 Zookeeper 存儲(chǔ)的,Zookeeper 我們知道它的數(shù)據(jù)能夠持久存儲(chǔ)。
- 容錯(cuò)性: 允許集群中的節(jié)點(diǎn)失敗,某個(gè)節(jié)點(diǎn)宕機(jī),Kafka 集群能夠正常工作
- 高并發(fā): 支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫
Kafka 的使用場(chǎng)景
- 活動(dòng)跟蹤:Kafka 可以用來跟蹤用戶行為,比如我們經(jīng)?;厝ヌ詫氋徫?,你打開淘寶的那一刻,你的登陸信息,登陸次數(shù)都會(huì)作為消息傳輸?shù)?Kafka ,當(dāng)你瀏覽購物的時(shí)候,你的瀏覽信息,你的搜索指數(shù),你的購物愛好都會(huì)作為一個(gè)個(gè)消息傳遞給 Kafka ,這樣就可以生成報(bào)告,可以做智能推薦,購買喜好等。
- 傳遞消息:Kafka 另外一個(gè)基本用途是傳遞消息,應(yīng)用程序向用戶發(fā)送通知就是通過傳遞消息來實(shí)現(xiàn)的,這些應(yīng)用組件可以生成消息,而不需要關(guān)心消息的格式,也不需要關(guān)心消息是如何發(fā)送的。
- 度量指標(biāo):Kafka也經(jīng)常用來記錄運(yùn)營監(jiān)控?cái)?shù)據(jù)。包括收集各種分布式應(yīng)用的數(shù)據(jù),生產(chǎn)各種操作的集中反饋,比如報(bào)警和報(bào)告。
- 日志記錄:Kafka 的基本概念來源于提交日志,比如我們可以把數(shù)據(jù)庫的更新發(fā)送到 Kafka 上,用來記錄數(shù)據(jù)庫的更新時(shí)間,通過kafka以統(tǒng)一接口服務(wù)的方式開放給各種consumer,例如hadoop、Hbase、Solr等。
- 流式處理:流式處理是有一個(gè)能夠提供多種應(yīng)用程序的領(lǐng)域。
- 限流削峰:Kafka 多用于互聯(lián)網(wǎng)領(lǐng)域某一時(shí)刻請(qǐng)求特別多的情況下,可以把請(qǐng)求寫入Kafka 中,避免直接請(qǐng)求后端程序?qū)е路?wù)崩潰。
Kafka 的消息隊(duì)列
Kafka 的消息隊(duì)列一般分為兩種模式:點(diǎn)對(duì)點(diǎn)模式和發(fā)布訂閱模式
Kafka 是支持消費(fèi)者群組的,也就是說 Kafka 中會(huì)有一個(gè)或者多個(gè)消費(fèi)者,如果一個(gè)生產(chǎn)者生產(chǎn)的消息由一個(gè)消費(fèi)者進(jìn)行消費(fèi)的話,那么這種模式就是點(diǎn)對(duì)點(diǎn)模式
如果一個(gè)生產(chǎn)者或者多個(gè)生產(chǎn)者產(chǎn)生的消息能夠被多個(gè)消費(fèi)者同時(shí)消費(fèi)的情況,這樣的消息隊(duì)列成為發(fā)布訂閱模式的消息隊(duì)列
Kafka 系統(tǒng)架構(gòu)
如上圖所示,一個(gè)典型的 Kafka 集群中包含若干Producer(可以是web前端產(chǎn)生的Page View,或者是服務(wù)器日志,系統(tǒng)CPU、Memory等),若干broker(Kafka支持水平擴(kuò)展,一般broker數(shù)量越多,集群吞吐率越高),若干Consumer Group,以及一個(gè)Zookeeper集群。Kafka通過Zookeeper管理集群配置,選舉leader,以及在Consumer Group發(fā)生變化時(shí)進(jìn)行rebalance。Producer使用push模式將消息發(fā)布到broker,Consumer使用pull模式從broker訂閱并消費(fèi)消息。
核心 API
Kafka 有四個(gè)核心API,它們分別是
- Producer API,它允許應(yīng)用程序向一個(gè)或多個(gè) topics 上發(fā)送消息記錄
- Consumer API,允許應(yīng)用程序訂閱一個(gè)或多個(gè) topics 并處理為其生成的記錄流
- Streams API,它允許應(yīng)用程序作為流處理器,從一個(gè)或多個(gè)主題中消費(fèi)輸入流并為其生成輸出流,有效的將輸入流轉(zhuǎn)換為輸出流。
- Connector API,它允許構(gòu)建和運(yùn)行將 Kafka 主題連接到現(xiàn)有應(yīng)用程序或數(shù)據(jù)系統(tǒng)的可用生產(chǎn)者和消費(fèi)者。例如,關(guān)系數(shù)據(jù)庫的連接器可能會(huì)捕獲對(duì)表的所有更改
Kafka 為何如此之快
Kafka 實(shí)現(xiàn)了零拷貝原理來快速移動(dòng)數(shù)據(jù),避免了內(nèi)核之間的切換。Kafka 可以將數(shù)據(jù)記錄分批發(fā)送,從生產(chǎn)者到文件系統(tǒng)(Kafka 主題日志)到消費(fèi)者,可以端到端的查看這些批次的數(shù)據(jù)。
批處理能夠進(jìn)行更有效的數(shù)據(jù)壓縮并減少 I/O 延遲,Kafka 采取順序?qū)懭氪疟P的方式,避免了隨機(jī)磁盤尋址的浪費(fèi),更多關(guān)于磁盤尋址的了解,請(qǐng)參閱 程序員需要了解的硬核知識(shí)之磁盤 。
總結(jié)一下其實(shí)就是四個(gè)要點(diǎn)
- 順序讀寫
- 零拷貝
- 消息壓縮
- 分批發(fā)送
Kafka 安裝和重要配置
Kafka 安裝我在 Kafka 系列第一篇應(yīng)該比較詳細(xì)了,詳情見帶你漲姿勢(shì)的認(rèn)識(shí)一下kafka 這篇文章。
那我們還是主要來說一下 Kafka 中的重要參數(shù)配置吧,這些參數(shù)對(duì) Kafka 來說是非常重要的。
broker 端配置
- broker.id
每個(gè) kafka broker 都有一個(gè)唯一的標(biāo)識(shí)來表示,這個(gè)唯一的標(biāo)識(shí)符即是 broker.id,它的默認(rèn)值是 0。這個(gè)值在 kafka 集群中必須是唯一的,這個(gè)值可以任意設(shè)定,
- port
如果使用配置樣本來啟動(dòng) kafka,它會(huì)監(jiān)聽 9092 端口。修改 port 配置參數(shù)可以把它設(shè)置成任意的端口。要注意,如果使用 1024 以下的端口,需要使用 root 權(quán)限啟動(dòng) kakfa。
- zookeeper.connect
用于保存 broker 元數(shù)據(jù)的 Zookeeper 地址是通過 zookeeper.connect 來指定的。比如我可以這么指定 localhost:2181 表示這個(gè) Zookeeper 是運(yùn)行在本地 2181 端口上的。我們也可以通過 比如我們可以通過 zk1:2181,zk2:2181,zk3:2181 來指定 zookeeper.connect 的多個(gè)參數(shù)值。該配置參數(shù)是用冒號(hào)分割的一組 hostname:port/path 列表,其含義如下
hostname 是 Zookeeper 服務(wù)器的機(jī)器名或者 ip 地址。
port 是 Zookeeper 客戶端的端口號(hào)
/path 是可選擇的 Zookeeper 路徑,Kafka 路徑是使用了 chroot 環(huán)境,如果不指定默認(rèn)使用跟路徑。
如果你有兩套 Kafka 集群,假設(shè)分別叫它們 kafka1 和 kafka2,那么兩套集群的zookeeper.connect參數(shù)可以這樣指定:zk1:2181,zk2:2181,zk3:2181/kafka1和zk1:2181,zk2:2181,zk3:2181/kafka2
- log.dirs
Kafka 把所有的消息都保存到磁盤上,存放這些日志片段的目錄是通過 log.dirs 來制定的,它是用一組逗號(hào)來分割的本地系統(tǒng)路徑,log.dirs 是沒有默認(rèn)值的,你必須手動(dòng)指定他的默認(rèn)值。其實(shí)還有一個(gè)參數(shù)是 log.dir,如你所知,這個(gè)配置是沒有 s 的,默認(rèn)情況下只用配置 log.dirs 就好了,比如你可以通過 /home/kafka1,/home/kafka2,/home/kafka3 這樣來配置這個(gè)參數(shù)的值。
- num.recovery.threads.per.data.dir
對(duì)于如下3種情況,Kafka 會(huì)使用可配置的線程池來處理日志片段。
服務(wù)器正常啟動(dòng),用于打開每個(gè)分區(qū)的日志片段;
服務(wù)器崩潰后重啟,用于檢查和截?cái)嗝總€(gè)分區(qū)的日志片段;
服務(wù)器正常關(guān)閉,用于關(guān)閉日志片段。
默認(rèn)情況下,每個(gè)日志目錄只使用一個(gè)線程。因?yàn)檫@些線程只是在服務(wù)器啟動(dòng)和關(guān)閉時(shí)會(huì)用到,所以完全可以設(shè)置大量的線程來達(dá)到井行操作的目的。特別是對(duì)于包含大量分區(qū)的服務(wù)器來說,一旦發(fā)生崩憤,在進(jìn)行恢復(fù)時(shí)使用井行操作可能會(huì)省下數(shù)小時(shí)的時(shí)間。設(shè)置此參數(shù)時(shí)需要注意,所配置的數(shù)字對(duì)應(yīng)的是 log.dirs 指定的單個(gè)日志目錄。也就是說,如果 num.recovery.threads.per.data.dir 被設(shè)為 8,并且 log.dir 指定了 3 個(gè)路徑,那么總共需要 24 個(gè)線程。
- auto.create.topics.enable
默認(rèn)情況下,kafka 會(huì)使用三種方式來自動(dòng)創(chuàng)建主題,下面是三種情況:
當(dāng)一個(gè)生產(chǎn)者開始往主題寫入消息時(shí)
當(dāng)一個(gè)消費(fèi)者開始從主題讀取消息時(shí)
當(dāng)任意一個(gè)客戶端向主題發(fā)送元數(shù)據(jù)請(qǐng)求時(shí)
auto.create.topics.enable參數(shù)我建議最好設(shè)置成 false,即不允許自動(dòng)創(chuàng)建 Topic。在我們的線上環(huán)境里面有很多名字稀奇古怪的 Topic,我想大概都是因?yàn)樵搮?shù)被設(shè)置成了 true 的緣故。
主題默認(rèn)配置
Kafka 為新創(chuàng)建的主題提供了很多默認(rèn)配置參數(shù),下面就來一起認(rèn)識(shí)一下這些參數(shù)
- num.partitions
num.partitions 參數(shù)指定了新創(chuàng)建的主題需要包含多少個(gè)分區(qū)。如果啟用了主題自動(dòng)創(chuàng)建功能(該功能是默認(rèn)啟用的),主題分區(qū)的個(gè)數(shù)就是該參數(shù)指定的值。該參數(shù)的默認(rèn)值是 1。要注意,我們可以增加主題分區(qū)的個(gè)數(shù),但不能減少分區(qū)的個(gè)數(shù)。
- default.replication.factor
這個(gè)參數(shù)比較簡(jiǎn)單,它表示 kafka保存消息的副本數(shù),如果一個(gè)副本失效了,另一個(gè)還可以繼續(xù)提供服務(wù)default.replication.factor 的默認(rèn)值為1,這個(gè)參數(shù)在你啟用了主題自動(dòng)創(chuàng)建功能后有效。
- log.retention.ms
Kafka 通常根據(jù)時(shí)間來決定數(shù)據(jù)可以保留多久。默認(rèn)使用 log.retention.hours 參數(shù)來配置時(shí)間,默認(rèn)是 168 個(gè)小時(shí),也就是一周。除此之外,還有兩個(gè)參數(shù) log.retention.minutes 和 log.retentiion.ms 。這三個(gè)參數(shù)作用是一樣的,都是決定消息多久以后被刪除,推薦使用 log.retention.ms。
- log.retention.bytes
另一種保留消息的方式是判斷消息是否過期。它的值通過參數(shù) log.retention.bytes 來指定,作用在每一個(gè)分區(qū)上。也就是說,如果有一個(gè)包含 8 個(gè)分區(qū)的主題,并且 log.retention.bytes 被設(shè)置為 1GB,那么這個(gè)主題最多可以保留 8GB 數(shù)據(jù)。所以,當(dāng)主題的分區(qū)個(gè)數(shù)增加時(shí),整個(gè)主題可以保留的數(shù)據(jù)也隨之增加。
- log.segment.bytes
上述的日志都是作用在日志片段上,而不是作用在單個(gè)消息上。當(dāng)消息到達(dá) broker 時(shí),它們被追加到分區(qū)的當(dāng)前日志片段上,當(dāng)日志片段大小到達(dá) log.segment.bytes 指定上限(默認(rèn)為 1GB)時(shí),當(dāng)前日志片段就會(huì)被關(guān)閉,一個(gè)新的日志片段被打開。如果一個(gè)日志片段被關(guān)閉,就開始等待過期。這個(gè)參數(shù)的值越小,就越會(huì)頻繁的關(guān)閉和分配新文件,從而降低磁盤寫入的整體效率。
- log.segment.ms
上面提到日志片段經(jīng)關(guān)閉后需等待過期,那么 log.segment.ms 這個(gè)參數(shù)就是指定日志多長時(shí)間被關(guān)閉的參數(shù)和,log.segment.ms 和 log.retention.bytes 也不存在互斥問題。日志片段會(huì)在大小或時(shí)間到達(dá)上限時(shí)被關(guān)閉,就看哪個(gè)條件先得到滿足。
- message.max.bytes
broker 通過設(shè)置 message.max.bytes 參數(shù)來限制單個(gè)消息的大小,默認(rèn)是 1000 000, 也就是 1MB,如果生產(chǎn)者嘗試發(fā)送的消息超過這個(gè)大小,不僅消息不會(huì)被接收,還會(huì)收到 broker 返回的錯(cuò)誤消息。跟其他與字節(jié)相關(guān)的配置參數(shù)一樣,該參數(shù)指的是壓縮后的消息大小,也就是說,只要壓縮后的消息小于 mesage.max.bytes,那么消息的實(shí)際大小可以大于這個(gè)值
這個(gè)值對(duì)性能有顯著的影響。值越大,那么負(fù)責(zé)處理網(wǎng)絡(luò)連接和請(qǐng)求的線程就需要花越多的時(shí)間來處理這些請(qǐng)求。它還會(huì)增加磁盤寫入塊的大小,從而影響 IO 吞吐量。
- retention.ms
規(guī)定了該主題消息被保存的時(shí)常,默認(rèn)是7天,即該主題只能保存7天的消息,一旦設(shè)置了這個(gè)值,它會(huì)覆蓋掉 Broker 端的全局參數(shù)值。
- retention.bytes
retention.bytes:規(guī)定了要為該 Topic 預(yù)留多大的磁盤空間。和全局參數(shù)作用相似,這個(gè)值通常在多租戶的 Kafka 集群中會(huì)有用武之地。當(dāng)前默認(rèn)值是 -1,表示可以無限使用磁盤空間。
JVM 參數(shù)配置
JDK 版本一般推薦直接使用 JDK1.8,這個(gè)版本也是現(xiàn)在中國大部分程序員的首選版本。
說到 JVM 端設(shè)置,就繞不開堆這個(gè)話題,業(yè)界最推崇的一種設(shè)置方式就是直接將 JVM 堆大小設(shè)置為 6GB,這樣會(huì)避免很多 Bug 出現(xiàn)。
JVM 端配置的另一個(gè)重要參數(shù)就是垃圾回收器的設(shè)置,也就是平時(shí)常說的 GC 設(shè)置。如果你依然在使用 Java 7,那么可以根據(jù)以下法則選擇合適的垃圾回收器:
- 如果 Broker 所在機(jī)器的 CPU 資源非常充裕,建議使用 CMS 收集器。啟用方法是指定-XX:+UseCurrentMarkSweepGC。
- 否則,使用吞吐量收集器。開啟方法是指定-XX:+UseParallelGC。
當(dāng)然了,如果你已經(jīng)在使用 Java 8 了,那么就用默認(rèn)的 G1 收集器就好了。在沒有任何調(diào)優(yōu)的情況下,G1 表現(xiàn)得要比 CMS 出色,主要體現(xiàn)在更少的 Full GC,需要調(diào)整的參數(shù)更少等,所以使用 G1 就好了。
一般 G1 的調(diào)整只需要這兩個(gè)參數(shù)即可
- MaxGCPauseMillis
該參數(shù)指定每次垃圾回收默認(rèn)的停頓時(shí)間。該值不是固定的,G1可以根據(jù)需要使用更長的時(shí)間。它的默認(rèn)值是 200ms,也就是說,每一輪垃圾回收大概需要200 ms 的時(shí)間。
- InitiatingHeapOccupancyPercent
該參數(shù)指定了 G1 啟動(dòng)新一輪垃圾回收之前可以使用的堆內(nèi)存百分比,默認(rèn)值是45,這就表明G1在堆使用率到達(dá)45之前不會(huì)啟用垃圾回收。這個(gè)百分比包括新生代和老年代。
Kafka Producer
在 Kafka 中,我們把產(chǎn)生消息的那一方稱為生產(chǎn)者,比如我們經(jīng)?;厝ヌ詫氋徫铮愦蜷_淘寶的那一刻,你的登陸信息,登陸次數(shù)都會(huì)作為消息傳輸?shù)?Kafka 后臺(tái),當(dāng)你瀏覽購物的時(shí)候,你的瀏覽信息,你的搜索指數(shù),你的購物愛好都會(huì)作為一個(gè)個(gè)消息傳遞給 Kafka 后臺(tái),然后淘寶會(huì)根據(jù)你的愛好做智能推薦,致使你的錢包從來都禁不住誘惑,那么這些生產(chǎn)者產(chǎn)生的消息是怎么傳到 Kafka 應(yīng)用程序的呢?發(fā)送過程是怎么樣的呢?
盡管消息的產(chǎn)生非常簡(jiǎn)單,但是消息的發(fā)送過程還是比較復(fù)雜的,如圖
我們從創(chuàng)建一個(gè)ProducerRecord 對(duì)象開始,ProducerRecord 是 Kafka 中的一個(gè)核心類,它代表了一組 Kafka 需要發(fā)送的 key/value 鍵值對(duì),它由記錄要發(fā)送到的主題名稱(Topic Name),可選的分區(qū)號(hào)(Partition Number)以及可選的鍵值對(duì)構(gòu)成。
在發(fā)送 ProducerRecord 時(shí),我們需要將鍵值對(duì)對(duì)象由序列化器轉(zhuǎn)換為字節(jié)數(shù)組,這樣它們才能夠在網(wǎng)絡(luò)上傳輸。然后消息到達(dá)了分區(qū)器。
如果發(fā)送過程中指定了有效的分區(qū)號(hào),那么在發(fā)送記錄時(shí)將使用該分區(qū)。如果發(fā)送過程中未指定分區(qū),則將使用key 的 hash 函數(shù)映射指定一個(gè)分區(qū)。如果發(fā)送的過程中既沒有分區(qū)號(hào)也沒有,則將以循環(huán)的方式分配一個(gè)分區(qū)。選好分區(qū)后,生產(chǎn)者就知道向哪個(gè)主題和分區(qū)發(fā)送數(shù)據(jù)了。
ProducerRecord 還有關(guān)聯(lián)的時(shí)間戳,如果用戶沒有提供時(shí)間戳,那么生產(chǎn)者將會(huì)在記錄中使用當(dāng)前的時(shí)間作為時(shí)間戳。Kafka 最終使用的時(shí)間戳取決于 topic 主題配置的時(shí)間戳類型。
- 如果將主題配置為使用 CreateTime,則生產(chǎn)者記錄中的時(shí)間戳將由 broker 使用。
- 如果將主題配置為使用LogAppendTime,則生產(chǎn)者記錄中的時(shí)間戳在將消息添加到其日志中時(shí),將由 broker 重寫。
然后,這條消息被存放在一個(gè)記錄批次里,這個(gè)批次里的所有消息會(huì)被發(fā)送到相同的主題和分區(qū)上。由一個(gè)獨(dú)立的線程負(fù)責(zé)把它們發(fā)到 Kafka Broker 上。
Kafka Broker 在收到消息時(shí)會(huì)返回一個(gè)響應(yīng),如果寫入成功,會(huì)返回一個(gè) RecordMetaData 對(duì)象,它包含了主題和分區(qū)信息,以及記錄在分區(qū)里的偏移量,上面兩種的時(shí)間戳類型也會(huì)返回給用戶。如果寫入失敗,會(huì)返回一個(gè)錯(cuò)誤。生產(chǎn)者在收到錯(cuò)誤之后會(huì)嘗試重新發(fā)送消息,幾次之后如果還是失敗的話,就返回錯(cuò)誤消息。
創(chuàng)建 Kafka 生產(chǎn)者
要向 Kafka 寫入消息,首先需要?jiǎng)?chuàng)建一個(gè)生產(chǎn)者對(duì)象,并設(shè)置一些屬性。Kafka 生產(chǎn)者有3個(gè)必選的屬性
- bootstrap.servers
該屬性指定 broker 的地址清單,地址的格式為 host:port。清單里不需要包含所有的 broker 地址,生產(chǎn)者會(huì)從給定的 broker 里查找到其他的 broker 信息。不過建議至少要提供兩個(gè) broker 信息,一旦其中一個(gè)宕機(jī),生產(chǎn)者仍然能夠連接到集群上。
- key.serializer
broker 需要接收到序列化之后的 key/value值,所以生產(chǎn)者發(fā)送的消息需要經(jīng)過序列化之后才傳遞給 Kafka Broker。生產(chǎn)者需要知道采用何種方式把 Java 對(duì)象轉(zhuǎn)換為字節(jié)數(shù)組。key.serializer 必須被設(shè)置為一個(gè)實(shí)現(xiàn)了org.apache.kafka.common.serialization.Serializer 接口的類,生產(chǎn)者會(huì)使用這個(gè)類把鍵對(duì)象序列化為字節(jié)數(shù)組。這里拓展一下 Serializer 類
Serializer 是一個(gè)接口,它表示類將會(huì)采用何種方式序列化,它的作用是把對(duì)象轉(zhuǎn)換為字節(jié),實(shí)現(xiàn)了 Serializer 接口的類主要有 ByteArraySerializer、StringSerializer、IntegerSerializer ,其中 ByteArraySerialize 是 Kafka 默認(rèn)使用的序列化器,其他的序列化器還有很多,你可以通過 這里 查看其他序列化器。要注意的一點(diǎn):key.serializer 是必須要設(shè)置的,即使你打算只發(fā)送值的內(nèi)容。
- value.serializer
與 key.serializer 一樣,value.serializer 指定的類會(huì)將值序列化。
下面代碼演示了如何創(chuàng)建一個(gè) Kafka 生產(chǎn)者,這里只指定了必要的屬性,其他使用默認(rèn)的配置
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">private Properties properties = new Properties();
properties.put("bootstrap.servers","broker1:9092,broker2:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties = new KafkaProducer<String,String>(properties);</pre>
來解釋一下這段代碼
- 首先創(chuàng)建了一個(gè) Properties 對(duì)象
- 使用 StringSerializer 序列化器序列化 key / value 鍵值對(duì)
- 在這里我們創(chuàng)建了一個(gè)新的生產(chǎn)者對(duì)象,并為鍵值設(shè)置了恰當(dāng)?shù)念愋停缓蟀?Properties 對(duì)象傳遞給他。
Kafka 消息發(fā)送
實(shí)例化生產(chǎn)者對(duì)象后,接下來就可以開始發(fā)送消息了,發(fā)送消息主要由下面幾種方式
簡(jiǎn)單消息發(fā)送
Kafka 最簡(jiǎn)單的消息發(fā)送如下:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","West","France");
producer.send(record);</pre>
代碼中生產(chǎn)者(producer)的 send() 方法需要把 ProducerRecord 的對(duì)象作為參數(shù)進(jìn)行發(fā)送,ProducerRecord 有很多構(gòu)造函數(shù),這個(gè)我們下面討論,這里調(diào)用的是
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public ProducerRecord(String topic, K key, V value) {}</pre>
這個(gè)構(gòu)造函數(shù),需要傳遞的是 topic主題,key 和 value。
把對(duì)應(yīng)的參數(shù)傳遞完成后,生產(chǎn)者調(diào)用 send() 方法發(fā)送消息(ProducerRecord對(duì)象)。我們可以從生產(chǎn)者的架構(gòu)圖中看出,消息是先被寫入分區(qū)中的緩沖區(qū)中,然后分批次發(fā)送給 Kafka Broker。
發(fā)送成功后,send() 方法會(huì)返回一個(gè) Future(java.util.concurrent) 對(duì)象,F(xiàn)uture 對(duì)象的類型是 RecordMetadata 類型,我們上面這段代碼沒有考慮返回值,所以沒有生成對(duì)應(yīng)的 Future 對(duì)象,所以沒有辦法知道消息是否發(fā)送成功。如果不是很重要的信息或者對(duì)結(jié)果不會(huì)產(chǎn)生影響的信息,可以使用這種方式進(jìn)行發(fā)送。
我們可以忽略發(fā)送消息時(shí)可能發(fā)生的錯(cuò)誤或者在服務(wù)器端可能發(fā)生的錯(cuò)誤,但在消息發(fā)送之前,生產(chǎn)者還可能發(fā)生其他的異常。這些異常有可能是 SerializationException(序列化失敗),BufferedExhaustedException 或 TimeoutException(說明緩沖區(qū)已滿),又或是 InterruptedException(說明發(fā)送線程被中斷)
同步發(fā)送消息
第二種消息發(fā)送機(jī)制如下所示
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","West","France");
try{
RecordMetadata recordMetadata = producer.send(record).get();
}catch(Exception e){
e.printStackTrace();
}
</pre>
這種發(fā)送消息的方式較上面的發(fā)送方式有了改進(jìn),首先調(diào)用 send() 方法,然后再調(diào)用 get() 方法等待 Kafka 響應(yīng)。如果服務(wù)器返回錯(cuò)誤,get() 方法會(huì)拋出異常,如果沒有發(fā)生錯(cuò)誤,我們會(huì)得到 RecordMetadata 對(duì)象,可以用它來查看消息記錄。
生產(chǎn)者(KafkaProducer)在發(fā)送的過程中會(huì)出現(xiàn)兩類錯(cuò)誤:其中一類是重試錯(cuò)誤,這類錯(cuò)誤可以通過重發(fā)消息來解決。比如連接的錯(cuò)誤,可以通過再次建立連接來解決;無主錯(cuò)誤則可以通過重新為分區(qū)選舉首領(lǐng)來解決。KafkaProducer 被配置為自動(dòng)重試,如果多次重試后仍無法解決問題,則會(huì)拋出重試異常。另一類錯(cuò)誤是無法通過重試來解決的,比如消息過大對(duì)于這類錯(cuò)誤,KafkaProducer 不會(huì)進(jìn)行重試,直接拋出異常。
異步發(fā)送消息
同步發(fā)送消息都有個(gè)問題,那就是同一時(shí)間只能有一個(gè)消息在發(fā)送,這會(huì)造成許多消息無法直接發(fā)送,造成消息滯后,無法發(fā)揮效益最大化。
比如消息在應(yīng)用程序和 Kafka 集群之間一個(gè)來回需要 10ms。如果發(fā)送完每個(gè)消息后都等待響應(yīng)的話,那么發(fā)送100個(gè)消息需要 1 秒,但是如果是異步方式的話,發(fā)送 100 條消息所需要的時(shí)間就會(huì)少很多很多。大多數(shù)時(shí)候,雖然Kafka 會(huì)返回 RecordMetadata 消息,但是我們并不需要等待響應(yīng)。
為了在異步發(fā)送消息的同時(shí)能夠?qū)Ξ惓G闆r進(jìn)行處理,生產(chǎn)者提供了回掉支持。下面是回調(diào)的一個(gè)例子
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("CustomerCountry", "Huston", "America");
producer.send(producerRecord,new DemoProducerCallBack());
class DemoProducerCallBack implements Callback {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if(exception != null){
exception.printStackTrace();;
}
}
}</pre>
首先實(shí)現(xiàn)回調(diào)需要定義一個(gè)實(shí)現(xiàn)了org.apache.kafka.clients.producer.Callback的類,這個(gè)接口只有一個(gè) onCompletion方法。如果 kafka 返回一個(gè)錯(cuò)誤,onCompletion 方法會(huì)拋出一個(gè)非空(non null)異常,這里我們只是簡(jiǎn)單的把它打印出來,如果是生產(chǎn)環(huán)境需要更詳細(xì)的處理,然后在 send() 方法發(fā)送的時(shí)候傳遞一個(gè) Callback 回調(diào)的對(duì)象。
生產(chǎn)者分區(qū)機(jī)制
Kafka 對(duì)于數(shù)據(jù)的讀寫是以分區(qū)為粒度的,分區(qū)可以分布在多個(gè)主機(jī)(Broker)中,這樣每個(gè)節(jié)點(diǎn)能夠?qū)崿F(xiàn)獨(dú)立的數(shù)據(jù)寫入和讀取,并且能夠通過增加新的節(jié)點(diǎn)來增加 Kafka 集群的吞吐量,通過分區(qū)部署在多個(gè) Broker 來實(shí)現(xiàn)負(fù)載均衡的效果。
上面我們介紹了生產(chǎn)者的發(fā)送方式有三種:不管結(jié)果如何直接發(fā)送、發(fā)送并返回結(jié)果、發(fā)送并回調(diào)。由于消息是存在主題(topic)的分區(qū)(partition)中的,所以當(dāng) Producer 生產(chǎn)者發(fā)送產(chǎn)生一條消息發(fā)給 topic 的時(shí)候,你如何判斷這條消息會(huì)存在哪個(gè)分區(qū)中呢?
這其實(shí)就設(shè)計(jì)到 Kafka 的分區(qū)機(jī)制了。
分區(qū)策略
Kafka 的分區(qū)策略指的就是將生產(chǎn)者發(fā)送到哪個(gè)分區(qū)的算法。Kafka 為我們提供了默認(rèn)的分區(qū)策略,同時(shí)它也支持你自定義分區(qū)策略。
如果要自定義分區(qū)策略的話,你需要顯示配置生產(chǎn)者端的參數(shù) Partitioner.class,我們可以看一下這個(gè)類它位于 org.apache.kafka.clients.producer 包下
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">public interface Partitioner extends Configurable, Closeable {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
public void close();
default public void onNewBatch(String topic, Cluster cluster, int prevPartition) {}
}</pre>
Partitioner 類有三個(gè)方法,分別來解釋一下
- partition(): 這個(gè)類有幾個(gè)參數(shù): topic,表示需要傳遞的主題;key 表示消息中的鍵值;keyBytes表示分區(qū)中序列化過后的key,byte數(shù)組的形式傳遞;value 表示消息的 value 值;valueBytes 表示分區(qū)中序列化后的值數(shù)組;cluster表示當(dāng)前集群的原數(shù)據(jù)。Kafka 給你這么多信息,就是希望讓你能夠充分地利用這些信息對(duì)消息進(jìn)行分區(qū),計(jì)算出它要被發(fā)送到哪個(gè)分區(qū)中。
- close() : 繼承了 Closeable 接口能夠?qū)崿F(xiàn) close() 方法,在分區(qū)關(guān)閉時(shí)調(diào)用。
- onNewBatch(): 表示通知分區(qū)程序用來創(chuàng)建新的批次
其中與分區(qū)策略息息相關(guān)的就是 partition() 方法了,分區(qū)策略有下面這幾種
順序輪詢
順序分配,消息是均勻的分配給每個(gè) partition,即每個(gè)分區(qū)存儲(chǔ)一次消息。就像下面這樣
上圖表示的就是輪詢策略,輪訓(xùn)策略是 Kafka Producer 提供的默認(rèn)策略,如果你不使用指定的輪訓(xùn)策略的話,Kafka 默認(rèn)會(huì)使用順序輪訓(xùn)策略的方式。
隨機(jī)輪詢
隨機(jī)輪詢簡(jiǎn)而言之就是隨機(jī)的向 partition 中保存消息,如下圖所示
實(shí)現(xiàn)隨機(jī)分配的代碼只需要兩行,如下
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());</pre>
先計(jì)算出該主題總的分區(qū)數(shù),然后隨機(jī)地返回一個(gè)小于它的正整數(shù)。
本質(zhì)上看隨機(jī)策略也是力求將數(shù)據(jù)均勻地打散到各個(gè)分區(qū),但從實(shí)際表現(xiàn)來看,它要遜于輪詢策略,所以如果追求數(shù)據(jù)的均勻分布,還是使用輪詢策略比較好。事實(shí)上,隨機(jī)策略是老版本生產(chǎn)者使用的分區(qū)策略,在新版本中已經(jīng)改為輪詢了。
按照 key 進(jìn)行消息保存
這個(gè)策略也叫做 key-ordering 策略,Kafka 中每條消息都會(huì)有自己的key,一旦消息被定義了 Key,那么你就可以保證同一個(gè) Key 的所有消息都進(jìn)入到相同的分區(qū)里面,由于每個(gè)分區(qū)下的消息處理都是有順序的,故這個(gè)策略被稱為按消息鍵保序策略,如下圖所示
實(shí)現(xiàn)這個(gè)策略的 partition 方法同樣簡(jiǎn)單,只需要下面兩行代碼即可:
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();</pre>
上面這幾種分區(qū)策略都是比較基礎(chǔ)的策略,除此之外,你還可以自定義分區(qū)策略。
生產(chǎn)者壓縮機(jī)制
壓縮一詞簡(jiǎn)單來講就是一種互換思想,它是一種經(jīng)典的用 CPU 時(shí)間去換磁盤空間或者 I/O 傳輸量的思想,希望以較小的 CPU 開銷帶來更少的磁盤占用或更少的網(wǎng)絡(luò) I/O 傳輸。如果你還不了解的話我希望你先讀完這篇文章 程序員需要了解的硬核知識(shí)之壓縮算法,然后你就明白壓縮是怎么回事了。
Kafka 壓縮是什么
Kafka 的消息分為兩層:消息集合 和 消息。一個(gè)消息集合中包含若干條日志項(xiàng),而日志項(xiàng)才是真正封裝消息的地方。Kafka 底層的消息日志由一系列消息集合日志項(xiàng)組成。Kafka 通常不會(huì)直接操作具體的一條條消息,它總是在消息集合這個(gè)層面上進(jìn)行寫入操作。
在 Kafka 中,壓縮會(huì)發(fā)生在兩個(gè)地方:Kafka Producer 和 Kafka Consumer,為什么啟用壓縮?說白了就是消息太大,需要變小一點(diǎn) 來使消息發(fā)的更快一些。
Kafka Producer 中使用 compression.type 來開啟壓縮
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">private Properties properties = new Properties();
properties.put("bootstrap.servers","192.168.1.9:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
properties.put("compression.type", "gzip");
Producer<String,String> producer = new KafkaProducer<String, String>(properties);
ProducerRecord<String,String> record =
new ProducerRecord<String, String>("CustomerCountry","Precision Products","France");</pre>
上面代碼表明該 Producer 的壓縮算法使用的是 GZIP
有壓縮必有解壓縮,Producer 使用壓縮算法壓縮消息后并發(fā)送給服務(wù)器后,由 Consumer 消費(fèi)者進(jìn)行解壓縮,因?yàn)椴捎玫暮畏N壓縮算法是隨著 key、value 一起發(fā)送過去的,所以消費(fèi)者知道采用何種壓縮算法。
Kafka 重要參數(shù)配置
在上一篇文章 帶你漲姿勢(shì)的認(rèn)識(shí)一下kafka中,我們主要介紹了一下 kafka 集群搭建的參數(shù),本篇文章我們來介紹一下 Kafka 生產(chǎn)者重要的配置,生產(chǎn)者有很多可配置的參數(shù),在文檔里(http://kafka.apache.org/documentation/#producerconfigs)都有說明,我們介紹幾個(gè)在內(nèi)存使用、性能和可靠性方面對(duì)生產(chǎn)者影響比較大的參數(shù)進(jìn)行說明
key.serializer
用于 key 鍵的序列化,它實(shí)現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口
value.serializer
用于 value 值的序列化,實(shí)現(xiàn)了 org.apache.kafka.common.serialization.Serializer 接口
acks
acks 參數(shù)指定了要有多少個(gè)分區(qū)副本接收消息,生產(chǎn)者才認(rèn)為消息是寫入成功的。此參數(shù)對(duì)消息丟失的影響較大
- 如果 acks = 0,就表示生產(chǎn)者也不知道自己產(chǎn)生的消息是否被服務(wù)器接收了,它才知道它寫成功了。如果發(fā)送的途中產(chǎn)生了錯(cuò)誤,生產(chǎn)者也不知道,它也比較懵逼,因?yàn)闆]有返回任何消息。這就類似于 UDP 的運(yùn)輸層協(xié)議,只管發(fā),服務(wù)器接受不接受它也不關(guān)心。
- 如果 acks = 1,只要集群的 Leader 接收到消息,就會(huì)給生產(chǎn)者返回一條消息,告訴它寫入成功。如果發(fā)送途中造成了網(wǎng)絡(luò)異?;蛘?Leader 還沒選舉出來等其他情況導(dǎo)致消息寫入失敗,生產(chǎn)者會(huì)受到錯(cuò)誤消息,這時(shí)候生產(chǎn)者往往會(huì)再次重發(fā)數(shù)據(jù)。因?yàn)橄⒌陌l(fā)送也分為 同步 和 異步,Kafka 為了保證消息的高效傳輸會(huì)決定是同步發(fā)送還是異步發(fā)送。如果讓客戶端等待服務(wù)器的響應(yīng)(通過調(diào)用 Future 中的 get() 方法),顯然會(huì)增加延遲,如果客戶端使用回調(diào),就會(huì)解決這個(gè)問題。
- 如果 acks = all,這種情況下是只有當(dāng)所有參與復(fù)制的節(jié)點(diǎn)都收到消息時(shí),生產(chǎn)者才會(huì)接收到一個(gè)來自服務(wù)器的消息。不過,它的延遲比 acks =1 時(shí)更高,因?yàn)槲覀円却恢灰粋€(gè)服務(wù)器節(jié)點(diǎn)接收消息。
buffer.memory
此參數(shù)用來設(shè)置生產(chǎn)者內(nèi)存緩沖區(qū)的大小,生產(chǎn)者用它緩沖要發(fā)送到服務(wù)器的消息。如果應(yīng)用程序發(fā)送消息的速度超過發(fā)送到服務(wù)器的速度,會(huì)導(dǎo)致生產(chǎn)者空間不足。這個(gè)時(shí)候,send() 方法調(diào)用要么被阻塞,要么拋出異常,具體取決于 block.on.buffer.null 參數(shù)的設(shè)置。
compression.type
此參數(shù)來表示生產(chǎn)者啟用何種壓縮算法,默認(rèn)情況下,消息發(fā)送時(shí)不會(huì)被壓縮。該參數(shù)可以設(shè)置為 snappy、gzip 和 lz4,它指定了消息發(fā)送給 broker 之前使用哪一種壓縮算法進(jìn)行壓縮。下面是各壓縮算法的對(duì)比
retries
生產(chǎn)者從服務(wù)器收到的錯(cuò)誤有可能是臨時(shí)性的錯(cuò)誤(比如分區(qū)找不到首領(lǐng)),在這種情況下,reteis 參數(shù)的值決定了生產(chǎn)者可以重發(fā)的消息次數(shù),如果達(dá)到這個(gè)次數(shù),生產(chǎn)者會(huì)放棄重試并返回錯(cuò)誤。默認(rèn)情況下,生產(chǎn)者在每次重試之間等待 100ms,這個(gè)等待參數(shù)可以通過 retry.backoff.ms 進(jìn)行修改。
batch.size
當(dāng)有多個(gè)消息需要被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者會(huì)把它們放在同一個(gè)批次里。該參數(shù)指定了一個(gè)批次可以使用的內(nèi)存大小,按照字節(jié)數(shù)計(jì)算。當(dāng)批次被填滿,批次里的所有消息會(huì)被發(fā)送出去。不過生產(chǎn)者井不一定都會(huì)等到批次被填滿才發(fā)送,任意條數(shù)的消息都可能被發(fā)送。
client.id
此參數(shù)可以是任意的字符串,服務(wù)器會(huì)用它來識(shí)別消息的來源,一般配置在日志里
max.in.flight.requests.per.connection
此參數(shù)指定了生產(chǎn)者在收到服務(wù)器響應(yīng)之前可以發(fā)送多少消息,它的值越高,就會(huì)占用越多的內(nèi)存,不過也會(huì)提高吞吐量。把它設(shè)為1 可以保證消息是按照發(fā)送的順序?qū)懭敕?wù)器。
timeout.ms、request.timeout.ms 和 metadata.fetch.timeout.ms
request.timeout.ms 指定了生產(chǎn)者在發(fā)送數(shù)據(jù)時(shí)等待服務(wù)器返回的響應(yīng)時(shí)間,metadata.fetch.timeout.ms 指定了生產(chǎn)者在獲取元數(shù)據(jù)(比如目標(biāo)分區(qū)的首領(lǐng)是誰)時(shí)等待服務(wù)器返回響應(yīng)的時(shí)間。如果等待時(shí)間超時(shí),生產(chǎn)者要么重試發(fā)送數(shù)據(jù),要么返回一個(gè)錯(cuò)誤。timeout.ms 指定了 broker 等待同步副本返回消息確認(rèn)的時(shí)間,與 asks 的配置相匹配----如果在指定時(shí)間內(nèi)沒有收到同步副本的確認(rèn),那么 broker 就會(huì)返回一個(gè)錯(cuò)誤。
max.block.ms
此參數(shù)指定了在調(diào)用 send() 方法或使用 partitionFor() 方法獲取元數(shù)據(jù)時(shí)生產(chǎn)者的阻塞時(shí)間當(dāng)生產(chǎn)者的發(fā)送緩沖區(qū)已捕,或者沒有可用的元數(shù)據(jù)時(shí),這些方法就會(huì)阻塞。在阻塞時(shí)間達(dá)到 max.block.ms 時(shí),生產(chǎn)者會(huì)拋出超時(shí)異常。
max.request.size
該參數(shù)用于控制生產(chǎn)者發(fā)送的請(qǐng)求大小。它可以指能發(fā)送的單個(gè)消息的最大值,也可以指單個(gè)請(qǐng)求里所有消息的總大小。
receive.buffer.bytes 和 send.buffer.bytes
Kafka 是基于 TCP 實(shí)現(xiàn)的,為了保證可靠的消息傳輸,這兩個(gè)參數(shù)分別指定了 TCP Socket 接收和發(fā)送數(shù)據(jù)包的緩沖區(qū)的大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)的默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心,那么可以適當(dāng)增大這些值。
Kafka Consumer
應(yīng)用程序使用 KafkaConsumer 從 Kafka 中訂閱主題并接收來自這些主題的消息,然后再把他們保存起來。應(yīng)用程序首先需要?jiǎng)?chuàng)建一個(gè) KafkaConsumer 對(duì)象,訂閱主題并開始接受消息,驗(yàn)證消息并保存結(jié)果。一段時(shí)間后,生產(chǎn)者往主題寫入的速度超過了應(yīng)用程序驗(yàn)證數(shù)據(jù)的速度,這時(shí)候該如何處理?如果只使用單個(gè)消費(fèi)者的話,應(yīng)用程序會(huì)跟不上消息生成的速度,就像多個(gè)生產(chǎn)者像相同的主題寫入消息一樣,這時(shí)候就需要多個(gè)消費(fèi)者共同參與消費(fèi)主題中的消息,對(duì)消息進(jìn)行分流處理。
Kafka 消費(fèi)者從屬于消費(fèi)者群組。一個(gè)群組中的消費(fèi)者訂閱的都是相同的主題,每個(gè)消費(fèi)者接收主題一部分分區(qū)的消息。下面是一個(gè) Kafka 分區(qū)消費(fèi)示意圖
上圖中的主題 T1 有四個(gè)分區(qū),分別是分區(qū)0、分區(qū)1、分區(qū)2、分區(qū)3,我們創(chuàng)建一個(gè)消費(fèi)者群組1,消費(fèi)者群組中只有一個(gè)消費(fèi)者,它訂閱主題T1,接收到 T1 中的全部消息。由于一個(gè)消費(fèi)者處理四個(gè)生產(chǎn)者發(fā)送到分區(qū)的消息,壓力有些大,需要幫手來幫忙分擔(dān)任務(wù),于是就演變?yōu)橄聢D
這樣一來,消費(fèi)者的消費(fèi)能力就大大提高了,但是在某些環(huán)境下比如用戶產(chǎn)生消息特別多的時(shí)候,生產(chǎn)者產(chǎn)生的消息仍舊讓消費(fèi)者吃不消,那就繼續(xù)增加消費(fèi)者。
如上圖所示,每個(gè)分區(qū)所產(chǎn)生的消息能夠被每個(gè)消費(fèi)者群組中的消費(fèi)者消費(fèi),如果向消費(fèi)者群組中增加更多的消費(fèi)者,那么多余的消費(fèi)者將會(huì)閑置,如下圖所示
向群組中增加消費(fèi)者是橫向伸縮消費(fèi)能力的主要方式??偠灾?,我們可以通過增加消費(fèi)組的消費(fèi)者來進(jìn)行水平擴(kuò)展提升消費(fèi)能力。這也是為什么建議創(chuàng)建主題時(shí)使用比較多的分區(qū)數(shù),這樣可以在消費(fèi)負(fù)載高的情況下增加消費(fèi)者來提升性能。另外,消費(fèi)者的數(shù)量不應(yīng)該比分區(qū)數(shù)多,因?yàn)槎喑鰜淼南M(fèi)者是空閑的,沒有任何幫助。
Kafka 一個(gè)很重要的特性就是,只需寫入一次消息,可以支持任意多的應(yīng)用讀取這個(gè)消息。換句話說,每個(gè)應(yīng)用都可以讀到全量的消息。為了使得每個(gè)應(yīng)用都能讀到全量消息,應(yīng)用需要有不同的消費(fèi)組。對(duì)于上面的例子,假如我們新增了一個(gè)新的消費(fèi)組 G2,而這個(gè)消費(fèi)組有兩個(gè)消費(fèi)者,那么就演變?yōu)橄聢D這樣
在這個(gè)場(chǎng)景中,消費(fèi)組 G1 和消費(fèi)組 G2 都能收到 T1 主題的全量消息,在邏輯意義上來說它們屬于不同的應(yīng)用。
總結(jié)起來就是如果應(yīng)用需要讀取全量消息,那么請(qǐng)為該應(yīng)用設(shè)置一個(gè)消費(fèi)組;如果該應(yīng)用消費(fèi)能力不足,那么可以考慮在這個(gè)消費(fèi)組里增加消費(fèi)者。
消費(fèi)者組和分區(qū)重平衡
消費(fèi)者組是什么
消費(fèi)者組(Consumer Group)是由一個(gè)或多個(gè)消費(fèi)者實(shí)例(Consumer Instance)組成的群組,具有可擴(kuò)展性和可容錯(cuò)性的一種機(jī)制。消費(fèi)者組內(nèi)的消費(fèi)者共享一個(gè)消費(fèi)者組ID,這個(gè)ID 也叫做 Group ID,組內(nèi)的消費(fèi)者共同對(duì)一個(gè)主題進(jìn)行訂閱和消費(fèi),同一個(gè)組中的消費(fèi)者只能消費(fèi)一個(gè)分區(qū)的消息,多余的消費(fèi)者會(huì)閑置,派不上用場(chǎng)。
我們?cè)谏厦嫣岬搅藘煞N消費(fèi)方式
- 一個(gè)消費(fèi)者群組消費(fèi)一個(gè)主題中的消息,這種消費(fèi)模式又稱為點(diǎn)對(duì)點(diǎn)的消費(fèi)方式,點(diǎn)對(duì)點(diǎn)的消費(fèi)方式又被稱為消息隊(duì)列
- 一個(gè)主題中的消息被多個(gè)消費(fèi)者群組共同消費(fèi),這種消費(fèi)模式又稱為發(fā)布-訂閱模式
消費(fèi)者重平衡
我們從上面的消費(fèi)者演變圖中可以知道這么一個(gè)過程:最初是一個(gè)消費(fèi)者訂閱一個(gè)主題并消費(fèi)其全部分區(qū)的消息,后來有一個(gè)消費(fèi)者加入群組,隨后又有更多的消費(fèi)者加入群組,而新加入的消費(fèi)者實(shí)例分?jǐn)偭俗畛跸M(fèi)者的部分消息,這種把分區(qū)的所有權(quán)通過一個(gè)消費(fèi)者轉(zhuǎn)到其他消費(fèi)者的行為稱為重平衡,英文名也叫做 Rebalance 。如下圖所示
重平衡非常重要,它為消費(fèi)者群組帶來了高可用性 和 伸縮性,我們可以放心的添加消費(fèi)者或移除消費(fèi)者,不過在正常情況下我們并不希望發(fā)生這樣的行為。在重平衡期間,消費(fèi)者無法讀取消息,造成整個(gè)消費(fèi)者組在重平衡的期間都不可用。另外,當(dāng)分區(qū)被重新分配給另一個(gè)消費(fèi)者時(shí),消息當(dāng)前的讀取狀態(tài)會(huì)丟失,它有可能還需要去刷新緩存,在它重新恢復(fù)狀態(tài)之前會(huì)拖慢應(yīng)用程序。
消費(fèi)者通過向組織協(xié)調(diào)者(Kafka Broker)發(fā)送心跳來維護(hù)自己是消費(fèi)者組的一員并確認(rèn)其擁有的分區(qū)。對(duì)于不同不的消費(fèi)群體來說,其組織協(xié)調(diào)者可以是不同的。只要消費(fèi)者定期發(fā)送心跳,就會(huì)認(rèn)為消費(fèi)者是存活的并處理其分區(qū)中的消息。當(dāng)消費(fèi)者檢索記錄或者提交它所消費(fèi)的記錄時(shí)就會(huì)發(fā)送心跳。
如果過了一段時(shí)間 Kafka 停止發(fā)送心跳了,會(huì)話(Session)就會(huì)過期,組織協(xié)調(diào)者就會(huì)認(rèn)為這個(gè) Consumer 已經(jīng)死亡,就會(huì)觸發(fā)一次重平衡。如果消費(fèi)者宕機(jī)并且停止發(fā)送消息,組織協(xié)調(diào)者會(huì)等待幾秒鐘,確認(rèn)它死亡了才會(huì)觸發(fā)重平衡。在這段時(shí)間里,死亡的消費(fèi)者將不處理任何消息。在清理消費(fèi)者時(shí),消費(fèi)者將通知協(xié)調(diào)者它要離開群組,組織協(xié)調(diào)者會(huì)觸發(fā)一次重平衡,盡量降低處理停頓。
重平衡是一把雙刃劍,它為消費(fèi)者群組帶來高可用性和伸縮性的同時(shí),還有有一些明顯的缺點(diǎn)(bug),而這些 bug 到現(xiàn)在社區(qū)還無法修改。
重平衡的過程對(duì)消費(fèi)者組有極大的影響。因?yàn)槊看沃仄胶膺^程中都會(huì)導(dǎo)致萬物靜止,參考 JVM 中的垃圾回收機(jī)制,也就是 Stop The World ,STW,(引用自《深入理解 Java 虛擬機(jī)》中 p76 關(guān)于 Serial 收集器的描述):
更重要的是它在進(jìn)行垃圾收集時(shí),必須暫停其他所有的工作線程。直到它收集結(jié)束。Stop The World 這個(gè)名字聽起來很帥,但這項(xiàng)工作實(shí)際上是由虛擬機(jī)在后臺(tái)自動(dòng)發(fā)起并完成的,在用戶不可見的情況下把用戶正常工作的線程全部停掉,這對(duì)很多應(yīng)用來說都是難以接受的。
也就是說,在重平衡期間,消費(fèi)者組中的消費(fèi)者實(shí)例都會(huì)停止消費(fèi),等待重平衡的完成。而且重平衡這個(gè)過程很慢......
創(chuàng)建消費(fèi)者
上面的理論說的有點(diǎn)多,下面就通過代碼來講解一下消費(fèi)者是如何消費(fèi)的
在讀取消息之前,需要先創(chuàng)建一個(gè) KafkaConsumer 對(duì)象。創(chuàng)建 KafkaConsumer 對(duì)象與創(chuàng)建 KafkaProducer 對(duì)象十分相似 --- 把需要傳遞給消費(fèi)者的屬性放在 properties 對(duì)象中,后面我們會(huì)著重討論 Kafka 的一些配置,這里我們先簡(jiǎn)單的創(chuàng)建一下,使用3個(gè)屬性就足矣,分別是 bootstrap.server,key.deserializer,value.deserializer 。
這三個(gè)屬性我們已經(jīng)用過很多次了,如果你還不是很清楚的話,可以參考 帶你漲姿勢(shì)是認(rèn)識(shí)一下Kafka Producer
還有一個(gè)屬性是 group.id 這個(gè)屬性不是必須的,它指定了 KafkaConsumer 是屬于哪個(gè)消費(fèi)者群組。創(chuàng)建不屬于任何一個(gè)群組的消費(fèi)者也是可以的
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">Properties properties = new Properties();
properties.put("bootstrap.server","192.168.1.9:9092"); properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
KafkaConsumer<String,String> consumer = new KafkaConsumer<>(properties);</pre>
主題訂閱
創(chuàng)建好消費(fèi)者之后,下一步就開始訂閱主題了。subscribe() 方法接受一個(gè)主題列表作為參數(shù),使用起來比較簡(jiǎn)單
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">consumer.subscribe(Collections.singletonList("customerTopic"));</pre>
為了簡(jiǎn)單我們只訂閱了一個(gè)主題 customerTopic,參數(shù)傳入的是一個(gè)正則表達(dá)式,正則表達(dá)式可以匹配多個(gè)主題,如果有人創(chuàng)建了新的主題,并且主題的名字與正則表達(dá)式相匹配,那么會(huì)立即觸發(fā)一次重平衡,消費(fèi)者就可以讀取新的主題。
要訂閱所有與 test 相關(guān)的主題,可以這樣做
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">consumer.subscribe("test.*");</pre>
輪詢
我們知道,Kafka 是支持訂閱/發(fā)布模式的,生產(chǎn)者發(fā)送數(shù)據(jù)給 Kafka Broker,那么消費(fèi)者是如何知道生產(chǎn)者發(fā)送了數(shù)據(jù)呢?其實(shí)生產(chǎn)者產(chǎn)生的數(shù)據(jù)消費(fèi)者是不知道的,KafkaConsumer 采用輪詢的方式定期去 Kafka Broker 中進(jìn)行數(shù)據(jù)的檢索,如果有數(shù)據(jù)就用來消費(fèi),如果沒有就再繼續(xù)輪詢等待,下面是輪詢等待的具體實(shí)現(xiàn)
<pre style="-webkit-tap-highlight-color: transparent; box-sizing: border-box; font-family: Consolas, Menlo, Courier, monospace; font-size: 16px; white-space: pre-wrap; position: relative; line-height: 1.5; color: rgb(153, 153, 153); margin: 1em 0px; padding: 12px 10px; background: rgb(244, 245, 246); border: 1px solid rgb(232, 232, 232); font-style: normal; font-variant-ligatures: normal; font-variant-caps: normal; font-weight: 400; letter-spacing: normal; orphans: 2; text-align: start; text-indent: 0px; text-transform: none; widows: 2; word-spacing: 0px; -webkit-text-stroke-width: 0px; text-decoration-style: initial; text-decoration-color: initial;">try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
for (ConsumerRecord<String, String> record : records) {
int updateCount = 1;
if (map.containsKey(record.value())) {
updateCount = (int) map.get(record.value() + 1);
}
map.put(record.value(), updateCount);
}
}
}finally {
consumer.close();
}</pre>
- 這是一個(gè)無限循環(huán)。消費(fèi)者實(shí)際上是一個(gè)長期運(yùn)行的應(yīng)用程序,它通過輪詢的方式向 Kafka 請(qǐng)求數(shù)據(jù)。
- 第三行代碼非常重要,Kafka 必須定期循環(huán)請(qǐng)求數(shù)據(jù),否則就會(huì)認(rèn)為該 Consumer 已經(jīng)掛了,會(huì)觸發(fā)重平衡,它的分區(qū)會(huì)移交給群組中的其它消費(fèi)者。傳給 poll() 方法的是一個(gè)超市時(shí)間,用 java.time.Duration 類來表示,如果該參數(shù)被設(shè)置為 0 ,poll() 方法會(huì)立刻返回,否則就會(huì)在指定的毫秒數(shù)內(nèi)一直等待 broker 返回?cái)?shù)據(jù)。
- poll() 方法會(huì)返回一個(gè)記錄列表。每條記錄都包含了記錄所屬主題的信息,記錄所在分區(qū)的信息、記錄在分區(qū)中的偏移量,以及記錄的鍵值對(duì)。我們一般會(huì)遍歷這個(gè)列表,逐條處理每條記錄。
- 在退出應(yīng)用程序之前使用 close() 方法關(guān)閉消費(fèi)者。網(wǎng)絡(luò)連接和 socket 也會(huì)隨之關(guān)閉,并立即觸發(fā)一次重平衡,而不是等待群組協(xié)調(diào)器發(fā)現(xiàn)它不再發(fā)送心跳并認(rèn)定它已經(jīng)死亡。
線程安全性
在同一個(gè)群組中,我們無法讓一個(gè)線程運(yùn)行多個(gè)消費(fèi)者,也無法讓多個(gè)線程安全的共享一個(gè)消費(fèi)者。按照規(guī)則,一個(gè)消費(fèi)者使用一個(gè)線程,如果一個(gè)消費(fèi)者群組中多個(gè)消費(fèi)者都想要運(yùn)行的話,那么必須讓每個(gè)消費(fèi)者在自己的線程中運(yùn)行,可以使用 Java 中的 ExecutorService 啟動(dòng)多個(gè)消費(fèi)者進(jìn)行進(jìn)行處理。
消費(fèi)者配置
到目前為止,我們學(xué)習(xí)了如何使用消費(fèi)者 API,不過只介紹了幾個(gè)最基本的屬性,Kafka 文檔列出了所有與消費(fèi)者相關(guān)的配置說明。大部分參數(shù)都有合理的默認(rèn)值,一般不需要修改它們,下面我們就來介紹一下這些參數(shù)。
- fetch.min.bytes
該屬性指定了消費(fèi)者從服務(wù)器獲取記錄的最小字節(jié)數(shù)。broker 在收到消費(fèi)者的數(shù)據(jù)請(qǐng)求時(shí),如果可用的數(shù)據(jù)量小于 fetch.min.bytes指定的大小,那么它會(huì)等到有足夠的可用數(shù)據(jù)時(shí)才把它返回給消費(fèi)者。這樣可以降低消費(fèi)者和 broker 的工作負(fù)載,因?yàn)樗鼈冊(cè)谥黝}使用頻率不是很高的時(shí)候就不用來回處理消息。如果沒有很多可用數(shù)據(jù),但消費(fèi)者的 CPU 使用率很高,那么就需要把該屬性的值設(shè)得比默認(rèn)值大。如果消費(fèi)者的數(shù)量比較多,把該屬性的值調(diào)大可以降低 broker 的工作負(fù)載。
- fetch.max.wait.ms
我們通過上面的 fetch.min.bytes 告訴 Kafka,等到有足夠的數(shù)據(jù)時(shí)才會(huì)把它返回給消費(fèi)者。而 fetch.max.wait.ms 則用于指定 broker 的等待時(shí)間,默認(rèn)是 500 毫秒。如果沒有足夠的數(shù)據(jù)流入 kafka 的話,消費(fèi)者獲取的最小數(shù)據(jù)量要求就得不到滿足,最終導(dǎo)致 500 毫秒的延遲。如果要降低潛在的延遲,就可以把參數(shù)值設(shè)置的小一些。如果 fetch.max.wait.ms 被設(shè)置為 100 毫秒的延遲,而 fetch.min.bytes 的值設(shè)置為 1MB,那么 Kafka 在收到消費(fèi)者請(qǐng)求后,要么返回 1MB 的數(shù)據(jù),要么在 100 ms 后返回所有可用的數(shù)據(jù)。就看哪個(gè)條件首先被滿足。
- max.partition.fetch.bytes
該屬性指定了服務(wù)器從每個(gè)分區(qū)里返回給消費(fèi)者的最大字節(jié)數(shù)。它的默認(rèn)值時(shí) 1MB,也就是說,KafkaConsumer.poll() 方法從每個(gè)分區(qū)里返回的記錄最多不超過 max.partition.fetch.bytes 指定的字節(jié)。如果一個(gè)主題有20個(gè)分區(qū)和5個(gè)消費(fèi)者,那么每個(gè)消費(fèi)者需要至少4 MB的可用內(nèi)存來接收記錄。在為消費(fèi)者分配內(nèi)存時(shí),可以給它們多分配一些,因?yàn)槿绻航M里有消費(fèi)者發(fā)生崩潰,剩下的消費(fèi)者需要處理更多的分區(qū)。max.partition.fetch.bytes 的值必須比 broker 能夠接收的最大消息的字節(jié)數(shù)(通過 max.message.size 屬性配置大),否則消費(fèi)者可能無法讀取這些消息,導(dǎo)致消費(fèi)者一直掛起重試。 在設(shè)置該屬性時(shí),另外一個(gè)考量的因素是消費(fèi)者處理數(shù)據(jù)的時(shí)間。消費(fèi)者需要頻繁的調(diào)用 poll() 方法來避免會(huì)話過期和發(fā)生分區(qū)再平衡,如果單次調(diào)用poll() 返回的數(shù)據(jù)太多,消費(fèi)者需要更多的時(shí)間進(jìn)行處理,可能無法及時(shí)進(jìn)行下一個(gè)輪詢來避免會(huì)話過期。如果出現(xiàn)這種情況,可以把 max.partition.fetch.bytes 值改小,或者延長會(huì)話過期時(shí)間。
- session.timeout.ms
這個(gè)屬性指定了消費(fèi)者在被認(rèn)為死亡之前可以與服務(wù)器斷開連接的時(shí)間,默認(rèn)是 3s。如果消費(fèi)者沒有在 session.timeout.ms 指定的時(shí)間內(nèi)發(fā)送心跳給群組協(xié)調(diào)器,就會(huì)被認(rèn)定為死亡,協(xié)調(diào)器就會(huì)觸發(fā)重平衡。把它的分區(qū)分配給消費(fèi)者群組中的其它消費(fèi)者,此屬性與 heartbeat.interval.ms 緊密相關(guān)。heartbeat.interval.ms 指定了 poll() 方法向群組協(xié)調(diào)器發(fā)送心跳的頻率,session.timeout.ms 則指定了消費(fèi)者可以多久不發(fā)送心跳。所以,這兩個(gè)屬性一般需要同時(shí)修改,heartbeat.interval.ms 必須比 session.timeout.ms 小,一般是 session.timeout.ms 的三分之一。如果 session.timeout.ms 是 3s,那么 heartbeat.interval.ms 應(yīng)該是 1s。把 session.timeout.ms 值設(shè)置的比默認(rèn)值小,可以更快地檢測(cè)和恢復(fù)崩憤的節(jié)點(diǎn),不過長時(shí)間的輪詢或垃圾收集可能導(dǎo)致非預(yù)期的重平衡。把該屬性的值設(shè)置得大一些,可以減少意外的重平衡,不過檢測(cè)節(jié)點(diǎn)崩潰需要更長的時(shí)間。
- auto.offset.reset
該屬性指定了消費(fèi)者在讀取一個(gè)沒有偏移量的分區(qū)或者偏移量無效的情況下的該如何處理。它的默認(rèn)值是 latest,意思指的是,在偏移量無效的情況下,消費(fèi)者將從最新的記錄開始讀取數(shù)據(jù)。另一個(gè)值是 earliest,意思指的是在偏移量無效的情況下,消費(fèi)者將從起始位置處開始讀取分區(qū)的記錄。
- enable.auto.commit
我們稍后將介紹幾種不同的提交偏移量的方式。該屬性指定了消費(fèi)者是否自動(dòng)提交偏移量,默認(rèn)值是 true,為了盡量避免出現(xiàn)重復(fù)數(shù)據(jù)和數(shù)據(jù)丟失,可以把它設(shè)置為 false,由自己控制何時(shí)提交偏移量。如果把它設(shè)置為 true,還可以通過 auto.commit.interval.ms 屬性來控制提交的頻率
- partition.assignment.strategy
我們知道,分區(qū)會(huì)分配給群組中的消費(fèi)者。PartitionAssignor 會(huì)根據(jù)給定的消費(fèi)者和主題,決定哪些分區(qū)應(yīng)該被分配給哪個(gè)消費(fèi)者,Kafka 有兩個(gè)默認(rèn)的分配策略Range 和 RoundRobin
- client.id
該屬性可以是任意字符串,broker 用他來標(biāo)識(shí)從客戶端發(fā)送過來的消息,通常被用在日志、度量指標(biāo)和配額中
- max.poll.records
該屬性用于控制單次調(diào)用 call() 方法能夠返回的記錄數(shù)量,可以幫你控制在輪詢中需要處理的數(shù)據(jù)量。
- receive.buffer.bytes 和 send.buffer.bytes
socket 在讀寫數(shù)據(jù)時(shí)用到的 TCP 緩沖區(qū)也可以設(shè)置大小。如果它們被設(shè)置為 -1,就使用操作系統(tǒng)默認(rèn)值。如果生產(chǎn)者或消費(fèi)者與 broker 處于不同的數(shù)據(jù)中心內(nèi),可以適當(dāng)增大這些值,因?yàn)榭鐢?shù)據(jù)中心的網(wǎng)絡(luò)一般都有比較高的延遲和比較低的帶寬。
提交和偏移量的概念
特殊偏移
我們上面提到,消費(fèi)者在每次調(diào)用poll() 方法進(jìn)行定時(shí)輪詢的時(shí)候,會(huì)返回由生產(chǎn)者寫入 Kafka 但是還沒有被消費(fèi)者消費(fèi)的記錄,因此我們可以追蹤到哪些記錄是被群組里的哪個(gè)消費(fèi)者讀取的。消費(fèi)者可以使用 Kafka 來追蹤消息在分區(qū)中的位置(偏移量)
消費(fèi)者會(huì)向一個(gè)叫做 _consumer_offset 的特殊主題中發(fā)送消息,這個(gè)主題會(huì)保存每次所發(fā)送消息中的分區(qū)偏移量,這個(gè)主題的主要作用就是消費(fèi)者觸發(fā)重平衡后記錄偏移使用的,消費(fèi)者每次向這個(gè)主題發(fā)送消息,正常情況下不觸發(fā)重平衡,這個(gè)主題是不起作用的,當(dāng)觸發(fā)重平衡后,消費(fèi)者停止工作,每個(gè)消費(fèi)者可能會(huì)分到對(duì)應(yīng)的分區(qū),這個(gè)主題就是讓消費(fèi)者能夠繼續(xù)處理消息所設(shè)置的。
如果提交的偏移量小于客戶端最后一次處理的偏移量,那么位于兩個(gè)偏移量之間的消息就會(huì)被重復(fù)處理
如果提交的偏移量大于最后一次消費(fèi)時(shí)的偏移量,那么處于兩個(gè)偏移量中間的消息將會(huì)丟失
既然_consumer_offset 如此重要,那么它的提交方式是怎樣的呢?下面我們就來說一下####提交方式
KafkaConsumer API 提供了多種方式來提交偏移量
自動(dòng)提交
最簡(jiǎn)單的方式就是讓消費(fèi)者自動(dòng)提交偏移量。如果 enable.auto.commit 被設(shè)置為true,那么每過 5s,消費(fèi)者會(huì)自動(dòng)把從 poll() 方法輪詢到的最大偏移量提交上去。提交時(shí)間間隔由 auto.commit.interval.ms 控制,默認(rèn)是 5s。與消費(fèi)者里的其他東西一樣,自動(dòng)提交也是在輪詢中進(jìn)行的。消費(fèi)者在每次輪詢中會(huì)檢查是否提交該偏移量了,如果是,那么就會(huì)提交從上一次輪詢中返回的偏移量。
提交當(dāng)前偏移量
把 auto.commit.offset 設(shè)置為 false,可以讓應(yīng)用程序決定何時(shí)提交偏移量。使用 commitSync() 提交偏移量。這個(gè) API 會(huì)提交由 poll() 方法返回的最新偏移量,提交成功后馬上返回,如果提交失敗就拋出異常。
commitSync() 將會(huì)提交由 poll() 返回的最新偏移量,如果處理完所有記錄后要確保調(diào)用了 commitSync(),否則還是會(huì)有丟失消息的風(fēng)險(xiǎn),如果發(fā)生了在均衡,從最近一批消息到發(fā)生在均衡之間的所有消息都將被重復(fù)處理。
異步提交
異步提交 commitAsync() 與同步提交 commitSync() 最大的區(qū)別在于異步提交不會(huì)進(jìn)行重試,同步提交會(huì)一致進(jìn)行重試。
同步和異步組合提交
一般情況下,針對(duì)偶爾出現(xiàn)的提交失敗,不進(jìn)行重試不會(huì)有太大的問題,因?yàn)槿绻峤皇∈且驗(yàn)榕R時(shí)問題導(dǎo)致的,那么后續(xù)的提交總會(huì)有成功的。但是如果在關(guān)閉消費(fèi)者或再均衡前的最后一次提交,就要確保提交成功。
因此,在消費(fèi)者關(guān)閉之前一般會(huì)組合使用commitAsync和commitSync提交偏移量。
提交特定的偏移量
消費(fèi)者API允許調(diào)用 commitSync() 和 commitAsync() 方法時(shí)傳入希望提交的 partition 和 offset 的 map,即提交特定的偏移量。