kube-batch 從代碼中找出gang scheduler這個(gè)過(guò)程

http://www.itdecent.cn/p/7286d895dcc0

從上面這個(gè)過(guò)程中,已經(jīng)知道了kube-batch的啟動(dòng)過(guò)程。kube-batch總共有4個(gè)過(guò)程。這里我們從Allocate開始。

目錄:
一: 流程解釋
二:代碼說(shuō)明

一: 流程解釋

在allocate.go中:找到Execute函數(shù)。首先用文字解釋一下整個(gè)的過(guò)程:
流程:
(1)將kube-batch的job放入對(duì)應(yīng)的隊(duì)列。這是一個(gè)具有優(yōu)先級(jí)的隊(duì)列。
(2)依次遍歷這些隊(duì)列,如果為空就跳過(guò)
(3)如果不為空,依次從隊(duì)列中pop出一個(gè)job.即接下來(lái)要調(diào)度這個(gè)job
(4)取出這個(gè)job對(duì)應(yīng)的所有Tasks(即要綁定的pod),對(duì)每個(gè)task進(jìn)行假綁定,這里的假綁定意思是 只是更新task的狀態(tài),先記錄pod綁定在哪個(gè)節(jié)點(diǎn)上。當(dāng)達(dá)到JobReady時(shí),進(jìn)行真正的綁定。這樣就實(shí)現(xiàn)了一次性綁定了好幾個(gè)Pod.
(5)更新job的信息,將pod重新加入隊(duì)列。跳出循環(huán),再次進(jìn)行調(diào)度。

對(duì)上面流程有兩個(gè)地方需要再解釋一下:
(1) jobReady 作用是什么,gang scheduler和這個(gè)有什么關(guān)系?
allocate每次都是對(duì)task進(jìn)行假綁定。jobReady是一個(gè)信號(hào)。表示現(xiàn)在可以進(jìn)行真正的綁定了。

在gang.go的75行,實(shí)現(xiàn)了這個(gè)接口:

func jobReady(obj interface{}) bool {
    job := obj.(*api.JobInfo)

    occupied := readyTaskNum(job)

    return occupied >= job.MinAvailable
}

可以看出來(lái),gang scheduler中 就是通過(guò)數(shù)量上的判斷來(lái)進(jìn)行限制的。job.MinAvailable這個(gè)是podgroup的minNumber數(shù)量。這樣就使得每次調(diào)度的時(shí)候,只有當(dāng)MinAvailable個(gè)task準(zhǔn)備好了之后。才會(huì)進(jìn)行調(diào)度,從而達(dá)到gang scheduler的效果。

(2)為什么job還要重新加入隊(duì)列。這個(gè)job不是已經(jīng)調(diào)度了嗎?
因?yàn)橛锌赡躩ob的Tasks數(shù)量會(huì)多于 job.MinAvailable。例如,一個(gè)job有8個(gè)task,但是它指定的podgroup的 minNumber=4。這樣調(diào)度時(shí)會(huì)首先調(diào)度4個(gè)task.當(dāng)真正綁定之后。剩余沒綁定的4個(gè)task是一個(gè)新的job.所有需要重新加入隊(duì)列。

二:代碼說(shuō)明

func (alloc *allocateAction) Execute(ssn *framework.Session) {
    glog.V(3).Infof("Enter Allocate ...")
    defer glog.V(3).Infof("Leaving Allocate ...")

    // 這是優(yōu)先級(jí)隊(duì)列,即隊(duì)列里面的內(nèi)容是有優(yōu)先級(jí)的
    queues := util.NewPriorityQueue(ssn.QueueOrderFn)
    jobsMap := map[api.QueueID]*util.PriorityQueue{}

    //首先將所有的kube-batch job放入
    for _, job := range ssn.Jobs {
        if queue, found := ssn.Queues[job.Queue]; found {
            queues.Push(queue)
        } else {
            glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
                job.Namespace, job.Name, job.Queue)
            continue
        }

        if _, found := jobsMap[job.Queue]; !found {
            jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
        }

        glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
        jobsMap[job.Queue].Push(job)
    }

    glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))

    pendingTasks := map[api.JobID]*util.PriorityQueue{}

    for {
        if queues.Empty() {
            break
        }
        // 從第一個(gè)隊(duì)列開始尋找是否有job需要調(diào)度
        queue := queues.Pop().(*api.QueueInfo)
        if ssn.Overused(queue) {
            glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
            continue
        }

        jobs, found := jobsMap[queue.UID]

        glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)

        if !found || jobs.Empty() {
            glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
            continue
        }
        
        // 從隊(duì)列中依次彈出job進(jìn)行調(diào)度
        job := jobs.Pop().(*api.JobInfo)
        if _, found := pendingTasks[job.UID]; !found {
            tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
            for _, task := range job.TaskStatusIndex[api.Pending] {
                // Skip BestEffort task in 'allocate' action.
                if task.Resreq.IsEmpty() {
                    glog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
                        task.Namespace, task.Name)
                    continue
                }

                tasks.Push(task)
            }
            pendingTasks[job.UID] = tasks
        }
        tasks := pendingTasks[job.UID]

        glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
            tasks.Len(), job.Namespace, job.Name)
        
        // 具體調(diào)度Task的循環(huán),每次都假綁定一個(gè)Task,表示這個(gè)task已經(jīng)完成
        for !tasks.Empty() {
            predicateNodes := []*api.NodeInfo{}
            nodeScores := map[int][]*api.NodeInfo{}

            task := tasks.Pop().(*api.TaskInfo)
            assigned := false

            glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
                len(ssn.Nodes), job.Namespace, job.Name)

            //any task that doesn't fit will be the last processed
            //within this loop context so any existing contents of
            //NodesFitDelta are for tasks that eventually did fit on a
            //node
            
            // 后面的很長(zhǎng)一般分,就是為task選擇一個(gè)合適的node。
            //主要內(nèi)容是先過(guò)濾,然后選擇一個(gè)滿足task的最優(yōu)節(jié)點(diǎn),然后更新job中該task的信息
            if len(job.NodesFitDelta) > 0 {
                job.NodesFitDelta = make(api.NodeResourceMap)
            }
            for _, node := range ssn.Nodes {
                glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
                    task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)

                // TODO (k82cn): Enable eCache for performance improvement.
                if err := ssn.PredicateFn(task, node); err != nil {
                    glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
                        task.Namespace, task.Name, node.Name, err)
                    continue
                } else {
                    predicateNodes = append(predicateNodes, node)
                }
            }
            for _, node := range predicateNodes {
                score, err := ssn.NodeOrderFn(task, node)
                if err != nil {
                    glog.V(3).Infof("Error in Calculating Priority for the node:%v", err)
                } else {
                    nodeScores[score] = append(nodeScores[score], node)
                }
            }
            selectedNodes := util.SelectBestNode(nodeScores)
            for _, node := range selectedNodes {
                // Allocate idle resource to the task.
                if task.InitResreq.LessEqual(node.Idle) {
                    glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
                        task.Namespace, task.Name, node.Name)
                    // ?。?!這里需要重點(diǎn)注意,這里調(diào)用了session.go中Allocate函數(shù)。 下面會(huì)將這個(gè)的作用
                    if err := ssn.Allocate(task, node.Name); err != nil {         
                        glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
                            task.UID, node.Name, ssn.UID, err)
                        continue
                    }
                    assigned = true
                    break
                } else {
                    //store information about missing resources
                    job.NodesFitDelta[node.Name] = node.Idle.Clone()
                    job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
                    glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
                        task.Namespace, task.Name, node.Name)
                }

                // Allocate releasing resource to the task if any.
                if task.InitResreq.LessEqual(node.Releasing) {
                    glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
                        task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
                    if err := ssn.Pipeline(task, node.Name); err != nil {
                        glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
                            task.UID, node.Name, ssn.UID)
                        continue
                    }

                    assigned = true
                    break
                }
            }
            
            
            //如果綁定某個(gè)task過(guò)程中失敗,比如資源不足。那么就會(huì)跳出這個(gè)循環(huán)。
            if !assigned {
                break
            }
            // 將job重新加入隊(duì)列,然后進(jìn)行下一個(gè)job的調(diào)度。
            if ssn.JobReady(job) {
                jobs.Push(job)
                break
            }
        }
        // Added Queue back until no job in Queue.  
        queues.Push(queue)
    }

session.go中Allocate函數(shù)

func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {
    if err := ssn.cache.AllocateVolumes(task, hostname); err != nil {
        return err
    }

    // 這里這是更新task的狀態(tài)。
    // Only update status in session
    job, found := ssn.Jobs[task.Job]
    if found {
        if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
            glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
                task.Namespace, task.Name, api.Allocated, ssn.UID, err)
            return err
        }
    } else {
        glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.",
            task.Job, ssn.UID)
        return fmt.Errorf("failed to find job %s", task.Job)
    }

    task.NodeName = hostname

    if node, found := ssn.Nodes[hostname]; found {
        if err := node.AddTask(task); err != nil {
            glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
                task.Namespace, task.Name, hostname, ssn.UID, err)
            return err
        }
        glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
            task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)
    } else {
        glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.",
            hostname, ssn.UID)
        return fmt.Errorf("failed to find node %s", hostname)
    }

    //gang.go中有,這里是真正的綁定了,當(dāng)jobReady時(shí),調(diào)用dispatch函數(shù)對(duì)所有的Allocated的task進(jìn)行綁定。
    // dispatch就在該函數(shù)的下面。內(nèi)容也很直觀,就是調(diào)用k8s的接口,真正的綁定pod
    if ssn.JobReady(job) {
        for _, task := range job.TaskStatusIndex[api.Allocated] {
            if err := ssn.dispatch(task); err != nil {                    // 如果job準(zhǔn)備好了,就直接真正綁定所有準(zhǔn)備好的任務(wù)??
                glog.Errorf("Failed to dispatch task <%v/%v>: %v",
                    task.Namespace, task.Name, err)
                return err
            }
        }
    }

    return nil
}

總結(jié):
感覺自己的文字表達(dá)能力還是不行,還需要更多的鍛煉。
結(jié)合代碼的注釋和上面的流程說(shuō)明一起看會(huì)更容易理解。

在session.go中可以看到,每次為task分配資源時(shí),首先都是更新狀態(tài),只有達(dá)到j(luò)obReady時(shí),才真正的綁定到具體的某個(gè)結(jié)點(diǎn)上。

當(dāng)然如果當(dāng)前要調(diào)度的job1,它需要的資源不足,那么當(dāng)前這個(gè)job1就會(huì)跳出循環(huán),找下一個(gè)要進(jìn)行調(diào)度的job。不用擔(dān)心,job1中已經(jīng)綁定的task所占的資源。backfill操作會(huì)將Job1清空。

最后編輯于
?著作權(quán)歸作者所有,轉(zhuǎn)載或內(nèi)容合作請(qǐng)聯(lián)系作者
【社區(qū)內(nèi)容提示】社區(qū)部分內(nèi)容疑似由AI輔助生成,瀏覽時(shí)請(qǐng)結(jié)合常識(shí)與多方信息審慎甄別。
平臺(tái)聲明:文章內(nèi)容(如有圖片或視頻亦包括在內(nèi))由作者上傳并發(fā)布,文章內(nèi)容僅代表作者本人觀點(diǎn),簡(jiǎn)書系信息發(fā)布平臺(tái),僅提供信息存儲(chǔ)服務(wù)。

相關(guān)閱讀更多精彩內(nèi)容

  • gang scheduler介紹:一個(gè)job,可能有多個(gè)tasks,這些tasks要不全部執(zhí)行,要不一個(gè)都不執(zhí)行。...
    zoux閱讀 3,634評(píng)論 0 2
  • 背景 K8s本身的調(diào)度器具有一些缺陷:(1) 默認(rèn)的調(diào)度器是以 pod 為粒度的,對(duì)機(jī)器學(xué)習(xí)任務(wù)很不利。(2)默認(rèn)...
    zoux閱讀 5,021評(píng)論 3 0
  • 深入分析kube-batch(3)——Plugins 在之前啟動(dòng)過(guò)程一文中分析過(guò),kube-batch會(huì)先執(zhí)行pl...
    陳先生_9e91閱讀 1,840評(píng)論 0 2
  • kubernetes 簡(jiǎn)介 一個(gè)迅速過(guò)一遍kubernetes 非常不錯(cuò)的資源:基于Kubernetes構(gòu)建Doc...
    bradyjoestar閱讀 15,355評(píng)論 2 7
  • 深入分析kube-batch(2)——cache 熟悉K8S的同學(xué),一定對(duì)cache機(jī)制不陌生;在之前啟動(dòng)過(guò)程一文...
    陳先生_9e91閱讀 1,662評(píng)論 0 2

友情鏈接更多精彩內(nèi)容