
本文已收錄GitHub,更有互聯(lián)網(wǎng)大廠面試真題,面試攻略,高效學(xué)習(xí)資料等
RocketMQ的事務(wù)是如何實(shí)現(xiàn)的?
首先我們來(lái)看 RocketMQ 的事務(wù)。我在之前的課程中,已經(jīng)給大家講解過(guò) RocketMQ 事務(wù)的大致流程,這里我們?cè)僖黄鹜ㄟ^(guò)代碼,重溫一下這個(gè)流程。
public class CreateOrderService {
@Inject
private OrderDao orderDao;
//注入訂單表的DAO
@Inject
private ExecutorService executorService;
//注入一個(gè)ExecutorService
private TransactionMQProducer producer;
//初始化transactionListener和producer
@Init
public void init() throws MQClientException {
TransactionListener transactionListener = createTransactionListener();
producer = new TransactionMQProducer("myGroup");
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
}
//創(chuàng)建訂單服務(wù)的請(qǐng)求入口
@PUT
@RequestMapping(...)
public Boolean createOrder(@RequestBody CreateOrderRequest request) {
//根據(jù)創(chuàng)建訂單請(qǐng)求創(chuàng)建一條消息
Message msg = createMessage(request);
//發(fā)送事務(wù)消息
SendResult sendResult = producer.sendMessageInTransaction(msg, request);
//返回:事務(wù)是否成功
return sendResult.getSendStatus() == SendStatus.SEND_OK;
}
private TransactionListener createTransactionListener() {
return new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
CreateOrderRequest request = (CreateOrderRequest ) arg;
try {
//執(zhí)行本地事務(wù)創(chuàng)建訂單
orderDao.createOrderInDB(request);
//如果沒(méi)拋異常說(shuō)明執(zhí)行成功,提交事務(wù)消息
return LocalTransactionState.COMMIT_MESSAGE;
}
catch (Throwable t) {
//失敗則直接回滾事務(wù)消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
//反查本地事務(wù)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
//從消息中獲得訂單ID
String orderId = msg.getUserProperty("orderId");
//去數(shù)據(jù)庫(kù)中查詢(xún)訂單號(hào)是否存在,如果存在則提交事務(wù);
//如果不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回UNKNOW//(PS:這里RocketMQ有個(gè)拼寫(xiě)錯(cuò)誤:UNKNOW)
return orderDao.isOrderIdExistsInDB(orderId)?
LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
}
}
;
}
//....
}
在這個(gè)流程中,我們提供一個(gè)創(chuàng)建訂單的服務(wù),功能就是在數(shù)據(jù)庫(kù)中插入一條訂單記錄,并發(fā)送一條創(chuàng)建訂單的消息,要求寫(xiě)數(shù)據(jù)庫(kù)和發(fā)消息這兩個(gè)操作在一個(gè)事務(wù)內(nèi)執(zhí)行,要么都成功,要么都失敗。在這段代碼中,我們首先在 init() 方法中初始化了 transactionListener和發(fā)生 RocketMQ 事務(wù)消息的變量 producer。真正提供創(chuàng)建訂單服務(wù)的方法是createOrder(),在這個(gè)方法里面,我們根據(jù)請(qǐng)求的參數(shù)創(chuàng)建一條消息,然后調(diào)用RocketMQ producer 發(fā)送事務(wù)消息,并返回事務(wù)執(zhí)行結(jié)果。
之后的 createTransactionListener() 方法是在 init() 方法中調(diào)用的,這里面直接構(gòu)造一個(gè)匿名類(lèi),來(lái)實(shí)現(xiàn) RocketMQ 的 TransactionListener 接口,這個(gè)接口需要實(shí)現(xiàn)兩個(gè)方法:
- executeLocalTransaction:執(zhí)行本地事務(wù),在這里我們直接把訂單數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中,并返回本地事務(wù)的執(zhí)行結(jié)果。
- checkLocalTransaction:反查本地事務(wù),在這里我們的處理是,在數(shù)據(jù)庫(kù)中查詢(xún)訂單號(hào)是否存在,如果存在則提交事務(wù),如果不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回 UNKNOW。
這樣,就使用 RocketMQ 的事務(wù)消息功能實(shí)現(xiàn)了一個(gè)創(chuàng)建訂單的分布式事務(wù)。接下來(lái)我們一起通過(guò) RocketMQ 的源代碼來(lái)看一下,它的事務(wù)消息是如何實(shí)現(xiàn)的。
首先看一下在 producer 中,是如何來(lái)發(fā)送事務(wù)消息的:
public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter locthrows MQClientException {
TransactionListener transactionListener = getCheckListener();
if (null == localTransactionExecuter && null == transactionListener) {
throw new MQClientException("tranExecutor is null", null);
}
Validators.checkMessage(msg, this.defaultMQProducer);
SendResult sendResult = null;
//這里給消息添加了屬性,標(biāo)明這是一個(gè)事務(wù)消息,也就是半消息
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultM
//調(diào)用發(fā)送普通消息的方法,發(fā)送這條半消息
try {
sendResult = this.send(msg);
}
catch (Exception e) {
throw new MQClientException("send message Exception", e);
}
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
Throwable localException = null;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
try {
if (sendResult.getTransactionId() != null) {
msg.putUserProperty("__transactionId__", sendResult.getTransactionId
}
String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT
if (null != transactionId && !"".equals(transactionId)) {
msg.setTransactionId(transactionId);
}
//執(zhí)行本地事務(wù)
if (null != localTransactionExecuter) {
localTransactionState = localTransactionExecuter.executeLocalTransac
} else if (transactionListener != null) {
log.debug("Used new transaction API");
localTransactionState = transactionListener.executeLocalTransaction
}
if (null == localTransactionState) {
localTransactionState = LocalTransactionState.UNKNOW;
}
if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
log.info("executeLocalTransactionBranch return {}", localTransaction
log.info(msg.toString());
}
}
catch (Throwable e) {
log.info("executeLocalTransactionBranch exception", e);
log.info(msg.toString());
localException = e;
}
}
break;
case FLUSH_DISK_TIMEOUT:
case FLUSH_SLAVE_TIMEOUT:
case SLAVE_NOT_AVAILABLE:
localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
break;
default:
break;
}
//根據(jù)事務(wù)消息和本地事務(wù)的執(zhí)行結(jié)果localTransactionState,決定提交或回滾事務(wù)消息
//這里給Broker發(fā)送提交或回滾事務(wù)的RPC請(qǐng)求。
try {
this.endTransaction(sendResult, localTransactionState, localException);
}
catch (Exception e) {
log.warn("local transaction execute " + localTransactionState + ", but end broke
}
TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;
這段代碼的實(shí)現(xiàn)邏輯是這樣的:首先給待發(fā)送消息添加了一個(gè)屬性PROPERTY_TRANSACTION_PREPARED,標(biāo)明這是一個(gè)事務(wù)消息,也就是半消息,然后會(huì)像發(fā)送普通消息一樣去把這條消息發(fā)送到 Broker 上。如果發(fā)送成功了,就開(kāi)始調(diào)用我們之前提供的接口 TransactionListener 的實(shí)現(xiàn)類(lèi)中,執(zhí)行本地事務(wù)的方法executeLocalTransaction() 來(lái)執(zhí)行本地事務(wù),在我們的例子中就是在數(shù)據(jù)庫(kù)中插入一條訂單記錄。
最后,根據(jù)半消息發(fā)送的結(jié)果和本地事務(wù)執(zhí)行的結(jié)果,來(lái)決定提交或者回滾事務(wù)。在實(shí)現(xiàn)方法 endTransaction() 中,producer 就是給 Broker 發(fā)送了一個(gè)單向的 RPC 請(qǐng)求,告知Broker 完成事務(wù)的提交或者回滾。由于有事務(wù)反查的機(jī)制來(lái)兜底,這個(gè) RPC 請(qǐng)求即使失敗或者丟失,也都不會(huì)影響事務(wù)最終的結(jié)果。最后構(gòu)建事務(wù)消息的發(fā)送結(jié)果,并返回。
以上,就是 RocketMQ 在 Producer 這一端事務(wù)消息的實(shí)現(xiàn),然后我們?cè)倏匆幌?Broker這一端,它是怎么來(lái)處理事務(wù)消息和進(jìn)行事務(wù)反查的。
Broker 在處理 Producer 發(fā)送消息的請(qǐng)求時(shí),會(huì)根據(jù)消息中的屬性判斷一下,這條消息是普通消息還是半消息:
// ...
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
// ...
putMessageResult = this.brokerController.getTransactionalMessageService().prepareMes
} else {
putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// ...
這段代碼在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage 方法中,然后我們跟進(jìn)去看看真正處理半消息的業(yè)務(wù)邏輯,這段處理邏輯在類(lèi)org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge 中:
public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
//記錄消息的主題和隊(duì)列,到新的屬性中
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
String.valueOf(msgInner.getQueueId()));
msgInner.setSysFlag(
MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANS//替換消息的主題和隊(duì)列為:RMQ_SYS_TRANS_HALF_TOPIC,0
msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
msgInner.setQueueId(0);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProreturn msgInner;
}
我們可以看到,在這段代碼中,RocketMQ 并沒(méi)有把半消息保存到消息中客戶端指定的那個(gè)隊(duì)列中,而是記錄了原始的主題隊(duì)列后,把這個(gè)半消息保存在了一個(gè)特殊的內(nèi)部主題RMQ_SYS_TRANS_HALF_TOPIC 中,使用的隊(duì)列號(hào)固定為 0。這個(gè)主題和隊(duì)列對(duì)消費(fèi)者是不可見(jiàn)的,所以里面的消息永遠(yuǎn)不會(huì)被消費(fèi)。這樣,就保證了在事務(wù)提交成功之前,這個(gè)半消息對(duì)消費(fèi)者來(lái)說(shuō)是消費(fèi)不到的。
然后我們?cè)倏匆幌?,RocketMQ 是如何進(jìn)行事務(wù)反查的:在 Broker 的TransactionalMessageCheckService 服務(wù)中啟動(dòng)了一個(gè)定時(shí)器,定時(shí)從半消息隊(duì)列中讀出所有待反查的半消息,針對(duì)每個(gè)需要反查的半消息,Broker 會(huì)給對(duì)應(yīng)的 Producer 發(fā)一個(gè)要求執(zhí)行事務(wù)狀態(tài)反查的 RPC 請(qǐng)求,這部分的邏輯在方法org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage 中,根據(jù) RPC 返回響應(yīng)中的反查結(jié)果,來(lái)決定這個(gè)半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來(lái)反查。
最后,提交或者回滾事務(wù)實(shí)現(xiàn)的邏輯是差不多的,首先把半消息標(biāo)記為已處理,如果是提交事務(wù),那就把半消息從半消息隊(duì)列中復(fù)制到這個(gè)消息真正的主題和隊(duì)列中去,如果要回滾事務(wù),這一步什么都不需要做,最后結(jié)束這個(gè)事務(wù)。這部分邏輯的實(shí)現(xiàn)在org.apache.rocketmq.broker.processor.EndTransactionProcessor 這個(gè)類(lèi)中。
Kafka的事務(wù)和Exactly Once可以解決什么問(wèn)題?
接下來(lái)我們?cè)僬f(shuō)一下 Kafka 的事務(wù)。之前我們講事務(wù)的時(shí)候說(shuō)過(guò),Kafka 的事務(wù)解決的問(wèn)題和 RocketMQ 是不太一樣的。RocketMQ 中的事務(wù),它解決的問(wèn)題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個(gè)操作,要么都成功,要么都失敗。并且,RocketMQ 增加了一個(gè)事務(wù)反查的機(jī)制,來(lái)盡量提高事務(wù)執(zhí)行的成功率和數(shù)據(jù)一致性。
而 Kafka 中的事務(wù),它解決的問(wèn)題是,確保在一個(gè)事務(wù)中發(fā)送的多條消息,要么都成功,要么都失敗。注意,這里面的多條消息不一定要在同一個(gè)主題和分區(qū)中,可以是發(fā)往多個(gè)主題和分區(qū)的消息。當(dāng)然,你可以在 Kafka 的事務(wù)執(zhí)行過(guò)程中,加入本地事務(wù),來(lái)實(shí)現(xiàn)和RocketMQ 中事務(wù)類(lèi)似的效果,但是 Kafka 是沒(méi)有事務(wù)反查機(jī)制的。
Kafka 的這種事務(wù)機(jī)制,單獨(dú)來(lái)使用的場(chǎng)景不多。更多的情況下被用來(lái)配合 Kafka 的冪等機(jī)制來(lái)實(shí)現(xiàn) Kafka 的 Exactly Once 語(yǔ)義。我在之前的課程中也強(qiáng)調(diào)過(guò),這里面的 ExactlyOnce,和我們通常理解的消息隊(duì)列的服務(wù)水平中的 Exactly Once 是不一樣的。
我們通常理解消息隊(duì)列的服務(wù)水平中的 Exactly Once,它指的是,消息從生產(chǎn)者發(fā)送到Broker,然后消費(fèi)者再?gòu)?Broker 拉取消息,然后進(jìn)行消費(fèi)。這個(gè)過(guò)程中,確保每一條消息恰好傳輸一次,不重不丟。我們之前說(shuō)過(guò),包括 Kafka 在內(nèi)的幾個(gè)常見(jiàn)的開(kāi)源消息隊(duì)列,都只能做到 At Least Once,也就是至少一次,保證消息不丟,但有可能會(huì)重復(fù)。做不到Exactly Once。

那 Kafka 中的 Exactly Once 又是解決的什么問(wèn)題呢?它解決的是,在流計(jì)算中,用 Kafka作為數(shù)據(jù)源,并且將計(jì)算結(jié)果保存到 Kafka 這種場(chǎng)景下,數(shù)據(jù)從 Kafka 的某個(gè)主題中消費(fèi),在計(jì)算集群中計(jì)算,再把計(jì)算結(jié)果保存在 Kafka 的其他主題中。這樣的過(guò)程中,保證每條消息都被恰好計(jì)算一次,確保計(jì)算結(jié)果正確。

舉個(gè)例子,比如,我們把所有訂單消息保存在一個(gè) Kafka 的主題 Order 中,在 Flink 集群中運(yùn)行一個(gè)計(jì)算任務(wù),統(tǒng)計(jì)每分鐘的訂單收入,然后把結(jié)果保存在另一個(gè) Kafka 的主題Income 里面。要保證計(jì)算結(jié)果準(zhǔn)確,就要確保,無(wú)論是 Kafka 集群還是 Flink 集群中任何節(jié)點(diǎn)發(fā)生故障,每條消息都只能被計(jì)算一次,不能重復(fù)計(jì)算,否則計(jì)算結(jié)果就錯(cuò)了。這里面有一個(gè)很重要的限制條件,就是數(shù)據(jù)必須來(lái)自 Kafka 并且計(jì)算結(jié)果都必須保存到 Kafka中,才可以享受到 Kafka 的 Excactly Once 機(jī)制。
可以看到,Kafka 的 Exactly Once 機(jī)制,是為了解決在“讀數(shù)據(jù) - 計(jì)算 - 保存結(jié)果”這樣的計(jì)算過(guò)程中數(shù)據(jù)不重不丟,而不是我們通常理解的使用消息隊(duì)列進(jìn)行消息生產(chǎn)消費(fèi)過(guò)程中的 Exactly Once。
Kafka的事務(wù)是如何實(shí)現(xiàn)的?
那 Kafka 的事務(wù)又是怎么實(shí)現(xiàn)的呢?它的實(shí)現(xiàn)原理和 RocketMQ 的事務(wù)是差不多的,都是基于兩階段提交來(lái)實(shí)現(xiàn)的,但是實(shí)現(xiàn)的過(guò)程更加復(fù)雜。
首先說(shuō)一下,參與 Kafka 事務(wù)的幾個(gè)角色,或者說(shuō)是模塊。為了解決分布式事務(wù)問(wèn)題,Kafka 引入了事務(wù)協(xié)調(diào)者這個(gè)角色,負(fù)責(zé)在服務(wù)端協(xié)調(diào)整個(gè)事務(wù)。這個(gè)協(xié)調(diào)者并不是一個(gè)獨(dú)立的進(jìn)程,而是 Broker 進(jìn)程的一部分,協(xié)調(diào)者和分區(qū)一樣通過(guò)選舉來(lái)保證自身的可用性。
和 RocketMQ 類(lèi)似,Kafka 集群中也有一個(gè)特殊的用于記錄事務(wù)日志的主題,這個(gè)事務(wù)日志主題的實(shí)現(xiàn)和普通的主題是一樣的,里面記錄的數(shù)據(jù)就是類(lèi)似于“開(kāi)啟事務(wù)”“提交事務(wù)”這樣的事務(wù)日志。日志主題同樣也包含了很多的分區(qū)。在 Kafka 集群中,可以存在多個(gè)協(xié)調(diào)者,每個(gè)協(xié)調(diào)者負(fù)責(zé)管理和使用事務(wù)日志中的幾個(gè)分區(qū)。這樣設(shè)計(jì),其實(shí)就是為了能并行執(zhí)行多個(gè)事務(wù),提升性能。

下面說(shuō)一下 Kafka 事務(wù)的實(shí)現(xiàn)流程。
首先,當(dāng)我們開(kāi)啟事務(wù)的時(shí)候,生產(chǎn)者會(huì)給協(xié)調(diào)者發(fā)一個(gè)請(qǐng)求來(lái)開(kāi)啟事務(wù),協(xié)調(diào)者在事務(wù)日志中記錄下事務(wù) ID。
然后,生產(chǎn)者在發(fā)送消息之前,還要給協(xié)調(diào)者發(fā)送請(qǐng)求,告知發(fā)送的消息屬于哪個(gè)主題和分區(qū),這個(gè)信息也會(huì)被協(xié)調(diào)者記錄在事務(wù)日志中。接下來(lái),生產(chǎn)者就可以像發(fā)送普通消息一樣來(lái)發(fā)送事務(wù)消息,這里和 RocketMQ 不同的是,RocketMQ 選擇把未提交的事務(wù)消息保存在特殊的隊(duì)列中,而 Kafka 在處理未提交的事務(wù)消息時(shí),和普通消息是一樣的,直接發(fā)給 Broker,保存在這些消息對(duì)應(yīng)的分區(qū)中,Kafka 會(huì)在客戶端的消費(fèi)者中,暫時(shí)過(guò)濾未提交的事務(wù)消息。
消息發(fā)送完成后,生產(chǎn)者給協(xié)調(diào)者發(fā)送提交或回滾事務(wù)的請(qǐng)求,由協(xié)調(diào)者來(lái)開(kāi)始兩階段提交,完成事務(wù)。第一階段,協(xié)調(diào)者把事務(wù)的狀態(tài)設(shè)置為“預(yù)提交”,并寫(xiě)入事務(wù)日志。到這里,實(shí)際上事務(wù)已經(jīng)成功了,無(wú)論接下來(lái)發(fā)生什么情況,事務(wù)最終都會(huì)被提交。
之后便開(kāi)始第二階段,協(xié)調(diào)者在事務(wù)相關(guān)的所有分區(qū)中,都會(huì)寫(xiě)一條“事務(wù)結(jié)束”的特殊消息,當(dāng) Kafka 的消費(fèi)者,也就是客戶端,讀到這個(gè)事務(wù)結(jié)束的特殊消息之后,它就可以把之前暫時(shí)過(guò)濾的那些未提交的事務(wù)消息,放行給業(yè)務(wù)代碼進(jìn)行消費(fèi)了。最后,協(xié)調(diào)者記錄最后一條事務(wù)日志,標(biāo)識(shí)這個(gè)事務(wù)已經(jīng)結(jié)束了。
我把整個(gè)事務(wù)的實(shí)現(xiàn)流程,繪制成一個(gè)簡(jiǎn)單的時(shí)序圖放在這里,便于你理解。

總結(jié)一下 Kafka 這個(gè)兩階段的流程,準(zhǔn)備階段,生產(chǎn)者發(fā)消息給協(xié)調(diào)者開(kāi)啟事務(wù),然后消息發(fā)送到每個(gè)分區(qū)上。提交階段,生產(chǎn)者發(fā)消息給協(xié)調(diào)者提交事務(wù),協(xié)調(diào)者給每個(gè)分區(qū)發(fā)一條“事務(wù)結(jié)束”的消息,完成分布式事務(wù)提交。
總結(jié)
本文分別講解了 Kafka 和 RocketMQ 是如何來(lái)實(shí)現(xiàn)事務(wù)的。你可以看到,它們?cè)趯?shí)現(xiàn)事務(wù)過(guò)程中的一些共同的地方,它們都是基于兩階段提交來(lái)實(shí)現(xiàn)的事務(wù),都利用了特殊的主題中的隊(duì)列和分區(qū)來(lái)記錄事務(wù)日志。
不同之處在于對(duì)處于事務(wù)中的消息的處理方式,RocketMQ 是把這些消息暫存在一個(gè)特殊的隊(duì)列中,待事務(wù)提交后再移動(dòng)到業(yè)務(wù)隊(duì)列中;而 Kafka 直接把消息放到對(duì)應(yīng)的業(yè)務(wù)分區(qū)中,配合客戶端過(guò)濾來(lái)暫時(shí)屏蔽進(jìn)行中的事務(wù)消息。
同時(shí)你需要了解,RocketMQ 和 Kafka 的事務(wù),它們的適用場(chǎng)景是不一樣的,RocketMQ的事務(wù)適用于解決本地事務(wù)和發(fā)消息的數(shù)據(jù)一致性問(wèn)題,而 Kafka 的事務(wù)則是用于實(shí)現(xiàn)它的 Exactly Once 機(jī)制,應(yīng)用于實(shí)時(shí)計(jì)算的場(chǎng)景中。