ETCD源碼閱讀(三)

閱讀raftexample:etcd/contrib/raftexample

serveChannels()

func (rc *raftNode) serveChannels() {
    snap, err := rc.raftStorage.Snapshot()
    if err != nil {
        panic(err)
    }
    rc.confState = snap.Metadata.ConfState
    rc.snapshotIndex = snap.Metadata.Index
    rc.appliedIndex = snap.Metadata.Index

    defer rc.wal.Close()

    ticker := time.NewTicker(100 * time.Millisecond)
    defer ticker.Stop()

    // send proposals over raft
    go func() {
        confChangeCount := uint64(0)

        for rc.proposeC != nil && rc.confChangeC != nil {
            select {
            case prop, ok := <-rc.proposeC:
                if !ok {
                    rc.proposeC = nil
                } else {
                    // blocks until accepted by raft state machine
                    rc.node.Propose(context.TODO(), []byte(prop))
                }

            case cc, ok := <-rc.confChangeC:
                if !ok {
                    rc.confChangeC = nil
                } else {
                    confChangeCount++
                    cc.ID = confChangeCount
                    rc.node.ProposeConfChange(context.TODO(), cc)
                }
            }
        }
        // client closed channel; shutdown raft if not already
        close(rc.stopc)
    }()

    // event loop on raft state machine updates
    for {
        select {
        case <-ticker.C:
            rc.node.Tick()

        // store raft entries to wal, then publish over commit channel
        case rd := <-rc.node.Ready():
            rc.wal.Save(rd.HardState, rd.Entries)
            if !raft.IsEmptySnap(rd.Snapshot) {
                rc.saveSnap(rd.Snapshot)
                rc.raftStorage.ApplySnapshot(rd.Snapshot)
                rc.publishSnapshot(rd.Snapshot)
            }
            rc.raftStorage.Append(rd.Entries)
            rc.transport.Send(rd.Messages)
            applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
            if !ok {
                rc.stop()
                return
            }
            rc.maybeTriggerSnapshot(applyDoneC)
            rc.node.Advance()

        case err := <-rc.transport.ErrorC:
            rc.writeError(err)
            return

        case <-rc.stopc:
            rc.stop()
            return
        }
    }
}

serveChannels函數(shù),是 raftexample 中用于處理 Raft 消息的主要事件循環(huán)。

  1. 通過調(diào)用 rc.raftStorage.Snapshot() 方法獲取 etcd 當(dāng)前的快照數(shù)據(jù),并將其中的集群配置狀態(tài)(ConfState)、快照索引(snapshotIndex)和已提交的索引(appliedIndex)等元數(shù)據(jù)信息記錄到當(dāng)前 raftNode 實例(即 rc 變量)中。
  2. 創(chuàng)建一個定時器 ticker,每 100 毫秒觸發(fā)一次,用于驅(qū)動 Raft 狀態(tài)機(jī)的 Tick() 方法。這個Tick()方法使Raft狀態(tài)機(jī)的內(nèi)部邏輯時鐘加一,選舉超時和心跳超時都以“tick”的單位來計算。選舉超時:當(dāng)一個節(jié)點在經(jīng)過一段時間(通常是幾個 tick)沒有收到心跳和選舉請求時,就會從follower狀態(tài)轉(zhuǎn)變成candidate狀態(tài),發(fā)起一次選舉。心跳超時:只有l(wèi)eader擁有這個計時器,超時了就給follower發(fā)送心跳包。follower收到后就會把選舉超市計數(shù)器置零。
  3. 創(chuàng)建一個新的 goroutine,用于將處理KVStore發(fā)送的proposal,HTTP API Server對集群配置的更改請求(如添加或刪除節(jié)點)。
  4. 創(chuàng)建一個處理Raft狀態(tài)機(jī)更新的Goroutine,不斷處理以下事件:
    • ticker事件:參考上一段

    • rc.node.Ready():

      case rd := <-rc.node.Ready():
          rc.wal.Save(rd.HardState, rd.Entries)
          if !raft.IsEmptySnap(rd.Snapshot) {
              rc.saveSnap(rd.Snapshot)
              rc.raftStorage.ApplySnapshot(rd.Snapshot)
              rc.publishSnapshot(rd.Snapshot)
          }
          rc.raftStorage.Append(rd.Entries)
          rc.transport.Send(rd.Messages)
          applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
          if !ok {
              rc.stop()
              return
          }
          rc.maybeTriggerSnapshot(applyDoneC)
          // Advance notifies the Node that the application has 
          // saved progress up to the last Ready. 
          // It prepares the node to return the next available Ready.
          // The application should generally call Advance after
          // it applies the entries in last Ready.
          rc.node.Advance()
      

      獲取aft節(jié)點已經(jīng)準(zhǔn)備好的一批Entry和當(dāng)前的HardState(給客戶端發(fā)送狀態(tài)更新前的狀態(tài)),并保存到 WAL 中。接著判斷是否有快照數(shù)據(jù),如果有就更新快照。

      將Entry添加到raft節(jié)點的raftStorage中。然后調(diào)用 rc.transport.Send() 方法將 Messages 發(fā)送給其他節(jié)點。

      最后調(diào)用 rc.entriesToApply() 方法獲取需要apply的 committed entries,通過調(diào)用 rc.publishEntries() 方法將這些 committed entries 應(yīng)用到狀態(tài)機(jī)中,并返回一個 channel 用于通知應(yīng)用完成。

    • rc.transport.ErrorC 和 rc.stopc:出現(xiàn)錯誤需要關(guān)閉Raft Node

raft.go

這段代碼經(jīng)過前面的解讀,已經(jīng)理解的差不多了,我們來看看對應(yīng)的單元測試吧。選了一個比較有意思的。

// TestProposeOnCommit starts three nodes and feeds commits back into the proposal
// channel. The intent is to ensure blocking on a proposal won't block raft progress.
func TestProposeOnCommit(t *testing.T) {
    clus := newCluster(3)
    defer clus.closeNoErrors(t)

    donec := make(chan struct{})
    for i := range clus.peers {
        // feedback for "n" committed entries, then update donec
        go func(pC chan<- string, cC <-chan *commit, eC <-chan error) {
            for n := 0; n < 100; n++ {
                c, ok := <-cC
                if !ok {
                    pC = nil
                }
                select {
                case pC <- c.data[0]:
                    continue
                case err := <-eC:
                    t.Errorf("eC message (%v)", err)
                }
            }
            donec <- struct{}{}
            for range cC {
                // acknowledge the commits from other nodes so
                // raft continues to make progress
            }
        }(clus.proposeC[i], clus.commitC[i], clus.errorC[i])

        // one message feedback per node
        go func(i int) { clus.proposeC[i] <- "foo" }(i)
    }

    for range clus.peers {
        <-donec
    }
}

先mock了一個三節(jié)點的集群(其實就是將對應(yīng)的channel連接上),然后在每個節(jié)點上啟動了兩個 goroutine。其中,一個 goroutine 持續(xù)地從該節(jié)點的 commit channel 中讀取消息,然后將第一條消息反饋給該節(jié)點的 propose channel。另一個 goroutine 僅向該節(jié)點的 propose channel 中發(fā)送一條消息。

這樣就會發(fā)生不斷地從 propose channel 中讀取并將消息發(fā)送到 commit channel 中的過程,然后在一段時間后結(jié)束這個循環(huán)并向 donec channel 發(fā)送一個空結(jié)構(gòu)體來指示完成。最后測試會阻塞直到從所有的節(jié)點都接收到了 donec channel 中的空結(jié)構(gòu)體,以確保測試的所有 goroutine 都已結(jié)束。

這個測試是為了驗證:即使在等待反饋消息的時候,也不會阻塞 raft 的進(jìn)展。

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容