前言
選舉是Raft實現(xiàn)數(shù)據(jù)一致性的安全保證,一個raft集群能夠正常運行,必須有且僅有一個Leader存在,一次成功選舉是集群能夠正常運行的前提。Raft協(xié)議對選舉的定義和安全性保證請參考之前的Raft選舉原理解析[傳送門]。這篇文章通過解析etcd的源碼來看一下Raft集群選舉的具體實現(xiàn)。
心跳
Raft集群在正常運行中是不會觸發(fā)選舉的,選舉只會發(fā)生在集群初次啟動或者其它節(jié)點無法收到Leader心跳的情況下。初次啟動比較好理解,因為raft節(jié)點在啟動時,默認(rèn)都是將自己設(shè)置為Follower。收不到Leader心跳有兩種情況,一種是原來的Leader機器Crash了,還有一種是發(fā)生網(wǎng)絡(luò)分區(qū),F(xiàn)ollower跟Leader之間的網(wǎng)絡(luò)斷了,F(xiàn)ollower以為Leader宕機了。下面先看一下集群一切正常時,心跳是怎么流轉(zhuǎn)的。
心跳觸發(fā)
EtcdServer定時觸發(fā)
Raft集群運行時Leader需要定時的發(fā)送心跳給所有Follower。在etcd中,通過EtcdServer中的定時器定時觸發(fā)Raft模塊來實現(xiàn),這個定時器在raftNode.start()中啟動的。
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
//通過go中的ticker觸發(fā)
case <-r.ticker.C:
r.tick()
case rd := <-r.Ready():
...
...
}
}
...
}()
}
func (r *raftNode) tick() {
r.tickMu.Lock()
r.Tick() //調(diào)用node.Tick()
r.tickMu.Unlock()
}
raftNode在啟動時會啟動一個go routine循環(huán)監(jiān)聽ticker定時器的channel,時間到時則調(diào)用它的tick()方法。這個ticker定時器的周期即用戶設(shè)置的raft集群的心跳周期。raftNode中組合了一個raft.Node,所以tick()方法只是加了互斥鎖之后就調(diào)用的raft.Node.Tick()方法。也就是說觸發(fā)由EtcdServer來做,邏輯由raft模塊來處理。
raft模塊處理
Node的Tick()方法只是簡單的寫個空對象到tickc的channel中,這個前一篇講過,Node接口的實現(xiàn)類大部分操作都是異步完成的。
func (n *node) Tick() {
select {
//寫空對象到tickc channel異步執(zhí)行
case n.tickc <- struct{}{}:
case <-n.done:
default:
n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
}
}
node在啟動時會啟動一個go routine監(jiān)聽tickc(在node.run()方法中)。收到觸發(fā)后直接調(diào)用的RawNode.Tick()方法,而RawNode又調(diào)用了raft.tick()。
//node啟動時會在一個新的go routine中運行run()
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
//一直循環(huán)查看各個channel
for {
...
...
select {
//監(jiān)聽到觸發(fā),則調(diào)用RawNode.Tick()
case <-n.tickc:
n.rn.Tick()
...
}
}
}
//RawNode.Tick()
func (rn *RawNode) Tick() {
//直接調(diào)用raft.tick()
rn.raft.tick()
}
Leader tick()邏輯
在上一篇文章中講過,raft的tick屬性是函數(shù)類型,當(dāng)節(jié)點的角色是Leader時,tick指向的是raft.tickHeartbeat()。
func (r *raft) tickHeartbeat() {
//1. 心跳計數(shù)+1
r.heartbeatElapsed++
r.electionElapsed++
// 選舉超時控制
if r.electionElapsed >= r.electionTimeout {
r.electionElapsed = 0
if r.checkQuorum {
r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
}
// Leader轉(zhuǎn)讓
if r.state == StateLeader && r.leadTransferee != None {
r.abortLeaderTransfer()
}
}
//2. 如果當(dāng)前已經(jīng)不是Leader了,跳過
if r.state != StateLeader {
return
}
//3. 如果心跳計數(shù)大于心跳超時,則發(fā)送心跳消息
if r.heartbeatElapsed >= r.heartbeatTimeout {
//心跳計數(shù)清0
r.heartbeatElapsed = 0
//調(diào)用Step()發(fā)送心跳
r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
}
}
- 第1步首先Leader將心跳計數(shù)加1,上一篇講過,etcd在記錄超時時不是以標(biāo)準(zhǔn)時間記錄的,而是記錄的心跳間隔的倍數(shù)。所以EtcdServer每觸發(fā)一次
tick(),心跳計數(shù)+1,代表距離上次發(fā)送心跳又過了1個心跳時間 - 第2步關(guān)于選舉超時和轉(zhuǎn)讓的邏輯先跳過
- 第3步中判斷是否應(yīng)該發(fā)送心跳,
heartbeatTimeout的意思是Leader應(yīng)該經(jīng)過幾次心跳時間后必須發(fā)送一次心跳。etcd中heartbeatTimeout的默認(rèn)值是1,也就是說其實每次進(jìn)到這個方法heartbeatElapsed >= heartbeatTimeout都是成立的。當(dāng)判斷需要發(fā)送心跳時,會封裝一個MsgBeat的消息提交Step方法處理,處理邏輯下面再說,先看下Follower的tick()方法。
用戶也可以把
heartbeatTimeout這個值設(shè)的很大,當(dāng)然這樣在Leader宕機到觸發(fā)重新選舉的間隔會長一些。在網(wǎng)絡(luò)狀況不好的時候可以這樣設(shè)置。
Follower tick()邏輯
當(dāng)節(jié)點角色是Follower或者Candidate的時候,tick指向的是tickElection()
func (r *raft) tickElection() {
//選舉計數(shù)加1
r.electionElapsed++
//判斷是否超時,要發(fā)起重新選舉
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
以上的邏輯很簡單,因為對于Follower來說,唯一需要關(guān)心得就是是不是很久都沒收到Leader的心跳了。所以每次tick都將選舉計數(shù)+1,當(dāng)Follower收到Leader心跳的時候會將electionElapsed清0。如果Follower收不到Leader的心跳,electionElapsed就會一直加到超過選舉超時,就發(fā)起選舉。發(fā)起選舉的邏輯下面再說。
Leader心跳發(fā)送
raft消息封裝
上面講到Leader在收到Tick請求后,會提交一個MsgBeat的消息給到Step()方法,對于心跳消息,會直接調(diào)用step指向的函數(shù)。跟tick一樣,step也是個函數(shù)類型,在節(jié)點為Leader時,它指向的是stepLeader(),該函數(shù)中對于MsgBeat的消息會調(diào)用bcastHeartbeat()來給集群中每個Follower發(fā)送心跳消息。
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
//判斷對于心跳消息,則廣播心跳
case pb.MsgBeat:
r.bcastHeartbeat()
return nil
case pb.MsgCheckQuorum:
...
...
}
func (r *raft) bcastHeartbeat() {
lastCtx := r.readOnly.lastPendingRequestCtx()
if len(lastCtx) == 0 {
r.bcastHeartbeatWithCtx(nil)
} else {
r.bcastHeartbeatWithCtx([]byte(lastCtx))
}
}
// 廣播心跳消息至所有節(jié)點
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
r.prs.Visit(func(id uint64, _ *tracker.Progress) {
//排除Leader自己
if id == r.id {
return
}
//發(fā)送心跳
r.sendHeartbeat(id, ctx)
})
}
前一篇講過raft在prs屬性中保存了所有Follower的進(jìn)度信息,包含F(xiàn)ollower的id、同步日志的進(jìn)度等。所以上面的方法就是遍歷prs所有節(jié)點,發(fā)送心跳消息。
func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
//計算消息中帶的commitIndex
commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
//封裝成一個Type是MsgHeartbeat的消息,并帶上commitIndex
m := pb.Message{
To: to,
Type: pb.MsgHeartbeat,
Commit: commit,
Context: ctx,
}
//發(fā)送消息
r.send(m)
}
Raft協(xié)議中定義心跳消息和日志消息其實是一個格式的,只是心跳消息沒有帶日志條目,只會攜帶CommitIndex。Leader首先看Follower已經(jīng)接收成功的日志條目的Index,即Progress.Match字段,然后跟自己的CommitIndex比較,取值較小的那個。這是為了防止Follower的日志同步落后太多,CommitIndex處的日志還沒有同步到。
封裝好消息后,調(diào)用send()方法發(fā)送。raft本身并不負(fù)責(zé)消息發(fā)送,所以這個方法只是把消息放到一個隊列中,等待EtcdServer來獲取。
func (r *raft) send(m pb.Message) {
m.From = r.id
//數(shù)據(jù)校驗,選舉類消息必須帶term屬性
if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
if m.Term == 0 {
panic(fmt.Sprintf("term should be set when sending %s", m.Type))
}
} else {
//其它類消息不能帶term屬性
if m.Term != 0 {
panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
}
//除了日志和MsgReadIndex消息外,設(shè)置term為raft當(dāng)前周期
if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
m.Term = r.Term
}
}
//將消息放入隊列
r.msgs = append(r.msgs, m)
}
EtcdServer消息傳輸
上一步中,心跳消息被放入隊列,那這些消息是什么時候被發(fā)給集群中其它節(jié)點呢?發(fā)送操作是由EtcdServer啟動時的go routine處理的。具體實現(xiàn)還是在raftNode.start()中。
func (r *raftNode) start(rh *raftReadyHandler) {
internalTimeout := time.Second
go func() {
defer r.onStop()
islead := false
for {
select {
case <-r.ticker.C:
r.tick()
//調(diào)用Node.Ready(),從返回的channel中獲取數(shù)據(jù)
case rd := <-r.Ready():
if rd.SoftState != nil {
// SoftState不為空的處理邏輯
}
if len(rd.ReadStates) != 0 {
//ReadStates不為空的處理邏輯
}
// 如果是Leader發(fā)送消息給Follower
if islead {
r.transport.Send(r.processMessages(rd.Messages))
}
...
...
//處理完畢調(diào)用Advance()方法
r.Advance()
case <-r.stopped:
return
}
}
}()
}
以上的邏輯中只包含心跳相關(guān)的,當(dāng)從Ready channel中讀到數(shù)據(jù)后,直接通過transport發(fā)送出去,這里的processMessages()除了對消息封裝成傳輸協(xié)議要求的格式,還會做超時控制。
發(fā)送完畢后無論成功失敗都會調(diào)用raft的Advance()方法處理后續(xù)邏輯。Leader一次心跳發(fā)送就算結(jié)束了。
Ready數(shù)據(jù)獲取
上面的邏輯中,心跳的message是被打包在Ready數(shù)據(jù)結(jié)構(gòu)中返回的,下面看一下數(shù)據(jù)打包的過程。既然Node.Ready()返回的是個channel,則必然有地方將Ready塞進(jìn)channel中,這段邏輯是在node.run()方法中。
func (n *node) run() {
var propc chan msgWithResult
var readyc chan Ready
var advancec chan struct{}
var rd Ready
r := n.rn.raft
lead := None
for {
if advancec != nil {
readyc = nil
} else if n.rn.HasReady() { //判斷是否有Ready數(shù)據(jù)
// 獲取Ready數(shù)據(jù)
rd = n.rn.readyWithoutAccept()
readyc = n.readyc
}
....
select {
....
case readyc <- rd: //數(shù)據(jù)放入ready channel中
n.rn.acceptReady(rd) //告訴raft,ready數(shù)據(jù)已被接收
advancec = n.advancec //賦值A(chǔ)dvance channel等待Ready處理完成的消息
}
}
}
上面的代碼中Ready數(shù)據(jù)是通過調(diào)用RawNode.readyWithoutAccept()獲取到的。
func (rn *RawNode) readyWithoutAccept() Ready {
return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}
func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
rd := Ready{
Entries: r.raftLog.unstableEntries(), //未持久化的日志
CommittedEntries: r.raftLog.nextEnts(), //已提交可以apply的日志
Messages: r.msgs, //raft隊列中所有的message
}
//判斷softState有沒有變化,有則賦值
if softSt := r.softState(); !softSt.equal(prevSoftSt) {
rd.SoftState = softSt
}
//判斷hardState有沒有變化,有則賦值
if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
rd.HardState = hardSt
}
//判斷是不是收到snapshot
if r.raftLog.unstable.snapshot != nil {
rd.Snapshot = *r.raftLog.unstable.snapshot
}
if len(r.readStates) != 0 {
rd.ReadStates = r.readStates
}
//處理該Ready后是否需要做fsync,將數(shù)據(jù)強制刷盤
rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
return rd
}
對于心跳來說,上面最關(guān)鍵的操作就是生成Ready的時候,將msg放到Ready中。
Follower心跳處理
現(xiàn)在來到心跳的接收方,心跳消息到達(dá)Follower后,傳輸層會回調(diào)EtcdServer.Process方法,將心跳消息交給raft狀態(tài)機處理。
func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
if s.cluster.IsIDRemoved(types.ID(m.From)) {
//發(fā)送方已經(jīng)從集群中移除
}
if m.Type == raftpb.MsgApp {
//收到日志消息記錄metrics
}
//調(diào)用raft.Step處理消息
return s.r.Step(ctx, m)
}
對于Follower來說Step仍然進(jìn)入的是stepFollower方法,第一步是將選舉計時清0,防止發(fā)起選舉流程。
func stepFollower(r *raft, m pb.Message) error {
switch m.Type {
...
case pb.MsgHeartbeat:
r.electionElapsed = 0 //選舉超時清0
r.lead = m.From //設(shè)置Lead為心跳發(fā)送方ID
r.handleHeartbeat(m) //處理心跳消息
...
}
return nil
}
func (r *raft) handleHeartbeat(m pb.Message) {
//設(shè)置commitIndex為Leader傳來的最新值
r.raftLog.commitTo(m.Commit)
//發(fā)送Response給Leader
r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}
Follower對心跳消息的處理很簡單,1)選舉超時計時清0;2)設(shè)置commitIndex(會檢查本地的commitIndex是不是比leader發(fā)過來的小);3)回復(fù)Leader,回復(fù)的時候按照raft協(xié)議的要求帶上自己日志的進(jìn)度。
Leader心跳回復(fù)處理
Leader收到Follower的心跳回復(fù)后,跟所有消息的處理邏輯一樣,會進(jìn)入stepLeader()方法處理
func stepLeader(r *raft, m pb.Message) error {
switch m.Type {
...
...
case pb.MsgHeartbeatResp:
//記錄Follower為Active狀態(tài)
pr.RecentActive = true
pr.ProbeSent = false
if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
pr.Inflights.FreeFirstOne()
}
//有日志要發(fā)送,繼續(xù)發(fā)送
if pr.Match < r.raftLog.lastIndex() {
r.sendAppend(m.From)
}
if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
return nil
}
//處理線性讀的邏輯
if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
return nil
}
rss := r.readOnly.advance(m)
for _, rs := range rss {
req := rs.req
if req.From == None || req.From == r.id { // from local member
r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
} else {
r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
}
}
}
}
Leader收到心跳回復(fù)后,會判斷是否有新的日志要發(fā)給Follower,有的話就繼續(xù)發(fā)送。線性讀的邏輯放在后面的文章解析。
選舉
正常情況下,Leader在每次tick()方法時發(fā)送心跳,網(wǎng)絡(luò)一切正常,F(xiàn)ollower收到心跳后將選舉計時清0,集群就這樣愉快的運行下去了。但是分布式系統(tǒng)中,意外時必然會發(fā)生的,這時候Leader宕機了,就需要集群中其它節(jié)點站出來,競選成為新的Leader。
選舉觸發(fā)
首先來看一下Follower是怎么認(rèn)定Leader掛了的。當(dāng)raft節(jié)點角色是Follower的時候,EtcdServer每次觸發(fā)tick(),進(jìn)入的是tickElection()方法:
func (r *raft) tickElection() {
r.electionElapsed++
//判斷是否要發(fā)起選舉
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
func (r *raft) promotable() bool {
pr := r.prs.Progress[r.id]
return pr != nil && !pr.IsLearner
}
func (r *raft) pastElectionTimeout() bool {
//每次tick加1后和隨機選舉超時比較
return r.electionElapsed >= r.randomizedElectionTimeout
}
在每次tick()時,都會檢查是否符合發(fā)起一次新的選舉的條件。其中promotable()比較簡單,就是判斷自己當(dāng)前是不是還在集群中并且不能是Learner。pastElectionTimeout()判斷是否已經(jīng)超過選舉超時時間還沒收到Leader的心跳。這里比較的值是randomizedElectionTimeout,代表一個隨機選舉超時時間,使用隨機時間的原因是防止Leader心跳超時后所有Follower同時發(fā)起選舉。我們看下etcd中這個時間是怎么算的:
func (r *raft) resetRandomizedElectionTimeout() {
r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}
可以看到這個時間是1個選舉超時到2個選舉超時之間的隨機值。每次開啟一個新的term,這個reset方法都會被調(diào)用一次,所以在每個選舉周期這個隨機值都是不同的,最大限度防止重復(fù)。
發(fā)送選票
狀態(tài)轉(zhuǎn)換
上一步中Follower發(fā)現(xiàn)過了選舉超時還沒收到Leader心跳,觸發(fā)Step()方法讓raft狀態(tài)機進(jìn)行狀態(tài)轉(zhuǎn)換。
func (r *raft) Step(m pb.Message) error {
...
...
switch m.Type {
case pb.MsgHup:
if r.state != StateLeader {
if !r.promotable() {
//write log
return nil
}
ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
if err != nil {
r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
}
//判斷有配置變更日志,如果集群正在做配置變更,則不發(fā)起選舉
if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
//write log
return nil
}
//發(fā)起選舉,根據(jù)配置判斷選舉之前是否要做預(yù)投票
if r.preVote {
r.campaign(campaignPreElection)
} else {
r.campaign(campaignElection)
}
} else {
r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
}
}
Step方法收到要發(fā)起選舉的消息后(MsgHup),會首先判斷已經(jīng)提交的還沒生效的日志中有沒有集群變更,有的話說明集群正在變更,則不發(fā)起選舉。這么做的原因是有可能當(dāng)前節(jié)點在集群變更后已經(jīng)被從集群中移除了。
然后,根據(jù)配置中設(shè)置的選舉是否需要先預(yù)選,來調(diào)用campaign()方法發(fā)起選舉。預(yù)選的原理放在本文最后說。
func (r *raft) campaign(t CampaignType) {
var term uint64
var voteMsg pb.MessageType
if t == campaignPreElection {
//需要預(yù)選的情況
r.becomePreCandidate()
voteMsg = pb.MsgPreVote
term = r.Term + 1
} else {
//1. 變?yōu)楹蜻x人,并初始化一條拉票的消息
r.becomeCandidate()
voteMsg = pb.MsgVote
term = r.Term
}
//2.判斷是否已經(jīng)贏得選舉
if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
if t == campaignPreElection {
r.campaign(campaignElection)
} else {
r.becomeLeader()
}
return
}
//3. 收集所有集群中節(jié)點id
var ids []uint64
{
idMap := r.prs.Voters.IDs()
ids = make([]uint64, 0, len(idMap))
for id := range idMap {
ids = append(ids, id)
}
sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
}
//4. 輪詢所有節(jié)點,給除自己之外的節(jié)點發(fā)送一條拉票的消息
for _, id := range ids {
if id == r.id {
continue
}
var ctx []byte
if t == campaignTransfer {
ctx = []byte(t)
}
//5.根據(jù)raft協(xié)議的定義組合投票消息
r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
}
}
上面的代碼分為如下幾步:
- 在發(fā)送投票之前,節(jié)點首先將自己的狀態(tài)置為候選人,這一步會把term加1,然后修改自己的Vote屬性為自己的id,表示當(dāng)前周期選票投給自己。becomeCandidate() 方法如下:
func (r *raft) becomeCandidate() {
if r.state == StateLeader {
panic("invalid transition [leader -> candidate]")
}
r.step = stepCandidate //step方法指向stepCandidate
r.reset(r.Term + 1) //選舉周期+1
r.tick = r.tickElection //tick方法指向tickElection
r.Vote = r.id //投票給自己
r.state = StateCandidate //節(jié)點狀態(tài)轉(zhuǎn)為候選人
}
- 第二步的判斷是為了兼容單節(jié)點集群的場景,不是真正的判斷是否收到半數(shù)以上選票。對于單個節(jié)點,只要自己給自己投票了就已經(jīng)是Leader了。
3-5. 給集群中所有節(jié)點發(fā)送選票, 根據(jù)raft協(xié)議的定義,投票請求需要包含新的選舉周期,節(jié)點id和最新日志的Index。然后跟其它消息一樣調(diào)用send方法提交一條消息。
選票發(fā)送
消息發(fā)送和心跳消息一樣,也是放到raft的msg隊列中。EtcdServer拿到Ready后發(fā)送給集群中其它的節(jié)點,整個步驟中沒有針對voteMsg做特殊處理。
Follower投票
當(dāng)候選人節(jié)點將選票消息發(fā)出以后,在node中會放入recvc,最終會調(diào)用raft.Step(m pb.Message)處理這條消息。到Step()方法之前,邏輯跟心跳沒有區(qū)別,就不重復(fù)了,下面看下Step方法的處理。
func (r *raft) Step(m pb.Message) error {
switch {
case m.Term == 0:
// local message
case m.Term > r.Term:
//1. 消息的term比當(dāng)前節(jié)點的大
if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
//2. 判斷是否是人工操作的強制transfer
force := bytes.Equal(m.Context, []byte(campaignTransfer))
//3. 根據(jù)本地記錄判斷Leader是否已超時
inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
if !force && inLease {
// 如果沒超時,則直接不回復(fù)候選人
return nil
}
}
switch {
case m.Type == pb.MsgPreVote:
// 預(yù)選不修改term
case m.Type == pb.MsgPreVoteResp && !m.Reject:
// 預(yù)選的response
default:
if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
//4-1. 如果收到了新的term的心跳、append或者snapshot,代表新的周期開始,自己變成Follower
r.becomeFollower(m.Term, m.From)
} else {
//4-2. 如果收到了候選人 的投票請求,則說明當(dāng)前進(jìn)入重新選舉階段,將Leader設(shè)置成None
r.becomeFollower(m.Term, None)
}
}
case m.Term < r.Term:
//處理收到的消息term小于當(dāng)前節(jié)點
}
switch m.Type {
case pb.MsgHup:
...
case pb.MsgVote, pb.MsgPreVote:
// 5-1. 判斷是否可以投票給候選人
canVote := r.Vote == m.From ||
(r.Vote == None && r.lead == None) ||
(m.Type == pb.MsgPreVote && m.Term > r.Term)
// 5-2. 判斷候選人的日志比當(dāng)前節(jié)點的新
if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
//6-1. 回復(fù)候選人同意
r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
if m.Type == pb.MsgVote {
// 將Vote屬性改為候選人的id
r.electionElapsed = 0
r.Vote = m.From
}
} else {
//6-2. 回復(fù)候選人不同意
r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
}
default:
...
}
return nil
}
節(jié)點收到投票的消息,處理前提是收到的Term比當(dāng)前的Term要大。
- ,如果是候選人發(fā)送的投票消息,首先會做一次校驗。1到3步是判斷這次投票的消息不是Transfer消息,并且選舉也沒有超時,則直接忽略掉。再次檢查最小選舉超時是為了防止集群中只有少部分節(jié)點收不到心跳,而其它節(jié)點心跳正常的情況,減少重新選舉的次數(shù)。
Transfer消息是etcd支持的人工發(fā)起的Leader轉(zhuǎn)移請求,這是為了在Leader機器性能不夠或者準(zhǔn)備下線時,人工發(fā)起切換Leader
- 收到term比自己大的消息時,有可能是有新的Leader當(dāng)選了,發(fā)送日志或者心跳消息出來。這種情況當(dāng)前節(jié)點無論處于什么狀態(tài)都應(yīng)該切換成Follower。如果是候選人的投票消息,則將自己的Leader設(shè)置成None,進(jìn)入選舉中階段。
5-1. 在收到候選人的投票消息后,必須滿足3種情況下的一種才可以投同意票,① 之前已經(jīng)投給這個候選人了,可能由于網(wǎng)絡(luò)的原因再次收到重復(fù)的消息;②當(dāng)前未給任何節(jié)點投過票,而且當(dāng)前的Leader是None(在低4步中設(shè)置的);③ 預(yù)選消息只需要判斷term就可以了
5-2.投同意票還有一個條件就是候選人的日志比當(dāng)前節(jié)點的新,raft中新的標(biāo)準(zhǔn)就是最后一條日志要么term更大,要么term相同Index更大
6-1. 第5步的條件都滿足后就可以回復(fù)同意給候選人了,同時將自己Vote值改為候選人的ID,這一步很關(guān)鍵,保證了同一個term中,只能投票給一個候選人
6-2. 如果第5步中的條件不滿足則拒絕候選人
選舉完成
對于候選人來說,選票發(fā)出去之后無非面臨如下2種結(jié)果:
- 失敗,規(guī)定時間內(nèi)沒有收到超過半數(shù)同意票
- 成功,規(guī)定時間內(nèi)收到超過半數(shù)同意票
處理選票回復(fù)
在集群網(wǎng)絡(luò)正常時,候選人應(yīng)該很快會收到各個節(jié)點對她選票的回復(fù)。消息的處理在stepCandidate()函數(shù)中:
func stepCandidate(r *raft, m pb.Message) error {
var myVoteRespType pb.MessageType
if r.state == StatePreCandidate {
myVoteRespType = pb.MsgPreVoteResp
} else {
myVoteRespType = pb.MsgVoteResp
}
switch m.Type {
case pb.MsgProp:
//cadidate狀態(tài)下不接受客戶端數(shù)據(jù)修改請求
return ErrProposalDropped
case pb.MsgApp:
//投票期間收到日志,說明其它節(jié)點成為Leader
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleAppendEntries(m)
case pb.MsgHeartbeat:
//投票期間收到心跳,說明其它節(jié)點成為Leader
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleHeartbeat(m)
case pb.MsgSnap:
//投票期間收到日志快照,說明其它節(jié)點成為Leader
r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
r.handleSnapshot(m)
case myVoteRespType:
//收到投票反饋,則記錄并判斷是否超過半數(shù)
gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
switch res {
case quorum.VoteWon:
if r.state == StatePreCandidate {
r.campaign(campaignElection)
} else {
//贏得選舉,則成為Leader,開始廣播心跳和日志
r.becomeLeader()
r.bcastAppend()
}
case quorum.VoteLost:
// 輸?shù)暨x舉,重新變成Follower
r.becomeFollower(r.Term, None)
}
case pb.MsgTimeoutNow:
...
}
return nil
}
當(dāng)發(fā)起選舉的節(jié)點收到消息時,如果消息是日志、快照或者心跳消息,說明別的節(jié)點已經(jīng)成為Leader,它已經(jīng)輸?shù)袅耍瑒t直接成為Follower。
如果收到的是其它節(jié)點的投票回復(fù),則會統(tǒng)計自己的選票,如果超過半數(shù)支持,是則成為Leader;超過半數(shù)拒絕,則回到Follower狀態(tài)等待下個選舉超時。如果都沒到,則繼續(xù)等。
在贏得選舉成為Leader的情況下,根據(jù)raft協(xié)議,需要馬上開始發(fā)送心跳,以防止其它Follower開始新的選舉。
超時失敗
除了以上候選人節(jié)點在收到明確的消息時,可以判斷自己是否成功之外,還有另外一種場景。比如一個節(jié)點和其它節(jié)點網(wǎng)絡(luò)狀況不佳或者是多個節(jié)點同時成為候選人。這種場景下,即不會收到足夠的投票,也沒有收到別人成為Leader消息,為了防止節(jié)點一直等下去,需要一個超時的機制。
etcd在整個選票發(fā)送及等待選票的過程中,tick()方法是一直在運行的,如果自己一直沒有當(dāng)選,別人也沒當(dāng)選超時的話。候選人會發(fā)起重新一輪的選舉,邏輯跟第一輪是一樣的。回顧下這個方法,其實無論是處于Follower還是Candidate狀態(tài),tick的邏輯是一樣的。
func (r *raft) tickElection() {
r.electionElapsed++
if r.promotable() && r.pastElectionTimeout() {
r.electionElapsed = 0
r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
}
}
預(yù)選舉
什么是預(yù)選
etcd中,raft狀態(tài)機新增了一個狀態(tài)叫做預(yù)候選人。如果用戶在啟動etcd時配置了PreVote屬性為true,則每次選舉開始之前,都會先來一輪預(yù)選。所謂預(yù)選,就是節(jié)點在成為正式候選人之前,先發(fā)送一個預(yù)選的消息給集群內(nèi)所有節(jié)點(MsgPreVote),如果超過半數(shù)節(jié)點都同意,候選人才會開始一次正式的選舉。
在預(yù)選階段,候選節(jié)點的狀態(tài)變?yōu)?code>PreCadidate。而其它節(jié)點仍然保持原來的狀態(tài),也就是說這時候又有其它Follower要發(fā)起選舉并發(fā)送預(yù)選請求,其它節(jié)點也是會同意的。
當(dāng)進(jìn)入預(yù)選狀態(tài)的節(jié)點,收到超過半數(shù)同意后,則正式進(jìn)入候選人狀態(tài)(Candidate)
為什么需要預(yù)選
添加預(yù)選的原因是為了在網(wǎng)絡(luò)狀況不佳時,減少選舉次數(shù)。舉個具體場景,當(dāng)集群中的網(wǎng)絡(luò)不穩(wěn)定時,會有部分Follower不能及時地收到Leader的心跳,這時候就會有Follower發(fā)起選舉。但是網(wǎng)絡(luò)原因,它自身也很難拿到超過半數(shù)選票當(dāng)選,或者當(dāng)選之后也很快就會有別的節(jié)點因為收不到心跳而再次發(fā)起選舉,這就導(dǎo)致了集群經(jīng)常處于選舉狀態(tài)而不可用。為了防止這種情況的頻繁發(fā)生,添加預(yù)選階段,等于把Leader掛掉這件事從單個節(jié)點自己判斷,變成了半數(shù)節(jié)點一起判斷,大大減少了誤判。
凡事都有利有弊,當(dāng)Leader雖然沒掛掉,但性能有問題時,可能只影響了不到一半的節(jié)點。添加預(yù)選之后可能會導(dǎo)致性能不佳的Leader很難被選下去,從而影響讀寫性能。
總結(jié)
選舉是保證raft安全性的基礎(chǔ),心跳是保證集群能夠盡快從Leader宕機或者網(wǎng)絡(luò)分區(qū)中恢復(fù)的關(guān)鍵。etcd中將心跳和計時做了集成,抽象成tick。tick操作在Leader端用來觸發(fā)raft定時發(fā)送心跳,而在Follower端是為了觸發(fā)檢查Leader是否超時。
Raft模塊通過tick操作來觸發(fā)狀態(tài)機在不同狀態(tài)中的轉(zhuǎn)換,通過綁定不同的函數(shù)來對消息進(jìn)行處理和反饋。
選舉完成后,etcd就可以通過在集群中復(fù)制日志來保證用戶對數(shù)據(jù)讀寫的分布式保存和一致性保證了。下一篇將重點解析etcd中日志復(fù)制和提交生效的過程。
【鏈接】
Raft協(xié)議實現(xiàn)之etcd(一):基本架構(gòu)
分布式一致性協(xié)議-Raft詳解 (一) 選舉