Redis奇幻之旅(二)5. stream

5. stream

? 很多同學(xué)并不認識這個數(shù)據(jù)結(jié)構(gòu),確實,在Redis 5.0之前并沒有這個數(shù)據(jù)結(jié)構(gòu)。這個數(shù)據(jù)結(jié)構(gòu)稱之為“流”,為什么叫“流”呢?這種數(shù)據(jù)就像流水一樣,不是一次潑了一盆過來而是一點一點地“流”過來。當(dāng)數(shù)據(jù)從生產(chǎn)方“流”到消費方的時候,消費方就可以對一點一點的數(shù)據(jù)進行處理,這樣就保證了處理的時效性,也讓機器能平緩地使用資源。

? 事實上,Redis的stream是作者的另一個開源項目Disque移植過來的,它高度借鑒了Kafka的設(shè)計,如果你有Kafka相關(guān)的知識,那么stream對你來說也很簡單。

5.1 相關(guān)命令

  • 消息隊列相關(guān)命令:

    XADD - 添加消息到末尾

    XTRIM - 對流進行修剪,限制長度

    XDEL - 刪除消息

    XLEN - 獲取流包含的元素數(shù)量,即消息長度

    XRANGE - 獲取消息列表,會自動過濾已經(jīng)刪除的消息

    XREVRANGE - 反向獲取消息列表,ID 從大到小

    XREAD - 以阻塞或非阻塞方式獲取消息列表

  • 消費者組相關(guān)命令:

    XGROUP CREATE - 創(chuàng)建消費者組

    XREADGROUP GROUP - 讀取消費者組中的消息

    XACK - 將消息標(biāo)記為"已處理"

    XGROUP SETID - 為消費者組設(shè)置新的最后遞送消息ID

    XGROUP DELCONSUMER - 刪除消費者

    XGROUP DESTROY - 刪除消費者組

    XPENDING - 顯示待處理消息的相關(guān)信息

    XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)

    XINFO - 查看流和消費者組的相關(guān)信息

    XINFO GROUPS - 打印消費者組的信息

    XINFO STREAM - 打印流信息

5.2 結(jié)構(gòu)源碼

消息ID:

/* Stream item ID: a 128 bit number composed of a milliseconds time and
 * a sequence counter. IDs generated in the same millisecond (or in a past
 * millisecond if the clock jumped backward) will use the millisecond time
 * of the latest generated ID and an incremented sequence. */
typedef struct streamID {
    uint64_t ms;        /* Unix time in milliseconds. */
    uint64_t seq;       /* Sequence number. */
} streamID;

流迭代器:

/* We define an iterator to iterate stream items in an abstract way, without
 * caring about the radix tree + listpack representation. Technically speaking
 * the iterator is only used inside streamReplyWithRange(), so could just
 * be implemented inside the function, but practically there is the AOF
 * rewriting code that also needs to iterate the stream to emit the XADD
 * commands. */
typedef struct streamIterator {
    stream *stream;         /* The stream we are iterating. */
    streamID master_id;     /* ID of the master entry at listpack head. */
    uint64_t master_fields_count;       /* Master entries # of fields. */
    unsigned char *master_fields_start; /* Master entries start in listpack. */
    unsigned char *master_fields_ptr;   /* Master field to emit next. */
    int entry_flags;                    /* Flags of entry we are emitting. */
    int rev;                /* True if iterating end to start (reverse). */
    uint64_t start_key[2];  /* Start key as 128 bit big endian. */
    uint64_t end_key[2];    /* End key as 128 bit big endian. */
    raxIterator ri;         /* Rax iterator. */
    unsigned char *lp;      /* Current listpack. */
    unsigned char *lp_ele;  /* Current listpack cursor. */
    unsigned char *lp_flags; /* Current entry flags pointer. */
    /* Buffers used to hold the string of lpGet() when the element is
     * integer encoded, so that there is no string representation of the
     * element inside the listpack itself. */
    unsigned char field_buf[LP_INTBUF_SIZE];
    unsigned char value_buf[LP_INTBUF_SIZE];
} streamIterator;

?

流:

typedef struct stream {
    rax *rax;               /* The radix tree holding the stream. */
    uint64_t length;        /* Number of elements inside this stream. */
    streamID last_id;       /* Zero if there are yet no items. */
    rax *cgroups;           /* Consumer groups dictionary: name -> streamCG */
} stream;

5.3 運行機制

? 每個 stream 都有唯一的名稱,它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時自動創(chuàng)建。

? 每個 stream 都可以掛多個消費組(Consumer Group),每個消費組會有個游標(biāo) last_delivered_id,任意一個消費者讀取了消息都會使游標(biāo) last_delivered_id 往前移動,同一個消費組的消費者只能讀取之后的消息。

? pending_ids是消費者(Consumer)的狀態(tài)變量,作用是維護消費者的未確認的 id。 它記錄了當(dāng)前已經(jīng)被客戶端讀取的消息,但是還沒有 ack(Acknowledge character:確認字符)的消息id。當(dāng)消息被取走,這個對應(yīng)的消息ID就會進入消費組的PEL(Pending Entries List)結(jié)構(gòu)里,當(dāng)ack之后,這個消息就會從PEL中刪除。如果stream中的消息被消費者取走但是一直不ack,那么PEL就會一直增長,如果消費者巨多,就可能出現(xiàn)內(nèi)存占用很大的情況。當(dāng)Redis服務(wù)器向消費者發(fā)送數(shù)據(jù)的時候,客戶端斷開了連接,這樣丟失的消息會在客戶端重連之后繼續(xù)發(fā)送給它,因為消息ID會保存在PEL之中。

? 除了這些,stream還提供maxlen參數(shù),來限制自身的最長消息數(shù),這樣就能保證數(shù)據(jù)不超過指定的長度。

5.4 異同

? stream 雖然和 Kafka 非常像,有消費組、水位等概念,但它并不支持天然分區(qū)。也就是說,如果我們要進行partition的話,只能建立多個 stream key ,然后由客戶端或者中間代理來將不同的消息路由到不同的 stream 中。

? 在 stream 被發(fā)布之前,Redis本身就有 pub/sub 來實現(xiàn)消息隊列的功能。不過pub/sub有個極大的缺點就是不能持久化,當(dāng)被訂閱的服務(wù)器pub一條消息,如果沒有sub的客戶端,那么這個消息就會丟失。在生產(chǎn)環(huán)境中我們沒辦法接受那么容易的丟失消息,并且 pub/sub 更適合用來做廣播,并不適合做消息隊列。作者在Reids Cluster 中大量的使用了 pub/sub 這個功能。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。
禁止轉(zhuǎn)載,如需轉(zhuǎn)載請通過簡信或評論聯(lián)系作者。

相關(guān)閱讀更多精彩內(nèi)容

  • 原文鏈接:Redis實現(xiàn)消息隊列的方案 Redis作為內(nèi)存中的數(shù)據(jù)結(jié)構(gòu)存儲,常用作數(shù)據(jù)庫、緩存和消息代理。它支持數(shù)...
    這個ID狠溫柔閱讀 101,500評論 2 28
  • 夜鶯2517閱讀 128,087評論 1 9
  • 版本:ios 1.2.1 亮點: 1.app角標(biāo)可以實時更新天氣溫度或選擇空氣質(zhì)量,建議處女座就不要選了,不然老想...
    我就是沉沉閱讀 7,361評論 1 6
  • 我是一名過去式的高三狗,很可悲,在這三年里我沒有戀愛,看著同齡的小伙伴們一對兒一對兒的,我的心不好受。怎么說呢,高...
    小娘紙閱讀 3,738評論 4 7
  • 這些日子就像是一天一天在倒計時 一想到他走了 心里就是說不出的滋味 從幾個月前認識他開始 就意識到終究會發(fā)生的 只...
    栗子a閱讀 1,715評論 1 3

友情鏈接更多精彩內(nèi)容