05_RabbitMQ研究

1 頁(yè)面發(fā)布

1.1 需求分析

業(yè)務(wù)流程如下:

1、管理員進(jìn)入管理界面點(diǎn)擊“頁(yè)面發(fā)布”,前端請(qǐng)求cms頁(yè)面發(fā)布接口。

2、cms頁(yè)面發(fā)布接口執(zhí)行頁(yè)面靜態(tài)化,并將靜態(tài)化頁(yè)面(html文件)存儲(chǔ)至GridFS中。

3、靜態(tài)化成功后,向消息隊(duì)列發(fā)送頁(yè)面發(fā)布的消息。

頁(yè)面發(fā)布的最終目標(biāo)是將頁(yè)面發(fā)布到服務(wù)器。

通過(guò)消息隊(duì)列將頁(yè)面發(fā)布的消息發(fā)送給各各服務(wù)器。

3、消息隊(duì)列負(fù)責(zé)將消息發(fā)送給各各服務(wù)器上部署的Cms Client(Cms客戶(hù)端)。

在服務(wù)器上部署Cms Client(Cms客戶(hù)端),客戶(hù)端接收消息隊(duì)列的通知。

4、每個(gè)接收到頁(yè)面發(fā)布消息的Cms Client從GridFS獲取Html頁(yè)面文件,并將Html文件存儲(chǔ)在本地服務(wù)器。

CmsClient根據(jù)頁(yè)面發(fā)布消息的內(nèi)容請(qǐng)求GridFS獲取頁(yè)面文件,存儲(chǔ)在本地服務(wù)器。

1.2 RabbitMQ研究

要實(shí)現(xiàn)上邊頁(yè)面發(fā)布的功能,有一個(gè)重要的環(huán)節(jié)就是由消息隊(duì)列將頁(yè)面發(fā)布的消息通知給各各服務(wù)器。

本節(jié)的教學(xué)目標(biāo)是對(duì)MQ的研究:

1、理解MQ的應(yīng)用場(chǎng)景

2、理解MQ常用的工作模式

1 介紹

1.1 RabbitMQ

Queue 高級(jí)消息隊(duì)列協(xié)議)協(xié)議實(shí)現(xiàn)的消息隊(duì)列,它是一種應(yīng)用程序之間的通信方法,消息隊(duì)列在分布式系統(tǒng)開(kāi)

發(fā)中應(yīng)用非常廣泛。RabbitMQ官方地址:http://www.rabbitmq.com/

開(kāi)發(fā)中消息隊(duì)列通常有如下應(yīng)用場(chǎng)景:

1、任務(wù)異步處理。

將不需要同步處理的并且耗時(shí)長(zhǎng)的操作由消息隊(duì)列通知消息接收方進(jìn)行異步處理。提高了應(yīng)用程序的響應(yīng)時(shí)間。

2、應(yīng)用程序解耦合

MQ相當(dāng)于一個(gè)中介,生產(chǎn)方通過(guò)MQ與消費(fèi)方交互,它將應(yīng)用程序進(jìn)行解耦合。

市場(chǎng)上還有哪些消息隊(duì)列?

ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ、Redis。

為什么使用RabbitMQ呢?

1、使得簡(jiǎn)單,功能強(qiáng)大。

2、基于AMQP協(xié)議。

3、社區(qū)活躍,文檔完善。

4、高并發(fā)性能好,這主要得益于Erlang語(yǔ)言。

5、Spring Boot默認(rèn)已集成RabbitMQ

1.2 其它相關(guān)知識(shí)

AMQP是什么 ?


總結(jié):AMQP是一套公開(kāi)的消息隊(duì)列協(xié)議,最早在2003年被提出,它旨在從協(xié)議層定義消息通信數(shù)據(jù)的標(biāo)準(zhǔn)格式,

為的就是解決MQ市場(chǎng)上協(xié)議不統(tǒng)一的問(wèn)題。RabbitMQ就是遵循AMQP標(biāo)準(zhǔn)協(xié)議開(kāi)發(fā)的MQ服務(wù)。

官方:http://www.amqp.org/

JMS是什么 ?


總結(jié):

JMS是java提供的一套消息服務(wù)API標(biāo)準(zhǔn),其目的是為所有的java應(yīng)用程序提供統(tǒng)一的消息通信的標(biāo)準(zhǔn),類(lèi)似java的

jdbc,只要遵循jms標(biāo)準(zhǔn)的應(yīng)用程序之間都可以進(jìn)行消息通信。它和AMQP有什么 不同,jms是java語(yǔ)言專(zhuān)屬的消

息服務(wù)標(biāo)準(zhǔn),它是在api層定義標(biāo)準(zhǔn),并且只能用于java應(yīng)用;而AMQP是在協(xié)議層定義的標(biāo)準(zhǔn),是跨語(yǔ)言的 。


2 快速入門(mén)

2.1 RabbitMQ的工作原理

下圖是RabbitMQ的基本結(jié)構(gòu):


組成部分說(shuō)明如下:

Broker:消息隊(duì)列服務(wù)進(jìn)程,此進(jìn)程包括兩個(gè)部分:Exchange和Queue。

Exchange:消息隊(duì)列交換機(jī),按一定的規(guī)則將消息路由轉(zhuǎn)發(fā)到某個(gè)隊(duì)列,對(duì)消息進(jìn)行過(guò)慮。

Queue:消息隊(duì)列,存儲(chǔ)消息的隊(duì)列,消息到達(dá)隊(duì)列并轉(zhuǎn)發(fā)給指定的消費(fèi)方。

Producer:消息生產(chǎn)者,即生產(chǎn)方客戶(hù)端,生產(chǎn)方客戶(hù)端將消息發(fā)送到MQ。

Consumer:消息消費(fèi)者,即消費(fèi)方客戶(hù)端,接收MQ轉(zhuǎn)發(fā)的消息。

消息發(fā)布接收流程:

-----發(fā)送消息-----

1、生產(chǎn)者和Broker建立TCP連接。

2、生產(chǎn)者和Broker建立通道。

3、生產(chǎn)者通過(guò)通道消息發(fā)送給Broker,由Exchange將消息進(jìn)行轉(zhuǎn)發(fā)。

4、Exchange將消息轉(zhuǎn)發(fā)到指定的Queue(隊(duì)列)

----接收消息-----

1、消費(fèi)者和Broker建立TCP連接

2、消費(fèi)者和Broker建立通道

3、消費(fèi)者監(jiān)聽(tīng)指定的Queue(隊(duì)列)

4、當(dāng)有消息到達(dá)Queue時(shí)Broker默認(rèn)將消息推送給消費(fèi)者。

5、消費(fèi)者接收到消息。

2.2 下載安裝

?可以看看:http://www.itdecent.cn/p/3d43561bb3ee

2.2.1 下載安裝

RabbitMQ由Erlang語(yǔ)言開(kāi)發(fā),Erlang語(yǔ)言用于并發(fā)及分布式系統(tǒng)的開(kāi)發(fā),在電信領(lǐng)域應(yīng)用廣泛,OTP(Open

Telecom Platform)作為Erlang語(yǔ)言的一部分,包含了很多基于Erlang開(kāi)發(fā)的中間件及工具庫(kù),安裝RabbitMQ需

要安裝Erlang/OTP,并保持版本匹配,如下圖:

RabbitMQ的下載地址:http://www.rabbitmq.com/download.html

本項(xiàng)目使用Erlang/OTP 20.3版本和RabbitMQ3.7.3版本。

1)下載erlang

地址如下:

http://erlang.org/download/otp_win64_20.3.exe

erlang安裝完成需要配置erlang環(huán)境變量: ERLANG_HOME=D:\Program Files\erl9.3 在path中添

加%ERLANG_HOME%\bin;

2)安裝RabbitMQ

https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.3/rabbitmq-server-3.7.3.exe

2.2.2啟動(dòng)??進(jìn)入安裝目錄下sbin目錄手動(dòng)啟動(dòng):

管理員身份運(yùn)行 rabbitmq-plugins.bat enable rabbitmq_management

進(jìn)入瀏覽器,輸入:http://localhost:15672

初始賬號(hào)和密碼:guest/guest

2.2.3 注意事項(xiàng)

1、安裝erlang和rabbitMQ以管理員身份運(yùn)行。

2、當(dāng)卸載重新安裝時(shí)會(huì)出現(xiàn)RabbitMQ服務(wù)注冊(cè)失敗,此時(shí)需要進(jìn)入注冊(cè)表清理erlang

搜索RabbitMQ、ErlSrv,將對(duì)應(yīng)的項(xiàng)全部刪除。

2.2 Hello World

按照官方教程(http://www.rabbitmq.com/getstarted.html)測(cè)試hello world:


2.2.1搭建環(huán)境

1)java client

生產(chǎn)者和消費(fèi)者都屬于客戶(hù)端,rabbitMQ的java客戶(hù)端如下:

我們先用 rabbitMQ官方提供的java client測(cè)試,目的是對(duì)RabbitMQ的交互過(guò)程有個(gè)清晰的認(rèn)識(shí)。

參考 :https://github.com/rabbitmq/rabbitmq-java-client/

2)創(chuàng)建maven工程

創(chuàng)建生產(chǎn)者工程和消費(fèi)者工程,分別加入RabbitMQ java client的依賴(lài)。

test-rabbitmq-producer:生產(chǎn)者工程

test-rabbitmq-consumer:消費(fèi)者工程

1:新建2個(gè)module? 一個(gè)生產(chǎn)一個(gè)消費(fèi)? ? ?xc-framework-parent 做父工程


==============================

<dependency>

<groupId>com.rabbitmq</groupId>

<artifactId>amqp‐client</artifactId>

<version>4.0.3</version><!‐‐此版本與spring boot 1.5.9版本匹配‐‐>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐logging</artifactId>

</dependency>

=============================

2.2.2 生產(chǎn)者

在生產(chǎn)者工程下的test中創(chuàng)建測(cè)試類(lèi)如下:

=================================

public class Producer01 {

//隊(duì)列名稱(chēng)

private static final String QUEUE = "helloworld";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = null;

Channel channel = null;

try

{

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱(chēng)為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)

//創(chuàng)建與RabbitMQ服務(wù)的TCP連接

connection = factory.newConnection();

//創(chuàng)建與Exchange的通道,每個(gè)連接可以創(chuàng)建多個(gè)通道,每個(gè)通道代表一個(gè)會(huì)話(huà)任務(wù)

channel = connection.createChannel();

/**

* 聲明隊(duì)列,如果Rabbit中沒(méi)有此隊(duì)列將自動(dòng)創(chuàng)建

* param1:隊(duì)列名稱(chēng)

* param2:是否持久化

* param3:隊(duì)列是否獨(dú)占此連接

* param4:隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列

* param5:隊(duì)列參數(shù)

*/

channel.queueDeclare(QUEUE, true, false, false, null);

String message = "helloworld小明"+System.currentTimeMillis();

/**

* 消息發(fā)布方法

* param1:Exchange的名稱(chēng),如果沒(méi)有指定,則使用Default Exchange

* param2:routingKey,消息的路由Key,是用于Exchange(交換機(jī))將消息轉(zhuǎn)發(fā)到指定的消息隊(duì)列

* param3:消息包含的屬性

* param4:消息體

*/

/**

* 這里沒(méi)有指定交換機(jī),消息將發(fā)送給默認(rèn)交換機(jī),每個(gè)隊(duì)列也會(huì)綁定那個(gè)默認(rèn)的交換機(jī),但是不能顯

示綁定或解除綁定

* 默認(rèn)的交換機(jī),routingKey等于隊(duì)列名稱(chēng)

*/

channel.basicPublish("", QUEUE, null, message.getBytes());

System.out.println("Send Message is:'" + message + "'");

}

catch(Exception ex)

{

ex.printStackTrace();

}

finally

{

if(channel != null)

{

channel.close();

}

if(connection != null)

{

connection.close();

}

}

}

}


==================================

2.2.3 消費(fèi)者

在消費(fèi)者工程下的test中創(chuàng)建測(cè)試類(lèi)如下:

===================================

private static final String QUEUE = "helloworld";

public static void main(String[] args) throws IOException, TimeoutException {

ConnectionFactory factory = new ConnectionFactory();

//設(shè)置MabbitMQ所在服務(wù)器的ip和端口

factory.setHost("127.0.0.1");

factory.setPort(5672);

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

//聲明隊(duì)列

channel.queueDeclare(QUEUE, true, false, false, null);

//定義消費(fèi)方法

DefaultConsumer consumer = new DefaultConsumer(channel) {

/**

* 消費(fèi)者接收消息調(diào)用此方法

* @param consumerTag 消費(fèi)者的標(biāo)簽,在channel.basicConsume()去指定

* @param envelope 消息包的內(nèi)容,可從中獲取消息id,消息routingkey,交換機(jī),消息和重傳標(biāo)志

(收到消息失敗后是否需要重新發(fā)送)

* @param properties

* @param body

* @throws IOException

@Override

public void handleDelivery(String consumerTag,

Envelope envelope,

AMQP.BasicProperties properties,

byte[] body)

throws IOException {

//交換機(jī)

String exchange = envelope.getExchange();

//路由key

String routingKey = envelope.getRoutingKey();

//消息id

long deliveryTag = envelope.getDeliveryTag();

//消息內(nèi)容

String msg = new String(body,"utf‐8");

System.out.println("receive message.." + msg);

}

};

/**

* 監(jiān)聽(tīng)隊(duì)列String queue, boolean autoAck,Consumer callback

* 參數(shù)明細(xì)

* 1、隊(duì)列名稱(chēng)

* 2、是否自動(dòng)回復(fù),設(shè)置為true為表示消息接收到自動(dòng)向mq回復(fù)接收到了,mq接收到回復(fù)會(huì)刪除消息,設(shè)置

為false則需要手動(dòng)回復(fù)

* 3、消費(fèi)消息的方法,消費(fèi)者接收到消息后調(diào)用此方法

*/

channel.basicConsume(QUEUE, true, consumer);

}

}


====================================

2.2.4 總結(jié)

1、發(fā)送端操作流程

1)創(chuàng)建連接

2)創(chuàng)建通道

3)聲明隊(duì)列

4)發(fā)送消息

2、接收端

1)創(chuàng)建連接

2)創(chuàng)建通道

3)聲明隊(duì)列

4)監(jiān)聽(tīng)隊(duì)列

5)接收消息

6)ack回復(fù)

4 工作模式

RabbitMQ有以下幾種工作模式 :

1、Work queues

2、Publish/Subscribe

3、Routing

4、Topics

5、Header

6、RPC

4.1 Work queues


work queues與入門(mén)程序相比,多了一個(gè)消費(fèi)端,兩個(gè)消費(fèi)端共同消費(fèi)同一個(gè)隊(duì)列中的消息。

應(yīng)用場(chǎng)景:對(duì)于 任務(wù)過(guò)重或任務(wù)較多情況使用工作隊(duì)列可以提高任務(wù)處理的速度。

測(cè)試:

1、使用入門(mén)程序,啟動(dòng)多個(gè)消費(fèi)者。

2、生產(chǎn)者發(fā)送多個(gè)消息。

結(jié)果:

1、一條消息只會(huì)被一個(gè)消費(fèi)者接收;

2、rabbit采用輪詢(xún)的方式將消息是平均發(fā)送給消費(fèi)者的;

3、消費(fèi)者在處理完某條消息后,才會(huì)收到下一條消息。

4.2 Publish/subscribe



發(fā)布訂閱模式:

1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列。

2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)將消息轉(zhuǎn)發(fā)到綁定此交換機(jī)的每個(gè)隊(duì)列,每個(gè)綁定交換機(jī)的隊(duì)列都將接收

到消息

4.2.2代碼

案例:

用戶(hù)通知,當(dāng)用戶(hù)充值成功或轉(zhuǎn)賬完成系統(tǒng)通知用戶(hù),通知方式有短信、郵件多種方法 。

1、生產(chǎn)者

聲明Exchange_fanout_inform交換機(jī)。

聲明兩個(gè)隊(duì)列并且綁定到此交換機(jī),綁定時(shí)不需要指定routingkey

發(fā)送消息時(shí)不需要指定routingkey

============================================

package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Producer02_publish {

//隊(duì)列名稱(chēng)

private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

private static final String QUEUE_INFORM_SMS = "queue_inform_sms";

private static final String EXCHANGE_FANOUT_INFORM="exchange_fanout_inform";

public static void main(String[] args) {

Connection connection = null;

Channel channel = null;

try {

//創(chuàng)建一個(gè)與MQ的連接

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱(chēng)為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)

//創(chuàng)建一個(gè)連接

connection = factory.newConnection();

//創(chuàng)建與交換機(jī)的通道,每個(gè)通道代表一個(gè)會(huì)話(huà)

channel = connection.createChannel();

//聲明交換機(jī) String exchange, BuiltinExchangeType type

/**

* 參數(shù)明細(xì)

* 1、交換機(jī)名稱(chēng)

* 2、交換機(jī)類(lèi)型,fanout、topic、direct、headers

*/

channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

//聲明隊(duì)列

// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String,

Object> arguments)

/**

* 參數(shù)明細(xì):

* 1、隊(duì)列名稱(chēng)

* 2、是否持久化

* 3、是否獨(dú)占此隊(duì)列

* 4、隊(duì)列不用是否自動(dòng)刪除

* 5、參數(shù)

*/

channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//交換機(jī)和隊(duì)列綁定String queue, String exchange, String routingKey

/**

* 參數(shù)明細(xì)

* 1、隊(duì)列名稱(chēng)

* 2、交換機(jī)名稱(chēng)

* 3、路由key

*/

channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_FANOUT_INFORM,"");

channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_FANOUT_INFORM,"");

//發(fā)送消息

for (int i=0;i<10;i++){

String message = "inform to user"+i;

//向交換機(jī)發(fā)送消息 String exchange, String routingKey, BasicProperties props,

byte[] body

/**

* 參數(shù)明細(xì)

* 1、交換機(jī)名稱(chēng),不指令使用默認(rèn)交換機(jī)名稱(chēng) Default Exchange

* 2、routingKey(路由key),根據(jù)key名稱(chēng)將消息轉(zhuǎn)發(fā)到具體的隊(duì)列,這里填寫(xiě)隊(duì)列名稱(chēng)表示消

息將發(fā)到此隊(duì)列

* 3、消息屬性

* 4、消息內(nèi)容

*/

channel.basicPublish(EXCHANGE_FANOUT_INFORM, "", null, message.getBytes());

System.out.println("Send Message is:'" + message + "'");

}

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}finally{

if(channel!=null){

try {

channel.close();

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

}

if(connection!=null){

try {

connection.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}


=============================================

2、郵件發(fā)送消費(fèi)者

======================

package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.*;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Consumer02_subscribe_email {

//隊(duì)列名稱(chēng)

private static final String QUEUE_INFORM_EMAIL = "inform_queue_email";

private static final String EXCHANGE_FANOUT_INFORM="inform_exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {

//創(chuàng)建一個(gè)與MQ的連接

ConnectionFactory connectionFactory = new ConnectionFactory();

? ? ? ? connectionFactory.setHost("127.0.0.1");

? ? ? ? connectionFactory.setPort(5672);//端口

? ? ? ? connectionFactory.setUsername("guest");

? ? ? ? connectionFactory.setPassword("guest");

? ? ? ? //設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mq

? ? ? ? connectionFactory.setVirtualHost("/");

//建立新連接

? ? ? ? Connection connection = connectionFactory.newConnection();

? ? ? ? //創(chuàng)建會(huì)話(huà)通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成

? ? ? ? Channel channel = connection.createChannel();

/**

? ? ? ? * 參數(shù)明細(xì)

? ? ? ? * 1、queue 隊(duì)列名稱(chēng)

? ? ? ? * 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在

? ? ? ? * 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建

? ? ? ? * 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)

? ? ? ? * 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間

? ? ? ? */

channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);

? ? ? ? //聲明一個(gè)交換機(jī)

? ? ? ? //參數(shù):String exchange, String type

? ? ? ? /**

? ? ? ? * 參數(shù)明細(xì):

? ? ? ? * 1、交換機(jī)的名稱(chēng)

? ? ? ? * 2、交換機(jī)的類(lèi)型

? ? ? ? * fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe

? ? ? ? * direct:對(duì)應(yīng)的Routing 工作模式

? ? ? ? * topic:對(duì)應(yīng)的Topics工作模式

? ? ? ? * headers: 對(duì)應(yīng)的headers工作模式

? ? ? ? */

channel.exchangeDeclare(EXCHANGE_FANOUT_INFORM, BuiltinExchangeType.FANOUT);

? ? ? ? //進(jìn)行交換機(jī)和隊(duì)列綁定

? ? ? ? //參數(shù):String queue, String exchange, String routingKey

? ? ? ? /**

? ? ? ? * 參數(shù)明細(xì):

? ? ? ? * 1、queue 隊(duì)列名稱(chēng)

? ? ? ? * 2、exchange 交換機(jī)名稱(chēng)

? ? ? ? * 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串

? ? ? ? */

? ? ? ? channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_FANOUT_INFORM, "");

//實(shí)現(xiàn)消費(fèi)方法

? ? ? ? DefaultConsumer defaultConsumer = new DefaultConsumer(channel){

? ? ? ? ? ? /**

? ? ? ? ? ? * 當(dāng)接收到消息后此方法將被調(diào)用

? ? ? ? ? ? * @param consumerTag? 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume

? ? ? ? ? ? * @param envelope 信封,通過(guò)envelope

? ? ? ? ? ? * @param properties 消息屬性

? ? ? ? ? ? * @param body 消息內(nèi)容

? ? ? ? ? ? * @throws IOException

? ? ? ? ? ? */

? ? ? ? ? ? @Override

? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

? ? ? ? ? ? ? ? //交換機(jī)

? ? ? ? ? ? ? ? String exchange = envelope.getExchange();

? ? ? ? ? ? ? ? //消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收

? ? ? ? ? ? ? ? long deliveryTag = envelope.getDeliveryTag();

? ? ? ? ? ? ? ? //消息內(nèi)容

? ? ? ? ? ? ? ? String message= new String(body,"utf-8");

? ? ? ? ? ? ? ? System.out.println("receive message:"+message);

? ? ? ? ? ? }

? ? ? ? };

//監(jiān)聽(tīng)隊(duì)列

? ? ? ? //參數(shù):String queue, boolean autoAck, Consumer callback

? ? ? ? /**

? ? ? ? * 參數(shù)明細(xì):

? ? ? ? * 1、queue 隊(duì)列名稱(chēng)

? ? ? ? * 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)

? ? ? ? * 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法

? ? ? ? */

? ? ? ? channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

? ? }

}


=====================

按照上邊的代碼,編寫(xiě)郵件通知的消費(fèi)代碼。

3、短信發(fā)送消費(fèi)者

參考上邊的郵件發(fā)送消費(fèi)者代碼編寫(xiě)。

4.2.3測(cè)試

打開(kāi)RabbitMQ的管理界面,觀察交換機(jī)綁定情況:

4.2.4 思考

1、publish/subscribe與work queues有什么區(qū)別。

區(qū)別:

1)work queues不用定義交換機(jī),而publish/subscribe需要定義交換機(jī)。

2)publish/subscribe的生產(chǎn)方是面向交換機(jī)發(fā)送消息,work queues的生產(chǎn)方是面向隊(duì)列發(fā)送消息(底層使用默認(rèn)

交換機(jī))。

3)publish/subscribe需要設(shè)置隊(duì)列和交換機(jī)的綁定,work queues不需要設(shè)置,實(shí)質(zhì)上work queues會(huì)將隊(duì)列綁

定到默認(rèn)的交換機(jī) 。

相同點(diǎn):

所以?xún)烧邔?shí)現(xiàn)的發(fā)布/訂閱的效果是一樣的,多個(gè)消費(fèi)端監(jiān)聽(tīng)同一個(gè)隊(duì)列不會(huì)重復(fù)消費(fèi)消息。

2、實(shí)質(zhì)工作用什么 publish/subscribe還是work queues。

建議使用 publish/subscribe,發(fā)布訂閱模式比工作隊(duì)列模式更強(qiáng)大,并且發(fā)布訂閱模式可以指定自己專(zhuān)用的交換

機(jī)。

4.3 Routing

4.3.1 工作模式


路由模式:

1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列,并且設(shè)置routingkey。

2、生產(chǎn)者將消息發(fā)給交換機(jī),由交換機(jī)根據(jù)routingkey來(lái)轉(zhuǎn)發(fā)消息到指定的隊(duì)列。

4.3.2代碼

1、生產(chǎn)者

聲明exchange_routing_inform交換機(jī)。

聲明兩個(gè)隊(duì)列并且綁定到此交換機(jī),綁定時(shí)需要指定routingkey

發(fā)送消息時(shí)需要指定routingkey

===============================

package com.xuecheng.test.rabbitmq;

import com.rabbitmq.client.BuiltinExchangeType;

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

import java.util.concurrent.TimeoutException;

public class Producer03_routing {

//隊(duì)列名稱(chēng)

private static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

private static final String QUEUE_INFORM_SMS = "queue_inform_sms";

private static final String EXCHANGE_ROUTING_INFORM="exchange_routing_inform";

public static void main(String[] args) {

Connection connection = null;

Channel channel = null;

try {

//創(chuàng)建一個(gè)與MQ的連接

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("127.0.0.1");

factory.setPort(5672);

factory.setUsername("guest");

factory.setPassword("guest");

factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱(chēng)為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)

//創(chuàng)建一個(gè)連接

connection = factory.newConnection();

//創(chuàng)建與交換機(jī)的通道,每個(gè)通道代表一個(gè)會(huì)話(huà)

channel = connection.createChannel();

//聲明交換機(jī) String exchange, BuiltinExchangeType type

/**

* 參數(shù)明細(xì)

* 1、交換機(jī)名稱(chēng)

* 2、交換機(jī)類(lèi)型,fanout、topic、direct、headers

*/

channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

//聲明隊(duì)列

// channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean

autoDelete, Map<String, Object> arguments)

/**

* 參數(shù)明細(xì):

* 1、隊(duì)列名稱(chēng)

* 2、是否持久化

* 3、是否獨(dú)占此隊(duì)列

* 4、隊(duì)列不用是否自動(dòng)刪除

* 5、參數(shù)

*/

channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//交換機(jī)和隊(duì)列綁定String queue, String exchange, String routingKey

/**

* 參數(shù)明細(xì)

* 1、隊(duì)列名稱(chēng)

* 2、交換機(jī)名稱(chēng)

* 3、路由key

*/

channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_EMAIL);

channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_ROUTING_INFORM,QUEUE_INFORM_SMS);

//發(fā)送郵件消息

for (int i=0;i<10;i++){

String message = "email inform to user"+i;

//向交換機(jī)發(fā)送消息 String exchange, String routingKey, BasicProperties props,

byte[] body

/**

* 參數(shù)明細(xì)

* 1、交換機(jī)名稱(chēng),不指令使用默認(rèn)交換機(jī)名稱(chēng) Default Exchange

* 2、routingKey(路由key),根據(jù)key名稱(chēng)將消息轉(zhuǎn)發(fā)到具體的隊(duì)列,這里填寫(xiě)隊(duì)列名稱(chēng)表示消

息將發(fā)到此隊(duì)列

* 3、消息屬性

* 4、消息內(nèi)容

*/

channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_EMAIL, null,

message.getBytes());

System.out.println("Send Message is:'" + message + "'");

}

//發(fā)送短信消息

for (int i=0;i<10;i++){

String message = "sms inform to user"+i;

//向交換機(jī)發(fā)送消息 String exchange, String routingKey, BasicProperties props,

byte[] body

channel.basicPublish(EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS, null,

message.getBytes());

System.out.println("Send Message is:'" + message + "'");

}

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}finally{

if(channel!=null){

try {

channel.close();

} catch (IOException e) {

e.printStackTrace();

} catch (TimeoutException e) {

e.printStackTrace();

}

}

if(connection!=null){

try {

connection.close();

} catch (IOException e) {

e.printStackTrace();

}

}

}

}

}

===============================

2、郵件發(fā)送消費(fèi)者

==============================

public class Consumer03_subscribe_sms {

//隊(duì)列名稱(chēng)

? ? private static final StringQUEUE_INFORM_SMS ="queue_inform_sms";

? ? private static final StringEXCHANGE_ROUTING_INFORM="exchange_routing_inform";

? ? public static void main(String[] args)throws IOException, TimeoutException {

//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接

? ? ? ? ConnectionFactory connectionFactory =new ConnectionFactory();

? ? ? ? connectionFactory.setHost("127.0.0.1");

? ? ? ? connectionFactory.setPort(5672);//端口

? ? ? ? connectionFactory.setUsername("guest");

? ? ? ? connectionFactory.setPassword("guest");

? ? ? ? //設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mq

? ? ? ? connectionFactory.setVirtualHost("/");

? ? ? ? //建立新連接

? ? ? ? Connection connection = connectionFactory.newConnection();

? ? ? ? //創(chuàng)建會(huì)話(huà)通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? /**

* 參數(shù)明細(xì)

* 1、queue 隊(duì)列名稱(chēng)

* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在

* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建

* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)

* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間

*/

? ? ? ? channel.queueDeclare(QUEUE_INFORM_SMS,true,false,false,null);

? ? ? ? //聲明一個(gè)交換機(jī)

//參數(shù):String exchange, String type

? ? ? ? /**

* 參數(shù)明細(xì):

* 1、交換機(jī)的名稱(chēng)

* 2、交換機(jī)的類(lèi)型

* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe

* direct:對(duì)應(yīng)的Routing? 工作模式

* topic:對(duì)應(yīng)的Topics工作模式

* headers: 對(duì)應(yīng)的headers工作模式

*/

? ? ? ? channel.exchangeDeclare(EXCHANGE_ROUTING_INFORM, BuiltinExchangeType.DIRECT);

? ? ? ? //進(jìn)行交換機(jī)和隊(duì)列綁定

//參數(shù):String queue, String exchange, String routingKey

? ? ? ? /**

* 參數(shù)明細(xì):

* 1、queue 隊(duì)列名稱(chēng)

* 2、exchange 交換機(jī)名稱(chēng)

* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串

*/

? ? ? ? channel.queueBind(QUEUE_INFORM_SMS, EXCHANGE_ROUTING_INFORM, QUEUE_INFORM_SMS);

? ? ? ? //實(shí)現(xiàn)消費(fèi)方法

? ? ? ? DefaultConsumer defaultConsumer =new DefaultConsumer(channel){

/**

* 當(dāng)接收到消息后此方法將被調(diào)用

? ? ? ? ? ? * @param consumerTag? 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume

? ? ? ? ? ? * @param envelope 信封,通過(guò)envelope

? ? ? ? ? ? * @param properties 消息屬性

? ? ? ? ? ? * @param body 消息內(nèi)容

? ? ? ? ? ? * @throws IOException

*/

? ? ? ? ? ? @Override

? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {

//交換機(jī)

? ? ? ? ? ? ? ? String exchange = envelope.getExchange();

? ? ? ? ? ? ? ? //消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收

? ? ? ? ? ? ? ? long deliveryTag = envelope.getDeliveryTag();

? ? ? ? ? ? ? ? //消息內(nèi)容

? ? ? ? ? ? ? ? String message=new String(body,"utf-8");

? ? ? ? ? ? ? ? System.out.println("receive message:"+message);

? ? ? ? ? ? }

};

? ? ? ? //監(jiān)聽(tīng)隊(duì)列

//參數(shù):String queue, boolean autoAck, Consumer callback

? ? ? ? /**

* 參數(shù)明細(xì):

* 1、queue 隊(duì)列名稱(chēng)

* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)

* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法

*/

? ? ? ? channel.basicConsume(QUEUE_INFORM_SMS,true,defaultConsumer);

? ? }

}

==============================

3、短信發(fā)送消費(fèi)者

參考郵件發(fā)送消費(fèi)者的代碼流程,編寫(xiě)短信通知的代碼。

4.4.4思考

1、Routing模式和Publish/subscibe有啥區(qū)別?

Routing模式要求隊(duì)列在綁定交換機(jī)時(shí)要指定routingkey,消息會(huì)轉(zhuǎn)發(fā)到符合routingkey的隊(duì)列。

4.4 Topics

4.4.1工作模式


路由模式:

1、每個(gè)消費(fèi)者監(jiān)聽(tīng)自己的隊(duì)列,并且設(shè)置帶統(tǒng)配符的routingkey。

2、生產(chǎn)者將消息發(fā)給broker,由交換機(jī)根據(jù)routingkey來(lái)轉(zhuǎn)發(fā)消息到指定的隊(duì)列。

案例:

根據(jù)用戶(hù)的通知設(shè)置去通知用戶(hù),設(shè)置接收Email的用戶(hù)只接收Email,設(shè)置接收sms的用戶(hù)只接收sms,設(shè)置兩種

通知類(lèi)型都接收的則兩種通知都有效。

1、生產(chǎn)者

聲明交換機(jī),指定topic類(lèi)型:

========================

public class Producer04_topics {

//隊(duì)列名稱(chēng)

? ? private static final StringQUEUE_INFORM_EMAIL ="queue_inform_email";

? ? private static final StringQUEUE_INFORM_SMS ="queue_inform_sms";

? ? private static final StringEXCHANGE_TOPICS_INFORM="exchange_topics_inform";

? ? private static final StringROUTINGKEY_EMAIL="inform.#.email.#";

? ? private static final StringROUTINGKEY_SMS="inform.#.sms.#";

? ? public static void main(String[] args) {

Connection connection =null;

? ? ? ? Channel channel =null;

? ? ? ? try {

//創(chuàng)建一個(gè)與MQ的連接

? ? ? ? ? ? ConnectionFactory factory =new ConnectionFactory();

? ? ? ? ? ? factory.setHost("127.0.0.1");

? ? ? ? ? ? factory.setPort(5672);

? ? ? ? ? ? factory.setUsername("guest");

? ? ? ? ? ? factory.setPassword("guest");

? ? ? ? ? ? factory.setVirtualHost("/");//rabbitmq默認(rèn)虛擬機(jī)名稱(chēng)為“/”,虛擬機(jī)相當(dāng)于一個(gè)獨(dú)立的mq服務(wù)

//創(chuàng)建一個(gè)連接

? ? ? ? ? ? connection = factory.newConnection();

//創(chuàng)建與交換機(jī)的通道,每個(gè)通道代表一個(gè)會(huì)話(huà)

? ? ? ? ? ? channel = connection.createChannel();

//聲明交換機(jī) String exchange, BuiltinExchangeType type

/**

* 參數(shù)明細(xì)

* 1、交換機(jī)名稱(chēng)

* 2、交換機(jī)類(lèi)型,fanout、topic、direct、headers

*/

? ? ? ? ? ? channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);

//聲明隊(duì)列

// (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments)

/**

* 參數(shù)明細(xì):

* 1、隊(duì)列名稱(chēng)

* 2、是否持久化

* 3、是否獨(dú)占此隊(duì)列

* 4、隊(duì)列不用是否自動(dòng)刪除

* 5、參數(shù)

*/

? ? ? ? ? ? channel.queueDeclare(QUEUE_INFORM_EMAIL, true, false, false, null);

? ? ? ? ? ? channel.queueDeclare(QUEUE_INFORM_SMS, true, false, false, null);

//交換機(jī)和隊(duì)列綁定String queue, String exchange, String routingKey

? ? ? ? ? ? /**

* 參數(shù)明細(xì)

* 1、隊(duì)列名稱(chēng)

* 2、交換機(jī)名稱(chēng)

* 3、路由key

*/

? ? ? ? ? ? channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

? ? ? ? ? ? channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_TOPICS_INFORM,ROUTINGKEY_SMS);

? ? ? ? ? ? //發(fā)送郵件消息

/*for (int i=0;i<10;i++){

String message = "email inform to user"+i;

channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.email", null,

message.getBytes());

System.out.println("Send Message is:'" + message + "'");

}*/

//發(fā)送短信消息

? ? ? ? ? ? for (int i=0;i<10;i++){

String message ="sms inform to user"+i;

? ? ? ? ? ? ? ? channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms", null,

? ? ? ? ? ? ? ? ? ? ? ? message.getBytes());

? ? ? ? ? ? ? ? System.out.println("Send Message is:'" + message +"'");

? ? ? ? ? ? }

//發(fā)送短信和郵件消息

/* for (int i=0;i<10;i++){

String message = "sms and email inform to user"+i;

channel.basicPublish(EXCHANGE_TOPICS_INFORM, "inform.sms.email", null,

message.getBytes());

System.out.println("Send Message is:'" + message + "'");

}*/

? ? ? ? }catch (IOException e) {

e.printStackTrace();

? ? ? ? }catch (TimeoutException e) {

e.printStackTrace();

? ? ? ? }finally{

if(channel!=null){

try {

channel.close();

? ? ? ? ? ? ? ? }catch (IOException e) {

e.printStackTrace();

? ? ? ? ? ? ? ? }catch (TimeoutException e) {

e.printStackTrace();

? ? ? ? ? ? ? ? }

}

if(connection!=null){

try {

connection.close();

? ? ? ? ? ? ? ? }catch (IOException e) {

e.printStackTrace();

? ? ? ? ? ? ? ? }

}

}

}

}

=======================

2、消費(fèi)端

隊(duì)列綁定交換機(jī)指定通配符:

統(tǒng)配符規(guī)則:

中間以“.”分隔。

符號(hào)#可以匹配多個(gè)詞,符號(hào)*可以匹配一個(gè)詞語(yǔ)。

=============

public class Consumer04_topics_email {

//隊(duì)列名稱(chēng)

? ? private static final StringQUEUE_INFORM_EMAIL ="queue_inform_email";

? ? private static final StringEXCHANGE_TOPICS_INFORM="exchange_topics_inform";

? ? private static final StringROUTINGKEY_EMAIL="inform.#.email.#";

? ? public static void main(String[] args)throws IOException, TimeoutException {

//通過(guò)連接工廠創(chuàng)建新的連接和mq建立連接

? ? ? ? ConnectionFactory connectionFactory =new ConnectionFactory();

? ? ? ? connectionFactory.setHost("127.0.0.1");

? ? ? ? connectionFactory.setPort(5672);//端口

? ? ? ? connectionFactory.setUsername("guest");

? ? ? ? connectionFactory.setPassword("guest");

? ? ? ? //設(shè)置虛擬機(jī),一個(gè)mq服務(wù)可以設(shè)置多個(gè)虛擬機(jī),每個(gè)虛擬機(jī)就相當(dāng)于一個(gè)獨(dú)立的mq

? ? ? ? connectionFactory.setVirtualHost("/");

? ? ? ? //建立新連接

? ? ? ? Connection connection = connectionFactory.newConnection();

? ? ? ? //創(chuàng)建會(huì)話(huà)通道,生產(chǎn)者和mq服務(wù)所有通信都在channel通道中完成

? ? ? ? Channel channel = connection.createChannel();

? ? ? ? /**

* 參數(shù)明細(xì)

* 1、queue 隊(duì)列名稱(chēng)

* 2、durable 是否持久化,如果持久化,mq重啟后隊(duì)列還在

* 3、exclusive 是否獨(dú)占連接,隊(duì)列只允許在該連接中訪問(wèn),如果connection連接關(guān)閉隊(duì)列則自動(dòng)刪除,如果將此參數(shù)設(shè)置true可用于臨時(shí)隊(duì)列的創(chuàng)建

* 4、autoDelete 自動(dòng)刪除,隊(duì)列不再使用時(shí)是否自動(dòng)刪除此隊(duì)列,如果將此參數(shù)和exclusive參數(shù)設(shè)置為true就可以實(shí)現(xiàn)臨時(shí)隊(duì)列(隊(duì)列不用了就自動(dòng)刪除)

* 5、arguments 參數(shù),可以設(shè)置一個(gè)隊(duì)列的擴(kuò)展參數(shù),比如:可設(shè)置存活時(shí)間

*/

? ? ? ? channel.queueDeclare(QUEUE_INFORM_EMAIL,true,false,false,null);

? ? ? ? //聲明一個(gè)交換機(jī)

//參數(shù):String exchange, String type

? ? ? ? /**

* 參數(shù)明細(xì):

* 1、交換機(jī)的名稱(chēng)

* 2、交換機(jī)的類(lèi)型

* fanout:對(duì)應(yīng)的rabbitmq的工作模式是 publish/subscribe

* direct:對(duì)應(yīng)的Routing? 工作模式

* topic:對(duì)應(yīng)的Topics工作模式

* headers: 對(duì)應(yīng)的headers工作模式

*/

? ? ? ? channel.exchangeDeclare(EXCHANGE_TOPICS_INFORM, BuiltinExchangeType.TOPIC);

? ? ? ? //進(jìn)行交換機(jī)和隊(duì)列綁定

//參數(shù):String queue, String exchange, String routingKey

? ? ? ? /**

* 參數(shù)明細(xì):

* 1、queue 隊(duì)列名稱(chēng)

* 2、exchange 交換機(jī)名稱(chēng)

* 3、routingKey 路由key,作用是交換機(jī)根據(jù)路由key的值將消息轉(zhuǎn)發(fā)到指定的隊(duì)列中,在發(fā)布訂閱模式中調(diào)協(xié)為空字符串

*/

? ? ? ? channel.queueBind(QUEUE_INFORM_EMAIL, EXCHANGE_TOPICS_INFORM,ROUTINGKEY_EMAIL);

? ? ? ? //實(shí)現(xiàn)消費(fèi)方法

? ? ? ? DefaultConsumer defaultConsumer =new DefaultConsumer(channel){

/**

* 當(dāng)接收到消息后此方法將被調(diào)用

? ? ? ? ? ? * @param consumerTag? 消費(fèi)者標(biāo)簽,用來(lái)標(biāo)識(shí)消費(fèi)者的,在監(jiān)聽(tīng)隊(duì)列時(shí)設(shè)置channel.basicConsume

? ? ? ? ? ? * @param envelope 信封,通過(guò)envelope

? ? ? ? ? ? * @param properties 消息屬性

? ? ? ? ? ? * @param body 消息內(nèi)容

? ? ? ? ? ? * @throws IOException

*/

? ? ? ? ? ? @Override

? ? ? ? ? ? public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException {

//交換機(jī)

? ? ? ? ? ? ? ? String exchange = envelope.getExchange();

? ? ? ? ? ? ? ? //消息id,mq在channel中用來(lái)標(biāo)識(shí)消息的id,可用于確認(rèn)消息已接收

? ? ? ? ? ? ? ? long deliveryTag = envelope.getDeliveryTag();

? ? ? ? ? ? ? ? //消息內(nèi)容

? ? ? ? ? ? ? ? String message=new String(body,"utf-8");

? ? ? ? ? ? ? ? System.out.println("receive message:"+message);

? ? ? ? ? ? }

};

? ? ? ? //監(jiān)聽(tīng)隊(duì)列

//參數(shù):String queue, boolean autoAck, Consumer callback

? ? ? ? /**

* 參數(shù)明細(xì):

* 1、queue 隊(duì)列名稱(chēng)

* 2、autoAck 自動(dòng)回復(fù),當(dāng)消費(fèi)者接收到消息后要告訴mq消息已接收,如果將此參數(shù)設(shè)置為tru表示會(huì)自動(dòng)回復(fù)mq,如果設(shè)置為false要通過(guò)編程實(shí)現(xiàn)回復(fù)

* 3、callback,消費(fèi)方法,當(dāng)消費(fèi)者接收到消息要執(zhí)行的方法

*/

? ? ? ? channel.basicConsume(QUEUE_INFORM_EMAIL,true,defaultConsumer);

? ? }

==============

4.4.4思考

1、本案例的需求使用Routing工作模式能否實(shí)現(xiàn)?

使用Routing模式也可以實(shí)現(xiàn)本案例,共設(shè)置三個(gè) routingkey,分別是email、sms、all,email隊(duì)列綁定email和

all,sms隊(duì)列綁定sms和all,這樣就可以實(shí)現(xiàn)上邊案例的功能,實(shí)現(xiàn)過(guò)程比topics復(fù)雜。

Topic模式更多加強(qiáng)大,它可以實(shí)現(xiàn)Routing、publish/subscirbe模式的功能。

4.5 Header模式

header模式與routing不同的地方在于,header模式取消routingkey,使用header中的 key/value(鍵值對(duì))匹配

隊(duì)列。

案例:

根據(jù)用戶(hù)的通知設(shè)置去通知用戶(hù),設(shè)置接收Email的用戶(hù)只接收Email,設(shè)置接收sms的用戶(hù)只接收sms,設(shè)置兩種

通知類(lèi)型都接收的則兩種通知都有效。

代碼:

1)生產(chǎn)者

隊(duì)列與交換機(jī)綁定的代碼與之前不同,如下:


================================================

Map<String, Object> headers_email = new Hashtable<String, Object>();

headers_email.put("inform_type", "email");

Map<String, Object> headers_sms = new Hashtable<String, Object>();

headers_sms.put("inform_type", "sms");

channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);

channel.queueBind(QUEUE_INFORM_SMS,EXCHANGE_HEADERS_INFORM,"",headers_sms);

================================================

通知:

================

String message = "email inform to user"+i;

Map<String,Object> headers = new Hashtable<String, Object>();

headers.put("inform_type", "email");//匹配email通知消費(fèi)者綁定的header

//headers.put("inform_type", "sms");//匹配sms通知消費(fèi)者綁定的header

AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder();

properties.headers(headers);

//Email通知

channel.basicPublish(EXCHANGE_HEADERS_INFORM, "", properties.build(), message.getBytes());

===================

2)發(fā)送郵件消費(fèi)者

=================

channel.exchangeDeclare(EXCHANGE_HEADERS_INFORM, BuiltinExchangeType.HEADERS);

Map<String, Object> headers_email = new Hashtable<String, Object>();

headers_email.put("inform_email", "email");

//交換機(jī)和隊(duì)列綁定

channel.queueBind(QUEUE_INFORM_EMAIL,EXCHANGE_HEADERS_INFORM,"",headers_email);

//指定消費(fèi)隊(duì)列

channel.basicConsume(QUEUE_INFORM_EMAIL, true, consumer);

=================

4.6 RPC

RPC即客戶(hù)端遠(yuǎn)程調(diào)用服務(wù)端的方法 ,使用MQ可以實(shí)現(xiàn)RPC的異步調(diào)用,基于Direct交換機(jī)實(shí)現(xiàn),流程如下:

1、客戶(hù)端即是生產(chǎn)者就是消費(fèi)者,向RPC請(qǐng)求隊(duì)列發(fā)送RPC調(diào)用消息,同時(shí)監(jiān)聽(tīng)RPC響應(yīng)隊(duì)列。

2、服務(wù)端監(jiān)聽(tīng)RPC請(qǐng)求隊(duì)列的消息,收到消息后執(zhí)行服務(wù)端的方法,得到方法返回的結(jié)果

3、服務(wù)端將RPC方法 的結(jié)果發(fā)送到RPC響應(yīng)隊(duì)列

4、客戶(hù)端(RPC調(diào)用方)監(jiān)聽(tīng)RPC響應(yīng)隊(duì)列,接收到RPC調(diào)用結(jié)果。

5 Spring整合RibbitMQ

4.1 搭建SpringBoot環(huán)境

我們選擇基于Spring-Rabbit去操作RabbitMQ

https://github.com/spring-projects/spring-amqp

使用spring-boot-starter-amqp會(huì)自動(dòng)添加spring-rabbit依賴(lài),如下:

================================

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐amqp</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐test</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐logging</artifactId>

</dependency>


=================================

4.2配置

1、配置application.yml

配置連接rabbitmq的參數(shù)

=========================

server:

port: 44000

spring:

application:

name: test‐rabbitmq‐producer

rabbitmq:

host: 127.0.0.1

port: 5672

username: guest

password: guest

virtualHost: /


=========================

2、定義RabbitConfifig類(lèi),配置Exchange、Queue、及綁定交換機(jī)。

本例配置Topic交換機(jī)。

=============================

@Configuration

public class RabbitmqConfig {

public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";

public static final String QUEUE_INFORM_SMS = "queue_inform_sms";

public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";

/**

* 交換機(jī)配置

* ExchangeBuilder提供了fanout、direct、topic、header交換機(jī)類(lèi)型的配置

* @return the exchange

*/

@Bean(EXCHANGE_TOPICS_INFORM)

public Exchange EXCHANGE_TOPICS_INFORM() {

//durable(true)持久化,消息隊(duì)列重啟后交換機(jī)仍然存在

return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();

}

//聲明隊(duì)列

@Bean(QUEUE_INFORM_SMS)

public Queue QUEUE_INFORM_SMS() {

Queue queue = new Queue(QUEUE_INFORM_SMS);

return queue;

}

//聲明隊(duì)列

@Bean(QUEUE_INFORM_EMAIL)

public Queue QUEUE_INFORM_EMAIL() {

Queue queue = new Queue(QUEUE_INFORM_EMAIL);

return queue;

}

/** channel.queueBind(INFORM_QUEUE_SMS,"inform_exchange_topic","inform.#.sms.#");

* 綁定隊(duì)列到交換機(jī) .

*

* @param queue the queue

* @param exchange the exchange

* @return the binding

*/

@Bean

public Binding BINDING_QUEUE_INFORM_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,

@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with("inform.#.sms.#").noargs();

}

@Bean

public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,

@Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange) {

return BindingBuilder.bind(queue).to(exchange).with("inform.#.email.#").noargs();

}

}


=============================

4.3 生產(chǎn)端

使用RarbbitTemplate發(fā)送消息??

在測(cè)試包下創(chuàng)建測(cè)試:

==============================

package com.xuecheng.test.rabbitmq;

import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;

import org.junit.Test;

import org.junit.runner.RunWith;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.boot.test.context.SpringBootTest;

import org.springframework.test.context.junit4.SpringRunner;

@SpringBootTest

@RunWith(SpringRunner.class)

public class Producer05_topics_springboot {

@Autowired

RabbitTemplate rabbitTemplate;

@Test

public void testSendByTopics(){

for (int i=0;i<5;i++){

String message = "sms email inform to user"+i;

rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANGE_TOPICS_INFORM,"inform.sms.email",message);

System.out.println("Send Message is:'" + message + "'");

}

}

}


==============================

4.4消費(fèi)端

創(chuàng)建消費(fèi)端工程,添加依賴(lài):

======================

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐amqp</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐test</artifactId>

</dependency>

<dependency>

<groupId>org.springframework.boot</groupId>

<artifactId>spring‐boot‐starter‐logging</artifactId>

</dependency>


======================

使用@RabbitListener注解監(jiān)聽(tīng)隊(duì)列。

====================================

package com.xuecheng.test.rabbitmq.mq;

import com.rabbitmq.client.Channel;

import com.xuecheng.test.rabbitmq.config.RabbitmqConfig;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.stereotype.Component;

@Component

public class ReceiveHandler {

//監(jiān)聽(tīng)email隊(duì)列

@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_EMAIL})

public void receive_email(String msg,Message message,Channel channel){

System.out.println(msg);

}

//監(jiān)聽(tīng)sms隊(duì)列

@RabbitListener(queues = {RabbitmqConfig.QUEUE_INFORM_SMS})

public void receive_sms(String msg,Message message,Channel channel){

System.out.println(msg);

}

}

====================================

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 什么叫消息隊(duì)列? 消息(Message)是指在應(yīng)用間傳送的數(shù)據(jù)。消息可以非常簡(jiǎn)單,比如只包含文本字符串,也可以更復(fù)...
    Agile_dev閱讀 2,435評(píng)論 0 24
  • http://liuxing.info/2017/06/30/Spring%20AMQP%E4%B8%AD%E6%...
    sherlock_6981閱讀 16,208評(píng)論 2 11
  • % rabbitMQ learn% qijun% 19/01/2018 mq 的一些概念 mq: mq 是一個(gè)m...
    c7d122ec46c0閱讀 2,142評(píng)論 0 21
  • 網(wǎng)上rabbitmq的學(xué)習(xí)日志非常豐富,官網(wǎng)文檔也很完美,這里主要記錄學(xué)習(xí)和部署過(guò)程中的一些記錄。會(huì)按以下菜單進(jìn)行...
    恐龍打醬油閱讀 2,654評(píng)論 0 4
  • 關(guān)于消息隊(duì)列,從前年開(kāi)始斷斷續(xù)續(xù)看了些資料,想寫(xiě)很久了,但一直沒(méi)騰出空,近來(lái)分別碰到幾個(gè)朋友聊這塊的技術(shù)選型,是時(shí)...
    預(yù)流閱讀 586,615評(píng)論 51 787

友情鏈接更多精彩內(nèi)容