Kafka、Zookeeper基于docker安裝,kafka-python簡單使用
kafka-python文檔:KafkaConsumer - kafka-python 2.0.2-dev documentation
一、基本概念
Topic:一組消息數(shù)據(jù)的標記符;
Producer:生產(chǎn)者,用于生產(chǎn)數(shù)據(jù),可將生產(chǎn)后的消息送入指定的Topic;
Consumer:消費者,獲取數(shù)據(jù),可消費指定的Topic;
Group:消費者組,同一個group可以有多個消費者,一條消息在一個group中,只會被一個消費者獲?。?/p>
Partition:分區(qū),為了保證kafka的吞吐量,一個Topic可以設置多個分區(qū)。同一分區(qū)只能被一個消費者訂閱。
二、本地安裝與啟動(基于Docker)
docker安裝教程參考菜鳥教程:https://www.runoob.com/docker/centos-docker-install.html
- 下載zookeeper鏡像與kafka鏡像:
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
- 本地啟動zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
- 本地啟動kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper \
--env KAFKA_ZOOKEEPER_CONNECT=外網(wǎng)IP:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://外網(wǎng)IP:9092 \ //
--env KAFKA_LISTENERS=PLAINTEXT://:9092 \
--env KAFKA_JVM_PERFORMANCE_OPTS=" -Xmx256m -Xms256m" \
wurstmeister/kafka:latest
注意:
- 上述代碼,將kafka啟動在9092端口
- 如果內(nèi)容不足可能會報一個內(nèi)存溢出的錯誤(
OpenJDK 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 1073741824, 0) failed; error='Out of memory' (errno=12)),此時只需要加上--env KAFKA_JVM_PERFORMANCE_OPTS=" -Xmx256m -Xms256m"指定jvm的內(nèi)存大小即可
–link 用于容器直接的互通。
-e KAFKA_BROKER_ID=0 一個 kafka節(jié)點 就是一個 broker。一個集群由多個 broker 組成。一個 broker可以容納多個 topic
-e KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 配置zookeeper管理kafka的路徑
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://139.226.179.239:9092 把kafka的地址端口注冊給zookeeper,若遠程訪問要改成外網(wǎng)IP,千萬注意是外網(wǎng)IP,很多文章只說是宿主機IP, 演示例子上寫的是內(nèi)網(wǎng)IP,很容易被誤導
-e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 配置kafka的監(jiān)聽端口
-v /etc/localtime:/etc/localtime 容器時間同步虛擬機的時間
- 進入kafka bash
docker exec -it kafka bash
cd /opt/kafka/bin
- 創(chuàng)建Topic,分區(qū)為2,Topic name為'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 \
--replication-factor 1 --partitions 2 --topic kafka_demo
- 查看當前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
- 刪除Topic
./kafka-topics.sh --zookeeper localhost:2181 --delete --topic myDeleteTopic
- 安裝kafka-python
pip install kafka-python
三、生產(chǎn)者(Producer)與消費者(Consumer)
生產(chǎn)者和消費者的簡易Demo,這里一起演示:
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import kafka_errors
import traceback
import json
def producer_demo():
# 假設生產(chǎn)的消息為鍵值對(不是一定要鍵值對),且序列化方式為json
producer = KafkaProducer(
bootstrap_servers=['localhost:9092'],
key_serializer=lambda k: json.dumps(k).encode(),
value_serializer=lambda v: json.dumps(v).encode())
# 發(fā)送三條消息
for i in range(0, 3):
future = producer.send(
'kafka_demo',
key='count_num', # 同一個key值,會被送至同一個分區(qū)
value=str(i),
partition=1) # 向分區(qū)1發(fā)送消息
print("send {}".format(str(i)))
try:
future.get(timeout=10) # 監(jiān)控是否發(fā)送成功
except kafka_errors: # 發(fā)送失敗拋出kafka_errors
traceback.format_exc()
def consumer_demo():
consumer = KafkaConsumer(
'kafka_demo',
bootstrap_servers=':9092',
group_id='test'
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
這里建議起兩個terminal,或者兩個jupyter notebook頁面來驗證。
先執(zhí)行消費者:
consumer_demo()
再執(zhí)行生產(chǎn)者:
producer_demo()
會看到如下輸出:
>>> producer_demo()
send 0
send 1
send 2
>>> consumer_demo()
receive, key: count_num, value: 0
receive, key: count_num, value: 1
receive, key: count_num, value: 2
四、消費者進階操作
(1)初始化參數(shù):
列舉一些KafkaConsumer初始化時的重要參數(shù):
- group_id
高并發(fā)量,則需要有多個消費者協(xié)作,消費進度,則由group_id統(tǒng)一。例如消費者A與消費者B,在初始化時使用同一個group_id。在進行消費時,一條消息被消費者A消費后,在kafka中會被標記,這條消息不會再被B消費(前提是A消費后正確commit)。
- key_deserializer, value_deserializer
與生產(chǎn)者中的參數(shù)一致,自動解析。
- auto_offset_reset
消費者啟動的時刻,消息隊列中或許已經(jīng)有堆積的未消費消息,有時候需求是從上一次未消費的位置開始讀(則該參數(shù)設置為earliest),有時候的需求為從當前時刻開始讀之后產(chǎn)生的,之前產(chǎn)生的數(shù)據(jù)不再消費(則該參數(shù)設置為latest)。
- enable_auto_commit, auto_commit_interval_ms
是否自動commit,當前消費者消費完該數(shù)據(jù)后,需要commit,才可以將消費完的信息傳回消息隊列的控制中心。enable_auto_commit設置為True后,消費者將自動commit,并且兩次commit的時間間隔為auto_commit_interval_ms。
(2)手動commit
def consumer_demo():
consumer = KafkaConsumer(
'kafka_demo',
bootstrap_servers=':9092',
group_id='test',
enable_auto_commit=False
)
for message in consumer:
print("receive, key: {}, value: {}".format(
json.loads(message.key.decode()),
json.loads(message.value.decode())
)
)
consumer.commit()
(3)查看kafka堆積剩余量
在線環(huán)境中,需要保證消費者的消費速度大于生產(chǎn)者的生產(chǎn)速度,所以需要檢測kafka中的剩余堆積量是在增加還是減小。可以用如下代碼,觀測隊列消息剩余量:
consumer = KafkaConsumer(topic, **kwargs)
partitions = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
print("start to cal offset:")
# total
toff = consumer.end_offsets(partitions)
toff = [(key.partition, toff[key]) for key in toff.keys()]
toff.sort()
print("total offset: {}".format(str(toff)))
# current
coff = [(x.partition, consumer.committed(x)) for x in partitions]
coff.sort()
print("current offset: {}".format(str(coff)))
# cal sum and left
toff_sum = sum([x[1] for x in toff])
cur_sum = sum([x[1] for x in coff if x[1] is not None])
left_sum = toff_sum - cur_sum
print("kafka left: {}".format(left_sum))