在深度學(xué)習(xí)場(chǎng)景下,大部分任務(wù)都需要批量調(diào)度功能,也就是需要保證多個(gè)Pod同時(shí)地調(diào)度。它主要算法就是all or nothing的算法,保證整個(gè)資源要么可以調(diào)度,要么就不要調(diào)度,如果無(wú)法調(diào)度那么就排隊(duì),保證整個(gè)資源不會(huì)被餓死。這是一個(gè)比較常見(jiàn)的需求。例如原生 K8s 調(diào)度器調(diào)度 100 個(gè) Pod 的作業(yè)時(shí),即使調(diào)度了 99 個(gè),最后 1 個(gè)因資源不足永遠(yuǎn)卡住,導(dǎo)致整個(gè)作業(yè)阻塞。
volcano就是實(shí)現(xiàn)gang-scheduling的批處理調(diào)度器,是Kubeflow生態(tài)中的一個(gè)組件。解決批處理場(chǎng)景特有的“要么全調(diào)要么不調(diào)”需求。其核心能力包括Gang Scheduling、公平調(diào)度 (Fair Sharing/Fair Queue)、隊(duì)列管理 (Queue)、作業(yè)管理 (Job Lifecycle)、高級(jí)調(diào)度策略 (Binpack, DRF, Topology)。
Volcano 核心概念
Volcano 引入了幾個(gè)新概念:
- Queue:用于管理和優(yōu)先級(jí)排序任務(wù),做資源隔離
- PodGroup:將一組 Pod 關(guān)聯(lián)起來(lái),實(shí)現(xiàn) Gang Scheduling 等策略的基礎(chǔ),是作業(yè)調(diào)度的原子單位
- VolcanoJob:用戶提交的作業(yè)單元
這些都是 K8s 里的自定義資源,也就是我們能夠通過(guò) kubectl 命令查到相應(yīng)的資源對(duì)象,好比 Deployment、Service、Pod 這些。
PodGroup 是一組強(qiáng)關(guān)聯(lián)的 Pod 集合,本質(zhì)是CRD。核心字段spec.minMember是Gang Scheduling 的關(guān)鍵! 作業(yè)成功運(yùn)行所需的最小 Pod 數(shù)量。調(diào)度器必須保證 >= minMember 個(gè) Pod 同時(shí)被調(diào)度運(yùn)行,作業(yè)才能推進(jìn)。
Queue 是一個(gè) PodGroup 隊(duì)列,也是一個(gè) CRD,用于分組作業(yè)、資源隔離、配額管理和優(yōu)先級(jí)控制。
VolcanoJob 是 Volcano 中的一個(gè)核心概念,其實(shí)還是用的K8s Operator 模式,擴(kuò)展了 Kubernetes 的 Job 資源。VolcanoJob 不僅包括了 Kubernetes Job 的所有特性,還加入了對(duì)批處理作業(yè)的額外支持,使得 Volcano 能夠更好地適應(yīng)高性能和大規(guī)模計(jì)算任務(wù)的需求。
VolcanoJob 根據(jù)配置去創(chuàng)建相應(yīng)的 PodGroup 出來(lái),邏輯上將屬于同一個(gè)作業(yè)(VolcanoJob)的所有 Pod 聚合在一起。而 PodGroup 最終會(huì)被當(dāng)做一個(gè)整體被 Volcano Scheduler 調(diào)度。在調(diào)度的過(guò)程中,Volcano 還用到了 Queue 來(lái)實(shí)現(xiàn) PodGroup 的排隊(duì)、優(yōu)先級(jí)控制等邏輯。
Volcano 調(diào)度核心流程
Volcano 調(diào)度主要包含兩個(gè)概念:
- Actions:enqueue、allocate、backfill 這些調(diào)度動(dòng)作
- Plugins:Action 中執(zhí)行的算法邏輯,就取決于注冊(cè)進(jìn)去的 plugins。
Volcano 的調(diào)度流程由一系列預(yù)定義的 Actions 按順序執(zhí)行構(gòu)成。每個(gè) Action 可以掛載多個(gè) Plugin 來(lái)實(shí)現(xiàn)具體策略。Action和Plugin是Volcano 強(qiáng)大靈活性的基石。Volcano 的核心調(diào)度策略(Gang, Fairness (DRF), Priority, Binpacking, Task Topology)都是通過(guò)實(shí)現(xiàn)特定的 Plugin 來(lái)完成的。Action 提供了框架,Plugin 填充了策略。
典型的 Action 順序是:
- Enqueue: 作業(yè)入隊(duì)
- Reclaim: 資源回收
- Allocate: 資源分配
- Preempt: 搶占
- Backfill: 回填
源碼分析
1. Action 實(shí)現(xiàn)
Volcano 在 pkg/scheduler 中放了調(diào)度器相關(guān)的代碼,里面有一個(gè) actions 目錄。
/pkg/scheduler/actions/factory.go#L29
func init() {
framework.RegisterAction(reclaim.New())
framework.RegisterAction(allocate.New())
framework.RegisterAction(backfill.New())
framework.RegisterAction(preempt.New())
framework.RegisterAction(enqueue.New())
framework.RegisterAction(shuffle.New())
}
可以看到這里注冊(cè)了6個(gè) actions。RegisterAction 方法的實(shí)現(xiàn)也很簡(jiǎn)單,有一個(gè) actionMap 來(lái)保存所有的 actions:
/pkg/scheduler/framework/plugins.go
var actionMap = map[string]Action{}
// RegisterAction register action
func RegisterAction(act Action) {
pluginMutex.Lock()
defer pluginMutex.Unlock()
actionMap[act.Name()] = act
}
2. 調(diào)度器邏輯
Run() 方法負(fù)責(zé)啟動(dòng)一個(gè) Volcano 調(diào)度器:
/cmd/scheduler/main.go#L83
if err := app.Run(s); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1)
}
sched, err := scheduler.NewScheduler(config, opt)
run := func(ctx context.Context) {
sched.Run(ctx.Done())
<-ctx.Done()
}
NewScheduler() 方法:
/pkg/scheduler/scheduler.go
// NewScheduler returns a Scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
//...
cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads, opt.IgnoredCSIProvisioners, opt.ResyncPeriod)
scheduler := &Scheduler{
schedulerConf: opt.SchedulerConf,
fileWatcher: watcher,
cache: cache,
schedulePeriod: opt.SchedulePeriod,
dumper: schedcache.Dumper{Cache: cache, RootDir: opt.CacheDumpFileDir},
}
return scheduler, nil
}
Scheduler 對(duì)象:
type Scheduler struct {
cache schedcache.Cache
schedulerConf string
fileWatcher filewatcher.FileWatcher
schedulePeriod time.Duration
once sync.Once
mutex sync.Mutex
actions []framework.Action
plugins []conf.Tier
configurations []conf.Configuration
metricsConf map[string]string
dumper schedcache.Dumper
}
Run() 方法:
func (pc *Scheduler) Run(stopCh <-chan struct{}) {
pc.loadSchedulerConf()
go pc.watchSchedulerConf(stopCh)
// Start cache for policy.
pc.cache.SetMetricsConf(pc.metricsConf)
pc.cache.Run(stopCh)
klog.V(2).Infof("Scheduler completes Initialization and start to run")
go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
if options.ServerOpts.EnableCacheDumper {
pc.dumper.ListenForSignal(stopCh)
}
go runSchedulerSocket()
}
這里就是 Scheduler 的執(zhí)行邏輯了,runOnce 方法被周期性調(diào)用:
func (pc *Scheduler) runOnce() {
//...
// Load ConfigMap to check which action is enabled.
conf.EnabledActionMap = make(map[string]bool)
for _, action := range actions {
conf.EnabledActionMap[action.Name()] = true
}
ssn := framework.OpenSession(pc.cache, plugins, configurations)
defer func() {
framework.CloseSession(ssn)
metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
}()
for _, action := range actions {
actionStartTime := time.Now()
action.Execute(ssn)
metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
}
}
可以看到在 runOnce 中的2個(gè)關(guān)鍵步驟:
- ssn := framework.OpenSession(pc.cache, plugins, configurations)遍歷 actions,
- 調(diào)用 action.Execute(ssn)
這里的 actions 集合是什么呢?OpenSession 拿到的 plugins 又是啥呢?
進(jìn)一步跟代碼可以找到如下默認(rèn)配置:
var DefaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
- name: priority
- name: gang
- name: conformance
- plugins:
- name: overcommit
- name: drf
- name: predicates
- name: proportion
- name: nodeorder
所以默認(rèn)配置下,執(zhí)行的 actions 是 enqueue, allocate, backfill 三個(gè)。
3. actions 和 plugins 的調(diào)用邏輯
前面看framework.OpenSession() 函數(shù)打開(kāi)了一個(gè) Session。不過(guò)什么是 Session 呢?來(lái)具體看下 OpenSession() 函數(shù)的實(shí)現(xiàn):
/pkg/scheduler/framework/framework.go
// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
ssn := openSession(cache)
ssn.Tiers = tiers
ssn.Configurations = configurations
ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes)
ssn.PodLister = NewPodLister(ssn)
for _, tier := range tiers {
for _, plugin := range tier.Plugins {
if pb, found := GetPluginBuilder(plugin.Name); !found {
klog.Errorf("Failed to get plugin %s.", plugin.Name)
} else {
plugin := pb(plugin.Arguments)
ssn.plugins[plugin.Name()] = plugin
onSessionOpenStart := time.Now()
plugin.OnSessionOpen(ssn)
metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
}
}
}
return ssn
}
在 OpenSession() 函數(shù)中,plugins 被遍歷,然后依次調(diào)用 plugin.OnSessionOpen(ssn) 方法。
一個(gè)個(gè) plugins 注冊(cè)具體的算法函數(shù)到 Session 里,然后 actions 順序執(zhí)行的過(guò)程中,到 Session 里去取相應(yīng)的算法函數(shù)來(lái)執(zhí)行。
| Plugin Function | Purpose | Used By |
|---|---|---|
| JobOrderFn | Job priority comparison | allocate, preempt, reclaim |
| TaskOrderFn | Task priority within jobs | allocate, preempt, reclaim |
| PredicateFn | Node feasibility filtering | allocate, preempt, reclaim |
| NodeOrderFn | Node scoring and ranking | allocate, backfill |
| JobValidFn | Job admission validation | enqueue |
| Preemptable | Victim task selection | preempt, reclaim |

Session作為調(diào)度周期上下文,承載了作業(yè)/隊(duì)列狀態(tài)快照Cache和中間結(jié)果Statement,是決策的基礎(chǔ),保證調(diào)度的事務(wù)性。
4. Action 分析:enqueue
Enqueue: 作業(yè)入隊(duì)。檢查 Queue 狀態(tài)是否 Open、資源配額是否足夠、優(yōu)先級(jí)等,決定哪些 PodGroup(作業(yè)) 有資格進(jìn)入本次調(diào)度周期。這是批處理資源隊(duì)列管理的關(guān)鍵!
enqueue Action 的 Execute() 方法如下:
/pkg/scheduler/actions/enqueue/enqueue.go
func (enqueue *Action) Execute(ssn *framework.Session) {
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
queueSet := sets.NewString()
jobsMap := map[api.QueueID]*util.PriorityQueue{}
for _, job := range ssn.Jobs {
//...
}
for {
//...
}
這里的 queues 是一個(gè) Priority Queue,隊(duì)列的實(shí)現(xiàn)用了 heap 包,實(shí)現(xiàn)了一個(gè)“最大堆”,也就是每次 Pop() 會(huì)拿到一個(gè)優(yōu)先級(jí)最高的 item。執(zhí)行了2個(gè) for 循環(huán)。
先看第一個(gè)for循環(huán):
for _, job := range ssn.Jobs {
if job.ScheduleStartTimestamp.IsZero() {
ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
Time: time.Now(),
}
}
if queue, found := ssn.Queues[job.Queue]; !found {
klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
job.Queue, job.Namespace, job.Name)
continue
} else if !queueSet.Has(string(queue.UID)) {
klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
queue.Name, job.Namespace, job.Name)
queueSet.Insert(string(queue.UID))
queues.Push(queue)
}
if job.IsPending() {
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
}
}
這個(gè) for 循環(huán)主要做2件事情,一個(gè)是遍歷 jobs 的過(guò)程中判斷用到了哪些 Queue(K8s 自定義資源對(duì)象),將這些 Queue 保存到 queueSet 和 queues 中;另外一個(gè)就是將處于 Pending 狀態(tài)的 jobs 加入到 jobsMap 中。
再看第二個(gè)無(wú)限循環(huán):
for {
if queues.Empty() {
break
}
queue := queues.Pop().(*api.QueueInfo)
// skip the Queue that has no pending job
jobs, found := jobsMap[queue.UID]
if !found || jobs.Empty() {
continue
}
job := jobs.Pop().(*api.JobInfo)
// 符合條件的 PodGroup 被標(biāo)記為“可調(diào)度”,其 Pod 進(jìn)入后續(xù)環(huán)節(jié)。
if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
ssn.JobEnqueued(job)
job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
ssn.Jobs[job.UID] = job
}
// Added Queue back until no job in Queue.
queues.Push(queue)
}
這個(gè)循環(huán)的邏輯是消化隊(duì)列里的 jobs。首先將全局隊(duì)列按照優(yōu)先級(jí) Pop 一個(gè)高優(yōu)隊(duì)列出來(lái),然后根據(jù)這個(gè)隊(duì)列的 UID 找到本地 jobsMap 里對(duì)應(yīng)的 jobs 隊(duì)列,這又是一個(gè)優(yōu)先級(jí)隊(duì)列。最后從這個(gè)優(yōu)先級(jí)隊(duì)列中 Pop 一個(gè)高優(yōu) Job 出來(lái),篩選有資格參與本次調(diào)度的 PodGroup (作業(yè)),符合條件的 PodGroup 被標(biāo)記為“可調(diào)度”。
總的來(lái)說(shuō),enqueue 過(guò)程就是按照隊(duì)列的優(yōu)先級(jí)順序,將隊(duì)列中的 jobs 再按照優(yōu)先級(jí)依次標(biāo)記為 "Inqueue" 狀態(tài)(job.PodGroup.Status.Phase = "Inqueue")。
5. Action 分析:allocate
Allocate: 資源分配 (核心中的核心)。嘗試為待調(diào)度的 Pod 分配節(jié)點(diǎn)。
allocate.Execute() 方法的實(shí)現(xiàn)如下:
/pkg/scheduler/actions/allocate/allocate.go
func (alloc *Action) Execute(ssn *framework.Session) {
klog.V(5).Infof("Enter Allocate ...")
defer klog.V(5).Infof("Leaving Allocate ...")
alloc.parseArguments(ssn)
// the allocation for pod may have many stages
// 1. pick a queue named Q (using ssn.QueueOrderFn)
// 2. pick a job named J from Q (using ssn.JobOrderFn)
// 3. pick a task T from J (using ssn.TaskOrderFn)
// 4. use predicateFn to filter out node that T can not be allocated on.
// 5. use ssn.NodeOrderFn to judge the best node and assign it to T
// queues sort queues by QueueOrderFn.
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
// jobsMap is used to find job with the highest priority in given queue.
jobsMap := map[api.QueueID]*util.PriorityQueue{}
alloc.session = ssn
alloc.pickUpQueuesAndJobs(queues, jobsMap)
klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
alloc.allocateResources(queues, jobsMap)
}
調(diào)用了plugin注冊(cè)到session的函數(shù):QueueOrderFn等。
validJobFn, JobOrderFn封裝在pickUpQueuesAndJobs方法。
TaskOrderFn,PredicateFn,NodeOrderFn封裝在allocateResources方法。
代碼經(jīng)過(guò)多次重構(gòu)有點(diǎn)晦澀,主流程其實(shí)挺簡(jiǎn)單,這個(gè)過(guò)程包括作業(yè)的predicate和prioritize。使用predicateFn預(yù)選,過(guò)濾掉不能分配作業(yè)的node;使用NodeOrderFn打分來(lái)找到最適合的分配節(jié)點(diǎn):

pickUpQueuesAndJobs的邏輯是一個(gè)for循環(huán),遍歷 jobs,將其按照 queue 不同存到 jobsMap 中。
func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map[api.QueueID]*util.PriorityQueue) {
ssn := alloc.session
for _, job := range ssn.Jobs {
// If not config enqueue action, change Pending pg into Inqueue state to avoid blocking job scheduling.
//...
if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
continue
}
//...
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
queues.Push(ssn.Queues[job.Queue])
}
jobsMap[job.Queue].Push(job)
}
}
allocateResources的邏輯主要是按照優(yōu)先級(jí)依次給 tasks 尋找最合適的 node,找到后“預(yù)占”資源,于是按順序逐步給所有的 tasks 都找到了最佳節(jié)點(diǎn)。
// prioritizeNodes selects the highest score node.
// 尋找最優(yōu)節(jié)點(diǎn)
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
// Allocate idle resource to the task.
// 將前面找到的最佳節(jié)點(diǎn)相應(yīng)資源分配給當(dāng)前 task
if err := alloc.allocateResourcesForTask(stmt, task, bestNode, job); err == nil {
jobNewAllocatedHyperNode = getJobNewAllocatedHyperNode(ssn, bestNode.Name, job, jobNewAllocatedHyperNode)
}
分配最佳節(jié)點(diǎn)后,調(diào)用Commit方法真正綁定到 K8s API Server。調(diào)用 K8s Binding API (kubeClient.CoreV1().Pods().Bind()) 設(shè)置 Pod 的 .spec.nodeName。并且更新 PodGroup 狀態(tài)。
stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
if stmt != nil {
stmt.Commit()
}
這里Action用Statement模式記錄原子操作,保證調(diào)度決策的一致性。

6. Gang scheduling
Gang調(diào)度策略是volcano-scheduler的核心調(diào)度算法之一,它滿足了調(diào)度過(guò)程中的“All or nothing”的調(diào)度需求,避免Pod的任意調(diào)度導(dǎo)致集群資源的浪費(fèi)。具體算法是,觀察Job下的Pod已調(diào)度數(shù)量是否滿足了最小運(yùn)行數(shù)量,當(dāng)Job的最小運(yùn)行數(shù)量得到滿足時(shí),為Job下的所有Pod執(zhí)行調(diào)度動(dòng)作,否則,不執(zhí)行。

Gang調(diào)度策略是通過(guò)Gang plugin集成validation和ordering函數(shù)實(shí)現(xiàn)。session初始化的時(shí)候Gang plugin注冊(cè)了如下幾個(gè)關(guān)鍵函數(shù)。這些函數(shù)在上面Action:allocate中被調(diào)用。
| Function | Purpose |
|---|---|
| JobOrderFn | Orders jobs for gang scheduling priority |
| JobValidFn | Validates if job meets gang requirements |
| JobReadyFn | Determines if job is ready to be scheduled |
| PreemptableFn | Protects gang members from preemption |
validation函數(shù):
pkg/scheduler/plugins/gang/gang.go
validJobFn := func(obj interface{}) *api.ValidateResult {
job, ok := obj.(*api.JobInfo)
if !ok {
return &api.ValidateResult{
Pass: false,
Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
}
}
if valid := job.CheckTaskValid(); !valid {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.NotEnoughPodsOfTaskReason,
Message: "Not enough valid pods of each task for gang-scheduling",
}
}
vtn := job.ValidTaskNum()
if vtn < job.MinAvailable {
return &api.ValidateResult{
Pass: false,
Reason: v1beta1.NotEnoughPodsReason,
Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
vtn, job.MinAvailable),
}
}
return nil
}
ordering函數(shù):
jobOrderFn := func(l, r interface{}) int {
lv := l.(*api.JobInfo)
rv := r.(*api.JobInfo)
lReady := lv.IsReady()
rReady := rv.IsReady()
klog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t",
lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady)
if lReady && rReady {
return 0
}
if lReady {
return 1
}
if rReady {
return -1
}
return 0
}