1、下載http://kafka.apache.org/downloads.html
2、解壓到/usr/local中
3、啟動(dòng)服務(wù)zookeeper和kafka
./zookeeper-server-start.sh ../config/zookeeper.properties 1>/dev/null 2>&1 &
./kafka-server-start.sh ../config/server.properties 1>/dev/null 2>&1 &
4、創(chuàng)建主題topic
./kafka-topics.sh --create --zookeeper localhost:2181 --config max.message.bytes=12800000 --config flush.messages=1 --replication-factor 1 --partitions 1 --topic test
查看創(chuàng)建好的主題
./kafka-topics.sh --list --zookeeper localhost:2181
刪除主題
默認(rèn)情況下Kafka的Topic是沒法直接刪除的,需要進(jìn)行相關(guān)參數(shù)配置
bin/kafka-topics.sh --delete --topic test0 --zookeeper 192.168.187.146:2181
方式一:通過delete命令刪除后,手動(dòng)將本地磁盤以及zk上的相關(guān)topic的信息刪除即可
方式二:配置server.properties文件,給定參數(shù)delete.topic.enable=true,重啟kafka服務(wù),此時(shí)執(zhí)行delete命令表示允許進(jìn)行Topic的刪除
5、收發(fā)消息
生產(chǎn)消息
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
消費(fèi)消息
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
6、設(shè)置多個(gè)broker
??????????? 首先為每個(gè)broker創(chuàng)建一個(gè)配置文件:
cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties
vim config/server.properties?
config/server1.properties:
broker.id=0
listeners=PLAINTEXT://192.168.10.130:9092
log.dirs=kafka-logs
zookeeper.connect=localhost:2181
config/server-1.properties:
? ? broker.id=1
listeners=PLAINTEXT://192.168.10.130:9093
log.dirs=kafka-logs-1
zookeeper.connect=localhost:2181
config/server-2.properties:
? ? broker.id=2
listeners=PLAINTEXT://192.168.10.130:9094
log.dirs=kafka-logs-2
zookeeper.connect=localhost:2181
??? 備注1:listeners一定要配置成為IP地址;如果配置為localhost或服務(wù)器的hostname,在使用java發(fā)送數(shù)據(jù)時(shí)就會(huì)拋出異 常:org.apache.kafka.common.errors.TimeoutException: Batch Expired 。因?yàn)樵跊]有配置advertised.host.name 的情況下,Kafka并沒有像官方文檔宣稱的那樣改為廣播我們配置的host.name,而是廣播了主機(jī)配置的hostname。遠(yuǎn)端的客戶端并沒有配置 hosts,所以自然是連接不上這個(gè)hostname的。
??? 備注2:當(dāng)使用java客戶端訪問遠(yuǎn)程的kafka時(shí),一定要把集群中所有的端口打開,否則會(huì)連接超時(shí)
??? broker.id是集群中每個(gè)節(jié)點(diǎn)的唯一且永久的名稱,我們修改端口和日志目錄是因?yàn)槲覀儸F(xiàn)在在同一臺(tái)機(jī)器上運(yùn)行,我們要防止broker在同一端口上注冊(cè)和覆蓋對(duì)方的數(shù)據(jù)。
??? 我們已經(jīng)運(yùn)行了zookeeper和剛才的一個(gè)kafka節(jié)點(diǎn),所有我們只需要在啟動(dòng)2個(gè)新的kafka節(jié)點(diǎn)。
./kafka-server-start.sh ../config/server-1.properties 1>/dev/null 2>&1 &
./kafka-server-start.sh ../config/server-2.properties 1>/dev/null 2>&1 &
?現(xiàn)在,我們創(chuàng)建一個(gè)新topic,把備份設(shè)置為:3
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
?好了,現(xiàn)在我們已經(jīng)有了一個(gè)集群了,我們?cè)趺粗烂總€(gè)集群在做什么呢?運(yùn)行命令“describe topics”
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分區(qū)的摘要
Topic:my-replicated-topic? ? PartitionCount:1? ? ReplicationFactor:3? ? Configs:
//提供一個(gè)分區(qū)信息,因?yàn)槲覀冎挥幸粋€(gè)分區(qū),所以只有一行。
Topic: my-replicated-topic? ? Partition: 0? ? Leader: 1? ? Replicas: 1,2,0? ? Isr: 1,2,0
“l(fā)eader”:該節(jié)點(diǎn)負(fù)責(zé)該分區(qū)的所有的讀和寫,每個(gè)節(jié)點(diǎn)的leader都是隨機(jī)選擇的。
“replicas”:備份的節(jié)點(diǎn)列表,無論該節(jié)點(diǎn)是否是leader或者目前是否還活著,只是顯示。
“isr”:“同步備份”的節(jié)點(diǎn)列表,也就是活著的節(jié)點(diǎn)并且正在同步leader
其中Replicas和Isr中的1,2,0就對(duì)應(yīng)著3個(gè)broker他們的broker.id屬性!
我們運(yùn)行這個(gè)命令,看看一開始我們創(chuàng)建的那個(gè)節(jié)點(diǎn):
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test? ? PartitionCount:1? ? ReplicationFactor:1? ? Configs:
Topic: test? ? Partition: 0? ? Leader: 0? ? Replicas: 0? ? Isr: 0
這并不奇怪,剛才創(chuàng)建的主題沒有Replicas,并且在服務(wù)器“0”上,我們創(chuàng)建它的時(shí)候,集群中只有一個(gè)服務(wù)器,所以是“0”。
7、測(cè)試集群的容錯(cuò)能力
7.1發(fā)布消息到集群
[root@administrator bin]# ./kafka-console-producer.sh --broker-list 192.168.10.130:9092 --topic my-replicated-topic
>cluster message 1
>cluster message 2
//Ctrl+C終止產(chǎn)生消息
7.2消費(fèi)消息
[root@administrator bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.10.130:9093 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2
//Ctrl+C終止消費(fèi)消息
7.3干掉leader,測(cè)試集群容錯(cuò)
首先查詢誰是leader
> ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
//所有分區(qū)的摘要
Topic:my-replicated-topic? ? PartitionCount:1? ? ReplicationFactor:3? ? Configs:
//提供一個(gè)分區(qū)信息,因?yàn)槲覀冎挥幸粋€(gè)分區(qū),所以只有一行。
Topic: my-replicated-topic? ? Partition: 0? ? Leader: 1? ? Replicas: 1,2,0? ? Isr: 1,2,0
可以看到Leader的broker.id為1,找到對(duì)應(yīng)的Broker
[root@administrator bin]# jps -m
5130 Kafka ../config/server.properties
4861 QuorumPeerMain ../config/zookeeper.properties
1231 Bootstrap start start
7420 Kafka ../config/server-2.properties
7111 Kafka ../config/server-1.properties
9139 Jps -m
通過以上查詢到Leader的PID(Kafka ../config/server-1.properties)為7111,殺掉該進(jìn)程
//殺掉該進(jìn)程
kill -9 7111
//再查詢一下,確認(rèn)新的Leader已經(jīng)產(chǎn)生,新的Leader為broker.id=0
[root@administrator bin]# ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic? ? ? PartitionCount:1? ? ? ? ReplicationFactor:3? ? Configs:
//備份節(jié)點(diǎn)之一成為新的leader,而broker1已經(jīng)不在同步備份集合里了
Topic: my-replicated-topic? ? ? Partition: 0? ? Leader: 0? ? ? Replicas: 1,0,2 Isr: 0,2
7.4再次消費(fèi)消息,確認(rèn)消息沒有丟失
[root@administrator bin]# ./kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
cluster message 1
cluster message 2