消息中間件:Kafka

(1)Zookeeper安裝

https://downloads.apache.org/zookeeper/

(1)更新系統(tǒng)的包管理器

sudo yum update

(2)安裝JDK

sudo yum install java-1.8.0-openjdk-devel

(3)下載ZooKeeper

cd /usr/local/

wget https://downloads.apache.org/zookeeper/zookeeper-3.7.1/apache-zookeeper-3.7.1-bin.tar.gz

(4)解壓ZooKeeper

tar -xvf apache-zookeeper-3.7.1-bin.tar.gz

(5)重命名為”zookeeper”

mv apache-zookeeper-3.7.1-bin zookeeper

(6)創(chuàng)建ZooKeeper數(shù)據(jù)目錄

mkdir /usr/local/zookeeper/data

mkdir /usr/local/zookeeper/logs

(7)創(chuàng)建ZooKeeper配置文件:

ZooKeeper的滴答時(shí)間(以毫秒為單位)、ZooKeeper存儲(chǔ)數(shù)據(jù)的數(shù)據(jù)目錄、ZooKeeper監(jiān)聽的客戶端端口

vim /usr/local/zookeeper/conf/zoo.cfg

tickTime=2000

dataDir=/usr/local/zookeeper/data

dataLogDir=/usr/local/zookeeper/logs

clientPort=2181

(8)啟動(dòng)ZooKeeper

權(quán)限不足解決方案:su root、chmod a+xwr zkServer.sh

/usr/local/zookeeper/bin/zkServer.sh start

(9)使用如下命令檢查ZooKeeper是否正在運(yùn)行:

/usr/local/zookeeper/bin/zkServer.sh status

(2)Kafka安裝

https://kafka.apache.org/

權(quán)限不足解決方案:

chmod a+xwr kafka-topics.sh

chmod a+xwr kafka-console-producer.sh

chmod a+xwr kafka-console-consumer.sh

chmod a+xwr kafka-consumer-groups.sh

(1)安裝Kafka

cd /usr/local/

tar -zxvf /usr/local/kafka_2.11-2.4.0.tgz

mv kafka_2.11-2.4.0 kafka

(2)配置Kafka

vim /usr/local/kafka/config/server.properties

broker.id=0

listeners=PLAINTEXT://192.168.19.131:9092

log.dirs=/usr/local/kafka/data/kafka-logs

zookeeper.connect=192.168.19.131:2181

(3)啟動(dòng)Kafka

cd /usr/local/kafka/bin

./kafka-server-start.sh -daemon ../config/server.properties

ps -aux | grep server.properties

(4)使用Kafka

創(chuàng)建主題

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 1 --topic test

查看主題

./kafka-topics.sh --list --zookeeper 192.168.19.131:2181

發(fā)送消息

./kafka-console-producer.sh --broker-list 192.168.19.131:9092 --topic test

接收消息方式一:從最后一條消息的偏移量+1開始消費(fèi)

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --topic test

接收消息方式二:從頭開始消費(fèi)

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --from-beginning --topic test

(5)單播消息:一個(gè)消費(fèi)組只有一個(gè)消費(fèi)者能消費(fèi)

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

(6)多播消息:不同的消費(fèi)者處于不同的消費(fèi)組

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group1 --topic test

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092 --consumer-property group.id=group2 --topic test

(7)查看消費(fèi)組

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --list

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group1

./kafka-consumer-groups.sh --bootstrap-server 192.168.19.131:9092 --describe --group group2

GROUP? ? ? ? ? TOPIC? ? ? ? ? PARTITION? CURRENT-OFFSET? LOG-END-OFFSET? LAG? ? ? ? ? ? CONSUMER-ID? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? HOST? ? ? ? ? ? CLIENT-ID

group1? ? ? ? ? test? ? ? ? ? ? 0? ? ? ? ? 22? ? ? ? ? ? ? 22? ? ? ? ? ? ? 0? ? ? ? ? ? ? consumer-group1

Current-offset:當(dāng)前消費(fèi)組已經(jīng)消費(fèi)的偏移量

Log-end-offset:主題對(duì)應(yīng)分區(qū)消息的結(jié)束偏移量(HW)

Lag:當(dāng)前消費(fèi)組未消費(fèi)的消息數(shù)

(8)主題分區(qū)

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 1 --partitions 2 --topic topic1

./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic1

cd /usr/local/kafka/data/kafka-logs/topic1-0

cd /usr/local/kafka/data/kafka-logs/topic1-1

說明:定期將自己消費(fèi)分區(qū)的offset提交給kafka內(nèi)部topic、key是consumerGroupId+topic+分區(qū)、value是當(dāng)前offset值

說明:kafka會(huì)定期清理topic里的消息、默認(rèn)保存7天、7天后消息會(huì)被刪除

說明:通過此公式可以選出consumer消費(fèi)的offset要提交到哪個(gè)分區(qū):hash(consumerGroupId)%__consumer_offsets主題分區(qū)數(shù)

__consumer_offsets-0

__consumer_offsets-49

(3)Kafka集群

(1)Kafka集群、3個(gè)broker

3個(gè)server.properties

vim server0.properties

broker.id=0

listeners=PLAINTEXT://192.168.19.131:9092

log.dirs=/usr/local/kafka/data/kafka-logs-0

vim server1.properties

broker.id=1

listeners=PLAINTEXT://192.168.19.131:9093

log.dirs=/usr/local/kafka/data/kafka-logs-1

vim server2.properties

broker.id=2

listeners=PLAINTEXT://192.168.19.131:9094

log.dirs=/usr/local/kafka/data/kafka-logs-2

./kafka-server-start.sh -daemon ../config/server0.properties

./kafka-server-start.sh -daemon ../config/server1.properties

./kafka-server-start.sh -daemon ../config/server2.properties

(2)副本:1個(gè)主題、2個(gè)分區(qū)、3個(gè)副本

./kafka-topics.sh --create --zookeeper 192.168.19.131:2181 --replication-factor 3 --partitions 2 --topic topic2

./kafka-topics.sh --describe --zookeeper 192.168.19.131:2181 --topic topic2

Topic: topic2 PartitionCount: 2 ReplicationFactor: 3 Configs:

Topic: topic2 Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1

Topic: topic2 Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2

Leader:

寫和讀的操作都在Leader上、Leader負(fù)責(zé)把數(shù)據(jù)同步到follower、當(dāng)leader掛了、經(jīng)過主從選舉、從多個(gè)follower中選舉產(chǎn)生一個(gè)新Leader

Follower:

接收leader的同步的數(shù)據(jù)

Isr:

可以同步的broker節(jié)點(diǎn)和已同步的broker節(jié)點(diǎn)、存放在isr集合中、如果isr節(jié)點(diǎn)中的性能較差、會(huì)被踢出isr集合

總結(jié):broker、主題、分區(qū)、副本

[root@web-server data]# ls

kafka-logs? kafka-logs-0? kafka-logs-1? kafka-logs-2

[root@web-server kafka-logs-1]# ls

cleaner-offset-checkpoint? log-start-offset-checkpoint? meta.properties? recovery-point-offset-checkpoint? replication-offset-checkpoint? topic2-0? topic2-1

[root@web-server kafka-logs-2]# ls

cleaner-offset-checkpoint? log-start-offset-checkpoint? meta.properties? recovery-point-offset-checkpoint? replication-offset-checkpoint? topic2-0? topic2-1

(3)Kafka集群消息的發(fā)送

./kafka-console-producer.sh --broker-list 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --topic topic2

(4)Kafka集群消息的發(fā)送

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group1 --from-beginning --topic topic2

./kafka-console-consumer.sh --bootstrap-server 192.168.19.131:9092,192.168.19.131:9093,192.168.19.131:9094 --consumer-property group.id=group2 --from-beginning --topic topic2

難點(diǎn):一個(gè)Partition只能被一個(gè)組中的一個(gè)Consumer消費(fèi)、一個(gè)Consumer可以消費(fèi)多個(gè)Partition。

注意:Kafka只在Partition分區(qū)的范圍內(nèi)保證消息消費(fèi)的局部順序性、不能在同一個(gè)topic主題中的多個(gè)Partition中保證總的消費(fèi)順序性。

(4)Kafka-eagle監(jiān)控

(1)安裝JDK

yum install java-1.8.0-openjdk-devel

/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64

java -version

(2)解壓

tar -zxvf kafka-eagle-bin-3.0.1.tar.gz

tar -zxvf efak-web-3.0.1-bin.tar.gz

mv efak-web-3.0.1 efak-web

mv efak-web ../

cd /usr/local/efak-web

(3)配置環(huán)境變量

vim /etc/profile

export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.372.b07-1.el7_9.x86_64

export KE_HOME=/usr/local/efak-web

export PATH=$PATH:$JAVA_HOME/bin:$KE_HOME/bin

source /etc/profile

(4)kafka-eagle內(nèi)部配置問

vim /usr/local/efak-web/conf/system-config.properties

efak.zk.cluster.alias=cluster1

cluster1.zk.list=192.168.19.131:2181

efak.driver=com.mysql.cj.jdbc.Driver

efak.url=jdbc:mysql://192.168.3.53:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull

efak.username=root

efak.password=root

(5)啟動(dòng)

./ke.sh start

http://192.168.19.131:8048

admin/123456

windows版本啟動(dòng)問題解決方案:

例如:如果原來希望輸入的命令為:

C:\Program Files\Java\jdk-11.0.12\bin\java.exe -jar xxx.jar

1

現(xiàn)在應(yīng)改為:

"C:\Program Files\Java\jdk-11.0.12\bin\java.exe" -jar xxx.jar

(5)案例:zookeeper+kafka

一、安裝docker

1、Docker 要求 CentOS 系統(tǒng)的內(nèi)核版本高于 3.10 ,查看本頁面的前提條件來驗(yàn)證你的CentOS 版本是否支持 Docker 。

通過 uname -r 命令查看你當(dāng)前的內(nèi)核版本

$ uname -r

2、使用 root 權(quán)限登錄 Centos。確保 yum 包更新到最新。

$ sudo yum update

3、卸載舊版本(如果安裝過舊版本的話)

$ sudo yum remove docker? docker-common docker-selinux docker-engine

4、安裝需要的軟件包, yum-util 提供yum-config-manager功能,另外兩個(gè)是devicemapper驅(qū)動(dòng)依賴的

$ sudo yum install -y yum-utils device-mapper-persistent-data lvm2

5、設(shè)置yum源

$ sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo

6、可以查看所有倉庫中所有docker版本,并選擇特定版本安裝

$ yum list docker-ce --showduplicates | sort -r

7、安裝docker

#$ sudo yum install docker-ce? #沒有版本默認(rèn)安裝最新版本、由于repo中默認(rèn)只開啟stable倉庫,故這里安裝的是最新穩(wěn)定版17.12.0

$ sudo yum install <FQPN>? # 例如:sudo yum install docker-ce-17.12.0.ce

8、啟動(dòng)并加入開機(jī)啟動(dòng)

$ sudo systemctl start docker

$ sudo systemctl enable docker

9、驗(yàn)證安裝是否成功(有client和service兩部分表示docker安裝啟動(dòng)都成功了)

$ docker version

##############################################################################################

拉取zookeeker

docker pull wurstmeister/zookeeper

拉取kafka版本為2.12-2.2.0,不填寫版本好則安裝最新,但是個(gè)別系統(tǒng)會(huì)報(bào)錯(cuò)

docker pull wurstmeister/kafka:2.12-2.2.0

啟動(dòng)zookeeper

docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper

啟動(dòng)kafka

docker run --name kafka01 \

-p 9092:9092 \

-e KAFKA_BROKER_ID=0 \

-e KAFKA_ZOOKEEPER_CONNECT=127.0.0.1:2181 \

-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 \

-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \

-d? wurstmeister/kafka

進(jìn)入Kafka容器類kafka01是容器名稱,也可以填寫成容器ID

docker exec -it kafka01 /bin/bash

創(chuàng)建my_log topic

/opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.3.191:2181 --replication-factor 1 --partitions 1 --topic my_log

查詢創(chuàng)建的主題

/opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.3.191:2181

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容