Consul Raft協(xié)議源碼分析上篇——日志復(fù)制

背景

前面一篇文章我們描述了raft 協(xié)議的實(shí)現(xiàn)數(shù)據(jù)一致性的基礎(chǔ)知識(shí),有了前面的基礎(chǔ)知識(shí)背景,能很好的幫助我們理解consul 基于raft算法的實(shí)現(xiàn),理論指導(dǎo)實(shí)踐,永遠(yuǎn)不過時(shí)。

我們以consul key value 的一個(gè)例子來(lái)理清整個(gè)流程,以寫一個(gè)key value來(lái)看,是我們?nèi)粘i_發(fā)中用的最多的一個(gè)例子,讓我們來(lái)一起看看consul server到底是怎么實(shí)現(xiàn)的,背后的邏輯是什么。

Consul Agent 請(qǐng)求

客戶端發(fā)起一個(gè)put key value的http請(qǐng)求,由kvs_endpoint.go 的KVSEndpoint func 處理,put的方法會(huì)路由給KVSPut 處理,除了一些校驗(yàn)外和請(qǐng)求標(biāo)識(shí),比如是否有獲取鎖acquire或者release,這里提下一個(gè)檢查,就是value的大小檢查,和web 容器一樣檢查防止請(qǐng)求數(shù)據(jù)太大,可以通過參數(shù)kv_max_value_size 控制,如果超過返回狀態(tài)碼413,標(biāo)準(zhǔn)的http 狀態(tài)碼。

檢查都OK后,consul agent就開始請(qǐng)求consul server了,當(dāng)然還是rpc 操作

// Copy the value
buf := bytes.NewBuffer(nil)
// 這里才開始讀請(qǐng)求的數(shù)據(jù)。
if _, err := io.Copy(buf, req.Body); err != nil {
   return nil, err
}
applyReq.DirEnt.Value = buf.Bytes()

// Make the RPC
var out bool
// 開始請(qǐng)求server
if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
   return nil, err
}

// Only use the out value if this was a CAS
// 沒有出錯(cuò)的話,這里就成功返回了
if applyReq.Op == api.KVSet {
        return true, nil
}

請(qǐng)求的是consul 下面的kvs_endpoint.go 下面的Apply 方法,所以我們的重點(diǎn)要來(lái)了

Server Apply

consul server的 apply方法,代碼還是show下,這里還有兩個(gè)邏輯說(shuō)明下。

// Apply is used to apply a KVS update request to the data store.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
   // 檢查機(jī)房dc是否匹配,不是就轉(zhuǎn)發(fā)到對(duì)應(yīng)到dc的server。
   if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
      return err
   }
   // 中間不重要的去了,省得太多...
   // 對(duì)權(quán)限token 應(yīng)用ACL policy
   ok, err := kvsPreApply(k.logger, k.srv, authz, args.Op, &args.DirEnt)
   if err != nil {
      return err
   }
   if !ok {
      *reply = false
      return nil
   }

   // Apply the update.
   // 這里是開啟raft 算法的之旅的入口。
   resp, err := k.srv.raftApply(structs.KVSRequestType, args)
   if err != nil {
      k.logger.Error("Raft apply failed", "error", err)
      return err
   }
   if respErr, ok := resp.(error); ok {
      return respErr
   }

   // Check if the return type is a bool.
   if respBool, ok := resp.(bool); ok {
      *reply = respBool
   }
   return nil
}

在真正開始執(zhí)行raft 算法前,主要做了如下兩件事:

先檢查了dc是否是當(dāng)前dc,如果不是會(huì)路由到正確的dc,這頁(yè)是consul 支持多機(jī)房部署的一個(gè)很好的特性,路由很方便,這也是多機(jī)房部署consul是很好的選擇。
檢查是否啟用了acl策略,如果有,需要檢查,沒有對(duì)應(yīng)的token是不能操作的。
上面2件事都沒有問題后,開始執(zhí)行raft apply操作,我們真正感興趣的就要出來(lái)了,下面讓我們開始盤apply,

經(jīng)過一盤,在真正執(zhí)行raft前,consul還做了一些加工,不能蠻搞,是非常嚴(yán)謹(jǐn)?shù)?,上面通過raftApply,經(jīng)過幾跳后,會(huì)執(zhí)行到raftApplyWithEncoder方法,這里做的工作是很重要的,所以還是拿出來(lái)說(shuō)下,是漲知識(shí)的地方,代碼如下:

// raftApplyWithEncoder is used to encode a message, run it through raft,
// and return the FSM response along with any errors. Unlike raftApply this
// takes the encoder to use as an argument.
func (s *Server) raftApplyWithEncoder(t structs.MessageType, msg interface{}, encoder raftEncoder) (interface{}, error) {
   if encoder == nil {
      return nil, fmt.Errorf("Failed to encode request: nil encoder")
   }
   // 對(duì)請(qǐng)求編碼。
   buf, err := encoder(t, msg)
   if err != nil {
      return nil, fmt.Errorf("Failed to encode request: %v", err)
   }

   // Warn if the command is very large
   if n := len(buf); n > raftWarnSize {
      s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
   }

   var chunked bool
   var future raft.ApplyFuture
   switch {
   case len(buf) <= raft.SuggestedMaxDataSize || t != structs.KVSRequestType:
      //請(qǐng)求的數(shù)據(jù)大小如果小于512 * 1024 即512k,則做一次log執(zhí)行。
      future = s.raft.Apply(buf, enqueueLimit)
   default:
      //超過了512k,則需要分chunk,每個(gè)chunk做為一個(gè)log來(lái)應(yīng)用。
      chunked = true
      //這里就是每個(gè)log一次future。
      future = raftchunking.ChunkingApply(buf, nil, enqueueLimit, s.raft.ApplyLog)
   }

   //阻塞,等待raft協(xié)議完成。
   if err := future.Error(); err != nil {
      return nil, err
   }

   resp := future.Response()

   //...
   return resp, nil
}

這里通過注釋,你也可以看出,主要關(guān)心4件事情:

  1. 把請(qǐng)求編碼,這個(gè)不是我們的重點(diǎn),后面有時(shí)間可以單獨(dú)分析。
  2. 檢查是否要拆包,是否要拆成多個(gè)raft command 來(lái)執(zhí)行,這里有個(gè)參數(shù)控制,SuggestedMaxDataSize consul 默認(rèn)設(shè)置是512k,如果超過這個(gè)則拆,否則可以一次raft 協(xié)議搞定。
  3. 有一個(gè)超時(shí)時(shí)間,默認(rèn)是30秒,后面會(huì)用到。
  4. 最后事阻塞等待完成,是logfuture。

為什么要拆包

這些事raft 算法不會(huì)提的,這個(gè)事工程實(shí)踐才會(huì)有的一些優(yōu)化,此時(shí)你也和我一樣,為啥要做這個(gè)優(yōu)化呢,有什么好處,解決什么問題,這是我們做一個(gè)架構(gòu)師必須要有的思考。

consul的官方就給出了解釋,所以閱讀優(yōu)秀的代碼就是一種享受,看注釋就能知道為啥這樣做,下面是他們對(duì)SuggestedMaxDataSize的注釋:

// Increasing beyond this risks RPC IO taking too long and preventing
// timely heartbeat signals which are sent in serial in current transports,
// potentially causing leadership instability.
SuggestedMaxDataSize = 512 * 1024

理解就是rpc的請(qǐng)求io 不能太大,因?yàn)檫€有非常重要的心跳包,如果發(fā)心跳包出現(xiàn)延遲,就而影響leader的穩(wěn)定,這個(gè)事一個(gè)非常重要的優(yōu)化措施。

說(shuō)完了拆包優(yōu)化邏輯后,我們看下ApplyLog的邏輯,代碼如下:

// ApplyLog performs Apply but takes in a Log directly. The only values
// currently taken from the submitted Log are Data and Extensions.
func (r *Raft) ApplyLog(log Log, timeout time.Duration) ApplyFuture {
   metrics.IncrCounter([]string{"raft", "apply"}, 1)

   var timer <-chan time.Time
   if timeout > 0 {
      timer = time.After(timeout)
   }

   // Create a log future, no index or term yet
   logFuture := &logFuture{
      log: Log{
         Type:       LogCommand,
         Data:       log.Data,
         Extensions: log.Extensions,
      },
   }
   logFuture.init()

   select {
   case <-timer:
      return errorFuture{ErrEnqueueTimeout}
   case <-r.shutdownCh:
      return errorFuture{ErrRaftShutdown}
   case r.applyCh <- logFuture:
      return logFuture
   }
}

這里主要關(guān)心這個(gè)applyCh channel,consul 在初始化leader的時(shí)候給創(chuàng)建的一個(gè)無(wú)緩沖區(qū)的通道,所以如果leader的協(xié)程在干其他的事情,那這個(gè)提交log就阻塞了,時(shí)間最長(zhǎng)30s,寫入成功,就返回了logFuture,也就事前面我們看到future的阻塞。

到這里整個(gè)consul leader server的插入請(qǐng)求從接受到阻塞等待的邏輯就完成了,consul server 有個(gè)核心的go routine 在watch 這個(gè)applyCh,從定義可以看出,是應(yīng)用raft log的channel。

分組提交

consul leader 在初始化完成后,會(huì)啟動(dòng)一個(gè)核心的go routine,執(zhí)行rpc,leader 驗(yàn)證,這個(gè)我們前面分析過,還有一個(gè)最重要的就事raft log應(yīng)用了,代碼如下:

case newLog := <-r.applyCh://這個(gè)是前面我們提交log future的
   if r.getLeadershipTransferInProgress() {
      r.logger.Debug(ErrLeadershipTransferInProgress.Error())
      newLog.respond(ErrLeadershipTransferInProgress)
      continue
   }
   // Group commit, gather all the ready commits
   ready := []*logFuture{newLog}
GROUP_COMMIT_LOOP:
   for i := 0; i < r.conf.MaxAppendEntries; i++ {
      select {
      case newLog := <-r.applyCh:
         ready = append(ready, newLog)
      default:
         break GROUP_COMMIT_LOOP
      }
   }

   // Dispatch the logs
   if stepDown {
      // we're in the process of stepping down as leader, don't process anything new
     //如果發(fā)現(xiàn)我們不是leader了,直接響應(yīng)失敗 
     for i := range ready {
         ready[i].respond(ErrNotLeader)
      }
   } else {
      r.dispatchLogs(ready)
   }

這里的一個(gè)重要的點(diǎn)就是組提交,我們?cè)诨A(chǔ)篇提過,這里就是實(shí)現(xiàn)了,就是讀applyCh的log,這個(gè)里做了組提交的優(yōu)化,最多一次發(fā)送MaxAppendEntries個(gè),默認(rèn)位64個(gè),如果并發(fā)高的情況下,這里是能讀到一個(gè)batch的,或者沒有了,就不等了,這里是不能等的,因?yàn)閞aft算法要保證順序,這里是單線程出來(lái)的,下面就開始dispatch log了,代碼如下:

// dispatchLog is called on the leader to push a log to disk, mark it
// as inflight and begin replication of it.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
   now := time.Now()
   defer metrics.MeasureSince([]string{"raft", "leader", "dispatchLog"}, now)

   //獲取當(dāng)前l(fā)eader的任期編號(hào),這個(gè)不會(huì)重復(fù)是遞增的,如果有心的leaer了,會(huì)比這個(gè)大。
   term := r.getCurrentTerm()
   //log 編號(hào),寫一個(gè)加1
   lastIndex := r.getLastIndex()

   n := len(applyLogs)
   logs := make([]*Log, n)
   metrics.SetGauge([]string{"raft", "leader", "dispatchNumLogs"}, float32(n))

   //設(shè)置每個(gè)log的編號(hào)和任期
   for idx, applyLog := range applyLogs {
      applyLog.dispatch = now
      lastIndex++
      applyLog.log.Index = lastIndex
      applyLog.log.Term = term
      logs[idx] = &applyLog.log
      r.leaderState.inflight.PushBack(applyLog)
   }

   // Write the log entry locally
   // log先寫入本地持久化,consul大部分的版本底層用的是boltdb,boltdb
   // 是一個(gè)支持事物的數(shù)據(jù)庫(kù),非常方便,這里會(huì)涉及io操作。
   if err := r.logs.StoreLogs(logs); err != nil {
      r.logger.Error("failed to commit logs", "error", err)
      //如果寫失敗,則直接響應(yīng),前面的future阻塞就會(huì)喚醒。
      for _, applyLog := range applyLogs {
         applyLog.respond(err)
      }
      //更新自己為follower
      r.setState(Follower)
      return
   }
   //這里很重要,好就才看明白,這個(gè)是log 復(fù)制成功后,最終應(yīng)用到狀態(tài)機(jī)的一個(gè)機(jī)制
     //這里是記錄下leader自己的結(jié)果,因?yàn)檫^半leader也算一份。
   r.leaderState.commitment.match(r.localID, lastIndex)

   // Update the last log since it's on disk now
   // 更新最新log entry的編號(hào),寫到這里了。
   r.setLastLog(lastIndex, term)

   // Notify the replicators of the new log
   // 開始異步發(fā)送給所有的follower,這個(gè)leader主go routine的活就干完了。
   for _, f := range r.leaderState.replState {
      asyncNotifyCh(f.triggerCh)
   }
}

這個(gè)dispatchlog的邏輯注釋里基本寫清楚了,核心的go routine 經(jīng)過一頓操作后,最主要就是兩點(diǎn):

本地持久化log

記錄自己寫成功,因?yàn)橛?jì)算過半時(shí),leader自己這一份也算在里面,這個(gè)很重要。
又異步交給了replicate go routine來(lái)處理,他就去繼續(xù)去分組提交了,大概率如此循環(huán)往復(fù),不知疲倦的給replication routine 派活。

復(fù)制GoRoutine

replication routine 會(huì)監(jiān)聽triggerCh channel,接受領(lǐng)導(dǎo)的任務(wù),這個(gè)比較簡(jiǎn)單,就開始真正發(fā)給各自的follower了,代碼如下:

case <-s.triggerCh:
   lastLogIdx, _ := r.getLastLog()
   //這個(gè)后面沒有異步了,就是這個(gè)rpc調(diào)用,判斷
   shouldStop = r.replicateTo(s, lastLogIdx)

replicateTo 就是rpc調(diào)研,真正遠(yuǎn)程rpc給follower,等待響應(yīng)。對(duì)于響應(yīng)的結(jié)果怎么處理,怎么真正應(yīng)用到本地,還沒有分析,帶下一篇提交篇,因?yàn)椴迦胝?qǐng)求還wait在哪里呢是不是。

總結(jié)

寫著寫著文章又很長(zhǎng)了,如果你讀到了這里,就給我點(diǎn)個(gè)贊,關(guān)注下,我會(huì)馬不停蹄的開始下一篇。

文本注意從consul leader server接受請(qǐng)求,做一些檢查,token校驗(yàn),分配發(fā)送,然后異步交給了leader的核心goroutine,核心go routine通過分組合并,計(jì)算好log 編號(hào)和任期term。就交給了replication routine,replication routine 把log 先本地持久化,然后異步發(fā)給所有的follower,等待他們的結(jié)果,到底是commit 應(yīng)用到本地狀態(tài)機(jī)怎么實(shí)現(xiàn)的,下面一篇見,歡迎關(guān)注和轉(zhuǎn)發(fā)。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容