RocketMQ chooses the queue a message is sent to through the selectOneMessageQueue method of MQFaultStrategy.
MQFaultStrategy
Let's start with the important fields of MQFaultStrategy:
// Latency fault tolerance object that keeps track of slow brokers
// key: brokerName
private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
// Switch for latency fault tolerance (off by default)
private boolean sendLatencyFaultEnable = false;
// Latency levels (ms)
private long[] latencyMax = { 50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L };
// Not-available duration (ms) for each latency level
private long[] notAvailableDuration = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};
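The most important field of MQFaultStrategy is latencyFaultTolerance, which keeps track of brokers whose sends have been slow. How long a send took determines its latency level in latencyMax, and the entry at the same index of notAvailableDuration is how long the broker is then treated as unavailable. For example, a send that takes 700 ms reaches the 550 ms level, so the broker becomes unavailable for 30000 ms. sendLatencyFaultEnable controls whether this latency fault tolerance mechanism is enabled at all.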
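Read side by side, the two default arrays define latency ranges. The sketch below prints one range per level; LatencyLevelTable and describe are hypothetical names, and only the array values are taken from the fields above. Latencies below the first level get a zero-length penalty, matching the return 0 at the end of computeNotAvailableDuration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: pairs each latency level with its not-available duration.
// The arrays are copied from the MQFaultStrategy defaults.
class LatencyLevelTable {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // One row per range: latencies in [LATENCY_MAX[i], LATENCY_MAX[i + 1]) map to NOT_AVAILABLE_DURATION[i]
    static List<String> describe() {
        List<String> rows = new ArrayList<>();
        // Below the lowest level no penalty applies
        rows.add("[0 ms, " + LATENCY_MAX[0] + " ms) -> unavailable for 0 ms");
        for (int i = 0; i < LATENCY_MAX.length; i++) {
            String upper = (i + 1 < LATENCY_MAX.length) ? LATENCY_MAX[i + 1] + " ms" : "infinity";
            rows.add("[" + LATENCY_MAX[i] + " ms, " + upper + ") -> unavailable for "
                    + NOT_AVAILABLE_DURATION[i] + " ms");
        }
        return rows;
    }

    public static void main(String[] args) {
        describe().forEach(System.out::println);
    }
}
```

The first three rows all carry a 0 ms penalty, so only sends of 550 ms or more actually take a broker out of rotation.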
Now the main method:
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
// Is latency fault tolerance enabled?
if (this.sendLatencyFaultEnable) {
try {
// Pick an available queue whose brokerName equals lastBrokerName (any available queue if lastBrokerName is null)
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
// No such queue: take a relatively good broker from latencyFaultTolerance, regardless of its availability
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
// Fall back to round-robin selection of a queue
return tpInfo.selectOneMessageQueue();
}
// Fault tolerance disabled: round-robin, skipping queues on lastBrokerName where possible; availability is not considered
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
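When sendLatencyFaultEnable is off, only the last line of the method runs. As far as we can tell from the RocketMQ 4.x sources, TopicPublishInfo.selectOneMessageQueue(lastBrokerName) walks the queues round-robin but skips those on lastBrokerName, so a retry moves to a different broker. The sketch below models that behaviour under this assumption; RoundRobinSketch and the "brokerName:queueId" string encoding are hypothetical.

```java
import java.util.List;

// Hypothetical sketch, assuming the disabled branch does round-robin that skips lastBrokerName.
// Queues are modelled as "brokerName:queueId" strings for brevity.
class RoundRobinSketch {
    // Round-robin counter shared by all calls on this instance
    private int sendWhichQueue = 0;

    String selectOneMessageQueue(List<String> queues, String lastBrokerName) {
        int index = sendWhichQueue++;
        if (lastBrokerName != null) {
            // Try every queue once, starting at index, and take the first one on another broker
            for (int i = 0; i < queues.size(); i++) {
                String q = queues.get(Math.floorMod(index++, queues.size()));
                if (!brokerOf(q).equals(lastBrokerName)) {
                    return q;
                }
            }
        }
        // No lastBrokerName, or every queue lives on that broker: plain round-robin
        return queues.get(Math.floorMod(index, queues.size()));
    }

    static String brokerOf(String queue) {
        return queue.substring(0, queue.indexOf(':'));
    }
}
```

This is the opposite preference from the enabled branch, whose first loop only returns queues on lastBrokerName when that name is set.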
With latency fault tolerance enabled, the logic is:
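1. First, pick an available queue whose broker equals lastBrokerName (any broker if lastBrokerName is null). Available means the broker has no entry in latencyFaultTolerance, or the unavailable period recorded for it after a slow send has already passed.
2. If step 1 finds no suitable queue, drop the lastBrokerName condition and send through a relatively good broker chosen by latencyFaultTolerance.pickOneAtLeast(); if that broker has no writable queues, it is removed from latencyFaultTolerance instead.
3. If neither step returns a queue (or an exception is thrown), fall back to plain round-robin selection.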
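The three steps above can be simulated without a broker. The sketch below is a simplified, hypothetical model: Queue, QueueSelectionSketch and markUnavailableUntil are not RocketMQ classes, time is passed in explicitly, and step 2 (pickOneAtLeast) is left out so that a miss in step 1 goes straight to round-robin.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the latency-fault-enabled branch (not RocketMQ's classes).
class QueueSelectionSketch {
    static final class Queue {
        final String brokerName;
        final int queueId;

        Queue(String brokerName, int queueId) {
            this.brokerName = brokerName;
            this.queueId = queueId;
        }

        @Override
        public String toString() {
            return brokerName + "-" + queueId;
        }
    }

    private final List<Queue> queues;
    // brokerName -> time (ms) from which the broker is available again
    private final Map<String, Long> availableFrom = new HashMap<>();
    // Round-robin counter shared by all calls on this instance
    private int sendWhichQueue = 0;

    QueueSelectionSketch(List<Queue> queues) {
        this.queues = queues;
    }

    void markUnavailableUntil(String brokerName, long timestamp) {
        availableFrom.put(brokerName, timestamp);
    }

    boolean isAvailable(String brokerName, long now) {
        Long from = availableFrom.get(brokerName);
        return from == null || now - from >= 0;
    }

    Queue select(String lastBrokerName, long now) {
        int index = sendWhichQueue++;
        // Step 1: first available queue, restricted to lastBrokerName when it is set
        for (int i = 0; i < queues.size(); i++) {
            Queue q = queues.get(Math.floorMod(index++, queues.size()));
            if (isAvailable(q.brokerName, now)
                    && (lastBrokerName == null || q.brokerName.equals(lastBrokerName))) {
                return q;
            }
        }
        // Step 2 (pickOneAtLeast) is omitted; step 3: plain round-robin, ignoring availability
        return queues.get(Math.floorMod(sendWhichQueue++, queues.size()));
    }
}
```

For example, with queues on broker-a and broker-b and broker-a marked unavailable until t = 1000, a selection at t = 0 skips broker-a, while a selection at t = 2000 may return it again.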
LatencyFaultToleranceImpl
We now know the basic logic selectOneMessageQueue uses to choose a queue. Next, let's look at how LatencyFaultToleranceImpl keeps track of each broker's availability and latency.
Its main field is faultItemTable, together with the inner class FaultItem:
private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
class FaultItem implements Comparable<FaultItem> {
private final String name;
private volatile long currentLatency;
private volatile long startTimestamp;
public FaultItem(final String name) {
this.name = name;
}
}
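As the name suggests, faultItemTable is a table of fault items (a ConcurrentHashMap, not a list): the key is the broker name and the value is a FaultItem, which stores the broker's name, its most recently measured send latency (currentLatency), and startTimestamp, the time from which the broker is considered available again.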
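FaultItem implements Comparable, and pickOneAtLeast() presumably relies on that ordering to find a "relatively good" broker. Since the compareTo body is not shown here, the sketch below assumes a plausible ordering (available first, then lower currentLatency, then earlier startTimestamp) on a hypothetical Item class with an explicit clock; check the RocketMQ source for the exact rules and for how pickOneAtLeast() chooses among the top-ranked items.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical FaultItem-like class plus an ASSUMED "best first" ordering.
class FaultItemRanking {
    static final class Item {
        final String name;
        final long currentLatency;  // latest measured send latency (ms)
        final long startTimestamp;  // time (ms) from which the broker counts as available again

        Item(String name, long currentLatency, long startTimestamp) {
            this.name = name;
            this.currentLatency = currentLatency;
            this.startTimestamp = startTimestamp;
        }

        boolean isAvailable(long now) {
            return now - startTimestamp >= 0;
        }
    }

    // Assumed ordering: available items first (false sorts before true), then latency, then startTimestamp
    static Comparator<Item> order(long now) {
        Comparator<Item> byAvailability = Comparator.comparing((Item it) -> !it.isAvailable(now));
        return byAvailability
                .thenComparingLong((Item it) -> it.currentLatency)
                .thenComparingLong((Item it) -> it.startTimestamp);
    }

    // Broker names ordered from best to worst at time now
    static List<String> rank(List<Item> items, long now) {
        List<Item> sorted = new ArrayList<>(items);
        sorted.sort(order(now));
        List<String> names = new ArrayList<>();
        for (Item it : sorted) {
            names.add(it.name);
        }
        return names;
    }
}
```

Putting availability ahead of latency means a broker that has served out its penalty outranks one that is still excluded, even if the excluded one measured a lower latency.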
Availability is checked per broker name:
public boolean isAvailable(final String name) {
final FaultItem faultItem = this.faultItemTable.get(name);
if (faultItem != null) {
return faultItem.isAvailable();
}
return true;
}
If faultItemTable has no entry for the broker, the method returns true; otherwise the result of FaultItem.isAvailable decides:
public boolean isAvailable() {
return (System.currentTimeMillis() - startTimestamp) >= 0;
}
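This also returns true once the unavailable period has passed, that is, once the current time has reached startTimestamp.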
updateFaultItem
Once a queue has been selected, the message is sent:
// Send start time
beginTimestampPrev = System.currentTimeMillis();
// Send
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
// Send end time
endTimestamp = System.currentTimeMillis();
// Update the broker's latency record
this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
The time spent sending to the broker is measured here and then used to update its FaultItem:
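// MQFaultStrategy: only records latency when the switch is on; isolation = true treats the send as taking 30000 ms
public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {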
if (this.sendLatencyFaultEnable) {
long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
}
}
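// Scan the levels from highest to lowest; the first level the latency reaches gives the not-available duration
private long computeNotAvailableDuration(final long currentLatency) {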
for (int i = latencyMax.length - 1; i >= 0; i--) {
if (currentLatency >= latencyMax[i])
return this.notAvailableDuration[i];
}
return 0;
}
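// LatencyFaultToleranceImpl: create or refresh the broker's FaultItem; startTimestamp = now + notAvailableDuration
public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {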
FaultItem old = this.faultItemTable.get(name);
if (null == old) {
final FaultItem faultItem = new FaultItem(name);
faultItem.setCurrentLatency(currentLatency);
faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
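// If another thread inserted an item first, putIfAbsent returns it and that item is updated instead
old = this.faultItemTable.putIfAbsent(name, faultItem);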
if (old != null) {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
} else {
old.setCurrentLatency(currentLatency);
old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
}
}
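The measured latency is matched against the latency levels in latencyMax of MQFaultStrategy, and the corresponding entry of notAvailableDuration determines how long the broker is unavailable; the result is written to the broker's FaultItem in faultItemTable.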
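The whole path from a measured latency to an unavailable window can be reproduced in a small stand-alone sketch. FaultUpdateSketch, the nowMillis parameter, and the plain map of timestamps standing in for FaultItem are hypothetical; the two arrays, the level scan, and the 30000 ms substitution for isolation are copied from the code above.

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-alone version of the update path, with an explicit clock for determinism.
class FaultUpdateSketch {
    static final long[] LATENCY_MAX = {50L, 100L, 550L, 1000L, 2000L, 3000L, 15000L};
    static final long[] NOT_AVAILABLE_DURATION = {0L, 0L, 30000L, 60000L, 120000L, 180000L, 600000L};

    // brokerName -> time (ms) from which the broker is available again (stands in for FaultItem.startTimestamp)
    final ConcurrentHashMap<String, Long> availableFrom = new ConcurrentHashMap<>();

    // Same scan as computeNotAvailableDuration: highest level first, first match wins
    static long computeNotAvailableDuration(long currentLatency) {
        for (int i = LATENCY_MAX.length - 1; i >= 0; i--) {
            if (currentLatency >= LATENCY_MAX[i]) {
                return NOT_AVAILABLE_DURATION[i];
            }
        }
        return 0;
    }

    void updateFaultItem(String brokerName, long currentLatency, boolean isolation, long nowMillis) {
        long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
        // Overwrites any previous window, like setStartTimestamp in the original
        availableFrom.put(brokerName, nowMillis + duration);
    }

    boolean isAvailable(String brokerName, long nowMillis) {
        Long from = availableFrom.get(brokerName);
        return from == null || nowMillis - from >= 0;
    }
}
```

A fast send (below 550 ms) still writes an entry, but with a zero duration, so the broker stays available and any earlier, longer penalty is overwritten.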
Summary
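1. When sendLatencyFaultEnable is on, the latency of every send is recorded per broker.
2. When sending, brokers that are still inside the unavailable period caused by a recent slow send are skipped, so messages go to brokers that are currently responding well.
3. The slower a broker responds, the longer it is excluded, so it automatically receives fewer messages and the load shifts to the healthier servers.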