RocketMQ

RocketMQ組成

image.png

RocketMQ由NameServer、Broker、Producer、Consumer組成:

  • NameServer
    NameServer節(jié)點(diǎn)之間無任何信息同步,topic和路由信息管理
  • Broker
    master與slave的對(duì)應(yīng)關(guān)系通過指定相同brokerName、不同brokerId來定義,brokerId為0表示master,非0表示slave
    每個(gè)broker與NameServer集群中的所有節(jié)點(diǎn)建立長連接,定時(shí)注冊(cè)topic信息到所有NameServer
  • Producer
    與NameServer集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從NameServer獲取topic路由信息,并向提供topic服務(wù)的master建立長連接,且定時(shí)向master發(fā)送心跳
  • Consumer
    與NameServerr集群中的其中一個(gè)節(jié)點(diǎn)(隨機(jī)選擇)建立長連接,定期從NameServer獲取topic路由信息,并向提供topic服務(wù)的master、slave建立長連接,且定時(shí)向master、slave發(fā)送心跳,即可從master訂閱消息,也可從slave訂閱消息,由Broker配置決定

Producer Group作用

主要的用途是事務(wù)消息,Broker 需要向消息發(fā)送者回查事務(wù)狀態(tài)

順序消息

消息的順序性分為兩部分,生產(chǎn)順序性和消費(fèi)順序性

Apache RocketMQ 通過生產(chǎn)者和服務(wù)端的協(xié)議保障單個(gè)生產(chǎn)者串行地發(fā)送消息,并按序存儲(chǔ)和持久化。
如需保證消息生產(chǎn)的順序性,則必須滿足以下條件:

  • 單一生產(chǎn)者
    消息生產(chǎn)的順序性僅支持單一生產(chǎn)者,不同生產(chǎn)者分布在不同的系統(tǒng),即使設(shè)置相同的消息組,不同生產(chǎn)者之間產(chǎn)生的消息也無法判定其先后順序。

  • 串行發(fā)送
    Apache RocketMQ 生產(chǎn)者客戶端支持多線程安全訪問,但如果生產(chǎn)者使用多線程并行發(fā)送,則不同線程間產(chǎn)生的消息將無法判定其先后順序。

滿足以上條件的生產(chǎn)者,將順序消息發(fā)送至 Apache RocketMQ 后,會(huì)保證設(shè)置了同一消息組的消息,按照發(fā)送順序存儲(chǔ)在同一隊(duì)列中。服務(wù)端順序存儲(chǔ)邏輯如下:

  • 相同消息組的消息按照先后順序被存儲(chǔ)在同一個(gè)隊(duì)列。

  • 不同消息組的消息可以混合在同一個(gè)隊(duì)列中,且不保證連續(xù)。

Apache RocketMQ 通過消費(fèi)者和服務(wù)端的協(xié)議保障消息消費(fèi)嚴(yán)格按照存儲(chǔ)的先后順序來處理。
由于順序消息都存儲(chǔ)在同一個(gè)隊(duì)列里,而同一個(gè)隊(duì)列只會(huì)由消費(fèi)組中的一個(gè)消費(fèi)者消費(fèi)
如需保證消息消費(fèi)的順序性,則必須滿足以下條件:

  • 投遞順序
    Apache RocketMQ 通過客戶端SDK和服務(wù)端通信協(xié)議保障消息按照服務(wù)端存儲(chǔ)順序投遞,但業(yè)務(wù)方消費(fèi)消息時(shí)需要嚴(yán)格按照接收---處理---應(yīng)答的語義處理消息,避免因異步處理導(dǎo)致消息亂序。

  • 有限重試
    Apache RocketMQ 順序消息投遞僅在重試次數(shù)限定范圍內(nèi),即一條消息如果一直重試失敗,超過最大重試次數(shù)后將不再重試,跳過這條消息消費(fèi),不會(huì)一直阻塞后續(xù)消息處理。
    對(duì)于需要嚴(yán)格保證消費(fèi)順序的場景,請(qǐng)務(wù)設(shè)置合理的重試次數(shù),避免參數(shù)不合理導(dǎo)致消息亂序。

順序消息的缺陷:

  • 發(fā)送順序消息無法利用集群的Failover特性,因?yàn)椴荒芨鼡QMessageQueue進(jìn)行重試

  • 因?yàn)榘l(fā)送的路由策略導(dǎo)致的熱點(diǎn)問題,可能某一些MessageQueue的數(shù)據(jù)量特別大

  • 消費(fèi)的并行讀依賴于分區(qū)數(shù)量
    一個(gè)隊(duì)列,只能被一個(gè)消費(fèi)者串行消費(fèi),不能多線程消費(fèi)

使用建議:

  • 串行消費(fèi),避免批量消費(fèi)導(dǎo)致亂序
    消息消費(fèi)建議串行處理,避免一次消費(fèi)多條消費(fèi),否則可能出現(xiàn)亂序情況。
    例如:發(fā)送順序?yàn)?->2->3->4,消費(fèi)時(shí)批量消費(fèi),消費(fèi)順序?yàn)?->23(批量處理,失?。?>23(重試處理)->4,此時(shí)可能由于消息3的失敗導(dǎo)致消息2被重復(fù)處理,最后導(dǎo)致消息消費(fèi)亂序。

  • 消息組盡可能打散,避免集中導(dǎo)致熱點(diǎn)
    Apache RocketMQ 保證相同消息組的消息存儲(chǔ)在同一個(gè)隊(duì)列中,如果不同業(yè)務(wù)場景的消息都集中在少量或一個(gè)消息組中,則這些消息存儲(chǔ)壓力都會(huì)集中到服務(wù)端的少量隊(duì)列或一個(gè)隊(duì)列中。容易導(dǎo)致性能熱點(diǎn),且不利于擴(kuò)展。一般建議的消息組設(shè)計(jì)會(huì)采用訂單ID、用戶ID作為順序參考,即同一個(gè)終端用戶的消息保證順序,不同用戶的消息無需保證順序
    因此建議將業(yè)務(wù)以消息組粒度進(jìn)行拆分,例如,將訂單ID、用戶ID作為消息組關(guān)鍵字,可實(shí)現(xiàn)同一終端用戶的消息按照順序處理,不同用戶的消息無需保證順序。

public class SelectMessageQueueByHash implements MessageQueueSelector {
    public SelectMessageQueueByHash() {
    }

    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object shardingKey) {
        int value = shardingKey.hashCode();
        if (value < 0) {
            value = Math.abs(value);
        }

        value %= mqs.size();
        return (MessageQueue)mqs.get(value);
    }
}
SendResult sendResultRMQ = this.defaultMQProducer.send(msgRMQ 
, new SelectMessageQueueByHash ()
, shardingKey);

事務(wù)消息

事務(wù)消息支持在分布式場景下保障消息生產(chǎn)和本地事務(wù)的最終一致性。
事務(wù)消息交互流程如下圖所示:

image.png

使用限制:

  • 消費(fèi)事務(wù)性
    Apache RocketMQ 事務(wù)消息保證本地主分支事務(wù)和下游消息發(fā)送事務(wù)的一致性,但不保證消息消費(fèi)結(jié)果和上游事務(wù)的一致性。因此需要下游業(yè)務(wù)分支自行保證消息正確處理,建議消費(fèi)端做好消費(fèi)重試,如果有短暫失敗可以利用重試機(jī)制保證最終處理成功。

  • 中間狀態(tài)可見性
    Apache RocketMQ 事務(wù)消息為最終一致性,即在消息提交到下游消費(fèi)端處理完成之前,下游分支和上游事務(wù)之間的狀態(tài)會(huì)不一致。因此,事務(wù)消息僅適合接受異步執(zhí)行的事務(wù)場景。

  • 事務(wù)超時(shí)機(jī)制
    Apache RocketMQ 事務(wù)消息的命周期存在超時(shí)機(jī)制,即半事務(wù)消息被生產(chǎn)者發(fā)送服務(wù)端后,如果在指定時(shí)間內(nèi)服務(wù)端無法確認(rèn)提交或者回滾狀態(tài),則消息默認(rèn)會(huì)被回滾。

事務(wù)消息涉及的主題:

  • RMQ_SYS_TRANS_HALF_TOPIC
    prepare消息的主題,事務(wù)消息首先先進(jìn)入到該主題。

  • RMQ_SYS_TRANS_OP_HALF_TOPIC
    當(dāng)消息服務(wù)器收到事務(wù)消息的提交或回滾請(qǐng)求后,會(huì)將消息存儲(chǔ)在該主題下。

回查流程:


image.png

事務(wù)消息在RocketMQ中處理流程:


image.png
image.png
@Component
@Slf4j
public class OrderTransactionalListener implements TransactionListener {
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        log.info("開始執(zhí)行本地事務(wù)....");
        LocalTransactionState state;
        try{
            String body = new String(message.getBody());
            OrderDTO order = JSONObject.parseObject(body, OrderDTO.class);
            orderService.createOrder(order,message.getTransactionId());
            state = LocalTransactionState.COMMIT_MESSAGE;
            log.info("本地事務(wù)已提交。{}",message.getTransactionId());
        }catch (Exception e){
            log.error("執(zhí)行本地事務(wù)失敗。{}",e);
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        }
        return state;
    }

    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        log.info("開始回查本地事務(wù)狀態(tài)。{}",messageExt.getTransactionId());
        LocalTransactionState state;
        String transactionId = messageExt.getTransactionId();
        if (transactionLogService.get(transactionId)>0){
            state = LocalTransactionState.COMMIT_MESSAGE;
        }else {
            state = LocalTransactionState.UNKNOW;
        }
        log.info("結(jié)束本地事務(wù)狀態(tài)查詢:{}",state);
        return state;
    }
}
@Component
@Slf4j
public class TransactionalMsgProducer implements InitializingBean, DisposableBean {
    private String GROUP = "order_transactional";
    private TransactionMQProducer msgProducer;
    //用于執(zhí)行本地事務(wù)和事務(wù)狀態(tài)回查的監(jiān)聽器
    @Autowired
    private OrderTransactionalListener orderTransactionListener;
    //執(zhí)行任務(wù)的線程池
    private ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 60,
            TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(50));
    private void start(){
        try {
            this.msgProducer.start();
        } catch (MQClientException e) {
            log.error("msg producer starter occur error;",e);
        }
    }
    private void shutdown() {
        if(null != msgProducer) {
            try {
                msgProducer.shutdown();
            } catch (Exception e) {
                log.error("producer shutdown occur error;",e);
            }
        }
    }
    public TransactionSendResult send(String data, String topic) throws MQClientException {
        Message message = new Message(topic,data.getBytes());
        return this.msgProducer.sendMessageInTransaction(message, null);
    }
    @Override
    public void afterPropertiesSet() throws Exception {
        msgProducer = new TransactionMQProducer(GROUP);
        msgProducer.setNamesrvAddr("namesrvHost:ip");
        msgProducer.setSendMsgTimeout(Integer.MAX_VALUE);
        msgProducer.setExecutorService(executor);
        msgProducer.setTransactionListener(orderTransactionListener);
        this.start();
    }
    @Override
    public void destroy() throws Exception {
        this.shutdown();
    }
}
@Service
@Slf4j
public class OrderService {
    @Autowired
    private OrderMapper orderMapper;
    @Autowired
    private  TransactionLogMapper transactionLogMapper;
    @Autowired
    private TransactionalMsgProducer producer;
    //執(zhí)行本地事務(wù)時(shí)調(diào)用,將訂單數(shù)據(jù)和事務(wù)日志寫入本地?cái)?shù)據(jù)庫
    @Transactional
    @Override
    public void createOrder(OrderDTO orderDTO,String transactionId){
        //1.創(chuàng)建訂單
        Order order = new Order();
        BeanUtils.copyProperties(orderDTO,order);
        orderMapper.createOrder(order);
        //2.寫入事務(wù)日志
        TransactionLog log = new TransactionLog();
        log.setId(transactionId);
        log.setBusiness("order");
        log.setForeignKey(String.valueOf(order.getId()));
        transactionLogMapper.insert(log);
        log.info("create order success,order={}",orderDTO);
    }
    //前端調(diào)用,只用于向RocketMQ發(fā)送事務(wù)消息
    @Override
    public void createOrder(OrderDTO order) throws MQClientException {
        order.setId(snowflake.nextId());
        order.setOrderNo(snowflake.nextIdStr());
        producer.send(JSON.toJSONString(order),"order");
    }
}

定時(shí)/延時(shí)消息

當(dāng)消息寫入到Broker后,不能立刻被消費(fèi)者消費(fèi),需要等待指定的時(shí)長后才可被消費(fèi)處理的消息,稱為延時(shí)消息。

延時(shí)消息的延遲時(shí)長不支持隨意時(shí)長的延遲,是通過特定的延遲等級(jí)來指定的。默認(rèn)支持18個(gè)等級(jí)的延遲消息,延時(shí)等級(jí)定義為1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,設(shè)置延時(shí)等級(jí)的時(shí)候是設(shè)置延遲時(shí)間對(duì)應(yīng)的序號(hào)(從1開始)

延時(shí)消息寫入Commitlog文件的時(shí),是先寫入SCHEDULE_TOPIC_XXX主題的對(duì)應(yīng)延時(shí)等級(jí)隊(duì)列中的,SCHEDULE_TOPIC_XXX一共有18個(gè)消息隊(duì)列,分別對(duì)應(yīng)每個(gè)延時(shí)等級(jí),每個(gè)消息隊(duì)列的queueId=延時(shí)等級(jí)-1


image.png

延時(shí)消息處理過程:

  • 1.發(fā)送消息
    修改消息Topic名稱和隊(duì)列信息并存儲(chǔ)到CommitLog文件
  • 2.投遞
    轉(zhuǎn)發(fā)消息到延遲主題的CosumeQueue中
  • 3.消息
    延遲服務(wù)消費(fèi)SCHEDULE_TOPIC_XXXX消息
    1. 把消息復(fù)原并存儲(chǔ)
      將信息重新存儲(chǔ)到CommitLog中
  • 5.重新投遞
    將消息投遞到目標(biāo)Topic中
  • 6.消費(fèi)
    消費(fèi)者消費(fèi)目標(biāo)topic中的數(shù)據(jù)
Message msg = new Message("TopicTest" ,"TagA" ,("Hello RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET) );
// 設(shè)置延時(shí)消息的級(jí)別
msg.setDelayTimeLevel(2);

重試消息

消息收發(fā)過程中,若Consumer消費(fèi)某條消息失敗或消費(fèi)超時(shí),則會(huì)在重試間隔時(shí)間后,將消息重新投遞給Consumer消費(fèi),若達(dá)到最大重試次數(shù)后消息還沒有成功被消費(fèi),則消息將被投遞至死信隊(duì)列。您可以通過消費(fèi)死信隊(duì)列中的死信消息來恢復(fù)業(yè)務(wù)異常。
重試消息Topic為%RETRY%{ConsumeGroup}

消息重試主要功能行為包括:

  • 重試間隔:上一次消費(fèi)失敗或超時(shí)后,距下次消息可被重新消費(fèi)的間隔時(shí)間。
    1.無序消息
    間隔時(shí)間根據(jù)重試次數(shù)階梯變化,取值范圍:10秒~2小時(shí)。不支持自定義配置。
    (1)若最大重試次數(shù)小于等于16次,則每次重試的間隔時(shí)間會(huì)階梯變化,具體時(shí)間見下面表格
    (2)若最大重試次數(shù)大于16次,則超過16次的間隔時(shí)間均為2小時(shí)。
    2.順序消息
    間隔時(shí)間可通過自定義參數(shù)suspendTimeMillis取值進(jìn)行配置。參數(shù)取值范圍:10~30000,單位:毫秒,默認(rèn)值:1000毫秒,即1秒。
  • 最大重試次數(shù):消息消費(fèi)失敗后,可被重復(fù)投遞的最大次數(shù)。
    1.無序消息
    最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進(jìn)行配置。默認(rèn)值為16次,該參數(shù)取值無最大限制,建議使用默認(rèn)值。
    2.順序消息
    最大重試次數(shù)可通過自定義參數(shù)MaxReconsumeTimes取值進(jìn)行配置。該參數(shù)取值無最大限制。若未設(shè)置參數(shù)值,默認(rèn)最大重試次數(shù)為Integer.MAX。

配置采用覆蓋的方式生效,即最后啟動(dòng)的Consumer實(shí)例會(huì)覆蓋之前啟動(dòng)的實(shí)例的配置。因此,請(qǐng)確保同一Group ID下的所有Consumer實(shí)例設(shè)置的最大重試次數(shù)和重試間隔相同,否則各實(shí)例間的配置將會(huì)互相覆蓋

image.png

一條消息無論重試多少次,這些重試消息的Message ID都不會(huì)改變。
消息重試只針對(duì)集群消費(fèi)模式生效。
廣播消費(fèi)模式不提供失敗重試特性,即消費(fèi)失敗后,失敗消息不再重試,繼續(xù)消費(fèi)新的消息。

集群消費(fèi)模式下,消息消費(fèi)失敗后期望消息重試,需要在消息監(jiān)聽器接口的實(shí)現(xiàn)中明確進(jìn)行配置(三種方式任選一種):

  • 方式1:返回ConsumeConcurrentlyStatus.RECONSUME_LATER(推薦)
  • 方式2:返回Null
  • 方式3:拋出異常
consumer.setMaxReconsumeTimes(20);

死信隊(duì)列

死信隊(duì)列用于處理無法被正常消費(fèi)的消息,即死信消息
當(dāng)一條消息初次消費(fèi)失敗,RocketMQ會(huì)自動(dòng)進(jìn)行消息重試;達(dá)到最大重試次數(shù)后,若消費(fèi)依然失敗,則表明消費(fèi)者在正常情況下無法正確地消費(fèi)該消息,此時(shí),RocketMQ不會(huì)立刻將消息丟棄,而是將其發(fā)送到該消費(fèi)者對(duì)應(yīng)的特殊隊(duì)列中
在RocketMQ中,這種正常情況下無法被消費(fèi)的消息稱為死信消息,存儲(chǔ)死信消息的特殊隊(duì)列稱為死信隊(duì)列,死信消息的Topic為%DLQ%{ConsumerGroup}。

死信消息具有以下特性:

  • 不會(huì)再被消費(fèi)者正常消費(fèi)。
  • 有效期與正常消息相同,均為 3 天,3 天后會(huì)被自動(dòng)刪除。因此,請(qǐng)?jiān)谒佬畔a(chǎn)生后的 3 天內(nèi)及時(shí)處理。

死信隊(duì)列具有以下特性:

  • 一個(gè)死信隊(duì)列對(duì)應(yīng)一個(gè) Group ID, 而不是對(duì)應(yīng)單個(gè)消費(fèi)者實(shí)例。
  • 如果一個(gè) Group ID 未產(chǎn)生死信消息,消息隊(duì)列RocketMQ版不會(huì)為其創(chuàng)建相應(yīng)的死信隊(duì)列。
  • 一個(gè)死信隊(duì)列包含了對(duì)應(yīng) Group ID 產(chǎn)生的所有死信消息,不論該消息屬于哪個(gè) Topic。

一條消息進(jìn)入死信隊(duì)列,意味著某些因素導(dǎo)致消費(fèi)者無法正常消費(fèi)該消息,因此,通常需要您對(duì)其進(jìn)行特殊處理。排查可疑因素并解決問題后,您可以在RocketMQ控制臺(tái)重新發(fā)送該消息,讓消費(fèi)者重新消費(fèi)一次。

負(fù)載均衡

producer發(fā)送消息的負(fù)載均衡

對(duì)于非順序消息(普通消息、定時(shí)/延時(shí)消息、事務(wù)消息)場景,默認(rèn)且只能使用RoundRobin模式的負(fù)載均衡策略。
(1)輪詢模式(默認(rèn)),RoundRobin
RoundRobin模式下,生產(chǎn)者發(fā)送消息時(shí),以消息為粒度,按照輪詢方式將消息依次發(fā)送到指定主題中的所有可寫目標(biāo)隊(duì)列中,保證消息盡可能均衡地分布到所有隊(duì)列。


image.png

consumer訂閱消息的負(fù)載均衡

image.png

負(fù)載均衡策略算法:
(1)平均負(fù)載(默認(rèn)策略),AllocateMessageQueueAveragely
分配方式類似分頁,對(duì)topic下的所有MessageQueue進(jìn)行排序,對(duì)同一個(gè)消費(fèi)組的所有ConsumerId進(jìn)行排序
MessageQueue作為需要分頁的記錄,Consumer作為頁碼,計(jì)算每頁多少個(gè)MessageQueue,每頁有哪些MessageQueue
(2)環(huán)形平均負(fù)載,AllocateMessageQueueAveragelyByCircle
每個(gè)Consumer分配到MessageQueue的個(gè)數(shù)與平均負(fù)載相同,只是每個(gè)Consumer不是分配到連續(xù)的MessageQueue
(3)用戶自定義配置,AllocateMessageQueueByConfig
(4)機(jī)房負(fù)載策略,AllocateMessageQueueByMachineRoom
(5)機(jī)房負(fù)載策略改進(jìn)版本,AllocateMachineRoomNearBy
(6)一致性哈希策略,AllocateMessageQueueConsistentHash


image.png

一致性哈希有個(gè)哈希環(huán)的概念,哈希環(huán)由0到2^31-1,哈希上的點(diǎn)都是虛擬的,將所有的Consumer使用Consumer的Id進(jìn)行哈希計(jì)算,得到是哈希環(huán)上的點(diǎn),然后把點(diǎn)存儲(chǔ)到TreeMap里,將所有的MessageQueue一次進(jìn)行相同的哈希計(jì)算,按順時(shí)針方向找到距離計(jì)算出的哈希值最近的Consumer點(diǎn),MessageQueue最終就歸屬這個(gè)Consumer

負(fù)載均衡策略由每個(gè)消費(fèi)者自己執(zhí)行計(jì)算的
負(fù)載均衡觸發(fā)時(shí)機(jī):

  • 消費(fèi)者啟動(dòng)
    每當(dāng)實(shí)例的數(shù)量有變更,都會(huì)觸發(fā)一次消費(fèi)組的所有實(shí)例的負(fù)載均衡
  • 定時(shí)任務(wù),每隔20s執(zhí)行

消息存儲(chǔ)

消息存儲(chǔ)的整體架構(gòu)
消息存儲(chǔ)目錄結(jié)構(gòu)
每條消息格式

RocketMQ存儲(chǔ)的文件主要有Commitlog文件、ConsumeQueue文件、IndexFile文件,ConsumeQueue文件和IndexFile文件是基于Commitlog文件異步生成的:

  • Commitlog文件
    RocketMQ 將所有主題的消息存儲(chǔ)在同一個(gè)文件 ,確保消息發(fā)送時(shí)順序?qū)懳募?,盡最大的能力確保消息發(fā)送的高性能與高吞吐量
    文件名為文件的第一個(gè)消息的物理偏移量,文件名長度為20位,左邊補(bǔ)零,剩余為起始偏移量,比如00000000000000000000代表了第一個(gè)文件,起始偏移量為0,文件大小為1G=1073741824;當(dāng)?shù)谝粋€(gè)文件寫滿了,第二個(gè)文件為00000000001073741824,起始偏移量為1073741824,以此類推。
    因?yàn)橄⑹且粭l一條寫入到 commitlog 文件 ,寫入完成后,我們可以得到這條消息的物理偏移量。
    每條消息的物理偏移量是唯一的, commitlog 文件名是遞增的,可以根據(jù)消息的物理偏移量通過二分查找,定位消息位于那個(gè)文件中,并獲取到消息實(shí)體數(shù)據(jù)。

  • ConsumeQueue文件
    用于方便、快速基于topic檢索消息并消費(fèi)消息。
    每個(gè)topic包含多個(gè)消息隊(duì)列,每個(gè)消息隊(duì)列有多個(gè)ConsumerQueue。
    consumequeue文件名由20位數(shù)字構(gòu)成,表示當(dāng)前文件的第一個(gè)索引條目的起始偏移量。與commitLog文件名不同的是,consumequeue后續(xù)文件名是固定的,由于consumequeue文件大小是固定不變的。
    ConsumeQueue文件采取定長設(shè)計(jì),每一個(gè)條目共20個(gè)字節(jié),分別為8字節(jié)的CommitLog物理偏移量、4字節(jié)的消息長度、8字節(jié)tag hashcode,單個(gè)文件由30W個(gè)條目組成,可以像數(shù)組一樣隨機(jī)訪問每一個(gè)條目,每個(gè)ConsumeQueue文件大小約5.72M。
    MessageQueue與ConsumeQueue為1:N關(guān)系。

  • IndexFile文件
    加速消息的檢索性能,根據(jù)消息屬性快速從Commitlog文件文件中檢索消息。
    IndexFile文件名為文件創(chuàng)建的時(shí)間戳,單個(gè)IndexFile文件大小約為400M,一個(gè)IndexFile可以保存 2000W個(gè)索引條目,索引條目只存儲(chǔ)消息key的HashCode而不存消息key,是為了將索引條目設(shè)計(jì)為定長,方便檢索與定位條目,IndexFile的底層存儲(chǔ)設(shè)計(jì)為在文件系統(tǒng)中實(shí)現(xiàn)HashMap結(jié)構(gòu)
    indexFile的具體結(jié)構(gòu),主要包含三部分:索引頭Header、槽位表SlotTable(500W個(gè)槽位)、索引鏈表index list(2000W個(gè)索引數(shù)據(jù))


    image.png
image.png

消息查詢

  • 按照Message Id查詢消息
    RocketMQ中的MessageId的長度總共有16字節(jié),其中包含了消息存儲(chǔ)主機(jī)地址(IP地址和端口),消息Commit Log offset。
    “按照MessageId查詢消息”在RocketMQ中具體做法是:
    (1)Client端從MessageId中解析出Broker的地址(IP地址和端口)和Commit Log的偏移地址后封裝成一個(gè)RPC請(qǐng)求后通過Remoting通信層發(fā)送(業(yè)務(wù)請(qǐng)求碼:VIEW_MESSAGE_BY_ID)。
    (2)Broker端走的是QueryMessageProcessor,讀取消息的過程用其中的 commitLog offset 和 size 去 commitLog 中找到真正的記錄并解析成一個(gè)完整的消息返回。
  • 按照Message Key查詢消息
    主要通過Broker端的QueryMessageProcessor業(yè)務(wù)處理器來查詢,讀取消息的過程
    (1)找槽位=40byte +hash(topic + "#" + key) %500W*4byte ,槽位值slotValue=最新插入index的位置
    (2)遍歷單向鏈表:從slotValue找到最新index在整個(gè)索引文件中位置=40byte +500w * 4byte + slotValue * 20byte,然后根據(jù)單個(gè)索引文件的pre index值找到前一個(gè)索引,一直遍歷下去。index數(shù)據(jù)中key hash和時(shí)間區(qū)間都滿足,則匹配。添加到 List<Long> phyOffsets(commitLog的偏移量list)中。最終根據(jù)其中的commitLog offset從CommitLog文件中讀取消息的實(shí)體內(nèi)容。


    image.png

    image.png

消費(fèi)進(jìn)度

  • 廣播消息消費(fèi)時(shí),消費(fèi)進(jìn)度由消費(fèi)者本地保存
    本地存儲(chǔ)路徑為[{rocketmq.client.localOffsetStoreDir}|{user.home}/.rocketmq_offsets]/clientId/consumerGroup/offsets.json
  • 集群消息消費(fèi)時(shí),消費(fèi)進(jìn)度由消費(fèi)者主動(dòng)上報(bào)給broker
    消費(fèi)者消費(fèi)消息后的消費(fèi)進(jìn)度更新是先更新本地內(nèi)存,當(dāng)消費(fèi)者重新負(fù)載均衡活消費(fèi)者關(guān)閉時(shí),會(huì)上報(bào)消費(fèi)進(jìn)度到broker
    RemoteBrokerOffsetStore:
    private ConcurrentMap<MessageQueue, AtomicLong> offsetTable =
        new ConcurrentHashMap<MessageQueue, AtomicLong>();

消費(fèi)者上報(bào)到broker的消費(fèi)進(jìn)度,在broker中也是先保存在在本地內(nèi)存
ConsumerOffsetManager:

    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);

broker中保存的消費(fèi)進(jìn)度通過在broker BrokerController 啟動(dòng)定時(shí)任務(wù)每隔5秒持
久化到文件/store/config/consumerOffset.json,該文件存儲(chǔ)的json格式為


image.png
{
    "Topic@ConsumeGroup": {
        QueueId: Offset
    }
}
public class BrokerController {
    public boolean initialize() throws CloneNotSupportedException {
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.consumerOffsetManager.persist();
                    } catch (Throwable e) {
                        log.error("schedule persist consumerOffset error.", e);
                    }
                }
            }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
    }
}

消息刷盤

消息發(fā)送到broker后,broker不是直接把消息寫到commitlog文件,而是寫到操作系統(tǒng)的PageCache,之后再從PageCache刷盤到commitlog文件
消息刷盤方式有三種:

  • 同步刷盤
    使用GroupCommitService
  • 異步刷盤
    未開啟 TransientStorePool 緩存,使用 FlushRealTimeService
    開啟 TransientStorePool 緩存,使用 CommitRealService

高性能

  • 內(nèi)存映射mmap
    mmap 把文件映射到用戶空間里的虛擬內(nèi)存,省去了從內(nèi)核緩沖區(qū)復(fù)制到用戶空間的過程,文件中的位置在虛擬內(nèi)存中有了對(duì)應(yīng)的地址,可以像操作內(nèi)存一樣操作這個(gè)文件,相當(dāng)于已經(jīng)把整個(gè)文件放入內(nèi)存,但在真正使用到這些數(shù)據(jù)前卻不會(huì)消耗物理內(nèi)存,也不會(huì)有讀寫磁盤的操作,只有真正使用這些數(shù)據(jù)時(shí)。
    這個(gè)地址映射的過程,具體到代碼層面,RocketMQ是利用JDK NIO包下的MappedByteBuffer.map()來實(shí)現(xiàn)的,其底層就是基于mmap技術(shù)。
MappedByteBuffer mappedByteBuffer = fileChannel.map(FileChannel.MapMode.READ_WRITE, position, fileSize)

使用Mmap的限制:
a.Mmap映射的內(nèi)存空間釋放的問題;由于映射的內(nèi)存空間本身就不屬于JVM的堆內(nèi)存區(qū)(Java Heap),因此其不受JVM GC的控制,卸載這部分內(nèi)存空間需要通過系統(tǒng)調(diào)用 unmap()方法來實(shí)現(xiàn)。然而unmap()方法是FileChannelImpl類里實(shí)現(xiàn)的私有方法,無法直接顯示調(diào)用。RocketMQ中的做法是,通過Java反射的方式調(diào)用“sun.misc”包下的Cleaner類的clean()方法來釋放映射占用的內(nèi)存空間;
b.MappedByteBuffer內(nèi)存映射大小限制;因?yàn)槠湔加玫氖翘摂M內(nèi)存(非JVM的堆內(nèi)存),大小不受JVM的-Xmx參數(shù)限制,但其大小也受到OS虛擬內(nèi)存大小的限制。一般來說,一次只能映射1.5~2G 的文件至用戶態(tài)的虛擬內(nèi)存空間,這也是為何RocketMQ默認(rèn)設(shè)置單個(gè)CommitLog日志數(shù)據(jù)文件為1G的原因了;
c.使用MappedByteBuffe的其他問題;會(huì)存在內(nèi)存占用率較高和文件關(guān)閉不確定性的問題;


image.png
image.png
image.png
  • PageCache
    PageCache是OS對(duì)文件的緩存,用于加速對(duì)文件的讀寫。一般來說,程序?qū)ξ募M(jìn)行順序讀寫的速度幾乎接近于內(nèi)存的讀寫訪問,這里的主要原因就是在于OS使用PageCache機(jī)制對(duì)讀寫訪問操作進(jìn)行了性能優(yōu)化,將一部分的內(nèi)存用作PageCache
    (1)對(duì)于數(shù)據(jù)文件的讀取
    如果一次讀取文件時(shí)出現(xiàn)未命中PageCache的情況,OS從物理磁盤上訪問讀取文件的同時(shí),會(huì)順序?qū)ζ渌噜弶K的數(shù)據(jù)文件進(jìn)行預(yù)讀取。這樣,只要下次訪問的文件已經(jīng)被加載至PageCache時(shí),讀取操作的速度基本等于訪問內(nèi)存。
    (2)對(duì)于數(shù)據(jù)文件的寫入
    OS會(huì)先寫入至Cache內(nèi),隨后通過異步的方式由pdflush內(nèi)核線程將Cache內(nèi)的數(shù)據(jù)刷盤至物理磁盤上。
    對(duì)于文件的順序讀寫操作來說,讀和寫的區(qū)域都在OS的PageCache內(nèi),此時(shí)讀寫性能接近于內(nèi)存。
    RocketMQ的大致做法是,將數(shù)據(jù)文件映射到OS的虛擬內(nèi)存中(通過JDK NIO的MappedByteBuffer),寫消息的時(shí)候首先寫入PageCache,并通過異步刷盤的方式將消息批量的做持久化(同時(shí)也支持同步刷盤)。
    訂閱消費(fèi)消息時(shí)(對(duì)CommitLog操作是隨機(jī)讀?。?,由于PageCache的局部性熱點(diǎn)原理且整體情況下還是從舊到新的有序讀,因此大部分情況下消息還是可以直接從Page Cache中讀取,不會(huì)產(chǎn)生太多的缺頁(Page Fault)中斷而從磁盤讀取。


    image.png

PageCache機(jī)制也不是完全無缺點(diǎn)的,當(dāng)遇到OS進(jìn)行臟頁回寫,內(nèi)存回收,內(nèi)存swap等情況時(shí),就會(huì)引起較大的消息讀寫延遲。
對(duì)于這些情況,RocketMQ采用了多種優(yōu)化技術(shù),比如內(nèi)存預(yù)分配,文件預(yù)熱,mlock系統(tǒng)調(diào)用等,來保證在最大可能地發(fā)揮PageCache機(jī)制優(yōu)點(diǎn)的同時(shí),盡可能地減少其缺點(diǎn)帶來的消息讀寫延遲。

  • 預(yù)先分配MappedFile
    在消息寫入過程中(調(diào)用CommitLog的putMessage()方法),CommitLog會(huì)先從MappedFileQueue隊(duì)列中獲取一個(gè) MappedFile,如果沒有就新建一個(gè)。
    這里,MappedFile的創(chuàng)建過程是將構(gòu)建好的一個(gè)AllocateRequest請(qǐng)求(具體做法是,將下一個(gè)文件的路徑、下下個(gè)文件的路徑、文件大小為參數(shù)封裝為AllocateRequest對(duì)象)添加至隊(duì)列中,后臺(tái)運(yùn)行的AllocateMappedFileService服務(wù)線程(在Broker啟動(dòng)時(shí),該線程就會(huì)創(chuàng)建并運(yùn)行),會(huì)不停地run,只要請(qǐng)求隊(duì)列里存在請(qǐng)求,就會(huì)去執(zhí)行MappedFile映射文件的創(chuàng)建和預(yù)分配工作,分配的時(shí)候有兩種策略,一種是使用Mmap的方式來構(gòu)建MappedFile實(shí)例,另外一種是從TransientStorePool堆外內(nèi)存池中獲取相應(yīng)的DirectByteBuffer來構(gòu)建MappedFile(ps:具體采用哪種策略,也與刷盤的方式有關(guān))。并且,在創(chuàng)建分配完下個(gè)MappedFile后,還會(huì)將下下個(gè)MappedFile預(yù)先創(chuàng)建并保存至請(qǐng)求隊(duì)列中等待下次獲取時(shí)直接返回。RocketMQ中預(yù)分配MappedFile的設(shè)計(jì)非常巧妙,下次獲取時(shí)候直接返回就可以不用等待MappedFile創(chuàng)建分配所產(chǎn)生的時(shí)間延遲。
image.png
  • 文件預(yù)熱、mlock系統(tǒng)調(diào)用
    (1)mlock系統(tǒng)調(diào)用:其可以將進(jìn)程使用的部分或者全部的地址空間鎖定在物理內(nèi)存中,防止其被交換到swap空間。對(duì)于RocketMQ這種的高吞吐量的分布式消息隊(duì)列來說,追求的是消息讀寫低延遲,那么肯定希望盡可能地多使用物理內(nèi)存,提高數(shù)據(jù)讀寫訪問的操作效率。
    (2)文件預(yù)熱:預(yù)熱的目的主要有兩點(diǎn);第一點(diǎn),由于僅分配內(nèi)存并進(jìn)行mlock系統(tǒng)調(diào)用后并不會(huì)為程序完全鎖定這些內(nèi)存,因?yàn)槠渲械姆猪摽赡苁菍憰r(shí)復(fù)制的。因此,就有必要對(duì)每個(gè)內(nèi)存頁面中寫入一個(gè)假的值。其中,RocketMQ是在創(chuàng)建并分配MappedFile的過程中,預(yù)先寫入一些隨機(jī)值至Mmap映射出的內(nèi)存空間里。第二,調(diào)用Mmap進(jìn)行內(nèi)存映射后,OS只是建立虛擬內(nèi)存地址至物理地址的映射表,而實(shí)際并沒有加載任何文件至內(nèi)存中。程序要訪問數(shù)據(jù)時(shí)OS會(huì)檢查該部分的分頁是否已經(jīng)在內(nèi)存中,如果不在,則發(fā)出一次缺頁中斷。這里,可以想象下1G的CommitLog需要發(fā)生多少次缺頁中斷,才能使得對(duì)應(yīng)的數(shù)據(jù)才能完全加載至物理內(nèi)存中(ps:X86的Linux中一個(gè)標(biāo)準(zhǔn)頁面大小是4KB)?RocketMQ的做法是,在做Mmap內(nèi)存映射的同時(shí)進(jìn)行madvise系統(tǒng)調(diào)用,目的是使OS做一次內(nèi)存映射后對(duì)應(yīng)的文件數(shù)據(jù)盡可能多的預(yù)加載至內(nèi)存中,從而達(dá)到內(nèi)存預(yù)熱的效果。

高可用

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容