k8s調(diào)度器
版本號:v1.27.2
調(diào)度入口
kubernetes 調(diào)度器入口文件位于 /cmd/kube-scheduler/scheduler.go,其中用到的核心庫位于 /pkg/scheduler。
調(diào)用流程:
`app.NewSchedulerCommand()`
↓
`runCommand(cmd, opts, registryOptions...)`
↓
`cc, sched, err := Setup(ctx, opts, registryOptions...)`
↓
`sched, err := scheduler.New(cc.Client,...) `
↓
` Run(ctx, cc, sched)`
|
|
----> `sched.Run(ctx)` (/pkg/scheduler) -> `sched.SchedulingQueue.Run()`
↓
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
最后到達文件 /pkg/scheduler/scheduler.go#L241, 這里創(chuàng)建了一個 Scheduler 是我們重點關注的地方。
Scheduler 結構體
不過為了后面方便理解,我們先看一下 Scheduler結構體的結構。
從注釋信息我們可以看到其主要功能是 watch未調(diào)度的pod,它將試圖找一個合適的 Node ,然后將其寫回到 api server。
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
// 通過 NodeLister 和 Algorithm 將觀察到通過 Cache 所做的任何更改
// It is expected that changes made via Cache will be observed
// by NodeLister and Algorithm.
Cache internalcache.Cache
// 擴展器是外部進程影響Kubernetes做出的調(diào)度決策的接口,通常是不由Kubernetes直接管理的資源所需要的。
Extenders []framework.Extender
// 以阻塞的形式獲取下一個有效的待調(diào)度的Pod。這里不使用channel,主要是因為對一個pod的調(diào)度可能需要一些時間
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
// a pod may take some amount of time and we don't want pods to get
// stale while they sit in a channel.
NextPod func() *framework.QueuedPodInfo
// 調(diào)度失敗處理 Handler
// FailureHandler is called upon a scheduling failure.
FailureHandler FailureHandlerFn
// 試圖將一個Pod調(diào)度到其中一個節(jié)點上,成功則返回 ScheduleResult, 否則返回帶有失敗原來的 FitError
// SchedulePod tries to schedule the given pod to one of the nodes in the node list.
// Return a struct of ScheduleResult with the name of suggested host on success,
// otherwise will return a FitError with reasons.
SchedulePod func(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (ScheduleResult, error)
// Close this to shut down the scheduler.
StopEverything <-chan struct{}
// 等待被調(diào)度的Pod隊列,也就是說只有從這個隊列里獲取的Pod才能被調(diào)度
// SchedulingQueue holds pods to be scheduled
SchedulingQueue internalqueue.SchedulingQueue
// 調(diào)度配置profiles
// Profiles are the scheduling profiles.
Profiles profile.Map
// api server 專用 client
client clientset.Interface
// 節(jié)點信息快照
nodeInfoSnapshot *internalcache.Snapshot
// 節(jié)點打分相關
percentageOfNodesToScore int32
nextStartNodeIndex int
}
通過結構體我們大概可以猜到一些信息:
- 調(diào)度中某些操作與
Cache有關,主要是性能優(yōu)化考慮,設計文檔參考/pkg/scheduler/internal/cache/interface.go - 通過
NextPod函數(shù)可以從調(diào)度隊列SchedulingQueue里獲取一個需要待調(diào)度的 Pod 信息,其結構體*framework.QueuedPodInfo內(nèi)潛了*PodInfo結構體,而*PodInfo又內(nèi)嵌了*v1.Pod結構體,MakeNextPodFunc()函數(shù) - 通過 SchedulePod 函數(shù)對一個 Pod 進行調(diào)度,其中傳遞的Pod參數(shù)為
*v1.Pod,調(diào)度成功返回ScheduleResult, 如果失敗則返回*FitError* - 如果調(diào)度失敗需要進行
FailureHandler處理 - 所有需要調(diào)度的Pod必須從
SchedulingQueue里獲取 - 需要用 client 與
api server通訊,是不是需要保存節(jié)點與Pod綁定關系?因在結構體的注釋提到過 api server - 調(diào)度器可能需要在一定的時機將其關閉
- Profiles 用在什么地方?
- 節(jié)點快照
nodeInfoSnapshot是干什么用的? - 節(jié)點打分
percentageOfNodesToScore,它是集群中可用節(jié)點的百分比閾值。一旦發(fā)現(xiàn)足夠的節(jié)點可以滿足調(diào)度一個 pod 的條件,調(diào)度器就會停止在集群中繼續(xù)尋找更多的可用節(jié)點,以提高調(diào)度器的性能。無論這個閾值的值是多少,調(diào)度器始終至少嘗試找到 "minFeasibleNodesToFind" 個可用節(jié)點。例如,如果集群大小為 500 個節(jié)點,PercentageOfNodesToScore 的值為 30,則一旦找到 150 個可行的節(jié)點,調(diào)度器就會停止繼續(xù)尋找更多的可行的節(jié)點。當值為0時,將使用默認百分比 (基于集群大小的5%--50%) 來評分節(jié)點,它將覆蓋全局 PercentageOfNodesToScore。如果沒有設置,將使用全局 PercentageOfNodesToScore。 - 其它
接著我們帶著上面的一些個人理解到片面信息與疑問,看一下 Scheduler 的內(nèi)部實現(xiàn)原理。
Scheduler 創(chuàng)建
了解了結構體后,我們再看下 Scheduler 的函數(shù) New() 實現(xiàn)
我們先看一下函數(shù)原型
func New(client clientset.Interface,
informerFactory informers.SharedInformerFactory,
dynInformerFactory dynamicinformer.DynamicSharedInformerFactory,
recorderFactory profile.RecorderFactory,
stopCh <-chan struct{},
opts ...Option) (*Scheduler, error) {}
參數(shù)解釋:
client 就是 api server 的客戶端,一般是通過讀取默認配置文件 $HOME/.kube/config 生成
informerFactory 一個 SharedInformerFactory,參考 https://blog.haohtml.com/archives/32179
dynInformerFactory Dynamic 客戶端的 Shared 的 Informer
recorderFactory 一個 scheduler name 的事件記錄器
stopCh 終止退出信號
opts 就是scheduler 的選項函數(shù),對scheduler 進行初始化
對于返回結果就是一個 Scheduler 對象。
下面我們一塊看一下其具體實現(xiàn)
配置選項初始化
函數(shù)選項式模式對調(diào)度器結構體初始化
// 函數(shù)選項式模塊初始化調(diào)度器結構體
options := defaultSchedulerOptions
for _, opt := range opts {
opt(&options)
}
這里的 defaultSchedulerOptions 是一個單獨的結構體,原型為
type schedulerOptions struct {
componentConfigVersion string
kubeConfig *restclient.Config
// Overridden by profile level percentageOfNodesToScore if set in v1.
percentageOfNodesToScore int32
podInitialBackoffSeconds int64
podMaxBackoffSeconds int64
podMaxInUnschedulablePodsDuration time.Duration
// Contains out-of-tree plugins to be merged with the in-tree registry.
frameworkOutOfTreeRegistry frameworkruntime.Registry
profiles []schedulerapi.KubeSchedulerProfile
extenders []schedulerapi.Extender
frameworkCapturer FrameworkCapturer
parallelism int32
applyDefaultProfile bool
}
這里有兩個重要的字段,一個是 profiles 字段,另一個是 extenders 字段。
profiles 字段
profiels 字估類型為 []schedulerapi.KubeSchedulerProfile 結構類型
// KubeSchedulerProfile is a scheduling profile.
type KubeSchedulerProfile struct {
SchedulerName string // 調(diào)度器名稱
PercentageOfNodesToScore *int32 // 集群中可用節(jié)點的百分比閾值
Plugins *Plugins // 所有插件,包含啟用和禁用
PluginConfig []PluginConfig // 每個插件的一組可選的自定義插件參數(shù),如果省略則使用默認配置
}
從結構體大致就可以知道它其實是一個調(diào)度器切片,每個調(diào)度器都有一組 Plugins。
而這個 Plugins 結構為
type Plugins struct {
// PreEnqueue is a list of plugins that should be invoked before adding pods to the scheduling queue.
PreEnqueue PluginSet
// QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue.
QueueSort PluginSet
// PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework.
PreFilter PluginSet
// Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod.
Filter PluginSet
// PostFilter is a list of plugins that are invoked after filtering phase, but only when no feasible nodes were found for the pod.
PostFilter PluginSet
// PreScore is a list of plugins that are invoked before scoring.
PreScore PluginSet
// Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase.
Score PluginSet
// Reserve is a list of plugins invoked when reserving/unreserving resources
// after a node is assigned to run the pod.
Reserve PluginSet
// Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod.
Permit PluginSet
// PreBind is a list of plugins that should be invoked before a pod is bound.
PreBind PluginSet
// Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework.
// The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success.
Bind PluginSet
// PostBind is a list of plugins that should be invoked after a pod is successfully bound.
PostBind PluginSet
// MultiPoint is a simplified config field for enabling plugins for all valid extension points
MultiPoint PluginSet
}
type PluginSet struct {
// Enabled指定除了默認插件之外還應啟用的插件。這些是在默認插件之后調(diào)用的,并且按照此處指定的相同順序
Enabled []Plugin
// Disabled指定應禁用的默認插件。當需要禁用所有默認插件時,只包含一個“*”的數(shù)組應該be provided.
Disabled []Plugin
}
type Plugin struct {
Name string // 插件名
Weight int32 // 權重,僅在評分時使用
}
執(zhí)行時。插件包括多個擴展點,當指定時特定擴展點的插件列表是唯一啟用的插件。如果從配置中省略了一個擴展點,那么該擴展點將使用默認的插件集。
啟用的插件按此處指定的順序調(diào)用,在默認插件之后調(diào)用。如果需要在默認插件之前調(diào)用它們,則必須在此處按所需順序禁用并重新啟用默認插件。
PreEnqueue // 進入調(diào)度隊列前調(diào)用
↓
QueueSort // 在 scheduler queue 里對Pod排序調(diào)用
↓
PreFilter -> Filter -> PostFilter // 其中 Filter 是在篩選出無法運行Pod的節(jié)點時調(diào)用
↓ // PostFilter 是在過濾階段后但找不到pod的可行節(jié)點時調(diào)用
PreScore -> Score
↓
Reserve
↓
Permit
↓
PreBind -> Bind -> PostBind
↓
MultiPoint
對于這些插件列表它們每個都有對應的 extension points ,也只有在這些對應的擴展點時它們才有機會被調(diào)用。
PreEnqueue 進入調(diào)度隊列前調(diào)用
QueueSort 在 scheduler queue 里對Pod排序調(diào)用
PreFilter 在 scheduling framework 的 PreFilter 擴展點調(diào)用
Filter 篩選出無法運行 Pod 的 node 時調(diào)用
PostFilter 過濾階段后但找不到 pod 的可行 node 時調(diào)用, 與 Filter 的擴展點不一樣
PreScore 評分前調(diào)用
Score已通過篩選階段的節(jié)點進行排名時調(diào)用
Reserve 在分配 node 運行pod后, reserving/unreserving 服務資源時調(diào)用
Permit 用于控制Pod的綁定,這些插件可以 阻止 或 延遲 Pod的綁定
PreBind 綁定pod之前調(diào)用
Bind 在 scheduling framework. 的 Bind擴展點調(diào)用
PostBind 在pod成功綁定后調(diào)用
MultiPoint 在所有有效的擴展點被調(diào)用,也就是說這個插件是對全局擴展點有效的。
下面我們將會用到PreQueue 這個插件列表
總結
profiles 字段是一個 scheduler 切片,而每個 scheduler 對應一些插件,這些插件又按執(zhí)行擴展點的先后順序進行了分組 Plugins Set, 組里的插件也只有在他們自己的 extension points 才會被調(diào)用,對于 MultiPoint 會在所有有效的 extension points 被調(diào)用。
現(xiàn)在我們再回到初始化調(diào)度器的主流程。
if options.applyDefaultProfile {
// 調(diào)度器配置,很重要,特別是 Profiles 和 Extenders 字段
// 對象轉(zhuǎn)換,將 versiondCfg 轉(zhuǎn)為 cfg
var versionedCfg configv1.KubeSchedulerConfiguration
scheme.Scheme.Default(&versionedCfg)
cfg := schedulerapi.KubeSchedulerConfiguration{}
if err := scheme.Scheme.Convert(&versionedCfg, &cfg, nil); err != nil {
return nil, err
}
// 通過指定 "scheduler name" profile,可以用來控制Pod的調(diào)度行為,如何為空則默認使用 "default-scheduler" profile
options.profiles = cfg.Profiles
}
根據(jù)是否啟默認配置文件來初始化 defaultSchedulerOptions.Options 字段,通過 scheduler_name 來控制Pod調(diào)度行為(其實是通過scheduler_name 的 Plugins 實現(xiàn)對Pod的調(diào)度控制 )。
這里 KubeSchedulerConfiguration 結構體
// KubeSchedulerConfiguration configures a scheduler
type KubeSchedulerConfiguration struct {
...
// Profiles are scheduling profiles that kube-scheduler supports. Pods can
// choose to be scheduled under a particular profile by setting its associated
// scheduler name. Pods that don't specify any scheduler name are scheduled
// with the "default-scheduler" profile, if present here.
// +listType=map
// +listMapKey=schedulerName
Profiles []KubeSchedulerProfile `json:"profiles,omitempty"`
// Extenders are the list of scheduler extenders, each holding the values of how to communicate
// with the extender. These extenders are shared by all scheduler profiles.
// +listType=set
Extenders []Extender `json:"extenders,omitempty"`
}
插件構建器注冊表
注冊所有內(nèi)置插件,registry 的數(shù)據(jù)結構為 map[string]PluginFactory
// 插件構建函數(shù)注冊表,
// 注冊表是所有可用插件的集合,framework 使用注冊表來啟用和初始化配置的插件。在初始化框架之前,所有插件都必須在注冊表中。
registry := frameworkplugins.NewInTreeRegistry()
if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil {
return nil, err
}
調(diào)用 frameworkplugins.NewInTreeRegistry() 實現(xiàn) in-tree插件的注冊,也可通過WithFrameworkOutOfTreeRegistry選項注冊其他插件
func NewInTreeRegistry() runtime.Registry {
fts := plfeature.Features{
EnableDynamicResourceAllocation: feature.DefaultFeatureGate.Enabled(features.DynamicResourceAllocation),
EnableReadWriteOncePod: feature.DefaultFeatureGate.Enabled(features.ReadWriteOncePod),
EnableVolumeCapacityPriority: feature.DefaultFeatureGate.Enabled(features.VolumeCapacityPriority),
EnableMinDomainsInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MinDomainsInPodTopologySpread),
EnableNodeInclusionPolicyInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.NodeInclusionPolicyInPodTopologySpread),
EnableMatchLabelKeysInPodTopologySpread: feature.DefaultFeatureGate.Enabled(features.MatchLabelKeysInPodTopologySpread),
EnablePodSchedulingReadiness: feature.DefaultFeatureGate.Enabled(features.PodSchedulingReadiness),
EnablePodDisruptionConditions: feature.DefaultFeatureGate.Enabled(features.PodDisruptionConditions),
EnableInPlacePodVerticalScaling: feature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling),
}
// 所有內(nèi)置插件
registry := runtime.Registry{
dynamicresources.Name: runtime.FactoryAdapter(fts, dynamicresources.New),
selectorspread.Name: selectorspread.New,
imagelocality.Name: imagelocality.New,
tainttoleration.Name: tainttoleration.New,
nodename.Name: nodename.New,
nodeports.Name: nodeports.New,
nodeaffinity.Name: nodeaffinity.New,
podtopologyspread.Name: runtime.FactoryAdapter(fts, podtopologyspread.New),
nodeunschedulable.Name: nodeunschedulable.New,
noderesources.Name: runtime.FactoryAdapter(fts, noderesources.NewFit),
noderesources.BalancedAllocationName: runtime.FactoryAdapter(fts, noderesources.NewBalancedAllocation),
volumebinding.Name: runtime.FactoryAdapter(fts, volumebinding.New),
volumerestrictions.Name: runtime.FactoryAdapter(fts, volumerestrictions.New),
volumezone.Name: volumezone.New,
nodevolumelimits.CSIName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCSI),
nodevolumelimits.EBSName: runtime.FactoryAdapter(fts, nodevolumelimits.NewEBS),
nodevolumelimits.GCEPDName: runtime.FactoryAdapter(fts, nodevolumelimits.NewGCEPD),
nodevolumelimits.AzureDiskName: runtime.FactoryAdapter(fts, nodevolumelimits.NewAzureDisk),
nodevolumelimits.CinderName: runtime.FactoryAdapter(fts, nodevolumelimits.NewCinder),
interpodaffinity.Name: interpodaffinity.New,
queuesort.Name: queuesort.New,
defaultbinder.Name: defaultbinder.New,
defaultpreemption.Name: runtime.FactoryAdapter(fts, defaultpreemption.New),
schedulinggates.Name: runtime.FactoryAdapter(fts, schedulinggates.New),
}
return registry
}
通過插件器可以實現(xiàn)的功能有實現(xiàn) 注冊新插件,取消注冊插件 和 合并插件
type Registry map[string]PluginFactory
func (r Registry) Register(name string, factory PluginFactory) error {}
func (r Registry) Unregister(name string) error {}
func (r Registry) Merge(in Registry) error {}
extenders 擴展器構建
擴展器是外部進程影響Kubernetes做出的調(diào)度決策的接口。這通常是不由Kubernetes直接管理的資源所需要的。
// pkg/scheduler/apis/config/types.go#L261-L301
type Extender struct {
URLPrefix string
FilterVerb string
PreemptVerb string
PrioritizeVerb string
Weight int64
BindVerb string
EnableHTTPS bool
TLSConfig *ExtenderTLSConfig
HTTPTimeout v1.Duration
NodeCacheCapable bool
ManagedResources []ExtenderManagedResource
Ignorable bool
}
保存用于與擴展器通信的參數(shù),如果謂詞未指定/為空,則假定擴展程序選擇不提供該擴展。
extenders, err := buildExtenders(options.extenders, options.profiles)
對于擴展器的構建需要 options.extenders 參數(shù), 其是擴展器參數(shù)配置 它返回的是一個 []framework.Extender 數(shù)據(jù)類型。我們看一下其具體實現(xiàn)
func buildExtenders(extenders []schedulerapi.Extender, profiles []schedulerapi.KubeSchedulerProfile) ([]framework.Extender, error) {
var fExtenders []framework.Extender
if len(extenders) == 0 {
return nil, nil
}
var ignoredExtendedResources []string
var ignorableExtenders []framework.Extender
for i := range extenders {
// 根據(jù)每個擴展器參數(shù)配置創(chuàng)建一個 httpClient 對象的wrapper
extender, err := NewHTTPExtender(&extenders[i])
if err != nil {
return nil, err
}
// 在調(diào)度時,當擴展程序返回錯誤或無法訪問時,擴展器是否被忽略
if !extender.IsIgnorable() {
fExtenders = append(fExtenders, extender)
} else {
ignorableExtenders = append(ignorableExtenders, extender)
}
for _, r := range extenders[i].ManagedResources {
if r.IgnoredByScheduler {
ignoredExtendedResources = append(ignoredExtendedResources, r.Name)
}
}
}
// 將允許忽略的擴展器放在最后面
fExtenders = append(fExtenders, ignorableExtenders...)
// If there are any extended resources found from the Extenders, append them to the pluginConfig for each profile.
// This should only have an effect on ComponentConfig, where it is possible to configure Extenders and
// plugin args (and in which case the extender ignored resources take precedence).
if len(ignoredExtendedResources) == 0 {
return fExtenders, nil
}
// 更新每個scheduler name 中插件 noderesources.Name 的參數(shù)(prof.PluginConfig[k].Args.IgnoredResources)
for i := range profiles {
prof := &profiles[i]
var found = false
for k := range prof.PluginConfig {
if prof.PluginConfig[k].Name == noderesources.Name {
// Update the existing args
pc := &prof.PluginConfig[k]
args, ok := pc.Args.(*schedulerapi.NodeResourcesFitArgs)
if !ok {
return nil, fmt.Errorf("want args to be of type NodeResourcesFitArgs, got %T", pc.Args)
}
// 允許出錯被忽略的擴展器
args.IgnoredResources = ignoredExtendedResources
found = true
break
}
}
if !found {
return nil, fmt.Errorf("can't find NodeResourcesFitArgs in plugin config")
}
}
return fExtenders, nil
}
這里 profiles[i].PluginConfig[i].args 字段類型對應的是 NodeResourcesFitArgs,它是一個 []string 類型
type NodeResourcesFitArgs struct {
v1.TypeMeta
IgnoredResources []string
IgnoredResourceGroups []string
ScoringStrategy *ScoringStrategy
}
創(chuàng)建 podLister 和 nodeLister
podLister := informerFactory.Core().V1().Pods().Lister()
nodeLister := informerFactory.Core().V1().Nodes().Lister()
通過 informetFactory 工作模式創(chuàng)建 podLister 和 nodeLister, 可以用來獲取所有對象資源列表。如 posLister 可以用來獲取所有 Pod 列表, nodeLister 則可以獲取所有 node 列表。
創(chuàng)建scheduler.Profiles
現(xiàn)在到了最核心的部分,上面創(chuàng)建一些對象都會在這里被作為參數(shù)傳入。
// 節(jié)點快照
snapshot := internalcache.NewEmptySnapshot()
// clusterEvent 集合
clusterEventMap := make(map[framework.ClusterEvent]sets.String)
metricsRecorder := metrics.NewMetricsAsyncRecorder(1000, time.Second, stopCh)
// 根據(jù)給出的profiles配置文件 生成對應的 frameworks,(Map類型,格式為 SchedulerName:Framework)
// 而每一個 frameworks 都會對自己的 plugins 集進行管理,然后在調(diào)度上下文中的各種埋點執(zhí)行插件
// registry 其實是 frameworkImpl.registry 字段
profiles, err := profile.NewMap(
// 調(diào)度器配置
options.profiles,
// 插件構建器注冊表
registry,
recorderFactory, stopCh,
// 下面的全是 Options functions 模式,可任意數(shù)量
// componentConfig 版本號
frameworkruntime.WithComponentConfigVersion(options.componentConfigVersion),
// api server 的客戶端,一般通過讀取 $HOME/.kube/config 文件創(chuàng)建
frameworkruntime.WithClientSet(client),
// kubeConfig,一般是 $HOME/kube/config 文件
frameworkruntime.WithKubeConfig(options.kubeConfig),
// SharedInformerFactory
frameworkruntime.WithInformerFactory(informerFactory),
// 設置 SharedLister,用途?
frameworkruntime.WithSnapshotSharedLister(snapshot),
// 回調(diào)函數(shù) frameworkruntime.WithCaptureProfile(frameworkruntime.CaptureProfile(options.frameworkCapturer)),
// 設置一個 clusterEvent 的 Set 類型
frameworkruntime.WithClusterEventMap(clusterEventMap),
// 設置并行調(diào)度數(shù)量,默認為16
frameworkruntime.WithParallelism(int(options.parallelism)),
// 設置外部擴展器
frameworkruntime.WithExtenders(extenders),
// 指標記錄器
frameworkruntime.WithMetricsRecorder(metricsRecorder),
)
先是通過 Options functions 的方式指定了多個參數(shù),其函數(shù) profile.NewMap()實現(xiàn)
// NewMap builds the frameworks given by the configuration, indexed by name.
func NewMap(cfgs []config.KubeSchedulerProfile, r frameworkruntime.Registry, recorderFact RecorderFactory,
stopCh <-chan struct{}, opts ...frameworkruntime.Option) (Map, error) {
m := make(Map)
v := cfgValidator{m: m}
for _, cfg := range cfgs {
// 根據(jù)給的配置構建 Framework
p, err := newProfile(cfg, r, recorderFact, stopCh, opts...)
if err != nil {
return nil, fmt.Errorf("creating profile for scheduler name %s: %v", cfg.SchedulerName, err)
}
if err := v.validate(cfg, p); err != nil {
return nil, err
}
// scheduler name 為key, 以 Framework 為value
m[cfg.SchedulerName] = p
}
return m, nil
}
根據(jù)每個 profile 調(diào)用 newProfile() 生成對應的 frameworkImpl,其實現(xiàn)了 Framework接口,這里真正的實現(xiàn)位于函數(shù) NewFramework(), 這里不做介紹。
type Framework interface {
// 插件需要數(shù)據(jù)和一些工具,在插件初始化的時候通過 pluginFactory 傳入,見 NewInTreeRegistry() 函數(shù)
Handle
// 返回注冊過的 preEnqueue 插件列表
PreEnqueuePlugins() []PreEnqueuePlugin
// 獲取 pod 在隊列中的排序函數(shù)
QueueSortFunc() LessFunc
// PreFilter 插件
RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (*PreFilterResult, *Status)
// PostFilter
RunPostFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
// PreBind
RunPreBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// PostBind
RunPostBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// Reserve
RunReservePluginsReserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// Unreserve
RunReservePluginsUnreserve(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string)
// Permit
RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// 如果pod處于waiting狀態(tài),則將處于block狀態(tài),直到pod被 rejected 或 allowed
WaitOnPermit(ctx context.Context, pod *v1.Pod) *Status
// Bind
RunBindPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) *Status
// 是否定義了 Filter 插件
HasFilterPlugins() bool
// 是否定義了 PostFilter 插件
HasPostFilterPlugins() bool
// 是否定義了 Score
HasScorePlugins() bool
// 返回定義的plugins 對象
ListPlugins() *config.Plugins
ProfileName() string
PercentageOfNodesToScore() *int32
// 設置 SetPodNominator
SetPodNominator(nominator PodNominator)
}
Framework 通過 scheduling framework 管理著一些插件集??梢栽谡{(diào)度上下文中的指定 points 調(diào)用配置的插件,這個在上面介紹插件的時候提到過。
調(diào)度隊列創(chuàng)建
首先遍歷 profiles 獲取其對應的已注冊好的 PreQueuePlugin 插件,這些插件是在將Pods添加到activeQ之前調(diào)用。
preEnqueuePluginMap := make(map[string][]framework.PreEnqueuePlugin)
for profileName, profile := range profiles {
preEnqueuePluginMap[profileName] = profile.PreEnqueuePlugins()
}
然后創(chuàng)建一個優(yōu)先隊列 sched.SchedulingQueue。
// 初始化一個優(yōu)先隊列作為調(diào)度隊列 sched.SchedulingQueue
podQueue := internalqueue.NewSchedulingQueue(
// 獲取profile 設置的調(diào)度隊列Pod里的Pod排序函數(shù),這里指定獲取第 options.profiles[0]個profile
profiles[options.profiles[0].SchedulerName].QueueSortFunc(),
// sharedInformerFactory
informerFactory,
// 設置 pod 的 Initial階段的 Backoff 的持續(xù)時間
internalqueue.WithPodInitialBackoffDuration(time.Duration(options.podInitialBackoffSeconds)*time.Second),
// 最大backoff持續(xù)時間 internalqueue.WithPodMaxBackoffDuration(time.Duration(options.podMaxBackoffSeconds)*time.Second),
// 設置podLister
internalqueue.WithPodLister(podLister),
// 設置 clusterEvent
internalqueue.WithClusterEventMap(clusterEventMap),
// 一個pod在 unschedulablePods 停留的最長時間
internalqueue.WithPodMaxInUnschedulablePodsDuration(options.podMaxInUnschedulablePodsDuration),
// preEnqueuePluginMap 插件注冊
internalqueue.WithPreEnqueuePluginMap(preEnqueuePluginMap),
// 指標相關
internalqueue.WithPluginMetricsSamplePercent(pluginMetricsSamplePercent),
internalqueue.WithMetricsRecorder(*metricsRecorder),
)
然后從原來的 profiles 中讀取一些配置,如 preEnqueuePlugin 、 Pod 在隊列里的排序函數(shù) QueueSortFunc() 作為調(diào)用函數(shù) internalqueue.NewSchedulingQueue() 的入?yún)?。函?shù)實現(xiàn)
// pkg/scheduler/internal/queue/scheduling_queue.go#L291-L330
// NewPriorityQueue creates a PriorityQueue object.
func NewPriorityQueue(
lessFn framework.LessFunc,
informerFactory informers.SharedInformerFactory,
opts ...Option,
) *PriorityQueue {
options := defaultPriorityQueueOptions
if options.podLister == nil {
options.podLister = informerFactory.Core().V1().Pods().Lister()
}
for _, opt := range opts {
opt(&options)
}
// pod 排序
comp := func(podInfo1, podInfo2 interface{}) bool {
pInfo1 := podInfo1.(*framework.QueuedPodInfo)
pInfo2 := podInfo2.(*framework.QueuedPodInfo)
return lessFn(pInfo1, pInfo2)
}
// 初始化優(yōu)先隊列
pq := &PriorityQueue{
// 存儲維護pod與其運行在哪個節(jié)點關系
nominator: newPodNominator(options.podLister),
clock: options.clock,
stop: make(chan struct{}),
podInitialBackoffDuration: options.podInitialBackoffDuration,
podMaxBackoffDuration: options.podMaxBackoffDuration,
// 一個pod在 unschedulablePods 停留的最長時間
podMaxInUnschedulablePodsDuration: options.podMaxInUnschedulablePodsDuration,
activeQ: heap.NewWithRecorder(podInfoKeyFunc, comp, metrics.NewActivePodsRecorder()),
// 包含已嘗試并確定為不可調(diào)度的pod, 其實是一個map類型,對應 podInfoMap map[string]*framework.QueuedPodInfo
unschedulablePods: newUnschedulablePods(metrics.NewUnschedulablePodsRecorder(), metrics.NewGatedPodsRecorder()),
moveRequestCycle: -1,
// clusterEvent
clusterEventMap: options.clusterEventMap,
// preEnqueuePlugin
preEnqueuePluginMap: options.preEnqueuePluginMap,
metricsRecorder: options.metricsRecorder,
pluginMetricsSamplePercent: options.pluginMetricsSamplePercent,
}
pq.cond.L = &pq.lock
pq.podBackoffQ = heap.NewWithRecorder(podInfoKeyFunc, pq.podsCompareBackoffCompleted, metrics.NewBackoffPodsRecorder())
pq.nsLister = informerFactory.Core().V1().Namespaces().Lister()
return pq
}
幾個重要的點已在代碼里進行了注釋,下面我們重點看理解隊列這個概念。
我們以這里的 PriorityQueue為例,其實現(xiàn)了 SchedulingQueue 接口。PriorityQueue 隊列 head 的 Pod 具有最高優(yōu)先級,它有兩個 sub queue 和一個可選的data structure:
activeQ :子隊列,持有即將等待被調(diào)度的 Pod,將會根據(jù)其優(yōu)先級和其他配置信息來排隊等待調(diào)度程序?qū)ζ溥M行分配。
backoffQ :子隊列,雙稱為退避隊列。 持有從 unschedulablePods 中移走的Pod, 并將在其 backoff periods 退避期結束時移動到activeQ 隊列。Pod 在退避隊列中等待并定期嘗試進行重新調(diào)度。重新調(diào)度的頻率會按照一個指數(shù)級的算法定時增加,從而充分探索可用的資源,直到找到可以分配給該 Pod 的節(jié)點。
unschedulablePods :不可調(diào)度Pod的列表,也可以將其理解為不可調(diào)度隊列 unschedulable queue 。持有已嘗試進行調(diào)度且當前確定為不可調(diào)度的Pod。
三個隊列都是 Kubernetes 調(diào)度器的三個基本導向隊列,調(diào)度器將使用其內(nèi)置策略和其他算法來對它們進行維護和管理。
隊列機制將通過調(diào)用 Run() 創(chuàng)建兩個gorouteine進行從 podBackoffQ 到 activeQ 的 Pod流轉(zhuǎn),這一點下面會介紹。
我們看一下隊列常用的一些方法
// 從 activeQ 隊列中以阻塞的方式獲取一個Pod,會將增加 scheduling cycle 的值
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error){}
// 更新一個隊列,分兩種情況:
// 不存在:如果 pod 是全新的,則直接添加到 activeQ 隊列
// 已存在:如果 pod 在 activeQ 或 backoffQ 隊列中已存在,則直接更新;否則,如果pod以可調(diào)度的方式更新,它將從不可調(diào)度隊列 unschedulablePods 中刪除,并將更新后的pod添加到 activeQ 中(如修正了原來pod的錯誤后恢復正常)。
func (p *PriorityQueue) Update(oldPod, newPod *v1.Pod) error {}
// 從三個隊列中刪除 pod
func (p *PriorityQueue) Delete(pod *v1.Pod) error {}
// 將pod添加到 activeQ 隊列,如果添加的pod已經(jīng)在activeQ、backoffQ或unschedulablePods 存在,則不會有任何效果
func (p *PriorityQueue) Add(pod *v1.Pod) error {)
// 將給定的一組 Pod 從 unschedulablePods 或 backoffQ 隊列中移動到 activeQ 隊列中,前提是這些 Pod 正在 unschedulablePods 或 backoffQ 隊列中
func (p *PriorityQueue) Activate(pods map[string]*v1.Pod) {}
SetPodNominator
// 根據(jù) profiles 設置PodNominator
for _, fwk := range profiles {
fwk.SetPodNominator(podQueue)
}
指定每個 profile 對應的 Framework ,對于Pod是從 podQueue 隊列中獲取。
調(diào)度緩存
// 設置緩存,此時緩存服務自動處于運行狀態(tài)
schedulerCache := internalcache.New(durationToExpireAssumedPod, stopEverything)
// Setup cache debugger.
debugger := cachedebugger.New(nodeLister, podLister, schedulerCache, podQueue)
debugger.ListenForSignal(stopEverything)
調(diào)用函數(shù) internalcache.New(durationToExpireAssumedPod, stopEverything) 生成一個 goroutine 來過期的 Pod, 第一個參數(shù)為 ttl 表示Pod的過期時間,如果為0則表示記不過期,這里durationToExpireAssumedPod 的值為0;第二次參數(shù)是pod停止信號。
疑問:
為什么要添加一層 Cache 設計,在什么時候才發(fā)揮作用呢?
創(chuàng)建 scheduler 對象
sched := &Scheduler{
Cache: schedulerCache, // 緩存
client: client, // api server 客戶端
nodeInfoSnapshot: snapshot, // node 快照信息
percentageOfNodesToScore: options.percentageOfNodesToScore,
Extenders: extenders, // 擴展器
// 獲取Pod函數(shù),對應的是優(yōu)先隊列里的 Pop() 方法
NextPod: internalqueue.MakeNextPodFunc(podQueue),
StopEverything: stopEverything,
SchedulingQueue: podQueue, // 隊列
Profiles: profiles, // 配置文件
}
sched.applyDefaultHandlers()
addAllEventHandlers(sched, informerFactory, dynInformerFactory, unionedGVKs(clusterEventMap))
調(diào)用 sched.applyDefaultHandlers() 設置調(diào)度Pod函數(shù)和 調(diào)度失敗處理函數(shù)。
func (s *Scheduler) applyDefaultHandlers() {
s.SchedulePod = s.schedulePod
s.FailureHandler = s.handleSchedulingFailure
}
這里 s.schedulePod() 作為調(diào)度實現(xiàn)函數(shù),是我們關注的重點中的重點。
調(diào)度器執(zhí)行
上面做了好么多工作,只是為了調(diào)度器能夠執(zhí)行。對于調(diào)度器的執(zhí)行入口為
// cmd/kube-scheduler/app/server.go#L243
// Run executes the scheduler based on the given configuration. It only returns on error or when context is done.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
...
// 調(diào)度器執(zhí)行
sched.Run(ctx)
}
這里調(diào)用了sched.Run() 函數(shù)真正將服務運行起來,我們看一下它做了哪些事。
// pkg/scheduler/scheduler.go#L337-L391
// Run begins watching and scheduling. It starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
// 1. 優(yōu)先隊列運行
sched.SchedulingQueue.Run()
// We need to start scheduleOne loop in a dedicated goroutine,
// because scheduleOne function hangs on getting the next item
// from the SchedulingQueue.
// If there are no new pods to schedule, it will be hanging there
// and if done in this goroutine it will be blocking closing
// SchedulingQueue, in effect causing a deadlock on shutdown.
// 2. 在單獨的goroutine里以 loop 方式調(diào)度Pod
go wait.UntilWithContext(ctx, sched.scheduleOne, 0)
// 調(diào)度服務結束
<-ctx.Done()
sched.SchedulingQueue.Close()
}
這里做了兩件事:
- 將優(yōu)先隊列服務運行起來
- 以 loop 的方式調(diào)用
sched.scheduleOne()調(diào)度Pod,一次調(diào)度一個Pod。
對于Pod的調(diào)度,這里指定了 UntilWithContext(ctx context.Context, f func(context.Context), period time.Duration) 函數(shù)的 period 參數(shù)為 0,表示是在一個 loop 中執(zhí)行 sched.scheduleOne()函數(shù)。
調(diào)度時會從 SchedulingQueue 優(yōu)先隊列里獲取一個Pod;如果沒有新的Pod要調(diào)度,它將一直阻塞。
下面我們看一下這兩塊的實現(xiàn)邏輯。
優(yōu)先隊列
首先通過調(diào)用 sched.SchedulingQueue.Run()啟用優(yōu)先隊列服務。
func (p *PriorityQueue) Run() {
go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
go wait.Until(p.flushUnschedulablePodsLeftover, 30*time.Second, p.stop)
}
這里就干兩件事:
- 每
10秒執(zhí)行一個p.flushBackoffQCompleted()函數(shù),將所有已完成的Pod從backoffQ隊列移動到activeQ隊列 - 每
30秒執(zhí)行一次flushUnschedulablePodsLeftover()函數(shù),將所有停留在unschedulablePods中時間超出 podMaxInUnschedulablePodsDuration 的Pod移動到backoffQ或activeQ隊列
這兩件事全部在單獨的 goroutine 里以 loop 的方式定時執(zhí)行,下面我們看一下它們兩者的實現(xiàn)。
flushBackoffQCompleted()
將所有已完成的Pod從 backoffQ 移動到 activeQ 隊列
func (p *PriorityQueue) flushBackoffQCompleted() {
p.lock.Lock()
defer p.lock.Unlock()
activated := false
for {
// 獲取隊列頭部元素,但不從隊列里刪除
rawPodInfo := p.podBackoffQ.Peek()
if rawPodInfo == nil {
break
}
pInfo := rawPodInfo.(*framework.QueuedPodInfo)
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
break
}
// 刪除獲取隊列頭部元素
_, err := p.podBackoffQ.Pop()
if err != nil {
klog.ErrorS(err, "Unable to pop pod from backoff queue despite backoff completion", "pod", klog.KObj(pod))
break
}
// 移入 activeQ 隊列
if added, _ := p.addToActiveQ(pInfo); added {
activated = true
}
}
if activated {
p.cond.Broadcast()
}
}
加入 activeQ 函數(shù)實現(xiàn)
func (p *PriorityQueue) addToActiveQ(pInfo *framework.QueuedPodInfo) (bool, error) {
// 執(zhí)行插件 runPreEnqueuePlugins
pInfo.Gated = !p.runPreEnqueuePlugins(context.Background(), pInfo)
if pInfo.Gated {
// Add the Pod to unschedulablePods if it's not passing PreEnqueuePlugins.
p.unschedulablePods.addOrUpdate(pInfo)
return false, nil
}
// 加入隊列,如果已在隊列則直接更新(什么情況下會在activeQ隊列存在?)
if err := p.activeQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the active queue", "pod", klog.KObj(pInfo.Pod))
return false, err
}
return true, nil
}
flushUnschedulablePodsLeftover()
將所有停留在 unschedulablePods 中時間超出 podMaxInUnschedulablePodsDuration 的Pod移動到 backoffQ 或 activeQ 隊列
func (p *PriorityQueue) flushUnschedulablePodsLeftover() {
var podsToMove []*framework.QueuedPodInfo
currentTime := p.clock.Now()
for _, pInfo := range p.unschedulablePods.podInfoMap {
lastScheduleTime := pInfo.Timestamp
if currentTime.Sub(lastScheduleTime) > p.podMaxInUnschedulablePodsDuration {
podsToMove = append(podsToMove, pInfo)
}
}
// 移入activeQ 或 BackoffQueue
if len(podsToMove) > 0 {
p.movePodsToActiveOrBackoffQueue(podsToMove, UnschedulableTimeout)
}
}
// 將 Pod 移動到 activeQ 或 backoffQ
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event framework.ClusterEvent) {
activated := false
for _, pInfo := range podInfoList {
if len(pInfo.UnschedulablePlugins) != 0 && !p.podMatchesEvent(pInfo, event) {
continue
}
pod := pInfo.Pod
if p.isPodBackingoff(pInfo) {
if err := p.podBackoffQ.Add(pInfo); err != nil {
klog.ErrorS(err, "Error adding pod to the backoff queue", "pod", klog.KObj(pod))
} else {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", backoffQName)
metrics.SchedulerQueueIncomingPods.WithLabelValues("backoff", event.Label).Inc()
// 從 unschedulablePods 里刪除Pod
p.unschedulablePods.delete(pod, pInfo.Gated)
}
} else {
gated := pInfo.Gated
// 添加到 activeQ 隊列,并從 unschedulablePods 中將其刪除
if added, _ := p.addToActiveQ(pInfo); added {
klog.V(5).InfoS("Pod moved to an internal scheduling queue", "pod", klog.KObj(pInfo.Pod), "event", event, "queue", activeQName)
activated = true
metrics.SchedulerQueueIncomingPods.WithLabelValues("active", event.Label).Inc()
p.unschedulablePods.delete(pod, gated)
}
}
}
p.moveRequestCycle = p.schedulingCycle
if activated {
p.cond.Broadcast()
}
}
下面我們先看一下調(diào)度Pod的邏輯,最后再看隊列相關的兩個函數(shù)
調(diào)度Pod
scheduleOne() 函數(shù)實現(xiàn)了一個 Pod 的整個調(diào)度過程,整個步驟分為:
- 從優(yōu)先隊列里獲取一個待調(diào)度的 Pod
- 根據(jù)Pod的信息,獲取指定的
Framework - 篩選出一個可能被調(diào)度的 Node,如果有
>1個節(jié)點的話,還需要對每個節(jié)點進行評分,找出評分最高的Node - 以
異步的方式綁定 Pod 與 Node
獲取待調(diào)度的Pod
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// 1. 獲取一個 Pod, 如果為 nil 則直接返回取消
podInfo := sched.NextPod()
}
這里的 sched.NextPod() 在上面"創(chuàng)建 scheduler 對象"的時候已經(jīng)賦值過,它對應
// MakeNextPodFunc returns a function to retrieve the next pod from a given
// scheduling queue
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.QueuedPodInfo {
return func() *framework.QueuedPodInfo {
podInfo, err := queue.Pop()
if err == nil {
klog.V(4).InfoS("About to try and schedule pod", "pod", klog.KObj(podInfo.Pod))
for plugin := range podInfo.UnschedulablePlugins {
metrics.UnschedulableReason(plugin, podInfo.Pod.Spec.SchedulerName).Dec()
}
return podInfo
}
klog.ErrorS(err, "Error while retrieving next pod from scheduling queue")
return nil
}
}
這里調(diào)用 優(yōu)先隊列的 PriorityQueue.Pop() 方法獲取一個 Pod 信息。
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
p.lock.Lock()
defer p.lock.Unlock()
// 阻塞
for p.activeQ.Len() == 0 {
// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
// When Close() is called, the p.closed is set and the condition is broadcast,
// which causes this loop to continue and return from the Pop().
if p.closed {
return nil, fmt.Errorf(queueClosed)
}
p.cond.Wait()
}
// 彈出Pod
obj, err := p.activeQ.Pop()
if err != nil {
return nil, err
}
// 遞增
pInfo := obj.(*framework.QueuedPodInfo)
pInfo.Attempts++
p.schedulingCycle++
return pInfo, nil
}
從注釋里我們可知,它是以block的方式從 actieQ 隊列獲取一個 Pod,如果隊列為空,則將一直等待,至到一個新的Pod被添加到隊列。同時當Pod被 Pop() 走后,scheduling cycle 的值將遞增。
獲取調(diào)度 Framework
根據(jù)當前Pod調(diào)度器 pod.schedulerName 獲取其對應的 Framework
func (sched *Scheduler) scheduleOne(ctx context.Context) {
// 2. 根據(jù)當前Pod調(diào)度器 pod.schedulerName 獲取其對應的 Framework
fwk, err := sched.frameworkForPod(pod)
// 檢查是否需要跳過調(diào)度
if sched.skipPodSchedule(fwk, pod) {
return
}
}
調(diào)用 sched.skipPodSchedule(fwk, pod) 函數(shù)來檢查是否需要跳過調(diào)度操作。如果這個Pod正處于刪除狀態(tài)(pod.DeletionTimestamp != nil)或者明確指定了不需要調(diào)度,則直接結束本次調(diào)度;
節(jié)點選擇
節(jié)點選擇有以下步驟:
- 找一個節(jié)點
- 修改
pod.Spec.NodeName = NodeName屬性,建立Pod與Node映射關系 - 執(zhí)行一系列插件
- 根據(jù)需要將一些Pod移到 activeQ 隊列
下面我們先看一下找節(jié)點這一步
找節(jié)點
為了能將執(zhí)行Pod,必須找一個合適的Node,這一步也是 k8s 中必須關注的一塊內(nèi)容
// 3. 獲取一個最合適的節(jié)點
scheduleResult, assumedPodInfo, status := sched.schedulingCycle(schedulingCycleCtx, state, fwk, podInfo, start, podsToActivate)
可以看到,對于節(jié)點的選擇是在 sched.schedulingCycle()函數(shù)里又調(diào)用了一個 sched.SchedulePod 函數(shù)來實現(xiàn)的。
// schedulingCycle tries to schedule a single Pod.
func (sched *Scheduler) schedulingCycle(
ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
podInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate,
) (ScheduleResult, *framework.QueuedPodInfo, *framework.Status) {
pod := podInfo.Pod
// 一、 獲取一個合適的節(jié)點,這塊下面有詳細介紹
scheduleResult, err := sched.SchedulePod(ctx, fwk, state, pod)
if err != nil {
...
return err
}
這里根據(jù) sched.SchedulePod() 源碼實現(xiàn),共有以下步驟:
- 更新節(jié)點快照
sched.nodeInfoSnapshot - 檢查節(jié)點快照
sched.nodeInfoSnapshot里節(jié)點數(shù)量,如果為0,則直接結束 - 基于 Framework 過濾插件和過濾擴展查找適合Pod的節(jié)點(plugins 和 extenders 發(fā)揮使用)
- 如果只有一個節(jié)點的話,直接返回使用這個節(jié)點并結束
- 如果存在多個節(jié)點的話,則調(diào)用
prioritizeNodes函數(shù),通過運行fwk.RunScorePlugins()函數(shù)對每個節(jié)點進行打分,并對每個節(jié)點記錄每個插件的打分和統(tǒng)計節(jié)點總得分 - 調(diào)用
selectHost()遍歷一次所有節(jié)點,找出來得分最高的節(jié)點
下面我們看一下源碼實現(xiàn),并在重要的地方做了一些注釋。
如果獲取節(jié)點成功,則返回 ScheduleResult,否則返回一個 FitError數(shù)據(jù)結構。
更新節(jié)點快照
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 1. 更新節(jié)點快照,為什么更新?為什么用?每調(diào)度一個Pod就會更新Node快照,性能是否存在問題
if err := sched.Cache.UpdateSnapshot(sched.nodeInfoSnapshot); err != nil {
return result, err
}
}
檢查快照節(jié)點數(shù)量
// schedulePod tries to schedule the given pod to one of the nodes in the node list.
func (sched *Scheduler) schedulePod(ctx context.Context, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
// 2. 檢查節(jié)點快照有無節(jié)點
if sched.nodeInfoSnapshot.NumNodes() == 0 {
return result, ErrNoNodesAvailable
}
}
找出所有可使用的Node清單
然后為 Pod 尋找所有適合此Pod 運行的 node 清單
// 3. 查找合適的節(jié)點
feasibleNodes, diagnosis, err := sched.findNodesThatFitPod(ctx, fwk, state, pod)
只有一個節(jié)點直接使用
如果當前只有一個節(jié)點的話,則此時不用考慮其它情況,直接使用。
// 4. 正好只有一個節(jié)點,直接使用
// When only one node after predicate, just use it.
if len(feasibleNodes) == 1 {
return ScheduleResult{
SuggestedHost: feasibleNodes[0].Name,
EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
FeasibleNodes: 1,
}, nil
}
多節(jié)點則進行打分
如果有多個節(jié)點的話,則在這些節(jié)點上調(diào)用函數(shù) RunScorePlugins() 在執(zhí)行插件評分。
// 5. 從多個節(jié)點中選擇最合適的節(jié)點
priorityList, err := prioritizeNodes(ctx, sched.Extenders, fwk, state, pod, feasibleNodes)
每個插件都會對當前節(jié)點進行計算,得出當前節(jié)點評分,同時還要將所有插件的評分累計,作為節(jié)點的總評分。
找出評分最高的節(jié)點
// 6. 從叢多節(jié)點中遍歷出來評分最高的那個,每個節(jié)點都會調(diào)用所有plugins對其進行評分
host, err := selectHost(priorityList)
然后遍歷所有節(jié)點,篩選出來得分最高的節(jié)點,這里的 host 變量即是節(jié)點的名稱。
最后將獲取節(jié)點返回
return ScheduleResult{
SuggestedHost: host, // 選擇的節(jié)點名稱(這里是指主機名還是系統(tǒng)內(nèi)部分配的一個標識?)
EvaluatedNodes: len(feasibleNodes) + len(diagnosis.NodeToStatusMap),
FeasibleNodes: len(feasibleNodes),
}, err
這里 ScheduleResult 結構體為
// ScheduleResult represents the result of scheduling a pod.
type ScheduleResult struct {
// 節(jié)點名稱
SuggestedHost string
//調(diào)度程序在篩選階段及以后評估pod的節(jié)點數(shù)
EvaluatedNodes int
// 評估后,適合pod的節(jié)點數(shù)量
FeasibleNodes int
// 調(diào)度周期相關
nominatingInfo *framework.NominatingInfo
}
至此,節(jié)點找好了,不過還有一些工作要做。
修改Pod的屬性
通過修改 Pod.Spec.NodeName=NodeName 屬性,建立Pod與Node之間的映射關系。告訴 cache 這個pod已與上面選擇以node建立了綁定關系(其實還沒有真正綁定),只是在cache里對其建立了綁定關系
func (sched *Scheduler) schedulingCycle(...) {
assumedPodInfo := podInfo.DeepCopy()
assumedPod := assumedPodInfo.Pod
// 二、修改 Pod.Spec.NodeName = NodeName,建立 Pod 與 Node 的映射關系
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
...
}
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
// pod 所在Node
assumed.Spec.NodeName = host
// 緩存更新
if err := sched.Cache.AssumePod(assumed); err != nil {
klog.ErrorS(err, "Scheduler cache AssumePod failed")
return err
}
// 從內(nèi)部緩存中刪除
if sched.SchedulingQueue != nil {
sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
}
return nil
}
執(zhí)行插件
執(zhí)行 reserve 插件的 Reserve() 方法 和 執(zhí)行 Permit 插件。
func (sched *Scheduler) schedulingCycle(...) {
// 三、執(zhí)行一些插件
// Run the Reserve method of reserve plugins.
if sts := fwk.RunReservePluginsReserve(ctx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {}
// Run "permit" plugins.
runPermitStatus := fwk.RunPermitPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
if !runPermitStatus.IsWait() && !runPermitStatus.IsSuccess() {
}
}
將 Pod 轉(zhuǎn)到 activeQ 隊列
如果 podsToActive.Map 存在Pod,則將它們從 backoffQ 或unschedulablePods 中移到 activeQ
func (sched *Scheduler) schedulingCycle(...) {
// 四、podsToActivate.Map值被插件修改,什么情況下會插件會修改?
// At the end of a successful scheduling cycle, pop and move up Pods if needed.
if len(podsToActivate.Map) != 0 {
// 將pod 從 backoffQ 或unschedulablePods 中移到 activeQ
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Clear the entries after activation.
podsToActivate.Map = make(map[string]*v1.Pod)
}
return scheduleResult, assumedPodInfo, nil
}
疑問:
podsToActivate.Map 什么時候可能被修改?
綁定Pod與Node
// 異步綁定 Pod 與 node
go func() {
bindingCycleCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 綁定
status := sched.bindingCycle(bindingCycleCtx, state, fwk, scheduleResult, assumedPodInfo, start, podsToActivate)
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, fwk, assumedPodInfo, start, scheduleResult, status)
}
}()
可以看到對于pod與節(jié)點 node的綁定是異步的,綁定函數(shù)bindingCycle()實現(xiàn)
func (sched *Scheduler) bindingCycle( ctx context.Context,
state *framework.CycleState,
fwk framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) *framework.Status {
assumedPod := assumedPodInfo.Pod
// 執(zhí)行 "permit" 插件.
if status := fwk.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
return status
}
// Run "prebind" plugins.
if status := fwk.RunPreBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost); !status.IsSuccess() {
return status
}
// Run "bind" plugins.
if status := sched.bind(ctx, fwk, assumedPod, scheduleResult.SuggestedHost, state); !status.IsSuccess() {
return status
}
// Run "postbind" plugins.
fwk.RunPostBindPlugins(ctx, state, assumedPod, scheduleResult.SuggestedHost)
// At the end of a successful binding cycle, move up Pods if needed.
if len(podsToActivate.Map) != 0 {
sched.SchedulingQueue.Activate(podsToActivate.Map)
// Unlike the logic in schedulingCycle(), we don't bother deleting the entries
// as `podsToActivate.Map` is no longer consumed.
}
return nil
}
可能看到在異步綁定 Pod與Node 時,將會依次執(zhí)行下面的插件
- permit
- prebind
- bind
- postbind
在最后,如果 podsToActive.Map 不為空的話,則需要將這些 Pod 移到 activeQ 隊列里,但這里并不將這些Pod信息從podsToActivate.Map 中刪除,這個與上面 schedulingCycle() 有些不一樣。
至此,Pod 與 Node 綁定關系已算結束了。
總結
可以看到插件在整個調(diào)度過程中極其的重要,這里并沒有對其進行詳細介紹。有時間的話,大家可以了解一下內(nèi)置插件的作用,同時還有擴展器的作用。