基于Redisson實(shí)現(xiàn)延遲隊(duì)列

Redisson實(shí)現(xiàn)延遲隊(duì)列

1.場(chǎng)景介紹

假設(shè)有這樣一個(gè)場(chǎng)景,我們有一個(gè)訂單,或者工單等等。需要在超時(shí)30分鐘后進(jìn)行關(guān)閉。這個(gè)時(shí)候我們最先想到的應(yīng)該是采用定時(shí)任務(wù)去進(jìn)行輪訓(xùn)判斷,但是呢,每個(gè)訂單的創(chuàng)建時(shí)間是不一樣的,這個(gè)時(shí)間怎么確定才好呢,5分鐘。。1分鐘。。執(zhí)行一次嗎。這樣就會(huì)非常影響性能。且時(shí)間誤差很大?;谝陨蠘I(yè)務(wù)需要我們想到了有以下解決方案。

  • JDK延遲隊(duì)列,但是數(shù)據(jù)都在內(nèi)存中,重啟后什么都沒(méi)了。
  • MQ中的延遲隊(duì)列,比如RocketMQ。
  • 基于Redisson的延遲隊(duì)列

2.JDK延遲隊(duì)列

我們首先來(lái)回顧下JDK的延遲隊(duì)列

基于延遲隊(duì)列要實(shí)現(xiàn)接口Delayed,并且實(shí)現(xiàn)getDelay方法和compareTo方法

  • getDelay主要是計(jì)算返回剩余時(shí)間,單位時(shí)間戳(毫秒)延遲任務(wù)是否到時(shí)就是按照這個(gè)方法判斷如果返回的是負(fù)數(shù)則說(shuō)明到期否則還沒(méi)到期
  • compareTo主要是自定義實(shí)現(xiàn)比較方法返回 1 0 -1三個(gè)參數(shù)
@ToString
public class MyDelayed<T> implements Delayed {
    /**
     * 延遲時(shí)間
     */
    Long delayTime;

    /**
     * 過(guò)期時(shí)間
     */
    Long expire;
    /**
     * 數(shù)據(jù)
     */
    T t;

    public MyDelayed(long delayTime, T t) {
        this.delayTime = delayTime;
        // 過(guò)期時(shí)間 = 當(dāng)前時(shí)間 + 延遲時(shí)間
        this.expire = System.currentTimeMillis() + delayTime;
        this.t = t;
    }

    /**
     * 剩余時(shí)間 = 到期時(shí)間 - 當(dāng)前時(shí)間
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.expire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * 優(yōu)先級(jí)規(guī)則:兩個(gè)任務(wù)比較,時(shí)間短的優(yōu)先執(zhí)行
     */
    @Override
    public int compareTo(Delayed o) {
        long f = this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS);
        return (int) f;
    }

訂單的實(shí)體,為了簡(jiǎn)單就定義基礎(chǔ)幾個(gè)字段。

@Data
public class OrderInfo implements Serializable {
    private static final long serialVersionUID = -2837036864073566484L;
    /**
     * 訂單id
     */
    private Long id;
    
    /**
     * 訂單金額
     */
    private Double salary;

    /**
     * 訂單創(chuàng)建時(shí)間   對(duì)于java8LocalDateTime 以下注解序列化反序列化用到
     */
    @JsonDeserialize(using = LocalDateTimeDeserializer.class)
    @JsonSerialize(using = LocalDateTimeSerializer.class)
    private LocalDateTime createTime;

}

為了簡(jiǎn)單我們暫且定義延遲時(shí)間為10s

public static void main(String[] args) throws InterruptedException {
        OrderInfo orderInfo = new OrderInfo();
        orderInfo.setCreateTime(LocalDateTimeUtil.parse("2022-07-01 15:00:00", "yyyy-MM-dd HH:mm:ss"));
        MyDelayed<OrderInfo> myDelayed = new MyDelayed<>(10000L,orderInfo);
        DelayQueue<MyDelayed<OrderInfo>> queue = new DelayQueue<>();
        queue.add(myDelayed);
       
        System.out.println(queue.take().getT().getCreateTime());
        System.out.println("當(dāng)前時(shí)間:" + LocalDateTime.now());
    }

輸出結(jié)果

2022-07-01T15:00
當(dāng)前時(shí)間:2022-07-01T15:10:37.375

3.基于Redisson的延遲隊(duì)列

當(dāng)然今天的主角是它了,我們主要圍繞著基于Redisson的延遲隊(duì)列來(lái)說(shuō)。

其實(shí)Redisson延遲隊(duì)列內(nèi)部也是基于redis來(lái)實(shí)現(xiàn)的,我們先來(lái)進(jìn)行整合使用看看效果。基于springboot

1.依賴(lài):

 <dependency>
      <groupId>org.redisson</groupId>
      <artifactId>redisson-spring-boot-starter</artifactId>
      <version>3.16.7</version>
 </dependency>

2.創(chuàng)建redisson.yml

# 單節(jié)點(diǎn)配置
singleServerConfig:
  # 連接空閑超時(shí),單位:毫秒
  idleConnectionTimeout: 10000
  # 連接超時(shí),單位:毫秒
  connectTimeout: 10000
  # 命令等待超時(shí),單位:毫秒
  timeout: 3000
  # 命令失敗重試次數(shù),如果嘗試達(dá)到 retryAttempts(命令失敗重試次數(shù)) 仍然不能將命令發(fā)送至某個(gè)指定的節(jié)點(diǎn)時(shí),將拋出錯(cuò)誤。
  # 如果嘗試在此限制之內(nèi)發(fā)送成功,則開(kāi)始啟用 timeout(命令等待超時(shí)) 計(jì)時(shí)。
  retryAttempts: 3
  # 命令重試發(fā)送時(shí)間間隔,單位:毫秒
  retryInterval: 1500
  # 密碼
  password:
  # 單個(gè)連接最大訂閱數(shù)量
  subscriptionsPerConnection: 5
  # 客戶(hù)端名稱(chēng)
  clientName: null
  # 節(jié)點(diǎn)地址
  address: redis://127.0.0.1:6379
  # 發(fā)布和訂閱連接的最小空閑連接數(shù)
  subscriptionConnectionMinimumIdleSize: 1
  # 發(fā)布和訂閱連接池大小
  subscriptionConnectionPoolSize: 50
  # 最小空閑連接數(shù)
  connectionMinimumIdleSize: 32
  # 連接池大小
  connectionPoolSize: 64
  # 數(shù)據(jù)庫(kù)編號(hào)
  database: 0
  # DNS監(jiān)測(cè)時(shí)間間隔,單位:毫秒
  dnsMonitoringInterval: 5000
# 線(xiàn)程池?cái)?shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
#threads: 0
# Netty線(xiàn)程池?cái)?shù)量,默認(rèn)值: 當(dāng)前處理核數(shù)量 * 2
#nettyThreads: 0
# 編碼
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 傳輸模式
transportMode : "NIO"

3.創(chuàng)建配置類(lèi)RedissonConfig,這里是為了讀取我們剛剛創(chuàng)建在配置文件中的yml

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient() throws IOException {
        Config config = Config.fromYAML(RedissonConfig.class.getClassLoader().getResource("redisson.yml"));;
        return Redisson.create(config);
    }
}

4.測(cè)試

        // redisson  延遲隊(duì)列
        // Redisson的延時(shí)隊(duì)列是對(duì)另一個(gè)隊(duì)列的再包裝,使用時(shí)要先將延時(shí)消息添加到延時(shí)隊(duì)列中,
        // 當(dāng)延時(shí)隊(duì)列中的消息達(dá)到設(shè)定的延時(shí)時(shí)間后,該延時(shí)消息才會(huì)進(jìn)行進(jìn)入到被包裝隊(duì)列中,因此,我們只需要對(duì)被包裝隊(duì)列進(jìn)行監(jiān)聽(tīng)即可。
        RBlockingQueue<OrderInfo> blockingFairQueue = redissonClient.getBlockingQueue("my-test");

        RDelayedQueue<OrderInfo> delayedQueue = redissonClient.getDelayedQueue(blockingFairQueue);

        OrderInfo orderInfo = new OrderInfo();
        // 訂單生成時(shí)間
        orderInfo.setCreateTime(LocalDateTime.now());
        // 10秒鐘以后將消息發(fā)送到指定隊(duì)列
        delayedQueue.offer(orderInfo, 10, TimeUnit.SECONDS);
        RBlockingQueue<OrderInfo> outQueue = redissonClient.getBlockingQueue("my-test");

        OrderInfo orderInfo2 = outQueue.take();
        System.out.println("訂單生成時(shí)間" + orderInfo2.getCreateTime());
        System.out.println("訂單關(guān)閉時(shí)間" + LocalDateTime.now());

        // 在該對(duì)象不再需要的情況下,應(yīng)該主動(dòng)銷(xiāo)毀。僅在相關(guān)的Redisson對(duì)象也需要關(guān)閉的時(shí)候可以不用主動(dòng)銷(xiāo)毀
        delayedQueue.destroy();

控制臺(tái)輸出:

訂單生成時(shí)間2022-07-01T15:22:10.304
訂單關(guān)閉時(shí)間2022-07-01T15:22:20.414

解決項(xiàng)目重新啟動(dòng)并不會(huì)消費(fèi)之前隊(duì)列里的消息的問(wèn)題,增加如下代碼

 redissonClient.getDelayedQueue(deque);

4.深入探究Redisson的延遲隊(duì)列實(shí)現(xiàn)原理

我們首先來(lái)了解兩個(gè)API

  • RBlockingQueue 就是目標(biāo)隊(duì)列

  • RDelayedQueue 就是中轉(zhuǎn)隊(duì)列

那么為什么會(huì)涉及到兩個(gè)隊(duì)列呢,這兩個(gè)隊(duì)列到底有什么用呢?

首先我們實(shí)際操作的是RBlockingQueue阻塞隊(duì)列,并不是RDelayedQueue隊(duì)列,RDelayedQueue對(duì)接主要是提供中間轉(zhuǎn)發(fā)的一個(gè)隊(duì)列,類(lèi)似中間商的意思

畫(huà)個(gè)小圖理解下

image-20220701153738619.png

這里不難看出我們都是基于RBlockingQueue 目標(biāo)隊(duì)列在進(jìn)行消費(fèi),而RDelayedQueue就是會(huì)把過(guò)期的消息放入到我們的目標(biāo)隊(duì)列中

我們只要從RBlockingQueue隊(duì)列中取數(shù)據(jù)即可。

好像還是不夠深入,我們接著看。我們知道Redisson是基于redis來(lái)實(shí)現(xiàn)的那么我們看看里面到底做了什么事

打開(kāi)redis客戶(hù)端,執(zhí)行monitor命令,看下在執(zhí)行上面訂單操作時(shí)redis到底執(zhí)行了哪些命令

monitor命令可以看到操作redis時(shí)執(zhí)行了什么命令

// 這里訂閱了一個(gè)固定的隊(duì)列 redisson_delay_queue_channel:{my-test},為了開(kāi)啟進(jìn)程里面的延時(shí)任務(wù)
"SUBSCRIBE" "redisson_delay_queue_channel:{my-test}"

// Redis Zrangebyscore 返回有序集合中指定分?jǐn)?shù)區(qū)間的成員列表。有序集成員按分?jǐn)?shù)值遞增(從小到大)次序排列。
// redisson_delay_queue_channel:{my-test} 是一個(gè)zset,當(dāng)有延時(shí)數(shù)據(jù)存入Redisson隊(duì)列時(shí),就會(huì)在此隊(duì)列中插入 數(shù)據(jù),排序分?jǐn)?shù)為延時(shí)的時(shí)間戳(毫秒 以下同理)。
"zrangebyscore" "redisson_delay_queue_timeout:{my-test}" "0" "1656404479385" "limit" "0" "100"

// 取出第一個(gè)數(shù),也就是判斷上面執(zhí)行的操作是否有下一頁(yè)。(因?yàn)閯倓傞_(kāi)始總是0的)除非是之前的操作(zrangebyscore)沒(méi)有取完
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0" "WITHSCORES"

// 往zset里面設(shè)置 數(shù)據(jù)過(guò)期的時(shí)間戳(當(dāng)前執(zhí)行的時(shí)間戳+延時(shí)的時(shí)間毫秒值)內(nèi)容就是訂單數(shù)據(jù)
"zadd" "redisson_delay_queue_timeout:{my-test}" "1656404489400" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 同步一份數(shù)據(jù)到list隊(duì)列
"rpush" "redisson_delay_queue:{my-test}" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 取出排序好的第一個(gè)數(shù)據(jù),也就是最臨近要觸發(fā)的數(shù)據(jù),然后發(fā)送通知
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0"

// 發(fā)送通知  之前第一步 SUBSCRIBE 訂閱了 客戶(hù)端收到通知后,就在自己進(jìn)程里面開(kāi)啟延時(shí)任務(wù)(HashedWheelTimer),到時(shí)間后就可以從redis取數(shù)據(jù)發(fā)送
"publish" "redisson_delay_queue_channel:{my-test}" "1656404489400"

// 這里就是取數(shù)據(jù)環(huán)節(jié)了
"BLPOP" "my-test" "0"

// 在范圍 0-過(guò)期時(shí)間  取出100條數(shù)據(jù)
"zrangebyscore" "redisson_delay_queue_timeout:{my-test}" "0" "1656404489444" "limit" "0" "100"

// 將上面取到的數(shù)據(jù)push到阻塞隊(duì)列 很顯然能看到 com.example.mytest.domain.OrderInfo 是我們的訂單數(shù)據(jù)
"rpush" "my-test" "{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 刪除數(shù)據(jù)
"lrem" "redisson_delay_queue:{my-test}" "1" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"
"zrem" "redisson_delay_queue_timeout:{my-test}" "b\x99M9\x9b\x0c\xd3\xc3\\\x00\x00\x00{\"@class\":\"com.example.mytest.domain.OrderInfo\",\"createTime\":[2022,6,28,16,21,19,400000000]}"

// 取zset第一個(gè)數(shù)據(jù),有的話(huà)繼續(xù)上面邏輯取數(shù)據(jù)
"zrange" "redisson_delay_queue_timeout:{my-test}" "0" "0" "WITHSCORES"

// 退訂
"UNSUBSCRIBE" "redisson_delay_queue_channel:{my-test}"

這里參考:https://zhuanlan.zhihu.com/p/343811173

我們知道Zset是按照分?jǐn)?shù)升序的也就是最小的分?jǐn)?shù)在最前面,基于這個(gè)特點(diǎn),大致明白,利用過(guò)期時(shí)間的時(shí)間戳作為分?jǐn)?shù)放入到Zset中,那么即將過(guò)期的就在最上面。

直接上個(gè)圖解


image-20220701155445411.png
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀(guān)點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容