RabbitMQ 消息隊列

什么是 RabbitMQ

MQ(Message Queue)消息隊列

消息隊列中間件,是分布式系統(tǒng)中的重要組件;
主要解決異步處理、應用解耦、流量削峰等問題,從而實現(xiàn)高性能,高可用,可伸縮和最終一致性的架構。

使用較多的消息隊列產品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka 等。

異步處理

用戶注冊后,需要發(fā)送驗證郵箱和手機驗證碼。

將注冊信息寫入數(shù)據(jù)庫,發(fā)送驗證郵件,發(fā)送手機,三個步驟全部完成后,返回給客戶端。

傳統(tǒng):

客戶端 <-> 注冊信息寫入數(shù)據(jù)庫 -> 發(fā)送注冊郵件 -> 發(fā)送注冊短信

現(xiàn)在:

客戶端 <-> 注冊信息寫入數(shù)據(jù)庫 -> 寫入消息隊列 -> 異步 [發(fā)送注冊郵件,發(fā)送注冊短信]

應用解耦

場景:訂單系統(tǒng)需要通知庫存系統(tǒng)。

如果庫存系統(tǒng)異常,則訂單調用庫存失敗,導致下單失敗。

原因:訂單系統(tǒng)和庫存系統(tǒng)耦合度太高。

傳統(tǒng):

用戶 <-> 訂單系統(tǒng) - 調用庫存接口 -> 庫存系統(tǒng)

現(xiàn)在:

用戶 <-> 訂單系統(tǒng) - 寫入 -> 消息隊列 <- 訂閱 - 庫存系統(tǒng)

訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶,下單成功。

庫存系統(tǒng):訂閱下單的消息,獲取下單信息,庫存系統(tǒng)根據(jù)下單信息,再進行庫存操作。

假如:下單的時候,庫存系統(tǒng)不能正常運行,也不會影響下單,因為下單后,訂單系統(tǒng)寫入消息隊列就不再關心其他的后續(xù)操作了,實現(xiàn)了訂單系統(tǒng)和庫存系統(tǒng)的應用解耦。

所以,消息隊列是典型的“生產者-消費者“模型。

生產者不斷的向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。

因為消息的生產和消費都是異步的,而且只關心消息的發(fā)送和接收,沒有業(yè)務邏輯的入侵,這樣就實現(xiàn)了生產者和消費者的解耦。

流量削峰

搶購,秒殺等業(yè)務,針對高并發(fā)的場景。

因為流量過大,暴增會導致應用掛掉,為解決這個問題,在前端加入消息隊列。

用戶的請求,服務器接收后,首先寫入消息隊列,如果超過隊列的長度,就拋棄,發(fā)送一個結束的頁面;而請求成功的就是進入隊列的用戶。

背景知識介紹

AMQP 高級消息隊列協(xié)議

Advanced Message Queuing Protocol 是一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議。

協(xié)議:數(shù)據(jù)在傳輸?shù)倪^程中必須要遵守的規(guī)則。

基于此協(xié)議的客戶端可以與消息中間件傳遞消息。

并不受產品、開發(fā)語言等條件的限制。

JMS

Java Message Server 是 Java 消息服務應用程序接口,一種規(guī)范,和 JDBC 擔任的角色類似。

JMS 是一個 Java 平臺中關于面向消息中間件的 API,用于在兩個應用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。

二者的聯(lián)系

JMS 是定義了統(tǒng)一接口,統(tǒng)一消息操作;AMQP 通過協(xié)議統(tǒng)一數(shù)據(jù)交互格式。

JMS 必須是 Java 語言;AMQP 只是協(xié)議,與語言無關。

Erlang 語言

Erlang 是一種通用的面向并發(fā)的編程語言,目的是創(chuàng)造一種可以應對大規(guī)模并發(fā)活動的編程語言和運行環(huán)境。

最初是專門為通信應用設計的,比如控制交換機或者變換協(xié)議等,因此非常適合構建分布式,實時軟并行計算系統(tǒng)。

Erlang 運行時環(huán)境是一個虛擬機,有點像 Java 的虛擬機,這樣代碼一經編譯,同樣可以隨處運行。

為什么選擇 RabbitMQ

  • RabbitMQ 由 Erlang 開發(fā),AMQP 的最佳搭檔,安裝部署簡單,上手門檻低。

  • 企業(yè)級消息隊列,經過大量實踐考驗的高可靠,大量成功的應用案例,例如阿里、網(wǎng)易等一線大廠都有使用。

  • 有強大的 WEB 管理頁面。

  • 強大的社區(qū)支持,為技術進步提供動力。

  • 支持消息持久化、支持消息確認機制、靈活的任務分發(fā)機制等,支持功能非常豐富。

  • 集群擴展很容易,并且可以通過增加節(jié)點實現(xiàn)成倍的性能提升。

總結:如果希望使用一個可靠性高、功能強大、易于管理的消息隊列系統(tǒng)那么就選擇 RabbitMQ;如果想用一個性能高,但偶爾丟點數(shù)據(jù),可以使用 Kafka 或者 ZeroMQ。

Kafka 和 ZeroMQ 的性能比 RabbitMQ 好很多。

RabbitMQ 各組件功能

RabbitMQ 組件
  1. Broker - 消息隊列服務器實體。

  2. Virtual Host - 虛擬主機:

  • 標識一批交換機、消息隊列和相關對象,形成的整體。
  • 虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務器域。
  • 每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。
  • VHost 是 AMQP 概念的基礎,RabbitMQ 默認的 vhost 是 /,必須在鏈接時指定。
  1. Exchange - 交換器(路由):用來接收生產者發(fā)送的消息并將這些消息通過路由發(fā)給服務器中的隊列。

  2. Banding - 綁定。用于交換機和消息隊列之間的關聯(lián)

  3. Queue - 消息隊列:

  • 用來保存消息直到發(fā)送給消費者。
  • 它是消息的容器,也是消息的終點。
  • 一個消息可投入一個或多個隊列。
  • 消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
  1. Channel - 通道(信道):
  • 多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。
  • 信道是建立在真實的 TCP 連接內的虛擬鏈接。
  • AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,都是通過信道完成的。
  • 因為對于操作系統(tǒng)來說,建立和銷毀 TCP 連接都是非常昂貴的開銷,所以引入了信道的概 念,用來復用 TCP 連接。
  1. Connection - 網(wǎng)絡連接,比如一個 TCP 連接。

  2. Publisher - 消息的生產者,也是一個向交換器發(fā)布消息的客戶端應用程序。

  3. Consumer - 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。

  4. Message - 消息:

  • 消息是不具名的,它是由消息頭和消息體組成。
  • 消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(優(yōu)先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。

使用 RabbitMQ

想要安裝 RabbitMQ,必須先安裝 erlang 語言環(huán)境;類似安裝 tomcat,必須先安裝 JDK。
查看匹配的版本:https://www.rabbitmq.com/which-erla.html

RabbitMQ 安裝啟動

Erlang 下載:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang

Socat 下載:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm

RabbitMQ 下載:https://www.rabbitmq.com/install-rpm.html#downloads

安裝

啟動 Linux 系統(tǒng)(192.168.186.128),傳輸相關的三個 rpm 到 /opt 目錄下,然后在 /opt 目錄下按順序執(zhí)行安裝命令:

rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
啟動后臺管理插件
rabbitmq-plugins enable rabbitmq_management
啟動 RabbitMQ
systemctl start rabbitmq-server.service
systemctl status rabbitmq-server.service
systemctl restart rabbitmq-server.service
systemctl stop rabbitmq-server.service
查看進程
ps -ef | grep rabbitmq
測試
  1. 關閉防火墻
systemctl stop firewalld

(或者防火墻開放對應的端口號)

firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5671/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --reload
  1. 瀏覽器輸入:http://ip:15672(比如這里輸入:http://192.168.186.128:15672)
  2. 默認帳號和密碼是 guest,而 guest 用戶默認不允許遠程連接

創(chuàng)建賬號:

rabbitmqctl add_user zm 123456

設置用戶角色:

rabbitmqctl set_user_tags zm administrator

設置用戶權限:

rabbitmqctl set_permissions -p "/" zm ".*" ".*" ".*"

查看當前用戶和角色:

rabbitmqctl list_users

修改用戶密碼:

rabbitmqctl change_password zm NewPassword

管理界面介紹:

  • Overview - 概覽
  • Connections - 查看鏈接情況
  • Channels - 信道(通道)情況
  • Exchanges - 交換機(路由)情況,默認4類7個
  • Queues - 消息隊列情況
  • Admin - 管理員列表
  • RabbitMQ 提供給編程語言客戶端鏈接的端口 - 5672;RabbitMQ 管理界面的端口 - 15672;RabbitMQ 集群的端口 - 25672。

RabbitMQ 快速操作

依賴
<!-- 指定編碼及版本 -->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.encoding>UTF-8</maven.compiler.encoding>
    <java.version>1.11</java.version>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.7.3</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
</dependencies>
日志依賴 log4j(可選項)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
創(chuàng)建連接

先在 RabbitMQ 管理界面 Admin -> Virtual Hosts -> Add a new virtual host 創(chuàng)建虛擬主機 (Name: /zm, Description: zm, Tags: administrator);

然后編寫連接的代碼:

public class ConnectionUtil {

    public static Connection getConnection() throws  Exception{
        // 1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2.在工廠對象中設置 MQ 的連接信息(ip, port, vhost, username, password)
        factory.setHost("192.168.186.128");
        factory.setPort(5672);
        factory.setVirtualHost("/zm");
        factory.setUsername("zm");
        factory.setPassword("123456");
        // 3.通過工廠獲得與 MQ 的連接
        return factory.newConnection();
    }


    public static void main(String[] args) throws Exception {
        Connection connection = getConnection();
        System.out.println("Connection: " + connection);
        connection.close();
    }

}

RabbitMQ 模式

RabbitMQ 提供了 6 種消息模型,但是第 6 種其實是 RPC,并不是 MQ。
在線手冊:https://www.rabbitmq.com/getstarted.html

5 種消息模型,大體分為兩類:

  • 1 和 2 屬于點對點。
  • 3、4、5 屬于發(fā)布訂閱模式(一對多)。

點對點模式 - P2P(Point to Point)模式:

  • 包含三個角色:消息隊列 queue,發(fā)送者 sender,接收者 receiver。
  • 每個消息發(fā)送到一個特定的隊列中,接收者從中獲得消息。
  • 隊列中保留這些消息,直到他們被消費或超時。
  • 如果希望發(fā)送的每個消息都會被成功處理,那需要 P2P。

特點:

  1. 每個消息只有一個消費者,一旦消費,消息就不在隊列中了。
  2. 發(fā)送者和接收者之間沒有依賴性,發(fā)送者發(fā)送完成,不管接收者是否運行,都不會影響消息發(fā)送到隊列中。
  3. 接收者成功接收消息之后需向對象應答成功(確認)。

發(fā)布訂閱模式 - publish / subscribe 模式:

  • Pub / Sub 模式包含三個角色:交換機 exchange,發(fā)布者 publisher,訂閱者 subcriber。
  • 多個發(fā)布者將消息發(fā)送交換機,系統(tǒng)將這些消息傳遞給多個訂閱者。
  • 如果希望發(fā)送的消息被多個消費者處理,可采用 Pub / Sub。

特點:

  1. 每個消息可以有多個訂閱者。
  2. 發(fā)布者和訂閱者之間在時間上有依賴,對于某個交換機的訂閱者,必須創(chuàng)建一個訂閱后,才能消費發(fā)布者的消息。
  3. 為了消費消息,訂閱者必須保持運行狀態(tài)。
簡單模式(Hello World!)


RabbitMQ 本身只是接收,存儲和轉發(fā)消息,并不會對信息進行處理;類似郵局,處理信件的應該是收件人而不是郵局。

生產者 P

public class Sender {

    public static void main(String[] args) throws Exception {
        String msg = "Hello, 你好 zm";

        // 1.獲得連接
        Connection connection = ConnectionUtil.getConnection();
        // 2.在連接中創(chuàng)建通道(信道)
        Channel channel = connection.createChannel();
        // 3.創(chuàng)建消息隊列 (1,2,3,4,5)
        /*
            參數(shù) 1: 隊列的名稱
            參數(shù) 2: 隊列中的數(shù)據(jù)是否持久化
            參數(shù) 3: 是否排外(是否支持擴展,當前隊列只能自己用,不能給別人用)
            參數(shù) 4: 是否自動刪除(當隊列的連接數(shù)為 0 時,隊列會銷毀,不管隊列是否還存保存數(shù)據(jù))
            參數(shù) 5: 隊列參數(shù)(沒有參數(shù)為 null)
         */
        channel.queueDeclare("queue1", false, false, false, null);
        // 4.向指定的隊列發(fā)送消息 (1,2,3,4)
        /*
            參數(shù) 1: 交換機名稱,當前是簡單模式,也就是 P2P 模式,沒有交換機,所以名稱為 ""
            參數(shù) 2: 目標隊列的名稱
            參數(shù) 3: 設置消息的屬性(沒有屬性則為 null)
            參數(shù) 4: 消息的內容 (只接收字節(jié)數(shù)組)
         */
        channel.basicPublish("", "queue1", null, msg.getBytes());
        System.out.println("發(fā)送:" + msg);
        // 5.釋放資源
        channel.close();
        connection.close();
    }

}

啟動生產者,即可前往管理端查看隊列中的信息,會有一條信息沒有處理。

消費者 C

public class Receiver {

    public static void main(String[] args) throws Exception {
        // 1.獲得連接
        Connection connection = ConnectionUtil.getConnection();
        // 2.獲得通道(信道)
        Channel channel = connection.createChannel();

        // 3.從信道中獲得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付處理(收件人信息,包裹上的快遞標簽,協(xié)議的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body 就是從隊列中獲取的消息
                String s = new String(body);
                System.out.println("獲取消息為:" + s);
            }
        };
        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume("queue1", true, consumer);
    }

}

啟動消費者,前往管理端查看隊列中的信息,所有信息都已經處理和確認,顯示 0。

消息確認機制 ACK
通過剛才的案例可以看出,消息一旦被消費,消息就會立刻從隊列中移除。

如果消費者接收消息后,還沒執(zhí)行操作就拋異常宕機導致消費失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了。

因此,RabbitMQ 有一個 ACK 機制,當消費者獲取消息后,會向 RabbitMQ 發(fā)送回執(zhí) ACK,告知消息已經被接收。

ACK - Acknowledge character 即是確認字符,在數(shù)據(jù)通信中,接收站發(fā)給發(fā)送站的一種傳輸類控制字符,表示發(fā)來的數(shù)據(jù)已確認接收無誤。在使用 http 請求時,http 的狀態(tài)碼 200 就是表示服務器執(zhí)行成功。

整個過程就像快遞員將包裹送到你手里,并且需要你的簽字,并拍照回執(zhí)。

不過這種回執(zhí) ACK 分為兩種情況:

  • 自動 ACK - 消息接收后,消費者立刻自動發(fā)送 ACK,類似快遞放在快遞柜。
  • 手動 ACK - 消息接收后,不會發(fā)送 ACK,需要手動調用,類似快遞必須本人簽收。

兩種情況如何選擇,需要看消息的重要性:

  • 如果消息不太重要,丟失也沒有影響,自動 ACK 會比較方便。
  • 如果消息非常重要,最好消費完成手動 ACK;因為如果自動 ACK 消費后,RabbitMQ 就會把消息從隊列中刪除,而此時消費者拋異常宕機,那么消息就永久丟失了。

修改啟動手動 ACK 消息確認:

// 監(jiān)聽隊列 false: 手動消息確認
channel.basicConsume("queue1", false, consumer);

啟動生產者和消費者,前往管理端查看隊列中的信息,會有一條信息沒有確認(Unacked)。

手動 ACK消息確認解決問題:

public class ReceiverAck {

    public static void main(String[] args) throws Exception {
        // 1.獲得連接
        Connection connection = ConnectionUtil.getConnection();
        // 2.獲得通道(信道)
        final Channel channel = connection.createChannel();

        // 3.從信道中獲得消息
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            // 交付處理(收件人信息,包裹上的快遞標簽,協(xié)議的配置,消息)
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                // body就是從隊列中獲取的消息
                String s = new String(body);
                System.out.println("獲取消息為:" + s);
                // 手動確認(收件人信息,是否同時確認多個消息)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4.監(jiān)聽隊列 false: 手動消息確認
        channel.basicConsume("queue1", false, consumer);
    }

}
工作隊列模式(Work queues)

簡單模式,一個消費者來處理消息,如果生產者生產消息過快過多,而消費者的能力有限,就會產生消息在隊列中堆積(生活中的滯銷)。

當運行許多消費者程序時,消息隊列中的任務會被眾多消費者共享,但其中某一個消息只會被一個消費者獲取(100 支肉串 20 個人吃,但是其中的某支肉串只能被一個人吃)。

生產者 P

public class Sender {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("test_work_queue",false,false,false,null);

        for(int i = 1;i<=100;i++) {
            String msg = "Message --> " + i;
            channel.basicPublish("", "test_work_queue", null, msg.getBytes());
            System.out.println(msg);
        }

        channel.close();
        connection.close();
    }

}

消費者 1

public class Receiver1 {

    // 統(tǒng)計獲取的信息的數(shù)量
    static int counter = 1;

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // queueDeclare() 此方法有雙重作用,如果隊列不存在,就創(chuàng)建;如果隊列存在,則獲取
        channel.queueDeclare("test_work_queue", false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Receiver 1: " + s + ". Total Message Count: " + counter++);
                // 模擬網(wǎng)絡延遲 200 毫秒
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 手動確認(收件人信息,是否同時確認多個消息)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4.監(jiān)聽隊列 false:手動消息確認
        channel.basicConsume("test_work_queue", false, consumer);
    }

}

消費者 2

public class Receiver2 {

    // 統(tǒng)計獲取的信息的數(shù)量
    static int counter = 1;

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        final Channel channel = connection.createChannel();

        // queueDeclare() 此方法有雙重作用,如果隊列不存在,就創(chuàng)建;如果隊列存在,則獲取
        channel.queueDeclare("test_work_queue", false, false, false, null);

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Receiver 2: " + s + ". Total Message Count: " + counter++);
                // 模擬網(wǎng)絡延遲 900 毫秒
                try {
                    Thread.sleep(900);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 手動確認(收件人信息,是否同時確認多個消息)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // 4.監(jiān)聽隊列 false:手動消息確認
        channel.basicConsume("test_work_queue", false, consumer);
    }

}

能者多勞
先運行 2 個消費者,排隊等候消費(取餐),再運行生產者開始生產消息(烤肉串)。

由運行結果可以看到,雖然兩個消費者的消費速度不一致(線程休眠時間),但是消費的數(shù)量卻是一致的,各消費 50 個消息。

  • 例如:工作中,A 編碼速率高,B 編碼速率低,兩個人同時開發(fā)一個項目,A 10 天完成,B 30 天完成,A 完成自己的編碼部分,就無所事事了,等著 B 完成就可以了,這樣是不可以的,應該遵循“能者多勞”。
  • 效率高的多干點,效率低的少干點。

為了克服這個問題,可以使用設置為 prefetchCount = 1 的 basicQos 方法。這告訴RabbitMQ 一次不要給一個 worker 發(fā)送一條以上的消息?;蛘?,換句話說,在 worker 處理并確認前一個消息之前,不要向它發(fā)送新消息。相反,它將把它分派到下一個不繁忙的 worker。

在消費者 1 和消費者 2 中加上 channel.basicQos(1):

...
// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創(chuàng)建;如果隊列存在,則獲取
channel.queueDeclare("test_work_queue", false, false, false, null);
// 開啟一次接受一條消息??梢岳斫鉃椋嚎爝f一個一個送,送完一個再送下一個,速度快的送件就多
channel.basicQos(1);
...

能者多勞必須要配合手動的 ACK 機制才生效。

如何避免消息堆積?

  • Workqueue,多個消費者監(jiān)聽同一個隊列。
  • 接收到消息后,通過線程池,異步消費。
發(fā)布/訂閱模式(Publish/Subscribe)

工作隊列背后的假設是,每個任務都被準確地交付給一個工作者;“發(fā)布/訂閱”模式將一個消息傳遞給多個消費者。

生活中的案例:眾多粉絲關注一個視頻主,視頻主發(fā)布視頻,所有粉絲都可以得到視頻通知。

  • 生產者 P 發(fā)送信息給路由 X,路由 X 將信息轉發(fā)給綁定路由 X 的隊列;隊列將信息通過信道發(fā)送給消費者,最后消費者進行消費。整個過程,必須先創(chuàng)建路由。

  • 路由在生產者程序中創(chuàng)建。

  • 路由沒有存儲消息的能力,當生產者將信息發(fā)送給路由后,消費者還沒有運行,所以沒有隊列,路由并不知道將信息發(fā)送給誰。

  • 運行程序的順序:

  1. 執(zhí)行一次 MessageSender,聲明了路由。
  2. 執(zhí)行 MessageReceiver1 和 MessageReceiver2,綁定到路由。
  3. 再次執(zhí)行 MessageSender,發(fā)送消息給路由。

生產者

public class Sender {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明路由(路由名,路由類型)
        // fanout:不處理路由鍵(只需要將隊列綁定到路由上,發(fā)送到路由的消息都會被轉發(fā)到與該路由綁定的所有隊列上)
        channel.exchangeDeclare("test_exchange_fanout", "fanout");

        String msg = "Hello,world";
        channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
        System.out.println("Publisher:" + msg);

        channel.close();
        connection.close();
    }

}

消費者 1

public class Receiver1 {

    private static final String RECEIVER_QUEUE = "test_exchange_fanout_queue_1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
        // 綁定路由(關注)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_fanout", "");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Subscriber 1: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}

消費者 2

public class Receiver2 {

    private static final String RECEIVER_QUEUE = "test_exchange_fanout_queue_2";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
        // 綁定路由(關注)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_fanout", "");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Subscriber 2: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}
路由模式(Routing)

路由會根據(jù)類型進行定向(direct)分發(fā)消息給不同的隊列;每種類型可以對應多個消費者。

運行程序的順序:

  • 先運行一次 Sender(創(chuàng)建路由器)。
  • 有了路由器之后,在創(chuàng)建兩個 Receiver1 和 Receiver2,進行隊列綁定。
  • 再次運行 Sender,發(fā)出消息。

生產者

public class Sender {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明路由 (路由名,路由類型)
        // direct:根據(jù)路由鍵進行定向分發(fā)消息
        channel.exchangeDeclare("test_exchange_direct", "direct");

        String msg = "Register New User: userid=S101";
        channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());
        System.out.println(msg);

        channel.close();
        connection.close();
    }

}

消費者 1

public class Receiver1 {

    private static final String RECEIVER_QUEUE = "test_exchange_direct_queue_1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);

        // 綁定路由(如果路由鍵的類型是 添加,刪除,修改 的話,綁定到這個隊列 1 上)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "insert");
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "update");
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "delete");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Cosumer 1: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}

消費者 2

public class Receiver2 {

    private static final String RECEIVER_QUEUE = "test_exchange_direct_queue_2";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);

        // 綁定路由(如果路由鍵的類型是 查詢 的話,綁定到這個隊列 2 上)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "select");
       
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Cosumer 2: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}
通配符模式(Topics)

通配符模式是和路由模式差不多,唯獨的區(qū)別就是路由鍵支持模糊匹配。

匹配符號:

* :只能匹配一個詞(正好一個詞,多一個不行,少一個也不行)。
# : 匹配 0 個或更多個詞。

案例:

Q1 綁定了路由鍵 `*.orange.*`      
Q2 綁定了路由鍵 `*.*.rabbit` 和 `lazy.#`

quick.orange.rabbit         # Q1    Q2
lazy.orange.elephant        # Q1    Q2
quick.orange.fox            # Q1
lazy.brown.fox              # Q2
lazy.pink.rabbit            # Q2
quick.brown.fox             # 無
orange                      # 無
quick.orange.male.rabbit    # 無

生產者

public class Sender {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明路由 (路由名,路由類型)
        // topic:模糊匹配的定向分發(fā)
        channel.exchangeDeclare("test_exchange_topic", "topic");

        String msg = "price-off promotion";
        channel.basicPublish("test_exchange_topic", "product.price", null, msg.getBytes());
        System.out.println("Provider: " + msg);

        channel.close();
        connection.close();
    }

}

消費者 1

public class Receiver1 {

    private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
        // 綁定路由(綁定用戶相關的消息)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "user.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Consumer 1: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}

消費者 2

public class Receiver2 {

    private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_2";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列
        channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
        // 綁定路由(綁定商品和訂單相關的消息)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "product.#");
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "order.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Consumer 2: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}

持久化

  • 消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何避免消息丟失?

    1. 消費者的 ACK 確認機制,可以防止消費者丟失消息。

    2. 萬一在消費者消費之前,RabbitMQ 服務器宕機了,那消息也會丟失。

  • 想要將消息持久化,那么路由和隊列都要持久化才可以。

生產者

public class Sender {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明路由 (路由名,路由類型,持久化)
        // topic:模糊匹配的定向分發(fā)
        channel.exchangeDeclare("test_exchange_topic", "topic", true);

        String msg = "price-off promotion";
        // 信道持久化
        channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
        System.out.println("Provider: " + msg);

        channel.close();
        connection.close();
    }

}

消費者

public class Receiver1 {

    private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_1";

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        // 聲明隊列 (第二個參數(shù)為 true:支持持久化)
        channel.queueDeclare(RECEIVER_QUEUE, true, false, false, null);
        // 綁定路由(綁定用戶相關的消息)
        channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "user.#");
        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Consumer 1: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true: 自動消息確認
        channel.basicConsume(RECEIVER_QUEUE, true, consumer);
    }

}

Spring 整合 RabbitMQ

五種消息模型,在企業(yè)中應用最廣泛的就是定向匹配 topics。

Spring AMQP 是基于 Spring 框架的 AMQP 消息解決方案,提供模板化的發(fā)送和接收消息的抽象層,提供基于消息驅動的 POJO 的消息監(jiān)聽等,簡化了對于 RabbitMQ 相關程序的開發(fā)。

生產端工程
  • 依賴 pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.amqp</groupId>
        <artifactId>spring-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.25</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.9</version>
    </dependency>
</dependencies>
  • spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- 1.配置連接 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.186.128"
                               port="5672"
                               username="zm"
                               password="123456"
                               virtual-host="/zm"
                               publisher-confirms="true"
    />

    <!-- 2.配置隊列 -->
    <rabbit:queue name="test_spring_queue_1"/>

    <!-- 3.配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創(chuàng)建,綁定,刪除隊列與交換機,發(fā)送消息等 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 4.配置交換機,topic 類型 -->
    <rabbit:topic-exchange name="spring_topic_exchange">
        <rabbit:bindings>
            <!-- 綁定隊列 -->
            <rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/>
        </rabbit:bindings>
    </rabbit:topic-exchange>

    <!-- 5.配置 json 轉換的工具 -->
    <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>

    <!-- 6.配置 rabbitmq 的模版 -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     exchange="spring_topic_exchange"
                     message-converter="jsonMessageConverter"/>

</beans>
  • 發(fā)消息 com.zm.test.Sender:
public class Sender {

    public static void main(String[] args) {
        // 1.創(chuàng)建 spring 容器
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");

        // 2.從 spring 容器中獲得 rabbit 模版對象
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // 3.發(fā)消息
        Map<String, String> map = new HashMap<String, String>();
        map.put("name", "張三");
        map.put("email", "123456789@qq.com");
        rabbitTemplate.convertAndSend("msg.user", map);
        System.out.println("Message Sent...");

        context.close();
    }

}
消費端工程
  • 依賴與生產者一致
  • spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
       http://www.springframework.org/schema/context
       http://www.springframework.org/schema/context/spring-context.xsd">

    <!--1.配置連接-->
    <rabbit:connection-factory
            id="connectionFactory"
            host="192.168.186.128"
            port="5672"
            username="zm"
            password="123456"
            virtual-host="/zm"/>

    <!-- 2.配置隊列 -->
    <rabbit:queue name="test_spring_queue_1"/>

    <!-- 3.配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創(chuàng)建,綁定,刪除隊列與交換機,發(fā)送消息等 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 4.注解掃描包 springIOC -->
    <context:component-scan base-package="com.zm.listener"/>

    <!-- 5.配置監(jiān)聽 -->
    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
    </rabbit:listener-container>

</beans>
  • 消費者:

MessageListener 接口用于 spring 容器接收到消息后處理消息;

如果需要使用自己定義的類型來實現(xiàn)處理消息時,必須實現(xiàn)該接口,并重寫 onMessage() 方法;

當 spring 容器接收消息后,會自動交由 onMessage 進行處理。

com.zm.listener.ConsumerListener:

@Component
public class ConsumerListener implements MessageListener {

    /**
     * jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message) {
        // 將 message對象轉換成 json
        JsonNode jsonNode = null;
        try {
            jsonNode = MAPPER.readTree(message.getBody());
            String name = jsonNode.get("name").asText();
            String email = jsonNode.get("email").asText();
            System.out.println("Message From Queue:{" + name + ", " + email + "}");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}
  • 啟動項目 com.zm.test.TestRunner:
public class TestRunner {

    public static void main(String[] args) throws IOException {
        // 獲得容器
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
        // 讓程序一直運行,別終止
        System.in.read();
    }

}

消息成功確認機制

在實際場景下,有的生產者發(fā)送的消息是必須保證成功發(fā)送到消息隊列中,需要 事務機制發(fā)布確認機制

事務機制

AMQP 協(xié)議提供的一種保證消息成功投遞的方式,通過信道開啟 transactional 模式;

利用信道的三個方法來實現(xiàn)以事務方式發(fā)送消息,若發(fā)送失敗,通過異常處理回滾事務,確保消息成功投遞

  • channel.txSelect() - 開啟事務
  • channel.txCommit() - 提交事務
  • channel.txRollback() - 回滾事務

Spring 已經對上面三個方法進行了封裝,所以這里使用原始的代碼演示。

生產者

public class Sender {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare("test_transaction", "topic");
        // 開啟事務
        channel.txSelect();
        try {
            channel.basicPublish("test_transaction", "product.price", null, "Item 1: price-off".getBytes());
            // 模擬出錯
            // System.out.println(1 / 0);
            channel.basicPublish("test_transaction", "product.price", null, "Item 2: price-off".getBytes());
            // 提交事務(一起成功)
            channel.txCommit();
            System.out.println("Producer: All Messages Sent");
        } catch (Exception e) {
            System.out.println("All Messages Rollback");
            // 事務回滾(一起失敗)
            channel.txRollback();
            e.printStackTrace();
        } finally {
            channel.close();
            connection.close();
        }
    }

}

消費者

public class Receiver {

    public static void main(String[] args) throws Exception {
        Connection connection = ConnectionUtil.getConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare("test_transaction_queue", false, false, false, null);
        channel.queueBind("test_transaction_queue", "test_transaction", "product.#");

        DefaultConsumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body);
                System.out.println("Consumer: " + s);
            }
        };

        // 4.監(jiān)聽隊列 true:自動消息確認
        channel.basicConsume("test_transaction_queue", true, consumer);
    }

}
Confirm 發(fā)布確認機制

RabbitMQ 為了保證消息的成功投遞,采用通過 AMQP 協(xié)議層面提供事務機制的方案,但是采用事務會大大降低消息的吞吐量。

開啟事務性能最大損失超過 250 倍。

事務效率低下原因:100 條消息,前 99 條成功,如果第 100 條失敗,那么 99 條消息要全部撤銷回滾。

更加高效的解決方式是采用 Confirm 模式,而 Confirm 模式則采用補發(fā)第 100 條的措施來完成 100 條消息的送達。

在 Spring 中應用

  • resources\spring\spring-rabbitmq-producer.xml
...
<!-- 6.配置 rabbitmq 的模版 -->
<rabbit:template id="rabbitTemplate"
                 connection-factory="connectionFactory"
                 exchange="spring_topic_exchange"
                 message-converter="jsonMessageConverter"
                 confirm-callback="messageConfirm"/>

<!-- 7.確認機制的處理類 -->
<bean id="messageConfirm" class="com.renda.confirm.MessageConfirm"/>
...
  • 消息確認處理類 com.zm.confirm.MessageConfirm:
public class MessageConfirm implements RabbitTemplate.ConfirmCallback {

    /**
     * @param correlationData 消息相關的數(shù)據(jù)對象(封裝了消息的唯一 id)
     * @param b               消息是否確認成功
     * @param s               異常信息
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean b, String s) {
        if (b) {
            System.out.println("Successfully Confirmed Message");
        } else {
            System.out.println("Fail to Confirm Message, error: " + s);
            // 如果本條消息一定要發(fā)送到隊列中,例如下訂單消息,可以采用補發(fā)
            // 1.采用遞歸(限制遞歸的次數(shù))
            // 2.redis + 定時任務(jdk 的 timer,或者定時任務框架 Quartz)
        }
    }
}
  • resources\log4j.properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n

log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l  %m%n

log4j.rootLogger=debug, stdout,file
  • 發(fā)送消息 com.zm.test.Sender:
...
// 3.發(fā)消息
Map<String, String> map = new HashMap<String, String>();
map.put("name", "張三");
map.put("email", "123456789@qq.com");
// 模擬發(fā)送消息失敗
// rabbitTemplate.convertAndSend("fuck", "msg.user", map);
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("Message Sent...");
...

消費端限流

RabbitMQ 服務器積壓了成千上萬條未處理的消息,然后隨便打開一個消費者客戶端,就會出現(xiàn)這樣的情況:巨量的消息瞬間全部噴涌推送過來,但是單個客戶端無法同時處理這么多數(shù)據(jù),就會被壓垮崩潰。

所以,當數(shù)據(jù)量特別大的時候,對生產端限流肯定是不科學的,因為有時候并發(fā)量就是特別大,有時候并發(fā)量又特別少,這是用戶的行為 - 是無法約束的。

應該對消費端限流,用于保持消費端的穩(wěn)定。

RabbitMQ 提供了一種 QoS(Quality of Service,服務質量)服務質量保證功能;

即在非自動確認消息的前提下,如果一定數(shù)目的消息未被確認前,不再進行消費新的消息。

生產者 com.zm.test.Sender 使用循環(huán)發(fā)出多條消息:

...
for (int i = 0; i < 10; i++) {
    rabbitTemplate.convertAndSend("msg.user", map);
    System.out.println("Message Sent...");
}
...

RabbitMQ 的管理頁面可以看到生產了 10 條堆積未處理的消息。

消費者進行限流處理:

resources\spring\spring-rabbitmq-consumer.xml

...
5.配置監(jiān)聽 -->
<!--
        prefetch="3":一次性消費的消息數(shù)量。
        會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,
        一旦有 N 個消息還沒有 ack,則該 consumer 將阻塞,直到消息被 ack。
    -->
<!-- acknowledge-mode: manual 手動確認-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
    <rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
...

com.zm.listener.ConsumerListener

@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {

    /**
     * jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的
     */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        // 將 message對象轉換成 json
//        JsonNode jsonNode = MAPPER.readTree(message.getBody());
//        String name = jsonNode.get("name").asText();
//        String email = jsonNode.get("email").asText();
//        System.out.println("Message From Queue:{" + name + ", " + email + "}");

        String str = new String(message.getBody());
        System.out.println("str = " + str);

        /**
         * 手動確認消息(參數(shù)1,參數(shù)2)
         * 參數(shù) 1:RabbitMQ 想該 channel 投遞的這條消息的唯一標識 ID,此 ID 是一個單調遞增的正整數(shù)。
         * 參數(shù) 2:為了減少網(wǎng)絡流量,手動確認可以被批量處理;當該參數(shù)為 true 時,則可以一次性確認小于等于 msgId 值的所有消息。
         */
        long msgId = message.getMessageProperties().getDeliveryTag();
        channel.basicAck(msgId, true);

        Thread.sleep(3000);
        System.out.println("Rest for 3 seconds and then continue for more messages...");
    }
}

每次最多只確認接收 3 條消息,直到消息隊列為空。

過期時間 TTL

Time To Live - 生存時間、還能活多久,單位毫秒。

在這個周期內,消息可以被消費者正常消費,超過這個時間,則自動刪除(其實是被稱為 dead message 并投入到死信隊列,無法消費該消息)。

RabbitMQ 可以對消息和隊列設置 TTL:

  • 通過隊列設置,隊列中所有消息都有相同的過期時間。
  • 對消息單獨設置,每條消息的 TTL 可以不同(更顆粒化)。
設置隊列 TTL

RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。

resources\spring\spring-rabbitmq-producer.xml

<!-- 對隊列中的消息設置過期時間 -->
<rabbit:queue name="test_spring_queue_1" auto-declare="true">
    <rabbit:queue-arguments>
        <entry key="x-message-ttl" value-type="long" value="5000"/>
    </rabbit:queue-arguments>
</rabbit:queue>

5 秒之后,消息自動刪除。

設置消息 TTL

RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。

設置某條消息的 TTL,只需要在創(chuàng)建發(fā)送消息時指定即可。

resources\spring\spring-rabbitmq-producer.xml

<rabbit:queue name="test_spring_queue_1"/>
com.zm.test.Sender2

public class Sender2 {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);

        // 創(chuàng)建消息配置對象
        MessageProperties messageProperties = new MessageProperties();
        // 設置消息過期時間
        messageProperties.setExpiration("6000");
        // 創(chuàng)建消息
        Message message = new Message("This Message will be deleted in 6000 ms".getBytes(), messageProperties);
        // 發(fā)消息
        rabbitTemplate.convertAndSend("msg.user", message);
        System.out.println("Message Sent...");

        context.close();
    }

}

如果同時設置了 queue 和 message 的 TTL 值,則只有二者中較小的才會起作用。

死信隊列

DLX(Dead Letter Exchanges)死信交換機 / 死信郵箱,當消息在隊列中由于某些原因沒有被及時消費而變成死信(dead message)后,這些消息就會被分發(fā)到 DLX 交換機中,而綁定 DLX 交換機的隊列,稱之為:“死信隊列”。
消息沒有被及時消費的原因:

  • 消息被拒絕(basic.reject / basic.nack)并且不再重新投遞 requeue=false。
  • 消息超時未消費。
  • 達到最大隊列長度。
流程圖

resources\spring\spring-rabbitmq-producer-dlx.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:rabbit="http://www.springframework.org/schema/rabbit"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/rabbit
       http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">

    <!-- 配置連接 -->
    <rabbit:connection-factory id="connectionFactory"
                               host="192.168.186.128"
                               port="5672"
                               username="zm"
                               password="123456"
                               virtual-host="/zm"
                               publisher-confirms="true"
    />

    <!-- 配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創(chuàng)建,綁定,刪除隊列與交換機,發(fā)送消息等 -->
    <rabbit:admin connection-factory="connectionFactory"/>

    <!-- 配置 rabbitmq 的模版 -->
    <rabbit:template id="rabbitTemplate"
                     connection-factory="connectionFactory"
                     exchange="spring_topic_exchange"/>

    <!-- 聲明死信隊列 -->
    <rabbit:queue name="dlx_queue"/>

    <!-- 聲明定向的死信交換機 -->
    <rabbit:direct-exchange name="dlx_exchange">
        <rabbit:bindings>
            <rabbit:binding key="dlx_ttl" queue="dlx_queue"/>
            <rabbit:binding key="dlx_max" queue="dlx_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

    <!-- 聲明測試過期的消息隊列 -->
    <rabbit:queue name="test_ttl_queue">
        <rabbit:queue-arguments>
            <!-- 設置隊列的過期時間 TTL -->
            <entry key="x-message-ttl" value-type="long" value="10000"/>
            <!-- 消息如果超時,將消息投遞給死信交換機 -->
            <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 聲明測試超出長度的消息隊列 -->
    <rabbit:queue name="test_max_queue">
        <rabbit:queue-arguments>
            <!-- 設置隊列的額定長度 (本隊列最多裝 2 個消息) -->
            <entry key="x-max-length" value-type="long" value="2"/>
            <!-- 消息如果超出長度,將消息投遞給死信交換機 -->
            <entry key="x-dead-letter-exchange" value="dlx_exchange"/>
        </rabbit:queue-arguments>
    </rabbit:queue>

    <!-- 聲明定向的測試消息的交換機 -->
    <rabbit:direct-exchange name="my_exchange">
        <rabbit:bindings>
            <rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/>
            <rabbit:binding key="dlx_max" queue="test_max_queue"/>
        </rabbit:bindings>
    </rabbit:direct-exchange>

</beans>

發(fā)消息進行測試

public class SendDLX {

    public static void main(String[] args) {
        ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
        RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
        
        // 測試超時
        // rabbitTemplate.convertAndSend("dlx_ttl", "Overtime: Close".getBytes());

        // 測試超過最大長度
        rabbitTemplate.convertAndSend("dlx_max", "OverSize: 1".getBytes());
        rabbitTemplate.convertAndSend("dlx_max", "OverSize: 2".getBytes());
        rabbitTemplate.convertAndSend("dlx_max", "OverSize: 3".getBytes());

        System.out.println("Message Sent...");

        context.close();
    }

}

延遲隊列

延遲隊列 = TTL + 死信隊列的合體。

死信隊列只是一種特殊的隊列,里面的消息仍然可以消費。

在電商開發(fā)部分中,都會涉及到延時關閉訂單,此時延遲隊列正好可以解決這個問題。

生產者

沿用上面死信隊列案例的超時測試,超時時間改為訂單關閉時間即可。

消費者

resources\spring\spring-rabbitmq-consumer.xml

...
<!-- 監(jiān)聽死信隊列-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
    <rabbit:listener ref="consumerListener" queue-names="dlx_queue"/>
</rabbit:listener-container>
...
最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容