From d1186ae53e3033569d8473a48a34c5ce3e2fdac6 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Fri, 15 Aug 2025 10:53:52 -0400
Subject: [PATCH] scheduler: don't suppress blocked evals on delay if previous expires (#26523)

In #8099 we fixed a bug where garbage collecting a job with
`disconnect.stop_on_client_after` would spawn recursive delayed evals. But
when applied to disconnected allocs with `replace=true`, the fix prevents us
from emitting a blocked eval if there's no room for the replacement.

Update the guard on creating blocked evals so that rather than checking
whether `WaitUntil` is zero, we check whether the current time is later than
`WaitUntil`. This separates the guard from the logic guarding the creation of
delayed evals, so that we can potentially create both when needed.

Ref: https://github.com/hashicorp/nomad/pull/8099/files#r435198418
---
 scheduler/generic_sched.go      | 10 ++--
 scheduler/generic_sched_test.go | 93 +++++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 13dd6d02d..8bdd13e1b 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -254,10 +254,10 @@ func (s *GenericScheduler) process() (bool, error) {
 	// current evaluation is already a blocked eval, we reuse it. If not, submit
 	// a new eval to the planner in createBlockedEval. If rescheduling should
 	// be delayed, do that instead.
-	delayInstead := len(s.followUpEvals) > 0 && s.eval.WaitUntil.IsZero()
-
-	if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
-		!delayInstead {
+	if s.eval.Status != structs.EvalStatusBlocked &&
+		len(s.failedTGAllocs) != 0 &&
+		s.blocked == nil &&
+		(len(s.followUpEvals) == 0 || time.Now().After(s.eval.WaitUntil)) {
 		if err := s.createBlockedEval(false); err != nil {
 			s.logger.Error("failed to make blocked eval", "error", err)
 			return false, err
@@ -273,7 +273,7 @@ func (s *GenericScheduler) process() (bool, error) {
 
 	// Create follow up evals for any delayed reschedule eligible allocations, except in
 	// the case that this evaluation was already delayed.
-	if delayInstead {
+	if len(s.followUpEvals) > 0 && s.eval.WaitUntil.IsZero() {
 		for _, eval := range s.followUpEvals {
 			eval.PreviousEval = s.eval.ID
 			// TODO(preetha) this should be batching evals before inserting them
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 7dceac0e3..0c6b88ab5 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -4908,6 +4908,99 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
 	}
 }
 
+func TestServiceSched_BlockedDisconnectReplace(t *testing.T) {
+	ci.Parallel(t)
+
+	h := tests.NewHarness(t)
+	node := mock.Node()
+	node.Status = structs.NodeStatusDisconnected
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+
+	// Generate a fake job with a newly-disconnected allocation with a disconnect policy
+	job := mock.Job()
+	job.TaskGroups[0].Count = 1
+	replaceDelayDuration := 15 * time.Second
+	lostAfterDuration := 12 * time.Hour
+	job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
+		LostAfter: lostAfterDuration,
+		Replace:   pointer.Of(true),
+		Reconcile: "best_score",
+	}
+	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
+		Attempts:      3,
+		Interval:      15 * time.Minute,
+		Delay:         replaceDelayDuration,
+		MaxDelay:      1 * time.Minute,
+		DelayFunction: "constant",
+	}
+	tgName := job.TaskGroups[0].Name
+	now := time.Now().UTC()
+
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	alloc := mock.Alloc()
+	alloc.Job = job
+	alloc.JobID = job.ID
+	alloc.NodeID = node.ID
+	alloc.Name = "my-job.web[0]"
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
+		StartedAt:  now.Add(-1 * time.Hour),
+		FinishedAt: now}}
+	disconnectedAllocID := alloc.ID
+
+	must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Allocation{alloc}))
+
+	// Create a mock evaluation for the allocation failure
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    50,
+		TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// -----------------------------------
+	// first eval should result in 1 delayed eval for the lost_after and 1
+	// blocked eval for the reschedule
+
+	// Process the evaluation and assert we have a plan
+	must.NoError(t, h.Process(NewServiceScheduler, eval))
+	must.Len(t, 1, h.Plans)
+	must.MapLen(t, 0, h.Plans[0].NodeUpdate)     // no stop
+	must.MapLen(t, 1, h.Plans[0].NodeAllocation) // ignore but update with follow-up eval
+
+	// Lookup the allocations by JobID and verify no new allocs created
+	ws := memdb.NewWatchSet()
+	out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
+	must.NoError(t, err)
+	must.Len(t, 1, out)
+
+	// Verify follow-up eval was created for the failed alloc
+	// and write the eval to the state store
+	alloc, err = h.State.AllocByID(ws, disconnectedAllocID)
+	must.NoError(t, err)
+	must.NotEq(t, "", alloc.FollowupEvalID)
+	must.Len(t, 2, h.CreateEvals)
+	followupEval0 := h.CreateEvals[0]
+	followupEval1 := h.CreateEvals[1]
+
+	must.Eq(t, structs.EvalStatusBlocked, followupEval0.Status)
+	must.Eq(t, structs.EvalStatusPending, followupEval1.Status)
+
+	// TODO: the top-level scheduler calls its own time.Now not shared with the
+	// reconciler
+	// must.Eq(t, now.Add(lostAfterDuration), followupEval1.WaitUntil)
+	must.True(t, followupEval1.WaitUntil.After(now.Add(lostAfterDuration)))
+
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Evaluation{followupEval0, followupEval1}))
+}
+
 // Tests that old reschedule attempts are pruned
 func TestServiceSched_Reschedule_PruneEvents(t *testing.T) {
 	ci.Parallel(t)
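
To make the guard change above concrete, here is a minimal standalone sketch of the two conditions after this patch. It is not part of the patch; the names evalState, shouldCreateBlockedEval, and shouldCreateDelayedEvals are simplified stand-ins for illustration, not Nomad's real scheduler structs.

// Standalone sketch (illustrative only): simplified stand-ins for the
// scheduler state the two guards consult.
package main

import (
	"fmt"
	"time"
)

// evalState reduces the scheduler state to only the fields the guards read.
type evalState struct {
	status         string    // status of the evaluation being processed
	failedTGAllocs int       // number of task groups that failed placement
	blocked        bool      // whether a blocked eval has already been created
	followUpEvals  int       // delayed follow-up evals produced during reconciliation
	waitUntil      time.Time // WaitUntil of the current eval (zero if it was never delayed)
}

// shouldCreateBlockedEval mirrors the updated guard: a pending delayed
// follow-up eval no longer suppresses the blocked eval by itself; only an
// unexpired WaitUntil on the current eval does.
func shouldCreateBlockedEval(s evalState, now time.Time) bool {
	return s.status != "blocked" &&
		s.failedTGAllocs != 0 &&
		!s.blocked &&
		(s.followUpEvals == 0 || now.After(s.waitUntil))
}

// shouldCreateDelayedEvals keeps the pre-existing condition for emitting the
// delayed follow-up evals; with the new guard both branches can fire at once.
func shouldCreateDelayedEvals(s evalState) bool {
	return s.followUpEvals > 0 && s.waitUntil.IsZero()
}

func main() {
	now := time.Now()

	// A disconnected alloc with replace=true: reconciliation queued a delayed
	// (lost_after) follow-up eval, and there was no room for the replacement,
	// so one task group failed placement.
	s := evalState{status: "pending", failedTGAllocs: 1, followUpEvals: 1}

	fmt.Println("create blocked eval: ", shouldCreateBlockedEval(s, now)) // true
	fmt.Println("create delayed evals:", shouldCreateDelayedEvals(s))     // true
}

Under the old guard, the delayInstead flag was true for exactly this state, so the blocked-eval branch was skipped and the example would report false for the blocked eval; the new guard only suppresses it while the current eval's own WaitUntil has not yet passed.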