1. 簡介
本文介紹一個簡單的Raft實現。如果有看過Raft論文,那么看這個Raft實現會覺得比較輕松,因為Raft論文中把實現的細節(jié)描述的非常詳細,工程實現基本上就是將Raft論文中的描述用編程語言重新表達一遍。這就是Raft相對于Paxos最大的優(yōu)點,即容易看懂并且容易實現。本文中介紹的Raft實現是用C語言碼成的,除了日志壓縮功能沒有實現,其它特性都有實現,成員變更機制也做的比較簡單,一次只支持一條配置更改。關于Raft的原理可以看Raft論文和《Raft理解》。
2.Raft基本概念
2.1 狀態(tài)
raft有三種狀態(tài):Leader,Candidate和Follower。這三種狀態(tài)的轉換如下圖所示。只有Leader具有處理客戶請求和向Follower復制日志的權利。Candidate是一種Follower向Leader轉換的中間狀態(tài),當集群中沒有Leader的時候,Follower進入Candidate狀態(tài),并向集群中發(fā)起投票,獲取到大多數投票的Follower會變成Leader。
2.2 消息
Raft為了提高協議的可理解性,消息類型的設定及其精簡,只有下面兩種請求。
requestVote 發(fā)起投票請求。Candidate發(fā)起投票時的請求。由集群中其它Follower和Candidate接收處理。
appendEntries 添加日志請求。Leader向Follower添加日志時發(fā)出的請求。
2.3 任期號
Raft協議中使用任期號term來表明時間的新舊關系,這個term值在每個Leader的任期內是不變的,在不同Leader的中是絕對不同且隨時間單調遞增的。如果一條請求A的term比另一個請求B要大,那么說明請求B是過時的。
3.Raft實現
3.1 協議
先介紹四個重要數據結構,對應上面提到過的requestVote和appendEntries請求和回復。
/** requestVote 請求投票
* 競選者Candidate去競選Leader時發(fā)送給其它node的投票請求。
* 其它Leader或者Candidate收到term比自己大的投票請求時,會自動變成Follower*/
typedef struct
{
/** 當前任期號,通過任期號的大小與其它Candidate競爭Leader */
int term;
/** 競選者的id */
int candidate_id;
/** 競選者本地保存的最新一條日志的index */
int last_log_idx;
/** 競選者本地保存的最新一條日志的任期號*/
int last_log_term;
} msg_requestvote_t;
/** 投票請求的回復response.
* 該response主要是給返回某個node是否接收了Candidate的投票請求. */
typedef struct
{
/** node的任期號,Candidate根據投票結果和node的任期號來更新自己的任期號 */
int term;
/** 投票結果,如果node給Candidate投票則為true */
int vote_granted;
} msg_requestvote_response_t;
/** 添加日志請求.
* Follower可以從該消息中知道哪些日志可以安全地提交到狀態(tài)機FSM中去。
* Leader可以將該消息作為心跳消息定期發(fā)送。
* 舊的Leader和Candidate收到該消息后可能會自動變成Follower */
typedef struct
{
/** Leader當前的任期號 */
int term;
/** 最新日志的前一條日志的index,用于Follower確認與Leader的日志完全一致 */
int prev_log_idx;
/** 最新日志的前一條日志的任期號term */
int prev_log_term;
/** leader當前已經確認提交到狀態(tài)機FSM的日志索引index,這意味著Follower也可以安全地將該索引index以前的日志提交 */
int leader_commit;
/** 這條添加日志消息攜帶的日志條數,該實現中最多只有一條 */
int n_entries;
/** 這條添加日志消息中攜帶的日志數組 */
msg_entry_t* entries;
} msg_appendentries_t;
/** 添加日志回復.
* 舊的Leader或Candidate收到該消息會變成Follower */
typedef struct
{
/** 當前任期號 */
int term;
/** node成功添加日志時返回ture,即prev_log_index和prev_log_term都比對成功。否則返回false */
int success;
/* 下面兩個字段不是Raft論文中規(guī)定的字段:
/* 用來優(yōu)化日志追加過程,以加速日志的追加。Raft原文中的追加過程是一次只能追加一條日志*/
/** 處理添加日志請求后本地的最大日志索引 */
int current_idx;
/** 從添加日志請求中接受的第一條日志索引 */
int first_idx;
} msg_appendentries_response_t;
3.2 兩個重要的抽象
- raft_server_private_t 該結構體是Raft在實現中的抽象體,保存了Raft協議運行過程中狀態(tài)和需要的所有數據。
typedef struct {
/* 所有服務器比較固定的狀態(tài): */
/* 服務器最后一次知道的任期號(初始化為 0,持續(xù)遞增) */
int current_term;
/* 記錄在當前分期內給哪個Candidate投過票,
*/
int voted_for;
/* 日志條目集;每一個條目包含一個用戶狀態(tài)機執(zhí)行的指令,和收到時的任期號 */
void* log;
/* 變動比較頻繁的變量: */
/* 已知的最大的已經被提交的日志條目的索引值 */
int commit_idx;
/* 最后被應用到狀態(tài)機的日志條目索引值(初始化為 0,持續(xù)遞增) */
int last_applied_idx;
/* 三種狀態(tài):follower/leader/candidate */
int state;
/* 計時器,周期函數每次執(zhí)行時會遞增改值 */
int timeout_elapsed;
raft_node_t* nodes;
int num_nodes;
int election_timeout;
int request_timeout;
/* 保存Leader的信息,沒有Leader時為NULL */
raft_node_t* current_leader;
/* callbacks,由調用該raft實現的調用者來實現,網絡IO和持久存儲
* 都由調用者在callback中實現 */
raft_cbs_t cb;
void* udata;
/* 自己的信息 */
raft_node_t* node;
/* 該raft實現每次只進行一個服務器的配置更改,該變量記錄raft server
* 是否正在進行配置更改*/
int voting_cfg_change_log_idx;
} raft_server_private_t;
- raft_node_private_t 集群中機器節(jié)點的抽象體,包含了raft協議運行過程中需要保存的其它機器上的信息
typedef struct
{
void* udata; /*一般保存與其它機器的連接信息,由使用者決定怎么實現連接*/
int next_idx; /*對于每一個服務器,需要發(fā)送給他的下一個日志條目的索引值(初始化為領導人最后索引值加一)*/
int match_idx; /*對于每一個服務器,已經復制給他的日志的最高索引值*/
int flags; /*有三種取值,是相或的關系 1:該機器有給我投票 2:該機器有投票權 3: 該機器有最新的日志*/
int id; /*機器對應的id值,這個每臺機器在全局都是唯一的*/
} raft_node_private_t;
3.3 Raft協議過程
-
周期函數 Raft需要周期性地做一些事情,比如Leader需要周期性地給其它服務器append日志,以讓日志落后的服務器有機會追上來;所有服務器需要周期性地將已經確認提交的日志應用到狀態(tài)機中去等等。
raft_periodic函數是該raft實現中被周期性調用的函數,調用周期是1000ms。機器在不同狀態(tài)下會在這個函數中做不同的事情。Leader周期性地向Follower同步日志。而Follower周期性地檢測是否在特定的時間內沒有收到過來自Leader的心跳包,如果是的話就變成Candidate開始發(fā)起投票競選Leader。不管是Leader還是Follower,都會周期性地將已經提交的日志commit到狀態(tài)機FSM中去。
/** raft周期性執(zhí)行的函數,實現raft中的定時器以及定期應用日志到狀態(tài)機
*/
int raft_periodic(raft_server_t* me_, int msec_since_last_period)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
/* 選舉計時器;Follower每次收到Leader的心跳后會重置清0,Leader每次發(fā)送日志也會清0 */
me->timeout_elapsed += msec_since_last_period;
/* Leader周期性地向Follower同步日志 */
if (me->state == RAFT_STATE_LEADER)
{
if (me->request_timeout <= me->timeout_elapsed)
raft_send_appendentries_all(me_);
}
/* Follower檢測選舉計時器是否超時 */
else if (me->election_timeout <= me->timeout_elapsed)
{
if (1 < me->num_nodes)
raft_election_start(me_);
}
/* 周期性地將已經確認commit的日志應用到狀態(tài)機FSM */
if (me->last_applied_idx < me->commit_idx)
if (-1 == raft_apply_entry(me_))
return -1;
return 0;
}
- 成為競選者Candidate 集群中每個服務器都有一個競選計時器,當一個服務器在計時器超時時間內都沒有收到來自Leader的心跳,則認為集群中不存在Leader或者是Leader掛了,該服務器就會變成Candidate,進而發(fā)起投票去競選Leader,下面raft_become_candidate函數就是服務器變成Candidate的函數,函數中主要做這幾件事情:
- 自增當前的任期號(currentTerm)
- 給自己投票
- 重置選舉超時計時器
- 發(fā)送請求投票的 RPC 給其他所有服務器
/** Follower成為Candidate執(zhí)行的函數
*/
void raft_become_candidate(raft_server_t* me_)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
int i;
/*自增當前的任期號;給自己投票,設置自己的狀態(tài)為CANDIDATE*/
raft_set_current_term(me_, raft_get_current_term(me_) + 1);
for (i = 0; i < me->num_nodes; i++)
raft_node_vote_for_me(me->nodes[i], 0);
raft_vote(me_, me->node);
me->current_leader = NULL;
raft_set_state(me_, RAFT_STATE_CANDIDATE);
/* 重置選舉超時計時器。為了防止多個Candidate競爭,將下一次發(fā)起投票的時間間隔設置成隨機值*/
/* TODO: this should probably be lower */
me->timeout_elapsed = rand() % me->election_timeout;
/*發(fā)送請求投票的 RPC 給其他所有服務器*/
for (i = 0; i < me->num_nodes; i++)
if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i]))
raft_send_requestvote(me_, me->nodes[i]);
}
- 處理投票請求 處理投票請求的邏輯主要就是判斷是否要同意投票,判斷的依據就是請求中的任期號和日志信息的新舊程度,還有就是自己是否給其它相同任期號的服務器投過票,如果投過就不能再投,每人只有一票投票權。
如果term > currentTerm, 則轉為Follower模式。
這里收到投票請求的服務器有可能是一個網絡狀況不佳的Leader或者是一個還沒來得及發(fā)出投票請求的Candidate,他們收到任期號比自己要新的請求后,都要無條件變成Follower,以保證只有一個Leader存在如果term < currentTerm返回false。請求中的term比自己的term還要小,說明是一個過時的請求,則不給它投票返回false。
如果 term == currentTerm,請求中的日志信息不比本地日志舊,并且尚未給其它Candidate投過票,那么就投票給他
/** 處理投票請求
*/
int raft_recv_requestvote(raft_server_t* me_,
raft_node_t* node,
msg_requestvote_t* vr,
msg_requestvote_response_t *r)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
/*如果請求中term > 本地currentTerm, 則轉為Follower模式*/
if (raft_get_current_term(me_) < vr->term)
{
raft_set_current_term(me_, vr->term);
raft_become_follower(me_);
}
/*如果需要投票,則回復true,即將r->vote_granted = 1;*/
if (__should_grant_vote(me, vr))
{
assert(!(raft_is_leader(me_) || raft_is_candidate(me_)));
/*同意投票--本地記錄給哪個服務器投了票,并設置response中的vote_granted為1*/
raft_vote_for_nodeid(me_, vr->candidate_id);
r->vote_granted = 1;
/* there must be in an election. */
me->current_leader = NULL;
me->timeout_elapsed = 0;
}
else
r->vote_granted = 0;
__log(me_, node, "node requested vote: %d replying: %s",
node, r->vote_granted == 1 ? "granted" : "not granted");
/*更新本地保存的任期號,與請求中的保持一致*/
r->term = raft_get_current_term(me_);
return 0;
}
/** 檢查是否滿足投票的條件
*/
static int __should_grant_vote(raft_server_private_t* me, msg_requestvote_t* vr)
{
/**請求中的任期號term比本地term要小,不給投票*/
if (vr->term < raft_get_current_term((void*)me))
return 0;
/*如果已經投過票了,返回false*/
/* TODO: if voted for is candiate return 1 (if below checks pass) */
if (raft_already_voted((void*)me))
return 0;
/* 下面代碼檢查請求中日志信息是否比本地日志新*/
/* 獲取本地最新的日志索引 */
int current_idx = raft_get_current_idx((void*)me);
/* 本地日志為空,請求中的日志信息絕對比本地要新,返回true */
if (0 == current_idx)
return 1;
/* 如果本地最新日志中的任期號比請求中的last_log_term要小,則返回true */
raft_entry_t* e = raft_get_entry_from_idx((void*)me, current_idx);
if (e->term < vr->last_log_term)
return 1;
/* 本地最新日志中的任期號與請求中的last_log_term相等,則比較日志索引,索引比較大的說明日志比較新*/
if (vr->last_log_term == e->term && current_idx <= vr->last_log_idx)
return 1;
/*果本地最新日志中的任期號比請求中的last_log_term要大,則返回false */
return 0;
}
- 收到投票回復 Candidate收到投票回復后,檢查是否給自己投了票,如果投了票則統計當前收到的投票總數,超過一半則成為Leader
/** 處理投票恢復
*/
int raft_recv_requestvote_response(raft_server_t* me_,
raft_node_t* node,
msg_requestvote_response_t* r)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
__log(me_, node, "node responded to requestvote status: %s",
r->vote_granted == 1 ? "granted" : "not granted");
/* Oh~我不是Candidate,直接返回 */
if (!raft_is_candidate(me_))
{
return 0;
}
/* response中的任期號比自己的大,說明自己的term已經過時,無條件轉為Follower */
else if (raft_get_current_term(me_) < r->term)
{
raft_set_current_term(me_, r->term);
raft_become_follower(me_);
return 0;
}
/* response中的任期號比自己小,說明收到了一個過時的response,忽略即可。
* 當網絡比較差的時候容易出現這種情況 */
else if (raft_get_current_term(me_) != r->term)
{
return 0;
}
__log(me_, node, "node responded to requestvote: %d status: %s ct:%d rt:%d",
node, r->vote_granted == 1 ? "granted" : "not granted",
me->current_term,
r->term);
/* Yeah~給我投票了 */
if (1 == r->vote_granted)
{
/* 記錄給自己投票的服務器信息 */
if (node)
raft_node_vote_for_me(node, 1);
int votes = raft_get_nvotes_for_me(me_);
/* 如果給自己投票的服務器超過了總數的一般,則成為Leader */
if (raft_votes_is_majority(me->num_nodes, votes))
raft_become_leader(me_);
}
return 0;
}
- 添加日志請求 Leader除了在收到客戶端請求后會發(fā)起添加日志請求,還會在周期函數raft_periodic中發(fā)起添加日志請求。Leader維護了所有Follower的日志情況,如果Follower的日志比較舊,就會周期性地給它發(fā)送添加日志請求。關于日志怎么同步和保持一致性的原理,可以閱讀raft論文5.3節(jié)--日志復制。簡單地說就是,Leader在給Follower發(fā)送一條日志N時,會順帶將前一條日志M的信息也帶過去。Follower會檢查請求中前一條日志M的信息與本地相同索引的日志是否吻合,如果吻合說明本地在M以前的所有日志都是和Leader一致的(raft論文中使用遞歸法證明,因為所有日志都是按照同樣的規(guī)則添加的)。
/** 給某個Follower發(fā)送添加日志請求
*/
int raft_send_appendentries(raft_server_t* me_, raft_node_t* node)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
assert(node);
assert(node != me->node);
/* callback函數,實現網絡發(fā)送功能,由使用該raft實現的調用者實現網絡IO功能*/
if (!(me->cb.send_appendentries))
return -1;
/* 初始化請求的參數-- 當前任期號、最新日志索引 */
msg_appendentries_t ae;
ae.term = me->current_term;
ae.leader_commit = raft_get_commit_idx(me_);
ae.prev_log_idx = 0;
ae.prev_log_term = 0;
ae.n_entries = 0;
ae.entries = NULL;
/* 根據記錄的Follower的日志信息,獲取要發(fā)給Follower的下一條日志索引 */
int next_idx = raft_node_get_next_idx(node);
msg_entry_t mety;
/* 添加下一條日志的內容*/
raft_entry_t* ety = raft_get_entry_from_idx(me_, next_idx);
if (ety)
{
mety.term = ety->term;
mety.id = ety->id;
mety.type = ety->type;
mety.data.len = ety->data.len;
mety.data.buf = ety->data.buf;
ae.entries = &mety;
// TODO: we want to send more than 1 at a time
ae.n_entries = 1;
}
/* 添加要添加日志的前一條日志信息,用來做日志一致性檢查,關于怎么保證
* Leader和Follower日志的一致性,可參看raft論文第5.3節(jié)--日志復制*/
if (1 < next_idx)
{
raft_entry_t* prev_ety = raft_get_entry_from_idx(me_, next_idx - 1);
ae.prev_log_idx = next_idx - 1;
if (prev_ety)
ae.prev_log_term = prev_ety->term;
}
__log(me_, node, "sending appendentries node: ci:%d t:%d lc:%d pli:%d plt:%d",
raft_get_current_idx(me_),
ae.term,
ae.leader_commit,
ae.prev_log_idx,
ae.prev_log_term);
/* 調用callback發(fā)送請求,callback由該raft實現的調用者來實現*/
me->cb.send_appendentries(me_, me->udata, node, &ae);
return 0;
}
- 處理添加日志請求 所有的服務器都有可能收到添加日志請求,比如過時的Leader和Candidate以及正常運行的Follower。處理添加日志請求的過程主要就是驗證請求中的日志是否比本地日志新的過程。
/*
1. 處理任期號的三種情況(大于等于和小于)
2. 處理prev log不一致的情況,返回包中告訴Leader自己目前的log情況
3. 處理添加日志成功的情況-- 保存新日志并更新current_idx和commit_idx
*/
int raft_recv_appendentries(
raft_server_t* me_,
raft_node_t* node,
msg_appendentries_t* ae,
msg_appendentries_response_t *r
)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
me->timeout_elapsed = 0;
if (0 < ae->n_entries)
__log(me_, node, "recvd appendentries from: %lx, t:%d ci:%d lc:%d pli:%d plt:%d #%d",
node,
ae->term,
raft_get_current_idx(me_),
ae->leader_commit,
ae->prev_log_idx,
ae->prev_log_term,
ae->n_entries);
r->term = me->current_term;
/* 處理任期號 */
/* currentTerm == ae->term,當自己是Candidate時收到term與自己相等的請求,
* 說明已經有其它Candidate成為了Leader,自己無條件變成Follower*/
if (raft_is_candidate(me_) && me->current_term == ae->term)
{
me->voted_for = -1;
raft_become_follower(me_);
}
/* currentTerm < ae->term. 自己的任期號已經落后Leader,無條件成為Follower,并且更新自己的term*/
else if (me->current_term < ae->term)
{
raft_set_current_term(me_, ae->term);
r->term = ae->term;
raft_become_follower(me_);
}
/* currentTerm > ae->term. 說明收到一個過時Leader的請求,直接回包告訴它最新的term */
else if (ae->term < me->current_term)
{
/* 1. Reply false if term < currentTerm (?§5.1) */
__log(me_, node, "AE term %d is less than current term %d",
ae->term, me->current_term);
goto fail_with_current_idx;
}
/* NOTE: the log starts at 1 */
/* 檢查請求中prev_log_idx日志的term與本地對應索引的term是否一致 */
if (0 < ae->prev_log_idx)
{
raft_entry_t* e = raft_get_entry_from_idx(me_, ae->prev_log_idx);
/* 本地在prev_log_idx位置還不存在日志,說明日志已經落后Leader了,返回false
* 并告訴leader自己當前日志的位置,這樣Leader知道下一次該發(fā)哪條日志過來了*/
if (!e)
{
__log(me_, node, "AE no log at prev_idx %d", ae->prev_log_idx);
goto fail_with_current_idx;
}
if (raft_get_current_idx(me_) < ae->prev_log_idx)
goto fail_with_current_idx;
/* 本地在prev_log_idx位置的日志的term與請求中的prev_log_term不一致,
* 此時本地無條件刪除本地與請求不一致的日志,并向Leader返回刪除后的日志位置*/
if (e->term != ae->prev_log_term)
{
__log(me_, node, "AE term doesn't match prev_term (ie. %d vs %d) ci:%d pli:%d",
e->term, ae->prev_log_term, raft_get_current_idx(me_), ae->prev_log_idx);
assert(me->commit_idx < ae->prev_log_idx);
/* Delete all the following log entries because they don't match */
log_delete(me->log, ae->prev_log_idx);
r->current_idx = ae->prev_log_idx - 1;
goto fail;
}
}
/* 本地的日志比Leader要多。當本地服務器曾經是Leader,收到了很多客戶端請求
* 并還沒來得及同步時會出現這種情況。這時本地無條件刪除比Leader多的日志 */
if (ae->n_entries == 0 && 0 < ae->prev_log_idx && ae->prev_log_idx + 1 < raft_get_current_idx(me_))
{
assert(me->commit_idx < ae->prev_log_idx + 1);
log_delete(me->log, ae->prev_log_idx + 1);
}
r->current_idx = ae->prev_log_idx;
/* 下面for循環(huán)跳過請求中已經在本地添加過的日志*/
int i;
for (i = 0; i < ae->n_entries; i++)
{
msg_entry_t* ety = &ae->entries[i];
int ety_index = ae->prev_log_idx + 1 + i;
raft_entry_t* existing_ety = raft_get_entry_from_idx(me_, ety_index);
r->current_idx = ety_index;
if (existing_ety && existing_ety->term != ety->term)
{
assert(me->commit_idx < ety_index);
log_delete(me->log, ety_index);
break;
}
else if (!existing_ety)
break;
}
/* 下面for循環(huán)將請求中確認的新日志添加到本地 */
for (; i < ae->n_entries; i++)
{
int e = raft_append_entry(me_, &ae->entries[i]);
if (-1 == e)
goto fail_with_current_idx;
r->current_idx = ae->prev_log_idx + 1 + i;
}
/* 4. 請求中攜帶了Leader已經提交到狀態(tài)機的日志索引,本地同樣也更新這個索引,將其
* 設置為本地最大日志索引和leader_commit中的較小者*/
if (raft_get_commit_idx(me_) < ae->leader_commit)
{
int last_log_idx = max(raft_get_current_idx(me_), 1);
raft_set_commit_idx(me_, min(last_log_idx, ae->leader_commit));
}
/* 更新Leader信息 */
me->current_leader = node;
r->success = 1;
r->first_idx = ae->prev_log_idx + 1;
return 0;
fail_with_current_idx:
r->current_idx = raft_get_current_idx(me_);
fail:
r->success = 0;
r->first_idx = 0;
return -1;
}
- 處理添加日志請求回復 Leader收到添加日志回復后,可以知道下面這些信息:
- 自己是不是已經過時(current_term < response->term即為過時)
- follower是否成功添加日志,如果添加失敗,則減小發(fā)給follower的日志索引nextIndex再重試;如果添加成功則更新本地記錄的follower日志信息,并檢查日志是否最新,如果不是最新則繼續(xù)發(fā)送添加日志請求。
- 新機器的日志添加,詳見3.4節(jié)-- 成員變更
/** 處理添加日志請求回復
* /
int raft_recv_appendentries_response(raft_server_t* me_,
raft_node_t* node,
msg_appendentries_response_t* r)
{
raft_server_private_t* me = (raft_server_private_t*)me_;
__log(me_, node,
"received appendentries response %s ci:%d rci:%d 1stidx:%d",
r->success == 1 ? "SUCCESS" : "fail",
raft_get_current_idx(me_),
r->current_idx,
r->first_idx);
/* 過時的回復 -- 忽略 */
if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node))
return 0;
/* oh~我不是Leader */
if (!raft_is_leader(me_))
return -1;
/* 回復中的term比自己的要大,說明自己是一個過時的Leader,無條件轉為Follower */
if (me->current_term < r->term)
{
raft_set_current_term(me_, r->term);
raft_become_follower(me_);
return 0;
}
/* 過時的回復,網絡狀況不好時會出現 */
else if (me->current_term != r->term)
return 0;
/* stop processing, this is a node we don't have in our configuration */
if (!node)
return 0;
/* 由于日志不一致導致添加日志不成功*/
if (0 == r->success)
{
assert(0 <= raft_node_get_next_idx(node));
/* 將nextIdex減*/
int next_idx = raft_node_get_next_idx(node);
assert(0 <= next_idx);
/* Follower的日志數量還遠遠少于Leader,將nextIdex設為回復中的current_idx+1和Leader
* 當前索引中較小的一個,一般回復中的current_idx+1會比較小*/
if (r->current_idx < next_idx - 1)
raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_)));
/* Follower的日志數量和Leader差不多,但是比對前一條日志時失敗,這種情況將next_idx減1
* 重試*/
else
raft_node_set_next_idx(node, next_idx - 1);
/* 使用更新后的nextIdx重新發(fā)送添加日志請求 */
raft_send_appendentries(me_, node);
return 0;
}
assert(r->current_idx <= raft_get_current_idx(me_));
/* 下面處理添加日志請求的情況 */
/* 更新本地記錄的Follower的日志情況 */
raft_node_set_next_idx(node, r->current_idx + 1);
raft_node_set_match_idx(node, r->current_idx);
/* 如果是新加入的機器,則判斷它的日志是否是最新,如果達到了最新,則賦予它投票權,
* 這里邏輯的詳細解釋在第3.4節(jié) -- 成員變更*/
if (!raft_node_is_voting(node) &&
-1 == me->voting_cfg_change_log_idx &&
raft_get_current_idx(me_) <= r->current_idx + 1 &&
me->cb.node_has_sufficient_logs &&
0 == raft_node_has_sufficient_logs(node)
)
{
raft_node_set_has_sufficient_logs(node);
me->cb.node_has_sufficient_logs(me_, me->udata, node);
}
/* 如果一條日志回復成功的數量超過一半,則將日志提交commit,即允許應用到狀態(tài)機 */
int votes = 1; /* include me */
int point = r->current_idx;
int i;
for (i = 0; i < me->num_nodes; i++)
{
if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i]))
continue;
int match_idx = raft_node_get_match_idx(me->nodes[i]);
if (0 < match_idx)
{
raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx);
/*如果follower已經添加了索引大于等于r->current_idx的日志,則vote加1*/
if (ety->term == me->current_term && point <= match_idx)
votes++;
}
}
/* 投票數大于所有服務器的一半,則將日志提交 */
if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point)
raft_set_commit_idx(me_, point);
/* 如果follower的日志還沒有最新,那么繼續(xù)發(fā)送添加日志請求 */
if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node)))
raft_send_appendentries(me_, node);
/* periodic applies committed entries lazily */
return 0;
}
3.3 成員變更
成員的變更都是以日志的形式下發(fā)的。添加的新成員分兩階段進行,第一階段中新成員沒有有投票權,但是有接收日志的權力;當它的日志同步到最新后就進入到第二階段,由Leader賦予投票權,從而成為集群中完整的一員。刪除成員相對比較簡單,所有服務器收到刪除成員的日志后,立馬將該成員的信息從本地抹除。
- 添加成員過程
- 管理員向Leader發(fā)送添加成員命令
- Leader添加一條 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加沒有投票權的服務器。該日志與其它普通日志一樣同步給集群中其它服務器。收到該日志的服務器在本地保存該新成員的信息。
- 當新成員的日志同步到最新后,Leader添加一條 RAFT_LOGTYPE_ADD_NODE日志,即有投票權的服務器,同樣地,該日志與其它普通日志一樣同步給集群中其它服務器。收到該日志的服務器在本地保存該新成員的信息,以后的投票活動會將新成員考慮進去。
- 刪除成員過程
- 管理員向Leader發(fā)送刪除成員命令。
- Leader添加一條 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一樣同步給其它服務器。收到該日志的服務器立即將被成員信息從本地刪除。