Redisson延遲隊列的封裝

繼上一章節(jié)基于Redisson實現(xiàn)延遲隊列

我們實現(xiàn)了延遲隊列的基本功能,但是存在一個問題就是對于延遲隊列,我們能不能有一個監(jiān)聽機制監(jiān)聽消息過期的流程呢?也就是說一旦到期就自動監(jiān)聽并觸發(fā)呢。這時我粗略的想法是采用線程去監(jiān)聽隊列的消息。在項目啟動時我們就初始化并用線程池去循環(huán)監(jiān)聽隊列中的消息,話不多說,按照思路開干。稍微封裝下

定義一個接口對于過期消息的監(jiān)聽,一旦到期觸發(fā)onExpired。對于以后我們消費者只要實現(xiàn)這接口即可實現(xiàn)消息的過期監(jiān)聽。后面會詳細講到

@FunctionalInterface
public interface RedisDelayedQueueListener<T> {
    /**
     * 對于過期消息的監(jiān)聽
     *
     * @param t 泛型
     */
    void onExpired(T t);
}

這時我們再自定義一個注解用于初始化并監(jiān)聽隊列中的消息

其中的name就是隊列的名稱

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RedisDelayedQueue {
    /**
     * 隊列名稱
     *
     * @return string
     */
    String name();
}

定義一個隊列消費任務DelayPollTask,完成隊列的消費邏輯。這里參考了redis Strem源碼

@Slf4j
public class DelayPollTask<T> implements Runnable{
    private final RedisDelayedQueueListener<T> listener;
    private final RedissonClient redissonClient;
    private final String name;

    public DelayPollTask(RedisDelayedQueueListener<T> listener, RedissonClient redissonClient, String name) {
        this.listener = listener;
        this.redissonClient = redissonClient;
        this.name = name;
    }

    @Override
    public void run() {
        loop();
    }

    @SuppressWarnings({"all"})
    private void loop(){
        while (true) {
            try {
                Thread.sleep(0);
                System.out.println(name);
                RBlockingQueue<T> retryQueue = redissonClient.getBlockingQueue(name);
                // 訂閱redis隊列 解決項目重新啟動并不會消費之前隊列里的消息的問題
                redissonClient.getDelayedQueue(retryQueue);
                T t = retryQueue.take();
                log.info("poll--->" + t);
                listener.onExpired(t);
            } catch (Exception e) {
                log.error("DelayPollTask error", e);
            }
        }
    }

}

定義一個隊列初始化器DelayQueueInitializer初始化并開啟線程監(jiān)聽隊列中的消息

@Slf4j
@Component
public class DelayQueueInitializer implements InitializingBean {
    @Autowired
    private RedissonClient redissonClient;
    /**
     * 線程池來監(jiān)聽隊列的消息 線程的配置以實際項目為主
     */
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 30,
            10, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new NamedThreadFactory("delayQueueJob", false));
    /**
     * 任務項,用于收集注冊進來的線程任務
     */
    private final List<Runnable> tasks = new ArrayList<>();

    @Override
    public void afterPropertiesSet() {
        // 初始化邏輯
        init();
    }

    @SuppressWarnings({"unchecked", "rawtypes"})
    private void init() {
        Collection<RedisDelayedQueueListener> listeners = SpringUtil.getBeansOfType(RedisDelayedQueueListener.class).values();
        for (RedisDelayedQueueListener listener : listeners) {
            RedisDelayedQueue annotation = listener.getClass().getAnnotation(RedisDelayedQueue.class);
            if (annotation != null) {
                register(annotation.name(), listener);
            }
        }
        log.info("成功注冊" + tasks.size() + "條隊列");
        // 開啟監(jiān)聽
        start();
    }

    /**
     * 注冊監(jiān)聽器
     *
     * @param name     隊列的名稱
     * @param listener 監(jiān)聽器
     * @param <T>      泛型
     */
    private <T> void register(String name, RedisDelayedQueueListener<T> listener) {
        tasks.add(new DelayPollTask<>(listener, redissonClient, name));
    }

    /**
     * 開啟對隊列的監(jiān)聽
     */
    private void start() {
        tasks.forEach(executor::execute);
        log.info("成功開啟" + tasks.size() + "條隊列");
    }
}

接下來我們測試下

定義一個訂單類OrderDTO

@NoArgsConstructor
@AllArgsConstructor
@Data
public class OrderDTO implements Serializable {
    private String orderNo;

    private String orderName;

    @Override
    public boolean equals(Object object) {
        if (this == object) return true;
        if (!(object instanceof OrderDTO)) return false;
        OrderDTO orderDTO = (OrderDTO) object;
        return Objects.equals(getOrderNo(), orderDTO.getOrderNo()) && Objects.equals(getOrderName(), orderDTO.getOrderName());
    }

    @Override
    public int hashCode() {
        return Objects.hash(getOrderNo(), getOrderName());
    }
}

重點來了,只要實現(xiàn)RedisDelayedQueueListener接口即可實現(xiàn)對于訂單過期的監(jiān)聽

@RedisDelayedQueue標記隊列的名稱,用于消費之前隊列中的消息。以后對于擴展也方便

@RedisDelayedQueue(name = "Queue-1")
@Component
public class OrderListener implements RedisDelayedQueueListener<OrderDTO> {
    @Override
    public void onExpired(OrderDTO orderDTO) {
        System.out.println("訂單編號"  + orderDTO.getOrderNo());
        System.out.println("訂單名稱"  + orderDTO.getOrderName());
        // 業(yè)務流程...
    }
}

測試:

稍微封裝下隊列的操作

@Component
public class RedissonDelayQueue {
    @Autowired
    private RedissonClient redissonClient;

    /**
     * 放入延遲隊列
     *
     * @param data      數(shù)據(jù)
     * @param delay     延遲時間
     * @param timeUnit  時間單位
     * @param queueName 隊列名稱
     */
    public <T> void addQueue(T data, long delay, TimeUnit timeUnit, String queueName) {
        // redisson  延遲隊列
        RBlockingQueue<T> blockingQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<T> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        delayedQueue.offer(data, delay, timeUnit);
    }


    public void delete(Object data, String queueName) {
        RBlockingQueue<Object> blockingQueue = redissonClient.getBlockingQueue(queueName);
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(blockingQueue);
        delayedQueue.remove(data);
    }
}

測試:

OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderNo("123");
orderDTO.setOrderName("xxx");
// Queue-1 必須和@RedisDelayedQueue(name = "Queue-1") 中的name一直
redissonDelayQueue.addQueue(orderDTO, 3, TimeUnit.SECONDS, "Queue-1");

這時上面定義的OrderListener就能消費隊列中的消息了,注意OrderListener在監(jiān)聽時一定要處理好異常,否則一旦報錯可能后續(xù)監(jiān)聽將失效了

希望大佬們能發(fā)現(xiàn)其中的不足,提出寶貴的意見~

?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容