批處理調(diào)度器volcano

在深度學(xué)習(xí)場(chǎng)景下,大部分任務(wù)都需要批量調(diào)度功能,也就是需要保證多個(gè)Pod同時(shí)地調(diào)度。它主要算法就是all or nothing的算法,保證整個(gè)資源要么可以調(diào)度,要么就不要調(diào)度,如果無(wú)法調(diào)度那么就排隊(duì),保證整個(gè)資源不會(huì)被餓死。這是一個(gè)比較常見(jiàn)的需求。例如原生 K8s 調(diào)度器調(diào)度 100 個(gè) Pod 的作業(yè)時(shí),即使調(diào)度了 99 個(gè),最后 1 個(gè)因資源不足永遠(yuǎn)卡住,導(dǎo)致整個(gè)作業(yè)阻塞。

volcano就是實(shí)現(xiàn)gang-scheduling的批處理調(diào)度器,是Kubeflow生態(tài)中的一個(gè)組件。解決批處理場(chǎng)景特有的“要么全調(diào)要么不調(diào)”需求。其核心能力包括Gang Scheduling、公平調(diào)度 (Fair Sharing/Fair Queue)、隊(duì)列管理 (Queue)、作業(yè)管理 (Job Lifecycle)、高級(jí)調(diào)度策略 (Binpack, DRF, Topology)。

Volcano 核心概念

Volcano 引入了幾個(gè)新概念:

  • Queue:用于管理和優(yōu)先級(jí)排序任務(wù),做資源隔離
  • PodGroup:將一組 Pod 關(guān)聯(lián)起來(lái),實(shí)現(xiàn) Gang Scheduling 等策略的基礎(chǔ),是作業(yè)調(diào)度的原子單位
  • VolcanoJob:用戶提交的作業(yè)單元

這些都是 K8s 里的自定義資源,也就是我們能夠通過(guò) kubectl 命令查到相應(yīng)的資源對(duì)象,好比 Deployment、Service、Pod 這些。

PodGroup 是一組強(qiáng)關(guān)聯(lián)的 Pod 集合,本質(zhì)是CRD。核心字段spec.minMember是Gang Scheduling 的關(guān)鍵! 作業(yè)成功運(yùn)行所需的最小 Pod 數(shù)量。調(diào)度器必須保證 >= minMember 個(gè) Pod 同時(shí)被調(diào)度運(yùn)行,作業(yè)才能推進(jìn)。

Queue 是一個(gè) PodGroup 隊(duì)列,也是一個(gè) CRD,用于分組作業(yè)、資源隔離、配額管理和優(yōu)先級(jí)控制。

VolcanoJob 是 Volcano 中的一個(gè)核心概念,其實(shí)還是用的K8s Operator 模式,擴(kuò)展了 Kubernetes 的 Job 資源。VolcanoJob 不僅包括了 Kubernetes Job 的所有特性,還加入了對(duì)批處理作業(yè)的額外支持,使得 Volcano 能夠更好地適應(yīng)高性能和大規(guī)模計(jì)算任務(wù)的需求。

VolcanoJob 根據(jù)配置去創(chuàng)建相應(yīng)的 PodGroup 出來(lái),邏輯上將屬于同一個(gè)作業(yè)(VolcanoJob)的所有 Pod 聚合在一起。而 PodGroup 最終會(huì)被當(dāng)做一個(gè)整體被 Volcano Scheduler 調(diào)度。在調(diào)度的過(guò)程中,Volcano 還用到了 Queue 來(lái)實(shí)現(xiàn) PodGroup 的排隊(duì)、優(yōu)先級(jí)控制等邏輯。

Volcano 調(diào)度核心流程

Volcano 調(diào)度主要包含兩個(gè)概念:

  • Actions:enqueue、allocate、backfill 這些調(diào)度動(dòng)作
  • Plugins:Action 中執(zhí)行的算法邏輯,就取決于注冊(cè)進(jìn)去的 plugins。

Volcano 的調(diào)度流程由一系列預(yù)定義的 Actions 按順序執(zhí)行構(gòu)成。每個(gè) Action 可以掛載多個(gè) Plugin 來(lái)實(shí)現(xiàn)具體策略。Action和Plugin是Volcano 強(qiáng)大靈活性的基石。Volcano 的核心調(diào)度策略(Gang, Fairness (DRF), Priority, Binpacking, Task Topology)都是通過(guò)實(shí)現(xiàn)特定的 Plugin 來(lái)完成的。Action 提供了框架,Plugin 填充了策略。
典型的 Action 順序是:

  • Enqueue: 作業(yè)入隊(duì)
  • Reclaim: 資源回收
  • Allocate: 資源分配
  • Preempt: 搶占
  • Backfill: 回填

源碼分析

1. Action 實(shí)現(xiàn)

Volcano 在 pkg/scheduler 中放了調(diào)度器相關(guān)的代碼,里面有一個(gè) actions 目錄。

/pkg/scheduler/actions/factory.go#L29
func init() {
    framework.RegisterAction(reclaim.New())
    framework.RegisterAction(allocate.New())
    framework.RegisterAction(backfill.New())
    framework.RegisterAction(preempt.New())
    framework.RegisterAction(enqueue.New())
    framework.RegisterAction(shuffle.New())
}

可以看到這里注冊(cè)了6個(gè) actions。RegisterAction 方法的實(shí)現(xiàn)也很簡(jiǎn)單,有一個(gè) actionMap 來(lái)保存所有的 actions:

/pkg/scheduler/framework/plugins.go
var actionMap = map[string]Action{}

// RegisterAction register action
func RegisterAction(act Action) {
    pluginMutex.Lock()
    defer pluginMutex.Unlock()

    actionMap[act.Name()] = act
}

2. 調(diào)度器邏輯

Run() 方法負(fù)責(zé)啟動(dòng)一個(gè) Volcano 調(diào)度器:

/cmd/scheduler/main.go#L83
if err := app.Run(s); err != nil {
        fmt.Fprintf(os.Stderr, "%v\n", err)
        os.Exit(1)
    }

sched, err := scheduler.NewScheduler(config, opt)
run := func(ctx context.Context) {
        sched.Run(ctx.Done())
        <-ctx.Done()
    }

NewScheduler() 方法:

/pkg/scheduler/scheduler.go
// NewScheduler returns a Scheduler
func NewScheduler(config *rest.Config, opt *options.ServerOption) (*Scheduler, error) {
    //...
    cache := schedcache.New(config, opt.SchedulerNames, opt.DefaultQueue, opt.NodeSelector, opt.NodeWorkerThreads, opt.IgnoredCSIProvisioners, opt.ResyncPeriod)
    scheduler := &Scheduler{
        schedulerConf:  opt.SchedulerConf,
        fileWatcher:    watcher,
        cache:          cache,
        schedulePeriod: opt.SchedulePeriod,
        dumper:         schedcache.Dumper{Cache: cache, RootDir: opt.CacheDumpFileDir},
    }

    return scheduler, nil
}

Scheduler 對(duì)象:

type Scheduler struct {
    cache          schedcache.Cache
    schedulerConf  string
    fileWatcher    filewatcher.FileWatcher
    schedulePeriod time.Duration
    once           sync.Once

    mutex          sync.Mutex
    actions        []framework.Action
    plugins        []conf.Tier
    configurations []conf.Configuration
    metricsConf    map[string]string
    dumper         schedcache.Dumper
}

Run() 方法:

func (pc *Scheduler) Run(stopCh <-chan struct{}) {
    pc.loadSchedulerConf()
    go pc.watchSchedulerConf(stopCh)
    // Start cache for policy.
    pc.cache.SetMetricsConf(pc.metricsConf)
    pc.cache.Run(stopCh)
    klog.V(2).Infof("Scheduler completes Initialization and start to run")
    go wait.Until(pc.runOnce, pc.schedulePeriod, stopCh)
    if options.ServerOpts.EnableCacheDumper {
        pc.dumper.ListenForSignal(stopCh)
    }
    go runSchedulerSocket()
}

這里就是 Scheduler 的執(zhí)行邏輯了,runOnce 方法被周期性調(diào)用:

func (pc *Scheduler) runOnce() {
    //...
    // Load ConfigMap to check which action is enabled.
    conf.EnabledActionMap = make(map[string]bool)
    for _, action := range actions {
        conf.EnabledActionMap[action.Name()] = true
    }

    ssn := framework.OpenSession(pc.cache, plugins, configurations)
    defer func() {
        framework.CloseSession(ssn)
        metrics.UpdateE2eDuration(metrics.Duration(scheduleStartTime))
    }()

    for _, action := range actions {
        actionStartTime := time.Now()
        action.Execute(ssn)
        metrics.UpdateActionDuration(action.Name(), metrics.Duration(actionStartTime))
    }
}

可以看到在 runOnce 中的2個(gè)關(guān)鍵步驟:

  1. ssn := framework.OpenSession(pc.cache, plugins, configurations)遍歷 actions,
  2. 調(diào)用 action.Execute(ssn)
    這里的 actions 集合是什么呢?OpenSession 拿到的 plugins 又是啥呢?
    進(jìn)一步跟代碼可以找到如下默認(rèn)配置:
var DefaultSchedulerConf = `
actions: "enqueue, allocate, backfill"
tiers:
- plugins:
  - name: priority
  - name: gang
  - name: conformance
- plugins:
  - name: overcommit
  - name: drf
  - name: predicates
  - name: proportion
  - name: nodeorder

所以默認(rèn)配置下,執(zhí)行的 actions 是 enqueue, allocate, backfill 三個(gè)。

3. actions 和 plugins 的調(diào)用邏輯

前面看framework.OpenSession() 函數(shù)打開(kāi)了一個(gè) Session。不過(guò)什么是 Session 呢?來(lái)具體看下 OpenSession() 函數(shù)的實(shí)現(xiàn):

/pkg/scheduler/framework/framework.go
// OpenSession start the session
func OpenSession(cache cache.Cache, tiers []conf.Tier, configurations []conf.Configuration) *Session {
    ssn := openSession(cache)
    ssn.Tiers = tiers
    ssn.Configurations = configurations
    ssn.NodeMap = GenerateNodeMapAndSlice(ssn.Nodes)
    ssn.PodLister = NewPodLister(ssn)

    for _, tier := range tiers {
        for _, plugin := range tier.Plugins {
            if pb, found := GetPluginBuilder(plugin.Name); !found {
                klog.Errorf("Failed to get plugin %s.", plugin.Name)
            } else {
                plugin := pb(plugin.Arguments)
                ssn.plugins[plugin.Name()] = plugin
                onSessionOpenStart := time.Now()
                plugin.OnSessionOpen(ssn)
                metrics.UpdatePluginDuration(plugin.Name(), metrics.OnSessionOpen, metrics.Duration(onSessionOpenStart))
            }
        }
    }
    return ssn
}

在 OpenSession() 函數(shù)中,plugins 被遍歷,然后依次調(diào)用 plugin.OnSessionOpen(ssn) 方法。
一個(gè)個(gè) plugins 注冊(cè)具體的算法函數(shù)到 Session 里,然后 actions 順序執(zhí)行的過(guò)程中,到 Session 里去取相應(yīng)的算法函數(shù)來(lái)執(zhí)行。

Plugin Function Purpose Used By
JobOrderFn Job priority comparison allocate, preempt, reclaim
TaskOrderFn Task priority within jobs allocate, preempt, reclaim
PredicateFn Node feasibility filtering allocate, preempt, reclaim
NodeOrderFn Node scoring and ranking allocate, backfill
JobValidFn Job admission validation enqueue
Preemptable Victim task selection preempt, reclaim
Volcano scheduler工作流

Session作為調(diào)度周期上下文,承載了作業(yè)/隊(duì)列狀態(tài)快照Cache和中間結(jié)果Statement,是決策的基礎(chǔ),保證調(diào)度的事務(wù)性。

4. Action 分析:enqueue

Enqueue: 作業(yè)入隊(duì)。檢查 Queue 狀態(tài)是否 Open、資源配額是否足夠、優(yōu)先級(jí)等,決定哪些 PodGroup(作業(yè)) 有資格進(jìn)入本次調(diào)度周期。這是批處理資源隊(duì)列管理的關(guān)鍵!

enqueue Action 的 Execute() 方法如下:

/pkg/scheduler/actions/enqueue/enqueue.go
func (enqueue *Action) Execute(ssn *framework.Session) {

    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
    queueSet := sets.NewString()
    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    for _, job := range ssn.Jobs {
        //...
        }

    for {
        //...
    }

這里的 queues 是一個(gè) Priority Queue,隊(duì)列的實(shí)現(xiàn)用了 heap 包,實(shí)現(xiàn)了一個(gè)“最大堆”,也就是每次 Pop() 會(huì)拿到一個(gè)優(yōu)先級(jí)最高的 item。執(zhí)行了2個(gè) for 循環(huán)。
先看第一個(gè)for循環(huán):

    for _, job := range ssn.Jobs {
        if job.ScheduleStartTimestamp.IsZero() {
            ssn.Jobs[job.UID].ScheduleStartTimestamp = metav1.Time{
                Time: time.Now(),
            }
        }
        if queue, found := ssn.Queues[job.Queue]; !found {
            klog.Errorf("Failed to find Queue <%s> for Job <%s/%s>",
                job.Queue, job.Namespace, job.Name)
            continue
        } else if !queueSet.Has(string(queue.UID)) {
            klog.V(5).Infof("Added Queue <%s> for Job <%s/%s>",
                queue.Name, job.Namespace, job.Name)

            queueSet.Insert(string(queue.UID))
            queues.Push(queue)
        }

        if job.IsPending() {
            if _, found := jobsMap[job.Queue]; !found {
                jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
            }
            klog.V(5).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
            jobsMap[job.Queue].Push(job)
        }
    }

這個(gè) for 循環(huán)主要做2件事情,一個(gè)是遍歷 jobs 的過(guò)程中判斷用到了哪些 Queue(K8s 自定義資源對(duì)象),將這些 Queue 保存到 queueSet 和 queues 中;另外一個(gè)就是將處于 Pending 狀態(tài)的 jobs 加入到 jobsMap 中。
再看第二個(gè)無(wú)限循環(huán):

    for {
        if queues.Empty() {
            break
        }

        queue := queues.Pop().(*api.QueueInfo)

        // skip the Queue that has no pending job
        jobs, found := jobsMap[queue.UID]
        if !found || jobs.Empty() {
            continue
        }
        job := jobs.Pop().(*api.JobInfo)

        // 符合條件的 PodGroup 被標(biāo)記為“可調(diào)度”,其 Pod 進(jìn)入后續(xù)環(huán)節(jié)。
        if job.PodGroup.Spec.MinResources == nil || ssn.JobEnqueueable(job) {
            ssn.JobEnqueued(job)
            job.PodGroup.Status.Phase = scheduling.PodGroupInqueue
            ssn.Jobs[job.UID] = job
        }

        // Added Queue back until no job in Queue.
        queues.Push(queue)
    }

這個(gè)循環(huán)的邏輯是消化隊(duì)列里的 jobs。首先將全局隊(duì)列按照優(yōu)先級(jí) Pop 一個(gè)高優(yōu)隊(duì)列出來(lái),然后根據(jù)這個(gè)隊(duì)列的 UID 找到本地 jobsMap 里對(duì)應(yīng)的 jobs 隊(duì)列,這又是一個(gè)優(yōu)先級(jí)隊(duì)列。最后從這個(gè)優(yōu)先級(jí)隊(duì)列中 Pop 一個(gè)高優(yōu) Job 出來(lái),篩選有資格參與本次調(diào)度的 PodGroup (作業(yè)),符合條件的 PodGroup 被標(biāo)記為“可調(diào)度”。

總的來(lái)說(shuō),enqueue 過(guò)程就是按照隊(duì)列的優(yōu)先級(jí)順序,將隊(duì)列中的 jobs 再按照優(yōu)先級(jí)依次標(biāo)記為 "Inqueue" 狀態(tài)(job.PodGroup.Status.Phase = "Inqueue")。

5. Action 分析:allocate

Allocate: 資源分配 (核心中的核心)。嘗試為待調(diào)度的 Pod 分配節(jié)點(diǎn)。

allocate.Execute() 方法的實(shí)現(xiàn)如下:

/pkg/scheduler/actions/allocate/allocate.go
func (alloc *Action) Execute(ssn *framework.Session) {
    klog.V(5).Infof("Enter Allocate ...")
    defer klog.V(5).Infof("Leaving Allocate ...")

    alloc.parseArguments(ssn)

    // the allocation for pod may have many stages
    // 1. pick a queue named Q (using ssn.QueueOrderFn)
    // 2. pick a job named J from Q (using ssn.JobOrderFn)
    // 3. pick a task T from J (using ssn.TaskOrderFn)
    // 4. use predicateFn to filter out node that T can not be allocated on.
    // 5. use ssn.NodeOrderFn to judge the best node and assign it to T

    // queues sort queues by QueueOrderFn.
    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
    // jobsMap is used to find job with the highest priority in given queue.
    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    alloc.session = ssn
    alloc.pickUpQueuesAndJobs(queues, jobsMap)
    klog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
    alloc.allocateResources(queues, jobsMap)
}

調(diào)用了plugin注冊(cè)到session的函數(shù):QueueOrderFn等。
validJobFn, JobOrderFn封裝在pickUpQueuesAndJobs方法。
TaskOrderFn,PredicateFn,NodeOrderFn封裝在allocateResources方法。
代碼經(jīng)過(guò)多次重構(gòu)有點(diǎn)晦澀,主流程其實(shí)挺簡(jiǎn)單,這個(gè)過(guò)程包括作業(yè)的predicate和prioritize。使用predicateFn預(yù)選,過(guò)濾掉不能分配作業(yè)的node;使用NodeOrderFn打分來(lái)找到最適合的分配節(jié)點(diǎn):


allocate流程

pickUpQueuesAndJobs的邏輯是一個(gè)for循環(huán),遍歷 jobs,將其按照 queue 不同存到 jobsMap 中。

func (alloc *Action) pickUpQueuesAndJobs(queues *util.PriorityQueue, jobsMap map[api.QueueID]*util.PriorityQueue) {
    ssn := alloc.session
    for _, job := range ssn.Jobs {
        // If not config enqueue action, change Pending pg into Inqueue state to avoid blocking job scheduling.
        //...
        if vr := ssn.JobValid(job); vr != nil && !vr.Pass {
            klog.V(4).Infof("Job <%s/%s> Queue <%s> skip allocate, reason: %v, message %v", job.Namespace, job.Name, job.Queue, vr.Reason, vr.Message)
            continue
        }
        //...
        if _, found := jobsMap[job.Queue]; !found {
            jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
            queues.Push(ssn.Queues[job.Queue])
        }
        jobsMap[job.Queue].Push(job)
    }
}

allocateResources的邏輯主要是按照優(yōu)先級(jí)依次給 tasks 尋找最合適的 node,找到后“預(yù)占”資源,于是按順序逐步給所有的 tasks 都找到了最佳節(jié)點(diǎn)。

// prioritizeNodes selects the highest score node.
// 尋找最優(yōu)節(jié)點(diǎn)
bestNode, highestScore := alloc.prioritizeNodes(ssn, task, predicateNodes)
// Allocate idle resource to the task.
// 將前面找到的最佳節(jié)點(diǎn)相應(yīng)資源分配給當(dāng)前 task
if err := alloc.allocateResourcesForTask(stmt, task, bestNode, job); err == nil {
    jobNewAllocatedHyperNode = getJobNewAllocatedHyperNode(ssn, bestNode.Name, job, jobNewAllocatedHyperNode)
}

分配最佳節(jié)點(diǎn)后,調(diào)用Commit方法真正綁定到 K8s API Server。調(diào)用 K8s Binding API (kubeClient.CoreV1().Pods().Bind()) 設(shè)置 Pod 的 .spec.nodeName。并且更新 PodGroup 狀態(tài)。

stmt = alloc.allocateResourcesForTasks(tasks, job, queue, allNodes, "")
if stmt != nil {
    stmt.Commit()
}

這里Action用Statement模式記錄原子操作,保證調(diào)度決策的一致性。


Statement模式實(shí)現(xiàn)的調(diào)度事務(wù)

6. Gang scheduling

Gang調(diào)度策略是volcano-scheduler的核心調(diào)度算法之一,它滿足了調(diào)度過(guò)程中的“All or nothing”的調(diào)度需求,避免Pod的任意調(diào)度導(dǎo)致集群資源的浪費(fèi)。具體算法是,觀察Job下的Pod已調(diào)度數(shù)量是否滿足了最小運(yùn)行數(shù)量,當(dāng)Job的最小運(yùn)行數(shù)量得到滿足時(shí),為Job下的所有Pod執(zhí)行調(diào)度動(dòng)作,否則,不執(zhí)行。

Gang scheduling flow

Gang調(diào)度策略是通過(guò)Gang plugin集成validation和ordering函數(shù)實(shí)現(xiàn)。session初始化的時(shí)候Gang plugin注冊(cè)了如下幾個(gè)關(guān)鍵函數(shù)。這些函數(shù)在上面Action:allocate中被調(diào)用。

Function Purpose
JobOrderFn Orders jobs for gang scheduling priority
JobValidFn Validates if job meets gang requirements
JobReadyFn Determines if job is ready to be scheduled
PreemptableFn Protects gang members from preemption

validation函數(shù):

pkg/scheduler/plugins/gang/gang.go
validJobFn := func(obj interface{}) *api.ValidateResult {
        job, ok := obj.(*api.JobInfo)
        if !ok {
            return &api.ValidateResult{
                Pass:    false,
                Message: fmt.Sprintf("Failed to convert <%v> to *JobInfo", obj),
            }
        }

        if valid := job.CheckTaskValid(); !valid {
            return &api.ValidateResult{
                Pass:    false,
                Reason:  v1beta1.NotEnoughPodsOfTaskReason,
                Message: "Not enough valid pods of each task for gang-scheduling",
            }
        }

        vtn := job.ValidTaskNum()
        if vtn < job.MinAvailable {
            return &api.ValidateResult{
                Pass:   false,
                Reason: v1beta1.NotEnoughPodsReason,
                Message: fmt.Sprintf("Not enough valid tasks for gang-scheduling, valid: %d, min: %d",
                    vtn, job.MinAvailable),
            }
        }
        return nil
    }

ordering函數(shù):

jobOrderFn := func(l, r interface{}) int {
        lv := l.(*api.JobInfo)
        rv := r.(*api.JobInfo)

        lReady := lv.IsReady()
        rReady := rv.IsReady()

        klog.V(4).Infof("Gang JobOrderFn: <%v/%v> is ready: %t, <%v/%v> is ready: %t",
            lv.Namespace, lv.Name, lReady, rv.Namespace, rv.Name, rReady)

        if lReady && rReady {
            return 0
        }

        if lReady {
            return 1
        }

        if rReady {
            return -1
        }

        return 0
    }
最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書(shū)系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

友情鏈接更多精彩內(nèi)容