從源碼理解k8s搶占調度

背景

簡單記錄搶占調度的一些代碼,方便后續理解

簡單總結

調度失敗,執行post filter plugin
檢查是否滿足搶占條件

pod的PreemptionPolicy設置為Never則不可搶占,其他為可以
如果pod設置了NominatedNodeName:該節點狀態為UnschedulableAndUnresolvable時可以搶占;否則,只要該節點上有比當前pod優先級低的pod正由于搶占在刪除中,就不可搶占(等待其刪除完成)

獲取所有節點
過濾候選節點(如節點上有低于當前pod優先級的pod,如目標節點上執行Filter plugin成功)
調用擴展過濾候選節點(內置的為http extender)

根據如下規則進行打分

最小pdb沖突節點
最小最高pod優先級節點
最小總優先級節點
最小pod數量節點
目標節點pod的最早啟動時間(如果pod優先級更高則使用它的啟動時間,一樣則使用更早的pod啟動時間),越大分數越高

執行搶占

刪除被搶占的pod
更新pod的NominatedNodeName為當前節點

源碼

pkg/scheduler/schedule_one.go中

調度算法
func (sched *Scheduler) schedulingAlgorithm(
    ctx context.Context,
    state fwk.CycleState,
    schedFramework framework.Framework,
    podInfo *framework.QueuedPodInfo,
    start time.Time,
) (ScheduleResult, *fwk.Status) {
    ...
    調(diào)度pod
    scheduleResult, err := sched.SchedulePod(ctx, schedFramework, state, podInfo)
    ...
    if err != nil {
        如果調(diào)度失敗
        ...

        result, status := schedFramework.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatus)
        ...
    }
}

pkg/scheduler/framework/runtime/framework.go中

運行post filter plugin
func (f *frameworkImpl) RunPostFilterPlugins(ctx context.Context, state fwk.CycleState, pod *v1.Pod, filteredNodeStatusMap fwk.NodeToStatusReader) (_ *fwk.PostFilterResult, status *fwk.Status) {
    ...
    遍歷所有post filter plugin
    for _, pl := range f.postFilterPlugins {
        ...
        執(zhí)行post filter plugin
        r, s := f.runPostFilterPlugin(ctx, pl, state, pod, filteredNodeStatusMap)
        ...
    }
    ...
}


運行post filter plugin
func (f *frameworkImpl) runPostFilterPlugin(ctx context.Context, pl fwk.PostFilterPlugin, state fwk.CycleState, pod *v1.Pod, filteredNodeStatusMap fwk.NodeToStatusReader) (*fwk.PostFilterResult, *fwk.Status) {
    ...
    r, s := pl.PostFilter(ctx, state, pod, filteredNodeStatusMap)
    ...
    return r, s
}

pkg/scheduler/framework/plugins/defaultpreemption/default_preemption.go中

運行搶占的post filter plugin
func (pl *DefaultPreemption) PostFilter(ctx context.Context, state fwk.CycleState, pod *v1.Pod, m fwk.NodeToStatusReader) (*fwk.PostFilterResult, *fwk.Status) {
    ...
    執(zhí)行搶占
    result, status := pl.Evaluator.Preempt(ctx, state, pod, m)
    ...

}


判斷pod是否具備搶占條件
func (pl *DefaultPreemption) PodEligibleToPreemptOthers(_ context.Context, pod *v1.Pod, nominatedNodeStatus *fwk.Status) (bool, string) {
    如果設(shè)置了不可搶占
    if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        return false, "not eligible due to preemptionPolicy=Never."
    }
    ...
    nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
    nomNodeName := pod.Status.NominatedNodeName
    判斷pod狀態(tài)中是否設(shè)置了NominatedNodeName
    if len(nomNodeName) > 0 {
        判斷目標(biāo)節(jié)點(diǎn)狀態(tài)是否是UnschedulableAndUnresolvable
        if nominatedNodeStatus.Code() == fwk.UnschedulableAndUnresolvable {
            return true, ""
        }

        if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
            for _, p := range nodeInfo.GetPods() {
                if pl.isPreemptionAllowed(nodeInfo, p, pod) && podTerminatingByPreemption(p.GetPod()) {
                    return false, "not eligible due to a terminating pod on the nominated node."
                }
            }
        }
    }
    return true, ""
    ...
}

判斷搶占是否允許
func (pl *DefaultPreemption) isPreemptionAllowed(nodeInfo fwk.NodeInfo, victim fwk.PodInfo, preemptor *v1.Pod) bool {
    也就是目標(biāo)pod的Priority是否小于當(dāng)前pod
    return corev1helpers.PodPriority(victim.GetPod()) < corev1helpers.PodPriority(preemptor) && pl.IsEligiblePod(nodeInfo, victim, preemptor)
}

pod是否正由于搶占而被刪除
// podTerminatingByPreemption reports whether p is being deleted because the
// scheduler preempted it. A pod without a deletion timestamp is never
// considered terminating. Otherwise the first DisruptionTarget condition found
// decides the answer: it must be True with reason PreemptionByScheduler.
func podTerminatingByPreemption(p *v1.Pod) bool {
    if p.DeletionTimestamp == nil {
        return false
    }

    conditions := p.Status.Conditions
    for i := range conditions {
        cond := &conditions[i]
        if cond.Type != v1.DisruptionTarget {
            continue
        }
        // Only the first DisruptionTarget condition is inspected.
        isTrue := cond.Status == v1.ConditionTrue
        byScheduler := cond.Reason == v1.PodReasonPreemptionByScheduler
        return isTrue && byScheduler
    }
    return false
}


從目標節點上選擇可搶占的pod
func (pl *DefaultPreemption) SelectVictimsOnNode(
    ctx context.Context,
    state fwk.CycleState,
    pod *v1.Pod,
    nodeInfo fwk.NodeInfo,
    pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *fwk.Status) {
    ...
    獲取節(jié)點(diǎn)上所有pod
    for _, pi := range nodeInfo.GetPods() {
        如果pod可搶占(也就是優(yōu)先級(jí)更低)
        if pl.isPreemptionAllowed(nodeInfo, pi, pod) {
            potentialVictims = append(potentialVictims, pi)
        }
    }
    ...
    按優(yōu)先級(jí)排序,如果相等則StartTime排序,大的在前
    sort.Slice(potentialVictims, func(i, j int) bool {
        return pl.MoreImportantPod(potentialVictims[i].GetPod(), potentialVictims[j].GetPod())
    })
    ...
    pdb過(guò)濾
    violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)

    reprievePod := func(pi fwk.PodInfo) (bool, error) {
        ...
        victims = append(victims, pi)
        ...
        return fits, nil
    }
    for _, p := range violatingVictims {
        if fits, err := reprievePod(p); err != nil {
            return nil, 0, fwk.AsStatus(err)
        } else if !fits {
            numViolatingVictim++
        }
    }
    for _, p := range nonViolatingVictims {
        if _, err := reprievePod(p); err != nil {
            return nil, 0, fwk.AsStatus(err)
        }
    }

    返回可搶占的pod
    var victimPods []*v1.Pod
    for _, pi := range victims {
        victimPods = append(victimPods, pi.GetPod())
    }
    return victimPods, numViolatingVictim, fwk.NewStatus(fwk.Success)
}


打分函數
// OrderedScoreFuncs returns the ordered list of node-scoring functions used to
// pick a preemption candidate. The default preemption plugin returns nil,
// i.e. it supplies no custom score functions.
func (pl *DefaultPreemption) OrderedScoreFuncs(ctx context.Context, nodesToVictims map[string]*extenderv1.Victims) []func(node string) int64 {
    return nil
}

pkg/scheduler/framework/preemption/preemption.go中

搶占
func (ev *Evaluator) Preempt(ctx context.Context, state fwk.CycleState, pod *v1.Pod, m fwk.NodeToStatusReader) (*fwk.PostFilterResult, *fwk.Status) {
    ...
    獲取pod信息
    podNamespace, podName := pod.Namespace, pod.Name
    pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
    ...
    確保滿(mǎn)足搶占條件
    if ok, msg := ev.PodEligibleToPreemptOthers(ctx, pod, nominatedNodeStatus); !ok {
        logger.V(5).Info("Pod is not eligible for preemption", "pod", klog.KObj(pod), "reason", msg)
        return nil, fwk.NewStatus(fwk.Unschedulable, msg)
    }
    ...
    獲取所有節(jié)點(diǎn)
    allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
    if err != nil {
        return nil, fwk.AsStatus(err)
    }
    查找搶占目標(biāo)候選
    candidates, nodeToStatusMap, err := ev.findCandidates(ctx, state, allNodes, pod, m)
    if err != nil && len(candidates) == 0 {
        return nil, fwk.AsStatus(err)
    }
    ...
    通過(guò)擴(kuò)展過(guò)濾目標(biāo)候選,由于內(nèi)置只有http擴(kuò)展且比較少用此處忽略
    candidates, status := ev.callExtenders(logger, pod, candidates)
    if !status.IsSuccess() {
        return nil, status
    }

    查找最優(yōu)搶占目標(biāo)
    bestCandidate := ev.SelectCandidate(ctx, candidates)
    if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
        return nil, fwk.NewStatus(fwk.Unschedulable, "no candidate node for preemption")
    }

    實(shí)際搶占
    if status := ev.executor.actuatePodPreemption(ctx, bestCandidate.Name(), bestCandidate.Victims(), pod, ev.PluginName); !status.IsSuccess() {
        return nil, status
    }
}

查找搶占目標候選
func (ev *Evaluator) findCandidates(ctx context.Context, state fwk.CycleState, allNodes []fwk.NodeInfo, pod *v1.Pod, m fwk.NodeToStatusReader) ([]Candidate, *framework.NodeToStatus, error) {
    ...
    過(guò)濾Unschedulable節(jié)點(diǎn)
    potentialNodes, err := m.NodesForStatusCode(ev.Handler.SnapshotSharedLister().NodeInfos(), fwk.Unschedulable)
    ...
    獲取pdb列表
    pdbs, err := getPodDisruptionBudgets(ev.PdbLister)
    if err != nil {
        return nil, nil, err
    }

    隨機(jī)挑選部分節(jié)點(diǎn)dryrun 搶占
    offset, candidatesNum := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
    return ev.DryRunPreemption(ctx, state, pod, potentialNodes, pdbs, offset, candidatesNum)

}

dryrun 搶占
func (ev *Evaluator) DryRunPreemption(ctx context.Context, state fwk.CycleState, pod *v1.Pod, potentialNodes []fwk.NodeInfo,
    檢查節(jié)點(diǎn)
    checkNode := func(i int) {
        nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Snapshot()
        logger.V(5).Info("Check the potential node for preemption", "node", nodeInfoCopy.Node().Name)

        stateCopy := state.Clone()
        pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
        if status.IsSuccess() && len(pods) != 0 {
            victims := extenderv1.Victims{
                Pods:             pods,
                NumPDBViolations: int64(numPDBViolations),
            }
            c := &candidate{
                victims: &victims,
                name:    nodeInfoCopy.Node().Name,
            }
            if numPDBViolations == 0 {
                nonViolatingCandidates.add(c)
            } else {
                violatingCandidates.add(c)
            }
            nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
            if nvcSize > 0 && nvcSize+vcSize >= candidatesNum {
                cancel()
            }
            return
        }
        if status.IsSuccess() && len(pods) == 0 {
            status = fwk.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
        }
        statusesLock.Lock()
        if status.Code() == fwk.Error {
            errs = append(errs, status.AsError())
        }
        nodeStatuses.Set(nodeInfoCopy.Node().Name, status)
        statusesLock.Unlock()
    }
    并行檢查節(jié)點(diǎn)
    fh.Parallelizer().Until(ctx, len(potentialNodes), checkNode, ev.PluginName)
    return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses, utilerrors.NewAggregate(errs)
}


查找最優搶占目標
func (ev *Evaluator) SelectCandidate(ctx context.Context, candidates []Candidate) Candidate {
    ...
    victimsMap := ev.CandidatesToVictimsMap(candidates)
    scoreFuncs := ev.OrderedScoreFuncs(ctx, victimsMap)
    candidateNode := pickOneNodeForPreemption(logger, victimsMap, scoreFuncs)

    if victims := victimsMap[candidateNode]; victims != nil {
        return &candidate{
            victims: victims,
            name:    candidateNode,
        }
    }

    return candidates[0]
    ...
}


挑選一個節點用于搶占
func pickOneNodeForPreemption(logger klog.Logger, nodesToVictims map[string]*extenderv1.Victims, scoreFuncs []func(node string) int64) string {
    ...
    由于默認(rèn)搶占plugin返回空打分函數(shù)所以此處會(huì)使用如下的打分函數(shù)
    if len(scoreFuncs) == 0 {
        minNumPDBViolatingScoreFunc := func(node string) int64 {
            return -nodesToVictims[node].NumPDBViolations
        }
        minHighestPriorityScoreFunc := func(node string) int64 {
            return -int64(highestPodPriority)
        }
        minSumPrioritiesScoreFunc := func(node string) int64 {
            var sumPriorities int64
            for _, pod := range nodesToVictims[node].Pods {
                sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
            }
            return -sumPriorities
        }
        minNumPodsScoreFunc := func(node string) int64 {
            return -int64(len(nodesToVictims[node].Pods))
        }
        latestStartTimeScoreFunc := func(node string) int64 {
            earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
            if earliestStartTimeOnNode == nil {
                utilruntime.HandleErrorWithLogger(logger, nil, "Unexpected nil earliestStartTime for node", "node", node)
                return int64(math.MinInt64)
            }
            return earliestStartTimeOnNode.UnixNano()
        }

        scoreFuncs = []func(string) int64{
            最小pdb沖突節(jié)點(diǎn),
            minNumPDBViolatingScoreFunc,
            最小最高pod優(yōu)先級(jí)節(jié)點(diǎn)
            minHighestPriorityScoreFunc,
            最小總優(yōu)先級(jí)節(jié)點(diǎn)
            minSumPrioritiesScoreFunc,
            最小pod數(shù)量節(jié)點(diǎn)
            minNumPodsScoreFunc,
            目標(biāo)節(jié)點(diǎn)pod的最早啟動(dòng)時(shí)間(如果pod優(yōu)先級(jí)更高則使用它的啟動(dòng)時(shí)間,一樣則使用更早的pod啟動(dòng)時(shí)間),越大分?jǐn)?shù)越高
            latestStartTimeScoreFunc,
        }
    }

    for _, f := range scoreFuncs {
        selectedNodes := []string{}
        maxScore := int64(math.MinInt64)
        for _, node := range allCandidates {
            score := f(node)
            if score > maxScore {
                maxScore = score
                selectedNodes = []string{}
            }
            if score == maxScore {
                selectedNodes = append(selectedNodes, node)
            }
        }
        if len(selectedNodes) == 1 {
            return selectedNodes[0]
        }
        allCandidates = selectedNodes
    }

    return allCandidates[0]
    ...
}

pkg/scheduler/framework/preemption/executor.go中

實際搶占
// actuatePodPreemption carries out the preemption of victims on targetNode on
// behalf of preemptorPod. With the EnableAsyncPreemption feature enabled the
// work is handed to prepareCandidateAsync and nil is returned immediately;
// otherwise prepareCandidate runs synchronously and its status is returned.
func (e *Executor) actuatePodPreemption(ctx context.Context, targetNode string, victims *extenderv1.Victims, preemptorPod *v1.Pod, pluginName string) *fwk.Status {
    candidate := &candidate{
        victims: victims,
        name:    targetNode,
    }

    podPreemptor := &podExecutorPreemptor{Pod: preemptorPod}
    // Asynchronous preemption.
    if e.fts.EnableAsyncPreemption {
        e.prepareCandidateAsync(candidate, podPreemptor, pluginName)
        // NOTE(review): the caller checks status.IsSuccess() on this nil
        // status — presumably a nil *fwk.Status counts as success; confirm.
        return nil
    }
    // Synchronous preemption.
    return e.prepareCandidate(ctx, candidate, podPreemptor, pluginName)
}


同步搶占
func (e *Executor) prepareCandidate(ctx context.Context, c Candidate, preemptor ExecutorPreemptor, pluginName string) *fwk.Status {
    ...
    并行搶占
    fh.Parallelizer().Until(ctx, len(c.Victims().Pods), func(index int) {
        victim := c.Victims().Pods[index]
        if victim.DeletionTimestamp != nil {
            // Graceful pod deletion has already started. Sending another API call is unnecessary.
            logger.V(2).Info("Victim Pod is already being deleted, skipping the API call for it", "preemptor", klog.KObj(preemptor), "node", c.Name(), "victim", klog.KObj(victim))
            return
        }
        if err := e.PreemptPod(ctx, c, preemptor, victim, pluginName); err != nil {
            errCh.SendWithCancel(err, cancel)
        }
    }, pluginName)
    if err := errCh.Receive(); err != nil {
        return fwk.AsStatus(err)
    }
    ...
}
©著作權歸作者所有,轉載或內容合作請聯系作者
【社區內容提示】社區部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發布,文章內容僅代表作者本人觀點,簡書系信息發布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容