以太坊源碼(2)——rlpx協(xié)議的握手以及對分幀加解密傳送

ethereum協(xié)議

2、P2P網(wǎng)絡(luò)啟動

(3)Server.go setupDiscovery()

初始化發(fā)現(xiàn)協(xié)議。Discover


func (srv *Server) setupDiscovery() error {
    if srv.NoDiscovery && !srv.DiscoveryV5 {
        return nil
    }

    addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
    if err != nil {
        return err
    }
    conn, err := net.ListenUDP("udp", addr)
    if err != nil {
        return err
    }
    realaddr := conn.LocalAddr().(*net.UDPAddr)
    srv.log.Debug("UDP listener up", "addr", realaddr)
    if srv.NAT != nil {
        if !realaddr.IP.IsLoopback() {
            go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, qidongrealaddr.Port, "ethereum discovery")
        }
    }
    srv.localnode.SetFallbackUDP(realaddr.Port)

    // Discovery V4
    var unhandled chan discover.ReadPacket
    var sconn *sharedUDPConn
    if !srv.NoDiscovery {
        if srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }
        cfg := discover.Config{
            PrivateKey:  srv.PrivateKey,
            NetRestrict: srv.NetRestrict,
            Bootnodes:   srv.BootstrapNodes,
            Unhandled:   unhandled,
        }
        ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
        if err != nil {
            return err
        }
        srv.ntab = ntab
    }
    // Discovery V5
    if srv.DiscoveryV5 {
        var ntab *discv5.Network
        var err error
        if sconn != nil {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, "", srv.NetRestrict)
        } else {
            ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, "", srv.NetRestrict)
        }
        if err != nil {
            return err
        }
        if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
            return err
        }
        srv.DiscV5 = ntab
    }
    return nil
}

基于UDP。

func (srv *Server) setupListening() error {
    // Launch the TCP listener.
    listener, err := net.Listen("tcp", srv.ListenAddr)
    if err != nil {
        return err
    }
    laddr := listener.Addr().(*net.TCPAddr)
    srv.ListenAddr = laddr.String()
    srv.listener = listener
    srv.localnode.Set(enr.TCP(laddr.Port))

    srv.loopWG.Add(1)
    go srv.listenLoop()

    // Map the TCP listening port if NAT is configured.
    if !laddr.IP.IsLoopback() && srv.NAT != nil {
        srv.loopWG.Add(1)
        go func() {
            nat.Map(srv.NAT, srv.quit, "tcp", laddr.Port, laddr.Port, "ethereum p2p")
            srv.loopWG.Done()
        }()
    }
    return nil
}

setuplistening 創(chuàng)建TCP監(jiān)聽。srv.listenLoop 協(xié)程循環(huán)監(jiān)聽socket,等待slots。直到Accept,調(diào)用setupConnect()。


// listenLoop runs in its own goroutine and accepts
// inbound connections.
func (srv *Server) listenLoop() {
    defer srv.loopWG.Done()
    srv.log.Debug("TCP listener up", "addr", srv.listener.Addr())

    tokens := defaultMaxPendingPeers
    if srv.MaxPendingPeers > 0 {
        tokens = srv.MaxPendingPeers
    }
    slots := make(chan struct{}, tokens)
    for i := 0; i < tokens; i++ {
        slots <- struct{}{}
    }

    for {
        // Wait for a handshake slot before accepting.
        <-slots

        var (
            fd  net.Conn
            err error
        )
        for {
            fd, err = srv.listener.Accept()
            if netutil.IsTemporaryError(err) {
                srv.log.Debug("Temporary read error", "err", err)
                continue
            } else if err != nil {
                srv.log.Debug("Read error", "err", err)
                return
            }
            break
        }

        // Reject connections that do not match NetRestrict.
        if srv.NetRestrict != nil {
            if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok && !srv.NetRestrict.Contains(tcp.IP) {
                srv.log.Debug("Rejected conn (not whitelisted in NetRestrict)", "addr", fd.RemoteAddr())
                fd.Close()
                slots <- struct{}{}
                continue
            }
        }

        var ip net.IP
        if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok {
            ip = tcp.IP
        }
        fd = newMeteredConn(fd, true, ip)
        srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr())
        go func() {
            srv.SetupConn(fd, inboundConn, nil)
            slots <- struct{}{}
        }()
    }
}

看setupConn()。正常連接使用該函數(shù)建立p2p鏈路。
首先要獲取dial節(jié)點的public key,也就是nodeID。


func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) error {
    // Prevent leftover pending conns from entering the handshake.
    srv.lock.Lock()
    running := srv.running
    srv.lock.Unlock()
    if !running {
        return errServerStopped
    }
    // If dialing, figure out the remote public key.
    var dialPubkey *ecdsa.PublicKey
    if dialDest != nil {
        dialPubkey = new(ecdsa.PublicKey)
        if err := dialDest.Load((*enode.Secp256k1)(dialPubkey)); err != nil {
            return errors.New("dial destination doesn't have a secp256k1 public key")
        }
    }

然后進行加密握手。

remotePubkey, err := c.doEncHandshake(srv.PrivateKey, dialPubkey)

協(xié)議握手

// Run the protocol handshake
    phs, err := c.doProtoHandshake(srv.ourHandshake)
    if err != nil {
        clog.Trace("Failed proto handshake", "err", err)
        return err
    }

這里卡住了好久。。因為Ctrl+leftclick跟不到doEncHandshake和doProtoHandshake方法。最后只能全局查找,發(fā)現(xiàn)原來實現(xiàn)在p2p/rlpx.go中??梢钥吹匠跏蓟薳ncHandshake類型的變量,然后通過makeAuthMsg創(chuàng)建了authMsg。然后產(chǎn)生authPacket,傳輸給遠端,獲取authRespPacket

func (t *rlpx) doEncHandshake(prv *ecdsa.PrivateKey, dial *ecdsa.PublicKey) (*ecdsa.PublicKey, error) {
    var (
        sec secrets
        err error
    )
    if dial == nil {
        sec, err = receiverEncHandshake(t.fd, prv)
    } else {
        sec, err = initiatorEncHandshake(t.fd, prv, dial)
    }
    if err != nil {
        return nil, err
    }
    t.wmu.Lock()
    t.rw = newRLPXFrameRW(t.fd, sec)
    t.wmu.Unlock()
    return sec.Remote.ExportECDSA(), nil
}

主動撥號狀態(tài)的,發(fā)起initiatorEncHandshake,否則是監(jiān)聽時接收的receiverEncHandshake。再跟一步看initiatorEncHandshake和receiverEncHandshake。

func initiatorEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey, remote *ecdsa.PublicKey) (s secrets, err error) {
    //定義h為encHandshake 導(dǎo)入ECDSA PUBLIC KEY
    h := &encHandshake{initiator: true, remote: ecies.ImportECDSAPublic(remote)}
    //creates the initiator handshake message.
    authMsg, err := h.makeAuthMsg(prv)
    if err != nil {
        return s, err
    }
    //rlp編碼,加密也在里面進行
    authPacket, err := sealEIP8(authMsg, h)
    if err != nil {
        return s, err
    }
    if _, err = conn.Write(authPacket); err != nil {
        return s, err
    }
    //RLPX V4 handshake 回應(yīng)
    authRespMsg := new(authRespV4)

    //處理接收的消息,讀取authResponsePacket 解碼和解密。
    authRespPacket, err := readHandshakeMsg(authRespMsg, encAuthRespLen, prv, conn)
    if err != nil {
        return s, err
    }
    //獲取握手過程中的Nonce和隨機PublicKey
    if err := h.handleAuthResp(authRespMsg); err != nil {
        return s, err
    }
    //將authPacket和authRespPacket打包成共享的secret
    return h.secrets(authPacket, authRespPacket)
}

下面詳細看上面的幾個函數(shù)。

// makeAuthMsg creates the initiator handshake message.
func (h *encHandshake) makeAuthMsg(prv *ecdsa.PrivateKey) (*authMsgV4, error) {
    // Generate random initiator nonce.
    //為加密握手生成一個Nonce
    h.initNonce = make([]byte, shaLen)
    _, err := rand.Read(h.initNonce)
    if err != nil {
        return nil, err
    }
    // Generate random keypair to for ECDH.
    //獲取隨機私鑰
    h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
    if err != nil {
        return nil, err
    }

    // Sign known message: static-shared-secret ^ nonce
    //生成token ,就是共享秘鑰
    token, err := h.staticSharedSecret(prv)
    if err != nil {
        return nil, err
    }
    //異或
    signed := xor(token, h.initNonce)
    //隨機私鑰簽名
    signature, err := crypto.Sign(signed, h.randomPrivKey.ExportECDSA())
    if err != nil {
        return nil, err
    }

    msg := new(authMsgV4)
    //msg包含對Nonce的簽名、發(fā)起者的公鑰、以及發(fā)起者的Nonce
    copy(msg.Signature[:], signature)
    copy(msg.InitiatorPubkey[:], crypto.FromECDSAPub(&prv.PublicKey)[1:])
    copy(msg.Nonce[:], h.initNonce)
    msg.Version = 4
    //返回authMsgV4對象
    return msg, nil
}


func sealEIP8(msg interface{}, h *encHandshake) ([]byte, error) {
    //對msg進行編碼和添加前綴,具體前綴是啥沒細看。
    buf := new(bytes.Buffer)
    if err := rlp.Encode(buf, msg); err != nil {
        return nil, err
    }
    // pad with random amount of data. the amount needs to be at least 100 bytes to make
    // the message distinguishable from pre-EIP-8 handshakes.
    pad := padSpace[:mrand.Intn(len(padSpace)-100)+100]
    buf.Write(pad)
    prefix := make([]byte, 2)
    binary.BigEndian.PutUint16(prefix, uint16(buf.Len()+eciesOverhead))
    //用遠端公鑰ecies加密。
    enc, err := ecies.Encrypt(rand.Reader, h.remote, buf.Bytes(), nil, prefix)
    return append(prefix, enc...), err
}

func readHandshakeMsg(msg plainDecoder, plainSize int, prv *ecdsa.PrivateKey, r io.Reader) ([]byte, error) {
    buf := make([]byte, plainSize)
    if _, err := io.ReadFull(r, buf); err != nil {
        return buf, err
    }
    // Attempt decoding pre-EIP-8 "plain" format.
    //私鑰解密,嘗試用EIP8以前的格式
    key := ecies.ImportECDSA(prv)
    if dec, err := key.Decrypt(buf, nil, nil); err == nil {
        msg.decodePlain(dec)
        return buf, nil
    }
    // Could be EIP-8 format, try that.
    //EIP8后,使用了RLP編解碼,需要處理前綴等等。
    prefix := buf[:2]
    size := binary.BigEndian.Uint16(prefix)
    if size < uint16(plainSize) {
        return buf, fmt.Errorf("size underflow, need at least %d bytes", plainSize)
    }
    buf = append(buf, make([]byte, size-uint16(plainSize)+2)...)
    if _, err := io.ReadFull(r, buf[plainSize:]); err != nil {
        return buf, err
    }
    //同樣的私鑰解碼
    dec, err := key.Decrypt(buf[2:], nil, prefix)
    if err != nil {
        return buf, err
    }
    // Can't use rlp.DecodeBytes here because it rejects
    // trailing data (forward-compatibility).
    s := rlp.NewStream(bytes.NewReader(dec), 0)
    //返回的buf是authRespPacket -- buffer類型 ,s.Decode將buffer Decode,然后返回的是authRespV4對象。
    return buf, s.Decode(msg)
}

func (h *encHandshake) handleAuthResp(msg *authRespV4) (err error) {
    //從AuthMsg對象中取出Nonce和RandomPublicKey
    h.respNonce = msg.Nonce[:]
    h.remoteRandomPub, err = importPublicKey(msg.RandomPubkey[:])
    return err
}

//打包成加密握手的共享secret
func (h *encHandshake) secrets(auth, authResp []byte) (secrets, error) {
    //瞬時秘鑰,只在當前鏈接中有效。滿足前向安全。
    ecdheSecret, err := h.randomPrivKey.GenerateShared(h.remoteRandomPub, sskLen, sskLen)
    if err != nil {
        return secrets{}, err
    }

    // derive base secrets from ephemeral key agreement
    //對前面的秘鑰,還有兩個隨機數(shù)進行hash,生成一個aes秘鑰和MAC
    sharedSecret := crypto.Keccak256(ecdheSecret, crypto.Keccak256(h.respNonce, h.initNonce))
    aesSecret := crypto.Keccak256(ecdheSecret, sharedSecret)
    s := secrets{
        Remote: h.remote,
        AES:    aesSecret,
        MAC:    crypto.Keccak256(ecdheSecret, aesSecret),
    }

    // setup sha3 instances for the MACs
    //算兩個mac,作為EMac和InMac,用于后續(xù)RLPX幀的MAC驗證。
    mac1 := sha3.NewLegacyKeccak256()
    mac1.Write(xor(s.MAC, h.respNonce))
    mac1.Write(auth)
    mac2 := sha3.NewLegacyKeccak256()
    mac2.Write(xor(s.MAC, h.initNonce))
    mac2.Write(authResp)
    if h.initiator {
        s.EgressMAC, s.IngressMAC = mac1, mac2
    } else {
        s.EgressMAC, s.IngressMAC = mac2, mac1
    }
    //返回s
    return s, nil
}

再看接受加密請求的處理函數(shù)receiverEncHandshake。


// prv is the local client's private key.
func receiverEncHandshake(conn io.ReadWriter, prv *ecdsa.PrivateKey) (s secrets, err error) {
    authMsg := new(authMsgV4)
    //收到請求,先調(diào)用readHandshakeMsg進行解密解碼
    authPacket, err := readHandshakeMsg(authMsg, encAuthMsgLen, prv, conn)
    if err != nil {
        return s, err
    }
    h := new(encHandshake)
    //處理發(fā)過來的請求
    if err := h.handleAuthMsg(authMsg, prv); err != nil {
        return s, err
    }
    //發(fā)起請求應(yīng)答
    authRespMsg, err := h.makeAuthResp()
    if err != nil {
        return s, err
    }
    //編碼成packet
    var authRespPacket []byte
    if authMsg.gotPlain {
        authRespPacket, err = authRespMsg.sealPlain(h)
    } else {
        authRespPacket, err = sealEIP8(authRespMsg, h)
    }
    if err != nil {
        return s, err
    }
    //寫入傳輸
    if _, err = conn.Write(authRespPacket); err != nil {
        return s, err
    }
    //握手包打包
    return h.secrets(authPacket, authRespPacket)
}

對發(fā)來的請求進行處理,獲取發(fā)起者的nonce以及隨機公鑰。


func (h *encHandshake) handleAuthMsg(msg *authMsgV4, prv *ecdsa.PrivateKey) error {
    // Import the remote identity.
    rpub, err := importPublicKey(msg.InitiatorPubkey[:])
    if err != nil {
        return err
    }
    //這里的initNonce就是發(fā)起者發(fā)過來的Nonce
    h.initNonce = msg.Nonce[:]
    //發(fā)起者公鑰
    h.remote = rpub

 ,
    // Generate random keypair for ECDH.
    // If a private key is already set, use it instead of generating one (for testing).
    if h.randomPrivKey == nil {
        //   //生成自己的隨機私鑰
        h.randomPrivKey, err = ecies.GenerateKey(rand.Reader, crypto.S256(), nil)
        if err != nil {
            return err
        }
    }
    //通過ECDH算出共享秘鑰token
    // Check the signature.
    token, err := h.staticSharedSecret(prv)
    if err != nil {
        return err
    }
    //異或,從明文以及簽名推出發(fā)起者的隨機公鑰。
    signedMsg := xor(token, h.initNonce)
    remoteRandomPub, err := secp256k1.RecoverPubkey(signedMsg, msg.Signature[:])
    if err != nil {
        return err
    }
    //發(fā)起者隨機公鑰放入握手中
    h.remoteRandomPub, _ = importPublicKey(remoteRandomPub)
    return nil
}

準備應(yīng)答請求的makeAuthResp,生成被動方的nonce,將自己的Nonce以及公鑰都放入Msg中。


func (h *encHandshake) makeAuthResp() (msg *authRespV4, err error) {
    // Generate random nonce.
    h.respNonce = make([]byte, shaLen)
    if _, err = rand.Read(h.respNonce); err != nil {
        return nil, err
    }
    //authResponse 包括,接受者自己生成的Nonce,自己的隨機公鑰
    msg = new(authRespV4)
    copy(msg.Nonce[:], h.respNonce)
    copy(msg.RandomPubkey[:], exportPubkey(&h.randomPrivKey.PublicKey))
    msg.Version = 4
    return msg, nil
}

至此,加密握手完畢。
開始分幀傳輸。主要是通過WriteMsg和ReadMsg兩個函數(shù)實現(xiàn),給上層的p2p提供支持。

幀結(jié)構(gòu)如下:

frame = header || header-mac || frame-data || frame-mac
header = frame-size || header-data || padding
frame-size = size of frame excluding padding, integer < 2**24, big endian
header-data = rlp.list(protocol-type[, context-id])
protocol-type = integer < 2**16, big endian
context-id = integer < 2**16, big endian
padding = zero-fill to 16-byte boundary
frame-content = any binary data

header-mac = left16(egress-mac.update(aes(mac-secret,egress-mac)) ^ header-ciphertext).digest
frame-mac = left16(egress-mac.update(aes(mac-secret,egress-mac)) ^ left16(egress-mac.update(frame-ciphertext).digest))
egress-mac = keccak256 state, continuously updated with egress bytes
ingress-mac = keccak256 state, continuously updated with ingress bytes

left16(x) is the first 16 bytes of x
|| is concatenate
^ is xor

新建的幀函數(shù)

func newRLPXFrameRW(conn io.ReadWriter, s secrets) *rlpxFrameRW {
    //HMAC的秘鑰
    macc, err := aes.NewCipher(s.MAC)
    if err != nil {
        panic("invalid MAC secret: " + err.Error())
    }
    //aes 秘鑰
    encc, err := aes.NewCipher(s.AES)
    if err != nil {
        panic("invalid AES secret: " + err.Error())
    }
    // we use an all-zeroes IV for AES because the key used
    // for encryption is ephemeral.
    //零向量
    iv := make([]byte, encc.BlockSize())
    return &rlpxFrameRW{
        conn:       conn,
        enc:        cipher.NewCTR(encc, iv),
        dec:        cipher.NewCTR(encc, iv),
        macCipher:  macc,
        egressMAC:  s.EgressMAC,
        ingressMAC: s.IngressMAC,
    }
}

WriteMsg看一下


func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
    //將msg的code(應(yīng)該指狀態(tài)碼)RLP編碼成byte流
    ptype, _ := rlp.EncodeToBytes(msg.Code)

    // if snappy is enabled, compress message now
    //snappy 好像是個數(shù)據(jù)流壓縮方式
    if rw.snappy {
        if msg.Size > maxUint24 {
            return errPlainMessageTooLarge
        }
        payload, _ := ioutil.ReadAll(msg.Payload)
        payload = snappy.Encode(nil, payload)

        msg.Payload = bytes.NewReader(payload)
        msg.Size = uint32(len(payload))
    }
    // write header
    //幀的header 32byte
    headbuf := make([]byte, 32)
    //幀大小,包括payload大小以及Code RLP編碼后的byte[]長度
    fsize := uint32(len(ptype)) + msg.Size
    if fsize > maxUint24 {
        return errors.New("message size overflows uint24")
    }
    putInt24(fsize, headbuf) // TODO: check overflow
    copy(headbuf[3:], zeroHeader)
    //頭16個byte,AES256 CTR模式 和AES秘鑰加密
    rw.enc.XORKeyStream(headbuf[:16], headbuf[:16]) // first half is now encrypted

    // write header MAC
    //更新header MAC 送入conn
    copy(headbuf[16:], updateMAC(rw.egressMAC, rw.macCipher, headbuf[:16]))
    if _, err := rw.conn.Write(headbuf); err != nil {
        return err
    }

    // write encrypted frame, updating the egress MAC hash with the data written to conn.
    // rw.enc是AES秘鑰,tee好像是一個StreamWriter,將msg寫成byte類型, 它調(diào)用XORKeyStream來處理通過的每個數(shù)據(jù)片段,讀進來應(yīng)該就是
    tee := cipher.StreamWriter{S: rw.enc, W: io.MultiWriter(rw.conn, rw.egressMAC)}
    //這里通過tee.Write 調(diào)用XORKeyStream與秘鑰進行XOR運算,將Msg.code進行加密
    //
    if _, err := tee.Write(ptype); err != nil {
        return err
    }
    //將payload復(fù)制到tee的Writer中,但其實因為tee的Writer實現(xiàn)了自己的Write函數(shù),所以也調(diào)用了XORKeyStream進行加密
    if _, err := io.Copy(tee, msg.Payload); err != nil {
        return err
    }
    //補零補足
    if padding := fsize % 16; padding > 0 {
        if _, err := tee.Write(zero16[:16-padding]); err != nil {
            return err
        }
    }

    // write frame MAC. egress MAC hash is up to date because
    // frame content was written to it as well.
    //更新frameMAC,因為frame內(nèi)容中也包括了framemac所以egressMAC會不停變化?
    fmacseed := rw.egressMAC.Sum(nil)
    //func updateMAC(mac hash.Hash, block cipher.Block, seed []byte) []byte 
    //mac是指frame mac
    mac := updateMAC(rw.egressMAC, rw.macCipher, fmacseed)
    _, err := rw.conn.Write(mac)
    return err
}

然后是協(xié)議握手。 商量好運行的協(xié)議的version啊名稱啊等等


func (t *rlpx) doProtoHandshake(our *protoHandshake) (their *protoHandshake, err error) {
    // Writing our handshake happens concurrently, we prefer
    // returning the handshake read error. If the remote side
    // disconnects us early with a valid reason, we should return it
    // as the error so it can be tracked elsewhere.
    werr := make(chan error, 1)
    go func() { werr <- Send(t.rw, handshakeMsg, our) }()
    if their, err = readProtocolHandshake(t.rw); err != nil {
        <-werr // make sure the write terminates too
        return nil, err
    }
    if err := <-werr; err != nil {
        return nil, fmt.Errorf("write error: %v", err)
    }
    // If the protocol version supports Snappy encoding, upgrade immediately
    t.rw.snappy = their.Version >= snappyProtocolVersion

    return their, nil
}

rlpx加解密篇完畢?。。。?/p>

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容