介紹
RabbitMQ
MQ全稱為Message Queue,即消息隊列, RabbitMQ是由erlang語言開發(fā),基于AMQP(Advanced Message Queu
e 高級消息隊列協(xié)議)協(xié)議實現(xiàn)的消息隊列,它是一種應(yīng)用程序之間的通信方法,消息隊列在分布式系統(tǒng)開發(fā)中應(yīng)用非常廣泛。RabbitMQ官方地址:<u>http://www.rabbitmq.com/</u>
開發(fā)中消息隊列通常有如下應(yīng)用場景:
1、任務(wù)異步處理。
將不需要同步處理的并且耗時長的操作由消息隊列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時間。
2、應(yīng)用程序解耦合
MQ相當(dāng)于一個中介,生產(chǎn)方通過MQ與消費方交互,它將應(yīng)用程序進(jìn)行解耦合。
市場上還有哪些消息隊列?
ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。
為什么使用RabbitMQ呢?
1、使得簡單,功能強大。
2、基于AMQP協(xié)議。
3、社區(qū)活躍,文檔完善。
4、高并發(fā)性能好,這主要得益于Erlang語言。
5、Spring Boot默認(rèn)已集成RabbitMQ


2.快速入門
2.1RabbitMQ的工作原理

組成部分說明如下:
Broker:消息隊列服務(wù)進(jìn)程,此進(jìn)程包括兩個部分:Exchange和Queue。
Exchange:消息隊列交換機,按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個隊列,對消息進(jìn)行過慮。
Queue:消息隊列,存儲消息的隊列,消息到達(dá)隊列并轉(zhuǎn)發(fā)給指定的消費方。
Producer:消息生產(chǎn)者,即生產(chǎn)方客戶端,生產(chǎn)方客戶端將消息發(fā)送到MQ。
Consumer:消息消費者,即消費方客戶端,接收MQ轉(zhuǎn)發(fā)的消息。
消息發(fā)布接收流程:
-----發(fā)送消息-----
1、生產(chǎn)者和Broker建立TCP連接。
2、生產(chǎn)者和Broker建立通道。
3、生產(chǎn)者通過通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。 4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊列)
----接收消息-----
1、消費者和Broker建立TCP連接2、消費者和Broker建立通道
3、消費者監(jiān)聽指定的Queue(隊列)
4、當(dāng)有消息到達(dá)Queue時Broker默認(rèn)將消息推送給消費者。
5、消費者接收到消息。
2.2下載安裝
2.2.1下載安裝
RabbitMQ由Erlang語言開發(fā),Erlang語言用于并發(fā)及分布式系統(tǒng)的開發(fā),在電信領(lǐng)域應(yīng)用廣泛,OTP(Open Telecom Platform)作為Erlang語言的一部分,包含了很多基于Erlang開發(fā)的中間件及工具庫,安裝RabbitMQ需要安裝Erlang/OTP,并保持版本匹配,如下圖:
RabbitMQ的下載地址:<u>http://www.rabbitmq.com/download.html</u>

我們使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。
1) 下載erlang
地址如下:
http://erlang.org/download/otp_win64_20.3.exe
erlang安裝完成需要配置erlang環(huán)境變量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添加%ERLANG_HOME%\bin;
2) 安裝RabbitMQ
https://github.com/rabbitmq/rabbitmq-server/releases/tag/v3.7.3
1.1.1 啟動
安裝成功后會自動創(chuàng)建RabbitMQ服務(wù)并且啟動。
1) 從開始菜單啟動RabbitMQ
完成在開始菜單找到RabbitMQ的菜單:

RabbitMQ Service-install :安裝服務(wù)
RabbitMQ Service-remove 刪除服務(wù)
RabbitMQ Service-start 啟動
RabbitMQ Service-stop 啟動
2)如果沒有開始菜單則進(jìn)入安裝目錄下sbin目錄手動啟動:

1)安裝并運行服務(wù)
rabbitmq-service.bat install 安裝服務(wù) rabbitmq-service.bat stop 停止服務(wù) rabbitmq-service.bat start 啟動服務(wù)
2)安裝管理插件
安裝rabbitMQ的管理插件,方便在瀏覽器端管理RabbitMQ
管理員身份運行 rabbitmq-plugins.bat enable rabbitmq_management
3、啟動成功 登錄RabbitMQ
進(jìn)入瀏覽器,輸入:http://localhost:15672

2.2.3注意事項
1、安裝erlang和rabbitMQ以管理員身份運行。
2、當(dāng)卸載重新安裝時會出現(xiàn)RabbitMQ服務(wù)注冊失敗,此時需要進(jìn)入注冊表清理erlang
搜索RabbitMQ、ErlSrv,將對應(yīng)的項全部刪除。
2.2HelloWorld
按照官方教程(http://www.rabbitmq.com/getstarted.html)測試hello world:

2.2.1 搭建環(huán)境
1) java client
生產(chǎn)者和消費者都屬于客戶端,rabbitMQ的java客戶端如下:

我們先用 rabbitMQ官方提供的java client測試,目的是對RabbitMQ的交互過程有個清晰的認(rèn)識。參考 :<u>https://github.com/rabbitmq/rabbitmq-java-client/</u>
2) 創(chuàng)建maven工程
創(chuàng)建生產(chǎn)者工程和消費者工程,分別加入RabbitMQ java client的依賴。test-rabbitmq-producer:生產(chǎn)者工程
test-rabbitmq-consumer:消費者工程
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp‐client</artifactId>
<version>4.0.3</version><!‐‐此版本與spring boot 1.5.9版本匹配‐‐>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
2.2.2生產(chǎn)者
在生產(chǎn)者工程下的test中創(chuàng)建測試類如下:
public class Producer01 {
//隊列名稱
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = null;
Channel channel = null;
try {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");
factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機名稱為“/”,虛擬機相當(dāng)于一個獨立的mq服務(wù)
器
//創(chuàng)建與RabbitMQ服務(wù)的TCP連接
connection = factory.newConnection();
//創(chuàng)建與Exchange的通道,每個連接可以創(chuàng)建多個通道,每個通道代表一個會話任務(wù)channel = connection.createChannel();
/**
*聲明隊列,如果Rabbit中沒有此隊列將自動創(chuàng)建
*param1:隊列名稱
*param2:是否持久化
*param3:隊列是否獨占此連接
*param4:隊列不再使用時是否自動刪除此隊列
*param5:隊列參數(shù)
*/
channel.queueDeclare(QUEUE, true, false, false, null); String message = "helloworld小明"+System.currentTimeMillis();
/**
*消息發(fā)布方法
*param1:Exchange的名稱,如果沒有指定,則使用Default Exchange
*param2:routingKey,消息的路由Key,是用于Exchange(交換機)將消息轉(zhuǎn)發(fā)到指定的消息隊列
*param3:消息包含的屬性
*param4:消息體
*/
/**
*這里沒有指定交換機,消息將發(fā)送給默認(rèn)交換機,每個隊列也會綁定那個默認(rèn)的交換機,但是不能顯 示綁定或解除綁定
*默認(rèn)的交換機,routingKey等于隊列名稱
*/
channel.basicPublish("", QUEUE, null, message.getBytes()); System.out.println("Send Message is:'" + message + "'");
} catch(Exception ex) {
ex.printStackTrace();
}
finally{
if(channel != null)
{
channel.close();
}
if(connection != null)
{
connection.close();
}
}
}
}
2.2.3消費者
在消費者工程下的test中創(chuàng)建測試類如下:
public class Consumer01 {
private static final String QUEUE = "helloworld";
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置MabbitMQ所在服務(wù)器的ip和端口
factory.setHost("127.0.0.1");
factory.setPort(5672);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//聲明隊列
channel.queueDeclare(QUEUE, true, false, false, null);
//定義消費方法
DefaultConsumer consumer = new DefaultConsumer(channel) {
/**
*消費者接收消息調(diào)用此方法
*@param consumerTag 消費者的標(biāo)簽,在channel.basicConsume()去指定
*@param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機,消息和重傳標(biāo)志
(收到消息失敗后是否需要重新發(fā)送)
*@param properties
*@param body
*@throws IOException
*
* */
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
//交換機
String exchange = envelope.getExchange();
//路由key
String routingKey = envelope.getRoutingKey();
//消息id
long deliveryTag = envelope.getDeliveryTag();
//消息內(nèi)容
String msg = new String(body, "utf‐8");
System.out.println("receive message.." + msg);
}
};
/**
*監(jiān)聽隊列String queue, boolean autoAck,Consumer callback
*參數(shù)明細(xì)
*1、隊列名稱
*2、是否自動回復(fù),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置 為false則需要手動回復(fù)
*3、消費消息的方法,消費者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE, true, consumer);
}
2.2.4 總結(jié)
1、發(fā)送端操作流程
1)創(chuàng)建連接
2)創(chuàng)建通道
3)聲明隊列
4)發(fā)送消息
2、接收端
1)創(chuàng)建連接
2)創(chuàng)建通道
3)聲明隊列
4)監(jiān)聽隊列
5)接收消息
6)ack回復(fù)
4工作模式
RabbitMQ有以下幾種工作模式 :
1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
4.1Work queues

work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息。應(yīng)用場景:對于 任務(wù)過重或任務(wù)較多情況使用工作隊列可以提高任務(wù)處理的速度。
測試:
1、使用入門程序,啟動多個消費者。
2、生產(chǎn)者發(fā)送多個消息。結(jié)果:
1、一條消息只會被一個消費者接收;
2、rabbit采用輪詢的方式將消息是平均發(fā)送給消費者的;
3、消費者在處理完某條消息后,才會收到下一條消息。

發(fā)布訂閱模式:
1、每個消費者監(jiān)聽自己的隊列。
2、生產(chǎn)者將消息發(fā)給broker,由交換機將消息轉(zhuǎn)發(fā)到綁定此交換機的每個隊列,每個綁定交換機的隊列都將接收到消息
案例
用戶通知,當(dāng)用戶充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶,通知方式有短信、郵件多種方法 。
1、生產(chǎn)者
聲明Exchange_fanout_inform交換機。
聲明兩個隊列并且綁定到此交換機,綁定時不需要指定routingkey .
發(fā)送消息時不需要指定routingkey
public class Producer02_publish {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_FANOUT_INFORM = "exchange_fanout_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//創(chuàng)建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機名稱為“/”,虛擬機相當(dāng)于一個獨立的mq服務(wù)
器
//創(chuàng)建一個連接
connection = factory.newConnection();
//創(chuàng)建與交換機的通道,每個通道代表一個會話channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
*參數(shù)明細(xì)
*1、交換機名稱
*2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//聲明隊列
// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
*參數(shù)明細(xì):
*1、隊列名稱
*2、是否持久化
*3、是否獨占此隊列
*4、隊列不用是否自動刪除
*5、參數(shù)
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
*參數(shù)明細(xì)
*1、隊列名稱
*2、交換機名稱
*3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_FANOUT_INFORM, "");
//發(fā)送消息
for (int i = 0; i < 10; i++) {
String message = "inform to user" + i;
//向交換機發(fā)送消息 String exchange, String routingKey, BasicProperties props,byte[] body
/**
*參數(shù)明細(xì)
*1、交換機名稱,不指令使用默認(rèn)交換機名稱 Default Exchange
*2、routingKey(路由key),根據(jù)key名稱將消息轉(zhuǎn)發(fā)到具體的隊列,這里填寫隊列名稱表示消
*3、消息屬性
*4、消息內(nèi)容
*/
channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
} catch(IOException e){
e.printStackTrace();
} catch(TimeoutException e){
e.printStackTrace();
}finally{
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2、郵件發(fā)送消費者
public class Consumer02_subscribe_email {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_FANOUT_INFORM = "inform_exchange_fanout";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機名稱為“/”,虛擬機相當(dāng)于一個獨立的mq服務(wù)器
//創(chuàng)建一個連接
Connection connection = factory.newConnection();
//創(chuàng)建與交換機的通道,每個通道代表一個會話Channel channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
*參數(shù)明細(xì)
*1、交換機名稱
*2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);
//聲明隊列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
*參數(shù)明細(xì):
*1、隊列名稱
*2、是否持久化
*3、是否獨占此隊列
*4、隊列不用是否自動刪除
*5、參數(shù)
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
*參數(shù)明細(xì)
*1、隊列名稱
*2、交換機名稱
*3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");
//定義消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息內(nèi)容
String message = new String(body, "utf‐8");
System.out.println(message);
}
};
/**
*監(jiān)聽隊列String queue, boolean autoAck,Consumer callback
*參數(shù)明細(xì)
*1、隊列名稱
*2、是否自動回復(fù),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置 為false則需要手動回復(fù)
*3、消費消息的方法,消費者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
按照上邊的代碼,編寫郵件通知的消費代碼。
3、短信發(fā)送消費者
參考上邊的郵件發(fā)送消費者代碼編寫。
4.2.3測試
打開RabbitMQ的管理界面,觀察交換機綁定情況:

4.2.4思考
1、publish/subscribe與work queues有什么區(qū)別。區(qū)別:
1)work queues不用定義交換機,而publish/subscribe需要定義交換機。
2)publish/subscribe的生產(chǎn)方是面向交換機發(fā)送消息,work queues的生產(chǎn)方是面向隊列發(fā)送消息(底層使用默認(rèn)交換機)。
3)publish/subscribe需要設(shè)置隊列和交換機的綁定,work queues不需要設(shè)置,實質(zhì)上work queues會將隊列綁定到默認(rèn)的交換機 。
相同點:
所以兩者實現(xiàn)的發(fā)布/訂閱的效果是一樣的,多個消費端監(jiān)聽同一個隊列不會重復(fù)消費消息。
2、實質(zhì)工作用什么 publish/subscribe還是work queues。
建議使用 publish/subscribe,發(fā)布訂閱模式比工作隊列模式更強大,并且發(fā)布訂閱模式可以指定自己專用的交換機。


路由模式:
1、每個消費者監(jiān)聽自己的隊列,并且設(shè)置routingkey。
2、生產(chǎn)者將消息發(fā)給交換機,由交換機根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊列。
4.3.2代碼
1、生產(chǎn)者
聲明exchange_routing_inform交換機。
聲明兩個隊列并且綁定到此交換機,綁定時需要指定routingkey
發(fā)送消息時需要指定routingkey
public class Producer03_routing {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_ROUTING_INFORM = "exchange_routing_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//創(chuàng)建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機名稱為“/”,虛擬機相當(dāng)于一個獨立的mq服務(wù)
器
//創(chuàng)建一個連接
connection = factory.newConnection();
//創(chuàng)建與交換機的通道,每個通道代表一個會話channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
*參數(shù)明細(xì)
*1、交換機名稱
*2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//聲明隊列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
*參數(shù)明細(xì):
*1、隊列名稱
*2、是否持久化
*3、是否獨占此隊列
*4、隊列不用是否自動刪除
*5、參數(shù)
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
*參數(shù)明細(xì)
*1、隊列名稱
*2、交換機名稱
*3、路由key
*/channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);
//發(fā)送郵件消息
for (int i = 0; i < 10; i++) {
String message = "email inform to user" + i;
//向交換機發(fā)送消息 String exchange, String routingKey, BasicProperties props,byte[] body
//息將發(fā)到此隊列
/**
*參數(shù)明細(xì)
*1、交換機名稱,不指令使用默認(rèn)交換機名稱 Default Exchange
*2、routingKey(路由key),根據(jù)key名稱將消息轉(zhuǎn)發(fā)到具體的隊列,這里填寫隊列名稱表示消
*3、消息屬性
*4、消息內(nèi)容
*/
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null, message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發(fā)送短信消息
for (int i = 0; i < 10; i++) {
String message = "sms inform to user" + i;
//向交換機發(fā)送消息 String exchange, String routingKey, BasicProperties props,byte[] body
channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2、郵件發(fā)送消費者
public class Consumer03_routing_email {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";
private static final String EXCHANGE_ROUTING_INFORM = "inform_exchange_routing";
public static void main(String[] args) throws IOException, TimeoutException {
//創(chuàng)建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機名稱為“/”,虛擬機相當(dāng)于一個獨立的mq服務(wù)器
//創(chuàng)建一個連接
Connection connection = factory.newConnection();
//創(chuàng)建與交換機的通道,每個通道代表一個會話Channel channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
*參數(shù)明細(xì)
*1、交換機名稱
*2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);
//聲明隊列
// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
/**
*參數(shù)明細(xì):
*1、隊列名稱
*2、是否持久化
*3、是否獨占此隊列
*4、隊列不用是否自動刪除
*5、參數(shù)
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
//交換機和隊列綁定String queue, String exchange, String routingKey
/**
*參數(shù)明細(xì)
*1、隊列名稱
*2、交換機名稱
*3、路由key
*/
channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL);
//定義消費方法
DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
String exchange = envelope.getExchange();
//消息內(nèi)容
String message = new String(body, "utf‐8");
System.out.println(message);
}
};
/**
*監(jiān)聽隊列String queue, boolean autoAck,Consumer callback
*參數(shù)明細(xì)
*1、隊列名稱
*2、是否自動回復(fù),設(shè)置為true為表示消息接收到自動向mq回復(fù)接收到了,mq接收到回復(fù)會刪除消息,設(shè)置 為false則需要手動回復(fù)
*3、消費消息的方法,消費者接收到消息后調(diào)用此方法
*/
channel.basicConsume(QUEUE_INFORM_EMAIL, true, defaultConsumer);
}
}
3、短信發(fā)送消費者
參考郵件發(fā)送消費者的代碼流程,編寫短信通知的代碼。
4.3.3測試
打開RabbitMQ的管理界面,觀察交換機綁定情況:

使用生產(chǎn)者發(fā)送若干條消息,交換機根據(jù)routingkey轉(zhuǎn)發(fā)消息到指定的隊列。
4.4.4思考
1、Routing模式和Publish/subscibe有啥區(qū)別?
Routing模式要求隊列在綁定交換機時要指定routingkey,消息會轉(zhuǎn)發(fā)到符合routingkey的隊列。

路由模式:
1、每個消費者監(jiān)聽自己的隊列,并且設(shè)置帶統(tǒng)配符的routingkey。
2、生產(chǎn)者將消息發(fā)給broker,由交換機根據(jù)routingkey來轉(zhuǎn)發(fā)消息到指定的隊列。
4.4.2代碼
案例:
根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種 通知類型都接收的則兩種通知都有效。
1、生產(chǎn)者
聲明交換機,指定topic類型:
/**
* 聲明交換機
* param1:交換機名稱
* param2:交換機類型 四種交換機類型:direct、fanout、topic、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM,BuiltinExchangeType.TOPIC);
//Email通知
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.email",null,message.getBytes());
//sms通知
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms",null,message.getBytes());
//兩種都通知
channel.basicPublish(EXCHANGE_TOPICS_INFORM,"inform.sms.email",null,message.getBytes());
完整代碼:
public class Producer04_topics {
//隊列名稱
private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
private static final String QUEUE_INFORM_SMS = "queue_inform_sms";
private static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
public static void main(String[] args) {
Connection connection = null;
Channel channel = null;
try {
//創(chuàng)建一個與MQ的連接
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機名稱為“/”,虛擬機相當(dāng)于一個獨立的mq服務(wù)
器
//創(chuàng)建一個連接
connection = factory.newConnection();
//創(chuàng)建與交換機的通道,每個通道代表一個會話channel = connection.createChannel();
//聲明交換機 String exchange, BuiltinExchangeType type
/**
* 參數(shù)明細(xì)*1、交換機名稱
*2、交換機類型,fanout、topic、direct、headers
*/
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//聲明隊列
/**
*參數(shù)明細(xì):
*1、隊列名稱
*2、是否持久化
*3、是否獨占此隊列
*4、隊列不用是否自動刪除
*5、參數(shù)
*/
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);
channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
byte[] body
//發(fā)送郵件消息
for (int i = 0; i < 10; i++) {
String message = "email inform to user" + i;
//向交換機發(fā)送消息 String exchange, String routingKey, BasicProperties props,
/**
*參數(shù)明細(xì)
*1、交換機名稱,不指令使用默認(rèn)交換機名稱 Default Exchange
*2、routingKey(路由key),根據(jù)key名稱將消息轉(zhuǎn)發(fā)到具體的隊列,這里填寫隊列名稱表示消
息將發(fā)到此隊列
*3、消息屬性
*4、消息內(nèi)容
*/
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發(fā)送短信消息
for (int i = 0; i < 10; i++) {
String message = "sms inform to user" + i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
//發(fā)送短信和郵件消息
for (int i = 0; i < 10; i++) {
String message = "sms and email inform to user" + i;
channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null,
message.getBytes());
System.out.println("Send Message is:'" + message + "'");
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
} finally {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
2、消費端
隊列綁定交換機指定通配符: 統(tǒng)配符規(guī)則:
中間以“.”分隔。
符號#可以匹配多個詞,符號*可以匹配一個詞語。
//聲明隊列
channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null); channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);
//聲明交換機
channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);
//綁定email通知隊列channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,"inform.#.email.#");
//綁定sms通知隊列
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,"inform.#.sms.#");
測試:

4.4.4思考
1、本案例的需求使用Routing工作模式能否實現(xiàn)?
使用Routing模式也可以實現(xiàn)本案例,共設(shè)置三個 routingkey,分別是email、sms、all,email隊列綁定email和all,sms隊列綁定sms和all,這樣就可以實現(xiàn)上邊案例的功能,實現(xiàn)過程比topics復(fù)雜。
Topic模式更多加強大,它可以實現(xiàn)Routing、publish/subscirbe模式的功能。
4.5Header模式
header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對)匹配隊列。
案例:
根據(jù)用戶的通知設(shè)置去通知用戶,設(shè)置接收Email的用戶只接收Email,設(shè)置接收sms的用戶只接收sms,設(shè)置兩種 通知類型都接收的則兩種通知都有效。
代碼:
1)生產(chǎn)者
隊列與交換機綁定的代碼與之前不同,如下:
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_type", "email");
Map<String, Object> headers_sms = new Hashtable<String, Object>();
headers_sms.put("inform_type", "sms"); channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);
通知:
String message="email inform to user"+i;
Map<String, Object> headers=new Hashtable<String, Object>();headers.put("inform_type","email");//匹配email通知消費者綁定的header
//headers.put("inform_type", "sms");//匹配sms通知消費者綁定的header
AMQP.BasicProperties.Builder properties=new AMQP.BasicProperties.Builder();properties.headers(headers);
//Email通知
channel.basicPublish(EXCHANGE_HEADERS_INFORM,"",properties.build(),message.getBytes());
2)發(fā)送郵件消費者
channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);
Map<String, Object> headers_email = new Hashtable<String, Object>();
headers_email.put("inform_email", "email");
//交換機和隊列綁定
channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);
//指定消費隊列
channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);
3)測試


RPC即客戶端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實現(xiàn)RPC的異步調(diào)用,基于Direct交換機實現(xiàn),流程如下:
1、客戶端即是生產(chǎn)者就是消費者,向RPC請求隊列發(fā)送RPC調(diào)用消息,同時監(jiān)聽RPC響應(yīng)隊列。
2、服務(wù)端監(jiān)聽RPC請求隊列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果
3、服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊列
4、客戶端(RPC調(diào)用方)監(jiān)聽RPC響應(yīng)隊列,接收到RPC調(diào)用結(jié)果。

我們選擇基于Spring-Rabbit去操作RabbitMQ <u>https://github.com/spring-projects/spring-amqp</u>
使用spring-boot-starter-amqp會自動添加spring-rabbit依賴,如下:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
4.2配置
1、配置application.yml
配置連接rabbitmq的參數(shù)
server:
port: 44000 spring:
application:
name: test‐rabbitmq‐producer
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtualHost: /
2、定義RabbitCon?g類,配置Exchange、Queue、及綁定交換機。本例配置Topic交換機。
@Configuration
public class RabbitmqConfig {
public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
public static final String EXCHANGE_TOPICS_INFORM = "exchange_topics_inform";
/**
* 交換機配置
* ExchangeBuilder提供了fanout、direct、topic、header交換機類型的配置
*
* @return the exchange
*/
@Bean(EXCHANGE_TOPICS_INFORM)
public Exchange EXCHANGE_TOPICS_INFORM() {
//durable(true)持久化,消息隊列重啟后交換機仍然存在
return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
}
//聲明隊列@Bean(QUEUE_INFORM_SMS)
public Queue QUEUE_INFORM_SMS() {
Queue queue = new Queue(QUEUE_INFORM_SMS);
return queue;
}
//聲明隊列@Bean(QUEUE_INFORM_EMAIL)
public Queue QUEUE_INFORM_EMAIL() {
Queue queue = new Queue(QUEUE_INFORM_EMAIL);
return queue;
}
/**
* channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");
* 綁定隊列到交換機 .
*
* @param queue the queue
* @param exchange the exchange
* @return the binding
*/
@Bean
public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();
}
@Bean
public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();
}
}
4.3生產(chǎn)端
使用RarbbitTemplate發(fā)送消息
@SpringBootTest
@RunWith(SpringRunner.class)
public class Producer05_topics_springboot {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void testSendByTopics() {
for (int i = 0; i < 5; i++) {
String message = "sms email inform to user" + i;
rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM, "inform.sms.email", message);
System.out.println("Send Message is:'" + message + "'");
}
}
4.4消費端
創(chuàng)建消費端工程,添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐test</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐starter‐logging</artifactId>
</dependency>
使用@RabbitListener注解監(jiān)聽隊列。
@Component
public class ReceiveHandler {
//監(jiān)聽email隊列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})
public void receive_email(String msg, Message message, Channel channel) {
System.out.println(msg);
}
//監(jiān)聽sms隊列
@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})
public void receive_sms(String msg, Message message, Channel channel) {
System.out.println(msg);
}
}
4.5測試
