2022-03-21kafka安裝部署

1、下載http://kafka.apache.org/downloads.html

2、解壓到/usr/local中

3、啟動(dòng)服務(wù)zookeeper和kafka

./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &

./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &

4、創(chuàng)建主題topic

./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test

查看創(chuàng)建好的主題

./kafka-topics.sh --list --zookeeper localhost:2181

刪除主題

默認(rèn)情況下Kafka的Topic是沒法直接刪除的,需要進(jìn)行相關(guān)參數(shù)配置

bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181

方式一:通過delete命令刪除后,手動(dòng)將本地磁盤以及zk上的相關(guān)topic的信息刪除即可

方式二:配置server.properties文件,給定參數(shù)delete.topic.enable=true,重啟kafka服務(wù),此時(shí)執(zhí)行delete命令表示允許進(jìn)行Topic的刪除

5、收發(fā)消息

生產(chǎn)消息

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

消費(fèi)消息

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

6、設(shè)置多個(gè)broker

??????????? 首先為每個(gè)broker創(chuàng)建一個(gè)配置文件:

cp config/server.properties config/server-1.properties

cp config/server.properties config/server-2.properties

vim config/server.properties?

config/server1.properties:

broker.id=0

listeners=PLAINTEXT://192.168.10.130:9092

log.dirs=kafka-logs

zookeeper.connect=localhost:2181

config/server-1.properties:

? ? broker.id=1

listeners=PLAINTEXT://192.168.10.130:9093

log.dirs=kafka-logs-1

zookeeper.connect=localhost:2181

config/server-2.properties:

? ? broker.id=2

listeners=PLAINTEXT://192.168.10.130:9094

log.dirs=kafka-logs-2

zookeeper.connect=localhost:2181

??? 備注1:listeners一定要配置成為IP地址;如果配置為localhost或服務(wù)器的hostname,在使用java發(fā)送數(shù)據(jù)時(shí)就會(huì)拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因?yàn)樵跊]有配置advertised.host.name 的情況下,Kafka并沒有像官方文檔宣稱的那樣改為廣播我們配置的host.name,而是廣播了主機(jī)配置的hostname。遠(yuǎn)端的客戶端并沒有配置 hosts,所以自然是連接不上這個(gè)hostname的。

??? 備注2:當(dāng)使用java客戶端訪問遠(yuǎn)程的kafka時(shí),一定要把集群中所有的端口打開,否則會(huì)連接超時(shí)

??? broker.id是集群中每個(gè)節(jié)點(diǎn)的唯一且永久的名稱,我們修改端口和日志目錄是因?yàn)槲覀儸F(xiàn)在在同一臺(tái)機(jī)器上運(yùn)行,我們要防止broker在同一端口上注冊(cè)和覆蓋對(duì)方的數(shù)據(jù)。

??? 我們已經(jīng)運(yùn)行了zookeeper和剛才的一個(gè)kafka節(jié)點(diǎn),所有我們只需要在啟動(dòng)2個(gè)新的kafka節(jié)點(diǎn)。

./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 &

./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &

?現(xiàn)在,我們創(chuàng)建一個(gè)新topic,把備份設(shè)置為:3

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

?好了,現(xiàn)在我們已經(jīng)有了一個(gè)集群了,我們?cè)趺粗烂總€(gè)集群在做什么呢?運(yùn)行命令“describe topics”

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

//所有分區(qū)的摘要

Topic:my-replicated-topic? ? PartitionCount:1? ? ReplicationFactor:3? ? Configs:

//提供一個(gè)分區(qū)信息,因?yàn)槲覀冎挥幸粋€(gè)分區(qū),所以只有一行。

Topic: my-replicated-topic? ? Partition: 0? ? Leader: 1? ? Replicas: 1,2,0? ? Isr: 1,2,0

“l(fā)eader”:該節(jié)點(diǎn)負(fù)責(zé)該分區(qū)的所有的讀和寫,每個(gè)節(jié)點(diǎn)的leader都是隨機(jī)選擇的。

“replicas”:備份的節(jié)點(diǎn)列表,無論該節(jié)點(diǎn)是否是leader或者目前是否還活著,只是顯示。

“isr”:“同步備份”的節(jié)點(diǎn)列表,也就是活著的節(jié)點(diǎn)并且正在同步leader

其中Replicas和Isr中的1,2,0就對(duì)應(yīng)著3個(gè)broker他們的broker.id屬性!

我們運(yùn)行這個(gè)命令,看看一開始我們創(chuàng)建的那個(gè)節(jié)點(diǎn):

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

Topic:test? ? PartitionCount:1? ? ReplicationFactor:1? ? Configs:

Topic: test? ? Partition: 0? ? Leader: 0? ? Replicas: 0? ? Isr: 0

這并不奇怪,剛才創(chuàng)建的主題沒有Replicas,并且在服務(wù)器“0”上,我們創(chuàng)建它的時(shí)候,集群中只有一個(gè)服務(wù)器,所以是“0”。

7、測(cè)試集群的容錯(cuò)能力

7.1發(fā)布消息到集群

[root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic

>cluster message 1

>cluster message 2

//Ctrl+C終止產(chǎn)生消息

7.2消費(fèi)消息

[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topic

cluster message 1

cluster message 2

//Ctrl+C終止消費(fèi)消息

7.3干掉leader,測(cè)試集群容錯(cuò)

首先查詢誰是leader

> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

//所有分區(qū)的摘要

Topic:my-replicated-topic? ? PartitionCount:1? ? ReplicationFactor:3? ? Configs:

//提供一個(gè)分區(qū)信息,因?yàn)槲覀冎挥幸粋€(gè)分區(qū),所以只有一行。

Topic: my-replicated-topic? ? Partition: 0? ? Leader: 1? ? Replicas: 1,2,0? ? Isr: 1,2,0

可以看到Leader的broker.id為1,找到對(duì)應(yīng)的Broker

[root@administrator bin]# jps -m

5130 Kafka ../config/server.properties

4861 QuorumPeerMain ../config/zookeeper.properties

1231 Bootstrap start start

7420 Kafka ../config/server-2.properties

7111 Kafka ../config/server-1.properties

9139 Jps -m

通過以上查詢到Leader的PID(Kafka ../config/server-1.properties)為7111,殺掉該進(jìn)程

//殺掉該進(jìn)程

kill -9 7111

//再查詢一下,確認(rèn)新的Leader已經(jīng)產(chǎn)生,新的Leader為broker.id=0

[root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic? ? ? PartitionCount:1? ? ? ? ReplicationFactor:3? ? Configs:

//備份節(jié)點(diǎn)之一成為新的leader,而broker1已經(jīng)不在同步備份集合里了

Topic: my-replicated-topic? ? ? Partition: 0? ? Leader: 0? ? ? Replicas: 1,0,2 Isr: 0,2

7.4再次消費(fèi)消息,確認(rèn)消息沒有丟失

[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic

cluster message 1

cluster message 2

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容