1. 前言
轉(zhuǎn)載請說明原文出處, 尊重他人勞動成果!
源碼位置: https://github.com/nicktming/client-go/tree/tming-v13.0/tools/cache
分支: tming-v13.0 (基于v13.0版本)
本文將分析
tools/cache包中的DeltaFIFO. 主要會涉及到delta_fifo.go, 該類在整個informer體系中用于接受reflector送出來的數(shù)據(jù). 相當(dāng)于reflector是DeltaFIFO的生產(chǎn)者.
2. 整體接口與實現(xiàn)類
architecture.png
DeltaFIFO is like FIFO, but allows you to process changes to items which is delta.
DeltaFIFO is a producer-consumer queue, where a Reflector is intended to be the producer,
and the consumer is whatever calls the Pop() method.
A note on the KeyLister used by the DeltaFIFO:It's main purpose is to list keys that are "known",
for the purpose of figuring out which items have been deleted when Replace() or Delete() are
called. The deleted object will be included in the DeleteFinalStateUnknown markers.
關(guān)于
FIFO, 在 [k8s源碼分析][client-go] cache之fifo 中已經(jīng)分析了,DeltaFIFO也是Queue的一個實現(xiàn)類, 但是稍微比FIFO復(fù)雜一點.
type DeltaFIFO struct {
lock sync.RWMutex
cond sync.Cond
// items里面存的是key 以及該key對應(yīng)的pod的變化
// queue中存的是key 即出隊列的順序
items map[string]Deltas
queue []string
// populated is true if the first batch of items inserted by Replace() has been populated
// or Delete/Add/Update was called first.
populated bool
// initialPopulationCount is the number of items inserted by the first call of Replace()
initialPopulationCount int
// 生成key
keyFunc KeyFunc
// knownObjects list keys that are "known", for the
// purpose of figuring out which items have been deleted
// when Replace() or Delete() is called.
// 說白了 就是本地緩存
knownObjects KeyListerGetter
closed bool
closedLock sync.Mutex
}
// It tells you what change happened
type Delta struct {
Type DeltaType
Object interface{}
}
type Deltas []Delta
// DeltaType is the type of a change (addition, deletion, etc)
type DeltaType string
// Change type definition
const (
Added DeltaType = "Added"
Updated DeltaType = "Updated"
Deleted DeltaType = "Deleted"
// The other types are obvious. You'll get Sync deltas when:
// * A watch expires/errors out and a new list/watch cycle is started.
// * You've turned on periodic syncs.
// (Anything that trigger's DeltaFIFO's Replace() method.)
Sync DeltaType = "Sync"
)
type DeletedFinalStateUnknown struct {
Key string
Obj interface{}
}
// A KeyListerGetter is anything that knows how to list its keys and look up by key.
type KeyListerGetter interface {
KeyLister
KeyGetter
}
// A KeyLister is anything that knows how to list its keys.
type KeyLister interface {
ListKeys() []string
}
// A KeyGetter is anything that knows how to get the value stored under a given key.
type KeyGetter interface {
GetByKey(key string) (interface{}, bool, error)
}
與
FIFO相比, 主要有以下幾點不同:
1.items中的value不再只存著該key對應(yīng)的obj, 而是obj的一系列變化, 用一個數(shù)組來表示. 包括添加/更新/刪除等等. 因此衍生出來了很多結(jié)構(gòu)體和方法, 包括Deltas,Delta等等.
2. 增加了本地緩存knownObjects KeyListerGetter,KeyListerGetter提供了兩個方法分別是從本地緩存中獲得所有的key和根據(jù)key找到對應(yīng)的obj. 當(dāng)程序中錯過了某些event, 比如deletion event, 會造成服務(wù)器數(shù)據(jù)庫中沒有該obj, 而本地緩存中有該obj, 從而造成數(shù)據(jù)不一致, 那么在同步的過程中會有所操作. (其實KeyListerGetter在informers體系中是一個Indexer. [k8s源碼分析][client-go] cache之store和index)
或許有人會疑惑會為什么需要用另外一個屬性來緩存呢?items屬性不就可以當(dāng)做緩存了嗎? 理由是: items只是暫時性存儲, 當(dāng)調(diào)用pop的時候?qū)?yīng)的數(shù)據(jù)就會從items中刪除了, 而knownObjects會維護(hù)本地緩存.
3.DeletedFinalStateUnknown: 當(dāng)一個obj被刪除了, 但是這個程序這邊由于某種原因miss了這次deletion event, 那么假如在做同步操作時, 從服務(wù)器獲取的列表中已經(jīng)沒有了這個obj, 因為該程序沒有接收到deletion event, 所以該obj在本地緩存中依然存在, 所以此時會給這個obj構(gòu)造成這個DeletedFinalStateUnknown類型.
3. 方法
在講方法的同時盡量用個例子來進(jìn)行說明. 先定義一下類和方法.
func testFifoObjectKeyFunc(obj interface{}) (string, error) {
return obj.(testFifoObject).name, nil
}
type testFifoObject struct {
name string
val interface{}
}
func mkFifoObj(name string, val interface{}) testFifoObject {
return testFifoObject{name: name, val: val}
}
// helper function to reduce stuttering
func testPop(f *DeltaFIFO) testFifoObject {
return Pop(f).(Deltas).Newest().Object.(testFifoObject)
}
// keyLookupFunc adapts a raw function to be a KeyLookup.
type keyLookupFunc func() []testFifoObject
// ListKeys just calls kl.
func (kl keyLookupFunc) ListKeys() []string {
result := []string{}
for _, fifoObj := range kl() {
result = append(result, fifoObj.name)
}
return result
}
// GetByKey returns the key if it exists in the list returned by kl.
func (kl keyLookupFunc) GetByKey(key string) (interface{}, bool, error) {
for _, v := range kl() {
if v.name == key {
return v, true, nil
}
}
return nil, false, nil
}
Add 和 Update
func (f *DeltaFIFO) Add(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Added, obj)
}
func (f *DeltaFIFO) Update(obj interface{}) error {
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
return f.queueActionLocked(Updated, obj)
}
func (f *DeltaFIFO) Delete(obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
f.lock.Lock()
defer f.lock.Unlock()
f.populated = true
if f.knownObjects == nil {
// 如果沒有設(shè)置本地緩存
if _, exists := f.items[id]; !exists {
// 如果items中沒有該元素, 返回
return nil
}
} else {
_, exists, err := f.knownObjects.GetByKey(id)
_, itemsExist := f.items[id]
if err == nil && !exists && !itemsExist {
// 如果本地緩存和items中都沒有, 返回
return nil
}
}
return f.queueActionLocked(Deleted, obj)
}
1.
Delete方法有所不一樣, 需要判斷本地緩存. 這三個方法都是需要調(diào)用queueActionLocked來進(jìn)行操作.
2. 都設(shè)置populated為true, 跟在FIFO中 [k8s源碼分析][client-go] cache之fifo 的行為一樣.
func (f *DeltaFIFO) KeyOf(obj interface{}) (string, error) {
// 如果是Deltas, 也就是該obj的變化, 取最后一個操作的obj
if d, ok := obj.(Deltas); ok {
if len(d) == 0 {
return "", KeyError{obj, ErrZeroLengthDeltasObject}
}
obj = d.Newest().Object
}
// 如果該是DeletedFinalStateUnknown類型, 表明在服務(wù)器端已經(jīng)被刪除了, 在本地緩存中依然存在
if d, ok := obj.(DeletedFinalStateUnknown); ok {
return d.Key, nil
}
// 根據(jù)obj生成key
return f.keyFunc(obj)
}
// 目前這里的操作只是去判斷最后兩個元素是不是都是delete, 如果是則進(jìn)行合并, 就選其中一個即可
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
a := &deltas[n-1]
b := &deltas[n-2]
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
func isDup(a, b *Delta) *Delta {
if out := isDeletionDup(a, b); out != nil {
return out
}
// TODO: Detect other duplicate situations? Are there any?
return nil
}
// a:倒數(shù)第一個 b:倒數(shù)第二個
// 如果倒數(shù)第一個和倒數(shù)第二個都是Delete
// 如果倒數(shù)第二個是DeletedFinalStateUnknown 返回倒數(shù)第一個
// 如果倒數(shù)第二個不是DeletedFinalStateUnknown 返回倒數(shù)第二個
// 選擇一個盡量不是DeletedFinalStateUnknown的元素
func isDeletionDup(a, b *Delta) *Delta {
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// Do more sophisticated checks, or is this sufficient?
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
// 判斷該id的最后一次操作是不是Deleted操作
func (f *DeltaFIFO) willObjectBeDeletedLocked(id string) bool {
deltas := f.items[id]
return len(deltas) > 0 && deltas[len(deltas)-1].Type == Deleted
}
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 如果是Sync并且該元素中最后一次變化是刪除操作 就直接返回了
// 因為都已經(jīng)是刪除操作了, 在后面加一個Sync就沒有必要了 也可以方便用戶操作, 用戶判斷最后一個是不是delete會很方便
// Resync和Replace方法中有可能會調(diào)用Sync操作
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
newDeltas := append(f.items[id], Delta{actionType, obj})
newDeltas = dedupDeltas(newDeltas)
if len(newDeltas) > 0 {
if _, exists := f.items[id]; !exists {
f.queue = append(f.queue, id)
}
f.items[id] = newDeltas
f.cond.Broadcast()
} else {
delete(f.items, id)
}
return nil
}
1. 首先利用
KeyOf方法計算出該obj的id. (在KeyOf中如果該obj是一個Deltas類型, 則取最后一次變化的元素的id).
2. 如果是Sync操作(Resync和Replace方法中有可能會調(diào)用Sync操作), 如果該元素目前接受到的最后一次是刪除操作, 則這里直接返回.(因為服務(wù)器端已經(jīng)發(fā)出刪除指令了, 這里沒必要再給加Sync)
3. 判斷該元素的最后兩次操作是否相同(已經(jīng)把這次要加入的操作也算進(jìn)去了), 這里主要是進(jìn)行Delete操作判斷, 如果相同需要進(jìn)行合并.
例子
因為在
informers體系下knownObjects是真實存在的, 所以為了后面更好的理解informers, 所以例子中會帶有knownObjects.
f := NewDeltaFIFO(
testFifoObjectKeyFunc,
keyLookupFunc(func() []testFifoObject {
return []testFifoObject{mkFifoObj("foo", 5), mkFifoObj("bar", 6), mkFifoObj("baz", 7)}
}),
)
f.Update(mkFifoObj("baz", 18))
f.Add(mkFifoObj("foo", 10))
f.Update(mkFifoObj("bar", 15))
f.Update(mkFifoObj("foo", 15))
f.Delete(mkFifoObj("baz", 18))
1. 可以看到本地緩存中已經(jīng)保存了
foo,bar和baz, 說明之前已經(jīng)有Add這些event發(fā)生了, 所以本地緩存中有.(因為這是模擬, 所以就這樣說明一下)
2. 調(diào)用f.Add(mkFifoObj("foo", 10)), 按照上面的方法, 這個調(diào)用肯定是可以成功的.
3.Update操作與上面一樣的, 不多說了. 可以看f.Delete(mkFifoObj("baz", 20)), 并且本地緩存中有baz, 所以成功. 那什么時候會從本地緩存中刪除這個baz, 這個在informers體系中是在pop的時候去更新本地緩存的.pop出來的是一個Dletas(包含一系列該obj變化的delta數(shù)組), 那么我們自己定義的邏輯要怎么處理就怎么處理, 在informers的邏輯中, 是for這個數(shù)組對本地緩存進(jìn)行操作. 所以在沒有出隊列之前, 本地緩存中的值還是原來的值.
func newInformer(
lw ListerWatcher,
objType runtime.Object,
resyncPeriod time.Duration,
h ResourceEventHandler,
clientState Store,
) Controller {
// clientState就是本地緩存 對應(yīng)的knownObjects
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, clientState)
cfg := &Config{
Queue: fifo,
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: resyncPeriod,
RetryOnError: false,
Process: func(obj interface{}) error {
// from oldest to newest 出隊列的數(shù)組 一個一個操作
for _, d := range obj.(Deltas) {
switch d.Type {
case Sync, Added, Updated:
// 更新本地緩存
if old, exists, err := clientState.Get(d.Object); err == nil && exists {
if err := clientState.Update(d.Object); err != nil {
return err
}
h.OnUpdate(old, d.Object)
} else {
// 添加到本地緩存
if err := clientState.Add(d.Object); err != nil {
return err
}
h.OnAdd(d.Object)
}
case Deleted:
// 刪除本地緩存
if err := clientState.Delete(d.Object); err != nil {
return err
}
h.OnDelete(d.Object)
}
}
return nil
},
}
return New(cfg)
}
最終的結(jié)果如下:
add/update/delete.png
pop方法
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
f.lock.Lock()
defer f.lock.Unlock()
for {
for len(f.queue) == 0 {
if f.IsClosed() {
return nil, ErrFIFOClosed
}
f.cond.Wait()
}
id := f.queue[0]
f.queue = f.queue[1:]
if f.initialPopulationCount > 0 {
f.initialPopulationCount--
}
item, ok := f.items[id]
if !ok {
continue
}
delete(f.items, id)
err := process(item)
if e, ok := err.(ErrRequeue); ok {
f.addIfNotPresent(id, item)
err = e.Err
}
return item, err
}
}
1. 這里的出隊列與
FIFO一樣, 但是有一點區(qū)別是這里出隊列的是一個數(shù)組, 而不是某一個obj. 從上面的例子中看, 第一個出隊列的是foo, 它出來的是一個數(shù)組[{Add, {foo, 10}}, {Update, {foo, 15}}], 然后用戶的process方法需要處理的是這樣的一個數(shù)組.
2. 如果initialPopulationCount > 0, 表明Replace是比Add/Update/AddIfNotPresent/Delete先調(diào)用 然后設(shè)置了initialPopulationCount就是第一次調(diào)用Replace中加入的元素個數(shù), 那在pop中對于initialPopulationCount--做的操作就是每出來一個元素就減少一個, 等到initialPopulationCount=0的時候, 也就表明第一次調(diào)用replace加入的元素已經(jīng)全部出隊列了.
例子
所以按照上面的例子出一個隊列看一下, 然后
process PopProcessFunc就用上面informers用的那個process的邏輯. 所以出隊列的是baz, 然后依次對它的兩個變化{Update, {baz, 18}}和{Delete, {baz, 20}}進(jìn)行操作.
delete.png
delta_update.png
delta_delete.png
Replace
Replace的作用很清楚, 就是用傳入的list來代替之前這里的所有元素, 與FIFO不同的是, 這里的操作都是針對變化, 這里DeltaFIFO就給那些要刪除的元素發(fā)送一個Delete操作, 給那些不需要刪除的元素發(fā)送一個Sync操作表示已經(jīng)完成同步.
那么問題來了, 哪些元素是要被刪除的元素呢? 在傳入的
list中沒有出現(xiàn)的元素就是要刪除的元素. 再想一下,DeletedFinalStateUnknown出現(xiàn)的原因是因為某種原因miss了Delete event, 現(xiàn)在假設(shè)Replace中的元素是從服務(wù)器中最新獲取的所有真正存在的元素, 并且該程序沒有錯誤任何的刪除事件, 那么傳入的list的keys與knownObjects中的keys應(yīng)該是一樣的. 所以如果錯過了某些刪除事件, 那自然是knownObjects中多了一些已經(jīng)被刪除的obj.
理解了這些, 再看代碼就會輕松很多了.
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 將要加入的list放到keys中
// 給list中的每一個item發(fā)送Sync操作
for _, item := range list {
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
keys.Insert(key)
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
if f.knownObjects == nil {
// Do deletion detection against our own list.
// 如果沒有設(shè)置本地緩存
queuedDeletions := 0
for k, oldItem := range f.items {
// 如果新加的list中有 因為已經(jīng)發(fā)送Sync操作了 所以就不需要了
if keys.Has(k) {
continue
}
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
queuedDeletions++
// 不在list中的元素需要被刪除
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
// While there shouldn't be any queued deletions in the initial
// population of the queue, it's better to be on the safe side.
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
// Detect deletions not already in the queue.
// 這里可能有人會疑惑為什么不刪除f.items里面的元素, 因為f.items里面有的元素會出現(xiàn)在本地緩存中的, 所以直接對本地緩存做操作即可
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
if keys.Has(k) {
continue
}
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
klog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
klog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
queuedDeletions++
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
1. 可以看到在做刪除操作的時候, 給這些
obj都是構(gòu)造成DeletedFinalStateUnknown類型.
2.initialPopulationCount將會被設(shè)置成len(list) + queuedDeletions, 也就是說要等待同步的元素和那些錯過了刪除事件的元素全部出隊列完成才可以說同步成功.
例子
在上面的基礎(chǔ)上增加個
Replace操作.
1.pop() --> 本地緩存中就會有foo的記錄, queue和item中會刪除foo
2.pop() --> 本地緩存中就會有bar的記錄, queue和item中會刪除bar
3.Add {baz, 15} pop() --> 本地緩存中就會有baz的記錄
4.Update {foo, 10} Update {foo, 15} --> foo會加入到queue和item中
5.Update {bar, 15} --> bar會加入到queue和item中
6.Update {baz, 18} Delete {baz, 18} --> baz會加入到queue和item中
7.Replace [{foo, 15}, {baz, 18}]
前面
6步會得到下面的這個狀態(tài), 這里就不一一解釋了, 上面已經(jīng)分析過了.
before_replace.png
可以看到在本地緩存中有
foo,bar和baz. 現(xiàn)在假設(shè)Replace中的list是從服務(wù)器上最新獲取來的元素列表. 那么可以看到bar已經(jīng)被刪除了, 而foo和baz還在服務(wù)器中.
另外要說明的一點是baz已經(jīng)接收到了delete操作, 怎么服務(wù)器上還有呢?這里有可能是獲得的列表是在發(fā)出了刪除命令與完全刪除之間這段時間獲取的, 也有可能是刪除了之后又添加了一個這樣的obj, 但是DeltaQueue還沒有收到Add event, 不過有待考證, 這里只是想為了說明在Delete之后發(fā)送了Sync操作是不添加在后面的.
replace.png
從圖中可以看到:
foo: 在list中, 所以發(fā)送一個sync請求.
bar: 不在list中, 說明服務(wù)器端已經(jīng)刪除了, 該DeltaQueue由于某種原因(比如網(wǎng)絡(luò))沒有收到delete event, 也有可能還沒有出隊列等等原因, 所以需要加一個Delete操作, 并且構(gòu)造成DeletedFinalStateUnknown類型. 另外在這里仔細(xì)思考一下之前關(guān)于isDeletionDup的操作是不是更清晰, 就是盡量選從上流程序發(fā)過來的Delete event帶有的obj.
Delete: 因為baz在list中, 所以發(fā)一個sync操作, 但是由于該元素之前已經(jīng)接收到了Delete請求, 所以再把該Sync加到后面, 在出隊列pop中informers(上面有提到), 判斷是Sync并且本地緩存中沒有的時候會把該obj重新加回去.
Resync
func (f *DeltaFIFO) syncKey(key string) error {
f.lock.Lock()
defer f.lock.Unlock()
return f.syncKeyLocked(key)
}
func (f *DeltaFIFO) syncKeyLocked(key string) error {
obj, exists, err := f.knownObjects.GetByKey(key)
if err != nil {
klog.Errorf("Unexpected error %v during lookup of key %v, unable to queue object for sync", err, key)
return nil
} else if !exists {
// 如果該元素在本地緩存中不存在 則返回
klog.Infof("Key %v does not exist in known objects store, unable to queue object for sync", key)
return nil
}
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
if len(f.items[id]) > 0 {
return nil
}
if err := f.queueActionLocked(Sync, obj); err != nil {
return fmt.Errorf("couldn't queue object: %v", err)
}
return nil
}
向本地緩存中的那些不在
items里面的元素發(fā)一個Sync操作.
HasSynced
func (f *DeltaFIFO) HasSynced() bool {
f.lock.Lock()
defer f.lock.Unlock()
return f.populated && f.initialPopulationCount == 0
}
假設(shè)此時該
DeltaFIFQ對象剛剛初始化.
1. 如果啥方法都沒有調(diào)用, 那么HasSynced返回false, 因為populated=false.
2. 如果先調(diào)用Add/Update/AddIfNotPresent/Delete后(后面調(diào)用什么函數(shù)都不用管了), 那么HasSynced返回true, 因為populated=true并且initialPopulationCount == 0.
3. 如果先調(diào)用Replace(后面調(diào)用什么函數(shù)都不用管了), 那么必須要等待該replace方法加入元素的個數(shù)和DeletedFinalStateUnknown(也就是那些本地緩存上有服務(wù)器上沒有的元素)全部pop之后,HasSynced才會返回true, 因為只有全部pop完了之后initialPopulationCount才減為0.
informer整體
整個
informer體系在k8s代碼中占有重要一環(huán), 理解informer可以更好理解k8s的工作機(jī)制.
informer.png
1. [k8s源碼分析][client-go] informer之store和index
2. [k8s源碼分析][client-go] informer之delta_fifo
3. [k8s源碼分析][client-go] informer之reflector
4. [k8s源碼分析][client-go] informer之controller和shared_informer(1)
5. [k8s源碼分析][client-go] informer之controller和shared_informer(2)
6. [k8s源碼分析][client-go] informer之SharedInformerFactory







