[TOC]
1、概述
2、部署安裝
參考這里通過docker方式安裝
現(xiàn)在 /Users/software/rocketmq/conf 目錄下新建一個文件 broker.conf ,在里面加入內(nèi)容,這些內(nèi)容也是RocketMq官方的默認(rèn)配置。
注意brokerIP1 和 namesrvAddr需要修改為,如果在本機(jī)時使用127.0.0.1實(shí)測在管理后臺方面會有一些功能無法正常使用
brokerClusterName = DefaultCluster
brokerName = broker-a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
brokerRole = ASYNC_MASTER
flushDiskType = ASYNC_FLUSH
brokerIP1=10.120.16.17
namesrvAddr=10.120.16.17:9876
docker compass如下,通過指令 docker-compose up -d 啟動一個rockermq的docker容器
version: '2'
services:
namesrv:
image: rocketmqinc/rocketmq
container_name: rmqnamesrv
ports:
- 9876:9876
command: sh mqnamesrv
broker:
image: rocketmqinc/rocketmq
container_name: rmqbroker
ports:
- 10909:10909
- 10911:10911
- 10912:10912
volumes:
- /Users/software/rocketmq/conf/broker.conf:/opt/rocketmq-4.4.0/conf/broker.conf
#command: sh mqbroker -n namesrv:9876
command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf
depends_on:
- namesrv
environment:
- JAVA_HOME=/usr/lib/jvm/jre
console:
image: styletang/rocketmq-console-ng
container_name: rocketmq-console-ng
ports:
- 8087:8080
depends_on:
- namesrv
environment:
- JAVA_OPTS= -Dlogging.level.root=info -Drocketmq.namesrv.addr=rmqnamesrv:9876
- Dcom.rocketmq.sendMessageWithVIPChannel=false
之后就可以使用端口9876訪問RocketMq了,訪問管理后臺可以使用鏈接 http://127.0.0.1:8087/#/
3、Spring boot代碼對接
疑難雜癥
Docker環(huán)境下廣播消息無法接收到消息
這個問題如果確認(rèn)topic等參數(shù)沒錯,就要注意容器是否能正常建立 {用戶目錄}/logs/rocketmqlogs/rocketmq_client.log 這個目錄以及文件,如果不是root用戶需要在Dockerfile里提前建立這個目錄。
另外Docker環(huán)境下運(yùn)行RocketMq程序比較特殊,因為其無狀態(tài)無法記錄容器重啟前的消費(fèi)者ID,也就無法記錄消費(fèi)到的消息序號。所以可能要根據(jù)具體業(yè)務(wù)情況做一些靈活調(diào)整。如果是廣播類型的消息,消息消費(fèi)起點(diǎn)應(yīng)該用CONSUME_FROM_LAST_OFFSET,否則會導(dǎo)致比較嚴(yán)重的重復(fù)消費(fèi)。如果是集群類型的消息,就應(yīng)該選擇CONSUME_FROM_FIRST_OFFSET務(wù)必確保消息消費(fèi)徹底。
各種消費(fèi)起點(diǎn)的含義如下
/**
* 一個新的訂閱組第一次啟動從隊列的最后位置開始消費(fèi),后續(xù)再啟動接著上次消費(fèi)的進(jìn)度開始消費(fèi)
*/
CONSUME_FROM_LAST_OFFSET,
/**
* 一個新的訂閱組第一次啟動從隊列的最前位置開始消費(fèi),后續(xù)再啟動接著上次消費(fèi)的進(jìn)度開始消費(fèi)
*/
CONSUME_FROM_FIRST_OFFSET,
/**
* 一個新的訂閱組第一次啟動從指定時間點(diǎn)開始消費(fèi),后續(xù)再啟動接著上次消費(fèi)的進(jìn)度開始消費(fèi),時間點(diǎn)設(shè)置參見DefaultMQPushConsumer.consumeTimestamp參數(shù)
*/
CONSUME_FROM_TIMESTAMP,