kube-scheduler Preemption Mechanism

When a pod fails to schedule, the preemption flow runs at the PostFilter extension point. The following sections analyze the relevant code.
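For context, the PostFilter extension point roughly corresponds to the interface below (a paraphrased sketch of pkg/scheduler/framework/interface.go from the Kubernetes version discussed here, not a verbatim copy); DefaultPreemption is the only in-tree plugin registered on it.

// Paraphrased sketch of the PostFilter plugin interface; see
// pkg/scheduler/framework/interface.go in the Kubernetes source for the exact definition.
type PostFilterPlugin interface {
    Plugin
    // PostFilter is called after a pod fails the Filter phase. On success the result
    // carries the node name the pod is nominated to, i.e. the outcome of preemption.
    PostFilter(ctx context.Context, state *CycleState, pod *v1.Pod,
        filteredNodeStatusMap NodeToStatusMap) (*PostFilterResult, *Status)
}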

Preemption interface

// PodNominator abstracts operations to maintain nominated Pods.
type PodNominator interface {
    // Add the pod to the node it successfully preempted.
    AddNominatedPod(pod *PodInfo, nodeName string)
    // Remove the pod from its nominated node.
    DeleteNominatedPodIfExists(pod *v1.Pod)
    // Update the pod.
    UpdateNominatedPod(oldPod *v1.Pod, newPodInfo *PodInfo)
    // Return all nominated pods on the given node.
    NominatedPodsForNode(nodeName string) []*PodInfo
}

The nominator type implements the PodNominator interface.

type nominator struct {
    // Used to check whether a given pod still exists.
    podLister listersv1.PodLister
    // The nominated pods on each node, keyed by node name.
    nominatedPods map[string][]*framework.PodInfo
    // Maps a nominated pod's UID to the node it is nominated for.
    nominatedPodToNode map[types.UID]string

    sync.RWMutex
}

AddNominatedPod records the pod under the node it successfully preempted.

func (npm *nominator) AddNominatedPod(pi *framework.PodInfo, nodeName string) {
    npm.Lock()
    npm.add(pi, nodeName)
    npm.Unlock()
}

func (npm *nominator) add(pi *framework.PodInfo, nodeName string) {
    // always delete the pod if it already exist, to ensure we never store more than
    // one instance of the pod.
    npm.delete(pi.Pod)

    // If the nodeName argument is empty and pod.Status.NominatedNodeName is also empty,
    // no node has been preempted for this pod, so return.
    nnn := nodeName
    if len(nnn) == 0 {
        // pod.Status.NominatedNodeName
        nnn = NominatedNodeName(pi.Pod)
        if len(nnn) == 0 {
            return
        }
    }

    // Check via the lister whether the pod still exists; if it has already been
    // deleted there is no need to continue.
    if npm.podLister != nil {
        // If the pod is not alive, don't contain it.
        if _, err := npm.podLister.Pods(pi.Pod.Namespace).Get(pi.Pod.Name); err != nil {
            klog.V(4).InfoS("Pod doesn't exist in podLister, aborting adding it to the nominator", "pod", klog.KObj(pi.Pod))
            return
        }
    }

    // Save the UID-to-node mapping.
    npm.nominatedPodToNode[pi.Pod.UID] = nnn
    // If the pod is already recorded on this node, return directly.
    for _, npi := range npm.nominatedPods[nnn] {
        if npi.Pod.UID == pi.Pod.UID {
            klog.V(4).InfoS("Pod already exists in the nominator", "pod", klog.KObj(npi.Pod))
            return
        }
    }
    // Record the pod on the node.
    npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn], pi)
}

DeleteNominatedPodIfExists removes the pod from its nominated node.

// DeleteNominatedPodIfExists deletes the pod from nominatedPods.
func (npm *nominator) DeleteNominatedPodIfExists(pod *v1.Pod) {
    npm.Lock()
    npm.delete(pod)
    npm.Unlock()
}

func (npm *nominator) delete(p *v1.Pod) {
    nnn, ok := npm.nominatedPodToNode[p.UID]
    if !ok {
        return
    }
    for i, np := range npm.nominatedPods[nnn] {
        if np.Pod.UID == p.UID {
            npm.nominatedPods[nnn] = append(npm.nominatedPods[nnn][:i], npm.nominatedPods[nnn][i+1:]...)
            if len(npm.nominatedPods[nnn]) == 0 {
                delete(npm.nominatedPods, nnn)
            }
            break
        }
    }
    delete(npm.nominatedPodToNode, p.UID)
}

NominatedPodsForNode returns all nominated pods on the given node.

// NominatedPodsForNode returns a copy of pods that are nominated to run on the given node,
// but they are waiting for other pods to be removed from the node.
func (npm *nominator) NominatedPodsForNode(nodeName string) []*framework.PodInfo {
    npm.RLock()
    defer npm.RUnlock()
    // Make a copy of the nominated Pods so the caller can mutate safely.
    pods := make([]*framework.PodInfo, len(npm.nominatedPods[nodeName]))
    for i := 0; i < len(pods); i++ {
        pods[i] = npm.nominatedPods[nodeName][i].DeepCopy()
    }
    return pods
}

Preemption flow
This section looks at the flow after a pod fails to schedule: how preemption is triggered, what happens after a successful preemption, and how the pod is handled when it is scheduled again. The concrete preemption implementation is analyzed in the next section.

func (sched *Scheduler) scheduleOne(ctx context.Context) {
    ...
    scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, sched.Extenders, fwk, state, pod)
    if err != nil {
        nominatedNode := ""
        // If the error is of type FitError, run the plugins on the PostFilter extension point.
        if fitError, ok := err.(*framework.FitError); ok {
            // If no plugins are configured on the PostFilter extension point, there is no preemption to run.
            if !fwk.HasPostFilterPlugins() {
                klog.V(3).InfoS("No PostFilter plugins are registered, so no preemption will be performed")
            } else {
                // Run the plugins on the PostFilter extension point. Currently the only one is the
                // preemption plugin DefaultPreemption:
                // pkg/scheduler/framework/plugins/defaultpreemption/defaultpreemption.go
                // Run PostFilter plugins to try to make the pod schedulable in a future scheduling cycle.
                result, status := fwk.RunPostFilterPlugins(ctx, state, pod, fitError.Diagnosis.NodeToStatusMap)
                if status.Code() == framework.Error {
                    klog.ErrorS(nil, "Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                } else {
                    klog.V(5).InfoS("Status after running PostFilter plugins for pod", "pod", klog.KObj(pod), "status", status)
                }
                if status.IsSuccess() && result != nil {
                    nominatedNode = result.NominatedNodeName
                }
            }
        }
        sched.recordSchedulingFailure(fwk, podInfo, err, v1.PodReasonUnschedulable, nominatedNode)
        return
    }
    ...
}

recordSchedulingFailure puts the failed pod into the unschedulable queue to wait for the next scheduling attempt. If preemption succeeded, the pod is also added to the nominated-pod list of nominatedNode, and finally pod.Status.NominatedNodeName is set to nominatedNode by sending an update request to the apiserver.

func (sched *Scheduler) recordSchedulingFailure(fwk framework.Framework, podInfo *framework.QueuedPodInfo, err error, reason string, nominatedNode string) {
    // Error is the function returned by MakeDefaultErrorFunc; it puts the failed pod into the
    // unschedulable queue to wait for the next scheduling attempt.
    // The failed pod is added to the unschedulable queue whether or not preemption succeeded.
    sched.Error(podInfo, err)

    // Update the scheduling queue with the nominated pod information. Without
    // this, there would be a race condition between the next scheduling cycle
    // and the time the scheduler receives a Pod Update for the nominated pod.
    // Here we check for nil only for tests.
    // Add this pod to the nominated pods of the nominated node nominatedNode.
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.AddNominatedPod(podInfo.PodInfo, nominatedNode)
    }

    pod := podInfo.Pod
    msg := truncateMessage(err.Error())
    fwk.EventRecorder().Eventf(pod, nil, v1.EventTypeWarning, "FailedScheduling", "Scheduling", msg)
    // If nominatedNode is not empty (preemption succeeded), set pod.Status.NominatedNodeName to
    // nominatedNode and send an update request to the apiserver.
    if err := updatePod(sched.client, pod, &v1.PodCondition{
        Type:    v1.PodScheduled,
        Status:  v1.ConditionFalse,
        Reason:  reason,
        Message: err.Error(),
    }, nominatedNode); err != nil {
        klog.ErrorS(err, "Error updating pod", "pod", klog.KObj(pod))
    }
}

After the failed pod preempts successfully and enters the unschedulable queue, the next time it is scheduled, Schedule->findNodesThatFitPod evaluates NominatedNodeName directly. If the evaluation passes, the pod is truly scheduled to this node and the subsequent assume and bind steps run; assume calls DeleteNominatedPodIfExists to remove the pod from the nominator (a paraphrased sketch of assume follows the code below).

func (g *genericScheduler) Schedule(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) (result ScheduleResult, err error) {
    ...
    feasibleNodes, diagnosis, err := g.findNodesThatFitPod(ctx, extenders, fwk, state, pod)
    ...
    // When only one node after predicate, just use it.
    if len(feasibleNodes) == 1 {
        return ScheduleResult{
            SuggestedHost:  feasibleNodes[0].Name,
            EvaluatedNodes: 1 + len(diagnosis.NodeToStatusMap),
            FeasibleNodes:  1,
        }, nil
    }
}

func (g *genericScheduler) findNodesThatFitPod(ctx context.Context, extenders []framework.Extender, fwk framework.Framework, state *framework.CycleState, pod *v1.Pod) ([]*v1.Node, framework.Diagnosis, error) {
    ...
    // "NominatedNodeName" can potentially be set in a previous scheduling cycle as a result of preemption.
    // This node is likely the only candidate that will fit the pod, and hence we try it first before iterating over all nodes.
    // pod.Status.NominatedNodeName is the node gained through preemption, and the feature gate is enabled.
    if len(pod.Status.NominatedNodeName) > 0 && feature.DefaultFeatureGate.Enabled(features.PreferNominatedNode) {
        // Re-evaluate whether pod.Status.NominatedNodeName passes Filter. If it does, the pod can run
        // on this node; otherwise all nodes have to be evaluated again.
        feasibleNodes, err := g.evaluateNominatedNode(ctx, extenders, pod, fwk, state, diagnosis)
        if err != nil {
            klog.ErrorS(err, "Evaluation failed on nominated node", "pod", klog.KObj(pod), "node", pod.Status.NominatedNodeName)
        }
        // Nominated node passes all the filters, scheduler is good to assign this node to the pod.
        if len(feasibleNodes) != 0 {
            return feasibleNodes, diagnosis, nil
        }
    }
    ...
}

// evaluateNominatedNode takes pod.Status.NominatedNodeName and runs findNodesThatPassFilters on it.
func (g *genericScheduler) evaluateNominatedNode(ctx context.Context, extenders []framework.Extender, pod *v1.Pod, fwk framework.Framework, state *framework.CycleState, diagnosis framework.Diagnosis) ([]*v1.Node, error) {
    nnn := pod.Status.NominatedNodeName
    nodeInfo, err := g.nodeInfoSnapshot.Get(nnn)
    if err != nil {
        return nil, err
    }
    node := []*framework.NodeInfo{nodeInfo}
    feasibleNodes, err := g.findNodesThatPassFilters(ctx, fwk, state, pod, diagnosis, node)
    if err != nil {
        return nil, err
    }
    ...
    return feasibleNodes, nil
}
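For completeness, here is a paraphrased sketch of where assume clears the nomination (based on pkg/scheduler/scheduler.go of the same era; treat the exact lines as indicative rather than authoritative):

func (sched *Scheduler) assume(assumed *v1.Pod, host string) error {
    // Optimistically assume the binding will succeed and record it in the scheduler cache.
    assumed.Spec.NodeName = host
    if err := sched.SchedulerCache.AssumePod(assumed); err != nil {
        klog.ErrorS(err, "Scheduler cache AssumePod failed")
        return err
    }
    // If "assumed" is a nominated pod, remove it from the nominator's internal cache,
    // since it is now really being scheduled to a node.
    if sched.SchedulingQueue != nil {
        sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed)
    }
    return nil
}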

Preemption implementation
This section looks at the concrete implementation of preemption.

//pkg/scheduler/framework/preemption/preemption.go
func (ev *Evaluator) Preempt(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) (*framework.PostFilterResult, *framework.Status) {
    // 0) Fetch the latest version of the pod.
    // It's safe to directly fetch pod here. Because the informer cache has already been
    // initialized when creating the Scheduler obj, i.e., factory.go#MakeDefaultErrorFunc().
    // However, tests may need to manually initialize the shared pod informer.
    podNamespace, podName := pod.Namespace, pod.Name
    pod, err := ev.PodLister.Pods(pod.Namespace).Get(pod.Name)
    if err != nil {
        klog.ErrorS(err, "getting the updated preemptor pod object", "pod", klog.KRef(podNamespace, podName))
        return nil, framework.AsStatus(err)
    }

    // 1) Ensure the preemptor is eligible to preempt other pods.
    // Check whether this pod is eligible to preempt.
    if !ev.PodEligibleToPreemptOthers(pod, m[pod.Status.NominatedNodeName]) {
        klog.V(5).InfoS("Pod is not eligible for more preemption", "pod", klog.KObj(pod))
        return nil, framework.NewStatus(framework.Unschedulable)
    }

    // 2) Find all preemption candidates.
    candidates, nodeToStatusMap, status := ev.findCandidates(ctx, pod, m)
    if !status.IsSuccess() {
        return nil, status
    }

    // If there is no candidate at all, return an error.
    // Return a FitError only when there are no candidates that fit the pod.
    if len(candidates) == 0 {
        fitError := &framework.FitError{
            Pod:         pod,
            NumAllNodes: len(nodeToStatusMap),
            Diagnosis: framework.Diagnosis{
                NodeToStatusMap: nodeToStatusMap,
                // Leave FailedPlugins as nil as it won't be used on moving Pods.
            },
        }
        return nil, framework.NewStatus(framework.Unschedulable, fitError.Error())
    }

    // 3) Interact with registered Extenders to filter out some candidates if needed.
    // The extender details are skipped here; it is just another way to filter candidates.
    candidates, status = ev.callExtenders(pod, candidates)
    if !status.IsSuccess() {
        return nil, status
    }

    // 4) Find the best candidate.
    bestCandidate := ev.SelectCandidate(candidates)
    if bestCandidate == nil || len(bestCandidate.Name()) == 0 {
        return nil, framework.NewStatus(framework.Unschedulable)
    }

    // 5) Perform preparation work before nominating the selected candidate.
    // Delete the victims, i.e. the pods being preempted.
    if status := ev.prepareCandidate(bestCandidate, pod, ev.PluginName); !status.IsSuccess() {
        return nil, status
    }

    // Finally return the name of the best node.
    return &framework.PostFilterResult{NominatedNodeName: bestCandidate.Name()}, framework.NewStatus(framework.Success)
}

1. PodEligibleToPreemptOthers
Checks whether this pod is eligible to preempt; the specific conditions are explained in the code comments.

// PodEligibleToPreemptOthers determines whether this pod should be considered
// for preempting other pods or not. If this pod has already preempted other
// pods and those are in their graceful termination period, it shouldn't be
// considered for preemption.
// We look at the node that is nominated for this pod and as long as there are
// terminating pods on the node, we don't consider this for preempting more pods.
func (pl *DefaultPreemption) PodEligibleToPreemptOthers(pod *v1.Pod, nominatedNodeStatus *framework.Status) bool {
    // If the pod's preemption policy is Never, return false.
    if pod.Spec.PreemptionPolicy != nil && *pod.Spec.PreemptionPolicy == v1.PreemptNever {
        klog.V(5).InfoS("Pod is not eligible for preemption because it has a preemptionPolicy of Never", "pod", klog.KObj(pod))
        return false
    }
    nodeInfos := pl.fh.SnapshotSharedLister().NodeInfos()
    // If NominatedNodeName is not empty, this pod already preempted successfully once but failed Filter
    // again this cycle. If the failure code is UnschedulableAndUnresolvable, preempting this node would
    // not help, so the preemption flow has to run again. If it failed for another reason, check whether
    // lower-priority pods are being terminated on this node; if so, there is no need to preempt again.
    // Just wait for the next scheduling attempt: once the terminating pods are really gone, the pod may
    // pass Filter.
    nomNodeName := pod.Status.NominatedNodeName
    if len(nomNodeName) > 0 {
        // If the pod's nominated node is considered as UnschedulableAndUnresolvable by the filters,
        // then the pod should be considered for preempting again.
        if nominatedNodeStatus.Code() == framework.UnschedulableAndUnresolvable {
            return true
        }
        if nodeInfo, _ := nodeInfos.Get(nomNodeName); nodeInfo != nil {
            podPriority := corev1helpers.PodPriority(pod)
            for _, p := range nodeInfo.Pods {
                // If pod p is being deleted and has lower priority, a pod on this node is already being
                // removed (possibly a victim of the previous preemption round). Pod deletion has a
                // graceful termination period (about 30s by default).
                if p.Pod.DeletionTimestamp != nil && corev1helpers.PodPriority(p.Pod) < podPriority {
                    // There is a terminating pod on the nominated node.
                    return false
                }
            }
        }
    }
    return true
}

2. findCandidates
Find all the candidates that could be preempted.

// FindCandidates calculates a slice of preemption candidates.
// Each candidate is executable to make the given pod schedulable.
func (ev *Evaluator) findCandidates(ctx context.Context, pod *v1.Pod, m framework.NodeToStatusMap) ([]Candidate, framework.NodeToStatusMap, *framework.Status) {
    // Get all nodes.
    allNodes, err := ev.Handler.SnapshotSharedLister().NodeInfos().List()
    ...
    // 2.1 Return the list of potential nodes, i.e. the nodes that failed Filter
    // (excluding the UnschedulableAndUnresolvable ones).
    potentialNodes, unschedulableNodeStatus := nodesWherePreemptionMightHelp(allNodes, m)
    // 2.2 Not all returned nodes are tried; this call decides how many nodes to try and the offset to start from.
    offset, numCandidates := ev.GetOffsetAndNumCandidates(int32(len(potentialNodes)))
    // 2.3 For each potential node, find the set of pods that could be preempted on it. Each candidate
    // contains the node name and the victims on that node.
    candidates, nodeStatuses := ev.DryRunPreemption(ctx, pod, potentialNodes, pdbs, offset, numCandidates)
    for node, status := range unschedulableNodeStatus {
        nodeStatuses[node] = status
    }
    return candidates, nodeStatuses, nil
}

2.1 nodesWherePreemptionMightHelp
Returns the list of nodes that failed the Filter phase, excluding those that failed with UnschedulableAndUnresolvable.

// nodesWherePreemptionMightHelp returns a list of nodes with failed predicates
// that may be satisfied by removing pods from the node.
func nodesWherePreemptionMightHelp(nodes []*framework.NodeInfo, m framework.NodeToStatusMap) ([]*framework.NodeInfo, framework.NodeToStatusMap) {
    var potentialNodes []*framework.NodeInfo
    nodeStatuses := make(framework.NodeToStatusMap)
    for _, node := range nodes {
        name := node.Node().Name
        // We rely on the status by each plugin - 'Unschedulable' or 'UnschedulableAndUnresolvable'
        // to determine whether preemption may help or not on the node.
        // If the node already failed Filter with UnschedulableAndUnresolvable, preempting it
        // would not help, so skip it.
        if m[name].Code() == framework.UnschedulableAndUnresolvable {
            nodeStatuses[node.Node().Name] = framework.NewStatus(framework.UnschedulableAndUnresolvable, "Preemption is not helpful for scheduling")
            continue
        }
        potentialNodes = append(potentialNodes, node)
    }
    return potentialNodes, nodeStatuses
}

2.2 GetOffsetAndNumCandidates
Computes a random offset and the number of nodes that take part in the preemption dry run; a short worked example follows the code below.

// GetOffsetAndNumCandidates chooses a random offset and calculates the number
// of candidates that should be shortlisted for dry running preemption.
func (pl *DefaultPreemption) GetOffsetAndNumCandidates(numNodes int32) (int32, int32) {
    return rand.Int31n(numNodes), pl.calculateNumCandidates(numNodes)
}

// calculateNumCandidates returns the number of candidates the FindCandidates
// method must produce from dry running based on the constraints given by
// MinCandidateNodesPercentage and MinCandidateNodesAbsolute. The number of
// candidates returned will never be greater than numNodes.
func (pl *DefaultPreemption) calculateNumCandidates(numNodes int32) int32 {
    // MinCandidateNodesPercentage defaults to 100.
    n := (numNodes * pl.args.MinCandidateNodesPercentage) / 100
    // MinCandidateNodesAbsolute also defaults to 100.
    if n < pl.args.MinCandidateNodesAbsolute {
        n = pl.args.MinCandidateNodesAbsolute
    }
    if n > numNodes {
        n = numNodes
    }
    return n
}
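As a rough worked example, the standalone sketch below mirrors the arithmetic above; the percentage and absolute values are hypothetical tuned settings, not the defaults, and the snippet is illustrative rather than scheduler code.

package main

import "fmt"

// candidateCount mirrors the logic of calculateNumCandidates for illustration.
func candidateCount(numNodes, minPct, minAbs int32) int32 {
    n := numNodes * minPct / 100
    if n < minAbs {
        n = minAbs
    }
    if n > numNodes {
        n = numNodes
    }
    return n
}

func main() {
    // Hypothetical config: MinCandidateNodesPercentage=10, MinCandidateNodesAbsolute=100.
    fmt.Println(candidateCount(5000, 10, 100)) // 500: only 500 of 5000 potential nodes are dry-run
    fmt.Println(candidateCount(50, 10, 100))   // 50: the absolute floor is capped at numNodes
    // With the defaults (100, 100) every potential node is a candidate.
    fmt.Println(candidateCount(5000, 100, 100)) // 5000
}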

2.3 DryRunPreemption
Runs checkNode in parallel to simulate the Filter phase and find the nodes on which preemption would make the pod schedulable.

// DryRunPreemption simulates Preemption logic on the potential nodes in parallel,
// returns preemption candidates and a map indicating filtered nodes statuses.
// The number of candidates depends on the constraints defined in the plugin's args. In the returned list of
// candidates, ones that do not violate PDB are preferred over ones that do.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) DryRunPreemption(ctx context.Context, pod *v1.Pod, potentialNodes []*framework.NodeInfo,
    pdbs []*policy.PodDisruptionBudget, offset int32, numCandidates int32) ([]Candidate, framework.NodeToStatusMap) {
    fh := ev.Handler
    nonViolatingCandidates := newCandidateList(numCandidates)
    violatingCandidates := newCandidateList(numCandidates)
    parallelCtx, cancel := context.WithCancel(ctx)
    nodeStatuses := make(framework.NodeToStatusMap)
    var statusesLock sync.Mutex
    checkNode := func(i int) {
        // Start from the offset index.
        nodeInfoCopy := potentialNodes[(int(offset)+i)%len(potentialNodes)].Clone()
        stateCopy := ev.State.Clone()
        // Find the set of victim pods on this node.
        pods, numPDBViolations, status := ev.SelectVictimsOnNode(ctx, stateCopy, pod, nodeInfoCopy, pdbs)
        if status.IsSuccess() && len(pods) != 0 {
            victims := extenderv1.Victims{
                Pods:             pods,
                NumPDBViolations: int64(numPDBViolations),
            }
            c := &candidate{
                victims: &victims,
                name:    nodeInfoCopy.Node().Name,
            }
            if numPDBViolations == 0 {
                nonViolatingCandidates.add(c)
            } else {
                violatingCandidates.add(c)
            }
            nvcSize, vcSize := nonViolatingCandidates.size(), violatingCandidates.size()
            // Once numCandidates candidates have been collected, the goroutines can stop.
            if nvcSize > 0 && nvcSize+vcSize >= numCandidates {
                cancel()
            }
            return
        }
        if status.IsSuccess() && len(pods) == 0 {
            status = framework.AsStatus(fmt.Errorf("expected at least one victim pod on node %q", nodeInfoCopy.Node().Name))
        }
        statusesLock.Lock()
        nodeStatuses[nodeInfoCopy.Node().Name] = status
        statusesLock.Unlock()
    }
    // Run checkNode with 16 goroutines.
    // len(potentialNodes) is the number of potential nodes, but not all of them are tried: the walk
    // starts at the offset index and stops once numCandidates candidates have been found.
    fh.Parallelizer().Until(parallelCtx, len(potentialNodes), checkNode)
    return append(nonViolatingCandidates.get(), violatingCandidates.get()...), nodeStatuses
}

SelectVictimsOnNode selects the set of pods on a given node that need to be preempted:
// SelectVictimsOnNode finds minimum set of pods on the given node that should be preempted in order to make enough room
// for "pod" to be scheduled.
func (pl *DefaultPreemption) SelectVictimsOnNode(ctx context.Context,
    state *framework.CycleState,
    pod *v1.Pod,
    nodeInfo *framework.NodeInfo,
    pdbs []*policy.PodDisruptionBudget) ([]*v1.Pod, int, *framework.Status) {
    var potentialVictims []*framework.PodInfo
    removePod := func(rpi *framework.PodInfo) error {
        if err := nodeInfo.RemovePod(rpi.Pod); err != nil {
            return err
        }
        status := pl.fh.RunPreFilterExtensionRemovePod(ctx, state, pod, rpi, nodeInfo)
        if !status.IsSuccess() {
            return status.AsError()
        }
        return nil
    }
    addPod := func(api *framework.PodInfo) error {
        nodeInfo.AddPodInfo(api)
        status := pl.fh.RunPreFilterExtensionAddPod(ctx, state, pod, api, nodeInfo)
        if !status.IsSuccess() {
            return status.AsError()
        }
        return nil
    }
    // Find the pods whose priority is lower than the preemptor's and save them to potentialVictims
    // (the potential victims). removePod removes the resources they occupy from nodeInfo. At this point
    // nodeInfo's resources include: pods already bound to this node, pods assumed on this node but still
    // being bound, and pods that already preempted successfully onto this node (excluding the
    // lower-priority ones removed here).
    // As the first step, remove all the lower priority pods from the node and
    // check if the given pod can be scheduled.
    podPriority := corev1helpers.PodPriority(pod)
    for _, pi := range nodeInfo.Pods {
        // nodeInfo.Pods does not include the other nominated pods.
        // A nominated pod whose priority is also lower than the preemptor's is not taken into
        // account either; see the function addNominatedPods.
        if corev1helpers.PodPriority(pi.Pod) < podPriority {
            potentialVictims = append(potentialVictims, pi)
            if err := removePod(pi); err != nil {
                return nil, 0, framework.AsStatus(err)
            }
        }
    }

    // No potential victims are found, and so we don't need to evaluate the node again since its state didn't change.
    if len(potentialVictims) == 0 {
        message := fmt.Sprintf("No victims found on node %v for preemptor pod %v", nodeInfo.Node().Name, pod.Name)
        return nil, 0, framework.NewStatus(framework.UnschedulableAndUnresolvable, message)
    }

    // Run the Filter plugins via RunFilterPluginsWithNominatedPods to check whether the preemptor can
    // be scheduled to this node once the lower-priority pods are removed. If this still fails,
    // preempting this node would not help.
    // If the new pod does not fit after removing all the lower priority pods,
    // we are almost done and this node is not suitable for preemption. The only
    // condition that we could check is if the "pod" is failing to schedule due to
    // inter-pod affinity to one or more victims, but we have decided not to
    // support this case for performance reasons. Having affinity to lower
    // priority pods is not a recommended configuration anyway.
    if status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo); !status.IsSuccess() {
        return nil, 0, status
    }

    // If the Filter above succeeded, the node can host the preemptor, but do all the lower-priority
    // pods really have to be evicted? Perhaps preempting a single low-priority pod is enough, in which
    // case evicting several is unnecessary, but trying them one by one would be too slow. The code
    // below tries to "reprieve" as many potential victims as possible: filterPodsWithPDBViolation
    // splits them into violatingVictims and nonViolatingVictims (this relates to PDB; its
    // implementation is not covered here). Both lists are iterated and reprievePod adds the pod back;
    // if RunFilterPluginsWithNominatedPods still passes with the pod added back, the pod does not need
    // to be preempted and is thereby reprieved.
    var victims []*v1.Pod
    numViolatingVictim := 0
    sort.Slice(potentialVictims, func(i, j int) bool { return util.MoreImportantPod(potentialVictims[i].Pod, potentialVictims[j].Pod) })
    // Try to reprieve as many pods as possible. We first try to reprieve the PDB
    // violating victims and then other non-violating ones. In both cases, we start
    // from the highest priority victims.
    violatingVictims, nonViolatingVictims := filterPodsWithPDBViolation(potentialVictims, pdbs)
    reprievePod := func(pi *framework.PodInfo) (bool, error) {
        if err := addPod(pi); err != nil {
            return false, err
        }
        status := pl.fh.RunFilterPluginsWithNominatedPods(ctx, state, pod, nodeInfo)
        fits := status.IsSuccess()
        if !fits {
            if err := removePod(pi); err != nil {
                return false, err
            }
            rpi := pi.Pod
            victims = append(victims, rpi)
            klog.V(5).InfoS("Pod is a potential preemption victim on node", "pod", klog.KObj(rpi), "node", klog.KObj(nodeInfo.Node()))
        }
        return fits, nil
    }
    for _, p := range violatingVictims {
        if fits, err := reprievePod(p); err != nil {
            return nil, 0, framework.AsStatus(err)
        } else if !fits {
            numViolatingVictim++
        }
    }
    // Now we try to reprieve non-violating victims.
    for _, p := range nonViolatingVictims {
        if _, err := reprievePod(p); err != nil {
            return nil, 0, framework.AsStatus(err)
        }
    }
    return victims, numViolatingVictim, framework.NewStatus(framework.Success)
}

3. callExtenders
Extender-based candidate filtering is not covered here; it is simply another way to narrow down the candidate list.
4. SelectCandidate
Select the most suitable candidate.

// SelectCandidate chooses the best-fit candidate from the given candidates and returns it.
// NOTE: This method is exported for easier testing in default preemption.
func (ev *Evaluator) SelectCandidate(candidates []Candidate) Candidate {
    if len(candidates) == 0 {
        return nil
    }
    if len(candidates) == 1 {
        return candidates[0]
    }
    // Map each node to the set of pods that can be preempted on it.
    victimsMap := ev.CandidatesToVictimsMap(candidates)
    candidateNode := pickOneNodeForPreemption(victimsMap)
    // Same as candidatesToVictimsMap, this logic is not applicable for out-of-tree
    // preemption plugins that exercise different candidates on the same nominated node.
    if victims := victimsMap[candidateNode]; victims != nil {
        return &candidate{
            victims: victims,
            name:    candidateNode,
        }
    }
    // We shouldn't reach here.
    klog.ErrorS(errors.New("no candidate selected"), "should not reach here", "candidates", candidates)
    // To not break the whole flow, return the first candidate.
    return candidates[0]
}

4.1 CandidatesToVictimsMap
Puts the candidates into a map whose key is the node name and whose value is the set of pods that can be preempted on that node.

// This function is not applicable for out-of-tree preemption plugins that exercise
// different preemption candidates on the same nominated node.
func (pl *DefaultPreemption) CandidatesToVictimsMap(candidates []preemption.Candidate) map[string]*extenderv1.Victims {
    m := make(map[string]*extenderv1.Victims)
    for _, c := range candidates {
        m[c.Name()] = c.Victims()
    }
    return m
}

4.2 pickOneNodeForPreemption
This function is fairly long, but the logic is simple: it filters the candidate nodes step by step according to the criteria listed in the comments below. A toy example illustrating the first two criteria follows the code.

// pickOneNodeForPreemption chooses one node among the given nodes. It assumes
// pods in each map entry are ordered by decreasing priority.
// It picks a node based on the following criteria:
// 1. A node with minimum number of PDB violations.
// 2. A node with minimum highest priority victim is picked.
// 3. Ties are broken by sum of priorities of all victims.
// 4. If there are still ties, node with the minimum number of victims is picked.
// 5. If there are still ties, node with the latest start time of all highest priority victims is picked.
// 6. If there are still ties, the first such node is picked (sort of randomly).
// The 'minNodes1' and 'minNodes2' are being reused here to save the memory
// allocation and garbage collection time.
func pickOneNodeForPreemption(nodesToVictims map[string]*extenderv1.Victims) string {
    if len(nodesToVictims) == 0 {
        return ""
    }
    minNumPDBViolatingPods := int64(math.MaxInt32)
    var minNodes1 []string
    lenNodes1 := 0
    for node, victims := range nodesToVictims {
        numPDBViolatingPods := victims.NumPDBViolations
        if numPDBViolatingPods < minNumPDBViolatingPods {
            minNumPDBViolatingPods = numPDBViolatingPods
            minNodes1 = nil
            lenNodes1 = 0
        }
        if numPDBViolatingPods == minNumPDBViolatingPods {
            minNodes1 = append(minNodes1, node)
            lenNodes1++
        }
    }
    if lenNodes1 == 1 {
        return minNodes1[0]
    }

    // There are more than one node with minimum number PDB violating pods. Find
    // the one with minimum highest priority victim.
    minHighestPriority := int32(math.MaxInt32)
    var minNodes2 = make([]string, lenNodes1)
    lenNodes2 := 0
    for i := 0; i < lenNodes1; i++ {
        node := minNodes1[i]
        victims := nodesToVictims[node]
        // highestPodPriority is the highest priority among the victims on this node.
        highestPodPriority := corev1helpers.PodPriority(victims.Pods[0])
        if highestPodPriority < minHighestPriority {
            minHighestPriority = highestPodPriority
            lenNodes2 = 0
        }
        if highestPodPriority == minHighestPriority {
            minNodes2[lenNodes2] = node
            lenNodes2++
        }
    }
    if lenNodes2 == 1 {
        return minNodes2[0]
    }

    // There are a few nodes with minimum highest priority victim. Find the
    // smallest sum of priorities.
    minSumPriorities := int64(math.MaxInt64)
    lenNodes1 = 0
    for i := 0; i < lenNodes2; i++ {
        var sumPriorities int64
        node := minNodes2[i]
        for _, pod := range nodesToVictims[node].Pods {
            // We add MaxInt32+1 to all priorities to make all of them >= 0. This is
            // needed so that a node with a few pods with negative priority is not
            // picked over a node with a smaller number of pods with the same negative
            // priority (and similar scenarios).
            sumPriorities += int64(corev1helpers.PodPriority(pod)) + int64(math.MaxInt32+1)
        }
        if sumPriorities < minSumPriorities {
            minSumPriorities = sumPriorities
            lenNodes1 = 0
        }
        if sumPriorities == minSumPriorities {
            minNodes1[lenNodes1] = node
            lenNodes1++
        }
    }
    if lenNodes1 == 1 {
        return minNodes1[0]
    }

    // There are a few nodes with minimum highest priority victim and sum of priorities.
    // Find one with the minimum number of pods.
    minNumPods := math.MaxInt32
    lenNodes2 = 0
    for i := 0; i < lenNodes1; i++ {
        node := minNodes1[i]
        numPods := len(nodesToVictims[node].Pods)
        if numPods < minNumPods {
            minNumPods = numPods
            lenNodes2 = 0
        }
        if numPods == minNumPods {
            minNodes2[lenNodes2] = node
            lenNodes2++
        }
    }
    if lenNodes2 == 1 {
        return minNodes2[0]
    }

    // There are a few nodes with same number of pods.
    // Find the node that satisfies latest(earliestStartTime(all highest-priority pods on node))
    latestStartTime := util.GetEarliestPodStartTime(nodesToVictims[minNodes2[0]])
    if latestStartTime == nil {
        // If the earliest start time of all pods on the 1st node is nil, just return it,
        // which is not expected to happen.
        klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", minNodes2[0])
        return minNodes2[0]
    }
    nodeToReturn := minNodes2[0]
    for i := 1; i < lenNodes2; i++ {
        node := minNodes2[i]
        // Get earliest start time of all pods on the current node.
        earliestStartTimeOnNode := util.GetEarliestPodStartTime(nodesToVictims[node])
        if earliestStartTimeOnNode == nil {
            klog.ErrorS(errors.New("earliestStartTime is nil for node"), "should not reach here", "node", node)
            continue
        }
        if earliestStartTimeOnNode.After(latestStartTime.Time) {
            latestStartTime = earliestStartTimeOnNode
            nodeToReturn = node
        }
    }
    return nodeToReturn
}
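To make the first two criteria concrete, here is a small self-contained toy program; it does not use the scheduler's types, and the node names and priorities are made up. It mimics only rule 1 (fewest PDB violations) and rule 2 (lowest "highest-priority victim").

package main

import (
    "fmt"
    "math"
)

// victims is a toy stand-in for extenderv1.Victims: the number of PDB violations
// and the victim priorities ordered by decreasing priority.
type victims struct {
    numPDBViolations int64
    priorities       []int32
}

// pickNode mimics only the first two tie-breaking rules of pickOneNodeForPreemption:
// fewest PDB violations first, then the lowest "highest-priority victim".
func pickNode(nodes map[string]victims) string {
    best := ""
    bestViolations := int64(math.MaxInt64)
    bestTopPriority := int32(math.MaxInt32)
    for name, v := range nodes {
        top := v.priorities[0]
        if v.numPDBViolations < bestViolations ||
            (v.numPDBViolations == bestViolations && top < bestTopPriority) {
            best, bestViolations, bestTopPriority = name, v.numPDBViolations, top
        }
    }
    return best
}

func main() {
    nodes := map[string]victims{
        "node-a": {numPDBViolations: 1, priorities: []int32{100}},
        "node-b": {numPDBViolations: 0, priorities: []int32{500, 100}},
        "node-c": {numPDBViolations: 0, priorities: []int32{200, 100}},
    }
    // node-a loses on rule 1 (one PDB violation); node-c beats node-b on rule 2
    // because its highest-priority victim (200) is lower than node-b's (500).
    fmt.Println(pickNode(nodes)) // node-c
}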

5. prepareCandidate
Evicts the victim pods on the selected node and clears the nomination of lower-priority nominated pods, as described in the code comments below.

// prepareCandidate does some preparation work before nominating the selected candidate:
// - Evict the victim pods
// - Reject the victim pods if they are in waitingPod map
// - Clear the low-priority pods' nominatedNodeName status if needed
func (ev *Evaluator) prepareCandidate(c Candidate, pod *v1.Pod, pluginName string) *framework.Status {
    fh := ev.Handler
    cs := ev.Handler.ClientSet()
    // Iterate over the victim pods.
    for _, victim := range c.Victims().Pods {
        // If the victim is a WaitingPod, send a reject message to the PermitPlugin.
        // Otherwise we should delete the victim.
        // (No PermitPlugin is configured by default, so the WaitingPod case does not occur here.)
        if waitingPod := fh.GetWaitingPod(victim.UID); waitingPod != nil {
            waitingPod.Reject(pluginName, "preempted")
            // Otherwise send a delete request for the victim to the apiserver; see 5.1.
        } else if err := util.DeletePod(cs, victim); err != nil {
            klog.ErrorS(err, "Preempting pod", "pod", klog.KObj(victim), "preemptor", klog.KObj(pod))
            return framework.AsStatus(err)
        }
        fh.EventRecorder().Eventf(victim, pod, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v",
            pod.Namespace, pod.Name, c.Name())
    }
    metrics.PreemptionVictims.Observe(float64(len(c.Victims().Pods)))

    // Lower priority pods nominated to run on this node, may no longer fit on
    // this node. So, we should remove their nomination. Removing their
    // nomination updates these pods and moves them to the active queue. It
    // lets scheduler find another place for them.
    // Get the other nominated pods whose priority is lower than the preemptor's; see 5.2.
    nominatedPods := getLowerPriorityNominatedPods(fh, pod, c.Name())
    // Clear the Status.NominatedNodeName field of those nominated pods; see 5.3.
    if err := util.ClearNominatedNodeName(cs, nominatedPods...); err != nil {
        klog.ErrorS(err, "cannot clear 'NominatedNodeName' field")
        // We do not return as this error is not critical.
    }
    return nil
}

5.1 DeletePod
Sends a request to the apiserver to delete the pod.

// DeletePod deletes the given pod from API server
func DeletePod(cs kubernetes.Interface, pod *v1.Pod) error {
    return cs.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
}

5.2 getLowerPriorityNominatedPods
Gets the other nominated pods whose priority is lower than the preemptor's.

// getLowerPriorityNominatedPods returns pods whose priority is smaller than the
// priority of the given "pod" and are nominated to run on the given node.
// Note: We could possibly check if the nominated lower priority pods still fit
// and return those that no longer fit, but that would require lots of
// manipulation of NodeInfo and PreFilter state per nominated pod. It may not be
// worth the complexity, especially because we generally expect to have a very
// small number of nominated pods per node.
func getLowerPriorityNominatedPods(pn framework.PodNominator, pod *v1.Pod, nodeName string) []*v1.Pod {
    // Get all nominated pods on this node (pods that ended up on this node through the preemption flow).
    podInfos := pn.NominatedPodsForNode(nodeName)

    if len(podInfos) == 0 {
        return nil
    }

    var lowerPriorityPods []*v1.Pod
    podPriority := corev1helpers.PodPriority(pod)
    for _, pi := range podInfos {
        // Save the pods whose priority is lower than the preemptor's into lowerPriorityPods.
        if corev1helpers.PodPriority(pi.Pod) < podPriority {
            lowerPriorityPods = append(lowerPriorityPods, pi.Pod)
        }
    }
    return lowerPriorityPods
}

5.3 ClearNominatedNodeName
Clears the Status.NominatedNodeName field of those nominated pods.

// ClearNominatedNodeName internally submit a patch request to API server
// to set each pods[*].Status.NominatedNodeName to "".
func ClearNominatedNodeName(cs kubernetes.Interface, pods ...*v1.Pod) utilerrors.Aggregate {
    var errs []error
    for _, p := range pods {
        if len(p.Status.NominatedNodeName) == 0 {
            continue
        }
        podStatusCopy := p.Status.DeepCopy()
        podStatusCopy.NominatedNodeName = ""
        if err := PatchPodStatus(cs, p, podStatusCopy); err != nil {
            errs = append(errs, err)
        }
    }
    return utilerrors.NewAggregate(errs)
}
