RabbitMQ簡(jiǎn)介
RabbitMQ是一個(gè)開(kāi)源的消息代理和隊(duì)列服務(wù)器,用來(lái)通過(guò)普通協(xié)議在完全不同的應(yīng)用之間共享數(shù)據(jù),RabbitMQ是使用Erlang語(yǔ)言來(lái)編寫的,并且RabbitMQ是給予AMQP協(xié)議(Advanced Message Queuing Protocol 高級(jí)消息隊(duì)列協(xié)議,是具有現(xiàn)代特征的二進(jìn)制協(xié)議。是一個(gè)提供統(tǒng)一消息服務(wù)的應(yīng)用層標(biāo)準(zhǔn)高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì))的。
AMQP核心概念
-
Server:又稱Broker,接收客戶端的連接,實(shí)現(xiàn)AMQP實(shí)體服務(wù); -
Connection:連接,應(yīng)用程序和Broker之間的網(wǎng)絡(luò)連接; -
Channel:網(wǎng)絡(luò)信道,幾乎所有的操作都是在Channel中進(jìn)行的,Channel是進(jìn)行消息讀寫的通道??蛻舳丝梢越⒍鄠€(gè)Channel,每個(gè)Channel代表一個(gè)會(huì)話任務(wù),有點(diǎn)類似于數(shù)據(jù)中的session; -
Message:消息,服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),由Properties和Body組成。Properties可以對(duì)消息進(jìn)行修飾,比如消息的優(yōu)先級(jí)、延遲等高級(jí)特性,Body則就是消息體內(nèi)容; -
Virtual Host:虛擬主機(jī),用于進(jìn)行邏輯隔離,最上層的消息路由。一個(gè)Virtual Host里面可以有若干個(gè)Exchange和Queue,同一個(gè)Virtual Host里面不能有相同名稱的Exchange或Queue,有點(diǎn)類似于Redis中的16個(gè)db,是邏輯層面的隔離; -
Exchange:交換機(jī),接收消息,根據(jù)路由鍵轉(zhuǎn)發(fā)消息到綁定的隊(duì)列(Producer生產(chǎn)消息后都是直接投遞到Exchange中); -
Binding:Exchange和Queue之間的虛擬連接,binding中可以包含routing key; -
Routing Key:一個(gè)路由規(guī)則,虛擬機(jī)可以用它來(lái)確定如何路由一個(gè)特定的消息; -
Queue:也被稱為Message Queue,消息隊(duì)列,保存消息并將它們轉(zhuǎn)發(fā)給消費(fèi)者。
RabbitMQ架構(gòu)圖

Producer生產(chǎn)消息之后直接將消息投遞到Exchange中,在投遞的時(shí)候需要指定兩個(gè)重要的信息,一個(gè)是消息需要被投遞到哪個(gè)Exchange上,另一個(gè)是Routing Key,也就是將消息路由到哪個(gè)Message Queue上。
RabbitMQ安裝
參考官網(wǎng)的安裝,已經(jīng)非常詳細(xì)了,官網(wǎng)推薦的安裝是將RabbitMQ和Erlang一起安裝了,如果要單獨(dú)安裝的話,需要注意RabbitMQ和Erlang之間的版本需要對(duì)應(yīng)。
https://www.rabbitmq.com/install-rpm.html
RabbitMQ基本使用
- 服務(wù)的啟動(dòng):rabbitmq-server start &
- 服務(wù)的停止:rabbitmqctl stop_app
- 管理插件:rabbitmq-plugins enable rabbitmq_management(啟動(dòng)管控臺(tái)插件,方便圖形化管理rabbitmq)
- 訪問(wèn)地址:http://localhost:15672
RabbitMQ常用命令-基礎(chǔ)操作
-
rabbitmqctl stop_app: 關(guān)閉應(yīng)用 -
rabbitmqctl start_app: 啟動(dòng)應(yīng)用 -
rabbitmqctl status: 查看節(jié)點(diǎn)狀態(tài) -
rabbitmqctl add_user username password: 添加用戶 -
rabbitmqctl list_users: 列出所有用戶 -
rabbitmqctl delete_user username: 刪除用戶 -
rabbitmqctl clear_permissions -p vhostpath username: 清除用戶權(quán)限 -
rabbitmqctl list_user_permissions username: 列出用戶權(quán)限 -
rabbitmqctl change_password username newpassword: 修改密碼 -
rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*": 設(shè)置用戶權(quán)限(權(quán)限分別為configure write read,也就是可以配置、可寫、可讀) -
rabbitmqctl add_vhost vhostpath: 創(chuàng)建虛擬主機(jī) -
rabbitmqctl list_vhosts: 列出所有虛擬主機(jī) -
rabbitmqctl list_permissions -p vhostpath: 列出虛擬主機(jī)上所有權(quán)限 -
rabbitmqctl list_queues: 查看所有隊(duì)列信息 -
rabbitmqctl -p vhostpath purge_queue blue: 清楚隊(duì)列中的消息
RabbitMQ常用命令-高級(jí)操作
-
rabbitmqctl reset: 移除所有數(shù)據(jù),要在rabbitmqctl stop_app之后使用 -
rabbitmqctl join_cluster <clusternode> [--ram]: 組成集群命令 -
rabbitmqctl change_cluster_node_type <clusternode> disc | ram: 修改集群節(jié)點(diǎn)的存儲(chǔ)形式,disc為磁盤存儲(chǔ),消息數(shù)據(jù)是存儲(chǔ)在磁盤上的,可靠性高,但是持久化時(shí)間長(zhǎng),ram是內(nèi)存存儲(chǔ),消息是存儲(chǔ)在內(nèi)存中,性能好,但是可能存在丟失 -
rabbitmqctl forget_cluster_node [--offline]: 忘記節(jié)點(diǎn)(摘除節(jié)點(diǎn)) -
rabbitmqctl rename_cluster_node oldnode1 newnode1 [oldnode2] [newnode2]...: 修改節(jié)點(diǎn)名稱
生產(chǎn)者消費(fèi)者模型構(gòu)建
- 創(chuàng)建好一個(gè)SpringBoot或者Spring或者普通的Java項(xiàng)目
- 安裝RabbitMQ相關(guān)依賴
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.6.5</version>
</dependency>
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 通過(guò)Channel發(fā)送數(shù)據(jù)
/*
* basicPublish的四個(gè)參數(shù)為別為:
* exchange: 交換機(jī),如果為空的,routingKey的規(guī)則就是routingKey需要和消息隊(duì)列的名稱一樣,不然就發(fā)送失敗
* routingKey: 路由規(guī)則
* properties: 消息的額外修飾
* body: 消息體,也就是消息的主要內(nèi)容
*/
for (int i = 0; i < 5; i++) {
String msg = "Hello, RabbitMQ!";
channel.basicPublish("", "test001", null, msg.getBytes());
}
// 5. 關(guān)閉連接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明一個(gè)隊(duì)列
/*
* queueDeclare方法的五個(gè)參數(shù)
* queue: 隊(duì)列的名稱
* durable: 是否是持久化,也就是RabbitMQ服務(wù)重啟之后消息隊(duì)列是否被保存,為true就是持久化,服務(wù)重啟消息隊(duì)列不會(huì)被刪除
* exclusive: 是否獨(dú)占,有點(diǎn)類似于獨(dú)占鎖
* autoDelete: 是否開(kāi)啟自動(dòng)刪除,也就是當(dāng)該消息隊(duì)列沒(méi)有被綁定到任何一個(gè)Exchange上時(shí)是否自動(dòng)刪除
* arguments: 額外的參數(shù)
*/
String queueName = "test001";
channel.queueDeclare(queueName, true, true, false, null);
// 5. 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6. 設(shè)置Channel
/*
* basicConsume的三個(gè)參數(shù)的函數(shù)
* queue: 隊(duì)列的名稱
* autoAck: 是否自動(dòng)簽收,為true表示當(dāng)Consumer收到消息之后自動(dòng)發(fā)送ACK確定給Broker
* callback: 指定消費(fèi)者
*/
channel.basicConsume(queueName, true, queueingConsumer);
// 7. 獲取消息
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費(fèi)端:" + msg);
}
}
}
交換機(jī)Exchange詳解
交換機(jī)屬性
- Name:交換機(jī)名稱
- Type:交換機(jī)類型,大致有direct、topic、fanout、headers四種
- Durability:是否需要持久化,true為持久化
- AutoDelete:當(dāng)最后一個(gè)綁定到Exchange上的隊(duì)列被刪除后,是否自動(dòng)刪除該Exchange
- Internal:當(dāng)前Exchange是否用于RabbitMQ內(nèi)部使用,默認(rèn)為false
- Arguments:擴(kuò)展參數(shù),用于擴(kuò)展AMQP協(xié)議定制化使用
交換機(jī)類型 - Direct Exchange
所有發(fā)送到Direct Exchange上的消息都會(huì)被轉(zhuǎn)發(fā)到RoutingKey中指定的Queue中,在Direct模式下可以使用RabbitMQ自帶的Exchange:default Exchange,所以不需要將Exchange進(jìn)行任何綁定(binding)操作(默認(rèn)的RoutingKey就是隊(duì)列的名稱),消息傳遞時(shí),RoutingKey必須完全匹配(名稱完全一樣,不支持模糊匹配)才會(huì)被隊(duì)列接收,否則該消息會(huì)被拋棄。
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange的名稱和RoutingKey
String exchangeName = "test_direct_exchange";
String routingKey = "test.direct";
// 5. 發(fā)送消息
String msg = "Hello RabbitMQ - Direct Exchange Message...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 6. 關(guān)閉連接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_direct_exchange";
String exchangeType = "direct";
String queueName = "test_direct_queue";
String routingKey = "test.direct";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, true, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6. 設(shè)置Channel
channel.basicConsume(queueName, true, queueingConsumer);
// 7. 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費(fèi)端:" + msg);
}
}
}
交換機(jī)類型 - Topic Exchange
所有發(fā)送到Topic Exchange上的消息被轉(zhuǎn)發(fā)到所有關(guān)系RoutingKey中指定Topic的Queue中,Exchange將RoutingKey和某個(gè)Topic進(jìn)行模糊匹配,此時(shí)隊(duì)列需要綁定一個(gè)Topic。
上面這句話有點(diǎn)拗口,其實(shí)簡(jiǎn)單來(lái)說(shuō),就是當(dāng)Exchange的類型為topic時(shí),RoutingKey是一組規(guī)則(不再僅僅表示一個(gè)規(guī)則,Direct Exchange中的RoutingKey就是一個(gè)規(guī)則,Producer傳遞的RoutingKey必須和Exchange中的RoutingKey名稱完全一致才能發(fā)送成功),通過(guò)這組規(guī)則可以將多個(gè)RoutingKey和一個(gè)Queue進(jìn)行關(guān)聯(lián),只要滿足RoutingKey的規(guī)則就會(huì)被路由到相關(guān)的隊(duì)列中(比如RoutingKey為log.#,只要符合這個(gè)規(guī)則的消息都會(huì)被路由到相關(guān)隊(duì)列中)。
在制定RoutingKey時(shí)可以使用通配符進(jìn)行模糊匹配,符號(hào)#表示匹配一個(gè)或多個(gè)詞,*表示匹配一個(gè)詞(注意這里是詞,而不是字符),比如log.#可以匹配到log.info.oa,log.*只能匹配到log.info,是匹配不到log.info.oa的
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange的名稱和RoutingKey
String exchangeName = "test_topic_exchange";
String routingKey1 = "log.info.oa";
String routingKey2 = "log.error";
String routingKey3 = "log.debug";
// 5. 發(fā)送消息
String msg = "Hello RabbitMQ - Topic Exchange Message...";
channel.basicPublish(exchangeName, routingKey1, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey2, null, msg.getBytes());
channel.basicPublish(exchangeName, routingKey3, null, msg.getBytes());
// 6. 關(guān)閉連接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_topic_exchange";
String exchangeType = "topic";
String queueName = "test_topic_queue";
// String routingKey = "log.*";
String routingKey = "log.#";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, true, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6. 設(shè)置Channel
channel.basicConsume(queueName, true, queueingConsumer);
// 7. 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費(fèi)端:" + msg);
}
}
}
交換機(jī)類型 - Fanout Exchange
該種交換機(jī)類型是不會(huì)處理RoutingKey的,只會(huì)簡(jiǎn)單地將隊(duì)列綁定到交換機(jī)上,發(fā)送到交換機(jī)的消息都會(huì)被轉(zhuǎn)發(fā)到與該交換機(jī)綁定的所有隊(duì)列上,F(xiàn)anout Exchange是轉(zhuǎn)發(fā)消息最快的,因?yàn)椴粫?huì)處理路由相關(guān)的操作,即使指定了RoutingKey也不會(huì)理會(huì)
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange的名稱和RoutingKey
String exchangeName = "test_fanout_exchange";
// 指定了RoutingKey也沒(méi)有作用
String routingKey = "log.debug";
// 5. 發(fā)送消息
String msg = "Hello RabbitMQ - Fanout Exchange Message...";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 6. 關(guān)閉連接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_fanout_exchange";
String exchangeType = "fanout";
String queueName = "test_fanout_queue";
String routingKey = "test";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, false, null);
channel.queueDeclare(queueName, true, true, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6. 設(shè)置Channel
channel.basicConsume(queueName, true, queueingConsumer);
// 7. 獲取消息
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費(fèi)端:" + msg);
}
}
}
綁定、隊(duì)列、消息、虛擬主機(jī)詳解
綁定Binding
是指Exchange和Exchange、Exchange和Queue之間的連接關(guān)系
隊(duì)列
是指消息隊(duì)列,實(shí)際存儲(chǔ)消息數(shù)據(jù)的。包含一些屬性,比如Durability表示是否持久化,Durable就是持久化,Transient表示不持久化;Autodelete表示當(dāng)最后一個(gè)監(jiān)聽(tīng)被移除后,該Queue是否被自動(dòng)刪除。
Message
是指服務(wù)器和應(yīng)用程序之間傳送的數(shù)據(jù),本質(zhì)上就是一段數(shù)據(jù),由Properties和Payload(Body)組成,也包含一些屬性,比如delivery mode、headers(自定義屬性)、content_type、content_encoding、priority、correlation_id、reply_to、expiration、message_id、timestamp、type、user_id、app_id、cluster_id。
如何發(fā)送攜帶Properties的Message呢?
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 通過(guò)Channel發(fā)送數(shù)據(jù)
Map<String, Object> headers = new HashMap<>();
headers.put("name", "snow");
headers.put("sex", "man");
// 設(shè)置Properties
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("15000")
.contentEncoding("UTF-8")
.headers(headers)
.build();
for (int i = 0; i < 5; i++) {
String msg = "Hello, RabbitMQ!";
channel.basicPublish("", "test001", properties, msg.getBytes());
}
// 5. 關(guān)閉連接
channel.close();
connection.close();
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明一個(gè)隊(duì)列
String queueName = "test001";
channel.queueDeclare(queueName, true, false, false, null);
// 5. 創(chuàng)建消費(fèi)者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
// 6. 設(shè)置Channel
channel.basicConsume(queueName, true, queueingConsumer);
// 7. 獲取消息
while (true) {
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
Map<String, Object> headers = delivery.getProperties().getHeaders();
System.out.println("消費(fèi)端:" + msg);
System.out.println(headers.get("name"));
}
}
}
RabbitMQ高級(jí)特性
消息如何保證100%的投遞成功方案-1
什么是生產(chǎn)端的可靠性投遞?
- 保障消息的成功發(fā)出
- 保障MQ節(jié)點(diǎn)的成功接收
- 發(fā)送端收到MQ節(jié)點(diǎn)(Broker)的確認(rèn)應(yīng)答
- 完善的消息補(bǔ)償機(jī)制(也就是消息投遞失敗或者未收到Broker的確認(rèn)應(yīng)答的補(bǔ)償措施)
消息可靠性投遞的解決方案
- 消息落庫(kù),對(duì)消息狀態(tài)進(jìn)行打標(biāo)
- 消息的延遲投遞,做二次確認(rèn),回調(diào)檢查
[圖片上傳失敗...(image-d40d42-1636988003182)]
- Producer端首先將業(yè)務(wù)信息入庫(kù),同時(shí)創(chuàng)建一條消息入庫(kù),設(shè)置消息的status為0(表示消息已經(jīng)投遞)
- Producer端生成一條消息Message投遞到Broker
- Broker收到消息之后,發(fā)送確認(rèn)Confirm返回給Producer
- Producer收到Broker發(fā)送過(guò)來(lái)的Confirm之后,就將消息數(shù)據(jù)庫(kù)中消息的狀態(tài)為1(表示消息已經(jīng)投遞成功)
- 因?yàn)椴襟E2和步驟3都有可能發(fā)生故障,也就是消息投遞失敗,或者網(wǎng)絡(luò)等原因造成Producer未收到Broker發(fā)送過(guò)來(lái)的Confirm消息,所以需要開(kāi)啟一個(gè)分布式定時(shí)任務(wù)從消息數(shù)據(jù)庫(kù)中抓取status為0的消息
- 將抓取出來(lái)的status為0的消息重新投遞給Broker,重復(fù)上述動(dòng)作
- 因?yàn)樵跇O端狀況下有些消息可能就是會(huì)投遞失敗,不能無(wú)休止地重新投遞,可以設(shè)置一個(gè)投遞上限,比如最大重新投遞次數(shù)為3,如果3次投遞均失敗,就將消息數(shù)據(jù)庫(kù)中的消息狀態(tài)設(shè)置為3,之后再建立補(bǔ)償措施來(lái)對(duì)status為3的消息進(jìn)行處理
缺點(diǎn):由于在最開(kāi)始進(jìn)行了兩次入庫(kù)的操作,所以在高并發(fā)的情況下其實(shí)會(huì)有性能上的問(wèn)題。
消息如何保證100%的投遞成功方案-2
[圖片上傳失敗...(image-7aeb0b-1636988003182)]
- Producer端首先對(duì)業(yè)務(wù)消息進(jìn)行入庫(kù),然后同時(shí)生成兩條相同的消息,一條消息立即發(fā)出,另一條消息延遲一段時(shí)間再次發(fā)出
- Consumer端對(duì)消息隊(duì)列進(jìn)行監(jiān)聽(tīng),從中取出消息進(jìn)行消費(fèi),在消費(fèi)完一條消息之后,需要向Broker發(fā)送一個(gè)消費(fèi)確認(rèn)Confirm,表示該條消息已被消費(fèi)
- Callback Service對(duì)Consumer端發(fā)送的消費(fèi)確認(rèn)消息進(jìn)行監(jiān)聽(tīng),如果收到了Consumer端發(fā)送過(guò)來(lái)的消費(fèi)確認(rèn),就將消息數(shù)據(jù)庫(kù)中的消息進(jìn)行入庫(kù)
- 同時(shí)Callback還會(huì)對(duì)Producer端發(fā)送的另一條延遲消息進(jìn)行監(jiān)聽(tīng),如果收到了Producer發(fā)送過(guò)來(lái)的延遲消息,就從消息數(shù)據(jù)庫(kù)中查詢?cè)摋l消息是否已被消費(fèi),如果查詢不到或者消息消費(fèi)失敗,Callback Service就通知Producer進(jìn)行消息重發(fā)
優(yōu)點(diǎn):由于最開(kāi)始只是進(jìn)行了一次入庫(kù)的操作,性能得到了較大的提升,而Callback Service是一個(gè)補(bǔ)償措施,對(duì)業(yè)務(wù)的性能并不會(huì)產(chǎn)生實(shí)際的影響
具體的實(shí)現(xiàn)請(qǐng)參考:RabbitMQ之消息可靠性投遞實(shí)現(xiàn)
冪等性概念及業(yè)界主流解決方案
什么是冪等性?
通俗來(lái)說(shuō),就是假如我們要對(duì)一件事進(jìn)行操作,這個(gè)操作可能重復(fù)進(jìn)行100次或者1000次,那么無(wú)論操作多少次,這些操作的結(jié)果都是一樣的,就像數(shù)據(jù)庫(kù)中的樂(lè)觀鎖機(jī)制,比如我們多個(gè)線程同時(shí)更新庫(kù)存的SQL語(yǔ)句,不采用樂(lè)觀鎖的機(jī)制的話可能會(huì)存在線程安全問(wèn)題導(dǎo)致數(shù)據(jù)不一致,update sku set count = count - 1, version = version + 1 where version = 1,加上一個(gè)樂(lè)觀鎖來(lái)保證線程安全,當(dāng)然樂(lè)觀鎖的背后采用的原理是CAS(CompareAndSwap,也就是先比較然后再替換,保證操作的原子性)。
在海量訂單產(chǎn)生的業(yè)務(wù)高峰期,如何避免消息的重復(fù)消費(fèi)問(wèn)題?
在業(yè)務(wù)高峰期,可能會(huì)存在網(wǎng)絡(luò)原因或者其他原因?qū)е翽roducer端的消息重發(fā),消費(fèi)端要實(shí)現(xiàn)冪等性,就意味著我們的消息永遠(yuǎn)不會(huì)消費(fèi)多次,即使我們收到了多條一樣的消息,解決方案大致有兩種:
- 唯一ID + 指紋碼 機(jī)制,利用數(shù)據(jù)庫(kù)主鍵去重
- 利用Redis的原子性去實(shí)現(xiàn)
唯一ID + 指紋碼 機(jī)制
- 唯一ID + 指紋碼 機(jī)制,利用數(shù)據(jù)庫(kù)進(jìn)行主鍵去重
-
select count(1) from order where id = 唯一ID + 指紋碼,在消費(fèi)的時(shí)候先進(jìn)行查詢,如果查詢結(jié)果為1的話就表示已經(jīng)被消費(fèi)過(guò)了就不再重復(fù)進(jìn)行消費(fèi)了,沒(méi)有查詢出結(jié)果的話就說(shuō)明沒(méi)有被消費(fèi),就進(jìn)行數(shù)據(jù)庫(kù)的入庫(kù) -
好處:實(shí)現(xiàn)簡(jiǎn)單 -
壞處:高并發(fā)下有數(shù)據(jù)庫(kù)寫入的性能瓶頸 -
解決方案:根據(jù)ID進(jìn)行分庫(kù)分表,進(jìn)行算法路由,比如對(duì)ID進(jìn)行路由算法路由到不同的數(shù)據(jù)庫(kù)中,分?jǐn)傉麄€(gè)數(shù)據(jù)流量的壓力
利用Redis原子特性實(shí)現(xiàn)
- 使用Redis實(shí)現(xiàn)消費(fèi)端的冪等,有幾個(gè)需要考慮的問(wèn)題
- 第一:是否要進(jìn)行數(shù)據(jù)庫(kù)入庫(kù)的操作,如果要入庫(kù)的話,如何使得數(shù)據(jù)庫(kù)和緩存的入庫(kù)做到原子性,也就是如何實(shí)現(xiàn)數(shù)據(jù)庫(kù)和緩存的數(shù)據(jù)一致性,因?yàn)橛锌赡艹霈F(xiàn)這樣的情況,redis中保存了該order的數(shù)據(jù),但是在保存到數(shù)據(jù)庫(kù)的時(shí)候出現(xiàn)了問(wèn)題,導(dǎo)致數(shù)據(jù)庫(kù)中沒(méi)有保存成功,然后如何保證數(shù)據(jù)準(zhǔn)確地被同時(shí)保存在數(shù)據(jù)庫(kù)中呢?
- 第二:如果不進(jìn)行數(shù)據(jù)庫(kù)入庫(kù)的話,那么都存儲(chǔ)到緩存redis中,又如何設(shè)置定時(shí)同步的策略呢,因?yàn)閿?shù)據(jù)不可能一直保存在redis中,而且就算一直保存在redis中,redis服務(wù)也有可能會(huì)出現(xiàn)問(wèn)題,這也是需要重點(diǎn)考慮的問(wèn)題
Confirm確認(rèn)消息詳解
什么是Confirm消息確認(rèn)機(jī)制?
消息的確認(rèn),是指Producer投遞消息后,如果Broker收到消息,則會(huì)給我們Producer一個(gè)應(yīng)答,Producer進(jìn)行接收應(yīng)答,用來(lái)確定這條消息是否正常地發(fā)送到了Broker,這種方式也是消息的可靠性投遞的核心保障。
如何實(shí)現(xiàn)Confirm確認(rèn)消息?
- 在channel上開(kāi)啟確認(rèn)模式:
channel.confirmSelect() - 在channel上添加監(jiān)聽(tīng):
addConfirmListener,監(jiān)聽(tīng)成功或者失敗的返回結(jié)果,根據(jù)具體的結(jié)果對(duì)消息進(jìn)行重新發(fā)送或者日志記錄等后續(xù)處理
public class Producer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 指定消息投遞模式:消息的確認(rèn)模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
// 5. 發(fā)送消息
String msg = "Hello RabbitMQ! Send a confirm message.";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
// 6. 添加一個(gè)確認(rèn)監(jiān)聽(tīng)
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------ACK!------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("------NO ACK!------");
}
});
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.*";
String queueName = "test_confirm_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 創(chuàng)建消費(fèi)者消費(fèi)消息
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費(fèi)端: " + msg);
}
}
}
Return返回消息詳解
什么是Return返回消息機(jī)制?
ReturnListener用于處理一些不可路由的消息,Producer生產(chǎn)一條消息之后,通過(guò)指定一個(gè)Exchange和RoutingKey,將消息送達(dá)到某一個(gè)隊(duì)列中去,然后Consumer監(jiān)聽(tīng)隊(duì)列,進(jìn)行消息的消費(fèi)處理操作,但是在某些情況下,Producer在投遞消息的時(shí)候,指定的Exchange不存在或者RoutingKey路由不到,就說(shuō)明消息投遞失敗,這個(gè)時(shí)候如果需要監(jiān)聽(tīng)這種不可達(dá)的消息,就需要使用ReturnListener。
在使用ReturnListener的基礎(chǔ)API時(shí)有一個(gè)關(guān)鍵的配置項(xiàng)是Mandatory,該參數(shù)為true,則ReturnListener會(huì)接收到路由不可達(dá)的消息,然后進(jìn)行后續(xù)的處理,如果為false,那么Broker端會(huì)自動(dòng)刪除該消息,ReturnListener是監(jiān)聽(tīng)不到的。
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "snow.save";
String msg = "Hello RabbitMQ! Send a Return message.";
boolean mandatory = true;
channel.basicPublish(exchangeName, routingKeyError, mandatory, null, msg.getBytes());
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----handle return----");
System.out.println("replyText: " + replyText);
System.out.println("exchange: " + exchange);
System.out.println("routingKey: " + routingKey);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
});
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_return_exchange";
String routingKey = "return.*";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 創(chuàng)建消費(fèi)者消費(fèi)消息
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while (true) {
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println("消費(fèi)端: " + msg);
}
}
}
自定義消費(fèi)者使用
如何自定義消費(fèi)者進(jìn)行消息消費(fèi)?
在之前,我們都是采用默認(rèn)的QueueingConsumer來(lái)創(chuàng)建一個(gè)消費(fèi)者,之后再使用while循環(huán)來(lái)不停地取出消息,但是這種方式不是特別好,一般我們會(huì)自定義自己的Consumer,那么要實(shí)現(xiàn)自定義的Consumer有兩種方式,一種是實(shí)現(xiàn)Consumer的接口,但是這種實(shí)現(xiàn)方式需要重寫很多方法,另一種是繼承DefaultConsumer,重寫其中的
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.save";
String msg = "Hello RabbitMQ! Send a Consumer message.";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
// 1. 創(chuàng)建一個(gè)ConnectionFactory,并且進(jìn)行相關(guān)連接配置
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_consumer_exchange";
String routingKey = "consumer.*";
String queueName = "test_consumer_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 創(chuàng)建消費(fèi)者消費(fèi)消息
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
}
消費(fèi)端的限流策略
什么是消費(fèi)端的限流?
假設(shè)一個(gè)場(chǎng)景,就是我們的RabbitMQ服務(wù)器有上萬(wàn)條未處理的消息,此時(shí)如果我們隨便打開(kāi)一個(gè)消費(fèi)者客戶端,會(huì)出現(xiàn)下面的情況,就是巨量的消息瞬間全部推送過(guò)來(lái),但是我們的單個(gè)客戶端無(wú)法同時(shí)處理這么多數(shù)據(jù),就有可能造成服務(wù)器崩潰。
RabbitMQ提供了一種qos(Quality of Service 服務(wù)質(zhì)量保證)功能,即在非自動(dòng)確認(rèn)消息(autoAck為false)的前提下,如果一定數(shù)目的消息(通過(guò)基于Consumer或者channel設(shè)置的Qos的值)未被確認(rèn)前,不進(jìn)行消費(fèi)新的消息。
void BasicQos(uint prefetchSize, ushort prefetchCount, bool global)中的prefetchSize表示單個(gè)消息的大小,為0表示不限制單個(gè)消息的大小,prefetchCount會(huì)告訴RabbitMQ不要同時(shí)給一個(gè)消費(fèi)者推送超過(guò)N個(gè)消息,即一旦有N個(gè)消息還沒(méi)有Ack,則該Consumer就將block阻塞住,直到有消息被Ack,global表示是否將前兩個(gè)參數(shù)的設(shè)置應(yīng)用于channel,簡(jiǎn)單點(diǎn)說(shuō)就是前兩個(gè)限制是channel級(jí)別還是Consumer級(jí)別的,一般設(shè)置為false,表示Consumer級(jí)別(prefetchCount只在autoAck為false的情況下才會(huì)生效,在自動(dòng)Ack的情況下是無(wú)效的)
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ! Send a QOS message.";
for (int i = 0; i < 5; i++) {
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
}
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_qos_exchange";
String routingKey = "qos.*";
String queueName = "test_qos_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 5. 限流,記得將basicConsume方法中的autoAck的值設(shè)置為false
channel.basicQos(0, 1, false);
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
private Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("consumerTag: " + consumerTag);
System.out.println("envelope: " + envelope);
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
// 消息的Ack確認(rèn),basicAck的第一個(gè)參數(shù)為消息的deliveryTag,第二個(gè)參數(shù)為是否批量簽收,如果限制的消息個(gè)數(shù)大于1,可以設(shè)置為true
this.channel.basicAck(envelope.getDeliveryTag(), false);
}
}
消費(fèi)端ACK與重回隊(duì)列機(jī)制
消費(fèi)端的手工ACK和NACK為什么會(huì)存在?
- 消費(fèi)端在進(jìn)行消息消費(fèi)的時(shí)候,如果由于業(yè)務(wù)異常我們可以進(jìn)行日志的記錄,然后進(jìn)行補(bǔ)償,如果采用自動(dòng)ACK的話就達(dá)不到需求
- 如果由于服務(wù)器宕機(jī)等嚴(yán)重問(wèn)題,我們也需要手工進(jìn)行ACK來(lái)保障消費(fèi)端消費(fèi)成功,因?yàn)橄M(fèi)者宕機(jī)后,Broker收不到ACK或者NACK,就會(huì)重新發(fā)送消息給消費(fèi)端再次消費(fèi),因?yàn)樵谧詣?dòng)ACK的機(jī)制下Broker發(fā)送消息給消費(fèi)者時(shí),自動(dòng)確認(rèn)消息被處理完畢
消費(fèi)端的重回隊(duì)列機(jī)制
- 消費(fèi)端重回隊(duì)列是為了將沒(méi)有處理成功的消息重新投遞給Broker
- 一般在實(shí)際應(yīng)用中,都會(huì)關(guān)閉重回隊(duì)列,也就是將requeue設(shè)置為false
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String routingKey = "ack.save";
for (int i = 0; i < 5; i++) {
Map<String, Object> headers = new HashMap<>();
headers.put("num", i);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ! Send a ACK message." + i;
channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
}
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_ack_exchange";
String routingKey = "ack.*";
String queueName = "test_ack_queue";
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 將autoAck設(shè)置為false,手工Ack確認(rèn)
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
private final Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("body: " + new String(body));
if ((Integer) properties.getHeaders().get("num") == 0) {
// 第三個(gè)參數(shù)requeue表示是否重回隊(duì)列
this.channel.basicNack(envelope.getDeliveryTag(), false, false);
} else {
// 消息的Ack確認(rèn),basicAck的第一個(gè)參數(shù)為消息的deliveryTag,第二個(gè)參數(shù)為是否批量簽收,如果限制的消息個(gè)數(shù)大于1,可以設(shè)置為true
this.channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
TTL消息詳解
- TTL 是Time To Live的縮寫,也就是生存時(shí)間
- RabbitMQ支持消息的過(guò)期時(shí)間,在消息發(fā)送的時(shí)候可以再Properties中指定expiration過(guò)期時(shí)間
- RabbitMQ支持隊(duì)列的過(guò)期時(shí)間,從消息入隊(duì)列開(kāi)始計(jì)算,如果超過(guò)了隊(duì)列設(shè)置的超時(shí)時(shí)間配置還沒(méi)有被消費(fèi),該消息就會(huì)被自動(dòng)清除
死信隊(duì)列詳解
死信隊(duì)列 DLX Dead-Letter-Exchange
- 利用DLX,當(dāng)消息在一個(gè)隊(duì)列中變成死信(dead message)之后,它能被重新publish到另一個(gè)Exchange,這個(gè)Exchange就是DLX
- DLX也是一個(gè)正常的Exchange,和一般的Exchange沒(méi)有什么區(qū)別,它可以在任何隊(duì)列上被指定(也就是需要設(shè)置隊(duì)列的屬性),這樣的話只要這個(gè)隊(duì)列中有死信就會(huì)被重新發(fā)布到DLX中
- 當(dāng)設(shè)置了DLX的隊(duì)列中有死信時(shí),RabbitMQ就會(huì)自動(dòng)將這個(gè)死信重新發(fā)布到設(shè)置的Exchange中去,從而被路由到另一個(gè)隊(duì)列
- 可以監(jiān)聽(tīng)這個(gè)隊(duì)列中的消息做相應(yīng)的處理,這個(gè)特性可以彌補(bǔ)RabbitMQ3.0版本以前支持的immediate參數(shù)的功能
消息變成死信的情況
- 消息被拒絕或消費(fèi)失?。╞asicReject/basicNack)并且requeue為false(不重回隊(duì)列)
- 消息TTL過(guò)期
- 隊(duì)列達(dá)到最大長(zhǎng)度
死信隊(duì)列的設(shè)置
首先要設(shè)置死信隊(duì)列的Exchange和Queue,然后進(jìn)行綁定
- Exchange: dlx.exchange(名字可以任意?。?/li>
- Queue: dlx.queue(名字可以任意?。?/li>
- RoutingKey: # (為#表示任何消息都可以被路由到dlx.queue中)
然后再進(jìn)行正常的交換機(jī)、隊(duì)列聲明和綁定,只不過(guò)需要再被設(shè)置死信隊(duì)列的隊(duì)列中加上一個(gè)參數(shù):arguments.put("x-dead-letter-exchange", "dlx.exchange"),這樣消息在過(guò)期、不重回隊(duì)列、隊(duì)列達(dá)到最大長(zhǎng)度時(shí)被直接路由到死信隊(duì)列中
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.save";
for (int i = 0; i < 1; i++) {
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
.deliveryMode(2)
.expiration("10000")
.contentEncoding("UTF-8")
.build();
String msg = "Hello RabbitMQ! Send a ACK message.";
channel.basicPublish(exchangeName, routingKey, properties, msg.getBytes());
}
}
}
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("localhost");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
// 2. 通過(guò)連接工廠創(chuàng)建一個(gè)連接
Connection connection = connectionFactory.newConnection();
// 3. 通過(guò)Connection創(chuàng)建一個(gè)Channel
Channel channel = connection.createChannel();
// 4. 聲明Exchange、Queue、RoutingKey
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.*";
String queueName = "test_dlx_queue";
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "dlx.exchange");
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, arguments);
channel.queueBind(queueName, exchangeName, routingKey);
// 死信隊(duì)列的聲明
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
// 將autoAck設(shè)置為false,手工Ack確認(rèn)
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
private final Channel channel;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----my consumer handle delivery----");
System.out.println("body: " + new String(body));
// 消息的Ack確認(rèn),basicAck的第一個(gè)參數(shù)為消息的deliveryTag,第二個(gè)參數(shù)為是否批量簽收,如果限制的消息個(gè)數(shù)大于1,可以設(shè)置為true
this.channel.basicAck(envelope.getDeliveryTag(), false);
}
}