一、kubernetes 集群的幾種訪問方式
在實(shí)際開發(fā)過程中,若想要獲取 kubernetes 中某個(gè)資源(比如 pod)的所有對象,可以使用 kubectl、k8s REST API、client-go(ClientSet、Dynamic Client、RESTClient 三種方式) 等多種方式訪問 k8s 集群獲取資源。在筆者的開發(fā)過程中,最初都是直接調(diào)用 k8s 的 REST API 來獲取的,使用 kubectl get pod -v=9 可以直接看到調(diào)用 k8s 的接口,然后在程序中直接訪問還是比較方便的。但是隨著集群規(guī)模的增長或者從國內(nèi)獲取海外 k8s 集群的數(shù)據(jù),直接調(diào)用 k8s 接口獲取所有 pod 還是比較耗時(shí),這個(gè)問題有多種解決方法,最初是直接使用 k8s 原生的 watch 接口來獲取的,下面是一個(gè)偽代碼:
const (
ADDED string = "ADDED"
MODIFIED string = "MODIFIED"
DELETED string = "DELETED"
ERROR string = "ERROR"
)
type Event struct {
Type string `json:"type"`
Object json.RawMessage `json:"object"`
}
func main() {
resp, err := http.Get("http://apiserver:8080/api/v1/watch/pods?watch=yes")
if err != nil {
// ...
}
decoder := json.NewDecoder(resp.Body)
for {
var event Event
err = decoder.Decode(&event)
if err != nil {
// ...
}
switch event.Type {
case ADDED, MODIFIED:
// ...
case DELETED:
// ...
case ERROR:
// ...
}
}
}
調(diào)用 watch 接口后會(huì)先將所有的對象 list 一次,然后 apiserver 會(huì)將變化的數(shù)據(jù)推送到 client 端,可以看到每次對于 watch 到的事件都需要判斷后進(jìn)行處理,然后將處理后的結(jié)果寫入到本地的緩存中,原生的 watch 操作還是非常麻煩的。后來了解到官方推出一個(gè)客戶端工具 client-go ,client-go 中的 Informer 對 watch 操作做了封裝,使用起來非常方便,下面會(huì)主要介紹一下 client-go 的使用。
二、Informer 的機(jī)制
cient-go 是從 k8s 代碼中抽出來的一個(gè)客戶端工具,Informer 是 client-go 中的核心工具包,已經(jīng)被 kubernetes 中眾多組件所使用。所謂 Informer,其實(shí)就是一個(gè)帶有本地緩存和索引機(jī)制的、可以注冊 EventHandler 的 client,本地緩存被稱為 Store,索引被稱為 Index。使用 informer 的目的是為了減輕 apiserver 數(shù)據(jù)交互的壓力而抽象出來的一個(gè) cache 層, 客戶端對 apiserver 數(shù)據(jù)的 "讀取" 和 "監(jiān)聽" 操作都通過本地 informer 進(jìn)行。Informer 實(shí)例的Lister()方法可以直接查找緩存在本地內(nèi)存中的數(shù)據(jù)。
Informer 的主要功能:
- 同步數(shù)據(jù)到本地緩存
- 根據(jù)對應(yīng)的事件類型,觸發(fā)事先注冊好的 ResourceEventHandler
1、Informer 中幾個(gè)組件的作用
Informer 中主要有 Reflector、Delta FIFO Queue、Local Store、WorkQueue 幾個(gè)組件。以下是 Informer 的工作流程圖。

根據(jù)流程圖來解釋一下 Informer 中幾個(gè)組件的作用:
Reflector:稱之為反射器,實(shí)現(xiàn)對 apiserver 指定類型對象的監(jiān)控(ListAndWatch),其中反射實(shí)現(xiàn)的就是把監(jiān)控的結(jié)果實(shí)例化成具體的對象,最終也是調(diào)用 Kubernetes 的 List/Watch API;
DeltaIFIFO Queue:一個(gè)增量隊(duì)列,將 Reflector 監(jiān)控變化的對象形成一個(gè) FIFO 隊(duì)列,此處的 Delta 就是變化;
LocalStore:就是 informer 的 cache,這里面緩存的是 apiserver 中的對象(其中有一部分可能還在DeltaFIFO 中),此時(shí)使用者再查詢對象的時(shí)候就直接從 cache 中查找,減少了 apiserver 的壓力,LocalStore 只會(huì)被 Lister 的 List/Get 方法訪問。
WorkQueue:DeltaIFIFO 收到事件后會(huì)先將事件存儲(chǔ)在自己的數(shù)據(jù)結(jié)構(gòu)中,然后直接操作 Store 中存儲(chǔ)的數(shù)據(jù),更新完 store 后 DeltaIFIFO 會(huì)將該事件 pop 到 WorkQueue 中,Controller 收到 WorkQueue 中的事件會(huì)根據(jù)對應(yīng)的類型觸發(fā)對應(yīng)的回調(diào)函數(shù)。
2、Informer 的工作流程
- Informer 首先會(huì) list/watch apiserver,Informer 所使用的 Reflector 包負(fù)責(zé)與 apiserver 建立連接,Reflector 使用 ListAndWatch 的方法,會(huì)先從 apiserver 中 list 該資源的所有實(shí)例,list 會(huì)拿到該對象最新的 resourceVersion,然后使用 watch 方法監(jiān)聽該 resourceVersion 之后的所有變化,若中途出現(xiàn)異常,reflector 則會(huì)從斷開的 resourceVersion 處重現(xiàn)嘗試監(jiān)聽所有變化,一旦該對象的實(shí)例有創(chuàng)建、刪除、更新動(dòng)作,Reflector 都會(huì)收到"事件通知",這時(shí),該事件及它對應(yīng)的 API 對象這個(gè)組合,被稱為增量(Delta),它會(huì)被放進(jìn) DeltaFIFO 中。
- Informer 會(huì)不斷地從這個(gè) DeltaFIFO 中讀取增量,每拿出一個(gè)對象,Informer 就會(huì)判斷這個(gè)增量的時(shí)間類型,然后創(chuàng)建或更新本地的緩存,也就是 store。
- 如果事件類型是 Added(添加對象),那么 Informer 會(huì)通過 Indexer 的庫把這個(gè)增量里的 API 對象保存到本地的緩存中,并為它創(chuàng)建索引,若為刪除操作,則在本地緩存中刪除該對象。
- DeltaFIFO 再 pop 這個(gè)事件到 controller 中,controller 會(huì)調(diào)用事先注冊的 ResourceEventHandler 回調(diào)函數(shù)進(jìn)行處理。
- 在 ResourceEventHandler 回調(diào)函數(shù)中,其實(shí)只是做了一些很簡單的過濾,然后將關(guān)心變更的 Object 放到 workqueue 里面。
- Controller 從 workqueue 里面取出 Object,啟動(dòng)一個(gè) worker 來執(zhí)行自己的業(yè)務(wù)邏輯,業(yè)務(wù)邏輯通常是計(jì)算目前集群的狀態(tài)和用戶希望達(dá)到的狀態(tài)有多大的區(qū)別,然后孜孜不倦地讓 apiserver 將狀態(tài)演化到用戶希望達(dá)到的狀態(tài),比如為 deployment 創(chuàng)建新的 pods,或者是擴(kuò)容/縮容 deployment。
- 在worker中就可以使用 lister 來獲取 resource,而不用頻繁的訪問 apiserver,因?yàn)?apiserver 中 resource 的變更都會(huì)反映到本地的 cache 中。
Informer 在使用時(shí)需要先初始化一個(gè) InformerFactory,目前主要推薦使用的是 SharedInformerFactory,Shared 指的是在多個(gè) Informer 中共享一個(gè)本地 cache。
Informer 中的 ResourceEventHandler 函數(shù)有三種:
// ResourceEventHandlerFuncs is an adaptor to let you easily specify as many or
// as few of the notification functions as you want while still implementing
// ResourceEventHandler.
type ResourceEventHandlerFuncs struct {
AddFunc func(obj interface{})
UpdateFunc func(oldObj, newObj interface{})
DeleteFunc func(obj interface{})
}
這三種函數(shù)的處理邏輯是用戶自定義的,在初始化 controller 時(shí)注冊完 ResourceEventHandler 后,一旦該對象的實(shí)例有創(chuàng)建、刪除、更新三中操作后就會(huì)觸發(fā)對應(yīng)的 ResourceEventHandler。
三、Informer 使用示例
在實(shí)際的開發(fā)工作中,Informer 主要用在兩處:
- 在訪問 k8s apiserver 的客戶端作為一個(gè) client 緩存對象使用;
- 在一些自定義 controller 中使用,比如 operator 的開發(fā);
1、下面是一個(gè)作為 client 的使用示例:
package main
import (
"flag"
"fmt"
"log"
"path/filepath"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err)
}
// 初始化 client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err.Error())
}
stopper := make(chan struct{})
defer close(stopper)
// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
defer runtime.HandleCrash()
// 啟動(dòng) informer,list & watch
go factory.Start(stopper)
// 從 apiserver 同步資源,即 list
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 使用自定義 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})
// 創(chuàng)建 lister
nodeLister := nodeInformer.Lister()
// 從 lister 中獲取所有 items
nodeList, err := nodeLister.List(labels.Everything())
if err != nil {
fmt.Println(err)
}
fmt.Println("nodelist:", nodeList)
<-stopper
}
func onAdd(obj interface{}) {
node := obj.(*corev1.Node)
fmt.Println("add a node:", node.Name)
}
Shared指的是多個(gè) lister 共享同一個(gè)cache,而且資源的變化會(huì)同時(shí)通知到cache和 listers。這個(gè)解釋和上面圖所展示的內(nèi)容的是一致的,cache我們在Indexer的介紹中已經(jīng)分析過了,lister 指的就是OnAdd、OnUpdate、OnDelete 這些回調(diào)函數(shù)背后的對象。
2、以下是作為 controller 使用的一個(gè)整體工作流程
(1) 創(chuàng)建一個(gè)控制器
- 為控制器創(chuàng)建 workqueue
- 創(chuàng)建 informer, 為 informer 添加 callback 函數(shù),創(chuàng)建 lister
(2) 啟動(dòng)控制器
- 啟動(dòng) informer
- 等待本地 cache sync 完成后, 啟動(dòng) workers
(3) 當(dāng)收到變更事件后,執(zhí)行 callback
- 等待事件觸發(fā)
- 從事件中獲取變更的 Object
- 做一些必要的檢查
- 生成 object key,一般是 namespace/name 的形式
- 將 key 放入 workqueue 中
(4) worker loop
- 等待從 workqueue 中獲取到 item,一般為 object key
- 用 object key 通過 lister 從本地 cache 中獲取到真正的 object 對象
- 做一些檢查
- 執(zhí)行真正的業(yè)務(wù)邏輯
- 處理下一個(gè) item
下面是自定義 controller 使用的一個(gè)參考:
var (
masterURL string
kubeconfig string
)
func init() {
flag.StringVar(&kubeconfig, "kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
}
func main() {
flag.Parse()
stopCh := signals.SetupSignalHandler()
cfg, err := clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
if err != nil {
glog.Fatalf("Error building kubeconfig: %s", err.Error())
}
kubeClient, err := kubernetes.NewForConfig(cfg)
if err != nil {
glog.Fatalf("Error building kubernetes clientset: %s", err.Error())
}
// 所謂 Informer,其實(shí)就是一個(gè)帶有本地緩存和索引機(jī)制的、可以注冊 EventHandler 的 client
// informer watch apiserver,每隔 30 秒 resync 一次(list)
kubeInformerFactory := informers.NewSharedInformerFactory(kubeClient, time.Second*30)
controller := controller.NewController(kubeClient, kubeInformerFactory.Core().V1().Nodes())
// 啟動(dòng) informer
go kubeInformerFactory.Start(stopCh)
// start controller
if err = controller.Run(2, stopCh); err != nil {
glog.Fatalf("Error running controller: %s", err.Error())
}
}
// NewController returns a new network controller
func NewController(
kubeclientset kubernetes.Interface,
networkclientset clientset.Interface,
networkInformer informers.NetworkInformer) *Controller {
// Create event broadcaster
// Add sample-controller types to the default Kubernetes Scheme so Events can be
// logged for sample-controller types.
utilruntime.Must(networkscheme.AddToScheme(scheme.Scheme))
glog.V(4).Info("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: controllerAgentName})
controller := &Controller{
kubeclientset: kubeclientset,
networkclientset: networkclientset,
networksLister: networkInformer.Lister(),
networksSynced: networkInformer.Informer().HasSynced,
workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Networks"),
recorder: recorder,
}
glog.Info("Setting up event handlers")
// Set up an event handler for when Network resources change
networkInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: controller.enqueueNetwork,
UpdateFunc: func(old, new interface{}) {
oldNetwork := old.(*samplecrdv1.Network)
newNetwork := new.(*samplecrdv1.Network)
if oldNetwork.ResourceVersion == newNetwork.ResourceVersion {
// Periodic resync will send update events for all known Networks.
// Two different versions of the same Network will always have different RVs.
return
}
controller.enqueueNetwork(new)
},
DeleteFunc: controller.enqueueNetworkForDelete,
})
return controller
}
自定義 controller 的詳細(xì)使用方法可以參考:k8s-controller-custom-resource
四、使用中的一些問題
1、Informer 二級緩存中的同步問題
雖然 Informer 和 Kubernetes 之間沒有 resync 機(jī)制,但 Informer 內(nèi)部的這兩級緩存 DeltaIFIFO 和 LocalStore 之間會(huì)存在 resync 機(jī)制,k8s 中 kube-controller-manager 的 StatefulSetController 中使用了兩級緩存的 resync 機(jī)制(如下圖所示),我們在生產(chǎn)環(huán)境中發(fā)現(xiàn) sts 創(chuàng)建后過了很久 pod 才會(huì)創(chuàng)建,主要是由于 StatefulSetController 的兩級緩存之間 30s 會(huì)同步一次,由于 StatefulSetController watch 到變化后就會(huì)把對應(yīng)的 sts 放入 DeltaIFIFO 中,且每隔30s會(huì)把 LocalStore 中全部的 sts 重新入一遍 DeltaIFIFO,入隊(duì)時(shí)會(huì)做一些處理,過濾掉一些不需要重復(fù)入隊(duì)列的 sts,若間隔的 30s 內(nèi)沒有處理完隊(duì)列中所有的 sts,則待處理隊(duì)列中始終存在未處理完的 sts,并且在同步過程中產(chǎn)生的 sts 會(huì)加到隊(duì)列的尾部,新加入隊(duì)尾的 sts 只能等到前面的 sts 處理完成(也就是 resync 完成)才會(huì)被處理,所以導(dǎo)致的現(xiàn)象就是 sts 創(chuàng)建后過了很久 pod 才會(huì)創(chuàng)建。
優(yōu)化的方法就是去掉二級緩存的同步策略(將 setInformer.Informer().AddEventHandlerWithResyncPeriod() 改為 informer.AddEventHandler())或者調(diào)大同步周期,但是在研究 kube-controller-manager 其他 controller 時(shí)發(fā)現(xiàn)并不是所有的 controller 都有同步策略,社區(qū)也有相關(guān)的 issue 反饋了這一問題,Remove resync period for sset controller,社區(qū)也會(huì)在以后的版本中去掉兩級緩存之間的 resync 策略。
k8s.io/kubernetes/pkg/controller/statefulset/stateful_set.go

2、使用 Informer 如何監(jiān)聽所有資源對象?
一個(gè) Informer 實(shí)例只能監(jiān)聽一種 resource,每個(gè) resource 需要?jiǎng)?chuàng)建對應(yīng)的 Informer 實(shí)例。
3、為什么不是使用 workqueue?
建議使用 RateLimitingQueue,它相比普通的 workqueue 多了以下的功能:
- 限流:可以限制一個(gè) item 被 reenqueued 的次數(shù)。
- 防止 hot loop:它保證了一個(gè) item 被 reenqueued 后,不會(huì)馬上被處理。
五、總結(jié)
本文介紹了 client-go 包中核心組件 Informer 的原理以及使用方法,Informer 主要功能是緩存對象到本地以及根據(jù)對應(yīng)的事件類型觸發(fā)已注冊好的 ResourceEventHandler,其主要用在訪問 k8s apiserver 的客戶端和 operator 中。
參考:
如何用 client-go 拓展 Kubernetes 的 API
https://www.kubernetes.org.cn/2693.html
Kubernetes 大咖秀徐超《使用 client-go 控制原生及拓展的 Kubernetes API》
Use prometheus conventions for workqueue metrics
深入淺出kubernetes之client-go的workqueue
https://gianarb.it/blog/kubernetes-shared-informer
理解 K8S 的設(shè)計(jì)精髓之 List-Watch機(jī)制和Informer模塊