從源碼看k8s gang scheduling

背景

k8s 最近的版本加入 workload aware scheduling 功能,可以支持gang調度
這裡記錄一下gang調度的實現

簡單總結

通過gang plugin中的preenqueue plugin攔截配置了SchedulingGroup且podgroup policy為gang但是mincount不滿足的pod進入調度隊列
通過gang plugin中的permit plugin攔截配置了SchedulingGroup且podgroup policy為gang但是mincount不滿足的pod進行bind;若等待超時被reject,則執行unreserve plugin等後續操作

源碼

pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go中

const (
    permit超時時間
    ...
    permitTimeoutDuration = 5 * time.Minute
)



入調度隊列前plugin
func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Status {
    沒設置SchedulingGroup則不需要gang調度
    if pod.Spec.SchedulingGroup == nil {
        return nil
    }
    獲取podGroup信息
    namespace := pod.Namespace
    schedulingGroup := pod.Spec.SchedulingGroup

    podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
    ...
    判斷策略是否是gang調度
    policy := podGroup.Spec.SchedulingPolicy
    if policy.Gang == nil {
        return nil
    }
    ...
    獲取podGroup狀態
    podGroupState, err := pl.podGroupManager.PodGroupStates().Get(namespace, *schedulingGroup.PodGroupName)
    ...
    判斷podGroup狀態中的所有pod數量是否滿足minCount
    allPodsCount := podGroupState.AllPodsCount()
    if allPodsCount < int(policy.Gang.MinCount) {
        return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "waiting for minCount pods from a gang to appear in scheduling queue")
    }

    return nil
}


pod bind前plugin
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
    沒設置SchedulingGroup則不需要gang調度
    if pod.Spec.SchedulingGroup == nil {
        return nil, 0
    }
    ...
    獲取podGroup信息
    namespace := pod.Namespace
    schedulingGroup := pod.Spec.SchedulingGroup

    podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
    ...
    判斷策略是否是gang調度
    policy := podGroup.Spec.SchedulingPolicy
    if policy.Gang == nil {
        return nil, 0
    }
    ...
    獲取podGroup狀態
    podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName)
    if err != nil {
        return fwk.AsStatus(err), 0
    }
    ...
    判斷podGroup狀態中已調度的pod數量是否滿足minCount
    scheduledPodsCount := podGroupState.ScheduledPodsCount()
    if scheduledPodsCount < int(policy.Gang.MinCount) {
        return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), permitTimeoutDuration
    }

    允許這個pod,並且通知同一podgroup中其他等待中的pod繼續處理
    assumedPods := podGroupState.AssumedPods()

    for podUID := range assumedPods {
        waitingPod := pl.handle.GetWaitingPod(podUID)
        if waitingPod != nil {
            waitingPod.Allow(Name)
        }
    }

}

pkg/scheduler/framework/runtime/waiting_pods_map.go中

創(chuàng)建waitingPod對象
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
    wp := &waitingPod{
        pod: pod,
        s: make(chan *fwk.Status, 1),
    }
    ...
    設置定時器,如果plugin超時,則reject
    for k, v := range pluginsMaxWaitTime {
        plugin, waitTime := k, v
        wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
            msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
                waitTime, plugin)
            wp.Reject(plugin, msg)
        })
    }

    return wp
}


permit結果為允許
func (w *waitingPod) Allow(pluginName string) {
    ...
    通知permit結果
    select {
    case w.s <- fwk.NewStatus(fwk.Success, ""):
    default:
    }
    ...
}

結果為拒絕
func (w *waitingPod) Reject(pluginName, msg string) bool {
    return w.stopWithStatus(fwk.Unschedulable, pluginName, msg)
}


攜帶狀態停止
func (w *waitingPod) stopWithStatus(status fwk.Code, pluginName, msg string) bool {
    ...
    select {
    case w.s <- fwk.NewStatus(status, msg).WithPlugin(pluginName):
    default:
    }
    ...
}

pkg/scheduler/schedule_one.go中

運行bind循環
func (sched *Scheduler) runBindingCycle(
    ctx context.Context,
    state fwk.CycleState,
    schedFramework framework.Framework,
    scheduleResult ScheduleResult,
    assumedPodInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate) {
    運行bind循環
    status := sched.bindingCycle(bindingCycleCtx, state, schedFramework, scheduleResult, assumedPodInfo, start, podsToActivate)
    如果未成功,則處理bind循環錯誤
    if !status.IsSuccess() {
        sched.handleBindingCycleError(bindingCycleCtx, state, schedFramework, assumedPodInfo, start, scheduleResult, status)
        return
    }
}

bind循環
func (sched *Scheduler) bindingCycle(
    ctx context.Context,
    state fwk.CycleState,
    schedFramework framework.Framework,
    scheduleResult ScheduleResult,
    assumedPodInfo *framework.QueuedPodInfo,
    start time.Time,
    podsToActivate *framework.PodsToActivate) *fwk.Status {
    ...
    等待permit
    if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
        ...
        return status
    }
    ...
}


func (sched *Scheduler) handleBindingCycleError(
    ctx context.Context,
    state fwk.CycleState,
    fwk framework.Framework,
    podInfo *framework.QueuedPodInfo,
    start time.Time,
    scheduleResult ScheduleResult,
    status *fwk.Status) {
    ...
    執行unreserve plugin
    if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
        utilruntime.HandleErrorWithContext(ctx, forgetErr, "ForgetPod failed")
    } else {
        如果是rejected
        if status.IsRejected() {
            當成pod delete事件,將除assume pod以外的pod移到active queue或者backoff queue
            defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
                return assumedPod.UID != pod.UID
            })
        } else {
            當成pod delete事件,將所有pod移到active queue或者backoff queue
            sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, nil)
        }
    }
    ...
}
等待permit
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
    ...
    獲取等待的pod
    waitingPod := f.waitingPods.get(pod.UID)
    ...
    獲取permit狀態
    s := <-waitingPod.s
    ...
}
最后編輯于
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容