diff --git a/api/tasks.go b/api/tasks.go
index b25ac7368..3ab4523a3 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -411,23 +411,24 @@ func (vm *VolumeMount) Canonicalize() {
 
 // TaskGroup is the unit of scheduling.
 type TaskGroup struct {
-	Name             *string
-	Count            *int
-	Constraints      []*Constraint
-	Affinities       []*Affinity
-	Tasks            []*Task
-	Spreads          []*Spread
-	Volumes          map[string]*VolumeRequest
-	RestartPolicy    *RestartPolicy
-	ReschedulePolicy *ReschedulePolicy
-	EphemeralDisk    *EphemeralDisk
-	Update           *UpdateStrategy
-	Migrate          *MigrateStrategy
-	Networks         []*NetworkResource
-	Meta             map[string]string
-	Services         []*Service
-	ShutdownDelay    *time.Duration `mapstructure:"shutdown_delay"`
-	Scaling          *ScalingPolicy
+	Name                      *string
+	Count                     *int
+	Constraints               []*Constraint
+	Affinities                []*Affinity
+	Tasks                     []*Task
+	Spreads                   []*Spread
+	Volumes                   map[string]*VolumeRequest
+	RestartPolicy             *RestartPolicy
+	ReschedulePolicy          *ReschedulePolicy
+	EphemeralDisk             *EphemeralDisk
+	Update                    *UpdateStrategy
+	Migrate                   *MigrateStrategy
+	Networks                  []*NetworkResource
+	Meta                      map[string]string
+	Services                  []*Service
+	ShutdownDelay             *time.Duration `mapstructure:"shutdown_delay"`
+	StopAfterClientDisconnect *time.Duration `mapstructure:"stop_after_client_disconnect"`
+	Scaling                   *ScalingPolicy
 }
 
 // NewTaskGroup creates a new TaskGroup.
diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go
index 05e9b1f5e..7d9570e7d 100644
--- a/client/heartbeatstop.go
+++ b/client/heartbeatstop.go
@@ -91,7 +91,7 @@ func (h *heartbeatStop) watch() {
 		select {
 		case allocID := <-stop:
 			if err := h.stopAlloc(allocID); err != nil {
-				h.logger.Warn("stopping alloc %s on heartbeat timeout failed: %v", allocID, err)
+				h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err)
 				continue
 			}
 			delete(h.allocInterval, allocID)
@@ -142,6 +142,8 @@ func (h *heartbeatStop) stopAlloc(allocID string) error {
 		return err
 	}
 
+	h.logger.Debug("stopping alloc for stop_after_client_disconnect", "alloc", allocID)
+
 	runner.Destroy()
 	return nil
 }
diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go
index 09bac74cc..5b3d36ee8 100644
--- a/command/agent/job_endpoint.go
+++ b/command/agent/job_endpoint.go
@@ -793,6 +793,10 @@ func ApiTgToStructsTG(job *structs.Job, taskGroup *api.TaskGroup, tg *structs.Ta
 		tg.ShutdownDelay = taskGroup.ShutdownDelay
 	}
 
+	if taskGroup.StopAfterClientDisconnect != nil {
+		tg.StopAfterClientDisconnect = taskGroup.StopAfterClientDisconnect
+	}
+
 	if taskGroup.ReschedulePolicy != nil {
 		tg.ReschedulePolicy = &structs.ReschedulePolicy{
 			Attempts: *taskGroup.ReschedulePolicy.Attempts,
diff --git a/jobspec/parse_group.go b/jobspec/parse_group.go
index 731018c6b..db78ec538 100644
--- a/jobspec/parse_group.go
+++ b/jobspec/parse_group.go
@@ -56,6 +56,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
 			"service",
 			"volume",
 			"scaling",
+			"stop_after_client_disconnect",
 		}
 		if err := helper.CheckHCLKeys(listVal, valid); err != nil {
 			return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go
index 88fca2894..a676d8f5b 100644
--- a/jobspec/parse_test.go
+++ b/jobspec/parse_test.go
@@ -184,6 +184,7 @@ func TestParse(t *testing.T) {
 					},
 				},
 			},
+			StopAfterClientDisconnect: helper.TimeToPtr(120 * time.Second),
 			ReschedulePolicy: &api.ReschedulePolicy{
 				Interval: helper.TimeToPtr(12 * time.Hour),
 				Attempts: helper.IntToPtr(5),
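Reviewer note: a minimal sketch of how the new group-level field surfaces through the `api` package once this plumbing lands, mirroring the 120s value in the `parse_test.go` fixture above. The `helper` pointer functions are the same ones already used in this diff; the group name is illustrative.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
	"github.com/hashicorp/nomad/helper"
)

func main() {
	// A group whose allocs should be stopped 120s after the client disconnects.
	tg := &api.TaskGroup{
		Name:                      helper.StringToPtr("web"),
		StopAfterClientDisconnect: helper.TimeToPtr(120 * time.Second),
	}
	fmt.Println(*tg.StopAfterClientDisconnect) // 2m0s
}
```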
diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl
index dfa63caf6..8b2f9ef74 100644
--- a/jobspec/test-fixtures/basic.hcl
+++ b/jobspec/test-fixtures/basic.hcl
@@ -152,6 +152,8 @@ job "binstore-storagelocker" {
       }
     }
 
+    stop_after_client_disconnect = "120s"
+
     task "binstore" {
       driver = "docker"
       user   = "bob"
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index fbb1e3e57..9b0eca9cc 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -4623,7 +4623,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
 			}
 		case structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
 		default:
-			s.logger.Error("invalid old client status for allocatio",
+			s.logger.Error("invalid old client status for allocation",
 				"alloc_id", existingAlloc.ID, "client_status", existingAlloc.ClientStatus)
 		}
 		summaryChanged = true
diff --git a/nomad/structs/diff.go b/nomad/structs/diff.go
index 34533c058..ebd927397 100644
--- a/nomad/structs/diff.go
+++ b/nomad/structs/diff.go
@@ -238,6 +238,20 @@ func (tg *TaskGroup) Diff(other *TaskGroup, contextual bool) (*TaskGroupDiff, er
 		}
 	}
 
+	// StopAfterClientDisconnect diff
+	if oldPrimitiveFlat != nil && newPrimitiveFlat != nil {
+		if tg.StopAfterClientDisconnect == nil {
+			oldPrimitiveFlat["StopAfterClientDisconnect"] = ""
+		} else {
+			oldPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *tg.StopAfterClientDisconnect)
+		}
+		if other.StopAfterClientDisconnect == nil {
+			newPrimitiveFlat["StopAfterClientDisconnect"] = ""
+		} else {
+			newPrimitiveFlat["StopAfterClientDisconnect"] = fmt.Sprintf("%d", *other.StopAfterClientDisconnect)
+		}
+	}
+
 	// Diff the primitive fields.
 	diff.Fields = fieldDiffs(oldPrimitiveFlat, newPrimitiveFlat, false)
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 680b68938..f56fd8519 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -3799,6 +3799,10 @@ func (j *Job) Validate() error {
 			mErr.Errors = append(mErr.Errors, errors.New("ShutdownDelay must be a positive value"))
 		}
 
+		if tg.StopAfterClientDisconnect != nil && *tg.StopAfterClientDisconnect < 0 {
+			mErr.Errors = append(mErr.Errors, errors.New("StopAfterClientDisconnect must be a positive value"))
+		}
+
 		if j.Type == "system" && tg.Count > 1 {
 			mErr.Errors = append(mErr.Errors,
 				fmt.Errorf("Job task group %s has count %d. Count cannot exceed 1 with system scheduler",
@@ -5265,6 +5269,10 @@ func (tg *TaskGroup) Copy() *TaskGroup {
 		ntg.ShutdownDelay = tg.ShutdownDelay
 	}
 
+	if tg.StopAfterClientDisconnect != nil {
+		ntg.StopAfterClientDisconnect = tg.StopAfterClientDisconnect
+	}
+
 	return ntg
 }
 
@@ -6516,6 +6524,19 @@ func (t *Template) Warnings() error {
 	return mErr.ErrorOrNil()
 }
 
+// AllocState records a single event that changes the state of the whole allocation
+type AllocStateField uint8
+
+const (
+	AllocStateFieldClientStatus AllocStateField = iota
+)
+
+type AllocState struct {
+	Field AllocStateField
+	Value string
+	Time  time.Time
+}
+
 // Set of possible states for a task.
 const (
 	TaskStatePending = "pending" // The task is waiting to be run.
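Reviewer note on the `TaskGroup.Diff` hunk above: rendering the duration with `%d` flattens it to its raw nanosecond count, so the field diff shows an integer rather than a formatted duration. A quick standalone illustration:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// %d on a time.Duration prints the underlying int64 nanoseconds,
	// which is what ends up in oldPrimitiveFlat/newPrimitiveFlat.
	d := 120 * time.Second
	fmt.Println(fmt.Sprintf("%d", d)) // 120000000000
	fmt.Println(d.String())           // 2m0s
}
```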
@@ -8152,6 +8173,9 @@ type Allocation struct {
 	// TaskStates stores the state of each task,
 	TaskStates map[string]*TaskState
 
+	// AllocStates track metadata associated with changes to the state of the whole allocation, like becoming lost
+	AllocStates []*AllocState
+
 	// PreviousAllocation is the allocation that this allocation is replacing
 	PreviousAllocation string
 
@@ -8420,6 +8444,49 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
 	return nextRescheduleTime, rescheduleEligible
 }
 
+// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
+func (a *Allocation) ShouldClientStop() bool {
+	tg := a.Job.LookupTaskGroup(a.TaskGroup)
+	if tg == nil ||
+		tg.StopAfterClientDisconnect == nil ||
+		*tg.StopAfterClientDisconnect == 0*time.Nanosecond {
+		return false
+	}
+	return true
+}
+
+// WaitClientStop uses the reschedule delay mechanism to block rescheduling until
+// StopAfterClientDisconnect's block interval passes
+func (a *Allocation) WaitClientStop() time.Time {
+	tg := a.Job.LookupTaskGroup(a.TaskGroup)
+
+	// An alloc can only be marked lost once, so use the first lost transition
+	var t time.Time
+	for _, s := range a.AllocStates {
+		if s.Field == AllocStateFieldClientStatus &&
+			s.Value == AllocClientStatusLost {
+			t = s.Time
+			break
+		}
+	}
+
+	// On the first pass, the alloc hasn't been marked lost yet, and so we start
+	// counting from now
+	if t.IsZero() {
+		t = time.Now().UTC()
+	}
+
+	// Find the max kill timeout
+	kill := DefaultKillTimeout
+	for _, t := range tg.Tasks {
+		if t.KillTimeout > kill {
+			kill = t.KillTimeout
+		}
+	}
+
+	return t.Add(*tg.StopAfterClientDisconnect + kill)
+}
+
 // NextDelay returns a duration after which the allocation can be rescheduled.
 // It is calculated according to the delay function and previous reschedule attempts.
 func (a *Allocation) NextDelay() time.Duration {
@@ -8476,6 +8543,24 @@ func (a *Allocation) Terminated() bool {
 	return false
 }
 
+// SetStop updates the allocation in place to a DesiredStatus stop, with the ClientStatus
+func (a *Allocation) SetStop(clientStatus, clientDesc string) {
+	a.DesiredStatus = AllocDesiredStatusStop
+	a.ClientStatus = clientStatus
+	a.ClientDescription = clientDesc
+	a.AppendState(AllocStateFieldClientStatus, clientStatus)
+}
+
+// AppendState creates and appends an AllocState entry recording the time of the state
+// transition. Used to mark the transition to lost
+func (a *Allocation) AppendState(field AllocStateField, value string) {
+	a.AllocStates = append(a.AllocStates, &AllocState{
+		Field: field,
+		Value: value,
+		Time:  time.Now().UTC(),
+	})
+}
+
 // RanSuccessfully returns whether the client has ran the allocation and all
 // tasks finished successfully. Critically this function returns whether the
 // allocation has ran to completion and not just that the alloc has converged to
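Reviewer note: a worked example of `WaitClientStop`'s arithmetic under illustrative numbers. The reschedule time is the first lost transition plus `stop_after_client_disconnect` plus the largest task kill timeout (falling back to `DefaultKillTimeout`, 5s, which is why the test below expects `(2 + 5) * time.Second`).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Illustrative values: alloc marked lost at lostAt, stop window of 2s,
	// one task with a 3s kill_timeout (smaller than the 5s default).
	lostAt := time.Date(2020, 5, 1, 12, 0, 0, 0, time.UTC)
	stop := 2 * time.Second
	kill := 5 * time.Second // DefaultKillTimeout
	for _, kt := range []time.Duration{3 * time.Second} {
		if kt > kill {
			kill = kt
		}
	}
	fmt.Println(lostAt.Add(stop + kill)) // 2020-05-01 12:00:07 +0000 UTC
}
```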
@@ -9384,6 +9469,8 @@ func (p *Plan) AppendStoppedAlloc(alloc *Allocation, desiredDesc, clientStatus s
 		newAlloc.ClientStatus = clientStatus
 	}
 
+	newAlloc.AppendState(AllocStateFieldClientStatus, clientStatus)
+
 	node := alloc.NodeID
 	existing := p.NodeUpdate[node]
 	p.NodeUpdate[node] = append(existing, newAlloc)
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index 96529be7c..a160a3ec9 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -3595,13 +3595,21 @@ func TestPlan_AppendStoppedAllocAppendsAllocWithUpdatedAttrs(t *testing.T) {
 
 	plan.AppendStoppedAlloc(alloc, desiredDesc, AllocClientStatusLost)
 
-	appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
 	expectedAlloc := new(Allocation)
 	*expectedAlloc = *alloc
 	expectedAlloc.DesiredDescription = desiredDesc
 	expectedAlloc.DesiredStatus = AllocDesiredStatusStop
 	expectedAlloc.ClientStatus = AllocClientStatusLost
 	expectedAlloc.Job = nil
+	expectedAlloc.AllocStates = []*AllocState{{
+		Field: AllocStateFieldClientStatus,
+		Value: "lost",
+	}}
+
+	// This value is set to time.Now() in AppendStoppedAlloc, so clear it
+	appendedAlloc := plan.NodeUpdate[alloc.NodeID][0]
+	appendedAlloc.AllocStates[0].Time = time.Time{}
+
 	assert.Equal(t, expectedAlloc, appendedAlloc)
 	assert.Equal(t, alloc.Job, plan.Job)
 }
@@ -4372,6 +4380,65 @@ func TestAllocation_NextDelay(t *testing.T) {
 	}
 }
 
+func TestAllocation_WaitClientStop(t *testing.T) {
+	type testCase struct {
+		desc                   string
+		stop                   time.Duration
+		status                 string
+		expectedShould         bool
+		expectedRescheduleTime time.Time
+	}
+	now := time.Now().UTC()
+	testCases := []testCase{
+		{
+			desc:           "running",
+			stop:           2 * time.Second,
+			status:         AllocClientStatusRunning,
+			expectedShould: true,
+		},
+		{
+			desc:           "no stop_after_client_disconnect",
+			status:         AllocClientStatusLost,
+			expectedShould: false,
+		},
+		{
+			desc:                   "stop",
+			status:                 AllocClientStatusLost,
+			stop:                   2 * time.Second,
+			expectedShould:         true,
+			expectedRescheduleTime: now.Add((2 + 5) * time.Second),
+		},
+	}
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			j := testJob()
+			a := &Allocation{
+				ClientStatus: tc.status,
+				Job:          j,
+				TaskStates:   map[string]*TaskState{},
+			}
+
+			if tc.status == AllocClientStatusLost {
+				a.AppendState(AllocStateFieldClientStatus, AllocClientStatusLost)
+			}
+
+			j.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
+			a.TaskGroup = j.TaskGroups[0].Name
+
+			require.Equal(t, tc.expectedShould, a.ShouldClientStop())
+
+			if !tc.expectedShould || tc.status != AllocClientStatusLost {
+				return
+			}
+
+			// the reschedTime is close to the expectedRescheduleTime
+			reschedTime := a.WaitClientStop()
+			e := reschedTime.Unix() - tc.expectedRescheduleTime.Unix()
+			require.Less(t, e, int64(2))
+		})
+	}
+}
+
 func TestAllocation_Canonicalize_Old(t *testing.T) {
 	alloc := MockAlloc()
 	alloc.AllocatedResources = nil
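Reviewer note: since `AppendStoppedAlloc` now records the transition via `AppendState`, the stopped copy of the alloc carries a timestamped state entry that `WaitClientStop` can read back later. A minimal sketch against the `structs` package as changed in this diff:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Recording a lost transition stamps the entry with time.Now().UTC(),
	// which is why the test above clears Time before comparing.
	a := &structs.Allocation{}
	a.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusLost)
	s := a.AllocStates[0]
	fmt.Println(s.Value, s.Time.IsZero()) // lost false
}
```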
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index f47e02500..6e42c297a 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -259,8 +259,10 @@ func (s *GenericScheduler) process() (bool, error) {
 	// If there are failed allocations, we need to create a blocked evaluation
 	// to place the failed allocations when resources become available. If the
 	// current evaluation is already a blocked eval, we reuse it by submitting
-	// a new eval to the planner in createBlockedEval
-	if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil {
+	// a new eval to the planner in createBlockedEval. If the current eval is
+	// pending with WaitUntil set, it's delayed rather than blocked.
+	if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil &&
+		s.eval.WaitUntil.IsZero() {
 		if err := s.createBlockedEval(false); err != nil {
 			s.logger.Error("failed to make blocked eval", "error", err)
 			return false, err
@@ -338,7 +340,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	}
 
 	// Update the allocations which are in pending/running state on tainted
-	// nodes to lost
+	// nodes to lost, but only if the scheduler has already marked them for stop or evict
 	updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
 
 	reconciler := NewAllocReconciler(s.logger,
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index deb6ddecd..779c38819 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -12,6 +12,7 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/hashicorp/nomad/testutil"
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
@@ -2768,6 +2769,167 @@ func TestServiceSched_NodeDown(t *testing.T) {
 	}
 }
 
+func TestServiceSched_StopAfterClientDisconnect(t *testing.T) {
+	cases := []struct {
+		stop        time.Duration
+		when        time.Time
+		rescheduled bool
+	}{
+		{
+			rescheduled: true,
+		},
+		{
+			stop:        1 * time.Second,
+			rescheduled: false,
+		},
+		{
+			stop:        1 * time.Second,
+			when:        time.Now().UTC().Add(-10 * time.Second),
+			rescheduled: true,
+		},
+		{
+			stop:        1 * time.Second,
+			when:        time.Now().UTC().Add(10 * time.Minute),
+			rescheduled: false,
+		},
+	}
+
+	for i, tc := range cases {
+		t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
+			h := NewHarness(t)
+
+			// Node, which is down
+			node := mock.Node()
+			node.Status = structs.NodeStatusDown
+			require.NoError(t, h.State.UpsertNode(h.NextIndex(), node))
+
+			// Job with allocations and stop_after_client_disconnect
+			job := mock.Job()
+			job.TaskGroups[0].Count = 1
+			job.TaskGroups[0].StopAfterClientDisconnect = &tc.stop
+			require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))
+
+			// Alloc for the running group
+			alloc := mock.Alloc()
+			alloc.Job = job
+			alloc.JobID = job.ID
+			alloc.NodeID = node.ID
+			alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
+			alloc.DesiredStatus = structs.AllocDesiredStatusRun
+			alloc.ClientStatus = structs.AllocClientStatusRunning
+			if !tc.when.IsZero() {
+				alloc.AllocStates = []*structs.AllocState{{
+					Field: structs.AllocStateFieldClientStatus,
+					Value: structs.AllocClientStatusLost,
+					Time:  tc.when,
+				}}
+			}
+			allocs := []*structs.Allocation{alloc}
+			require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
+
+			// Create a mock evaluation to deal with drain
+			evals := []*structs.Evaluation{{
+				Namespace:   structs.DefaultNamespace,
+				ID:          uuid.Generate(),
+				Priority:    50,
+				TriggeredBy: structs.EvalTriggerNodeDrain,
+				JobID:       job.ID,
+				NodeID:      node.ID,
+				Status:      structs.EvalStatusPending,
+			}}
+			eval := evals[0]
+			require.NoError(t, h.State.UpsertEvals(h.NextIndex(), evals))
+
+			// Process the evaluation
+			err := h.Process(NewServiceScheduler, eval)
+			require.NoError(t, err)
+			require.Equal(t, h.Evals[0].Status, structs.EvalStatusComplete)
+			require.Len(t, h.Plans, 1, "plan")
+
+			// Followup eval created
+			require.True(t, len(h.CreateEvals) > 0)
+			e := h.CreateEvals[0]
+			require.Equal(t, eval.ID, e.PreviousEval)
+
+			if tc.rescheduled {
+				require.Equal(t, "blocked", e.Status)
+			} else {
+				require.Equal(t, "pending", e.Status)
+				require.NotEmpty(t, e.WaitUntil)
+			}
+
+			// This eval is still being inserted in the state store
+			ws := memdb.NewWatchSet()
+			testutil.WaitForResult(func() (bool, error) {
+				found, err := h.State.EvalByID(ws, e.ID)
+				if err != nil {
+					return false, err
+				}
+				if found == nil {
+					return false, nil
+				}
+				return true, nil
+			}, func(err error) {
+				require.NoError(t, err)
+			})
+
+			alloc, err = h.State.AllocByID(ws, alloc.ID)
+			require.NoError(t, err)
+
+			// Allocations have been transitioned to lost
+			require.Equal(t, structs.AllocDesiredStatusStop, alloc.DesiredStatus)
+			require.Equal(t, structs.AllocClientStatusLost, alloc.ClientStatus)
+			// At least 1, 2 if we manually set the tc.when
+			require.NotEmpty(t, alloc.AllocStates)
+
+			if tc.rescheduled {
+				// Register a new node, leave it up, process the followup eval
+				node = mock.Node()
+				require.NoError(t, h.State.UpsertNode(h.NextIndex(), node))
+				require.NoError(t, h.Process(NewServiceScheduler, eval))
+
+				as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
+				require.NoError(t, err)
+
+				testutil.WaitForResult(func() (bool, error) {
+					as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
+					if err != nil {
+						return false, err
+					}
+					return len(as) == 2, nil
+				}, func(err error) {
+					require.NoError(t, err)
+				})
+
+				a2 := as[0]
+				if a2.ID == alloc.ID {
+					a2 = as[1]
+				}
+
+				require.Equal(t, structs.AllocClientStatusPending, a2.ClientStatus)
+				require.Equal(t, structs.AllocDesiredStatusRun, a2.DesiredStatus)
+				require.Equal(t, node.ID, a2.NodeID)
+
+				// No blocked evals
+				require.Empty(t, h.ReblockEvals)
+				require.Len(t, h.CreateEvals, 1)
+				require.Equal(t, h.CreateEvals[0].ID, e.ID)
+			} else {
+				// No new alloc was created
+				as, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
+				require.NoError(t, err)
+
+				require.Len(t, as, 1)
+				old := as[0]
+
+				require.Equal(t, alloc.ID, old.ID)
+				require.Equal(t, structs.AllocClientStatusLost, old.ClientStatus)
+				require.Equal(t, structs.AllocDesiredStatusStop, old.DesiredStatus)
+			}
+		})
+	}
+}
+
 func TestServiceSched_NodeUpdate(t *testing.T) {
 	h := NewHarness(t)
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index ccddfa07f..a3f791806 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -353,10 +353,20 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// Determine what set of terminal allocations need to be rescheduled
 	untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)
 
+	// Find delays for any lost allocs that have stop_after_client_disconnect
+	lostLater := lost.delayByStopAfterClientDisconnect()
+	rescheduleLater = append(rescheduleLater, lostLater...)
+
 	// Create batched follow up evaluations for allocations that are
 	// reschedulable later and mark the allocations for in place updating
 	a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
 
+	// Allocs that are lost and delayed have an attributeUpdate that correctly links to
+	// the eval, but incorrectly has the current (running) status
+	for _, d := range lostLater {
+		a.result.attributeUpdates[d.allocID].SetStop(structs.AllocClientStatusLost, structs.AllocClientStatusLost)
+	}
+
 	// Create a structure for choosing names. Seed with the taken names which is
 	// the union of untainted and migrating nodes (includes canaries)
 	nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow))
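Reviewer note: a hypothetical standalone model of the decision the reconciler makes here for each lost alloc (the real logic lives in `delayByStopAfterClientDisconnect` and `WaitClientStop`; `delayFor` and its parameters are illustrative). It matches the test cases above: a lost-transition 10s in the past with a 1s stop window is rescheduled immediately, while a fresh one is delayed.

```go
package main

import (
	"fmt"
	"time"
)

// delayFor: keep a lost alloc in "reschedule later" only while its stop
// window (lost time + stop_after_client_disconnect + max kill timeout) is
// still in the future; otherwise reschedule it now.
func delayFor(lostAt time.Time, stop, maxKill time.Duration, now time.Time) (time.Time, bool) {
	t := lostAt.Add(stop + maxKill)
	if t.After(now) {
		// Delayed: the reconciler emits a pending follow-up eval with WaitUntil = t.
		return t, true
	}
	return time.Time{}, false
}

func main() {
	now := time.Now().UTC()
	_, delayed := delayFor(now.Add(-10*time.Second), time.Second, 5*time.Second, now)
	fmt.Println(delayed) // false: window already passed, reschedule now
	_, delayed = delayFor(now, time.Minute, 5*time.Second, now)
	fmt.Println(delayed) // true: window still open, delay the reschedule
}
```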
@@ -413,9 +423,13 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// * The deployment is not paused or failed
 	// * Not placing any canaries
 	// * If there are any canaries that they have been promoted
-	place := a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
-	if !existingDeployment {
-		dstate.DesiredTotal += len(place)
+	// * There is no delayed stop_after_client_disconnect alloc
+	var place []allocPlaceResult
+	if len(lostLater) == 0 {
+		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
+		if !existingDeployment {
+			dstate.DesiredTotal += len(place)
+		}
 	}
 
 	// deploymentPlaceReady tracks whether the deployment is in a state where
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index ec6c26473..e8da880db 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -370,6 +370,28 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
 	return
 }
 
+// delayByStopAfterClientDisconnect returns a delay for any lost allocation that has
+// stop_after_client_disconnect configured
+func (as allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {
+	now := time.Now().UTC()
+	for _, a := range as {
+		if !a.ShouldClientStop() {
+			continue
+		}
+
+		t := a.WaitClientStop()
+
+		if t.After(now) {
+			later = append(later, &delayedRescheduleInfo{
+				allocID:        a.ID,
+				alloc:          a,
+				rescheduleTime: t,
+			})
+		}
+	}
+	return later
+}
+
 // allocNameIndex is used to select allocation names for placement or removal
 // given an existing set of placed allocations.
 type allocNameIndex struct {
diff --git a/scheduler/testing.go b/scheduler/testing.go
index cb6059c54..c1d8776b4 100644
--- a/scheduler/testing.go
+++ b/scheduler/testing.go
@@ -6,6 +6,7 @@ import (
 	"time"
 
 	testing "github.com/mitchellh/go-testing-interface"
+	"github.com/stretchr/testify/require"
 
 	"github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/nomad/helper/testlog"
@@ -272,12 +273,7 @@ func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
 }
 
 func (h *Harness) AssertEvalStatus(t testing.T, state string) {
-	if len(h.Evals) != 1 {
-		t.Fatalf("bad: %#v", h.Evals)
-	}
+	require.Len(t, h.Evals, 1)
 	update := h.Evals[0]
-
-	if update.Status != state {
-		t.Fatalf("bad: %#v", update)
-	}
+	require.Equal(t, state, update.Status)
 }
diff --git a/scheduler/util.go b/scheduler/util.go
index ee18e3000..0890f32df 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -812,8 +812,8 @@ func adjustQueuedAllocations(logger log.Logger, result *structs.PlanResult, queu
 	}
 }
 
-// updateNonTerminalAllocsToLost updates the allocations which are in pending/running state on tainted node
-// to lost
+// updateNonTerminalAllocsToLost updates the allocations which are in pending/running state
+// on tainted node to lost, but only for allocs already DesiredStatus stop or evict
 func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*structs.Node, allocs []*structs.Allocation) {
 	for _, alloc := range allocs {
 		node, ok := tainted[alloc.NodeID]
@@ -826,8 +826,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
 			continue
 		}
 
-		// If the scheduler has marked it as stop or evict already but the alloc
-		// wasn't terminal on the client change the status to lost.
+		// If the alloc is already correctly marked lost, we're done
 		if (alloc.DesiredStatus == structs.AllocDesiredStatusStop ||
 			alloc.DesiredStatus == structs.AllocDesiredStatusEvict) &&
 			(alloc.ClientStatus == structs.AllocClientStatusRunning ||
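Reviewer note: on the client side of this feature (the `client/heartbeatstop.go` hunks near the top of the diff), each alloc whose group sets `stop_after_client_disconnect` gets a deadline, and the alloc runner is destroyed locally if heartbeats stop arriving past it. A toy model of that shape, not the real `heartbeatStop` implementation — `watch`, `stop`, and the channel wiring are all illustrative:

```go
package main

import (
	"fmt"
	"time"
)

// watch stops the "alloc" if no heartbeat arrives within interval.
func watch(heartbeat <-chan struct{}, interval time.Duration, stop func()) {
	timer := time.NewTimer(interval)
	defer timer.Stop()
	for {
		select {
		case <-heartbeat:
			timer.Reset(interval) // each heartbeat pushes the deadline out
		case <-timer.C:
			stop() // deadline passed: stop locally, as stopAlloc does
			return
		}
	}
}

func main() {
	hb := make(chan struct{})
	done := make(chan struct{})
	go watch(hb, 50*time.Millisecond, func() { close(done) })
	hb <- struct{}{} // one heartbeat extends the deadline once
	<-done
	fmt.Println("alloc stopped after missed heartbeats")
}
```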