
rocketmq-python 是一個基于 rocketmq-client-cpp 封裝的 RocketMQ Python 客戶端。
rocketmq-client-python安裝
目前rocketmq庫只支持linux和mac。
rocketmq-client-python 的安裝:
pip install rocketmq
安裝太慢?國內源安裝:
pip install rocketmq -i https://pypi.tuna.tsinghua.edu.cn/simple
示例代碼:
Producer
from rocketmq.client import Producer, Message
producer = Producer('PID-XXX')
producer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')#rocketmq隊列接口地址(服務器ip:port)
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# producer.set_namesrv_addr('127.0.0.1:9887')
producer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')#可以不使用
producer.start()
msg_body = {"id":"test_id","name":"test_name","message":"test_message"}
ss = json.dumps(msg_body).encode('utf-8')
msg = Message('YOUR-TOPIC') #topic名稱
msg.set_keys('XXX')#每個消息在業(yè)務層面的唯一標識碼,要設置到keys字段,方便將來定位消息丟失問題。服務器會為每個消息創(chuàng)建索引(哈希索引),應用可以通過topic,key來查詢這條消息內容,以及消息被誰消費。由于是哈希索引,請務必保證key盡可能唯一,這樣可以避免潛在的哈希沖突。
msg.set_tags('XXX')#一個應用盡可能用一個Topic,消息子類型用tags來標識,tags可以由應用自由設置。只有發(fā)送消息設置了tags,消費方在訂閱消息時,才可以利用tags在broker做消息過濾。
msg.set_body(ss)
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
其中:
設置ip:port的位置:producer.set_namesrv_addr('xxx.xxx.xxx.xxx:xxxxx')
當只有單一服務器時,格式是上面這個;
當有多個服務器地址(集群模式)時,可以使用:producer.set_namesrv_addr("xxx.xxx.xxx.xxx:xxxxx,xxx.xxx.xxx.xxx:xxxxx")
如果使用pandas數(shù)據,pandas數(shù)據可以直接轉換
df.to_json(orient='records').encode('utf-8'),然后放入body中發(fā)送。
不同應用的多個Topic使用同一個namesrv_addr時數(shù)據傳輸會發(fā)生沖突
解決方案:每一個Topic對應一個 “PID-XXX”
PushConsumer
import time
from rocketmq.client import PushConsumer
def callback(msg):
? ? print(msg.id, msg.body)
consumer = PushConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')
consumer.subscribe('YOUR-TOPIC', callback)
consumer.start()
while True:
? ? time.sleep(3600)
consumer.shutdown()
PullConsumer
from rocketmq.client import PullConsumer
consumer = PullConsumer('CID_XXX')
consumer.set_namesrv_domain('http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet')
# For ip and port name server address, use `set_namesrv_addr` method, for example:
# consumer.set_namesrv_addr('127.0.0.1:9887')
consumer.set_session_credentials('XXX', 'XXXX', 'ALIYUN')
consumer.start()
for msg in consumer.pull('YOUR-TOPIC'):
? ? print(msg.id, msg.body)
consumer.shutdown()
控制日志的輸出頻率
from rocketmq.client import dll
dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)
ffi.py
class _CLogLevel(CtypesEnum):
? ? FATAL = 1
? ? ERROR = 2
? ? WARN = 3
? ? INFO = 4
? ? DEBUG = 5
? ? TRACE = 6
? ? LEVEL_NUM = 7
log4j定義了8個級別的log(除去OFF和ALL,可以說分為6個級別),優(yōu)先級從高到低依次為:OFF、FATAL、ERROR、WARN、INFO、DEBUG、TRACE、 ALL。
ALL 最低等級的,用于打開所有日志記錄。
TRACE designates finer-grained informational events than the DEBUG.Since:1.2.12,很低的日志級別,一般不會使用。
DEBUG 指出細粒度信息事件對調試應用程序是非常有幫助的,主要用于開發(fā)過程中打印一些運行信息。
INFO 消息在粗粒度級別上突出強調應用程序的運行過程。打印一些你感興趣的或者重要的信息,這個可以用于生產環(huán)境中輸出程序運行的一些重要信息,但是不能濫用,避免打印過多的日志。
WARN 表明會出現(xiàn)潛在錯誤的情形,有些信息不是錯誤信息,但是也要給程序員的一些提示。
ERROR 指出雖然發(fā)生錯誤事件,但仍然不影響系統(tǒng)的繼續(xù)運行。打印錯誤和異常信息,如果不想輸出太多的日志,可以使用這個級別。
FATAL 指出每個嚴重的錯誤事件將會導致應用程序的退出。這個級別比較高了。重大錯誤,這種級別你可以直接停止程序了。
應用案例
PushConsumer
import json
from rocketmq.client import PushConsumer, dll
import traceback
import logging
class RocketMQ():
? ? def __init__(self):
? ? ? ? logging.basicConfig(level=logging.CRITICAL, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
? ? ? ? self.logger = logging.getLogger(__name__)? ?
? ? ? ? self.consumer = PushConsumer("PID-XXX")
? ? ? ? self.consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX")
? ? ? ? self.topic_name = "xxx"
? ? ? ? #減少日志輸出
? ? ? ? dll.SetPushConsumerLogLevel(namesrv_addr.encode('utf-8'), 1)
? ? def callback(self,msg):
? ? ? ? test_body = json.loads(msg.body)
? ? ? ? try:
? ? ? ? ? ? self.my_func(test_body)
? ? ? ? ? ? return PushConsumer
? ? ? ? except Exception as e:
? ? ? ? ? ? print('>>>>>>>>>>allback msg:\n{}'.format(es_body))
? ? ? ? ? ? print('>>>>>>>>>>callback error:\n{}'.format(e))
? ? ? ? ? ? return PushConsumer
? ? def onMessage(self):
? ? ? ? self.consumer.subscribe(self.topic_name, self.callback)
? ? ? ? self.consumer.start()
? ? ? ? while True:
? ? ? ? ? ? time.sleep(2)
? ? ? ? self.consumer.shutdown()
? ? def my_func(test_body):
? ? ? ? print(test_body)
if __name__ == '__main__':
? ? mq = RocketMQ()
? ? mq.onMessage()
Producer
from rocketmq.client import Producer, Message
import json
producer = Producer("PID-XXX")
producer.set_namesrv_addr('XX.XX.XX.XX:XXXX')
producer.start()
topic_name = "xxx"
key_name = "abc"
tags = "123"
msg_body = {
? ? "key_1":value_1,
? ? "key_2":value_2
}
ss = json.dumps(msg_body).encode('utf-8')
msg = Message(topic_name)
msg.set_keys(key_name)
msg.set_tags(tags)
msg.set_body(ss)
ret = producer.send_sync(msg)
print(ret.status, ret.msg_id, ret.offset)
producer.shutdown()
PullConsumer
from rocketmq.client import PullConsumer
consumer = PullConsumer("PID-XXX")
consumer.set_namesrv_addr("XX.XX.XX.XX:XXXX")
consumer.start()
while True:
? ? topic_name = "xxx"
? ? for msg in consumer.pull(topic_name):print(msg.id, msg.body)
Topic
Topic創(chuàng)建的核心步驟如下
1、mqadmin向broker發(fā)起創(chuàng)建Topic的命令。
2、broker生成Topic對應的topicConfig配置保存在broker的TopicConfigManager中。
3、broker向所有的namesrv上報topicConfig信息。
4、namesrv的RouteInfoManager的topicQueueTable保存topic的QueueData信息。
5、broker會通過定時任務定期向namesrv發(fā)送心跳信息更新topic配置。
usage: mqadmin updateTopic -b <arg> | -c <arg>? [-h] [-n <arg>] [-o <arg>] [-p <arg>] [-r <arg>] [-s <arg>] -t
? ? ? <arg> [-u <arg>] [-w <arg>]
-b,--brokerAddr <arg>? ? ? create topic to which broker
-c,--clusterName <arg>? ? ? create topic to which cluster
-h,--help? ? ? ? ? ? ? ? ? Print help
-n,--namesrvAddr <arg>? ? ? Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876
-o,--order <arg>? ? ? ? ? ? set topic's order(true|false)
-p,--perm <arg>? ? ? ? ? ? set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]
-r,--readQueueNums <arg>? ? set read queue nums
-s,--hasUnitSub <arg>? ? ? has unit sub (true|false)
-t,--topic <arg>? ? ? ? ? ? topic name
-u,--unit <arg>? ? ? ? ? ? is unit topic (true|false)
-w,--writeQueueNums <arg>? set write queue nums
通過 --brokerAddr在指定的broker創(chuàng)建topic。
通過 --clusterName在整個集群創(chuàng)建topic。
通過 --namesrvAddr指定namesrv地址。
通過 --topic來指定topic名稱。
通過 --perm來指定Topic的權限管理。
在rocketmq中添加新的Topic
sh mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t orderTopic
創(chuàng)建Topic時報錯解決方案
Java HotSpot? 64-Bit Server VM warning: ignoring option PermSize=128m; support was removed in 8.0
Java HotSpot? 64-Bit Server VM warning: ignoring option MaxPermSize=128m; support was removed in 8.0
org.apache.rocketmq.tools.command.SubCommandException: UpdateTopicSubCommand command failed
at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:181)
at org.apache.rocketmq.tools.command.MQAdminStartup.main0(MQAdminStartup.java:135)
at org.apache.rocketmq.tools.command.MQAdminStartup.main(MQAdminStartup.java:86)
Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=[10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available
at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:84)
at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:73)
at org.apache.rocketmq.acl.common.AclSigner.calSignature(AclSigner.java:68)
at org.apache.rocketmq.acl.common.AclUtils.calSignature(AclUtils.java:58)
at org.apache.rocketmq.acl.common.AclClientRPCHook.doBeforeRequest(AclClientRPCHook.java:44)
at org.apache.rocketmq.remoting.netty.NettyRemotingAbstract.doBeforeRpcHooks(NettyRemotingAbstract.java:172)
at org.apache.rocketmq.remoting.netty.NettyRemotingClient.invokeSync(NettyRemotingClient.java:370)
at org.apache.rocketmq.client.impl.MQClientAPIImpl.getBrokerClusterInfo(MQClientAPIImpl.java:1180)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl.examineBrokerClusterInfo(DefaultMQAdminExtImpl.java:275)
at org.apache.rocketmq.tools.admin.DefaultMQAdminExt.examineBrokerClusterInfo(DefaultMQAdminExt.java:222)
at org.apache.rocketmq.tools.command.CommandUtil.fetchMasterAddrByClusterName(CommandUtil.java:83)
at org.apache.rocketmq.tools.command.topic.UpdateTopicSubCommand.execute(UpdateTopicSubCommand.java:154)
… 2 more
Caused by: org.apache.rocketmq.acl.common.AclException: [10015:signature-failed] unable to calculate a request signature. error=Algorithm HmacSHA1 not available
at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:63)
at org.apache.rocketmq.acl.common.AclSigner.signAndBase64Encode(AclSigner.java:79)
… 13 more
Caused by: java.security.NoSuchAlgorithmException: Algorithm HmacSHA1 not available
at javax.crypto.Mac.getInstance(Mac.java:181)
at org.apache.rocketmq.acl.common.AclSigner.sign(AclSigner.java:57)
… 14 more
解決辦法是:
1.進入rocketmq的bin目錄下:/var/www/rocketmq/rocketmq-all-4.4.0-bin-release/bin,
/var/www/rocketmq是我自己的安裝路徑。
2.用vim tools.sh打開tools.sh.在JAVA_OPT配置中,在-Djava.ext.dirs這一行的后面添加ext的路徑,原配置如下
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
添加ext文件的絕對路徑,添加后重新執(zhí)行命令即可
JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn256m -XX:PermSize=128m -XX:MaxPermSize=128m"
JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${BASE_DIR}/lib:${JAVA_HOME}/jre/lib/ext:/usr/java/jdk1.8.0_65/jre/lib/ext"
JAVA_OPT="${JAVA_OPT} -cp ${CLASSPATH}"
刪除Topic
deleteTopic -n localhost:9876 -c DefaultCluster -t orderTopic
rocketmq查看命令
首先進入 RocketMQ 工程,進入/RocketMQ/bin? 在該目錄下有個 mqadmin 腳本 .
查看幫助:? 在 mqadmin 下可以查看有哪些命令?
a: 查看具體命令的使用 : sh mqadmin? ?
b: sh mqadmin help 命令名稱?
例如,查看 updateTopic 的使用
sh mqadmin help updateTopic
2. 關閉nameserver和所有的broker:
? 進入到bin下:
sh mqshutdown namesrv
sh mqshutdown broker
3. 查看所有消費組group:
sh mqadmin consumerProgress -n 192.168.1.23:9876
4. 查看指定消費組下的所有topic數(shù)據堆積情況:
sh mqadmin consumerProgress -n 192.168.1.23:9876 -g warning-group
5. 查看所有topic :
sh mqadmin topicList -n 192.168.1.23:9876
6. 查看topic信息列表詳情統(tǒng)計
sh mqadmin topicstatus -n 192.168.1.23:9876 -t topicWarning
7.? 新增topic
sh mqadmin updateTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning
8. 刪除topic
? sh mqadmin deleteTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning
9、查詢集群消息
sh mqadmin? clusterList -n 192.168.1.23:9876
Reference
https://github.com/apache/rocketmq-client-python
https://www.oschina.net/p/rocketmq-python
https://www.cnblogs.com/qi-yuan-008/p/14022378.html
https://blog.csdn.net/shiyong1949/article/details/52643711
http://www.itdecent.cn/p/b84190af20a8
https://www.cnblogs.com/gmq-sh/p/6232633.html