RabbitMQ 進(jìn)階- 阿里云服務(wù)器部署RabbitMQ集群

一、為什么要搭建RabbitMQ集群?

未部署集群的缺點(diǎn)

如果RabbitMQ集群只有一個(gè)broker節(jié)點(diǎn),那么該節(jié)點(diǎn)的失效將導(dǎo)致整個(gè)服務(wù)臨時(shí)性的不可用,并且可能會(huì)導(dǎo)致message的丟失(尤其是在非持久化message存儲(chǔ)于非持久化queue中的時(shí)候)??梢詫⑺衜essage都設(shè)置為持久化,并且使用持久化的queue,但是這樣仍然無(wú)法避免由于緩存導(dǎo)致的問題:因?yàn)閙essage在發(fā)送之后和被寫入磁盤并執(zhí)行fsync之間存在一個(gè)雖然短暫但是會(huì)產(chǎn)生問題的時(shí)間窗。通過publisher的confirm機(jī)制能夠確??蛻舳酥滥男﹎essage已經(jīng)存入磁盤,盡管如此,一般不希望遇到因單點(diǎn)故障導(dǎo)致服務(wù)不可用。

部署集群后

如果RabbitMQ集群是由多個(gè)broker節(jié)點(diǎn)構(gòu)成的,那么從服務(wù)的整體可用性上來(lái)講,該集群對(duì)于單點(diǎn)失效是有彈性的,但是同時(shí)也需要注意:盡管exchange和binding能夠在單點(diǎn)失效問題上幸免于難,但是queue和其上持有的message卻不行,這是因?yàn)閝ueue及其內(nèi)容僅僅存儲(chǔ)于單個(gè)節(jié)點(diǎn)之上,所以一個(gè)節(jié)點(diǎn)的失效表現(xiàn)為其對(duì)應(yīng)的queue不可用。

RabbitMQ本身是基于Erlang編寫,Erlang語(yǔ)言天生具備分布式特性(通過同步Erlang集群各節(jié)點(diǎn)的erlang.cookie來(lái)實(shí)現(xiàn))。因此,RabbitMQ天然支持集群。集群是保證可靠性的一種方式,同時(shí)可以通過水平擴(kuò)展以達(dá)到增加消息吞吐量能力的目的。

為了提高程序的吞吐量,保持消息的可靠性,一臺(tái)機(jī)器掛了后,RabbitMQ能夠正常生產(chǎn),消費(fèi)消息。

二、RabbitMQ集群的三種模式

rabbitmq有三種模式:?jiǎn)螜C(jī)模式,普通集群模式,鏡像集群模式

??單機(jī)模式

Demo級(jí)別的,一般只是本機(jī)測(cè)試玩玩而已,生產(chǎn)環(huán)境下不會(huì)用的。

?普通集群模式

在多臺(tái)機(jī)器上啟動(dòng)多個(gè)rabbitmq實(shí)例,每個(gè)機(jī)器啟動(dòng)一個(gè)。
但是你創(chuàng)建的queue,只會(huì)放在一個(gè)rabbtimq實(shí)例上,但是每個(gè)實(shí)例都同步queue的元數(shù)據(jù)(存放含queue數(shù)據(jù)的真正實(shí)例位置)。消費(fèi)的時(shí)候,實(shí)際上如果連接到了另外一個(gè)實(shí)例,那么那個(gè)實(shí)例會(huì)從queue所在實(shí)例上拉取數(shù)據(jù)過來(lái)。

示意圖

這種方式確實(shí)很麻煩,也不怎么好,沒做到所謂的分布式,就是個(gè)普通集群。
普通集群的方式,確實(shí)達(dá)到了消息的高可用,但沒辦法保證可靠性,沒做到分布式,簡(jiǎn)而言之,只是一個(gè)普通的集群。

缺點(diǎn):

  • 可能會(huì)在RabbitMQ集群內(nèi)部產(chǎn)生大量數(shù)據(jù)傳輸
  • 可用性沒有達(dá)到保證,一臺(tái)機(jī)器掛了就是掛了,無(wú)法恢復(fù),只能手動(dòng)恢復(fù)
?鏡像隊(duì)列

這種模式,才是所謂的rabbitmq的高可用模式,跟普通集群模式不一樣的是,你創(chuàng)建的queue,無(wú)論元數(shù)據(jù)還是queue里的消息都會(huì)存在于多個(gè)實(shí)例上,然后每次你寫消息到queue的時(shí)候,都會(huì)自動(dòng)把消息到多個(gè)實(shí)例的queue里進(jìn)行消息同步。

上圖中每個(gè)節(jié)點(diǎn)有一個(gè)queue,生產(chǎn)者生產(chǎn)完畢數(shù)據(jù)后投遞到指定交換機(jī)的隊(duì)列,交換機(jī)的隊(duì)列進(jìn)行消息同步。

每個(gè)節(jié)點(diǎn)queue都有一個(gè)完整的rabbitmq節(jié)點(diǎn),所以這種方式叫做鏡像集群

鏡像集群模式的好處與壞處

好處: 任何一個(gè)節(jié)點(diǎn)宕機(jī)后,其它節(jié)點(diǎn)不受影響,正常使用

壞處:

  • 性能開銷大,消息同步所有機(jī)器,導(dǎo)致網(wǎng)絡(luò)帶寬壓力和消耗很重
  • 沒有擴(kuò)展性,如果某個(gè)queue負(fù)載很重,加機(jī)器,新增的機(jī)器也包含了這個(gè)queue的所有數(shù)據(jù),沒有辦法擴(kuò)展
    對(duì)于以上方式,我們的鏡像集群可以通過配置來(lái)解決這種擴(kuò)展性的問題,配置同步的方式

三、阿里云服務(wù)器下Docker搭建RabbitMQ集群

??Docker安裝RabbitMQ集群

確保機(jī)器中安裝了Docker,若未安裝,可看:【云原生】Docker入門 – 阿里云服務(wù)器Linux環(huán)境下安裝Docker

使用Docker安裝RabbitMQ鏡像

# 拉取鏡像,帶有管理界面的版本
docker pull rabbitmq:management

查看拉取的鏡像

docker images
運(yùn)行Docker鏡像
# 開啟第一個(gè)RabbitMQ服務(wù)
docker run -d --hostname myRabbit1 --name rabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management
# 開啟第二個(gè)
docker run -d --hostname myRabbit2 --name rabbit2 -p 15673:15672 -p 5673:5672 --link rabbit1:myRabbit1 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management
# 開啟第三個(gè)
docker run -d --hostname myRabbit3 --name rabbit3 -p 15674:15672 -p 5674:5672 --link rabbit1:myRabbit1 --link rabbit2:myRabbit2 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:management
查看正在運(yùn)行的鏡像
docker ps

成功運(yùn)行

配置RabbitMQ節(jié)點(diǎn)之間的關(guān)系

設(shè)置節(jié)點(diǎn)1

[root@wanghuichen /]# docker exec -it rabbit1 bash
root@myRabbit1:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit1 ...
root@myRabbit1:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit1 ...
root@myRabbit1:/# rabbitmqctl
設(shè)置節(jié)點(diǎn)2
[root@wanghuichen /]# docker exec -it rabbit2 bash
root@myRabbit2:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit2 ...
root@myRabbit2:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit2 ...
root@myRabbit2:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Clustering node rabbit@myRabbit2 with rabbit@myRabbit1
root@myRabbit2:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
配置節(jié)點(diǎn)3
[root@wanghuichen /]# docker exec -it rabbit3 bash
root@myRabbit3:/# rabbitmqctl stop_app
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Stopping rabbit application on node rabbit@myRabbit3 ...
root@myRabbit3:/# rabbitmqctl reset
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Resetting node rabbit@myRabbit3 ...
root@myRabbit3:/# rabbitmqctl join_cluster --ram rabbit@myRabbit1
root@myRabbit3:/# rabbitmqctl start_app

進(jìn)入每個(gè)集群依次設(shè)置用戶密碼

rabbitmqctl add_user admin admin

rabbitmqctl set_user_tags admin administrator

rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

配置鏡像隊(duì)列

[root@wanghuichen /]# docker exec -it rabbit1 bash
root@myRabbit1:/# rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Setting policy "ha-all" for pattern "^" to "{"ha-mode":"all"}" with priority "0" for vhost "/" ...
root@myRabbit1:/# rabbitmqctl cluster_status
RABBITMQ_ERLANG_COOKIE env variable support is deprecated and will be REMOVED in a future version. Use the $HOME/.erlang.cookie file or the --erlang-cookie switch instead.
Cluster status of node rabbit@myRabbit1 ...
查看集群狀態(tài)
rabbitmqctl cluster_status
常用命令
# 查看已經(jīng)運(yùn)行過但停止了的鏡像
docker ps -a
# 停止鏡像
docker stop 鏡像id/鏡像名稱
# 開啟鏡像,恢復(fù)運(yùn)行狀態(tài)
dockers start 鏡像id/鏡像名稱
# 刪除鏡像
docker rm 鏡像id/鏡像名稱
# 刪除所有鏡像
docker rmi $(docker ps -a)

??測(cè)試RabbitMQ集群

瀏覽器輸入 您的ip地址:15673


部署RabbitMQ鏡像集群成功~

如果出現(xiàn)無(wú)法訪問的情況,可在阿里云服務(wù)器開啟安全組,因?yàn)榘⒗镌颇J(rèn)全部開啟了安全組,配置允許訪問的端口即可

再次測(cè)試即可成功~

四、SpringBoot整合RabbitMQ集群

?創(chuàng)建Maven聚合工程

File —> New —> Project —> Maven —> 直接Next 進(jìn)入下一步創(chuàng)建普通的Maven工程即可

創(chuàng)建一個(gè)默認(rèn)的Maven聚合工程,將src文件夾刪除,該工程就是一個(gè)Maven聚合工程

??引入共有依賴

引入依賴如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.wanshi</groupId>
    <artifactId>springboot-rabbitmq</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>
    <modules>
        <module>rabbitmq-order-producer</module>
        <module>rabbitmq-order-consumer</module>
    </modules>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
    </dependencyManagement>

</project>
?創(chuàng)建生產(chǎn)者

在項(xiàng)目?jī)?nèi),新建一個(gè)Moudle,rabbitmq-order-producer 默認(rèn)Maven工程,下一步即可

引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.5</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitmq-order-producer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>
?創(chuàng)建消費(fèi)者

在項(xiàng)目?jī)?nèi),新建一個(gè)Moudle,rabbitmq-order-cousumer 默認(rèn)Maven工程,下一步即可

引入依賴
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-rabbitmq</artifactId>
        <groupId>com.wanshi</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rabbitmq-order-consumer</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
    </dependencies>

</project>

Maven聚合工程創(chuàng)建完成圖

Maven依賴圖

自行手寫MainApplication即可

創(chuàng)建完成!

??核心源碼

生產(chǎn)者服務(wù)配置
# 服務(wù)端口
server:
  port: 8080
# 配置rabbitmq服務(wù)
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    connection-timeout: 16000
    addresses: 8.130.28.198:5672, 8.130.28.198:5673, 8.130.28.198:5674,
    # 啟用消息確認(rèn)模式
    publisher-confirm-type: correlated

    # 啟用 return 消息模式
    publisher-returns: true
    template:
      mandatory: true

消費(fèi)者服務(wù)配置
# 服務(wù)端口
server:
  port: 8081
# 配置rabbitmq服務(wù)
spring:
  rabbitmq:
    username: admin
    password: admin
    virtual-host: /
    addresses: 8.130.28.198:5672, 8.130.28.198:5673, 8.130.28.198:5674,

生產(chǎn)者

package com.wanshi.service;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.UUID;

/**
 * @author whc
 * @date 2022/5/23 18:50
 */

@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void makeOrder() {
        String orderId = UUID.randomUUID().toString();
        System.out.println("訂單生成成功:" + orderId);
        String exchange_name = "fanout_order_exchange";
        String routeingKey = "";
        rabbitTemplate.convertAndSend(exchange_name, routeingKey, orderId);
    }
}

消費(fèi)者

交換機(jī)的聲明與隊(duì)列我們放在消費(fèi)者端,因?yàn)橄M(fèi)者是先開啟的,如果沒有交換機(jī)和隊(duì)列,則會(huì)報(bào)錯(cuò)!
RabbitMQConfiguration
package com.wanshi.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author whc
 * @date 2022/5/23 10:18
 */
@Configuration
public class RabbitMQConfiguration {

    //1.聲明注冊(cè)fanout模式的交換機(jī)
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanout_order_exchange", true, false);
    }

    //2.聲明隊(duì)列,sms.fanout.queue email.fanout.queue msg.fanout.queue
    @Bean
    public Queue smsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    @Bean
    public Queue emailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    @Bean
    public Queue msgQueue() {
        return new Queue("msg.fanout.queue", true);
    }

    //3.完成綁定關(guān)系(隊(duì)列與交換機(jī)完成綁定關(guān)系)
    @Bean
    public Binding smsBind() {
        return BindingBuilder.bind(smsQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding emailBind() {
        return BindingBuilder.bind(emailQueue()).to(fanoutExchange());
    }

    @Bean
    public Binding msgBind() {
        return BindingBuilder.bind(msgQueue()).to(fanoutExchange());
    }
}
編寫具體業(yè)務(wù)消費(fèi)類
FanoutEmailConsumer
package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author whc
 * @date 2022/5/23 18:53
 */
@RabbitListener(queues = "email.fanout.queue")
@Component
public class FanoutEmailConsumer {

    @RabbitHandler
    public void messageService(String message) {
        System.out.println("fanout email ==>" + message);
    }
}
FanoutMsgConsumer
package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author whc
 * @date 2022/5/23 18:55
 */
@RabbitListener(queues = "msg.fanout.queue")
@Component
public class FanoutMsgConsumer {

    @RabbitHandler
    public void messageService(String message) {
        System.out.println("fanout msg ==>" + message);
    }
}
FanoutSmsConsumer
package com.wanshi.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @author whc
 * @date 2022/5/23 18:54
 */
@RabbitListener(queues = "sms.fanout.queue")
@Component
public class FanoutSmsConsumer {

    @RabbitHandler
    public void messageService(String message) {
        System.out.println("fanout sms ==> " + message);
    }
}

編寫完成!

五、測(cè)試消息的生產(chǎn)與消費(fèi)

啟動(dòng)消費(fèi)者,查看RabbitMQ隊(duì)列的情況

啟動(dòng)消費(fèi)者


查看RabbitMQweb管理界面綁定信息

交換機(jī)

=


查看隊(duì)列Queue

查看其它兩臺(tái)機(jī)器是否同步了數(shù)據(jù)

15674


15675


生產(chǎn)者投遞消息,查看消費(fèi)者消費(fèi)情況

成功消費(fèi)數(shù)據(jù)!

只生產(chǎn)消息,關(guān)閉消費(fèi)者,查看消息同步情況

已成功同步消息~

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容