一、Kafka的安裝:
1.準(zhǔn)備工作:
- 安裝jdk
- 安裝Zookeeper
2.Kafka集群部署:
1)解壓安裝包
[honey@hadoop102 software]$ tar -zxvf kafka_2.11-0.11.0.0.tgz -C /opt/module/
2)修改解壓后的文件名稱
[honey@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka
3)在/opt/module/kafka目錄下創(chuàng)建logs文件夾
[honey@hadoop102 kafka]$ mkdir logs
4)修改配置文件
[honey@hadoop102 kafka]$ cd config/
[honey@hadoop102 config]$ vi server.properties
輸入以下內(nèi)容:
#broker的全局唯一編號(hào),不能重復(fù)
broker.id=0
#刪除topic功能使能
delete.topic.enable=true
#處理網(wǎng)絡(luò)請(qǐng)求的線程數(shù)量
num.network.threads=3
#用來處理磁盤IO的現(xiàn)成數(shù)量
num.io.threads=8
#發(fā)送套接字的緩沖區(qū)大小
socket.send.buffer.bytes=102400
#接收套接字的緩沖區(qū)大小
socket.receive.buffer.bytes=102400
#請(qǐng)求套接字的緩沖區(qū)大小
socket.request.max.bytes=104857600
#kafka運(yùn)行日志存放的路徑
log.dirs=/opt/module/kafka/logs
#topic在當(dāng)前broker上的分區(qū)個(gè)數(shù)
num.partitions=1
#用來恢復(fù)和清理data下數(shù)據(jù)的線程數(shù)量
num.recovery.threads.per.data.dir=1
#segment文件保留的最長時(shí)間,超時(shí)將被刪除
log.retention.hours=168
#配置連接Zookeeper集群地址
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181
5)配置環(huán)境變量
[root@hadoop102 module]# vi /etc/profile
#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
[root@hadoop102 module]# source /etc/profile
6)分發(fā)安裝包
[root@hadoop102 etc]# xsync profile
[honey@hadoop102 module]$ xsync kafka/
7)分別在hadoop103和hadoop104上修改配置文件/opt/module/kafka/config/server.properties中的broker.id=1、broker.id=2
注:broker.id不得重復(fù)
8)啟動(dòng)集群
依次在hadoop102、hadoop103、hadoop104節(jié)點(diǎn)上啟動(dòng)kafka
[honey@hadoop102 kafka]$ bin/kafka-server-start.sh config/server.properties &
[honey@hadoop103 kafka]$ bin/kafka-server-start.sh config/server.properties &
[honey@hadoop104 kafka]$ bin/kafka-server-start.sh config/server.properties &
9)關(guān)閉集群
[honey@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
[honey@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
[honey@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
二、Kafka命令行操作
1)查看當(dāng)前服務(wù)器中的所有topic
[honey@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
2)創(chuàng)建topic
[honey@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor
3 --partitions 1 --topic first
選項(xiàng)說明:
--topic 定義topic名
--replication-factor 定義副本數(shù)
--partitions 定義分區(qū)數(shù)
3)刪除topic
[honey@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
需要server.properties中設(shè)置delete.topic.enable=true否則只是標(biāo)記刪除或者直接重啟。
4)發(fā)送消息
[honey@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>hello world
>honey honey
5)消費(fèi)消息
[honey@hadoop103 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --from-beginning --topic first
# --from-beginning:會(huì)把first主題中以往所有的數(shù)據(jù)都讀取出來。根據(jù)業(yè)務(wù)場景選擇是否增加該配置。
6)查看某個(gè)Topic的詳情
[honey@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first