Redis消息隊(duì)列

一、概述

消息隊(duì)列,Message Queue,常用于解決并發(fā)系統(tǒng)中的資源一致性問(wèn)題,提升峰值的處理能力,同時(shí)保證消息的順序性、可恢復(fù)性、必送達(dá)性,對(duì)應(yīng)用進(jìn)行解耦,或者實(shí)現(xiàn)異步通訊等。市面上的 MQ應(yīng)用有很多(例如:Kafka,RabbitMQ,Disque),同時(shí)也可以基于 Redis 來(lái)實(shí)現(xiàn),比較典型的方案有:

  • 基于List的 LPUSH+BRPOP 的實(shí)現(xiàn)
  • PUB/SUB,訂閱/發(fā)布模式
  • 基于Sorted-Set的實(shí)現(xiàn)
  • 基于Stream類型的實(shí)現(xiàn)
    在消息隊(duì)列使用中,有生產(chǎn)者producter和消費(fèi)者consumer。生產(chǎn)者負(fù)責(zé)生成消息,消費(fèi)者負(fù)責(zé)使用處理消息。生產(chǎn),指的是將消息放入消息隊(duì)列。 消費(fèi),指的是讀取并處理消息。通常一個(gè)消息再被消費(fèi)后,就應(yīng)該從消息隊(duì)列中刪除。


?

二、實(shí)現(xiàn)

1、基于List的LPUSH+BRPOP的實(shí)現(xiàn)
LPUSH,將消息放入消息隊(duì)列(生產(chǎn)者)
BRPOP,從隊(duì)列中取出消息,阻塞模式(消費(fèi)者)

TBase中不支持BRPOP,只支持RPOP,BRPOP是RPOP的阻塞版本
該模式優(yōu)點(diǎn):

  • 實(shí)現(xiàn)簡(jiǎn)單
  • Reids支持持久化消息,意味著消息不會(huì)丟失,可以重復(fù)查看(注意不是消費(fèi),只看不用,LRANGE類的指令)。
  • 可以保證順序,保證使用LPUSH命令,可以保證消息的順序性
  • 使用RPUSH,可以將消息放在隊(duì)列的開(kāi)頭,達(dá)到優(yōu)先消息的目的,可以實(shí)現(xiàn)簡(jiǎn)易的消息優(yōu)先隊(duì)列。
    該模式缺點(diǎn):
  • 做消費(fèi)確認(rèn)ACK比較麻煩,就是不能保證消費(fèi)者在讀取之后,未處理后的宕機(jī)問(wèn)題。導(dǎo)致消息意外丟失。通常需要自己維護(hù)一個(gè)Pending列表,保證消息的處理確認(rèn)。
  • 不能做廣播模式,例如典型的Pub/Discribe模式。
  • 不能重復(fù)消費(fèi),一旦消費(fèi)就會(huì)被刪除
  • 不支持分組消費(fèi),需要自己在業(yè)務(wù)邏輯層解決
    ?
2、PUB/SUB,訂閱/發(fā)布模式
SUBSCRIBE,用于訂閱信道
PUBLISH,向信道發(fā)送消息
UNSUBSCRIBE,取消訂閱

生產(chǎn)者和消費(fèi)者通過(guò)相同的一個(gè)信道(Channel)進(jìn)行交互。信道其實(shí)也就是隊(duì)列。通常會(huì)有多個(gè)消費(fèi)者。多個(gè)消費(fèi)者訂閱同一個(gè)信道,當(dāng)生產(chǎn)者向信道發(fā)布消息時(shí),該信道會(huì)立即將消息逐一發(fā)布給每個(gè)消費(fèi)者??梢?jiàn),該信道對(duì)于消費(fèi)者是發(fā)散的信道,每個(gè)消費(fèi)者都可以得到相同的消息。典型的對(duì)多的關(guān)系。
該模式優(yōu)點(diǎn):

  • 典型的廣播模式,一個(gè)消息可以發(fā)布到多個(gè)消費(fèi)者
  • 多信道訂閱,消費(fèi)者可以同時(shí)訂閱多個(gè)信道,從而接收多類消息
  • 消息即時(shí)發(fā)送,消息不用等待消費(fèi)者讀取,消費(fèi)者會(huì)自動(dòng)接收到信道發(fā)布的消息
    該模式缺點(diǎn):
  • 消息一旦發(fā)布,不能接收。換句話就是發(fā)布時(shí)若客戶端不在線,則消息丟失,不能尋回
  • 不能保證每個(gè)消費(fèi)者接收的時(shí)間是一致的
  • 若消費(fèi)者客戶端出現(xiàn)消息積壓,到一定程度,會(huì)被強(qiáng)制斷開(kāi),導(dǎo)致消息意外丟失。通常發(fā)生在消息的生產(chǎn)遠(yuǎn)大于消費(fèi)速度時(shí)
    Pub/Sub 模式不適合做消息存儲(chǔ),消息積壓類的業(yè)務(wù),而是擅長(zhǎng)處理廣播,即時(shí)通訊,即時(shí)反饋的業(yè)務(wù)。
    ?
3、基于SortedSet有序集合的實(shí)現(xiàn)
ZADD KEY score member,壓入集合
ZRANGEBYSCORE,依據(jù)score獲取成員

有序集合的方案是在自己確定消息順I(yè)D時(shí)比較常用,使用集合成員的Score來(lái)作為消息ID,保證順序,還可以保證消息ID的單調(diào)遞增。通??梢允褂脮r(shí)間戳+序號(hào)的方案。確保了消息ID的單調(diào)遞增,利用SortedSet的依據(jù)Score排序的特征,就可以制作一個(gè)有序的消息隊(duì)列了。
和上面的方案相比,優(yōu)點(diǎn)就是可以自定義消息ID,在消息ID有意義時(shí),比較重要。缺點(diǎn)也明顯,不允許重復(fù)消息(以為是集合),同時(shí)消息ID確定有錯(cuò)誤會(huì)導(dǎo)致消息的順序出錯(cuò)。
?

4、基于stream實(shí)現(xiàn)

TBase還不支持該數(shù)據(jù)結(jié)構(gòu)
Redis5.0中發(fā)布的Stream類型,也用來(lái)實(shí)現(xiàn)典型的消息隊(duì)列。該Stream類型的出現(xiàn),幾乎滿足了消息隊(duì)列具備的全部?jī)?nèi)容,包括但不限于:

  • 消息ID的序列化生成
  • 消息遍歷
  • 消息的阻塞和非阻塞讀取
  • 消息的分組消費(fèi)
  • 未完成消息的處理
  • 消息隊(duì)列監(jiān)控
    追加新消息,XADD,生產(chǎn)消息
    XADD,命令用于在某個(gè)stream(流數(shù)據(jù))中追加消息,演示如下:
127.0.0.1:6379> XADD memberMessage * user kang msg Hello
"1553439850328-0"
127.0.0.1:6379> XADD memberMessage * user zhong  msg nihao
"1553439858868-0"

語(yǔ)法格式為:

XADD key ID field string [field string ...]

需要提供key,消息ID方案,消息內(nèi)容,其中消息內(nèi)容為key-value型數(shù)據(jù)。 ID,最常使用*,表示由Redis生成消息ID,這也是強(qiáng)烈建議的方案。field string [field string]就是當(dāng)前消息內(nèi)容,由1個(gè)或多個(gè)key-value構(gòu)成。
上面的例子中,在memberMemsages這個(gè)key中追加了user kang msg Hello這個(gè)消息。Redis使用毫秒時(shí)間戳和序號(hào)生成了消息ID。此時(shí),消息隊(duì)列中就有一個(gè)消息可用了。
?
從消息隊(duì)列中獲取消息,XREAD,消費(fèi)消息
XREAD,從Stream中讀取消息,演示如下:

127.0.0.1:6379> XREAD streams memberMessage 0
1) 1) "memberMessage"
   2) 1) 1) "1553439850328-0"
         2) 1) "user"
            2) "kang"
            3) "msg"
            4) "Hello"
      2) 1) "1553439858868-0"
         2) 1) "user"
            2) "zhong"
            3) "msg"
            4) "nihao"

語(yǔ)法格式為:

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] ID [ID ...]
  • [COUNT count] 用于限定獲取的消息數(shù)量
  • [BLOCK milliseconds] 用于設(shè)置XREAD為阻塞模式,默認(rèn)為非阻塞模式
  • ID 用于設(shè)置由哪個(gè)消息ID開(kāi)始讀取。使用0表示從第一條消息開(kāi)始。(本例中就是使用0)此處需要注意,消息隊(duì)列ID是單調(diào)遞增的,所以通過(guò)設(shè)置起點(diǎn),可以向后讀取。在阻塞模式中,可以使用,表示最新的消息ID。(在非阻塞模式下無(wú)意義)。
    XRED讀消息時(shí)分為阻塞和非阻塞模式,使用BLOCK選項(xiàng)可以表示阻塞模式,需要設(shè)置阻塞時(shí)長(zhǎng)。非阻塞模式下,讀取完畢(即使沒(méi)有任何消息)立即返回,而在阻塞模式下,若讀取不到內(nèi)容,則阻塞等待。
    ?
    Pending 等待列表
    為了解決組內(nèi)消息讀取但處理期間消費(fèi)者崩潰帶來(lái)的消息丟失問(wèn)題,STREAM 設(shè)計(jì)了Pending 列表,用于記錄讀取但并未處理完畢的消息。命令XPENDIING 用來(lái)獲消費(fèi)組或消費(fèi)內(nèi)消費(fèi)者的未處理完畢的消息。演示如下:
127.0.0.1:6379> XPENDING mq mqGroup # mpGroup的Pending情況
1) (integer) 5 # 5個(gè)已讀取但未處理的消息
2) "1553585533795-0" # 起始ID
3) "1553585533795-4" # 結(jié)束ID
4) 1) 1) "consumerA" # 消費(fèi)者A有3個(gè)
      2) "3"
   2) 1) "consumerB" # 消費(fèi)者B有1個(gè)
      2) "1"
   3) 1) "consumerC" # 消費(fèi)者C有1個(gè)
      2) "1"
?
127.0.0.1:6379> XPENDING mq mqGroup - + 10 # 使用 start end count 選項(xiàng)可以獲取詳細(xì)信息
1) 1) "1553585533795-0" # 消息ID
   2) "consumerA" # 消費(fèi)者
   3) (integer) 1654355 # 從讀取到現(xiàn)在經(jīng)歷了1654355ms,IDLE
   4) (integer) 5 # 消息被讀取了5次,delivery counter
2) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 1654355
   4) (integer) 4
# 共5個(gè),余下3個(gè)省略 ...
?
127.0.0.1:6379> XPENDING mq mqGroup - + 10 consumerA # 在加上消費(fèi)者參數(shù),獲取具體某個(gè)消費(fèi)者的Pending列表
1) 1) "1553585533795-0"
   2) "consumerA"
   3) (integer) 1641083
   4) (integer) 5
# 共3個(gè),余下2個(gè)省略 ...

每個(gè)Pending的消息有4個(gè)屬性:

  • 消息ID
  • 所屬消費(fèi)者
  • IDLE,已讀取時(shí)長(zhǎng)
  • delivery counter,消息被讀取次數(shù)
    上面的結(jié)果我們可以看到,我們之前讀取的消息,都被記錄在Pending列表中,說(shuō)明全部讀到的消息都沒(méi)有處理,僅僅是讀取了。那如何表示消費(fèi)者處理完畢了消息呢?使用命令XACK 完成告知消息處理完成,演示如下:
127.0.0.1:6379> XACK mq mqGroup 1553585533795-0 # 通知消息處理結(jié)束,用消息ID標(biāo)識(shí)
(integer) 1
?
127.0.0.1:6379> XPENDING mq mqGroup # 再次查看Pending列表
1) (integer) 4 # 已讀取但未處理的消息已經(jīng)變?yōu)?個(gè)
2) "1553585533795-1"
3) "1553585533795-4"
4) 1) 1) "consumerA" # 消費(fèi)者A,還有2個(gè)消息處理
      2) "2"
   2) 1) "consumerB"
      2) "1"
   3) 1) "consumerC"
      2) "1"

有了這樣一個(gè)Pending機(jī)制,就意味著在某個(gè)消費(fèi)者讀取消息但未處理后,消息是不會(huì)丟失的。等待消費(fèi)者再次上線后,可以讀取該P(yáng)ending列表,就可以繼續(xù)處理該消息了,保證消息的有序和不丟失。
此時(shí)還有一個(gè)問(wèn)題,就是若某個(gè)消費(fèi)者宕機(jī)之后,沒(méi)有辦法再上線了,那么就需要將該消費(fèi)者Pending的消息,轉(zhuǎn)義給其他的消費(fèi)者處理,就是消息轉(zhuǎn)移。
?
消息轉(zhuǎn)移
消息轉(zhuǎn)移的操作時(shí)將某個(gè)消息轉(zhuǎn)移到自己的Pending列表中。使用語(yǔ)法XCLAIM來(lái)實(shí)現(xiàn),需要設(shè)置組、轉(zhuǎn)移的目標(biāo)消費(fèi)者和消息ID,同時(shí)需要提供IDLE(已被讀取時(shí)長(zhǎng)),只有超過(guò)這個(gè)時(shí)長(zhǎng),才能被轉(zhuǎn)移。演示如下:

# 當(dāng)前屬于消費(fèi)者A的消息1553585533795-1,已經(jīng)15907,787ms未處理了
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerA"
   3) (integer) 15907787
   4) (integer) 4
?
# 轉(zhuǎn)移超過(guò)3600s的消息1553585533795-1到消費(fèi)者B的Pending列表
127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
1) 1) "1553585533795-1"
   2) 1) "msg"
      2) "2"
?
# 消息1553585533795-1已經(jīng)轉(zhuǎn)移到消費(fèi)者B的Pending中。
127.0.0.1:6379> XPENDING mq mqGroup - + 10
1) 1) "1553585533795-1"
   2) "consumerB"
   3) (integer) 84404 # 注意IDLE,被重置了
   4) (integer) 5 # 注意,讀取次數(shù)也累加了1次

以上代碼,完成了一次消息轉(zhuǎn)移。轉(zhuǎn)移除了要指定ID外,還需要指定IDLE,保證是長(zhǎng)時(shí)間未處理的才被轉(zhuǎn)移。被轉(zhuǎn)移的消息的IDLE會(huì)被重置,用以保證不會(huì)被重復(fù)轉(zhuǎn)移,以為可能會(huì)出現(xiàn)將過(guò)期的消息同時(shí)轉(zhuǎn)移給多個(gè)消費(fèi)者的并發(fā)操作,設(shè)置了IDLE,則可以避免后面的轉(zhuǎn)移不會(huì)成功,因?yàn)镮DLE不滿足條件。例如下面的連續(xù)兩條轉(zhuǎn)移,第二條不會(huì)成功。

127.0.0.1:6379> XCLAIM mq mqGroup consumerB 3600000 1553585533795-1
127.0.0.1:6379> XCLAIM mq mqGroup consumerC 3600000 1553585533795-1

這就是消息轉(zhuǎn)移。至此我們使用了一個(gè)Pending消息的ID,所屬消費(fèi)者和IDLE的屬性,還有一個(gè)屬性就是消息被讀取次數(shù),delivery counter,該屬性的作用由于統(tǒng)計(jì)消息被讀取的次數(shù),包括被轉(zhuǎn)移也算。這個(gè)屬性主要用在判定是否為錯(cuò)誤數(shù)據(jù)上。
?
壞消息問(wèn)題,Dead Letter,死信問(wèn)題
正如上面所說(shuō),如果某個(gè)消息,不能被消費(fèi)者處理,也就是不能被XACK,這是要長(zhǎng)時(shí)間處于Pending列表中,即使被反復(fù)的轉(zhuǎn)移給各個(gè)消費(fèi)者也是如此。此時(shí)該消息的delivery counter就會(huì)累加(上一節(jié)的例子可以看到),當(dāng)累加到某個(gè)我們預(yù)設(shè)的臨界值時(shí),我們就認(rèn)為是壞消息(也叫死信,DeadLetter,無(wú)法投遞的消息),由于有了判定條件,我們將壞消息處理掉即可,刪除即可。刪除一個(gè)消息,使用XDEL語(yǔ)法,演示如下:

# 刪除隊(duì)列中的消息
127.0.0.1:6379> XDEL mq 1553585533795-1
(integer) 1
# 查看隊(duì)列中再無(wú)此消息
127.0.0.1:6379> XRANGE mq - +
1) 1) "1553585533795-0"
   2) 1) "msg"
      2) "1"
2) 1) "1553585533795-2"
   2) 1) "msg"
      2) "3"

注意本例中,并沒(méi)有刪除Pending中的消息因此你查看Pending,消息還會(huì)在。可以執(zhí)行XACK標(biāo)識(shí)其處理完畢!
?
信息監(jiān)控,XINFO
Stream提供了XINFO來(lái)實(shí)現(xiàn)對(duì)服務(wù)器信息的監(jiān)控,可以查詢、查看隊(duì)列信息:

127.0.0.1:6379> Xinfo stream mq
 1) "length"
 2) (integer) 7
 3) "radix-tree-keys"
 4) (integer) 1
 5) "radix-tree-nodes"
 6) (integer) 2
 7) "groups"
 8) (integer) 1
 9) "last-generated-id"
10) "1553585533795-9"
11) "first-entry"
12) 1) "1553585533795-3"
    2) 1) "msg"
       2) "4"
13) "last-entry"
14) 1) "1553585533795-9"
    2) 1) "msg"
       2) "10"

消費(fèi)組信息:

127.0.0.1:6379> Xinfo groups mq
1) 1) "name"
   2) "mqGroup"
   3) "consumers"
   4) (integer) 3
   5) "pending"
   6) (integer) 3
   7) "last-delivered-id"
   8) "1553585533795-4"

消費(fèi)者組成員信息:

127.0.0.1:6379> XINFO CONSUMERS mq mqGroup
1) 1) "name"
   2) "consumerA"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 18949894
2) 1) "name"
   2) "consumerB"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 3092719
3) 1) "name"
   2) "consumerC"
   3) "pending"
   4) (integer) 1
   5) "idle"
   6) (integer) 23683256

?
命令一覽
?

命令 說(shuō)明
XACK 結(jié)束Pending
XADD 生成消息
XCLAIM 消息轉(zhuǎn)移
XDEL 刪除消息
XGROUP 消費(fèi)組管理
XINFO 得到消費(fèi)組信息
XLEN 消息隊(duì)列長(zhǎng)度
Pending列表 Pending列表
XRANGE 獲取消息隊(duì)列中消息
XREAD 消費(fèi)消息
XREADGROUP 分組消費(fèi)消息
XREVRANGE 逆序獲取消息隊(duì)列中消息
XTRIM 消息隊(duì)列容量

?

Reference

[1] 基于Redis實(shí)現(xiàn)消息隊(duì)列典型方案
[2] Stream 類型

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容