背景
Consul 作為HashiCorp 出品的分布式注冊中心和配置中心,是cp模型的,即強調(diào)一致性,通過raft協(xié)議實現(xiàn)
一致性
consul 一致性支持三種模式,即要強一致還是,最終一致, 可以交個用戶選擇,這才是一個優(yōu)秀的分布式系統(tǒng)應該具備的,要了解一致性讀,需要先了解consul的三種一致性模式,如下:
- Default
用consul 沒有做任何改動的話,大部分都是這個模式工作的,default模式consul考慮了讀的一致性還是很高的,讀寫都是通過leader來處理的,只是一種情況出現(xiàn)腦裂時,可能存在2個leader,在服務,但是老的leader肯定是不能寫的,但是有可能服務讀,讀到過期的數(shù)據(jù),但也不是一直會這樣,leader有個租約,租約到期這個leader就下線了。
Consistent
consistent 即強一致讀,如果是這個模式,consul 每次讀請求都要向集群的超過半數(shù)的server檢查他是不是leader,就比defalut 模式多一次rtt開銷,因為即使你是leader,還要請求server確認是否存在其他的leader,這樣肯定不會讀到過時的數(shù)據(jù)。Stale
stale是吞吐量最高的模式,但也是一致性最差的模式,所以一致性和吞吐量是矛盾的,因為stale 模式下,consul 集群的任何節(jié)點都能服務讀請求,意外著即使集群沒有l(wèi)eader,還是可以對外提供讀請求。
我們了解了consul支持三種一致性模式,你是不是很好奇,consul是怎么實現(xiàn)的呢,我們平時部署一個consul集群也沒有讓我指定是那一種啥,consul既然是交給用戶來選擇,所以consul通過api的參數(shù)來確定,需要用那種讀一致性。
在哪里指定一致性級別
有聰明的同學就會問,說了這么多,我到底在哪里指定這個一致性級別,別急,下面就開始說
consul 通過http 接口提供服務,就在http的api里可以指定,客戶端sdk就不說了,有很多版本,這里只說consul agent端,因為線上一般都是直接請求localhost:8500 訪問本地的consul agent的。下面是所有consul agent http接口都要執(zhí)行的一個邏輯parseConsistency,就是解析一致性
func (s *HTTPServer) parseConsistency(resp http.ResponseWriter, req *http.Request, b structs.QueryOptionsCompat) bool {
query := req.URL.Query()
//這里默認就認為是default模式。
defaults := true
//解析http請求如果帶了stale參數(shù),則是允許讀過期的數(shù)據(jù),那就server不用轉(zhuǎn)發(fā)給leader
if _, ok := query["stale"]; ok {
b.SetAllowStale(true)
defaults = false
}
//解析http請求如果帶了consistent參數(shù),代表要讀最新的數(shù)據(jù)。
if _, ok := query["consistent"]; ok {
b.SetRequireConsistent(true)
defaults = false
}
//解析http請求如果帶了consistent參數(shù),代表要從leader讀。
if _, ok := query["leader"]; ok {
defaults = false
}
//解析http請求如果帶了cached參數(shù),代表可以從agent讀,不需要請求server
if _, ok := query["cached"]; ok {
b.SetUseCache(true)
defaults = false
}
if maxStale := query.Get("max_stale"); maxStale != "" {
dur, err := time.ParseDuration(maxStale)
if err != nil {
resp.WriteHeader(http.StatusBadRequest)
fmt.Fprintf(resp, "Invalid max_stale value %q", maxStale)
return true
}
b.SetMaxStaleDuration(dur)
if dur.Nanoseconds() > 0 {
b.SetAllowStale(true)
defaults = false
}
}
...
上面解析了客戶端的讀模式,下面看怎么用的,隨便看一個consul讀的代碼,比如查看健康的service node 的一段代碼:
//如果可以用cache的數(shù)據(jù),則直接從當前agent響應。
if args.QueryOptions.UseCache {
raw, m, err := s.agent.cache.Get(cachetype.HealthServicesName, &args)
if err != nil {
return nil, err
}
defer setCacheMeta(resp, &m)
reply, ok := raw.(*structs.IndexedCheckServiceNodes)
if !ok {
// This should never happen, but we want to protect against panics
return nil, fmt.Errorf("internal error: response type not correct")
}
out = *reply
} else {
//否則需要通過rpc請求server節(jié)點。
RETRY_ONCE:
if err := s.agent.RPC("Health.ServiceNodes", &args, &out); err != nil {
return nil, err
}
if args.QueryOptions.AllowStale && args.MaxStaleDuration > 0 && args.MaxStaleDuration < out.LastContact {
args.AllowStale = false
args.MaxStaleDuration = 0
goto RETRY_ONCE
}
}
我們只有指定了cache參數(shù),consul 才會從agent 本地直接響應數(shù)據(jù),這里也可以看出,agent 是會緩存數(shù)據(jù)的,否則就需要請求server節(jié)點,這個時候問題又來了,server節(jié)點一般我們是一個集群,最少3個節(jié)點,那請求那一個呢,有負載均衡嗎,帶著這個問題,我們看下代碼,怎么選server的, 代碼如下:
// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on Serf to detect this and remove the node from the
// server list. If the server at the front of the list has failed or fails
// during an RPC call, it is rotated to the end of the list. If there are no
// servers available, return nil.
func (m *Manager) FindServer() *metadata.Server {
l := m.getServerList()
numServers := len(l.servers)
if numServers == 0 {
m.logger.Warn("No servers available")
return nil
}
// Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
return l.servers[0]
}
consul 這里是不是處理的很簡單,每次都是取第一個,人家注釋也說了,如果這個出現(xiàn)失敗了,會移到最后。
Consul Server的邏輯
consul agent 發(fā)現(xiàn)不用本地cache的數(shù)據(jù),那就要rpc請求server節(jié)點,server節(jié)點接受到任何請求,都會執(zhí)行forward方法,來檢查是否要轉(zhuǎn)發(fā)請求還是就自己響應數(shù)據(jù)。
func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) {
var firstCheck time.Time
// Handle DC forwarding
// 檢查dc是否一致,不一致就要轉(zhuǎn)發(fā)到正確的dc
dc := info.RequestDatacenter()
if dc != s.config.Datacenter {
// Local tokens only work within the current datacenter. Check to see
// if we are attempting to forward one to a remote datacenter and strip
// it, falling back on the anonymous token on the other end.
if token := info.TokenSecret(); token != "" {
done, ident, err := s.ResolveIdentityFromToken(token)
if done {
if err != nil && !acl.IsErrNotFound(err) {
return false, err
}
if ident != nil && ident.IsLocal() {
// Strip it from the request.
info.SetTokenSecret("")
defer info.SetTokenSecret(token)
}
}
}
err := s.forwardDC(method, dc, args, reply)
return true, err
}
// Check if we can allow a stale read, ensure our local DB is initialized
// 這里server開始檢查讀一致性,如果允許讀過期的數(shù)據(jù),則直接用當前server的數(shù)據(jù)。
// 不需要后面的檢查是否為leader了。
if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() {
return false, nil
}
CHECK_LEADER:
// Fail fast if we are in the process of leaving
select {
case <-s.leaveCh:
return true, structs.ErrNoLeader
default:
}
// Find the leader
// 到這里就是要default讀或者consistent讀,都需要從leader讀數(shù)據(jù)。
isLeader, leader := s.getLeader()
// Handle the case we are the leader
// 如果當前是leader,則不需要再轉(zhuǎn)發(fā)到leader了。
if isLeader {
return false, nil
}
// Handle the case of a known leader
// 不是leader,則需要再轉(zhuǎn)發(fā)到leader節(jié)點,多一次網(wǎng)絡請求。
rpcErr := structs.ErrNoLeader
if leader != nil {
rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr,
leader.Version, method, leader.UseTLS, args, reply)
if rpcErr != nil && canRetry(info, rpcErr) {
goto RETRY
}
return true, rpcErr
}
有同學看到這里,不是還有consistent 模式?jīng)]有講嗎,這個就不在分析了, 不然文章太長了,沒有人看
總結
一寫就這么多,總算把consul的一致性讀的特性,怎么用的,和背后的原理給說明了,我們默認情況都是default模式,即請求都是需要通過訪問agent,agent再請求server,如果server不是leader,還要轉(zhuǎn)發(fā)到leader節(jié)點。要1次http,2次rpc才能獲取到數(shù)據(jù),所以如果有consul server壓力大的,可以通過cache來緩解server特別是leader的壓力。