Preserve core resources during inplace service alloc updates (#25705)

* Preserve core resources during inplace service alloc updates

When an alloc is running with core resources specified and it can be
updated in place, the cores it is running on should be preserved.

This fixes a bug where the allocation's task's core resources
(CPU.ReservedCores) would be recomputed each time the reconciler checked
that the allocation could continue to run on the given node. If a
different core on the node became available before this check was made,
the selection process could pick that new core, regardless of the core
the allocation was already running on. The check takes into account
other allocations running on the node with reserved cores, but cannot
account for the allocation being checked itself.

When this happened for multiple allocations evaluated in a single plan,
the selection process would see the cores previously reserved by the
other allocations but be unaware of the core each allocation itself ran
on, so the same core was chosen over and over for each allocation being
checked and was updated in the state store (but not on the node). Once
those cores were chosen and committed for multiple allocs, the node
appeared exhausted on the cores dimension, which prevented any
additional allocations from being started on it.

The reconciler's check for allocations that are being updated in place
and have resources.cores defined is effectively a check that the node
still has the cores available to run on, not a computation whose result
should change. The fix still performs the check, but once it succeeds,
any existing ReservedCores are preserved. Because any change to this
resource is considered a "destructive change", the cores can be
confidently preserved during the in-place update.
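
As an illustration of the intent (a minimal, self-contained sketch using toy stand-in types, not Nomad's real structs; the actual change is the small addition to genericAllocUpdateFn shown in the diff below): once the feasibility check passes, the freshly computed core selection is discarded in favor of whatever the existing allocation already reserved.

```go
package main

import "fmt"

// Toy stand-ins for the scheduler types involved (not Nomad's real structs).
type cpuRes struct{ ReservedCores []uint16 }
type taskRes struct{ Cpu cpuRes }

// preserveCores mirrors the idea of the fix: after the in-place update has
// passed the feasibility check, overwrite the freshly computed core selection
// with the cores the existing allocation already reserved.
func preserveCores(existing, updated map[string]*taskRes) {
	for task, resources := range updated {
		var cores []uint16
		if tr, ok := existing[task]; ok {
			cores = tr.Cpu.ReservedCores
		}
		resources.Cpu.ReservedCores = cores
	}
}

func main() {
	// The alloc is already running on core 1 ...
	existing := map[string]*taskRes{"web": {Cpu: cpuRes{ReservedCores: []uint16{1}}}}
	// ... but the selection pass recomputed core 3 during the in-place check.
	updated := map[string]*taskRes{"web": {Cpu: cpuRes{ReservedCores: []uint16{3}}}}

	preserveCores(existing, updated)
	fmt.Println(updated["web"].Cpu.ReservedCores) // [1]
}
```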

* Adjust reservedCores scheduler test

* Add changelog entry
Author: Allison Larson
Date: 2025-04-23 10:38:47 -07:00 (committed by GitHub)
Parent: dbf44a6ed3
Commit: 50513a87b7
3 changed files with 105 additions and 3 deletions

.changelog/25705.txt (new file)

@@ -0,0 +1,3 @@
```release-note:bug
scheduler: Fixed a bug in accounting for resources.cores that could prevent placements on nodes with available cores
```

@@ -7405,6 +7405,102 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
}
}

func TestServiceSched_ReservedCores_InPlace(t *testing.T) {
	ci.Parallel(t)

	h := NewHarness(t)

	// Create a node
	node := mock.Node()
	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

	// Create a job that reserves one core per task
	job := mock.Job()
	job.TaskGroups[0].Tasks[0].Resources.Cores = 1
	job.TaskGroups[0].Count = 2
	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))

	// Create running allocations on existing cores
	var allocs []*structs.Allocation
	for i := 0; i < 2; i++ {
		alloc := mock.AllocForNodeWithoutReservedPort(node)
		alloc.Job = job
		alloc.JobID = job.ID
		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
		alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores = []uint16{uint16(i + 1)}
		allocs = append(allocs, alloc)
	}
	must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))

	// Create a new job version with a different count
	job2 := job.Copy()
	job2.TaskGroups[0].Count = 3
	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}
	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

	// Process the evaluation
	err := h.Process(NewServiceScheduler, eval)
	must.NoError(t, err)

	// Ensure a single plan
	must.Len(t, 1, h.Plans)

	// Ensure the eval has not spawned a blocked eval due to core exhaustion
	must.Eq(t, "", h.Evals[0].BlockedEval, must.Sprint("blocked eval should be empty, without core exhaustion"))

	// Ensure the plan allocated with the correct reserved cores
	var planned []*structs.Allocation
	for _, allocList := range h.Plans[0].NodeAllocation {
		for _, alloc := range allocList {
			switch alloc.Name {
			case "my-job.web[0]": // Ensure that the first planned alloc is still on core 1
				must.Eq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
			case "my-job.web[1]": // Ensure that the second planned alloc is still on core 2
				must.Eq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
			default: // Ensure that the new planned alloc is not on core 1 or 2
				must.NotEq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
				must.NotEq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
			}
		}
		planned = append(planned, allocList...)
	}
	must.Len(t, 3, planned)

	// Lookup the allocations by JobID
	ws := memdb.NewWatchSet()
	out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
	must.NoError(t, err)
	must.Len(t, 3, out)

	// Ensure the allocations continue to have the correct reserved cores
	for _, alloc := range out {
		switch alloc.Name {
		case "my-job.web[0]": // Ensure that the first alloc is still on core 1
			must.Eq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
		case "my-job.web[1]": // Ensure that the second alloc is still on core 2
			must.Eq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
		default: // Ensure that the new alloc is not on core 1 or 2
			must.NotEq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
			must.NotEq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
		}
	}

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}

func initNodeAndAllocs(t *testing.T, h *Harness, job *structs.Job,
	nodeStatus, clientStatus string) (*structs.Node, *structs.Job, []*structs.Allocation) {
	// Node, which is ready

@@ -935,17 +935,19 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
			return false, true, nil
		}

		// Restore the network, device offers, and any reserved cores from the
		// existing allocation. We do not allow network resources (reserved/dynamic
		// ports) or reserved cores to be updated, and reserved cores should not be
		// recomputed for alloc updates. This is guarded in taskUpdated, so we can
		// safely restore those here.
		for task, resources := range option.TaskResources {
			var networks structs.Networks
			var devices []*structs.AllocatedDeviceResource
			var cores []uint16
			if existing.AllocatedResources != nil {
				if tr, ok := existing.AllocatedResources.Tasks[task]; ok {
					networks = tr.Networks
					devices = tr.Devices
					cores = tr.Cpu.ReservedCores
				}
			} else if tr, ok := existing.TaskResources[task]; ok {
				networks = tr.Networks
@@ -954,6 +956,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
			// Add the networks back
			resources.Networks = networks
			resources.Devices = devices
			resources.Cpu.ReservedCores = cores
		}

		// Create a shallow copy