diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index c4921a644..7de4987a2 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -91,6 +91,10 @@ func Job() *structs.Job { Delay: 1 * time.Minute, Mode: structs.RestartPolicyModeDelay, }, + ReschedulePolicy: &structs.ReschedulePolicy{ + Attempts: 2, + Interval: 10 * time.Minute, + }, Tasks: []*structs.Task{ { Name: "web", diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 0ce6eb6eb..edd417827 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -114,7 +114,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate, structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, - structs.EvalTriggerDeploymentWatcher: + structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) @@ -356,9 +356,6 @@ func (s *GenericScheduler) computeJobAllocs() error { // nodes to lost updateNonTerminalAllocsToLost(s.plan, tainted, allocs) - // Filter out the allocations in a terminal state - allocs = s.filterCompleteAllocs(allocs) - reconciler := NewAllocReconciler(s.ctx.Logger(), genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID), s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted) @@ -471,17 +468,30 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // stop the allocation before trying to find a replacement because this // frees the resources currently used by the previous allocation. stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc() + prevAllocation := missing.PreviousAllocation() if stopPrevAlloc { - s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, stopPrevAllocDesc, "") + s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "") + } + + // Setup node weights for replacement allocations + selectOptions := &SelectOptions{} + if prevAllocation != nil { + var penaltyNodes []string + penaltyNodes = append(penaltyNodes, prevAllocation.NodeID) + if prevAllocation.RescheduleTrackers != nil { + for _, reschedTracker := range prevAllocation.RescheduleTrackers { + penaltyNodes = append(penaltyNodes, reschedTracker.PrevNodeID) + } + } + selectOptions.PenaltyNodeIDs = penaltyNodes } // Attempt to match the task group var option *RankedNode if preferredNode != nil { - option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode}) - } else { - option, _ = s.stack.Select(tg) + selectOptions.PreferredNodes = []*structs.Node{preferredNode} } + option, _ = s.stack.Select(tg, selectOptions) // Store the available nodes by datacenter s.ctx.Metrics().NodesAvailable = byDC @@ -510,8 +520,16 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // If the new allocation is replacing an older allocation then we // set the record the older allocation id so that they are chained - if prev := missing.PreviousAllocation(); prev != nil { + if prev := prevAllocation; prev != nil { alloc.PreviousAllocation = prev.ID + var rescheduleTrackers []*structs.RescheduleTracker + if prev.RescheduleTrackers != nil { + for _, reschedInfo := range prev.RescheduleTrackers { + rescheduleTrackers = append(rescheduleTrackers, reschedInfo.Copy()) + } + } + rescheduleTrackers = append(rescheduleTrackers, &structs.RescheduleTracker{RescheduleTime: time.Now().UTC().UnixNano(), PrevAllocID: prev.ID, PrevNodeID: alloc.NodeID}) + alloc.RescheduleTrackers = rescheduleTrackers } // If we are placing a canary and we found a match, add the canary @@ -537,7 +555,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul // If we weren't able to find a replacement for the allocation, back // out the fact that we asked to stop the allocation. if stopPrevAlloc { - s.plan.PopUpdate(missing.PreviousAllocation()) + s.plan.PopUpdate(prevAllocation) } } } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index e7649a238..00f40f63c 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2467,6 +2467,16 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) { var complete []*structs.Allocation for i := 6; i < 10; i++ { newAlloc := stop[i].Copy() + newAlloc.TaskStates = make(map[string]*structs.TaskState) + newAlloc.TaskStates["web"] = &structs.TaskState{ + State: structs.TaskStateDead, + Events: []*structs.TaskEvent{ + { + Type: structs.TaskTerminated, + ExitCode: 0, + }, + }, + } newAlloc.ClientStatus = structs.AllocClientStatusComplete complete = append(complete, newAlloc) } @@ -2705,6 +2715,220 @@ func TestServiceSched_RetryLimit(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusFailed) } +func TestServiceSched_Reschedule_Once(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations and an update policy. + job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 1, + Interval: 15 * time.Minute, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + // Mark one of the allocations as failed + allocs[1].ClientStatus = structs.AllocClientStatusFailed + failedAllocID := allocs[1].ID + successAllocID := allocs[0].ID + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Verify that one new allocation got created with its restart tracker info + assert := assert.New(t) + assert.Equal(3, len(out)) + var newAlloc *structs.Allocation + for _, alloc := range out { + if alloc.ID != successAllocID && alloc.ID != failedAllocID { + newAlloc = alloc + } + } + assert.Equal(failedAllocID, newAlloc.PreviousAllocation) + assert.Equal(1, len(newAlloc.RescheduleTrackers)) + assert.Equal(failedAllocID, newAlloc.RescheduleTrackers[0].PrevAllocID) + + // Mark this alloc as failed again, should not get rescheduled + newAlloc.ClientStatus = structs.AllocClientStatusFailed + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{newAlloc})) + + // Create another mock evaluation + eval = &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err = h.Process(NewServiceScheduler, eval) + assert.Nil(err) + // Verify no new allocs were created this time + out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + assert.Equal(3, len(out)) + +} + +func TestServiceSched_Reschedule_Multiple(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + maxRestartAttempts := 3 + // Generate a fake job with allocations and an update policy. + job := mock.Job() + job.TaskGroups[0].Count = 2 + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: maxRestartAttempts, + Interval: 30 * time.Minute, + } + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.ClientStatus = structs.AllocClientStatusRunning + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + allocs = append(allocs, alloc) + } + // Mark one of the allocations as failed + allocs[1].ClientStatus = structs.AllocClientStatusFailed + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + expectedNumAllocs := 3 + expectedNumReschedTrackers := 1 + + assert := assert.New(t) + for i := 0; i < maxRestartAttempts; i++ { + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + noErr(t, err) + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + + // Verify that a new allocation got created with its restart tracker info + assert.Equal(expectedNumAllocs, len(out)) + + // Find the new alloc with ClientStatusPending + var pendingAllocs []*structs.Allocation + fmt.Println("Iteration: ", i) + for _, alloc := range out { + fmt.Println(alloc.ID, alloc.ClientStatus, len(alloc.RescheduleTrackers), alloc.PreviousAllocation) + if alloc.ClientStatus == structs.AllocClientStatusPending { + pendingAllocs = append(pendingAllocs, alloc) + } + } + assert.Equal(1, len(pendingAllocs)) + newAlloc := pendingAllocs[0] + assert.Equal(expectedNumReschedTrackers, len(newAlloc.RescheduleTrackers)) + + // Mark this alloc as failed again + newAlloc.ClientStatus = structs.AllocClientStatusFailed + + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{newAlloc})) + + // Create another mock evaluation + eval = &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + expectedNumAllocs += 1 + expectedNumReschedTrackers += 1 + } + + // Process last eval again, should not reschedule + err := h.Process(NewServiceScheduler, eval) + assert.Nil(err) + + // Verify no new allocs were created because restart attempts were exhausted + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + noErr(t, err) + assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts +} + func TestBatchSched_Run_CompleteAlloc(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/rank.go b/scheduler/rank.go index 9e4ee81a1..748f11f03 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -304,3 +304,52 @@ func (iter *JobAntiAffinityIterator) Next() *RankedNode { func (iter *JobAntiAffinityIterator) Reset() { iter.source.Reset() } + +// NodeAntiAffinityIterator is used to apply a penalty to +// a node that had a previous failed allocation for the same job. +// This is used when attempting to reschedule a failed alloc +type NodeAntiAffinityIterator struct { + ctx Context + source RankIterator + penalty float64 + penaltyNodes map[string]struct{} +} + +// NewNodeAntiAffinityIterator is used to create a NodeAntiAffinityIterator that +// applies the given penalty for placement onto nodes in penaltyNodes +func NewNodeAntiAffinityIterator(ctx Context, source RankIterator, penalty float64) *NodeAntiAffinityIterator { + iter := &NodeAntiAffinityIterator{ + ctx: ctx, + source: source, + penalty: penalty, + } + return iter +} + +func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(nodes []string) { + penaltyNodes := make(map[string]struct{}) + for _, node := range nodes { + penaltyNodes[node] = struct{}{} + } + iter.penaltyNodes = penaltyNodes +} + +func (iter *NodeAntiAffinityIterator) Next() *RankedNode { + for { + option := iter.source.Next() + if option == nil { + return nil + } + + _, ok := iter.penaltyNodes[option.Node.ID] + if ok { + option.Score -= iter.penalty + iter.ctx.Metrics().ScoreNode(option.Node, "node-anti-affinity", iter.penalty) + } + return option + } +} + +func (iter *NodeAntiAffinityIterator) Reset() { + iter.source.Reset() +} diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 8541220a7..474749881 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -6,6 +6,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/assert" ) func TestFeasibleRankIterator(t *testing.T) { @@ -429,3 +430,36 @@ func collectRanked(iter RankIterator) (out []*RankedNode) { } return } + +func TestNodeAntiAffinity_PenaltyNodes(t *testing.T) { + _, ctx := testContext(t) + node1 := &structs.Node{ + ID: uuid.Generate(), + } + node2 := &structs.Node{ + ID: uuid.Generate(), + } + + nodes := []*RankedNode{ + { + Node: node1, + }, + { + Node: node2, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + nodeAntiAffIter := NewNodeAntiAffinityIterator(ctx, static, 50.0) + nodeAntiAffIter.SetPenaltyNodes([]string{node1.ID}) + + out := collectRanked(nodeAntiAffIter) + assert := assert.New(t) + assert.Equal(2, len(out)) + assert.Equal(node1.ID, out[0].Node.ID) + assert.Equal(-50.0, out[0].Score) + + assert.Equal(node2.ID, out[1].Node.ID) + assert.Equal(0.0, out[1].Score) + +} diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index a94e0462e..f9bf1af40 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -305,9 +305,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Determine what set of allocations are on tainted nodes untainted, migrate, lost := all.filterByTainted(a.taintedNodes) + untainted, reschedule := untainted.filterByRescheduleable(a.batch, tg.ReschedulePolicy) + // Create a structure for choosing names. Seed with the taken names which is // the union of untainted and migrating nodes (includes canaries) - nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate)) + nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, reschedule)) // Stop any unneeded allocations and update the untainted set to not // included stopped allocations. @@ -364,7 +366,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // * The deployment is not paused or failed // * Not placing any canaries // * If there are any canaries that they have been promoted - place := a.computePlacements(tg, nameIndex, untainted, migrate) + place := a.computePlacements(tg, nameIndex, untainted, migrate, reschedule) if !existingDeployment { dstate.DesiredTotal += len(place) } @@ -610,20 +612,34 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest // computePlacement returns the set of allocations to place given the group // definition, the set of untainted and migrating allocations for the group. func (a *allocReconciler) computePlacements(group *structs.TaskGroup, - nameIndex *allocNameIndex, untainted, migrate allocSet) []allocPlaceResult { + nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult { // Hot path the nothing to do case existing := len(untainted) + len(migrate) if existing >= group.Count { return nil } - var place []allocPlaceResult - for _, name := range nameIndex.Next(uint(group.Count - existing)) { + // add rescheduled alloc placement results + for _, alloc := range reschedule { place = append(place, allocPlaceResult{ - name: name, - taskGroup: group, + name: alloc.Name, + taskGroup: group, + previousAlloc: alloc, }) + existing += 1 + if existing == group.Count { + break + } + } + // add remaining + if existing < group.Count { + for _, name := range nameIndex.Next(uint(group.Count - existing)) { + place = append(place, allocPlaceResult{ + name: name, + taskGroup: group, + }) + } } return place @@ -700,6 +716,9 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc removeNames := nameIndex.Highest(uint(remove)) for id, alloc := range untainted { if _, ok := removeNames[alloc.Name]; ok { + if alloc.TerminalStatus() { + continue + } stop[id] = alloc a.result.stop = append(a.result.stop, allocStopResult{ alloc: alloc, @@ -717,6 +736,9 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc // It is possible that we didn't stop as many as we should have if there // were allocations with duplicate names. for id, alloc := range untainted { + if alloc.TerminalStatus() { + continue + } stop[id] = alloc a.result.stop = append(a.result.stop, allocStopResult{ alloc: alloc, diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 5a556f035..b8218a735 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -206,16 +206,72 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi untainted[alloc.ID] = alloc continue } - - if n == nil || n.TerminalStatus() { - lost[alloc.ID] = alloc + if !alloc.TerminalStatus() { + if n == nil || n.TerminalStatus() { + lost[alloc.ID] = alloc + } else { + migrate[alloc.ID] = alloc + } } else { - migrate[alloc.ID] = alloc + untainted[alloc.ID] = alloc } } return } +// filterByRescheduleable filters the allocation set to return the set of allocations that are either +// terminal or running, and a set of allocations that must be rescheduled +func (a allocSet) filterByRescheduleable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, reschedule allocSet) { + untainted = make(map[string]*structs.Allocation) + reschedule = make(map[string]*structs.Allocation) + + rescheduledPrevAllocs := make(map[string]struct{}) // Track previous allocs from any restart trackers + + for _, alloc := range a { + if isBatch { + // Allocs from batch jobs should be filtered when the desired status + // is terminal and the client did not finish or when the client + // status is failed so that they will be replaced. If they are + // complete but not failed, they shouldn't be replaced. + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: + if alloc.RanSuccessfully() { + untainted[alloc.ID] = alloc + } + continue + default: + } + if alloc.ShouldReschedule(reschedulePolicy) { + reschedule[alloc.ID] = alloc + } else { + untainted[alloc.ID] = alloc + } + } else { + // ignore allocs whose desired state is stop/evict + // everything else is either rescheduleable or untainted + if alloc.ShouldReschedule(reschedulePolicy) { + reschedule[alloc.ID] = alloc + } else if alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict { + untainted[alloc.ID] = alloc + } + } + } + + // Find allocs that exist in restart trackers from other allocs + for _, alloc := range reschedule { + if alloc.RescheduleTrackers != nil { + for _, reschedTrack := range alloc.RescheduleTrackers { + rescheduledPrevAllocs[reschedTrack.PrevAllocID] = struct{}{} + } + } + } + // Delete these from rescheduleable allocs + for allocId, _ := range rescheduledPrevAllocs { + delete(reschedule, allocId) + } + return +} + // filterByDeployment filters allocations into two sets, those that match the // given deployment ID and those that don't func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) { diff --git a/scheduler/stack.go b/scheduler/stack.go index ebd12ba0f..3616e0c50 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -16,6 +16,9 @@ const ( // batchJobAntiAffinityPenalty is the same as the // serviceJobAntiAffinityPenalty but for batch type jobs. batchJobAntiAffinityPenalty = 10.0 + + // previousFailedAllocNodePenalty is a scoring penalty for nodes that a failed allocation was previously run on + previousFailedAllocNodePenalty = 50.0 ) // Stack is a chained collection of iterators. The stack is used to @@ -29,7 +32,12 @@ type Stack interface { SetJob(job *structs.Job) // Select is used to select a node for the task group - Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) + Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) +} + +type SelectOptions struct { + PenaltyNodeIDs []string + PreferredNodes []*structs.Node } // GenericStack is the Stack used for the Generic scheduler. It is @@ -49,6 +57,7 @@ type GenericStack struct { distinctPropertyConstraint *DistinctPropertyIterator binPack *BinPackIterator jobAntiAff *JobAntiAffinityIterator + nodeAntiAff *NodeAntiAffinityIterator limit *LimitIterator maxScore *MaxScoreIterator } @@ -111,8 +120,10 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { } s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "") + s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty) + // Apply a limit function. This is to avoid scanning *every* possible node. - s.limit = NewLimitIterator(ctx, s.jobAntiAff, 2) + s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2) // Select the node with the maximum score for placement s.maxScore = NewMaxScoreIterator(ctx, s.limit) @@ -154,7 +165,22 @@ func (s *GenericStack) SetJob(job *structs.Job) { } } -func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { +func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) { + + // This block handles trying to select from preferred nodes if options specify them + // It also sets back the set of nodes to the original nodes + if options != nil && len(options.PreferredNodes) > 0 { + originalNodes := s.source.nodes + s.source.SetNodes(options.PreferredNodes) + options.PreferredNodes = nil + if option, resources := s.Select(tg, options); option != nil { + s.source.SetNodes(originalNodes) + return option, resources + } + s.source.SetNodes(originalNodes) + return s.Select(tg, options) + } + // Reset the max selector and context s.maxScore.Reset() s.ctx.Reset() @@ -170,6 +196,11 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) s.binPack.SetTaskGroup(tg) + if options != nil { + s.nodeAntiAff.SetPenaltyNodes(options.PenaltyNodeIDs) + } else { + s.nodeAntiAff.SetPenaltyNodes(nil) + } if contextual, ok := s.quota.(ContextualIterator); ok { contextual.SetTaskGroup(tg) @@ -190,19 +221,6 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso return option, tgConstr.size } -// SelectPreferredNode returns a node where an allocation of the task group can -// be placed, the node passed to it is preferred over the other available nodes -func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*structs.Node) (*RankedNode, *structs.Resources) { - originalNodes := s.source.nodes - s.source.SetNodes(nodes) - if option, resources := s.Select(tg); option != nil { - s.source.SetNodes(originalNodes) - return option, resources - } - s.source.SetNodes(originalNodes) - return s.Select(tg) -} - // SystemStack is the Stack used for the System scheduler. It is designed to // attempt to make placements on all nodes. type SystemStack struct { @@ -276,7 +294,7 @@ func (s *SystemStack) SetJob(job *structs.Job) { } } -func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { +func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) { // Reset the binpack selector and context s.binPack.Reset() s.ctx.Reset() diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index e94b8c9ec..b245ac418 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -47,8 +47,9 @@ func benchmarkServiceStack_MetaKeyConstraint(b *testing.B, key string, numNodes, stack.SetJob(job) b.ResetTimer() + selectOptions := &SelectOptions{} for i := 0; i < b.N; i++ { - stack.Select(job.TaskGroups[0]) + stack.Select(job.TaskGroups[0], selectOptions) } } @@ -104,7 +105,8 @@ func TestServiceStack_Select_Size(t *testing.T) { job := mock.Job() stack.SetJob(job) - node, size := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, size := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -138,7 +140,8 @@ func TestServiceStack_Select_PreferringNodes(t *testing.T) { // Create a preferred node preferredNode := mock.Node() - option, _ := stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode}) + selectOptions := &SelectOptions{PreferredNodes: []*structs.Node{preferredNode}} + option, _ := stack.Select(job.TaskGroups[0], selectOptions) if option == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -151,7 +154,8 @@ func TestServiceStack_Select_PreferringNodes(t *testing.T) { preferredNode1 := preferredNode.Copy() preferredNode1.Attributes["kernel.name"] = "windows" preferredNode1.ComputeClass() - option, _ = stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode1}) + selectOptions = &SelectOptions{PreferredNodes: []*structs.Node{preferredNode1}} + option, _ = stack.Select(job.TaskGroups[0], selectOptions) if option == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -174,7 +178,8 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) { job := mock.Job() stack.SetJob(job) - n1, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + n1, _ := stack.Select(job.TaskGroups[0], selectOptions) m1 := ctx.Metrics() if n1 == nil { t.Fatalf("missing node %#v", m1) @@ -184,7 +189,7 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) { t.Fatalf("should only be 2") } - n2, _ := stack.Select(job.TaskGroups[0]) + n2, _ := stack.Select(job.TaskGroups[0], selectOptions) m2 := ctx.Metrics() if n2 == nil { t.Fatalf("missing node %#v", m2) @@ -215,7 +220,8 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) { job.TaskGroups[0].Tasks[0].Driver = "foo" stack.SetJob(job) - node, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, _ := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -243,8 +249,8 @@ func TestServiceStack_Select_ConstraintFilter(t *testing.T) { job := mock.Job() job.Constraints[0].RTarget = "freebsd" stack.SetJob(job) - - node, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, _ := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -280,8 +286,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { job := mock.Job() stack.SetJob(job) - - node, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, _ := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -347,7 +353,8 @@ func TestSystemStack_Select_Size(t *testing.T) { job := mock.Job() stack.SetJob(job) - node, size := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, size := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -381,7 +388,8 @@ func TestSystemStack_Select_MetricsReset(t *testing.T) { job := mock.Job() stack.SetJob(job) - n1, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + n1, _ := stack.Select(job.TaskGroups[0], selectOptions) m1 := ctx.Metrics() if n1 == nil { t.Fatalf("missing node %#v", m1) @@ -391,7 +399,7 @@ func TestSystemStack_Select_MetricsReset(t *testing.T) { t.Fatalf("should only be 1") } - n2, _ := stack.Select(job.TaskGroups[0]) + n2, _ := stack.Select(job.TaskGroups[0], selectOptions) m2 := ctx.Metrics() if n2 == nil { t.Fatalf("missing node %#v", m2) @@ -418,7 +426,8 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) { job.TaskGroups[0].Tasks[0].Driver = "foo" stack.SetJob(job) - node, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, _ := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -435,7 +444,7 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) { stack = NewSystemStack(ctx) stack.SetNodes(nodes) stack.SetJob(job) - node, _ = stack.Select(job.TaskGroups[0]) + node, _ = stack.Select(job.TaskGroups[0], selectOptions) if node != nil { t.Fatalf("node not filtered %#v", node) } @@ -460,7 +469,8 @@ func TestSystemStack_Select_ConstraintFilter(t *testing.T) { job.Constraints[0].RTarget = "freebsd" stack.SetJob(job) - node, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, _ := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } @@ -497,7 +507,8 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { job := mock.Job() stack.SetJob(job) - node, _ := stack.Select(job.TaskGroups[0]) + selectOptions := &SelectOptions{} + node, _ := stack.Select(job.TaskGroups[0], selectOptions) if node == nil { t.Fatalf("missing node %#v", ctx.Metrics()) } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index bc513dddd..d30608c8b 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -275,7 +275,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { s.stack.SetNodes(nodes) // Attempt to match the task group - option, _ := s.stack.Select(missing.TaskGroup) + option, _ := s.stack.Select(missing.TaskGroup, nil) if option == nil { // If nodes were filtered because of constraint mismatches and we diff --git a/scheduler/util.go b/scheduler/util.go index ffd1366ee..09d36cb6c 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -511,7 +511,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, allocInPlace, "") // Attempt to match the task group - option, _ := stack.Select(update.TaskGroup) + option, _ := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass any node weight options // Pop the allocation ctx.Plan().PopUpdate(update.Alloc) @@ -722,7 +722,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc // genericAllocUpdateFn is a factory for the scheduler to create an allocUpdateType // function to be passed into the reconciler. The factory takes objects that // exist only in the scheduler context and returns a function that can be used -// by the reconciler to make decsions about how to update an allocation. The +// by the reconciler to make decisions about how to update an allocation. The // factory allows the reconciler to be unaware of how to determine the type of // update necessary and can minimize the set of objects it is exposed to. func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType { @@ -767,7 +767,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "") // Attempt to match the task group - option, _ := stack.Select(newTG) + option, _ := stack.Select(newTG, nil) // This select only looks at one node so we don't pass any node weight options // Pop the allocation ctx.Plan().PopUpdate(existing)