RocketMQ消息發(fā)送

RocketMQ支持3種消息發(fā)送方式:同步(sync)、異步(async)和單向(one way)。

  1. 同步:發(fā)送者向RocketMQ執(zhí)行發(fā)送消息API時(shí),同步等待,直到消息服務(wù)器返回發(fā)送結(jié)果。
  2. 異步:發(fā)送者向RocketMQ執(zhí)行發(fā)送消息API時(shí),指定消息發(fā)送成功后的回調(diào)函數(shù),調(diào)用消息發(fā)送API后,立即返回,消息發(fā)送者線程不阻塞,直到運(yùn)行結(jié)束,消息發(fā)送成功或失敗的回調(diào)任務(wù)在一個(gè)新的線程中執(zhí)行。
  3. 單向:消息發(fā)送者向RocketMQ執(zhí)行發(fā)送消息API時(shí),直接返回,不等待消息服務(wù)器的結(jié)果,也不注冊(cè)回調(diào)函數(shù)。簡(jiǎn)單地說(shuō),就是只管發(fā),不在乎消息是否成功存儲(chǔ)在消息服務(wù)器上。

RocketMQ消息發(fā)送需要考慮以下3個(gè)問(wèn)題:

  1. 消息隊(duì)列如何進(jìn)行負(fù)載?
  2. 消息發(fā)送如何實(shí)現(xiàn)高可用?
  3. 批量消息發(fā)送如何實(shí)現(xiàn)一致性?

1. Topic路由機(jī)制

初次發(fā)送時(shí)會(huì)根據(jù)topic的名稱向NameServer集群查詢topic的路由信息,然后將其存儲(chǔ)在本地內(nèi)存緩存中,并且每隔30s依次遍歷緩存中的topic,向NameServer查詢最新的路由信息。如果成功查詢到路由信息,會(huì)將這些信息更新至本地緩存,實(shí)現(xiàn)topic路由信息的動(dòng)態(tài)感知。

RocketMQ提供了自動(dòng)創(chuàng)建主題(topic)的機(jī)制,消息發(fā)送者向一個(gè)不存在的主題發(fā)送消息時(shí),向NameServer查詢?cè)撝黝}的路由信息會(huì)先返回空,如果開(kāi)啟了自動(dòng)創(chuàng)建主題機(jī)制,會(huì)使用一個(gè)默認(rèn)的主題名再次從NameServer查詢路由信息,然后消息發(fā)送者會(huì)使用默認(rèn)主題的路由信息進(jìn)行負(fù)載均衡,但不會(huì)直接使用默認(rèn)路由信息為新主題創(chuàng)建對(duì)應(yīng)的路由信息。使用默認(rèn)主題創(chuàng)建路由信息的流程如圖所示。


默認(rèn)消息時(shí)序.jpg

注意:RocketMQ中的路由消息是持久化在Broker中的,NameServer中的路由信息來(lái)自Broker的心跳包并存儲(chǔ)在內(nèi)存中。

2. 消息發(fā)送高可用設(shè)計(jì)

發(fā)送端在自動(dòng)發(fā)現(xiàn)主題的路由信息后,RocketMQ默認(rèn)使用輪詢算法進(jìn)行路由的負(fù)載均衡。RocketMQ在消息發(fā)送時(shí)支持自定義的隊(duì)列負(fù)載算法,需要特別注意的是,使用自定義的路由負(fù)載算法后,RocketMQ的重試機(jī)制將失效。RocketMQ為了實(shí)現(xiàn)消息發(fā)送高可用,引入了兩個(gè)非常重要的特性:

  1. 消息發(fā)送重試機(jī)制。RocketMQ在消息發(fā)送時(shí)如果出現(xiàn)失敗,默認(rèn)會(huì)重試兩次。
  2. 故障規(guī)避機(jī)制。當(dāng)消息第一次發(fā)送失敗時(shí),如果下一次消息還是發(fā)送到剛剛失敗的Broker上,其消息發(fā)送大概率還是會(huì)失敗,因此為了保證重試的可靠性,在重試時(shí)會(huì)盡量避開(kāi)剛剛接收失敗的Broker,而是選擇其他Broker上的隊(duì)列進(jìn)行發(fā)送,從而提高消息發(fā)送的成功率。

消息發(fā)送的高可用性設(shè)計(jì)如圖所示:


1756605750676.jpg

3. 消息發(fā)送流程

消息生產(chǎn)者啟動(dòng)流程在DefaultMQProducerImpl#start中,記錄了也記不住,這里不贅述了,主要記錄核心業(yè)務(wù)流程。
RocketMQ消息發(fā)送的關(guān)鍵點(diǎn)如圖所示:


image.png

消息發(fā)送流程主要的步驟為驗(yàn)證消息、查找路由、消息發(fā)送(包含異常處理機(jī)制),如下所示:
DefaultMQProducerImpl#send

public SendResult send(Message msg) throws MQClientException, RemotingException,
        MQBrokerException, InterruptedException {
    return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException,
        RemotingException, MQBrokerException, InterruptedException {
    return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}

這里重點(diǎn)介紹下Broker的故障延遲機(jī)制。
MQFaultStrategy#selectOneMessageQueue

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final
        String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                // 輪訓(xùn)隊(duì)列
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                // 驗(yàn)證該消息隊(duì)列是否可用
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    // lastBrokerName就是上一次選擇的執(zhí)行發(fā)送消息失敗的Broker
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                    }
                }
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() %
                            writeQueueNums);
                    }
                    return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

消息發(fā)送API核心入口DefaultMQProducerImpl#sendKernelImpl:

private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout)

4. 批量消息發(fā)送

批量消息發(fā)送是將同一主題的多條消息一起打包發(fā)送到消息服務(wù)端,減少網(wǎng)絡(luò)調(diào)用次數(shù),提高網(wǎng)絡(luò)傳輸效率。當(dāng)然,并不是在同一批次中發(fā)送的消息數(shù)量越多,性能就越好,判斷依據(jù)是單條消息的長(zhǎng)度,如果單條消息內(nèi)容比較長(zhǎng),則打包發(fā)送多條消息會(huì)影響其他線程發(fā)送消息的響應(yīng)時(shí)間,并且單批次消息發(fā)送總長(zhǎng)度不能超過(guò)Default MQProducer#maxMessageSize。批量發(fā)送消息要解決的是如何將這些消息編碼,以便服務(wù)端能夠正確解碼每條消息的內(nèi)容。

發(fā)送單條消息時(shí),消息體的內(nèi)容將保存在body中。發(fā)送批量消息時(shí),需要將多條消息體的內(nèi)容存儲(chǔ)在body中。如何存儲(chǔ)更便于服務(wù)端正確解析每條消息呢?RocketMQ采取的方式是,對(duì)單條消息內(nèi)容使用固定格式進(jìn)行存儲(chǔ),如圖所示:


image.png

首先在消息發(fā)送端,調(diào)用batch()方法,將一批消息封裝成MessageBatch對(duì)象。Message-Batch繼承自Message對(duì)象,內(nèi)部持有List<Message> messages。這樣一來(lái),批量消息發(fā)送與單條消息發(fā)送的處理流程就完全一樣了。MessageBatch只需要將該集合中每條消息的消息體聚合成一個(gè)byte[]數(shù)組,在消息服務(wù)端能夠從該byte[]數(shù)組中正確解析出消息。
MessageDecoder#encodeMessage

public static byte[] encodeMessage(Message message) {
    byte[] body = message.getBody(); int
    bodyLen = body.length;
    String properties = messageProperties2String(message.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    propertiesLength = (short) propertiesBytes.length; int
    sysFlag = message.getFlag();
    int storeSize = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCOD
        + 4 // 3 BODYCRC
        + 4 // 4 FLAG
        + 4 + bodyLen // 4 BODY
        + 2 + propertiesLength;
    ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);
    // 2 MAGICCODE
    byteBuffer.putInt(0);
    // 3 BODYCRC
    byteBuffer.putInt(0);

    // 4 FLAG
    int flag = message.getFlag();
    byteBuffer.putInt(flag);
    // 5 BODY
    byteBuffer.putInt(bodyLen);
    byteBuffer.put(body);
    // 6 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);
    return byteBuffer.array();
}
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容