Raft協(xié)議實(shí)現(xiàn)之etcd(三):日志同步

前言

Etcd集群通過(guò)Raft協(xié)議的選舉機(jī)制產(chǎn)生一個(gè)Leader,這樣客戶端在對(duì)etcd存儲(chǔ)的數(shù)據(jù)進(jìn)行更新時(shí),就可以直接將請(qǐng)求發(fā)給Leader。一旦回復(fù)客戶端成功,則表示數(shù)據(jù)更新完成,即使因?yàn)楣收蠈?dǎo)致Leader后續(xù)無(wú)法響應(yīng),客戶端也可以從新選出的Leader上讀到正確的數(shù)據(jù)。在etcd的實(shí)現(xiàn)中,通過(guò)兩點(diǎn)來(lái)保證這一點(diǎn),通過(guò)Raft協(xié)議的日志復(fù)制保證數(shù)據(jù)更新請(qǐng)求同步到超過(guò)半數(shù)節(jié)點(diǎn)后提交,通過(guò)KV存儲(chǔ)提供帶事務(wù)的數(shù)據(jù)更新。所以如果返回成功的結(jié)果給客戶端,則數(shù)據(jù)一定已經(jīng)寫(xiě)入Leader的KV存儲(chǔ)中。
KV存儲(chǔ)實(shí)現(xiàn)不屬于Raft范疇,Raft只保證日志復(fù)制的一致性,這篇文章通過(guò)etcd的源碼解析來(lái)看下raft日志復(fù)制的實(shí)現(xiàn),以及etcd如何實(shí)現(xiàn)日志的持久存儲(chǔ)。Raft對(duì)于日志復(fù)制的一致性保證請(qǐng)參考之前的原理解析文章(傳送門)。

存儲(chǔ)設(shè)計(jì)

Etcd中跟存儲(chǔ)部分相關(guān)的模塊主要有3塊,Raft狀態(tài)機(jī)中存儲(chǔ)的日志條目、持久化到文件的日志條目以及后端的KV存儲(chǔ)。

Raft狀態(tài)機(jī)存儲(chǔ)

回顧下第一篇中講到的Etcd整體架構(gòu),raft模塊只負(fù)責(zé)算法實(shí)現(xiàn),所以所有收到的日志條目都是存在內(nèi)存中。數(shù)據(jù)結(jié)構(gòu)如下:


EtcdServer

上圖中,所有日志條目都是存儲(chǔ)在一個(gè)raftLog的結(jié)構(gòu)中.

type raftLog struct {
    // 自從上次快照后已經(jīng)持久化的日志
    storage Storage
    // 還未持久化的日志
    unstable unstable
    // 集群中已提交的日志index
    committed uint64
    // 本節(jié)點(diǎn)已經(jīng)應(yīng)用到狀態(tài)機(jī)的日志index
    applied uint64
        ...
        ...
}
  1. raftLog中通過(guò)兩個(gè)字段來(lái)存儲(chǔ)日志,storage存儲(chǔ)了已經(jīng)持久化到磁盤的日志和最近一次快照的信息,也就是上圖中已經(jīng)寫(xiě)到了WAL中的數(shù)據(jù)。這部分日志即使在節(jié)點(diǎn)重啟的情況下也不會(huì)丟失,重啟時(shí)etcd會(huì)從wal中讀取出這部分?jǐn)?shù)據(jù)寫(xiě)到raft的內(nèi)存中。
    為啥是上次快照之后的呢?因?yàn)閞aft節(jié)點(diǎn)的內(nèi)存畢竟是有限的,etcd中會(huì)定期對(duì)KV做快照,快照結(jié)束之后,storage就只需要存儲(chǔ)快照的信息和在快照之后接收的日志就可以了,這在raft協(xié)議中也有定義。
  2. unstable結(jié)構(gòu)中存儲(chǔ)了尚未持久化的日志條目和快照,當(dāng)日志持久化之后就會(huì)從unstable中移到storage中。
  3. raft協(xié)議的committed和applied屬性也存在raftLog中,因?yàn)楦鶕?jù)raft協(xié)議的規(guī)定,這兩個(gè)屬性也是需要持久化存儲(chǔ)的。

Storage
raft狀態(tài)機(jī)Storage接口定義如下:

type Storage interface {
    // 已經(jīng)持久化的HardState和ConfState信息
    InitialState() (pb.HardState, pb.ConfState, error)
    // 返回日志條目
    Entries(lo, hi, maxSize uint64) ([]pb.Entry, error)
    // 當(dāng)前的選舉周期
    Term(i uint64) (uint64, error)
    // 最后一條日志的index
    LastIndex() (uint64, error)
    // 第一條日志的index
    FirstIndex() (uint64, error)
    // 返回最近一次Snapshot
    Snapshot() (pb.Snapshot, error)
}

Storage接口定義了所有Raft協(xié)議中要求的需要持久化的信息接口,比如HardState中的term、commitIndex,以及日志條目等。
etcd中對(duì)于該接口的默認(rèn)實(shí)現(xiàn)是MemoryStorage,從名字可以看出來(lái)數(shù)據(jù)是存在內(nèi)存中的,看起來(lái)這么做跟raft的要求不符。這是因?yàn)?code>MemoryStorage中存的日志和狀態(tài)信息都在WAL中,所以這里只需要內(nèi)存就夠了,重啟的時(shí)候,etcd會(huì)從WAL中恢復(fù)數(shù)據(jù)寫(xiě)道Storage中。MemoryStorage的定義如下:

type MemoryStorage struct {
    // 讀寫(xiě)鎖
    sync.Mutex
   // term,commitIndex,vote封裝在HardState中
    hardState pb.HardState
   //最近一次Snapshot
    snapshot  pb.Snapshot
   //snapshot之后的日志條目,第一條日志條目的index為snapshot.Metadata.Index
    ents []pb.Entry
}

unstable
Raft模塊已經(jīng)收到還沒(méi)有持久化到WAL的日志條目存在unstable中

type unstable struct {
    // 從leader收到的snapshot
    snapshot *pb.Snapshot
    // 新收到還未持久化的日志條目
    entries []pb.Entry
  //第一條日志的偏移量
    offset  uint64
}

日志持久化存儲(chǔ)

Raft模塊日志數(shù)據(jù)持久化通過(guò)WAL實(shí)現(xiàn),WAL通過(guò)追加寫(xiě)的方式來(lái)將數(shù)據(jù)寫(xiě)入磁盤以提高性能。Etcd在如下幾種情況下會(huì)在WAL追加一條記錄:

  • 節(jié)點(diǎn)啟動(dòng)時(shí)記錄節(jié)點(diǎn)和集群信息,對(duì)應(yīng)的記錄類型是metadataType;
  • 收到新的日志條目,對(duì)應(yīng)的記錄類型是entryType;
  • 狀態(tài)變化時(shí),比如新的選舉周期,commitIndex變化等,對(duì)應(yīng)的記錄類型是stateType;
  • 做數(shù)據(jù)快照時(shí),對(duì)應(yīng)的記錄類型是snapshotType;
  • 生成新的wal文件時(shí),wal文件達(dá)到一定大小時(shí),etcd就會(huì)生成一個(gè)新的文件,新的文件第一條記錄會(huì)記錄上一個(gè)文件的crc,以備數(shù)據(jù)校驗(yàn)。對(duì)應(yīng)的記錄類型是crcType
type WAL struct {
    lg *zap.Logger
   // wal文件的存儲(chǔ)目錄
    dir string 
    dirFile *os.File
   // wal文件構(gòu)建后會(huì)寫(xiě)的第一個(gè)metadata記錄
    metadata []byte     
   // wal文件構(gòu)建后會(huì)寫(xiě)的第一個(gè)state記錄
    state    raftpb.HardState 
   // wal開(kāi)始的snapshot,代表讀取wal時(shí)從這個(gè)snapshot的記錄之后開(kāi)始
    start     walpb.Snapshot
   //wal記錄的反序列化器
    decoder   *decoder    
    ...
   //底層數(shù)據(jù)文件列表
    locks []*fileutil.LockedFile 
   //
}

WAL底層對(duì)應(yīng)著磁盤上一系列文件,當(dāng)收到需要持久化的日志條目時(shí)就會(huì)追加到文件的末尾,文件達(dá)到一定大小時(shí),WAL會(huì)主動(dòng)創(chuàng)建一個(gè)新的磁盤文件,防止單個(gè)WAL文件過(guò)大。
etcd 會(huì)定期對(duì)數(shù)據(jù)做快照,快照時(shí)會(huì)在WAL中追加一條記錄。在etcd節(jié)點(diǎn)重啟恢復(fù)時(shí),會(huì)查找wal中最后一次快照的記錄,將快照后的日志條目重新給到raft模塊恢復(fù)內(nèi)存數(shù)據(jù)。

KV數(shù)據(jù)庫(kù)存儲(chǔ)

Etcd最終生效的數(shù)據(jù)存在KV數(shù)據(jù)庫(kù)中,并對(duì)后端存儲(chǔ)抽象了一個(gè)Backend接口,Backend的實(shí)現(xiàn)需要支持事務(wù)和多版本管理。Backend接口的定義如下:

type Backend interface {
    // 開(kāi)啟讀事務(wù).
    ReadTx() ReadTx
    //開(kāi)啟寫(xiě)事務(wù)
    BatchTx() BatchTx
    // 開(kāi)啟并發(fā)讀事務(wù),互相之間不阻塞
    ConcurrentReadTx() ReadTx
    // 對(duì)db做快照
    Snapshot() Snapshot
    Hash(ignores map[IgnoreKey]struct{}) (uint32, error)
    // DB占用的物理磁盤大小,空間可以預(yù)分配,所以不是實(shí)際數(shù)據(jù)大小
    Size() int64
    // 實(shí)際使用的磁盤空間
    SizeInUse() int64
    // 返回當(dāng)前讀事務(wù)個(gè)數(shù)
    OpenReadTxN() int64
    // 數(shù)據(jù)文件整理,會(huì)回收已刪除key和已更新的key舊版本占用的磁盤
    Defrag() error
    ForceCommit()
    Close() error
}

該接口的默認(rèn)實(shí)現(xiàn)如下:

type backend struct {
    // 已經(jīng)占用的磁盤大小
    size int64
    // 實(shí)際使用的大小
    sizeInUse int64
    // 已提交事務(wù)數(shù)
    commits int64
    // 當(dāng)前開(kāi)啟的讀事務(wù)數(shù)
    openReadTxN int64
   // 讀寫(xiě)鎖
    mu sync.RWMutex
    //底層存儲(chǔ)為boltDB
    db *bolt.DB
   // 批量寫(xiě)提交間隔
    batchInterval time.Duration
   // 批量寫(xiě)最大事務(wù)數(shù)
    batchLimit    int
   // 寫(xiě)事務(wù)緩沖隊(duì)列
    batchTx       *batchTxBuffered
   // 寫(xiě)事務(wù)
    readTx *readTx

    stopc chan struct{}
    donec chan struct{}

    lg *zap.Logger
}

從上面的實(shí)現(xiàn)中可以看出,etcd的默認(rèn)底層存儲(chǔ)使用的是boltDB。為了提高讀寫(xiě)效率,etcd會(huì)維護(hù)一個(gè)寫(xiě)事務(wù)的緩存隊(duì)列,當(dāng)隊(duì)列大小達(dá)到一定數(shù)或者離上次已經(jīng)過(guò)了一定的時(shí)間后,才會(huì)真正將數(shù)據(jù)寫(xiě)到磁盤上。

存儲(chǔ)總結(jié)

數(shù)據(jù)從客戶端提交到Etcd后,會(huì)經(jīng)過(guò)3個(gè)存儲(chǔ)的地方。首先會(huì)進(jìn)入Raft算法模塊,raft將日志保存在內(nèi)存中,然后通知etcd持久化。為了提高效率,etcd會(huì)將數(shù)據(jù)寫(xiě)到WAL中,因?yàn)閣al底層文件只追加不更新和刪除,所以完成這一步數(shù)據(jù)就不會(huì)丟了。之后etcd的leader節(jié)點(diǎn)將日志分發(fā)到集群中,當(dāng)收到超過(guò)半數(shù)節(jié)點(diǎn)響應(yīng)后,就會(huì)提交數(shù)據(jù),將數(shù)據(jù)存入后端KV存儲(chǔ)中。

日志同步

了解了etcd中的存儲(chǔ)設(shè)計(jì),可以更好的理解一條數(shù)據(jù)變更請(qǐng)求的整個(gè)流轉(zhuǎn)過(guò)程,下面通過(guò)源碼看一下。

請(qǐng)求處理

當(dāng)客戶端提交一條數(shù)據(jù)變更請(qǐng)求時(shí),比如put hello 為 world的寫(xiě)請(qǐng)求,v3版本中會(huì)調(diào)用EtcdServer的Put()方法,最終都會(huì)調(diào)用到processInternalRaftRequestOnce()。

func (s *EtcdServer) processInternalRaftRequestOnce(ctx context.Context, r pb.InternalRaftRequest) (*applyResult, error) {
    //判斷已提交未apply的記錄是否超過(guò)限制
    ai := s.getAppliedIndex()
    ci := s.getCommittedIndex()
    if ci > ai+maxGapBetweenApplyAndCommitIndex {
        return nil, ErrTooManyRequests
    }
    //生成一個(gè)requestID
    r.Header = &pb.RequestHeader{
        ID: s.reqIDGen.Next(),
    }

    authInfo, err := s.AuthInfoFromCtx(ctx)
    if err != nil {
        return nil, err
    }
    if authInfo != nil {
        r.Header.Username = authInfo.Username
        r.Header.AuthRevision = authInfo.Revision
    }
    //反序列化請(qǐng)求數(shù)據(jù)
    data, err := r.Marshal()
    if err != nil {
        return nil, err
    }

    if len(data) > int(s.Cfg.MaxRequestBytes) {
        return nil, ErrRequestTooLarge
    }

    id := r.ID
    if id == 0 {
        id = r.Header.ID
    }
    //注冊(cè)一個(gè)channel,等待處理完成
    ch := s.w.Register(id)
    //設(shè)置請(qǐng)求超時(shí)
    cctx, cancel := context.WithTimeout(ctx, s.Cfg.ReqTimeout())
    defer cancel()

    start := time.Now()
    // 調(diào)用raft模塊的Propose處理請(qǐng)求
    err = s.r.Propose(cctx, data)
    if err != nil {
        proposalsFailed.Inc()
        s.w.Trigger(id, nil) // GC wait
        return nil, err
    }
    proposalsPending.Inc()
    defer proposalsPending.Dec()
    
    select {
    // 等待收到apply結(jié)果返回給客戶端
    case x := <-ch:
        return x.(*applyResult), nil
    case <-cctx.Done():
        proposalsFailed.Inc()
        s.w.Trigger(id, nil) // GC wait
        return nil, s.parseProposeCtxErr(cctx.Err(), start)
    case <-s.done:
        return nil, ErrStopped
    }
}

上面的方法中,etcd對(duì)請(qǐng)求做了基本的校驗(yàn)之后,會(huì)通過(guò)調(diào)用Propose()方法提交給Raft處理,然后等待反饋。在etcd實(shí)現(xiàn)中,會(huì)一直到數(shù)據(jù)apply到狀態(tài)機(jī)之后,才會(huì)返回結(jié)果給客戶端。在Propose()方法中,raft會(huì)將請(qǐng)求封裝成一個(gè)MsgProp消息并調(diào)用Step函數(shù)。

func (rn *RawNode) Propose(data []byte) error {
    return rn.raft.Step(pb.Message{
        Type: pb.MsgProp,
        From: rn.raft.id,
        Entries: []pb.Entry{
            {Data: data},
        }})
}

etcd中只允許Leader處理數(shù)據(jù)變更請(qǐng)求,所以如果是Follower收到客戶端的命令,會(huì)直接轉(zhuǎn)給leader處理,然后等待Leader的反饋后將結(jié)果返回給客戶端。所以,這里只需要看Leader的處理邏輯,上面的Step()函數(shù)最終調(diào)用的是raft模塊的stepLeader(*raft, pb.Message) 函數(shù)。

對(duì)于為什么進(jìn)到stepLeader方法,前一篇文章里面已經(jīng)講過(guò)了,印象不深的話可以回看一下

func stepLeader(r *raft, m pb.Message) error {
    // These message types do not require any progress for m.From.
    switch m.Type {
    case pb.MsgBeat:
        ...
    case pb.MsgCheckQuorum:
        ...
    case pb.MsgProp:
        if len(m.Entries) == 0 {
            r.logger.Panicf("%x stepped empty MsgProp", r.id)
        }
        if r.prs.Progress[r.id] == nil {
            // 判斷當(dāng)前節(jié)點(diǎn)是不是已經(jīng)被從集群中移除了
            return ErrProposalDropped
        }
        if r.leadTransferee != None {
            // 如果正在進(jìn)行l(wèi)eader切換,拒絕寫(xiě)入
            return ErrProposalDropped
        }

        for i := range m.Entries {
            //判斷是否有配置變更的日志,有的話做一些特殊處理
        }
        //將日志追加到raft狀態(tài)機(jī)中
        if !r.appendEntry(m.Entries...) {
            return ErrProposalDropped
        }
        // 發(fā)送日志給集群其它節(jié)點(diǎn)
        r.bcastAppend()
        return nil
    case pb.MsgReadIndex:
        ...
        return nil
    }
    ...
    ...
    return nil
}

Raft協(xié)議是一個(gè)基于日志復(fù)制的協(xié)議,所以客戶端數(shù)據(jù)變更請(qǐng)求會(huì)封裝成一條日志條目。上面的邏輯中首先做了一些基本的校驗(yàn),通過(guò)后將Message中的日志條目追加到raft的日志列表中,追加成功后就會(huì)將日志廣播給所有Follower。

Raft日志新增

上面講存儲(chǔ)的時(shí)候講到,raft算法實(shí)現(xiàn)模塊只是將日志存在內(nèi)存中,所以appendEntry的邏輯也很簡(jiǎn)單。

func (r *raft) appendEntry(es ...pb.Entry) (accepted bool) {
     //1. 獲取raft節(jié)點(diǎn)當(dāng)前最后一條日志條目的index
    li := r.raftLog.lastIndex()
     //2. 給新的日志條目設(shè)置term和index
    for i := range es {
        es[i].Term = r.Term
        es[i].Index = li + 1 + uint64(i)
    }
    // 3. 判斷未提交的日志條目是不是超過(guò)限制,是的話拒絕并返回失敗
    if !r.increaseUncommittedSize(es) {
        return false
    }
    // 4. 將日志條目追加到raftLog中
    li = r.raftLog.append(es...)
    // 5. 檢查并更新日志進(jìn)度
    r.prs.Progress[r.id].MaybeUpdate(li)
    // 6. 判斷是否做一次commit
    r.maybeCommit()
    return true
}
  1. 獲取raft當(dāng)前日志中最后一條日志條目的index
  2. raft的日志條目index是單調(diào)遞增的
  3. etcd限制了leader上最多有多少未提交的條目,防止因?yàn)閘eader和follower之間出現(xiàn)網(wǎng)絡(luò)問(wèn)題時(shí),導(dǎo)致條目一直累積。
  4. 將日志條目追加到raftLog內(nèi)存隊(duì)列中,并且返回最大一條日志的index,對(duì)于leader追加日志的情況,這里返回的li肯定等于方法第1行中獲取的li
  5. raft的leader節(jié)點(diǎn)保存了所有節(jié)點(diǎn)的日志同步進(jìn)度,這里面也包括它自己
  6. 這里忽略maybeCommit()結(jié)果,直接返回true,開(kāi)始廣播日志。

同步給Follower

Leader節(jié)點(diǎn)將日志條目存到raftLog的內(nèi)存中后,調(diào)用bcastAppend()方法觸發(fā)一次廣播操作,同步日志給Follower。

func (r *raft) bcastAppend() {
    //遍歷所有節(jié)點(diǎn),給除自己外的節(jié)點(diǎn)發(fā)送日志Append消息
    r.prs.Visit(func(id uint64, _ *tracker.Progress) {
        if id == r.id {
            return
        }
        r.sendAppend(id)
    })
}

func (r *raft) sendAppend(to uint64) {
    r.maybeSendAppend(to, true)
}

func (r *raft) maybeSendAppend(to uint64, sendIfEmpty bool) bool {
    //1. 獲取對(duì)端節(jié)點(diǎn)當(dāng)前同步進(jìn)度
    pr := r.prs.Progress[to]
    if pr.IsPaused() {
        return false
    }
    m := pb.Message{}
    m.To = to
    //2. 注意這里帶的term是本次發(fā)送給follower的第一條日志條目的term
    term, errt := r.raftLog.term(pr.Next - 1)
    ents, erre := r.raftLog.entries(pr.Next, r.maxMsgSize)
    if len(ents) == 0 && !sendIfEmpty {
        return false
    }

    if errt != nil || erre != nil { 
        //3. 如果獲取term或日志失敗,說(shuō)明follower落后太多,raftLog內(nèi)存中日志已經(jīng)做過(guò)快照后被刪除了
        if !pr.RecentActive {
            r.logger.Debugf("ignore sending snapshot to %x since it is not recently active", to)
            return false
        }
        //4. 改為發(fā)送Snapshot消息
        m.Type = pb.MsgSnap
        snapshot, err := r.raftLog.snapshot()
        if err != nil {
            if err == ErrSnapshotTemporarilyUnavailable {
                r.logger.Debugf("%x failed to send snapshot to %x because snapshot is temporarily unavailable", r.id, to)
                return false
            }
            panic(err) // TODO(bdarnell)
        }
        if IsEmptySnap(snapshot) {
            panic("need non-empty snapshot")
        }
        m.Snapshot = snapshot
        sindex, sterm := snapshot.Metadata.Index, snapshot.Metadata.Term
        pr.BecomeSnapshot(sindex)
    } else {
        //5. 發(fā)送Append消息
        m.Type = pb.MsgApp
        m.Index = pr.Next - 1
        m.LogTerm = term
        m.Entries = ents
        //6. 每次發(fā)送日志或心跳都會(huì)帶上最新的commitIndex
        m.Commit = r.raftLog.committed
        if n := len(m.Entries); n != 0 {
            ...
            ...
        }
    }
    //7. 發(fā)送消息
    r.send(m)
    return true
}

上面的邏輯中,leader在收到新的更新日志后,會(huì)遍歷集群中所有follower節(jié)點(diǎn),觸發(fā)一次日志同步。

  1. 按照raft協(xié)議規(guī)定,leader需要緩存當(dāng)前所有Follower的日志同步進(jìn)度
  2. 根據(jù)日志進(jìn)度去取日志條目的時(shí)候發(fā)現(xiàn),follower日志落后太多,這通常出現(xiàn)在新節(jié)點(diǎn)剛加入或者網(wǎng)絡(luò)連接出現(xiàn)故障的情況下。那么在這種情況下,leader改為發(fā)送最近一次快照給Follower,從而提高同步效率
  3. 正常情況下會(huì)發(fā)送新的日志給Follower,消息類型為MsgApp,最終調(diào)用r.send(m)提交消息。

日志寫(xiě)WAL

在上一篇講心跳消息發(fā)送的時(shí)候已經(jīng)講過(guò),EtcdServer中會(huì)有一個(gè)goroutine監(jiān)聽(tīng)raft的channel是不是有新的Ready數(shù)據(jù)過(guò)來(lái),收到后就會(huì)將里面的msgs發(fā)送給接收端。這個(gè)MsgApp類型的消息也是一樣提交的,這里就不在重復(fù)了。
日志發(fā)送給Follower的同時(shí),Leader會(huì)將日志落盤,即寫(xiě)到WAL中,這是通過(guò)調(diào)用WAL.Save()方法實(shí)現(xiàn)的。

func (w *WAL) Save(st raftpb.HardState, ents []raftpb.Entry) error {
    //獲取wal的寫(xiě)鎖
    w.mu.Lock()
    defer w.mu.Unlock()
    // HardState變化或者新的日志條目則需要寫(xiě)wal
    if raft.IsEmptyHardState(st) && len(ents) == 0 {
        return nil
    }
    mustSync := raft.MustSync(st, w.state, len(ents))

    // 寫(xiě)日志條目
    for i := range ents {
        if err := w.saveEntry(&ents[i]); err != nil {
            return err
        }
    }
    // 寫(xiě)state變化
    if err := w.saveState(&st); err != nil {
        return err
    }
    // 判斷文件大小是否超過(guò)最大值
    curOff, err := w.tail().Seek(0, io.SeekCurrent)
    if err != nil {
        return err
    }
    if curOff < SegmentSizeBytes {
        if mustSync {
            return w.sync()
        }
        return nil
    }
    // 文件切分
    return w.cut()
}

WAL文件結(jié)構(gòu)上面已經(jīng)講過(guò)了,對(duì)于新增日志的情況,wal中新增entryType的記錄。

Follower日志處理

Leader節(jié)點(diǎn)處理完命令后,發(fā)送日志和持久化操作都是異步進(jìn)行的,但是這不代表客戶端已經(jīng)收到回復(fù)。Raft協(xié)議要求在返回客戶端成功的時(shí)候,日志一定已經(jīng)提交了,所以Leader需要等待超過(guò)半數(shù)的Follower節(jié)點(diǎn)處理完日志并反饋,下面先看一下Follower的日志處理。
日志消息到達(dá)Follower后,也是由EtcdServer.Process()方法來(lái)處理,最終會(huì)進(jìn)到Raft模塊的stepFollower()函數(shù)中。

func stepFollower(r *raft, m pb.Message) error {
    switch m.Type {
    ...
    case pb.MsgApp:
        // 重置心跳計(jì)數(shù)
        r.electionElapsed = 0
        // 設(shè)置Leader
        r.lead = m.From
        // 處理日志條目
        r.handleAppendEntries(m)
    ...
    }
    ...
}

Follower收到消息后首先跟心跳消息的處理邏輯一樣,重置心跳計(jì)數(shù)和leader,然后再處理日志條目。

func (r *raft) handleAppendEntries(m pb.Message) {
    // 判斷是否是過(guò)時(shí)的消息
    if m.Index < r.raftLog.committed {
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: r.raftLog.committed})
        return
    }

    if mlastIndex, ok := r.raftLog.maybeAppend(m.Index, m.LogTerm, m.Commit, m.Entries...); ok {
        // 處理成功,發(fā)送MsgAppResp給Leader
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: mlastIndex})
    } else {
        // 日志的index和Follower的lastIndex不匹配,返回reject消息
        r.send(pb.Message{To: m.From, Type: pb.MsgAppResp, Index: m.Index, Reject: true, RejectHint: r.raftLog.lastIndex()})
    }
}

調(diào)用raftLog存儲(chǔ)日志,并返回結(jié)果給Leader。這里follower失敗可能有2種情況造成的,一種是日志條目中帶的term和follower的term不一致,還有一種是日志列表中最小的index大于follower的最大的日志index。
上面的maybeAppend() 方法只會(huì)將日志存儲(chǔ)到RaftLog維護(hù)的內(nèi)存隊(duì)列中,日志的持久化是異步進(jìn)行的,這個(gè)和Leader節(jié)點(diǎn)的存儲(chǔ)WAL邏輯基本相同。有一點(diǎn)區(qū)別就是follower節(jié)點(diǎn)正式發(fā)送MsgAppResp消息會(huì)在wal保存成功后,而leader節(jié)點(diǎn)是先發(fā)送消息,后保存的wal。

提交(Commit)

Leader節(jié)點(diǎn)在向Follower廣播日志后,就一直在等待follower的MsgAppResp消息,收到后還是會(huì)進(jìn)到stepLeader函數(shù)。

func stepLeader(r *raft, m pb.Message) error {
    ...
    ...
    pr := r.prs.Progress[m.From]
    switch m.Type {
    case pb.MsgAppResp:
        pr.RecentActive = true
        if m.Reject {
            //如果收到的是reject消息,則根據(jù)follower反饋的index重新發(fā)送日志
            if pr.MaybeDecrTo(m.Index, m.RejectHint) {
                if pr.State == tracker.StateReplicate {
                    pr.BecomeProbe()
                }
                r.sendAppend(m.From)
            }
        } else {
            oldPaused := pr.IsPaused()
            //更新緩存的日志同步進(jìn)度
            if pr.MaybeUpdate(m.Index) {
                switch {
                case pr.State == tracker.StateProbe:
                    pr.BecomeReplicate()
                case pr.State == tracker.StateSnapshot && pr.Match >= pr.PendingSnapshot:
                    pr.BecomeProbe()
                    pr.BecomeReplicate()
                case pr.State == tracker.StateReplicate:
                    pr.Inflights.FreeLE(m.Index)
                }
                //如果進(jìn)度有更新,判斷并更新commitIndex
                if r.maybeCommit() {
                    //commitIndex有變化則立即發(fā)送日志
                    r.bcastAppend()
                } else if oldPaused {
                    r.sendAppend(m.From)
                }
                // 循環(huán)發(fā)送所有剩余的日志給follower
                for r.maybeSendAppend(m.From, false) {
                }
                // 是否正在進(jìn)行l(wèi)eader轉(zhuǎn)移
                if m.From == r.leadTransferee && pr.Match == r.raftLog.lastIndex() {
                    r.logger.Infof("%x sent MsgTimeoutNow to %x after received MsgAppResp", r.id, m.From)
                    r.sendTimeoutNow(m.From)
                }
            }
        }
    ...
    ...
    return nil
}

func (r *raft) maybeCommit() bool {
    //獲取最大的超過(guò)半數(shù)確認(rèn)的index
    mci := r.prs.Committed()
    //更新commitIndex
    return r.raftLog.maybeCommit(mci, r.Term)
}

收到Follower的回復(fù)以后,如果是reject的,leader會(huì)根據(jù)返回的index重新發(fā)送日志。如果是成功的消息,則更新緩存中的日志同步進(jìn)度,并判斷超過(guò)半數(shù)確認(rèn)的index是否有變化。有變化則通知raftLog更新commitIndex。到此為止,客戶端的這條數(shù)據(jù)更新命令,就正式提交了。下面就看一下,數(shù)據(jù)是怎樣寫(xiě)到DB中的。

數(shù)據(jù)更新(Apply)

前面已經(jīng)講過(guò),EtcdServer在啟動(dòng)時(shí)會(huì)啟動(dòng)一個(gè)goroutine監(jiān)聽(tīng)raft模塊是否有Ready消息過(guò)來(lái)。當(dāng)上一步的commitIndex發(fā)生變化后,Ready中的HardState就會(huì)有值了。Etcd會(huì)獲取ready結(jié)構(gòu)中的committedEntries,提交給Apply模塊應(yīng)用到后端存儲(chǔ)中。

func (r *raftNode) start(rh *raftReadyHandler) {
    internalTimeout := time.Second
    go func() {
        defer r.onStop()
        islead := false

        for {
            ...
            case rd := <-r.Ready():
                if rd.SoftState != nil {
                    ...
                    ...
                }

                if len(rd.ReadStates) != 0 {
                    ...
                    ...
                }
                // 生成apply請(qǐng)求
                notifyc := make(chan struct{}, 1)
                ap := apply{
                    entries:  rd.CommittedEntries,
                    snapshot: rd.Snapshot,
                    notifyc:  notifyc,
                }
                // 更新etcdServer緩存的commitIndex為最新值
                updateCommittedIndex(&ap, rh)
                // 將已提交日志應(yīng)用到狀態(tài)機(jī)
                select {
                case r.applyc <- ap:
                case <-r.stopped:
                    return
                }

                if islead {
                    // 如果有新的日志條目
                    r.transport.Send(r.processMessages(rd.Messages))
                }

                // 如果有snapshot
                if !raft.IsEmptySnap(rd.Snapshot) {
                    ...
                    ...
                }

                //將hardState和日志條目保存到WAL中
                if err := r.storage.Save(rd.HardState, rd.Entries); err != nil {
                    ...
                    ...
                }
                if !raft.IsEmptyHardState(rd.HardState) {
                    proposalsCommitted.Set(float64(rd.HardState.Commit))
                }

                if !raft.IsEmptySnap(rd.Snapshot) {
                    ...
                    ...
                }

                r.raftStorage.Append(rd.Entries)

                if !islead {
                    ...
                    ...
                } else {
                    notifyc <- struct{}{}
                }
                //更新raft模塊的applied index和將日志從unstable轉(zhuǎn)到stable中
                r.Advance()
            case <-r.stopped:
                return
            }
        }
    }()
}

這里需要注意的是,在將已提交日志條目應(yīng)用到狀態(tài)機(jī)的操作是異步完成的,在Apply完成后,會(huì)將結(jié)果寫(xiě)到客戶端調(diào)用進(jìn)來(lái)時(shí)注冊(cè)的channel中。這樣一次完整的寫(xiě)操作就完成了。

總結(jié)

etcd整個(gè)寫(xiě)操作涉及到raft協(xié)議處理,日志同步,持久化以及更新KV的流程。這里面首先要理解etcd的存儲(chǔ)結(jié)構(gòu),就能對(duì)整個(gè)流程有清晰的理解。

【鏈接】
Raft協(xié)議實(shí)現(xiàn)之etcd(一):基本架構(gòu)
Raft協(xié)議實(shí)現(xiàn)之etcd(二):心跳及選舉

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容