[以太坊源碼分析][p2p網(wǎng)絡(luò)03]:發(fā)起TCP連接請求


上一節(jié)介紹的是底層p2p網(wǎng)絡(luò)開啟后,監(jiān)聽別的遠(yuǎn)程節(jié)點(diǎn)發(fā)送來的TCP連接請求。這一節(jié)是個續(xù)集,要介紹的是本地節(jié)點(diǎn)如何向遠(yuǎn)程節(jié)點(diǎn)發(fā)起TCP連接請求。

這一次,是我們打電話call別人。但是這一次有點(diǎn)不同,我們是以做任務(wù)的形式向別人發(fā)起撥號,而且每次做很多個任務(wù)。每一個任務(wù)里都包含了連接類型遠(yuǎn)程節(jié)點(diǎn)信息。

究竟是怎么回事,請往下看。

0.索引

01.從Server服務(wù)開始
02.初始化撥號狀態(tài),以及創(chuàng)建任務(wù)
03.計(jì)劃任務(wù)和開啟任務(wù)
04.Do 執(zhí)行任務(wù)
05.總結(jié)

1.從Server服務(wù)開始

Server服務(wù)啟動,也開始了撥號

在上圖中,主要是看第3個步驟,初始化撥號狀態(tài),和第6個步驟,開始撥號。(這里提一下,監(jiān)聽連接發(fā)起連接是兩個單獨(dú)的協(xié)程,所以并不是監(jiān)聽后再發(fā)起連接。)

2.初始化撥號狀態(tài),以及創(chuàng)建任務(wù)

dialstate 撥號狀態(tài)
dialstatep2p/dial.go中的核心結(jié)構(gòu)體,管理撥號(發(fā)起TCP連接請求)和查找節(jié)點(diǎn)的操作。
通過newDialState來新建它。關(guān)于dialstate字段的含義在下方的注釋中。

func newDialState(static []*enode.Node, bootnodes []*enode.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
    s := &dialstate{
        maxDynDials: maxdyn,                                    // 最大的撥號任務(wù)數(shù)量
        ntab:        ntab,                                      // k桶                                        
        netrestrict: netrestrict,                               // ip網(wǎng)絡(luò)的列表
        static:      make(map[enode.ID]*dialTask),              // 靜態(tài)節(jié)點(diǎn)
        dialing:     make(map[enode.ID]connFlag),               // 撥號中,connFlag有4種撥號類型
        bootnodes:   make([]*enode.Node, len(bootnodes)),       // 初始引導(dǎo)節(jié)點(diǎn)
        randomNodes: make([]*enode.Node, maxdyn/2),             // 在k桶種隨機(jī)查找節(jié)點(diǎn),數(shù)量為最大撥號任務(wù)數(shù)量的二分之一
        hist:        new(dialHistory),                          // 記錄最近的撥號
    }
    // 加入初始引導(dǎo)節(jié)點(diǎn)
    copy(s.bootnodes, bootnodes)
    // 加入靜態(tài)節(jié)點(diǎn)
    for _, n := range static {
        s.addStatic(n)
    }
    return s
}

其中加入了兩種節(jié)點(diǎn),bootnodesstaticbootnodes是初始引導(dǎo)節(jié)點(diǎn),在節(jié)點(diǎn)沒有接收到任何節(jié)點(diǎn)的連接請求,也沒有節(jié)點(diǎn)可以給我們鄰居節(jié)點(diǎn)的時候,就去連接bootnodes,它硬編碼在了以太坊的源碼中。static是靜態(tài)節(jié)點(diǎn),如果我們想和某些節(jié)點(diǎn)保持長期的連接,就把它們加入到靜態(tài)節(jié)點(diǎn)的列表中。

newTasks 新建任務(wù)
新建任務(wù)就是將某一些遠(yuǎn)程節(jié)點(diǎn)打包成任務(wù),(一個任務(wù)對應(yīng)一個遠(yuǎn)程節(jié)點(diǎn)),最終返回一個任務(wù)列表。執(zhí)行任務(wù)就是給任務(wù)中的遠(yuǎn)程節(jié)點(diǎn)發(fā)起TCP連接請求。

以下是新建任務(wù)的流程圖:

新建任務(wù)
  • 1.設(shè)置最大的任務(wù)數(shù)量,這個是是由節(jié)點(diǎn)最大連接數(shù)除以撥號比率得出的,即maxPeers/radio得到。
    needDynDials := s.maxDynDials
    
  • 2.判斷peers里是否有已經(jīng)建立連接的節(jié)點(diǎn),peers是向本地節(jié)點(diǎn)發(fā)來連接請求的遠(yuǎn)程節(jié)點(diǎn)的集合。記錄數(shù)量,最大任務(wù)數(shù)量減去這個數(shù)。
    for _, p := range peers {
        if p.rw.is(dynDialedConn) {
            needDynDials--
        }
    }
    
  • 3.判斷服務(wù)中是否有正在撥號的節(jié)點(diǎn)。記錄數(shù)量,最大任務(wù)數(shù)量減去這個數(shù)。
    for _, flag := range s.dialing {
        if flag&dynDialedConn != 0 {
            needDynDials--
        }
    }
    
  • 4.向設(shè)置的靜態(tài)節(jié)點(diǎn)s.static發(fā)起連接請求,這個不消耗最大任務(wù)數(shù)量。
  • 5.如果發(fā)來連接請求的遠(yuǎn)程節(jié)點(diǎn)集合peers為空,并且經(jīng)過了設(shè)置的時間fallbackInterval20s,會隨機(jī)的連接一個引導(dǎo)節(jié)點(diǎn)bootnode。最大任務(wù)數(shù)量減1。
    if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval {
        // 將第一個bootnode放在列表最后,使每一次取的bootnode都是不一樣的。
        bootnode := s.bootnodes[0]
        s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...)
        s.bootnodes = append(s.bootnodes, bootnode)
    
        if addDial(dynDialedConn, bootnode) {
            needDynDials--
        }
    }
    
  • 6.ReadRandomNodes隨機(jī)的從UDP節(jié)點(diǎn)發(fā)現(xiàn)中使用Kad算法維護(hù)的k桶里,提取randomCandidates個節(jié)點(diǎn)。randomCandidates為最大任務(wù)數(shù)量的二分之一。(可能會提取不到這么多個,實(shí)際提取到的數(shù)量為n。)
    最大任務(wù)數(shù)量減去n。
    randomCandidates := needDynDials / 2
    if randomCandidates > 0 {
        n := s.ntab.ReadRandomNodes(s.randomNodes)
        for i := 0; i < randomCandidates && i < n; i++ {
            if addDial(dynDialedConn, s.randomNodes[i]) {
                needDynDials--
            }
        }
    }
    
  • 7.如果還不滿足最大任務(wù)數(shù)量的話,從s.lookupBuf里提取,直到達(dá)到最大任務(wù)數(shù)量。s.lookupBuf里的節(jié)點(diǎn)也是通過Kad獲取節(jié)點(diǎn)的方式獲取到的,如果s.lookupBuf里節(jié)點(diǎn)數(shù)量不夠,則創(chuàng)建發(fā)現(xiàn)任務(wù)discoverTask 進(jìn)行節(jié)點(diǎn)發(fā)現(xiàn),填充s.lookupBuf。
    // 從lookupBuf里提取節(jié)點(diǎn)。
    i := 0
    for ; i < len(s.lookupBuf) && needDynDials > 0; i++ {
        if addDial(dynDialedConn, s.lookupBuf[i]) {
            needDynDials--
        }
    }
    // 去掉被提取出來的節(jié)點(diǎn)。
    s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])]    
    // 數(shù)量不夠的話,進(jìn)行節(jié)點(diǎn)發(fā)現(xiàn)。
    if len(s.lookupBuf) < needDynDials && !s.lookupRunning {
        s.lookupRunning = true
        newtasks = append(newtasks, &discoverTask{})
    }
    
  • 8.如果沒有需要執(zhí)行的任務(wù),會執(zhí)行等待任務(wù)waitExpireTask,也就是,保持撥號邏輯繼續(xù)運(yùn)行。
    if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 {
          t := &waitExpireTask{s.hist.min().exp.Sub(now)}
          newtasks = append(newtasks, t)
      }
    

上述過程,即完成了一次任務(wù)的創(chuàng)建,結(jié)果是得到一個任務(wù)列表newtasks

3.計(jì)劃任務(wù)和開啟任務(wù)

server.gorun(dialstate dialer)
先來看一下Server服務(wù)中關(guān)于任務(wù)的計(jì)劃和執(zhí)行的流程圖:

發(fā)起TCP連接請求

  • 1.首先是對字段進(jìn)行初始化,例如,發(fā)來請求連接的遠(yuǎn)程節(jié)點(diǎn)列表peers,接入的連接數(shù)inboundCount。最主要的是定義了兩種任務(wù)列表,runningTasks運(yùn)行中的任務(wù)列表,和queuedTasks排隊(duì)等待中的任務(wù)列表。
    • runningTasks是指執(zhí)行一個任務(wù)(即發(fā)起一個TCP連接請求),就將該任務(wù)加入到runningTasks列表中。完成任務(wù)后移除。
    • queuedTasks是指新建了任務(wù)后,將任務(wù)加入到queuedTasks列表中,queuedTasks中的任務(wù)被執(zhí)行時從queuedTasks中移除,加入到runningTasks中。
  • 2.定義了三種對任務(wù)進(jìn)行的操作:scheduleTasks計(jì)劃任務(wù),startTasks開始任務(wù),delTask刪除任務(wù)。
    • delTaskrunningTasks移除給定的單個任務(wù)。
    delTask := func(t task) {
       // 循環(huán)查找到該任務(wù),然后移除。
       for i := range runningTasks {
           if runningTasks[i] == t {
               runningTasks = append(runningTasks[:i], runningTasks[i+1:]...)
               break
           }
       }
    }
    
    • startTasks 如果運(yùn)行中的任務(wù)數(shù)量沒有達(dá)到maxActiveDialTasks最大活躍的任務(wù)數(shù)量(默認(rèn)為16個),則開始執(zhí)行任務(wù)。每一個任務(wù)都是一個單獨(dú)的線程。任務(wù)的執(zhí)行通過調(diào)用t.Do(srv)進(jìn)行,任務(wù)完成后將任務(wù)傳入taskdone通道。
      執(zhí)行中的任務(wù)加入runningTasks列表中。最終返回ts列表中未執(zhí)行的任務(wù)。
    startTasks := func(ts []task) (rest []task) {
         i := 0
         for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ {
             t := ts[i]
             srv.log.Trace("New dial task", "task", t)
             // 分配線程,開始執(zhí)行任務(wù)。
             go func() { t.Do(srv); taskdone <- t }()
             // 運(yùn)行任務(wù)列表中加入該任務(wù)。
             runningTasks = append(runningTasks, t)
         }
         return ts[i:]
    }
    
    • scheduleTasks 先執(zhí)行queuedTasks列表中的任務(wù),queuedTasks列表中未被執(zhí)行的任務(wù)將被保留。
    • 然后如果運(yùn)行中的任務(wù)數(shù)量沒有達(dá)到最大活躍的任務(wù)數(shù)量,則調(diào)用dialstate.newTasks新建任務(wù),接著執(zhí)行剛剛新建任務(wù)。未被執(zhí)行的任務(wù)也加入到queuedTasks列表中,等待循環(huán)的下一次執(zhí)行。
    scheduleTasks := func() {
      // 執(zhí)行queuedTasks列表中的任務(wù)。
      queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...)
      // 如果運(yùn)行中的任務(wù)數(shù)量沒有達(dá)到最大的撥號數(shù)量
      if len(runningTasks) < maxActiveDialTasks {
          // 新建撥號任務(wù)
          nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now())
          // 先執(zhí)行新建的任務(wù),新建的任務(wù)中未被執(zhí)行的任務(wù)加入到queuedTasks列表中
          queuedTasks = append(queuedTasks, startTasks(nt)...)
      }
    }
    
  • 3.主要的處理循環(huán):
    • 開啟了計(jì)劃任務(wù)函數(shù),由于開始任務(wù)函數(shù)包含在計(jì)劃函數(shù)里,所以這里開始了新建任務(wù)以及并發(fā)的執(zhí)行任務(wù)。running是運(yùn)行與否的標(biāo)志位。
      running:
        for {
            scheduleTasks()
            // 處理消息
            ...
        }
      
    • 接下來是一個內(nèi)容很多的select case結(jié)構(gòu),處理接收到的內(nèi)容。
    • 比如說,case n := <-srv.addtrusted: 觸發(fā)后,將某個節(jié)點(diǎn)標(biāo)記為受信任的節(jié)點(diǎn)。case n := <-srv.removetrusted: 觸發(fā)后,移除某個受信任的節(jié)點(diǎn)。
    • 比較重要的是case c := <-srv.addpeer:,需要新建遠(yuǎn)程節(jié)點(diǎn)的時候觸發(fā)。也是說這個case會在上一篇中介紹的節(jié)點(diǎn)協(xié)議握手成功之后,srv.addpeer的通道中加入與遠(yuǎn)程節(jié)點(diǎn)的連接的時候觸發(fā)。
      case c := <-srv.addpeer:
          // 協(xié)議握手檢查
          err := srv.protoHandshakeChecks(peers, inboundCount, c)
          if err == nil {
              // 握手完成,通過所有檢查。
              p := newPeer(c, srv.Protocols)
              ...
              // 執(zhí)行遠(yuǎn)程節(jié)點(diǎn)。
              go srv.runPeer(p)
              // 加入連接請求的peers列表。
              peers[c.node.ID()] = p
              // 接入節(jié)點(diǎn)數(shù)加1。
              if p.Inbound() {
                  inboundCount++
              }
          }
          select {
          case c.cont <- err:
          case <-srv.quit:
              break running
          }
      
  • 4.最后是循環(huán)完畢,關(guān)閉節(jié)點(diǎn)發(fā)現(xiàn),斷開與全部節(jié)點(diǎn)的連接,并清空peers

4.Do 執(zhí)行任務(wù)

dial.goDo(srv *Server)
上述startTasks開始任務(wù)中執(zhí)行任務(wù)的具體過程。

func (t *dialTask) Do(srv *Server) {
    // 判斷節(jié)點(diǎn)是否完整,不完整的節(jié)點(diǎn)表示沒有ip地址。
    if t.dest.Incomplete() {
        // 解析,使用Kad的方法查找到該節(jié)點(diǎn)的ip地址。
        if !t.resolve(srv) {
            return
        }
    }
    // 撥號
    err := t.dial(srv, t.dest)
    ...
    }
}
  • 先判斷節(jié)點(diǎn)的完整性,不完整的話解析獲取該節(jié)點(diǎn)的ip地址。
  • 然后進(jìn)行撥號。
func (t *dialTask) dial(srv *Server, dest *enode.Node) error {
    // fd是一個連接
    fd, err := srv.Dialer.Dial(dest)
    ...
    return srv.SetupConn(mfd, t.flags, dest)
}
  • dial,撥號,調(diào)用了golang自帶的net.Dialer.Dial方法建立TCP連接,然后使用srv.SetupConn方法進(jìn)行加密握手和協(xié)議握手。
  • 在上一節(jié)監(jiān)聽連接中,srv.SetupConn的第三個傳入字段是nil,因?yàn)槲覀兪潜O(jiān)聽連接的,所以還無該節(jié)點(diǎn)公鑰。這一次是發(fā)起請求,所以我們知道該遠(yuǎn)程節(jié)點(diǎn)的公鑰,在加密握手之后,可以將我們擁有的公鑰與遠(yuǎn)程節(jié)點(diǎn)發(fā)來的公鑰進(jìn)行驗(yàn)證。

5.總結(jié)

  • 1.節(jié)點(diǎn)發(fā)起TCP連接請求是通過創(chuàng)建任務(wù),執(zhí)行任務(wù)實(shí)現(xiàn)的,以任務(wù)的形式,可以更好的控制建立連接的數(shù)量,也方便并發(fā)的發(fā)起連接請求。
  • 2.監(jiān)聽TCP連接和發(fā)起TCP請求相輔相成。監(jiān)聽連接負(fù)責(zé)接收遠(yuǎn)程節(jié)點(diǎn)的TCP連接,以及建立與遠(yuǎn)程節(jié)點(diǎn)的加密通道;發(fā)起請求負(fù)責(zé)向遠(yuǎn)程節(jié)點(diǎn)發(fā)送TCP連接請求,以及執(zhí)行建立了加密通道后的遠(yuǎn)程節(jié)點(diǎn)(的協(xié)議)。
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容