一、消息中間件對比
| kafka | RocketMQ | RabbitMQ | ||
|---|---|---|---|---|
| 定位 | 設(shè)計定位 | 系統(tǒng)間的數(shù)據(jù)流管道,實時數(shù)據(jù)處理。例如常規(guī)的消息系統(tǒng)、監(jiān)控數(shù)據(jù)、日志收集 | 可靠的消息傳輸,例如消息推送 | 可靠的消息傳輸,與RocketMQ類似。 |
| 開發(fā)語言 | Scala | Java | Erlang | |
| 客戶端語言 | Java,Python,C | Java | Java,Python,C | |
| 注冊中心 | Zookeeper | namespace | 無 | |
| 選舉方式 | 自動選舉 | 不支持自動選舉 | 無 | |
| 數(shù)據(jù)可靠性 | 很好。支持同步刷盤,同步復(fù)制,但性能差。 | 很好,支持同、異步刷盤,同步雙寫,異步復(fù)制 | 好 | |
| 消息寫入性能 | 非常好,每條10個字符測試:百萬條/s,Topic數(shù)量60個左右后性能會下降 | 很好,每條10個字符測試:單機單broker 7w/s,單機3broker 12w/s,Topic數(shù)量支持5W條左右 | 好,2W/s左右 | |
| 性能穩(wěn)定性 | 隊列、分區(qū)多的時候性能不穩(wěn)定,明顯下降,消息堆積時性能穩(wěn)定 | 隊列多的時候,消息堆積時性能穩(wěn)定 | 消息堆積時性能不穩(wěn)定 | |
| 消息堆積能力 | 非常好 | 非常好 | 一般 | |
| 消息獲取 | pull | pull,push | pull,push | |
| 順序消費 | 支持 | 支持,局部有序 | 支持 | |
| 定時消息 | 支持不好 | 支持,開源只支持指定級別的延遲 | 支持不好 | |
| 事務(wù)消息 | 不支持 | 支持 | 不支持 | |
| 消息查詢 | 不支持 | 支持 | 不支持 |
二、RocketMQ架構(gòu)分布圖

Apache RocketMQ是一個分布式消息傳遞和流媒體平臺,具有低延遲,高性能和可靠性, 萬億級容量和靈活的可伸縮性。 它由四個部分組成:nameserver,broker,生產(chǎn)者和使用者。 它們中的每一個都可以水平擴展,而沒有單個故障點。
- nameserver:提供輕量級的服務(wù)發(fā)現(xiàn)和路由。 每個名稱服務(wù)器記錄完整的路由信息,提供 相應(yīng)的讀寫服務(wù),并支持快速的存儲擴展。 注意:nameserver集群中每個nameserver都是相互獨立的,與zookeeper不同,nameserver節(jié)點間沒有通訊,也沒有主從、選舉概念。
- Broker:通過提供輕量級的TOPIC和QUEUE機制來存儲消息,把自身信息注冊到每個nameserver中。
- 生產(chǎn)者:本地隨機從nameserver中維護broker的信息,并與每個master broker有心跳通訊。
- 消費者:本地隨機從nameserver中維護broker的信息,并與每個master broker有心跳通訊。
三、RocketMQ環(huán)境
環(huán)境變量
#java環(huán)境
export JAVA_HOME=/usr/local/jdk
export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export PATH=$JAVA_HOME/bin:$PATH
#rocketmq環(huán)境
export ROCKETMQ_HOME=/usr/local/rocketmq/rocketmq‐all‐4.1.0‐incubating
export PATH=$ROCKETMQ_HOME/bin:$PATH
broker配置
#rocketmq‐name服務(wù)地址,多個地址用;分開,不配置默認為localhost:9876
namesrvAddr = 192.168.241.198:9876
brokerClusterName = DefaultCluster
brokerName = broker‐a
brokerId = 0
deleteWhen = 04
fileReservedTime = 48
#主從角色SYNC_MASTER,ASYNC_MASTER,SLAVE
brokerRole = SYNC_MASTER
flushDiskType = ASYNC_FLUSH
#允許自動創(chuàng)建主題topic
autoCreateTopicEnable=true
#broker監(jiān)聽端口
listenPort=10911
#數(shù)據(jù)存儲位置
storePathRootDir=/root/rocketmq/store
- brokerClusterName:所屬集群名稱,如果節(jié)點較多可以配置多個。
- brokerName:brokerName為子集群的名稱,子集群中有一個Master與多個Slave,子集群下所有節(jié)點的brokerName必須一樣,brokerId不一樣,默認brokerId = 0的為Master節(jié)點,大于0的為Salve節(jié)點。
- namesrvAddr:注冊中心連接開放端口,可以配置多個,用分號分隔。
- deleteWhen:刪除數(shù)據(jù)的時間,04代表凌晨4點,fileReservedTime為數(shù)據(jù)保存在磁盤的時長,單位小時。
- brokerRole:Master節(jié)點與Slave節(jié)點間的同步方式,有三個值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和異步表示Master和Slave之間同步數(shù)據(jù)的機制,其中Slave一致性使用SLAVE;
- flushDiskType:刷盤策略,取值為:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盤和異步刷盤;SYNC_FLUSH消 息寫入磁盤后才返回成功狀態(tài),ASYNC_FLUSH不需要;
- autoCreateTopicEnable:自動新建topic,默認為false。
- listenPort:啟動監(jiān)聽的端口號。
- storePathRootDir:磁盤存儲消息的根目錄。
內(nèi)存的設(shè)置
rocketmq集群內(nèi)存的設(shè)置是針對注冊中心namesrv與broker內(nèi)存的設(shè)置,分別設(shè)置rocketmq bin目錄下的runserver.sh與runbroker.sh(或者runserver.cmd與runbroker.cmd)。
- runserver配置
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
- runbroker配置
JAVA_OPT="${JAVA_OPT} ‐server ‐Xms256m ‐Xmx256m ‐Xmn128m ‐XX:MetaspaceSi ze=64m ‐XX:MaxMetaspaceSize=128m"
單機運行
#啟動注冊中心
nohup sh bin/mqnamesrv ‐n 192.168.241.198:9876
#啟動broker
nohup sh bin/mqbroker ‐n 192.168.241.198:9876 ‐c conf/broker.conf &
注意:啟動注冊中心或者broker的時候最好指定一下IP,防止在多網(wǎng)卡或者dockers的環(huán)境下,IP使用錯誤。
多機集群部署
在主目錄下的conf文件夾下提供了多種broker配置模式,分別有:2m-2s-async,2m-2s- sync,2m-noslave。若目前2臺機器,分別部署1個 NameServer,同時分別部署一個Master和一個Slave,互為主備。
注冊中心配置
namesrvAddr配置與在單機的環(huán)境下無異。broker配置
broker節(jié)點在集群中有主從之分,與單機環(huán)境下的配置差異性主要體現(xiàn)如下:
- master
#broker節(jié)點注冊到多個注冊中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#主節(jié)點
brokerId = 0
#SYNC_MASTER或者ASYNC_MASTER
brokerRole = SYNC_MASTER
- slave
#broker節(jié)點注冊到多個注冊中心
namesrvAddr = 192.168.241.198:9876;192.168.241.199:9876
#非0表示從節(jié)點唯一標志
brokerId = 1
#表明從節(jié)點
brokerRole = SLAVE
環(huán)境驗證
- 查看集群監(jiān)控狀態(tài)
sh mqadmin clusterlist ‐n 192.168.241.198:9876;192.168.241.199:9876
- 測試
export NAMESRV_ADDR=192.168.241.198:9876;192.168.241.199:9876
測試發(fā)送端
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
測試消費端
> sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
rocketmq console
進入rocketmq-externals項目的GitHub地址,如下圖,可看到RocketMQ項目的諸多擴展項目,其中就包含我們需要下載的rocketmq-console。
rocketmq-console是一個springboot項目,跑之前修改下配置。
四、基本概念
消息模型
RocketMQ主要由 Producer、Broker、Consumer 三部分組成,其中Producer 負責 生產(chǎn)消息,Consumer 負責消費消息,Broker 負責存儲消息。
消息對象
生產(chǎn)者(producer)
負責生產(chǎn)消息,一般由業(yè)務(wù)系統(tǒng)負責生產(chǎn)消息。一個消息生產(chǎn)者會把業(yè)務(wù)應(yīng)用系統(tǒng)里產(chǎn)生的消息發(fā)送到broker服務(wù)器。RocketMQ提供多種發(fā)送方式,同步發(fā)送、異步發(fā)送、順序發(fā)送、單向發(fā)送。同步和異步方式均需要Broker返回確認信息,單向發(fā)送不需要。消費者(Consumer)
負責消費消息,一般是后臺系統(tǒng)負責異步消費。一個消息消費者會從Broker服務(wù)器拉取消息、并將其提供給應(yīng)用程序。從用戶應(yīng)用的角度而言提供了兩種消費形式:拉取式消費 (pull consumer)、推動式消費(push consumer)。
主題(Topic)
表示一類消息的集合,每個主題包含若干條消息,每條消息只能屬于一個主題,是RocketMQ進行消息訂閱的基本單位。
代理服務(wù)器(Broker Server)
消息中轉(zhuǎn)角色,負責存儲消息、轉(zhuǎn)發(fā)消息。代理服務(wù)器在RocketMQ系統(tǒng)中負責接收 從生產(chǎn)者發(fā)送來的消息并存儲、同時為消費者的拉取請求作準備。代理服務(wù)器也存儲消息相 關(guān)的元數(shù)據(jù),包括消費者組、消費進度偏移和主題和隊列消息等。
注冊中心服務(wù)(Name Server)
注冊中心服務(wù)充當路由消息的提供者。生產(chǎn)者或消費者能夠通過名字服務(wù)查找各主題相應(yīng)的 Broker IP列表。多個Namesrv實例組成集群,但相互獨立,沒有信息交換。
組
生產(chǎn)者組(Producer Group)
同一類Producer的集合,這類Producer發(fā)送同一類消息且發(fā)送邏輯一致。如果發(fā)送的 是事物消息且原始生產(chǎn)者在發(fā)送之后崩潰,則Broker服務(wù)器會聯(lián)系同一生產(chǎn)者組的其他生產(chǎn)者實例以提交或回溯消費。消費者組(Consumer Group)
同一類Consumer的集合,這類Consumer通常消費同一類消息且消費邏輯一致。消費者組使得在消息消費方面,實現(xiàn)負載均衡和容錯的目標變得非常容易。要注意的是,消費者組的消費者實例必須訂閱完全相同的Topic。RocketMQ 支持兩種消息模式:集群消費 (Clustering)和廣播消費(Broadcasting)。
客戶端消費
拉取式消費(Pull Consumer)
Consumer消費的一種類型,應(yīng)用通常主動調(diào)用Consumer的拉消息方法從Broker服務(wù) 器拉消息、主動權(quán)由應(yīng)用控制。一旦獲取了批量消息,應(yīng)用就會啟動消費過程。推動式消費(Push Consumer)
Consumer消費的一種類型,該模式下Broker收到數(shù)據(jù)后會主動推送給消費端,該消費模式一般實時性較高。集群消費(Clustering)
集群消費模式下,相同Consumer Group的每個Consumer實例平均分攤消息。廣播消費(Broadcasting)
廣播消費模式下,相同Consumer Group的每個Consumer實例都接收全量的消息。
消息(Message)
消息系統(tǒng)所傳輸信息的物理載體,生產(chǎn)和消費數(shù)據(jù)的最小單位,每條消息必須屬于一個主題。RocketMQ中每個消息擁有唯一的Message ID,且可以攜帶具有業(yè)務(wù)標識的Key。 系統(tǒng)提供了通過Message ID和Key查詢消息的功能。
順序消息
普通順序消息(Normal Ordered Message)
普通順序消費模式下,消費者通過同一個消費隊列收到的消息是有順序的,不同消息隊列收到的消息則可能是無順序的。嚴格順序消息(Strictly Ordered Message)
嚴格順序消息模式下,消費者收到的所有消息均是有順序的。
標簽(Tag)
為消息設(shè)置的標志,用于同一主題下區(qū)分不同類型的消息。來自同一業(yè)務(wù)單元的消息, 可以根據(jù)不同業(yè)務(wù)目的在同一主題下設(shè)置不同標簽。標簽?zāi)軌蛴行У乇3执a的清晰度和連 貫性,并優(yōu)化RocketMQ提供的查詢系統(tǒng)。消費者可以根據(jù)Tag實現(xiàn)對不同子主題的不同消 費邏輯,實現(xiàn)更好的擴展性。

注意1:每個broker中都會有一個commitlog,由于記錄生產(chǎn)者發(fā)送的消息。
注意2:每個broker中有多個Topic,每個Topic中默認有4個queue隊列,每個queue對應(yīng)一個持久化文件。
注意3:每個broker中會對應(yīng)一個consumerOffset.json文件,用于記錄隊列消費的節(jié)點到哪了。
注意4:consumer、producer與broker間的通信基于Netty來實現(xiàn)的,默認為Netty中的epoll模式,若系統(tǒng)不支持epoll模式,才使用nio模式。
注意5:producer在發(fā)送消息的時候,會以輪循的方式放置于隊列中(比如圖上broker-master-1與broker-master-2共8個),若有順序消息的話,會保證所有順序消息放在同一個隊列中。
沒開始使用的broker內(nèi)部的文件。

已經(jīng)使用的broker內(nèi)部的文件。

config內(nèi)部結(jié)構(gòu)

store:存儲commitlog文件,每個broker對應(yīng)一個commitlog,commitlog中存儲的是topic真正的內(nèi)容數(shù)據(jù)。
index:索引。
consumequeue:存儲每個主題下的隊列,默認每個主題4個隊列,這邊存儲的主要是消息的tag、消息對應(yīng)在commitlog的地址、空間大小等,。
topic.json: 存儲所有topic的信息,主要為topic的屬性信息。
consumerOffset.json:消費者偏移量信息,對應(yīng)了每個主題@每個消費群組{隊列1:偏移量,隊列2:偏移量,隊列3:偏移量,隊列4:偏移量}
| 名稱 | 作用 |
|---|---|
| broker | broker模塊:c和p端消息存儲邏輯 |
| client | 客戶端api:produce、consumer端 接受與發(fā)送api |
| common | 公共組件:常量、基類、數(shù)據(jù)結(jié)構(gòu) |
| tools | 運維tools:命令行工具模塊 |
| store | 存儲模塊:消息、索引、commitlog存儲 |
| namesrv | 服務(wù)管理模塊:服務(wù)注冊topic等信息存儲 |
| remoting | 遠程通訊模塊:netty+fastjson |
| logappender | 日志適配模塊 |
| example | Demo列子 |
| filtersrv | 消息過濾器模塊 |
| srvutil | 輔助模塊 |
| filter | 過濾模塊:消息過濾模塊 |
| distribution | 部署、運維相關(guān)zip包中的代碼 |
| openmessaging | 兼容openmessaging分布式消息模塊 |
五、使用
1. 同步、異步、一次性
生產(chǎn)者
同步
public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException, MQBrokerException {
//Instantiate with a producer group name.
DefaultMQProducer producer = new DefaultMQProducer("tl_msg_student_group");
producer.setNamesrvAddr("192.168.241.198:9876");
//producer.setSendMsgTimeout(10000);
producer.start();
Message msg = new Message("TopicStudent" ,
"TagStudent" ,
"tag" ,
("Hello tuling RocketMQ ").getBytes(RemotingHelper.DEFAULT_CHARSET)
);
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
producer.shutdown();
}
異步
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
//設(shè)置發(fā)送失敗重試機制
producer.setRetryTimesWhenSendAsyncFailed(5);
int messageCount = 1;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
Message msg = new Message("TopicTest",
"TagSendOne",
"OrderID188",
"I m sending msg content is yangguo".getBytes(RemotingHelper.DEFAULT_CHARSET));
//消息發(fā)送成功后,執(zhí)行回調(diào)函數(shù)
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
countDownLatch.countDown();
System.out.printf("%-10d OK %s %n", index,
sendResult.getMsgId());
}
@Override
public void onException(Throwable e) {
countDownLatch.countDown();
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
//防止回調(diào)未回,producer就已經(jīng)刪除
countDownLatch.await(5, TimeUnit.SECONDS);
producer.shutdown();
}
一次性
DefaultMQProducer producer = new DefaultMQProducer("tl_message_group");
// Specify name server addresses.
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.setSendMsgTimeout(10000);
producer.start();
for (int i = 0; i < 1; i++) {
Message msg = new Message("TopicTest" /* Topic */,
"TagSendOne" /* Tag */,
"OrderID198",
("Hello RocketMQ test i " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
producer.sendOneway(msg);
}
//Shut down once the producer instance is not longer in use.
producer.shutdown();
消費者
// tl_msg_student_group
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tl_student_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
// Subscribe one more more topics to consume.
consumer.subscribe("TopicStudent", "*");
// Register callback to execute on arrival of messages fetched from brokers.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println(new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
2. 廣播消息
生產(chǎn)者
DefaultMQProducer producer = new DefaultMQProducer("consumer_model_group");
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
for (int i = 0; i < 4; i++){
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
("Hello world"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
producer.shutdown();
消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_model_group");
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
//廣播,全量消費
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt ext : msgs){
System.out.printf(Thread.currentThread().getName() + " Receive New Message: " + new String(ext.getBody()) + "%n");
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Broadcast Consumer Started.%n");
3. 批量消息
批量發(fā)送消息能顯著提高傳遞小消息的性能。限制是這些批量消息應(yīng)該有相同的 topic,相同的waitStoreMsgOK,而且不能是延時消息。此外,這一批消息的總大小不應(yīng) 超過4MB。rocketmq建議每次批量消息大小大概在1MB。 當消息大小超過4MB時,需要將消息進行分割。
生產(chǎn)者
public class ListSplitter implements Iterator<List<Message>> {
private final int SIZE_LIMIT = 1000 * 1000 * 1;//1MB
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages) {
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<Message> next() {
int nextIndex = currIndex;
int totalSize = 0;
//遍歷消息準備拆分
for (; nextIndex < messages.size(); nextIndex++) {
Message message = messages.get(nextIndex);
int tmpSize = message.getTopic().length() + message.getBody().length;
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet()) {
tmpSize += entry.getKey().length() + entry.getValue().length();
}
tmpSize = tmpSize + 20; //for log overhead
if (tmpSize > SIZE_LIMIT) {
if (nextIndex - currIndex == 0) {
nextIndex++;
}
break;
}
if (tmpSize + totalSize > SIZE_LIMIT) {
break;
} else {
totalSize += tmpSize;
}
}
List<Message> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
/**
* rocketMq 支持消息批量發(fā)送
* 同一批次的消息應(yīng)具有:相同的主題,相同的waitStoreMsgOK,并且不支持定時任務(wù)。
* <strong> 同一批次消息建議大小不超過~1M </strong>,消息最大不能超過4M,需要
* 對msg進行拆分
*/
DefaultMQProducer producer = new DefaultMQProducer("batch_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
ListSplitter splitter = new ListSplitter(messages);
/**
* 對批量消息進行拆分
*/
while (splitter.hasNext()) {
try {
List<Message> listItem = splitter.next();
producer.send(listItem);
} catch (Exception e) {
e.printStackTrace();
}
}
producer.shutdown();
消費者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");
// ;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.subscribe("BatchTest", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
System.out.println("queueId=" + msg.getQueueId() + "," + new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
4. 過濾消息
RocketMq在消息過濾這塊做得很強大,它可以通過Tag過濾消息,可以通過SQL表達式篩選消息,它也可以支持java腳本過濾。
其中通過SQL表達式篩選 和 java腳本過濾 需要在broker的配置文件中把對應(yīng)的配置打開。
enablePropertyFilter=true
Topic 與 Tag 都是業(yè)務(wù)上用來歸類的標識,區(qū)分在于 Topic 是一級分類,而 Tag 可以說是二級分類,關(guān)系如圖所示。

生產(chǎn)者
/***
* TAG-FILTER-1000 ---> 布隆過濾器
* 過濾掉的那些消息。直接就跳過了么。下次就不會繼續(xù)過濾這些了。是么。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("filter_sample_group");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
for (int i = 0; i < 3; i++) {
Message msg = new Message("TopicFilter",
"TAG-FILTER",
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
msg.putUserProperty("a",String.valueOf(i));
if(i % 2 == 0){
msg.putUserProperty("b","yangguo");
}else{
msg.putUserProperty("b","xiaolong girl");
}
producer.send(msg);
}
producer.shutdown();
}
消費者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("filter_sample_group");
/**
* 注冊中心
*/
consumer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
/**
* 訂閱主題
* 一種資源去換取另外一種資源
*/
consumer.subscribe("TopicFilter", MessageSelector.bySql("a between 0 and 3 and b = 'yangguo'"));
/**
* 注冊監(jiān)聽器,監(jiān)聽主題消息
*/
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs){
try {
System.out.println("consumeThread=" + Thread.currentThread().getName()
+ ", queueId=" + msg.getQueueId() + ", content:"
+ new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.printf("Filter Consumer Started.%n");
}
5. 延遲消息
定時消息是指消息發(fā)到 Broker 后,不能立刻被 Consumer 消費,要到特定的時間點 或者等待特定的時間后才能被消費。
使用場景:如電商里,提交了一個訂單就可以發(fā)送一個延時消息,1h后去檢查這個訂單的 狀態(tài),如果還是未付款就取消訂單釋放庫存。
當前支持的延遲時間
1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
分別對應(yīng)級別
1 2 3....................
生產(chǎn)者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ExampleConsumer");
//;192.168.241.199:9876
producer.setNamesrvAddr("192.168.241.198:9876;192.168.241.199:9876");
producer.start();
int totalMessagesToSend = 3;
for (int i = 0; i < totalMessagesToSend; i++) {
Message message = new Message("TestTopic", ("Hello scheduled message " + i).getBytes());
//延時消費
message.setDelayTimeLevel(6);
// Send the message
producer.send(message);
}
System.out.printf("message send is completed .%n");
producer.shutdown();
}
消費者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ExampleConsumer");
//;192.168.241.199:9876
consumer.setNamesrvAddr("192.168.241.198:9876");
consumer.subscribe("TestTopic", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> messages, ConsumeConcurrentlyContext context) {
for (MessageExt message : messages) {
// Print approximate delay time period
System.out.println("Receive message[msgId=" + message.getMsgId() + "] "
+ "message content is :" + new String(message.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
//System.out.printf("Consumer Started.%n");
}
注意1:延遲消息發(fā)送到broker的時候,broker會專門新建一個中轉(zhuǎn)主題SCHEDULED_TOPIC_XXXX來存放消息,目前開原版只支持18個級別,相當于中轉(zhuǎn)主題下存在18個隊列文件分別存儲這18個級別。同時broker后臺開啟個線程,只要延遲消息的時間到了,才會把延遲消息放置于真正的topic下。
注意2:開源版下的延遲消息并不適合高并發(fā)的延遲消息,若業(yè)務(wù)存在高并發(fā)的延遲消息,需要考慮使用商業(yè)版的RocketMQ。
注意3:客戶端集群消息的消費來源于pullRequestQueue,pullRequestQueue中的消息來源在于客戶端中存在一個線程從broker中主動pull。
注意4:客戶端從namesrv同步信息周期30s,客戶端與broker心跳周期30s,客戶端心跳消費偏移量同步周期5s。
注意5:客戶端執(zhí)行失敗的消息,客戶端會發(fā)回到broker中,broker端會新建一個RETRY_TOPIC_XXXX來存儲,大概10S后會再次發(fā)給客戶端消費,默認16次。
6. 順序消息
生產(chǎn)者
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("ordered_group_name");
producer.setNamesrvAddr("192.168.241.198:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 訂單列表
List<OrderStep> orderList = buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加個時間前綴
String body = dateStr + " Hello RocketMQ "+ i + " " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i,
body.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Long id = (Long) arg; //根據(jù)訂單id選擇發(fā)送queue
long index = id % mqs.size();
return mqs.get((int) index);
}
}, orderList.get(i).getOrderId());//訂單id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
消費者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ordered_group_name");
consumer.setNamesrvAddr("192.168.241.198:9876");
/**
* 設(shè)置消費位置
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "*");
consumer.registerMessageListener(new MessageListenerOrderly() {
AtomicLong consumeTimes = new AtomicLong(0);
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,ConsumeOrderlyContext context) {
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 可以看到每個queue有唯一的consume來消費, 訂單對每個queue(分區(qū))有序
try {
System.out.println("consumeThread=" + Thread.currentThread().getName() + ", queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.printf("Consumer Started.%n");
}
7. 事務(wù)消息
半事務(wù)消息:暫不能投遞的消息,發(fā)送方已經(jīng)成功地將消息發(fā)送到了消息隊列 MQ 服務(wù)端,但是服務(wù)端未收到生產(chǎn)者對該消息的二次確認,此時該消息被標記。半事務(wù)消息會單獨存儲在HALF_TOPIC中。
消息回查:由于網(wǎng)絡(luò)閃斷、生產(chǎn)者應(yīng)用重啟等原因,導(dǎo)致某條事務(wù)消息的二次確 認丟失,消息隊列 MQ 服務(wù)端通過掃描發(fā)現(xiàn)某條消息長期處于“半事務(wù)消息”時,需要 主動向消息生產(chǎn)者詢問該消息的最終狀態(tài)(Commit 或是 Rollback),該詢問過程即 消息回查。
注意:事務(wù)消息中的實現(xiàn)在于product端與broker端是雙向通信的,互為客戶端和服務(wù)端

生產(chǎn)者
private void testTransaction() throws MessagingException {
String[] tags = new String[]{"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
try {
Message msg = MessageBuilder.withPayload("Hello RocketMQ " + i).
setHeader(RocketMQHeaders.KEYS, "KEY_" + i).build();
/**
* TX_PGROUP_NAME 必須同 {@link TransactionListenerImpl} 類的注解 txProducerGroup
* @RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
*/
SendResult sendResult = rocketMQTemplate.sendMessageInTransaction(TX_PGROUP_NAME,
springTransTopic + ":" + tags[i % tags.length], msg, null);
System.out.printf("------ send Transactional msg body = %s , sendResult=%s %n",
msg.getPayload(), sendResult.getSendStatus());
Thread.sleep(10);
} catch (Exception e) {
e.printStackTrace();
}
}
}
監(jiān)聽
@RocketMQTransactionListener(txProducerGroup = "myTxProducerGroup")
public class TransactionListenerImpl implements RocketMQLocalTransactionListener {
private AtomicInteger transactionIndex = new AtomicInteger(0);
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<String, Integer>();
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
System.out.printf("#### executeLocalTransaction is executed, msgTransactionId=%s %n",
transId);
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(transId, status);
if (status == 0) {
// 事務(wù)提交
System.out.printf(" # COMMIT # Simulating msg %s related local transaction exec succeeded! ### %n", msg.getPayload());
return RocketMQLocalTransactionState.COMMIT;
}
if (status == 1) {
// 本地事務(wù)回滾
System.out.printf(" # ROLLBACK # Simulating %s related local transaction exec failed! %n", msg.getPayload());
return RocketMQLocalTransactionState.ROLLBACK;
}
// 事務(wù)狀態(tài)不確定,待Broker發(fā)起 ASK 回查本地事務(wù)狀態(tài)
System.out.printf(" # UNKNOW # Simulating %s related local transaction exec UNKNOWN! \n");
return RocketMQLocalTransactionState.UNKNOWN;
}
/**
* 在{@link TransactionListenerImpl#executeLocalTransaction(org.springframework.messaging.Message, java.lang.Object)}
* 中執(zhí)行本地事務(wù)時可能失敗,或者異步提交,導(dǎo)致事務(wù)狀態(tài)暫時不能確定,broker在一定時間后
* 將會發(fā)起重試,broker會向producer-group發(fā)起ask回查,
* 這里producer->相當于server端,broker相當于client端,所以由此可以看出broker&producer-group是
* 雙向通信的。
* @param msg
* @return
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String transId = (String)msg.getHeaders().get(RocketMQHeaders.PREFIX + RocketMQHeaders.TRANSACTION_ID);
RocketMQLocalTransactionState retState = RocketMQLocalTransactionState.COMMIT;
Integer status = localTrans.get(transId);
if (null != status) {
switch (status) {
case 0:
retState = RocketMQLocalTransactionState.UNKNOWN;
break;
case 1:
retState = RocketMQLocalTransactionState.COMMIT;
break;
case 2:
retState = RocketMQLocalTransactionState.ROLLBACK;
break;
}
}
System.out.printf("------ !!! checkLocalTransaction is executed once," +
" msgTransactionId=%s, TransactionState=%s status=%s %n",
transId, retState, status);
return retState;
}
}
七、消息存儲整體架構(gòu)

消息存儲架構(gòu)圖中主要有下面三個跟消息存儲相關(guān)的文件構(gòu)成。
- CommitLog
消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內(nèi)容, 消息內(nèi)容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為 1G=1073741824;當?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824,起始 偏移量為1073741824,以此類推。消息主要是順序?qū)懭肴罩疚募斘募M了,寫入下一 個文件;
- ConsumeQueue
消息消費隊列,引入的目的主要是提高消息消費的性能,由于 RocketMQ是基于主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷 commitlog文件中根據(jù)topic檢索消息是非常低效的。Consumer即可根據(jù) ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset, 消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的 commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層 組織結(jié)構(gòu),具體存儲路徑為: $HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文 件采取定長設(shè)計,每一個條目共20個字節(jié),分別為8字節(jié)的commitlog物理偏移量、4字節(jié) 的消息長度、8字節(jié)tag hashcode,單個文件由30W個條目組成,可以像數(shù)組一樣隨機訪 問每一個條目,每個ConsumeQueue文件大小約5.72M;
- IndexFile
IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方 法。Index文件的存儲位置是:
{fileName},文件名fileName是以 創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大?。?0+500Wx4+2000Wx20= 420000040個字節(jié)大小,約為400M,一個IndexFile可以保存 2000W個索引,IndexFile 的底層存儲設(shè)計為在文件系統(tǒng)中實現(xiàn)HashMap結(jié)構(gòu),故rocketmq的索引文件其底層實現(xiàn) 為hash索引。
零拷貝刷盤
以文件下載為例,服務(wù)端的主要任務(wù)是:將服務(wù)端主機磁盤中的文件不做修改地從已連接的socket發(fā)出去。操作系統(tǒng)底層I/O過程如下圖所示:

過程共產(chǎn)生了四次數(shù)據(jù)拷貝,在此過程中,我們沒有對文件內(nèi)容做任何修改,那么在內(nèi)核空 間和用戶空間來回拷貝數(shù)據(jù)無疑就是一種浪費,而零拷貝主要就是為了解決這種低效性。
什么是零拷貝技術(shù)?
零拷貝主要的任務(wù)就是避免CPU將數(shù)據(jù)從一塊存儲拷貝到另外一塊存儲,主要就是利用各種零拷貝技術(shù),避免讓CPU做大量的數(shù)據(jù)拷貝任務(wù),減少不必要的拷貝,或者讓別的組件 來做這一類簡單的數(shù)據(jù)傳輸任務(wù),讓CPU解脫出來專注于別的任務(wù)。這樣就可以讓系統(tǒng)資源的利用更加有效。
原理是磁盤上的數(shù)據(jù)會通過DMA被拷貝的內(nèi)核緩沖區(qū),接著操作系統(tǒng)會把這段內(nèi)核緩沖 區(qū)與應(yīng)用程序共享,這樣就不需要把內(nèi)核緩沖區(qū)的內(nèi)容往用戶空間拷貝。應(yīng)用程序再調(diào)用 write(),操作系統(tǒng)直接將內(nèi)核緩沖區(qū)的內(nèi)容拷貝到socket緩沖區(qū)中,這一切都發(fā)生在內(nèi)核 態(tài),最后,socket緩沖區(qū)再把數(shù)據(jù)發(fā)到網(wǎng)卡去。

注意:連續(xù)的磁盤空間才不用經(jīng)過用戶空間的整合,而直接實現(xiàn)頁緩存與socket緩沖區(qū)的共享,從而減少了內(nèi)核空間到用戶空間狀態(tài)的轉(zhuǎn)換,并且減少了2次內(nèi)核空間與用戶空間復(fù)制操作,進而提高了整個系統(tǒng)的性能。這就是rocketmq開辟磁盤空間的時候為什么選擇直接開啟足夠大的磁盤空間文件進行存儲消息的原因(CommitLog IndexFile)。