Map是一種常用的kv數(shù)據(jù)結(jié)構(gòu),程序設(shè)計中經(jīng)常使用,且作為一種最基礎(chǔ)的數(shù)據(jù)結(jié)構(gòu),很多編程語言本身提供的api都會有實現(xiàn),Go也不例外,今天我們將從一下三個方面為大家分析Go中的Map。
- 什么是Map
- go中Map的實現(xiàn)原理
- go中Map的擴容機制
- go中map的常見問題
一、什么是Map
map 是由 key-value 對組成的;key 只會出現(xiàn)一次。
和 map 相關(guān)的操作主要是:
- 增加一個 k-v 對 —— Add or insert;
- 刪除一個 k-v 對 —— Remove or delete;
- 修改某個 k 對應(yīng)的 v —— Reassign;
- 查詢某個 k 對應(yīng)的 v —— Lookup;
Go map 的形式就是鍵值對,給定一個鍵,能盡快的找到對應(yīng)的值。
- 任何可比較的類型都可以是鍵——所有簡單的標量類型(布爾、整數(shù)、浮點、復數(shù)、字符串)、指針、通道、數(shù)組、接口。
- 不可比較的類型——切片、映射、函數(shù)。
因此,映射鍵和值應(yīng)存儲在為映射分配的內(nèi)存中。這個過程我們使用的方法叫做哈希算法,哈希算法一般包括兩步,偽代碼如下:
- hash = hashfunc(key)
- index = hash % array_size

hash沖突
如上圖所示,數(shù)組一個下標處只能存儲一個元素,也就是說一個數(shù)組下標只能存儲一對key,value, hashkey(xiaoming)=4占用了下標0的位置,假設(shè)我們遇到另一個key,hashkey(xiaowang)也是4,這就是hash沖突(不同的key經(jīng)過hash之后得到的值一樣),那么key=xiaowang的怎么存儲?
hash沖突的常見解決方法有兩種:
-
開放定址法:其中常見的又有線性探測法,線性補償探測法,隨機探測法。這里我們主要說一下線性探測法。
線性探測法,字面意思就是按照順序來,從沖突的下標處開始往后探測,到達數(shù)組末尾時,從數(shù)組開始處探測,直到找到一個空位置存儲這個key,當數(shù)組都找不到的情況下會擴容(事實上當數(shù)組容量快滿的時候就會擴容了);查找某一個key的時候,找到key對應(yīng)的下標,比較key是否相等,如果相等直接取出來,否則按照順尋探測直到碰到一個空位置,說明key不存在。
如下圖:首先存儲key=xiaoming在下標0處,當存儲key=xiaowang時,hash沖突了,按照線性探測,存儲在下標1處,(紅色的線是沖突或者下標已經(jīng)被占用了) 再者key=xiaozhao存儲在下標4處,當存儲key=xiaoliu是,hash沖突了,按照線性探測,從頭開始,存儲在下標2處 (黃色的是沖突或者下標已經(jīng)被占用了)
-
鏈表法:當key的hash沖突時,我們在沖突位置的元素上形成一個鏈表,通過指針互連接,當查找時,發(fā)現(xiàn)key沖突,順著鏈表一直往下找,直到鏈表的尾節(jié)點,找不到則返回空,如下圖:
開放定址(線性探測)和拉鏈的優(yōu)缺點:
- 由上面可以看出拉鏈法比線性探測處理簡單
- 線性探測查找比拉鏈法會更消耗時間
- 線性探測會更加容易導致擴容,而拉鏈不會
- 拉鏈存儲了指針,所以空間上會比線性探測占用多一點
- 拉鏈是動態(tài)申請存儲空間的,所以更適合鏈長不確定的
二、go中Map的實現(xiàn)原理
map的源碼位于 src/runtime/map.go 中 筆者go的版本是1.17在go中,map同樣也是數(shù)組存儲的的,每個數(shù)組下標處存儲的是一個bucket,這個bucket的類型見下面代碼,每個bucket中可以存儲8個kv鍵值對,當每個bucket存儲的kv對到達8個之后,會通過overflow指針指向一個新的bucket,從而形成一個鏈表。
首先我們來看一下map最重要的兩個結(jié)構(gòu):
hmap:
// A header for a Go map.
type hmap struct {
// Note: the format of the hmap is also encoded in cmd/compile/internal/reflectdata/reflect.go.
// Make sure this stays in sync with the compiler's definition.
count int // 元素個數(shù),調(diào)用 len(map) 時,直接返回此值
flags uint8 //代表當前 map 的狀態(tài)(是否處于正在寫入的狀態(tài)等)
B uint8 // buckets 的對數(shù) log_2
noverflow uint16 // 為 map 中溢出桶的數(shù)量。當溢出的桶太多時,map 會進行 same-size map growth,其實質(zhì)是避免桶過大導致內(nèi)存泄露
hash0 uint32 // 代表生成 hash 的隨機數(shù)種子
buckets unsafe.Pointer // 指向 buckets 數(shù)組,大小為 2^B,如果元素個數(shù)為0,就為 nil
oldbuckets unsafe.Pointer // 是在 map 擴容時存儲舊桶的,當所有舊桶中的數(shù)據(jù)都已經(jīng)轉(zhuǎn)移到了新桶中時,則清空
nevacuate uintptr // 在擴容時使用,用于標記當前舊桶中小于 nevacuate 的數(shù)據(jù)都已經(jīng)轉(zhuǎn)移到了新桶中
extra *mapextra // 存儲 map 中的溢出桶
}
Go 中桶 buckets 是 bmap 結(jié)構(gòu)。在運行時只列出了首個字段,即一個固定長度為 8 的數(shù)組。此字段順序存儲 key 的哈希值的前 8 位。
bmap(bucket桶):
type bmap struct {
// tophash generally contains the top byte of the hash value
// for each key in this bucket. If tophash[0] < minTopHash,
// tophash[0] is a bucket evacuation state instead.
tophash [bucketCnt]uint8
// Followed by bucketCnt keys and then bucketCnt elems.
// NOTE: packing all the keys together and then all the elems together makes the
// code a bit more complicated than alternating key/elem/key/elem/... but it allows
// us to eliminate padding which would be needed for, e.g., map[int64]int8.
// Followed by an overflow pointer.
}
tophash 通常包含此 buckets 中每個鍵的哈希值的最高字節(jié)。 如果 tophash[0] < minTopHash,則 tophash[0] 是一個桶疏散狀態(tài)。
map 在編譯時即確定了 map 中 key、value 及桶的大小,因此在運行時僅僅通過指針操作就可以找到特定位置的元素。編譯期動態(tài)地創(chuàng)建一個新的結(jié)構(gòu):
type bmap struct {
topbits [8]uint8
keys [8]keytype
values [8]valuetype
pad uintptr
overflow uintptr
}
bmap 就是我們常說的“桶”,桶里面會最多裝 8 個 key,這些 key 之所以會落入同一個桶,是因為它們經(jīng)過哈希計算后,哈希結(jié)果是“一類”的。在桶內(nèi),又會根據(jù) key 計算出來的 hash 值的高 8 位來決定 key 到底落入桶內(nèi)的哪個位置(一個桶內(nèi)最多有 8 個位置)。桶在存儲的 tophash 字段后,會存儲 key 數(shù)組和 value 數(shù)組。
hmap和buckets關(guān)系
hmap:

bucket:

由此看出hmap和bucket的關(guān)系是這樣的:

key定位過程
key 經(jīng)過哈希計算后得到哈希值,共 64 個 bit 位(64位機),計算它到底要落在哪個桶時,只會用到最后 B 個 bit 位。還記得前面提到過的 B 嗎?如果 B = 5,那么桶的數(shù)量,也就是 buckets 數(shù)組的長度是 2^5 = 32。
例如,現(xiàn)在有一個 key 經(jīng)過哈希函數(shù)計算后,得到的哈希結(jié)果是:
10010111 | 000011110110110010001111001010100010010110010101010 │ 01010
用最后的 5 個 bit 位: 01010,值為 10,也就是 10 號桶。這個操作本質(zhì)上就是取余操作,但是取余開銷太大,所以代碼實現(xiàn)上用的位操作代替。
再用哈希值的高 8 位,找到此 key 在 bucket 中的位置,這是在尋找已有的 key。最開始桶內(nèi)還沒有 key,新加入的 key 會找到第一個空位,然后放入。
buckets 編號就是桶編號,當兩個不同的 key 落在同一個桶中,也就是發(fā)生了哈希沖突。沖突的解決手段是用鏈表法:在 bucket 中,從前往后找到第一個空位。這樣,在查找某個 key 時,先找到對應(yīng)的桶,再去遍歷 bucket 中的 key。

上圖中,假定 B = 5,所以 bucket 總數(shù)就是 2^5 = 32。首先計算出待查找 key 的哈希,使用低 5 位 00110,找到對應(yīng)的 6 號 bucket,使用高 8 位 10010111,對應(yīng)十進制 151,在 6 號 bucket 中尋找 tophash 值(HOB hash)為 151 的 key,找到了 2 號槽位,這樣整個查找過程就結(jié)束了。
如果在 bucket 中沒找到,并且 overflow 不為空,還要繼續(xù)去 overflow bucket 中尋找,直到找到或是所有的 key 槽位都找遍了,包括所有的 overflow bucket。
查找某個 key 的底層函數(shù)是 mapacess 系列函數(shù),函數(shù)的作用類似,我們先以mapacess1函數(shù)為例:
func mapaccess1(t *maptype, h *hmap, key unsafe.Pointer) unsafe.Pointer {
// ......
// 如果 *hmap大小為0,或地址為nil,返回零值
if h == nil || h.count == 0 {
if t.hashMightPanic() {
t.hasher(key, 0) // see issue 23734
}
return unsafe.Pointer(&zeroVal[0])
}
// 讀寫沖突
if h.flags&hashWriting != 0 {
throw("concurrent map read and map write")
}
// 計算哈希值,并且加入 hash0 引入隨機性
hash := t.hasher(key, uintptr(h.hash0))
// 比如 B=5,那 m 就是31,二進制是全 1
// 求 bucket num 時,將 hash 與 m 相與,
// 達到 bucket num 由 hash 的低 8 位決定的效果
m := bucketMask(h.B)
// b 就是 bucket 的地址
b := (*bmap)(add(h.buckets, (hash&m)*uintptr(t.bucketsize)))
// oldbuckets 不為 nil,說明發(fā)生了擴容
if c := h.oldbuckets; c != nil {
// 如果不是同 size 擴容(看后面擴容的內(nèi)容)
// 對應(yīng)條件 1 的解決方案
if !h.sameSizeGrow() {
// 新 bucket 數(shù)量是老的 2 倍
m >>= 1
}
// 求出 key 在老的 map 中的 bucket 位置
oldb := (*bmap)(add(c, (hash&m)*uintptr(t.bucketsize)))
// 如果 oldb 沒有搬遷到新的 bucket
// 那就在老的 bucket 中尋找
if !evacuated(oldb) {
b = oldb
}
}
// 計算出高 8 位的 hash
// 相當于右移 56 位,只取高8位
top := tophash(hash)
bucketloop:
// 遍歷 bucket 的 8 個位置
// bucket 找完(還沒找到),繼續(xù)到 overflow bucket 里找
for ; b != nil; b = b.overflow(t) {
for i := uintptr(0); i < bucketCnt; i++ {
// tophash 不匹配,繼續(xù)遍歷
if b.tophash[i] != top {
if b.tophash[i] == emptyRest {
break bucketloop
}
continue
}
// tophash 匹配,定位到 key 的位置
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
// key 是指針
if t.indirectkey() {
// 解引用
k = *((*unsafe.Pointer)(k))
}
// 如果 key 相等
if t.key.equal(key, k) {
// 定位到 value 的位置
e := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.elemsize))
// value 是指針
if t.indirectelem() {
// 解引用
e = *((*unsafe.Pointer)(e))
}
return e
}
}
}
// 返回零值
return unsafe.Pointer(&zeroVal[0])
}
func tophash(hash uintptr) uint8 {
top := uint8(hash >> (sys.PtrSize*8 - 8))
if top < minTopHash {
// 增加一個 minTopHash
top += minTopHash
}
return top
}
函數(shù)返回 h[key] 的指針,如果 h 中沒有此 key,那就會返回一個 key 相應(yīng)類型的零值,不會返回 nil。
源碼中我們可以看到key定位公式為:
k := add(unsafe.Pointer(b), dataOffset+i*uintptr(t.keysize))
源碼中我們可以看到value定位公式為:
v := add(unsafe.Pointer(b), dataOffset+bucketCnt*uintptr(t.keysize)+i*uintptr(t.valuesize))
b 是 bmap 的地址,這里 bmap 還是源碼里定義的結(jié)構(gòu)體,只包含一個 tophash 數(shù)組,經(jīng)編譯器擴充之后的結(jié)構(gòu)體才包含 key,value,overflow 這些字段。 dataOffset 是 key 相對于 bmap 起始地址的偏移:
dataOffset = unsafe.Offsetof(struct {
b bmap
v int64
}{}.v)
因此 bucket 里 key 的起始地址就是 unsafe.Pointer(b)+dataOffset。第 i 個 key 的地址就要在此基礎(chǔ)上跨過 i 個 key 的大小;而我們又知道,value 的地址是在所有 key 之后,因此第 i 個 value 的地址還需要加上所有 key 的偏移。理解了這些,上面 key 和 value 的定位公式就很好理解了。
-
minTopHash
當一個 cell 的 tophash 值小于 minTopHash 時,標志這個 cell 的遷移狀態(tài)。
因為這個狀態(tài)值是放在 tophash 數(shù)組里,為了和正常的哈希值區(qū)分開,會給 key 計算出來的哈希值一個增量:minTopHash。這樣就能區(qū)分正常的 top hash 值和表示狀態(tài)的哈希值。
下面的這幾種狀態(tài)就表征了 bucket 的情況:
// 空的 cell,也是初始時 bucket 的狀態(tài)
empty = 0
// 空的 cell,表示 cell 已經(jīng)被遷移到新的 bucket
evacuatedEmpty = 1
// key,value 已經(jīng)搬遷完畢,但是 key 都在新 bucket 前半部分,
// 后面擴容部分會再講到。
evacuatedX = 2
// 同上,key 在后半部分
evacuatedY = 3
// tophash 的最小正常值
minTopHash = 4
源碼里判斷這個 bucket 是否已經(jīng)搬遷完畢,用到的函數(shù):
func evacuated(b *bmap) bool {
h := b.tophash[0]
return h > emptyOne && h < minTopHash
}
只取了 tophash 數(shù)組的第一個值,判斷它是否在 0-4 之間。對比上面的常量,當 top hash 是 evacuatedEmpty、evacuatedX、evacuatedY 這三個值之一,說明此 bucket 中的 key 全部被搬遷到了新 bucket。
三、go中Map的擴容機制
使用哈希表的目的就是要快速查找到目標 key,然而,隨著向 map 中添加的 key 越來越多,key 發(fā)生碰撞的概率也越來越大。bucket 中的 8 個 cell 會被逐漸塞滿,查找、插入、刪除 key 的效率也會越來越低。最理想的情況是一個 bucket 只裝一個 key,這樣,就能達到 O(1) 的效率,但這樣空間消耗太大,用空間換時間的代價太高。
Go 語言采用一個 bucket 里裝載 8 個 key,定位到某個 bucket 后,還需要再定位到具體的 key,這實際上又用了時間換空間。
當然,這樣做,要有一個度,不然所有的 key 都落在了同一個 bucket 里,直接退化成了鏈表,各種操作的效率直接降為 O(n),是不行的。
因此,需要有一個指標來衡量前面描述的情況,這就是裝載因子。Go 源碼里這樣定義 裝載因子:
loadFactor := count / (2^B)
count 就是 map 的元素個數(shù),2^B 表示 bucket 數(shù)量。
再來說觸發(fā) map 擴容的時機:在向 map 插入新 key 的時候,會進行條件檢測,符合下面這 2 個條件,就會觸發(fā)擴容:
- 裝載因子超過閾值,源碼里定義的閾值是 6.5(元素數(shù)量>>bucket數(shù)量)。
- 頻繁增刪map,導致未被使用的overflow 的 bucket 數(shù)量過多:當 B 小于 15,也就是 bucket 總數(shù) 2^B 小于 2^15 時,如果 overflow 的 bucket 數(shù)量超過 2^B(未用于存儲的bucket數(shù)量過多),就會觸發(fā)擴容;當 B >= 15,也就是 bucket 總數(shù) 2^B 大于等于 2^15,如果 overflow 的 bucket 數(shù)量超過 2^15,就會觸發(fā)擴容。
通過匯編語言可以找到賦值操作對應(yīng)源碼中的函數(shù)是 mapassign,對應(yīng)擴容條件的源碼如下:
// src/runtime/hashmap.go/mapassign
// 觸發(fā)擴容時機
if !h.growing() && (overLoadFactor(int64(h.count), h.B) || tooManyOverflowBuckets(h.noverflow, h.B)) {
hashGrow(t, h)
}
// 裝載因子超過 6.5
func overLoadFactor(count int64, B uint8) bool {
return count >= bucketCnt && float32(count) >= loadFactor*float32((uint64(1)<<B))
}
// overflow buckets 太多
func tooManyOverflowBuckets(noverflow uint16, B uint8) bool {
if B < 16 {
return noverflow >= uint16(1)<<B
}
return noverflow >= 1<<15
}
解釋一下:
第 1 點:我們知道,每個 bucket 有 8 個空位,在沒有溢出,且所有的桶都裝滿了的情況下,裝載因子算出來的結(jié)果是 8。因此當裝載因子超過 6.5 時,表明很多 bucket 都快要裝滿了,查找效率和插入效率都變低了。在這個時候進行擴容是有必要的。
第 2 點:是對第 1 點的補充。就是說在裝載因子比較小的情況下,這時候 map 的查找和插入效率也很低,而第 1 點識別不出來這種情況。表面現(xiàn)象就是計算裝載因子的分子比較小,即 map 里元素總數(shù)少,但是 bucket 數(shù)量多(真實分配的 bucket 數(shù)量多,包括大量的 overflow bucket)。
不難想像造成這種情況的原因:不停地插入、刪除元素。先插入很多元素,導致創(chuàng)建了很多 bucket,但是裝載因子達不到第 1 點的臨界值,未觸發(fā)擴容來緩解這種情況。之后,刪除元素降低元素總數(shù)量,再插入很多元素,導致創(chuàng)建很多的 overflow bucket,但就是不會觸犯第 1 點的規(guī)定,你能拿我怎么辦?overflow bucket 數(shù)量太多,導致 key 會很分散,查找插入效率低得嚇人,因此出臺第 2 點規(guī)定。這就像是一座空城,房子很多,但是住戶很少,都分散了,找起人來很困難。
對于命中條件 1,2 的限制,都會發(fā)生擴容。但是擴容的策略并不相同,畢竟兩種條件應(yīng)對的場景不同。
對于條件 1,元素太多,而 bucket 數(shù)量太少,很簡單:將 B 加 1,bucket 最大數(shù)量(2^B)直接變成原來 bucket 數(shù)量的 2 倍。于是,就有新老 bucket 了。注意,這時候元素都在老 bucket 里,還沒遷移到新的 bucket 來。而且,新 bucket 只是最大數(shù)量變?yōu)樵瓉碜畲髷?shù)量(2^B)的 2 倍(2^B * 2)。
對于條件 2,其實元素沒那么多,但是 overflow bucket 數(shù)特別多,說明很多 bucket 都沒裝滿。解決辦法就是開辟一個新 bucket 空間,將老 bucket 中的元素移動到新 bucket,使得同一個 bucket 中的 key 排列地更緊密。這樣,原來,在 overflow bucket 中的 key 可以移動到 bucket 中來。結(jié)果是節(jié)省空間,提高 bucket 利用率,map 的查找和插入效率自然就會提升。
對于條件 2 的解決方案,曹大的博客里還提出了一個極端的情況:如果插入 map 的 key 哈希都一樣,就會落到同一個 bucket 里,超過 8 個就會產(chǎn)生 overflow bucket,結(jié)果也會造成 overflow bucket 數(shù)過多。移動元素其實解決不了問題,因為這時整個哈希表已經(jīng)退化成了一個鏈表,操作效率變成了 O(n)。
再來看一下擴容具體是怎么做的。由于 map 擴容需要將原有的 key/value 重新搬遷到新的內(nèi)存地址,如果有大量的 key/value 需要搬遷,會非常影響性能。因此 Go map 的擴容采取了一種稱為“漸進式”地方式,原有的 key 并不會一次性搬遷完畢,每次最多只會搬遷 2 個 bucket。
上面說的 hashGrow() 函數(shù)實際上并沒有真正地“搬遷”,它只是分配好了新的 buckets,并將老的 buckets 掛到了 oldbuckets 字段上。真正搬遷 buckets 的動作在 growWork() 函數(shù)中,而調(diào)用 growWork() 函數(shù)的動作是在 mapassign 和 mapdelete 函數(shù)中。也就是插入或修改、刪除 key 的時候,都會嘗試進行搬遷 buckets 的工作。先檢查 oldbuckets 是否搬遷完畢,具體來說就是檢查 oldbuckets 是否為 nil。
我們先看 hashGrow() 函數(shù)所做的工作,再來看具體的搬遷 buckets 是如何進行的
func hashGrow(t *maptype, h *hmap) {
// B+1 相當于是原來 2 倍的空間
bigger := uint8(1)
// 對應(yīng)條件 2
if !overLoadFactor(int64(h.count), h.B) {
// 進行等量的內(nèi)存擴容,所以 B 不變
bigger = 0
h.flags |= sameSizeGrow
}
// 將老 buckets 掛到 buckets 上
oldbuckets := h.buckets
// 申請新的 buckets 空間
newbuckets, nextOverflow := makeBucketArray(t, h.B+bigger)
// x = 01010011
// y = 01010100
// z = x &^ y = 00000011
// 如果 y bit 位為 1,那么結(jié)果 z 對應(yīng) bit 位就為 0,否則 z 對應(yīng) bit 位就和 x 對應(yīng) bit 位的值相同。
//
// 先把 h.flags 中 iterator 和 oldIterator 對應(yīng)位清 0,然后如果發(fā)現(xiàn) iterator 位為 1,
// 那就把它轉(zhuǎn)接到 oldIterator 位,使得 oldIterator 標志位變成 1。
// 潛臺詞就是:buckets 現(xiàn)在掛到了 oldBuckets 名下了,對應(yīng)的標志位也轉(zhuǎn)接過去吧。
flags := h.flags &^ (iterator | oldIterator)
if h.flags&iterator != 0 {
flags |= oldIterator
}
// 提交 grow 的動作
h.B += bigger
h.flags = flags
h.oldbuckets = oldbuckets
h.buckets = newbuckets
// 搬遷進度為 0
h.nevacuate = 0
// overflow buckets 數(shù)為 0
h.noverflow = 0
// ……
}
主要是申請到了新的 buckets 空間,把相關(guān)的標志位都進行了處理:例如標志 nevacuate 被置為 0, 表示當前搬遷進度為 0。
再來看看真正執(zhí)行搬遷工作的 growWork() 函數(shù)。
func growWork(t *maptype, h *hmap, bucket uintptr) {
// 確認搬遷老的 bucket 對應(yīng)正在使用的 bucket
// 為了確認搬遷的 bucket 是我們正在使用的 bucket。
// oldbucketmask() 函數(shù)返回擴容前的 map 的 bucketmask
evacuate(t, h, bucket&h.oldbucketmask())
// 再搬遷一個 bucket,以加快搬遷進程
if h.growing() {
evacuate(t, h, h.nevacuate)
}
}
h.growing() 函數(shù)非常簡單:
func (h *hmap) growing() bool {
return h.oldbuckets != nil
}
如果 oldbuckets 不為空,說明還沒有搬遷完畢,還得繼續(xù)搬。
所謂的 bucketmask,作用就是將 key 計算出來的哈希值與 bucketmask 相與,得到的結(jié)果就是 key 應(yīng)該落入的桶。比如 B = 5,那么 bucketmask 的低 5 位是 11111,其余位是 0,hash 值與其相與的意思是,只有 hash 值的低 5 位決策 key 到底落入哪個 bucket。
接下來,我們集中所有的精力在搬遷的關(guān)鍵函數(shù) evacuate。
func evacuate(t *maptype, h *hmap, oldbucket uintptr) {
// 定位老的 bucket 地址
b := (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 結(jié)果是 2^B,如 B = 5,結(jié)果為32
newbit := h.noldbuckets()
// key 的哈希函數(shù)
alg := t.key.alg
// 如果 b 沒有被搬遷過
if !evacuated(b) {
var (
// 表示bucket 移動的目標地址
x, y *bmap
// 指向 x,y 中的 key/val
xi, yi int
// 指向 x,y 中的 key
xk, yk unsafe.Pointer
// 指向 x,y 中的 value
xv, yv unsafe.Pointer
)
// 默認是等 size 擴容,前后 bucket 序號不變
// 使用 x 來進行搬遷
x = (*bmap)(add(h.buckets, oldbucket*uintptr(t.bucketsize)))
xi = 0
xk = add(unsafe.Pointer(x), dataOffset)
xv = add(xk, bucketCnt*uintptr(t.keysize))、
// 如果不是等 size 擴容,前后 bucket 序號有變
// 使用 y 來進行搬遷
if !h.sameSizeGrow() {
// y 代表的 bucket 序號增加了 2^B
y = (*bmap)(add(h.buckets, (oldbucket+newbit)*uintptr(t.bucketsize)))
yi = 0
yk = add(unsafe.Pointer(y), dataOffset)
yv = add(yk, bucketCnt*uintptr(t.keysize))
}
// 遍歷所有的 bucket,包括 overflow buckets
// b 是老的 bucket 地址
for ; b != nil; b = b.overflow(t) {
k := add(unsafe.Pointer(b), dataOffset)
v := add(k, bucketCnt*uintptr(t.keysize))
// 遍歷 bucket 中的所有 cell
for i := 0; i < bucketCnt; i, k, v = i+1, add(k, uintptr(t.keysize)), add(v, uintptr(t.valuesize)) {
// 當前 cell 的 top hash 值
top := b.tophash[i]
// 如果 cell 為空,即沒有 key
if top == empty {
// 那就標志它被"搬遷"過
b.tophash[i] = evacuatedEmpty
// 繼續(xù)下個 cell
continue
}
// 正常不會出現(xiàn)這種情況
// 未被搬遷的 cell 只可能是 empty 或是
// 正常的 top hash(大于 minTopHash)
if top < minTopHash {
throw("bad map state")
}
k2 := k
// 如果 key 是指針,則解引用
if t.indirectkey {
k2 = *((*unsafe.Pointer)(k2))
}
// 默認使用 X,等量擴容
useX := true
// 如果不是等量擴容
if !h.sameSizeGrow() {
// 計算 hash 值,和 key 第一次寫入時一樣
hash := alg.hash(k2, uintptr(h.hash0))
// 如果有協(xié)程正在遍歷 map
if h.flags&iterator != 0 {
// 如果出現(xiàn) 相同的 key 值,算出來的 hash 值不同
if !t.reflexivekey && !alg.equal(k2, k2) {
// 只有在 float 變量的 NaN() 情況下會出現(xiàn)
if top&1 != 0 {
// 第 B 位置 1
hash |= newbit
} else {
// 第 B 位置 0
hash &^= newbit
}
// 取高 8 位作為 top hash 值
top = uint8(hash >> (sys.PtrSize*8 - 8))
if top < minTopHash {
top += minTopHash
}
}
}
// 取決于新哈希值的 oldB+1 位是 0 還是 1
// 詳細看后面的文章
useX = hash&newbit == 0
}
// 如果 key 搬到 X 部分
if useX {
// 標志老的 cell 的 top hash 值,表示搬移到 X 部分
b.tophash[i] = evacuatedX
// 如果 xi 等于 8,說明要溢出了
if xi == bucketCnt {
// 新建一個 bucket
newx := h.newoverflow(t, x)
x = newx
// xi 從 0 開始計數(shù)
xi = 0
// xk 表示 key 要移動到的位置
xk = add(unsafe.Pointer(x), dataOffset)
// xv 表示 value 要移動到的位置
xv = add(xk, bucketCnt*uintptr(t.keysize))
}
// 設(shè)置 top hash 值
x.tophash[xi] = top
// key 是指針
if t.indirectkey {
// 將原 key(是指針)復制到新位置
*(*unsafe.Pointer)(xk) = k2 // copy pointer
} else {
// 將原 key(是值)復制到新位置
typedmemmove(t.key, xk, k) // copy value
}
// value 是指針,操作同 key
if t.indirectvalue {
*(*unsafe.Pointer)(xv) = *(*unsafe.Pointer)(v)
} else {
typedmemmove(t.elem, xv, v)
}
// 定位到下一個 cell
xi++
xk = add(xk, uintptr(t.keysize))
xv = add(xv, uintptr(t.valuesize))
} else { // key 搬到 Y 部分,操作同 X 部分
// ……
// 省略了這部分,操作和 X 部分相同
}
}
}
// 如果沒有協(xié)程在使用老的 buckets,就把老 buckets 清除掉,幫助gc
if h.flags&oldIterator == 0 {
b = (*bmap)(add(h.oldbuckets, oldbucket*uintptr(t.bucketsize)))
// 只清除bucket 的 key,value 部分,保留 top hash 部分,指示搬遷狀態(tài)
if t.bucket.kind&kindNoPointers == 0 {
memclrHasPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
} else {
memclrNoHeapPointers(add(unsafe.Pointer(b), dataOffset), uintptr(t.bucketsize)-dataOffset)
}
}
}
// 更新搬遷進度
// 如果此次搬遷的 bucket 等于當前進度
if oldbucket == h.nevacuate {
// 進度加 1
h.nevacuate = oldbucket + 1
// Experiments suggest that 1024 is overkill by at least an order of magnitude.
// Put it in there as a safeguard anyway, to ensure O(1) behavior.
// 嘗試往后看 1024 個 bucket
stop := h.nevacuate + 1024
if stop > newbit {
stop = newbit
}
// 尋找沒有搬遷的 bucket
for h.nevacuate != stop && bucketEvacuated(t, h, h.nevacuate) {
h.nevacuate++
}
// 現(xiàn)在 h.nevacuate 之前的 bucket 都被搬遷完畢
// 所有的 buckets 搬遷完畢
if h.nevacuate == newbit {
// 清除老的 buckets
h.oldbuckets = nil
// 清除老的 overflow bucket
// 回憶一下:[0] 表示當前 overflow bucket
// [1] 表示 old overflow bucket
if h.extra != nil {
h.extra.overflow[1] = nil
}
// 清除正在擴容的標志位
h.flags &^= sameSizeGrow
}
}
}
搬遷的目的就是將老的 buckets 搬遷到新的 buckets。而通過前面的說明我們知道,應(yīng)對條件 1,新的 buckets 數(shù)量是之前的一倍,應(yīng)對條件 2,新的 buckets 數(shù)量和之前相等。
對于條件 2,從老的 buckets 搬遷到新的 buckets,由于 bucktes 數(shù)量不變,因此可以按序號來搬,比如原來在 0 號 bucktes,到新的地方后,仍然放在 0 號 buckets。
對于條件 1,就沒這么簡單了。要重新計算 key 的哈希,才能決定它到底落在哪個 bucket。例如,原來 B = 5,計算出 key 的哈希后,只用看它的低 5 位,就能決定它落在哪個 bucket。擴容后,B 變成了 6,因此需要多看一位,它的低 6 位決定 key 落在哪個 bucket。這稱為 rehash。

因此,某個 key 在搬遷前后 bucket 序號可能和原來相等,也可能是相比原來加上 2^B(原來的 B 值),取決于 hash 值 第 6 bit 位是 0 還是 1。
再明確一個問題:如果擴容后,B 增加了 1,意味著 buckets 總數(shù)是原來的 2 倍,原來 1 號的桶“裂變”到兩個桶。
例如,原始 B = 2,1號 bucket 中有 2 個 key 的哈希值低 3 位分別為:010,110。由于原來 B = 2,所以低 2 位 10 決定它們落在 2 號桶,現(xiàn)在 B 變成 3,所以 010、110 分別落入 2、6 號桶。

搬遷函數(shù)中的幾個關(guān)鍵點:
- 雙層循環(huán):外層:遍歷 bucket 和 overflow bucket, 內(nèi)層:遍歷 bucket 的所有 cell
- X, Y part,其實就是我們說的如果是擴容到原來的 2 倍,桶的數(shù)量是原來的 2 倍,前一半桶被稱為 X part,后一半桶被稱為 Y part。一個 bucket 中的 key 可能會分裂落到 2 個桶,一個位于 X part,一個位于 Y part(取決于擴容后,倒數(shù)第B位是 0 還是 1)。所以在搬遷一個 cell 之前,需要知道這個 cell 中的 key 是落到哪個 Part
有一種 key,每次對它計算 hash,得到的結(jié)果都不一樣。這個 key 就是 math.NaN() 的結(jié)果,它的含義是 not a number,類型是 float64。
當它作為 map 的 key,在搬遷的時候,會遇到一個問題:再次計算它的哈希值和它當初插入 map 時的計算出來的哈希值不一樣!這就導致了這個 key 是永遠不會被 Get 操作獲取的!當我使用 m[math.NaN()] 語句的時候,是查不出來結(jié)果的。這個 key 只有在遍歷整個 map 的時候,才有機會現(xiàn)身。所以,可以向一個 map 插入任意數(shù)量的 math.NaN() 作為 key。
當搬遷碰到 math.NaN() 的 key 時,只通過 tophash 的最低位決定分配到 X part 還是 Y part(如果擴容后是原來 buckets 數(shù)量的 2 倍)。如果 tophash 的最低位是 0 ,分配到 X part;如果是 1 ,則分配到 Y part。
確定了要搬遷到的目標 bucket 后,搬遷操作就比較好進行了。將源 key/value 值 copy 到目的地相應(yīng)的位置。
設(shè)置 key 在原始 buckets 的 tophash 為 evacuatedX 或是 evacuatedY,表示已經(jīng)搬遷到了新 map 的 x part 或是 y part。新 map 的 tophash 則正常取 key 哈希值的高 8 位。
擴容前,B = 2,共有 4 個 buckets,lowbits 表示 hash 值的低位。假設(shè)我們不關(guān)注其他 buckets 情況,專注在 2 號 bucket。并且假設(shè) overflow 太多,觸發(fā)了等量擴容(對應(yīng)于前面的條件 2)。

擴容完成后,overflow bucket 消失了,key 都集中到了一個 bucket,更為緊湊了,提高了查找的效率。

假設(shè)觸發(fā)了 2 倍的擴容,那么擴容完成后,老 buckets 中的 key 分裂到了 2 個 新的 bucket。一個在 x part,一個在 y 的 part。依據(jù)是 hash 的 lowbits。新 map 中 0-3 稱為 x part,4-7 稱為 y part。

注意,上面的兩張圖忽略了其他 buckets 的搬遷情況,表示所有的 bucket 都搬遷完畢后的情形。實際上,我們知道,搬遷是一個“漸進”的過程,并不會一下子就全部搬遷完畢。所以在搬遷過程中,oldbuckets 指針還會指向原來老的 []bmap,并且已經(jīng)搬遷完畢的 key 的 tophash 值會是一個狀態(tài)值,表示 key 的搬遷去向。
四、go中map的常見問題
1、Key為什么是無序的?
map 在擴容后,會發(fā)生 key 的搬遷,原來落在同一個 bucket 中的 key,搬遷后,有些 key 就要遠走高飛了(bucket 序號加上了 2^B)。而遍歷的過程,就是按順序遍歷 bucket,同時按順序遍歷 bucket 中的 key。搬遷后,key 的位置發(fā)生了重大的變化,有些 key 飛上高枝,有些 key 則原地不動。這樣,遍歷 map 的結(jié)果就不可能按原來的順序了。
當我們在遍歷 go 中的 map 時,并不是固定地從 0 號 bucket 開始遍歷,每次都是從一個隨機值序號的 bucket 開始遍歷,并且是從這個 bucket 的一個隨機序號的 cell 開始遍歷。這樣,即使你是一個寫死的 map,僅僅只是遍歷它,也不太可能會返回一個固定序列的 key/value 對了。
2、float類型是否可以作為map的key?
從語法上看,是可以的。Go 語言中只要是可比較的類型都可以作為 key。除開 slice,map,functions 這幾種類型,其他類型都是 OK 的。具體包括:布爾值、數(shù)字、字符串、指針、通道、接口類型、結(jié)構(gòu)體、只包含上述類型的數(shù)組。這些類型的共同特征是支持 == 和 != 操作符,k1 == k2 時,可認為 k1 和 k2 是同一個 key。如果是結(jié)構(gòu)體,只有 hash 后的值相等以及字面值相等,才被認為是相同的 key。很多字面值相等的,hash出來的值不一定相等,比如引用。
float 型可以作為 key,但是由于精度的問題,會導致一些詭異的問題,慎用之。
3、map可以遍歷的同時刪除嗎?
map 并不是一個線程安全的數(shù)據(jù)結(jié)構(gòu)。同時讀寫一個 map 是未定義的行為,如果被檢測到,會直接 panic。
上面說的是發(fā)生在多個協(xié)程同時讀寫同一個 map 的情況下。 如果在同一個協(xié)程內(nèi)邊遍歷邊刪除,并不會檢測到同時讀寫,理論上是可以這樣做的。但是,遍歷的結(jié)果就可能不會是相同的了,有可能結(jié)果遍歷結(jié)果集中包含了刪除的 key,也有可能不包含,這取決于刪除 key 的時間:是在遍歷到 key 所在的 bucket 時刻前或者后。
一般而言,這可以通過讀寫鎖來解決:sync.RWMutex。
讀之前調(diào)用 RLock() 函數(shù),讀完之后調(diào)用 RUnlock() 函數(shù)解鎖;寫之前調(diào)用 Lock() 函數(shù),寫完之后,調(diào)用 Unlock() 解鎖。
另外,sync.Map 是線程安全的 map,也可以使用。
4、可以對map元素取地址嗎?
無法對 map 的 key 或 value 進行取址,將無法通過編譯
如果通過其他 hack 的方式,例如 unsafe.Pointer 等獲取到了 key 或 value 的地址,也不能長期持有,因為一旦發(fā)生擴容,key 和 value 的位置就會改變,之前保存的地址也就失效了。
5、如何比較兩個map是否相等?
map 深度相等的條件:
- 都為 nil
- 非空、長度相等,指向同一個 map 實體對象
- 相應(yīng)的 key 指向的 value “深度”相等
直接將使用 map1 == map2 是錯誤的。這種寫法只能比較 map 是否為 nil。
因此只能是遍歷map 的每個元素,比較元素是否都是深度相等。
6、map是線程安全的嗎?
map 不是線程安全的。
在查找、賦值、遍歷、刪除的過程中都會檢測寫標志,一旦發(fā)現(xiàn)寫標志置位(等于1),則直接 panic。賦值和刪除函數(shù)在檢測完寫標志是復位之后,先將寫標志位置位,才會進行之后的操作。
檢測寫標志:
if h.flags&hashWriting == 0 {
throw("concurrent map writes")
}
設(shè)置寫標志:
h.flags |= hashWriting
References:
https://xie.infoq.cn/article/5607679316cdfa7284bfa652a
https://golang.design/go-questions/map/principal/
https://topgoer.com/go%E5%9F%BA%E7%A1%80/Map%E5%AE%9E%E7%8E%B0%E5%8E%9F%E7%90%86.html
https://cloud.tencent.com/developer/article/1468799
https://www.cnblogs.com/dawnlight/p/15552513.html

