目錄
一、問題思考
二、事務消息客戶端發(fā)送流程
1.事務發(fā)送與普通啟動差異
2.事務消息發(fā)送調(diào)用鏈
3.事務消息發(fā)送分析
4.事務消息發(fā)送結(jié)果分析
5.結(jié)束事務分析
三、事務消息服務端存儲流程
1.事務消息存儲調(diào)用鏈
2.事務半消息存儲代碼分析(一)
3.事務半消息存儲代碼分析(二)
四、事務消息服務端響應結(jié)束事務請求
1.處理未知類型請求
2.處理事務提交請求
3.處理事務回滾請求
五、事務消息服務端狀態(tài)回查
1.事務回查線程類調(diào)用鏈
2.事務回查邏輯
3.客戶端響應事務回查
六、事務消息交互示意圖
一、問題思考
從官方給的例子入手,代碼如下:
示例類:org.apache.rocketmq.example.transaction.TransactionProducer.java
public static void main(String[] args) throws MQClientException, InterruptedException {
//@1 定義TransactionListener
TransactionListener transactionListener = new TransactionListenerImpl();
//@2 使用事務發(fā)送Producer
TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");
//@3 定義線程池
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
//設(shè)置線程池
producer.setExecutorService(executorService);
//設(shè)置監(jiān)聽器
producer.setTransactionListener(transactionListener);
producer.setNamesrvAddr("127.0.0.1:9876");
//@4 發(fā)送者啟動
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message("TopicTest1234", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//@5 消息發(fā)送
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
//發(fā)送者關(guān)閉
producer.shutdown();
}
從上面客戶端例子中思考一些問題:
1. @1定義TransactionListener做什么用?
2. @2定義的TransactionMQProducer與普通Produer區(qū)別在哪里?
3. @3定義線程池executorService又是干啥的?
4. @4事務發(fā)送者啟動發(fā)送流程是怎么樣?
5. 發(fā)送事務消息如何和Broker進行交互的?
二、事務消息客戶端發(fā)送流程
1.事務發(fā)送與普通啟動差異
@1 producer.start();
@2 TransactionMQProducer#start
this.defaultMQProducerImpl.initTransactionEnv();
super.start();
@3 DefaultMQProducerImpl#initTransactionEnv()
this.checkExecutor = producer.getExecutorService();
小結(jié):事務發(fā)送時比普通發(fā)送啟動多了initTransactionEnv操作,即:給ExecutorService checkExecutor賦值。
2.事務消息發(fā)送調(diào)用鏈
@1 SendResult sendResult = producer.sendMessageInTransaction
@2 TransactionMQProducer#sendMessageInTransaction
@3 DefaultMQProducerImpl#sendMessageInTransaction
3.事務消息發(fā)送分析
方法:DefaultMQProducerImpl#sendMessageInTransaction
//@1
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//@2 表示消息的prepare消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true");
//@3 生產(chǎn)者組,用于回查本地事務事,從生產(chǎn)者組中選擇隨機選擇一個生產(chǎn)者即可
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultMQProducer.getProducerGroup());
try {
//@4 消息發(fā)送
sendResult = this.send(msg);
} catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
方法:DefaultMQProducerImpl#sendKernelImpl
//事務消息發(fā)送,設(shè)置PREPARED標記
final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
}
//@5 請求header中設(shè)置事務標記
requestHeader.setSysFlag(sysFlag);
//@6 發(fā)送消息請求的RequestCode
request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
小結(jié):
@1獲取TransactionListener即示例代碼傳入的Listener
@2在消息屬性中加入PROPERTY_TRANSACTION_PREPARED = "TRAN_MSG"即事務半消息
@3設(shè)置ProducerGroup Broker在事務回查時調(diào)用
@4事務消息發(fā)送采用同步發(fā)送,發(fā)送流程與普通消息發(fā)送一致
@5請求header中設(shè)置事務標記SEND_MESSAGE = 10
4.事務消息發(fā)送結(jié)果分析
方法:DefaultMQProducerImpl#sendMessageInTransaction
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
switch (sendResult.getSendStatus()) {
//@1
case SEND_OK: {
...
//@2 執(zhí)行本地事務
localTransactionState = transactionListener.executeLocalTransaction(msg, arg);
...
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
小結(jié):
@1 發(fā)送半消息(Prepared)消息成功,設(shè)置transactionId。
@2 發(fā)送半消息成功后,通過transactionListener回調(diào)客戶端查詢本地事務執(zhí)行情況,并返回事務執(zhí)行狀態(tài)。
LocalTransactionState有COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW三種狀態(tài)。
5.結(jié)束事務分析
方法:DefaultMQProducerImpl#sendMessageInTransaction
try {
//@1 結(jié)束事務,根據(jù)返回的事務狀態(tài)執(zhí)行提交、回滾、暫時不處理
this.endTransaction(sendResult, localTransactionState, localException);
} catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", e);
}
TransactionSendResult transactionSendResult = new TransactionSendResult();
...
return transactionSendResult;
方法:DefaultMQProducerImpl#endTransaction
...
switch (localTransactionState) {
//@2 設(shè)置事務提交標記Header
case COMMIT_MESSAGE: requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE);
break;
//@3 設(shè)置事務回滾標記Header
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE);
break;
//@4 設(shè)置事務未知標記Header
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE);
break;
default:
break;
}
//@5 通過一次發(fā)送方式向Broker提交事務 this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark,
this.defaultMQProducer.getSendMsgTimeout());
}
小結(jié):
@1 根據(jù)本地事務執(zhí)行返回的狀態(tài)localTransactionState,調(diào)用結(jié)束事務方法
@2 requestHeader設(shè)置事務提交標記0x2 << 2=8
@3 requestHeader設(shè)置事務回滾標記0x3 << 2=12
@4 requestHeader設(shè)置未知標記0
@5 通過一次發(fā)送方式向Broker提交事務 RequestCode為END_TRANSACTION = 37
三、事務消息服務端存儲流程
1.事務消息存儲調(diào)用鏈
@1 SendMessageProcessor#processRequest
response = this.sendMessage(ctx, request, mqtraceContext, requestHeader)
@2 SendMessageProcessor#sendMessage
2.事務半消息存儲代碼分析(一)
方法:SendMessageProcessor#sendMessage
//@1 可以通過配置來是否接受事務消息存儲
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()){
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return response;
}
//@2 prepare消息存儲
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
小結(jié):
@1 可以通過Broker配置屬性rejectTransactionMessage來決定是否接受事務消息請求,默認為false即接受。
@2 半消息存儲
3.事務半消息存儲代碼分析(二)
方法:TransactionalMessageBridge#putHalfMessage
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner){
return store.putMessage(parseHalfMessageInner(messageInner));
}
方法:TransactionalMessageBridge#parseHalfMessageInner
//@1 備份原主題
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getTopic());
//@2 備份原queueID
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
//@3 重置sysFlag
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANSACTION_NOT_TYPE));
//@3 主題變更 RMQ_SYS_TRANS_HALF_TOPIC
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
//@4 消息隊列變更為0
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
小結(jié):半消息在存儲前將存儲的主題設(shè)置為RMQ_SYS_TRANS_HALF_TOPIC,將原來的Topic備份到屬性中,同時也備份了原來的QueueId。這也是為什么半消息不會被消費者消費的原因。
四、事務消息服務端響應結(jié)束事務請求
1.處理未知類型請求
方法:EndTransactionProcessor#processRequest
case MessageSysFlag.TRANSACTION_NOT_TYPE: {
LOGGER.warn("The producer[{}] end transaction in sending message, and it's pending status."
+ "RequestHeader: {} Remark: {}",
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
requestHeader.toString(),
request.getRemark());
//@1
return null;
}
小結(jié):結(jié)束事務在處理未知類型TRANSACTION_NOT_TYPE時,只打印告警日志不做處理。
2.處理事務提交請求
半消息查找。方法:EndTransactionProcessor#processRequest
if (MessageSysFlag.TRANSACTION_COMMIT_TYPE
== requestHeader.getCommitOrRollback()){
//@1 將prepare消息找出來
result = this.brokerController.getTransactionalMessageService()
.commitMessage(requestHeader);
...
//@2
MessageExtBrokerInner msgInner = endMessageTransaction(result.getPrepareMessage());
}
半消息還原。方法:EndTransactionProcessor#endMessageTransaction
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
//@3 置換為原來的Topic
msgInner.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
//@4 置換為原來的QueueId
msgInner.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
...
//清除屬性設(shè)置
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC);
MessageAccessor.clearProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID);
return msgInner;
小結(jié):
@1 根據(jù)偏移量將半消息查找出來
@2 將存儲在RMQ_SYS_TRANS_HALF_TOPIC還原
@3 置換為原來的Topic
@4 置換為原來的QueueId
還原后消息存儲。方法:EndTransactionProcessor#processRequest
//@1 新組裝的消息存儲(提交)
RemotingCommand sendResult = sendFinalMessage(msgInner);
if (sendResult.getCode() == ResponseCode.SUCCESS) {
//@2 刪除prepare消息 是將消息存儲于RMQ_SYS_TRANS_OP_HALF_TOPIC中
this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
小結(jié):
@1 將還原后的消息存儲
@2 刪除半消息消息
半消息刪除。方法:TransactionalMessageServiceImpl#putOpMessage
public boolean putOpMessage(MessageExt messageExt, String opType) {
MessageQueue messageQueue = new MessageQueue(messageExt.getTopic(),
this.brokerController.getBrokerConfig().getBrokerName(), messageExt.getQueueId());
if (TransactionalMessageUtil.REMOVETAG.equals(opType)) {
return addRemoveTagInTransactionOp(messageExt, messageQueue);
}
return true;
}
private boolean addRemoveTagInTransactionOp(MessageExt messageExt, MessageQueue messageQueue) {
//@1 主題變更為RMQ_SYS_TRANS_OP_HALF_TOPIC
Message message = new Message(TransactionalMessageUtil.buildOpTopic(), TransactionalMessageUtil.REMOVETAG, String.valueOf(messageExt.getQueueOffset()).getBytes(TransactionalMessageUtil.charset));
//@2 存儲消息
writeOp(message, messageQueue);
return true;
}
小結(jié):半消息的刪除是將Topic從RMQ_SYS_TRANS_HALF_TOPIC變更為RMQ_SYS_TRANS_OP_HALF_TOPIC存儲到日志文件,依靠文件刪除機制刪除。
3.處理事務回滾請求
方法:EndTransactionProcessor#processRequest
//@1 查找半消息
result = this.brokerController.getTransactionalMessageService().rollbackMessage(requestHeader);
if (result.getResponseCode() == ResponseCode.SUCCESS) {
if (res.getCode() == ResponseCode.SUCCESS) {
//@2 刪除prepare消息 是將消息存儲于RMQ_SYS_TRANS_OP_HALF_TOPIC中 this.brokerController.getTransactionalMessageService().deletePrepareMessage(result.getPrepareMessage());
}
return res;
}
}
小結(jié):處理事務回滾請求,將半消息查找出來,將其刪除即:將Topic從RMQ_SYS_TRANS_HALF_TOPIC變更為RMQ_SYS_TRANS_OP_HALF_TOPIC并存儲,依靠文件刪除機制刪除。
五、事務消息服務端狀態(tài)回查
1.事務回查線程類調(diào)用鏈
線程類初始化:TransactionalMessageCheckService
@1 main(String[] args)
start(createBrokerController(args));
@2 createBrokerController
@3 initialize()
@4 initialTransaction()
this.transactionalMessageCheckService =
new TransactionalMessageCheckService(this);
小結(jié):在Broker初始化啟動時,TransactionalMessageCheckService線程類也隨著啟動初始化。
2.事務回查邏輯
方法:TransactionalMessageCheckService#run
//@1 時間間隔為60秒
long checkInterval = brokerController.getBrokerConfig().getTransactionCheckInterval();
while (!this.isStopped()) {
this.waitForRunning(checkInterval);
}
方法:TransactionalMessageCheckService#onWaitEnd
//@2 transactionTimeOut默認6秒
long timeout = brokerController.getBrokerConfig().getTransactionTimeOut();
//@3 最大核查次數(shù)為15次
int checkMax = brokerController.getBrokerConfig().getTransactionCheckMax();
long begin = System.currentTimeMillis();
this.brokerController.getTransactionalMessageService().check(timeout, checkMax, this.brokerController.getTransactionalMessageCheckListener());
小結(jié):事務回查每隔60秒執(zhí)行一次,一次執(zhí)行超時時間為6秒,最大回查次數(shù)為15次。
回查邏輯(一)。方法:TransactionalMessageServiceImpl#check
//@1
Set<MessageQueue> msgQueues = transactionalMessageBridge.fetchMessageQueues(topic);
for (MessageQueue messageQueue : msgQueues) {
//獲取對應的RMQ_SYS_TRANS_OP_HALF_TOPIC中的隊列
MessageQueue opQueue = getOpQueue(messageQueue);
//半消息消費隊列中偏移量
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
//OP已刪除消費隊列中的偏移量
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
}
//@2
PullResult pullResult = fillOpRemoveMap(removeMap, opQueue, opOffset, halfOffset, doneOpOffset);
方法:TransactionalMessageServiceImpl#fillOpRemoveMap
//@3
PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
List<MessageExt> opMsg = pullResult.getMsgFoundList();
for (MessageExt opMessageExt : opMsg) {
Long queueOffset = getLong(new String(opMessageExt.getBody(), TransactionalMessageUtil.charset));
if (TransactionalMessageUtil.REMOVETAG.equals(opMessageExt.getTags())) {
//已經(jīng)處理過的消息即commit和rollback
if (queueOffset < miniOffset) {
doneOpOffset.add(opMessageExt.getQueueOffset());
} else {
//已經(jīng)處理刪除過了,但是半消息還沒有更新
removeMap.put(queueOffset, opMessageExt.getQueueOffset());
}
} else {
log.error("Found a illegal tag in opMessageExt= {} ", opMessageExt);
}
}
小結(jié):
@1 從半消息隊列中查找消息隊列
@2 opQueue隊列中的消息均為已經(jīng)刪除的半消息,需要檢查下是否已經(jīng)刪除了,當時半消息隊列還沒有更新。
@3 miniOffset為半消息消費隊列中的最大偏移量;queueOffset為刪除消費隊列的消息偏移量;通過比較兩者來確定是否已經(jīng)刪除了,而半消息狀態(tài)還沒有更新,并將這類消息存儲在removeMap中。
回查邏輯(二)。方法:TransactionalMessageServiceImpl#check
while (true) {
//查找消息
GetResult getResult = getHalfMsg(messageQueue, i);
//@1 已經(jīng)處理過了,半消息滯后了,偏移量繼續(xù)遞增往下走
if (removeMap.containsKey(i)) {
log.info("Half offset {} has been committed/rolled back", i);
removeMap.remove(i);
}
//@2 needSkip 超過存儲時間(默認3天) needDiscard 超過回查次數(shù),默認15次
if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
listener.resolveDiscardMsg(msgExt);
newOffset = i + 1;
i++;
continue;
}
//@3 消息存儲時間大于開始時間暫不處理
if (msgExt.getStoreTimestamp() >= startTime) {
break;
}
//@4 存儲的時間小于需要回查的時間 跳過
if (valueOfCurrentMinusBorn < checkImmunityTime) {
if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
newOffset = i + 1;
i++;
continue;
}
}
//接著往下處理
newOffset = i + 1;
i++;
}
小結(jié)
@1 removeMap(即已刪除隊列有而半消息隊列未更新的消息)有則不在處理跳過該消息。
@2 超過存儲時間或者回查次數(shù)超過15次不再處理
@3 消息存儲時間大于核查程序開始時間暫不處理
@4 如果定義了回查的時間間隔需要判斷是否到時間了
回查邏輯(三)。方法:TransactionalMessageServiceImpl#check
if (isNeedCheck) {
//@1
if (!putBackHalfMsgQueue(msgExt, i)) {
continue;
}
//@2
listener.resolveHalfMsg(msgExt);
}} else {
}
//@3
//保存prepare消息隊列的回查消費進度
if (newOffset != halfOffset) {
transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset);
}
long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
//保存OP消費進度
if (newOpOffset != opOffset) {
transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
小結(jié):
@1 將半消息重新存儲在RMQ_SYS_TRANS_HALF_TOPIC中,由于本次回查尚未知道結(jié)果,所以進行存儲。
@2 發(fā)到客戶端進行回查,回查的RequestCode為CHECK_TRANSACTION_STATE = 39,根據(jù)ProductGroup隨機獲取客戶端通道Channel進行回查。
@3 保存半消息和已處理消息的消費進度。
3.客戶端響應事務回查
方法:ClientRemotingProcessor#checkTransactionState
producer.checkTransactionState(addr, messageExt, requestHeader);
方法:MQProducerInner#checkTransactionState
//@1
localTransactionState = transactionListener.checkLocalTransaction(message);
//@2
DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark,
3000);
小結(jié):
@1 執(zhí)行本地事務回查并返回事務回查狀態(tài)
@2 將事務回查狀態(tài)提交到Broker
六、事務消息交互示意圖

作者老梁,哈啰出行高級技術(shù)專家,參與了《RocketMQ技術(shù)內(nèi)幕》審稿工作。專注后端中間件方向,已陸續(xù)發(fā)表RocketMQ系列、Kafka系列、gRPC系列、Sentinel系列、Java NIO系列。其中RocketMQ系列已發(fā)表40余篇。源碼、實戰(zhàn)、原理、調(diào)優(yōu)期待與你一起學習。

