SpringBoot整合RabbitMQ

SpringBoot整合RabbitMQ

引入依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

開(kāi)啟注解

@EnableRabbit

配置文件rabbitMQ

spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

配置config

@Configuration
public class MyRabbitConfig {
    // 配置使用json的方式序列化對(duì)象
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

創(chuàng)建交換機(jī)

// 注入amqpAdmin
@Autowired
AmqpAdmin amqpAdmin;
// 創(chuàng)建交換機(jī)
// 交換機(jī)名稱 是否持久化 是否自動(dòng)刪除 相關(guān)參數(shù)
// String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
DirectExchange directExchange = new DirectExchange("hello-java-exchange", true, false, null);
amqpAdmin.declareExchange(directExchange);

創(chuàng)建隊(duì)列

// 注入amqpAdmin
@Autowired
AmqpAdmin amqpAdmin;
// 創(chuàng)建隊(duì)列
// 隊(duì)列名稱 是否持久化 是否只能連接一個(gè)交換機(jī) 是否自動(dòng)刪除 相關(guān)參數(shù)
// String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
Queue queue = new Queue("hello-java-queue", true, false, false, null);
amqpAdmin.declareQueue(queue);

綁定交換機(jī)和隊(duì)列

// 注入amqpAdmin
@Autowired
AmqpAdmin amqpAdmin;       
// 綁定交換機(jī)和隊(duì)列
// 將exchange指定的交換機(jī)和destination目的地進(jìn)行綁定(綁定隊(duì)列也可以綁定交換機(jī))使用routingKey作為指定的路由鍵
// String destination, Binding.DestinationType destinationType, String exchange, String routingKey, Map<String, Object> arguments
Binding binding = new Binding("hello-java-queue", Binding.DestinationType.QUEUE, "hello-java-exchange", "hello.java", null);
amqpAdmin.declareBinding(binding);

發(fā)送消息

        // 發(fā)送消息
        // 交換機(jī) 路由鍵 消息
        // String exchange, String routingKey, Object object
        rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", "hello world");

接收/監(jiān)聽(tīng)消息

  1. 使用@RabbitListener,要想使用必須開(kāi)啟@EnableRabbit
    // queues:要監(jiān)聽(tīng)的隊(duì)列數(shù)組
    // 監(jiān)聽(tīng)的內(nèi)容會(huì)自動(dòng)封裝到方法的參數(shù)上
    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessage(Object message) {
        System.out.println("message: " + message.toString());
        System.out.println("類型:" + message.getClass());
        System.out.println("接收到消息...內(nèi)容...");
    }

// 接收監(jiān)聽(tīng)到的消息
message: (Body:'{"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1618122170115}' MessageProperties [headers={__TypeId__=com.atguigu.gulimall.order.entity.OrderReturnReasonEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-0TUO4qdSSCcnsHUQ9XZyzg, consumerQueue=hello-java-queue])
類型:class org.springframework.amqp.core.Message
接收到消息...內(nèi)容...

由監(jiān)聽(tīng)的消息類型可以看出為class org.springframework.amqp.core.Message類型,因此可以改造參數(shù)類型

  1. Message message:原生消息詳細(xì)信息,頭+體
  2. T<發(fā)送的消息的類型>:OrderReturnReasonEntity content
  3. Channel channel:當(dāng)前傳輸數(shù)據(jù)的通道
    @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessage(Message message, ContentEntity content, Channel channel) {
        // 獲取消息體,即json的數(shù)據(jù) {"id":1,"name":"哈哈","sort":null,"status":null,"createTime":1618122170115}
        byte[] body = message.getBody();
        // 獲取消息頭信息 即
        // [headers={__TypeId__=com.atguigu.gulimall.order.entity.ContentEntity}, contentType=application/json, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=hello-java-exchange, receivedRoutingKey=hello.java, deliveryTag=1, consumerTag=amq.ctag-0TUO4qdSSCcnsHUQ9XZyzg, consumerQueue=hello-java-queue])
        MessageProperties messageProperties = message.getMessageProperties();
        System.out.println("message: " + message.toString());
        System.out.println("類型:" + message.getClass());
        System.out.println("轉(zhuǎn)化后消息...內(nèi)容...");
        System.out.println("content: " + content.toString());
    }

Queue隊(duì)列:可以很多人都來(lái)監(jiān)聽(tīng)。只要收到消息,隊(duì)列刪除消息,并且只能同時(shí)用且只有一個(gè)人接收到此消息

  1. 同一個(gè)消息,只能被一個(gè)客戶端收到
  2. 只有當(dāng)前消息隊(duì)列處理完成,才可以接收下一個(gè)消息隊(duì)列

使用場(chǎng)景

  1. @RabbitHandler: 標(biāo)記方法

  2. @RabbitListener:類+方法

    使用@RabbitListener標(biāo)記在類上(重載區(qū)分不同的消息),說(shuō)明這個(gè)類就是用來(lái)接受消息隊(duì)列的方法類,在該類下的所以方法上標(biāo)記@RabbitHandler,每個(gè)方法指定不同的接收參數(shù),這樣就可以接收不同類型的消息

@RabbitListener
class receive {
    @RabbitHandler(queues = {"hello-java-queue"})
    public void receiveMessage1(Entity1 content) {
        System.out.println("content1: " + content.toString());
    }

    @RabbitHandler(queues = {"hello-java-queue"})
    public void receiveMessage2(Entity2 content) {
        System.out.println("content2: " + content.toString());
    }
}

RabbitMQ消息確認(rèn)機(jī)制-可靠抵達(dá)

  • 保證消息不丟失,可靠抵達(dá),可以使用事務(wù)消息,性能下降250倍,為此引入確認(rèn)機(jī)制
  • publisher confirmCallback 確認(rèn)模式(觸發(fā)時(shí)機(jī):服務(wù)端將消息發(fā)送給RabbitMQ所在的服務(wù)器)
  • publisher returnCallback 未投遞到queue退出模式(觸發(fā)時(shí)機(jī):RabbitmQ所在的服務(wù)器調(diào)用交換機(jī)投遞給對(duì)應(yīng)隊(duì)列)
  • consumer ack機(jī)制(觸發(fā)機(jī)制:消費(fèi)端成功獲取到消息隊(duì)列的消息)

可靠抵達(dá)-服務(wù)端確認(rèn)(confirmCallback 、returnCallback )

  1. 開(kāi)啟發(fā)送端確認(rèn)
# 開(kāi)啟發(fā)送端確認(rèn)
spring.rabbitmq.publisher-confirms=true
  1. 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
# 開(kāi)啟發(fā)送端消息抵達(dá)隊(duì)列的確認(rèn)
spring.rabbitmq.publisher-returns=true
# 只要發(fā)送端消息抵達(dá)隊(duì)列,以異步方式優(yōu)先回調(diào)這個(gè)returnConfirm(綁定一起使用)
spring.rabbitmq.template.mandatory=true
  1. 定制RabbitTemplate自定義confirmCallback 、returnCallback 觸發(fā)方法
    /**
     * 定制RabbitTemplate
     * 1. MQ服務(wù)器收到消息就回調(diào)
     *      1. spring.rabbitmq.publisher-confirms=true
     *      2. 設(shè)置回調(diào)確認(rèn)confirmCallback 
     * 2. 消息正確抵達(dá)隊(duì)列進(jìn)行回調(diào)
     *      1. spring.rabbitmq.publisher-returns=true
     *      2. spring.rabbitmq.template.mandatory=true
     *      3. 設(shè)置回調(diào)確認(rèn)returnCallback 
     */
    // PostConstruct: 當(dāng)MyRabbitConfig對(duì)象創(chuàng)建完再執(zhí)行該方法
    @PostConstruct
    public void initRabbitTemplate() {
        // 設(shè)置MQ服務(wù)器收到消息回調(diào)
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            /**
             * 只要消息抵達(dá)MQ服務(wù)器ack就為true
             * @param correlationData:當(dāng)前消息的唯一關(guān)聯(lián)數(shù)據(jù)(這個(gè)是消息的唯一id)即發(fā)送時(shí)傳的CorrelationData參數(shù)
             * @param b:ack,消息是否成功還是失敗
             * @param s:失敗的原因
             */
            @Override
            public void confirm(CorrelationData correlationData, boolean b, String s) {
                System.out.println("correlationData: " + correlationData);
                System.out.println("ack: " + b);
                System.out.println("s: " + s);
            }
        });
        // 設(shè)置消息抵達(dá)隊(duì)列回調(diào)
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            /**
             * 只要消息沒(méi)有投遞給指定的隊(duì)列,就觸發(fā)這個(gè)失敗回調(diào)
             * @param message:投遞失敗的消息詳細(xì)信息
             * @param i:回復(fù)的狀態(tài)碼
             * @param s:回復(fù)的文本內(nèi)容
             * @param s1:當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)交換機(jī)
             * @param s2:當(dāng)時(shí)這個(gè)消息發(fā)送給哪個(gè)路由鍵
             */
            @Override
            public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                System.out.println("fail message: " + message);
                System.out.println("i: " + i);
                System.out.println("s: " + s);
                System.out.println("s1: " + s1);
                System.out.println("s2: " + s2);
            }
        });
    }

可靠抵達(dá)-消費(fèi)端確認(rèn)(ack)

保證每個(gè)消息被正確消費(fèi),此時(shí)才可以MQ刪除這個(gè)消息

  1. basic.ack 用于肯定確認(rèn);MQ服務(wù)器會(huì)移除此消息
  2. basic.nack用于否定確認(rèn);可以指定MQ服務(wù)器是否丟棄此消息,可以批量
  3. basic.reject用于否定確認(rèn);跟nack使用一樣,但是不能批量
  4. 默認(rèn)是自動(dòng)ack,只要消息接收到,客戶端會(huì)自動(dòng)確認(rèn),服務(wù)端就會(huì)移除這個(gè)消息,如果客戶端在處理消息時(shí)候宕機(jī)則會(huì)丟失消息,因此要手動(dòng)確認(rèn),保證消息不丟失。當(dāng)客戶端宕機(jī)后,消息會(huì)從unacked狀態(tài)變成ready狀態(tài),當(dāng)下一次新的客戶端連接進(jìn)來(lái)再將消息重新發(fā)送給客戶端
# 設(shè)置客戶端手動(dòng)確認(rèn)接受到消息
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  @RabbitListener(queues = {"hello-java-queue"})
    public void receiveMessage1(Message message, Content content, Channel channel) {
        System.out.println("content1: " + content.toString());
        // 通道內(nèi)按順序自增
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            // 確認(rèn)消息接收成功,非批量簽收模式
            // long deliveryTag, boolean multipe (當(dāng)前消息的標(biāo)簽,是否批量簽收)
            channel.basicAck(deliveryTag, false);
            // 消息接收成功,但是拒絕簽收消息
            // long deliveryTag, boolean multipe, boolean requeue (當(dāng)前消息的標(biāo)簽,是否批量簽收,是否重新入隊(duì)(false丟掉消息,true將消息重新入隊(duì)))
            channel.basicNack(deliveryTag,false,false);
        } catch (IOException e) {
            // 網(wǎng)絡(luò)中斷
        }
    }
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容