Table of Contents
- 1. Chapter introduction
- 2. cache.SharedIndexInformer structure
- 3. sharedIndexInformer.Run
- 4. Summary
- 5. References
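1. Chapter introduction
The previous chapter showed that the informer mechanism makes it very easy to implement a controller for a resource object. The steps are:
(1) Create a SharedInformerFactory instance, then specify the indexer and listWatch parameters to obtain a cache.SharedIndexInformer object.
(2) cache.SharedIndexInformer is what actually implements the informer mechanism shown in the figure below.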

This chapter starts from SharedIndexInformer to study the informer mechanism.
2. cache.SharedIndexInformer structure
type sharedIndexInformer struct {
indexer Indexer // local cache plus indexes, covered in detail in the previous article
controller Controller // starts the reflector, which fetches objects through the user-defined ListWatch and feeds the DeltaFIFO
processor *sharedProcessor // the set of listeners registered for add, update and delete events
cacheMutationDetector CacheMutationDetector // mutation detector
// This block is tracked to handle late initialization of the controller
listerWatcher ListerWatcher // provides the list and watch functions; as the pod informer shows, they call the client to send requests straight to the apiserver
objectType runtime.Object // the object type to list and watch; for a pod informer this is the core/v1 Pod
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
resyncCheckPeriod time.Duration // how often the controller's reflector attempts to call the listeners' shouldResync
// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler (i.e. they don't specify one and just want to use the shared informer's default
// value).
defaultEventHandlerResyncPeriod time.Duration // default resync period for handlers registered through AddEventHandler
// clock allows for testability
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// blockDeltas gives a way to stop all event distribution so that a late event handler
// can safely join the shared informer.
blockDeltas sync.Mutex
}
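SharedIndexInformer mainly consists of the following objects:
(1) indexer
The indexer at the bottom right of the figure. Its implementation was analyzed in the previous section.
(2) Controller
The Controller on the left of the figure. It starts the reflector and drives the list-watch machinery, which is the focus of the rest of this chapter.
(3) processor
The listeners at the bottom of the figure. Every ResourceEventHandler registered with the informer becomes one listener.
Because the informer is shared, the same informer may be requested several times and end up with several ResourceEventHandlers registered on it. In the common case, one informer has one listener.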
type sharedProcessor struct {
listenersStarted bool
listenersLock sync.RWMutex
listeners []*processorListener // every listener added to the informer
syncingListeners []*processorListener // the subset of listeners that are currently due for a resync; it is recomputed by shouldResync, which the reflector calls every resyncCheckPeriod
clock clock.Clock
wg wait.Group
}
The ResourceEventHandler interface is shown below. It defines the add, update and delete callbacks of an informer.
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
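(4) CacheMutationDetector
The mutation detector checks whether objects in the in-memory cache have been mutated. It is meant for testing and is disabled by default, so it is not covered further here.
3. sharedIndexInformer.Run
k8s.io/client-go/tools/cache/shared_informer.go
When using an informer, once the sharedIndexInformer has been defined, calling sharedIndexInformer.Run starts the whole informer mechanism.
The overall flow of the informer is:
(1) The DeltaFIFO receives the full and incremental data from ListAndWatch, and Pop hands it to the HandleDeltas function (production).
(2) HandleDeltas updates the indexer cache and sends the events one by one to the user-defined handlers (consumption).
Following the Run function shows how this is implemented. The main logic of sharedIndexInformer.Run is:
- Create a DeltaFIFO whose key function is MetaNamespaceKeyFunc, that is, namespace/name is used as the object key
- Build a Config and use it to create a controller
- Run the user-defined handler logic through s.processor.run (start consumption)
- Run controller.Run to drive the whole mechanism (start production)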
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 1. Create a DeltaFIFO whose key function is MetaNamespaceKeyFunc, so namespace/name is the object key
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 2. Build the config
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod, // resync check period
RetryOnError: false,
ShouldResync: s.processor.shouldResync, // function that decides whether any registered handler needs a resync
Process: s.HandleDeltas, // function that processes the data delivered by list/watch
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 3. Create a controller from the config
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// cache mutation detection, ignored here
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 4. Run the user-defined handler logic
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 5. Run the controller
s.controller.Run(stopCh)
}
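The ordering in Run (consumers start first, the producer blocks, and the processor is stopped strictly after the controller) can be illustrated with a small standard-library sketch. It is not client-go code: runInformer and the recorded event strings are hypothetical names, and the goroutine merely stands in for s.processor.run.

```go
package main

import (
	"fmt"
	"sync"
)

// runInformer mimics the ordering in sharedIndexInformer.Run: the consumer is
// started first, the producer blocks until stopCh closes, and only then is
// the consumer told to stop. All names here are illustrative.
func runInformer(stopCh <-chan struct{}, events *[]string) {
	var mu sync.Mutex
	record := func(s string) {
		mu.Lock()
		defer mu.Unlock()
		*events = append(*events, s)
	}

	processorStopCh := make(chan struct{})
	var wg sync.WaitGroup
	defer wg.Wait()              // deferred first, runs last: wait for the consumer
	defer close(processorStopCh) // deferred second, runs first: tell the consumer to stop

	wg.Add(1)
	go func() { // stands in for s.processor.run
		defer wg.Done()
		<-processorStopCh
		record("processor stopped")
	}()

	// Stands in for s.controller.Run(stopCh): blocks until stop is requested.
	<-stopCh
	record("controller stopped")
}

func main() {
	stopCh := make(chan struct{})
	close(stopCh) // request shutdown immediately for the demo
	var events []string
	runInformer(stopCh, &events)
	fmt.Println(events)
}
```

Because deferred calls run in reverse order, the separate processorStopCh is closed only after the producer has returned, which is the reason the original code does not simply reuse stopCh.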
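3.1 NewDeltaFIFO
3.1.1 The role of DeltaFIFO
From the introduction to the apiserver list-watch mechanism we know that the list and watch APIs alone already give us the full and incremental data.
If I had to write the simplest possible client-go client, I would:
(1) Define a local cache and fill it with the data returned by list.
(2) On every watch event, update the cache and then pass the object to the user-defined add, update and delete handlers.
The cache is needed because it keeps a local copy of the etcd data, so a controller can read objects locally.
This gives a very crude client, but it falls far short of what the informer mechanism requires.
Why does the informer mechanism need a DeltaFIFO?
(1) Why a FIFO queue?
FIFO guarantees ordering; without ordering the data would become inconsistent. The queue also acts as a buffer: if too many updates arrive at once, the informer might otherwise be unable to keep up.
(2) Why deltas?
Elements of the FIFO queue have only two destinations: they are used to synchronize the local cache, and they are sent to the user-defined add, update and delete handlers.
Suppose an object is updated many times within a very short period and is finally deleted. A plain FIFO queue would accumulate a lot of entries for it.
Sending them to the handlers one by one is fine, because the user wants to see the whole process. But updating the local cache one entry at a time is wasted work if the object is deleted in the end.
DeltaFIFO addresses this by grouping all pending changes of one object under a single key.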
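3.1.2 DeltaFIFO structure
DeltaFIFO can be seen as a special FIFO queue. A Delta is a marker for a change (add, delete, update, sync) to an object in Kubernetes.
Add, delete and update are obviously required: even a queue we implemented ourselves would need to record which operation took place.
Sync is needed when the apiserver is re-listed or a periodic resync is triggered.
// there are four types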
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
// Delta is the type stored by a DeltaFIFO. It tells you what change
// happened, and the object's state after* that change.
//
// [*] Unless the change is a deletion, and then you'll get the final
// state of the object before it was deleted.
type Delta struct {
Type DeltaType
Object interface{} // the Kubernetes object
}
// Deltas is a list of one or more 'Delta's to an individual object.
// The oldest delta is at index 0, the newest delta is the last one.
type Deltas []Delta
type DeltaFIFO struct {
// lock/cond protects access to 'items' and 'queue'.
lock sync.RWMutex
cond sync.Cond
// We depend on the property that items in the set are in
// the queue and vice versa, and that all Deltas in this
// map have at least one Delta.
items map[string]Deltas
queue []string
// populated and initialPopulationCount are used to decide whether the initial batch has been processed
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool // the queue has been filled at least once
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// keyFunc is used to make the key used for queued item
// insertion and retrieval, and should be deterministic.
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
knownObjects KeyListerGetter
// Indication the queue is closed.
// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
// Currently, not used to gate any of CRED operations.
closed bool
closedLock sync.Mutex
}
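The key fields of DeltaFIFO are items, queue and knownObjects.
items: maps each object key to the list of changes (Deltas) recorded for that object
queue: the object keys, in FIFO order
knownObjects: as the initialization below shows, this is the cache.Indexer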
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
func NewDeltaFIFO(keyFunc KeyFunc, knownObjects KeyListerGetter) *DeltaFIFO {
f := &DeltaFIFO{
items: map[string]Deltas{},
queue: []string{},
keyFunc: keyFunc,
knownObjects: knownObjects,
}
f.cond.L = &f.lock
return f
}
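The key function passed to NewDeltaFIFO decides which string identifies an object in items and queue. The following is a minimal sketch of the namespace/name convention, assuming a hypothetical objectMeta type in place of real Kubernetes object metadata; it is not the MetaNamespaceKeyFunc source.

```go
package main

import "fmt"

// objectMeta is a stand-in for the metadata that real Kubernetes objects carry.
type objectMeta struct {
	Namespace string
	Name      string
}

// namespaceKey builds "<namespace>/<name>", or just "<name>" when the object
// has no namespace (for example a cluster-scoped object).
func namespaceKey(m objectMeta) string {
	if m.Namespace == "" {
		return m.Name
	}
	return m.Namespace + "/" + m.Name
}

func main() {
	fmt.Println(namespaceKey(objectMeta{Namespace: "default", Name: "one"}))
	fmt.Println(namespaceKey(objectMeta{Name: "node-1"}))
}
```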
3.1.3 The core structure of DeltaFIFO by example
Suppose we watch all Pods in the default namespace. The namespace is empty at first, and some time after the watch starts three Pods are created:
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "two", Annotations: map[string]string{"users": "bert,oscar"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "tre", Annotations: map[string]string{"users": "ernie,elmo"}}}
The watch then produces the following events in order:
pod1-1: the first phase of pod1 (Pending)
pod1-2: the second phase of pod1 (Scheduled)
pod1-3: the third phase of pod1 (Running)
ADD: pod1-1 (abbreviated; it is really the whole Pod object, {ObjectMeta: metav1.ObjectMeta{Name: "one", Annotations: map[string]string{"users": "ernie,bert"}}})
ADD: pod2-1
MODIFIED: pod1-2
ADD: pod3-1
MODIFIED: pod2-2
MODIFIED: pod3-2
MODIFIED: pod1-3
MODIFIED: pod3-3
MODIFIED: pod2-3
At this point, assuming nothing has been popped yet, the DeltaFIFO holds:
deltaFIFO {
queue: ["one", "two", "tre"],
items: {
"one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}],
"two": [{"add", pod2-1}, {"update", pod2-2}, {"update", pod2-3}],
"tre": [{"add", pod3-1}, {"update", pod3-2}, {"update", pod3-3}],
}
}
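The benefits are:
(1) Delivery happens one object at a time. Here "one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}] hands all three events to the handler side in a single Pop.
(2) The deltas of one object arrive together, so the consumer can see its final state. For "one": [{"add", pod1-1}, {"update", pod1-2}, {"update", pod1-3}] it could in principle skip pod1-1 and pod1-2 and write pod1-3 straight to the cache (HandleDeltas, analyzed later, nevertheless applies every delta in order).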
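The example can be replayed with a toy queue that keeps only the two core fields, items and queue. This is a simplified sketch, not the DeltaFIFO implementation: toyFIFO, enqueue, pop and replay are hypothetical names, locking is omitted, and objects are represented by plain strings.

```go
package main

import "fmt"

type deltaType string

const (
	added   deltaType = "Added"
	updated deltaType = "Updated"
)

type delta struct {
	Type   deltaType
	Object string // stands in for the full object, e.g. "pod1-2"
}

// toyFIFO keeps per-key delta lists plus a queue of keys in first-seen order.
type toyFIFO struct {
	items map[string][]delta
	queue []string
}

func newToyFIFO() *toyFIFO { return &toyFIFO{items: map[string][]delta{}} }

// enqueue appends a delta to the key's list; a key enters the queue only once.
func (f *toyFIFO) enqueue(key string, d delta) {
	if _, exists := f.items[key]; !exists {
		f.queue = append(f.queue, key)
	}
	f.items[key] = append(f.items[key], d)
}

// pop removes the oldest key and returns every delta accumulated for it.
func (f *toyFIFO) pop() (string, []delta, bool) {
	if len(f.queue) == 0 {
		return "", nil, false
	}
	key := f.queue[0]
	f.queue = f.queue[1:]
	ds := f.items[key]
	delete(f.items, key)
	return key, ds, true
}

// replay feeds the nine watch events from the text into a fresh queue.
func replay() *toyFIFO {
	f := newToyFIFO()
	f.enqueue("one", delta{added, "pod1-1"})
	f.enqueue("two", delta{added, "pod2-1"})
	f.enqueue("one", delta{updated, "pod1-2"})
	f.enqueue("tre", delta{added, "pod3-1"})
	f.enqueue("two", delta{updated, "pod2-2"})
	f.enqueue("tre", delta{updated, "pod3-2"})
	f.enqueue("one", delta{updated, "pod1-3"})
	f.enqueue("tre", delta{updated, "pod3-3"})
	f.enqueue("two", delta{updated, "pod2-3"})
	return f
}

func main() {
	f := replay()
	for {
		key, ds, ok := f.pop()
		if !ok {
			break
		}
		fmt.Println(key, ds)
	}
}
```

Each pop returns one key together with its whole history, which is exactly the unit that the Process function receives.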
3.2 How sharedIndexInformer produces data
The data comes from the apiserver through ListAndWatch. Let us see how it is used, starting from controller.Run.
3.2.1 controller structure
The controller itself is very simple: it mainly holds a Config, plus a few data-production functions built on that Config.
// New makes a new Controller from the given Config.
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
// Config contains all the settings for a Controller.
type Config struct {
// The queue for your objects - has to be a DeltaFIFO due to
// assumptions in the implementation. Your Process() function
// should accept the output of this Queue's Pop() method.
// the queue that buffers the data
Queue
// receives data from the apiserver
ListerWatcher
// Something that can process your objects.
// how to process the received data
Process ProcessFunc
// The type of your objects.
// the object type: Pod? Deployment?
ObjectType runtime.Object
FullResyncPeriod time.Duration
// decides whether a resync is needed
ShouldResync ShouldResyncFunc
// whether to retry on error
RetryOnError bool
}
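3.2.2 controller.Run
- Create a Reflector with NewReflector
- Produce data through list-watch
- Process the produced data by running processLoop repeatedly: it pops objects from the DeltaFIFO and passes them to the Process function from the controller's Config (which is sharedIndexInformer's HandleDeltas method)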
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 1. Create the Reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// 2. Produce data through list-watch
wg.StartWithChannel(stopCh, r.Run)
// 3. Process the produced data
// processLoop pops objects from the DeltaFIFO and passes them to the Config's Process function (sharedIndexInformer's HandleDeltas)
wait.Until(c.processLoop, time.Second, stopCh)
}
3.2.3 The Reflector instance
The core Reflector structure; most of its fields are taken over from the Config.
// Reflector watches a specified resource and causes all changes to be reflected in the given store.
type Reflector struct {
// name identifies this reflector. By default it will be a file:line if possible.
name string
// The name of the type we expect to place in the store. The name
// will be the stringification of expectedGVK if provided, and the
// stringification of expectedType otherwise. It is for display
// only, and should not be used for parsing or comparison.
expectedTypeName string
// The type of object we expect to place in the store.
expectedType reflect.Type
// The GVK of the object we expect to place in the store if unstructured.
expectedGVK *schema.GroupVersionKind
// The destination to sync up with the watch source
store Store // where fetched data is stored; here it is the DeltaFIFO
// listerWatcher is used to perform lists and watches.
listerWatcher ListerWatcher
// period controls timing between one watch ending and
// the beginning of the next one.
period time.Duration
resyncPeriod time.Duration
ShouldResync func() bool
// clock allows tests to manipulate time
clock clock.Clock
// lastSyncResourceVersion is the resource version token last
// observed when doing a sync with the underlying store
// it is thread safe, but not synchronized with the underlying store
lastSyncResourceVersion string
// lastSyncResourceVersionMutex guards read/write access to lastSyncResourceVersion
lastSyncResourceVersionMutex sync.RWMutex
// WatchListPageSize is the requested chunk size of initial and resync watch lists.
// Defaults to pager.PageSize.
WatchListPageSize int64
}
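3.2.4 Reflector.Run
This is the r.Run referenced above. It does one thing: run ListAndWatch.
Note: wait.Until calls ListAndWatch again one second after the previous call returns; it does not start a new call every second while one is still running.
So a relist does not happen inside a single ListAndWatch call. One call performs one round of list and watch (normally it then keeps watching indefinitely).
When ListAndWatch exits because of an error or for any other reason, the Reflector runs it again automatically.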
// Run starts a watch and handles watch events. Will restart the watch if it is closed.
// Run will exit when stopCh is closed.
func (r *Reflector) Run(stopCh <-chan struct{}) {
klog.V(3).Infof("Starting reflector %v (%s) from %s", r.expectedTypeName, r.resyncPeriod, r.name)
wait.Until(func() {
if err := r.ListAndWatch(stopCh); err != nil {
utilruntime.HandleError(err)
}
}, r.period, stopCh)
}
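The restart behaviour of Run can be sketched with the standard library alone. The until helper below is an assumption-laden stand-in for wait.Until (it waits for the period after each call returns), and countRuns uses a fake ListAndWatch that returns immediately, which a healthy watch would not do.

```go
package main

import (
	"fmt"
	"time"
)

// until calls f, waits for period after f returns, and repeats until stopCh
// is closed. It is a simplified stand-in for wait.Until.
func until(f func(), period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-stopCh:
			return
		default:
		}
		f()
		select {
		case <-stopCh:
			return
		case <-time.After(period):
		}
	}
}

// countRuns re-runs a fake ListAndWatch that fails immediately, and stops
// the loop once it has been invoked limit times.
func countRuns(limit int) int {
	stopCh := make(chan struct{})
	runs := 0
	until(func() {
		runs++ // a real ListAndWatch would block here while the watch is healthy
		if runs == limit {
			close(stopCh)
		}
	}, time.Millisecond, stopCh)
	return runs
}

func main() {
	fmt.Println("ListAndWatch invocations:", countRuns(3))
}
```

The period therefore only matters when ListAndWatch returns; while a watch is healthy the loop body simply does not come back.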
NewReflector sets period to 1s:
// NewReflector creates a new Reflector object which will keep the given store up to
// date with the server's contents for the given resource. Reflector promises to
// only put things in the store that have the type of expectedType, unless expectedType
// is nil. If resyncPeriod is non-zero, then lists will be executed after every
// resyncPeriod, so that you can use reflectors to periodically process everything as
// well as incrementally processing the things that change.
func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
return NewNamedReflector(naming.GetNameFromCallsite(internalPackages...), lw, expectedType, store, resyncPeriod)
}
// NewNamedReflector same as NewReflector, but with a specified name for logging
func NewNamedReflector(name string, lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
r := &Reflector{
name: name,
listerWatcher: lw,
store: store,
period: time.Second, // period is 1s
resyncPeriod: resyncPeriod,
clock: &clock.RealClock{},
}
r.setExpectedType(expectedType)
return r
}
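3.2.5 ListAndWatch
Background
The core idea of ListAndWatch is to push the data obtained from the apiserver via list/watch into the DeltaFIFO.
Before reading the code, let us use curl against kube-apiserver to look at how list and watch behave.
(1) A PodList can be seen as an object of its own, and it carries a resource version too.
(2) List responses are sent with chunked transfer encoding by default. For an introduction to chunking and its benefits see https://zh.wikipedia.org/wiki/%E5%88%86%E5%9D%97%E4%BC%A0%E8%BE%93%E7%BC%96%E7%A0%81
(3) API servers of v1.19 and later support the resourceVersionMatch parameter, which determines how resourceVersion is applied to LIST calls. It is strongly recommended to set resourceVersionMatch whenever resourceVersion is set on a LIST call. If resourceVersion is unset, resourceVersionMatch is not allowed. For backward compatibility, clients must tolerate servers that ignore resourceVersionMatch in some cases:
- When resourceVersionMatch=NotOlderThan is set together with limit, clients must handle HTTP 410 "Gone" responses. For example, the client might retry with a newer resourceVersion or fall back to resourceVersion="" (which allows any version to be returned).
- When resourceVersionMatch=Exact is set and limit is unset, clients must verify that the resourceVersion in the ListMeta of the response matches the requested resourceVersion, and handle the case where it does not. For example, the client might retry the request with limit set.
Unless you have strong consistency requirements, using resourceVersionMatch=NotOlderThan with a known resourceVersion is preferable, since it can achieve better cluster performance and scalability than leaving resourceVersion and resourceVersionMatch unset, which requires a quorum read to be served.
Reference: https://kubernetes.io/zh/docs/reference/using-api/api-concepts/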
| resourceVersionMatch parameter | Paging parameters | resourceVersion unset | resourceVersion="0" | resourceVersion="<non-zero value>" |
|---|---|---|---|---|
| resourceVersionMatch unset | limit unset | Most recent | Any | Not older than |
| resourceVersionMatch unset | limit=<n>, continue unset | Most recent | Any | Exact |
| resourceVersionMatch unset | limit=<n>, continue=<token> | Continue from token, exact | Invalid, treated as continue from token, exact | Invalid, HTTP 400 Bad Request |
| resourceVersionMatch=Exact [1] | limit unset | Invalid | Invalid | Exact |
| resourceVersionMatch=Exact [1] | limit=<n>, continue unset | Invalid | Invalid | Exact |
| resourceVersionMatch=NotOlderThan [1] | limit unset | Invalid | Any | Not older than |
| resourceVersionMatch=NotOlderThan [1] | limit=<n>, continue unset | Invalid | Any | Not older than |
// curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods -i
HTTP/1.1 200 OK
Audit-Id: 4ff9e833-e3e0-4001-9e1a-d83c9a9b1937
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:10:48 GMT
Transfer-Encoding: chunked
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/test-test/pods",
"resourceVersion": "163916927"
},
"items": [
root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?limit=1 -i
HTTP/1.1 200 OK
Audit-Id: 17d0d42f-a122-4c5a-9659-70224a22522a
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 02:09:32 GMT
Transfer-Encoding: chunked // chunked transfer
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/test-test/pods",
"resourceVersion": "163915936",
// note the continue token
"continue": "eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ",
"remainingItemCount": 23 // 23 more items have not been returned yet
},
"items": [
{
"metadata": {
"name": "app-istioversion-test-546dfff56-6t62p",
"generateName": "app-istioversion-test-546dfff56-",
// Passing this continue parameter returns the remaining 23 items.
curl http://7.34.19.44:58201/api/v1/namespaces/test-test/pods?continue=eyJ2IjoibWV0YS5rOHMuaW8vdjEiLCJydiI6MTYzOTE1OTM2LCJzdGFydCI6ImFwcC1pc3Rpb3ZlcnNpb24tdGVzdC01NDZkZmZmNTYtNnQ2MnBcdTAwMDAifQ
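The limit/continue exchange shown in the curl output can be sketched as a loop that keeps requesting pages until no continue token comes back. This is a toy model over an in-memory slice: listPage and listAll are hypothetical helpers, and the token is a plain integer offset, whereas the real token is an opaque server-generated string (the one in the transcript appears to be base64-encoded JSON carrying a resource version and a start key).

```go
package main

import (
	"fmt"
	"strconv"
)

// listPage returns at most limit names starting at the offset encoded in
// continueToken, plus the token for the next page ("" when nothing is left).
func listPage(all []string, limit int, continueToken string) ([]string, string, error) {
	start := 0
	if continueToken != "" {
		n, err := strconv.Atoi(continueToken)
		if err != nil || n < 0 || n > len(all) {
			return nil, "", fmt.Errorf("invalid continue token %q", continueToken)
		}
		start = n
	}
	end := start + limit
	if limit <= 0 || end > len(all) {
		end = len(all) // no limit, or last page
	}
	next := ""
	if end < len(all) {
		next = strconv.Itoa(end)
	}
	return all[start:end], next, nil
}

// listAll follows continue tokens until the fake server reports no more
// pages and returns the collected items and the number of requests made.
func listAll(all []string, limit int) ([]string, int, error) {
	var out []string
	token, pages := "", 0
	for {
		page, next, err := listPage(all, limit, token)
		if err != nil {
			return nil, pages, err
		}
		out = append(out, page...)
		pages++
		if next == "" {
			return out, pages, nil
		}
		token = next
	}
}

func main() {
	pods := []string{"pod-a", "pod-b", "pod-c", "pod-d", "pod-e"}
	items, pages, err := listAll(pods, 2)
	fmt.Println(items, pages, err)
}
```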
watch is simple: it is a long-lived connection that uses chunked transfer encoding
root@cld-kmaster1-1051:/home/zouxiang# curl http://7.34.19.44:58201/api/v1/namespaces/default/pods?watch=true -i
HTTP/1.1 200 OK
Cache-Control: no-cache, private
Content-Type: application/json
Date: Sat, 20 Nov 2021 01:32:06 GMT
Transfer-Encoding: chunked
Source code analysis
// ListAndWatch first lists all items and get the resource version at the moment of call,
// and then use the resource version to watch.
// It returns error if ListAndWatch didn't even try to initialize watch.
func (r *Reflector) ListAndWatch(stopCh <-chan struct{}) error {
klog.V(3).Infof("Listing and watching %v from %s", r.expectedTypeName, r.name)
var resourceVersion string
// Explicitly set "0" as resource version - it's fine for the List()
// to be served from cache and potentially be delayed relative to
// etcd contents. Reflector framework will catch up via Watch() eventually.
// the first list starts with ResourceVersion=0
options := metav1.ListOptions{ResourceVersion: "0"}
if err := func() error {
initTrace := trace.New("Reflector ListAndWatch", trace.Field{"name", r.name})
defer initTrace.LogIfLong(10 * time.Second)
var list runtime.Object
var err error
listCh := make(chan struct{}, 1)
panicCh := make(chan interface{}, 1)
go func() {
defer func() {
if r := recover(); r != nil {
panicCh <- r
}
}()
// Attempt to gather list in chunks, if supported by listerWatcher, if not, the first
// list request will return the full response.
// start listing the data, paginated
pager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (runtime.Object, error) {
return r.listerWatcher.List(opts)
}))
if r.WatchListPageSize != 0 {
pager.PageSize = r.WatchListPageSize
}
// Pager falls back to full list if paginated list calls fail due to an "Expired" error.
// fetch the list data
list, err = pager.List(context.Background(), options)
close(listCh)
}()
select {
case <-stopCh:
return nil
case r := <-panicCh:
panic(r)
case <-listCh:
}
if err != nil {
return fmt.Errorf("%s: Failed to list %v: %v", r.name, r.expectedTypeName, err)
}
initTrace.Step("Objects listed")
listMetaInterface, err := meta.ListAccessor(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v: %v", r.name, list, err)
}
resourceVersion = listMetaInterface.GetResourceVersion()
initTrace.Step("Resource version extracted")
// extract the items from the list
items, err := meta.ExtractList(list)
if err != nil {
return fmt.Errorf("%s: Unable to understand list result %#v (%v)", r.name, list, err)
}
initTrace.Step("Objects extracted")
// sync the listed items into the store (DeltaFIFO.Replace)
if err := r.syncWith(items, resourceVersion); err != nil {
return fmt.Errorf("%s: Unable to sync list result: %v", r.name, err)
}
initTrace.Step("SyncWith done")
// record the resourceVersion of this list; the watch starts from it
r.setLastSyncResourceVersion(resourceVersion)
initTrace.Step("Resource version updated")
return nil
}(); err != nil {
return err
}
resyncerrc := make(chan error, 1)
cancelCh := make(chan struct{})
defer close(cancelCh)
go func() {
resyncCh, cleanup := r.resyncChan()
defer func() {
cleanup() // Call the last one written into cleanup
}()
for {
select {
case <-resyncCh:
case <-stopCh:
return
case <-cancelCh:
return
}
if r.ShouldResync == nil || r.ShouldResync() {
klog.V(4).Infof("%s: forcing resync", r.name)
// resync the DeltaFIFO
if err := r.store.Resync(); err != nil {
resyncerrc <- err
return
}
}
cleanup()
resyncCh, cleanup = r.resyncChan()
}
}()
for {
// give the stopCh a chance to stop the loop, even in case of continue statements further down on errors
select {
case <-stopCh:
return nil
default:
}
timeoutSeconds := int64(minWatchTimeout.Seconds() * (rand.Float64() + 1.0))
options = metav1.ListOptions{
ResourceVersion: resourceVersion,
// We want to avoid situations of hanging watchers. Stop any wachers that do not
// receive any events within the timeout window.
TimeoutSeconds: &timeoutSeconds,
// To reduce load on kube-apiserver on watch restarts, you may enable watch bookmarks.
// Reflector doesn't assume bookmarks are returned at all (if the server do not support
// watch bookmarks, it will ignore this field).
AllowWatchBookmarks: true,
}
// start the watch
w, err := r.listerWatcher.Watch(options)
if err != nil {
switch err {
case io.EOF:
// watch closed normally
case io.ErrUnexpectedEOF:
klog.V(1).Infof("%s: Watch for %v closed with unexpected EOF: %v", r.name, r.expectedTypeName, err)
default:
utilruntime.HandleError(fmt.Errorf("%s: Failed to watch %v: %v", r.name, r.expectedTypeName, err))
}
// If this is "connection refused" error, it means that most likely apiserver is not responsive.
// It doesn't make sense to re-list all objects because most likely we will be able to restart
// watch where we ended.
// If that's the case wait and resend watch request.
if utilnet.IsConnectionRefused(err) {
time.Sleep(time.Second)
continue
}
return nil
}
// handle the watch events
if err := r.watchHandler(w, &resourceVersion, resyncerrc, stopCh); err != nil {
if err != errorStopRequested {
switch {
case apierrs.IsResourceExpired(err):
klog.V(4).Infof("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
default:
klog.Warningf("%s: watch of %v ended with: %v", r.name, r.expectedTypeName, err)
}
}
return nil
}
}
}
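With the background above the overall flow is clear. A few questions remain:
(1) Why does a list operation need a resourceVersion?
A: The list API itself carries a resourceVersion, and different values have different meanings. Recording the resourceVersion of each list guarantees that later data is at least as new as the previous list (in practice it is almost always the latest).
(2) Why is the list paginated?
A: Pagination happens whenever limit is set.
(3) How is the listed data extracted?
First, items, err := meta.ExtractList(list) stores the list data in the items slice.
Then syncWith stores the list data in the DeltaFIFO.
The logic of syncWith is:
(1) Iterate over all listed objects and append each of them as (Sync, item) to the items of the DeltaFIFO through queueActionLocked(Sync, item).
(2) Iterate over all keys in knownObjects (the indexer) and check for objects that are known locally but missing from the latest list. If there are any, a delete event was missed, so a (Deleted, DeletedFinalStateUnknown) delta is added to the items of the DeltaFIFO.
Why DeletedFinalStateUnknown?
Replace may be called again when the reflector re-lists. An object in knownObjects may then be absent from the list passed to Replace (for example because the watch lost its delete event). Such objects are picked out, wrapped in DeletedFinalStateUnknown and enqueued in the DeltaFIFO as Deleted deltas, so that processing this delta updates the local cache and triggers a reconcile. Since the final state of the object really cannot be found any more, the delta can only be built from the record in knownObjects, hence the name FinalStateUnknown.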
// syncWith replaces the store's items with the given list.
func (r *Reflector) syncWith(items []runtime.Object, resourceVersion string) error {
found := make([]interface{}, 0, len(items))
for _, item := range items {
found = append(found, item)
}
return r.store.Replace(found, resourceVersion)
}
// Replace will delete the contents of 'f', using instead the given map.
// 'f' takes ownership of the map, you should not reference the map again
// after calling this function. f's queue is reset, too; upon return, it
// will contain the items in the map, in no particular order.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// first pass: iterate over the listed data
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
// enqueue each item into the FIFO, i.e. append it to items; everything that comes from a list is a Sync delta
// a delta in items has one of four types: Added, Updated, Deleted, Sync (always Sync here)
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// not the case here, because f.knownObjects is the indexer
if f.knownObjects == nil {
// Do deletion detection against our own list.
}
// Detect deletions not already in the queue.
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
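// second pass: iterate over the keys in knownObjects (the indexer)
for _, k := range knownKeys {
// if the list also contains the key, nothing to do: the loop above already handled it
if keys.Has(k) {
continue
}
// if the list does not contain the key, the object was deleted but the cache missed the event, so it must be deleted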
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
// enqueue a Deleted delta; note that the object is wrapped in DeletedFinalStateUnknown (see the explanation before this code)
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
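The two passes of Replace can be condensed into a small sketch that only tracks keys. It is an illustration under simplifying assumptions, not the DeltaFIFO code: replaceDeltas is a hypothetical helper, the delta type carries a key instead of an object, and the missing keys are sorted only to make the output deterministic.

```go
package main

import (
	"fmt"
	"sort"
)

type delta struct {
	Type string // "Sync" or "Deleted"
	Key  string
}

// replaceDeltas returns the deltas a re-list would enqueue: one Sync per
// listed key, then one Deleted per key that is known locally but missing
// from the list (the case that DeletedFinalStateUnknown covers).
func replaceDeltas(listed, known []string) []delta {
	seen := make(map[string]bool, len(listed))
	var out []delta
	for _, k := range listed { // first pass: everything listed is a Sync
		seen[k] = true
		out = append(out, delta{"Sync", k})
	}
	var missing []string
	for _, k := range known { // second pass: detect missed deletions
		if !seen[k] {
			missing = append(missing, k)
		}
	}
	sort.Strings(missing) // deterministic order for the example only
	for _, k := range missing {
		out = append(out, delta{"Deleted", k})
	}
	return out
}

func main() {
	// "default/c" was deleted while the watch was down, so it only shows up
	// in the local cache.
	fmt.Println(replaceDeltas(
		[]string{"default/a", "default/b"},
		[]string{"default/b", "default/c"},
	))
}
```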
3.2.6 c.processLoop
list and watch eventually store everything fetched from the apiserver in the DeltaFIFO.
processLoop then dispatches that data.
It pops the elements one at a time and hands each one to Process:
func (c *controller) processLoop() {
for {
// each iteration pops one element from the FIFO and passes it to the PopProcessFunc
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process)) // Process was set to HandleDeltas when the Config was built
if err != nil {
if err == ErrFIFOClosed {
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
// 1. If the queue is empty, return if it is closed; otherwise wait
for len(f.queue) == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the f.closed is set and the condition is broadcasted.
// Which causes this loop to continue and return from the Pop().
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
// 2. Take the first element; one key in queue corresponds to one key-value pair in the items map
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
// Item may have been deleted subsequently.
continue
}
delete(f.items, id)
// 3. Call process to handle it
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
// Don't need to copyDeltas here, because we're transferring
// ownership to the caller.
return item, err
}
}
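The blocking behaviour of Pop rests on a condition variable: wait while the queue is empty, wake up on add or close. Below is a minimal sketch of that pattern with hypothetical names (blockingQueue, errClosed); it stores bare keys instead of Deltas and leaves out requeueing.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errClosed = errors.New("queue is closed")

type blockingQueue struct {
	mu     sync.Mutex
	cond   *sync.Cond
	keys   []string
	closed bool
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *blockingQueue) add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.keys = append(q.keys, key)
	q.cond.Broadcast() // wake up any waiting pop
}

func (q *blockingQueue) close() {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.closed = true
	q.cond.Broadcast() // let waiting pops observe the closed flag
}

// pop blocks while the queue is empty and not closed. As in DeltaFIFO.Pop,
// process runs while the lock is still held.
func (q *blockingQueue) pop(process func(string) error) (string, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.keys) == 0 {
		if q.closed {
			return "", errClosed
		}
		q.cond.Wait()
	}
	key := q.keys[0]
	q.keys = q.keys[1:]
	return key, process(key)
}

func main() {
	q := newBlockingQueue()
	done := make(chan string)
	go func() {
		key, _ := q.pop(func(string) error { return nil })
		done <- key
	}()
	q.add("default/one")
	fmt.Println("popped:", <-done)
	q.close()
	_, err := q.pop(func(string) error { return nil })
	fmt.Println("after close:", err)
}
```

Since process runs under the lock, a slow Process function delays producers that want to enqueue, which is one reason HandleDeltas hands the notifications to per-listener goroutines instead of calling handlers directly.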
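The HandleDeltas function
Here at last is HandleDeltas. As its box in the figure shows, it does two things:
(1) Update the indexer. (Somewhat surprisingly, it does not jump to the final state: even when the last Delta is a delete, the earlier updates are applied first and the object is deleted afterwards.)
(2) Distribute the events through distribute.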
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
// Sync deltas are emitted by the FIFO's Replace function on a relist, and by Resync
case Sync, Added, Updated:
isSync := d.Type == Sync
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
if err := s.indexer.Update(d.Object); err != nil {
return err
}
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
if err := s.indexer.Add(d.Object); err != nil {
return err
}
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
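The switch in HandleDeltas can be mirrored with a map in place of the indexer. This is a simplified sketch with hypothetical names (handleDeltas, string-valued objects, notifications as strings) rather than the client-go types. It shows that every delta is applied in order, and that an Updated delta for an object missing from the cache is delivered as an add.

```go
package main

import "fmt"

type delta struct {
	Type   string // "Sync", "Added", "Updated" or "Deleted"
	Key    string
	Object string
}

// handleDeltas applies the deltas oldest-first to index (a stand-in for the
// indexer) and returns the notifications that would be distributed.
func handleDeltas(index map[string]string, deltas []delta) []string {
	var notes []string
	for _, d := range deltas {
		switch d.Type {
		case "Sync", "Added", "Updated":
			if old, exists := index[d.Key]; exists {
				index[d.Key] = d.Object
				notes = append(notes, "update "+old+" -> "+d.Object)
			} else {
				index[d.Key] = d.Object
				notes = append(notes, "add "+d.Object)
			}
		case "Deleted":
			delete(index, d.Key)
			notes = append(notes, "delete "+d.Object)
		}
	}
	return notes
}

func main() {
	index := map[string]string{}
	notes := handleDeltas(index, []delta{
		{"Updated", "one", "pod1-2"}, // not cached yet, so listeners see an add
		{"Updated", "one", "pod1-3"},
		{"Deleted", "one", "pod1-3"},
	})
	fmt.Println(notes, len(index))
}
```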
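distribute is simple: it sends the event on. There is one small piece of logic:
When registering a ResourceEventHandler you can choose whether it should be resynced. For example, I can create an informer and register a handler with resync disabled.
That handler's listener will then never be in syncingListeners.
The difference between listeners and syncingListeners
The processor supports configuring resync per listener: an informer can have several EventHandlers, each EventHandler corresponds to one listener in the processor, and each listener decides whether it wants resyncs. If a listener needs a resync, the Sync deltas added to the DeltaFIFO end up only at the listeners that asked for it.
The reflector periodically checks whether each listener needs a resync, based on the resyncPeriod given when the EventHandler was registered: 0 means the listener never resyncs, otherwise the check verifies whether resyncPeriod has elapsed.
- listeners: every listener added to the informer
- syncingListeners: the listeners of the informer that are currently due for a resync
syncingListeners is a subset of listeners. It holds the listeners that have resync enabled and whose time has come. Keeping them in a separate slice prevents the distribute method below from delivering the object to listeners that do not need a resync yet.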
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
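How syncingListeners gets populated and then used by distribute can be sketched as follows. The types are simplified assumptions (events are strings, time is passed in explicitly, locking is omitted), and demo is a hypothetical driver, so this should be read as an illustration of the idea rather than the sharedProcessor code.

```go
package main

import (
	"fmt"
	"time"
)

type listener struct {
	name         string
	resyncPeriod time.Duration // 0 means this listener never wants resyncs
	nextResync   time.Time
	received     []string
}

type processor struct {
	listeners        []*listener
	syncingListeners []*listener
}

// shouldResync rebuilds syncingListeners from the listeners whose resync is
// due at now, and reports whether any listener needs a resync.
func (p *processor) shouldResync(now time.Time) bool {
	p.syncingListeners = nil
	for _, l := range p.listeners {
		if l.resyncPeriod > 0 && !now.Before(l.nextResync) {
			p.syncingListeners = append(p.syncingListeners, l)
			l.nextResync = now.Add(l.resyncPeriod)
		}
	}
	return len(p.syncingListeners) > 0
}

// distribute sends sync events only to syncingListeners and every other
// event to all listeners.
func (p *processor) distribute(event string, sync bool) {
	targets := p.listeners
	if sync {
		targets = p.syncingListeners
	}
	for _, l := range targets {
		l.received = append(l.received, event)
	}
}

// demo registers one listener with a 30s resync and one without resync.
func demo() (a, b *listener) {
	start := time.Unix(0, 0)
	a = &listener{name: "a", resyncPeriod: 30 * time.Second, nextResync: start.Add(30 * time.Second)}
	b = &listener{name: "b"} // resync disabled
	p := &processor{listeners: []*listener{a, b}}
	p.distribute("add pod1", false)
	if p.shouldResync(start.Add(30 * time.Second)) {
		p.distribute("sync pod1", true)
	}
	return a, b
}

func main() {
	a, b := demo()
	fmt.Println(a.name, a.received)
	fmt.Println(b.name, b.received)
}
```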
add simply sends the notification to the addCh channel.
Although p.addCh is an unbuffered channel, the listener's pop goroutine drains it into a ring buffer, so add does not block for long.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
3.3 s.processor.run consumes data
sharedIndexInformer.Run uses controller.Run to produce data: everything obtained from list and watch is stored in the DeltaFIFO as deltas.
HandleDeltas then sends each delta to the listeners through the distribute function.
Next we look at how s.processor.run consumes the data.
Its logic is straightforward: for every listener, start run and pop.
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
processorListener structure
type processorListener struct {
nextCh chan interface{} // objects on their way to the handler
addCh chan interface{} // objects sent down by distribute
handler ResourceEventHandler // the add, update and delete functions given when the informer was defined
// pendingNotifications is an unbounded ring buffer that holds all notifications not yet distributed.
// There is one per listener, but a failing/stalled listener will have infinite pendingNotifications
// added until we OOM.
// TODO: This is no worse than before, since reflectors were backed by unbounded DeltaFIFOs, but
// we should try to do something better.
pendingNotifications buffer.RingGrowing // buffer that absorbs bursts when distribute sends too fast or the handler is too slow
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration // resync period
// resyncPeriod is how frequently the listener wants a full resync from the shared informer. This
// value may differ from requestedResyncPeriod if the shared informer adjusts it to align with the
// informer's overall resync check period.
resyncPeriod time.Duration
// nextResync is the earliest time the listener should get a full resync
nextResync time.Time
// resyncLock guards access to resyncPeriod and nextResync
resyncLock sync.Mutex
}
pop and run
pop moves objects from addCh to nextCh. If a notification is already waiting to be sent on nextCh, new ones are stored in pendingNotifications.
run passes the objects from nextCh to the handler.
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil { // No notification to pop (and pendingNotifications is empty)
// Optimize the case - skip adding to pendingNotifications
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
// this call blocks until the channel is closed. When a panic happens during the notification
// we will catch it, **the offending item will be skipped!**, and after a short delay (one second)
// the next notification will be attempted. This is usually better than the alternative of never
// delivering again.
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
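The trick in pop, a nil channel that disables one select case plus an unbounded buffer, can be isolated in a small sketch. The names pump and collect are hypothetical, a slice replaces buffer.RingGrowing, and notifications are plain integers.

```go
package main

import "fmt"

// pump forwards values from addCh to nextCh through an unbounded slice
// buffer, so a sender on addCh is never blocked by a slow reader of nextCh.
func pump(addCh <-chan int, nextCh chan<- int) {
	defer close(nextCh) // tell the reader to stop
	var pending []int
	var out chan<- int // nil disables the send case in the select
	var head int
	for {
		select {
		case out <- head:
			// head was delivered; promote the next pending value, if any
			if len(pending) > 0 {
				head, pending = pending[0], pending[1:]
			} else {
				out = nil
			}
		case v, ok := <-addCh:
			if !ok {
				return
			}
			if out == nil { // nothing waiting: v becomes the value to send
				head, out = v, nextCh
			} else { // something is already waiting: buffer v
				pending = append(pending, v)
			}
		}
	}
}

// collect sends n values before reading any of them, then reads them back.
func collect(n int) []int {
	addCh := make(chan int)
	nextCh := make(chan int)
	go pump(addCh, nextCh)
	for i := 0; i < n; i++ {
		addCh <- i // completes even though nobody reads nextCh yet
	}
	got := make([]int, 0, n)
	for i := 0; i < n; i++ {
		got = append(got, <-nextCh)
	}
	close(addCh)
	<-nextCh // returns once pump has closed nextCh
	return got
}

func main() {
	fmt.Println(collect(5))
}
```

As in the original pop, closing addCh makes the goroutine return immediately, so anything still buffered at that moment is dropped; the sketch therefore drains nextCh before closing addCh.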
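4. Summary
(1) The SharedInformerFactory mechanism lets several users share one informer.
(2) The core of the informer is the reflector mechanism described above. It works as follows:
- kube-apiserver's list and watch are used to observe resource changes in etcd.
- Internally, the DeltaFIFO queue is used to dispatch these changes in a better way.
- Besides passing on kube-apiserver's add/update/delete events unchanged (these reflect changes of the stored objects), DeltaFIFO adds a sync action. The FIFO adds it through Replace on a re-list, and through Resync on a periodic resync.
- The core processing function is HandleDeltas, which processes and distributes these changes. Its core logic is:
- An informer always comes with an indexer, whether you use it or not. It is a local cache.
- For each resource, HandleDeltas first updates the local indexer cache and then sends the change to every listener. Note:
(1) An add/update/delete event from kube-apiserver is not necessarily the event a listener sees. For example, for an apiserver update event, if the indexer does not hold the object yet, the listener receives an add event.
(2) An index informer can be given a resyncPeriod. When it elapses, Resync re-enqueues the objects already held in the indexer as Sync deltas; no new list request is sent to the apiserver for this. These sync events are delivered only to the listeners that need a resync.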

5. References
https://jimmysong.io/kubernetes-handbook/develop/client-go-informer-sourcecode-analyse.html