引言

讀過(guò) Redis 源碼的童鞋,想必會(huì)知道 zset 實(shí)現(xiàn)時(shí),使用了「跳表」(Skiplist)這種數(shù)據(jù)結(jié)構(gòu)吧。它的原理非常容易理解,如果對(duì)鏈表比較熟悉,那么也會(huì)很容易理解「跳表」的工作原理(核心:有序鏈表 + 分層)。當(dāng)然,本文并不會(huì)詳細(xì)講解「跳表」的工作原理,以及對(duì)于 Redis 跳表源碼的詳細(xì)分析。因?yàn)橐呀?jīng)有前輩們產(chǎn)出了非常豐富的文章來(lái)講解 Redis 跳表,需要的話,推薦閱讀 這篇文章 了解更多細(xì)節(jié)。
總的來(lái)說(shuō),Redis 的 zset 實(shí)現(xiàn)中,選用「跳表」的主要原因如下:
- 原理清晰易懂,且容易實(shí)現(xiàn),方便維護(hù):對(duì)比下平衡樹(shù)或者紅黑樹(shù)(可能就像 Raft v.s. Paxos 的感覺(jué)一樣),不管是原理還是實(shí)現(xiàn)都簡(jiǎn)單了很多。平衡樹(shù)或者紅黑樹(shù)在實(shí)現(xiàn)時(shí),還要時(shí)刻維護(hù)節(jié)點(diǎn)關(guān)系,必要時(shí)還需要執(zhí)行樹(shù)的左旋或者右旋來(lái)保持平衡;
- 擁有媲美平衡樹(shù)或者紅黑樹(shù)的查詢效率:插入、刪除、查找的平均時(shí)間復(fù)雜度可以達(dá)到 O(logN)。
當(dāng)然,相對(duì)于 William Pugh 在他的論文中所描述的「跳表」算法而言,作者在實(shí)現(xiàn) Redis 中的「跳表」時(shí),給它加了點(diǎn)「料」:
- 允許重復(fù)的分?jǐn)?shù)存在;
- 在進(jìn)行比較時(shí),不僅會(huì)比較 score,還會(huì)考慮關(guān)聯(lián)的數(shù)據(jù);
- 添加了一個(gè)回退指針,從而構(gòu)成了一個(gè)雙向鏈表(level[0]),便于倒序遍歷鏈表(
ZREVRANGE)使用。
好了,廢話完畢。接下來(lái)進(jìn)入正題,看看如何使用 Go 語(yǔ)言來(lái)實(shí)現(xiàn)「跳表」吧(貼代碼模式開(kāi)啟~)。
跳表實(shí)現(xiàn)
以下僅僅列出了幾個(gè)比較有趣且關(guān)鍵的方法實(shí)現(xiàn),即:插入、刪除和更新分?jǐn)?shù)。完整的實(shí)現(xiàn)源碼可以參考 這里 或者 這里,包含了比較詳細(xì)的單元測(cè)試。
數(shù)據(jù)結(jié)構(gòu)定義
需要說(shuō)明的是,為了簡(jiǎn)單起見(jiàn),假設(shè)存儲(chǔ)的元素是字符串類型(要是使用 interface{} 的話,又得加些代碼支持元素之間的比較了)。但是在 Redis 中,實(shí)際的 element 類型是 sds。
const (
MaxLevel = 64 // 足以容納 2^64 個(gè)元素
P = 0.25
)
type Node struct {
elem string
score float64
backward *Node
level []skipLevel
}
type skipLevel struct {
// forward 每層都要有指向下一個(gè)節(jié)點(diǎn)的指針
forward *Node
// span 間隔定義為:從當(dāng)前節(jié)點(diǎn)到 forward 指向的下個(gè)節(jié)點(diǎn)之間間隔的節(jié)點(diǎn)數(shù)
span int
}
type Skiplist struct {
header, tail *Node
level int // 記錄跳表的實(shí)際高度
length int // 記錄跳表的長(zhǎng)度(不含頭節(jié)點(diǎn))
}
輔助方法
考慮到在實(shí)現(xiàn)時(shí),經(jīng)常需要比較 score 和 element,所以這里直接給 Node 實(shí)現(xiàn)了一些比較方法,便于使用。
func (node *Node) Compare(other *Node) int {
if node.score < other.score || (node.score == other.score && node.elem < other.elem) {
return -1
} else if node.score > other.score || (node.score == other.score && node.elem > other.elem) {
return 1
} else {
return 0
}
}
func (node *Node) Lt(other *Node) bool {
return node.Compare(other) < 0
}
func (node *Node) Lte(other *Node) bool {
return node.Compare(other) <= 0
}
func (node *Node) Gt(other *Node) bool {
return node.Compare(other) > 0
}
func (node *Node) Eq(other *Node) bool {
return node.Compare(other) == 0
}
插入元素
// Insert 向跳表中插入一個(gè)新的元素。
// 步驟:
// 1. 查找插入位置
// 2. 創(chuàng)建新節(jié)點(diǎn),并在目標(biāo)位置插入節(jié)點(diǎn)
// 3. 調(diào)整跳表 backward 指針等
func (sl *Skiplist) Insert(score float64, elem string) *Node {
var (
// update 用于記錄每層待更新的節(jié)點(diǎn)
update [MaxLevel]*Node
// rank 用來(lái)記錄每層經(jīng)過(guò)的節(jié)點(diǎn)記錄(可以看成到頭節(jié)點(diǎn)的距離)
rank [MaxLevel]int
// 構(gòu)建一個(gè)新節(jié)點(diǎn),用于下面的大小判斷,其 level 在后面設(shè)置
node = &Node{score: score, elem: elem}
)
cur := sl.header
for i := sl.level - 1; i >= 0; i-- {
if cur == sl.header {
rank[i] = 0
} else {
rank[i] = rank[i+1]
}
// 與同層的后一個(gè)節(jié)點(diǎn)比較,如果后一個(gè)比目標(biāo)值小,則可以繼續(xù)向后
// 否則下降到一層查找。注意這里的大小比較是按照 score 和
// elem 綜合計(jì)算得到的。
for cur.level[i].forward != nil && cur.level[i].forward.Lt(node) {
rank[i] += cur.level[i].span
// 同層繼續(xù)往后查找
cur = cur.level[i].forward
}
update[i] = cur
}
// 調(diào)整跳表高度
level := sl.randomLevel()
if level > sl.level {
// 初始化每層
for i := level - 1; i >= sl.level; i-- {
rank[i] = 0
update[i] = sl.header
update[i].level[i].span = sl.length
}
sl.level = level
}
// 更新節(jié)點(diǎn) level,并插入新節(jié)點(diǎn)
node.setLevel(level)
for i := 0; i < level; i++ {
// 更新每層的節(jié)點(diǎn)指向
node.level[i].forward = update[i].level[i].forward
update[i].level[i].forward = node
// 更新 span 信息
node.level[i].span = update[i].level[i].span - (rank[0] - rank[i])
update[i].level[i].span = (rank[0] - rank[i]) + 1
}
// 針對(duì)新增節(jié)點(diǎn) level < sl.level 的情況,需要更新上面沒(méi)有掃到的層 span
for i := level; i < sl.level; i++ {
update[i].level[i].span++
}
// 調(diào)整 backward 指針
// 如果前一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn),則 backward 為 nil
// 否則 backward 指向之前節(jié)點(diǎn)
if update[0] != sl.header {
// update[0] 就是和新增節(jié)點(diǎn)相鄰的前一個(gè)節(jié)點(diǎn)
node.backward = update[0]
}
// 如果新增節(jié)點(diǎn)是最后一個(gè),則需要更新 tail 指針
if node.level[0].forward == nil {
sl.tail = node
} else {
// 中間節(jié)點(diǎn),需要更新后一個(gè)節(jié)點(diǎn)的回退指針
node.level[0].forward.backward = node
}
sl.length++
return node
}
// randomLevel 對(duì)于新增節(jié)點(diǎn),返回一個(gè)隨機(jī)的 level
// 返回的 level 范圍為 [1, MaxLevel]。并且,采用的
// 算法會(huì)保證,更大的 level 返回的概率越低。
// 每個(gè) level 出現(xiàn)的概率計(jì)算:(1-p) * p^(level-1)
func (sl *Skiplist) randomLevel() int {
level := 1
for rand.Float64() < P && level < MaxLevel {
level++
}
return level
}
刪除元素
// Delete 用于刪除跳表中指定的節(jié)點(diǎn)。
func (sl *Skiplist) Delete(score float64, elem string) *Node {
// 第一步,找到需要?jiǎng)h除節(jié)點(diǎn)
var (
update [MaxLevel]*Node
targetNode = &Node{elem: elem, score: score}
)
cur := sl.header
for i := sl.level - 1; i >= 0; i-- {
for cur.level[i].forward != nil && cur.level[i].forward.Lt(targetNode) {
cur = cur.level[i].forward
}
update[i] = cur
}
// 目標(biāo)節(jié)點(diǎn)找到后,這里需要判斷下 elem 是否相等
// score 可以重復(fù),所以必須要謹(jǐn)慎
nodeToBeDeleted := update[0].level[0].forward
if nodeToBeDeleted == nil || !nodeToBeDeleted.Eq(targetNode) {
return nil
}
sl.deleteNode(update, nodeToBeDeleted)
return nodeToBeDeleted
}
func (sl *Skiplist) deleteNode(update [64]*Node, nodeToBeDeleted *Node) {
// 這時(shí)我們要?jiǎng)h除的節(jié)點(diǎn)就是 nodeToBeDeleted
// 調(diào)整每層待更新節(jié)點(diǎn),修改 forward 指向
for i := 0; i < sl.level; i++ {
if update[i].level[i].forward == nodeToBeDeleted {
update[i].level[i].forward = nodeToBeDeleted.level[i].forward
update[i].level[i].span += nodeToBeDeleted.level[i].span - 1
} else {
update[i].level[i].span--
}
}
// 調(diào)整回退指針:
// 1. 如果被刪除的節(jié)點(diǎn)是最后一個(gè)節(jié)點(diǎn),需要更新 sl.tail
// 2. 如果被刪除的節(jié)點(diǎn)位于中間,則直接更新后一個(gè)節(jié)點(diǎn) backward 即可
if sl.tail == nodeToBeDeleted {
sl.tail = nodeToBeDeleted.backward
} else {
nodeToBeDeleted.level[0].forward.backward = nodeToBeDeleted.backward
}
// 調(diào)整層數(shù)
for sl.header.level[sl.level-1].forward == nil {
sl.level--
}
// 減少節(jié)點(diǎn)計(jì)數(shù)
sl.length--
nodeToBeDeleted.backward = nil
nodeToBeDeleted.level[0].forward = nil
}
更新分?jǐn)?shù)
// UpdateScore 用于更新節(jié)點(diǎn)的分?jǐn)?shù)。該函數(shù)會(huì)保證更新分?jǐn)?shù)后,
// 節(jié)點(diǎn)的有序性依然可以維持。
// 策略如下:
// 1. 快速判斷能否原節(jié)點(diǎn)修改,如果可以則直接修改并返回;
// 2. 采用更加昂貴的操作:刪除再添加。
func (sl *Skiplist) UpdateScore(curScore float64, elem string, newScore float64) *Node {
var (
update [MaxLevel]*Node
targetNode = &Node{elem: elem, score: curScore}
)
cur := sl.header
// 第一步,找到符合條件的目標(biāo)節(jié)點(diǎn)
for i := sl.level - 1; i >= 0; i-- {
for cur.level[i].forward != nil && cur.level[i].forward.Lt(targetNode) {
cur = cur.level[i].forward
}
update[i] = cur
}
node := cur.level[0].forward
if node == nil || !node.Eq(targetNode) {
return nil
}
if sl.canUpdateScoreFor(node, newScore) {
node.score = newScore
return node
} else {
// 需要?jiǎng)h除舊節(jié)點(diǎn),增加新節(jié)點(diǎn)
sl.deleteNode(update, node)
return sl.Insert(newScore, node.elem)
}
}
// canUpdateScoreFor 確定能否直接在原有的節(jié)點(diǎn)上進(jìn)行修改
// 什么條件才可以直接原地更新 score 呢?
// 1. node 是唯一一個(gè)數(shù)據(jù)節(jié)點(diǎn)(node.backward == NULL && node->level[0].forward == NULL)
// 2. node 是第一個(gè)數(shù)據(jù)節(jié)點(diǎn),且新的分?jǐn)?shù)要比 node 之后節(jié)點(diǎn)分?jǐn)?shù)要?。ㄟ@樣才能保證有序)
// 即:node.backward == NULL && node->level[0].forward->score > newScore)
// 3. node 是最后一個(gè)數(shù)據(jù)節(jié)點(diǎn),且 node 之前節(jié)點(diǎn)的分?jǐn)?shù)要比新改的分?jǐn)?shù)小
// 即:node->backward->score < newScore && node->level[0].forward == NULL
// 4. node 是修改的后的分?jǐn)?shù)恰好還能保證位于前一個(gè)和后一個(gè)節(jié)點(diǎn)分?jǐn)?shù)之間
// 即:node->backward->score < newscore && node->level[0].forward->score > newscore
func (sl *Skiplist) canUpdateScoreFor(node *Node, newScore float64) bool {
if (node.backward == nil || node.backward.score < newScore) &&
(node.level[0].forward == nil || node.level[0].forward.score > newScore) {
return true
}
return false
}
總結(jié)
俗話說(shuō),「說(shuō)起來(lái)容易,做起來(lái)難」。在實(shí)現(xiàn)「跳表」的時(shí)候感受頗深,似乎看完 Redis 的「跳表」源碼和網(wǎng)上諸多前輩編寫(xiě)的文章后,自以為懂得了原理(可能確實(shí)懂了),但是在具體實(shí)現(xiàn)的時(shí)候還是踩了不少坑。比如,空指針引起 panic;i-- 寫(xiě)成了 i++ 導(dǎo)致查找失?。灰恍┻吔缜闆r的判斷等。總之,細(xì)節(jié)決定成敗,需要在保持思路清晰的同時(shí),更加謹(jǐn)慎一些才能寫(xiě)出足夠健壯的代碼來(lái)。當(dāng)然,這期間自然少不了單元測(cè)試的助攻,否則有很多問(wèn)題可能都沒(méi)法暴露出來(lái)~
參考
聲明
- 本文鏈接: http://ifaceless.space/2019/12/11/implement-redis-skiplist-in-go/
- 版權(quán)聲明:本博客所有文章除特別聲明外,均采用 CC BY-NC-SA 3.0 許可協(xié)議。轉(zhuǎn)載請(qǐng)注明出處!