RocketMQ源碼分析----Consumer消費(fèi)進(jìn)度相關(guān)

在Consumer消費(fèi)的時候總有幾個疑問:

  • 消費(fèi)完成后,這個消費(fèi)進(jìn)度存在哪里
  • 消費(fèi)完成后,還沒保存消費(fèi)進(jìn)度就掛了,會不會導(dǎo)致重復(fù)消費(fèi)

Consumer

消費(fèi)進(jìn)度保存

消費(fèi)完成后,會返回一個ConsumeConcurrentlyStatus.CONSUME_SUCCESS告訴MQ消費(fèi)成功,以MessageListener的consumeMessage為入口分析。
消費(fèi)的時候,是以ConsumeRequest類為Runnable對象,在線程池中進(jìn)行處理的,即ConsumeRequest的run方法會處理這個狀態(tài)

        @Override
        public void run() {

            //....
            status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
            // 如果這個ProcessQueue廢棄了,則不處理
            if (!processQueue.isDropped()) {
                ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
            }
        }

在消費(fèi)完成后,將status交給processConsumeResult處理,代碼如下

    public void processConsumeResult(//
                                     final ConsumeConcurrentlyStatus status, //
                                     final ConsumeConcurrentlyContext context, //
                                     final ConsumeRequest consumeRequest//
    ) {
         //....消費(fèi)成功或者失敗的處理
        
        // 將這批消息從ProcessQueue中移除,代表消費(fèi)完畢,并返回當(dāng)前ProcessQueue中的消息最小的offset
        long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
        if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            // 更新消費(fèi)進(jìn)度
            this.defaultMQPushConsumerImpl.getOffsetStore()
                .updateOffset(consumeRequest.getMessageQueue(), offset, true);
        }
    }

在分析ProcessQueue的時候,說過removeMessage返回有兩種情況:

  1. 如果移除這批消息之后已經(jīng)沒有消息了,那么返回ProcessQueue中最大的offset+1
  2. 如果還有消息,那么返回treeMap中最小的key,即未消費(fèi)的消息中最小的offset

getOffsetStore返回RemoteBrokerOffsetStore,看下其實現(xiàn)

    @Override
    public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
        if (mq != null) {
            // 通過MessageQueue獲取本地的對應(yīng)的消費(fèi)進(jìn)度
            AtomicLong offsetOld = this.offsetTable.get(mq);
            if (null == offsetOld) {
                offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
            }

            if (null != offsetOld) {
                //increaseOnly 為false則直接覆蓋
                //increaseOnly為true則會判斷更新的值比老的值大才會進(jìn)行更新
                if (increaseOnly) {
                    MixAll.compareAndIncreaseOnly(offsetOld, offset);
                } else {
                    offsetOld.set(offset);
                }
            }
        }
    }

這里的increaseOnly參數(shù)根據(jù)不同的情況傳入不同的值,有些情況下會出現(xiàn)并發(fā)修改的情況,那么需要傳入true,內(nèi)部會進(jìn)行CAS的操作,能保證正確的賦值,而一些場景下,只需要進(jìn)行直接覆蓋或者說沒有并發(fā)修改的問題那么傳入false就行了。

消費(fèi)進(jìn)度持久化

offsetTable是一個Map,其保存了消費(fèi)進(jìn)度,這只一個內(nèi)存的結(jié)構(gòu),在Consumer啟動的時候,會啟動一個定時任務(wù)將本地的數(shù)據(jù)同步到broker,每persistConsumerOffsetInterval(默認(rèn)為5)秒進(jìn)行一次操作

    // mqs為需要持久化的隊列集合
    public void persistAll(Set<MessageQueue> mqs) {
        if (null == mqs || mqs.isEmpty())
            return;

        final HashSet<MessageQueue> unusedMQ = new HashSet<MessageQueue>();
        if (mqs != null && !mqs.isEmpty()) {
            // 遍歷本地的消費(fèi)進(jìn)度
            for(Map.Entry<MessageQueue, AtomicLong> entry:this.offsetTable.entrySet()){
                MessageQueue mq = entry.getKey();
                AtomicLong offset = entry.getValue();
                if (offset != null) {
                    // 如果該隊列在需要持久化的隊列中
                    if (mqs.contains(mq)) {
                        try {
                            // 將消費(fèi)進(jìn)度發(fā)送到broker
                            this.updateConsumeOffsetToBroker(mq, offset.get());
                        } catch (Exception e) {
                            log.error("updateConsumeOffsetToBroker exception, " + mq.toString(), e);
                        }
                    } else {//廢棄的消費(fèi)進(jìn)度
                        unusedMQ.add(mq);
                    }
                }
            }
        }
        // 如果有廢棄的MQ,則將其消費(fèi)進(jìn)度廢棄
        if (!unusedMQ.isEmpty()) {
            for (MessageQueue mq : unusedMQ) {
                this.offsetTable.remove(mq);
            }
        }
    }

傳入的是當(dāng)前Consumer分配的MessageQueue列表,rebalance之后,可能分配的MessageQueue已經(jīng)變化,所以offsetTable里有些消費(fèi)進(jìn)度的隊列時不需要的,所以將它的消費(fèi)進(jìn)度廢棄
updateConsumeOffsetToBroker方法就是簡單的網(wǎng)絡(luò)請求,將offset發(fā)送給Broker

消費(fèi)進(jìn)度提交

除了定時提交消費(fèi)進(jìn)度之外,在拉取消息的時候,會順便將本地的消費(fèi)進(jìn)度一起傳到broker,例如查看拉取消息的方法DefaultMQPushConsumerImpl#pullMessage中的一段代碼

boolean commitOffsetEnable = false;
        long commitOffsetValue = 0L;
        // 集群消費(fèi)模式
        if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
            // 通過offsetStore獲取當(dāng)前消費(fèi)進(jìn)度
            // ReadOffsetType.READ_FROM_MEMORY表示從本地獲取(即offsetTable)
            commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
            if (commitOffsetValue > 0) {//
                // 傳給Broker,讓其判斷是否需要保存消費(fèi)進(jìn)度
                commitOffsetEnable = true;
            }
        }
        // 構(gòu)造一些標(biāo)志位,這里主要看commitOffsetEnable值
        // 將commitOffsetEnable放到一個int類型的值中,讓broker判斷是否需要保存消費(fèi)進(jìn)度
                int sysFlag = PullSysFlag.buildSysFlag(//
                commitOffsetEnable, // commitOffset
                true, // suspend
                subExpression != null, // subscription
                classFilter // class filter
        );
        //....
            // 通過拉取消息請求,將commitOffsetValue和sysFlag傳給broker
            this.pullAPIWrapper.pullKernelImpl(//
                    pullRequest.getMessageQueue(), // 1
                    subExpression, // 2
                    subscriptionData.getSubVersion(), // 3
                    pullRequest.getNextOffset(), // 4
                    this.defaultMQPushConsumer.getPullBatchSize(), // 5
                    sysFlag, // 6
                    commitOffsetValue, // 7
                    BrokerSuspendMaxTimeMillis, // 8
                    ConsumerTimeoutMillisWhenSuspend, // 9
                    CommunicationMode.ASYNC, // 10
                    pullCallback// 11
            );

具體broker對消費(fèi)進(jìn)度的處理看后面分析

Broker

消費(fèi)進(jìn)度保存

RocketMQ的網(wǎng)絡(luò)請求都有一個RequestCode,更新消費(fèi)進(jìn)度的Code為UPDATE_CONSUMER_OFFSET,通過查到其使用的地方,找到對應(yīng)的Processor為ClientManageProcessor,其processRequest處理對應(yīng)的請求

    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
            throws RemotingCommandException {
        switch (request.getCode()) {
            case RequestCode.HEART_BEAT:
                return this.heartBeat(ctx, request);
            case RequestCode.UNREGISTER_CLIENT:
                return this.unregisterClient(ctx, request);
            case RequestCode.GET_CONSUMER_LIST_BY_GROUP:
                return this.getConsumerListByGroup(ctx, request);
            case RequestCode.UPDATE_CONSUMER_OFFSET:
                return this.updateConsumerOffset(ctx, request);
            case RequestCode.QUERY_CONSUMER_OFFSET:
                return this.queryConsumerOffset(ctx, request);
            default:
                break;
        }
        return null;
    }

更新消費(fèi)進(jìn)度的方法為updateConsumerOffset,里面解析了請求體之后又調(diào)用了ConsumerOffsetManager.commitOffset方法

    public void commitOffset(final String clientHost, final String group, final String topic, final int queueId, final long offset) {
        // topic@group 
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        this.commitOffset(clientHost, key, queueId, offset);
    }

    private void commitOffset(final String clientHost, final String key, final int queueId, final long offset) {
        ConcurrentHashMap<Integer, Long> map = this.offsetTable.get(key);
        if (null == map) {
            map = new ConcurrentHashMap<Integer, Long>(32);
            map.put(queueId, offset);
            this.offsetTable.put(key, map);
        } else {
            Long storeOffset = map.put(queueId, offset);
            if (storeOffset != null && offset < storeOffset) {
                log.warn("[NOTIFYME]update consumer offset less than store. clientHost={}, key={}, queueId={}, requestOffset={}, storeOffset={}",
               clientHost, key, queueId, offset, storeOffset);
            }
        }
    }

邏輯也很簡單就不多說了,有意思的是,Broker的保存消費(fèi)進(jìn)度的結(jié)構(gòu)和Consumer類似,Broker多了一個維度,因為Broker接收的是所有消費(fèi)者的進(jìn)度,而Consumer保存的是自己的
在Consumer的消費(fèi)進(jìn)度上報到Broker之后,Broker只是保存到內(nèi)存,這并不可靠,大概也能猜出,和Consumer一樣,也有一個定時任務(wù)將消費(fèi)進(jìn)度持久化。這時,先看下ConsumerOffsetManager這個類的繼承關(guān)系,他的父類是ConfigManager,這個東西很重要,是幾個重要配置信息持久化類,看下其繼承關(guān)系:


image.png

分別是訂閱關(guān)系管理,消費(fèi)進(jìn)度管理,Topic信息管理,和延遲隊列信息管理,這4個配置信息都需要通過ConfigManager去持久化和加載,看下ConfigManager的幾個方法

public abstract class ConfigManager {
    // 將對象轉(zhuǎn)換成json串
    public abstract String encode();

    //將文件里內(nèi)容(json格式)的轉(zhuǎn)換成對象
    public boolean load() {
        String fileName = null;
            // 獲取文件地址
            fileName = this.configFilePath();
            // 將文件里的內(nèi)容讀取出來
            String jsonString = MixAll.file2String(fileName);
            // json轉(zhuǎn)換成指定對象的數(shù)據(jù)
            this.decode(jsonString);
    }
    // 配置文件地址
    public abstract String configFilePath();
    
    // 與load類似
    private boolean loadBak() {
        String fileName = null;
            fileName = this.configFilePath();
            String jsonString = MixAll.file2String(fileName + ".bak");
            this.decode(jsonString);
        return true;
    }
    // json轉(zhuǎn)換成指定對象的數(shù)據(jù)
    public abstract void decode(final String jsonString);
    // 將對象里的數(shù)據(jù)轉(zhuǎn)換成json并持久化到configFilePath()文件中
    public synchronized void persist() {
        String jsonString = this.encode(true);
            String fileName = this.configFilePath();
                MixAll.string2File(jsonString, fileName);
        
    }

    public abstract String encode(final boolean prettyFormat);

那么ConsumerOffsetManager會實現(xiàn)encode和decode方法并在某個地方定時調(diào)用persist方法,查看其使用的地方,找到BrokerController的initialize方法,有段定時任務(wù)如下:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);

可以看到,每flushConsumerOffsetInterval(默認(rèn)5000)毫秒會進(jìn)行一次持久化

拉取消息的時候保存消費(fèi)進(jìn)度

拉取消息的Code為RequestCode.PULL_MESSAGE,對應(yīng)的Processor為PullMessageProcessor,找到其中消費(fèi)進(jìn)度處理的地方

// 上面說的consumer傳過來的commitOffsetEnable
// 當(dāng)Consumer本地消費(fèi)進(jìn)度大于0的時候這個參數(shù)為true
final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.

// brokerAllowSuspend在處理消息請求的時候為true,hold請求自己處理是false
boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
// Master才需要保存進(jìn)度,slave只是同步broker的消息
storeOffsetEnable = storeOffsetEnable
        && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
    this.brokerController.getConsumerOffsetManager().commitOffset(
        RemotingHelper.parseChannelRemoteAddr(channel),
        requestHeader.getConsumerGroup(), 
        requestHeader.getTopic(), 
        requestHeader.getQueueId(), 
        requestHeader.getCommitOffset());//consumer傳上來的offset
}

總的來說:
當(dāng)broker為master的時候,且Consumer消費(fèi)進(jìn)度大于0則在拉取消息的時候順便將消費(fèi)進(jìn)度保存到broker

問題分析

重復(fù)消費(fèi)問題

在ProcessQueue的removeMessage的第二種情況有個問題,假設(shè)有如下情況:
批量拉取了4條消息ABCD,分別對應(yīng)的offset為400|401|402|403,此時consumeBatchSize(批量消費(fèi)數(shù)量,默認(rèn)為1,即一條一條消費(fèi)),那么會分4個線程去消費(fèi)這幾個消息,出現(xiàn)下面消費(fèi)次序
消費(fèi)D -> removeMessage -> 返回400(情況2)
消費(fèi)C -> removeMessage -> 返回400(情況2)
消費(fèi)B -> removeMessage -> 返回400(情況2)
消費(fèi)A -> removeMessage -> 返回404(情況1)

在消費(fèi)A之前,本地消費(fèi)進(jìn)度持久化到Broker之后,應(yīng)用宕機(jī)了,那么此時Broker保存的是offset=400(準(zhǔn)確來說,在消費(fèi)完A且保存消費(fèi)進(jìn)度到broker之前,offset都是400)。那么會有什么問題呢?
先假設(shè)消費(fèi)完DCB且消費(fèi)進(jìn)度上傳完成宕機(jī),然后重啟應(yīng)用,這時候會先從broker獲取應(yīng)該從哪里消費(fèi)(),因為DCB消費(fèi)完成后都是保存400這個消費(fèi)進(jìn)度,那么返回的是400,這時候consumer會請求offset為400的消費(fèi),到這里,已經(jīng)重復(fù)消費(fèi)了DCB。

消費(fèi)進(jìn)度保存在哪里

  1. consumer保存在內(nèi)存,定時上傳broker
  2. broker保存在內(nèi)存,定時刷新到磁盤文件

:以上沒有特別聲明的都是并發(fā)消費(fèi)模式

整體流程圖

image.png
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 姓名:周小蓬 16019110037 轉(zhuǎn)載自:http://blog.csdn.net/YChenFeng/art...
    aeytifiw閱讀 34,897評論 13 425
  • metaq是阿里團(tuán)隊的消息中間件,之前也有用過和了解過kafka,據(jù)說metaq是基于kafka的源碼改過來的,他...
    菜鳥小玄閱讀 33,351評論 0 14
  • consumer 1.啟動 有別于其他消息中間件由broker做負(fù)載均衡并主動向consumer投遞消息,Rock...
    veShi文閱讀 5,077評論 0 2
  • 連日數(shù)陰晴, 新芽臨寒風(fēng)。 春曉寂寂冷, 晨鵲恰恰鳴。
    楓之然閱讀 147評論 6 16
  • 在廣袤的森林盡頭,佇立著一座古老巍峨的城堡。城堡里住著一對姐妹,姐姐凱莉和妹妹雪莉。 雪莉有一雙烏黑明亮的大眼睛,...
    喜樂圓子閱讀 467評論 1 9

友情鏈接更多精彩內(nèi)容