Kafka 是一個(gè)分布式消息系統(tǒng),具有高水平擴(kuò)展和高吞吐量的特點(diǎn)。在Kafka 集群中,沒有 “中心主節(jié)點(diǎn)” 的概念,集群中所有的節(jié)點(diǎn)都是對等的。

Kafka 幾大概念
Topic(主題)
Kafka 中可將消息分類,每一類的消息稱為一個(gè) Topic,消費(fèi)者可以對不同的 Topic 進(jìn)行不同的處理。
Broker(代理)
每個(gè) Broker 即一個(gè) Kafka 服務(wù)實(shí)例,多個(gè) Broker 構(gòu)成一個(gè) Kafka 集群,生產(chǎn)者發(fā)布的消息將保存在 Broker 中,消費(fèi)者將從 Broker 中拉取消息進(jìn)行消費(fèi)。
Producer(生產(chǎn)者)
負(fù)責(zé)生產(chǎn)消息并發(fā)送給 Broker 。
Consumer(生產(chǎn)者)
負(fù)責(zé)消費(fèi) Broker 中 Topic 消息,每個(gè) Consumer 實(shí)例歸屬于一個(gè) Consumer Group 查看更多介紹
Partition(分區(qū))
Partition 是 Kafka 中比較特色的部分,一個(gè) Topic 可以分為多個(gè) Partition,每個(gè) Partition 是一個(gè)有序的隊(duì)列,Partition 中的每條消息都存在一個(gè)有序的偏移量(Offest) ,同一個(gè) Consumer Group 中,只有一個(gè) Consumer 實(shí)例可消費(fèi)某個(gè) Partition 的消息。
Kafka 持久化
每個(gè) Topic 將消息分成多 Partition,每個(gè) Partition 在存儲層面是 append log 文件。任何發(fā)布到此 Partition 的消息都會被直接追加到 log 文件的尾部,每條消息在文件中的位置稱為 Offest(偏移量),Partition 是以文件的形式存儲在文件系統(tǒng)中,log 文件根據(jù) Broker 中的配置保留一定時(shí)間后刪除來釋放磁盤空間。

Kafka 集群架構(gòu)圖

從圖中可以看出 Kafka 強(qiáng)依賴于 ZooKeeper ,通過 ZooKeeper 管理自身集群,如:Broker 列表管理、Partition 與 Broker 的關(guān)系、Partition 與 Consumer 的關(guān)系、Producer 與 Consumer 負(fù)載均衡、消費(fèi)進(jìn)度 Offset 記錄、消費(fèi)者注冊 等,所以為了達(dá)到高可用,ZooKeeper 自身也必須是集群。
官方提供的 Kafka 的下載包(Binary downloads) 內(nèi)默認(rèn)包含了 ZooKeeper 模塊,所以可以選擇不單獨(dú) 下載 ZooKeeper,不過實(shí)際情況可能還是需要單獨(dú)下載 ZooKeeper 來部署,畢竟環(huán)境的資源要求都不一樣。
下面的例子將基于 Windows 環(huán)境采用單獨(dú)搭建 ZooKeeper 的方式,Linux 環(huán)境基本類似,推薦用 Docker ,更簡單一些。
Kafka 環(huán)境搭建依賴 Java 環(huán)境,所以需要預(yù)先安裝好 JDK
ZooKeeper 集群
因?yàn)槭峭慌_機(jī)器上完成搭建,所以采用復(fù)制多份來完成
下載 解壓后復(fù)制出3份,zookeeper-1、zookeeper-2、zookeeper-3,創(chuàng)建 data 目錄存放數(shù)據(jù),創(chuàng)建 log 目錄存放日志;
-
在 data 的 zk1、zk2、zk2 目錄中分別創(chuàng)建文件 myid,文件內(nèi)容分別寫入 1、2、3;
zookeeper dir -
重命名 zookeeper-1/conf 文件下 zoo_sample.cfg => zoo.cfg,修改如下:
# 數(shù)據(jù)存放目錄 dataDir=E:\\cluster_zookeeper\\data\\zk1 # 日志存放目錄 dataLogDir=E:\\cluster_zookeeper\\log\\zk1 # 監(jiān)聽端口 clientPort=2181 # 集群配置 # server.x 分別對應(yīng)myid文件的內(nèi)容(每個(gè) zoo.cfg 文件都需要添加) # 2287(通訊端口):3387(選舉端口) server.1=localhost:2287:3387 server.2=localhost:2288:3388 server.3=localhost:2289:3389 -
zookeeper-2、zookeeper-3 類似 zookeeper-1 進(jìn)行調(diào)整,dataDir、dataLogDir、clientPort 都需要調(diào)整:
dataDir=E:\\cluster_zookeeper\\data\\zk2 dataLogDir=E:\\cluster_zookeeper\\log\\zk2 clientPort=2182 server.1=localhost:2287:3387 server.2=localhost:2288:3388 server.3=localhost:2289:3389dataDir=E:\\cluster_zookeeper\\data\\zk3 dataLogDir=E:\\cluster_zookeeper\\log\\zk3 clientPort=2183 server.1=localhost:2287:3387 server.2=localhost:2288:3388 server.3=localhost:2289:3389 -
啟動 ZooKeeper
分別通過 cmd 進(jìn)入每個(gè) ZooKeeper 實(shí)例的 bin 文件夾,輸入命令:
zkServer從日志中發(fā)現(xiàn),目前 1、3 為 Follower,2 為 Leader:
start zookeeper cluster
Kafka 集群
-
下載 解壓后復(fù)制出3份,kafka-1、kafka-2、kafka-3,創(chuàng)建 log 目錄存放消息日志;
kafka dir -
修改 kafka-1/config 文件夾下的 server.properties :
# broker 編號,集群內(nèi)必須唯一 broker.id=1 # host 地址 host.name=127.0.0.1 # 端口 port=9092 # 消息日志存放地址 log.dirs=E:\\cluster_kafka\\log\\k1 # ZooKeeper 地址,多個(gè)用,分隔 zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 -
kafka-2、kafka-3 類似 kafka-1 進(jìn)行調(diào)整
broker.id=2 host.name=127.0.0.1 port=9093 log.dirs=E:\\cluster_kafka\\log\\k2 zookeeper.connect=localhost:2181,localhost:2182,localhost:2183broker.id=3 host.name=127.0.0.1 port=9094 log.dirs=E:\\cluster_kafka\\log\\k3 zookeeper.connect=localhost:2181,localhost:2182,localhost:2183 -
啟動 Kafka
分別通過 cmd 進(jìn)入每個(gè) Kafka 實(shí)例,輸入命令啟動:
bin\windows\kafka-server-start.bat .\config\server.propertiesstart kafka cluster
注意:如果出現(xiàn)以下錯(cuò)誤:
找不到或無法加載主類 Files\Java\jdk1.8.0...;
解決方案:在 bin\windows\kafka-run-class.bat 中找到 "set COMMAND=%JAVA%" 行,給 %CLASSPATH% 加上雙引號
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
通過以上的步驟,Kafka 集群搭建完成,接下來可以通過命令行或 Kafka 的 Java、C# 等語言的客戶端進(jìn)行消息的生產(chǎn)和消費(fèi)測試。
測試
-
創(chuàng)建 testTopic
.\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181,localhost:2182,localhost:2183 --replication-factor 1 --partitions 1 --topic testTopic -
生產(chǎn)消息
.\bin\windows\kafka-console-producer.bat --broker-list localhost:9092,localhost:9093,localhost:9094 --topic testTopic -
消費(fèi)消息
.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092,localhost:9093,localhost:9094 --topic testTopictest result


