創(chuàng)建docker nerwork
docker network create -d bridge mynetwork
docker compose安裝Kafka集群版
- 調(diào)整IP地址KAFKA_CFG_ADVERTISED_LISTENERS
- 調(diào)整volumes文件掛載地址
docker-compose.yml
version: "3"
services:
kafka1:
image: 'bitnami/kafka:3.3.1'
network_mode: mynetwork
container_name: kafka11
user: root
ports:
- 9192:9092
- 9193:9093
environment:
### 通用配置
# 允許使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制類請求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定義kafka服務端socket監(jiān)聽端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定義安全協(xié)議
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 使用Kafka時的集群id,集群內(nèi)的Kafka都要用這個id做初始化,生成一個UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
# 允許使用PLAINTEXT監(jiān)聽器,默認false,不建議在生產(chǎn)環(huán)境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 設置broker最大內(nèi)存,和初始內(nèi)存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允許自動創(chuàng)建主題
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
### broker配置
# 定義外網(wǎng)訪問地址(宿主機ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.201.68:9192
# broker.id,必須唯一
- KAFKA_BROKER_ID=1
volumes:
- /Users/wenyaobo/Src/kafka_data/kafka1/kafka/kraft:/bitnami/kafka
#extra_hosts:
#- "kafka1:云服務器IP"
#- "kafka2:云服務器IP"
#- "kafka3:云服務器IP"
kafka2:
image: 'bitnami/kafka:3.3.1'
network_mode: mynetwork
container_name: kafka22
user: root
ports:
- 9292:9092
- 9293:9093
environment:
### 通用配置
# 允許使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制類請求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定義kafka服務端socket監(jiān)聽端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定義安全協(xié)議
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 使用Kafka時的集群id,集群內(nèi)的Kafka都要用這個id做初始化,生成一個UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
# 允許使用PLAINTEXT監(jiān)聽器,默認false,不建議在生產(chǎn)環(huán)境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 設置broker最大內(nèi)存,和初始內(nèi)存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允許自動創(chuàng)建主題
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
### broker配置
# 定義外網(wǎng)訪問地址(宿主機ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.201.68:9292
# broker.id,必須唯一
- KAFKA_BROKER_ID=2
volumes:
- /Users/wenyaobo/Src/kafka_data/kafka2/kafka/kraft:/bitnami/kafka
kafka3:
image: 'bitnami/kafka:3.3.1'
network_mode: mynetwork
container_name: kafka33
user: root
ports:
- 9392:9092
- 9393:9093
environment:
### 通用配置
# 允許使用kraft,即Kafka替代Zookeeper
- KAFKA_ENABLE_KRAFT=yes
# kafka角色,做broker,也要做controller
- KAFKA_CFG_PROCESS_ROLES=broker,controller
# 指定供外部使用的控制類請求信息
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
# 定義kafka服務端socket監(jiān)聽端口
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
# 定義安全協(xié)議
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 使用Kafka時的集群id,集群內(nèi)的Kafka都要用這個id做初始化,生成一個UUID即可
- KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
# 集群地址
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@kafka11:9093,2@kafka22:9093,3@kafka33:9093
# 允許使用PLAINTEXT監(jiān)聽器,默認false,不建議在生產(chǎn)環(huán)境使用
- ALLOW_PLAINTEXT_LISTENER=yes
# 設置broker最大內(nèi)存,和初始內(nèi)存
- KAFKA_HEAP_OPTS=-Xmx512M -Xms256M
# 不允許自動創(chuàng)建主題
- KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
### broker配置
# 定義外網(wǎng)訪問地址(宿主機ip地址和端口)
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://10.10.201.68:9392
# broker.id,必須唯一
- KAFKA_BROKER_ID=3
volumes:
- /Users/wenyaobo/Src/kafka_data/kafka3/kafka/kraft:/bitnami/kafka
docker-compose.yml文件目錄下執(zhí)行
docker-compose -f docker-compose.yml up
docker compose安裝Kafka可視化頁面
version: "3"
services:
kafka-ui:
image: provectuslabs/kafka-ui:latest
network_mode: mynetwork
container_name: kafka-ui
restart: always
ports:
- 9091:8080
volumes:
- /Users/wenyaobo/Src/kafka_ui/etc/localtime:/etc/localtime
environment:
# 集群名稱
- KAFKA_CLUSTERS_0_NAME=local
# 集群地址
- KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka11:9092,kafka22:9092,kafka33:9092
docker-compose.yml文件目錄下執(zhí)行
docker-compose -f docker-compose.yml up
kafka操作
#進入容器
docker exec -it kafka11 /bin/bash
#進入目錄
cd /opt/bitnami/kafka/bin/
#創(chuàng)建topic
kafka-topics.sh --create --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092 --replication-factor 3 --partitions 3 --topic test
Created topic test.
#查看所有Topic
kafka-topics.sh --list --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092
test
#查看topic詳情
kafka-topics.sh --describe --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092 --topic test
Topic: test TopicId: yiKjk9VTTZqVolLOEbZrbw PartitionCount: 3 ReplicationFactor: 1 Configs: min.insync.replicas=1,cleanup.policy=delete,retention.ms=86400000,retention.bytes=-1
Topic: test Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
Topic: test Partition: 2 Leader: 2 Replicas: 2 Isr: 2
# 啟動一個生產(chǎn)者(輸入消息)
kafka-console-producer.sh --broker-list kafka11:9092,kafka22:9092,kafka33:9092 --topic test
[等待輸入自己的內(nèi)容 出現(xiàn)>輸入即可]
>i am a new msg !
>i am a good msg ?
# 啟動一個消費者(等待消息)
# 注意這里的--from-beginning,每次都會從頭開始讀取,你可以嘗試去掉和不去掉看下效果
kafka-console-consumer.sh --bootstrap-server kafka11:9092,kafka22:9092,kafka33:9092 --topic test --from-beginning
[等待消息]
i am a new msg !
i am a good msg ?