以太坊節(jié)點間通信是指本地節(jié)點和peer節(jié)點之間的按照p2p線上協(xié)議標準實現(xiàn)的數(shù)據(jù)收發(fā)過程。其中peer節(jié)點從發(fā)現(xiàn)協(xié)議維護的活躍節(jié)點列表中獲取。
p2p消息傳輸協(xié)議詳細介紹請看:https://github.com/ethereum/wiki/wiki/%C3%90%CE%9EVp2p-Wire-Protocol
ethereum線上傳輸協(xié)議詳細介紹請看:https://github.com/ethereum/wiki/wiki/Ethereum-Wire-Protocol
要說明p2p和eth線上協(xié)議,涉及到軟件架構中的3層:
- service層: 提供PeerManager和ChainService
- protocol層: 實現(xiàn)p2p-protocol 和eth-protocol命令報文的收發(fā)
- network層: 接收tcp連接的原始數(shù)據(jù)并解析,分發(fā)給相應的協(xié)議
下面主要分析p2p-protocol和PeerManager,來說明這幾層之間的交互流程。
1. 概述
本地節(jié)點需要和指定數(shù)量的peer維持tcp連接, 如果連接的節(jié)點數(shù)不足, 從節(jié)點發(fā)現(xiàn)協(xié)議維護的活躍節(jié)點列表中動態(tài)獲取節(jié)點信息。
本地節(jié)點監(jiān)聽peer的tcp連接, 當有peer連接時,為每個peer都維護一個Peer數(shù)據(jù)結構,在這個Peer結構中實現(xiàn)了protocol層和network層:network層負責消息的收發(fā), 并分發(fā)給protocol層的p2pProtocol和ethProtocol。
2. network層
網(wǎng)絡層的詳細功能將下圖:
- PeerManager服務啟動時,在p2p端口上啟動tcp連接的監(jiān)聽;
- PeerManager服務啟動時,如果配置有bootstrap, 會主動進行連接;
- 無論主動還是被動連接到一個peer, 都會建立一個Peer結構, 配置protocol層的協(xié)議(
p2pprotocol,ethprotocol); - 在Peer結構初始化時,使能network層的數(shù)據(jù)收發(fā)功能;
- 本node和peer建立tcp連接后, 首先需要進行加密認證:
5.1 在初始化Peer結構的多路復用的會話分發(fā)器時, 作為連接的發(fā)起者(發(fā)起者知道對端的公鑰), 會發(fā)送一個hello認證消息;
5.2 等發(fā)送協(xié)程起來后,hello認證消息就會被發(fā)送出去;
5.3 對端的Peer結構也是首次收到消息,即該hello認證消息, 在對端的分發(fā)器層需要專門處理該hello認證消息;
5.4 對端驗證該消息的簽名, 回復一個認證確認消息;
5.5 然后本端和對端都發(fā)一個hello認證消息,觸發(fā)定時發(fā)送ping包,進行心跳?;睿?/li> - 消息的發(fā)送和接收都是通過緩存隊列異步執(zhí)行的,方便控制發(fā)送速率和數(shù)據(jù)塊大小。
- 首次和peer通信時,peer發(fā)過來的hello報文后, 該函數(shù)檢查是否允許連接該peer是否超過配置最大的連接數(shù), 超過了則不允許連接;是否是重復連接。
2.1 消息發(fā)送
- 每個Peer都有一個send_packet接口,用戶發(fā)送已經(jīng)組裝好的消息報文;
- 發(fā)送的消息目前只支持p2pProtocol和ethProtocol協(xié)議層的格式;
- 消息送給消息復用分發(fā)器mux,組裝成Frame格式
3.1 組裝成Frame是為了RLP序列化編碼,幀數(shù)據(jù)提供了報文的大小,報文的協(xié)議;
3.2 Frame有2種形式:單幀,多幀;
3.3 單幀的格式如下:header || header-mac || frame || mac
3.4 多幀的格式如下:
header || header-mac || frame-0 || [ header || header-mac || frame-n || ... || ] header || header-mac || frame-last || mac
消息也可以帶有優(yōu)先級,因此, 消息在組裝Frame后,根據(jù)類型分別進入3中不同類型的隊列:normal隊列,chunked隊列,priority隊列;
從上面3個隊列中根據(jù)pws(protocol-window-size)大小和隊列中frame數(shù)據(jù)組裝成數(shù)據(jù)流,方法如下:
5.1 normal隊列和priority隊列中存在Frame:分別獲取pws/2字節(jié)的報文數(shù)據(jù)(以frame為單位)
5.2 chunked隊列和priority隊列中存在Frame:分別獲取pws/2字節(jié)的報文數(shù)據(jù)(以frame為單位)
5.3 normal隊列和chunked隊列中存在Frame:分別獲取pws/2字節(jié)的報文數(shù)據(jù)(以frame為單位)
5.4 其他情況,從有數(shù)據(jù)的隊列中獲取pws字節(jié)的frame;
5.5 所有獲取的數(shù)據(jù)字節(jié)數(shù)大于pws才可以發(fā)送;對上面取出來的frame數(shù)據(jù)進行加密,在放入發(fā)送隊列:message_queue;
線程_run_egress_message監(jiān)聽該隊列, 發(fā)送最終的數(shù)據(jù)給peer;
2.2 消息接收
收到原始的tcp數(shù)據(jù)流后, 需要解析為消息, 并根據(jù)protocolID和cmdID分發(fā)給p2pProtocol和ethProtocol協(xié)議。
3. 協(xié)議層
protocol層處于service層和network層之間,實現(xiàn)了一系列的命令。
3.1 命令消息
在協(xié)議層實現(xiàn)p2p線上協(xié)議定義的命令。
協(xié)議層的實例比如ethProtocol協(xié)議實例,對于每個命令消息,在初始化時都會生成下面3個方法(X是命令名字):
protocol.create_X(*args, **kargs)protocol._receive_X(data)protocol.send_X(*args, **kargs)
其中send_X是 protocol.send_packet(protocol.create_X(*args, **kargs)) 的簡寫。
上圖是ethProtocol協(xié)議實例舉例,比如發(fā)送和接收block對應的接口分別是send_newblock(),_receive_newblock() 處理, 最后調用注冊的回調函數(shù): receive_newblock_callbacks() 。
3.2 接收命令報文
接收從網(wǎng)絡層的報文的入口是 protocol.receive_packet。
在 protocol.receive_packet中:
- 從peer的netwwork層接收報文;
- 根據(jù)命令的結構,對數(shù)據(jù)進行反序列化并保存為dict;
- 根據(jù)cmd_id得到cmd_name,執(zhí)行cmd_name對應的receiveX(...)函數(shù),即command.receive();command.receive()中默認執(zhí)行的是注冊的回調函數(shù):
protocol.receive_X_callbacks; - 依次執(zhí)行注冊的callbacks;
3.3 發(fā)送命令報文
直接調用send_X函數(shù)。
4. P2PProtocol協(xié)議層
P2PProtocol有4個命令:
-
hello: cmd_id = 0 握手報文
發(fā)送: send_hello(...)
接收: 在hello命令的receive中
- 先注冊ethProtocol, 實現(xiàn)位置:
proto.peer.receive_hello(proto, **data) - 啟動定時發(fā)送ping的任務:
BaseProtocol.command.receive(self, proto, data)
- 先注冊ethProtocol, 實現(xiàn)位置:
disconnect: cmd_id = 1 關閉連接報文
ping: cmd_id = 2 ?;畹男奶鴪笪?,定時發(fā)送ping給peer;
pong: cmd_id = 3 ?;畹男奶鴪笪?收到ping后,回一個pong;
p2pprotocol還包括一個連接監(jiān)視器:
self.monitor = ConnectionMonitor(self)
在ConnectionMonitor中, 設置了pong消息和hello消息的回調函數(shù):
self.proto.receive_pong_callbacks.append(self.track_response)
self.proto.receive_hello_callbacks.append(lambda p, **kargs: self.start())
receive_pong_callbacks: 計算ping和pong的時間間隔
receive_hello_callbacks: 啟動定時發(fā)送ping 的保活消息任務: 計算ping消息是否超時
5. 服務層: PeerManager
Peermanager負責和peer的連接,并維護peer結構.
5.1 peer連接
peer連接分2種:我去連peer, 等待peer來連我。
等待peer來連我(不知道peer的公鑰)
在初始化時, 已經(jīng)在p2p監(jiān)聽端口上啟動了一個tcp服務, 監(jiān)聽tcp客戶端的連接。
如果新的連接進來, 啟動一個協(xié)程執(zhí)行已注冊的處理函數(shù): _on_new_connection , 流程如下:
peer = self._start_peer(connection, address)
- 創(chuàng)建一個Peer結構(繼承自Greenlet)
peer = Peer(self, connection, remote_pubkey=remote_pubkey),詳細過程見下面小節(jié). - 設置peer掛掉時的回調函數(shù)
peer.link(on_peer_exit) - 保存peer
self.peers.append(peer) - 調度peer運行
peer.start(). 最終通過_run啟動3個協(xié)程, 處理network層的原始數(shù)據(jù)分發(fā),詳見下面小節(jié).
主動連接peer(知道peer的公鑰)
有2種情況需要主動連接peer:
- 在初始化時,如果配置了bootstrap,主動連接bootstrap;
- 監(jiān)視協(xié)程_discovery_loop線程定時檢查,檢測到連接的peer數(shù)量少于指定數(shù)量時, 從節(jié)點發(fā)現(xiàn)協(xié)議kademlia維護的活躍節(jié)點列表中選取節(jié)點進行連接;
peer = self._start_peer(connection, address, remote_pubkey)
5.2 peer廣播
接口:broadcast(protocol, command_name, ...)
這是一個通用函數(shù), 可以廣播指定協(xié)議的指定命令. 可以指定廣播的peer數(shù)量, 和指定排除那些peer.
對于符合要求的peer, 根據(jù)command_name 找到對應的send_<command_name>函數(shù),, 發(fā)送要求的數(shù)據(jù).
6. Peer
peer實現(xiàn)network層和protocol層, 主要功能實際上在上文中已經(jīng)詳細介紹:發(fā)送消息, 接收數(shù)據(jù)并解析, 分發(fā)給相應的Protocol協(xié)議。
6.1 peer的初始化
- 創(chuàng)建一個加密的多路復用的command分發(fā)器
self.mux = MultiplexedSession(privkey, hello_packet, remote_pubkey=remote_pubkey),在MultiplexedSession初始化中:
1.1hello_packet報文是p2p協(xié)議的cmdID=0的協(xié)議報文
1.2 新建一個rlpx_session,用于加密會話數(shù)據(jù)
1.3 如果已知peer的公鑰,和peer進行加密握手:_send_init_msg()
auth_msg = self.rlpx_session.create_auth_message(self._remote_pubkey)#創(chuàng)建認證消息 auth_msg_ct = self.rlpx_session.encrypt_auth_message(auth_msg) #加密認證消息 self.message_queue.put(auth_msg_ct) #發(fā)送加密后的消息
- 注冊p2p協(xié)議, 啟動和peer連接的服務
self.connect_service(self.peermanager)
2.1 實例化一個線上協(xié)議: P2PProtocol
2.2 在復用器中增加該協(xié)議:self.mux.add_protocol(protocol.protocol_id)
2.3 啟動該協(xié)議(P2PProtocol):protocol.start()。執(zhí)行的是baseProtocol的start(), 注冊自定義的收到命令消息后的回調函數(shù)
6.2 Peer在network層收發(fā)數(shù)據(jù)
在收到一個新的peer的連接到時候, 實例化一個Peer結構, 然后會觸發(fā)peer.start(), 創(chuàng)建3個協(xié)程分別處理收發(fā)報文:
_run_ingress_message
- 等待讀事件:
self.safe_to_read.wait() - 每次讀取最多4k數(shù)據(jù):
imsg = self.connection.recv(4096) - 將消息加到分發(fā)器中:
self.mux.add_message(imsg)
3.1 對消息進行解碼, 因為是流數(shù)據(jù), 需要解析消息,并對消息進行解密
3.2add_message有兩個定義
- 第一次收到消息時處理函數(shù)是
_add_message_during_handshake
- 如果是連接的發(fā)起者, 收到的第一個消息是認證確認消息,如果是連接的接收者 , 收到的第一個消息是hello認證消息
- 解密消息,并驗證簽名(對端使用本節(jié)點的公鑰進行簽名)
- 如果是連接的發(fā)起者, 再發(fā)送一個hello認證消息給對端, 如果是連接的接收者, 驗證成功后, 回復一個認證確認消息, 然后需要再發(fā)送一個hello認證消息給對端
- 以后的消息處理入口是:
_add_message_post_handshake
_run_decoded_packets
- 從packet_queue獲取報文數(shù)據(jù)
- 解析消息,并對每個消息解析protocol, cmd_id
- 發(fā)給protocol層的receive_packet(pkt)
_run_egress_message
- 從message_queue中獲取報文,并發(fā)送
- 剛啟動該監(jiān)聽任務時, 如果是本連接的發(fā)起者, 那么隊列中已經(jīng)有一個hello的認證消息, 此時會馬上將該消息發(fā)送出去.