下載解壓kafka:
[root@localhost ~]# wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
[root@localhost ~]# tar -xzf kafka_2.12-2.3.0.tgz
[root@localhost ~]# mv kafka_2.12-2.3.0 kafka
[root@localhost ~]# cd kafka
啟動kafka之前,需要先啟動zookeeper(例如使用自帶腳本:bin/zookeeper-server-start.sh -daemon config/zookeeper.properties)。
- 單機部署模式:
單機模式下可以不動配置,直接啟動kafka服務
[root@localhost kafka]# nohup bin/kafka-server-start.sh config/server.properties &
[1] 36113
[root@localhost kafka]# nohup: 忽略輸入并把輸出追加到"nohup.out"
測試:
===========================創建topic=========================
[root@localhost kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HmcfTest
Created topic HmcfTest.
--topic 定義 topic 名
--replication-factor 定義副本數
--partitions 定義分區數
===========================刪除topic=========================
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicName
需要在 server.properties 中設置 delete.topic.enable=true,否則只是標記刪除或者直接重啟(最好不要設置)
=========================查看topic列表========================
[root@localhost kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
HmcfTest
[root@localhost kafka]#
=========================向topic發送消息========================
[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HmcfTest
>hello kafka
>there is a pythoner or goer
========================查看topic對象信息========================
[root@localhost kafka]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic HmcfTest
Topic:HmcfTest PartitionCount:1 ReplicationFactor:1 Configs:
Topic: HmcfTest Partition: 0 Leader: 0 Replicas: 0 Isr: 0
=============================消費消息============================
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HmcfTest --from-beginning
hello kafka
there is a pythoner or goer
souga
===========================停止kafka服務=========================
[root@localhost kafka]# bin/kafka-server-stop.sh
- 集群部署模式
對于Kafka來說,一個單獨的broker就是一個大小為1的集群,所以集群模式無非多啟動幾個broker實例。
在多機器下需要修改/etc/hosts文件,將用于kafka集群的機器配置上去。
如:
[root@localhost kafka]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.64.11 hmcf-01
192.168.64.12 hmcf-02
192.168.64.13 hmcf-03
然后修改對應的server.properties配置文件
其中一些配置參數可參考網絡說明。
注意踩坑
配置文件中的advertised.listeners參數,需要寫成IP,寫主機名的話會導致外網無法連接,如python腳本無法連接使用。
# server-0
broker.id=0
listeners=PLAINTEXT://192.168.64.11:9092
advertised.listeners=PLAINTEXT://192.168.64.11:9092
log.dirs=/tmp/kafka-logs
num.partitions=5
zookeeper.connect=192.168.64.11:2181,192.168.64.12:2181,192.168.64.13:2181
# server-1
broker.id=1
listeners=PLAINTEXT://192.168.64.12:9092
advertised.listeners=PLAINTEXT://192.168.64.12:9092
log.dirs=/tmp/kafka-logs
num.partitions=5
zookeeper.connect=192.168.64.11:2181,192.168.64.12:2181,192.168.64.13:2181
# server-2
broker.id=2
listeners=PLAINTEXT://192.168.64.13:9092
advertised.listeners=PLAINTEXT://192.168.64.13:9092
log.dirs=/tmp/kafka-logs
num.partitions=5
zookeeper.connect=192.168.64.11:2181,192.168.64.12:2181,192.168.64.13:2181
最后啟動三個broker
[root@localhost kafka]# nohup bin/kafka-server-start.sh config/server.properties &
使用jps命令可以查看到kafka進程
[root@localhost kafka]# jps
1428 QuorumPeerMain
5333 Jps
4959 Kafka
====================雞兒分割==============================
創建集群topic
bin/kafka-topics.sh --create --zookeeper hmcf-01:2181,hmcf-02:2181,hmcf-03:2181 --replication-factor 3 --partitions 3 --topic hmcf_test
查看集群topic對象信息
[root@localhost kafka]# bin/kafka-topics.sh --describe --zookeeper hmcf-01:2181,hmcf-02:2181,hmcf-03:2181 --topic hmcf_test
Topic:hmcf_test PartitionCount:3 ReplicationFactor:3 Configs:
Topic: hmcf_test Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: hmcf_test Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: hmcf_test Partition: 2 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
生產與消費
[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hmcf_test
>new girl
>boduoyejiyi
>over
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server hmcf-01:9092,hmcf-02:9092,hmcf-03:9092 --topic hmcf_test --from-beginning
new girl
boduoyejiyi
over
^CProcessed a total of 3 messages
指定partition 和 指針位置
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server hmcf-01:9092,hmcf-02:9092,hmcf-03:9092 --topic hmcf_test --partition 0 --offset 2
over
python kafka測試
生產者
# Producer example: serialize a dict to JSON and publish it to partition 0
# of the 'HmcfTest' topic using the kafka-python client.
import json

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')
msg_dict = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "xxxx",
        "user": "root",
        "password": "root"
    },
    "table": "msg",
    "msg": "Hello World"
}
# No value_serializer is configured on the producer, so the payload must be
# bytes; json.dumps() returns str on Python 3, hence the explicit encode.
msg = json.dumps(msg_dict).encode('utf-8')
producer.send('HmcfTest', msg, partition=0)
# send() is asynchronous; block until the buffered record has been sent
# before shutting the producer down.
producer.flush()
producer.close()
==============================================================
消費者
# Consumer example: subscribe to the 'HmcfTest' topic and print every record
# received as "topic:partition:offset: key=... value=...".
from kafka import KafkaConsumer

consumer = KafkaConsumer('HmcfTest', bootstrap_servers=['192.168.64.11:9092'])
# Iterating over the consumer yields one record at a time.
for msg in consumer:
    # The format arguments are parenthesized so the expression can span two
    # lines; a bare trailing '%' at end of line is a syntax error.
    recv = "%s:%d:%d: key=%s value=%s" % (
        msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)