[以太坊源碼分析][p2p網(wǎng)絡(luò)05]:底層節(jié)點(diǎn)如何與上層節(jié)點(diǎn)聯(lián)系


對(duì)于以太坊的p2p網(wǎng)絡(luò),我覺得,分為底層p2p網(wǎng)絡(luò)的構(gòu)建,以及上層eth服務(wù)的實(shí)現(xiàn)。在介紹以太坊上層服務(wù)之前,需要先來看一下底層網(wǎng)絡(luò)是怎么跟上層網(wǎng)絡(luò)聯(lián)系在一起的。

0.索引

01.ProtocolManager 協(xié)議管理
02.新建一個(gè) ProtocolManager
03.建立聯(lián)系
04.使用 Run 方法
05.底層的peer結(jié)構(gòu)
06.總結(jié)

1.ProtocolManager 協(xié)議管理

ProtocolManagereth/handle.go中的核心結(jié)構(gòu)體,用來管理節(jié)點(diǎn)之間的通信。

type ProtocolManager struct {
    networkID uint64
    fastSync  uint32 
    acceptTxs uint32 

    txpool      txPool
    blockchain  *core.BlockChain
    chainconfig *params.ChainConfig
    maxPeers    int

    downloader *downloader.Downloader
    fetcher    *fetcher.Fetcher
    peers      *peerSet

    SubProtocols []p2p.Protocol

    eventMux      *event.TypeMux
    txsCh         chan core.NewTxsEvent
    txsSub        event.Subscription
    minedBlockSub *event.TypeMuxSubscription

    // fetcher, syncer, txsyncLoop 的通道
    newPeerCh   chan *peer
    txsyncCh    chan *txsync
    quitSync    chan struct{}
    noMorePeers chan struct{}

    wg sync.WaitGroup
}

ProtocolManager結(jié)構(gòu)體中包含了

  • networkID網(wǎng)絡(luò)id。
  • fastSync快速同步的標(biāo)志, acceptTxs接收交易方式的標(biāo)志。
  • txpool交易池,blockchain區(qū)塊鏈,chainconfig區(qū)塊鏈配置,maxpeers最大節(jié)點(diǎn)數(shù)。
  • downloader下載器,fetcher提取器,peers相鄰節(jié)點(diǎn)表。
  • SubProtocols子協(xié)議列表。(與底層節(jié)點(diǎn)相關(guān)。)
  • 以及需要用到的各種通道和同步鎖。

2.新建一個(gè) ProtocolManager

底層節(jié)點(diǎn)與上層節(jié)點(diǎn)的聯(lián)系,就在新建一個(gè)ProtocolManager的方法里。(第3個(gè)步驟,再往下會(huì)做具體說明。)

func NewProtocolManager(
    config *params.ChainConfig, 
    mode downloader.SyncMode, 
    networkID uint64, 
    mux *event.TypeMux, 
    txpool txPool,
    engine consensus.Engine, 
    blockchain *core.BlockChain, 
    chaindb ethdb.Database
) (*ProtocolManager, error) 
  • 1.初始化基礎(chǔ)字段。
    manager := &ProtocolManager{networkID:   networkID,
      eventMux:    mux,
      txpool:      txpool,
      blockchain:  blockchain,
      chainconfig: config,
      peers:       newPeerSet(),
      newPeerCh:   make(chan *peer),
      noMorePeers: make(chan struct{}),
      txsyncCh:    make(chan *txsync),
      quitSync:    make(chan struct{}),}
    
  • 2.確認(rèn)是否快速同步模式。
    if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {...}
    
  • 3.每個(gè)實(shí)現(xiàn)的版本添加子協(xié)議。也就是上層服務(wù)給予底層p2p網(wǎng)絡(luò)調(diào)用上層服務(wù)的函數(shù)入口。(在這一步建立的聯(lián)系。)
    manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
      for i, version := range ProtocolVersions {...}
    
  • 4.新建一個(gè)下載器downloader,用于下載區(qū)塊。構(gòu)建不同的同步機(jī)制,mode為同步機(jī)制類型。
    manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
    
  • 5.validator引用驗(yàn)證區(qū)塊頭的方法。
    validator := func(header *types.Header) error {
          return engine.VerifyHeader(blockchain, header, true)
      }
    
  • 6.heighter引用獲取區(qū)塊高度的方法。
    heighter := func() uint64 {
          return blockchain.CurrentBlock().NumberU64()
      }
    
  • 7.inserter引用在區(qū)塊鏈上插入?yún)^(qū)塊的方法。
    inserter := func(blocks types.Blocks) (int, error) {
          ...
          return manager.blockchain.InsertChain(blocks)
      }
    
  • 8.新建一個(gè)提取器fetcher,用于輔助同步區(qū)塊。
    manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
    

3.(添加子協(xié)議)建立聯(lián)系

添加子協(xié)議的具體步驟。

manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
    Name:    ProtocolName,
    Version: version,
    Length:  ProtocolLengths[i],
    Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
        peer := manager.newPeer(int(version), p, rw)
        select {
        case manager.newPeerCh <- peer:
            manager.wg.Add(1)
            defer manager.wg.Done()
            return manager.handle(peer)
        case <-manager.quitSync:
            return p2p.DiscQuitting
        }
    },
    NodeInfo: func() interface{} {
        return manager.NodeInfo()
    },
    PeerInfo: func(id enode.ID) interface{} {
        if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
            return p.Info()
        }
        return nil
    },
})

在新建一個(gè)p2p.Protocol對(duì)象的時(shí)候,

  • 傳入三個(gè)字段:Name協(xié)議名,Version協(xié)議版本,Length協(xié)議長度。
  • 新建三個(gè)遠(yuǎn)程節(jié)點(diǎn)的回調(diào)函數(shù):
    • Run執(zhí)行協(xié)議。
    • NodeInfo返回了本地節(jié)點(diǎn)的網(wǎng)絡(luò)id,區(qū)塊難度值,創(chuàng)世區(qū)塊哈希值,區(qū)塊鏈配置,當(dāng)前區(qū)塊哈希值。
    • PeerInfo遠(yuǎn)程節(jié)點(diǎn)的信息。

Run方法manager.newPeer方法使得底層的peer能創(chuàng)建一個(gè)上層的peer,并且自身包含在上層的peer里。創(chuàng)建了上層的peer后,調(diào)用了manager.handle(peer)方法開始處理遠(yuǎn)程節(jié)點(diǎn)發(fā)來的消息。

4.調(diào)用子協(xié)議的 Run 方法 (包含步驟5)

在發(fā)起TCP連接請(qǐng)求的那一篇里,提到了:
在節(jié)點(diǎn)協(xié)議握手成功之后,srv.addpeer的通道中加入與遠(yuǎn)程節(jié)點(diǎn)的連接。
這時(shí)候會(huì)觸發(fā) case c := <-srv.addpeer:。
(代碼在p2p/server.go中)

case c := <-srv.addpeer:
    // 對(duì)協(xié)議握手進(jìn)行一次檢查
    err := srv.protoHandshakeChecks(peers, inboundCount, c)
    if err == nil {
        // 協(xié)議握手完成,通過檢查。
        // 新建底層peer。
        p := newPeer(c, srv.Protocols)
        // 如果啟用了消息事件,請(qǐng)將peerFeed傳遞給peer
        if srv.EnableMsgEvents {
            p.events = &srv.peerFeed
        }
        name := truncateName(c.name)
        srv.log.Debug("Adding p2p peer", "name", name, "addr", c.fd.RemoteAddr(), "peers", len(peers)+1)
            // 啟動(dòng)一個(gè)單獨(dú)的協(xié)程,運(yùn)行節(jié)點(diǎn)。
            go srv.runPeer(p)
            // 接收請(qǐng)求的節(jié)點(diǎn)集合加入該節(jié)點(diǎn)。
            peers[c.node.ID()] = p
            if p.Inbound() {
                // 接入連接的數(shù)量加1.
                inboundCount++
            }
      }
  • 1.先對(duì)協(xié)議握手進(jìn)行檢查。
  • 2.協(xié)議握手檢查通過后,新建一個(gè)底層的peer。
  • 3.啟動(dòng)一個(gè)單獨(dú)的協(xié)程,來執(zhí)行這一個(gè)peer。go srv.runPeer(p)。先進(jìn)行節(jié)點(diǎn)添加這一事件的廣播,然后調(diào)用p2p/peer.gopeer對(duì)象的run方法。
    func (srv *Server) runPeer(p *Peer) {
      // 測(cè)試。
      ...
      // 廣播節(jié)點(diǎn)添加事件。
      srv.peerFeed.Send(&PeerEvent{
          Type: PeerEventTypeAdd,
          Peer: p.ID(),
      })
      // 執(zhí)行協(xié)議·
      remoteRequested, err := p.run()
      // 廣播節(jié)點(diǎn)下線
      srv.peerFeed.Send(&PeerEvent{
          Type:  PeerEventTypeDrop,
          Peer:  p.ID(),
          Error: err.Error(),
      })
      // 刪除節(jié)點(diǎn)
      srv.delpeer <- peerDrop{p, err, remoteRequested}
    }
    
  • 4.在peers里加入該peer,如果接入成功,接入數(shù)量加1。

5.底層的peer結(jié)構(gòu)以及run方法

(在p2p/peer.go中)
首先是底層peer結(jié)構(gòu)體。它代表一個(gè)遠(yuǎn)程節(jié)點(diǎn)的連接。包含了rw建立的TCP連接,running協(xié)議對(duì)應(yīng)的讀寫通道。

type Peer struct {
    rw      *conn                 // 建立的TCP連接
    running map[string]*protoRW   // 協(xié)議對(duì)應(yīng)的讀寫通道
    log     log.Logger            // 日志記錄
    created mclock.AbsTime

    wg       sync.WaitGroup
    protoErr chan error
    closed   chan struct{}
    disc     chan DiscReason

    // 接收消息發(fā)送/接收事件
    events *event.Feed
}

然后是底層peerrun方法,也就是一個(gè)底層節(jié)點(diǎn)會(huì)執(zhí)行的所有操作。

func (p *Peer) run() (remoteRequested bool, err error) {
    // 定義變量。
    ...
    // 啟動(dòng)了兩個(gè)單獨(dú)的協(xié)程,一個(gè)用于循環(huán)的讀取消息,一個(gè)用于循環(huán)發(fā)送ping消息,確保對(duì)方節(jié)點(diǎn)在線。
    p.wg.Add(2)
    go p.readLoop(readErr)
    go p.pingLoop()

    // 開啟所有協(xié)議。
    writeStart <- struct{}{}
    p.startProtocols(writeStart, writeErr)

    // 等待接收到錯(cuò)誤或者是斷開連接。
loop:
    ...
    // 關(guān)閉節(jié)點(diǎn)。
    close(p.closed)
    p.rw.close(reason)
    p.wg.Wait()
    return remoteRequested, err
}

    1. 啟動(dòng)了兩個(gè)單獨(dú)的協(xié)程,go p.readLoop(readErr)用于循環(huán)的讀取消息,go p.pingLoop()用于循環(huán)發(fā)送ping消息,確保對(duì)方節(jié)點(diǎn)在線。
  • 2.執(zhí)行節(jié)點(diǎn)包含的所有協(xié)議,即p.startProtocols(writeStart, writeErr)方法。在p.startProtocols方法中調(diào)用了底層網(wǎng)絡(luò)設(shè)置的回調(diào)函數(shù)Run(以下程序語句)。
    err := proto.Run(p, rw)
    

步驟4和步驟5一步一步的執(zhí)行和調(diào)用,最后使用了p2p.ProtocolRun方法。

6.總結(jié)

  • 1.底層peer與上層peer通過p2p.Protocol對(duì)象的Run方法聯(lián)系在一起。Run實(shí)現(xiàn)了新建上層節(jié)點(diǎn),以及與協(xié)議版本對(duì)應(yīng)的處理節(jié)點(diǎn)之間通信的消息的功能。
  • 2.關(guān)于底層p2p網(wǎng)絡(luò)啟動(dòng)了的協(xié)程的總結(jié):
    • Server服務(wù)啟動(dòng)了兩個(gè)協(xié)程:監(jiān)聽遠(yuǎn)程節(jié)點(diǎn)發(fā)來的TCP請(qǐng)求,發(fā)起TCP連接請(qǐng)求。
    • 底層peer運(yùn)行協(xié)議的時(shí)候,啟動(dòng)了n+2個(gè)協(xié)程:循環(huán)讀取消息,循環(huán)發(fā)送ping消息,以及n個(gè)協(xié)議對(duì)應(yīng)的處理方式的協(xié)程。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容