繼上一章節(jié)基于Redisson實現(xiàn)延遲隊列
我們實現(xiàn)了延遲隊列的基本功能,但是存在一個問題就是對于延遲隊列,我們能不能有一個監(jiān)聽機制監(jiān)聽消息過期的流程呢?也就是說一旦到期就自動監(jiān)聽并觸發(fā)呢。這時我粗略的想法是采用線程去監(jiān)聽隊列的消息。在項目啟動時我們就初始化并用線程池去循環(huán)監(jiān)聽隊列中的消息,話不多說,按照思路開干。稍微封裝下
定義一個接口對于過期消息的監(jiān)聽,一旦到期觸發(fā)onExpired。對于以后我們消費者只要實現(xiàn)這接口即可實現(xiàn)消息的過期監(jiān)聽。后面會詳細講到
@FunctionalInterface
public interface RedisDelayedQueueListener<T> {
/**
* 對于過期消息的監(jiān)聽
*
* @param t 泛型
*/
void onExpired(T t);
}
這時我們再自定義一個注解用于初始化并監(jiān)聽隊列中的消息
其中的name就是隊列的名稱
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisDelayedQueue {
/**
* 隊列名稱
*
* @return string
*/
String name();
}
定義一個隊列消費任務DelayPollTask,完成隊列的消費邏輯。這里參考了redis Strem源碼
@Slf4j
public class DelayPollTask<T> implements Runnable{
private final RedisDelayedQueueListener<T> listener;
private final RedissonClient redissonClient;
private final String name;
public DelayPollTask(RedisDelayedQueueListener<T> listener, RedissonClient redissonClient, String name) {
this.listener = listener;
this.redissonClient = redissonClient;
this.name = name;
}
@Override
public void run() {
loop();
}
@SuppressWarnings({"all"})
private void loop(){
while (true) {
try {
Thread.sleep(0);
System.out.println(name);
RBlockingQueue<T> retryQueue = redissonClient.getBlockingQueue(name);
// 訂閱redis隊列 解決項目重新啟動并不會消費之前隊列里的消息的問題
redissonClient.getDelayedQueue(retryQueue);
T t = retryQueue.take();
log.info("poll--->" + t);
listener.onExpired(t);
} catch (Exception e) {
log.error("DelayPollTask error", e);
}
}
}
}
定義一個隊列初始化器DelayQueueInitializer初始化并開啟線程監(jiān)聽隊列中的消息
@Slf4j
@Component
public class DelayQueueInitializer implements InitializingBean {
@Autowired
private RedissonClient redissonClient;
/**
* 線程池來監(jiān)聽隊列的消息 線程的配置以實際項目為主
*/
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 30,
10, TimeUnit.SECONDS,
new ArrayBlockingQueue<>(1000),
new NamedThreadFactory("delayQueueJob", false));
/**
* 任務項,用于收集注冊進來的線程任務
*/
private final List<Runnable> tasks = new ArrayList<>();
@Override
public void afterPropertiesSet() {
// 初始化邏輯
init();
}
@SuppressWarnings({"unchecked", "rawtypes"})
private void init() {
Collection<RedisDelayedQueueListener> listeners = SpringUtil.getBeansOfType(RedisDelayedQueueListener.class).values();
for (RedisDelayedQueueListener listener : listeners) {
RedisDelayedQueue annotation = listener.getClass().getAnnotation(RedisDelayedQueue.class);
if (annotation != null) {
register(annotation.name(), listener);
}
}
log.info("成功注冊" + tasks.size() + "條隊列");
// 開啟監(jiān)聽
start();
}
/**
* 注冊監(jiān)聽器
*
* @param name 隊列的名稱
* @param listener 監(jiān)聽器
* @param <T> 泛型
*/
private <T> void register(String name, RedisDelayedQueueListener<T> listener) {
tasks.add(new DelayPollTask<>(listener, redissonClient, name));
}
/**
* 開啟對隊列的監(jiān)聽
*/
private void start() {
tasks.forEach(executor::execute);
log.info("成功開啟" + tasks.size() + "條隊列");
}
}
接下來我們測試下
定義一個訂單類OrderDTO
@NoArgsConstructor
@AllArgsConstructor
@Data
public class OrderDTO implements Serializable {
private String orderNo;
private String orderName;
@Override
public boolean equals(Object object) {
if (this == object) return true;
if (!(object instanceof OrderDTO)) return false;
OrderDTO orderDTO = (OrderDTO) object;
return Objects.equals(getOrderNo(), orderDTO.getOrderNo()) && Objects.equals(getOrderName(), orderDTO.getOrderName());
}
@Override
public int hashCode() {
return Objects.hash(getOrderNo(), getOrderName());
}
}
重點來了,只要實現(xiàn)RedisDelayedQueueListener接口即可實現(xiàn)對于訂單過期的監(jiān)聽
@RedisDelayedQueue標記隊列的名稱,用于消費之前隊列中的消息。以后對于擴展也方便
@RedisDelayedQueue(name = "Queue-1")
@Component
public class OrderListener implements RedisDelayedQueueListener<OrderDTO> {
@Override
public void onExpired(OrderDTO orderDTO) {
System.out.println("訂單編號" + orderDTO.getOrderNo());
System.out.println("訂單名稱" + orderDTO.getOrderName());
// 業(yè)務流程...
}
}
測試:
稍微封裝下隊列的操作
@Component
public class RedissonDelayQueue {
@Autowired
private RedissonClient redissonClient;
/**
* 放入延遲隊列
*
* @param data 數(shù)據(jù)
* @param delay 延遲時間
* @param timeUnit 時間單位
* @param queueName 隊列名稱
*/
public <T> void addQueue(T data, long delay, TimeUnit timeUnit, String queueName) {
// redisson 延遲隊列
RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.offer(data, delay, timeUnit);
}
public void delete(Object data, String queueName) {
RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue(queueName);
RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
delayedQueue.remove(data);
}
}
測試:
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderNo("123");
orderDTO.setOrderName("xxx");
// Queue-1 必須和@RedisDelayedQueue(name = "Queue-1") 中的name一直
redissonDelayQueue.addQueue(orderDTO, 3, TimeUnit.SECONDS, "Queue-1");
這時上面定義的OrderListener就能消費隊列中的消息了,注意OrderListener在監(jiān)聽時一定要處理好異常,否則一旦報錯可能后續(xù)監(jiān)聽將失效了
希望大佬們能發(fā)現(xiàn)其中的不足,提出寶貴的意見~