基于RocketMQ源代碼版本:rocketmq-all-4.5.2-source-release
1、RocketMQ生產(chǎn)者-核心參數(shù)
| 參數(shù)名 | 默認(rèn)值 | 說明 |
|---|---|---|
| producerGroup | DEFAULT_PRODUCER | Producer組名,多個(gè)Producer如果屬于一個(gè)應(yīng)用,發(fā)送同樣的消息,則應(yīng)該將它們歸為同一組。 |
| createTopicKey | TBW102 | 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,需要指定key |
| defaultTopicQueueNums | 4 | 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù) |
| sendMsgTimeout | 10000 | 發(fā)送消息超時(shí)時(shí)間,單位毫秒 |
| compressMsgBodyOverHowmuch | 4096 | 消息Body超過多大開始?jí)嚎s(Consumer收到消息會(huì)自動(dòng)解壓縮),單位字節(jié) |
| retryAnotherBrokerWhenNotStoreOK | FALSE | 如果發(fā)送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發(fā)送 |
| maxMessageSize | 131072 | 客戶端限制的消息大小,超過報(bào)錯(cuò),同時(shí)服務(wù)端也會(huì)限制(默認(rèn)128K) |
| transactionCheckListener | 事物消息回查監(jiān)聽器,如果發(fā)送事務(wù)消息,必須設(shè)置 | |
| checkThreadPoolMinSize | 1 | Broker回查Producer事務(wù)狀態(tài)時(shí),線程池大小 |
| checkThreadPoolMaxSize | 1 | Broker回查Producer事務(wù)狀態(tài)時(shí),線程池大小 |
| checkRequestHoldMax | 2000 | Broker回查Producer事務(wù)狀態(tài)時(shí),Producer本地緩沖請(qǐng)求隊(duì)列大小 |
2、RocketMQ主從同步機(jī)制解析
Master-Slave主從同步
同步信息:消息數(shù)據(jù)內(nèi)容(commitLog) + 元數(shù)據(jù)信息(topic配置信息、消費(fèi)者偏移量Offset、延遲偏移量Offset等配置信息)
元數(shù)據(jù)同步:Broker角色識(shí)別,為Slave則啟動(dòng)同步定時(shí)任務(wù),由Netty實(shí)現(xiàn)
- 在BrokerController的handleSlaveSynchronize()方法中
/**
* 該方法的主要作用是處理從節(jié)點(diǎn)的元數(shù)據(jù)同步,
* 即從節(jié)點(diǎn)向主節(jié)點(diǎn)主動(dòng)同步 topic 的路由信息、消費(fèi)進(jìn)度、延遲隊(duì)列處理隊(duì)列、消費(fèi)組訂閱配置等信息。
* @param role
*/
private void handleSlaveSynchronize(BrokerRole role) {
//如果角色為Slave
if (role == BrokerRole.SLAVE) {
if (null != slaveSyncFuture) {
//如果上次同步的 future 不為空,則首先先取消
slaveSyncFuture.cancel(false);
}
//然后設(shè)置 slaveSynchronize 的 master 地址為空
this.slaveSynchronize.setMasterAddr(null);
//在固定時(shí)間開啟只有一個(gè)線程的定時(shí)任務(wù),每 10s 從主節(jié)點(diǎn)同步一次配置數(shù)據(jù)
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
//啟動(dòng)定時(shí)任務(wù)進(jìn)行元數(shù)據(jù)信息的同步
BrokerController.this.slaveSynchronize.syncAll();
}
catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
} else {
//handle the slave synchronise
//如果當(dāng)前節(jié)點(diǎn)的角色為主節(jié)點(diǎn),則取消定時(shí)同步任務(wù)并設(shè)置 master 的地址為空
if (null != slaveSyncFuture) {
slaveSyncFuture.cancel(false);
}
this.slaveSynchronize.setMasterAddr(null);
}
}
public void changeToSlave(int brokerId) {
......
//handle the slave synchronise
//操作主從同步
handleSlaveSynchronize(BrokerRole.SLAVE);
......
}
- 元數(shù)據(jù)同步內(nèi)容:SlaveSynchronize
public void syncAll() {
this.syncTopicConfig(); //同步topic配置信息
this.syncConsumerOffset(); //同步消費(fèi)者偏移量Offset
this.syncDelayOffset(); //同步延遲偏移量Offset
this.syncSubscriptionGroupConfig(); //訂閱組配置信息
}
消息數(shù)據(jù)內(nèi)容(commitLog)同步:時(shí)實(shí)同步由java原生Socket實(shí)現(xiàn),HAService、HAconnection、WaitNotifyObject
HAService:主從同步核心實(shí)現(xiàn)類。
Master節(jié)點(diǎn):
- 1、AcceptSocketService acceptSocketService:服務(wù)端接收連接線程實(shí)現(xiàn)類,作為Master端監(jiān)聽Slave連接的實(shí)現(xiàn)類。
- 2、HAConnection:HA Master-Slave 網(wǎng)絡(luò)連接對(duì)象,對(duì)Master節(jié)點(diǎn)連接、讀寫數(shù)據(jù)。
- A、WriteSocketService writeSocketService:HAConnection網(wǎng)絡(luò)寫封裝,寫到Slave節(jié)點(diǎn)的數(shù)據(jù)。
- B、ReadSocketService readSocketService:HAConnection網(wǎng)絡(luò)讀封裝,讀取來自Slave節(jié)點(diǎn)的數(shù)據(jù)。
Slave節(jié)點(diǎn):
- HAClient haClient:HA客戶端實(shí)現(xiàn),Slave端網(wǎng)絡(luò)的實(shí)現(xiàn)類。
RocketMQ主從同步基本實(shí)現(xiàn)過程如下圖所示:

RocketMQ 的主從同步機(jī)制如下:
- 1、首先啟動(dòng)Master并在指定端口監(jiān)聽Slave的連接請(qǐng)求;
- 2、Slave啟動(dòng),主動(dòng)連接Master,建立TCP連接;
- 3、Slave以每隔5s的間隔時(shí)間向Master拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量Offset,以該偏移量Offset向Master拉取消息;
- 4、Master解析請(qǐng)求,并返回一批數(shù)據(jù)給Slave;
- 5、Slave收到一批消息后,將消息寫入本地commitlog文件中,然后向Master匯報(bào)拉取進(jìn)度,并更新下一次待拉取偏移量Offset;
- 6、然后重復(fù)第3步;
通信協(xié)議:Master節(jié)點(diǎn)與Slave節(jié)點(diǎn)通信協(xié)議很簡單,如下:
- Slave ====> Master 上報(bào)CommitLog已經(jīng)同步到的物理位置
- Master ====> 傳輸新的CommitLog數(shù)據(jù)
源碼研究RocketMQ主從同步機(jī)制(HA)https://blog.csdn.net/prestigeding/article/details/79600792
RocketMQ主從同步一個(gè)重要的特征:
- RocketMQ4.5.0版本之前,主從同步不具備主從切換功能,即當(dāng)主節(jié)點(diǎn)宕機(jī)后,從不會(huì)接管消息發(fā)送,但可以提供消息讀取。
- RocketMQ4.5.0版本之后,rocketmq基于raft 協(xié)議支持主從切換,引入了多副本機(jī)制,即 DLedger,支持主從切換,即當(dāng)一個(gè)復(fù)制組內(nèi)的主節(jié)點(diǎn)宕機(jī)后,會(huì)在該復(fù)制組內(nèi)觸發(fā)重新選主,選主完成后即可繼續(xù)提供消息寫功能。rocketmq主從切換基于 raft 協(xié)議,而Zookeeper也是基于該協(xié)議。
關(guān)于Raft協(xié)議請(qǐng)點(diǎn)擊:一文搞懂Raft算法
主從切換的主要邏輯在BrokerController類中,具體如下:
Broker 角色變更為Slave
/**
* Broker 角色變更為Slave
* @param brokerId
*/
public void changeToSlave(int brokerId) {
log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
//change the role
//設(shè)置 brokerId,如果broker的id為0,則設(shè)置為1,這里在使用的時(shí)候,注意規(guī)劃好集群內(nèi)節(jié)點(diǎn)的 brokerId
brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
//設(shè)置 broker 的角色為 BrokerRole.SLAVE。
messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);
//handle the scheduled service
try {
//從節(jié)點(diǎn),則關(guān)閉定時(shí)調(diào)度線程(處理 RocketMQ 延遲隊(duì)列),如果是主節(jié)點(diǎn),則啟動(dòng)該線程。
this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
}
//handle the transactional service
try {
//關(guān)閉事務(wù)狀態(tài)回查處理器
this.shutdownProcessorByHa();
} catch (Throwable t) {
log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
}
//handle the slave synchronise
//從節(jié)點(diǎn)需要啟動(dòng)配置信息同步處理器,即啟動(dòng) SlaveSynchronize 定時(shí)從主服務(wù)器同步元數(shù)據(jù)等配置信息
handleSlaveSynchronize(BrokerRole.SLAVE);
try {
//立即向集群內(nèi)所有的 nameserver 告知 broker 信息狀態(tài)的變更
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}
/**
* 關(guān)閉事務(wù)狀態(tài)回查處理器,當(dāng)Master變更Slave后,該方法被調(diào)用。
*/
private void shutdownProcessorByHa() {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.shutdown(true);
}
}
Broker 角色從Slave變更為Master
/**
* Broker 角色從Slave變更為Master的處理邏輯
* @param role
*/
public void changeToMaster(BrokerRole role) {
if (role == BrokerRole.SLAVE) {
return;
}
log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());
//handle the slave synchronise
//Master節(jié)點(diǎn),會(huì)在該方法中設(shè)置slaveSyncFuture.cancel(false);
handleSlaveSynchronize(role);
//handle the scheduled service
//開啟定時(shí)任務(wù)處理線程。
try {
this.messageStore.handleScheduleMessageService(role);
} catch (Throwable t) {
log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
}
//handle the transactional service
//開啟事務(wù)狀態(tài)回查處理線程。
try {
this.startProcessorByHa(BrokerRole.SYNC_MASTER);
} catch (Throwable t) {
log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
}
//if the operations above are totally successful, we change to master
//設(shè)置 brokerId 為 0,配置文件中brokerId為0是Master節(jié)點(diǎn)
brokerConfig.setBrokerId(0); //TO DO check
messageStoreConfig.setBrokerRole(role);
try {
//向 nameserver 立即發(fā)送心跳包以便告知 broker 服務(wù)器當(dāng)前最新的狀態(tài)
this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
} catch (Throwable ignored) {
}
log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}
/**
* 該方法的作用是開啟事務(wù)狀態(tài)回查處理器,
* 即當(dāng)節(jié)點(diǎn)為Master時(shí),開啟對(duì)應(yīng)的事務(wù)狀態(tài)回查處理器,對(duì)PREPARE狀態(tài)的消息發(fā)起事務(wù)狀態(tài)回查請(qǐng)求
* @param role
*/
private void startProcessorByHa(BrokerRole role) {
if (BrokerRole.SLAVE != role) {
if (this.transactionalMessageCheckService != null) {
this.transactionalMessageCheckService.start();
}
}
}
RocketMQ 主從切換DLedger 是基于raft協(xié)議實(shí)現(xiàn)的,在該協(xié)議中就實(shí)現(xiàn)了主節(jié)點(diǎn)的選舉與主節(jié)點(diǎn)失效后集群會(huì)自動(dòng)進(jìn)行重新選舉,經(jīng)過協(xié)商投票產(chǎn)生新的主節(jié)點(diǎn),從而實(shí)現(xiàn)高可用。
BrokerController#initialize(),在 Broker 啟動(dòng)時(shí),如果開啟了多副本機(jī)制,即 enableDLedgerCommitLog 參數(shù)設(shè)置為 true,會(huì)為 集群節(jié)點(diǎn)選主器添加 roleChangeHandler 事件處理器,即節(jié)點(diǎn)發(fā)送變更后的事件處理器。
if (result) {
try {
this.messageStore =
new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
this.brokerConfig);
if (messageStoreConfig.isEnableDLegerCommitLog()) {
DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
}
this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
//load plugin
MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
this.messageStore = MessageStoreFactory.build(context, this.messageStore);
this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
} catch (IOException e) {
result = false;
log.error("Failed to initialize", e);
}
}
DLedgerRoleChangeHandler#handle(),主從狀態(tài)切換的邏輯
/**
* handle 主從狀態(tài)切換處理邏輯
* @param term
* @param role
*/
@Override
public void handle(long term, MemberState.Role role) {
Runnable runnable = new Runnable() {
@Override public void run() {
long start = System.currentTimeMillis();
try {
boolean succ = true;
log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
switch (role) {
case CANDIDATE:
//如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 CANDIDATE,表示正在發(fā)起 Leader 節(jié)點(diǎn),如果該服務(wù)器的角色不是 SLAVE 的話,需要將狀態(tài)切換為 SLAVE
if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
brokerController.changeToSlave(dLedgerCommitLog.getId());
}
break;
case FOLLOWER:
//如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 FOLLOWER,broker 節(jié)點(diǎn)將轉(zhuǎn)換為 從節(jié)點(diǎn)
brokerController.changeToSlave(dLedgerCommitLog.getId());
break;
case LEADER:
//如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 Leader,說明該節(jié)點(diǎn)被選舉為 Leader,
//在切換到 Master 節(jié)點(diǎn)之前,首先需要等待當(dāng)前節(jié)點(diǎn)追加的數(shù)據(jù)都已經(jīng)被提交后才可以將狀態(tài)變更為 Master
while (true) {
if (!dLegerServer.getMemberState().isLeader()) {
succ = false;
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
//如果 ledgerEndIndex 為 -1,表示當(dāng)前節(jié)點(diǎn)還沒有數(shù)據(jù)轉(zhuǎn)發(fā),直接跳出循環(huán),無需等待
break;
}
if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
&& messageStore.dispatchBehindBytes() == 0) {
//如果 ledgerEndIndex 不為 -1 ,則必須等待數(shù)據(jù)都已提交,即 ledgerEndIndex 與 committedIndex 相等
break;
}
Thread.sleep(100);
}
if (succ) {
//并且需要等待 commitlog 日志全部已轉(zhuǎn)發(fā)到 consumequeue中,
// 即 ReputMessageService 中的 reputFromOffset 與 commitlog 的 maxOffset 相等
messageStore.recoverTopicQueueTable();
//等待上述條件滿足后,即可以進(jìn)行狀態(tài)的變更,需要恢復(fù) ConsumeQueue,
// 維護(hù)每一個(gè) queue 對(duì)應(yīng)的 maxOffset,然后將 broker 角色轉(zhuǎn)變?yōu)?master
brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
}
break;
default:
break;
}
log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
} catch (Throwable t) {
log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
}
}
};
executorService.submit(runnable);
}
3、RocketMQ同步消息發(fā)送
消息的同步發(fā)送:producer.send(msg)
同步發(fā)送消息核心實(shí)現(xiàn):DefaultMQProducerImpl
4、RocketMQ異步消息發(fā)送
消息的異步發(fā)送:producer.send(Message msg, SendCallBack sendCallBack)
異步發(fā)送消息核心實(shí)現(xiàn):DefaultMQProducerImpl
5、Netty底層通信框架解析
rocketmq底層網(wǎng)絡(luò)使用的netty框架,類圖如下

Remoting模塊類結(jié)構(gòu)圖:

RecketMQ通信模塊的頂層結(jié)構(gòu)是RemotingServer和RemotingClient,分別對(duì)應(yīng)通信的服務(wù)端和客戶端
RemotingServer類中比較重要的是:localListenPort、registerProcessor和registerDefaultProcessor,registerDefaultProcesor用來設(shè)置接收到消息后的處理方法。
RemotingClient類和RemotingServer類相對(duì)應(yīng),比較重要的方法是updateNameServerAddressList、invokeSync和invokeOneway、updateNameServerAddresList用來獲取有效的NameServer地址,invokeSync與invokeOneway用來向Server端發(fā)送請(qǐng)求
NettyRemotingServer和NettyRemotingClient分別實(shí)現(xiàn)了RemotingServer和RemotingClient這兩個(gè)接口,但它們有很多共有的內(nèi)容,比如invokeSync、invokeOneway等,所以這些共有函數(shù)被提取到NettyRemotingAbstract共同繼承的父類中。
無論是服務(wù)端還是客戶端都需要處理接收到的請(qǐng)求,處理方法由processMessageReceived定義,注意這里接收到的消息已經(jīng)被轉(zhuǎn)換成RemotingCommand了,而不是原始的字節(jié)流。
-
RemotingCommand是RocketMQ自定義的協(xié)議,具體格式如下
image 這個(gè)協(xié)議只有四部分,但是覆蓋了RocketMQ各個(gè)角色間幾乎所有的通信過程,RemotingCommand有實(shí)際的數(shù)據(jù)類型和各部分對(duì)應(yīng),如下所示。
private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;
private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;
private transient byte[] body;
RocketMQ各個(gè)組件間的通信需要頻繁地在字節(jié)碼和RemotingCommand間相互轉(zhuǎn)換,也就是編碼、解碼過程,好在Netty提供了codec支持,這個(gè)頻繁地操作只需要一行設(shè)置即可:pipeline().addLoast(newNettyEncoder(), now NettyDecoder() )
RocketMQ對(duì)通信過程的另一個(gè)抽象是Processor和Executor,當(dāng)接收到一個(gè)消息后,直接根據(jù)消息的類型調(diào)用對(duì)應(yīng)的Processor和Executor,把通信過程和業(yè)務(wù)邏輯分離開來。
具體的rocketmq netty底層設(shè)計(jì)源碼:rocketmq netty底層設(shè)計(jì)
6、RocketMQ生產(chǎn)者-消息返回狀態(tài)詳解
public enum SendStatus {
SEND_OK,
FLUSH_DISK_TIMEOUT,
FLUSH_SLAVE_TIMEOUT,
SLAVE_NOT_AVAILABLE,
}
- 1、SEND_OK:消息發(fā)送成功
- 2、FLUSH_DISK_TIMEOUT:消息發(fā)送成功,但服務(wù)在進(jìn)行刷盤的時(shí)候超時(shí)了
消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,刷盤超時(shí)會(huì)等待下一次的刷盤時(shí)機(jī)再次刷盤,如果此時(shí)服務(wù)器down機(jī)消息丟失,會(huì)返回此種狀態(tài),如果業(yè)務(wù)系統(tǒng)是可靠性消息投遞,那么需要重發(fā)消息。 - 3、FLUSH_SLAVE_TIMEOUT:在主從同步的時(shí)候,同步到Slave超時(shí)了
如果此時(shí)Master節(jié)點(diǎn)down機(jī),消息也會(huì)丟失 - 4、SLAVE_NOT_AVAILABLE:消息發(fā)送成功,但Slave不可用,只有Master節(jié)點(diǎn)down機(jī),消息才會(huì)丟失
- 后三種狀態(tài),如果業(yè)務(wù)系統(tǒng)是可靠性消息投遞,那么需要考慮補(bǔ)償進(jìn)行可靠性的重試投遞
7、RocketMQ生產(chǎn)者-延遲消息
延遲消息:消息發(fā)到Broker后,要特定的時(shí)間才會(huì)被Consumer消費(fèi)
目前RocketMQ只支持固定精度的定時(shí)消息
具體實(shí)現(xiàn):
message.setDelayTimeLevel();
MessageStoreConfig配置類
//設(shè)置固定精度的消息延時(shí)投遞的時(shí)間
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
ScheduleMessageService任務(wù)類
public boolean parseDelayLevel() {
HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
timeUnitTable.put("s", 1000L);
timeUnitTable.put("m", 1000L * 60);
timeUnitTable.put("h", 1000L * 60 * 60);
timeUnitTable.put("d", 1000L * 60 * 60 * 24);
String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
try {
String[] levelArray = levelString.split(" ");
for (int i = 0; i < levelArray.length; i++) {
String value = levelArray[i];
String ch = value.substring(value.length() - 1);
Long tu = timeUnitTable.get(ch);
int level = i + 1;
if (level > this.maxDelayLevel) {
this.maxDelayLevel = level;
}
long num = Long.parseLong(value.substring(0, value.length() - 1));
long delayTimeMillis = tu * num;
//解析并生成延時(shí)時(shí)間
this.delayLevelTable.put(level, delayTimeMillis);
}
} catch (Exception e) {
log.error("parseDelayLevel exception", e);
log.info("levelString String = {}", levelString);
return false;
}
return true;
}
8、RocketMQ生產(chǎn)者-自定義消息發(fā)送規(guī)則
將消息發(fā)送到指定隊(duì)列(MessageQueue)
MessageQueueSelector:用于選擇指定隊(duì)列
public interface MessageQueueSelector {
MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}
要將消息發(fā)送到指定隊(duì)列,這需要手動(dòng)實(shí)現(xiàn)
1、producer實(shí)現(xiàn):
producer只需發(fā)送消息時(shí)調(diào)用如下方法即可
/**
* 發(fā)送有序消息
*
* @param messageMap 消息數(shù)據(jù)
* @param selector 隊(duì)列選擇器,發(fā)送時(shí)會(huì)回調(diào)
* @param order 回調(diào)隊(duì)列選擇器時(shí),此參數(shù)會(huì)傳入隊(duì)列選擇方法,提供配需規(guī)則
* @return 發(fā)送結(jié)果
*/
public Result<SendResult> send(Message msg, MessageQueueSelector selector, Object arg)
實(shí)現(xiàn)MessageQueueSelector,接口并重寫select()方法
public static void asyncMsgCustomQueue() throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
producer.setNamesrvAddr(Const.NAMESER_ADDR);
producer.start();
producer.setSendMsgTimeout(10000);
//1、創(chuàng)建消息
Message message = new Message("test_quick_topic", //主題
"TagA", //標(biāo)簽
"keyA", //用戶自定義的key,唯一的標(biāo)識(shí)
("hello RocketMq").getBytes());//消息體
//2、將消息發(fā)送到指定隊(duì)列
producer.send(message, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
Integer queueNumber = (Integer) arg;
int size = list.size();
int index = queueNumber % size;
return list.get(index);
}
},1);
}
拓展:
rocketmq問題匯總-如何將特定消息發(fā)送至特定queue,消費(fèi)者從特定queue消費(fèi)
