p2p 模塊
p2p(peer to peer)模塊的責任是把節點帶入網絡、傳遞消息,驅動整個網絡正常運轉。
p2p 模塊主要分為下面幾個組件:
- Peer
- Connection
- AddrBook
- Switch
- Transport
- PexReactor
下面的 UML 圖列出了類中一些基本的屬性和方法,只是我認為比較重要的部分。接下來為大家依次講解各個組件的功能。

還有 P2P 模塊的基本調用關係:

peer
peer 在 p2p 中表示一個對等體,應用程序的其他模塊直接通過 peer 進行消息交互。peer 實現了 Peer 接口,接下來看看 peer 實現的方法。
peer 分兩種,一種是 inbound,一種是 outbound。
inbound 表示連接我的節點,outbound 表示我對外連接的節點。
peer 構造函數接收了一個 peerConn 類型的對象作為屬性,peerConn 實現如下:
// peerConn contains the raw connection and its config.
type peerConn struct {
// direction flag; presumably true when the local node dialed the remote — confirm against callers
outbound bool
// persistence flag; NOTE(review): reconnect handling is not visible in this block
persistent bool
conn net.Conn // source connection
socketAddr *NetAddress
// cached RemoteIP()
ip net.IP
}
peerConn 封裝了 net.Conn、outbound 等字段,表示該對象既有可能是個服務端連接,也有可能是客戶端連接。該對象可以直接用於 newOutboundPeerConn 和 newInboundPeerConn。這兩個函數都是在 Switch 組件中被調用的。
// Peer interface (method listing)
FlushStop()
ID() ID // ID of the node
RemoteIP() net.IP // IP address of the node
RemoteAddr() net.Addr // network address of the node (tcp, ip:port)
IsOutbound() bool // whether this is an outbound node
IsPersistent() bool // whether the node is persistent (reconnected automatically after a disconnect)
CloseConn() error // close the connection
NodeInfo() NodeInfo // node information
Status() fconn.ConnectionStatus // current status of the node
SocketAddr() *NetAddress // actual address of the node (NetAddress)
Send(byte, []byte) bool // send a message (blocking); delegates to the MConnection
TrySend(byte, []byte) bool // try to send without blocking; returns false immediately if the message cannot be queued; delegates to the MConnection
// peer (signature listing)
func newPeer(pc peerConn, mConfig fconn.MConnConfig, nodeInfo NodeInfo, reactorByCh map[byte]Reactor, chDescs []*fconn.ChannelDescriptor, onPeerError func(Peer, interface{}), options ...PeerOption) *peer // create a new peer
func (p *peer) FlushStop() // flush the pending send buffer, then close the connection (delegates to the MConnection)
func (p *peer) CloseConn() error // close the connection
接下來詳細講解一些重要方法。Peer 中的重要邏輯不多,主要的實際流程由 MConnection 完成。
- peer 的創建,peerConn 中攜帶了連接的一些附加屬性,比如是否持久,是否 outbound。
// newPeer assembles a peer around an established connection.
//
// pc carries the connection plus its attributes (persistent, outbound).
// nodeInfo must hold a DefaultNodeInfo; any other dynamic type makes the
// type assertion panic. The MConnection is created here but not started.
// Each option is applied to the peer, in order, just before it is returned.
func newPeer(
	pc peerConn,
	mConfig fconn.MConnConfig,
	nodeInfo NodeInfo,
	reactorByCh map[byte]Reactor,
	chDescs []*fconn.ChannelDescriptor,
	onPeerError func(Peer, interface{}),
	options ...PeerOption,
) *peer {
	p := new(peer)
	p.peerConn = pc
	p.nodeInfo = nodeInfo
	p.channels = nodeInfo.(DefaultNodeInfo).Channels
	p.Data = cmn.NewCMap()

	// The multiplexed connection calls back into this peer on receive/error.
	p.mconn = createMConnection(pc.conn, p, reactorByCh, chDescs, onPeerError, mConfig)
	p.BaseService = *cmn.NewBaseService(nil, "Peer", p)

	for i := range options {
		options[i](p)
	}
	return p
}
- peer 的啟動,主要就是 mConn 的啟動
// OnStart starts the base service and then the underlying MConnection,
// returning the first error encountered.
func (p *peer) OnStart() error {
	err := p.BaseService.OnStart()
	if err == nil {
		err = p.mconn.Start()
	}
	return err
}
- peer 發送 msg,最終調用的是 mConn 的發送
// Send hands msgBytes to the MConnection on channel chID. It returns false
// without sending when the peer is not running or does not have the channel;
// otherwise it returns the MConnection's result.
func (p *peer) Send(chID byte, msgBytes []byte) bool {
	if !p.IsRunning() || !p.hasChannel(chID) {
		return false
	}
	return p.mconn.Send(chID, msgBytes)
}
- 嘗試發送消息,和 Send 類似
// TrySend hands msgBytes to the MConnection's TrySend on channel chID. It
// returns false without sending when the peer is not running or does not
// have the channel; otherwise it returns the MConnection's result.
func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
	if !p.IsRunning() || !p.hasChannel(chID) {
		return false
	}
	return p.mconn.TrySend(chID, msgBytes)
}
- peer 是否含有某個 channelID
// hasChannel reports whether chID appears in the peer's channel list.
func (p *peer) hasChannel(chID byte) bool {
for _, ch := range p.channels {
if ch == chID {
return true
}
}
// (the article elides some code here)
...
return false
}
- peer 刷新(發送)並停止
// FlushStop runs the base service's OnStop and then calls FlushStop on the
// underlying MConnection.
func (p *peer) FlushStop() {
p.BaseService.OnStop()
p.mconn.FlushStop()
}
MConnection
MConnection 才是封裝了 raw connection 的對象,peer 發送和接收數據,實際都是由 MConnection 來實現的。
MConnection 方法如下:
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc,
onError errorCbFunc, config MConnConfig) *MConnection // create a new MConnection
func (c *MConnection) OnStart() error // start the connection
func (c *MConnection) stopServices() (alreadyStopped bool) // stop the connection's services
func (c *MConnection) FlushStop() // flush the pending send buffer, then close the connection (same as peer)
func (c *MConnection) Send(chID byte, msgBytes[]byte) bool // send a message (blocking)
func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool // try to send without blocking; returns false immediately on failure
func (c *MConnection) CanSend(chID byte) bool // whether more data can be sent on the channel
func (c *MConnection) sendSomePacketMsgs() bool // send a batch of packets (large messages are split into several packets)
func (c *MConnection) sendPacketMsg() bool // send a single packet
func (c *MConnection) recvRoutine() // receive loop: reads packets arriving on this connection
func (c *MConnection) sendRoutine() // send loop: writes queued packets, pings and pongs to this connection
func (c *MConnection) stopForError(r interface{}) // stop the connection because of an error
func (c *MConnection) Status() ConnectionStatus // current status of the connection
下面展開 MConnection 的實現
- MConnection 的創建:定義了一些回調函數,處理收到消息、收到 error 等。構造函數中,會根據傳入的 chDescs []*ChannelDescriptor 創建 []Channel,保存為變量。
// createMConnection wires an MConnection to peer p.
//
// Received messages are routed to the reactor registered for their channel
// in reactorsByCh; a channel with no reactor causes a panic. Connection
// errors are forwarded to onPeerError together with p.
func createMConnection(
	conn net.Conn,
	p *peer,
	reactorsByCh map[byte]Reactor,
	chDescs []*fconn.ChannelDescriptor,
	onPeerError func(Peer, interface{}),
	config fconn.MConnConfig,
) *fconn.MConnection {
	dispatch := func(chID byte, msgBytes []byte) {
		if reactor := reactorsByCh[chID]; reactor != nil {
			reactor.Receive(chID, p, msgBytes)
			return
		}
		panic(fmt.Sprintf("Unknown channel %X", chID))
	}

	reportErr := func(r interface{}) { onPeerError(p, r) }

	return fconn.NewMConnectionWithConfig(conn, chDescs, dispatch, reportErr, config)
}
// NewMConnectionWithConfig builds an MConnection over conn with one Channel
// per descriptor in chDescs. It panics if config.PongTimeout is not strictly
// less than config.PingInterval. The connection is not started.
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc,
	onError errorCbFunc, config MConnConfig) *MConnection {
	if config.PongTimeout >= config.PingInterval {
		panic("pongTimeout must be less than pingInterval (otherwise, next ping will reset pong timer)")
	}

	mconn := &MConnection{
		conn:          conn,
		bufConnReader: bufio.NewReaderSize(conn, minReadBufferSize),
		bufConnWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
		sendMonitor:   flow.New(0, 0),
		recvMonitor:   flow.New(0, 0),
		send:          make(chan struct{}, 1),
		pong:          make(chan struct{}, 1),
		onReceive:     onReceive,
		onError:       onError,
		config:        config,
		created:       time.Now(),
	}

	// Build the channels both as an ordered list and as an index by channel ID.
	byID := make(map[byte]*Channel, len(chDescs))
	ordered := make([]*Channel, 0, len(chDescs))
	for _, desc := range chDescs {
		ch := newChannel(mconn, *desc)
		byID[ch.desc.ID] = ch
		ordered = append(ordered, ch)
	}
	mconn.channels = ordered
	mconn.channelsIdx = byID

	mconn.BaseService = *cmn.NewBaseService(nil, "MConnection", mconn)

	// Cache the maximum packet message size.
	mconn._maxPacketMsgSize = mconn.maxPacketMsgSize()
	return mconn
}
- MConnection 的啟動方法:初始化一些定時器,啟動兩個 routine,分別用於監聽接收消息和發送消息
// OnStart starts the base service, creates the flush/ping/stats timers and the
// pong-timeout and quit/done channels, then launches the send and receive
// goroutines.
func (c *MConnection) OnStart() error {
if err := c.BaseService.OnStart(); err != nil {
return err
}
c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle)
c.pingTimer = time.NewTicker(c.config.PingInterval)
// buffered with capacity 1; both routines send to it with a non-blocking select
c.pongTimeoutCh = make(chan bool, 1)
c.chStatsTimer = time.NewTicker(updateStats)
c.quitSendRoutine = make(chan struct{})
c.doneSendRoutine = make(chan struct{})
go c.sendRoutine() // start the send routine
go c.recvRoutine() // start the receive routine
return nil
}
- 停止服務:停止定時器,關閉 routine(OnStop 或 FlushStop 時調用)
// stopServices stops the base service and the timers and closes
// quitSendRoutine. It returns true, doing nothing, if quitSendRoutine is
// already closed. The whole body runs under stopMtx.
func (c *MConnection) stopServices() (alreadyStopped bool) {
c.stopMtx.Lock()
defer c.stopMtx.Unlock()
// a receive on quitSendRoutine succeeds only once it has been closed
select {
case <-c.quitSendRoutine:
return true
default:
}
c.BaseService.OnStop()
c.flushTimer.Stop()
c.pingTimer.Stop()
c.chStatsTimer.Stop()
// signals sendRoutine to leave its loop
close(c.quitSendRoutine)
return false
}
- 發送消息:這邊會把消息先存入對應的 channel 中,然後再釋放 send 信號
// Send queues msgBytes on the channel identified by chID and wakes the send
// routine. It returns false if the connection is not running, the channel is
// unknown, or the channel's sendBytes reports failure.
func (c *MConnection) Send(chID byte, msgBytes []byte) bool {
	if !c.IsRunning() {
		return false
	}

	c.Logger.Debug("Send", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))

	// Look up the target channel.
	channel, found := c.channelsIdx[chID]
	if !found {
		c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
		return false
	}

	// The message is stored in the channel first.
	if !channel.sendBytes(msgBytes) {
		c.Logger.Debug("Send failed", "channel", chID, "conn", c, "msgBytes", fmt.Sprintf("%X", msgBytes))
		return false
	}

	// Non-blocking wake-up: a signal that is already pending is enough.
	select {
	case c.send <- struct{}{}:
	default:
	}
	return true
}
- TrySend 和 CanSend 邏輯也差不多
// TrySend queues msgBytes on the channel identified by chID without blocking
// and wakes the send routine. It returns false if the connection is not
// running, the channel is unknown, or the channel's trySendBytes reports
// failure.
func (c *MConnection) TrySend(chID byte, msgBytes []byte) bool {
	// Same guard as Send and CanSend: never queue onto a connection that is
	// not running.
	if !c.IsRunning() {
		return false
	}

	// Send message to channel.
	channel, ok := c.channelsIdx[chID]
	if !ok {
		c.Logger.Error(fmt.Sprintf("Cannot send bytes, unknown channel %X", chID))
		return false
	}

	ok = channel.trySendBytes(msgBytes)
	if ok {
		// Wake up sendRoutine if necessary; never block on the signal.
		select {
		case c.send <- struct{}{}:
		default:
		}
	}
	return ok
}
// CanSend returns true if you can send more data onto the chID, false
// otherwise. It also returns false when the connection is not running or the
// channel is unknown (the latter is logged as an error).
func (c *MConnection) CanSend(chID byte) bool {
	if c.IsRunning() {
		if channel, found := c.channelsIdx[chID]; found {
			return channel.canSend()
		}
		c.Logger.Error(fmt.Sprintf("Unknown channel %X", chID))
	}
	return false
}
- 回到我們的發送 routine。這裡處理了心跳、定時刷新緩衝區、超時等消息
// sendRoutine is the connection's send loop. Each iteration waits on one of:
// the flush timer, the channel-stats ticker, the ping ticker, the pong-timeout
// channel, a pending pong reply, the quit signal, or the send signal. On exit
// it stops the pong timer and closes doneSendRoutine.
func (c *MConnection) sendRoutine() {
defer c._recover()
FOR_LOOP:
for {
var _n int64
var err error
SELECTION:
select {
// periodic flush
case <-c.flushTimer.Ch:
c.flush()
// periodically update every channel's stats
case <-c.chStatsTimer.C:
for _, channel := range c.channels{
channel.updateStats()
}
// send a ping
case <-c.pingTimer.C:
c.Logger.Debug("Send ping")
_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPing{})
if err != nil {
break SELECTION
}
// NOTE(review): unlike the pong branch below, this branch does not pass _n to
// c.sendMonitor.Update — confirm whether that is intended
c.Logger.Debug("Starting pong timer", "dur", c.config.PongTimeout)
c.pongTimer = time.AfterFunc(c.config.PongTimeout, func() {
select {
case c.pongTimeoutCh<-true:
default:
// never block
}
})
c.flush()
// true: the pong timer fired (timeout); false: recvRoutine saw a pong
case timeout := <- c.pongTimeoutCh:
if timeout {
c.Logger.Debug("pong timeout")
err = errors.New("pong timeout")
} else {
c.stopPongTimer()
}
// a ping was received: reply with a pong
case <-c.pong:
c.Logger.Debug("Send pong")
_n, err = cdc.MarshalBinaryLengthPrefixedWriter(c.bufConnWriter, PacketPong{})
if err != nil {
break SELECTION
}
c.sendMonitor.Update(int(_n))
c.flush()
// leave the routine
case <-c.quitSendRoutine:
break FOR_LOOP
// send a batch of packets; if data is still pending, re-arm the send signal
case <-c.send:
eof := c.sendSomePacketMsgs()
if !eof {
select {
case c.send <- struct{}{}:
default:
}
}
}
// (the article elides code here; presumably the handling of err — confirm against the full source)
....
}
c.stopPongTimer()
close(c.doneSendRoutine)
}
- 再看看接收的
routine
// recvRoutine is the connection's receive loop. It reads length-prefixed
// packets from the buffered reader and handles them by type:
//   - PacketPing: asks sendRoutine (via c.pong) to reply with a pong;
//   - PacketPong: tells sendRoutine (via c.pongTimeoutCh) that the pong arrived;
//   - PacketMsg:  feeds the packet to its channel and, once a complete message
//     has been assembled, passes it to onReceive.
//
// Any read error, unknown channel, channel receive error or unknown packet
// type ends the loop; except for read/receive errors seen while the
// connection is no longer running, the connection is stopped via stopForError.
func (c *MConnection) recvRoutine() {
	defer c._recover()

FOR_LOOP:
	for {
		var packet Packet
		var _n int64
		var err error
		_n, err = cdc.UnmarshalBinaryLengthPrefixedReader(c.bufConnReader, &packet, int64(c._maxPacketMsgSize))
		// Account for the bytes read, mirroring sendMonitor on the send side.
		// Without this, _n is never used and the function does not compile.
		c.recvMonitor.Update(int(_n))
		if err != nil {
			if c.IsRunning() {
				c.Logger.Error("Connection failed @ recvRoutine (reading byte)", "conn", c, "err", err)
				c.stopForError(err)
			}
			break FOR_LOOP
		}

		switch pkt := packet.(type) {
		case PacketPing:
			// A ping was received: trigger a pong reply.
			c.Logger.Debug("Recevie Ping")
			select {
			case c.pong <- struct{}{}:
			default:
				// never block
			}

		case PacketPong:
			// A pong was received: the heartbeat did not time out.
			c.Logger.Debug("Receive Pong")
			select {
			case c.pongTimeoutCh <- false:
			default:
				// never block
			}

		case PacketMsg:
			// Look up the channel the packet belongs to.
			channel, ok := c.channelsIdx[pkt.ChannelID]
			if !ok || channel == nil {
				err = fmt.Errorf("Unknown channel %X", pkt.ChannelID)
				c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
				c.stopForError(err)
				break FOR_LOOP
			}

			msgBytes, err := channel.recvPacketMsg(pkt)
			if err != nil {
				if c.IsRunning() {
					c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
					c.stopForError(err)
				}
				break FOR_LOOP
			}
			// msgBytes is nil until the final (EOF) packet of a message arrives.
			if msgBytes != nil {
				c.Logger.Debug("Receive bytes", "chID", pkt.ChannelID, "msgBytes", fmt.Sprintf("%X", msgBytes))
				// Hand the complete message to the receive callback.
				c.onReceive(pkt.ChannelID, msgBytes)
			}

		default:
			err := fmt.Errorf("Unknown message type %v", reflect.TypeOf(packet))
			c.Logger.Error("Connection failed @ recvRoutine", "conn", c, "err", err)
			c.stopForError(err)
			break FOR_LOOP
		}
	}
}
- 具體的發送邏輯:每次最多發送 numBatchPacketMsgs 個數據包;一旦沒有待發送的數據(或寫入出錯),即返回 true
// sendSomePacketMsgs calls sendPacketMsg up to numBatchPacketMsgs times. It
// returns true as soon as one call returns true, and false if every call in
// the batch returned false.
func (c *MConnection) sendSomePacketMsgs() bool {
	for sent := 0; sent < numBatchPacketMsgs; sent++ {
		if done := c.sendPacketMsg(); done {
			return true
		}
	}
	return false
}
// sendPacketMsg writes one packet from the pending channel with the lowest
// recentlySent/Priority ratio into the buffered writer.
//
// It returns true when there is nothing to send or the write failed (in which
// case the connection is stopped via stopForError), and false after a packet
// has been written.
func (c *MConnection) sendPacketMsg() bool {
	var leastRatio float32 = math.MaxFloat32
	var leastChannel *Channel

	// Pick the pending channel with the smallest ratio.
	for _, channel := range c.channels {
		if !channel.isSendPending() {
			continue
		}
		ratio := float32(channel.recentlySent) / float32(channel.desc.Priority)
		if ratio < leastRatio {
			leastRatio = ratio
			leastChannel = channel
		}
	}

	// Nothing to send.
	if leastChannel == nil {
		return true
	}

	// Write the packet into the write buffer.
	_n, err := leastChannel.writePacketMsgTo(c.bufConnWriter)
	if err != nil {
		c.Logger.Error("Failed to write PacketMsg", "err", err)
		c.stopForError(err)
		return true
	}
	// Account for the bytes written, as the pong branch of sendRoutine does.
	// Without this, _n is never used and the function does not compile.
	c.sendMonitor.Update(int(_n))

	// Arm the flush timer so the buffered data gets flushed.
	c.flushTimer.Set()
	return false
}
Channel
channel 裡主要的就是存取數據。發送時定義了一個策略:ratio := float32(channel.recentlySent) / float32(channel.desc.Priority),ratio 值越小的,越有可能被發送。也就是 channel 的優先值(Priority)越大,且 recentlySent 越小,就越先被發送;recentlySent 這個值會被一個定時器稀釋,也就是說被加到 channel 中時間越久的、優先級越高的數據會被最先發送。
- 將消息發送(存放)到 channel
// trySendBytes queues bytes on the channel's send queue without blocking.
// It returns true and increments sendQueueSize if the message was queued,
// false if the queue could not accept it right away.
func (ch *Channel) trySendBytes(bytes []byte) bool {
select {
case ch.sendQueue <- bytes:
atomic.AddInt32(&ch.sendQueueSize, 1)
return true
default:
return false
}
}
- 判斷當前是否有待發送的數據(某條消息發送了一半,或隊列中還有消息),即處於 pending 狀態
// isSendPending reports whether the channel has data waiting to be sent.
// When nothing is currently being sent but the queue is non-empty, the next
// queued message is moved into ch.sending as a side effect.
func (ch *Channel) isSendPending() bool {
	if len(ch.sending) > 0 {
		// a message is already in progress
		return true
	}
	if len(ch.sendQueue) == 0 {
		return false
	}
	// promote the next queued message
	ch.sending = <-ch.sendQueue
	return true
}
- mConnection 從 channel 中取消息,EOF 標識該消息是否取完;未取完,則將剩餘部分保留在 sending 中
// nextPacketMsg cuts the next packet off the front of ch.sending. The payload
// holds at most maxPacketMsgPayloadSize bytes. If everything that remained fit
// into this packet, EOF is 0x01, ch.sending is cleared and sendQueueSize is
// decremented; otherwise EOF is 0x00 and the remainder stays in ch.sending.
func (ch *Channel) nextPacketMsg() PacketMsg {
	limit := ch.maxPacketMsgPayloadSize
	pending := len(ch.sending)
	chunk := cmn.MinInt(limit, pending)

	packet := PacketMsg{}
	packet.ChannelID = byte(ch.desc.ID)
	packet.Bytes = ch.sending[:chunk]

	if pending > limit {
		packet.EOF = byte(0x00)
		ch.sending = ch.sending[chunk:]
		return packet
	}

	packet.EOF = byte(0x01)
	ch.sending = nil
	atomic.AddInt32(&ch.sendQueueSize, -1)
	return packet
}
- 當 mConnection 接收到消息的時候,也會傳給 channel 處理,直到接收到完整的(EOF=1)消息,才會返回
// recvPacketMsg appends the packet's payload to the channel's receive buffer.
// It returns the assembled message when the packet carries EOF == 0x01, and
// (nil, nil) while the message is still incomplete. An error is returned when
// the buffered bytes plus this payload would exceed RecvMessageCapacity.
//
// The returned slice shares its backing array with ch.recving, which is reset
// to length zero for reuse, so later packets can overwrite it.
func (ch *Channel) recvPacketMsg(packet PacketMsg) ([]byte, error) {
	ch.Logger.Debug("Read PacketMsg", "conn", ch.conn, "packet", packet)

	recvCap := ch.desc.RecvMessageCapacity
	recvReceived := len(ch.recving) + len(packet.Bytes)
	if recvCap < recvReceived {
		return nil, fmt.Errorf("Received message exceeds avaliable capacity: %v < %v", recvCap, recvReceived)
	}

	// Accumulate the payload.
	ch.recving = append(ch.recving, packet.Bytes...)
	if packet.EOF != byte(0x01) {
		return nil, nil
	}

	// Complete message: hand it out and clear the slice without re-allocating.
	msgBytes := ch.recving
	ch.recving = ch.recving[:0]
	return msgBytes, nil
}
Switch
switch
func NewSwitch( cfg *config.P2PConfig, transport Transport, options ...SwitchOption) *Switch // create a new Switch
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor // add a reactor and set the switch on it
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool // broadcast a message
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) // number of peers of each kind
func (sw *Switch) StopPeerForError(peer Peer, reason interface{}) // stop a peer because of an error
func (sw *Switch) StopPeerGracefully(peer Peer) // stop a peer normally
func (sw *Switch) reconnectToPeer(addr *NetAddress) // reconnect to the node at this address
func (sw *Switch) MarkPeerAsGood(peer Peer) // mark the node (address) as good
func (sw *Switch) DialPeersAsync(peers []string) error // dial a list of nodes asynchronously
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) error // dial a single node
func (sw *Switch) AddPersistentPeers(addrs []string) error // register persistent peer addresses
func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, cfg *config.P2PConfig) error // add an outbound node
func (sw *Switch) addPeer(p Peer) error // add a peer
func (sw *Switch) acceptRoutine() // accept connections from other nodes
具體實現如下
Switch 裡保存了所有的 chDesc(channel 信息)、reactors、peers。
// Switch holds the channel descriptors, the reactors and the set of peers,
// together with dialing/reconnecting bookkeeping, the address book and the
// transport.
type Switch struct {
cmn.BaseService
config *config.P2PConfig
reactors map[string]Reactor // reactors keyed by a string (presumably the name passed to AddReactor — confirm)
chDesc []*fconn.ChannelDescriptor
reactorByCh map[byte]Reactor // reactor responsible for each channel ID
peers *PeerSet
dialing *cmn.CMap
reconnecting *cmn.CMap // keyed by string(addr.ID) in reconnectToPeer
nodeInfo NodeInfo
nodeKey *NodeKey
addrBook AddrBook
persistentPeersAddrs []*NetAddress // set by AddPersistentPeers
transport Transport
// (the article elides further fields here)
...
}
- 先看啟動函數:啟動了一個 routine,處理 inbound 節點的連接
// OnStart starts every registered reactor, returning a wrapped error for the
// first one that fails, and then launches the accept routine.
func (sw *Switch) OnStart() error {
	for _, r := range sw.reactors {
		if err := r.Start(); err != nil {
			return cmn.ErrorWrap(err, "failed to start %v", r)
		}
	}

	go sw.acceptRoutine()
	return nil
}
- inbound 節點連接處理流程
// acceptRoutine loops accepting peers from the transport. A peer is cleaned up
// and skipped when the inbound limit has been reached; otherwise it is passed
// to addPeer and cleaned up (and stopped, if running) when addPeer fails.
// The error-handling part of this listing is abridged by the article.
func (sw *Switch) acceptRoutine() {
for {
p, err := sw.transport.Accept(peerConfig{
chDesc: sw.chDesc,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorByCh,
isPersistent: sw.isPeerPersistentFn(),
//metrics: sw.metrics,
})
if err != nil {
switch err := err.(type) {
case ErrRejected:
// (the article elides the error handling here)
....
break
}
_, in, _ := sw.NumPeers()
// the inbound peer count has already reached the limit: ignore this connection
if in >= sw.config.MaxNumInboundPeers {
sw.Logger.Info(
"Ignoring inbound connection: already have enough inbound peers",
"address", p.SocketAddr(),
"have", in,
"max", sw.config.MaxNumInboundPeers,
)
sw.transport.Cleanup(p)
continue
}
// the peer added here is an inbound peer
if err := sw.addPeer(p); err != nil {
sw.transport.Cleanup(p)
if p.IsRunning(){
_ = p.Stop()
}
sw.Logger.Info(
"Ignoring inbound connection: error while adding peer",
"err", err,
"id", p.ID(),
)
}
}
}
- 再看看
addPeer
// addPeer filters, initialises, starts and registers peer p.
//
// It returns the filter error, the start error or the peer-set error when one
// occurs. If the switch itself is not running it logs an error and returns
// nil without starting the peer.
func (sw *Switch) addPeer(p Peer) error {
	// Apply the externally injected peer filters.
	if err := sw.filterPeer(p); err != nil {
		return err
	}

	p.SetLogger(sw.Logger.With("peer", p.SocketAddr()))

	if !sw.IsRunning() {
		sw.Logger.Error("Won't start a peer - switch is not running", "peer", p)
		return nil
	}

	// Let every reactor prepare the peer.
	for _, r := range sw.reactors {
		p = r.InitPeer(p)
	}

	// Start the peer.
	if err := p.Start(); err != nil {
		sw.Logger.Error("Error starting peer", "err", err, "peer", p)
		return err
	}

	if err := sw.peers.Add(p); err != nil {
		return err
	}

	// Register the peer with every reactor.
	for _, r := range sw.reactors {
		r.AddPeer(p)
	}

	sw.Logger.Info("Added Peer", "peer", p)
	return nil
}
- 添加 outbound 節點
// addOutboundPeerWithConfig dials addr through the transport and adds the
// resulting peer.
//
// When cfg.TestDialFail is set it simulates a failed dial: a reconnect is
// scheduled and an error returned. When the dial is rejected because the
// address is our own, the address is removed from the address book and
// recorded as ours. For any other dial error a reconnect is scheduled if the
// address is persistent. If addPeer fails the peer is cleaned up and, if
// running, stopped.
func (sw *Switch) addOutboundPeerWithConfig(
	addr *NetAddress,
	cfg *config.P2PConfig,
) error {
	sw.Logger.Info("Dialing peer", "address", addr)

	// Simulated dial failure: go straight to reconnecting.
	if cfg.TestDialFail {
		go sw.reconnectToPeer(addr)
		return fmt.Errorf("dial err (peerConfig.DialFail == true)")
	}

	p, err := sw.transport.Dial(*addr, peerConfig{
		chDesc:       sw.chDesc,
		onPeerError:  sw.StopPeerForError,
		isPersistent: sw.isPeerPersistentFn(),
		reactorsByCh: sw.reactorByCh,
		//metrics: sw.metrics,
	})
	if err != nil {
		if rejected, ok := err.(ErrRejected); ok && rejected.IsSelf() {
			sw.addrBook.RemoveAddress(addr)
			sw.addrBook.AddOurAddress(addr)
			return err
		}

		if sw.isPeerPersistentFn()(addr) {
			go sw.reconnectToPeer(addr)
		}
		return err
	}

	// The peer added here is an outbound peer.
	addErr := sw.addPeer(p)
	if addErr == nil {
		return nil
	}

	sw.transport.Cleanup(p)
	if p.IsRunning() {
		_ = p.Stop()
	}
	return addErr
}
- 添加持久的地址:連接的時候,若該地址屬於持久型的,失敗後會重新嘗試連接(多次)
// AddPersistentPeers parses addrs and stores the result as the switch's
// persistent peer addresses. Every parse error is logged; the first error that
// is not an ErrNetAddressLookup is returned, in which case nothing is stored.
func (sw *Switch) AddPersistentPeers(addrs []string) error {
	sw.Logger.Info("Adding persistent peers", "addrs", addrs)

	netAddrs, errs := NewNetAddressStrings(addrs)

	// First pass: report every problem.
	for _, parseErr := range errs {
		sw.Logger.Error("Error in peer's address", "err", parseErr)
	}

	// Second pass: return first non-ErrNetAddressLookup error.
	for _, parseErr := range errs {
		if _, isLookup := parseErr.(ErrNetAddressLookup); !isLookup {
			return parseErr
		}
	}

	sw.persistentPeersAddrs = netAddrs
	return nil
}
- 重新連接方法
// reconnectToPeer keeps dialing addr until it succeeds or gives up.
//
// Only one reconnect loop runs per address ID; a second call for an ID already
// being reconnected returns immediately. The first phase makes up to
// reconnectAttempts dials separated by a randomised reconnectInterval. If they
// all fail, a second phase makes up to reconnectBackOffAttempts dials with
// exponentially growing sleeps (reconnectBackOffBaseSeconds^i seconds).
// Either phase stops early when the switch is no longer running, when a dial
// succeeds, or when the dial reports ErrCurrentlyDialingOrExistingAddress.
func (sw *Switch) reconnectToPeer(addr *NetAddress) {
	if sw.reconnecting.Has(string(addr.ID)) {
		return
	}
	sw.reconnecting.Set(string(addr.ID), addr)
	defer sw.reconnecting.Delete(string(addr.ID))

	start := time.Now()
	sw.Logger.Info("Reconnecting to peer", "addr", addr)

	// Phase 1: at most reconnectAttempts tries.
	for i := 0; i < reconnectAttempts; i++ {
		if !sw.IsRunning() {
			return
		}

		err := sw.DialPeerWithAddress(addr)
		if err == nil {
			return // success
		} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
			return
		}

		sw.Logger.Info("Error reconnecting to peer. Try again", "tries", i, "err", err, "addr", addr)
		sw.randomSleep(reconnectInterval)
	}

	sw.Logger.Error("Failed to reconnect to peer. Beginning exponential backoff", "addr", addr, "elapsed", time.Since(start))

	// Phase 2: keep trying, with the interval growing exponentially.
	for i := 0; i < reconnectBackOffAttempts; i++ {
		if !sw.IsRunning() {
			return
		}

		sleepIntervalSeconds := math.Pow(reconnectBackOffBaseSeconds, float64(i))
		sw.randomSleep(time.Duration(sleepIntervalSeconds) * time.Second)

		err := sw.DialPeerWithAddress(addr)
		// Stop on success. The previous `err != nil` check was inverted: it
		// gave up on the first failure and logged "Try again" after a success.
		if err == nil {
			return // success
		} else if _, ok := err.(ErrCurrentlyDialingOrExistingAddress); ok {
			return
		}

		sw.Logger.Info("Error reconnectiing to peer. Try again", "tries", i, "err", err, "addr", addr)
	}

	sw.Logger.Error("Failed to reconnect to peer. Giving up", "addr", addr, "elapsed", time.Since(start))
}
- 向節點發起連接
// DialPeersAsync parses peers and dials the resulting addresses
// asynchronously. Every parse error is logged; the first error that is not an
// ErrNetAddressLookup is returned, in which case nothing is dialed.
func (sw *Switch) DialPeersAsync(peers []string) error {
	netAddrs, errs := NewNetAddressStrings(peers)

	// First pass: report every problem.
	for _, parseErr := range errs {
		sw.Logger.Error("Error in peer's address", "err", parseErr)
	}

	// Second pass: abort on the first error that is not a lookup failure.
	for _, parseErr := range errs {
		if _, isLookup := parseErr.(ErrNetAddressLookup); !isLookup {
			return parseErr
		}
	}

	sw.dialPeersAsync(netAddrs)
	return nil
}
// dialPeersAsync records netAddrs in the address book (when one is set) and
// then dials each address, in random order, in its own goroutine. Our own
// address is neither recorded nor dialed.
func (sw *Switch) dialPeersAsync(netAddrs []*NetAddress) {
	ourAddr := sw.NetAddress()

	if sw.addrBook != nil {
		for _, netAddr := range netAddrs {
			// Skip our own address.
			if netAddr.Same(ourAddr) {
				continue
			}
			// Add the address to the address book; failures are only logged.
			err := sw.addrBook.AddAddress(netAddr, ourAddr)
			switch {
			case err == nil:
			case isPrivateAddr(err):
				sw.Logger.Debug("Won't add peer's address to addrbook", "err", err)
			default:
				sw.Logger.Debug("cann't add peer's address to addrbook", "err", err)
			}
		}
		sw.addrBook.Save()
	}

	// Shuffle the order, then dial each address concurrently.
	perm := sw.rng.Perm(len(netAddrs))
	for _, j := range perm {
		go func(addr *NetAddress) {
			if addr.Same(ourAddr) {
				sw.Logger.Debug("Ignore attempt to connect to ourselves", "addr", addr, "ourAddr", ourAddr)
				return
			}

			sw.randomSleep(0)

			err := sw.DialPeerWithAddress(addr)
			if err == nil {
				return
			}
			switch err.(type) {
			case ErrSwitchConnectToSelf, ErrSwitchDuplicatePeerID, ErrCurrentlyDialingOrExistingAddress:
				sw.Logger.Debug("Error dialing peer", "err", err)
			default:
				sw.Logger.Error("Error dialing peer", "err", err)
			}
		}(netAddrs[j])
	}
}
- 還有一個重要的方法 Broadcast:廣播一條消息,會對所有的 peer 發送該消息
// Broadcast sends msgBytes on channel chID to every peer currently in the
// peer set, one goroutine per peer. The returned channel receives one bool
// per peer (the result of Send) and is closed once all sends have finished.
func (sw *Switch) Broadcast(chID byte, msgBytes []byte) chan bool {
	sw.Logger.Debug("Broadcast", "channel", chID, "msgBytes", fmt.Sprintf("%X", msgBytes))

	targets := sw.peers.List()

	// Buffered to len(targets) so no sender ever blocks on the result channel.
	results := make(chan bool, len(targets))

	var pending sync.WaitGroup
	pending.Add(len(targets))
	for _, target := range targets {
		go func(p Peer) {
			defer pending.Done()
			results <- p.Send(chID, msgBytes)
		}(target)
	}

	// Close the result channel after the last send completes.
	go func() {
		pending.Wait()
		close(results)
	}()

	return results
}
AddrBook
func (a *addrBook) AddOurAddress(addr *p2p.NetAddress) // record one of our own addresses
func (a *addrBook) AddPrivateIDs(IDs []string) // record private IDs
func (a *addrBook) AddAddress(addr *p2p.NetAddress, src *p2p.NetAddress) // add an address of another node in the network (an outbound address)
func (a *addrBook) NeedMoreAddrs() bool // whether more addresses are needed: true when the book holds fewer addresses than some threshold
func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress // pick one address from the book; biasTowardsNewAddrs weights the choice towards the new buckets
func (a *addrBook) MarkGood(id p2p.ID) // move an address from a new bucket to an old bucket
func (a *addrBook) MarkAttempt(addr *p2p.NetAddress) // record a connection attempt to the node (attempt count + 1)
func (a *addrBook) GetSelection() []*p2p.NetAddress // randomly select a batch of addresses (new & old)
func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress // randomly select a batch of addresses; biasTowardsNewAddrs is the share of new addresses
func (a *addrBook) saveRoutine() // long-running routine that periodically saves the in-memory address book to disk
func (a *addrBook) addToNewBucket(ka *knownAddress, bucketIdx int) // add to a new bucket
func (a *addrBook) addToOldBucket(ka *knownAddress, bucketIdx int) bool // add to an old bucket; returns false if it is full
func (a *addrBook) removeFromBucket(ka *knownAddress, bucketType byte, bucketIdx int) // remove from a bucket
func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress // get the oldest address in a bucket
func (a *addrBook) randomPickAddresses(bucketType byte, num int) []*p2p.NetAddress // randomly pick num addresses from buckets of the given type
下面來看一下主要流程的實現
- 從地址簿獲取一個地址,biasTowardsNewAddrs 為從 newBucket 中獲取地址的偏向程度(0–100)
// PickAddress returns a random address from the book, or nil if the book is
// empty or the side chosen (old vs. new) has no addresses.
//
// biasTowardsNewAddrs is clamped to [0, 100]. The old/new choice is weighted
// by sqrt(count) of each side times its bias share. The book's mutex is held
// for the duration. It panics if the book size is negative.
func (a *addrBook) PickAddress(biasTowardsNewAddrs int) *p2p.NetAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	bookSize := a.size()
	if bookSize < 0 {
		panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
	}
	if bookSize == 0 {
		return nil
	}

	switch {
	case biasTowardsNewAddrs > 100:
		biasTowardsNewAddrs = 100
	case biasTowardsNewAddrs < 0:
		biasTowardsNewAddrs = 0
	}

	// Bias between new and old addresses.
	newWeight := float64(biasTowardsNewAddrs)
	oldCorrelation := math.Sqrt(float64(a.nOld)) * (100.0 - newWeight)
	newCorrelation := math.Sqrt(float64(a.nNew)) * newWeight

	// Decide which side to draw from.
	pickFromOldBucket := (newCorrelation+oldCorrelation)*a.rand.Float64() < oldCorrelation
	if pickFromOldBucket {
		if a.nOld == 0 {
			return nil
		}
	} else if a.nNew == 0 {
		return nil
	}

	// Loop until we pick a random non-empty bucket.
	var bucket map[string]*knownAddress
	for len(bucket) == 0 {
		if pickFromOldBucket {
			bucket = a.bucketsOld[a.rand.Intn(len(a.bucketsOld))]
		} else {
			bucket = a.bucketsNew[a.rand.Intn(len(a.bucketsNew))]
		}
	}

	// Walk the bucket and return the entry at a random position.
	skip := a.rand.Intn(len(bucket))
	for _, ka := range bucket {
		if skip == 0 {
			return ka.Addr
		}
		skip--
	}
	return nil
}
- 將地址標志為
Good
// MarkGood implements AddrBook - it marks the address with the given ID as
// good and, if the address is still new, moves it to an old bucket. Unknown
// IDs are ignored.
func (a *addrBook) MarkGood(id p2p.ID) {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	if ka := a.addrLookup[id]; ka != nil {
		ka.markGood()
		if ka.isNew() {
			a.moveToOld(ka)
		}
	}
}
- 標誌節點為嘗試狀態,嘗試次數 +1
// MarkAttempt implements AddrBook.
// It records that a connection to this address was attempted. Addresses whose
// ID is not in the book are ignored.
func (a *addrBook) MarkAttempt(addr *p2p.NetAddress) {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	if ka := a.addrLookup[addr.ID]; ka != nil {
		ka.markAttempt()
	}
}
- 一旦一個地址被標誌為 bad,則從地址簿中移除
// MarkBad implements AddrBook.
// It removes the address by delegating to RemoveAddress.
func (a *addrBook) MarkBad(addr *p2p.NetAddress) {
a.RemoveAddress(addr)
}
- 隨機獲取一批地址,具體數量由程序計算。在 newBucket 和 oldBucket 中隨機挑選
// GetSelection implements AddrBook.
// It randomly selects some addresses (old & new). Suitable for peer-exchange protocols.
// Must never return a nil address.
//
// The selection size is max(min(minGetSelection, bookSize),
// bookSize*getSelectionPercent/100), capped at maxGetSelection. It returns nil
// for an empty book and panics if the book size is negative.
func (a *addrBook) GetSelection() []*p2p.NetAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	bookSize := a.size()
	if bookSize < 0 {
		panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
	}
	if bookSize == 0 {
		return nil
	}

	// How many addresses to return.
	floor := cmn.MinInt(minGetSelection, bookSize)
	share := bookSize * getSelectionPercent / 100
	numAddresses := cmn.MinInt(maxGetSelection, cmn.MaxInt(floor, share))

	// Collect every known address.
	allAddr := make([]*p2p.NetAddress, bookSize)
	next := 0
	for _, ka := range a.addrLookup {
		allAddr[next] = ka.Addr
		next++
	}

	// Fisher-Yates shuffle the array. We only need to do the first
	// `numAddresses' since we are throwing the rest.
	for pos := 0; pos < numAddresses; pos++ {
		// pick a position between the current index and the end
		swap := cmn.RandIntn(len(allAddr)-pos) + pos
		allAddr[pos], allAddr[swap] = allAddr[swap], allAddr[pos]
	}

	return allAddr[:numAddresses]
}
- 隨機挑選,biasTowardsNewAddrs 為新節點的比例,與上述方法效果類似。
// GetSelectionWithBias randomly selects some addresses, taking new addresses
// first and filling the remainder from old ones.
//
// biasTowardsNewAddrs is clamped to [0, 100] and gives the percentage of the
// selection that should come from new buckets; more new addresses are
// requested when there are not enough old ones. The selection size is computed
// as in GetSelection. It returns nil for an empty book and panics if the book
// size is negative.
func (a *addrBook) GetSelectionWithBias(biasTowardsNewAddrs int) []*p2p.NetAddress {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	bookSize := a.size()
	if bookSize < 0 {
		panic(fmt.Sprintf("Addrbook size %d (new: %d + old: %d) is less than 0", a.nNew+a.nOld, a.nNew, a.nOld))
	}
	if bookSize == 0 {
		return nil
	}

	switch {
	case biasTowardsNewAddrs > 100:
		biasTowardsNewAddrs = 100
	case biasTowardsNewAddrs < 0:
		biasTowardsNewAddrs = 0
	}

	floor := cmn.MinInt(minGetSelection, bookSize)
	share := bookSize * getSelectionPercent / 100
	numAddresses := cmn.MinInt(maxGetSelection, cmn.MaxInt(floor, share))

	// number of new addresses that, if possible, should be in the beginning of the selection
	// if there are no enough old addrs, will choose new addr instead.
	wantNew := percentageOfNum(biasTowardsNewAddrs, numAddresses)
	numRequiredNewAdd := cmn.MaxInt(wantNew, numAddresses-a.nOld)

	selection := a.randomPickAddresses(bucketTypeNew, numRequiredNewAdd)
	oldPicks := a.randomPickAddresses(bucketTypeOld, numAddresses-len(selection))
	return append(selection, oldPicks...)
}
- 持久化 routine:定時將內存中的地址保存到磁盤
// saveRoutine writes the address book to a.filePath every dumpAddressInterval
// until Quit fires, then stops the ticker and saves one final time. It marks
// a.wg as done on return.
func (a *addrBook) saveRoutine() {
	defer a.wg.Done()

	saveFileTicker := time.NewTicker(dumpAddressInterval)
	for {
		select {
		case <-saveFileTicker.C:
			a.saveToFile(a.filePath)
		case <-a.Quit():
			// shutting down: stop ticking and persist the latest state
			saveFileTicker.Stop()
			a.saveToFile(a.filePath)
			return
		}
	}
}
- 獲取 bucket 中 LastAttempt 最早(最久未嘗試)的地址
// pickOldest returns the address with the earliest LastAttempt in the given
// bucket, or nil if the bucket is empty.
func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress {
	var oldest *knownAddress
	for _, ka := range a.getBucket(bucketType, bucketIdx) {
		if oldest != nil && !ka.LastAttempt.Before(oldest.LastAttempt) {
			continue
		}
		oldest = ka
	}
	return oldest
}
- 淘汰 Bucket 中的地址:newBucket 中嘗試連接很多次也沒有成功的(bad)地址,以及 oldBucket 中時間最久的地址,都要被淘汰掉
// expireNew evicts one entry from the new bucket at bucketIdx: the first bad
// entry encountered if there is one, otherwise the entry returned by
// pickOldest.
func (a *addrBook) expireNew(bucketIdx int) {
	for addrStr, ka := range a.bucketsNew[bucketIdx] {
		if !ka.isBad() {
			continue
		}
		// If an entry is bad, throw it away.
		a.Logger.Info(fmt.Sprintf("expiring bad address %v", addrStr))
		a.removeFromBucket(ka, bucketTypeNew, bucketIdx)
		return
	}

	// If we haven't thrown out a bad entry, throw out the oldest entry.
	a.removeFromBucket(a.pickOldest(bucketTypeNew, bucketIdx), bucketTypeNew, bucketIdx)
}
Transport
Transport 主要是向對方發起 Dial 連接,或者 Accept 對方的連接請求。
如果 Dial 成功,則添加一個 outbound peer;如果 Accept 成功,則添加一個 inbound peer。
- 我們先看 Accept 方法:監聽 <-mt.acceptc,一旦有 conn 接入,調用 wrapPeer() 封裝一個 inbound peer,返回給 Switch
// Accept blocks until an accepted connection is delivered on mt.acceptc or the
// transport is closed. A delivered connection is wrapped into an inbound peer;
// a delivered error is returned as is. ErrTransportClosed is returned once
// mt.closec fires.
func (mt *MultiplexTransport) Accept(cfg peerConfig) (Peer, error) {
	select {
	case <-mt.closec:
		return nil, ErrTransportClosed{}
	case a := <-mt.acceptc:
		if a.err != nil {
			return nil, a.err
		}
		// peers produced by Accept are always inbound
		cfg.outbound = false
		p := mt.wrapPeer(a.conn, a.nodeInfo, cfg, a.netAddr)
		return p, nil
	}
}
- 監聽的 routine,在 switch 創建的時候就啟動了
// Listen implements transportLifecycle.
// It opens a TCP listener on addr, records the address and listener on the
// transport, and starts the accept loop in a new goroutine.
func (mt *MultiplexTransport) Listen(addr NetAddress) error {
	listener, err := net.Listen("tcp", addr.DialString())
	if err != nil {
		return err
	}

	mt.netAddr, mt.listener = addr, listener

	go mt.acceptPeers()
	return nil
}
// acceptPeers is the listener's accept loop. Every accepted connection is
// filtered and handshaked in its own goroutine, and the outcome (connection,
// node info, address and error) is delivered on mt.acceptc. A listener error
// ends the loop: silently if mt.closec is already closed, otherwise after
// forwarding the error on mt.acceptc.
func (mt *MultiplexTransport) acceptPeers() {
for {
c, err := mt.listener.Accept()
if err != nil {
// a closed closec (ok == false) means the transport was shut down
select {
case _, ok := <-mt.closec:
if !ok {
return
}
default:
}
mt.acceptc <- accept{err: err}
return
}
go func(c net.Conn) {
var (
nodeInfo NodeInfo
secretConn net.Conn
netAddr *NetAddress
)
err := mt.filterConn(c)
if err == nil {
//secretConn, nodeInfo, err := mt.upgrade(c, nil)
secretConn = c
// handshake: obtain the remote side's nodeInfo
nodeInfo, err = handshake(secretConn, mt.handshakeTimeout, mt.nodeInfo)
if err == nil {
addr := secretConn.RemoteAddr()
// NOTE(review): the ID is derived from mt.nodeKey (the local node's key), not
// from anything the remote sent — confirm this is intended for the peer address
id := PubKeyToID(mt.nodeKey.PubKey())
netAddr = NewNetAddress(id, addr)
}
}
select {
// wrap the result in an accept and hand it to the Accept method
case mt.acceptc <- accept{netAddr, secretConn, nodeInfo, err}:
case <-mt.closec:
_ = c.Close()
return
}
}(c)
}
}
- 再看看 Dial 方法,返回 peer 給 Switch
// Dial connects to addr, upgrades the connection to obtain the remote node
// info, and wraps the result into an outbound peer. Dial and upgrade errors
// are returned unchanged.
func (mt *MultiplexTransport) Dial(
	addr NetAddress,
	cfg peerConfig,
) (Peer, error) {
	// Uses NetAddress.DialTimeout with the transport's dial timeout.
	c, err := addr.DialTimeout(mt.dialTimeout)
	if err != nil {
		return nil, err
	}

	// NOTE(review): c is not closed here when upgrade fails — confirm upgrade
	// cleans up the connection itself.
	nodeInfo, err := mt.upgrade(c, &addr)
	if err != nil {
		return nil, err
	}

	// Peers produced by Dial are always outbound.
	cfg.outbound = true
	return mt.wrapPeer(c, nodeInfo, cfg, &addr), nil
}
// DialTimeout calls net.DialTimeout on the address, over TCP, and returns the
// resulting connection, or (nil, err) on failure.
func (na *NetAddress) DialTimeout(timeout time.Duration) (net.Conn, error) {
	c, dialErr := net.DialTimeout("tcp", na.DialString(), timeout)
	if dialErr == nil {
		return c, nil
	}
	return nil, dialErr
}
- handshake 函數也是很重要的。handshake 啟動兩個 routine,必須兩個 routine 都走完,才算握手成功。handshake 方法被兩處調用,即 Dial 和 Accept。所以當兩個節點連接上,handshake 即可完成。
// handshake exchanges node info with the remote side of c.
//
// A deadline of now+timeout is set on the connection. One goroutine writes
// our node info, another reads the remote's (bounded by MaxNodeInfoSize); both
// must finish without error. On success the deadline is cleared and the
// remote's node info is returned. nodeInfo must hold a DefaultNodeInfo,
// otherwise the type assertion panics.
func handshake(c net.Conn, timeout time.Duration, nodeInfo NodeInfo) (NodeInfo, error) {
	if err := c.SetDeadline(time.Now().Add(timeout)); err != nil {
		return nil, err
	}

	// Buffered for both workers so neither blocks if we return early.
	results := make(chan error, 2)
	var peerNodeInfo DefaultNodeInfo
	ourNodeInfo := nodeInfo.(DefaultNodeInfo)

	// Writer: send the local node info.
	go func(out chan<- error, conn net.Conn) {
		_, werr := cdc.MarshalBinaryLengthPrefixedWriter(conn, ourNodeInfo)
		out <- werr
	}(results, c)

	// Reader: receive the node info written by the remote side.
	go func(out chan<- error, conn net.Conn) {
		_, rerr := cdc.UnmarshalBinaryLengthPrefixedReader(
			conn,
			&peerNodeInfo,
			int64(MaxNodeInfoSize()),
		)
		out <- rerr
	}(results, c)

	for remaining := cap(results); remaining > 0; remaining-- {
		if err := <-results; err != nil {
			return nil, err
		}
	}

	// Clear the deadline and return the remote's node info.
	return peerNodeInfo, c.SetDeadline(time.Time{})
}
PEXReactor
PEXReactor 主要功能就是進行節點發現,這是 p2p 網絡中重要的一環。PEXReactor 也是 Reactor 的一個具體實現。
func (r *PEXReactor) AddPeer(p Peer) // 添加節點,添加到地址簿中
func (r *PEXReactor) RemovePeer(p Peer, reason interface{}) // 移除節點
func (r *PEXReactor) GetChannels() []*conn.ChannelDescriptor // 獲取該 reactor 的 channel 信息
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) // 收到 peer 發來的消息
func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor // 新建一個 reactor
func (r *PEXReactor) receiveRequest(src Peer) error // 處理收到的地址請求(對方向本節點請求更多地址),判斷該請求是否有效
func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*p2p.NetAddress) // 給 peer 發送一些節點地址
func (r *PEXReactor) ReceiveAddrs(addrs []*p2p.NetAddress, src Peer) error // 收到 peer 發來的一些節點地址
func (r *PEXReactor) ensurePeersRoutine() // 新開一個 routine,定時從地址簿取出地址,確保一定數量的連接
func (r *PEXReactor) ensurePeers() // 被 ensurePeersRoutine 調用
func (r *PEXReactor) AttemptsToDial(addr *p2p.NetAddress) int // 獲取該地址的嘗試連接次數
func (r *PEXReactor) crawlPeersRoutine() // seedMode 下啟用,抓取更多 peer 地址
func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error // 連接該地址
func (r *PEXReactor) dialSeeds() // 如果有種子節點,連接種子節點
func (r *PEXReactor) crawlPeers(addrs []*p2p.NetAddress) // 向這些地址請求更多的地址
func (r *PEXReactor) attemptDisconnects() // 連接時長超過 SeedDisconnectWaitPeriod 的節點,斷開連接
- 首先還是先看服務啟動函數
// OnStart starts the address book, validates the configured seeds, and then
// launches exactly one background routine: crawlPeersRoutine when the reactor
// is configured in seed mode, ensurePeersRoutine otherwise.
//
// It returns an error if the book fails to start (an already-started book is
// tolerated), if checkSeeds fails, or if no seed is online while the address
// book is empty.
func (r *PEXReactor) OnStart() error {
	// The address book is started first; "already started" is not a failure.
	if startErr := r.book.Start(); startErr != nil && startErr != cmn.ErrAlreadyStarted {
		return startErr
	}

	numOnline, seedAddrs, err := r.checkSeeds()
	if err != nil {
		return err
	}
	if numOnline == 0 && r.book.Empty() {
		return errors.New("Address book is empty and cann't resolve any seed nodes")
	}
	r.seedAddrs = seedAddrs

	switch {
	case r.config.SeedMode:
		// Seed mode: ask other nodes for more addresses.
		go r.crawlPeersRoutine()
	default:
		// Normal mode: connect to peers taken from the address book.
		go r.ensurePeersRoutine()
	}
	return nil
}
-
channel 獲取:PEXReactor 中定義的 ChannelDescriptor(channel 描述),用於 switch 收發消息後的轉發標識。
// GetChannels implements Reactor. It returns the single channel descriptor
// used by the PEX reactor: ID PexChannel, priority 1, and a send queue
// capacity of 10.
func (r *PEXReactor) GetChannels() []*conn.ChannelDescriptor {
	pexDesc := &conn.ChannelDescriptor{
		ID:                PexChannel,
		Priority:          1,
		SendQueueCapacity: 10,
	}
	return []*conn.ChannelDescriptor{pexDesc}
}
- 確保一定數量的節點在線(如果地址簿中有足夠多的地址)
// ensurePeersRoutine calls r.ensurePeers once immediately and then once every
// r.ensurePeersPeriod, until the reactor's Quit channel fires.
//
// It blocks until quit, so it is meant to run in its own goroutine.
func (r *PEXReactor) ensurePeersRoutine() {
	// Fire once right away instead of waiting for the first tick.
	r.ensurePeers()

	// Fire periodically; the ticker is released on every exit path.
	ticker := time.NewTicker(r.ensurePeersPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			r.ensurePeers()
		case <-r.Quit():
			return
		}
	}
}
// ensurePeers tries to bring the number of outbound connections up to the
// Switch's maximum by picking addresses from the address book and dialing
// them concurrently.
//
// Addresses that were already selected in this round, or that the Switch
// reports as being dialed or already connected, are skipped. At most
// 3*numToDial picks are attempted so that a sparse address book cannot make
// the loop spin forever. Dial errors are only logged.
func (r *PEXReactor) ensurePeers() {
	// The inbound peer count is not needed for this calculation.
	out, _, dial := r.Switch.NumPeers()

	// Remaining outbound slots, counting in-flight dials as occupied.
	numToDial := r.Switch.MaxNumOutboundPeers() - (out + dial)
	if numToDial <= 0 {
		return
	}

	// Bias passed to PickAddress; ranges over [10, 90] as out goes from 0 to 8+.
	newBias := cmn.MinInt(out, 8)*10 + 10

	toDial := make(map[p2p.ID]*p2p.NetAddress, numToDial)

	// Upper bound on the total number of picks.
	maxAttempts := numToDial * 3
	for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ {
		try := r.book.PickAddress(newBias)
		if try == nil {
			continue
		}
		if _, selected := toDial[try.ID]; selected {
			continue
		}
		if r.Switch.IsDialingOrExistingAddress(try) {
			continue
		}
		r.Logger.Info("Will dial address", "addr", try)
		toDial[try.ID] = try
	}

	// Dial every selected address in its own goroutine.
	for _, addr := range toDial {
		go func(addr *p2p.NetAddress) {
			err := r.dialPeer(addr)
			if err == nil {
				return
			}
			switch err.(type) {
			case errMaxAttemptsToDial, errTooEarlyToDial:
				// Expected back-off conditions: keep them out of the error log.
				r.Logger.Debug(err.Error(), "addr", addr)
			default:
				r.Logger.Error(err.Error(), "addr", addr)
			}
		}(addr)
	}
}
- 連接某個地址
// dialPeer dials addr through the Switch, enforcing a per-address attempt
// limit and an exponential back-off between attempts.
//
// It returns errMaxAttemptsToDial (after marking the address bad in the book)
// when the recorded attempts exceed maxAttemptsToDial, errTooEarlyToDial when
// the back-off window since the last dial has not elapsed yet, a wrapped error
// when the Switch fails to dial, and nil on success.
func (r *PEXReactor) dialPeer(addr *p2p.NetAddress) error {
attempts, lastDialed := r.dialAttemptsInfo(addr)
// Give up on addresses that have exceeded the maximum number of dial attempts.
if attempts > maxAttemptsToDial {
r.book.MarkBad(addr)
return errMaxAttemptsToDial{}
}
// Exponential back-off: wait 2^attempts seconds plus a random jitter before
// dialing the same address again.
if attempts > 0 {
// Despite its name this is a duration: RandFloat64() scaled by one second
// (presumably RandFloat64 returns a value in [0, 1) — confirm).
jitterSeconds := time.Duration(cmn.RandFloat64() * float64(time.Second))
backoffDuration := jitterSeconds + ((1 << uint(attempts)) * time.Second)
sinceLastDialed := time.Since(lastDialed)
if sinceLastDialed < backoffDuration {
return errTooEarlyToDial{backoffDuration, lastDialed}
}
}
// Dial through the Switch.
err := r.Switch.DialPeerWithAddress(addr)
if err != nil {
// NOTE(review): the "...." line below is code elided by the article's author
// (presumably the bookkeeping that records the failed attempt — confirm
// against the real source).
....
return errors.Wrapf(err, "dialing failed (attempts: %d)", attempts + 1)
}
// Success: forget the recorded attempts for this address.
// NOTE(review): the field is spelled "attempsToDial" here — looks like a typo
// for "attemptsToDial"; verify against the struct definition.
r.attempsToDial.Delete(addr.DialString())
return nil
}
- 定時向其他節點請求地址
// crawlPeersRoutine is the background loop started by OnStart in seed mode.
//
// It first does one initial round: dialing the seeds when any are configured,
// otherwise crawling a selection from the address book. Then, every
// crawlPeerPeriod, it disconnects long-lived peers, crawls a fresh selection
// from the address book, and cleans up the crawl bookkeeping. It returns when
// the reactor's Quit channel fires.
func (r *PEXReactor) crawlPeersRoutine() {
	if len(r.seedAddrs) > 0 {
		// Seeds are configured: connect to them first.
		r.dialSeeds()
	} else {
		// No seeds: start crawling from what the address book already holds.
		r.crawlPeers(r.book.GetSelection())
	}

	// Fire periodically; stop the ticker on exit so it is not leaked.
	ticker := time.NewTicker(crawlPeerPeriod)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			r.attemptDisconnects()
			r.crawlPeers(r.book.GetSelection())
			r.cleanupCrawlPeerInfos()
		case <-r.Quit():
			return
		}
	}
}
- 連接 seed 節點
// dialSeeds walks the configured seed addresses in random order and stops at
// the first one the Switch manages to dial. Every failed dial is logged; if no
// seed could be reached a final error is logged as well.
func (r *PEXReactor) dialSeeds() {
	order := cmn.RandPerm(len(r.seedAddrs))
	for _, idx := range order {
		candidate := r.seedAddrs[idx]
		if err := r.Switch.DialPeerWithAddress(candidate); err != nil {
			r.Switch.Logger.Error("Error dialing seed", "err", err, "seed", candidate)
			continue
		}
		// One reachable seed is enough.
		return
	}
	r.Switch.Logger.Error("Couldn't connect to any seeds")
}
- 請求更多地址
// crawlPeers dials each address in addrs and, for every peer that ends up
// connected, asks it for more addresses via RequestAddrs. Dial failures are
// logged and the address is skipped.
func (r *PEXReactor) crawlPeers(addrs []*p2p.NetAddress) {
now := time.Now()
for _, addr := range addrs {
peerInfo, ok := r.crawlPeerInfos[addr.ID]
// NOTE(review): the "...." line below is code elided by the article's author;
// presumably it is where now, peerInfo and ok are used — confirm against the
// real source.
....
// Connect to the node first.
err := r.dialPeer(addr)
if err != nil {
switch err.(type) {
case errMaxAttemptsToDial, errTooEarlyToDial:
// Attempt-limit and back-off rejections are logged at debug level only.
r.Logger.Debug(err.Error(), "addr", addr)
default:
r.Logger.Error(err.Error(), "addr", addr)
}
continue
}
peer := r.Switch.Peers().Get(addr.ID)
if peer != nil {
// Dial succeeded and the peer is registered: request more addresses from it.
r.RequestAddrs(peer)
}
}
}
- 定時 routine 中調用 attemptDisconnects 方法,斷開連接時間較長(超過 SeedDisconnectWaitPeriod)的非持久節點
// attemptDisconnects gracefully stops every peer whose connection has lasted
// at least r.config.SeedDisconnectWaitPeriod. Persistent peers are never
// disconnected here.
func (r *PEXReactor) attemptDisconnects() {
	waitPeriod := r.config.SeedDisconnectWaitPeriod
	for _, candidate := range r.Switch.Peers().List() {
		// Keep peers that are still young, and peers marked persistent.
		if candidate.Status().Duration < waitPeriod || candidate.IsPersistent() {
			continue
		}
		r.Switch.StopPeerGracefully(candidate)
	}
}
- 還有一個重要的方法,接收到消息時調用
// Receive implements Reactor by handling incoming PEX messages.
//
// A message that fails to decode stops the sending peer with that error.
// Otherwise the message is dispatched by type:
//   - *pexRequestMessage: the peer asks for addresses. When this node runs in
//     seed mode and the peer is inbound, the request is answered at most once
//     per peer ID with a biased selection, after which the peer is flushed and
//     stopped in a separate goroutine. In every other case the request is
//     first validated by receiveRequest and then answered with a plain
//     selection from the address book.
//   - *pexAddrsMessage: the peer sent addresses; they are handed to
//     ReceiveAddrs and the peer is stopped if that fails.
//   - anything else is logged as an unknown message type.
func (r *PEXReactor) Receive(chID byte, src Peer, msgBytes []byte) {
	msg, err := decodeMsg(msgBytes)
	if err != nil {
		r.Logger.Error("Error decoding message", "src", src, "chId", chID, "msg", msg, "err", err, "bytes", msgBytes)
		r.Switch.StopPeerForError(src, err)
		return
	}
	r.Logger.Debug("Received message", "src", src, "chId", chID, "msg", msg)

	switch typed := msg.(type) {
	case *pexRequestMessage:
		// Regular path: not a seed node, or the peer is one we dialed.
		if !r.config.SeedMode || src.IsOutbound() {
			// Per the original note, this checks that the peer is not
			// requesting too frequently.
			if reqErr := r.receiveRequest(src); reqErr != nil {
				r.Switch.StopPeerForError(src, reqErr)
				return
			}
			r.SendAddrs(src, r.book.GetSelection())
			return
		}

		// Seed path: answer an inbound peer once, then disconnect it.
		peerKey := string(src.ID())
		if r.lastReceivedRequests.Get(peerKey) != nil {
			// This peer has already been served; ignore the repeat request.
			return
		}
		r.lastReceivedRequests.Set(peerKey, time.Now())

		r.SendAddrs(src, r.book.GetSelectionWithBias(biasToSelectNewPeers))
		// Flush and stop in a goroutine so Receive itself does not wait on it.
		go func() {
			src.FlushStop()
			r.Switch.StopPeerGracefully(src)
		}()

	case *pexAddrsMessage:
		// The peer sent us some addresses.
		if addrErr := r.ReceiveAddrs(typed.Addrs, src); addrErr != nil {
			r.Switch.StopPeerForError(src, addrErr)
			return
		}

	default:
		r.Logger.Error(fmt.Sprintf("Unknown message type %v", reflect.TypeOf(typed)))
	}
}