RocketMq總結(jié)

架構(gòu)圖

架構(gòu)圖.png

基本概念

Producer

消息生產(chǎn)者,負(fù)責(zé)產(chǎn)生消息,一般由業(yè)務(wù)系統(tǒng)負(fù)責(zé)產(chǎn)生消息

Consumer

消息消費(fèi)者,負(fù)責(zé)消費(fèi)消息,一般是后臺(tái)系統(tǒng)負(fù)責(zé)異步消費(fèi)

Broker

消息中轉(zhuǎn)角色,負(fù)責(zé)存儲(chǔ)消息,轉(zhuǎn)収消息,一般也稱為 Server。

NameServer

類似于zookeeper


概念模型

image.png

即消息是根據(jù)主題(即我們圖里面的Topic)進(jìn)行訂閱,而每個(gè)Topic下面又可以有多個(gè)隊(duì)列,只是這里的隊(duì)列并不真正存儲(chǔ)消息,而是起到類似索引的作用,消息真正存儲(chǔ)在CommitLog里面,如下圖


RocketMq消息實(shí)際存儲(chǔ)結(jié)構(gòu).png

所有數(shù)據(jù)單獨(dú)存儲(chǔ)到一個(gè) Commit Log,完全順序?qū)懀S機(jī)讀。對(duì)最終用戶展現(xiàn)的隊(duì)列實(shí)際只存儲(chǔ)消息在 Commit Log 的位置信息

Message Queue

在RocketMQ中,所有消息隊(duì)列都是持久化,長度無限的數(shù)據(jù)結(jié)構(gòu),所謂長度無限是指隊(duì)列中的每個(gè)存儲(chǔ)單元都是定長,訪問其中的存儲(chǔ)單元使用Offset來訪問,offset為java long類型,64位,理論上在100年內(nèi)不會(huì)溢出,所以認(rèn)為是長度無限,另外隊(duì)列中只保存最近幾天的數(shù)據(jù),之前的數(shù)據(jù)會(huì)按照過期時(shí)間來刪除。也可以認(rèn)為Message Queue是一個(gè)長度無限的數(shù)組,offset就是下標(biāo)。

這樣做的好處
(1)隊(duì)列輕量化,單個(gè)隊(duì)列數(shù)據(jù)量非常少
(2)對(duì)磁盤的訪問串行化,避免磁盤竟?fàn)?,丌?huì)因?yàn)殛?duì)列增加導(dǎo)致 IOWAIT 增高

這樣做的缺點(diǎn)
(1)寫雖然完全是順序?qū)懀亲x卻發(fā)成了完全的隨機(jī)讀。
(2)讀一條消息,會(huì)先讀 Consume Queue,再讀 Commit Log,增加了開銷。
(3)要保證 Commit Log 不 Consume Queue 完全的一致,增加了編程的復(fù)雜度

RocketMq的解決方案

隨機(jī)讀(主要是指磁盤隨機(jī)讀),盡可能讓讀命中 PAGECACHE,減少 IO 讀操作,所以內(nèi)存越大越好。同時(shí)由于緩存的局部性原理,可以很快的在內(nèi)存上讀取到消息


RocketMq里面的消息類型

順序消息

消費(fèi)消息的順序要同収送消息的順序一致,在 RocketMQ 中,主要指的是局部順序,即一類消息為滿足順序性,必須 Producer 單線程順序發(fā)送到同一個(gè)隊(duì)列,返樣 Consumer 就可以按照 Producer 發(fā)送的順序去消費(fèi)消息。

普通順序消息

順序消息的一種,正常情況下可以保證完全的順序消息,但是一旦發(fā)生通信異常,Broker重啟,由于隊(duì)列總數(shù)發(fā)生發(fā)化,哈希取模后定位的隊(duì)列會(huì)發(fā)化,產(chǎn)生短暫的消息順序不一致。如果業(yè)務(wù)能容忍在集群異常情況(如某個(gè)Broker宕機(jī)或者重啟)下,消息短暫的亂序,使用普通順序方式比較合適。

嚴(yán)格順序消息

嚴(yán)格順序消息順序消息的一種,無論正常異常情況都能保證順序,但是犧牲了分布式Failover特性,即Broker集群中只要有一臺(tái)機(jī)器不可用,則整個(gè)集群都不可用,服務(wù)可用性大大降低。如果服務(wù)器部署為同步雙寫模式,此缺陷可通過備機(jī)自動(dòng)切換為主避免,不過仍然會(huì)存在幾分鐘的服務(wù)不可用。(依賴同步雙寫,主備自動(dòng)切換,自動(dòng)切換功能目前還未實(shí)現(xiàn))目前已知的應(yīng)用只有數(shù)據(jù)庫binlog同步強(qiáng)依賴嚴(yán)格順序消息,其他應(yīng)用絕大部分都可以容忍短暫亂序,推薦使用普通的順序消息。


獲取消息的方式

  • Broker主動(dòng)進(jìn)行推送至消費(fèi)者
    缺點(diǎn):消費(fèi)者可能消費(fèi)過慢造成堆積,同時(shí)如果有很多消費(fèi)者對(duì)于Broker也是一件很繁重的事情

  • 長輪詢
    即消費(fèi)者會(huì)主動(dòng)去拉取,缺點(diǎn)是可能獲取不及時(shí),但長輪詢指的是我會(huì)多等一會(huì),類似于長連接


RocketMq里面消息的幾種消費(fèi)方式


涉及到磁盤,就會(huì)有零拷貝,RocketMq也不例外,常用的零拷貝有如下兩種方式

內(nèi)存映射

image.png

對(duì)應(yīng)的java代碼

File file = new File("data.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());

真正的零拷貝


image.png

對(duì)應(yīng)的java代碼

 File file = new File("test.zip");
        RandomAccessFile raf = new RandomAccessFile(file, "rw");
        FileChannel fileChannel = raf.getChannel();
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
        // 直接使用了transferTo()進(jìn)行通道間的數(shù)據(jù)傳輸
        fileChannel.transferTo(0, fileChannel.size(), socketChannel);

這兩種方式的比較

使用mmap + write方式
優(yōu)點(diǎn):即使頻繁調(diào)用,使用小塊文件傳輸,效率也很高
缺點(diǎn):不能很好的利用DMA方式,會(huì)比sendfile多消耗CPU,內(nèi)存安全性控制復(fù)雜,需要避免JVM Crash問題。

使用sendfile方式
優(yōu)點(diǎn):可以利用DMA方式,消耗CPU較少,大塊文件傳輸效率高,無內(nèi)存安全新問題。
缺點(diǎn):小塊文件效率低于mmap方式,只能是BIO方式傳輸,不能使用NIO。

RocketMq采用的是基于內(nèi)存映射的方式,因?yàn)樾K數(shù)據(jù)傳輸?shù)母鼮轭l繁


消息的持久化

同步刷盤與異步刷盤.png

異步刷盤
寫入到Page Cache后就立馬返回了,然后再調(diào)用fsync函數(shù)異步的去將數(shù)據(jù)刷到磁盤

優(yōu)點(diǎn)

效率高又快

缺點(diǎn)

斷點(diǎn)或者重啟,內(nèi)存里面的數(shù)據(jù)還沒來得及刷入到磁盤就沒有了,所以會(huì)有丟消息的概率


同步刷盤

當(dāng)然就是寫入到Page Cache后就立馬調(diào)用fsync函數(shù)立馬刷入到磁盤

優(yōu)點(diǎn)

可以做到不丟消息

缺點(diǎn)

當(dāng)然就是犧牲性能了


接著再來分析下 幾種消費(fèi)消息的方式

At least Once

是指每個(gè)消息必須投遞一次,RocketMQConsumer先pull消息到本地,消費(fèi)完成后,才吐服務(wù)器返回ack,如果沒有消費(fèi)一定不會(huì)ack消息,所以RocketMQ可以很好的支持此特性

Exactly Only Once

(1).發(fā)送消息階段,不允許収送重復(fù)的消息。
(2).消費(fèi)消息階段,不允許消費(fèi)重復(fù)的消息。

只有以上兩個(gè)條件都滿足情況下,才能稱為消息是“Exactly Only Once”,而要實(shí)現(xiàn)以上兩點(diǎn),在分布式系統(tǒng)環(huán)境下,不可避免要產(chǎn)生巨大的開銷。所以RocketMQ為了追求高性能,并不保證此特性,要求在業(yè)務(wù)上進(jìn)行去重,也就是說消費(fèi)消息要做到冪等性。RocketMQ雖然步能嚴(yán)格保證不重復(fù),但是正常情冴下很少會(huì)出現(xiàn)重復(fù)収送、消費(fèi)情況,只有網(wǎng)絡(luò)異常,Consumer啟停等異常情況下會(huì)出現(xiàn)消息重復(fù)。此問題的本質(zhì)原因是網(wǎng)絡(luò)調(diào)用存在不確定性,即不成功也不失敗的第三種狀態(tài),所以才產(chǎn)生了消息重復(fù)性問題。

定時(shí)消息

定時(shí)消息是指消息發(fā)到Broker后,不能立刻被Consumer消費(fèi),要到特定的時(shí)間點(diǎn)或者等待特定的時(shí)間后才能被消費(fèi)。如果要支持任意的時(shí)間精度,在Broker局面,必須要做消息排序,如果再涉及到持久化,那消息排序要不可避免的產(chǎn)生巨大性能開銷。RocketMQ支持定時(shí)消息,但是不支持任意時(shí)間精度,支持特定的level,例如定時(shí)5s,10s,1m等。


消息過濾

RocketMQ的消息過濾方式有別于其他消息中間件,是在訂閱時(shí),再做過濾,先來看下Consume Queue的存儲(chǔ)結(jié)構(gòu)


ConsumeQueue單個(gè)存儲(chǔ)單元結(jié)構(gòu).png

(1)在Broker端迕行Message Tag比對(duì),先遍歷Consume Queue,如果存儲(chǔ)的Message Tag與訂閱的Message Tag不符合,則跳過,繼續(xù)比對(duì)下一個(gè),符合則傳輸給Consumer。注意:Message Tag是字符串形式,Consume Queue中存儲(chǔ)的是其對(duì)應(yīng)的hashcode,比對(duì)時(shí)也是比對(duì)hashcode。(2).Consumer收到過濾后的消息后,同樣也要執(zhí)行在Broker端的操作,但是比對(duì)的是真實(shí)的Message Tag字符串,而不是Hashcode

這么做的原因?
(1)Message Tag存儲(chǔ)Hashcode,是為了在Consume Queue定長方式存儲(chǔ),節(jié)約空間
(2)過濾過程中不會(huì)訪問Commit Log數(shù)據(jù),可以保證堆積情況下也能高效過濾
(3)即使存在Hash沖突,也可以在Consumer端進(jìn)行修正,保證萬無一失


高可用

談到高可用,自然就想到集群,那么多臺(tái)機(jī)器間消息的同步方式就有同步雙寫和異步復(fù)制兩種

異步復(fù)制

異步復(fù)制的實(shí)現(xiàn)思路非常簡單,Slave啟勱一個(gè)線程,不斷從Master拉取Commit Log中的數(shù)據(jù),然后在異步build出Consume Queue數(shù)據(jù)結(jié)構(gòu)。整個(gè)實(shí)現(xiàn)過程基本同Mysql主從同步類似。

同步雙寫

也類似于Mysql的半同步復(fù)制,即主上寫完,其中一臺(tái)從也要寫完才統(tǒng)一返回給客戶端ok.整體思想是類似的


上面我們談到RocketMq沒有使用Zookeeper而是自己實(shí)現(xiàn)了NameServer

 public boolean initialize() {
                .....        
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.routeInfoManager.scanNotActiveBroker();
            }
        }, 5, 10, TimeUnit.SECONDS);

        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

            @Override
            public void run() {
                NamesrvController.this.kvConfigManager.printAllPeriodically();
            }
        }, 1, 10, TimeUnit.MINUTES);

        .....
    }
  • 定時(shí)任務(wù)1:NameServer每隔10s掃描一次Broker,移除處于不激活狀態(tài)的Broker
  • 定時(shí)任務(wù)2:NameServer每隔10分鐘打印一次KV配置

我們可以看到,集群其實(shí)就是維護(hù)心跳,這里面其實(shí)還有很多細(xì)節(jié),還沒看完,看完再更新吧


Producer最佳實(shí)踐
發(fā)送消息注意事項(xiàng)
(1)一個(gè)應(yīng)用盡可能用一個(gè)Topic,消息子類型用tags來標(biāo)識(shí),tags可以由應(yīng)用自由設(shè)置。只有發(fā)送消息設(shè)置了tags,消費(fèi)方在訂閱消息時(shí),才可以利用tags在broker做消息過濾。message.setTags("TagA");

(2)每個(gè)消息在業(yè)務(wù)局面的唯一標(biāo)識(shí)碼,要設(shè)置到keys字段,方便將來定位消息丟失問題。服務(wù)器會(huì)為每個(gè)消息創(chuàng)建索引(哈希索引),應(yīng)用可以通過topic,key來查詢返條消息內(nèi)容,以及消息被誰消費(fèi)。由于是哈希索引,請(qǐng)務(wù)必保證key盡可能唯一,返樣可以避免潛在的哈希沖突。// 訂單IdString orderId = "20034568923546";message.setKeys(orderId);

(3)消息發(fā)送成功或者失敗,要打印消息日志,務(wù)必要打印sendresult和key字段。

(4)send消息方法,只要不拋異常,就代表發(fā)送成功。但是發(fā)送成功會(huì)有多個(gè)狀態(tài),在sendResult里定義
SEND_OK消息發(fā)送成功
FLUSH_DISK_TIMEOUT:消息發(fā)送成功,但是服務(wù)器刷盤超時(shí),消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失
FLUSH_SLAVE_TIMEOUT:消息發(fā)送成功,但是服務(wù)器同步到Slave時(shí)超時(shí),消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失
SLAVE_NOT_AVAILABLE:消息發(fā)送成功,但是此時(shí)slave不可用,消息已經(jīng)進(jìn)入服務(wù)器隊(duì)列,只有此時(shí)服務(wù)器宕機(jī),消息才會(huì)丟失對(duì)與精確發(fā)送順序消息的應(yīng)用,由亍順序消息的局限性,可能會(huì)涉及到主備自動(dòng)切換問題,所以如果sendresult中的status字段不等于SEND_OK,就應(yīng)該嘗試重試。對(duì)于其他應(yīng)用,則沒有必要這樣。
(5)對(duì)于消息不可丟失應(yīng)用,務(wù)必要有消息重發(fā)機(jī)制
例如如果消息發(fā)送失敗,存儲(chǔ)到數(shù)據(jù)庫,能有定時(shí)程序嘗試重發(fā),或者人工觸發(fā)重發(fā)。

Consumer最佳實(shí)踐
(1)將消息的唯一鍵,可以是msgId,也可以是消息內(nèi)容中的唯一標(biāo)識(shí)字段,例如訂單Id等,消費(fèi)之前判斷是否在Db或(全局KV存儲(chǔ))中存在,如果不存在則插入,并消費(fèi),否則跳過。(實(shí)際過程要考慮原子性問題,判斷是否存在可以嘗試插入入,如果報(bào)主鍵沖突,則插入失敗,直接跳過)msgId一定是全局唯一標(biāo)識(shí)符,但是可能會(huì)存在同樣的消息有兩個(gè)不同msgId的情冴(有多種原因),返種情況可能會(huì)使業(yè)務(wù)上重復(fù)消費(fèi),建議最好使用消息內(nèi)容中的唯一標(biāo)識(shí)字段去重。
2.使用業(yè)務(wù)局面的狀態(tài)機(jī)去重

具體可見冪等總結(jié)


負(fù)載均衡策略

AllocateMessageQueueAveragely

平均算法: 算出平均值,將連續(xù)的隊(duì)列按平均值分配給每個(gè)消費(fèi)者。 如果能夠整除,則按順序?qū)⑵骄祩€(gè)Queue分配,如果不能整除,則將多余出的Queue按照Consumer順序逐個(gè)分配


image.png

AllocateMessageQueueAveragelyByCircle

環(huán)形平均算法:將消費(fèi)者按順序形成一個(gè)環(huán)形,然后按照這個(gè)環(huán)形順序逐個(gè)給消費(fèi)者分配一個(gè)Queue


image.png

AllocateMessageQueueConsistentHash

一致性hash算法:先將消費(fèi)端的hash值放于環(huán)上,同時(shí)計(jì)算隊(duì)列的hash值,以順時(shí)針方向,分配給離隊(duì)列hash值最近的一個(gè)消費(fèi)者節(jié)點(diǎn)


image.png

代碼

/*
   1.初始化:默認(rèn)使用AllocateMessageQueueAveragely算法分配Queue
 */
public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {
    public DefaultMQPushConsumer(final String consumerGroup) {
        this(null, consumerGroup, null, new AllocateMessageQueueAveragely());
    }
}
/*
   2.開啟一個(gè)RebalanceService任務(wù)執(zhí)行分配策略
 */
public class MQClientInstance {
    public void start() throws MQClientException {
        synchronized (this) {
            switch (this.serviceState) {
                case CREATE_JUST:
                    ...
                    // Start rebalance service
                    this.rebalanceService.start();
                    ...
                default:
                    break;
            }
        }
    }
}
/*
   3.RebalanceImpl.rebalanceByTopic執(zhí)行具體的分配邏輯
 */
public abstract class RebalanceImpl {
    private boolean rebalanceByTopic(final String topic, final boolean isOrder) {
        boolean balanced = true;
        switch (messageModel) {
            ...
            case CLUSTERING: {
                // 拿到所有的Queue
                Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
                // 拿到所有的消費(fèi)者ID
                List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
                ...
                if (mqSet != null && cidAll != null) {
                    List<MessageQueue> mqAll = new ArrayList<>();
                    mqAll.addAll(mqSet);

                    Collections.sort(mqAll);
                    Collections.sort(cidAll);

                    // 初始化設(shè)置的分配算法:即AllocateMessageQueueAveragely平均分配算法
                    AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

                    List<MessageQueue> allocateResult = null;
                    try {
                        // 調(diào)用分配算法的具體實(shí)現(xiàn)
                        allocateResult = strategy.allocate(...mqAll, cidAll);
                    } catch (Throwable e) {
                        log.error("allocate message queue exception. strategy name: {}, ex: {}", strategy.getName(), e);
                        return false;
                    }
                    ...
                }
                break;
            }
            default:
                break;
        }
        return balanced;
    }
}
/*
   4.執(zhí)行AllocateMessageQueueAveragely平均分配算法的具體實(shí)現(xiàn)
 */
public class AllocateMessageQueueAveragely extends AbstractAllocateMessageQueueStrategy {
    @Override
    public List<MessageQueue> allocate(String consumerGroup, String currentCID, List<MessageQueue> mqAll, List<String> cidAll) {

        List<MessageQueue> result = new ArrayList<>();
        if (!check(consumerGroup, currentCID, mqAll, cidAll)) {
            return result;
        }

        int index = cidAll.indexOf(currentCID);
        int mod = mqAll.size() % cidAll.size();
        int averageSize =
                mqAll.size() <= cidAll.size() ? 1 : (mod > 0 && index < mod ? mqAll.size() / cidAll.size()
                        + 1 : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }
}
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容