diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 75d0250ad..92397ed06 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -1256,6 +1256,7 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
 	ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
 
 	event := structs.NewTaskEvent(structs.TaskClientReconnected)
+	event.Time = time.Now().UnixNano()
 	for _, tr := range ar.tasks {
 		tr.AppendEvent(event)
 	}
@@ -1273,6 +1274,12 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
 	// Build the client allocation
 	alloc := ar.clientAlloc(states)
 
+	// Don't destroy until after we've appended the reconnect event.
+	if update.DesiredStatus != structs.AllocDesiredStatusRun {
+		ar.Shutdown()
+		return
+	}
+
 	// Update the client state store.
 	err = ar.stateUpdater.PutAllocation(alloc)
 	if err != nil {
diff --git a/client/allocwatcher/alloc_watcher.go b/client/allocwatcher/alloc_watcher.go
index 4e6bbb2cf..beed670b4 100644
--- a/client/allocwatcher/alloc_watcher.go
+++ b/client/allocwatcher/alloc_watcher.go
@@ -404,8 +404,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
 			p.logger.Debug("blocking alloc was GC'd")
 			return nil
 		}
-		if resp.Alloc.Terminated() {
-			// Terminated!
+		if resp.Alloc.Terminated() || resp.Alloc.ClientStatus == structs.AllocClientStatusUnknown {
 			p.nodeID = resp.Alloc.NodeID
 			return nil
 		}
diff --git a/client/client_test.go b/client/client_test.go
index d6ac20c17..e6f2e4579 100644
--- a/client/client_test.go
+++ b/client/client_test.go
@@ -1765,6 +1765,7 @@ func TestClient_ReconnectAllocs(t *testing.T) {
 	require.Equal(t, structs.AllocClientStatusRunning, unknownAlloc.ClientStatus)
 	require.NoError(t, err)
 	unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
+	unknownAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
 
 	err = state.UpsertAllocs(structs.MsgTypeTestSetup, runningAlloc.AllocModifyIndex+1, []*structs.Allocation{unknownAlloc})
 	require.NoError(t, err)
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index 810196a43..d7e62e15e 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1153,12 +1153,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 	for _, allocToUpdate := range args.Alloc {
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
 
-		if !allocToUpdate.TerminalStatus() {
+		alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
+		if alloc == nil {
 			continue
 		}
 
-		alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
-		if alloc == nil {
+		if !allocToUpdate.TerminalStatus() && alloc.ClientStatus != structs.AllocClientStatusUnknown {
 			continue
 		}
 
@@ -1178,12 +1178,26 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}
 
+		evalTriggerBy := ""
+		var eval *structs.Evaluation
 		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
-		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
-			eval := &structs.Evaluation{
+		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
+			alloc.FollowupEvalID == "" &&
+			alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
+
+			evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
+		}
+
+		// Add an evaluation if this is a reconnecting allocation.
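+		// alloc is the server's copy from the state store; if it is still in
+		// the unknown client status, this update is the client reporting back
+		// in after a disconnect.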
+		if alloc.ClientStatus == structs.AllocClientStatusUnknown {
+			evalTriggerBy = structs.EvalTriggerReconnect
+		}
+
+		if evalTriggerBy != "" {
+			eval = &structs.Evaluation{
 				ID:          uuid.Generate(),
 				Namespace:   alloc.Namespace,
-				TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
+				TriggeredBy: evalTriggerBy,
 				JobID:       alloc.JobID,
 				Type:        job.Type,
 				Priority:    job.Priority,
@@ -1443,6 +1457,7 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
 			CreateTime: now,
 			ModifyTime: now,
 		}
+
 		evals = append(evals, eval)
 		evalIDs = append(evalIDs, eval.ID)
 	}
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 02fa668fd..83a503009 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -9840,6 +9840,10 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
 		return time.Time{}, false
 	}
 
+	return a.nextRescheduleTime(failTime, reschedulePolicy)
+}
+
+func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *ReschedulePolicy) (time.Time, bool) {
 	nextDelay := a.NextDelay()
 	nextRescheduleTime := failTime.Add(nextDelay)
 	rescheduleEligible := reschedulePolicy.Unlimited || (reschedulePolicy.Attempts > 0 && a.RescheduleTracker == nil)
@@ -9851,6 +9855,18 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
 	return nextRescheduleTime, rescheduleEligible
 }
 
+// NextRescheduleTimeByFailTime works like NextRescheduleTime but allows callers
+// to specify a failure time. Useful for things like determining whether to
+// reschedule an alloc on a disconnected node.
+func (a *Allocation) NextRescheduleTimeByFailTime(failTime time.Time) (time.Time, bool) {
+	reschedulePolicy := a.ReschedulePolicy()
+	if reschedulePolicy == nil {
+		return time.Time{}, false
+	}
+
+	return a.nextRescheduleTime(failTime, reschedulePolicy)
+}
+
 // ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
 func (a *Allocation) ShouldClientStop() bool {
 	tg := a.Job.LookupTaskGroup(a.TaskGroup)
@@ -9903,10 +9919,8 @@ func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
 
 	tg := a.Job.LookupTaskGroup(a.TaskGroup)
 
-	// Prefer the duration from the task group.
 	timeout := tg.MaxClientDisconnect
 
-	// If not configured, return now
 	if timeout == nil {
 		return now
 	}
@@ -10149,6 +10163,76 @@ func (a *Allocation) AllocationDiff() *AllocationDiff {
 	return (*AllocationDiff)(a)
 }
 
+// Expired determines whether an allocation has exceeded its MaxClientDisconnect
+// duration relative to the passed time stamp.
+func (a *Allocation) Expired(now time.Time) bool {
+	if a == nil || a.Job == nil {
+		return false
+	}
+
+	// If alloc is not Unknown it cannot be expired.
+	if a.ClientStatus != AllocClientStatusUnknown {
+		return false
+	}
+
+	lastUnknown := a.LastUnknown()
+	if lastUnknown.IsZero() {
+		return false
+	}
+
+	tg := a.Job.LookupTaskGroup(a.TaskGroup)
+	if tg == nil {
+		return false
+	}
+
+	if tg.MaxClientDisconnect == nil {
+		return false
+	}
+
+	expiry := lastUnknown.Add(*tg.MaxClientDisconnect)
+	return now.UTC().After(expiry) || now.UTC().Equal(expiry)
+}
+
+// LastUnknown returns the timestamp for the last time the allocation
+// transitioned into the unknown client status.
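+// A zero time is returned if the allocation has never transitioned into that
+// status.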
+func (a *Allocation) LastUnknown() time.Time { + var lastUnknown time.Time + + for _, s := range a.AllocStates { + if s.Field == AllocStateFieldClientStatus && + s.Value == AllocClientStatusUnknown { + if lastUnknown.IsZero() || lastUnknown.Before(s.Time) { + lastUnknown = s.Time + } + } + } + + return lastUnknown.UTC() +} + +// Reconnected determines whether a reconnect event has occurred for any task +// and whether that event occurred within the allowable duration specified by MaxClientDisconnect. +func (a *Allocation) Reconnected() (bool, bool) { + var lastReconnect time.Time + for _, taskState := range a.TaskStates { + for _, taskEvent := range taskState.Events { + if taskEvent.Type != TaskClientReconnected { + continue + } + eventTime := time.Unix(0, taskEvent.Time).UTC() + if lastReconnect.IsZero() || lastReconnect.Before(eventTime) { + lastReconnect = eventTime + } + } + } + + if lastReconnect.IsZero() { + return false, false + } + + return true, a.Expired(lastReconnect) +} + // AllocationDiff is another named type for Allocation (to use the same fields), // which is used to represent the delta for an Allocation. If you need a method // defined on the al @@ -10567,6 +10651,7 @@ const ( EvalTriggerPreemption = "preemption" EvalTriggerScaling = "job-scaling" EvalTriggerMaxDisconnectTimeout = "max-disconnect-timeout" + EvalTriggerReconnect = "reconnect" ) const ( diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 5634921ea..ee6afecce 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -5411,6 +5411,287 @@ func TestAllocation_DisconnectTimeout(t *testing.T) { } } +func TestAllocation_Expired(t *testing.T) { + type testCase struct { + name string + maxDisconnect string + ellapsed int + expected bool + nilJob bool + badTaskGroup bool + mixedUTC bool + noReconnectEvent bool + status string + } + + testCases := []testCase{ + { + name: "has-expired", + maxDisconnect: "5s", + ellapsed: 10, + expected: true, + }, + { + name: "has-not-expired", + maxDisconnect: "5s", + ellapsed: 3, + expected: false, + }, + { + name: "are-equal", + maxDisconnect: "5s", + ellapsed: 5, + expected: true, + }, + { + name: "nil-job", + maxDisconnect: "5s", + ellapsed: 10, + expected: false, + nilJob: true, + }, + { + name: "wrong-status", + maxDisconnect: "5s", + ellapsed: 10, + expected: false, + status: AllocClientStatusRunning, + }, + { + name: "bad-task-group", + maxDisconnect: "", + badTaskGroup: true, + ellapsed: 10, + expected: false, + }, + { + name: "no-max-disconnect", + maxDisconnect: "", + ellapsed: 10, + expected: false, + }, + { + name: "mixed-utc-has-expired", + maxDisconnect: "5s", + ellapsed: 10, + mixedUTC: true, + expected: true, + }, + { + name: "mixed-utc-has-not-expired", + maxDisconnect: "5s", + ellapsed: 3, + mixedUTC: true, + expected: false, + }, + { + name: "no-reconnect-event", + maxDisconnect: "5s", + ellapsed: 2, + expected: false, + noReconnectEvent: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + alloc := MockAlloc() + var err error + var maxDisconnect time.Duration + + if tc.maxDisconnect != "" { + maxDisconnect, err = time.ParseDuration(tc.maxDisconnect) + require.NoError(t, err) + alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect + } + + if tc.nilJob { + alloc.Job = nil + } + + if tc.badTaskGroup { + alloc.TaskGroup = "bad" + } + + alloc.ClientStatus = AllocClientStatusUnknown + if tc.status != "" { + alloc.ClientStatus = tc.status + } + + alloc.AllocStates = 
[]*AllocState{{ + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now(), + }} + + require.NoError(t, err) + now := time.Now().UTC() + if tc.mixedUTC { + now = time.Now() + } + + if !tc.noReconnectEvent { + event := NewTaskEvent(TaskClientReconnected) + event.Time = now.UnixNano() + + alloc.TaskStates = map[string]*TaskState{ + "web": { + Events: []*TaskEvent{event}, + }, + } + } + + ellapsedDuration := time.Duration(tc.ellapsed) * time.Second + now = now.Add(ellapsedDuration) + + require.Equal(t, tc.expected, alloc.Expired(now)) + }) + } +} + +func TestAllocation_Reconnected(t *testing.T) { + type testCase struct { + name string + maxDisconnect string + elapsed int + reconnected bool + expired bool + nilJob bool + badTaskGroup bool + mixedTZ bool + noReconnectEvent bool + status string + } + + testCases := []testCase{ + { + name: "has-expired", + maxDisconnect: "5s", + elapsed: 10, + reconnected: true, + expired: true, + }, + { + name: "has-not-expired", + maxDisconnect: "5s", + elapsed: 3, + reconnected: true, + expired: false, + }, + { + name: "are-equal", + maxDisconnect: "5s", + elapsed: 5, + reconnected: true, + expired: true, + }, + { + name: "nil-job", + maxDisconnect: "5s", + elapsed: 10, + reconnected: true, + expired: false, + nilJob: true, + }, + { + name: "bad-task-group", + maxDisconnect: "", + elapsed: 10, + reconnected: true, + expired: false, + badTaskGroup: true, + }, + { + name: "no-max-disconnect", + maxDisconnect: "", + elapsed: 10, + reconnected: true, + expired: false, + }, + { + name: "mixed-utc-has-expired", + maxDisconnect: "5s", + elapsed: 10, + reconnected: true, + expired: true, + mixedTZ: true, + }, + { + name: "mixed-utc-has-not-expired", + maxDisconnect: "5s", + elapsed: 3, + reconnected: true, + expired: false, + mixedTZ: true, + }, + { + name: "no-reconnect-event", + maxDisconnect: "5s", + elapsed: 2, + reconnected: false, + expired: false, + noReconnectEvent: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + alloc := MockAlloc() + var err error + var maxDisconnect time.Duration + + if tc.maxDisconnect != "" { + maxDisconnect, err = time.ParseDuration(tc.maxDisconnect) + require.NoError(t, err) + alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect + } + + if tc.nilJob { + alloc.Job = nil + } + + if tc.badTaskGroup { + alloc.TaskGroup = "bad" + } + + alloc.ClientStatus = AllocClientStatusUnknown + if tc.status != "" { + alloc.ClientStatus = tc.status + } + + alloc.AllocStates = []*AllocState{{ + Field: AllocStateFieldClientStatus, + Value: AllocClientStatusUnknown, + Time: time.Now().UTC(), + }} + + now := time.Now().UTC() + if tc.mixedTZ { + var loc *time.Location + loc, err = time.LoadLocation("America/New_York") + require.NoError(t, err) + now = time.Now().In(loc) + } + + ellapsedDuration := time.Duration(tc.elapsed) * time.Second + now = now.Add(ellapsedDuration) + + if !tc.noReconnectEvent { + event := NewTaskEvent(TaskClientReconnected) + event.Time = now.UnixNano() + + alloc.TaskStates = map[string]*TaskState{ + "web": { + Events: []*TaskEvent{event}, + }, + } + } + + reconnected, expired := alloc.Reconnected() + require.Equal(t, tc.reconnected, reconnected) + require.Equal(t, tc.expired, expired) + }) + } +} + func TestAllocation_Canonicalize_Old(t *testing.T) { ci.Parallel(t) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index fb668dd92..be6bc08ea 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -164,7 +164,7 @@ func 
(s *GenericScheduler) Process(eval *structs.Evaluation) (err error) { structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption, - structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout: + structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout, structs.EvalTriggerReconnect: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index f46d2cf2d..01f2c2a3a 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -344,7 +344,7 @@ func (a *allocReconciler) handleStop(m allocMatrix) { // filterAndStopAll stops all allocations in an allocSet. This is useful in when // stopping an entire job or task group. func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 { - untainted, migrate, lost, disconnecting, reconnecting := set.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients) + untainted, migrate, lost, disconnecting, reconnecting, _ := set.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now) a.markStop(untainted, "", allocNotNeeded) a.markStop(migrate, "", allocNotNeeded) a.markStop(lost, structs.AllocClientStatusLost, allocLost) @@ -406,18 +406,24 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { canaries, all := a.cancelUnneededCanaries(all, desiredChanges) // Determine what set of allocations are on tainted nodes - untainted, migrate, lost, disconnecting, reconnecting := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients) + untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now) + desiredChanges.Ignore += uint64(len(ignore)) // Determine what set of terminal allocations need to be rescheduled - untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment) + untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment) + + // Determine what set of disconnecting allocations need to be rescheduled + _, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment) + rescheduleNow = rescheduleNow.union(rescheduleDisconnecting) // Find delays for any lost allocs that have stop_after_client_disconnect lostLater := lost.delayByStopAfterClientDisconnect() - lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name) + lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name) // Find delays for any disconnecting allocs that have max_client_disconnect, // create followup evals, and update the ClientStatus to unknown. timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name) + // Merge disconnecting with the stop_after_client_disconnect set into the // lostLaterEvals so that computeStop can add them to the stop set. lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals) @@ -438,9 +444,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { desiredChanges.Stop += uint64(len(stop)) untainted = untainted.difference(stop) - // Validate and add reconnecting allocs to the plan so that they will be synced by the client on next poll. + // Validate and add reconnecting allocs to the plan so that they will be logged. 
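+	// Reconnect updates are applied in place on the client, so reconnecting
+	// allocs that keep running are counted as ignored rather than as
+	// placements or stops.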
a.computeReconnecting(reconnecting) - + desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates)) // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. ignore, inplace, destructive := a.computeUpdates(tg, untainted) @@ -571,10 +577,12 @@ func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignor // cancelUnneededCanaries handles the canaries for the group by stopping the // unneeded ones and returning the current set of canaries and the updated total // set of allocs for the group -func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) { +func (a *allocReconciler) cancelUnneededCanaries(original allocSet, desiredChanges *structs.DesiredUpdates) (canaries, all allocSet) { // Stop any canary from an older deployment or from a failed one var stop []string + all = original + // Cancel any non-promoted canaries from the older deployment if a.oldDeployment != nil { for _, dstate := range a.oldDeployment.TaskGroups { @@ -609,7 +617,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s } canaries = all.fromKeys(canaryIDs) - untainted, migrate, lost, _, _ := canaries.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients) + untainted, migrate, lost, _, _, _ := canaries.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now) a.markStop(migrate, "", allocMigrating) a.markStop(lost, structs.AllocClientStatusLost, allocLost) @@ -617,7 +625,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s all = all.difference(migrate, lost) } - return canaries, all + return } // computeUnderProvisionedBy returns the number of allocs that still need to be @@ -688,9 +696,10 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, }) } - // Add replacements for lost allocs up to group.Count + // Add replacements for disconnected and lost allocs up to group.Count existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) + // Add replacements for lost for _, alloc := range lost { if existing >= group.Count { // Reached desired count, do not replace remaining lost @@ -731,7 +740,17 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, // The input deploymentPlaceReady is calculated as the deployment is not paused, failed, or canarying. // It returns the number of allocs still needed. func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates, - place []allocPlaceResult, failed, lost allocSet, underProvisionedBy int) int { + place []allocPlaceResult, rescheduleNow, lost allocSet, underProvisionedBy int) int { + + // Disconnecting allocs are not failing, but are included in rescheduleNow. + // Create a new set that only includes the actual failures and compute + // replacements based off that. + failed := make(allocSet) + for id, alloc := range rescheduleNow { + if _, ok := a.result.disconnectUpdates[id]; !ok { + failed[id] = alloc + } + } // If the deployment is place ready, apply all placements and return if deploymentPlaceReady { @@ -739,6 +758,7 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired // This relies on the computePlacements having built this set, which in // turn relies on len(lostLater) == 0. a.result.place = append(a.result.place, place...) 
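+		// Only the true failures are stopped here; disconnecting allocs were
+		// filtered out of the failed set above and receive disconnect updates
+		// instead.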
+ a.markStop(failed, "", allocRescheduled) desiredChanges.Stop += uint64(len(failed)) @@ -760,13 +780,13 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired } // if no failures or there are no pending placements return. - if len(failed) == 0 || len(place) == 0 { + if len(rescheduleNow) == 0 || len(place) == 0 { return underProvisionedBy } // Handle rescheduling of failed allocations even if the deployment is failed. // If the placement is rescheduling, and not part of a failed deployment, add - // to the place set, and add the previous alloc to the stop set. + // to the place set. Add the previous alloc to the stop set unless it is disconnecting. for _, p := range place { prev := p.PreviousAllocation() partOfFailedDeployment := a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID @@ -775,6 +795,11 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired a.result.place = append(a.result.place, p) desiredChanges.Place++ + _, prevIsDisconnecting := a.result.disconnectUpdates[prev.ID] + if prevIsDisconnecting { + continue + } + a.result.stop = append(a.result.stop, allocStopResult{ alloc: prev, statusDescription: allocRescheduled, @@ -873,9 +898,7 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in } // Final check to see if the deployment is complete is to ensure everything is healthy - var ok bool - var dstate *structs.DeploymentState - if dstate, ok = a.deployment.TaskGroups[groupName]; ok { + if dstate, ok := a.deployment.TaskGroups[groupName]; ok { if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs (dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries complete = false @@ -1009,11 +1032,13 @@ func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, sto for _, reconnectingAlloc := range reconnecting { // if the desired status is not run, or if the user-specified desired - // transition is not run, stop the allocation. + // transition is not run, stop the reconnecting allocation. if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun || reconnectingAlloc.DesiredTransition.ShouldMigrate() || reconnectingAlloc.DesiredTransition.ShouldReschedule() || - reconnectingAlloc.DesiredTransition.ShouldForceReschedule() { + reconnectingAlloc.DesiredTransition.ShouldForceReschedule() || + reconnectingAlloc.Job.Version < a.job.Version || + reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex { stop[reconnectingAlloc.ID] = reconnectingAlloc a.result.stop = append(a.result.stop, allocStopResult{ @@ -1032,7 +1057,7 @@ func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, sto // Compare reconnecting to untainted and decide which to keep. 
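+		// A replacement only wins if it has a newer job version, a newer job
+		// create index, or a strictly higher node score; on a tie the
+		// reconnecting alloc is kept and the replacement is stopped.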
for _, untaintedAlloc := range untainted { - // If not a match by name go to next + // If not a match by name and previous alloc continue if reconnectingAlloc.Name != untaintedAlloc.Name { continue } @@ -1045,17 +1070,19 @@ func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, sto reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore() if untaintedMaxScoreMeta == nil { - a.logger.Error(fmt.Sprintf("error computing stop: replacement allocation metrics not available for alloc.name %q", untaintedAlloc.Name)) + a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID) continue } if reconnectingMaxScoreMeta == nil { - a.logger.Error(fmt.Sprintf("error computing stop: reconnecting allocation metrics not available for alloc.name %q", reconnectingAlloc.Name)) + a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID) continue } statusDescription := allocNotNeeded - if untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore { + if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version || + untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex || + untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore { stopAlloc = reconnectingAlloc deleteSet = reconnecting } else { @@ -1112,7 +1139,7 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all // the followupEvalID func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) { // followupEvals are created in the same way as for delayed lost allocs - allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, all, tgName) + allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName) // Create updates that will be applied to the allocs to mark the FollowupEvalID for allocID, evalID := range allocIDToFollowupEvalID { @@ -1136,7 +1163,11 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) { // Create updates that will be appended to the plan. for _, alloc := range reconnecting { // If the user has defined a DesiredTransition don't resume the alloc. - if alloc.DesiredTransition.ShouldMigrate() || alloc.DesiredTransition.ShouldReschedule() || alloc.DesiredTransition.ShouldForceReschedule() { + if alloc.DesiredTransition.ShouldMigrate() || + alloc.DesiredTransition.ShouldReschedule() || + alloc.DesiredTransition.ShouldForceReschedule() || + alloc.Job.Version < a.job.Version || + alloc.Job.CreateIndex < a.job.CreateIndex { continue } @@ -1145,14 +1176,14 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) { continue } - a.result.reconnectUpdates[alloc.ID] = alloc.Copy() + a.result.reconnectUpdates[alloc.ID] = alloc } } // handleDelayedLost creates batched followup evaluations with the WaitUntil field set for // lost allocations. followupEvals are appended to a.result as a side effect, we return a // map of alloc IDs to their followupEval IDs. 
-func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string { +func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, tgName string) map[string]string { if len(rescheduleLater) == 0 { return map[string]string{} } @@ -1222,7 +1253,7 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now) if err != nil || len(timeoutDelays) != len(disconnecting) { - a.logger.Error(fmt.Sprintf("error computing disconnecting timeouts for task_group.name %q: %s", tgName, err)) + a.logger.Error("error computing disconnecting timeouts for task_group", "task_group", tgName, "err", err) return map[string]string{} } @@ -1278,9 +1309,10 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName emitRescheduleInfo(timeoutInfo.alloc, eval) // Create updates that will be applied to the allocs to mark the FollowupEvalID - // and the unknown ClientStatus. + // and the unknown ClientStatus and AllocState. updatedAlloc := timeoutInfo.alloc.Copy() updatedAlloc.ClientStatus = structs.AllocClientStatusUnknown + updatedAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown) updatedAlloc.ClientDescription = allocUnknown updatedAlloc.FollowupEvalID = eval.ID a.result.disconnectUpdates[updatedAlloc.ID] = updatedAlloc diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 8c263f3c3..78ff3d051 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -259,8 +259,8 @@ type resultExpectation struct { attributeUpdates int disconnectUpdates int reconnectUpdates int - stop int desiredTGUpdates map[string]*structs.DesiredUpdates + stop int } func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { @@ -5272,66 +5272,348 @@ func TestReconciler_Node_Disconnect_Updates_Alloc_To_Unknown(t *testing.T) { }) } -// Tests that when a node reconnects unknown allocations for that node are queued -// to resume on the client, and that any replacement allocations that were scheduled -// are queued to stop. -func TestReconciler_Node_Reconnect_ScaleIn_And_Reconnect_Unknown(t *testing.T) { - // TODO: Table tests - // * Some replacements have a higher nodes score - // * Scores are a tie - // * Canarying - - // Create 2 resumable allocs with a node score of 2. - job, allocs := buildResumableAllocations(2, structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun, 2) - - // Adjust the desired count on the job's Task group that got set in the helper. - job.TaskGroups[0].Count = 3 - - // Create 3 placed allocs with a lower nodeScore here. - scaleInAllocs := buildAllocations(job, 3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 1) - - // 2 should scale in, since we are passing nil in tainted nodes. We pass the - // allocUpdateFnIgnore, because computeUpdates in a real setting should return - // ignore == true for the 1 remaining untainted update after computeStop - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, - nil, append(allocs, scaleInAllocs...), nil, "", 50, true) - reconciler.now = time.Now().UTC() - results := reconciler.Compute() - - // Verify that 0 follow up evals were created. 
- evals := results.desiredFollowupEvals[job.TaskGroups[0].Name] - require.Len(t, evals, 0) - - // Validate that the queued reconnectUpdates have the right client status, - // and that they have no FollowUpdEvalID. - for _, reconnectUpdate := range results.reconnectUpdates { - require.Equal(t, structs.AllocClientStatusUnknown, reconnectUpdate.ClientStatus) - require.Empty(t, reconnectUpdate.FollowupEvalID) - require.Equal(t, structs.AllocDesiredStatusRun, reconnectUpdate.DesiredStatus) +// Tests that when a node disconnects/reconnects allocations for that node are +// reconciled according to the business rules. +func TestReconciler_Disconnected_Client(t *testing.T) { + type testCase struct { + name string + allocCount int + disconnectedAllocCount int + jobVersionIncrement uint64 + nodeScoreIncrement float64 + disconnectedAllocStatus string + isBatch bool + nodeStatusDisconnected bool + replace bool + failReplacement bool + shouldStopOnDisconnectedNode bool + maxDisconnect *time.Duration + expected *resultExpectation } - // 2 to stop, 2 reconnect updates, 1 to ignore - assertResults(t, results, &resultExpectation{ - createDeployment: nil, - deploymentUpdates: nil, - place: 0, - stop: 2, - destructive: 0, - inplace: 0, - disconnectUpdates: 0, - reconnectUpdates: 2, - - // TODO: Figure out how this needs to change. - desiredTGUpdates: map[string]*structs.DesiredUpdates{ - job.TaskGroups[0].Name: { - Place: 0, - Stop: 2, - DestructiveUpdate: 0, - Ignore: 1, - InPlaceUpdate: 0, + testCases := []testCase{ + { + name: "reconnect-original-no-replacement", + allocCount: 2, + replace: false, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + shouldStopOnDisconnectedNode: false, + expected: &resultExpectation{ + reconnectUpdates: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 2, + }, + }, }, }, - }) + { + name: "resume-original-and-stop-replacement", + allocCount: 3, + replace: true, + disconnectedAllocCount: 1, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + shouldStopOnDisconnectedNode: false, + expected: &resultExpectation{ + stop: 1, + reconnectUpdates: 1, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 1, + Ignore: 3, + }, + }, + }, + }, + { + name: "stop-original-with-lower-node-score", + allocCount: 4, + replace: true, + disconnectedAllocCount: 1, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + shouldStopOnDisconnectedNode: true, + nodeScoreIncrement: 1, + expected: &resultExpectation{ + stop: 1, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 1, + Ignore: 4, + }, + }, + }, + }, + { + name: "ignore-original-failed-if-replaced", + allocCount: 4, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusFailed, + shouldStopOnDisconnectedNode: true, + expected: &resultExpectation{ + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 4, + }, + }, + }, + }, + { + name: "reschedule-original-failed-if-not-replaced", + allocCount: 4, + replace: false, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusFailed, + shouldStopOnDisconnectedNode: true, + expected: &resultExpectation{ + stop: 2, + place: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 2, + Place: 2, + Stop: 2, + }, + }, + }, + }, + { + name: "ignore-reconnect-completed", + allocCount: 2, + replace: false, + disconnectedAllocCount: 2, + disconnectedAllocStatus: 
structs.AllocClientStatusComplete, + isBatch: true, + expected: &resultExpectation{ + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 2, + }, + }, + }, + }, + { + name: "stop-original-alloc-with-old-job-version", + allocCount: 5, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + shouldStopOnDisconnectedNode: true, + jobVersionIncrement: 1, + expected: &resultExpectation{ + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Ignore: 5, + Stop: 2, + }, + }, + }, + }, + { + name: "stop-original-alloc-with-old-job-version-reconnect-eval", + allocCount: 5, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + shouldStopOnDisconnectedNode: true, + jobVersionIncrement: 1, + expected: &resultExpectation{ + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 2, + Ignore: 5, + }, + }, + }, + }, + { + name: "stop-original-alloc-with-old-job-version-and-failed-replacements", + allocCount: 5, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + failReplacement: true, + shouldStopOnDisconnectedNode: true, + jobVersionIncrement: 1, + expected: &resultExpectation{ + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 2, + Ignore: 5, + }, + }, + }, + }, + { + name: "stop-original-pending-alloc-for-disconnected-node", + allocCount: 2, + replace: true, + disconnectedAllocCount: 1, + disconnectedAllocStatus: structs.AllocClientStatusPending, + shouldStopOnDisconnectedNode: true, + nodeStatusDisconnected: true, + expected: &resultExpectation{ + stop: 1, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 1, + Ignore: 2, + }, + }, + }, + }, + { + name: "stop-expired-allocs", + allocCount: 5, + replace: true, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusUnknown, + shouldStopOnDisconnectedNode: true, + nodeStatusDisconnected: true, + maxDisconnect: helper.TimeToPtr(2 * time.Second), + expected: &resultExpectation{ + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Stop: 2, + Ignore: 5, + }, + }, + }, + }, + { + name: "replace-allocs-on-disconnected-node", + allocCount: 5, + replace: false, + disconnectedAllocCount: 2, + disconnectedAllocStatus: structs.AllocClientStatusRunning, + nodeStatusDisconnected: true, + expected: &resultExpectation{ + place: 2, + disconnectUpdates: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + "web": { + Place: 2, + Ignore: 3, + }, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + require.NotEqual(t, 0, tc.allocCount, "invalid test case: alloc count must be greater than zero") + + testNode := mock.Node() + if tc.nodeStatusDisconnected == true { + testNode.Status = structs.NodeStatusDisconnected + } + + // Create resumable allocs + job, allocs := buildResumableAllocations(tc.allocCount, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) + + if tc.isBatch { + job.Type = structs.JobTypeBatch + } + + // Set alloc state + disconnectedAllocCount := tc.disconnectedAllocCount + for _, alloc := range allocs { + if tc.maxDisconnect != nil { + alloc.Job.TaskGroups[0].MaxClientDisconnect = tc.maxDisconnect + } + + if disconnectedAllocCount > 0 { + alloc.ClientStatus = tc.disconnectedAllocStatus + // Set the node id on all the disconnected allocs to the node 
under test. + alloc.NodeID = testNode.ID + + alloc.AllocStates = []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: time.Now(), + }} + + event := structs.NewTaskEvent(structs.TaskClientReconnected) + event.Time = time.Now().UnixNano() + + alloc.TaskStates = map[string]*structs.TaskState{ + alloc.Job.TaskGroups[0].Tasks[0].Name: { + Events: []*structs.TaskEvent{event}, + }, + } + disconnectedAllocCount-- + } + } + + // Place the allocs on another node. + if tc.replace { + replacements := make([]*structs.Allocation, 0) + for _, alloc := range allocs { + if alloc.NodeID != testNode.ID { + continue + } + replacement := alloc.Copy() + replacement.ID = uuid.Generate() + replacement.NodeID = uuid.Generate() + replacement.ClientStatus = structs.AllocClientStatusRunning + replacement.PreviousAllocation = alloc.ID + replacement.AllocStates = nil + replacement.TaskStates = nil + alloc.NextAllocation = replacement.ID + + if tc.jobVersionIncrement != 0 { + replacement.Job.Version = replacement.Job.Version + tc.jobVersionIncrement + } + if tc.nodeScoreIncrement != 0 { + replacement.Metrics.ScoreMetaData[0].NormScore = replacement.Metrics.ScoreMetaData[0].NormScore + tc.nodeScoreIncrement + } + + replacements = append(replacements, replacement) + + // If we want to test intermediate replacement failures simulate that. + if tc.failReplacement { + replacement.ClientStatus = structs.AllocClientStatusFailed + nextReplacement := replacement.Copy() + nextReplacement.ID = uuid.Generate() + nextReplacement.ClientStatus = structs.AllocClientStatusRunning + nextReplacement.PreviousAllocation = replacement.ID + replacement.NextAllocation = nextReplacement.ID + replacements = append(replacements, nextReplacement) + } + } + + allocs = append(allocs, replacements...) + } + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, tc.isBatch, job.ID, job, + nil, allocs, map[string]*structs.Node{testNode.ID: testNode}, "", 50, true) + + reconciler.now = time.Now() + if tc.maxDisconnect != nil { + reconciler.now = time.Now().Add(*tc.maxDisconnect * 20) + } + + results := reconciler.Compute() + + for _, stopResult := range results.stop { + if tc.shouldStopOnDisconnectedNode { + require.Equal(t, testNode.ID, stopResult.alloc.NodeID) + } else { + require.NotEqual(t, testNode.ID, stopResult.alloc.NodeID) + } + + require.Equal(t, job.Version, stopResult.alloc.Job.Version) + } + + assertResults(t, results, tc.expected) + }) + } } // Tests that the future timeout evals that get created when a node disconnects @@ -5416,39 +5698,3 @@ func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testin }, }) } - -func TestReconciler_Compute_Disconnecting(t *testing.T) { - // Build a set of resumable allocations. Helper will set the timeout to 5 min. - job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) - - // Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little - // more discernible that only the affected alloc(s) get marked unknown. 
- nodes := buildDisconnectedNodes(allocs, 2) - - reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, - nil, allocs, nodes, "", 50, true) - reconciler.now = time.Now().UTC() - - tgName := allocs[0].TaskGroup - matrix := newAllocMatrix(job, allocs) - _, _, _, reconnecting, _ := matrix[tgName].filterByTainted(nodes, reconciler.supportsDisconnectedClients) - require.NotNil(t, reconnecting) - require.Len(t, reconnecting, 2) - - result := reconciler.createTimeoutLaterEvals(reconnecting, tgName) - require.NotNil(t, result) - require.Len(t, reconciler.result.desiredFollowupEvals, 1) - - evals := reconciler.result.desiredFollowupEvals[tgName] - - for _, eval := range evals { - found := false - for _, evalID := range result { - found = eval.ID == evalID - if found { - break - } - } - require.True(t, found) - } -} diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index e94daf6c0..946be14a5 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -209,21 +209,23 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet { } // filterByTainted takes a set of tainted nodes and filters the allocation set -// into 5 groups: +// into the following groups: // 1. Those that exist on untainted nodes // 2. Those exist on nodes that are draining -// 3. Those that exist on lost nodes +// 3. Those that exist on lost nodes or have expired // 4. Those that are on nodes that are disconnected, but have not had their ClientState set to unknown -// 5. Those that have had their ClientState set to unknown, but their node has reconnected. -func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, supportsDisconnectedClients bool) (untainted, migrate, lost, disconnecting, reconnecting allocSet) { +// 5. Those that are on a node that has reconnected. +// 6. Those that are in a state that results in a noop. +func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, supportsDisconnectedClients bool, now time.Time) (untainted, migrate, lost, disconnecting, reconnecting, ignore allocSet) { untainted = make(map[string]*structs.Allocation) migrate = make(map[string]*structs.Allocation) lost = make(map[string]*structs.Allocation) disconnecting = make(map[string]*structs.Allocation) reconnecting = make(map[string]*structs.Allocation) + ignore = make(map[string]*structs.Allocation) for _, alloc := range a { - // Terminal allocs are always untainted as they should never be migrated + // Terminal allocs are always untainted as they should never be migrated. if alloc.TerminalStatus() { untainted[alloc.ID] = alloc continue @@ -235,10 +237,27 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support continue } + // Expired unknown allocs are lost + if supportsDisconnectedClients && alloc.Expired(now) { + lost[alloc.ID] = alloc + continue + } + + // Ignore unknown allocs + if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown { + ignore[alloc.ID] = alloc + continue + } + taintedNode, ok := taintedNodes[alloc.NodeID] if !ok { // Filter allocs on a node that is now re-connected to be resumed. 
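+			// Reconnected reports whether any task recorded a reconnect event,
+			// and whether the alloc sat in the unknown status longer than
+			// max_client_disconnect before that event.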
- if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown { + reconnected, expired := alloc.Reconnected() + if reconnected { + if expired { + lost[alloc.ID] = alloc + continue + } reconnecting[alloc.ID] = alloc continue } @@ -263,8 +282,13 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support continue } case structs.NodeStatusReady: - // Filter unknown allocs on a node that is connected to reconnect. - if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown { + // Filter reconnecting allocs with replacements on a node that is now connected. + reconnected, expired := alloc.Reconnected() + if reconnected { + if expired { + lost[alloc.ID] = alloc + continue + } reconnecting[alloc.ID] = alloc continue } @@ -280,7 +304,6 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support // All other allocs are untainted untainted[alloc.ID] = alloc - } return @@ -290,23 +313,25 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support // untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled // at a future time are also returned so that we can create follow up evaluations for them. Allocs are // skipped or considered untainted according to logic defined in shouldFilter method. -func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) { +func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) { untainted = make(map[string]*structs.Allocation) rescheduleNow = make(map[string]*structs.Allocation) + // When filtering disconnected sets, the untainted set is never populated. + // It has no purpose in that context. for _, alloc := range a { var eligibleNow, eligibleLater bool var rescheduleTime time.Time - // Ignore failing allocs that have already been rescheduled - // only failed allocs should be rescheduled, but protect against a bug allowing rescheduling - // running allocs + // Ignore failing allocs that have already been rescheduled. + // Only failed or disconnecting allocs should be rescheduled. + // Protects against a bug allowing rescheduling running allocs. if alloc.NextAllocation != "" && alloc.TerminalStatus() { continue } isUntainted, ignore := shouldFilter(alloc, isBatch) - if isUntainted { + if isUntainted && !isDisconnecting { untainted[alloc.ID] = alloc } if isUntainted || ignore { @@ -314,9 +339,11 @@ func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID str } // Only failed allocs with desired state run get to this point - // If the failed alloc is not eligible for rescheduling now we add it to the untainted set - eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment) - if !eligibleNow { + // If the failed alloc is not eligible for rescheduling now we + // add it to the untainted set. 
Disconnecting delay evals are + // handled by allocReconciler.createTimeoutLaterEvals + eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment, isDisconnecting) + if !isDisconnecting && !eligibleNow { untainted[alloc.ID] = alloc if eligibleLater { rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, alloc, rescheduleTime}) @@ -378,7 +405,7 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo // updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation // should be rescheduled now, later or left in the untainted set -func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string, d *structs.Deployment) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) { +func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string, d *structs.Deployment, isDisconnecting bool) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) { // If the allocation is part of an ongoing active deployment, we only allow it to reschedule // if it has been marked eligible if d != nil && alloc.DeploymentID == d.ID && d.Active() && !alloc.DesiredTransition.ShouldReschedule() { @@ -391,7 +418,13 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri } // Reschedule if the eval ID matches the alloc's followup evalID or if its close to its reschedule time - rescheduleTime, eligible := alloc.NextRescheduleTime() + var eligible bool + if isDisconnecting { + rescheduleTime, eligible = alloc.NextRescheduleTimeByFailTime(now) + } else { + rescheduleTime, eligible = alloc.NextRescheduleTime() + } + if eligible && (alloc.FollowupEvalID == evalID || rescheduleTime.Sub(now) <= rescheduleWindowSize) { rescheduleNow = true return diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index 88bc689e7..ae4306548 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -8,6 +8,7 @@ import ( "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/stretchr/testify/require" + "time" ) // Test that we properly create the bitmap even when the alloc set includes an @@ -59,170 +60,591 @@ func TestAllocSet_filterByTainted(t *testing.T) { }, } - batchJob := &structs.Job{ - Type: structs.JobTypeBatch, - } + testJob := mock.Job() + testJob.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Second) + now := time.Now() - allocs := allocSet{ - // Non-terminal alloc with migrate=true should migrate on a draining node - "migrating1": { - ID: "migrating1", - ClientStatus: structs.AllocClientStatusRunning, - DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)}, - Job: batchJob, - NodeID: "draining", - }, - // Non-terminal alloc with migrate=true should migrate on an unknown node - "migrating2": { - ID: "migrating2", - ClientStatus: structs.AllocClientStatusRunning, - DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)}, - Job: batchJob, - NodeID: "nil", - }, - "untainted1": { - ID: "untainted1", - ClientStatus: structs.AllocClientStatusRunning, - Job: batchJob, - NodeID: "normal", - }, - // Terminal allocs are always untainted - "untainted2": { - ID: "untainted2", - ClientStatus: structs.AllocClientStatusComplete, - Job: batchJob, - NodeID: "normal", - }, - // Terminal allocs are always untainted, even on draining nodes - "untainted3": { - ID: "untainted3", - ClientStatus: 
structs.AllocClientStatusComplete, - Job: batchJob, - NodeID: "draining", - }, - // Terminal allocs are always untainted, even on lost nodes - "untainted4": { - ID: "untainted4", - ClientStatus: structs.AllocClientStatusComplete, - Job: batchJob, - NodeID: "lost", - }, - // Non-terminal allocs on lost nodes are lost - "lost1": { - ID: "lost1", - ClientStatus: structs.AllocClientStatusPending, - Job: batchJob, - NodeID: "lost", - }, - // Non-terminal allocs on lost nodes are lost - "lost2": { - ID: "lost2", - ClientStatus: structs.AllocClientStatusRunning, - Job: batchJob, - NodeID: "lost", - }, - // Non-terminal allocs on disconnected nodes are disconnecting - "disconnecting1": { - ID: "disconnecting1", - ClientStatus: structs.AllocClientStatusRunning, - Job: batchJob, - NodeID: "disconnected", - }, - // Non-terminal allocs on disconnected nodes are disconnecting - "disconnecting2": { - ID: "disconnecting2", - ClientStatus: structs.AllocClientStatusRunning, - Job: batchJob, - NodeID: "disconnected", - }, - // Non-terminal allocs on disconnected nodes are disconnecting - "disconnecting3": { - ID: "disconnecting3", - ClientStatus: structs.AllocClientStatusRunning, - Job: batchJob, - NodeID: "disconnected", - }, - // Complete allocs on disconnected nodes don't get restarted - "disconnecting4": { - ID: "disconnecting4", - ClientStatus: structs.AllocClientStatusComplete, - Job: batchJob, - NodeID: "disconnected", - }, - // Failed allocs on disconnected nodes don't get restarted - "disconnecting5": { - ID: "disconnecting5", - ClientStatus: structs.AllocClientStatusFailed, - Job: batchJob, - NodeID: "disconnected", - }, - // Lost allocs on disconnected nodes don't get restarted - "disconnecting6": { - ID: "disconnecting6", - ClientStatus: structs.AllocClientStatusLost, - Job: batchJob, - NodeID: "disconnected", - }, - // Unknown allocs on re-connected nodes are reconnecting - "reconnecting1": { - ID: "reconnecting1", - ClientStatus: structs.AllocClientStatusUnknown, - Job: batchJob, - NodeID: "normal", - }, - // Unknown allocs on re-connected nodes are reconnecting - "reconnecting2": { - ID: "reconnecting2", - ClientStatus: structs.AllocClientStatusUnknown, - Job: batchJob, - NodeID: "normal", - }, - // Complete allocs on disconnected nodes don't get restarted - "reconnecting3": { - ID: "reconnecting3", - ClientStatus: structs.AllocClientStatusComplete, - Job: batchJob, - NodeID: "normal", - }, - // Failed allocs on disconnected nodes don't get restarted - "reconnecting4": { - ID: "reconnecting4", - ClientStatus: structs.AllocClientStatusFailed, - Job: batchJob, - NodeID: "normal", - }, - // Lost allocs on disconnected nodes don't get restarted - "reconnecting5": { - ID: "reconnecting5", - ClientStatus: structs.AllocClientStatusLost, - Job: batchJob, - NodeID: "normal", + unknownAllocState := []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: now, + }} + + expiredAllocState := []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: now.Add(-60 * time.Second), + }} + + reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected) + reconnectedEvent.Time = time.Now().UnixNano() + reconnectTaskState := map[string]*structs.TaskState{ + testJob.TaskGroups[0].Tasks[0].Name: { + Events: []*structs.TaskEvent{reconnectedEvent}, }, } - untainted, migrate, lost, disconnecting, reconnecting := allocs.filterByTainted(nodes, true) - require.Len(t, untainted, 10) - 
require.Contains(t, untainted, "untainted1") - require.Contains(t, untainted, "untainted2") - require.Contains(t, untainted, "untainted3") - require.Contains(t, untainted, "untainted4") - require.Contains(t, untainted, "disconnecting4") - require.Contains(t, untainted, "disconnecting5") - require.Contains(t, untainted, "disconnecting6") - require.Contains(t, untainted, "reconnecting3") - require.Contains(t, untainted, "reconnecting4") - require.Contains(t, untainted, "reconnecting5") - require.Len(t, migrate, 2) - require.Contains(t, migrate, "migrating1") - require.Contains(t, migrate, "migrating2") - require.Len(t, lost, 2) - require.Contains(t, lost, "lost1") - require.Contains(t, lost, "lost2") - require.Len(t, disconnecting, 3) - require.Contains(t, disconnecting, "disconnecting1") - require.Contains(t, disconnecting, "disconnecting2") - require.Contains(t, disconnecting, "disconnecting3") - require.Len(t, reconnecting, 2) - require.Contains(t, reconnecting, "reconnecting1") - require.Contains(t, reconnecting, "reconnecting2") + type testCase struct { + name string + all allocSet + taintedNodes map[string]*structs.Node + supportsDisconnectedClients bool + skipNilNodeTest bool + now time.Time + // expected results + untainted allocSet + migrate allocSet + lost allocSet + disconnecting allocSet + reconnecting allocSet + ignore allocSet + } + + testCases := []testCase{ + // These two cases test that we maintain parity with pre-disconnected-clients behavior. + { + name: "lost-client", + supportsDisconnectedClients: false, + now: time.Now(), + taintedNodes: nodes, + skipNilNodeTest: false, + all: allocSet{ + "untainted1": { + ID: "untainted1", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "normal", + }, + // Terminal allocs are always untainted + "untainted2": { + ID: "untainted2", + ClientStatus: structs.AllocClientStatusComplete, + Job: testJob, + NodeID: "normal", + }, + // Terminal allocs are always untainted, even on draining nodes + "untainted3": { + ID: "untainted3", + ClientStatus: structs.AllocClientStatusComplete, + Job: testJob, + NodeID: "draining", + }, + // Terminal allocs are always untainted, even on lost nodes + "untainted4": { + ID: "untainted4", + ClientStatus: structs.AllocClientStatusComplete, + Job: testJob, + NodeID: "lost", + }, + // Non-terminal alloc with migrate=true should migrate on a draining node + "migrating1": { + ID: "migrating1", + ClientStatus: structs.AllocClientStatusRunning, + DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)}, + Job: testJob, + NodeID: "draining", + }, + // Non-terminal alloc with migrate=true should migrate on an unknown node + "migrating2": { + ID: "migrating2", + ClientStatus: structs.AllocClientStatusRunning, + DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)}, + Job: testJob, + NodeID: "nil", + }, + }, + untainted: allocSet{ + "untainted1": { + ID: "untainted1", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "normal", + }, + // Terminal allocs are always untainted + "untainted2": { + ID: "untainted2", + ClientStatus: structs.AllocClientStatusComplete, + Job: testJob, + NodeID: "normal", + }, + // Terminal allocs are always untainted, even on draining nodes + "untainted3": { + ID: "untainted3", + ClientStatus: structs.AllocClientStatusComplete, + Job: testJob, + NodeID: "draining", + }, + // Terminal allocs are always untainted, even on lost nodes + "untainted4": { + ID: "untainted4", + ClientStatus: 
+					Job:          testJob,
+					NodeID:       "lost",
+				},
+			},
+			migrate: allocSet{
+				// Non-terminal alloc with migrate=true should migrate on a draining node
+				"migrating1": {
+					ID:                "migrating1",
+					ClientStatus:      structs.AllocClientStatusRunning,
+					DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
+					Job:               testJob,
+					NodeID:            "draining",
+				},
+				// Non-terminal alloc with migrate=true should migrate on an unknown node
+				"migrating2": {
+					ID:                "migrating2",
+					ClientStatus:      structs.AllocClientStatusRunning,
+					DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
+					Job:               testJob,
+					NodeID:            "nil",
+				},
+			},
+			disconnecting: allocSet{},
+			reconnecting:  allocSet{},
+			ignore:        allocSet{},
+			lost:          allocSet{},
+		},
+		{
+			name:                        "lost-client-only-tainted-nodes",
+			supportsDisconnectedClients: false,
+			now:                         time.Now(),
+			taintedNodes:                nodes,
+			// The logic associated with this test case can only trigger if there
+			// is a tainted node. Therefore, testing with a nil node set produces
+			// false failures, so don't perform the nil-node test in this case.
+			skipNilNodeTest: true,
+			all: allocSet{
+				// Non-terminal allocs on lost nodes are lost
+				"lost1": {
+					ID:           "lost1",
+					ClientStatus: structs.AllocClientStatusPending,
+					Job:          testJob,
+					NodeID:       "lost",
+				},
+				// Non-terminal allocs on lost nodes are lost
+				"lost2": {
+					ID:           "lost2",
+					ClientStatus: structs.AllocClientStatusRunning,
+					Job:          testJob,
+					NodeID:       "lost",
+				},
+			},
+			untainted:     allocSet{},
+			migrate:       allocSet{},
+			disconnecting: allocSet{},
+			reconnecting:  allocSet{},
+			ignore:        allocSet{},
+			lost: allocSet{
+				// Non-terminal allocs on lost nodes are lost
+				"lost1": {
+					ID:           "lost1",
+					ClientStatus: structs.AllocClientStatusPending,
+					Job:          testJob,
+					NodeID:       "lost",
+				},
+				// Non-terminal allocs on lost nodes are lost
+				"lost2": {
+					ID:           "lost2",
+					ClientStatus: structs.AllocClientStatusRunning,
+					Job:          testJob,
+					NodeID:       "lost",
+				},
+			},
+		},
+
+		// Everything below this line tests the disconnected client mode.
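+		// In this mode, allocs on a disconnected node can be held as "unknown"
+		// rather than immediately marked lost, and are reconciled against any
+		// replacements once the node reconnects.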
+		{
+			name:                        "disco-client-untainted-reconnect-failed-and-replaced",
+			supportsDisconnectedClients: true,
+			now:                         time.Now(),
+			taintedNodes:                nodes,
+			skipNilNodeTest:             false,
+			all: allocSet{
+				"running-replacement": {
+					ID:                 "running-replacement",
+					Name:               "web",
+					ClientStatus:       structs.AllocClientStatusRunning,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					PreviousAllocation: "failed-original",
+				},
+				// Failed and replaced allocs on reconnected nodes are untainted
+				"failed-original": {
+					ID:           "failed-original",
+					Name:         "web",
+					ClientStatus: structs.AllocClientStatusFailed,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+			},
+			untainted: allocSet{
+				"running-replacement": {
+					ID:                 "running-replacement",
+					Name:               "web",
+					ClientStatus:       structs.AllocClientStatusRunning,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					PreviousAllocation: "failed-original",
+				},
+				"failed-original": {
+					ID:           "failed-original",
+					Name:         "web",
+					ClientStatus: structs.AllocClientStatusFailed,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+			},
+			migrate:       allocSet{},
+			disconnecting: allocSet{},
+			reconnecting:  allocSet{},
+			ignore:        allocSet{},
+			lost:          allocSet{},
+		},
+		{
+			name:                        "disco-client-reconnecting-running-no-replacement",
+			supportsDisconnectedClients: true,
+			now:                         time.Now(),
+			taintedNodes:                nodes,
+			skipNilNodeTest:             false,
+			all: allocSet{
+				// Running allocs on reconnected nodes with no replacement are reconnecting.
+				// Node.UpdateStatus has already handled syncing client state so this
+				// should be a noop.
+				"reconnecting-running-no-replacement": {
+					ID:           "reconnecting-running-no-replacement",
+					Name:         "web",
+					ClientStatus: structs.AllocClientStatusRunning,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+			},
+			untainted:     allocSet{},
+			migrate:       allocSet{},
+			disconnecting: allocSet{},
+			reconnecting: allocSet{
+				"reconnecting-running-no-replacement": {
+					ID:           "reconnecting-running-no-replacement",
+					Name:         "web",
+					ClientStatus: structs.AllocClientStatusRunning,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+			},
+			ignore: allocSet{},
+			lost:   allocSet{},
+		},
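+		// A replacement alloc is linked to the alloc it replaced via
+		// PreviousAllocation; the next case covers originals that have
+		// already reached a terminal client status.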
+		{
+			name:                        "disco-client-terminal",
+			supportsDisconnectedClients: true,
+			now:                         time.Now(),
+			taintedNodes:                nodes,
+			skipNilNodeTest:             false,
+			all: allocSet{
+				// Allocs on reconnected nodes that are complete are untainted
+				"untainted-reconnect-complete": {
+					ID:           "untainted-reconnect-complete",
+					Name:         "untainted-reconnect-complete",
+					ClientStatus: structs.AllocClientStatusComplete,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+				// Failed allocs on reconnected nodes are untainted
+				"untainted-reconnect-failed": {
+					ID:           "untainted-reconnect-failed",
+					Name:         "untainted-reconnect-failed",
+					ClientStatus: structs.AllocClientStatusFailed,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+				// Lost allocs on reconnected nodes don't get restarted
+				"untainted-reconnect-lost": {
+					ID:           "untainted-reconnect-lost",
+					Name:         "untainted-reconnect-lost",
+					ClientStatus: structs.AllocClientStatusLost,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+				// Replacement allocs that are complete are untainted
+				"untainted-reconnect-complete-replacement": {
+					ID:                 "untainted-reconnect-complete-replacement",
+					Name:               "untainted-reconnect-complete",
+					ClientStatus:       structs.AllocClientStatusComplete,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					AllocStates:        unknownAllocState,
+					PreviousAllocation: "untainted-reconnect-complete",
+				},
+				// Replacement allocs on reconnected nodes that are failed are untainted
+				"untainted-reconnect-failed-replacement": {
+					ID:                 "untainted-reconnect-failed-replacement",
+					Name:               "untainted-reconnect-failed",
+					ClientStatus:       structs.AllocClientStatusFailed,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					AllocStates:        unknownAllocState,
+					PreviousAllocation: "untainted-reconnect-failed",
+				},
+				// Lost replacement allocs on reconnected nodes don't get restarted
+				"untainted-reconnect-lost-replacement": {
+					ID:                 "untainted-reconnect-lost-replacement",
+					Name:               "untainted-reconnect-lost",
+					ClientStatus:       structs.AllocClientStatusLost,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					AllocStates:        unknownAllocState,
+					PreviousAllocation: "untainted-reconnect-lost",
+				},
+			},
+			untainted: allocSet{
+				"untainted-reconnect-complete": {
+					ID:           "untainted-reconnect-complete",
+					Name:         "untainted-reconnect-complete",
+					ClientStatus: structs.AllocClientStatusComplete,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+				"untainted-reconnect-failed": {
+					ID:           "untainted-reconnect-failed",
+					Name:         "untainted-reconnect-failed",
+					ClientStatus: structs.AllocClientStatusFailed,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+				"untainted-reconnect-lost": {
+					ID:           "untainted-reconnect-lost",
+					Name:         "untainted-reconnect-lost",
+					ClientStatus: structs.AllocClientStatusLost,
+					Job:          testJob,
+					NodeID:       "normal",
+					TaskGroup:    "web",
+					AllocStates:  unknownAllocState,
+					TaskStates:   reconnectTaskState,
+				},
+				"untainted-reconnect-complete-replacement": {
+					ID:                 "untainted-reconnect-complete-replacement",
+					Name:               "untainted-reconnect-complete",
+					ClientStatus:       structs.AllocClientStatusComplete,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					AllocStates:        unknownAllocState,
+					PreviousAllocation: "untainted-reconnect-complete",
+				},
+				"untainted-reconnect-failed-replacement": {
+					ID:                 "untainted-reconnect-failed-replacement",
+					Name:               "untainted-reconnect-failed",
+					ClientStatus:       structs.AllocClientStatusFailed,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					AllocStates:        unknownAllocState,
+					PreviousAllocation: "untainted-reconnect-failed",
+				},
+				"untainted-reconnect-lost-replacement": {
+					ID:                 "untainted-reconnect-lost-replacement",
+					Name:               "untainted-reconnect-lost",
+					ClientStatus:       structs.AllocClientStatusLost,
+					Job:                testJob,
+					NodeID:             "normal",
+					TaskGroup:          "web",
+					AllocStates:        unknownAllocState,
+					PreviousAllocation: "untainted-reconnect-lost",
+				},
+			},
+			migrate:       allocSet{},
+			disconnecting: allocSet{},
+			reconnecting:  allocSet{},
+			ignore:        allocSet{},
+			lost:          allocSet{},
+		},
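+		// While a node stays disconnected, its running allocs transition to
+		// disconnecting, and its unknown allocs wait out max_client_disconnect
+		// before being treated as lost.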
"disconnect-running", + Name: "disconnect-running", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "disconnected", + TaskGroup: "web", + }, + // Unknown allocs on disconnected nodes are ignored + "ignore-unknown": { + ID: "ignore-unknown", + Name: "ignore-unknown", + ClientStatus: structs.AllocClientStatusUnknown, + Job: testJob, + NodeID: "disconnected", + TaskGroup: "web", + AllocStates: unknownAllocState, + }, + // Unknown allocs on disconnected nodes are lost when expired + "lost-unknown": { + ID: "lost-unknown", + Name: "lost-unknown", + ClientStatus: structs.AllocClientStatusUnknown, + Job: testJob, + NodeID: "disconnected", + TaskGroup: "web", + AllocStates: expiredAllocState, + }, + }, + untainted: allocSet{}, + migrate: allocSet{}, + disconnecting: allocSet{ + "disconnect-running": { + ID: "disconnect-running", + Name: "disconnect-running", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "disconnected", + TaskGroup: "web", + }, + }, + reconnecting: allocSet{}, + ignore: allocSet{ + // Unknown allocs on disconnected nodes are ignored + "ignore-unknown": { + ID: "ignore-unknown", + Name: "ignore-unknown", + ClientStatus: structs.AllocClientStatusUnknown, + Job: testJob, + NodeID: "disconnected", + TaskGroup: "web", + AllocStates: unknownAllocState, + }, + }, + lost: allocSet{ + "lost-unknown": { + ID: "lost-unknown", + Name: "lost-unknown", + ClientStatus: structs.AllocClientStatusUnknown, + Job: testJob, + NodeID: "disconnected", + TaskGroup: "web", + AllocStates: expiredAllocState, + }, + }, + }, + { + name: "disco-client-running-reconnecting-and-replacement-untainted", + supportsDisconnectedClients: true, + now: time.Now(), + taintedNodes: nodes, + skipNilNodeTest: false, + all: allocSet{ + "running-replacement": { + ID: "running-replacement", + Name: "web", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "normal", + TaskGroup: "web", + PreviousAllocation: "running-original", + }, + // Running and replaced allocs on reconnected nodes are reconnecting + "running-original": { + ID: "running-original", + Name: "web", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "normal", + TaskGroup: "web", + AllocStates: unknownAllocState, + TaskStates: reconnectTaskState, + }, + }, + untainted: allocSet{ + "running-replacement": { + ID: "running-replacement", + Name: "web", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "normal", + TaskGroup: "web", + PreviousAllocation: "running-original", + }, + }, + migrate: allocSet{}, + disconnecting: allocSet{}, + reconnecting: allocSet{ + "running-original": { + ID: "running-original", + Name: "web", + ClientStatus: structs.AllocClientStatusRunning, + Job: testJob, + NodeID: "normal", + TaskGroup: "web", + AllocStates: unknownAllocState, + TaskStates: reconnectTaskState, + }, + }, + ignore: allocSet{}, + lost: allocSet{}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // With tainted nodes + untainted, migrate, lost, disconnecting, reconnecting, ignore := tc.all.filterByTainted(tc.taintedNodes, tc.supportsDisconnectedClients, tc.now) + require.Equal(t, tc.untainted, untainted, "with-nodes", "untainted") + require.Equal(t, tc.migrate, migrate, "with-nodes", "migrate") + require.Equal(t, tc.lost, lost, "with-nodes", "lost") + require.Equal(t, tc.disconnecting, disconnecting, "with-nodes", "disconnecting") + require.Equal(t, tc.reconnecting, reconnecting, "with-nodes", 
"reconnecting") + require.Equal(t, tc.ignore, ignore, "with-nodes", "ignore") + + if tc.skipNilNodeTest { + return + } + + // Now again with nodes nil + untainted, migrate, lost, disconnecting, reconnecting, ignore = tc.all.filterByTainted(nil, tc.supportsDisconnectedClients, tc.now) + require.Equal(t, tc.untainted, untainted, "nodes-nil", "untainted") + require.Equal(t, tc.migrate, migrate, "nodes-nil", "migrate") + require.Equal(t, tc.lost, lost, "nodes-nil", "lost") + require.Equal(t, tc.disconnecting, disconnecting, "nodes-nil", "disconnecting") + require.Equal(t, tc.reconnecting, reconnecting, "nodes-nil", "reconnecting") + require.Equal(t, tc.ignore, ignore, "nodes-nil", "ignore") + }) + } }