diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go
index 92397ed06..32b8d4e02 100644
--- a/client/allocrunner/alloc_runner.go
+++ b/client/allocrunner/alloc_runner.go
@@ -784,9 +784,8 @@ func (ar *allocRunner) NetworkStatus() *structs.AllocNetworkStatus {
 	return ar.state.NetworkStatus.Copy()
 }
 
-// setIndexes is a helper for forcing a set of server side indexes
-// on the alloc runner. This is used during reconnect when the task
-// has been marked unknown by the server.
+// setIndexes is a helper for forcing alloc state on the alloc runner. This is
+// used during reconnect when the task has been marked unknown by the server.
 func (ar *allocRunner) setIndexes(update *structs.Allocation) {
 	ar.allocLock.Lock()
 	defer ar.allocLock.Unlock()
@@ -1253,15 +1252,13 @@ func (ar *allocRunner) Signal(taskName, signal string) error {
 
 // Reconnect logs a reconnect event for each task in the allocation and syncs the current alloc state with the server.
 func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
-	ar.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
-
 	event := structs.NewTaskEvent(structs.TaskClientReconnected)
 	event.Time = time.Now().UnixNano()
 	for _, tr := range ar.tasks {
 		tr.AppendEvent(event)
 	}
 
-	// Update the client alloc with the server client side indexes.
+	// Update the client alloc with the server side indexes.
 	ar.setIndexes(update)
 
 	// Calculate alloc state to get the final state with the new events.
@@ -1274,12 +1271,6 @@ func (ar *allocRunner) Reconnect(update *structs.Allocation) (err error) {
 	// Build the client allocation
 	alloc := ar.clientAlloc(states)
 
-	// Don't destroy until after we've appended the reconnect event.
-	if update.DesiredStatus != structs.AllocDesiredStatusRun {
-		ar.Shutdown()
-		return
-	}
-
 	// Update the client state store.
 	err = ar.stateUpdater.PutAllocation(alloc)
 	if err != nil {
diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go
index d7e62e15e..749807e9f 100644
--- a/nomad/node_endpoint.go
+++ b/nomad/node_endpoint.go
@@ -1151,6 +1151,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 
 	var evals []*structs.Evaluation
 	for _, allocToUpdate := range args.Alloc {
+		evalTriggerBy := ""
 		allocToUpdate.ModifyTime = now.UTC().UnixNano()
 
 		alloc, _ := n.srv.State().AllocByID(nil, allocToUpdate.ID)
@@ -1162,51 +1163,73 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 			continue
 		}
 
-		// if the job has been purged, this will always return error
-		job, err := n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
+		var job *structs.Job
+		var jobType string
+		var jobPriority int
+
+		job, err = n.srv.State().JobByID(nil, alloc.Namespace, alloc.JobID)
 		if err != nil {
 			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID, "error", err)
 			continue
 		}
+
+		// If the job is nil it means it has been de-registered.
 		if job == nil {
-			n.logger.Debug("UpdateAlloc unable to find job", "job", alloc.JobID)
-			continue
+			jobType = alloc.Job.Type
+			jobPriority = alloc.Job.Priority
+			evalTriggerBy = structs.EvalTriggerJobDeregister
+			allocToUpdate.DesiredStatus = structs.AllocDesiredStatusStop
+			n.logger.Debug("UpdateAlloc unable to find job - shutting down alloc", "job", alloc.JobID)
 		}
 
-		taskGroup := job.LookupTaskGroup(alloc.TaskGroup)
-		if taskGroup == nil {
-			continue
+		var taskGroup *structs.TaskGroup
+		if job != nil {
+			jobType = job.Type
+			jobPriority = job.Priority
+			taskGroup = job.LookupTaskGroup(alloc.TaskGroup)
+		}
+
+		// If we cannot find the task group for a failed alloc we cannot continue, unless it is an orphan.
+		if evalTriggerBy != structs.EvalTriggerJobDeregister &&
+			allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
+			alloc.FollowupEvalID == "" {
+
+			if taskGroup == nil {
+				n.logger.Debug("UpdateAlloc unable to find task group for job", "job", alloc.JobID, "alloc", alloc.ID, "task_group", alloc.TaskGroup)
+				continue
+			}
+
+			// Set trigger by failed if not an orphan.
+			if alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
+				evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
+			}
 		}
-
-		evalTriggerBy := ""
 		var eval *structs.Evaluation
-		// Add an evaluation if this is a failed alloc that is eligible for rescheduling
-		if allocToUpdate.ClientStatus == structs.AllocClientStatusFailed &&
-			alloc.FollowupEvalID == "" &&
-			alloc.RescheduleEligible(taskGroup.ReschedulePolicy, now) {
-
-			evalTriggerBy = structs.EvalTriggerRetryFailedAlloc
-		}
-
-		//Add an evaluation if this is a reconnecting allocation.
-		if alloc.ClientStatus == structs.AllocClientStatusUnknown {
+		// If unknown, and not an orphan, set the trigger by.
+		if evalTriggerBy != structs.EvalTriggerJobDeregister &&
+			alloc.ClientStatus == structs.AllocClientStatusUnknown {
 			evalTriggerBy = structs.EvalTriggerReconnect
 		}
 
-		if evalTriggerBy != "" {
-			eval = &structs.Evaluation{
-				ID:          uuid.Generate(),
-				Namespace:   alloc.Namespace,
-				TriggeredBy: evalTriggerBy,
-				JobID:       alloc.JobID,
-				Type:        job.Type,
-				Priority:    job.Priority,
-				Status:      structs.EvalStatusPending,
-				CreateTime:  now.UTC().UnixNano(),
-				ModifyTime:  now.UTC().UnixNano(),
-			}
-			evals = append(evals, eval)
+		// If we weren't able to determine one of our expected eval triggers,
+		// continue and don't create an eval.
+		if evalTriggerBy == "" {
+			continue
 		}
+
+		eval = &structs.Evaluation{
+			ID:          uuid.Generate(),
+			Namespace:   alloc.Namespace,
+			TriggeredBy: evalTriggerBy,
+			JobID:       alloc.JobID,
+			Type:        jobType,
+			Priority:    jobPriority,
+			Status:      structs.EvalStatusPending,
+			CreateTime:  now.UTC().UnixNano(),
+			ModifyTime:  now.UTC().UnixNano(),
+		}
+		evals = append(evals, eval)
 	}
 
 	// Add this to the batch
@@ -1254,6 +1277,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene
 
 // batchUpdate is used to update all the allocations
 func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Allocation, evals []*structs.Evaluation) {
+	var mErr multierror.Error
 	// Group pending evals by jobID to prevent creating unnecessary evals
 	evalsByJobId := make(map[structs.NamespacedID]struct{})
 	var trimmedEvals []*structs.Evaluation
@@ -1283,7 +1307,6 @@ func (n *Node) batchUpdate(future *structs.BatchFuture, updates []*structs.Alloc
 	}
 
 	// Commit this update via Raft
-	var mErr multierror.Error
 	_, index, err := n.srv.raftApply(structs.AllocClientUpdateRequestType, batch)
 	if err != nil {
 		n.logger.Error("alloc update failed", "error", err)
diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go
index 222de1ae2..7a43ee59f 100644
--- a/nomad/node_endpoint_test.go
+++ b/nomad/node_endpoint_test.go
@@ -3765,3 +3765,178 @@ func TestClientEndpoint_ShouldCreateNodeEval(t *testing.T) {
 		})
 	}
 }
+
+func TestClientEndpoint_UpdateAlloc_Evals_ByTrigger(t *testing.T) {
+	t.Parallel()
+
+	type testCase struct {
+		name               string
+		clientStatus       string
+		serverClientStatus string
+		triggerBy          string
+		missingJob         bool
+		missingAlloc       bool
+		invalidTaskGroup   bool
+	}
+
+	testCases := []testCase{
+		{
+			name:               "failed-alloc",
+			clientStatus:       structs.AllocClientStatusFailed,
+			serverClientStatus: structs.AllocClientStatusRunning,
+			triggerBy:          structs.EvalTriggerRetryFailedAlloc,
+			missingJob:         false,
+			missingAlloc:       false,
+			invalidTaskGroup:   false,
+		},
+		{
+			name:               "unknown-alloc",
+			clientStatus:       structs.AllocClientStatusRunning,
+			serverClientStatus: structs.AllocClientStatusUnknown,
+			triggerBy:          structs.EvalTriggerReconnect,
+			missingJob:         false,
+			missingAlloc:       false,
+			invalidTaskGroup:   false,
+		},
+		{
+			name:               "orphaned-unknown-alloc",
+			clientStatus:       structs.AllocClientStatusRunning,
+			serverClientStatus: structs.AllocClientStatusUnknown,
+			triggerBy:          structs.EvalTriggerJobDeregister,
+			missingJob:         true,
+			missingAlloc:       false,
+			invalidTaskGroup:   false,
+		},
+		{
+			name:               "running-job",
+			clientStatus:       structs.AllocClientStatusRunning,
+			serverClientStatus: structs.AllocClientStatusRunning,
+			triggerBy:          "",
+			missingJob:         false,
+			missingAlloc:       false,
+			invalidTaskGroup:   false,
+		},
+		{
+			name:               "complete-job",
+			clientStatus:       structs.AllocClientStatusComplete,
+			serverClientStatus: structs.AllocClientStatusComplete,
+			triggerBy:          "",
+			missingJob:         false,
+			missingAlloc:       false,
+			invalidTaskGroup:   false,
+		},
+		{
+			name:               "no-alloc-at-server",
+			clientStatus:       structs.AllocClientStatusUnknown,
+			serverClientStatus: "",
+			triggerBy:          "",
+			missingJob:         false,
+			missingAlloc:       true,
+			invalidTaskGroup:   false,
+		},
+		{
+			name:               "invalid-task-group",
+			clientStatus:       structs.AllocClientStatusUnknown,
+			serverClientStatus: structs.AllocClientStatusRunning,
+			triggerBy:          "",
+			missingJob:         false,
+			missingAlloc:       false,
+			invalidTaskGroup:   true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			s1, cleanupS1 := TestServer(t, func(c *Config) {
+				// Disabling scheduling in this test so that we can
+				// ensure that the state store doesn't accumulate more evals
+				// than what we expect the unit test to add
+				c.NumSchedulers = 0
+			})
+
+			defer cleanupS1()
+			codec := rpcClient(t, s1)
+			testutil.WaitForLeader(t, s1.RPC)
+
+			// Create the register request
+			node := mock.Node()
+			reg := &structs.NodeRegisterRequest{
+				Node:         node,
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+
+			// Fetch the response
+			var nodeResp structs.GenericResponse
+			err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &nodeResp)
+			require.NoError(t, err)
+
+			fsmState := s1.fsm.State()
+
+			job := mock.Job()
+			job.ID = tc.name + "-test-job"
+
+			if !tc.missingJob {
+				err = fsmState.UpsertJob(structs.MsgTypeTestSetup, 101, job)
+				require.NoError(t, err)
+			}
+
+			serverAlloc := mock.Alloc()
+			serverAlloc.JobID = job.ID
+			serverAlloc.NodeID = node.ID
+			serverAlloc.ClientStatus = tc.serverClientStatus
+			serverAlloc.TaskGroup = job.TaskGroups[0].Name
+
+			// Create the incoming client alloc.
+			clientAlloc := serverAlloc.Copy()
+			clientAlloc.ClientStatus = tc.clientStatus
+
+			err = fsmState.UpsertJobSummary(99, mock.JobSummary(serverAlloc.JobID))
+			require.NoError(t, err)
+
+			if tc.invalidTaskGroup {
+				serverAlloc.TaskGroup = "invalid"
+			}
+
+			if !tc.missingAlloc {
+				err = fsmState.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{serverAlloc})
+				require.NoError(t, err)
+			}
+
+			updateReq := &structs.AllocUpdateRequest{
+				Alloc:        []*structs.Allocation{clientAlloc},
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+
+			var nodeAllocResp structs.NodeAllocsResponse
+			err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", updateReq, &nodeAllocResp)
+			require.NoError(t, err)
+			require.NotEqual(t, uint64(0), nodeAllocResp.Index)
+
+			// If no eval should be created, validate that none were and return.
+			if tc.triggerBy == "" {
+				evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
+				require.NoError(t, err)
+				require.Len(t, evaluations, 0)
+				return
+			}
+
+			// Lookup the alloc
+			updatedAlloc, err := fsmState.AllocByID(nil, serverAlloc.ID)
+			require.NoError(t, err)
+			require.Equal(t, tc.clientStatus, updatedAlloc.ClientStatus)
+
+			// Assert that exactly one eval with test case TriggeredBy exists
+			evaluations, err := fsmState.EvalsByJob(nil, job.Namespace, job.ID)
+			require.NoError(t, err)
+			require.Equal(t, 1, len(evaluations))
+			foundCount := 0
+			for _, resultEval := range evaluations {
+				if resultEval.TriggeredBy == tc.triggerBy && resultEval.WaitUntil.IsZero() {
+					foundCount++
+				}
+			}
+			require.Equal(t, 1, foundCount, "Should create exactly one eval for trigger by", tc.triggerBy)
+		})
+	}
+
+}
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index 01f2c2a3a..d363912bb 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -440,13 +440,14 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	// Stop any unneeded allocations and update the untainted set to not
 	// include stopped allocations.
 	isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
+	stop, reconnecting := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals)
 	desiredChanges.Stop += uint64(len(stop))
 	untainted = untainted.difference(stop)
 
 	// Validate and add reconnecting allocs to the plan so that they will be logged.
 	a.computeReconnecting(reconnecting)
 	desiredChanges.Ignore += uint64(len(a.result.reconnectUpdates))
+
 	// Do inplace upgrades where possible and capture the set of upgrades that
 	// need to be done destructively.
 	ignore, inplace, destructive := a.computeUpdates(tg, untainted)
@@ -697,7 +698,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 	}
 
 	// Add replacements for disconnected and lost allocs up to group.Count
-	existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting)
+	existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) - len(reconnecting.filterByFailedReconnect())
 
 	// Add replacements for lost
 	for _, alloc := range lost {
@@ -912,13 +913,19 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
 func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
-	untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
+	untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) (allocSet, allocSet) {
 
 	// Mark all lost allocations for stop.
 	var stop allocSet
 	stop = stop.union(lost)
 	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
 
+	// Mark all failed reconnects for stop.
+	failedReconnects := reconnecting.filterByFailedReconnect()
+	stop = stop.union(failedReconnects)
+	a.markStop(failedReconnects, structs.AllocClientStatusFailed, allocRescheduled)
+	reconnecting = reconnecting.difference(failedReconnects)
+
 	// If we are still deploying or creating canaries, don't stop them
 	if isCanarying {
 		untainted = untainted.difference(canaries)
@@ -927,7 +934,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 	// Hot path the nothing to do case
 	remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count
 	if remove <= 0 {
-		return stop
+		return stop, reconnecting
 	}
 
 	// Filter out any terminal allocations from the untainted set
@@ -949,7 +956,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 			remove--
 			if remove == 0 {
-				return stop
+				return stop, reconnecting
 			}
 		}
 	}
@@ -973,7 +980,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 			remove--
 			if remove == 0 {
-				return stop
+				return stop, reconnecting
 			}
 		}
 	}
@@ -982,7 +989,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 	if len(reconnecting) != 0 {
 		remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove)
 		if remove == 0 {
-			return stop
+			return stop, reconnecting
 		}
 	}
 
@@ -999,7 +1006,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 			remove--
 			if remove == 0 {
-				return stop
+				return stop, reconnecting
 			}
 		}
 	}
@@ -1016,11 +1023,11 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 			remove--
 			if remove == 0 {
-				return stop
+				return stop, reconnecting
 			}
 		}
 	}
 
-	return stop
+	return stop, reconnecting
 }
 
 // computeStopByReconnecting moves allocations from either the untainted or reconnecting
@@ -1176,6 +1183,11 @@ func (a *allocReconciler) computeReconnecting(reconnecting allocSet) {
 			continue
 		}
 
+		// If the alloc has failed don't reconnect.
+		if alloc.ClientStatus != structs.AllocClientStatusRunning {
+			continue
+		}
+
 		a.result.reconnectUpdates[alloc.ID] = alloc
 	}
 }
diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go
index 78ff3d051..783970042 100644
--- a/scheduler/reconcile_test.go
+++ b/scheduler/reconcile_test.go
@@ -5282,6 +5282,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 		jobVersionIncrement          uint64
 		nodeScoreIncrement           float64
 		disconnectedAllocStatus      string
+		serverDesiredStatus          string
 		isBatch                      bool
 		nodeStatusDisconnected       bool
 		replace                      bool
@@ -5298,6 +5299,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      false,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: false,
 			expected: &resultExpectation{
 				reconnectUpdates: 2,
@@ -5314,6 +5316,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       1,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: false,
 			expected: &resultExpectation{
 				stop: 1,
@@ -5332,6 +5335,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       1,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			nodeScoreIncrement:           1,
 			expected: &resultExpectation{
@@ -5345,15 +5349,18 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			},
 		},
 		{
-			name:                         "ignore-original-failed-if-replaced",
+			name:                         "stop-original-failed-on-reconnect",
 			allocCount:                   4,
 			replace:                      true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusFailed,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			expected: &resultExpectation{
+				stop: 2,
 				desiredTGUpdates: map[string]*structs.DesiredUpdates{
 					"web": {
+						Stop:   2,
 						Ignore: 4,
 					},
 				},
@@ -5365,6 +5372,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      false,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusFailed,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			expected: &resultExpectation{
 				stop: 2,
@@ -5384,6 +5392,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      false,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusComplete,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			isBatch:                      true,
 			expected: &resultExpectation{
 				desiredTGUpdates: map[string]*structs.DesiredUpdates{
@@ -5399,6 +5408,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			jobVersionIncrement:          1,
 			expected: &resultExpectation{
@@ -5417,6 +5427,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			jobVersionIncrement:          1,
 			expected: &resultExpectation{
@@ -5435,6 +5446,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			failReplacement:              true,
 			shouldStopOnDisconnectedNode: true,
 			jobVersionIncrement:          1,
@@ -5454,6 +5466,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       1,
 			disconnectedAllocStatus:      structs.AllocClientStatusPending,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			nodeStatusDisconnected:       true,
 			expected: &resultExpectation{
@@ -5472,6 +5485,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      true,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusUnknown,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			shouldStopOnDisconnectedNode: true,
 			nodeStatusDisconnected:       true,
 			maxDisconnect:                helper.TimeToPtr(2 * time.Second),
@@ -5491,6 +5505,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			replace:                      false,
 			disconnectedAllocCount:       2,
 			disconnectedAllocStatus:      structs.AllocClientStatusRunning,
+			serverDesiredStatus:          structs.AllocDesiredStatusRun,
 			nodeStatusDisconnected:       true,
 			expected: &resultExpectation{
 				place: 2,
@@ -5524,6 +5539,8 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			// Set alloc state
 			disconnectedAllocCount := tc.disconnectedAllocCount
 			for _, alloc := range allocs {
+				alloc.DesiredStatus = tc.serverDesiredStatus
+
 				if tc.maxDisconnect != nil {
 					alloc.Job.TaskGroups[0].MaxClientDisconnect = tc.maxDisconnect
 				}
@@ -5600,6 +5617,7 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 			}
 
 			results := reconciler.Compute()
+			assertResults(t, results, tc.expected)
 
 			for _, stopResult := range results.stop {
 				if tc.shouldStopOnDisconnectedNode {
@@ -5610,91 +5628,6 @@ func TestReconciler_Disconnected_Client(t *testing.T) {
 					require.Equal(t, job.Version, stopResult.alloc.Job.Version)
 				}
-
-			assertResults(t, results, tc.expected)
 		})
 	}
 }
-
-// Tests that the future timeout evals that get created when a node disconnects
-// stop once the duration passes.
-func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testing.T) {
-	// TODO: Add table tests and play with the reconciler time/node status to make sure that
-	// if the expiration time has not passed, it's a no-op.
-
-	// Build a set of resumable allocations. Helper will set the timeout to 5 min.
-	job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2)
-
-	// Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little
-	// more discernible that only the affected alloc(s) get marked unknown.
-	nodes := buildDisconnectedNodes(allocs, 2)
-
-	// Invoke the reconciler to queue status changes and get the followup evals.
-	// Use the allocUpdateFnIngore since alloc.TerminalStatus() will evaluate to
-	// false and cause the real genericAllocUpdateFn to return ignore=true destructive=false
-	reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
-		nil, allocs, nodes, "", 50, true)
-	reconciler.now = time.Now().UTC()
-	results := reconciler.Compute()
-
-	// Verify that 1 follow up eval was created.
-	evals := results.desiredFollowupEvals[job.TaskGroups[0].Name]
-	require.Len(t, evals, 1)
-	eval := evals[0]
-
-	// Set the NodeStatus to Down on the 2 disconnected nodes to simulate that
-	// the resume duration has passed.
-	for _, node := range nodes {
-		node.Status = structs.NodeStatusDown
-	}
-
-	// Replace the allocs that were originally created with the updated copies that
-	// have the unknown ClientStatus.
-	for i, alloc := range allocs {
-		for id, updated := range results.disconnectUpdates {
-			if alloc.ID == id {
-				allocs[i] = updated
-			}
-		}
-	}
-
-	// Run the followup eval through the reconciler and verify the resumable allocs
-	// have timed out, will be stopped, and new placements are scheduled.
-	reconciler = NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job,
-		nil, allocs, nodes, eval.ID, eval.Priority, true)
-
-	// Allocs were configured to expire in 5 min, so configure the reconciler
-	// to believe that time has passed.
-	// NOTE: this probably isn't really necessary because this value is really
-	// only used for computing future evals, but it seemed like good practice
-	// in case there are other unconsidered side effects.
-	reconciler.now = time.Now().UTC().Add(6 * time.Minute)
-	results = reconciler.Compute()
-
-	// Validate that the queued stops have the right client status.
-	for _, stopResult := range results.stop {
-		require.Equal(t, structs.AllocClientStatusLost, stopResult.clientStatus)
-	}
-
-	// 2 to place, 2 to stop, 1 to ignore
-	assertResults(t, results, &resultExpectation{
-		createDeployment:  nil,
-		deploymentUpdates: nil,
-		place:             2,
-		destructive:       0,
-		stop:              2,
-		inplace:           0,
-		disconnectUpdates: 0,
-		reconnectUpdates:  0,
-
-		desiredTGUpdates: map[string]*structs.DesiredUpdates{
-			job.TaskGroups[0].Name: {
-				Place:             2,
-				Stop:              2,
-				DestructiveUpdate: 0,
-				Ignore:            1,
-				InPlaceUpdate:     0,
-			},
-		},
-	})
-}
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index 946be14a5..cd4b0202e 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -225,8 +225,30 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
 	ignore = make(map[string]*structs.Allocation)
 	for _, alloc := range a {
-		// Terminal allocs are always untainted as they should never be migrated.
-		if alloc.TerminalStatus() {
+		reconnected := false
+		expired := false
+
+		// Only compute reconnected for unknown, running, and failed since they need to go through the reconnect logic.
+		if supportsDisconnectedClients &&
+			(alloc.ClientStatus == structs.AllocClientStatusUnknown ||
+				alloc.ClientStatus == structs.AllocClientStatusRunning ||
+				alloc.ClientStatus == structs.AllocClientStatusFailed) {
+			reconnected, expired = alloc.Reconnected()
+		}
+
+		// Failed reconnected allocs need to be added to reconnecting so that they
+		// can be handled as a failed reconnect.
+		if supportsDisconnectedClients &&
+			reconnected &&
+			alloc.DesiredStatus == structs.AllocDesiredStatusRun &&
+			alloc.ClientStatus == structs.AllocClientStatusFailed {
+			reconnecting[alloc.ID] = alloc
+			continue
+		}
+
+		// Terminal allocs, if not reconnected, are always untainted as they
+		// should never be migrated.
+		if alloc.TerminalStatus() && !reconnected {
 			untainted[alloc.ID] = alloc
 			continue
 		}
@@ -243,8 +265,19 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
 			continue
 		}
 
-		// Ignore unknown allocs
-		if supportsDisconnectedClients && alloc.ClientStatus == structs.AllocClientStatusUnknown {
+		// Ignore unknown allocs that we want to reconnect eventually.
+		if supportsDisconnectedClients &&
+			alloc.ClientStatus == structs.AllocClientStatusUnknown &&
+			alloc.DesiredStatus == structs.AllocDesiredStatusRun {
+			ignore[alloc.ID] = alloc
+			continue
+		}
+
+		// Ignore reconnected failed allocs that have been marked stop by the server.
+		if supportsDisconnectedClients &&
+			reconnected &&
+			alloc.ClientStatus == structs.AllocClientStatusFailed &&
+			alloc.DesiredStatus == structs.AllocDesiredStatusStop {
 			ignore[alloc.ID] = alloc
 			continue
 		}
@@ -252,7 +285,6 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
 		taintedNode, ok := taintedNodes[alloc.NodeID]
 		if !ok {
 			// Filter allocs on a node that is now re-connected to be resumed.
-			reconnected, expired := alloc.Reconnected()
 			if reconnected {
 				if expired {
 					lost[alloc.ID] = alloc
@@ -283,7 +315,6 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, support
 			}
 		case structs.NodeStatusReady:
 			// Filter reconnecting allocs with replacements on a node that is now connected.
-			reconnected, expired := alloc.Reconnected()
 			if reconnected {
 				if expired {
 					lost[alloc.ID] = alloc
@@ -461,6 +492,19 @@ func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
 	return
 }
 
+// filterByFailedReconnect filters allocations into a set that have failed on the
+// client but do not have a terminal status at the server so that they can be
+// marked as stop at the server.
+func (a allocSet) filterByFailedReconnect() allocSet {
+	failed := make(allocSet)
+	for _, alloc := range a {
+		if !alloc.ServerTerminalStatus() && alloc.ClientStatus == structs.AllocClientStatusFailed {
+			failed[alloc.ID] = alloc
+		}
+	}
+	return failed
+}
+
 // delayByStopAfterClientDisconnect returns a delay for any lost allocation that's got a
 // stop_after_client_disconnect configured
 func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedRescheduleInfo) {
diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go
index ae4306548..6e4fa8988 100644
--- a/scheduler/reconcile_util_test.go
+++ b/scheduler/reconcile_util_test.go
@@ -292,6 +292,10 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 				TaskGroup:          "web",
 				PreviousAllocation: "failed-original",
 			},
+		},
+		migrate:       allocSet{},
+		disconnecting: allocSet{},
+		reconnecting: allocSet{
 			"failed-original": {
 				ID:           "failed-original",
 				Name:         "web",
 				ClientStatus: structs.AllocClientStatusFailed,
 				Job:          testJob,
 				NodeID:       "normal",
 				TaskGroup:    "web",
 				AllocStates:  unknownAllocState,
 				TaskStates:   reconnectTaskState,
 			},
 		},
-		migrate:       allocSet{},
-		disconnecting: allocSet{},
-		reconnecting:  allocSet{},
-		ignore:        allocSet{},
-		lost:          allocSet{},
+		ignore: allocSet{},
+		lost:   allocSet{},
 	},
 	{
 		name: "disco-client-reconnecting-running-no-replacement",
@@ -366,10 +367,11 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 				AllocStates: unknownAllocState,
 				TaskStates:  reconnectTaskState,
 			},
-			// Failed allocs on reconnected nodes that are complete are untainted
-			"untainted-reconnect-failed": {
-				ID:           "untainted-reconnect-failed",
-				Name:         "untainted-reconnect-failed",
+			// Failed allocs on reconnected nodes are in reconnecting so that
+			// they can be marked with desired status stop at the server.
+			"reconnecting-failed": {
+				ID:           "reconnecting-failed",
+				Name:         "reconnecting-failed",
 				ClientStatus: structs.AllocClientStatusFailed,
 				Job:          testJob,
 				NodeID:       "normal",
 				TaskGroup:    "web",
 				AllocStates:  unknownAllocState,
 				TaskStates:   reconnectTaskState,
 			},
@@ -408,7 +410,7 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 				NodeID:             "normal",
 				TaskGroup:          "web",
 				AllocStates:        unknownAllocState,
-				PreviousAllocation: "untainted-reconnect-failed",
+				PreviousAllocation: "reconnecting-failed",
 			},
 			// Lost replacement allocs on reconnected nodes don't get restarted
 			"untainted-reconnect-lost-replacement": {
@@ -433,16 +435,6 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 				AllocStates: unknownAllocState,
 				TaskStates:  reconnectTaskState,
 			},
-			"untainted-reconnect-failed": {
-				ID:           "untainted-reconnect-failed",
-				Name:         "untainted-reconnect-failed",
-				ClientStatus: structs.AllocClientStatusFailed,
-				Job:          testJob,
-				NodeID:       "normal",
-				TaskGroup:    "web",
-				AllocStates:  unknownAllocState,
-				TaskStates:   reconnectTaskState,
-			},
 			"untainted-reconnect-lost": {
 				ID:           "untainted-reconnect-lost",
 				Name:         "untainted-reconnect-lost",
@@ -471,7 +463,7 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 				NodeID:             "normal",
 				TaskGroup:          "web",
 				AllocStates:        unknownAllocState,
-				PreviousAllocation: "untainted-reconnect-failed",
+				PreviousAllocation: "reconnecting-failed",
 			},
 			"untainted-reconnect-lost-replacement": {
 				ID:           "untainted-reconnect-lost-replacement",
@@ -486,9 +478,20 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 		},
 		migrate:       allocSet{},
 		disconnecting: allocSet{},
-		reconnecting:  allocSet{},
-		ignore:        allocSet{},
-		lost:          allocSet{},
+		reconnecting: allocSet{
+			"reconnecting-failed": {
+				ID:           "reconnecting-failed",
+				Name:         "reconnecting-failed",
+				ClientStatus: structs.AllocClientStatusFailed,
+				Job:          testJob,
+				NodeID:       "normal",
+				TaskGroup:    "web",
+				AllocStates:  unknownAllocState,
+				TaskStates:   reconnectTaskState,
+			},
+		},
+		ignore: allocSet{},
+		lost:   allocSet{},
 	},
 	{
 		name: "disco-client-disconnect",
@@ -508,13 +511,14 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 			},
 			// Unknown allocs on disconnected nodes are ignored
 			"ignore-unknown": {
-				ID:           "ignore-unknown",
-				Name:         "ignore-unknown",
-				ClientStatus: structs.AllocClientStatusUnknown,
-				Job:          testJob,
-				NodeID:       "disconnected",
-				TaskGroup:    "web",
-				AllocStates:  unknownAllocState,
+				ID:            "ignore-unknown",
+				Name:          "ignore-unknown",
+				ClientStatus:  structs.AllocClientStatusUnknown,
+				DesiredStatus: structs.AllocDesiredStatusRun,
+				Job:           testJob,
+				NodeID:        "disconnected",
+				TaskGroup:     "web",
+				AllocStates:   unknownAllocState,
 			},
 			// Unknown allocs on disconnected nodes are lost when expired
 			"lost-unknown": {
@@ -543,13 +547,14 @@ func TestAllocSet_filterByTainted(t *testing.T) {
 		ignore: allocSet{
 			// Unknown allocs on disconnected nodes are ignored
 			"ignore-unknown": {
-				ID:           "ignore-unknown",
-				Name:         "ignore-unknown",
-				ClientStatus: structs.AllocClientStatusUnknown,
-				Job:          testJob,
-				NodeID:       "disconnected",
-				TaskGroup:    "web",
-				AllocStates:  unknownAllocState,
+				ID:            "ignore-unknown",
+				Name:          "ignore-unknown",
+				ClientStatus:  structs.AllocClientStatusUnknown,
+				DesiredStatus: structs.AllocDesiredStatusRun,
+				Job:           testJob,
+				NodeID:        "disconnected",
+				TaskGroup:     "web",
+				AllocStates:   unknownAllocState,
 			},
 		},
 		lost: allocSet{