kafka安裝使用

使用kafka的前提是安裝好了 jdkzookeeper

下載解壓kafka:

[root@localhost ~]# wget http://mirror.bit.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
[root@localhost ~]# tar -xzf kafka_2.12-2.3.0.tgz
[root@localhost ~]# mv kafka_2.12-2.3.0 kafka
[root@localhost ~]# cd kafka

啟動(dòng)kafka之前,需要先啟動(dòng)zookeeper。

  • 單機(jī)部署模式:
    單機(jī)模式下可以不動(dòng)配置,直接啟動(dòng)kafka服務(wù)
[root@localhost kafka]# nohup bin/kafka-server-start.sh config/server.properties &
[1] 36113
[root@localhost kafka]# nohup: 忽略輸入并把輸出追加到"nohup.out"

測(cè)試:

===========================創(chuàng)建topic=========================
[root@localhost kafka]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic HmcfTest
Created topic HmcfTest.

--topic 定義 topic 名
--replication-factor  定義副本數(shù)
--partitions  定義分區(qū)數(shù)


===========================刪除topic=========================
bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic topicName 
需要 server.properties 中設(shè)置 delete.topic.enable=true 否則只是標(biāo)記刪除或者直接重啟(最好不要設(shè)置)


=========================查看topic列表========================
[root@localhost kafka]# bin/kafka-topics.sh --list --zookeeper localhost:2181
HmcfTest
[root@localhost kafka]#


=========================向topic發(fā)送消息========================
[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic HmcfTest
>hello kafka
>there is a pythoner or goer


========================查看topic對(duì)象信息========================
[root@localhost kafka]# bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic HmcfTest
Topic:HmcfTest  PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: HmcfTest Partition: 0    Leader: 0       Replicas: 0     Isr: 0


=============================消費(fèi)消息============================
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic HmcfTest --from-beginning
hello kafka
there is a pythoner or goer
souga

===========================停止kafka服務(wù)=========================
[root@localhost kafka]# bin/kafka-server-stop.sh



  • 集群部署模式
    對(duì)于Kafka來說,一個(gè)單獨(dú)的broker就是一個(gè)大小為1的集群,所以集群模式無非多啟動(dòng)幾個(gè)broker實(shí)例。

在多機(jī)器下需要修改/etc/hosts文件,將用于kafka集群的機(jī)器配置上去。
如:

[root@localhost kafka]# cat /etc/hosts
127.0.0.1   localhost localhost.localdomain localhost4 localhost4.localdomain4
::1         localhost localhost.localdomain localhost6 localhost6.localdomain6

192.168.64.11 hmcf-01
192.168.64.12 hmcf-02
192.168.64.13 hmcf-03

然后修改對(duì)應(yīng)的server.properties配置文件
其中一些配置參數(shù)可參考網(wǎng)絡(luò)說明。
注意踩坑
配置文件中的advertised.listeners參數(shù),需要寫成IP,寫主機(jī)名的話會(huì)導(dǎo)致外網(wǎng)無法連接,如python腳本無法連接使用。

# server-0
broker.id=0
listeners=PLAINTEXT://192.168.64.11:9092
advertised.listeners=PLAINTEXT://192.168.64.11:9092
log.dirs=/tmp/kafka-logs
num.partitions=5
zookeeper.connect=localhost:2181,192.168.64.12:2182,192.168.64.13:2181
# server-1
broker.id=1
listeners=PLAINTEXT://192.168.64.12:9092
advertised.listeners=PLAINTEXT://192.168.64.12:9092
log.dirs=/tmp/kafka-logs
num.partitions=5
zookeeper.connect=localhost:2181,192.168.64.12:2182,192.168.64.13:2181
# server-2
broker.id=2
listeners=PLAINTEXT://192.168.64.13:9092
advertised.listeners=PLAINTEXT://192.168.64.13:9092
log.dirs=/tmp/kafka-logs
num.partitions=5
zookeeper.connect=localhost:2181,192.168.64.12:2182,192.168.64.13:2181

最后啟動(dòng)三個(gè)broker

[root@localhost kafka]# nohup bin/kafka-server-start.sh config/server.properties &

使用jps命令可以查看到kafka進(jìn)程

[root@localhost kafka]# jps
1428 QuorumPeerMain
5333 Jps
4959 Kafka





====================雞兒分割==============================

創(chuàng)建集群topic

 bin/kafka-topics.sh --create --zookeeper hmcf-01:2181, hmcf-02:2181, hmcf-03:2181 --replication-factor 3 --partitions 3 --topic hmcf_test

查看集群topic對(duì)象信息

[root@localhost kafka]# bin/kafka-topics.sh --describe --zookeeper hmcf-01:2181, hmcf-02:2181, hmcf-03:2181 --topic hmcf_test
Topic:hmcf_test PartitionCount:3        ReplicationFactor:3     Configs:
        Topic: hmcf_test        Partition: 0    Leader: 2       Replicas: 2,1,0 Isr: 2,1,0
        Topic: hmcf_test        Partition: 1    Leader: 0       Replicas: 0,2,1 Isr: 0,2,1
        Topic: hmcf_test        Partition: 2    Leader: 1       Replicas: 1,0,2 Isr: 1,0,2

生產(chǎn)與消費(fèi)

[root@localhost kafka]# bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hmcf_test 
>new girl
>boduoyejiyi
>over

[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server hmcf-01:9092, hmcf-02:9092, hmcf-03:9092 --topic hmcf_test --from-beginning
new girl
boduoyejiyi
over
^CProcessed a total of 3 messages

指定partition 和 指針位置
[root@localhost kafka]# bin/kafka-console-consumer.sh --bootstrap-server hmcf-01:9092, hmcf-02:9092, hmcf-03:9092 --topic hmcf_test --partition 0  --offset 2
over




python kafka測(cè)試

生產(chǎn)者

import json
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

msg_dict = {
    "sleep_time": 10,
    "db_config": {
        "database": "test_1",
        "host": "xxxx",
        "user": "root",
        "password": "root"
    },
    "table": "msg",
    "msg": "Hello World"
}
msg = json.dumps(msg_dict)
producer.send('HmcfTest', msg, partition=0)
producer.close()

==============================================================
消費(fèi)者

from kafka import KafkaConsumer

consumer = KafkaConsumer('HmcfTest', bootstrap_servers=['192.168.64.11:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" %
           (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print(recv)
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容