Redis總結(jié)(四)redis實(shí)現(xiàn)異步隊(duì)列及延遲隊(duì)列

本文將使用redis實(shí)現(xiàn)異步隊(duì)列以及延遲隊(duì)列,雖然我們?cè)趯?shí)際開(kāi)發(fā)中經(jīng)常會(huì)有專業(yè)的消息隊(duì)列中間件,如:rabbitmq等,但是如果系統(tǒng)中沒(méi)有mq中間件,又懶得維護(hù)mq中間件,那么我們可以通過(guò)redis來(lái)實(shí)現(xiàn)

因?yàn)閞edis并不是專業(yè)實(shí)現(xiàn)隊(duì)列的中間件,因此在實(shí)現(xiàn)方式上還是會(huì)存在一些問(wèn)題,還是比不上rabbitmq之類的中間件,那么我為什么還寫這篇文章,是因?yàn)橥ㄟ^(guò)使用redis來(lái)實(shí)現(xiàn)隊(duì)列及延遲隊(duì)列,可以讓我對(duì)redis的數(shù)據(jù)結(jié)構(gòu)更加熟悉,使用的更加順手。

一、使用redis實(shí)現(xiàn)異步隊(duì)列

redis實(shí)現(xiàn)隊(duì)列主要是使用數(shù)據(jù)結(jié)構(gòu)中的list,因?yàn)樗前凑杖腠樞蚺判虻慕Y(jié)構(gòu),我們就可以按照左邊塞入,右邊取出的方式來(lái)實(shí)現(xiàn)先入先出的隊(duì)列需求

具體實(shí)現(xiàn)如下:

public class RedisClient {
    @Resource
    private JedisPool jedisPool;
   
      /**
     * 向List頭部追加記錄
     * @param key
     * @param value
     * @return 記錄總數(shù)
     */
    public void rpush(String key, String value) {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.rpush(key,value);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    } 
}


//controller寫一個(gè)寫入隊(duì)列的方法
@PostMapping("insert")
public void  setList(@RequestBody QueueTest queueTest){
    System.out.println("塞入一條數(shù)據(jù)");
    redisClient.rpush("queueTest", JSON.toJSONString(queueTest));
}

此時(shí)我們已經(jīng)實(shí)現(xiàn)了從右邊往隊(duì)列中塞數(shù)據(jù),那么我們接下來(lái)只需要從左邊將數(shù)據(jù)取出,即可實(shí)現(xiàn)先入先出

我們?cè)陧?xiàng)目啟動(dòng)的時(shí)候創(chuàng)建一個(gè)線程來(lái)消費(fèi)數(shù)據(jù),代碼如下:

@Service
public class TestRunner implements ApplicationRunner {
    @Autowired
    private RedisClient redisClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        new Thread(new Worker()).start();
    }


    private  class  Worker implements Runnable {

        @Override
        public void run() {
            while (true){
                List<String> queueTest = redisClient.blpop(0,"queueTest");
                System.out.println("開(kāi)始消費(fèi)了");
                if(queueTest != null){
                    System.out.println(JSON.toJSONString(queueTest));
                }
            }
        }
    }
}

這樣我們就可以實(shí)現(xiàn)一個(gè)簡(jiǎn)單的隊(duì)列,至于彈出時(shí)為什么使用blpop的方法而不是lpop方法是因?yàn)槿绻褂胠pop方法的話會(huì)造成如果隊(duì)列中沒(méi)有數(shù)據(jù),連接一直空置的情況,所以使用blpop的方法可以在沒(méi)有數(shù)據(jù)的時(shí)候?qū)⑦B接阻塞,在有數(shù)據(jù)時(shí)再讀取
。

當(dāng)然使用blpop同樣可能存在長(zhǎng)時(shí)間沒(méi)有數(shù)據(jù),redis將連接斷掉的情況,因此就需要我們?cè)谑褂脮r(shí)將這種情況的異常也考慮進(jìn)去,在catch中將連接重新建立之類。

測(cè)試過(guò)程如下:


postman請(qǐng)求.png
結(jié)果.png

上面我們實(shí)現(xiàn)的只是簡(jiǎn)單的異步隊(duì)列demo,在項(xiàng)目中使用的話我們可以靈活擴(kuò)展,如:我們需要根據(jù)value中的某一個(gè)值來(lái)做不同的處理,那么我們可以使用工廠模式,寫一個(gè)工廠類,在消費(fèi)消息時(shí),通過(guò)工廠類來(lái)分配不同的消費(fèi)方式。

缺點(diǎn): redis實(shí)現(xiàn)消息我們消費(fèi)的時(shí)候可能無(wú)法得到消息的ack,因?yàn)槲覀兛赡苄枰约壕S護(hù)一個(gè)消息消費(fèi)的表來(lái)記錄消費(fèi)的情況,做后續(xù)處理

二、redis實(shí)現(xiàn)延遲隊(duì)列

我們使用redis實(shí)現(xiàn)延遲隊(duì)列主要時(shí)事使用zset數(shù)據(jù)格式,主要是借助它的score參數(shù),因?yàn)閦set是按照score進(jìn)行排序的,實(shí)現(xiàn)如下:

我們這里實(shí)現(xiàn)一個(gè)簡(jiǎn)單的五秒延遲執(zhí)行:
1、首先我們插入一條數(shù)據(jù),將score設(shè)置當(dāng)前時(shí)間戳加上五秒,如下

    @PostMapping("setZset")
    public void setZset(@RequestBody QueueTest queueTest){
        System.out.println("塞入一條數(shù)據(jù)");
        redisClient.zadd("zqueueTest1",System.currentTimeMillis() + 5000,JSON.toJSONString(queueTest));
    }

2、我們?cè)O(shè)置一個(gè)跑批去讀取在當(dāng)前時(shí)間或者當(dāng)前時(shí)間之前的值,代碼如下:

@EnableScheduling
@Component
public class TestTask {
    @Autowired
    private RedisClient redisClient;

    @Scheduled(cron = "* * * * * ?")
    public void popSet(){
        Set<String> zqueueTest = redisClient.zrangeWithScores("zqueueTest1", 0, System.currentTimeMillis());
        //只取一條數(shù)據(jù)
        //Set<String> zqueueTest1 = redisClient.zrangeWithScores("zqueueTest1", 0, System.currentTimeMillis(),0,1);
        for (String item : zqueueTest){
            System.out.println(JSON.toJSONString(item));
            redisClient.zrem("zqueueTest1",item);
        }
    }
}

這樣我們就可以實(shí)現(xiàn)zset里面是按照時(shí)間的大小進(jìn)行排序的,我們只需要取出當(dāng)前時(shí)間以前的數(shù)據(jù),就可以實(shí)現(xiàn)延遲的效果。

測(cè)試如下:


延遲隊(duì)列postman請(qǐng)求.png
延遲結(jié)果.png

通過(guò)上圖的時(shí)間可以看出來(lái)我們基本上試下了五秒的延遲

缺點(diǎn):

1、因?yàn)槭桥芘x取所以存在一定的時(shí)間延遲

2、對(duì)于并發(fā)的支持性不強(qiáng)

總結(jié): 上面就是redis實(shí)現(xiàn)隊(duì)列的一些demo,這里寫出來(lái)只是給大家提供一些思路,但是大家也可以看出來(lái)這種實(shí)現(xiàn)方式還是存在一定的問(wèn)題,所以真正用的時(shí)候還是使用專業(yè)的mq中間件,后續(xù)也會(huì)有mq的系列文章大家也可以參考借鑒下。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容