在沒(méi)有搭建消息中間件(比如rabbitMQ、rocketMQ、kafka)的情況下,可以使用Redis來(lái)實(shí)現(xiàn)消息隊(duì)列。
1、普通消息隊(duì)列
利用的是 Redis 的 List 這個(gè)數(shù)據(jù)結(jié)構(gòu)。
1-1、Redis的數(shù)據(jù)結(jié)構(gòu):List
Redis的List是鏈表型的數(shù)據(jù)結(jié)構(gòu),可以使用LPUSH/RPUSH/LPOP/RPOP等命令在List的兩端執(zhí)行插入元素和彈出元素的操作。
雖然List也支持在特定index上插入和讀取元素的功能,但其時(shí)間復(fù)雜度較高(O(N)),應(yīng)盡量避免使用。
1-2、List的主要操作命令
LPUSH:向指定List的左側(cè)(即頭部)插入1個(gè)或多個(gè)元素,返回插入后的List長(zhǎng)度。時(shí)間復(fù)雜度O(N),N為插入元素的數(shù)量
RPUSH:同LPUSH,向指定List的右側(cè)(即尾部)插入1或多個(gè)元素
LPOP:從指定List的左側(cè)(即頭部)移除一個(gè)元素并返回,時(shí)間復(fù)雜度O(1)
RPOP:同LPOP,從指定List的右側(cè)(即尾部)移除1個(gè)元素并返回
LPUSHX/RPUSHX:與LPUSH/RPUSH類似,區(qū)別在于,LPUSHX/RPUSHX操作的List如果不存在(長(zhǎng)度為0),則不會(huì)進(jìn)行任何操作
LLEN:返回指定List的長(zhǎng)度,時(shí)間復(fù)雜度O(1)
LRANGE:返回指定List中指定范圍的元素(雙端包含,即LRANGE key 0 10會(huì)返回11個(gè)元素),時(shí)間復(fù)雜度O(N)。
1-3、List的注意事項(xiàng)
時(shí)間復(fù)雜度為O(N)的以下命令都應(yīng)謹(jǐn)慎使用
用LRANGE應(yīng)盡可能控制一次獲取的元素?cái)?shù)量,一次獲取過(guò)大范圍的List元素會(huì)導(dǎo)致延遲,同時(shí)對(duì)長(zhǎng)度不可預(yù)知的List,避免使用LRANGE key 0 -1這樣的完整遍歷操作。
LINDEX:返回指定List指定index上的元素,如果index越界,返回nil。index數(shù)值是回環(huán)的,即-1代表List最后一個(gè)位置,-2代表List倒數(shù)第二個(gè)位置。時(shí)間復(fù)雜度O(N)
LSET:將指定List指定index上的元素設(shè)置為value,如果index越界則返回錯(cuò)誤,時(shí)間復(fù)雜度O(N),如果操作的是頭/尾部的元素,則時(shí)間復(fù)雜度為O(1)
LINSERT:向指定List中指定元素之前/之后插入一個(gè)新元素,并返回操作后的List長(zhǎng)度。如果指定的元素不存在,返回-1。如果指定List不存在(長(zhǎng)度為0),不會(huì)進(jìn)行任何操作,時(shí)間復(fù)雜度O(N)
由于Redis的List是鏈表結(jié)構(gòu)的,上述命令的算法效率較低,需要對(duì)List進(jìn)行遍歷,命令的耗時(shí)無(wú)法預(yù)估,在List長(zhǎng)度大的情況下耗時(shí)會(huì)明顯增加,應(yīng)謹(jǐn)慎使用。
換句話說(shuō),Redis的List實(shí)際是設(shè)計(jì)來(lái)用于實(shí)現(xiàn)隊(duì)列,而不是用于實(shí)現(xiàn)類似ArrayList這樣的列表的。如果不是想要實(shí)現(xiàn)一個(gè)雙端出入的隊(duì)列,那么請(qǐng)盡量不要使用Redis的List數(shù)據(jù)結(jié)構(gòu)。
1-4、實(shí)現(xiàn)消息隊(duì)列
(1)基礎(chǔ)方法
既然List這種數(shù)據(jù)結(jié)構(gòu)就是被設(shè)計(jì)用來(lái)實(shí)現(xiàn)隊(duì)列的,那么就可以直接使用List來(lái)實(shí)現(xiàn)消息隊(duì)列。
生產(chǎn)者使用RPUSH不停地向隊(duì)列右端添加信息;消費(fèi)者使用LPOP不停地從隊(duì)列左端讀取消息,讀取不到消息時(shí),使用SLEEP命令定期重讀即可。
(2)改進(jìn)SLEEP
不使用SLEEP命令,直接將LPOP命令替換為BLPOP即可,這個(gè)B代表Block,是一個(gè)阻塞式命令。List為空時(shí),阻塞連接,直到List中有對(duì)象可獲取時(shí)再返回。
這相當(dāng)于Java中的BlockingQueue數(shù)據(jù)類型。
另外,BLPOP的命令參數(shù)中可以設(shè)定超時(shí)時(shí)間,timeout之后如果隊(duì)列中仍然沒(méi)有對(duì)象,則直接返回。
BLPOP key [key ...] timeout
(3)實(shí)現(xiàn)訂閱模式
使用 Redis 的發(fā)布訂閱(pub/sub)模式,可以實(shí)現(xiàn)1:N的消息隊(duì)列(即fanout)。
實(shí)現(xiàn)概要如下。
【多個(gè)消費(fèi)者訂閱channel(即雙方約定的List)】
SUBSCRIBE fanoutChannel
【生產(chǎn)者向channel中發(fā)布消息】
PUBLISH fanoutChannel "msg1"
PUBLISH fanoutChannel "msg2"
【所有消費(fèi)者都會(huì)收到該消息】
1) "message"
2) "fanoutChannel"
3) "msg1"
1) "message"
2) "fanoutChannel"
3) "msg2"
但是要注意,消費(fèi)者下線再上線后,無(wú)法獲取此期間生產(chǎn)的消息(無(wú)持久化)。
2、延時(shí)隊(duì)列
利用的是 Redis 的 Sorted Set 這個(gè)數(shù)據(jù)結(jié)構(gòu)的一點(diǎn)點(diǎn)使用技巧。
2-1、Redis的數(shù)據(jù)結(jié)構(gòu):Sorted Set
Redis Sorted Set是有序的、不可重復(fù)的String集合。
Sorted Set中的每個(gè)元素都需要指派一個(gè)分?jǐn)?shù)(score),Sorted Set會(huì)根據(jù)score對(duì)元素進(jìn)行升序排序。
如果多個(gè)元素?fù)碛邢嗤膕core,則以字典序進(jìn)行升序排序。
Sorted Set非常適合用于記錄排名、熱點(diǎn)話題等場(chǎng)景。
2-2、Sorted Set的主要操作命令
ZADD:向指定Sorted Set中添加1個(gè)或多個(gè)member,時(shí)間復(fù)雜度O(Mlog(N)),M為添加的member數(shù)量,N為Sorted Set中的member數(shù)量
ZREM:從指定Sorted Set中刪除1個(gè)或多個(gè)member,時(shí)間復(fù)雜度O(Mlog(N)),M為刪除的member數(shù)量,N為Sorted Set中的member數(shù)量
ZCOUNT:返回指定Sorted Set中指定score范圍內(nèi)的member數(shù)量,時(shí)間復(fù)雜度:O(log(N))
ZCARD:返回指定Sorted Set中的member數(shù)量,時(shí)間復(fù)雜度O(1)
ZSCORE:返回指定Sorted Set中指定member的score,時(shí)間復(fù)雜度O(1)
ZRANK/ZREVRANK:返回指定member在Sorted Set中的排名,ZRANK返回按升序排序的排名,ZREVRANK則返回按降序排序的排名。時(shí)間復(fù)雜度O(log(N))
ZINCRBY:同INCRBY,對(duì)指定Sorted Set中的指定member的score進(jìn)行自增,時(shí)間復(fù)雜度O(log(N))
2-3、Sorted Set的注意事項(xiàng)
時(shí)間復(fù)雜度較高的以下命令都應(yīng)謹(jǐn)慎使用
ZRANGE/ZREVRANGE:返回指定Sorted Set中指定排名范圍內(nèi)的所有member,ZRANGE為按score升序排序,ZREVRANGE為按score降序排序,時(shí)間復(fù)雜度O(log(N)+M),M為本次返回的member數(shù)(下同)
ZRANGEBYSCORE/ZREVRANGEBYSCORE:返回指定Sorted Set中指定score范圍內(nèi)的所有member,返回結(jié)果以升序/降序排序,min和max可以指定為-inf和+inf,代表返回所有的member。時(shí)間復(fù)雜度O(log(N)+M)
ZREMRANGEBYRANK/ZREMRANGEBYSCORE:移除Sorted Set中指定排名范圍/指定score范圍內(nèi)的所有member。時(shí)間復(fù)雜度O(log(N)+M)
上述命令中應(yīng)避免傳遞[0 -1]或[-inf +inf]這樣的參數(shù),來(lái)對(duì)Sorted Set做一次性的完整遍歷,特別是在Sorted Set的尺寸不可預(yù)知的情況下。
可以通過(guò)ZSCAN命令來(lái)進(jìn)行游標(biāo)式的遍歷
ZSCAN key cursor [MATCH pattern] [COUNT count]
或通過(guò)LIMIT參數(shù)來(lái)限制返回member的數(shù)量(適用于ZRANGEBYSCORE和ZREVRANGEBYSCORE命令),以實(shí)現(xiàn)游標(biāo)式的遍歷。
2-4、實(shí)現(xiàn)延時(shí)隊(duì)列
思路為使用Sorted Set,拿時(shí)間戳作為score。
生產(chǎn)者將消息內(nèi)容作為member,時(shí)間戳作為score調(diào)用ZADD來(lái)生產(chǎn)消息;
ZADD key score member [[score member] [score member] ...]
消費(fèi)者用ZRANGEBYSCORE命令獲取N秒之前的數(shù)據(jù)進(jìn)行輪詢處理,使用min和max向前推N秒來(lái)卡延時(shí)的消息。
ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
2-plus、使用rabbitMQ實(shí)現(xiàn)延時(shí)隊(duì)列的思路
與 Redis 無(wú)關(guān),但是 rabbitMQ 本身實(shí)際上也沒(méi)有直接提供延時(shí)隊(duì)列的功能,所以在這里多說(shuō)一句,提一下在 rabbitMQ 里面實(shí)現(xiàn)延時(shí)隊(duì)列的思路。
rabbitMQ中可以對(duì)Message設(shè)置 x-message-ttl(TTL = Time To Live)來(lái)控制消息的生存時(shí)間。超時(shí)以后消息變?yōu)閐ead letter(死信)。
同時(shí),RabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可選)兩個(gè)參數(shù),如果隊(duì)列內(nèi)出現(xiàn)了dead letter,則按照這兩個(gè)參數(shù)重新路由轉(zhuǎn)發(fā)到指定的隊(duì)列。
利用這樣的特性,設(shè)置兩個(gè)隊(duì)列,A隊(duì)列無(wú)消費(fèi)者,生產(chǎn)者向該隊(duì)列發(fā)送消息,消息設(shè)定TTL;同時(shí)設(shè)定A中出現(xiàn)死信以后將消息轉(zhuǎn)發(fā)到B隊(duì)列;B隊(duì)列使用正常設(shè)定即可,所有消費(fèi)者從B讀取消息。
即可完成延時(shí)時(shí)間為TTL的延時(shí)隊(duì)列。
另外,rabbitMQ中,除了超時(shí)以外,隊(duì)列達(dá)到最大長(zhǎng)度 或 者消息被消費(fèi)端拒絕并且requeue=false 時(shí),消息也會(huì)變成死信,需要注意一下別跟需要延時(shí)的消息混雜在一起。