Redis 源碼簡潔剖析 06 - quicklist 和 listpack

quicklist

為什么要設(shè)計 quicklist

ziplist 有兩個問題,參考 Redis 源碼簡潔剖析 05 - ziplist 壓縮列表

  • 不能保存過多的元素,否則訪問性能會下降
  • 不能保存過大的元素,否則容易導(dǎo)致內(nèi)存重新分配,甚至引起連鎖更新

特點(diǎn)

quicklist 的設(shè)計,其實(shí)是結(jié)合了鏈表和 ziplist 各自的優(yōu)勢。簡單來說,一個 quicklist 就是一個鏈表,而鏈表中的每個元素又是一個 ziplist。

  • 結(jié)構(gòu)定義:quicklist.h
  • 實(shí)現(xiàn):quicklist.c

數(shù)據(jù)結(jié)構(gòu)

quicklist 是一個鏈表,所以每個 quicklistNode 中,都包含了分別指向它前序和后序節(jié)點(diǎn)的指針* prev 和* next。同時,每個 quicklistNode 又是一個 ziplist,所以,在 quicklistNode 的結(jié)構(gòu)體中,還有指向 ziplist 的指針* zl。

每個元素節(jié)點(diǎn) quicklistNode 的定義如下:

typedef struct quicklistNode {
    // 前一個 quicklistNode
    struct quicklistNode *prev;
    // 后一個 quicklistNode
    struct quicklistNode *next;
    // quicklistNode 指向的 ziplist
    unsigned char *zl;
    // ziplist 的字節(jié)大小
    unsigned int sz;
    // ziplist 的元素個數(shù)
    unsigned int count: 16;
    // 編碼方式,『原生字節(jié)數(shù)組』或「壓縮存儲」
    unsigned int encoding: 2;
    // 存儲方式,NONE==1 or ZIPLIST==2
    unsigned int container: 2;
    // 數(shù)據(jù)是否被壓縮
    unsigned int recompress: 1;
    // 數(shù)據(jù)能否被壓縮
    unsigned int attempted_compress: 1;
    // 預(yù)留的 bit 位
    unsigned int extra: 10;
} quicklistNode;

quicklist 的結(jié)構(gòu)體定義如下:

typedef struct quicklist {
    // quicklist 的鏈表頭
    quicklistNode *head;
    // quicklist 的鏈表尾
    quicklistNode *tail;
    // 所有 ziplist 中的總元素個數(shù)
    unsigned long count;
    // quicklistNodes 的個數(shù)
    unsigned long len;
    ……
} quicklist;
image
typedef struct quicklistEntry {
    const quicklist *quicklist;
    quicklistNode *node;
    unsigned char *zi;
    unsigned char *value;
    long long longval;
    unsigned int sz;
    int offset;
} quicklistEntry;

quicklistCreate

quicklist *quicklistCreate(void) {
    struct quicklist *quicklist;

    quicklist = zmalloc(sizeof(*quicklist));
    quicklist->head = quicklist->tail = NULL;
    quicklist->len = 0;
    quicklist->count = 0;
    quicklist->compress = 0;
    quicklist->fill = -2;
    quicklist->bookmark_count = 0;
    return quicklist;
}

quicklistDelIndex

REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
                                   unsigned char **p) {
    int gone = 0;

    // 在該節(jié)點(diǎn)下的 ziplist 中刪除
    node->zl = ziplistDelete(node->zl, p);
    // count-1
    node->count--;
    // ziplist 數(shù)量為空,直接刪除該節(jié)點(diǎn)
    if (node->count == 0) {
        gone = 1;
        __quicklistDelNode(quicklist, node);
    } else {
        quicklistNodeUpdateSz(node);
    }
    // 更新所有 ziplist 中的總元素個數(shù)
    quicklist->count--;
    return gone ? 1 : 0;
}

quicklistDelEntry

核心還是調(diào)用了 quicklistDelIndex,但是 quicklistDelEntry 要維護(hù) quicklistNode 的節(jié)點(diǎn),包括迭代器。

void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
    quicklistNode *prev = entry->node->prev;
    quicklistNode *next = entry->node->next;
    int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
                                         entry->node, &entry->zi);

    iter->zi = NULL;

    // 如果當(dāng)前節(jié)點(diǎn)被刪除,更新 iterator
    if (deleted_node) {
        if (iter->direction == AL_START_HEAD) {
            iter->current = next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            iter->current = prev;
            iter->offset = -1;
        }
    }
}

quicklistInsertBefore, quicklistInsertAfter

插入分為兩種:

void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *node,
                          void *value, const size_t sz);
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *node,
                           void *value, const size_t sz);

其底層都調(diào)用了_quicklistInsert 函數(shù):

void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
                           void *value, const size_t sz) {
    _quicklistInsert(quicklist, entry, value, sz, 0);
}

void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry,
                          void *value, const size_t sz) {
    _quicklistInsert(quicklist, entry, value, sz, 1);
}

_quicklistInsert 函數(shù)比較長,但是邏輯很簡單,就是判斷應(yīng)該在哪里插入新元素:

  • 在后面插入
    • 當(dāng)前 entry 的 ziplist 未滿:直接插入
    • 當(dāng)前 entry 的 ziplist 已滿:
      • entry->next 的 ziplist 未滿:在其頭部插入
      • entry->next 的 ziplist 已滿:拆分當(dāng)前 entry
  • 在前面插入
    • 當(dāng)前 entry 的 ziplist 未滿:直接插入
    • 當(dāng)前 entry 的 ziplist 已滿:
      • entry->prev 的 ziplist 未滿:在其尾部插入
      • entry->prev 的 ziplist 已滿:拆分當(dāng)前 entry
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
                                   void *value, const size_t sz, int after) {
    int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
    int fill = quicklist->fill;
    quicklistNode *node = entry->node;
    quicklistNode *new_node = NULL;
    assert(sz < UINT32_MAX); /* TODO: add support for quicklist nodes that are sds encoded (not zipped) */

    ……

    /* Populate accounting flags for easier boolean checks later */
    if (!_quicklistNodeAllowInsert(node, fill, sz)) {
        full = 1;
    }

    // 在后面插入 && 當(dāng)前 entry 的 ziplist 已滿
    if (after && (entry->offset == node->count)) {
        at_tail = 1;
        // 判斷 next 節(jié)點(diǎn)是否可插入
        if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
            full_next = 1;
        }
    }

    // 在前面插入 && 在頭部
    if (!after && (entry->offset == 0)) {
        at_head = 1;
        if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
            full_prev = 1;
        }
    }

    /* Now determine where and how to insert the new element */
    // 在尾部插入 && 當(dāng)前節(jié)點(diǎn)能插入
    if (!full && after) {
        D("Not full, inserting after current position.");
        quicklistDecompressNodeForUse(node);
        unsigned char *next = ziplistNext(node->zl, entry->zi);
        if (next == NULL) {
            node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
        } else {
            node->zl = ziplistInsert(node->zl, next, value, sz);
        }
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);
    } else if (!full && !after) {
        // 在前面插入 && 當(dāng)前節(jié)點(diǎn)能插入
        // 直接在當(dāng)前節(jié)點(diǎn)插入即可
        D("Not full, inserting before current position.");
        quicklistDecompressNodeForUse(node);
        node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);
    } else if (full && at_tail && node->next && !full_next && after) {
        // 在后面插入 && 在 ziplist 尾部插入 && 當(dāng)前節(jié)點(diǎn)不能插入 && next 節(jié)點(diǎn)能插入
        // 在 next 節(jié)點(diǎn)的頭部插入
        D("Full and tail, but next isn't full; inserting next node head");
        new_node = node->next;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && at_head && node->prev && !full_prev && !after) {
        // 在前面插入 && 在 ziplist 頭部插入 && 當(dāng)前節(jié)點(diǎn)不能插入 && prev 節(jié)點(diǎn)能插入
        // 在 prev 節(jié)點(diǎn)的尾部插入
        D("Full and head, but prev isn't full, inserting prev node tail");
        new_node = node->prev;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && ((at_tail && node->next && full_next && after) ||
                        (at_head && node->prev && full_prev && !after))) {
        // 當(dāng)前節(jié)點(diǎn)不能插入 && 不能在前后節(jié)點(diǎn)插入
        // 創(chuàng)建新節(jié)點(diǎn)
        D("\tprovisioning new node...");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
    } else if (full) {
        // 節(jié)點(diǎn)是滿的,需要將當(dāng)前節(jié)點(diǎn)的 ziplist 拆分
        D("\tsplitting node...");
        quicklistDecompressNodeForUse(node);
        new_node = _quicklistSplitNode(node, entry->offset, after);
        new_node->zl = ziplistPush(new_node->zl, value, sz,
                                   after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
        _quicklistMergeNodes(quicklist, node);
    }

    quicklist->count++;
}

其余方法就不一一展開了,大同小異。

listpack

是什么

緊湊列表,用一塊連續(xù)的內(nèi)存空間來緊湊保存數(shù)據(jù),同時使用多種編碼方式,表示不同長度的數(shù)據(jù)(字符串、整數(shù))。

  • 結(jié)構(gòu)定義:listpack.h
  • 實(shí)現(xiàn):listpack.c

數(shù)據(jù)結(jié)構(gòu)

image

編碼類型

在 listpack.c 文件中,有大量的 LP_ENCODING__XX_BIT_INT 和 LP_ENCODING__XX_BIT_STR 的宏定義:

#define LP_ENCODING_7BIT_UINT 0
#define LP_ENCODING_7BIT_UINT_MASK 0x80
#define LP_ENCODING_IS_7BIT_UINT(byte) (((byte)&LP_ENCODING_7BIT_UINT_MASK)==LP_ENCODING_7BIT_UINT)

#define LP_ENCODING_6BIT_STR 0x80
#define LP_ENCODING_6BIT_STR_MASK 0xC0
#define LP_ENCODING_IS_6BIT_STR(byte) (((byte)&LP_ENCODING_6BIT_STR_MASK)==LP_ENCODING_6BIT_STR)

……

listpack 元素會對不同長度的整數(shù)和字符串進(jìn)行編碼。

整數(shù)編碼

以 LP_ENCODING_7BIT_UINT 為例,元素的實(shí)際數(shù)據(jù)是一個 7 bit 的無符號整數(shù)。

image

對于 LP_ENCODING_13BIT_INT,元素實(shí)際數(shù)據(jù)的表示位數(shù)是 13 位,最高 3 位是 110,表示當(dāng)前的編碼類型。

#define LP_ENCODING_13BIT_INT 0xC0
#define LP_ENCODING_13BIT_INT_MASK 0xE0
#define LP_ENCODING_IS_13BIT_INT(byte) (((byte)&LP_ENCODING_13BIT_INT_MASK)==LP_ENCODING_13BIT_INT)
image

整數(shù)其余的編碼方式有:

  • LP_ENCODING_16BIT_INT
  • LP_ENCODING_24BIT_INT
  • LP_ENCODING_32BIT_INT
  • LP_ENCODING_64BIT_INT

字符串編碼

3 種類型:

  • LP_ENCODING_6BIT_STR
  • LP_ENCODING_12BIT_STR
  • LP_ENCODING_32BIT_STR

以編碼類型 LP_ENCODING_6BIT_STR 為例,編碼類型占 1 個字節(jié)。這 1 個字節(jié)包括兩個部分:

  1. 宏定義 2 位,標(biāo)識編碼類型
  2. 后 6 位保存字符串長度
image

如何避免連鎖更新?

每個列表項都只記錄自己的長度,不會像 ziplist 的列表項會記錄前一項的長度。所以在 listpack 中新增或修改元素,只會涉及到列表項自身的操作,不會影響后續(xù)列表項的長度變化,進(jìn)而避免連鎖更新。

lpNew

unsigned char *lpNew(size_t capacity) {
    unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    // 設(shè)置 listpack 的大小
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
    // 設(shè)置 listpack 的元素個數(shù),初始是 0
    lpSetNumElements(lp,0);
    // 設(shè)置 listpack 的結(jié)尾標(biāo)識符 LP_EOF,值是 255
    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}
#define lpSetTotalBytes(p,v) do { \
    (p)[0] = (v)&0xff; \
    (p)[1] = ((v)>>8)&0xff; \
    (p)[2] = ((v)>>16)&0xff; \
    (p)[3] = ((v)>>24)&0xff; \
} while(0)

#define lpSetNumElements(p,v) do { \
    (p)[4] = (v)&0xff; \
    (p)[5] = ((v)>>8)&0xff; \
} while(0)

lpFirst

獲取 listpack 的第一個元素。

/* Return a pointer to the first element of the listpack, or NULL if the
 * listpack has no elements. */
unsigned char *lpFirst(unsigned char *lp) {
    // 跳過 listpack 的頭部 6 字節(jié)
    unsigned char *p = lp + LP_HDR_SIZE; /* Skip the header. */
    // 若是末尾結(jié)束字節(jié),返回 NULL
    if (p[0] == LP_EOF) return NULL;
    lpAssertValidEntry(lp, lpBytes(lp), p);
    return p;
}

lpNext

/* If 'p' points to an element of the listpack, calling lpNext() will return
 * the pointer to the next element (the one on the right), or NULL if 'p'
 * already pointed to the last element of the listpack. */
unsigned char *lpNext(unsigned char *lp, unsigned char *p) {
    ……
    // 偏移指針指向下一個列表項
    p = lpSkip(p);
    if (p[0] == LP_EOF) return NULL;
    ……
    return p;
}

核心是調(diào)用了 lpSkip 函數(shù):

unsigned char *lpSkip(unsigned char *p) {
    // 計算當(dāng)前 entry 編碼類型和實(shí)際數(shù)據(jù)的總長度
    unsigned long entrylen = lpCurrentEncodedSizeUnsafe(p);
    entrylen += lpEncodeBacklen(NULL,entrylen);
    // 偏移指針
    p += entrylen;
    return p;
}

lpSkip 核心是調(diào)用了 lpCurrentEncodedSizeUnsafe 函數(shù)獲取當(dāng)前 entry 的總長度:

uint32_t lpCurrentEncodedSizeUnsafe(unsigned char *p) {
    // LP_ENCODING_IS_7BIT_UINT,編碼類型和整數(shù)數(shù)值都在同一個字節(jié),所以返回 1
    if (LP_ENCODING_IS_7BIT_UINT(p[0])) return 1;
    // LP_ENCODING_IS_6BIT_STR,1 字節(jié)表示編碼類型和字符串長度,該字節(jié)后 6 位表示字符串長度
    if (LP_ENCODING_IS_6BIT_STR(p[0])) return 1+LP_ENCODING_6BIT_STR_LEN(p);
    if (LP_ENCODING_IS_13BIT_INT(p[0])) return 2;
    if (LP_ENCODING_IS_16BIT_INT(p[0])) return 3;
    if (LP_ENCODING_IS_24BIT_INT(p[0])) return 4;
    if (LP_ENCODING_IS_32BIT_INT(p[0])) return 5;
    if (LP_ENCODING_IS_64BIT_INT(p[0])) return 9;
    if (LP_ENCODING_IS_12BIT_STR(p[0])) return 2+LP_ENCODING_12BIT_STR_LEN(p);
    if (LP_ENCODING_IS_32BIT_STR(p[0])) return 5+LP_ENCODING_32BIT_STR_LEN(p);
    if (p[0] == LP_EOF) return 1;
    return 0;
}

大同小異,這里只介紹下 LP_ENCODING_IS_6BIT_STR,1 字節(jié)表示編碼類型和字符串長度,該字節(jié)后 6 位表示字符串長度。使用 LP_ENCODING_6BIT_STR_LEN 宏定義計算后 6 位的數(shù)值。

#define LP_ENCODING_6BIT_STR_LEN(p) ((p)[0] & 0x3F)

lpSkip 函數(shù)還調(diào)用了 lpEncodeBacklen 函數(shù),計算 entry 最后一部分 len 的長度(即下圖中的 len)。

image

unsigned long lpEncodeBacklen(unsigned char *buf, uint64_t l) {
    //編碼類型和實(shí)際數(shù)據(jù)的總長度小于等于 127,entry-len 長度為 1 字節(jié)
    if (l <= 127) {
        ...
        return 1;
    } else if (l < 16383) { //編碼類型和實(shí)際數(shù)據(jù)的總長度大于 127 但小于 16383,entry-len 長度為 2 字節(jié)
       ...
        return 2;
    } else if (l < 2097151) {//編碼類型和實(shí)際數(shù)據(jù)的總長度大于 16383 但小于 2097151,entry-len 長度為 3 字節(jié)
       ...
        return 3;
    } else if (l < 268435455) { //編碼類型和實(shí)際數(shù)據(jù)的總長度大于 2097151 但小于 268435455,entry-len 長度為 4 字節(jié)
        ...
        return 4;
    } else { //否則,entry-len 長度為 5 字節(jié)
       ...
        return 5;
    }
}

lpPrev

unsigned char *lpPrev(unsigned char *lp, unsigned char *p) {
    assert(p);
    // 如果是第一項,直接返回 NULL
    if (p-lp == LP_HDR_SIZE) return NULL;
    p--; /* Seek the first backlen byte of the last element. */
    uint64_t prevlen = lpDecodeBacklen(p);
    prevlen += lpEncodeBacklen(NULL,prevlen);
    p -= prevlen-1; /* Seek the first byte of the previous entry. */
    lpAssertValidEntry(lp, lpBytes(lp), p);
    return p;
}

lpGet

就是按照編碼類型,然后解析出實(shí)際數(shù)據(jù)所占字節(jié)數(shù),進(jìn)而獲取對應(yīng)數(shù)值。

unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf) {
    int64_t val;
    uint64_t uval, negstart, negmax;

    assert(p); /* assertion for valgrind (avoid NPD) */
    if (LP_ENCODING_IS_7BIT_UINT(p[0])) {
        negstart = UINT64_MAX; /* 7 bit ints are always positive. */
        negmax = 0;
        uval = p[0] & 0x7f;
    } else if (LP_ENCODING_IS_6BIT_STR(p[0])) {
        *count = LP_ENCODING_6BIT_STR_LEN(p);
        return p+1;
    } else if (LP_ENCODING_IS_13BIT_INT(p[0])) {
        ……
    } else if (LP_ENCODING_IS_16BIT_INT(p[0])) {
        ……
    } else if (LP_ENCODING_IS_24BIT_INT(p[0])) {
        ……
    } else if (LP_ENCODING_IS_32BIT_INT(p[0])) {
        ……
    } else if (LP_ENCODING_IS_64BIT_INT(p[0])) {
        ……
    } else if (LP_ENCODING_IS_12BIT_STR(p[0])) {
        ……
    } else if (LP_ENCODING_IS_32BIT_STR(p[0])) {
        ……
    } else {
        ……
    }

    /* We reach this code path only for integer encodings.
     * Convert the unsigned value to the signed one using two's complement
     * rule. */
    if (uval >= negstart) {
        /* This three steps conversion should avoid undefined behaviors
         * in the unsigned -> signed conversion. */
        uval = negmax-uval;
        val = uval;
        val = -val-1;
    } else {
        val = uval;
    }

    ……
}

參考鏈接

Redis 源碼簡潔剖析系列

最簡潔的 Redis 源碼剖析系列文章

Java 編程思想-最全思維導(dǎo)圖-GitHub 下載鏈接,需要的小伙伴可以自取~

原創(chuàng)不易,希望大家轉(zhuǎn)載時請先聯(lián)系我,并標(biāo)注原文鏈接。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容