diff --git a/.changelog/25850.txt b/.changelog/25850.txt
new file mode 100644
index 000000000..c3f68fdf5
--- /dev/null
+++ b/.changelog/25850.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug where planning or running a system job with constraints and previously running allocations would return a failed allocation error
+```
diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go
index 7c14fa10f..63778e499 100644
--- a/scheduler/scheduler_sysbatch_test.go
+++ b/scheduler/scheduler_sysbatch_test.go
@@ -1013,11 +1013,6 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
 	require.Nil(t, h.Process(NewSysBatchScheduler, eval3))
 	require.Equal(t, "complete", h.Evals[2].Status)
 
-	// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
-	require.Len(t, h.Evals[2].FailedTGAllocs, 1)
-	require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
-	require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
-
 	require.Len(t, h.Plans, 2)
 	require.Len(t, h.Plans[1].NodeAllocation, 1)
 	// Ensure all NodeAllocations are from first Eval
@@ -1041,6 +1036,120 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
 	require.Len(t, allocsNodeThree, 1)
 }
 
+func TestSysBatch_JobConstraint_AllFiltered(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes
+	node := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	node2 := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
+
+	// Create a job with a constraint that no node can satisfy
+	job := mock.SystemBatchJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: "something-else",
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSysBatchScheduler, eval)
+	must.NoError(t, err)
+
+	// Ensure a single eval
+	must.Len(t, 1, h.Evals)
+	eval = h.Evals[0]
+
+	// Ensure that the eval reports a failed allocation
+	must.Eq(t, 1, len(eval.FailedTGAllocs))
+	// Ensure that the failure is due to the constraint filtering both nodes
+	must.Eq(t, 2, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()])
+}
+
+func TestSysBatch_JobConstraint_RunMultiple(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes, one with a custom name
+	fooNode := mock.Node()
+	fooNode.Name = "foo"
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode))
+
+	barNode := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode))
+
+	// Create a job with a constraint on the custom name
+	job := mock.SystemBatchJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: fooNode.Name,
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job, which will run on the foo node
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSysBatchScheduler, eval)
+	must.NoError(t, err)
+
+	// Create a mock evaluation to run the job again; it should place no new
+	// allocations (fooNode already has one and barNode is excluded by the
+	// constraint) and it should not report failed allocations
+	eval2 := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2}))
+
+	err = h.Process(NewSysBatchScheduler, eval2)
+	must.NoError(t, err)
+
+	// Ensure a single plan
+	must.Len(t, 1, h.Plans)
+
+	// Ensure that no evals report a failed allocation
+	for _, eval := range h.Evals {
+		must.Eq(t, 0, len(eval.FailedTGAllocs))
+	}
+
+	// Ensure that the plan includes the allocation running on fooNode
+	must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID])
+	// Ensure that the plan does not include an allocation on barNode
+	must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID])
+}
+
 // No errors reported when no available nodes prevent placement
 func TestSysBatch_ExistingAllocNoNodes(t *testing.T) {
 	ci.Parallel(t)
diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go
index 0cdc52882..8b6d328f8 100644
--- a/scheduler/scheduler_system.go
+++ b/scheduler/scheduler_system.go
@@ -279,6 +279,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
 		s.plan.AppendUnknownAlloc(e.Alloc)
 	}
 
+	allocExistsForTaskGroup := map[string]bool{}
 	// Attempt to do the upgrades in place.
 	// Reconnecting allocations need to be updated to persists alloc state
 	// changes.
@@ -288,6 +289,10 @@ func (s *SystemScheduler) computeJobAllocs() error {
 	destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, updates)
 	diff.update = destructiveUpdates
 
+	for _, inplaceUpdate := range inplaceUpdates {
+		allocExistsForTaskGroup[inplaceUpdate.TaskGroup.Name] = true
+	}
+
 	if s.eval.AnnotatePlan {
 		s.plan.Annotations = &structs.PlanAnnotations{
 			DesiredTGUpdates: desiredUpdates(diff, inplaceUpdates, destructiveUpdates),
@@ -318,8 +323,12 @@ func (s *SystemScheduler) computeJobAllocs() error {
 		s.queuedAllocs[allocTuple.TaskGroup.Name] += 1
 	}
 
+	for _, ignoredAlloc := range diff.ignore {
+		allocExistsForTaskGroup[ignoredAlloc.TaskGroup.Name] = true
+	}
+
 	// Compute the placements
-	return s.computePlacements(diff.place)
+	return s.computePlacements(diff.place, allocExistsForTaskGroup)
 }
 
 func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
@@ -347,7 +356,7 @@ func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
 }
 
 // computePlacements computes placements for allocations
-func (s *SystemScheduler) computePlacements(place []allocTuple) error {
+func (s *SystemScheduler) computePlacements(place []allocTuple, existingByTaskGroup map[string]bool) error {
 	nodeByID := make(map[string]*structs.Node, len(s.nodes))
 	for _, node := range s.nodes {
 		nodeByID[node.ID] = node
@@ -389,7 +398,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			}
 			filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
 
-			if queued <= 0 {
+			// If no tasks have been placed and the task group has no previously
+			// existing (ignored or in-place updated) allocations, mark it as
+			// failed to be placed
+			if queued <= 0 && !existingByTaskGroup[tgName] {
 				if s.failedTGAllocs == nil {
 					s.failedTGAllocs = make(map[string]*structs.AllocMetric)
 				}
diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go
index 613414699..e1352d6cc 100644
--- a/scheduler/scheduler_system_test.go
+++ b/scheduler/scheduler_system_test.go
@@ -1437,11 +1437,6 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
 	require.Nil(t, h.Process(NewSystemScheduler, eval3))
 	require.Equal(t, "complete", h.Evals[2].Status)
 
-	// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
-	require.Len(t, h.Evals[2].FailedTGAllocs, 1)
-	require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
-	require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
-
 	require.Len(t, h.Plans, 2)
 	require.Len(t, h.Plans[1].NodeAllocation, 1)
 	// Ensure all NodeAllocations are from first Eval
@@ -1465,6 +1460,207 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
 	require.Len(t, allocsNodeThree, 1)
 }
 
+func TestSystemSched_JobConstraint_AllFiltered(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes
+	node := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+	node2 := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
+
+	// Create a job with a constraint that no node can satisfy
+	job := mock.SystemJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: "something-else",
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSystemScheduler, eval)
+	must.NoError(t, err)
+
+	// Ensure a single eval
+	must.Len(t, 1, h.Evals)
+	eval = h.Evals[0]
+
+	// Ensure that the eval reports a failed allocation
+	must.Eq(t, 1, len(eval.FailedTGAllocs))
+	// Ensure that the failure is due to the constraint filtering both nodes
+	must.Eq(t, 2, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()])
+}
+
+// Test that the system scheduler can handle a job with a constraint on
+// subsequent runs, and reports the outcome appropriately
+func TestSystemSched_JobConstraint_RunMultipleTimes(t *testing.T) {
+	ci.Parallel(t)
+	h := NewHarness(t)
+
+	// Create two nodes, one with a custom name
+	fooNode := mock.Node()
+	fooNode.Name = "foo"
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode))
+
+	barNode := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode))
+
+	// Create a job with a constraint on the custom name
+	job := mock.SystemJob()
+	job.Priority = structs.JobDefaultPriority
+	fooConstraint := &structs.Constraint{
+		LTarget: "${node.unique.name}",
+		RTarget: fooNode.Name,
+		Operand: "==",
+	}
+	job.Constraints = []*structs.Constraint{fooConstraint}
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create a mock evaluation to start the job, which will run on the foo node
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewSystemScheduler, eval)
+	must.NoError(t, err)
+
+	// Rerun the job with the constraint in several ways to ensure each run
+	// reports the correct outcome
+	testCases := []struct {
+		desc  string
+		job   func() *structs.Job
+		check func(*testing.T, *Harness)
+	}{
+		{
+			desc: "Rerunning the job with the constraint shouldn't report failed allocations",
+			job: func() *structs.Job {
+				return job
+			},
+			check: func(t *testing.T, h *Harness) {
+				// Ensure no new plan is added, because no action should be taken
+				must.Len(t, 1, h.Plans)
+
+				// Ensure that no evals report a failed allocation
+				for _, eval := range h.Evals {
+					must.Eq(t, 0, len(eval.FailedTGAllocs))
+				}
+
+				// Ensure that the plan includes the allocation running on fooNode
+				must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID])
+				// Ensure that the plan does not include an allocation on barNode
+				must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID])
+			},
+		},
+		{
+			desc: "Re-registering the job unchanged (in-place update) shouldn't report failed allocations",
+			job: func() *structs.Job {
+				job := job.Copy()
+				must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+				return job
+			},
+			check: func(t *testing.T, h *Harness) {
+				// Ensure another plan is added for the updated alloc
+				must.Len(t, 2, h.Plans)
+
+				// Ensure the scheduler did not create any additional evals
+				must.Len(t, 0, h.CreateEvals)
+
+				// Ensure that no evals report a failed allocation
+				for _, eval := range h.Evals {
+					must.Eq(t, 0, len(eval.FailedTGAllocs))
+				}
+
+				// Ensure that the plan still includes the allocation running on fooNode
+				must.Len(t, 1, h.Plans[1].NodeAllocation[fooNode.ID])
+				// Ensure that the plan does not include an allocation on barNode
+				must.Len(t, 0, h.Plans[1].NodeAllocation[barNode.ID])
+			},
+		},
+		{
+			desc: "Running a destructive update that can still be placed should not report failed allocations",
+			job: func() *structs.Job {
+				job := job.Copy()
+				// Update the required resources, but not beyond the node's capacity
+				job.TaskGroups[0].Tasks[0].Resources.CPU = 123
+				must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+				return job
+			},
+			check: func(t *testing.T, h *Harness) {
+				// Ensure another plan is added for the updated alloc
+				must.Len(t, 3, h.Plans)
+
+				// Ensure that no evals report a failed allocation
+				for _, eval := range h.Evals {
+					must.Eq(t, 0, len(eval.FailedTGAllocs))
+				}
+
+				// Ensure that the newest plan includes the allocation running on fooNode
+				must.Len(t, 1, h.Plans[2].NodeAllocation[fooNode.ID])
+				// Ensure that the newest plan does not include an allocation on barNode
+				must.Len(t, 0, h.Plans[2].NodeAllocation[barNode.ID])
+			},
+		},
+		{
+			desc: "Running a destructive update that can't be placed should report failed allocations",
+			job: func() *structs.Job {
+				job := job.Copy()
+				// Update the required resources beyond any node's capacity
+				job.TaskGroups[0].Tasks[0].Resources.MemoryMB = 100000000
+				must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+				return job
+			},
+			check: func(t *testing.T, h *Harness) {
+				// Ensure another plan is added for the updated alloc
+				must.Len(t, 4, h.Plans)
+
+				eval := h.Evals[4]
+				// Ensure that this eval reports a failed allocation, since the running
+				// alloc can no longer fit on the node
+				must.Eq(t, 1, len(eval.FailedTGAllocs))
+				must.Eq(t, 1, eval.FailedTGAllocs[job.TaskGroups[0].Name].NodesExhausted)
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		job := tc.job()
+
+		testEval := &structs.Evaluation{
+			Namespace:   structs.DefaultNamespace,
+			ID:          uuid.Generate(),
+			Priority:    job.Priority,
+			TriggeredBy: structs.EvalTriggerJobRegister,
+			JobID:       job.ID,
+			Status:      structs.EvalStatusPending,
+		}
+		must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{testEval}))
+
+		err = h.Process(NewSystemScheduler, testEval)
+		must.NoError(t, err)
+
+		tc.check(t, h)
+	}
+}
+
 // No errors reported when no available nodes prevent placement
 func TestSystemSched_ExistingAllocNoNodes(t *testing.T) {
 	ci.Parallel(t)