一、為什么要搭建RabbitMQ集群?
未部署集群的缺點(diǎn)
如果RabbitMQ集群只有一個(gè)broker節(jié)點(diǎn),那么該節(jié)點(diǎn)的失效將導(dǎo)致整個(gè)服務(wù)臨時(shí)性的不可用,并且可能會(huì)導(dǎo)致message的丟失(尤其是在非持久化message存儲(chǔ)于非持久化queue中的時(shí)候)??梢詫⑺衜essage都設(shè)置為持久化,并且使用持久化的queue,但是這樣仍然無(wú)法避免由于緩存導(dǎo)致的問題:因?yàn)閙essage在發(fā)送之后和被寫入磁盤并執(zhí)行fsync之間存在一個(gè)雖然短暫但是會(huì)產(chǎn)生問題的時(shí)間窗。通過publisher的confirm機(jī)制能夠確??蛻舳酥滥男﹎essage已經(jīng)存入磁盤,盡管如此,一般不希望遇到因單點(diǎn)故障導(dǎo)致服務(wù)不可用。
部署集群后
如果RabbitMQ集群是由多個(gè)broker節(jié)點(diǎn)構(gòu)成的,那么從服務(wù)的整體可用性上來(lái)講,該集群對(duì)于單點(diǎn)失效是有彈性的,但是同時(shí)也需要注意:盡管exchange和binding能夠在單點(diǎn)失效問題上幸免于難,但是queue和其上持有的message卻不行,這是因?yàn)閝ueue及其內(nèi)容僅僅存儲(chǔ)于單個(gè)節(jié)點(diǎn)之上,所以一個(gè)節(jié)點(diǎn)的失效表現(xiàn)為其對(duì)應(yīng)的queue不可用。
RabbitMQ本身是基于Erlang編寫,Erlang語(yǔ)言天生具備分布式特性(通過同步Erlang集群各節(jié)點(diǎn)的erlang.cookie來(lái)實(shí)現(xiàn))。因此,RabbitMQ天然支持集群。集群是保證可靠性的一種方式,同時(shí)可以通過水平擴(kuò)展以達(dá)到增加消息吞吐量能力的目的。
為了提高程序的吞吐量,保持消息的可靠性,一臺(tái)機(jī)器掛了后,RabbitMQ能夠正常生產(chǎn),消費(fèi)消息。
二、RabbitMQ集群的三種模式
rabbitmq有三種模式:?jiǎn)螜C(jī)模式,普通集群模式,鏡像集群模式
??單機(jī)模式
Demo級(jí)別的,一般只是本機(jī)測(cè)試玩玩而已,生產(chǎn)環(huán)境下不會(huì)用的。
?普通集群模式
在多臺(tái)機(jī)器上啟動(dòng)多個(gè)rabbitmq實(shí)例,每個(gè)機(jī)器啟動(dòng)一個(gè)。
但是你創(chuàng)建的queue,只會(huì)放在一個(gè)rabbtimq實(shí)例上,但是每個(gè)實(shí)例都同步queue的元數(shù)據(jù)(存放含queue數(shù)據(jù)的真正實(shí)例位置)。消費(fèi)的時(shí)候,實(shí)際上如果連接到了另外一個(gè)實(shí)例,那么那個(gè)實(shí)例會(huì)從queue所在實(shí)例上拉取數(shù)據(jù)過來(lái)。
示意圖

這種方式確實(shí)很麻煩,也不怎么好,沒做到所謂的分布式,就是個(gè)普通集群。
普通集群的方式,確實(shí)達(dá)到了消息的高可用,但沒辦法保證可靠性,沒做到分布式,簡(jiǎn)而言之,只是一個(gè)普通的集群。
缺點(diǎn):
- 可能會(huì)在RabbitMQ集群內(nèi)部產(chǎn)生大量數(shù)據(jù)傳輸
- 可用性沒有達(dá)到保證,一臺(tái)機(jī)器掛了就是掛了,無(wú)法恢復(fù),只能手動(dòng)恢復(fù)
?鏡像隊(duì)列
這種模式,才是所謂的rabbitmq的高可用模式,跟普通集群模式不一樣的是,你創(chuàng)建的queue,無(wú)論元數(shù)據(jù)還是queue里的消息都會(huì)存在于多個(gè)實(shí)例上,然后每次你寫消息到queue的時(shí)候,都會(huì)自動(dòng)把消息到多個(gè)實(shí)例的queue里進(jìn)行消息同步。

上圖中每個(gè)節(jié)點(diǎn)有一個(gè)queue,生產(chǎn)者生產(chǎn)完畢數(shù)據(jù)后投遞到指定交換機(jī)的隊(duì)列,交換機(jī)的隊(duì)列進(jìn)行消息同步。
每個(gè)節(jié)點(diǎn)queue都有一個(gè)完整的rabbitmq節(jié)點(diǎn),所以這種方式叫做鏡像集群
鏡像集群模式的好處與壞處
好處: 任何一個(gè)節(jié)點(diǎn)宕機(jī)后,其它節(jié)點(diǎn)不受影響,正常使用
壞處:
- 性能開銷大,消息同步所有機(jī)器,導(dǎo)致網(wǎng)絡(luò)帶寬壓力和消耗很重
- 沒有擴(kuò)展性,如果某個(gè)queue負(fù)載很重,加機(jī)器,新增的機(jī)器也包含了這個(gè)queue的所有數(shù)據(jù),沒有辦法擴(kuò)展
對(duì)于以上方式,我們的鏡像集群可以通過配置來(lái)解決這種擴(kuò)展性的問題,配置同步的方式
三、阿里云服務(wù)器下Docker搭建RabbitMQ集群
??Docker安裝RabbitMQ集群
確保機(jī)器中安裝了Docker,若未安裝,可看:【云原生】Docker入門 – 阿里云服務(wù)器Linux環(huán)境下安裝Docker
使用Docker安裝RabbitMQ鏡像
# 拉取鏡像,帶有管理界面的版本
docker pull rabbitmq:management
查看拉取的鏡像
docker images

運(yùn)行Docker鏡像
# 開啟第一個(gè)RabbitMQ服務(wù)
docker run -d --hostname myRabbit1 --name rabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management
# 開啟第二個(gè)
docker run -d --hostname myRabbit2 --name rabbit2 -p 15673:15672 -p 5673:5672 --link rabbit1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management
# 開啟第三個(gè)
docker run -d --hostname myRabbit3 --name rabbit3 -p 15674:15672 -p 5674:5672 --link rabbit1:myRabbit1 --link rabbit2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management
查看正在運(yùn)行的鏡像
docker ps
成功運(yùn)行
配置RabbitMQ節(jié)點(diǎn)之間的關(guān)系
設(shè)置節(jié)點(diǎn)1
[root@wanghuichen /]# docker exec -it rabbit1 bash
root@myRabbit1:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit1 ...
root@myRabbit1:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit1 ...
root@myRabbit1:/# rabbitmqctl

設(shè)置節(jié)點(diǎn)2
[root@wanghuichen /]# docker exec -it rabbit2 bash
root@myRabbit2:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit2 ...
root@myRabbit2:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit2 ...
root@myRabbit2:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Clustering node rabbit@myRabbit2 with rabbit@myRabbit1
root@myRabbit2:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1

配置節(jié)點(diǎn)3
[root@wanghuichen /]# docker exec -it rabbit3 bash
root@myRabbit3:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit3 ...
root@myRabbit3:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit3 ...
root@myRabbit3:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
root@myRabbit3:/# rabbitmqctl start_app

進(jìn)入每個(gè)集群依次設(shè)置用戶密碼
rabbitmqctl add_user admin admin
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
配置鏡像隊(duì)列
[root@wanghuichen /]# docker exec -it rabbit1 bash
root@myRabbit1:/# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
root@myRabbit1:/# rabbitmqctl cluster_status
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Cluster status of node rabbit@myRabbit1 ...

查看集群狀態(tài)
rabbitmqctl cluster_status

常用命令
# 查看已經(jīng)運(yùn)行過但停止了的鏡像
docker ps -a
# 停止鏡像
docker stop 鏡像id/鏡像名稱
# 開啟鏡像,恢復(fù)運(yùn)行狀態(tài)
dockers start 鏡像id/鏡像名稱
# 刪除鏡像
docker rm 鏡像id/鏡像名稱
# 刪除所有鏡像
docker rmi $(docker ps -a)
??測(cè)試RabbitMQ集群
瀏覽器輸入 您的ip地址:15673

部署RabbitMQ鏡像集群成功~
如果出現(xiàn)無(wú)法訪問的情況,可在阿里云服務(wù)器開啟安全組,因?yàn)榘⒗镌颇J(rèn)全部開啟了安全組,配置允許訪問的端口即可

再次測(cè)試即可成功~
四、SpringBoot整合RabbitMQ集群
?創(chuàng)建Maven聚合工程
File —> New —> Project —> Maven —> 直接Next 進(jìn)入下一步創(chuàng)建普通的Maven工程即可

創(chuàng)建一個(gè)默認(rèn)的Maven聚合工程,將src文件夾刪除,該工程就是一個(gè)Maven聚合工程
??引入共有依賴
引入依賴如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.wanshi</groupId>
<artifactId>springboot-rabbitmq</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>rabbitmq-order-producer</module>
<module>rabbitmq-order-consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</dependencyManagement>
</project>
?創(chuàng)建生產(chǎn)者
在項(xiàng)目?jī)?nèi),新建一個(gè)Moudle,rabbitmq-order-producer 默認(rèn)Maven工程,下一步即可
引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.5.5</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbitmq-order-producer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
?創(chuàng)建消費(fèi)者
在項(xiàng)目?jī)?nèi),新建一個(gè)Moudle,rabbitmq-order-cousumer 默認(rèn)Maven工程,下一步即可
引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>springboot-rabbitmq</artifactId>
<groupId>com.wanshi</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rabbitmq-order-consumer</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
</project>
Maven聚合工程創(chuàng)建完成圖

Maven依賴圖

自行手寫MainApplication即可
創(chuàng)建完成!
??核心源碼
生產(chǎn)者服務(wù)配置
# 服務(wù)端口
server:
port: 8080
# 配置rabbitmq服務(wù)
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
connection-timeout: 16000
addresses: 8.130.28.198:5672, 8.130.28.198:5673, 8.130.28.198:5674,
# 啟用消息確認(rèn)模式
publisher-confirm-type: correlated
# 啟用 return 消息模式
publisher-returns: true
template:
mandatory: true
消費(fèi)者服務(wù)配置
# 服務(wù)端口
server:
port: 8081
# 配置rabbitmq服務(wù)
spring:
rabbitmq:
username: admin
password: admin
virtual-host: /
addresses: 8.130.28.198:5672, 8.130.28.198:5673, 8.130.28.198:5674,
生產(chǎn)者
package com.wanshi.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
/**
* @author whc
* @date 2022/5/23 18:50
*/
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrder() {
String orderId = UUID.randomUUID().toString();
System.out.println("訂單生成成功:" + orderId);
String exchange_name = "fanout_order_exchange";
String routeingKey = "";
rabbitTemplate.convertAndSend(exchange_name, routeingKey, orderId);
}
}
消費(fèi)者
交換機(jī)的聲明與隊(duì)列我們放在消費(fèi)者端,因?yàn)橄M(fèi)者是先開啟的,如果沒有交換機(jī)和隊(duì)列,則會(huì)報(bào)錯(cuò)!
RabbitMQConfiguration
package com.wanshi.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author whc
* @date 2022/5/23 10:18
*/
@Configuration
public class RabbitMQConfiguration {
//1.聲明注冊(cè)fanout模式的交換機(jī)
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
//2.聲明隊(duì)列,sms.fanout.queue email.fanout.queue msg.fanout.queue
@Bean
public Queue smsQueue() {
return new Queue("sms.fanout.queue", true);
}
@Bean
public Queue emailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue msgQueue() {
return new Queue("msg.fanout.queue", true);
}
//3.完成綁定關(guān)系(隊(duì)列與交換機(jī)完成綁定關(guān)系)
@Bean
public Binding smsBind() {
return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBind() {
return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
}
@Bean
public Binding msgBind() {
return BindingBuilder.bind(msgQueue()).to(fanoutExchange());
}
}
編寫具體業(yè)務(wù)消費(fèi)類
FanoutEmailConsumer
package com.wanshi.service;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author whc
* @date 2022/5/23 18:53
*/
@RabbitListener(queues = "email.fanout.queue")
@Component
public class FanoutEmailConsumer {
@RabbitHandler
public void messageService(String message) {
System.out.println("fanout email ==>" + message);
}
}
FanoutMsgConsumer
package com.wanshi.service;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author whc
* @date 2022/5/23 18:55
*/
@RabbitListener(queues = "msg.fanout.queue")
@Component
public class FanoutMsgConsumer {
@RabbitHandler
public void messageService(String message) {
System.out.println("fanout msg ==>" + message);
}
}
FanoutSmsConsumer
package com.wanshi.service;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @author whc
* @date 2022/5/23 18:54
*/
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class FanoutSmsConsumer {
@RabbitHandler
public void messageService(String message) {
System.out.println("fanout sms ==> " + message);
}
}
編寫完成!
五、測(cè)試消息的生產(chǎn)與消費(fèi)
啟動(dòng)消費(fèi)者,查看RabbitMQ隊(duì)列的情況
啟動(dòng)消費(fèi)者

查看RabbitMQweb管理界面綁定信息
交換機(jī)
=

查看隊(duì)列Queue

查看其它兩臺(tái)機(jī)器是否同步了數(shù)據(jù)
15674

15675

生產(chǎn)者投遞消息,查看消費(fèi)者消費(fèi)情況

成功消費(fèi)數(shù)據(jù)!
只生產(chǎn)消息,關(guān)閉消費(fèi)者,查看消息同步情況
已成功同步消息~
