個(gè)人專題目錄
1. RabbitMQ工作模式
1.1 Work queues工作隊(duì)列模式
模式說(shuō)明

Work Queues與入門程序的簡(jiǎn)單模式相比,多了一個(gè)或一些消費(fèi)端,多個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。
應(yīng)用場(chǎng)景:對(duì)于 任務(wù)過(guò)重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。
代碼
Work Queues與入門程序的簡(jiǎn)單模式的代碼是幾乎一樣的;可以完全復(fù)制,并復(fù)制多一個(gè)消費(fèi)者進(jìn)行多個(gè)消費(fèi)者同時(shí)消費(fèi)消息的測(cè)試。
1)生產(chǎn)者
public class Producer {
static final String QUEUE_NAME = "work_queue";
public static void main(String[] args) throws Exception {
//創(chuàng)建連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
for (int i = 1; i <= 30; i++) {
// 發(fā)送信息
String message = "你好;小兔子!work模式--" + i;
/**
* 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
}
// 關(guān)閉資源
channel.close();
connection.close();
}
}
2)消費(fèi)者1
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//一次只能接收并處理一個(gè)消息
channel.basicQos(1);
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
//確認(rèn)消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
}
}
3)消費(fèi)者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.QUEUE_NAME, true, false, false, null);
//一次只能接收并處理一個(gè)消息
channel.basicQos(1);
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
Thread.sleep(1000);
//確認(rèn)消息
channel.basicAck(envelope.getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.QUEUE_NAME, false, consumer);
}
}
測(cè)試
啟動(dòng)兩個(gè)消費(fèi)者,然后再啟動(dòng)生產(chǎn)者發(fā)送消息;到IDEA的兩個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)查看是否競(jìng)爭(zhēng)性的接收到消息。
小結(jié)
在一個(gè)隊(duì)列中如果有多個(gè)消費(fèi)者,那么消費(fèi)者之間對(duì)于同一個(gè)消息的關(guān)系是競(jìng)爭(zhēng)的關(guān)系。
1.2. 訂閱模式類型
訂閱模式示例圖:

前面2個(gè)案例中,只有3個(gè)角色:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來(lái)。
- queue:消息隊(duì)列,圖中紅色部分
而在訂閱模型中,多了一個(gè)exchange角色,而且過(guò)程略有變化:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序,但是不再發(fā)送到隊(duì)列中,而是發(fā)給X(交換機(jī))
- C:消費(fèi)者,消息的接受者,會(huì)一直等待消息到來(lái)。
- Queue:消息隊(duì)列,接收消息、緩存消息。
- Exchange:交換機(jī),圖中的X。一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個(gè)特別隊(duì)列、遞交給所有隊(duì)列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。Exchange有常見(jiàn)以下3種類型:
- Fanout:廣播,將消息交給所有綁定到交換機(jī)的隊(duì)列
- Direct:定向,把消息交給符合指定routing key 的隊(duì)列
- Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊(duì)列
Exchange(交換機(jī))只負(fù)責(zé)轉(zhuǎn)發(fā)消息,不具備存儲(chǔ)消息的能力,因此如果沒(méi)有任何隊(duì)列與Exchange綁定,或者沒(méi)有符合路由規(guī)則的隊(duì)列,那么消息會(huì)丟失!
1.3. Publish/Subscribe發(fā)布與訂閱模式
模式說(shuō)明

發(fā)布訂閱模式:
1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列。
2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收
到消息
代碼
1)生產(chǎn)者
/**
* 發(fā)布與訂閱使用的交換機(jī)類型為:fanout
*/
public class Producer {
//交換機(jī)名稱
static final String FANOUT_EXCHAGE = "fanout_exchange";
//隊(duì)列名稱
static final String FANOUT_QUEUE_1 = "fanout_queue_1";
//隊(duì)列名稱
static final String FANOUT_QUEUE_2 = "fanout_queue_2";
public static void main(String[] args) throws Exception {
//創(chuàng)建連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機(jī)
* 參數(shù)1:交換機(jī)名稱
* 參數(shù)2:交換機(jī)類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(FANOUT_QUEUE_1, true, false, false, null);
channel.queueDeclare(FANOUT_QUEUE_2, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(FANOUT_QUEUE_1, FANOUT_EXCHAGE, "");
channel.queueBind(FANOUT_QUEUE_2, FANOUT_EXCHAGE, "");
for (int i = 1; i <= 10; i++) {
// 發(fā)送信息
String message = "你好;小兔子!發(fā)布訂閱模式--" + i;
/**
* 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish(FANOUT_EXCHAGE, "", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
}
// 關(guān)閉資源
channel.close();
connection.close();
}
}
2)消費(fèi)者1
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_1, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(Producer.FANOUT_QUEUE_1, Producer.FANOUT_EXCHAGE, "");
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.FANOUT_QUEUE_1, true, consumer);
}
}
3)消費(fèi)者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(Producer.FANOUT_EXCHAGE, BuiltinExchangeType.FANOUT);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.FANOUT_QUEUE_2, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(Producer.FANOUT_QUEUE_2, Producer.FANOUT_EXCHAGE, "");
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.FANOUT_QUEUE_2, true, consumer);
}
}
測(cè)試
啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在每個(gè)消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送的所有消息;到達(dá)廣播的效果。
在執(zhí)行完測(cè)試代碼后,其實(shí)到RabbitMQ的管理后臺(tái)找到Exchanges選項(xiàng)卡,點(diǎn)擊 fanout_exchange 的交換機(jī),可以查看到綁定關(guān)系。
小結(jié)
交換機(jī)需要與隊(duì)列進(jìn)行綁定,綁定之后;一個(gè)消息可以被多個(gè)消費(fèi)者都收到。
發(fā)布訂閱模式與工作隊(duì)列模式的區(qū)別
1、工作隊(duì)列模式不用定義交換機(jī),而發(fā)布/訂閱模式需要定義交換機(jī)。
2、發(fā)布/訂閱模式的生產(chǎn)方是面向交換機(jī)發(fā)送消息,工作隊(duì)列模式的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)交換機(jī))。
3、發(fā)布/訂閱模式需要設(shè)置隊(duì)列和交換機(jī)的綁定,工作隊(duì)列模式不需要設(shè)置,實(shí)際上工作隊(duì)列模式會(huì)將隊(duì)列綁 定到默認(rèn)的交換機(jī) 。
1.4. Routing路由模式
模式說(shuō)明
路由模式特點(diǎn):
- 隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)
RoutingKey(路由key) - 消息的發(fā)送方在 向 Exchange發(fā)送消息時(shí),也必須指定消息的
RoutingKey。 - Exchange不再把消息交給每一個(gè)綁定的隊(duì)列,而是根據(jù)消息的
Routing Key進(jìn)行判斷,只有隊(duì)列的Routingkey與消息的Routing key完全一致,才會(huì)接收到消息

圖解:
- P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
- X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
- C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
- C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
代碼
在編碼上與 Publish/Subscribe發(fā)布與訂閱模式 的區(qū)別是交換機(jī)的類型為:Direct,還有隊(duì)列綁定交換機(jī)的時(shí)候需要指定routing key。
1)生產(chǎn)者
/**
* 路由模式的交換機(jī)類型為:direct
*/
public class Producer {
//交換機(jī)名稱
static final String DIRECT_EXCHAGE = "direct_exchange";
//隊(duì)列名稱
static final String DIRECT_QUEUE_INSERT = "direct_queue_insert";
//隊(duì)列名稱
static final String DIRECT_QUEUE_UPDATE = "direct_queue_update";
public static void main(String[] args) throws Exception {
//創(chuàng)建連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機(jī)
* 參數(shù)1:交換機(jī)名稱
* 參數(shù)2:交換機(jī)類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(DIRECT_QUEUE_INSERT, true, false, false, null);
channel.queueDeclare(DIRECT_QUEUE_UPDATE, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(DIRECT_QUEUE_INSERT, DIRECT_EXCHAGE, "insert");
channel.queueBind(DIRECT_QUEUE_UPDATE, DIRECT_EXCHAGE, "update");
// 發(fā)送信息
String message = "新增了商品。路由模式;routing key 為 insert " ;
/**
* 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish(DIRECT_EXCHAGE, "insert", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
// 發(fā)送信息
message = "修改了商品。路由模式;routing key 為 update" ;
/**
* 參數(shù)1:交換機(jī)名稱,如果沒(méi)有指定則使用默認(rèn)Default Exchage
* 參數(shù)2:路由key,簡(jiǎn)單模式可以傳遞隊(duì)列名稱
* 參數(shù)3:消息其它屬性
* 參數(shù)4:消息內(nèi)容
*/
channel.basicPublish(DIRECT_EXCHAGE, "update", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
// 關(guān)閉資源
channel.close();
connection.close();
}
}
2)消費(fèi)者1
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.DIRECT_QUEUE_INSERT, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(Producer.DIRECT_QUEUE_INSERT, Producer.DIRECT_EXCHAGE, "insert");
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.DIRECT_QUEUE_INSERT, true, consumer);
}
}
3)消費(fèi)者2
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(Producer.DIRECT_EXCHAGE, BuiltinExchangeType.DIRECT);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.DIRECT_QUEUE_UPDATE, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(Producer.DIRECT_QUEUE_UPDATE, Producer.DIRECT_EXCHAGE, "update");
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.DIRECT_QUEUE_UPDATE, true, consumer);
}
}
測(cè)試
啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果。
在執(zhí)行完測(cè)試代碼后,其實(shí)到RabbitMQ的管理后臺(tái)找到Exchanges選項(xiàng)卡,點(diǎn)擊 direct_exchange 的交換機(jī),可以查看到綁定關(guān)系。
小結(jié)
Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定routing key,消息會(huì)轉(zhuǎn)發(fā)到符合routing key的隊(duì)列。
1.5. Topics通配符模式
模式說(shuō)明
Topic類型與Direct相比,都是可以根據(jù)RoutingKey把消息路由到不同的隊(duì)列。只不過(guò)Topic類型Exchange可以讓隊(duì)列在綁定Routing key 的時(shí)候使用通配符!
Routingkey 一般都是有一個(gè)或多個(gè)單詞組成,多個(gè)單詞之間以”.”分割,例如: item.insert
通配符規(guī)則:
#:匹配一個(gè)或多個(gè)詞
*:匹配不多不少恰好1個(gè)詞
舉例:
item.#:能夠匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert


圖解:
- 紅色Queue:綁定的是
usa.#,因此凡是以usa.開頭的routing key都會(huì)被匹配到 - 黃色Queue:綁定的是
#.news,因此凡是以.news結(jié)尾的routing key都會(huì)被匹配
代碼
1)生產(chǎn)者
使用topic類型的Exchange,發(fā)送消息的routing key有3種: item.insert、item.update、item.delete:
/**
* 通配符Topic的交換機(jī)類型為:topic
*/
public class Producer {
//交換機(jī)名稱
static final String TOPIC_EXCHAGE = "topic_exchange";
//隊(duì)列名稱
static final String TOPIC_QUEUE_1 = "topic_queue_1";
//隊(duì)列名稱
static final String TOPIC_QUEUE_2 = "topic_queue_2";
public static void main(String[] args) throws Exception {
//創(chuàng)建連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
/**
* 聲明交換機(jī)
* 參數(shù)1:交換機(jī)名稱
* 參數(shù)2:交換機(jī)類型,fanout、topic、topic、headers
*/
channel.exchangeDeclare(TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 發(fā)送信息
String message = "新增了商品。Topic模式;routing key 為 item.insert " ;
channel.basicPublish(TOPIC_EXCHAGE, "item.insert", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
// 發(fā)送信息
message = "修改了商品。Topic模式;routing key 為 item.update" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.update", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
// 發(fā)送信息
message = "刪除了商品。Topic模式;routing key 為 item.delete" ;
channel.basicPublish(TOPIC_EXCHAGE, "item.delete", null, message.getBytes());
System.out.println("已發(fā)送消息:" + message);
// 關(guān)閉資源
channel.close();
connection.close();
}
}
2)消費(fèi)者1
接收兩種類型的消息:更新商品和刪除商品
public class Consumer1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_1, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.update");
channel.queueBind(Producer.TOPIC_QUEUE_1, Producer.TOPIC_EXCHAGE, "item.delete");
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者1-接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.TOPIC_QUEUE_1, true, consumer);
}
}
3)消費(fèi)者2
接收所有類型的消息:新增商品,更新商品和刪除商品。
public class Consumer2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建頻道
Channel channel = connection.createChannel();
//聲明交換機(jī)
channel.exchangeDeclare(Producer.TOPIC_EXCHAGE, BuiltinExchangeType.TOPIC);
// 聲明(創(chuàng)建)隊(duì)列
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否定義持久化隊(duì)列
* 參數(shù)3:是否獨(dú)占本次連接
* 參數(shù)4:是否在不使用的時(shí)候自動(dòng)刪除隊(duì)列
* 參數(shù)5:隊(duì)列其它參數(shù)
*/
channel.queueDeclare(Producer.TOPIC_QUEUE_2, true, false, false, null);
//隊(duì)列綁定交換機(jī)
channel.queueBind(Producer.TOPIC_QUEUE_2, Producer.TOPIC_EXCHAGE, "item.*");
//創(chuàng)建消費(fèi)者;并設(shè)置消息處理
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
/**
* consumerTag 消息者標(biāo)簽,在channel.basicConsume時(shí)候可以指定
* envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志(收到消息失敗后是否需要重新發(fā)送)
* properties 屬性信息
* body 消息
*/
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//路由key
System.out.println("路由key為:" + envelope.getRoutingKey());
//交換機(jī)
System.out.println("交換機(jī)為:" + envelope.getExchange());
//消息id
System.out.println("消息id為:" + envelope.getDeliveryTag());
//收到的消息
System.out.println("消費(fèi)者2-接收到的消息為:" + new String(body, "utf-8"));
}
};
//監(jiān)聽(tīng)消息
/**
* 參數(shù)1:隊(duì)列名稱
* 參數(shù)2:是否自動(dòng)確認(rèn),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置為false則需要手動(dòng)確認(rèn)
* 參數(shù)3:消息接收到后回調(diào)
*/
channel.basicConsume(Producer.TOPIC_QUEUE_2, true, consumer);
}
}
測(cè)試
啟動(dòng)所有消費(fèi)者,然后使用生產(chǎn)者發(fā)送消息;在消費(fèi)者對(duì)應(yīng)的控制臺(tái)可以查看到生產(chǎn)者發(fā)送對(duì)應(yīng)routing key對(duì)應(yīng)隊(duì)列的消息;到達(dá)按照需要接收的效果;并且這些routing key可以使用通配符。
在執(zhí)行完測(cè)試代碼后,其實(shí)到RabbitMQ的管理后臺(tái)找到Exchanges選項(xiàng)卡,點(diǎn)擊 topic_exchange 的交換機(jī),可以查看到綁定關(guān)系。
小結(jié)
Topic主題模式可以實(shí)現(xiàn) Publish/Subscribe發(fā)布與訂閱模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的時(shí)候可以使用通配符,顯得更加靈活。
1.6. 模式總結(jié)
RabbitMQ工作模式:
1、簡(jiǎn)單模式 HelloWorld
一個(gè)生產(chǎn)者、一個(gè)消費(fèi)者,不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))
2、工作隊(duì)列模式 Work Queue
一個(gè)生產(chǎn)者、多個(gè)消費(fèi)者(競(jìng)爭(zhēng)關(guān)系),不需要設(shè)置交換機(jī)(使用默認(rèn)的交換機(jī))
3、發(fā)布訂閱模式 Publish/subscribe
需要設(shè)置類型為fanout的交換機(jī),并且交換機(jī)和隊(duì)列進(jìn)行綁定,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)將消息發(fā)送到綁定的隊(duì)列
4、路由模式 Routing
需要設(shè)置類型為direct的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列
5、通配符模式 Topic
需要設(shè)置類型為topic的交換機(jī),交換機(jī)和隊(duì)列進(jìn)行綁定,并且指定通配符方式的routing key,當(dāng)發(fā)送消息到交換機(jī)后,交換機(jī)會(huì)根據(jù)routing key將消息發(fā)送到對(duì)應(yīng)的隊(duì)列