go-libp2p-kad-dht bootstrap 源碼分析

Bootstrap 邏輯

入口代碼:go-libp2p-kad-dht/dht_bootstrap.go

入口方法:
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
最終這個(gè)方法會(huì)去執(zhí)行 runBootstrap 方法,真正的操作是從這里開(kāi)始的

  • func (dht *IpfsDHT) runBootstrap(ctx context.Context, cfg BootstrapConfig) error

原理大概是,生成一個(gè)隨機(jī)的目標(biāo) randomID ,然后在網(wǎng)絡(luò)中 find 這個(gè) peer ,因?yàn)榭隙ㄊ钦也坏降?,所以每個(gè) peer 都會(huì)返回給你離這個(gè) randomID 最近的 peers ,也就是 CloserPeers 來(lái)填充你本地的 k 桶,這樣的動(dòng)作默認(rèn)是 5 分鐘執(zhí)行一次的,所以 k 桶很快就會(huì)被填滿。

  • func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ pstore.PeerInfo, err error)

這個(gè)方法會(huì)現(xiàn)在本地找:dht.FindLocal(id)
本地沒(méi)有就在路由表里找

peers := dht.routingTable.NearestPeers(kb.ConvertPeerID(id), AlphaValue)
這個(gè)方法的邏輯是,AlphaValue = 3 ,表示最低要確保發(fā)送到 3 個(gè) peers 上
目標(biāo) id 會(huì)被 xor 出一個(gè)桶,這個(gè)桶如果是空的,或者桶里不足 3 個(gè) peer
就會(huì)從臨近的桶里返回 peer ,如果桶里 peer 超過(guò) AlphaValue 則按桶里的總數(shù)來(lái)
奇怪的是最后又只節(jié)選了 3 個(gè)距離最近的 peer ,為什么呢?

func (rt *RoutingTable) NearestPeers(id ID, count int) []peer.ID {
    cpl := commonPrefixLen(id, rt.local)

    rt.tabLock.RLock()

    // Get bucket at cpl index or last bucket
    var bucket *Bucket
    if cpl >= len(rt.Buckets) {
        cpl = len(rt.Buckets) - 1
    }
    bucket = rt.Buckets[cpl]

    peerArr := make(peerSorterArr, 0, count)
    //TODO 如果這里的數(shù)據(jù)比 peerArr 多,會(huì)重新 make peerArr
    peerArr = copyPeersFromList(id, peerArr, bucket.list)
    if len(peerArr) < count {
        // In the case of an unusual split, one bucket may be short or empty.
        // if this happens, search both surrounding buckets for nearby peers
        if cpl > 0 {
            plist := rt.Buckets[cpl-1].list
            peerArr = copyPeersFromList(id, peerArr, plist)
        }

        if cpl < len(rt.Buckets)-1 {
            plist := rt.Buckets[cpl+1].list
            peerArr = copyPeersFromList(id, peerArr, plist)
        }
    }
    rt.tabLock.RUnlock()

    // Sort by distance to local peer
    sort.Sort(peerArr)
    // TODO 這里為什么又截?cái)嗔四兀恐蝗×?個(gè)離的最近的,什么目的?
    if count < len(peerArr) {
        peerArr = peerArr[:count]
    }

    out := make([]peer.ID, 0, len(peerArr))
    for _, p := range peerArr {
        out = append(out, p.p)
    }

    return out
}

在 query 的同時(shí)會(huì)刷新每個(gè)有響應(yīng)的 peer 的 ttl ,但是至多只向 3 個(gè) peer 發(fā)送 query 請(qǐng)求

請(qǐng)求的返回的 closer 貌似沒(méi)處理,為什么?

因?yàn)樾枰托膶ふ遥戳艘蝗K于在
func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)
中找到了答案,這個(gè)實(shí)在是太隱蔽了,對(duì)代碼的作者表示譴責(zé)

以下是 FindPeer 中的代碼片段,

    parent := ctx
    query := dht.newQuery(string(id), func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
        notif.PublishQueryEvent(parent, &notif.QueryEvent{
            Type: notif.SendingQuery,
            ID:   p,
        })

        pmes, err := dht.findPeerSingle(ctx, p, id)
        if err != nil {
            return nil, err
        }

        closer := pmes.GetCloserPeers()
        clpeerInfos := pb.PBPeersToPeerInfos(closer)

        // see if we got the peer here
        for _, npi := range clpeerInfos {
            if npi.ID == id {
                // add by liangc ---->
                notif.PublishQueryEvent(parent, &notif.QueryEvent{
                    Type: notif.PeerFindby,
                    ID:   p,
                })
                // add by liangc <----
                return &dhtQueryResult{
                    peer:    npi,
                    success: true,
                }, nil
            }
        }

        notif.PublishQueryEvent(parent, &notif.QueryEvent{
            Type:      notif.PeerResponse,
            ID:        p,
            Responses: clpeerInfos,
        })

        return &dhtQueryResult{closerPeers: clpeerInfos}, nil
    })

    // run it!
    result, err := query.Run(ctx, peers)

在上面的最后一句 query.Run 中,包含了如下調(diào)用軌跡

  • func (q dhtQuery) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
    • func (r dhtQueryRunner) Run(ctx context.Context, peers []peer.ID) (dhtQueryResult, error)
      • func (r *dhtQueryRunner) spawnWorkers(proc process.Process)
        • func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID)

在 dhtQueryRunner.queryPeer中處理了closerPeers

    } else if len(res.closerPeers) > 0 {
        log.Debugf("PEERS CLOSER -- worker for: %v (%d closer peers)", p, len(res.closerPeers))
        for _, next := range res.closerPeers {
            if next.ID == r.query.dht.self { // don't add self.
                log.Debugf("PEERS CLOSER -- worker for: %v found self", p)
                continue
            }

            // add their addresses to the dialer's peerstore
            r.query.dht.peerstore.AddAddrs(next.ID, next.Addrs, pstore.TempAddrTTL)
            r.addPeerToQuery(next.ID)
            log.Debugf("PEERS CLOSER -- worker for: %v added %v (%v)", p, next.ID, next.Addrs)
        }
    } else {
        log.Debugf("QUERY worker for: %v - not found, and no closer peers.", p)
    }

可以看到,每個(gè) closer 都被放到了 peerstore 中,同時(shí)也放到了 peersSeen 中,
其實(shí)這里就是在迭代 peersSeen 來(lái)執(zhí)行 run 方法,peersSeen 是線程安全的 set 類(lèi)型
這就保證雖然所有的 run 都是在線程中執(zhí)行的,但是主程序輪訓(xùn)時(shí)并不會(huì)錯(cuò)過(guò) set 中的新成員
同時(shí) set 還可以保證重復(fù)的 peer 不會(huì)被重復(fù)添加,這就可以斷定 set 不會(huì)無(wú)休止增加

其實(shí) bootstrap 到此就沒(méi)什么可看的了

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容