分布式消息隊列 分為生產者和消費者
scala寫的
集群部署:
1.安裝zookeeper
2.解壓kafka
3.修改配置文件 vi /conf/server.properties
#id,每臺機器id不一致 可用0,1,2
broker.id=0
#端口號
prot=9092
#host每臺機器對用自己的ip
host.name=ip1
#與上方保持一致
advertised.host.name=ip1
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/usr/local/myfile/kafka/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
#注:此處/kafka是將kafka數據全部存入zk中/kafka下,所以下方連接shell操作時也必須用zkip:2181/kafka才能正確操作
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
zookeeper.connection.timeout.ms=6000
4.啟動
每臺機器輸入
bin/kafka-server-start.sh config/server.properties
5.shell操作(如果上方配置在zk/kafka下,下方的連接zk全部要加/kafka)
查看當前服務器中的所有topic
bin/kafka-topics.sh --list --zookeeper zk01:2181(/kafka)
創(chuàng)建topic
bin/kafka-topics.sh --create --zookeeper zk01:2181 --replication-factor 1 --partitions 1 --topic test
刪除topic
sh bin/kafka-topics.sh --delete --zookeeper zk0 --topic test
需要server.properties中設置delete.topic.enable=true否則只是標記刪除或者直接重啟。
通過shell命令發(fā)送消息
kafka-console-producer.sh --broker-list kafka01:9092 --topic itheima
通過shell消費消息
sh bin/kafka-console-consumer.sh --zookeeper zk01:2181 --from-beginning --topic test1
查看消費位置
sh kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
查看某個Topic的詳情
sh kafka-topics.sh --topic test --describe --zookeeper zk01:2181
對分區(qū)數進行修改
kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic