0-前言
Kafka是一種高吞吐量的分布式消息發(fā)布訂閱系統(tǒng),由Scala語言編寫,其分布式特性基于Zookeeper實現(xiàn)。需要指出的是Kafka雖然也是類似于消息隊列發(fā)布訂閱的使用模式,但其并不完全遵循JMS協(xié)議。
1-集群原理

上圖展示了Kafka中各個元素及互相之間的關(guān)聯(lián)關(guān)系,下面就幾個重點部分進行說明:
-
Topic和Partition
Topic用于區(qū)分不同的消息組,每一條消息都必須對應(yīng)一個Topic。在Kafka中一個Topic又可以被分為若干個Partition,每個Partition還可以有一個或多個副本構(gòu)成leader/follower關(guān)系,這些Partition分布在不同的Broker節(jié)點之上。需要注意的是,Producer 和 Consumer 只與 leader Partition進行交互,該leader宕機后會由Controller重新進行選舉。
Partition的狀態(tài)有四種:
- NonExistentPartition -- 表示該分區(qū)要么沒有被創(chuàng)建過或曾經(jīng)被創(chuàng)建過但后面被刪除了
- NewPartition -- 分區(qū)創(chuàng)建之后就處于NewPartition狀態(tài)。在這個狀態(tài)中,分區(qū)已經(jīng)分配了副本,但是還沒有選舉出leader和ISR
- OnlinePartition -- 一旦分區(qū)的leader被推選出來,它就處于OnlinePartition狀態(tài)
- OfflinePartition -- 如果leader選舉出來后,leader宕機了,那么該分區(qū)就處于OfflinePartition狀態(tài)
-
Consumer
每一個Consumer都屬于一個Consumer Group,同一個Group下都會擁有相同的GroupId。Kafka中消息的消費是以Consumer Group為單位的,簡單來說就是一條消息會被每一個Group各消費一次,而同一個Group中只會有一個Consumer消費該消息。由于一個Partition只能被每個Group中的一個Consumer消費(一個Consumer可以消費多個Partition),因此每個Group中Consumer的數(shù)量不應(yīng)多于Partition的數(shù)量。 -
Controller
Kafka集群中的其中一個Broker會被選舉為Controller,主要負(fù)責(zé)Partition管理和副本狀態(tài)管理,也會執(zhí)行類似于重分配Partition之類的管理任務(wù)。如果當(dāng)前的Controller宕機,會從其他正常的Broker中重新選舉Controller。 -
ISR
leader Partition會在Zookeeper中維護一個與其基本保持同步的Replica列表,該列表稱為ISR(in-sync Replica),每個Partition都會有一個ISR,由leader動態(tài)維護。可以簡單認(rèn)為ISR是一個記錄follower的列表,如果一個follower比leader落后太多,或者超過一定時間未發(fā)起數(shù)據(jù)復(fù)制請求,則leader會將其從ISR中移除。當(dāng)ISR中所有Replica都向leader發(fā)送ACK時,leader才會進行commit操作。
2-集群搭建
Kafka部署需要準(zhǔn)備以下環(huán)境:
- java環(huán)境
- Gradle環(huán)境
- Zookeeper
操作系統(tǒng)為Centos6.9,java采用jdk1.8,Zookeeper版本3.6.4,由于這些都是已安裝的所以在此不做安裝說明,只說一下Gradle的安裝
2.1-Gradle安裝
我安裝的是4.10.2版本,首先對安裝包進行解壓
unzip -d /opt/gradle gradle-4.10.2-bin.zip
打開/etc/profile配置環(huán)境變量path
export PATH=$PATH:/opt/gradle/gradle-4.10.2/bin
通過以下命令檢查是否配置成功
gradle -v
看到如下信息說明已成功
------------------------------------------------------------
Gradle 4.10.2
------------------------------------------------------------
Build time: 2018-09-19 18:10:15 UTC
Revision: b4d8d5d170bb4ba516e88d7fe5647e2323d791dd
Kotlin DSL: 1.0-rc-6
Kotlin: 1.2.61
Groovy: 2.4.15
Ant: Apache Ant(TM) version 1.9.11 compiled on March 23 2018
JVM: 1.8.0_181 (Oracle Corporation 25.181-b13)
OS: Linux 2.6.32-696.13.2.el6.x86_64 amd64
2.2-Kafka安裝
Kafka集群使用了三臺服務(wù)器,每臺上單獨部署一個Kafka節(jié)點,Kafka版本2.0.0
首先進行解壓
tar -xzf kafka_2.11-2.0.0.tgz
進入Kafka目錄
cd kafka_2.11-2.0.0
修改配置文件
vi config/server.properties
其中有幾個重要參數(shù)如下
broker.id=0
advertised.listeners=PLAINTEXT://ip:9092
zookeeper.connect=ip1:2181,ip2:2181,ip3:2181
log.dirs=/tmp/kafka-logs
log.retention.hours=168
num.partitions=3
broker.id可以隨便設(shè)置,但必須保證同一集群下的節(jié)點不重復(fù);zookeeper.connect是zookeeper的地址,需要按實際填寫;advertised.listeners是對外展示的地址和端口(端口默認(rèn)使用9092),在后面配置管理頁面的時候有用;log.dirs和log.retention.hours表示日志的存放路徑和保存時間;num.partitions是Topic下默認(rèn)分配的Partition數(shù)量。
接著修改一下Kafka的啟動配置
vi bin/kafka-server-start.sh
文件中增加下述內(nèi)容
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G"
export JMX_PORT="9999"
fi
這里設(shè)置了堆的大小以及JMX監(jiān)控端口,方便后續(xù)使用監(jiān)控頁面。
bin/kafka-server-start.sh -daemon config/server.properties
以上設(shè)置都完成后啟動Kafka
2.3-監(jiān)控頁面安裝
Kafka官方?jīng)]有提供可視化的監(jiān)控頁面,但有不少第三方的開源產(chǎn)品可供選擇,比較主流的有以下三款
- KafkaOffsetMonitor
- Kafka web console
- Kafka Manager
最終我選擇的是yahoo開源的Kafka Manager,項目地址https://github.com/yahoo/kafka-manager。
首先下載源碼包kafka-manager-1.3.3.18.tar.gz,這里安裝的是最新的1.3.3.18版
tar -zxf kafka-manager-1.3.3.18.tar.gz
cd kafka-manager-1.3.3.18
./sbt clean dist
依次執(zhí)行上述三條命令,解壓并進入目錄,生成安裝包。Kafka-manager使用了Play框架,如果之前從來沒有用過Play框架,下載依賴的jar包會花費較長的時間(真的不是一般的久,我花了2個小時。。。),想要加快這個過程可以將源配置為aliyun的私服。
操作如下,通過cd ~進入當(dāng)前用戶目錄,然后通過命令mkdir .sbt創(chuàng)建.sbt目錄,進入創(chuàng)建的該目錄,使用vi創(chuàng)建repositories文件,編輯內(nèi)容如下:
[repositories]
local
aliyun: http://maven.aliyun.com/nexus/content/groups/public
typesafe: http://repo.typesafe.com/typesafe/ivy-releases/, [organization]/[module]/(scala_[scalaVersion]/)(sbt_[sbtVersion]/)[revision]/[type]s/[artifact](-[classifier]).[ext], bootOnly
待命令執(zhí)行完成后,在target/universal 目錄中會生成一個zip壓縮包kafka-manager-1.3.3.18.zip。
解壓并進入該壓縮包
unzip kafka-manager-1.3.3.18.zip
cd kafka-manager-1.3.3.18
修改配置信息
vi conf/application.conf
修改配置文件中的zookeeper相關(guān)配置項kafka-manager.zkhosts,修改完成后通過下述命令啟動工程
bin/kafka-manager
啟動默認(rèn)使用9000端口,若要指定其它端口也可以通過以下命令啟動
bin/kafka-manager -Dconfig.file=/path/to/application.conf -Dhttp.port=9000
接著便可以通過http://ip:9000地址訪問

3-總結(jié)
本文介紹了Kafka的集群原理,記錄了Kafka集群的部署過程,以及可視化監(jiān)控頁面kafka manager的安裝過程。