3. Controller
此處分析的文件位置在
client-go/tools/cache/controller.go
3.1 接口
type Controller interface {
Run(stopCh <-chan struct{})
HasSynced() bool
LastSyncResourceVersion() string
}
Controller接口有三個方法.
3.2 實現(xiàn)類controller
type controller struct {
config Config
reflector *Reflector
reflectorMutex sync.RWMutex
clock clock.Clock
}
type Config struct {
// 是一個DeltaFIFO
Queue
// Something that can list and watch your objects.
ListerWatcher
// 自定義的處理邏輯
Process ProcessFunc
// 該controller針對的類型
ObjectType runtime.Object
// resync時間
FullResyncPeriod time.Duration
ShouldResync ShouldResyncFunc
// 發(fā)生錯誤的時候是否需要重新進到隊列中
RetryOnError bool
}
type ShouldResyncFunc func() bool
type ProcessFunc func(obj interface{}) error
// 根據(jù)Config生成一個controller對象
func New(c *Config) Controller {
ctlr := &controller{
config: *c,
clock: &clock.RealClock{},
}
return ctlr
}
關于
Reflector在 [k8s源碼分析][client-go] informer之reflector 中已經(jīng)分析過了.
Queue基本上都是DeltaFIFO.
方法
func (c *controller) HasSynced() bool {
return c.config.Queue.HasSynced()
}
func (c *controller) LastSyncResourceVersion() string {
c.reflectorMutex.RLock()
defer c.reflectorMutex.RUnlock()
if c.reflector == nil {
return ""
}
return c.reflector.LastSyncResourceVersion()
}
1.
HasSynced調用的是DeltaFIFO的方法. 在 [k8s源碼分析][client-go] informer之delta_fifo 已經(jīng)分析過了.
2.LastSyncResourceVersion調用的是reflector的方法, [k8s源碼分析][client-go] informer之reflector 中已經(jīng)分析過了.
func (c *controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
go func() {
<-stopCh
c.config.Queue.Close()
}()
// 構造一個reflector
r := NewReflector(
c.config.ListerWatcher,
c.config.ObjectType,
c.config.Queue,
c.config.FullResyncPeriod,
)
r.ShouldResync = c.config.ShouldResync
r.clock = c.clock
c.reflectorMutex.Lock()
c.reflector = r
c.reflectorMutex.Unlock()
var wg wait.Group
defer wg.Wait()
// goroutine啟動reflector.Run方法
// 所有從listwatcher中的數(shù)據(jù)會存到DeltaFIFO 也就是r.store=c.config.Queue
wg.StartWithChannel(stopCh, r.Run)
// 循環(huán)執(zhí)行processLoop
wait.Until(c.processLoop, time.Second, stopCh)
}
func (c *controller) processLoop() {
for {
// 從DeltaFIFO出隊列的邏輯已經(jīng)分析過了
// 從DeltaFIFO出隊列執(zhí)行用戶邏輯c.config.Process方法
obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
if err != nil {
if err == ErrFIFOClosed {
// 如果deltaFIFO已經(jīng)關閉 則返回
return
}
if c.config.RetryOnError {
// This is the safe way to re-enqueue.
// 如果設置了重試 則重新加入到deltaFIFO中
c.config.Queue.AddIfNotPresent(obj)
}
}
}
}
所以
controller的run的主要目的是reflector一直在往DeltaFIFO中存數(shù)據(jù), 另外一邊是一直從DeltaFIFO中出隊并且給自定義用戶邏輯c.config.Process處理.
controller.png
4. SharedInformer
最外層的類, 也是與用戶接觸的類, 用戶通過該接口的方法來進行自定義配置.
4.1 接口
type SharedInformer interface {
// 增加用戶自己的自定義處理邏輯
AddEventHandler(handler ResourceEventHandler)
// 增加用戶自己的自定義處理邏輯 帶有resyncPeriod時間
AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
// 獲得Store 也就是DeltaFIFO
GetStore() Store
// 獲得Controller 也就是controller
GetController() Controller
Run(stopCh <-chan struct{})
// HasSynced returns true if the shared informer's store has been
// informed by at least one full LIST of the authoritative state
// of the informer's object collection. This is unrelated to "resync".
HasSynced() bool
// 該SharedInformer對應的類型的上一次處理的ResourceVersion
LastSyncResourceVersion() string
}
type SharedIndexInformer interface {
SharedInformer
// AddIndexers add indexers to the informer before it starts.
AddIndexers(indexers Indexers) error
GetIndexer() Indexer
}
4.2 實現(xiàn)類
type sharedIndexInformer struct {
// 本地緩存
indexer Indexer
controller Controller
processor *sharedProcessor
cacheMutationDetector MutationDetector
listerWatcher ListerWatcher
// 該sharedIndexInformers監(jiān)控的類型
objectType runtime.Object
// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can call
// shouldResync to check if any of our listeners need a resync.
// 在reflector中每隔resyncCheckPeriod時間會調用shouldResync方法來判斷是否有任何一個listener需要resync操作
resyncCheckPeriod time.Duration
defaultEventHandlerResyncPeriod time.Duration
clock clock.Clock
started, stopped bool
startedLock sync.Mutex
// 可以停止分發(fā)obj給各個listeners
// 因為HandleDeltas方法需要得到該鎖, 如果失去了該鎖, 就只能等到再次獲得鎖之后再分發(fā)
blockDeltas sync.Mutex
}
func NewSharedInformer(lw ListerWatcher, objType runtime.Object, resyncPeriod time.Duration) SharedInformer {
return NewSharedIndexInformer(lw, objType, resyncPeriod, Indexers{})
}
func NewSharedIndexInformer(lw ListerWatcher, objType runtime.Object, defaultEventHandlerResyncPeriod time.Duration, indexers Indexers) SharedIndexInformer {
realClock := &clock.RealClock{}
sharedIndexInformer := &sharedIndexInformer{
processor: &sharedProcessor{clock: realClock},
indexer: NewIndexer(DeletionHandlingMetaNamespaceKeyFunc, indexers),
listerWatcher: lw,
objectType: objType,
resyncCheckPeriod: defaultEventHandlerResyncPeriod,
defaultEventHandlerResyncPeriod: defaultEventHandlerResyncPeriod,
cacheMutationDetector: NewCacheMutationDetector(fmt.Sprintf("%T", objType)),
clock: realClock,
}
return sharedIndexInformer
}
4.3 方法
AddEventHandler 和 AddEventHandlerWithResyncPeriod
增加一個用戶自定義的
EventHandler實現(xiàn)類.
func (s *sharedIndexInformer) AddEventHandler(handler ResourceEventHandler) {
s.AddEventHandlerWithResyncPeriod(handler, s.defaultEventHandlerResyncPeriod)
}
const minimumResyncPeriod = 1 * time.Second
func (s *sharedIndexInformer) AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration) {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.stopped {
// 該sharedIndexInformer已經(jīng)結束
klog.V(2).Infof("Handler %v was not added to shared informer because it has stopped already", handler)
return
}
if resyncPeriod > 0 {
// 如果比最小的resync時間minimumResyncPeriod還要小 就取最小的minimumResyncPeriod
if resyncPeriod < minimumResyncPeriod {
klog.Warningf("resyncPeriod %d is too small. Changing it to the minimum allowed value of %d", resyncPeriod, minimumResyncPeriod)
resyncPeriod = minimumResyncPeriod
}
if resyncPeriod < s.resyncCheckPeriod {
// 如果比該sharedIndexInformer的resyncCheckPeriod小
// 1. 如果該sharedIndexInformer已經(jīng)啟動 那把resyncPeriod變?yōu)閞esyncCheckPeriod時間
// 2. 如果該sharedIndexInformer沒有啟動 那就盡量讓resyncCheckPeriod變小點 改成resyncPeriod時間 再重新計算各個listeners的resync時間
if s.started {
klog.Warningf("resyncPeriod %d is smaller than resyncCheckPeriod %d and the informer has already started. Changing it to %d", resyncPeriod, s.resyncCheckPeriod, s.resyncCheckPeriod)
resyncPeriod = s.resyncCheckPeriod
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod, update
// resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all the listeners
// accordingly
s.resyncCheckPeriod = resyncPeriod
s.processor.resyncCheckPeriodChanged(resyncPeriod)
}
}
}
listener := newProcessListener(handler, resyncPeriod, determineResyncPeriod(resyncPeriod, s.resyncCheckPeriod), s.clock.Now(), initialBufferSize)
if !s.started {
s.processor.addListener(listener)
return
}
// in order to safely join, we have to
// 1. stop sending add/update/delete notifications
// 2. do a list against the store
// 3. send synthetic "Add" events to the new handler
// 4. unblock
// 1. 如果獲得鎖 意味著HandleDeltas方法會失去該鎖 無法分發(fā)消息了(stop sending add/update/delete notifications)
// 2. 從本地緩存中取出所有對象(do a list against the store)
// 3. 將這些對象發(fā)送一個Add事件給這個listener并且交給新的handler處理(send synthetic "Add" events to the new handler)
// 4. 解鎖(unblock)
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
s.processor.addListener(listener)
for _, item := range s.indexer.List() {
// 把本地緩存中的數(shù)據(jù)往該listener中加入一遍
listener.add(addNotification{newObj: item})
}
}
1. 根據(jù)相關規(guī)則計算出一個的
resyncPeriod.
2. 根據(jù)該eventHandler生成一個ProcessListener.
3. 如果當前還沒有sharedIndexInformer還沒有啟動, 則直接加入到sharedIndexInformer.processor中即可. 如果已經(jīng)啟動, 那說明該listener已經(jīng)錯過了之前的那些event, 所以在加入到sharedIndexInformer.processor后還需要將本地緩存中數(shù)據(jù)統(tǒng)一發(fā)一個addNotification到現(xiàn)在要加入的listener.
Run
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
// 生成一個DeltaFIFO 并且knowObjects是s.indexer 也就是本地緩存
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
// 配置controller的Config對象
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
// 對應的是sharedProcessor的shouldResync 會去計算所有的listeners是否有誰到了resync的時間
ShouldResync: s.processor.shouldResync,
// 出DeltaFIFO隊列的時候 調用用戶自定義的處理邏輯 在這里是HandleDeltas
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
// 生成controller
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
// 啟動cacheMutationDetector
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
// 啟動所有的listeners進行監(jiān)聽
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners
}()
// 啟動controller
s.controller.Run(stopCh)
}
func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
// 先獲得blockDeltas鎖
s.blockDeltas.Lock()
defer s.blockDeltas.Unlock()
// from oldest to newest
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
isSync := d.Type == Sync
// 往cacheMutationDetector增加
s.cacheMutationDetector.AddObject(d.Object)
if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
// 更新本地緩存
if err := s.indexer.Update(d.Object); err != nil {
return err
}
// 根據(jù)isSync分發(fā)給對應的listeners
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
} else {
// 增加到本地緩存
if err := s.indexer.Add(d.Object); err != nil {
return err
}
// 根據(jù)isSync分發(fā)給對應的listeners
s.processor.distribute(addNotification{newObj: d.Object}, isSync)
}
case Deleted:
// 從本地緩存中刪除
if err := s.indexer.Delete(d.Object); err != nil {
return err
}
// 分發(fā)給所有的listeners
s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
}
}
return nil
}
在
controller中已經(jīng)看到controller.Run的功能. 然而sharedIndexInformer中配置了controller.config.process = HandleDeltas. 另外還啟動了一些listeners來監(jiān)聽.
informer.png
從圖中就可以
HandleDeltas從DeltaFIFO的pop方法中得到了Delta, (具體可以參考 [k8s源碼分析][client-go] informer之delta_fifo.), 然后做了兩件事情:
1. 根據(jù)Add/Update/Delete類型操作本地存儲Indexer.
2. 將當前obj構造成notification類型分發(fā)給所有的listeners, 然后每個listener都會調用用戶的ResouceEventHandler進行處理.
其他方法
func (s *sharedIndexInformer) HasSynced() bool {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.controller == nil {
return false
}
// 實際上調用的是DeltaFIFO的HasSynced方法
return s.controller.HasSynced()
}
func (s *sharedIndexInformer) LastSyncResourceVersion() string {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.controller == nil {
return ""
}
// 實際上調用的是reflector的LastSyncResourceVersion方法
// 返回的是上一次reflector往deltaFIFO中的obj的resourceversion
return s.controller.LastSyncResourceVersion()
}
func (s *sharedIndexInformer) GetStore() Store {
return s.indexer
}
func (s *sharedIndexInformer) GetIndexer() Indexer {
return s.indexer
}
func (s *sharedIndexInformer) AddIndexers(indexers Indexers) error {
s.startedLock.Lock()
defer s.startedLock.Unlock()
if s.started {
return fmt.Errorf("informer has already started")
}
// 只能在沒有啟動前加入
return s.indexer.AddIndexers(indexers)
}
informer整體
整個
informer體系在k8s代碼中占有重要一環(huán), 理解informer可以更好理解k8s的工作機制.
informer.png
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory


