Ethereum Source Code (3): The Node Discovery Protocol and Kademlia (KAD)

Ethereum protocol

3. P2P node discovery

(1) Distributed Hash Table (DHT)

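DHT stands for Distributed Hash Table, a distributed storage scheme. There is no central server: each client is responsible for routing within a small range and for storing a small share of the data, and together the clients provide addressing and storage for the whole DHT network. DHT grew out of the needs of P2P networks. In second-generation P2P file-sharing systems, locating nodes was very difficult and consumed a lot of network resources, which pushed third-generation systems to adopt DHT so that nodes and resources could be found quickly.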

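Like an ordinary hash table, a DHT supports fast lookups. It differs from an ordinary hash table in two ways: 1) a hash table is usually local, used for fast insertion and lookup on a single machine, whereas a DHT effectively spreads the hash table's buckets across different node computers; 2) adding or removing buckets in a hash table forces all the data to be rehashed, while a DHT tolerates a changing number of nodes, which can join or leave at will.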

In Ethereum, the DHT is based on the Kademlia (KAD) protocol.
Source: https://blog.csdn.net/lj900911/article/details/83861438

In a Kad network, every node is treated as a leaf of a binary tree, and each node's position is uniquely determined by the shortest unique prefix of its ID.

The ID is a 256-bit value obtained by hashing the node's 512-bit public key (with Keccak-256, as the lookup code below shows).

1. Mapping IDs onto the binary tree

How is a node mapped onto the binary tree?

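1) Write the key (the node ID) in binary and reduce it to its "shortest unique prefix";
2) Bit n of the binary string corresponds to level n of the tree, so the branches along a root-to-leaf path, joined together, spell out the ID's binary representation;
3) "1" means descend into the left subtree and "0" into the right subtree (the opposite convention works just as well);
4) The leaf reached by following these steps is the node that corresponds to the key.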
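The four steps can be sketched in Go on a toy scale. This is a hypothetical helper, not geth code: IDs are shrunk to 8 bits so the paths stay readable (real IDs are 256 bits), and `shortestUniquePrefix` and `treePath` are names made up for this example. A node's depth in the tree is one more than the longest prefix it shares with any other ID:

```go
package main

import "fmt"

// idBits is the toy ID width; real Ethereum node IDs have 256 bits.
const idBits = 8

// bitAt returns bit i of id, counting from the most significant bit (i = 0).
func bitAt(id uint8, i int) uint8 {
	return (id >> (idBits - 1 - i)) & 1
}

// commonPrefixLen returns how many leading bits a and b share.
func commonPrefixLen(a, b uint8) int {
	n := 0
	for n < idBits && bitAt(a, n) == bitAt(b, n) {
		n++
	}
	return n
}

// shortestUniquePrefix returns the shortest prefix of id (as a bit string)
// that no other ID in the set shares, i.e. the depth at which id becomes a
// leaf of the binary tree built from all IDs. IDs are assumed distinct.
func shortestUniquePrefix(id uint8, all []uint8) string {
	longest := 0
	for _, other := range all {
		if other == id {
			continue
		}
		if l := commonPrefixLen(id, other); l > longest {
			longest = l
		}
	}
	depth := longest + 1
	if depth > idBits {
		depth = idBits
	}
	s := ""
	for i := 0; i < depth; i++ {
		s += fmt.Sprint(bitAt(id, i))
	}
	return s
}

// treePath turns a prefix into moves, using "1 = left, 0 = right" (step 3).
func treePath(prefix string) string {
	path := ""
	for _, c := range prefix {
		if c == '1' {
			path += "L"
		} else {
			path += "R"
		}
	}
	return path
}

func main() {
	ids := []uint8{0b10110000, 0b10100000, 0b01000000, 0b00010000}
	for _, id := range ids {
		p := shortestUniquePrefix(id, ids)
		fmt.Printf("%08b prefix=%s path=%s\n", id, p, treePath(p))
	}
}
```

For these four sample IDs, 10110000 and 10100000 share three leading bits and therefore need four-bit prefixes, while 01000000 and 00010000 are already separated after two bits.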

In Ethereum, the core KAD logic is implemented in Discover/table.go. Kademlia defines four RPC types: PING, STORE, FINDNODE and FINDVALUE. Ethereum's KAD implements only PING and FINDNODE.

First, creating the Table:

func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error) {
    // Arguments: the transport t (provides the two KAD operations, ping and findnode), the node database db, and the bootstrap nodes.
    tab := &Table{
        net:        t,
        db:         db,
        refreshReq: make(chan chan struct{}),
        initDone:   make(chan struct{}),
        closeReq:   make(chan struct{}),
        closed:     make(chan struct{}),
        rand:       mrand.New(mrand.NewSource(0)),
        ips:        netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
    }
    // Load the bootstrap nodes as fallback nodes
    if err := tab.setFallbackNodes(bootnodes); err != nil {
        return nil, err
    }
    // Create a bucket object for every tab.buckets[i]
    for i := range tab.buckets {
        tab.buckets[i] = &bucket{
            ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
        }
    }
    // Seed tab.rand, which is used later to pick random nodes from the table (seedRand is shown below)
    tab.seedRand()
    // Load seed nodes from the database
    tab.loadSeedNodes()
    // Background goroutine that refreshes the table and handles shutdown
    go tab.loop()
    return tab, nil
}

func (tab *Table) seedRand() {
    var b [8]byte
    crand.Read(b[:])

    tab.mutex.Lock()
    tab.rand.Seed(int64(binary.BigEndian.Uint64(b[:])))
    tab.mutex.Unlock()
}

func (tab *Table) loadSeedNodes() {
    seeds := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
    seeds = append(seeds, tab.nursery...)
    for i := range seeds {
        seed := seeds[i]
        age := log.Lazy{Fn: func() interface{} { return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP())) }}
        log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)
        // Add each seed node to the table via addSeenNode
        tab.addSeenNode(seed)
    }
}

Next, tab.loop():


// loop schedules refresh, revalidate runs and coordinates shutdown.
func (tab *Table) loop() {
    var (
        revalidate     = time.NewTimer(tab.nextRevalidateTime())
        refresh        = time.NewTicker(refreshInterval)
        copyNodes      = time.NewTicker(copyNodesInterval)
        refreshDone    = make(chan struct{})           // where doRefresh reports completion
        revalidateDone chan struct{}                   // where doRevalidate reports completion
        waiting        = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
    )
    defer refresh.Stop()
    defer revalidate.Stop()
    defer copyNodes.Stop()

    // Start initial refresh.
    go tab.doRefresh(refreshDone)

loop:
    for {
        select {
            // Periodic refresh, every refreshInterval
        case <-refresh.C:
            tab.seedRand()
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
            // Also refresh when a refresh is explicitly requested
        case req := <-tab.refreshReq:
            waiting = append(waiting, req)
            if refreshDone == nil {
                refreshDone = make(chan struct{})
                go tab.doRefresh(refreshDone)
            }
            // Refresh finished: release every waiting caller by closing its channel
        case <-refreshDone:
            for _, ch := range waiting {
                close(ch)
            }
            waiting, refreshDone = nil, nil
            // Check whether the last node of a (random) bucket is still alive
        case <-revalidate.C:
            revalidateDone = make(chan struct{})
            go tab.doRevalidate(revalidateDone)
            // Revalidation done: schedule the next one
        case <-revalidateDone:
            revalidate.Reset(tab.nextRevalidateTime())
            revalidateDone = nil
            // Store nodes that have stayed in the table longer than seedMinTableTime into the db
        case <-copyNodes.C:
            go tab.copyLiveNodes()
            // Shutdown requested: leave the loop
        case <-tab.closeReq:
            break loop
        }
    }

    if refreshDone != nil {
        <-refreshDone
    }
    for _, ch := range waiting {
        close(ch)
    }
    if revalidateDone != nil {
        <-revalidateDone
    }
    close(tab.closed)
}

Now doRefresh, the function that performs the refresh:

func (tab *Table) doRefresh(done chan struct{}) {
    defer close(done)

    // Load nodes from the database and insert
    // them. This should yield a few previously seen nodes that are
    // (hopefully) still alive.
    tab.loadSeedNodes()

    // Run self lookup to discover new neighbor nodes.
    // We can only do this if we have a secp256k1 identity.
    var key ecdsa.PublicKey
    if err := tab.self().Load((*enode.Secp256k1)(&key)); err == nil {
        tab.lookup(encodePubkey(&key), false)
    }

    // The Kademlia paper specifies that the bucket refresh should
    // perform a lookup in the least recently used bucket. We cannot
    // adhere to this because the findnode target is a 512bit value
    // (not hash-sized) and it is not easily possible to generate a
    // sha3 preimage that falls into a chosen bucket.
    // We perform a few lookups with a random target instead.
    for i := 0; i < 3; i++ {
        var target encPubkey
        crand.Read(target[:])
        tab.lookup(target, false)
    }
}

主要是用tab.lookup()進(jìn)行節(jié)點(diǎn)查找的??匆幌耹ookup函數(shù)。

func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
    var (
        target         = enode.ID(crypto.Keccak256Hash(targetKey[:]))
        asked          = make(map[enode.ID]bool)
        seen           = make(map[enode.ID]bool)
        reply          = make(chan []*node, alpha)
        pendingQueries = 0
        result         *nodesByDistance
    )
    // don't query further if we hit ourself.
    // unlikely to happen often in practice.
    asked[tab.self().ID()] = true

    for {
        tab.mutex.Lock()
        // generate initial result set
        // Returns the nodes closest to target, at most bucketSize (16) of them
        result = tab.closest(target, bucketSize)
        tab.mutex.Unlock()
        if len(result.entries) > 0 || !refreshIfEmpty {
            break
        }
        // The result set is empty, all nodes were dropped, refresh.
        // We actually wait for the refresh to complete here. The very
        // first query will hit this case and run the bootstrapping
        // logic.
        <-tab.refresh()
        refreshIfEmpty = false
    }

    for {
        // ask the alpha closest nodes that we haven't asked yet
        // Send findnode to up to alpha (3) not-yet-asked nodes in the result set, asking each for its nodes closest to target
        for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
            n := result.entries[i]
            if !asked[n.ID()] {
                asked[n.ID()] = true
                pendingQueries++
                // Run findnode in its own goroutine
                go tab.findnode(n, targetKey, reply)
            }
        }
        // Nothing pending: every node in the result set has already been asked
        if pendingQueries == 0 {
            // we have asked all closest nodes, stop the search
            break
        }
        select {
            // Merge the returned nodes into the result set
        case nodes := <-reply:
            for _, n := range nodes {
                if n != nil && !seen[n.ID()] {
                    seen[n.ID()] = true
                    // Insert ordered by distance to target, keeping at most bucketSize
                    result.push(n, bucketSize)
                }
            }
            // The table is shutting down
        case <-tab.closeReq:
            return nil // shutdown, no need to continue.
        }
        pendingQueries--
    }
    return result.entries
}

Now findnode:

func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
    // Number of findnode failures recorded so far for this node
    fails := tab.db.FindFails(n.ID(), n.IP())
    // Send the findnode request over UDP
    r, err := tab.net.findnode(n.ID(), n.addr(), targetKey)
    if err == errClosed {
        // Avoid recording failures on shutdown.
        reply <- nil
        return
    } else if len(r) == 0 {
        fails++
        tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
        log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
        // Failure limit reached: remove the node from the table
        if fails >= maxFindnodeFailures {
            log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
            tab.delete(n)
        }
    } else if fails > 0 {
        tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
    }

    // Grab as many nodes as possible. Some of them might not be alive anymore, but we'll
    // just remove those again during revalidation.

    // Add as many of the returned nodes to the table as possible
    for _, n := range r {
        tab.addSeenNode(n)
    }
    reply <- r
}
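findnode's failure bookkeeping amounts to a small counter per node: an empty reply adds one, a non-empty reply forgives one, and reaching the limit drops the node from the table. Below is a sketch with in-memory maps instead of the node database; `failTracker` and `record` are hypothetical names, and the threshold is a parameter rather than geth's actual `maxFindnodeFailures` value:

```go
package main

import "fmt"

// failTracker replaces the db.FindFails/UpdateFindFails pair and the
// table membership used by Table.findnode with plain maps.
type failTracker struct {
	max   int             // drop threshold (maxFindnodeFailures in geth)
	fails map[string]int  // node ID -> recorded failure count
	table map[string]bool // node ID -> currently in the table
}

// record applies findnode's bookkeeping after one request: an empty
// reply counts as a failure, a non-empty one forgives one failure.
// It reports whether the node was dropped from the table.
func (t *failTracker) record(id string, replyLen int) bool {
	f := t.fails[id]
	if replyLen == 0 {
		f++
		t.fails[id] = f
		if f >= t.max {
			delete(t.table, id)
			return true
		}
	} else if f > 0 {
		t.fails[id] = f - 1
	}
	return false
}

func main() {
	tr := &failTracker{max: 5, fails: map[string]int{}, table: map[string]bool{"n1": true}}
	for i := 0; i < 5; i++ {
		dropped := tr.record("n1", 0)
		fmt.Printf("empty reply %d: fails=%d dropped=%v\n", i+1, tr.fails["n1"], dropped)
	}
}
```

Decrementing by one on success, instead of resetting to zero, means a node that fails often but not always still drifts toward the limit.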

Finally, table.addSeenNode():

// addSeenNode adds a node which may or may not be live to the end of a bucket. If the
// bucket has space available, adding the node succeeds immediately. Otherwise, the node is
// added to the replacements list.
//
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
    // Never add ourselves
    if n.ID() == tab.self().ID() {
        return
    }

    tab.mutex.Lock()
    defer tab.mutex.Unlock()

    
    b := tab.bucket(n.ID())
    // Skip n if it is already in its bucket
    if contains(b.entries, n.ID()) {
        // Already in bucket, don't add.
        return
    }
    // Bucket is full: keep n as a replacement candidate instead
    if len(b.entries) >= bucketSize {
        // Bucket full, maybe add as replacement.
        tab.addReplacement(b, n)
        return
    }
    if !tab.addIP(b, n.IP()) {
        // Can't add: IP limit reached.
        return
    }
    // Add to end of bucket:
    // Append n to the end of the bucket and remove it from the replacement list
    b.entries = append(b.entries, n)
    b.replacements = deleteNode(b.replacements, n)
    n.addedAt = time.Now()
    if tab.nodeAddedHook != nil {
        tab.nodeAddedHook(n)
    }
}
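addSeenNode's bucket logic can be tried out on a toy table. In this sketch, which is not geth code, IDs are 16 bits and there is one bucket per log distance (geth's `tab.bucket` maps 256-bit distances onto a smaller bucket array), the IP-limit checks are left out, and `maxReplacements = 10` is an assumed cap. New live entries go to the end of the bucket; when the bucket is full, the node becomes a replacement candidate at the front of the replacement list and the oldest candidate falls off the end:

```go
package main

import (
	"fmt"
	"math/bits"
)

const (
	bucketSize      = 16 // max live entries per bucket
	maxReplacements = 10 // assumed cap on the replacement list
)

// bucket holds live entries (oldest first) and replacement candidates
// (newest first).
type bucket struct {
	entries      []uint16
	replacements []uint16
}

// table is a toy routing table: bucket d holds IDs at log distance d.
type table struct {
	self    uint16
	buckets [17]bucket
}

func contains(list []uint16, id uint16) bool {
	for _, x := range list {
		if x == id {
			return true
		}
	}
	return false
}

func (t *table) bucketFor(id uint16) *bucket {
	return &t.buckets[bits.Len16(t.self^id)]
}

// addSeenNode follows the control flow of Table.addSeenNode minus IP limits.
func (t *table) addSeenNode(id uint16) {
	if id == t.self {
		return
	}
	b := t.bucketFor(id)
	if contains(b.entries, id) {
		return
	}
	if len(b.entries) >= bucketSize {
		if !contains(b.replacements, id) {
			// Newest candidate goes to the front; the oldest falls off the end.
			b.replacements = append([]uint16{id}, b.replacements...)
			if len(b.replacements) > maxReplacements {
				b.replacements = b.replacements[:maxReplacements]
			}
		}
		return
	}
	b.entries = append(b.entries, id)
	for i, x := range b.replacements { // id is live now, not a candidate
		if x == id {
			b.replacements = append(b.replacements[:i], b.replacements[i+1:]...)
			break
		}
	}
}

func main() {
	tb := &table{self: 0x1234}
	for id := 0; id < 1<<16; id += 97 {
		tb.addSeenNode(uint16(id))
	}
	for d := 16; d >= 12; d-- {
		b := tb.buckets[d]
		fmt.Printf("logdist %d: %d entries, %d replacements\n", d, len(b.entries), len(b.replacements))
	}
}
```

A full bucket never evicts an existing entry just because a new node shows up; newcomers only wait as replacements, in line with Kademlia's preference for long-lived nodes. In geth, revalidation is what draws on the replacement list when it finds a dead entry.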

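That completes the KAD logic. Some of the detailed code was not covered, but the overall flow is as described above.
The actual PING/PONG and FINDNODE requests are implemented in Discover/udp.go; I'll look at those another time.
This post only covers KAD node discovery.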
