什么是rocketmq
RocketMQ 是阿里巴巴開源的消息隊列中間件。具有下列特點:
- 能夠保證嚴(yán)格的消息順序
- 提供豐富的消息拉取模式
- 高效的訂閱者水平擴展能力
- 億級消息堆積能力
- 事務(wù)消息
“嚴(yán)格的消息順序” 是指在需要的情況下,可以使 producer 發(fā)送的消息被 consumer 順序的接收; “豐富的消息拉取模式” 是指可以選擇 pull 或 push 兩種消息消費模式(但是其實都是 consumer 主動從broker 拉取消息);“訂閱者水平擴展能力” 是指可以多個 consumer 同時 subscribe 同一個隊列時,根據(jù) consumer 是否在同一個 consumer group 來決定消息是交給所有 consumer 消費還是選擇某個 consumer 消費,可以實現(xiàn) consumer 側(cè)的負(fù)載均衡;“億級消息堆積能力” 是指 broker 接收到的消息后會將其存在文件中,所以可以做到存儲大量消息,并供不同消費者重復(fù)消費?!笆聞?wù)消息” 是指可以用來實現(xiàn)最終一致性的分布式事務(wù)。
rocketmq的組成部分

上圖是一個典型的 RocketMQ 網(wǎng)絡(luò)拓?fù)鋱D,有以下組成部分:
- producer
- consumer
- Name server
- Broker
Broker 又分為 master 和 slave,master 可以進行消息的讀寫,slave 同步 master 接收的消息,只能用來進行消息的讀取。其中:
(1) producer 為消息的生產(chǎn)者,為了提高寫消息的效率,同時防止單點,可以部署多個 master broker,producer 可以向不同的 broker 寫入數(shù)據(jù)。
(2)consumer 為消息的消費者,有集群模式和廣播模式兩種消費方式,還可以設(shè)置 consumer group。在集群模式下,同一條消息只會被同一個 consumer group 中的一個消費者消費,不同 consumer group 的 consumer 可以消費同一條消息;而廣播模式則是多個 consumer 都會消費到同一條消息。
(3)Name server 用來管理 broker 以及 broker 上的 topic,可以接收 Broker 的注冊、注銷請求,讓 producer 查詢 topic 下的所有 BrokerQueue,put 消息,Consumer 獲取 topic 下所有的 BrokerQueue,get 消息
(4) Broker 又分為 master 和 slave,master 可以進行消息的讀寫,slave 同步 master 接收的消息,只能用來進行消息的讀取。一個 Master 可以有多個 Slave,但一個 Slave 只能對應(yīng)一個 Master,Master 與 Slave 的對應(yīng)關(guān)系通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slaver。Master可以部署多個。每個Broker與Name Server 集群中的所有節(jié)點建立長連接,定時注冊 Topic 信息到所有的 NameServer。
需要注意的是 producer 和 consumer 在生產(chǎn)和消費消息時,都需要指定消息的 topic,當(dāng) topic 匹配時,consumer 才會消費到 producer 發(fā)送的消息,除此之外, producer 在發(fā)送消息時還可以指定消息的 tag,consumer 也可以指定自己關(guān)注哪些 tag,這樣就可以對消息的消費進行更加細(xì)粒度的控制 。
broker 中同一個 topic 又可以分為不同的 queue,consumer 在集群模式下消費時,同一個 topic 下不同的 queue 會被 分配給同一個 consumer group 中不同的 consumer,實現(xiàn)接收端的負(fù)載均衡,同時也為順序消息的實現(xiàn)提供了基礎(chǔ)。

在同一個broker上,所有 topic 的所有 queue 的消息,存放在一個文件里面,并且,為不同的 queue 生成了不同的 ConsumeQueue,這樣, consumer 就可以指定 topic、消息發(fā)送時間等信息,從 ConsumeQueue 中讀取消息在 commit log 中的偏移,然后再去 commit log 中讀取消息:

rocketmq環(huán)境搭建與基本使用
安裝
搭建 RocketMQ 環(huán)境需要下列條件:
- 64bit JDK 1.7+;
- Maven 3.2.x
先從 github 獲取 RocketMQ 的源碼:
git clone https://github.com/apache/incubator-rocketmq.git
然后進入源碼目錄進行編譯:
mvn clean package install -Prelease-all assembly:assembly -U
需要注意的是,在 Mac os x 上,有些測試無法通過,加入 -DskipTests 即可,不影響使用。在 linux 和 windows 上都沒有這個問題。
然后就可以進入 target/apache-rocketmq-all/,準(zhǔn)備運行 name server 和 broker了。
單 broker 測試
運行 name server:
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
如果看到日志中出現(xiàn): The Name Server boot success...,說明 name server 就啟動成功了。
運行 broker:
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
看到 The broker[%s, 192.168.0.133:10911] boot success... 這樣的日志,就算啟動成功了。
然后運行 consumer,代碼如下:
/*
* Instantiate with specified consumer group name.
*/
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
consumer.setNamesrvAddr("192.168.0.133:9876");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* consumer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Specify where to start in case the specified consumer group is a brand new one.
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
/*
* Subscribe one more more topics to consume. * represent this consumer will consume all sub tags
*/
consumer.subscribe("TopicTest", "*");
/*
* Register callback to execute on arrival of messages fetched from brokers.
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.printf(Thread.currentThread().getName() + " Receive New Messages: " + msgs + "%n");
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
/*
* Launch the consumer instance.
*/
consumer.start();
System.out.printf("Consumer Started.%n");
再運行 producer:
/*
* Instantiate with a producer group name.
*/
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.133:9876");
/*
* Specify name server addresses.
* <p/>
*
* Alternatively, you may specify name server addresses via exporting environmental variable: NAMESRV_ADDR
* <pre>
* {@code
* producer.setNamesrvAddr("name-server1-ip:9876;name-server2-ip:9876");
* }
* </pre>
*/
/*
* Launch the instance.
*/
producer.start();
for (int i = 0; i < 1000; i++) {
try {
/*
* Create a message instance, specifying topic, tag and message body.
*/
Message msg = new Message("TopicTest" /* Topic */,
"TagA" /* Tag */,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
/*
* Call send message to deliver message to one of brokers.
*/
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
/*
* Shut down once the producer instance is not longer in use.
*/
producer.shutdown();
就可以看到 consumer 打印出接收到的消息了:
...
ConsumeMessageThread_16 Receive New Messages: [MessageExt [queueId=0, storeSize=180, queueOffset=749, sysFlag=0, bornTimestamp=1492238283708, bornHost=/192.168.0.103:50436, storeTimestamp=1492238278824, storeHost=/192.168.0.104:10911, msgId=C0A8006800002A9F0000000000091409, commitLogOffset=594953, bodyCRC=801108784, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=750, CONSUME_START_TIME=1492238283710, UNIQ_KEY=C0A80067C1B018B4AAC248A9BDBC03E3, WAIT=true, TAGS=TagA}, body=18]]]
...
RocketMQ 集群
只使用單個 Broker 單個 Name Server 的話,無法保證服務(wù)的高可用,所以一般會選擇啟動多個 NameServer,多個 Master 以及 多個 slave??梢耘渲玫倪x項主要有:(1)接收到消息寫入文件后刷盤是異步還是同步,同步刷盤會導(dǎo)致磁盤 IO 增多從而運行效率下降,同時由于有若干 slave 備份消息,一般不建議使用同步刷盤;(2)master slave 之間復(fù)制消息使用同步還是異步方式,同步方式的情況下 producer 寫入消息后,當(dāng)消息從 master 復(fù)制到 slave 成功后才返回,而異步情況下 master 處理好了消息就直接返回了。在 incubator-rocketmq/target/apache-rocketmq-all/conf 目錄下,有一些示例配置:2m-2s-async、2m-2s-sync、2m-noslave 分別對應(yīng)不同的配置示例,這里就配置 2m-noslave。
有2臺服務(wù)器 192.168.0.133 以及 192.168.0.104,我們先在兩臺服務(wù)器上分別啟動 name server。
然后使用
nohup bash mqbroker -c ../conf/2m-noslave/broker-a.properties -n '192.168.0.133:9876;192.168.0.104:9876' &
nohup bash mqbroker -c ../conf/2m-noslave/broker-b.properties -n '192.168.0.133:9876;192.168.0.104:9876' &
分別啟動不同的 Broker。這里需要注意的是 Broker 的配置項和 org.apache.rocketmq.common.BrokerConfig 類的成員變量一一對應(yīng),如果有定制化的,直接看看 BrokerConfig 中有什么選項就好了。
查看 Name Server 的日志,可以看到兩個 Broker 分別在兩個 Name Server 上注冊成功。在 consumer 和 producer 中,也記得使用下面的代碼來設(shè)置 Name Server:
consumer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
producer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
順序消息
順序消息指消息被消費的順序和 producer 發(fā)送消息的順序嚴(yán)格一致。RocketMQ 要實現(xiàn)順序消息有 2 個要求:
- Producer 保證發(fā)送消息到同一個隊列;
- consumer 保證同一個隊列同時只有一個 consumer 在消費。
具體實現(xiàn)上,Producer 需要使用 MessageQueueSelector 根據(jù)業(yè)務(wù)需求使用某個參數(shù),比如訂單號,將關(guān)聯(lián)的數(shù)據(jù)發(fā)送到同一個隊列去。
Consumer 需要使用 MessageListenerOrderly,它將會定時的向 Broker 申請鎖住某些特定的隊列,Broker 的RebalanceLockManager 里的 ConcurrentHashMap mqLockTable 記錄著隊列與 consumer client 的對應(yīng)關(guān)系,consumer 可以嘗試對隊列加鎖,并獲取自己當(dāng)前持有哪些隊列的鎖:
private final ConcurrentHashMap<String/* group */, ConcurrentHashMap<MessageQueue, LockEntry>> mqLockTable =
new ConcurrentHashMap<String, ConcurrentHashMap<MessageQueue, LockEntry>>(1024);
對于 consumer,除了知道自己持有哪些隊列的鎖,可以對這些隊列進行消費外,還需要保證同一時間只有一個線程會消費同一個隊列,所以在本地維護了一個變量,其類型為:
public class MessageQueueLock {
private ConcurrentHashMap<MessageQueue, Object> mqLockTable =
new ConcurrentHashMap<MessageQueue, Object>();
public Object fetchLockObject(final MessageQueue mq) {
Object objLock = this.mqLockTable.get(mq);
if (null == objLock) {
objLock = new Object();
Object prevLock = this.mqLockTable.putIfAbsent(mq, objLock);
if (prevLock != null) {
objLock = prevLock;
}
}
return objLock;
}
}
對于每一個隊列,都有一個 objLock,在消費時對該 objLock 使用 synchronizd 加鎖,保證同一時間只有一個線程在消費該隊列。
對于每個正在處理中的隊列,用一個 ProcessQueue 維護其狀態(tài),并在內(nèi)部使用一個 TreeMap 記錄所有本地獲取到且未消費的消息,key 為消息的 offset,value 為消息,方便按消息的 offset 獲取消息:
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
為了實現(xiàn)消費失敗時暫停消費,還再讀取消息進行處理時將消息放到一個暫存隊列里:
public List<MessageExt> takeMessags(final int batchSize) {
List<MessageExt> result = new ArrayList<MessageExt>(batchSize);
final long now = System.currentTimeMillis();
try {
this.lockTreeMap.writeLock().lockInterruptibly();
this.lastConsumeTimestamp = now;
try {
if (!this.msgTreeMap.isEmpty()) {
for (int i = 0; i < batchSize; i++) {
Map.Entry<Long, MessageExt> entry = this.msgTreeMap.pollFirstEntry();
if (entry != null) {
result.add(entry.getValue());
msgTreeMapTemp.put(entry.getKey(), entry.getValue());
} else {
break;
}
}
}
if (result.isEmpty()) {
consuming = false;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("take Messages exception", e);
}
return result;
}
這樣,就可以在處理失敗時將消息從 msgTreeMapTemp 放回 msgTreeMap 中,在成功時候增加消息消費的 offset 了:
public void rollback() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
this.msgTreeMap.putAll(this.msgTreeMapTemp);
this.msgTreeMapTemp.clear();
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("rollback exception", e);
}
}
public long commit() {
try {
this.lockTreeMap.writeLock().lockInterruptibly();
try {
Long offset = this.msgTreeMapTemp.lastKey();
msgCount.addAndGet(this.msgTreeMapTemp.size() * (-1));
this.msgTreeMapTemp.clear();
if (offset != null) {
return offset + 1;
}
} finally {
this.lockTreeMap.writeLock().unlock();
}
} catch (InterruptedException e) {
log.error("commit exception", e);
}
return -1;
}
在處理完消息后,會根據(jù)處理結(jié)果進行一些后序動作,包括增加消費的 offset,并更新 offset 到 Broker 等,這樣就不會每次隊列重啟都重新消費之前的數(shù)據(jù)了:
public boolean processConsumeResult(//
final List<MessageExt> msgs, //
final ConsumeOrderlyStatus status, //
final ConsumeOrderlyContext context, //
final ConsumeRequest consumeRequest//
) {
boolean continueConsume = true;
long commitOffset = -1L;
if (context.isAutoCommit()) {
switch (status) {
case COMMIT:
case ROLLBACK:
log.warn("the message queue consume result is illegal, we think you want to ack these message {}",
consumeRequest.getMessageQueue());
case SUCCESS:
commitOffset = consumeRequest.getProcessQueue().commit();
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
} else {
commitOffset = consumeRequest.getProcessQueue().commit();
}
break;
default:
break;
}
} else {
switch (status) {
case SUCCESS:
this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
break;
case COMMIT:
commitOffset = consumeRequest.getProcessQueue().commit();
break;
case ROLLBACK:
consumeRequest.getProcessQueue().rollback();
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
break;
case SUSPEND_CURRENT_QUEUE_A_MOMENT:
this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), msgs.size());
if (checkReconsumeTimes(msgs)) {
consumeRequest.getProcessQueue().makeMessageToCosumeAgain(msgs);
this.submitConsumeRequestLater(//
consumeRequest.getProcessQueue(), //
consumeRequest.getMessageQueue(), //
context.getSuspendCurrentQueueTimeMillis());
continueConsume = false;
}
break;
default:
break;
}
}
if (commitOffset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), commitOffset, false);
}
return continueConsume;
}
分析了這么多,還是上一段代碼來說明一下使用的方法,下面為 producer:
try {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
// producer.setNamesrvAddr("192.168.0.104:9876");
// producer.setNamesrvAddr("192.168.0.133:9876");
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
Random random = new Random();
random.setSeed(System.currentTimeMillis());
for (int i = 0; i < 100; i++) {
int orderId = Math.abs(random.nextInt());
Message msg =
new Message("TopicTestShunxu", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
System.out.println(mqs);
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
e.printStackTrace();
}
}
下面是 consumer:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
consumer.setNamesrvAddr("192.168.0.104:9876;192.168.0.133:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTestShunxu", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
context.setAutoCommit(true);
this.consumeTimes.incrementAndGet();
for (MessageExt msg : msgs) {
System.out.println(msg.getStoreHost() + " " + msg.getQueueId() + " " + new String(msg.getBody(), Charset.forName("UTF-8")));
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
注意,對于 consumer 而言,在暫時無法成功處理消息時,需要返回 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT,這樣就會在一段時間之后重試消費消息。
另外還有一點要注意的是,順序消息不能保證消息只被消費一次:比如當(dāng)某個 consumer 處理完消息但還沒有更新消息 offset 到 broker 時掛了,其他的 consumer 會獲取隊列的鎖,并且重新消費該消息。所以在 consumer 的業(yè)務(wù)邏輯中一定一定要對消息做去重處理,否則要是發(fā)了兩份貨或者轉(zhuǎn)了兩筆錢,你老板可能就會扣你工資了 ??。
事務(wù)消息
所謂的事務(wù)消息,是指將事務(wù)處理+消息發(fā)送結(jié)合起來,保證同時失敗或同時成功。比如從賬戶 A 扣錢,發(fā)了一個消息給賬戶 B 增加一筆錢,那么必須保證扣錢成功就發(fā)出去消息,扣錢失敗不能發(fā)出去消息。這樣做的好處是什么呢?
在單機環(huán)境下,一個轉(zhuǎn)賬操作如下:

但是當(dāng)用戶十分多以后,兩個賬戶可能不在一臺服務(wù)器上,可能需要這樣做:

但是像上圖這樣做,編程會十分復(fù)雜,要考慮到各種異常情況,同時效率也比較低。那么可能會有下面的這種解決方案,將大事務(wù)分解為小事務(wù)+消息,不追求完全的一致性,只需要最終一致就好:

最終一致性這種處理問題的思路我們其實經(jīng)常會用到,一個典型的例子就是調(diào)用通過第三方支付平臺給用戶轉(zhuǎn)賬,我們在調(diào)用其 API 進行請求時,可能會返回成功,可能會返回失敗,也可能返回未知狀態(tài)。如果直接返回了成功或失敗,就可以直接決定調(diào)用失敗或者是調(diào)用成功,減少用戶賬戶余額,但是如果返回未知,則可能需要從用戶賬戶中扣款,然后記錄用戶有一筆轉(zhuǎn)賬在進行中,后續(xù)對該轉(zhuǎn)賬進行處理,查詢其是否成功來決定完成扣款或返還金額到用戶賬戶。這里就用一個轉(zhuǎn)賬記錄實現(xiàn)了最終一致性。
但是這種場景有一個問題:那就是到底什么時發(fā)送消息。如果在事務(wù)完成之前發(fā),那么事務(wù)失敗的話怎么辦?如果在事務(wù)完成之后發(fā),那么消息發(fā)送失敗了怎么辦?當(dāng)然還有一種選擇是在事務(wù)中發(fā)送消息,先不 commit 事務(wù),在消息發(fā)送后根據(jù)消息發(fā)送結(jié)果決定是 commit 還是 rollback,但是這樣又會造成事務(wù)時間過長,可能會造成數(shù)據(jù)庫查詢效率下降。
RocketMQ 解決這個問題的方法是進行兩階段提交,在事務(wù)開始前先發(fā)送一個 prepared 消息,完成事務(wù)后再發(fā)送確認(rèn)消息,之后,consumer 就可以讀取到這個消息進行消費了。但是,這又引入了一個問題,確認(rèn)消息發(fā)送失敗了怎么辦?RocketMQ 是這么做的:在收到 prepared 消息而未收到確認(rèn)消息的情況下,每隔一段時間向消息發(fā)送端( producer )確認(rèn),事務(wù)是否執(zhí)行成功。這樣就能保證消息發(fā)送與本地事務(wù)同時成功或同時失敗。
所以,使用事務(wù)消息要提供兩種 callback:
- 執(zhí)行事務(wù)的 callback,在執(zhí)行完事務(wù)后根據(jù)執(zhí)行結(jié)果發(fā)送確認(rèn)消息;
- RocketMQ 查詢事務(wù)結(jié)果的 callback,在這個 callback 里查詢事務(wù)執(zhí)行的結(jié)果。
下面,就來一個簡單的例子:
//執(zhí)行事務(wù)的 callback
public class TransactionExecuterImpl implements LocalTransactionExecuter {
private AtomicInteger transactionIndex = new AtomicInteger(1);
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
int value = transactionIndex.getAndIncrement();
System.out.println("execute local transaction " + msg.toString());
if (value == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
//檢查事務(wù)完成情況的 callback 比如可以在 msg 中帶上 訂單號,查詢訂單是否支付成功
public class TransactionCheckListenerImpl implements TransactionCheckListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
System.out.printf("server checking TrMsg " + msg.toString() + "%n");
int value = transactionIndex.getAndIncrement();
if ((value % 6) == 0) {
throw new RuntimeException("Could not find db");
} else if ((value % 5) == 0) {
return LocalTransactionState.ROLLBACK_MESSAGE;
} else if ((value % 4) == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
下面是 producer:
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("trans_group");
producer.setNamesrvAddr("192.168.0.133:9876");
producer.setCheckThreadPoolMinSize(2);
producer.setCheckThreadPoolMaxSize(2);
producer.setCheckRequestHoldMax(2000);
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
for (int i = 0; i < 100; i++) {
try {
Message msg =
new Message("topicTrans", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
以上。