
? 我司免費(fèi)開(kāi)源項(xiàng)目,用于消息隊(duì)列處理,中等承載容量.? 適合中等并發(fā)場(chǎng)合,廣大開(kāi)發(fā)者可以前去使用哦.
dophon框架中的消息隊(duì)列模塊,包括本地消息中心,遠(yuǎn)程消息中心,以及生產(chǎn)消費(fèi)相關(guān)裝飾器以及內(nèi)部操作模塊等 dophon-mq 項(xiàng)目介紹 dophon框架中的消息隊(duì)列模塊,包括本地消息中心,遠(yuǎn)程消息中心,以及生產(chǎn)消費(fèi)相關(guān)裝飾器以及內(nèi)部操作模塊等
有問(wèn)題可發(fā)送郵件聯(lián)系作者:athspiring_admin@athspiring.com
一個(gè)輕量級(jí)消息隊(duì)列,承載能力中等,高頻巨量消息請(qǐng)選擇成熟的消息隊(duì)列(rocket-mq,kafuka等) 即使使用線程池處理消息,極為消耗cpu資源 該隊(duì)列基于io作為消息持久化(消息延遲主要為消息中心的讀寫(xiě)延遲) 可通過(guò)配置選擇本地消息中心以及遠(yuǎn)程消息中心 軟件架構(gòu) 軟件架構(gòu)說(shuō)明
安裝教程 <<<<<<< HEAD
pip install dophon_mq ======= pip install dophon-mq e822369e3d8eac4b4bdbcaa2f1613c902a8c4815
使用說(shuō)明 1 配置 自定義配置:
<application.py>
msg_queue_max_num = 30 # 消息隊(duì)列線程池承載話題上限
mq={ 'remote_center':True, # 使用遠(yuǎn)程消息中心 # 若為false或不配置,則字典內(nèi)下面的配置無(wú)效 'remote_address':'127.0.0.1', # 消息中心地址 'remote_port':58800 # 消息中心端口 } 2 生產(chǎn)者配置 推薦使用json格式傳遞數(shù)據(jù)(便于消費(fèi)者轉(zhuǎn)義數(shù)據(jù))
from dophon_mq import *
@producer( tag='DEMO_TAG', # 消息發(fā)送的話題 delay = 3 # 消息發(fā)送的延遲時(shí)間(秒) ) def producer(): return 'aaa' 3 消費(fèi)者配置 方式一:
from dophon_mq import *
@consumer( tag='DEMO_TAG', # 消息消費(fèi)的話題 delay = 1, # 消息消費(fèi)的延遲 arg_name = 'args' # 承載消息的參數(shù)名(默認(rèn)使用名為args的參數(shù)承載) ) def consumer(args): print(args) consumer() 4 統(tǒng)一管理消費(fèi)者 from dophon_mq import *
class TestConsumer(ConsumerCenter):
@consumer(tag='test_msg_tag|test_msg_tag2', delay=1, arg_name = 'msg')
def consume_msg(
? ? msg? # 統(tǒng)一配置的消費(fèi)入口方法以承載參數(shù)取代實(shí)例參數(shù)(self)
):
? ? print(msg)
? ? print(timestamp)
? ? print(tag)
實(shí)例化衍生類(lèi)啟動(dòng)消費(fèi)者
TestConsumer() 參與貢獻(xiàn) Fork 本項(xiàng)目 新建 Feat_xxx 分支 提交代碼 新建 Pull Request
項(xiàng)目獲取地址:https://github.com/AthspiringSpftware/dophonMq.git