diff --git a/scheduler/preemption.go b/scheduler/preemption.go index 6ecf49cd8..905ce1dd4 100644 --- a/scheduler/preemption.go +++ b/scheduler/preemption.go @@ -17,6 +17,11 @@ type groupedAllocs struct { allocs []*structs.Allocation } +type allocInfo struct { + maxParallel int + resources *structs.ComparableResources +} + // PreemptionResource interface is implemented by different // types of resources. type PreemptionResource interface { @@ -44,14 +49,7 @@ func (n *NetworkPreemptionResource) MeetsRequirements() bool { } func (n *NetworkPreemptionResource) Distance() float64 { - networkCoord := math.MaxFloat64 - if n.availableResources != nil && n.resourceNeeded != nil { - networkCoord = float64(n.resourceNeeded.MBits-n.availableResources.MBits) / float64(n.resourceNeeded.MBits) - } - - originDist := math.Sqrt( - math.Pow(networkCoord, 2)) - return originDist + return networkResourceDistance(n.availableResources, n.resourceNeeded) } // BasePreemptionResource implements PreemptionResource for CPU/Memory/Disk @@ -100,10 +98,16 @@ type Preemptor struct { // it tracks the number of preempted allocations per job/taskgroup currentPreemptions map[structs.NamespacedID]map[string]int + // allocDetails is a map computed when SetCandidates is called + // it stores some precomputed details about the allocation needed + // when scoring it for preemption + allocDetails map[string]*allocInfo + // jobPriority is the priority of the job being preempted jobPriority int - // nodeRemainingResources tracks remaining available resources on the node + // nodeRemainingResources tracks available resources on the node after + // accounting for running allocations nodeRemainingResources *structs.ComparableResources // currentAllocs is the candidate set used to find preemptible allocations @@ -114,6 +118,7 @@ func NewPreemptor(jobPriority int) *Preemptor { return &Preemptor{ currentPreemptions: make(map[structs.NamespacedID]map[string]int), jobPriority: jobPriority, + allocDetails: make(map[string]*allocInfo), } } @@ -121,8 +126,8 @@ func NewPreemptor(jobPriority int) *Preemptor { func (p *Preemptor) SetNode(node *structs.Node) { nodeRemainingResources := node.ComparableResources() // Subtract the reserved resources of the node - if node.ComparableReservedResources() != nil { - nodeRemainingResources.Subtract(node.ComparableReservedResources()) + if c := node.ComparableReservedResources(); c != nil { + nodeRemainingResources.Subtract(c) } p.nodeRemainingResources = nodeRemainingResources } @@ -130,15 +135,21 @@ func (p *Preemptor) SetNode(node *structs.Node) { // SetCandidates initializes the candidate set from which preemptions are chosen func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) { p.currentAllocs = allocs + for _, alloc := range allocs { + maxParallel := 0 + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg != nil && tg.Migrate != nil { + maxParallel = tg.Migrate.MaxParallel + } + p.allocDetails[alloc.ID] = &allocInfo{maxParallel, alloc.ComparableResources()} + } } // SetPreemptions initializes a map tracking existing counts of preempted allocations // per job/task group. This is used while scoring preemption options func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) { // Clear out existing values since this can be called more than once - for k := range p.currentPreemptions { - delete(p.currentPreemptions, k) - } + p.currentPreemptions = make(map[structs.NamespacedID]map[string]int) // Initialize counts for _, alloc := range allocs { @@ -146,22 +157,20 @@ func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) { countMap, ok := p.currentPreemptions[id] if !ok { countMap = make(map[string]int) + p.currentPreemptions[id] = countMap } - c := countMap[alloc.TaskGroup] - countMap[alloc.TaskGroup] = c + 1 - p.currentPreemptions[id] = countMap + countMap[alloc.TaskGroup]++ } } // getNumPreemptions counts the number of other allocations being preempted that match the job and task group of // the alloc under consideration. This is used as a scoring factor to minimize too many allocs of the same job being preempted at once func (p *Preemptor) getNumPreemptions(alloc *structs.Allocation) int { - numCurrentPreemptionsForJob := 0 - countMap := p.currentPreemptions[structs.NamespacedID{alloc.JobID, alloc.Namespace}] - if countMap != nil { - numCurrentPreemptionsForJob = countMap[alloc.TaskGroup] + c, ok := p.currentPreemptions[structs.NamespacedID{alloc.JobID, alloc.Namespace}][alloc.TaskGroup] + if !ok { + return 0 } - return numCurrentPreemptionsForJob + return c } // PreemptForTaskGroup computes a list of allocations to preempt to accommodate @@ -180,7 +189,9 @@ func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources) var bestAllocs []*structs.Allocation allRequirementsMet := false - var preemptedResources *structs.ComparableResources + + // Initialize variable to track resources as they become available from preemption + availableResources := p.nodeRemainingResources.Copy() // Iterate over allocations grouped by priority to find preemptible allocations for _, allocGrp := range allocsByPriority { @@ -190,26 +201,16 @@ func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources) // Find the alloc with the closest distance for index, alloc := range allocGrp.allocs { currentPreemptionCount := p.getNumPreemptions(alloc) - maxParallel := 0 - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg != nil && tg.Migrate != nil { - maxParallel = tg.Migrate.MaxParallel - } - distance := scoreForTaskGroup(resourcesNeeded, alloc.ComparableResources(), maxParallel, currentPreemptionCount) + allocDetails := p.allocDetails[alloc.ID] + maxParallel := allocDetails.maxParallel + distance := scoreForTaskGroup(resourcesNeeded, allocDetails.resources, maxParallel, currentPreemptionCount) if distance < bestDistance { bestDistance = distance closestAllocIndex = index } } closestAlloc := allocGrp.allocs[closestAllocIndex] - - if preemptedResources == nil { - preemptedResources = closestAlloc.ComparableResources() - } else { - preemptedResources.Add(closestAlloc.ComparableResources()) - } - availableResources := preemptedResources.Copy() - availableResources.Add(p.nodeRemainingResources) + availableResources.Add(closestAlloc.ComparableResources()) // This step needs the original resources asked for as the second arg, can't use the running total allRequirementsMet, _ = availableResources.Superset(resourceAsk.Comparable()) @@ -232,11 +233,10 @@ func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources) return nil } - resourcesNeeded = resourceAsk.Comparable() // We do another pass to eliminate unnecessary preemptions // This filters out allocs whose resources are already covered by another alloc basePreemptionResource := GetBasePreemptionResourceFactory() - + resourcesNeeded = resourceAsk.Comparable() filteredBestAllocs := filterSuperset(bestAllocs, p.nodeRemainingResources, resourcesNeeded, basePreemptionResource) return filteredBestAllocs @@ -257,20 +257,19 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc reservedPortsNeeded := networkResourceAsk.ReservedPorts // Create a map from each device to allocs - // We do this because to place a task we have to be able to - // preempt allocations that are using the same device. - - // This step also filters out high priority allocations and allocations - // that are not using any network resources + // We can only preempt within allocations that + // are using the same device for _, alloc := range p.currentAllocs { if alloc.Job == nil { continue } + // Filter out alloc that's ineligible due to priority if p.jobPriority-alloc.Job.Priority < 10 { continue } - networks := alloc.ComparableResources().Flattened.Networks + allocResources := p.allocDetails[alloc.ID].resources + networks := allocResources.Flattened.Networks // Only include if the alloc has a network device if len(networks) > 0 { @@ -287,7 +286,6 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc } var allocsToPreempt []*structs.Allocation - met := false freeBandwidth := 0 @@ -313,8 +311,9 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc // First try to satisfy needed reserved ports if len(reservedPortsNeeded) > 0 { for _, alloc := range currentAllocs { - for _, tr := range alloc.TaskResources { - reservedPorts := tr.Networks[0].ReservedPorts + allocResources := p.allocDetails[alloc.ID].resources + for _, n := range allocResources.Flattened.Networks { + reservedPorts := n.ReservedPorts for _, p := range reservedPorts { usedPortToAlloc[p.Value] = alloc } @@ -426,8 +425,7 @@ func networkResourceDistance(resourceUsed *structs.NetworkResource, resourceNeed networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits) } - originDist := math.Sqrt( - math.Pow(networkCoord, 2)) + originDist := math.Abs(networkCoord) return originDist }