kafka-python文檔
一、consumer
1. 常用api
#建立連接
consumer = KafkaConsumer(bootstrap_servers=['ip1:port','ip2:port'],
api_version=(0,10),group_id='my_group')
# topic所有的partition
consumer.partitions_for_topic(topic)
# 構(gòu)造topicPartition對象
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]
# 為consumer分配分區(qū)
consumer.assign(tps)
# kafka每個分區(qū)的最新offset
consumer.end_offsets(tps)
# 當前groupid 每個分區(qū)消費到的位置
for i in range(len(tps)):
consumer.position(tps[i])
# 消費數(shù)據(jù)
for message in consumer:
partition = message.partition
offset = message.offset
value = message.value
# 重置offset
for i in range(len(tps)):
consumer.seek(tps[i], partition_offset[i]) #partition_offset保存每
# partition_offset保存每個分區(qū)的起始消費位置
# 形如{0:123, 1:345 },表示0分區(qū)從123開始再次消費
二、producer
三、其他
3.1 json處理
額外的包:
pip install msgpack
import msgpack
producer:
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
consumer:
KafkaConsumer(value_deserializer=msgpack.unpackb)
此時得到的value是dict類型
最后編輯于 :
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。