簡介
RocketMQ 特點(diǎn)
RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給 Apache 軟件基金會,并于2017年9月25日成為 Apache 的頂級項目。作為經(jīng)歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩(wěn)定出色表現(xiàn)的國產(chǎn)中間件,以其高性能、低延時和高可靠等特性近年來已經(jīng)也被越來越多的國內(nèi)企業(yè)使用。其主要特點(diǎn)有:
靈活可擴(kuò)展性
RocketMQ 天然支持集群,其核心四組件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點(diǎn)故障的情況下進(jìn)行水平擴(kuò)展。海量消息堆積能力
RocketMQ 采用零拷貝原理實(shí)現(xiàn)超大的消息的堆積能力,據(jù)說單機(jī)已可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲。支持順序消息
可以保證消息消費(fèi)者按照消息發(fā)送的順序?qū)ο⑦M(jìn)行消費(fèi)。順序消息分為全局有序和局部有序,一般推薦使用局部有序,即生產(chǎn)者通過將某一類消息按順序發(fā)送至同一個隊列來實(shí)現(xiàn)。多種消息過濾方式
消息過濾分為在服務(wù)器端過濾和在消費(fèi)端過濾。服務(wù)器端過濾時可以按照消息消費(fèi)者的要求做過濾,優(yōu)點(diǎn)是減少不必要消息傳輸,缺點(diǎn)是增加了消息服務(wù)器的負(fù)擔(dān),實(shí)現(xiàn)相對復(fù)雜。消費(fèi)端過濾則完全由具體應(yīng)用自定義實(shí)現(xiàn),這種方式更加靈活,缺點(diǎn)是很多無用的消息會傳輸給消息消費(fèi)者。支持事務(wù)消息
RocketMQ 除了支持普通消息,順序消息之外還支持事務(wù)消息,這個特性對于分布式事務(wù)來說提供了又一種解決思路。回溯消費(fèi)
回溯消費(fèi)是指消費(fèi)者已經(jīng)消費(fèi)成功的消息,由于業(yè)務(wù)上需求需要重新消費(fèi),RocketMQ 支持按照時間回溯消費(fèi),時間維度精確到毫秒,可以向前回溯,也可以向后回溯。
基本概念
下面是一張 RocketMQ 的部署結(jié)構(gòu)圖,里面涉及了 RocketMQ 核心的四大組件:Name Server、Broker、Producer、Consumer ,每個組件都可以部署成集群模式進(jìn)行水平擴(kuò)展。
生產(chǎn)者
生產(chǎn)者(Producer)負(fù)責(zé)產(chǎn)生消息,生產(chǎn)者向消息服務(wù)器發(fā)送由業(yè)務(wù)應(yīng)用程序系統(tǒng)生成的消息。 RocketMQ 提供了三種方式發(fā)送消息:同步、異步和單向。
同步發(fā)送
同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營銷短信。
異步發(fā)送
異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包,一般用于可能鏈路耗時較長而對響應(yīng)時間敏感的業(yè)務(wù)場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務(wù)。
單向發(fā)送
單向發(fā)送是指只負(fù)責(zé)發(fā)送消息而不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集。
生產(chǎn)者組
生產(chǎn)者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常發(fā)送一類消息并且發(fā)送邏輯一致,所以將這些 Producer 分組在一起。從部署結(jié)構(gòu)上看生產(chǎn)者通過 Producer Group 的名字來標(biāo)記自己是一個集群。
消費(fèi)者
消費(fèi)者(Consumer)負(fù)責(zé)消費(fèi)消息,消費(fèi)者從消息服務(wù)器拉取信息并將其輸入用戶應(yīng)用程序。站在用戶應(yīng)用的角度消費(fèi)者有兩種類型:拉取型消費(fèi)者、推送型消費(fèi)者。
拉取型消費(fèi)者
拉取型消費(fèi)者(Pull Consumer)主動從消息服務(wù)器拉取信息,只要批量拉取到消息,用戶應(yīng)用就會啟動消費(fèi)過程,所以 Pull 稱為主動消費(fèi)型。
推送型消費(fèi)者
推送型消費(fèi)者(Push Consumer)封裝了消息的拉取、消費(fèi)進(jìn)度和其他的內(nèi)部維護(hù)工作,將消息到達(dá)時執(zhí)行的回調(diào)接口留給用戶應(yīng)用程序來實(shí)現(xiàn)。所以 Push 稱為被動消費(fèi)類型,但從實(shí)現(xiàn)上看還是從消息服務(wù)器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費(fèi)監(jiān)聽器,當(dāng)監(jiān)聽器處觸發(fā)后才開始消費(fèi)消息。
消費(fèi)者組
消費(fèi)者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 通常消費(fèi)同一類消息并且消費(fèi)邏輯一致,所以將這些 Consumer 分組在一起。消費(fèi)者組與生產(chǎn)者組類似,都是將相同角色的分組在一起并命名,分組是個很精妙的概念設(shè)計,RocketMQ 正是通過這種分組機(jī)制,實(shí)現(xiàn)了天然的消息負(fù)載均衡。消費(fèi)消息時通過 Consumer Group 實(shí)現(xiàn)了將消息分發(fā)到多個消費(fèi)者服務(wù)器實(shí)例,比如某個 Topic 有9條消息,其中一個 Consumer Group 有3個實(shí)例(3個進(jìn)程或3臺機(jī)器),那么每個實(shí)例將均攤3條消息,這也意味著我們可以很方便的通過加機(jī)器來實(shí)現(xiàn)水平擴(kuò)展。
消息服務(wù)器
消息服務(wù)器(Broker)是消息存儲中心,主要作用是接收來自 Producer 的消息并存儲, Consumer 從這里取得消息。它還存儲與消息相關(guān)的元數(shù)據(jù),包括用戶組、消費(fèi)進(jìn)度偏移量、隊列信息等。從部署結(jié)構(gòu)圖中可以看出 Broker 有 Master 和 Slave 兩種類型,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結(jié)構(gòu)上看 Broker 的集群部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。
單 Master
這種方式一旦 Broker 重啟或宕機(jī)會導(dǎo)致整個服務(wù)不可用,這種方式風(fēng)險較大,所以顯然不建議線上環(huán)境使用。
多 Master
所有消息服務(wù)器都是 Master ,沒有 Slave 。這種方式優(yōu)點(diǎn)是配置簡單,單個 Master 宕機(jī)或重啟維護(hù)對應(yīng)用無影響。缺點(diǎn)是單臺機(jī)器宕機(jī)期間,該機(jī)器上未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可訂閱,消息實(shí)時性會受影響。
多 Master 多 Slave(異步復(fù)制)
每個 Master 配置一個 Slave,所以有多對 Master-Slave,消息采用異步復(fù)制方式,主備之間有毫秒級消息延遲。這種方式優(yōu)點(diǎn)是消息丟失的非常少,且消息實(shí)時性不會受影響,Master 宕機(jī)后消費(fèi)者可以繼續(xù)從 Slave 消費(fèi),中間的過程對用戶應(yīng)用程序透明,不需要人工干預(yù),性能同多 Master 方式幾乎一樣。缺點(diǎn)是 Master 宕機(jī)時在磁盤損壞情況下會丟失極少量消息。
多 Master 多 Slave(同步雙寫)
每個 Master 配置一個 Slave,所以有多對 Master-Slave ,消息采用同步雙寫方式,主備都寫成功才返回成功。這種方式優(yōu)點(diǎn)是數(shù)據(jù)與服務(wù)都沒有單點(diǎn)問題,Master 宕機(jī)時消息無延遲,服務(wù)與數(shù)據(jù)的可用性非常高。缺點(diǎn)是性能相對異步復(fù)制方式略低,發(fā)送消息的延遲會略高。
名稱服務(wù)器
名稱服務(wù)器(NameServer)用來保存 Broker 相關(guān)元信息并給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設(shè)計成幾乎無狀態(tài)的,可以橫向擴(kuò)展,節(jié)點(diǎn)之間相互之間無通信,通過部署多臺機(jī)器來標(biāo)記自己是一個偽集群。每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發(fā)送消息前會根據(jù) Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。所以從功能上看應(yīng)該是和 ZooKeeper 差不多,據(jù)說 RocketMQ 的早期版本確實(shí)是使用的 ZooKeeper ,后來改為了自己實(shí)現(xiàn)的 NameServer 。
消息
消息(Message)就是要傳輸?shù)男畔?。一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條消息也可以擁有一個可選的標(biāo)簽(Tag)和額處的鍵值對,它們可以用于設(shè)置一個業(yè)務(wù) key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。
主題
主題(Topic)可以看做消息的規(guī)類,它是消息的第一級類型。比如一個電商系統(tǒng)可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個 Topic 可以有0個、1個、多個生產(chǎn)者向其發(fā)送消息,一個生產(chǎn)者也可以同時向不同的 Topic 發(fā)送消息。一個 Topic 也可以被 0個、1個、多個消費(fèi)者訂閱。
標(biāo)簽
標(biāo)簽(Tag)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標(biāo)識。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag 。標(biāo)簽有助于保持您的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。
消息隊列
消息隊列(Message Queue),主題被劃分為一個或多個子主題,即消息隊列。一個 Topic 下可以設(shè)置多個消息隊列,發(fā)送消息時執(zhí)行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發(fā)出去。下圖 Broker 內(nèi)部消息情況:
消息消費(fèi)模式
消息消費(fèi)模式有兩種:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。默認(rèn)情況下就是集群消費(fèi),該模式下一個消費(fèi)者集群共同消費(fèi)一個主題的多個隊列,一個隊列只會被一個消費(fèi)者消費(fèi),如果某個消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。而廣播消費(fèi)消息會發(fā)給消費(fèi)者組中的每一個消費(fèi)者進(jìn)行消費(fèi)。
消息順序
消息順序(Message Order)有兩種:順序消費(fèi)(Orderly)和并行消費(fèi)(Concurrently)。順序消費(fèi)表示消息消費(fèi)的順序同生產(chǎn)者為每個消息隊列發(fā)送的順序一致,所以如果正在處理全局順序是強(qiáng)制性的場景,需要確保使用的主題只有一個消息隊列。并行消費(fèi)不再保證消息順序,消費(fèi)的最大并行數(shù)量受每個消費(fèi)者客戶端指定的線程池限制。
工程實(shí)例
Java 訪問 RocketMQ 實(shí)例
RocketMQ 目前支持 Java、C++、Go 三種語言訪問,按慣例以 Java 語言為例看下如何用 RocketMQ 來收發(fā)消息的。
引入依賴
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
添加 RocketMQ 客戶端訪問支持,具體版本和安裝的 RocketMQ 版本一致即可。
消息生產(chǎn)者
package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class Producer {
public static void main(String[] args) throws Exception {
//創(chuàng)建一個消息生產(chǎn)者,并設(shè)置一個消息生產(chǎn)者組
DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");
//指定 NameServer 地址
producer.setNamesrvAddr("localhost:9876");
//初始化 Producer,整個應(yīng)用生命周期內(nèi)只需要初始化一次
producer.start();
for (int i = 0; i < 100; i++) {
//創(chuàng)建一條消息對象,指定其主題、標(biāo)簽和消息內(nèi)容
Message msg = new Message(
"topic_example_java" /* 消息主題名 */,
"TagA" /* 消息標(biāo)簽 */,
("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內(nèi)容 */
);
//發(fā)送消息并返回結(jié)果
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n", sendResult);
}
// 一旦生產(chǎn)者實(shí)例不再被使用則將其關(guān)閉,包括清理資源,關(guān)閉網(wǎng)絡(luò)連接等
producer.shutdown();
}
}
示例中用 DefaultMQProducer 類來創(chuàng)建一個消息生產(chǎn)者,通常一個應(yīng)用創(chuàng)建一個 DefaultMQProducer 對象,所以一般由應(yīng)用來維護(hù)生產(chǎn)者對象,可以其設(shè)置為全局對象或者單例。該類構(gòu)造函數(shù)入?yún)?producerGroup 是消息生產(chǎn)者組的名字,無論生產(chǎn)者還是消費(fèi)者都必須給出 GroupName ,并保證該名字的唯一性,ProducerGroup 發(fā)送普通的消息時作用不大,后面介紹分布式事務(wù)消息時會用到。
接下來指定 NameServer 地址和調(diào)用 start 方法初始化,在整個應(yīng)用生命周期內(nèi)只需要調(diào)用一次 start 方法。
初始化完成后,調(diào)用 send 方法發(fā)送消息,示例中只是簡單的構(gòu)造了100條同樣的消息發(fā)送,其實(shí)一個 Producer 對象可以發(fā)送多個主題多個標(biāo)簽的消息,消息對象的標(biāo)簽可以為空。send 方法是同步調(diào)用,只要不拋異常就標(biāo)識成功。
最后應(yīng)用退出時調(diào)用 shutdown 方法清理資源、關(guān)閉網(wǎng)絡(luò)連接,從服務(wù)器上注銷自己,通常建議應(yīng)用在 JBOSS、Tomcat 等容器的退出鉤子里調(diào)用 shutdown 方法。
消息消費(fèi)者
package org.study.mq.rocketMQ.java;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;
public class Consumer {
public static void main(String[] args) throws Exception {
//創(chuàng)建一個消息消費(fèi)者,并設(shè)置一個消息消費(fèi)者組
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
//指定 NameServer 地址
consumer.setNamesrvAddr("localhost:9876");
//設(shè)置 Consumer 第一次啟動時從隊列頭部開始消費(fèi)還是隊列尾部開始消費(fèi)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱指定 Topic 下的所有消息
consumer.subscribe("topic_example_java", "*");
//注冊消息監(jiān)聽器
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
//默認(rèn) list 里只有一條消息,可以通過設(shè)置參數(shù)來批量接收消息
if (list != null) {
for (MessageExt ext : list) {
try {
System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 消費(fèi)者對象在使用之前必須要調(diào)用 start 初始化
consumer.start();
System.out.println("消息消費(fèi)者已啟動");
}
}
示例中用 DefaultMQPushConsumer 類來創(chuàng)建一個消息消費(fèi)者,通生產(chǎn)者一樣一個應(yīng)用一般創(chuàng)建一個 DefaultMQPushConsumer 對象,該對象一般由應(yīng)用來維護(hù),可以其設(shè)置為全局對象或者單例。該類構(gòu)造函數(shù)入?yún)?consumerGroup 是消息消費(fèi)者組的名字,需要保證該名字的唯一性。
接下來指定 NameServer 地址和設(shè)置消費(fèi)者應(yīng)用程序第一次啟動時從隊列頭部開始消費(fèi)還是隊列尾部開始消費(fèi)。
接著調(diào)用 subscribe 方法給消費(fèi)者對象訂閱指定主題下的消息,該方法第一個參數(shù)是主題名,第二個擦?xí)菢?biāo)簽名,示例表示訂閱了主題名 topic_example_java 下所有標(biāo)簽的消息。
最主要的是注冊消息監(jiān)聽器才能消費(fèi)消息,示例中用的是 Consumer Push 的方式,即設(shè)置監(jiān)聽器回調(diào)的方式消費(fèi)消息,默認(rèn)監(jiān)聽回調(diào)方法中 List<MessageExt> 里只有一條消息,可以通過設(shè)置參數(shù)來批量接收消息。
最后調(diào)用 start 方法初始化,在整個應(yīng)用生命周期內(nèi)只需要調(diào)用一次 start 方法。
啟動 Name Server
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
RocketMQ 核心的四大組件中 Name Server 和 Broker 都是由 RocketMQ 安裝包提供的,所以要啟動這兩個應(yīng)用才能提供消息服務(wù)。首先啟動 Name Server,先確保你的機(jī)器中已經(jīng)安裝了與 RocketMQ 相匹配的 JDK ,并設(shè)置了環(huán)境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執(zhí)行 bin 目錄下的 mqnamesrv ,默認(rèn)會將該命令的執(zhí)行情況輸出到當(dāng)前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Name Server 的實(shí)際運(yùn)行情況。
啟動 Broker
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
同樣也要確保你的機(jī)器中已經(jīng)安裝了與 RocketMQ 相匹配的 JDK ,并設(shè)置了環(huán)境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執(zhí)行 bin 目錄下的 mqbroker ,默認(rèn)會將該命令的執(zhí)行情況輸出到當(dāng)前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Broker 的實(shí)際運(yùn)行情況。
運(yùn)行 Consumer
先運(yùn)行 Consumer 類,這樣當(dāng)生產(chǎn)者發(fā)送消息的時候能在消費(fèi)者后端看到消息記錄。配置沒問題的話會看到在控制臺打印出消息消費(fèi)者已啟動
運(yùn)行 Producer
最后運(yùn)行 Producer 類,在 Consumer 的控制臺能看到接收的消息
Spring 整合 RocketMQ
不同于 RabbitMQ、ActiveMQ、Kafka 等消息中間件,Spring 社區(qū)已經(jīng)通過多種方式提供了對這些中間件產(chǎn)品集成,例如通過 spring-jms 整合 ActiveMQ、通過 Spring AMQP 項目下的 spring-rabbit 整合 RabbitMQ、通過 spring-kafka 整合 kafka ,通過他們可以在 Spring 項目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三種方式,一是將消息生產(chǎn)者和消費(fèi)者定義成 bean 對象交由 Spring 容器管理,二是使用 RocketMQ 社區(qū)的外部項目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通過 spring-jms 方式集成使用,三是如果你的應(yīng)用是基于 spring-boot 的,可以使用 RocketMQ 的外部項目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比較方便的收發(fā)消息。
總的來講 rocketmq-jms 項目實(shí)現(xiàn)了 JMS 1.1 規(guī)范的部分內(nèi)容,目前支持 JMS 中的發(fā)布/訂閱模型收發(fā)消息。rocketmq-spring-boot-starter 項目目前已經(jīng)支持同步發(fā)送、異步發(fā)送、單向發(fā)送、順序消費(fèi)、并行消費(fèi)、集群消費(fèi)、廣播消費(fèi)等特性,如果比較喜歡 Spring Boot 這種全家桶的快速開發(fā)框架并且現(xiàn)有特性已滿足業(yè)務(wù)要求可以使用該項目。當(dāng)然從 API 使用上最靈活的還是第一種方式,下面以第一種方式為例簡單看下Spring 如何集成 RocketMQ 的。
消息生產(chǎn)者
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class SpringProducer {
private Logger logger = Logger.getLogger(getClass());
private String producerGroupName;
private String nameServerAddr;
private DefaultMQProducer producer;
public SpringProducer(String producerGroupName, String nameServerAddr) {
this.producerGroupName = producerGroupName;
this.nameServerAddr = nameServerAddr;
}
public void init() throws Exception {
logger.info("開始啟動消息生產(chǎn)者服務(wù)...");
//創(chuàng)建一個消息生產(chǎn)者,并設(shè)置一個消息生產(chǎn)者組
producer = new DefaultMQProducer(producerGroupName);
//指定 NameServer 地址
producer.setNamesrvAddr(nameServerAddr);
//初始化 SpringProducer,整個應(yīng)用生命周期內(nèi)只需要初始化一次
producer.start();
logger.info("消息生產(chǎn)者服務(wù)啟動成功.");
}
public void destroy() {
logger.info("開始關(guān)閉消息生產(chǎn)者服務(wù)...");
producer.shutdown();
logger.info("消息生產(chǎn)者服務(wù)已關(guān)閉.");
}
public DefaultMQProducer getProducer() {
return producer;
}
}
消息生產(chǎn)者就是把生產(chǎn)者 DefaultMQProducer 對象的生命周期分成構(gòu)造函數(shù)、init、destroy 三個方法,構(gòu)造函數(shù)中將生產(chǎn)者組名、NameServer 地址作為變量由 Spring 容器在配置時提供,init 方法中實(shí)例化 DefaultMQProducer 對象、設(shè)置 NameServer 地址、初始化生產(chǎn)者對象,destroy 方法用于生產(chǎn)者對象銷毀時清理資源。
消息消費(fèi)者
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
public class SpringConsumer {
private Logger logger = Logger.getLogger(getClass());
private String consumerGroupName;
private String nameServerAddr;
private String topicName;
private DefaultMQPushConsumer consumer;
private MessageListenerConcurrently messageListener;
public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
this.consumerGroupName = consumerGroupName;
this.nameServerAddr = nameServerAddr;
this.topicName = topicName;
this.messageListener = messageListener;
}
public void init() throws Exception {
logger.info("開始啟動消息消費(fèi)者服務(wù)...");
//創(chuàng)建一個消息消費(fèi)者,并設(shè)置一個消息消費(fèi)者組
consumer = new DefaultMQPushConsumer(consumerGroupName);
//指定 NameServer 地址
consumer.setNamesrvAddr(nameServerAddr);
//設(shè)置Consumer第一次啟動是從隊列頭部開始消費(fèi)還是隊列尾部開始消費(fèi)
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
//訂閱指定 Topic 下的所有消息
consumer.subscribe(topicName, "*");
//注冊消息監(jiān)聽器
consumer.registerMessageListener(messageListener);
// 消費(fèi)者對象在使用之前必須要調(diào)用 start 初始化
consumer.start();
logger.info("消息消費(fèi)者服務(wù)啟動成功.");
}
public void destroy(){
logger.info("開始關(guān)閉消息消費(fèi)者服務(wù)...");
consumer.shutdown();
logger.info("消息消費(fèi)者服務(wù)已關(guān)閉.");
}
public DefaultMQPushConsumer getConsumer() {
return consumer;
}
}
同消息生產(chǎn)者類似,消息消費(fèi)者是把生產(chǎn)者 DefaultMQPushConsumer 對象的生命周期分成構(gòu)造函數(shù)、init、destroy 三個方法,具體含義在介紹 Java 訪問 RocketMQ 實(shí)例時已經(jīng)介紹過了,不再贅述。當(dāng)然,有了消費(fèi)者對象還需要消息監(jiān)聽器在接收到消息后執(zhí)行具體的處理邏輯。
package org.study.mq.rocketMQ.spring;
import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.io.UnsupportedEncodingException;
import java.util.List;
public class MessageListener implements MessageListenerConcurrently {
private Logger logger = Logger.getLogger(getClass());
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
if (list != null) {
for (MessageExt ext : list) {
try {
logger.info("監(jiān)聽到消息 : " + new String(ext.getBody(), "UTF-8"));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
消息監(jiān)聽器類就是把前面 Java 示例中注冊消息監(jiān)聽器時聲明的匿名內(nèi)部類代碼抽取出來定義成單獨(dú)一個類而已。
Spring 配置文件
因為只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要額外添加依賴包了。本例中將消息生產(chǎn)者和消息消費(fèi)者分成兩個配置文件,這樣能更好的演示收發(fā)消息的效果。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
<constructor-arg name="nameServerAddr" value="localhost:9876"/>
<constructor-arg name="producerGroupName" value="spring_producer_group"/>
</bean>
</beans>
消息生產(chǎn)者配置很簡單,定義了一個消息生產(chǎn)者對象,該對象初始化時調(diào)用 init 方法,對象銷毀前執(zhí)行 destroy 方法,將 Name Server 地址和生產(chǎn)者組配置好。
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">
<bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />
<bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
<constructor-arg name="nameServerAddr" value="localhost:9876"/>
<constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
<constructor-arg name="topicName" value="spring-rocketMQ-topic" />
<constructor-arg name="messageListener" ref="messageListener" />
</bean>
</beans>
消息消費(fèi)者同消息生產(chǎn)者配置類似,多了一個消息監(jiān)聽器對象的定義和綁定。
運(yùn)行實(shí)例程序
按前述步驟 啟動 Name Server 和 Broker,接著運(yùn)行消息生產(chǎn)者和消息消費(fèi)者程序,簡化起見我們用兩個單元測試類模擬這兩個程序:
package org.study.mq.rocketMQ.spring;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringProducerTest {
private ApplicationContext container;
@Before
public void setup() {
container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
}
@Test
public void sendMessage() throws Exception {
SpringProducer producer = container.getBean(SpringProducer.class);
for (int i = 0; i < 20; i++) {
//創(chuàng)建一條消息對象,指定其主題、標(biāo)簽和消息內(nèi)容
Message msg = new Message(
"spring-rocketMQ-topic",
null,
("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內(nèi)容 */
);
//發(fā)送消息并返回結(jié)果
SendResult sendResult = producer.getProducer().send(msg);
System.out.printf("%s%n", sendResult);
}
}
}
SpringProducerTest 類模擬消息生產(chǎn)者發(fā)送消息。
package org.study.mq.rocketMQ.spring;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class SpringConsumerTest {
private ApplicationContext container;
@Before
public void setup() {
container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
}
@Test
public void consume() throws Exception {
SpringConsumer consumer = container.getBean(SpringConsumer.class);
Thread.sleep(200 * 1000);
consumer.destroy();
}
}
SpringConsumerTest 類模擬消息消費(fèi)者者接收消息,在 consume 方法返回之前需要讓當(dāng)前線程睡眠一段時間,使消費(fèi)者程序繼續(xù)存活才能監(jiān)聽到生產(chǎn)者發(fā)送的消息。
分別運(yùn)行 SpringProducerTest 類 和 SpringConsumerTest 類,在 SpringConsumerTest 的控制臺能看到接收的消息:
假如啟動兩個 SpringConsumerTest 類進(jìn)程,因為它們屬于同一消費(fèi)者組,在 SpringConsumerTest 的控制臺能看到它們均攤到了消息: