RocketMQ消費(fèi)位置

RocketMQ創(chuàng)建消費(fèi)者的時(shí)指定了Topic主題及Tag,我們發(fā)現(xiàn)新創(chuàng)建的消費(fèi)者消費(fèi)不了歷史的數(shù)據(jù),只能消費(fèi)掉創(chuàng)建以后消費(fèi)者發(fā)送的數(shù)據(jù)。這是什么原因,我們能把所有的消息都消費(fèi)嗎?,我們可以指定需要消費(fèi)的消息的時(shí)間嗎?答案是肯定的,下面我們具體分析一下。

前提:我們討論是集群模式下的,廣播模式也是一樣的,只是示例代碼我們用集群模式來(lái)討論。

消息消費(fèi)的位置目前提供了三種方式CONSUME_FROM_LAST_OFFSET(隊(duì)列尾部消費(fèi))、CONSUME_FROM_FIRST_OFFSET(隊(duì)列頭部消費(fèi))、CONSUME_FROM_TIMESTAMP(指定消費(fèi)時(shí)間點(diǎn))。

public enum ConsumeFromWhere {
    CONSUME_FROM_LAST_OFFSET,
    @Deprecated
    CONSUME_FROM_LAST_OFFSET_AND_FROM_MIN_WHEN_BOOT_FIRST,
    @Deprecated
    CONSUME_FROM_MIN_OFFSET,
    @Deprecated
    CONSUME_FROM_MAX_OFFSET,
    CONSUME_FROM_FIRST_OFFSET,
    CONSUME_FROM_TIMESTAMP,
}

分析源碼我們看到有6種方式,其他三種已經(jīng)廢棄掉了,不做討論。

1、從隊(duì)列尾部消費(fèi)(默認(rèn))

我們從新創(chuàng)建一個(gè)消費(fèi)組來(lái)消費(fèi)某個(gè)主題下的消息時(shí),歷史消息沒(méi)有被消費(fèi),當(dāng)生產(chǎn)者重新發(fā)送消息時(shí)則會(huì)接收到最新的,我們分析下其在哪設(shè)置的。

image

當(dāng)創(chuàng)建消費(fèi)者的時(shí)候內(nèi)置了一些參數(shù),從隊(duì)列尾部消費(fèi)。

從隊(duì)列尾部消費(fèi)導(dǎo)致歷史消息消費(fèi)不了,會(huì)丟失一部分?jǐn)?shù)據(jù),如果僅僅是狀態(tài)數(shù)據(jù)則可以這樣設(shè)置,如果是業(yè)務(wù)數(shù)據(jù)導(dǎo)致數(shù)據(jù)丟失。

對(duì)于設(shè)置這個(gè)參數(shù)僅對(duì)于消費(fèi)組第一次創(chuàng)建時(shí)生效,后面再次設(shè)置不生效,因?yàn)樵撓M(fèi)組在服務(wù)端已經(jīng)記錄了消費(fèi)的進(jìn)度,已有進(jìn)度位置。

查看消費(fèi)進(jìn)度文件的位置,我們根據(jù)上幾節(jié)的內(nèi)容查看TopicTest主題下的這個(gè)consumer_test_clustering消費(fèi)組的消息消費(fèi)的進(jìn)度。查看Broker-a服務(wù)器節(jié)點(diǎn)上的信息。

查看消費(fèi)的消費(fèi)進(jìn)度先根據(jù)可視化界面查看

image

查看服務(wù)器文件上的消費(fèi)進(jìn)度信息:/usr/local/rocketmq-all-4.2.0/store/config/consumerOffset.json

image

2、從隊(duì)列頭部消費(fèi)

編寫(xiě)Consumer

public class Consumer1 {    
    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_first_offset");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("TopicTest", "*");
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                            Date date = new Date(msg.getStoreTimestamp());
                            System.out.println("Consumer1===  存入時(shí)間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功
                }
            });
            consumer.start();
            System.out.println("Consumer1===啟動(dòng)成功!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

設(shè)置了消費(fèi)組:consumer.setConsumerGroup("consumer_first_offset");

設(shè)置了消費(fèi)位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

查看其結(jié)果

image

從頭開(kāi)始消費(fèi)是指目前還儲(chǔ)存在broker的上的消息全部消費(fèi)一遍,因?yàn)镽ocketMQ會(huì)將消息持久化到磁盤(pán)文件中,時(shí)間長(zhǎng)就會(huì)導(dǎo)致磁盤(pán)文件會(huì)很多,RocketMQ有一種機(jī)制,只是保留一段時(shí)間的消息,之前的消息會(huì)刪除,可以指定時(shí)間點(diǎn)刪除(無(wú)論消息是否被消費(fèi),到時(shí)間點(diǎn)文件都會(huì)被刪除)

3、從指定時(shí)間點(diǎn)消費(fèi)

消費(fèi)者代碼

public class Consumer1 {    
    public static void main(String[] args){
        try {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
            consumer.setConsumerGroup("consumer_time_offset");
            consumer.setNamesrvAddr("10.10.12.203:9876;10.10.12.204:9876");
            consumer.subscribe("TopicTest", "*");
            //可以設(shè)置從什么時(shí)間開(kāi)始消費(fèi),配合setConsumeTimestamp一起使用默認(rèn)半小時(shí)之前的,格式y(tǒng)yyyMMddhhmmss
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP);
            consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L)); 
            consumer.registerMessageListener(new MessageListenerConcurrently(){

                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> paramList,
                        ConsumeConcurrentlyContext paramConsumeConcurrentlyContext) {
                    try {
                        for(MessageExt msg : paramList){
                            String msgbody = new String(msg.getBody(), "utf-8");
                            SimpleDateFormat sd = new SimpleDateFormat("YYYY-MM-dd HH:mm:ss");
                            Date date = new Date(msg.getStoreTimestamp());
                            System.out.println("Consumer1===  存入時(shí)間 :  "+ sd.format(date) +" == MessageBody: "+ msgbody);//輸出消息內(nèi)容
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER; //稍后再試
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; //消費(fèi)成功
                }
            });
            consumer.start();
            System.out.println("Consumer1===啟動(dòng)成功!");
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

設(shè)置消費(fèi)位置:consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_TIMESTAMP); 設(shè)置消費(fèi)的時(shí)間點(diǎn):consumer.setConsumeTimestamp("20181222171201");

如果從消息進(jìn)度服務(wù)OffsetStore讀取到MessageQueue中的偏移量大于等于0,則使用讀取到的偏移量,只有讀取到的偏移量小于0時(shí)上述策略才會(huì)生效。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容