client-go中的SharedInformerFactory機制

Table of Contents

1.章節(jié)介紹

本章首先介紹SharedInformerFactory,了解其組成和作用。

然后以Podinformer為例,了解一個資源實例的Informer應(yīng)該需要實現(xiàn)哪些函數(shù)。

本節(jié)并沒有設(shè)計到具體圖中的informer機制,只是從大的入口入手,看看SharedInformerFactory到底是什么

image.png


2. SharedInformerFactory

SharedInformerFactory封裝了NewSharedIndexInformer方法。字如其名,SharedInformerFactory使用的是工廠模式來生成各類的Informer。無論是k8s控制器,還是自定義控制器, SharedInformerFactory都是非常重要的一環(huán)。所以首先分析SharedInformerFactory。這里以一個實例入手分析SharedInformerFactory。

2.1 SharedInformerFactory實例介紹

package main

import (
    "fmt"
    clientset "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/informers"
    "k8s.io/client-go/tools/cache"
    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "time"
)

func main()  {
    config := &rest.Config{
        Host: "http://172.21.0.16:8080",
    }
    client := clientset.NewForConfigOrDie(config)
    // 生成SharedInformerFactory
    factory := informers.NewSharedInformerFactory(client, 5 * time.Second)
    // 生成PodInformer
    podInformer := factory.Core().V1().Pods()
    // 獲得一個cache.SharedIndexInformer 單例模式
    sharedInformer := podInformer.Informer()

    //注冊add, update, del處理事件
    sharedInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) {fmt.Printf("add: %v\n", obj.(*v1.Pod).Name)},
        UpdateFunc: func(oldObj, newObj interface{}) {fmt.Printf("update: %v\n", newObj.(*v1.Pod).Name)},
        DeleteFunc: func(obj interface{}){fmt.Printf("delete: %v\n", obj.(*v1.Pod).Name)},
    })

    stopCh := make(chan struct{})

    // 第一種方式
    // 可以這樣啟動  也可以按照下面的方式啟動
    // go sharedInformer.Run(stopCh)
    // time.Sleep(2 * time.Second)

    // 第二種方式,這種方式是啟動factory下面所有的informer
    factory.Start(stopCh)
    factory.WaitForCacheSync(stopCh)

    pods, _ := podInformer.Lister().Pods("default").List(labels.Everything())

    for _, p := range pods {
        fmt.Printf("list pods: %v\n", p.Name)
    }
    <- stopCh
}

2.2 sharedInformerFactory結(jié)構(gòu)體

type sharedInformerFactory struct {
  // client客戶端
    client           kubernetes.Interface            
    // sharedInformerFactory是沒有namespaces限制的。不過可以設(shè)置namespaces限制該factory后面的informer都是指定namespaces的
    namespace        string          
  // TweakListOptionsFunc其實就是ListOptions,這個是針對所有Informer List生效的 (WithTweakListOptions可以看出來)
    tweakListOptions internalinterfaces.TweakListOptionsFunc
    lock             sync.Mutex
    // 這個是list默認定期同步的時間間隔
    defaultResync    time.Duration
    // 每種informer還可以自定義
    customResync     map[reflect.Type]time.Duration
  
  // 屬于該factory下面的所有的informer
    informers map[reflect.Type]cache.SharedIndexInformer
    // startedInformers is used for tracking which informers have been started.
    // This allows Start() to be called multiple times safely.
    // 判斷informer是否已經(jīng) Run起來了
    startedInformers map[reflect.Type]bool   
}


2.3 sharedInformerFactory成員函數(shù)

定義customResync
// WithCustomResyncConfig sets a custom resync period for the specified informer types.
func WithCustomResyncConfig(resyncConfig map[v1.Object]time.Duration) SharedInformerOption 

定義tweakListOptions
// WithTweakListOptions sets a custom filter on all listers of the configured SharedInformerFactory.
func WithTweakListOptions(tweakListOptions internalinterfaces.TweakListOptionsFunc) SharedInformerOption 

定義namespaces
// WithNamespace limits the SharedInformerFactory to the specified namespace.
func WithNamespace(namespace string) SharedInformerOption 

// start所有的informer
// Start initializes all requested informers.
func (f *sharedInformerFactory) Start(stopCh <-chan struct{}) {
    f.lock.Lock()
    defer f.lock.Unlock()

    for informerType, informer := range f.informers {
        if !f.startedInformers[informerType] {
            go informer.Run(stopCh)
            f.startedInformers[informerType] = true
        }
    }
}

// WaitForCacheSync讓所有的informers同步cache。一般informer.run函數(shù)中都有一個這樣的語句。先等cache同步。這個的含義就是等list完了的數(shù)據(jù),全部轉(zhuǎn)換到cache中去。
    // Wait for all involved caches to be synced, before processing items from the queue is started
    if !cache.WaitForCacheSync(stopCh, ctrl.Informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }
    
// WaitForCacheSync waits for all started informers' cache were synced.
func (f *sharedInformerFactory) WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool {
    informers := func() map[reflect.Type]cache.SharedIndexInformer {
        f.lock.Lock()
        defer f.lock.Unlock()

        informers := map[reflect.Type]cache.SharedIndexInformer{}
        for informerType, informer := range f.informers {
            if f.startedInformers[informerType] {
                informers[informerType] = informer
            }
        }
        return informers
    }()

    res := map[reflect.Type]bool{}
    for informType, informer := range informers {
        res[informType] = cache.WaitForCacheSync(stopCh, informer.HasSynced)
    }
    return res
}


// InternalInformerFor returns the SharedIndexInformer for obj using an internal
// client.
func (f *sharedInformerFactory) InformerFor(obj runtime.Object, newFunc internalinterfaces.NewInformerFunc) cache.SharedIndexInformer {
    f.lock.Lock()
    defer f.lock.Unlock()

    informerType := reflect.TypeOf(obj)
    informer, exists := f.informers[informerType]
    // 如果存在同類的,直接返回,不會再new一個。這里的type就是 pod/deploy
    if exists {
        return informer
    }

    resyncPeriod, exists := f.customResync[informerType]
    if !exists {
        resyncPeriod = f.defaultResync
    }

    informer = newFunc(f.client, resyncPeriod)
    f.informers[informerType] = informer

    return informer
}

// SharedInformerFactory provides shared informers for resources in all known
// API group versions.
type SharedInformerFactory interface {
    internalinterfaces.SharedInformerFactory
    ForResource(resource schema.GroupVersionResource) (GenericInformer, error)
    WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool

  // 提供k8s內(nèi)置資源的定義接口,從這里可以看出來
    Admissionregistration() admissionregistration.Interface 
    Apps() apps.Interface
    Autoscaling() autoscaling.Interface
    Batch() batch.Interface
    Certificates() certificates.Interface
    Coordination() coordination.Interface
    Core() core.Interface
    Events() events.Interface
    Extensions() extensions.Interface
    Networking() networking.Interface
    Policy() policy.Interface
    Rbac() rbac.Interface
    Scheduling() scheduling.Interface
    Settings() settings.Interface
    Storage() storage.Interface
}

// 例如core組下面的資源,f.Core().v1.pods() 就是這個
func (f *sharedInformerFactory) Core() core.Interface {
    return core.New(f, f.namespace, f.tweakListOptions)
}


2.4 總結(jié)

通過對sharedInformerFactory的成員和函數(shù)介紹,了解到:

(1)factory就是提供了一個構(gòu)造informer的入口,里面包含了一堆Informer

(2)同一中資源類型共用一個Infomer。這樣的話就可以節(jié)省不必要的資源。例如kcm中,rs可以需要監(jiān)聽pod資源,gc也需要監(jiān)聽Pod資源,通過factory機制就可以使用同一個

(3)但是監(jiān)聽同一種類型的資源,但是不同的listOption看起來也是不行,例如一個Informer監(jiān)聽running的pod,一個Informer監(jiān)聽error的Pod, 是需要多個factory。

3. podInformer

從上訴可以看出來,sharedInformerFactory只是一個入口。接下來以podInformer為例,看看一個具體的資源Informer需要實現(xiàn)哪些功能。

3.1 PodInformer結(jié)構(gòu)體

// PodInformer provides access to a shared informer and lister for
// Pods.
// 只需要實現(xiàn)Informer,Lister函數(shù)
type PodInformer interface {
    Informer() cache.SharedIndexInformer     
    Lister() v1.PodLister
}

type podInformer struct {
    factory          internalinterfaces.SharedInformerFactory   //  是哪一個factory生成的informer
    tweakListOptions internalinterfaces.TweakListOptionsFunc    //  有哪些filter
    namespace        string                                     //  命名空間
}

3.2 PodInformer成員函數(shù)

從函數(shù)定義可以看出來,informer其實就是 cache.SharedIndexInformer

New SharedIndexInformer的時候指定了ListWatch函數(shù)。

listFunc: client.CoreV1().Pods(namespace).List(options)

WatchFunc: client.CoreV1().Pods(namespace).Watch(options)

所以從結(jié)構(gòu)體上推測:

(1) informer最終都是 cache.SharedIndexInformer。但是 cache.SharedIndexInformer需要先定義好list, watch函數(shù)

(2)cache.SharedIndexInformer里面的index就是存儲+查詢。根據(jù)定義好的list, watch更新index的數(shù)據(jù)

接下來繼續(xù)看看cache.SharedIndexInformer是如何實現(xiàn)的。

// NewPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, namespace, resyncPeriod, indexers, nil)
}

// NewFilteredPodInformer constructs a new informer for Pod type.
// Always prefer using an informer factory to get a shared informer instead of getting an independent
// one. This reduces memory footprint and number of connections to the server.
func NewFilteredPodInformer(client kubernetes.Interface, namespace string, resyncPeriod time.Duration, indexers cache.Indexers, tweakListOptions internalinterfaces.TweakListOptionsFunc) cache.SharedIndexInformer {
    return cache.NewSharedIndexInformer(
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(options)
            },
        },
        &corev1.Pod{},
        resyncPeriod,
        indexers,
    )
}

// 默認只有namespaces這個indexer
func (f *podInformer) defaultInformer(client kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
    return NewFilteredPodInformer(client, f.namespace, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, f.tweakListOptions)
}


func (f *podInformer) Informer() cache.SharedIndexInformer {
    return f.factory.InformerFor(&corev1.Pod{}, f.defaultInformer)
}

// 返回Lister數(shù)據(jù), 這里是從index里面獲取,而不是從apiserver中獲取
func (f *podInformer) Lister() v1.PodLister {
    return v1.NewPodLister(f.Informer().GetIndexer())
}

cache中的index定義
k8s.io/client-go/tools/cache/index.go
// Indexer is a storage interface that lets you list objects using multiple indexing functions
type Indexer interface {
    Store
    // Retrieve list of objects that match on the named indexing function
    Index(indexName string, obj interface{}) ([]interface{}, error)
    // IndexKeys returns the set of keys that match on the named indexing function.
    IndexKeys(indexName, indexKey string) ([]string, error)
    // ListIndexFuncValues returns the list of generated values of an Index func
    ListIndexFuncValues(indexName string) []string
    // ByIndex lists object that match on the named indexing function with the exact key
    ByIndex(indexName, indexKey string) ([]interface{}, error)
    // GetIndexer return the indexers
    GetIndexers() Indexers

    // AddIndexers adds more indexers to this store.  If you call this after you already have data
    // in the store, the results are undefined.
    AddIndexers(newIndexers Indexers) error
}

4.總結(jié)

(1)factory就是提供了一個構(gòu)造informer的入口,里面包含了一堆Informer

(2)同一中資源類型共用一個Infomer。這樣的話就可以節(jié)省不必要的資源。例如kcm中,rs可以需要監(jiān)聽pod資源,gc也需要監(jiān)聽Pod資源,通過factory機制就可以使用同一個

(3)但是監(jiān)聽同一種類型的資源,但是不同的listOption看起來也是不行,例如一個Informer監(jiān)聽running的pod,一個Informer監(jiān)聽error的Pod, 是需要多個factory。

(4)當前factory并沒有利用到圖中表示Informer機制。最終是cache.SharedIndexInformer 包含了所有的參數(shù),實現(xiàn)了上訴圖中的Informer機制。下一節(jié)開始介紹cache.SharedIndexInformer

?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結(jié)合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容