ethereum协议
3、P2P节点发现
(1)分布式哈希表(DHT)
DHT全称叫分布式哈希表(Distributed Hash Table),是一种分布式存储方法。在不需要服务器的情况下,每个客户端负责一个小范围的路由,并负责存储一小部分数据,从而实现整个DHT网络的寻址和存储。DHT技术的应用来源于p2p网络发展的需要。第二代p2p文件共享系统正是由于查找节点十分困难且耗费网络资源,才促使第三代系统引入了DHT技术,用以快速地查找节点以及资源。
分布式哈希表与哈希表的共同之处在于能够实现快速的查找。它与上面哈希表的不同在于:1)哈希表通常是本地的,用于在本地快速地插入和查找数据。而分布式哈希表相当于将哈希表中的bucket(桶)分散到不同的节点计算机中。2)哈希表增添、删除桶会导致所有的数据需要重新hash,但分布式哈希表支持动态的节点数目,节点可以随意地进入或退出。
在以太坊中,DHT使用的是KAD协议。
引自:https://blog.csdn.net/lj900911/article/details/83861438
在Kad网络中,所有节点都被当作一棵二叉树的叶子,并且每一个节点的位置都由其ID值的最短前缀唯一确定。
ID值是512位公钥经过Hash出来的256位地址。
1、如何将ID映射到二叉树
如何把节点映射到二叉树?
1)先把key(nodeID)以二进制的形式表示,按“最短唯一前缀”来处理;
2)二进制的第n位代表二叉树的第n层,这样一个子树的每个节点连起来就是完整的id二进制表示;
3)“1”代表进入左子树,“0”代表进入右子树(反过来也行);
4)按上面的步骤处理后得到的最后的叶子节点,就是该“key”对应的节点。
在以太坊中,KAD协议的核心逻辑在Discover/table.go中实现。KAD协议中,有四种RPC类型,包括PING、STORE、FINDNODE、FINDVALUE。以太坊的KAD只实现了PING和FindNode。
首先是新建Table
// newTable constructs a Table that uses t as its network transport, db as its
// node database and bootnodes as its fallback nodes.
//
// If setFallbackNodes rejects bootnodes, its error is returned and no Table is
// produced. Otherwise every bucket slot is populated, the random source is
// reseeded, seed nodes are loaded and the background loop goroutine is started
// before the Table is returned.
func newTable(t transport, db *enode.DB, bootnodes []*enode.Node) (*Table, error) {
	tab := &Table{
		net:        t,
		db:         db,
		refreshReq: make(chan chan struct{}),
		initDone:   make(chan struct{}),
		closeReq:   make(chan struct{}),
		closed:     make(chan struct{}),
		// Fixed seed for now; seedRand reseeds this source further down.
		rand: mrand.New(mrand.NewSource(0)),
		ips:  netutil.DistinctNetSet{Subnet: tableSubnet, Limit: tableIPLimit},
	}

	// Register the bootstrap nodes; give up if they are rejected.
	err := tab.setFallbackNodes(bootnodes)
	if err != nil {
		return nil, err
	}

	// Give every bucket slot its own bucket with a per-bucket IP limit set.
	for i := 0; i < len(tab.buckets); i++ {
		tab.buckets[i] = &bucket{
			ips: netutil.DistinctNetSet{Subnet: bucketSubnet, Limit: bucketIPLimit},
		}
	}

	// Reseed the random source, then pull in the seed nodes.
	tab.seedRand()
	tab.loadSeedNodes()

	// The loop goroutine schedules refreshes and handles shutdown.
	go tab.loop()
	return tab, nil
}
// seedRand reseeds tab.rand from eight bytes obtained through crand.Read,
// interpreted as a big-endian integer. The reseed itself runs while holding
// tab.mutex.
//
// NOTE(review): the result of crand.Read is not checked; on failure the
// buffer keeps whatever it held (zeroes unless partially filled).
func (tab *Table) seedRand() {
	var entropy [8]byte
	crand.Read(entropy[:])
	seed := int64(binary.BigEndian.Uint64(entropy[:]))

	tab.mutex.Lock()
	defer tab.mutex.Unlock()
	tab.rand.Seed(seed)
}
// loadSeedNodes queries the database for up to seedCount seeds no older than
// seedMaxAge, appends the nodes in tab.nursery, and passes each of them to
// addSeenNode. Every candidate is logged at trace level together with the
// lazily computed time since its last received pong.
func (tab *Table) loadSeedNodes() {
	candidates := wrapNodes(tab.db.QuerySeeds(seedCount, seedMaxAge))
	candidates = append(candidates, tab.nursery...)

	for _, seed := range candidates {
		seed := seed // private copy for the closure below

		// Only evaluated if the log line is actually rendered.
		sinceLastPong := func() interface{} {
			return time.Since(tab.db.LastPongReceived(seed.ID(), seed.IP()))
		}
		age := log.Lazy{Fn: sinceLastPong}
		log.Trace("Found seed node in database", "id", seed.ID(), "addr", seed.addr(), "age", age)

		// Insert the seed into the table.
		tab.addSeenNode(seed)
	}
}
看一下tab.loop()。
// loop schedules refresh, revalidate runs and coordinates shutdown.
//
// It runs until tab.closeReq fires. At most one doRefresh is started from
// here at a time: refreshDone is non-nil exactly while a refresh is in
// flight. Callers queued in waiting (initially tab.initDone) are released by
// closing their channel when the current refresh completes or when the loop
// shuts down. On exit it waits for an in-flight refresh and revalidation and
// then closes tab.closed.
func (tab *Table) loop() {
var (
revalidate = time.NewTimer(tab.nextRevalidateTime())
refresh = time.NewTicker(refreshInterval)
copyNodes = time.NewTicker(copyNodesInterval)
refreshDone = make(chan struct{}) // where doRefresh reports completion
revalidateDone chan struct{} // where doRevalidate reports completion
waiting = []chan struct{}{tab.initDone} // holds waiting callers while doRefresh runs
)
defer refresh.Stop()
defer revalidate.Stop()
defer copyNodes.Stop()
// Start initial refresh.
go tab.doRefresh(refreshDone)
loop:
for {
select {
// Periodic refresh (every refreshInterval): reseed the random source and
// start a refresh unless one is already running.
case <-refresh.C:
tab.seedRand()
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
// Explicit refresh request: queue the requester's channel and start a
// refresh unless one is already running.
case req := <-tab.refreshReq:
waiting = append(waiting, req)
if refreshDone == nil {
refreshDone = make(chan struct{})
go tab.doRefresh(refreshDone)
}
// Refresh finished: release every waiting caller by closing its channel,
// then mark that no refresh is in flight.
case <-refreshDone:
for _, ch := range waiting {
close(ch)
}
waiting, refreshDone = nil, nil
// Revalidation timer fired: run doRevalidate in the background.
// NOTE(review): the original note says this checks whether the last node
// of a bucket is still alive — confirm in doRevalidate.
case <-revalidate.C:
revalidateDone = make(chan struct{})
go tab.doRevalidate(revalidateDone)
// Revalidation finished: re-arm the timer for the next run.
case <-revalidateDone:
revalidate.Reset(tab.nextRevalidateTime())
revalidateDone = nil
// Periodic copyLiveNodes run in the background.
// NOTE(review): the original note says nodes that have been live longer
// than seedMinTableTime are stored in the db — confirm in copyLiveNodes.
case <-copyNodes.C:
go tab.copyLiveNodes()
// Close request received: leave the loop.
case <-tab.closeReq:
break loop
}
}
// Shutdown: wait for an in-flight refresh before releasing waiters.
if refreshDone != nil {
<-refreshDone
}
for _, ch := range waiting {
close(ch)
}
// Wait for an in-flight revalidation, then signal that the loop has ended.
if revalidateDone != nil {
<-revalidateDone
}
close(tab.closed)
}
看一下负责刷新的doRefresh函数。
// doRefresh performs one refresh round and closes done when it returns.
//
// The round consists of reloading seed nodes, a lookup of the local node's
// own public key (only when a secp256k1 key can be loaded from the local
// record), and three lookups of random targets.
func (tab *Table) doRefresh(done chan struct{}) {
	defer close(done)

	// Re-insert nodes known from the database. This should surface a few
	// previously seen nodes that are, with luck, still alive.
	tab.loadSeedNodes()

	// Self lookup to find new neighbours. It requires a secp256k1
	// identity, so it is skipped when loading the key fails.
	var key ecdsa.PublicKey
	err := tab.self().Load((*enode.Secp256k1)(&key))
	if err == nil {
		tab.lookup(encodePubkey(&key), false)
	}

	// According to the Kademlia paper, a bucket refresh should look up a
	// target in the least recently used bucket. That is not practical
	// here: the findnode target is a 512bit value rather than a hash-sized
	// one, and producing a sha3 preimage that lands in a chosen bucket is
	// not easily possible. A handful of random targets is used instead.
	const randomLookups = 3
	for remaining := randomLookups; remaining > 0; remaining-- {
		var target encPubkey
		crand.Read(target[:])
		tab.lookup(target, false)
	}
}
主要是用tab.lookup()进行节点查找的。看一下lookup函数。
// lookup searches the network for nodes close to targetKey.
//
// The lookup target is the Keccak256 hash of targetKey. Starting from the
// closest nodes in the local table, it repeatedly sends findnode queries
// (fewer than alpha+1 in flight at any time) to nodes it has not asked yet
// and merges previously unseen nodes from the replies into the result set.
// It stops when no queries are pending.
//
// If refreshIfEmpty is true and the local table yields no entries, it
// triggers a table refresh, waits for it, and retries once.
//
// It returns the entries of the result set, or nil if tab.closeReq fires
// while waiting for a reply.
func (tab *Table) lookup(targetKey encPubkey, refreshIfEmpty bool) []*node {
var (
target = enode.ID(crypto.Keccak256Hash(targetKey[:]))
asked = make(map[enode.ID]bool)
seen = make(map[enode.ID]bool)
reply = make(chan []*node, alpha)
pendingQueries = 0
result *nodesByDistance
)
// don't query further if we hit ourself.
// unlikely to happen often in practice.
asked[tab.self().ID()] = true
for {
tab.mutex.Lock()
// generate initial result set
// Ask the table for the nodes closest to target, at most bucketSize of them.
result = tab.closest(target, bucketSize)
tab.mutex.Unlock()
if len(result.entries) > 0 || !refreshIfEmpty {
break
}
// The result set is empty, all nodes were dropped, refresh.
// We actually wait for the refresh to complete here. The very
// first query will hit this case and run the bootstrapping
// logic.
<-tab.refresh()
refreshIfEmpty = false
}
for {
// ask the alpha closest nodes that we haven't asked yet
// Walk the result set in order and start findnode queries for nodes not
// yet asked, until alpha queries are pending.
for i := 0; i < len(result.entries) && pendingQueries < alpha; i++ {
n := result.entries[i]
if !asked[n.ID()] {
asked[n.ID()] = true
pendingQueries++
// Run the findnode query in its own goroutine; it reports on reply.
go tab.findnode(n, targetKey, reply)
}
}
// Nothing in flight means every node in the result set has been asked.
if pendingQueries == 0 {
// we have asked all closest nodes, stop the search
break
}
select {
// A query finished: merge nodes not seen before into the result set.
case nodes := <-reply:
for _, n := range nodes {
if n != nil && !seen[n.ID()] {
seen[n.ID()] = true
// NOTE(review): push presumably keeps result ordered by distance to
// target and capped at bucketSize — confirm in nodesByDistance.push.
result.push(n, bucketSize)
}
}
// Close request received.
case <-tab.closeReq:
return nil // shutdown, no need to continue.
}
pendingQueries--
}
return result.entries
}
看一下findnode
// findnode sends a findnode request for targetKey to n and delivers the
// outcome on reply. Exactly one value is sent on reply per call: nil when the
// transport reports errClosed, otherwise the (possibly empty) list of
// returned nodes.
//
// The per-node failure counter in the database is updated along the way: an
// empty response increments it, and once it reaches maxFindnodeFailures the
// node is deleted from the table; a non-empty response decrements a non-zero
// counter by one. Every returned node is also passed to addSeenNode.
func (tab *Table) findnode(n *node, targetKey encPubkey, reply chan<- []*node) {
	// Failure count recorded so far for this node.
	fails := tab.db.FindFails(n.ID(), n.IP())

	// Issue the findnode request through the transport.
	neighbors, err := tab.net.findnode(n.ID(), n.addr(), targetKey)

	switch {
	case err == errClosed:
		// Avoid recording failures on shutdown.
		reply <- nil
		return

	case len(neighbors) == 0:
		fails++
		tab.db.UpdateFindFails(n.ID(), n.IP(), fails)
		log.Trace("Findnode failed", "id", n.ID(), "failcount", fails, "err", err)
		// Too many failures: remove the node from the table.
		if fails >= maxFindnodeFailures {
			log.Trace("Too many findnode failures, dropping", "id", n.ID(), "failcount", fails)
			tab.delete(n)
		}

	case fails > 0:
		// A useful answer pays back one earlier failure.
		tab.db.UpdateFindFails(n.ID(), n.IP(), fails-1)
	}

	// Take in as many nodes as possible. Some may no longer be alive, but
	// revalidation will remove those again.
	for _, found := range neighbors {
		tab.addSeenNode(found)
	}
	reply <- neighbors
}
再看看table.addSeenNode()
// addSeenNode inserts a node, which may or may not be live, at the end of its
// bucket. The node is ignored when it is the local node, when the bucket
// already contains it, or when addIP refuses its IP. When the bucket already
// holds bucketSize entries, the node is offered to addReplacement instead.
//
// On a successful insert the node is removed from the bucket's replacement
// list, its addedAt time is set, and nodeAddedHook is invoked if set.
//
// The caller must not hold tab.mutex.
func (tab *Table) addSeenNode(n *node) {
	// Never add ourselves.
	if n.ID() == tab.self().ID() {
		return
	}

	tab.mutex.Lock()
	defer tab.mutex.Unlock()

	bkt := tab.bucket(n.ID())
	switch {
	case contains(bkt.entries, n.ID()):
		// Already in bucket, don't add.
		return
	case len(bkt.entries) >= bucketSize:
		// Bucket full, maybe add as replacement.
		tab.addReplacement(bkt, n)
		return
	case !tab.addIP(bkt, n.IP()):
		// Can't add: IP limit reached.
		return
	}

	// Append to the end of the bucket and make sure the node is no longer
	// listed as a replacement.
	bkt.entries = append(bkt.entries, n)
	bkt.replacements = deleteNode(bkt.replacements, n)
	n.addedAt = time.Now()

	if hook := tab.nodeAddedHook; hook != nil {
		hook(n)
	}
}
至此KAD的逻辑处理已经完成,还有部分细节代码没有看,但总体流程如上。
具体的PING/PONG、FINDNODE请求在Discover/udp.go中,有空再看。
这里只分析KAD节点发现。