背景
Kubernetes 最近的版本加入了 workload aware scheduling(工作負載感知調度)功能,可以支持 gang 調度。
這裡記錄一下 gang 調度的實現。
簡單總結
通過 gang plugin 的 PreEnqueue 擴展點,攔截配置了 SchedulingGroup、所屬 PodGroup 策略為 gang、但 PodGroup 中的 pod 總數尚未達到 minCount 的 pod,使其暫不進入調度隊列。
通過 gang plugin 的 Permit 擴展點,讓配置了 SchedulingGroup、所屬 PodGroup 策略為 gang、但已調度的 pod 數尚未達到 minCount 的 pod 在 bind 前等待(最長 5 分鐘);達到 minCount 時放行同組所有等待中的 pod,若等待超時則拒絕該 pod,並執行 Unreserve plugin 等後續操作。
源碼
pkg/scheduler/framework/plugins/gangscheduling/gangscheduling.go中
const (
	...
	// permitTimeoutDuration bounds how long a gang pod waits at the Permit
	// stage: Permit returns it together with a Wait status.
	// NOTE(review): presumably this becomes the per-plugin wait time passed to
	// newWaitingPod, whose timer rejects the pod when it elapses — confirm.
	permitTimeoutDuration = 5 * time.Minute
)
入調度隊列前plugin
// PreEnqueue keeps a gang pod out of the scheduling queue until enough pods of
// its gang have appeared.
//
// The pod passes through (nil status) in two cases:
//   - it has no SchedulingGroup;
//   - its PodGroup's scheduling policy is not gang.
//
// Otherwise the pod is held back with UnschedulableAndUnresolvable while the
// PodGroup state's AllPodsCount is below the gang's MinCount.
func (pl *GangScheduling) PreEnqueue(ctx context.Context, pod *v1.Pod) *fwk.Status {
	// No SchedulingGroup: the pod is not part of a gang, so nothing to gate.
	if pod.Spec.SchedulingGroup == nil {
		return nil
	}
	// Look up the PodGroup this pod belongs to.
	namespace := pod.Namespace
	schedulingGroup := pod.Spec.SchedulingGroup
	podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
	...
	// Only the gang policy is handled here; any other policy passes through.
	policy := podGroup.Spec.SchedulingPolicy
	if policy.Gang == nil {
		return nil
	}
	...
	// Fetch the tracked state of this PodGroup from the plugin's PodGroup manager.
	podGroupState, err := pl.podGroupManager.PodGroupStates().Get(namespace, *schedulingGroup.PodGroupName)
	...
	// Hold the pod back until the number of pods known for the group
	// (AllPodsCount) reaches MinCount.
	allPodsCount := podGroupState.AllPodsCount()
	if allPodsCount < int(policy.Gang.MinCount) {
		return fwk.NewStatus(fwk.UnschedulableAndUnresolvable, "waiting for minCount pods from a gang to appear in scheduling queue")
	}
	return nil
}
pod bind前plugin
// Permit holds a gang pod before binding until enough of its gang has been
// scheduled.
//
// The pod is allowed immediately (nil status) in two cases:
//   - it has no SchedulingGroup;
//   - its PodGroup's scheduling policy is not gang.
//
// While the PodGroup state's ScheduledPodsCount is below the gang's MinCount,
// the pod gets a Wait status with a timeout of permitTimeoutDuration.
//
// Once the count reaches MinCount, every assumed pod of the group that is
// currently waiting on Permit is released with Allow. This pod is then allowed
// by returning a nil (success) status.
//
// An error reading the PodGroup state is returned as a status via fwk.AsStatus.
func (pl *GangScheduling) Permit(ctx context.Context, state fwk.CycleState, pod *v1.Pod, nodeName string) (*fwk.Status, time.Duration) {
	// No SchedulingGroup: not a gang pod, nothing to wait for.
	if pod.Spec.SchedulingGroup == nil {
		return nil, 0
	}
	...
	// Look up the PodGroup this pod belongs to.
	namespace := pod.Namespace
	schedulingGroup := pod.Spec.SchedulingGroup
	podGroup, err := pl.podGroupLister.PodGroups(namespace).Get(*schedulingGroup.PodGroupName)
	...
	// Only the gang policy is handled here; any other policy is allowed.
	policy := podGroup.Spec.SchedulingPolicy
	if policy.Gang == nil {
		return nil, 0
	}
	...
	// Fetch the tracked state of this PodGroup.
	// NOTE(review): podGroupStateLister is not declared in the visible lines;
	// presumably it is pl.podGroupManager.PodGroupStates() (as used in
	// PreEnqueue), assigned in the elided lines above — confirm.
	podGroupState, err := podGroupStateLister.Get(namespace, *schedulingGroup.PodGroupName)
	if err != nil {
		return fwk.AsStatus(err), 0
	}
	...
	// Not enough pods of the gang are scheduled yet: park this pod in Wait.
	// It will be released by a later gang member's Allow, or rejected by the
	// wait timeout.
	// NOTE(review): whether ScheduledPodsCount already counts this pod is not
	// visible here — confirm.
	scheduledPodsCount := podGroupState.ScheduledPodsCount()
	if scheduledPodsCount < int(policy.Gang.MinCount) {
		return fwk.NewStatus(fwk.Wait, "waiting for minCount pods from a gang to be scheduled"), permitTimeoutDuration
	}
	// The gang has reached MinCount: release every assumed pod of the group
	// that is waiting on Permit. GetWaitingPod may return nil (the pod is not
	// waiting); such pods are skipped.
	assumedPods := podGroupState.AssumedPods()
	for podUID := range assumedPods {
		waitingPod := pl.handle.GetWaitingPod(podUID)
		if waitingPod != nil {
			waitingPod.Allow(Name)
		}
	}
	// Allow this pod itself. A nil status means success, as in the early
	// returns above. Without this return the function has no terminating
	// statement.
	return nil, 0
}
pkg/scheduler/framework/runtime/waiting_pods_map.go中
創建waitingPod對象
// newWaitingPod creates the waitingPod record for a pod that must wait at
// Permit.
//
// For each entry in pluginsMaxWaitTime (plugin name -> maximum wait), it arms
// a timer. When that timer fires, the pod is rejected on behalf of that plugin
// with a timeout message.
// NOTE(review): presumably the map is keyed by the Permit plugins that returned
// Wait, and Allow stops the matching timer in its elided lines — confirm.
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod {
	// s has capacity 1, so the first status sent is buffered even if
	// WaitOnPermit is not receiving yet. The non-blocking sends in Allow and
	// stopWithStatus drop any status sent while the buffer is full.
	wp := &waitingPod{
		pod: pod,
		s:   make(chan *fwk.Status, 1),
	}
	...
	// NOTE(review): wp.pendingPlugins is not initialised in the visible lines;
	// presumably it is made in the elided lines above — confirm.
	// Arm one timeout timer per plugin; on expiry the pod is rejected.
	for k, v := range pluginsMaxWaitTime {
		// Copy the loop variables so each closure captures its own values.
		// This is required before Go 1.22's per-iteration loop variables.
		plugin, waitTime := k, v
		wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() {
			msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v",
				waitTime, plugin)
			wp.Reject(plugin, msg)
		})
	}
	return wp
}
permit結果為允許
// Allow records that pluginName permits the pod. It signals a Success status on
// w.s, which unblocks WaitOnPermit.
// NOTE(review): the elided lines presumably take the lock, stop and remove
// pluginName's timer, and return early while other plugins are still
// pending — confirm against upstream.
func (w *waitingPod) Allow(pluginName string) {
	...
	// Non-blocking send: if a status is already buffered in w.s (capacity 1),
	// this one is dropped instead of blocking.
	select {
	case w.s <- fwk.NewStatus(fwk.Success, ""):
	default:
	}
	...
}
permit結果為拒絕
// Reject stops the waiting pod with an Unschedulable status attributed to
// pluginName and carrying msg. It returns whatever stopWithStatus returns.
func (w *waitingPod) Reject(pluginName, msg string) bool {
	return w.stopWithStatus(fwk.Unschedulable, pluginName, msg)
}
攜帶狀態停止
// stopWithStatus sends a status with the given code and msg, tagged with
// pluginName via WithPlugin, to the pod's status channel. WaitOnPermit then
// receives it.
// NOTE(review): the meaning of the bool result and the elided lines
// (presumably locking and stopping the remaining timers) are not visible
// here — confirm.
func (w *waitingPod) stopWithStatus(status fwk.Code, pluginName, msg string) bool {
	...
	// Non-blocking send: drop this status if one is already buffered in w.s.
	select {
	case w.s <- fwk.NewStatus(status, msg).WithPlugin(pluginName):
	default:
	}
	...
}
pkg/scheduler/schedule_one.go中
運行bind循環
func (sched *Scheduler) runBindingCycle(
ctx context.Context,
state fwk.CycleState,
schedFramework framework.Framework,
scheduleResult ScheduleResult,
assumedPodInfo *framework.QueuedPodInfo,
start time.Time,
podsToActivate *framework.PodsToActivate) {
運行bind循環(huán)
status := sched.bindingCycle(bindingCycleCtx, state, schedFramework, scheduleResult, assumedPodInfo, start, podsToActivate)
如果未成功,則處理bind循環(huán)錯誤
if !status.IsSuccess() {
sched.handleBindingCycleError(bindingCycleCtx, state, schedFramework, assumedPodInfo, start, scheduleResult, status)
return
}
}
bind循環
// bindingCycle performs the binding steps for an assumed pod. Among the steps
// visible here, it blocks in WaitOnPermit until the pod's Permit outcome is
// known. Any non-success status is returned to the caller.
func (sched *Scheduler) bindingCycle(
	ctx context.Context,
	state fwk.CycleState,
	schedFramework framework.Framework,
	scheduleResult ScheduleResult,
	assumedPodInfo *framework.QueuedPodInfo,
	start time.Time,
	podsToActivate *framework.PodsToActivate) *fwk.Status {
	...
	// Wait for Permit: Allow, Reject, or a timeout decides the outcome.
	if status := schedFramework.WaitOnPermit(ctx, assumedPod); !status.IsSuccess() {
		...
		return status
	}
	...
}
// handleBindingCycleError cleans up after a binding cycle that did not succeed.
//
// It runs the Unreserve plugins and forgets the assumed pod through
// unreserveAndForget on scheduleResult.SuggestedHost. If that fails, the error
// is reported via HandleErrorWithContext. Otherwise it asks the scheduling
// queue to move pods to the active or backoff queue, treating the failure as
// an EventAssignedPodDelete event.
//
// Inside the body, fwk refers to the framework.Framework parameter, which
// shadows the fwk package name. In the signature, *fwk.Status still refers to
// the package, because parameter scope begins at the function body.
func (sched *Scheduler) handleBindingCycleError(
	ctx context.Context,
	state fwk.CycleState,
	fwk framework.Framework,
	podInfo *framework.QueuedPodInfo,
	start time.Time,
	scheduleResult ScheduleResult,
	status *fwk.Status) {
	...
	// Run the Unreserve plugins and forget the assumed pod.
	if forgetErr := sched.unreserveAndForget(ctx, state, fwk, podInfo, scheduleResult.SuggestedHost); forgetErr != nil {
		utilruntime.HandleErrorWithContext(ctx, forgetErr, "ForgetPod failed")
	} else {
		// Rejected status, e.g. a Permit rejection or timeout.
		// NOTE(review): presumably IsRejected matches the Unschedulable code
		// that Reject uses — confirm.
		if status.IsRejected() {
			// Treat it as an assigned-pod delete event and move every pod except
			// the failed one to the active or backoff queue. Because of defer,
			// the move runs when this function returns.
			defer sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, func(pod *v1.Pod) bool {
				return assumedPod.UID != pod.UID
			})
		} else {
			// Other failures: treat it as an assigned-pod delete event and move
			// pods to the active or backoff queue with no filter (the failed
			// pod is not excluded).
			sched.SchedulingQueue.MoveAllToActiveOrBackoffQueue(logger, framework.EventAssignedPodDelete, assumedPod, nil, nil)
		}
	}
	...
}
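等待permit(WaitOnPermit 是 frameworkImpl 的方法,位於 pkg/scheduler/framework/runtime/framework.go,而非 schedule_one.go)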
// WaitOnPermit blocks until the pod's Permit outcome is known. It looks up the
// pod's waitingPod entry and receives from its status channel. That channel is
// fed by three senders:
//   - Allow (Success);
//   - Reject (Unschedulable);
//   - a per-plugin timeout timer, which calls Reject.
func (f *frameworkImpl) WaitOnPermit(ctx context.Context, pod *v1.Pod) *fwk.Status {
	...
	// Look up this pod's waitingPod entry (see newWaitingPod).
	// NOTE(review): the elided lines presumably handle a missing entry — confirm.
	waitingPod := f.waitingPods.get(pod.UID)
	...
	// Block until a status arrives. Nothing visible here selects on ctx, so the
	// wait ends via Allow, Reject, or a timeout timer.
	s := <-waitingPod.s
	...
}