Bootstrap 邏輯
入口代碼:go-libp2p-kad-dht/dht_bootstrap.go
入口方法:
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
最終這個(gè)方法會(huì)去執(zhí)行 runBootstrap 方法,真正的操作是從這里開(kāi)始的
- func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error
原理大概是,生成一個(gè)隨機(jī)的目標(biāo) randomID ,然后在網(wǎng)絡(luò)中 find 這個(gè) peer ,因?yàn)榭隙ㄊ钦也坏降?,所以每個(gè) peer 都會(huì)返回給你離這個(gè) randomID 最近的 peers ,也就是 CloserPeers 來(lái)填充你本地的 k 桶,這樣的動(dòng)作默認(rèn)是 5 分鐘執(zhí)行一次的,所以 k 桶很快就會(huì)被填滿。
- func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error)
這個(gè)方法會(huì)現(xiàn)在本地找:dht.FindLocal(id)
本地沒(méi)有就在路由表里找peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
這個(gè)方法的邏輯是,AlphaValue = 3 ,表示最低要確保發(fā)送到 3 個(gè) peers 上
目標(biāo) id 會(huì)被 xor 出一個(gè)桶,這個(gè)桶如果是空的,或者桶里不足 3 個(gè) peer
就會(huì)從臨近的桶里返回 peer ,如果桶里 peer 超過(guò) AlphaValue 則按桶里的總數(shù)來(lái)
奇怪的是最后又只節(jié)選了 3 個(gè)距離最近的 peer ,為什么呢?
func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
cpl := commonPrefixLen(id, rt.local)
rt.tabLock.RLock()
// Get bucket at cpl index or last bucket
var bucket *Bucket
if cpl >= len(rt.Buckets) {
cpl = len(rt.Buckets) - 1
}
bucket = rt.Buckets[cpl]
peerArr := make(peerSorterArr, 0, count)
//TODO 如果這里的數(shù)據(jù)比 peerArr 多,會(huì)重新 make peerArr
peerArr = copyPeersFromList(id, peerArr, bucket.list)
if len(peerArr) < count {
// In the case of an unusual split, one bucket may be short or empty.
// if this happens, search both surrounding buckets for nearby peers
if cpl > 0 {
plist := rt.Buckets[cpl-1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
if cpl < len(rt.Buckets)-1 {
plist := rt.Buckets[cpl+1].list
peerArr = copyPeersFromList(id, peerArr, plist)
}
}
rt.tabLock.RUnlock()
// Sort by distance to local peer
sort.Sort(peerArr)
// TODO 這里為什么又截?cái)嗔四兀恐蝗×?個(gè)離的最近的,什么目的?
if count < len(peerArr) {
peerArr = peerArr[:count]
}
out := make([]peer.ID, 0, len(peerArr))
for _, p := range peerArr {
out = append(out, p.p)
}
return out
}
在 query 的同時(shí)會(huì)刷新每個(gè)有響應(yīng)的 peer 的 ttl ,但是至多只向 3 個(gè) peer 發(fā)送 query 請(qǐng)求
請(qǐng)求的返回的 closer 貌似沒(méi)處理,為什么?
因?yàn)樾枰托膶ふ遥戳艘蝗K于在
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)
中找到了答案,這個(gè)實(shí)在是太隱蔽了,對(duì)代碼的作者表示譴責(zé)
以下是 FindPeer 中的代碼片段,
parent := ctx
query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.SendingQuery,
ID: p,
})
pmes, err := dht.findPeerSingle(ctx, p, id)
if err != nil {
return nil, err
}
closer := pmes.GetCloserPeers()
clpeerInfos := pb.PBPeersToPeerInfos(closer)
// see if we got the peer here
for _, npi := range clpeerInfos {
if npi.ID == id {
// add by liangc ---->
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.PeerFindby,
ID: p,
})
// add by liangc <----
return &dhtQueryResult{
peer: npi,
success: true,
}, nil
}
}
notif.PublishQueryEvent(parent, ¬if.QueryEvent{
Type: notif.PeerResponse,
ID: p,
Responses: clpeerInfos,
})
return &dhtQueryResult{closerPeers: clpeerInfos}, nil
})
// run it!
result, err := query.Run(ctx, peers)
在上面的最后一句 query.Run 中,包含了如下調(diào)用軌跡
- func (q dhtQuery) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
- func (r dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
- func (r *dhtQueryRunner) spawnWorkers(proc process.Process)
- func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)
- func (r *dhtQueryRunner) spawnWorkers(proc process.Process)
- func (r dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
在 dhtQueryRunner.queryPeer中處理了closerPeers
} else if len(res.closerPeers) > 0 {
log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
for _, next := range res.closerPeers {
if next.ID == r.query.dht.self { // don't add self.
log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
continue
}
// add their addresses to the dialer's peerstore
r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
r.addPeerToQuery(next.ID)
log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
}
} else {
log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
}
可以看到,每個(gè) closer 都被放到了 peerstore 中,同時(shí)也放到了 peersSeen 中,
其實(shí)這里就是在迭代 peersSeen 來(lái)執(zhí)行 run 方法,peersSeen 是線程安全的 set 類(lèi)型
這就保證雖然所有的 run 都是在線程中執(zhí)行的,但是主程序輪訓(xùn)時(shí)并不會(huì)錯(cuò)過(guò) set 中的新成員
同時(shí) set 還可以保證重復(fù)的 peer 不會(huì)被重復(fù)添加,這就可以斷定 set 不會(huì)無(wú)休止增加
其實(shí) bootstrap 到此就沒(méi)什么可看的了