一、 什么是 RabbitMQ
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語言編寫的,而集群和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺(tái)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。
二、 安裝 RabbitMQ
1 安裝 Erlang
1.1什么是 Erlang
Erlang(['?:l??])是一種通用的面向并發(fā)的編程語言,它由瑞典電信設(shè)備制造商愛立信所轄的 CS-Lab 開發(fā),目的是創(chuàng)造一種可以應(yīng)對(duì)大規(guī)模并發(fā)活動(dòng)的編程語言和運(yùn)行環(huán)境
系統(tǒng)版本:CentOS6.5 ; RabbitMQ-Server:3.5.1
2 安裝 erlang
2.1安裝準(zhǔn)備,下載安裝文件
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.r pm
rpm-Uvherlang-solutions-1.0-1.noarch.rpm
修改 primary.xml.gz 的 sha 的加密值
cd/var/cache/yum/x86_64/6/erlang-solutionssha1sumprimary.xml.gz
修改vimrepomd.xml
<datatype="primary">
<checksumtype="sha">結(jié)果為 sha1sum 命令結(jié)果</checksum>
3 安裝 erlang
yum install erlang
4 安裝完成后可以用 erl 命令查看是否安裝成功
erl-version
5 安裝 RabbitMQServer
5.1安裝準(zhǔn)備,下載 RabbitMQServer
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmqserver-3.5.1-1.noarch.rpm
5.2安裝 RabbitMQServer
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.5.1-1.noarch.rpm
6 啟動(dòng) RabbitMQ
6.1配置為守護(hù)進(jìn)程隨系統(tǒng)自動(dòng)啟動(dòng),root 權(quán)限下執(zhí)行:
chkconfig rabbitmq-serveron
6.2啟動(dòng) rabbitMQ 服務(wù)
/sbin/service rabbitmq-serverstart
7 安裝 Web 管理界面插件
7.1安裝命令
rabbitmq-plugins enable rabbitmq_management
7.2安裝成功后會(huì)顯示如下內(nèi)容
The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration haschanged. Restart RabbitMQ forchanges totakeeffect.
8 設(shè)置 RabbitMQ 遠(yuǎn)程 ip 登錄
因?yàn)槟J(rèn)的賬號(hào)和密碼都是guest ;所以我們以創(chuàng)建個(gè)admin帳號(hào),密碼 123456 為例,創(chuàng)建一個(gè)賬號(hào)并 支持遠(yuǎn)程 ip 訪問.
8.1創(chuàng)建賬號(hào)
rabbitmqctl add_user admin 123456
8.2設(shè)置用戶角色
rabbitmqctl set_user_tags admin administrator
8.3設(shè)置用戶權(quán)限
rabbitmqctl set_permissions-p"/"oldlu".""."".*"
8.4設(shè)置完成后可以查看當(dāng)前用戶和角色(需要開啟服務(wù))
rabbitmqctl list_users
瀏覽器輸入:serverip:15672。其中 serverip 是 RabbitMQ-Server 所 在主機(jī)的 ip。
三、 為什么要使用 RabbitMQ?他解決了什么問題?
四、 消息隊(duì)列基礎(chǔ)知識(shí)。
1 Provider
消息生產(chǎn)者,就是投遞消息的程序。
2 Consumer
消息消費(fèi)者,就是接受消息的程序。
3 沒有使用消息隊(duì)列時(shí)消息傳遞方式

4 使用消息隊(duì)列后消息傳遞方式

5 什么是隊(duì)列?
隊(duì)列就像存放了商品的倉庫或者商店,是生產(chǎn)商品的工廠和購買商品的用戶之間的中轉(zhuǎn)站
6 隊(duì)列里存儲(chǔ)了什么?
在 rabbitMQ 中,信息流從你的應(yīng)用程序出發(fā),來到 Rabbitmq 的隊(duì)列,所有信息可以只 存儲(chǔ)在一個(gè)隊(duì)列中。隊(duì)列可以存儲(chǔ)很多信息,因?yàn)樗旧鲜且粋€(gè)無限制的緩沖區(qū),前提是 你的機(jī)器有足夠的存儲(chǔ)空間。
7 隊(duì)列和應(yīng)用程序的關(guān)系?
多個(gè)生產(chǎn)者可以將消息發(fā)送到同一個(gè)隊(duì)列中,多個(gè)消息者也可以只從同一個(gè)隊(duì)列接收數(shù)據(jù)。
五、 編寫 RabbitMQ 的入門案例
1 搭建項(xiàng)目環(huán)境
1.1創(chuàng)建項(xiàng)目
1.2修改 pom 文件添加 RabbitMQ 坐標(biāo)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1.3修改全局配置文件,添加 RabbitMQ 相關(guān)的配置
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
2 編寫代碼
創(chuàng)建隊(duì)列
/**
* 創(chuàng)建消息隊(duì)列
* @author Administrator
* */
@Configuration
public class QueueConfig {
/** * 創(chuàng)建隊(duì)列 * @return */
@Bean
public Queue createQueue(){
return new Queue("hello-queue");
}
}
創(chuàng)建消息提供者
/**
* 消息發(fā)送者
*
* @author Administrator
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
/*
* 發(fā)送消息的方法
*/
public void send(String msg) {
//向消息隊(duì)列發(fā)送消息
//參數(shù)一:隊(duì)列的名稱。
//參數(shù)二:消息
this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg);
}
}
消息接收者
@Component
public class Receiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
* @param msg
*/
@RabbitListener(queues="hello-queue")
public void process(String msg){
System.out.println("receiver: "+msg);
}
}
測(cè)試代碼
@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootServerApplication.class)
public class QueueTest {
@Autowired
private Sender sender;
/*
* 測(cè)試消息隊(duì)列
*/
@Test
public void test1() throws Exception {
while (true) {
Thread.sleep(1000);
this.sender.send("Hello RabbitMQ");
}
}
}
六、 RabbitMQ 原理圖
1.Message
消息。消息是不具名的,它由消息頭消息體組成。消息體是不透明的,而消息頭則由 一系列可選屬性組成,這些屬性包括:routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先 權(quán))、delivery-mode(指出消息可能持久性存儲(chǔ))等。
2.Publisher
消息的生產(chǎn)者。也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
3.Consumer
消息的消費(fèi)者。表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
4.Exchange
交換器。用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
三種常用的交換器類型
- direct(發(fā)布與訂閱 完全匹配)
- fanout(廣播)
- topic(主題,規(guī)則匹配)
5.Binding
綁定。用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息 隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
6.Queue
消息隊(duì)列。用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一 個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者鏈接到這個(gè)隊(duì)列將其取 走。
7.Routing-key
路由鍵。RabbitMQ 決定消息該投遞到哪個(gè)隊(duì)列的規(guī)則。 隊(duì)列通過路由鍵綁定到交換器。 消息發(fā)送到 MQ 服務(wù)器時(shí),消息將擁有一個(gè)路由鍵,即便是空的,RabbitMQ 也會(huì)將其 和綁定使用的路由鍵進(jìn)行匹配。 如果相匹配,消息將會(huì)投遞到該隊(duì)列。 如果不匹配,消息將會(huì)進(jìn)入黑洞。
8.Connection
鏈接。指 rabbit 服務(wù)器和服務(wù)建立的 TCP 鏈接。
9.Channel 信道。
1,Channel 中文叫做信道,是 TCP 里面的虛擬鏈接。例如:電纜相當(dāng)于 TCP,信道是 一個(gè)獨(dú)立光纖束,一條 TCP 連接上創(chuàng)建多條信道是沒有問題的。
2,TCP 一旦打開,就會(huì)創(chuàng)建 AMQP 信道。
3,無論是發(fā)布消息、接收消息、訂閱隊(duì)列,這些動(dòng)作都是通過信道完成的。 10.VirtualHost
虛擬主機(jī)。表示一批交換器,消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證 和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有 自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在鏈接時(shí)指定, RabbitMQ 默認(rèn)的是 vhost
11.Borker
表示消息隊(duì)列服務(wù)器實(shí)體。
12.交換器和隊(duì)列的關(guān)系
交換器是通過路由鍵和隊(duì)列綁定在一起的,如果消息擁有的路由鍵跟隊(duì)列和交換器的 路由鍵匹配,那么消息就會(huì)被路由到該綁定的隊(duì)列中。
也就是說,消息到隊(duì)列的過程中,消息首先會(huì)經(jīng)過交換器,接下來交換器在通過路由鍵匹配分發(fā)消息到具體的隊(duì)列中。 路由鍵可以理解為匹配的規(guī)則。
13.RabbitMQ 為什么需要信道?為什么不是 TCP 直接通信?
1.TCP 的創(chuàng)建和銷毀開銷特別大。創(chuàng)建需要 3 次握手,銷毀需要 4 次分手。
- 如果不用信道,那應(yīng)用程序就會(huì)以 TCP 鏈接 Rabbit,高峰時(shí)每秒成千上萬條鏈接 會(huì)造成資源巨大的浪費(fèi),而且操作系統(tǒng)每秒處理 TCP 鏈接數(shù)也是有限制的,必定造成性能 瓶頸。
- 信道的原理是 vhost同用一條 TCP 鏈接。一條 TCP 鏈接可以容納無限的信道,即使每秒成千上萬的請(qǐng)求也不會(huì)成為性能的瓶頸。
七、 Rabbit 交換器講解
1 Direct 交換器(發(fā)布與訂閱 完全匹配)
1.1需求

1.2搭建環(huán)境
1.2.1創(chuàng)建項(xiàng)目
1.2.2修改全局配置文件
修改 Consumer 的配置文件
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
設(shè)置交換器的名稱 mq.config.exchange=log.direct
info 隊(duì)列名稱 mq.config.queue.info=log.info
info 路由鍵 mq.config.queue.info.routing.key=log.info.routing.key
error 隊(duì)列名稱 mq.config.queue.error=log.error
error 路由鍵 mq.config.queue.error.routing.key=log.error.routing.key
修改 Provider 的配置文件
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu spring.rabbitmq.password=123456
設(shè)置交換器的名稱 mq.config.exchange=log.direct
info 路由鍵 mq.config.queue.info.routing.key=log.info.routing.key
error 路由鍵 mq.config.queue.error.routing.key=log.error.routing.key
error 隊(duì)列名稱 mq.config.queue.error=log.error
1.3編寫 Consumer
InfoReceiver
/**
* 消息接收者
*
* @author Administrator
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Info........receiver: " + msg);
}
}
ErrorReceiver
/**
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Error..........receiver: " + msg);
}
}
1.4編寫 Provider
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
//routingkey 路由鍵
@Value("${mq.config.queue.error.routing.key}")
private String routingkey;
/*
* 發(fā)送消息的方法
*/
public void send(String msg) {
//向消息隊(duì)列發(fā)送消息
//參數(shù)一:交換器名稱。
//參數(shù)二:路由鍵
//參數(shù)三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg);
}
}
2 Topic 交換器(主題,規(guī)則匹配)
2.1需求

2.2搭建環(huán)境
2.2.1創(chuàng)建項(xiàng)目
2.2.2修改配置文件
Provider
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admib spring.rabbitmq.password=123456
設(shè)置交換器的名稱
mq.config.exchange=log.topic Consumer spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672 spring.rabbitmq.username=oldlu spring.rabbitmq.password=123456
設(shè)置交換器的名稱 mq.config.exchange=log.topic
info 隊(duì)列名稱 mq.config.queue.info=log.info
error 隊(duì)列名稱 mq.config.queue.error=log.error
log 隊(duì)列名稱 mq.config.queue.logs=log.all
2.3編寫 Provider
UserSender
@Component
public class UserSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/*
* 發(fā)送消息的方法
*/
public void send(String msg) {
//向消息隊(duì)列發(fā)送消息
//參數(shù)一:交換器名稱。
//參數(shù)二:路由鍵
//參數(shù)三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.debug", "user.log.debug....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.info", "user.log.info....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.warn", "user.log.warn....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.error", "user.log.error....." + msg);
}
}
ProductSender
@Component
public class ProductSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/*
* 發(fā)送消息的方法
*/
public void send(String msg) {
//向消息隊(duì)列發(fā)送消息
//參數(shù)一:交換器名稱。
//參數(shù)二:路由鍵
//參數(shù)三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.debug", "product.log.debug....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.info", "product.log.info....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.warn", "product.log.warn....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.error", "product.log.error....." + msg);
}
}
OrderSender
@Component
public class OrderSender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/*
* 發(fā)送消息的方法
*/
public void send(String msg) {
//向消息隊(duì)列發(fā)送消息
//參數(shù)一:交換器名稱。
//參數(shù)二:路由鍵
//參數(shù)三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.debug", "order.log.debug....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.info", "order.log.info....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.warn", "order.log.warn....." + msg);
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.error", "order.log.error....." + msg);
}
}
2.4編寫 Consumer
InfoReceiver
/**
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.info.routing.key}"
)
)
public class InfoReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Info........receiver: " + msg);
}
}
ErrorReceiver
/**
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
key = "${mq.config.queue.error.routing.key}"
)
)
public class ErrorReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Error..........receiver: " + msg);
}
}
LogsReceiver
/**
* 消息接收者
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列 *
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.logs}", autoDelete = "tr ue"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.*"))
public class LogsReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制 * @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("......All........receiver: " + msg);
}
}
3 Fanout 交換器(廣播)
3.1需求

3.2搭建環(huán)境
3.2.1創(chuàng)建項(xiàng)目
3.2.2修改配置文件
Consumer spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admin spring.rabbitmq.password=123456
設(shè)置交換器的名稱
mq.config.exchange=order.fanout
短信服務(wù)隊(duì)列名稱 mq.config.queue.sms=order.sms
push 服務(wù)隊(duì)列名稱 mq.config.queue.push=order.push
Provider
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456設(shè)置交換器的名稱 mq.config.exchange=order.fanout
3.3編寫 Consumer
SmsReceiver
/**
* 消息接收者
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* key:路由鍵
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.sms}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
)
)
public class SmsReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Sms........receiver: " + msg);
}
}
PushReceiver
/**
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.push}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
)
)
public class PushReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("Push..........receiver: " + msg);
}
}
3.4編寫 Provider
/**
* 消息發(fā)送者
*/
@Component
public class Sender {
@Autowired
private AmqpTemplate rabbitAmqpTemplate;
//exchange 交換器名稱
@Value("${mq.config.exchange}")
private String exchange;
/*
* 發(fā)送消息的方法
*/
public void send(String msg) {
//向消息隊(duì)列發(fā)送消息
//參數(shù)一:交換器名稱。
//參數(shù)二:路由鍵
//參數(shù)三:消息
this.rabbitAmqpTemplate.convertAndSend(this.exchange, "", msg);
}
}
八、 使用 RabbitMQ 實(shí)現(xiàn)松耦合設(shè)計(jì)
1 需求

2 搭建環(huán)境
2.1修改配置文件
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456設(shè)置交換器的名稱
mq.config.exchange=order.fanout
短信服務(wù)隊(duì)列名稱
mq.config.queue.sms=order.sms
push 服務(wù)隊(duì)列名稱
mq.config.queue.push=order.push
紅包服務(wù)隊(duì)列名稱
mq.config.queue.red=red
2.2添加 RedReceiver
/**
* @RabbitListener bindings:綁定隊(duì)列
* @QueueBinding value:綁定隊(duì)列的名稱
* exchange:配置交換器
* key:路由鍵
* @Queue value:配置隊(duì)列名稱
* autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
* @Exchange value:為交換器起個(gè)名稱
* type:指定具體的交換器類型
*/
@Component
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "${mq.config.queue.red}", autoDelete = "true"),
exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
)
)
public class RedReceiver {
/**
* 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
*
* @param msg
*/
@RabbitHandler
public void process(String msg) {
System.out.println("給用戶發(fā)送10元紅包........receiver: " + msg);
}
}
九、 RabbitMQ 消息處理
1 RabbitMQ 的消息持久化處理
消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保證消息可 靠性的呢——消息持久化。
1.1創(chuàng)建項(xiàng)目
1.2autoDelete屬性
@Queue: 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除 隊(duì)列 ;true:刪除 false:不刪除
@Exchange:當(dāng)所有綁定隊(duì)列都不在使用時(shí),是否自動(dòng) 刪除交換器 true:刪除 false:不刪除
2 RabbitMQ 中的消息確認(rèn) ACK 機(jī)制
2.1創(chuàng)建項(xiàng)目
修改 Consusmer 配置文件解決 ACK 反饋問題
spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456設(shè)置交換器的名稱
mq.config.exchange=log.direct
info 隊(duì)列名稱
mq.config.queue.info=log.info
info 路由鍵
mq.config.queue.info.routing.key=log.info.routing.key
error 隊(duì)列名稱
mq.config.queue.error=log.error
error 路由鍵
mq.config.queue.error.routing.key=log.error.routing.key
開啟重試
spring.rabbitmq.listener.retry.enabled=true
重試次數(shù),默認(rèn)為 3 次
spring.rabbitmq.listener.retry.max-attempts=5
參考書籍:RabbitMQ實(shí)戰(zhàn)指南
總結(jié):這本書籍寫的是真的好,這也算是自己一個(gè)自我的總結(jié)吧,里面附有代碼,精心整理,請(qǐng)大家要好好閱讀,如有發(fā)現(xiàn)錯(cuò)誤,還請(qǐng)大家斧正.文章原創(chuàng),支持轉(zhuǎn)載.侵權(quán)必究.