Only error on constraints if no allocs are running (#25850)

* Only error on constraints if no allocs are running

When running `nomad job run <JOB>` multiple times with constraints
defined, there should be no error as a result of filtering out nodes
that do not, and never have, satisfied the constraints.

When running a system job with a constraint, any run after the initial
startup exits with code 2 and a warning about unplaced allocations due
to constraints. This error is not encountered on the initial run, even
though the constraint stays the same.

This is because the node that satisfies the constraint is already running
the allocation, so that placement is ignored. Another placement is
attempted, but the only node(s) left are the ones that do not satisfy
the constraint. Nomad treats this case (none of the attempted
placements succeeded) as an error, and reports it as such. In reality,
no allocations should be placed or updated in this case, but it should
not be treated as an error.
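
As a concrete illustration, consider the two-node setup used by the tests
below (a minimal sketch; the node names and constraint values mirror
`TestSystemSched_JobConstraint_RunMultipleTimes`):

```go
// Two nodes; only fooNode satisfies the job's constraint.
fooNode := mock.Node()
fooNode.Name = "foo"
barNode := mock.Node()

job := mock.SystemJob()
job.Constraints = []*structs.Constraint{{
	LTarget: "${node.unique.name}",
	RTarget: fooNode.Name,
	Operand: "==",
}}

// First evaluation: the alloc is placed on fooNode; barNode is filtered.
// Second evaluation: fooNode's alloc is ignored (already running), so the
// only remaining candidate is barNode, which fails the constraint filter.
// Before this change, that second evaluation was reported as a failure.
```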

This change uses the `ignored` placements from `diffSystemAllocs` to
determine whether the case encountered is an error (no ignored placements
means nothing is already running, which is an error) or not (an ignored
placement means the task is already running somewhere on a node). It does
this at the point where `failedTGAllocs` is populated, so placement
functionality isn't changed, just the field that reports the error.
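
In sketch form, the gating logic looks like this (a paraphrase of the patch
below; in the actual diff the map is named `allocExistsForTaskGroup` inside
`computeJobAllocs` and received as `existingByTaskGroup` by
`computePlacements`):

```go
// Track task groups that already have an allocation accounted for:
// either updated in place or ignored by the diff because it is
// already running unchanged.
existingByTaskGroup := map[string]bool{}
for _, update := range inplaceUpdates {
	existingByTaskGroup[update.TaskGroup.Name] = true
}
for _, ignored := range diff.ignore {
	existingByTaskGroup[ignored.TaskGroup.Name] = true
}

// Later, when deciding whether to record a failed placement: only
// treat "nothing could be placed" as an error if nothing for this
// task group is already running.
if queued <= 0 && !existingByTaskGroup[tgName] {
	// populate s.failedTGAllocs for tgName ...
}
```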

There is existing functionality that (correctly) notifies a user when a
job cannot be run on any node because the constraints filter out all
available nodes. This should still behave as expected.
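
For reference, that all-filtered case is sketched below (distilled from the
`JobConstraint_AllFiltered` tests that follow): a constraint no node
satisfies, so every node is filtered and nothing is ignored.

```go
// A constraint no node can satisfy: both nodes are constraint-filtered,
// no allocation is already running, so the eval must still report a
// failed allocation for the task group.
job.Constraints = []*structs.Constraint{{
	LTarget: "${node.unique.name}",
	RTarget: "something-else",
	Operand: "==",
}}
// After processing the eval:
//   len(eval.FailedTGAllocs) == 1
//   ConstraintFiltered counts both nodes for the constraint.
```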

* Add changelog entry

* Handle in-place updates for constrained system jobs

* Update .changelog/25850.txt

Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>

* Remove conditionals

---------

Co-authored-by: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>
Authored by Allison Larson on 2025-05-15 15:14:03 -07:00, committed by GitHub
parent 9ee2582379, commit fd16f80b5a
4 changed files with 333 additions and 13 deletions

.changelog/25850.txt (new file, 3 additions)

@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug where planning or running a system job with constraints & previously running allocations would return a failed allocation error
```

scheduler/scheduler_sysbatch_test.go

@@ -1013,11 +1013,6 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
require.Nil(t, h.Process(NewSysBatchScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)
// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
require.Len(t, h.Evals[2].FailedTGAllocs, 1)
require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
// Ensure all NodeAllocations are from first Eval
@@ -1041,6 +1036,120 @@ func TestSysBatch_JobConstraint_AddNode(t *testing.T) {
require.Len(t, allocsNodeThree, 1)
}
func TestSysBatch_JobConstraint_AllFiltered(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create two nodes, neither of which will satisfy the constraint
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
node2 := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
// Create a job with a constraint
job := mock.SystemBatchJob()
job.Priority = structs.JobDefaultPriority
fooConstraint := &structs.Constraint{
LTarget: "${node.unique.name}",
RTarget: "something-else",
Operand: "==",
}
job.Constraints = []*structs.Constraint{fooConstraint}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to start the job, which cannot be placed on any node
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSysBatchScheduler, eval)
must.NoError(t, err)
// Ensure a single eval
must.Len(t, 1, h.Evals)
eval = h.Evals[0]
// Ensure that the eval reports a failed allocation
must.Eq(t, len(eval.FailedTGAllocs), 1)
// Ensure that the failed allocation is due to the constraint on both nodes
must.Eq(t, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()], 2)
}
func TestSysBatch_JobConstraint_RunMultiple(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create two nodes, one with a custom name
fooNode := mock.Node()
fooNode.Name = "foo"
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode))
barNode := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode))
// Create a job with a constraint
job := mock.SystemBatchJob()
job.Priority = structs.JobDefaultPriority
fooConstraint := &structs.Constraint{
LTarget: "${node.unique.name}",
RTarget: fooNode.Name,
Operand: "==",
}
job.Constraints = []*structs.Constraint{fooConstraint}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to start the job, which will run on the foo node
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSysBatchScheduler, eval)
must.NoError(t, err)
// Create a mock evaluation to run the job again; it should not place any
// new allocations (fooNode is already running the alloc, barNode is
// filtered by the constraint) and should not report failed allocations
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval2}))
err = h.Process(NewSysBatchScheduler, eval2)
must.NoError(t, err)
// Ensure a single plan
must.Len(t, 1, h.Plans)
// Ensure that no evals report a failed allocation
for _, eval := range h.Evals {
must.Eq(t, 0, len(eval.FailedTGAllocs))
}
// Ensure that plan includes allocation running on fooNode
must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID])
// Ensure that plan does not include allocation running on barNode
must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID])
}
// No errors reported when no available nodes prevent placement
func TestSysBatch_ExistingAllocNoNodes(t *testing.T) {
ci.Parallel(t)

scheduler/scheduler_system.go

@@ -279,6 +279,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
s.plan.AppendUnknownAlloc(e.Alloc)
}
allocExistsForTaskGroup := map[string]bool{}
// Attempt to do the upgrades in place.
// Reconnecting allocations need to be updated to persists alloc state
// changes.
@@ -288,6 +289,10 @@ func (s *SystemScheduler) computeJobAllocs() error {
destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, updates)
diff.update = destructiveUpdates
for _, inplaceUpdate := range inplaceUpdates {
allocExistsForTaskGroup[inplaceUpdate.TaskGroup.Name] = true
}
if s.eval.AnnotatePlan {
s.plan.Annotations = &structs.PlanAnnotations{
DesiredTGUpdates: desiredUpdates(diff, inplaceUpdates, destructiveUpdates),
@@ -318,8 +323,12 @@ func (s *SystemScheduler) computeJobAllocs() error {
s.queuedAllocs[allocTuple.TaskGroup.Name] += 1
}
for _, ignoredAlloc := range diff.ignore {
allocExistsForTaskGroup[ignoredAlloc.TaskGroup.Name] = true
}
// Compute the placements
return s.computePlacements(diff.place)
return s.computePlacements(diff.place, allocExistsForTaskGroup)
}
func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
@@ -347,7 +356,7 @@ func mergeNodeFiltered(acc, curr *structs.AllocMetric) *structs.AllocMetric {
}
// computePlacements computes placements for allocations
func (s *SystemScheduler) computePlacements(place []allocTuple) error {
func (s *SystemScheduler) computePlacements(place []allocTuple, existingByTaskGroup map[string]bool) error {
nodeByID := make(map[string]*structs.Node, len(s.nodes))
for _, node := range s.nodes {
nodeByID[node.ID] = node
@@ -389,7 +398,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
}
filteredMetrics[tgName] = mergeNodeFiltered(filteredMetrics[tgName], s.ctx.Metrics())
if queued <= 0 {
// If no tasks have been placed and no allocation for this task group
// already exists (ignored or updated in place), mark the alloc as
// failed to be placed
if queued <= 0 && !existingByTaskGroup[tgName] {
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}

scheduler/scheduler_system_test.go

@@ -1437,11 +1437,6 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
require.Nil(t, h.Process(NewSystemScheduler, eval3))
require.Equal(t, "complete", h.Evals[2].Status)
// Ensure `groupA` fails to be placed due to its constraint, but `groupB` doesn't
require.Len(t, h.Evals[2].FailedTGAllocs, 1)
require.Contains(t, h.Evals[2].FailedTGAllocs, "groupA")
require.NotContains(t, h.Evals[2].FailedTGAllocs, "groupB")
require.Len(t, h.Plans, 2)
require.Len(t, h.Plans[1].NodeAllocation, 1)
// Ensure all NodeAllocations are from first Eval
@@ -1465,6 +1460,207 @@ func TestSystemSched_JobConstraint_AddNode(t *testing.T) {
require.Len(t, allocsNodeThree, 1)
}
func TestSystemSched_JobConstraint_AllFiltered(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create two nodes, neither of which will satisfy the constraint
node := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
node2 := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node2))
// Create a job with a constraint
job := mock.SystemJob()
job.Priority = structs.JobDefaultPriority
fooConstraint := &structs.Constraint{
LTarget: "${node.unique.name}",
RTarget: "something-else",
Operand: "==",
}
job.Constraints = []*structs.Constraint{fooConstraint}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to start the job, which cannot be placed on any node
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
must.NoError(t, err)
// Ensure a single eval
must.Len(t, 1, h.Evals)
eval = h.Evals[0]
// Ensure that the eval reports a failed allocation
must.Eq(t, len(eval.FailedTGAllocs), 1)
// Ensure that the failed allocation is due to the constraint on both nodes
must.Eq(t, eval.FailedTGAllocs[job.TaskGroups[0].Name].ConstraintFiltered[fooConstraint.String()], 2)
}
// Test that the system scheduler can handle a job with a constraint on
// subsequent runs, and report the outcome appropriately
func TestSystemSched_JobConstraint_RunMultipleTimes(t *testing.T) {
ci.Parallel(t)
h := NewHarness(t)
// Create two nodes, one with a custom name
fooNode := mock.Node()
fooNode.Name = "foo"
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), fooNode))
barNode := mock.Node()
must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), barNode))
// Create a job with a constraint on custom name
job := mock.SystemJob()
job.Priority = structs.JobDefaultPriority
fooConstraint := &structs.Constraint{
LTarget: "${node.unique.name}",
RTarget: fooNode.Name,
Operand: "==",
}
job.Constraints = []*structs.Constraint{fooConstraint}
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
// Create a mock evaluation to start the job, which will run on the foo node
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
must.NoError(t, err)
// Rerun the job with the constraint under several variations to ensure each reports the correct error state
testCases := []struct {
desc string
job func() *structs.Job
check func(*testing.T, *Harness)
}{
{
desc: "Rerunning the job with constraint shouldn't report failed allocations",
job: func() *structs.Job {
return job
},
check: func(t *testing.T, h *Harness) {
// Ensure a plan is not added, because no action should be taken
must.Len(t, 1, h.Plans)
// Ensure that no evals report a failed allocation
for _, eval := range h.Evals {
must.Eq(t, 0, len(eval.FailedTGAllocs))
}
// Ensure that plan includes allocation running on fooNode
must.Len(t, 1, h.Plans[0].NodeAllocation[fooNode.ID])
// Ensure that plan does not include allocation running on barNode
must.Len(t, 0, h.Plans[0].NodeAllocation[barNode.ID])
},
},
{
desc: "Running an inplace-update job with constraint update shouldn't report failed allocations",
job: func() *structs.Job {
job := job.Copy()
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
return job
},
check: func(t *testing.T, h *Harness) {
// Ensure another plan is added for the updated alloc
must.Len(t, 2, h.Plans)
// Ensure that no follow-up evals were created
must.Len(t, 0, h.CreateEvals)
// Ensure that no evals report a failed allocation
for _, eval := range h.Evals {
must.Eq(t, 0, len(eval.FailedTGAllocs))
}
// Ensure that plan still includes allocation running on fooNode
must.Len(t, 1, h.Plans[1].NodeAllocation[fooNode.ID])
// Ensure that plan does not include allocation running on barNode
must.Len(t, 0, h.Plans[1].NodeAllocation[barNode.ID])
},
},
{
desc: "Running a destructive-update job with constraint that can be placed should not report failed allocations",
job: func() *structs.Job {
job := job.Copy()
// Update the required resources, but not to exceed the node's capacity
job.TaskGroups[0].Tasks[0].Resources.CPU = 123
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
return job
},
check: func(t *testing.T, h *Harness) {
// Ensure another plan is added for the updated alloc
must.Len(t, 3, h.Plans)
// Ensure that no evals report a failed allocation
for _, eval := range h.Evals {
must.Eq(t, 0, len(eval.FailedTGAllocs))
}
// Ensure that newest plan includes allocation running on fooNode
must.Len(t, 1, h.Plans[2].NodeAllocation[fooNode.ID])
// Ensure that newest plan does not include allocation running on barNode
must.Len(t, 0, h.Plans[2].NodeAllocation[barNode.ID])
},
},
{
desc: "Running a destructive-update job with constraint that can't be placed should report failed allocations",
job: func() *structs.Job {
job := job.Copy()
// Update the required resources to exceed the eligible node's capacity
job.TaskGroups[0].Tasks[0].Resources.MemoryMB = 100000000
must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
return job
},
check: func(t *testing.T, h *Harness) {
// Ensure another plan is added for the updated alloc
must.Len(t, 4, h.Plans)
eval := h.Evals[4]
// Ensure that this eval reports a failed allocation, since the running alloc can no longer fit on the node
must.Eq(t, 1, len(eval.FailedTGAllocs))
must.Eq(t, eval.FailedTGAllocs[job.TaskGroups[0].Name].NodesExhausted, 1)
},
},
}
for _, tc := range testCases {
job := tc.job()
testEval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{testEval}))
err = h.Process(NewSystemScheduler, testEval)
must.NoError(t, err)
tc.check(t, h)
}
}
// No errors reported when no available nodes prevent placement
func TestSystemSched_ExistingAllocNoNodes(t *testing.T) {
ci.Parallel(t)