消息隊列之 RocketMQ

簡介

RocketMQ 特點

RocketMQ 是阿里巴巴在2012年開源的分布式消息中間件,目前已經(jīng)捐贈給 Apache 軟件基金會,并于2017年9月25日成為 Apache 的頂級項目。作為經(jīng)歷過多次阿里巴巴雙十一這種“超級工程”的洗禮并有穩(wěn)定出色表現(xiàn)的國產(chǎn)中間件,以其高性能、低延時和高可靠等特性近年來已經(jīng)也被越來越多的國內企業(yè)使用。其主要特點有:

  1. 靈活可擴展性
    RocketMQ 天然支持集群,其核心四組件(Name Server、Broker、Producer、Consumer)每一個都可以在沒有單點故障的情況下進行水平擴展。

  2. 海量消息堆積能力
    RocketMQ 采用零拷貝原理實現(xiàn)超大的消息的堆積能力,據(jù)說單機已可以支持億級消息堆積,而且在堆積了這么多消息后依然保持寫入低延遲。

  3. 支持順序消息
    可以保證消息消費者按照消息發(fā)送的順序對消息進行消費。順序消息分為全局有序和局部有序,一般推薦使用局部有序,即生產(chǎn)者通過將某一類消息按順序發(fā)送至同一個隊列來實現(xiàn)。

  4. 多種消息過濾方式
    消息過濾分為在服務器端過濾和在消費端過濾。服務器端過濾時可以按照消息消費者的要求做過濾,優(yōu)點是減少不必要消息傳輸,缺點是增加了消息服務器的負擔,實現(xiàn)相對復雜。消費端過濾則完全由具體應用自定義實現(xiàn),這種方式更加靈活,缺點是很多無用的消息會傳輸給消息消費者。

  5. 支持事務消息
    RocketMQ 除了支持普通消息,順序消息之外還支持事務消息,這個特性對于分布式事務來說提供了又一種解決思路。

  6. 回溯消費
    回溯消費是指消費者已經(jīng)消費成功的消息,由于業(yè)務上需求需要重新消費,RocketMQ 支持按照時間回溯消費,時間維度精確到毫秒,可以向前回溯,也可以向后回溯。

基本概念

下面是一張 RocketMQ 的部署結構圖,里面涉及了 RocketMQ 核心的四大組件:Name Server、Broker、Producer、Consumer ,每個組件都可以部署成集群模式進行水平擴展。

部署結構圖

生產(chǎn)者

生產(chǎn)者(Producer)負責產(chǎn)生消息,生產(chǎn)者向消息服務器發(fā)送由業(yè)務應用程序系統(tǒng)生成的消息。 RocketMQ 提供了三種方式發(fā)送消息:同步、異步和單向。

同步發(fā)送

同步發(fā)送指消息發(fā)送方發(fā)出數(shù)據(jù)后會在收到接收方發(fā)回響應之后才發(fā)下一個數(shù)據(jù)包。一般用于重要通知消息,例如重要通知郵件、營銷短信。

異步發(fā)送

異步發(fā)送指發(fā)送方發(fā)出數(shù)據(jù)后,不等接收方發(fā)回響應,接著發(fā)送下個數(shù)據(jù)包,一般用于可能鏈路耗時較長而對響應時間敏感的業(yè)務場景,例如用戶視頻上傳后通知啟動轉碼服務。

單向發(fā)送

單向發(fā)送是指只負責發(fā)送消息而不等待服務器回應且沒有回調函數(shù)觸發(fā),適用于某些耗時非常短但對可靠性要求并不高的場景,例如日志收集。

生產(chǎn)者組

生產(chǎn)者組(Producer Group)是一類 Producer 的集合,這類 Producer 通常發(fā)送一類消息并且發(fā)送邏輯一致,所以將這些 Producer 分組在一起。從部署結構上看生產(chǎn)者通過 Producer Group 的名字來標記自己是一個集群。

消費者

消費者(Consumer)負責消費消息,消費者從消息服務器拉取信息并將其輸入用戶應用程序。站在用戶應用的角度消費者有兩種類型:拉取型消費者、推送型消費者。

拉取型消費者

拉取型消費者(Pull Consumer)主動從消息服務器拉取信息,只要批量拉取到消息,用戶應用就會啟動消費過程,所以 Pull 稱為主動消費型。

推送型消費者

推送型消費者(Push Consumer)封裝了消息的拉取、消費進度和其他的內部維護工作,將消息到達時執(zhí)行的回調接口留給用戶應用程序來實現(xiàn)。所以 Push 稱為被動消費類型,但從實現(xiàn)上看還是從消息服務器中拉取消息,不同于 Pull 的是 Push 首先要注冊消費監(jiān)聽器,當監(jiān)聽器處觸發(fā)后才開始消費消息。

消費者組

消費者組(Consumer Group)一類 Consumer 的集合名稱,這類 Consumer 通常消費同一類消息并且消費邏輯一致,所以將這些 Consumer 分組在一起。消費者組與生產(chǎn)者組類似,都是將相同角色的分組在一起并命名,分組是個很精妙的概念設計,RocketMQ 正是通過這種分組機制,實現(xiàn)了天然的消息負載均衡。消費消息時通過 Consumer Group 實現(xiàn)了將消息分發(fā)到多個消費者服務器實例,比如某個 Topic 有9條消息,其中一個 Consumer Group 有3個實例(3個進程或3臺機器),那么每個實例將均攤3條消息,這也意味著我們可以很方便的通過加機器來實現(xiàn)水平擴展。

消息服務器

消息服務器(Broker)是消息存儲中心,主要作用是接收來自 Producer 的消息并存儲, Consumer 從這里取得消息。它還存儲與消息相關的元數(shù)據(jù),包括用戶組、消費進度偏移量、隊列信息等。從部署結構圖中可以看出 Broker 有 Master 和 Slave 兩種類型,Master 既可以寫又可以讀,Slave 不可以寫只可以讀。從物理結構上看 Broker 的集群部署方式有四種:單 Master 、多 Master 、多 Master 多 Slave(同步刷盤)、多 Master多 Slave(異步刷盤)。

單 Master

這種方式一旦 Broker 重啟或宕機會導致整個服務不可用,這種方式風險較大,所以顯然不建議線上環(huán)境使用。

多 Master

所有消息服務器都是 Master ,沒有 Slave 。這種方式優(yōu)點是配置簡單,單個 Master 宕機或重啟維護對應用無影響。缺點是單臺機器宕機期間,該機器上未被消費的消息在機器恢復之前不可訂閱,消息實時性會受影響。

多 Master 多 Slave(異步復制)

每個 Master 配置一個 Slave,所以有多對 Master-Slave,消息采用異步復制方式,主備之間有毫秒級消息延遲。這種方式優(yōu)點是消息丟失的非常少,且消息實時性不會受影響,Master 宕機后消費者可以繼續(xù)從 Slave 消費,中間的過程對用戶應用程序透明,不需要人工干預,性能同多 Master 方式幾乎一樣。缺點是 Master 宕機時在磁盤損壞情況下會丟失極少量消息。

多 Master 多 Slave(同步雙寫)

每個 Master 配置一個 Slave,所以有多對 Master-Slave ,消息采用同步雙寫方式,主備都寫成功才返回成功。這種方式優(yōu)點是數(shù)據(jù)與服務都沒有單點問題,Master 宕機時消息無延遲,服務與數(shù)據(jù)的可用性非常高。缺點是性能相對異步復制方式略低,發(fā)送消息的延遲會略高。

名稱服務器

名稱服務器(NameServer)用來保存 Broker 相關元信息并給 Producer 和 Consumer 查找 Broker 信息。NameServer 被設計成幾乎無狀態(tài)的,可以橫向擴展,節(jié)點之間相互之間無通信,通過部署多臺機器來標記自己是一個偽集群。每個 Broker 在啟動的時候會到 NameServer 注冊,Producer 在發(fā)送消息前會根據(jù) Topic 到 NameServer 獲取到 Broker 的路由信息,Consumer 也會定時獲取 Topic 的路由信息。所以從功能上看應該是和 ZooKeeper 差不多,據(jù)說 RocketMQ 的早期版本確實是使用的 ZooKeeper ,后來改為了自己實現(xiàn)的 NameServer 。

消息

消息(Message)就是要傳輸?shù)男畔?。一條消息必須有一個主題(Topic),主題可以看做是你的信件要郵寄的地址。一條消息也可以擁有一個可選的標簽(Tag)和額處的鍵值對,它們可以用于設置一個業(yè)務 key 并在 Broker 上查找此消息以便在開發(fā)期間查找問題。

主題

主題(Topic)可以看做消息的規(guī)類,它是消息的第一級類型。比如一個電商系統(tǒng)可以分為:交易消息、物流消息等,一條消息必須有一個 Topic 。Topic 與生產(chǎn)者和消費者的關系非常松散,一個 Topic 可以有0個、1個、多個生產(chǎn)者向其發(fā)送消息,一個生產(chǎn)者也可以同時向不同的 Topic 發(fā)送消息。一個 Topic 也可以被 0個、1個、多個消費者訂閱。

標簽

標簽(Tag)可以看作子主題,它是消息的第二級類型,用于為用戶提供額外的靈活性。使用標簽,同一業(yè)務模塊不同目的的消息就可以用相同 Topic 而不同的 Tag 來標識。比如交易消息又可以分為:交易創(chuàng)建消息、交易完成消息等,一條消息可以沒有 Tag 。標簽有助于保持您的代碼干凈和連貫,并且還可以為 RocketMQ 提供的查詢系統(tǒng)提供幫助。

消息隊列

消息隊列(Message Queue),主題被劃分為一個或多個子主題,即消息隊列。一個 Topic 下可以設置多個消息隊列,發(fā)送消息時執(zhí)行該消息的 Topic ,RocketMQ 會輪詢該 Topic 下的所有隊列將消息發(fā)出去。下圖 Broker 內部消息情況:

Broker 內部消息
消息消費模式

消息消費模式有兩種:集群消費(Clustering)和廣播消費(Broadcasting)。默認情況下就是集群消費,該模式下一個消費者集群共同消費一個主題的多個隊列,一個隊列只會被一個消費者消費,如果某個消費者掛掉,分組內其它消費者會接替掛掉的消費者繼續(xù)消費。而廣播消費消息會發(fā)給消費者組中的每一個消費者進行消費。

消息順序

消息順序(Message Order)有兩種:順序消費(Orderly)和并行消費(Concurrently)。順序消費表示消息消費的順序同生產(chǎn)者為每個消息隊列發(fā)送的順序一致,所以如果正在處理全局順序是強制性的場景,需要確保使用的主題只有一個消息隊列。并行消費不再保證消息順序,消費的最大并行數(shù)量受每個消費者客戶端指定的線程池限制。

工程實例

Java 訪問 RocketMQ 實例

RocketMQ 目前支持 Java、C++、Go 三種語言訪問,按慣例以 Java 語言為例看下如何用 RocketMQ 來收發(fā)消息的。

引入依賴

  <dependency>
      <groupId>org.apache.rocketmq</groupId>
      <artifactId>rocketmq-client</artifactId>
      <version>4.2.0</version>
  </dependency>

添加 RocketMQ 客戶端訪問支持,具體版本和安裝的 RocketMQ 版本一致即可。

消息生產(chǎn)者

package org.study.mq.rocketMQ.java;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個消息生產(chǎn)者,并設置一個消息生產(chǎn)者組
        DefaultMQProducer producer = new DefaultMQProducer("niwei_producer_group");

        //指定 NameServer 地址
        producer.setNamesrvAddr("localhost:9876");

        //初始化 Producer,整個應用生命周期內只需要初始化一次
        producer.start();

        for (int i = 0; i < 100; i++) {
            //創(chuàng)建一條消息對象,指定其主題、標簽和消息內容
            Message msg = new Message(
                    "topic_example_java" /* 消息主題名 */,
                    "TagA" /* 消息標簽 */,
                    ("Hello Java demo RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
            );

            //發(fā)送消息并返回結果
            SendResult sendResult = producer.send(msg);

            System.out.printf("%s%n", sendResult);
        }

        // 一旦生產(chǎn)者實例不再被使用則將其關閉,包括清理資源,關閉網(wǎng)絡連接等
        producer.shutdown();
    }
}

示例中用 DefaultMQProducer 類來創(chuàng)建一個消息生產(chǎn)者,通常一個應用創(chuàng)建一個 DefaultMQProducer 對象,所以一般由應用來維護生產(chǎn)者對象,可以其設置為全局對象或者單例。該類構造函數(shù)入?yún)?producerGroup 是消息生產(chǎn)者組的名字,無論生產(chǎn)者還是消費者都必須給出 GroupName ,并保證該名字的唯一性,ProducerGroup 發(fā)送普通的消息時作用不大,后面介紹分布式事務消息時會用到。

接下來指定 NameServer 地址和調用 start 方法初始化,在整個應用生命周期內只需要調用一次 start 方法。

初始化完成后,調用 send 方法發(fā)送消息,示例中只是簡單的構造了100條同樣的消息發(fā)送,其實一個 Producer 對象可以發(fā)送多個主題多個標簽的消息,消息對象的標簽可以為空。send 方法是同步調用,只要不拋異常就標識成功。

最后應用退出時調用 shutdown 方法清理資源、關閉網(wǎng)絡連接,從服務器上注銷自己,通常建議應用在 JBOSS、Tomcat 等容器的退出鉤子里調用 shutdown 方法。

消息消費者

package org.study.mq.rocketMQ.java;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.Date;
import java.util.List;

public class Consumer {

    public static void main(String[] args) throws Exception {
        //創(chuàng)建一個消息消費者,并設置一個消息消費者組
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("niwei_consumer_group");
        //指定 NameServer 地址
        consumer.setNamesrvAddr("localhost:9876");
        //設置 Consumer 第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //訂閱指定 Topic 下的所有消息
        consumer.subscribe("topic_example_java", "*");

        //注冊消息監(jiān)聽器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext context) {
                //默認 list 里只有一條消息,可以通過設置參數(shù)來批量接收消息
                if (list != null) {
                    for (MessageExt ext : list) {
                        try {
                            System.out.println(new Date() + new String(ext.getBody(), "UTF-8"));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 消費者對象在使用之前必須要調用 start 初始化
        consumer.start();
        System.out.println("消息消費者已啟動");
    }
}

示例中用 DefaultMQPushConsumer 類來創(chuàng)建一個消息消費者,通生產(chǎn)者一樣一個應用一般創(chuàng)建一個 DefaultMQPushConsumer 對象,該對象一般由應用來維護,可以其設置為全局對象或者單例。該類構造函數(shù)入?yún)?consumerGroup 是消息消費者組的名字,需要保證該名字的唯一性。

接下來指定 NameServer 地址和設置消費者應用程序第一次啟動時從隊列頭部開始消費還是隊列尾部開始消費。

接著調用 subscribe 方法給消費者對象訂閱指定主題下的消息,該方法第一個參數(shù)是主題名,第二個擦書是標簽名,示例表示訂閱了主題名 topic_example_java 下所有標簽的消息。

最主要的是注冊消息監(jiān)聽器才能消費消息,示例中用的是 Consumer Push 的方式,即設置監(jiān)聽器回調的方式消費消息,默認監(jiān)聽回調方法中 List<MessageExt> 里只有一條消息,可以通過設置參數(shù)來批量接收消息。

最后調用 start 方法初始化,在整個應用生命周期內只需要調用一次 start 方法。

啟動 Name Server

nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log

RocketMQ 核心的四大組件中 Name Server 和 Broker 都是由 RocketMQ 安裝包提供的,所以要啟動這兩個應用才能提供消息服務。首先啟動 Name Server,先確保你的機器中已經(jīng)安裝了與 RocketMQ 相匹配的 JDK ,并設置了環(huán)境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執(zhí)行 bin 目錄下的 mqnamesrv ,默認會將該命令的執(zhí)行情況輸出到當前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Name Server 的實際運行情況。

啟動 Broker

nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log

同樣也要確保你的機器中已經(jīng)安裝了與 RocketMQ 相匹配的 JDK ,并設置了環(huán)境變量 JAVA_HOME ,然后在 RocketMQ 的安裝目錄下執(zhí)行 bin 目錄下的 mqbroker ,默認會將該命令的執(zhí)行情況輸出到當前目錄的 nohup.out 文件,最后跟蹤日志文件查看 Broker 的實際運行情況。

運行 Consumer

先運行 Consumer 類,這樣當生產(chǎn)者發(fā)送消息的時候能在消費者后端看到消息記錄。配置沒問題的話會看到在控制臺打印出消息消費者已啟動

運行 Producer

最后運行 Producer 類,在 Consumer 的控制臺能看到接收的消息

消費者接收到消息

Spring 整合 RocketMQ

不同于 RabbitMQ、ActiveMQ、Kafka 等消息中間件,Spring 社區(qū)已經(jīng)通過多種方式提供了對這些中間件產(chǎn)品集成,例如通過 spring-jms 整合 ActiveMQ、通過 Spring AMQP 項目下的 spring-rabbit 整合 RabbitMQ、通過 spring-kafka 整合 kafka ,通過他們可以在 Spring 項目中更方便使用其 API 。目前在 Spring 框架中集成 RocketMQ 有三種方式,一是將消息生產(chǎn)者和消費者定義成 bean 對象交由 Spring 容器管理,二是使用 RocketMQ 社區(qū)的外部項目 rocketmq-jms(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-jms)然后通過 spring-jms 方式集成使用,三是如果你的應用是基于 spring-boot 的,可以使用 RocketMQ 的外部項目 rocketmq-spring-boot-starter(https://github.com/apache/rocketmq-externals/tree/master/rocketmq-spring-boot-starter)比較方便的收發(fā)消息。

總的來講 rocketmq-jms 項目實現(xiàn)了 JMS 1.1 規(guī)范的部分內容,目前支持 JMS 中的發(fā)布/訂閱模型收發(fā)消息。rocketmq-spring-boot-starter 項目目前已經(jīng)支持同步發(fā)送、異步發(fā)送、單向發(fā)送、順序消費、并行消費、集群消費、廣播消費等特性,如果比較喜歡 Spring Boot 這種全家桶的快速開發(fā)框架并且現(xiàn)有特性已滿足業(yè)務要求可以使用該項目。當然從 API 使用上最靈活的還是第一種方式,下面以第一種方式為例簡單看下Spring 如何集成 RocketMQ 的。

消息生產(chǎn)者

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.producer.DefaultMQProducer;

public class SpringProducer {

    private Logger logger = Logger.getLogger(getClass());

    private String producerGroupName;

    private String nameServerAddr;

    private DefaultMQProducer producer;

    public SpringProducer(String producerGroupName, String nameServerAddr) {
        this.producerGroupName = producerGroupName;
        this.nameServerAddr = nameServerAddr;
    }

    public void init() throws Exception {
        logger.info("開始啟動消息生產(chǎn)者服務...");

        //創(chuàng)建一個消息生產(chǎn)者,并設置一個消息生產(chǎn)者組
        producer = new DefaultMQProducer(producerGroupName);
        //指定 NameServer 地址
        producer.setNamesrvAddr(nameServerAddr);
        //初始化 SpringProducer,整個應用生命周期內只需要初始化一次
        producer.start();

        logger.info("消息生產(chǎn)者服務啟動成功.");
    }

    public void destroy() {
        logger.info("開始關閉消息生產(chǎn)者服務...");

        producer.shutdown();

        logger.info("消息生產(chǎn)者服務已關閉.");
    }

    public DefaultMQProducer getProducer() {
        return producer;
    }
}

消息生產(chǎn)者就是把生產(chǎn)者 DefaultMQProducer 對象的生命周期分成構造函數(shù)、init、destroy 三個方法,構造函數(shù)中將生產(chǎn)者組名、NameServer 地址作為變量由 Spring 容器在配置時提供,init 方法中實例化 DefaultMQProducer 對象、設置 NameServer 地址、初始化生產(chǎn)者對象,destroy 方法用于生產(chǎn)者對象銷毀時清理資源。

消息消費者

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;

public class SpringConsumer {

    private Logger logger = Logger.getLogger(getClass());

    private String consumerGroupName;

    private String nameServerAddr;

    private String topicName;

    private DefaultMQPushConsumer consumer;

    private MessageListenerConcurrently messageListener;

    public SpringConsumer(String consumerGroupName, String nameServerAddr, String topicName, MessageListenerConcurrently messageListener) {
        this.consumerGroupName = consumerGroupName;
        this.nameServerAddr = nameServerAddr;
        this.topicName = topicName;
        this.messageListener = messageListener;
    }


    public void init() throws Exception {
        logger.info("開始啟動消息消費者服務...");

        //創(chuàng)建一個消息消費者,并設置一個消息消費者組
        consumer = new DefaultMQPushConsumer(consumerGroupName);
        //指定 NameServer 地址
        consumer.setNamesrvAddr(nameServerAddr);
        //設置Consumer第一次啟動是從隊列頭部開始消費還是隊列尾部開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        //訂閱指定 Topic 下的所有消息
        consumer.subscribe(topicName, "*");

        //注冊消息監(jiān)聽器
        consumer.registerMessageListener(messageListener);

        // 消費者對象在使用之前必須要調用 start 初始化
        consumer.start();

        logger.info("消息消費者服務啟動成功.");
    }

    public void destroy(){
        logger.info("開始關閉消息消費者服務...");

        consumer.shutdown();

        logger.info("消息消費者服務已關閉.");
    }

    public DefaultMQPushConsumer getConsumer() {
        return consumer;
    }

}

同消息生產(chǎn)者類似,消息消費者是把生產(chǎn)者 DefaultMQPushConsumer 對象的生命周期分成構造函數(shù)、init、destroy 三個方法,具體含義在介紹 Java 訪問 RocketMQ 實例時已經(jīng)介紹過了,不再贅述。當然,有了消費者對象還需要消息監(jiān)聽器在接收到消息后執(zhí)行具體的處理邏輯。

package org.study.mq.rocketMQ.spring;

import org.apache.log4j.Logger;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;

import java.io.UnsupportedEncodingException;
import java.util.List;

public class MessageListener implements MessageListenerConcurrently {

    private Logger logger = Logger.getLogger(getClass());

    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        if (list != null) {
            for (MessageExt ext : list) {
                try {
                    logger.info("監(jiān)聽到消息 : " + new String(ext.getBody(), "UTF-8"));
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }
            }
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}

消息監(jiān)聽器類就是把前面 Java 示例中注冊消息監(jiān)聽器時聲明的匿名內部類代碼抽取出來定義成單獨一個類而已。

Spring 配置文件

因為只使用 Spring 框架集成,所以除了 Sping 框架核心 jar 包外不需要額外添加依賴包了。本例中將消息生產(chǎn)者和消息消費者分成兩個配置文件,這樣能更好的演示收發(fā)消息的效果。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="producer" class="org.study.mq.rocketMQ.spring.SpringProducer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="producerGroupName" value="spring_producer_group"/>
    </bean>

</beans>

消息生產(chǎn)者配置很簡單,定義了一個消息生產(chǎn)者對象,該對象初始化時調用 init 方法,對象銷毀前執(zhí)行 destroy 方法,將 Name Server 地址和生產(chǎn)者組配置好。

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.3.xsd">

    <bean id="messageListener" class="org.study.mq.rocketMQ.spring.MessageListener" />

    <bean id="consumer" class="org.study.mq.rocketMQ.spring.SpringConsumer" init-method="init" destroy-method="destroy">
        <constructor-arg name="nameServerAddr" value="localhost:9876"/>
        <constructor-arg name="consumerGroupName" value="spring_consumer_group"/>
        <constructor-arg name="topicName" value="spring-rocketMQ-topic" />
        <constructor-arg name="messageListener" ref="messageListener" />
    </bean>

</beans>

消息消費者同消息生產(chǎn)者配置類似,多了一個消息監(jiān)聽器對象的定義和綁定。

運行實例程序

按前述步驟 啟動 Name Server 和 Broker,接著運行消息生產(chǎn)者和消息消費者程序,簡化起見我們用兩個單元測試類模擬這兩個程序:

package org.study.mq.rocketMQ.spring;

import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringProducerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-producer.xml");
    }

    @Test
    public void sendMessage() throws Exception {
        SpringProducer producer = container.getBean(SpringProducer.class);

        for (int i = 0; i < 20; i++) {
            //創(chuàng)建一條消息對象,指定其主題、標簽和消息內容
            Message msg = new Message(
                    "spring-rocketMQ-topic",
                    null,
                    ("Spring RocketMQ demo " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* 消息內容 */
            );

            //發(fā)送消息并返回結果
            SendResult sendResult = producer.getProducer().send(msg);

            System.out.printf("%s%n", sendResult);
        }
    }

}

SpringProducerTest 類模擬消息生產(chǎn)者發(fā)送消息。

package org.study.mq.rocketMQ.spring;

import org.junit.Before;
import org.junit.Test;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class SpringConsumerTest {

    private ApplicationContext container;

    @Before
    public void setup() {
        container = new ClassPathXmlApplicationContext("classpath:spring-consumer.xml");
    }

    @Test
    public void consume() throws Exception {
        SpringConsumer consumer = container.getBean(SpringConsumer.class);

        Thread.sleep(200 * 1000);

        consumer.destroy();
    }
}

SpringConsumerTest 類模擬消息消費者者接收消息,在 consume 方法返回之前需要讓當前線程睡眠一段時間,使消費者程序繼續(xù)存活才能監(jiān)聽到生產(chǎn)者發(fā)送的消息。

分別運行 SpringProducerTest 類 和 SpringConsumerTest 類,在 SpringConsumerTest 的控制臺能看到接收的消息:

消費者接收到消息

假如啟動兩個 SpringConsumerTest 類進程,因為它們屬于同一消費者組,在 SpringConsumerTest 的控制臺能看到它們均攤到了消息:

消費者1
消費者2
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容