本文將使用redis實(shí)現(xiàn)異步隊(duì)列以及延遲隊(duì)列,雖然我們?cè)趯?shí)際開(kāi)發(fā)中經(jīng)常會(huì)有專業(yè)的消息隊(duì)列中間件,如:rabbitmq等,但是如果系統(tǒng)中沒(méi)有mq中間件,又懶得維護(hù)mq中間件,那么我們可以通過(guò)redis來(lái)實(shí)現(xiàn)
因?yàn)閞edis并不是專業(yè)實(shí)現(xiàn)隊(duì)列的中間件,因此在實(shí)現(xiàn)方式上還是會(huì)存在一些問(wèn)題,還是比不上rabbitmq之類的中間件,那么我為什么還寫這篇文章,是因?yàn)橥ㄟ^(guò)使用redis來(lái)實(shí)現(xiàn)隊(duì)列及延遲隊(duì)列,可以讓我對(duì)redis的數(shù)據(jù)結(jié)構(gòu)更加熟悉,使用的更加順手。
一、使用redis實(shí)現(xiàn)異步隊(duì)列
redis實(shí)現(xiàn)隊(duì)列主要是使用數(shù)據(jù)結(jié)構(gòu)中的list,因?yàn)樗前凑杖腠樞蚺判虻慕Y(jié)構(gòu),我們就可以按照左邊塞入,右邊取出的方式來(lái)實(shí)現(xiàn)先入先出的隊(duì)列需求
具體實(shí)現(xiàn)如下:
public class RedisClient {
@Resource
private JedisPool jedisPool;
/**
* 向List頭部追加記錄
* @param key
* @param value
* @return 記錄總數(shù)
*/
public void rpush(String key, String value) {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.rpush(key,value);
} catch (Exception e) {
throw e;
} finally {
if(jedis != null){
jedis.close();
}
}
}
}
//controller寫一個(gè)寫入隊(duì)列的方法
@PostMapping("insert")
public void setList(@RequestBody QueueTest queueTest){
System.out.println("塞入一條數(shù)據(jù)");
redisClient.rpush("queueTest", JSON.toJSONString(queueTest));
}
此時(shí)我們已經(jīng)實(shí)現(xiàn)了從右邊往隊(duì)列中塞數(shù)據(jù),那么我們接下來(lái)只需要從左邊將數(shù)據(jù)取出,即可實(shí)現(xiàn)先入先出
我們?cè)陧?xiàng)目啟動(dòng)的時(shí)候創(chuàng)建一個(gè)線程來(lái)消費(fèi)數(shù)據(jù),代碼如下:
@Service
public class TestRunner implements ApplicationRunner {
@Autowired
private RedisClient redisClient;
@Override
public void run(ApplicationArguments args) throws Exception {
new Thread(new Worker()).start();
}
private class Worker implements Runnable {
@Override
public void run() {
while (true){
List<String> queueTest = redisClient.blpop(0,"queueTest");
System.out.println("開(kāi)始消費(fèi)了");
if(queueTest != null){
System.out.println(JSON.toJSONString(queueTest));
}
}
}
}
}
這樣我們就可以實(shí)現(xiàn)一個(gè)簡(jiǎn)單的隊(duì)列,至于彈出時(shí)為什么使用blpop的方法而不是lpop方法是因?yàn)槿绻褂胠pop方法的話會(huì)造成如果隊(duì)列中沒(méi)有數(shù)據(jù),連接一直空置的情況,所以使用blpop的方法可以在沒(méi)有數(shù)據(jù)的時(shí)候?qū)⑦B接阻塞,在有數(shù)據(jù)時(shí)再讀取
。
當(dāng)然使用blpop同樣可能存在長(zhǎng)時(shí)間沒(méi)有數(shù)據(jù),redis將連接斷掉的情況,因此就需要我們?cè)谑褂脮r(shí)將這種情況的異常也考慮進(jìn)去,在catch中將連接重新建立之類。
測(cè)試過(guò)程如下:


上面我們實(shí)現(xiàn)的只是簡(jiǎn)單的異步隊(duì)列demo,在項(xiàng)目中使用的話我們可以靈活擴(kuò)展,如:我們需要根據(jù)value中的某一個(gè)值來(lái)做不同的處理,那么我們可以使用工廠模式,寫一個(gè)工廠類,在消費(fèi)消息時(shí),通過(guò)工廠類來(lái)分配不同的消費(fèi)方式。
缺點(diǎn): redis實(shí)現(xiàn)消息我們消費(fèi)的時(shí)候可能無(wú)法得到消息的ack,因?yàn)槲覀兛赡苄枰约壕S護(hù)一個(gè)消息消費(fèi)的表來(lái)記錄消費(fèi)的情況,做后續(xù)處理
二、redis實(shí)現(xiàn)延遲隊(duì)列
我們使用redis實(shí)現(xiàn)延遲隊(duì)列主要時(shí)事使用zset數(shù)據(jù)格式,主要是借助它的score參數(shù),因?yàn)閦set是按照score進(jìn)行排序的,實(shí)現(xiàn)如下:
我們這里實(shí)現(xiàn)一個(gè)簡(jiǎn)單的五秒延遲執(zhí)行:
1、首先我們插入一條數(shù)據(jù),將score設(shè)置當(dāng)前時(shí)間戳加上五秒,如下
@PostMapping("setZset")
public void setZset(@RequestBody QueueTest queueTest){
System.out.println("塞入一條數(shù)據(jù)");
redisClient.zadd("zqueueTest1",System.currentTimeMillis() + 5000,JSON.toJSONString(queueTest));
}
2、我們?cè)O(shè)置一個(gè)跑批去讀取在當(dāng)前時(shí)間或者當(dāng)前時(shí)間之前的值,代碼如下:
@EnableScheduling
@Component
public class TestTask {
@Autowired
private RedisClient redisClient;
@Scheduled(cron = "* * * * * ?")
public void popSet(){
Set<String> zqueueTest = redisClient.zrangeWithScores("zqueueTest1", 0, System.currentTimeMillis());
//只取一條數(shù)據(jù)
//Set<String> zqueueTest1 = redisClient.zrangeWithScores("zqueueTest1", 0, System.currentTimeMillis(),0,1);
for (String item : zqueueTest){
System.out.println(JSON.toJSONString(item));
redisClient.zrem("zqueueTest1",item);
}
}
}
這樣我們就可以實(shí)現(xiàn)zset里面是按照時(shí)間的大小進(jìn)行排序的,我們只需要取出當(dāng)前時(shí)間以前的數(shù)據(jù),就可以實(shí)現(xiàn)延遲的效果。
測(cè)試如下:


通過(guò)上圖的時(shí)間可以看出來(lái)我們基本上試下了五秒的延遲
缺點(diǎn):
1、因?yàn)槭桥芘x取所以存在一定的時(shí)間延遲
2、對(duì)于并發(fā)的支持性不強(qiáng)
總結(jié): 上面就是redis實(shí)現(xiàn)隊(duì)列的一些demo,這里寫出來(lái)只是給大家提供一些思路,但是大家也可以看出來(lái)這種實(shí)現(xiàn)方式還是存在一定的問(wèn)題,所以真正用的時(shí)候還是使用專業(yè)的mq中間件,后續(xù)也會(huì)有mq的系列文章大家也可以參考借鑒下。