consul agent 分為 client 與 server 兩種模式
client 一般部署在靠近應用的地方,甚至本機,用於對應用的讀寫等請求(本主題只討論讀寫)做出轉發
server 參加 raft 選舉,只有 leader server 負責寫,一般也負責讀(除非開啟一致性模式的 stale)
入口 github.com/hashicorp/consul/main.go
入口
func main() {
os.exit(realMain())
}
真實入口
// realMain (excerpt) assembles the CLI from the command factories returned by
// command.Map and runs it. cli.Run yields an exit code and an error; how they
// are handled is elided ("...") in this excerpt.
func realMain() int {
	...
	// Command factories keyed by command name.
	cmds := command.Map(ui)
	cli := &cli.CLI{
		Args:         args,
		Commands:     cmds,
		Autocomplete: true,
		Name:         "consul",
		HelpFunc:     cli.FilteredHelpFunc(names, cli.BasicHelpFunc("consul")),
	}
	exitCode, err := cli.Run()
	...
}
命令加載
func Map(ui cli.Ui) map[string]cli.CommandFactory {
m := make(map[string]cli.CommandFactory)
for name, fn := range registry {
thisFn := fn
m[name] = func() (cli.Command, error) {
? return thisFn(ui)
}
}
return m
}
初始化時候加載命令
// init (excerpt) registers command factories. Shown here: the "agent"
// command, whose factory builds it with agent.New from the ui, the version
// values, and a fresh channel. Other registrations are elided ("...").
func init() {
	...
	Register("agent", func(ui cli.Ui) (cli.Command, error) {
		return agent.New(ui, rev, ver, verPre, verHuman, make(chan struct{})), nil
	})
	...
}
agent 命令運行
// Run (excerpt) is the agent command's entry point: it delegates to c.run and
// captures the status code it returns. The surrounding logic is elided.
func (c *cmd) Run(args []string) int {
	...
	code := c.run(args)
	...
}
agent 運行
// run (excerpt) builds an agent from config with agent.New and then starts it
// with agent.Start; the handling of both errors is elided.
// NOTE(review): the function's closing brace is not part of this excerpt.
func (c *cmd) run(args []string) int {
...
agent, err := agent.New(config)
...
if err := agent.Start(); err != nil {
...
}
agent 啟動
// Start (excerpt) picks the agent's delegate and brings up the HTTP servers.
// In server mode the delegate is the result of consul.NewServerLogger,
// otherwise the result of consul.NewClientLogger. It then obtains the HTTP
// servers from a.listenHTTP and calls a.serveHTTP on each, returning the
// first error.
// NOTE(review): braces do not balance in this excerpt; the HTTP setup most
// likely runs after the if/else rather than inside the else — confirm against
// the full source.
func (a *Agent) Start() error {
...
if c.ServerMode {
...
server, err := consul.NewServerLogger(consulCfg,a.logger,a.tokens)
...
// All RPCs issued by the agent go through this delegate.
a.delegate = server
...
}else {
...
client, err := consul.NewClientLogger(consulCfg,a.logger)
...
a.delegate = client
...
servers, err := a.listenHTTP()
...
for _, srv := range servers {
if err := a.serveHTTP(srv); err != nil {
return err
}
}
...
}
agent http 批量啟動
// serveHTTP (excerpt) starts serving one HTTP server by calling srv.Serve on
// the server's listener srv.ln. Error handling is elided.
func (a *Agent) serveHTTP(srv *HTTPServer) error {
...
err := srv.Serve(srv.ln)
...
}
agent 初始化 http 服務
// listenHTTP (excerpt) prepares the agent's HTTP servers. The visible step
// installs the handler built by srv.handler, passing a.config.EnableDebug, as
// the server's Handler. Listener creation is elided.
func (a *Agent) listenHTTP() ([]*HTTPServer, error) {
...
srv.Server.Handler = srv.handler(a.config.EnableDebug)
...
}
註冊 agent 路由
// handler (excerpt) builds the HTTP handler by iterating over the endpoints
// table and registering each pattern via handleFuncMetrics with the wrapped
// endpoint (s.wrap). How bound and methods are derived is elided.
// NOTE(review): the function's closing brace is not part of this excerpt.
func (s *HTTPServer) handler(enableDebug bool) http.Handler {
...
for pattern, fn := range endpoints {
...
handleFuncMetrics(pattern, s.wrap(bound, methods))
...
...
}
聚合agent所需路由
// init (excerpt) registers the agent's HTTP endpoints. Shown here: the KV
// endpoint "/v1/kv/", which accepts GET, PUT and DELETE and is served by
// (*HTTPServer).KVSEndpoint. Other registrations are elided ("...").
func init() {
	...
	registerEndpoint("/v1/kv/", []string{"GET", "PUT", "DELETE"}, (*HTTPServer).KVSEndpoint)
	...
}
agent 註冊 endpoints 方法
// registerEndpoint (excerpt) records fn in the endpoints table under pattern.
// What happens to methods is elided in this excerpt.
func registerEndpoint(pattern string, methods []string, fn unboundEndpoint) {
...
endpoints[pattern] = fn
...
}
agent kv 入口
// KVSEndpoint (excerpt) dispatches a KV HTTP request by method: GET goes to
// KVSGetKeys when keyList is set and to KVSGet otherwise, PUT goes to KVSPut,
// and DELETE goes to KVSDelete. Any other method yields a
// MethodNotAllowedError listing the allowed methods. The construction of args
// and keyList is elided.
func (s *HTTPServer) KVSEndpoint(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
...
switch req.Method {
case "GET":
if keyList {
return s.KVSGetKeys(resp, req, &args)
}
return s.KVSGet(resp, req, &args)
case "PUT":
return s.KVSPut(resp, req, &args)
case "DELETE":
return s.KVSDelete(resp, req, &args)
default:
return nil, MethodNotAllowedError{req.Method,[]string{"GET", "PUT", "DELETE"}}
}
...
}
agent 真實 kv get
// KVSGet (excerpt) serves a KV read by issuing an RPC through the agent
// (s.agent.RPC) and returning any RPC error to the caller. The choice of
// method and the construction of out are elided.
func (s *HTTPServer) KVSGet(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
...
if err := s.agent.RPC(method, &args, &out); err != nil {
return nil, err
}
...
}
agent 發起 rpc
func (a *Agent) RPC(method string, args interface{}, reply interface{}) error {
? ? ...
? ? return a.delegate.RPC(method, args, reply)
? ? ...
}
client初始化
// NewClientLogger (excerpt) constructs a *Client from config and logger.
// The entire body is elided in this excerpt.
func NewClientLogger(config *Config, logger *log.Logger) (*Client, error) {
...
...
}
agent 代理(client)發起 rpc
// RPC (excerpt) sends the request to a server through the client's connection
// pool, passing the configured datacenter plus the server's address, version
// and TLS flag. How server is chosen and how rpcErr is handled are elided.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
...
rpcErr := c.connPool.RPC(c.config.Datacenter,server.Addr,server.Version,method,server.UseTLS,args,reply)
...
}
client 發起 rpc
func (p *ConnPool) RPC(dc string, addr net.Addr, version int, method string, useTLS bool, args interface{}, reply interface{}) error? ? {
...
err = msgpackrpc.CallWithCodec(sc.codec,method,args,reply)
...
}
server 初始化
// NewServerLogger (excerpt) constructs a *Server. The visible steps set up
// the RPC layer (s.setupRPC), start the listener goroutine (s.listen), and
// set up Raft (s.setupRaft). If Raft setup fails, the server is shut down and
// a wrapped error is returned.
// NOTE(review): because of elision, `go s.listen` appears inside the
// setupRPC error branch here; it presumably runs after a successful setup —
// confirm against the full source.
func NewServerLogger(config *Config, logger *log.Logger, tokens *token.Store) (*Server, error) {
...
if err := s.setupRPC(tlsWrap); err != nil {
...
go s.listen(s.Listener)
...
}
...
if err := s.setupRaft(); err != nil {
s.Shutdown()
return nil, fmt.Errorf("Failed to start Raft: %v", err)
}
...
}
初始化 rpc 服務對應的 endpoints
// init (excerpt) registers RPC endpoint constructors. Shown here: a
// constructor that returns a KVS endpoint bound to the given server. Other
// registrations are elided.
func init(){
...
registerEndpoint(func(s *Server) interface{} { return &KVS{s} })
...
}
server 初始化rpc
// setupRPC (excerpt) opens a TCP listener on s.config.RPCAddr and stores it
// in s.Listener. Handling of err and the use of tlsWrap are elided.
func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
...
ln, err := net.ListenTCP("tcp", s.config.RPCAddr)
s.Listener = ln
...
}
server 啟動 rpc 服務
// listen (excerpt) hands each connection to s.handleConn on its own
// goroutine, with the isTLS flag set to false. The accept loop that produces
// conn is elided ("...").
func (s *Server) listen(listener net.Listener) {
	...
	go s.handleConn(conn, false)
	...
}
server 處理 rpc
// handleConn (excerpt) derives the RPC type from the first byte of buf and
// dispatches on it; pool.RPCConsul connections go to s.handleConsulConn.
// Reading buf and the other cases are elided.
// NOTE(review): the function's closing brace is not part of this excerpt.
func (s *Server) handleConn(conn net.Conn, isTLS bool) {
...
typ := pool.RPCType(buf[0])
...
switch typ {
case pool.RPCConsul:
s.handleConsulConn(conn)
...
}
server 處理 consulconn
// handleConsulConn (excerpt) serves a request on the connection through
// s.rpcServer.ServeRequest using rpcCodec. Creating the codec, any
// surrounding loop, and the error handling are elided.
func (s *Server) handleConsulConn(conn net.Conn) {
...
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
...
}
...
}
server kv get
// Get (excerpt) serves the KVS.Get RPC. It first gives k.srv.forward the
// chance to handle the request elsewhere; otherwise it answers from the local
// state store inside a blocking query, filling reply with the entry for
// args.Key (or no entries when the key is absent).
func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error {
	...
	// Per the original note: forward returns false when this server is the
	// leader; on a non-leader the result is obtained via RPC instead.
	if done, err := k.srv.forward("KVS.Get", args, args, reply); done {
		...
	}
	...
	return k.srv.blockingQuery(
		&args.QueryOptions,
		&reply.QueryMeta,
		func(ws memdb.WatchSet, state *state.Store) error {
			index, ent, err := state.KVSGet(ws, args.Key)
			...
			if ent == nil {
				// Missing key: report the index returned by the store, but
				// use 1 in place of 0.
				if index == 0 {
					reply.Index = 1
				} else {
					reply.Index = index
				}
				reply.Entries = nil
			} else {
				reply.Index = ent.ModifyIndex
				reply.Entries = structs.DirEntries{ent}
			}
			return nil
			...
}
server 阻塞查詢(blockingQuery)
// blockingQuery (excerpt) runs the query function fn against the state store.
// When the caller set RequireConsistent, it first calls s.consistentRead and
// aborts with that error on failure. Obtaining ws and state, and any blocking
// or retry logic, are elided ("...").
func (s *Server) blockingQuery(queryOpts *structs.QueryOptions, queryMeta *structs.QueryMeta, fn queryFn) error {
	...
	// Consistent reads were requested.
	if queryOpts.RequireConsistent {
		if err := s.consistentRead(); err != nil {
			return err
		}
		...
		err := fn(ws, state)
		...
}
agent http kv put 接口
// KVSPut (excerpt) serves a KV write by issuing the "KVS.Apply" RPC through
// the agent. Building applyReq and handling the result are elided ("...").
func (s *HTTPServer) KVSPut(resp http.ResponseWriter, req *http.Request, args *structs.KeyRequest) (interface{}, error) {
	...
	if err := s.agent.RPC("KVS.Apply", &applyReq, &out); err != nil {
		...
	}
	...
}
rpc kv put 接口
// Apply (excerpt) serves the KVS.Apply RPC. If k.srv.forward reports the
// request as done, its error is returned directly; otherwise the request is
// submitted to Raft through k.srv.raftApply. Handling of resp is elided.
func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error {
	...
	// Per the original note: a non-leader forwards the request.
	if done, err := k.srv.forward("KVS.Apply", args, args, reply); done {
		return err
	}
	...
	resp, err := k.srv.raftApply(structs.KVSRequestType, args)
	...
}
raft apply
// raftApply (excerpt) submits buf to Raft with s.raft.Apply, waits on the
// returned future, and returns either the future's error or its response.
// Encoding t and msg into buf is elided.
func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {
...
future := s.raft.Apply(buf, enqueueLimit)
// future.Error is called before the response is read.
if err := future.Error(); err != nil {
return nil, err
}
return future.Response(), nil
...
}
raft apply 真實入口
// Apply (excerpt) wraps cmd in a LogCommand log entry and tries to enqueue it
// on r.applyCh. It returns the log future once enqueued, or an error future
// if the timer fires first (ErrEnqueueTimeout) or Raft is shutting down
// (ErrRaftShutdown). The setup of timer from timeout is elided.
func (r *Raft) Apply(cmd []byte, timeout time.Duration) ApplyFuture {
...
logFuture := &logFuture{
log: Log{
Type: LogCommand,
Data: cmd,
},
}
logFuture.init()
select {
case <-timer:
return errorFuture{ErrEnqueueTimeout}
case <-r.shutdownCh:
return errorFuture{ErrRaftShutdown}
case r.applyCh <- logFuture:
return logFuture
}
...
}
leaderloop中apply此log
// leaderLoop (excerpt) shows two cases of the leader's main select; the
// enclosing loop and select are elided ("..."):
//   - a commit notification triggers r.processLogs;
//   - a new log from r.applyCh is batched with further pending logs and
//     passed to r.dispatchLogs.
func (r *Raft) leaderLoop() {
	...
	case <-r.leaderState.commitCh:
		...
		r.processLogs(idx, commitLog)
		...
	case newLog := <-r.applyCh:
		ready := []*logFuture{newLog}
		// Batching: drain up to MaxAppendEntries additional pending logs
		// without blocking.
	groupCommit:
		for i := 0; i < r.conf.MaxAppendEntries; i++ {
			select {
			case <-r.leaderState.commitCh:
				// NOTE(review): this case consumes a commit notification
				// without acting on it — confirm against the upstream source.
			case newLog := <-r.applyCh:
				ready = append(ready, newLog)
			default:
				// Nothing more is pending. A bare break would only leave the
				// select, so the label is needed to stop the loop.
				break groupCommit
			}
		}
		...
		r.dispatchLogs(ready)
	...
}
存儲日誌,通知日誌複製
// dispatchLogs (excerpt) adds each log future to the leader's inflight list,
// writes the logs through r.logs.StoreLogs, and then signals every follower
// replication state via its triggerCh. The loop that yields applyLog and
// builds logs, and the handling of a store error, are elided.
func (r *Raft) dispatchLogs(applyLogs []*logFuture) {
...
r.leaderState.inflight.PushBack(applyLog)
...
if err := r.logs.StoreLogs(logs); err != nil {
...
}
...
// Notify each follower's replication routine.
for _, f := range r.leaderState.replState {
asyncNotifyCh(f.triggerCh)
}
...
...
}
複製循環
// replicate (excerpt) is the per-follower replication loop. Each signal on
// s.triggerCh makes it replicate up to the current last log index via
// r.replicateTo, whose result decides whether the loop stops. Other select
// cases and setup are elided ("...").
func (r *Raft) replicate(s *followerReplication) {
	...
	for !shouldStop {
		select {
		case <-s.triggerCh:
			lastLogIdx, _ := r.getLastLog()
			shouldStop = r.replicateTo(s, lastLogIdx)
		}
	}
	...
}
啟動複製流程
// replicateTo (excerpt) sends an AppendEntries request to the follower via
// r.trans.AppendEntries and, when the response reports success, records the
// progress with updateLastAppended. Building req, handling a transport error,
// and the failure path are elided.
func (r *Raft) replicateTo(s *followerReplication, lastIndex uint64) (shouldStop bool) {
...
if err := r.trans.AppendEntries(s.peer.ID, s.peer.Address, &req, &resp); err != nil {
...
}
if resp.Success {
updateLastAppended(s,&req)
...
}
...}
剛添加日誌後更新狀態
// updateLastAppended (excerpt) records follower progress after an append
// request. If the request carried entries, the follower's nextIndex becomes
// the last entry's index plus one, and that index is reported to the
// commitment tracker for this peer.
func updateLastAppended(s *followerReplication, req *AppendEntriesRequest) {
...
if logs := req.Entries; len(logs) > 0 {
last := logs[len(logs)-1]
s.nextIndex = last.Index + 1
s.commitment.match(s.peer.ID, last.Index)
}
...
}
複製後檢查匹配的日誌索引
// match records that server has replicated up to matchIndex. The update is
// applied only if server has an entry in matchIndexes and matchIndex is
// greater than its stored value; recalculate is then called. The lock is held
// for the whole call.
func (c *commitment) match(server ServerID, matchIndex uint64) {
	c.Lock()
	defer c.Unlock()

	current, tracked := c.matchIndexes[server]
	if !tracked || matchIndex <= current {
		// Unknown server, or no forward progress: nothing to do.
		return
	}
	c.matchIndexes[server] = matchIndex
	c.recalculate()
}
檢查是否已經達到多數派
func (c *commitment) recalculate() {
? ...
? quorumMatchIndex := matched[(len(matched)-1)/2]
if quorumMatchIndex > c.commitIndex && quorumMatchIndex >= c.startIndex {
c.commitIndex = quorumMatchIndex
asyncNotifyCh(c.commitCh)
}
}
批量響應的入口
// processLogs (excerpt) passes the future's log entry, together with the
// future itself, to r.processLog. Iteration up to index and the handling of
// other entries are elided.
func (r *Raft) processLogs(index uint64, future *logFuture) {
...
r.processLog(&future.log, future)
...
}
響應客戶端的 key put
...
if future != nil {
future.respond(nil)
}
...