k8s調(diào)度器源碼解析

k8s調(diào)度器

版本號:v1.27.2

調(diào)度入口

kubernetes 調(diào)度器入口文件位于 /cmd/kube-scheduler/scheduler.go,其中用到的核心庫位于 /pkg/scheduler。

調(diào)用流程:

`app.NewSchedulerCommand()` 
                            ↓
`runCommand(cmd, opts, registryOptions...)` 
                            ↓
`cc, sched, err := Setup(ctx, opts, registryOptions...)`
                                                    ↓
                            `sched, err := scheduler.New(cc.Client,...) ` 
                            ↓                               
` Run(ctx, cc, sched)` 
                            |
                            |
                            ---->   `sched.Run(ctx)` (/pkg/scheduler) -> `sched.SchedulingQueue.Run()`
                                                                                                                                                ↓   
                                                                                 go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

最后到達文件 /pkg/scheduler/scheduler.go#L241, 這里創(chuàng)建了一個 Scheduler 是我們重點關注的地方。

Scheduler 結構體

不過為了后面方便理解,我們先看一下 Scheduler結構體的結構。

從注釋信息我們可以看到其主要功能是 watch未調(diào)度的pod,它將試圖找一個合適的 Node ,然后將其寫回到 api server。

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
    // 通過 NodeLister 和 Algorithm 將觀察到通過 Cache 所做的任何更改
    // It is expected that changes made via Cache will be observed
    // by NodeLister and Algorithm.
    Cache internalcache.Cache

    // 擴展器是外部進程影響Kubernetes做出的調(diào)度決策的接口,通常是不由Kubernetes直接管理的資源所需要的。
    Extenders []framework.Extender

    // 以阻塞的形式獲取下一個有效的待調(diào)度的Pod。這里不使用channel,主要是因為對一個pod的調(diào)度可能需要一些時間
    // NextPod should be a function that blocks until the next pod
    // is available. We don't use a channel for this, because scheduling
    // a pod may take some amount of time and we don't want pods to get
    // stale while they sit in a channel.
    NextPod func() *framework.QueuedPodInfo

    // 調(diào)度失敗處理 Handler
    // FailureHandler is called upon a scheduling failure.
    FailureHandler FailureHandlerFn

    // 試圖將一個Pod調(diào)度到其中一個節(jié)點上,成功則返回 ScheduleResult, 否則返回帶有失敗原來的 FitError
    // SchedulePod tries to schedule the given pod to one of the nodes in the node list.
    // Return a struct of ScheduleResult with the name of suggested host on success,
    // otherwise will return a FitError with reasons.
    SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)

    // Close this to shut down the scheduler.
    StopEverything <-chan struct{}

    // 等待被調(diào)度的Pod隊列,也就是說只有從這個隊列里獲取的Pod才能被調(diào)度
    // SchedulingQueue holds pods to be scheduled
    SchedulingQueue internalqueue.SchedulingQueue

    // 調(diào)度配置profiles
    // Profiles are the scheduling profiles.
    Profiles profile.Map

    // api server 專用 client 
    client clientset.Interface

    // 節(jié)點信息快照
    nodeInfoSnapshot *internalcache.Snapshot

    // 節(jié)點打分相關
    percentageOfNodesToScore int32

    nextStartNodeIndex int
}

通過結構體我們大概可以猜到一些信息:

  • 調(diào)度中某些操作與 Cache 有關,主要是性能優(yōu)化考慮,設計文檔參考 /pkg/scheduler/internal/cache/interface.go
  • 通過 NextPod 函數(shù)可以從調(diào)度隊列 SchedulingQueue 里獲取一個需要待調(diào)度的 Pod 信息,其結構體 *framework.QueuedPodInfo 內(nèi)潛了 *PodInfo 結構體,而 *PodInfo 又內(nèi)嵌了 *v1.Pod 結構體, MakeNextPodFunc() 函數(shù)
  • 通過 SchedulePod 函數(shù)對一個 Pod 進行調(diào)度,其中傳遞的Pod參數(shù)為 *v1.Pod,調(diào)度成功返回 ScheduleResult, 如果失敗則返回 *FitError*
  • 如果調(diào)度失敗需要進行 FailureHandler 處理
  • 所有需要調(diào)度的Pod必須從 SchedulingQueue 里獲取
  • 需要用 client 與api server 通訊,是不是需要保存節(jié)點與Pod綁定關系?因在結構體的注釋提到過 api server
  • 調(diào)度器可能需要在一定的時機將其關閉
  • Profiles 用在什么地方?
  • 節(jié)點快照 nodeInfoSnapshot 是干什么用的?
  • 節(jié)點打分 percentageOfNodesToScore ,它是集群中可用節(jié)點的百分比閾值。一旦發(fā)現(xiàn)足夠的節(jié)點可以滿足調(diào)度一個 pod 的條件,調(diào)度器就會停止在集群中繼續(xù)尋找更多的可用節(jié)點,以提高調(diào)度器的性能。無論這個閾值的值是多少,調(diào)度器始終至少嘗試找到 "minFeasibleNodesToFind" 個可用節(jié)點。例如,如果集群大小為 500 個節(jié)點,PercentageOfNodesToScore 的值為 30,則一旦找到 150 個可行的節(jié)點,調(diào)度器就會停止繼續(xù)尋找更多的可行的節(jié)點。當值為0時,將使用默認百分比 (基于集群大小的5%--50%) 來評分節(jié)點,它將覆蓋全局 PercentageOfNodesToScore。如果沒有設置,將使用全局 PercentageOfNodesToScore。
  • 其它

接著我們帶著上面的一些個人理解到片面信息與疑問,看一下 Scheduler 的內(nèi)部實現(xiàn)原理。

Scheduler 創(chuàng)建

了解了結構體后,我們再看下 Scheduler 的函數(shù) New() 實現(xiàn)

我們先看一下函數(shù)原型

func New(client clientset.Interface,
    informerFactory informers.SharedInformerFactory,
    dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
    recorderFactory profile.RecorderFactory,
    stopCh <-chan struct{},
    opts ...Option) (*Scheduler, error) {}

參數(shù)解釋:

client 就是 api server 的客戶端,一般是通過讀取默認配置文件 $HOME/.kube/config 生成

informerFactory 一個 SharedInformerFactory,參考 https://blog.haohtml.com/archives/32179

dynInformerFactory Dynamic 客戶端的 Shared 的 Informer

recorderFactory 一個 scheduler name 的事件記錄器

stopCh 終止退出信號

opts 就是scheduler 的選項函數(shù),對scheduler 進行初始化

對于返回結果就是一個 Scheduler 對象。

下面我們一塊看一下其具體實現(xiàn)

配置選項初始化

函數(shù)選項式模式對調(diào)度器結構體初始化

    // 函數(shù)選項式模塊初始化調(diào)度器結構體
    options := defaultSchedulerOptions
    for _, opt := range opts {
        opt(&options)
    }

這里的 defaultSchedulerOptions 是一個單獨的結構體,原型為

type schedulerOptions struct {
    componentConfigVersion string
    kubeConfig             *restclient.Config
    // Overridden by profile level percentageOfNodesToScore if set in v1.
    percentageOfNodesToScore          int32
    podInitialBackoffSeconds          int64
    podMaxBackoffSeconds              int64
    podMaxInUnschedulablePodsDuration time.Duration
    // Contains out-of-tree plugins to be merged with the in-tree registry.
    frameworkOutOfTreeRegistry frameworkruntime.Registry
    profiles                   []schedulerapi.KubeSchedulerProfile
    extenders                  []schedulerapi.Extender
    frameworkCapturer          FrameworkCapturer
    parallelism                int32
    applyDefaultProfile        bool
}

這里有兩個重要的字段,一個是 profiles 字段,另一個是 extenders 字段。

profiles 字段

profiels 字估類型為 []schedulerapi.KubeSchedulerProfile 結構類型

// KubeSchedulerProfile is a scheduling profile.
type KubeSchedulerProfile struct {
    SchedulerName            string  // 調(diào)度器名稱
    PercentageOfNodesToScore *int32  // 集群中可用節(jié)點的百分比閾值
    Plugins                  *Plugins  // 所有插件,包含啟用和禁用
    PluginConfig             []PluginConfig // 每個插件的一組可選的自定義插件參數(shù),如果省略則使用默認配置
}

從結構體大致就可以知道它其實是一個調(diào)度器切片,每個調(diào)度器都有一組 Plugins。

而這個 Plugins 結構為

type Plugins struct {
    // PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.
    PreEnqueue PluginSet

    // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
    QueueSort PluginSet

    // PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework.
    PreFilter PluginSet

    // Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod.
    Filter PluginSet

    // PostFilter is a list of plugins that are invoked after filtering phase, but only when no feasible nodes were found for the pod.
    PostFilter PluginSet

    // PreScore is a list of plugins that are invoked before scoring.
    PreScore PluginSet

    // Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase.
    Score PluginSet

    // Reserve is a list of plugins invoked when reserving/unreserving resources
    // after a node is assigned to run the pod.
    Reserve PluginSet

    // Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod.
    Permit PluginSet

    // PreBind is a list of plugins that should be invoked before a pod is bound.
    PreBind PluginSet

    // Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework.
    // The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success.
    Bind PluginSet

    // PostBind is a list of plugins that should be invoked after a pod is successfully bound.
    PostBind PluginSet

    // MultiPoint is a simplified config field for enabling plugins for all valid extension points
    MultiPoint PluginSet
}

type PluginSet struct {
    // Enabled指定除了默認插件之外還應啟用的插件。這些是在默認插件之后調(diào)用的,并且按照此處指定的相同順序
    Enabled []Plugin
    // Disabled指定應禁用的默認插件。當需要禁用所有默認插件時,只包含一個“*”的數(shù)組應該be provided.
    Disabled []Plugin
}

type Plugin struct {
    Name string  // 插件名
    Weight int32  // 權重,僅在評分時使用
}

執(zhí)行時。插件包括多個擴展點,當指定時特定擴展點的插件列表是唯一啟用的插件。如果從配置中省略了一個擴展點,那么該擴展點將使用默認的插件集。

啟用的插件按此處指定的順序調(diào)用,在默認插件之后調(diào)用。如果需要在默認插件之前調(diào)用它們,則必須在此處按所需順序禁用并重新啟用默認插件。

       PreEnqueue  // 進入調(diào)度隊列前調(diào)用
                        ↓   
       QueueSort    // 在 scheduler queue 里對Pod排序調(diào)用 
                        ↓   
PreFilter -> Filter -> PostFilter  // 其中 Filter 是在篩選出無法運行Pod的節(jié)點時調(diào)用
                        ↓                                            // PostFilter 是在過濾階段后但找不到pod的可行節(jié)點時調(diào)用
        PreScore -> Score 
                        ↓   
                 Reserve
                        ↓                                       
                    Permit
                        ↓                        
PreBind -> Bind -> PostBind
                        ↓   
        MultiPoint

對于這些插件列表它們每個都有對應的 extension points ,也只有在這些對應的擴展點時它們才有機會被調(diào)用。

PreEnqueue 進入調(diào)度隊列前調(diào)用

QueueSort 在 scheduler queue 里對Pod排序調(diào)用

PreFilter 在 scheduling framework 的 PreFilter 擴展點調(diào)用

Filter 篩選出無法運行 Pod 的 node 時調(diào)用

PostFilter 過濾階段后但找不到 pod 的可行 node 時調(diào)用, 與 Filter 的擴展點不一樣

PreScore 評分前調(diào)用

Score已通過篩選階段的節(jié)點進行排名時調(diào)用

Reserve 在分配 node 運行pod后, reserving/unreserving 服務資源時調(diào)用

Permit 用于控制Pod的綁定,這些插件可以 阻止 或 延遲 Pod的綁定

PreBind 綁定pod之前調(diào)用

Bindscheduling framework.Bind擴展點調(diào)用

PostBind 在pod成功綁定后調(diào)用

MultiPoint 在所有有效的擴展點被調(diào)用,也就是說這個插件是對全局擴展點有效的。

下面我們將會用到PreQueue 這個插件列表

總結

profiles 字段是一個 scheduler 切片,而每個 scheduler 對應一些插件,這些插件又按執(zhí)行擴展點的先后順序進行了分組 Plugins Set, 組里的插件也只有在他們自己的 extension points 才會被調(diào)用,對于 MultiPoint 會在所有有效的 extension points 被調(diào)用。

現(xiàn)在我們再回到初始化調(diào)度器的主流程。

    if options.applyDefaultProfile {
        // 調(diào)度器配置,很重要,特別是  Profiles  和 Extenders 字段
        
        // 對象轉(zhuǎn)換,將 versiondCfg 轉(zhuǎn)為 cfg
        var versionedCfg configv1.KubeSchedulerConfiguration
        scheme.Scheme.Default(&versionedCfg)
        cfg := schedulerapi.KubeSchedulerConfiguration{}
        if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
            return nil, err
        }
        
        // 通過指定 "scheduler name" profile,可以用來控制Pod的調(diào)度行為,如何為空則默認使用 "default-scheduler" profile
        options.profiles = cfg.Profiles
    }

根據(jù)是否啟默認配置文件來初始化 defaultSchedulerOptions.Options 字段,通過 scheduler_name 來控制Pod調(diào)度行為(其實是通過scheduler_name 的 Plugins 實現(xiàn)對Pod的調(diào)度控制 )。

這里 KubeSchedulerConfiguration 結構體

// KubeSchedulerConfiguration configures a scheduler
type KubeSchedulerConfiguration struct {
    ...

    // Profiles are scheduling profiles that kube-scheduler supports. Pods can
    // choose to be scheduled under a particular profile by setting its associated
    // scheduler name. Pods that don't specify any scheduler name are scheduled
    // with the "default-scheduler" profile, if present here.
    // +listType=map
    // +listMapKey=schedulerName
    Profiles []KubeSchedulerProfile `json:"profiles,omitempty"`

    // Extenders are the list of scheduler extenders, each holding the values of how to communicate
    // with the extender. These extenders are shared by all scheduler profiles.
    // +listType=set
    Extenders []Extender `json:"extenders,omitempty"`
}

插件構建器注冊表

注冊所有內(nèi)置插件,registry 的數(shù)據(jù)結構為 map[string]PluginFactory

    // 插件構建函數(shù)注冊表,
    // 注冊表是所有可用插件的集合,framework 使用注冊表來啟用和初始化配置的插件。在初始化框架之前,所有插件都必須在注冊表中。
    registry := frameworkplugins.NewInTreeRegistry()
    if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
        return nil, err
    }

調(diào)用 frameworkplugins.NewInTreeRegistry() 實現(xiàn) in-tree插件的注冊,也可通過WithFrameworkOutOfTreeRegistry選項注冊其他插件

func NewInTreeRegistry() runtime.Registry {
    fts := plfeature.Features{
        EnableDynamicResourceAllocation:              feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
        EnableReadWriteOncePod:                       feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
        EnableVolumeCapacityPriority:                 feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
        EnableMinDomainsInPodTopologySpread:          feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
        EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
        EnableMatchLabelKeysInPodTopologySpread:      feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
        EnablePodSchedulingReadiness:                 feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness),
        EnablePodDisruptionConditions:                feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions),
        EnableInPlacePodVerticalScaling:              feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
    }

  // 所有內(nèi)置插件
    registry := runtime.Registry{
        dynamicresources.Name:                runtime.FactoryAdapter(fts, dynamicresources.New),
        selectorspread.Name:                  selectorspread.New,
        imagelocality.Name:                   imagelocality.New,
        tainttoleration.Name:                 tainttoleration.New,
        nodename.Name:                        nodename.New,
        nodeports.Name:                       nodeports.New,
        nodeaffinity.Name:                    nodeaffinity.New,
        podtopologyspread.Name:               runtime.FactoryAdapter(fts, podtopologyspread.New),
        nodeunschedulable.Name:               nodeunschedulable.New,
        noderesources.Name:                   runtime.FactoryAdapter(fts, noderesources.NewFit),
        noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
        volumebinding.Name:                   runtime.FactoryAdapter(fts, volumebinding.New),
        volumerestrictions.Name:              runtime.FactoryAdapter(fts, volumerestrictions.New),
        volumezone.Name:                      volumezone.New,
        nodevolumelimits.CSIName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
        nodevolumelimits.EBSName:             runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
        nodevolumelimits.GCEPDName:           runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
        nodevolumelimits.AzureDiskName:       runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
        nodevolumelimits.CinderName:          runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
        interpodaffinity.Name:                interpodaffinity.New,
        queuesort.Name:                       queuesort.New,
        defaultbinder.Name:                   defaultbinder.New,
        defaultpreemption.Name:               runtime.FactoryAdapter(fts, defaultpreemption.New),
        schedulinggates.Name:                 runtime.FactoryAdapter(fts, schedulinggates.New),
    }

    return registry
}

通過插件器可以實現(xiàn)的功能有實現(xiàn) 注冊新插件,取消注冊插件合并插件

type Registry map[string]PluginFactory
func (r Registry) Register(name string, factory PluginFactory) error {}
func (r Registry) Unregister(name string) error {}
func (r Registry) Merge(in Registry) error {}

extenders 擴展器構建

擴展器是外部進程影響Kubernetes做出的調(diào)度決策的接口。這通常是不由Kubernetes直接管理的資源所需要的。

// pkg/scheduler/apis/config/types.go#L261-L301
type Extender struct {
    URLPrefix        string
    FilterVerb       string
    PreemptVerb      string
    PrioritizeVerb   string
    Weight           int64
    BindVerb         string
    EnableHTTPS      bool
    TLSConfig        *ExtenderTLSConfig
    HTTPTimeout      v1.Duration
    NodeCacheCapable bool
    ManagedResources []ExtenderManagedResource
    Ignorable        bool
}

保存用于與擴展器通信的參數(shù),如果謂詞未指定/為空,則假定擴展程序選擇不提供該擴展。

    extenders, err := buildExtenders(options.extenders, options.profiles)

對于擴展器的構建需要 options.extenders 參數(shù), 其是擴展器參數(shù)配置 它返回的是一個 []framework.Extender 數(shù)據(jù)類型。我們看一下其具體實現(xiàn)

func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
    var fExtenders []framework.Extender
    if len(extenders) == 0 {
        return nil, nil
    }

    var ignoredExtendedResources []string
    var ignorableExtenders []framework.Extender
    for i := range extenders {
        // 根據(jù)每個擴展器參數(shù)配置創(chuàng)建一個 httpClient 對象的wrapper
        extender, err := NewHTTPExtender(&extenders[i])
        if err != nil {
            return nil, err
        }
        // 在調(diào)度時,當擴展程序返回錯誤或無法訪問時,擴展器是否被忽略
        if !extender.IsIgnorable() {
            fExtenders = append(fExtenders, extender)
        } else {
            ignorableExtenders = append(ignorableExtenders, extender)
        }
        for _, r := range extenders[i].ManagedResources {
            if r.IgnoredByScheduler {
                ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
            }
        }
    }
    // 將允許忽略的擴展器放在最后面
    fExtenders = append(fExtenders, ignorableExtenders...)

    // If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
    // This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
    // plugin args (and in which case the extender ignored resources take precedence).
    if len(ignoredExtendedResources) == 0 {
        return fExtenders, nil
    }

    // 更新每個scheduler name 中插件 noderesources.Name 的參數(shù)(prof.PluginConfig[k].Args.IgnoredResources)
    for i := range profiles {
        prof := &profiles[i]
        var found = false
        for k := range prof.PluginConfig {
            if prof.PluginConfig[k].Name == noderesources.Name {
                // Update the existing args
                pc := &prof.PluginConfig[k]
                args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
                if !ok {
                    return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
                }
                // 允許出錯被忽略的擴展器
                args.IgnoredResources = ignoredExtendedResources
                found = true
                break
            }
        }
        if !found {
            return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
        }
    }
    return fExtenders, nil
}

這里 profiles[i].PluginConfig[i].args 字段類型對應的是 NodeResourcesFitArgs,它是一個 []string 類型

type NodeResourcesFitArgs struct {
    v1.TypeMeta
    IgnoredResources      []string
    IgnoredResourceGroups []string
    ScoringStrategy       *ScoringStrategy
}

創(chuàng)建 podLister 和 nodeLister

    podLister := informerFactory.Core().V1().Pods().Lister()
    nodeLister := informerFactory.Core().V1().Nodes().Lister()

通過 informetFactory 工作模式創(chuàng)建 podLister 和 nodeLister, 可以用來獲取所有對象資源列表。如 posLister 可以用來獲取所有 Pod 列表, nodeLister 則可以獲取所有 node 列表。

創(chuàng)建scheduler.Profiles

現(xiàn)在到了最核心的部分,上面創(chuàng)建一些對象都會在這里被作為參數(shù)傳入。

    // 節(jié)點快照
    snapshot := internalcache.NewEmptySnapshot()
    
    // clusterEvent 集合
    clusterEventMap := make(map[framework.ClusterEvent]sets.String)
    metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)

    // 根據(jù)給出的profiles配置文件 生成對應的 frameworks,(Map類型,格式為 SchedulerName:Framework)
    // 而每一個 frameworks 都會對自己的 plugins 集進行管理,然后在調(diào)度上下文中的各種埋點執(zhí)行插件
    // registry 其實是 frameworkImpl.registry 字段
    profiles, err := profile.NewMap(
        // 調(diào)度器配置
        options.profiles, 
        
        // 插件構建器注冊表
        registry, 
        
        recorderFactory, stopCh,
        
        // 下面的全是 Options functions 模式,可任意數(shù)量
        // componentConfig 版本號
        frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),

        // api server 的客戶端,一般通過讀取 $HOME/.kube/config 文件創(chuàng)建
        frameworkruntime.WithClientSet(client),
        
        // kubeConfig,一般是 $HOME/kube/config 文件
        frameworkruntime.WithKubeConfig(options.kubeConfig), 
        
        // SharedInformerFactory
        frameworkruntime.WithInformerFactory(informerFactory),
        
        // 設置 SharedLister,用途?
        frameworkruntime.WithSnapshotSharedLister(snapshot),
        
// 回調(diào)函數(shù)             frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),

    // 設置一個 clusterEvent 的 Set 類型
        frameworkruntime.WithClusterEventMap(clusterEventMap),
        
        // 設置并行調(diào)度數(shù)量,默認為16
        frameworkruntime.WithParallelism(int(options.parallelism)),
        
        // 設置外部擴展器
        frameworkruntime.WithExtenders(extenders),
        
        // 指標記錄器
        frameworkruntime.WithMetricsRecorder(metricsRecorder),
    )

先是通過 Options functions 的方式指定了多個參數(shù),其函數(shù) profile.NewMap()實現(xiàn)

// NewMap builds the frameworks given by the configuration, indexed by name.
func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
    stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
    m := make(Map)
    v := cfgValidator{m: m}

    for _, cfg := range cfgs {
     // 根據(jù)給的配置構建 Framework
        p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
        if err != nil {
            return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
        }
        if err := v.validate(cfg, p); err != nil {
            return nil, err
        }
        
        // scheduler name 為key, 以 Framework 為value
        m[cfg.SchedulerName] = p
    }
    return m, nil
}

根據(jù)每個 profile 調(diào)用 newProfile() 生成對應的 frameworkImpl,其實現(xiàn)了 Framework接口,這里真正的實現(xiàn)位于函數(shù) NewFramework(), 這里不做介紹。

type Framework interface {
        // 插件需要數(shù)據(jù)和一些工具,在插件初始化的時候通過 pluginFactory 傳入,見 NewInTreeRegistry() 函數(shù)
    Handle
    
    // 返回注冊過的 preEnqueue 插件列表
    PreEnqueuePlugins() []PreEnqueuePlugin
    
    // 獲取 pod 在隊列中的排序函數(shù)
    QueueSortFunc() LessFunc
    
    // PreFilter 插件
    RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
    
    // PostFilter
    RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)

        // PreBind
    RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    // PostBind
    RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    
    // Reserve
    RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    // Unreserve
    RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
    
    // Permit
    RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    // 如果pod處于waiting狀態(tài),則將處于block狀態(tài),直到pod被 rejected 或 allowed
    WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
    
    // Bind
    RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
    
    // 是否定義了 Filter 插件
    HasFilterPlugins() bool
    // 是否定義了 PostFilter 插件
    HasPostFilterPlugins() bool
    // 是否定義了 Score
    HasScorePlugins() bool
    
    // 返回定義的plugins 對象
    ListPlugins() *config.Plugins
    ProfileName() string
    PercentageOfNodesToScore() *int32
    
    // 設置 SetPodNominator
    SetPodNominator(nominator PodNominator)
}

Framework 通過 scheduling framework 管理著一些插件集??梢栽谡{(diào)度上下文中的指定 points 調(diào)用配置的插件,這個在上面介紹插件的時候提到過。

調(diào)度隊列創(chuàng)建

首先遍歷 profiles 獲取其對應的已注冊好的 PreQueuePlugin 插件,這些插件是在將Pods添加到activeQ之前調(diào)用。

    preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
    for profileName, profile := range profiles {
        preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
    }

然后創(chuàng)建一個優(yōu)先隊列 sched.SchedulingQueue。

    // 初始化一個優(yōu)先隊列作為調(diào)度隊列 sched.SchedulingQueue
    podQueue := internalqueue.NewSchedulingQueue(
      // 獲取profile 設置的調(diào)度隊列Pod里的Pod排序函數(shù),這里指定獲取第 options.profiles[0]個profile
        profiles[options.profiles[0].SchedulerName].QueueSortFunc(), 
        
        // sharedInformerFactory
        informerFactory, 
    
    // 設置 pod 的 Initial階段的 Backoff 的持續(xù)時間
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
        
        // 最大backoff持續(xù)時間         internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
        
        // 設置podLister
        internalqueue.WithPodLister(podLister),
        
        // 設置 clusterEvent
        internalqueue.WithClusterEventMap(clusterEventMap),
        
        // 一個pod在 unschedulablePods 停留的最長時間
        internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
        
        // preEnqueuePluginMap 插件注冊
        internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap), 
        
        // 指標相關
        internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
        internalqueue.WithMetricsRecorder(*metricsRecorder),
    )

然后從原來的 profiles 中讀取一些配置,如 preEnqueuePlugin 、 Pod 在隊列里的排序函數(shù) QueueSortFunc() 作為調(diào)用函數(shù) internalqueue.NewSchedulingQueue() 的入?yún)?。函?shù)實現(xiàn)

// pkg/scheduler/internal/queue/scheduling_queue.go#L291-L330
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
    lessFn framework.LessFunc,
    informerFactory informers.SharedInformerFactory,
    opts ...Option,
) *PriorityQueue {
    options := defaultPriorityQueueOptions
    if options.podLister == nil {
        options.podLister = informerFactory.Core().V1().Pods().Lister()
    }
    for _, opt := range opts {
        opt(&options)
    }

  // pod 排序
    comp := func(podInfo1, podInfo2 interface{}) bool {
        pInfo1 := podInfo1.(*framework.QueuedPodInfo)
        pInfo2 := podInfo2.(*framework.QueuedPodInfo)
        return lessFn(pInfo1, pInfo2)
    }

  // 初始化優(yōu)先隊列
    pq := &PriorityQueue{
        // 存儲維護pod與其運行在哪個節(jié)點關系
        nominator:                         newPodNominator(options.podLister),
        clock:                             options.clock,
        stop:                              make(chan struct{}),
        podInitialBackoffDuration:         options.podInitialBackoffDuration,
        podMaxBackoffDuration:             options.podMaxBackoffDuration,
        
        // 一個pod在 unschedulablePods 停留的最長時間
        podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
        activeQ:                           heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
        
        // 包含已嘗試并確定為不可調(diào)度的pod, 其實是一個map類型,對應 podInfoMap map[string]*framework.QueuedPodInfo
        unschedulablePods:                 newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
        moveRequestCycle:                  -1,
        
        // clusterEvent
        clusterEventMap:                   options.clusterEventMap,

    // preEnqueuePlugin
        preEnqueuePluginMap:               options.preEnqueuePluginMap,
        metricsRecorder:                   options.metricsRecorder,
        pluginMetricsSamplePercent:        options.pluginMetricsSamplePercent,
    }
    pq.cond.L = &pq.lock
    pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
    pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()

    return pq
}

幾個重要的點已在代碼里進行了注釋,下面我們重點看理解隊列這個概念。

我們以這里的 PriorityQueue為例,其實現(xiàn)了 SchedulingQueue 接口。PriorityQueue 隊列 head 的 Pod 具有最高優(yōu)先級,它有兩個 sub queue 和一個可選的data structure

activeQ :子隊列,持有即將等待被調(diào)度的 Pod,將會根據(jù)其優(yōu)先級和其他配置信息來排隊等待調(diào)度程序?qū)ζ溥M行分配。

backoffQ :子隊列,雙稱為退避隊列。 持有從 unschedulablePods 中移走的Pod, 并將在其 backoff periods 退避期結束時移動到activeQ 隊列。Pod 在退避隊列中等待并定期嘗試進行重新調(diào)度。重新調(diào)度的頻率會按照一個指數(shù)級的算法定時增加,從而充分探索可用的資源,直到找到可以分配給該 Pod 的節(jié)點。

unschedulablePods :不可調(diào)度Pod的列表,也可以將其理解為不可調(diào)度隊列 unschedulable queue 。持有已嘗試進行調(diào)度且當前確定為不可調(diào)度的Pod。

三個隊列都是 Kubernetes 調(diào)度器的三個基本導向隊列,調(diào)度器將使用其內(nèi)置策略和其他算法來對它們進行維護和管理。

隊列機制將通過調(diào)用 Run() 創(chuàng)建兩個gorouteine進行從 podBackoffQactiveQ 的 Pod流轉(zhuǎn),這一點下面會介紹。

我們看一下隊列常用的一些方法

// 從 activeQ 隊列中以阻塞的方式獲取一個Pod,會將增加 scheduling cycle 的值
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error){}

// 更新一個隊列,分兩種情況:
// 不存在:如果 pod 是全新的,則直接添加到 activeQ 隊列
// 已存在:如果 pod 在 activeQ 或 backoffQ 隊列中已存在,則直接更新;否則,如果pod以可調(diào)度的方式更新,它將從不可調(diào)度隊列 unschedulablePods 中刪除,并將更新后的pod添加到 activeQ 中(如修正了原來pod的錯誤后恢復正常)。
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {}

// 從三個隊列中刪除 pod
func (p *PriorityQueue) Delete(pod *v1.Pod) error {}

// 將pod添加到 activeQ 隊列,如果添加的pod已經(jīng)在activeQ、backoffQ或unschedulablePods 存在,則不會有任何效果
func (p *PriorityQueue) Add(pod *v1.Pod) error {)

// 將給定的一組 Pod 從 unschedulablePods 或 backoffQ 隊列中移動到 activeQ 隊列中,前提是這些 Pod 正在 unschedulablePods 或 backoffQ 隊列中
func (p *PriorityQueue) Activate(pods map[string]*v1.Pod) {}

SetPodNominator

    // 根據(jù) profiles 設置PodNominator
    for _, fwk := range profiles {
        fwk.SetPodNominator(podQueue)
    }

指定每個 profile 對應的 Framework ,對于Pod是從 podQueue 隊列中獲取。

調(diào)度緩存

    // 設置緩存,此時緩存服務自動處于運行狀態(tài)
    schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)

    // Setup cache debugger.
    debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
    debugger.ListenForSignal(stopEverything)

調(diào)用函數(shù) internalcache.New(durationToExpireAssumedPod, stopEverything) 生成一個 goroutine 來過期的 Pod, 第一個參數(shù)為 ttl 表示Pod的過期時間,如果為0則表示記不過期,這里durationToExpireAssumedPod 的值為0;第二次參數(shù)是pod停止信號。

疑問:

為什么要添加一層 Cache 設計,在什么時候才發(fā)揮作用呢?

創(chuàng)建 scheduler 對象

sched := &Scheduler{
        Cache:                    schedulerCache, // 緩存
        client:                   client, // api server 客戶端
        nodeInfoSnapshot:         snapshot, // node 快照信息
        percentageOfNodesToScore: options.percentageOfNodesToScore,
        Extenders:                extenders, // 擴展器
        
        // 獲取Pod函數(shù),對應的是優(yōu)先隊列里的 Pop() 方法
        NextPod:                  internalqueue.MakeNextPodFunc(podQueue),
        StopEverything:           stopEverything,
        SchedulingQueue:          podQueue, // 隊列
        Profiles:                 profiles, // 配置文件
    }
    sched.applyDefaultHandlers()

    addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))

調(diào)用 sched.applyDefaultHandlers() 設置調(diào)度Pod函數(shù)和 調(diào)度失敗處理函數(shù)。

func (s *Scheduler) applyDefaultHandlers() {
    s.SchedulePod = s.schedulePod
    s.FailureHandler = s.handleSchedulingFailure
}

這里 s.schedulePod() 作為調(diào)度實現(xiàn)函數(shù),是我們關注的重點中的重點。

調(diào)度器執(zhí)行

上面做了好么多工作,只是為了調(diào)度器能夠執(zhí)行。對于調(diào)度器的執(zhí)行入口為

// cmd/kube-scheduler/app/server.go#L243
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
    ...
    // 調(diào)度器執(zhí)行
    sched.Run(ctx)
}

這里調(diào)用了sched.Run() 函數(shù)真正將服務運行起來,我們看一下它做了哪些事。

// pkg/scheduler/scheduler.go#L337-L391
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
  // 1. 優(yōu)先隊列運行
    sched.SchedulingQueue.Run()

    // We need to start scheduleOne loop in a dedicated goroutine,
    // because scheduleOne function hangs on getting the next item
    // from the SchedulingQueue.
    // If there are no new pods to schedule, it will be hanging there
    // and if done in this goroutine it will be blocking closing
    // SchedulingQueue, in effect causing a deadlock on shutdown.
    // 2. 在單獨的goroutine里以 loop 方式調(diào)度Pod
    go wait.UntilWithContext(ctx, sched.scheduleOne, 0)

  // 調(diào)度服務結束
    <-ctx.Done()
    sched.SchedulingQueue.Close()
}

這里做了兩件事:

  1. 將優(yōu)先隊列服務運行起來
  2. 以 loop 的方式調(diào)用 sched.scheduleOne() 調(diào)度Pod,一次調(diào)度一個Pod。

對于Pod的調(diào)度,這里指定了 UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 函數(shù)的 period 參數(shù)為 0,表示是在一個 loop 中執(zhí)行 sched.scheduleOne()函數(shù)。

調(diào)度時會從 SchedulingQueue 優(yōu)先隊列里獲取一個Pod;如果沒有新的Pod要調(diào)度,它將一直阻塞。

下面我們看一下這兩塊的實現(xiàn)邏輯。

優(yōu)先隊列

首先通過調(diào)用 sched.SchedulingQueue.Run()啟用優(yōu)先隊列服務。

func (p *PriorityQueue) Run() {
    go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
    go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}

這里就干兩件事:

  1. 10秒執(zhí)行一個 p.flushBackoffQCompleted() 函數(shù),將所有已完成的Pod從 backoffQ 隊列移動到 activeQ 隊列
  2. 30秒 執(zhí)行一次 flushUnschedulablePodsLeftover() 函數(shù),將所有停留在 unschedulablePods 中時間超出 podMaxInUnschedulablePodsDuration 的Pod移動到 backoffQactiveQ 隊列

這兩件事全部在單獨的 goroutine 里以 loop 的方式定時執(zhí)行,下面我們看一下它們兩者的實現(xiàn)。

flushBackoffQCompleted()

將所有已完成的Pod從 backoffQ 移動到 activeQ 隊列

func (p *PriorityQueue) flushBackoffQCompleted() {
    p.lock.Lock()
    defer p.lock.Unlock()
    activated := false
    for {
        // 獲取隊列頭部元素,但不從隊列里刪除
        rawPodInfo := p.podBackoffQ.Peek()
        if rawPodInfo == nil {
            break
        }
        pInfo := rawPodInfo.(*framework.QueuedPodInfo)
        pod := pInfo.Pod
        if p.isPodBackingoff(pInfo) {
            break
        }
        
        // 刪除獲取隊列頭部元素
        _, err := p.podBackoffQ.Pop()
        if err != nil {
            klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
            break
        }
        
        // 移入 activeQ 隊列
        if added, _ := p.addToActiveQ(pInfo); added {
            activated = true
        }
    }

    if activated {
        p.cond.Broadcast()
    }
}

加入 activeQ 函數(shù)實現(xiàn)

func (p *PriorityQueue) addToActiveQ(pInfo *framework.QueuedPodInfo) (bool, error) {
  // 執(zhí)行插件 runPreEnqueuePlugins
    pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
    if pInfo.Gated {
        // Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
        p.unschedulablePods.addOrUpdate(pInfo)
        return false, nil
    }
    
    // 加入隊列,如果已在隊列則直接更新(什么情況下會在activeQ隊列存在?)
    if err := p.activeQ.Add(pInfo); err != nil {
        klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
        return false, err
    }
    return true, nil
}

flushUnschedulablePodsLeftover()

將所有停留在 unschedulablePods 中時間超出 podMaxInUnschedulablePodsDuration 的Pod移動到 backoffQactiveQ 隊列

func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
    var podsToMove []*framework.QueuedPodInfo
    currentTime := p.clock.Now()
    for _, pInfo := range p.unschedulablePods.podInfoMap {
        lastScheduleTime := pInfo.Timestamp
        if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
            podsToMove = append(podsToMove, pInfo)
        }
    }

  // 移入activeQ 或 BackoffQueue
    if len(podsToMove) > 0 {
        p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
    }
}

// 將 Pod 移動到 activeQ 或 backoffQ
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
    activated := false
    for _, pInfo := range podInfoList {
        if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
            continue
        }
        pod := pInfo.Pod
        if p.isPodBackingoff(pInfo) {
            if err := p.podBackoffQ.Add(pInfo); err != nil {
                klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
            } else {
                klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
                metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
                
                // 從 unschedulablePods 里刪除Pod
                p.unschedulablePods.delete(pod, pInfo.Gated)
            }
        } else {
            gated := pInfo.Gated
            
            // 添加到 activeQ 隊列,并從 unschedulablePods 中將其刪除
            if added, _ := p.addToActiveQ(pInfo); added {
                klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
                activated = true
                metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
                p.unschedulablePods.delete(pod, gated)
            }
        }
    }
    p.moveRequestCycle = p.schedulingCycle
    if activated {
        p.cond.Broadcast()
    }
}

下面我們先看一下調(diào)度Pod的邏輯,最后再看隊列相關的兩個函數(shù)

調(diào)度Pod

scheduleOne() 函數(shù)實現(xiàn)了一個 Pod 的整個調(diào)度過程,整個步驟分為:

  1. 從優(yōu)先隊列里獲取一個待調(diào)度的 Pod
  2. 根據(jù)Pod的信息,獲取指定的 Framework
  3. 篩選出一個可能被調(diào)度的 Node,如果有 >1個節(jié)點的話,還需要對每個節(jié)點進行評分,找出評分最高的Node
  4. 異步 的方式綁定 Pod 與 Node

獲取待調(diào)度的Pod

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    // 1. 獲取一個 Pod, 如果為 nil 則直接返回取消
    podInfo := sched.NextPod()
}

這里的 sched.NextPod() 在上面"創(chuàng)建 scheduler 對象"的時候已經(jīng)賦值過,它對應

// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
    return func() *framework.QueuedPodInfo {
        podInfo, err := queue.Pop()
        if err == nil {
            klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
            for plugin := range podInfo.UnschedulablePlugins {
                metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
            }
            return podInfo
        }
        klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
        return nil
    }
}

這里調(diào)用 優(yōu)先隊列的 PriorityQueue.Pop() 方法獲取一個 Pod 信息。

// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
    p.lock.Lock()
    defer p.lock.Unlock()
    
    // 阻塞
    for p.activeQ.Len() == 0 {
        // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
        // When Close() is called, the p.closed is set and the condition is broadcast,
        // which causes this loop to continue and return from the Pop().
        if p.closed {
            return nil, fmt.Errorf(queueClosed)
        }
        p.cond.Wait()
    }
    
    // 彈出Pod
    obj, err := p.activeQ.Pop()
    if err != nil {
        return nil, err
    }
    
    // 遞增
    pInfo := obj.(*framework.QueuedPodInfo)
    pInfo.Attempts++
    p.schedulingCycle++
    return pInfo, nil
}

從注釋里我們可知,它是以block的方式從 actieQ 隊列獲取一個 Pod,如果隊列為空,則將一直等待,至到一個新的Pod被添加到隊列。同時當Pod被 Pop() 走后,scheduling cycle 的值將遞增。

獲取調(diào)度 Framework

根據(jù)當前Pod調(diào)度器 pod.schedulerName 獲取其對應的 Framework

func (sched *Scheduler) scheduleOne(ctx context.Context) {
  // 2. 根據(jù)當前Pod調(diào)度器 pod.schedulerName 獲取其對應的 Framework
    fwk, err := sched.frameworkForPod(pod)
    
    // 檢查是否需要跳過調(diào)度
    if sched.skipPodSchedule(fwk, pod) {
        return
    }
}

調(diào)用 sched.skipPodSchedule(fwk, pod) 函數(shù)來檢查是否需要跳過調(diào)度操作。如果這個Pod正處于刪除狀態(tài)(pod.DeletionTimestamp != nil)或者明確指定了不需要調(diào)度,則直接結束本次調(diào)度;

節(jié)點選擇

節(jié)點選擇有以下步驟:

  1. 找一個節(jié)點
  2. 修改 pod.Spec.NodeName = NodeName 屬性,建立Pod與Node映射關系
  3. 執(zhí)行一系列插件
  4. 根據(jù)需要將一些Pod移到 activeQ 隊列

下面我們先看一下找節(jié)點這一步

找節(jié)點

為了能將執(zhí)行Pod,必須找一個合適的Node,這一步也是 k8s 中必須關注的一塊內(nèi)容

// 3. 獲取一個最合適的節(jié)點
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)

可以看到,對于節(jié)點的選擇是在 sched.schedulingCycle()函數(shù)里又調(diào)用了一個 sched.SchedulePod 函數(shù)來實現(xiàn)的。

// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
    ctx context.Context,
    state *framework.CycleState,
    fwk framework.Framework,
    podInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
    pod := podInfo.Pod
    
    // 一、 獲取一個合適的節(jié)點,這塊下面有詳細介紹
    scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
    if err != nil {
        ...
        return err
    }

這里根據(jù) sched.SchedulePod() 源碼實現(xiàn),共有以下步驟:

  1. 更新節(jié)點快照 sched.nodeInfoSnapshot
  2. 檢查節(jié)點快照 sched.nodeInfoSnapshot 里節(jié)點數(shù)量,如果為 0 ,則直接結束
  3. 基于 Framework 過濾插件和過濾擴展查找適合Pod的節(jié)點(plugins 和 extenders 發(fā)揮使用)
  4. 如果只有一個節(jié)點的話,直接返回使用這個節(jié)點并結束
  5. 如果存在多個節(jié)點的話,則調(diào)用prioritizeNodes函數(shù),通過運行 fwk.RunScorePlugins() 函數(shù)對每個節(jié)點進行打分,并對每個節(jié)點記錄每個插件的打分和統(tǒng)計節(jié)點總得分
  6. 調(diào)用 selectHost() 遍歷一次所有節(jié)點,找出來得分最高的節(jié)點

下面我們看一下源碼實現(xiàn),并在重要的地方做了一些注釋。

如果獲取節(jié)點成功,則返回 ScheduleResult,否則返回一個 FitError數(shù)據(jù)結構。

更新節(jié)點快照
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {

    // 1. 更新節(jié)點快照,為什么更新?為什么用?每調(diào)度一個Pod就會更新Node快照,性能是否存在問題
    if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
        return result, err
    }
}
檢查快照節(jié)點數(shù)量
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
  // 2. 檢查節(jié)點快照有無節(jié)點
    if sched.nodeInfoSnapshot.NumNodes() == 0 {
        return result, ErrNoNodesAvailable
    }
}
找出所有可使用的Node清單

然后為 Pod 尋找所有適合此Pod 運行的 node 清單

// 3. 查找合適的節(jié)點
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
只有一個節(jié)點直接使用

如果當前只有一個節(jié)點的話,則此時不用考慮其它情況,直接使用。

    // 4. 正好只有一個節(jié)點,直接使用
    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }
多節(jié)點則進行打分

如果有多個節(jié)點的話,則在這些節(jié)點上調(diào)用函數(shù) RunScorePlugins() 在執(zhí)行插件評分。

// 5. 從多個節(jié)點中選擇最合適的節(jié)點
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)

每個插件都會對當前節(jié)點進行計算,得出當前節(jié)點評分,同時還要將所有插件的評分累計,作為節(jié)點的總評分。

找出評分最高的節(jié)點
// 6. 從叢多節(jié)點中遍歷出來評分最高的那個,每個節(jié)點都會調(diào)用所有plugins對其進行評分
host, err := selectHost(priorityList)

然后遍歷所有節(jié)點,篩選出來得分最高的節(jié)點,這里的 host 變量即是節(jié)點的名稱。

最后將獲取節(jié)點返回

    return ScheduleResult{
        SuggestedHost:  host,  // 選擇的節(jié)點名稱(這里是指主機名還是系統(tǒng)內(nèi)部分配的一個標識?)
        EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
        FeasibleNodes:  len(feasibleNodes),
    }, err

這里 ScheduleResult 結構體為

// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
    // 節(jié)點名稱
    SuggestedHost string
    
    //調(diào)度程序在篩選階段及以后評估pod的節(jié)點數(shù)
    EvaluatedNodes int
    
    // 評估后,適合pod的節(jié)點數(shù)量
    FeasibleNodes int
    
    // 調(diào)度周期相關
    nominatingInfo *framework.NominatingInfo
}

至此,節(jié)點找好了,不過還有一些工作要做。

修改Pod的屬性

通過修改 Pod.Spec.NodeName=NodeName 屬性,建立Pod與Node之間的映射關系。告訴 cache 這個pod已與上面選擇以node建立了綁定關系(其實還沒有真正綁定),只是在cache里對其建立了綁定關系

func (sched *Scheduler) schedulingCycle(...) {
    assumedPodInfo := podInfo.DeepCopy()
    assumedPod := assumedPodInfo.Pod
    
    // 二、修改 Pod.Spec.NodeName = NodeName,建立 Pod 與 Node 的映射關系
    err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
    ...
}

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // pod 所在Node
    assumed.Spec.NodeName = host

  // 緩存更新
    if err := sched.Cache.AssumePod(assumed); err != nil {
        klog.ErrorS(err, "Scheduler cache AssumePod failed")
        return err
    }
    // 從內(nèi)部緩存中刪除
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }

    return nil
}

執(zhí)行插件

執(zhí)行 reserve 插件的 Reserve() 方法 和 執(zhí)行 Permit 插件。

func (sched *Scheduler) schedulingCycle(...) {
  // 三、執(zhí)行一些插件
    // Run the Reserve method of reserve plugins.
    if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {}


    // Run "permit" plugins.
    runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
    if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
    }

}

將 Pod 轉(zhuǎn)到 activeQ 隊列

如果 podsToActive.Map 存在Pod,則將它們從 backoffQ 或unschedulablePods 中移到 activeQ

func (sched *Scheduler) schedulingCycle(...) {
  // 四、podsToActivate.Map值被插件修改,什么情況下會插件會修改?
  
    // At the end of a successful scheduling cycle, pop and move up Pods if needed.
    if len(podsToActivate.Map) != 0 {
      // 將pod 從 backoffQ 或unschedulablePods 中移到 activeQ
        sched.SchedulingQueue.Activate(podsToActivate.Map)
        // Clear the entries after activation.
        podsToActivate.Map = make(map[string]*v1.Pod)
    }

    return scheduleResult, assumedPodInfo, nil
}

疑問:

podsToActivate.Map 什么時候可能被修改?

綁定Pod與Node

  // 異步綁定 Pod 與 node
    go func() {
        bindingCycleCtx, cancel := context.WithCancel(ctx)
        defer cancel()
        
        // 綁定
        status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
        if !status.IsSuccess() {
            sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
        }
    }()

可以看到對于pod與節(jié)點 node的綁定是異步的,綁定函數(shù)bindingCycle()實現(xiàn)

func (sched *Scheduler) bindingCycle(   ctx context.Context,
    state *framework.CycleState,
    fwk framework.Framework,
    scheduleResult ScheduleResult,
    assumedPodInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate) *framework.Status {

    assumedPod := assumedPodInfo.Pod

    // 執(zhí)行 "permit" 插件.
    if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
        return status
    }

    // Run "prebind" plugins.
    if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
        return status
    }

    // Run "bind" plugins.
    if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
        return status
    }

    // Run "postbind" plugins.
    fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)

    // At the end of a successful binding cycle, move up Pods if needed.
    if len(podsToActivate.Map) != 0 {
        sched.SchedulingQueue.Activate(podsToActivate.Map)
        // Unlike the logic in schedulingCycle(), we don't bother deleting the entries
        // as `podsToActivate.Map` is no longer consumed.
    }

    return nil
}

可能看到在異步綁定 Pod與Node 時,將會依次執(zhí)行下面的插件

  1. permit
  2. prebind
  3. bind
  4. postbind

在最后,如果 podsToActive.Map 不為空的話,則需要將這些 Pod 移到 activeQ 隊列里,但這里并不將這些Pod信息從podsToActivate.Map 中刪除,這個與上面 schedulingCycle() 有些不一樣。

至此,Pod 與 Node 綁定關系已算結束了。

總結

可以看到插件在整個調(diào)度過程中極其的重要,這里并沒有對其進行詳細介紹。有時間的話,大家可以了解一下內(nèi)置插件的作用,同時還有擴展器的作用。

?著作權歸作者所有,轉(zhuǎn)載或內(nèi)容合作請聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容