第一次了解kafka不是很深入,后期會繼續(xù)補(bǔ)更,完善~
kafaka工作原理:生產(chǎn)者生產(chǎn)消息-->消費(fèi)者接收消息后消費(fèi)
1. 消費(fèi)者工作原理
連接kafka
先設(shè)置消費(fèi)組id,并制定消費(fèi)哪個(gè)topic:
# trans
? ? trans_cons = Consumer(
? ? ? ? {**sys_conf.get_kafka_config(consumer_id=f'{func_mark}trans')}
? ? )
? ? trans_cons.subscribe(trans_topics)
消費(fèi)對應(yīng)topic的消息:
msgs = cons.consume(sys_conf.CONSUMER_BZ, sys_conf.CONSUMER_TIMEOUT)
因?yàn)閙sgs里面有很多個(gè)消息,需要將消息一個(gè)個(gè)解讀出來然后計(jì)算處理,最后輸入到數(shù)據(jù)庫或者將結(jié)果塞進(jìn)生產(chǎn)者中推到kafka等對方進(jìn)行消費(fèi)。
將msg解析:
for msg in msgs:
? ? print(msg.value())
如需轉(zhuǎn)發(fā)請加載連接http://www.itdecent.cn/p/6e00abd7780a