什么是 RabbitMQ
MQ(Message Queue)消息隊列
消息隊列中間件,是分布式系統(tǒng)中的重要組件;
主要解決異步處理、應用解耦、流量削峰等問題,從而實現(xiàn)高性能,高可用,可伸縮和最終一致性的架構。
使用較多的消息隊列產品:RabbitMQ,RocketMQ,ActiveMQ,ZeroMQ,Kafka 等。
異步處理
用戶注冊后,需要發(fā)送驗證郵箱和手機驗證碼。
將注冊信息寫入數(shù)據(jù)庫,發(fā)送驗證郵件,發(fā)送手機,三個步驟全部完成后,返回給客戶端。
傳統(tǒng):
客戶端 <-> 注冊信息寫入數(shù)據(jù)庫 -> 發(fā)送注冊郵件 -> 發(fā)送注冊短信
現(xiàn)在:
客戶端 <-> 注冊信息寫入數(shù)據(jù)庫 -> 寫入消息隊列 -> 異步 [發(fā)送注冊郵件,發(fā)送注冊短信]
應用解耦
場景:訂單系統(tǒng)需要通知庫存系統(tǒng)。
如果庫存系統(tǒng)異常,則訂單調用庫存失敗,導致下單失敗。
原因:訂單系統(tǒng)和庫存系統(tǒng)耦合度太高。
傳統(tǒng):
用戶 <-> 訂單系統(tǒng) - 調用庫存接口 -> 庫存系統(tǒng)
現(xiàn)在:
用戶 <-> 訂單系統(tǒng) - 寫入 -> 消息隊列 <- 訂閱 - 庫存系統(tǒng)
訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊列,返回用戶,下單成功。
庫存系統(tǒng):訂閱下單的消息,獲取下單信息,庫存系統(tǒng)根據(jù)下單信息,再進行庫存操作。
假如:下單的時候,庫存系統(tǒng)不能正常運行,也不會影響下單,因為下單后,訂單系統(tǒng)寫入消息隊列就不再關心其他的后續(xù)操作了,實現(xiàn)了訂單系統(tǒng)和庫存系統(tǒng)的應用解耦。
所以,消息隊列是典型的“生產者-消費者“模型。
生產者不斷的向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。
因為消息的生產和消費都是異步的,而且只關心消息的發(fā)送和接收,沒有業(yè)務邏輯的入侵,這樣就實現(xiàn)了生產者和消費者的解耦。
流量削峰
搶購,秒殺等業(yè)務,針對高并發(fā)的場景。
因為流量過大,暴增會導致應用掛掉,為解決這個問題,在前端加入消息隊列。
用戶的請求,服務器接收后,首先寫入消息隊列,如果超過隊列的長度,就拋棄,發(fā)送一個結束的頁面;而請求成功的就是進入隊列的用戶。
背景知識介紹
AMQP 高級消息隊列協(xié)議
Advanced Message Queuing Protocol 是一個提供統(tǒng)一消息服務的應用層標準高級消息隊列協(xié)議。
協(xié)議:數(shù)據(jù)在傳輸?shù)倪^程中必須要遵守的規(guī)則。
基于此協(xié)議的客戶端可以與消息中間件傳遞消息。
并不受產品、開發(fā)語言等條件的限制。
JMS
Java Message Server 是 Java 消息服務應用程序接口,一種規(guī)范,和 JDBC 擔任的角色類似。
JMS 是一個 Java 平臺中關于面向消息中間件的 API,用于在兩個應用程序之間,或分布式系統(tǒng)中發(fā)送消息,進行異步通信。
二者的聯(lián)系
JMS 是定義了統(tǒng)一接口,統(tǒng)一消息操作;AMQP 通過協(xié)議統(tǒng)一數(shù)據(jù)交互格式。
JMS 必須是 Java 語言;AMQP 只是協(xié)議,與語言無關。
Erlang 語言
Erlang 是一種通用的面向并發(fā)的編程語言,目的是創(chuàng)造一種可以應對大規(guī)模并發(fā)活動的編程語言和運行環(huán)境。
最初是專門為通信應用設計的,比如控制交換機或者變換協(xié)議等,因此非常適合構建分布式,實時軟并行計算系統(tǒng)。
Erlang 運行時環(huán)境是一個虛擬機,有點像 Java 的虛擬機,這樣代碼一經編譯,同樣可以隨處運行。
為什么選擇 RabbitMQ
RabbitMQ 由 Erlang 開發(fā),AMQP 的最佳搭檔,安裝部署簡單,上手門檻低。
企業(yè)級消息隊列,經過大量實踐考驗的高可靠,大量成功的應用案例,例如阿里、網(wǎng)易等一線大廠都有使用。
有強大的 WEB 管理頁面。
強大的社區(qū)支持,為技術進步提供動力。
支持消息持久化、支持消息確認機制、靈活的任務分發(fā)機制等,支持功能非常豐富。
集群擴展很容易,并且可以通過增加節(jié)點實現(xiàn)成倍的性能提升。
總結:如果希望使用一個可靠性高、功能強大、易于管理的消息隊列系統(tǒng)那么就選擇 RabbitMQ;如果想用一個性能高,但偶爾丟點數(shù)據(jù),可以使用 Kafka 或者 ZeroMQ。
Kafka 和 ZeroMQ 的性能比 RabbitMQ 好很多。
RabbitMQ 各組件功能

Broker - 消息隊列服務器實體。
Virtual Host - 虛擬主機:
- 標識一批交換機、消息隊列和相關對象,形成的整體。
- 虛擬主機是共享相同的身份認證和加密環(huán)境的獨立服務器域。
- 每個 vhost 本質上就是一個 mini 版的 RabbitMQ 服務器,擁有自己的隊列、交換器、綁定和權限機制。
- VHost 是 AMQP 概念的基礎,RabbitMQ 默認的 vhost 是 /,必須在鏈接時指定。
Exchange - 交換器(路由):用來接收生產者發(fā)送的消息并將這些消息通過路由發(fā)給服務器中的隊列。
Banding - 綁定。用于交換機和消息隊列之間的關聯(lián)
Queue - 消息隊列:
- 用來保存消息直到發(fā)送給消費者。
- 它是消息的容器,也是消息的終點。
- 一個消息可投入一個或多個隊列。
- 消息一直在隊列里面,等待消費者連接到這個隊列將其取走。
- Channel - 通道(信道):
- 多路復用連接中的一條獨立的雙向數(shù)據(jù)流通道。
- 信道是建立在真實的 TCP 連接內的虛擬鏈接。
- AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊列還是接收消息,都是通過信道完成的。
- 因為對于操作系統(tǒng)來說,建立和銷毀 TCP 連接都是非常昂貴的開銷,所以引入了信道的概 念,用來復用 TCP 連接。
Connection - 網(wǎng)絡連接,比如一個 TCP 連接。
Publisher - 消息的生產者,也是一個向交換器發(fā)布消息的客戶端應用程序。
Consumer - 消息的消費者,表示一個從消息隊列中取得消息的客戶端應用程序。
Message - 消息:
- 消息是不具名的,它是由消息頭和消息體組成。
- 消息體是不透明的,而消息頭則是由一系列的可選屬性組成,這些屬性包括 routing-key(路由鍵)、priority(優(yōu)先級)、delivery-mode(消息可能需要持久性存儲[消息的路由模式])等。
使用 RabbitMQ
想要安裝 RabbitMQ,必須先安裝 erlang 語言環(huán)境;類似安裝 tomcat,必須先安裝 JDK。
查看匹配的版本:https://www.rabbitmq.com/which-erla.html
RabbitMQ 安裝啟動
Erlang 下載:https://dl.bintray.com/rabbitmq-erlang/rpm/erlang
Socat 下載:http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
RabbitMQ 下載:https://www.rabbitmq.com/install-rpm.html#downloads
安裝
啟動 Linux 系統(tǒng)(192.168.186.128),傳輸相關的三個 rpm 到 /opt 目錄下,然后在 /opt 目錄下按順序執(zhí)行安裝命令:
rpm -ivh erlang-21.3.8.16-1.el7.x86_64.rpm
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
rpm -ivh rabbitmq-server-3.8.6-1.el7.noarch.rpm
啟動后臺管理插件
rabbitmq-plugins enable rabbitmq_management
啟動 RabbitMQ
systemctl start rabbitmq-server.service
systemctl status rabbitmq-server.service
systemctl restart rabbitmq-server.service
systemctl stop rabbitmq-server.service
查看進程
ps -ef | grep rabbitmq
測試
- 關閉防火墻
systemctl stop firewalld
(或者防火墻開放對應的端口號)
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --zone=public --add-port=5671/tcp --permanent
firewall-cmd --zone=public --add-port=5672/tcp --permanent
firewall-cmd --zone=public --add-port=25672/tcp --permanent
firewall-cmd --reload
- 瀏覽器輸入:http://ip:15672(比如這里輸入:http://192.168.186.128:15672)
- 默認帳號和密碼是 guest,而 guest 用戶默認不允許遠程連接
創(chuàng)建賬號:
rabbitmqctl add_user zm 123456
設置用戶角色:
rabbitmqctl set_user_tags zm administrator
設置用戶權限:
rabbitmqctl set_permissions -p "/" zm ".*" ".*" ".*"
查看當前用戶和角色:
rabbitmqctl list_users
修改用戶密碼:
rabbitmqctl change_password zm NewPassword
管理界面介紹:
- Overview - 概覽
- Connections - 查看鏈接情況
- Channels - 信道(通道)情況
- Exchanges - 交換機(路由)情況,默認4類7個
- Queues - 消息隊列情況
- Admin - 管理員列表
- RabbitMQ 提供給編程語言客戶端鏈接的端口 - 5672;RabbitMQ 管理界面的端口 - 15672;RabbitMQ 集群的端口 - 25672。
RabbitMQ 快速操作
依賴
<!-- 指定編碼及版本 -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
<java.version>1.11</java.version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
日志依賴 log4j(可選項)
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rebbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
創(chuàng)建連接
先在 RabbitMQ 管理界面 Admin -> Virtual Hosts -> Add a new virtual host 創(chuàng)建虛擬主機 (Name: /zm, Description: zm, Tags: administrator);
然后編寫連接的代碼:
public class ConnectionUtil {
public static Connection getConnection() throws Exception{
// 1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
// 2.在工廠對象中設置 MQ 的連接信息(ip, port, vhost, username, password)
factory.setHost("192.168.186.128");
factory.setPort(5672);
factory.setVirtualHost("/zm");
factory.setUsername("zm");
factory.setPassword("123456");
// 3.通過工廠獲得與 MQ 的連接
return factory.newConnection();
}
public static void main(String[] args) throws Exception {
Connection connection = getConnection();
System.out.println("Connection: " + connection);
connection.close();
}
}
RabbitMQ 模式
RabbitMQ 提供了 6 種消息模型,但是第 6 種其實是 RPC,并不是 MQ。
在線手冊:https://www.rabbitmq.com/getstarted.html
5 種消息模型,大體分為兩類:
- 1 和 2 屬于點對點。
- 3、4、5 屬于發(fā)布訂閱模式(一對多)。
點對點模式 - P2P(Point to Point)模式:
- 包含三個角色:消息隊列 queue,發(fā)送者 sender,接收者 receiver。
- 每個消息發(fā)送到一個特定的隊列中,接收者從中獲得消息。
- 隊列中保留這些消息,直到他們被消費或超時。
- 如果希望發(fā)送的每個消息都會被成功處理,那需要 P2P。
特點:
- 每個消息只有一個消費者,一旦消費,消息就不在隊列中了。
- 發(fā)送者和接收者之間沒有依賴性,發(fā)送者發(fā)送完成,不管接收者是否運行,都不會影響消息發(fā)送到隊列中。
- 接收者成功接收消息之后需向對象應答成功(確認)。
發(fā)布訂閱模式 - publish / subscribe 模式:
- Pub / Sub 模式包含三個角色:交換機 exchange,發(fā)布者 publisher,訂閱者 subcriber。
- 多個發(fā)布者將消息發(fā)送交換機,系統(tǒng)將這些消息傳遞給多個訂閱者。
- 如果希望發(fā)送的消息被多個消費者處理,可采用 Pub / Sub。
特點:
- 每個消息可以有多個訂閱者。
- 發(fā)布者和訂閱者之間在時間上有依賴,對于某個交換機的訂閱者,必須創(chuàng)建一個訂閱后,才能消費發(fā)布者的消息。
- 為了消費消息,訂閱者必須保持運行狀態(tài)。
簡單模式(Hello World!)
)
RabbitMQ 本身只是接收,存儲和轉發(fā)消息,并不會對信息進行處理;類似郵局,處理信件的應該是收件人而不是郵局。
生產者 P
public class Sender {
public static void main(String[] args) throws Exception {
String msg = "Hello, 你好 zm";
// 1.獲得連接
Connection connection = ConnectionUtil.getConnection();
// 2.在連接中創(chuàng)建通道(信道)
Channel channel = connection.createChannel();
// 3.創(chuàng)建消息隊列 (1,2,3,4,5)
/*
參數(shù) 1: 隊列的名稱
參數(shù) 2: 隊列中的數(shù)據(jù)是否持久化
參數(shù) 3: 是否排外(是否支持擴展,當前隊列只能自己用,不能給別人用)
參數(shù) 4: 是否自動刪除(當隊列的連接數(shù)為 0 時,隊列會銷毀,不管隊列是否還存保存數(shù)據(jù))
參數(shù) 5: 隊列參數(shù)(沒有參數(shù)為 null)
*/
channel.queueDeclare("queue1", false, false, false, null);
// 4.向指定的隊列發(fā)送消息 (1,2,3,4)
/*
參數(shù) 1: 交換機名稱,當前是簡單模式,也就是 P2P 模式,沒有交換機,所以名稱為 ""
參數(shù) 2: 目標隊列的名稱
參數(shù) 3: 設置消息的屬性(沒有屬性則為 null)
參數(shù) 4: 消息的內容 (只接收字節(jié)數(shù)組)
*/
channel.basicPublish("", "queue1", null, msg.getBytes());
System.out.println("發(fā)送:" + msg);
// 5.釋放資源
channel.close();
connection.close();
}
}
啟動生產者,即可前往管理端查看隊列中的信息,會有一條信息沒有處理。
消費者 C
public class Receiver {
public static void main(String[] args) throws Exception {
// 1.獲得連接
Connection connection = ConnectionUtil.getConnection();
// 2.獲得通道(信道)
Channel channel = connection.createChannel();
// 3.從信道中獲得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付處理(收件人信息,包裹上的快遞標簽,協(xié)議的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body 就是從隊列中獲取的消息
String s = new String(body);
System.out.println("獲取消息為:" + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume("queue1", true, consumer);
}
}
啟動消費者,前往管理端查看隊列中的信息,所有信息都已經處理和確認,顯示 0。
消息確認機制 ACK
通過剛才的案例可以看出,消息一旦被消費,消息就會立刻從隊列中移除。
如果消費者接收消息后,還沒執(zhí)行操作就拋異常宕機導致消費失敗,但是 RabbitMQ 無從得知,這樣消息就丟失了。
因此,RabbitMQ 有一個 ACK 機制,當消費者獲取消息后,會向 RabbitMQ 發(fā)送回執(zhí) ACK,告知消息已經被接收。
ACK - Acknowledge character 即是確認字符,在數(shù)據(jù)通信中,接收站發(fā)給發(fā)送站的一種傳輸類控制字符,表示發(fā)來的數(shù)據(jù)已確認接收無誤。在使用 http 請求時,http 的狀態(tài)碼 200 就是表示服務器執(zhí)行成功。
整個過程就像快遞員將包裹送到你手里,并且需要你的簽字,并拍照回執(zhí)。
不過這種回執(zhí) ACK 分為兩種情況:
- 自動 ACK - 消息接收后,消費者立刻自動發(fā)送 ACK,類似快遞放在快遞柜。
- 手動 ACK - 消息接收后,不會發(fā)送 ACK,需要手動調用,類似快遞必須本人簽收。
兩種情況如何選擇,需要看消息的重要性:
- 如果消息不太重要,丟失也沒有影響,自動 ACK 會比較方便。
- 如果消息非常重要,最好消費完成手動 ACK;因為如果自動 ACK 消費后,RabbitMQ 就會把消息從隊列中刪除,而此時消費者拋異常宕機,那么消息就永久丟失了。
修改啟動手動 ACK 消息確認:
// 監(jiān)聽隊列 false: 手動消息確認
channel.basicConsume("queue1", false, consumer);
啟動生產者和消費者,前往管理端查看隊列中的信息,會有一條信息沒有確認(Unacked)。
手動 ACK消息確認解決問題:
public class ReceiverAck {
public static void main(String[] args) throws Exception {
// 1.獲得連接
Connection connection = ConnectionUtil.getConnection();
// 2.獲得通道(信道)
final Channel channel = connection.createChannel();
// 3.從信道中獲得消息
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 交付處理(收件人信息,包裹上的快遞標簽,協(xié)議的配置,消息)
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// body就是從隊列中獲取的消息
String s = new String(body);
System.out.println("獲取消息為:" + s);
// 手動確認(收件人信息,是否同時確認多個消息)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 4.監(jiān)聽隊列 false: 手動消息確認
channel.basicConsume("queue1", false, consumer);
}
}
工作隊列模式(Work queues)
簡單模式,一個消費者來處理消息,如果生產者生產消息過快過多,而消費者的能力有限,就會產生消息在隊列中堆積(生活中的滯銷)。
當運行許多消費者程序時,消息隊列中的任務會被眾多消費者共享,但其中某一個消息只會被一個消費者獲取(100 支肉串 20 個人吃,但是其中的某支肉串只能被一個人吃)。
生產者 P
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_work_queue",false,false,false,null);
for(int i = 1;i<=100;i++) {
String msg = "Message --> " + i;
channel.basicPublish("", "test_work_queue", null, msg.getBytes());
System.out.println(msg);
}
channel.close();
connection.close();
}
}
消費者 1
public class Receiver1 {
// 統(tǒng)計獲取的信息的數(shù)量
static int counter = 1;
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創(chuàng)建;如果隊列存在,則獲取
channel.queueDeclare("test_work_queue", false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Receiver 1: " + s + ". Total Message Count: " + counter++);
// 模擬網(wǎng)絡延遲 200 毫秒
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手動確認(收件人信息,是否同時確認多個消息)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 4.監(jiān)聽隊列 false:手動消息確認
channel.basicConsume("test_work_queue", false, consumer);
}
}
消費者 2
public class Receiver2 {
// 統(tǒng)計獲取的信息的數(shù)量
static int counter = 1;
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
final Channel channel = connection.createChannel();
// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創(chuàng)建;如果隊列存在,則獲取
channel.queueDeclare("test_work_queue", false, false, false, null);
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Receiver 2: " + s + ". Total Message Count: " + counter++);
// 模擬網(wǎng)絡延遲 900 毫秒
try {
Thread.sleep(900);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手動確認(收件人信息,是否同時確認多個消息)
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 4.監(jiān)聽隊列 false:手動消息確認
channel.basicConsume("test_work_queue", false, consumer);
}
}
能者多勞
先運行 2 個消費者,排隊等候消費(取餐),再運行生產者開始生產消息(烤肉串)。
由運行結果可以看到,雖然兩個消費者的消費速度不一致(線程休眠時間),但是消費的數(shù)量卻是一致的,各消費 50 個消息。
- 例如:工作中,A 編碼速率高,B 編碼速率低,兩個人同時開發(fā)一個項目,A 10 天完成,B 30 天完成,A 完成自己的編碼部分,就無所事事了,等著 B 完成就可以了,這樣是不可以的,應該遵循“能者多勞”。
- 效率高的多干點,效率低的少干點。
為了克服這個問題,可以使用設置為 prefetchCount = 1 的 basicQos 方法。這告訴RabbitMQ 一次不要給一個 worker 發(fā)送一條以上的消息?;蛘?,換句話說,在 worker 處理并確認前一個消息之前,不要向它發(fā)送新消息。相反,它將把它分派到下一個不繁忙的 worker。
在消費者 1 和消費者 2 中加上 channel.basicQos(1):
...
// queueDeclare() 此方法有雙重作用,如果隊列不存在,就創(chuàng)建;如果隊列存在,則獲取
channel.queueDeclare("test_work_queue", false, false, false, null);
// 開啟一次接受一條消息??梢岳斫鉃椋嚎爝f一個一個送,送完一個再送下一個,速度快的送件就多
channel.basicQos(1);
...
能者多勞必須要配合手動的 ACK 機制才生效。
如何避免消息堆積?
- Workqueue,多個消費者監(jiān)聽同一個隊列。
- 接收到消息后,通過線程池,異步消費。
發(fā)布/訂閱模式(Publish/Subscribe)
工作隊列背后的假設是,每個任務都被準確地交付給一個工作者;“發(fā)布/訂閱”模式將一個消息傳遞給多個消費者。
生活中的案例:眾多粉絲關注一個視頻主,視頻主發(fā)布視頻,所有粉絲都可以得到視頻通知。
生產者 P 發(fā)送信息給路由 X,路由 X 將信息轉發(fā)給綁定路由 X 的隊列;隊列將信息通過信道發(fā)送給消費者,最后消費者進行消費。整個過程,必須先創(chuàng)建路由。
路由在生產者程序中創(chuàng)建。
路由沒有存儲消息的能力,當生產者將信息發(fā)送給路由后,消費者還沒有運行,所以沒有隊列,路由并不知道將信息發(fā)送給誰。
運行程序的順序:
- 執(zhí)行一次 MessageSender,聲明了路由。
- 執(zhí)行 MessageReceiver1 和 MessageReceiver2,綁定到路由。
- 再次執(zhí)行 MessageSender,發(fā)送消息給路由。
生產者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明路由(路由名,路由類型)
// fanout:不處理路由鍵(只需要將隊列綁定到路由上,發(fā)送到路由的消息都會被轉發(fā)到與該路由綁定的所有隊列上)
channel.exchangeDeclare("test_exchange_fanout", "fanout");
String msg = "Hello,world";
channel.basicPublish("test_exchange_fanout", "", null, msg.getBytes());
System.out.println("Publisher:" + msg);
channel.close();
connection.close();
}
}
消費者 1
public class Receiver1 {
private static final String RECEIVER_QUEUE = "test_exchange_fanout_queue_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
// 綁定路由(關注)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_fanout", "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Subscriber 1: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
消費者 2
public class Receiver2 {
private static final String RECEIVER_QUEUE = "test_exchange_fanout_queue_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
// 綁定路由(關注)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_fanout", "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Subscriber 2: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
路由模式(Routing)
路由會根據(jù)類型進行定向(direct)分發(fā)消息給不同的隊列;每種類型可以對應多個消費者。
運行程序的順序:
- 先運行一次 Sender(創(chuàng)建路由器)。
- 有了路由器之后,在創(chuàng)建兩個 Receiver1 和 Receiver2,進行隊列綁定。
- 再次運行 Sender,發(fā)出消息。
生產者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明路由 (路由名,路由類型)
// direct:根據(jù)路由鍵進行定向分發(fā)消息
channel.exchangeDeclare("test_exchange_direct", "direct");
String msg = "Register New User: userid=S101";
channel.basicPublish("test_exchange_direct", "insert", null, msg.getBytes());
System.out.println(msg);
channel.close();
connection.close();
}
}
消費者 1
public class Receiver1 {
private static final String RECEIVER_QUEUE = "test_exchange_direct_queue_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
// 綁定路由(如果路由鍵的類型是 添加,刪除,修改 的話,綁定到這個隊列 1 上)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "insert");
channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "update");
channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "delete");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Cosumer 1: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
消費者 2
public class Receiver2 {
private static final String RECEIVER_QUEUE = "test_exchange_direct_queue_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
// 綁定路由(如果路由鍵的類型是 查詢 的話,綁定到這個隊列 2 上)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_direct", "select");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Cosumer 2: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
通配符模式(Topics)
通配符模式是和路由模式差不多,唯獨的區(qū)別就是路由鍵支持模糊匹配。
匹配符號:
*:只能匹配一個詞(正好一個詞,多一個不行,少一個也不行)。
#: 匹配 0 個或更多個詞。
案例:
Q1 綁定了路由鍵 `*.orange.*`
Q2 綁定了路由鍵 `*.*.rabbit` 和 `lazy.#`
quick.orange.rabbit # Q1 Q2
lazy.orange.elephant # Q1 Q2
quick.orange.fox # Q1
lazy.brown.fox # Q2
lazy.pink.rabbit # Q2
quick.brown.fox # 無
orange # 無
quick.orange.male.rabbit # 無
生產者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明路由 (路由名,路由類型)
// topic:模糊匹配的定向分發(fā)
channel.exchangeDeclare("test_exchange_topic", "topic");
String msg = "price-off promotion";
channel.basicPublish("test_exchange_topic", "product.price", null, msg.getBytes());
System.out.println("Provider: " + msg);
channel.close();
connection.close();
}
}
消費者 1
public class Receiver1 {
private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
// 綁定路由(綁定用戶相關的消息)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "user.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Consumer 1: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
消費者 2
public class Receiver2 {
private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_2";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列
channel.queueDeclare(RECEIVER_QUEUE, false, false, false, null);
// 綁定路由(綁定商品和訂單相關的消息)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "product.#");
channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "order.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Consumer 2: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
持久化
-
消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何避免消息丟失?
消費者的 ACK 確認機制,可以防止消費者丟失消息。
萬一在消費者消費之前,RabbitMQ 服務器宕機了,那消息也會丟失。
想要將消息持久化,那么路由和隊列都要持久化才可以。
生產者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明路由 (路由名,路由類型,持久化)
// topic:模糊匹配的定向分發(fā)
channel.exchangeDeclare("test_exchange_topic", "topic", true);
String msg = "price-off promotion";
// 信道持久化
channel.basicPublish("test_exchange_topic", "product.price", MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
System.out.println("Provider: " + msg);
channel.close();
connection.close();
}
}
消費者
public class Receiver1 {
private static final String RECEIVER_QUEUE = "test_exchange_topic_queue_1";
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 聲明隊列 (第二個參數(shù)為 true:支持持久化)
channel.queueDeclare(RECEIVER_QUEUE, true, false, false, null);
// 綁定路由(綁定用戶相關的消息)
channel.queueBind(RECEIVER_QUEUE, "test_exchange_topic", "user.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Consumer 1: " + s);
}
};
// 4.監(jiān)聽隊列 true: 自動消息確認
channel.basicConsume(RECEIVER_QUEUE, true, consumer);
}
}
Spring 整合 RabbitMQ
五種消息模型,在企業(yè)中應用最廣泛的就是定向匹配 topics。
Spring AMQP 是基于 Spring 框架的 AMQP 消息解決方案,提供模板化的發(fā)送和接收消息的抽象層,提供基于消息驅動的 POJO 的消息監(jiān)聽等,簡化了對于 RabbitMQ 相關程序的開發(fā)。
生產端工程
- 依賴 pom.xml
<dependencies>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.0.1.RELEASE</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.9</version>
</dependency>
</dependencies>
- spring-rabbitmq-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 1.配置連接 -->
<rabbit:connection-factory id="connectionFactory"
host="192.168.186.128"
port="5672"
username="zm"
password="123456"
virtual-host="/zm"
publisher-confirms="true"
/>
<!-- 2.配置隊列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創(chuàng)建,綁定,刪除隊列與交換機,發(fā)送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.配置交換機,topic 類型 -->
<rabbit:topic-exchange name="spring_topic_exchange">
<rabbit:bindings>
<!-- 綁定隊列 -->
<rabbit:binding pattern="msg.#" queue="test_spring_queue_1"/>
</rabbit:bindings>
</rabbit:topic-exchange>
<!-- 5.配置 json 轉換的工具 -->
<bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter"/>
<!-- 6.配置 rabbitmq 的模版 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"/>
</beans>
- 發(fā)消息 com.zm.test.Sender:
public class Sender {
public static void main(String[] args) {
// 1.創(chuàng)建 spring 容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
// 2.從 spring 容器中獲得 rabbit 模版對象
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// 3.發(fā)消息
Map<String, String> map = new HashMap<String, String>();
map.put("name", "張三");
map.put("email", "123456789@qq.com");
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("Message Sent...");
context.close();
}
}
消費端工程
- 依賴與生產者一致
- spring-rabbitmq-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!--1.配置連接-->
<rabbit:connection-factory
id="connectionFactory"
host="192.168.186.128"
port="5672"
username="zm"
password="123456"
virtual-host="/zm"/>
<!-- 2.配置隊列 -->
<rabbit:queue name="test_spring_queue_1"/>
<!-- 3.配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創(chuàng)建,綁定,刪除隊列與交換機,發(fā)送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 4.注解掃描包 springIOC -->
<context:component-scan base-package="com.zm.listener"/>
<!-- 5.配置監(jiān)聽 -->
<rabbit:listener-container connection-factory="connectionFactory">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
</beans>
- 消費者:
MessageListener 接口用于 spring 容器接收到消息后處理消息;
如果需要使用自己定義的類型來實現(xiàn)處理消息時,必須實現(xiàn)該接口,并重寫 onMessage() 方法;
當 spring 容器接收消息后,會自動交由 onMessage 進行處理。
com.zm.listener.ConsumerListener:
@Component
public class ConsumerListener implements MessageListener {
/**
* jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的
*/
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message) {
// 將 message對象轉換成 json
JsonNode jsonNode = null;
try {
jsonNode = MAPPER.readTree(message.getBody());
String name = jsonNode.get("name").asText();
String email = jsonNode.get("email").asText();
System.out.println("Message From Queue:{" + name + ", " + email + "}");
} catch (IOException e) {
e.printStackTrace();
}
}
}
- 啟動項目 com.zm.test.TestRunner:
public class TestRunner {
public static void main(String[] args) throws IOException {
// 獲得容器
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-consumer.xml");
// 讓程序一直運行,別終止
System.in.read();
}
}
消息成功確認機制
在實際場景下,有的生產者發(fā)送的消息是必須保證成功發(fā)送到消息隊列中,需要 事務機制 和 發(fā)布確認機制。
事務機制
AMQP 協(xié)議提供的一種保證消息成功投遞的方式,通過信道開啟 transactional 模式;
利用信道的三個方法來實現(xiàn)以事務方式發(fā)送消息,若發(fā)送失敗,通過異常處理回滾事務,確保消息成功投遞
- channel.txSelect() - 開啟事務
- channel.txCommit() - 提交事務
- channel.txRollback() - 回滾事務
Spring 已經對上面三個方法進行了封裝,所以這里使用原始的代碼演示。
生產者
public class Sender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("test_transaction", "topic");
// 開啟事務
channel.txSelect();
try {
channel.basicPublish("test_transaction", "product.price", null, "Item 1: price-off".getBytes());
// 模擬出錯
// System.out.println(1 / 0);
channel.basicPublish("test_transaction", "product.price", null, "Item 2: price-off".getBytes());
// 提交事務(一起成功)
channel.txCommit();
System.out.println("Producer: All Messages Sent");
} catch (Exception e) {
System.out.println("All Messages Rollback");
// 事務回滾(一起失敗)
channel.txRollback();
e.printStackTrace();
} finally {
channel.close();
connection.close();
}
}
}
消費者
public class Receiver {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("test_transaction_queue", false, false, false, null);
channel.queueBind("test_transaction_queue", "test_transaction", "product.#");
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String s = new String(body);
System.out.println("Consumer: " + s);
}
};
// 4.監(jiān)聽隊列 true:自動消息確認
channel.basicConsume("test_transaction_queue", true, consumer);
}
}
Confirm 發(fā)布確認機制
RabbitMQ 為了保證消息的成功投遞,采用通過 AMQP 協(xié)議層面提供事務機制的方案,但是采用事務會大大降低消息的吞吐量。
開啟事務性能最大損失超過 250 倍。
事務效率低下原因:100 條消息,前 99 條成功,如果第 100 條失敗,那么 99 條消息要全部撤銷回滾。
更加高效的解決方式是采用 Confirm 模式,而 Confirm 模式則采用補發(fā)第 100 條的措施來完成 100 條消息的送達。
在 Spring 中應用
- resources\spring\spring-rabbitmq-producer.xml
...
<!-- 6.配置 rabbitmq 的模版 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring_topic_exchange"
message-converter="jsonMessageConverter"
confirm-callback="messageConfirm"/>
<!-- 7.確認機制的處理類 -->
<bean id="messageConfirm" class="com.renda.confirm.MessageConfirm"/>
...
- 消息確認處理類 com.zm.confirm.MessageConfirm:
public class MessageConfirm implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData 消息相關的數(shù)據(jù)對象(封裝了消息的唯一 id)
* @param b 消息是否確認成功
* @param s 異常信息
*/
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
if (b) {
System.out.println("Successfully Confirmed Message");
} else {
System.out.println("Fail to Confirm Message, error: " + s);
// 如果本條消息一定要發(fā)送到隊列中,例如下訂單消息,可以采用補發(fā)
// 1.采用遞歸(限制遞歸的次數(shù))
// 2.redis + 定時任務(jdk 的 timer,或者定時任務框架 Quartz)
}
}
}
- resources\log4j.properties
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %m%n
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.File=rabbitmq.log
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %l %m%n
log4j.rootLogger=debug, stdout,file
- 發(fā)送消息 com.zm.test.Sender:
...
// 3.發(fā)消息
Map<String, String> map = new HashMap<String, String>();
map.put("name", "張三");
map.put("email", "123456789@qq.com");
// 模擬發(fā)送消息失敗
// rabbitTemplate.convertAndSend("fuck", "msg.user", map);
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("Message Sent...");
...
消費端限流
RabbitMQ 服務器積壓了成千上萬條未處理的消息,然后隨便打開一個消費者客戶端,就會出現(xiàn)這樣的情況:巨量的消息瞬間全部噴涌推送過來,但是單個客戶端無法同時處理這么多數(shù)據(jù),就會被壓垮崩潰。
所以,當數(shù)據(jù)量特別大的時候,對生產端限流肯定是不科學的,因為有時候并發(fā)量就是特別大,有時候并發(fā)量又特別少,這是用戶的行為 - 是無法約束的。
應該對消費端限流,用于保持消費端的穩(wěn)定。
RabbitMQ 提供了一種 QoS(Quality of Service,服務質量)服務質量保證功能;
即在非自動確認消息的前提下,如果一定數(shù)目的消息未被確認前,不再進行消費新的消息。
生產者 com.zm.test.Sender 使用循環(huán)發(fā)出多條消息:
...
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("msg.user", map);
System.out.println("Message Sent...");
}
...
RabbitMQ 的管理頁面可以看到生產了 10 條堆積未處理的消息。
消費者進行限流處理:
resources\spring\spring-rabbitmq-consumer.xml
...
5.配置監(jiān)聽 -->
<!--
prefetch="3":一次性消費的消息數(shù)量。
會告訴 RabbitMQ 不要同時給一個消費者推送多于 N 個消息,
一旦有 N 個消息還沒有 ack,則該 consumer 將阻塞,直到消息被 ack。
-->
<!-- acknowledge-mode: manual 手動確認-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="test_spring_queue_1"/>
</rabbit:listener-container>
...
com.zm.listener.ConsumerListener
@Component
public class ConsumerListener extends AbstractAdaptableMessageListener {
/**
* jackson 提供序列化和反序列中使用最多的類,用來轉換 json 的
*/
private static final ObjectMapper MAPPER = new ObjectMapper();
@Override
public void onMessage(Message message, Channel channel) throws Exception {
// 將 message對象轉換成 json
// JsonNode jsonNode = MAPPER.readTree(message.getBody());
// String name = jsonNode.get("name").asText();
// String email = jsonNode.get("email").asText();
// System.out.println("Message From Queue:{" + name + ", " + email + "}");
String str = new String(message.getBody());
System.out.println("str = " + str);
/**
* 手動確認消息(參數(shù)1,參數(shù)2)
* 參數(shù) 1:RabbitMQ 想該 channel 投遞的這條消息的唯一標識 ID,此 ID 是一個單調遞增的正整數(shù)。
* 參數(shù) 2:為了減少網(wǎng)絡流量,手動確認可以被批量處理;當該參數(shù)為 true 時,則可以一次性確認小于等于 msgId 值的所有消息。
*/
long msgId = message.getMessageProperties().getDeliveryTag();
channel.basicAck(msgId, true);
Thread.sleep(3000);
System.out.println("Rest for 3 seconds and then continue for more messages...");
}
}
每次最多只確認接收 3 條消息,直到消息隊列為空。
過期時間 TTL
Time To Live - 生存時間、還能活多久,單位毫秒。
在這個周期內,消息可以被消費者正常消費,超過這個時間,則自動刪除(其實是被稱為 dead message 并投入到死信隊列,無法消費該消息)。
RabbitMQ 可以對消息和隊列設置 TTL:
- 通過隊列設置,隊列中所有消息都有相同的過期時間。
- 對消息單獨設置,每條消息的 TTL 可以不同(更顆粒化)。
設置隊列 TTL
RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。
resources\spring\spring-rabbitmq-producer.xml
<!-- 對隊列中的消息設置過期時間 -->
<rabbit:queue name="test_spring_queue_1" auto-declare="true">
<rabbit:queue-arguments>
<entry key="x-message-ttl" value-type="long" value="5000"/>
</rabbit:queue-arguments>
</rabbit:queue>
5 秒之后,消息自動刪除。
設置消息 TTL
RabbitMQ 管理端刪除掉 test_spring_queue_1 隊列。
設置某條消息的 TTL,只需要在創(chuàng)建發(fā)送消息時指定即可。
resources\spring\spring-rabbitmq-producer.xml
<rabbit:queue name="test_spring_queue_1"/>
com.zm.test.Sender2
public class Sender2 {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// 創(chuàng)建消息配置對象
MessageProperties messageProperties = new MessageProperties();
// 設置消息過期時間
messageProperties.setExpiration("6000");
// 創(chuàng)建消息
Message message = new Message("This Message will be deleted in 6000 ms".getBytes(), messageProperties);
// 發(fā)消息
rabbitTemplate.convertAndSend("msg.user", message);
System.out.println("Message Sent...");
context.close();
}
}
如果同時設置了 queue 和 message 的 TTL 值,則只有二者中較小的才會起作用。
死信隊列
DLX(Dead Letter Exchanges)死信交換機 / 死信郵箱,當消息在隊列中由于某些原因沒有被及時消費而變成死信(dead message)后,這些消息就會被分發(fā)到 DLX 交換機中,而綁定 DLX 交換機的隊列,稱之為:“死信隊列”。
消息沒有被及時消費的原因:
- 消息被拒絕(basic.reject / basic.nack)并且不再重新投遞 requeue=false。
- 消息超時未消費。
- 達到最大隊列長度。

resources\spring\spring-rabbitmq-producer-dlx.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
<!-- 配置連接 -->
<rabbit:connection-factory id="connectionFactory"
host="192.168.186.128"
port="5672"
username="zm"
password="123456"
virtual-host="/zm"
publisher-confirms="true"
/>
<!-- 配置 rabbitAdmin: 主要用于在 java 代碼中對隊列的管理,用來創(chuàng)建,綁定,刪除隊列與交換機,發(fā)送消息等 -->
<rabbit:admin connection-factory="connectionFactory"/>
<!-- 配置 rabbitmq 的模版 -->
<rabbit:template id="rabbitTemplate"
connection-factory="connectionFactory"
exchange="spring_topic_exchange"/>
<!-- 聲明死信隊列 -->
<rabbit:queue name="dlx_queue"/>
<!-- 聲明定向的死信交換機 -->
<rabbit:direct-exchange name="dlx_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="dlx_queue"/>
<rabbit:binding key="dlx_max" queue="dlx_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
<!-- 聲明測試過期的消息隊列 -->
<rabbit:queue name="test_ttl_queue">
<rabbit:queue-arguments>
<!-- 設置隊列的過期時間 TTL -->
<entry key="x-message-ttl" value-type="long" value="10000"/>
<!-- 消息如果超時,將消息投遞給死信交換機 -->
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 聲明測試超出長度的消息隊列 -->
<rabbit:queue name="test_max_queue">
<rabbit:queue-arguments>
<!-- 設置隊列的額定長度 (本隊列最多裝 2 個消息) -->
<entry key="x-max-length" value-type="long" value="2"/>
<!-- 消息如果超出長度,將消息投遞給死信交換機 -->
<entry key="x-dead-letter-exchange" value="dlx_exchange"/>
</rabbit:queue-arguments>
</rabbit:queue>
<!-- 聲明定向的測試消息的交換機 -->
<rabbit:direct-exchange name="my_exchange">
<rabbit:bindings>
<rabbit:binding key="dlx_ttl" queue="test_ttl_queue"/>
<rabbit:binding key="dlx_max" queue="test_max_queue"/>
</rabbit:bindings>
</rabbit:direct-exchange>
</beans>
發(fā)消息進行測試
public class SendDLX {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("spring/spring-rabbitmq-producer-dlx.xml");
RabbitTemplate rabbitTemplate = context.getBean(RabbitTemplate.class);
// 測試超時
// rabbitTemplate.convertAndSend("dlx_ttl", "Overtime: Close".getBytes());
// 測試超過最大長度
rabbitTemplate.convertAndSend("dlx_max", "OverSize: 1".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "OverSize: 2".getBytes());
rabbitTemplate.convertAndSend("dlx_max", "OverSize: 3".getBytes());
System.out.println("Message Sent...");
context.close();
}
}
延遲隊列
延遲隊列 = TTL + 死信隊列的合體。
死信隊列只是一種特殊的隊列,里面的消息仍然可以消費。
在電商開發(fā)部分中,都會涉及到延時關閉訂單,此時延遲隊列正好可以解決這個問題。
生產者
沿用上面死信隊列案例的超時測試,超時時間改為訂單關閉時間即可。
消費者
resources\spring\spring-rabbitmq-consumer.xml
...
<!-- 監(jiān)聽死信隊列-->
<rabbit:listener-container connection-factory="connectionFactory" prefetch="3" acknowledge="manual">
<rabbit:listener ref="consumerListener" queue-names="dlx_queue"/>
</rabbit:listener-container>
...