1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
在 [k8s源碼分析][client-go] informer之store和index 和 [k8s源碼分析][client-go] informer之store和index 和 [k8s源碼分析][client-go] informer之reflector 的基礎(chǔ)上進行分析, 接下來將會分析如何生成一個
informer, 并且用戶如何添加自己的邏輯, 與用戶層越來越接近了.
2. 接口與類
這里先介紹后面需要用到的幾個接口與結(jié)構(gòu)體
architecture.png
2.1 ResourceEventHandler
// client-go/tools/cache/controller.go
type ResourceEventHandler interface {
OnAdd(obj interface{})
OnUpdate(oldObj, newObj interface{})
OnDelete(obj interface{})
}
相信對這三個函數(shù)比較熟悉, 用戶可以在這里定義自己的邏輯.
2.2 processorListener
type processorListener struct {
nextCh chan interface{}
addCh chan interface{}
// 一個自定義處理數(shù)據(jù)的handler
handler ResourceEventHandler
// 一個環(huán)形的buffer 存著那些還沒有被分發(fā)的notifications
pendingNotifications buffer.RingGrowing
// requestedResyncPeriod is how frequently the listener wants a full resync from the shared informer
requestedResyncPeriod time.Duration
// informer's overall resync check period.
resyncPeriod time.Duration
// 下次要resync的時候
nextResync time.Time
resyncLock sync.Mutex
}
func newProcessListener(handler ResourceEventHandler, requestedResyncPeriod, resyncPeriod time.Duration, now time.Time, bufferSize int) *processorListener {
ret := &processorListener{
nextCh: make(chan interface{}),
addCh: make(chan interface{}),
handler: handler,
pendingNotifications: *buffer.NewRingGrowing(bufferSize),
requestedResyncPeriod: requestedResyncPeriod,
resyncPeriod: resyncPeriod,
}
ret.determineNextResync(now)
return ret
}
func (p *processorListener) determineNextResync(now time.Time) {
p.resyncLock.Lock()
defer p.resyncLock.Unlock()
// now加上該listener的resyncPeriod就是下次要resync的時間
p.nextResync = now.Add(p.resyncPeriod)
}
關(guān)于
buffer.NewRingGrowing是一個無限的循環(huán)數(shù)組, 無限的意思是當你想要在增加一個元素, 發(fā)現(xiàn)整個數(shù)組滿了, 此時會進行擴容, 如果一直擴容, 會被OOM殺死.
關(guān)于
shouldResync和setResyncPeriod比較簡單就不多說了. 這里說一下三個比較重要的方法add,pop和run方法.
add
add方法是由上層程序調(diào)用的, 也就是往該listener發(fā)送了一個新的notification. 相當于生產(chǎn)者.
func (p *processorListener) add(notification interface{}) {
p.addCh <- notification
}
pop 和 run
func (p *processorListener) pop() {
defer utilruntime.HandleCrash()
defer close(p.nextCh) // Tell .run() to stop
var nextCh chan<- interface{}
var notification interface{}
for {
select {
case nextCh <- notification:
// Notification dispatched
var ok bool
// notification從緩沖區(qū)pendingNotifications中讀 然后傳遞給nextCh
notification, ok = p.pendingNotifications.ReadOne()
if !ok { // Nothing to pop
nextCh = nil // Disable this select case
}
case notificationToAdd, ok := <-p.addCh:
if !ok {
return
}
if notification == nil {
// 如果notification還沒有初始化 則進行初始化notification和nextCh
notification = notificationToAdd
nextCh = p.nextCh
} else { // There is already a notification waiting to be dispatched
// 直接往pendingNotifications中寫
p.pendingNotifications.WriteOne(notificationToAdd)
}
}
}
}
func (p *processorListener) run() {
stopCh := make(chan struct{})
wait.Until(func() {
// this gives us a few quick retries before a long pause and then a few more quick retries
err := wait.ExponentialBackoff(retry.DefaultRetry, func() (bool, error) {
// 從nextCh讀取并調(diào)用該listener的handler進行處理
for next := range p.nextCh {
switch notification := next.(type) {
case updateNotification:
p.handler.OnUpdate(notification.oldObj, notification.newObj)
case addNotification:
p.handler.OnAdd(notification.newObj)
case deleteNotification:
p.handler.OnDelete(notification.oldObj)
default:
utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
}
}
// the only way to get here is if the p.nextCh is empty and closed
return true, nil
})
// the only way to get here is if the p.nextCh is empty and closed
if err == nil {
close(stopCh)
}
}, 1*time.Minute, stopCh)
}
pop和run屬于消費者, 消費從add方法中過來的notification, 但是為了防止處理速度(調(diào)用handler)跟不上生產(chǎn)速度, 設(shè)置了一個緩沖區(qū)pendingNotifications, 把從add中過來的notification先加入到pendingNotifications, 然后從pendingNotifications讀取一個notification后, 將notification通過nextCh這個channel來進而傳遞給消費者run.
work_flow.png
2.3 sharedProcessor
type sharedProcessor struct {
// 判斷l(xiāng)isteners有沒有啟動
listenersStarted bool
listenersLock sync.RWMutex
// 所有的processorListener
listeners []*processorListener
// 所有的需要sync的processorListener 動態(tài)變化
syncingListeners []*processorListener
clock clock.Clock
wg wait.Group
}
這里
sharedProcessor就是管理著所有的processorListener, 簡單一點理解就是當拿到一個數(shù)據(jù), 然后可以分發(fā)給所有的listeners.
resyncCheckPeriodChanged
func (p *sharedProcessor) resyncCheckPeriodChanged(resyncCheckPeriod time.Duration) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
for _, listener := range p.listeners {
// 根據(jù)listener自己要求的requestedResyncPeriod和resyncCheckPeriod來決定該listener真正的resyncPeriod
resyncPeriod := determineResyncPeriod(listener.requestedResyncPeriod, resyncCheckPeriod)
listener.setResyncPeriod(resyncPeriod)
}
}
// 1. 如果desired或check其中一個是0 則返回0
// 2. 返回max(desired, check)
func determineResyncPeriod(desired, check time.Duration) time.Duration {
if desired == 0 {
return desired
}
if check == 0 {
klog.Warningf("The specified resyncPeriod %v is invalid because this shared informer doesn't support resyncing", desired)
return 0
}
if desired < check {
klog.Warningf("The specified resyncPeriod %v is being increased to the minimum resyncCheckPeriod %v", desired, check)
return check
}
return desired
}
resyncCheckPeriodChanged的作用是根據(jù)resyncCheckPeriod會重新生成一下每個listener自己的resyncPeriod.
對于每一個listener:
1. 如果自己要求的requestedResyncPeriod為0或被要求的resyncCheckPeriod其中一個是0, 則返回0.
2. 則返回兩個其中最大的一個.
shouldResync
func (p *sharedProcessor) shouldResync() bool {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.syncingListeners = []*processorListener{}
resyncNeeded := false
now := p.clock.Now()
for _, listener := range p.listeners {
if listener.shouldResync(now) {
resyncNeeded = true
p.syncingListeners = append(p.syncingListeners, listener)
listener.determineNextResync(now)
}
}
return resyncNeeded
}
可以看到該方法會重新生成
syncingListeners, 遍歷所有的listeners, 判斷哪個已經(jīng)到了resync時間, 如果到了就加入到syncingListeners中, 并且它的下一次resync的時間.
如果所有的
listeners都沒有到resync時間, 那該sharedProcessor對象的shouldResync會返回false. 否則會返回true.
run
func (p *sharedProcessor) run(stopCh <-chan struct{}) {
func() {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 以goroutine的方式啟動所有的listeners監(jiān)聽
for _, listener := range p.listeners {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
p.listenersStarted = true
}()
// 等待有信號告知退出
<-stopCh
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
// 關(guān)閉所有l(wèi)istener的addCh channel
for _, listener := range p.listeners {
// 通知pop()停止 pop()會告訴run()停止
close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
}
// 等待所有的pop()和run()方法退出
p.wg.Wait() // Wait for all .pop() and .run() to stop
}
run方法主要是啟動所有的listener進行監(jiān)聽.
其余方法
func (p *sharedProcessor) addListener(listener *processorListener) {
p.listenersLock.Lock()
defer p.listenersLock.Unlock()
p.addListenerLocked(listener)
// 如果已經(jīng)啟動了
if p.listenersStarted {
p.wg.Start(listener.run)
p.wg.Start(listener.pop)
}
}
func (p *sharedProcessor) addListenerLocked(listener *processorListener) {
p.listeners = append(p.listeners, listener)
p.syncingListeners = append(p.syncingListeners, listener)
}
// 分發(fā)信息
func (p *sharedProcessor) distribute(obj interface{}, sync bool) {
p.listenersLock.RLock()
defer p.listenersLock.RUnlock()
if sync {
// 如果是sync操作 只需要分發(fā)給那些resync時間到了的listener即可
for _, listener := range p.syncingListeners {
listener.add(obj)
}
} else {
// 如果不是sync操作 則通知所有的listeners
for _, listener := range p.listeners {
listener.add(obj)
}
}
}
addListener: 表示增加一個
processorListener, 如果sharedProcessor已經(jīng)啟動run方法了(listenersStarted=true), 那么就啟動該listener的run和pop監(jiān)控.
distribute: 分發(fā)消息, 也就是說
sharedProcessor收到一個obj, 然后把該obj分發(fā)給它的listeners, 那么每個listeners都可以收到這個obj.
informer整體
整個
informer體系在k8s代碼中占有重要一環(huán), 理解informer可以更好理解k8s的工作機制.
informer.png
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory


