redis實(shí)現(xiàn)延時(shí)隊(duì)列

需求是想用redis做一個(gè)延時(shí)的隊(duì)列,每次內(nèi)容必須在一定時(shí)間后才能被取出,

比如說:有未支付訂單要在一定時(shí)間內(nèi)關(guān)閉,假設(shè)為30秒,存入的時(shí)候我們使用redis的有序集合進(jìn)行添加,用當(dāng)前時(shí)間戳加上30秒來排序(zadd),然后每次消費(fèi)者輪詢的時(shí)候就只取出開始時(shí)間0到當(dāng)前時(shí)間這個(gè)時(shí)間段(zrangeByScore)

1.生產(chǎn)類 Producer.java

  import redis.clients.jedis.Jedis;

  import redis.clients.jedis.JedisPool;

  import redis.clients.jedis.JedisPoolConfig;

  import redis.clients.jedis.Transaction;

  public class Producer {

    static final String QueueName = "delay-queue";

    public static void main(String[] args)throws InterruptedException {

        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);

        Jedis jedis = pool.getResource();

        try {

            int count = 0;

            while (true) {

                String message = "Message #" + count;

                String key = "foobar:" + count;

                System.out.println("Queueing message: " + message);

                queueMessage(jedis, QueueName, key, message, 5);

               // delete every 5th Action

                if (count != 0 && count % 5 == 0) {

                    System.out.println("Deleting msg with id " + count);

                    jedis.del(key);

                }

                count += 1;

               Thread.sleep(3000L);

            }

        } finally {

            jedis.close();

            pool.destroy();

        }

    }

    private static void queueMessage(Jedis jedis, String queue, String key, String message, Integer delay) {

        long time = System.currentTimeMillis() / 1000 + delay;//當(dāng)前時(shí)間的秒數(shù)加上要延時(shí)的秒數(shù)

        Transaction t = jedis.multi();
        t.zadd(queue, time, key);
        t.set(key, message);
        t.exec();

}

}

2.消費(fèi)者類 Consumer.java

代碼如下:

public class Consumer {

public static void main(String[] args) throws InterruptedException {

    JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);

    Jedis jedis = pool.getResource();

    try {

        while (true) {

            getMessages(jedis, Producer.QueueName);

            Thread.sleep(1000L);

        }

    } finally {

        jedis.close();

        pool.destroy();

    }

}

private static void getMessages(Jedis jedis, String queue) {

    int startTime = 0;

    long endTime = System.currentTimeMillis() / 1000;

    Transaction t = jedis.multi();


    Response<Set<String>> setResponse = t.zrangeByScore(queue, startTime, endTime);//在startTime和endTime之間的數(shù)

    t.zremrangeByScore(queue, startTime, endTime);//移除所有startTime-endTime中的所有成員

    t.exec();

    List<String> keys = new ArrayList();

    keys.addAll(setResponse.get());//將所有的key添加到list中

    String[] keyArray = keys.toArray(new String[keys.size()]);//然后轉(zhuǎn)換成數(shù)組

    if (keyArray.length > 0) {

        Transaction tMessage = jedis.multi();

        Response<List<String>>  messageResponse = tMessage.mget(keyArray);//獲取多個(gè)鍵值對(duì)

        tMessage.del(keyArray);

        tMessage.exec();

        List<String> messages = messageResponse.get();

        for (int i = 0; i < messages.size(); i++) {

            String key = keys.get(i);

            String message = messages.get(i);

            System.out.print("Received key: " + key + ". ");

            if (message == null) {

                System.out.println("Message for key " + key + " is gone!");

            } else {

                System.out.println("Message for key " + key + " is " + message);

            }

        }

    }

}

}

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容