簡(jiǎn)介
A generic doubly linked quicklist implementation
A doubly linked list of ziplists
根據(jù)源代碼里面給我出的解釋就是這個(gè)是一個(gè)ziplist組成的雙向鏈表。
每個(gè)節(jié)點(diǎn)使用ziplist來(lái)保存數(shù)據(jù)。
根據(jù)咱們的編程經(jīng)驗(yàn)來(lái)說(shuō)。一個(gè)鏈表使用另外一種鏈表來(lái)作為節(jié)點(diǎn),那么本質(zhì)上來(lái)說(shuō),quicklist里面保存著一個(gè)一個(gè)小的ziplist。
但是為啥不直接用ziplist喃。本身ziplist就是一個(gè)雙向鏈表。并且額外的信息還特別的少。
其實(shí)原因很簡(jiǎn)單。根據(jù)我們上一節(jié)講的ziplist來(lái)看,ziplist在我們程序里面來(lái)看將會(huì)是一塊連續(xù)的內(nèi)存塊。它使用內(nèi)存偏移來(lái)保存next從而節(jié)約了next指針。這樣造成了我們每一次的刪除插入操作都會(huì)進(jìn)行remalloc,從而分配一塊新的內(nèi)存塊。當(dāng)我們的ziplist特別大的時(shí)候。沒(méi)有這么大空閑的內(nèi)存塊給我們的時(shí)候。操作系統(tǒng)其實(shí)會(huì)抽象出一塊連續(xù)的內(nèi)存塊給我。在底層來(lái)說(shuō)他其實(shí)是一個(gè)鏈表鏈接成為的內(nèi)存。不過(guò)在我們程序使用來(lái)說(shuō)。他還是一塊連續(xù)的內(nèi)存。這樣的話(huà)會(huì)造成內(nèi)存碎片,并且在操作的時(shí)候因?yàn)閮?nèi)存不連續(xù)等原因造成效率問(wèn)題?;蛘咭?yàn)檗D(zhuǎn)移到大內(nèi)存塊等進(jìn)行數(shù)據(jù)遷移。從而損失性能。
所以quicklist是對(duì)ziplist進(jìn)行一次封裝,使用小塊的ziplist來(lái)既保證了少使用內(nèi)存,也保證了性能。
數(shù)據(jù)結(jié)構(gòu)
/* quicklistNode is a 32 byte struct describing a ziplist for a quicklist.
* We use bit fields keep the quicklistNode at 32 bytes.
* count: 16 bits, max 65536 (max zl bytes is 65k, so max count actually < 32k).
* encoding: 2 bits, RAW=1, LZF=2.
* container: 2 bits, NONE=1, ZIPLIST=2.
* recompress: 1 bit, bool, true if node is temporarry decompressed for usage.
* attempted_compress: 1 bit, boolean, used for verifying during testing.
* extra: 12 bits, free for future use; pads out the remainder of 32 bits */
typedef struct quicklistNode {
struct quicklistNode *prev; //上一個(gè)node節(jié)點(diǎn)
struct quicklistNode *next; //下一個(gè)node
unsigned char *zl; //保存的數(shù)據(jù) 壓縮前ziplist 壓縮后壓縮的數(shù)據(jù)
unsigned int sz; /* ziplist size in bytes */
unsigned int count : 16; /* count of items in ziplist */
unsigned int encoding : 2; /* RAW==1 or LZF==2 */
unsigned int container : 2; /* NONE==1 or ZIPLIST==2 */
unsigned int recompress : 1; /* was this node previous compressed? */
unsigned int attempted_compress : 1; /* node can't compress; too small */
unsigned int extra : 10; /* more bits to steal for future usage */
} quicklistNode;
/* quicklistLZF is a 4+N byte struct holding 'sz' followed by 'compressed'.
* 'sz' is byte length of 'compressed' field.
* 'compressed' is LZF data with total (compressed) length 'sz'
* NOTE: uncompressed length is stored in quicklistNode->sz.
* When quicklistNode->zl is compressed, node->zl points to a quicklistLZF */
typedef struct quicklistLZF {
unsigned int sz; /* LZF size in bytes*/
char compressed[];
} quicklistLZF;
/* quicklist is a 32 byte struct (on 64-bit systems) describing a quicklist.
* 'count' is the number of total entries.
* 'len' is the number of quicklist nodes.
* 'compress' is: -1 if compression disabled, otherwise it's the number
* of quicklistNodes to leave uncompressed at ends of quicklist.
* 'fill' is the user-requested (or default) fill factor. */
typedef struct quicklist {
quicklistNode *head; //頭結(jié)點(diǎn)
quicklistNode *tail; //尾節(jié)點(diǎn)
unsigned long count; /* total count of all entries in all ziplists */
unsigned int len; /* number of quicklistNodes */
int fill : 16; /* fill factor for individual nodes *///負(fù)數(shù)代表級(jí)別,正數(shù)代表個(gè)數(shù)
unsigned int compress : 16; /* depth of end nodes not to compress;0=off *///壓縮級(jí)別
} quicklist;
說(shuō)起來(lái)這quickList就是一個(gè)標(biāo)準(zhǔn)的雙向鏈表的配置,有head 有tail node里面有prev 有next 其他的參數(shù)都長(zhǎng)度或者一些特別標(biāo)識(shí)??傮w來(lái)說(shuō),我們的quicklist會(huì)很簡(jiǎn)單。
看到這里大家基本都可以猜到插入查找刪除等操作了
插入
由于咱們的quicklist是一個(gè)標(biāo)準(zhǔn)的雙向鏈表,所以他的插入操作的原理會(huì)和普通的雙向鏈表插入很像,不過(guò)喃由于他使用了其他鏈表來(lái)作為節(jié)點(diǎn)。所以喃,我們操作就是找到我們插入的節(jié)點(diǎn)。然后進(jìn)行ziplist的插入就ok了。
/* Optimization levels for size-based filling */
static const size_t optimization_level[] = {4096, 8192, 16384, 32768, 65536};
/* Maximum size in bytes of any multi-element ziplist.
* Larger values will live in their own isolated ziplists. */
#define SIZE_SAFETY_LIMIT 8192
REDIS_STATIC int _quicklistNodeAllowInsert(const quicklistNode *node,
const int fill, const size_t sz) {
if (unlikely(!node))
return 0;
int ziplist_overhead;
/* size of previous offset */
if (sz < 254) //根據(jù)我們ziplist的經(jīng)驗(yàn) 對(duì)prvlen進(jìn)行編碼 小于254一位 大于等于254 五位
ziplist_overhead = 1;
else
ziplist_overhead = 5;
/* size of forward offset */
if (sz < 64) //編碼字節(jié)的size 根據(jù)我們對(duì)len的編碼
ziplist_overhead += 1;
else if (likely(sz < 16384))
ziplist_overhead += 2;
else
ziplist_overhead += 5;
//不過(guò)這個(gè)大小只是一個(gè)大概的猜想。因?yàn)槲覀冎牢覀儂iplist里面還有很多特殊的操作。這個(gè)算出來(lái)的是一個(gè)最大的大小??傮w來(lái)說(shuō)只會(huì)比這個(gè)小,不會(huì)比這個(gè)大的
/* new_sz overestimates if 'sz' encodes to an integer type */
unsigned int new_sz = node->sz + sz + ziplist_overhead; //計(jì)算出一個(gè)大概新插入后的大小
if (likely(_quicklistNodeSizeMeetsOptimizationRequirement(new_sz, fill)))
return 1;
else if (!sizeMeetsSafetyLimit(new_sz))//判斷是不是個(gè)安全的大小
return 0;
else if ((int)node->count < fill)//這才來(lái)判斷個(gè)數(shù)
return 1;
else
return 0;
}
REDIS_STATIC int
_quicklistNodeSizeMeetsOptimizationRequirement(const size_t sz,
const int fill) {
if (fill >= 0)//大于等于0 是個(gè)數(shù)
return 0;
size_t offset = (-fill) - 1;//小于等于0 級(jí)別
if (offset < (sizeof(optimization_level) / sizeof(*optimization_level))) {
if (sz <= optimization_level[offset]) {
return 1;
} else {
return 0;
}
} else {
return 0;
}
}
REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist,
quicklistNode *old_node,
quicklistNode *new_node, int after) {
if (after) { //插入方式
new_node->prev = old_node;//設(shè)置節(jié)點(diǎn)關(guān)系
if (old_node) { //如果原來(lái)的節(jié)點(diǎn)有咯
new_node->next = old_node->next;//設(shè)置next
if (old_node->next)
old_node->next->prev = new_node;
old_node->next = new_node;
}
if (quicklist->tail == old_node)
quicklist->tail = new_node;
} else {
new_node->next = old_node;
if (old_node) {
new_node->prev = old_node->prev;
if (old_node->prev)
old_node->prev->next = new_node;
old_node->prev = new_node;
}
if (quicklist->head == old_node)
quicklist->head = new_node;
}
/* If this insert creates the only element so far, initialize head/tail. */
if (quicklist->len == 0) {//第一次加入 設(shè)置成頭和尾
quicklist->head = quicklist->tail = new_node;
}
if (old_node)//加滿(mǎn)了之后進(jìn)行壓縮操作
quicklistCompress(quicklist, old_node);
quicklist->len++;
}
/* Add new entry to head node of quicklist.
*
* Returns 0 if used existing head.
* Returns 1 if new head created. */
int quicklistPushHead(quicklist *quicklist, void *value, size_t sz) {
quicklistNode *orig_head = quicklist->head;//當(dāng)前的head
if (likely(
_quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) {//看下head讓不讓你插入
quicklist->head->zl =
ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD);//讓你插入就插
quicklistNodeUpdateSz(quicklist->head);//更新長(zhǎng)度
} else {
quicklistNode *node = quicklistCreateNode();//不讓插新建一個(gè)node
node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD);//在新的ziplist插入
quicklistNodeUpdateSz(node);//更新sz
_quicklistInsertNodeBefore(quicklist, quicklist->head, node);//插入了新節(jié)點(diǎn),維護(hù)quicklist的node關(guān)系,如果有壓縮啥的進(jìn)行壓縮
}
quicklist->count++;//cout++
quicklist->head->count++;//cout++
return (orig_head != quicklist->head);
}
相對(duì)于這種標(biāo)準(zhǔn)的雙向鏈表來(lái)說(shuō),插入就變得非常簡(jiǎn)單了。首先是確定插入頭或者尾,插入的時(shí)候先判斷該節(jié)點(diǎn)能否插入。能否插入喃是有幾個(gè)標(biāo)準(zhǔn)的。第一個(gè)是不是設(shè)置了fill fill為負(fù)數(shù)的時(shí)候代表級(jí)別,每個(gè)級(jí)別ziplist能容納的大小喃在上面有。如果沒(méi)有設(shè)置的話(huà)。就看有沒(méi)大于安全的標(biāo)準(zhǔn)大小。沒(méi)有的話(huà)再看fill是不是正數(shù),正數(shù)代表ziplist能夠容納的個(gè)數(shù)。這樣判斷完成能不能夠進(jìn)行該節(jié)點(diǎn)的插入。不能的話(huà)就創(chuàng)建一個(gè)新的節(jié)點(diǎn),把數(shù)據(jù)插入進(jìn)去。然后維護(hù)一下quicklist的雙向鏈表關(guān)系。對(duì)老節(jié)點(diǎn)看下是否進(jìn)行壓縮操作等。
查找操作
list的查找操作主要是對(duì)index的
我們的quicklist的節(jié)點(diǎn)是由一個(gè)一個(gè)的ziplist構(gòu)成的每個(gè)ziplist都有大小。所以我們就只需要先根據(jù)我們每個(gè)node的個(gè)數(shù),從而找到對(duì)應(yīng)的ziplist,調(diào)用ziplist的index就能成功找到。
typedef struct quicklistEntry {
const quicklist *quicklist;//那個(gè)quicklist
quicklistNode *node; //哪一個(gè)node
unsigned char *zi; //找到在ziplist的位置
unsigned char *value;//str value
long long longval; //int value
unsigned int sz; //svalue len
int offset;//對(duì)于ziplist的index
} quicklistEntry;
/* Populate 'entry' with the element at the specified zero-based index
* where 0 is the head, 1 is the element next to head
* and so on. Negative integers are used in order to count
* from the tail, -1 is the last element, -2 the penultimate
* and so on. If the index is out of range 0 is returned.
*
* Returns 1 if element found
* Returns 0 if element not found */
int quicklistIndex(const quicklist *quicklist, const long long idx,
quicklistEntry *entry) {
quicklistNode *n;
unsigned long long accum = 0;
unsigned long long index;
int forward = idx < 0 ? 0 : 1; /* < 0 -> reverse, 0+ -> forward */ //向前還是像后
initEntry(entry);
entry->quicklist = quicklist;//設(shè)置list
if (!forward) {//方向
index = (-idx) - 1;
n = quicklist->tail;
} else {
index = idx;
n = quicklist->head;
}
if (index >= quicklist->count)//有米有這么多個(gè)
return 0;
while (likely(n)) { //找到對(duì)應(yīng)的位置
if ((accum + n->count) > index) {
break;
} else {
D("Skipping over (%p) %u at accum %lld", (void *)n, n->count,
accum);
accum += n->count;
n = forward ? n->next : n->prev;//下一個(gè)node
}
}
if (!n)//沒(méi)找到
return 0;
D("Found node: %p at accum %llu, idx %llu, sub+ %llu, sub- %llu", (void *)n,
accum, index, index - accum, (-index) - 1 + accum);
entry->node = n;//設(shè)置節(jié)點(diǎn)
if (forward) {//
/* forward = normal head-to-tail offset. */
entry->offset = index - accum;//位置 即ziplist的index
} else {
/* reverse = need negative offset for tail-to-head, so undo
* the result of the original if (index < 0) above. */
entry->offset = (-index) - 1 + accum;
}
quicklistDecompressNodeForUse(entry->node);//需要解壓就解壓
entry->zi = ziplistIndex(entry->node->zl, entry->offset);//找到節(jié)點(diǎn)塊
ziplistGet(entry->zi, &entry->value, &entry->sz, &entry->longval);//獲取節(jié)點(diǎn)的值 反編碼
/* The caller will use our result, so we don't re-compress here.
* The caller can recompress or delete the node as needed. */
return 1;
}
刪除
/* Delete a range of elements from the quicklist.
*
* elements may span across multiple quicklistNodes, so we
* have to be careful about tracking where we start and end.
*
* Returns 1 if entries were deleted, 0 if nothing was deleted. */
int quicklistDelRange(quicklist *quicklist, const long start,
const long count) {
if (count <= 0)
return 0;
unsigned long extent = count; /* range is inclusive of start position *///默認(rèn)你刪的個(gè)數(shù)是對(duì)的
if (start >= 0 && extent > (quicklist->count - start)) {//但是喃沒(méi)有這么多給你刪
/* if requesting delete more elements than exist, limit to list size. */
extent = quicklist->count - start;
} else if (start < 0 && extent > (unsigned long)(-start)) {
/* else, if at negative offset, limit max size to rest of list. */
extent = -start; /* c.f. LREM -29 29; just delete until end. */
}
quicklistEntry entry;
if (!quicklistIndex(quicklist, start, &entry))//先找一下位置
return 0;
D("Quicklist delete request for start %ld, count %ld, extent: %ld", start,
count, extent);
quicklistNode *node = entry.node;//拿到要?jiǎng)h的第一個(gè)node
/* iterate over next nodes until everything is deleted. */
while (extent) {
quicklistNode *next = node->next;//下一個(gè)節(jié)點(diǎn)喃 都是往一個(gè)方向刪除哈
unsigned long del;
int delete_entire_node = 0;
if (entry.offset == 0 && extent >= node->count) {//他是第一個(gè)節(jié)點(diǎn) 并且要?jiǎng)h的節(jié)點(diǎn)數(shù)大于這個(gè)node保存的節(jié)點(diǎn)數(shù) 就把這個(gè)node一起干掉
/* If we are deleting more than the count of this node, we
* can just delete the entire node without ziplist math. */
delete_entire_node = 1;
del = node->count;
} else if (entry.offset >= 0 && extent >= node->count) {//要?jiǎng)h的個(gè)數(shù)大于 并且是正向來(lái)的
/* If deleting more nodes after this one, calculate delete based
* on size of current node. */
del = node->count - entry.offset;
} else if (entry.offset < 0) {//如果是反向的話(huà)
/* If offset is negative, we are in the first run of this loop
* and we are deleting the entire range
* from this start offset to end of list. Since the Negative
* offset is the number of elements until the tail of the list,
* just use it directly as the deletion count. */
del = -entry.offset;
/* If the positive offset is greater than the remaining extent,
* we only delete the remaining extent, not the entire offset.
*/
if (del > extent)//看下能刪多少個(gè)
del = extent;
} else {
/* else, we are deleting less than the extent of this node, so
* use extent directly. */
del = extent;
}
D("[%ld]: asking to del: %ld because offset: %d; (ENTIRE NODE: %d), "
"node count: %u",
extent, del, entry.offset, delete_entire_node, node->count);
if (delete_entire_node) {//刪節(jié)點(diǎn)
__quicklistDelNode(quicklist, node);
} else {
quicklistDecompressNodeForUse(node);//解壓
node->zl = ziplistDeleteRange(node->zl, entry.offset, del);//ziplist刪range
quicklistNodeUpdateSz(node);//更新大小
node->count -= del;//更新個(gè)數(shù)
quicklist->count -= del;//更新個(gè)數(shù)
quicklistDeleteIfEmpty(quicklist, node);//看下節(jié)點(diǎn)是不是空了
if (node)
quicklistRecompressOnly(quicklist, node);//壓縮回去
}
extent -= del;//減除已經(jīng)刪除了的
node = next;//繼續(xù)進(jìn)行刪除
entry.offset = 0;
}
return 1;
}
這里給出的是刪除一個(gè)range
我們的操作也很簡(jiǎn)單。
1.先把要?jiǎng)h除的個(gè)數(shù)確定下來(lái)。可能會(huì)存在刪除的個(gè)數(shù)過(guò)多,其實(shí)沒(méi)有那么多的情況。
2.找到要開(kāi)始刪除的節(jié)點(diǎn)。刪除的情況其實(shí)特別的就是獲取當(dāng)前的offset為0,要?jiǎng)h除的個(gè)數(shù)大于等于當(dāng)前節(jié)點(diǎn)的數(shù)據(jù)個(gè)數(shù)。這樣就把它全部刪除。否則的話(huà)就判斷當(dāng)前節(jié)點(diǎn)需要?jiǎng)h除幾個(gè)
總結(jié)
quciklist把ziplist進(jìn)行了封裝,保證了ziplist的最大大小。從而壓縮了內(nèi)存。進(jìn)一步的提高了效率。
magicdeMacBook-Pro:~ magic$ redis-cli
127.0.0.1:6379> rpush xislist 'xistest'
(integer) 2
127.0.0.1:6379> object encoding xislist
"quicklist"
127.0.0.1:6379>
其實(shí)我們可以發(fā)現(xiàn)。我們的list就是使用的quicklist.
參照方法。我們就可以知道我們的,命令的效率。