Consul 的一致性讀分析

背景

Consul 作為HashiCorp 出品的分布式注冊中心和配置中心,是cp模型的,即強調(diào)一致性,通過raft協(xié)議實現(xiàn)

一致性

consul 一致性支持三種模式,即要強一致還是,最終一致, 可以交個用戶選擇,這才是一個優(yōu)秀的分布式系統(tǒng)應該具備的,要了解一致性讀,需要先了解consul的三種一致性模式,如下:

  • Default
    用consul 沒有做任何改動的話,大部分都是這個模式工作的,default模式consul考慮了讀的一致性還是很高的,讀寫都是通過leader來處理的,只是一種情況出現(xiàn)腦裂時,可能存在2個leader,在服務,但是老的leader肯定是不能寫的,但是有可能服務讀,讀到過期的數(shù)據(jù),但也不是一直會這樣,leader有個租約,租約到期這個leader就下線了。
  • Consistent
    consistent 即強一致讀,如果是這個模式,consul 每次讀請求都要向集群的超過半數(shù)的server檢查他是不是leader,就比defalut 模式多一次rtt開銷,因為即使你是leader,還要請求server確認是否存在其他的leader,這樣肯定不會讀到過時的數(shù)據(jù)。

  • Stale
    stale是吞吐量最高的模式,但也是一致性最差的模式,所以一致性和吞吐量是矛盾的,因為stale 模式下,consul 集群的任何節(jié)點都能服務讀請求,意外著即使集群沒有l(wèi)eader,還是可以對外提供讀請求。

我們了解了consul支持三種一致性模式,你是不是很好奇,consul是怎么實現(xiàn)的呢,我們平時部署一個consul集群也沒有讓我指定是那一種啥,consul既然是交給用戶來選擇,所以consul通過api的參數(shù)來確定,需要用那種讀一致性。

在哪里指定一致性級別

有聰明的同學就會問,說了這么多,我到底在哪里指定這個一致性級別,別急,下面就開始說

consul 通過http 接口提供服務,就在http的api里可以指定,客戶端sdk就不說了,有很多版本,這里只說consul agent端,因為線上一般都是直接請求localhost:8500 訪問本地的consul agent的。下面是所有consul agent http接口都要執(zhí)行的一個邏輯parseConsistency,就是解析一致性

func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b structs.QueryOptionsCompat) bool {
    query := req.URL.Query()
    //這里默認就認為是default模式。
    defaults := true
    //解析http請求如果帶了stale參數(shù),則是允許讀過期的數(shù)據(jù),那就server不用轉(zhuǎn)發(fā)給leader
    if _, ok := query["stale"]; ok {
        b.SetAllowStale(true)
        defaults = false
    }
    //解析http請求如果帶了consistent參數(shù),代表要讀最新的數(shù)據(jù)。
    if _, ok := query["consistent"]; ok {
        b.SetRequireConsistent(true)
        defaults = false
    }
    //解析http請求如果帶了consistent參數(shù),代表要從leader讀。
    if _, ok := query["leader"]; ok {
        defaults = false
    }
    //解析http請求如果帶了cached參數(shù),代表可以從agent讀,不需要請求server
    if _, ok := query["cached"]; ok {
        b.SetUseCache(true)
        defaults = false
    }
    if maxStale := query.Get("max_stale"); maxStale != "" {
        dur, err := time.ParseDuration(maxStale)
        if err != nil {
            resp.WriteHeader(http.StatusBadRequest)
            fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
            return true
        }
        b.SetMaxStaleDuration(dur)
        if dur.Nanoseconds() > 0 {
            b.SetAllowStale(true)
            defaults = false
        }
    }
...

上面解析了客戶端的讀模式,下面看怎么用的,隨便看一個consul讀的代碼,比如查看健康的service node 的一段代碼:

//如果可以用cache的數(shù)據(jù),則直接從當前agent響應。
    if args.QueryOptions.UseCache {
        raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
        if err != nil {
            return nil, err
        }
        defer setCacheMeta(resp, &m)
        reply, ok := raw.(*structs.IndexedCheckServiceNodes)
        if !ok {
            // This should never happen, but we want to protect against panics
            return nil, fmt.Errorf("internal error: response type not correct")
        }
        out = *reply
    } else {
    //否則需要通過rpc請求server節(jié)點。  
    RETRY_ONCE:
        if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
            return nil, err
        }
        if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
            args.AllowStale = false
            args.MaxStaleDuration = 0
            goto RETRY_ONCE
        }
    }

我們只有指定了cache參數(shù),consul 才會從agent 本地直接響應數(shù)據(jù),這里也可以看出,agent 是會緩存數(shù)據(jù)的,否則就需要請求server節(jié)點,這個時候問題又來了,server節(jié)點一般我們是一個集群,最少3個節(jié)點,那請求那一個呢,有負載均衡嗎,帶著這個問題,我們看下代碼,怎么選server的, 代碼如下:

// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server.  If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list.  If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list.  If there are no
// servers available, return nil.
func (m *Manager) FindServer() *metadata.Server {
    l := m.getServerList()
    numServers := len(l.servers)
    if numServers == 0 {
        m.logger.Warn("No servers available")
        return nil
    }

    // Return whatever is at the front of the list because it is
    // assumed to be the oldest in the server list (unless -
    // hypothetically - the server list was rotated right after a
    // server was added).
    return l.servers[0]
}

consul 這里是不是處理的很簡單,每次都是取第一個,人家注釋也說了,如果這個出現(xiàn)失敗了,會移到最后。

Consul Server的邏輯

consul agent 發(fā)現(xiàn)不用本地cache的數(shù)據(jù),那就要rpc請求server節(jié)點,server節(jié)點接受到任何請求,都會執(zhí)行forward方法,來檢查是否要轉(zhuǎn)發(fā)請求還是就自己響應數(shù)據(jù)。

func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
    var firstCheck time.Time

    // Handle DC forwarding
    // 檢查dc是否一致,不一致就要轉(zhuǎn)發(fā)到正確的dc
    dc := info.RequestDatacenter()
    if dc != s.config.Datacenter {
        // Local tokens only work within the current datacenter. Check to see
        // if we are attempting to forward one to a remote datacenter and strip
        // it, falling back on the anonymous token on the other end.
        if token := info.TokenSecret(); token != "" {
            done, ident, err := s.ResolveIdentityFromToken(token)
            if done {
                if err != nil && !acl.IsErrNotFound(err) {
                    return false, err
                }
                if ident != nil && ident.IsLocal() {
                    // Strip it from the request.
                    info.SetTokenSecret("")
                    defer info.SetTokenSecret(token)
                }
            }
        }

        err := s.forwardDC(method, dc, args, reply)
        return true, err
    }

    // Check if we can allow a stale read, ensure our local DB is initialized
    // 這里server開始檢查讀一致性,如果允許讀過期的數(shù)據(jù),則直接用當前server的數(shù)據(jù)。
    // 不需要后面的檢查是否為leader了。
    if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
        return false, nil
    }

CHECK_LEADER:
    // Fail fast if we are in the process of leaving
    select {
    case <-s.leaveCh:
        return true, structs.ErrNoLeader
    default:
    }

    // Find the leader
    // 到這里就是要default讀或者consistent讀,都需要從leader讀數(shù)據(jù)。
    isLeader, leader := s.getLeader()

    // Handle the case we are the leader
    // 如果當前是leader,則不需要再轉(zhuǎn)發(fā)到leader了。
    if isLeader {
        return false, nil
    }

    // Handle the case of a known leader
    // 不是leader,則需要再轉(zhuǎn)發(fā)到leader節(jié)點,多一次網(wǎng)絡請求。
    rpcErr := structs.ErrNoLeader
    if leader != nil {
        rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
            leader.Version, method, leader.UseTLS, args, reply)
        if rpcErr != nil && canRetry(info, rpcErr) {
            goto RETRY
        }
        return true, rpcErr
    }

有同學看到這里,不是還有consistent 模式?jīng)]有講嗎,這個就不在分析了, 不然文章太長了,沒有人看

總結

一寫就這么多,總算把consul的一致性讀的特性,怎么用的,和背后的原理給說明了,我們默認情況都是default模式,即請求都是需要通過訪問agent,agent再請求server,如果server不是leader,還要轉(zhuǎn)發(fā)到leader節(jié)點。要1次http,2次rpc才能獲取到數(shù)據(jù),所以如果有consul server壓力大的,可以通過cache來緩解server特別是leader的壓力。

最后編輯于
?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

  • 最近一直在做consul的壓測,在開啟acl的模式下,模擬consul client和springcloud co...
    老虎1029閱讀 927評論 0 0
  • 一致性分析 Raft算法解決的不僅僅是分布式集群環(huán)境下的一致性問題,還有一定限度內(nèi)的容錯性(半數(shù)節(jié)點工作正常則服務...
    黃靠譜閱讀 1,069評論 0 0
  • Consul簡介 注冊中心(不僅僅是注冊中心) 一致性協(xié)議采用 Raft 算法,用來保證服務的高可用和一致性,實際...
    黃靠譜閱讀 4,044評論 0 8
  • 本文主要對Consul中的幾個概念進行了歸納和梳理,為方便后續(xù)使用。 1. RPC(遠程過程調(diào)用) RPC是「服務...
    PlayerI閱讀 1,933評論 0 1
  • etcd 是線性一致性讀,而 zk 卻是順序一致性讀,再加上各種共識、強弱一致的名詞,看的時候總會混淆,這篇文檔就...
    徐亞松_v閱讀 5,362評論 2 10

友情鏈接更多精彩內(nèi)容