一.前期準備
1.1 Win7官網(wǎng)下載kafka包
本文使用版本kafka_2.10-0.10.1.0.tgz
1.2 配置jdk、scala、zookeeper
jdk1.7:linux jdk安裝和配置
scala2.10.6:linux scala安裝和配置
zookeeper3.4.9:zookeeper3.49集群安裝和配置
jdk,scala,kafka版本要對應(yīng)
1.3 centos7集群服務(wù)器
主機名 ? ?系統(tǒng) ? ? ? ? ?IP地址
master ? ?centos7 ? ?192.168.32.128
slave01 ? centos7 ? ?192.168.32.131
slave02 ? centos7 ? ? 192.168.32.132
二.kafka集群搭建
以下操作只針對master主機服務(wù)器,其他主機服務(wù)器類似。
2.1 上傳kafka包至 /opt/software目錄
2.2 解壓和拷貝kafka至 /usr/local/kafka
cd /opt/software
tar -zxvf kafka_2.10-0.10.1.0.tgz
cp -r kafka_2.10-0.10.1.0 /usr/local/kafka

三.kafka集群配置
3.1 server.properties文件配置
進入kafka中的config目錄
vi server.properties
#switch to enable topic deletion or not, default value is false
delete.topic.enable=true
#本機對應(yīng)的ip地址
listeners=PLAINTEXT://192.168.32.128:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.32.128:2181,192.168.32.132:2181,192.168.32.131:2181

注意:相關(guān)配置項不要重復(fù)。
3.2 新增/tmp/kafka-logs
mkdir -p /tmp/kafka-logs
至此 master主機服務(wù)器kafka已搭建完成。
3.3 其他服務(wù)器kafka搭建
搭建192.168.32.131/slave01和192.168.32.132/slave02服務(wù)器的kafka
拷貝master服務(wù)器的kafka至slave01和slave02的 /usr/local/目錄
master服務(wù)器:
cd /usr/local
scp -r kafka root@192.168.32.131:/usr/local/
slave01服務(wù)器:
類似3.1配置server.properties
注意:
broker.id=1,要與master中broker.id區(qū)別
listeners=PLAINTEXT://192.168.32.131:9092,需要改成本機對應(yīng)ip

slave02服務(wù)器配置類似。
四.kafka集群測試
4.1 測試命令
Step 1: Start the server
后臺方式啟動,推薦第一次配置的新手不要加入-daemon參數(shù),看看控制臺輸出的是否有success.
bin/kafka-server-start.sh -daemon config/server.properties
Step 2: Create a topic(replication-factor一定要大于1,否則kafka只有一份數(shù)據(jù),leader一旦崩潰程序就沒有輸入源了,分區(qū)數(shù)目視輸入源而定)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic topicTest
Step 3: Describe a topic
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic topicTest
step 4: list the topic
bin/kafka-topics.sh --list --zookeeper localhost:2181
step 5: send some message
bin/kafka-console-producer.sh --broker-list localhost:2181 --topic topicTest
step 6: start a consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic topicTest --from-beginning
step 7: delete a topic
要事先在 serve.properties 配置 delete.topic.enable=true
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicTest
# 如果仍然只是僅僅被標記了刪除(zk中并沒有被刪除),那么啟動zkCli.sh,輸入如下指令
rmr /brokers/topics/topicTest
4.2 集群測試
step1:啟動zookeeper集群
step2:啟動kafka集群
三臺服務(wù)器分別運行啟動命令
bin/kafka-server-start.sh -daemon config/server.properties
jps查看進程

注意:kafka是kafka進程,QuorumPeerMain是zookeeper進程
step3. 創(chuàng)建主題和查看主題
bin/kafka-topics.sh --create --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --replication-factor 3 --partitions 3 --topic topicTest
bin/kafka-topics.sh --list --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181

step4. 啟動消息生產(chǎn)者和消息消費者
master服務(wù)器
bin/kafka-console-producer.sh --broker-list 192.168.32.128:9092,192.168.32.131:9092,192.168.32.132:9092 --topic topicTest
slave01或slave02服務(wù)器
bin/kafka-console-consumer.sh --zookeeper 192.168.32.128:2181,192.168.32.131:2181,192.168.32.132:2181 --topic topicTest --from-beginning
master服務(wù)器輸入信息,slave01或slave02會顯示master輸?shù)牡男畔?/p>

