rocketmq消息軌跡調(diào)研

背景

可靠性和可用性是每個(gè)mq系統(tǒng)最重要的兩個(gè)特征。雖然成熟的mq對(duì)這兩個(gè)特性支持的很好,但是mq仍然需要一些其他的手段去確保一個(gè)消息的整條鏈路沒(méi)有問(wèn)題。通過(guò)一些方法,我們應(yīng)該看到一個(gè)消息的完整鏈路并且迅速找到消息處理過(guò)程中失敗的根源問(wèn)題。
所以RocketMQ 支持消息軌跡跟蹤的特性,我們能輕松的找到和分析息處理過(guò)程中失敗的根源問(wèn)題。并且我們能查到很多參數(shù)的值,比如消息發(fā)送耗時(shí),消息消費(fèi)耗時(shí),broker端存儲(chǔ)時(shí)間等等。

設(shè)計(jì)架構(gòu)

架構(gòu)包括兩個(gè)部分,存儲(chǔ)端和客戶端收集

生產(chǎn)者消費(fèi)者的多線程模式和阻塞隊(duì)列

在客戶端,對(duì)于生產(chǎn)者,我們能收集信息(比如,發(fā)送和消費(fèi)消息耗費(fèi)的時(shí)間,對(duì)應(yīng)broker存儲(chǔ)時(shí)間,對(duì)應(yīng)broker的ip等等)并且把信息放入阻塞隊(duì)列。對(duì)于消費(fèi)者,使用一個(gè)叫做MQ-AsyncArrayDispatcher的線程從阻塞隊(duì)列去把消息軌跡跟蹤的信息取出。然后這個(gè)異步的線程(MQ-AsyncArrayDispatcher)把消息的軌跡跟蹤打包作為AsyncAppenderRequest 任務(wù)提交給線程池執(zhí)行。
最后,AsyncAppenderRequest task主要的執(zhí)行過(guò)程是把從客戶端收集到的軌跡跟蹤信息發(fā)送到一個(gè)特殊的broker 節(jié)點(diǎn)上。

實(shí)現(xiàn)兩個(gè)hook

包括“SendMessageHook/ConsumeMessageHook”,用這兩個(gè)雷我們能夠在發(fā)布和訂閱消息的前后收集到消息的軌跡跟蹤數(shù)據(jù)。

新定義一個(gè)特殊的broke節(jié)點(diǎn)去存儲(chǔ)消息軌跡跟蹤數(shù)據(jù)

在一個(gè)集群中我們能定義一個(gè)特殊的broker服務(wù)節(jié)點(diǎn)去存儲(chǔ)消息軌跡跟蹤的數(shù)據(jù)。我們?cè)赽roker.properties文件中,能夠加一個(gè)flag(比如autoTraceBrokerEnable)去定義這個(gè)broker是否是一個(gè)用來(lái)存儲(chǔ)消息軌跡跟蹤數(shù)據(jù)的特殊節(jié)點(diǎn)。

  • autoTraceBrokerEnable is false。表明這個(gè)broker 是一個(gè)普通的節(jié)點(diǎn),然后"Trace_Topic”將不去建立在這個(gè)節(jié)點(diǎn)上。并且正常的消息還會(huì)正常處理。
  • autoTraceBrokerEnable is true。表明broker是一個(gè)特殊的節(jié)點(diǎn),它是特別用來(lái)存儲(chǔ)消息軌跡跟蹤數(shù)據(jù)的。并且"Trace_Topic"在broker開(kāi)始階段自動(dòng)創(chuàng)建,這個(gè)節(jié)點(diǎn)自動(dòng)在nameserver注冊(cè) 它擁有的topic集合(包括Trace_Topic)。這樣,在一個(gè)RocketMQ 集群中,僅僅有一個(gè)特殊的broker節(jié)點(diǎn)去存儲(chǔ)消息軌跡跟蹤數(shù)據(jù)。并且客戶端(包括發(fā)布和訂閱消息)會(huì)通過(guò)nameserver知道那個(gè)broker節(jié)點(diǎn)是負(fù)責(zé)收集消息軌跡跟蹤數(shù)據(jù)的,并發(fā)送。

如何查詢軌跡跟蹤數(shù)據(jù)?

舉個(gè)例子,保存消息軌跡跟蹤數(shù)據(jù)的topic被稱作"RMQ_SYS_TRACE_DATA_XXX”而不是普通消息的topic。但是仍然用普通消息查詢方式(通過(guò)messageId + topic,topic+ key or topic實(shí)現(xiàn))通過(guò)RocketMQ console查詢是不可行的,查不到我們期望的結(jié)果。所以,當(dāng)發(fā)送被客戶端收集的消息軌跡跟蹤數(shù)據(jù)時(shí),我們能通過(guò)使用普通消息的msgId(不是offset MsgId) 或key去填充消息軌跡跟蹤數(shù)據(jù)的keyset 屬性,所以Broker端能夠根據(jù)普通消息的msgId或key去創(chuàng)建索引文件。
并且在broker端,我們能夠通過(guò)業(yè)務(wù)進(jìn)程調(diào)用QueryMessageProcessor去調(diào)用broker端的queryMessage() 方法。

源碼分析

接口的設(shè)計(jì)和變化

  • 數(shù)據(jù)傳輸?shù)漠惒浇涌冢涸黾覣syncDispatcher 類
public interface AsyncDispatcher {

    void start() throws MQClientException;

    boolean append(Object ctx);

    void flush() throws IOException;

    void shutdown();    
    }
  • 定義兩個(gè)hook的實(shí)現(xiàn):
    a.ClientSendMessageTraceHookImpl
    b.ClientConsumeMessageTraceHookImpl
  • 定義和寫(xiě)了對(duì)應(yīng)的數(shù)據(jù)模型:
    a.TraceBean
    b.TraceConstants
    c.TraceContext
    d.TraceDataEncoder
    e.TraceDispatcherType
    f.TraceTransferBean
    g.TuxeTraceType
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容