【rocketmq-client-python】學習筆記

rocketmq-python 是一個基于 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。

rocketmq-client-python安裝

目前rocketmq庫只支持linux和mac。

rocketmq-client-python 的安裝:

pip install rocketmq

安裝太慢?國內源安裝:

pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple


示例代碼:

Producer

from rocketmq.client import Producer, Message

producer = Producer('PID-XXX')

producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')#rocketmq隊列接口地址(服務器ip:port)

# For ip and port name server address, use `set_namesrv_addr` method, for example:

# producer.set_namesrv_addr('127.0.0.1:9887')

producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')#可以不使用

producer.start()

msg_body = {"id":"test_id","name":"test_name","message":"test_message"}

ss = json.dumps(msg_body).encode('utf-8')

msg = Message('YOUR-TOPIC') #topic名稱

msg.set_keys('XXX')#每個消息在業(yè)務層面的唯一標識碼,要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創(chuàng)建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。由于是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。

msg.set_tags('XXX')#一個應用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應用自由設置。只有發(fā)送消息設置了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。

msg.set_body(ss)

ret = producer.send_sync(msg)

print(ret.status, ret.msg_id, ret.offset)

producer.shutdown()


其中:

設置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')

當只有單一服務器時,格式是上面這個;

當有多個服務器地址(集群模式)時,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")

如果使用pandas數(shù)據,pandas數(shù)據可以直接轉換

df.to_json(orient='records').encode('utf-8'),然后放入body中發(fā)送。

不同應用的多個Topic使用同一個namesrv_addr時數(shù)據傳輸會發(fā)生沖突

解決方案:每一個Topic對應一個 “PID-XXX”


PushConsumer

import time

from rocketmq.client import PushConsumer

def callback(msg):

? ? print(msg.id, msg.body)

consumer = PushConsumer('CID_XXX')

consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')

# For ip and port name server address, use `set_namesrv_addr` method, for example:

# consumer.set_namesrv_addr('127.0.0.1:9887')

consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')

consumer.subscribe('YOUR-TOPIC', callback)

consumer.start()

while True:

? ? time.sleep(3600)

consumer.shutdown()


PullConsumer

from rocketmq.client import PullConsumer

consumer = PullConsumer('CID_XXX')

consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')

# For ip and port name server address, use `set_namesrv_addr` method, for example:

# consumer.set_namesrv_addr('127.0.0.1:9887')

consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')

consumer.start()

for msg in consumer.pull('YOUR-TOPIC'):

? ? print(msg.id, msg.body)

consumer.shutdown()


控制日志的輸出頻率

from rocketmq.client import dll

dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)


ffi.py

class _CLogLevel(CtypesEnum):

? ? FATAL = 1

? ? ERROR = 2

? ? WARN = 3

? ? INFO = 4

? ? DEBUG = 5

? ? TRACE = 6

? ? LEVEL_NUM = 7

log4j定義了8個級別的log(除去OFF和ALL,可以說分為6個級別),優(yōu)先級從高到低依次為:OFF、FATAL、ERROR、WARN、INFO、DEBUG、TRACE、 ALL。

ALL 最低等級的,用于打開所有日志記錄。

TRACE designates finer-grained informational events than the DEBUG.Since:1.2.12,很低的日志級別,一般不會使用。

DEBUG 指出細粒度信息事件對調試應用程序是非常有幫助的,主要用于開發(fā)過程中打印一些運行信息。

INFO 消息在粗粒度級別上突出強調應用程序的運行過程。打印一些你感興趣的或者重要的信息,這個可以用于生產環(huán)境中輸出程序運行的一些重要信息,但是不能濫用,避免打印過多的日志。

WARN 表明會出現(xiàn)潛在錯誤的情形,有些信息不是錯誤信息,但是也要給程序員的一些提示。

ERROR 指出雖然發(fā)生錯誤事件,但仍然不影響系統(tǒng)的繼續(xù)運行。打印錯誤和異常信息,如果不想輸出太多的日志,可以使用這個級別。

FATAL 指出每個嚴重的錯誤事件將會導致應用程序的退出。這個級別比較高了。重大錯誤,這種級別你可以直接停止程序了。

應用案例

PushConsumer

import json

from rocketmq.client import PushConsumer, dll

import traceback

import logging

class RocketMQ():

? ? def __init__(self):

? ? ? ? logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')

? ? ? ? self.logger = logging.getLogger(__name__)? ?

? ? ? ? self.consumer = PushConsumer("PID-XXX")

? ? ? ? self.consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX")

? ? ? ? self.topic_name = "xxx"

? ? ? ? #減少日志輸出

? ? ? ? dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)

? ? def callback(self,msg):

? ? ? ? test_body = json.loads(msg.body)

? ? ? ? try:

? ? ? ? ? ? self.my_func(test_body)

? ? ? ? ? ? return PushConsumer

? ? ? ? except Exception as e:

? ? ? ? ? ? print('>>>>>>>>>>allback msg:\n{}'.format(es_body))

? ? ? ? ? ? print('>>>>>>>>>>callback error:\n{}'.format(e))

? ? ? ? ? ? return PushConsumer


? ? def onMessage(self):

? ? ? ? self.consumer.subscribe(self.topic_name, self.callback)

? ? ? ? self.consumer.start()

? ? ? ? while True:

? ? ? ? ? ? time.sleep(2)

? ? ? ? self.consumer.shutdown()

? ? def my_func(test_body):

? ? ? ? print(test_body)

if __name__ == '__main__':

? ? mq = RocketMQ()

? ? mq.onMessage()


Producer

from rocketmq.client import Producer, Message

import json

producer = Producer("PID-XXX")

producer.set_namesrv_addr('XX.XX.XX.XX:XXXX')

producer.start()

topic_name = "xxx"

key_name = "abc"

tags = "123"

msg_body = {

? ? "key_1":value_1,

? ? "key_2":value_2

}

ss = json.dumps(msg_body).encode('utf-8')

msg = Message(topic_name)

msg.set_keys(key_name)

msg.set_tags(tags)

msg.set_body(ss)

ret = producer.send_sync(msg)

print(ret.status, ret.msg_id, ret.offset)

producer.shutdown()


PullConsumer

from rocketmq.client import PullConsumer

consumer = PullConsumer("PID-XXX")

consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX")

consumer.start()

while True:

? ? topic_name = "xxx"

? ? for msg in consumer.pull(topic_name):print(msg.id, msg.body)


Topic

Topic創(chuàng)建的核心步驟如下

1、mqadmin向broker發(fā)起創(chuàng)建Topic的命令。

2、broker生成Topic對應的topicConfig配置保存在broker的TopicConfigManager中。

3、broker向所有的namesrv上報topicConfig信息。

4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。

5、broker會通過定時任務定期向namesrv發(fā)送心跳信息更新topic配置。


usage: mqadmin updateTopic -b <arg> | -c <arg>? [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t

? ? ? <arg> [-u <arg>] [-w <arg>]

-b,--brokerAddr <arg>? ? ? create topic to which broker

-c,--clusterName <arg>? ? ? create topic to which cluster

-h,--help? ? ? ? ? ? ? ? ? Print help

-n,--namesrvAddr <arg>? ? ? Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876

-o,--order <arg>? ? ? ? ? ? set topic's order(true|false)

-p,--perm <arg>? ? ? ? ? ? set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]

-r,--readQueueNums <arg>? ? set read queue nums

-s,--hasUnitSub <arg>? ? ? has unit sub (true|false)

-t,--topic <arg>? ? ? ? ? ? topic name

-u,--unit <arg>? ? ? ? ? ? is unit topic (true|false)

-w,--writeQueueNums <arg>? set write queue nums


通過 --brokerAddr在指定的broker創(chuàng)建topic。

通過 --clusterName在整個集群創(chuàng)建topic。

通過 --namesrvAddr指定namesrv地址。

通過 --topic來指定topic名稱。

通過 --perm來指定Topic的權限管理。

在rocketmq中添加新的Topic

sh mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderTopic

創(chuàng)建Topic時報錯解決方案

Java HotSpot? 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0

Java HotSpot? 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0

org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed

at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181)

at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:135)

at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:86)

Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available

at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)

at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:73)

at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:68)

at org.apache.rocketmq.acl.common.AclUtils.calSignature(AclUtils.java:58)

at org.apache.rocketmq.acl.common.AclClientRPCHook.doBeforeRequest(AclClientRPCHook.java:44)

at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.doBeforeRpcHooks(NettyRemotingAbstract.java:172)

at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:370)

at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180)

at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275)

at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222)

at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83)

at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154)

… 2 more

Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available

at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:63)

at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:79)

… 13 more

Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available

at javax.crypto.Mac.getInstance(Mac.java:181)

at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:57)

… 14 more


解決辦法是:

1.進入rocketmq的bin目錄下:/var/www/rocketmq/rocketmq-all-4.4.0-bin-release/bin,

/var/www/rocketmq是我自己的安裝路徑。

2.用vim tools.sh打開tools.sh.在JAVA_OPT配置中,在-Djava.ext.dirs這一行的后面添加ext的路徑,原配置如下

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext"

JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

添加ext文件的絕對路徑,添加后重新執(zhí)行命令即可

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"

JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext"

JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"

刪除Topic

deleteTopic -n localhost:9876 -c DefaultCluster -t orderTopic

rocketmq查看命令

首先進入 RocketMQ 工程,進入/RocketMQ/bin? 在該目錄下有個 mqadmin 腳本 .

查看幫助:? 在 mqadmin 下可以查看有哪些命令?

a: 查看具體命令的使用 : sh mqadmin? ?

b: sh mqadmin help 命令名稱?

例如,查看 updateTopic 的使用

sh mqadmin help updateTopic

2. 關閉nameserver和所有的broker:

? 進入到bin下:

sh mqshutdown namesrv

sh mqshutdown broker

3. 查看所有消費組group:

sh mqadmin consumerProgress -n 192.168.1.23:9876

4. 查看指定消費組下的所有topic數(shù)據堆積情況:

sh mqadmin consumerProgress -n 192.168.1.23:9876 -g warning-group

5. 查看所有topic :

sh mqadmin topicList -n 192.168.1.23:9876

6. 查看topic信息列表詳情統(tǒng)計

sh mqadmin topicstatus -n 192.168.1.23:9876 -t topicWarning

7.? 新增topic

sh mqadmin updateTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

8. 刪除topic

? sh mqadmin deleteTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

9、查詢集群消息

sh mqadmin? clusterList -n 192.168.1.23:9876

Reference

https://github.com/apache/rocketmq-client-python

https://www.oschina.net/p/rocketmq-python

https://www.cnblogs.com/qi-yuan-008/p/14022378.html

https://blog.csdn.net/shiyong1949/article/details/52643711

http://www.itdecent.cn/p/b84190af20a8

https://www.cnblogs.com/gmq-sh/p/6232633.html

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

友情鏈接更多精彩內容