使用redis實(shí)現(xiàn)一個(gè)簡單的延時(shí)消息隊(duì)列

延時(shí)消息隊(duì)列可以使用redis的zset來實(shí)現(xiàn),將消息序列化為一個(gè)字符串作為zset的value,消息到期時(shí)間作為zset的score,然后使用多線程輪循zset獲取到期的任務(wù)進(jìn)行處理,多線程是為了保證可用性,一個(gè)線程掛掉之后其他線程可以繼續(xù)處理。因?yàn)橛卸嗑€程,所有需要考慮并發(fā)搶任務(wù),確保任務(wù)不會被多次執(zhí)行。

Java版本代碼示例:

public class RedisDelayingQueue {

    private RedisTemplate redisTemplate;
    private String queueKey; //zset鍵

    public RedisDelayingQueue(RedisTemplate redisTemplate, String queueKey) {
        this.redisTemplate = redisTemplate;
        this.queueKey = queueKey;
    }

    //存數(shù)據(jù)方法
    public void delay(String msg){
        RedisTaskItem item = new RedisTaskItem();
        item.id = UUIDUtil.getUUID();
        item.msg = msg;
        String s = JSONObject.toJSONString(item);
        redisTemplate.opsForZSet().add(queueKey,s,Double.valueOf(System.currentTimeMillis() + 5000));
    }
    //取數(shù)據(jù)方法
    public void loop(){
        while (!Thread.interrupted()){
            //每次只取一條
            Set<Object> values = redisTemplate.opsForZSet().rangeByScore(queueKey,0L,System.currentTimeMillis(),0L,1L);
            if(values.isEmpty()){
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    break;
                }
                continue;
            }
            String s = values.iterator().next().toString();
            //loop方法可能會被多個(gè)線程調(diào)用,所以要通過remove來決定唯一的屬主
            if(redisTemplate.opsForZSet().remove(queueKey,s)>0){
                //搶到了 反序列化
                RedisTaskItem taskItem = JSONObject.parseObject(s,RedisTaskItem.class);
                System.out.println(taskItem.msg);
            }
        }
    }
}
//redis數(shù)據(jù)實(shí)體類
public class RedisTaskItem {

    public String id;
    public String msg;
}

測試方法

    @Resource(name = "redisTemplate0")
    private RedisTemplate<String, Object> redisTemplate;//使用redisTemplate操作redis
    @Test
    public void redisTest(){
        RedisDelayingQueue queue = new RedisDelayingQueue(redisTemplate,"q-demo");
        Thread product = new Thread(){
            @Override
            public void run() {
                for (int i = 0; i < 10; i++) {
                    queue.delay("choose" + i);
                }
            }
        };
        Thread consumer = new Thread(){
            @Override
            public void run() {
                queue.loop();
            }
        };

        System.out.println("開始時(shí)間為" + new Date());

        product.start();
        consumer.start();
        try {
            product.join();
            Thread.sleep(5100);
            consumer.interrupt();
            consumer.join();
            System.out.println("結(jié)束時(shí)間為" + new Date());
        }catch (InterruptedException e){
            System.out.println("error>>>>>>>>>>>>>>>>>>>>>>>>");
        }

    }

這種方式簡單的實(shí)現(xiàn)了一個(gè)延時(shí)消息隊(duì)列,但是同一個(gè)任務(wù)可能會被多個(gè)線程使用remove爭搶,再決定歸屬,造成了資源浪費(fèi),可以考慮使用lua scripting來進(jìn)行優(yōu)化,將rangeByScore和remove一同挪到服務(wù)器端進(jìn)行原子化操作,這樣多個(gè)進(jìn)程爭搶任務(wù)是不會造成這種浪費(fèi)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容