diff --git a/.changelog/25705.txt b/.changelog/25705.txt
new file mode 100644
index 000000000..9d2f02b1b
--- /dev/null
+++ b/.changelog/25705.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+scheduler: Fixed a bug in accounting for resources.cores that could prevent placements on nodes with available cores
+```
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 82f91a187..0942613ba 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -7405,6 +7405,102 @@ func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T)
 	}
 }
 
+func TestServiceSched_ReservedCores_InPlace(t *testing.T) {
+	ci.Parallel(t)
+
+	h := NewHarness(t)
+
+	// Create a node
+	node := mock.Node()
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+
+	// Create a job
+	job := mock.Job()
+	job.TaskGroups[0].Tasks[0].Resources.Cores = 1
+	job.TaskGroups[0].Count = 2
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	// Create running allocations on existing cores
+	var allocs []*structs.Allocation
+	for i := 0; i < 2; i++ {
+		alloc := mock.AllocForNodeWithoutReservedPort(node)
+		alloc.Job = job
+		alloc.JobID = job.ID
+		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
+		alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores = []uint16{uint16(i + 1)}
+		allocs = append(allocs, alloc)
+	}
+	must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
+
+	// Create a new job version with a different count
+	job2 := job.Copy()
+	job2.TaskGroups[0].Count = 3
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
+
+	// Create a mock evaluation to register the job
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	must.NoError(t, err)
+
+	// Ensure a single plan
+	must.Len(t, 1, h.Plans)
+
+	// Ensure the eval has not spawned a blocked eval due to core exhaustion
+	must.Eq(t, "", h.Evals[0].BlockedEval, must.Sprint("blocked eval should be empty, without core exhaustion"))
+
+	// Ensure the plan allocated with the correct reserved cores
+	var planned []*structs.Allocation
+	for _, allocList := range h.Plans[0].NodeAllocation {
+		for _, alloc := range allocList {
+			switch alloc.Name {
+			case "my-job.web[0]": // Ensure that the first planned alloc is still on core 1
+				must.Eq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+			case "my-job.web[1]": // Ensure that the second planned alloc is still on core 2
+				must.Eq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+			default: // Ensure that the new planned alloc is not on core 1 or 2
+				must.NotEq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+				must.NotEq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+			}
+		}
+		planned = append(planned, allocList...)
+	}
+
+	must.Len(t, 3, planned)
+
+	// Lookup the allocations by JobID
+	ws := memdb.NewWatchSet()
+	out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
+	must.NoError(t, err)
+
+	must.Len(t, 3, out)
+
+	// Ensure the allocations continue to have the correct reserved cores
+	for _, alloc := range out {
+		switch alloc.Name {
+		case "my-job.web[0]": // Ensure that the first alloc is still on core 1
+			must.Eq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+		case "my-job.web[1]": // Ensure that the second alloc is still on core 2
+			must.Eq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+		default: // Ensure that the new alloc is not on core 1 or 2
+			must.NotEq(t, []uint16{uint16(2)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+			must.NotEq(t, []uint16{uint16(1)}, alloc.AllocatedResources.Tasks["web"].Cpu.ReservedCores)
+		}
+	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+}
+
 func initNodeAndAllocs(t *testing.T, h *Harness, job *structs.Job, nodeStatus,
 	clientStatus string) (*structs.Node, *structs.Job, []*structs.Allocation) {
 	// Node, which is ready
diff --git a/scheduler/util.go b/scheduler/util.go
index cd3dc7fb7..0d2372aa0 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -935,17 +935,19 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
 		return false, true, nil
 	}
 
-	// Restore the network and device offers from the existing allocation.
-	// We do not allow network resources (reserved/dynamic ports)
-	// to be updated. This is guarded in taskUpdated, so we can
+	// Restore the network, device offers, and any reserved cores from the existing allocation.
+	// We do not allow network resources (reserved/dynamic ports) or reserved cores
+	// to be updated, and reserved cores should not be recomputed for alloc updates. This is guarded in taskUpdated, so we can
 	// safely restore those here.
 	for task, resources := range option.TaskResources {
 		var networks structs.Networks
 		var devices []*structs.AllocatedDeviceResource
+		var cores []uint16
 		if existing.AllocatedResources != nil {
 			if tr, ok := existing.AllocatedResources.Tasks[task]; ok {
 				networks = tr.Networks
 				devices = tr.Devices
+				cores = tr.Cpu.ReservedCores
 			}
 		} else if tr, ok := existing.TaskResources[task]; ok {
 			networks = tr.Networks
@@ -954,6 +956,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
 		// Add the networks back
 		resources.Networks = networks
 		resources.Devices = devices
+		resources.Cpu.ReservedCores = cores
 	}
 
 	// Create a shallow copy