Kafka部署 與 kafka-python 的使用


Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺,由ScalaJava編寫。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁瀏覽,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。Kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理,也是為了通過集群來提供實(shí)時(shí)的消息。


在Python環(huán)境中使用Kafka主要分為幾個(gè)步驟

  • 安裝JDK (使用JDK 8)
  • 安裝Kafka (編譯版)
  • 啟動(dòng) Kafka自帶的zookeeper服務(wù)
  • 啟動(dòng) Kafka 服務(wù)并建立Topic
  • 安裝并使用 kafka-python 調(diào)度Kafka

producer.py 負(fù)責(zé)生成消息并將其傳遞進(jìn)對應(yīng)的Topic

from kafka impor KafkaProducer

producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])

msg = "Message".encode('utf-8')
producer.send('KafkaTest', msg)

consumer.py則負(fù)責(zé)接收消息

from kafka impor KafkaConsumer

consumer = KafkaConsumer('KafkaTest', bootstrap_servers=['127.0.0.1:9092'])

for msg in consumer:
    recv = f"Topic: {msg.topic}, Partition: {msg.partition}, Key: {msg.key}, Value: {msg.value}"
    print(recv)

啟動(dòng)consumer.py后,其中的consumer就會(huì)進(jìn)入等待消息的狀態(tài),只要producer.py中發(fā)送了消息,consumer就會(huì)將其打印出來

Topic: KafkaTest, Partition: 0, Key: None, Value: b'Message'
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容