RabbitMQ的動(dòng)態(tài)創(chuàng)建交換機(jī)、隊(duì)列、綁定、死信隊(duì)列,延遲隊(duì)列代碼實(shí)現(xiàn)

?---實(shí)踐是檢驗(yàn)真理的唯一標(biāo)準(zhǔn)---

yml參數(shù)配置

這次我使用的是RabbitTemplate

rabbitmq:
    host: 192.168.225.136
    port: 5672
    username: thinking
    password: 123
    virtual-host: host1
    publisher-returns: true
    # 事務(wù)模式下這行需要?jiǎng)h除
    publisher-confirm-type: correlated
    template:
      # 找不到路由規(guī)則的消息 是否保留
      mandatory: true

為什么Template不需要定義configuration文件來(lái)接收yml文件的參數(shù)?

這是個(gè)常識(shí)問(wèn)題,我這里做個(gè)記錄。。。

我都能忘記昨天吃了東西的,好在我喜歡做筆記。。。

springboot中何時(shí)加載Template,可以仔細(xì)看看自動(dòng)裝配注解:EnableAutoConfiguration
這類Template模板的初始化有個(gè)Properties文件,不如:
RabbitProperties
RedisProperties
方法中注解:ConfigurationProperties  指定了默認(rèn)取得yml格式內(nèi)容
    至于具體的屬性可以找set方法

我們的demo都是基于RabbitTemplate來(lái)寫(xiě)。。。

初始化數(shù)據(jù)

通過(guò)枚舉ExchangeEnum、QueueEnum、BindingEnum動(dòng)態(tài)維護(hù)和創(chuàng)建
1.初始化交換機(jī)

@Bean("createExchange")
public Object createExchange(RabbitAdmin rabbitAdmin) {
    // 遍歷交換機(jī)枚舉
    ExchangeEnum.toList().forEach(exchangeEnum -> {
        // 根據(jù)交換機(jī)模式 生成不同的交換機(jī)
        switch (exchangeEnum.getType()) {
            case fanout:
                rabbitAdmin.declareExchange(new FanoutExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
            case topic:
                rabbitAdmin.declareExchange(new TopicExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
            case direct:
                rabbitAdmin.declareExchange(new DirectExchange(exchangeEnum.getExchangeName(),
                        exchangeEnum.isDurable(), exchangeEnum.isAutoDelete()));
                break;
        }
    });
    return null;
}

2.初始化隊(duì)列

@Bean("createQueue")
public Object createQueue(RabbitAdmin rabbitAdmin) {
    // 遍歷隊(duì)列枚舉 將隊(duì)列注冊(cè)到spring bean工廠 讓spring實(shí)現(xiàn)隊(duì)列的管理
    QueueEnum.toList().forEach(queueEnum -> {
        rabbitAdmin.declareQueue(new Queue(queueEnum.getName(),
                queueEnum.isDurable(), queueEnum.isExclusive(), queueEnum.isAutoDelete(), queueEnum.getArguments()));
    });
    return null;
}

3.交換機(jī)和隊(duì)列綁定

@Bean("createBinding")
public Object createBinding(RabbitAdmin rabbitAdmin) {
    // 遍歷隊(duì)列枚舉 將隊(duì)列綁定到指定交換機(jī)
    BindingEnum.toList().forEach(bindingEnum -> {
        // 交換機(jī)
        ExchangeEnum exchangeEnum = bindingEnum.getExchangeEnum();
        // queue
        QueueEnum queueEnum = bindingEnum.getQueueEnum();
        // 綁定
        rabbitAdmin.declareBinding(new Binding(
                // queue名稱
                queueEnum.getName(),
                Binding.DestinationType.QUEUE,
                // exchange名稱
                exchangeEnum.getExchangeName(),
                // queue的routingKey
                queueEnum.getRoutingKey(),
                // 綁定的參數(shù)
                bindingEnum.getArguments()));
    });
    return null;
}

延遲隊(duì)列

1.定義隊(duì)列

/**
 * 超時(shí)隊(duì)列---不需要定義RabbitListener方法
 */
deal_queue("deal_queue", "deal.queue", true, false, false, dealParams()),
/**
 * 超時(shí)接收隊(duì)列
 */
reply_queue("reply_queue", "reply.queue", true, false, false, null),
public static Map<String, Object> dealParams(){
      // reply_to 隊(duì)列
      Map<String,Object> map = new HashMap<>();
      //設(shè)置消息的過(guò)期時(shí)間 單位毫秒
      map.put("x-message-ttl",10000);
      //設(shè)置附帶的死信交換機(jī)
      map.put("x-dead-letter-exchange","reply_exchange");
      //指定重定向的路由建 消息作廢之后可以決定需不需要更改他的路由建 如果需要 就在這里指定
      map.put("x-dead-letter-routing-key","reply.queue");
      return map;
  }

2.定義交換機(jī)

/**
 * 超時(shí)交換機(jī)
 */
deal_exchange("deal_exchange", ExchangeTypeEnum.topic, true, false),
/**
 * 超時(shí)接收交換機(jī)
 */
reply_exchange("reply_exchange", ExchangeTypeEnum.topic, true, false),

3.交換機(jī)和隊(duì)列綁定

deal_binding(ExchangeEnum.deal_exchange, QueueEnum.deal_queue, null),
reply_binding(ExchangeEnum.reply_exchange, QueueEnum.reply_queue, null)

4.不定義超時(shí)隊(duì)列的@RabbitListener,只定義超時(shí)接收隊(duì)列的@RabbitListener

@RabbitListener(queues = {"reply_queue"})
@RabbitHandler
public void reply_queue(Message message, Channel channel) throws Exception {
    System.err.println("消費(fèi)端-reply: " + new String(message.getBody(), "UTF-8"));
    Long deliveryTag = message.getMessageProperties().getDeliveryTag();
    channel.basicAck(deliveryTag, false);
}

測(cè)試:

/**
 * 延遲隊(duì)列測(cè)試
 */
public void deal_queue_test() {
    ExchangeEnum exchangeEnum = BindingEnum.deal_binding.getExchangeEnum();
    QueueEnum queueEnum = BindingEnum.deal_binding.getQueueEnum();
    // 消息
    String message = "11111111111111111111111111111111111111";
    MessageProperties messageProperties = getMessageProperties();
    // 發(fā)送
    rabbitTemplate.convertSendAndReceive(
            exchangeEnum.getExchangeName(),
            queueEnum.getRoutingKey(),
            new Message(message.getBytes(), messageProperties));
}

異步隊(duì)列

1.AsyncRabbitTemplate定義

/**
 * 異步隊(duì)列
 * @param rabbitTemplate
 * @return
 */
@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(RabbitTemplate rabbitTemplate){
    AsyncRabbitTemplate asyncRabbitTemplate = new AsyncRabbitTemplate(rabbitTemplate);
    asyncRabbitTemplate.setReceiveTimeout(50000);
    return asyncRabbitTemplate;
}

2.測(cè)試

public void async() {
    System.err.println("---------------async--------------start---------");
    AsyncRabbitTemplate.RabbitConverterFuture<Object> future = asyncRabbitTemplate.convertSendAndReceive("reply_exchange", "reply.queue", "123123123");
    // 配置下面代碼時(shí) 如果 隊(duì)列監(jiān)聽(tīng)中沒(méi)有返回值時(shí)會(huì)報(bào)錯(cuò)
    future.addCallback(new ListenableFutureCallback<Object>() {
        @Override
        public void onFailure(Throwable ex) {
            ex.printStackTrace();
        }
        @Override
        public void onSuccess(Object result) {
            System.out.println("回調(diào)收到結(jié)果=> " + result);
        }
    });
    System.err.println("---------------async--------------end---------");
}

3.監(jiān)聽(tīng)方法

@RabbitListener(queues = {"async_queue"})
@RabbitHandler
public Object async_queue(Message message, Channel channel) throws Exception {
    System.err.println("消費(fèi)端-async: " + new String(message.getBody(), "UTF-8"));
    return "ok";
}

Java api

1.消息回退:

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

long deliveryTag:消息唯一標(biāo)識(shí),這是RabbitMQ自動(dòng)生成的,不需要人為管理,只需要從message.getMessageProperties().getDeliveryTag() 就可以獲得。
boolean multiple:是否批量退回,不開(kāi)啟就使用false,開(kāi)啟批量退回需要增加自己的業(yè)務(wù)判斷邏輯(比如:攢夠幾條再批量回退,或者設(shè)置等待間隔等等)
boolean requeue:是否退回到消息隊(duì)列,退回就使用true,就是交給其他消費(fèi)者處理。

2.拒絕消息

void basicReject(long deliveryTag, boolean requeue) throws IOException;

deliveryTag:發(fā)布的每一條消息都會(huì)獲得一個(gè)唯一的deliveryTag,它在channel范圍內(nèi)是唯一的
requeue:表示如何處理這條消息,為true表示重新放入RabbitMQ的發(fā)送隊(duì)列中,為false表示通知RabbitMQ銷毀該消息

3.確認(rèn)ack

void basicAck(long deliveryTag, boolean multiple) throws IOException;

deliveryTag:該消息的index
multiple:是否批量.true:將一次性ack所有小于deliveryTag的消息。

4.創(chuàng)建一個(gè)隊(duì)列

Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
?
durable:true、false true:在服務(wù)器重啟時(shí),能夠存活
exclusive :是否為當(dāng)前連接的專用隊(duì)列,在連接斷開(kāi)后,會(huì)自動(dòng)刪除該隊(duì)列,生產(chǎn)環(huán)境中應(yīng)該很少用到吧。
autodelete:當(dāng)沒(méi)有任何消費(fèi)者使用時(shí),自動(dòng)刪除該隊(duì)列

5.啟動(dòng)一個(gè)消費(fèi)者,并返回服務(wù)端生成的消費(fèi)者標(biāo)識(shí)

/**
 * queue:隊(duì)列名
 * autoAck:true 接收到傳遞過(guò)來(lái)的消息后acknowledged(應(yīng)答服務(wù)器),false 接收到消息后不應(yīng)答服務(wù)器
 * consumerTag:客戶端生成的一個(gè)消費(fèi)者標(biāo)識(shí)
 * nolocal:如果服務(wù)器不應(yīng)將在此通道連接上發(fā)布的消息傳遞給此使用者,則為true;請(qǐng)注意RabbitMQ服務(wù)器上不支持此標(biāo)記
 * exclusive: 如果是單個(gè)消費(fèi)者,則為true
 * arguments:消費(fèi)的一組參數(shù)
 * deliverCallback: 當(dāng)一個(gè)消息發(fā)送過(guò)來(lái)后的回調(diào)接口
 * cancelCallback:當(dāng)一個(gè)消費(fèi)者取消訂閱時(shí)的回調(diào)接口;取消消費(fèi)者訂閱隊(duì)列時(shí)除了使用{@link Channel#basicCancel}之外的所有方式都會(huì)調(diào)用該回調(diào)方法
 * shutdownSignalCallback: 當(dāng)channel/connection 關(guān)閉后回調(diào)
 */
channel.basicConsume(QUEUE_NAME, true, ctag, false, false, arguments, deliverCallback, consumerTag -> {}, (consumerTag, sig) -> {});

6.取消消費(fèi)者訂閱

/**
* 取消消費(fèi)者對(duì)隊(duì)列的訂閱關(guān)系
* consumerTag:服務(wù)器端生成的消費(fèi)者標(biāo)識(shí)
**/
void basicCancel(String consumerTag)

7.主動(dòng)拉取隊(duì)列中的一條消息

/**
 * 從消息隊(duì)列中取出第一條消息;整個(gè)方法的執(zhí)行過(guò)程是首先消費(fèi)隊(duì)列,然后檢索第一條消息,然后再取消訂閱
 */
GetResponse response = channel.basicGet(QUEUE_NAME, true);
System.out.println("消費(fèi)者接收到的消息是:"+new String(response.getBody(), "UTF-8"));

參數(shù)介紹

1.隊(duì)列參數(shù)

x-dead-letter-exchange 死信交換機(jī)
x-dead-letter-routing-key 死信消息重定向路由鍵
x-expires 隊(duì)列在指定毫秒數(shù)后被刪除
x-ha-policy 創(chuàng)建HA隊(duì)列
x-ha-nodes HA隊(duì)列的分布節(jié)點(diǎn)
x-max-length 隊(duì)列的最大消息數(shù)
x-message-ttl 毫秒為單位的消息過(guò)期時(shí)間,隊(duì)列級(jí)別
x-max-priority 最大優(yōu)先值為255的隊(duì)列優(yōu)先排序功能

2.消息參數(shù)

content-type 消息體的MIME類型,如application/json
content-encoding 消息的編碼類型
message-id 消息的唯一性標(biāo)識(shí),由應(yīng)用進(jìn)行設(shè)置
correlation-id 一般用做關(guān)聯(lián)消息的message-id,常用于消息的響應(yīng)
timestamp 消息的創(chuàng)建時(shí)刻,整形,精確到秒

完整項(xiàng)目地址在微信公眾中,謝謝大家支持

Java技術(shù)學(xué)習(xí)筆記
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容