rocketmq 使用學(xué)習(xí)

什么是rocketmq


RocketMQ 是阿里巴巴開源的消息隊列中間件。具有下列特點:

  • 能夠保證嚴(yán)格的消息順序
  • 提供豐富的消息拉取模式
  • 高效的訂閱者水平擴展能力
  • 億級消息堆積能力
  • 事務(wù)消息

“嚴(yán)格的消息順序” 是指在需要的情況下,可以使 producer 發(fā)送的消息被 consumer 順序的接收; “豐富的消息拉取模式” 是指可以選擇 pull 或 push 兩種消息消費模式(但是其實都是 consumer 主動從broker 拉取消息);“訂閱者水平擴展能力” 是指可以多個 consumer 同時 subscribe 同一個隊列時,根據(jù) consumer 是否在同一個 consumer group 來決定消息是交給所有 consumer 消費還是選擇某個 consumer 消費,可以實現(xiàn) consumer 側(cè)的負(fù)載均衡;“億級消息堆積能力” 是指 broker 接收到的消息后會將其存在文件中,所以可以做到存儲大量消息,并供不同消費者重復(fù)消費?!笆聞?wù)消息” 是指可以用來實現(xiàn)最終一致性的分布式事務(wù)。

rocketmq的組成部分


RocketMQ 集群網(wǎng)絡(luò)拓?fù)鋱D

上圖是一個典型的 RocketMQ 網(wǎng)絡(luò)拓?fù)鋱D,有以下組成部分:

  • producer
  • consumer
  • Name server
  • Broker

Broker 又分為 master 和 slave,master 可以進行消息的讀寫,slave 同步 master 接收的消息,只能用來進行消息的讀取。其中:

(1) producer 為消息的生產(chǎn)者,為了提高寫消息的效率,同時防止單點,可以部署多個 master broker,producer 可以向不同的 broker 寫入數(shù)據(jù)。
(2)consumer 為消息的消費者,有集群模式和廣播模式兩種消費方式,還可以設(shè)置 consumer group。在集群模式下,同一條消息只會被同一個 consumer group 中的一個消費者消費,不同 consumer group 的 consumer 可以消費同一條消息;而廣播模式則是多個 consumer 都會消費到同一條消息。
(3)Name server 用來管理 broker 以及 broker 上的 topic,可以接收 Broker 的注冊、注銷請求,讓 producer 查詢 topic 下的所有 BrokerQueue,put 消息,Consumer 獲取 topic 下所有的 BrokerQueue,get 消息
(4) Broker 又分為 master 和 slave,master 可以進行消息的讀寫,slave 同步 master 接收的消息,只能用來進行消息的讀取。一個 Master 可以有多個 Slave,但一個 Slave 只能對應(yīng)一個 Master,Master 與 Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與Name Server 集群中的所有節(jié)點建立長連接,定時注冊 Topic 信息到所有的 NameServer。

需要注意的是 producer 和 consumer 在生產(chǎn)和消費消息時,都需要指定消息的 topic,當(dāng) topic 匹配時,consumer 才會消費到 producer 發(fā)送的消息,除此之外, producer 在發(fā)送消息時還可以指定消息的 tag,consumer 也可以指定自己關(guān)注哪些 tag,這樣就可以對消息的消費進行更加細(xì)粒度的控制 。

broker 中同一個 topic 又可以分為不同的 queue,consumer 在集群模式下消費時,同一個 topic 下不同的 queue 會被 分配給同一個 consumer group 中不同的 consumer,實現(xiàn)接收端的負(fù)載均衡,同時也為順序消息的實現(xiàn)提供了基礎(chǔ)。

在同一個broker上,所有 topic 的所有 queue 的消息,存放在一個文件里面,并且,為不同的 queue 生成了不同的 ConsumeQueue,這樣, consumer 就可以指定 topic、消息發(fā)送時間等信息,從 ConsumeQueue 中讀取消息在 commit log 中的偏移,然后再去 commit log 中讀取消息:

Broker 消息的存儲

rocketmq環(huán)境搭建與基本使用


安裝

搭建 RocketMQ 環(huán)境需要下列條件:

  • 64bit JDK 1.7+;
  • Maven 3.2.x

先從 github 獲取 RocketMQ 的源碼:

git clone https://github.com/apache/incubator-rocketmq.git

然后進入源碼目錄進行編譯:

 mvn clean package install -Prelease-all assembly:assembly -U

需要注意的是,在 Mac os x 上,有些測試無法通過,加入 -DskipTests 即可,不影響使用。在 linux 和 windows 上都沒有這個問題。

然后就可以進入 target/apache-rocketmq-all/,準(zhǔn)備運行 name server 和 broker了。

單 broker 測試

運行 name server:

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

如果看到日志中出現(xiàn): The Name Server boot success...,說明 name server 就啟動成功了。

運行 broker:

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log 

看到 The broker[%s, 192.168.0.133:10911] boot success... 這樣的日志,就算啟動成功了。

然后運行 consumer,代碼如下:

        /*
         * Instantiate with specified consumer group name.
         */
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("192.168.0.133:9876");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Specify where to start in case the specified consumer group is a brand new one.
         */
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        /*
         * Subscribe one more more topics to consume. * represent this consumer will consume all sub tags
         */
        consumer.subscribe("TopicTest", "*");

        /*
         *  Register callback to execute on arrival of messages fetched from brokers.
         */
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        /*
         *  Launch the consumer instance.
         */
        consumer.start();

        System.out.printf("Consumer Started.%n");

再運行 producer:


        /*
         * Instantiate with a producer group name.
         */
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("192.168.0.133:9876");

        /*
         * Specify name server addresses.
         * <p/>
         *
         * Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
         * <pre>
         * {@code
         * producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
         * }
         * </pre>
         */

        /*
         * Launch the instance.
         */
        producer.start();

        for (int i = 0; i < 1000; i++) {
            try {

                /*
                 * Create a message instance, specifying topic, tag and message body.
                 */
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );

                /*
                 * Call send message to deliver message to one of brokers.
                 */
                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }

        /*
         * Shut down once the producer instance is not longer in use.
         */
        producer.shutdown();

就可以看到 consumer 打印出接收到的消息了:

                ...
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=749, sysFlag=0, bornTimestamp=1492238283708, bornHost=/192.168.0.103:50436, storeTimestamp=1492238278824, storeHost=/192.168.0.104:10911, msgId=C0A8006800002A9F0000000000091409, commitLogOffset=594953, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1492238283710, UNIQ_KEY=C0A80067C1B018B4AAC248A9BDBC03E3, WAIT=true, TAGS=TagA}, body=18]]]
                ...

RocketMQ 集群

只使用單個 Broker 單個 Name Server 的話,無法保證服務(wù)的高可用,所以一般會選擇啟動多個 NameServer,多個 Master 以及 多個 slave??梢耘渲玫倪x項主要有:(1)接收到消息寫入文件后刷盤是異步還是同步,同步刷盤會導(dǎo)致磁盤 IO 增多從而運行效率下降,同時由于有若干 slave 備份消息,一般不建議使用同步刷盤;(2)master slave 之間復(fù)制消息使用同步還是異步方式,同步方式的情況下 producer 寫入消息后,當(dāng)消息從 master 復(fù)制到 slave 成功后才返回,而異步情況下 master 處理好了消息就直接返回了。在 incubator-rocketmq/target/apache-rocketmq-all/conf 目錄下,有一些示例配置:2m-2s-async、2m-2s-sync、2m-noslave 分別對應(yīng)不同的配置示例,這里就配置 2m-noslave。

有2臺服務(wù)器 192.168.0.133 以及 192.168.0.104,我們先在兩臺服務(wù)器上分別啟動 name server。

然后使用

nohup bash mqbroker -c ../conf/2m-noslave/broker-a.properties -n '192.168.0.133:9876;192.168.0.104:9876' &
nohup bash mqbroker -c ../conf/2m-noslave/broker-b.properties -n '192.168.0.133:9876;192.168.0.104:9876' &

分別啟動不同的 Broker。這里需要注意的是 Broker 的配置項和 org.apache.rocketmq.common.BrokerConfig 類的成員變量一一對應(yīng),如果有定制化的,直接看看 BrokerConfig 中有什么選項就好了。

查看 Name Server 的日志,可以看到兩個 Broker 分別在兩個 Name Server 上注冊成功。在 consumer 和 producer 中,也記得使用下面的代碼來設(shè)置 Name Server:

consumer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
producer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");

順序消息


順序消息指消息被消費的順序和 producer 發(fā)送消息的順序嚴(yán)格一致。RocketMQ 要實現(xiàn)順序消息有 2 個要求:

  1. Producer 保證發(fā)送消息到同一個隊列;
  2. consumer 保證同一個隊列同時只有一個 consumer 在消費。

具體實現(xiàn)上,Producer 需要使用 MessageQueueSelector 根據(jù)業(yè)務(wù)需求使用某個參數(shù),比如訂單號,將關(guān)聯(lián)的數(shù)據(jù)發(fā)送到同一個隊列去。

Consumer 需要使用 MessageListenerOrderly,它將會定時的向 Broker 申請鎖住某些特定的隊列,Broker 的RebalanceLockManager 里的 ConcurrentHashMap mqLockTable 記錄著隊列與 consumer client 的對應(yīng)關(guān)系,consumer 可以嘗試對隊列加鎖,并獲取自己當(dāng)前持有哪些隊列的鎖:

   private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
        new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);

對于 consumer,除了知道自己持有哪些隊列的鎖,可以對這些隊列進行消費外,還需要保證同一時間只有一個線程會消費同一個隊列,所以在本地維護了一個變量,其類型為:

public class MessageQueueLock {
    private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
        new ConcurrentHashMap<MessageQueue, Object>();

    public Object fetchLockObject(final MessageQueue mq) {
        Object objLock = this.mqLockTable.get(mq);
        if (null == objLock) {
            objLock = new Object();
            Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
            if (prevLock != null) {
                objLock = prevLock;
            }
        }

        return objLock;
    }
}

對于每一個隊列,都有一個 objLock,在消費時對該 objLock 使用 synchronizd 加鎖,保證同一時間只有一個線程在消費該隊列。

對于每個正在處理中的隊列,用一個 ProcessQueue 維護其狀態(tài),并在內(nèi)部使用一個 TreeMap 記錄所有本地獲取到且未消費的消息,key 為消息的 offset,value 為消息,方便按消息的 offset 獲取消息:

    private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();

為了實現(xiàn)消費失敗時暫停消費,還再讀取消息進行處理時將消息放到一個暫存隊列里:

  public List<MessageExt> takeMessags(final int batchSize) {
        List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
        final long now = System.currentTimeMillis();
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            this.lastConsumeTimestamp = now;
            try {
                if (!this.msgTreeMap.isEmpty()) {
                    for (int i = 0; i < batchSize; i++) {
                        Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
                        if (entry != null) {
                            result.add(entry.getValue());
                            msgTreeMapTemp.put(entry.getKey(), entry.getValue());
                        } else {
                            break;
                        }
                    }
                }

                if (result.isEmpty()) {
                    consuming = false;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("take Messages exception", e);
        }

        return result;
    }

這樣,就可以在處理失敗時將消息從 msgTreeMapTemp 放回 msgTreeMap 中,在成功時候增加消息消費的 offset 了:

    public void rollback() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                this.msgTreeMap.putAll(this.msgTreeMapTemp);
                this.msgTreeMapTemp.clear();
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("rollback exception", e);
        }
    }

  public long commit() {
        try {
            this.lockTreeMap.writeLock().lockInterruptibly();
            try {
                Long offset = this.msgTreeMapTemp.lastKey();
                msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
                this.msgTreeMapTemp.clear();
                if (offset != null) {
                    return offset + 1;
                }
            } finally {
                this.lockTreeMap.writeLock().unlock();
            }
        } catch (InterruptedException e) {
            log.error("commit exception", e);
        }

        return -1;
    }

在處理完消息后,會根據(jù)處理結(jié)果進行一些后序動作,包括增加消費的 offset,并更新 offset 到 Broker 等,這樣就不會每次隊列重啟都重新消費之前的數(shù)據(jù)了:

    public boolean processConsumeResult(//
        final List<MessageExt> msgs, //
        final ConsumeOrderlyStatus status, //
        final ConsumeOrderlyContext context, //
        final ConsumeRequest consumeRequest//
    ) {
        boolean continueConsume = true;
        long commitOffset = -1L;
        if (context.isAutoCommit()) {
            switch (status) {
                case COMMIT:
                case ROLLBACK:
                    log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
                        consumeRequest.getMessageQueue());
                case SUCCESS:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(//
                            consumeRequest.getProcessQueue(), //
                            consumeRequest.getMessageQueue(), //
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    } else {
                        commitOffset = consumeRequest.getProcessQueue().commit();
                    }
                    break;
                default:
                    break;
            }
        } else {
            switch (status) {
                case SUCCESS:
                    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    break;
                case COMMIT:
                    commitOffset = consumeRequest.getProcessQueue().commit();
                    break;
                case ROLLBACK:
                    consumeRequest.getProcessQueue().rollback();
                    this.submitConsumeRequestLater(//
                        consumeRequest.getProcessQueue(), //
                        consumeRequest.getMessageQueue(), //
                        context.getSuspendCurrentQueueTimeMillis());
                    continueConsume = false;
                    break;
                case SUSPEND_CURRENT_QUEUE_A_MOMENT:
                    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
                    if (checkReconsumeTimes(msgs)) {
                        consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
                        this.submitConsumeRequestLater(//
                            consumeRequest.getProcessQueue(), //
                            consumeRequest.getMessageQueue(), //
                            context.getSuspendCurrentQueueTimeMillis());
                        continueConsume = false;
                    }
                    break;
                default:
                    break;
            }
        }

        if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
            this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
        }

        return continueConsume;
    }

分析了這么多,還是上一段代碼來說明一下使用的方法,下面為 producer:

       try {
            DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
            producer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
//            producer.setNamesrvAddr("192.168.0.104:9876");
//            producer.setNamesrvAddr("192.168.0.133:9876");
            producer.start();

            String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
            Random random = new Random();
            random.setSeed(System.currentTimeMillis());
            for (int i = 0; i < 100; i++) {
                int orderId = Math.abs(random.nextInt());
                Message msg =
                    new Message("TopicTestShunxu", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        System.out.println(mqs);
                        Integer id = (Integer) arg;
                        int index = id % mqs.size();
                        return mqs.get(index);
                    }
                }, orderId);

                System.out.printf("%s%n", sendResult);
            }

            producer.shutdown();
        } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
            e.printStackTrace();
        }
    }

下面是 consumer:

  DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");

        consumer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");

        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("TopicTestShunxu", "TagA || TagC || TagD");

        consumer.registerMessageListener(new MessageListenerOrderly() {
            AtomicLong consumeTimes = new AtomicLong(0);

            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                context.setAutoCommit(true);
                this.consumeTimes.incrementAndGet();
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getStoreHost() + " " + msg.getQueueId() + " " + new String(msg.getBody(), Charset.forName("UTF-8")));
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.printf("Consumer Started.%n");

注意,對于 consumer 而言,在暫時無法成功處理消息時,需要返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,這樣就會在一段時間之后重試消費消息。

另外還有一點要注意的是,順序消息不能保證消息只被消費一次:比如當(dāng)某個 consumer 處理完消息但還沒有更新消息 offset 到 broker 時掛了,其他的 consumer 會獲取隊列的鎖,并且重新消費該消息。所以在 consumer 的業(yè)務(wù)邏輯中一定一定要對消息做去重處理,否則要是發(fā)了兩份貨或者轉(zhuǎn)了兩筆錢,你老板可能就會扣你工資了 ??。

事務(wù)消息


所謂的事務(wù)消息,是指將事務(wù)處理+消息發(fā)送結(jié)合起來,保證同時失敗或同時成功。比如從賬戶 A 扣錢,發(fā)了一個消息給賬戶 B 增加一筆錢,那么必須保證扣錢成功就發(fā)出去消息,扣錢失敗不能發(fā)出去消息。這樣做的好處是什么呢?

在單機環(huán)境下,一個轉(zhuǎn)賬操作如下:

單機事務(wù)

但是當(dāng)用戶十分多以后,兩個賬戶可能不在一臺服務(wù)器上,可能需要這樣做:

集群環(huán)境下的轉(zhuǎn)賬

但是像上圖這樣做,編程會十分復(fù)雜,要考慮到各種異常情況,同時效率也比較低。那么可能會有下面的這種解決方案,將大事務(wù)分解為小事務(wù)+消息,不追求完全的一致性,只需要最終一致就好:

大事務(wù)分解為小事務(wù)

最終一致性這種處理問題的思路我們其實經(jīng)常會用到,一個典型的例子就是調(diào)用通過第三方支付平臺給用戶轉(zhuǎn)賬,我們在調(diào)用其 API 進行請求時,可能會返回成功,可能會返回失敗,也可能返回未知狀態(tài)。如果直接返回了成功或失敗,就可以直接決定調(diào)用失敗或者是調(diào)用成功,減少用戶賬戶余額,但是如果返回未知,則可能需要從用戶賬戶中扣款,然后記錄用戶有一筆轉(zhuǎn)賬在進行中,后續(xù)對該轉(zhuǎn)賬進行處理,查詢其是否成功來決定完成扣款或返還金額到用戶賬戶。這里就用一個轉(zhuǎn)賬記錄實現(xiàn)了最終一致性。

但是這種場景有一個問題:那就是到底什么時發(fā)送消息。如果在事務(wù)完成之前發(fā),那么事務(wù)失敗的話怎么辦?如果在事務(wù)完成之后發(fā),那么消息發(fā)送失敗了怎么辦?當(dāng)然還有一種選擇是在事務(wù)中發(fā)送消息,先不 commit 事務(wù),在消息發(fā)送后根據(jù)消息發(fā)送結(jié)果決定是 commit 還是 rollback,但是這樣又會造成事務(wù)時間過長,可能會造成數(shù)據(jù)庫查詢效率下降。

RocketMQ 解決這個問題的方法是進行兩階段提交,在事務(wù)開始前先發(fā)送一個 prepared 消息,完成事務(wù)后再發(fā)送確認(rèn)消息,之后,consumer 就可以讀取到這個消息進行消費了。但是,這又引入了一個問題,確認(rèn)消息發(fā)送失敗了怎么辦?RocketMQ 是這么做的:在收到 prepared 消息而未收到確認(rèn)消息的情況下,每隔一段時間向消息發(fā)送端( producer )確認(rèn),事務(wù)是否執(zhí)行成功。這樣就能保證消息發(fā)送與本地事務(wù)同時成功或同時失敗。

所以,使用事務(wù)消息要提供兩種 callback:

  1. 執(zhí)行事務(wù)的 callback,在執(zhí)行完事務(wù)后根據(jù)執(zhí)行結(jié)果發(fā)送確認(rèn)消息;
  2. RocketMQ 查詢事務(wù)結(jié)果的 callback,在這個 callback 里查詢事務(wù)執(zhí)行的結(jié)果。

下面,就來一個簡單的例子:

//執(zhí)行事務(wù)的 callback 
public class TransactionExecuterImpl implements LocalTransactionExecuter {
    private AtomicInteger transactionIndex = new AtomicInteger(1);

    @Override
    public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
        int value = transactionIndex.getAndIncrement();
        System.out.println("execute local transaction " + msg.toString());

        if (value == 0) {
            throw new RuntimeException("Could not find db");
        } else if ((value % 5) == 0) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if ((value % 4) == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}
//檢查事務(wù)完成情況的 callback 比如可以在 msg 中帶上 訂單號,查詢訂單是否支付成功
public class TransactionCheckListenerImpl implements TransactionCheckListener {
    private AtomicInteger transactionIndex = new AtomicInteger(0);

    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
        System.out.printf("server checking TrMsg " + msg.toString() + "%n");

        int value = transactionIndex.getAndIncrement();
        if ((value % 6) == 0) {
            throw new RuntimeException("Could not find db");
        } else if ((value % 5) == 0) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        } else if ((value % 4) == 0) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        return LocalTransactionState.UNKNOW;
    }
}

下面是 producer:

        TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
        TransactionMQProducer producer = new TransactionMQProducer("trans_group");
        producer.setNamesrvAddr("192.168.0.133:9876");
        producer.setCheckThreadPoolMinSize(2);
        producer.setCheckThreadPoolMaxSize(2);
        producer.setCheckRequestHoldMax(2000);
        producer.setTransactionCheckListener(transactionCheckListener);
        producer.start();

        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
        for (int i = 0; i < 100; i++) {
            try {
                Message msg =
                    new Message("topicTrans", tags[i % tags.length], "KEY" + i,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
                System.out.printf("%s%n", sendResult);

                Thread.sleep(10);
            } catch (MQClientException | UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }

        for (int i = 0; i < 100000; i++) {
            Thread.sleep(1000);
        }
        producer.shutdown();

以上。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 分布式開放消息系統(tǒng)(RocketMQ)的原理與實踐 來源:http://www.itdecent.cn/p/453...
    meng_philip123閱讀 13,225評論 6 104
  • 消息中間件需要解決哪些問題? Publish/Subscribe 發(fā)布訂閱是消息中間件的最基本功能,也是相對于傳統(tǒng)...
    壹點零閱讀 1,712評論 0 7
  • 簡介 近年來,淘寶天貓“雙十一”活動影響了整個中國互聯(lián)網(wǎng)電商的發(fā)展,在“雙十一”的背后,有一系列開放平臺技術(shù)的使用...
    MisterCH閱讀 2,716評論 1 7
  • 錦溪花園酒店坐落于江蘇省昆山市錦溪古鎮(zhèn),臨近淀山湖、周莊,周邊景色優(yōu)美 此民宿有三套風(fēng)格,一套是以上中式高檔風(fēng)格,...
    美幻堂閱讀 204評論 0 0

友情鏈接更多精彩內(nèi)容