http://www.itdecent.cn/p/7286d895dcc0
通过上面链接中的文章,我们已经知道了kube-batch的启动过程。kube-batch总共有4个action(过程),这里我们从Allocate开始。
目录:
一:流程解释
二:代码说明
一:流程解释
在allocate.go中找到Execute函数。首先用文字解释一下整个过程:
流程:
(1)将kube-batch的job放入对应的队列。这是一个具有优先级的队列。
(2)依次遍历这些队列,如果队列为空就跳过。
(3)如果不为空,就从队列中pop出一个job,即接下来要调度的job。
(4)取出这个job对应的所有Task(即要绑定的Pod),对每个task进行假绑定。这里的假绑定只是更新task的状态,先记录Pod要绑定到哪个节点上;当达到JobReady时,才进行真正的绑定。这样就实现了一次性绑定多个Pod。
(5)更新job的信息,将job重新加入队列,跳出循环,再次进行调度。
上面的流程中有两个地方需要再解释一下:
(1)jobReady的作用是什么?gang scheduler和它有什么关系?
allocate每次都只是对task进行假绑定。jobReady是一个信号,表示现在可以进行真正的绑定了。
在gang.go的第75行实现了这个接口:
// jobReady is the gang plugin's job-ready check. obj must hold an
// *api.JobInfo (the single-value type assertion panics otherwise). The job
// counts as ready once readyTaskNum reports at least job.MinAvailable tasks.
func jobReady(obj interface{}) bool {
	info := obj.(*api.JobInfo)
	return readyTaskNum(info) >= info.MinAvailable
}
可以看出来,gang scheduler就是通过数量上的判断来进行限制的。job.MinAvailable就是PodGroup中的minMember数量。这样每次调度时,只有当至少MinAvailable个task准备好之后,才会进行真正的绑定,从而达到gang scheduler的效果。
(2)为什么job还要重新加入队列?这个job不是已经调度过了吗?
因为job的Task数量有可能多于job.MinAvailable。例如,一个job有8个task,但它指定的PodGroup的minMember=4。这样调度时会首先调度4个task;当这4个task真正绑定之后,剩余没绑定的4个task还需要继续调度(可以看作一个新的job),所以需要把job重新加入队列。
二:代码说明
func (alloc *allocateAction) Execute(ssn *framework.Session) {
glog.V(3).Infof("Enter Allocate ...")
defer glog.V(3).Infof("Leaving Allocate ...")
// 這是優(yōu)先級(jí)隊(duì)列,即隊(duì)列里面的內(nèi)容是有優(yōu)先級(jí)的
queues := util.NewPriorityQueue(ssn.QueueOrderFn)
jobsMap := map[api.QueueID]*util.PriorityQueue{}
//首先將所有的kube-batch job放入
for _, job := range ssn.Jobs {
if queue, found := ssn.Queues[job.Queue]; found {
queues.Push(queue)
} else {
glog.Warningf("Skip adding Job <%s/%s> because its queue %s is not found",
job.Namespace, job.Name, job.Queue)
continue
}
if _, found := jobsMap[job.Queue]; !found {
jobsMap[job.Queue] = util.NewPriorityQueue(ssn.JobOrderFn)
}
glog.V(4).Infof("Added Job <%s/%s> into Queue <%s>", job.Namespace, job.Name, job.Queue)
jobsMap[job.Queue].Push(job)
}
glog.V(3).Infof("Try to allocate resource to %d Queues", len(jobsMap))
pendingTasks := map[api.JobID]*util.PriorityQueue{}
for {
if queues.Empty() {
break
}
// 從第一個(gè)隊(duì)列開始尋找是否有job需要調(diào)度
queue := queues.Pop().(*api.QueueInfo)
if ssn.Overused(queue) {
glog.V(3).Infof("Queue <%s> is overused, ignore it.", queue.Name)
continue
}
jobs, found := jobsMap[queue.UID]
glog.V(3).Infof("Try to allocate resource to Jobs in Queue <%v>", queue.Name)
if !found || jobs.Empty() {
glog.V(4).Infof("Can not find jobs for queue %s.", queue.Name)
continue
}
// 從隊(duì)列中依次彈出job進(jìn)行調(diào)度
job := jobs.Pop().(*api.JobInfo)
if _, found := pendingTasks[job.UID]; !found {
tasks := util.NewPriorityQueue(ssn.TaskOrderFn)
for _, task := range job.TaskStatusIndex[api.Pending] {
// Skip BestEffort task in 'allocate' action.
if task.Resreq.IsEmpty() {
glog.V(4).Infof("Task <%v/%v> is BestEffort task, skip it.",
task.Namespace, task.Name)
continue
}
tasks.Push(task)
}
pendingTasks[job.UID] = tasks
}
tasks := pendingTasks[job.UID]
glog.V(3).Infof("Try to allocate resource to %d tasks of Job <%v/%v>",
tasks.Len(), job.Namespace, job.Name)
// 具體調(diào)度Task的循環(huán),每次都假綁定一個(gè)Task,表示這個(gè)task已經(jīng)完成
for !tasks.Empty() {
predicateNodes := []*api.NodeInfo{}
nodeScores := map[int][]*api.NodeInfo{}
task := tasks.Pop().(*api.TaskInfo)
assigned := false
glog.V(3).Infof("There are <%d> nodes for Job <%v/%v>",
len(ssn.Nodes), job.Namespace, job.Name)
//any task that doesn't fit will be the last processed
//within this loop context so any existing contents of
//NodesFitDelta are for tasks that eventually did fit on a
//node
// 后面的很長(zhǎng)一般分,就是為task選擇一個(gè)合適的node。
//主要內(nèi)容是先過(guò)濾,然后選擇一個(gè)滿足task的最優(yōu)節(jié)點(diǎn),然后更新job中該task的信息
if len(job.NodesFitDelta) > 0 {
job.NodesFitDelta = make(api.NodeResourceMap)
}
for _, node := range ssn.Nodes {
glog.V(3).Infof("Considering Task <%v/%v> on node <%v>: <%v> vs. <%v>",
task.Namespace, task.Name, node.Name, task.Resreq, node.Idle)
// TODO (k82cn): Enable eCache for performance improvement.
if err := ssn.PredicateFn(task, node); err != nil {
glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s>: %v",
task.Namespace, task.Name, node.Name, err)
continue
} else {
predicateNodes = append(predicateNodes, node)
}
}
for _, node := range predicateNodes {
score, err := ssn.NodeOrderFn(task, node)
if err != nil {
glog.V(3).Infof("Error in Calculating Priority for the node:%v", err)
} else {
nodeScores[score] = append(nodeScores[score], node)
}
}
selectedNodes := util.SelectBestNode(nodeScores)
for _, node := range selectedNodes {
// Allocate idle resource to the task.
if task.InitResreq.LessEqual(node.Idle) {
glog.V(3).Infof("Binding Task <%v/%v> to node <%v>",
task.Namespace, task.Name, node.Name)
// ?。?!這里需要重點(diǎn)注意,這里調(diào)用了session.go中Allocate函數(shù)。 下面會(huì)將這個(gè)的作用
if err := ssn.Allocate(task, node.Name); err != nil {
glog.Errorf("Failed to bind Task %v on %v in Session %v, err: %v",
task.UID, node.Name, ssn.UID, err)
continue
}
assigned = true
break
} else {
//store information about missing resources
job.NodesFitDelta[node.Name] = node.Idle.Clone()
job.NodesFitDelta[node.Name].FitDelta(task.Resreq)
glog.V(3).Infof("Predicates failed for task <%s/%s> on node <%s> with limited resources",
task.Namespace, task.Name, node.Name)
}
// Allocate releasing resource to the task if any.
if task.InitResreq.LessEqual(node.Releasing) {
glog.V(3).Infof("Pipelining Task <%v/%v> to node <%v> for <%v> on <%v>",
task.Namespace, task.Name, node.Name, task.InitResreq, node.Releasing)
if err := ssn.Pipeline(task, node.Name); err != nil {
glog.Errorf("Failed to pipeline Task %v on %v in Session %v",
task.UID, node.Name, ssn.UID)
continue
}
assigned = true
break
}
}
//如果綁定某個(gè)task過(guò)程中失敗,比如資源不足。那么就會(huì)跳出這個(gè)循環(huán)。
if !assigned {
break
}
// 將job重新加入隊(duì)列,然后進(jìn)行下一個(gè)job的調(diào)度。
if ssn.JobReady(job) {
jobs.Push(job)
break
}
}
// Added Queue back until no job in Queue.
queues.Push(queue)
}
session.go中的Allocate函数:
// Allocate records in the session that task is placed on the node named
// hostname. When ssn.JobReady then reports the task's job ready, it calls
// ssn.dispatch for every task in the job's Allocated index.
// NOTE(review): per the original author, dispatch performs the real pod
// binding through the Kubernetes API; its body is not shown here.
//
// Steps:
//   - allocate the task's volumes on hostname through the cache;
//   - mark the task Allocated in its job;
//   - set task.NodeName and add the task to the session's node;
//   - if the job is ready, dispatch its Allocated tasks.
//
// The job and node are looked up before anything is mutated. A missing job
// or node therefore returns an error without allocating volumes or leaving
// the task marked Allocated with no node holding it. Errors from
// AllocateVolumes, UpdateTaskStatus, AddTask and dispatch are returned as-is.
// Dispatch stops at the first failure, and the remaining tasks in that loop
// are not dispatched.
func (ssn *Session) Allocate(task *api.TaskInfo, hostname string) error {
	// Validate both lookups up front; they are plain map reads.
	job, found := ssn.Jobs[task.Job]
	if !found {
		glog.Errorf("Failed to found Job <%s> in Session <%s> index when binding.",
			task.Job, ssn.UID)
		return fmt.Errorf("failed to find job %s", task.Job)
	}
	node, found := ssn.Nodes[hostname]
	if !found {
		glog.Errorf("Failed to found Node <%s> in Session <%s> index when binding.",
			hostname, ssn.UID)
		return fmt.Errorf("failed to find node %s", hostname)
	}

	if err := ssn.cache.AllocateVolumes(task, hostname); err != nil {
		return err
	}

	// Only update status in session
	if err := job.UpdateTaskStatus(task, api.Allocated); err != nil {
		glog.Errorf("Failed to update task <%v/%v> status to %v in Session <%v>: %v",
			task.Namespace, task.Name, api.Allocated, ssn.UID, err)
		return err
	}

	task.NodeName = hostname
	if err := node.AddTask(task); err != nil {
		glog.Errorf("Failed to add task <%v/%v> to node <%v> in Session <%v>: %v",
			task.Namespace, task.Name, hostname, ssn.UID, err)
		return err
	}
	glog.V(3).Infof("After allocated Task <%v/%v> to Node <%v>: idle <%v>, used <%v>, releasing <%v>",
		task.Namespace, task.Name, node.Name, node.Idle, node.Used, node.Releasing)

	// Only once the job is ready (gang plugin: enough tasks counted) are its
	// Allocated tasks dispatched. Until then, the placements exist only in
	// this session.
	if ssn.JobReady(job) {
		for _, allocated := range job.TaskStatusIndex[api.Allocated] {
			if err := ssn.dispatch(allocated); err != nil {
				glog.Errorf("Failed to dispatch task <%v/%v>: %v",
					allocated.Namespace, allocated.Name, err)
				return err
			}
		}
	}
	return nil
}
总结:
感觉自己的文字表达能力还是不行,还需要更多的锻炼。
结合代码的注释和上面的流程说明一起看会更容易理解。
在session.go中可以看到,每次为task分配资源时,首先都只是更新状态;只有达到jobReady时,才会真正地绑定到具体的某个节点上。
当然,如果当前要调度的job1所需的资源不足,那么job1就会跳出循环,去找下一个要调度的job。不用担心job1中已经假绑定的task所占的资源:这些只是session中的状态更新(见代码注释“Only update status in session”),backfill操作会将job1清空。