參考鏈接:
https://blog.csdn.net/harleylau/article/details/80534159
http://zhangtielei.com/posts/blog-redis-quicklist.html
https://blog.csdn.net/qq_30085733/article/details/79914295
http://czrzchao.com/redisSourceQuicklist
https://blog.csdn.net/qq_30085733/article/details/79914295
在redis3.2版本之前,redis的列表對象的底層數(shù)據(jù)結(jié)構(gòu)是ziplist和linkedlist。當(dāng)列表對象中元素的長度比較小或者數(shù)量比較少的時候,采用ziplist來存儲,當(dāng)列表對象中元素的長度比較大或者數(shù)量比較多的時候,則會轉(zhuǎn)而使用雙向列表linkedlist來存儲。但是最新的redis版本中,redis的list的底層數(shù)據(jù)結(jié)構(gòu)統(tǒng)一為quicklist,quicklist是ziplist和linkedlist的結(jié)合,其本質(zhì)上是一個雙向鏈表,鏈表中的每一個元素是ziplist。其數(shù)據(jù)結(jié)構(gòu)如下:
typedef struct quicklist {
quicklistNode *head; // 指向quicklist的頭部
quicklistNode *tail; // 指向quicklist的尾部
unsigned long count; // 列表中所有數(shù)據(jù)項的個數(shù)總和
unsigned int len; // quicklist節(jié)點的個數(shù),即ziplist的個數(shù)
int fill : 16; // ziplist大小限定,由list-max-ziplist-size給定
unsigned int compress : 16; // 節(jié)點壓縮深度設(shè)置(首尾兩端不被壓縮節(jié)點的個數(shù)),由list-compress-depth給定,該值為0表示不允許對這個quicklist進行壓縮
} quicklist;
其中,len表示quicklist元素的個數(shù),即quicklist包含多少個ziplist節(jié)點(quicklist的每一個節(jié)點都是ziplist)。由于每個ziplist又是由多個數(shù)據(jù)節(jié)點組成的,所以count代表quicklist中一共有多少個數(shù)據(jù)節(jié)點。fill和compress決定了每個ziplist的屬性。
redis會利用LZF壓縮算法對quicklist的節(jié)點進行壓縮操作,原因在于:list的設(shè)計目的是能夠存放很長的數(shù)據(jù)列表,當(dāng)列表很長時,必然會占用很高的內(nèi)存空間,且list中最容易訪問的是兩端的數(shù)據(jù),中間的數(shù)據(jù)訪問率較低,如果將不怎么被訪問的數(shù)據(jù)進行壓縮,只在需要時才進行解壓縮來訪問原始數(shù)據(jù),那么就可以在盡量少影響業(yè)務(wù)流暢度的基礎(chǔ)上減少數(shù)據(jù)存儲的空間。基于此,redis提出了compress depth這個概念--保持首尾compress depth個quickliostNode不進行壓縮,對中間的quicklistNode進行壓縮處理。
compress:這個字段是修飾整個quicklist的,代表首尾兩端不被壓縮的節(jié)點的個數(shù)。節(jié)點壓縮深度設(shè)置,存放
list-compress-depth參數(shù)的值。
* 0 特殊值,表示不壓縮
* 1 表示quicklist兩端各有一個節(jié)點不壓縮,中間的節(jié)點壓縮
* 2 表示quicklist兩端各有兩個節(jié)點不壓縮,中間的節(jié)點壓縮
* 3 表示quicklist兩端各有三個節(jié)點不壓縮,中間的節(jié)點壓縮
* 以此類推。
compress最大值:
#define COMPRESS_MAX (1 << 16)
由于quicklist結(jié)構(gòu)包含了壓縮表和鏈表,那么每個quicklistNode的大小就是一個需要仔細考量的點。如果單個quicklistNode存儲的數(shù)據(jù)太多,就會影響插入效率;但是如果單個quicklistNode太小,就會變得跟鏈表一樣造成空間浪費。
quicklist通過fill對單個quicklistNode的大小進行限制:
fill:ziplist大小設(shè)置,表示每個quicklist節(jié)點的元素大小限制,存放 list-max-ziplist-size參數(shù)的值。
fill可正可負,如果參數(shù)為正,表示按照數(shù)據(jù)項個數(shù)來限定每個節(jié)點中的元素個數(shù),比如3代表每個節(jié)點中存放的元
素個數(shù)不能超過3(最大為32768個);反之,如果參數(shù)為負,表示按照字節(jié)數(shù)來限定每個節(jié)點中的元素個數(shù),它只能
取-1~-5這五個數(shù),其含義如下:
* -1 每個節(jié)點的ziplist字節(jié)大小不能超過4kb
* -2 每個節(jié)點的ziplist字節(jié)大小不能超過8kb(-2是Redis給出的默認值)
* -3 每個節(jié)點的ziplist字節(jié)大小不能超過16kb
* -4 每個節(jié)點的ziplist字節(jié)大小不能超過32kb
* -5 每個節(jié)點的ziplist字節(jié)大小不能超過64kb
-1~-5對應(yīng)下面這個數(shù)組,例如:-1對應(yīng)4096,-5對應(yīng)65536.
/* Optimization levels for size-based filling */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
fill為正數(shù)時的最大值:
#define FILL_MAX (1 << 15)
quicklist節(jié)點有兩種狀態(tài),RAW狀態(tài)和LZF狀態(tài)。RAW狀態(tài)就是原始的ziplist結(jié)構(gòu)(解壓縮狀態(tài));LZF狀態(tài)則是利用LZF壓縮算法對quicklist的節(jié)點進行壓縮操作,其結(jié)構(gòu)為quicklistLZF。
typedef struct quicklistNode {
struct quicklistNode *prev; // 指向上一個ziplist節(jié)點
struct quicklistNode *next; // 指向下一個ziplist節(jié)點
unsigned char *zl; // 數(shù)據(jù)指針,如果沒有被壓縮,就指向ziplist結(jié)構(gòu),反之指向quicklistLZF結(jié)構(gòu)
unsigned int sz; // 表示指向ziplist結(jié)構(gòu)的總長度(內(nèi)存占用長度)
unsigned int count : 16; // 表示ziplist中的數(shù)據(jù)項個數(shù)
unsigned int encoding : 2; // 編碼方式,1--ziplist,2--quicklistLZF
unsigned int container : 2; // 預(yù)留字段,存放數(shù)據(jù)的方式,1--NONE,2--ziplist
unsigned int recompress : 1; // 當(dāng)我們使用類似lindex這樣的命令查看了某一項本來壓縮的數(shù)據(jù)時,需要把數(shù)據(jù)暫時解壓,這時就設(shè)置recompress=1做一個標(biāo)記,等有機會再把數(shù)據(jù)重新壓縮。
unsigned int attempted_compress : 1; // 測試相關(guān)
unsigned int extra : 10; // 擴展字段,暫時沒用
} quicklistNode;
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
quciklist的迭代器結(jié)構(gòu),指向迭代器節(jié)點元素(當(dāng)前節(jié)點表示quicklist中的某個ziplist中的某個ziplistNode,也就是quicklistEntry最終指向的是這個ziplisyEntry,所以quicklist的迭代,是從當(dāng)前entry依次往后遍歷所有ziplistNode,而不是單單遍歷每個quicklistNode):
// quicklist的迭代器結(jié)構(gòu)
typedef struct quicklistIter {
const quicklist *quicklist; // 指向所在quicklist的指針
quicklistNode *current; // 指向當(dāng)前節(jié)點的指針
unsigned char *zi; // 指向當(dāng)前節(jié)點(ziplist中的·一個元素)
long offset; // 當(dāng)前ziplist中的偏移地址
int direction; // 迭代器的方向
} quicklistIter;
// 表示quicklist節(jié)點中ziplist里的一個節(jié)點結(jié)構(gòu)
typedef struct quicklistEntry {
const quicklist *quicklist; // 指向所在quicklist的指針
quicklistNode *node; // 指向當(dāng)前節(jié)點的指針
unsigned char *zi; // 指向當(dāng)前節(jié)點(ziplist中的·一個元素)
unsigned char *value; // 當(dāng)前指向的ziplist中的節(jié)點的字符串值
long long longval; // 當(dāng)前指向的ziplist中的節(jié)點的整型值
unsigned int sz; // 當(dāng)前指向的ziplist中的節(jié)點的字節(jié)大小
int offset; // 當(dāng)前指向的ziplist中的節(jié)點相對于ziplist的偏移量
} quicklistEntry;
quicklist結(jié)構(gòu)如圖所示:

quicklist的主要api:
- 創(chuàng)建quicklist:quicklist *quicklistCreate(void)
- quicklist節(jié)點創(chuàng)建:REDIS_STATIC quicklistNode *quicklistCreateNode(void)
- 刪除quicklist:void quicklistRelease(quicklist *quicklist)
- quicklist的insert操作:insert操作類似push操作,不過是在某個迭代器指向的當(dāng)前ziplistNode節(jié)點前/后insert一個新的ziplistNode元素。
void quicklistInsertAfter(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz)
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry, void *value, const size_t sz) - quicklist的push操作:在quicklist的頭/尾節(jié)點(ziplist)中push一個ziplistNode元素。
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz)
int quicklistPushTail(quicklist *quicklist, void *value, size_t sz) - quicklist的pop操作:int quicklistPop(quicklist *quicklist, int where, unsigned char **data, unsigned int *sz, long long *slong)
- quicklist節(jié)點刪除:void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry)
- quicklist壓縮:quicklistCompress(_ql, _node)
quicklist重要接口實現(xiàn)講解:
- quicklist壓縮:quicklistCompress(_ql, _node)
/*
如果recompress=1,表示這個節(jié)點之前因為index等操作被“暫時”解壓縮以查看原始數(shù)據(jù),
此時可以對這個節(jié)點直接執(zhí)行壓縮操作,將這個節(jié)點重新壓縮(不需要判斷quicklist的
compress字段是否為0,因為是暫時解壓縮的,所以原來的quicklist的這個節(jié)點一定是支持壓縮的。)
*/
#define quicklistCompress(_ql, _node) \
do { \
if ((_node)->recompress) \
quicklistCompressNode((_node)); \
else \
__quicklistCompress((_ql), (_node)); \
} while (0)
內(nèi)部接口:
/*
在弄懂這個函數(shù)之前,最重要的是明白quicklist->compress字段的意思,這個字段是修飾整個
quicklist的,代表首尾兩端不被壓縮的節(jié)點的個數(shù),所以在執(zhí)行compress之前,需要先判斷
要被壓縮的目標(biāo)節(jié)點是否可以被壓縮。
*/
//該函數(shù)應(yīng)用很廣,delete或者insert新元素都需要調(diào)用這個函數(shù)來進行quicklist屬性適配
REDIS_STATIC void __quicklistCompress(const quicklist *quicklist,
quicklistNode *node) {
/* If length is less than our compress depth (from both sides),
* we can't compress anything. */
if (!quicklistAllowsCompression(quicklist) ||
quicklist->len < (unsigned int)(quicklist->compress * 2))
return;
/* Iterate until we reach compress depth for both sides of the list.a
* Note: because we do length checks at the *top* of this function,
* we can skip explicit null checks below. Everything exists.
*/
quicklistNode *forward = quicklist->head;
quicklistNode *reverse = quicklist->tail;
int depth = 0;
int in_depth = 0;
while (depth++ < quicklist->compress) {
/*
從quicklist的頭尾兩邊分別遍歷,因為頭部的compress個節(jié)點和尾部的compress個
節(jié)點都一定是非壓縮的原始狀態(tài),為了確保滿足compress字段要求,首先將這幾個節(jié)點
強制再解壓縮回原始狀態(tài)。之所以進行這一步操作的原因是因為,inset或者delete操作
可能會將原來不處于compress depth中的元素進入compress depth,所以要將這些
元素變?yōu)檎5臓顟B(tài)
*/
quicklistDecompressNode(forward);
quicklistDecompressNode(reverse);
/*
目標(biāo)節(jié)點在頭部或尾部的那compress個非壓縮的節(jié)點中,不應(yīng)該壓縮這個節(jié)點,但是可能會壓縮最靠
近非壓縮深度外的兩個節(jié)點(???),所以只是將in_depth標(biāo)志置為1,表示目標(biāo)節(jié)點在非壓縮的
深度中(深度指不被壓縮的數(shù)目),但不會return。
*/
if (forward == node || reverse == node)
in_depth = 1;
//quicklist的長度不及2*compress個,不進行下一步的壓縮操作(異常情況,違背入口的判斷條件)
if (forward == reverse)
return;
forward = forward->next;
reverse = reverse->prev;
}
//目標(biāo)節(jié)點不在非壓縮深度中(滿足壓縮條件),直接壓縮
if (!in_depth)
quicklistCompressNode(node);
//為啥壓縮深度外的那兩個?不太清楚??不應(yīng)該只壓縮一個滿足情況的node就好了么??
if (depth > 2) {
/* At this point, forward and reverse are one node beyond depth */
quicklistCompressNode(forward);
quicklistCompressNode(reverse);
}
}
/* If we previously used quicklistDecompressNodeForUse(), just recompress. */
/* 之前如果為了某個需要將某個原始節(jié)點暫時解壓縮了,此時要將該節(jié)點恢復(fù)為壓縮狀態(tài) */
#define quicklistRecompressOnly(_ql, _node) \
do { \
if ((_node)->recompress) \
quicklistCompressNode((_node)); \
} while (0)
- quicklist insert操作:
quicklist還能在指定的位置插入數(shù)據(jù),quicklistInsertAfter和quicklistInsertBefore就是分別在指定位置后面和前面插入數(shù)據(jù)項。當(dāng)然,和在頭尾節(jié)點插入一樣, 任意位置插入也是需要判斷當(dāng)前插入節(jié)點是否能夠放得下當(dāng)前的元素的,這邊的情況會比頭尾節(jié)點更為復(fù)雜,比如在當(dāng)前節(jié)點放不下的時候,還需要檢查一下旁邊的節(jié)點是否能夠放下這個數(shù)據(jù),能夠放下的話可以放置在旁邊的節(jié)點上,如果也不行的話,就是需要新建一個ziplist節(jié)點。
void quicklistInsertBefore(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz) {
_quicklistInsert(quicklist, entry, value, sz, 0);
}
內(nèi)部接口:
/* Insert a new entry before or after existing entry 'entry'.
*
* If after==1, the new value is inserted after 'entry', otherwise
* the new value is inserted before 'entry'. */
REDIS_STATIC void _quicklistInsert(quicklist *quicklist, quicklistEntry *entry,
void *value, const size_t sz, int after) {
int full = 0, at_tail = 0, at_head = 0, full_next = 0, full_prev = 0;
int fill = quicklist->fill;
quicklistNode *node = entry->node;
quicklistNode *new_node = NULL;
if (!node) {
/* we have no reference node, so let's create only node in the list */
/* 這種情況下相當(dāng)于quicklist中就沒有任何一個quicklistNode */
D("No node given!");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
__quicklistInsertNode(quicklist, NULL, new_node, after);
new_node->count++;
quicklist->count++;
return;
}
/* Populate accounting flags for easier boolean checks later */
//查看當(dāng)前quicklistNode對應(yīng)的ziplist中空間是否足夠插入一個新元素
if (!_quicklistNodeAllowInsert(node, fill, sz)) {
D("Current node is full with count %d with requested fill %lu",
node->count, fill);
full = 1;
}
/* 如果要在當(dāng)前Node的尾部后面插入新元素,考慮可否插入后一個ziplist中 */
if (after && (entry->offset == node->count)) {
D("At Tail of current ziplist");
at_tail = 1;
if (!_quicklistNodeAllowInsert(node->next, fill, sz)) {
D("Next node is full too.");
full_next = 1;
}
}
/* 如果要在當(dāng)前Node的頭部前面插入新元素,考慮可否插入前一個ziplist中 */
if (!after && (entry->offset == 0)) {
D("At Head");
at_head = 1;
if (!_quicklistNodeAllowInsert(node->prev, fill, sz)) {
D("Prev node is full too.");
full_prev = 1;
}
}
/* Now determine where and how to insert the new element */
/*
!full的情況對quicklist的compress depth沒有影響(沒有插入新的quicklistNode),
所以只需要對要插入的那個quicklistNode進行:解壓縮(可能需要)-> 插入新元素 -> 重新壓縮
(可能需要)就可以了
*/
if (!full && after) {
D("Not full, inserting after current position.");
quicklistDecompressNodeForUse(node);//如果之前是壓縮狀態(tài),需要先解壓縮,insert完畢后,再重新壓縮
unsigned char *next = ziplistNext(node->zl, entry->zi);
if (next == NULL) {
node->zl = ziplistPush(node->zl, value, sz, ZIPLIST_TAIL);
} else {
node->zl = ziplistInsert(node->zl, next, value, sz);
}
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);//由于插入元素前可能解壓縮了,所以可能需要重新回復(fù)壓縮狀態(tài)
} else if (!full && !after) {
D("Not full, inserting before current position.");
quicklistDecompressNodeForUse(node);
node->zl = ziplistInsert(node->zl, entry->zi, value, sz);
node->count++;
quicklistNodeUpdateSz(node);
quicklistRecompressOnly(quicklist, node);
} else if (full && at_tail && node->next && !full_next && after) {
/* If we are: at tail, next has free space, and inserting after:
* - insert entry at head of next node. */
D("Full and tail, but next isn't full; inserting next node head");
new_node = node->next;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && at_head && node->prev && !full_prev && !after) {
/* If we are: at head, previous has free space, and inserting before:
* - insert entry at tail of previous node. */
D("Full and head, but prev isn't full, inserting prev node tail");
new_node = node->prev;
quicklistDecompressNodeForUse(new_node);
new_node->zl = ziplistPush(new_node->zl, value, sz, ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
quicklistRecompressOnly(quicklist, new_node);
} else if (full && ((at_tail && node->next && full_next && after) ||
(at_head && node->prev && full_prev && !after))) {
/* If we are: full, and our prev/next is full, then:
* - create new node and attach to quicklist */
/*
這種情況表示cur、pre、next ziplist都滿了,需要重新構(gòu)建一個新的quicklistNode,
在這個quicklistNode中插入目標(biāo)元素。因為插入了新的quicklistNode,可能會影響
到compress depth,所以需要重新進行quicklistCompress處理(幸運的是,
__quicklistInsertNode會自動進行這一步處理)。
*/
D("\tprovisioning new node...");
new_node = quicklistCreateNode();
new_node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
} else if (full) {
/* else, node is full we need to split it. */
/* covers both after and !after cases */
/*
這種情況對應(yīng)cur ziplist滿了,但是想要插的元素不在ziplist頭部的前面或者尾部的后面,
無法將元素插入pre或next ziplist。這種情況下,需要將cur ziplist在插入的位置
split成兩個ziplist,再將這兩個新的ziplist插入quicklist中。因為插入了新的
quicklistNode,所以需要重新進行quicklistCompress處理。
*/
D("\tsplitting node...");
quicklistDecompressNodeForUse(node);
//split操作會將切割后的兩個ziplist分別存放在new_node和node中
/* If 'after'==1, returned node will have elements _after_ 'offset'.
* The returned node will have elements [OFFSET+1, END].
* The input node keeps elements [0, OFFSET].
*
* If 'after'==0, returned node will keep elements up to and including 'offset'.
* The returned node will have elements [0, OFFSET].
* The input node keeps elements [OFFSET+1, END].
*/
//new_node可能插入node的前方或后方,由after決定
//新的元素可能插入new_old的頭部或者尾部,也由after決定
new_node = _quicklistSplitNode(node, entry->offset, after);
new_node->zl = ziplistPush(new_node->zl, value, sz,
after ? ZIPLIST_HEAD : ZIPLIST_TAIL);
new_node->count++;
quicklistNodeUpdateSz(new_node);
__quicklistInsertNode(quicklist, node, new_node, after);
/*
這一步的merge是為了節(jié)省空間,將多個ziplist合并成滿足fill要求的ziplist,
減少ziplist頭部開銷。這一步試圖按以下順序合并:
- (node->prev->prev, node->prev)
- (node->next, node->next->next)
- (node->prev, node)
- (node, node->next)
*/
_quicklistMergeNodes(quicklist, node);
}
quicklist->count++;
}
/* Insert 'new_node' after 'old_node' if 'after' is 1.
* Insert 'new_node' before 'old_node' if 'after' is 0.
* Note: 'new_node' is *always* uncompressed, so if we assign it to
* head or tail, we do not need to uncompress it. */
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
quicklistNode *old_node,
quicklistNode *new_node, int after) {
if (after) {
new_node->prev = old_node;
if (old_node) {
new_node->next = old_node->next;
if (old_node->next)
old_node->next->prev = new_node;
old_node->next = new_node;
}
if (quicklist->tail == old_node)
quicklist->tail = new_node;
} else {
new_node->next = old_node;
if (old_node) {
new_node->prev = old_node->prev;
if (old_node->prev)
old_node->prev->next = new_node;
old_node->prev = new_node;
}
if (quicklist->head == old_node)
quicklist->head = new_node;
}
/* If this insert creates the only element so far, initialize head/tail. */
if (quicklist->len == 0) {
quicklist->head = quicklist->tail = new_node;
}
/*
無論是在old元素的before還是after插入一個新的元素,都會更改old元素的位置,為了
適應(yīng)quicklist->compress字段設(shè)置的非壓縮深度,需要調(diào)用quicklistCompress函數(shù)
重新更改old元素的壓縮狀態(tài)。
*/
if (old_node)
quicklistCompress(quicklist, old_node);
quicklist->len++;
}
- quicklist的push操作:
quicklist的push操作其實就是在雙向列表的頭節(jié)點或尾節(jié)點上插入一個新的元素。從上面我們知道,quicklist的每個節(jié)點都是一個ziplist,所以這個push操作就涉及到一個問題,當(dāng)前節(jié)點的ziplist是否能夠放進新元素。
———————————————————————————————————
1.如果ziplist能夠放入新元素,即大小沒有超過限制(list-max-ziplist-size),那么直接調(diào)用ziplistPush函數(shù)壓入
2.如果ziplist不能放入新元素,則新建一個quicklist節(jié)點,即新的ziplist,新的數(shù)據(jù)項會被插入到新的ziplist,新的quicklist節(jié)點插入到原有的quicklist上
———————————————————————————————————
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
//PushHead操作是在quicklist的某個節(jié)點的ziplist中push值為ziplistNode的元素
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(quicklist->head);
} else {//無法在原有quicklistNode中push元素,則新建一個只有這個元素的quicklist
quicklistNode *node = quicklistCreateNode();
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);
quicklistNodeUpdateSz(node);
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);
}
quicklist->count++;
quicklist->head->count++;
return (orig_head != quicklist->head);
}
//判斷當(dāng)前ziplist節(jié)點是否能插入的函數(shù):
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
if (sz < 254) //小于254時后一個節(jié)點的pre只有1字節(jié),否則為5字節(jié)
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
if (sz < 64) // 小于64字節(jié)當(dāng)前節(jié)點的encoding為1
ziplist_overhead += 1;
else if (likely(sz < 16384)) // 小于16384 encoding為2字節(jié)
ziplist_overhead += 2;
else // encoding為5字節(jié)
ziplist_overhead += 5;
/* new_sz overestimates if 'sz' encodes to an integer type */
unsigned int new_sz = node->sz + sz + ziplist_overhead; // 忽略了連鎖更新的情況
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1; // 校驗fill為負數(shù)是否超過單存儲限制
else if (!sizeMeetsSafetyLimit(new_sz)) // 校驗單個節(jié)點是否超過8kb,主要防止fill為正數(shù)時單個節(jié)點內(nèi)存過大
return 0;
else if ((int)node->count < fill) // fill為正數(shù)是否超過存儲限制,因為打算插入一個新元素,所以只能用<進行判斷,不能用<=進行判斷。
return 1;
else
return 0;
}
- quicklist的pop操作:
外部接口:
int quicklistPop(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *slong) {
unsigned char *vstr;
unsigned int vlen;
long long vlong;
if (quicklist->count == 0)
return 0;
int ret = quicklistPopCustom(quicklist, where, &vstr, &vlen, &vlong,
_quicklistSaver);
if (data)
*data = vstr;
if (slong)
*slong = vlong;
if (sz)
*sz = vlen;
return ret;
}
內(nèi)部接口:
/* pop from quicklist and return result in 'data' ptr. Value of 'data'
* is the return value of 'saver' function pointer if the data is NOT a number.
*
* If the quicklist element is a long long, then the return value is returned in
* 'sval'.
*
* Return value of 0 means no elements available.
* Return value of 1 means check 'data' and 'sval' for values.
* If 'data' is set, use 'data' and 'sz'. Otherwise, use 'sval'. */
int quicklistPopCustom(quicklist *quicklist, int where, unsigned char **data,
unsigned int *sz, long long *sval,
void *(*saver)(unsigned char *data, unsigned int sz)) {
unsigned char *p;
unsigned char *vstr;
unsigned int vlen;
long long vlong;
int pos = (where == QUICKLIST_HEAD) ? 0 : -1;
if (quicklist->count == 0)
return 0;
if (data)
*data = NULL;
if (sz)
*sz = 0;
if (sval)
*sval = -123456789;
quicklistNode *node;
if (where == QUICKLIST_HEAD && quicklist->head) {
node = quicklist->head;
} else if (where == QUICKLIST_TAIL && quicklist->tail) {
node = quicklist->tail;
} else {
return 0;
}
p = ziplistIndex(node->zl, pos);
if (ziplistGet(p, &vstr, &vlen, &vlong)) {
if (vstr) {
if (data)
*data = saver(vstr, vlen);
if (sz)
*sz = vlen;
} else {
if (data)
*data = NULL;
if (sval)
*sval = vlong;
}
quicklistDelIndex(quicklist, node, &p);
return 1;
}
return 0;
}
- quicklist節(jié)點刪除:
/* Delete one element represented by 'entry'
*
* 'entry' stores enough metadata to delete the proper position in
* the correct ziplist in the correct quicklist node. */
void quicklistDelEntry(quicklistIter *iter, quicklistEntry *entry) {
quicklistNode *prev = entry->node->prev;
quicklistNode *next = entry->node->next;
int deleted_node = quicklistDelIndex((quicklist *)entry->quicklist,
entry->node, &entry->zi);
/* after delete, the zi is now invalid for any future usage. */
iter->zi = NULL;
/* If current node is deleted, we must update iterator node and offset. */
if (deleted_node) {
if (iter->direction == AL_START_HEAD) {
iter->current = next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
iter->current = prev;
iter->offset = -1;
}
}
/* else if (!deleted_node), no changes needed.
* we already reset iter->zi above, and the existing iter->offset
* doesn't move again because:
* - [1, 2, 3] => delete offset 1 => [1, 3]: next element still offset 1
* - [1, 2, 3] => delete offset 0 => [2, 3]: next element still offset 0
* if we deleted the last element at offet N and now
* length of this ziplist is N-1, the next call into
* quicklistNext() will jump to the next node. */
}
/* Delete one entry from list given the node for the entry and a pointer
* to the entry in the node.
*
* Note: quicklistDelIndex() *requires* uncompressed nodes because you
* already had to get *p from an uncompressed node somewhere.
*
* Returns 1 if the entire node was deleted, 0 if node still exists.
* Also updates in/out param 'p' with the next offset in the ziplist. */
REDIS_STATIC int quicklistDelIndex(quicklist *quicklist, quicklistNode *node,
unsigned char **p) {
int gone = 0;
node->zl = ziplistDelete(node->zl, p);
node->count--;
if (node->count == 0) {
gone = 1;
__quicklistDelNode(quicklist, node);
} else {
quicklistNodeUpdateSz(node);
}
quicklist->count--;
/* If we deleted the node, the original node is no longer valid */
return gone ? 1 : 0;
}
REDIS_STATIC void __quicklistDelNode(quicklist *quicklist,
quicklistNode *node) {
if (node->next)
node->next->prev = node->prev;
if (node->prev)
node->prev->next = node->next;
if (node == quicklist->tail) {
quicklist->tail = node->prev;
}
if (node == quicklist->head) {
quicklist->head = node->next;
}
/* If we deleted a node within our compress depth, we
* now have compressed nodes needing to be decompressed. */
__quicklistCompress(quicklist, NULL);
quicklist->count -= node->count;
zfree(node->zl);
zfree(node);
quicklist->len--;
}
quicklist迭代器相關(guān)操作:
類似stl的迭代器,在執(zhí)行insert操作時,不能使用原來的迭代器,除非重新建立新的迭代器。
當(dāng)使用quicklistDelEntry()來刪除元素時,迭代器有時會失效。
迭代器失效的原因都是因為底層內(nèi)存重新分配,迭代器原本指向的地址變?yōu)闊o效地址。
- 初始化迭代器:
/* Returns a quicklist iterator 'iter'. After the initialization every
* call to quicklistNext() will return the next element of the quicklist. */
quicklistIter *quicklistGetIterator(const quicklist *quicklist, int direction) {
quicklistIter *iter;
iter = zmalloc(sizeof(*iter));
if (direction == AL_START_HEAD) {
iter->current = quicklist->head;
iter->offset = 0;
} else if (direction == AL_START_TAIL) {
iter->current = quicklist->tail;
iter->offset = -1;
}
iter->direction = direction;
iter->quicklist = quicklist;
iter->zi = NULL;
return iter;
}
- 獲取idx處的迭代器
/* Initialize an iterator at a specific offset 'idx' and make the iterator
* return nodes in 'direction' direction. */
quicklistIter *quicklistGetIteratorAtIdx(const quicklist *quicklist,
const int direction,
const long long idx) {
quicklistEntry entry;
if (quicklistIndex(quicklist, idx, &entry)) {
quicklistIter *base = quicklistGetIterator(quicklist, direction);
base->zi = NULL;//?這個為什么不用entry->zi,同next結(jié)合使用
base->current = entry.node;
base->offset = entry.offset;
return base;
} else {
return NULL;
}
}
- 迭代器next操作:
結(jié)合quicklistNext和quicklistGetIteratorAtIdx我們可以發(fā)現(xiàn),當(dāng)利用quicklistGetIteratorAtIdx初始化idx的迭代器時,此時itr->zi=NULL,只有進行了一次next后,itr->zi才會指向?qū)?yīng)idx的entryNode。之后,對這個itr再執(zhí)行quicklistNext操作,此時itr和entry都會指向下一個節(jié)點,而不再是NULL。
注意點:迭代器next操作時會進行decompress操作獲得原始數(shù)據(jù),因此稍后需要對這些數(shù)據(jù)進行重新壓縮。
/* Get next element in iterator.
*
* Note: You must NOT insert into the list while iterating over it.
* You *may* delete from the list while iterating using the
* quicklistDelEntry() function.
* If you insert into the quicklist while iterating, you should
* re-create the iterator after your addition.
*
* iter = quicklistGetIterator(quicklist,<direction>);
* quicklistEntry entry;
* while (quicklistNext(iter, &entry)) {
* if (entry.value)
* [[ use entry.value with entry.sz ]]
* else
* [[ use entry.longval ]]
* }
*
* Populates 'entry' with values for this iteration.
* Returns 0 when iteration is complete or if iteration not possible.
* If return value is 0, the contents of 'entry' are not valid.
*/
int quicklistNext(quicklistIter *iter, quicklistEntry *entry) {
initEntry(entry);
if (!iter) {
D("Returning because no iter!");
return 0;
}
entry->quicklist = iter->quicklist;
entry->node = iter->current;
if (!iter->current) {
D("Returning because current node is NULL")
return 0;
}
unsigned char *(*nextFn)(unsigned char *, unsigned char *) = NULL;
int offset_update = 0;
if (!iter->zi) {
/* If !zi, use current index. */
quicklistDecompressNodeForUse(iter->current);
iter->zi = ziplistIndex(iter->current->zl, iter->offset);
} else {
/* else, use existing iterator offset and get prev/next as necessary. */
if (iter->direction == AL_START_HEAD) {
nextFn = ziplistNext;
offset_update = 1;
} else if (iter->direction == AL_START_TAIL) {
nextFn = ziplistPrev;
offset_update = -1;
}
iter->zi = nextFn(iter->current->zl, iter->zi);
iter->offset += offset_update;
}
entry->zi = iter->zi;
entry->offset = iter->offset;
if (iter->zi) {
/* Populate value from existing ziplist position */
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
return 1;
} else {
/* We ran out of ziplist entries.
* Pick next node, update offset, then re-run retrieval. */
quicklistCompress(iter->quicklist, iter->current);
if (iter->direction == AL_START_HEAD) {
/* Forward traversal */
D("Jumping to start of next node");
iter->current = iter->current->next;
iter->offset = 0;
} else if (iter->direction == AL_START_TAIL) {
/* Reverse traversal */
D("Jumping to end of previous node");
iter->current = iter->current->prev;
iter->offset = -1;
}
iter->zi = NULL;
return quicklistNext(iter, entry);
}
}
- 根據(jù)idx獲取quicklist中的元素
/* Populate 'entry' with the element at the specified zero-based index
* where 0 is the head, 1 is the element next to head
* and so on. Negative integers are used in order to count
* from the tail, -1 is the last element, -2 the penultimate
* and so on. If the index is out of range 0 is returned.
*
* Returns 1 if element found
* Returns 0 if element not found */
int quicklistIndex(const quicklist *quicklist, const long long idx,
quicklistEntry *entry) {
quicklistNode *n;
unsigned long long accum = 0;
unsigned long long index;
int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */
initEntry(entry);
entry->quicklist = quicklist;
if (!forward) {
index = (-idx) - 1;
n = quicklist->tail;
} else {
index = idx;
n = quicklist->head;
}
if (index >= quicklist->count)
return 0;
while (likely(n)) {
if ((accum + n->count) > index) {
break;
} else {
D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
accum);
accum += n->count;
n = forward ? n->next : n->prev;
}
}
if (!n)
return 0;
D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
accum, index, index - accum, (-index) - 1 + accum);
entry->node = n;
if (forward) {
/* forward = normal head-to-tail offset. */
entry->offset = index - accum;
} else {
/* reverse = need negative offset for tail-to-head, so undo
* the result of the original if (index < 0) above. */
entry->offset = (-index) - 1 + accum;
}
quicklistDecompressNodeForUse(entry->node);
entry->zi = ziplistIndex(entry->node->zl, entry->offset);
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);
/* The caller will use our result, so we don't re-compress here.
* The caller can recompress or delete the node as needed. */
return 1;
}
quicklist查漏補缺:
- GCC關(guān)鍵字__builtin_expect
#define likely(x) __builtin_expect(!!(x), 1)
#define unlikely(x) __builtin_expect(!!(x), 0)
__builtin_expect() 是 GCC (version >= 2.96)提供給程序員使用的,目的是將“分支轉(zhuǎn)移”的信息提供給編譯器,這樣編譯器可以對代碼進行優(yōu)化,以減少指令跳轉(zhuǎn)帶來的性能下降。__builtin_expect((x),1) 表示 x 的值為真的可能性更大;
__builtin_expect((x),0) 表示 x 的值為假的可能性更大。
也就是說,使用 likely() ,執(zhí)行 if 后面的語句 的機會更大,使用 unlikely(),執(zhí)行 else 后面的語句的機會更大。通過這種方式,編譯器在編譯過程中,會將可能性更大的代碼緊跟著起面的代碼,從而減少指令跳轉(zhuǎn)帶來的性能上的下降。