rocket源碼 順序消息和事務(wù)消息

順序消息的實現(xiàn)

順序消息進(jìn)行消費時,若是第一次消費失敗,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT,下一次會繼續(xù)消費此消息。

順序消息的消費失敗時的重試邏輯,具體代碼在ProccessQueue中,順序消費時手動從processQueue中取消息,內(nèi)部是從msgTreeMap中取出消息后,將消息添加到consumingMsgOrderlyTreeMap中,若是消費成功,將該消息從consumingMsgOrderlyTreeMap中刪除即可。若是消費失敗,執(zhí)行makeMessageToConsumeAgain方法,將這些消息再放回msgTreeMap。

順序消費時有回滾和重試的邏輯,但是新版本不建議使用?;貪L和重試的邏輯和上面相同,回滾時將消息重新放回treeMap,提交時不用操作treeMap,但是需要根據(jù)consumingMsgOrderlyTreeMap找到當(dāng)前消費的offset,從下一個繼續(xù)消費。

順序消息消費時使用同一個線程,可以看一下ConsumeMessageOrderlyService

this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
            this.defaultMQPushConsumer.getConsumeThreadMin(),
            this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代碼...
            1000 * 60,
            TimeUnit.MILLISECONDS,
            this.consumeRequestQueue,
            new ThreadFactoryImpl("ConsumeMessageThread_"));

因為queue的長度是Integer.MAX_VALUE,因此在進(jìn)行消費時使用的是一個線程,并且有序執(zhí)行。

順序消息的消費使用同一個線程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中實現(xiàn)的。

// ProcessQueue

private volatile boolean consuming = false;


    public boolean putMessage(final List<MessageExt> msgs) {
        boolean dispatchToConsume = false;
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                int validMsgCnt = 0;
                for (MessageExt msg : msgs) {
                    MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
                    if (null == old) {
                        validMsgCnt++;
                        this.queueOffsetMax = msg.getQueueOffset();
                        msgSize.addAndGet(msg.getBody().length);
                    }
                }
                msgCount.addAndGet(validMsgCnt);
                // 如果有消息可以進(jìn)行消費,并且當(dāng)前queue沒有消費,則將dispatchToConsume和consuming置為true
                if (!msgTreeMap.isEmpty() && !this.consuming) {
                    dispatchToConsume = true;
                    this.consuming = true;
                }

                if (!msgs.isEmpty()) {
                    MessageExt messageExt = msgs.get(msgs.size() - 1);
                    String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
                    if (property != null) {
                        long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
                        if (accTotal > 0) {
                            this.msgAccCnt = accTotal;
                        }
                    }
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("putMessage exception", e);
        }

        return dispatchToConsume;
    }
// ConsumeMessageOrderlyService

    public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) {
        if (dispathToConsume) { // putMessage返回true時,才將request提交到線程池
        // 如果已經(jīng)開始對該queue進(jìn)行消費了,就不會再次提交任務(wù)
            ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
            this.consumeExecutor.submit(consumeRequest);
        }
    }
// 提交給線程池的任務(wù)
// 主要代碼
    class ConsumeRequest implements Runnable {

        @Override
        public void run() {
            final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
            synchronized (objLock) {
                if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
                    || (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
                    // 如果可以繼續(xù)消費,直接在當(dāng)前線程中輪詢消費該ProcessQueue即可
                    for (boolean continueConsume = true; continueConsume; ) {
                        // 在consumerImpl中的pullMessage方法中持續(xù)給ProcessQueue添加消息
                        // 手動從ProcessQueue中取消息
                        List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
                        if (!msgs.isEmpty()) {
                            try {
                                this.processQueue.getLockConsume().lock();
                                //消費消息
                                status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
                            } catch (Throwable e) {
                               
                            } finally {
                                this.processQueue.getLockConsume().unlock();
                            }
                            // 處理消費結(jié)果,若是成功繼續(xù)消費
                            continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
                        } else {
                            continueConsume = false;
                        }
                    }
                } 
        }

看代碼可以發(fā)現(xiàn),如果順序消息消費失敗的話,即消費返回SUSPEND_CURRENT_QUEUE_A_MONENT時,當(dāng)前線程會停止消費,在processConsumeResult時,會提交新的任務(wù)到線程池,在新的線程中繼續(xù)消費該消息。

核心邏輯是保證一個ProcessQueue只在一個線程中輪詢消費消息。

發(fā)送順序消息時會添加一個隊列選擇器,將需要有序的消息發(fā)送到同一個隊列。消費端拉取特定queue的數(shù)據(jù)時天生有序,在消費時使用同一個線程進(jìn)行消費,因此就實現(xiàn)了順序消息。

事務(wù)消息

二階段提交加補償機制

第一階段提交消息到broker,broker將topic修改為RMQ_SYS_TRANS_HALF_TOPIC,存入對consumer不可見的topic/queue。如果此階段寫入成功,執(zhí)行transactionListener.executeLocalTransaction()

第二階段,根據(jù)本地事務(wù)的執(zhí)行結(jié)果提交或者回滾第一階段提交至broker的消息,這里使用的是OneWay方法,可靠性低,可能出現(xiàn)失敗或者超時的情況。

broker端處理RequestCode.END_TRANSACTION的請求,如果是commit,則將原來的消息取出,更改為正確的topic/queue,并進(jìn)行落盤,然后添加Op狀態(tài)。如果是rollback,則直接添加Op狀態(tài)即可。

添加Op狀態(tài)是將消息添加到Op隊列中,Op隊列是為了補償邏輯時減少判斷。

補償邏輯:

BrokerController啟動時會啟動TransactionMessageCheckService,默認(rèn)每隔60s檢查一次HALF_TOPIC下所有的queue中的消息,檢查步驟如下

  • 先判斷當(dāng)前queue和對應(yīng)的opQueue是否添加過消息,如果沒有,遍歷下一個queue,若有,進(jìn)行下一步判斷
  • 獲取對應(yīng)的opQueue中的消息,若是沒有消息,遍歷下一個queue,若有,進(jìn)行下一步判斷
  • 遍歷當(dāng)前queue
  • 如果當(dāng)前偏移量已經(jīng)添加了oP狀態(tài),直接遍歷至下一個偏移量,否則進(jìn)行下一步判斷
  • 獲取當(dāng)前消息,若為null,遍歷下一個偏移量,若不為null,進(jìn)行下一步判斷
  • 若當(dāng)前消息需要舍棄或者跳過,遍歷下一個偏移量,否則進(jìn)行下一步判斷
  • 判斷當(dāng)前消息是否需要check,若暫時不需要,重新走判斷流程
  • 若是需要check,broker端給producer發(fā)送CHECK_TRANSACTION_STATE消息,producer端接收到消息后,執(zhí)行TransactionListener.checkLocalTransaction,將check結(jié)果回發(fā)給broker。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容