scheduler: don't suppress blocked evals on delay if previous expires (#26523)
In #8099 we fixed a bug where garbage collecting a job with `disconnect.stop_on_client_after` would spawn recursive delayed evals. But when applied to disconnected allocs with `replace=true`, that fix prevents us from emitting a blocked eval if there's no room for the replacement. Update the guard on creating blocked evals so that rather than checking whether `WaitUntil` is zero, we check whether the current time is later than `WaitUntil`. This separates the blocked-eval guard from the logic guarding the creation of delayed evals, so that both can be created when needed.

Ref: https://github.com/hashicorp/nomad/pull/8099/files#r435198418
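To make the behavior change concrete, here is a minimal runnable sketch. This is not the Nomad source: the helper names (oldGuard, newGuard) and the simplified inputs are illustrative assumptions, and the real check in GenericScheduler.process also requires a non-blocked eval status, failed placements, and no existing blocked eval. It contrasts the two predicates on the scenario the new test exercises, where a follow-up eval exists and the triggering eval has a zero WaitUntil:

// Illustrative sketch only; not the Nomad source.
package main

import (
    "fmt"
    "time"
)

// oldGuard: suppress the blocked eval whenever follow-up (delayed) evals
// exist and this eval was not itself delayed (zero WaitUntil).
func oldGuard(numFollowUps int, waitUntil time.Time) bool {
    delayInstead := numFollowUps > 0 && waitUntil.IsZero()
    return !delayInstead
}

// newGuard: suppress the blocked eval only while this eval's own
// WaitUntil is still in the future.
func newGuard(numFollowUps int, waitUntil time.Time) bool {
    return numFollowUps == 0 || time.Now().After(waitUntil)
}

func main() {
    // A lost_after follow-up eval exists and the triggering eval was never
    // delayed, so its WaitUntil is the zero value.
    var waitUntil time.Time

    fmt.Println("old guard creates blocked eval:", oldGuard(1, waitUntil)) // false: suppressed
    fmt.Println("new guard creates blocked eval:", newGuard(1, waitUntil)) // true: created
}

Because time.Now().After(time.Time{}) is true, the new predicate lets the scheduler create the blocked eval alongside the delayed follow-up eval, which is what the test added below asserts: one blocked eval plus one pending eval in h.CreateEvals.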
@@ -254,10 +254,10 @@ func (s *GenericScheduler) process() (bool, error) {
 	// current evaluation is already a blocked eval, we reuse it. If not, submit
 	// a new eval to the planner in createBlockedEval. If rescheduling should
 	// be delayed, do that instead.
-	delayInstead := len(s.followUpEvals) > 0 && s.eval.WaitUntil.IsZero()
-
-	if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
-		!delayInstead {
+	if s.eval.Status != structs.EvalStatusBlocked &&
+		len(s.failedTGAllocs) != 0 &&
+		s.blocked == nil &&
+		(len(s.followUpEvals) == 0 || time.Now().After(s.eval.WaitUntil)) {
 		if err := s.createBlockedEval(false); err != nil {
 			s.logger.Error("failed to make blocked eval", "error", err)
 			return false, err
@@ -273,7 +273,7 @@ func (s *GenericScheduler) process() (bool, error) {
 
 	// Create follow up evals for any delayed reschedule eligible allocations, except in
 	// the case that this evaluation was already delayed.
-	if delayInstead {
+	if len(s.followUpEvals) > 0 && s.eval.WaitUntil.IsZero() {
 		for _, eval := range s.followUpEvals {
 			eval.PreviousEval = s.eval.ID
 			// TODO(preetha) this should be batching evals before inserting them
@@ -4908,6 +4908,99 @@ func TestServiceSched_BlockedReschedule(t *testing.T) {
 	}
 }
 
+func TestServiceSched_BlockedDisconnectReplace(t *testing.T) {
+	ci.Parallel(t)
+
+	h := tests.NewHarness(t)
+	node := mock.Node()
+	node.Status = structs.NodeStatusDisconnected
+	must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+
+	// Generate a fake job with a newly-disconnected allocation with a disconnect policy
+	job := mock.Job()
+	job.TaskGroups[0].Count = 1
+	replaceDelayDuration := 15 * time.Second
+	lostAfterDuration := 12 * time.Hour
+	job.TaskGroups[0].Disconnect = &structs.DisconnectStrategy{
+		LostAfter: lostAfterDuration,
+		Replace:   pointer.Of(true),
+		Reconcile: "best_score",
+	}
+	job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
+		Attempts:      3,
+		Interval:      15 * time.Minute,
+		Delay:         replaceDelayDuration,
+		MaxDelay:      1 * time.Minute,
+		DelayFunction: "constant",
+	}
+	tgName := job.TaskGroups[0].Name
+	now := time.Now().UTC()
+
+	must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+	alloc := mock.Alloc()
+	alloc.Job = job
+	alloc.JobID = job.ID
+	alloc.NodeID = node.ID
+	alloc.Name = "my-job.web[0]"
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{tgName: {State: "dead",
+		StartedAt:  now.Add(-1 * time.Hour),
+		FinishedAt: now}}
+	disconnectedAllocID := alloc.ID
+
+	must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Allocation{alloc}))
+
+	// Create a mock evaluation for the allocation failure
+	eval := &structs.Evaluation{
+		Namespace:   structs.DefaultNamespace,
+		ID:          uuid.Generate(),
+		Priority:    50,
+		TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
+		JobID:       job.ID,
+		Status:      structs.EvalStatusPending,
+	}
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Evaluation{eval}))
+
+	// -----------------------------------
+	// first eval should result in 1 delayed eval for the lost_after and 1
+	// blocked eval for the reschedule
+
+	// Process the evaluation and assert we have a plan
+	must.NoError(t, h.Process(NewServiceScheduler, eval))
+	must.Len(t, 1, h.Plans)
+	must.MapLen(t, 0, h.Plans[0].NodeUpdate)     // no stop
+	must.MapLen(t, 1, h.Plans[0].NodeAllocation) // ignore but update with follow-up eval
+
+	// Lookup the allocations by JobID and verify no new allocs created
+	ws := memdb.NewWatchSet()
+	out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
+	must.NoError(t, err)
+	must.Len(t, 1, out)
+
+	// Verify follow-up eval was created for the failed alloc
+	// and write the eval to the state store
+	alloc, err = h.State.AllocByID(ws, disconnectedAllocID)
+	must.NoError(t, err)
+	must.NotEq(t, "", alloc.FollowupEvalID)
+	must.Len(t, 2, h.CreateEvals)
+	followupEval0 := h.CreateEvals[0]
+	followupEval1 := h.CreateEvals[1]
+
+	must.Eq(t, structs.EvalStatusBlocked, followupEval0.Status)
+	must.Eq(t, structs.EvalStatusPending, followupEval1.Status)
+
+	// TODO: the top-level scheduler calls its own time.Now not shared with the
+	// reconciler
+	// must.Eq(t, now.Add(lostAfterDuration), followupEval1.WaitUntil)
+	must.True(t, followupEval1.WaitUntil.After(now.Add(lostAfterDuration)))
+
+	must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup,
+		h.NextIndex(), []*structs.Evaluation{followupEval0, followupEval1}))
+}
+
 // Tests that old reschedule attempts are pruned
 func TestServiceSched_Reschedule_PruneEvents(t *testing.T) {
 	ci.Parallel(t)