使用redis的隊列實現(xiàn)接口排隊
在調(diào)用接口時將線程號(多實例的情況下得用uuid,線程號可能會重復(fù))存入redis隊列,查詢隊首線程號(uuid)如果是當前線程,則執(zhí)行邏輯、出隊,否則等待。
-
使用到的redis命令有:
- RPUSH key value1 [value2] 入隊
- LPOP key 出隊
- LRANGE key start stop 查詢隊首元素
增加一個注解
@QueuingPoll來標記要排隊的接口
/**
* @author Jenson
*/
public @interface QueuingPoll {
}
- controller 需要排隊隊接口
加了個sleep延長接口返回的時間以便看效果
@GetMapping("/slowly")
@QueuingPoll
public String slowlyInterface(@RequestParam Integer tenantId,
@RequestParam Integer time) throws InterruptedException {
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
System.out.println(threadName + " 開始休眠: " + time);
Thread.sleep(time);
System.out.println(threadName + " 休眠結(jié)束: " + time);
return threadName;
}
- 使用切面來攔截接口,實現(xiàn)接口排隊
/**
* @author Jenson
*/
@Aspect
@Component
public class QueuingPollAspect {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Around(value = "@annotation(com.jenson.annotation.QueuingPoll)")
public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String tenantId = "null";
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
tenantId = request.getParameter("tenantId");
}
// Thread currentThread = Thread.currentThread();
// 線程 ID 是唯一的,并且在其生命周期內(nèi)保持不變。 當一個線程終止時,這個線程 ID 可能會被重用。
// String threadId = String.valueOf(currentThread.getId());
// 在多實例多情況下線程ID可能會導(dǎo)致重復(fù),所以使用UUID
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
// 相同的租戶放入同一個redis隊列里,實現(xiàn)同租戶串行不同的租戶并行
String key = "jenson:list-thread:" + tenantId;
redisTemplate.opsForList().rightPush(key, uuid);
boolean waitFlag = Boolean.TRUE;
while (waitFlag) {
waitFlag = Boolean.FALSE;
// System.out.println(threadName + "輪詢查看是否輪到自己");
List<String> top = redisTemplate.opsForList().range(key, 0, 0);
if (top != null && top.size() > 0) {
// redis 里有數(shù)據(jù)
if (!uuid.equals(top.get(0))) {
// 隊列頂部不是該接口,線程等待
waitFlag = Boolean.TRUE;
}
}
if (waitFlag) {
// 根據(jù)接口執(zhí)行平均時長來適度調(diào)整休眠時間,休眠時會讓出cpu給其他的線程
Thread.sleep(100);
}
}
Object result = proceedingJoinPoint.proceed();
// 執(zhí)行結(jié)束,推出隊列頂端元素
redisTemplate.opsForList().leftPop(key);
return result;
}
@AfterThrowing(value = "@annotation(com.jenson.annotation.QueuingPoll)", throwing = "e")
public void throwingAdvice(JoinPoint joinPoint, Exception e) {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
String tenantId = "null";
if (attributes != null) {
HttpServletRequest request = attributes.getRequest();
tenantId = request.getParameter("tenantId");
}
String key = "jenson:list-thread:" + tenantId;
// 拋出錯誤時也要推出隊列頂端元素,否則后面的接口就堵死了
redisTemplate.opsForList().leftPop(key);
}
}
- 測試,先調(diào)用接口延時10s再調(diào)用接口演示100ms
http-nio-8010-exec-1 開始休眠: 10000
http-nio-8010-exec-1 休眠結(jié)束: 10000
http-nio-8010-exec-4 開始休眠: 100
http-nio-8010-exec-4 休眠結(jié)束: 100
后調(diào)用的接口后執(zhí)行了,如果不加該注解的情況如下,執(zhí)行快的先執(zhí)行完:
http-nio-8010-exec-2 開始休眠: 10000
http-nio-8010-exec-4 開始休眠: 100
http-nio-8010-exec-4 休眠結(jié)束: 100
http-nio-8010-exec-2 休眠結(jié)束: 10000
- 但是這種辦法有很大風(fēng)險,在服務(wù)重啟或停機時,如果redis隊列中有數(shù)據(jù),會導(dǎo)致服務(wù)重啟后接口一直等待,所以在啟動項目時將隊列清空。
/**
* @author Jenson
*/
@Configuration
public class InitInterfaceQueueConfig {
@Autowired
private RedisTemplate<String, String> redisTemplate;
/**
* 在啟動項目時清空redis隊列
* <p>
* PostConstruct注解的方法將會在依賴注入完成后被自動調(diào)用。
* PostConstruct是spring框架的注解,在方法上加該注解會在項目啟動的時候執(zhí)行該方法,也可以理解為在spring容器初始化的時候執(zhí)行該方法。
*/
@PostConstruct
public void init() {
Set<String> keys = redisTemplate.keys("jenson:list-thread:" + "*");
System.out.println(keys);
if (keys != null && keys.size() > 0) {
redisTemplate.delete(keys);
System.out.println("刪除redis命名空間 jenson:list-thread 成功...");
}
}
}
簡化問題
假如不考慮多租戶和多實例的情況,就有很多種實現(xiàn)方式
使用公平鎖
/**
* @author Jenson
*/
@Aspect
@Component
public class Queuing1Aspect {
/**
* 重入鎖,公平鎖
*/
public static ReentrantLock lock = new ReentrantLock(true);
@Around(value = "@annotation(com.jenson.annotation.Queuing1)")
public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
lock.lock();
try {
Object result = proceedingJoinPoint.proceed();
return result;
} catch (Exception e) {
throw e;
}
finally {
lock.unlock();
System.out.println("-------------unlock-----------");
}
}
}
使用并發(fā)隊列
- ConcurrentLinkedQueue
使用到的方法有offer,poll和peek
ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();
// add 函數(shù)內(nèi)部調(diào)用的offer,和offer等價
concurrentLinkedQueue.add("test1");
concurrentLinkedQueue.offer("test2");
System.out.println(concurrentLinkedQueue.poll());
System.out.println(concurrentLinkedQueue.poll());
// 如果隊列中沒有元素,會返回null
System.out.println(concurrentLinkedQueue.poll());
System.out.println("-----------------------------------------------");
concurrentLinkedQueue.add("test1");
concurrentLinkedQueue.offer("test2");
// peek只查看隊首元素,但不移除隊首元素
System.out.println(concurrentLinkedQueue.peek());
System.out.println(concurrentLinkedQueue.poll());
System.out.println(concurrentLinkedQueue.poll());
// peek 隊列為空返回null
System.out.println(concurrentLinkedQueue.peek());
輸出如下
test1
test2
null
-----------------------------------------------
test1
test1
test2
null
實現(xiàn)如下:
/**
* @author Jenson
*/
@Aspect
@Component
public class Queuing2Aspect {
public static final ConcurrentLinkedQueue<String> concurrentLinkedQueue = new ConcurrentLinkedQueue<String>();
@Around(value = "@annotation(com.jenson.annotation.Queuing2)")
public Object translateReturning(ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
Thread currentThread = Thread.currentThread();
String threadId = String.valueOf(currentThread.getId());
concurrentLinkedQueue.offer(threadId);
boolean waitFlag = Boolean.TRUE;
while (waitFlag) {
waitFlag = Boolean.FALSE;
String top = concurrentLinkedQueue.peek();
if (top != null ) {
// 隊列中有數(shù)據(jù)
if (!threadId.equals(top)) {
// 隊列頂部不是該接口,線程等待
waitFlag = Boolean.TRUE;
}
}
if (waitFlag) {
// 根據(jù)接口執(zhí)行平均時長來適度調(diào)整休眠時間,休眠時會讓出cpu給其他的線程
Thread.sleep(100);
}
}
Object result = proceedingJoinPoint.proceed();
// 執(zhí)行結(jié)束,推出隊列頂端元素
concurrentLinkedQueue.poll();
return result;
}
@AfterThrowing(value = "@annotation(com.jenson.annotation.Queuing2)", throwing = "e")
public void throwingAdvice(JoinPoint joinPoint, Exception e) {
// 拋出錯誤時也要推出隊列頂端元素,否則后面的接口就堵死了
concurrentLinkedQueue.poll();
}
}
- BlockingQueue
使用到LinkedBlockingQueue這個實現(xiàn)類,使用到的方法有offer,poll和peek
// BlockingQueue 是個接口,實現(xiàn)中有 ArrayBlockingQueue 和 LinkedBlockingQueue
// ArrayBlockingQueue是基于數(shù)組實現(xiàn)但有界隊列
// LinkedBlockingQueue 是鏈表實現(xiàn)的,可做無界隊列
LinkedBlockingQueue<String> linkedBlockingQueue = new LinkedBlockingQueue<String>();
// 入隊
try {
// 隊列已滿時會阻塞
linkedBlockingQueue.put("test1");
} catch (InterruptedException e) {
e.printStackTrace();
}
// 如果可以在不超過隊列容量的情況下立即在此隊列的尾部插入指定的元素,則在成功時返回true ,如果隊列已滿則返回false
linkedBlockingQueue.offer("test2");
// 出隊
try {
// 出隊,隊列為空會阻塞
System.out.println(linkedBlockingQueue.take());
} catch (InterruptedException e) {
e.printStackTrace();
}
// peek只查看隊首元素,但不移除隊首元素
System.out.println(linkedBlockingQueue.peek());
// 出隊,隊列為空返回null
System.out.println(linkedBlockingQueue.poll());
// peek 隊列為空返回null
System.out.println(linkedBlockingQueue.peek());
輸出如下:
test1
test2
test2
null
實現(xiàn)方式和使用ConcurrentLinkedQueue方式一樣,BlockingQueue關(guān)鍵在于take方法,當隊列為空時,take會阻塞,適合實現(xiàn)生產(chǎn)者消費者模式。
看一下take的源碼,當隊列為空,調(diào)用的是await,當隊列不為空時,出隊,再判斷隊列還不為空,喚醒其他take的線程

圖片.png
再看一下offer的源碼,當隊列不為空,喚醒take的線程

圖片.png

圖片.png