這篇博文是借助360開源的Floyd項目和百度開源的Braft項目來分析Raft的實現(xiàn),至于Raft的理論(概念)和原理及一些問題點可以從下面的連接中獲取,本身分析它是興趣,利用周末時間來延伸自己的知識點,如有說的不正確,請指教哈,這篇估計要寫一段時間,平時比較忙。
Floyd里有關多線程的東西使用了Pink實現(xiàn),但在分析Raft的時候會簡化它,不然東西比較多。我會按照網(wǎng)上的說明,把它至少分為三個部分來分析,第一部分是Leader選舉,第二部分是日志復制,第三部分是安全性,可能關鍵代碼比較多以及每個字段的作用。然后最后分析如何啟動(框架),有幾個線程,以及各個承擔的職責。另外的Paxos挺復雜的,我也不準備去分析,微信團隊也開源了一個類似的實現(xiàn)PhxPaxos,有興趣的可自己分析下。
幾個關鍵的字段作用結合下代碼來分析比較好,每個節(jié)點使用FloydContext來描述該時刻的狀態(tài)和一些數(shù)據(jù):
34 struct FloydContext {
35 // Role related
36 explicit FloydContext(const Options& _options)
37 : options(_options),
38 voted_for_ip(""),
39 voted_for_port(0),
40 leader_ip(""),
41 leader_port(0),
42 vote_quorum(0),
43 commit_index(0),
44 last_applied(0),
45 last_op_time(0),
46 apply_cond(&apply_mu) {}
47
48 void RecoverInit(RaftMeta *raft);
49 void BecomeFollower(uint64_t new_iterm,
50 const std::string leader_ip = "", int port = 0);
51 void BecomeCandidate();
52 void BecomeLeader();
53
54 Options options;
55 // Role related
56 uint64_t current_term;
58 Role role;
59 std::string voted_for_ip;
60 int voted_for_port;
61 std::string leader_ip;
62 int leader_port;
63 uint32_t vote_quorum;
64
65 uint64_t commit_index;
66 std::atomic<uint64_t> last_applied;
67 uint64_t last_op_time;
68
69 std::set<std::string> members;
70
71 // mutex protect commit_index
72 // used in floyd_apply thread and floyd_peer thread
73 slash::Mutex global_mu;
74 slash::Mutex apply_mu;
75 slash::CondVar apply_cond;
76 };
current_term表示當前任期,role表示節(jié)點在該任期是處于什么狀態(tài),每個成員變量的作用從命名大概知道,不具體分析了。
19 void FloydContext::RecoverInit(RaftMeta *raft_meta) {
20 current_term = raft_meta->GetCurrentTerm();
21 voted_for_ip = raft_meta->GetVotedForIp();
22 voted_for_port = raft_meta->GetVotedForPort();
23 commit_index = raft_meta->GetCommitIndex();
24 last_applied = raft_meta->GetLastApplied();
25 role = Role::kFollower;
26 }
28 void FloydContext::BecomeFollower(uint64_t new_term,
29 const std::string _leader_ip, int _leader_port) {
30 // when requestvote receive a large term, then we transfer from candidate to follower
31 // then we should set voted_for_ip to the leader_ip
32 // if (current_term < new_term) {
33 voted_for_ip = _leader_ip;
34 voted_for_port = _leader_port;
35 // }
36 current_term = new_term;
37 leader_ip = _leader_ip;
38 leader_port = _leader_port;
39 role = Role::kFollower;
40 }
42 void FloydContext::BecomeCandidate() {
43 current_term++;
44 role = Role::kCandidate;
45 leader_ip.clear();
46 leader_port = 0;
47 voted_for_ip = options.local_ip;
48 voted_for_port = options.local_port;
49 vote_quorum = 1;
50 }
52 void FloydContext::BecomeLeader() {
53 role = Role::kLeader;
54 leader_ip = options.local_ip;
55 leader_port = options.local_port;
56 }
每個節(jié)點在同一時刻(任期term)只會有一種狀態(tài)(Leader/Follower/Candidate),初始啟動時,集群中的節(jié)點都為Follower,像current_term等是在RecoverInit中從rocksdb數(shù)據(jù)庫中恢復(初始化的),每次狀態(tài)變更都會涉及到成員變量的修改,至于如何從一個狀態(tài)至另一個狀轉變的,先引用下轉換圖,然后從代碼層面分析。

40 class FloydPrimary {
41 public:
42 FloydPrimary(FloydContext* context, PeersSet* peers, RaftMeta* raft_meta,
43 const Options& options, Logger* info_log);
44 virtual ~FloydPrimary();
45
46 int Start();
47 int Stop();
48 void AddTask(TaskType type, bool is_delay = true);
49 private:
50 FloydContext* const context_;
51 PeersSet* const peers_;
52 RaftMeta* const raft_meta_;
53 Options options_;
54 Logger* const info_log_;
55
56 std::atomic<uint64_t> reset_elect_leader_time_;
57 std::atomic<uint64_t> reset_leader_heartbeat_time_;
58 pink::BGThread bg_thread_;
59
60 // The Launch* work is done by floyd_peer_thread
61 // Cron task
62 static void LaunchHeartBeatWrapper(void *arg);
63 void LaunchHeartBeat();
64 static void LaunchCheckLeaderWrapper(void *arg);
65 void LaunchCheckLeader();
66 static void LaunchNewCommandWrapper(void *arg);
67 void LaunchNewCommand();
68
69 void NoticePeerTask(TaskType type);
74 };
這個線程主要的工作是做些心跳機制,狀態(tài)轉換等,F(xiàn)loyd啟動的時候會先塞一個任務primary_->AddTask(kCheckLeader)主要是用于選擇Leader,所有節(jié)點初始狀態(tài)為Follower;
52 void FloydPrimary::AddTask(TaskType type, bool is_delay) {
59 switch (type) {
60 case kHeartBeat:
61 if (is_delay) {
62 uint64_t timeout = options_.heartbeat_us;
63 bg_thread_.DelaySchedule(timeout / 1000LL, LaunchHeartBeatWrapper, this);
64 } else {
65 bg_thread_.Schedule(LaunchHeartBeatWrapper, this);
66 }
67 break;
68 case kCheckLeader:
69 if (is_delay) {
70 uint64_t timeout = options_.check_leader_us;
71 bg_thread_.DelaySchedule(timeout / 1000LL, LaunchCheckLeaderWrapper, this);
72 } else {
73 bg_thread_.Schedule(LaunchCheckLeaderWrapper, this);
74 }
75 break;
76 case kNewCommand:
77 bg_thread_.Schedule(LaunchNewCommandWrapper, this);
78 break;
79 default:
81 break;
82 }
83 }
85 void FloydPrimary::LaunchHeartBeatWrapper(void *arg) {
86 reinterpret_cast<FloydPrimary *>(arg)->LaunchHeartBeat();
87 }
88
89 void FloydPrimary::LaunchHeartBeat() {
90 slash::MutexLock l(&context_->global_mu);
91 if (context_->role == Role::kLeader) {
92 NoticePeerTask(kNewCommand);
93 AddTask(kHeartBeat);
94 }
95 }
96
97 void FloydPrimary::LaunchCheckLeaderWrapper(void *arg) {
98 reinterpret_cast<FloydPrimary *>(arg)->LaunchCheckLeader();
99 }
101 void FloydPrimary::LaunchCheckLeader() {
102 slash::MutexLock l(&context_->global_mu);
103 if (context_->role == Role::kFollower || context_->role == Role::kCandidate) {
104 if (options_.single_mode) {
105 //code...
111 } else if (context_->last_op_time + options_.check_leader_us < slash::NowMicros()) {
112 context_->BecomeCandidate();
116 raft_meta_->SetCurrentTerm(context_->current_term);
117 raft_meta_->SetVotedForIp(context_->voted_for_ip);
118 raft_meta_->SetVotedForPort(context_->voted_for_port);
119 NoticePeerTask(kHeartBeat);
120 }
121 }
122 AddTask(kCheckLeader);
123 }
124
125 void FloydPrimary::LaunchNewCommandWrapper(void *arg) {
126 reinterpret_cast<FloydPrimary *>(arg)->LaunchNewCommand();
127 }
125 void FloydPrimary::LaunchNewCommandWrapper(void *arg) {
126 reinterpret_cast<FloydPrimary *>(arg)->LaunchNewCommand();
127 }
128
129 void FloydPrimary::LaunchNewCommand() {
131 if (context_->role != Role::kLeader) {
133 return;
134 }
135 NoticePeerTask(kNewCommand);
136 }
140 void FloydPrimary::NoticePeerTask(TaskType type) {
141 for (auto& peer : (*peers_)) {
142 switch (type) {
143 case kHeartBeat:
146 peer.second->AddRequestVoteTask();
147 break;
148 case kNewCommand:
151 peer.second->AddAppendEntriesTask();
152 break;
153 default:
156 }
157 }
158 }
其中心跳檢查也做了些投票邏輯(沒有分開),具體代碼如上,沒有多少難點,能做什么是取決于是什么狀態(tài)。
其中check_leader_us這個值在每個節(jié)點是不相同的:check_leader_us = std::rand() % 2000000 + check_leader_us,引用一段話“服務器啟動時初始狀態(tài)都是follower,如果在超時時間內沒有收到leader發(fā)送的心跳包,則進入candidate狀態(tài)進行選舉,服務器啟動時和leader掛掉時處理一樣。為了避免選票瓜分的情況,raft利用隨機超時機制避免選票瓜分情況。選舉超時時間從一個固定的區(qū)間隨機選擇,由于每個服務器的超時時間不同,則leader掛掉后,超時時間最短且擁有最多日志的follower最先開始選主,并成為leader。一旦candidate成為leader,就會向其他服務器發(fā)送心跳包阻止新一輪的選舉開始。”簡來說,隨機化超時時間檢查,檢查時間大概是 [3s, 5s)。
代碼行101?123就是判斷有沒有超時,超時的話從kFollower轉換為kCandidate,BecomeCandidate函數(shù)要做的事就是增加當前的任期數(shù),給自己投一票,然后通過心跳機制的方式向其他節(jié)點請求投票,并再次AddTask(kCheckLeader),進行下一輪檢查(超時后),來看一下AddRequestVoteTask主要做了什么,以及當其他節(jié)點收到心跳后的處理,這里為了使分析更簡單,假設有A/B兩個節(jié)點,A向B進行請求投票AddRequestVoteTask,此時A的某個工作線程進行了RequestVoteRPC[不考慮異步回調的事件]
93 void Peer::RequestVoteRPC() {
94 uint64_t last_log_term;
95 uint64_t last_log_index;
96 CmdRequest req;
97 {
98 slash::MutexLock l(&context_->global_mu);
99 raft_log_->GetLastLogTermAndIndex(&last_log_term, &last_log_index);
100
101 req.set_type(Type::kRequestVote);
102 CmdRequest_RequestVote* request_vote = req.mutable_request_vote();
103 request_vote->set_ip(options_.local_ip);
104 request_vote->set_port(options_.local_port);
105 request_vote->set_term(context_->current_term);
106 request_vote->set_last_log_term(last_log_term);
107 request_vote->set_last_log_index(last_log_index);
消息協(xié)議使用了google的pb,這個線程會把自己的當前任期數(shù),ip/port,以last_log_term和last_log_index,發(fā)給其他節(jié)點SendAndRecv[后面兩個字段的作用和何時修改,會在后面分析];
B節(jié)點的工作線程收到請求后,進行了如下處理:
89 case Type::kRequestVote:
90 response_.set_type(Type::kRequestVote);
91 floyd_->ReplyRequestVote(request_, &response_);
92 response_.set_code(StatusCode::kOk);
93 break;
714 int FloydImpl::ReplyRequestVote(const CmdRequest& request, CmdResponse* response) {
715 slash::MutexLock l(&context_->global_mu);
716 bool granted = false;
717 CmdRequest_RequestVote request_vote = request.request_vote();
718 /*
719 * If RPC request or response contains term T > currentTerm: set currentTerm = T, convert to follower (5.1)
720 */
721 if (request_vote.term() > context_->current_term) {
722 context_->BecomeFollower(request_vote.term());
723 raft_meta_->SetCurrentTerm(context_->current_term);
724 }
725 // if caller's term smaller than my term, then I will notice him
726 if (request_vote.term() < context_->current_term) {
727 BuildRequestVoteResponse(context_->current_term, granted, response);
728 return -1;
729 }
730 uint64_t my_last_log_term = 0;
731 uint64_t my_last_log_index = 0;
732 raft_log_->GetLastLogTermAndIndex(&my_last_log_term, &my_last_log_index);
733 // if votedfor is null or candidateId, and candidated's log is at least as up-to-date
734 // as receiver's log, grant vote
735 if ((request_vote.last_log_term() < my_last_log_term) ||
736 ((request_vote.last_log_term() == my_last_log_term) && (request_vote.last_log_index() < my_last_log_index))) {
737 BuildRequestVoteResponse(context_->current_term, granted, response);
738 return -1;
739 }
740
741 if (vote_for_.find(request_vote.term()) != vote_for_.end()
742 && vote_for_[request_vote.term()] != std::make_pair(request_vote.ip(), request_vote.port())) {
743 BuildRequestVoteResponse(context_->current_term, granted, response);
744 return -1;
745 }
746 vote_for_[request_vote.term()] = std::make_pair(request_vote.ip(), request_vote.port());
747 context_->BecomeFollower(request_vote.term());
748 raft_meta_->SetCurrentTerm(context_->current_term);
749 raft_meta_->SetVotedForIp(context_->voted_for_ip);
750 raft_meta_->SetVotedForPort(context_->voted_for_port);
751 // Got my vote
752 GrantVote(request_vote.term(), request_vote.ip(), request_vote.port());
753 granted = true;
754 context_->last_op_time = slash::NowMicros();
755 BuildRequestVoteResponse(context_->current_term, granted, response);
756 return 0;
757 }
以上的就是收到請求投票的節(jié)點處理,分為以下幾種情況:
1)如果B的任期數(shù)小于A,則B變?yōu)?code>kFollower,如果B的任期數(shù)大于A,則拒絕投票,因為只有最新的Candidate才有可能成為Leader;
2)如果request_vote.last_log_term() < my_last_log_term,或者((request_vote.last_log_term() == my_last_log_term) && (request_vote.last_log_index() < my_last_log_index)),則拒絕投票;
3)然后判斷該term內是否對同一個節(jié)點進行了重復投票;
代碼行746?755進行投票,并修改狀態(tài);
my_last_log_index 和my_last_log_term表示節(jié)點的最新日志條目的索引值和當時的任期值,主要是為了安全性考慮,如果沒有這兩個字段參與的條件判斷,那成為leader的那個節(jié)點可能沒有最新的日志,term最大并不能解決這個問題,具有最新最全日志的server的才有資格去競選當上Leader,并且根據(jù)任期號可以判斷一條日志的新舊程度。
A收到投票響應后,進行了如下處理:
128 if (res.request_vote_res().term() > context_->current_term) {
129 context_->BecomeFollower(res.request_vote_res().term());
130 raft_meta_->SetCurrentTerm(context_->current_term);
131 raft_meta_->SetVotedForIp(context_->voted_for_ip);
132 raft_meta_->SetVotedForPort(context_->voted_for_port);
133 return;
134 }
135 if (context_->role == Role::kCandidate) {
136 if (res.request_vote_res().vote_granted() == true) { // granted
137 if (CheckAndVote(res.request_vote_res().term())) {
138 context_->BecomeLeader();
139 UpdatePeerInfo();
140 primary_->AddTask(kHeartBeat, false);
141 }
142 } else {
143 context_->BecomeFollower(res.request_vote_res().term());
144 raft_meta_->SetCurrentTerm(context_->current_term);
145 raft_meta_->SetVotedForIp(context_->voted_for_ip);
146 raft_meta_->SetVotedForPort(context_->voted_for_port);
147 }
148 } else if (context_->role == Role::kFollower) {
149 } else if (context_->role == Role::kLeader) {
150 }
151 }
152 return;
代碼行128?134判斷自己的任期數(shù)是否小于對端的,是的話則自己變?yōu)?code>kFollower;因為是異步請求投票,所以在收到某個節(jié)點的投票響應后,此時的節(jié)點狀態(tài)可能已經(jīng)發(fā)生變化了,故在檢查時,需要結合節(jié)點當前的狀態(tài);
如果是kCandidate,然后檢查當前任期內的投票數(shù)是否大于一定的數(shù),如果滿足則變?yōu)?code>kLeader,并且
72 void Peer::UpdatePeerInfo() {
73 for (auto& pt : (*peers_)) {
74 pt.second->set_next_index(raft_log_->GetLastLogIndex() + 1);
75 pt.second->set_match_index(0);
76 }
77 }
然后每個follower進行AddAppendEntriesTask,具體做了什么下次分析。
65 bool Peer::CheckAndVote(uint64_t vote_term) {
66 if (context_->current_term != vote_term) {
67 return false;
68 }
69 return (++context_->vote_quorum) > (options_.members.size() / 2);
70 }
如果是kFollower或者kLeader,啥也不做。
補充一下,見代碼行135?140,在Server成為新Leader后,會立刻AddTask,寫一條NOP日志,之后才能提供服務,主要是為了安全性考慮,下一篇會把這個坑填上。
下面幾個鏈接是我在分析源碼時,所做的一些參考,最后三個是一些相關的應用,包括優(yōu)化方案,原始Raft論文里的實現(xiàn)性能并不怎么好。
參考:
https://github.com/maemual/raft-zh_cn/blob/master/raft-zh_cn.md
https://www.cnblogs.com/katsura/p/6549344.html
https://yq.aliyun.com/articles/62425
http://www.itdecent.cn/p/4711c4c32aab
http://www.itdecent.cn/p/2a2ba021f721
http://www.itdecent.cn/p/ee7646c0f4cf
https://yq.aliyun.com/articles/398232?spm=a2c4e.11153940.blogcont62425.18.1e944bd0b1UuMv
https://yq.aliyun.com/articles/398237?spm=a2c4e.11153940.blogcont62425.19.1e944bd0b1UuMv
https://pingcap.com/blog-cn/#Raft
https://www.zhihu.com/question/54997169
https://zhuanlan.zhihu.com/p/25735592