btcd的p2p網(wǎng)絡(luò)(2)-連接ConnMgr

p2p網(wǎng)絡(luò)從底層到上層可以分為3層,地址 連接 節(jié)點(diǎn),每一層都有自己的功能
聲明:文章代碼和源碼有不一致地方
這篇文章簡單記錄下連接conn

三個(gè)主要的結(jié)構(gòu)體

1、連接管理

// ConnManager providers a manager to handle network connections.
type ConnManager struct {
    // the following variables must only be used atomically
    // 記錄自己主動(dòng)連接其他節(jié)點(diǎn)的連接數(shù)量
    connReqCount uint64
    // 標(biāo)識(shí)connmgr已經(jīng)啟動(dòng)
    start int32
    // 標(biāo)識(shí)connmgr已經(jīng)結(jié)束
    stop int32

    // 設(shè)定相關(guān)的配置
    cfg Config
    // 用于同步connmgr的退出狀態(tài),調(diào)用方可以阻塞等待connmgr的工作協(xié)程退出
    wg sync.WaitGroup
    // 某個(gè)連接失敗后,connmgr嘗試選擇新的peer地址連接的總次數(shù)
    failedAttempts uint64
    // 用于與connmgr工作協(xié)程通信的管道
    requests chan interface{}
    // 用于通知工作協(xié)程退出
    quit chan struct{}
}

2、Config,配置參數(shù)
其實(shí)就是connmgr配置,本身就是connmgr的一個(gè)字段。

// Config holds the configuration options related to the connection manager.
type Config struct {
    // Listeners define a slice of listeners for which the connection manager
    // will take ownership of(取得所有權(quán)) and accept connections. when a connection
    // is accepted,the OnAccept handler will be invoked with the connection. since
    // the connection manager takes ownership of these listeners,they will be closed
    // when the connection manager is stoped.

    // this field will not have any effect if the onAccept field is not also specified.
    // It may be nil if the caller does not wish to listen for
    // incoming connection

    Listeners []net.Listener //節(jié)點(diǎn)上所有等待外部連接的監(jiān)聽點(diǎn);
    // OnAccept is a callback that is fired when an inbound connection is
    // accepted.  It is the caller's responsibility(責(zé)任、義務(wù)) to close the connection.
    // Failure to close the connection will result in the connection manager
    // believing the connection is still active and thus have undesirable
    // side effects such as still counting toward maximum connection limits.
    //
    // This field will not have any effect if the Listeners field is not
    // also specified since there couldn't possibly be any accepted
    // connections in that case.
    OnAccept func(net.Conn) // 節(jié)點(diǎn)應(yīng)答并接受外部連接后的回調(diào)函數(shù)
    // TargetOutbound is the number of outbound network connections to
    // maintain. Defaults to 8.
    TargetOutbound uint32 // 節(jié)點(diǎn)主動(dòng)向外連接peer的最大個(gè)數(shù)
    // RetryDuration is the duration to wait before retrying connection
    // requests. Defaults to 5s.
    RetryDuration time.Duration // 連接失敗后發(fā)起重連的等待時(shí)間
    // OnConnection is a callback that is fired when a new outbound
    // connection is established.
    OnConnection func(*ConnReq, net.Conn) // 連接建立成功后的回調(diào)函數(shù)
    // OnDisconnection is a callback that is fired when an outbound
    // connection is disconnected.
    OnDisconnection func(*ConnReq) // 連接關(guān)閉后的回調(diào)函數(shù)
    // GetNewAddress is a way to get an address to make a network connection
    // to.  If nil, no new connections will be made automatically.
    // 連接失敗后,ConnMgr可能會(huì)選擇新的peer地址進(jìn)行連接
    // GetNewAddress函數(shù)提供了獲取新的peer地址的方法,它最終會(huì)調(diào)用addrManager中
    // 的GetAddress()來分配新地址。
    GetNewAddress func() (net.Addr, error)
    // Dial connects to the address on the named network.It cannot be nil.
    // 定義建立TCP連接的方式,是直接連接還是通過代理連接。
    Dial func(net.Addr) (net.Conn, error)
}

3、ConnReq 描述了一個(gè)連接

// ConnReq is the connection request to a network address. If permanent, the
// connection will be retried on disconnection.
// ConnReq 描述了一個(gè)連接
type ConnReq struct {
    // The following variables must only be used atomically.
    // 連接的序號(hào),用于索引
    id uint64
    // 連接的目的地址
    Addr      net.Addr
    // 標(biāo)識(shí)是否與Peer保持永久連接,如果為true,
    // 則連接失敗后,繼續(xù)嘗試與該P(yáng)eer連接,而不是選擇新的Peer地址重新連接
    Permanent bool
    // 連接成功后,真實(shí)的net.Conn對(duì)象;
    conn       net.Conn
    // 連接的狀態(tài),有ConnPending、ConnEstablished、ConnDisconnected及ConnFailed等;
    state      ConnState
    // stateMtx: 保護(hù)state狀態(tài)的讀寫鎖;
    stateMtx   sync.RWMutex
    //retryCount: 如果Permanent為true,retryCount記錄該連接重復(fù)重連的次數(shù);
    retryCount uint32
}

結(jié)合起來說,就是連接管理器connmgr按照自身的配置config,管理著一些連接connReq

啟動(dòng)ConnMgr

我們先看start()函數(shù)

// Start: launches(發(fā)起、發(fā)動(dòng))the connection manager and begins conecting to the network.
func (cm *ConnManager) Start() {
    // already started ?
    if atomic.AddInt32(&cm.start, 1) != 1 {
        return
    }
    log.Trace("Connection manager started")
    cm.wg.Add(1)
    // 啟動(dòng)工作協(xié)程
    go cm.connHandler()
    // Start all the listeners so long as the caller requested
    // them and provided a callback to be invoked when connections are accepted.
    if cm.cfg.OnAccept != nil {
        for _, listenr := range cm.cfg.Listeners {
            cm.wg.Add(1)
            // 啟動(dòng)監(jiān)聽協(xié)程listenHandler,等待其他節(jié)點(diǎn)連接;
            go cm.listenHandler(listenr)
        }
    }
    // 啟動(dòng)建立連接的協(xié)程,選擇Peer地址并主動(dòng)連接;
    for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
        go cm.NewConnReq()
    }
}

主要是啟動(dòng)工作協(xié)程cm.connHandler(),
然后一方面監(jiān)聽其他節(jié)點(diǎn)的連接,go cm.listenHandler(listenr)這里面做的事情就是我們普通的tcp地址監(jiān)聽。
一方面主動(dòng)去連接其他的節(jié)點(diǎn)。
主動(dòng)連接其他節(jié)點(diǎn)cm.NewConnReq()

// NewConnReq creates a new connection request and connects to the
// corresponding(對(duì)應(yīng)的) address.
// 創(chuàng)建一個(gè)連接請求,然后連接對(duì)應(yīng)的地址
func (cm *ConnManager) NewConnReq() {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }
    if cm.cfg.GetNewAddress == nil {
        return
    }
    c := &ConnReq{}
    atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
    // Submit a request of a pending connection attempt to the connection
    // manager. By registering the id before the connection is even established,
    // we'll be able to later cancel the connection via the Remove method.
    done := make(chan struct{})
    select {
    case cm.requests <- registerPending{c, done}:
    case <-cm.quit:
        return
    }

    // wait for the registration to successfully add the pending conn req
    // to the conn manager's internal state.
    select {
    case <-done:
    case <-cm.quit:
        return
    }
    addr,err := cm.cfg.GetNewAddress()
    if err != nil {
        select {
        case cm.requests <- handleFailed{c, err}:
        case <-cm.quit:
        }
        return
    }
    c.Addr = addr
    cm.Connect(c)
}

首先構(gòu)造一個(gè)ConnReq,c := &ConnReq{},然后生成registerPending{c, done},
registerPending寫入到connmgr的通道case cm.requests <- registerPending{c, done}

這里的registerPending結(jié)構(gòu)體中還有一個(gè)通道done,cm.requests這個(gè)通道肯定有人會(huì)從里面讀數(shù)據(jù),處理完后會(huì)通過通道done返回信息。下面的case <-done:就是在等待返回的信息。
誰在通道的另外一頭讀呢?go cm.connHandler(),下面這個(gè)圖就是他們工作概況

然后cm.cfg.GetNewAddress()得到一個(gè)連接的地址(這里用到了addrMgr),然后連接連接cm.Connect(c)

// Connection assigns an id and dials a connection to the address of the connection request
func (cm *ConnManager) Connect(c *ConnReq){
    if atomic.LoadInt32(&cm.stop) != 0{
        return
    }
    // TODO 再次檢查一遍,相當(dāng)于重復(fù)了NewConnReq()的工作
    log.Debugf("Attempting to connect to %v", c)
    conn,err := cm.cfg.Dial(c.Addr)
    if err != nil {
        select {
        case cm.requests <- handleFailed{c, err}:
        case <-cm.quit:
        }
        return
    }

    select {
    case cm.requests <- handleConnected{c, conn}:
    case <-cm.quit:
    }
}

連接主要就是這句代碼conn,err := cm.cfg.Dial(c.Addr),這個(gè)Dial就是在普通的tcp連接外包了一層,讓我們有個(gè)選擇,比如可以通過代理進(jìn)行連接。

不論是連接成功還是失敗,handleConnected{c, conn}:handleFailed{c, err}:這兩個(gè)結(jié)構(gòu)體都被構(gòu)建,并且發(fā)送到cm.requests

有連接就有斷開

func (cm *ConnManager) Disconnect(id uint64) {
    if atomic.LoadInt32(&cm.stop) != 0 {
        return
    }

    select {
    case cm.requests <- handleDisconnected{id, true}:
    case <-cm.quit:
    }
}

connect也差不多,都是向cm.requests發(fā)了一個(gè)請求。

看來,連接或者斷開連接的主要處理邏輯在connHandler中,我們來看看它的實(shí)現(xiàn):

// connHandler handles all connection related requests.  It must be run as a
// goroutine.
//
// The connection handler makes sure that we maintain a pool of active outbound
// connections so that we remain connected to the network.  Connection requests
// are processed and mapped by their assigned ids.
func (cm *ConnManager) connHandler() {
    // pending holds all registered conn requests that hava yet to succeed.
    var pending = make(map[uint64]*ConnReq)
    // conns represents the set of all actively connected peers.
    var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map時(shí),size可以省略,當(dāng)你知道大小時(shí),最好加上

out:
    for {
        select {
        case req := <- cm.requests:
            switch msg:=req.(type) {
            case registerPending:
                // TODO
            case handleConnected:
                connReq := msg.c

                if _, ok := pending[connReq.id]; !ok {
                    if msg.conn != nil {
                        msg.conn.Close()
                    }
                    log.Debugf("Ignoring connection for "+
                        "canceled connreq=%v", connReq)
                    continue
                }

                connReq.updateState(ConnEstablished)
                connReq.conn = msg.conn
                conns[connReq.id] = connReq
                log.Debugf("Connected to %v", connReq)
                connReq.retryCount = 0
                cm.failedAttempts = 0

                delete(pending, connReq.id)

                if cm.cfg.OnConnection != nil {
                    go cm.cfg.OnConnection(connReq, msg.conn)
                }
            case handleDisconnected:
                // TODO
            case handleFailed:
                // TODO
            }

        case <-cm.quit:
            break out
        }
    }
    cm.wg.Done()
    log.Trace("Connection handler done")
}

在這里不停的處理cm.requests通道中的信息。我們看下連接成功的處理
起初創(chuàng)建了兩個(gè)變量

// pending holds all registered conn requests that hava yet to succeed.
    var pending = make(map[uint64]*ConnReq)
    // conns represents the set of all actively connected peers.
    var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map時(shí),size可以省略,當(dāng)你知道大小時(shí),最好加上

連接成功后

  1. map``pending中找有沒有這個(gè)連接請求,如果沒有則表明這不要我們要的連接。斷開
  2. 更新connReq的狀態(tài),然后添加到map conns
  3. 調(diào)用go cm.cfg.OnConnection(connReq, msg.conn)

兩個(gè)peer之間的連接conn,還需要考慮其他的很多方面。但是還好,到現(xiàn)在我們至少可以簡單的創(chuàng)建一個(gè)連接了。
至于cm.cfg.OnConnection()要干什么,我們后面再分析了。


參考
http://www.itdecent.cn/p/d6484e5710ad

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容