領(lǐng)導(dǎo)看了我寫的關(guān)閉超時訂單,讓我出門左轉(zhuǎn)!

原創(chuàng):微信公眾號 【阿Q說代碼】,歡迎分享,轉(zhuǎn)載請保留出處。

哈嘍大家好,我是阿Q!

前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項目又重新啟動了,帶著復(fù)雜的心情仔細(xì)賞閱“兒時”的代碼,心中的酸楚只有自己能夠體會。

這不,昨天又被領(lǐng)導(dǎo)叫進(jìn)了“小黑屋”,讓我把代碼重構(gòu)下進(jìn)行升級??吹竭@么“可愛”的代碼,心中一萬只“xx馬”疾馳而過。

讓我最深惡痛覺的就是里邊竟然用定時任務(wù)實現(xiàn)了“關(guān)閉超時訂單”的功能,現(xiàn)在想來,哭笑不得。我們先分析一波為什么大家都在抵制用定時任務(wù)來實現(xiàn)該功能。

定時任務(wù)

關(guān)閉超時訂單是在創(chuàng)建訂單之后的一段時間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時時間是一致的。

如果我們使用定時任務(wù)來進(jìn)行該操作,很難把握定時任務(wù)輪詢的時間間隔:

  • 時間間隔足夠小,在誤差允許的范圍內(nèi)可以達(dá)到我們說的時間一致性問題,但是頻繁掃描數(shù)據(jù)庫,執(zhí)行定時任務(wù),會造成網(wǎng)絡(luò)IO和磁盤IO的消耗,對實時交易造成一定的沖擊;
  • 時間間隔比較大,由于每個訂單創(chuàng)建的時間不一致,所以上邊的一致性要求很難達(dá)到,舉例如下:
image

假設(shè)30分鐘訂單超時自動關(guān)閉,定時任務(wù)的執(zhí)行間隔時間為30分鐘:

  1. 我們在第5分鐘進(jìn)行下單操作;
  2. 當(dāng)時間來到第30分鐘時,定時任務(wù)執(zhí)行一次,但是我們的訂單未滿足條件,不執(zhí)行;
  3. 當(dāng)時間來到第35分鐘時,訂單達(dá)到關(guān)閉條件,但是定時任務(wù)未執(zhí)行,所以不執(zhí)行;
  4. 當(dāng)時間來到第60分鐘時,開始執(zhí)行我們的訂單關(guān)閉操作,而此時,誤差達(dá)到25分鐘。

經(jīng)此種種,我們需要舍棄該方式。

延時隊列

為了滿足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊列:RabbitMQ。盡管它本身并沒有提供延時隊列的功能,但是我們可以利用它的存活時間和死信交換機的特性來間接實現(xiàn)。

首先我們先來簡單介紹下什么是存活時間?什么是死信交換機?

存活時間

存活時間的全拼是Time To Live,簡稱 TTL。它既支持對消息本身進(jìn)行設(shè)置(延遲隊列的關(guān)鍵),又支持對隊列進(jìn)行設(shè)置(該隊列中所有消息存在相同的過期時間)。

  • 對消息本身進(jìn)行設(shè)置:即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判定的;
  • 對隊列進(jìn)行設(shè)置:一旦消息過期,就會從隊列中抹去;

如果同時使用這兩種方法,那么以過期時間的那個數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過期時間還沒有被消費,那么該消息就“死了”,我們把它稱為 死信 消息。

消息變?yōu)樗佬诺臈l件:

  • 消息被拒絕(basic.reject/basic.nack),并且requeue=false;
  • 消息的過期時間到期了;
  • 隊列達(dá)到最大長度;

隊列設(shè)置注意事項

  1. 隊列中該屬性的設(shè)置要在第一次聲明隊列的時候設(shè)置才有效,如果隊列一開始已存在且沒有這個屬性,則要刪掉隊列再重新聲明才可以;
  2. 隊列的 ttl 只能被設(shè)置為某個固定的值,一旦設(shè)置后則不能更改,否則會拋出異常;

死信交換機

死信交換機全拼Dead-Letter-Exchange,簡稱DLX。

當(dāng)消息在一個隊列中變成死信之后,如果這個消息所在的隊列設(shè)置了x-dead-letter-exchange參數(shù),那么它會被發(fā)送到x-dead-letter-exchange對應(yīng)值的交換機上,這個交換機就稱之為死信交換機,與這個死信交換器綁定的隊列就是死信隊列。

  • x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機;
  • x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認(rèn)使用消息本身的routing-key

死信隊列與普通隊列的區(qū)別就是它的RoutingKeyExchange需要作為參數(shù),綁定到正常的隊列上。

實戰(zhàn)教學(xué)

先來張圖感受下我們的整體思路

image
  1. 生產(chǎn)者發(fā)送帶有 ttl 的消息放入交換機路由到延時隊列中;
  2. 在延時隊列中綁定死信交換機與死信轉(zhuǎn)發(fā)的routing-key;
  3. 等延時隊列中的消息達(dá)到延時時間之后變成死信轉(zhuǎn)發(fā)到死信交換機并路由到死信隊列中;
  4. 最后供消費者消費。

我們在上文的基礎(chǔ)上進(jìn)行代碼實現(xiàn):

配置類

@Configuration
public class DelayQueueRabbitConfig {

    public static final String DLX_QUEUE = "queue.dlx";//死信隊列
    public static final String DLX_EXCHANGE = "exchange.dlx";//死信交換機
    public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信隊列與死信交換機綁定的routing-key

    public static final String ORDER_QUEUE = "queue.order";//訂單的延時隊列
    public static final String ORDER_EXCHANGE = "exchange.order";//訂單交換機
    public static final String ORDER_ROUTING_KEY = "routingkey.order";//延時隊列與訂單交換機綁定的routing-key

 /**
     * 定義死信隊列
     **/
    @Bean
    public Queue dlxQueue(){
        return new Queue(DLX_QUEUE,true);
    }

    /**
     * 定義死信交換機
     **/
    @Bean
    public DirectExchange dlxExchange(){
        return new DirectExchange(DLX_EXCHANGE, true, false);
    }

    /**
     * 死信隊列和死信交換機綁定
     * 設(shè)置路由鍵:routingkey.dlx
     **/
    @Bean
    Binding bindingDLX(){
        return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
    }

    /**
     * 訂單延時隊列
     * 設(shè)置隊列里的死信轉(zhuǎn)發(fā)到的DLX名稱
     * 設(shè)置死信在轉(zhuǎn)發(fā)時攜帶的 routing-key 名稱
     **/
    @Bean
    public Queue orderQueue() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", DLX_EXCHANGE);
        params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
        return new Queue(ORDER_QUEUE, true, false, false, params);
    }

    /**
     * 訂單交換機
     **/
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 把訂單隊列和訂單交換機綁定在一起
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
    }
}

發(fā)送消息

@RequestMapping("/order")
public class OrderSendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendMessage")
    public String sendMessage(){

        String delayTime = "10000";
        //將消息攜帶路由鍵值
        rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                "發(fā)送消息!",message->{
            message.getMessageProperties().setExpiration(delayTime);
            return message;
        });
        return "ok";
    }

}

消費消息

@Component
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽隊列名稱
public class OrderMQReciever {

    @RabbitHandler
    public void process(String message){
        System.out.println("OrderMQReciever接收到的消息是:"+ message);
    }
}

測試

通過調(diào)用接口,發(fā)現(xiàn)10秒之后才會消費消息

image

問題升級

由于開發(fā)環(huán)境和測試環(huán)境使用的是同一個交換機和隊列,所以發(fā)送的延時時間都是30分鐘。但是為了在測試環(huán)境讓測試同學(xué)方便測試,故手動將測試環(huán)境的時間改為了1分鐘。

問題復(fù)現(xiàn)

接著問題就來了:延時時間為1分鐘的消息并沒有立即被消費,而是等30分鐘的消息被消費完之后才被消費了。至于原因,我們下邊再分析,先用代碼來給大家復(fù)現(xiàn)下該問題。

@GetMapping("/sendManyMessage")
public String sendManyMessage(){
    send("延遲消息睡10秒",10000+"");
    send("延遲消息睡2秒",2000+"");
    send("延遲消息睡5秒",5000+"");
    return "ok";
}

private void send(String msg, String delayTime){
 rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE, 
                                  DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
                                  msg,message->{
                                      message.getMessageProperties().setExpiration(delayTime);
                                      return message;
                                  });
}

執(zhí)行結(jié)果如下:

OrderMQReciever接收到的消息是:延遲消息睡10秒
OrderMQReciever接收到的消息是:延遲消息睡2秒
OrderMQReciever接收到的消息是:延遲消息睡5秒

原因就是延時隊列也滿足隊列先進(jìn)先出的特征,當(dāng)10秒的消息未出隊列時,后邊的消息不能順利出隊,造成后邊的消息阻塞了,未能達(dá)到精準(zhǔn)延時。

問題解決

我們可以利用x-delay-message插件來解決該問題

消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)

image
  1. 生產(chǎn)者發(fā)送消息到交換機時,并不會立即進(jìn)入,而是先將消息持久化到 Mnesia(一個分布式數(shù)據(jù)庫管理系統(tǒng));
  2. 插件將會嘗試確認(rèn)消息是否過期;
  3. 如果消息過期,消息會通過 x-delayed-type 類型標(biāo)記的交換機投遞至目標(biāo)隊列,供消費者消費;

實踐

官網(wǎng)下載,我這邊使用的是v3.8.0.ez,將文件下載下來放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。

image
image

出現(xiàn)如圖所示,代表安裝成功。

配置類

@Configuration
public class XDelayedMessageConfig {

    public static final String DIRECT_QUEUE = "queue.direct";//隊列
    public static final String DELAYED_EXCHANGE = "exchange.delayed";//延遲交換機
    public static final String ROUTING_KEY = "routingkey.bind";//綁定的routing-key

    /**
     * 定義隊列
     **/
    @Bean
    public Queue directQueue(){
        return new Queue(DIRECT_QUEUE,true);
    }

    /**
     * 定義延遲交換機
     * args:根據(jù)該參數(shù)進(jìn)行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機具有相同的路由行為,
     * 如果想要不同的路由行為,可以更換現(xiàn)有的交換類型如:“topic”
     * 交換機類型為 x-delayed-message
     **/
    @Bean
    public CustomExchange delayedExchange(){
        Map<String, Object> args = new HashMap<String, Object>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
    }

    /**
     * 隊列和延遲交換機綁定
     **/
    @Bean
    public Binding orderBinding() {
        return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
    }

}

發(fā)送消息

@RestController
@RequestMapping("/delayed")
public class DelayedSendMessageController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("/sendManyMessage")
    public String sendManyMessage(){

        send("延遲消息睡10秒",10000);
        send("延遲消息睡2秒",2000);
        send("延遲消息睡5秒",5000);
        return "ok";
    }

    private void send(String msg, Integer delayTime){
        //將消息攜帶路由鍵值
        rabbitTemplate.convertAndSend(
                XDelayedMessageConfig.DELAYED_EXCHANGE,
                XDelayedMessageConfig.ROUTING_KEY,
                msg,
                message->{
                    message.getMessageProperties().setDelay(delayTime);
                    return message;
                });
    }
}

消費消息

@Component
@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽隊列名稱
public class DelayedMQReciever {

    @RabbitHandler
    public void process(String message){
        System.out.println("DelayedMQReciever接收到的消息是:"+ message);
    }
}

測試

DelayedMQReciever接收到的消息是:延遲消息睡2秒
DelayedMQReciever接收到的消息是:延遲消息睡5秒
DelayedMQReciever接收到的消息是:延遲消息睡10秒

這樣我們的問題就順利解決了。

局限性

延遲的消息存儲在一個Mnesia表中,當(dāng)前節(jié)點上只有一個磁盤副本,它們將在節(jié)點重啟后存活。

雖然觸發(fā)計劃交付的計時器不會持久化,但它將在節(jié)點啟動時的插件激活期間重新初始化。顯然,集群中只有一個預(yù)定消息的副本意味著丟失該節(jié)點或禁用其上的插件將丟失駐留在該節(jié)點上的消息。

該插件的當(dāng)前設(shè)計并不適合延遲消息數(shù)量較多的場景(如數(shù)萬條或數(shù)百萬條),另外該插件的一個可變性來源是依賴于 Erlang 計時器,在系統(tǒng)中使用了一定數(shù)量的長時間計時器之后,它們開始爭用調(diào)度程序資源,并且時間漂移不斷累積。

回復(fù)“rabbitMQ”獲取源碼!

今天的內(nèi)容就講這了,如果你有不同的意見或者更好的idea,歡迎聯(lián)系阿Q,阿Q將持續(xù)更新java實戰(zhàn)方面的文章,感興趣的可以關(guān)注下阿Q,也可以來技術(shù)群討論問題呦,點贊之交值得深交!

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容