kafka線上環(huán)境部署

集群環(huán)境規(guī)劃

操作系統(tǒng)的選型

目前部署Kafka最多的3類操作系統(tǒng)分別是Linux、OS X和Windows,其中部署在Linux上的最多,而Linux也是推薦的操作系統(tǒng)。為什么呢?且不說當(dāng)前的現(xiàn)狀的確是Linux服務(wù)器數(shù)量最多,單論它與Kafka本身的相適性,Linux也要比Windows等其他操作系統(tǒng)更加適合部署Kafka。這里筆者羅列出自己能想到的兩個主要原因:I/O模型的使用和數(shù)據(jù)網(wǎng)絡(luò)傳輸效率。

談到I/O模型,就不能不說當(dāng)前主流且耳熟能詳?shù)?種模型:阻塞I/O、非阻塞I/O、I/O多路復(fù)用、信號驅(qū)動I/O和異步I/O。每一種I/O模型都有典型的使用場景,比如Socket的阻塞模式和非阻塞模式就對應(yīng)于前兩種模型,而Linux中的select函數(shù)就屬于I/O多路復(fù)用模型,至于第5種模型其實很少有UNIX和類UNIX系統(tǒng)支持,Windows的IOCP(I/O Completion Port,簡稱IOCP)屬于此模型。至于大名鼎鼎的Linux epoll模型,則可以看作兼具第3種和第4種模型的特性。

由于篇幅有限,我們不會針對每種I/O模型進(jìn)行詳細(xì)的展開,但通常情況下我們會認(rèn)為epoll比select模型高級。畢竟epoll取消了輪詢機(jī)制,取而代之的是回調(diào)機(jī)制(callback)。這樣當(dāng)?shù)讓舆B接Socket數(shù)較多時,可以避免很多無意義的CPU時間浪費。另外,Windows的IOCP模型可以說是真正的異步I/O模型,但由于其母系統(tǒng)的局限性,IOCP并沒有廣泛應(yīng)用。

說了這么多,這些和Kafka又有什么關(guān)系呢?關(guān)鍵就在于clients底層網(wǎng)絡(luò)庫的設(shè)計。Kafka新版本clients在設(shè)計底層網(wǎng)絡(luò)庫時采用了Java的Selector機(jī)制,而后者在Linux上的實現(xiàn)機(jī)制就是epoll;但是在Windows平臺上,Java NIO的Selector底層是使用select模型而非IOCP實現(xiàn)的,只有Java NIO2才是使用IOCP實現(xiàn)的。因此在這一點上,在Linux上部署Kafka要比在Windows上部署能夠得到更高效的I/O處理性能。

對于第二個方面,即數(shù)據(jù)網(wǎng)絡(luò)傳輸效率而言,Linux也更有優(yōu)勢。具體來說,Kafka這種應(yīng)用必然需要大量地通過網(wǎng)絡(luò)與磁盤進(jìn)行數(shù)據(jù)傳輸,而大部分這樣的操作都是通過Java的FileChannel.transferTo方法實現(xiàn)的。在Linux平臺上該方法底層會調(diào)用sendfile系統(tǒng)調(diào)用,即采用了Linux提供的零拷貝(Zero Copy)技術(shù)。

如前面章節(jié)所言,這種零拷貝技術(shù)可以有效地改善數(shù)據(jù)傳輸?shù)男阅?。在?nèi)核驅(qū)動程序處理I/O數(shù)據(jù)的時候,它可以減少甚至完全規(guī)避不必要的CPU數(shù)據(jù)拷貝操作,避免數(shù)據(jù)在操作系統(tǒng)內(nèi)核地址空間和用戶應(yīng)用程序地址空間的緩沖區(qū)間進(jìn)行重復(fù)拷貝,因而可以獲得很好的性能。Linux提供的諸如mmap、sendfile以及splice等系統(tǒng)調(diào)用即實現(xiàn)了這樣的技術(shù)。

然而對于Windows平臺而言,雖然它也提供了TransmitFile函數(shù)來支持零拷貝技術(shù),但是直到Java 8u60版本W(wǎng)indows平臺才正式讓FileChannel的transferTo方法調(diào)用該函數(shù)。具體詳見這個JDK bug:http://bugs.java.com/view_bug.do?bug_id=8064407。
鑒于很多公司目前的生產(chǎn)環(huán)境中還沒有正式上線Java 8,因而在Windows平臺上部署Kafka將很有可能無法享受到零拷貝技術(shù)帶來的高效數(shù)據(jù)傳輸。

磁盤規(guī)劃

如果問哪個因素對Kafka性能最重要?磁盤無疑是排名靠前的答案。眾所周知,Kafka是大量使用磁盤的。Kafka的每條消息都必須被持久化到底層的存儲中,并且只有被規(guī)定數(shù)量的broker成功接收后才能通知clients消息發(fā)送成功,因此消息越是被更快地保存在磁盤上,處理clients請求的延時越低,表現(xiàn)出來的用戶體驗也就越好。

在確定磁盤時,一個常見的問題就是選擇普通的機(jī)械硬盤(HDD)還是固態(tài)硬盤(SSD)。機(jī)械硬盤成本低且容量大,而SSD通常有著極低的尋道時間(seek time)和存取時間(access time),性能上的優(yōu)勢很大,但同時也有著非常高的成本。因此在規(guī)劃Kafka線上環(huán)境時,讀者就需要根據(jù)公司自身的實際條件進(jìn)行有針對性的選型。但以筆者使用Kafka的經(jīng)驗來看,Kafka使用磁盤的方式在很大程度上抵消了SSD提供的那些突出優(yōu)勢。眾所周知,SSD強(qiáng)就強(qiáng)在它不是機(jī)械裝置,而全部由電子芯片及電路板組成,因而可以極大地避免傳統(tǒng)機(jī)械硬盤緩慢的磁頭尋道時間。一般機(jī)械硬盤的尋道時間都是毫秒級的。若有大量的隨機(jī)I/O操作,則整體的磁盤延時將是非常可觀的,但SSD則不受這樣的拖累??墒沁@點差異對于Kafka來說又顯得不是那么重要。為什么?因為Kafka是順序?qū)懘疟P的,而磁盤順序I/O的性能,即使機(jī)械硬盤也是不弱的——順序I/O不需要頻繁地移動磁頭,因而節(jié)省了耗時的尋道時間。所以從磁盤的使用這個方面來看,筆者并不認(rèn)為兩者有著巨大的性能差異。關(guān)于Kafka底層的持久化實現(xiàn),我們會在第6章中詳細(xì)討論。因此對于預(yù)算有限且追求高性價比的公司而言,機(jī)械硬盤完全可以勝任Kafka存儲的任務(wù)。

既然是資源規(guī)劃和硬件選型,我們不妨看下LinkedIn公司是怎么做的。
之前提到過,LinkedIn公司目前的Kafka就搭建于RAID 10之上。他們在Kafka層面設(shè)定的副本數(shù)是2,因此根據(jù)RAID 10的特性,這套集群實際上提供了4倍的數(shù)據(jù)冗余,且只能容忍一臺broker宕機(jī)(因為副本數(shù)=2)。若LinkedIn公司把副本數(shù)提高到3,那么就提供了6倍的數(shù)據(jù)冗余。這將是一筆很大的成本開銷。但是,如果我們假設(shè)LinkedIn公司使用的是JBOD方案。雖然目前JBOD有諸多限制,但其低廉的價格和超高的性價比的確是非常大的優(yōu)勢。另外通過一些簡單的設(shè)置,JBOD方案可以達(dá)到和RAID方案一樣的數(shù)據(jù)冗余效果。比如說,如果使用JBOD并且設(shè)置副本數(shù)為4,那么Kafka集群依然提供4倍的數(shù)據(jù)冗余,但是這個方案中整個集群可以容忍最多3臺broker宕機(jī)而不丟失數(shù)據(jù)。對比之前的RAID方案,JBOD方案沒有犧牲任何高可靠性或是增加硬件成本,同時還提升了整個集群的高可用性。
事實上,LinkedIn公司目前正在計劃將整個Kafka集群從RAID 10遷移到JBOD上,只不過在整個過程中JBOD方案需要解決當(dāng)前Kafka一些固有缺陷,比如:

  • 任意磁盤損壞都會導(dǎo)致broker宕機(jī)——普通磁盤損壞的概率是很大的,因此這個缺陷從某種程度上來說是致命的。不過社區(qū)正在改進(jìn)這個問題,未來版本中只要為broker配置的多塊磁盤中還有狀態(tài)良好的磁盤,broker就不會掛掉。

  • JBOD的管理需要更加細(xì)粒度化——目前Kafka沒有提供腳本或其他工具用于在不同磁盤間進(jìn)行分區(qū)手動分配,但這是使用JBOD方案中必要的功能。

  • JBOD也應(yīng)該提供類似于負(fù)載均衡的功能——目前只是簡單地依賴輪詢的方式為新副本數(shù)據(jù)選擇磁盤,后續(xù)需要提供更加豐富的策略。

結(jié)合JBOD和RAID之間的優(yōu)劣對比以及LinkedIn公司的實際案例,筆者認(rèn)為:對于一般的公司或組織而言,選擇JBOD方案的性價比更高。另外推薦用戶為每個broker都配置多個日志路徑,每個路徑都獨立掛載在不同的磁盤上,這使得多塊物理磁盤磁頭同時執(zhí)行物理I/O寫操作,可以極大地加速Kafka消息生產(chǎn)的速度。

最后關(guān)于磁盤的一個建議就是,盡量不要使用NAS(Network Attached Storage)這樣的網(wǎng)絡(luò)存儲設(shè)備。對比本地存儲,人們總是以為NAS方案速度更快也更可靠,其實然。NAS一個很大的弊端在于,它們通常都運行在低端的硬件上,這就使得它們的性能很差,可能比一臺筆記本電腦的硬盤強(qiáng)不了多少,表現(xiàn)為平均延時有很大的不穩(wěn)定性,而幾乎所有高端的NAS設(shè)備廠商都售賣專有的硬件設(shè)備,因此成本的開銷也是一個需要考慮的因素。

綜合以上所有的考量,筆者給硬盤規(guī)劃的結(jié)論性總結(jié)如下。

  • 追求性價比的公司可以考慮使用JBOD。
  • 使用機(jī)械硬盤完全可以滿足Kafka集群的使用,SSD更好。

磁盤容量規(guī)劃

Kafka集群到底需要多大的磁盤容量?

Kafka的每條消息都保存在實際的物理磁盤中,這些消息默認(rèn)會被broker保存一段時間之后清除。這段時間是可以配置的,因此用戶可以根據(jù)自身實際業(yè)務(wù)場景和存儲需求來大致計算線上環(huán)境所需的磁盤容量。

讓我們以一個實際的例子來看下應(yīng)該如何思考這個問題。假設(shè)在你的業(yè)務(wù)場景中,clients每天會產(chǎn)生1億條消息,每條消息保存兩份并保留一周的時間,平均一條消息的大小是1KB,那么我們需要為Kafka規(guī)劃多少磁盤空間呢?如果每天1億條消息,那么每天產(chǎn)生的消息會占用1億×2×1KB/1000/1000=200GB的磁盤空間。我們最好再額外預(yù)留10%的磁盤空間用于其他數(shù)據(jù)文件(比如索引文件等)的存儲,因此在這種使用場景下每天新發(fā)送的消息將占用210GB左右的磁盤空間。因為還要保存一周的數(shù)據(jù),所以整體的磁盤容量規(guī)劃是210×7≈1.5TB。當(dāng)然,這是無壓縮的情況。如果在clients啟用了消息壓縮,我們可以預(yù)估一個平均的壓縮比(比如0.5),那么整體的磁盤容量就是0.75TB。

總之對于磁盤容量的規(guī)劃和以下多個因素有關(guān)。

  • 新增消息數(shù)。
  • 消息留存時間。
  • 平均消息大小。
  • 副本數(shù)。
  • 是否啟用壓縮。

內(nèi)存規(guī)劃

乍一看似乎關(guān)于內(nèi)存規(guī)劃的討論沒什么必要,畢竟用戶能做的就只是分配一個合適大小的內(nèi)存,其他也沒有可以調(diào)整的地方了。其實不然!Kafka對于內(nèi)存的使用可稱作其設(shè)計亮點之一。雖然在前面我們強(qiáng)調(diào)了Kafka大量依靠文件系統(tǒng)和磁盤來保存消息,但其實它還會對消息進(jìn)行緩存,而這個消息緩存的地方就是內(nèi)存,具體來說是操作系統(tǒng)的頁緩存(page cache)。

Kafka雖然會持久化每條消息,但其實這個工作都是底層的文件系統(tǒng)來完成的,Kafka僅僅將消息寫入page cache而已,之后將消息“沖刷”到磁盤的任務(wù)完全交由操作系統(tǒng)來完成。另外consumer在讀取消息時也會首先嘗試從該區(qū)域中查找,如果直接命中則完全不用執(zhí)行耗時的物理I/O操作,從而提升了consumer的整體性能。不論是緩沖已發(fā)送消息還是待讀取消息,操作系統(tǒng)都要先開辟一塊內(nèi)存區(qū)域用于存放接收的Kafka消息,因此這塊內(nèi)存區(qū)域大小的設(shè)置對于Kafka的性能就顯得尤為關(guān)鍵了。

Kafka對于Java堆內(nèi)存的使用反而不是很多,因為Kafka中的消息通常都屬于“朝生夕滅”的對象實例,可以很快地垃圾回收(GC)。一般情況下,broker所需的堆內(nèi)存都不會超過6GB。所以對于一臺16GB內(nèi)存的機(jī)器而言,文件系統(tǒng)page cache的大小甚至可以達(dá)到10~14GB!

除以上這些考量之外,用戶還需要把page cache大小與實際線上環(huán)境中設(shè)置的日志段大小相比較(關(guān)于日志段的描述會在第6章中詳細(xì)展開)。假設(shè)單個日志段文件大小設(shè)置為10GB,那么你至少應(yīng)該給予page cache 10GB以上的內(nèi)存空間。這樣,待消費的消息有很大概率會保存在頁緩存中,故consumer能夠直接命中頁緩存而無須執(zhí)行緩慢的磁盤I/O讀操作。
總之對于內(nèi)存規(guī)劃的建議如下。

  • 盡量分配更多的內(nèi)存給操作系統(tǒng)的page cache。
  • 不要為broker設(shè)置過大的堆內(nèi)存,最好不超過6GB。
  • page cache大小至少要大于一個日志段的大小。

CPU規(guī)劃

比起磁盤和內(nèi)存,CPU于Kafka而言并沒有那么重要——嚴(yán)格來說,Kafka不屬于計算密集型(CPU-bound)的系統(tǒng),因此對于CPU需要記住一點就可以了:追求多核而非高時鐘頻率。簡單來說,Kafka的機(jī)器有16個CPU核這件事情比該機(jī)器CPU時鐘高達(dá)4GHz更加重要,因為Kafka可能無法充分利用這4GHz的頻率,但幾乎肯定會用滿16個CPU核。Kafka broker通常會創(chuàng)建幾十個后臺線程,再加上多個垃圾回收線程,多核系統(tǒng)顯然是最佳的配置選擇。

當(dāng)然,凡事皆有例外。若clients端啟用了消息壓縮,那么除了要為clients機(jī)器分配足夠的CPU資源外,broker端也有可能需要大量的CPU資源——盡管Kafka 0.10.0.0改進(jìn)了在broker端的消息處理,免除了解壓縮消息的負(fù)擔(dān)以節(jié)省磁盤占用和網(wǎng)絡(luò)帶寬,但并非所有情況下都可以避免這種解壓縮(比如clients端和broker端配置的消息版本號不匹配)。若出現(xiàn)這種情況,用戶就需要為broker端的機(jī)器也配置充裕的CPU資源。

基于以上的判斷依據(jù),我們對CPU資源規(guī)劃的建議如下。

  • 使用多核系統(tǒng),CPU核數(shù)最好大于8。
  • 如果使用Kafka 0.10.0.0之前的版本或clients端- 與broker端消息版本不一致(若無顯式配置,這種情況多半由clients和broker版本不一致造成),則考慮多配置一些資源以防止消息解壓縮操作消耗過多CPU。

帶寬規(guī)劃

對于Kafka這種在網(wǎng)絡(luò)間傳輸大量數(shù)據(jù)的分布式數(shù)據(jù)管道而言,帶寬資源至關(guān)重要,并且特別容易成為系統(tǒng)的瓶頸,因此一個快速且穩(wěn)定的網(wǎng)絡(luò)是Kafka集群搭建的前提條件。低延時的網(wǎng)絡(luò)以及高帶寬有助于實現(xiàn)Kafka集群的高吞吐量以及用戶請求處理低延時。
當(dāng)前主流的網(wǎng)絡(luò)環(huán)境皆是使用以太網(wǎng),帶寬主要也有兩種:1Gb/s和10Gb/s,即平時所說的千兆位網(wǎng)絡(luò)和萬兆位網(wǎng)絡(luò)。無論是哪種帶寬,對于大多數(shù)的Kafka集群來說都足矣了。

舉一個實際的例子來說明如何規(guī)劃帶寬資源。假設(shè)用戶網(wǎng)絡(luò)環(huán)境中的帶寬是1Gb/s,用戶的業(yè)務(wù)目標(biāo)是每天用1小時處理1TB的業(yè)務(wù)消息,那么在這種情況下Kafka到底需要多少臺機(jī)器呢?讓我們來計算一下:網(wǎng)絡(luò)帶寬是1Gb/s,即每秒傳輸1Gb的數(shù)據(jù),假設(shè)分配的機(jī)器為Kafka專屬使用(通常不建議與其他應(yīng)用或是框架部署在同一臺機(jī)器上)且為Kafka分配70%的帶寬資源——考慮到機(jī)器上還有其他的進(jìn)程使用網(wǎng)絡(luò)且網(wǎng)卡通常不能用滿,超過一定閾值可能出現(xiàn)網(wǎng)絡(luò)丟包的情況,因此70%的設(shè)定實際上是很合理的——那么Kafka單臺broker的帶寬就是1Gb/s×0.7≈710Mb/s,但事實上這是Kafka所使用的最高帶寬,用戶不能奢望Kafka集群平時就一直使用如此多的帶寬,畢竟萬一碰到突發(fā)流量,會極容易把網(wǎng)卡“打滿”,因此在70%的基礎(chǔ)上,一般再截取1/3,即710Mb/s/3≈240Mb/s。這里的1/3是一個相對保守的數(shù)字,用戶可以根據(jù)自身的業(yè)務(wù)特點酌情增加。好了,根據(jù)現(xiàn)有的網(wǎng)絡(luò)情況,我們明確了單臺broker的帶寬是240Mb/s。如果要在1小時內(nèi)處理1TB的業(yè)務(wù)消息,即每秒需要處理292MB左右的數(shù)據(jù),也就是每秒2336Mb數(shù)據(jù),那么至少需要2336/240≈10臺broker機(jī)器。若副本數(shù)是2,那么這個數(shù)字還需要再翻1倍,即20臺broker機(jī)器。根據(jù)萬兆位網(wǎng)卡來評估broker機(jī)器的方法是類似的。
關(guān)于帶寬資源方面的規(guī)劃,用戶還需要注意的是盡量避免使用跨機(jī)房的網(wǎng)絡(luò)環(huán)境,特別是那些跨城市甚至是跨大洲的網(wǎng)絡(luò)。因為這些網(wǎng)絡(luò)條件下請求的延時將會非常高,不管是broker端還是clients端都需要額外做特定的配置才能適應(yīng)。

綜合上述內(nèi)容,我們對帶寬資源規(guī)劃的建議如下。

  • 盡量使用高速網(wǎng)絡(luò)。
  • 根據(jù)自身網(wǎng)絡(luò)條件和帶寬來評估Kafka集群機(jī)器數(shù)量。
  • 避免使用跨機(jī)房網(wǎng)絡(luò)。

典型線上配置

下面給出一份典型的線上環(huán)境配置,用戶可以參考這份配置以及結(jié)合自己的實際情況進(jìn)行二次調(diào)整。

  • CPU 24核。
  • 內(nèi)存32GB。
  • 磁盤1TB 7200轉(zhuǎn)SAS盤兩塊。
  • 帶寬1Gb/s。
  • ulimit-n 1000000.
  • Socket Buffer至少64KB——適用于跨機(jī)房網(wǎng)絡(luò)傳輸。

偽分布式環(huán)境安裝

單節(jié)點偽分布式環(huán)境是指集群由一臺ZooKeeper服務(wù)器和一臺Kafka broker服務(wù)器組成,如圖3.1所示。


為了搭建圖3.1中的單節(jié)點Kafka集群,我們必須依次執(zhí)行以下操作。

  • 安裝Java。
  • 安裝ZooKeeper。
  • 安裝Apache Kafka。

安裝java

不論是ZooKeeper還是Kafka都需要提前安裝好Java并且正確配置好Java環(huán)境。鑒于Java 7自2015年8月便不再更新,筆者強(qiáng)烈建議Java虛擬機(jī)版本使用Java 8,并且使用比較成熟的Oracle公司的HotSpot虛擬機(jī)。為節(jié)省篇幅,本章將以Linux CentOS 6 64位作為安裝演示的操作系統(tǒng),其他Linux發(fā)行版的安裝方法是類似的。Windows和Mac OS平臺上的安裝步驟參考官網(wǎng)。
Java安裝可以使用java -version命令來進(jìn)行驗證。默認(rèn)情況下,該命令輸出顯示安裝過的Java版本。若沒有安裝Java,該命令會提示“無法找到Java虛擬機(jī)。另外筆者推薦使用Oracle版本的虛擬機(jī),而非OpenJDK版本的虛擬機(jī)。

安裝ZooKeeper

ZooKeeper是安裝Kafka集群必要的組件,并且Kafka大量地使用ZooKeeper來保存集群的元數(shù)據(jù)信息以及consumer位移信息(老版本)。雖然在偽分布式集群中直接使用Kafka自帶的ZooKeeper可能更方便,但其實單獨安裝一個外部的ZooKeeper服務(wù)器同樣很簡單。

安裝單節(jié)點Kafka集群

第一步我們下載Apache Kafka,官網(wǎng)地址是http://kafka.apache.org/downloads.html。截至筆者寫稿時,當(dāng)前最新的版本是1.0.0,因此我們需要下載的文件是kafka_2.11-1.0.0.tgz。筆者在這里選用由Scala 2.11編譯的版本,因為社區(qū)剛剛加入對Scala 2.12的支持且Scala 2.12不支持Java 7,某些用戶可能無法使用該版本編譯的Kafka。

文件下載完成之后執(zhí)行解壓縮操作并創(chuàng)建保存Kafka數(shù)據(jù)的文件目錄:

tar -zxvf kafka_2.11-1.0.0.tgz
mv kafka_2.11-1.0.0 kafka
mkdir -p /home/work/kafka/data-logs
cd kafka

之后打開config目錄下的server.properties文件,修改下列配置:

log.dirs=/home/work/kafka/data-logs

然后保存修改并退出。最后通過下列命令啟動Kafka broker:

bin/kafka-server-start.sh config/server.properties

如果你想要在后臺運行Kafka broker,只需要在啟動命令中加入-daemon:

bin/kafka-server-start.sh -daemon config/server.properties

bin/kafka-server-start.sh -daemon config/server.properties

看到以下輸出證明啟動成功:

INFO [Kafka Server 0], started(kafka.server.KafkaServer)

多節(jié)點環(huán)境安裝

單節(jié)點的Kafka偽集群應(yīng)付日常的應(yīng)用開發(fā)或是功能驗證綽綽有余,但若在生產(chǎn)環(huán)境中直接使用則無法充分利用Kafka提供的分布式特性,比如負(fù)載均衡和故障轉(zhuǎn)移等。此外,Kafka還具有優(yōu)秀的準(zhǔn)線性擴(kuò)容的能力,因此用戶可以很容易地擴(kuò)展Kafka節(jié)點數(shù)量以應(yīng)對不斷增長的消息處理需求。同時由于Kafka提供了完備的備份機(jī)制,多節(jié)點集群天然地為用戶提供高可用保障,極大地降低了人工維護(hù)的成本。

從本質(zhì)上來說,多節(jié)點Kafka集群由一套多節(jié)點ZooKeeper集群和一套多節(jié)點Kafka集群組成,如圖3.5所示。

  • 安裝多節(jié)點ZooKeeper集群。
  • 安裝多節(jié)點Kafka集群。

安裝多節(jié)點ZooKeeper集群

目前來說Kafka可以說是強(qiáng)依賴ZooKeeper的,因此生產(chǎn)環(huán)境中一個高可用、高可靠的ZooKeeper集群也是必不可少的。ZooKeeper集群通常被稱為一個ensemble。只要這個ensemble中的大多數(shù)節(jié)點存活,那么ZooKeeper集群就能正常提供服務(wù)。顯然,既然是大多數(shù),那么最好使用奇數(shù)個服務(wù)器,即2n+1個服務(wù)器,這樣整個ZooKeeper集群最多可以容忍n臺服務(wù)器宕機(jī)而保證依然提供服務(wù)。如果使用偶數(shù)個服務(wù)器則通常會浪費一臺服務(wù)器的資源。

下面舉一個例子來說明:假設(shè)我們使用5臺ZooKeeper服務(wù)器構(gòu)建集群,倘若2臺服務(wù)器宕機(jī),剩下的3臺服務(wù)器占了半數(shù)以上,故而ZooKeeper服務(wù)正常工作;但假如我們使用了4臺服務(wù)器,若2臺服務(wù)器宕機(jī),剩下的2臺服務(wù)器不滿足“半數(shù)以上服務(wù)器存活”的條件,因此此時ZooKeeper集群將停止服務(wù)——由此可見,雖然使用了4臺服務(wù)器,但我們依然只能容忍1臺服務(wù)器崩潰,這就是為什么ZooKeeper集群節(jié)點數(shù)量通常是奇數(shù)的原因。

基于上面的規(guī)則,一個生產(chǎn)環(huán)境中最少的ZooKeeper集群節(jié)點數(shù)量是3,這樣1個節(jié)點“掛掉”了不會影響整個集群的運作。在實際使用場景中,5臺服務(wù)器構(gòu)成的ZooKeeper集群也是十分常見的,而再多數(shù)量的集群則不常見。當(dāng)然具體數(shù)量的確定要依據(jù)用戶對高可靠性的需求。通常我們都需要在高可靠性與成本間取得適當(dāng)?shù)钠胶狻?/p>

在安裝ZooKeeper集群之前,我們假定Java已經(jīng)被正確地安裝和配置到了生產(chǎn)環(huán)境中,這里不再贅述。本例將安裝一個有3個節(jié)點的ZooKeeper集群,并假設(shè)3個節(jié)點機(jī)器的主機(jī)名分別是zk1、zk2和zk3。多節(jié)點模式中所用到的配置文件與我們在單節(jié)點ZooKeeper安裝相關(guān)章節(jié)中的配置文件大部分相同,只是有些微小的差別。下面的例子給出了一份典型的多節(jié)點環(huán)境配置文件zoo.cfg:

-dataDir=/usr/zookeeper/data_dir
clientPort=2181
initLimit=5
syncLimit=2
server.1=zk1:2888:3888
server.2=zk2:2888:3888
server.3=zk3:2888:3888
  • tickTime:ZooKeeper最小的時間單位,用于丈量心跳時間和超時時間等。通常設(shè)置成默認(rèn)值2秒即可。

  • dataDir:非常重要的參數(shù)!ZooKeeper會在內(nèi)存中保存系統(tǒng)快照,并定期寫入該路徑指定的文件夾中。生產(chǎn)環(huán)境中需要注意該文件夾的磁盤占用情況。

  • clientPort:ZooKeeper監(jiān)聽客戶端連接的端口,一般設(shè)置成默認(rèn)值2181即可。

  • initLimit:指定follower節(jié)點初始時連接leader節(jié)點的最大tick次數(shù)。假設(shè)是5,表示follower必須要在5×tickTime時間內(nèi)(默認(rèn)是10秒)連接上leader,否則將被視為超

  • syncLimit:設(shè)定了follower節(jié)點與leader節(jié)點進(jìn)行同步的最大時間。與initLimit類似,它也是以tickTime為單位進(jìn)行指定的。

  • server.X=host:port:port:配置文件中的最后3行都是這種形式的。這里的X必須是一個全局唯一的數(shù)字,且需要與myid文件中的數(shù)字相對應(yīng)(關(guān)于myid文件的設(shè)置稍后會做詳細(xì)討論)。一般設(shè)置X值為1~255之間的整數(shù)。這行的后面還配置了兩個端口,通常是2888和3888。第一個端口用于使follower節(jié)點連接leader節(jié)點,而第二個端口則用于leader選舉。

設(shè)置好配置文件,下面就該創(chuàng)建上面提到的myid文件。眾所周知,每個ZooKeeper服務(wù)器都有一個唯一的ID。這個ID主要用在兩個地方:一個就是剛剛我們配置的zoo.cfg文件,另一個則是myid文件。myid文件位于zoo.cfg中dataDir配置的目錄下,其內(nèi)容也很簡單,僅是一個數(shù)字,即ID。

首先,在ZooKeeper的conf目錄下創(chuàng)建3個配置文件zoo1.cfg、zoo2.cfg和zoo3.cfg。

創(chuàng)建好ZooKeeper配置文件,下一步就是創(chuàng)建myid文件了。myid文件必須位于配置文件中的dataDir中,即/mnt/disk/huxitest/data_logs/zookeeper1,2,3下,具體命令如下:

echo "1" > /mnt/disk/huxitest/data_logs/zookeeper1/myid
echo "2" > /mnt/disk/huxitest/data_logs/zookeeper2/myid
echo "3" > /mnt/disk/huxitest/data_logs/zookeeper3/myid

下一步就是啟動3個控制臺終端分別在ZooKeeper的安裝目錄下執(zhí)行以下命令啟動ZooKeeper服務(wù)器:

echo "1" > /mnt/disk/huxitest/data_logs/zookeeper1/myid
echo "2" > /mnt/disk/huxitest/data_logs/zookeeper2/myid
echo "3" > /mnt/disk/huxitest/data_logs/zookeeper3/myid
如果是多節(jié)點安裝方案,既可以使用上面的命令啟動ZooKeeper,也可以直接運行zkServer腳本啟動ZooKeeper,比如:

bin/zkServer.sh(bat) start conf/zoo.cfg

當(dāng)所有ZooKeeper服務(wù)器啟動成功后,我們還需要檢查一下整個集群的狀態(tài),分別執(zhí)行以下命令:

cd /mnt/disk/huxitest/zookeeper
bin/zkServer.sh status conf/zoo1.cfg
bin/zkServer.sh status conf/zoo2.cfg
bin/zkServer.sh status conf/zoo3.cfg
~~

另外還可以執(zhí)行JDK的jps命令來查看ZooKeeper進(jìn)程以查看集群狀態(tài)。

ZooKeeper的主進(jìn)程名是QuorumPeerMain,圖3.10的jps命令輸出也可以證明已經(jīng)成功地啟動了3個ZooKeeper進(jìn)程。值得一提的是,如果是在多臺機(jī)器上搭建ZooKeeper集群,那么每臺機(jī)器上都至少應(yīng)該有一個這樣的進(jìn)程被啟動。

安裝多節(jié)點Kafka

搭建Kafka集群的第一步是要創(chuàng)建多份配置文件


在上面3個配置文件中我們需要每臺Kafka服務(wù)器指定不同的broker ID。該ID在整個集群中必須是唯一的。而配置listeners時最好使用節(jié)點的FQDN(Fully Qualified Domain Name),即全稱域名,盡量不要使用IP地址。另外配置文件中還有一個特別重要的參數(shù),即zookeeper.connect。鑒于我們之前已經(jīng)搭建了一個3節(jié)點ZooKeeper集群,此時配置zookeeper.connect必須同時指定所有的ZooKeeper節(jié)點。注意本例中使用的端口分別是8001、8002和8003。

創(chuàng)建Kafka配置文件之后,剩下的工作就很簡單了,只需要執(zhí)行下列命令啟動Kafka broker服務(wù)器:

bin/kafka-server-start.sh -daemon config/server1.properties

查看位于Kafka logs目錄下的server.log,確認(rèn)Kafka broker已經(jīng)啟動成功,如圖3.14所示。

另外,也可以使用jps命令來確認(rèn)broker進(jìn)程啟動成功,如圖3.15所示。

至此,一個3節(jié)點的Kafka分布式集群就搭建成功了。如果需要為集群增加更多的Kafka broker節(jié)點,只需要配置一份類似的配置文件,然后利用該文件直接運行啟動命令即可。應(yīng)該說,Kafka對于集群擴(kuò)展操作是很友好的,不需要用戶承擔(dān)太多的維護(hù)和管理成本。

驗證部署

成功搭建起多節(jié)點的Kafka集群還不夠,我們還需要驗證線上環(huán)境是沒有錯誤且可以使用的。下面將從以下幾個方面分別來驗證Kafka集群部署-的正確性。

  • 測試topic創(chuàng)建與刪除。
  • 測試消息的生產(chǎn)與發(fā)送。
  • 生產(chǎn)者吞吐量測試。
  • 消費者吞吐量測試。

測試topic創(chuàng)建與刪除

topic能夠正確地創(chuàng)建與刪除才能說明Kafka集群在正常工作,因為通常它都表明了Kafka的控制器(controller)已經(jīng)被成功地選舉出來并開始履行自身的職責(zé)。很多用戶喜歡在集群搭建后就立即開始運行producer和consumer,甚至完全依靠Kafka的自動topic創(chuàng)建功能,而不去手動創(chuàng)建并做驗證。筆者其實并不推薦這種做法,以筆者多年使用Kafka的經(jīng)驗,建議讀者在開始使用Kafka集群前最好提前把所需的topic創(chuàng)建出來,并執(zhí)行對應(yīng)的命令做驗證。這樣既可以測試整個集群的運行狀況,也保證了producer和consumer運行時不會因為topic分區(qū)leader的各種問題導(dǎo)致短暫停頓現(xiàn)象。

接下來,我們首先創(chuàng)建一個測試topic,名為test-topic。為了充分利用搭建的3臺服務(wù)器,我們創(chuàng)建3個分區(qū),每個分區(qū)都分配3個副本,執(zhí)行如下命令:

bin/kafka-topics.sh  --zookeeper zk1:8001,zk2:8002,zk3:8003 --create  --topic test-topic --partitions 3 --replication-factor 3
Created topic "test-topic".

上面的輸出表明topic已經(jīng)被成功創(chuàng)建,但我們還需要運行一些命令來做詳細(xì)驗證,如下:

bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 –list

test-topic
bin/kafka-topics.sh --zookeeper zk1:8001,zk2:8002,zk3:8003 --describe --topic test-topic

topic詳細(xì)分區(qū)信息如圖3.16所示。


顯而易見,該topic下有3個分區(qū),每個分區(qū)有3個副本。每個分區(qū)的leader分別是0、1、2,表明Kafka將該topic的這3個分區(qū)均勻地在3臺broker上進(jìn)行了分配。

下面來測試刪除topic,執(zhí)行如下命令:

>bin/kafka-topics.sh  --zookeeper zk1:8001,zk2:8002,zk3:8003 --delete  --topic test-topic
Topic test-topic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

上面的輸出僅僅表示該topic被成功地標(biāo)記為“待刪除”,至于topic是否會被真正刪除取決于broker端參數(shù)delete.topic.enable。該參數(shù)在當(dāng)前Kafka 1.0.0版本中被默認(rèn)設(shè)置為true,即表明Kafka默認(rèn)允許刪除topic。事實上,該參數(shù)在舊版本中默認(rèn)值一直是false,故若用戶顯式設(shè)置該參數(shù)為false,或使用了1.0.0之前版本的默認(rèn)值,那么即使運行了上面的命令,Kafka也不會刪除該topic。

由于在之前搭建Kafka集群時我們配置了該參數(shù)為true,因此Kafka會將所有與該topic相關(guān)的數(shù)據(jù)全部刪除,不過這是一個異步過程,具體的刪除操作對命令執(zhí)行者來說是完全透明的。我們可以查詢底層的文件系統(tǒng)來驗證topic分區(qū)的日志目錄是否被刪除

另外,也可以再次執(zhí)行kafka-topics腳本來列出當(dāng)前的topic列表,如果test-topic不在該列表中,則表明該topic被刪除成功,

有些時候,用戶可能發(fā)現(xiàn)topic沒有被成功刪除,這可能是topic分區(qū)數(shù)過多或數(shù)據(jù)過多的原因。Kafka當(dāng)前只能一個分區(qū)一個分區(qū)地刪除數(shù)據(jù),無法做到同時刪除,因此用戶可以查詢底層的文件系統(tǒng)來判斷刪除操作是否在正常執(zhí)行。如果發(fā)現(xiàn)刪除操作停滯了,這通常表明該Kafka集群有問題,此時便需要進(jìn)一步地詳查才能確定無法刪除的真正原因,比如是否有分區(qū)正處于被分配過程中等。

測試消息發(fā)送與消費

成功測試了topic之后接下來我們測試一下Kafka集群是否可以正常地發(fā)送消息和讀取消息。為了實現(xiàn)這一目的,我們需要用到Kafka默認(rèn)提供的kafka-console-producer和kafka-console-consumer腳本。這對腳本可以很方便地用來測試消息的發(fā)送和讀取。發(fā)送消息時,用戶從鍵盤輸入消息,按回車鍵后即表示發(fā)送該條消息。下面我們打開兩個終端,一個用于發(fā)送消息,另一個用于接收消息,如圖3.17和圖3.18所示。

生產(chǎn)者吞吐量測試

除了基本的console-producer和console-consumer腳本可以用于測試簡單的消息發(fā)送與接收,Kafka還提供了性能吞吐量測試腳本,它們分別是kafka-producer-perf-test腳本和kafka-consumer-perf-test腳本。

kafka-producer-perf-test腳本是Kafka提供的用于測試producer性能的腳本,該腳本可以很方便地計算出producer在一段時間內(nèi)的吞吐量和平均延時,具體使用說明如圖3.20所示。

圖3.20中該腳本的輸出結(jié)果表明在這臺測試機(jī)上運行一個Kafka producer的平均吞吐量是8MB/s,即占用64Mb/s左右的帶寬,平均每秒能發(fā)送41963條消息,平均延時是2.36秒,最大延時是3.51秒,平均有50%的消息發(fā)送需要花費2.79秒,95%的消息發(fā)送需要花費3.14秒,99%的消息發(fā)送需要花費3.36秒,而99.9%的消息發(fā)送需要花費3.5秒。本例測試出來的producer占用的帶寬是64Mb/s(8 × 8.0.0 MB/s),和千兆位網(wǎng)卡相比,這點帶寬遠(yuǎn)遠(yuǎn)沒有達(dá)到網(wǎng)卡的上限,因此這也說明producer還有很大的優(yōu)化空間。

在kafka-producer-perf-test腳本的實際使用過程中,最好讓該腳本長時間穩(wěn)定地運行一段時間,這樣測試出來的結(jié)果才能準(zhǔn)確。畢竟腳本運行之初會執(zhí)行很多初始化工作,運行之處的吞吐量不能真實反映系統(tǒng)的實際情況。

消費者吞吐量測試

和kafka-producer-perf-test腳本類似,Kafka為consumer也提供了方便、便捷的性能測試腳本,即kafka-consumer-perf-test腳本。我們首先用它在剛剛搭建的Kafka集群環(huán)境中測試一下新版本consumer的吞吐量,如圖3.21所示。

圖3.21的例子中我們測試消費50萬條消息的consumer的吞吐量。結(jié)果表明,在該環(huán)境中consumer在1秒多的時間內(nèi)總共消費了95MB的消息,因此吞吐量大約是92MB/s,即736Mb/s。這樣我們就對本機(jī)consumer的性能有了一個大致的了解,為后續(xù)評估整體consumer性能目標(biāo)提供了一個很有力的基礎(chǔ)。

圖3.21測試的是新版本consumer的吞吐量,下面依然使用該腳本測試一下舊版本consumer,如圖3.22所示。

圖3.22中運行consumer指定的是ZooKeeper的連接信息,說明我們使用的是舊版本consumer。結(jié)果表明本次測試共消費190MB左右的消息,吞吐量大約是147MB/

參數(shù)設(shè)置

broker端參數(shù)

Kafka broker端提供了很多參數(shù)用于調(diào)優(yōu)系統(tǒng)的各個方面,有一些參數(shù)是所有Kafka環(huán)境都需要考慮和配置的,不論是單機(jī)環(huán)境還是分布式環(huán)境。這些參數(shù)都是Kafka broker的基礎(chǔ)配置,一定要明確它們的含義。

broker端參數(shù)需要在Kafka目錄下的config/server.properties文件中進(jìn)行設(shè)置。當(dāng)前對于絕大多數(shù)的broker端參數(shù)而言,Kafka尚不支持動態(tài)修改——這就是說,如果要新增、修改,抑或是刪除某些broker參數(shù)的話,需要重啟對應(yīng)的broker服務(wù)器。

  • broker.id——Kafka使用唯一的一個整數(shù)來標(biāo)識每個broker,這就是broker.id。該參數(shù)默認(rèn)是-1。如果不指定,Kafka會自動生成一個唯一值??傊还苡脩糁付ㄊ裁炊急仨毐WC該值在Kafka集群中是唯一的,不能與其他broker沖突。在實際使用中,推薦使用從0開始的數(shù)字序列,如0、1、2……
  • log.dirs——非常重要的參數(shù)!該參數(shù)指定了Kafka持久化消息的目錄。若待保存的消息數(shù)量非常多,那么最好確保該文件夾下有充足的磁盤空間。該參數(shù)可以設(shè)置多個目錄,以逗號分隔,比如/home/kafka1,/home/kafka2。在實際使用過程中,指定多個目錄的做法通常是被推薦的,因為這樣Kafka可以把負(fù)載均勻地分配到多個目錄下。若用戶機(jī)器上有N塊物理硬盤(并且假設(shè)這臺機(jī)器完全給Kafka使用),那么設(shè)置N個目錄(須掛載在不同磁盤上的目錄)是一個很好的選擇。N個磁頭可以同時執(zhí)行寫操作,極大地提升了吞吐量。注意,這里的“均勻”是根據(jù)目錄下的分區(qū)數(shù)進(jìn)行比較的,而不是根據(jù)實際的磁盤空間。值得一提的是,若不設(shè)置該參數(shù),Kafka默認(rèn)使用/tmp/kafka-logs作為消息保存的目錄。把消息保存在/tmp目錄下,在實際的生產(chǎn)環(huán)境中是極其不可取的。
  • zookeeper.connect——同樣是非常重要的參數(shù)。如果說前兩個參數(shù)還有默認(rèn)值可以使用的話(雖然極其不推薦將默認(rèn)值應(yīng)用到線上環(huán)境),那么此參數(shù)則完全沒有默認(rèn)值,是必須要設(shè)置的。該參數(shù)也可以是一個CSV(comma-separated values)列表,比如在前面的例子中設(shè)置的那樣:zk1:2181,zk2:2181,zk3:2181。如果要使用一套ZooKeeper環(huán)境管理多套Kafka集群,那么設(shè)置該參數(shù)的時候就必須指定ZooKeeper的chroot,比如zk1:2181,zk2:2181,zk3:2181/kafka_cluster1。結(jié)尾的/kafka_cluster1就是chroot,它是可選的配置,如果不指定則默認(rèn)使用ZooKeeper的根路徑。在實際使用過程中,配置chroot可以起到很好的隔離效果。這樣管理Kafka集群將變得更加容易。

  • listeners——broker監(jiān)聽器的CSV列表,格式是[協(xié)議]://[主機(jī)名]:[端口],[[協(xié)議]]://[主機(jī)名]:[端口]]。該參數(shù)主要用于客戶端連接broker使用,可以認(rèn)為是broker端開放給clients的監(jiān)聽端口。如果不指定主機(jī)名,則表示綁定默認(rèn)網(wǎng)卡;如果主機(jī)名是0.0.0.0,則表示綁定所有網(wǎng)卡。Kafka當(dāng)前支持的協(xié)議類型包括PLAINTEXT、SSL及SASL_SSL等。對于新版本的Kafka,筆者推薦只設(shè)置listeners一個參數(shù)就夠了,對于已經(jīng)過時的兩個參數(shù)host.name和port,就不用再配置了。對于未啟用安全的Kafka集群,使用PLAINTEXT協(xié)議足矣。如果啟用了安全認(rèn)證,可以考慮使用SSL或SASL_SSL協(xié)議。

  • advertised.listeners——和listeners類似,該參數(shù)也是用于發(fā)布給clients的監(jiān)聽器,不過該參數(shù)主要用于IaaS環(huán)境,比如云上的機(jī)器通常都配有多塊網(wǎng)卡(私網(wǎng)網(wǎng)卡和公網(wǎng)網(wǎng)卡)。對于這種機(jī)器,用戶可以設(shè)置該參數(shù)綁定公網(wǎng)IP供外部clients使用,然后配置上面的listeners來綁定私網(wǎng)IP供broker間通信使用。當(dāng)然不設(shè)置該參數(shù)也是可以的,只是云上的機(jī)器很容易出現(xiàn)clients無法獲取數(shù)據(jù)的問題,原因就是listeners綁定的是默認(rèn)網(wǎng)卡,而默認(rèn)網(wǎng)卡通常都是綁定私網(wǎng)IP的。在實際使用場景中,對于配有多塊網(wǎng)卡的機(jī)器而言,這個參數(shù)通常都是需要配置的。

  • unclean.leader.election.enable——是否開啟unclean leader選舉。何為unclean leader選舉?在第1章中我們提到了ISR的概念。ISR中的所有副本都有資格隨時成為新的leader,但若ISR變空而此時leader又宕機(jī)了,Kafka應(yīng)該如何選舉新的leader呢?為了不影響Kafka服務(wù),該參數(shù)默認(rèn)值是false,即表明如果發(fā)生這種情況,Kafka不允許從剩下存活的非ISR副本中選擇一個當(dāng)leader。因為如果允許,這樣做固然可以讓Kafka繼續(xù)提供服務(wù)給clients,但會造成消息數(shù)據(jù)的丟失,而在一般的用戶使用場景中,數(shù)據(jù)不丟失是基本的業(yè)務(wù)需求,因此設(shè)置此參數(shù)為false顯得很有必要。事實上,Kafka社區(qū)在1.0.0版本才正式將該參數(shù)默認(rèn)值調(diào)整為false,這表明社區(qū)在高可用性與數(shù)據(jù)完整性之間選擇了后者。

  • delete.topic.enable——是否允許Kafka刪除topic。默認(rèn)情況下,Kafka集群允許用戶刪除topic及其數(shù)據(jù)。這樣當(dāng)用戶發(fā)起刪除topic操作時,broker端會執(zhí)行topic刪除邏輯。在實際生產(chǎn)環(huán)境中我們發(fā)現(xiàn)允許Kafka刪除topic其實是一個很方便的功能,再加上自Kafka 0.9.0.0新增的ACL權(quán)限特性,以往對于誤操作和惡意操作的擔(dān)心完全消失了,因此設(shè)置該參數(shù)為true是推薦的做法。

  • log.retention.{hours|minutes|ms}——這組參數(shù)控制了消息數(shù)據(jù)的留存時間,它們是“三兄弟”。如果同時設(shè)置,優(yōu)先選取ms的設(shè)置,minutes次之,hours最后。有了這3個參數(shù),用戶可以很方便地在3個時間維度上設(shè)置日志的留存時間。默認(rèn)的留存時間是7天,即Kafka只會保存最近7天的數(shù)據(jù),并自動刪除7天前的數(shù)據(jù)。當(dāng)前較新版本的Kafka會根據(jù)消息的時間戳信息進(jìn)行留存與否的判斷。對于沒有時間戳的老版本消息格式,Kafka會根據(jù)日志文件的最近修改時間(last modified time)進(jìn)行判斷??梢赃@樣說,這組參數(shù)定義的是時間維度上的留存策略。實際線上環(huán)境中,需要根據(jù)用戶的業(yè)務(wù)需求進(jìn)行設(shè)置。保存消息很長時間的業(yè)務(wù)通常都需要設(shè)置一個較大的值。

  • log.retention.bytes——如果說上面那組參數(shù)定義了時間維度上的留存策略,那么這個參數(shù)便定義了空間維度上的留存策略,即它控制著Kafka集群需要為每個消息日志保存多大的數(shù)據(jù)。對于大小超過該參數(shù)的分區(qū)日志而言,Kafka會自動清理該分區(qū)的過期日志段文件。該參數(shù)默認(rèn)值是-1,表示Kafka永遠(yuǎn)不會根據(jù)消息日志文件總大小來刪除日志。和上面的參數(shù)一樣,生產(chǎn)環(huán)境中需要根據(jù)實際業(yè)務(wù)場景設(shè)置該參數(shù)的值。

  • min.insync.replicas——該參數(shù)其實是與producer端的acks參數(shù)配合使用的。關(guān)于acks含義的介紹,我們留到第4章中詳細(xì)展開。這里只需要了解acks=-1表示producer端尋求最高等級的持久化保證,而min.insync.replicas也只有在acks=-1時才有意義。它指定了broker端必須成功響應(yīng)clients消息發(fā)送的最少副本數(shù)。假如broker端無法滿足該條件,則clients的消息發(fā)送并不會被視為成功。它與acks配合使用可以令Kafka集群達(dá)成最高等級的消息持久化。在實際使用中如果用戶非常在意被發(fā)送的消息是否真的成功寫入了所有副本,那么推薦將參數(shù)設(shè)置為副本數(shù)-1。舉一個例子,假設(shè)某個topic的每個分區(qū)的副本數(shù)是3,那么推薦設(shè)置該參數(shù)為2,這樣我們就能夠容忍一臺broker宕機(jī)而不影響服務(wù);若設(shè)置參數(shù)為3,那么只要任何一臺broker宕機(jī),整個Kafka集群將無法繼續(xù)提供服務(wù)。因此用戶需要在高可用和數(shù)據(jù)一致性之間取得平衡。

  • num.network.threads——一個非常重要的參數(shù)。它控制了一個broker在后臺用于處理網(wǎng)絡(luò)請求的線程數(shù),默認(rèn)是3。通常情況下,broker啟動時會創(chuàng)建多個線程處理來自其他broker和clients發(fā)送過來的各種請求。注意,這里的“處理”其實只是負(fù)責(zé)轉(zhuǎn)發(fā)請求,它會將接收到的請求轉(zhuǎn)發(fā)到后面的處理線程中。在真實的環(huán)境中,用戶需要不斷地監(jiān)控NetworkProcessorAvgIdlePercent JMX指標(biāo)。如果該指標(biāo)持續(xù)低于0.3,筆者建議適當(dāng)增加該參數(shù)的值。在第8章中我們將會詳細(xì)探討各種JMX監(jiān)控。

  • num.io.threads——這個參數(shù)就是控制broker端實際處理網(wǎng)絡(luò)請求的線程數(shù),默認(rèn)值是8,即Kafka broker默認(rèn)創(chuàng)建8個線程以輪詢方式不停地監(jiān)聽轉(zhuǎn)發(fā)過來的網(wǎng)絡(luò)請求并進(jìn)行實時處理。Kafka同樣也為請求處理提供了一個JMX監(jiān)控指標(biāo)Request HandlerAvgIdlePercent。如果發(fā)現(xiàn)該指標(biāo)持續(xù)低于0.3,則可以考慮適當(dāng)增加該參數(shù)的值。

  • message.max.bytes——Kafka broker能夠接收的最大消息大小,默認(rèn)是977KB,還不到1MB,可見是非常小的。在實際使用場景中,突破1MB大小的消息十分常見,因此用戶有必要綜合考慮Kafka集群可能處理的最大消息尺寸并設(shè)置該參數(shù)值。

上面這些參數(shù)是筆者認(rèn)為最重要的broker端參數(shù),但不可否認(rèn)的是,Kafka broker端的參數(shù)遠(yuǎn)遠(yuǎn)不止這些。筆者粗略數(shù)了一下,以Kafka 1.0.0版本為例,broker端的參數(shù)有167個之多。顯然我們不可能涵蓋所有的參數(shù),其他的參數(shù)含義以及使用方法詳見Kafka官網(wǎng)https://kafka.apache.org/documentation/#brokerconfigs。

topic級別參數(shù)

除broker端參數(shù)之外,Kafka還提供了一些topic級別的參數(shù)供用戶使用。所謂的topic級別,是指覆蓋broker端全局參數(shù)。每個不同的topic都可以設(shè)置自己的參數(shù)值。舉一個例子來說,上面提到的日志留存時間。顯然,在實際使用中,在全局設(shè)置一個通用的留存時間并不方便,因為每個業(yè)務(wù)的topic可能有不同的留存策略。如果只能設(shè)置全局參數(shù),那么勢必要取所有業(yè)務(wù)留存時間的最大值作為全局參數(shù)值,這樣必然會造成空間的浪費。因此Kafka提供了很多topic級別的參數(shù),常見的包括如下幾個。

  • delete.retention.ms——每個topic可以設(shè)置自己的日志留存時間以覆蓋全局默認(rèn)值。

  • max.message.bytes——覆蓋全局的message.max.bytes,即為每個topic指定不同的最大消息尺寸。

  • retention.bytes——覆蓋全局的log.retention.bytes,每個topic設(shè)不同的日志留存尺寸。

GC參數(shù)

kafka broker端代碼雖然使用scala語言編寫,但終歸要編譯為.class文件在JVM 上運行。既然是JVM上運行,垃圾回收參數(shù)設(shè)置就顯得非常重要

對于使用java7的用戶 ,那么在選擇GC收集器時可以根據(jù)以下法則進(jìn)行確認(rèn)

  • 如果用戶機(jī)器上的CPU資源非常充裕,那么推薦使用CMS收集器。這樣可以充分利用多CPU執(zhí)行并發(fā)垃圾收集。啟用方法 -XX:+UseCurrentMarkSweepGC.

  • 相反地,則使用吞吐量收集器,及所謂的throughput collector。這樣不會擠占緊張的CPU資源,使kafka broker達(dá)到最大的吞吐量。啟用方法-XX:UseParallel GC

若用戶使用java8——這是推薦的版本。實際上如果用戶在kafka官網(wǎng)上下載使用scala 2.22編譯的kafka二進(jìn)制壓縮包。那么就必須安裝并使用java8-推薦使用G1垃圾收集器。根據(jù)筆者的實際使用經(jīng)驗,在沒有任何調(diào)優(yōu)的情況下,G1收集器本身回比CMS表現(xiàn)出更好的性能,主要體現(xiàn)在Full GC的次數(shù)更少、需要微調(diào)的參數(shù)更少等方面。因此推薦用戶始終使用G1收集器,不論實在broker端還是在clients端

除此之外,我們還需要打開GC日志的監(jiān)控,并實時確保不會出現(xiàn)“G1HR #StartFullGC”.
只有uG1的其他擦?xí)梢愿鶕?jù)使用實際情況酌情做微小調(diào)整。

JVM參數(shù)

kafka推薦用戶使用 oracle JKD版本使java8.另外由于kafka broker主要使用的是堆外內(nèi)存,即大量使用操作系統(tǒng)的頁緩存,因此其實并不需要為JVM分配太多的內(nèi)存。在實際中,通常為broker設(shè)置不超過6GB的堆空間。一下就是一份典型的生產(chǎn)環(huán)境中的JVM參數(shù)列表:
-Xmx6g -Xmx6g -XX:metaspaceSize=96m -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinmetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80

OS參數(shù)

kafka雖然支持很多平臺,但到目前為止被廣泛使用并已被證明表現(xiàn)良好的平臺依然是linux平臺。

通常情況下,kafka并不需要太多的os級別的參數(shù)調(diào)優(yōu),但依然有一些OS參數(shù)是必須調(diào)整的。

  • 文件描述符限制:kafka會頻繁的創(chuàng)建并修改文件系統(tǒng)的中文件,這包括消息的日志文件、索引文件及各種元數(shù)據(jù)管理文件等。因此如果一個broker上面有很多topic的分區(qū),那么這個broker勢必就需要打開很多個文件——大致數(shù)量約等于分區(qū)數(shù) × (分區(qū)總大小、日志段大?。?3.舉一個例子,假設(shè)broker上保存了50個分區(qū),每個分區(qū)平均尺寸是10GB,每個日志段大小是1GB,那么這個broker需要維護(hù)1500個左右的文件描述符。因此在實際使用場景中最好首先增大進(jìn)程能夠打開的最大文件描述符上限,比如設(shè)置一個很大的值,如100000.具體設(shè)置方法為ulimit -n 100000.

  • Socket緩沖區(qū)大小:這里指的是OS級別的socker緩沖區(qū)大小,而非kafka自己提供的socker緩沖區(qū)參數(shù)。事實上。kafka自己的參數(shù)將其設(shè)置為64KB,這對于普通的內(nèi)網(wǎng)環(huán)境而言通常是足夠的,因為內(nèi)網(wǎng)環(huán)境下往返時間(RTT)一般都很低,不會產(chǎn)生過多的數(shù)據(jù)堆積在socket緩沖區(qū)中,但對于那些跨地區(qū)的數(shù)據(jù)傳輸而言,僅僅增加kafka參數(shù)就不夠了,因為前者也受限于OS級別的設(shè)置。因此如果是做遠(yuǎn)距離的數(shù)據(jù)傳輸,那么建議將OS級別的socket緩沖區(qū)調(diào)大,比如增加到128KB,甚至更大。

  • 最好使用Ext4 或XFS文件系統(tǒng):其實kafka操作的都是普通文件,并沒有依賴于特定的文件系統(tǒng),但依然推薦使用Ext4或XFS文件系統(tǒng),特別是XFS通常有著更好的性能。這種性能的提升主要影響的是kafka的寫入功能。根據(jù)官網(wǎng)的測試報告,使用XFS的寫入時間大約是160ms,而使用Ext4大約是250ms。因此生產(chǎn)環(huán)境中最好使用XFS文件系統(tǒng)。

  • 關(guān)閉swap:其實這是很多使用磁盤的應(yīng)用程序的常規(guī)調(diào)優(yōu)手段。具體命令為sysctl vm.swappiness = <一個較小的數(shù)>,即大幅度江都swap空間的使用,以免極大地拉低性能,后面章節(jié)會詳細(xì)討論為何不顯示設(shè)置該值為0

  • 設(shè)置更長的flush時間: 我們知道kafka依賴OS頁緩存的“刷盤”功能實現(xiàn)消息真正的寫入物理磁盤,默認(rèn)刷盤間隔是5秒。通常情況下,這個間隔時間太短了,適當(dāng)增加該值可以在很大程度上提升OS物理寫入操作的性能,LinkedIn公司自己將該值設(shè)置為2分鐘以增加整體的物理寫入吞吐量。

總結(jié)

本章著重講述了如何搭建kafka生產(chǎn)環(huán)境以及設(shè)置kafka集群需要考慮的各方面參數(shù),并且針對如何評估kafka線上集群分別從各個方面進(jìn)行了探討。

下一章節(jié):producer開發(fā)

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容