下載與安裝
從 http://www.apache.org/dist/kafka/ 下載最新版本的 kafka,這里使用的是 kafka_2.12-0.10.2.1.tgz
$ tar zxvf kafka_2.12-0.10.2.1.tgz
$ cd kafka_2.12-0.10.2.1
運行
啟動 zookeeper 服務
$ bin/zookeeper-server-start.sh config/zookeeper.properties
啟動 kafka Broker 服務
$ bin/kafka-server-start.sh config/server.properties
測試
首先,創(chuàng)建一個單分區(qū)單副本的 topic: mytopic
$ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mytopic
Created topic "mytopic".
然后,可以通過運行 list 命令來查看已經(jīng)存在的 topic,比如:
$ bin/kafka-topics.sh --list --zookeeper localhost:2181
mytopic
也可以使用 describe 命令來查看。由于我們現(xiàn)在是單分區(qū)單副本的case,所以 Leader 和 Isr (復制節(jié)點集合)都只在Broker 0上。
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic mytopic
Topic:mytopic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: mytopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
現(xiàn)在,我們通過 Kafka 自帶命令行客戶端向這個 topic 發(fā)送消息。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic mytopic
aaa
bbb
ccc
另開一個終端,然后使用 Kafka 自帶的命令行工具來消費這些消息。
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning
...
aaa
bbb
ccc
此時可以在 producer 命令行窗口繼續(xù)輸入消息,然后觀察 consumer 終端窗口,可以看到它們被消費打印出來。
使用 Kafka Connect 導入導出數(shù)據(jù)
下面使用 Kafka 的 Connect 演示從一個變化的文件中讀取增量數(shù)據(jù)然后輸出到另外一個文件中。
首先運行下面的腳本,此腳本會每隔一秒會向 test.txt 文件中追加一個數(shù)字。
$ for i in {1..300};do echo $i >> test.txt; sleep 1; done
然后運行下面的腳本,此時生產(chǎn)者從 test.txt 文件中讀取文件內容并作為消息發(fā)送到topic中,然后消費者從topic中消費消息并輸出到 test.sink.txt 文件中。命令行使用的配置文件中定義了輸入輸出的文件名和使用的topic名。
$ bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
運行后,另開一個終端來觀察 test.sink.txt 文件中的內容,可以看到文件中的內容會不停的增加。
刪除 Topic
要刪除 topic,可以使用下面的命令
$ bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic mytopic
但是運行命令后,topic并沒有被刪除,使用 “bin/kafka-topics.sh --list --zookeeper localhost:2181” 仍然可以查到。此時我們需要修改config/server.properties文件中的 “delete.topic.enable=true” 來打開這個功能。此時我們再執(zhí)行上面的 --delete 操作,即可刪除topic了。