kafka-python操作

kafka-python文檔

一、consumer

1. 常用api

#建立連接
consumer = KafkaConsumer(bootstrap_servers=['ip1:port','ip2:port'],
                         api_version=(0,10),group_id='my_group')

# topic所有的partition
consumer.partitions_for_topic(topic)

# 構(gòu)造topicPartition對象
tps = [TopicPartition(topic, p) for p in consumer.partitions_for_topic(topic)]

# 為consumer分配分區(qū)
consumer.assign(tps)

# kafka每個分區(qū)的最新offset
consumer.end_offsets(tps)

# 當前groupid 每個分區(qū)消費到的位置
for i in range(len(tps)):
    consumer.position(tps[i])

# 消費數(shù)據(jù)
for message in consumer:
    partition = message.partition
    offset = message.offset
    value = message.value

# 重置offset
for i in range(len(tps)):
    consumer.seek(tps[i], partition_offset[i])    #partition_offset保存每
# partition_offset保存每個分區(qū)的起始消費位置
# 形如{0:123, 1:345 },表示0分區(qū)從123開始再次消費

二、producer


三、其他

3.1 json處理

額外的包:  
pip install msgpack
import msgpack
producer:
producer = KafkaProducer(value_serializer=msgpack.dumps)
producer.send('msgpack-topic', {'key': 'value'})
consumer:
KafkaConsumer(value_deserializer=msgpack.unpackb)
此時得到的value是dict類型
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,899評論 13 425
  • 本文轉(zhuǎn)載自http://dataunion.org/?p=9307 背景介紹Kafka簡介Kafka是一種分布式的...
    Bottle丶Fish閱讀 5,586評論 0 34
  • 背景介紹 Kafka簡介 Kafka是一種分布式的,基于發(fā)布/訂閱的消息系統(tǒng)。主要設(shè)計目標如下: 以時間復(fù)雜度為O...
    高廣超閱讀 13,055評論 8 167
  • 七月過去,八月已開始一天,小記一下。 七月初簽訂的合約之二,(之一沒有簽訂)計劃學(xué)習(xí)規(guī)范,計劃整理臺帳,計劃重整檢...
    虎妞034閱讀 189評論 0 0
  • 原來社會就是這樣,不管哪個行業(yè),哪個崗位,或多或少都有些那么不盡人意。雖說每天9小時不算多,但是每周要上一天全天...
    陳同學(xué)ccc閱讀 243評論 0 0

友情鏈接更多精彩內(nèi)容