From 95fa06a35623dae5e73247be9c864e2d223a930b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 20 Jul 2017 12:23:46 -0700 Subject: [PATCH] Handle destructive changes before placements This PR updates the generic scheduler to handle destructive changes before handling placements. This is important because the destructive change may be due to a lowering of resources. If so, handling the destructive changes first may free enough resources for the placements to succeed. To see why, imagine there is one node with CPU = 500. If the group originally had: * `count = 1` * `cpu = 400` And then the job was updated such that the group had: * `count = 4` * `cpu = 120` If the original alloc isn't discounted first, only 100 CPU appears free on the node, which is less than the 120 each new alloc needs, so nothing would be able to place. Once the original alloc's 400 CPU is discounted, all four new allocs fit (4 * 120 = 480 <= 500). --- scheduler/generic_sched.go | 177 +++++++++++++++++++------------------ 1 file changed, 92 insertions(+), 85 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index ae80fac50..a947d8881 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -443,18 +443,20 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Compute the placements - place := make([]placementResult, 0, len(results.place)+len(results.destructiveUpdate)) + place := make([]placementResult, 0, len(results.place)) for _, p := range results.place { place = append(place, p) } + + destructive := make([]placementResult, 0, len(results.destructiveUpdate)) for _, p := range results.destructiveUpdate { - place = append(place, p) + destructive = append(destructive, p) } - return s.computePlacements(place) + return s.computePlacements(destructive, place) } // computePlacements computes placements for allocations -func (s *GenericScheduler) computePlacements(place []placementResult) error { +func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error { // Get the base nodes nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { @@ -469,93 
+471,98 @@ func (s *GenericScheduler) computePlacements(place []placementResult) error { // Update the set of placement ndoes s.stack.SetNodes(nodes) - for _, missing := range place { - // Get the task group - tg := missing.TaskGroup() + // Have to handle destructive changes first as we need to discount their + // resources. To understand this imagine the resources were reduced and the + // count was scaled up. + for _, results := range [][]placementResult{destructive, place} { + for _, missing := range results { + // Get the task group + tg := missing.TaskGroup() - // Check if this task group has already failed - if metric, ok := s.failedTGAllocs[tg.Name]; ok { - metric.CoalescedFailures += 1 - continue - } - - // Find the preferred node - preferredNode, err := s.findPreferredNode(missing) - if err != nil { - return err - } - - // Check if we should stop the previous allocation upon successful - // placement of its replacement. This allow atomic placements/stops. We - // stop the allocation before trying to find a replacement because this - // frees the resources currently used by the previous allocation. 
- stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc() - if stopPrevAlloc { - s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, stopPrevAllocDesc, "") - } - - // Attempt to match the task group - var option *RankedNode - if preferredNode != nil { - option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode}) - } else { - option, _ = s.stack.Select(tg) - } - - // Store the available nodes by datacenter - s.ctx.Metrics().NodesAvailable = byDC - - // Set fields based on if we found an allocation option - if option != nil { - // Create an allocation for this - alloc := &structs.Allocation{ - ID: structs.GenerateUUID(), - EvalID: s.eval.ID, - Name: missing.Name(), - JobID: s.job.ID, - TaskGroup: tg.Name, - Metrics: s.ctx.Metrics(), - NodeID: option.Node.ID, - DeploymentID: deploymentID, - TaskResources: option.TaskResources, - DesiredStatus: structs.AllocDesiredStatusRun, - ClientStatus: structs.AllocClientStatusPending, - - SharedResources: &structs.Resources{ - DiskMB: tg.EphemeralDisk.SizeMB, - }, + // Check if this task group has already failed + if metric, ok := s.failedTGAllocs[tg.Name]; ok { + metric.CoalescedFailures += 1 + continue } - // If the new allocation is replacing an older allocation then we - // set the record the older allocation id so that they are chained - if prev := missing.PreviousAllocation(); prev != nil { - alloc.PreviousAllocation = prev.ID + // Find the preferred node + preferredNode, err := s.findPreferredNode(missing) + if err != nil { + return err } - // If we are placing a canary and we found a match, add the canary - // to the deployment state object. 
- if missing.Canary() { - if state, ok := s.deployment.TaskGroups[tg.Name]; ok { - state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID) - } - } - - // Track the placement - s.plan.AppendAlloc(alloc) - - } else { - // Lazy initialize the failed map - if s.failedTGAllocs == nil { - s.failedTGAllocs = make(map[string]*structs.AllocMetric) - } - - // Track the fact that we didn't find a placement - s.failedTGAllocs[tg.Name] = s.ctx.Metrics() - - // If we weren't able to find a replacement for the allocation, back - // out the fact that we asked to stop the allocation. + // Check if we should stop the previous allocation upon successful + // placement of its replacement. This allow atomic placements/stops. We + // stop the allocation before trying to find a replacement because this + // frees the resources currently used by the previous allocation. + stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc() if stopPrevAlloc { - s.plan.PopUpdate(missing.PreviousAllocation()) + s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, stopPrevAllocDesc, "") + } + + // Attempt to match the task group + var option *RankedNode + if preferredNode != nil { + option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode}) + } else { + option, _ = s.stack.Select(tg) + } + + // Store the available nodes by datacenter + s.ctx.Metrics().NodesAvailable = byDC + + // Set fields based on if we found an allocation option + if option != nil { + // Create an allocation for this + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: s.eval.ID, + Name: missing.Name(), + JobID: s.job.ID, + TaskGroup: tg.Name, + Metrics: s.ctx.Metrics(), + NodeID: option.Node.ID, + DeploymentID: deploymentID, + TaskResources: option.TaskResources, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + + SharedResources: &structs.Resources{ + DiskMB: tg.EphemeralDisk.SizeMB, + }, + } + + // If 
the new allocation is replacing an older allocation then we + // set the record the older allocation id so that they are chained + if prev := missing.PreviousAllocation(); prev != nil { + alloc.PreviousAllocation = prev.ID + } + + // If we are placing a canary and we found a match, add the canary + // to the deployment state object. + if missing.Canary() { + if state, ok := s.deployment.TaskGroups[tg.Name]; ok { + state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID) + } + } + + // Track the placement + s.plan.AppendAlloc(alloc) + + } else { + // Lazy initialize the failed map + if s.failedTGAllocs == nil { + s.failedTGAllocs = make(map[string]*structs.AllocMetric) + } + + // Track the fact that we didn't find a placement + s.failedTGAllocs[tg.Name] = s.ctx.Metrics() + + // If we weren't able to find a replacement for the allocation, back + // out the fact that we asked to stop the allocation. + if stopPrevAlloc { + s.plan.PopUpdate(missing.PreviousAllocation()) + } } } }