RocketMQ支持3種消息發(fā)送方式:同步(sync)、異步(async)和單向(one way)。
- 同步:發(fā)送者向RocketMQ執(zhí)行發(fā)送消息API時(shí),同步等待,直到消息服務(wù)器返回發(fā)送結(jié)果。
- 異步:發(fā)送者向RocketMQ執(zhí)行發(fā)送消息API時(shí),指定消息發(fā)送成功后的回調(diào)函數(shù),調(diào)用消息發(fā)送API后,立即返回,消息發(fā)送者線程不阻塞,直到運(yùn)行結(jié)束,消息發(fā)送成功或失敗的回調(diào)任務(wù)在一個(gè)新的線程中執(zhí)行。
- 單向:消息發(fā)送者向RocketMQ執(zhí)行發(fā)送消息API時(shí),直接返回,不等待消息服務(wù)器的結(jié)果,也不注冊(cè)回調(diào)函數(shù)。簡(jiǎn)單地說(shuō),就是只管發(fā),不在乎消息是否成功存儲(chǔ)在消息服務(wù)器上。
RocketMQ消息發(fā)送需要考慮以下3個(gè)問(wèn)題:
- 消息隊(duì)列如何進(jìn)行負(fù)載?
- 消息發(fā)送如何實(shí)現(xiàn)高可用?
- 批量消息發(fā)送如何實(shí)現(xiàn)一致性?
1. Topic路由機(jī)制
初次發(fā)送時(shí)會(huì)根據(jù)topic的名稱向NameServer集群查詢topic的路由信息,然后將其存儲(chǔ)在本地內(nèi)存緩存中,并且每隔30s依次遍歷緩存中的topic,向NameServer查詢最新的路由信息。如果成功查詢到路由信息,會(huì)將這些信息更新至本地緩存,實(shí)現(xiàn)topic路由信息的動(dòng)態(tài)感知。
RocketMQ提供了自動(dòng)創(chuàng)建主題(topic)的機(jī)制,消息發(fā)送者向一個(gè)不存在的主題發(fā)送消息時(shí),向NameServer查詢?cè)撝黝}的路由信息會(huì)先返回空,如果開(kāi)啟了自動(dòng)創(chuàng)建主題機(jī)制,會(huì)使用一個(gè)默認(rèn)的主題名再次從NameServer查詢路由信息,然后消息發(fā)送者會(huì)使用默認(rèn)主題的路由信息進(jìn)行負(fù)載均衡,但不會(huì)直接使用默認(rèn)路由信息為新主題創(chuàng)建對(duì)應(yīng)的路由信息。使用默認(rèn)主題創(chuàng)建路由信息的流程如圖所示。

注意:RocketMQ中的路由消息是持久化在Broker中的,NameServer中的路由信息來(lái)自Broker的心跳包并存儲(chǔ)在內(nèi)存中。
2. 消息發(fā)送高可用設(shè)計(jì)
發(fā)送端在自動(dòng)發(fā)現(xiàn)主題的路由信息后,RocketMQ默認(rèn)使用輪詢算法進(jìn)行路由的負(fù)載均衡。RocketMQ在消息發(fā)送時(shí)支持自定義的隊(duì)列負(fù)載算法,需要特別注意的是,使用自定義的路由負(fù)載算法后,RocketMQ的重試機(jī)制將失效。RocketMQ為了實(shí)現(xiàn)消息發(fā)送高可用,引入了兩個(gè)非常重要的特性:
- 消息發(fā)送重試機(jī)制。RocketMQ在消息發(fā)送時(shí)如果出現(xiàn)失敗,默認(rèn)會(huì)重試兩次。
- 故障規(guī)避機(jī)制。當(dāng)消息第一次發(fā)送失敗時(shí),如果下一次消息還是發(fā)送到剛剛失敗的Broker上,其消息發(fā)送大概率還是會(huì)失敗,因此為了保證重試的可靠性,在重試時(shí)會(huì)盡量避開(kāi)剛剛接收失敗的Broker,而是選擇其他Broker上的隊(duì)列進(jìn)行發(fā)送,從而提高消息發(fā)送的成功率。
消息發(fā)送的高可用性設(shè)計(jì)如圖所示:

3. 消息發(fā)送流程
消息生產(chǎn)者啟動(dòng)流程在DefaultMQProducerImpl#start中,記錄了也記不住,這里不贅述了,主要記錄核心業(yè)務(wù)流程。
RocketMQ消息發(fā)送的關(guān)鍵點(diǎn)如圖所示:

消息發(fā)送流程主要的步驟為驗(yàn)證消息、查找路由、消息發(fā)送(包含異常處理機(jī)制),如下所示:
DefaultMQProducerImpl#send
public SendResult send(Message msg) throws MQClientException, RemotingException,
MQBrokerException, InterruptedException {
return send(msg, this.defaultMQProducer.getSendMsgTimeout());
}
public SendResult send(Message msg,long timeout) throws MQClientException,
RemotingException, MQBrokerException, InterruptedException {
return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
}
這里重點(diǎn)介紹下Broker的故障延遲機(jī)制。
MQFaultStrategy#selectOneMessageQueue
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final
String lastBrokerName) {
if (this.sendLatencyFaultEnable) {
try {
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
// 輪訓(xùn)隊(duì)列
int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
if (pos < 0)
pos = 0;
MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
// 驗(yàn)證該消息隊(duì)列是否可用
if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
// lastBrokerName就是上一次選擇的執(zhí)行發(fā)送消息失敗的Broker
if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
return mq;
}
}
final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
if (writeQueueNums > 0) {
final MessageQueue mq = tpInfo.selectOneMessageQueue();
if (notBestBroker != null) {
mq.setBrokerName(notBestBroker);
mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() %
writeQueueNums);
}
return mq;
} else {
latencyFaultTolerance.remove(notBestBroker);
}
} catch (Exception e) {
log.error("Error occurred when selecting message queue", e);
}
return tpInfo.selectOneMessageQueue();
}
return tpInfo.selectOneMessageQueue(lastBrokerName);
}
消息發(fā)送API核心入口DefaultMQProducerImpl#sendKernelImpl:
private SendResult sendKernelImpl(final Message msg,
final MessageQueue mq,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final TopicPublishInfo topicPublishInfo,
final long timeout)
4. 批量消息發(fā)送
批量消息發(fā)送是將同一主題的多條消息一起打包發(fā)送到消息服務(wù)端,減少網(wǎng)絡(luò)調(diào)用次數(shù),提高網(wǎng)絡(luò)傳輸效率。當(dāng)然,并不是在同一批次中發(fā)送的消息數(shù)量越多,性能就越好,判斷依據(jù)是單條消息的長(zhǎng)度,如果單條消息內(nèi)容比較長(zhǎng),則打包發(fā)送多條消息會(huì)影響其他線程發(fā)送消息的響應(yīng)時(shí)間,并且單批次消息發(fā)送總長(zhǎng)度不能超過(guò)Default MQProducer#maxMessageSize。批量發(fā)送消息要解決的是如何將這些消息編碼,以便服務(wù)端能夠正確解碼每條消息的內(nèi)容。
發(fā)送單條消息時(shí),消息體的內(nèi)容將保存在body中。發(fā)送批量消息時(shí),需要將多條消息體的內(nèi)容存儲(chǔ)在body中。如何存儲(chǔ)更便于服務(wù)端正確解析每條消息呢?RocketMQ采取的方式是,對(duì)單條消息內(nèi)容使用固定格式進(jìn)行存儲(chǔ),如圖所示:

首先在消息發(fā)送端,調(diào)用batch()方法,將一批消息封裝成MessageBatch對(duì)象。Message-Batch繼承自Message對(duì)象,內(nèi)部持有List<Message> messages。這樣一來(lái),批量消息發(fā)送與單條消息發(fā)送的處理流程就完全一樣了。MessageBatch只需要將該集合中每條消息的消息體聚合成一個(gè)byte[]數(shù)組,在消息服務(wù)端能夠從該byte[]數(shù)組中正確解析出消息。
MessageDecoder#encodeMessage
public static byte[] encodeMessage(Message message) {
byte[] body = message.getBody(); int
bodyLen = body.length;
String properties = messageProperties2String(message.getProperties());
byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
propertiesLength = (short) propertiesBytes.length; int
sysFlag = message.getFlag();
int storeSize = 4 // 1 TOTALSIZE
+ 4 // 2 MAGICCOD
+ 4 // 3 BODYCRC
+ 4 // 4 FLAG
+ 4 + bodyLen // 4 BODY
+ 2 + propertiesLength;
ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
// 1 TOTALSIZE
byteBuffer.putInt(storeSize);
// 2 MAGICCODE
byteBuffer.putInt(0);
// 3 BODYCRC
byteBuffer.putInt(0);
// 4 FLAG
int flag = message.getFlag();
byteBuffer.putInt(flag);
// 5 BODY
byteBuffer.putInt(bodyLen);
byteBuffer.put(body);
// 6 properties
byteBuffer.putShort(propertiesLength);
byteBuffer.put(propertiesBytes);
return byteBuffer.array();
}