跳躍表是Redis zset的底層實現(xiàn)之一,zset在member較多時會采用跳躍表作為底層實現(xiàn),它在添加、刪除、查找節(jié)點上都擁有與紅黑樹相當?shù)男阅?,它其實說白了就是一種特殊的鏈表,鏈表的每個節(jié)點存了不同的“層”信息,用這種分層存節(jié)點的方式在查找節(jié)點時能跳過些節(jié)點,從而使添加、刪除、查找操作都擁有了O(logn)的平均時間復(fù)雜度。
下面簡單介紹一下跳躍表:
跳躍表最低層(第一層)是一個擁有跳躍表所有節(jié)點的普通鏈表,每次在往跳躍表插入鏈表節(jié)點時一定會插入到這個最低層,至于是否插入到上層 就由拋硬幣決定(這么說不是很準確,redis里這個概率是1/4而非1/2,為了表述方便先這么說),什么意思呢?假設(shè)已經(jīng)有一個跳躍表,其高度只有一層:

往表中插入節(jié)點“7”時,假設(shè)插入7時拋硬幣的結(jié)果是正,則在第二層中插入“7”節(jié)點,繼續(xù)拋一次看看還能不能上到第三層,為反則停止插入,上層不再插入“7”節(jié)點了:

同理插入“4”節(jié)點假設(shè)連續(xù)拋兩次都拋了正面,第三次拋了反面,則“4”節(jié)點會插入到2、3層:

這就是跳躍表最基本的樣子。
查找一個節(jié)點時,我們只需從高層到低層,一個個鏈表查找,每次找到該層鏈表中小于等于目標節(jié)點的最大節(jié)點,直到找到為止。由于高層的鏈表迭代時會“跳過”低層的部分節(jié)點,所以跳躍表會比正常的鏈表查找少查部分節(jié)點,這也是skiplist名字的由來。
假如我們需要查找節(jié)點“5”:
先遍歷最高層,發(fā)現(xiàn)第三層頭結(jié)點的下一個節(jié)點是“4”,4<5,所以游標定位到“4”節(jié)點,但是“4”節(jié)點的下一個節(jié)點是空,得繼續(xù)往低層走;第二層也從“4”節(jié)點開始,“4”節(jié)點在第二層的下一個節(jié)點是“7”,7>5,公交車做過頭了,回來依舊定位在“4”節(jié)點;繼續(xù)往低層走,第一層“4”節(jié)點的下一個節(jié)點是“5”,這就找到了。

事實上插入前也需要進行跳躍表查找操作,上文演示的插入流程只是直接用了鏈表而省略了這一步。
跳躍表有了個大概的了解,接下來我們細說redis里的skiplist
zskiplist相關(guān)的結(jié)構(gòu)體聲明
redis的skiplist有兩個主要的數(shù)據(jù)結(jié)構(gòu),
- zskiplistNode:skiplist的節(jié)點
- zskiplist:用來記錄一個skiplist的長度、高度等信息
zskiplistNode的定義
typedef struct zskiplistNode {
robj *obj;//zset的member
double score;//zset的score
struct zskiplistNode *backward;//指向上一個節(jié)點,用于zrevrange命令
struct zskiplistLevel {
struct zskiplistNode *forward;//指向下一個節(jié)點
unsigned int span;//到達后一個節(jié)點的跨度(兩個相鄰節(jié)點span為1)
} level[];//該節(jié)點在各層的信息,柔性數(shù)組成員
} zskiplistNode;
backward變量是特意為zrevrange*系列命令準備的,目的是為了使跳躍表實現(xiàn)反向遍歷,普通跳躍表的實現(xiàn)里是非必要的。
level變量記錄了此節(jié)點各層的信息:
- level[x]->forward表示該節(jié)點在第x層的下一個節(jié)點。跳躍表節(jié)點都會出現(xiàn)在最低層的鏈表里,所以都會有l(wèi)evel[0],通過level[0]->forward實現(xiàn)跳躍表正向遍歷,zrange*系列命令就是以此實現(xiàn)的。
- level[x]->span表示該節(jié)點到第x層的下一個節(jié)點跳躍了多少節(jié)點。span主要是為了zrank、zrevrank服務(wù),普通跳躍表的實現(xiàn)里是非必要的。
zskiplist的定義
typedef struct zskiplist {
struct zskiplistNode *header, *tail;//跳躍表頭尾節(jié)點
unsigned long length;//節(jié)點個數(shù)
int level;//除頭結(jié)點外最大的層數(shù)
} zskiplist;
zskiplist的頭結(jié)點不是一個有效的節(jié)點,它有ZSKIPLIST_MAXLEVEL層(32層),每層的forward指向該層跳躍表的第一個節(jié)點,若沒有則為null。
btw:跳躍表節(jié)點的層數(shù)限制在了32,若想超過32層得連續(xù)32次拋硬幣都得到正面,這得有足夠多的節(jié)點,redis限定了拋硬幣正面的概率為1/4,所以到達32層的概率為(1/2)^64,一般一臺64位的計算機能擁有的最大內(nèi)存也無法存儲這么多zskiplistNode,所以對于基本使用 32層的上限已經(jīng)足夠高了,再高也沒必要 浪費頭節(jié)點的內(nèi)存。
最終zskiplist內(nèi)存布局如下:

也可以去掉不必要的部分(backward、obj),作出如下抽象后的圖:

下文我們都通過簡化圖來分析跳躍表的基本操作流程。
zskiplist相關(guān)接口
創(chuàng)建一個zskiplist 時間復(fù)雜度O(1)
創(chuàng)建一個zskiplist就是創(chuàng)建一個高度為ZSKIPLIST_MAXLEVEL(32)的頭結(jié)點,score為0,member為null。
代碼如下:
zskiplist *zslCreate(void) {
int j;
zskiplist *zsl;
zsl = zmalloc(sizeof(*zsl));
zsl->level = 1;
zsl->length = 0;
zsl->header = zslCreateNode(ZSKIPLIST_MAXLEVEL,0,NULL);//畫圖,32level的頭結(jié)點
for (j = 0; j < ZSKIPLIST_MAXLEVEL; j++) {
//頭結(jié)點每個level的下一個節(jié)點都初始化為null,跨度為0
zsl->header->level[j].forward = NULL;
zsl->header->level[j].span = 0;
}
zsl->header->backward = NULL;
zsl->tail = NULL;
return zsl;
}
//為指定高度的節(jié)點分配空間并賦值,insert操作也要用到
zskiplistNode *zslCreateNode(int level, double score, robj *obj) {
zskiplistNode *zn = zmalloc(sizeof(*zn)+level*sizeof(struct zskiplistLevel));//柔性數(shù)組成員
zn->score = score;
zn->obj = obj;
return zn;
}
執(zhí)行完zslCreate后會得到如下布局:

zskiplist插入一個節(jié)點 時間復(fù)雜度O(logn)
插入一個節(jié)點時需要做以下工作

要找到新節(jié)點的插入位置,只需像上文介紹的那樣,從高層向低層找即可。在找的過程中用update[]數(shù)組記錄每一層插入位置的上一個節(jié)點,用rank[]數(shù)組記錄每一層插入位置的上一個節(jié)點在跳躍表中的排名。根據(jù)update[]插入新節(jié)點,插入完成后再根據(jù)rank[]更新跨度信息即可。
ps:redis允許節(jié)點有重復(fù)的score,當score相同時根據(jù)member(代碼里的obj指向的字符串)的字典序來排名。
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {//返回一個隨機的層數(shù),不是level的索引是層數(shù)
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))//有1/4的概率加入到上一層中
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
//根據(jù)score、member插入一個節(jié)點到某個zskiplist中
//調(diào)用這個函數(shù)前需要確認obj(member)還不在跳躍表里,避免重復(fù)插入
zskiplistNode *zslInsert(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;//每一層的最后一個小于score的節(jié)點,因為插入節(jié)點需要修改每一層這個節(jié)點的上一個節(jié)點的信息(跨度),所以得保留一下,更重要的,這是保留插入位置
unsigned int rank[ZSKIPLIST_MAXLEVEL];//記錄每一層插入節(jié)點的上一個節(jié)點在skiplist中的排名
int i, level;//變量i作為zslInsert函數(shù)里循環(huán)的索引值,變量level為插入節(jié)點的層數(shù)(不是層數(shù)的索引)
serverAssert(!isnan(score));//isnan是一個macro,用于判斷參數(shù)是否是NAN(非數(shù)字)
x = zsl->header;//x用于迭代zskiplistNode
for (i = zsl->level-1; i >= 0; i--) {//從最高層向最底層查詢,找到合適的插入位置
/* store rank that is crossed to reach the insert position */
rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];//記錄每一層插入節(jié)點的上一個節(jié)點的排名
while (x->level[i].forward &&//當前層的下一個節(jié)點存在
(x->level[i].forward->score < score ||//下一個節(jié)點的分數(shù)小于需要插入的分數(shù)
(x->level[i].forward->score == score &&//score相同的情況下,根據(jù)member字符串的大小來比較(二進制安全的memcmp)
compareStringObjects(x->level[i].forward->obj,obj) < 0))) {
rank[i] += x->level[i].span;//每層的跨度
x = x->level[i].forward;//下一個節(jié)點
}
update[i] = x;//當前層的最后一個小于score的節(jié)點
}
/* we assume the key is not already inside, since we allow duplicated
* scores, and the re-insertion of score and redis object should never
* happen since the caller of zslInsert() should test in the hash table
* if the element is already inside or not. */
level = zslRandomLevel();
if (level > zsl->level) {//大于之前跳躍表的高度所以沒有記錄update[i],因為插入的節(jié)點有這么高所以要修改這些頭結(jié)點的信息
for (i = zsl->level; i < level; i++) {
rank[i] = 0;
update[i] = zsl->header;
update[i]->level[i].span = zsl->length;//高出部分的頭結(jié)點在還沒插入當前節(jié)點時跨度應(yīng)該是整張表,插入之后會重新更新這個值
}
zsl->level = level;
}
x = zslCreateNode(level,score,obj);
for (i = 0; i < level; i++) {
x->level[i].forward = update[i]->level[i].forward;
update[i]->level[i].forward = x;
/* update span covered by update[i] as x is inserted here */
//rank[0]是x在第0層的上一個節(jié)點的實際排名,rank[i]是x在第i層的上一個節(jié)點的實際排名,它們倆的差值為x在第i層的上一個節(jié)點與x之間的距離
x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
update[i]->level[i].span = (rank[0] - rank[i]) + 1;
}
/* increment span for untouched levels */
for (i = level; i < zsl->level; i++) {
update[i]->level[i].span++;
}
x->backward = (update[0] == zsl->header) ? NULL : update[0];
if (x->level[0].forward)
x->level[0].forward->backward = x;
else
zsl->tail = x;//尾節(jié)點
zsl->length++;
return x;
}
假如往圖2插入一個score為7的節(jié)點,則會按照下圖方式所示進行:
找到插入位置(藍色的線表示查找路徑):

- 假設(shè)層數(shù)為3,將節(jié)點7插入進skiplist并修改附近節(jié)點的相關(guān)屬性:

zskiplist里的刪除操作
釋放一個節(jié)點的內(nèi)存 時間復(fù)雜度O(1):
void zslFreeNode(zskiplistNode *node) {
decrRefCount(node->obj);//member的引用計數(shù)-1,防止內(nèi)存泄漏
zfree(node);
}
釋放整個skiplist的內(nèi)存 時間復(fù)雜度O(n):
void zslFree(zskiplist *zsl) {
//任何一個節(jié)點一定有l(wèi)evel[0],所以迭代level[0]來刪除所有節(jié)點
zskiplistNode *node = zsl->header->level[0].forward, *next;
zfree(zsl->header);
while(node) {
next = node->level[0].forward;
zslFreeNode(node);
node = next;
}
zfree(zsl);
}
從skiplist中刪除并釋放掉一個節(jié)點 時間復(fù)雜度O(logn):
主要分為以下3個步驟:
- 根據(jù)member(obj)和score找到節(jié)點的位置(代碼里變量x即為該節(jié)點,update記錄每層x的上一個節(jié)點)
- 調(diào)動zslDeleteNode把x節(jié)點從skiplist邏輯上刪除
- 釋放x節(jié)點內(nèi)存。
/* Delete an element with matching score/object from the skiplist. */
//從skiplist邏輯上刪除一個節(jié)點并釋放該節(jié)點的內(nèi)存
int zslDelete(zskiplist *zsl, double score, robj *obj) {
zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
while (x->level[i].forward &&
(x->level[i].forward->score < score ||
(x->level[i].forward->score == score &&
compareStringObjects(x->level[i].forward->obj,obj) < 0)))
x = x->level[i].forward;
update[i] = x;
}
/* We may have multiple elements with the same score, what we need
* is to find the element with both the right score and object. */
x = x->level[0].forward;//要刪除的節(jié)點
if (x && score == x->score && equalStringObjects(x->obj,obj)) {
zslDeleteNode(zsl, x, update);
zslFreeNode(x);//obj的引用計數(shù)-1并釋放節(jié)點內(nèi)存
return 1;
}
return 0; /* not found */
}
/* Internal function used by zslDelete, zslDeleteRangeByScore and zslDeleteRangeByRank and zslDeleteRangeByLex*/
//從skiplist邏輯上刪除一個節(jié)點(不釋放內(nèi)存,僅改變節(jié)點位置關(guān)系)
//x為要刪除的節(jié)點
//update為每一層x的上一個節(jié)點(為了更新x上一個節(jié)點的forward和span屬性)
void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update) {
int i;
for (i = 0; i < zsl->level; i++) {
if (update[i]->level[i].forward == x) {//當前層有x節(jié)點
update[i]->level[i].span += x->level[i].span - 1;//...=x->level[i].span;
update[i]->level[i].forward = x->level[i].forward;//跨過x節(jié)點
} else {//當前層沒有x節(jié)點
update[i]->level[i].span -= 1;
}
}
if (x->level[0].forward) {//是否是tail節(jié)點
x->level[0].forward->backward = x->backward;
} else {
zsl->tail = x->backward;
}
while(zsl->level > 1 && zsl->header->level[zsl->level-1].forward == NULL)//刪除了最高層數(shù)的節(jié)點
zsl->level--;
zsl->length--;
}
根據(jù)范圍刪除節(jié)點 時間復(fù)雜度O(log(n)+m), m是范圍內(nèi)元素的個數(shù)
- zslDeleteRangeByScore //刪除 滿足score范圍 的節(jié)點
- zslDeleteRangeByRank //刪除 滿足排名范圍 的節(jié)點
- zslDeleteRangeByLex //在所有節(jié)點的score相同的skiplist中,刪除 滿足member字典序范圍 的節(jié)點
代碼懶得貼了
范圍查找操作 時間復(fù)雜度O(log(n)+m), m是范圍內(nèi)元素的個數(shù)
根據(jù)查找范圍的類型 zskiplist查找可以分為三類:
- rank范圍查找
- score范圍查找
- member范圍查找
rank范圍查找 zrangeGenericCommand
功能:給定一個zero-based排名范圍(start,end),從zskiplist中找出滿足該范圍的所有節(jié)點。
zrangeGenericCommand函數(shù)考慮了一些與客戶端交互的內(nèi)容,學zskiplist的時候沒必要細看,它主要分為以下兩步:
- 1.調(diào)用zslGetElementByRank找到排名start+1的節(jié)點·······O(logn)
- 2.從這個節(jié)點開始遍歷(end-start+1)個節(jié)點·······O(m)
下面是zslGetElementByRank的代碼:
/* Finds an element by its rank. The rank argument needs to be 1-based. */
//O(logn)
zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
zskiplistNode *x;
unsigned long traversed = 0;
int i;
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) { //從高層向底層累加span直到累加的值等于rank
while (x->level[i].forward && (traversed + x->level[i].span) <= rank)
{
traversed += x->level[i].span;
x = x->level[i].forward;
}
if (traversed == rank) {
return x;
}
}
return NULL;
}
score范圍查找 genericZrangebyscoreCommand
功能:給定一個score的范圍,從zskiplist中找出滿足該score范圍的所有節(jié)點。
為了方便的表示score范圍的開閉區(qū)間,redis在server.h里聲明了一個表示zset分數(shù)區(qū)間的類型zrangespec,關(guān)于score范圍查找的相關(guān)函數(shù)都會用到它:
/* Struct to hold a inclusive/exclusive range spec by score comparison. */
typedef struct {
double min, max;
//是否是開閉區(qū)間,1為開,0位閉
int minex, maxex; /* are min or max exclusive? */
} zrangespec;
判斷一個score與zrangespec區(qū)間內(nèi)最小值、最大值的關(guān)系:
//給定value是否>(>=)此范圍的下界
//gte(greater than or equal to)
static int zslValueGteMin(double value, zrangespec *spec) {
return spec->minex ? (value > spec->min) : (value >= spec->min);
}
//給定value是否<(<=)此范圍的上界
//lte(less than or equal to)
int zslValueLteMax(double value, zrangespec *spec) {
return spec->maxex ? (value < spec->max) : (value <= spec->max);
}
根據(jù)上述兩個函數(shù),就可以用O(1)時間復(fù)雜度判斷一個zskiplist的score是否與zrangespec分數(shù)區(qū)間有交集:
/* Returns if there is a part of the zset is in range. */
//用O(1)的時間復(fù)雜度判斷zset(zsl)的分數(shù)范圍是否與給定分數(shù)范圍(range)有交集。
//
//range(6,9] zsl{1,2,3,4,5} 或zsl{10,12,13} 都是不在范圍內(nèi)
//
int zslIsInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
/* Test for ranges that will always be empty. */
if (range->min > range->max ||
(range->min == range->max && (range->minex || range->maxex)))
return 0;
x = zsl->tail;
if (x == NULL || !zslValueGteMin(x->score,range))//尾節(jié)點小于范圍下界
return 0;
x = zsl->header->level[0].forward;
if (x == NULL || !zslValueLteMax(x->score,range))//頭節(jié)點大于范圍上界
return 0;
return 1;//在
}
genericZrangebyscoreCommand函數(shù)也考慮了很多與客戶端交互的內(nèi)容,就學習底層跳躍表實現(xiàn)時沒必要細看,我們只需要知道底層是如何做到的即可,主要執(zhí)行如下步驟:
- 1.調(diào)用zslParseRange把客戶端傳過來的范圍min、max轉(zhuǎn)換成zrangespec區(qū)間類型 保存在range變量里。
- 2.調(diào)用zslFirstInRange找到zskiplist中滿足range條件的最小節(jié)點。(假設(shè)是正序的范圍查找)·······O(logn)
- 3.從這個節(jié)點開始遍歷,直到調(diào)用zslValueLteMax()找到最后一個小于range條件上界的節(jié)點。·······O(m)
放一下核心的代碼zslFirstInRange:
/* Find the first node that is contained in the specified range.
* Returns NULL when no element is contained in the range. */
//滿足range條件最小的那個 O(logn)
zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range) {
zskiplistNode *x;
int i;
/* If everything is out of range, return early. */
if (!zslIsInRange(zsl,range)) return NULL;//保證下面的邏輯一定能找到范圍內(nèi)的節(jié)點
x = zsl->header;
for (i = zsl->level-1; i >= 0; i--) {
/* Go forward while *OUT* of range. */
while (x->level[i].forward &&
!zslValueGteMin(x->level[i].forward->score,range))
x = x->level[i].forward;
}
/* This is an inner range, so the next node cannot be NULL. */
x = x->level[0].forward;
serverAssert(x != NULL);
/* Check if score <= max. */
if (!zslValueLteMax(x->score,range)) return NULL;
return x;
}
member范圍查找 genericZrangebylexCommand
功能:在一個所有節(jié)點的score都相同的zskiplist中,找到滿足member字符串字典序范圍的所有節(jié)點。
底層用memcmp比較兩個字符串的大小。實現(xiàn)的流程與genericZrangebyscoreCommand很像,有興趣再看。
其他
zslgetrank 根據(jù)member和score獲得節(jié)點在該skiplist中的rank
zslParseRange 把客戶端傳過來的范圍min、max轉(zhuǎn)換成zrangespec區(qū)間類型 返回給range參數(shù)。
...
為什么不用紅黑樹作為zset底層實現(xiàn)?
其實作者Antirez已經(jīng)給出了答復(fù):
https://news.ycombinator.com/item?id=1171423
劃重點:They are not very memory intensive. It's up to you basically.
既然取決于自己,skiplist實現(xiàn)簡單就選它了。至于可能的好處和壞處大概整理了一下有這些:
缺點:
- 比紅黑樹占用更多的內(nèi)存,每個節(jié)點的大小取決于該節(jié)點的層數(shù)
- 空間局部性較差導(dǎo)致緩存命中率低,感覺上會比紅黑樹更慢
優(yōu)點:
- 實現(xiàn)比紅黑樹簡單
- 比紅黑樹更容易擴展,作者之后實現(xiàn)zrank指令時沒怎么改動代碼。
- 紅黑樹插入刪除時為了平衡高度需要旋轉(zhuǎn)附近節(jié)點,高并發(fā)時需要鎖。skiplist不需要考慮。
- 一般用zset的操作都是執(zhí)行zrange之類的操作,取出一片連續(xù)的節(jié)點。這些操作的緩存命中率不會比紅黑樹低。
參考資料
Skip Lists: A Probabilistic Alternative to Balanced Trees
Skip Lists: Done Right