學(xué)習(xí)redis源碼():quicklist

參考鏈接:

https://blog.csdn.net/harleylau/article/details/80534159
http://zhangtielei.com/posts/blog-redis-quicklist.html
https://blog.csdn.net/qq_30085733/article/details/79914295
http://czrzchao.com/redisSourceQuicklist
https://blog.csdn.net/qq_30085733/article/details/79914295

在redis3.2版本之前,redis的列表對象的底層數(shù)據(jù)結(jié)構(gòu)是ziplist和linkedlist。當(dāng)列表對象中元素的長度比較小或者數(shù)量比較少的時候,采用ziplist來存儲,當(dāng)列表對象中元素的長度比較大或者數(shù)量比較多的時候,則會轉(zhuǎn)而使用雙向列表linkedlist來存儲。但是最新的redis版本中,redis的list的底層數(shù)據(jù)結(jié)構(gòu)統(tǒng)一為quicklist,quicklist是ziplist和linkedlist的結(jié)合,其本質(zhì)上是一個雙向鏈表,鏈表中的每一個元素是ziplist。其數(shù)據(jù)結(jié)構(gòu)如下:

typedef struct quicklist {
    quicklistNode *head;        // 指向quicklist的頭部
    quicklistNode *tail;        // 指向quicklist的尾部
    unsigned long count;        // 列表中所有數(shù)據(jù)項的個數(shù)總和
    unsigned int len;           // quicklist節(jié)點的個數(shù),即ziplist的個數(shù)
    int fill : 16;              // ziplist大小限定,由list-max-ziplist-size給定
    unsigned int compress : 16; // 節(jié)點壓縮深度設(shè)置(首尾兩端不被壓縮節(jié)點的個數(shù)),由list-compress-depth給定,該值為0表示不允許對這個quicklist進行壓縮
} quicklist;

其中,len表示quicklist元素的個數(shù),即quicklist包含多少個ziplist節(jié)點(quicklist的每一個節(jié)點都是ziplist)。由于每個ziplist又是由多個數(shù)據(jù)節(jié)點組成的,所以count代表quicklist中一共有多少個數(shù)據(jù)節(jié)點。fill和compress決定了每個ziplist的屬性。

redis會利用LZF壓縮算法對quicklist的節(jié)點進行壓縮操作,原因在于:list的設(shè)計目的是能夠存放很長的數(shù)據(jù)列表,當(dāng)列表很長時,必然會占用很高的內(nèi)存空間,且list中最容易訪問的是兩端的數(shù)據(jù),中間的數(shù)據(jù)訪問率較低,如果將不怎么被訪問的數(shù)據(jù)進行壓縮,只在需要時才進行解壓縮來訪問原始數(shù)據(jù),那么就可以在盡量少影響業(yè)務(wù)流暢度的基礎(chǔ)上減少數(shù)據(jù)存儲的空間。基于此,redis提出了compress depth這個概念--保持首尾compress depth個quickliostNode不進行壓縮,對中間的quicklistNode進行壓縮處理。

compress:這個字段是修飾整個quicklist的,代表首尾兩端不被壓縮的節(jié)點的個數(shù)。節(jié)點壓縮深度設(shè)置,存放
list-compress-depth參數(shù)的值。
*   0 特殊值,表示不壓縮
*   1 表示quicklist兩端各有一個節(jié)點不壓縮,中間的節(jié)點壓縮
*   2 表示quicklist兩端各有兩個節(jié)點不壓縮,中間的節(jié)點壓縮
*   3 表示quicklist兩端各有三個節(jié)點不壓縮,中間的節(jié)點壓縮
*   以此類推。

compress最大值:
#define COMPRESS_MAX (1 << 16)

由于quicklist結(jié)構(gòu)包含了壓縮表和鏈表,那么每個quicklistNode的大小就是一個需要仔細考量的點。如果單個quicklistNode存儲的數(shù)據(jù)太多,就會影響插入效率;但是如果單個quicklistNode太小,就會變得跟鏈表一樣造成空間浪費。
quicklist通過fill對單個quicklistNode的大小進行限制:

fill:ziplist大小設(shè)置,表示每個quicklist節(jié)點的元素大小限制,存放 list-max-ziplist-size參數(shù)的值。
fill可正可負,如果參數(shù)為正,表示按照數(shù)據(jù)項個數(shù)來限定每個節(jié)點中的元素個數(shù),比如3代表每個節(jié)點中存放的元
素個數(shù)不能超過3(最大為32768個);反之,如果參數(shù)為負,表示按照字節(jié)數(shù)來限定每個節(jié)點中的元素個數(shù),它只能
取-1~-5這五個數(shù),其含義如下:
*   -1 每個節(jié)點的ziplist字節(jié)大小不能超過4kb
*   -2 每個節(jié)點的ziplist字節(jié)大小不能超過8kb(-2是Redis給出的默認值)
*   -3 每個節(jié)點的ziplist字節(jié)大小不能超過16kb
*   -4 每個節(jié)點的ziplist字節(jié)大小不能超過32kb
*   -5 每個節(jié)點的ziplist字節(jié)大小不能超過64kb

-1~-5對應(yīng)下面這個數(shù)組,例如:-1對應(yīng)4096,-5對應(yīng)65536.
/* Optimization levels for size-based filling */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
fill為正數(shù)時的最大值:
#define FILL_MAX (1 << 15)

quicklist節(jié)點有兩種狀態(tài),RAW狀態(tài)和LZF狀態(tài)。RAW狀態(tài)就是原始的ziplist結(jié)構(gòu)(解壓縮狀態(tài));LZF狀態(tài)則是利用LZF壓縮算法對quicklist的節(jié)點進行壓縮操作,其結(jié)構(gòu)為quicklistLZF。

typedef struct quicklistNode {
    struct quicklistNode *prev;  // 指向上一個ziplist節(jié)點
    struct quicklistNode *next;  // 指向下一個ziplist節(jié)點
    unsigned char *zl;           // 數(shù)據(jù)指針,如果沒有被壓縮,就指向ziplist結(jié)構(gòu),反之指向quicklistLZF結(jié)構(gòu) 
    unsigned int sz;             // 表示指向ziplist結(jié)構(gòu)的總長度(內(nèi)存占用長度)
    unsigned int count : 16;     // 表示ziplist中的數(shù)據(jù)項個數(shù)
    unsigned int encoding : 2;   // 編碼方式,1--ziplist,2--quicklistLZF
    unsigned int container : 2;  // 預(yù)留字段,存放數(shù)據(jù)的方式,1--NONE,2--ziplist
    unsigned int recompress : 1; // 當(dāng)我們使用類似lindex這樣的命令查看了某一項本來壓縮的數(shù)據(jù)時,需要把數(shù)據(jù)暫時解壓,這時就設(shè)置recompress=1做一個標(biāo)記,等有機會再把數(shù)據(jù)重新壓縮。
    unsigned int attempted_compress : 1; // 測試相關(guān)
    unsigned int extra : 10; // 擴展字段,暫時沒用
} quicklistNode;
typedef struct quicklistLZF {
    unsigned int sz; /* LZF size in bytes*/
    char compressed[];
} quicklistLZF;

quciklist的迭代器結(jié)構(gòu),指向迭代器節(jié)點元素(當(dāng)前節(jié)點表示quicklist中的某個ziplist中的某個ziplistNode,也就是quicklistEntry最終指向的是這個ziplisyEntry,所以quicklist的迭代,是從當(dāng)前entry依次往后遍歷所有ziplistNode,而不是單單遍歷每個quicklistNode):

// quicklist的迭代器結(jié)構(gòu)
typedef struct quicklistIter {
    const quicklist *quicklist;  // 指向所在quicklist的指針
    quicklistNode *current;  // 指向當(dāng)前節(jié)點的指針
    unsigned char *zi;  // 指向當(dāng)前節(jié)點(ziplist中的·一個元素)
    long offset; // 當(dāng)前ziplist中的偏移地址
    int direction; // 迭代器的方向
} quicklistIter;
// 表示quicklist節(jié)點中ziplist里的一個節(jié)點結(jié)構(gòu)
typedef struct quicklistEntry {
    const quicklist *quicklist;  // 指向所在quicklist的指針
    quicklistNode *node;  // 指向當(dāng)前節(jié)點的指針
    unsigned char *zi;  // 指向當(dāng)前節(jié)點(ziplist中的·一個元素)
    unsigned char *value;  // 當(dāng)前指向的ziplist中的節(jié)點的字符串值
    long long longval;  // 當(dāng)前指向的ziplist中的節(jié)點的整型值
    unsigned int sz;  // 當(dāng)前指向的ziplist中的節(jié)點的字節(jié)大小
    int offset;  // 當(dāng)前指向的ziplist中的節(jié)點相對于ziplist的偏移量
} quicklistEntry;

quicklist結(jié)構(gòu)如圖所示:

quicklist結(jié)構(gòu)圖

quicklist的主要api:

  • 創(chuàng)建quicklist:quicklist *quicklistCreate(void)
  • quicklist節(jié)點創(chuàng)建:REDIS_STATIC quicklistNode *quicklistCreateNode(void)
  • 刪除quicklist:void quicklistRelease(quicklist *quicklist)
  • quicklist的insert操作:insert操作類似push操作,不過是在某個迭代器指向的當(dāng)前ziplistNode節(jié)點前/后insert一個新的ziplistNode元素。
    void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz)
    void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz)
  • quicklist的push操作:在quicklist的頭/尾節(jié)點(ziplist)中push一個ziplistNode元素。
    int quicklistPushHead(quicklist *quicklist, void *value, size_t sz)
    int quicklistPushTail(quicklist *quicklist, void *value, size_t sz)
  • quicklist的pop操作:int quicklistPop(quicklist *quicklist, int where, unsigned char **data, unsigned int *sz, long long *slong)
  • quicklist節(jié)點刪除:void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry)
  • quicklist壓縮:quicklistCompress(_ql, _node)

quicklist重要接口實現(xiàn)講解:

  • quicklist壓縮:quicklistCompress(_ql, _node)
/*
如果recompress=1,表示這個節(jié)點之前因為index等操作被“暫時”解壓縮以查看原始數(shù)據(jù),
此時可以對這個節(jié)點直接執(zhí)行壓縮操作,將這個節(jié)點重新壓縮(不需要判斷quicklist的
compress字段是否為0,因為是暫時解壓縮的,所以原來的quicklist的這個節(jié)點一定是支持壓縮的。)
*/
#define quicklistCompress(_ql, _node)                                          \
    do {                                                                       \
        if ((_node)->recompress)                                               \
            quicklistCompressNode((_node));                                    \
        else                                                                   \
            __quicklistCompress((_ql), (_node));                               \
    } while (0)


內(nèi)部接口:


/*
在弄懂這個函數(shù)之前,最重要的是明白quicklist->compress字段的意思,這個字段是修飾整個
quicklist的,代表首尾兩端不被壓縮的節(jié)點的個數(shù),所以在執(zhí)行compress之前,需要先判斷
要被壓縮的目標(biāo)節(jié)點是否可以被壓縮。
*/
//該函數(shù)應(yīng)用很廣,delete或者insert新元素都需要調(diào)用這個函數(shù)來進行quicklist屬性適配
REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
                                      quicklistNode *node) {
    /* If length is less than our compress depth (from both sides),
     * we can't compress anything. */
    if (!quicklistAllowsCompression(quicklist) ||
        quicklist->len < (unsigned int)(quicklist->compress * 2))
        return;
    /* Iterate until we reach compress depth for both sides of the list.a
     * Note: because we do length checks at the *top* of this function,
     *       we can skip explicit null checks below. Everything exists. 
     */
    quicklistNode *forward = quicklist->head;
    quicklistNode *reverse = quicklist->tail;
    int depth = 0;
    int in_depth = 0;
    while (depth++ < quicklist->compress) {
       /*
          從quicklist的頭尾兩邊分別遍歷,因為頭部的compress個節(jié)點和尾部的compress個
          節(jié)點都一定是非壓縮的原始狀態(tài),為了確保滿足compress字段要求,首先將這幾個節(jié)點
          強制再解壓縮回原始狀態(tài)。之所以進行這一步操作的原因是因為,inset或者delete操作
          可能會將原來不處于compress depth中的元素進入compress depth,所以要將這些
          元素變?yōu)檎5臓顟B(tài)
        */
        quicklistDecompressNode(forward);
        quicklistDecompressNode(reverse);

        /*
         目標(biāo)節(jié)點在頭部或尾部的那compress個非壓縮的節(jié)點中,不應(yīng)該壓縮這個節(jié)點,但是可能會壓縮最靠
         近非壓縮深度外的兩個節(jié)點(???),所以只是將in_depth標(biāo)志置為1,表示目標(biāo)節(jié)點在非壓縮的
         深度中(深度指不被壓縮的數(shù)目),但不會return。
         */
        if (forward == node || reverse == node)
            in_depth = 1;

        //quicklist的長度不及2*compress個,不進行下一步的壓縮操作(異常情況,違背入口的判斷條件)
        if (forward == reverse)
            return;

        forward = forward->next;
        reverse = reverse->prev;
    }

    //目標(biāo)節(jié)點不在非壓縮深度中(滿足壓縮條件),直接壓縮
    if (!in_depth)
        quicklistCompressNode(node);

    //為啥壓縮深度外的那兩個?不太清楚??不應(yīng)該只壓縮一個滿足情況的node就好了么??                                                              
    if (depth > 2) {
        /* At this point, forward and reverse are one node beyond depth */
        quicklistCompressNode(forward);
        quicklistCompressNode(reverse);
    }
}

/* If we previously used quicklistDecompressNodeForUse(), just recompress. */
/* 之前如果為了某個需要將某個原始節(jié)點暫時解壓縮了,此時要將該節(jié)點恢復(fù)為壓縮狀態(tài) */
#define quicklistRecompressOnly(_ql, _node)                                    \
    do {                                                                       \
        if ((_node)->recompress)                                               \
            quicklistCompressNode((_node));                                    \
    } while (0)

  • quicklist insert操作:
    quicklist還能在指定的位置插入數(shù)據(jù),quicklistInsertAfter和quicklistInsertBefore就是分別在指定位置后面和前面插入數(shù)據(jù)項。當(dāng)然,和在頭尾節(jié)點插入一樣, 任意位置插入也是需要判斷當(dāng)前插入節(jié)點是否能夠放得下當(dāng)前的元素的,這邊的情況會比頭尾節(jié)點更為復(fù)雜,比如在當(dāng)前節(jié)點放不下的時候,還需要檢查一下旁邊的節(jié)點是否能夠放下這個數(shù)據(jù),能夠放下的話可以放置在旁邊的節(jié)點上,如果也不行的話,就是需要新建一個ziplist節(jié)點。
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
                           void *value, const size_t sz) {
    _quicklistInsert(quicklist, entry, value, sz, 0);
}

內(nèi)部接口:
/* Insert a new entry before or after existing entry 'entry'.
 *
 * If after==1, the new value is inserted after 'entry', otherwise
 * the new value is inserted before 'entry'. */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
                                   void *value, const size_t sz, int after) {
    int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
    int fill = quicklist->fill;
    quicklistNode *node = entry->node;
    quicklistNode *new_node = NULL;

    if (!node) {
        /* we have no reference node, so let's create only node in the list */
        /* 這種情況下相當(dāng)于quicklist中就沒有任何一個quicklistNode */
        D("No node given!");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        __quicklistInsertNode(quicklist, NULL, new_node, after);
        new_node->count++;
        quicklist->count++;
        return;
    }

    /* Populate accounting flags for easier boolean checks later */
    //查看當(dāng)前quicklistNode對應(yīng)的ziplist中空間是否足夠插入一個新元素
    if (!_quicklistNodeAllowInsert(node, fill, sz)) {
        D("Current node is full with count %d with requested fill %lu",
          node->count, fill);
        full = 1;
    }

    /* 如果要在當(dāng)前Node的尾部后面插入新元素,考慮可否插入后一個ziplist中 */
    if (after && (entry->offset == node->count)) {
        D("At Tail of current ziplist");
        at_tail = 1;
        if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
            D("Next node is full too.");
            full_next = 1;
        }
    }

    /* 如果要在當(dāng)前Node的頭部前面插入新元素,考慮可否插入前一個ziplist中 */
    if (!after && (entry->offset == 0)) {
        D("At Head");
        at_head = 1;
        if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
            D("Prev node is full too.");
            full_prev = 1;
        }
    }

    /* Now determine where and how to insert the new element */
    /* 
        !full的情況對quicklist的compress depth沒有影響(沒有插入新的quicklistNode),
        所以只需要對要插入的那個quicklistNode進行:解壓縮(可能需要)-> 插入新元素 -> 重新壓縮
       (可能需要)就可以了
     */
    if (!full && after) {
        D("Not full, inserting after current position.");
        quicklistDecompressNodeForUse(node);//如果之前是壓縮狀態(tài),需要先解壓縮,insert完畢后,再重新壓縮
        unsigned char *next = ziplistNext(node->zl, entry->zi);
        if (next == NULL) {
            node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
        } else {
            node->zl = ziplistInsert(node->zl, next, value, sz);
        }
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);//由于插入元素前可能解壓縮了,所以可能需要重新回復(fù)壓縮狀態(tài)
    } else if (!full && !after) {
        D("Not full, inserting before current position.");
        quicklistDecompressNodeForUse(node);
        node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
        node->count++;
        quicklistNodeUpdateSz(node);
        quicklistRecompressOnly(quicklist, node);
    } else if (full && at_tail && node->next && !full_next && after) {
        /* If we are: at tail, next has free space, and inserting after:
         *   - insert entry at head of next node. */
        D("Full and tail, but next isn't full; inserting next node head");
        new_node = node->next;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && at_head && node->prev && !full_prev && !after) {
        /* If we are: at head, previous has free space, and inserting before:
         *   - insert entry at tail of previous node. */
        D("Full and head, but prev isn't full, inserting prev node tail");
        new_node = node->prev;
        quicklistDecompressNodeForUse(new_node);
        new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        quicklistRecompressOnly(quicklist, new_node);
    } else if (full && ((at_tail && node->next && full_next && after) ||
                        (at_head && node->prev && full_prev && !after))) {
        /* If we are: full, and our prev/next is full, then:
         *   - create new node and attach to quicklist */

        /* 
            這種情況表示cur、pre、next ziplist都滿了,需要重新構(gòu)建一個新的quicklistNode,
            在這個quicklistNode中插入目標(biāo)元素。因為插入了新的quicklistNode,可能會影響
            到compress depth,所以需要重新進行quicklistCompress處理(幸運的是,
            __quicklistInsertNode會自動進行這一步處理)。
         */
        D("\tprovisioning new node...");
        new_node = quicklistCreateNode();
        new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
    } else if (full) {
        /* else, node is full we need to split it. */
        /* covers both after and !after cases */

        /* 
              這種情況對應(yīng)cur ziplist滿了,但是想要插的元素不在ziplist頭部的前面或者尾部的后面,
              無法將元素插入pre或next ziplist。這種情況下,需要將cur ziplist在插入的位置
              split成兩個ziplist,再將這兩個新的ziplist插入quicklist中。因為插入了新的
              quicklistNode,所以需要重新進行quicklistCompress處理。
         */
        D("\tsplitting node...");
        quicklistDecompressNodeForUse(node);
        //split操作會將切割后的兩個ziplist分別存放在new_node和node中
 /* If 'after'==1, returned node will have elements _after_ 'offset'.
  *                The returned node will have elements [OFFSET+1, END].
  *                The input node keeps elements [0, OFFSET].
  *
  * If 'after'==0, returned node will keep elements up to and including 'offset'.
  *                The returned node will have elements [0, OFFSET].
  *                The input node keeps elements [OFFSET+1, END].
  */        
        //new_node可能插入node的前方或后方,由after決定
        //新的元素可能插入new_old的頭部或者尾部,也由after決定
        new_node = _quicklistSplitNode(node, entry->offset, after);
        new_node->zl = ziplistPush(new_node->zl, value, sz,
                                   after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
        new_node->count++;
        quicklistNodeUpdateSz(new_node);
        __quicklistInsertNode(quicklist, node, new_node, after);
        /*
            這一步的merge是為了節(jié)省空間,將多個ziplist合并成滿足fill要求的ziplist,
            減少ziplist頭部開銷。這一步試圖按以下順序合并:
            - (node->prev->prev, node->prev)
            - (node->next, node->next->next)
            - (node->prev, node)
            - (node, node->next)
         */
        _quicklistMergeNodes(quicklist, node);
    }

    quicklist->count++;
}


/* Insert 'new_node' after 'old_node' if 'after' is 1.
 * Insert 'new_node' before 'old_node' if 'after' is 0.
 * Note: 'new_node' is *always* uncompressed, so if we assign it to
 *       head or tail, we do not need to uncompress it. */
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
                                        quicklistNode *old_node,
                                        quicklistNode *new_node, int after) {
    if (after) {
        new_node->prev = old_node;
        if (old_node) {
            new_node->next = old_node->next;
            if (old_node->next)
                old_node->next->prev = new_node;
            old_node->next = new_node;
        }
        if (quicklist->tail == old_node)
            quicklist->tail = new_node;
    } else {
        new_node->next = old_node;
        if (old_node) {
            new_node->prev = old_node->prev;
            if (old_node->prev)
                old_node->prev->next = new_node;
            old_node->prev = new_node;
        }
        if (quicklist->head == old_node)
            quicklist->head = new_node;
    }
    /* If this insert creates the only element so far, initialize head/tail. */
    if (quicklist->len == 0) {
        quicklist->head = quicklist->tail = new_node;
    }

    /* 
       無論是在old元素的before還是after插入一個新的元素,都會更改old元素的位置,為了
       適應(yīng)quicklist->compress字段設(shè)置的非壓縮深度,需要調(diào)用quicklistCompress函數(shù)
       重新更改old元素的壓縮狀態(tài)。
     */
    if (old_node)
        quicklistCompress(quicklist, old_node);

    quicklist->len++;
}
  • quicklist的push操作:
    quicklist的push操作其實就是在雙向列表的頭節(jié)點或尾節(jié)點上插入一個新的元素。從上面我們知道,quicklist的每個節(jié)點都是一個ziplist,所以這個push操作就涉及到一個問題,當(dāng)前節(jié)點的ziplist是否能夠放進新元素。
    ———————————————————————————————————
    1.如果ziplist能夠放入新元素,即大小沒有超過限制(list-max-ziplist-size),那么直接調(diào)用ziplistPush函數(shù)壓入
    2.如果ziplist不能放入新元素,則新建一個quicklist節(jié)點,即新的ziplist,新的數(shù)據(jù)項會被插入到新的ziplist,新的quicklist節(jié)點插入到原有的quicklist上
    ———————————————————————————————————
/* Add new entry to head node of quicklist.
 *
 * Returns 0 if used existing head.
 * Returns 1 if new head created. */
//PushHead操作是在quicklist的某個節(jié)點的ziplist中push值為ziplistNode的元素
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
    quicklistNode *orig_head = quicklist->head;
    if (likely(
            _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
        quicklist->head->zl =
            ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
        quicklistNodeUpdateSz(quicklist->head);
    } else {//無法在原有quicklistNode中push元素,則新建一個只有這個元素的quicklist
        quicklistNode *node = quicklistCreateNode();
        node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);

        quicklistNodeUpdateSz(node);
        _quicklistInsertNodeBefore(quicklist, quicklist->head, node);
    }
    quicklist->count++;
    quicklist->head->count++;
    return (orig_head != quicklist->head);
}

//判斷當(dāng)前ziplist節(jié)點是否能插入的函數(shù):
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
                                           const int fill, const size_t sz) {
    if (unlikely(!node))
        return 0;

    int ziplist_overhead;
    /* size of previous offset */
    if (sz < 254)     //小于254時后一個節(jié)點的pre只有1字節(jié),否則為5字節(jié)
        ziplist_overhead = 1;
    else
        ziplist_overhead = 5;

    /* size of forward offset */
    if (sz < 64)   // 小于64字節(jié)當(dāng)前節(jié)點的encoding為1
        ziplist_overhead += 1;
    else if (likely(sz < 16384))      // 小于16384 encoding為2字節(jié)
        ziplist_overhead += 2;
    else     // encoding為5字節(jié)
        ziplist_overhead += 5;

    /* new_sz overestimates if 'sz' encodes to an integer type */
    unsigned int new_sz = node->sz + sz + ziplist_overhead;  // 忽略了連鎖更新的情況
    if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
        return 1;   // 校驗fill為負數(shù)是否超過單存儲限制
    else if (!sizeMeetsSafetyLimit(new_sz))    // 校驗單個節(jié)點是否超過8kb,主要防止fill為正數(shù)時單個節(jié)點內(nèi)存過大
        return 0;
    else if ((int)node->count < fill)   // fill為正數(shù)是否超過存儲限制,因為打算插入一個新元素,所以只能用<進行判斷,不能用<=進行判斷。
        return 1;
    else
        return 0;
}
  • quicklist的pop操作:
外部接口:
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
                 unsigned int *sz, long long *slong) {
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    if (quicklist->count == 0)
        return 0;
    int ret = quicklistPopCustom(quicklist, where, &vstr, &vlen, &vlong,
                                 _quicklistSaver);
    if (data)
        *data = vstr;
    if (slong)
        *slong = vlong;
    if (sz)
        *sz = vlen;
    return ret;
}

內(nèi)部接口:

/* pop from quicklist and return result in 'data' ptr.  Value of 'data'
 * is the return value of 'saver' function pointer if the data is NOT a number.
 *
 * If the quicklist element is a long long, then the return value is returned in
 * 'sval'.
 *
 * Return value of 0 means no elements available.
 * Return value of 1 means check 'data' and 'sval' for values.
 * If 'data' is set, use 'data' and 'sz'.  Otherwise, use 'sval'. */
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
                       unsigned int *sz, long long *sval,
                       void *(*saver)(unsigned char *data, unsigned int sz)) {
    unsigned char *p;
    unsigned char *vstr;
    unsigned int vlen;
    long long vlong;
    int pos = (where == QUICKLIST_HEAD) ? 0 : -1;

    if (quicklist->count == 0)
        return 0;

    if (data)
        *data = NULL;
    if (sz)
        *sz = 0;
    if (sval)
        *sval = -123456789;

    quicklistNode *node;
    if (where == QUICKLIST_HEAD && quicklist->head) {
        node = quicklist->head;
    } else if (where == QUICKLIST_TAIL && quicklist->tail) {
        node = quicklist->tail;
    } else {
        return 0;
    }

    p = ziplistIndex(node->zl, pos);
    if (ziplistGet(p, &vstr, &vlen, &vlong)) {
        if (vstr) {
            if (data)
                *data = saver(vstr, vlen);
            if (sz)
                *sz = vlen;
        } else {
            if (data)
                *data = NULL;
            if (sval)
                *sval = vlong;
        }
        quicklistDelIndex(quicklist, node, &p);
        return 1;
    }
    return 0;
}
  • quicklist節(jié)點刪除:
/* Delete one element represented by 'entry'
 *
 * 'entry' stores enough metadata to delete the proper position in
 * the correct ziplist in the correct quicklist node. */
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
    quicklistNode *prev = entry->node->prev;
    quicklistNode *next = entry->node->next;
    int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
                                         entry->node, &entry->zi);

    /* after delete, the zi is now invalid for any future usage. */
    iter->zi = NULL;

    /* If current node is deleted, we must update iterator node and offset. */
    if (deleted_node) {
        if (iter->direction == AL_START_HEAD) {
            iter->current = next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            iter->current = prev;
            iter->offset = -1;
        }
    }
    /* else if (!deleted_node), no changes needed.
     * we already reset iter->zi above, and the existing iter->offset
     * doesn't move again because:
     *   - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
     *   - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
     *  if we deleted the last element at offet N and now
     *  length of this ziplist is N-1, the next call into
     *  quicklistNext() will jump to the next node. */
}

/* Delete one entry from list given the node for the entry and a pointer
 * to the entry in the node.
 *
 * Note: quicklistDelIndex() *requires* uncompressed nodes because you
 *       already had to get *p from an uncompressed node somewhere.
 *
 * Returns 1 if the entire node was deleted, 0 if node still exists.
 * Also updates in/out param 'p' with the next offset in the ziplist. */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
                                   unsigned char **p) {
    int gone = 0;

    node->zl = ziplistDelete(node->zl, p);
    node->count--;
    if (node->count == 0) {
        gone = 1;
        __quicklistDelNode(quicklist, node);
    } else {
        quicklistNodeUpdateSz(node);
    }
    quicklist->count--;
    /* If we deleted the node, the original node is no longer valid */
    return gone ? 1 : 0;
}

REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
                                     quicklistNode *node) {
    if (node->next)
        node->next->prev = node->prev;
    if (node->prev)
        node->prev->next = node->next;

    if (node == quicklist->tail) {
        quicklist->tail = node->prev;
    }

    if (node == quicklist->head) {
        quicklist->head = node->next;
    }

    /* If we deleted a node within our compress depth, we
     * now have compressed nodes needing to be decompressed. */
    __quicklistCompress(quicklist, NULL);

    quicklist->count -= node->count;

    zfree(node->zl);
    zfree(node);
    quicklist->len--;
}

quicklist迭代器相關(guān)操作:

類似stl的迭代器,在執(zhí)行insert操作時,不能使用原來的迭代器,除非重新建立新的迭代器。
當(dāng)使用quicklistDelEntry()來刪除元素時,迭代器有時會失效。
迭代器失效的原因都是因為底層內(nèi)存重新分配,迭代器原本指向的地址變?yōu)闊o效地址。

  • 初始化迭代器:
/* Returns a quicklist iterator 'iter'. After the initialization every
 * call to quicklistNext() will return the next element of the quicklist. */
quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
    quicklistIter *iter;

    iter = zmalloc(sizeof(*iter));

    if (direction == AL_START_HEAD) {
        iter->current = quicklist->head;
        iter->offset = 0;
    } else if (direction == AL_START_TAIL) {
        iter->current = quicklist->tail;
        iter->offset = -1;
    }

    iter->direction = direction;
    iter->quicklist = quicklist;

    iter->zi = NULL;

    return iter;
}
  • 獲取idx處的迭代器
/* Initialize an iterator at a specific offset 'idx' and make the iterator
 * return nodes in 'direction' direction. */
quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
                                         const int direction,
                                         const long long idx) {
    quicklistEntry entry;

    if (quicklistIndex(quicklist, idx, &entry)) {
        quicklistIter *base = quicklistGetIterator(quicklist, direction);
        base->zi = NULL;//?這個為什么不用entry->zi,同next結(jié)合使用
        base->current = entry.node;
        base->offset = entry.offset;
        return base;
    } else {
        return NULL;
    }
}
  • 迭代器next操作:
    結(jié)合quicklistNext和quicklistGetIteratorAtIdx我們可以發(fā)現(xiàn),當(dāng)利用quicklistGetIteratorAtIdx初始化idx的迭代器時,此時itr->zi=NULL,只有進行了一次next后,itr->zi才會指向?qū)?yīng)idx的entryNode。之后,對這個itr再執(zhí)行quicklistNext操作,此時itr和entry都會指向下一個節(jié)點,而不再是NULL。
    注意點:迭代器next操作時會進行decompress操作獲得原始數(shù)據(jù),因此稍后需要對這些數(shù)據(jù)進行重新壓縮。
/* Get next element in iterator.
 *
 * Note: You must NOT insert into the list while iterating over it.
 * You *may* delete from the list while iterating using the
 * quicklistDelEntry() function.
 * If you insert into the quicklist while iterating, you should
 * re-create the iterator after your addition.
 *
 * iter = quicklistGetIterator(quicklist,<direction>);
 * quicklistEntry entry;
 * while (quicklistNext(iter, &entry)) {
 *     if (entry.value)
 *          [[ use entry.value with entry.sz ]]
 *     else
 *          [[ use entry.longval ]]
 * }
 *
 * Populates 'entry' with values for this iteration.
 * Returns 0 when iteration is complete or if iteration not possible.
 * If return value is 0, the contents of 'entry' are not valid.
 */
int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
    initEntry(entry);

    if (!iter) {
        D("Returning because no iter!");
        return 0;
    }

    entry->quicklist = iter->quicklist;
    entry->node = iter->current;

    if (!iter->current) {
        D("Returning because current node is NULL")
        return 0;
    }

    unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
    int offset_update = 0;

    if (!iter->zi) {
        /* If !zi, use current index. */
        quicklistDecompressNodeForUse(iter->current);
        iter->zi = ziplistIndex(iter->current->zl, iter->offset);
    } else {
        /* else, use existing iterator offset and get prev/next as necessary. */
        if (iter->direction == AL_START_HEAD) {
            nextFn = ziplistNext;
            offset_update = 1;
        } else if (iter->direction == AL_START_TAIL) {
            nextFn = ziplistPrev;
            offset_update = -1;
        }
        iter->zi = nextFn(iter->current->zl, iter->zi);
        iter->offset += offset_update;
    }

    entry->zi = iter->zi;
    entry->offset = iter->offset;

    if (iter->zi) {
        /* Populate value from existing ziplist position */
        ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
        return 1;
    } else {
        /* We ran out of ziplist entries.
         * Pick next node, update offset, then re-run retrieval. */
        quicklistCompress(iter->quicklist, iter->current);
        if (iter->direction == AL_START_HEAD) {
            /* Forward traversal */
            D("Jumping to start of next node");
            iter->current = iter->current->next;
            iter->offset = 0;
        } else if (iter->direction == AL_START_TAIL) {
            /* Reverse traversal */
            D("Jumping to end of previous node");
            iter->current = iter->current->prev;
            iter->offset = -1;
        }
        iter->zi = NULL;
        return quicklistNext(iter, entry);
    }
}

  • 根據(jù)idx獲取quicklist中的元素
/* Populate 'entry' with the element at the specified zero-based index
 * where 0 is the head, 1 is the element next to head
 * and so on. Negative integers are used in order to count
 * from the tail, -1 is the last element, -2 the penultimate
 * and so on. If the index is out of range 0 is returned.
 *
 * Returns 1 if element found
 * Returns 0 if element not found */
int quicklistIndex(const quicklist *quicklist, const long long idx,
                   quicklistEntry *entry) {
    quicklistNode *n;
    unsigned long long accum = 0;
    unsigned long long index;
    int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */

    initEntry(entry);
    entry->quicklist = quicklist;

    if (!forward) {
        index = (-idx) - 1;
        n = quicklist->tail;
    } else {
        index = idx;
        n = quicklist->head;
    }

    if (index >= quicklist->count)
        return 0;

    while (likely(n)) {
        if ((accum + n->count) > index) {
            break;
        } else {
            D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
              accum);
            accum += n->count;
            n = forward ? n->next : n->prev;
        }
    }

    if (!n)
        return 0;

    D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
      accum, index, index - accum, (-index) - 1 + accum);

    entry->node = n;
    if (forward) {
        /* forward = normal head-to-tail offset. */
        entry->offset = index - accum;
    } else {
        /* reverse = need negative offset for tail-to-head, so undo
         * the result of the original if (index < 0) above. */
        entry->offset = (-index) - 1 + accum;
    }

    quicklistDecompressNodeForUse(entry->node);
    entry->zi = ziplistIndex(entry->node->zl, entry->offset);
    ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
    /* The caller will use our result, so we don't re-compress here.
     * The caller can recompress or delete the node as needed. */
    return 1;
}

quicklist查漏補缺:

  • GCC關(guān)鍵字__builtin_expect
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)

__builtin_expect() 是 GCC (version >= 2.96)提供給程序員使用的,目的是將“分支轉(zhuǎn)移”的信息提供給編譯器,這樣編譯器可以對代碼進行優(yōu)化,以減少指令跳轉(zhuǎn)帶來的性能下降。__builtin_expect((x),1) 表示 x 的值為真的可能性更大;
__builtin_expect((x),0) 表示 x 的值為假的可能性更大。
也就是說,使用 likely() ,執(zhí)行 if 后面的語句 的機會更大,使用 unlikely(),執(zhí)行 else 后面的語句的機會更大。通過這種方式,編譯器在編譯過程中,會將可能性更大的代碼緊跟著起面的代碼,從而減少指令跳轉(zhuǎn)帶來的性能上的下降。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容