RabbitMQ 入門到精通(這一篇就夠了)

一、 什么是 RabbitMQ

RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ服務(wù)器是用Erlang語言編寫的,而集群和故障轉(zhuǎn)移是構(gòu)建在開放電信平臺(tái)框架上的。所有主要的編程語言均有與代理接口通訊的客戶端庫。

二、 安裝 RabbitMQ

1 安裝 Erlang
1.1什么是 Erlang
Erlang(['?:l??])是一種通用的面向并發(fā)的編程語言,它由瑞典電信設(shè)備制造商愛立信所轄的 CS-Lab 開發(fā),目的是創(chuàng)造一種可以應(yīng)對(duì)大規(guī)模并發(fā)活動(dòng)的編程語言和運(yùn)行環(huán)境
系統(tǒng)版本:CentOS6.5 ; RabbitMQ-Server:3.5.1
2 安裝 erlang
2.1安裝準(zhǔn)備,下載安裝文件
wget https://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.r pm
rpm-Uvherlang-solutions-1.0-1.noarch.rpm
修改 primary.xml.gz 的 sha 的加密值
cd/var/cache/yum/x86_64/6/erlang-solutionssha1sumprimary.xml.gz
修改vimrepomd.xml

<datatype="primary">
<checksumtype="sha">結(jié)果為 sha1sum 命令結(jié)果</checksum>

3 安裝 erlang
yum install erlang
4 安裝完成后可以用 erl 命令查看是否安裝成功
erl-version
5 安裝 RabbitMQServer
5.1安裝準(zhǔn)備,下載 RabbitMQServer
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmqserver-3.5.1-1.noarch.rpm
5.2安裝 RabbitMQServer
rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
yum install rabbitmq-server-3.5.1-1.noarch.rpm
6 啟動(dòng) RabbitMQ
6.1配置為守護(hù)進(jìn)程隨系統(tǒng)自動(dòng)啟動(dòng),root 權(quán)限下執(zhí)行:
chkconfig rabbitmq-serveron
6.2啟動(dòng) rabbitMQ 服務(wù)
/sbin/service rabbitmq-serverstart
7 安裝 Web 管理界面插件
7.1安裝命令
rabbitmq-plugins enable rabbitmq_management
7.2安裝成功后會(huì)顯示如下內(nèi)容

The following plugins have been enabled:
mochiweb
webmachine
rabbitmq_web_dispatch
amqp_client
rabbitmq_management_agent
rabbitmq_management
Plugin configuration haschanged. Restart RabbitMQ forchanges totakeeffect.

8 設(shè)置 RabbitMQ 遠(yuǎn)程 ip 登錄
因?yàn)槟J(rèn)的賬號(hào)和密碼都是guest ;所以我們以創(chuàng)建個(gè)admin帳號(hào),密碼 123456 為例,創(chuàng)建一個(gè)賬號(hào)并 支持遠(yuǎn)程 ip 訪問.
8.1創(chuàng)建賬號(hào)
rabbitmqctl add_user admin 123456
8.2設(shè)置用戶角色
rabbitmqctl set_user_tags admin administrator
8.3設(shè)置用戶權(quán)限
rabbitmqctl set_permissions-p"/"oldlu".""."".*"
8.4設(shè)置完成后可以查看當(dāng)前用戶和角色(需要開啟服務(wù))
rabbitmqctl list_users
瀏覽器輸入:serverip:15672。其中 serverip 是 RabbitMQ-Server 所 在主機(jī)的 ip。

三、 為什么要使用 RabbitMQ?他解決了什么問題?

在這里插入圖片描述

在這里插入圖片描述

四、 消息隊(duì)列基礎(chǔ)知識(shí)。

1 Provider
消息生產(chǎn)者,就是投遞消息的程序。
2 Consumer
消息消費(fèi)者,就是接受消息的程序。
3 沒有使用消息隊(duì)列時(shí)消息傳遞方式

在這里插入圖片描述

4 使用消息隊(duì)列后消息傳遞方式
消息 消息

5 什么是隊(duì)列?
隊(duì)列就像存放了商品的倉庫或者商店,是生產(chǎn)商品的工廠和購買商品的用戶之間的中轉(zhuǎn)站
6 隊(duì)列里存儲(chǔ)了什么?
在 rabbitMQ 中,信息流從你的應(yīng)用程序出發(fā),來到 Rabbitmq 的隊(duì)列,所有信息可以只 存儲(chǔ)在一個(gè)隊(duì)列中。隊(duì)列可以存儲(chǔ)很多信息,因?yàn)樗旧鲜且粋€(gè)無限制的緩沖區(qū),前提是 你的機(jī)器有足夠的存儲(chǔ)空間。
7 隊(duì)列和應(yīng)用程序的關(guān)系?
多個(gè)生產(chǎn)者可以將消息發(fā)送到同一個(gè)隊(duì)列中,多個(gè)消息者也可以只從同一個(gè)隊(duì)列接收數(shù)據(jù)。

五、 編寫 RabbitMQ 的入門案例

1 搭建項(xiàng)目環(huán)境
1.1創(chuàng)建項(xiàng)目

在這里插入圖片描述

1.2修改 pom 文件添加 RabbitMQ 坐標(biāo)

<dependency> 
    <groupId>org.springframework.boot</groupId> 
    <artifactId>spring-boot-starter-amqp</artifactId> 
</dependency>

1.3修改全局配置文件,添加 RabbitMQ 相關(guān)的配置

spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

2 編寫代碼
創(chuàng)建隊(duì)列

 /** 
 * 創(chuàng)建消息隊(duì)列 
 * @author Administrator 
 * */
@Configuration 
public class QueueConfig {
/** * 創(chuàng)建隊(duì)列 * @return */ 
    @Bean 
    public Queue createQueue(){  
        return new Queue("hello-queue");    
    }
}

創(chuàng)建消息提供者

/**
 * 消息發(fā)送者
 *
 * @author Administrator
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    /*
     * 發(fā)送消息的方法
     */
    public void send(String msg) {
        //向消息隊(duì)列發(fā)送消息
        //參數(shù)一:隊(duì)列的名稱。
        //參數(shù)二:消息
        this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg);
    }
}

消息接收者

@Component
public class Receiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     * @param msg
     */
    @RabbitListener(queues="hello-queue")
    public void process(String msg){
        System.out.println("receiver: "+msg);
    }
}

測(cè)試代碼

@RunWith(SpringRunner.class)
@SpringBootTest(classes = SpringbootServerApplication.class)
public class QueueTest {

    @Autowired
    private Sender sender;

    /*
     * 測(cè)試消息隊(duì)列
     */
    @Test
    public void test1() throws Exception {
        while (true) {
            Thread.sleep(1000);
            this.sender.send("Hello RabbitMQ");
        }
    }
}

六、 RabbitMQ 原理圖

在這里插入圖片描述

1.Message
消息。消息是不具名的,它由消息頭消息體組成。消息體是不透明的,而消息頭則由 一系列可選屬性組成,這些屬性包括:routing-key(路由鍵)、priority(相對(duì)于其他消息的優(yōu)先 權(quán))、delivery-mode(指出消息可能持久性存儲(chǔ))等。
2.Publisher
消息的生產(chǎn)者。也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。
3.Consumer
消息的消費(fèi)者。表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。
4.Exchange
交換器。用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。
三種常用的交換器類型

  1. direct(發(fā)布與訂閱 完全匹配)
  2. fanout(廣播)
  3. topic(主題,規(guī)則匹配)

5.Binding
綁定。用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息 隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。
6.Queue
消息隊(duì)列。用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一 個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者鏈接到這個(gè)隊(duì)列將其取 走。
7.Routing-key
路由鍵。RabbitMQ 決定消息該投遞到哪個(gè)隊(duì)列的規(guī)則。 隊(duì)列通過路由鍵綁定到交換器。 消息發(fā)送到 MQ 服務(wù)器時(shí),消息將擁有一個(gè)路由鍵,即便是空的,RabbitMQ 也會(huì)將其 和綁定使用的路由鍵進(jìn)行匹配。 如果相匹配,消息將會(huì)投遞到該隊(duì)列。 如果不匹配,消息將會(huì)進(jìn)入黑洞。
8.Connection
鏈接。指 rabbit 服務(wù)器和服務(wù)建立的 TCP 鏈接。
9.Channel 信道。
1,Channel 中文叫做信道,是 TCP 里面的虛擬鏈接。例如:電纜相當(dāng)于 TCP,信道是 一個(gè)獨(dú)立光纖束,一條 TCP 連接上創(chuàng)建多條信道是沒有問題的。
2,TCP 一旦打開,就會(huì)創(chuàng)建 AMQP 信道。
3,無論是發(fā)布消息、接收消息、訂閱隊(duì)列,這些動(dòng)作都是通過信道完成的。 10.VirtualHost
虛擬主機(jī)。表示一批交換器,消息隊(duì)列和相關(guān)對(duì)象。虛擬主機(jī)是共享相同的身份認(rèn)證 和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有 自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在鏈接時(shí)指定, RabbitMQ 默認(rèn)的是 vhost
11.Borker
表示消息隊(duì)列服務(wù)器實(shí)體。
12.交換器和隊(duì)列的關(guān)系
交換器是通過路由鍵和隊(duì)列綁定在一起的,如果消息擁有的路由鍵隊(duì)列和交換器的 路由鍵匹配,那么消息就會(huì)被路由到該綁定的隊(duì)列中。
也就是說,消息到隊(duì)列的過程中,消息首先會(huì)經(jīng)過交換器,接下來交換器在通過路由鍵匹配分發(fā)消息到具體的隊(duì)列中。 路由鍵可以理解為匹配的規(guī)則。
13.RabbitMQ 為什么需要信道?為什么不是 TCP 直接通信?
1.TCP 的創(chuàng)建和銷毀開銷特別大。創(chuàng)建需要 3 次握手,銷毀需要 4 次分手。

  1. 如果不用信道,那應(yīng)用程序就會(huì)以 TCP 鏈接 Rabbit,高峰時(shí)每秒成千上萬條鏈接 會(huì)造成資源巨大的浪費(fèi),而且操作系統(tǒng)每秒處理 TCP 鏈接數(shù)也是有限制的,必定造成性能 瓶頸。
  2. 信道的原理是 vhost同用一條 TCP 鏈接。一條 TCP 鏈接可以容納無限的信道,即使每秒成千上萬的請(qǐng)求也不會(huì)成為性能的瓶頸。

七、 Rabbit 交換器講解

1 Direct 交換器(發(fā)布與訂閱 完全匹配)
1.1需求

在這里插入圖片描述

1.2搭建環(huán)境
1.2.1創(chuàng)建項(xiàng)目
1.2.2修改全局配置文件
修改 Consumer 的配置文件

spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
設(shè)置交換器的名稱 mq.config.exchange=log.direct
info 隊(duì)列名稱 mq.config.queue.info=log.info
info 路由鍵 mq.config.queue.info.routing.key=log.info.routing.key
error 隊(duì)列名稱 mq.config.queue.error=log.error
error 路由鍵 mq.config.queue.error.routing.key=log.error.routing.key

修改 Provider 的配置文件

spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=oldlu spring.rabbitmq.password=123456
設(shè)置交換器的名稱 mq.config.exchange=log.direct
info 路由鍵 mq.config.queue.info.routing.key=log.info.routing.key
error 路由鍵 mq.config.queue.error.routing.key=log.error.routing.key
error 隊(duì)列名稱 mq.config.queue.error=log.error

1.3編寫 Consumer
InfoReceiver

/**
 * 消息接收者
 *
 * @author Administrator
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info........receiver: " + msg);
    }
}

ErrorReceiver

/**
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.error.routing.key}"
        )
)
public class ErrorReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Error..........receiver: " + msg);
    }
}

1.4編寫 Provider

@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;
    
    //routingkey 路由鍵
    @Value("${mq.config.queue.error.routing.key}")
    private String routingkey;
    /*
     * 發(fā)送消息的方法
     */
    public void send(String msg) {
        //向消息隊(duì)列發(fā)送消息
        //參數(shù)一:交換器名稱。
        //參數(shù)二:路由鍵
        //參數(shù)三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
        this.rabbitAmqpTemplate.convertAndSend("hello-queue", msg);
    }
}

2 Topic 交換器(主題,規(guī)則匹配)
2.1需求

在這里插入圖片描述

2.2搭建環(huán)境
2.2.1創(chuàng)建項(xiàng)目
2.2.2修改配置文件
Provider

spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admib spring.rabbitmq.password=123456

設(shè)置交換器的名稱

mq.config.exchange=log.topic Consumer spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672 spring.rabbitmq.username=oldlu spring.rabbitmq.password=123456

設(shè)置交換器的名稱 mq.config.exchange=log.topic

info 隊(duì)列名稱 mq.config.queue.info=log.info

error 隊(duì)列名稱 mq.config.queue.error=log.error

log 隊(duì)列名稱 mq.config.queue.logs=log.all

2.3編寫 Provider
UserSender

@Component
public class UserSender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;

    /*
     * 發(fā)送消息的方法
     */
    public void send(String msg) {
        //向消息隊(duì)列發(fā)送消息
        //參數(shù)一:交換器名稱。
        //參數(shù)二:路由鍵
        //參數(shù)三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.debug", "user.log.debug....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.info", "user.log.info....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.warn", "user.log.warn....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "user.log.error", "user.log.error....." + msg);
    }
}

ProductSender

@Component
public class ProductSender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;

    /*
     * 發(fā)送消息的方法
     */
    public void send(String msg) {
        //向消息隊(duì)列發(fā)送消息
        //參數(shù)一:交換器名稱。
        //參數(shù)二:路由鍵
        //參數(shù)三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.debug", "product.log.debug....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.info", "product.log.info....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.warn", "product.log.warn....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "product.log.error", "product.log.error....." + msg);
    }
}

OrderSender

@Component
public class OrderSender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;

    /*
     * 發(fā)送消息的方法
     */
    public void send(String msg) {
        //向消息隊(duì)列發(fā)送消息
        //參數(shù)一:交換器名稱。
        //參數(shù)二:路由鍵
        //參數(shù)三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.debug", "order.log.debug....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.info", "order.log.info....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.warn", "order.log.warn....." + msg);
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "order.log.error", "order.log.error....." + msg);
    }
}

2.4編寫 Consumer
InfoReceiver

/**
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.info}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.info.routing.key}"
        )
)
public class InfoReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Info........receiver: " + msg);
    }
}

ErrorReceiver

/**
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.error}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.DIRECT),
                key = "${mq.config.queue.error.routing.key}"
        )
)
public class ErrorReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Error..........receiver: " + msg);
    }
}

LogsReceiver

/**
 * 消息接收者
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器 
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列 *
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "${mq.config.queue.logs}", autoDelete = "tr ue"),
        exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.TOPIC), key = "*.log.*"))
public class LogsReceiver {
    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制 * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("......All........receiver: " + msg);
    }
}

3 Fanout 交換器(廣播)

3.1需求

在這里插入圖片描述

在這里插入圖片描述

3.2搭建環(huán)境
3.2.1創(chuàng)建項(xiàng)目
3.2.2修改配置文件

Consumer spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admin spring.rabbitmq.password=123456

設(shè)置交換器的名稱

mq.config.exchange=order.fanout

短信服務(wù)隊(duì)列名稱 mq.config.queue.sms=order.sms

push 服務(wù)隊(duì)列名稱 mq.config.queue.push=order.push

Provider

spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131 spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

設(shè)置交換器的名稱 mq.config.exchange=order.fanout

3.3編寫 Consumer
SmsReceiver

/**
 * 消息接收者
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * key:路由鍵
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.sms}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class SmsReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Sms........receiver: " + msg);
    }
}

PushReceiver

/**
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.push}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class PushReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("Push..........receiver: " + msg);
    }
}

3.4編寫 Provider

/**
 * 消息發(fā)送者
 */
@Component
public class Sender {

    @Autowired
    private AmqpTemplate rabbitAmqpTemplate;

    //exchange 交換器名稱
    @Value("${mq.config.exchange}")
    private String exchange;

    /*
     * 發(fā)送消息的方法
     */
    public void send(String msg) {
        //向消息隊(duì)列發(fā)送消息
        //參數(shù)一:交換器名稱。
        //參數(shù)二:路由鍵
        //參數(shù)三:消息
        this.rabbitAmqpTemplate.convertAndSend(this.exchange, "", msg);
    }
}

八、 使用 RabbitMQ 實(shí)現(xiàn)松耦合設(shè)計(jì)
1 需求

在這里插入圖片描述

2 搭建環(huán)境
2.1修改配置文件

spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

設(shè)置交換器的名稱

mq.config.exchange=order.fanout

短信服務(wù)隊(duì)列名稱

mq.config.queue.sms=order.sms

push 服務(wù)隊(duì)列名稱

mq.config.queue.push=order.push

紅包服務(wù)隊(duì)列名稱

mq.config.queue.red=red

2.2添加 RedReceiver

/**
 * @RabbitListener bindings:綁定隊(duì)列
 * @QueueBinding value:綁定隊(duì)列的名稱
 * exchange:配置交換器
 * key:路由鍵
 * @Queue value:配置隊(duì)列名稱
 * autoDelete:是否是一個(gè)可刪除的臨時(shí)隊(duì)列
 * @Exchange value:為交換器起個(gè)名稱
 * type:指定具體的交換器類型
 */
@Component
@RabbitListener(
        bindings = @QueueBinding(
                value = @Queue(value = "${mq.config.queue.red}", autoDelete = "true"),
                exchange = @Exchange(value = "${mq.config.exchange}", type = ExchangeTypes.FANOUT)
        )
)
public class RedReceiver {

    /**
     * 接收消息的方法。采用消息隊(duì)列監(jiān)聽機(jī)制
     *
     * @param msg
     */
    @RabbitHandler
    public void process(String msg) {
        System.out.println("給用戶發(fā)送10元紅包........receiver: " + msg);
    }
}

九、 RabbitMQ 消息處理
1 RabbitMQ 的消息持久化處理
消息的可靠性是 RabbitMQ 的一大特色,那么 RabbitMQ 是如何保證消息可 靠性的呢——消息持久化。
1.1創(chuàng)建項(xiàng)目
1.2autoDelete屬性
@Queue: 當(dāng)所有消費(fèi)客戶端連接斷開后,是否自動(dòng)刪除 隊(duì)列 ;true:刪除 false:不刪除
@Exchange:當(dāng)所有綁定隊(duì)列都不在使用時(shí),是否自動(dòng) 刪除交換器 true:刪除 false:不刪除
2 RabbitMQ 中的消息確認(rèn) ACK 機(jī)制

在這里插入圖片描述

2.1創(chuàng)建項(xiàng)目
修改 Consusmer 配置文件解決 ACK 反饋問題

spring.application.name=springcloud-mq
spring.rabbitmq.host=192.168.70.131
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456

設(shè)置交換器的名稱

mq.config.exchange=log.direct

info 隊(duì)列名稱

mq.config.queue.info=log.info

info 路由鍵

mq.config.queue.info.routing.key=log.info.routing.key

error 隊(duì)列名稱

mq.config.queue.error=log.error

error 路由鍵

mq.config.queue.error.routing.key=log.error.routing.key

開啟重試

spring.rabbitmq.listener.retry.enabled=true

重試次數(shù),默認(rèn)為 3 次

spring.rabbitmq.listener.retry.max-attempts=5

參考書籍:RabbitMQ實(shí)戰(zhàn)指南
總結(jié):這本書籍寫的是真的好,這也算是自己一個(gè)自我的總結(jié)吧,里面附有代碼,精心整理,請(qǐng)大家要好好閱讀,如有發(fā)現(xiàn)錯(cuò)誤,還請(qǐng)大家斧正.文章原創(chuàng),支持轉(zhuǎn)載.侵權(quán)必究.

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容