《RocketMQ實戰(zhàn)與原理解析》
DefaultMQProducer 的使用
發(fā)送消息要經(jīng)過五個步驟:
1)設(shè)置 Producer 的 GroupName。
2)設(shè)置 InstanceName,當一個 Jvm 需要啟動多個 Producer 的時候, 通過設(shè)置不同的 InstanceName 來區(qū)分,不設(shè)置的話系統(tǒng)使用默認名稱“DEFAULT”。
3)設(shè)置發(fā)送失敗重試次數(shù),當網(wǎng)絡(luò)出現(xiàn)異常的時候,這個次數(shù)影響消息的重復投遞次數(shù)。想保證不丟消息,可以設(shè)置多重試幾次。
4)設(shè)置 NameServer 地址。
5)組裝消息并發(fā)送。消息的發(fā)送有同步和異步兩種方式,上面的代碼使用的是異步方式。
在第 2 章的例子中用的是同步方式。消息發(fā)送的返回狀態(tài)有如下四種: FLUSH_DISK_TIMEOUT、FLUSH_SLAVE_TIMEOUT、SLAVE_NOT_AVAILABLE、SEND_OK,不同狀態(tài)在不同的刷盤策略和同步策略的配置下含義是不同的。
FLUSH_ DISK_TIMEOUT:表示沒有在規(guī)定時間內(nèi)完成刷盤(需要 Broker 的刷盤策略被設(shè)置成 SYNC_FLUSH 才會報這個錯誤)。
FLUSH_SLAVE_TIMEOUT:表示在主備方式下,并且 Broker 被設(shè)置成 SYNC_MASTER 方式,沒有在設(shè)定時間內(nèi)完成主從同步。
SLAVE_NOT_AVAILABLE:這個狀態(tài)產(chǎn)生的場景和 FLUSH_SLAVE_TIMEOUT 類似,表示在主備方式下,并且 Broker 被設(shè)置成 SYNC_MASTER,但是沒有找到被配置成 Slave 的 Broker。
SEND_OK:表示發(fā)送成功,發(fā)送成功的具體含義,比如消息是否已經(jīng)被存儲到磁盤?
消息是否被同步到了 Slave 上?消息在 Slave 上是否被寫入磁盤? 需要結(jié)合所配置的刷盤策略、主從策略來定。這個狀態(tài)還可以簡單理解為,沒有發(fā)生上面列出的三個問題狀態(tài)就是 SEND_OK。寫一個高質(zhì)量的生產(chǎn)者程序,重點在于對發(fā)送結(jié)果的處理,要充分考慮各種異常,寫清對應(yīng)的處理邏輯。
Demo
public class DefaultMQProducerDemo {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
producer.setInstanceName("instance1");
producer.setRetryTimesWhenSendFailed(3);
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for (int i = 0; i < 10; i++) {
try {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s %n", sendResult);
sendResult.getSendStatus();
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}
TransactionMQProducer 發(fā)送事務(wù)消息
RocketMQ 的事務(wù)消息,是指發(fā)送消息事件和其他事件需要同時成功或同時失敗。比如銀行轉(zhuǎn)賬,A銀行的某賬戶要轉(zhuǎn)一萬元到B銀行的某賬戶。A銀行發(fā)送“B銀行賬戶增加一萬元”這個消息,要和“從A銀行賬戶扣除一萬元”這個操作同時成功或者同時失敗。
RocketMQ 采用兩階段提交的方式實現(xiàn)事務(wù)消息,TransactionMQProducer 處理上面情況的流程是,先發(fā)一個“準備從B銀行賬戶增加一萬元”的消息,發(fā)送成功后做從A銀行賬戶扣除一萬元的操作,根據(jù)操作結(jié)果是否成功,確定之前的“準備從B銀行賬戶增加一萬元” 的消息是做 commit 還是 rollback,具體流程如下:
1)發(fā)送方向 RocketMQ 發(fā)送“待確認”消息。
2)RocketMQ 將收到的“待確認”消息持久化成功后,向發(fā)送方回復消息已經(jīng)發(fā)送成功,此時第一階段消息發(fā)送完成。
3)發(fā)送方開始執(zhí)行本地事件邏輯。
4)發(fā)送方根據(jù)本地事件執(zhí)行結(jié)果向 RocketMQ 發(fā)送二次確認(Commit 或是 Rollback)消息,
RocketMQ 收到 Commit 狀態(tài)則將第一階段消息標記為可投遞,閱方將能夠收到該消息;收到 Rollback 狀態(tài)則刪除第一階段的消息,訂閱方接收不到該消息。
5)如果出現(xiàn)異常情況,步驟 4)提交的二次確認最終未到達 RocketMQ,服務(wù)器在經(jīng)過固定時間段后將對“待確認”消息發(fā)起回查請求。
6)發(fā)送方收到消息回查請求后(如果發(fā)送一階段消息的 Producer 不能工作,回查請求將被發(fā)送到和 Producer 在同一個 Group 里的其他 Producer),
通過檢查對應(yīng)消息的本地事件執(zhí)行結(jié)果返回 Commit 或 Roolback 狀態(tài)。
7)RocketMQ 收到回查請求后,按照步驟 4)的邏輯處理。
上面的邏輯似乎很好地實現(xiàn)了事務(wù)消息功能,它也 RocketMQ 之前的版本實現(xiàn)事務(wù)消息的邏輯。但是因為 RocketMQ 依賴將數(shù)據(jù)順序?qū)懙酱疟P這個特征來提高性能,步驟 4)卻需要更改第一階段消息的狀態(tài),這樣會造成磁盤 Catch 的臟頁過多,降低系統(tǒng)的性能。所以 RocketMQ 在 4.x 的版本中將這部分功能去除。系統(tǒng)中的一些上層 Class 都還在,用戶可以根據(jù)實際需求實現(xiàn)自己的事務(wù)功能。
客戶端有三個類來支持用戶實現(xiàn)事務(wù)消息,
第一個類是 LocalTransactionExecuter,用來實例化步驟 3)的邏輯,根據(jù)情況返回 LocalTransactionState.ROLLBACK_MESSAGE 或者 LocalTransactionState.COMMIT_MESSAGE 狀態(tài)。
第二個類是 TransactionMQProducer,它的用法和 DefaultMQProducer 類似,要通過它啟動一個 Producer 并發(fā)消息,但是比 DefaultMQProducer 多設(shè)置本地事務(wù)處理函數(shù)和回查狀態(tài)函數(shù)。
第三個類是 TransactionCheckListener,實現(xiàn)步驟 5)中 MQ 服務(wù)器的回查請求,返回 LocalTransactionState.ROLLBACK_ MESSAGE 或者 LocalTransactionState.COMMIT_ MESSAGE。
Demo
消息體
@Data
@AllArgsConstructor
public class UserTransferMessage {
/**
* 流水id
*/
private Long flowId;
/**
* 轉(zhuǎn)賬發(fā)起人用戶
*/
private String fromUser;
/**
* 轉(zhuǎn)賬接收人用戶
*/
private String toUser;
/**
* 轉(zhuǎn)賬金額
*/
private Long amount;
}
rocketmq-client 3.* 版本
實現(xiàn)LocalTransactionExecuter
@Slf4j
public class TransactionExecuterImpl implements LocalTransactionExecuter {
public static ConcurrentHashMap<Long, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) {
String transactionMqMessageJson = new String(msg.getBody());
log.info("接收到事務(wù)消息 transactionMqMessageJson = {}", transactionMqMessageJson);
UserTransferMessage userTransferMessage = GsonUtil.GSON.fromJson(transactionMqMessageJson, UserTransferMessage.class);
log.info("開始執(zhí)行本地事務(wù)");
Long transactionMQMessageId = userTransferMessage.getFlowId();
/*
隨機設(shè)置事務(wù)執(zhí)行結(jié)果
transactionMQMessageId % 2 == 0 , 執(zhí)行完成, 成功
transactionMQMessageId % 2 == 1 , 執(zhí)行完成, 失敗
transactionMQMessageId % 2 == 2 , 未知
*/
localTrans.put(userTransferMessage.getFlowId(), (int) (transactionMQMessageId % 3));
int result = localTrans.get(userTransferMessage.getFlowId());
LocalTransactionState state;
if (result == 0) {
state = LocalTransactionState.COMMIT_MESSAGE;
} else if (result == 1) {
state = LocalTransactionState.ROLLBACK_MESSAGE;
} else {
state = LocalTransactionState.UNKNOW;
}
log.info("執(zhí)行本地事務(wù)完成 state = {}", state);
return state;
}
}
實現(xiàn)TransactionCheckListener
@Slf4j
public class TransactionCheckListenerImpl implements TransactionCheckListener {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
String transactionMqMessageJson = new String(messageExt.getBody());
log.info("校驗本地事務(wù)的結(jié)果, transactionMqMessageJson = {}", transactionMqMessageJson);
/*
隨機設(shè)置事務(wù)執(zhí)行結(jié)果
randomInt % 2 == 0 , 執(zhí)行完成, 成功
randomInt % 2 == 1 , 執(zhí)行完成, 失敗
randomInt % 2 == 2 , 未知
*/
long randomInt = RandomUtil.RANDOM.nextInt(Integer.MAX_VALUE) % 3;
log.info("TransactionCheckListenerImpl, randomInt = {}", randomInt);
UserTransferMessage userTransferMessage = GsonUtil.GSON.fromJson(transactionMqMessageJson, UserTransferMessage.class);
TransactionExecuterImpl.localTrans.put(userTransferMessage.getFlowId(), (int) randomInt);
if (randomInt == 0) {
return LocalTransactionState.COMMIT_MESSAGE;
} else if (randomInt == 1) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
return LocalTransactionState.UNKNOW;
}
}
利用TransactionMQProducer發(fā)送消息
@Slf4j
public class TransactionMQProducerDemo {
public static final String TOPIC_NAME = "TransactionMQProducerTopic";
public static void main(String[] args) throws MQClientException, InterruptedException {
//初始化transactionCheckListener
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
TransactionExecuterImpl transcationExuctor = new TransactionExecuterImpl();
//創(chuàng)建事務(wù)TransactionMQProducer
TransactionMQProducer producer = new TransactionMQProducer("TransactionMQProducerGroup");
producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
//設(shè)置事務(wù)監(jiān)聽
producer.setTransactionCheckListener(transactionCheckListener);
producer.start();
for (int i = 0; i < 1; i++) {
try {
long flowId = RandomUtil.RANDOM.nextInt(Integer.MAX_VALUE);
long amount = RandomUtil.RANDOM.nextInt(Integer.MAX_VALUE);
UserTransferMessage userTransferMessage = new UserTransferMessage(flowId, "A", "B", amount);
Message msg = new Message(TOPIC_NAME, "TagA", "KEY" + i,
GsonUtil.GSON.toJson(userTransferMessage).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, transcationExuctor, null);
log.info("消息發(fā)送完成 sendResult = {}", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
Thread.currentThread().join();
producer.shutdown();
}
}
rocketmq-client 4.* 版本
@Slf4j
public class TransactionListenerImpl implements TransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
log.info("msgBody = {}", new String(msg.getBody()));
int value = transactionIndex.getAndIncrement();
int status = value % 3;
log.info("status = {}", status);
localTrans.put(msg.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
log.info("msgBody = {}", new String(msg.getBody()));
Integer status = localTrans.get(msg.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
}
@Slf4j
public class TransactionProducer {
public static final String TOPIC_NAME = "TransactionMQProducerTopic_v4";
public static void main(String[] args) throws MQClientException, InterruptedException {
TransactionListener transactionListener = new TransactionListenerImpl();
TransactionMQProducer producer = new TransactionMQProducer("TransactionMQProducerGroupv4");
ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2000), new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setNamesrvAddr(MqConfig.NAMESRV_ADDR);
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg =
new Message(TOPIC_NAME, tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
log.info("sendResult = {}", sendResult);
Thread.sleep(10);
} catch (MQClientException | UnsupportedEncodingException e) {
e.printStackTrace();
}
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
producer.shutdown();
}
}
使用約束
(1)事務(wù)消息不支持調(diào)度和批處理。
(2)為了避免單個消息多次檢查,導致一半隊列消息積累,我們有限的數(shù)量檢查單個消息默認15次,但用戶可以改變這個極限,改變“transactionCheckMax”參數(shù)的配置代理,如果一個消息一直在檢查“transactionCheckMax”時代,代理會丟棄該消息和打印一個錯誤日志默認在同一時間。用戶可以通過重寫“AbstractTransactionCheckListener”類來更改此行為。
(3)事務(wù)消息將在代理配置中的參數(shù)“transactionTimeout”確定的一段時間后被檢查。用戶也可以通過設(shè)置用戶屬性“CHECK_IMMUNITY_TIME_IN_SECONDS”來改變這個限制。在發(fā)送事務(wù)性消息時,該參數(shù)優(yōu)先于“transactionMsgTimeout”參數(shù)。
(4)事務(wù)消息可能會被檢查或使用多次。
(5)重新提交到用戶目標主題的已提交消息可能會失敗。目前,它取決于日志記錄。高可用性是由RocketMQ本身的高可用性機制保證的。如果您希望確保事務(wù)消息不會丟失,并且保證事務(wù)的完整性,建議使用同步雙寫。機制。
(6)事務(wù)性消息的生產(chǎn)者id不能與其他類型消息的生產(chǎn)者id共享。與其他類型的消息不同,事務(wù)性消息允許向后查詢。MQ服務(wù)器通過客戶機的生產(chǎn)者id查詢客戶機。
發(fā)送延時消息
發(fā)送延遲消息 RocketMQ 支持發(fā)送延遲消息, Broker 收到這類消息后,延遲一段時間再處理,使消息在規(guī)定的一段時間后生效。
延遲消息的使用方法是在創(chuàng)建 Message 對象時,調(diào)用 setDelayTimeLevel(int level) 方法設(shè)置延遲時間,然后再把這個消息發(fā)送出去。
目前延遲的時間不支持任意設(shè)置,僅支持預設(shè)值的時間長度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。
比如 setDelayTimeLevel(3) 表示延遲10s
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(3);
自定義發(fā)送規(guī)則
自定義消息發(fā)送規(guī)則一個 Topic 會有多個 Message Queue, 如果使用 Producer 的默認配置,這個 Producer 會輪流向各個 Message Queue 發(fā)送消息。
Consumer 在消費消息的時候,會根據(jù)負載均衡策略,消費被分配到的 Message Queue,如果不經(jīng)過特定的設(shè)置,某條消息被發(fā)往哪個 Message Queue,被哪個 Consumer 消費是未知的。
如果業(yè)務(wù)需要我們把消息發(fā)送到指定的 Message Queue 里,比如把同一類型的消息都發(fā)往相同的 Message Queue,該怎么辦呢?
可以用 MessageQueueSelector,發(fā)送消息的時候,把 MessageQueueSelector 的對象作為參數(shù),使用 public SendResult send(Message msg, MessageQueueSelector selector, Object arg) 函數(shù)發(fā)送消息即可。在 MessageQueueSelector 的實現(xiàn)中,根據(jù)傳入的 Object 參數(shù),或者根據(jù) Message 消息內(nèi)容確定把消息發(fā)往那個 Message Queue,返回被選中的 Message Queue。
public class OrderMessageQueueSelector implements MessageQueueSelector {
private Random random = new Random();
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object orderKey) {
int size = mqs.size();
int index = random.nextInt() % size;
return mqs.get(index);
}
}
// 發(fā)送時設(shè)置選擇器
try {
Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
msg.setDelayTimeLevel(3);
producer.send(msg, messageQueueSelector, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.printf("%s %n", sendResult);
SendStatus sendStatus = sendResult.getSendStatus();
log.info("sendStatus = {}", sendStatus);
}
@Override
public void onException(Throwable e) {
e.printStackTrace();
}
});
} catch (Exception e) {
e.printStackTrace();
}
發(fā)送順序消息
消息有序指的是一類消息消費時,能按照發(fā)送的順序來消費。例如:一個訂單產(chǎn)生了三條消息分別是訂單創(chuàng)建、訂單付款、訂單完成。消費時要按照這個順序消費才能有意義,但是同時訂單之間是可以并行消費的。RocketMQ可以嚴格的保證消息有序。
順序消息分為全局順序消息與分區(qū)順序消息,全局順序是指某個Topic下的所有消息都要保證順序;部分順序消息只要保證每一組消息被順序消費即可。
- 全局順序 對于指定的一個 Topic,所有消息按照嚴格的先入先出(FIFO)的順序進行發(fā)布和消費。 適用場景:性能要求不高,所有的消息嚴格按照 FIFO 原則進行消息發(fā)布和消費的場景
- 分區(qū)順序 對于指定的一個 Topic,所有消息根據(jù) sharding key 進行區(qū)塊分區(qū)。 同一個分區(qū)內(nèi)的消息按照嚴格的 FIFO 順序進行發(fā)布和消費。 Sharding key 是順序消息中用來區(qū)分不同分區(qū)的關(guān)鍵字段,和普通消息的 Key 是完全不同的概念。 適用場景:性能要求高,以 sharding key 作為分區(qū)字段,在同一個區(qū)塊中嚴格的按照 FIFO 原則進行消息發(fā)布和消費的場景。
RocketMQ可以嚴格的保證消息有序。但這個順序,不是全局順序,只是分區(qū)(queue)順序。要全局順序只能一個分區(qū)。
@Slf4j
public class OrderedProducerDemo {
public static void main(String[] args) throws Exception {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("ordered_producer_group");
//Launch the instance.
producer.setNamesrvAddr(RocketMqConfig.NAMESRV_ADDR);
producer.start();
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 100; i++) {
int orderId = i % 10;
//Create a message instance, specifying topic, tag and message body.
Message msg = new Message("OrderedProducerTopic", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}, orderId);
log.info("sendResult = {}", sendResult);
}
//server shutdown
producer.shutdown();
}
}
@Slf4j
public class OrderedConsumerDemo {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_consumer_group");
consumer.setNamesrvAddr(RocketMqConfig.NAMESRV_ADDR);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("OrderedProducerTopic", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeOrderlyContext context) {
context.setAutoCommit(false);
log.info("{} Receive New Messages: {}", Thread.currentThread().getName(), msgs);
this.consumeTimes.incrementAndGet();
if ((this.consumeTimes.get() % 2) == 0) {
return ConsumeOrderlyStatus.SUCCESS;
} else if ((this.consumeTimes.get() % 3) == 0) {
return ConsumeOrderlyStatus.ROLLBACK;
} else if ((this.consumeTimes.get() % 4) == 0) {
return ConsumeOrderlyStatus.COMMIT;
} else if ((this.consumeTimes.get() % 5) == 0) {
context.setSuspendCurrentQueueTimeMillis(3000);
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
Demo代碼的相關(guān)依賴配置
Maven依賴
<dependency>
<groupId>com.alibaba.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>3.6.2.Final</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.2</version>
</dependency>
日志配置
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n