消息隊列之 RocketMQ

簡介

RocketMQ 特點(diǎn)

RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給 Apache 軟件基金會,并于2017年9月25日成為 Apache 的頂級項目。作為經(jīng)歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩(wěn)定出色表現(xiàn)的國產(chǎn)中間件,以其高性能、低延時和高可靠等特性近年來已經(jīng)也被越來越多的國內(nèi)企業(yè)使用。其主要特點(diǎn)有:

  1. 靈活可擴(kuò)展性
    RocketMQ 天然支持集群,其核心四組件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點(diǎn)故障的情況下進(jìn)行水平擴(kuò)展。

  2. 海量消息堆積能力
    RocketMQ 采用零拷貝原理實(shí)現(xiàn)超大的消息的堆積能力,據(jù)說單機(jī)已可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲。

  3. 支持順序消息
    可以保證消息消費(fèi)者按照消息發(fā)送的順序?qū)ο⑦M(jìn)行消費(fèi)。順序消息分為全局有序和局部有序,一般推薦使用局部有序,即生產(chǎn)者通過將某一類消息按順序發(fā)送至同一個隊列來實(shí)現(xiàn)。

  4. 多種消息過濾方式
    消息過濾分為在服務(wù)器端過濾和在消費(fèi)端過濾。服務(wù)器端過濾時可以按照消息消費(fèi)者的要求做過濾,優(yōu)點(diǎn)是減少不必要消息傳輸,缺點(diǎn)是增加了消息服務(wù)器的負(fù)擔(dān),實(shí)現(xiàn)相對復(fù)雜。消費(fèi)端過濾則完全由具體應(yīng)用自定義實(shí)現(xiàn),這種方式更加靈活,缺點(diǎn)是很多無用的消息會傳輸給消息消費(fèi)者。

  5. 支持事務(wù)消息
    RocketMQ 除了支持普通消息,順序消息之外還支持事務(wù)消息,這個特性對于分布式事務(wù)來說提供了又一種解決思路。

  6. 回溯消費(fèi)
    回溯消費(fèi)是指消費(fèi)者已經(jīng)消費(fèi)成功的消息,由于業(yè)務(wù)上需求需要重新消費(fèi),RocketMQ 支持按照時間回溯消費(fèi),時間維度精確到毫秒,可以向前回溯,也可以向后回溯。

基本概念

下面是一張 RocketMQ 的部署結(jié)構(gòu)圖,里面涉及了 RocketMQ 核心的四大組件:Name Server、Broker、Producer、Consumer ,每個組件都可以部署成集群模式進(jìn)行水平擴(kuò)展。

部署結(jié)構(gòu)圖

生產(chǎn)者

生產(chǎn)者(Producer)負(fù)責(zé)產(chǎn)生消息,生產(chǎn)者向消息服務(wù)器發(fā)送由業(yè)務(wù)應(yīng)用程序系統(tǒng)生成的消息。 RocketMQ 提供了三種方式發(fā)送消息:同步、異步和單向。

同步發(fā)送

同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會在收到接收方發(fā)回響應(yīng)之后才發(fā)下一個數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營銷短信。

異步發(fā)送

異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應(yīng),接著發(fā)送下個數(shù)據(jù)包,一般用于可能鏈路耗時較長而對響應(yīng)時間敏感的業(yè)務(wù)場景,例如用戶視頻上傳后通知啟動轉(zhuǎn)碼服務(wù)。

單向發(fā)送

單向發(fā)送是指只負(fù)責(zé)發(fā)送消息而不等待服務(wù)器回應(yīng)且沒有回調(diào)函數(shù)觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集。

生產(chǎn)者組

生產(chǎn)者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常發(fā)送一類消息并且發(fā)送邏輯一致,所以將這些 Producer 分組在一起。從部署結(jié)構(gòu)上看生產(chǎn)者通過 Producer Group 的名字來標(biāo)記自己是一個集群。

消費(fèi)者

消費(fèi)者(Consumer)負(fù)責(zé)消費(fèi)消息,消費(fèi)者從消息服務(wù)器拉取信息并將其輸入用戶應(yīng)用程序。站在用戶應(yīng)用的角度消費(fèi)者有兩種類型:拉取型消費(fèi)者、推送型消費(fèi)者。

拉取型消費(fèi)者

拉取型消費(fèi)者(Pull Consumer)主動從消息服務(wù)器拉取信息,只要批量拉取到消息,用戶應(yīng)用就會啟動消費(fèi)過程,所以 Pull 稱為主動消費(fèi)型。

推送型消費(fèi)者

推送型消費(fèi)者(Push Consumer)封裝了消息的拉取、消費(fèi)進(jìn)度和其他的內(nèi)部維護(hù)工作,將消息到達(dá)時執(zhí)行的回調(diào)接口留給用戶應(yīng)用程序來實(shí)現(xiàn)。所以 Push 稱為被動消費(fèi)類型,但從實(shí)現(xiàn)上看還是從消息服務(wù)器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費(fèi)監(jiān)聽器,當(dāng)監(jiān)聽器處觸發(fā)后才開始消費(fèi)消息。

消費(fèi)者組

消費(fèi)者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 通常消費(fèi)同一類消息并且消費(fèi)邏輯一致,所以將這些 Consumer 分組在一起。消費(fèi)者組與生產(chǎn)者組類似,都是將相同角色的分組在一起并命名,分組是個很精妙的概念設(shè)計,RocketMQ 正是通過這種分組機(jī)制,實(shí)現(xiàn)了天然的消息負(fù)載均衡。消費(fèi)消息時通過 Consumer Group 實(shí)現(xiàn)了將消息分發(fā)到多個消費(fèi)者服務(wù)器實(shí)例,比如某個 Topic 有9條消息,其中一個 Consumer Group 有3個實(shí)例(3個進(jìn)程或3臺機(jī)器),那么每個實(shí)例將均攤3條消息,這也意味著我們可以很方便的通過加機(jī)器來實(shí)現(xiàn)水平擴(kuò)展。

消息服務(wù)器

消息服務(wù)器(Broker)是消息存儲中心,主要作用是接收來自 Producer 的消息并存儲, Consumer 從這里取得消息。它還存儲與消息相關(guān)的元數(shù)據(jù),包括用戶組、消費(fèi)進(jìn)度偏移量、隊列信息等。從部署結(jié)構(gòu)圖中可以看出 Broker 有 Master 和 Slave 兩種類型,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結(jié)構(gòu)上看 Broker 的集群部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。

單 Master

這種方式一旦 Broker 重啟或宕機(jī)會導(dǎo)致整個服務(wù)不可用,這種方式風(fēng)險較大,所以顯然不建議線上環(huán)境使用。

多 Master

所有消息服務(wù)器都是 Master ,沒有 Slave 。這種方式優(yōu)點(diǎn)是配置簡單,單個 Master 宕機(jī)或重啟維護(hù)對應(yīng)用無影響。缺點(diǎn)是單臺機(jī)器宕機(jī)期間,該機(jī)器上未被消費(fèi)的消息在機(jī)器恢復(fù)之前不可訂閱,消息實(shí)時性會受影響。

多 Master 多 Slave(異步復(fù)制)

每個 Master 配置一個 Slave,所以有多對 Master-Slave,消息采用異步復(fù)制方式,主備之間有毫秒級消息延遲。這種方式優(yōu)點(diǎn)是消息丟失的非常少,且消息實(shí)時性不會受影響,Master 宕機(jī)后消費(fèi)者可以繼續(xù)從 Slave 消費(fèi),中間的過程對用戶應(yīng)用程序透明,不需要人工干預(yù),性能同多 Master 方式幾乎一樣。缺點(diǎn)是 Master 宕機(jī)時在磁盤損壞情況下會丟失極少量消息。

多 Master 多 Slave(同步雙寫)

每個 Master 配置一個 Slave,所以有多對 Master-Slave ,消息采用同步雙寫方式,主備都寫成功才返回成功。這種方式優(yōu)點(diǎn)是數(shù)據(jù)與服務(wù)都沒有單點(diǎn)問題,Master 宕機(jī)時消息無延遲,服務(wù)與數(shù)據(jù)的可用性非常高。缺點(diǎn)是性能相對異步復(fù)制方式略低,發(fā)送消息的延遲會略高。

名稱服務(wù)器

名稱服務(wù)器(NameServer)用來保存 Broker 相關(guān)元信息并給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設(shè)計成幾乎無狀態(tài)的,可以橫向擴(kuò)展,節(jié)點(diǎn)之間相互之間無通信,通過部署多臺機(jī)器來標(biāo)記自己是一個偽集群。每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發(fā)送消息前會根據(jù) Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。所以從功能上看應(yīng)該是和 ZooKeeper 差不多,據(jù)說 RocketMQ 的早期版本確實(shí)是使用的 ZooKeeper ,后來改為了自己實(shí)現(xiàn)的 NameServer 。

消息

消息(Message)就是要傳輸?shù)男畔?。一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條消息也可以擁有一個可選的標(biāo)簽(Tag)和額處的鍵值對,它們可以用于設(shè)置一個業(yè)務(wù) key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。

主題

主題(Topic)可以看做消息的規(guī)類,它是消息的第一級類型。比如一個電商系統(tǒng)可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產(chǎn)者和消費(fèi)者的關(guān)系非常松散,一個 Topic 可以有0個、1個、多個生產(chǎn)者向其發(fā)送消息,一個生產(chǎn)者也可以同時向不同的 Topic 發(fā)送消息。一個 Topic 也可以被 0個、1個、多個消費(fèi)者訂閱。

標(biāo)簽

標(biāo)簽(Tag)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標(biāo)簽,同一業(yè)務(wù)模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標(biāo)識。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag 。標(biāo)簽有助于保持您的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。

消息隊列

消息隊列(Message Queue),主題被劃分為一個或多個子主題,即消息隊列。一個 Topic 下可以設(shè)置多個消息隊列,發(fā)送消息時執(zhí)行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發(fā)出去。下圖 Broker 內(nèi)部消息情況:

Broker 內(nèi)部消息
消息消費(fèi)模式

消息消費(fèi)模式有兩種:集群消費(fèi)(Clustering)和廣播消費(fèi)(Broadcasting)。默認(rèn)情況下就是集群消費(fèi),該模式下一個消費(fèi)者集群共同消費(fèi)一個主題的多個隊列,一個隊列只會被一個消費(fèi)者消費(fèi),如果某個消費(fèi)者掛掉,分組內(nèi)其它消費(fèi)者會接替掛掉的消費(fèi)者繼續(xù)消費(fèi)。而廣播消費(fèi)消息會發(fā)給消費(fèi)者組中的每一個消費(fèi)者進(jìn)行消費(fèi)。

消息順序

消息順序(Message Order)有兩種:順序消費(fèi)(Orderly)和并行消費(fèi)(Concurrently)。順序消費(fèi)表示消息消費(fèi)的順序同生產(chǎn)者為每個消息隊列發(fā)送的順序一致,所以如果正在處理全局順序是強(qiáng)制性的場景,需要確保使用的主題只有一個消息隊列。并行消費(fèi)不再保證消息順序,消費(fèi)的最大并行數(shù)量受每個消費(fèi)者客戶端指定的線程池限制。

工程實(shí)例

Java 訪問 RocketMQ 實(shí)例

RocketMQ 目前支持 Java、C++、Go 三種語言訪問,按慣例以 Java 語言為例看下如何用 RocketMQ 來收發(fā)消息的。

引入依賴

  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.2.0</version>
  </dependency>

添加 RocketMQ 客戶端訪問支持,具體版本和安裝的 RocketMQ 版本一致即可。

消息生產(chǎn)者

package org.study.mq.rocketMQ.java;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個消息生產(chǎn)者,并設(shè)置一個消息生產(chǎn)者組
        DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");

        //初始化 Producer,整個應(yīng)用生命周期內(nèi)只需要初始化一次
        producer.start();

        for (int i = 0; i < 100; i++) {
            //創(chuàng)建一條消息對象,指定其主題、標(biāo)簽和消息內(nèi)容
            Message msg = new Message(
                    "topic_example_java" /* 消息主題名 */,
                    "TagA" /* 消息標(biāo)簽 */,
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內(nèi)容 */
            );

            //發(fā)送消息并返回結(jié)果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        }

        // 一旦生產(chǎn)者實(shí)例不再被使用則將其關(guān)閉,包括清理資源,關(guān)閉網(wǎng)絡(luò)連接等
        producer.shutdown();
    }
}

示例中用 DefaultMQProducer 類來創(chuàng)建一個消息生產(chǎn)者,通常一個應(yīng)用創(chuàng)建一個 DefaultMQProducer 對象,所以一般由應(yīng)用來維護(hù)生產(chǎn)者對象,可以其設(shè)置為全局對象或者單例。該類構(gòu)造函數(shù)入?yún)?producerGroup 是消息生產(chǎn)者組的名字,無論生產(chǎn)者還是消費(fèi)者都必須給出 GroupName ,并保證該名字的唯一性,ProducerGroup 發(fā)送普通的消息時作用不大,后面介紹分布式事務(wù)消息時會用到。

接下來指定 NameServer 地址和調(diào)用 start 方法初始化,在整個應(yīng)用生命周期內(nèi)只需要調(diào)用一次 start 方法。

初始化完成后,調(diào)用 send 方法發(fā)送消息,示例中只是簡單的構(gòu)造了100條同樣的消息發(fā)送,其實(shí)一個 Producer 對象可以發(fā)送多個主題多個標(biāo)簽的消息,消息對象的標(biāo)簽可以為空。send 方法是同步調(diào)用,只要不拋異常就標(biāo)識成功。

最后應(yīng)用退出時調(diào)用 shutdown 方法清理資源、關(guān)閉網(wǎng)絡(luò)連接,從服務(wù)器上注銷自己,通常建議應(yīng)用在 JBOSS、Tomcat 等容器的退出鉤子里調(diào)用 shutdown 方法。

消息消費(fèi)者

package org.study.mq.rocketMQ.java;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

public class Consumer {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個消息消費(fèi)者,并設(shè)置一個消息消費(fèi)者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        //設(shè)置 Consumer 第一次啟動時從隊列頭部開始消費(fèi)還是隊列尾部開始消費(fèi)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱指定 Topic 下的所有消息
        consumer.subscribe("topic_example_java", "*");

        //注冊消息監(jiān)聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                //默認(rèn) list 里只有一條消息,可以通過設(shè)置參數(shù)來批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 消費(fèi)者對象在使用之前必須要調(diào)用 start 初始化
        consumer.start();
        System.out.println("消息消費(fèi)者已啟動");
    }
}

示例中用 DefaultMQPushConsumer 類來創(chuàng)建一個消息消費(fèi)者,通生產(chǎn)者一樣一個應(yīng)用一般創(chuàng)建一個 DefaultMQPushConsumer 對象,該對象一般由應(yīng)用來維護(hù),可以其設(shè)置為全局對象或者單例。該類構(gòu)造函數(shù)入?yún)?consumerGroup 是消息消費(fèi)者組的名字,需要保證該名字的唯一性。

接下來指定 NameServer 地址和設(shè)置消費(fèi)者應(yīng)用程序第一次啟動時從隊列頭部開始消費(fèi)還是隊列尾部開始消費(fèi)。

接著調(diào)用 subscribe 方法給消費(fèi)者對象訂閱指定主題下的消息,該方法第一個參數(shù)是主題名,第二個擦?xí)菢?biāo)簽名,示例表示訂閱了主題名 topic_example_java 下所有標(biāo)簽的消息。

最主要的是注冊消息監(jiān)聽器才能消費(fèi)消息,示例中用的是 Consumer Push 的方式,即設(shè)置監(jiān)聽器回調(diào)的方式消費(fèi)消息,默認(rèn)監(jiān)聽回調(diào)方法中 List<MessageExt> 里只有一條消息,可以通過設(shè)置參數(shù)來批量接收消息。

最后調(diào)用 start 方法初始化,在整個應(yīng)用生命周期內(nèi)只需要調(diào)用一次 start 方法。

啟動 Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

RocketMQ 核心的四大組件中 Name Server 和 Broker 都是由 RocketMQ 安裝包提供的,所以要啟動這兩個應(yīng)用才能提供消息服務(wù)。首先啟動 Name Server,先確保你的機(jī)器中已經(jīng)安裝了與 RocketMQ 相匹配的 JDK ,并設(shè)置了環(huán)境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執(zhí)行 bin 目錄下的 mqnamesrv ,默認(rèn)會將該命令的執(zhí)行情況輸出到當(dāng)前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Name Server 的實(shí)際運(yùn)行情況。

啟動 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

同樣也要確保你的機(jī)器中已經(jīng)安裝了與 RocketMQ 相匹配的 JDK ,并設(shè)置了環(huán)境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執(zhí)行 bin 目錄下的 mqbroker ,默認(rèn)會將該命令的執(zhí)行情況輸出到當(dāng)前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Broker 的實(shí)際運(yùn)行情況。

運(yùn)行 Consumer

先運(yùn)行 Consumer 類,這樣當(dāng)生產(chǎn)者發(fā)送消息的時候能在消費(fèi)者后端看到消息記錄。配置沒問題的話會看到在控制臺打印出消息消費(fèi)者已啟動

運(yùn)行 Producer

最后運(yùn)行 Producer 類,在 Consumer 的控制臺能看到接收的消息

消費(fèi)者接收到消息

Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中間件,Spring 社區(qū)已經(jīng)通過多種方式提供了對這些中間件產(chǎn)品集成,例如通過 spring-jms 整合 ActiveMQ、通過 Spring AMQP 項目下的 spring-rabbit 整合 RabbitMQ、通過 spring-kafka 整合 kafka ,通過他們可以在 Spring 項目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三種方式,一是將消息生產(chǎn)者和消費(fèi)者定義成 bean 對象交由 Spring 容器管理,二是使用 RocketMQ 社區(qū)的外部項目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通過 spring-jms 方式集成使用,三是如果你的應(yīng)用是基于 spring-boot 的,可以使用 RocketMQ 的外部項目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比較方便的收發(fā)消息。

總的來講 rocketmq-jms 項目實(shí)現(xiàn)了 JMS 1.1 規(guī)范的部分內(nèi)容,目前支持 JMS 中的發(fā)布/訂閱模型收發(fā)消息。rocketmq-spring-boot-starter 項目目前已經(jīng)支持同步發(fā)送、異步發(fā)送、單向發(fā)送、順序消費(fèi)、并行消費(fèi)、集群消費(fèi)、廣播消費(fèi)等特性,如果比較喜歡 Spring Boot 這種全家桶的快速開發(fā)框架并且現(xiàn)有特性已滿足業(yè)務(wù)要求可以使用該項目。當(dāng)然從 API 使用上最靈活的還是第一種方式,下面以第一種方式為例簡單看下Spring 如何集成 RocketMQ 的。

消息生產(chǎn)者

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class SpringProducer {

    private Logger logger = Logger.getLogger(getClass());

    private String producerGroupName;

    private String nameServerAddr;

    private DefaultMQProducer producer;

    public SpringProducer(String producerGroupName, String nameServerAddr) {
        this.producerGroupName = producerGroupName;
        this.nameServerAddr = nameServerAddr;
    }

    public void init() throws Exception {
        logger.info("開始啟動消息生產(chǎn)者服務(wù)...");

        //創(chuàng)建一個消息生產(chǎn)者,并設(shè)置一個消息生產(chǎn)者組
        producer = new DefaultMQProducer(producerGroupName);
        //指定 NameServer 地址
        producer.setNamesrvAddr(nameServerAddr);
        //初始化 SpringProducer,整個應(yīng)用生命周期內(nèi)只需要初始化一次
        producer.start();

        logger.info("消息生產(chǎn)者服務(wù)啟動成功.");
    }

    public void destroy() {
        logger.info("開始關(guān)閉消息生產(chǎn)者服務(wù)...");

        producer.shutdown();

        logger.info("消息生產(chǎn)者服務(wù)已關(guān)閉.");
    }

    public DefaultMQProducer getProducer() {
        return producer;
    }
}

消息生產(chǎn)者就是把生產(chǎn)者 DefaultMQProducer 對象的生命周期分成構(gòu)造函數(shù)、init、destroy 三個方法,構(gòu)造函數(shù)中將生產(chǎn)者組名、NameServer 地址作為變量由 Spring 容器在配置時提供,init 方法中實(shí)例化 DefaultMQProducer 對象、設(shè)置 NameServer 地址、初始化生產(chǎn)者對象,destroy 方法用于生產(chǎn)者對象銷毀時清理資源。

消息消費(fèi)者

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SpringConsumer {

    private Logger logger = Logger.getLogger(getClass());

    private String consumerGroupName;

    private String nameServerAddr;

    private String topicName;

    private DefaultMQPushConsumer consumer;

    private MessageListenerConcurrently messageListener;

    public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
        this.consumerGroupName = consumerGroupName;
        this.nameServerAddr = nameServerAddr;
        this.topicName = topicName;
        this.messageListener = messageListener;
    }


    public void init() throws Exception {
        logger.info("開始啟動消息消費(fèi)者服務(wù)...");

        //創(chuàng)建一個消息消費(fèi)者,并設(shè)置一個消息消費(fèi)者組
        consumer = new DefaultMQPushConsumer(consumerGroupName);
        //指定 NameServer 地址
        consumer.setNamesrvAddr(nameServerAddr);
        //設(shè)置Consumer第一次啟動是從隊列頭部開始消費(fèi)還是隊列尾部開始消費(fèi)
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //訂閱指定 Topic 下的所有消息
        consumer.subscribe(topicName, "*");

        //注冊消息監(jiān)聽器
        consumer.registerMessageListener(messageListener);

        // 消費(fèi)者對象在使用之前必須要調(diào)用 start 初始化
        consumer.start();

        logger.info("消息消費(fèi)者服務(wù)啟動成功.");
    }

    public void destroy(){
        logger.info("開始關(guān)閉消息消費(fèi)者服務(wù)...");

        consumer.shutdown();

        logger.info("消息消費(fèi)者服務(wù)已關(guān)閉.");
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

}

同消息生產(chǎn)者類似,消息消費(fèi)者是把生產(chǎn)者 DefaultMQPushConsumer 對象的生命周期分成構(gòu)造函數(shù)、init、destroy 三個方法,具體含義在介紹 Java 訪問 RocketMQ 實(shí)例時已經(jīng)介紹過了,不再贅述。當(dāng)然,有了消費(fèi)者對象還需要消息監(jiān)聽器在接收到消息后執(zhí)行具體的處理邏輯。

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageListener implements MessageListenerConcurrently {

    private Logger logger = Logger.getLogger(getClass());

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list != null) {
            for (MessageExt ext : list) {
                try {
                    logger.info("監(jiān)聽到消息 : " + new String(ext.getBody(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

消息監(jiān)聽器類就是把前面 Java 示例中注冊消息監(jiān)聽器時聲明的匿名內(nèi)部類代碼抽取出來定義成單獨(dú)一個類而已。

Spring 配置文件

因為只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要額外添加依賴包了。本例中將消息生產(chǎn)者和消息消費(fèi)者分成兩個配置文件,這樣能更好的演示收發(fā)消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="producerGroupName" value="spring_producer_group"/>
    </bean>

</beans>

消息生產(chǎn)者配置很簡單,定義了一個消息生產(chǎn)者對象,該對象初始化時調(diào)用 init 方法,對象銷毀前執(zhí)行 destroy 方法,將 Name Server 地址和生產(chǎn)者組配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />

    <bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
        <constructor-arg name="topicName" value="spring-rocketMQ-topic" />
        <constructor-arg name="messageListener" ref="messageListener" />
    </bean>

</beans>

消息消費(fèi)者同消息生產(chǎn)者配置類似,多了一個消息監(jiān)聽器對象的定義和綁定。

運(yùn)行實(shí)例程序

按前述步驟 啟動 Name Server 和 Broker,接著運(yùn)行消息生產(chǎn)者和消息消費(fèi)者程序,簡化起見我們用兩個單元測試類模擬這兩個程序:

package org.study.mq.rocketMQ.spring;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringProducerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
    }

    @Test
    public void sendMessage() throws Exception {
        SpringProducer producer = container.getBean(SpringProducer.class);

        for (int i = 0; i < 20; i++) {
            //創(chuàng)建一條消息對象,指定其主題、標(biāo)簽和消息內(nèi)容
            Message msg = new Message(
                    "spring-rocketMQ-topic",
                    null,
                    ("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內(nèi)容 */
            );

            //發(fā)送消息并返回結(jié)果
            SendResult sendResult = producer.getProducer().send(msg);

            System.out.printf("%s%n", sendResult);
        }
    }

}

SpringProducerTest 類模擬消息生產(chǎn)者發(fā)送消息。

package org.study.mq.rocketMQ.spring;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringConsumerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
    }

    @Test
    public void consume() throws Exception {
        SpringConsumer consumer = container.getBean(SpringConsumer.class);

        Thread.sleep(200 * 1000);

        consumer.destroy();
    }
}

SpringConsumerTest 類模擬消息消費(fèi)者者接收消息,在 consume 方法返回之前需要讓當(dāng)前線程睡眠一段時間,使消費(fèi)者程序繼續(xù)存活才能監(jiān)聽到生產(chǎn)者發(fā)送的消息。

分別運(yùn)行 SpringProducerTest 類 和 SpringConsumerTest 類,在 SpringConsumerTest 的控制臺能看到接收的消息:

消費(fèi)者接收到消息

假如啟動兩個 SpringConsumerTest 類進(jìn)程,因為它們屬于同一消費(fèi)者組,在 SpringConsumerTest 的控制臺能看到它們均攤到了消息:

消費(fèi)者1
消費(fèi)者2
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容