
上一節(jié)介紹的是底層p2p網(wǎng)絡(luò)開啟后,監(jiān)聽別的遠(yuǎn)程節(jié)點(diǎn)發(fā)送來的TCP連接請求。這一節(jié)是個續(xù)集,要介紹的是本地節(jié)點(diǎn)如何向遠(yuǎn)程節(jié)點(diǎn)發(fā)起TCP連接請求。
這一次,是我們打電話call別人。但是這一次有點(diǎn)不同,我們是以做任務(wù)的形式向別人發(fā)起撥號,而且每次做很多個任務(wù)。每一個任務(wù)里都包含了連接類型和遠(yuǎn)程節(jié)點(diǎn)信息。
究竟是怎么回事,請往下看。
0.索引
01.從Server服務(wù)開始
02.初始化撥號狀態(tài),以及創(chuàng)建任務(wù)
03.計(jì)劃任務(wù)和開啟任務(wù)
04.Do 執(zhí)行任務(wù)
05.總結(jié)
1.從Server服務(wù)開始

在上圖中,主要是看第3個步驟,初始化撥號狀態(tài),和第6個步驟,開始撥號。(這里提一下,監(jiān)聽連接和發(fā)起連接是兩個單獨(dú)的協(xié)程,所以并不是監(jiān)聽后再發(fā)起連接。)
2.初始化撥號狀態(tài),以及創(chuàng)建任務(wù)
dialstate 撥號狀態(tài)
dialstate是p2p/dial.go中的核心結(jié)構(gòu)體,管理撥號(發(fā)起TCP連接請求)和查找節(jié)點(diǎn)的操作。
通過newDialState來新建它。關(guān)于dialstate字段的含義在下方的注釋中。
func newDialState(static []*enode.Node, bootnodes []*enode.Node, ntab discoverTable, maxdyn int, netrestrict *netutil.Netlist) *dialstate {
s := &dialstate{
maxDynDials: maxdyn, // 最大的撥號任務(wù)數(shù)量
ntab: ntab, // k桶
netrestrict: netrestrict, // ip網(wǎng)絡(luò)的列表
static: make(map[enode.ID]*dialTask), // 靜態(tài)節(jié)點(diǎn)
dialing: make(map[enode.ID]connFlag), // 撥號中,connFlag有4種撥號類型
bootnodes: make([]*enode.Node, len(bootnodes)), // 初始引導(dǎo)節(jié)點(diǎn)
randomNodes: make([]*enode.Node, maxdyn/2), // 在k桶種隨機(jī)查找節(jié)點(diǎn),數(shù)量為最大撥號任務(wù)數(shù)量的二分之一
hist: new(dialHistory), // 記錄最近的撥號
}
// 加入初始引導(dǎo)節(jié)點(diǎn)
copy(s.bootnodes, bootnodes)
// 加入靜態(tài)節(jié)點(diǎn)
for _, n := range static {
s.addStatic(n)
}
return s
}
其中加入了兩種節(jié)點(diǎn),bootnodes和static。bootnodes是初始引導(dǎo)節(jié)點(diǎn),在節(jié)點(diǎn)沒有接收到任何節(jié)點(diǎn)的連接請求,也沒有節(jié)點(diǎn)可以給我們鄰居節(jié)點(diǎn)的時候,就去連接bootnodes,它硬編碼在了以太坊的源碼中。static是靜態(tài)節(jié)點(diǎn),如果我們想和某些節(jié)點(diǎn)保持長期的連接,就把它們加入到靜態(tài)節(jié)點(diǎn)的列表中。
newTasks 新建任務(wù)
新建任務(wù)就是將某一些遠(yuǎn)程節(jié)點(diǎn)打包成任務(wù),(一個任務(wù)對應(yīng)一個遠(yuǎn)程節(jié)點(diǎn)),最終返回一個任務(wù)列表。執(zhí)行任務(wù)就是給任務(wù)中的遠(yuǎn)程節(jié)點(diǎn)發(fā)起TCP連接請求。
以下是新建任務(wù)的流程圖:

- 1.設(shè)置最大的任務(wù)數(shù)量,這個是是由節(jié)點(diǎn)最大連接數(shù)除以撥號比率得出的,即
maxPeers/radio得到。needDynDials := s.maxDynDials - 2.判斷
peers里是否有已經(jīng)建立連接的節(jié)點(diǎn),peers是向本地節(jié)點(diǎn)發(fā)來連接請求的遠(yuǎn)程節(jié)點(diǎn)的集合。記錄數(shù)量,最大任務(wù)數(shù)量減去這個數(shù)。for _, p := range peers { if p.rw.is(dynDialedConn) { needDynDials-- } } - 3.判斷服務(wù)中是否有正在撥號的節(jié)點(diǎn)。記錄數(shù)量,最大任務(wù)數(shù)量減去這個數(shù)。
for _, flag := range s.dialing { if flag&dynDialedConn != 0 { needDynDials-- } } - 4.向設(shè)置的靜態(tài)節(jié)點(diǎn)
s.static發(fā)起連接請求,這個不消耗最大任務(wù)數(shù)量。 - 5.如果發(fā)來連接請求的遠(yuǎn)程節(jié)點(diǎn)集合
peers為空,并且經(jīng)過了設(shè)置的時間fallbackInterval20s,會隨機(jī)的連接一個引導(dǎo)節(jié)點(diǎn)bootnode。最大任務(wù)數(shù)量減1。if len(peers) == 0 && len(s.bootnodes) > 0 && needDynDials > 0 && now.Sub(s.start) > fallbackInterval { // 將第一個bootnode放在列表最后,使每一次取的bootnode都是不一樣的。 bootnode := s.bootnodes[0] s.bootnodes = append(s.bootnodes[:0], s.bootnodes[1:]...) s.bootnodes = append(s.bootnodes, bootnode) if addDial(dynDialedConn, bootnode) { needDynDials-- } } - 6.
ReadRandomNodes隨機(jī)的從UDP節(jié)點(diǎn)發(fā)現(xiàn)中使用Kad算法維護(hù)的k桶里,提取randomCandidates個節(jié)點(diǎn)。randomCandidates為最大任務(wù)數(shù)量的二分之一。(可能會提取不到這么多個,實(shí)際提取到的數(shù)量為n。)
最大任務(wù)數(shù)量減去n。randomCandidates := needDynDials / 2 if randomCandidates > 0 { n := s.ntab.ReadRandomNodes(s.randomNodes) for i := 0; i < randomCandidates && i < n; i++ { if addDial(dynDialedConn, s.randomNodes[i]) { needDynDials-- } } } - 7.如果還不滿足最大任務(wù)數(shù)量的話,從
s.lookupBuf里提取,直到達(dá)到最大任務(wù)數(shù)量。s.lookupBuf里的節(jié)點(diǎn)也是通過Kad獲取節(jié)點(diǎn)的方式獲取到的,如果s.lookupBuf里節(jié)點(diǎn)數(shù)量不夠,則創(chuàng)建發(fā)現(xiàn)任務(wù)discoverTask進(jìn)行節(jié)點(diǎn)發(fā)現(xiàn),填充s.lookupBuf。// 從lookupBuf里提取節(jié)點(diǎn)。 i := 0 for ; i < len(s.lookupBuf) && needDynDials > 0; i++ { if addDial(dynDialedConn, s.lookupBuf[i]) { needDynDials-- } } // 去掉被提取出來的節(jié)點(diǎn)。 s.lookupBuf = s.lookupBuf[:copy(s.lookupBuf, s.lookupBuf[i:])] // 數(shù)量不夠的話,進(jìn)行節(jié)點(diǎn)發(fā)現(xiàn)。 if len(s.lookupBuf) < needDynDials && !s.lookupRunning { s.lookupRunning = true newtasks = append(newtasks, &discoverTask{}) } - 8.如果沒有需要執(zhí)行的任務(wù),會執(zhí)行等待任務(wù)
waitExpireTask,也就是,保持撥號邏輯繼續(xù)運(yùn)行。if nRunning == 0 && len(newtasks) == 0 && s.hist.Len() > 0 { t := &waitExpireTask{s.hist.min().exp.Sub(now)} newtasks = append(newtasks, t) }
上述過程,即完成了一次任務(wù)的創(chuàng)建,結(jié)果是得到一個任務(wù)列表newtasks。
3.計(jì)劃任務(wù)和開啟任務(wù)
(server.go的run(dialstate dialer))
先來看一下Server服務(wù)中關(guān)于任務(wù)的計(jì)劃和執(zhí)行的流程圖:

- 1.首先是對字段進(jìn)行初始化,例如,發(fā)來請求連接的遠(yuǎn)程節(jié)點(diǎn)列表
peers,接入的連接數(shù)inboundCount。最主要的是定義了兩種任務(wù)列表,runningTasks運(yùn)行中的任務(wù)列表,和queuedTasks排隊(duì)等待中的任務(wù)列表。-
runningTasks是指執(zhí)行一個任務(wù)(即發(fā)起一個TCP連接請求),就將該任務(wù)加入到runningTasks列表中。完成任務(wù)后移除。 -
queuedTasks是指新建了任務(wù)后,將任務(wù)加入到queuedTasks列表中,queuedTasks中的任務(wù)被執(zhí)行時從queuedTasks中移除,加入到runningTasks中。
-
- 2.定義了三種對任務(wù)進(jìn)行的操作:
scheduleTasks計(jì)劃任務(wù),startTasks開始任務(wù),delTask刪除任務(wù)。-
delTask在runningTasks移除給定的單個任務(wù)。
delTask := func(t task) { // 循環(huán)查找到該任務(wù),然后移除。 for i := range runningTasks { if runningTasks[i] == t { runningTasks = append(runningTasks[:i], runningTasks[i+1:]...) break } } }-
startTasks如果運(yùn)行中的任務(wù)數(shù)量沒有達(dá)到maxActiveDialTasks最大活躍的任務(wù)數(shù)量(默認(rèn)為16個),則開始執(zhí)行任務(wù)。每一個任務(wù)都是一個單獨(dú)的線程。任務(wù)的執(zhí)行通過調(diào)用t.Do(srv)進(jìn)行,任務(wù)完成后將任務(wù)傳入taskdone通道。
執(zhí)行中的任務(wù)加入runningTasks列表中。最終返回ts列表中未執(zhí)行的任務(wù)。
startTasks := func(ts []task) (rest []task) { i := 0 for ; len(runningTasks) < maxActiveDialTasks && i < len(ts); i++ { t := ts[i] srv.log.Trace("New dial task", "task", t) // 分配線程,開始執(zhí)行任務(wù)。 go func() { t.Do(srv); taskdone <- t }() // 運(yùn)行任務(wù)列表中加入該任務(wù)。 runningTasks = append(runningTasks, t) } return ts[i:] }-
scheduleTasks先執(zhí)行queuedTasks列表中的任務(wù),queuedTasks列表中未被執(zhí)行的任務(wù)將被保留。 - 然后如果運(yùn)行中的任務(wù)數(shù)量沒有達(dá)到最大活躍的任務(wù)數(shù)量,則調(diào)用
dialstate.newTasks新建任務(wù),接著執(zhí)行剛剛新建任務(wù)。未被執(zhí)行的任務(wù)也加入到queuedTasks列表中,等待循環(huán)的下一次執(zhí)行。
scheduleTasks := func() { // 執(zhí)行queuedTasks列表中的任務(wù)。 queuedTasks = append(queuedTasks[:0], startTasks(queuedTasks)...) // 如果運(yùn)行中的任務(wù)數(shù)量沒有達(dá)到最大的撥號數(shù)量 if len(runningTasks) < maxActiveDialTasks { // 新建撥號任務(wù) nt := dialstate.newTasks(len(runningTasks)+len(queuedTasks), peers, time.Now()) // 先執(zhí)行新建的任務(wù),新建的任務(wù)中未被執(zhí)行的任務(wù)加入到queuedTasks列表中 queuedTasks = append(queuedTasks, startTasks(nt)...) } } -
- 3.主要的處理循環(huán):
- 開啟了計(jì)劃任務(wù)函數(shù),由于開始任務(wù)函數(shù)包含在計(jì)劃函數(shù)里,所以這里開始了新建任務(wù)以及并發(fā)的執(zhí)行任務(wù)。
running是運(yùn)行與否的標(biāo)志位。running: for { scheduleTasks() // 處理消息 ... } - 接下來是一個內(nèi)容很多的
select case結(jié)構(gòu),處理接收到的內(nèi)容。 - 比如說,
case n := <-srv.addtrusted:觸發(fā)后,將某個節(jié)點(diǎn)標(biāo)記為受信任的節(jié)點(diǎn)。case n := <-srv.removetrusted:觸發(fā)后,移除某個受信任的節(jié)點(diǎn)。 - 比較重要的是
case c := <-srv.addpeer:,需要新建遠(yuǎn)程節(jié)點(diǎn)的時候觸發(fā)。也是說這個case會在上一篇中介紹的節(jié)點(diǎn)協(xié)議握手成功之后,srv.addpeer的通道中加入與遠(yuǎn)程節(jié)點(diǎn)的連接的時候觸發(fā)。case c := <-srv.addpeer: // 協(xié)議握手檢查 err := srv.protoHandshakeChecks(peers, inboundCount, c) if err == nil { // 握手完成,通過所有檢查。 p := newPeer(c, srv.Protocols) ... // 執(zhí)行遠(yuǎn)程節(jié)點(diǎn)。 go srv.runPeer(p) // 加入連接請求的peers列表。 peers[c.node.ID()] = p // 接入節(jié)點(diǎn)數(shù)加1。 if p.Inbound() { inboundCount++ } } select { case c.cont <- err: case <-srv.quit: break running }
- 開啟了計(jì)劃任務(wù)函數(shù),由于開始任務(wù)函數(shù)包含在計(jì)劃函數(shù)里,所以這里開始了新建任務(wù)以及并發(fā)的執(zhí)行任務(wù)。
- 4.最后是循環(huán)完畢,關(guān)閉節(jié)點(diǎn)發(fā)現(xiàn),斷開與全部節(jié)點(diǎn)的連接,并清空
peers。
4.Do 執(zhí)行任務(wù)
(dial.go的Do(srv *Server))
上述startTasks開始任務(wù)中執(zhí)行任務(wù)的具體過程。
func (t *dialTask) Do(srv *Server) {
// 判斷節(jié)點(diǎn)是否完整,不完整的節(jié)點(diǎn)表示沒有ip地址。
if t.dest.Incomplete() {
// 解析,使用Kad的方法查找到該節(jié)點(diǎn)的ip地址。
if !t.resolve(srv) {
return
}
}
// 撥號
err := t.dial(srv, t.dest)
...
}
}
- 先判斷節(jié)點(diǎn)的完整性,不完整的話解析獲取該節(jié)點(diǎn)的ip地址。
- 然后進(jìn)行撥號。
func (t *dialTask) dial(srv *Server, dest *enode.Node) error {
// fd是一個連接
fd, err := srv.Dialer.Dial(dest)
...
return srv.SetupConn(mfd, t.flags, dest)
}
-
dial,撥號,調(diào)用了golang自帶的net.Dialer.Dial方法建立TCP連接,然后使用srv.SetupConn方法進(jìn)行加密握手和協(xié)議握手。 - 在上一節(jié)監(jiān)聽連接中,
srv.SetupConn的第三個傳入字段是nil,因?yàn)槲覀兪潜O(jiān)聽連接的,所以還無該節(jié)點(diǎn)公鑰。這一次是發(fā)起請求,所以我們知道該遠(yuǎn)程節(jié)點(diǎn)的公鑰,在加密握手之后,可以將我們擁有的公鑰與遠(yuǎn)程節(jié)點(diǎn)發(fā)來的公鑰進(jìn)行驗(yàn)證。
5.總結(jié)
- 1.節(jié)點(diǎn)發(fā)起TCP連接請求是通過創(chuàng)建任務(wù),執(zhí)行任務(wù)實(shí)現(xiàn)的,以任務(wù)的形式,可以更好的控制建立連接的數(shù)量,也方便并發(fā)的發(fā)起連接請求。
- 2.監(jiān)聽TCP連接和發(fā)起TCP請求相輔相成。監(jiān)聽連接負(fù)責(zé)接收遠(yuǎn)程節(jié)點(diǎn)的TCP連接,以及建立與遠(yuǎn)程節(jié)點(diǎn)的加密通道;發(fā)起請求負(fù)責(zé)向遠(yuǎn)程節(jié)點(diǎn)發(fā)送TCP連接請求,以及執(zhí)行建立了加密通道后的遠(yuǎn)程節(jié)點(diǎn)(的協(xié)議)。