RocketMQ源碼之selectOneMessageQueue選擇隊列

RocketMQ是通過MQFaultStrategy的selectOneMessageQueue方法來選擇發(fā)送隊列的

MQFaultStrategy

我們先來看下MQFaultStrategy中重要的屬性

    //延遲容錯對象,維護延遲Brokers的信息
    //key:brokerName
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
    
    //延遲容錯開關(guān)
    private boolean sendLatencyFaultEnable = false;
    
    //延遲級別數(shù)組
    private long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L };

    //不可用時長數(shù)組
    private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

MQFaultStrategy中最重要的屬性是latencyFaultTolerance,它維護了那些消息發(fā)送延遲較高的brokers的信息,同時延遲的時間長短對應(yīng)了延遲級別latencyMax 和時長notAvailableDuration ,sendLatencyFaultEnable 控制了是否開啟發(fā)送消息延遲功能。
來看主方法

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
//判斷是否開啟了開關(guān)
        if (this.sendLatencyFaultEnable) {
            try {
//獲取一個可用的并且brokerName=lastBrokerName的消息隊列
                int index = tpInfo.getSendWhichQueue().getAndIncrement();
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    if (pos < 0)
                        pos = 0;
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                            return mq;
                    }
                }
//選擇一個相對好的broker,不考慮可用性的消息隊列
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                if (writeQueueNums > 0) {
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        mq.setBrokerName(notBestBroker);
                        mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                    }
                    return mq;
                } else {
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
//隨機選擇一個消息隊列
            return tpInfo.selectOneMessageQueue();
        }
//獲得 lastBrokerName 對應(yīng)的一個消息隊列,不考慮該隊列的可用性
        return tpInfo.selectOneMessageQueue(lastBrokerName);
}

我們來看開啟了延遲容錯的邏輯:

1.首先選擇一個broker==lastBrokerName并且可用的一個隊列(也就是該隊列并沒有因為延遲過長而被加進(jìn)了延遲容錯對象latencyFaultTolerance 中)
2.如果第一步中沒有找到合適的隊列,此時舍棄broker==lastBrokerName這個條件,選擇一個相對較好的broker來發(fā)送
3.隨機選擇一個隊列來發(fā)送

LatencyFaultToleranceImpl

selectOneMessageQueue選擇隊列的基本邏輯我們已經(jīng)了解了,現(xiàn)在來具體看下LatencyFaultToleranceImpl是怎么來維護這些broker的可用性和延遲的呢?
主要屬性faultItemTable 和內(nèi)部類FaultItem

private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
class FaultItem implements Comparable<FaultItem> {
        private final String name;
        private volatile long currentLatency;
        private volatile long startTimestamp;

        public FaultItem(final String name) {
            this.name = name;
        }
}

顧名思義這是一個延遲對象List,key為broker,value為FaultItem,F(xiàn)aultItem中存儲了該broker的name,延遲界別和延遲開始的時間。

判斷隊列可用性方法如下:

public boolean isAvailable(final String name) {
     final FaultItem faultItem = this.faultItemTable.get(name);
     if (faultItem != null) {
         return faultItem.isAvailable();
     }
     return true;
}

如果faultItem 中不存在該broker,返回true,當(dāng)存在時,還需判斷isAvailable

public boolean isAvailable() {
     return (System.currentTimeMillis() - startTimestamp) >= 0;
}

如果延遲時間已過也返回true。

updateFaultItem

選擇完隊列后,執(zhí)行發(fā)送步驟

//發(fā)送start時間
beginTimestampPrev = System.currentTimeMillis();
//發(fā)送
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
//發(fā)送結(jié)束時間
endTimestamp = System.currentTimeMillis();
//更新broker的延遲情況
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);

我們可以看到這里計算了某個broker的發(fā)送時間,然后根據(jù)這個時間去更新FaultItem

public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
}

private long computeNotAvailableDuration(final long currentLatency) {
        for (int i = latencyMax.length - 1; i >= 0; i--) {
            if (currentLatency >= latencyMax[i])
                return this.notAvailableDuration[i];
        }

        return 0;
}
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        FaultItem old = this.faultItemTable.get(name);
        if (null == old) {
            final FaultItem faultItem = new FaultItem(name);
            faultItem.setCurrentLatency(currentLatency);
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);

            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            old.setCurrentLatency(currentLatency);
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
}

這里根據(jù)延遲時間對比MQFaultStrategy中的延遲級別數(shù)組latencyMax 不可用時長數(shù)組notAvailableDuration 來將該broker加進(jìn)faultItemTable中。

總結(jié)

1.所有的broker延遲信息都會被記錄
2.發(fā)送消息時會選擇延遲最低的broker來發(fā)送,提高效率
3.broker延遲過高會自動減少它的消息分配,充分發(fā)揮所有服務(wù)器的能力

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容