Eos代碼學(xué)習(xí)筆記(五)同步區(qū)塊--p2p通信--notice_message

上篇筆記寫到遠(yuǎn)程節(jié)點(diǎn)給本地節(jié)點(diǎn)發(fā)送notice消息,通知本地節(jié)點(diǎn)同步區(qū)塊。本篇筆記繼續(xù)學(xué)習(xí)本地節(jié)點(diǎn)如何從遠(yuǎn)程節(jié)點(diǎn)同步區(qū)塊,這次重點(diǎn)寫notice消息的發(fā)送與處理過程。

1、notice_message消息結(jié)構(gòu)

//消息結(jié)構(gòu) 定義部分
struct notice_message {
  notice_message () : known_trx(), known_blocks() {}
  ordered_txn_ids known_trx;
  ordered_blk_ids known_blocks;
};
//選擇查看定義,跳轉(zhuǎn)到
using ordered_txn_ids = select_ids<transaction_id_type>;
using ordered_blk_ids = select_ids<block_id_type>;
//選擇查看定義,跳轉(zhuǎn)到
template<typename T>
struct select_ids {
  select_ids () : mode(none),pending(0),ids() {}
  id_list_modes  mode;
  uint32_t       pending;
  vector<T>      ids;        //模板類
  bool           empty () const { return (mode == none || ids.empty()); }
};

notice_message消息類型包括id_list_modes類型的變量,該類型為一個枚舉類型,包括enum id_list_modes { none, catch_up, last_irr_catch_up, normal };,pending表示區(qū)塊數(shù)目。

2、發(fā)送notice_message消息

遠(yuǎn)程節(jié)點(diǎn)給本地節(jié)點(diǎn)發(fā)送notice_message,通知本地節(jié)點(diǎn)同步到不可逆區(qū)塊。

//遠(yuǎn)程節(jié)點(diǎn)給本地節(jié)點(diǎn)發(fā)送的通知消息內(nèi)容 消息調(diào)用部分
if (lib_num > msg.head_num ) {
   fc_dlog(logger, "sync check state 2");
   if (msg.generation > 1 || c->protocol_version > proto_base) {
      notice_message note;
      note.known_trx.pending = lib_num;
      note.known_trx.mode = last_irr_catch_up;   //類型為不可逆區(qū)塊類型
      note.known_blocks.mode = last_irr_catch_up;
      note.known_blocks.pending = head;
      c->enqueue( note );
   }
   c->syncing = true;
   return;
}

遠(yuǎn)程節(jié)點(diǎn)發(fā)送的notice_message消息內(nèi)容比較簡單,包括不可逆區(qū)塊數(shù)、最新區(qū)塊數(shù)、同步方式(同步到不可逆區(qū)塊或者同步到最新區(qū)塊),然后放入到消息隊(duì)列等待發(fā)送出去。

3、接收notice_message消息

同上篇筆記所寫的一樣,處理notice_message消息同樣在一個handle_message的重載函數(shù)里進(jìn)行,參數(shù)為當(dāng)前通信的遠(yuǎn)程節(jié)點(diǎn)的connect對象指針和消息。

void net_plugin_impl::handle_message( connection_ptr c, const notice_message &msg) {
      // peer tells us about one or more blocks or txns. When done syncing, forward on
      // notices of previously unknown blocks or txns,
      //
      peer_ilog(c, "received notice_message");
      c->connecting = false;
      request_message req;
      bool send_req = false;
      if (msg.known_trx.mode != none) {
         fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_trx.mode))("n",msg.known_trx.pending));
      }
      switch (msg.known_trx.mode) {
      case none:
         break;
      case last_irr_catch_up: {
         c->last_handshake_recv.head_num = msg.known_trx.pending;
         req.req_trx.mode = none;
         break;
      }
      case catch_up : {
         if( msg.known_trx.pending > 0) {
            // plan to get all except what we already know about.
            req.req_trx.mode = catch_up;
            send_req = true;
            size_t known_sum = local_txns.size();
            if( known_sum ) {
               for( const auto& t : local_txns.get<by_id>( ) ) {
                  req.req_trx.ids.push_back( t.id );
               }
            }
         }
         break;
      }
      case normal: {
         dispatcher->recv_notice (c, msg, false);
      }
      }

      if (msg.known_blocks.mode != none) {
         fc_dlog(logger,"this is a ${m} notice with ${n} blocks", ("m",modes_str(msg.known_blocks.mode))("n",msg.known_blocks.pending));
      }
      switch (msg.known_blocks.mode) {
      case none : {
         if (msg.known_trx.mode != normal) {
            return;
         }
         break;
      }
      case last_irr_catch_up: //同步到最新區(qū)塊和同步到不可逆區(qū)塊 調(diào)用的函數(shù)相同
      case catch_up: {
         sync_master->recv_notice(c,msg); //通過sync_master進(jìn)行區(qū)塊同步
         break;
      }
      case normal : {
         dispatcher->recv_notice (c, msg, false);
         break;
      }
      default: {
         peer_elog(c, "bad notice_message : invalid known_blocks.mode ${m}",("m",static_cast<uint32_t>(msg.known_blocks.mode)));
      }
      }
      fc_dlog(logger, "send req = ${sr}", ("sr",send_req));
      if( send_req) {
         c->enqueue(req);
      }
   }

由于遠(yuǎn)程節(jié)點(diǎn)發(fā)送的notice_message數(shù)據(jù)包內(nèi)容為同步到不可逆區(qū)塊,mode為last_irr_catch_up,所以代碼中調(diào)用sync_master->recv_notice(c,msg)函數(shù)來進(jìn)行區(qū)塊同步。此函數(shù)同步區(qū)塊分為兩種情況

   void sync_manager::recv_notice (connection_ptr c, const notice_message &msg) {
      fc_ilog (logger, "sync_manager got ${m} block notice",("m",modes_str(msg.known_blocks.mode)));
      if (msg.known_blocks.mode == catch_up) {
         if (msg.known_blocks.ids.size() == 0) {
            elog ("got a catch up with ids size = 0");
         }
         else {
             //同步到最新區(qū)塊
            verify_catchup(c,  msg.known_blocks.pending, msg.known_blocks.ids.back());
         }
      }
      else {
         c->last_handshake_recv.last_irreversible_block_num = msg.known_trx.pending;
         reset_lib_num (c);
         //同步到不可逆區(qū)塊
         start_sync(c, msg.known_blocks.pending);
      }
   }

我們重點(diǎn)看同步到不可逆區(qū)塊,函數(shù)為start_sync,參數(shù)為connection對象指針和消息中包含的最新區(qū)塊數(shù)(不過我覺得應(yīng)該是不可逆區(qū)塊數(shù))。

   void sync_manager::start_sync( connection_ptr c, uint32_t target) {
      if( target > sync_known_lib_num) {
         sync_known_lib_num = target;
      }

      if (!sync_required()) {
         uint32_t bnum = chain_plug->chain().last_irreversible_block_num();
         uint32_t hnum = chain_plug->chain().fork_db_head_block_num();
         fc_dlog( logger, "We are already caught up, my irr = $, head = ${h}, target = ${t}",
                  ("b",bnum)("h",hnum)("t",target));
         return;
      }

      if (state == in_sync) {
         set_state(lib_catchup);
         sync_next_expected_num = chain_plug->chain().last_irreversible_block_num() + 1;
      }

      fc_ilog(logger, "Catching up with chain, our last req is ${cc}, theirs is ${t} peer ${p}",
              ( "cc",sync_last_requested_num)("t",target)("p",c->peer_name()));

      request_next_chunk(c);
   }

在這里,成員變量sync_known_lib_num賦值為sync_known_lib_num和target的最大值。該成員變量表示已知的當(dāng)前不可逆區(qū)塊數(shù),可見上一步同步到不可逆區(qū)塊的調(diào)用應(yīng)該為start_sync(c, msg.known_trx.pending);,在這個函數(shù)里,主要是檢查區(qū)塊同步信息,然后調(diào)用request_next_chunk(c)函數(shù)進(jìn)行區(qū)塊同步。

   void sync_manager::request_next_chunk( connection_ptr conn ) {
      uint32_t head_block = chain_plug->chain().fork_db_head_block_num();
      ······ //省略代碼
      if( sync_last_requested_num != sync_known_lib_num ) {
         uint32_t start = sync_next_expected_num;
         uint32_t end = start + sync_req_span - 1; // sync_req_span為配置文件中設(shè)置,每次同步多少個區(qū)塊,默認(rèn)是100個。
         if( end > sync_known_lib_num )
            end = sync_known_lib_num;
         if( end > 0 && end >= start ) {
            fc_ilog(logger, "requesting range ${s} to ${e}, from ${n}",
                    ("n",source->peer_name())("s",start)("e",end));
            source->request_sync_blocks(start, end);//同樣是發(fā)送某種消息來同步區(qū)塊
            sync_last_requested_num = end;
         }
      }
   }

查看request_next_chunk函數(shù)的功能,我們發(fā)現(xiàn)內(nèi)部調(diào)用request_sync_blocks函數(shù)來發(fā)送同步消息,具體實(shí)現(xiàn)需要進(jìn)入分析。

void connection::request_sync_blocks (uint32_t start, uint32_t end) {
      sync_request_message srm = {start,end};
      enqueue( net_message(srm));
      sync_wait();
   }

request_sync_blocks函數(shù)內(nèi)部實(shí)現(xiàn)很簡單,構(gòu)造了一個sync_request_message類型的消息,然后放入到消息隊(duì)列中,消息的內(nèi)容為起始區(qū)塊和結(jié)束區(qū)塊,每次請求100個(config.ini中默認(rèn)設(shè)置,可以修改)。
此時這篇筆記討論的消息類型先到這里,下一篇筆記討論sync_request_message類型的消息,分析本地節(jié)點(diǎn)是如何從遠(yuǎn)程同步區(qū)塊的。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

友情鏈接更多精彩內(nèi)容