kubernetes 中 informer 的使用

一、kubernetes 集群的幾種訪問方式

在實(shí)際開發(fā)過程中,若想要獲取 kubernetes 中某個(gè)資源(比如 pod)的所有對象,可以使用 kubectl、k8s REST API、client-go(ClientSet、Dynamic Client、RESTClient 三種方式) 等多種方式訪問 k8s 集群獲取資源。在筆者的開發(fā)過程中,最初都是直接調(diào)用 k8s 的 REST API 來獲取的,使用 kubectl get pod -v=9 可以直接看到調(diào)用 k8s 的接口,然后在程序中直接訪問還是比較方便的。但是隨著集群規(guī)模的增長或者從國內(nèi)獲取海外 k8s 集群的數(shù)據(jù),直接調(diào)用 k8s 接口獲取所有 pod 還是比較耗時(shí),這個(gè)問題有多種解決方法,最初是直接使用 k8s 原生的 watch 接口來獲取的,下面是一個(gè)偽代碼:

const (
    ADDED    string = "ADDED"
    MODIFIED string = "MODIFIED"
    DELETED  string = "DELETED"
    ERROR    string = "ERROR"
)

type Event struct {
    Type   string          `json:"type"`
    Object json.RawMessage `json:"object"`
}

func main() {
    resp, err := http.Get("http://apiserver:8080/api/v1/watch/pods?watch=yes")
    if err != nil {
        // ...
    }
    decoder := json.NewDecoder(resp.Body)
    for {
        var event Event
        err = decoder.Decode(&event)
        if err != nil {
            // ...
        }
        switch event.Type {
        case ADDED, MODIFIED:
            // ...
        case DELETED:
            // ...
        case ERROR:
            // ...
        }
    }
}

調(diào)用 watch 接口后會(huì)先將所有的對象 list 一次,然后 apiserver 會(huì)將變化的數(shù)據(jù)推送到 client 端,可以看到每次對于 watch 到的事件都需要判斷后進(jìn)行處理,然后將處理后的結(jié)果寫入到本地的緩存中,原生的 watch 操作還是非常麻煩的。后來了解到官方推出一個(gè)客戶端工具 client-go ,client-go 中的 Informer 對 watch 操作做了封裝,使用起來非常方便,下面會(huì)主要介紹一下 client-go 的使用。

二、Informer 的機(jī)制

cient-go 是從 k8s 代碼中抽出來的一個(gè)客戶端工具,Informer 是 client-go 中的核心工具包,已經(jīng)被 kubernetes 中眾多組件所使用。所謂 Informer,其實(shí)就是一個(gè)帶有本地緩存和索引機(jī)制的、可以注冊 EventHandler 的 client,本地緩存被稱為 Store,索引被稱為 Index。使用 informer 的目的是為了減輕 apiserver 數(shù)據(jù)交互的壓力而抽象出來的一個(gè) cache 層, 客戶端對 apiserver 數(shù)據(jù)的 "讀取" 和 "監(jiān)聽" 操作都通過本地 informer 進(jìn)行。Informer 實(shí)例的Lister()方法可以直接查找緩存在本地內(nèi)存中的數(shù)據(jù)。

Informer 的主要功能:

  • 同步數(shù)據(jù)到本地緩存
  • 根據(jù)對應(yīng)的事件類型,觸發(fā)事先注冊好的 ResourceEventHandler
1、Informer 中幾個(gè)組件的作用

Informer 中主要有 Reflector、Delta FIFO Queue、Local Store、WorkQueue 幾個(gè)組件。以下是 Informer 的工作流程圖。

Informer 組件

根據(jù)流程圖來解釋一下 Informer 中幾個(gè)組件的作用:

  • Reflector:稱之為反射器,實(shí)現(xiàn)對 apiserver 指定類型對象的監(jiān)控(ListAndWatch),其中反射實(shí)現(xiàn)的就是把監(jiān)控的結(jié)果實(shí)例化成具體的對象,最終也是調(diào)用 Kubernetes 的 List/Watch API;

  • DeltaIFIFO Queue:一個(gè)增量隊(duì)列,將 Reflector 監(jiān)控變化的對象形成一個(gè) FIFO 隊(duì)列,此處的 Delta 就是變化;

  • LocalStore:就是 informer 的 cache,這里面緩存的是 apiserver 中的對象(其中有一部分可能還在DeltaFIFO 中),此時(shí)使用者再查詢對象的時(shí)候就直接從 cache 中查找,減少了 apiserver 的壓力,LocalStore 只會(huì)被 Lister 的 List/Get 方法訪問。

  • WorkQueue:DeltaIFIFO 收到事件后會(huì)先將事件存儲(chǔ)在自己的數(shù)據(jù)結(jié)構(gòu)中,然后直接操作 Store 中存儲(chǔ)的數(shù)據(jù),更新完 store 后 DeltaIFIFO 會(huì)將該事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件會(huì)根據(jù)對應(yīng)的類型觸發(fā)對應(yīng)的回調(diào)函數(shù)。

2、Informer 的工作流程
    1. Informer 首先會(huì) list/watch apiserver,Informer 所使用的 Reflector 包負(fù)責(zé)與 apiserver 建立連接,Reflector 使用 ListAndWatch 的方法,會(huì)先從 apiserver 中 list 該資源的所有實(shí)例,list 會(huì)拿到該對象最新的 resourceVersion,然后使用 watch 方法監(jiān)聽該 resourceVersion 之后的所有變化,若中途出現(xiàn)異常,reflector 則會(huì)從斷開的 resourceVersion 處重現(xiàn)嘗試監(jiān)聽所有變化,一旦該對象的實(shí)例有創(chuàng)建、刪除、更新動(dòng)作,Reflector 都會(huì)收到"事件通知",這時(shí),該事件及它對應(yīng)的 API 對象這個(gè)組合,被稱為增量(Delta),它會(huì)被放進(jìn) DeltaFIFO 中。
    1. Informer 會(huì)不斷地從這個(gè) DeltaFIFO 中讀取增量,每拿出一個(gè)對象,Informer 就會(huì)判斷這個(gè)增量的時(shí)間類型,然后創(chuàng)建或更新本地的緩存,也就是 store。
    1. 如果事件類型是 Added(添加對象),那么 Informer 會(huì)通過 Indexer 的庫把這個(gè)增量里的 API 對象保存到本地的緩存中,并為它創(chuàng)建索引,若為刪除操作,則在本地緩存中刪除該對象。
    1. DeltaFIFO 再 pop 這個(gè)事件到 controller 中,controller 會(huì)調(diào)用事先注冊的 ResourceEventHandler 回調(diào)函數(shù)進(jìn)行處理。
    1. 在 ResourceEventHandler 回調(diào)函數(shù)中,其實(shí)只是做了一些很簡單的過濾,然后將關(guān)心變更的 Object 放到 workqueue 里面。
    1. Controller 從 workqueue 里面取出 Object,啟動(dòng)一個(gè) worker 來執(zhí)行自己的業(yè)務(wù)邏輯,業(yè)務(wù)邏輯通常是計(jì)算目前集群的狀態(tài)和用戶希望達(dá)到的狀態(tài)有多大的區(qū)別,然后孜孜不倦地讓 apiserver 將狀態(tài)演化到用戶希望達(dá)到的狀態(tài),比如為 deployment 創(chuàng)建新的 pods,或者是擴(kuò)容/縮容 deployment。
    1. 在worker中就可以使用 lister 來獲取 resource,而不用頻繁的訪問 apiserver,因?yàn)?apiserver 中 resource 的變更都會(huì)反映到本地的 cache 中。

Informer 在使用時(shí)需要先初始化一個(gè) InformerFactory,目前主要推薦使用的是 SharedInformerFactory,Shared 指的是在多個(gè) Informer 中共享一個(gè)本地 cache。

Informer 中的 ResourceEventHandler 函數(shù)有三種:

// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
    AddFunc    func(obj interface{})
    UpdateFunc func(oldObj, newObj interface{})
    DeleteFunc func(obj interface{})
}

這三種函數(shù)的處理邏輯是用戶自定義的,在初始化 controller 時(shí)注冊完 ResourceEventHandler 后,一旦該對象的實(shí)例有創(chuàng)建、刪除、更新三中操作后就會(huì)觸發(fā)對應(yīng)的 ResourceEventHandler。

三、Informer 使用示例

在實(shí)際的開發(fā)工作中,Informer 主要用在兩處:

  • 在訪問 k8s apiserver 的客戶端作為一個(gè) client 緩存對象使用;
  • 在一些自定義 controller 中使用,比如 operator 的開發(fā);

1、下面是一個(gè)作為 client 的使用示例:

package main

import (
    "flag"
    "fmt"
    "log"
    "path/filepath"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/labels"
    "k8s.io/apimachinery/pkg/util/runtime"

    "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/client-go/util/homedir"
)

func main() {
    var kubeconfig *string
    if home := homedir.HomeDir(); home != "" {
        kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
    } else {
        kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
    }
    flag.Parse()

    config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
    if err != nil {
        panic(err)
    }

    // 初始化 client
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        log.Panic(err.Error())
    }

    stopper := make(chan struct{})
    defer close(stopper)
    
    // 初始化 informer
    factory := informers.NewSharedInformerFactory(clientset, 0)
    nodeInformer := factory.Core().V1().Nodes()
    informer := nodeInformer.Informer()
    defer runtime.HandleCrash()
    
    // 啟動(dòng) informer,list & watch
    go factory.Start(stopper)
    
    // 從 apiserver 同步資源,即 list 
    if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
        runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
        return
    }

    // 使用自定義 handler
    informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    onAdd,
        UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
        DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
    })
    
    // 創(chuàng)建 lister
    nodeLister := nodeInformer.Lister()
    // 從 lister 中獲取所有 items
    nodeList, err := nodeLister.List(labels.Everything())
    if err != nil {
        fmt.Println(err)
    }
    fmt.Println("nodelist:", nodeList)
    <-stopper
}

func onAdd(obj interface{}) {
    node := obj.(*corev1.Node)
    fmt.Println("add a node:", node.Name)
}

Shared指的是多個(gè) lister 共享同一個(gè)cache,而且資源的變化會(huì)同時(shí)通知到cache和 listers。這個(gè)解釋和上面圖所展示的內(nèi)容的是一致的,cache我們在Indexer的介紹中已經(jīng)分析過了,lister 指的就是OnAdd、OnUpdate、OnDelete 這些回調(diào)函數(shù)背后的對象。

2、以下是作為 controller 使用的一個(gè)整體工作流程

(1) 創(chuàng)建一個(gè)控制器

  • 為控制器創(chuàng)建 workqueue
  • 創(chuàng)建 informer, 為 informer 添加 callback 函數(shù),創(chuàng)建 lister

(2) 啟動(dòng)控制器

  • 啟動(dòng) informer
  • 等待本地 cache sync 完成后, 啟動(dòng) workers

(3) 當(dāng)收到變更事件后,執(zhí)行 callback

  • 等待事件觸發(fā)
  • 從事件中獲取變更的 Object
  • 做一些必要的檢查
  • 生成 object key,一般是 namespace/name 的形式
  • 將 key 放入 workqueue 中

(4) worker loop

  • 等待從 workqueue 中獲取到 item,一般為 object key
  • 用 object key 通過 lister 從本地 cache 中獲取到真正的 object 對象
  • 做一些檢查
  • 執(zhí)行真正的業(yè)務(wù)邏輯
  • 處理下一個(gè) item

下面是自定義 controller 使用的一個(gè)參考:

var (
    masterURL  string
    kubeconfig string
)

func init() {
    flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}

func main() {
    flag.Parse()

    stopCh := signals.SetupSignalHandler()

    cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
    if err != nil {
        glog.Fatalf("Error building kubeconfig: %s", err.Error())
    }

    kubeClient, err := kubernetes.NewForConfig(cfg)
    if err != nil {
        glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }

    // 所謂 Informer,其實(shí)就是一個(gè)帶有本地緩存和索引機(jī)制的、可以注冊 EventHandler 的 client
    // informer watch apiserver,每隔 30 秒 resync 一次(list)
    kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)

    controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes())

    //  啟動(dòng) informer
    go kubeInformerFactory.Start(stopCh)

     // start controller 
    if err = controller.Run(2, stopCh); err != nil {
        glog.Fatalf("Error running controller: %s", err.Error())
    }
}


// NewController returns a new network controller
func NewController(
    kubeclientset kubernetes.Interface,
    networkclientset clientset.Interface,
    networkInformer informers.NetworkInformer) *Controller {

    // Create event broadcaster
    // Add sample-controller types to the default Kubernetes Scheme so Events can be
    // logged for sample-controller types.
    utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
    glog.V(4).Info("Creating event broadcaster")
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(glog.Infof)
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})

    controller := &Controller{
        kubeclientset:    kubeclientset,
        networkclientset: networkclientset,
        networksLister:   networkInformer.Lister(),
        networksSynced:   networkInformer.Informer().HasSynced,
        workqueue:        workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
        recorder:         recorder,
    }

    glog.Info("Setting up event handlers")
    // Set up an event handler for when Network resources change
    networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: controller.enqueueNetwork,
        UpdateFunc: func(old, new interface{}) {
            oldNetwork := old.(*samplecrdv1.Network)
            newNetwork := new.(*samplecrdv1.Network)
            if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
                // Periodic resync will send update events for all known Networks.
                // Two different versions of the same Network will always have different RVs.
                return
            }
            controller.enqueueNetwork(new)
        },
        DeleteFunc: controller.enqueueNetworkForDelete,
    })

    return controller
}

自定義 controller 的詳細(xì)使用方法可以參考:k8s-controller-custom-resource

四、使用中的一些問題

1、Informer 二級緩存中的同步問題

雖然 Informer 和 Kubernetes 之間沒有 resync 機(jī)制,但 Informer 內(nèi)部的這兩級緩存 DeltaIFIFO 和 LocalStore 之間會(huì)存在 resync 機(jī)制,k8s 中 kube-controller-manager 的 StatefulSetController 中使用了兩級緩存的 resync 機(jī)制(如下圖所示),我們在生產(chǎn)環(huán)境中發(fā)現(xiàn) sts 創(chuàng)建后過了很久 pod 才會(huì)創(chuàng)建,主要是由于 StatefulSetController 的兩級緩存之間 30s 會(huì)同步一次,由于 StatefulSetController watch 到變化后就會(huì)把對應(yīng)的 sts 放入 DeltaIFIFO 中,且每隔30s會(huì)把 LocalStore 中全部的 sts 重新入一遍 DeltaIFIFO,入隊(duì)時(shí)會(huì)做一些處理,過濾掉一些不需要重復(fù)入隊(duì)列的 sts,若間隔的 30s 內(nèi)沒有處理完隊(duì)列中所有的 sts,則待處理隊(duì)列中始終存在未處理完的 sts,并且在同步過程中產(chǎn)生的 sts 會(huì)加到隊(duì)列的尾部,新加入隊(duì)尾的 sts 只能等到前面的 sts 處理完成(也就是 resync 完成)才會(huì)被處理,所以導(dǎo)致的現(xiàn)象就是 sts 創(chuàng)建后過了很久 pod 才會(huì)創(chuàng)建。

優(yōu)化的方法就是去掉二級緩存的同步策略(將 setInformer.Informer().AddEventHandlerWithResyncPeriod() 改為 informer.AddEventHandler())或者調(diào)大同步周期,但是在研究 kube-controller-manager 其他 controller 時(shí)發(fā)現(xiàn)并不是所有的 controller 都有同步策略,社區(qū)也有相關(guān)的 issue 反饋了這一問題,Remove resync period for sset controller,社區(qū)也會(huì)在以后的版本中去掉兩級緩存之間的 resync 策略。

k8s.io/kubernetes/pkg/controller/statefulset/stateful_set.go

kube-controller-manager sts controller
2、使用 Informer 如何監(jiān)聽所有資源對象?

一個(gè) Informer 實(shí)例只能監(jiān)聽一種 resource,每個(gè) resource 需要?jiǎng)?chuàng)建對應(yīng)的 Informer 實(shí)例。

3、為什么不是使用 workqueue?

建議使用 RateLimitingQueue,它相比普通的 workqueue 多了以下的功能:

  • 限流:可以限制一個(gè) item 被 reenqueued 的次數(shù)。
  • 防止 hot loop:它保證了一個(gè) item 被 reenqueued 后,不會(huì)馬上被處理。

五、總結(jié)

本文介紹了 client-go 包中核心組件 Informer 的原理以及使用方法,Informer 主要功能是緩存對象到本地以及根據(jù)對應(yīng)的事件類型觸發(fā)已注冊好的 ResourceEventHandler,其主要用在訪問 k8s apiserver 的客戶端和 operator 中。

參考:

如何用 client-go 拓展 Kubernetes 的 API

https://www.kubernetes.org.cn/2693.html

Kubernetes 大咖秀徐超《使用 client-go 控制原生及拓展的 Kubernetes API》

Use prometheus conventions for workqueue metrics

深入淺出kubernetes之client-go的workqueue

https://gianarb.it/blog/kubernetes-shared-informer

理解 K8S 的設(shè)計(jì)精髓之 List-Watch機(jī)制和Informer模塊

https://ranler.org/notes/file/528

Kubernetes Client-go Informer 源碼分析

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

友情鏈接更多精彩內(nèi)容