RocketMQ閱讀筆記之消息消費的消息拉取

消息消費,簡而言之就是消費者從消息隊列里讀取數(shù)據(jù)。消費者有兩種消費方式:

  1. Push方式。消息服務器接收到信息后,主動把消息推送給消費者,實時性高。但是這樣加大了消息服務器的工作壓力,會影響其性能。除此之外,不同消費者的處理信息的能力不同,可能無法及時的消費消息,造成 慢消費 問題。相關類是DefaultMQPushConsumer。

  2. Pull方式。消費者主動向消息服務器拉取消息,主動權在消費者這里。主要的問題是循環(huán)拉取消息的間隔不好設定,設置的間隔時間太久會增加消息的延遲;設置的事件間隔太短,如果消費服務器里沒有可用的消息,那么會造成很多無用的請求開銷,影響其性能。相關類是DefaultMQPullConsumer。

消息消費以組的模式開展,一個消費組里可以包含多個消費者,每一個消費組可以訂閱多個主題,消費組之間有集群模式廣播模式兩種消費模式。

  • 集群模式:
    同一個ConsumerGroup里的每個Consumer只消費所訂閱消息的一部分內容,同一個ConsumerGroup里的所有的Consumer消費的內容合起來才是訂閱的Topic內容的整體,從而達到負載均衡的目的。
  • 廣播模式
    同一個ConsumerGroup里的每個Consumer都能消費到所訂閱Topic的全部信息,也就是一個消息會被多次分發(fā),被多個Consumer消費。

首先先將一下DefaultMQPushConsumer的相關操作。

DefaultMQPushConsumer的啟動

啟動方法是在DefaultMQPushConsumerImpl.start()方法。

首先會根據(jù)服務狀態(tài)選擇策略。定義的狀態(tài)如下所示。

public enum ServiceState {
    /**
     * Service just created,not start
     */
    CREATE_JUST,
    /**
     * Service Running
     */
    RUNNING,
    /**
     * Service shutdown
     */
    SHUTDOWN_ALREADY,
    /**
     * Service Start failure
     */
    START_FAILED;
}
this.serviceState = ServiceState.START_FAILED;

如果是RUNNING、START_FAILED,則跳過該環(huán)節(jié),直接進行下一環(huán)節(jié)。如果是SULTdOWN_ALREDAY,則拋出異常。如果是CREATE_JUST,則需要進入執(zhí)行該環(huán)節(jié)的代碼。
進入里面的區(qū)域時,先預設serviceState的值為START_FAILE,在執(zhí)行一段操作后,如果注冊消費者沒有成功,則修改serviceState為CREATE_JUST,并拋出異常;如果順利執(zhí)行則修改serviceState為RUNNING。

  // 驗證配置
  this.checkConfig();
  this.copySubscription();

訂閱主題訂閱消息SubscriptionData,并放入到RebalanceImpl的訂閱消息中。訂閱關系來源主要有兩個。

  • defaultMQPushConsumer.getSubscription()
  • 訂閱重試主題消息。RocketMQ消息重試是以消費組為單位,而不是主題,消息重試主題為%RETRY%+消費組名。
 private void copySubscription() throws MQClientException {
        try {
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                    break;
                default:
                    break;
            }
        } catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }

如果當前是集群消費模式,修改實例名為Pid。

if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) {
    this.defaultMQPushConsumer.changeInstanceNameToPID();
}

初始化MQClientInstance、ReblanceImple(消息重新負載實現(xiàn)類)

 this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);

                // 設置負載均衡器
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                //設置消費集群模式
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                //Queue allocation algorithm specifying how message queues are allocated to each consumer clients.
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
    
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
    
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);

處理offset存儲方式。
如果消息消費是集群模式,那么消息進度保存在Broker上;如果是廣播模式,那么消息消費進度存儲在消費端。

  if (this.defaultMQPushConsumer.getOffsetStore() != null) {
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                } else {
                    switch (this.defaultMQPushConsumer.getMessageModel()) {
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    }
                }
                this.offsetStore.load();

根據(jù)MessageListener的具體實現(xiàn)方式選擇具體的消息拉取線程實現(xiàn)
可以選擇順序消息消費服務或者并行消息消費服務
最后執(zhí)行ConsumerMessageService主要負責消費消息,內部維護一個線程池。

if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                }

                this.consumeMessageService.start();

向MQClientInstance注冊消費者,并啟動MQClientInstance,在一個JVM中的所有消費者、生產(chǎn)者持有同一個MQClientInstance,MQClientInstance只會啟動一次。

boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) {
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                }

                mQClientFactory.start();

訂閱關系改變,更新NameServer的訂閱關系表。
檢查客戶端狀態(tài)
發(fā)送心跳條
喚醒執(zhí)行消費者負載均衡。

 this.updateTopicSubscribeInfoWhenSubscriptionChanged();
 this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        //馬上rebalance
 this.mQClientFactory.rebalanceImmediately();

在上面提到了offset的存儲問題。現(xiàn)在先講一下什么是offset和存儲規(guī)則。

消息消費進度記錄

消息消費者在消費一批消息后,需要記錄該批消息已經(jīng)消費完畢,否則當消費者重新啟動時又得從消息消費隊列的開始消費,這樣顯然會產(chǎn)生問題。一次消息消費后會從ProcessQueue處理隊列中移除該批消息,返回ProcessQueue最小偏移量,并存入到消息進度表中。該消息進度表的存儲位置和機制是一個重要的問題。

由上面可知,有兩種消費模式,廣播模式集群模式

廣播模式: 同一消費組的所有消費者都會消費該主題下的所有消息。即同一個消息會被所有消費者消費,所以每個消費者應該各自獨立有一個記錄消費進度的文件。
廣播模式下消息進度存儲在消費者本地,主要類是LocalFileOffsetStore.

集群模式: 同一消費者的所有消費者共同消費該主題下的所有消息,一個消息只能被一個消費者所消費,即每個消費者消費的是該消費主題下的部分消息,所以消息消費進度記錄被所有消費者所共享。
集群模式消息進度存儲文件存放在消息服務端Broker,主要類中RemoteBrokerOffsetStore。

幾個重要有關消息消費的類

每一個PullRequest代表一個消費的分組單元
PullRequest會記錄一個topic記錄對應的consumerGroup的拉取進度。

PullRequest

public class PullRequest {
    private String consumerGroup;
    # 待拉取消息隊列
    private MessageQueue messageQueue;
    # 消息處理隊列,從Broker拉取到的消費先存入到ProcessQueue,然后提交到消費者消費線程池消費
    private ProcessQueue processQueue;
    # 待拉取的MessageQueue偏移量
    private long nextOffset;
    # 是否被鎖定
    private boolean lockedFirst = false;

ProcessQueue是MessageQueue在消費端的重現(xiàn)、快照。

ProcessQueue

public class ProcessQueue {
    public final static long REBALANCE_LOCK_MAX_LIVE_TIME =
        Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockMaxLiveTime", "30000"));
    public final static long REBALANCE_LOCK_INTERVAL = Long.parseLong(System.getProperty("rocketmq.client.rebalance.lockInterval", "20000"));
    private final static long PULL_MAX_IDLE_TIME = Long.parseLong(System.getProperty("rocketmq.client.pull.pullMaxIdleTime", "120000"));
    private final Logger log = ClientLogger.getLog();
    private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
    # 消息存儲容器,鍵為消息在ConsumeQueue中的偏移量,value為消息實體
    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
    # ProcessQueue中的消息總數(shù)
    private final AtomicLong msgCount = new AtomicLong();
    private final Lock lockConsume = new ReentrantLock();
    # 消息臨時存儲容器,消息消費線程從ProcessQueue的msgTreeMap中取出消息前,先將消息臨時存儲在msgTreeMapTemp中。
    private final TreeMap<Long, MessageExt> msgTreeMapTemp = new TreeMap<Long, MessageExt>();
    private final AtomicLong tryUnlockTimes = new AtomicLong(0);
    private volatile long queueOffsetMax = 0L;
    # 當前ProcessQueue是否被丟棄
    private volatile boolean dropped = false;
    # 上一次開始消息拉取時間戳
    private volatile long lastPullTimestamp = System.currentTimeMillis();
    # 上一次消息消費時間戳
    private volatile long lastConsumeTimestamp = System.currentTimeMillis();
    private volatile boolean locked = false;
    private volatile long lastLockTimestamp = System.currentTimeMillis();
    private volatile boolean consuming = false;
    private volatile long msgAccCnt = 0;

DefaultMQPushConsumer 客戶端消費者實現(xiàn)

public class DefaultMQPushConsumer extends ClientConfig implements MQPushConsumer {

    /**
     * Internal implementation. Most of the functions herein are delegated to it.
     */
    protected final transient DefaultMQPushConsumerImpl defaultMQPushConsumerImpl;

    # 消費者組名
    private String consumerGroup;

    # 消費模式
    private MessageModel messageModel = MessageModel.CLUSTERING;

   # 消費者從哪個位置消費
   # CONSUME_FROM_LAST_OFFSET: 第一次啟動從隊列最后位置消費,后續(xù)再接著上次消費的進度開始消費
   # CONSUME_FROM_First_OFFSET: 第一次啟動從隊列開始位置消費,后續(xù)再接著上次消費的進度開始消費
   # CONSUME_FROM_TimeStamp: 第一次啟動從指定時間點位置消費,后續(xù)再接著上次消費的進度開始消費
   # 這里的第一次啟動指的是該消費者之前沒有消費過該消息隊列,如果消費過,則會在Broker端記錄消費位置,如果該消費者掛了再啟動時,會自動從上次消費的地方開始。
    private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;

   
    private String consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - (1000 * 60 * 30));
    # 消費分配策略,默認消息平均分配給所有客戶端
    private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
    # topic對應的訂閱tag
    private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
    # 客戶端消費消息的實現(xiàn)類
    private MessageListener messageListener;
    # 存儲實現(xiàn),本地存儲或者Broker存儲
    private OffsetStore offsetStore;
    # Minimum consumer thread number
    private int consumeThreadMin = 20;
    # Max consumer thread number
    private int consumeThreadMax = 64;
    # Threshold for dynamic adjustment of the number of thread pool
    private long adjustThreadPoolNumsThreshold = 100000;
    # 單隊列并行消費的最大跨度,用于流量控制
    private int consumeConcurrentlyMaxSpan = 2000;
    # 一個queue最大消費的消息個數(shù),用于流量控制
    private int pullThresholdForQueue = 1000;
    # 消息拉取時間間隔,默認為0,
    private long pullInterval = 0;
    # 并發(fā)消費時,一次消費消息的數(shù)量
    private int consumeMessageBatchMaxSize = 1;
    # 消息拉取一次的數(shù)量
    private int pullBatchSize = 32;

     # Whether update subscription relationship when every pull
    private boolean postSubscriptionWhenPull = false;
    # Whether the unit of subscription group
    private boolean unitMode = false;

    /**
     * Max re-consume times. -1 means 16 times.
     * </p>
     *
     * If messages are re-consumed more than {@link #maxReconsumeTimes} before success, it's be directed to a deletion
     * queue waiting.
     */
    private int maxReconsumeTimes = -1;

    /**
     * Suspending pulling time for cases requiring slow pulling like flow-control scenario.
     */
    private long suspendCurrentQueueTimeMillis = 1000;

    # Maximum amount of time in minutes a message may block the consuming thread.
    private long consumeTimeout = 15;

DefaultMQPushConsumerImpl 消費者具體實現(xiàn)類

public class DefaultMQPushConsumerImpl implements MQConsumerInner {
    /**
     * Delay some time when exception occur
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION = 3000;
    /**
     * Flow control interval
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL = 50;
    /**
     * Delay some time when suspend pull service
     */
    private static final long PULL_TIME_DELAY_MILLS_WHEN_SUSPEND = 1000;
    private static final long BROKER_SUSPEND_MAX_TIME_MILLIS = 1000 * 15;
    private static final long CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND = 1000 * 30;
    private final Logger log = ClientLogger.getLog();
    private final DefaultMQPushConsumer defaultMQPushConsumer;
    
    //負載均衡實現(xiàn)類 
    private final RebalanceImpl rebalanceImpl = new RebalancePushImpl(this);
    private final ArrayList<FilterMessageHook> filterMessageHookList = new ArrayList<FilterMessageHook>();
    private final long consumerStartTimestamp = System.currentTimeMillis();
    private final ArrayList<ConsumeMessageHook> consumeMessageHookList = new ArrayList<ConsumeMessageHook>();
    private final RPCHook rpcHook;
    private volatile ServiceState serviceState = ServiceState.CREATE_JUST;
    private MQClientInstance mQClientFactory;
    private PullAPIWrapper pullAPIWrapper;
    private volatile boolean pause = false;
    private boolean consumeOrderly = false;
    private MessageListener messageListenerInner;
    private OffsetStore offsetStore;
    private ConsumeMessageService consumeMessageService;
    private long flowControlTimes1 = 0;
    private long flowControlTimes2 = 0;

消息拉取基本流程

具體的實現(xiàn)類是DefaultMQPushConsumerImpl
消息拉取主要有3個主要步驟:

  • 消費拉取客戶端消息拉取請求封裝
  • 消息服務器查找并返回消息
  • 消息拉取客戶端處理返回的消息
  1. 客戶端封裝消息拉取請求

    1.1 從PullRequest中獲取ProcessQueue,如果處理隊列當前狀態(tài)未被丟棄,則更新ProcessQueue的lastPullTimestamp為當前時間戳;如果當前消費者被掛起,則將拉取任務延遲1s再次放入到PullMessageService的拉取任務中,結束本次消息拉取。

    1.2 進行消息拉取流量控制

    PushConsumer有個線程池,消息處理邏輯在各個線程里同時執(zhí)行,在PushConsumer運行的時候,每個Message Queue都有一個對用的ProcessQueue對象,保存了這個Message Queue 消息處理狀態(tài)的快照。

    ProcessQueue對象里主要內容是一個TreeMap和一個讀寫鎖。TreeMap里以Message Queue的Offset作為Key,以消息內容的引用為Value,保存了所有從MessageQueue獲取的,但是還未被處理的信息,讀寫鎖控制著多個線程對TreeMap對象的并發(fā)處理。

    流量控制策略:

    • 消息處理總數(shù),如果ProcessQueue當前處理的消息超過了pullThresholdForQueue=1000將觸發(fā)流量控制,放棄本次拉取任務,并且該隊列的下一次拉取任務將在50毫秒后才加入到拉取任務隊列中。
    • ProcessQueue中隊列最大偏移量與最小偏移量的間距,不能超過consumeConcurrencyMaxSpan,否則觸發(fā)流量控制。
      1.3 拉取該主題訂閱信息,如果為空,結束本次消息拉取,關于該隊列的下一次拉取任務延遲3秒。
      1.4 構建消息拉取系統(tǒng)標記。
      1.5 調用PullAPIWrapper.pullKernelImpl方法后與服務端交互。
      1.6 根據(jù)brokerName、BrokerId從MQClientInstance中獲取Broker地址,在整個RocketMQ Broker的部署結構中,相同名稱的Broker構成主從結構,其BrokerId會不一樣,在每次拉取消息后,會給出一個建議,下次拉取從主節(jié)點還是從節(jié)點拉取。

然后是消息服務端Broker組裝消息。會根據(jù)處理的結果返回不同的狀態(tài)編碼。
主要有下面幾種狀態(tài)編碼。

  • SUCCESS : 成功
  • PULL_RETRY_IMMEDIATElY : 立即重試
  • PULL_OFFSET_MOVED : 偏移量移動
  • PULL_NOT_FOUND : 未找到消息
  1. 消息拉取客戶端處理消息。

先分析狀態(tài)編碼為SUCCESS的后續(xù)處理步驟。

  • 更新PullRequest的下一次拉取偏移量,如果msgFoundList為空,則立即將PullRequest放入到PullMessageService的pullRequestQueue,以便PullMessageService能及時喚醒并再次執(zhí)行消息拉取。
  • 將拉取到的消息存放到ProcessQueue,然后將拉取到的消息提交到ConsumeMessageService中供消費者消費。
  • 將消費提交給消費者線程之后PullCallBack將立即返回,可以說本次消息拉取順利完成,然后根據(jù)pullInterval參數(shù),如果pullInterval > 0,則等待pullInterval毫秒后將PullRequest對象放入到PullMessageService的pullRequestQueue中,該消息隊列的下次拉取即將被激活,達到持續(xù)消息拉取,實現(xiàn)準實時拉取消息的效果。

如果拉取結果為偏移量非法,首先將ProcessQueue設置dropped為ture,表示丟棄該消息隊列,意味著ProcessQueue中拉取的消息將停止消費,然后根據(jù)服務端下一次校對的偏移量嘗試更新消息消費進度,然后嘗試持久化消息消費進度,并將該消息隊列從RebalanceImpl的處理隊列中移除,意味著暫停該消息隊列的消息拉取,等待下一次消息隊列重新加載。

這篇對消息拉取的筆記就暫時寫到這里,下一篇阿靜詳細介紹關于消息拉取失敗后的長輪詢方法。

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

  • 每個人的想法不同 , RocketMQ 介紹的時候就說 是阿里從他們使用的上 解耦出來 近一步簡化 便捷的 目...
    樓亭樵客閱讀 460評論 0 0
  • consumer 1.啟動 有別于其他消息中間件由broker做負載均衡并主動向consumer投遞消息,Rock...
    veShi文閱讀 5,081評論 0 2
  • 消息從producer發(fā)送到了broker之后,消息的訂閱者就可以訂閱消費消息。 roketmq消息拉取方式有兩種...
    圣村的希望閱讀 1,995評論 2 2
  • 簡介 RocketMQ 特點 RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給 Ap...
    預流閱讀 39,515評論 7 55
  • 2017年10月8日如是家人溫玲,種種子第69天 發(fā)心:我今不是為了我個人而聞思修,而是為了六道輪回一切如母有情眾...
    溫馨霏玲閱讀 345評論 2 2

友情鏈接更多精彩內容