kafka 集群
機器環(huán)境
系統(tǒng)版本
centos6.6
機器三臺搭建集群
10.1.1.11
10.1.1.12
10.1.1.13
以下安裝的軟件將分別在三臺機器進行安裝,下面只用一臺機器作為例子進行安裝。不同的地方會有說明
zookeeper 安裝
下載并解壓zk
download 地址: https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/
cd /home/root/
wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
tar xf zookeeper-3.4.9.tar.gz
cd zookeeper-3.4.9
創(chuàng)建快照日志和日志目錄
mkdir /Data/zookeeper/data -pv
mkdir /Data/zookeeper/logs -pv
安裝并配置
安裝目錄: /Data/apps/
cp -a /root/zookeeper-3.4.9 /Data/apps/zookeeper
配置文件修改
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/Data/zookeeper/data
dataLogDir=/Data/zookeeper/logs
clientPort=2181
server.1=10.1.1.11:2888:3888
server.2=10.1.1.12:2888:3888
server.3=10.1.1.13:2888:3888
zk 標識server id
目錄地址: /Data/zookeeper/data/myid
在目錄中創(chuàng)建文件myid文件 每個文件中分別寫入當前機器的server id
例如這個機器10.1.1.11
將分別在三臺機器上執(zhí)行
echo 1 >> /Data/zookeeper/data/myid
echo 2 >> /Data/zookeeper/data/myid
echo 3 >> /Data/zookeeper/data/myid
啟動zookeeper
/Data/apps/zookeeper/bin/zkServer.sh start
檢測狀態(tài)
在各個節(jié)點上分別執(zhí)行如下指令,可看到其中有l(wèi)eader和follower,即搭建成功
/Data/apps/zookeeper/bin/zkServer.sh status
kafka安裝
DOWNLOAD 地址: http://kafka.apache.org/downloads.html
下載 kafka
安裝目錄: /Data/apps/kafka
分別在三體機器上進行安裝
wget https://archive.apache.org/dist/kafka/0.9.0.1/kafka_2.10-0.9.0.1.tgz
tar xf kafka_2.10-0.9.0.1.tgz
cp -a kafka_2.11-0.9.0.1 /Data/apps/kafka
建立日志目錄
mkdir /Data/kafka/kafka-logs
配置文件 kafka
broker.id=0 #集群節(jié)點的標示符,不得重復。取值范圍0~n
host.name=10.1.1.11 #三個機器分別修改為自己的IP地址
port=9092
zookeeper.connect=10.1.1.11:2181,10.1.1.12:2181,10.1.1.13:2181
default.replication.factor=2
num.network.threads=3
num.io.threads=8
num.partitions=1
num.recovery.threads.per.data.dir=1
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/Data/kafka/kafka-logs
log.retention.check.interval.ms=300000
log.cleanup.policy=delete #日志的清除策略:直接刪除
log.retention.hours=72 #日志保存時間為3天
log.segment.bytes=1073741824 #每個日志文件的最大的大小,這里為1GB
delete.topic.enable=true #通過配置此項使得刪除topic的指令生效
zookeeper.connection.timeout.ms=6000
啟動 kafka
這里注意,記得要先啟動zookeeper。確保zookeeper啟動以后再執(zhí)行kafka的啟動命令
/Data/apps/kafka/bin/kafka-server-start.sh -daemon /Data/apps/kafka/config/server.properties
ps aux |grep kafka
kafka 常用操作命令
Check service
推薦第一次啟動先不要加上-daemon參數(shù) 觀察一下控制臺輸出是否有success
/Data/apps/kafka/bin/kafka-server-start.sh /Data/apps/kafka/config/server.properties
Create topic
bin/kafka-topics.sh --create --zookeeper 10.1.1.11:2181 --replication-factor 3 --partitions 1 --topic bdstest
Describe a topic
bin/kafka-topics.sh --describe --zookeeper 10.1.1.11:2181 --topic bdstest
List the topic
bin/kafka-console-producer.sh --broker-list 10.1.1.11:9092 --topic bdstest
Start a consumer
bin/kafka-console-consumer.sh --zookeeper 10.1.1.11:2181 --topic bdstest --from-beginning
Start a consumer
bin/kafka-console-consumer.sh --zookeeper 10.1.1.12:2181 --topic nodeHlsTest --from-beginning
Delete a topic
要事先在serve.properties 配置 delete.topic.enable=true
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic bdstest
如果仍然只是僅僅被標記了刪除(zk中并沒有被刪除),那么啟動zkCli.sh,輸入如下指令
/Data/apps/zookeeper/bin/zkCli.sh #敲回車進入zookeeper管理終端
[zk: localhost:2181(CONNECTED) 0] ls /brokers/topics
[test1, test2]
[zk: localhost:2181(CONNECTED) 1] rmr /brokers/topics/test1