5. stream
? 很多同學(xué)并不認識這個數(shù)據(jù)結(jié)構(gòu),確實,在Redis 5.0之前并沒有這個數(shù)據(jù)結(jié)構(gòu)。這個數(shù)據(jù)結(jié)構(gòu)稱之為“流”,為什么叫“流”呢?這種數(shù)據(jù)就像流水一樣,不是一次潑了一盆過來而是一點一點地“流”過來。當(dāng)數(shù)據(jù)從生產(chǎn)方“流”到消費方的時候,消費方就可以對一點一點的數(shù)據(jù)進行處理,這樣就保證了處理的時效性,也讓機器能平緩地使用資源。
? 事實上,Redis的stream是作者的另一個開源項目Disque移植過來的,它高度借鑒了Kafka的設(shè)計,如果你有Kafka相關(guān)的知識,那么stream對你來說也很簡單。
5.1 相關(guān)命令
-
消息隊列相關(guān)命令:
XADD - 添加消息到末尾
XTRIM - 對流進行修剪,限制長度
XDEL - 刪除消息
XLEN - 獲取流包含的元素數(shù)量,即消息長度
XRANGE - 獲取消息列表,會自動過濾已經(jīng)刪除的消息
XREVRANGE - 反向獲取消息列表,ID 從大到小
XREAD - 以阻塞或非阻塞方式獲取消息列表
-
消費者組相關(guān)命令:
XGROUP CREATE - 創(chuàng)建消費者組
XREADGROUP GROUP - 讀取消費者組中的消息
XACK - 將消息標(biāo)記為"已處理"
XGROUP SETID - 為消費者組設(shè)置新的最后遞送消息ID
XGROUP DELCONSUMER - 刪除消費者
XGROUP DESTROY - 刪除消費者組
XPENDING - 顯示待處理消息的相關(guān)信息
XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)
XINFO - 查看流和消費者組的相關(guān)信息
XINFO GROUPS - 打印消費者組的信息
XINFO STREAM - 打印流信息
5.2 結(jié)構(gòu)源碼
消息ID:
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
* of the latest generated ID and an incremented sequence. */
typedef struct streamID {
uint64_t ms; /* Unix time in milliseconds. */
uint64_t seq; /* Sequence number. */
} streamID;
流迭代器:
/* We define an iterator to iterate stream items in an abstract way, without
* caring about the radix tree + listpack representation. Technically speaking
* the iterator is only used inside streamReplyWithRange(), so could just
* be implemented inside the function, but practically there is the AOF
* rewriting code that also needs to iterate the stream to emit the XADD
* commands. */
typedef struct streamIterator {
stream *stream; /* The stream we are iterating. */
streamID master_id; /* ID of the master entry at listpack head. */
uint64_t master_fields_count; /* Master entries # of fields. */
unsigned char *master_fields_start; /* Master entries start in listpack. */
unsigned char *master_fields_ptr; /* Master field to emit next. */
int entry_flags; /* Flags of entry we are emitting. */
int rev; /* True if iterating end to start (reverse). */
uint64_t start_key[2]; /* Start key as 128 bit big endian. */
uint64_t end_key[2]; /* End key as 128 bit big endian. */
raxIterator ri; /* Rax iterator. */
unsigned char *lp; /* Current listpack. */
unsigned char *lp_ele; /* Current listpack cursor. */
unsigned char *lp_flags; /* Current entry flags pointer. */
/* Buffers used to hold the string of lpGet() when the element is
* integer encoded, so that there is no string representation of the
* element inside the listpack itself. */
unsigned char field_buf[LP_INTBUF_SIZE];
unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;
?
流:
typedef struct stream {
rax *rax; /* The radix tree holding the stream. */
uint64_t length; /* Number of elements inside this stream. */
streamID last_id; /* Zero if there are yet no items. */
rax *cgroups; /* Consumer groups dictionary: name -> streamCG */
} stream;
5.3 運行機制
? 每個 stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。
? 每個 stream 都可以掛多個消費組(Consumer Group),每個消費組會有個游標(biāo) last_delivered_id,任意一個消費者讀取了消息都會使游標(biāo) last_delivered_id 往前移動,同一個消費組的消費者只能讀取之后的消息。
? pending_ids是消費者(Consumer)的狀態(tài)變量,作用是維護消費者的未確認的 id。 它記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack(Acknowledge character:確認字符)的消息id。當(dāng)消息被取走,這個對應(yīng)的消息ID就會進入消費組的PEL(Pending Entries List)結(jié)構(gòu)里,當(dāng)ack之后,這個消息就會從PEL中刪除。如果stream中的消息被消費者取走但是一直不ack,那么PEL就會一直增長,如果消費者巨多,就可能出現(xiàn)內(nèi)存占用很大的情況。當(dāng)Redis服務(wù)器向消費者發(fā)送數(shù)據(jù)的時候,客戶端斷開了連接,這樣丟失的消息會在客戶端重連之后繼續(xù)發(fā)送給它,因為消息ID會保存在PEL之中。
? 除了這些,stream還提供maxlen參數(shù),來限制自身的最長消息數(shù),這樣就能保證數(shù)據(jù)不超過指定的長度。
5.4 異同
? stream 雖然和 Kafka 非常像,有消費組、水位等概念,但它并不支持天然分區(qū)。也就是說,如果我們要進行partition的話,只能建立多個 stream key ,然后由客戶端或者中間代理來將不同的消息路由到不同的 stream 中。
? 在 stream 被發(fā)布之前,Redis本身就有 pub/sub 來實現(xiàn)消息隊列的功能。不過pub/sub有個極大的缺點就是不能持久化,當(dāng)被訂閱的服務(wù)器pub一條消息,如果沒有sub的客戶端,那么這個消息就會丟失。在生產(chǎn)環(huán)境中我們沒辦法接受那么容易的丟失消息,并且 pub/sub 更適合用來做廣播,并不適合做消息隊列。作者在Reids Cluster 中大量的使用了 pub/sub 這個功能。