背景
什么是 Informer 機制
一個控制器每次需要獲取對象的時候都要訪問 APIServer,這會給系統(tǒng)帶來很高的負載,Informers 的內(nèi)存緩存就是來解決這個問題的,此外 Informers 還可以幾乎實時的監(jiān)控對象的變化,而不需要輪詢請求,這樣就可以保證客戶端的緩存數(shù)據(jù)和服務(wù)端的數(shù)據(jù)一致,就可以大大降低 APIServer 的壓力了。

如上圖展示了 Informer 的基本處理流程:
以 events 事件的方式從 APIServer 獲取數(shù)據(jù)
提供一個類似客戶端的 Lister 接口,從內(nèi)存緩存中 get 和 list 對象
為添加、刪除、更新注冊事件處理程序
此外 Informers 也有錯誤處理方式,當(dāng)長期運行的 watch 連接中斷時,它們會嘗試使用另一個 watch 請求來恢復(fù)連接,在不丟失任何事件的情況下恢復(fù)事件流。如果中斷的時間較長,而且 APIServer 丟失了事件(etcd 在新的 watch 請求成功之前從數(shù)據(jù)庫中清除了這些事件),那么 Informers 就會重新 List 全量數(shù)據(jù)。
而且在重新 List 全量操作的時候還可以配置一個重新同步的周期參數(shù),用于協(xié)調(diào)內(nèi)存緩存數(shù)據(jù)和業(yè)務(wù)邏輯的數(shù)據(jù)一致性,每次過了該周期后,注冊的事件處理程序就將被所有的對象調(diào)用,通常這個周期參數(shù)以分為單位,比如10分鐘或者30分鐘。
Informers 的這些高級特性以及超強的魯棒性,都足以讓我們不去直接使用客戶端的 Watch() 方法來處理自己的業(yè)務(wù)邏輯,而且在 Kubernetes 中也有很多地方都有使用到 Informers。但是在使用 Informers 的時候,通常每個 GroupVersionResource(GVR)只實例化一個 Informers,但是有時候我們在一個應(yīng)用中往往有使用多種資源對象的需求,這個時候為了方便共享 Informers,我們可以通過使用共享 Informer 工廠來實例化一個 Informer。
共享 Informer 工廠允許我們在應(yīng)用中為同一個資源共享 Informer,也就是說不同的控制器循環(huán)可以使用相同的 watch 連接到后臺的 APIServer,例如,kube-controller-manager 中的控制器數(shù)據(jù)量就非常多,但是對于每個資源(比如 Pod),在這個進程中只有一個 Informer。
Informer 是 client-go 中的核心工具包,已經(jīng)被 kubernetes 中眾多組件所使用。所謂 Informer,其實就是一個帶有本地緩存和索引機制的、可以注冊 EventHandler 的 client,本地緩存被稱為 Store,索引被稱為 Index。使用 informer 的目的是為了減輕 apiserver 數(shù)據(jù)交互的壓力而抽象出來的一個 cache 層, 客戶端對 apiserver 數(shù)據(jù)的 "讀取" 和 "監(jiān)聽" 操作都通過本地 informer 進行。
Informer 的主要功能:
同步數(shù)據(jù)到本地緩存
根據(jù)對應(yīng)的事件類型,觸發(fā)事先注冊好的 ResourceEventHandle
為什么需要 Informer 機制?
我們知道Kubernetes各個組件都是通過REST API跟API Server交互通信的,而如果每次每一個組件都直接跟API Server交互去讀取/寫入到后端的etcd的話,會對API Server以及etcd造成非常大的負擔(dān)。 而Informer機制是為了保證各個組件之間通信的實時性、可靠性,并且減緩對API Server和etcd的負擔(dān)。
Informer 需要滿足哪些要求?
消息可靠性
消息實時性
消息順序性
高性能
核心功能

Informer的工作流程
Informer 首先會 list/watch apiserver,Informer 所使用的 Reflector 包負責(zé)與 apiserver 建立連接,Reflector 使用 ListAndWatch 的方法,會先從 apiserver 中 list 該資源的所有實例,list 會拿到該對象最新的 resourceVersion,然后使用 watch 方法監(jiān)聽該 resourceVersion 之后的所有變化,若中途出現(xiàn)異常,reflector 則會從斷開的 resourceVersion 處重現(xiàn)嘗試監(jiān)聽所有變化,一旦該對象的實例有創(chuàng)建、刪除、更新動作,Reflector 都會收到"事件通知",這時,該事件及它對應(yīng)的 API 對象這個組合,被稱為增量(Delta),它會被放進 DeltaFIFO 中。
Informer 會不斷地從這個 DeltaFIFO 中讀取增量,每拿出一個對象,Informer 就會判斷這個增量的時間類型,然后創(chuàng)建或更新本地的緩存,也就是 store。
如果事件類型是 Added(添加對象),那么 Informer 會通過 Indexer 的庫把這個增量里的 API 對象保存到本地的緩存中,并為它創(chuàng)建索引,若為刪除操作,則在本地緩存中刪除該對象。
DeltaFIFO 再 pop 這個事件到 controller 中,controller 會調(diào)用事先注冊的 ResourceEventHandler 回調(diào)函數(shù)進行處理。
在 ResourceEventHandler 回調(diào)函數(shù)中,其實只是做了一些很簡單的過濾,然后將關(guān)心變更的 Object 放到 workqueue 里面。
Controller 從 workqueue 里面取出 Object,啟動一個 worker 來執(zhí)行自己的業(yè)務(wù)邏輯,業(yè)務(wù)邏輯通常是計算目前集群的狀態(tài)和用戶希望達到的狀態(tài)有多大的區(qū)別,然后孜孜不倦地讓 apiserver 將狀態(tài)演化到用戶希望達到的狀態(tài),比如為 deployment 創(chuàng)建新的 pods,或者是擴容/縮容 deployment。
在worker中就可以使用 lister 來獲取 resource,而不用頻繁的訪問 apiserver,因為 apiserver 中 resource 的變更都會反映到本地的 cache 中。
List & Watch

List所做的,就是向API Server發(fā)送一個http短鏈接請求,羅列所有目標(biāo)資源的對象。而Watch所做的是實際的“監(jiān)聽”工作,通過http長鏈接的方式,其與API Server能夠建立一個持久的監(jiān)聽關(guān)系,當(dāng)目標(biāo)資源發(fā)生了變化時,API Server會返回一個對應(yīng)的事件,從而完成一次成功的監(jiān)聽,之后的事情便交給后面的handler來做。
這樣一個List & Watch機制,帶來了如下幾個優(yōu)勢:
事件響應(yīng)的實時性:通過Watch的調(diào)用,當(dāng)API Server中的目標(biāo)資源產(chǎn)生變化時,能夠及時的收到事件的返回,從而保證了事件響應(yīng)的實時性。而倘若是一個輪詢的機制,其實時性將受限于輪詢的時間間隔。
事件響應(yīng)的可靠性:倘若僅調(diào)用Watch,則如果在某個時間點連接被斷開,就可能導(dǎo)致事件被丟失。List的調(diào)用帶來了查詢資源期望狀態(tài)的能力,客戶端通過期望狀態(tài)與實際狀態(tài)的對比,可以糾正狀態(tài)的不一致。二者結(jié)合保證了事件響應(yīng)的可靠性。
高性能:倘若僅周期性地調(diào)用List,輪詢地獲取資源的期望狀態(tài)并在與當(dāng)前狀態(tài)不一致時執(zhí)行更新,自然也可以do the job。但是高頻的輪詢會大大增加API Server的負擔(dān),低頻的輪詢也會影響事件響應(yīng)的實時性。Watch這一異步消息機制的結(jié)合,在保證了實時性的基礎(chǔ)上也減少了API Server的負擔(dān),保證了高性能。
事件處理的順序性:我們知道,每個資源對象都有一個遞增的ResourceVersion,唯一地標(biāo)識它當(dāng)前的狀態(tài)是“第幾個版本”,每當(dāng)這個資源內(nèi)容發(fā)生變化時,對應(yīng)產(chǎn)生的事件的ResourceVersion也會相應(yīng)增加。在并發(fā)場景下,K8s可能獲得同一資源的多個事件,由于K8s只關(guān)心資源的最終狀態(tài),因此只需要確保執(zhí)行事件的ResourceVersion是最新的,即可確保事件處理的順序性。
ResourceVersion
Kubernetes 請求并發(fā)控制與數(shù)據(jù)一致性(含ResourceVersion、Update、Patch簡析)
Kubernetes-resourceVersion機制分析
秘訣就是 Chunked transfer encoding(分塊傳輸編碼),它首次出現(xiàn)在HTTP/1.1。正如維基百科所說:
HTTP 分塊傳輸編碼允許服務(wù)器為動態(tài)生成的內(nèi)容維持 HTTP 持久鏈接。通常,持久鏈接需要服務(wù)器在開始發(fā)送消息體前發(fā)送Content-Length消息頭字段,但是對于動態(tài)生成的內(nèi)容來說,在內(nèi)容創(chuàng)建完之前是不可知的。使用分塊傳輸編碼,數(shù)據(jù)分解成一系列數(shù)據(jù)塊,并以一個或多個塊發(fā)送,這樣服務(wù)器可以發(fā)送數(shù)據(jù)而不需要預(yù)先知道發(fā)送內(nèi)容的總大小。
當(dāng)客戶端調(diào)用 watch API 時,apiserver 在response 的 HTTP Header 中設(shè)置 Transfer-Encoding的值為chunked,表示采用分塊傳輸編碼,客戶端收到該信息后,便和服務(wù)端該鏈接,并等待下一個數(shù)據(jù)塊,即資源的事件信息。例如:
Informer 能保證通過list+watch不會丟失事件,如果網(wǎng)絡(luò)抖動重新恢復(fù)后,watch會帶著之前的resourceVersion號重連,resourceVersion是單調(diào)遞增的, API Server 收到該請求后會將所有大于該resourceVersion的變更同步過來。
二級緩存
二級緩存屬于 Informer 的底層緩存機制,這兩級緩存分別是 DeltaFIFO 和 LocalStore。這兩級緩存的用途各不相同。DeltaFIFO 用來存儲 Watch API 返回的各種事件 ,LocalStore 只會被 Lister 的 List/Get 方法訪問 。
如果K8s每次想查看資源對象的狀態(tài),都要經(jīng)歷一遍List調(diào)用,顯然對 API Server 也是一個不小的負擔(dān),對此,一個容易想到的方法是使用一個cache作保存,需要獲取資源狀態(tài)時直接調(diào)cache,當(dāng)事件來臨時除了響應(yīng)事件外,也對cache進行刷新。
雖然 Informer 和 Kubernetes 之間沒有 resync 機制,但 Informer 內(nèi)部的這兩級緩存之間存在 resync 機制。
Resync
Resync 機制會將 Indexer 的本地緩存重新同步到 DeltaFIFO 隊列中。一般我們會設(shè)置一個時間周期,讓 Indexer 周期性地將緩存同步到隊列中。直接 list/watch API Server 就已經(jīng)能拿到集群中資源對象變化的 event 了,這里引入 Resync 的作用是什么呢?去掉會有什么影響呢?
自定義事件處理
ResourceEventHandler 用于處理對象的變更事件,用戶可以通過實現(xiàn) ResourceEventHandler 接口,并調(diào)用 sharedIndexInformer.AddEventHandler() 或 sharedIndexInformer.AddEventHandlerWithResyncPeriod() 方法注冊到 sharedProcessor 中。這樣,當(dāng)數(shù)據(jù)發(fā)送變化時,就會回調(diào) ResourceEventHandler 中對應(yīng)的 OnAdd/OnUpdate/OnDelete 方法來實現(xiàn)用戶自定義的處理邏輯
核心對象
Informer相關(guān)
client-go 中提供了幾種不同的 Informer:
通過調(diào)用 NewInformer 函數(shù)創(chuàng)建一個簡單的不帶 indexer 的 Informer。
通過調(diào)用 NewIndexerInformer 函數(shù)創(chuàng)建一個簡單的帶 indexer 的 Informer。
通過調(diào)用 NewSharedIndexInformer 函數(shù)創(chuàng)建一個 Shared 的 Informer。
通過調(diào)用 NewDynamicSharedInformerFactory 函數(shù)創(chuàng)建一個為 Dynamic 客戶端的 Shared 的 Informer。
這里帶有 Indexer 和不帶 Indexer 的大家好理解寫,從字面意思來看,就是一個是帶有 Indexer 功能一個不帶有 Indexer 功能的 Informer。而這里的 Shared 的 Informer 引入,其實是因為隨著 K8S 中,相同資源的監(jiān)聽者在不斷地增加,從而導(dǎo)致很多調(diào)用者通過 Watch API 對 API Server 建立一個長連接去監(jiān)聽事件的變化,這將嚴(yán)重增加了 API Server 的工作負載,及資源的浪費。
比如在 kube-controller-manager 組件中,有很多控制管理都需要監(jiān)聽 Pod 資源的變化,如果都獨立的調(diào)用 Informer 去維護一個對 APIServer 的長連接,這將導(dǎo)致 kube-controller-manager 中資源的浪費及增加了 APIServer 的負載,而不同控制管理者通過創(chuàng)建 Shared 的 Informer 則實現(xiàn)了這些控制管理者使用同一個 Watch 去和 APIServer 建立長連接,并在收到事件后,分發(fā)給下游的調(diào)用者。
SharedInformer
我們平時說的 Informer 其實就是 SharedInformer,它是可以共享使用的。如果同一個資源的 Informer 被實例化多次,那么就會運行多個 ListAndWatch 操作,這會加大 APIServer 的壓力。而 SharedInformer 通過一個 map 來讓同一類資源的 Informer 實現(xiàn)共享一個 Refelctor,這樣就不會出現(xiàn)上面這個問題了。
Informer通過Local Store緩存目標(biāo)資源對象,且僅為自己所用。但是在K8s中,一個Controller可以關(guān)心不止一種資源,使得多個Controller所關(guān)心的資源彼此會存在交集。如果幾個Controller都用自己的Informer來緩存同一個目標(biāo)資源,顯然會導(dǎo)致不小的空間開銷,因此K8s引入了SharedInformer來解決這個問題。
SharedInformer擁有為多個Controller提供一個共享cache的能力,從而避免資源緩存的重復(fù)、減小空間開銷。除此之外,一個SharedInformer對一種資源只建立一個與API Server的Watch監(jiān)聽,且能夠?qū)⒈O(jiān)聽得到的事件分發(fā)給下游所有感興趣的Controller,這也顯著地減少了API Server的負載壓力。實際上,K8s中廣泛使用的都是SharedInformer,Informer則出場甚少。
SharedIndexInformer
SharedIndexInformer 擴展了 SharedInformer 接口,提供了構(gòu)建索引的能力。
SharedIndexInformerFactory
使用sharedInformerFactory可以統(tǒng)一管理控制器中需要的各資源對象的informer實例,避免同一個資源創(chuàng)建多個實例
默認的 Informer 實現(xiàn)
Informer 機制為 K8s 的各種對象提供了默認的 Informer 實現(xiàn),可以通過以下方式快速創(chuàng)建一個 Informer 對象,并交由 SharedIndexInformerFactory 統(tǒng)一管理。
SharedInformerFactory.Core().V1().Nodes()
.Core().V1().Pods()
.Apps().V1().Deployments()
.Core().V1().Secrets()
.Batch().V1beta1().CronJobs()
List-Watch相關(guān)
Reflector
Reflector用來watch特定的k8s API資源。具體的實現(xiàn)是通過ListAndWatch的方法,watch可以是k8s內(nèi)建的資源或者是自定義的資源。當(dāng)reflector通過watch API接收到有關(guān)新資源實例存在的通知時,它使用相應(yīng)的列表API獲取新創(chuàng)建的對象,并將其放入watchHandler函數(shù)內(nèi)的Delta Fifo隊列中。
ListerWatcher
ListerWatcher 是 Informer 機制中的核心對象之一,其功能是通過 List() 方法從 API Server 中獲取某一類型的全量數(shù)據(jù),再通過 Watch() 方法監(jiān)聽 API Server 中數(shù)據(jù)的增量更新。
ListerWatcher 繼承自 Lister 和 Watcher 接口,從而使其既能獲取全量數(shù)據(jù),又能監(jiān)聽增量數(shù)據(jù)更新。
Lister
Lister 接口用于完成全量數(shù)據(jù)的初始化。
Watcher
Watcher 接口用于監(jiān)聽數(shù)據(jù)的增量更新。
事件隊列相關(guān)

Store
Store是一個通用的對象存儲接口,其中定義了一系列與對象增刪改查相關(guān)的方法。Store 要求對象有唯一鍵,鍵的計算方式由接口實現(xiàn)類中關(guān)聯(lián)的 KeyFunc 決定的。
Queue
從 Queue 接口的定義可以看出,它繼承自 Store 接口,所以其具備基本的數(shù)據(jù)存取能力。同時,它又具備從隊列頭部取出數(shù)據(jù)并調(diào)用 PopProcessFunc 處理頭部數(shù)據(jù)并返回處理結(jié)果的能力。
DeltaFIFO
DeltaFIFO 是一個生產(chǎn)者-消費者的隊列,生產(chǎn)者是 Reflector,消費者是 Pop 函數(shù),從架構(gòu)圖上可以看出 DeltaFIFO 的數(shù)據(jù)來源為 Reflector,通過 Pop 操作消費數(shù)據(jù),消費的數(shù)據(jù)一方面存儲到 Indexer 中,另一方面可以通過 Informer 的 handler 進行處理,Informer 的 handler 處理的數(shù)據(jù)需要與存儲在 Indexer 中的數(shù)據(jù)匹配。需要注意的是,Pop 的單位是一個 Deltas,而不是 Delta。
Delta
Delta 是 DeltaFIFO 存儲的類型,它記錄了對象發(fā)生了什么變化以及變化后對象的狀態(tài)。如果變更是刪除,它會記錄對象刪除之前的最終狀態(tài)。
Deltas
Deltas 保存了對象狀態(tài)的變更(Add/Delete/Update)信息(如 Pod 的刪除添加等),Deltas 緩存了針對相同對象的多個狀態(tài)變更信息,如 Pod 的 Deltas[0]可能更新了標(biāo)簽,Deltas[1]可能刪除了該 Pod。最老的狀態(tài)變更信息為 Oldest(),最新的狀態(tài)變更信息為 Newest(),使用中,獲取 DeltaFIFO 中對象的 key 以及獲取 DeltaFIFO 都以最新狀態(tài)為準(zhǔn)。
最舊的 delta 在索引0位置,最新的 delta 在最后一個索引位置。
DeltaType
const (
Added DeltaType = "Added" // 增加
Updated DeltaType = "Updated" // 更新
Deleted DeltaType = "Deleted" // 刪除
Sync DeltaType = "Sync" // 同步
)</pre>
DeltaFIFO 中數(shù)據(jù)存儲形式

事件處理相關(guān)
Controller
Processor


[圖片上傳失敗...(image-25a1a1-1637847474305)]
ResourceEventHandler
當(dāng)經(jīng)過List & Watch得到事件時,接下來的實際響應(yīng)工作就交由ResourceEventHandler來進行,這個Interface定義如下:
type ResourceEventHandler interface {
// 添加對象回調(diào)函數(shù)
OnAdd(obj interface{})
// 更新對象回調(diào)函數(shù)
OnUpdate(oldObj, newObj interface{})
// 刪除對象回調(diào)函數(shù)
OnDelete(obj interface{})
}
當(dāng)事件到來時,Informer根據(jù)事件的類型(添加/更新/刪除資源對象)進行判斷,將事件分發(fā)給綁定的EventHandler,即分別調(diào)用對應(yīng)的handle方法(OnAdd/OnUpdate/OnDelete),最后EventHandler將事件發(fā)送給Workqueue。
緩存相關(guān)

Indexer
Indexer在Store基礎(chǔ)上擴展了索引能力,就好比給數(shù)據(jù)庫添加的索引,以便查詢更快,那么肯定需要有個結(jié)構(gòu)來保存索引。典型的索引用例是基于對象標(biāo)簽創(chuàng)建索引。 Indexer可以根據(jù)多個索引函數(shù)維護索引。Indexer使用線程安全的數(shù)據(jù)存儲來存儲對象及其鍵。 在Store中定義了一個名為MetaNamespaceKeyFunc 的默認函數(shù),該函數(shù)生成對象的鍵作為該對象的<namespace> / <name>組合。
Reflector 通過 ListAndWatch 把數(shù)據(jù)傳入 DeltaFIFO 后,經(jīng)過 DeltaFIFO 的 Pop 函數(shù)將資源對象存入到了本地的一個存儲 Indexer 中,而這個底層真正的存儲其實就是上面的 ThreadSafeStore。

ThreadSafeStore
使用示例
var kubeconfig *string
if home := homedir.HomeDir(); home != "" {
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
kubeconfig = flag.String(, , "absolute path to the kubeconfig file")
}
flag.Parse()
config, err := clientcmd.BuildConfigFromFlags(, *kubeconfig)
if err != nil {
panic(err)
}
// 初始化 client
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
log.Panic(err.Error())
}
stopper := make(chan struct{})
defer close(stopper)
// 初始化 informer
factory := informers.NewSharedInformerFactory(clientset, 0)
nodeInformer := factory.Core().V1().Nodes()
informer := nodeInformer.Informer()
defer runtime.HandleCrash()
// 啟動 informer,list & watch
go factory.Start(stopper)
// 從 apiserver 同步資源,必不可少
if !cache.WaitForCacheSync(stopper, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
// 使用自定義 handler
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: onAdd,
UpdateFunc: func(interface{}, interface{}) { fmt.Println("update not implemented") }, // 此處省略 workqueue 的使用
DeleteFunc: func(interface{}) { fmt.Println("delete not implemented") },
})
實現(xiàn)原理
構(gòu)建過程

啟動過程

List 過程

Watch過程

變更分發(fā)過程

變更處理過程(待補充)
完整流程
