Etcd client v3 study notes

These notes were put together after reading the official Etcd client v3 source code, combined with my own understanding. Corrections are welcome wherever my understanding is inaccurate or incomplete.

Etcd version information

Etcd client implementation

Overview

  • Etcd client v3 is built on gRPC, and gRPC in turn is built on HTTP/2. The client therefore reuses the gRPC framework for address management, connection management, load balancing, and so on, while at the bottom it only needs to maintain a single HTTP/2 connection to each Etcd server.
  • Etcd client v3 wraps a gRPC ClientConn, and implements a Resolver and a Balancer to manage and interact with it.
  • Etcd client v3 implements gRPC's Resolver interface for Etcd server address management. When the client is initialized or the server cluster addresses change (periodic refresh can be configured), the Resolver resolves the new connection addresses and notifies the gRPC ClientConn to react to the change.
  • Etcd client v3 implements gRPC's Balancer interface for connection lifecycle management and load balancing. When the ClientConn or the SubConns it owns change state, the Balancer's interface is called to notify it. The Balancer produces a snapshot view, the Picker, for the ClientConn to use when selecting an available SubConn.
  • The underlying SubConn in turn wraps an HTTP/2 connection, the ClientTransport. The ClientTransport keeps a heartbeat with the server; when the connection drops it notifies upward, so that the ClientConn tells the Balancer to rebuild the connection.
  • In short, the Balancer and Resolver are like the ClientConn's hands, with their concrete implementations supplied by the user, while the SubConn is the ClientConn's internal organ, the foundation on which its functionality is built.
Etcd client v3 structure diagram

Source code

Resolver

Code path: https://github.com/grpc/grpc-go/blob/v1.26.0/resolver/resolver.go

  1. The Resolver interface defines the methods the Resolver exposes to the ClientConn
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
    // ResolveNow will be called by gRPC to try to resolve the target name
    // again. It's just a hint, resolver can ignore this if it's not necessary.
    //
    // It could be called multiple times concurrently.
    ResolveNow(ResolveNowOptions)
    // Close closes the resolver.
    Close()
}
  • ResolveNow: called by the ClientConn to re-resolve the target, for example before creating new connections; it is only a hint
  • Close: shuts down the Resolver and releases resources such as its goroutines
  2. The ClientConn interface defines the methods the ClientConn exposes to the Resolver
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
    // UpdateState updates the state of the ClientConn appropriately.
    UpdateState(State)
    // ReportError notifies the ClientConn that the Resolver encountered an
    // error.  The ClientConn will notify the load balancer and begin calling
    // ResolveNow on the Resolver with exponential backoff.
    ReportError(error)
    // NewAddress is called by resolver to notify ClientConn a new list
    // of resolved addresses.
    // The address list should be the complete list of resolved addresses.
    //
    // Deprecated: Use UpdateState instead.
    NewAddress(addresses []Address)
    // NewServiceConfig is called by resolver to notify ClientConn a new
    // service config. The service config should be provided as a json string.
    //
    // Deprecated: Use UpdateState instead.
    NewServiceConfig(serviceConfig string)
    // ParseServiceConfig parses the provided service config and returns an
    // object that provides the parsed config.
    ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
  • UpdateState: when the Resolver's state changes (i.e. re-resolution produced new Addresses), it notifies the ClientConn to handle the update
  • ReportError: when the Resolver hits an error, it notifies the ClientConn to handle it
  • NewAddress: (deprecated, use UpdateState instead)
  • NewServiceConfig: (deprecated, use UpdateState instead)
  • ParseServiceConfig: parses the service config JSON string

Summary: the Resolver provides a user-defined hook for resolving and updating addresses, so that users can implement their own address-resolution logic, service discovery, address updates, and so on. In Etcd client v3 it mainly 1. resolves addresses such as 127.0.0.1:2379 into the format the underlying connections use, and 2. updates the configured addresses when the cluster membership changes (a rough sketch of the contract follows).
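
As an illustration of that contract, here is a minimal sketch of a resolver that pushes a fixed address list into the ClientConn. The staticBuilder/staticResolver types and the "static" scheme are made up for illustration; this is not etcd's actual endpoint resolver:

import "google.golang.org/grpc/resolver"

// staticBuilder builds resolvers that always report the same address list.
type staticBuilder struct{ addrs []string }

func (b staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    addrs := make([]resolver.Address, 0, len(b.addrs))
    for _, a := range b.addrs {
        addrs = append(addrs, resolver.Address{Addr: a})
    }
    // Hand the resolved addresses to the ClientConn; gRPC forwards them to the Balancer.
    cc.UpdateState(resolver.State{Addresses: addrs})
    return &staticResolver{}, nil
}

func (b staticBuilder) Scheme() string { return "static" }

type staticResolver struct{}

// ResolveNow is only a hint; a fixed address list has nothing to re-resolve.
func (*staticResolver) ResolveNow(resolver.ResolveNowOptions) {}
func (*staticResolver) Close()                                {}

A real implementation, such as etcd's endpoint resolver, would also register the builder via resolver.Register and call UpdateState again whenever the endpoint list changes.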

Balancer

Code path: https://github.com/grpc/grpc-go/blob/v1.26.0/balancer/balancer.go

  1. The Balancer interface defines the methods the Balancer exposes to the ClientConn
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
    // HandleSubConnStateChange is called by gRPC when the connectivity state
    // of sc has changed.
    // Balancer is expected to aggregate all the state of SubConn and report
    // that back to gRPC.
    // Balancer should also generate and update Pickers when its internal state has
    // been changed by the new state.
    //
    // Deprecated: if V2Balancer is implemented by the Balancer,
    // UpdateSubConnState will be called instead.
    HandleSubConnStateChange(sc SubConn, state connectivity.State)
    // HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
    // balancers.
    // Balancer can create new SubConn or remove SubConn with the addresses.
    // An empty address slice and a non-nil error will be passed if the resolver returns
    // non-nil error to gRPC.
    //
    // Deprecated: if V2Balancer is implemented by the Balancer,
    // UpdateClientConnState will be called instead.
    HandleResolvedAddrs([]resolver.Address, error)
    // Close closes the balancer. The balancer is not required to call
    // ClientConn.RemoveSubConn for its existing SubConns.
    Close()
}
  • HandleSubConnStateChange: when the connectivity state of an underlying SubConn changes, the ClientConn calls this method to notify the Balancer so it can maintain its internal state
  • HandleResolvedAddrs: when the Resolver resolves new addresses and calls UpdateState, the ClientConn calls this method, which in turn creates new connections and so on
  • Close: shuts down the Balancer and releases resources such as its goroutines
  2. The ClientConn interface defines the methods the ClientConn exposes to the Balancer (note this is the ClientConn interface in balancer.go, which differs from the one in resolver.go, although both are ultimately implemented by the same struct)
// ClientConn represents a gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
    // NewSubConn is called by balancer to create a new SubConn.
    // It doesn't block and wait for the connections to be established.
    // Behaviors of the SubConn can be controlled by options.
    NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
    // RemoveSubConn removes the SubConn from ClientConn.
    // The SubConn will be shutdown.
    RemoveSubConn(SubConn)

    // UpdateBalancerState is called by balancer to notify gRPC that some internal
    // state in balancer has changed.
    //
    // gRPC will update the connectivity state of the ClientConn, and will call pick
    // on the new picker to pick new SubConn.
    //
    // Deprecated: use UpdateState instead
    UpdateBalancerState(s connectivity.State, p Picker)

    // UpdateState notifies gRPC that the balancer's internal state has
    // changed.
    //
    // gRPC will update the connectivity state of the ClientConn, and will call pick
    // on the new picker to pick new SubConns.
    UpdateState(State)

    // ResolveNow is called by balancer to notify gRPC to do a name resolving.
    ResolveNow(resolver.ResolveNowOptions)

    // Target returns the dial target for this ClientConn.
    //
    // Deprecated: Use the Target field in the BuildOptions instead.
    Target() string
}
  • NewSubConn: called by the Balancer to create an underlying SubConn; the connection process is non-blocking
  • RemoveSubConn: called by the Balancer to remove an existing SubConn
  • UpdateBalancerState: (deprecated, use UpdateState instead)
  • UpdateState: called by the Balancer to tell the ClientConn that its internal state has changed and that a new Picker has been produced
  • ResolveNow: called by the Balancer to tell the ClientConn to call the Resolver's ResolveNow for re-resolution
  • Target: (deprecated)
  3. The Picker interface defines the methods the Picker exposes to the ClientConn (the Picker is in fact a snapshot produced by the Balancer, containing the current state of the underlying SubConns; each time it picks an available SubConn for the ClientConn, ensuring availability and load balancing)
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
//
// Deprecated: use V2Picker instead
type Picker interface {
    // Pick returns the SubConn to be used to send the RPC.
    // The returned SubConn must be one returned by NewSubConn().
    //
    // This functions is expected to return:
    // - a SubConn that is known to be READY;
    // - ErrNoSubConnAvailable if no SubConn is available, but progress is being
    //   made (for example, some SubConn is in CONNECTING mode);
    // - other errors if no active connecting is happening (for example, all SubConn
    //   are in TRANSIENT_FAILURE mode).
    //
    // If a SubConn is returned:
    // - If it is READY, gRPC will send the RPC on it;
    // - If it is not ready, or becomes not ready after it's returned, gRPC will
    //   block until UpdateBalancerState() is called and will call pick on the
    //   new picker. The done function returned from Pick(), if not nil, will be
    //   called with nil error, no bytes sent and no bytes received.
    //
    // If the returned error is not nil:
    // - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
    // - If the error is ErrTransientFailure or implements IsTransientFailure()
    //   bool, returning true:
    //   - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
    //     is called to pick again;
    //   - Otherwise, RPC will fail with unavailable error.
    // - Else (error is other non-nil error):
    //   - The RPC will fail with the error's status code, or Unknown if it is
    //     not a status error.
    //
    // The returned done() function will be called once the rpc has finished,
    // with the final status of that RPC.  If the SubConn returned is not a
    // valid SubConn type, done may not be called.  done may be nil if balancer
    // doesn't care about the RPC status.
    Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
}
  • Pick: picks an available SubConn for the ClientConn
  4. The SubConn interface defines its behavior as a sub-connection. In gRPC one SubConn corresponds to multiple Addresses; gRPC tries to connect to them in order until the first connection succeeds. In practice, in Etcd client v3 one endpoint address corresponds to one SubConn.
    When the connection state is IDLE it does not connect until the Balancer explicitly calls Connect; when an error occurs while connecting it retries immediately. If the state falls back to IDLE it will not reconnect automatically unless Connect is called explicitly.
// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type SubConn interface {
    // UpdateAddresses updates the addresses used in this SubConn.
    // gRPC checks if currently-connected address is still in the new list.
    // If it's in the list, the connection will be kept.
    // If it's not in the list, the connection will gracefully closed, and
    // a new connection will be created.
    //
    // This will trigger a state transition for the SubConn.
    UpdateAddresses([]resolver.Address)
    // Connect starts the connecting for this SubConn.
    Connect()
}
  • UpdateAddresses: updates the SubConn's Address list; if the currently connected address is in the new list the connection is kept, otherwise it is gracefully closed and a new connection is created
  • Connect: starts connecting

Summary: the Balancer provides a user-defined structure for load balancing and internal state management. Etcd client v3 implements a round-robin load balancer (a rough sketch of the idea follows).
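
As a rough illustration against the (pre-v2) Picker interface shown above, here is a minimal round-robin picker. It is a sketch only, not etcd's actual balancer, and the rrPicker name is invented:

import (
    "context"
    "sync"

    "google.golang.org/grpc/balancer"
)

// rrPicker cycles through a fixed snapshot of READY SubConns.
type rrPicker struct {
    mu   sync.Mutex
    scs  []balancer.SubConn // snapshot built by the Balancer when its state changed
    next int
}

func (p *rrPicker) Pick(ctx context.Context, info balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
    p.mu.Lock()
    defer p.mu.Unlock()
    if len(p.scs) == 0 {
        // Tells gRPC to block the RPC until the Balancer publishes a new picker.
        return nil, nil, balancer.ErrNoSubConnAvailable
    }
    sc := p.scs[p.next]
    p.next = (p.next + 1) % len(p.scs)
    return sc, nil, nil
}

The Balancer would build a new picker like this from its READY SubConns on every state change and hand it to gRPC via the ClientConn's UpdateState.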

Etcd client creation flow

New

Creating a new Etcd client v3 object mainly consists of:

  1. Create the Client struct -> 2. Configure parameters -> 3. Configure the Resolver -> 4. Configure the Balancer -> 5. gRPC dial -> 6. Create sub-module structs -> 7. Start the address auto-sync goroutine -> 8. Return the Client struct

Core code

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/client.go

func newClient(cfg *Config) (*Client, error) {
    // Some preprocessing
    // ...

    // 1. Create the Client struct
    ctx, cancel := context.WithCancel(baseCtx)
    client := &Client{
        conn:     nil,
        cfg:      *cfg,
        creds:    creds,
        ctx:      ctx,
        cancel:   cancel,
        mu:       new(sync.RWMutex),
        callOpts: defaultCallOpts,
    }
    
    // 2. Configure parameters
    // ...

    // 3. Configure the Resolver
    // Etcd actually wraps the Resolver in a ResolverGroup, probably to support multiple clusters; usually there is only one cluster and hence only one Resolver
    // Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
    // to dial so the client knows to use this resolver.
    client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
    if err != nil {
        client.cancel()
        return nil, err
    }
    client.resolverGroup.SetEndpoints(cfg.Endpoints)

    if len(cfg.Endpoints) < 1 {
        return nil, fmt.Errorf("at least one Endpoint must is required in client config")
    }
    dialEndpoint := cfg.Endpoints[0]

    // 4. Configure the Balancer / 5. gRPC dial
    // A Balancer named roundRobinBalancerName is registered in the Etcd client v3 package, so it is enough to call grpc.WithBalancerName here; gRPC then uses the corresponding Builder to create the already-configured Balancer.
    // dialWithBalancer eventually calls grpc.DialContext, which creates the ClientConn and the underlying SubConns
    // Use a provided endpoint target so that for https:// without any tls config given, then
    // grpc will assume the certificate server name is the endpoint host.
    conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
    if err != nil {
        client.cancel()
        client.resolverGroup.Close()
        return nil, err
    }
    // TODO: With the old grpc balancer interface, we waited until the dial timeout
    // for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
    client.conn = conn

    // 6. Create sub-module structs
    // Wrap the sub-modules and create their structs
    client.Cluster = NewCluster(client)
    client.KV = NewKV(client)
    client.Lease = NewLease(client)
    client.Watcher = NewWatcher(client)
    client.Auth = NewAuth(client)
    client.Maintenance = NewMaintenance(client)

    if cfg.RejectOldCluster {
        if err := client.checkVersion(); err != nil {
            client.Close()
            return nil, err
        }
    }

    // 7. Start the address auto-sync goroutine
    // If AutoSyncInterval is configured, this goroutine periodically pulls the current cluster member list from the Etcd servers and updates the addresses
    // If it is not configured, the goroutine exits immediately
    go client.autoSync()
    
    // 8. Return the Client struct
    return client, nil
}

Etcd client call flow

From the Client struct we can see that Client composes a number of functional modules (a minimal usage sketch follows the module list below)

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/client.go

// Client provides and manages an etcd v3 client session.
type Client struct {
    Cluster
    KV
    Lease
    Watcher
    Auth
    Maintenance
    
    // other fields
    // ...
}
  • Cluster: cluster module, mainly concerned with the cluster servers; members can be added, removed, updated, and listed via RPC requests. Not covered in depth here.
  • KV: key-value module, the core create/read/update/delete operations on keys; the most commonly used module.
  • Lease: lease module, which manages auto-renewed sessions. Key-value records written under such a session are kept by the server until the session ends; when the session ends normally or renewal lapses, the records are removed by the server automatically. Many features such as service discovery and distributed locks can be built on this module.
  • Watcher: watch module, used to watch for insert, update, and delete events on records.
  • Auth: authentication module, not covered in depth here.
  • Maintenance: maintenance module, not covered in depth here.
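
Before looking at the individual modules, here is a minimal usage sketch; the endpoint address and keys are placeholders, and the clientv3 import path may vary slightly between etcd releases:

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "go.etcd.io/etcd/clientv3"
)

func main() {
    // New runs the creation flow described above: Resolver, Balancer, gRPC dial, sub-modules.
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"127.0.0.1:2379"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    // The embedded KV module is used directly through the client.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()
    if _, err := cli.Put(ctx, "demo-key", "demo-value"); err != nil {
        log.Fatal(err)
    }
    resp, err := cli.Get(ctx, "demo-key")
    if err != nil {
        log.Fatal(err)
    }
    for _, kv := range resp.Kvs {
        fmt.Printf("%s = %s\n", kv.Key, kv.Value)
    }
}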

Get/Put/Delete

This functionality is implemented by the KV module; the core interface is KVClient, implemented by the kVClient struct

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/etcdserver/etcdserverpb/rpc.pb.go


// KVClient is the low-level support for the KV module
// KVClient is the client API for KV service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type KVClient interface {
    // Range gets the keys in the range from the key-value store.
    Range(ctx context.Context, in *RangeRequest, opts ...grpc.CallOption) (*RangeResponse, error)
    // Put puts the given key into the key-value store.
    // A put request increments the revision of the key-value store
    // and generates one event in the event history.
    Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error)
    // DeleteRange deletes the given range from the key-value store.
    // A delete request increments the revision of the key-value store
    // and generates a delete event in the event history for every deleted key.
    DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error)
    // Txn processes multiple requests in a single transaction.
    // A txn request increments the revision of the key-value store
    // and generates events with the same revision for every completed request.
    // It is not allowed to modify the same key several times within one txn.
    Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error)
    // Compact compacts the event history in the etcd key-value store. The key-value
    // store should be periodically compacted or the event history will continue to grow
    // indefinitely.
    Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
}

// the kVClient struct implements the KVClient interface
type kVClient struct {
    // associated with the ClientConn
    cc *grpc.ClientConn
}

// Taking Put as an example, it ultimately calls the ClientConn's Invoke method
func (c *kVClient) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error) {
    out := new(PutResponse)
    err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

// ...

As we can see, the call into gRPC ultimately goes through the ClientConn's Invoke method, which in turn calls the following function:

Code path: https://github.com/grpc/grpc-go/blob/v1.26.0/call.go

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    // Create a ClientStream; this step obtains an underlying Transport and creates a Stream on top of it as the communication stream for this request
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    // Send the request data on the stream
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    // Receive the reply data on the stream
    return cs.RecvMsg(reply)
}

Note: methods such as Get/Put/Delete are synchronous and blocking, and if a network error occurs during the request they return the error directly without retrying automatically
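
Because there is no built-in retry, the caller typically sets a per-call timeout and, if its semantics allow, retries explicitly. A rough sketch, assuming cli is an existing *clientv3.Client and the retry policy is the caller's own choice:

// putWithRetry retries a Put a few times on the caller's side.
func putWithRetry(cli *clientv3.Client, key, val string) error {
    var err error
    for i := 0; i < 3; i++ {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        _, err = cli.Put(ctx, key, val)
        cancel()
        if err == nil {
            return nil
        }
        // The network error is returned as-is; back off briefly before trying again.
        time.Sleep(200 * time.Millisecond)
    }
    return err
}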

Watch

This functionality is implemented by the Watcher module; the main implementing struct is watcher

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/watch.go

type Watcher interface {
    // Watch watches on a key or prefix. The watched events will be returned
    // through the returned channel. If revisions waiting to be sent over the
    // watch are compacted, then the watch will be canceled by the server, the
    // client will post a compacted error watch response, and the channel will close.
    // If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
    // and "WatchResponse" from this closed channel has zero events and nil "Err()".
    // The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
    // to release the associated resources.
    //
    // If the context is "context.Background/TODO", returned "WatchChan" will
    // not be closed and block until event is triggered, except when server
    // returns a non-recoverable error (e.g. ErrCompacted).
    // For example, when context passed with "WithRequireLeader" and the
    // connected server has no leader (e.g. due to network partition),
    // error "etcdserver: no leader" (ErrNoLeader) will be returned,
    // and then "WatchChan" is closed with non-nil "Err()".
    // In order to prevent a watch stream being stuck in a partitioned node,
    // make sure to wrap context with "WithRequireLeader".
    //
    // Otherwise, as long as the context has not been canceled or timed out,
    // watch will retry on other recoverable errors forever until reconnected.
    //
    // TODO: explicitly set context error in the last "WatchResponse" message and close channel?
    // Currently, client contexts are overwritten with "valCtx" that never closes.
    // TODO(v3.4): configure watch retry policy, limit maximum retry number
    // (see https://github.com/etcd-io/etcd/issues/8980)
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

    // RequestProgress requests a progress notify response be sent in all watch channels.
    RequestProgress(ctx context.Context) error

    // Close closes the watcher and cancels all watch requests.
    Close() error
}

// watcher implements the Watcher interface
type watcher struct {
    remote   pb.WatchClient
    callOpts []grpc.CallOption

    // mu protects the grpc streams map
    mu sync.RWMutex

    // streams holds all the active grpc streams keyed by ctx value.
    streams map[string]*watchGrpcStream
}

As we can see, the watcher struct contains a streams map[string]*watchGrpcStream that holds all active watchGrpcStreams. A watchGrpcStream represents one communication stream established over gRPC, and each watchGrpcStream in turn carries a number of watcherStreams in its substreams map. Every Watch call creates a watcherStream, which is kept alive until it is cancelled explicitly or an unrecoverable internal error occurs; for recoverable errors such as a dropped network connection, the client tries to rebuild the watcherStream.
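
A minimal usage sketch, assuming cli is an existing *clientv3.Client (the watched prefix is a placeholder):

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // cancelling the context releases the watcher resources

// One Watch call corresponds to one watcherStream carried on a watchGrpcStream.
rch := cli.Watch(ctx, "jobs/", clientv3.WithPrefix())
for wresp := range rch {
    if err := wresp.Err(); err != nil {
        // e.g. compaction: the server cancelled the watch and the channel will close
        log.Println("watch error:", err)
        break
    }
    for _, ev := range wresp.Events {
        fmt.Printf("%s %q -> %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
    }
}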

Now look at the Watch method itself; it has three main stages:

  • 1 Preparation: configure options and build the watchRequest struct
  • 2 Obtain a watchGrpcStream: reuse an existing one or create a new one
  • 3 Submission: submit the watchRequest to the watchGrpcStream's reqc channel, then wait on the request's return channel retc for the WatchChan

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/watch.go

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // 1. Preparation: configure options and build the watchRequest struct
    ow := opWatch(key, opts...)

    var filters []pb.WatchCreateRequest_FilterType
    if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
    }
    if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
    }

    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        fragment:       ow.fragment,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }

    ok := false
    ctxKey := streamKeyFromCtx(ctx)

    // 2. Reuse or create a watchGrpcStream
    // find or allocate appropriate grpc watch stream
    w.mu.Lock()
    if w.streams == nil {
        // closed
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
    }
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
    }
    // Grab the watchGrpcStream's reqc and donec channels, used respectively to submit requests to it and to be signalled when it shuts down
    donec := wgs.donec
    reqc := wgs.reqc
    w.mu.Unlock()

    // couldn't create channel; return closed channel
    closeCh := make(chan WatchResponse, 1)

    // 3. Submit the request to the watchGrpcStream's reqc, then wait on the request's return channel retc for the WatchChan
    // submit request
    select {
    case reqc <- wr:
        ok = true
    case <-wr.ctx.Done():
    case <-donec:
        if wgs.closeErr != nil {
            closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
            break
        }
        // retry; may have dropped stream from no ctxs
        return w.Watch(ctx, key, opts...)
    }

    // receive channel
    if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
                break
            }
            // retry; may have dropped stream from no ctxs
            return w.Watch(ctx, key, opts...)
        }
    }

    close(closeCh)
    return closeCh
}

The line worth paying attention to is wgs = w.newWatcherGrpcStream(ctx).
This method creates a watchGrpcStream and then starts a goroutine to execute its run method, which is where most of the logic lives.
run mainly does the following:

  • 1 Some preparation work
  • 2 Create a gRPC stream, the watch client, which carries the substreams created by each Watch call
  • 3 Enter a large wait loop
    • 3.1 Wait for new watch requests. Whenever a new watch arrives, create a substream, start a goroutine to actually receive the WatchResponses returned on that substream, and finally send the request out on the watch client
    • 3.2 When a response event arrives from the watch client, dispatch it to the corresponding substream
    • 3.3 If the watch client hits a communication error (the network may be unreachable, or the server may be down), try to obtain a new watch client and resend the requests; this keeps watches from being interrupted by network jitter and the like
    • 3.4 Handle errors and shutdown

There are of course many more details here that we will not go into.

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/watch.go

func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
    ctx, cancel := context.WithCancel(&valCtx{inctx})
    wgs := &watchGrpcStream{
        owner:      w,
        remote:     w.remote,
        callOpts:   w.callOpts,
        ctx:        ctx,
        ctxKey:     streamKeyFromCtx(inctx),
        cancel:     cancel,
        substreams: make(map[int64]*watcherStream),
        respc:      make(chan *pb.WatchResponse),
        reqc:       make(chan watchStreamRequest),
        donec:      make(chan struct{}),
        errc:       make(chan error, 1),
        closingc:   make(chan *watcherStream),
        resumec:    make(chan struct{}),
    }
    go wgs.run()
    return wgs
}

// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
    
    // 1. Some preparation work
    // ...

    // 2. Create a gRPC stream, the watch client, which carries the substreams created by each Watch call
    // start a stream with the etcd grpc server
    if wc, closeErr = w.newWatchClient(); closeErr != nil {
        return
    }

    cancelSet := make(map[int64]struct{})

    // 3. Enter a large wait loop
    var cur *pb.WatchResponse
    for {
        select {
        // 3.1 Wait for new watch requests
        // Whenever a new watch arrives, create a substream, start a goroutine to actually receive the WatchResponses returned on that substream, and finally send the request out on the watch client
        // Watch() requested
        case req := <-w.reqc:
            switch wreq := req.(type) {
            case *watchRequest:
                outc := make(chan WatchResponse, 1)
                // TODO: pass custom watch ID?
                ws := &watcherStream{
                    initReq: *wreq,
                    id:      -1,
                    outc:    outc,
                    // unbuffered so resumes won't cause repeat events
                    recvc: make(chan *WatchResponse),
                }

                ws.donec = make(chan struct{})
                w.wg.Add(1)
                // This starts a goroutine to actually handle the WatchResponses returned on this substream
                go w.serveSubstream(ws, w.resumec)

                // queue up for watcher creation/resume
                w.resuming = append(w.resuming, ws)
                if len(w.resuming) == 1 {
                    // head of resume queue, can register a new watcher
                    wc.Send(ws.initReq.toPB())
                }
            case *progressRequest:
                wc.Send(wreq.toPB())
            }

        // 3.2 A response event arrives from the watch client
        // Dispatch it to the corresponding substream
        // new events from the watch client
        case pbresp := <-w.respc:
            if cur == nil || pbresp.Created || pbresp.Canceled {
                cur = pbresp
            } else if cur != nil && cur.WatchId == pbresp.WatchId {
                // merge new events
                cur.Events = append(cur.Events, pbresp.Events...)
                // update "Fragment" field; last response with "Fragment" == false
                cur.Fragment = pbresp.Fragment
            }

            switch {
            case pbresp.Created:
                // response to head of queue creation
                if ws := w.resuming[0]; ws != nil {
                    w.addSubstream(pbresp, ws)
                    w.dispatchEvent(pbresp)
                    w.resuming[0] = nil
                }

                if ws := w.nextResume(); ws != nil {
                    wc.Send(ws.initReq.toPB())
                }

                // reset for next iteration
                cur = nil

            case pbresp.Canceled && pbresp.CompactRevision == 0:
                delete(cancelSet, pbresp.WatchId)
                if ws, ok := w.substreams[pbresp.WatchId]; ok {
                    // signal to stream goroutine to update closingc
                    close(ws.recvc)
                    closing[ws] = struct{}{}
                }

                // reset for next iteration
                cur = nil

            case cur.Fragment:
                // watch response events are still fragmented
                // continue to fetch next fragmented event arrival
                continue

            default:
                // dispatch to appropriate watch stream
                ok := w.dispatchEvent(cur)

                // reset for next iteration
                cur = nil

                if ok {
                    break
                }

                // watch response on unexpected watch id; cancel id
                if _, ok := cancelSet[pbresp.WatchId]; ok {
                    break
                }

                cancelSet[pbresp.WatchId] = struct{}{}
                cr := &pb.WatchRequest_CancelRequest{
                    CancelRequest: &pb.WatchCancelRequest{
                        WatchId: pbresp.WatchId,
                    },
                }
                req := &pb.WatchRequest{RequestUnion: cr}
                wc.Send(req)
            }

        // 3.3 If the watch client hits a communication error (the network may be unreachable, or the server may be down), try to obtain a new watch client and resend the requests; this keeps watches from being interrupted by network jitter and the like
        // watch client failed on Recv; spawn another if possible
        case err := <-w.errc:
            if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
                closeErr = err
                return
            }
            if wc, closeErr = w.newWatchClient(); closeErr != nil {
                return
            }
            if ws := w.nextResume(); ws != nil {
                wc.Send(ws.initReq.toPB())
            }
            cancelSet = make(map[int64]struct{})

        // 3.4 異?;蚪Y(jié)束時的處理
        case <-w.ctx.Done():
            return

        case ws := <-w.closingc:
            w.closeSubstream(ws)
            delete(closing, ws)
            // no more watchers on this stream, shutdown
            if len(w.substreams)+len(w.resuming) == 0 {
                return
            }
        }
    }
}

Lease

This functionality is implemented by the Lease module; the main implementing struct is lessor

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go

type Lease interface {
    // Grant creates a new lease.
    Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)

    // Revoke revokes the given lease.
    Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)

    // TimeToLive retrieves the lease information of the given lease ID.
    TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)

    // Leases retrieves all leases.
    Leases(ctx context.Context) (*LeaseLeasesResponse, error)

    // KeepAlive keeps the given lease alive forever. If the keepalive response
    // posted to the channel is not consumed immediately, the lease client will
    // continue sending keep alive requests to the etcd server at least every
    // second until latest response is consumed.
    //
    // The returned "LeaseKeepAliveResponse" channel closes if underlying keep
    // alive stream is interrupted in some way the client cannot handle itself;
    // given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
    // from this closed channel is nil.
    //
    // If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
    // no leader") or canceled by the caller (e.g. context.Canceled), the error
    // is returned. Otherwise, it retries.
    //
    // TODO(v4.0): post errors to last keep alive message before closing
    // (see https://github.com/coreos/etcd/pull/7866)
    KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)

    // KeepAliveOnce renews the lease once. The response corresponds to the
    // first message from calling KeepAlive. If the response has a recoverable
    // error, KeepAliveOnce will retry the RPC with a new keep alive message.
    //
    // In most of the cases, Keepalive should be used instead of KeepAliveOnce.
    KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)

    // Close releases all resources Lease keeps for efficient communication
    // with the etcd server.
    Close() error
}

type lessor struct {
    mu sync.Mutex // guards all fields

    // donec is closed and loopErr is set when recvKeepAliveLoop stops
    donec   chan struct{}
    loopErr error

    remote pb.LeaseClient

    stream       pb.Lease_LeaseKeepAliveClient
    streamCancel context.CancelFunc

    stopCtx    context.Context
    stopCancel context.CancelFunc

    keepAlives map[LeaseID]*keepAlive

    // firstKeepAliveTimeout is the timeout for the first keepalive request
    // before the actual TTL is known to the lease client
    firstKeepAliveTimeout time.Duration

    // firstKeepAliveOnce ensures stream starts after first KeepAlive call.
    firstKeepAliveOnce sync.Once

    callOpts []grpc.CallOption
}

Three methods are worth highlighting:

  • Grant: makes a single gRPC call asking the server to create a lease with the given TTL and returns the lease ID (the ID is later tracked in the lessor's keepAlives map by KeepAlive)
  • Revoke: makes a single gRPC call asking the server to destroy the lease with the given lease ID
  • KeepAlive: creates a stream plus two goroutines, one sending keepalives and one receiving responses, to keep the lease alive (a usage sketch follows this list)
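
A minimal usage sketch of these methods, assuming cli is an existing *clientv3.Client (key, value, and TTL are placeholders):

// Grant a 10-second lease, attach a key to it, then keep the lease alive.
grant, err := cli.Grant(context.TODO(), 10)
if err != nil {
    log.Fatal(err)
}
if _, err := cli.Put(context.TODO(), "services/node-1", "10.0.0.1:8080", clientv3.WithLease(grant.ID)); err != nil {
    log.Fatal(err)
}

kaCh, err := cli.KeepAlive(context.Background(), grant.ID)
if err != nil {
    log.Fatal(err)
}
go func() {
    // The responses must be consumed; the channel closes once the keepalive
    // stream cannot be recovered or the context is cancelled.
    for range kaCh {
    }
}()

// When the key is no longer needed, revoking the lease also deletes it:
// _, _ = cli.Revoke(context.TODO(), grant.ID)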

Let's look at Grant and Revoke first

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go

func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
    r := &pb.LeaseGrantRequest{TTL: ttl}
    resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
    if err == nil {
        gresp := &LeaseGrantResponse{
            ResponseHeader: resp.GetHeader(),
            ID:             LeaseID(resp.ID),
            TTL:            resp.TTL,
            Error:          resp.Error,
        }
        return gresp, nil
    }
    return nil, toErr(ctx, err)
}

func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
    r := &pb.LeaseRevokeRequest{ID: int64(id)}
    resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
    if err == nil {
        return (*LeaseRevokeResponse)(resp), nil
    }
    return nil, toErr(ctx, err)
}

The underlying calls are similar to those of Get/Put/Delete

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/etcdserver/etcdserverpb/rpc.pb.go

func (c *leaseClient) LeaseGrant(ctx context.Context, in *LeaseGrantRequest, opts ...grpc.CallOption) (*LeaseGrantResponse, error) {
    out := new(LeaseGrantResponse)
    err := c.cc.Invoke(ctx, "/etcdserverpb.Lease/LeaseGrant", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

func (c *leaseClient) LeaseRevoke(ctx context.Context, in *LeaseRevokeRequest, opts ...grpc.CallOption) (*LeaseRevokeResponse, error) {
    out := new(LeaseRevokeResponse)
    err := c.cc.Invoke(ctx, "/etcdserverpb.Lease/LeaseRevoke", in, out, opts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

Now let's focus on the KeepAlive implementation:

  1. Preparation
  2. Create or reuse the keepAlive struct for the given LeaseID and associate the current ctx and ch with it
  3. Start the related goroutines. The most important line is go l.recvKeepAliveLoop(), which internally implements the periodic lease renewal against the server

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go

func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
    
    // 1. Preparation
    ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)

    l.mu.Lock()
    // ensure that recvKeepAliveLoop is still running
    select {
    case <-l.donec:
        err := l.loopErr
        l.mu.Unlock()
        close(ch)
        return ch, ErrKeepAliveHalted{Reason: err}
    default:
    }
    
    // 2. Create or reuse the keepAlive struct for the given LeaseID and associate the current ctx and ch with it
    ka, ok := l.keepAlives[id]
    if !ok {
        // create fresh keep alive
        ka = &keepAlive{
            chs:           []chan<- *LeaseKeepAliveResponse{ch},
            ctxs:          []context.Context{ctx},
            deadline:      time.Now().Add(l.firstKeepAliveTimeout),
            nextKeepAlive: time.Now(),
            donec:         make(chan struct{}),
        }
        l.keepAlives[id] = ka
    } else {
        // add channel and context to existing keep alive
        ka.ctxs = append(ka.ctxs, ctx)
        ka.chs = append(ka.chs, ch)
    }
    l.mu.Unlock()

    // 3. Start the related goroutines
    // The most important line is go l.recvKeepAliveLoop()
    // It internally implements the periodic lease renewal against the server
    go l.keepAliveCtxCloser(id, ctx, ka.donec)
    l.firstKeepAliveOnce.Do(func() {
        go l.recvKeepAliveLoop()
        go l.deadlineLoop()
    })

    return ch, nil
}

Next, the recvKeepAliveLoop method:

  • 1 Preparation work
  • 2 Enter a big loop that implements the renewal logic; its purpose is to allow retrying when a stream becomes invalid
    • 2.1 resetRecv creates a stream and internally starts a goroutine that keeps sending renewal requests
    • 2.2 Enter a small loop that keeps reading the server's responses from the stream created above
      • 2.2.1 stream.Recv() blocks until the server returns a message or an error occurs
      • 2.2.2 Forward the server's response to the channels associated with the keepAlive
    • 2.3 If the small loop exits, the stream has become invalid; wait 500ms and restart the big loop

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go

func (l *lessor) recvKeepAliveLoop() (gerr error) {
    // 1. Preparation work
    defer func() {
        l.mu.Lock()
        close(l.donec)
        l.loopErr = gerr
        for _, ka := range l.keepAlives {
            ka.close()
        }
        l.keepAlives = make(map[LeaseID]*keepAlive)
        l.mu.Unlock()
    }()

    // 2. Enter a big loop that implements the renewal logic; its purpose is to allow retrying when a stream becomes invalid
    for {
        // 2.1 resetRecv creates a stream and internally starts a goroutine that keeps sending renewal requests
        stream, err := l.resetRecv()
        if err != nil {
            if canceledByCaller(l.stopCtx, err) {
                return err
            }
        } else {
        // 2.2 Enter a small loop that keeps reading the server's responses from the stream created above
            for {
                // 2.2.1 stream.Recv() blocks until the server returns a message or an error occurs
                resp, err := stream.Recv()
                // error handling
                if err != nil {
                    if canceledByCaller(l.stopCtx, err) {
                        return err
                    }

                    if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
                        l.closeRequireLeader()
                    }
                    break
                }

                // 2.2.2 Forward the server's response to the channels associated with the keepAlive
                l.recvKeepAlive(resp)
            }
        }

        // 2.3 If the small loop exits, the stream has become invalid; wait 500ms and restart the big loop
        select {
        case <-time.After(retryConnWait):
            continue
        case <-l.stopCtx.Done():
            return l.stopCtx.Err()
        }
    }
}

Next, the resetRecv method:

  • 1 Call remote's LeaseKeepAlive method to create a gRPC stream
  • 2 Start a goroutine running sendKeepAliveLoop, which keeps sending renewal requests
  • 3 Return the stream

Inside sendKeepAliveLoop, a renewal pass runs roughly every 500ms

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go

// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
    // 1 Call remote's LeaseKeepAlive method to create a gRPC stream
    sctx, cancel := context.WithCancel(l.stopCtx)
    stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
    if err != nil {
        cancel()
        return nil, err
    }

    l.mu.Lock()
    defer l.mu.Unlock()
    if l.stream != nil && l.streamCancel != nil {
        l.streamCancel()
    }

    l.streamCancel = cancel
    l.stream = stream

    // 2 Start a goroutine that keeps sending renewal requests
    go l.sendKeepAliveLoop(stream)
    
    // 3 Return the stream
    return stream, nil
}

// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
    for {
        var tosend []LeaseID

        now := time.Now()
        l.mu.Lock()
        for id, ka := range l.keepAlives {
            if ka.nextKeepAlive.Before(now) {
                tosend = append(tosend, id)
            }
        }
        l.mu.Unlock()

        for _, id := range tosend {
            r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
            if err := stream.Send(r); err != nil {
                // TODO do something with this error?
                return
            }
        }

        select {
        case <-time.After(500 * time.Millisecond):
        case <-stream.Context().Done():
            return
        case <-l.donec:
            return
        case <-l.stopCtx.Done():
            return
        }
    }
}

Note: from the code above we can see that:

  • Regardless of the TTL set at Grant time, KeepAlive always sends renewal requests at roughly 500ms intervals; adding the 500ms retry wait in recvKeepAliveLoop's outer loop, under normal conditions the gap between two renewal requests is at most about 1s.
  • Unless a lease has been cancelled or has already expired, renewals are retried automatically through network jitter.

Session

The Session feature is really just a wrapper around Lease, as NewSession shows:
it simply calls Grant and KeepAlive and packages everything into the Session struct for convenient use.

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/concurrency/session.go

// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
    ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
    for _, opt := range opts {
        opt(ops)
    }

    id := ops.leaseID
    if id == v3.NoLease {
        resp, err := client.Grant(ops.ctx, int64(ops.ttl))
        if err != nil {
            return nil, err
        }
        id = v3.LeaseID(resp.ID)
    }

    ctx, cancel := context.WithCancel(ops.ctx)
    keepAlive, err := client.KeepAlive(ctx, id)
    if err != nil || keepAlive == nil {
        cancel()
        return nil, err
    }

    donec := make(chan struct{})
    s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}

    // keep the lease alive until client error or cancelled context
    go func() {
        defer close(donec)
        for range keepAlive {
            // eat messages until keep alive channel closes
        }
    }()

    return s, nil
}

Mutex

This module implements a distributed lock. It is in fact a client-side lock built on Session (that is, on Lease); beyond supporting Lease, the server does not need to provide anything extra. The general idea is:

  • Create a key under the lock's key prefix and keep renewing it through the Session
  • All keys under the prefix get distinct revisions in arrival order; the key with the smallest revision holds the lock
  • Holders of keys whose revision is not the smallest block until every key with a smaller revision has been deleted, at which point they acquire the lock (see the usage sketch below)
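
A minimal usage sketch with the clientv3/concurrency package, assuming cli is an existing *clientv3.Client (lock prefix and TTL are placeholders):

// The Session grants a lease and keeps it alive; the Mutex builds on top of it.
sess, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
if err != nil {
    log.Fatal(err)
}
defer sess.Close() // revokes the lease, which also deletes the lock key

m := concurrency.NewMutex(sess, "/locks/my-task")
if err := m.Lock(context.TODO()); err != nil {
    log.Fatal(err)
}
// ... critical section ...
if err := m.Unlock(context.TODO()); err != nil {
    log.Fatal(err)
}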

First, the Mutex struct:

  • s is the associated Session
  • pfx is the lock's key prefix
  • myKey is the key currently held
  • myRev is the revision of the key currently held
  • hdr is the Header of the latest response

Next, the Lock method:

  • 1 Register myKey under pfx. A transaction is used so that the following happens in a single gRPC request:
    • 1.1 Check whether myKey already exists
    • 1.2 If it does not exist, put it; if it exists, get it
    • 1.3 Run getOwner to read the pfx information
  • 2 Take the CreateRevision of the value from the put or get response as myRev
  • 3 If there is no key under pfx, or the smallest CreateRevision equals myRev, the lock has been acquired
  • 4 Otherwise, wait until every key under pfx with a revision smaller than myRev has been deleted before the lock is acquired

Note: implementing Lock this way has a few problems, arguably design bugs in Etcd client v3:

  • If a server-side network problem causes all the keys under pfx to expire, or those keys are deleted externally, every node competing for the lock will believe it has acquired it
  • If a client has acquired the lock but its key expires, it will not notice and will still believe it holds the lock, while at the same time another client acquires it

To work around this, two things have to be done by hand (a sketch follows this list):

  • 1 Watch whether the Session associated with the Mutex has expired, and give up the lock manually if it has
  • 2 Watch whether the current myKey has been deleted, and give up the lock manually if it has
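
A rough sketch of the first point, reusing sess and m from the sketch above: by selecting on sess.Done() the holder stops treating the work as protected once the lease is gone (the second point could be covered similarly by watching m.Key() with cli.Watch):

done := make(chan struct{})
go func() {
    defer close(done)
    // ... do the work protected by the lock ...
}()

select {
case <-sess.Done():
    // The lease expired or was revoked: the lock key under pfx is gone and another
    // client may already hold the lock, so give up instead of assuming ownership.
case <-done:
    // Work finished while the session was still alive; release the lock normally.
    _ = m.Unlock(context.TODO())
}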

Finally, Unlock is straightforward: it simply deletes myKey.

Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/concurrency/mutex.go


// Mutex implements the sync Locker interface with etcd
type Mutex struct {
    s *Session

    pfx   string
    myKey string
    myRev int64
    hdr   *pb.ResponseHeader
}

func NewMutex(s *Session, pfx string) *Mutex {
    return &Mutex{s, pfx + "/", "", -1, nil}
}

// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
    s := m.s
    client := m.s.Client()

    // 1 Register myKey under pfx
    // A transaction is used so that the following happens in a single gRPC request:
    // 1.1 Check whether myKey already exists
    // 1.2 If it does not exist, put it; if it exists, get it
    // 1.3 Run getOwner to read the pfx information
    m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
    cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
    // put self in lock waiters via myKey; oldest waiter holds lock
    put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
    // reuse key in case this session already holds the lock
    get := v3.OpGet(m.myKey)
    // fetch current holder to complete uncontended path with only one RPC
    getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
    resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
    if err != nil {
        return err
    }
    // 2 Take the CreateRevision of the value from the put or get response as myRev
    m.myRev = resp.Header.Revision
    if !resp.Succeeded {
        m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
    }
    // 3 If there is no key under pfx, or the smallest CreateRevision equals myRev, the lock has been acquired
    // if no key on prefix / the minimum rev is key, already hold the lock
    ownerKey := resp.Responses[1].GetResponseRange().Kvs
    if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
        m.hdr = resp.Header
        return nil
    }
    
    // 4 Otherwise, wait until every key under pfx with a revision smaller than myRev has been deleted before the lock is acquired
    // wait for deletion revisions prior to myKey
    hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
    // release lock key if wait failed
    if werr != nil {
        m.Unlock(client.Ctx())
    } else {
        m.hdr = hdr
    }
    return werr
}

func (m *Mutex) Unlock(ctx context.Context) error {
    client := m.s.Client()
    if _, err := client.Delete(ctx, m.myKey); err != nil {
        return err
    }
    m.myKey = "\x00"
    m.myRev = -1
    return nil
}