Reading the Etcd watch source code

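At work we use Consul for service discovery, and it has a watch mechanism. That mechanism made me curious. I happened to be reading the Etcd-raft code at the time, and Etcd has a similar watch mechanism, so I decided to strike while the iron was hot and spend the weekend studying how etcd implements watch.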

Before reading the source, let's use a simple example to see how Etcd's watch is used.

  1. First, write a KV pair into Etcd

curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value="神蛋使者"

  2. Watch this KV pair

curl 'http://127.0.0.1:2379/v2/keys/name?wait=true'

If all is well, the request now blocks.

  3. Open a new terminal and modify the stored KV pair

curl http://127.0.0.1:2379/v2/keys/name -XPUT -d value=神蛋使者1號

  4. The blocked request returns what the watch saw:
{
  "action": "set",
  "node": {
    "key": "/name",
    "value": "神蛋使者1號",
    "modifiedIndex": 25,
    "createdIndex": 25
  },
  "prevNode": {
    "key": "/name",
    "value": "神蛋使者",
    "modifiedIndex": 24,
    "createdIndex": 24
  }
}

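That is roughly what using it looks like. Note that the demo uses the v2 HTTP keys API, while the code below comes from the v3 Go client (clientv3), which talks to the server over gRPC. Now on to the source.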

Interface definition

type Watcher interface {
    // Watch watches on a key or prefix. The watched events will be returned
    // through the returned channel.
    // If the watch is slow or the required rev is compacted, the watch request
    // might be canceled from the server-side and the chan will be closed.
    // 'opts' can be: 'WithRev' and/or 'WithPrefix'.
    Watch(ctx context.Context, key string, opts ...OpOption) WatchChan

    // Close closes the watcher and cancels all watch requests.
    Close() error
}

The interface defines two methods, Watch and Close.

Watch returns a WatchChan, which is a receive-only channel defined as follows:

type WatchChan <-chan WatchResponse

The channel carries values of type WatchResponse:

type WatchResponse struct {
    Header pb.ResponseHeader
    Events []*Event

    // CompactRevision is the minimum revision the watcher may receive.
    CompactRevision int64

    // Canceled is used to indicate watch failure.
    // If the watch failed and the stream was about to close, before the channel is closed,
    // the channel sends a final response that has Canceled set to true with a non-nil Err().
    Canceled bool

    // Created is used to indicate the creation of the watcher.
    Created bool

    closeErr error
}
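To make the WatchChan contract concrete, here is a minimal sketch of how a caller might drain such a channel. It uses simplified local stand-ins for Event, WatchResponse and WatchChan (hypothetical, not the real clientv3 types) and assumes, as the comment on Canceled describes, that a failed watch sends one final response with Canceled set before the channel is closed.

```go
package main

import (
	"errors"
	"fmt"
)

// Simplified stand-ins for the clientv3 types (hypothetical, illustration only).
type Event struct {
	Type  string // "PUT" or "DELETE"
	Key   string
	Value string
}

type WatchResponse struct {
	Events   []*Event
	Canceled bool
	closeErr error
}

// Err returns the error carried by a canceled response (nil otherwise).
func (wr *WatchResponse) Err() error { return wr.closeErr }

// WatchChan is receive-only, as in the source.
type WatchChan <-chan WatchResponse

// errExample is a made-up error used only by this sketch.
var errExample = errors.New("watch canceled (example error)")

// consume drains a WatchChan until it is closed, collecting "TYPE key"
// strings and returning the error from a final Canceled response, if any.
func consume(wch WatchChan) (keys []string, err error) {
	for resp := range wch {
		if resp.Canceled {
			err = resp.Err()
			continue // the channel is closed right after the canceled response
		}
		for _, ev := range resp.Events {
			keys = append(keys, ev.Type+" "+ev.Key)
		}
	}
	return keys, err
}

func main() {
	ch := make(chan WatchResponse, 3)
	ch <- WatchResponse{Events: []*Event{{Type: "PUT", Key: "/name", Value: "v1"}}}
	ch <- WatchResponse{Events: []*Event{{Type: "DELETE", Key: "/name"}}}
	ch <- WatchResponse{Canceled: true, closeErr: errExample}
	close(ch)

	keys, err := consume(ch)
	fmt.Println(keys, err)
}
```

The caller never closes the channel itself; it only ranges until the producer side closes it, which is why WatchChan can be receive-only.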

Event is a message type generated by the gRPC/protobuf toolchain (in the mvccpb package, as its struct tags show):

type Event struct {
    // type is the kind of event. If type is a PUT, it indicates
    // new data has been stored to the key. If type is a DELETE,
    // it indicates the key was deleted.
    Type Event_EventType `protobuf:"varint,1,opt,name=type,proto3,enum=mvccpb.Event_EventType" json:"type,omitempty"`
    // kv holds the KeyValue for the event.
    // A PUT event contains current kv pair.
    // A PUT event with kv.Version=1 indicates the creation of a key.
    // A DELETE/EXPIRE event contains the deleted key with
    // its modification revision set to the revision of deletion.
    Kv *KeyValue `protobuf:"bytes,2,opt,name=kv" json:"kv,omitempty"`
    // prev_kv holds the key-value pair before the event happens.
    PrevKv *KeyValue `protobuf:"bytes,3,opt,name=prev_kv,json=prevKv" json:"prev_kv,omitempty"`
}

Next, the watcher type, which implements the Watcher interface:

// watcher implements the Watcher interface
type watcher struct {
    remote pb.WatchClient

    // mu protects the grpc streams map
    mu sync.RWMutex

    // streams holds all the active grpc streams keyed by ctx value.
    streams map[string]*watchGrpcStream
}

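The watcher struct is simple, with only three fields: remote abstracts the client that issues watch requests; streams is a map from a context key to the gRPC stream used for that context; and mu is a read-write lock that keeps access to streams safe under concurrency.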

The watchGrpcStream type, which is the value type of the streams map, wraps everything exchanged over one gRPC stream. It is defined as follows:

type watchGrpcStream struct {
    owner  *watcher
    remote pb.WatchClient

    // ctx controls internal remote.Watch requests
    ctx context.Context
    // ctxKey is the key used when looking up this stream's context
    ctxKey string
    cancel context.CancelFunc

    // substreams holds all active watchers on this grpc stream
    substreams map[int64]*watcherStream
    // resuming holds all resuming watchers on this grpc stream
    resuming []*watcherStream

    // reqc sends a watch request from Watch() to the main goroutine
    reqc chan *watchRequest
    // respc receives data from the watch client
    respc chan *pb.WatchResponse
    // donec closes to broadcast shutdown
    donec chan struct{}
    // errc transmits errors from grpc Recv to the watch stream reconn logic
    errc chan error
    // closingc gets the watcherStream of closing watchers
    closingc chan *watcherStream
    // wg is Done when all substream goroutines have exited
    wg sync.WaitGroup

    // resumec closes to signal that all substreams should begin resuming
    resumec chan struct{}
    // closeErr is the error that closed the watch stream
    closeErr error
}

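Interestingly, watchGrpcStream holds an owner field of type *watcher, so a watcher and each of its watchGrpcStreams can reach each other. It also repeats the remote field already defined on watcher, presumably so the stream can issue its own remote.Watch calls without going through owner. That field is not a pointer, but it does not need to be: pb.WatchClient is a gRPC-generated interface type, and an interface value already refers to the underlying client, so a copy is cheap and shares the same connection.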

A few fields deserve closer attention. The first is substreams; here are its definition and comment:

// substreams holds all active watchers on this grpc stream
substreams map[int64]*watcherStream
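The point of substreams is that many watchers share one gRPC stream and each response is routed by its watch id. The sketch below shows that demultiplexing idea in isolation; rawResp, substream and dispatch are hypothetical names, not etcd's, and dropping responses for unknown ids is a choice made for this sketch only.

```go
package main

import "fmt"

// rawResp is a hypothetical stand-in for a message on the shared stream:
// it carries the id of the watcher it belongs to.
type rawResp struct {
	WatchID int64
	Payload string
}

// substream is a hypothetical, much-reduced watcherStream.
type substream struct {
	id   int64
	recv chan string
}

// dispatch reads every message from the shared stream and forwards it to
// the substream registered under its watch id. Messages for unknown ids are
// dropped. When the stream ends, every substream's channel is closed.
func dispatch(stream <-chan rawResp, substreams map[int64]*substream) {
	for r := range stream {
		ws, ok := substreams[r.WatchID]
		if !ok {
			continue // no registered watcher for this id
		}
		ws.recv <- r.Payload
	}
	for _, ws := range substreams {
		close(ws.recv)
	}
}

func main() {
	subs := map[int64]*substream{
		1: {id: 1, recv: make(chan string, 4)},
		2: {id: 2, recv: make(chan string, 4)},
	}
	stream := make(chan rawResp, 4)
	stream <- rawResp{WatchID: 1, Payload: "PUT /name"}
	stream <- rawResp{WatchID: 2, Payload: "PUT /age"}
	stream <- rawResp{WatchID: 1, Payload: "DELETE /name"}
	stream <- rawResp{WatchID: 9, Payload: "dropped"}
	close(stream)

	dispatch(stream, subs)
	for id := int64(1); id <= 2; id++ {
		for p := range subs[id].recv {
			fmt.Println(id, p)
		}
	}
}
```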

And here is the watcherStream type:

// watcherStream represents a registered watcher
type watcherStream struct {
    // initReq is the request that initiated this request
    initReq watchRequest

    // outc publishes watch responses to subscriber
    outc chan WatchResponse
    // recvc buffers watch responses before publishing
    recvc chan *WatchResponse
    // donec closes when the watcherStream goroutine stops.
    donec chan struct{}
    // closing is set to true when stream should be scheduled to shutdown.
    closing bool
    // id is the registered watch id on the grpc stream
    id int64

    // buf holds all events received from etcd but not yet consumed by the client
    buf []*WatchResponse
}
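The recvc/buf/outc trio describes a common Go pattern: an unbounded in-memory buffer between two channels, so whoever feeds recvc is not held up by a slow reader of outc. Here is a minimal sketch of that pattern, assuming plain int values instead of *WatchResponse; relay is a hypothetical name and this is not etcd's actual goroutine code.

```go
package main

import "fmt"

// relay forwards values from in to out in order, holding any backlog in buf
// so the sender on in never waits for the reader of out.
func relay(in <-chan int, out chan<- int) {
	var buf []int
	for in != nil || len(buf) > 0 {
		// Enable the send case only when something is buffered;
		// a nil channel in a select case is never ready.
		var sendc chan<- int
		var next int
		if len(buf) > 0 {
			sendc = out
			next = buf[0]
		}
		select {
		case v, ok := <-in:
			if !ok {
				in = nil // input closed: stop receiving, drain what is left
				continue
			}
			buf = append(buf, v)
		case sendc <- next:
			buf = buf[1:]
		}
	}
	close(out)
}

func main() {
	in := make(chan int)
	out := make(chan int)
	go relay(in, out)

	// All values are pushed before anyone reads out; relay buffers them.
	for i := 1; i <= 5; i++ {
		in <- i
	}
	close(in)

	for v := range out {
		fmt.Println(v)
	}
}
```

The trade-off is memory: a consumer that never reads lets buf grow without bound, which is consistent with the comment on buf holding events "not yet consumed by the client".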

A diagram to sort out how they relate to each other:

[Figure: relationships among watcher, watchGrpcStream and watcherStream]

Next comes how watcher implements the Watch method:

// Watch posts a watch request to run() and waits for a new watcher channel
func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) WatchChan {
    // apply the options
    ow := opWatch(key, opts...)

    var filters []pb.WatchCreateRequest_FilterType
    if ow.filterPut {
        filters = append(filters, pb.WatchCreateRequest_NOPUT)
    }
    if ow.filterDelete {
        filters = append(filters, pb.WatchCreateRequest_NODELETE)
    }

    // build the watch request from the arguments
    wr := &watchRequest{
        ctx:            ctx,
        createdNotify:  ow.createdNotify,
        key:            string(ow.key),
        end:            string(ow.end),
        rev:            ow.rev,
        progressNotify: ow.progressNotify,
        filters:        filters,
        prevKV:         ow.prevKV,
        retc:           make(chan chan WatchResponse, 1),
    }

    ok := false
    // use the string form of ctx as the key into w.streams
    ctxKey := fmt.Sprintf("%v", ctx)

    // find or allocate appropriate grpc watch stream
    // this lookup must happen while holding the lock
    w.mu.Lock()

    // if streams is nil, the watcher has already been closed (Close sets it
    // to nil), so return a channel that is already closed
    if w.streams == nil {
        // closed
        w.mu.Unlock()
        ch := make(chan WatchResponse)
        close(ch)
        return ch
    }

    // as noted earlier, streams is a map keyed by the request context (as a string);
    // if there is no stream for this context yet, create one
    wgs := w.streams[ctxKey]
    if wgs == nil {
        wgs = w.newWatcherGrpcStream(ctx)
        w.streams[ctxKey] = wgs
    }
    donec := wgs.donec
    reqc := wgs.reqc
    w.mu.Unlock()

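    // couldn't create channel; return closed channel
    // closeCh is buffered because the closeCh <- WatchResponse{closeErr: wgs.closeErr}
    // sends below run in this goroutine before closeCh is returned, so no one is
    // receiving yet; an unbuffered send would block forever. With a buffer of 1
    // the error response is queued, the channel is closed, and the caller reads
    // the response and then sees the close.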
    closeCh := make(chan WatchResponse, 1)

    // submit request
    select {
    // send the watch request built above to the chosen stream
    case reqc <- wr:
        ok = true
    // the caller's context is done (canceled or timed out)
    case <-wr.ctx.Done():
    // the stream has shut down (donec is closed);
    // this handles abnormal termination
    // note the retry logic below
    case <-donec:
        if wgs.closeErr != nil {
            // the stream closed with an error (not simply dropped for having
            // no contexts left), so don't retry; report the error instead
            closeCh <- WatchResponse{closeErr: wgs.closeErr}
            break
        }
        // retry; may have dropped stream from no ctxs
        return w.Watch(ctx, key, opts...)
    }

    // receive channel
    // only runs if the initial request was sent successfully
    if ok {
        select {
        case ret := <-wr.retc:
            return ret
        case <-ctx.Done():
        case <-donec:
            if wgs.closeErr != nil {
                closeCh <- WatchResponse{closeErr: wgs.closeErr}
                break
            }
            // retry; may have dropped stream from no ctxs
            return w.Watch(ctx, key, opts...)
        }
    }

    close(closeCh)
    return closeCh
}

Then there is the other method of the Watcher interface, Close:

func (w *watcher) Close() (err error) {
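    // set streams to nil inside the lock, then close the streams one by one
    // outside it. This way the link between the watcher and its streams is cut
    // first, no matter which stream fails to close, and the possibly slow Close
    // calls do not run while holding mu. New Watch calls then see a closed watcher.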
    w.mu.Lock()
    streams := w.streams
    w.streams = nil
    w.mu.Unlock()
    for _, wgs := range streams {
        if werr := wgs.Close(); werr != nil {
            err = werr
        }
    }
    // etcd returns only a single error here, even though the loop above
    // may produce several (each failure overwrites err, so the last one wins)
    return err
}

With that, watcher implements the Watcher interface. This article covers only the general approach; the rest of the code is mostly logic wrapped around the other related data structures.

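Reading even this small part of Etcd's source, I noticed three things that count as best practices in Go, or in programming in general:

  1. Expose only a public interface outside the package and let a struct inside the package implement it, as with the Watcher interface and the watcher struct in this article. This has two benefits: the code is decoupled, and you are spared the pain of naming things (^_^)

  2. The commenting style: a large share of the comments in the etcd source sit directly above variable and field definitions, and the names themselves are very clear.

  3. Well-judged abstraction. This is not unique to Etcd; every excellent open-source project abstracts its code well. It reminds me of all the junk code I have written %>_<%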
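Point 1 can be shown in a few lines. In this hypothetical package, Watcher is the only exported type, the unexported watcher struct implements it, and a constructor (NewWatcher, an illustrative name here) returns the interface. The method bodies are placeholders rather than a real watch implementation, and unlike etcd's watcher this toy version has no mutex, so it is not safe for concurrent use.

```go
package main

import "fmt"

// Watcher is the only exported type in this hypothetical package.
type Watcher interface {
	Watch(key string) <-chan string
	Close() error
}

// watcher is unexported: callers can only use it through Watcher.
// Not safe for concurrent use (no mutex in this toy version).
type watcher struct {
	subs map[string][]chan string
}

// Compile-time check that *watcher satisfies Watcher.
var _ Watcher = (*watcher)(nil)

// NewWatcher returns the interface, not the concrete struct.
func NewWatcher() Watcher {
	return &watcher{subs: make(map[string][]chan string)}
}

// Watch registers a new channel for key; nothing ever sends on it here.
func (w *watcher) Watch(key string) <-chan string {
	ch := make(chan string, 1)
	w.subs[key] = append(w.subs[key], ch)
	return ch
}

// Close closes every registered channel and forgets them.
func (w *watcher) Close() error {
	for _, chans := range w.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
	w.subs = nil
	return nil
}

func main() {
	w := NewWatcher()
	ch := w.Watch("/name")
	_ = w.Close()
	_, ok := <-ch
	fmt.Println("closed:", !ok)
}
```

Because callers only ever hold a Watcher, the struct's fields and helpers can change freely without breaking code outside the package.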

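Finally, to sum up etcd's watch mechanism: at bottom it relies on gRPC stream multiplexing, which is an HTTP/2 feature. Within one gRPC stream, many watchers are told apart by their watch id (the substreams map), and the gRPC streams themselves are multiplexed over a shared HTTP/2 connection. So this article may have strayed a little from its real topic: to truly understand Etcd's watch mechanism, one should study HTTP/2.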

Consider this a hole I am digging for myself, to be filled in a future post.

© Copyright belongs to the author. For reprints or content collaboration, please contact the author.
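Platform statement: the content of this article (including any images or video) was uploaded and published by the author and represents only the author's own views. Jianshu is an information publishing platform and provides only information storage services.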
