Floyd&Raft的源碼分析(一)

這篇博文是借助360開源的Floyd項目和百度開源的Braft項目來分析Raft的實現(xiàn),至于Raft的理論(概念)和原理及一些問題點可以從下面的連接中獲取,本身分析它是興趣,利用周末時間來延伸自己的知識點,如有說的不正確,請指教哈,這篇估計要寫一段時間,平時比較忙。

Floyd里有關多線程的東西使用了Pink實現(xiàn),但在分析Raft的時候會簡化它,不然東西比較多。我會按照網(wǎng)上的說明,把它至少分為三個部分來分析,第一部分是Leader選舉,第二部分是日志復制,第三部分是安全性,可能關鍵代碼比較多以及每個字段的作用。然后最后分析如何啟動(框架),有幾個線程,以及各個承擔的職責。另外的Paxos挺復雜的,我也不準備去分析,微信團隊也開源了一個類似的實現(xiàn)PhxPaxos,有興趣的可自己分析下。

幾個關鍵的字段作用結合下代碼來分析比較好,每個節(jié)點使用FloydContext來描述該時刻的狀態(tài)和一些數(shù)據(jù):

 34 struct FloydContext {
 35   // Role related
 36   explicit FloydContext(const Options& _options)
 37     : options(_options),
 38       voted_for_ip(""),
 39       voted_for_port(0),
 40       leader_ip(""),
 41       leader_port(0),
 42       vote_quorum(0),
 43       commit_index(0),
 44       last_applied(0),
 45       last_op_time(0),
 46       apply_cond(&apply_mu) {}
 47 
 48   void RecoverInit(RaftMeta *raft);
 49   void BecomeFollower(uint64_t new_iterm,
 50       const std::string leader_ip = "", int port = 0);
 51   void BecomeCandidate();
 52   void BecomeLeader();
 53 
 54   Options options;
 55   // Role related
 56   uint64_t current_term;
 58   Role role;
 59   std::string voted_for_ip;
 60   int voted_for_port;
 61   std::string leader_ip;
 62   int leader_port;
 63   uint32_t vote_quorum;
 64 
 65   uint64_t commit_index;
 66   std::atomic<uint64_t> last_applied;
 67   uint64_t last_op_time;
 68 
 69   std::set<std::string> members;
 70 
 71   // mutex protect commit_index
 72   // used in floyd_apply thread and floyd_peer thread
 73   slash::Mutex global_mu;
 74   slash::Mutex apply_mu;
 75   slash::CondVar apply_cond;
 76 };

current_term表示當前任期,role表示節(jié)點在該任期是處于什么狀態(tài),每個成員變量的作用從命名大概知道,不具體分析了。

 19 void FloydContext::RecoverInit(RaftMeta *raft_meta) {
 20   current_term = raft_meta->GetCurrentTerm();
 21   voted_for_ip = raft_meta->GetVotedForIp();
 22   voted_for_port = raft_meta->GetVotedForPort();
 23   commit_index = raft_meta->GetCommitIndex();
 24   last_applied = raft_meta->GetLastApplied();
 25   role = Role::kFollower;
 26 }
 28 void FloydContext::BecomeFollower(uint64_t new_term,
 29                                   const std::string _leader_ip, int _leader_port) {
 30   // when requestvote receive a large term, then we transfer from candidate to follower
 31   // then we should set voted_for_ip to the leader_ip
 32   // if (current_term < new_term) {
 33   voted_for_ip = _leader_ip;
 34   voted_for_port = _leader_port;
 35   // }
 36   current_term = new_term;
 37   leader_ip = _leader_ip;
 38   leader_port = _leader_port;
 39   role = Role::kFollower;
 40 }

 42 void FloydContext::BecomeCandidate() {
 43   current_term++;
 44   role = Role::kCandidate;
 45   leader_ip.clear();
 46   leader_port = 0;
 47   voted_for_ip = options.local_ip;
 48   voted_for_port = options.local_port;
 49   vote_quorum = 1;
 50 }

 52 void FloydContext::BecomeLeader() {
 53   role = Role::kLeader;
 54   leader_ip = options.local_ip;
 55   leader_port = options.local_port;
 56 }

每個節(jié)點在同一時刻(任期term)只會有一種狀態(tài)(Leader/Follower/Candidate),初始啟動時,集群中的節(jié)點都為Follower,像current_term等是在RecoverInit中從rocksdb數(shù)據(jù)庫中恢復(初始化的),每次狀態(tài)變更都會涉及到成員變量的修改,至于如何從一個狀態(tài)至另一個狀轉變的,先引用下轉換圖,然后從代碼層面分析。

角色轉換圖及條件
 40 class FloydPrimary {
 41  public:
 42   FloydPrimary(FloydContext* context, PeersSet* peers, RaftMeta* raft_meta,
 43       const Options& options, Logger* info_log);
 44   virtual ~FloydPrimary();
 45 
 46   int Start();
 47   int Stop();
 48   void AddTask(TaskType type, bool is_delay = true);
 49  private:
 50   FloydContext* const context_;
 51   PeersSet* const peers_;
 52   RaftMeta* const raft_meta_;
 53   Options options_;
 54   Logger* const info_log_;
 55 
 56   std::atomic<uint64_t> reset_elect_leader_time_;
 57   std::atomic<uint64_t> reset_leader_heartbeat_time_;
 58   pink::BGThread bg_thread_;
 59 
 60   // The Launch* work is done by floyd_peer_thread
 61   // Cron task
 62   static void LaunchHeartBeatWrapper(void *arg);
 63   void LaunchHeartBeat();
 64   static void LaunchCheckLeaderWrapper(void *arg);
 65   void LaunchCheckLeader();
 66   static void LaunchNewCommandWrapper(void *arg);
 67   void LaunchNewCommand();
 68 
 69   void NoticePeerTask(TaskType type);
 74 };

這個線程主要的工作是做些心跳機制,狀態(tài)轉換等,F(xiàn)loyd啟動的時候會先塞一個任務primary_->AddTask(kCheckLeader)主要是用于選擇Leader,所有節(jié)點初始狀態(tài)為Follower;

 52 void FloydPrimary::AddTask(TaskType type, bool is_delay) {
 59   switch (type) {
 60     case kHeartBeat:
 61       if (is_delay) {
 62         uint64_t timeout = options_.heartbeat_us;
 63         bg_thread_.DelaySchedule(timeout / 1000LL, LaunchHeartBeatWrapper, this);
 64       } else {
 65         bg_thread_.Schedule(LaunchHeartBeatWrapper, this);
 66       }
 67       break;
 68     case kCheckLeader:
 69       if (is_delay) {
 70         uint64_t timeout = options_.check_leader_us;
 71         bg_thread_.DelaySchedule(timeout / 1000LL, LaunchCheckLeaderWrapper, this);
 72       } else {
 73         bg_thread_.Schedule(LaunchCheckLeaderWrapper, this);
 74       }
 75       break;
 76     case kNewCommand:
 77       bg_thread_.Schedule(LaunchNewCommandWrapper, this);
 78       break;
 79     default:
 81       break;
 82   }
 83 }
 85 void FloydPrimary::LaunchHeartBeatWrapper(void *arg) {
 86   reinterpret_cast<FloydPrimary *>(arg)->LaunchHeartBeat();
 87 }
 88 
 89 void FloydPrimary::LaunchHeartBeat() {
 90   slash::MutexLock l(&context_->global_mu);
 91   if (context_->role == Role::kLeader) {
 92     NoticePeerTask(kNewCommand);
 93     AddTask(kHeartBeat);
 94   }
 95 }
 96 
 97 void FloydPrimary::LaunchCheckLeaderWrapper(void *arg) {
 98   reinterpret_cast<FloydPrimary *>(arg)->LaunchCheckLeader();
 99 }
101 void FloydPrimary::LaunchCheckLeader() {
102   slash::MutexLock l(&context_->global_mu);
103   if (context_->role == Role::kFollower || context_->role == Role::kCandidate) {
104     if (options_.single_mode) {
105           //code...
111     } else if (context_->last_op_time + options_.check_leader_us < slash::NowMicros()) {
112       context_->BecomeCandidate();
116       raft_meta_->SetCurrentTerm(context_->current_term);
117       raft_meta_->SetVotedForIp(context_->voted_for_ip);
118       raft_meta_->SetVotedForPort(context_->voted_for_port);
119       NoticePeerTask(kHeartBeat);
120     }
121   }
122   AddTask(kCheckLeader);
123 }
124 
125 void FloydPrimary::LaunchNewCommandWrapper(void *arg) {
126   reinterpret_cast<FloydPrimary *>(arg)->LaunchNewCommand();
127 }
125 void FloydPrimary::LaunchNewCommandWrapper(void *arg) {
126   reinterpret_cast<FloydPrimary *>(arg)->LaunchNewCommand();
127 }
128 
129 void FloydPrimary::LaunchNewCommand() {
131   if (context_->role != Role::kLeader) {
133     return;
134   }
135   NoticePeerTask(kNewCommand);
136 }
140 void FloydPrimary::NoticePeerTask(TaskType type) {
141   for (auto& peer : (*peers_)) {
142     switch (type) {
143     case kHeartBeat:
146       peer.second->AddRequestVoteTask();
147       break;
148     case kNewCommand:
151       peer.second->AddAppendEntriesTask();
152       break;
153     default:
156     }
157   }
158 }

其中心跳檢查也做了些投票邏輯(沒有分開),具體代碼如上,沒有多少難點,能做什么是取決于是什么狀態(tài)。
其中check_leader_us這個值在每個節(jié)點是不相同的:check_leader_us = std::rand() % 2000000 + check_leader_us,引用一段話“服務器啟動時初始狀態(tài)都是follower,如果在超時時間內沒有收到leader發(fā)送的心跳包,則進入candidate狀態(tài)進行選舉,服務器啟動時和leader掛掉時處理一樣。為了避免選票瓜分的情況,raft利用隨機超時機制避免選票瓜分情況。選舉超時時間從一個固定的區(qū)間隨機選擇,由于每個服務器的超時時間不同,則leader掛掉后,超時時間最短且擁有最多日志的follower最先開始選主,并成為leader。一旦candidate成為leader,就會向其他服務器發(fā)送心跳包阻止新一輪的選舉開始。”簡來說,隨機化超時時間檢查,檢查時間大概是 [3s, 5s)。

代碼行101?123就是判斷有沒有超時,超時的話從kFollower轉換為kCandidate,BecomeCandidate函數(shù)要做的事就是增加當前的任期數(shù),給自己投一票,然后通過心跳機制的方式向其他節(jié)點請求投票,并再次AddTask(kCheckLeader),進行下一輪檢查(超時后),來看一下AddRequestVoteTask主要做了什么,以及當其他節(jié)點收到心跳后的處理,這里為了使分析更簡單,假設有A/B兩個節(jié)點,A向B進行請求投票AddRequestVoteTask,此時A的某個工作線程進行了RequestVoteRPC[不考慮異步回調的事件]

 93 void Peer::RequestVoteRPC() {
 94   uint64_t last_log_term;
 95   uint64_t last_log_index;
 96   CmdRequest req;
 97   {
 98   slash::MutexLock l(&context_->global_mu);
 99   raft_log_->GetLastLogTermAndIndex(&last_log_term, &last_log_index);
100 
101   req.set_type(Type::kRequestVote);
102   CmdRequest_RequestVote* request_vote = req.mutable_request_vote();
103   request_vote->set_ip(options_.local_ip);
104   request_vote->set_port(options_.local_port);
105   request_vote->set_term(context_->current_term);
106   request_vote->set_last_log_term(last_log_term);
107   request_vote->set_last_log_index(last_log_index);

消息協(xié)議使用了google的pb,這個線程會把自己的當前任期數(shù),ip/port,以last_log_termlast_log_index,發(fā)給其他節(jié)點SendAndRecv[后面兩個字段的作用和何時修改,會在后面分析];
B節(jié)點的工作線程收到請求后,進行了如下處理:

 89     case Type::kRequestVote:
 90       response_.set_type(Type::kRequestVote);
 91       floyd_->ReplyRequestVote(request_, &response_);
 92       response_.set_code(StatusCode::kOk);
 93       break;

714 int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {
715   slash::MutexLock l(&context_->global_mu);
716   bool granted = false;
717   CmdRequest_RequestVote request_vote = request.request_vote();
718   /*
719    * If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
720    */
721   if (request_vote.term() > context_->current_term) {
722     context_->BecomeFollower(request_vote.term());
723     raft_meta_->SetCurrentTerm(context_->current_term);
724   } 
725   // if caller's term smaller than my term, then I will notice him
726   if (request_vote.term() < context_->current_term) {
727     BuildRequestVoteResponse(context_->current_term, granted, response);
728     return -1;
729   }
730   uint64_t my_last_log_term = 0;
731   uint64_t my_last_log_index = 0;
732   raft_log_->GetLastLogTermAndIndex(&my_last_log_term, &my_last_log_index);
733   // if votedfor is null or candidateId, and candidated's log is at least as up-to-date
734   // as receiver's log, grant vote
735   if ((request_vote.last_log_term() < my_last_log_term) ||
736       ((request_vote.last_log_term() == my_last_log_term) && (request_vote.last_log_index() < my_last_log_index))) {
737     BuildRequestVoteResponse(context_->current_term, granted, response);
738     return -1;
739   } 
740   
741   if (vote_for_.find(request_vote.term()) != vote_for_.end()
742       && vote_for_[request_vote.term()] != std::make_pair(request_vote.ip(), request_vote.port())) {
743     BuildRequestVoteResponse(context_->current_term, granted, response);
744     return -1;
745   }
746   vote_for_[request_vote.term()] = std::make_pair(request_vote.ip(), request_vote.port());
747   context_->BecomeFollower(request_vote.term());
748   raft_meta_->SetCurrentTerm(context_->current_term);
749   raft_meta_->SetVotedForIp(context_->voted_for_ip);
750   raft_meta_->SetVotedForPort(context_->voted_for_port);
751   // Got my vote
752   GrantVote(request_vote.term(), request_vote.ip(), request_vote.port());
753   granted = true;
754   context_->last_op_time = slash::NowMicros();
755   BuildRequestVoteResponse(context_->current_term, granted, response);
756   return 0;
757 }

以上的就是收到請求投票的節(jié)點處理,分為以下幾種情況:
1)如果B的任期數(shù)小于A,則B變?yōu)?code>kFollower,如果B的任期數(shù)大于A,則拒絕投票,因為只有最新的Candidate才有可能成為Leader;
2)如果request_vote.last_log_term() < my_last_log_term,或者((request_vote.last_log_term() == my_last_log_term) && (request_vote.last_log_index() < my_last_log_index)),則拒絕投票;
3)然后判斷該term內是否對同一個節(jié)點進行了重復投票;
代碼行746?755進行投票,并修改狀態(tài);

my_last_log_indexmy_last_log_term表示節(jié)點的最新日志條目的索引值和當時的任期值,主要是為了安全性考慮,如果沒有這兩個字段參與的條件判斷,那成為leader的那個節(jié)點可能沒有最新的日志,term最大并不能解決這個問題,具有最新最全日志的server的才有資格去競選當上Leader,并且根據(jù)任期號可以判斷一條日志的新舊程度。

A收到投票響應后,進行了如下處理:

128   if (res.request_vote_res().term() > context_->current_term) {
129     context_->BecomeFollower(res.request_vote_res().term());
130     raft_meta_->SetCurrentTerm(context_->current_term);
131     raft_meta_->SetVotedForIp(context_->voted_for_ip);
132     raft_meta_->SetVotedForPort(context_->voted_for_port);
133     return;
134   } 
135   if (context_->role == Role::kCandidate) {
136     if (res.request_vote_res().vote_granted() == true) {    // granted
137       if (CheckAndVote(res.request_vote_res().term())) {
138         context_->BecomeLeader();
139         UpdatePeerInfo();
140         primary_->AddTask(kHeartBeat, false);
141       } 
142     } else {
143       context_->BecomeFollower(res.request_vote_res().term());
144       raft_meta_->SetCurrentTerm(context_->current_term);
145       raft_meta_->SetVotedForIp(context_->voted_for_ip);
146       raft_meta_->SetVotedForPort(context_->voted_for_port);
147     } 
148   } else if (context_->role == Role::kFollower) {
149   } else if (context_->role == Role::kLeader) {
150   }     
151   }     
152   return;

代碼行128?134判斷自己的任期數(shù)是否小于對端的,是的話則自己變?yōu)?code>kFollower;因為是異步請求投票,所以在收到某個節(jié)點的投票響應后,此時的節(jié)點狀態(tài)可能已經(jīng)發(fā)生變化了,故在檢查時,需要結合節(jié)點當前的狀態(tài);
如果是kCandidate,然后檢查當前任期內的投票數(shù)是否大于一定的數(shù),如果滿足則變?yōu)?code>kLeader,并且

 72 void Peer::UpdatePeerInfo() {
 73   for (auto& pt : (*peers_)) {
 74     pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);
 75     pt.second->set_match_index(0);
 76   }
 77 }

然后每個follower進行AddAppendEntriesTask,具體做了什么下次分析。

 65 bool Peer::CheckAndVote(uint64_t vote_term) {
 66   if (context_->current_term != vote_term) {
 67     return false;
 68   }
 69   return (++context_->vote_quorum) > (options_.members.size() / 2);
 70 }

如果是kFollower或者kLeader,啥也不做。
補充一下,見代碼行135?140,在Server成為新Leader后,會立刻AddTask,寫一條NOP日志,之后才能提供服務,主要是為了安全性考慮,下一篇會把這個坑填上。

下面幾個鏈接是我在分析源碼時,所做的一些參考,最后三個是一些相關的應用,包括優(yōu)化方案,原始Raft論文里的實現(xiàn)性能并不怎么好。
參考:
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
https://www.cnblogs.com/katsura/p/6549344.html
https://yq.aliyun.com/articles/62425
http://www.itdecent.cn/p/4711c4c32aab
http://www.itdecent.cn/p/2a2ba021f721
http://www.itdecent.cn/p/ee7646c0f4cf
https://yq.aliyun.com/articles/398232?spm=a2c4e.11153940.blogcont62425.18.1e944bd0b1UuMv
https://yq.aliyun.com/articles/398237?spm=a2c4e.11153940.blogcont62425.19.1e944bd0b1UuMv
https://pingcap.com/blog-cn/#Raft
https://www.zhihu.com/question/54997169
https://zhuanlan.zhihu.com/p/25735592

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容