
(1)Zookeeper安裝
https://downloads.apache.org/zookeeper/
(1)更新系統(tǒng)的包管理器
sudo yum update
(2)安裝JDK
sudo yum install java-1.8.0-openjdk-devel
(3)下載ZooKeeper
cd /usr/local/
wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz
(4)解壓ZooKeeper
tar -xvf apache-zookeeper-3.7.1-bin.tar.gz
(5)重命名為”zookeeper”
mv apache-zookeeper-3.7.1-bin zookeeper
(6)創(chuàng)建ZooKeeper數(shù)據(jù)目錄
mkdir /usr/local/zookeeper/data
mkdir /usr/local/zookeeper/logs
(7)創(chuàng)建ZooKeeper配置文件:
ZooKeeper的滴答時(shí)間(以毫秒為單位)、ZooKeeper存儲(chǔ)數(shù)據(jù)的數(shù)據(jù)目錄、ZooKeeper監(jiān)聽的客戶端端口
vim /usr/local/zookeeper/conf/zoo.cfg
tickTime=2000
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/logs
clientPort=2181
(8)啟動(dòng)ZooKeeper
權(quán)限不足解決方案:su root、chmod a+xwr zkServer.sh
/usr/local/zookeeper/bin/zkServer.sh start
(9)使用如下命令檢查ZooKeeper是否正在運(yùn)行:
/usr/local/zookeeper/bin/zkServer.sh status
(2)Kafka安裝
https://kafka.apache.org/
權(quán)限不足解決方案:
chmod a+xwr kafka-topics.sh
chmod a+xwr kafka-console-producer.sh
chmod a+xwr kafka-console-consumer.sh
chmod a+xwr kafka-consumer-groups.sh
(1)安裝Kafka
cd /usr/local/
tar -zxvf /usr/local/kafka_2.11-2.4.0.tgz
mv kafka_2.11-2.4.0 kafka
(2)配置Kafka
vim /usr/local/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.131:9092
log.dirs=/usr/local/kafka/data/kafka-logs
zookeeper.connect=192.168.19.131:2181
(3)啟動(dòng)Kafka
cd /usr/local/kafka/bin
./kafka-server-start.sh -daemon ../config/server.properties
ps -aux | grep server.properties
(4)使用Kafka
創(chuàng)建主題
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 1 --topic test
查看主題
./kafka-topics.sh --list --zookeeper 192.168.19.131:2181
發(fā)送消息
./kafka-console-producer.sh --broker-list 192.168.19.131:9092 --topic test
接收消息方式一:從最后一條消息的偏移量+1開始消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --topic test
接收消息方式二:從頭開始消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --from-beginning --topic test
(5)單播消息:一個(gè)消費(fèi)組只有一個(gè)消費(fèi)者能消費(fèi)
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
(6)多播消息:不同的消費(fèi)者處于不同的消費(fèi)組
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group2 --topic test
(7)查看消費(fèi)組
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --list
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group1
./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group2
GROUP? ? ? ? ? TOPIC? ? ? ? ? PARTITION? CURRENT-OFFSET? LOG-END-OFFSET? LAG? ? ? ? ? ? CONSUMER-ID? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? HOST? ? ? ? ? ? CLIENT-ID
group1? ? ? ? ? test? ? ? ? ? ? 0? ? ? ? ? 22? ? ? ? ? ? ? 22? ? ? ? ? ? ? 0? ? ? ? ? ? ? consumer-group1
Current-offset:當(dāng)前消費(fèi)組已經(jīng)消費(fèi)的偏移量
Log-end-offset:主題對(duì)應(yīng)分區(qū)消息的結(jié)束偏移量(HW)
Lag:當(dāng)前消費(fèi)組未消費(fèi)的消息數(shù)
(8)主題分區(qū)
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 2 --topic topic1
./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic1
cd /usr/local/kafka/data/kafka-logs/topic1-0
cd /usr/local/kafka/data/kafka-logs/topic1-1
說明:定期將自己消費(fèi)分區(qū)的offset提交給kafka內(nèi)部topic、key是consumerGroupId+topic+分區(qū)、value是當(dāng)前offset值
說明:kafka會(huì)定期清理topic里的消息、默認(rèn)保存7天、7天后消息會(huì)被刪除
說明:通過此公式可以選出consumer消費(fèi)的offset要提交到哪個(gè)分區(qū):hash(consumerGroupId)%__consumer_offsets主題分區(qū)數(shù)
__consumer_offsets-0
__consumer_offsets-49
(3)Kafka集群
(1)Kafka集群、3個(gè)broker
3個(gè)server.properties
vim server0.properties
broker.id=0
listeners=PLAINTEXT://192.168.19.131:9092
log.dirs=/usr/local/kafka/data/kafka-logs-0
vim server1.properties
broker.id=1
listeners=PLAINTEXT://192.168.19.131:9093
log.dirs=/usr/local/kafka/data/kafka-logs-1
vim server2.properties
broker.id=2
listeners=PLAINTEXT://192.168.19.131:9094
log.dirs=/usr/local/kafka/data/kafka-logs-2
./kafka-server-start.sh -daemon ../config/server0.properties
./kafka-server-start.sh -daemon ../config/server1.properties
./kafka-server-start.sh -daemon ../config/server2.properties
(2)副本:1個(gè)主題、2個(gè)分區(qū)、3個(gè)副本
./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 3 --partitions 2 --topic topic2
./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic2
Topic: topic2 PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: topic2 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: topic2 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Leader:
寫和讀的操作都在Leader上、Leader負(fù)責(zé)把數(shù)據(jù)同步到follower、當(dāng)leader掛了、經(jīng)過主從選舉、從多個(gè)follower中選舉產(chǎn)生一個(gè)新Leader
Follower:
接收leader的同步的數(shù)據(jù)
Isr:
可以同步的broker節(jié)點(diǎn)和已同步的broker節(jié)點(diǎn)、存放在isr集合中、如果isr節(jié)點(diǎn)中的性能較差、會(huì)被踢出isr集合
總結(jié):broker、主題、分區(qū)、副本
[root@web-server data]# ls
kafka-logs? kafka-logs-0? kafka-logs-1? kafka-logs-2
[root@web-server kafka-logs-1]# ls
cleaner-offset-checkpoint? log-start-offset-checkpoint? meta.properties? recovery-point-offset-checkpoint? replication-offset-checkpoint? topic2-0? topic2-1
[root@web-server kafka-logs-2]# ls
cleaner-offset-checkpoint? log-start-offset-checkpoint? meta.properties? recovery-point-offset-checkpoint? replication-offset-checkpoint? topic2-0? topic2-1
(3)Kafka集群消息的發(fā)送
./kafka-console-producer.sh --broker-list 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --topic topic2
(4)Kafka集群消息的發(fā)送
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group1 --from-beginning --topic topic2
./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group2 --from-beginning --topic topic2
難點(diǎn):一個(gè)Partition只能被一個(gè)組中的一個(gè)Consumer消費(fèi)、一個(gè)Consumer可以消費(fèi)多個(gè)Partition。
注意:Kafka只在Partition分區(qū)的范圍內(nèi)保證消息消費(fèi)的局部順序性、不能在同一個(gè)topic主題中的多個(gè)Partition中保證總的消費(fèi)順序性。
(4)Kafka-eagle監(jiān)控
(1)安裝JDK
yum install java-1.8.0-openjdk-devel
/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
java -version
(2)解壓
tar -zxvf kafka-eagle-bin-3.0.1.tar.gz
tar -zxvf efak-web-3.0.1-bin.tar.gz
mv efak-web-3.0.1 efak-web
mv efak-web ../
cd /usr/local/efak-web
(3)配置環(huán)境變量
vim /etc/profile
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64
export KE_HOME=/usr/local/efak-web
export PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin
source /etc/profile
(4)kafka-eagle內(nèi)部配置問
vim /usr/local/efak-web/conf/system-config.properties
efak.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.19.131:2181
efak.driver=com.mysql.cj.jdbc.Driver
efak.url=jdbc:mysql://192.168.3.53:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=root
(5)啟動(dòng)
./ke.sh start
http://192.168.19.131:8048
admin/123456
windows版本啟動(dòng)問題解決方案:
例如:如果原來希望輸入的命令為:
C:\Program Files\Java\jdk-11.0.12\bin\java.exe -jar xxx.jar
1
現(xiàn)在應(yīng)改為:
"C:\Program Files\Java\jdk-11.0.12\bin\java.exe" -jar xxx.jar
(5)案例:zookeeper+kafka
一、安裝docker
1、Docker 要求 CentOS 系統(tǒng)的內(nèi)核版本高于 3.10 ,查看本頁面的前提條件來驗(yàn)證你的CentOS 版本是否支持 Docker 。
通過 uname -r 命令查看你當(dāng)前的內(nèi)核版本
$ uname -r
2、使用 root 權(quán)限登錄 Centos。確保 yum 包更新到最新。
$ sudo yum update
3、卸載舊版本(如果安裝過舊版本的話)
$ sudo yum remove docker? docker-common docker-selinux docker-engine
4、安裝需要的軟件包, yum-util 提供yum-config-manager功能,另外兩個(gè)是devicemapper驅(qū)動(dòng)依賴的
$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
5、設(shè)置yum源
$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
6、可以查看所有倉庫中所有docker版本,并選擇特定版本安裝
$ yum list docker-ce --showduplicates | sort -r
7、安裝docker
#$ sudo yum install docker-ce? #沒有版本默認(rèn)安裝最新版本、由于repo中默認(rèn)只開啟stable倉庫,故這里安裝的是最新穩(wěn)定版17.12.0
$ sudo yum install <FQPN>? # 例如:sudo yum install docker-ce-17.12.0.ce
8、啟動(dòng)并加入開機(jī)啟動(dòng)
$ sudo systemctl start docker
$ sudo systemctl enable docker
9、驗(yàn)證安裝是否成功(有client和service兩部分表示docker安裝啟動(dòng)都成功了)
$ docker version
##############################################################################################
拉取zookeeker
docker pull wurstmeister/zookeeper
拉取kafka版本為2.12-2.2.0,不填寫版本好則安裝最新,但是個(gè)別系統(tǒng)會(huì)報(bào)錯(cuò)
docker pull wurstmeister/kafka:2.12-2.2.0
啟動(dòng)zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
啟動(dòng)kafka
docker run --name kafka01 \
-p 9092:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
-d? wurstmeister/kafka
進(jìn)入Kafka容器類kafka01是容器名稱,也可以填寫成容器ID
docker exec -it kafka01 /bin/bash
創(chuàng)建my_log topic
/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.3.191:2181 --replication-factor 1 --partitions 1 --topic my_log
查詢創(chuàng)建的主題
/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.191:2181