生產側
from pykafka import KafkaClient
import sys
import json
# Producer script: read tab-separated "user_id<TAB>device_id" lines from
# stdin and publish each one to the Kafka topic as a JSON object.
client = KafkaClient(hosts='192.168.1.1')
topics = client.topics
topic = topics['test_topic']
print(topic)

# Number of messages handed to the producer so far.
written_msgs = 0

# The producer is used as a context manager so that it is shut down when
# the input is exhausted (or when an exception propagates).
with topic.get_producer() as producer:
    for line in sys.stdin:
        line = line.strip('\n')
        items = line.split('\t')
        if len(items) < 2:
            # Blank or malformed line: there is no device_id column.
            # Report it and keep going instead of dying with IndexError.
            sys.stderr.write("skipping malformed line: %r\n" % line)
            continue
        user_id = items[0]
        device_id = items[1]
        payload = json.dumps({'user_id': user_id,
                              'device_id': device_id})
        # Producer.produce() only accepts bytes; json.dumps() returns text
        # on Python 3, so encode before sending.
        producer.produce(payload.encode('utf-8'))
        written_msgs += 1
        # Periodic progress report.
        if written_msgs % 1000 == 0:
            print("written_msgs: %d" % written_msgs)

# Final total once all input has been processed.
print("written_msgs: %d" % written_msgs)
消費側
#coding:utf-8
from pykafka import KafkaClient
import time
from pykafka.common import OffsetType
# Consumer script: connect to the same host and topic as the producer.
client = KafkaClient(hosts='192.168.1.1')
topics = client.topics
topic = topics['test_topic']
print(topic)

# Original author's note: auto_commit_enable=True was added to avoid
# consuming the same messages more than once.
consumer = topic.get_simple_consumer(
    consumer_group="mygroup",
    auto_commit_enable=True,
)

# Walk the stream and print each message's offset together with its content.
for message in consumer:
    if message is None:
        continue
    print(message.offset, message.value)