How the APIServer implements list-watch

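The previous article covered how the informer works on the client side, but not how the server side responds to list-watch requests. This article walks through that in detail.

A list-watch operation involves the following:

Components send their watch requests to the apiserver, not to etcd. Each component subscribes at startup, telling the apiserver which data changes it needs to hear about. Watch is a typical publish-subscribe pattern.

A watch request from a component to the apiserver can carry conditions. For example, the scheduler wants to watch all Pods that have not been scheduled yet, that is, Pods matching Pod.destNode="" (an empty spec.nodeName in the actual API), so that it can schedule them; a kubelet only cares about the Pods on its own node. The watch that the apiserver opens against etcd carries no conditions: it learns that some piece of data was changed, created, or deleted, but it cannot filter on specific values. In other words, conditional filtering on object data has to happen in the apiserver, not in etcd.

List is the fallback for when a watch fails or the data has become too stale; see the client-side part of "A Kubernetes asynchronous event-handling framework based on list-watch" for details. List itself is a plain listing operation, handled like the apiserver's other create, update, and delete operations, so it is not described further here.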
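
To make that division of labor concrete, here is a minimal sketch, not the apiserver's code: an unconditional change feed (standing in for the etcd watch) is filtered per subscriber on the server side. The names Pod, Event, filterEvents, unscheduled, and onNode are hypothetical and exist only for this illustration.

```go
package main

import "fmt"

// Pod is a hypothetical, simplified stand-in for the real Pod object.
type Pod struct {
	Name     string
	NodeName string // empty means "not scheduled yet"
}

// Event is a hypothetical change notification from the backing store.
type Event struct {
	Type string // "ADDED", "MODIFIED" or "DELETED"
	Pod  Pod
}

// filterEvents applies a subscriber's condition on the server side.
// The upstream feed is unconditional, so the predicate has to run here.
func filterEvents(in []Event, match func(Pod) bool) []Event {
	var out []Event
	for _, ev := range in {
		if match(ev.Pod) {
			out = append(out, ev)
		}
	}
	return out
}

// unscheduled is the kind of condition a scheduler would subscribe with.
func unscheduled(p Pod) bool { return p.NodeName == "" }

// onNode builds the kind of condition a kubelet would subscribe with.
func onNode(node string) func(Pod) bool {
	return func(p Pod) bool { return p.NodeName == node }
}

func main() {
	feed := []Event{
		{Type: "ADDED", Pod: Pod{Name: "a"}},
		{Type: "ADDED", Pod: Pod{Name: "b", NodeName: "node-1"}},
		{Type: "MODIFIED", Pod: Pod{Name: "c", NodeName: "node-2"}},
	}
	fmt.Println(len(filterEvents(feed, unscheduled)))      // scheduler's view
	fmt.Println(len(filterEvents(feed, onNode("node-1")))) // kubelet's view
}
```

Both subscribers read the same feed; only the predicate differs, which is why the filtering cost lands on the apiserver rather than on etcd.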

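API handling for watch

Since watch is itself an HTTP RESTful API served by the apiserver, we can read its code the same way we read any other API, following the approach described in the article on the apiserver's basic functionality.

The key API registration code is in pkg/apiserver/api_installer.go (its location in older releases; the current path is given below).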

  1. The function func (a *APIInstaller) registerResourceHandlers()
    Path: staging/src/k8s.io/apiserver/pkg/endpoints/installer.go. A rest.Storage object is converted into watcher and lister objects:
// what verbs are supported by the storage, used to know what verbs we support per path
creater, isCreater := storage.(rest.Creater)
namedCreater, isNamedCreater := storage.(rest.NamedCreater)
lister, isLister := storage.(rest.Lister)
getter, isGetter := storage.(rest.Getter)
// (other type assertions elided in this excerpt)
watcher, isWatcher := storage.(rest.Watcher)

A rest.Storage object is converted into watcher and lister objects. List and watch are served through the same entry point: in the API, a request such as GET /pods?watch=true is what distinguishes a watch from a list. The API handler is built by combining the lister and the watcher in ListResource().

case "LIST": // List all resources of a kind.
    handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, requestScope, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))

    route := ws.GET(action.Path).To(handler).
        Doc(doc).
        Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
        Operation("list"+namespaced+kind+strings.Title(subresource)+operationSuffix).
        Produces(append(storageMeta.ProducesMIMETypes(action.Verb), allMediaTypes...)...).
        Returns(http.StatusOK, "OK", versionedList).
        Writes(versionedList)

    switch {
    case isLister && isWatcher:
        // (body elided in this excerpt)
    case isWatcher:
        // (body elided in this excerpt)
    }
    addParams(route, action.Params)
    routes = append(routes, route)

restfulListResource ultimately delegates to ListResource, in vendor/k8s.io/apiserver/pkg/endpoints/handlers/get.go

func restfulListResource(r rest.Lister, rw rest.Watcher, scope handlers.RequestScope, forceWatch bool, minRequestTimeout time.Duration) restful.RouteFunction {
   return func(req *restful.Request, res *restful.Response) {
      handlers.ListResource(r, rw, scope, forceWatch, minRequestTimeout)(res.ResponseWriter, req.Request)
   }
}

The ListResource function

Every time a watch URL request comes in, rw.Watch() is called to create a watcher.
serveWatch() is then used to handle the request.

func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatch bool, minRequestTimeout time.Duration) http.HandlerFunc {
    return func(w http.ResponseWriter, req *http.Request) {
        // For performance tracking purposes.
        trace := utiltrace.New("List", traceFields(req)...)

        namespace, err := scope.Namer.Namespace(req)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        // Watches for single objects are routed to this function.
        // Treat a name parameter the same as a field selector entry.
        hasName := true
        _, name, err := scope.Namer.Name(req)
        if err != nil {
            hasName = false
        }

        ctx := req.Context()
        ctx = request.WithNamespace(ctx, namespace)

        outputMediaType, _, err := negotiation.NegotiateOutputMediaType(req, scope.Serializer, scope)
        if err != nil {
            scope.err(err, w, req)
            return
        }

        opts := metainternalversion.ListOptions{}
        if err := metainternalversionscheme.ParameterCodec.DecodeParameters(req.URL.Query(), scope.MetaGroupVersion, &opts); err != nil {
            err = errors.NewBadRequest(err.Error())
            scope.err(err, w, req)
            return
        }

        if errs := metainternalversionvalidation.ValidateListOptions(&opts); len(errs) > 0 {
            err := errors.NewInvalid(schema.GroupKind{Group: metav1.GroupName, Kind: "ListOptions"}, "", errs)
            scope.err(err, w, req)
            return
        }

        // transform fields
        // TODO: DecodeParametersInto should do this.
        if opts.FieldSelector != nil {
            fn := func(label, value string) (newLabel, newValue string, err error) {
                return scope.Convertor.ConvertFieldLabel(scope.Kind, label, value)
            }
            if opts.FieldSelector, err = opts.FieldSelector.Transform(fn); err != nil {
                // TODO: allow bad request to set field causes based on query parameters
                err = errors.NewBadRequest(err.Error())
                scope.err(err, w, req)
                return
            }
        }

        if hasName {
            // metadata.name is the canonical internal name.
            // SelectionPredicate will notice that this is a request for
            // a single object and optimize the storage query accordingly.
            nameSelector := fields.OneTermEqualSelector("metadata.name", name)

            // Note that fieldSelector setting explicitly the "metadata.name"
            // will result in reaching this branch (as the value of that field
            // is propagated to requestInfo as the name parameter.
            // That said, the allowed field selectors in this branch are:
            // nil, fields.Everything and field selector matching metadata.name
            // for our name.
            if opts.FieldSelector != nil && !opts.FieldSelector.Empty() {
                selectedName, ok := opts.FieldSelector.RequiresExactMatch("metadata.name")
                if !ok || name != selectedName {
                    scope.err(errors.NewBadRequest("fieldSelector metadata.name doesn't match requested name"), w, req)
                    return
                }
            } else {
                opts.FieldSelector = nameSelector
            }
        }

        if opts.Watch || forceWatch {
            if rw == nil {
                scope.err(errors.NewMethodNotSupported(scope.Resource.GroupResource(), "watch"), w, req)
                return
            }
            // TODO: Currently we explicitly ignore ?timeout= and use only ?timeoutSeconds=.
            timeout := time.Duration(0)
            if opts.TimeoutSeconds != nil {
                timeout = time.Duration(*opts.TimeoutSeconds) * time.Second
            }
            if timeout == 0 && minRequestTimeout > 0 {
                timeout = time.Duration(float64(minRequestTimeout) * (rand.Float64() + 1.0))
            }
            klog.V(3).InfoS("Starting watch", "path", req.URL.Path, "resourceVersion", opts.ResourceVersion, "labels", opts.LabelSelector, "fields", opts.FieldSelector, "timeout", timeout)
            ctx, cancel := context.WithTimeout(ctx, timeout)
            defer cancel()
            watcher, err := rw.Watch(ctx, &opts)
            if err != nil {
                scope.err(err, w, req)
                return
            }
            requestInfo, _ := request.RequestInfoFrom(ctx)
            metrics.RecordLongRunning(req, requestInfo, metrics.APIServerComponent, func() {
                serveWatch(watcher, scope, outputMediaType, req, w, timeout)
            })
            return
        }

        // Log only long List requests (ignore Watch).
        defer trace.LogIfLong(500 * time.Millisecond)
        trace.Step("About to List from storage")
        result, err := r.List(ctx, &opts)
        if err != nil {
            scope.err(err, w, req)
            return
        }
        trace.Step("Listing from storage done")

        transformResponseObject(ctx, scope, trace, req, w, http.StatusOK, outputMediaType, result)
        trace.Step("Writing http response done", utiltrace.Field{"count", meta.LenList(result)})
    }
}

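The func (s *WatchServer) ServeHTTP() function

serveWatch() hands the watcher to a WatchServer, whose ServeHTTP loop encodes each event and streams it to the client (excerpt; error handling abbreviated):

for {
    select {
    // (other cases elided in this excerpt)
    case event, ok := <-ch:
        if !ok {
            // End of results.
            return
        }

        obj := event.Object
        s.Fixup(obj)
        if err := s.EmbeddedEncoder.Encode(obj, buf); err != nil {
            return
        }

        unknown.Raw = buf.Bytes()
        event.Object = &unknown

        outEvent := &metav1.WatchEvent{}
        *internalEvent = metav1.InternalEvent(event)
        if err := metav1.Convert_versioned_InternalEvent_to_versioned_Event(internalEvent, outEvent, nil); err != nil {
            return
        }

        if err := e.Encode(outEvent); err != nil {
            return
        }
        if len(ch) == 0 {
            flusher.Flush()
        }

        buf.Reset()
    }
}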

The watcher (a watch.Interface) is created by the Store in staging/src/k8s.io/apiserver/pkg/registry/generic/registry/store.go. Watch is a generic function shared by all resources; it filters by label and field.

func (e *Store) Watch(ctx genericapirequest.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
    label := labels.Everything()
    if options != nil && options.LabelSelector != nil {
        label = options.LabelSelector
    }
    field := fields.Everything()
    if options != nil && options.FieldSelector != nil {
        field = options.FieldSelector
    }
    predicate := e.PredicateFunc(label, field)
 
    resourceVersion := ""
    if options != nil {
        resourceVersion = options.ResourceVersion
        predicate.IncludeUninitialized = options.IncludeUninitialized
    }
    return e.WatchPredicate(ctx, predicate, resourceVersion)
}

Cacher

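The Cacher struct

  • storage provides the create, delete, update, get, watch, and list interface, and mainly talks to etcd
  • watchCache is a sliding window with limited capacity
  • watchers is a struct holding a map of all watchers that have requested a watch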
type Cacher struct {
 
    // Underlying storage.Interface.
    storage Interface
 
    // Expected type of objects in the underlying cache.
    objectType reflect.Type
 
    // "sliding window" of recent changes of objects and the current state.
    watchCache *watchCache
    reflector  *cache.Reflector
 
    watcherIdx int
    watchers   indexedWatchers
}
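
A bounded sliding window of recent changes can be sketched as follows. This is a simplified illustration, not the real watchCache: the window type, cachedEvent, and errTooOld are hypothetical, and the capacity is assumed to be at least 1.

```go
package main

import (
	"errors"
	"fmt"
)

// cachedEvent is a hypothetical, simplified stand-in for watchCacheEvent.
type cachedEvent struct {
	ResourceVersion uint64
	Key             string
}

// window keeps only the most recent `capacity` events (capacity >= 1).
type window struct {
	capacity int
	events   []cachedEvent // ordered by ascending ResourceVersion
}

var errTooOld = errors.New("resource version too old")

// add appends an event and evicts the oldest one when the window is full.
func (w *window) add(ev cachedEvent) {
	if len(w.events) == w.capacity {
		w.events = w.events[1:]
	}
	w.events = append(w.events, ev)
}

// eventsSince returns every cached event newer than rv. If events between rv
// and the oldest cached one may have been evicted, the caller has to re-list.
func (w *window) eventsSince(rv uint64) ([]cachedEvent, error) {
	if len(w.events) > 0 && rv+1 < w.events[0].ResourceVersion {
		return nil, errTooOld
	}
	var out []cachedEvent
	for _, ev := range w.events {
		if ev.ResourceVersion > rv {
			out = append(out, ev)
		}
	}
	return out, nil
}

func main() {
	w := &window{capacity: 3}
	for rv := uint64(1); rv <= 5; rv++ {
		w.add(cachedEvent{ResourceVersion: rv, Key: fmt.Sprintf("pods/p%d", rv)})
	}
	evs, err := w.eventsSince(3) // the window now holds versions 3, 4, 5
	fmt.Println(len(evs), err)
	_, err = w.eventsSince(1) // version 2 has already been evicted
	fmt.Println(err)
}
```

The "too old" error is what pushes a client back to a fresh list, which matches the role of list as the fallback for a stale watch.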

2.2 The func (c *Cacher) Watch function
newCacheWatcher creates a watcher.
addWatcher then inserts the watcher into cacher.watchers.

// Implements storage.Interface.
func (c *Cacher) Watch(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate) (watch.Interface, error) {
    watchRV, err := ParseWatchResourceVersion(resourceVersion)
    if err != nil {
        return nil, err
    }
 
    c.ready.wait()
 
    initEvents, err := c.watchCache.GetAllEventsSinceThreadUnsafe(watchRV)
    // (handling of err and computation of triggerValue/triggerSupported elided in this excerpt)

    chanSize := 10

    forget := forgetWatcher(c, c.watcherIdx, triggerValue, triggerSupported)
    watcher := newCacheWatcher(watchRV, chanSize, initEvents, watchFilterFunction(key, pred), forget, c.versioner)

    c.watchers.addWatcher(watcher, c.watcherIdx, triggerValue, triggerSupported)
    c.watcherIdx++
    return watcher, nil
}

2.3 The func newCacheWatcher function
newCacheWatcher creates a watcher.
process then runs asynchronously in its own goroutine.

func newCacheWatcher(resourceVersion uint64, chanSize int, initEvents []*watchCacheEvent, filter watchFilterFunc, forget func(bool), versioner Versioner) *cacheWatcher {
    watcher := &cacheWatcher{
        input:     make(chan *watchCacheEvent, chanSize),
        result:    make(chan watch.Event, chanSize),
        done:      make(chan struct{}),
        filter:    filter,
        stopped:   false,
        forget:    forget,
        versioner: versioner,
    }
    go watcher.process(initEvents, resourceVersion)
    return watcher
}

2.3 The func (c *cacheWatcher) process function
process mainly reads from input and calls sendWatchCacheEvent, which is covered next.

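func (c *cacheWatcher) process(initEvents []*watchCacheEvent, resourceVersion uint64) {
    // Check how long we are processing initEvents.
    // (definition of initProcessThreshold elided in this excerpt)
    startTime := time.Now()
    for _, event := range initEvents {
        c.sendWatchCacheEvent(event)
    }
    processingTime := time.Since(startTime)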
    if processingTime > initProcessThreshold {
        objType := "<null>"
        if len(initEvents) > 0 {
            objType = reflect.TypeOf(initEvents[0].Object).String()
        }
        glog.V(2).Infof("processing %d initEvents of %s took %v", len(initEvents), objType, processingTime)
    }
 
    defer close(c.result)
    defer c.Stop()
    for {
        event, ok := <-c.input
        if !ok {
            return
        }
        // only send events newer than resourceVersion
        if event.ResourceVersion > resourceVersion {
            c.sendWatchCacheEvent(event)
        }
    }
}
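
The shape of newCacheWatcher plus process (two buffered channels and one goroutine per watcher) can be reduced to a small sketch. The change and watcher types here are hypothetical, and filtering and stop handling are left out on purpose.

```go
package main

import "fmt"

// change is a hypothetical, simplified event carrying only a version.
type change struct{ ResourceVersion uint64 }

// watcher mirrors the input/result channel pair described above.
type watcher struct {
	input  chan change
	result chan change
}

// newWatcher starts the processing goroutine and returns immediately.
func newWatcher(startRV uint64, initEvents []change, chanSize int) *watcher {
	w := &watcher{
		input:  make(chan change, chanSize),
		result: make(chan change, chanSize),
	}
	go w.process(initEvents, startRV)
	return w
}

// process replays the initial events, then forwards live events that are
// newer than startRV. Closing input ends the loop and closes result.
func (w *watcher) process(initEvents []change, startRV uint64) {
	defer close(w.result)
	for _, ev := range initEvents {
		w.result <- ev
	}
	for ev := range w.input {
		if ev.ResourceVersion > startRV {
			w.result <- ev
		}
	}
}

func main() {
	w := newWatcher(5, []change{{ResourceVersion: 6}}, 10)
	w.input <- change{ResourceVersion: 5} // not newer than startRV, dropped
	w.input <- change{ResourceVersion: 7} // forwarded
	close(w.input)
	for ev := range w.result {
		fmt.Println(ev.ResourceVersion)
	}
}
```

Because the replay of initial events happens inside the goroutine, creating a watcher stays cheap for the caller even when the backlog is long.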

2.4 The func (c *cacheWatcher) sendWatchCacheEvent function
Each watchCacheEvent is run through the filter and, if it passes, sent to the cacheWatcher's result channel.

// NOTE: sendWatchCacheEvent is assumed to not modify <event> !!!
func (c *cacheWatcher) sendWatchCacheEvent(event *watchCacheEvent) {
    curObjPasses := event.Type != watch.Deleted && c.filter(event.Key, event.ObjLabels, event.ObjFields, event.ObjUninitialized)
    oldObjPasses := false
    if event.PrevObject != nil {
        oldObjPasses = c.filter(event.Key, event.PrevObjLabels, event.PrevObjFields, event.PrevObjUninitialized)
    }
    if !curObjPasses && !oldObjPasses {
        // Watcher is not interested in that object.
        return
    }
 
    var watchEvent watch.Event
    switch {
    case curObjPasses && !oldObjPasses:
        watchEvent = watch.Event{Type: watch.Added, Object: event.Object.DeepCopyObject()}
    case curObjPasses && oldObjPasses:
        watchEvent = watch.Event{Type: watch.Modified, Object: event.Object.DeepCopyObject()}
    case !curObjPasses && oldObjPasses:
        // return a delete event with the previous object content, but with the event's resource version
        oldObj := event.PrevObject.DeepCopyObject()
        if err := c.versioner.UpdateObject(oldObj, event.ResourceVersion); err != nil {
            utilruntime.HandleError(fmt.Errorf("failure to version api object (%d) %#v: %v", event.ResourceVersion, oldObj, err))
        }
        watchEvent = watch.Event{Type: watch.Deleted, Object: oldObj}
    }
 
    select {
    case <-c.done:
        return
    default:
    }
 
    select {
    case c.result <- watchEvent:
    case <-c.done:
    }
}
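
The switch in sendWatchCacheEvent is easier to reason about as a pure function. The sketch below restates only that decision; translate is a hypothetical helper, and a missing previous object is modeled by passing oldMatches as false.

```go
package main

import "fmt"

// translate decides which event type a filtered watcher should see, given
// whether the current and the previous object state match its filter.
// The boolean result is false when the watcher should see nothing.
func translate(deleted, curMatches, oldMatches bool) (string, bool) {
	curPasses := !deleted && curMatches
	switch {
	case curPasses && !oldMatches:
		return "ADDED", true
	case curPasses && oldMatches:
		return "MODIFIED", true
	case !curPasses && oldMatches:
		return "DELETED", true
	default:
		return "", false
	}
}

func main() {
	// A pod gets bound to a node: a watcher on unscheduled pods matched the
	// old state but not the new one, so it sees a deletion.
	fmt.Println(translate(false, false, true))
	// The same update seen by a watcher on that node's pods: an addition.
	fmt.Println(translate(false, true, false))
}
```

So one stored modification can surface as ADDED for one watcher and DELETED for another, depending on which side of each filter the object moved to.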

3. Implementing the interface
pkg/registry/core/pod/storage/storage.go
3.1 The NewStorage function (which returns a PodStorage)
PodStorage.Pod.Store wraps the operations against etcd.

func NewStorage(optsGetter generic.RESTOptionsGetter, k client.ConnectionInfoGetter, proxyTransport http.RoundTripper, podDisruptionBudgetClient policyclient.PodDisruptionBudgetsGetter) PodStorage {
 
   store := &genericregistry.Store{
      NewFunc:                  func() runtime.Object { return &api.Pod{} },
      NewListFunc:              func() runtime.Object { return &api.PodList{} },
      PredicateFunc:            pod.MatchPod,
      DefaultQualifiedResource: api.Resource("pods"),
 
      CreateStrategy:      pod.Strategy,
      UpdateStrategy:      pod.Strategy,
      DeleteStrategy:      pod.Strategy,
      ReturnDeletedObject: true,
 
      TableConvertor: printerstorage.TableConvertor{TablePrinter: printers.NewTablePrinter().With(printersinternal.AddHandlers)},
   }
   options := &generic.StoreOptions{RESTOptions: optsGetter, AttrFunc: pod.GetAttrs, TriggerFunc: pod.NodeNameTriggerFunc}
   if err := store.CompleteWithOptions(options); err != nil {
      panic(err) // TODO: Propagate error up
   }
 
   statusStore := *store
   statusStore.UpdateStrategy = pod.StatusStrategy
 
   return PodStorage{
      Pod:         &REST{store, proxyTransport},
      Binding:     &BindingREST{store: store},
      Eviction:    newEvictionStorage(store, podDisruptionBudgetClient),
      Status:      &StatusREST{store: &statusStore},
      Log:         &podrest.LogREST{Store: store, KubeletConn: k},
      Proxy:       &podrest.ProxyREST{Store: store, ProxyTransport: proxyTransport},
      Exec:        &podrest.ExecREST{Store: store, KubeletConn: k},
      Attach:      &podrest.AttachREST{Store: store, KubeletConn: k},
      PortForward: &podrest.PortForwardREST{Store: store, KubeletConn: k},
   }
}

3.2 The CompleteWithOptions function
Path: vendor/k8s.io/apiserver/pkg/registry/generic/registry/store.go
The key call is GetRESTOptions.

func (e *Store) CompleteWithOptions(options *generic.StoreOptions) error {
    opts, err := options.RESTOptions.GetRESTOptions(e.DefaultQualifiedResource)
    if err != nil {
        return err
    }
 
    if e.Storage == nil {
        e.Storage, e.DestroyFunc = opts.Decorator(
            opts.StorageConfig,
            e.NewFunc(),
            prefix,
            keyFunc,
            e.NewListFunc,
            attrFunc,
            triggerFunc,
        )
    }
 
    return nil
}

3.3 The GetRESTOptions function
Path: /vendor/k8s.io/apiserver/pkg/server/options/etcd.go
It calls genericregistry.StorageWithCacher to create the cache store.

func (f *storageFactoryRestOptionsFactory) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) {
    storageConfig, err := f.StorageFactory.NewConfig(resource)
    if err != nil {
        return generic.RESTOptions{}, fmt.Errorf("unable to find storage destination for %v, due to %v", resource, err.Error())
    }
 
    ret := generic.RESTOptions{
        StorageConfig:           storageConfig,
        Decorator:               generic.UndecoratedStorage,
        DeleteCollectionWorkers: f.Options.DeleteCollectionWorkers,
        EnableGarbageCollection: f.Options.EnableGarbageCollection,
        ResourcePrefix:          f.StorageFactory.ResourcePrefix(resource),
    }
    if f.Options.EnableWatchCache {
        sizes, err := ParseWatchCacheSizes(f.Options.WatchCacheSizes)
        if err != nil {
            return generic.RESTOptions{}, err
        }
        cacheSize, ok := sizes[resource]
        if !ok {
            cacheSize = f.Options.DefaultWatchCacheSize
        }
        ret.Decorator = genericregistry.StorageWithCacher(cacheSize)
    }
 
    return ret, nil
}

3.4 The StorageWithCacher function
Path: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
It creates a cacher according to the configuration.
NewRawStorage creates the etcd v2 or v3 client, depending on the configured storage type.

// Creates a cacher based given storageConfig.
func StorageWithCacher(capacity int) generic.StorageDecorator {
    return func(
        storageConfig *storagebackend.Config,
        objectType runtime.Object,
        resourcePrefix string,
        keyFunc func(obj runtime.Object) (string, error),
        newListFunc func() runtime.Object,
        getAttrsFunc storage.AttrFunc,
        triggerFunc storage.TriggerPublisherFunc) (storage.Interface, factory.DestroyFunc) {
 
        s, d := generic.NewRawStorage(storageConfig)
        if capacity == 0 {
            glog.V(5).Infof("Storage caching is disabled for %T", objectType)
            return s, d
        }
        
        cacherConfig := storage.CacherConfig{
            CacheCapacity:        capacity,
            Storage:              s,
            Versioner:            etcdstorage.APIObjectVersioner{},
            Type:                 objectType,
            ResourcePrefix:       resourcePrefix,
            KeyFunc:              keyFunc,
            NewListFunc:          newListFunc,
            GetAttrsFunc:         getAttrsFunc,
            TriggerPublisherFunc: triggerFunc,
            Codec:                storageConfig.Codec,
        }
        cacher := storage.NewCacherFromConfig(cacherConfig)
        destroyFunc := func() {
            cacher.Stop()
            d()
        }
 
        RegisterStorageCleanup(destroyFunc)
        return cacher, destroyFunc
    }
}
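
StorageWithCacher is an instance of the decorator pattern: a function that takes the raw storage and returns what callers should use instead. Here is a minimal sketch of that shape with hypothetical types (Store, rawStore, cachedStore, Decorator, withCache). Note that this toy cache is read-through, whereas the real Cacher is fed by a reflector; only the wrapping structure is the point, and capacity is used purely as a size hint.

```go
package main

import "fmt"

// Store is a hypothetical, minimal stand-in for storage.Interface.
type Store interface {
	Get(key string) (string, bool)
}

// rawStore plays the role of the etcd-backed storage and counts its reads.
type rawStore struct {
	data  map[string]string
	reads int
}

func (s *rawStore) Get(key string) (string, bool) {
	s.reads++
	v, ok := s.data[key]
	return v, ok
}

// cachedStore wraps another Store and remembers values it has already seen.
type cachedStore struct {
	next  Store
	cache map[string]string
}

func (c *cachedStore) Get(key string) (string, bool) {
	if v, ok := c.cache[key]; ok {
		return v, true
	}
	v, ok := c.next.Get(key)
	if ok {
		c.cache[key] = v
	}
	return v, ok
}

// Decorator takes a raw store and returns the store callers should use.
type Decorator func(raw Store) Store

// withCache returns the raw store untouched when capacity is 0, mirroring
// the "caching disabled" branch above.
func withCache(capacity int) Decorator {
	return func(raw Store) Store {
		if capacity == 0 {
			return raw
		}
		return &cachedStore{next: raw, cache: make(map[string]string, capacity)}
	}
}

func main() {
	raw := &rawStore{data: map[string]string{"pods/a": "spec"}}
	s := withCache(100)(raw)
	s.Get("pods/a")
	s.Get("pods/a")
	fmt.Println(raw.reads) // the second Get never reached the raw store
}
```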

3.5 The NewCacherFromConfig function
Path: vendor/k8s.io/apiserver/pkg/storage/cacher.go
It creates the cacher that serves list-watch. It first creates a watchCache object and a cacherListerWatcher object; cacherListerWatcher implements the ListerWatcher interface, that is, the List() and Watch() methods.
It then builds the Cacher object:
(1) watchCache is a struct that stores the objects the apiserver has watched from etcd.
(2) watchers is an indexedWatchers struct. When kubelet or scheduler needs to watch a kind of resource, it sends a watch request to kube-apiserver, and the apiserver creates a cacheWatcher for it. The cacheWatcher is responsible for sending the watched resources from the apiserver to kubelet or scheduler.
(3) The Reflector struct has a ListerWatcher member. ListerWatcher is an interface with the methods List() and Watch(); the listerWatcher wraps Storage, and its main job is to get the watched objects stored into watchCache.

// ListerWatcher is any object that knows how to perform an initial list and start a watch on a resource.
type ListerWatcher interface {
    // List should return a list type object; the Items field will be extracted, and the
    // ResourceVersion field will be used to start the watch in the right place.
    List(options metav1.ListOptions) (runtime.Object, error)
    // Watch should begin a watch at the specified version.
    Watch(options metav1.ListOptions) (watch.Interface, error)
}
(4) incoming is a channel with a buffer of 100 that receives watchCacheEvent values.

The cacher.dispatchEvents goroutine takes each watchCacheEvent received on the incoming channel and adds it to the input channel of the watchers.

func (c *Cacher) dispatchEvents() {
    for {
        select {
        case event, ok := <-c.incoming:
            if !ok {
                return
            }
            c.dispatchEvent(&event)
        case <-c.stopCh:
            return
        }
    }
}
 
func (c *Cacher) dispatchEvent(event *watchCacheEvent) {
    triggerValues, supported := c.triggerValues(event)
 
    for _, watcher := range c.watchers.allWatchers {
        watcher.add(event, c.dispatchTimeoutBudget)
    }
    if supported {
 
        for _, triggerValue := range triggerValues {
            for _, watcher := range c.watchers.valueWatchers[triggerValue] {
                watcher.add(event, c.dispatchTimeoutBudget)
            }
        }
    } else {
        for _, watchers := range c.watchers.valueWatchers {
            for _, watcher := range watchers {
                watcher.add(event, c.dispatchTimeoutBudget)
            }
        }
    }
}
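
The fan-out rule in dispatchEvent (unindexed watchers always get the event, indexed watchers only when the trigger value matches or is unknown) can be sketched like this. The sink and indexed types are hypothetical, and delivery is a plain append rather than a channel send with a timeout budget.

```go
package main

import "fmt"

// sink is a hypothetical watcher that just records what it receives.
type sink struct{ got []string }

func (s *sink) add(ev string) { s.got = append(s.got, ev) }

// indexed splits watchers into those that want everything and those that
// are interested in exactly one trigger value (for pods, a node name).
type indexed struct {
	all     []*sink
	byValue map[string][]*sink
}

// dispatch delivers ev to every unindexed watcher. When the trigger values of
// the event are known, only the matching indexed watchers are notified;
// otherwise all indexed watchers are, and each filters for itself.
func (ix *indexed) dispatch(ev string, triggerValues []string, supported bool) {
	for _, w := range ix.all {
		w.add(ev)
	}
	if supported {
		for _, v := range triggerValues {
			for _, w := range ix.byValue[v] {
				w.add(ev)
			}
		}
		return
	}
	for _, ws := range ix.byValue {
		for _, w := range ws {
			w.add(ev)
		}
	}
}

func main() {
	everything, node1, node2 := &sink{}, &sink{}, &sink{}
	ix := &indexed{
		all:     []*sink{everything},
		byValue: map[string][]*sink{"node-1": {node1}, "node-2": {node2}},
	}
	ix.dispatch("pod-a updated", []string{"node-1"}, true)
	fmt.Println(len(everything.got), len(node1.got), len(node2.got))
}
```

With thousands of kubelets each watching only their own node, the index keeps a single pod update from being offered to every one of them.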

The cacher.startCaching goroutine is started from NewCacherFromConfig:

func NewCacherFromConfig(config CacherConfig) *Cacher {
    watchCache := newWatchCache(config.CacheCapacity, config.KeyFunc, config.GetAttrsFunc)
    listerWatcher := newCacherListerWatcher(config.Storage, config.ResourcePrefix, config.NewListFunc)
    reflectorName := "storage/cacher.go:" + config.ResourcePrefix
 
    stopCh := make(chan struct{})
    cacher := &Cacher{
        // (field initialization elided in this excerpt)
    }
    watchCache.SetOnEvent(cacher.processEvent)
    go cacher.dispatchEvents()
 
    cacher.stopWg.Add(1)
    go func() {
        defer cacher.stopWg.Done()
        wait.Until(
            func() {
                if !cacher.isStopped() {
                    cacher.startCaching(stopCh)
                }
            }, time.Second, stopCh,
        )
    }()
    return cacher
}

3.6 The startCaching function
Path: vendor/k8s.io/apiserver/pkg/storage/cacher.go
The key call is reflector.ListAndWatch, which syncs the data in the remote etcd to the local side and stores it in watchCache.

func (c *Cacher) startCaching(stopChannel <-chan struct{}) {
    successfulList := false
    c.watchCache.SetOnReplace(func() {
        successfulList = true
        c.ready.set(true)
    })
    defer func() {
        if successfulList {
            c.ready.set(false)
        }
    }()
 
    c.terminateAllWatchers()
    if err := c.reflector.ListAndWatch(stopChannel); err != nil {
        glog.Errorf("unexpected ListAndWatch error: %v", err)
    }
}

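3.7 The ListAndWatch function
Path: vendor/k8s.io/client-go/tools/cache/reflector.go
It first lists all entries, then watches starting from the resource version the list returned.
It calls listerWatcher.List to get the initial data.
It calls listerWatcher.Watch to follow subsequent changes.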

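func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
    // (setup of options, start and resourceVersion elided in this excerpt)
    list, err := r.listerWatcher.List(options)
    // (error handling elided in this excerpt)

    r.metrics.listDuration.Observe(time.Since(start).Seconds())
    listMetaInterface, err := meta.ListAccessor(list)
    if err != nil {
        return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
    }
    resourceVersion = listMetaInterface.GetResourceVersion()
    // (syncing the listed items into the store elided in this excerpt)

    for {
        w, err := r.listerWatcher.Watch(options)
        // (error handling elided in this excerpt)

        if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
            // (logging elided in this excerpt)
            return nil
        }
    }
}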
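
Stripped of metrics, resync, and error handling, the list-then-watch sequence looks roughly like the sketch below. Everything here is hypothetical (item, update, source, fakeSource, listAndWatch); it is not the Reflector's API, just the same two-phase idea against an in-memory source.

```go
package main

import "fmt"

// item and update are hypothetical, simplified stand-ins for API objects
// and watch events.
type item struct{ Key, Value string }

type update struct {
	ResourceVersion uint64
	Item            item
}

// source is a hypothetical lister-watcher: List returns a snapshot plus the
// version it was taken at, Watch returns updates after a given version.
type source interface {
	List() ([]item, uint64)
	Watch(fromRV uint64) <-chan update
}

// listAndWatch fills the local cache from a snapshot, then applies updates
// until the watch channel closes. It returns the last version it saw so a
// caller can resume from there.
func listAndWatch(src source, cache map[string]string) uint64 {
	items, rv := src.List()
	for _, it := range items {
		cache[it.Key] = it.Value
	}
	for u := range src.Watch(rv) {
		cache[u.Item.Key] = u.Item.Value
		rv = u.ResourceVersion
	}
	return rv
}

// fakeSource serves a fixed snapshot and a fixed series of updates.
type fakeSource struct {
	snapshot []item
	rv       uint64
	updates  []update
}

func (f *fakeSource) List() ([]item, uint64) { return f.snapshot, f.rv }

func (f *fakeSource) Watch(fromRV uint64) <-chan update {
	ch := make(chan update, len(f.updates))
	for _, u := range f.updates {
		if u.ResourceVersion > fromRV {
			ch <- u
		}
	}
	close(ch)
	return ch
}

func main() {
	src := &fakeSource{
		snapshot: []item{{Key: "pods/a", Value: "v1"}},
		rv:       10,
		updates: []update{
			{ResourceVersion: 9, Item: item{Key: "pods/a", Value: "stale"}},
			{ResourceVersion: 11, Item: item{Key: "pods/b", Value: "v1"}},
		},
	}
	cache := map[string]string{}
	fmt.Println(listAndWatch(src, cache), len(cache))
}
```

Starting the watch at the version returned by the list is what guarantees no change is missed or applied twice between the two phases.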

3.8 The newCacherListerWatcher function
Path: vendor/k8s.io/apiserver/pkg/storage/cacher.go
The storage interface includes the list and watch methods.

func newCacherListerWatcher(storage Interface, resourcePrefix string, newListFunc func() runtime.Object) cache.ListerWatcher {
    return &cacherListerWatcher{
        storage:        storage,
        resourcePrefix: resourcePrefix,
        newListFunc:    newListFunc,
    }
}
 
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) List(options metav1.ListOptions) (runtime.Object, error) {
    list := lw.newListFunc()
    if err := lw.storage.List(context.TODO(), lw.resourcePrefix, "", Everything, list); err != nil {
        return nil, err
    }
    return list, nil
}
 
// Implements cache.ListerWatcher interface.
func (lw *cacherListerWatcher) Watch(options metav1.ListOptions) (watch.Interface, error) {
    return lw.storage.WatchList(context.TODO(), lw.resourcePrefix, options.ResourceVersion, Everything)
}

3.9 The NewRawStorage function
Path: vendor/k8s.io/apiserver/pkg/registry/generic/registry/storage_factory.go
Tracing back, the final destination is etcd; the default appears to have moved from v2 to v3 in Kubernetes 1.6.

func NewRawStorage(config *storagebackend.Config) (storage.Interface, factory.DestroyFunc) {
    s, d, err := factory.Create(*config)
    // (handling of err elided in this excerpt)
    return s, d
}
 
func Create(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    switch c.Type {
    case storagebackend.StorageTypeETCD2:
        return newETCD2Storage(c)
    case storagebackend.StorageTypeUnset, storagebackend.StorageTypeETCD3:
        
        return newETCD3Storage(c)
    default:
        return nil, nil, fmt.Errorf("unknown storage type: %s", c.Type)
    }
}

3.10 The newETCD3Storage function
Path: vendor/k8s.io/apiserver/pkg/storage/storagebackend/factory/etcd3.go
So it is the etcd v3 client that implements the list and watch interface.

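func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, error) {
    // (construction of cfg, transformer and destroyFunc elided in this excerpt)
    client, err := clientv3.New(cfg)
    // (handling of err elided in this excerpt)
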
    if c.Quorum {
        return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
    }
    return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
 
// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error 
 
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursi