Redis是一個優(yōu)秀的內(nèi)存數(shù)據(jù)庫,Stream是其中強大的功能,但也有很多細節(jié),本篇是在學習中的筆記。
應(yīng)用場景
記錄用戶行為(如點擊,輸入)
傳感器的讀數(shù)(多個維度,如溫度、濕度)
通知
提供多種數(shù)據(jù)消費策略(xread,xreadgroup,xrange)
XADD
寫入,支持多字段,和Hash數(shù)據(jù)類型相似,*代表讓Redis自動生成ID,命令返回ID
ID由兩個64位的數(shù)字組成
> XADD mystream * sensor-id 1234 temperature 19.8
1518951480106-0
*也可以替換成自定義,最小是0-1,但其他的第二個數(shù)字可以為0,比如1-0
> XADD mystream 0-1 foo bar
0-1
第二位*的情況
> XADD somestream 0-* baz qux
0-2
如果沒有key,不創(chuàng)建
> xadd mykey nomkstream * k v
(nil)
> xadd mykey * k v
"1682114234811-0"
> xadd mykey nomkstream * k1 v1
"1682114244867-0"
可以攜帶trim信息,MAXLEN和閾值直接可以加上~或=,表示近似或精確相等,默認精確,也可以用MINID
> xadd mykey * k1 v1
"1682135799495-0"
> xadd mykey * k2 v2
"1682135814851-0"
> xadd mykey * k3 v3
"1682135821706-0"
> xadd mykey * k4 v4
"1682135835368-0"
> xadd mykey MAXLEN 3 * k5 v5
"1682135847838-0"
> xrange mykey - +
1) 1) "1682135821706-0"
2) 1) "k3"
2) "v3"
2) 1) "1682135835368-0"
2) 1) "k4"
2) "v4"
3) 1) "1682135847838-0"
2) 1) "k5"
2) "v5"
XLEN
獲取長度
> XLEN mystream
(integer) 1
XRANGE
根據(jù)范圍獲取數(shù)據(jù),兩個ID是閉區(qū)間,可以通過前面加上括號變成開區(qū)間
XRANGE mystream - +
1) 1) 1518951480106-0
2) 1) "sensor-id"
2) "1234"
3) "temperature"
4) "19.8"
2) 1) 1518951482479-0
2) 1) "sensor-id"
2) "9999"
3) "temperature"
4) "18.2"
XRANGE mystream 1518951480106 1518951480107
XRANGE mystream - + COUNT 2
XRANGE mystream (1519073279157-0 + COUNT 2
XREAD
0表示永遠不會超時,BLOCK只配合$一起使用才能實現(xiàn)阻塞功能
XREAD BLOCK 0 STREAMS mystream $
BLOCK是有順序的,先執(zhí)行的先獲得數(shù)據(jù)
XREAD也能同時取多個stream的數(shù)據(jù),每個stream都要給定起始ID
XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
XREAD和XRANGE
都能迭代獲取所有記錄,XRANGE迭代時起始ID需要加上(表示排除
XREAD適合于通過迭代方式取完所有記錄,XRANGE適合一次性獲取記錄(支持 -+)
XREAD支持阻塞式讀($)
前者支持多個Key,后者只支持一個Key
XREAD
與PUBSUB的區(qū)別:沒有存儲功能
與LIST區(qū)別:其中一個消費者取完,另外一個消費者不能再次獲得
=================
stream還提供了consume group功能,和kafka相似
- 每個消息只能發(fā)送給一個消費者
- 每個消費者有唯一的名稱
- First ID的概念,沒被發(fā)送的的第一個消息
- 消息可以ACK的操作,表示消息已經(jīng)正確處理完成
- 消費者能看到已接收pending,還沒ACK的歷史數(shù)據(jù)
創(chuàng)建consumer group,如果不存在stream會創(chuàng)建失敗,$表示stream中最大的id,stream中新的消息才能被發(fā)送到group里,0表示stream所有的消息都會被送到組里,這個id也能是一個0外的ID
XGROUP CREATE newstream mygroup $
XGROUP CREATE newstream mygroup 0
帶MKSTREAM,如果不存在stream會自動創(chuàng)建,如果存在沒影響
XGROUP CREATE newstream mygroup $ MKSTREAM
讀取,如果沒有未讀消息直接返回null,消費者是隱式創(chuàng)建的,不需要單獨命令
XREADGROUP GROUP mygroup Alice COUNT 1 STREAMS mystream
歷史消息只要是沒XACK,還在pending列表中,還是能獲取到
XREADGROUP GROUP mygroup Alice STREAMS mystream 0
消費者是隱式創(chuàng)建的,不需要單獨命令
XREADGROUP也支持讀多個stream,但是group在不同stream里必須取相同名稱
這是一個只讀命令,返回最小最大id,并且返回消費者和他們名下的pending數(shù)量
XPENDING mystream mygroup
這個命令不返回消息內(nèi)容,可以通過XRANGE查看
增加了范圍,服務(wù)器給出了更詳細的信息,包括每個已發(fā)送信息的距離上次發(fā)送的時長和總共發(fā)送次數(shù)
> XPENDING mystream mygroup - + 10
1) 1) 1526569498055-0
2) "Bob" // Owner
3) (integer) 74170458 // Idle時間
4) (integer) 1 // 發(fā)送次數(shù)
通過XCLAIM(索要)可以改變owner,這命令同時返回消息內(nèi)容,如果不需要消息內(nèi)容,可以在命令中加JUSTID選項
XCLAIM <key> <group> <consumer> <min-idle-time> <ID-1> <ID-2> ... <ID-N>
通過設(shè)置min-idle-time可以防止消息被同時改變owner
Client 1: XCLAIM mystream mygroup Alice 3600000 1526569498055-0
Client 2: XCLAIM mystream mygroup Lora 3600000 1526569498055-0
自動claim
> XAUTOCLAIM mystream mygroup Alice 3600000 0-0 COUNT 1
查看stream的信息
> XINFO STREAM mystream
> XINFO GROUPS mystream
> XINFO CONSUMERS mystream mygroup