搭建zookeeper、kafka集群
1、安裝jdk環(huán)境變量(略)(3臺(tái)機(jī)器)
export JAVA_HOME=/usr/local/java
export PATH=$JAVA_HOME/bin:$JAVA_HOME/jre/bin:$PATH
export CLASSPATH=.$CLASSPATH:$JAVA_HOME/lib:$JAVA_HOME/jre/lib:$JAVA_HOME/lib/tools.jar
2、解壓好zookeeper二進(jìn)制包后、修改配置文件(3臺(tái)機(jī)器)
node1 節(jié)點(diǎn)操作
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz -C /usr/local/src/
ln -s /usr/local/src/apache-zookeeper-3.6.0-bin/ /usr/local/zookeeper1
cd /usr/local/zookeeper1/config cp zoo_sample.cfg zoo.cfg
編輯zoo.cfg配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper1/data
dataLogDir=/usr/local/zookeeper1/logs
clientPort=2181
server.1=192.168.7.114:2888:3888 server.2=192.168.7.115:2888:3888 server.3=192.168.7.116:2888:3888
保存退出!
cd /usr/local/zookeeper1/data
echo "1" > myid (在第一臺(tái)機(jī)器操作)
node2 節(jié)點(diǎn)操作
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz -C /usr/local/src/
ln -s /usr/local/src/apache-zookeeper-3.6.0-bin/ /usr/local/zookeeper2
cd /usr/local/zookeeper2/config
cp zoo_sample.cfg zoo.cfg
編輯zoo.cfg配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper2/data
dataLogDir=/usr/local/zookeeper2/logs
clientPort=2181
server.1=192.168.7.114:2888:3888 server.2=192.168.7.115:2888:3888 server.3=192.168.7.116:2888:3888
保存退出!
cd /usr/local/zookeeper2/data echo "2" > myid (在第二臺(tái)機(jī)器操作)
node3節(jié)點(diǎn)操作
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz -C /usr/local/src/
ln -s /usr/local/src/apache-zookeeper-3.6.0-bin/ /usr/local/zookeeper3
cd /usr/local/zookeeper3/config
cp zoo_sample.cfg zoo.cfg
編輯zoo.cfg配置文件
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/usr/local/zookeeper3/data
dataLogDir=/usr/local/zookeeper3/logs
clientPort=2181
server.1=192.168.7.114:2888:3888 server.2=192.168.7.115:2888:3888 server.3=192.168.7.116:2888:3888
保存退出!
cd /usr/local/zookeeper1/data echo "3" > myid (在第三臺(tái)機(jī)器操作)
3、三臺(tái)機(jī)器添加環(huán)境變量配置
vim /etc/profile
添加一下配置
zookeeper
export ZOOKEEPER_HOME=/usr/local/zookeeper1
export PATH=$ZOOKEEPER_HOME/bin:$PATH
export PATH source /etc/profile
4、然后啟動(dòng)zookeeper服務(wù)(三臺(tái)機(jī)器)
zookeeper集群成功搭建
kafka集群
1、解壓kafka二進(jìn)制包(3臺(tái)機(jī)器)
tar -zxvf kafka_2.12-2.6.0.tgz -C /usr/local/src/ ln -s /usr/local/src/kafka_2.12-2.6.0/ /usr/local/kafka1
node1節(jié)點(diǎn)操作
2、修改配置文件
vim /usr/local/kafka1/config/server.properties
配置文件如下
broker.id=1
listeners=PLAINTEXT://192.168.7.114:9092
advertised.listeners=PLAINTEXT://192.168.7.114:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
delete.topic.enable=true
log.dirs=/usr/local/kafka1/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.114:2181,192.168.7.115:2181,192.168.7.116:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
修改consumer.properties配置文件
bootstrap.servers=192.168.7.114:9092 group.id=test-consumer-group
修改producer.properties配置文件
bootstrap.servers=192.168.7.114:9092 compression.type=none
node2節(jié)點(diǎn)操作
vim /usr/local/kafka1/config/server.properties
配置文件如下
broker.id=2
listeners=PLAINTEXT://192.168.7.115:9092
advertised.listeners=PLAINTEXT://192.168.7.115:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
delete.topic.enable=true log.dirs=/usr/local/kafka2/
data num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.114:2181,192.168.7.115:2181,192.168.7.116:2181
zookeeper.connection.timeout.ms=18000 group.initial.rebalance.delay.ms=0
修改consumer.properties配置文件
bootstrap.servers=192.168.7.115:9092 group.id=test-consumer-group
修改producer.properties配置文件
bootstrap.servers=192.168.7.115:9092 compression.type=none
node3節(jié)點(diǎn)操作
vim /usr/local/kafka1/config/server.properties
配置文件如下
broker.id=3
listeners=PLAINTEXT://192.168.7.116:9092
advertised.listeners=PLAINTEXT://192.168.7.116:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
delete.topic.enable=true
log.dirs=/usr/local/kafka3/data
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.7.114:2181,192.168.7.115:2181,192.168.7.116:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
修改consumer.properties配置文件
bootstrap.servers=192.168.7.116:9092
group.id=test-consumer-group
修改producer.properties配置文件
bootstrap.servers=192.168.7.116:9092
compression.type=none
3、三臺(tái)機(jī)器啟動(dòng)kafka服務(wù)
./kafka-server-start.sh -daemon ../config/server.properties
查看是否配置成功,在隨意一臺(tái)節(jié)點(diǎn)創(chuàng)建一個(gè)topic,測(cè)試其他兩臺(tái)是否會(huì)同步topic
常用命令
啟動(dòng)kafka
./kafka-server-start.sh ../config/server.properties
停止kafka
./kafka-server-stop.sh
創(chuàng)建kafka
./kafka-topics.sh --create --zookeeper 192.168.7.114:2181 --replication-factor 3 --partitions 3 --topic test
列出所有的topic
./kafka-topics.sh -list -zookeeper 127.0.0.1:2181
查看topic分區(qū)副本詳細(xì)信息
./kafka-topics.sh --describe --zookeeper 192.168.7.114:2181 --topic test
kafka的消息所在的位置Topic、Partitions、Offsets三個(gè)因素決定。
kafka2種消費(fèi)模式
點(diǎn)對(duì)點(diǎn)模式:隊(duì)列中的一條消息由一個(gè)專(zhuān)門(mén)的消費(fèi)者進(jìn)行消費(fèi),消費(fèi)者受到這條消息并確認(rèn)后,隊(duì)列就會(huì)刪除這條信息,防止重復(fù)訪問(wèn)
發(fā)布/訂閱模式
生產(chǎn)者將數(shù)據(jù)推送入隊(duì)列,同一條消息會(huì)被所有消費(fèi)者消費(fèi),而消費(fèi)有兩種情況:
消費(fèi)者主動(dòng)拉取消息
隊(duì)列向消費(fèi)者推送信息