順序消息的實現(xiàn)
順序消息進(jìn)行消費時,若是第一次消費失敗,可以返回SUSPEND_CURRENT_QUEUE_A_MOMENT,下一次會繼續(xù)消費此消息。
順序消息的消費失敗時的重試邏輯,具體代碼在ProccessQueue中,順序消費時手動從processQueue中取消息,內(nèi)部是從msgTreeMap中取出消息后,將消息添加到consumingMsgOrderlyTreeMap中,若是消費成功,將該消息從consumingMsgOrderlyTreeMap中刪除即可。若是消費失敗,執(zhí)行makeMessageToConsumeAgain方法,將這些消息再放回msgTreeMap。
順序消費時有回滾和重試的邏輯,但是新版本不建議使用?;貪L和重試的邏輯和上面相同,回滾時將消息重新放回treeMap,提交時不用操作treeMap,但是需要根據(jù)consumingMsgOrderlyTreeMap找到當(dāng)前消費的offset,從下一個繼續(xù)消費。
順序消息消費時使用同一個線程,可以看一下ConsumeMessageOrderlyService
this.consumeRequestQueue = new LinkedBlockingQueue<Runnable>();
this.consumeExecutor = new ThreadPoolExecutor(
this.defaultMQPushConsumer.getConsumeThreadMin(),
this.defaultMQPushConsumer.getConsumeThreadMax(), // 迷惑性代碼...
1000 * 60,
TimeUnit.MILLISECONDS,
this.consumeRequestQueue,
new ThreadFactoryImpl("ConsumeMessageThread_"));
因為queue的長度是Integer.MAX_VALUE,因此在進(jìn)行消費時使用的是一個線程,并且有序執(zhí)行。
順序消息的消費使用同一個線程是在ConsumeMessageOrderlyService.ConsumeRequest和ProcessQueue中實現(xiàn)的。
// ProcessQueue
private volatile boolean consuming = false;
public boolean putMessage(final List<MessageExt> msgs) {
boolean dispatchToConsume = false;
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
int validMsgCnt = 0;
for (MessageExt msg : msgs) {
MessageExt old = msgTreeMap.put(msg.getQueueOffset(), msg);
if (null == old) {
validMsgCnt++;
this.queueOffsetMax = msg.getQueueOffset();
msgSize.addAndGet(msg.getBody().length);
}
}
msgCount.addAndGet(validMsgCnt);
// 如果有消息可以進(jìn)行消費,并且當(dāng)前queue沒有消費,則將dispatchToConsume和consuming置為true
if (!msgTreeMap.isEmpty() && !this.consuming) {
dispatchToConsume = true;
this.consuming = true;
}
if (!msgs.isEmpty()) {
MessageExt messageExt = msgs.get(msgs.size() - 1);
String property = messageExt.getProperty(MessageConst.PROPERTY_MAX_OFFSET);
if (property != null) {
long accTotal = Long.parseLong(property) - messageExt.getQueueOffset();
if (accTotal > 0) {
this.msgAccCnt = accTotal;
}
}
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("putMessage exception", e);
}
return dispatchToConsume;
}
// ConsumeMessageOrderlyService
public void submitConsumeRequest(
final List<MessageExt> msgs,
final ProcessQueue processQueue,
final MessageQueue messageQueue,
final boolean dispathToConsume) {
if (dispathToConsume) { // putMessage返回true時,才將request提交到線程池
// 如果已經(jīng)開始對該queue進(jìn)行消費了,就不會再次提交任務(wù)
ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
this.consumeExecutor.submit(consumeRequest);
}
}
// 提交給線程池的任務(wù)
// 主要代碼
class ConsumeRequest implements Runnable {
@Override
public void run() {
final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue);
synchronized (objLock) {
if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())
|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {
// 如果可以繼續(xù)消費,直接在當(dāng)前線程中輪詢消費該ProcessQueue即可
for (boolean continueConsume = true; continueConsume; ) {
// 在consumerImpl中的pullMessage方法中持續(xù)給ProcessQueue添加消息
// 手動從ProcessQueue中取消息
List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);
if (!msgs.isEmpty()) {
try {
this.processQueue.getLockConsume().lock();
//消費消息
status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);
} catch (Throwable e) {
} finally {
this.processQueue.getLockConsume().unlock();
}
// 處理消費結(jié)果,若是成功繼續(xù)消費
continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);
} else {
continueConsume = false;
}
}
}
}
看代碼可以發(fā)現(xiàn),如果順序消息消費失敗的話,即消費返回SUSPEND_CURRENT_QUEUE_A_MONENT時,當(dāng)前線程會停止消費,在processConsumeResult時,會提交新的任務(wù)到線程池,在新的線程中繼續(xù)消費該消息。
核心邏輯是保證一個ProcessQueue只在一個線程中輪詢消費消息。
發(fā)送順序消息時會添加一個隊列選擇器,將需要有序的消息發(fā)送到同一個隊列。消費端拉取特定queue的數(shù)據(jù)時天生有序,在消費時使用同一個線程進(jìn)行消費,因此就實現(xiàn)了順序消息。
事務(wù)消息
二階段提交加補償機制
第一階段提交消息到broker,broker將topic修改為RMQ_SYS_TRANS_HALF_TOPIC,存入對consumer不可見的topic/queue。如果此階段寫入成功,執(zhí)行transactionListener.executeLocalTransaction()。
第二階段,根據(jù)本地事務(wù)的執(zhí)行結(jié)果提交或者回滾第一階段提交至broker的消息,這里使用的是OneWay方法,可靠性低,可能出現(xiàn)失敗或者超時的情況。
broker端處理RequestCode.END_TRANSACTION的請求,如果是commit,則將原來的消息取出,更改為正確的topic/queue,并進(jìn)行落盤,然后添加Op狀態(tài)。如果是rollback,則直接添加Op狀態(tài)即可。
添加Op狀態(tài)是將消息添加到Op隊列中,Op隊列是為了補償邏輯時減少判斷。
補償邏輯:
BrokerController啟動時會啟動TransactionMessageCheckService,默認(rèn)每隔60s檢查一次HALF_TOPIC下所有的queue中的消息,檢查步驟如下
- 先判斷當(dāng)前queue和對應(yīng)的opQueue是否添加過消息,如果沒有,遍歷下一個queue,若有,進(jìn)行下一步判斷
- 獲取對應(yīng)的opQueue中的消息,若是沒有消息,遍歷下一個queue,若有,進(jìn)行下一步判斷
- 遍歷當(dāng)前queue
- 如果當(dāng)前偏移量已經(jīng)添加了oP狀態(tài),直接遍歷至下一個偏移量,否則進(jìn)行下一步判斷
- 獲取當(dāng)前消息,若為null,遍歷下一個偏移量,若不為null,進(jìn)行下一步判斷
- 若當(dāng)前消息需要舍棄或者跳過,遍歷下一個偏移量,否則進(jìn)行下一步判斷
- 判斷當(dāng)前消息是否需要check,若暫時不需要,重新走判斷流程
- 若是需要check,broker端給producer發(fā)送
CHECK_TRANSACTION_STATE消息,producer端接收到消息后,執(zhí)行TransactionListener.checkLocalTransaction,將check結(jié)果回發(fā)給broker。