路由表初始化
在 devp2p 中路由表被存儲在 leveldb 中,啟動時會加載到 Table 結(jié)構(gòu)體中
- 代碼位置
p2p/discover/table.go
很容易可以找到調(diào)用
p2p/Server.Start方法的地方,假設(shè)啟動前已經(jīng)設(shè)置好bootnode并且沒有啟用nodiscover開關(guān),那么會按照這個時序圖標(biāo)注的關(guān)鍵路徑一直調(diào)用到Taboe.loop(),整個填表的過程都在loop()函數(shù)中,具體初始化行為在 Table.doRefres 函數(shù)進(jìn)行初始化,其關(guān)鍵調(diào)用路徑如下:

Table.doRefresh 函數(shù)
看個大概意思就可以,以太坊對此更新比較頻繁,大體意思不變,不用每行都讀,很可能下一個版本就對不上了
func (tab *Table) doRefresh(done chan struct{}) {
defer close(done)
// 這里 seednodes 當(dāng)?shù)谝淮螁訒r會是 bootnodes,因為 db 里沒有歷史記錄
tab.loadSeedNodes()
// 這個 lookup 如果不成功,就會阻塞在這里,除非 bootnode 死掉了,否則一定成功
// 查詢自己的目的是在 tab 為空時把 bootnode 填進(jìn)去
tab.lookup(tab.self.ID, false)
// 隨機(jī)的再去 Query 幾個節(jié)點,這是 KAD DHT 的經(jīng)典做法
for i := 0; i < 3; i++ {
var target NodeID
crand.Read(target[:])
tab.lookup(target, false)
}
}
K桶 與 邏輯距離
計算節(jié)點邏輯距離來定位桶號,這幾乎是 KAD DHT 唯一的精髓所在了,看看以太坊是怎么做的,以太坊只分了 17 個桶,每個桶有 16 個位置,也就是說以太坊的 DHT 中就只有17 * 16 = 272 個節(jié)點信息
- 代碼位置 :
p2p/enode/node.go
// LogDist returns the logarithmic distance between a and b, log2(a ^ b).
func LogDist(a, b ID) int {
lz := 0
for i := range a {
x := a[i] ^ b[i]
if x == 0 {
lz += 8
} else {
lz += bits.LeadingZeros8(x)
break
}
}
return len(a)*8 - lz
}
這里的
x = a[i] ^ b[i], 為什么x==0時計數(shù)器lz要加8呢?這是因為a[i]和b[i]的單位都是byte,1byte == 8bit,所以在這里lz是一個bit計數(shù)器,用異或計算邏輯距離就是計算前導(dǎo)0的個數(shù),單位是bit,通常{byte | 0 <= byte <= 255 }所以當(dāng)x != 0時,又用bits.LeadingZeros8(x)取了x的前導(dǎo)0個數(shù)并累加,例如當(dāng)x == 17時byte(17) == bit[0,0,0,1,0,0,0,1],前導(dǎo)0為3所以此時else會執(zhí)行lz += 3
最后的len(a)*8 - lz是把一樣的bit總數(shù)減掉,剩下的就是不同的bit數(shù)量,演算一下結(jié)果約等于log2(a^b)
- 代碼位置 :
p2p/discover/table.go
// bucket returns the bucket for the given node ID hash.
func (tab *Table) bucket(id enode.ID) *bucket {
d := enode.LogDist(tab.self().ID(), id)
if d <= bucketMinDistance {
return tab.buckets[0]
}
return tab.buckets[d-bucketMinDistance-1]
}
唯一用到了
LogDist函數(shù)的地方是Table.bucket函數(shù),在這里ID是32byte桶最小距離bucketMinDistance == 239,那么d <= 239則返回buckets[0]就代表著只要有從左到右16 bit以上相同的就放到第一個桶里,因為(256-16)-239-1 = 0,其余的最大也就是32 * 8 = 256個不同的bit,256 - 239 - 1 = 16正好可以放到最后一個桶里,桶的尺寸是17即[0-16],
這兩個函數(shù)十分重要,在我們關(guān)注的這部分代碼中,在 doRefresh 的 tab.loadSeedNodes() 中被使用,這里如果 db 中的數(shù)據(jù)非空,最多會拿出 30 個 種子節(jié)點,再加上全部的 bootnodes 節(jié)點,就組成了本次的種子節(jié)點集合
lookup
這里就不講太具體的代碼了,因為 v1.8.x 和 v1.9.x 甚至連接口都變了,不過大體邏輯還是差不太多的,挑一些關(guān)鍵代碼和思路進(jìn)行簡單介紹
DistCmp 邏輯距離比較
這是其中一個關(guān)鍵方法,用來判斷兩個 ID 在桶里的順序
func DistCmp(target, a, b ID) int {
for i := range target {
da := a[i] ^ target[i]
db := b[i] ^ target[i]
if da > db {
return 1
} else if da < db {
return -1
}
}
return 0
}
這里邏輯比較簡單,
target是目標(biāo),a和b是競爭關(guān)系,按照每個byte與target進(jìn)行異或,最后誰的結(jié)果大,誰的距離就遠(yuǎn),結(jié)果小的距離就近一些,因為結(jié)果越大說明前導(dǎo)0越少,也就是邏輯距離越遠(yuǎn)了
closest
這個函數(shù)是在本地K桶中尋找距離目標(biāo)節(jié)點邏輯距離最近的一組節(jié)點信息
- 完整的函數(shù)簽名如下:
func (tab *Table) closest(target enode.ID, nresults int, checklive bool) *nodesByDistance
這里 diss 一下以太坊的處理邏輯,這個函數(shù)中會去遍歷整個桶,然后用
DistCmp去篩選出與target邏輯距離較近的一組節(jié)點,這個地方?jīng)]有libp2p處理的優(yōu)雅 ??
重要的函數(shù)是nodesByDistance.push,closest的結(jié)果就是在這個函數(shù)中產(chǎn)生的
func (h *nodesByDistance) push(n *node, maxElems int) {
ix := sort.Search(len(h.entries), func(i int) bool {
return enode.DistCmp(h.target, h.entries[i].ID(), n.ID()) > 0
})
if len(h.entries) < maxElems {
h.entries = append(h.entries, n)
}
if ix == len(h.entries) {
// farther away than all nodes we already have.
// if there was room for it, the node is now the last element.
} else {
copy(h.entries[ix+1:], h.entries[ix:])
h.entries[ix] = n
}
}
可以看到這里將
closest中傳遞的target放入了nodesByDistance對象中,并作為目標(biāo)傳遞給DisCmp函數(shù)來比較已經(jīng)生成的h.entries集合中有沒有比傳入的n離target更遠(yuǎn)的節(jié)點,如果找到了就用n去替換這個節(jié)點,最開始的邏輯是無條件先填滿h.entries集合,然后再去逐個替換,也就是說只要桶中的數(shù)據(jù)超過16條那closest方法的返回值永遠(yuǎn)都是16條記錄
lookupWorker
func (t *UDPv4) lookup(targetKey encPubkey) []*node {
......
result = t.tab.closest(target, bucketSize, false)
......
go t.lookupWorker(n, targetKey, reply)
......
return result.entries
}
接下來由 3 個協(xié)程對
closest結(jié)果集中的前三個節(jié)點,也就是向本地與target邏輯距離最近的三個節(jié)點發(fā)送findnode請求并等待reply,再將返回的結(jié)果插入到target所在的桶中,至此即完成lookup
Table.loop 函數(shù)
可以看到 loop 一啟動就用一個協(xié)程去執(zhí)行了 doRefresh 函數(shù),然后進(jìn)入一個循環(huán)的 select{} 代碼塊,阻塞等待下一個任務(wù),其中 refresh 這個 Ticker 為 30 分鐘,所以每 30 分鐘還會周期性的執(zhí)行 doRefresh 函數(shù),并且查詢的目標(biāo)為隨機(jī),還有一個觸發(fā)條件就是主動產(chǎn)生 refreshReq 請求,這個請求會從 dialTask.Do 函數(shù)中調(diào)用 dailTask.resolve(srv) 中觸發(fā)
// loop schedules runs of doRefresh, doRevalidate and copyLiveNodes.
func (tab *Table) loop() {
var (
refresh = time.NewTicker(refreshInterval)
......
// Start initial refresh.
go tab.doRefresh(refreshDone)
......
for {
select {
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
......
// 這個定時任務(wù)是 10秒 執(zhí)行一次,用來判定節(jié)點狀態(tài)
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
......
}
......
}
Table.doRevalidate
這個函數(shù)很重要,因為節(jié)點數(shù)據(jù)進(jìn)桶之前并沒有進(jìn)行過狀態(tài)交驗
discoverTask
在啟動節(jié)點后,每一個連接任務(wù)都會判斷如果 peers 少于 25(默認(rèn)25),就要增加一個 discoverTask ,用來執(zhí)行 srv.ntab.LookupRandom() ,目的是隨機(jī) findnode 以便填充路由表
強(qiáng)行總結(jié)
相比于優(yōu)雅的 libp2p 以太坊的 DHT 實在是簡陋,甚至有點丑陋,無心再往下看了。