需求是想用redis做一個延時的隊列,每次內容必須在一定時間后才能被取出,
比如說:有未支付訂單要在一定時間內關閉,假設為30秒,存入的時候我們使用redis的有序集合進行添加,用當前時間戳加上30秒來排序(zadd),然后每次消費者輪詢的時候就只取出開始時間0到當前時間這個時間段內的成員(zrangeByScore)。
1.生產類 Producer.java
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import redis.clients.jedis.Transaction;
/**
 * Demo producer for a Redis-backed delay queue.
 *
 * <p>Every three seconds it enqueues one message that becomes due five seconds
 * later. The message key is stored in a sorted set (scored by its due time in
 * epoch seconds) and the payload is stored under that key as a plain string.
 */
public class Producer {
    /** Name of the sorted set that holds the pending message keys. */
    static final String QueueName = "delay-queue";

    /**
     * Runs the endless produce loop against a local Redis instance.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping between messages
     */
    public static void main(String[] args) throws InterruptedException {
        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
        // The connection is acquired inside the try so that the pool is destroyed
        // even when getResource() itself fails (e.g. Redis unreachable, bad password).
        try (Jedis jedis = pool.getResource()) {
            int count = 0;
            while (true) {
                String message = "Message #" + count;
                String key = "foobar:" + count;
                System.out.println("Queueing message: " + message);
                queueMessage(jedis, QueueName, key, message, 5);
                // Delete the payload of every 5th message right after queueing it.
                // The key stays in the sorted set, so whoever reads it later finds no value.
                if (count != 0 && count % 5 == 0) {
                    System.out.println("Deleting msg with id " + count);
                    jedis.del(key);
                }
                count += 1;
                Thread.sleep(3000L);
            }
        } finally {
            pool.destroy();
        }
    }

    /**
     * Schedules a message: adds {@code key} to the sorted set {@code queue} and stores
     * {@code message} under {@code key}, both inside one MULTI/EXEC transaction.
     *
     * @param jedis   connection to use
     * @param queue   name of the sorted set
     * @param key     key under which the payload is stored; also the sorted-set member
     * @param message payload to store
     * @param delay   delay in seconds before the message becomes due
     */
    private static void queueMessage(Jedis jedis, String queue, String key, String message, int delay) {
        // Due time = current time in seconds plus the requested delay in seconds.
        long time = System.currentTimeMillis() / 1000 + delay;
        Transaction t = jedis.multi();
        t.zadd(queue, time, key);
        t.set(key, message);
        t.exec();
    }
}
2.消費者類 Consumer.java
代碼如下:
/**
 * Demo consumer for a Redis-backed delay queue.
 *
 * <p>Once per second it takes every key whose score (due time in epoch seconds)
 * lies between 0 and the current time out of the sorted set, then fetches and
 * deletes the corresponding payloads and prints them.
 *
 * <p>Besides the Jedis classes, this class needs {@code redis.clients.jedis.Response},
 * {@code java.util.ArrayList}, {@code java.util.List} and {@code java.util.Set} in scope.
 */
public class Consumer {
    /**
     * Runs the endless poll loop against a local Redis instance.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    public static void main(String[] args) throws InterruptedException {
        JedisPool pool = new JedisPool(new JedisPoolConfig(), "localhost", 6379, 2000, "123456", 1);
        // The connection is acquired inside the try so that the pool is destroyed
        // even when getResource() itself fails (e.g. Redis unreachable, bad password).
        try (Jedis jedis = pool.getResource()) {
            while (true) {
                getMessages(jedis, Producer.QueueName);
                Thread.sleep(1000L);
            }
        } finally {
            pool.destroy();
        }
    }

    /**
     * Drains all due entries from {@code queue} and prints their payloads.
     *
     * @param jedis connection to use
     * @param queue name of the sorted set holding the pending message keys
     */
    private static void getMessages(Jedis jedis, String queue) {
        int startTime = 0;
        long endTime = System.currentTimeMillis() / 1000;

        // Read the due members and remove the same score range in one MULTI/EXEC
        // transaction, so the range that is read is the range that is removed.
        Transaction t = jedis.multi();
        Response<Set<String>> setResponse = t.zrangeByScore(queue, startTime, endTime);
        t.zremrangeByScore(queue, startTime, endTime);
        t.exec();

        // Copy into a list: the keys are indexed in step with the MGET results below.
        List<String> keys = new ArrayList<>(setResponse.get());
        if (keys.isEmpty()) {
            return;
        }
        String[] keyArray = keys.toArray(new String[keys.size()]);

        // Fetch all payloads and delete them in a second transaction.
        Transaction tMessage = jedis.multi();
        Response<List<String>> messageResponse = tMessage.mget(keyArray);
        tMessage.del(keyArray);
        tMessage.exec();

        List<String> messages = messageResponse.get();
        for (int i = 0; i < messages.size(); i++) {
            String key = keys.get(i);
            String message = messages.get(i);
            System.out.print("Received key: " + key + ". ");
            if (message == null) {
                // No value was stored under this key any more when it was read.
                System.out.println("Message for key " + key + " is gone!");
            } else {
                System.out.println("Message for key " + key + " is " + message);
            }
        }
    }
}