Raft協(xié)議實現(xiàn)之etcd(二):心跳及選舉

前言

選舉是Raft實現(xiàn)數(shù)據(jù)一致性的安全保證,一個raft集群能夠正常運行,必須有且僅有一個Leader存在,一次成功選舉是集群能夠正常運行的前提。Raft協(xié)議對選舉的定義和安全性保證請參考之前的Raft選舉原理解析[傳送門]。這篇文章通過解析etcd的源碼來看一下Raft集群選舉的具體實現(xiàn)。

心跳

Raft集群在正常運行中是不會觸發(fā)選舉的,選舉只會發(fā)生在集群初次啟動或者其它節(jié)點無法收到Leader心跳的情況下。初次啟動比較好理解,因為raft節(jié)點在啟動時,默認(rèn)都是將自己設(shè)置為Follower。收不到Leader心跳有兩種情況,一種是原來的Leader機器Crash了,還有一種是發(fā)生網(wǎng)絡(luò)分區(qū),F(xiàn)ollower跟Leader之間的網(wǎng)絡(luò)斷了,F(xiàn)ollower以為Leader宕機了。下面先看一下集群一切正常時,心跳是怎么流轉(zhuǎn)的。

心跳觸發(fā)

EtcdServer定時觸發(fā)
Raft集群運行時Leader需要定時的發(fā)送心跳給所有Follower。在etcd中,通過EtcdServer中的定時器定時觸發(fā)Raft模塊來實現(xiàn),這個定時器在raftNode.start()中啟動的。

func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second
    go func() {
        defer r.onStop()
        islead := false

        for {
            select {
           //通過go中的ticker觸發(fā)
            case <-r.ticker.C:
                r.tick()
            case rd := <-r.Ready():
                ...
                ...
           }
        }
       ...
   }()
}

func (r *raftNode) tick() {
    r.tickMu.Lock()
    r.Tick() //調(diào)用node.Tick()
    r.tickMu.Unlock()
}

raftNode在啟動時會啟動一個go routine循環(huán)監(jiān)聽ticker定時器的channel,時間到時則調(diào)用它的tick()方法。這個ticker定時器的周期即用戶設(shè)置的raft集群的心跳周期。raftNode中組合了一個raft.Node,所以tick()方法只是加了互斥鎖之后就調(diào)用的raft.Node.Tick()方法。也就是說觸發(fā)由EtcdServer來做,邏輯由raft模塊來處理。
raft模塊處理
Node的Tick()方法只是簡單的寫個空對象到tickc的channel中,這個前一篇講過,Node接口的實現(xiàn)類大部分操作都是異步完成的。

func (n *node) Tick() {
    select {
    //寫空對象到tickc channel異步執(zhí)行
    case n.tickc <- struct{}{}:
    case <-n.done:
    default:
        n.rn.raft.logger.Warningf("%x (leader %v) A tick missed to fire. Node blocks too long!", n.rn.raft.id, n.rn.raft.id == n.rn.raft.lead)
    }
}

node在啟動時會啟動一個go routine監(jiān)聽tickc(在node.run()方法中)。收到觸發(fā)后直接調(diào)用的RawNode.Tick()方法,而RawNode又調(diào)用了raft.tick()。

//node啟動時會在一個新的go routine中運行run()
func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready
    r := n.rn.raft
    lead := None
   //一直循環(huán)查看各個channel
    for {
            ...
            ...
            select {
                //監(jiān)聽到觸發(fā),則調(diào)用RawNode.Tick()
                case <-n.tickc:
                    n.rn.Tick()
                ...
            }
     }
}

//RawNode.Tick()
func (rn *RawNode) Tick() {
    //直接調(diào)用raft.tick()
    rn.raft.tick()
}

Leader tick()邏輯
在上一篇文章中講過,raft的tick屬性是函數(shù)類型,當(dāng)節(jié)點的角色是Leader時,tick指向的是raft.tickHeartbeat()

func (r *raft) tickHeartbeat() {
    //1. 心跳計數(shù)+1
    r.heartbeatElapsed++
    r.electionElapsed++
    // 選舉超時控制  
    if r.electionElapsed >= r.electionTimeout {
        r.electionElapsed = 0
        if r.checkQuorum {
            r.Step(pb.Message{From: r.id, Type: pb.MsgCheckQuorum})
        }
        // Leader轉(zhuǎn)讓
        if r.state == StateLeader && r.leadTransferee != None {
            r.abortLeaderTransfer()
        }
    }
     //2. 如果當(dāng)前已經(jīng)不是Leader了,跳過
    if r.state != StateLeader {
        return
    }
     //3. 如果心跳計數(shù)大于心跳超時,則發(fā)送心跳消息
    if r.heartbeatElapsed >= r.heartbeatTimeout {
        //心跳計數(shù)清0
        r.heartbeatElapsed = 0
        //調(diào)用Step()發(fā)送心跳
        r.Step(pb.Message{From: r.id, Type: pb.MsgBeat})
    }
}
  • 第1步首先Leader將心跳計數(shù)加1,上一篇講過,etcd在記錄超時時不是以標(biāo)準(zhǔn)時間記錄的,而是記錄的心跳間隔的倍數(shù)。所以EtcdServer每觸發(fā)一次tick(),心跳計數(shù)+1,代表距離上次發(fā)送心跳又過了1個心跳時間
  • 第2步關(guān)于選舉超時和轉(zhuǎn)讓的邏輯先跳過
  • 第3步中判斷是否應(yīng)該發(fā)送心跳,heartbeatTimeout的意思是Leader應(yīng)該經(jīng)過幾次心跳時間后必須發(fā)送一次心跳。etcd中heartbeatTimeout的默認(rèn)值是1,也就是說其實每次進(jìn)到這個方法heartbeatElapsed >= heartbeatTimeout都是成立的。當(dāng)判斷需要發(fā)送心跳時,會封裝一個MsgBeat的消息提交Step方法處理,處理邏輯下面再說,先看下Follower的tick()方法。

用戶也可以把heartbeatTimeout這個值設(shè)的很大,當(dāng)然這樣在Leader宕機到觸發(fā)重新選舉的間隔會長一些。在網(wǎng)絡(luò)狀況不好的時候可以這樣設(shè)置。

Follower tick()邏輯
當(dāng)節(jié)點角色是Follower或者Candidate的時候,tick指向的是tickElection()

func (r *raft) tickElection() {
    //選舉計數(shù)加1
    r.electionElapsed++
    //判斷是否超時,要發(fā)起重新選舉
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

以上的邏輯很簡單,因為對于Follower來說,唯一需要關(guān)心得就是是不是很久都沒收到Leader的心跳了。所以每次tick都將選舉計數(shù)+1,當(dāng)Follower收到Leader心跳的時候會將electionElapsed清0。如果Follower收不到Leader的心跳,electionElapsed就會一直加到超過選舉超時,就發(fā)起選舉。發(fā)起選舉的邏輯下面再說。

Leader心跳發(fā)送

raft消息封裝
上面講到Leader在收到Tick請求后,會提交一個MsgBeat的消息給到Step()方法,對于心跳消息,會直接調(diào)用step指向的函數(shù)。跟tick一樣,step也是個函數(shù)類型,在節(jié)點為Leader時,它指向的是stepLeader(),該函數(shù)中對于MsgBeat的消息會調(diào)用bcastHeartbeat()來給集群中每個Follower發(fā)送心跳消息。

func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
    //判斷對于心跳消息,則廣播心跳
    case pb.MsgBeat:
        r.bcastHeartbeat()
        return nil
        case pb.MsgCheckQuorum:
        ...
        ...
}

func (r *raft) bcastHeartbeat() {
    lastCtx := r.readOnly.lastPendingRequestCtx()
    if len(lastCtx) == 0 {
        r.bcastHeartbeatWithCtx(nil)
    } else {
        r.bcastHeartbeatWithCtx([]byte(lastCtx))
    }
}
// 廣播心跳消息至所有節(jié)點
func (r *raft) bcastHeartbeatWithCtx(ctx []byte) {
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        //排除Leader自己
        if id == r.id {
            return
        }
       //發(fā)送心跳
        r.sendHeartbeat(id, ctx)
    })
}

前一篇講過raft在prs屬性中保存了所有Follower的進(jìn)度信息,包含F(xiàn)ollower的id、同步日志的進(jìn)度等。所以上面的方法就是遍歷prs所有節(jié)點,發(fā)送心跳消息。

func (r *raft) sendHeartbeat(to uint64, ctx []byte) {
    //計算消息中帶的commitIndex
    commit := min(r.prs.Progress[to].Match, r.raftLog.committed)
       //封裝成一個Type是MsgHeartbeat的消息,并帶上commitIndex
    m := pb.Message{
        To:      to,
        Type:    pb.MsgHeartbeat,
        Commit:  commit,
        Context: ctx,
    }
       //發(fā)送消息
    r.send(m)
}

Raft協(xié)議中定義心跳消息和日志消息其實是一個格式的,只是心跳消息沒有帶日志條目,只會攜帶CommitIndex。Leader首先看Follower已經(jīng)接收成功的日志條目的Index,即Progress.Match字段,然后跟自己的CommitIndex比較,取值較小的那個。這是為了防止Follower的日志同步落后太多,CommitIndex處的日志還沒有同步到。
封裝好消息后,調(diào)用send()方法發(fā)送。raft本身并不負(fù)責(zé)消息發(fā)送,所以這個方法只是把消息放到一個隊列中,等待EtcdServer來獲取。

func (r *raft) send(m pb.Message) {
    m.From = r.id
    //數(shù)據(jù)校驗,選舉類消息必須帶term屬性
    if m.Type == pb.MsgVote || m.Type == pb.MsgVoteResp || m.Type == pb.MsgPreVote || m.Type == pb.MsgPreVoteResp {
        if m.Term == 0 {
            panic(fmt.Sprintf("term should be set when sending %s", m.Type))
        }
    } else {
         //其它類消息不能帶term屬性
        if m.Term != 0 {
            panic(fmt.Sprintf("term should not be set when sending %s (was %d)", m.Type, m.Term))
        }
       //除了日志和MsgReadIndex消息外,設(shè)置term為raft當(dāng)前周期
        if m.Type != pb.MsgProp && m.Type != pb.MsgReadIndex {
            m.Term = r.Term
        }
    }
    //將消息放入隊列
    r.msgs = append(r.msgs, m)
}

EtcdServer消息傳輸
上一步中,心跳消息被放入隊列,那這些消息是什么時候被發(fā)給集群中其它節(jié)點呢?發(fā)送操作是由EtcdServer啟動時的go routine處理的。具體實現(xiàn)還是在raftNode.start()中。

func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second
    go func() {
        defer r.onStop()
        islead := false
        for {
            select {
            case <-r.ticker.C:
                r.tick()
              //調(diào)用Node.Ready(),從返回的channel中獲取數(shù)據(jù)
            case rd := <-r.Ready():
                if rd.SoftState != nil {
                    // SoftState不為空的處理邏輯
                }
                if len(rd.ReadStates) != 0 {
                    //ReadStates不為空的處理邏輯
                }
                // 如果是Leader發(fā)送消息給Follower
                if islead {
                    r.transport.Send(r.processMessages(rd.Messages))
                }
                ...
                ...
                //處理完畢調(diào)用Advance()方法
                r.Advance()
            case <-r.stopped:
                return
            }
        }
    }()
}

以上的邏輯中只包含心跳相關(guān)的,當(dāng)從Ready channel中讀到數(shù)據(jù)后,直接通過transport發(fā)送出去,這里的processMessages()除了對消息封裝成傳輸協(xié)議要求的格式,還會做超時控制。
發(fā)送完畢后無論成功失敗都會調(diào)用raft的Advance()方法處理后續(xù)邏輯。Leader一次心跳發(fā)送就算結(jié)束了。
Ready數(shù)據(jù)獲取
上面的邏輯中,心跳的message是被打包在Ready數(shù)據(jù)結(jié)構(gòu)中返回的,下面看一下數(shù)據(jù)打包的過程。既然Node.Ready()返回的是個channel,則必然有地方將Ready塞進(jìn)channel中,這段邏輯是在node.run()方法中。

func (n *node) run() {
    var propc chan msgWithResult
    var readyc chan Ready
    var advancec chan struct{}
    var rd Ready
    r := n.rn.raft
    lead := None
    for {
        if advancec != nil {
            readyc = nil
        } else if n.rn.HasReady() { //判斷是否有Ready數(shù)據(jù)
            // 獲取Ready數(shù)據(jù)
            rd = n.rn.readyWithoutAccept()
            readyc = n.readyc
        }
        ....
        select {
            ....
            case readyc <- rd: //數(shù)據(jù)放入ready channel中
                n.rn.acceptReady(rd)  //告訴raft,ready數(shù)據(jù)已被接收
                advancec = n.advancec  //賦值A(chǔ)dvance channel等待Ready處理完成的消息
            }
    }
}

上面的代碼中Ready數(shù)據(jù)是通過調(diào)用RawNode.readyWithoutAccept()獲取到的。

func (rn *RawNode) readyWithoutAccept() Ready {
    return newReady(rn.raft, rn.prevSoftSt, rn.prevHardSt)
}

func newReady(r *raft, prevSoftSt *SoftState, prevHardSt pb.HardState) Ready {
    rd := Ready{
        Entries:          r.raftLog.unstableEntries(),  //未持久化的日志
        CommittedEntries: r.raftLog.nextEnts(),   //已提交可以apply的日志
        Messages:         r.msgs,  //raft隊列中所有的message
    }
    //判斷softState有沒有變化,有則賦值
    if softSt := r.softState(); !softSt.equal(prevSoftSt) {
        rd.SoftState = softSt
    }
    //判斷hardState有沒有變化,有則賦值
    if hardSt := r.hardState(); !isHardStateEqual(hardSt, prevHardSt) {
        rd.HardState = hardSt
    }
    //判斷是不是收到snapshot
    if r.raftLog.unstable.snapshot != nil {
        rd.Snapshot = *r.raftLog.unstable.snapshot
    }
    if len(r.readStates) != 0 {
        rd.ReadStates = r.readStates
    }
    //處理該Ready后是否需要做fsync,將數(shù)據(jù)強制刷盤
    rd.MustSync = MustSync(r.hardState(), prevHardSt, len(rd.Entries))
    return rd
}

對于心跳來說,上面最關(guān)鍵的操作就是生成Ready的時候,將msg放到Ready中。

Follower心跳處理

現(xiàn)在來到心跳的接收方,心跳消息到達(dá)Follower后,傳輸層會回調(diào)EtcdServer.Process方法,將心跳消息交給raft狀態(tài)機處理。

func (s *EtcdServer) Process(ctx context.Context, m raftpb.Message) error {
    if s.cluster.IsIDRemoved(types.ID(m.From)) {
        //發(fā)送方已經(jīng)從集群中移除
    }
    if m.Type == raftpb.MsgApp {
        //收到日志消息記錄metrics
    }
    //調(diào)用raft.Step處理消息
    return s.r.Step(ctx, m)
}

對于Follower來說Step仍然進(jìn)入的是stepFollower方法,第一步是將選舉計時清0,防止發(fā)起選舉流程。

func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    ...
    case pb.MsgHeartbeat:
        r.electionElapsed = 0 //選舉超時清0
        r.lead = m.From  //設(shè)置Lead為心跳發(fā)送方ID
        r.handleHeartbeat(m) //處理心跳消息
    ...
    }
    return nil
}

func (r *raft) handleHeartbeat(m pb.Message) {
     //設(shè)置commitIndex為Leader傳來的最新值
    r.raftLog.commitTo(m.Commit)
     //發(fā)送Response給Leader
    r.send(pb.Message{To: m.From, Type: pb.MsgHeartbeatResp, Context: m.Context})
}

Follower對心跳消息的處理很簡單,1)選舉超時計時清0;2)設(shè)置commitIndex(會檢查本地的commitIndex是不是比leader發(fā)過來的小);3)回復(fù)Leader,回復(fù)的時候按照raft協(xié)議的要求帶上自己日志的進(jìn)度。

Leader心跳回復(fù)處理

Leader收到Follower的心跳回復(fù)后,跟所有消息的處理邏輯一樣,會進(jìn)入stepLeader()方法處理

func stepLeader(r *raft, m pb.Message) error {
    switch m.Type {
      ...
      ...
    case pb.MsgHeartbeatResp:
        //記錄Follower為Active狀態(tài)
        pr.RecentActive = true
        pr.ProbeSent = false
        if pr.State == tracker.StateReplicate && pr.Inflights.Full() {
            pr.Inflights.FreeFirstOne()
        }
         //有日志要發(fā)送,繼續(xù)發(fā)送
        if pr.Match < r.raftLog.lastIndex() {
            r.sendAppend(m.From)
        }
        if r.readOnly.option != ReadOnlySafe || len(m.Context) == 0 {
            return nil
        }
         //處理線性讀的邏輯
        if r.prs.Voters.VoteResult(r.readOnly.recvAck(m.From, m.Context)) != quorum.VoteWon {
            return nil
        }
        rss := r.readOnly.advance(m)
        for _, rs := range rss {
            req := rs.req
            if req.From == None || req.From == r.id { // from local member
                r.readStates = append(r.readStates, ReadState{Index: rs.index, RequestCtx: req.Entries[0].Data})
            } else {
                r.send(pb.Message{To: req.From, Type: pb.MsgReadIndexResp, Index: rs.index, Entries: req.Entries})
            }
        }
    }
}

Leader收到心跳回復(fù)后,會判斷是否有新的日志要發(fā)給Follower,有的話就繼續(xù)發(fā)送。線性讀的邏輯放在后面的文章解析。

選舉

正常情況下,Leader在每次tick()方法時發(fā)送心跳,網(wǎng)絡(luò)一切正常,F(xiàn)ollower收到心跳后將選舉計時清0,集群就這樣愉快的運行下去了。但是分布式系統(tǒng)中,意外時必然會發(fā)生的,這時候Leader宕機了,就需要集群中其它節(jié)點站出來,競選成為新的Leader。

選舉觸發(fā)

首先來看一下Follower是怎么認(rèn)定Leader掛了的。當(dāng)raft節(jié)點角色是Follower的時候,EtcdServer每次觸發(fā)tick(),進(jìn)入的是tickElection()方法:

func (r *raft) tickElection() {
    r.electionElapsed++
     //判斷是否要發(fā)起選舉
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

func (r *raft) promotable() bool {
    pr := r.prs.Progress[r.id]
    return pr != nil && !pr.IsLearner
}
func (r *raft) pastElectionTimeout() bool {
    //每次tick加1后和隨機選舉超時比較
    return r.electionElapsed >= r.randomizedElectionTimeout
}

在每次tick()時,都會檢查是否符合發(fā)起一次新的選舉的條件。其中promotable()比較簡單,就是判斷自己當(dāng)前是不是還在集群中并且不能是Learner。pastElectionTimeout()判斷是否已經(jīng)超過選舉超時時間還沒收到Leader的心跳。這里比較的值是randomizedElectionTimeout,代表一個隨機選舉超時時間,使用隨機時間的原因是防止Leader心跳超時后所有Follower同時發(fā)起選舉。我們看下etcd中這個時間是怎么算的:

func (r *raft) resetRandomizedElectionTimeout() {
    r.randomizedElectionTimeout = r.electionTimeout + globalRand.Intn(r.electionTimeout)
}

可以看到這個時間是1個選舉超時到2個選舉超時之間的隨機值。每次開啟一個新的term,這個reset方法都會被調(diào)用一次,所以在每個選舉周期這個隨機值都是不同的,最大限度防止重復(fù)。

發(fā)送選票

狀態(tài)轉(zhuǎn)換
上一步中Follower發(fā)現(xiàn)過了選舉超時還沒收到Leader心跳,觸發(fā)Step()方法讓raft狀態(tài)機進(jìn)行狀態(tài)轉(zhuǎn)換。

func (r *raft) Step(m pb.Message) error {
    ...
    ...
    switch m.Type {
    case pb.MsgHup:
        if r.state != StateLeader {
            if !r.promotable() {
                //write log
                return nil
            }
            ents, err := r.raftLog.slice(r.raftLog.applied+1, r.raftLog.committed+1, noLimit)
            if err != nil {
                r.logger.Panicf("unexpected error getting unapplied entries (%v)", err)
            }
            //判斷有配置變更日志,如果集群正在做配置變更,則不發(fā)起選舉
            if n := numOfPendingConf(ents); n != 0 && r.raftLog.committed > r.raftLog.applied {
                //write log
                return nil
            }
            //發(fā)起選舉,根據(jù)配置判斷選舉之前是否要做預(yù)投票
            if r.preVote {
                r.campaign(campaignPreElection)
            } else {
                r.campaign(campaignElection)
            }
        } else {
            r.logger.Debugf("%x ignoring MsgHup because already leader", r.id)
        }
}

Step方法收到要發(fā)起選舉的消息后(MsgHup),會首先判斷已經(jīng)提交的還沒生效的日志中有沒有集群變更,有的話說明集群正在變更,則不發(fā)起選舉。這么做的原因是有可能當(dāng)前節(jié)點在集群變更后已經(jīng)被從集群中移除了。
然后,根據(jù)配置中設(shè)置的選舉是否需要先預(yù)選,來調(diào)用campaign()方法發(fā)起選舉。預(yù)選的原理放在本文最后說。

func (r *raft) campaign(t CampaignType) {
    var term uint64
    var voteMsg pb.MessageType
    if t == campaignPreElection { 
        //需要預(yù)選的情況
        r.becomePreCandidate()
        voteMsg = pb.MsgPreVote
        term = r.Term + 1
    } else {
        //1. 變?yōu)楹蜻x人,并初始化一條拉票的消息
        r.becomeCandidate()
        voteMsg = pb.MsgVote
        term = r.Term
    }
    //2.判斷是否已經(jīng)贏得選舉
    if _, _, res := r.poll(r.id, voteRespMsgType(voteMsg), true); res == quorum.VoteWon {
        if t == campaignPreElection {
            r.campaign(campaignElection)
        } else {
            r.becomeLeader()
        }
        return
    }
    //3. 收集所有集群中節(jié)點id
    var ids []uint64
    {
        idMap := r.prs.Voters.IDs()
        ids = make([]uint64, 0, len(idMap))
        for id := range idMap {
            ids = append(ids, id)
        }
        sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
    }
   //4. 輪詢所有節(jié)點,給除自己之外的節(jié)點發(fā)送一條拉票的消息
    for _, id := range ids {
        if id == r.id {
            continue
        }
        var ctx []byte
        if t == campaignTransfer {
            ctx = []byte(t)
        }
        //5.根據(jù)raft協(xié)議的定義組合投票消息
        r.send(pb.Message{Term: term, To: id, Type: voteMsg, Index: r.raftLog.lastIndex(), LogTerm: r.raftLog.lastTerm(), Context: ctx})
    }
}

上面的代碼分為如下幾步:

  1. 在發(fā)送投票之前,節(jié)點首先將自己的狀態(tài)置為候選人,這一步會把term加1,然后修改自己的Vote屬性為自己的id,表示當(dāng)前周期選票投給自己。becomeCandidate() 方法如下:
func (r *raft) becomeCandidate() {
    if r.state == StateLeader {
        panic("invalid transition [leader -> candidate]")
    }
    r.step = stepCandidate  //step方法指向stepCandidate
    r.reset(r.Term + 1)  //選舉周期+1
    r.tick = r.tickElection //tick方法指向tickElection
    r.Vote = r.id  //投票給自己
    r.state = StateCandidate  //節(jié)點狀態(tài)轉(zhuǎn)為候選人
}
  1. 第二步的判斷是為了兼容單節(jié)點集群的場景,不是真正的判斷是否收到半數(shù)以上選票。對于單個節(jié)點,只要自己給自己投票了就已經(jīng)是Leader了。

3-5. 給集群中所有節(jié)點發(fā)送選票, 根據(jù)raft協(xié)議的定義,投票請求需要包含新的選舉周期,節(jié)點id和最新日志的Index。然后跟其它消息一樣調(diào)用send方法提交一條消息。

選票發(fā)送
消息發(fā)送和心跳消息一樣,也是放到raft的msg隊列中。EtcdServer拿到Ready后發(fā)送給集群中其它的節(jié)點,整個步驟中沒有針對voteMsg做特殊處理。

Follower投票

當(dāng)候選人節(jié)點將選票消息發(fā)出以后,在node中會放入recvc,最終會調(diào)用raft.Step(m pb.Message)處理這條消息。到Step()方法之前,邏輯跟心跳沒有區(qū)別,就不重復(fù)了,下面看下Step方法的處理。

func (r *raft) Step(m pb.Message) error {
    switch {
    case m.Term == 0:
        // local message
    case m.Term > r.Term:
        //1. 消息的term比當(dāng)前節(jié)點的大
        if m.Type == pb.MsgVote || m.Type == pb.MsgPreVote {
            //2. 判斷是否是人工操作的強制transfer
            force := bytes.Equal(m.Context, []byte(campaignTransfer))
            //3. 根據(jù)本地記錄判斷Leader是否已超時
            inLease := r.checkQuorum && r.lead != None && r.electionElapsed < r.electionTimeout
            if !force && inLease {
                // 如果沒超時,則直接不回復(fù)候選人
                return nil
            }
        }
        switch {
        case m.Type == pb.MsgPreVote:
            // 預(yù)選不修改term
        case m.Type == pb.MsgPreVoteResp && !m.Reject:
            // 預(yù)選的response
        default:
            if m.Type == pb.MsgApp || m.Type == pb.MsgHeartbeat || m.Type == pb.MsgSnap {
                //4-1. 如果收到了新的term的心跳、append或者snapshot,代表新的周期開始,自己變成Follower
                r.becomeFollower(m.Term, m.From)
            } else {
                //4-2. 如果收到了候選人 的投票請求,則說明當(dāng)前進(jìn)入重新選舉階段,將Leader設(shè)置成None
                r.becomeFollower(m.Term, None)
            }
        }
    case m.Term < r.Term:
        //處理收到的消息term小于當(dāng)前節(jié)點
    }

    switch m.Type {
    case pb.MsgHup:
        ...
    case pb.MsgVote, pb.MsgPreVote:
        // 5-1. 判斷是否可以投票給候選人
        canVote := r.Vote == m.From ||
            (r.Vote == None && r.lead == None) ||
            (m.Type == pb.MsgPreVote && m.Term > r.Term)
        // 5-2. 判斷候選人的日志比當(dāng)前節(jié)點的新
        if canVote && r.raftLog.isUpToDate(m.Index, m.LogTerm) {
            //6-1. 回復(fù)候選人同意
            r.send(pb.Message{To: m.From, Term: m.Term, Type: voteRespMsgType(m.Type)})
            if m.Type == pb.MsgVote {
                // 將Vote屬性改為候選人的id
                r.electionElapsed = 0
                r.Vote = m.From
            }
        } else {
            //6-2. 回復(fù)候選人不同意
            r.send(pb.Message{To: m.From, Term: r.Term, Type: voteRespMsgType(m.Type), Reject: true})
        }

    default:
        ...
    }
    return nil
}

節(jié)點收到投票的消息,處理前提是收到的Term比當(dāng)前的Term要大。

  1. ,如果是候選人發(fā)送的投票消息,首先會做一次校驗。1到3步是判斷這次投票的消息不是Transfer消息,并且選舉也沒有超時,則直接忽略掉。再次檢查最小選舉超時是為了防止集群中只有少部分節(jié)點收不到心跳,而其它節(jié)點心跳正常的情況,減少重新選舉的次數(shù)。

Transfer消息是etcd支持的人工發(fā)起的Leader轉(zhuǎn)移請求,這是為了在Leader機器性能不夠或者準(zhǔn)備下線時,人工發(fā)起切換Leader

  1. 收到term比自己大的消息時,有可能是有新的Leader當(dāng)選了,發(fā)送日志或者心跳消息出來。這種情況當(dāng)前節(jié)點無論處于什么狀態(tài)都應(yīng)該切換成Follower。如果是候選人的投票消息,則將自己的Leader設(shè)置成None,進(jìn)入選舉中階段。

5-1. 在收到候選人的投票消息后,必須滿足3種情況下的一種才可以投同意票,① 之前已經(jīng)投給這個候選人了,可能由于網(wǎng)絡(luò)的原因再次收到重復(fù)的消息;②當(dāng)前未給任何節(jié)點投過票,而且當(dāng)前的Leader是None(在低4步中設(shè)置的);③ 預(yù)選消息只需要判斷term就可以了
5-2.投同意票還有一個條件就是候選人的日志比當(dāng)前節(jié)點的新,raft中新的標(biāo)準(zhǔn)就是最后一條日志要么term更大,要么term相同Index更大

6-1. 第5步的條件都滿足后就可以回復(fù)同意給候選人了,同時將自己Vote值改為候選人的ID,這一步很關(guān)鍵,保證了同一個term中,只能投票給一個候選人
6-2. 如果第5步中的條件不滿足則拒絕候選人

選舉完成

對于候選人來說,選票發(fā)出去之后無非面臨如下2種結(jié)果:

  • 失敗,規(guī)定時間內(nèi)沒有收到超過半數(shù)同意票
  • 成功,規(guī)定時間內(nèi)收到超過半數(shù)同意票

處理選票回復(fù)

在集群網(wǎng)絡(luò)正常時,候選人應(yīng)該很快會收到各個節(jié)點對她選票的回復(fù)。消息的處理在stepCandidate()函數(shù)中:

func stepCandidate(r *raft, m pb.Message) error {
    var myVoteRespType pb.MessageType
    if r.state == StatePreCandidate {
        myVoteRespType = pb.MsgPreVoteResp
    } else {
        myVoteRespType = pb.MsgVoteResp
    }
    switch m.Type {
    case pb.MsgProp:
        //cadidate狀態(tài)下不接受客戶端數(shù)據(jù)修改請求
        return ErrProposalDropped
    case pb.MsgApp:
        //投票期間收到日志,說明其它節(jié)點成為Leader
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleAppendEntries(m)
    case pb.MsgHeartbeat:
       //投票期間收到心跳,說明其它節(jié)點成為Leader
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleHeartbeat(m)
    case pb.MsgSnap:
       //投票期間收到日志快照,說明其它節(jié)點成為Leader
        r.becomeFollower(m.Term, m.From) // always m.Term == r.Term
        r.handleSnapshot(m)
    case myVoteRespType:
        //收到投票反饋,則記錄并判斷是否超過半數(shù)
        gr, rj, res := r.poll(m.From, m.Type, !m.Reject)
        switch res {
        case quorum.VoteWon:
            if r.state == StatePreCandidate {
                r.campaign(campaignElection)
            } else {
                //贏得選舉,則成為Leader,開始廣播心跳和日志
                r.becomeLeader()
                r.bcastAppend()
            }
        case quorum.VoteLost:
            // 輸?shù)暨x舉,重新變成Follower
            r.becomeFollower(r.Term, None)
        }
    case pb.MsgTimeoutNow:
        ...
    }
    return nil
}

當(dāng)發(fā)起選舉的節(jié)點收到消息時,如果消息是日志、快照或者心跳消息,說明別的節(jié)點已經(jīng)成為Leader,它已經(jīng)輸?shù)袅耍瑒t直接成為Follower。
如果收到的是其它節(jié)點的投票回復(fù),則會統(tǒng)計自己的選票,如果超過半數(shù)支持,是則成為Leader;超過半數(shù)拒絕,則回到Follower狀態(tài)等待下個選舉超時。如果都沒到,則繼續(xù)等。
在贏得選舉成為Leader的情況下,根據(jù)raft協(xié)議,需要馬上開始發(fā)送心跳,以防止其它Follower開始新的選舉。

超時失敗

除了以上候選人節(jié)點在收到明確的消息時,可以判斷自己是否成功之外,還有另外一種場景。比如一個節(jié)點和其它節(jié)點網(wǎng)絡(luò)狀況不佳或者是多個節(jié)點同時成為候選人。這種場景下,即不會收到足夠的投票,也沒有收到別人成為Leader消息,為了防止節(jié)點一直等下去,需要一個超時的機制。
etcd在整個選票發(fā)送及等待選票的過程中,tick()方法是一直在運行的,如果自己一直沒有當(dāng)選,別人也沒當(dāng)選超時的話。候選人會發(fā)起重新一輪的選舉,邏輯跟第一輪是一樣的。回顧下這個方法,其實無論是處于Follower還是Candidate狀態(tài),tick的邏輯是一樣的。

func (r *raft) tickElection() {
    r.electionElapsed++
    if r.promotable() && r.pastElectionTimeout() {
        r.electionElapsed = 0
        r.Step(pb.Message{From: r.id, Type: pb.MsgHup})
    }
}

預(yù)選舉

什么是預(yù)選
etcd中,raft狀態(tài)機新增了一個狀態(tài)叫做預(yù)候選人。如果用戶在啟動etcd時配置了PreVote屬性為true,則每次選舉開始之前,都會先來一輪預(yù)選。所謂預(yù)選,就是節(jié)點在成為正式候選人之前,先發(fā)送一個預(yù)選的消息給集群內(nèi)所有節(jié)點(MsgPreVote),如果超過半數(shù)節(jié)點都同意,候選人才會開始一次正式的選舉。
在預(yù)選階段,候選節(jié)點的狀態(tài)變?yōu)?code>PreCadidate。而其它節(jié)點仍然保持原來的狀態(tài),也就是說這時候又有其它Follower要發(fā)起選舉并發(fā)送預(yù)選請求,其它節(jié)點也是會同意的。
當(dāng)進(jìn)入預(yù)選狀態(tài)的節(jié)點,收到超過半數(shù)同意后,則正式進(jìn)入候選人狀態(tài)(Candidate)
為什么需要預(yù)選
添加預(yù)選的原因是為了在網(wǎng)絡(luò)狀況不佳時,減少選舉次數(shù)。舉個具體場景,當(dāng)集群中的網(wǎng)絡(luò)不穩(wěn)定時,會有部分Follower不能及時地收到Leader的心跳,這時候就會有Follower發(fā)起選舉。但是網(wǎng)絡(luò)原因,它自身也很難拿到超過半數(shù)選票當(dāng)選,或者當(dāng)選之后也很快就會有別的節(jié)點因為收不到心跳而再次發(fā)起選舉,這就導(dǎo)致了集群經(jīng)常處于選舉狀態(tài)而不可用。為了防止這種情況的頻繁發(fā)生,添加預(yù)選階段,等于把Leader掛掉這件事從單個節(jié)點自己判斷,變成了半數(shù)節(jié)點一起判斷,大大減少了誤判。
凡事都有利有弊,當(dāng)Leader雖然沒掛掉,但性能有問題時,可能只影響了不到一半的節(jié)點。添加預(yù)選之后可能會導(dǎo)致性能不佳的Leader很難被選下去,從而影響讀寫性能。

總結(jié)

選舉是保證raft安全性的基礎(chǔ),心跳是保證集群能夠盡快從Leader宕機或者網(wǎng)絡(luò)分區(qū)中恢復(fù)的關(guān)鍵。etcd中將心跳和計時做了集成,抽象成tick。tick操作在Leader端用來觸發(fā)raft定時發(fā)送心跳,而在Follower端是為了觸發(fā)檢查Leader是否超時。
Raft模塊通過tick操作來觸發(fā)狀態(tài)機在不同狀態(tài)中的轉(zhuǎn)換,通過綁定不同的函數(shù)來對消息進(jìn)行處理和反饋。
選舉完成后,etcd就可以通過在集群中復(fù)制日志來保證用戶對數(shù)據(jù)讀寫的分布式保存和一致性保證了。下一篇將重點解析etcd中日志復(fù)制和提交生效的過程。

【鏈接】
Raft協(xié)議實現(xiàn)之etcd(一):基本架構(gòu)
分布式一致性協(xié)議-Raft詳解 (一) 選舉

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容