前文講了informer在客戶端是如何工作的,但server端如何響應watch-list請求還沒有介紹,下面就來詳細介紹一下。
list-watch操作需要做這麼幾件事:
由組件向apiserver而不是etcd發起watch請求,在組件啟動時就進行訂閱,告訴apiserver需要知道什麼數據發生變化。Watch是一個典型的發布-訂閱模式。
組件向apiserver發起的watch請求是可以帶條件的,例如,scheduler想要watch的是所有未被調度的Pod,也就是滿足Pod.destNode=""的Pod,以便進行調度操作;而kubelet只關心自己節點上的Pod列表。apiserver向etcd發起的watch是沒有條件的,只能知道某個數據發生了變化或被創建、刪除,但不能過濾具體的值。也就是說,對象數據的條件過濾必須在apiserver端而不是etcd端完成。
list是watch失敗、數據太過陳舊後的彌補手段,這方面詳見《基於list-watch的Kubernetes異步事件處理框架》的客戶端部分。list本身是一個簡單的列表操作,和apiserver的其它增刪改操作一樣,這裡不再多描述細節。
watch的API處理
既然watch本身是一個apiserver提供的http restful API,那麼就按照API的方式去閱讀它的代碼。按照《apiserver的基礎功能實現》一文所描述的方式,我們來看它的代碼:
關鍵的API註冊處理代碼在 pkg/apiserver/api_installer.go
- 函數 func (a *APIInstaller) registerResourceHandlers()
路徑 staging/src/k8s.io/apiserver/pkg/endpoints/installer.go。一個rest.Storage對象會被轉換為watcher和lister對象:
// what verbs are supported by the storage, used to know what verbs we support per path
// Each assertion probes whether storage implements one optional REST
// interface; the boolean result records whether that capability is present.
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
// NOTE(review): the assertion that yields watcher/isWatcher, which the LIST
// route uses, is not shown in this excerpt.
一個rest.Storage對象會被轉換為watcher和lister對象。提供list和watch服務的入口是同一個,在API接口中通過 GET /pods?watch=true 這種方式來區分是list還是watch;API處理函數是由lister和watcher經過ListResource()合體後完成的。
// Excerpt of the LIST action: builds the route for GET on action.Path and
// appends it to routes.
case "LIST": // List all resources of a kind.
// The handler combines lister and watcher via restfulListResource
// (forceWatch=false) and is wrapped by metrics.InstrumentRouteFunc.
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
// Describe the route: docs, query parameter, operation id, media types and
// the versionedList response type.
route := ws.GET(action.Path).To(handler).
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("list"+namespaced+kind+strings.Title(subresource)+operationSuffix).
Produces(append(storageMeta.ProducesMIMETypes(action.Verb), allMediaTypes...)...).
Returns(http.StatusOK, "OK", versionedList).
Writes(versionedList)
// NOTE(review): the bodies of both cases and the closing brace of this
// switch are elided in the excerpt.
switch {
case isLister && isWatcher:
case isWatcher:
addParams(route, action.Params)
routes = append(routes, route)
restfulListResource函數最終去向為ListResource,路徑 vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go
// restfulListResource adapts handlers.ListResource to a go-restful
// RouteFunction. The returned function builds the list/watch handler for the
// given lister, watcher and scope and invokes it with the underlying
// http.ResponseWriter and *http.Request of the restful request/response pair.
func restfulListResource(r rest.Lister, rw rest.Watcher, scope handlers.RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
	return func(req *restful.Request, res *restful.Response) {
		// ListResource takes a *RequestScope, so hand it the address of the
		// scope value captured by this closure.
		handlers.ListResource(r, rw, &scope, forceWatch, minRequestTimeout)(res.ResponseWriter, req.Request)
	}
}
函數 func ListResource
每次有一個watch的url請求過來,都會調用rw.Watch()創建一個watcher
使用serveWatch()來處理這個請求
// ListResource returns an http.HandlerFunc that serves both list and watch
// requests for a resource. The handler resolves the namespace and optional
// name from the request, negotiates the output media type, decodes and
// validates ListOptions, and then either starts a watch through rw (when
// opts.Watch or forceWatch is set) or lists through r and writes the
// transformed result. Every failure is reported through scope.err.
func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, req *http.Request) {
// For performance tracking purposes.
trace := utiltrace.New("List", traceFields(req)...)
namespace, err := scope.Namer.Namespace(req)
if err != nil {
scope.err(err, w, req)
return
}
// Watches for single objects are routed to this function.
// Treat a name parameter the same as a field selector entry.
hasName := true
_, name, err := scope.Namer.Name(req)
// A failure to extract a name is not reported; it only disables the name
// selector handling further down.
if err != nil {
hasName = false
}
ctx := req.Context()
ctx = request.WithNamespace(ctx, namespace)
outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
if err != nil {
scope.err(err, w, req)
return
}
// Decode the query string into ListOptions; decode errors become 400s.
opts := metainternalversion.ListOptions{}
if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
scope.err(err, w, req)
return
}
// transform fields
// TODO: DecodeParametersInto should do this.
if opts.FieldSelector != nil {
fn := func(label, value string) (newLabel, newValue string, err error) {
return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value)
}
if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
// TODO: allow bad request to set field causes based on query parameters
err = errors.NewBadRequest(err.Error())
scope.err(err, w, req)
return
}
}
if hasName {
// metadata.name is the canonical internal name.
// SelectionPredicate will notice that this is a request for
// a single object and optimize the storage query accordingly.
nameSelector := fields.OneTermEqualSelector("metadata.name", name)
// Note that fieldSelector setting explicitly the "metadata.name"
// will result in reaching this branch (as the value of that field
// is propagated to requestInfo as the name parameter.
// That said, the allowed field selectors in this branch are:
// nil, fields.Everything and field selector matching metadata.name
// for our name.
if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name")
if !ok || name != selectedName {
scope.err(errors.NewBadRequest("fieldSelector metadata.name doesn't match requested name"), w, req)
return
}
} else {
opts.FieldSelector = nameSelector
}
}
// Watch path: every exit from this branch returns, so the list code below
// only runs for plain list requests.
if opts.Watch || forceWatch {
if rw == nil {
scope.err(errors.NewMethodNotSupported(scope.Resource.GroupResource(), "watch"), w, req)
return
}
// TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
timeout := time.Duration(0)
if opts.TimeoutSeconds != nil {
timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
}
// No explicit timeout: scale minRequestTimeout by a random factor of
// (rand.Float64() + 1.0), so watches do not all use the same timeout.
if timeout == 0 && minRequestTimeout > 0 {
timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
}
klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
// NOTE(review): if timeout is still 0 here (no timeoutSeconds and
// minRequestTimeout <= 0) the context is created already expired —
// confirm callers always supply a positive minRequestTimeout.
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
watcher, err := rw.Watch(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
requestInfo, _ := request.RequestInfoFrom(ctx)
// Serve the watch inside the long-running request metrics wrapper.
metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
serveWatch(watcher, scope, outputMediaType, req, w, timeout)
})
return
}
// Log only long List requests (ignore Watch).
defer trace.LogIfLong(500 * time.Millisecond)
trace.Step("About to List from storage")
result, err := r.List(ctx, &opts)
if err != nil {
scope.err(err, w, req)
return
}
trace.Step("Listing from storage done")
transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
}
}
func (s *WatchServer) ServeHTTP() 函數
// Excerpt of the event loop: each event received from ch is encoded and
// written out. NOTE(review): error handling, the bodies of the two
// `if err := ...;` statements, any other select cases and the closing braces
// are elided here.
for {
select {
case event, ok := <-ch:
obj := event.Object
s.Fixup(obj)
// Serialize the object into buf with the embedded encoder.
if err := s.EmbeddedEncoder.Encode(obj, buf);
// Replace the event's object with the raw encoded bytes.
unknown.Raw = buf.Bytes()
event.Object = &unknown
// Convert the event to its versioned wire form and encode it with e.
outEvent := &metav1.WatchEvent{}
*internalEvent = metav1.InternalEvent(event)
err := metav1.Convert_versioned_InternalEvent_to_versioned_Event(internalEvent, outEvent, nil)
if err := e.Encode(outEvent);
// Flush only when no further events are already queued on ch.
if len(ch) == 0 {
flusher.Flush()
}
// Clear buf so it can be reused for the next event.
buf.Reset()
}
watcher(watch.Interface)對象是由 staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go 中的Store對象創建出來的。Watch是所有資源通用的函數,根據label和field進行過濾。
// Watch builds a selection predicate from the label and field selectors in
// options and starts a watch through WatchPredicate. A nil options, or a nil
// selector inside it, selects everything; with nil options the resource
// version is the empty string and the predicate's IncludeUninitialized is
// left as PredicateFunc returned it.
func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
	labelSel := labels.Everything()
	fieldSel := fields.Everything()
	if options != nil {
		if options.LabelSelector != nil {
			labelSel = options.LabelSelector
		}
		if options.FieldSelector != nil {
			fieldSel = options.FieldSelector
		}
	}

	pred := e.PredicateFunc(labelSel, fieldSel)
	if options == nil {
		return e.WatchPredicate(ctx, pred, "")
	}

	pred.IncludeUninitialized = options.IncludeUninitialized
	return e.WatchPredicate(ctx, pred, options.ResourceVersion)
}
Cacher
結構體 Cacher
- storage提供了增刪改查、watch、list接口,主要面向etcd
- watchCache是有限容量的滑動窗口
- watchers結構體包含所有請求watch的watcher的map
// Cacher (field excerpt) sits in front of an underlying storage.Interface,
// keeps a window of recent changes in watchCache and tracks the watchers that
// events are dispatched to.
// NOTE(review): methods elsewhere in this file also use fields such as ready,
// incoming, stopCh, stopWg and dispatchTimeoutBudget that this excerpt omits.
type Cacher struct {
// Underlying storage.Interface.
storage Interface
// Expected type of objects in the underlying cache.
objectType reflect.Type
// "sliding window" of recent changes of objects and the current state.
watchCache *watchCache
// Reflector whose ListAndWatch is run by startCaching.
reflector *cache.Reflector
// Index passed along when registering a watcher; Watch increments it after
// each registration.
watcherIdx int
// Registered watchers; dispatchEvent iterates its allWatchers and
// valueWatchers collections.
watchers indexedWatchers
}
2.2 函數 func (c *Cacher) Watch
newCacheWatcher生成一個watcher
addWatcher將watcher插入到cacher.watchers中
// Implements storage.Interface.
// Watch parses resourceVersion, collects the cached events since that
// version, creates a cacheWatcher seeded with them and registers it in
// c.watchers under the current watcherIdx.
// NOTE(review): this is an excerpt — the error check for initEvents, the
// definitions of triggerValue/triggerSupported, any locking and the closing
// brace are not shown.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
watchRV, err := ParseWatchResourceVersion(resourceVersion)
if err != nil {
return nil, err
}
// NOTE(review): presumably blocks until the cache has been initialized
// (startCaching sets ready) — confirm.
c.ready.wait()
// Events the new watcher must replay before it sees live ones.
initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
// Buffer size used for both of the watcher's channels.
chanSize := 10
// Callback the watcher uses to have itself removed from c.watchers.
forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)
c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
c.watcherIdx++
return watcher, nil
2.3 函數 func newCacheWatcher
newCacheWatcher生成一個watcher
process在單獨的goroutine中異步處理事件
// newCacheWatcher allocates a cacheWatcher whose input and result channels
// are both buffered to chanSize, wires in the filter, forget callback and
// versioner, and starts its process loop on a new goroutine with initEvents
// and resourceVersion.
func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
	cw := new(cacheWatcher)
	cw.input = make(chan *watchCacheEvent, chanSize)
	cw.result = make(chan watch.Event, chanSize)
	cw.done = make(chan struct{})
	cw.filter = filter
	cw.stopped = false
	cw.forget = forget
	cw.versioner = versioner

	go cw.process(initEvents, resourceVersion)
	return cw
}
2.3 函數 func (c *cacheWatcher) process
process函數主要從input讀取事件並調用sendWatchCacheEvent,下面繼續看sendWatchCacheEvent函數
// process is the watcher's delivery loop. It first replays initEvents through
// sendWatchCacheEvent, logging when that replay takes longer than
// initProcessThreshold, and then forwards every event read from c.input whose
// ResourceVersion is newer than resourceVersion. When c.input is closed it
// stops the watcher and closes c.result.
func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
	// Taken before the replay so processingTime below measures that loop.
	startTime := time.Now()
	for _, event := range initEvents {
		c.sendWatchCacheEvent(event)
	}
	processingTime := time.Since(startTime)
	if processingTime > initProcessThreshold {
		objType := "<null>"
		if len(initEvents) > 0 {
			objType = reflect.TypeOf(initEvents[0].Object).String()
		}
		glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
	}

	// Deferred calls run in reverse order: Stop first, then close(c.result).
	defer close(c.result)
	defer c.Stop()
	for event := range c.input {
		// only send events newer than resourceVersion
		if event.ResourceVersion > resourceVersion {
			c.sendWatchCacheEvent(event)
		}
	}
}
2.4 函數 func (c *cacheWatcher) sendWatchCacheEvent
對watchCacheEvent進行filter過濾,然後發送到cacheWatcher的result channel中
// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
//
// sendWatchCacheEvent turns a cache event into the watch.Event this watcher
// should observe and delivers it on c.result. The event type depends on
// whether the current and the previous object state pass c.filter:
// only current -> Added, both -> Modified, only previous -> Deleted (carrying
// a copy of the previous object stamped with the event's resource version).
// Events that pass neither way are dropped, as is any event once c.done is
// closed.
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
	passesNow := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
	passedBefore := event.PrevObject != nil && c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)

	var out watch.Event
	if passesNow {
		eventType := watch.Added
		if passedBefore {
			eventType = watch.Modified
		}
		out = watch.Event{Type: eventType, Object: event.Object.DeepCopyObject()}
	} else if passedBefore {
		// return a delete event with the previous object content, but with the event's resource version
		prev := event.PrevObject.DeepCopyObject()
		if err := c.versioner.UpdateObject(prev, event.ResourceVersion); err != nil {
			utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, prev, err))
		}
		out = watch.Event{Type: watch.Deleted, Object: prev}
	} else {
		// Watcher is not interested in that object.
		return
	}

	// Non-blocking check first: never send once the watcher is done.
	select {
	case <-c.done:
		return
	default:
	}

	// Blocking send that is abandoned if the watcher finishes meanwhile.
	select {
	case c.result <- out:
	case <-c.done:
	}
}
```
三. 實現接口部分
pkg/registry/core/pod/storage/storage.go
3.1 NewStorage函數
PodStorage.Pod.Store封裝了對etcd的操作;
```
// NewStorage builds the generic registry store for pods, completes it with
// the supplied REST options and returns a PodStorage whose REST endpoints all
// share that store. The status endpoint uses a copy of the completed store
// with pod.StatusStrategy as its update strategy. It panics if the store
// cannot be completed.
func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
	podStore := &genericregistry.Store{
		NewFunc:                  func() runtime.Object { return &api.Pod{} },
		NewListFunc:              func() runtime.Object { return &api.PodList{} },
		PredicateFunc:            pod.MatchPod,
		DefaultQualifiedResource: api.Resource("pods"),
		CreateStrategy:           pod.Strategy,
		UpdateStrategy:           pod.Strategy,
		DeleteStrategy:           pod.Strategy,
		ReturnDeletedObject:      true,
		TableConvertor:           printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
	}

	storeOpts := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc}
	if err := podStore.CompleteWithOptions(storeOpts); err != nil {
		panic(err) // TODO: Propagate error up
	}

	// Copy taken after completion so the status store inherits the
	// configured storage; only the update strategy differs.
	statusStore := *podStore
	statusStore.UpdateStrategy = pod.StatusStrategy

	var ps PodStorage
	ps.Pod = &REST{podStore, proxyTransport}
	ps.Binding = &BindingREST{store: podStore}
	ps.Eviction = newEvictionStorage(podStore, podDisruptionBudgetClient)
	ps.Status = &StatusREST{store: &statusStore}
	ps.Log = &podrest.LogREST{Store: podStore, KubeletConn: k}
	ps.Proxy = &podrest.ProxyREST{Store: podStore, ProxyTransport: proxyTransport}
	ps.Exec = &podrest.ExecREST{Store: podStore, KubeletConn: k}
	ps.Attach = &podrest.AttachREST{Store: podStore, KubeletConn: k}
	ps.PortForward = &podrest.PortForwardREST{Store: podStore, KubeletConn: k}
	return ps
}
```
3.2 CompleteWithOptions函數
路徑:vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
主要函數是GetRESTOptions
```
// CompleteWithOptions fetches the REST options for the store's default
// qualified resource and, when e.Storage is not already set, builds the
// storage (and its destroy function) through opts.Decorator.
// NOTE(review): this is an excerpt — the code that derives prefix, keyFunc,
// attrFunc and triggerFunc is not shown.
func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
if err != nil {
return err
}
// Only create storage if the caller did not inject one.
if e.Storage == nil {
e.Storage, e.DestroyFunc = opts.Decorator(
opts.StorageConfig,
e.NewFunc(),
prefix,
keyFunc,
e.NewListFunc,
attrFunc,
triggerFunc,
)
}
return nil
}
3.3 GetRESTOptions函數
路徑:/vendor/k8s.io/apiserver/pkg/server/options/etcd.go
調用genericregistry.StorageWithCacher創建cache store
// GetRESTOptions assembles the generic.RESTOptions for resource from the
// storage factory and the configured options. The decorator is
// generic.UndecoratedStorage unless the watch cache is enabled, in which case
// it is genericregistry.StorageWithCacher sized by the per-resource entry in
// WatchCacheSizes, falling back to DefaultWatchCacheSize.
func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
	storageConfig, err := f.StorageFactory.NewConfig(resource)
	if err != nil {
		return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
	}

	restOpts := generic.RESTOptions{
		StorageConfig:           storageConfig,
		Decorator:               generic.UndecoratedStorage,
		DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
		EnableGarbageCollection: f.Options.EnableGarbageCollection,
		ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
	}
	if !f.Options.EnableWatchCache {
		return restOpts, nil
	}

	sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
	if err != nil {
		return generic.RESTOptions{}, err
	}
	capacity, found := sizes[resource]
	if !found {
		capacity = f.Options.DefaultWatchCacheSize
	}
	restOpts.Decorator = genericregistry.StorageWithCacher(capacity)
	return restOpts, nil
}
3.4 StorageWithCacher函數
路徑: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
根據配置創建cacher
NewRawStorage根據配置的類型,具體創建etcd v2或者v3 client
// StorageWithCacher returns a storage decorator that creates the raw storage
// for storageConfig and wraps it in a cacher with the given capacity. With a
// capacity of 0 the raw storage is returned undecorated. The destroy function
// returned alongside a cacher stops the cacher and then destroys the raw
// storage; it is also registered through RegisterStorageCleanup.
func StorageWithCacher(capacity int) generic.StorageDecorator {
	return func(
		storageConfig *storagebackend.Config,
		objectType runtime.Object,
		resourcePrefix string,
		keyFunc func(obj runtime.Object) (string, error),
		newListFunc func() runtime.Object,
		getAttrsFunc storage.AttrFunc,
		triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {

		rawStorage, destroyRaw := generic.NewRawStorage(storageConfig)
		if capacity == 0 {
			glog.V(5).Infof("Storage caching is disabled for %T", objectType)
			return rawStorage, destroyRaw
		}

		cacher := storage.NewCacherFromConfig(storage.CacherConfig{
			CacheCapacity:        capacity,
			Storage:              rawStorage,
			Versioner:            etcdstorage.APIObjectVersioner{},
			Type:                 objectType,
			ResourcePrefix:       resourcePrefix,
			KeyFunc:              keyFunc,
			NewListFunc:          newListFunc,
			GetAttrsFunc:         getAttrsFunc,
			TriggerPublisherFunc: triggerFunc,
			Codec:                storageConfig.Codec,
		})

		// Tear down in reverse order of construction.
		destroyAll := func() {
			cacher.Stop()
			destroyRaw()
		}
		RegisterStorageCleanup(destroyAll)
		return cacher, destroyAll
	}
}
3.5 NewCacherFromConfig函數
路徑: vendor/k8s.io/apiserver/pkg/storage/cacher.go
創建服務於list-watch的cacher:創建watchCache對象和cacherListerWatcher對象,cacherListerWatcher對象是ListerWatcher接口的實現,實現了List()和Watch()方法
構建Cacher對象
(1) watchCache是一個結構體,用來存儲apiserver從etcd watch到的對象
(2) watchers是一個indexedWatchers結構體,當kubelet、scheduler需要watch某類資源時,它們會向kube-apiserver發起watch請求,apiserver就會生成一個cacheWatcher,cacheWatcher負責將watch到的資源從apiserver發送到kubelet、scheduler
(3) Reflector結構體的數據成員:ListerWatcher。ListerWatcher是接口對象,包括方法List()和Watch();listerWatcher包裝了Storage,主要是將watch到的對象存到watchCache中;
// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
// Both methods receive their parameters as a metav1.ListOptions value.
type ListerWatcher interface {
// List should return a list type object; the Items field will be extracted, and the
// ResourceVersion field will be used to start the watch in the right place.
List(options metav1.ListOptions) (runtime.Object, error)
// Watch should begin a watch at the specified version.
Watch(options metav1.ListOptions) (watch.Interface, error)
}
(4) incoming channel:接收watchCacheEvent、緩衝容量為100的channel
協程cacher.dispatchEvents:從incoming channel接收watchCacheEvent,並添加到watchers中各watcher的input channel中
// dispatchEvents forwards every event received on c.incoming to
// dispatchEvent. It returns when c.incoming is closed or c.stopCh fires.
func (c *Cacher) dispatchEvents() {
	for {
		select {
		case <-c.stopCh:
			return
		case ev, open := <-c.incoming:
			if open {
				c.dispatchEvent(&ev)
				continue
			}
			return
		}
	}
}
// dispatchEvent hands event to the interested watchers. Watchers in
// allWatchers always receive it. When trigger values are supported for the
// event, only the value watchers registered under one of those values
// receive it; otherwise every value watcher does.
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
	triggerValues, supported := c.triggerValues(event)

	for _, w := range c.watchers.allWatchers {
		w.add(event, c.dispatchTimeoutBudget)
	}

	if !supported {
		// No trigger information: fall back to notifying every value watcher.
		for _, group := range c.watchers.valueWatchers {
			for _, w := range group {
				w.add(event, c.dispatchTimeoutBudget)
			}
		}
		return
	}

	for _, value := range triggerValues {
		for _, w := range c.watchers.valueWatchers[value] {
			w.add(event, c.dispatchTimeoutBudget)
		}
	}
}
協程cacher.startCaching
// NewCacherFromConfig creates a Cacher from config, hooks it up to a new
// watchCache and starts its two background goroutines: one dispatching
// events, one repeatedly running startCaching.
// NOTE(review): this is an excerpt — the fields of the Cacher literal are
// elided, so the uses of listerWatcher and reflectorName are not visible.
func NewCacherFromConfig(config CacherConfig) *Cacher {
watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
reflectorName := "storage/cacher.go:" + config.ResourcePrefix
stopCh := make(chan struct{})
cacher := &Cacher{
}
// Register cacher.processEvent as the watchCache's on-event callback.
watchCache.SetOnEvent(cacher.processEvent)
go cacher.dispatchEvents()
// stopWg tracks the caching goroutine below.
cacher.stopWg.Add(1)
go func() {
defer cacher.stopWg.Done()
// Invoke the function below via wait.Until with a one-second period and
// stopCh; each run is skipped once the cacher reports stopped.
wait.Until(
func() {
if !cacher.isStopped() {
cacher.startCaching(stopCh)
}
}, time.Second, stopCh,
)
}()
return cacher
}
3.6 startCaching函數
路徑: vendor/k8s.io/apiserver/pkg/storage/cacher.go
主要調用reflector.ListAndWatch函數,它把遠端etcd的數據同步到本地,存放在watchCache中
// startCaching terminates all existing watchers and then runs the
// reflector's ListAndWatch until it returns, toggling c.ready around it.
// An error from ListAndWatch is logged, not returned.
func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
// Set by the on-replace callback below; records that readiness was turned on
// during this run.
successfulList := false
// NOTE(review): presumably invoked when the reflector replaces the cache
// contents after a successful list — confirm against watchCache.
c.watchCache.SetOnReplace(func() {
successfulList = true
c.ready.set(true)
})
// On exit, clear readiness only if this run had set it.
defer func() {
if successfulList {
c.ready.set(false)
}
}()
// Existing watchers are dropped before a fresh ListAndWatch starts.
c.terminateAllWatchers()
if err := c.reflector.ListAndWatch(stopChannel); err != nil {
glog.Errorf("unexpected ListAndWatch error: %v", err)
}
}
3.7 ListAndWatch函數
路徑: vendor/k8s.io/client-go/tools/cache/reflector.go
首先list所有條目,然後根據版本進行watch
調用listerWatcher.List方法獲得列表及其resourceVersion
調用listerWatcher.Watch進行watch操作
// ListAndWatch lists through r.listerWatcher, records the list's resource
// version and then loops, opening a watch and handing it to r.watchHandler.
// NOTE(review): this is a heavily elided excerpt — options, start,
// resourceVersion and resyncerrc are defined in code not shown, and the
// errors from List and Watch are not checked here.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
list, err := r.listerWatcher.List(options)
r.metrics.listDuration.Observe(time.Since(start).Seconds())
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
// The list's resource version is handed to watchHandler below.
resourceVersion = listMetaInterface.GetResourceVersion()
for {
w, err := r.listerWatcher.Watch(options)
// As written, an error from watchHandler ends ListAndWatch with a nil
// error.
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
return nil
}
}
}
3.8 newCacherListerWatcher函數
路徑: vendor/k8s.io/apiserver/pkg/storage/cacher.go
storage接口包含list、watch接口
// newCacherListerWatcher wraps storage in a cache.ListerWatcher that lists
// and watches under resourcePrefix, using newListFunc to allocate the list
// object.
func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
	lw := new(cacherListerWatcher)
	lw.storage = storage
	lw.resourcePrefix = resourcePrefix
	lw.newListFunc = newListFunc
	return lw
}
// List implements cache.ListerWatcher. It allocates a fresh list object and
// fills it from the underlying storage under lw.resourcePrefix, passing an
// empty resource version and the Everything predicate. The options argument
// is not consulted.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
	result := lw.newListFunc()
	err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, result)
	if err != nil {
		return nil, err
	}
	return result, nil
}
// Watch implements cache.ListerWatcher. It starts a WatchList on the
// underlying storage under lw.resourcePrefix from options.ResourceVersion,
// using the Everything predicate.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
	ctx := context.TODO()
	fromVersion := options.ResourceVersion
	return lw.storage.WatchList(ctx, lw.resourcePrefix, fromVersion, Everything)
}
3.9 NewRawStorage函數
路徑: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
往回追溯,最終去向是etcd;應該是從k8s 1.6版本起由etcd v2更新至v3
// NewRawStorage creates the low-level storage backend described by config
// and returns it together with its destroy function. The signature has no
// error result, so a failure to create the backend is fatal: it is logged
// with glog.Fatalf instead of handing callers a nil storage.
func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
	s, d, err := factory.Create(*config)
	if err != nil {
		glog.Fatalf("Unable to create storage backend: config (%v), err (%v)", config, err)
	}
	return s, d
}
// Create builds the storage backend selected by c.Type: etcd2 for
// StorageTypeETCD2, etcd3 for StorageTypeETCD3 or an unset type. Any other
// type yields an error.
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
	if c.Type == storagebackend.StorageTypeETCD2 {
		return newETCD2Storage(c)
	}
	// An unset type defaults to etcd3.
	if c.Type == storagebackend.StorageTypeUnset || c.Type == storagebackend.StorageTypeETCD3 {
		return newETCD3Storage(c)
	}
	return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
}
3.10 newETCD3Storage函數
路徑: vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
對應的就是etcd v3 client,實現了list、watch接口
// newETCD3Storage creates an etcd v3 client and wraps it in an etcd3 store.
// NOTE(review): this is an excerpt — the construction of cfg, transformer and
// destroyFunc, and the check of err from clientv3.New, are not shown.
func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
client, err := clientv3.New(cfg)
// c.Quorum selects between the regular store and the no-quorum-read variant.
if c.Quorum {
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursi