golang筆記——map底層原理

Map是一種常用的kv數(shù)據(jù)結(jié)構(gòu),程序設(shè)計中經(jīng)常使用,且作為一種最基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu),很多編程語言本身提供的api都會有實現(xiàn),Go也不例外,今天我們將從一下三個方面為大家分析Go中的Map。

  1. 什么是Map
  2. go中Map的實現(xiàn)原理
  3. go中Map的擴容機制
  4. go中map的常見問題

一、什么是Map

map 是由 key-value 對組成的;key 只會出現(xiàn)一次。
和 map 相關(guān)的操作主要是:

  • 增加一個 k-v 對 —— Add or insert;
  • 刪除一個 k-v 對 —— Remove or delete;
  • 修改某個 k 對應(yīng)的 v —— Reassign;
  • 查詢某個 k 對應(yīng)的 v —— Lookup;

Go map 的形式就是鍵值對,給定一個鍵,能盡快的找到對應(yīng)的值。

  • 任何可比較的類型都可以是鍵——所有簡單的標量類型(布爾、整數(shù)、浮點、復數(shù)、字符串)、指針、通道、數(shù)組、接口。
  • 不可比較的類型——切片、映射、函數(shù)。

因此,映射鍵和值應(yīng)存儲在為映射分配的內(nèi)存中。這個過程我們使用的方法叫做哈希算法,哈希算法一般包括兩步,偽代碼如下:

  • hash = hashfunc(key)
  • index = hash % array_size

第一步,通過哈希算法計算鍵的哈希值,這個結(jié)果與桶的數(shù)量無關(guān)。而且這個計算出的哈希值一般是唯一的,避免出現(xiàn)兩個不同的鍵得到相同的哈希值。 第二步,通過執(zhí)行取模運算得到 0 到 array_size-1 之間的 index 序號。
hash沖突

如上圖所示,數(shù)組一個下標處只能存儲一個元素,也就是說一個數(shù)組下標只能存儲一對key,value, hashkey(xiaoming)=4占用了下標0的位置,假設(shè)我們遇到另一個key,hashkey(xiaowang)也是4,這就是hash沖突(不同的key經(jīng)過hash之后得到的值一樣),那么key=xiaowang的怎么存儲?

hash沖突的常見解決方法有兩種:

  1. 開放定址法:其中常見的又有線性探測法,線性補償探測法,隨機探測法。這里我們主要說一下線性探測法。
    線性探測法,字面意思就是按照順序來,從沖突的下標處開始往后探測,到達數(shù)組末尾時,從數(shù)組開始處探測,直到找到一個空位置存儲這個key,當數(shù)組都找不到的情況下會擴容(事實上當數(shù)組容量快滿的時候就會擴容了);查找某一個key的時候,找到key對應(yīng)的下標,比較key是否相等,如果相等直接取出來,否則按照順尋探測直到碰到一個空位置,說明key不存在。
    如下圖:首先存儲key=xiaoming在下標0處,當存儲key=xiaowang時,hash沖突了,按照線性探測,存儲在下標1處,(紅色的線是沖突或者下標已經(jīng)被占用了) 再者key=xiaozhao存儲在下標4處,當存儲key=xiaoliu是,hash沖突了,按照線性探測,從頭開始,存儲在下標2處 (黃色的是沖突或者下標已經(jīng)被占用了)

  2. 鏈表法:當key的hash沖突時,我們在沖突位置的元素上形成一個鏈表,通過指針互連接,當查找時,發(fā)現(xiàn)key沖突,順著鏈表一直往下找,直到鏈表的尾節(jié)點,找不到則返回空,如下圖:

開放定址(線性探測)和拉鏈的優(yōu)缺點:

  1. 由上面可以看出拉鏈法比線性探測處理簡單
  2. 線性探測查找比拉鏈法會更消耗時間
  3. 線性探測會更加容易導致擴容,而拉鏈不會
  4. 拉鏈存儲了指針,所以空間上會比線性探測占用多一點
  5. 拉鏈是動態(tài)申請存儲空間的,所以更適合鏈長不確定的

二、go中Map的實現(xiàn)原理

map的源碼位于 src/runtime/map.go 中 筆者go的版本是1.17在go中,map同樣也是數(shù)組存儲的的,每個數(shù)組下標處存儲的是一個bucket,這個bucket的類型見下面代碼,每個bucket中可以存儲8個kv鍵值對,當每個bucket存儲的kv對到達8個之后,會通過overflow指針指向一個新的bucket,從而形成一個鏈表。

首先我們來看一下map最重要的兩個結(jié)構(gòu):
hmap

// A header for a Go map.
type hmap struct {
    // Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
    // Make sure this stays in sync with the compiler's definition.
    count     int // 元素個數(shù),調(diào)用 len(map) 時,直接返回此值
    flags     uint8 //代表當前 map 的狀態(tài)(是否處于正在寫入的狀態(tài)等)
    B         uint8  // buckets 的對數(shù) log_2
    noverflow uint16 // 為 map 中溢出桶的數(shù)量。當溢出的桶太多時,map 會進行 same-size map growth,其實質(zhì)是避免桶過大導致內(nèi)存泄露
    hash0     uint32 // 代表生成 hash 的隨機數(shù)種子

    buckets    unsafe.Pointer // 指向 buckets 數(shù)組,大小為 2^B,如果元素個數(shù)為0,就為 nil
    oldbuckets unsafe.Pointer // 是在 map 擴容時存儲舊桶的,當所有舊桶中的數(shù)據(jù)都已經(jīng)轉(zhuǎn)移到了新桶中時,則清空
    nevacuate  uintptr        // 在擴容時使用,用于標記當前舊桶中小于 nevacuate 的數(shù)據(jù)都已經(jīng)轉(zhuǎn)移到了新桶中
    extra *mapextra // 存儲 map 中的溢出桶
}

Go 中桶 buckets 是 bmap 結(jié)構(gòu)。在運行時只列出了首個字段,即一個固定長度為 8 的數(shù)組。此字段順序存儲 key 的哈希值的前 8 位。
bmap(bucket桶)

type bmap struct {
    // tophash generally contains the top byte of the hash value
    // for each key in this bucket. If tophash[0] < minTopHash,
    // tophash[0] is a bucket evacuation state instead.
    tophash [bucketCnt]uint8
    // Followed by bucketCnt keys and then bucketCnt elems.
    // NOTE: packing all the keys together and then all the elems together makes the
    // code a bit more complicated than alternating key/elem/key/elem/... but it allows
    // us to eliminate padding which would be needed for, e.g., map[int64]int8.
    // Followed by an overflow pointer.
}

tophash 通常包含此 buckets 中每個鍵的哈希值的最高字節(jié)。 如果 tophash[0] < minTopHash,則 tophash[0] 是一個桶疏散狀態(tài)。

map 在編譯時即確定了 map 中 key、value 及桶的大小,因此在運行時僅僅通過指針操作就可以找到特定位置的元素。編譯期動態(tài)地創(chuàng)建一個新的結(jié)構(gòu):

type bmap struct {
    topbits  [8]uint8
    keys     [8]keytype
    values   [8]valuetype
    pad      uintptr
    overflow uintptr
}

bmap 就是我們常說的“桶”,桶里面會最多裝 8 個 key,這些 key 之所以會落入同一個桶,是因為它們經(jīng)過哈希計算后,哈希結(jié)果是“一類”的。在桶內(nèi),又會根據(jù) key 計算出來的 hash 值的高 8 位來決定 key 到底落入桶內(nèi)的哪個位置(一個桶內(nèi)最多有 8 個位置)。桶在存儲的 tophash 字段后,會存儲 key 數(shù)組和 value 數(shù)組。

hmap和buckets關(guān)系

hmap


bucket

由此看出hmap和bucket的關(guān)系是這樣的:

key定位過程

key 經(jīng)過哈希計算后得到哈希值,共 64 個 bit 位(64位機),計算它到底要落在哪個桶時,只會用到最后 B 個 bit 位。還記得前面提到過的 B 嗎?如果 B = 5,那么桶的數(shù)量,也就是 buckets 數(shù)組的長度是 2^5 = 32。

例如,現(xiàn)在有一個 key 經(jīng)過哈希函數(shù)計算后,得到的哈希結(jié)果是:

10010111 | 000011110110110010001111001010100010010110010101010 │ 01010

用最后的 5 個 bit 位: 01010,值為 10,也就是 10 號桶。這個操作本質(zhì)上就是取余操作,但是取余開銷太大,所以代碼實現(xiàn)上用的位操作代替。

再用哈希值的高 8 位,找到此 key 在 bucket 中的位置,這是在尋找已有的 key。最開始桶內(nèi)還沒有 key,新加入的 key 會找到第一個空位,然后放入。

buckets 編號就是桶編號,當兩個不同的 key 落在同一個桶中,也就是發(fā)生了哈希沖突。沖突的解決手段是用鏈表法:在 bucket 中,從前往后找到第一個空位。這樣,在查找某個 key 時,先找到對應(yīng)的桶,再去遍歷 bucket 中的 key。



上圖中,假定 B = 5,所以 bucket 總數(shù)就是 2^5 = 32。首先計算出待查找 key 的哈希,使用低 5 位 00110,找到對應(yīng)的 6 號 bucket,使用高 8 位 10010111,對應(yīng)十進制 151,在 6 號 bucket 中尋找 tophash 值(HOB hash)為 151 的 key,找到了 2 號槽位,這樣整個查找過程就結(jié)束了。

如果在 bucket 中沒找到,并且 overflow 不為空,還要繼續(xù)去 overflow bucket 中尋找,直到找到或是所有的 key 槽位都找遍了,包括所有的 overflow bucket。

查找某個 key 的底層函數(shù)是 mapacess 系列函數(shù),函數(shù)的作用類似,我們先以mapacess1函數(shù)為例:

func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
    // ......
    // 如果 *hmap大小為0,或地址為nil,返回零值
    if h == nil || h.count == 0 {
        if t.hashMightPanic() {
            t.hasher(key, 0) // see issue 23734
        }
        return unsafe.Pointer(&zeroVal[0])
    }
    // 讀寫沖突
    if h.flags&hashWriting != 0 {
        throw("concurrent map read and map write")
    }
    // 計算哈希值,并且加入 hash0 引入隨機性
    hash := t.hasher(key, uintptr(h.hash0))
    // 比如 B=5,那 m 就是31,二進制是全 1
    // 求 bucket num 時,將 hash 與 m 相與,
    // 達到 bucket num 由 hash 的低 8 位決定的效果
    m := bucketMask(h.B)
    // b 就是 bucket 的地址
    b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
    // oldbuckets 不為 nil,說明發(fā)生了擴容
    if c := h.oldbuckets; c != nil {
        // 如果不是同 size 擴容(看后面擴容的內(nèi)容)
        // 對應(yīng)條件 1 的解決方案
        if !h.sameSizeGrow() {
            // 新 bucket 數(shù)量是老的 2 倍
            m >>= 1
        }
        // 求出 key 在老的 map 中的 bucket 位置
        oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
        // 如果 oldb 沒有搬遷到新的 bucket
        // 那就在老的 bucket 中尋找
        if !evacuated(oldb) {
            b = oldb
        }
    }
    // 計算出高 8 位的 hash
    // 相當于右移 56 位,只取高8位
    top := tophash(hash)
bucketloop:
    // 遍歷 bucket 的 8 個位置
    // bucket 找完(還沒找到),繼續(xù)到 overflow bucket 里找
    for ; b != nil; b = b.overflow(t) {
        for i := uintptr(0); i < bucketCnt; i++ {
            // tophash 不匹配,繼續(xù)遍歷
            if b.tophash[i] != top {
                if b.tophash[i] == emptyRest {
                    break bucketloop
                }
                continue
            }
            // tophash 匹配,定位到 key 的位置
            k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
            // key 是指針
            if t.indirectkey() {
                // 解引用
                k = *((*unsafe.Pointer)(k))
            }
            // 如果 key 相等
            if t.key.equal(key, k) {
                // 定位到 value 的位置
                e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
                // value 是指針
                if t.indirectelem() {
                    // 解引用
                    e = *((*unsafe.Pointer)(e))
                }
                return e
            }
        }
    }
    // 返回零值
    return unsafe.Pointer(&zeroVal[0])
}

func tophash(hash uintptr) uint8 {
    top := uint8(hash >> (sys.PtrSize*8 - 8))
    if top < minTopHash {
        // 增加一個 minTopHash
        top += minTopHash
    }
    return top
}

函數(shù)返回 h[key] 的指針,如果 h 中沒有此 key,那就會返回一個 key 相應(yīng)類型的零值,不會返回 nil。

源碼中我們可以看到key定位公式為:

k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))

源碼中我們可以看到value定位公式為:

v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))

b 是 bmap 的地址,這里 bmap 還是源碼里定義的結(jié)構(gòu)體,只包含一個 tophash 數(shù)組,經(jīng)編譯器擴充之后的結(jié)構(gòu)體才包含 key,value,overflow 這些字段。 dataOffset 是 key 相對于 bmap 起始地址的偏移

dataOffset = unsafe.Offsetof(struct {
        b bmap
        v int64
    }{}.v)

因此 bucket 里 key 的起始地址就是 unsafe.Pointer(b)+dataOffset。第 i 個 key 的地址就要在此基礎(chǔ)上跨過 i 個 key 的大小;而我們又知道,value 的地址是在所有 key 之后,因此第 i 個 value 的地址還需要加上所有 key 的偏移。理解了這些,上面 key 和 value 的定位公式就很好理解了。

  • minTopHash
    當一個 cell 的 tophash 值小于 minTopHash 時,標志這個 cell 的遷移狀態(tài)。
    因為這個狀態(tài)值是放在 tophash 數(shù)組里,為了和正常的哈希值區(qū)分開,會給 key 計算出來的哈希值一個增量:minTopHash。這樣就能區(qū)分正常的 top hash 值和表示狀態(tài)的哈希值。

下面的這幾種狀態(tài)就表征了 bucket 的情況:

// 空的 cell,也是初始時 bucket 的狀態(tài)
empty          = 0
// 空的 cell,表示 cell 已經(jīng)被遷移到新的 bucket
evacuatedEmpty = 1
// key,value 已經(jīng)搬遷完畢,但是 key 都在新 bucket 前半部分,
// 后面擴容部分會再講到。
evacuatedX     = 2
// 同上,key 在后半部分
evacuatedY     = 3
// tophash 的最小正常值
minTopHash     = 4

源碼里判斷這個 bucket 是否已經(jīng)搬遷完畢,用到的函數(shù):

func evacuated(b *bmap) bool {
    h := b.tophash[0]
    return h > emptyOne && h < minTopHash
}

只取了 tophash 數(shù)組的第一個值,判斷它是否在 0-4 之間。對比上面的常量,當 top hash 是 evacuatedEmptyevacuatedX、evacuatedY 這三個值之一,說明此 bucket 中的 key 全部被搬遷到了新 bucket。

三、go中Map的擴容機制

使用哈希表的目的就是要快速查找到目標 key,然而,隨著向 map 中添加的 key 越來越多,key 發(fā)生碰撞的概率也越來越大。bucket 中的 8 個 cell 會被逐漸塞滿,查找、插入、刪除 key 的效率也會越來越低。最理想的情況是一個 bucket 只裝一個 key,這樣,就能達到 O(1) 的效率,但這樣空間消耗太大,用空間換時間的代價太高。

Go 語言采用一個 bucket 里裝載 8 個 key,定位到某個 bucket 后,還需要再定位到具體的 key,這實際上又用了時間換空間。

當然,這樣做,要有一個度,不然所有的 key 都落在了同一個 bucket 里,直接退化成了鏈表,各種操作的效率直接降為 O(n),是不行的。

因此,需要有一個指標來衡量前面描述的情況,這就是裝載因子。Go 源碼里這樣定義 裝載因子:

loadFactor := count / (2^B)

count 就是 map 的元素個數(shù),2^B 表示 bucket 數(shù)量。

再來說觸發(fā) map 擴容的時機:在向 map 插入新 key 的時候,會進行條件檢測,符合下面這 2 個條件,就會觸發(fā)擴容:

  1. 裝載因子超過閾值,源碼里定義的閾值是 6.5(元素數(shù)量>>bucket數(shù)量)。
  2. 頻繁增刪map,導致未被使用的overflow 的 bucket 數(shù)量過多:當 B 小于 15,也就是 bucket 總數(shù) 2^B 小于 2^15 時,如果 overflow 的 bucket 數(shù)量超過 2^B(未用于存儲的bucket數(shù)量過多),就會觸發(fā)擴容;當 B >= 15,也就是 bucket 總數(shù) 2^B 大于等于 2^15,如果 overflow 的 bucket 數(shù)量超過 2^15,就會觸發(fā)擴容。

通過匯編語言可以找到賦值操作對應(yīng)源碼中的函數(shù)是 mapassign,對應(yīng)擴容條件的源碼如下:

// src/runtime/hashmap.go/mapassign

// 觸發(fā)擴容時機
if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
        hashGrow(t, h)
    }

// 裝載因子超過 6.5
func overLoadFactor(count int64, B uint8) bool {
    return count >= bucketCnt && float32(count) >= loadFactor*float32((uint64(1)<<B))
}

// overflow buckets 太多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
    if B < 16 {
        return noverflow >= uint16(1)<<B
    }
    return noverflow >= 1<<15
}

解釋一下:
第 1 點:我們知道,每個 bucket 有 8 個空位,在沒有溢出,且所有的桶都裝滿了的情況下,裝載因子算出來的結(jié)果是 8。因此當裝載因子超過 6.5 時,表明很多 bucket 都快要裝滿了,查找效率和插入效率都變低了。在這個時候進行擴容是有必要的。
第 2 點:是對第 1 點的補充。就是說在裝載因子比較小的情況下,這時候 map 的查找和插入效率也很低,而第 1 點識別不出來這種情況。表面現(xiàn)象就是計算裝載因子的分子比較小,即 map 里元素總數(shù)少,但是 bucket 數(shù)量多(真實分配的 bucket 數(shù)量多,包括大量的 overflow bucket)。

不難想像造成這種情況的原因:不停地插入、刪除元素。先插入很多元素,導致創(chuàng)建了很多 bucket,但是裝載因子達不到第 1 點的臨界值,未觸發(fā)擴容來緩解這種情況。之后,刪除元素降低元素總數(shù)量,再插入很多元素,導致創(chuàng)建很多的 overflow bucket,但就是不會觸犯第 1 點的規(guī)定,你能拿我怎么辦?overflow bucket 數(shù)量太多,導致 key 會很分散,查找插入效率低得嚇人,因此出臺第 2 點規(guī)定。這就像是一座空城,房子很多,但是住戶很少,都分散了,找起人來很困難。

對于命中條件 1,2 的限制,都會發(fā)生擴容。但是擴容的策略并不相同,畢竟兩種條件應(yīng)對的場景不同。

對于條件 1,元素太多,而 bucket 數(shù)量太少,很簡單:將 B 加 1,bucket 最大數(shù)量(2^B)直接變成原來 bucket 數(shù)量的 2 倍。于是,就有新老 bucket 了。注意,這時候元素都在老 bucket 里,還沒遷移到新的 bucket 來。而且,新 bucket 只是最大數(shù)量變?yōu)樵瓉碜畲髷?shù)量(2^B)的 2 倍(2^B * 2)。

對于條件 2,其實元素沒那么多,但是 overflow bucket 數(shù)特別多,說明很多 bucket 都沒裝滿。解決辦法就是開辟一個新 bucket 空間,將老 bucket 中的元素移動到新 bucket,使得同一個 bucket 中的 key 排列地更緊密。這樣,原來,在 overflow bucket 中的 key 可以移動到 bucket 中來。結(jié)果是節(jié)省空間,提高 bucket 利用率,map 的查找和插入效率自然就會提升。

對于條件 2 的解決方案,曹大的博客里還提出了一個極端的情況:如果插入 map 的 key 哈希都一樣,就會落到同一個 bucket 里,超過 8 個就會產(chǎn)生 overflow bucket,結(jié)果也會造成 overflow bucket 數(shù)過多。移動元素其實解決不了問題,因為這時整個哈希表已經(jīng)退化成了一個鏈表,操作效率變成了 O(n)。

再來看一下擴容具體是怎么做的。由于 map 擴容需要將原有的 key/value 重新搬遷到新的內(nèi)存地址,如果有大量的 key/value 需要搬遷,會非常影響性能。因此 Go map 的擴容采取了一種稱為“漸進式”地方式,原有的 key 并不會一次性搬遷完畢,每次最多只會搬遷 2 個 bucket。

上面說的 hashGrow() 函數(shù)實際上并沒有真正地“搬遷”,它只是分配好了新的 buckets,并將老的 buckets 掛到了 oldbuckets 字段上。真正搬遷 buckets 的動作在 growWork() 函數(shù)中,而調(diào)用 growWork() 函數(shù)的動作是在 mapassign 和 mapdelete 函數(shù)中。也就是插入或修改、刪除 key 的時候,都會嘗試進行搬遷 buckets 的工作。先檢查 oldbuckets 是否搬遷完畢,具體來說就是檢查 oldbuckets 是否為 nil。

我們先看 hashGrow() 函數(shù)所做的工作,再來看具體的搬遷 buckets 是如何進行的

func hashGrow(t *maptype, h *hmap) {
    // B+1 相當于是原來 2 倍的空間
    bigger := uint8(1)

    // 對應(yīng)條件 2
    if !overLoadFactor(int64(h.count), h.B) {
        // 進行等量的內(nèi)存擴容,所以 B 不變
        bigger = 0
        h.flags |= sameSizeGrow
    }
    // 將老 buckets 掛到 buckets 上
    oldbuckets := h.buckets
    // 申請新的 buckets 空間
    newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)

    // x = 01010011
    // y = 01010100
    // z = x &^ y = 00000011
    // 如果 y bit 位為 1,那么結(jié)果 z 對應(yīng) bit 位就為 0,否則 z 對應(yīng) bit 位就和 x 對應(yīng) bit 位的值相同。
    // 
    // 先把 h.flags 中 iterator 和 oldIterator 對應(yīng)位清 0,然后如果發(fā)現(xiàn) iterator 位為 1,
    // 那就把它轉(zhuǎn)接到 oldIterator 位,使得 oldIterator 標志位變成 1。
    // 潛臺詞就是:buckets 現(xiàn)在掛到了 oldBuckets 名下了,對應(yīng)的標志位也轉(zhuǎn)接過去吧。
    flags := h.flags &^ (iterator | oldIterator)
    if h.flags&iterator != 0 {
        flags |= oldIterator
    }
    // 提交 grow 的動作
    h.B += bigger
    h.flags = flags
    h.oldbuckets = oldbuckets
    h.buckets = newbuckets
    // 搬遷進度為 0
    h.nevacuate = 0
    // overflow buckets 數(shù)為 0
    h.noverflow = 0

    // ……
}

主要是申請到了新的 buckets 空間,把相關(guān)的標志位都進行了處理:例如標志 nevacuate 被置為 0, 表示當前搬遷進度為 0。

再來看看真正執(zhí)行搬遷工作的 growWork() 函數(shù)。

func growWork(t *maptype, h *hmap, bucket uintptr) {
    // 確認搬遷老的 bucket 對應(yīng)正在使用的 bucket
    // 為了確認搬遷的 bucket 是我們正在使用的 bucket。
    // oldbucketmask() 函數(shù)返回擴容前的 map 的 bucketmask
    evacuate(t, h, bucket&h.oldbucketmask())

    // 再搬遷一個 bucket,以加快搬遷進程
    if h.growing() {
        evacuate(t, h, h.nevacuate)
    }
}

h.growing() 函數(shù)非常簡單:

func (h *hmap) growing() bool {
    return h.oldbuckets != nil
}

如果 oldbuckets 不為空,說明還沒有搬遷完畢,還得繼續(xù)搬。

所謂的 bucketmask,作用就是將 key 計算出來的哈希值與 bucketmask 相與,得到的結(jié)果就是 key 應(yīng)該落入的桶。比如 B = 5,那么 bucketmask 的低 5 位是 11111,其余位是 0,hash 值與其相與的意思是,只有 hash 值的低 5 位決策 key 到底落入哪個 bucket。

接下來,我們集中所有的精力在搬遷的關(guān)鍵函數(shù) evacuate。

func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
    // 定位老的 bucket 地址
    b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
    // 結(jié)果是 2^B,如 B = 5,結(jié)果為32
    newbit := h.noldbuckets()
    // key 的哈希函數(shù)
    alg := t.key.alg
    // 如果 b 沒有被搬遷過
    if !evacuated(b) {
        var (
            // 表示bucket 移動的目標地址
            x, y   *bmap
            // 指向 x,y 中的 key/val
            xi, yi int
            // 指向 x,y 中的 key
            xk, yk unsafe.Pointer
            // 指向 x,y 中的 value
            xv, yv unsafe.Pointer
        )
        // 默認是等 size 擴容,前后 bucket 序號不變
        // 使用 x 來進行搬遷
        x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
        xi = 0
        xk = add(unsafe.Pointer(x), dataOffset)
        xv = add(xk, bucketCnt*uintptr(t.keysize))、

        // 如果不是等 size 擴容,前后 bucket 序號有變
        // 使用 y 來進行搬遷
        if !h.sameSizeGrow() {
            // y 代表的 bucket 序號增加了 2^B
            y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
            yi = 0
            yk = add(unsafe.Pointer(y), dataOffset)
            yv = add(yk, bucketCnt*uintptr(t.keysize))
        }

        // 遍歷所有的 bucket,包括 overflow buckets
        // b 是老的 bucket 地址
        for ; b != nil; b = b.overflow(t) {
            k := add(unsafe.Pointer(b), dataOffset)
            v := add(k, bucketCnt*uintptr(t.keysize))

            // 遍歷 bucket 中的所有 cell
            for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
                // 當前 cell 的 top hash 值
                top := b.tophash[i]
                // 如果 cell 為空,即沒有 key
                if top == empty {
                    // 那就標志它被"搬遷"過
                    b.tophash[i] = evacuatedEmpty
                    // 繼續(xù)下個 cell
                    continue
                }
                // 正常不會出現(xiàn)這種情況
                // 未被搬遷的 cell 只可能是 empty 或是
                // 正常的 top hash(大于 minTopHash)
                if top < minTopHash {
                    throw("bad map state")
                }

                k2 := k
                // 如果 key 是指針,則解引用
                if t.indirectkey {
                    k2 = *((*unsafe.Pointer)(k2))
                }

                // 默認使用 X,等量擴容
                useX := true
                // 如果不是等量擴容
                if !h.sameSizeGrow() {
                    // 計算 hash 值,和 key 第一次寫入時一樣
                    hash := alg.hash(k2, uintptr(h.hash0))

                    // 如果有協(xié)程正在遍歷 map
                    if h.flags&iterator != 0 {
                        // 如果出現(xiàn) 相同的 key 值,算出來的 hash 值不同
                        if !t.reflexivekey && !alg.equal(k2, k2) {
                            // 只有在 float 變量的 NaN() 情況下會出現(xiàn)
                            if top&1 != 0 {
                                // 第 B 位置 1
                                hash |= newbit
                            } else {
                                // 第 B 位置 0
                                hash &^= newbit
                            }
                            // 取高 8 位作為 top hash 值
                            top = uint8(hash >> (sys.PtrSize*8 - 8))
                            if top < minTopHash {
                                top += minTopHash
                            }
                        }
                    }

                    // 取決于新哈希值的 oldB+1 位是 0 還是 1
                    // 詳細看后面的文章
                    useX = hash&newbit == 0
                }

                // 如果 key 搬到 X 部分
                if useX {
                    // 標志老的 cell 的 top hash 值,表示搬移到 X 部分
                    b.tophash[i] = evacuatedX
                    // 如果 xi 等于 8,說明要溢出了
                    if xi == bucketCnt {
                        // 新建一個 bucket
                        newx := h.newoverflow(t, x)
                        x = newx
                        // xi 從 0 開始計數(shù)
                        xi = 0
                        // xk 表示 key 要移動到的位置
                        xk = add(unsafe.Pointer(x), dataOffset)
                        // xv 表示 value 要移動到的位置
                        xv = add(xk, bucketCnt*uintptr(t.keysize))
                    }
                    // 設(shè)置 top hash 值
                    x.tophash[xi] = top
                    // key 是指針
                    if t.indirectkey {
                        // 將原 key(是指針)復制到新位置
                        *(*unsafe.Pointer)(xk) = k2 // copy pointer
                    } else {
                        // 將原 key(是值)復制到新位置
                        typedmemmove(t.key, xk, k) // copy value
                    }
                    // value 是指針,操作同 key
                    if t.indirectvalue {
                        *(*unsafe.Pointer)(xv) = *(*unsafe.Pointer)(v)
                    } else {
                        typedmemmove(t.elem, xv, v)
                    }

                    // 定位到下一個 cell
                    xi++
                    xk = add(xk, uintptr(t.keysize))
                    xv = add(xv, uintptr(t.valuesize))
                } else { // key 搬到 Y 部分,操作同 X 部分
                    // ……
                    // 省略了這部分,操作和 X 部分相同
                }
            }
        }
        // 如果沒有協(xié)程在使用老的 buckets,就把老 buckets 清除掉,幫助gc
        if h.flags&oldIterator == 0 {
            b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
            // 只清除bucket 的 key,value 部分,保留 top hash 部分,指示搬遷狀態(tài)
            if t.bucket.kind&kindNoPointers == 0 {
                memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            } else {
                memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
            }
        }
    }

    // 更新搬遷進度
    // 如果此次搬遷的 bucket 等于當前進度
    if oldbucket == h.nevacuate {
        // 進度加 1
        h.nevacuate = oldbucket + 1
        // Experiments suggest that 1024 is overkill by at least an order of magnitude.
        // Put it in there as a safeguard anyway, to ensure O(1) behavior.
        // 嘗試往后看 1024 個 bucket
        stop := h.nevacuate + 1024
        if stop > newbit {
            stop = newbit
        }
        // 尋找沒有搬遷的 bucket
        for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
            h.nevacuate++
        }
        
        // 現(xiàn)在 h.nevacuate 之前的 bucket 都被搬遷完畢
        
        // 所有的 buckets 搬遷完畢
        if h.nevacuate == newbit {
            // 清除老的 buckets
            h.oldbuckets = nil
            // 清除老的 overflow bucket
            // 回憶一下:[0] 表示當前 overflow bucket
            // [1] 表示 old overflow bucket
            if h.extra != nil {
                h.extra.overflow[1] = nil
            }
            // 清除正在擴容的標志位
            h.flags &^= sameSizeGrow
        }
    }
}

搬遷的目的就是將老的 buckets 搬遷到新的 buckets。而通過前面的說明我們知道,應(yīng)對條件 1,新的 buckets 數(shù)量是之前的一倍,應(yīng)對條件 2,新的 buckets 數(shù)量和之前相等。

對于條件 2,從老的 buckets 搬遷到新的 buckets,由于 bucktes 數(shù)量不變,因此可以按序號來搬,比如原來在 0 號 bucktes,到新的地方后,仍然放在 0 號 buckets。

對于條件 1,就沒這么簡單了。要重新計算 key 的哈希,才能決定它到底落在哪個 bucket。例如,原來 B = 5,計算出 key 的哈希后,只用看它的低 5 位,就能決定它落在哪個 bucket。擴容后,B 變成了 6,因此需要多看一位,它的低 6 位決定 key 落在哪個 bucket。這稱為 rehash。

因此,某個 key 在搬遷前后 bucket 序號可能和原來相等,也可能是相比原來加上 2^B(原來的 B 值),取決于 hash 值 第 6 bit 位是 0 還是 1。

再明確一個問題:如果擴容后,B 增加了 1,意味著 buckets 總數(shù)是原來的 2 倍,原來 1 號的桶“裂變”到兩個桶。

例如,原始 B = 2,1號 bucket 中有 2 個 key 的哈希值低 3 位分別為:010,110。由于原來 B = 2,所以低 2 位 10 決定它們落在 2 號桶,現(xiàn)在 B 變成 3,所以 010、110 分別落入 2、6 號桶。

搬遷函數(shù)中的幾個關(guān)鍵點:

  1. 雙層循環(huán):外層:遍歷 bucket 和 overflow bucket, 內(nèi)層:遍歷 bucket 的所有 cell
  2. X, Y part,其實就是我們說的如果是擴容到原來的 2 倍,桶的數(shù)量是原來的 2 倍,前一半桶被稱為 X part,后一半桶被稱為 Y part。一個 bucket 中的 key 可能會分裂落到 2 個桶,一個位于 X part,一個位于 Y part(取決于擴容后,倒數(shù)第B位是 0 還是 1)。所以在搬遷一個 cell 之前,需要知道這個 cell 中的 key 是落到哪個 Part

有一種 key,每次對它計算 hash,得到的結(jié)果都不一樣。這個 key 就是 math.NaN() 的結(jié)果,它的含義是 not a number,類型是 float64。

當它作為 map 的 key,在搬遷的時候,會遇到一個問題:再次計算它的哈希值和它當初插入 map 時的計算出來的哈希值不一樣!這就導致了這個 key 是永遠不會被 Get 操作獲取的!當我使用 m[math.NaN()] 語句的時候,是查不出來結(jié)果的。這個 key 只有在遍歷整個 map 的時候,才有機會現(xiàn)身。所以,可以向一個 map 插入任意數(shù)量的 math.NaN() 作為 key。

當搬遷碰到 math.NaN() 的 key 時,只通過 tophash 的最低位決定分配到 X part 還是 Y part(如果擴容后是原來 buckets 數(shù)量的 2 倍)。如果 tophash 的最低位是 0 ,分配到 X part;如果是 1 ,則分配到 Y part。

確定了要搬遷到的目標 bucket 后,搬遷操作就比較好進行了。將源 key/value 值 copy 到目的地相應(yīng)的位置。

設(shè)置 key 在原始 buckets 的 tophash 為 evacuatedX 或是 evacuatedY,表示已經(jīng)搬遷到了新 map 的 x part 或是 y part。新 map 的 tophash 則正常取 key 哈希值的高 8 位。

擴容前,B = 2,共有 4 個 buckets,lowbits 表示 hash 值的低位。假設(shè)我們不關(guān)注其他 buckets 情況,專注在 2 號 bucket。并且假設(shè) overflow 太多,觸發(fā)了等量擴容(對應(yīng)于前面的條件 2)。



擴容完成后,overflow bucket 消失了,key 都集中到了一個 bucket,更為緊湊了,提高了查找的效率。



假設(shè)觸發(fā)了 2 倍的擴容,那么擴容完成后,老 buckets 中的 key 分裂到了 2 個 新的 bucket。一個在 x part,一個在 y 的 part。依據(jù)是 hash 的 lowbits。新 map 中 0-3 稱為 x part,4-7 稱為 y part。

注意,上面的兩張圖忽略了其他 buckets 的搬遷情況,表示所有的 bucket 都搬遷完畢后的情形。實際上,我們知道,搬遷是一個“漸進”的過程,并不會一下子就全部搬遷完畢。所以在搬遷過程中,oldbuckets 指針還會指向原來老的 []bmap,并且已經(jīng)搬遷完畢的 key 的 tophash 值會是一個狀態(tài)值,表示 key 的搬遷去向。

四、go中map的常見問題

1、Key為什么是無序的?
map 在擴容后,會發(fā)生 key 的搬遷,原來落在同一個 bucket 中的 key,搬遷后,有些 key 就要遠走高飛了(bucket 序號加上了 2^B)。而遍歷的過程,就是按順序遍歷 bucket,同時按順序遍歷 bucket 中的 key。搬遷后,key 的位置發(fā)生了重大的變化,有些 key 飛上高枝,有些 key 則原地不動。這樣,遍歷 map 的結(jié)果就不可能按原來的順序了。

當我們在遍歷 go 中的 map 時,并不是固定地從 0 號 bucket 開始遍歷,每次都是從一個隨機值序號的 bucket 開始遍歷,并且是從這個 bucket 的一個隨機序號的 cell 開始遍歷。這樣,即使你是一個寫死的 map,僅僅只是遍歷它,也不太可能會返回一個固定序列的 key/value 對了。

2、float類型是否可以作為map的key?
從語法上看,是可以的。Go 語言中只要是可比較的類型都可以作為 key。除開 slice,map,functions 這幾種類型,其他類型都是 OK 的。具體包括:布爾值、數(shù)字、字符串、指針、通道、接口類型、結(jié)構(gòu)體、只包含上述類型的數(shù)組。這些類型的共同特征是支持 == 和 != 操作符,k1 == k2 時,可認為 k1 和 k2 是同一個 key。如果是結(jié)構(gòu)體,只有 hash 后的值相等以及字面值相等,才被認為是相同的 key。很多字面值相等的,hash出來的值不一定相等,比如引用。

float 型可以作為 key,但是由于精度的問題,會導致一些詭異的問題,慎用之。

3、map可以遍歷的同時刪除嗎?
map 并不是一個線程安全的數(shù)據(jù)結(jié)構(gòu)。同時讀寫一個 map 是未定義的行為,如果被檢測到,會直接 panic。

上面說的是發(fā)生在多個協(xié)程同時讀寫同一個 map 的情況下。 如果在同一個協(xié)程內(nèi)邊遍歷邊刪除,并不會檢測到同時讀寫,理論上是可以這樣做的。但是,遍歷的結(jié)果就可能不會是相同的了,有可能結(jié)果遍歷結(jié)果集中包含了刪除的 key,也有可能不包含,這取決于刪除 key 的時間:是在遍歷到 key 所在的 bucket 時刻前或者后。

一般而言,這可以通過讀寫鎖來解決:sync.RWMutex。

讀之前調(diào)用 RLock() 函數(shù),讀完之后調(diào)用 RUnlock() 函數(shù)解鎖;寫之前調(diào)用 Lock() 函數(shù),寫完之后,調(diào)用 Unlock() 解鎖。

另外,sync.Map 是線程安全的 map,也可以使用。

4、可以對map元素取地址嗎?
無法對 map 的 key 或 value 進行取址,將無法通過編譯

如果通過其他 hack 的方式,例如 unsafe.Pointer 等獲取到了 key 或 value 的地址,也不能長期持有,因為一旦發(fā)生擴容,key 和 value 的位置就會改變,之前保存的地址也就失效了。

5、如何比較兩個map是否相等?
map 深度相等的條件:

  1. 都為 nil
  2. 非空、長度相等,指向同一個 map 實體對象
  3. 相應(yīng)的 key 指向的 value “深度”相等
    直接將使用 map1 == map2 是錯誤的。這種寫法只能比較 map 是否為 nil。
    因此只能是遍歷map 的每個元素,比較元素是否都是深度相等。

6、map是線程安全的嗎?
map 不是線程安全的。

在查找、賦值、遍歷、刪除的過程中都會檢測寫標志,一旦發(fā)現(xiàn)寫標志置位(等于1),則直接 panic。賦值和刪除函數(shù)在檢測完寫標志是復位之后,先將寫標志位置位,才會進行之后的操作。

檢測寫標志:

if h.flags&hashWriting == 0 {
        throw("concurrent map writes")
    }

設(shè)置寫標志:

h.flags |= hashWriting

References:
https://xie.infoq.cn/article/5607679316cdfa7284bfa652a
https://golang.design/go-questions/map/principal/
https://topgoer.com/go%E5%9F%BA%E7%A1%80/Map%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.html
https://cloud.tencent.com/developer/article/1468799
https://www.cnblogs.com/dawnlight/p/15552513.html

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • 由于本文篇幅較長,故將目錄整理如下 什么是Map 維基百科的定義 In computer science, an ...
    機器鈴砍菜刀s閱讀 232評論 0 0
  • map 的設(shè)計也被稱為 “The dictionary problem”,它的任務(wù)是設(shè)計一種數(shù)據(jù)結(jié)構(gòu)用來維護一個集...
    夜空中乄最亮的星閱讀 1,794評論 0 0
  • 映射是一個集合,可以使用類似處理數(shù)組和切片的方式迭代映射中的元素。但映射是無序的集合,意味著沒有辦法預測鍵值對被返...
    stevenyeahnet閱讀 800評論 0 0
  • 1. 底層原理 hmap Go中的map是一個指針,占用8個字節(jié),指向底層的hmap結(jié)構(gòu)體(hash表),在源碼包...
    husky_1閱讀 1,646評論 2 3
  • 本文目錄如下,閱讀本文后,將一網(wǎng)打盡下面Golang Map相關(guān)面試題 面試題 map的底層實現(xiàn)原理 為什么遍歷m...
    Go程序員閱讀 1,316評論 0 3

友情鏈接更多精彩內(nèi)容