動(dòng)手實(shí)現(xiàn) Redis 跳表(Go 語(yǔ)言)

引言

image

讀過(guò) Redis 源碼的童鞋,想必會(huì)知道 zset 實(shí)現(xiàn)時(shí),使用了「跳表」(Skiplist)這種數(shù)據(jù)結(jié)構(gòu)吧。它的原理非常容易理解,如果對(duì)鏈表比較熟悉,那么也會(huì)很容易理解「跳表」的工作原理(核心:有序鏈表 + 分層)。當(dāng)然,本文并不會(huì)詳細(xì)講解「跳表」的工作原理,以及對(duì)于 Redis 跳表源碼的詳細(xì)分析。因?yàn)橐呀?jīng)有前輩們產(chǎn)出了非常豐富的文章來(lái)講解 Redis 跳表,需要的話,推薦閱讀 這篇文章 了解更多細(xì)節(jié)。

總的來(lái)說(shuō),Redis 的 zset 實(shí)現(xiàn)中,選用「跳表」的主要原因如下:

  1. 原理清晰易懂,且容易實(shí)現(xiàn),方便維護(hù):對(duì)比下平衡樹(shù)或者紅黑樹(shù)(可能就像 Raft v.s. Paxos 的感覺(jué)一樣),不管是原理還是實(shí)現(xiàn)都簡(jiǎn)單了很多。平衡樹(shù)或者紅黑樹(shù)在實(shí)現(xiàn)時(shí),還要時(shí)刻維護(hù)節(jié)點(diǎn)關(guān)系,必要時(shí)還需要執(zhí)行樹(shù)的左旋或者右旋來(lái)保持平衡;
  2. 擁有媲美平衡樹(shù)或者紅黑樹(shù)的查詢效率:插入、刪除、查找的平均時(shí)間復(fù)雜度可以達(dá)到 O(logN)。

當(dāng)然,相對(duì)于 William Pugh 在他的論文中所描述的「跳表」算法而言,作者在實(shí)現(xiàn) Redis 中的「跳表」時(shí),給它加了點(diǎn)「料」:

  1. 允許重復(fù)的分?jǐn)?shù)存在;
  2. 在進(jìn)行比較時(shí),不僅會(huì)比較 score,還會(huì)考慮關(guān)聯(lián)的數(shù)據(jù);
  3. 添加了一個(gè)回退指針,從而構(gòu)成了一個(gè)雙向鏈表(level[0]),便于倒序遍歷鏈表(ZREVRANGE)使用。

好了,廢話完畢。接下來(lái)進(jìn)入正題,看看如何使用 Go 語(yǔ)言來(lái)實(shí)現(xiàn)「跳表」吧(貼代碼模式開(kāi)啟~)。

跳表實(shí)現(xiàn)

以下僅僅列出了幾個(gè)比較有趣且關(guān)鍵的方法實(shí)現(xiàn),即:插入、刪除和更新分?jǐn)?shù)。完整的實(shí)現(xiàn)源碼可以參考 這里 或者 這里,包含了比較詳細(xì)的單元測(cè)試。

數(shù)據(jù)結(jié)構(gòu)定義

需要說(shuō)明的是,為了簡(jiǎn)單起見(jiàn),假設(shè)存儲(chǔ)的元素是字符串類型(要是使用 interface{} 的話,又得加些代碼支持元素之間的比較了)。但是在 Redis 中,實(shí)際的 element 類型是 sds。

const (
    MaxLevel = 64 // 足以容納 2^64 個(gè)元素
    P = 0.25
)

type Node struct {
    elem string
    score float64
    backward *Node
    level []skipLevel
}

type skipLevel struct {
    // forward 每層都要有指向下一個(gè)節(jié)點(diǎn)的指針
    forward *Node
    // span 間隔定義為:從當(dāng)前節(jié)點(diǎn)到 forward 指向的下個(gè)節(jié)點(diǎn)之間間隔的節(jié)點(diǎn)數(shù)
    span int
}

type Skiplist struct {
    header, tail *Node
    level int // 記錄跳表的實(shí)際高度
    length int // 記錄跳表的長(zhǎng)度(不含頭節(jié)點(diǎn))
}

輔助方法

考慮到在實(shí)現(xiàn)時(shí),經(jīng)常需要比較 score 和 element,所以這里直接給 Node 實(shí)現(xiàn)了一些比較方法,便于使用。

func (node *Node) Compare(other *Node) int {
    if node.score < other.score || (node.score == other.score && node.elem < other.elem) {
        return -1
    } else if node.score > other.score || (node.score == other.score && node.elem > other.elem) {
        return 1
    } else {
        return 0
    }
}

func (node *Node) Lt(other *Node) bool {
    return node.Compare(other) < 0
}

func (node *Node) Lte(other *Node) bool {
    return node.Compare(other) <= 0
}

func (node *Node) Gt(other *Node) bool {
    return node.Compare(other) > 0
}

func (node *Node) Eq(other *Node) bool {
    return node.Compare(other) == 0
}

插入元素

// Insert 向跳表中插入一個(gè)新的元素。
// 步驟:
// 1. 查找插入位置
// 2. 創(chuàng)建新節(jié)點(diǎn),并在目標(biāo)位置插入節(jié)點(diǎn)
// 3. 調(diào)整跳表 backward 指針等
func (sl *Skiplist) Insert(score float64, elem string) *Node {
    var (
        // update 用于記錄每層待更新的節(jié)點(diǎn)
        update [MaxLevel]*Node
        // rank 用來(lái)記錄每層經(jīng)過(guò)的節(jié)點(diǎn)記錄(可以看成到頭節(jié)點(diǎn)的距離)
        rank [MaxLevel]int
        // 構(gòu)建一個(gè)新節(jié)點(diǎn),用于下面的大小判斷,其 level 在后面設(shè)置
        node = &Node{score: score, elem: elem}
    )
    cur := sl.header
    for i := sl.level - 1; i >= 0; i-- {
        if cur == sl.header {
            rank[i] = 0
        } else {
            rank[i] = rank[i+1]
        }
        // 與同層的后一個(gè)節(jié)點(diǎn)比較,如果后一個(gè)比目標(biāo)值小,則可以繼續(xù)向后
        // 否則下降到一層查找。注意這里的大小比較是按照 score 和
        // elem 綜合計(jì)算得到的。
        for cur.level[i].forward != nil && cur.level[i].forward.Lt(node) {
            rank[i] += cur.level[i].span
            // 同層繼續(xù)往后查找
            cur = cur.level[i].forward
        }
        update[i] = cur
    }
    // 調(diào)整跳表高度
    level := sl.randomLevel()
    if level > sl.level {
        // 初始化每層
        for i := level - 1; i >= sl.level; i-- {
            rank[i] = 0
            update[i] = sl.header
            update[i].level[i].span = sl.length
        }
        sl.level = level
    }
    // 更新節(jié)點(diǎn) level,并插入新節(jié)點(diǎn)
    node.setLevel(level)
    for i := 0; i < level; i++ {
        // 更新每層的節(jié)點(diǎn)指向
        node.level[i].forward = update[i].level[i].forward
        update[i].level[i].forward = node
        // 更新 span 信息
        node.level[i].span = update[i].level[i].span - (rank[0] - rank[i])
        update[i].level[i].span = (rank[0] - rank[i]) + 1
    }
    // 針對(duì)新增節(jié)點(diǎn) level < sl.level 的情況,需要更新上面沒(méi)有掃到的層 span
    for i := level; i < sl.level; i++ {
        update[i].level[i].span++
    }
    // 調(diào)整 backward 指針
    // 如果前一個(gè)節(jié)點(diǎn)是頭節(jié)點(diǎn),則 backward 為 nil
    // 否則 backward 指向之前節(jié)點(diǎn)
    if update[0] != sl.header {
        // update[0] 就是和新增節(jié)點(diǎn)相鄰的前一個(gè)節(jié)點(diǎn)
        node.backward = update[0]
    }
    // 如果新增節(jié)點(diǎn)是最后一個(gè),則需要更新 tail 指針
    if node.level[0].forward == nil {
        sl.tail = node
    } else {
        // 中間節(jié)點(diǎn),需要更新后一個(gè)節(jié)點(diǎn)的回退指針
        node.level[0].forward.backward = node
    }
    sl.length++
    return node
}

// randomLevel 對(duì)于新增節(jié)點(diǎn),返回一個(gè)隨機(jī)的 level
// 返回的 level 范圍為 [1, MaxLevel]。并且,采用的
// 算法會(huì)保證,更大的 level 返回的概率越低。
// 每個(gè) level 出現(xiàn)的概率計(jì)算:(1-p) * p^(level-1)
func (sl *Skiplist) randomLevel() int {
    level := 1
    for rand.Float64() < P && level < MaxLevel {
        level++
    }
    return level
}

刪除元素

// Delete 用于刪除跳表中指定的節(jié)點(diǎn)。
func (sl *Skiplist) Delete(score float64, elem string) *Node {
    // 第一步,找到需要?jiǎng)h除節(jié)點(diǎn)
    var (
        update [MaxLevel]*Node
        targetNode = &Node{elem: elem, score: score}
    )
    cur := sl.header
    for i := sl.level - 1; i >= 0; i-- {
        for cur.level[i].forward != nil && cur.level[i].forward.Lt(targetNode) {
            cur = cur.level[i].forward
        }
        update[i] = cur
    }
    // 目標(biāo)節(jié)點(diǎn)找到后,這里需要判斷下 elem 是否相等
    // score 可以重復(fù),所以必須要謹(jǐn)慎
    nodeToBeDeleted := update[0].level[0].forward
    if nodeToBeDeleted == nil || !nodeToBeDeleted.Eq(targetNode) {
        return nil
    }
    sl.deleteNode(update, nodeToBeDeleted)
    return nodeToBeDeleted
}

func (sl *Skiplist) deleteNode(update [64]*Node, nodeToBeDeleted *Node) {
    // 這時(shí)我們要?jiǎng)h除的節(jié)點(diǎn)就是 nodeToBeDeleted
    // 調(diào)整每層待更新節(jié)點(diǎn),修改 forward 指向
    for i := 0; i < sl.level; i++ {
        if update[i].level[i].forward == nodeToBeDeleted {
            update[i].level[i].forward = nodeToBeDeleted.level[i].forward
            update[i].level[i].span += nodeToBeDeleted.level[i].span - 1
        } else {
            update[i].level[i].span--
        }
    }
    // 調(diào)整回退指針:
    // 1. 如果被刪除的節(jié)點(diǎn)是最后一個(gè)節(jié)點(diǎn),需要更新 sl.tail
    // 2. 如果被刪除的節(jié)點(diǎn)位于中間,則直接更新后一個(gè)節(jié)點(diǎn) backward 即可
    if sl.tail == nodeToBeDeleted {
        sl.tail = nodeToBeDeleted.backward
    } else {
        nodeToBeDeleted.level[0].forward.backward = nodeToBeDeleted.backward
    }
    // 調(diào)整層數(shù)
    for sl.header.level[sl.level-1].forward == nil {
        sl.level--
    }
    // 減少節(jié)點(diǎn)計(jì)數(shù)
    sl.length--
    nodeToBeDeleted.backward = nil
    nodeToBeDeleted.level[0].forward = nil
}

更新分?jǐn)?shù)

// UpdateScore 用于更新節(jié)點(diǎn)的分?jǐn)?shù)。該函數(shù)會(huì)保證更新分?jǐn)?shù)后,
// 節(jié)點(diǎn)的有序性依然可以維持。
// 策略如下:
// 1. 快速判斷能否原節(jié)點(diǎn)修改,如果可以則直接修改并返回;
// 2. 采用更加昂貴的操作:刪除再添加。
func (sl *Skiplist) UpdateScore(curScore float64, elem string, newScore float64) *Node {
    var (
        update [MaxLevel]*Node
        targetNode = &Node{elem: elem, score: curScore}
    )
    cur := sl.header
    // 第一步,找到符合條件的目標(biāo)節(jié)點(diǎn)
    for i := sl.level - 1; i >= 0; i-- {
        for cur.level[i].forward != nil && cur.level[i].forward.Lt(targetNode) {
            cur = cur.level[i].forward
        }
        update[i] = cur
    }
    node := cur.level[0].forward
    if node == nil || !node.Eq(targetNode) {
        return nil
    }
    if sl.canUpdateScoreFor(node, newScore) {
        node.score = newScore
        return node
    } else {
        // 需要?jiǎng)h除舊節(jié)點(diǎn),增加新節(jié)點(diǎn)
        sl.deleteNode(update, node)
        return sl.Insert(newScore, node.elem)
    }
}

// canUpdateScoreFor 確定能否直接在原有的節(jié)點(diǎn)上進(jìn)行修改
// 什么條件才可以直接原地更新 score 呢?
// 1. node 是唯一一個(gè)數(shù)據(jù)節(jié)點(diǎn)(node.backward == NULL && node->level[0].forward == NULL)
// 2. node 是第一個(gè)數(shù)據(jù)節(jié)點(diǎn),且新的分?jǐn)?shù)要比 node 之后節(jié)點(diǎn)分?jǐn)?shù)要?。ㄟ@樣才能保證有序)
// 即:node.backward == NULL && node->level[0].forward->score > newScore)
// 3. node 是最后一個(gè)數(shù)據(jù)節(jié)點(diǎn),且 node 之前節(jié)點(diǎn)的分?jǐn)?shù)要比新改的分?jǐn)?shù)小
// 即:node->backward->score < newScore && node->level[0].forward == NULL
// 4. node 是修改的后的分?jǐn)?shù)恰好還能保證位于前一個(gè)和后一個(gè)節(jié)點(diǎn)分?jǐn)?shù)之間
// 即:node->backward->score < newscore && node->level[0].forward->score > newscore
func (sl *Skiplist) canUpdateScoreFor(node *Node, newScore float64) bool {
    if (node.backward == nil || node.backward.score < newScore) &&
        (node.level[0].forward == nil || node.level[0].forward.score > newScore) {
        return true
    }

    return false
}

總結(jié)

俗話說(shuō),「說(shuō)起來(lái)容易,做起來(lái)難」。在實(shí)現(xiàn)「跳表」的時(shí)候感受頗深,似乎看完 Redis 的「跳表」源碼和網(wǎng)上諸多前輩編寫(xiě)的文章后,自以為懂得了原理(可能確實(shí)懂了),但是在具體實(shí)現(xiàn)的時(shí)候還是踩了不少坑。比如,空指針引起 panic;i-- 寫(xiě)成了 i++ 導(dǎo)致查找失?。灰恍┻吔缜闆r的判斷等。總之,細(xì)節(jié)決定成敗,需要在保持思路清晰的同時(shí),更加謹(jǐn)慎一些才能寫(xiě)出足夠健壯的代碼來(lái)。當(dāng)然,這期間自然少不了單元測(cè)試的助攻,否則有很多問(wèn)題可能都沒(méi)法暴露出來(lái)~

參考

聲明

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容