參考
官網(wǎng):https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
B站:https://www.bilibili.com/video/BV1dE411K7MG
CSDN:https://blog.csdn.net/kavito/article/details/91403659
安裝
https://blog.csdn.net/qq_44767162/article/details/114531509
簡介
消息指的是兩個應(yīng)用間傳遞的數(shù)據(jù)。數(shù)據(jù)的類型有很多種形式,可能只包含文本字符串,也可能包含嵌入對象。
“消息隊(duì)列(Message Queue)”是在消息的傳輸過程中保存消息的容器。在消息隊(duì)列中,通常有生產(chǎn)者和消費(fèi)者兩個角色。生產(chǎn)者只負(fù)責(zé)發(fā)送數(shù)據(jù)到消息隊(duì)列,誰從消息隊(duì)列中取出數(shù)據(jù)處理,他不管。消費(fèi)者只負(fù)責(zé)從消息隊(duì)列中取出數(shù)據(jù)處理,他不管這是誰發(fā)送的數(shù)據(jù)。
[圖片上傳失敗...(image-99bb95-1637568679843)]
主要有三個作用:
- 解耦。如圖所示。假設(shè)有系統(tǒng)B、C、D都需要系統(tǒng)A的數(shù)據(jù),于是系統(tǒng)A調(diào)用三個方法發(fā)送數(shù)據(jù)到B、C、D。這時,系統(tǒng)D不需要了,那就需要在系統(tǒng)A把相關(guān)的代碼刪掉。假設(shè)這時有個新的系統(tǒng)E需要數(shù)據(jù),這時系統(tǒng)A又要增加調(diào)用系統(tǒng)E的代碼。為了降低這種強(qiáng)耦合,就可以使用MQ,系統(tǒng)A只需要把數(shù)據(jù)發(fā)送到MQ,其他系統(tǒng)如果需要數(shù)據(jù),則從MQ中獲取即可。
[圖片上傳失敗...(image-d2f09c-1637568679843)]
- 異步。如圖所示。一個客戶端請求發(fā)送進(jìn)來,系統(tǒng)A會調(diào)用系統(tǒng)B、C、D三個系統(tǒng),同步請求的話,響應(yīng)時間就是系統(tǒng)A、B、C、D的總和,也就是800ms。如果使用MQ,系統(tǒng)A發(fā)送數(shù)據(jù)到MQ,然后就可以返回響應(yīng)給客戶端,不需要再等待系統(tǒng)B、C、D的響應(yīng),可以大大地提高性能。對于一些非必要的業(yè)務(wù),比如發(fā)送短信,發(fā)送郵件等等,就可以采用MQ。
[圖片上傳失敗...(image-fb5677-1637568679843)]
- 削峰。如圖所示。這其實(shí)是MQ一個很重要的應(yīng)用。假設(shè)系統(tǒng)A在某一段時間請求數(shù)暴增,有5000個請求發(fā)送過來,系統(tǒng)A這時就會發(fā)送5000條SQL進(jìn)入MySQL進(jìn)行執(zhí)行,MySQL對于如此龐大的請求當(dāng)然處理不過來,MySQL就會崩潰,導(dǎo)致系統(tǒng)癱瘓。如果使用MQ,系統(tǒng)A不再是直接發(fā)送SQL到數(shù)據(jù)庫,而是把數(shù)據(jù)發(fā)送到MQ,MQ短時間積壓數(shù)據(jù)是可以接受的,然后由消費(fèi)者每次拉取2000條進(jìn)行處理,防止在請求峰值時期大量的請求直接發(fā)送到MySQL導(dǎo)致系統(tǒng)崩潰。
[圖片上傳失敗...(image-5f3010-1637568679843)]
RabbitMQ的特點(diǎn)
RabbitMQ是一款使用Erlang語言開發(fā)的,實(shí)現(xiàn)AMQP(高級消息隊(duì)列協(xié)議)的開源消息中間件。首先要知道一些RabbitMQ的特點(diǎn),官網(wǎng)可查:
- 可靠性。支持持久化,傳輸確認(rèn),發(fā)布確認(rèn)等保證了MQ的可靠性。
- 靈活的分發(fā)消息策略。這應(yīng)該是RabbitMQ的一大特點(diǎn)。在消息進(jìn)入MQ前由Exchange(交換機(jī))進(jìn)行路由消息。分發(fā)消息策略有:簡單模式、工作隊(duì)列模式、發(fā)布訂閱模式、路由模式、通配符模式。
- 支持集群。多臺RabbitMQ服務(wù)器可以組成一個集群,形成一個邏輯Broker。
- 多種協(xié)議。RabbitMQ支持多種消息隊(duì)列協(xié)議,比如 STOMP、MQTT 等等。
- 支持多種語言客戶端。RabbitMQ幾乎支持所有常用編程語言,包括 Java、.NET、Ruby 等等。
- 可視化管理界面。RabbitMQ提供了一個易用的用戶界面,使得用戶可以監(jiān)控和管理消息 Broker。
- 插件機(jī)制。RabbitMQ提供了許多插件,可以通過插件進(jìn)行擴(kuò)展,也可以編寫自己的插件。
準(zhǔn)備
maven依賴
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
RabbitMQUtils
public class RabbitMQUtils {
// 定義連接工廠
private static ConnectionFactory factory;
static {
// 重量級資源 類加載執(zhí)行
factory = new ConnectionFactory();
// 主機(jī)地址
factory.setHost("192.168.1.107");
// 端口
factory.setPort(5672);
// 設(shè)置虛擬主機(jī)
factory.setVirtualHost("/ems");
// 賬號密碼
factory.setUsername("ems");
factory.setPassword("123");
}
/**
* 建立 RabbitMQ 連接
*
* @return
* @throws Exception
*/
public static Connection getConnection() {
try {
return factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void close(Channel channel, Connection connection) {
try {
if (channel != null) {
channel.close();
}
if (connection != null) {
connection.close();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
細(xì)節(jié)
生產(chǎn)者消費(fèi)者隊(duì)列要嚴(yán)格對應(yīng),消費(fèi)者退出自動刪除才生效
①基本消息模型:

在上圖的模型中,有以下概念:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會一直等待消息到來。
- queue:消息隊(duì)列,圖中紅色部分。可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
生產(chǎn)者
public class Send {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException {
// 1、獲取到連接
Connection connection = RabbitMQUtils.getConnection();
// 2、從連接中創(chuàng)建通道,使用通道才能完成消息相關(guān)的操作
Channel channel = connection.createChannel();
// 3、聲明(創(chuàng)建)隊(duì)列
// 參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數(shù)明細(xì)
* 1、queue 隊(duì)列名稱
* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在
* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問,如果connection連接關(guān)閉隊(duì)列則自動刪除,如果將此參數(shù)設(shè)置true可用于臨時隊(duì)列的創(chuàng)建
* 4、autoDelete 自動刪除,隊(duì)列不再使用時是否自動刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時隊(duì)列(隊(duì)列不用了就自動刪除)
* 5、arguments 參數(shù),可以設(shè)置一個隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時間
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 4、消息內(nèi)容
String message = "Hello World!";
// 向指定的隊(duì)列中發(fā)送消息
//參數(shù):String exchange, String routingKey, BasicProperties props, byte[] body
/**
* 參數(shù)明細(xì):
* 1、exchange,交換機(jī),如果不指定將使用mq的默認(rèn)交換機(jī)(設(shè)置為"")
* 2、routingKey,路由key,交換機(jī)根據(jù)路由key來將消息轉(zhuǎn)發(fā)到指定的隊(duì)列,如果使用默認(rèn)交換機(jī),routingKey設(shè)置為隊(duì)列的名稱
* 3、props,消息的屬性
* 4、body,消息內(nèi)容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Send '" + message + "'");
//關(guān)閉通道和連接(資源關(guān)閉最好用try-catch-finally語句處理)
RabbitMQUtils.close(channel, connection);
}
}
消費(fèi)者
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException {
// 1、獲取到連接
Connection connection = RabbitMQUtils.getConnection();
// 2、從連接中創(chuàng)建通道,使用通道才能完成消息相關(guān)的操作
Channel channel = connection.createChannel();
// 3、聲明(創(chuàng)建)隊(duì)列
// 參數(shù):String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
/**
* 參數(shù)明細(xì)
* 1、queue 隊(duì)列名稱
* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在
* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問,如果connection連接關(guān)閉隊(duì)列則自動刪除,如果將此參數(shù)設(shè)置true可用于臨時隊(duì)列的創(chuàng)建
* 4、autoDelete 自動刪除,隊(duì)列不再使用時是否自動刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時隊(duì)列(隊(duì)列不用了就自動刪除)
* 5、arguments 參數(shù),可以設(shè)置一個隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時間
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 監(jiān)聽隊(duì)列,第二個參數(shù):是否自動進(jìn)行消息確認(rèn)。
//參數(shù):String queue, boolean autoAck, Consumer callback
/**
* 參數(shù)明細(xì):
* 1、queue 隊(duì)列名稱
* 2、autoAck 自動回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會自動回復(fù)mq,如果設(shè)置為false要通過編程實(shí)現(xiàn)回復(fù)
* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法
*/
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個方法類似事件監(jiān)聽,如果有消息的時候,會被自動調(diào)用
/**
* 當(dāng)接收到消息后此方法將被調(diào)用
* @param consumerTag 消費(fèi)者標(biāo)簽,用來標(biāo)識消費(fèi)者的,在監(jiān)聽隊(duì)列時設(shè)置channel.basicConsume
* @param envelope 信封,通過envelope
* @param properties 消息屬性
* @param body 消息內(nèi)容
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Recv '" + new String(body) + "'");
}
});
// 不用關(guān)閉,使其處于監(jiān)聽狀態(tài)
// abbitMQUtils.close(channel, connection);
}
}
②work消息模型
工作隊(duì)列或者競爭消費(fèi)者模式

work queues與入門程序相比,多了一個消費(fèi)端,兩個消費(fèi)端共同消費(fèi)同一個隊(duì)列中的消息,但是一個消息只能被一個消費(fèi)者獲取。
這個消息模型在Web應(yīng)用程序中特別有用,可以處理短的HTTP請求窗口中無法處理復(fù)雜的任務(wù)。
生產(chǎn)者
public class provider {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 10; i++) {
channel.basicPublish("", QUEUE_NAME, null, ("你好hello work queue "+i).getBytes());
}
RabbitMQUtils.close(channel, connection);
}
}
消費(fèi)者1
public class Customer1 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 測試處理速度不同問題
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("Customer1:" + new String(body));
}
});
}
}
消費(fèi)者2
public class Customer2 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer2:" + new String(body));
}
});
}
}
能者多勞
需要了解確認(rèn)機(jī)制
public class Customer1 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 這樣RabbitMQ就會使得每個Consumer在同一個時間點(diǎn)最多處理1個Message
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1:" + new String(body));
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
// 關(guān)閉了自動確認(rèn)則手動設(shè)置確認(rèn)
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
}
Customer1 處理得慢設(shè)置為手動確認(rèn),Customer2 處理的快設(shè)置為自動確認(rèn)
public class Customer2 {
private final static String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 這樣RabbitMQ就會使得每個Consumer在同一個時間點(diǎn)最多處理1個Message
channel.basicQos(1);
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer2:" + new String(body));
}
});
}
}
③Publish/subscribe(交換機(jī)類型:Fanout,也稱為廣播 )
Publish/subscribe模型示意圖 :

生產(chǎn)者
public class Provider {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, "fanout type message".getBytes());
RabbitMQUtils.close(channel, connection);
}
}
消費(fèi)者(三個)
public class Customer1 {
private final static String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1:" + new String(body));
}
});
}
}
Customer1,Customer2,Customer3幾乎相同
④Routing 路由模型(交換機(jī)類型:direct)
Routing模型示意圖:

P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時,會指定一個routing key。
X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
生產(chǎn)者
public class Provider {
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// 測試 info error warning
String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME, routingKey, null, ("direct type 基于routingKey:[" + routingKey + "] message ").getBytes());
RabbitMQUtils.close(channel, connection);
}
}
消費(fèi)者1
public class Customer1 {
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, "error");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1:" + new String(body));
}
});
}
}
消費(fèi)者2
public class Customer2 {
private final static String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue, EXCHANGE_NAME, "info");
channel.queueBind(queue, EXCHANGE_NAME, "warning");
channel.queueBind(queue, EXCHANGE_NAME, "error");
channel.basicConsume(queue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer2:" + new String(body));
}
});
}
}
direct 下
Customer1 直接收 error
Customer2 接收 info、warning、error
⑤Topics 通配符模式(交換機(jī)類型:topics)
Topics模型示意圖:

每個消費(fèi)者監(jiān)聽自己的隊(duì)列,并且設(shè)置帶統(tǒng)配符的routingkey,生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊(duì)列。
Routingkey一般都是有一個或者多個單詞組成,多個單詞之間以“.”分割,例如:inform.sms
通配符規(guī)則:
:匹配一個或多個詞
*:匹配不多不少恰好1個詞
生產(chǎn)者
public class Provider { private final static String EXCHANGE_NAME = "topic_exchange"; public static void main(String[] args) throws IOException { Connection connection = RabbitMQUtils.getConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 測試 * # String routingKey = "user.save.find"; channel.basicPublish(EXCHANGE_NAME, routingKey, null, ("topic type 基于routingKey:[" + routingKey + "] message ").getBytes()); RabbitMQUtils.close(channel, connection); }}
消費(fèi)者1
public class Customer1 {
private final static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,BuiltinExchangeType.TOPIC);
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1:"+new String(body));
}
});
}
}
消費(fèi)者4
public class Customer2 {
private final static String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException {
Connection connection = RabbitMQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,EXCHANGE_NAME,"user.#");
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("Customer1:"+new String(body));
}
});
}
}
⑥RPC
RPC模型示意圖:

基本概念:
Callback queue 回調(diào)隊(duì)列,客戶端向服務(wù)器發(fā)送請求,服務(wù)器端處理請求后,將其處理結(jié)果保存在一個存儲體中。而客戶端為了獲得處理結(jié)果,那么客戶在向服務(wù)器發(fā)送請求時,同時發(fā)送一個回調(diào)隊(duì)列地址reply_to。
Correlation id 關(guān)聯(lián)標(biāo)識,客戶端可能會發(fā)送多個請求給服務(wù)器,當(dāng)服務(wù)器處理完后,客戶端無法辨別在回調(diào)隊(duì)列中的響應(yīng)具體和那個請求時對應(yīng)的。為了處理這種情況,客戶端在發(fā)送每個請求時,同時會附帶一個獨(dú)有correlation_id屬性,這樣客戶端在回調(diào)隊(duì)列中根據(jù)correlation_id字段的值就可以分辨此響應(yīng)屬于哪個請求。
流程說明:
當(dāng)客戶端啟動的時候,它創(chuàng)建一個匿名獨(dú)享的回調(diào)隊(duì)列。
在 RPC 請求中,客戶端發(fā)送帶有兩個屬性的消息:一個是設(shè)置回調(diào)隊(duì)列的 reply_to 屬性,另一個是設(shè)置唯一值的 correlation_id 屬性。
將請求發(fā)送到一個 rpc_queue 隊(duì)列中。
服務(wù)器等待請求發(fā)送到這個隊(duì)列中來。當(dāng)請求出現(xiàn)的時候,它執(zhí)行他的工作并且將帶有執(zhí)行結(jié)果的消息發(fā)送給 reply_to 字段指定的隊(duì)列。
客戶端等待回調(diào)隊(duì)列里的數(shù)據(jù)。當(dāng)有消息出現(xiàn)的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應(yīng)用
分享兩道面試題:
面試題:
避免消息堆積?
1) 采用workqueue,多個消費(fèi)者監(jiān)聽同一隊(duì)列。
2)接收到消息以后,而是通過線程池,異步消費(fèi)。
如何避免消息丟失?
1) 消費(fèi)者的ACK機(jī)制??梢苑乐瓜M(fèi)者丟失消息。
但是,如果在消費(fèi)者消費(fèi)之前,MQ就宕機(jī)了,消息就沒了?
2)可以將消息進(jìn)行持久化。要將消息持久化,前提是:隊(duì)列、Exchange都持久化
Spring整合RibbitMQ
依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
application
spring:
application:
name: rabbitmq-springboot
rabbitmq:
host: 192.168.1.107
port: 5672
username: ems
password: 123
virtual-host: /ems
RabbitTemplate
Simple
生產(chǎn)者
@SpringBootTest
class RabbitmqStringbootApplicationTests {
@Autowired
RabbitTemplate rabbitTemplate;
// simple
@Test
void testHello() {
rabbitTemplate.convertAndSend("hello", "hello world");
}
}
消費(fèi)者
@Component
@RabbitListener(queuesToDeclare = @Queue(value = "hello",declare = "true"))
public class HelloCustomer {
@RabbitHandler
public void receive(String message) {
System.out.println("message = " + message);
}
}
Work
生產(chǎn)者
// work
@Test
void testWork() {
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work", "work模型");
}
}
消費(fèi)者
@Component
public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
Fanout
生產(chǎn)者
// fanout
@Test
void testFanout() {
rabbitTemplate.convertAndSend("logs", "", "Fanout的模型發(fā)送消息");
}
消費(fèi)者
@Component
public class FanoutCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "logs", type = "fanout"))
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "logs", type = "fanout"))
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
Direct
生產(chǎn)者
// direct
@Test
void testDirect() {
// 修改 error info warn 測試
rabbitTemplate.convertAndSend("direct", "error", "Direct的模型發(fā)送info消息");
}
消費(fèi)者
@Component
public class DirectCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"info", "warn", "error"})
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "direct", type = "direct"), key = {"error"})
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
Topic
生產(chǎn)者
// topic
@Test
void testTopic() {
rabbitTemplate.convertAndSend("topic", "product.save", "Topic的模型發(fā)送product.save消息");
rabbitTemplate.convertAndSend("topic", "user.save", "Topic的模型發(fā)送user.save消息");
}
消費(fèi)者
@Component
public class TopicCustomer {
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.save", "user.*"})
})
public void receive1(String message) {
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue, exchange = @Exchange(value = "topic", type = "topic"), key = {"user.#","product.#"})
})
public void receive2(String message) {
System.out.println("message2 = " + message);
}
}
應(yīng)用場景
異步處理
場景說明:用戶注冊后,需要發(fā)注冊郵件和注冊短信,傳統(tǒng)的做法有兩種 1.串行的方式 2.并行的方式
串行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送注冊郵件,再發(fā)送注冊短信,以上三個任務(wù)全部完成后才返回給客戶端。 這有一個問題是,郵件,短信并不是必須的,它只是一個通知,而這種做法讓客戶端等待沒有必要等待的東西.
并行方式:將注冊信息寫入數(shù)據(jù)庫后,發(fā)送郵件的同時,發(fā)送短信,以上三個任務(wù)完成后,返回給客戶端,并行的方式能提高處理的時間。
消息隊(duì)列:假設(shè)三個業(yè)務(wù)節(jié)點(diǎn)分別使用50ms,串行方式使用時間150ms,并行使用時間100ms。雖然并行已經(jīng)提高的處理時間,但是,前面說過,郵件和短信對我正常的使用網(wǎng)站沒有任何影響,客戶端沒有必要等著其發(fā)送完成才顯示注冊成功,應(yīng)該是寫入數(shù)據(jù)庫后就返回. 消息隊(duì)列: 引入消息隊(duì)列后,把發(fā)送郵件,短信不是必須的業(yè)務(wù)邏輯異步處理
由此可以看出,引入消息隊(duì)列后,用戶的響應(yīng)時間就等于寫入數(shù)據(jù)庫的時間+寫入消息隊(duì)列的時間(可以忽略不計(jì)),引入消息隊(duì)列后處理后,響應(yīng)時間是串行的3倍,是并行的2倍。
應(yīng)用解耦
場景:雙11是購物狂節(jié),用戶下單后,訂單系統(tǒng)需要通知庫存系統(tǒng),傳統(tǒng)的做法就是訂單系統(tǒng)調(diào)用庫存系統(tǒng)的接口.
這種做法有一個缺點(diǎn):
當(dāng)庫存系統(tǒng)出現(xiàn)故障時,訂單就會失敗。 訂單系統(tǒng)和庫存系統(tǒng)高耦合. 引入消息隊(duì)列
訂單系統(tǒng):用戶下單后,訂單系統(tǒng)完成持久化處理,將消息寫入消息隊(duì)列,返回用戶訂單下單成功。
庫存系統(tǒng):訂閱下單的消息,獲取下單消息,進(jìn)行庫操作。 就算庫存系統(tǒng)出現(xiàn)故障,消息隊(duì)列也能保證消息的可靠投遞,不會導(dǎo)致消息丟失.
流量削峰
場景: 秒殺活動,一般會因?yàn)榱髁窟^大,導(dǎo)致應(yīng)用掛掉,為了解決這個問題,一般在應(yīng)用前端加入消息隊(duì)列。
作用:
1.可以控制活動人數(shù),超過此一定閥值的訂單直接丟棄(我為什么秒殺一次都沒有成功過呢^^)
2.可以緩解短時間的高流量壓垮應(yīng)用(應(yīng)用程序按自己的最大處理能力獲取訂單)
1.用戶的請求,服務(wù)器收到之后,首先寫入消息隊(duì)列,加入消息隊(duì)列長度超過最大值,則直接拋棄用戶請求或跳轉(zhuǎn)到錯誤頁面.
2.秒殺業(yè)務(wù)根據(jù)消息隊(duì)列中的請求信息,再做后續(xù)處理.
RabbitMQ集群
普通集群(副本集群)
All data/state required for the operation of a RabbitMQ broker is replicated across all nodes. An exception to this are message queues, which by default reside on one node, though they are visible and reachable from all nodes. To replicate queues across nodes in a cluster --摘自官網(wǎng)
默認(rèn)情況下:RabbitMQ代理操作所需的所有數(shù)據(jù)/狀態(tài)都將跨所有節(jié)點(diǎn)復(fù)制。這方面的一個例外是消息隊(duì)列,默認(rèn)情況下,消息隊(duì)列位于一個節(jié)點(diǎn)上,盡管它們可以從所有節(jié)點(diǎn)看到和訪問
架構(gòu)圖

核心解決問題: 當(dāng)集群中某一時刻master節(jié)點(diǎn)宕機(jī),可以對Quene中信息,進(jìn)行備份
集群搭建
# 0.集群規(guī)劃
node1: 10.15.0.3 mq1 master 主節(jié)點(diǎn)
node2: 10.15.0.4 mq2 repl1 副本節(jié)點(diǎn)
node3: 10.15.0.5 mq3 repl2 副本節(jié)點(diǎn)
# 1.克隆三臺機(jī)器主機(jī)名和ip映射
vim /etc/hosts加入:
10.15.0.3 mq1
10.15.0.4 mq2
10.15.0.5 mq3
node1: vim /etc/hostname 加入: mq1
node2: vim /etc/hostname 加入: mq2
node3: vim /etc/hostname 加入: mq3
# 2.三個機(jī)器安裝rabbitmq,并同步cookie文件,在node1上執(zhí)行:
scp /var/lib/rabbitmq/.erlang.cookie root@mq2:/var/lib/rabbitmq/
scp /var/lib/rabbitmq/.erlang.cookie root@mq3:/var/lib/rabbitmq/
# 3.查看cookie是否一致:
node1: cat /var/lib/rabbitmq/.erlang.cookie
node2: cat /var/lib/rabbitmq/.erlang.cookie
node3: cat /var/lib/rabbitmq/.erlang.cookie
# 4.后臺啟動rabbitmq所有節(jié)點(diǎn)執(zhí)行如下命令,啟動成功訪問管理界面:
rabbitmq-server -detached
# 5.在node2和node3執(zhí)行加入集群命令:
1.關(guān)閉 rabbitmqctl stop_app
2.加入集群 rabbitmqctl join_cluster rabbit@mq1
3.啟動服務(wù) rabbitmqctl start_app
# 6.查看集群狀態(tài),任意節(jié)點(diǎn)執(zhí)行:
rabbitmqctl cluster_status
# 7.如果出現(xiàn)如下顯示,集群搭建成功:
Cluster status of node rabbit@mq3 ...
[{nodes,[{disc,[rabbit@mq1,rabbit@mq2,rabbit@mq3]}]},
{running_nodes,[rabbit@mq1,rabbit@mq2,rabbit@mq3]},
{cluster_name,<<"rabbit@mq1">>},
{partitions,[]},
{alarms,[{rabbit@mq1,[]},{rabbit@mq2,[]},{rabbit@mq3,[]}]}]
# 8.登錄管理界面,展示如下狀態(tài):

# 9.測試集群在node1上,創(chuàng)建隊(duì)列

# 10.查看node2和node3節(jié)點(diǎn):

# 11.關(guān)閉node1節(jié)點(diǎn),執(zhí)行如下命令,查看node2和node3:
rabbitmqctl stop_app
鏡像集群
This guide covers mirroring (queue contents replication) of classic queues --摘自官網(wǎng)
By default, contents of a queue within a RabbitMQ cluster are located on a single node (the node on which the queue was declared). This is in contrast to exchanges and bindings, which can always be considered to be on all nodes. Queues can optionally be made mirrored across multiple nodes. --摘自官網(wǎng)
鏡像隊(duì)列機(jī)制就是將隊(duì)列在三個節(jié)點(diǎn)之間設(shè)置主從關(guān)系,消息會在三個節(jié)點(diǎn)之間進(jìn)行自動同步,且如果其中一個節(jié)點(diǎn)不可用,并不會導(dǎo)致消息丟失或服務(wù)不可用的情況,提升MQ集群的整體高可用性。
集群架構(gòu)圖

配置集群架構(gòu)
# 0.策略說明
rabbitmqctl set_policy [-p <vhost>] [--priority <priority>] [--apply-to <apply-to>] <name> <pattern> <definition>
-p Vhost: 可選參數(shù),針對指定vhost下的queue進(jìn)行設(shè)置
Name: policy的名稱
Pattern: queue的匹配模式(正則表達(dá)式)
Definition:鏡像定義,包括三個部分ha-mode, ha-params, ha-sync-mode
ha-mode:指明鏡像隊(duì)列的模式,有效值為 all/exactly/nodes
all:表示在集群中所有的節(jié)點(diǎn)上進(jìn)行鏡像
exactly:表示在指定個數(shù)的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)的個數(shù)由ha-params指定
nodes:表示在指定的節(jié)點(diǎn)上進(jìn)行鏡像,節(jié)點(diǎn)名稱通過ha-params指定
ha-params:ha-mode模式需要用到的參數(shù)
ha-sync-mode:進(jìn)行隊(duì)列中消息的同步方式,有效值為automatic和manual
priority:可選參數(shù),policy的優(yōu)先級
# 1.查看當(dāng)前策略
rabbitmqctl list_policies
# 2.添加策略
rabbitmqctl set_policy ha-all '^hello' '{"ha-mode":"all","ha-sync-mode":"automatic"}'
說明:策略正則表達(dá)式為 “^” 表示所有匹配所有隊(duì)列名稱 ^hello:匹配hello開頭隊(duì)列
# 3.刪除策略
rabbitmqctl clear_policy ha-all
# 4.測試集群
總結(jié)
前面的除了集群部分沒有完全實(shí)際操作外,其他的都是實(shí)際操作過的目前中間件消息隊(duì)列部分只了解了這一個RabbitMQ,還是希望能學(xué)到更多,更重要的是實(shí)際操作應(yīng)用