Kafka是一個(gè)分布式流媒體平臺。發(fā)布和訂閱記錄流,類似于消息隊(duì)列或企業(yè)消息傳遞系統(tǒng)。以容錯(cuò)持久的方式存儲記錄流。處理記錄發(fā)生的流。本文講述在三臺主機(jī)上安裝kafka集群的主要步驟
主要內(nèi)容:
- 1.啟動Zookeeper
- 2.安裝Kafka
- 3.測試
- 4.其它
- 5.一鍵腳本
集群規(guī)劃如下:
| 用戶 | 主機(jī)名 | ip | 進(jìn)程 |
|---|---|---|---|
| hadoop | hadoop1 | 192.168.2.111 | Zookeeper、Kafka |
| hadoop | hadoop2 | 192.168.2.112 | Zookeeper、Kafka |
| hadoop | hadoop3 | 192.168.2.113 | Zookeeper、Kafka |
1.啟動Zookeeper
之前Zookeeper集群安裝已經(jīng)安裝好了,現(xiàn)在只需要啟動即可
可以使用腳本批量啟動,也可以多窗口運(yùn)行命令啟動,啟動后的狀態(tài)如下:

使用客戶端創(chuàng)建一個(gè)/kafka目錄來存放kafka相關(guān)的配置文件
./bin/zkCli.sh
創(chuàng)建kafka節(jié)點(diǎn)來存放kafka的配置文件
create /kafka ''
1.2.安裝Kafka
1.2.1.下載
下載地址:傳送們
根據(jù)自己的Scala版本下載相應(yīng)的kafka版本即可,如果沒有,就自己編譯

1.2.2.上傳解壓
tar -zxvf kafka_2.11-1.0.1.tgz -C /opt/soft
1.2.3.配置啟動
1、修改config/server.properties文件如下:
#必須 設(shè)置broker.id(從0開始,3個(gè)節(jié)點(diǎn)分別設(shè)為0,1,2,不能重復(fù))
broker.id=0
#可選 用來監(jiān)聽鏈接的端口,producer 或 consumer 將在此端口建立連接
port=9092
#可選 日志文件目錄
log.dirs=/opt/soft/kafka_2.11-1.0.1/kafka-logs
#可選 在當(dāng)前 broker 上的partition數(shù)量
num.partitions=1
#必須 Zookeeper服務(wù)器
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
這里需要說明的是,默認(rèn)Kafka會使用ZooKeeper默認(rèn)的根 "/" 路徑,這樣有關(guān)Kafka的配置就會散落在ZooKeeper根路徑下面,如果你有其他的應(yīng)用也在使用ZooKeeper集群,查看ZooKeeper中數(shù)據(jù)可能會不直觀,所以強(qiáng)烈建議指定一個(gè)路徑,這就是我們?yōu)槭裁从脄ookeeper客戶端創(chuàng)建/kafka節(jié)點(diǎn)的原因,然后直接在zookeeper.connect配置項(xiàng)中指定:
zookeeper.connect= node2:2181,node3:2181,node4:2181/kafka
2、將kafka_2.11-1.0.1拷貝到其它主機(jī)(hadoop2、hadoop3)
scp -r kafka_2.11-1.0.1/ hadoop@hadoop2:/opt/soft
scp -r kafka_2.11-1.0.1/ hadoop@hadoop3:/opt/soft
3、修改其它主機(jī)(hadoop2、hadoop3)的broker.id
broker.id=1
broker.id=2
4、啟動
可以編寫一鍵啟動腳本,也可以批量窗口操作,運(yùn)行如下命令:
nohup bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
5、查看是否啟動成功
jps

如圖,每臺機(jī)器都應(yīng)該有如上圖的2個(gè)進(jìn)程
3.測試
3.1.創(chuàng)建、查看、生產(chǎn)、消費(fèi)
# 創(chuàng)建一個(gè)叫做“TEST”的topic,它有3個(gè)分區(qū),3個(gè)副本
./bin/kafka-topics.sh --create --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic TEST --partitions 3 --replication-factor 3
# 查看Topics
./bin/kafka-topics.sh --list --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092
./bin/kafka-topics.sh --describe --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092
# 創(chuàng)建一個(gè)生成者往TEST寫數(shù)據(jù)
./bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic TEST
# 創(chuàng)建一個(gè)消費(fèi)者從頭開始消費(fèi)
./bin/kafka-console-consumer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic TEST --from-beginning

Topic名為TEST,有3個(gè)Partition ,有3個(gè)副本
編號為0的Partition,Leader在broker.id=1這個(gè)節(jié)點(diǎn)上,負(fù)責(zé)該P(yáng)artition的讀寫,副本在broker.id為1、0、2這個(gè)三個(gè)節(jié)點(diǎn)上,Isr表示所有存活的副本,并跟broker.id=1這個(gè)節(jié)點(diǎn)同步

3.2.刪除
1、刪除topic
./bin/kafka-topics.sh --delete --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka --topic TEST
4.其它
- 如遇到問題可以在前面設(shè)置的日志文件夾log.dirs里查看啟動日志
- 查看Zookeeper 下 /kafka里的配置信息
ls /kafka/brokers/ids

5.一鍵腳本
設(shè)置環(huán)境變量(三臺主機(jī))
export KAFKA_HOME=/opt/soft/kafka_2.11-1.0.1/
export PATH=$PATH:$KAFKA_HOME/bin
startkafka.sh 一鍵啟動腳本(設(shè)置Kafka的環(huán)境變量)
cat ./slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;nohup kafka-server-start.sh /opt/soft/kafka_2.11-1.0.1/config/server.properties >/dev/null 2>&1 &"
}&
wait
done
stopkafka.sh 一鍵停止腳本(設(shè)置Kafka的環(huán)境變量)
cat ./slave | while read line
do
{
echo $line
ssh $line "source /etc/profile;jps |grep Kafka |cut -c 1-4 |xargs kill -s 9 "
}&
wait
done
slave
hadoop1
hadoop2
hadoop3