From fd16f80b5a032ba384f99ea7f1cedc9562d5f61e Mon Sep 17 00:00:00 2001 From: Allison Larson Date: Thu, 15 May 2025 15:14:03 -0700 Subject: [PATCH] Only error on constraints if no allocs are running (#25850) * Only error on constraints if no allocs are running When running `nomad job run ` multiple times with constraints defined, there should be no error as a result of filtering out nodes that do not/have not ever satsified the constraints. When running a systems job with constraint, any run after an initial startup returns an exit(2) and a warning about unplaced allocations due to constraints. An error that is not encountered on the initial run, though the constraint stays the same. This is because the node that satisfies the condition is already running the allocation, and the placement is ignored. Another placement is attempted, but the only node(s) left are the ones that do not satisfy the constraint. Nomad views this case (no allocations that were attempted to placed could be placed successfully) as an error, and reports it as such. In reality, no allocations should be placed or updated in this case, but it should not be treated as an error. This change uses the `ignored` placements from diffSystemAlloc to attempt to determine if the case encountered is an error (no ignored placements means that nothing is already running, and is an error), or is not one (an ignored placement means that the task is already running somewhere on a node). It does this at the point where `failedTGAlloc` is populated, so placement functionality isn't changed, just the field that populates error. There is functionality that should be preserved which (correctly) notifies a user if a job is attempted that cannot be run on any node due to the constraints filtering out all available nodes. This should still behave as expected. * Add changelog entry * Handle in-place updates for constrained system jobs * Update .changelog/25850.txt Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> * Remove conditionals --------- Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> --- .changelog/25850.txt | 3 + scheduler/scheduler_sysbatch_test.go | 119 +++++++++++++++- scheduler/scheduler_system.go | 18 ++- scheduler/scheduler_system_test.go | 206 ++++++++++++++++++++++++++- 4 files changed, 333 insertions(+), 13 deletions(-) create mode 100644 .changelog/25850.txt diff --git a/.changelog/25850.txt b/.changelog/25850.txt new file mode 100644 index 000000000..c3f68fdf5 --- /dev/null +++ b/.changelog/25850.txt @@ -0,0 +1,3 @@ +```release-note:bug +scheduler: Fixed a bug where planning or running a system job with constraints & previously running allocations would return a failed allocation error +``` diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go index 7c14fa10f..63778e499 100644 --- a/scheduler/scheduler_sysbatch_test.go +++ b/scheduler/scheduler_sysbatch_test.go @@ -1013,11 +1013,6 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) { require.Nil(t, h.Process(NewSysBatchScheduler, eval3)) require.Equal(t, "complete", h.Evals[2].Status) - // Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't - require.Len(t, h.Evals[2].FailedTGAllocs, 1) - require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA") - require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB") - require.Len(t, h.Plans, 2) require.Len(t, h.Plans[1].NodeAllocation, 1) // Ensure all NodeAllocations are from first Eval @@ -1041,6 +1036,120 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) { require.Len(t, allocsNodeThree, 1) } +func TestSysBatch_JobConstraint_AllFiltered(t *testing.T) { + ci.Parallel(t) + h := NewHarness(t) + + // Create two nodes, one with a custom class + node := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + node2 := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) + + // Create a job with a constraint + job := mock.SystemBatchJob() + job.Priority = structs.JobDefaultPriority + fooConstraint := &structs.Constraint{ + LTarget: "${node.unique.name}", + RTarget: "something-else", + Operand: "==", + } + job.Constraints = []*structs.Constraint{fooConstraint} + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + // Create a mock evaluation to start the job, which will run on the foo node + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Ensure a single eval + must.Len(t, 1, h.Evals) + eval = h.Evals[0] + + // Ensure that the eval reports failed allocation + must.Eq(t, len(eval.FailedTGAllocs), 1) + // Ensure that the failed allocation is due to constraint on both nodes + must.Eq(t, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()], 2) +} + +func TestSysBatch_JobConstraint_RunMultiple(t *testing.T) { + ci.Parallel(t) + h := NewHarness(t) + + // Create two nodes, one with a custom class + fooNode := mock.Node() + fooNode.Name = "foo" + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode)) + + barNode := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode)) + + // Create a job with a constraint + job := mock.SystemBatchJob() + job.Priority = structs.JobDefaultPriority + fooConstraint := &structs.Constraint{ + LTarget: "${node.unique.name}", + RTarget: fooNode.Name, + Operand: "==", + } + job.Constraints = []*structs.Constraint{fooConstraint} + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + // Create a mock evaluation to start the job, which will run on the foo node + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Create a mock evaluation to run the job again, which will not place any + // new allocations (fooNode is already running, barNode is constrained), but + // will not report failed allocations + eval2 := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2})) + + err = h.Process(NewSystemScheduler, eval2) + must.NoError(t, err) + + // Ensure a single plan + must.Len(t, 1, h.Plans) + + // Ensure that no evals report a failed allocation + for _, eval := range h.Evals { + must.Eq(t, 0, len(eval.FailedTGAllocs)) + } + + // Ensure that plan includes allocation running on fooNode + must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID]) + // Ensure that plan does not include allocation running on barNode + must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID]) +} + // No errors reported when no available nodes prevent placement func TestSysBatch_ExistingAllocNoNodes(t *testing.T) { ci.Parallel(t) diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 0cdc52882..8b6d328f8 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -279,6 +279,7 @@ func (s *SystemScheduler) computeJobAllocs() error { s.plan.AppendUnknownAlloc(e.Alloc) } + allocExistsForTaskGroup := map[string]bool{} // Attempt to do the upgrades in place. // Reconnecting allocations need to be updated to persists alloc state // changes. @@ -288,6 +289,10 @@ func (s *SystemScheduler) computeJobAllocs() error { destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, updates) diff.update = destructiveUpdates + for _, inplaceUpdate := range inplaceUpdates { + allocExistsForTaskGroup[inplaceUpdate.TaskGroup.Name] = true + } + if s.eval.AnnotatePlan { s.plan.Annotations = &structs.PlanAnnotations{ DesiredTGUpdates: desiredUpdates(diff, inplaceUpdates, destructiveUpdates), @@ -318,8 +323,12 @@ func (s *SystemScheduler) computeJobAllocs() error { s.queuedAllocs[allocTuple.TaskGroup.Name] += 1 } + for _, ignoredAlloc := range diff.ignore { + allocExistsForTaskGroup[ignoredAlloc.TaskGroup.Name] = true + } + // Compute the placements - return s.computePlacements(diff.place) + return s.computePlacements(diff.place, allocExistsForTaskGroup) } func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric { @@ -347,7 +356,7 @@ func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric { } // computePlacements computes placements for allocations -func (s *SystemScheduler) computePlacements(place []allocTuple) error { +func (s *SystemScheduler) computePlacements(place []allocTuple, existingByTaskGroup map[string]bool) error { nodeByID := make(map[string]*structs.Node, len(s.nodes)) for _, node := range s.nodes { nodeByID[node.ID] = node @@ -389,7 +398,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { } filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics()) - if queued <= 0 { + // If no tasks have been placed and there aren't any previously + // existing (ignored or updated) tasks on the node, mark the alloc as failed to be placed + // if queued <= 0 && !existingByTaskGroup[tgName] { + if queued <= 0 && !existingByTaskGroup[tgName] { if s.failedTGAllocs == nil { s.failedTGAllocs = make(map[string]*structs.AllocMetric) } diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index 613414699..e1352d6cc 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -1437,11 +1437,6 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) { require.Nil(t, h.Process(NewSystemScheduler, eval3)) require.Equal(t, "complete", h.Evals[2].Status) - // Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't - require.Len(t, h.Evals[2].FailedTGAllocs, 1) - require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA") - require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB") - require.Len(t, h.Plans, 2) require.Len(t, h.Plans[1].NodeAllocation, 1) // Ensure all NodeAllocations are from first Eval @@ -1465,6 +1460,207 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) { require.Len(t, allocsNodeThree, 1) } +func TestSystemSched_JobConstraint_AllFiltered(t *testing.T) { + ci.Parallel(t) + h := NewHarness(t) + + // Create two nodes, one with a custom class + node := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + node2 := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2)) + + // Create a job with a constraint + job := mock.SystemJob() + job.Priority = structs.JobDefaultPriority + fooConstraint := &structs.Constraint{ + LTarget: "${node.unique.name}", + RTarget: "something-else", + Operand: "==", + } + job.Constraints = []*structs.Constraint{fooConstraint} + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + // Create a mock evaluation to start the job, which will run on the foo node + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Ensure a single eval + must.Len(t, 1, h.Evals) + eval = h.Evals[0] + + // Ensure that no evals report a failed allocation + must.Eq(t, len(eval.FailedTGAllocs), 1) + must.Eq(t, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()], 2) +} + +// Test that the system scheduler can handle a job with a constraint on +// subsequent runs, and report the outcome appropriately +func TestSystemSched_JobConstraint_RunMultipleTimes(t *testing.T) { + ci.Parallel(t) + h := NewHarness(t) + + // Create two nodes, one with a custom name + fooNode := mock.Node() + fooNode.Name = "foo" + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode)) + + barNode := mock.Node() + must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode)) + + // Create a job with a constraint on custom name + job := mock.SystemJob() + job.Priority = structs.JobDefaultPriority + fooConstraint := &structs.Constraint{ + LTarget: "${node.unique.name}", + RTarget: fooNode.Name, + Operand: "==", + } + job.Constraints = []*structs.Constraint{fooConstraint} + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + + // Create a mock evaluation to start the job, which will run on the foo node + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Begin rerunning jobs with the constraints to ensure that they return the correct error states + testCases := []struct { + desc string + job func() *structs.Job + check func(*testing.T, *Harness) + }{ + { + desc: "Rerunning the job with constraint shouldn't report failed allocations", + job: func() *structs.Job { + return job + }, + check: func(t *testing.T, h *Harness) { + // Ensure a plan is not added, because no action should be taken + must.Len(t, 1, h.Plans) + + // Ensure that no evals report a failed allocation + for _, eval := range h.Evals { + must.Eq(t, 0, len(eval.FailedTGAllocs)) + } + + // Ensure that plan includes allocation running on fooNode + must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID]) + // Ensure that plan does not include allocation running on barNode + must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID]) + }, + }, + { + desc: "Running an inplace-update job with constraint update shouldn't report failed allocations", + job: func() *structs.Job { + job := job.Copy() + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + return job + }, + check: func(t *testing.T, h *Harness) { + // Ensure another plan is added for the updated alloc + must.Len(t, 2, h.Plans) + + // Ensure that evals are not creating an alloc + must.Len(t, 0, h.CreateEvals) + + // Ensure that no evals report a failed allocation + for _, eval := range h.Evals { + must.Eq(t, 0, len(eval.FailedTGAllocs)) + } + + // Ensure that plan still includes allocation running on fooNode + must.Len(t, 1, h.Plans[1].NodeAllocation[fooNode.ID]) + // Ensure that plan does not include allocation running on barNode + must.Len(t, 0, h.Plans[1].NodeAllocation[barNode.ID]) + }, + }, + { + desc: "Running a destructive-update job with constraint that can be placed should not report failed allocations", + job: func() *structs.Job { + job := job.Copy() + // Update the required resources, but not to exceed the node's capacity + job.TaskGroups[0].Tasks[0].Resources.CPU = 123 + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + return job + }, + check: func(t *testing.T, h *Harness) { + // Ensure another plan is added for the updated alloc + must.Len(t, 3, h.Plans) + + // Ensure that no evals report a failed allocation + for _, eval := range h.Evals { + must.Eq(t, 0, len(eval.FailedTGAllocs)) + } + + // Ensure that newest plan includes allocation running on fooNode + must.Len(t, 1, h.Plans[2].NodeAllocation[fooNode.ID]) + // Ensure that newest plan does not include allocation running on barNode + must.Len(t, 0, h.Plans[2].NodeAllocation[barNode.ID]) + }, + }, + { + desc: "Running a destructive-update job with constraint that can't be placed should report failed allocations", + job: func() *structs.Job { + job := job.Copy() + // Update the required resources to exceed the non-constrained node's capacity + job.TaskGroups[0].Tasks[0].Resources.MemoryMB = 100000000 + must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job)) + return job + }, + check: func(t *testing.T, h *Harness) { + // Ensure another Plan for the updated alloc + must.Len(t, 4, h.Plans) + + eval := h.Evals[4] + // Ensure that this eval reports failed allocation, where the running alloc can no longer be run on the node + must.Eq(t, 1, len(eval.FailedTGAllocs)) + must.Eq(t, eval.FailedTGAllocs[job.TaskGroups[0].Name].NodesExhausted, 1) + }, + }, + } + + for _, tc := range testCases { + job := tc.job() + + testEval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{testEval})) + + err = h.Process(NewSystemScheduler, testEval) + must.NoError(t, err) + + tc.check(t, h) + } +} + // No errors reported when no available nodes prevent placement func TestSystemSched_ExistingAllocNoNodes(t *testing.T) { ci.Parallel(t)