review comments

This commit is contained in:
Preetha Appan
2018-11-01 12:01:59 -05:00
parent f2b027797b
commit 22d156f5a0

View File

@@ -17,6 +17,11 @@ type groupedAllocs struct {
allocs []*structs.Allocation
}
type allocInfo struct {
maxParallel int
resources *structs.ComparableResources
}
// PreemptionResource interface is implemented by different
// types of resources.
type PreemptionResource interface {
@@ -44,14 +49,7 @@ func (n *NetworkPreemptionResource) MeetsRequirements() bool {
}
func (n *NetworkPreemptionResource) Distance() float64 {
networkCoord := math.MaxFloat64
if n.availableResources != nil && n.resourceNeeded != nil {
networkCoord = float64(n.resourceNeeded.MBits-n.availableResources.MBits) / float64(n.resourceNeeded.MBits)
}
originDist := math.Sqrt(
math.Pow(networkCoord, 2))
return originDist
return networkResourceDistance(n.availableResources, n.resourceNeeded)
}
// BasePreemptionResource implements PreemptionResource for CPU/Memory/Disk
@@ -100,10 +98,16 @@ type Preemptor struct {
// it tracks the number of preempted allocations per job/taskgroup
currentPreemptions map[structs.NamespacedID]map[string]int
// allocDetails is a map computed when SetCandidates is called
// it stores some precomputed details about the allocation needed
// when scoring it for preemption
allocDetails map[string]*allocInfo
// jobPriority is the priority of the job being preempted
jobPriority int
// nodeRemainingResources tracks remaining available resources on the node
// nodeRemainingResources tracks available resources on the node after
// accounting for running allocations
nodeRemainingResources *structs.ComparableResources
// currentAllocs is the candidate set used to find preemptible allocations
@@ -114,6 +118,7 @@ func NewPreemptor(jobPriority int) *Preemptor {
return &Preemptor{
currentPreemptions: make(map[structs.NamespacedID]map[string]int),
jobPriority: jobPriority,
allocDetails: make(map[string]*allocInfo),
}
}
@@ -121,8 +126,8 @@ func NewPreemptor(jobPriority int) *Preemptor {
func (p *Preemptor) SetNode(node *structs.Node) {
nodeRemainingResources := node.ComparableResources()
// Subtract the reserved resources of the node
if node.ComparableReservedResources() != nil {
nodeRemainingResources.Subtract(node.ComparableReservedResources())
if c := node.ComparableReservedResources(); c != nil {
nodeRemainingResources.Subtract(c)
}
p.nodeRemainingResources = nodeRemainingResources
}
@@ -130,15 +135,21 @@ func (p *Preemptor) SetNode(node *structs.Node) {
// SetCandidates initializes the candidate set from which preemptions are chosen
func (p *Preemptor) SetCandidates(allocs []*structs.Allocation) {
p.currentAllocs = allocs
for _, alloc := range allocs {
maxParallel := 0
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.Migrate != nil {
maxParallel = tg.Migrate.MaxParallel
}
p.allocDetails[alloc.ID] = &allocInfo{maxParallel, alloc.ComparableResources()}
}
}
// SetPreemptions initializes a map tracking existing counts of preempted allocations
// per job/task group. This is used while scoring preemption options
func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) {
// Clear out existing values since this can be called more than once
for k := range p.currentPreemptions {
delete(p.currentPreemptions, k)
}
p.currentPreemptions = make(map[structs.NamespacedID]map[string]int)
// Initialize counts
for _, alloc := range allocs {
@@ -146,22 +157,20 @@ func (p *Preemptor) SetPreemptions(allocs []*structs.Allocation) {
countMap, ok := p.currentPreemptions[id]
if !ok {
countMap = make(map[string]int)
p.currentPreemptions[id] = countMap
}
c := countMap[alloc.TaskGroup]
countMap[alloc.TaskGroup] = c + 1
p.currentPreemptions[id] = countMap
countMap[alloc.TaskGroup]++
}
}
// getNumPreemptions counts the number of other allocations being preempted that match the job and task group of
// the alloc under consideration. This is used as a scoring factor to minimize too many allocs of the same job being preempted at once
func (p *Preemptor) getNumPreemptions(alloc *structs.Allocation) int {
numCurrentPreemptionsForJob := 0
countMap := p.currentPreemptions[structs.NamespacedID{alloc.JobID, alloc.Namespace}]
if countMap != nil {
numCurrentPreemptionsForJob = countMap[alloc.TaskGroup]
c, ok := p.currentPreemptions[structs.NamespacedID{alloc.JobID, alloc.Namespace}][alloc.TaskGroup]
if !ok {
return 0
}
return numCurrentPreemptionsForJob
return c
}
// PreemptForTaskGroup computes a list of allocations to preempt to accommodate
@@ -180,7 +189,9 @@ func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources)
var bestAllocs []*structs.Allocation
allRequirementsMet := false
var preemptedResources *structs.ComparableResources
// Initialize variable to track resources as they become available from preemption
availableResources := p.nodeRemainingResources.Copy()
// Iterate over allocations grouped by priority to find preemptible allocations
for _, allocGrp := range allocsByPriority {
@@ -190,26 +201,16 @@ func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources)
// Find the alloc with the closest distance
for index, alloc := range allocGrp.allocs {
currentPreemptionCount := p.getNumPreemptions(alloc)
maxParallel := 0
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil && tg.Migrate != nil {
maxParallel = tg.Migrate.MaxParallel
}
distance := scoreForTaskGroup(resourcesNeeded, alloc.ComparableResources(), maxParallel, currentPreemptionCount)
allocDetails := p.allocDetails[alloc.ID]
maxParallel := allocDetails.maxParallel
distance := scoreForTaskGroup(resourcesNeeded, allocDetails.resources, maxParallel, currentPreemptionCount)
if distance < bestDistance {
bestDistance = distance
closestAllocIndex = index
}
}
closestAlloc := allocGrp.allocs[closestAllocIndex]
if preemptedResources == nil {
preemptedResources = closestAlloc.ComparableResources()
} else {
preemptedResources.Add(closestAlloc.ComparableResources())
}
availableResources := preemptedResources.Copy()
availableResources.Add(p.nodeRemainingResources)
availableResources.Add(closestAlloc.ComparableResources())
// This step needs the original resources asked for as the second arg, can't use the running total
allRequirementsMet, _ = availableResources.Superset(resourceAsk.Comparable())
@@ -232,11 +233,10 @@ func (p *Preemptor) PreemptForTaskGroup(resourceAsk *structs.AllocatedResources)
return nil
}
resourcesNeeded = resourceAsk.Comparable()
// We do another pass to eliminate unnecessary preemptions
// This filters out allocs whose resources are already covered by another alloc
basePreemptionResource := GetBasePreemptionResourceFactory()
resourcesNeeded = resourceAsk.Comparable()
filteredBestAllocs := filterSuperset(bestAllocs, p.nodeRemainingResources, resourcesNeeded, basePreemptionResource)
return filteredBestAllocs
@@ -257,20 +257,19 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc
reservedPortsNeeded := networkResourceAsk.ReservedPorts
// Create a map from each device to allocs
// We do this because to place a task we have to be able to
// preempt allocations that are using the same device.
// This step also filters out high priority allocations and allocations
// that are not using any network resources
// We can only preempt within allocations that
// are using the same device
for _, alloc := range p.currentAllocs {
if alloc.Job == nil {
continue
}
// Filter out alloc that's ineligible due to priority
if p.jobPriority-alloc.Job.Priority < 10 {
continue
}
networks := alloc.ComparableResources().Flattened.Networks
allocResources := p.allocDetails[alloc.ID].resources
networks := allocResources.Flattened.Networks
// Only include if the alloc has a network device
if len(networks) > 0 {
@@ -287,7 +286,6 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc
}
var allocsToPreempt []*structs.Allocation
met := false
freeBandwidth := 0
@@ -313,8 +311,9 @@ func (p *Preemptor) PreemptForNetwork(networkResourceAsk *structs.NetworkResourc
// First try to satisfy needed reserved ports
if len(reservedPortsNeeded) > 0 {
for _, alloc := range currentAllocs {
for _, tr := range alloc.TaskResources {
reservedPorts := tr.Networks[0].ReservedPorts
allocResources := p.allocDetails[alloc.ID].resources
for _, n := range allocResources.Flattened.Networks {
reservedPorts := n.ReservedPorts
for _, p := range reservedPorts {
usedPortToAlloc[p.Value] = alloc
}
@@ -426,8 +425,7 @@ func networkResourceDistance(resourceUsed *structs.NetworkResource, resourceNeed
networkCoord = float64(resourceNeeded.MBits-resourceUsed.MBits) / float64(resourceNeeded.MBits)
}
originDist := math.Sqrt(
math.Pow(networkCoord, 2))
originDist := math.Abs(networkCoord)
return originDist
}