Redis實(shí)現(xiàn)延遲隊(duì)列方法介紹

延遲隊(duì)列,顧名思義它是一種帶有延遲功能的消息隊(duì)列。那么,是在什么場景下我才需要這樣的隊(duì)列呢?

1. 背景

我們先看看以下業(yè)務(wù)場景:

  • 當(dāng)訂單一直處于未支付狀態(tài)時,如何及時的關(guān)閉訂單
  • 如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功
  • 在訂單長時間沒有收到下游系統(tǒng)的狀態(tài)通知的時候,如何實(shí)現(xiàn)階梯式的同步訂單狀態(tài)的策略
  • 在系統(tǒng)通知上游系統(tǒng)支付成功終態(tài)時,上游系統(tǒng)返回通知失敗,如何進(jìn)行異步通知實(shí)行分頻率發(fā)送:15s 3m 10m 30m 30m 1h 2h 6h 15h

1.1 解決方案

  • 最簡單的方式,定時掃表。例如對于訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進(jìn)行主動關(guān)單操作。優(yōu)點(diǎn)是簡單,缺點(diǎn)是每分鐘全局掃表,浪費(fèi)資源,如果遇到表數(shù)據(jù)訂單量即將過期的訂單量很大,會造成關(guān)單延遲。
  • 使用RabbitMq或者其他MQ改造實(shí)現(xiàn)延遲隊(duì)列,優(yōu)點(diǎn)是,開源,現(xiàn)成的穩(wěn)定的實(shí)現(xiàn)方案,缺點(diǎn)是:MQ是一個消息中間件,如果團(tuán)隊(duì)技術(shù)棧本來就有MQ,那還好,如果不是,那為了延遲隊(duì)列而去部署一套MQ成本有點(diǎn)大
  • 使用Redis的zset、list的特性,我們可以利用redis來實(shí)現(xiàn)一個延遲隊(duì)列RedisDelayQueue

2. 設(shè)計目標(biāo)

  • 實(shí)時性:允許存在一定時間的秒級誤差
  • 高可用性:支持單機(jī)、支持集群
  • 支持消息刪除:業(yè)務(wù)會隨時刪除指定消息
  • 消息可靠性:保證至少被消費(fèi)一次
  • 消息持久化:基于Redis自身的持久化特性,如果Redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮后續(xù)優(yōu)化將消息持久化到MangoDB中

3. 設(shè)計方案

設(shè)計主要包含以下幾點(diǎn):

  • 將整個Redis當(dāng)做消息池,以KV形式存儲消息
  • 使用ZSET做優(yōu)先隊(duì)列,按照Score維持優(yōu)先級
  • 使用LIST結(jié)構(gòu),以先進(jìn)先出的方式消費(fèi)
  • ZSET和LIST存儲消息地址(對應(yīng)消息池的每個KEY)
  • 自定義路由對象,存儲ZSET和LIST名稱,以點(diǎn)對點(diǎn)的方式將消息從ZSET路由到正確的LIST
  • 使用定時器維護(hù)路由
  • 根據(jù)TTL規(guī)則實(shí)現(xiàn)消息延遲

3.1 設(shè)計圖

還是基于有贊的延遲隊(duì)列設(shè)計,進(jìn)行優(yōu)化改造及代碼實(shí)現(xiàn)。有贊設(shè)計

image

3.2 數(shù)據(jù)結(jié)構(gòu)

  • ZING:DELAY_QUEUE:JOB_POOL 是一個Hash_Table結(jié)構(gòu),里面存儲了所有延遲隊(duì)列的信息。KV結(jié)構(gòu):K=prefix+projectName field = topic+jobId V=CONENT;V由客戶端傳入的數(shù)據(jù),消費(fèi)的時候回傳
  • ZING:DELAY_QUEUE:BUCKET 延遲隊(duì)列的有序集合ZSET,存放K=ID和需要的執(zhí)行時間戳,根據(jù)時間戳排序
  • ZING:DELAY_QUEUE:QUEUE LIST結(jié)構(gòu),每個Topic一個LIST,list存放的都是當(dāng)前需要被消費(fèi)的JOB
image

圖片僅供參考,基本可以描述整個流程的執(zhí)行過程

3.3 任務(wù)的生命周期

  1. 新增一個JOB,會在ZING:DELAY_QUEUE:JOB_POOL中插入一條數(shù)據(jù),記錄了業(yè)務(wù)方消費(fèi)方。ZING:DELAY_QUEUE:BUCKET也會插入一條記錄,記錄執(zhí)行的時間戳
  2. 搬運(yùn)線程會去ZING:DELAY_QUEUE:BUCKET中查找哪些執(zhí)行時間戳的RunTimeMillis比現(xiàn)在的時間小,將這些記錄全部刪除;同時會解析出每個任務(wù)的Topic是什么,然后將這些任務(wù)PUSH到TOPIC對應(yīng)的列表ZING:DELAY_QUEUE:QUEUE
  3. 每個TOPIC的LIST都會有一個監(jiān)聽線程去批量獲取LIST中的待消費(fèi)數(shù)據(jù),獲取到的數(shù)據(jù)全部扔給這個TOPIC的消費(fèi)線程池
  4. 消費(fèi)線程池執(zhí)行會去ZING:DELAY_QUEUE:JOB_POOL查找數(shù)據(jù)結(jié)構(gòu),返回給回調(diào)結(jié)構(gòu),執(zhí)行回調(diào)方法。

3.4 設(shè)計要點(diǎn)

3.4.1 基本概念

  • JOB:需要異步處理的任務(wù),是延遲隊(duì)列里的基本單元
  • Topic:一組相同類型Job的集合(隊(duì)列)。供消費(fèi)者來訂閱

3.4.2 消息結(jié)構(gòu)

每個JOB必須包含以下幾個屬性

  • jobId:Job的唯一標(biāo)識。用來檢索和刪除指定的Job信息
  • topic:Job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱
  • delay:Job需要延遲的時間。單位:秒。(服務(wù)端會將其轉(zhuǎn)換為絕對時間)
  • body:Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲
  • retry:失敗重試次數(shù)
  • url:通知URL

3.5 設(shè)計細(xì)節(jié)

3.5.1 如何快速消費(fèi)ZING:DELAY_QUEUE:QUEUE

最簡單的實(shí)現(xiàn)方式就是使用定時器進(jìn)行秒級掃描,為了保證消息執(zhí)行的時效性,可以設(shè)置每1S請求Redis一次,判斷隊(duì)列中是否有待消費(fèi)的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費(fèi)的JOB,那頻繁的掃描就失去了意義,也浪費(fèi)了資源,幸好LIST中有一個BLPOP阻塞原語,如果list中有數(shù)據(jù)就會立馬返回,如果沒有數(shù)據(jù)就會一直阻塞在那里,直到有數(shù)據(jù)返回,可以設(shè)置阻塞的超時時間,超時會返回NULL;具體的實(shí)現(xiàn)方式及策略會在代碼中進(jìn)行具體的實(shí)現(xiàn)介紹

3.5.2 避免定時導(dǎo)致的消息重復(fù)搬運(yùn)及消費(fèi)

  • 使用Redis的分布式鎖來控制消息的搬運(yùn),從而避免消息被重復(fù)搬運(yùn)導(dǎo)致的問題
  • 使用分布式鎖來保證定時器的執(zhí)行頻率

4. 核心代碼實(shí)現(xiàn)

4.1 技術(shù)說明

技術(shù)棧:SpringBoot,Redisson,Redis,分布式鎖,定時器

注意:本項(xiàng)目沒有實(shí)現(xiàn)設(shè)計方案中的多Queue消費(fèi),只開啟了一個QUEUE,這個待以后優(yōu)化

4.2 核心實(shí)體

4.2.1 Job新增對象

/**
 * 消息結(jié)構(gòu)
 *
 * @author 睜眼看世界
 * @date 2020年1月15日
 */
@Data
public class Job implements Serializable {

 private static final long serialVersionUID = 1L;

 /**
 * Job的唯一標(biāo)識。用來檢索和刪除指定的Job信息
 */
 @NotBlank
 private String jobId;

 /**
 * Job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱
 */
 @NotBlank
 private String topic;

 /**
 * Job需要延遲的時間。單位:秒。(服務(wù)端會將其轉(zhuǎn)換為絕對時間)
 */
 private Long delay;

 /**
 * Job的內(nèi)容,供消費(fèi)者做具體的業(yè)務(wù)處理,以json格式存儲
 */
 @NotBlank
 private String body;

 /**
 * 失敗重試次數(shù)
 */
 private int retry = 0;

 /**
 * 通知URL
 */
 @NotBlank
 private String url;
}
4.2.2 Job刪除對象
/**
 * 消息結(jié)構(gòu)
 *
 * @author 睜眼看世界
 * @date 2020年1月15日
 */
@Data
public class JobDie implements Serializable {

 private static final long serialVersionUID = 1L;

 /**
 * Job的唯一標(biāo)識。用來檢索和刪除指定的Job信息
 */
 @NotBlank
 private String jobId;

 /**
 * Job類型??梢岳斫獬删唧w的業(yè)務(wù)名稱
 */
 @NotBlank
 private String topic;
}

4.3 搬運(yùn)線程

/**
 * 搬運(yùn)線程
 *
 * @author 睜眼看世界
 * @date 2020年1月17日
 */
@Slf4j
@Component
public class CarryJobScheduled {

 @Autowired
 private RedissonClient redissonClient;

 /**
 * 啟動定時開啟搬運(yùn)JOB信息
 */
 @Scheduled(cron = "*/1 * * * * *")
 public void carryJobToQueue() {
 System.out.println("carryJobToQueue --->");
 RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
 try {
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
 }
 RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
 long now = System.currentTimeMillis();
 Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
 List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
 RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
 readyQueue.addAll(jobList);
 bucketSet.removeAllAsync(jobList);
 } catch (InterruptedException e) {
 log.error("carryJobToQueue error", e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
}

4.4 消費(fèi)線程

@Slf4j
@Component
public class ReadyQueueContext {

 @Autowired
 private RedissonClient redissonClient;

 @Autowired
 private ConsumerService consumerService;

 /**
 * TOPIC消費(fèi)線程
 */
 @PostConstruct
 public void startTopicConsumer() {
 TaskManager.doTask(this::runTopicThreads, "開啟TOPIC消費(fèi)線程");
 }

 /**
 * 開啟TOPIC消費(fèi)線程
 * 將所有可能出現(xiàn)的異常全部catch住,確保While(true)能夠不中斷
 */
 @SuppressWarnings("InfiniteLoopStatement")
 private void runTopicThreads() {
 while (true) {
 RLock lock = null;
 try {
 lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
 } catch (Exception e) {
 log.error("runTopicThreads getLock error", e);
 }
 try {
 if (lock == null) {
 continue;
 }
 // 分布式鎖時間比Blpop阻塞時間多1S,避免出現(xiàn)釋放鎖的時候,鎖已經(jīng)超時釋放,unlock報錯
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 continue;
 }

 // 1\. 獲取ReadyQueue中待消費(fèi)的數(shù)據(jù)
 RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
 String topicId = queue.poll(60, TimeUnit.SECONDS);
 if (StringUtils.isEmpty(topicId)) {
 continue;
 }

 // 2\. 獲取job元信息內(nèi)容
 RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
 Job job = jobPoolMap.get(topicId);

 // 3\. 消費(fèi)
 FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消費(fèi)JobId-->" + job.getJobId());
 if (taskResult.get()) {
 // 3.1 消費(fèi)成功,刪除JobPool和DelayBucket的job信息
 jobPoolMap.remove(topicId);
 } else {
 int retrySum = job.getRetry() + 1;
 // 3.2 消費(fèi)失敗,則根據(jù)策略重新加入Bucket

 // 如果重試次數(shù)大于5,則將jobPool中的數(shù)據(jù)刪除,持久化到DB
 if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
 jobPoolMap.remove(topicId);
 continue;
 }
 job.setRetry(retrySum);
 long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
 log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.add(nextTime, topicId);
 // 3.3 更新元信息失敗次數(shù)
 jobPoolMap.put(topicId, job);
 }
 } catch (Exception e) {
 log.error("runTopicThreads error", e);
 } finally {
 if (lock != null) {
 try {
 lock.unlock();
 } catch (Exception e) {
 log.error("runTopicThreads unlock error", e);
 }
 }
 }
 }
 }
}

4.5 添加及刪除JOB

/**
 * 提供給外部服務(wù)的操作接口
 *
 * @author why
 * @date 2020年1月15日
 */
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

 @Autowired
 private RedissonClient redissonClient;

 /**
 * 添加job元信息
 *
 * @param job 元信息
 */
 @Override
 public void addJob(Job job) {

 RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
 try {
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
 }
 String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

 // 1\. 將job添加到 JobPool中
 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 if (jobPool.get(topicId) != null) {
 throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
 }

 jobPool.put(topicId, job);

 // 2\. 將job添加到 DelayBucket中
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.add(job.getDelay(), topicId);
 } catch (InterruptedException e) {
 log.error("addJob error", e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }

 /**
 * 刪除job信息
 *
 * @param job 元信息
 */
 @Override
 public void deleteJob(JobDie jobDie) {

 RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
 try {
 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
 }
 String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 jobPool.remove(topicId);

 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.remove(topicId);
 } catch (InterruptedException e) {
 log.error("addJob error", e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
}

5. 待優(yōu)化的內(nèi)容

  1. 目前只有一個Queue隊(duì)列存放消息,當(dāng)需要消費(fèi)的消息大量堆積后,會影響消息通知的時效。改進(jìn)的辦法是,開啟多個Queue,進(jìn)行消息路由,再開啟多個消費(fèi)線程進(jìn)行消費(fèi),提供吞吐量
  2. 消息沒有進(jìn)行持久化,存在風(fēng)險,后續(xù)會將消息持久化到MangoDB中

6. 源碼

更多詳細(xì)源碼請在下面地址中獲取

7. 參考

騰訊T3-T4標(biāo)準(zhǔn)精品PHP架構(gòu)師教程目錄大全,只要你看完保證薪資上升一個臺階(持續(xù)更新)?

以上內(nèi)容希望幫助到大家,很多PHPer在進(jìn)階的時候總會遇到一些問題和瓶頸,業(yè)務(wù)代碼寫多了沒有方向感,不知道該從那里入手去提升,對此我整理了一些資料,包括但不限于:分布式架構(gòu)、高可擴(kuò)展、高性能、高并發(fā)、服務(wù)器性能調(diào)優(yōu)、TP6,laravel,YII2,Redis,Swoole、Swoft、Kafka、Mysql優(yōu)化、shell腳本、Docker、微服務(wù)、Nginx等多個知識點(diǎn)高級進(jìn)階干貨需要的可以免費(fèi)分享給大家,官方群點(diǎn)擊此處

部分資料截圖:

image

[圖片上傳失敗...(image-e3af7b-1594644753828)]

[圖片上傳失敗...(image-2fb007-1594644753828)]

還有限時精品福利:

★騰訊高級PHP工程師筆試題目

★億級PV高并發(fā)場景訂單的處理

★laravel開發(fā)天貓商城組件服務(wù)

★戰(zhàn)旗TV視頻直播的架構(gòu)項(xiàng)目實(shí)戰(zhàn)

掃描下面二維碼領(lǐng)取

https://qm.qq.com/cgi-bin/qm/qr?k=tnc-MbbNXPn5psmmPtjh0VPlxH-g4Mtq&authKey=xT5SnnVXhzkkXIIgP0E2YmSexJDtvy0jiVMlSjIb8TbChYuG98yysUeas7jWUy/S&noverify=0 (二維碼自動識別)

對PHP后端技術(shù),對PHP架構(gòu)技術(shù)感興趣的朋友,我的官方群點(diǎn)擊此處,一起學(xué)習(xí),相互討論。

群內(nèi)已經(jīng)有管理將知識體系整理好(源碼,學(xué)習(xí)視頻等資料),歡迎加群免費(fèi)領(lǐng)取。

本課程深度對標(biāo)騰訊T3-T4標(biāo)準(zhǔn),貼身打造學(xué)習(xí)計劃為web開發(fā)人員進(jìn)階中高級、架構(gòu)師提升技術(shù),為自己增值漲薪!加入BAT特訓(xùn)營還可以獲得內(nèi)推大廠名額以及GO語言學(xué)習(xí)權(quán)限!??!

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容