Spring Boot整合RabbitMQ(延遲隊(duì)列)

1、什么是RabbitMQ?

消息隊(duì)列,主要是用來(lái)實(shí)現(xiàn)應(yīng)用程序的異步和解耦,同時(shí)也能起到消息緩沖,消息分發(fā)的作用。實(shí)現(xiàn)AMQP(高級(jí)消息隊(duì)列協(xié)議)。當(dāng)生產(chǎn)者大量產(chǎn)生數(shù)據(jù)時(shí),消費(fèi)者無(wú)法快速消費(fèi),那么需要一個(gè)中間層。保存這個(gè)數(shù)據(jù)。服務(wù)器端用Erlang語(yǔ)言編寫,支持多種客戶端。

2、什么是AMQP?

即Advanced Message Queuing Protocol,高級(jí)消息隊(duì)列協(xié)議,是應(yīng)用層協(xié)議的一個(gè)開(kāi)放標(biāo)準(zhǔn),為面向消息的中間件設(shè)計(jì)。消息中間件主要用于組件之間的解耦,消息的發(fā)送者無(wú)需知道消息使用者的存在,反之亦然。AMQP的主要特征是面向消息、隊(duì)列、路由(包括點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱)、可靠性、安全。

3、相關(guān)概念

發(fā)消息者(生產(chǎn)者)、隊(duì)列、收消息者(消費(fèi)者),RabbitMQ 在這個(gè)基本概念之上, 多做了一層抽象, 在發(fā)消息者和 隊(duì)列之間, 加入了交換器 (Exchange). 這樣發(fā)消息者和隊(duì)列就沒(méi)有直接聯(lián)系, 轉(zhuǎn)而變成發(fā)消息者把消息給交換器, 交換器根據(jù)調(diào)度策略再把消息再給隊(duì)列。


image.png
  • 左側(cè) P 代表 生產(chǎn)者,也就是往 RabbitMQ 發(fā)消息的程序。
  • 中間即是 RabbitMQ,其中包括了 交換機(jī) 和 隊(duì)列。
  • 右側(cè) C 代表 消費(fèi)者,也就是往 RabbitMQ 拿消息的程序。

4、交換機(jī)(Exchange)四種類型:Direct、topic、Headers、Fanout

  • Direct:direct 類型的行為是"先匹配, 再投送". 即在綁定時(shí)設(shè)定一個(gè) routing_key, 消息的routing_key 匹配時(shí), 才會(huì)被交換器投送到綁定的隊(duì)列中去.
  • Topic:按規(guī)則轉(zhuǎn)發(fā)消息(最靈活)
  • Headers:設(shè)置header attribute參數(shù)類型的交換機(jī)
  • Fanout:轉(zhuǎn)發(fā)消息到所有綁定隊(duì)列

5、代碼示例

pom文件添加依賴
<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
消息生產(chǎn)者(以下各個(gè)類型共用此消息發(fā)送者)
package com.gizhi.beam.modular.amqp.service;

import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

@Component
public class ProducerService {
 
    @Autowired
    private AmqpTemplate rabbitTemplate;
 
    public void send() {
        String context = "hello " + new Date();
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("task_queue", context);
    }

    public void topicSend1() {
        String context = "hi, i am message 1";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.message", context);
    }

    public void topicSend2() {
        String context = "hi, i am messages 2";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("exchange", "topic.messages", context);
    }

    public void fanoutsend() {
        String context = "hi, fanout msg ";
        System.out.println("Sender : " + context);
        this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
    }
 
}

1、Direct類型

RabbitConfig
package com.gizhi.beam.modular.amqp.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
 
    @Bean
    public Queue Queue() {
        return new Queue("task_queue");
    }
 
}

消費(fèi)者
package com.gizhi.beam.modular.amqp.service;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "task_queue")
public class CustomerService {
 
    @RabbitHandler
    public void process(String hello) {
        System.out.println("Receiver  : " + hello);
    }
 
}

2、Topic類型

TopicRabbitConfig
package com.gizhi.beam.modular.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//topic 是RabbitMQ中最靈活的一種方式,可以根據(jù)routing_key自由的綁定不同的隊(duì)列
@Configuration
public class TopicRabbitConfig {

    final static String message = "topic.message";
    final static String messages = "topic.messages";

    @Bean
    public Queue queueMessage() {
        return new Queue(TopicRabbitConfig.message);
    }

    @Bean
    public Queue queueMessages() {
        return new Queue(TopicRabbitConfig.messages);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange("exchange");
    }

    @Bean
    Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
        return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
    }

    @Bean
    Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
        // #:相當(dāng)于一個(gè)或者多個(gè)單詞,例如一個(gè)匹配模式是topic.#,那么,以topic開(kāi)頭的路由鍵都是可以的
        // *:相當(dāng)于一個(gè)單詞,例如一個(gè)匹配模式是topic.*,那么,以topic開(kāi)頭的路由鍵,后面接一個(gè)單詞的都可以
        return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
    }
}
消費(fèi)者
@Component
public class TopicCustomerService {

    @RabbitListener(queues = "topic.message")
    public void message(String msg) {
        System.out.println("message  : " + msg);
    }
    @RabbitListener(queues = "topic.messages")
    public void messages(String msg) {
        System.out.println("messages  : " + msg);
    }

 
}

3、Fanout類型

FanoutRabbitConfig
package com.gizhi.beam.modular.amqp.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
//Fanout 就是我們熟悉的廣播模式或者訂閱模式,給Fanout交換機(jī)發(fā)送消息,綁定了這個(gè)交換機(jī)的所有隊(duì)列都收到這個(gè)消息。

@Configuration
public class FanoutRabbitConfig {
    @Bean
    public Queue AMessage() {
        return new Queue("fanout.A");
    }
 
    @Bean
    public Queue BMessage() {
        return new Queue("fanout.B");
    }
 
    @Bean
    public Queue CMessage() {
        return new Queue("fanout.C");
    }
 
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange("fanoutExchange");
    }
 
    @Bean
    Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(AMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(BMessage).to(fanoutExchange);
    }
 
    @Bean
    Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
        return BindingBuilder.bind(CMessage).to(fanoutExchange);
    }
 
}
消費(fèi)者
package com.gizhi.beam.modular.amqp.service;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class FanoutCustomerAService {

    @RabbitListener(queues = "fanout.A")
    public void fanoutA(String msg) {
        System.out.println("A  : " + msg);
    }

    @RabbitListener(queues = "fanout.B")
    public void fanoutB(String msg) {
        System.out.println("B  : " + msg);
    }

    @RabbitListener(queues = "fanout.C")
    public void fanoutC(String msg) {
        System.out.println("C  : " + msg);
    }
 
}

4、延遲隊(duì)列

AMQP協(xié)議和RabbitMQ隊(duì)列本身沒(méi)有直接支持延遲隊(duì)列功能,但是我們可以通過(guò)RabbitMQ的兩個(gè)特性來(lái)曲線實(shí)現(xiàn)延遲隊(duì)列:
  • Time To Live(TTL)
  • Dead Letter Exchanges(DLX)
原理:

RabbitMQ可以針對(duì)Queue設(shè)置x-expires 或者 針對(duì)Message設(shè)置 x-message-ttl,來(lái)控制消息的生存時(shí)間,如果超時(shí)(兩者同時(shí)設(shè)置以最先到期的時(shí)間為準(zhǔn)),則消息變?yōu)閐ead letter(死信)。RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。
x-dead-letter-exchange:出現(xiàn)dead letter之后將dead letter重新發(fā)送到指定exchange
x-dead-letter-routing-key:出現(xiàn)dead letter之后將dead letter重新按照指定的routing-key發(fā)送。


image.png
DelayRabbitConfig
package com.gizhi.beam.modular.amqp.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 
import java.util.HashMap;
import java.util.Map;
 
@Configuration
@Slf4j
public class DelayRabbitConfig {
 
 
    /**
     * 延遲隊(duì)列 TTL 名稱
     */
    private static final String ORDER_DELAY_QUEUE = "user.order.delay.queue";
    /**
     * DLX,dead letter發(fā)送到的 exchange
     * 延時(shí)消息就是發(fā)送到該交換機(jī)的
     */
    public static final String ORDER_DELAY_EXCHANGE = "user.order.delay.exchange";
    /**
     * routing key 名稱
     * 具體消息發(fā)送在該 routingKey 的
     */
    public static final String ORDER_DELAY_ROUTING_KEY = "order_delay";
 
    public static final String ORDER_QUEUE_NAME = "user.order.queue";
    public static final String ORDER_EXCHANGE_NAME = "user.order.exchange";
    public static final String ORDER_ROUTING_KEY = "order";
 
    /**
     * 延遲隊(duì)列配置
     * <p>
     * 1、params.put("x-message-ttl", 5 * 1000);
     * 第一種方式是直接設(shè)置 Queue 延遲時(shí)間 但如果直接給隊(duì)列設(shè)置過(guò)期時(shí)間,這種做法不是很靈活,(當(dāng)然二者是兼容的,默認(rèn)是時(shí)間小的優(yōu)先)
     * 2、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * 第二種就是每次發(fā)送消息動(dòng)態(tài)設(shè)置延遲時(shí)間,這樣我們可以靈活控制
     **/
    @Bean
    public Queue delayOrderQueue() {
        Map<String, Object> params = new HashMap<>();
        // x-dead-letter-exchange 聲明了隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱,
        params.put("x-dead-letter-exchange", ORDER_EXCHANGE_NAME);
        // x-dead-letter-routing-key 聲明了這些死信在轉(zhuǎn)發(fā)時(shí)攜帶的 routing-key 名稱。
        params.put("x-dead-letter-routing-key", ORDER_ROUTING_KEY);
        return new Queue(ORDER_DELAY_QUEUE, true, false, false, params);
    }
    /**
     * 需要將一個(gè)隊(duì)列綁定到交換機(jī)上,要求該消息與一個(gè)特定的路由鍵完全匹配。
     * 這是一個(gè)完整的匹配。如果一個(gè)隊(duì)列綁定到該交換機(jī)上要求路由鍵 “dog”,則只有被標(biāo)記為“dog”的消息才被轉(zhuǎn)發(fā),
     * 不會(huì)轉(zhuǎn)發(fā)dog.puppy,也不會(huì)轉(zhuǎn)發(fā)dog.guard,只會(huì)轉(zhuǎn)發(fā)dog。
     * @return DirectExchange
     */
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange(ORDER_DELAY_EXCHANGE);
    }
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayOrderQueue()).to(orderDelayExchange()).with(ORDER_DELAY_ROUTING_KEY);
    }
 
    @Bean
    public Queue orderQueue() {
        return new Queue(ORDER_QUEUE_NAME, true);
    }
    /**
     * 將路由鍵和某模式進(jìn)行匹配。此時(shí)隊(duì)列需要綁定要一個(gè)模式上。
     * 符號(hào)“#”匹配一個(gè)或多個(gè)詞,符號(hào)“*”匹配不多不少一個(gè)詞。因此“audit.#”能夠匹配到“audit.irs.corporate”,但是“audit.*” 只會(huì)匹配到“audit.irs”。
     **/
    @Bean
    public TopicExchange orderTopicExchange() {
        return new TopicExchange(ORDER_EXCHANGE_NAME);
    }
 
    @Bean
    public Binding orderBinding() {
        // TODO 如果要讓延遲隊(duì)列之間有關(guān)聯(lián),這里的 routingKey 和 綁定的交換機(jī)很關(guān)鍵
        return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(ORDER_ROUTING_KEY);
    }
 
}

生產(chǎn)者DelaySender
package com.gizhi.beam.modular.amqp.service;

import com.gizhi.beam.modular.amqp.config.DelayRabbitConfig;
import com.gizhi.beam.modular.amqp.dto.Order;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
@Slf4j
public class DelaySender {
 
    @Autowired
    private AmqpTemplate amqpTemplate;
 
    public void sendDelay(Order order) {
        log.info("【訂單生成時(shí)間】" + new Date().toString() +"【1分鐘后檢查訂單是否已經(jīng)支付】" + order.toString() );
        this.amqpTemplate.convertAndSend(DelayRabbitConfig.ORDER_DELAY_EXCHANGE, DelayRabbitConfig.ORDER_DELAY_ROUTING_KEY, order, message -> {
            // 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么這一句也可以省略,具體根據(jù)業(yè)務(wù)需要是聲明 Queue 的時(shí)候就指定好延遲時(shí)間還是在發(fā)送自己控制時(shí)間
            message.getMessageProperties().setExpiration(1 * 1000 * 60 + "");
            return message;
        });
    }
}

消費(fèi)者DelayReceiver
package com.gizhi.beam.modular.amqp.service;

import com.gizhi.beam.modular.amqp.config.DelayRabbitConfig;
import com.gizhi.beam.modular.amqp.dto.Order;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
 
import java.util.Date;
 
@Component
@Slf4j
public class DelayReceiver {
 
    @RabbitListener(queues = {DelayRabbitConfig.ORDER_QUEUE_NAME})
    public void orderDelayQueue(Order order, Message message, Channel channel) {
        log.info("###########################################");
        log.info("【orderDelayQueue 監(jiān)聽(tīng)的消息】 - 【消費(fèi)時(shí)間】 - [{}]- 【訂單內(nèi)容】 - [{}]",  new Date(), order.toString());
        if(order.getOrderStatus() == 0) {
            order.setOrderStatus(2);
            log.info("【該訂單未支付,取消訂單】" + order.toString());
        } else if(order.getOrderStatus() == 1) {
            log.info("【該訂單已完成支付】");
        } else if(order.getOrderStatus() == 2) {
            log.info("【該訂單已取消】");
        }
        log.info("###########################################");
    }
}

5、源碼地址:https://gitee.com/hsshy/beam-example

6、關(guān)注我的公眾號(hào),互相學(xué)習(xí)。

image.png

參考鏈接

https://blog.csdn.net/ztx114/article/details/78410727
https://blog.csdn.net/woaitingting1985/article/details/79087357
https://blog.csdn.net/lizc_lizc/article/details/80722763
http://www.itdecent.cn/p/911d987b5f11
https://www.cnblogs.com/cag2050/p/7724055.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容