分享基于Redis實現(xiàn)DelayQueue延遲隊列設(shè)計方案

應(yīng)用場景

創(chuàng)建訂單10分鐘之后自動支付
訂單超時取消
…等等…

實現(xiàn)方式

最簡單的方式,定時掃表;例如每分鐘掃表一次十分鐘之后未支付的訂單進行主動支付 ;
優(yōu)點: 簡單
缺點: 每分鐘全局掃表,浪費資源,有一分鐘延遲

使用RabbitMq 實現(xiàn) RabbitMq實現(xiàn)延遲隊列
優(yōu)點: 開源,現(xiàn)成的穩(wěn)定的實現(xiàn)方案;
缺點: RabbitMq是一個消息中間件;延遲隊列只是其中一個小功能,如果團隊技術(shù)棧中本來就是使用RabbitMq那還好,如果不是,那為了使用延遲隊列而去部署一套RabbitMq成本有點大;

使用Java中的延遲隊列,DelayQueue
優(yōu)點: java.util.concurrent包下一個延遲隊列,簡單易用;拿來即用
缺點: 單機、不能持久化、宕機任務(wù)丟失等等;

基于Redis自研延遲隊列
既然上面沒有很好的解決方案,因為Redis的zset、list的特性,我們可以利用Redis來實現(xiàn)一個延遲隊列 RedisDelayQueue

設(shè)計目標
實時性: 允許存在一定時間內(nèi)的秒級誤差
高可用性:支持單機,支持集群
支持消息刪除:業(yè)務(wù)費隨時刪除指定消息
消息可靠性: 保證至少被消費一次
消息持久化: 基于Redis自身的持久化特性,上面的消息可靠性基于Redis的持久化,所以如果redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證;
數(shù)據(jù)結(jié)構(gòu)
Redis_Delay_Table: 是一個Hash_Table結(jié)構(gòu);里面存儲了所有的延遲隊列的信息;KV結(jié)構(gòu);K=TOPIC:ID V=CONENT; V由客戶端傳入的數(shù)據(jù),消費的時候回傳;
RD_ZSET_BUCKET: 延遲隊列的有序集合; 存放member=TOPIC:ID 和score=執(zhí)行時間戳; 根據(jù)時間戳排序;
RD_LIST_TOPIC: list結(jié)構(gòu); 每個Topic一個list;list存放的都是當前需要被消費的延遲Job;
設(shè)計圖


任務(wù)的生命周期
新增一個Job,會在Redis_Delay_Table中插入一條數(shù)據(jù),記錄了業(yè)務(wù)消費方的 數(shù)據(jù)結(jié)構(gòu); RD_ZSET_BUCKET 也會插入一條數(shù)據(jù),記錄了執(zhí)行時間戳;
搬運線程會去RD_ZSET_BUCKET中查找哪些執(zhí)行時間戳runTimeMillis比現(xiàn)在的時間小;將這些記錄全部刪除;同時會解析出來每個任務(wù)的Topic是什么,然后將這些任務(wù)push到Topic對應(yīng)的列表RD_LIST_TOPIC中;
每個Topic的List都會有一個監(jiān)聽線程去批量獲取List中的待消費數(shù)據(jù);獲取到的數(shù)據(jù)全部扔給這個Topic的消費線程池
消息線程池執(zhí)行會去Redis_Delay_Table查找數(shù)據(jù)結(jié)構(gòu),返回給回調(diào)接口,執(zhí)行回調(diào)方法;
以上所有操作,都是基于Lua腳本做的操作,Lua腳本執(zhí)行的優(yōu)點在于,批量命令執(zhí)行具有原子性,事務(wù)性, 并且降低了網(wǎng)絡(luò)開銷,畢竟只有一次網(wǎng)絡(luò)開銷;

搬運線程操作流程圖


設(shè)計細節(jié)
搬運操作
1.搬運操作的時機

為了避免頻繁的執(zhí)行搬運操作, 我們基于 wait(time)/notify 的方式來通知執(zhí)行搬運操作;


我們用一個AtomicLong nextTime 來保存下一次將要搬運的時間;服務(wù)啟動的時候nextTime=0;所以肯定比當前時間小,那么就會先去執(zhí)行一次搬運操作,然后返回搬運操作之后的ZSET的表頭時間戳,這個時間戳就是下一次將要執(zhí)行的時間戳, 把這個時間戳賦值給 nextTime; 如果表中沒有元素了則將nextTime=Long.MaxValue ;因為while循環(huán),下一次又會跟當前時間對比;如果nextTime比當前時間大,則說明需要等待; 那么我們wait(nextTime-System.currentTimeMills()); 等到時間到了之后,再次去判斷一下,就會比當前時間小,就會執(zhí)行一次搬運操作;

那么當有新增延遲任務(wù)Job的時間怎么辦,這個時候又會將當前新增Job的執(zhí)行時間戳跟nextTime做個對比;如果小的話就重新賦值;
重新賦值之后,還是調(diào)用一下 notifyAll() 通知一下搬運線程;讓他重新去判斷一下 新的時間是否比當前時間小;如果還是大的話,那么就繼續(xù)wait(nextTime-System.currentTimeMills()); 但是這個時候wait的時間又會變小;更精準;

2.一次搬運操作的最大數(shù)量
redis的執(zhí)行速度非???在一個Lua里面循環(huán)遍歷1000個10000個根本沒差; 而且是在Lua里面操作,就只有一次網(wǎng)絡(luò)開銷;一次操作多少個元素根本就不會是問題;

搬運操作的防護機制
1.每分鐘喚醒定時線程

在消費方多實例部署的情況下, 如果某一臺機器掛掉了,但是這臺機器的nextTime是最小的,就在一分鐘之后( 新增job的時候落到這臺機器,剛好時間戳很小), 其他機器可能是1個小時之后執(zhí)行搬運操作; 如果這臺機器立馬重啟,那么還會立馬執(zhí)行一次搬運操作;萬一他沒有重啟;那可能就會很久之后才會搬運;
所以我們需要一種防護手段來應(yīng)對這種極端情況;
比如每分鐘將nextTime=0;并且喚醒wait;
那么就會至少每分鐘會執(zhí)行一次搬運操作! 這是可以接受的

LrangeAndLTrim 批量獲取且刪除待消費任務(wù)
1.執(zhí)行時機以及如何防止頻繁請求redis
這是一個守護線程,循環(huán)去做這樣的操作,把拿到的數(shù)據(jù)給線程池去消費;
但是也不能一直不停的去執(zhí)行操作,如果list已經(jīng)沒有數(shù)據(jù)了去操作也沒有任何意義,不然就太浪費資源了,幸好List中有一個BLPOP阻塞原語,如果list中有數(shù)據(jù)就會立馬返回,如果沒有數(shù)據(jù)就會一直阻塞在那里,直到有數(shù)據(jù)返回,可以設(shè)置阻塞的超時時間,超時會返回NULL;
第一次去獲取N個待消費的任務(wù)扔進到消費線程池中;如果獲取到了0個,那么我們就立馬用BLPOP來阻塞,等有元素的時候 BLPOP就返回數(shù)據(jù)了,下次就可以嘗試去LrangeAndLTrim一次了. 通過BLPOP阻塞,我們避免了頻繁的去請求redis,并且更重要的是提高了實時性;

2.批量獲取的數(shù)量和消費線程池的阻塞隊列

執(zhí)行上面的一次獲取N個元素是不定的,這個要看線程池的maxPoolSize 最大線程數(shù)量; 因為避免消費的任務(wù)過多而放入線程池的阻塞隊列, 放入阻塞隊列有宕機丟失任務(wù)的風險,關(guān)機重啟的時候還要講阻塞隊列中的任務(wù)重新放入List中增加了復(fù)雜性;

所以我們每次LrangeAndLTrim獲取的元素不能大于當前線程池可用的線程數(shù); 這樣的一個控制可用用信號量Semaphore來做

Codis集群對BLPOP的影響

如果redis集群用了codis方案或者Twemproxy方案; 他們不支持BLPOP的命令;
codis不支持的命令集合
那么就不能利用BLPOP來防止頻繁請求redis;那么退而求其次改成每秒執(zhí)行一次LrangeAndLTrim操作;

集群對Lua的影響

Lua腳本的執(zhí)行只能在單機器上, 集群的環(huán)境下如果想要執(zhí)行Lua腳本不出錯,那么Lua腳本中的所有key必須落在同一臺機器;

為了支持集群操作Lua,我們利用hashtag; 用{}把三個jey的關(guān)鍵詞包起來;
{projectName}:Redis_Delay_Table
{projectName}:Redis_Delay_Table
{projectName}:RD_LIST_TOPIC
那么所有的數(shù)據(jù)就會在同一臺機器上了

重試機制

消費者回調(diào)接口如果拋出異常了,或者執(zhí)行超時了,那么會將這個Job重新放入到RD_LIST_TOPIC中等待被下一次消費;默認重試2次;可以設(shè)置不重試;

超時機制

超時機制的主要思路都一樣,就是監(jiān)聽一個線程的執(zhí)行時間超過設(shè)定值之后拋出異常打斷方法的執(zhí)行;

這是使用的方式是 利用Callable接口實現(xiàn)異步超時處理

public class TimeoutUtil {

    /**執(zhí)行用戶回調(diào)接口的 線程池;    計算回調(diào)接口的超時時間           **/
    private static ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * 有超時時間的方法
     * @param timeout 時間秒
     * @return
     */
    public static void timeoutMethod(long timeout, Function function) throws InterruptedException, ExecutionException, TimeoutException {
        FutureTask futureTask = new FutureTask(()->(function.apply("")));
        executorService.execute(futureTask);
        //new Thread(futureTask).start();
        try {
            futureTask.get(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            //e.printStackTrace();
            futureTask.cancel(true);
            throw e;
        }
    }
}

這種方式有一點不好就是太費線程了,相當于線程使用翻了一倍;但是相比其他的方式,這種算是更好一點的

優(yōu)雅停機
在Jvm那里注冊一個 Runtime.getRuntime().addShutdownHook(Runnable)停機回調(diào)接口;在這里面做好善后工作;

關(guān)閉異步AddJob線程池
關(guān)閉每分鐘喚醒線程
關(guān)閉搬運線程 while(!stop)的形式
關(guān)閉所有的topic監(jiān)聽線程 while(!stop)的形式
關(guān)閉關(guān)閉所有topic的消費線程 ;先調(diào)用shutdown;再executor.awaitTermination(20, TimeUnit.SECONDS);檢查是否還有剩余的線程任務(wù)沒有執(zhí)行完; 如果還沒有執(zhí)行完則等待執(zhí)行完;最多等待20秒之后強制調(diào)用shutdownNow強制關(guān)閉;
關(guān)閉重試線程 while(!stop)的形式
關(guān)閉 異常未消費Job重入List線程池

優(yōu)雅停止線程一般是用下面的方式
①、 while(!stop)的形式 用標識位來停止線程
②.先調(diào)用executor.shutdown(); 阻止接受新的任務(wù);然后等待當前正在執(zhí)行的任務(wù)執(zhí)行完; 如果有阻塞則需要調(diào)用executor.shutdownNow()強制結(jié)束;所以要給一個等待時間;

  /**
     * shutdownNow 終止線程的方法是通過調(diào)用Thread.interrupt()方法來實現(xiàn)的
     * 如果線程中沒有sleep 、wait、Condition、定時鎖等應(yīng)用, interrupt()方法是無法中斷當前的線程的。
     * 上面的情況中斷之后還是可以再執(zhí)行finally里面的方法的;
     * 但是如果是其他的情況 finally是不會被執(zhí)行的
     * @param executor
     */
    public static void closeExecutor(ExecutorService executor, String executorName) {
        try {
            //新的任務(wù)不進隊列
            executor.shutdown();
            //給10秒鐘沒有停止完強行停止;
            if(!executor.awaitTermination(20, TimeUnit.SECONDS)) {
                logger.warn("線程池: {},{}沒有在20秒內(nèi)關(guān)閉,則進行強制關(guān)閉",executorName,executor);
                List<Runnable> droppedTasks = executor.shutdownNow();
                logger.warn("線程池: {},{} 被強行關(guān)閉,阻塞隊列中將有{}個將不會被執(zhí)行.", executorName,executor,droppedTasks.size() );
            }
            logger.info("線程池:{},{} 已經(jīng)關(guān)閉...",executorName,executor);
        }  catch (InterruptedException e) {
            logger.info("線程池:{},{} 打斷...",executorName,executor);
        }
    }

BLPOP阻塞的情況如何優(yōu)雅停止監(jiān)聽redis的線程

如果不是在codis集群的環(huán)境下,BLPOP是可以很方便的阻塞線程的;但是停機的時候可能會有點問題;

假如正在關(guān)機,當前線程正在BLPOP阻塞, 那關(guān)機線程等我們20秒執(zhí)行, 剛好在倒數(shù)1秒的時候BLPOP獲取到了數(shù)據(jù),丟給消費線程去消費;如果消費線程1秒執(zhí)行不完,那么20秒倒計時到了,強制關(guān)機,那么這個任務(wù)就會被丟失了; 怎么解決這個問題呢?

①. 不用BLPOP, 每次都sleep一秒去調(diào)用LrangeAndLTrim操作;

②.關(guān)機的時候殺掉 redis的blpop客戶端; 殺掉之后 BLPOP會立馬返回null; 進入下一個循環(huán)體;

不足

因為Redis的持久化特性,做不到消息完全不丟失,如果要保證完成不丟失,Redis的持久化刷盤策略要收緊
因為Codis不能使用BLPOP這種阻塞的形式,在獲取消費任務(wù)的時候用了每秒一次去獲取,有點浪費性能;
支持消費者多實例部署,但是可能存在不能均勻的分配到每臺機器上去消費;
雖然支持redis集群,但是其實是偽集群,因為Lua腳本的原因,讓他們都只能落在一臺機器上;

總結(jié)

實時性

正常情況下 消費的時間誤差不超過1秒鐘; 極端情況下,一臺實例宕機,另外的實例nextTime很遲; 那么最大誤差是1分鐘; 真正的誤差來自于業(yè)務(wù)方的接口的消費速度

QPS

完全視業(yè)務(wù)方的消費速度而定; 延遲隊列不是瓶頸

————————————————
版權(quán)聲明:本文為CSDN博主「石臻」的原創(chuàng)文章,遵循CC 4.0 by-sa版權(quán)協(xié)議,轉(zhuǎn)載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/u010634066/article/details/98864764

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容