This article collects my notes from reading the official Etcd client v3 source code, combined with my own understanding. Corrections are welcome wherever my understanding is inaccurate or incomplete.
Etcd version information
- Repository: https://github.com/etcd-io/etcd
- Version: v3.3.18
Etcd client implementation
Overview
- Etcd client v3 is built on gRPC, and gRPC in turn is built on HTTP/2. The client therefore largely reuses the gRPC framework for address management, connection management, load balancing, and so on; at the bottom it only needs to maintain one HTTP/2 connection to each Etcd server.
- Etcd client v3 wraps a gRPC ClientConn and implements a Resolver and a Balancer to manage it and interact with it.
- Etcd client v3 implements the gRPC Resolver interface for managing Etcd server addresses. When the client is initialized or the server cluster addresses change (periodic address refresh can be configured), the Resolver resolves the new connection addresses and notifies the gRPC ClientConn to react to the change.
- Etcd client v3 implements the gRPC Balancer interface for connection lifecycle management, load balancing, and related functions. When the state of the ClientConn or one of its SubConns changes, the Balancer is notified through its interface methods. The Balancer produces a Picker view for the ClientConn, from which the ClientConn selects an available SubConn.
- The underlying SubConn in turn wraps the HTTP/2 connection, ClientTransport. ClientTransport keeps a heartbeat with the server; when the connection drops it notifies upward, so the ClientConn can tell the Balancer to rebuild the connection.
- In short, the Balancer and Resolver are like the ClientConn's hands, but their concrete implementations are defined by the user; the SubConn is the ClientConn's internal organ, the foundation on which it implements its functionality.

Source code
Resolver
Code path: https://github.com/grpc/grpc-go/blob/v1.26.0/resolver/resolver.go
- The Resolver interface defines the methods on the Resolver that the ClientConn calls
// Resolver watches for the updates on the specified target.
// Updates include address updates and service config updates.
type Resolver interface {
// ResolveNow will be called by gRPC to try to resolve the target name
// again. It's just a hint, resolver can ignore this if it's not necessary.
//
// It could be called multiple times concurrently.
ResolveNow(ResolveNowOptions)
// Close closes the resolver.
Close()
}
- ResolveNow: called by the ClientConn to (re-)resolve the target, for example before creating new connections
- Close: shuts down the Resolver and releases the associated goroutines and other resources
- The ClientConn interface defines the methods on the ClientConn that the Resolver calls
// ClientConn contains the callbacks for resolver to notify any updates
// to the gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// UpdateState updates the state of the ClientConn appropriately.
UpdateState(State)
// ReportError notifies the ClientConn that the Resolver encountered an
// error. The ClientConn will notify the load balancer and begin calling
// ResolveNow on the Resolver with exponential backoff.
ReportError(error)
// NewAddress is called by resolver to notify ClientConn a new list
// of resolved addresses.
// The address list should be the complete list of resolved addresses.
//
// Deprecated: Use UpdateState instead.
NewAddress(addresses []Address)
// NewServiceConfig is called by resolver to notify ClientConn a new
// service config. The service config should be provided as a json string.
//
// Deprecated: Use UpdateState instead.
NewServiceConfig(serviceConfig string)
// ParseServiceConfig parses the provided service config and returns an
// object that provides the parsed config.
ParseServiceConfig(serviceConfigJSON string) *serviceconfig.ParseResult
}
- UpdateState: called when the Resolver's state changes (i.e. re-resolution has produced new Addresses) to notify the ClientConn to handle it
- ReportError: called when the Resolver encounters an error to notify the ClientConn to handle it
- NewAddress: (deprecated, use UpdateState instead)
- NewServiceConfig: (deprecated, use UpdateState instead)
- ParseServiceConfig: parses the service config JSON string
Summary: the Resolver provides a user-defined hook for resolving and updating addresses, so users can implement their own address-resolution logic, service discovery, address updates, and so on. In Etcd client v3 it mainly does two things: 1. resolve an address such as 127.0.0.1:2379 into a format usable by the underlying connections; 2. update the configured addresses when the cluster membership changes.
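To make this concrete, below is a minimal sketch (not the etcd implementation) of a custom resolver registered under a hypothetical static scheme: it simply pushes a fixed address list to the ClientConn via UpdateState. The etcd endpoint resolver follows the same pattern, with extra bookkeeping for the ResolverGroup and for SetEndpoints updates.
package staticresolver

import "google.golang.org/grpc/resolver"

// staticBuilder/staticResolver are hypothetical names used only for this sketch.
type staticBuilder struct {
	addrs []string
}

func (b *staticBuilder) Scheme() string { return "static" }

func (b *staticBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	r := &staticResolver{cc: cc, addrs: b.addrs}
	r.ResolveNow(resolver.ResolveNowOptions{}) // push the initial address list
	return r, nil
}

type staticResolver struct {
	cc    resolver.ClientConn
	addrs []string
}

// ResolveNow re-publishes the address list; a real resolver would re-run
// service discovery here and call UpdateState with whatever it finds.
func (r *staticResolver) ResolveNow(resolver.ResolveNowOptions) {
	state := resolver.State{}
	for _, a := range r.addrs {
		state.Addresses = append(state.Addresses, resolver.Address{Addr: a})
	}
	r.cc.UpdateState(state)
}

func (r *staticResolver) Close() {}

func init() {
	// After registration, grpc.Dial("static:///cluster", ...) will use this builder.
	resolver.Register(&staticBuilder{addrs: []string{"127.0.0.1:2379", "127.0.0.1:22379"}})
}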
Balancer
Code path: https://github.com/grpc/grpc-go/blob/v1.26.0/balancer/balancer.go
- The Balancer interface defines the methods on the Balancer that the ClientConn calls
// Balancer takes input from gRPC, manages SubConns, and collects and aggregates
// the connectivity states.
//
// It also generates and updates the Picker used by gRPC to pick SubConns for RPCs.
//
// HandleSubConnectionStateChange, HandleResolvedAddrs and Close are guaranteed
// to be called synchronously from the same goroutine.
// There's no guarantee on picker.Pick, it may be called anytime.
type Balancer interface {
// HandleSubConnStateChange is called by gRPC when the connectivity state
// of sc has changed.
// Balancer is expected to aggregate all the state of SubConn and report
// that back to gRPC.
// Balancer should also generate and update Pickers when its internal state has
// been changed by the new state.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateSubConnState will be called instead.
HandleSubConnStateChange(sc SubConn, state connectivity.State)
// HandleResolvedAddrs is called by gRPC to send updated resolved addresses to
// balancers.
// Balancer can create new SubConn or remove SubConn with the addresses.
// An empty address slice and a non-nil error will be passed if the resolver returns
// non-nil error to gRPC.
//
// Deprecated: if V2Balancer is implemented by the Balancer,
// UpdateClientConnState will be called instead.
HandleResolvedAddrs([]resolver.Address, error)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
Close()
}
- HandleSubConnStateChange: when the connectivity state of an underlying SubConn changes, the ClientConn calls this method to notify the Balancer so it can maintain its internal state
- HandleResolvedAddrs: when the Resolver resolves new addresses and calls UpdateState, the ClientConn calls this method, which in turn creates new connections and performs related work
- Close: shuts down the Balancer and releases the associated goroutines and other resources
- The ClientConn interface defines the methods on the ClientConn that the Balancer calls (note that this is the ClientConn interface in balancer.go; it differs from the one in resolver.go, but both are ultimately implemented by the same struct)
// ClientConn represents a gRPC ClientConn.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type ClientConn interface {
// NewSubConn is called by balancer to create a new SubConn.
// It doesn't block and wait for the connections to be established.
// Behaviors of the SubConn can be controlled by options.
NewSubConn([]resolver.Address, NewSubConnOptions) (SubConn, error)
// RemoveSubConn removes the SubConn from ClientConn.
// The SubConn will be shutdown.
RemoveSubConn(SubConn)
// UpdateBalancerState is called by balancer to notify gRPC that some internal
// state in balancer has changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call pick
// on the new picker to pick new SubConn.
//
// Deprecated: use UpdateState instead
UpdateBalancerState(s connectivity.State, p Picker)
// UpdateState notifies gRPC that the balancer's internal state has
// changed.
//
// gRPC will update the connectivity state of the ClientConn, and will call pick
// on the new picker to pick new SubConns.
UpdateState(State)
// ResolveNow is called by balancer to notify gRPC to do a name resolving.
ResolveNow(resolver.ResolveNowOptions)
// Target returns the dial target for this ClientConn.
//
// Deprecated: Use the Target field in the BuildOptions instead.
Target() string
}
- NewSubConn: called by the Balancer to create an underlying SubConn; establishing the connection is non-blocking
- RemoveSubConn: called by the Balancer to remove an existing SubConn
- UpdateBalancerState: (deprecated, use UpdateState instead)
- UpdateState: called by the Balancer to tell the ClientConn that the Balancer's internal state has changed, together with a newly generated Picker
- ResolveNow: called by the Balancer to tell the ClientConn to call the Resolver's ResolveNow and re-resolve
- Target: (deprecated)
- The Picker interface defines the methods on the Picker that the ClientConn calls (the Picker is in fact a snapshot produced by the Balancer, containing the current state of the underlying SubConns; on each call it picks an available SubConn for the ClientConn, providing availability and load balancing)
// Picker is used by gRPC to pick a SubConn to send an RPC.
// Balancer is expected to generate a new picker from its snapshot every time its
// internal state has changed.
//
// The pickers used by gRPC can be updated by ClientConn.UpdateBalancerState().
//
// Deprecated: use V2Picker instead
type Picker interface {
// Pick returns the SubConn to be used to send the RPC.
// The returned SubConn must be one returned by NewSubConn().
//
// This functions is expected to return:
// - a SubConn that is known to be READY;
// - ErrNoSubConnAvailable if no SubConn is available, but progress is being
// made (for example, some SubConn is in CONNECTING mode);
// - other errors if no active connecting is happening (for example, all SubConn
// are in TRANSIENT_FAILURE mode).
//
// If a SubConn is returned:
// - If it is READY, gRPC will send the RPC on it;
// - If it is not ready, or becomes not ready after it's returned, gRPC will
// block until UpdateBalancerState() is called and will call pick on the
// new picker. The done function returned from Pick(), if not nil, will be
// called with nil error, no bytes sent and no bytes received.
//
// If the returned error is not nil:
// - If the error is ErrNoSubConnAvailable, gRPC will block until UpdateBalancerState()
// - If the error is ErrTransientFailure or implements IsTransientFailure()
// bool, returning true:
// - If the RPC is wait-for-ready, gRPC will block until UpdateBalancerState()
// is called to pick again;
// - Otherwise, RPC will fail with unavailable error.
// - Else (error is other non-nil error):
// - The RPC will fail with the error's status code, or Unknown if it is
// not a status error.
//
// The returned done() function will be called once the rpc has finished,
// with the final status of that RPC. If the SubConn returned is not a
// valid SubConn type, done may not be called. done may be nil if balancer
// doesn't care about the RPC status.
Pick(ctx context.Context, info PickInfo) (conn SubConn, done func(DoneInfo), err error)
}
- Pick: picks an available SubConn for the ClientConn
- The SubConn interface defines its functionality as a sub-connection. In gRPC one SubConn corresponds to multiple Addresses; gRPC tries to connect to them in sequence and stops at the first successful connection. In practice, Etcd client v3 uses one SubConn per endpoint address.
When the connection state is IDLE, the SubConn does not connect until the Balancer explicitly calls Connect; if an error occurs while connecting, it retries immediately. Once the state returns to IDLE, it will not reconnect automatically unless Connect is called explicitly.
// SubConn represents a gRPC sub connection.
// Each sub connection contains a list of addresses. gRPC will
// try to connect to them (in sequence), and stop trying the
// remainder once one connection is successful.
//
// The reconnect backoff will be applied on the list, not a single address.
// For example, try_on_all_addresses -> backoff -> try_on_all_addresses.
//
// All SubConns start in IDLE, and will not try to connect. To trigger
// the connecting, Balancers must call Connect.
// When the connection encounters an error, it will reconnect immediately.
// When the connection becomes IDLE, it will not reconnect unless Connect is
// called.
//
// This interface is to be implemented by gRPC. Users should not need a
// brand new implementation of this interface. For the situations like
// testing, the new implementation should embed this interface. This allows
// gRPC to add new methods to this interface.
type SubConn interface {
// UpdateAddresses updates the addresses used in this SubConn.
// gRPC checks if currently-connected address is still in the new list.
// If it's in the list, the connection will be kept.
// If it's not in the list, the connection will gracefully closed, and
// a new connection will be created.
//
// This will trigger a state transition for the SubConn.
UpdateAddresses([]resolver.Address)
// Connect starts the connecting for this SubConn.
Connect()
}
- UpdateAddresses: updates the SubConn's Address list. If the currently connected address is in the new list, the connection is kept; otherwise it is gracefully closed and a new connection is created
- Connect: starts connecting
Summary: the Balancer provides a user-defined structure for load balancing and internal state management. Etcd client v3 implements a round-robin load balancer.
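To illustrate what the Picker actually does, here is a minimal round-robin Picker sketch written against the (v1) Picker interface quoted above; the names are hypothetical and this is not the etcd balancer, which additionally tracks endpoint health before building its snapshot.
package rrpicker

import (
	"context"
	"sync/atomic"

	"google.golang.org/grpc/balancer"
)

// rrPicker is a snapshot built by the balancer from the SubConns that are
// currently READY; the balancer replaces it whenever its state changes.
type rrPicker struct {
	subConns []balancer.SubConn // READY connections at snapshot time
	next     uint32             // round-robin cursor
}

func (p *rrPicker) Pick(ctx context.Context, info balancer.PickInfo) (balancer.SubConn, func(balancer.DoneInfo), error) {
	if len(p.subConns) == 0 {
		// Nothing READY yet: gRPC blocks (or fails fast) and picks again
		// once the balancer publishes a new picker.
		return nil, nil, balancer.ErrNoSubConnAvailable
	}
	// Rotate through the READY SubConns for simple load balancing.
	i := atomic.AddUint32(&p.next, 1)
	return p.subConns[int(i)%len(p.subConns)], nil, nil
}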
Etcd client creation flow
New
Creating a new Etcd client v3 object mainly involves:
- 1. Create the Client struct -> 2. Configure parameters -> 3. Set up the Resolver -> 4. Set up the Balancer -> 5. grpc dial -> 6. Create the sub-module structs -> 7. Start the address auto-sync goroutine -> 8. Return the Client struct
Core code
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/client.go
func newClient(cfg *Config) (*Client, error) {
// some preprocessing
// ……
// 1. Create the Client struct
ctx, cancel := context.WithCancel(baseCtx)
client := &Client{
conn: nil,
cfg: *cfg,
creds: creds,
ctx: ctx,
cancel: cancel,
mu: new(sync.RWMutex),
callOpts: defaultCallOpts,
}
// 2. Configure parameters
// ……
// 3. Set up the Resolver
// Etcd actually wraps the Resolver in another layer, ResolverGroup, probably to support multiple clusters; normally there is just one cluster and hence one Resolver
// Prepare a 'endpoint://<unique-client-id>/' resolver for the client and create a endpoint target to pass
// to dial so the client knows to use this resolver.
client.resolverGroup, err = endpoint.NewResolverGroup(fmt.Sprintf("client-%s", uuid.New().String()))
if err != nil {
client.cancel()
return nil, err
}
client.resolverGroup.SetEndpoints(cfg.Endpoints)
if len(cfg.Endpoints) < 1 {
return nil, fmt.Errorf("at least one Endpoint must is required in client config")
}
dialEndpoint := cfg.Endpoints[0]
// 4. Set up the Balancer / 5. grpc dial
// Since the Etcd client v3 package registers a Balancer under the name roundRobinBalancerName, it only needs grpc.WithBalancerName here; grpc will call the corresponding Builder to create the already-configured Balancer.
// dialWithBalancer ends up calling grpc.DialContext, which creates the ClientConn and the underlying SubConns
// Use a provided endpoint target so that for https:// without any tls config given, then
// grpc will assume the certificate server name is the endpoint host.
conn, err := client.dialWithBalancer(dialEndpoint, grpc.WithBalancerName(roundRobinBalancerName))
if err != nil {
client.cancel()
client.resolverGroup.Close()
return nil, err
}
// TODO: With the old grpc balancer interface, we waited until the dial timeout
// for the balancer to be ready. Is there an equivalent wait we should do with the new grpc balancer interface?
client.conn = conn
// 6. Create the sub-module structs
// wrap the sub-modules and create their structs
client.Cluster = NewCluster(client)
client.KV = NewKV(client)
client.Lease = NewLease(client)
client.Watcher = NewWatcher(client)
client.Auth = NewAuth(client)
client.Maintenance = NewMaintenance(client)
if cfg.RejectOldCluster {
if err := client.checkVersion(); err != nil {
client.Close()
return nil, err
}
}
// 7. Start the address auto-sync goroutine
// If AutoSyncInterval is configured, this goroutine periodically pulls the current member list from the Etcd servers and updates the addresses
// If it is not configured, the goroutine exits immediately
go client.autoSync()
// 8. Return the Client struct
return client, nil
}
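For reference, the caller side of the flow above looks roughly like the sketch below; the endpoints and intervals are placeholders, and depending on the etcd version the import path may be github.com/coreos/etcd/clientv3 instead of go.etcd.io/etcd/clientv3.
package main

import (
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// clientv3.New runs the newClient flow shown above: it builds the
	// resolver group, selects the registered round-robin balancer, dials,
	// and returns a ready-to-use *clientv3.Client.
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:        []string{"127.0.0.1:2379", "127.0.0.1:22379"}, // placeholder endpoints
		DialTimeout:      5 * time.Second,
		AutoSyncInterval: time.Minute, // optional: enables the autoSync goroutine of step 7
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()
}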
Etcd client call flow
From the definition of Client we can see that it embeds a number of functional modules
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/client.go
// Client provides and manages an etcd v3 client session.
type Client struct {
Cluster
KV
Lease
Watcher
Auth
Maintenance
// other fields
// ……
}
- Cluster: the cluster module, mainly concerned with the cluster servers; cluster members can be added, removed, updated, and listed via RPC requests. Not covered in depth here.
- KV: the key-value module, providing the core key-value create/read/update/delete operations; the most frequently used module.
- Lease: the lease module, used to manage automatically renewed sessions. Key-value records written under such a session are kept by the server until the session ends; when the session ends normally or renewal lapses, the records are removed by the server automatically. Many features such as service discovery and distributed locks can be built on this module.
- Watcher: the watch module, used to watch for insert, update, and delete events on records.
- Auth: the authentication module. Not covered in depth here.
- Maintenance: the maintenance/monitoring module. Not covered in depth here.
Get/Put/Delete
This functionality is implemented by the KV module; the main supporting type is KVClient (implemented by the kVClient struct)
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/etcdserver/etcdserverpb/rpc.pb.go
// KVClient is the low-level support for the KV module
// KVClient is the client API for KV service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type KVClient interface {
// Range gets the keys in the range from the key-value store.
Range(ctx context.Context, in *RangeRequest, opts ...grpc.CallOption) (*RangeResponse, error)
// Put puts the given key into the key-value store.
// A put request increments the revision of the key-value store
// and generates one event in the event history.
Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error)
// DeleteRange deletes the given range from the key-value store.
// A delete request increments the revision of the key-value store
// and generates a delete event in the event history for every deleted key.
DeleteRange(ctx context.Context, in *DeleteRangeRequest, opts ...grpc.CallOption) (*DeleteRangeResponse, error)
// Txn processes multiple requests in a single transaction.
// A txn request increments the revision of the key-value store
// and generates events with the same revision for every completed request.
// It is not allowed to modify the same key several times within one txn.
Txn(ctx context.Context, in *TxnRequest, opts ...grpc.CallOption) (*TxnResponse, error)
// Compact compacts the event history in the etcd key-value store. The key-value
// store should be periodically compacted or the event history will continue to grow
// indefinitely.
Compact(ctx context.Context, in *CompactionRequest, opts ...grpc.CallOption) (*CompactionResponse, error)
}
// the kVClient struct implements the KVClient interface
type kVClient struct {
// holds the associated ClientConn
cc *grpc.ClientConn
}
// Taking Put as an example: it ultimately calls ClientConn's Invoke method
func (c *kVClient) Put(ctx context.Context, in *PutRequest, opts ...grpc.CallOption) (*PutResponse, error) {
out := new(PutResponse)
err := c.cc.Invoke(ctx, "/etcdserverpb.KV/Put", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ……
As we can see, the call finally reaches gRPC through ClientConn's Invoke method, which in turn calls the following function:
Code path: https://github.com/grpc/grpc-go/blob/v1.26.0/call.go
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
// Create a ClientStream; this step acquires an underlying Transport and creates a Stream on top of it as the communication stream for this request
cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
// send the request data over the stream
if err := cs.SendMsg(req); err != nil {
return err
}
// receive the reply data over the stream
return cs.RecvMsg(reply)
}
Note: Get/Put/Delete and the other KV methods are synchronous and blocking, and if a network error occurs during the request they return the error directly instead of retrying automatically.
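Because these calls block and do not retry, callers typically wrap each one in a context with a timeout and handle the error themselves; a minimal sketch, where cli is a *clientv3.Client such as the one created in the example above and the key name is a placeholder:
package example

import (
	"context"
	"fmt"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func kvExample(cli *clientv3.Client) error {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()
	// Put blocks until the server replies or ctx expires; on a network error
	// it returns the error to the caller instead of retrying.
	if _, err := cli.Put(ctx, "sample_key", "sample_value"); err != nil {
		return err
	}
	resp, err := cli.Get(ctx, "sample_key")
	if err != nil {
		return err
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s -> %s\n", kv.Key, kv.Value)
	}
	return nil
}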
Watch
This functionality is implemented by the Watcher module; the main implementing struct is watcher
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/watch.go
type Watcher interface {
// Watch watches on a key or prefix. The watched events will be returned
// through the returned channel. If revisions waiting to be sent over the
// watch are compacted, then the watch will be canceled by the server, the
// client will post a compacted error watch response, and the channel will close.
// If the context "ctx" is canceled or timed out, returned "WatchChan" is closed,
// and "WatchResponse" from this closed channel has zero events and nil "Err()".
// The context "ctx" MUST be canceled, as soon as watcher is no longer being used,
// to release the associated resources.
//
// If the context is "context.Background/TODO", returned "WatchChan" will
// not be closed and block until event is triggered, except when server
// returns a non-recoverable error (e.g. ErrCompacted).
// For example, when context passed with "WithRequireLeader" and the
// connected server has no leader (e.g. due to network partition),
// error "etcdserver: no leader" (ErrNoLeader) will be returned,
// and then "WatchChan" is closed with non-nil "Err()".
// In order to prevent a watch stream being stuck in a partitioned node,
// make sure to wrap context with "WithRequireLeader".
//
// Otherwise, as long as the context has not been canceled or timed out,
// watch will retry on other recoverable errors forever until reconnected.
//
// TODO: explicitly set context error in the last "WatchResponse" message and close channel?
// Currently, client contexts are overwritten with "valCtx" that never closes.
// TODO(v3.4): configure watch retry policy, limit maximum retry number
// (see https://github.com/etcd-io/etcd/issues/8980)
Watch(ctx context.Context, key string, opts ...OpOption) WatchChan
// RequestProgress requests a progress notify response be sent in all watch channels.
RequestProgress(ctx context.Context) error
// Close closes the watcher and cancels all watch requests.
Close() error
}
// watcher implements the Watcher interface
type watcher struct {
remote pb.WatchClient
callOpts []grpc.CallOption
// mu protects the grpc streams map
mu sync.RWMutex
// streams holds all the active grpc streams keyed by ctx value.
streams map[string]*watchGrpcStream
}
As we can see, the watcher struct contains a map, streams map[string]*watchGrpcStream, holding all active watchGrpcStreams. A watchGrpcStream represents one communication stream built on gRPC, and each watchGrpcStream in turn carries a number of watcherStreams. Every Watch request creates a watcherStream, which is kept alive until it is canceled by the caller or an unrecoverable internal error occurs; for recoverable errors such as a network disconnect, the client tries to rebuild the watcherStream.
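From the caller's point of view, all of this machinery sits behind the WatchChan returned by Watch; a minimal usage sketch, where cli is an existing *clientv3.Client and the prefix is a placeholder:
package example

import (
	"context"
	"log"

	"go.etcd.io/etcd/clientv3"
)

func watchExample(ctx context.Context, cli *clientv3.Client) {
	// One Watch call corresponds to one watcherStream, multiplexed onto a
	// shared watchGrpcStream; the channel stays open across recoverable errors.
	for wresp := range cli.Watch(ctx, "sample_prefix/", clientv3.WithPrefix()) {
		if err := wresp.Err(); err != nil {
			log.Printf("watch closed: %v", err) // e.g. compaction, or ErrNoLeader with WithRequireLeader
			return
		}
		for _, ev := range wresp.Events {
			log.Printf("%s %q : %q", ev.Type, ev.Kv.Key, ev.Kv.Value)
		}
	}
}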
Now look at the Watch method, which has three main stages:
- 1 Preparation: configure parameters and build the watchRequest struct
- 2 Obtain a watchGrpcStream: reuse an existing one or create a new one
- 3 Submission: submit the watchRequest to the watchGrpcStream's reqc channel, then wait on the watchRequest's return channel retc for the WatchChan
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/watch.go
// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
// 1. Preparation: configure parameters and build the watchRequest struct
ow := opWatch(key, opts...)
var filters []pb.WatchCreateRequest_FilterType
if ow.filterPut {
filters = append(filters, pb.WatchCreateRequest_NOPUT)
}
if ow.filterDelete {
filters = append(filters, pb.WatchCreateRequest_NODELETE)
}
wr := &watchRequest{
ctx: ctx,
createdNotify: ow.createdNotify,
key: string(ow.key),
end: string(ow.end),
rev: ow.rev,
progressNotify: ow.progressNotify,
fragment: ow.fragment,
filters: filters,
prevKV: ow.prevKV,
retc: make(chan chan WatchResponse, 1),
}
ok := false
ctxKey := streamKeyFromCtx(ctx)
// 2. Reuse or create a watchGrpcStream
// find or allocate appropriate grpc watch stream
w.mu.Lock()
if w.streams == nil {
// closed
w.mu.Unlock()
ch := make(chan WatchResponse)
close(ch)
return ch
}
wgs := w.streams[ctxKey]
if wgs == nil {
wgs = w.newWatcherGrpcStream(ctx)
w.streams[ctxKey] = wgs
}
// Grab the watchGrpcStream's reqc and donec channels, used respectively to submit requests to it and to signal errors
donec := wgs.donec
reqc := wgs.reqc
w.mu.Unlock()
// couldn't create channel; return closed channel
closeCh := make(chan WatchResponse, 1)
// 3. Submit the request to the watchGrpcStream's reqc, then wait on the watchRequest's return channel retc for the WatchChan
// submit request
select {
case reqc <- wr:
ok = true
case <-wr.ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
// receive channel
if ok {
select {
case ret := <-wr.retc:
return ret
case <-ctx.Done():
case <-donec:
if wgs.closeErr != nil {
closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr}
break
}
// retry; may have dropped stream from no ctxs
return w.Watch(ctx, key, opts...)
}
}
close(closeCh)
return closeCh
}
The line to pay attention to is wgs = w.newWatcherGrpcStream(ctx).
This method creates a watchGrpcStream and then starts a goroutine to execute its run method, which is where the main logic lives.
It mainly does the following:
- 1 Some preparation
- 2 Create a gRPC stream, i.e. the watch client, to carry the substreams produced by each watch
- 3 Enter a big wait loop
- 3.1 Wait for new watch requests. Whenever a new watch arrives, create a substream, start a goroutine to actually receive the WatchResponses returned on that substream, and finally send the request out over the watch client
- 3.2 When response events arrive from the watch client, dispatch them to the corresponding substream to handle
- 3.3 If communication on the watch client fails (for example the network is unreachable, or the server is down), try to obtain a new watch client and re-send the requests; this ensures watches are not interrupted by network jitter and the like
- 3.4 Handling on error or shutdown
There are of course many more details here, which we will not go into.
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/watch.go
func (w *watcher) newWatcherGrpcStream(inctx context.Context) *watchGrpcStream {
ctx, cancel := context.WithCancel(&valCtx{inctx})
wgs := &watchGrpcStream{
owner: w,
remote: w.remote,
callOpts: w.callOpts,
ctx: ctx,
ctxKey: streamKeyFromCtx(inctx),
cancel: cancel,
substreams: make(map[int64]*watcherStream),
respc: make(chan *pb.WatchResponse),
reqc: make(chan watchStreamRequest),
donec: make(chan struct{}),
errc: make(chan error, 1),
closingc: make(chan *watcherStream),
resumec: make(chan struct{}),
}
go wgs.run()
return wgs
}
// run is the root of the goroutines for managing a watcher client
func (w *watchGrpcStream) run() {
// 1. Some preparation
// ……
// 2. Create a gRPC stream, i.e. the watch client, to carry the substreams produced by each watch
// start a stream with the etcd grpc server
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
cancelSet := make(map[int64]struct{})
// 3. Enter a big wait loop
var cur *pb.WatchResponse
for {
select {
// 3.1 Wait for new watch requests
// Whenever a new watch arrives, create a substream, start a goroutine to actually receive the WatchResponses returned on that substream, and finally send the request out over the watch client
// Watch() requested
case req := <-w.reqc:
switch wreq := req.(type) {
case *watchRequest:
outc := make(chan WatchResponse, 1)
// TODO: pass custom watch ID?
ws := &watcherStream{
initReq: *wreq,
id: -1,
outc: outc,
// unbuffered so resumes won't cause repeat events
recvc: make(chan *WatchResponse),
}
ws.donec = make(chan struct{})
w.wg.Add(1)
// This starts the goroutine that actually processes the WatchResponses returned on this substream
go w.serveSubstream(ws, w.resumec)
// queue up for watcher creation/resume
w.resuming = append(w.resuming, ws)
if len(w.resuming) == 1 {
// head of resume queue, can register a new watcher
wc.Send(ws.initReq.toPB())
}
case *progressRequest:
wc.Send(wreq.toPB())
}
// 3.2 Response events received from the watch client
// Dispatch the events to the corresponding substream to handle
// new events from the watch client
case pbresp := <-w.respc:
if cur == nil || pbresp.Created || pbresp.Canceled {
cur = pbresp
} else if cur != nil && cur.WatchId == pbresp.WatchId {
// merge new events
cur.Events = append(cur.Events, pbresp.Events...)
// update "Fragment" field; last response with "Fragment" == false
cur.Fragment = pbresp.Fragment
}
switch {
case pbresp.Created:
// response to head of queue creation
if ws := w.resuming[0]; ws != nil {
w.addSubstream(pbresp, ws)
w.dispatchEvent(pbresp)
w.resuming[0] = nil
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
// reset for next iteration
cur = nil
case pbresp.Canceled && pbresp.CompactRevision == 0:
delete(cancelSet, pbresp.WatchId)
if ws, ok := w.substreams[pbresp.WatchId]; ok {
// signal to stream goroutine to update closingc
close(ws.recvc)
closing[ws] = struct{}{}
}
// reset for next iteration
cur = nil
case cur.Fragment:
// watch response events are still fragmented
// continue to fetch next fragmented event arrival
continue
default:
// dispatch to appropriate watch stream
ok := w.dispatchEvent(cur)
// reset for next iteration
cur = nil
if ok {
break
}
// watch response on unexpected watch id; cancel id
if _, ok := cancelSet[pbresp.WatchId]; ok {
break
}
cancelSet[pbresp.WatchId] = struct{}{}
cr := &pb.WatchRequest_CancelRequest{
CancelRequest: &pb.WatchCancelRequest{
WatchId: pbresp.WatchId,
},
}
req := &pb.WatchRequest{RequestUnion: cr}
wc.Send(req)
}
// 3.3 If communication on the watch client fails (for example the network is unreachable, or the server is down), try to obtain a new watch client and re-send the requests; this ensures watches are not interrupted by network jitter and the like
// watch client failed on Recv; spawn another if possible
case err := <-w.errc:
if isHaltErr(w.ctx, err) || toErr(w.ctx, err) == v3rpc.ErrNoLeader {
closeErr = err
return
}
if wc, closeErr = w.newWatchClient(); closeErr != nil {
return
}
if ws := w.nextResume(); ws != nil {
wc.Send(ws.initReq.toPB())
}
cancelSet = make(map[int64]struct{})
// 3.4 異?;蚪Y(jié)束時的處理
case <-w.ctx.Done():
return
case ws := <-w.closingc:
w.closeSubstream(ws)
delete(closing, ws)
// no more watchers on this stream, shutdown
if len(w.substreams)+len(w.resuming) == 0 {
return
}
}
}
}
Lease
This functionality is implemented by the Lease module; the main implementing struct is lessor
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go
type Lease interface {
// Grant creates a new lease.
Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error)
// Revoke revokes the given lease.
Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error)
// TimeToLive retrieves the lease information of the given lease ID.
TimeToLive(ctx context.Context, id LeaseID, opts ...LeaseOption) (*LeaseTimeToLiveResponse, error)
// Leases retrieves all leases.
Leases(ctx context.Context) (*LeaseLeasesResponse, error)
// KeepAlive keeps the given lease alive forever. If the keepalive response
// posted to the channel is not consumed immediately, the lease client will
// continue sending keep alive requests to the etcd server at least every
// second until latest response is consumed.
//
// The returned "LeaseKeepAliveResponse" channel closes if underlying keep
// alive stream is interrupted in some way the client cannot handle itself;
// given context "ctx" is canceled or timed out. "LeaseKeepAliveResponse"
// from this closed channel is nil.
//
// If client keep alive loop halts with an unexpected error (e.g. "etcdserver:
// no leader") or canceled by the caller (e.g. context.Canceled), the error
// is returned. Otherwise, it retries.
//
// TODO(v4.0): post errors to last keep alive message before closing
// (see https://github.com/coreos/etcd/pull/7866)
KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error)
// KeepAliveOnce renews the lease once. The response corresponds to the
// first message from calling KeepAlive. If the response has a recoverable
// error, KeepAliveOnce will retry the RPC with a new keep alive message.
//
// In most of the cases, Keepalive should be used instead of KeepAliveOnce.
KeepAliveOnce(ctx context.Context, id LeaseID) (*LeaseKeepAliveResponse, error)
// Close releases all resources Lease keeps for efficient communication
// with the etcd server.
Close() error
}
type lessor struct {
mu sync.Mutex // guards all fields
// donec is closed and loopErr is set when recvKeepAliveLoop stops
donec chan struct{}
loopErr error
remote pb.LeaseClient
stream pb.Lease_LeaseKeepAliveClient
streamCancel context.CancelFunc
stopCtx context.Context
stopCancel context.CancelFunc
keepAlives map[LeaseID]*keepAlive
// firstKeepAliveTimeout is the timeout for the first keepalive request
// before the actual TTL is known to the lease client
firstKeepAliveTimeout time.Duration
// firstKeepAliveOnce ensures stream starts after first KeepAlive call.
firstKeepAliveOnce sync.Once
callOpts []grpc.CallOption
}
Three methods are worth a closer look:
- Grant: via a single gRPC call, asks the server to create a lease with the given TTL and returns the lease ID
- Revoke: via a single gRPC call, asks the server to destroy the lease with the given lease ID
- KeepAlive: records the lease ID in the keepAlives map, creates a stream, and starts goroutines to send keep-alive requests and receive the responses, keeping the lease alive
First look at Grant and Revoke
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go
func (l *lessor) Grant(ctx context.Context, ttl int64) (*LeaseGrantResponse, error) {
r := &pb.LeaseGrantRequest{TTL: ttl}
resp, err := l.remote.LeaseGrant(ctx, r, l.callOpts...)
if err == nil {
gresp := &LeaseGrantResponse{
ResponseHeader: resp.GetHeader(),
ID: LeaseID(resp.ID),
TTL: resp.TTL,
Error: resp.Error,
}
return gresp, nil
}
return nil, toErr(ctx, err)
}
func (l *lessor) Revoke(ctx context.Context, id LeaseID) (*LeaseRevokeResponse, error) {
r := &pb.LeaseRevokeRequest{ID: int64(id)}
resp, err := l.remote.LeaseRevoke(ctx, r, l.callOpts...)
if err == nil {
return (*LeaseRevokeResponse)(resp), nil
}
return nil, toErr(ctx, err)
}
The underlying calls are similar to Get/Put/Delete:
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/etcdserver/etcdserverpb/rpc.pb.go
func (c *leaseClient) LeaseGrant(ctx context.Context, in *LeaseGrantRequest, opts ...grpc.CallOption) (*LeaseGrantResponse, error) {
out := new(LeaseGrantResponse)
err := c.cc.Invoke(ctx, "/etcdserverpb.Lease/LeaseGrant", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *leaseClient) LeaseRevoke(ctx context.Context, in *LeaseRevokeRequest, opts ...grpc.CallOption) (*LeaseRevokeResponse, error) {
out := new(LeaseRevokeResponse)
err := c.cc.Invoke(ctx, "/etcdserverpb.Lease/LeaseRevoke", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
Now focus on the KeepAlive implementation:
- 1 Preparation
- 2 Create or reuse the keepAlive struct for this LeaseID, and associate the current ctx and ch with it
- 3 Start the related goroutines. The most important line is go l.recvKeepAliveLoop(), which internally implements the periodic lease-renewal logic against the server
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go
func (l *lessor) KeepAlive(ctx context.Context, id LeaseID) (<-chan *LeaseKeepAliveResponse, error) {
// 1. Preparation
ch := make(chan *LeaseKeepAliveResponse, LeaseResponseChSize)
l.mu.Lock()
// ensure that recvKeepAliveLoop is still running
select {
case <-l.donec:
err := l.loopErr
l.mu.Unlock()
close(ch)
return ch, ErrKeepAliveHalted{Reason: err}
default:
}
// 2. Create or reuse the keepAlive struct for this LeaseID, and associate the current ctx and ch with it
ka, ok := l.keepAlives[id]
if !ok {
// create fresh keep alive
ka = &keepAlive{
chs: []chan<- *LeaseKeepAliveResponse{ch},
ctxs: []context.Context{ctx},
deadline: time.Now().Add(l.firstKeepAliveTimeout),
nextKeepAlive: time.Now(),
donec: make(chan struct{}),
}
l.keepAlives[id] = ka
} else {
// add channel and context to existing keep alive
ka.ctxs = append(ka.ctxs, ctx)
ka.chs = append(ka.chs, ch)
}
l.mu.Unlock()
// 3. Start the related goroutines
// The most important is the go l.recvKeepAliveLoop() line,
// which internally implements the periodic lease-renewal logic against the server
go l.keepAliveCtxCloser(id, ctx, ka.donec)
l.firstKeepAliveOnce.Do(func() {
go l.recvKeepAliveLoop()
go l.deadlineLoop()
})
return ch, nil
}
Now look at the recvKeepAliveLoop method:
- 1 Preparation
- 2 Enter a big loop implementing the renewal logic; the outer loop exists so the client can retry when a stream becomes invalid
- 2.1 resetRecv creates a stream and internally starts a goroutine that keeps sending renewal requests
- 2.2 Enter an inner loop that keeps reading the server's responses from the stream created above
- 2.2.1 stream.Recv() blocks until the server returns a message or an error occurs
- 2.2.2 Forward the server's response to the channels associated with the keepAlive
- 2.3 If the inner loop exits, the stream has become invalid; wait 500ms and restart the outer loop
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go
func (l *lessor) recvKeepAliveLoop() (gerr error) {
// 1. Preparation
defer func() {
l.mu.Lock()
close(l.donec)
l.loopErr = gerr
for _, ka := range l.keepAlives {
ka.close()
}
l.keepAlives = make(map[LeaseID]*keepAlive)
l.mu.Unlock()
}()
// 2. Enter a big loop implementing the renewal logic; the outer loop lets us retry when a stream becomes invalid
for {
// 2.1 resetRecv creates a stream and internally starts a goroutine that keeps sending renewal requests
stream, err := l.resetRecv()
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
} else {
// 2.2 Inner loop: keep reading the server's responses from the stream created above
for {
// 2.2.1 stream.Recv() blocks until the server returns a message or an error occurs
resp, err := stream.Recv()
// error handling
if err != nil {
if canceledByCaller(l.stopCtx, err) {
return err
}
if toErr(l.stopCtx, err) == rpctypes.ErrNoLeader {
l.closeRequireLeader()
}
break
}
// 2.2.2 Forward the server's response to the channels associated with the keepAlive
l.recvKeepAlive(resp)
}
}
// 2.3 If the inner loop exits, the stream has become invalid; wait 500ms (retryConnWait) and restart the outer loop
select {
case <-time.After(retryConnWait):
continue
case <-l.stopCtx.Done():
return l.stopCtx.Err()
}
}
}
Now look at the resetRecv method:
- 1 Call the remote's LeaseKeepAlive method to create a gRPC stream
- 2 Start a goroutine running sendKeepAliveLoop, which keeps sending renewal requests
- 3 Return the stream
sendKeepAliveLoop wakes up roughly every 500ms and sends renewal requests for the leases that are due
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/lease.go
// resetRecv opens a new lease stream and starts sending keep alive requests.
func (l *lessor) resetRecv() (pb.Lease_LeaseKeepAliveClient, error) {
// 1 Call the remote's LeaseKeepAlive method to create a gRPC stream
sctx, cancel := context.WithCancel(l.stopCtx)
stream, err := l.remote.LeaseKeepAlive(sctx, l.callOpts...)
if err != nil {
cancel()
return nil, err
}
l.mu.Lock()
defer l.mu.Unlock()
if l.stream != nil && l.streamCancel != nil {
l.streamCancel()
}
l.streamCancel = cancel
l.stream = stream
// 2 Start a goroutine that keeps sending renewal requests
go l.sendKeepAliveLoop(stream)
// 3 Return the stream
return stream, nil
}
// sendKeepAliveLoop sends keep alive requests for the lifetime of the given stream.
func (l *lessor) sendKeepAliveLoop(stream pb.Lease_LeaseKeepAliveClient) {
for {
var tosend []LeaseID
now := time.Now()
l.mu.Lock()
for id, ka := range l.keepAlives {
if ka.nextKeepAlive.Before(now) {
tosend = append(tosend, id)
}
}
l.mu.Unlock()
for _, id := range tosend {
r := &pb.LeaseKeepAliveRequest{ID: int64(id)}
if err := stream.Send(r); err != nil {
// TODO do something with this error?
return
}
}
select {
case <-time.After(500 * time.Millisecond):
case <-stream.Context().Done():
return
case <-l.donec:
return
case <-l.stopCtx.Done():
return
}
}
}
Note: from the code above we can see:
- Regardless of the TTL passed to Grant, sendKeepAliveLoop wakes up about every 500ms and sends a renewal request for every lease that is due (each lease becomes due roughly TTL/3 after its last response); adding the 500ms retry wait (retryConnWait) in recvKeepAliveLoop's outer loop after a stream failure, under normal conditions a due renewal goes out at most about 1s after it becomes due.
- Unless a lease has been canceled or has already expired, renewal is retried automatically in the face of network jitter.
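Putting Grant, Put with a lease, and KeepAlive together, typical usage looks like the sketch below (TTL and key are placeholders); note that the keep-alive channel has to be drained, otherwise responses accumulate:
package example

import (
	"context"
	"log"

	"go.etcd.io/etcd/clientv3"
)

func leaseExample(ctx context.Context, cli *clientv3.Client) error {
	// Grant: a single RPC that creates a lease with a 10s TTL on the server.
	grant, err := cli.Grant(ctx, 10)
	if err != nil {
		return err
	}
	// Attach a key to the lease; the server removes it when the lease expires or is revoked.
	if _, err := cli.Put(ctx, "sample_key", "v", clientv3.WithLease(grant.ID)); err != nil {
		return err
	}
	// KeepAlive starts the send/recv loops shown above and keeps renewing the
	// lease until ctx is canceled or an unrecoverable error occurs.
	ch, err := cli.KeepAlive(ctx, grant.ID)
	if err != nil {
		return err
	}
	go func() {
		for range ch { // drain responses; the channel closes when renewal stops
		}
		log.Println("keepalive channel closed")
	}()
	return nil
}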
Session
The Session functionality is really just a wrapper around the Lease functionality, as NewSession shows:
it simply calls Grant and KeepAlive and packages the results in a Session struct for convenient use.
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/concurrency/session.go
// NewSession gets the leased session for a client.
func NewSession(client *v3.Client, opts ...SessionOption) (*Session, error) {
ops := &sessionOptions{ttl: defaultSessionTTL, ctx: client.Ctx()}
for _, opt := range opts {
opt(ops)
}
id := ops.leaseID
if id == v3.NoLease {
resp, err := client.Grant(ops.ctx, int64(ops.ttl))
if err != nil {
return nil, err
}
id = v3.LeaseID(resp.ID)
}
ctx, cancel := context.WithCancel(ops.ctx)
keepAlive, err := client.KeepAlive(ctx, id)
if err != nil || keepAlive == nil {
cancel()
return nil, err
}
donec := make(chan struct{})
s := &Session{client: client, opts: ops, id: id, cancel: cancel, donec: donec}
// keep the lease alive until client error or cancelled context
go func() {
defer close(donec)
for range keepAlive {
// eat messages until keep alive channel closes
}
}()
return s, nil
}
Mutex
This module implements a distributed lock. It is in fact a client-side distributed lock built on Session (i.e. on Lease); the server does not need to provide anything beyond Lease support. The general idea is:
- Create a key under the lock's key prefix and keep renewing it through the Session
- All keys under the key prefix get different revisions in the order they were created; the key with the smallest revision holds the lock
- Holders of keys whose revision is not the smallest block until every key with a smaller revision has been deleted, at which point they acquire the lock
First look at the Mutex struct:
- s is the associated Session
- pfx is the lock's key prefix
- myKey is the key currently held
- myRev is the revision of the key currently held
- hdr is the Header of the most recent response
Now look at the Lock method:
- 1 Register myKey under pfx. A transaction is used, so the following happens within a single gRPC request:
- 1.1 Check whether myKey already exists
- 1.2 If it does not exist, put it; if it does, get it
- 1.3 Run getOwner to read the information under pfx
- 2 Take the CreateRevision from the put or get response as myRev
- 3 If there is no key under pfx, or the smallest CreateRevision equals myRev, the lock has been acquired
- 4 Otherwise, wait until every key under pfx with a revision smaller than myRev has been deleted before the lock is acquired
Note: implemented this way, the method has a couple of problems, which may be design flaws in Etcd client v3:
- If a network problem on the server side causes all the keys under pfx to expire, or those keys are deleted externally, every node competing for the lock will believe it holds the lock
- If a client holds the lock but its key expires, the client does not notice and still believes it holds the lock, while at the same time another client acquires it
To work around this, the caller has to do two things manually (a usage sketch follows the Mutex code below):
- 1 Watch whether the Session associated with the Mutex has expired; if it has, give up the lock manually
- 2 Watch whether the current myKey has been deleted; if it has, also give up the lock manually
Finally, Unlock is straightforward: it simply deletes myKey.
Code path: https://github.com/etcd-io/etcd/blob/v3.3.18/clientv3/concurrency/mutex.go
// Mutex implements the sync Locker interface with etcd
type Mutex struct {
s *Session
pfx string
myKey string
myRev int64
hdr *pb.ResponseHeader
}
func NewMutex(s *Session, pfx string) *Mutex {
return &Mutex{s, pfx + "/", "", -1, nil}
}
// Lock locks the mutex with a cancelable context. If the context is canceled
// while trying to acquire the lock, the mutex tries to clean its stale lock entry.
func (m *Mutex) Lock(ctx context.Context) error {
s := m.s
client := m.s.Client()
// 1 Register myKey under pfx
// A transaction is used, so the following happens within a single gRPC request:
// 1.1 Check whether myKey already exists
// 1.2 If it does not exist, put it; if it does, get it
// 1.3 Run getOwner to read the information under pfx
m.myKey = fmt.Sprintf("%s%x", m.pfx, s.Lease())
cmp := v3.Compare(v3.CreateRevision(m.myKey), "=", 0)
// put self in lock waiters via myKey; oldest waiter holds lock
put := v3.OpPut(m.myKey, "", v3.WithLease(s.Lease()))
// reuse key in case this session already holds the lock
get := v3.OpGet(m.myKey)
// fetch current holder to complete uncontended path with only one RPC
getOwner := v3.OpGet(m.pfx, v3.WithFirstCreate()...)
resp, err := client.Txn(ctx).If(cmp).Then(put, getOwner).Else(get, getOwner).Commit()
if err != nil {
return err
}
// 2 Take the CreateRevision from the put or get response as myRev
m.myRev = resp.Header.Revision
if !resp.Succeeded {
m.myRev = resp.Responses[0].GetResponseRange().Kvs[0].CreateRevision
}
// 3 If there is no key under pfx, or the smallest CreateRevision equals myRev, the lock has been acquired
// if no key on prefix / the minimum rev is key, already hold the lock
ownerKey := resp.Responses[1].GetResponseRange().Kvs
if len(ownerKey) == 0 || ownerKey[0].CreateRevision == m.myRev {
m.hdr = resp.Header
return nil
}
// 4 Otherwise, wait until every key under pfx with a revision smaller than myRev has been deleted
// wait for deletion revisions prior to myKey
hdr, werr := waitDeletes(ctx, client, m.pfx, m.myRev-1)
// release lock key if wait failed
if werr != nil {
m.Unlock(client.Ctx())
} else {
m.hdr = hdr
}
return werr
}
func (m *Mutex) Unlock(ctx context.Context) error {
client := m.s.Client()
if _, err := client.Delete(ctx, m.myKey); err != nil {
return err
}
m.myKey = "\x00"
m.myRev = -1
return nil
}
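Tying Session and Mutex together, the sketch below shows the two manual safeguards mentioned above: watching the Session's Done channel and watching myKey (via Mutex.Key()) for deletion. The key prefix, TTL, and error handling are placeholders rather than part of the library, and import paths may differ by etcd version.
package example

import (
	"context"
	"errors"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/concurrency"
	"go.etcd.io/etcd/mvcc/mvccpb"
)

func lockExample(ctx context.Context, cli *clientv3.Client) error {
	// Session wraps Grant + KeepAlive; Done() closes once the lease can no
	// longer be kept alive.
	sess, err := concurrency.NewSession(cli, concurrency.WithTTL(10))
	if err != nil {
		return err
	}
	defer sess.Close()

	mu := concurrency.NewMutex(sess, "/sample-lock")
	if err := mu.Lock(ctx); err != nil {
		return err
	}
	defer mu.Unlock(context.Background())

	// Watch our own lock key so we notice if it is deleted externally;
	// WithFilterPut leaves only DELETE events.
	keyCh := cli.Watch(ctx, mu.Key(), clientv3.WithFilterPut())

	// ... do the protected work here, while also monitoring the two signals:
	select {
	case <-sess.Done():
		// Safeguard 1: the session expired, so the lock key is gone on the server.
		return errors.New("session expired: stop treating the lock as held")
	case wresp := <-keyCh:
		for _, ev := range wresp.Events {
			if ev.Type == mvccpb.DELETE {
				// Safeguard 2: our key was deleted; another client may now hold the lock.
				return errors.New("lock key deleted: stop treating the lock as held")
			}
		}
	case <-ctx.Done():
		return ctx.Err()
	}
	return nil
}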