Kafka是由Apache軟件基金會(huì)開發(fā)的一個(gè)開源流處理平臺,由Scala和Java編寫。Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動(dòng)作流數(shù)據(jù)。 這種動(dòng)作(網(wǎng)頁瀏覽,搜索和其他用戶的行動(dòng))是在現(xiàn)代網(wǎng)絡(luò)上的許多社會(huì)功能的一個(gè)關(guān)鍵因素。 這些數(shù)據(jù)通常是由于吞吐量的要求而通過處理日志和日志聚合來解決。 對于像Hadoop一樣的日志數(shù)據(jù)和離線分析系統(tǒng),但又要求實(shí)時(shí)處理的限制,這是一個(gè)可行的解決方案。Kafka的目的是通過Hadoop的并行加載機(jī)制來統(tǒng)一線上和離線的消息處理,也是為了通過集群來提供實(shí)時(shí)的消息。
在Python環(huán)境中使用Kafka主要分為幾個(gè)步驟
- 安裝
JDK(使用JDK 8) - 安裝
Kafka(編譯版) - 啟動(dòng)
Kafka自帶的zookeeper服務(wù) - 啟動(dòng)
Kafka服務(wù)并建立Topic - 安裝并使用
kafka-python調(diào)度Kafka
producer.py 負(fù)責(zé)生成消息并將其傳遞進(jìn)對應(yīng)的Topic中
from kafka impor KafkaProducer
producer = KafkaProducer(bootstrap_servers=['127.0.0.1:9092'])
msg = "Message".encode('utf-8')
producer.send('KafkaTest', msg)
consumer.py則負(fù)責(zé)接收消息
from kafka impor KafkaConsumer
consumer = KafkaConsumer('KafkaTest', bootstrap_servers=['127.0.0.1:9092'])
for msg in consumer:
recv = f"Topic: {msg.topic}, Partition: {msg.partition}, Key: {msg.key}, Value: {msg.value}"
print(recv)
啟動(dòng)consumer.py后,其中的consumer就會(huì)進(jìn)入等待消息的狀態(tài),只要producer.py中發(fā)送了消息,consumer就會(huì)將其打印出來
Topic: KafkaTest, Partition: 0, Key: None, Value: b'Message'