reconciler: 2 phase reconnects and tests (#12333)

* structs: Add alloc.Expired & alloc.Reconnected functions. Add the Reconnect eval trigger.

* node_endpoint: Emit new eval for reconnecting unknown allocs.

* filterByTainted: Handle the two-phase reconnect filtering rules.

* reconciler: Append AllocState on disconnect. Logic updates from testing and two-phase reconnects.

* allocs: Set the reconnect timestamp. Destroy the alloc runner if DesiredStatus is not run. Watch for unknown status.
Derek Strickland
2022-03-31 11:32:18 -04:00
committed by DerekStrickland
parent 9a82b63686
commit 8ac3e642e6
11 changed files with 1434 additions and 313 deletions
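
The commit message above describes a two-phase flow: while a client is disconnected, its allocs are marked unknown and given a timeout (followup) eval; when the client reconnects, a "reconnect" eval lets the reconciler either resume the original alloc or keep a replacement. Below is a minimal, self-contained sketch of that decision rule; classify and its parameters are illustrative stand-ins, not Nomad APIs, and the real logic lives in the diff hunks that follow.

package main

import (
	"fmt"
	"time"
)

// Illustrative sketch (not part of the diff): the core rule behind the
// two-phase reconnect handling. While the max_client_disconnect window has
// not elapsed the alloc stays "unknown" and is ignored; once it elapses the
// alloc is lost and replaced; if the client reconnects first the alloc is a
// reconnect candidate.
func classify(lastUnknown time.Time, maxClientDisconnect time.Duration, reconnected bool, now time.Time) string {
	expiry := lastUnknown.Add(maxClientDisconnect)
	switch {
	case !now.Before(expiry):
		return "lost"
	case reconnected:
		return "reconnecting"
	default:
		return "unknown (ignore until timeout or reconnect)"
	}
}

func main() {
	disconnectedAt := time.Now().UTC().Add(-3 * time.Minute)
	fmt.Println(classify(disconnectedAt, 5*time.Minute, false, time.Now())) // unknown (ignore until timeout or reconnect)
	fmt.Println(classify(disconnectedAt, 5*time.Minute, true, time.Now()))  // reconnecting
	fmt.Println(classify(disconnectedAt, 2*time.Minute, false, time.Now())) // lost
}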


@@ -1256,6 +1256,7 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = time.Now().UnixNano()
for _, tr := range ar.tasks {
tr.AppendEvent(event)
}
@@ -1273,6 +1274,12 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
// Build the client allocation
alloc := ar.clientAlloc(states)
// Don't destroy until after we've appended the reconnect event.
if update.DesiredStatus != structs.AllocDesiredStatusRun {
ar.Shutdown()
return
}
// Update the client state store.
err = ar.stateUpdater.PutAllocation(alloc)
if err != nil {


@@ -404,8 +404,7 @@ func (p *remotePrevAlloc) Wait(ctx context.Context) error {
p.logger.Debug("blocking alloc was GC'd")
return nil
}
if resp.Alloc.Terminated() {
// Terminated!
if resp.Alloc.Terminated() || resp.Alloc.ClientStatus == structs.AllocClientStatusUnknown {
p.nodeID = resp.Alloc.NodeID
return nil
}


@@ -1765,6 +1765,7 @@ func TestClient_ReconnectAllocs(t *testing.T) {
require.Equal(t, structs.AllocClientStatusRunning, unknownAlloc.ClientStatus)
require.NoError(t, err)
unknownAlloc.ClientStatus = structs.AllocClientStatusUnknown
unknownAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
err = state.UpsertAllocs(structs.MsgTypeTestSetup, runningAlloc.AllocModifyIndex+1, []*structs.Allocation{unknownAlloc})
require.NoError(t, err)


@@ -1153,12 +1153,12 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
for _, allocToUpdate := range args.Alloc {
allocToUpdate.ModifyTime = now.UTC().UnixNano()
if !allocToUpdate.TerminalStatus() {
alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
if alloc == nil {
continue
}
alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
if alloc == nil {
if !allocToUpdate.TerminalStatus() && alloc.ClientStatus != structs.AllocClientStatusUnknown {
continue
}
@@ -1178,12 +1178,26 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
continue
}
evalTriggerBy := ""
var eval *structs.Evaluation
// Add an evaluation if this is a failed alloc that is eligible for rescheduling
if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed && alloc.FollowupEvalID == "" && alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
eval := &structs.Evaluation{
if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
alloc.FollowupEvalID == "" &&
alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
}
// Add an evaluation if this is a reconnecting allocation.
if alloc.ClientStatus == structs.AllocClientStatusUnknown {
evalTriggerBy = structs.EvalTriggerReconnect
}
if evalTriggerBy != "" {
eval = &structs.Evaluation{
ID: uuid.Generate(),
Namespace: alloc.Namespace,
TriggeredBy: structs.EvalTriggerRetryFailedAlloc,
TriggeredBy: evalTriggerBy,
JobID: alloc.JobID,
Type: job.Type,
Priority: job.Priority,
@@ -1443,6 +1457,7 @@ func (n *Node) createNodeEvals(nodeID string, nodeIndex uint64) ([]string, uint6
CreateTime: now,
ModifyTime: now,
}
evals = append(evals, eval)
evalIDs = append(evalIDs, eval.ID)
}


@@ -9840,6 +9840,10 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return time.Time{}, false
}
return a.nextRescheduleTime(failTime, reschedulePolicy)
}
func (a *Allocation) nextRescheduleTime(failTime time.Time, reschedulePolicy *ReschedulePolicy) (time.Time, bool) {
nextDelay := a.NextDelay()
nextRescheduleTime := failTime.Add(nextDelay)
rescheduleEligible := reschedulePolicy.Unlimited || (reschedulePolicy.Attempts > 0 && a.RescheduleTracker == nil)
@@ -9851,6 +9855,18 @@ func (a *Allocation) NextRescheduleTime() (time.Time, bool) {
return nextRescheduleTime, rescheduleEligible
}
// NextRescheduleTimeByFailTime works like NextRescheduleTime but allows callers
// to specify a failure time. Useful for things like determining whether to reschedule
// an alloc on a disconnected node.
func (a *Allocation) NextRescheduleTimeByFailTime(failTime time.Time) (time.Time, bool) {
reschedulePolicy := a.ReschedulePolicy()
if reschedulePolicy == nil {
return time.Time{}, false
}
return a.nextRescheduleTime(failTime, reschedulePolicy)
}
// ShouldClientStop tests an alloc for StopAfterClientDisconnect configuration
func (a *Allocation) ShouldClientStop() bool {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
@@ -9903,10 +9919,8 @@ func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
// Prefer the duration from the task group.
timeout := tg.MaxClientDisconnect
// If not configured, return now
if timeout == nil {
return now
}
@@ -10149,6 +10163,76 @@ func (a *Allocation) AllocationDiff() *AllocationDiff {
return (*AllocationDiff)(a)
}
// Expired determines whether an allocation has exceeded its MaxClientDisconnect
// duration relative to the passed timestamp.
func (a *Allocation) Expired(now time.Time) bool {
if a == nil || a.Job == nil {
return false
}
// If alloc is not Unknown it cannot be expired.
if a.ClientStatus != AllocClientStatusUnknown {
return false
}
lastUnknown := a.LastUnknown()
if lastUnknown.IsZero() {
return false
}
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg == nil {
return false
}
if tg.MaxClientDisconnect == nil {
return false
}
expiry := lastUnknown.Add(*tg.MaxClientDisconnect)
return now.UTC().After(expiry) || now.UTC().Equal(expiry)
}
// LastUnknown returns the timestamp for the last time the allocation
// transitioned into the unknown client status.
func (a *Allocation) LastUnknown() time.Time {
var lastUnknown time.Time
for _, s := range a.AllocStates {
if s.Field == AllocStateFieldClientStatus &&
s.Value == AllocClientStatusUnknown {
if lastUnknown.IsZero() || lastUnknown.Before(s.Time) {
lastUnknown = s.Time
}
}
}
return lastUnknown.UTC()
}
// Reconnected determines whether a reconnect event has occurred for any task
// and whether the allocation had already exceeded its MaxClientDisconnect window by that time.
func (a *Allocation) Reconnected() (bool, bool) {
var lastReconnect time.Time
for _, taskState := range a.TaskStates {
for _, taskEvent := range taskState.Events {
if taskEvent.Type != TaskClientReconnected {
continue
}
eventTime := time.Unix(0, taskEvent.Time).UTC()
if lastReconnect.IsZero() || lastReconnect.Before(eventTime) {
lastReconnect = eventTime
}
}
}
if lastReconnect.IsZero() {
return false, false
}
return true, a.Expired(lastReconnect)
}
// AllocationDiff is another named type for Allocation (to use the same fields),
// which is used to represent the delta for an Allocation. If you need a method
// defined on the al
@@ -10567,6 +10651,7 @@ const (
EvalTriggerPreemption = "preemption"
EvalTriggerScaling = "job-scaling"
EvalTriggerMaxDisconnectTimeout = "max-disconnect-timeout"
EvalTriggerReconnect = "reconnect"
)
const (

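For reference, here is a standalone sketch of the (reconnected, expired) pair that Reconnected returns above, using plain time values in place of TaskStates and AllocStates. The reconnected function below is an illustrative helper, not the Nomad method.

package main

import (
	"fmt"
	"time"
)

// Illustrative sketch (not Nomad API): report whether any reconnect event
// exists, and whether the latest one landed at or after
// lastUnknown + maxClientDisconnect (i.e. after the window expired).
func reconnected(lastUnknown time.Time, maxClientDisconnect time.Duration, reconnectEvents []time.Time) (bool, bool) {
	var lastReconnect time.Time
	for _, et := range reconnectEvents {
		if lastReconnect.IsZero() || lastReconnect.Before(et) {
			lastReconnect = et.UTC()
		}
	}
	if lastReconnect.IsZero() {
		return false, false
	}
	expiry := lastUnknown.Add(maxClientDisconnect)
	return true, !lastReconnect.Before(expiry)
}

func main() {
	lastUnknown := time.Now().UTC().Add(-10 * time.Second)
	// Reconnected 3s after going unknown, inside a 5s window: true false
	fmt.Println(reconnected(lastUnknown, 5*time.Second, []time.Time{lastUnknown.Add(3 * time.Second)}))
	// Reconnected 8s after going unknown, outside the 5s window: true true
	fmt.Println(reconnected(lastUnknown, 5*time.Second, []time.Time{lastUnknown.Add(8 * time.Second)}))
}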

@@ -5411,6 +5411,287 @@ func TestAllocation_DisconnectTimeout(t *testing.T) {
}
}
func TestAllocation_Expired(t *testing.T) {
type testCase struct {
name string
maxDisconnect string
ellapsed int
expected bool
nilJob bool
badTaskGroup bool
mixedUTC bool
noReconnectEvent bool
status string
}
testCases := []testCase{
{
name: "has-expired",
maxDisconnect: "5s",
ellapsed: 10,
expected: true,
},
{
name: "has-not-expired",
maxDisconnect: "5s",
ellapsed: 3,
expected: false,
},
{
name: "are-equal",
maxDisconnect: "5s",
ellapsed: 5,
expected: true,
},
{
name: "nil-job",
maxDisconnect: "5s",
ellapsed: 10,
expected: false,
nilJob: true,
},
{
name: "wrong-status",
maxDisconnect: "5s",
ellapsed: 10,
expected: false,
status: AllocClientStatusRunning,
},
{
name: "bad-task-group",
maxDisconnect: "",
badTaskGroup: true,
ellapsed: 10,
expected: false,
},
{
name: "no-max-disconnect",
maxDisconnect: "",
ellapsed: 10,
expected: false,
},
{
name: "mixed-utc-has-expired",
maxDisconnect: "5s",
ellapsed: 10,
mixedUTC: true,
expected: true,
},
{
name: "mixed-utc-has-not-expired",
maxDisconnect: "5s",
ellapsed: 3,
mixedUTC: true,
expected: false,
},
{
name: "no-reconnect-event",
maxDisconnect: "5s",
ellapsed: 2,
expected: false,
noReconnectEvent: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := MockAlloc()
var err error
var maxDisconnect time.Duration
if tc.maxDisconnect != "" {
maxDisconnect, err = time.ParseDuration(tc.maxDisconnect)
require.NoError(t, err)
alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect
}
if tc.nilJob {
alloc.Job = nil
}
if tc.badTaskGroup {
alloc.TaskGroup = "bad"
}
alloc.ClientStatus = AllocClientStatusUnknown
if tc.status != "" {
alloc.ClientStatus = tc.status
}
alloc.AllocStates = []*AllocState{{
Field: AllocStateFieldClientStatus,
Value: AllocClientStatusUnknown,
Time: time.Now(),
}}
require.NoError(t, err)
now := time.Now().UTC()
if tc.mixedUTC {
now = time.Now()
}
if !tc.noReconnectEvent {
event := NewTaskEvent(TaskClientReconnected)
event.Time = now.UnixNano()
alloc.TaskStates = map[string]*TaskState{
"web": {
Events: []*TaskEvent{event},
},
}
}
ellapsedDuration := time.Duration(tc.ellapsed) * time.Second
now = now.Add(ellapsedDuration)
require.Equal(t, tc.expected, alloc.Expired(now))
})
}
}
func TestAllocation_Reconnected(t *testing.T) {
type testCase struct {
name string
maxDisconnect string
elapsed int
reconnected bool
expired bool
nilJob bool
badTaskGroup bool
mixedTZ bool
noReconnectEvent bool
status string
}
testCases := []testCase{
{
name: "has-expired",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: true,
},
{
name: "has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
reconnected: true,
expired: false,
},
{
name: "are-equal",
maxDisconnect: "5s",
elapsed: 5,
reconnected: true,
expired: true,
},
{
name: "nil-job",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: false,
nilJob: true,
},
{
name: "bad-task-group",
maxDisconnect: "",
elapsed: 10,
reconnected: true,
expired: false,
badTaskGroup: true,
},
{
name: "no-max-disconnect",
maxDisconnect: "",
elapsed: 10,
reconnected: true,
expired: false,
},
{
name: "mixed-utc-has-expired",
maxDisconnect: "5s",
elapsed: 10,
reconnected: true,
expired: true,
mixedTZ: true,
},
{
name: "mixed-utc-has-not-expired",
maxDisconnect: "5s",
elapsed: 3,
reconnected: true,
expired: false,
mixedTZ: true,
},
{
name: "no-reconnect-event",
maxDisconnect: "5s",
elapsed: 2,
reconnected: false,
expired: false,
noReconnectEvent: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
alloc := MockAlloc()
var err error
var maxDisconnect time.Duration
if tc.maxDisconnect != "" {
maxDisconnect, err = time.ParseDuration(tc.maxDisconnect)
require.NoError(t, err)
alloc.Job.TaskGroups[0].MaxClientDisconnect = &maxDisconnect
}
if tc.nilJob {
alloc.Job = nil
}
if tc.badTaskGroup {
alloc.TaskGroup = "bad"
}
alloc.ClientStatus = AllocClientStatusUnknown
if tc.status != "" {
alloc.ClientStatus = tc.status
}
alloc.AllocStates = []*AllocState{{
Field: AllocStateFieldClientStatus,
Value: AllocClientStatusUnknown,
Time: time.Now().UTC(),
}}
now := time.Now().UTC()
if tc.mixedTZ {
var loc *time.Location
loc, err = time.LoadLocation("America/New_York")
require.NoError(t, err)
now = time.Now().In(loc)
}
ellapsedDuration := time.Duration(tc.elapsed) * time.Second
now = now.Add(ellapsedDuration)
if !tc.noReconnectEvent {
event := NewTaskEvent(TaskClientReconnected)
event.Time = now.UnixNano()
alloc.TaskStates = map[string]*TaskState{
"web": {
Events: []*TaskEvent{event},
},
}
}
reconnected, expired := alloc.Reconnected()
require.Equal(t, tc.reconnected, reconnected)
require.Equal(t, tc.expired, expired)
})
}
}
func TestAllocation_Canonicalize_Old(t *testing.T) {
ci.Parallel(t)


@@ -164,7 +164,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) {
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc,
structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption,
structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout:
structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout, structs.EvalTriggerReconnect:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)


@@ -344,7 +344,7 @@ func (a *allocReconciler) handleStop(m allocMatrix) {
// filterAndStopAll stops all allocations in an allocSet. This is useful when
// stopping an entire job or task group.
func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
untainted, migrate, lost, disconnecting, reconnecting := set.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients)
untainted, migrate, lost, disconnecting, reconnecting, _ := set.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@@ -406,18 +406,24 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
canaries, all := a.cancelUnneededCanaries(all, desiredChanges)
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost, disconnecting, reconnecting := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients)
untainted, migrate, lost, disconnecting, reconnecting, ignore := all.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
desiredChanges.Ignore += uint64(len(ignore))
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment)
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, false, a.now, a.evalID, a.deployment)
// Determine what set of disconnecting allocations need to be rescheduled
_, rescheduleDisconnecting, _ := disconnecting.filterByRescheduleable(a.batch, true, a.now, a.evalID, a.deployment)
rescheduleNow = rescheduleNow.union(rescheduleDisconnecting)
// Find delays for any lost allocs that have stop_after_client_disconnect
lostLater := lost.delayByStopAfterClientDisconnect()
lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name)
lostLaterEvals := a.createLostLaterEvals(lostLater, tg.Name)
// Find delays for any disconnecting allocs that have max_client_disconnect,
// create followup evals, and update the ClientStatus to unknown.
timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name)
// Merge disconnecting with the stop_after_client_disconnect set into the
// lostLaterEvals so that computeStop can add them to the stop set.
lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals)
@@ -438,9 +444,9 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
// Validate and add reconnecting allocs to the plan so that they will be synced by the client on next poll.
// Validate and add reconnecting allocs to the plan so that they will be logged.
a.computeReconnecting(reconnecting)
desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@@ -571,10 +577,12 @@ func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignor
// cancelUnneededCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated total
// set of allocs for the group
func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
func (a *allocReconciler) cancelUnneededCanaries(original allocSet, desiredChanges *structs.DesiredUpdates) (canaries, all allocSet) {
// Stop any canary from an older deployment or from a failed one
var stop []string
all = original
// Cancel any non-promoted canaries from the older deployment
if a.oldDeployment != nil {
for _, dstate := range a.oldDeployment.TaskGroups {
@@ -609,7 +617,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s
}
canaries = all.fromKeys(canaryIDs)
untainted, migrate, lost, _, _ := canaries.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients)
untainted, migrate, lost, _, _, _ := canaries.filterByTainted(a.taintedNodes, a.supportsDisconnectedClients, a.now)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@@ -617,7 +625,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s
all = all.difference(migrate, lost)
}
return canaries, all
return
}
// computeUnderProvisionedBy returns the number of allocs that still need to be
@@ -688,9 +696,10 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
})
}
// Add replacements for lost allocs up to group.Count
// Add replacements for disconnected and lost allocs up to group.Count
existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting)
// Add replacements for lost
for _, alloc := range lost {
if existing >= group.Count {
// Reached desired count, do not replace remaining lost
@@ -731,7 +740,17 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// The input deploymentPlaceReady is calculated as the deployment is not paused, failed, or canarying.
// It returns the number of allocs still needed.
func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates,
place []allocPlaceResult, failed, lost allocSet, underProvisionedBy int) int {
place []allocPlaceResult, rescheduleNow, lost allocSet, underProvisionedBy int) int {
// Disconnecting allocs are not failing, but are included in rescheduleNow.
// Create a new set that only includes the actual failures and compute
// replacements based off that.
failed := make(allocSet)
for id, alloc := range rescheduleNow {
if _, ok := a.result.disconnectUpdates[id]; !ok {
failed[id] = alloc
}
}
// If the deployment is place ready, apply all placements and return
if deploymentPlaceReady {
@@ -739,6 +758,7 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
// This relies on the computePlacements having built this set, which in
// turn relies on len(lostLater) == 0.
a.result.place = append(a.result.place, place...)
a.markStop(failed, "", allocRescheduled)
desiredChanges.Stop += uint64(len(failed))
@@ -760,13 +780,13 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
}
// if no failures or there are no pending placements return.
if len(failed) == 0 || len(place) == 0 {
if len(rescheduleNow) == 0 || len(place) == 0 {
return underProvisionedBy
}
// Handle rescheduling of failed allocations even if the deployment is failed.
// If the placement is rescheduling, and not part of a failed deployment, add
// to the place set, and add the previous alloc to the stop set.
// to the place set. Add the previous alloc to the stop set unless it is disconnecting.
for _, p := range place {
prev := p.PreviousAllocation()
partOfFailedDeployment := a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID
@@ -775,6 +795,11 @@ func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desired
a.result.place = append(a.result.place, p)
desiredChanges.Place++
_, prevIsDisconnecting := a.result.disconnectUpdates[prev.ID]
if prevIsDisconnecting {
continue
}
a.result.stop = append(a.result.stop, allocStopResult{
alloc: prev,
statusDescription: allocRescheduled,
@@ -873,9 +898,7 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
}
// Final check to see if the deployment is complete is to ensure everything is healthy
var ok bool
var dstate *structs.DeploymentState
if dstate, ok = a.deployment.TaskGroups[groupName]; ok {
if dstate, ok := a.deployment.TaskGroups[groupName]; ok {
if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
complete = false
@@ -1009,11 +1032,13 @@ func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, sto
for _, reconnectingAlloc := range reconnecting {
// if the desired status is not run, or if the user-specified desired
// transition is not run, stop the allocation.
// transition is not run, stop the reconnecting allocation.
if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun ||
reconnectingAlloc.DesiredTransition.ShouldMigrate() ||
reconnectingAlloc.DesiredTransition.ShouldReschedule() ||
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() {
reconnectingAlloc.DesiredTransition.ShouldForceReschedule() ||
reconnectingAlloc.Job.Version < a.job.Version ||
reconnectingAlloc.Job.CreateIndex < a.job.CreateIndex {
stop[reconnectingAlloc.ID] = reconnectingAlloc
a.result.stop = append(a.result.stop, allocStopResult{
@@ -1032,7 +1057,7 @@ func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, sto
// Compare reconnecting to untainted and decide which to keep.
for _, untaintedAlloc := range untainted {
// If not a match by name go to next
// If not a match by name and previous alloc, continue
if reconnectingAlloc.Name != untaintedAlloc.Name {
continue
}
@@ -1045,17 +1070,19 @@ func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, sto
reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore()
if untaintedMaxScoreMeta == nil {
a.logger.Error(fmt.Sprintf("error computing stop: replacement allocation metrics not available for alloc.name %q", untaintedAlloc.Name))
a.logger.Error("error computing stop: replacement allocation metrics not available", "alloc_name", untaintedAlloc.Name, "alloc_id", untaintedAlloc.ID)
continue
}
if reconnectingMaxScoreMeta == nil {
a.logger.Error(fmt.Sprintf("error computing stop: reconnecting allocation metrics not available for alloc.name %q", reconnectingAlloc.Name))
a.logger.Error("error computing stop: reconnecting allocation metrics not available", "alloc_name", reconnectingAlloc.Name, "alloc_id", reconnectingAlloc.ID)
continue
}
statusDescription := allocNotNeeded
if untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
if untaintedAlloc.Job.Version > reconnectingAlloc.Job.Version ||
untaintedAlloc.Job.CreateIndex > reconnectingAlloc.Job.CreateIndex ||
untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore {
stopAlloc = reconnectingAlloc
deleteSet = reconnecting
} else {
@@ -1112,7 +1139,7 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
// the followupEvalID
func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
// followupEvals are created in the same way as for delayed lost allocs
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, all, tgName)
allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, tgName)
// Create updates that will be applied to the allocs to mark the FollowupEvalID
for allocID, evalID := range allocIDToFollowupEvalID {
@@ -1136,7 +1163,11 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
// Create updates that will be appended to the plan.
for _, alloc := range reconnecting {
// If the user has defined a DesiredTransition don't resume the alloc.
if alloc.DesiredTransition.ShouldMigrate() || alloc.DesiredTransition.ShouldReschedule() || alloc.DesiredTransition.ShouldForceReschedule() {
if alloc.DesiredTransition.ShouldMigrate() ||
alloc.DesiredTransition.ShouldReschedule() ||
alloc.DesiredTransition.ShouldForceReschedule() ||
alloc.Job.Version < a.job.Version ||
alloc.Job.CreateIndex < a.job.CreateIndex {
continue
}
@@ -1145,14 +1176,14 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
continue
}
a.result.reconnectUpdates[alloc.ID] = alloc.Copy()
a.result.reconnectUpdates[alloc.ID] = alloc
}
}
// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for
// lost allocations. followupEvals are appended to a.result as a side effect; we return a
// map of alloc IDs to their followupEval IDs.
func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, tgName string) map[string]string {
if len(rescheduleLater) == 0 {
return map[string]string{}
}
@@ -1222,7 +1253,7 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now)
if err != nil || len(timeoutDelays) != len(disconnecting) {
a.logger.Error(fmt.Sprintf("error computing disconnecting timeouts for task_group.name %q: %s", tgName, err))
a.logger.Error("error computing disconnecting timeouts for task_group", "task_group", tgName, "err", err)
return map[string]string{}
}
@@ -1278,9 +1309,10 @@ func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName
emitRescheduleInfo(timeoutInfo.alloc, eval)
// Create updates that will be applied to the allocs to mark the FollowupEvalID
// and the unknown ClientStatus.
// and the unknown ClientStatus and AllocState.
updatedAlloc := timeoutInfo.alloc.Copy()
updatedAlloc.ClientStatus = structs.AllocClientStatusUnknown
updatedAlloc.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
updatedAlloc.ClientDescription = allocUnknown
updatedAlloc.FollowupEvalID = eval.ID
a.result.disconnectUpdates[updatedAlloc.ID] = updatedAlloc
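
One rule worth calling out from computeStopByReconnecting above: when a reconnecting alloc competes with a running replacement of the same name, the replacement wins if it carries a newer job version or create index, or a higher normalized placement score; otherwise the original is kept and the replacement is stopped. A hedged, standalone sketch of that tie-break follows; candidate and stopReconnecting are illustrative, not Nomad types.

package main

import "fmt"

// Illustrative sketch (not Nomad API) of the tie-break applied when a
// reconnecting alloc has a same-name replacement.
type candidate struct {
	jobVersion     uint64
	jobCreateIndex uint64
	normScore      float64
}

// stopReconnecting reports whether the reconnecting alloc (rather than its
// replacement) should be stopped.
func stopReconnecting(replacement, reconnecting candidate) bool {
	return replacement.jobVersion > reconnecting.jobVersion ||
		replacement.jobCreateIndex > reconnecting.jobCreateIndex ||
		replacement.normScore > reconnecting.normScore
}

func main() {
	orig := candidate{jobVersion: 1, jobCreateIndex: 10, normScore: 2}
	repl := candidate{jobVersion: 1, jobCreateIndex: 10, normScore: 3}
	fmt.Println(stopReconnecting(repl, orig)) // true: replacement scored higher, stop the original
	repl.normScore = 1
	fmt.Println(stopReconnecting(repl, orig)) // false: original wins, the replacement is stopped instead
}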


@@ -259,8 +259,8 @@ type resultExpectation struct {
attributeUpdates int
disconnectUpdates int
reconnectUpdates int
stop int
desiredTGUpdates map[string]*structs.DesiredUpdates
stop int
}
func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
@@ -5272,66 +5272,348 @@ func TestReconciler_Node_Disconnect_Updates_Alloc_To_Unknown(t *testing.T) {
})
}
// Tests that when a node reconnects unknown allocations for that node are queued
// to resume on the client, and that any replacement allocations that were scheduled
// are queued to stop.
func TestReconciler_Node_Reconnect_ScaleIn_And_Reconnect_Unknown(t *testing.T) {
// TODO: Table tests
// * Some replacements have a higher nodes score
// * Scores are a tie
// * Canarying
// Create 2 resumable allocs with a node score of 2.
job, allocs := buildResumableAllocations(2, structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun, 2)
// Adjust the desired count on the job's Task group that got set in the helper.
job.TaskGroups[0].Count = 3
// Create 3 placed allocs with a lower nodeScore here.
scaleInAllocs := buildAllocations(job, 3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 1)
// 2 should scale in, since we are passing nil in tainted nodes. We pass the
// allocUpdateFnIgnore, because computeUpdates in a real setting should return
// ignore == true for the 1 remaining untainted update after computeStop
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, append(allocs, scaleInAllocs...), nil, "", 50, true)
reconciler.now = time.Now().UTC()
results := reconciler.Compute()
// Verify that 0 follow up evals were created.
evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
require.Len(t, evals, 0)
// Validate that the queued reconnectUpdates have the right client status,
// and that they have no FollowUpdEvalID.
for _, reconnectUpdate := range results.reconnectUpdates {
require.Equal(t, structs.AllocClientStatusUnknown, reconnectUpdate.ClientStatus)
require.Empty(t, reconnectUpdate.FollowupEvalID)
require.Equal(t, structs.AllocDesiredStatusRun, reconnectUpdate.DesiredStatus)
// Tests that when a node disconnects/reconnects allocations for that node are
// reconciled according to the business rules.
func TestReconciler_Disconnected_Client(t *testing.T) {
type testCase struct {
name string
allocCount int
disconnectedAllocCount int
jobVersionIncrement uint64
nodeScoreIncrement float64
disconnectedAllocStatus string
isBatch bool
nodeStatusDisconnected bool
replace bool
failReplacement bool
shouldStopOnDisconnectedNode bool
maxDisconnect *time.Duration
expected *resultExpectation
}
// 2 to stop, 2 reconnect updates, 1 to ignore
assertResults(t, results, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
stop: 2,
destructive: 0,
inplace: 0,
disconnectUpdates: 0,
reconnectUpdates: 2,
// TODO: Figure out how this needs to change.
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
Stop: 2,
DestructiveUpdate: 0,
Ignore: 1,
InPlaceUpdate: 0,
testCases := []testCase{
{
name: "reconnect-original-no-replacement",
allocCount: 2,
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
shouldStopOnDisconnectedNode: false,
expected: &resultExpectation{
reconnectUpdates: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 2,
},
},
},
},
})
{
name: "resume-original-and-stop-replacement",
allocCount: 3,
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
shouldStopOnDisconnectedNode: false,
expected: &resultExpectation{
stop: 1,
reconnectUpdates: 1,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 1,
Ignore: 3,
},
},
},
},
{
name: "stop-original-with-lower-node-score",
allocCount: 4,
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
shouldStopOnDisconnectedNode: true,
nodeScoreIncrement: 1,
expected: &resultExpectation{
stop: 1,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 1,
Ignore: 4,
},
},
},
},
{
name: "ignore-original-failed-if-replaced",
allocCount: 4,
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 4,
},
},
},
},
{
name: "reschedule-original-failed-if-not-replaced",
allocCount: 4,
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusFailed,
shouldStopOnDisconnectedNode: true,
expected: &resultExpectation{
stop: 2,
place: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 2,
Place: 2,
Stop: 2,
},
},
},
},
{
name: "ignore-reconnect-completed",
allocCount: 2,
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusComplete,
isBatch: true,
expected: &resultExpectation{
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 2,
},
},
},
},
{
name: "stop-original-alloc-with-old-job-version",
allocCount: 5,
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Ignore: 5,
Stop: 2,
},
},
},
},
{
name: "stop-original-alloc-with-old-job-version-reconnect-eval",
allocCount: 5,
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 2,
Ignore: 5,
},
},
},
},
{
name: "stop-original-alloc-with-old-job-version-and-failed-replacements",
allocCount: 5,
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
failReplacement: true,
shouldStopOnDisconnectedNode: true,
jobVersionIncrement: 1,
expected: &resultExpectation{
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 2,
Ignore: 5,
},
},
},
},
{
name: "stop-original-pending-alloc-for-disconnected-node",
allocCount: 2,
replace: true,
disconnectedAllocCount: 1,
disconnectedAllocStatus: structs.AllocClientStatusPending,
shouldStopOnDisconnectedNode: true,
nodeStatusDisconnected: true,
expected: &resultExpectation{
stop: 1,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 1,
Ignore: 2,
},
},
},
},
{
name: "stop-expired-allocs",
allocCount: 5,
replace: true,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusUnknown,
shouldStopOnDisconnectedNode: true,
nodeStatusDisconnected: true,
maxDisconnect: helper.TimeToPtr(2 * time.Second),
expected: &resultExpectation{
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Stop: 2,
Ignore: 5,
},
},
},
},
{
name: "replace-allocs-on-disconnected-node",
allocCount: 5,
replace: false,
disconnectedAllocCount: 2,
disconnectedAllocStatus: structs.AllocClientStatusRunning,
nodeStatusDisconnected: true,
expected: &resultExpectation{
place: 2,
disconnectUpdates: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
"web": {
Place: 2,
Ignore: 3,
},
},
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
require.NotEqual(t, 0, tc.allocCount, "invalid test case: alloc count must be greater than zero")
testNode := mock.Node()
if tc.nodeStatusDisconnected == true {
testNode.Status = structs.NodeStatusDisconnected
}
// Create resumable allocs
job, allocs := buildResumableAllocations(tc.allocCount, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
if tc.isBatch {
job.Type = structs.JobTypeBatch
}
// Set alloc state
disconnectedAllocCount := tc.disconnectedAllocCount
for _, alloc := range allocs {
if tc.maxDisconnect != nil {
alloc.Job.TaskGroups[0].MaxClientDisconnect = tc.maxDisconnect
}
if disconnectedAllocCount > 0 {
alloc.ClientStatus = tc.disconnectedAllocStatus
// Set the node id on all the disconnected allocs to the node under test.
alloc.NodeID = testNode.ID
alloc.AllocStates = []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: time.Now(),
}}
event := structs.NewTaskEvent(structs.TaskClientReconnected)
event.Time = time.Now().UnixNano()
alloc.TaskStates = map[string]*structs.TaskState{
alloc.Job.TaskGroups[0].Tasks[0].Name: {
Events: []*structs.TaskEvent{event},
},
}
disconnectedAllocCount--
}
}
// Place the allocs on another node.
if tc.replace {
replacements := make([]*structs.Allocation, 0)
for _, alloc := range allocs {
if alloc.NodeID != testNode.ID {
continue
}
replacement := alloc.Copy()
replacement.ID = uuid.Generate()
replacement.NodeID = uuid.Generate()
replacement.ClientStatus = structs.AllocClientStatusRunning
replacement.PreviousAllocation = alloc.ID
replacement.AllocStates = nil
replacement.TaskStates = nil
alloc.NextAllocation = replacement.ID
if tc.jobVersionIncrement != 0 {
replacement.Job.Version = replacement.Job.Version + tc.jobVersionIncrement
}
if tc.nodeScoreIncrement != 0 {
replacement.Metrics.ScoreMetaData[0].NormScore = replacement.Metrics.ScoreMetaData[0].NormScore + tc.nodeScoreIncrement
}
replacements = append(replacements, replacement)
// If we want to test intermediate replacement failures simulate that.
if tc.failReplacement {
replacement.ClientStatus = structs.AllocClientStatusFailed
nextReplacement := replacement.Copy()
nextReplacement.ID = uuid.Generate()
nextReplacement.ClientStatus = structs.AllocClientStatusRunning
nextReplacement.PreviousAllocation = replacement.ID
replacement.NextAllocation = nextReplacement.ID
replacements = append(replacements, nextReplacement)
}
}
allocs = append(allocs, replacements...)
}
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, tc.isBatch, job.ID, job,
nil, allocs, map[string]*structs.Node{testNode.ID: testNode}, "", 50, true)
reconciler.now = time.Now()
if tc.maxDisconnect != nil {
reconciler.now = time.Now().Add(*tc.maxDisconnect * 20)
}
results := reconciler.Compute()
for _, stopResult := range results.stop {
if tc.shouldStopOnDisconnectedNode {
require.Equal(t, testNode.ID, stopResult.alloc.NodeID)
} else {
require.NotEqual(t, testNode.ID, stopResult.alloc.NodeID)
}
require.Equal(t, job.Version, stopResult.alloc.Job.Version)
}
assertResults(t, results, tc.expected)
})
}
}
// Tests that the future timeout evals that get created when a node disconnects
@@ -5416,39 +5698,3 @@ func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testin
},
})
}
func TestReconciler_Compute_Disconnecting(t *testing.T) {
// Build a set of resumable allocations. Helper will set the timeout to 5 min.
job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
// Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little
// more discernible that only the affected alloc(s) get marked unknown.
nodes := buildDisconnectedNodes(allocs, 2)
reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
nil, allocs, nodes, "", 50, true)
reconciler.now = time.Now().UTC()
tgName := allocs[0].TaskGroup
matrix := newAllocMatrix(job, allocs)
_, _, _, reconnecting, _ := matrix[tgName].filterByTainted(nodes, reconciler.supportsDisconnectedClients)
require.NotNil(t, reconnecting)
require.Len(t, reconnecting, 2)
result := reconciler.createTimeoutLaterEvals(reconnecting, tgName)
require.NotNil(t, result)
require.Len(t, reconciler.result.desiredFollowupEvals, 1)
evals := reconciler.result.desiredFollowupEvals[tgName]
for _, eval := range evals {
found := false
for _, evalID := range result {
found = eval.ID == evalID
if found {
break
}
}
require.True(t, found)
}
}


@@ -209,21 +209,23 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
}
// filterByTainted takes a set of tainted nodes and filters the allocation set
// into 5 groups:
// into the following groups:
// 1. Those that exist on untainted nodes
// 2. Those that exist on nodes that are draining
// 3. Those that exist on lost nodes
// 3. Those that exist on lost nodes or have expired
// 4. Those that are on nodes that are disconnected, but have not had their ClientState set to unknown
// 5. Those that have had their ClientState set to unknown, but their node has reconnected.
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, supportsDisconnectedClients bool) (untainted, migrate, lost, disconnecting, reconnecting allocSet) {
// 5. Those that are on a node that has reconnected.
// 6. Those that are in a state that results in a noop.
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, supportsDisconnectedClients bool, now time.Time) (untainted, migrate, lost, disconnecting, reconnecting, ignore allocSet) {
untainted = make(map[string]*structs.Allocation)
migrate = make(map[string]*structs.Allocation)
lost = make(map[string]*structs.Allocation)
disconnecting = make(map[string]*structs.Allocation)
reconnecting = make(map[string]*structs.Allocation)
ignore = make(map[string]*structs.Allocation)
for _, alloc := range a {
// Terminal allocs are always untainted as they should never be migrated
// Terminal allocs are always untainted as they should never be migrated.
if alloc.TerminalStatus() {
untainted[alloc.ID] = alloc
continue
@@ -235,10 +237,27 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
continue
}
// Expired unknown allocs are lost
if supportsDisconnectedClients && alloc.Expired(now) {
lost[alloc.ID] = alloc
continue
}
// Ignore unknown allocs
if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown {
ignore[alloc.ID] = alloc
continue
}
taintedNode, ok := taintedNodes[alloc.NodeID]
if !ok {
// Filter allocs on a node that is now re-connected to be resumed.
if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown {
reconnected, expired := alloc.Reconnected()
if reconnected {
if expired {
lost[alloc.ID] = alloc
continue
}
reconnecting[alloc.ID] = alloc
continue
}
@@ -263,8 +282,13 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
continue
}
case structs.NodeStatusReady:
// Filter unknown allocs on a node that is connected to reconnect.
if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown {
// Filter reconnecting allocs with replacements on a node that is now connected.
reconnected, expired := alloc.Reconnected()
if reconnected {
if expired {
lost[alloc.ID] = alloc
continue
}
reconnecting[alloc.ID] = alloc
continue
}
@@ -280,7 +304,6 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
// All other allocs are untainted
untainted[alloc.ID] = alloc
}
return
@@ -290,23 +313,25 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
// untainted or a set of allocations that must be rescheduled now. Allocations that can be rescheduled
// at a future time are also returned so that we can create follow up evaluations for them. Allocs are
// skipped or considered untainted according to logic defined in shouldFilter method.
func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
func (a allocSet) filterByRescheduleable(isBatch, isDisconnecting bool, now time.Time, evalID string, deployment *structs.Deployment) (untainted, rescheduleNow allocSet, rescheduleLater []*delayedRescheduleInfo) {
untainted = make(map[string]*structs.Allocation)
rescheduleNow = make(map[string]*structs.Allocation)
// When filtering disconnected sets, the untainted set is never populated.
// It has no purpose in that context.
for _, alloc := range a {
var eligibleNow, eligibleLater bool
var rescheduleTime time.Time
// Ignore failing allocs that have already been rescheduled
// only failed allocs should be rescheduled, but protect against a bug allowing rescheduling
// running allocs
// Ignore failing allocs that have already been rescheduled.
// Only failed or disconnecting allocs should be rescheduled.
// Protects against a bug allowing rescheduling running allocs.
if alloc.NextAllocation != "" && alloc.TerminalStatus() {
continue
}
isUntainted, ignore := shouldFilter(alloc, isBatch)
if isUntainted {
if isUntainted && !isDisconnecting {
untainted[alloc.ID] = alloc
}
if isUntainted || ignore {
@@ -314,9 +339,11 @@ func (a allocSet) filterByRescheduleable(isBatch bool, now time.Time, evalID str
}
// Only failed allocs with desired state run get to this point
// If the failed alloc is not eligible for rescheduling now we add it to the untainted set
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment)
if !eligibleNow {
// If the failed alloc is not eligible for rescheduling now we
// add it to the untainted set. Disconnecting delay evals are
// handled by allocReconciler.createTimeoutLaterEvals
eligibleNow, eligibleLater, rescheduleTime = updateByReschedulable(alloc, now, evalID, deployment, isDisconnecting)
if !isDisconnecting && !eligibleNow {
untainted[alloc.ID] = alloc
if eligibleLater {
rescheduleLater = append(rescheduleLater, &delayedRescheduleInfo{alloc.ID, alloc, rescheduleTime})
@@ -378,7 +405,7 @@ func shouldFilter(alloc *structs.Allocation, isBatch bool) (untainted, ignore bo
// updateByReschedulable is a helper method that encapsulates logic for whether a failed allocation
// should be rescheduled now, later or left in the untainted set
func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string, d *structs.Deployment) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID string, d *structs.Deployment, isDisconnecting bool) (rescheduleNow, rescheduleLater bool, rescheduleTime time.Time) {
// If the allocation is part of an ongoing active deployment, we only allow it to reschedule
// if it has been marked eligible
if d != nil && alloc.DeploymentID == d.ID && d.Active() && !alloc.DesiredTransition.ShouldReschedule() {
@@ -391,7 +418,13 @@ func updateByReschedulable(alloc *structs.Allocation, now time.Time, evalID stri
}
// Reschedule if the eval ID matches the alloc's followup evalID or if it's close to its reschedule time
rescheduleTime, eligible := alloc.NextRescheduleTime()
var eligible bool
if isDisconnecting {
rescheduleTime, eligible = alloc.NextRescheduleTimeByFailTime(now)
} else {
rescheduleTime, eligible = alloc.NextRescheduleTime()
}
if eligible && (alloc.FollowupEvalID == evalID || rescheduleTime.Sub(now) <= rescheduleWindowSize) {
rescheduleNow = true
return
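
Disconnecting allocs have not actually failed, so filterByRescheduleable passes isDisconnecting and updateByReschedulable above evaluates the reschedule policy against "now" via NextRescheduleTimeByFailTime. A minimal sketch of that window check follows; rescheduleNow and its parameters are illustrative, and the real window constant lives in the scheduler package.

package main

import (
	"fmt"
	"time"
)

// Illustrative sketch (not Nomad API): with no failure time available, "now"
// is used as the fail time, and the next reschedule attempt must fall inside
// the reschedule window to be placed immediately; otherwise a followup eval
// handles it later.
func rescheduleNow(nextDelay, window time.Duration, now time.Time) bool {
	rescheduleTime := now.Add(nextDelay) // NextRescheduleTimeByFailTime(now)
	return rescheduleTime.Sub(now) <= window
}

func main() {
	window := time.Second // assumed window size, for illustration only
	fmt.Println(rescheduleNow(0, window, time.Now()))              // true: replace immediately
	fmt.Println(rescheduleNow(30*time.Second, window, time.Now())) // false: deferred to a followup eval
}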


@@ -8,6 +8,7 @@ import (
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
"time"
)
// Test that we properly create the bitmap even when the alloc set includes an
@@ -59,170 +60,591 @@ func TestAllocSet_filterByTainted(t *testing.T) {
},
}
batchJob := &structs.Job{
Type: structs.JobTypeBatch,
}
testJob := mock.Job()
testJob.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Second)
now := time.Now()
allocs := allocSet{
// Non-terminal alloc with migrate=true should migrate on a draining node
"migrating1": {
ID: "migrating1",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: batchJob,
NodeID: "draining",
},
// Non-terminal alloc with migrate=true should migrate on an unknown node
"migrating2": {
ID: "migrating2",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: batchJob,
NodeID: "nil",
},
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "normal",
},
// Terminal allocs are always untainted
"untainted2": {
ID: "untainted2",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "normal",
},
// Terminal allocs are always untainted, even on draining nodes
"untainted3": {
ID: "untainted3",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "draining",
},
// Terminal allocs are always untainted, even on lost nodes
"untainted4": {
ID: "untainted4",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost1": {
ID: "lost1",
ClientStatus: structs.AllocClientStatusPending,
Job: batchJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost2": {
ID: "lost2",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "lost",
},
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnecting1": {
ID: "disconnecting1",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "disconnected",
},
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnecting2": {
ID: "disconnecting2",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "disconnected",
},
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnecting3": {
ID: "disconnecting3",
ClientStatus: structs.AllocClientStatusRunning,
Job: batchJob,
NodeID: "disconnected",
},
// Complete allocs on disconnected nodes don't get restarted
"disconnecting4": {
ID: "disconnecting4",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "disconnected",
},
// Failed allocs on disconnected nodes don't get restarted
"disconnecting5": {
ID: "disconnecting5",
ClientStatus: structs.AllocClientStatusFailed,
Job: batchJob,
NodeID: "disconnected",
},
// Lost allocs on disconnected nodes don't get restarted
"disconnecting6": {
ID: "disconnecting6",
ClientStatus: structs.AllocClientStatusLost,
Job: batchJob,
NodeID: "disconnected",
},
// Unknown allocs on re-connected nodes are reconnecting
"reconnecting1": {
ID: "reconnecting1",
ClientStatus: structs.AllocClientStatusUnknown,
Job: batchJob,
NodeID: "normal",
},
// Unknown allocs on re-connected nodes are reconnecting
"reconnecting2": {
ID: "reconnecting2",
ClientStatus: structs.AllocClientStatusUnknown,
Job: batchJob,
NodeID: "normal",
},
// Complete allocs on disconnected nodes don't get restarted
"reconnecting3": {
ID: "reconnecting3",
ClientStatus: structs.AllocClientStatusComplete,
Job: batchJob,
NodeID: "normal",
},
// Failed allocs on disconnected nodes don't get restarted
"reconnecting4": {
ID: "reconnecting4",
ClientStatus: structs.AllocClientStatusFailed,
Job: batchJob,
NodeID: "normal",
},
// Lost allocs on disconnected nodes don't get restarted
"reconnecting5": {
ID: "reconnecting5",
ClientStatus: structs.AllocClientStatusLost,
Job: batchJob,
NodeID: "normal",
unknownAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now,
}}
expiredAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now.Add(-60 * time.Second),
}}
reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected)
reconnectedEvent.Time = time.Now().UnixNano()
reconnectTaskState := map[string]*structs.TaskState{
testJob.TaskGroups[0].Tasks[0].Name: {
Events: []*structs.TaskEvent{reconnectedEvent},
},
}
untainted, migrate, lost, disconnecting, reconnecting := allocs.filterByTainted(nodes, true)
require.Len(t, untainted, 10)
require.Contains(t, untainted, "untainted1")
require.Contains(t, untainted, "untainted2")
require.Contains(t, untainted, "untainted3")
require.Contains(t, untainted, "untainted4")
require.Contains(t, untainted, "disconnecting4")
require.Contains(t, untainted, "disconnecting5")
require.Contains(t, untainted, "disconnecting6")
require.Contains(t, untainted, "reconnecting3")
require.Contains(t, untainted, "reconnecting4")
require.Contains(t, untainted, "reconnecting5")
require.Len(t, migrate, 2)
require.Contains(t, migrate, "migrating1")
require.Contains(t, migrate, "migrating2")
require.Len(t, lost, 2)
require.Contains(t, lost, "lost1")
require.Contains(t, lost, "lost2")
require.Len(t, disconnecting, 3)
require.Contains(t, disconnecting, "disconnecting1")
require.Contains(t, disconnecting, "disconnecting2")
require.Contains(t, disconnecting, "disconnecting3")
require.Len(t, reconnecting, 2)
require.Contains(t, reconnecting, "reconnecting1")
require.Contains(t, reconnecting, "reconnecting2")
type testCase struct {
name string
all allocSet
taintedNodes map[string]*structs.Node
supportsDisconnectedClients bool
skipNilNodeTest bool
now time.Time
// expected results
untainted allocSet
migrate allocSet
lost allocSet
disconnecting allocSet
reconnecting allocSet
ignore allocSet
}
testCases := []testCase{
// These two cases test that we maintain parity with pre-disconnected-clients behavior.
{
name: "lost-client",
supportsDisconnectedClients: false,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted
"untainted2": {
ID: "untainted2",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted, even on draining nodes
"untainted3": {
ID: "untainted3",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "draining",
},
// Terminal allocs are always untainted, even on lost nodes
"untainted4": {
ID: "untainted4",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "lost",
},
// Non-terminal alloc with migrate=true should migrate on a draining node
"migrating1": {
ID: "migrating1",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "draining",
},
// Non-terminal alloc with migrate=true should migrate on an unknown node
"migrating2": {
ID: "migrating2",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "nil",
},
},
untainted: allocSet{
"untainted1": {
ID: "untainted1",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted
"untainted2": {
ID: "untainted2",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
},
// Terminal allocs are always untainted, even on draining nodes
"untainted3": {
ID: "untainted3",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "draining",
},
// Terminal allocs are always untainted, even on lost nodes
"untainted4": {
ID: "untainted4",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "lost",
},
},
migrate: allocSet{
// Non-terminal alloc with migrate=true should migrate on a draining node
"migrating1": {
ID: "migrating1",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "draining",
},
// Non-terminal alloc with migrate=true should migrate on an unknown node
"migrating2": {
ID: "migrating2",
ClientStatus: structs.AllocClientStatusRunning,
DesiredTransition: structs.DesiredTransition{Migrate: helper.BoolToPtr(true)},
Job: testJob,
NodeID: "nil",
},
},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "lost-client-only-tainted-nodes",
supportsDisconnectedClients: false,
now: time.Now(),
taintedNodes: nodes,
// The logic associated with this test case can only trigger if there
// is a tainted node. Therefore, testing with a nil node set produces
// false failures, so don't perform that test if in this case.
skipNilNodeTest: true,
all: allocSet{
// Non-terminal allocs on lost nodes are lost
"lost1": {
ID: "lost1",
ClientStatus: structs.AllocClientStatusPending,
Job: testJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost2": {
ID: "lost2",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "lost",
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{
// Non-terminal allocs on lost nodes are lost
"lost1": {
ID: "lost1",
ClientStatus: structs.AllocClientStatusPending,
Job: testJob,
NodeID: "lost",
},
// Non-terminal allocs on lost nodes are lost
"lost2": {
ID: "lost2",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "lost",
},
},
},
// Everything below this line tests the disconnected client mode.
{
name: "disco-client-untainted-reconnect-failed-and-replaced",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "failed-original",
},
// Failed and replaced allocs on reconnected nodes are untainted
"failed-original": {
ID: "failed-original",
Name: "web",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
untainted: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "failed-original",
},
"failed-original": {
ID: "failed-original",
Name: "web",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-reconnecting-running-no-replacement",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
				// Running allocs on reconnected nodes with no replacement are
				// reconnecting. Node.UpdateStatus has already handled syncing
				// client state, so this should be a no-op.
"reconnecting-running-no-replacement": {
ID: "reconnecting-running-no-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"reconnecting-running-no-replacement": {
ID: "reconnecting-running-no-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-terminal",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
// Allocs on reconnected nodes that are complete are untainted
"untainted-reconnect-complete": {
ID: "untainted-reconnect-complete",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
				// Failed allocs on reconnected nodes are untainted
"untainted-reconnect-failed": {
ID: "untainted-reconnect-failed",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
// Lost allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
				// Replacement allocs on reconnected nodes that are complete are untainted
"untainted-reconnect-complete-replacement": {
ID: "untainted-reconnect-complete-replacement",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
// Replacement allocs on reconnected nodes that are failed are untainted
"untainted-reconnect-failed-replacement": {
ID: "untainted-reconnect-failed-replacement",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-failed",
},
// Lost replacement allocs on reconnected nodes don't get restarted
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-lost",
},
},
untainted: allocSet{
"untainted-reconnect-complete": {
ID: "untainted-reconnect-complete",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-failed": {
ID: "untainted-reconnect-failed",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-lost": {
ID: "untainted-reconnect-lost",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
"untainted-reconnect-complete-replacement": {
ID: "untainted-reconnect-complete-replacement",
Name: "untainted-reconnect-complete",
ClientStatus: structs.AllocClientStatusComplete,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-complete",
},
"untainted-reconnect-failed-replacement": {
ID: "untainted-reconnect-failed-replacement",
Name: "untainted-reconnect-failed",
ClientStatus: structs.AllocClientStatusFailed,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-failed",
},
"untainted-reconnect-lost-replacement": {
ID: "untainted-reconnect-lost-replacement",
Name: "untainted-reconnect-lost",
ClientStatus: structs.AllocClientStatusLost,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
PreviousAllocation: "untainted-reconnect-lost",
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{},
ignore: allocSet{},
lost: allocSet{},
},
{
name: "disco-client-disconnect",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: true,
all: allocSet{
// Non-terminal allocs on disconnected nodes are disconnecting
"disconnect-running": {
ID: "disconnect-running",
Name: "disconnect-running",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
},
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
// Unknown allocs on disconnected nodes are lost when expired
"lost-unknown": {
ID: "lost-unknown",
Name: "lost-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: expiredAllocState,
},
},
untainted: allocSet{},
migrate: allocSet{},
disconnecting: allocSet{
"disconnect-running": {
ID: "disconnect-running",
Name: "disconnect-running",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
},
},
reconnecting: allocSet{},
ignore: allocSet{
// Unknown allocs on disconnected nodes are ignored
"ignore-unknown": {
ID: "ignore-unknown",
Name: "ignore-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: unknownAllocState,
},
},
lost: allocSet{
"lost-unknown": {
ID: "lost-unknown",
Name: "lost-unknown",
ClientStatus: structs.AllocClientStatusUnknown,
Job: testJob,
NodeID: "disconnected",
TaskGroup: "web",
AllocStates: expiredAllocState,
},
},
},
{
name: "disco-client-running-reconnecting-and-replacement-untainted",
supportsDisconnectedClients: true,
now: time.Now(),
taintedNodes: nodes,
skipNilNodeTest: false,
all: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "running-original",
},
				// Running allocs on reconnected nodes are reconnecting even when a replacement alloc exists
"running-original": {
ID: "running-original",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
untainted: allocSet{
"running-replacement": {
ID: "running-replacement",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
PreviousAllocation: "running-original",
},
},
migrate: allocSet{},
disconnecting: allocSet{},
reconnecting: allocSet{
"running-original": {
ID: "running-original",
Name: "web",
ClientStatus: structs.AllocClientStatusRunning,
Job: testJob,
NodeID: "normal",
TaskGroup: "web",
AllocStates: unknownAllocState,
TaskStates: reconnectTaskState,
},
},
ignore: allocSet{},
lost: allocSet{},
},
}
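	// Each case is asserted twice: once against the tainted node set and,
	// unless skipNilNodeTest is set, once more with a nil node map to verify
	// that the classification does not depend on the node being present.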
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// With tainted nodes
untainted, migrate, lost, disconnecting, reconnecting, ignore := tc.all.filterByTainted(tc.taintedNodes, tc.supportsDisconnectedClients, tc.now)
			require.Equal(t, tc.untainted, untainted, "with-nodes: untainted")
			require.Equal(t, tc.migrate, migrate, "with-nodes: migrate")
			require.Equal(t, tc.lost, lost, "with-nodes: lost")
			require.Equal(t, tc.disconnecting, disconnecting, "with-nodes: disconnecting")
			require.Equal(t, tc.reconnecting, reconnecting, "with-nodes: reconnecting")
			require.Equal(t, tc.ignore, ignore, "with-nodes: ignore")
if tc.skipNilNodeTest {
return
}
// Now again with nodes nil
untainted, migrate, lost, disconnecting, reconnecting, ignore = tc.all.filterByTainted(nil, tc.supportsDisconnectedClients, tc.now)
			require.Equal(t, tc.untainted, untainted, "nodes-nil: untainted")
			require.Equal(t, tc.migrate, migrate, "nodes-nil: migrate")
			require.Equal(t, tc.lost, lost, "nodes-nil: lost")
			require.Equal(t, tc.disconnecting, disconnecting, "nodes-nil: disconnecting")
			require.Equal(t, tc.reconnecting, reconnecting, "nodes-nil: reconnecting")
			require.Equal(t, tc.ignore, ignore, "nodes-nil: ignore")
})
}
}
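
// TestAllocSet_filterByTainted_terminalSketch is a minimal illustrative sketch,
// not part of the original change set, of the simplest rule exercised by the
// table above: a terminal alloc is classified untainted even when its node is
// absent from the tainted set. It assumes the mock helpers available to this
// package; the fixture values are hypothetical.
func TestAllocSet_filterByTainted_terminalSketch(t *testing.T) {
	terminal := mock.Alloc()
	terminal.ClientStatus = structs.AllocClientStatusComplete
	// Node ID is arbitrary here since we pass a nil tainted set below.
	terminal.NodeID = "lost"

	all := allocSet{terminal.ID: terminal}

	// With a nil tainted-node map, the terminal alloc should still land in
	// untainted: terminal allocs are never migrated or marked lost here.
	untainted, migrate, lost, disconnecting, reconnecting, ignore :=
		all.filterByTainted(nil, true, time.Now())

	require.Contains(t, untainted, terminal.ID)
	require.Empty(t, migrate)
	require.Empty(t, lost)
	require.Empty(t, disconnecting)
	require.Empty(t, reconnecting)
	require.Empty(t, ignore)
}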