消息中間件踩坑之旅(三)——RabbitMq延時任務(wù)處理

場景描述

  • 在訂單系統(tǒng)中,一個用戶下單之后通常有30分鐘的時間進(jìn)行支付,如果30分鐘之內(nèi)沒有支付成功,那么這個訂單將進(jìn)行異常處理。

  • 用戶希望通過手機(jī)遠(yuǎn)程遙控家里的智能設(shè)備在指定的時間進(jìn)行工作。這時候就可以將用戶的指令發(fā)送到消息隊列里,進(jìn)行延時處理。

如何實現(xiàn)?

因為博主知識淺薄,所以這里只提供兩種簡單的解決方案
方案一:使用延遲隊列
用到知識:

死信隊列 http://www.rabbitmq.com/dlx.html

  • 當(dāng)該隊列中的消息被拒絕、過期、或者隊列達(dá)到最大長度。消息就會變成死信,然后被重新發(fā)送到另一個轉(zhuǎn)換器。
  • 通過x-dead-letter-exchange設(shè)置轉(zhuǎn)發(fā)的轉(zhuǎn)換器
  • 可以攜帶routing-key ---- x-dead-letter-routing-key

過期時間TTL http://www.rabbitmq.com/ttl.html

  • 故名思意就是設(shè)置一個時間,超過這個時間,消息就過期了。
  • 其實這里有很多種設(shè)置方案,這里采用最合適的設(shè)置 message-ttl 值,這是在隊列屬性上設(shè)置的
image

一下代碼基于SpringBoot

@Bean////訂單首次存在的死信隊列
public Queue TradeQueue() {
    Map<String,Object> map = new HashMap<>();
    map.put("x-dead-letter-exchange","DLXExchange");  //設(shè)置重新發(fā)送的轉(zhuǎn)換器名
    map.put("x-message-ttl",6000);//毫秒為單位          //過期時間
    return new Queue("TradeQueue",true,false,false,map);
}
@Bean//訂單在死信隊列過期后去的隊列
public Queue DLXQueue() {
    Map<String,Object> map = new HashMap<>();
    return new Queue("DLXQueue",true,false,false,map);
}
//兩個轉(zhuǎn)換器聲明
@Bean
DirectExchange TradeExchange() {
    DirectExchange exchange = new DirectExchange("TradeExchange", true, false);
    return exchange;
}

@Bean
FanoutExchange DLXExchange() {
    return new FanoutExchange("DLXExchange",true, false);
}
@Bean//訂單首次存在的死信隊列交換器綁定
Binding bindingExchangeMessage1(Queue TradeQueue, DirectExchange TradeExchange) {
    return BindingBuilder.bind(TradeQueue).to(TradeExchange).with("trade");
}

@Bean//訂單首次存在的死信隊列出來后經(jīng)過的交換器綁定
Binding bindingExchangeMessage2(Queue DLXQueue, FanoutExchange DLXExchange) {
    return BindingBuilder.bind(DLXQueue).to(DLXExchange);
}
@Autowired
private AmqpTemplate rabbitTemplate;

public void send() {
    //測試發(fā)送
    //訂單號
    int tradeId = 9518;
    //發(fā)送單號去死信隊列做延時操作
   rabbitTemplate.convertAndSend("TradeExchange","trade",tradeId);
    System.out.println(new Date()+"---成功發(fā)送訂單號到死信隊列");

}
@RabbitListener(queues = "DLXQueue")    //接收來到DLXQueue的消息,及時TradeQueue過期后傳來的消息,然后進(jìn)行處理
public void process2(int str) {
    System.out.println("Receiver  : " +str);
    System.out.println(new Date()+"---成功接收已經(jīng)過期訂單號");
    System.out.println("處理操作。。。。。");

}

運行結(jié)果 時間相差和之前設(shè)置的一樣,從而達(dá)到效果

      Mon Jan 21 00:51:03 CST 2019---成功發(fā)送訂單號到死信隊列
      Receiver  : 9518
      Mon Jan 21 00:51:09 CST 2019---成功接收已經(jīng)過期訂單號
      處理操作。。。。。

剩下一種方案 后續(xù)更新 是采用

Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
///。。。。。
 message.getMessageProperties().setDelay(3000);   // 毫秒為單位,指定此消息的延時時長
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容