I. Background
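MQ is an indispensable tool in system architecture. At the design level it lowers coupling between systems, and in high-concurrency scenarios it smooths out traffic by shaving peaks and filling valleys. MQ has been adopted across monolithic applications, clustered deployments and today's microservice architectures, thanks to its strong performance and high reliability.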
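As data volume and system load grow, two kinds of failure start to appear:
- The database has been updated, but the message was never sent.
- The message was sent first, but the database update later failed.

Developers then have to repair the data by hand. Such production issues are rare, but they are frustrating. This is the consistency problem between database transactions and MQ messages: an ordinary MQ send cannot be bound directly into a database transaction. The two problematic orderings are:
- Send the MQ message after the database transaction commits;
- Send the MQ message first, then commit the database transaction.

In scenario 1, the server may crash right after the database transaction commits, so the MQ message is never sent.

In scenario 2, the MQ message has already gone out when the database transaction fails to commit. A message that has been sent cannot be recalled, so the data stays un-updated while downstream services have already received the message. The transaction ends up inconsistent.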
II. Why Transactional Messages Are Needed
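Consider a shopping scenario in a microservice architecture, based on the example in the official RocketMQ documentation. User A places an order and, after paying 100 yuan, receives 100 points. The account service and the membership service are two independent microservices, each with its own database. Given the problems described above, the following can happen:
- If the payment is deducted first and the message sent afterwards, the server may crash right after the deduction. The message is never sent and the points are never added.
- If the message is sent first and the payment deducted afterwards, the points may be added while the deduction fails. The user gets 100 points for free.
- The payment is deducted and the message is sent successfully, but the membership service instance fails while consuming it, so the points are never added.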
Figure: MQ communication in the shopping scenario
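This is the consistency problem between database transactions and MQ messages. RocketMQ transactional messages solve one specific part of it: they make executing the local transaction and sending the message atomic.

Be clear about the boundary. Transactional messages ensure that the producer side sends the message correctly, with no duplicates and no omissions. Whether the consumer then processes it properly is a separate matter. An example is the third case above, where the payment is deducted and the message is sent, but a downstream consumption failure leaves the points wrong. That case is covered by MQ's consumption retry mechanism and is out of scope here.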
The common MQ products each have their own solution for this scenario. ActiveMQ, for example, relies on a two-phase commit approach to make sure messages are sent correctly. This article focuses on RocketMQ.
III. Design of RocketMQ Transactional Messages
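Within the trade-offs of the CAP theorem, RocketMQ transactional messages guarantee eventual consistency through an asynchronous assurance approach. The flow borrows from two-phase commit, as shown in the following flowchart: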

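1. When the application module needs to send a transactional message, it first sends a prepare (half) message to MQ.
2. Once the prepare message has been sent successfully, the application module executes its database transaction (the local transaction).
3. Depending on the result of the database transaction, it returns Commit or Rollback to MQ.
4. On Commit, MQ delivers the message to the Consumer. On Rollback, the prepare message is simply deleted.
5. If step 3 gets no response or times out, a scheduled task checks back the transaction status. It retries at most 15 times, after which the message is discarded by default. The result is handled as in step 4.
6. Successful consumption is guaranteed by MQ itself.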
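The six steps above can be condensed into a small in-memory model. This is only a sketch under stated assumptions, not RocketMQ code. The class HalfMessageProtocol, its lists and the synchronous check loop are hypothetical; a real broker checks back periodically and asynchronously. Only the three possible answers and the default limit of 15 check-backs come from the text.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

/**
 * Hypothetical in-memory model of the transactional-message protocol described above (not RocketMQ code).
 */
class HalfMessageProtocol {
    enum TxState { COMMIT, ROLLBACK, UNKNOWN }

    /** Maximum number of check-backs before giving up (15 by default, per step 5). */
    static final int CHECK_MAX = 15;

    final List<String> halfMessages = new ArrayList<>(); // prepared, not visible to consumers
    final List<String> deliverable = new ArrayList<>();  // committed, visible to consumers
    final List<String> discarded = new ArrayList<>();    // still unknown after CHECK_MAX check-backs

    /** Steps 1-4: store the prepare message, run the local transaction, then commit or roll back. */
    void send(String msg, Supplier<TxState> localTransaction) {
        halfMessages.add(msg);                       // step 1: prepare message stored
        TxState state = localTransaction.get();      // step 2: local (database) transaction
        resolve(msg, state);                         // steps 3-4
    }

    /** Applies a final answer; returns false when the answer is still UNKNOWN. */
    private boolean resolve(String msg, TxState state) {
        switch (state) {
            case COMMIT:
                halfMessages.remove(msg);
                deliverable.add(msg);                // step 4: delivered to consumers
                return true;
            case ROLLBACK:
                halfMessages.remove(msg);            // step 4: prepare message dropped
                return true;
            default:
                return false;                        // left for the check-back task
        }
    }

    /** Step 5: ask the producer again, at most CHECK_MAX times; returns the number of check-backs used. */
    int checkBack(String msg, Function<String, TxState> checker) {
        for (int checks = 1; checks <= CHECK_MAX; checks++) {
            if (resolve(msg, checker.apply(msg))) {
                return checks;
            }
        }
        halfMessages.remove(msg);                    // still unknown: discarded by default
        discarded.add(msg);
        return CHECK_MAX;
    }
}
```

The model is synchronous only to keep it short. The point it illustrates is that an UNKNOWN answer is not a failure: the message stays parked and invisible until a later check-back produces a definite answer or the limit is hit.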
IV. Implementation Flow of RocketMQ Transactional Messages
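Taking RocketMQ 4.5.2 as an example, transactional messages have a dedicated queue, RMQ_SYS_TRANS_HALF_TOPIC, where every prepare message is stored first. When a Commit request arrives for a message, the broker writes the message into the real Topic for consumers. At the same time, it writes a record into RMQ_SYS_TRANS_OP_HALF_TOPIC. A simplified flowchart follows: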
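The storage side of this flow can be modeled with append-only queues, one per topic. The sketch is hypothetical; the class, method and property handling are my own. The two system topic names come from the text. Following the broker code in section V, "deleting" a half message is modeled as appending a record to RMQ_SYS_TRANS_OP_HALF_TOPIC rather than physically removing it, for both commit and rollback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical model of how the broker parks half messages (not RocketMQ code).
 */
class HalfTopicBroker {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String OP_HALF_TOPIC = "RMQ_SYS_TRANS_OP_HALF_TOPIC";

    /** Simplified message: current topic, body and string properties. */
    static class Msg {
        final String topic;
        final String body;
        final Map<String, String> props = new HashMap<>();
        Msg(String topic, String body) { this.topic = topic; this.body = body; }
    }

    /** Each topic is an append-only list of messages. */
    final Map<String, List<Msg>> topics = new HashMap<>();

    private void append(String topic, Msg m) {
        topics.computeIfAbsent(topic, t -> new ArrayList<>()).add(m);
    }

    List<Msg> queue(String topic) {
        return topics.getOrDefault(topic, new ArrayList<>());
    }

    /** Prepare: remember the real topic in a property and park the message in the half topic. */
    Msg prepare(String realTopic, String body) {
        Msg half = new Msg(HALF_TOPIC, body);
        half.props.put("REAL_TOPIC", realTopic);
        append(HALF_TOPIC, half);
        return half;
    }

    /** Commit: copy the message to its real topic, then record in the op topic that it is done. */
    void commit(Msg half) {
        Msg fin = new Msg(half.props.get("REAL_TOPIC"), half.body);
        append(fin.topic, fin);
        append(OP_HALF_TOPIC, new Msg(OP_HALF_TOPIC, "done:" + half.body));
    }

    /** Rollback: nothing reaches the real topic; only the op record is written. */
    void rollback(Msg half) {
        append(OP_HALF_TOPIC, new Msg(OP_HALF_TOPIC, "done:" + half.body));
    }
}
```

Because the half topic is never trimmed in this model, the only way to tell a finished half message from a pending one is to look for its op record. That is why the check-back task in section V compares the two topics.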

In the flow above, allow me to divide module responsibilities as follows:
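- The RocketMQ Client is the dependency jar imported into our project, and the RocketMQ Broker is the deployed server. The NameServer is not shown.
- Application modules come in pairs: upstream is the transactional message producer and downstream the consumer. Transactional messages are transparent to the consumer, which handles them like ordinary messages.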
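The application module's transaction may be unable to respond immediately, because of an interruption or a network problem. RocketMQ treats such a result as UNKNOW. As a remedy, RocketMQ periodically checks back the database transaction status of the transactional message.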
A simplified flowchart follows:

V. Source Code Analysis
The walkthrough broadly follows the flowchart below, going through each module's responsibilities and each step of the flow in turn.
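Environment preparation
Before reading the source, you need to fetch and debug the RocketMQ source in your IDE; please look up how to set this up yourself.
- Application module (transactional message producer) core code
Create a listener class that implements the TransactionListener interface. In the methods that execute the local (database) transaction and check back the transaction status, simulate the results.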
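```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

/**
 * @program: rocket
 * @description: sample code for debugging transactional messages
 * @author: Huang
 * @create: 2019-10-16
 **/
public class SelfTransactionListener implements TransactionListener {
    private final AtomicInteger transactionIndex = new AtomicInteger(0);
    private final AtomicInteger checkTimes = new AtomicInteger(0);
    private final ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();

    /**
     * Executes the local transaction.
     *
     * @param message the prepare (half) message that was just sent
     * @param o       the argument passed to sendMessageInTransaction
     * @return the local transaction state reported to the broker
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        String msgKey = message.getKeys();
        System.out.println("start execute local transaction " + msgKey);
        LocalTransactionState state;
        if (msgKey.contains("1")) {
            // Message 1: let it through
            state = LocalTransactionState.COMMIT_MESSAGE;
        } else if (msgKey.contains("2")) {
            // Message 2: simulate a failure and explicitly roll back
            state = LocalTransactionState.ROLLBACK_MESSAGE;
        } else {
            // Messages 3-5: no definite answer, so the broker will call the check-back method
            state = LocalTransactionState.UNKNOW;
            // Assign states 1, 2 and 3 to the remaining three messages
            localTrans.put(msgKey, transactionIndex.incrementAndGet());
        }
        System.out.println("executeLocalTransaction:" + message.getKeys() + ",execute state:" + state + ",current time:" + System.currentTimeMillis());
        return state;
    }

    /**
     * Checks back the local transaction result.
     *
     * @param messageExt the half message the broker is asking about
     * @return the local transaction state reported to the broker
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        String msgKey = messageExt.getKeys();
        System.out.println("start check local transaction " + msgKey);
        Integer state = localTrans.get(msgKey);
        if (state == null) {
            // No record (e.g. the producer restarted and this in-memory map was lost): stay undecided
            // instead of throwing a NullPointerException when the switch unboxes null
            return LocalTransactionState.UNKNOW;
        }
        switch (state) {
            case 1:
                System.out.println("check result unknown, check count " + checkTimes.incrementAndGet());
                return LocalTransactionState.UNKNOW;
            case 2:
                System.out.println("check result commit message, check count " + checkTimes.incrementAndGet());
                return LocalTransactionState.COMMIT_MESSAGE;
            case 3:
                System.out.println("check result rollback message, check count " + checkTimes.incrementAndGet());
                return LocalTransactionState.ROLLBACK_MESSAGE;
            default:
                return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}
```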
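The transactional message producer sends five messages in total, covering essentially every scenario. The sleep is long enough that the instance is still running when the broker checks back the transaction status. The code is as follows: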
```java
import java.nio.charset.StandardCharsets;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

/**
 * @program: rocket
 * @description: RocketMQ transactional message producer
 * @author: Huang
 * @create: 2019-10-16
 **/
public class TransactionProducer {
    public static void main(String[] args) {
        try {
            TransactionMQProducer producer = new TransactionMQProducer("transactionMQProducer");
            producer.setNamesrvAddr("10.0.133.29:9876");
            producer.setTransactionListener(new SelfTransactionListener());
            producer.start();
            for (int i = 1; i < 6; i++) {
                // topic, tag, keys ("msg-1" ... "msg-5"), body
                Message message = new Message("TransactionTopic", "transactionTest", "msg-" + i,
                        ("Hello" + ":" + i).getBytes(StandardCharsets.UTF_8));
                try {
                    SendResult result = producer.sendMessageInTransaction(message, "Hello" + ":" + i);
                    System.out.printf("Topic:%s send success, msgId is:%s%n", message.getTopic(), result.getMsgId());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            // Keep the process alive so the broker can still reach it for check-backs
            Thread.sleep(Integer.MAX_VALUE);
            producer.shutdown();
        } catch (MQClientException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
```
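Before running the demo, it helps to predict what it should print. The class below is a hypothetical, RocketMQ-free replay of SelfTransactionListener's branching for keys msg-1 to msg-N; it does not talk to a broker. For the five messages sent above, it predicts the following:
- msg-1 commits on send.
- msg-2 rolls back on send.
- msg-3 keeps answering UNKNOW at every check-back, until the broker's limit is reached (15 by default, per section III).
- msg-4 commits at check-back.
- msg-5 rolls back at check-back.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical replay of SelfTransactionListener's decisions (no RocketMQ involved).
 */
class DemoOutcome {
    enum Outcome { COMMIT_ON_SEND, ROLLBACK_ON_SEND, UNKNOWN_ON_CHECK, COMMIT_ON_CHECK, ROLLBACK_ON_CHECK }

    static Map<String, Outcome> expected(int count) {
        Map<String, Outcome> result = new LinkedHashMap<>();
        int transactionIndex = 0; // same role as the listener's AtomicInteger
        for (int i = 1; i <= count; i++) {
            String key = "msg-" + i;
            if (key.contains("1")) {
                result.put(key, Outcome.COMMIT_ON_SEND);     // executeLocalTransaction commits
            } else if (key.contains("2")) {
                result.put(key, Outcome.ROLLBACK_ON_SEND);   // executeLocalTransaction rolls back
            } else {
                transactionIndex++;                          // UNKNOW now, decided at check-back
                switch (transactionIndex) {
                    case 1: result.put(key, Outcome.UNKNOWN_ON_CHECK); break;  // checked again and again
                    case 3: result.put(key, Outcome.ROLLBACK_ON_CHECK); break;
                    default: result.put(key, Outcome.COMMIT_ON_CHECK); break;  // case 2 and default commit
                }
            }
        }
        return result;
    }
}
```

The demo's `contains` checks only work for its five messages. With more than nine messages the keys start to collide (msg-10 and msg-12 also contain "1"), so a real listener should look the key up exactly.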
- RocketMQ Client code. Its main logic falls into three parts. Part 1: mark the message as a prepare message and send it to the RocketMQ server:
```java
SendResult sendResult = null;
// Mark the message as a transactional prepare (half) message
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
// Record the producer group; the broker later uses it to find a producer to check back with
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
    sendResult = this.send(msg);
} catch (Exception e) {
    throw new MQClientException("send message Exception", e);
}
```
Part 2: once the message has been sent successfully, call the application module's local (database) transaction method to get the transaction result (code abridged for brevity):
```java
switch (sendResult.getSendStatus()) {
    case SEND_OK: {
        try {
            if (null != localTransactionExecuter) {
                localTransactionState = localTransactionExecuter.executeLocalTransactionBranch(msg, arg);
            } else if (transactionListener != null) {
                log.debug("Used new transaction API");
                localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
            }
            if (null == localTransactionState) {
                localTransactionState = LocalTransactionState.UNKNOW;
            }
        } catch (Throwable e) {
            log.info("executeLocalTransactionBranch exception", e);
            log.info(msg.toString());
            localException = e;
        }
    }
    break;
    case FLUSH_DISK_TIMEOUT:
    case FLUSH_SLAVE_TIMEOUT:
    case SLAVE_NOT_AVAILABLE:
        localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
        break;
    default:
        break;
}
```
Part 3: send the transaction result to RocketMQ to end the transaction, then return the result to the application module:
```java
try {
    this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
    log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
```
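Part 3 hands the local result to endTransaction, which must turn it into the commitOrRollback value that the broker-side code below inspects. As I understand the 4.x source, the mapping is:
- COMMIT_MESSAGE maps to TRANSACTION_COMMIT_TYPE.
- ROLLBACK_MESSAGE maps to TRANSACTION_ROLLBACK_TYPE.
- UNKNOW maps to TRANSACTION_NOT_TYPE, which the broker does not act on, so the half message waits for a check-back.

The constants below are local copies of the transaction bits in MessageSysFlag, reproduced from memory; please check them against your version. The class name EndTransactionFlags and its nested enum are hypothetical stand-ins.

```java
/**
 * Hypothetical local copy of the transaction bits of MessageSysFlag (values as I understand
 * the 4.x source; verify against your version) plus the state-to-flag mapping done in endTransaction.
 */
class EndTransactionFlags {
    static final int TRANSACTION_NOT_TYPE = 0;
    static final int TRANSACTION_PREPARED_TYPE = 0x1 << 2;   // 4
    static final int TRANSACTION_COMMIT_TYPE = 0x2 << 2;     // 8
    static final int TRANSACTION_ROLLBACK_TYPE = 0x3 << 2;   // 12, also the mask of both bits

    /** Stand-in for org.apache.rocketmq.client.producer.LocalTransactionState. */
    enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    /** Value sent as commitOrRollback in the end-transaction request. */
    static int commitOrRollback(LocalTransactionState state) {
        switch (state) {
            case COMMIT_MESSAGE:
                return TRANSACTION_COMMIT_TYPE;
            case ROLLBACK_MESSAGE:
                return TRANSACTION_ROLLBACK_TYPE;
            default:
                return TRANSACTION_NOT_TYPE; // UNKNOW: broker keeps the half message for check-back
        }
    }

    /** Extracts the two transaction bits from a sysFlag. */
    static int getTransactionValue(int sysFlag) {
        return sysFlag & TRANSACTION_ROLLBACK_TYPE;
    }

    /** Replaces the transaction bits while keeping every other bit of sysFlag. */
    static int resetTransactionValue(int sysFlag, int type) {
        return (sysFlag & ~TRANSACTION_ROLLBACK_TYPE) | type;
    }
}
```

The same bit layout explains sysFlag=8 in the log of section VI: 8 & 12 == 8, which is a commit-type message.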
- RocketMQ Broker: transaction commit/rollback (the endTransaction part)
Entry point: org.apache.rocketmq.broker.processor.EndTransactionProcessor
```java
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Change the message's Topic from RMQ_SYS_TRANS_HALF_TOPIC back to the real Topic
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // Store the message in the real Topic for consumers
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // Write a record to RMQ_SYS_TRANS_OP_HALF_TOPIC marking the half message as deleted;
                // the check-back scheduled task takes this record into account
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
} else if (MessageSysFlag.TRANSACTION_ROLLBACK_TYPE == requestHeader.getCommitOrRollback()) {
    result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Rollback: nothing goes to the real Topic; only mark the half message as deleted
            this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
        }
        return res;
    }
}
```
- RocketMQ Broker: scheduled task that checks back the database transaction
Entry point: org.apache.rocketmq.broker.transaction.TransactionalMessageCheckService
```java
@Override
protected void onWaitEnd() {
    long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
    // Maximum number of check-backs; after 15 failed check-backs (the default) the message is discarded
    int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
    long begin = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", begin);
    this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
    log.info("End to check prepare message, consumed time:{}", System.currentTimeMillis() - begin);
}
```
Check-back invocation entry:
```java
// This snippet is from the check method of TransactionalMessageServiceImpl
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
        || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    // Calls resolveHalfMsg of AbstractTransactionalMessageCheckListener
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
            messageQueue, pullResult);
    continue;
}
```
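The three-way isNeedCheck condition is easier to read as a pure function. The sketch below is my own restatement; the class name and parameter layout are hypothetical. It reproduces only the boolean logic of the excerpt, with the newest op message reduced to its born timestamp (null when no op message was found). All timestamps are in milliseconds, and the numbers in the checks are made up.

```java
/**
 * Hypothetical restatement of the isNeedCheck expression from TransactionalMessageServiceImpl.check.
 */
class NeedCheck {
    static boolean isNeedCheck(Long lastOpBornTimestamp,    // born time of the newest op message, null if none
                               long startTime,               // when this check round started
                               long valueOfCurrentMinusBorn, // now minus the half message's born time
                               long checkImmunityTime,
                               long transactionTimeout) {
        // 1) no op message found, and the half message is older than checkImmunityTime
        boolean noOpAndOldEnough = lastOpBornTimestamp == null
                && valueOfCurrentMinusBorn > checkImmunityTime;
        // 2) the newest op message was born more than transactionTimeout after this round started
        boolean opNewerThanRoundPlusTimeout = lastOpBornTimestamp != null
                && lastOpBornTimestamp - startTime > transactionTimeout;
        // 3) the half message's born time lies in the future (current time minus born time <= -1)
        boolean bornInFuture = valueOfCurrentMinusBorn <= -1;
        return noOpAndOldEnough || opNewerThanRoundPlusTimeout || bornInFuture;
    }
}
```

When the function returns false, the excerpt's else branch pulls more op messages (fillOpRemoveMap) and tries again, rather than checking back immediately.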
```java
// This method is in the AbstractTransactionalMessageCheckListener class
public void resolveHalfMsg(final MessageExt msgExt) {
    // Send the check-back request asynchronously on the listener's thread pool
    executorService.execute(new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    });
}
```
```java
// This method is in the AbstractTransactionalMessageCheckListener class
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    CheckTransactionStateRequestHeader checkTransactionStateRequestHeader = new CheckTransactionStateRequestHeader();
    checkTransactionStateRequestHeader.setCommitLogOffset(msgExt.getCommitLogOffset());
    checkTransactionStateRequestHeader.setOffsetMsgId(msgExt.getMsgId());
    checkTransactionStateRequestHeader.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    checkTransactionStateRequestHeader.setTransactionId(checkTransactionStateRequestHeader.getMsgId());
    checkTransactionStateRequestHeader.setTranStateTableOffset(msgExt.getQueueOffset());
    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);
    String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel != null) {
        // Send the request to the RocketMQ Client over Netty, which runs its checkTransactionState method
        brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, checkTransactionStateRequestHeader, msgExt);
    } else {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
    }
}
```
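After receiving the broker's request, the RocketMQ Client calls the check-back method again and submits the transaction result to the RocketMQ Broker once more.
Entry point: the checkTransactionState method of org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl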
```java
try {
    if (transactionCheckListener != null) {
        localTransactionState = transactionCheckListener.checkLocalTransactionState(message);
    } else if (transactionListener != null) {
        log.debug("Used new check API in transaction message");
        localTransactionState = transactionListener.checkLocalTransaction(message);
    } else {
        log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group);
    }
} catch (Throwable e) {
    log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e);
    exception = e;
}
this.processTransactionState(
        localTransactionState,
        group,
        exception);
```
VI. One More Issue
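The official documentation mentions that transactional messages support neither delayed messages nor batch messages. I tried a delayed message anyway, setting a DelayTimeLevel on a transactional message. That message could then never be removed from RMQ_SYS_TRANS_HALF_TOPIC. The application module's logs showed repeated check-back attempts, and the message list for RMQ_SYS_TRANS_HALF_TOPIC in the Console quickly grew past 2,000 records. Why?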
Let's analyze it at the code level:
1. With DelayTimeLevel set, the message is written to the target Topic after the database transaction commits (or after a check-back completes). DelayTimeLevel interferes at this point: the target Topic becomes SCHEDULE_TOPIC_XXXX and REAL_TOPIC becomes RMQ_SYS_TRANS_HALF_TOPIC. The real Topic is lost at this step.
```java
// Broker-side handling after a transaction commit is received
// Class: org.apache.rocketmq.broker.processor.EndTransactionProcessor
OperationResult result = new OperationResult();
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE == requestHeader.getCommitOrRollback()) {
    // Fetch the prepare (half) message referenced by the request
    result = this.brokerController.getTransactionalMessageService().commitMessage(requestHeader);
    if (result.getResponseCode() == ResponseCode.SUCCESS) {
        RemotingCommand res = checkPrepareMessage(result.getPrepareMessage(), requestHeader);
        if (res.getCode() == ResponseCode.SUCCESS) {
            // Change the message's Topic from RMQ_SYS_TRANS_HALF_TOPIC to the "real" Topic (REAL_TOPIC)
            MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
            msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), requestHeader.getCommitOrRollback()));
            msgInner.setQueueOffset(requestHeader.getTranStateTableOffset());
            msgInner.setPreparedTransactionOffset(requestHeader.getCommitLogOffset());
            msgInner.setStoreTimestamp(result.getPrepareMessage().getStoreTimestamp());
            // Store into the real Topic: sendFinalMessage goes through CommitLog.putMessage,
            // where the Topic is switched to SCHEDULE_TOPIC_XXXX
            RemotingCommand sendResult = sendFinalMessage(msgInner);
            if (sendResult.getCode() == ResponseCode.SUCCESS) {
                // Write a record to RMQ_SYS_TRANS_OP_HALF_TOPIC marking the half message as deleted;
                // the check-back scheduled task takes this record into account
                this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
            }
            return sendResult;
        }
        return res;
    }
}
```
```java
// This snippet is in the putMessage method of org.apache.rocketmq.store.CommitLog
// Because of DelayTimeLevel, the target Topic becomes SCHEDULE_TOPIC_XXXX
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
    // Delay Delivery
    if (msg.getDelayTimeLevel() > 0) {
        if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
            msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
        }
        topic = ScheduleMessageService.SCHEDULE_TOPIC;
        queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
        // Backup real topic, queueId: whatever the current Topic is, it overwrites REAL_TOPIC
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
        MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
        msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
        msg.setTopic(topic);
        msg.setQueueId(queueId);
    }
}
```
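The delay branch above can be reduced to a few lines. This hypothetical model is my own; DelayRedirect and its Msg class are not RocketMQ types. It assumes, as the sendCheckMessage excerpt suggests, that a half message carries its original topic in REAL_TOPIC while its current topic is RMQ_SYS_TRANS_HALF_TOPIC. Passing such a message through the delay branch overwrites REAL_TOPIC with the half topic, which matches the state visible in the log that follows.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the delay branch of CommitLog.putMessage (not RocketMQ code).
 */
class DelayRedirect {
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";
    static final String SCHEDULE_TOPIC = "SCHEDULE_TOPIC_XXXX";

    /** Simplified message: current topic, delay level and string properties. */
    static class Msg {
        String topic;
        int delayTimeLevel;
        final Map<String, String> props = new HashMap<>();
    }

    /**
     * Same idea as the branch above for NOT_TYPE/COMMIT_TYPE messages: back up the current topic
     * in REAL_TOPIC, then switch to the schedule topic. Any earlier REAL_TOPIC value is overwritten.
     */
    static void putMessage(Msg msg) {
        if (msg.delayTimeLevel > 0) {
            msg.props.put("REAL_TOPIC", msg.topic);
            msg.topic = SCHEDULE_TOPIC;
        }
    }
}
```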
Sample log output:
```text
2019-10-17 14:41:05 INFO EndTransactionThread_4 - Transaction op message write successfully. messageId=0A00851D00002A9F0000000000000E09, queueId=0
msgExt:MessageExt [queueId=0, storeSize=335, queueOffset=5, sysFlag=8, bornTimestamp=1571293959305, bornHost=/10.0.133.29:54634, storeTimestamp=1571294460555,
storeHost=/10.0.133.29:10911, msgId=0A00851D00002A9F0000000000000E09, commitLogOffset=3593, bodyCRC=1849408413, reconsumeTimes=0, preparedTransactionOffset=0,
toString()=Message{topic='SCHEDULE_TOPIC_XXXX', flag=0, properties={REAL_TOPIC=RMQ_SYS_TRANS_HALF_TOPIC, TRANSACTION_CHECK_TIMES=3, KEYS=msg-test-3,
TRAN_MSG=true, UNIQ_KEY=0A00851D422C18B4AAC25584B0880000, WAIT=false, DELAY=1, PGROUP=transactionMQProducer, TAGS=transactionTest, REAL_QID=0},
body=[72, 101, 108, 108, 111, 84, 105, 109, 101, 58, 51], transactionId='null'}]
```
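2. Delayed messages are triggered by a scheduled task; the delay I set was 1 second. When it fires, the scheduled task puts the message back into RMQ_SYS_TRANS_HALF_TOPIC. Note that at this point only RMQ_SYS_TRANS_HALF_TOPIC contains the message; RMQ_SYS_TRANS_OP_HALF_TOPIC has no record of it. The code: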
```java
// This snippet is in the executeOnTimeup method of org.apache.rocketmq.store.schedule.ScheduleMessageService
try {
    // messageTimeup restores the Topic from REAL_TOPIC, so the message goes back into RMQ_SYS_TRANS_HALF_TOPIC
    MessageExtBrokerInner msgInner = this.messageTimeup(msgExt);
    PutMessageResult putMessageResult =
            ScheduleMessageService.this.writeMessageStore
                    .putMessage(msgInner);
    if (putMessageResult != null
            && putMessageResult.getPutMessageStatus() == PutMessageStatus.PUT_OK) {
        continue;
    } else {
        log.error(
                "ScheduleMessageService, a message time up, but reput it failed, topic: {} msgId {}",
                msgExt.getTopic(), msgExt.getMsgId());
        ScheduleMessageService.this.timer.schedule(
                new DeliverDelayedMessageTimerTask(this.delayLevel,
                        nextOffset), DELAY_FOR_A_PERIOD);
        ScheduleMessageService.this.updateOffset(this.delayLevel,
                nextOffset);
        return;
    }
} catch (Exception e) {
    log.error(
            "ScheduleMessageService, messageTimeup execute error, drop it. msgExt="
                    + msgExt + ", nextOffset=" + nextOffset + ",offsetPy="
                    + offsetPy + ",sizePy=" + sizePy, e);
}
```
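3. The transactional message check task starts and finds the message in RMQ_SYS_TRANS_HALF_TOPIC, with no matching record in RMQ_SYS_TRANS_OP_HALF_TOPIC. To keep messages written in order, it writes the message back into RMQ_SYS_TRANS_HALF_TOPIC (putBackHalfMsgQueue) and triggers another check-back. The code is the same as the check-back invocation entry shown earlier: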
```java
// This snippet is from the check method of TransactionalMessageServiceImpl
List<MessageExt> opMsg = pullResult.getMsgFoundList();
boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
        || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
        || (valueOfCurrentMinusBorn <= -1);
if (isNeedCheck) {
    if (!putBackHalfMsgQueue(msgExt, i)) {
        continue;
    }
    listener.resolveHalfMsg(msgExt);
} else {
    pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
    log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
            messageQueue, pullResult);
    continue;
}
```
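Together these steps form a loop that only ends when the message is discarded after 15 attempts, the default maximum. That is a rather high price. A PR addressing this has been submitted to the RocketMQ community. Once a new version is released, transactional messages will ignore DelayTimeLevel and the problem will no longer occur.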
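A toy model of steps 1-3 shows why answering COMMIT does not end the loop. It is hypothetical; the names and structure are my own. It assumes the producer answers COMMIT every time, and that the check limit is compared before the counter is incremented. It only counts check-backs against the limit and makes no attempt to reproduce the number of records the Console shows, which depends on broker scheduling details.

```java
/**
 * Hypothetical model of the delay-level loop described in steps 1-3 (not RocketMQ code).
 */
class DelayCheckLoop {
    static final class Result {
        final int checks;         // check-back requests sent to the producer
        final boolean delivered;  // true if the message reached its real topic
        Result(int checks, boolean delivered) { this.checks = checks; this.delivered = delivered; }
    }

    /** Every answer from the producer is COMMIT; only the delay level decides where the commit lands. */
    static Result run(int delayTimeLevel, int checkMax) {
        int checkTimes = 0; // plays the role of the TRANSACTION_CHECK_TIMES property
        // First commit (from executeLocalTransaction): with a delay level, steps 1-2 bring the
        // message back into RMQ_SYS_TRANS_HALF_TOPIC instead of the real topic.
        boolean backInHalfTopic = delayTimeLevel > 0;
        while (backInHalfTopic) {
            if (checkTimes >= checkMax) {
                return new Result(checkTimes, false); // limit reached: the broker discards it
            }
            checkTimes++;                              // step 3: another check-back, answered COMMIT
            backInHalfTopic = delayTimeLevel > 0;      // the same redirect happens on every commit
        }
        return new Result(checkTimes, true);
    }
}
```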
Until the new version is released, our workarounds are:
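- Explicitly forbid setting DelayTimeLevel on transactional messages during development.
  This feels risky, though: a newcomer who is not familiar with this feature may add it by accident, as I did at first.
- Wrap the RocketMQ Client, for example in a rocketmq-spring-boot-starter, and give the transactional send method no way to set it, as in the example below: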
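```java
/**
 * Sends a transactional message.
 * Delayed sending and batch sending are not supported.
 */
public void sendMessageInTransaction(String topic, String tag, Object message, String requestId) throws Exception {
    // annotationScan and MessageBuilder are helpers of the wrapper itself, not RocketMQ client classes
    TransactionMQProducer producer = annotationScan.getProducer(topic + "_" + tag);
    // No delay-level parameter is exposed, so callers cannot set DelayTimeLevel
    producer.sendMessageInTransaction(MessageBuilder.of(topic, tag, message, requestId).build(), message);
}
```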
This should be more reliable, since it rules out setting DelayTimeLevel at the source.
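If the wrapper has to accept messages built by callers, a defensive check is a possible complement to not exposing the parameter. The sketch below is hypothetical. OutgoingMessage stands in for a real message object, and TransactionalSendGuard is not part of any library. It assumes the delay level travels in a property named DELAY, the key visible in the log above (DELAY=1).

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical guard for an in-house wrapper (not part of the RocketMQ client):
 * refuse transactional sends that carry a delay level instead of relying on developer discipline.
 */
class TransactionalSendGuard {
    /** Minimal stand-in for a message; only the topic and properties matter here. */
    static class OutgoingMessage {
        final String topic;
        final Map<String, String> properties = new HashMap<>();
        OutgoingMessage(String topic) { this.topic = topic; }
    }

    /** Property key assumed to hold the delay level (as seen in the log: DELAY=1). */
    static final String DELAY_PROPERTY = "DELAY";

    /** Throws if the message would be delayed; call it before handing the message to the producer. */
    static void checkTransactional(OutgoingMessage msg) {
        String delay = msg.properties.get(DELAY_PROPERTY);
        if (delay != null && Integer.parseInt(delay) > 0) {
            throw new IllegalArgumentException("transactional message to " + msg.topic
                    + " must not set a delay level (DELAY=" + delay + ")");
        }
    }
}
```

Failing fast at send time turns a silent check-back loop on the broker into an immediate, visible error on the producer side.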
VII. Closing Remarks
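This article briefly introduced the scenarios transactional messages address, where their responsibility ends, and their basic design and flow, borrowing diagrams from the RocketMQ authors. It then walked through selected parts of the source code, and it also records the pitfalls I ran into along the way. If anything in the article is incorrect or incomplete, please leave a comment. Thank you.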
Focused on Java high concurrency and distributed architecture. For more technical articles and insights, follow the WeChat official account: Java架構社區

