package p2p
import (
"context"
"errors"
"time"
net "gx/ipfs/QmPjvxTpVH8qJyQDnxnsxF9kv9jezKD1kozz1hs3fCGsNh/go-libp2p-net"
manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
pro "gx/ipfs/QmZNkThpqfVXs9GNbexPrfBbXSLNYeKrE7jwFM2oqHbyqN/go-libp2p-protocol"
pstore "gx/ipfs/QmZR2XWVVBCtbgBWnQhWk2xcQfaR3W8faQPriAiaaj7rsr/go-libp2p-peerstore"
p2phost "gx/ipfs/Qmb8T6YBBsjYsVGfrihQLfCJveczZnneSBqBKkYEBWDjge/go-libp2p-host"
peer "gx/ipfs/QmdVrMn1LhB4ybb8hMVaMLXnA8XRSewMnK6YqXKXoTcRvN/go-libp2p-peer"
)
//P2P結(jié)構(gòu)保存當(dāng)前正在運(yùn)行的流/監(jiān)聽器的信息
// P2P structure holds information on currently running streams/listeners
type P2P struct {
//監(jiān)聽器
Listeners ListenerRegistry
//數(shù)據(jù)流
Streams StreamRegistry
//節(jié)點(diǎn)ID
identity peer.ID
//節(jié)點(diǎn)地址
peerHost p2phost.Host
//一個線程安全的對等節(jié)點(diǎn)存儲
peerstore pstore.Peerstore
}
//創(chuàng)建一個新的p2p結(jié)構(gòu)
// NewP2P creates new P2P struct
//這個新的p2p結(jié)構(gòu)不包含p2p結(jié)構(gòu)中的監(jiān)聽器和數(shù)據(jù)流
func NewP2P(identity peer.ID, peerHost p2phost.Host, peerstore pstore.Peerstore) *P2P {
return &P2P{
identity: identity,
peerHost: peerHost,
peerstore: peerstore,
}
}
//新建一個數(shù)據(jù)流 工具方法 構(gòu)建一個有節(jié)點(diǎn)id,內(nèi)容和協(xié)議的流
func (p2p P2P) newStreamTo(ctx2 context.Context, p peer.ID, protocol string) (net.Stream, error) {
//30s 后會自動timeout
ctx, cancel := context.WithTimeout(ctx2, time.Second30) //TODO: configurable?
defer cancel()
err := p2p.peerHost.Connect(ctx, pstore.PeerInfo{ID: p})
if err != nil {
return nil, err
}
//NewStream為給定的對等點(diǎn)p打開一個新的流,并編寫一個p2p/協(xié)議 帶有給定協(xié)議. id的頭文件。
return p2p.peerHost.NewStream(ctx2, p, pro.ID(protocol))
}
//對話為遠(yuǎn)程監(jiān)聽器創(chuàng)建新的P2P流
//創(chuàng)建一個新的p2p流實(shí)現(xiàn)對對話的監(jiān)聽
// Dial creates new P2P stream to a remote listener
//Multiaddr是一種跨協(xié)議、跨平臺的表示格式的互聯(lián)網(wǎng)地址。它強(qiáng)調(diào)明確性和自我描述。
//對內(nèi)接收
func (p2p P2P) Dial(ctx context.Context, addr ma.Multiaddr, peer peer.ID, proto string, bindAddr ma.Multiaddr) (ListenerInfo, error) {
//獲取一些節(jié)點(diǎn)信息 network, host, nil
lnet, _, err := manet.DialArgs(bindAddr)
if err != nil {
return nil, err
}
//監(jiān)聽信息
listenerInfo := ListenerInfo{
//節(jié)點(diǎn)身份
Identity: p2p.identity,
////應(yīng)用程序協(xié)議標(biāo)識符。
Protocol: proto,
}
//調(diào)用newStreamTo 通過ctx(內(nèi)容) peer(節(jié)點(diǎn)id) proto(協(xié)議標(biāo)識符) 參數(shù)獲取一個新的數(shù)據(jù)流
remote, err := p2p.newStreamTo(ctx, peer, proto)
if err != nil {
return nil, err
}
//network協(xié)議標(biāo)識
switch lnet {
//network為"tcp", "tcp4", "tcp6"
case "tcp", "tcp4", "tcp6":
//從監(jiān)聽器獲取新的信息 nla.Listener, nil
listener, err := manet.Listen(bindAddr)
if err != nil {
if err2 := remote.Reset(); err2 != nil {
return nil, err2
}
return nil, err
}
//將獲取的新信息保存到listenerInfo
listenerInfo.Address = listener.Multiaddr()
listenerInfo.Closer = listener
listenerInfo.Running = true
//開啟接受
go p2p.doAccept(&listenerInfo, remote, listener)
default:
return nil, errors.New("unsupported protocol: " + lnet)
}
return &listenerInfo, nil
}
//
func (p2p *P2P) doAccept(listenerInfo *ListenerInfo, remote net.Stream, listener manet.Listener) {
//關(guān)閉偵聽器并刪除流處理程序
defer listener.Close()
//Returns a Multiaddr friendly Conn
//一個有好的 Multiaddr 連接
local, err := listener.Accept()
if err != nil {
return
}
stream := StreamInfo{
//連接協(xié)議
Protocol: listenerInfo.Protocol,
//定位節(jié)點(diǎn)
LocalPeer: listenerInfo.Identity,
//定位節(jié)點(diǎn)地址
LocalAddr: listenerInfo.Address,
//遠(yuǎn)程節(jié)點(diǎn)
RemotePeer: remote.Conn().RemotePeer(),
//遠(yuǎn)程節(jié)點(diǎn)地址
RemoteAddr: remote.Conn().RemoteMultiaddr(),
//定位
Local: local,
//遠(yuǎn)程
Remote: remote,
//注冊碼
Registry: &p2p.Streams,
}
//注冊連接信息
p2p.Streams.Register(&stream)
//開啟節(jié)點(diǎn)廣播
stream.startStreaming()
}
//偵聽器將流處理程序包裝到偵聽器中
// Listener wraps stream handler into a listener
type Listener interface {
Accept() (net.Stream, error)
Close() error
}
//P2PListener保存關(guān)于偵聽器的信息
// P2PListener holds information on a listener
type P2PListener struct {
peerHost p2phost.Host
conCh chan net.Stream
proto pro.ID
ctx context.Context
cancel func()
}
//等待偵聽器的連接
// Accept waits for a connection from the listener
func (il *P2PListener) Accept() (net.Stream, error) {
select {
case c := <-il.conCh:
return c, nil
case <-il.ctx.Done():
return nil, il.ctx.Err()
}
}
//關(guān)閉偵聽器并刪除流處理程序
// Close closes the listener and removes stream handler
func (il *P2PListener) Close() error {
il.cancel()
il.peerHost.RemoveStreamHandler(il.proto)
return nil
}
// Listen創(chuàng)建新的P2PListener
// Listen creates new P2PListener
func (p2p P2P) registerStreamHandler(ctx2 context.Context, protocol string) (P2PListener, error) {
ctx, cancel := context.WithCancel(ctx2)
list := &P2PListener{
peerHost: p2p.peerHost,
proto: pro.ID(protocol),
conCh: make(chan net.Stream),
ctx: ctx,
cancel: cancel,
}
p2p.peerHost.SetStreamHandler(list.proto, func(s net.Stream) {
select {
case list.conCh <- s:
case <-ctx.Done():
s.Reset()
}
})
return list, nil
}
// NewListener創(chuàng)建新的p2p偵聽器
// NewListener creates new p2p listener
//對外廣播
func (p2p P2P) NewListener(ctx context.Context, proto string, addr ma.Multiaddr) (ListenerInfo, error) {
//調(diào)用registerStreamHandler 構(gòu)造一個新的listener
listener, err := p2p.registerStreamHandler(ctx, proto)
if err != nil {
return nil, err
}
//構(gòu)造新的listenerInfo
listenerInfo := ListenerInfo{
Identity: p2p.identity,
Protocol: proto,
Address: addr,
Closer: listener,
Running: true,
Registry: &p2p.Listeners,
}
go p2p.acceptStreams(&listenerInfo, listener)
//注冊連接信息
p2p.Listeners.Register(&listenerInfo)
return &listenerInfo, nil
}
//接受流
func (p2p *P2P) acceptStreams(listenerInfo *ListenerInfo, listener Listener) {
for listenerInfo.Running {
//一個有好的 遠(yuǎn)程 連接
remote, err := listener.Accept()
if err != nil {
listener.Close()
break
}
local, err := manet.Dial(listenerInfo.Address)
if err != nil {
remote.Reset()
continue
}
stream := StreamInfo{
Protocol: listenerInfo.Protocol,
LocalPeer: listenerInfo.Identity,
LocalAddr: listenerInfo.Address,
RemotePeer: remote.Conn().RemotePeer(),
RemoteAddr: remote.Conn().RemoteMultiaddr(),
Local: local,
Remote: remote,
Registry: &p2p.Streams,
}
//注冊數(shù)據(jù)流
p2p.Streams.Register(&stream)
//數(shù)據(jù)流廣播
stream.startStreaming()
}
//取消注冊表中的p2p偵聽器
p2p.Listeners.Deregister(listenerInfo.Protocol)
}
// CheckProtoExists檢查是否注冊了協(xié)議處理程序
// mux處理程序
// CheckProtoExists checks whether a protocol handler is registered to
// mux handler
func (p2p *P2P) CheckProtoExists(proto string) bool {
protos := p2p.peerHost.Mux().Protocols()
for _, p := range protos {
if p != proto {
continue
}
return true
}
return false
}