[k8s source code analysis][controller-manager] ReplicaSetController (ReplicaSet) Analysis (1)

1. Introduction

Please credit the original source when reposting; respect others' work!

Source location: https://github.com/nicktming/kubernetes/tree/tming-v1.13/pkg/controller/replicaset
Branch: tming-v1.13 (based on v1.13)

All of these controllers use informers; for background on informers, see [k8s source code analysis][client-go] informer: SharedInformerFactory.

This post analyzes how the ReplicaSetController works, i.e. how it controls the number of pods.

2. Example

Here are two yaml files, one creating a pod and one creating a replicaset; the pod's label is identical to the label of the pod in the replicaset's template.

matchpod.yaml

apiVersion: v1
kind: Pod
metadata:
  name: test
  labels:
    env: prod
spec:
  containers:
  - name: podtest
    image: nginx:now
    ports:
    - containerPort: 80

replicaset.yaml

[root@master kubectl]# cat replicaset.yaml 
apiVersion: apps/v1
kind: ReplicaSet
metadata:
  name: replicatest
spec:
  replicas: 2
  selector:
    matchLabels:
      env: prod
  template:
    metadata:
      labels:
        env: prod
    spec:
      containers:
      - name: nginx
        image: nginx:now

As you can see, both carry the label env: prod.

2.1 Operations

First, create a pod from matchpod.yaml.

[root@master kubectl]# ./kubectl apply -f matchpod.yaml 
pod/test created
[root@master kubectl]# ./kubectl get pods
NAME   READY   STATUS    RESTARTS   AGE
test   1/1     Running   0          4s
[root@master kubectl]# ./kubectl get pod test -o yaml
apiVersion: v1
kind: Pod
metadata:
 ...
  labels:
    env: prod
  name: test
  namespace: default
  resourceVersion: "89705"    
...

Then create a replicaset:

[root@master kubectl]# ./kubectl get rs
No resources found.
[root@master kubectl]# ./kubectl get pod
NAME   READY   STATUS    RESTARTS   AGE
test   1/1     Running   0          90s
[root@master kubectl]# ./kubectl apply -f replicaset.yaml 
replicaset.apps/replicatest created
[root@master kubectl]# ./kubectl get rs
NAME          DESIRED   CURRENT   READY   AGE
replicatest   2         2         2       4s
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-rdjv6   1/1     Running   0          8s
test                1/1     Running   0          2m16s
[root@master kubectl]# ./kubectl get pod test -o yaml
apiVersion: v1
kind: Pod
metadata:
 ...
  labels:
    env: prod
  name: test
  namespace: default
  ownerReferences:
  - apiVersion: apps/v1
    blockOwnerDeletion: true
    controller: true
    kind: ReplicaSet
    name: replicatest
    uid: 74bda7cc-f671-11e9-bb78-525400d54f7e
  resourceVersion: "90171"
...

From the pod's ownerReferences field you can see that the replicaset has taken ownership of the previously created pod (test). The pod's resourceVersion has changed from 89705 to 90171.
Next, let's see what happens when the replicaset is deleted.

[root@master kubectl]# ./kubectl get rs
NAME          DESIRED   CURRENT   READY   AGE
replicatest   2         2         2       2m36s
[root@master kubectl]# ./kubectl get pods
NAME                READY   STATUS    RESTARTS   AGE
replicatest-rdjv6   1/1     Running   0          2m42s
test                1/1     Running   0          4m50s
[root@master kubectl]# ./kubectl delete rs replicatest
replicaset.extensions "replicatest" deleted
[root@master kubectl]# ./kubectl get pods
NAME   READY   STATUS        RESTARTS   AGE
test   0/1     Terminating   0          5m10s
[root@master kubectl]# ./kubectl get pods
No resources found.
[root@master kubectl]# 

As you can see, the previously created pod was deleted as well.

If anything in this example is puzzling, this post should clear it up.

3. Startup

The overall startup of the kube-controller-manager component will get its own post; here we go straight to how the replicaset controller is started.

// cmd/kube-controller-manager/app/controllermanager.go
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers := map[string]InitFunc{}
    ...
    controllers["replicaset"] = startReplicaSetController
    ...
    return controllers
}
// cmd/kube-controller-manager/app/apps.go
func startReplicaSetController(ctx ControllerContext) (http.Handler, bool, error) {
    if !ctx.AvailableResources[schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "replicasets"}] {
        return nil, false, nil
    }
    go replicaset.NewReplicaSetController(
        ctx.InformerFactory.Apps().V1().ReplicaSets(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.ClientBuilder.ClientOrDie("replicaset-controller"),
        replicaset.BurstReplicas,
    ).Run(int(ctx.ComponentConfig.ReplicaSetController.ConcurrentRSSyncs), ctx.Stop)
    return nil, true, nil
}

All controllers follow the same pattern: construct a ReplicaSetController with the relevant parameters, then start its Run method in a goroutine.

4. ReplicaSetController

type ReplicaSetController struct {
    // GroupVersionKind indicates the controller type.
    schema.GroupVersionKind
    // clientset used to talk to the api-server
    kubeClient clientset.Interface
    // creates/deletes pods through the api-server
    podControl controller.PodControlInterface
    // upper bound on pods created/deleted in one sync
    burstReplicas int
    syncHandler func(rsKey string) error
    expectations *controller.UIDTrackingControllerExpectations
    rsLister appslisters.ReplicaSetLister
    rsListerSynced cache.InformerSynced
    podLister corelisters.PodLister
    podListerSynced cache.InformerSynced
    // Controllers that need to be synced
    // a queue that decouples producers from consumers
    queue workqueue.RateLimitingInterface
}
func NewReplicaSetController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int) *ReplicaSetController {
    eventBroadcaster := record.NewBroadcaster()
    eventBroadcaster.StartLogging(klog.Infof)
    eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
    return NewBaseController(rsInformer, podInformer, kubeClient, burstReplicas,
        apps.SchemeGroupVersion.WithKind("ReplicaSet"),
        "replicaset_controller",
        "replicaset",
        controller.RealPodControl{
            KubeClient: kubeClient,
            Recorder:   eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "replicaset-controller"}),
        },
    )
}
func NewBaseController(rsInformer appsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, kubeClient clientset.Interface, burstReplicas int,
    gvk schema.GroupVersionKind, metricOwnerName, queueName string, podControl controller.PodControlInterface) *ReplicaSetController {
    if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
        metrics.RegisterMetricAndTrackRateLimiterUsage(metricOwnerName, kubeClient.CoreV1().RESTClient().GetRateLimiter())
    }

    rsc := &ReplicaSetController{
        GroupVersionKind: gvk,
        kubeClient:       kubeClient,
        podControl:       podControl,
        burstReplicas:    burstReplicas,
        expectations:     controller.NewUIDTrackingControllerExpectations(controller.NewControllerExpectations()),
        queue:            workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName),
    }

    rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.enqueueReplicaSet,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.enqueueReplicaSet,
    })
    rsc.rsLister = rsInformer.Lister()
    rsc.rsListerSynced = rsInformer.Informer().HasSynced

    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
    rsc.podLister = podInformer.Lister()
    rsc.podListerSynced = podInformer.Informer().HasSynced
    rsc.syncHandler = rsc.syncReplicaSet
    return rsc
}

All the important parts are in NewBaseController:
1. rsInformer registers the custom handlers for replicaset events.
2. podInformer registers the custom handlers for pod events; since a replicaset manages the pods that belong to it, it naturally has to watch pod changes.
3. queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), queueName) initializes the workqueue, which decouples producers from consumers, mainly so that slow consumers do not block producers. For workqueue, see [k8s source code analysis][client-go] workqueue.
4. rsc.syncHandler = rsc.syncReplicaSet is the core method, i.e. the workqueue's consumer.
5. expectations is initialized. See [k8s source code analysis][controller-manager] controller_utils. How this controller uses expectations will be analyzed separately later.
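The numbered points above lean heavily on the workqueue's semantics: an item added while it is already pending is deduplicated, and an item re-added while it is being processed is queued again only after Done. The following is a rough pure-stdlib sketch of just that behavior (not client-go's actual implementation, and without any rate limiting):

```go
package main

import (
	"fmt"
	"sync"
)

// WorkQueue sketches the dedup semantics of client-go's workqueue.
type WorkQueue struct {
	mu         sync.Mutex
	queue      []string
	dirty      map[string]bool // pending, or re-added while being processed
	processing map[string]bool // currently handed out via Get
}

func NewWorkQueue() *WorkQueue {
	return &WorkQueue{dirty: map[string]bool{}, processing: map[string]bool{}}
}

func (q *WorkQueue) Add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.dirty[key] {
		return // already pending: deduplicate
	}
	q.dirty[key] = true
	if q.processing[key] {
		return // Done will re-queue it later
	}
	q.queue = append(q.queue, key)
}

func (q *WorkQueue) Get() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.queue) == 0 {
		return "", false
	}
	key := q.queue[0]
	q.queue = q.queue[1:]
	q.processing[key] = true
	delete(q.dirty, key)
	return key, true
}

func (q *WorkQueue) Done(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.processing, key)
	if q.dirty[key] { // re-added while processing: queue it again
		q.queue = append(q.queue, key)
	}
}

func main() {
	q := NewWorkQueue()
	q.Add("default/replicatest")
	q.Add("default/replicatest") // deduplicated while pending
	key, _ := q.Get()
	q.Add(key) // re-added while being processed
	q.Done(key)
	fmt.Println(key)
}
```

This dedup-by-key behavior is also why producers can enqueue the same replicaset many times without flooding the consumers.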

4.1 Starting the replicaset controller

func (rsc *ReplicaSetController) Run(workers int, stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer rsc.queue.ShutDown()

    controllerName := strings.ToLower(rsc.Kind)
    // controllerName = replicaset
    klog.Infof("Starting %v controller", controllerName)
    defer klog.Infof("Shutting down %v controller", controllerName)
    // wait for the caches to sync
    if !controller.WaitForCacheSync(rsc.Kind, stopCh, rsc.podListerSynced, rsc.rsListerSynced) {
        return
    }
    // start several goroutines, each running rsc.worker
    for i := 0; i < workers; i++ {
        go wait.Until(rsc.worker, time.Second, stopCh)
    }
    // block until asked to stop
    <-stopCh
}
func (rsc *ReplicaSetController) worker() {
    for rsc.processNextWorkItem() {
    }
}
func (rsc *ReplicaSetController) processNextWorkItem() bool {
    // take one item from the queue
    key, quit := rsc.queue.Get()
    if quit {
        // the queue has been shut down, so return
        return false
    }
    // mark the key as done once processing finishes; see the workqueue implementation
    defer rsc.queue.Done(key)
    // process the item
    err := rsc.syncHandler(key.(string))
    if err == nil {
        // processed successfully
        rsc.queue.Forget(key)
        return true
    }
    // processing failed: put the key back into the queue
    utilruntime.HandleError(fmt.Errorf("Sync %q failed with %v", key, err))
    rsc.queue.AddRateLimited(key)
    return true
}

As you can see, Run starts several goroutines that endlessly call the syncHandler method on items taken from the queue.
Consumer: so syncHandler is the consumer, and syncHandler is the syncReplicaSet method mentioned above.
Producer: now that we know how items in the queue are consumed, where do those items come from? Let's look at the producers.

(figure: architecture.png)

4.2 Producers

As you might expect, the producers are rsInformer and podInformer: they watch for replicaset and pod changes, and whenever something changes, the affected objects are put into the queue.

rsInformer
rsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    rsc.enqueueReplicaSet,
        UpdateFunc: rsc.updateRS,
        DeleteFunc: rsc.enqueueReplicaSet,
    })
func (rsc *ReplicaSetController) enqueueReplicaSet(obj interface{}) {
    key, err := controller.KeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(fmt.Errorf("couldn't get key for object %+v: %v", obj, err))
        return
    }
    rsc.queue.Add(key)
}
func (rsc *ReplicaSetController) updateRS(old, cur interface{}) {
    oldRS := old.(*apps.ReplicaSet)
    curRS := cur.(*apps.ReplicaSet)

    if *(oldRS.Spec.Replicas) != *(curRS.Spec.Replicas) {
        klog.V(4).Infof("%v %v updated. Desired pod count change: %d->%d", rsc.Kind, curRS.Name, *(oldRS.Spec.Replicas), *(curRS.Spec.Replicas))
    }
    // enqueue the new replicaset
    rsc.enqueueReplicaSet(cur)
}

As you can see, add/delete/update all go straight into the queue; on update only the newest object is enqueued and the old one is discarded.

podInformer
podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: rsc.addPod,
        UpdateFunc: rsc.updatePod,
        DeleteFunc: rsc.deletePod,
    })
addPod
func (rsc *ReplicaSetController) resolveControllerRef(namespace string, controllerRef *metav1.OwnerReference) *apps.ReplicaSet {
    // We can't look up by UID, so look up by Name and then verify UID.
    // Don't even try to look up by Name if it's the wrong Kind.
    if controllerRef.Kind != rsc.Kind {
        return nil
    }
    // fetch the controller from the local cache
    rs, err := rsc.rsLister.ReplicaSets(namespace).Get(controllerRef.Name)
    if err != nil {
        // not in the local cache, so return nil
        return nil
    }
    // the cached object does not match the owner controller recorded in the pod
    if rs.UID != controllerRef.UID {
        // The controller we found with this Name is not the same one that the
        // ControllerRef points to.
        return nil
    }
    // the rs that owns this pod
    return rs
}
func (rsc *ReplicaSetController) addPod(obj interface{}) {
    pod := obj.(*v1.Pod)
    if pod.DeletionTimestamp != nil {
        // the pod is terminating
        rsc.deletePod(pod)
        return
    }
    if controllerRef := metav1.GetControllerOf(pod); controllerRef != nil {
        rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
        if rs == nil {
            // the pod's owner no longer exists or has been replaced
            return
        }
        // the rs that owns this pod
        rsKey, err := controller.KeyFunc(rs)
        if err != nil {
            return
        }
        klog.V(4).Infof("Pod %s created: %#v.", pod.Name, pod)
        rsc.expectations.CreationObserved(rsKey)
        // put the rs into the queue
        rsc.enqueueReplicaSet(rs)
        return
    }
    // this is an orphan pod:
    // find all replicasets whose selector matches the pod
    rss := rsc.getPodReplicaSets(pod)
    if len(rss) == 0 {
        // no replicaset matches this pod, nothing to do
        return
    }
    klog.V(4).Infof("Orphan Pod %s created: %#v.", pod.Name, pod)
    for _, rs := range rss {
        // put all of these replicasets into the queue
        rsc.enqueueReplicaSet(rs)
    }
}

When the podInformer sees a new Pod:
1. If the pod is terminating, hand it to deletePod and return; there is nothing else for this controller to do.
2. If the pod's owner no longer exists or has been replaced, return. (You may wonder why the pod is not deleted when its owner is gone; that depends on the deletion policy and will be analyzed in the garbagecollector post.)
3. If the pod has an owner and that owner exists, enqueue the owner, i.e. the rs.
4. If the pod has no owner and no replicaset in the cluster matches it, there is nothing left to do, so return.
5. If the pod has no owner, enqueue every replicaset that matches it.
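The five cases above can be condensed into a small decision function. The types and names below (OwnerRef, keysToEnqueue, the hard-coded kind check) are illustrative stand-ins for the real API types, not the actual controller code:

```go
package main

import "fmt"

// Simplified stand-ins for the objects addPod inspects.
type OwnerRef struct{ Kind, Name, UID string }

type Pod struct {
	Name        string
	Labels      map[string]string
	Owner       *OwnerRef // controller ownerReference, if any
	Terminating bool
}

type ReplicaSet struct {
	Name     string
	UID      string
	Selector map[string]string
}

// matches reports whether every selector key/value appears in the pod labels.
func matches(rs ReplicaSet, pod Pod) bool {
	for k, v := range rs.Selector {
		if pod.Labels[k] != v {
			return false
		}
	}
	return true
}

// keysToEnqueue mirrors the addPod decision tree: terminating pods go to
// the delete path, owned pods enqueue their owner only if it still exists
// with the same UID, and orphans enqueue every matching replicaset.
func keysToEnqueue(pod Pod, all []ReplicaSet) []string {
	if pod.Terminating {
		return nil // handed to deletePod in the real controller
	}
	if pod.Owner != nil {
		for _, rs := range all {
			if pod.Owner.Kind == "ReplicaSet" && rs.Name == pod.Owner.Name && rs.UID == pod.Owner.UID {
				return []string{rs.Name}
			}
		}
		return nil // owner gone or replaced: nothing to do here
	}
	var keys []string
	for _, rs := range all {
		if matches(rs, pod) {
			keys = append(keys, rs.Name)
		}
	}
	return keys
}

func main() {
	rss := []ReplicaSet{{Name: "replicatest", UID: "u1", Selector: map[string]string{"env": "prod"}}}
	orphan := Pod{Name: "test", Labels: map[string]string{"env": "prod"}}
	fmt.Println(keysToEnqueue(orphan, rss)) // the orphan wakes up replicatest
}
```

This is exactly what happened in the example in section 2: the orphan pod test matched replicatest's selector, so the replicaset was woken up and adopted it.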

deletePod
func (rsc *ReplicaSetController) deletePod(obj interface{}) {
    pod, ok := obj.(*v1.Pod)
    // as analyzed in the deltaFIFO post: if this informer missed the Delete
    // event for some reason, the resync delivers such objects wrapped in a
    // DeletedFinalStateUnknown
    if !ok {
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %+v", obj))
            return
        }
        pod, ok = tombstone.Obj.(*v1.Pod)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a pod %#v", obj))
            return
        }
    }
    controllerRef := metav1.GetControllerOf(pod)
    if controllerRef == nil {
        // the pod does not belong to any replicaset, so nothing else needs to happen
        // No controller should care about orphans being deleted.
        return
    }
    rs := rsc.resolveControllerRef(pod.Namespace, controllerRef)
    if rs == nil {
        // the pod's owner replicaset no longer exists, nothing to do
        return
    }
    rsKey, err := controller.KeyFunc(rs)
    if err != nil {
        return
    }
    klog.V(4).Infof("Pod %s/%s deleted through %v, timestamp %+v: %#v.", pod.Namespace, pod.Name, utilruntime.GetCaller(), pod.DeletionTimestamp, pod)
    rsc.expectations.DeletionObserved(rsKey, controller.PodKey(pod))
    // put the rs into the queue
    rsc.enqueueReplicaSet(rs)
}

When the podInformer receives a Pod delete event:
1. If the pod is an orphan that belongs to no replicaset, nothing needs to happen; the delete has no further effects.
2. If the pod's owner (replicaset) has already been deleted, there is also nothing to do.
3. When neither 1 nor 2 holds, deleting the pod affects its owner (for example, the replicaset may fall short of its desired replicas), so the pod's replicaset is enqueued.
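The tombstone handling at the top of deletePod is worth isolating. Below is a stand-alone sketch of the same type-assertion dance; Tombstone and podFromDeleteEvent are illustrative stand-ins for cache.DeletedFinalStateUnknown and the inline assertions in the real code:

```go
package main

import "fmt"

type Pod struct{ Name string }

// Tombstone models DeletedFinalStateUnknown: when the informer missed the
// actual delete event, the object arrives wrapped like this on resync.
type Tombstone struct{ Obj interface{} }

// podFromDeleteEvent mirrors the type assertions at the top of deletePod:
// accept either a pod directly or a pod inside a tombstone.
func podFromDeleteEvent(obj interface{}) (*Pod, bool) {
	if pod, ok := obj.(*Pod); ok {
		return pod, true
	}
	tombstone, ok := obj.(Tombstone)
	if !ok {
		return nil, false // neither a pod nor a tombstone
	}
	pod, ok := tombstone.Obj.(*Pod)
	return pod, ok
}

func main() {
	direct, _ := podFromDeleteEvent(&Pod{Name: "test"})
	wrapped, _ := podFromDeleteEvent(Tombstone{Obj: &Pod{Name: "test"}})
	fmt.Println(direct.Name, wrapped.Name)
}
```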

func (rsc *ReplicaSetController) updatePod(old, cur interface{}) {
    curPod := cur.(*v1.Pod)
    oldPod := old.(*v1.Pod)
    if curPod.ResourceVersion == oldPod.ResourceVersion {
        // the same ResourceVersion means the pod has not changed at all
        return
    }
    labelChanged := !reflect.DeepEqual(curPod.Labels, oldPod.Labels)
    if curPod.DeletionTimestamp != nil {
        rsc.deletePod(curPod)
        if labelChanged {
            rsc.deletePod(oldPod)
        }
        return
    }
    curControllerRef := metav1.GetControllerOf(curPod)
    oldControllerRef := metav1.GetControllerOf(oldPod)
    controllerRefChanged := !reflect.DeepEqual(curControllerRef, oldControllerRef)
    if controllerRefChanged && oldControllerRef != nil {
        if rs := rsc.resolveControllerRef(oldPod.Namespace, oldControllerRef); rs != nil {
            rsc.enqueueReplicaSet(rs)
        }
    }
    if curControllerRef != nil {
        rs := rsc.resolveControllerRef(curPod.Namespace, curControllerRef)
        if rs == nil {
            return
        }
        klog.V(4).Infof("Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
        rsc.enqueueReplicaSet(rs)
        if !podutil.IsPodReady(oldPod) && podutil.IsPodReady(curPod) && rs.Spec.MinReadySeconds > 0 {
            klog.V(2).Infof("%v %q will be enqueued after %ds for availability check", rsc.Kind, rs.Name, rs.Spec.MinReadySeconds)
            rsc.enqueueReplicaSetAfter(rs, (time.Duration(rs.Spec.MinReadySeconds)*time.Second)+time.Second)
        }
        return
    }
    if labelChanged || controllerRefChanged {
        rss := rsc.getPodReplicaSets(curPod)
        if len(rss) == 0 {
            return
        }
        klog.V(4).Infof("Orphan Pod %s updated, objectMeta %+v -> %+v.", curPod.Name, oldPod.ObjectMeta, curPod.ObjectMeta)
        for _, rs := range rss {
            rsc.enqueueReplicaSet(rs)
        }
    }
}

This one is a bit more involved. The upstream comment that follows captures the gist, and the details do not affect the overall understanding.

// When a pod is updated, figure out what replica set/s manage it and wake them
// up. If the labels of the pod have changed we need to awaken both the old
// and new replica set. old and cur must be *v1.Pod types.

4.2.1 Summary

Note that what the producers put into the queue carries no state: whether a pod was deleted or a replicaset was added, in every case only the key of the affected replicaset is enqueued.
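Concretely, such a key is just the "namespace/name" string that controller.KeyFunc (cache.MetaNamespaceKeyFunc) produces; the consumer then re-reads the latest state from the lister. A minimal sketch of that format and its inverse (key and splitKey here are illustrative stand-ins for the cache helpers):

```go
package main

import (
	"fmt"
	"strings"
)

// key mimics cache.MetaNamespaceKeyFunc: a stateless "namespace/name"
// string. The key carries no object state at all.
func key(namespace, name string) string {
	if namespace == "" {
		return name // cluster-scoped objects have no namespace prefix
	}
	return namespace + "/" + name
}

// splitKey is the inverse, as syncReplicaSet does before the lister lookup.
func splitKey(k string) (namespace, name string) {
	parts := strings.SplitN(k, "/", 2)
	if len(parts) == 1 {
		return "", parts[0]
	}
	return parts[0], parts[1]
}

func main() {
	k := key("default", "replicatest")
	ns, name := splitKey(k)
	fmt.Println(k, ns, name)
}
```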
