p2p網(wǎng)絡(luò)從底層到上層可以分為3層,地址 連接 節(jié)點(diǎn),每一層都有自己的功能
聲明:文章代碼和源碼有不一致地方
這篇文章簡單記錄下連接conn
三個(gè)主要的結(jié)構(gòu)體
1、連接管理
// ConnManager providers a manager to handle network connections.
type ConnManager struct {
// the following variables must only be used atomically
// 記錄自己主動(dòng)連接其他節(jié)點(diǎn)的連接數(shù)量
connReqCount uint64
// 標(biāo)識(shí)connmgr已經(jīng)啟動(dòng)
start int32
// 標(biāo)識(shí)connmgr已經(jīng)結(jié)束
stop int32
// 設(shè)定相關(guān)的配置
cfg Config
// 用于同步connmgr的退出狀態(tài),調(diào)用方可以阻塞等待connmgr的工作協(xié)程退出
wg sync.WaitGroup
// 某個(gè)連接失敗后,connmgr嘗試選擇新的peer地址連接的總次數(shù)
failedAttempts uint64
// 用于與connmgr工作協(xié)程通信的管道
requests chan interface{}
// 用于通知工作協(xié)程退出
quit chan struct{}
}
2、Config,配置參數(shù)
其實(shí)就是connmgr配置,本身就是connmgr的一個(gè)字段。
// Config holds the configuration options related to the connection manager.
type Config struct {
// Listeners define a slice of listeners for which the connection manager
// will take ownership of(取得所有權(quán)) and accept connections. when a connection
// is accepted,the OnAccept handler will be invoked with the connection. since
// the connection manager takes ownership of these listeners,they will be closed
// when the connection manager is stoped.
// this field will not have any effect if the onAccept field is not also specified.
// It may be nil if the caller does not wish to listen for
// incoming connection
Listeners []net.Listener //節(jié)點(diǎn)上所有等待外部連接的監(jiān)聽點(diǎn);
// OnAccept is a callback that is fired when an inbound connection is
// accepted. It is the caller's responsibility(責(zé)任、義務(wù)) to close the connection.
// Failure to close the connection will result in the connection manager
// believing the connection is still active and thus have undesirable
// side effects such as still counting toward maximum connection limits.
//
// This field will not have any effect if the Listeners field is not
// also specified since there couldn't possibly be any accepted
// connections in that case.
OnAccept func(net.Conn) // 節(jié)點(diǎn)應(yīng)答并接受外部連接后的回調(diào)函數(shù)
// TargetOutbound is the number of outbound network connections to
// maintain. Defaults to 8.
TargetOutbound uint32 // 節(jié)點(diǎn)主動(dòng)向外連接peer的最大個(gè)數(shù)
// RetryDuration is the duration to wait before retrying connection
// requests. Defaults to 5s.
RetryDuration time.Duration // 連接失敗后發(fā)起重連的等待時(shí)間
// OnConnection is a callback that is fired when a new outbound
// connection is established.
OnConnection func(*ConnReq, net.Conn) // 連接建立成功后的回調(diào)函數(shù)
// OnDisconnection is a callback that is fired when an outbound
// connection is disconnected.
OnDisconnection func(*ConnReq) // 連接關(guān)閉后的回調(diào)函數(shù)
// GetNewAddress is a way to get an address to make a network connection
// to. If nil, no new connections will be made automatically.
// 連接失敗后,ConnMgr可能會(huì)選擇新的peer地址進(jìn)行連接
// GetNewAddress函數(shù)提供了獲取新的peer地址的方法,它最終會(huì)調(diào)用addrManager中
// 的GetAddress()來分配新地址。
GetNewAddress func() (net.Addr, error)
// Dial connects to the address on the named network.It cannot be nil.
// 定義建立TCP連接的方式,是直接連接還是通過代理連接。
Dial func(net.Addr) (net.Conn, error)
}
3、ConnReq 描述了一個(gè)連接
// ConnReq is the connection request to a network address. If permanent, the
// connection will be retried on disconnection.
// ConnReq 描述了一個(gè)連接
type ConnReq struct {
// The following variables must only be used atomically.
// 連接的序號(hào),用于索引
id uint64
// 連接的目的地址
Addr net.Addr
// 標(biāo)識(shí)是否與Peer保持永久連接,如果為true,
// 則連接失敗后,繼續(xù)嘗試與該P(yáng)eer連接,而不是選擇新的Peer地址重新連接
Permanent bool
// 連接成功后,真實(shí)的net.Conn對(duì)象;
conn net.Conn
// 連接的狀態(tài),有ConnPending、ConnEstablished、ConnDisconnected及ConnFailed等;
state ConnState
// stateMtx: 保護(hù)state狀態(tài)的讀寫鎖;
stateMtx sync.RWMutex
//retryCount: 如果Permanent為true,retryCount記錄該連接重復(fù)重連的次數(shù);
retryCount uint32
}
結(jié)合起來說,就是連接管理器connmgr按照自身的配置config,管理著一些連接connReq
啟動(dòng)ConnMgr
我們先看start()函數(shù)
// Start: launches(發(fā)起、發(fā)動(dòng))the connection manager and begins conecting to the network.
func (cm *ConnManager) Start() {
// already started ?
if atomic.AddInt32(&cm.start, 1) != 1 {
return
}
log.Trace("Connection manager started")
cm.wg.Add(1)
// 啟動(dòng)工作協(xié)程
go cm.connHandler()
// Start all the listeners so long as the caller requested
// them and provided a callback to be invoked when connections are accepted.
if cm.cfg.OnAccept != nil {
for _, listenr := range cm.cfg.Listeners {
cm.wg.Add(1)
// 啟動(dòng)監(jiān)聽協(xié)程listenHandler,等待其他節(jié)點(diǎn)連接;
go cm.listenHandler(listenr)
}
}
// 啟動(dòng)建立連接的協(xié)程,選擇Peer地址并主動(dòng)連接;
for i := atomic.LoadUint64(&cm.connReqCount); i < uint64(cm.cfg.TargetOutbound); i++ {
go cm.NewConnReq()
}
}
主要是啟動(dòng)工作協(xié)程cm.connHandler(),
然后一方面監(jiān)聽其他節(jié)點(diǎn)的連接,go cm.listenHandler(listenr)這里面做的事情就是我們普通的tcp地址監(jiān)聽。
一方面主動(dòng)去連接其他的節(jié)點(diǎn)。
主動(dòng)連接其他節(jié)點(diǎn)cm.NewConnReq()
// NewConnReq creates a new connection request and connects to the
// corresponding(對(duì)應(yīng)的) address.
// 創(chuàng)建一個(gè)連接請求,然后連接對(duì)應(yīng)的地址
func (cm *ConnManager) NewConnReq() {
if atomic.LoadInt32(&cm.stop) != 0 {
return
}
if cm.cfg.GetNewAddress == nil {
return
}
c := &ConnReq{}
atomic.StoreUint64(&c.id, atomic.AddUint64(&cm.connReqCount, 1))
// Submit a request of a pending connection attempt to the connection
// manager. By registering the id before the connection is even established,
// we'll be able to later cancel the connection via the Remove method.
done := make(chan struct{})
select {
case cm.requests <- registerPending{c, done}:
case <-cm.quit:
return
}
// wait for the registration to successfully add the pending conn req
// to the conn manager's internal state.
select {
case <-done:
case <-cm.quit:
return
}
addr,err := cm.cfg.GetNewAddress()
if err != nil {
select {
case cm.requests <- handleFailed{c, err}:
case <-cm.quit:
}
return
}
c.Addr = addr
cm.Connect(c)
}
首先構(gòu)造一個(gè)ConnReq,c := &ConnReq{},然后生成registerPending{c, done},
把registerPending寫入到connmgr的通道case cm.requests <- registerPending{c, done}
這里的registerPending結(jié)構(gòu)體中還有一個(gè)通道done,cm.requests這個(gè)通道肯定有人會(huì)從里面讀數(shù)據(jù),處理完后會(huì)通過通道done返回信息。下面的case <-done:就是在等待返回的信息。
誰在通道的另外一頭讀呢?go cm.connHandler(),下面這個(gè)圖就是他們工作概況

然后cm.cfg.GetNewAddress()得到一個(gè)連接的地址(這里用到了addrMgr),然后連接連接cm.Connect(c)
// Connection assigns an id and dials a connection to the address of the connection request
func (cm *ConnManager) Connect(c *ConnReq){
if atomic.LoadInt32(&cm.stop) != 0{
return
}
// TODO 再次檢查一遍,相當(dāng)于重復(fù)了NewConnReq()的工作
log.Debugf("Attempting to connect to %v", c)
conn,err := cm.cfg.Dial(c.Addr)
if err != nil {
select {
case cm.requests <- handleFailed{c, err}:
case <-cm.quit:
}
return
}
select {
case cm.requests <- handleConnected{c, conn}:
case <-cm.quit:
}
}
連接主要就是這句代碼conn,err := cm.cfg.Dial(c.Addr),這個(gè)Dial就是在普通的tcp連接外包了一層,讓我們有個(gè)選擇,比如可以通過代理進(jìn)行連接。
不論是連接成功還是失敗,handleConnected{c, conn}:和handleFailed{c, err}:這兩個(gè)結(jié)構(gòu)體都被構(gòu)建,并且發(fā)送到cm.requests
有連接就有斷開
func (cm *ConnManager) Disconnect(id uint64) {
if atomic.LoadInt32(&cm.stop) != 0 {
return
}
select {
case cm.requests <- handleDisconnected{id, true}:
case <-cm.quit:
}
}
可connect也差不多,都是向cm.requests發(fā)了一個(gè)請求。
看來,連接或者斷開連接的主要處理邏輯在connHandler中,我們來看看它的實(shí)現(xiàn):
// connHandler handles all connection related requests. It must be run as a
// goroutine.
//
// The connection handler makes sure that we maintain a pool of active outbound
// connections so that we remain connected to the network. Connection requests
// are processed and mapped by their assigned ids.
func (cm *ConnManager) connHandler() {
// pending holds all registered conn requests that hava yet to succeed.
var pending = make(map[uint64]*ConnReq)
// conns represents the set of all actively connected peers.
var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map時(shí),size可以省略,當(dāng)你知道大小時(shí),最好加上
out:
for {
select {
case req := <- cm.requests:
switch msg:=req.(type) {
case registerPending:
// TODO
case handleConnected:
connReq := msg.c
if _, ok := pending[connReq.id]; !ok {
if msg.conn != nil {
msg.conn.Close()
}
log.Debugf("Ignoring connection for "+
"canceled connreq=%v", connReq)
continue
}
connReq.updateState(ConnEstablished)
connReq.conn = msg.conn
conns[connReq.id] = connReq
log.Debugf("Connected to %v", connReq)
connReq.retryCount = 0
cm.failedAttempts = 0
delete(pending, connReq.id)
if cm.cfg.OnConnection != nil {
go cm.cfg.OnConnection(connReq, msg.conn)
}
case handleDisconnected:
// TODO
case handleFailed:
// TODO
}
case <-cm.quit:
break out
}
}
cm.wg.Done()
log.Trace("Connection handler done")
}
在這里不停的處理cm.requests通道中的信息。我們看下連接成功的處理
起初創(chuàng)建了兩個(gè)變量
// pending holds all registered conn requests that hava yet to succeed.
var pending = make(map[uint64]*ConnReq)
// conns represents the set of all actively connected peers.
var conns = make(map[uint64]*ConnReq, cm.cfg.TargetOutbound) // make map時(shí),size可以省略,當(dāng)你知道大小時(shí),最好加上
連接成功后
- 在
map``pending中找有沒有這個(gè)連接請求,如果沒有則表明這不要我們要的連接。斷開 - 更新
connReq的狀態(tài),然后添加到map conns中 - 調(diào)用
go cm.cfg.OnConnection(connReq, msg.conn)
兩個(gè)peer之間的連接conn,還需要考慮其他的很多方面。但是還好,到現(xiàn)在我們至少可以簡單的創(chuàng)建一個(gè)連接了。
至于cm.cfg.OnConnection()要干什么,我們后面再分析了。