阿里面試官:RocketMQ與Kafka中如何實(shí)現(xiàn)事務(wù)?

本文已收錄GitHub,更有互聯(lián)網(wǎng)大廠面試真題,面試攻略,高效學(xué)習(xí)資料等

RocketMQ的事務(wù)是如何實(shí)現(xiàn)的?

首先我們來(lái)看 RocketMQ 的事務(wù)。我在之前的課程中,已經(jīng)給大家講解過(guò) RocketMQ 事務(wù)的大致流程,這里我們?cè)僖黄鹜ㄟ^(guò)代碼,重溫一下這個(gè)流程。

public class CreateOrderService {
    @Inject
    private OrderDao orderDao;
    //注入訂單表的DAO
    @Inject
    private ExecutorService executorService;
    //注入一個(gè)ExecutorService
    private TransactionMQProducer producer;
    //初始化transactionListener和producer
    @Init
    public void init() throws MQClientException {
        TransactionListener transactionListener = createTransactionListener();
        producer = new TransactionMQProducer("myGroup");
        producer.setExecutorService(executorService);
        producer.setTransactionListener(transactionListener);
        producer.start();
    }
    //創(chuàng)建訂單服務(wù)的請(qǐng)求入口
    @PUT
    @RequestMapping(...)
    public Boolean createOrder(@RequestBody CreateOrderRequest request) {
        //根據(jù)創(chuàng)建訂單請(qǐng)求創(chuàng)建一條消息
        Message msg = createMessage(request);
        //發(fā)送事務(wù)消息
        SendResult sendResult = producer.sendMessageInTransaction(msg, request);
        //返回:事務(wù)是否成功
        return sendResult.getSendStatus() == SendStatus.SEND_OK;
    }
    private TransactionListener createTransactionListener() {
        return new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
                CreateOrderRequest request = (CreateOrderRequest ) arg;
                try {
                    //執(zhí)行本地事務(wù)創(chuàng)建訂單
                    orderDao.createOrderInDB(request);
                    //如果沒(méi)拋異常說(shuō)明執(zhí)行成功,提交事務(wù)消息
                    return LocalTransactionState.COMMIT_MESSAGE;
                }
                catch (Throwable t) {
                    //失敗則直接回滾事務(wù)消息
                    return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            //反查本地事務(wù)
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt msg) {
                //從消息中獲得訂單ID
                String orderId = msg.getUserProperty("orderId");
                //去數(shù)據(jù)庫(kù)中查詢(xún)訂單號(hào)是否存在,如果存在則提交事務(wù);
                //如果不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回UNKNOW//(PS:這里RocketMQ有個(gè)拼寫(xiě)錯(cuò)誤:UNKNOW)
                return orderDao.isOrderIdExistsInDB(orderId)?
                LocalTransactionState.COMMIT_MESSAGE: LocalTransactionState.UNKNOW;
            }
        }
        ;
    }
    //....
}

在這個(gè)流程中,我們提供一個(gè)創(chuàng)建訂單的服務(wù),功能就是在數(shù)據(jù)庫(kù)中插入一條訂單記錄,并發(fā)送一條創(chuàng)建訂單的消息,要求寫(xiě)數(shù)據(jù)庫(kù)和發(fā)消息這兩個(gè)操作在一個(gè)事務(wù)內(nèi)執(zhí)行,要么都成功,要么都失敗。在這段代碼中,我們首先在 init() 方法中初始化了 transactionListener和發(fā)生 RocketMQ 事務(wù)消息的變量 producer。真正提供創(chuàng)建訂單服務(wù)的方法是createOrder(),在這個(gè)方法里面,我們根據(jù)請(qǐng)求的參數(shù)創(chuàng)建一條消息,然后調(diào)用RocketMQ producer 發(fā)送事務(wù)消息,并返回事務(wù)執(zhí)行結(jié)果。

之后的 createTransactionListener() 方法是在 init() 方法中調(diào)用的,這里面直接構(gòu)造一個(gè)匿名類(lèi),來(lái)實(shí)現(xiàn) RocketMQ 的 TransactionListener 接口,這個(gè)接口需要實(shí)現(xiàn)兩個(gè)方法:

  • executeLocalTransaction:執(zhí)行本地事務(wù),在這里我們直接把訂單數(shù)據(jù)插入到數(shù)據(jù)庫(kù)中,并返回本地事務(wù)的執(zhí)行結(jié)果。
  • checkLocalTransaction:反查本地事務(wù),在這里我們的處理是,在數(shù)據(jù)庫(kù)中查詢(xún)訂單號(hào)是否存在,如果存在則提交事務(wù),如果不存在,可能是本地事務(wù)失敗了,也可能是本地事務(wù)還在執(zhí)行,所以返回 UNKNOW。

這樣,就使用 RocketMQ 的事務(wù)消息功能實(shí)現(xiàn)了一個(gè)創(chuàng)建訂單的分布式事務(wù)。接下來(lái)我們一起通過(guò) RocketMQ 的源代碼來(lái)看一下,它的事務(wù)消息是如何實(shí)現(xiàn)的。

首先看一下在 producer 中,是如何來(lái)發(fā)送事務(wù)消息的:

public TransactionSendResult sendMessageInTransaction(final Message msg,final LocalTransactionExecuter locthrows MQClientException {
    TransactionListener transactionListener = getCheckListener();
    if (null == localTransactionExecuter && null == transactionListener) {
        throw new MQClientException("tranExecutor is null", null);
    }
    Validators.checkMessage(msg, this.defaultMQProducer);
    SendResult sendResult = null;
    //這里給消息添加了屬性,標(biāo)明這是一個(gè)事務(wù)消息,也就是半消息
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_TRANSACTION_PREPARED, "true"
    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_PRODUCER_GROUP, this.defaultM
    //調(diào)用發(fā)送普通消息的方法,發(fā)送這條半消息
    try {
        sendResult = this.send(msg);
    }
    catch (Exception e) {
        throw new MQClientException("send message Exception", e);
    }
    LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
    Throwable localException = null;
    switch (sendResult.getSendStatus()) {
        case SEND_OK: {
            try {
                if (sendResult.getTransactionId() != null) {
                    msg.putUserProperty("__transactionId__", sendResult.getTransactionId
                }
                String transactionId = msg.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT
                if (null != transactionId && !"".equals(transactionId)) {
                    msg.setTransactionId(transactionId);
                }
                //執(zhí)行本地事務(wù)
                if (null != localTransactionExecuter) {
                    localTransactionState = localTransactionExecuter.executeLocalTransac
                    } else if (transactionListener != null) {
                        log.debug("Used new transaction API");
                        localTransactionState = transactionListener.executeLocalTransaction
                        }
                        if (null == localTransactionState) {
                            localTransactionState = LocalTransactionState.UNKNOW;
                        }
                        if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                            log.info("executeLocalTransactionBranch return {}", localTransaction
                            log.info(msg.toString());
                        }
                    }
                    catch (Throwable e) {
                        log.info("executeLocalTransactionBranch exception", e);
                        log.info(msg.toString());
                        localException = e;
                    }
                }
                break;
                case FLUSH_DISK_TIMEOUT:
                case FLUSH_SLAVE_TIMEOUT:
                case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
                break;
                default:
                break;
            }
            //根據(jù)事務(wù)消息和本地事務(wù)的執(zhí)行結(jié)果localTransactionState,決定提交或回滾事務(wù)消息
            //這里給Broker發(fā)送提交或回滾事務(wù)的RPC請(qǐng)求。
            try {
                this.endTransaction(sendResult, localTransactionState, localException);
            }
            catch (Exception e) {
                log.warn("local transaction execute " + localTransactionState + ", but end broke
}
TransactionSendResult transactionSendResult = new TransactionSendResult();transactionSendResult.setSendStatus(sendResult.getSendStatus());transactionSendResult.setMessageQueue(sendResult.getMessageQueue());transactionSendResult.setMsgId(sendResult.getMsgId());
transactionSendResult.setQueueOffset(sendResult.getQueueOffset());transactionSendResult.setTransactionId(sendResult.getTransactionId());transactionSendResult.setLocalTransactionState(localTransactionState);
return transactionSendResult;

這段代碼的實(shí)現(xiàn)邏輯是這樣的:首先給待發(fā)送消息添加了一個(gè)屬性PROPERTY_TRANSACTION_PREPARED,標(biāo)明這是一個(gè)事務(wù)消息,也就是半消息,然后會(huì)像發(fā)送普通消息一樣去把這條消息發(fā)送到 Broker 上。如果發(fā)送成功了,就開(kāi)始調(diào)用我們之前提供的接口 TransactionListener 的實(shí)現(xiàn)類(lèi)中,執(zhí)行本地事務(wù)的方法executeLocalTransaction() 來(lái)執(zhí)行本地事務(wù),在我們的例子中就是在數(shù)據(jù)庫(kù)中插入一條訂單記錄。

最后,根據(jù)半消息發(fā)送的結(jié)果和本地事務(wù)執(zhí)行的結(jié)果,來(lái)決定提交或者回滾事務(wù)。在實(shí)現(xiàn)方法 endTransaction() 中,producer 就是給 Broker 發(fā)送了一個(gè)單向的 RPC 請(qǐng)求,告知Broker 完成事務(wù)的提交或者回滾。由于有事務(wù)反查的機(jī)制來(lái)兜底,這個(gè) RPC 請(qǐng)求即使失敗或者丟失,也都不會(huì)影響事務(wù)最終的結(jié)果。最后構(gòu)建事務(wù)消息的發(fā)送結(jié)果,并返回。

以上,就是 RocketMQ 在 Producer 這一端事務(wù)消息的實(shí)現(xiàn),然后我們?cè)倏匆幌?Broker這一端,它是怎么來(lái)處理事務(wù)消息和進(jìn)行事務(wù)反查的。

Broker 在處理 Producer 發(fā)送消息的請(qǐng)求時(shí),會(huì)根據(jù)消息中的屬性判斷一下,這條消息是普通消息還是半消息:

// ...
if (traFlag != null && Boolean.parseBoolean(traFlag)) {
    // ...
    putMessageResult = this.brokerController.getTransactionalMessageService().prepareMes
} else {
    putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
}
// ...

這段代碼在org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage 方法中,然后我們跟進(jìn)去看看真正處理半消息的業(yè)務(wù)邏輯,這段處理邏輯在類(lèi)org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge 中:

public PutMessageResult putHalfMessage(MessageExtBrokerInner messageInner) {
    return store.putMessage(parseHalfMessageInner(messageInner));
}
private MessageExtBrokerInner parseHalfMessageInner(MessageExtBrokerInner msgInner) {
    //記錄消息的主題和隊(duì)列,到新的屬性中
    MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_TOPIC, msgInner.getMessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_REAL_QUEUE_ID,
    String.valueOf(msgInner.getQueueId()));
    msgInner.setSysFlag(
    MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(), MessageSysFlag.TRANS//替換消息的主題和隊(duì)列為:RMQ_SYS_TRANS_HALF_TOPIC,0
    msgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());
    msgInner.setQueueId(0);
    msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProreturn msgInner;
}

我們可以看到,在這段代碼中,RocketMQ 并沒(méi)有把半消息保存到消息中客戶端指定的那個(gè)隊(duì)列中,而是記錄了原始的主題隊(duì)列后,把這個(gè)半消息保存在了一個(gè)特殊的內(nèi)部主題RMQ_SYS_TRANS_HALF_TOPIC 中,使用的隊(duì)列號(hào)固定為 0。這個(gè)主題和隊(duì)列對(duì)消費(fèi)者是不可見(jiàn)的,所以里面的消息永遠(yuǎn)不會(huì)被消費(fèi)。這樣,就保證了在事務(wù)提交成功之前,這個(gè)半消息對(duì)消費(fèi)者來(lái)說(shuō)是消費(fèi)不到的。

然后我們?cè)倏匆幌?,RocketMQ 是如何進(jìn)行事務(wù)反查的:在 Broker 的TransactionalMessageCheckService 服務(wù)中啟動(dòng)了一個(gè)定時(shí)器,定時(shí)從半消息隊(duì)列中讀出所有待反查的半消息,針對(duì)每個(gè)需要反查的半消息,Broker 會(huì)給對(duì)應(yīng)的 Producer 發(fā)一個(gè)要求執(zhí)行事務(wù)狀態(tài)反查的 RPC 請(qǐng)求,這部分的邏輯在方法org.apache.rocketmq.broker.transaction.AbstractTransactionalMessageCheckListener#sendCheckMessage 中,根據(jù) RPC 返回響應(yīng)中的反查結(jié)果,來(lái)決定這個(gè)半消息是需要提交還是回滾,或者后續(xù)繼續(xù)來(lái)反查。

最后,提交或者回滾事務(wù)實(shí)現(xiàn)的邏輯是差不多的,首先把半消息標(biāo)記為已處理,如果是提交事務(wù),那就把半消息從半消息隊(duì)列中復(fù)制到這個(gè)消息真正的主題和隊(duì)列中去,如果要回滾事務(wù),這一步什么都不需要做,最后結(jié)束這個(gè)事務(wù)。這部分邏輯的實(shí)現(xiàn)在org.apache.rocketmq.broker.processor.EndTransactionProcessor 這個(gè)類(lèi)中。

Kafka的事務(wù)和Exactly Once可以解決什么問(wèn)題?

接下來(lái)我們?cè)僬f(shuō)一下 Kafka 的事務(wù)。之前我們講事務(wù)的時(shí)候說(shuō)過(guò),Kafka 的事務(wù)解決的問(wèn)題和 RocketMQ 是不太一樣的。RocketMQ 中的事務(wù),它解決的問(wèn)題是,確保執(zhí)行本地事務(wù)和發(fā)消息這兩個(gè)操作,要么都成功,要么都失敗。并且,RocketMQ 增加了一個(gè)事務(wù)反查的機(jī)制,來(lái)盡量提高事務(wù)執(zhí)行的成功率和數(shù)據(jù)一致性。

而 Kafka 中的事務(wù),它解決的問(wèn)題是,確保在一個(gè)事務(wù)中發(fā)送的多條消息,要么都成功,要么都失敗。注意,這里面的多條消息不一定要在同一個(gè)主題和分區(qū)中,可以是發(fā)往多個(gè)主題和分區(qū)的消息。當(dāng)然,你可以在 Kafka 的事務(wù)執(zhí)行過(guò)程中,加入本地事務(wù),來(lái)實(shí)現(xiàn)和RocketMQ 中事務(wù)類(lèi)似的效果,但是 Kafka 是沒(méi)有事務(wù)反查機(jī)制的。

Kafka 的這種事務(wù)機(jī)制,單獨(dú)來(lái)使用的場(chǎng)景不多。更多的情況下被用來(lái)配合 Kafka 的冪等機(jī)制來(lái)實(shí)現(xiàn) Kafka 的 Exactly Once 語(yǔ)義。我在之前的課程中也強(qiáng)調(diào)過(guò),這里面的 ExactlyOnce,和我們通常理解的消息隊(duì)列的服務(wù)水平中的 Exactly Once 是不一樣的。

我們通常理解消息隊(duì)列的服務(wù)水平中的 Exactly Once,它指的是,消息從生產(chǎn)者發(fā)送到Broker,然后消費(fèi)者再?gòu)?Broker 拉取消息,然后進(jìn)行消費(fèi)。這個(gè)過(guò)程中,確保每一條消息恰好傳輸一次,不重不丟。我們之前說(shuō)過(guò),包括 Kafka 在內(nèi)的幾個(gè)常見(jiàn)的開(kāi)源消息隊(duì)列,都只能做到 At Least Once,也就是至少一次,保證消息不丟,但有可能會(huì)重復(fù)。做不到Exactly Once。

那 Kafka 中的 Exactly Once 又是解決的什么問(wèn)題呢?它解決的是,在流計(jì)算中,用 Kafka作為數(shù)據(jù)源,并且將計(jì)算結(jié)果保存到 Kafka 這種場(chǎng)景下,數(shù)據(jù)從 Kafka 的某個(gè)主題中消費(fèi),在計(jì)算集群中計(jì)算,再把計(jì)算結(jié)果保存在 Kafka 的其他主題中。這樣的過(guò)程中,保證每條消息都被恰好計(jì)算一次,確保計(jì)算結(jié)果正確。

舉個(gè)例子,比如,我們把所有訂單消息保存在一個(gè) Kafka 的主題 Order 中,在 Flink 集群中運(yùn)行一個(gè)計(jì)算任務(wù),統(tǒng)計(jì)每分鐘的訂單收入,然后把結(jié)果保存在另一個(gè) Kafka 的主題Income 里面。要保證計(jì)算結(jié)果準(zhǔn)確,就要確保,無(wú)論是 Kafka 集群還是 Flink 集群中任何節(jié)點(diǎn)發(fā)生故障,每條消息都只能被計(jì)算一次,不能重復(fù)計(jì)算,否則計(jì)算結(jié)果就錯(cuò)了。這里面有一個(gè)很重要的限制條件,就是數(shù)據(jù)必須來(lái)自 Kafka 并且計(jì)算結(jié)果都必須保存到 Kafka中,才可以享受到 Kafka 的 Excactly Once 機(jī)制。

可以看到,Kafka 的 Exactly Once 機(jī)制,是為了解決在“讀數(shù)據(jù) - 計(jì)算 - 保存結(jié)果”這樣的計(jì)算過(guò)程中數(shù)據(jù)不重不丟,而不是我們通常理解的使用消息隊(duì)列進(jìn)行消息生產(chǎn)消費(fèi)過(guò)程中的 Exactly Once。

Kafka的事務(wù)是如何實(shí)現(xiàn)的?

那 Kafka 的事務(wù)又是怎么實(shí)現(xiàn)的呢?它的實(shí)現(xiàn)原理和 RocketMQ 的事務(wù)是差不多的,都是基于兩階段提交來(lái)實(shí)現(xiàn)的,但是實(shí)現(xiàn)的過(guò)程更加復(fù)雜。

首先說(shuō)一下,參與 Kafka 事務(wù)的幾個(gè)角色,或者說(shuō)是模塊。為了解決分布式事務(wù)問(wèn)題,Kafka 引入了事務(wù)協(xié)調(diào)者這個(gè)角色,負(fù)責(zé)在服務(wù)端協(xié)調(diào)整個(gè)事務(wù)。這個(gè)協(xié)調(diào)者并不是一個(gè)獨(dú)立的進(jìn)程,而是 Broker 進(jìn)程的一部分,協(xié)調(diào)者和分區(qū)一樣通過(guò)選舉來(lái)保證自身的可用性。

和 RocketMQ 類(lèi)似,Kafka 集群中也有一個(gè)特殊的用于記錄事務(wù)日志的主題,這個(gè)事務(wù)日志主題的實(shí)現(xiàn)和普通的主題是一樣的,里面記錄的數(shù)據(jù)就是類(lèi)似于“開(kāi)啟事務(wù)”“提交事務(wù)”這樣的事務(wù)日志。日志主題同樣也包含了很多的分區(qū)。在 Kafka 集群中,可以存在多個(gè)協(xié)調(diào)者,每個(gè)協(xié)調(diào)者負(fù)責(zé)管理和使用事務(wù)日志中的幾個(gè)分區(qū)。這樣設(shè)計(jì),其實(shí)就是為了能并行執(zhí)行多個(gè)事務(wù),提升性能。

下面說(shuō)一下 Kafka 事務(wù)的實(shí)現(xiàn)流程。

首先,當(dāng)我們開(kāi)啟事務(wù)的時(shí)候,生產(chǎn)者會(huì)給協(xié)調(diào)者發(fā)一個(gè)請(qǐng)求來(lái)開(kāi)啟事務(wù),協(xié)調(diào)者在事務(wù)日志中記錄下事務(wù) ID。

然后,生產(chǎn)者在發(fā)送消息之前,還要給協(xié)調(diào)者發(fā)送請(qǐng)求,告知發(fā)送的消息屬于哪個(gè)主題和分區(qū),這個(gè)信息也會(huì)被協(xié)調(diào)者記錄在事務(wù)日志中。接下來(lái),生產(chǎn)者就可以像發(fā)送普通消息一樣來(lái)發(fā)送事務(wù)消息,這里和 RocketMQ 不同的是,RocketMQ 選擇把未提交的事務(wù)消息保存在特殊的隊(duì)列中,而 Kafka 在處理未提交的事務(wù)消息時(shí),和普通消息是一樣的,直接發(fā)給 Broker,保存在這些消息對(duì)應(yīng)的分區(qū)中,Kafka 會(huì)在客戶端的消費(fèi)者中,暫時(shí)過(guò)濾未提交的事務(wù)消息。

消息發(fā)送完成后,生產(chǎn)者給協(xié)調(diào)者發(fā)送提交或回滾事務(wù)的請(qǐng)求,由協(xié)調(diào)者來(lái)開(kāi)始兩階段提交,完成事務(wù)。第一階段,協(xié)調(diào)者把事務(wù)的狀態(tài)設(shè)置為“預(yù)提交”,并寫(xiě)入事務(wù)日志。到這里,實(shí)際上事務(wù)已經(jīng)成功了,無(wú)論接下來(lái)發(fā)生什么情況,事務(wù)最終都會(huì)被提交。

之后便開(kāi)始第二階段,協(xié)調(diào)者在事務(wù)相關(guān)的所有分區(qū)中,都會(huì)寫(xiě)一條“事務(wù)結(jié)束”的特殊消息,當(dāng) Kafka 的消費(fèi)者,也就是客戶端,讀到這個(gè)事務(wù)結(jié)束的特殊消息之后,它就可以把之前暫時(shí)過(guò)濾的那些未提交的事務(wù)消息,放行給業(yè)務(wù)代碼進(jìn)行消費(fèi)了。最后,協(xié)調(diào)者記錄最后一條事務(wù)日志,標(biāo)識(shí)這個(gè)事務(wù)已經(jīng)結(jié)束了。

我把整個(gè)事務(wù)的實(shí)現(xiàn)流程,繪制成一個(gè)簡(jiǎn)單的時(shí)序圖放在這里,便于你理解。

總結(jié)一下 Kafka 這個(gè)兩階段的流程,準(zhǔn)備階段,生產(chǎn)者發(fā)消息給協(xié)調(diào)者開(kāi)啟事務(wù),然后消息發(fā)送到每個(gè)分區(qū)上。提交階段,生產(chǎn)者發(fā)消息給協(xié)調(diào)者提交事務(wù),協(xié)調(diào)者給每個(gè)分區(qū)發(fā)一條“事務(wù)結(jié)束”的消息,完成分布式事務(wù)提交。

總結(jié)

本文分別講解了 Kafka 和 RocketMQ 是如何來(lái)實(shí)現(xiàn)事務(wù)的。你可以看到,它們?cè)趯?shí)現(xiàn)事務(wù)過(guò)程中的一些共同的地方,它們都是基于兩階段提交來(lái)實(shí)現(xiàn)的事務(wù),都利用了特殊的主題中的隊(duì)列和分區(qū)來(lái)記錄事務(wù)日志。

不同之處在于對(duì)處于事務(wù)中的消息的處理方式,RocketMQ 是把這些消息暫存在一個(gè)特殊的隊(duì)列中,待事務(wù)提交后再移動(dòng)到業(yè)務(wù)隊(duì)列中;而 Kafka 直接把消息放到對(duì)應(yīng)的業(yè)務(wù)分區(qū)中,配合客戶端過(guò)濾來(lái)暫時(shí)屏蔽進(jìn)行中的事務(wù)消息。

同時(shí)你需要了解,RocketMQ 和 Kafka 的事務(wù),它們的適用場(chǎng)景是不一樣的,RocketMQ的事務(wù)適用于解決本地事務(wù)和發(fā)消息的數(shù)據(jù)一致性問(wèn)題,而 Kafka 的事務(wù)則是用于實(shí)現(xiàn)它的 Exactly Once 機(jī)制,應(yīng)用于實(shí)時(shí)計(jì)算的場(chǎng)景中。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容