Apache RocketMQ Producer解析

基于RocketMQ源代碼版本:rocketmq-all-4.5.2-source-release

1、RocketMQ生產(chǎn)者-核心參數(shù)

參數(shù)名 默認(rèn)值 說明
producerGroup DEFAULT_PRODUCER Producer組名,多個(gè)Producer如果屬于一個(gè)應(yīng)用,發(fā)送同樣的消息,則應(yīng)該將它們歸為同一組。
createTopicKey TBW102 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,需要指定key
defaultTopicQueueNums 4 在發(fā)送消息時(shí),自動(dòng)創(chuàng)建服務(wù)器不存在的topic,默認(rèn)創(chuàng)建的隊(duì)列數(shù)
sendMsgTimeout 10000 發(fā)送消息超時(shí)時(shí)間,單位毫秒
compressMsgBodyOverHowmuch 4096 消息Body超過多大開始?jí)嚎s(Consumer收到消息會(huì)自動(dòng)解壓縮),單位字節(jié)
retryAnotherBrokerWhenNotStoreOK FALSE 如果發(fā)送消息返回sendResult,但是sendStatus!=SEND_OK,是否重試發(fā)送
maxMessageSize 131072 客戶端限制的消息大小,超過報(bào)錯(cuò),同時(shí)服務(wù)端也會(huì)限制(默認(rèn)128K)
transactionCheckListener 事物消息回查監(jiān)聽器,如果發(fā)送事務(wù)消息,必須設(shè)置
checkThreadPoolMinSize 1 Broker回查Producer事務(wù)狀態(tài)時(shí),線程池大小
checkThreadPoolMaxSize 1 Broker回查Producer事務(wù)狀態(tài)時(shí),線程池大小
checkRequestHoldMax 2000 Broker回查Producer事務(wù)狀態(tài)時(shí),Producer本地緩沖請(qǐng)求隊(duì)列大小

2、RocketMQ主從同步機(jī)制解析

Master-Slave主從同步
同步信息:消息數(shù)據(jù)內(nèi)容(commitLog) + 元數(shù)據(jù)信息(topic配置信息、消費(fèi)者偏移量Offset、延遲偏移量Offset等配置信息)

元數(shù)據(jù)同步:Broker角色識(shí)別,為Slave則啟動(dòng)同步定時(shí)任務(wù),由Netty實(shí)現(xiàn)

  • 在BrokerController的handleSlaveSynchronize()方法中
/**
 * 該方法的主要作用是處理從節(jié)點(diǎn)的元數(shù)據(jù)同步,
 * 即從節(jié)點(diǎn)向主節(jié)點(diǎn)主動(dòng)同步 topic 的路由信息、消費(fèi)進(jìn)度、延遲隊(duì)列處理隊(duì)列、消費(fèi)組訂閱配置等信息。
 * @param role
 */
private void handleSlaveSynchronize(BrokerRole role) {
    //如果角色為Slave
    if (role == BrokerRole.SLAVE) {
        if (null != slaveSyncFuture) {
            //如果上次同步的 future 不為空,則首先先取消
            slaveSyncFuture.cancel(false);
        }
        //然后設(shè)置 slaveSynchronize 的 master 地址為空
        this.slaveSynchronize.setMasterAddr(null);
        
        //在固定時(shí)間開啟只有一個(gè)線程的定時(shí)任務(wù),每 10s 從主節(jié)點(diǎn)同步一次配置數(shù)據(jù)
        slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    //啟動(dòng)定時(shí)任務(wù)進(jìn)行元數(shù)據(jù)信息的同步
                    BrokerController.this.slaveSynchronize.syncAll();
                }
                catch (Throwable e) {
                    log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                }
            }
        }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
    } else {
        //handle the slave synchronise
        //如果當(dāng)前節(jié)點(diǎn)的角色為主節(jié)點(diǎn),則取消定時(shí)同步任務(wù)并設(shè)置 master 的地址為空
        if (null != slaveSyncFuture) {
            slaveSyncFuture.cancel(false);
        }
        this.slaveSynchronize.setMasterAddr(null);
    }
}

public void changeToSlave(int brokerId) {
    ......
    //handle the slave synchronise
         //操作主從同步
    handleSlaveSynchronize(BrokerRole.SLAVE);
    ......
}
  • 元數(shù)據(jù)同步內(nèi)容:SlaveSynchronize
public void syncAll() {
    this.syncTopicConfig(); //同步topic配置信息
    this.syncConsumerOffset();  //同步消費(fèi)者偏移量Offset
    this.syncDelayOffset(); //同步延遲偏移量Offset
    this.syncSubscriptionGroupConfig(); //訂閱組配置信息
}

消息數(shù)據(jù)內(nèi)容(commitLog)同步:時(shí)實(shí)同步由java原生Socket實(shí)現(xiàn),HAService、HAconnection、WaitNotifyObject

HAService:主從同步核心實(shí)現(xiàn)類。
Master節(jié)點(diǎn):

  • 1、AcceptSocketService acceptSocketService:服務(wù)端接收連接線程實(shí)現(xiàn)類,作為Master端監(jiān)聽Slave連接的實(shí)現(xiàn)類。
  • 2、HAConnection:HA Master-Slave 網(wǎng)絡(luò)連接對(duì)象,對(duì)Master節(jié)點(diǎn)連接、讀寫數(shù)據(jù)。
  • A、WriteSocketService writeSocketService:HAConnection網(wǎng)絡(luò)寫封裝,寫到Slave節(jié)點(diǎn)的數(shù)據(jù)。
  • B、ReadSocketService readSocketService:HAConnection網(wǎng)絡(luò)讀封裝,讀取來自Slave節(jié)點(diǎn)的數(shù)據(jù)。

Slave節(jié)點(diǎn):

  • HAClient haClient:HA客戶端實(shí)現(xiàn),Slave端網(wǎng)絡(luò)的實(shí)現(xiàn)類。

RocketMQ主從同步基本實(shí)現(xiàn)過程如下圖所示:


image.png

RocketMQ 的主從同步機(jī)制如下:

  • 1、首先啟動(dòng)Master并在指定端口監(jiān)聽Slave的連接請(qǐng)求;
  • 2、Slave啟動(dòng),主動(dòng)連接Master,建立TCP連接;
  • 3、Slave以每隔5s的間隔時(shí)間向Master拉取消息,如果是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量Offset,以該偏移量Offset向Master拉取消息;
  • 4、Master解析請(qǐng)求,并返回一批數(shù)據(jù)給Slave;
  • 5、Slave收到一批消息后,將消息寫入本地commitlog文件中,然后向Master匯報(bào)拉取進(jìn)度,并更新下一次待拉取偏移量Offset;
  • 6、然后重復(fù)第3步;

通信協(xié)議:Master節(jié)點(diǎn)與Slave節(jié)點(diǎn)通信協(xié)議很簡單,如下:

  • Slave ====> Master 上報(bào)CommitLog已經(jīng)同步到的物理位置
  • Master ====> 傳輸新的CommitLog數(shù)據(jù)

源碼研究RocketMQ主從同步機(jī)制(HA)https://blog.csdn.net/prestigeding/article/details/79600792

RocketMQ主從同步一個(gè)重要的特征:

  • RocketMQ4.5.0版本之前,主從同步不具備主從切換功能,即當(dāng)主節(jié)點(diǎn)宕機(jī)后,從不會(huì)接管消息發(fā)送,但可以提供消息讀取。
  • RocketMQ4.5.0版本之后,rocketmq基于raft 協(xié)議支持主從切換,引入了多副本機(jī)制,即 DLedger,支持主從切換,即當(dāng)一個(gè)復(fù)制組內(nèi)的主節(jié)點(diǎn)宕機(jī)后,會(huì)在該復(fù)制組內(nèi)觸發(fā)重新選主,選主完成后即可繼續(xù)提供消息寫功能。rocketmq主從切換基于 raft 協(xié)議,而Zookeeper也是基于該協(xié)議。

關(guān)于Raft協(xié)議請(qǐng)點(diǎn)擊:一文搞懂Raft算法

主從切換的主要邏輯在BrokerController類中,具體如下:

Broker 角色變更為Slave

/**
 * Broker 角色變更為Slave
 * @param brokerId
 */
public void changeToSlave(int brokerId) {
    log.info("Begin to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);

    //change the role
    //設(shè)置 brokerId,如果broker的id為0,則設(shè)置為1,這里在使用的時(shí)候,注意規(guī)劃好集群內(nèi)節(jié)點(diǎn)的 brokerId
    brokerConfig.setBrokerId(brokerId == 0 ? 1 : brokerId); //TO DO check
    //設(shè)置 broker  的角色為 BrokerRole.SLAVE。
    messageStoreConfig.setBrokerRole(BrokerRole.SLAVE);

    //handle the scheduled service
    try {
        //從節(jié)點(diǎn),則關(guān)閉定時(shí)調(diào)度線程(處理 RocketMQ 延遲隊(duì)列),如果是主節(jié)點(diǎn),則啟動(dòng)該線程。
        this.messageStore.handleScheduleMessageService(BrokerRole.SLAVE);
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to slave", t);
    }

    //handle the transactional service
    try {
        //關(guān)閉事務(wù)狀態(tài)回查處理器
        this.shutdownProcessorByHa();
    } catch (Throwable t) {
        log.error("[MONITOR] shutdownProcessorByHa failed when changing to slave", t);
    }

    //handle the slave synchronise
    //從節(jié)點(diǎn)需要啟動(dòng)配置信息同步處理器,即啟動(dòng) SlaveSynchronize 定時(shí)從主服務(wù)器同步元數(shù)據(jù)等配置信息
    handleSlaveSynchronize(BrokerRole.SLAVE);

    try {
        //立即向集群內(nèi)所有的 nameserver 告知 broker  信息狀態(tài)的變更
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
    } catch (Throwable ignored) {

    }
    log.info("Finish to change to slave brokerName={} brokerId={}", brokerConfig.getBrokerName(), brokerId);
}

/**
 * 關(guān)閉事務(wù)狀態(tài)回查處理器,當(dāng)Master變更Slave后,該方法被調(diào)用。
 */
private void shutdownProcessorByHa() {
    if (this.transactionalMessageCheckService != null) {
        this.transactionalMessageCheckService.shutdown(true);
    }
}

Broker 角色從Slave變更為Master

/**
 * Broker 角色從Slave變更為Master的處理邏輯
 * @param role
 */
public void changeToMaster(BrokerRole role) {
    if (role == BrokerRole.SLAVE) {
        return;
    }
    log.info("Begin to change to master brokerName={}", brokerConfig.getBrokerName());

    //handle the slave synchronise
    //Master節(jié)點(diǎn),會(huì)在該方法中設(shè)置slaveSyncFuture.cancel(false);
    handleSlaveSynchronize(role);

    //handle the scheduled service
    //開啟定時(shí)任務(wù)處理線程。
    try {
        this.messageStore.handleScheduleMessageService(role);
    } catch (Throwable t) {
        log.error("[MONITOR] handleScheduleMessageService failed when changing to master", t);
    }

    //handle the transactional service
    //開啟事務(wù)狀態(tài)回查處理線程。
    try {
        this.startProcessorByHa(BrokerRole.SYNC_MASTER);
    } catch (Throwable t) {
        log.error("[MONITOR] startProcessorByHa failed when changing to master", t);
    }

    //if the operations above are totally successful, we change to master
    //設(shè)置 brokerId 為 0,配置文件中brokerId為0是Master節(jié)點(diǎn)
    brokerConfig.setBrokerId(0); //TO DO check
    messageStoreConfig.setBrokerRole(role);

    try {
        //向 nameserver 立即發(fā)送心跳包以便告知 broker 服務(wù)器當(dāng)前最新的狀態(tài)
        this.registerBrokerAll(true, true, brokerConfig.isForceRegister());
    } catch (Throwable ignored) {

    }
    log.info("Finish to change to master brokerName={}", brokerConfig.getBrokerName());
}

/**
 * 該方法的作用是開啟事務(wù)狀態(tài)回查處理器,
 * 即當(dāng)節(jié)點(diǎn)為Master時(shí),開啟對(duì)應(yīng)的事務(wù)狀態(tài)回查處理器,對(duì)PREPARE狀態(tài)的消息發(fā)起事務(wù)狀態(tài)回查請(qǐng)求
 * @param role
 */
private void startProcessorByHa(BrokerRole role) {
    if (BrokerRole.SLAVE != role) {
        if (this.transactionalMessageCheckService != null) {
            this.transactionalMessageCheckService.start();
        }
    }
}

RocketMQ 主從切換DLedger 是基于raft協(xié)議實(shí)現(xiàn)的,在該協(xié)議中就實(shí)現(xiàn)了主節(jié)點(diǎn)的選舉與主節(jié)點(diǎn)失效后集群會(huì)自動(dòng)進(jìn)行重新選舉,經(jīng)過協(xié)商投票產(chǎn)生新的主節(jié)點(diǎn),從而實(shí)現(xiàn)高可用。
BrokerController#initialize(),在 Broker 啟動(dòng)時(shí),如果開啟了多副本機(jī)制,即 enableDLedgerCommitLog 參數(shù)設(shè)置為 true,會(huì)為 集群節(jié)點(diǎn)選主器添加 roleChangeHandler 事件處理器,即節(jié)點(diǎn)發(fā)送變更后的事件處理器。

if (result) {
    try {
        this.messageStore =
            new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, this.messageArrivingListener,
                this.brokerConfig);
        if (messageStoreConfig.isEnableDLegerCommitLog()) {
            DLedgerRoleChangeHandler roleChangeHandler = new DLedgerRoleChangeHandler(this, (DefaultMessageStore) messageStore);
            ((DLedgerCommitLog)((DefaultMessageStore) messageStore).getCommitLog()).getdLedgerServer().getdLedgerLeaderElector().addRoleChangeHandler(roleChangeHandler);
        }
        this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
        //load plugin
        MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, brokerStatsManager, messageArrivingListener, brokerConfig);
        this.messageStore = MessageStoreFactory.build(context, this.messageStore);
        this.messageStore.getDispatcherList().addFirst(new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
    } catch (IOException e) {
        result = false;
        log.error("Failed to initialize", e);
    }
}

DLedgerRoleChangeHandler#handle(),主從狀態(tài)切換的邏輯

/**
 * handle 主從狀態(tài)切換處理邏輯
 * @param term
 * @param role
 */
@Override 
public void handle(long term, MemberState.Role role) {
    Runnable runnable = new Runnable() {
        @Override public void run() {
            long start = System.currentTimeMillis();
            try {
                boolean succ = true;
                log.info("Begin handling broker role change term={} role={} currStoreRole={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole());
                switch (role) {
                    case CANDIDATE:
                        //如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 CANDIDATE,表示正在發(fā)起 Leader 節(jié)點(diǎn),如果該服務(wù)器的角色不是 SLAVE 的話,需要將狀態(tài)切換為 SLAVE
                        if (messageStore.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE) {
                            brokerController.changeToSlave(dLedgerCommitLog.getId());
                        }
                        break;
                    case FOLLOWER:
                        //如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 FOLLOWER,broker 節(jié)點(diǎn)將轉(zhuǎn)換為 從節(jié)點(diǎn)
                        brokerController.changeToSlave(dLedgerCommitLog.getId());
                        break;
                    case LEADER:
                        //如果當(dāng)前節(jié)點(diǎn)狀態(tài)機(jī)狀態(tài)為 Leader,說明該節(jié)點(diǎn)被選舉為 Leader,
                        //在切換到 Master 節(jié)點(diǎn)之前,首先需要等待當(dāng)前節(jié)點(diǎn)追加的數(shù)據(jù)都已經(jīng)被提交后才可以將狀態(tài)變更為 Master
                        while (true) {
                            if (!dLegerServer.getMemberState().isLeader()) {
                                succ = false;
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == -1) {
                                //如果 ledgerEndIndex 為 -1,表示當(dāng)前節(jié)點(diǎn)還沒有數(shù)據(jù)轉(zhuǎn)發(fā),直接跳出循環(huán),無需等待
                                break;
                            }
                            if (dLegerServer.getdLedgerStore().getLedgerEndIndex() == dLegerServer.getdLedgerStore().getCommittedIndex()
                                && messageStore.dispatchBehindBytes() == 0) {
                                //如果 ledgerEndIndex 不為 -1 ,則必須等待數(shù)據(jù)都已提交,即 ledgerEndIndex 與 committedIndex 相等
                                break;
                            }
                            Thread.sleep(100);
                        }
                        if (succ) {
                            //并且需要等待  commitlog 日志全部已轉(zhuǎn)發(fā)到 consumequeue中,
                            // 即 ReputMessageService 中的 reputFromOffset 與 commitlog 的 maxOffset 相等
                            messageStore.recoverTopicQueueTable();

                            //等待上述條件滿足后,即可以進(jìn)行狀態(tài)的變更,需要恢復(fù) ConsumeQueue,
                            // 維護(hù)每一個(gè) queue 對(duì)應(yīng)的 maxOffset,然后將 broker 角色轉(zhuǎn)變?yōu)?master
                            brokerController.changeToMaster(BrokerRole.SYNC_MASTER);
                        }
                        break;
                    default:
                        break;
                }
                log.info("Finish handling broker role change succ={} term={} role={} currStoreRole={} cost={}", succ, term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start));
            } catch (Throwable t) {
                log.info("[MONITOR]Failed handling broker role change term={} role={} currStoreRole={} cost={}", term, role, messageStore.getMessageStoreConfig().getBrokerRole(), DLedgerUtils.elapsed(start), t);
            }
        }
    };
    executorService.submit(runnable);
}

3、RocketMQ同步消息發(fā)送

消息的同步發(fā)送:producer.send(msg)
同步發(fā)送消息核心實(shí)現(xiàn):DefaultMQProducerImpl

4、RocketMQ異步消息發(fā)送

消息的異步發(fā)送:producer.send(Message msg, SendCallBack sendCallBack)
異步發(fā)送消息核心實(shí)現(xiàn):DefaultMQProducerImpl

5、Netty底層通信框架解析

rocketmq底層網(wǎng)絡(luò)使用的netty框架,類圖如下


image.png

Remoting模塊類結(jié)構(gòu)圖:


image.png
  • RecketMQ通信模塊的頂層結(jié)構(gòu)是RemotingServer和RemotingClient,分別對(duì)應(yīng)通信的服務(wù)端和客戶端

  • RemotingServer類中比較重要的是:localListenPort、registerProcessor和registerDefaultProcessor,registerDefaultProcesor用來設(shè)置接收到消息后的處理方法。

  • RemotingClient類和RemotingServer類相對(duì)應(yīng),比較重要的方法是updateNameServerAddressList、invokeSync和invokeOneway、updateNameServerAddresList用來獲取有效的NameServer地址,invokeSync與invokeOneway用來向Server端發(fā)送請(qǐng)求

  • NettyRemotingServer和NettyRemotingClient分別實(shí)現(xiàn)了RemotingServer和RemotingClient這兩個(gè)接口,但它們有很多共有的內(nèi)容,比如invokeSync、invokeOneway等,所以這些共有函數(shù)被提取到NettyRemotingAbstract共同繼承的父類中。

  • 無論是服務(wù)端還是客戶端都需要處理接收到的請(qǐng)求,處理方法由processMessageReceived定義,注意這里接收到的消息已經(jīng)被轉(zhuǎn)換成RemotingCommand了,而不是原始的字節(jié)流。

  • RemotingCommand是RocketMQ自定義的協(xié)議,具體格式如下


    image
  • 這個(gè)協(xié)議只有四部分,但是覆蓋了RocketMQ各個(gè)角色間幾乎所有的通信過程,RemotingCommand有實(shí)際的數(shù)據(jù)類型和各部分對(duì)應(yīng),如下所示。

private int code;
private LanguageCode language = LanguageCode.JAVA;
private int version = 0;
private int opaque = requestId.getAndIncrement();
private int flag = 0;
private String remark;
private HashMap<String, String> extFields;
private transient CommandCustomHeader customHeader;

private SerializeType serializeTypeCurrentRPC = serializeTypeConfigInThisServer;

private transient byte[] body;
  • RocketMQ各個(gè)組件間的通信需要頻繁地在字節(jié)碼和RemotingCommand間相互轉(zhuǎn)換,也就是編碼、解碼過程,好在Netty提供了codec支持,這個(gè)頻繁地操作只需要一行設(shè)置即可:pipeline().addLoast(newNettyEncoder(), now NettyDecoder() )

  • RocketMQ對(duì)通信過程的另一個(gè)抽象是Processor和Executor,當(dāng)接收到一個(gè)消息后,直接根據(jù)消息的類型調(diào)用對(duì)應(yīng)的Processor和Executor,把通信過程和業(yè)務(wù)邏輯分離開來。

  • 具體的rocketmq netty底層設(shè)計(jì)源碼:rocketmq netty底層設(shè)計(jì)

6、RocketMQ生產(chǎn)者-消息返回狀態(tài)詳解

public enum SendStatus {
    SEND_OK,
    FLUSH_DISK_TIMEOUT,
    FLUSH_SLAVE_TIMEOUT,
    SLAVE_NOT_AVAILABLE,
}
  • 1、SEND_OK:消息發(fā)送成功
  • 2、FLUSH_DISK_TIMEOUT:消息發(fā)送成功,但服務(wù)在進(jìn)行刷盤的時(shí)候超時(shí)了
    消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,刷盤超時(shí)會(huì)等待下一次的刷盤時(shí)機(jī)再次刷盤,如果此時(shí)服務(wù)器down機(jī)消息丟失,會(huì)返回此種狀態(tài),如果業(yè)務(wù)系統(tǒng)是可靠性消息投遞,那么需要重發(fā)消息。
  • 3、FLUSH_SLAVE_TIMEOUT:在主從同步的時(shí)候,同步到Slave超時(shí)了
    如果此時(shí)Master節(jié)點(diǎn)down機(jī),消息也會(huì)丟失
  • 4、SLAVE_NOT_AVAILABLE:消息發(fā)送成功,但Slave不可用,只有Master節(jié)點(diǎn)down機(jī),消息才會(huì)丟失
  • 后三種狀態(tài),如果業(yè)務(wù)系統(tǒng)是可靠性消息投遞,那么需要考慮補(bǔ)償進(jìn)行可靠性的重試投遞

7、RocketMQ生產(chǎn)者-延遲消息

延遲消息:消息發(fā)到Broker后,要特定的時(shí)間才會(huì)被Consumer消費(fèi)
目前RocketMQ只支持固定精度的定時(shí)消息
具體實(shí)現(xiàn):

message.setDelayTimeLevel();

MessageStoreConfig配置類

//設(shè)置固定精度的消息延時(shí)投遞的時(shí)間
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

ScheduleMessageService任務(wù)類

public boolean parseDelayLevel() {
    HashMap<String, Long> timeUnitTable = new HashMap<String, Long>();
    timeUnitTable.put("s", 1000L);
    timeUnitTable.put("m", 1000L * 60);
    timeUnitTable.put("h", 1000L * 60 * 60);
    timeUnitTable.put("d", 1000L * 60 * 60 * 24);

    String levelString = this.defaultMessageStore.getMessageStoreConfig().getMessageDelayLevel();
    try {
        String[] levelArray = levelString.split(" ");
        for (int i = 0; i < levelArray.length; i++) {
            String value = levelArray[i];
            String ch = value.substring(value.length() - 1);
            Long tu = timeUnitTable.get(ch);

            int level = i + 1;
            if (level > this.maxDelayLevel) {
                this.maxDelayLevel = level;
            }
            long num = Long.parseLong(value.substring(0, value.length() - 1));
            long delayTimeMillis = tu * num;
            //解析并生成延時(shí)時(shí)間
            this.delayLevelTable.put(level, delayTimeMillis);
        }
    } catch (Exception e) {
        log.error("parseDelayLevel exception", e);
        log.info("levelString String = {}", levelString);
        return false;
    }

    return true;
}

8、RocketMQ生產(chǎn)者-自定義消息發(fā)送規(guī)則

將消息發(fā)送到指定隊(duì)列(MessageQueue)
MessageQueueSelector:用于選擇指定隊(duì)列

public interface MessageQueueSelector {
    MessageQueue select(final List<MessageQueue> mqs, final Message msg, final Object arg);
}

要將消息發(fā)送到指定隊(duì)列,這需要手動(dòng)實(shí)現(xiàn)
1、producer實(shí)現(xiàn):
producer只需發(fā)送消息時(shí)調(diào)用如下方法即可

/**
 * 發(fā)送有序消息
 *
 * @param messageMap 消息數(shù)據(jù)
 * @param selector   隊(duì)列選擇器,發(fā)送時(shí)會(huì)回調(diào)
 * @param order      回調(diào)隊(duì)列選擇器時(shí),此參數(shù)會(huì)傳入隊(duì)列選擇方法,提供配需規(guī)則
 * @return 發(fā)送結(jié)果
 */
public Result<SendResult> send(Message msg, MessageQueueSelector selector, Object arg)

實(shí)現(xiàn)MessageQueueSelector,接口并重寫select()方法

public static void asyncMsgCustomQueue() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("test_quick_producer_name");
    producer.setNamesrvAddr(Const.NAMESER_ADDR);

    producer.start();
    producer.setSendMsgTimeout(10000);
    //1、創(chuàng)建消息
    Message message = new Message("test_quick_topic",  //主題
                                    "TagA",          //標(biāo)簽
                                    "keyA",          //用戶自定義的key,唯一的標(biāo)識(shí)
                                    ("hello RocketMq").getBytes());//消息體
    //2、將消息發(fā)送到指定隊(duì)列
    producer.send(message, new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> list, Message message, Object arg) {
            Integer queueNumber = (Integer) arg;
            int size = list.size();
            int index = queueNumber % size;
            return list.get(index);
        }
    },1);

}

拓展:

源碼研究RocketMQ主從同步機(jī)制(HA)

rocketmq問題匯總-如何將特定消息發(fā)送至特定queue,消費(fèi)者從特定queue消費(fèi)

參考:

rocketmq netty底層設(shè)計(jì)

rocketMq-延遲消息介紹

RocketMQ(1)-架構(gòu)原理

Rocketmq原理&最佳實(shí)踐

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • Apache RocketMQ 基礎(chǔ)概念及架構(gòu)解析 Apache RocketMQ 系列: Apache Rock...
    掛機(jī)的啊洋zzZ閱讀 6,239評(píng)論 1 47
  • 1 架構(gòu)原理 1.1 應(yīng)用場(chǎng)景 只支持發(fā)布訂閱模式。 大數(shù)據(jù)量的消息堆積能力,最終數(shù)據(jù)是持久化到磁盤上,理論上無限...
    可笑可樂閱讀 9,643評(píng)論 0 2
  • ??今年的一個(gè)周末,去參加了一場(chǎng)rocketMq的meet up分享,由此對(duì)rocketMq產(chǎn)生了極大的興趣,ro...
    左小星閱讀 16,408評(píng)論 7 28
  • 核心組件(4個(gè)組件+消息存儲(chǔ)結(jié)構(gòu)) 客戶端消費(fèi)模式 1. MQ的使用場(chǎng)景 昨天在寫完之后,有些讀者在評(píng)論中提出:到...
    樓亭樵客閱讀 1,148評(píng)論 0 3
  • 同哥說“時(shí)間會(huì)任性的為回憶涂上色彩,就好像明明是五顏六色的回憶,沉淀,加工、過濾,就只剩下了白色?!边@句話剛好出現(xiàn)...
    王玦閱讀 746評(píng)論 3 6

友情鏈接更多精彩內(nèi)容