From 786180601dc105b2efec2aef59348a45ae2bd032 Mon Sep 17 00:00:00 2001 From: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> Date: Wed, 16 Feb 2022 13:50:20 -0500 Subject: [PATCH] reconciler: support disconnected clients (#12058) * Add merge helper for string maps * structs: add statuses, MaxClientDisconnect, and helper funcs * taintedNodes: Include disconnected nodes * upsertAllocsImpl: don't use existing ClientStatus when upserting unknown * allocSet: update filterByTainted and add delayByMaxClientDisconnect * allocReconciler: support disconnecting and reconnecting allocs * GenericScheduler: upsert unknown and queue reconnecting Co-authored-by: Tim Gross --- helper/funcs.go | 23 +++ helper/funcs_test.go | 21 +++ nomad/state/state_store.go | 5 +- nomad/structs/structs.go | 90 +++++++--- scheduler/generic_sched.go | 20 ++- scheduler/generic_sched_test.go | 112 ++++++++++++ scheduler/reconcile.go | 250 +++++++++++++++++++++++++-- scheduler/reconcile_test.go | 283 +++++++++++++++++++++++++++++++ scheduler/reconcile_util.go | 63 ++++++- scheduler/reconcile_util_test.go | 120 +++++++++++-- scheduler/util.go | 13 +- 11 files changed, 944 insertions(+), 56 deletions(-) diff --git a/helper/funcs.go b/helper/funcs.go index 8c94d49cd..3493f413f 100644 --- a/helper/funcs.go +++ b/helper/funcs.go @@ -352,6 +352,29 @@ func CopyMapStringInterface(m map[string]interface{}) map[string]interface{} { return c } +// MergeMapStringString will merge two maps into one. If a duplicate key exists +// the value in the second map will replace the value in the first map. If both +// maps are empty or nil this returns an empty map. +func MergeMapStringString(m map[string]string, n map[string]string) map[string]string { + if len(m) == 0 && len(n) == 0 { + return map[string]string{} + } + if len(m) == 0 { + return n + } + if len(n) == 0 { + return m + } + + result := CopyMapStringString(m) + + for k, v := range n { + result[k] = v + } + + return result +} + func CopyMapStringInt(m map[string]int) map[string]int { l := len(m) if l == 0 { diff --git a/helper/funcs_test.go b/helper/funcs_test.go index ddd472dd7..0b1598648 100644 --- a/helper/funcs_test.go +++ b/helper/funcs_test.go @@ -207,6 +207,27 @@ func TestCopyMapSliceInterface(t *testing.T) { require.False(t, reflect.DeepEqual(m, c)) } +func TestMergeMapStringString(t *testing.T) { + type testCase struct { + map1 map[string]string + map2 map[string]string + expected map[string]string + } + + cases := []testCase{ + {map[string]string{"foo": "bar"}, map[string]string{"baz": "qux"}, map[string]string{"foo": "bar", "baz": "qux"}}, + {map[string]string{"foo": "bar"}, nil, map[string]string{"foo": "bar"}}, + {nil, map[string]string{"baz": "qux"}, map[string]string{"baz": "qux"}}, + {nil, nil, map[string]string{}}, + } + + for _, c := range cases { + if output := MergeMapStringString(c.map1, c.map2); !CompareMapStringString(output, c.expected) { + t.Errorf("MergeMapStringString(%q, %q) -> %q != %q", c.map1, c.map2, output, c.expected) + } + } +} + func TestCleanEnvVar(t *testing.T) { type testCase struct { input string diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1c0a73df3..a8413704f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -3528,9 +3528,10 @@ func (s *StateStore) upsertAllocsImpl(index uint64, allocs []*structs.Allocation // Keep the clients task states alloc.TaskStates = exist.TaskStates - // If the scheduler is marking this allocation as lost we do not + // If the scheduler is marking 
this allocation as lost or unknown we do not // want to reuse the status of the existing allocation. - if alloc.ClientStatus != structs.AllocClientStatusLost { + if alloc.ClientStatus != structs.AllocClientStatusLost && + alloc.ClientStatus != structs.AllocClientStatusUnknown { alloc.ClientStatus = exist.ClientStatus alloc.ClientDescription = exist.ClientDescription } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index d1a6523b3..a7413cd5c 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1694,16 +1694,17 @@ func (ne *NodeEvent) AddDetail(k, v string) *NodeEvent { } const ( - NodeStatusInit = "initializing" - NodeStatusReady = "ready" - NodeStatusDown = "down" + NodeStatusInit = "initializing" + NodeStatusReady = "ready" + NodeStatusDown = "down" + NodeStatusDisconnected = "disconnected" ) // ShouldDrainNode checks if a given node status should trigger an // evaluation. Some states don't require any further action. func ShouldDrainNode(status string) bool { switch status { - case NodeStatusInit, NodeStatusReady: + case NodeStatusInit, NodeStatusReady, NodeStatusDisconnected: return false case NodeStatusDown: return true @@ -1715,7 +1716,7 @@ func ShouldDrainNode(status string) bool { // ValidNodeStatus is used to check if a node status is valid func ValidNodeStatus(status string) bool { switch status { - case NodeStatusInit, NodeStatusReady, NodeStatusDown: + case NodeStatusInit, NodeStatusReady, NodeStatusDown, NodeStatusDisconnected: return true default: return false @@ -6165,6 +6166,10 @@ type TaskGroup struct { // StopAfterClientDisconnect, if set, configures the client to stop the task group // after this duration since the last known good heartbeat StopAfterClientDisconnect *time.Duration + + // MaxClientDisconnect, if set, configures the client to allow placed + // allocations for tasks in this group to attempt to resume running without a restart. + MaxClientDisconnect *time.Duration } func (tg *TaskGroup) Copy() *TaskGroup { @@ -9424,6 +9429,7 @@ const ( AllocClientStatusComplete = "complete" AllocClientStatusFailed = "failed" AllocClientStatusLost = "lost" + AllocClientStatusUnknown = "unknown" ) // Allocation is used to allocate the placement of a task group to a node. @@ -9874,6 +9880,26 @@ func (a *Allocation) WaitClientStop() time.Time { return t.Add(*tg.StopAfterClientDisconnect + kill) } +// DisconnectTimeout uses the MaxClientDisconnect to compute when the allocation +// should transition to lost. +func (a *Allocation) DisconnectTimeout(now time.Time) time.Time { + if a == nil || a.Job == nil { + return now + } + + tg := a.Job.LookupTaskGroup(a.TaskGroup) + + // Prefer the duration from the task group. + timeout := tg.MaxClientDisconnect + + // If not configured, return now + if timeout == nil { + return now + } + + return now.Add(*timeout) +} + // NextDelay returns a duration after which the allocation can be rescheduled. // It is calculated according to the delay function and previous reschedule attempts. func (a *Allocation) NextDelay() time.Duration { @@ -10374,6 +10400,15 @@ func (a *AllocMetric) PopulateScoreMetaData() { } } +// MaxNormScore returns the ScoreMetaData entry with the highest normalized +// score. +func (a *AllocMetric) MaxNormScore() *NodeScoreMeta { + if a == nil || len(a.ScoreMetaData) == 0 { + return nil + } + return a.ScoreMetaData[0] +} + // NodeScoreMeta captures scoring meta data derived from // different scoring factors. 
type NodeScoreMeta struct { @@ -10502,21 +10537,22 @@ const ( ) const ( - EvalTriggerJobRegister = "job-register" - EvalTriggerJobDeregister = "job-deregister" - EvalTriggerPeriodicJob = "periodic-job" - EvalTriggerNodeDrain = "node-drain" - EvalTriggerNodeUpdate = "node-update" - EvalTriggerAllocStop = "alloc-stop" - EvalTriggerScheduled = "scheduled" - EvalTriggerRollingUpdate = "rolling-update" - EvalTriggerDeploymentWatcher = "deployment-watcher" - EvalTriggerFailedFollowUp = "failed-follow-up" - EvalTriggerMaxPlans = "max-plan-attempts" - EvalTriggerRetryFailedAlloc = "alloc-failure" - EvalTriggerQueuedAllocs = "queued-allocs" - EvalTriggerPreemption = "preemption" - EvalTriggerScaling = "job-scaling" + EvalTriggerJobRegister = "job-register" + EvalTriggerJobDeregister = "job-deregister" + EvalTriggerPeriodicJob = "periodic-job" + EvalTriggerNodeDrain = "node-drain" + EvalTriggerNodeUpdate = "node-update" + EvalTriggerAllocStop = "alloc-stop" + EvalTriggerScheduled = "scheduled" + EvalTriggerRollingUpdate = "rolling-update" + EvalTriggerDeploymentWatcher = "deployment-watcher" + EvalTriggerFailedFollowUp = "failed-follow-up" + EvalTriggerMaxPlans = "max-plan-attempts" + EvalTriggerRetryFailedAlloc = "alloc-failure" + EvalTriggerQueuedAllocs = "queued-allocs" + EvalTriggerPreemption = "preemption" + EvalTriggerScaling = "job-scaling" + EvalTriggerMaxDisconnectTimeout = "max-disconnect-timeout" ) const ( @@ -10618,7 +10654,8 @@ type Evaluation struct { Wait time.Duration // WaitUntil is the time when this eval should be run. This is used to - // supported delayed rescheduling of failed allocations + // supported delayed rescheduling of failed allocations, and delayed + // stopping of allocations that are configured with resume_after_client_reconnect. WaitUntil time.Time // NextEval is the evaluation ID for the eval created to do a followup. @@ -11133,6 +11170,17 @@ func (p *Plan) AppendPreemptedAlloc(alloc *Allocation, preemptingAllocID string) p.NodePreemptions[node] = append(existing, newAlloc) } +// AppendUnknownAlloc marks an allocation as unknown. +func (p *Plan) AppendUnknownAlloc(alloc *Allocation) { + // Strip the job as it's set once on the ApplyPlanResultRequest. + alloc.Job = nil + // Strip the resources as they can be rebuilt. + alloc.Resources = nil + + existing := p.NodeAllocation[alloc.NodeID] + p.NodeAllocation[alloc.NodeID] = append(existing, alloc) +} + func (p *Plan) PopUpdate(alloc *Allocation) { existing := p.NodeUpdate[alloc.NodeID] n := len(existing) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index d15d192bb..e9489739f 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -33,6 +33,9 @@ const ( // allocLost is the status used when an allocation is lost allocLost = "alloc is lost since its node is down" + // allocUnknown is the status used when an allocation is unknown + allocUnknown = "alloc is unknown since its node is disconnected" + // allocInPlace is the status used when speculating on an in-place update allocInPlace = "alloc updating in-place" @@ -55,6 +58,11 @@ const ( // up evals for delayed rescheduling reschedulingFollowupEvalDesc = "created for delayed rescheduling" + // disconnectTimeoutFollowupEvalDesc is the description used when creating follow + // up evals for allocations that be should be stopped after its disconnect + // timeout has passed. 
+ disconnectTimeoutFollowupEvalDesc = "created for delayed disconnect timeout" + // maxPastRescheduleEvents is the maximum number of past reschedule event // that we track when unlimited rescheduling is enabled maxPastRescheduleEvents = 5 @@ -148,7 +156,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) (err error) { structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans, structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc, structs.EvalTriggerFailedFollowUp, structs.EvalTriggerPreemption, - structs.EvalTriggerScaling: + structs.EvalTriggerScaling, structs.EvalTriggerMaxDisconnectTimeout: default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) @@ -392,6 +400,11 @@ func (s *GenericScheduler) computeJobAllocs() error { s.plan.AppendStoppedAlloc(stop.alloc, stop.statusDescription, stop.clientStatus, stop.followupEvalID) } + // Handle disconnect updates + for _, update := range results.disconnectUpdates { + s.plan.AppendUnknownAlloc(update) + } + // Handle the in-place updates for _, update := range results.inplaceUpdate { if update.DeploymentID != s.deployment.GetID() { @@ -406,6 +419,11 @@ func (s *GenericScheduler) computeJobAllocs() error { s.ctx.Plan().AppendAlloc(update, nil) } + // Handle reconnect updates + for _, update := range results.reconnectUpdates { + s.ctx.Plan().AppendAlloc(update, nil) + } + // Nothing remaining to do if placement is not required if len(results.place)+len(results.destructiveUpdate) == 0 { // If the job has been purged we don't have access to the job. Otherwise diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 08d1ebc5a..f76e03162 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -6514,3 +6514,115 @@ func TestPropagateTaskState(t *testing.T) { }) } } + +// Tests that a client disconnect generates attribute updates and follow up evals. 
+func TestServiceSched_Client_Disconnect_Creates_Updates_and_Evals(t *testing.T) { + h := NewHarness(t) + count := 1 + maxClientDisconnect := 10 * time.Minute + + disconnectedNode, job, unknownAllocs := initNodeAndAllocs(t, h, count, maxClientDisconnect, + structs.NodeStatusReady, structs.AllocClientStatusRunning) + + // Now disconnect the node + disconnectedNode.Status = structs.NodeStatusDisconnected + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), disconnectedNode)) + + // Create an evaluation triggered by the disconnect + evals := []*structs.Evaluation{{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: disconnectedNode.ID, + Status: structs.EvalStatusPending, + }} + nodeStatusUpdateEval := evals[0] + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals)) + + // Process the evaluation + err := h.Process(NewServiceScheduler, nodeStatusUpdateEval) + require.NoError(t, err) + require.Equal(t, structs.EvalStatusComplete, h.Evals[0].Status) + require.Len(t, h.Plans, 1, "plan") + + // One followup delayed eval created + require.Len(t, h.CreateEvals, 1) + followUpEval := h.CreateEvals[0] + require.Equal(t, nodeStatusUpdateEval.ID, followUpEval.PreviousEval) + require.Equal(t, "pending", followUpEval.Status) + require.NotEmpty(t, followUpEval.WaitUntil) + + // Insert eval in the state store + testutil.WaitForResult(func() (bool, error) { + found, err := h.State.EvalByID(nil, followUpEval.ID) + if err != nil { + return false, err + } + if found == nil { + return false, nil + } + + require.Equal(t, nodeStatusUpdateEval.ID, found.PreviousEval) + require.Equal(t, "pending", found.Status) + require.NotEmpty(t, found.WaitUntil) + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + // Validate that the ClientStatus updates are part of the plan. + require.Len(t, h.Plans[0].NodeAllocation[disconnectedNode.ID], count) + // Pending update should have unknown status. + for _, nodeAlloc := range h.Plans[0].NodeAllocation[disconnectedNode.ID] { + require.Equal(t, nodeAlloc.ClientStatus, structs.AllocClientStatusUnknown) + } + + // Simulate that NodeAllocation got processed. + err = h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), h.Plans[0].NodeAllocation[disconnectedNode.ID]) + require.NoError(t, err, "plan.NodeUpdate") + + // Validate that the StateStore Upsert applied the ClientStatus we specified. 
+ for _, alloc := range unknownAllocs { + alloc, err = h.State.AllocByID(nil, alloc.ID) + require.NoError(t, err) + require.Equal(t, alloc.ClientStatus, structs.AllocClientStatusUnknown) + + // Allocations have been transitioned to unknown + require.Equal(t, structs.AllocDesiredStatusRun, alloc.DesiredStatus) + require.Equal(t, structs.AllocClientStatusUnknown, alloc.ClientStatus) + } +} + +func initNodeAndAllocs(t *testing.T, h *Harness, allocCount int, + maxClientDisconnect time.Duration, nodeStatus, clientStatus string) (*structs.Node, *structs.Job, []*structs.Allocation) { + // Node, which is ready + node := mock.Node() + node.Status = nodeStatus + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + + // Job with allocations and max_client_disconnect + job := mock.Job() + job.TaskGroups[0].Count = allocCount + job.TaskGroups[0].MaxClientDisconnect = &maxClientDisconnect + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + allocs := make([]*structs.Allocation, allocCount) + for i := 0; i < allocCount; i++ { + // Alloc for the running group + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + alloc.DesiredStatus = structs.AllocDesiredStatusRun + alloc.ClientStatus = clientStatus + + allocs[i] = alloc + } + + require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs)) + return node, job, allocs +} diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 3eb9f870b..5371dd53f 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -114,6 +114,14 @@ type reconcileResults struct { // jobspec change. attributeUpdates map[string]*structs.Allocation + // disconnectUpdates is the set of allocations are on disconnected nodes, but + // have not yet had their ClientStatus set to AllocClientStatusUnknown. + disconnectUpdates map[string]*structs.Allocation + + // reconnectUpdates is the set of allocations that have ClientStatus set to + // AllocClientStatusUnknown, but the associated Node has reconnected. + reconnectUpdates map[string]*structs.Allocation + // desiredTGUpdates captures the desired set of changes to make for each // task group. desiredTGUpdates map[string]*structs.DesiredUpdates @@ -178,6 +186,8 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch now: time.Now(), result: &reconcileResults{ attributeUpdates: make(map[string]*structs.Allocation), + disconnectUpdates: make(map[string]*structs.Allocation), + reconnectUpdates: make(map[string]*structs.Allocation), desiredTGUpdates: make(map[string]*structs.DesiredUpdates), desiredFollowupEvals: make(map[string][]*structs.Evaluation), }, @@ -326,11 +336,15 @@ func (a *allocReconciler) handleStop(m allocMatrix) { } } +// filterAndStopAll stops all allocations in an allocSet. This is useful in when +// stopping an entire job or task group. 
func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 { - untainted, migrate, lost := set.filterByTainted(a.taintedNodes) + untainted, migrate, lost, disconnecting, reconnecting := set.filterByTainted(a.taintedNodes) a.markStop(untainted, "", allocNotNeeded) a.markStop(migrate, "", allocNotNeeded) a.markStop(lost, structs.AllocClientStatusLost, allocLost) + a.markStop(disconnecting, "", allocNotNeeded) + a.markStop(reconnecting, "", allocNotNeeded) return uint64(len(set)) } @@ -387,7 +401,7 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { canaries, all := a.cancelUnneededCanaries(all, desiredChanges) // Determine what set of allocations are on tainted nodes - untainted, migrate, lost := all.filterByTainted(a.taintedNodes) + untainted, migrate, lost, disconnecting, reconnecting := all.filterByTainted(a.taintedNodes) // Determine what set of terminal allocations need to be rescheduled untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch, a.now, a.evalID, a.deployment) @@ -396,7 +410,14 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { lostLater := lost.delayByStopAfterClientDisconnect() lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name) - // Create batched follow up evaluations for allocations that are + // Find delays for any disconnecting allocs that have max_client_disconnect configured, + // create followup evals, and update the ClientStatus to unknown. + timeoutLaterEvals := a.createTimeoutLaterEvals(disconnecting, tg.Name) + // Merge disconnecting with the stop_after_client_disconnect set into the + // lostLaterEvals so that computeStop can add them to the stop set. + lostLaterEvals = helper.MergeMapStringString(lostLaterEvals, timeoutLaterEvals) + + // Create batched follow-up evaluations for allocations that are // reschedulable later and mark the allocations for in place updating a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name) @@ -408,10 +429,13 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // Stop any unneeded allocations and update the untainted set to not // include stopped allocations. isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted - stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals) + stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, reconnecting, isCanarying, lostLaterEvals) desiredChanges.Stop += uint64(len(stop)) untainted = untainted.difference(stop) + // Validate and add reconnecting allocs to the plan so that they will be synced by the client on next poll. + a.computeReconnecting(reconnecting) + // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. ignore, inplace, destructive := a.computeUpdates(tg, untainted) @@ -442,9 +466,10 @@ func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool { // * If there are any canaries that they have been promoted // * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group // * An alloc was lost + // * There is not a corresponding reconnecting alloc.
var place []allocPlaceResult if len(lostLater) == 0 { - place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying) + place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, reconnecting, isCanarying) if !existingDeployment { dstate.DesiredTotal += len(place) } @@ -579,7 +604,7 @@ func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *s } canaries = all.fromKeys(canaryIDs) - untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes) + untainted, migrate, lost, _, _ := canaries.filterByTainted(a.taintedNodes) a.markStop(migrate, "", allocMigrating) a.markStop(lost, structs.AllocClientStatusLost, allocLost) @@ -639,7 +664,7 @@ func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, un // // Placements will meet or exceed group count. func (a *allocReconciler) computePlacements(group *structs.TaskGroup, - nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet, + nameIndex *allocNameIndex, untainted, migrate, reschedule, lost, reconnecting allocSet, isCanarying bool) []allocPlaceResult { // Add rescheduled placement results @@ -659,7 +684,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, } // Add replacements for lost allocs up to group.Count - existing := len(untainted) + len(migrate) + len(reschedule) + existing := len(untainted) + len(migrate) + len(reschedule) + len(reconnecting) for _, alloc := range lost { if existing >= group.Count { @@ -859,7 +884,7 @@ func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, in // the group definition, the set of allocations in various states and whether we // are canarying. func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex, - untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet { + untainted, migrate, lost, canaries, reconnecting allocSet, isCanarying bool, followupEvals map[string]string) allocSet { // Mark all lost allocations for stop. var stop allocSet @@ -872,7 +897,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc } // Hot path the nothing to do case - remove := len(untainted) + len(migrate) - group.Count + remove := len(untainted) + len(migrate) + len(reconnecting) - group.Count if remove <= 0 { return stop } @@ -925,6 +950,14 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc } } + // Handle allocs that might be able to reconnect. + if len(reconnecting) != 0 { + remove = a.computeStopByReconnecting(untainted, reconnecting, stop, remove) + if remove == 0 { + return stop + } + } + // Select the allocs with the highest count to remove removeNames := nameIndex.Highest(uint(remove)) for id, alloc := range untainted { @@ -962,6 +995,82 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc return stop } +// computeStopByReconnecting moves allocations from either the untainted or reconnecting +// sets to the stop set and returns the number of allocations that still need to be removed. +func (a *allocReconciler) computeStopByReconnecting(untainted, reconnecting, stop allocSet, remove int) int { + if remove == 0 { + return remove + } + + for _, reconnectingAlloc := range reconnecting { + // if the desired status is not run, or if the user-specified desired + // transition is not run, stop the allocation. 
+ if reconnectingAlloc.DesiredStatus != structs.AllocDesiredStatusRun || + reconnectingAlloc.DesiredTransition.ShouldMigrate() || + reconnectingAlloc.DesiredTransition.ShouldReschedule() || + reconnectingAlloc.DesiredTransition.ShouldForceReschedule() { + + stop[reconnectingAlloc.ID] = reconnectingAlloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: reconnectingAlloc, + statusDescription: allocNotNeeded, + }) + delete(reconnecting, reconnectingAlloc.ID) + + remove-- + // if we've removed all we need to, stop iterating and return. + if remove == 0 { + return remove + } + } + + // Compare reconnecting to untainted and decide which to keep. + for _, untaintedAlloc := range untainted { + // If not a match by name go to next + if reconnectingAlloc.Name != untaintedAlloc.Name { + continue + } + + // By default, we prefer stopping the replacement alloc unless + // the replacement has a higher metrics score. + stopAlloc := untaintedAlloc + deleteSet := untainted + untaintedMaxScoreMeta := untaintedAlloc.Metrics.MaxNormScore() + reconnectingMaxScoreMeta := reconnectingAlloc.Metrics.MaxNormScore() + + if untaintedMaxScoreMeta == nil { + a.logger.Error(fmt.Sprintf("error computing stop: replacement allocation metrics not available for alloc.name %q", untaintedAlloc.Name)) + continue + } + + if reconnectingMaxScoreMeta == nil { + a.logger.Error(fmt.Sprintf("error computing stop: reconnecting allocation metrics not available for alloc.name %q", reconnectingAlloc.Name)) + continue + } + + if untaintedMaxScoreMeta.NormScore > reconnectingMaxScoreMeta.NormScore { + stopAlloc = reconnectingAlloc + deleteSet = reconnecting + } + + stop[stopAlloc.ID] = stopAlloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: stopAlloc, + statusDescription: allocNotNeeded, + }) + delete(deleteSet, stopAlloc.ID) + + remove-- + // if we've removed all we need to, stop iterating and return. + if remove == 0 { + return remove + } + } + } + + return remove +} + // computeUpdates determines which allocations for the passed group require // updates. Three groups are returned: // 1. Those that require no upgrades @@ -1005,7 +1114,33 @@ func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedR } } -// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for +// computeReconnecting copies existing allocations in the unknown state, but +// whose nodes have been identified as ready. The Allocations DesiredStatus is +// set to running, and these allocs are appended to the Plan as non-destructive +// updates. Clients are responsible for reconciling the DesiredState with the +// actual state as the node comes back online. +func (a *allocReconciler) computeReconnecting(reconnecting allocSet) { + if len(reconnecting) == 0 { + return + } + + // Create updates that will be appended to the plan. + for _, alloc := range reconnecting { + // If the user has defined a DesiredTransition don't resume the alloc. + if alloc.DesiredTransition.ShouldMigrate() || alloc.DesiredTransition.ShouldReschedule() || alloc.DesiredTransition.ShouldForceReschedule() { + continue + } + + // If the scheduler has defined a terminal DesiredStatus don't resume the alloc. + if alloc.DesiredStatus != structs.AllocDesiredStatusRun { + continue + } + + a.result.reconnectUpdates[alloc.ID] = alloc.Copy() + } +} + +// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for // lost allocations. 
followupEvals are appended to a.result as a side effect, we return a // map of alloc IDs to their followupEval IDs. func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string { @@ -1062,12 +1197,101 @@ func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedResched emitRescheduleInfo(allocReschedInfo.alloc, eval) } - a.result.desiredFollowupEvals[tgName] = evals + a.appendFollowupEvals(tgName, evals) return allocIDToFollowupEvalID } -// emitRescheduleInfo emits metrics about the reschedule decision of an evaluation. If a followup evaluation is +// createTimeoutLaterEvals creates followup evaluations with the +// WaitUntil field set for allocations in an unknown state on disconnected nodes. +// Followup Evals are appended to a.result as a side effect. It returns a map of +// allocIDs to their associated followUpEvalIDs. +func (a *allocReconciler) createTimeoutLaterEvals(disconnecting allocSet, tgName string) map[string]string { + if len(disconnecting) == 0 { + return map[string]string{} + } + + timeoutDelays, err := disconnecting.delayByMaxClientDisconnect(a.now) + if err != nil || len(timeoutDelays) != len(disconnecting) { + a.logger.Error(fmt.Sprintf("error computing disconnecting timeouts for task_group.name %q: %s", tgName, err)) + return map[string]string{} + } + + // Sort by time + sort.Slice(timeoutDelays, func(i, j int) bool { + return timeoutDelays[i].rescheduleTime.Before(timeoutDelays[j].rescheduleTime) + }) + + var evals []*structs.Evaluation + nextReschedTime := timeoutDelays[0].rescheduleTime + allocIDToFollowupEvalID := make(map[string]string, len(timeoutDelays)) + + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: a.job.Namespace, + Priority: a.evalPriority, + Type: a.job.Type, + TriggeredBy: structs.EvalTriggerMaxDisconnectTimeout, + JobID: a.job.ID, + JobModifyIndex: a.job.ModifyIndex, + Status: structs.EvalStatusPending, + StatusDescription: disconnectTimeoutFollowupEvalDesc, + WaitUntil: nextReschedTime, + } + evals = append(evals, eval) + + // Important to remember that these are sorted. The rescheduleTime can only + // get farther into the future. If this loop detects the next delay is greater + // than the batch window (5s) it creates another batch. + for _, timeoutInfo := range timeoutDelays { + if timeoutInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize { + allocIDToFollowupEvalID[timeoutInfo.allocID] = eval.ID + } else { + // Start a new batch + nextReschedTime = timeoutInfo.rescheduleTime + // Create a new eval for the new batch + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: a.job.Namespace, + Priority: a.evalPriority, + Type: a.job.Type, + TriggeredBy: structs.EvalTriggerMaxDisconnectTimeout, + JobID: a.job.ID, + JobModifyIndex: a.job.ModifyIndex, + Status: structs.EvalStatusPending, + StatusDescription: disconnectTimeoutFollowupEvalDesc, + WaitUntil: timeoutInfo.rescheduleTime, + } + evals = append(evals, eval) + allocIDToFollowupEvalID[timeoutInfo.allocID] = eval.ID + } + + // Create updates that will be applied to the allocs to mark the FollowupEvalID + // and the unknown ClientStatus. 
+ updatedAlloc := timeoutInfo.alloc.Copy() + updatedAlloc.ClientStatus = structs.AllocClientStatusUnknown + updatedAlloc.ClientDescription = allocUnknown + updatedAlloc.FollowupEvalID = eval.ID + a.result.disconnectUpdates[updatedAlloc.ID] = updatedAlloc + } + + a.appendFollowupEvals(tgName, evals) + + return allocIDToFollowupEvalID +} + +// appendFollowupEvals appends a set of followup evals for a task group to the +// desiredFollowupEvals map which is later added to the scheduler's followUpEvals set. +func (a *allocReconciler) appendFollowupEvals(tgName string, evals []*structs.Evaluation) { + // Merge with + if existingFollowUpEvals, ok := a.result.desiredFollowupEvals[tgName]; ok { + evals = append(existingFollowUpEvals, evals...) + } + + a.result.desiredFollowupEvals[tgName] = evals +} + +// emitRescheduleInfo emits metrics about the rescheduling decision of an evaluation. If a followup evaluation is // provided, the waitUntil time is emitted. func emitRescheduleInfo(alloc *structs.Allocation, followupEval *structs.Evaluation) { // Emit short-lived metrics data point. Note, these expire and stop emitting after about a minute. diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 1a9fc37f7..d8847103f 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -257,6 +257,8 @@ type resultExpectation struct { destructive int inplace int attributeUpdates int + disconnectUpdates int + reconnectUpdates int stop int desiredTGUpdates map[string]*structs.DesiredUpdates } @@ -283,10 +285,61 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { assertion.Len(r.destructiveUpdate, exp.destructive, "Expected Destructive") assertion.Len(r.inplaceUpdate, exp.inplace, "Expected Inplace Updates") assertion.Len(r.attributeUpdates, exp.attributeUpdates, "Expected Attribute Updates") + assertion.Len(r.reconnectUpdates, exp.reconnectUpdates, "Expected Reconnect Updates") + assertion.Len(r.disconnectUpdates, exp.disconnectUpdates, "Expected Disconnect Updates") assertion.Len(r.stop, exp.stop, "Expected Stops") assertion.EqualValues(exp.desiredTGUpdates, r.desiredTGUpdates, "Expected Desired TG Update Annotations") } +func buildAllocations(job *structs.Job, count int, clientStatus, desiredStatus string, nodeScore float64) []*structs.Allocation { + allocs := make([]*structs.Allocation, 0) + + for i := 0; i < count; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = uuid.Generate() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + alloc.ClientStatus = clientStatus + alloc.DesiredStatus = desiredStatus + + alloc.Metrics = &structs.AllocMetric{ + ScoreMetaData: []*structs.NodeScoreMeta{ + { + NodeID: alloc.NodeID, + NormScore: nodeScore, + Scores: map[string]float64{ + alloc.NodeID: nodeScore, + }, + }, + }, + } + + allocs = append(allocs, alloc) + } + + return allocs +} + +func buildDisconnectedNodes(allocs []*structs.Allocation, count int) map[string]*structs.Node { + tainted := make(map[string]*structs.Node, count) + for i := 0; i < count; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Status = structs.NodeStatusDisconnected + tainted[n.ID] = n + } + return tainted +} + +func buildResumableAllocations(count int, clientStatus, desiredStatus string, nodeScore float64) (*structs.Job, []*structs.Allocation) { + job := mock.Job() + job.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Minute) + job.TaskGroups[0].Count = count + + return job, buildAllocations(job, 
count, clientStatus, desiredStatus, nodeScore) +} + // Tests the reconciler properly handles placements for a job that has no // existing allocations func TestReconciler_Place_NoExisting(t *testing.T) { @@ -5169,3 +5222,233 @@ func TestReconciler_RescheduleNot_Batch(t *testing.T) { }, }) } + +// Tests that when a node disconnects running allocations are queued to transition to unknown. +func TestReconciler_Node_Disconnect_Updates_Alloc_To_Unknown(t *testing.T) { + job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) + // Build a map of disconnected nodes + nodes := buildDisconnectedNodes(allocs, 2) + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nodes, "", 50) + reconciler.now = time.Now().UTC() + results := reconciler.Compute() + + // Verify that 1 follow up eval was created with the values we expect. + evals := results.desiredFollowupEvals[job.TaskGroups[0].Name] + require.Len(t, evals, 1) + expectedTime := reconciler.now.Add(5 * time.Minute) + + eval := evals[0] + require.NotNil(t, eval.WaitUntil) + require.Equal(t, expectedTime, eval.WaitUntil) + + // Validate that the queued disconnectUpdates have the right client status, + // and that they have a valid FollowUpdEvalID. + for _, disconnectUpdate := range results.disconnectUpdates { + require.Equal(t, structs.AllocClientStatusUnknown, disconnectUpdate.ClientStatus) + require.NotEmpty(t, disconnectUpdate.FollowupEvalID) + require.Equal(t, eval.ID, disconnectUpdate.FollowupEvalID) + } + + // 2 to place, 2 to update, 1 to ignore + assertResults(t, results, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 2, + stop: 0, + inplace: 0, + disconnectUpdates: 2, + + // 2 to place and 1 to ignore + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 2, + Stop: 0, + Ignore: 1, + InPlaceUpdate: 0, + }, + }, + }) +} + +// Tests that when a node reconnects unknown allocations for that node are queued +// to resume on the client, and that any replacement allocations that were scheduled +// are queued to stop. +func TestReconciler_Node_Reconnect_ScaleIn_And_Reconnect_Unknown(t *testing.T) { + // TODO: Table tests + // * Some replacements have a higher nodes score + // * Scores are a tie + // * Canarying + + // Create 2 resumable allocs with a node score of 2. + job, allocs := buildResumableAllocations(2, structs.AllocClientStatusUnknown, structs.AllocDesiredStatusRun, 2) + + // Adjust the desired count on the job's Task group that got set in the helper. + job.TaskGroups[0].Count = 3 + + // Create 3 placed allocs with a lower nodeScore here. + scaleInAllocs := buildAllocations(job, 3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 1) + + // 2 should scale in, since we are passing nil in tainted nodes. We pass the + // allocUpdateFnIgnore, because computeUpdates in a real setting should return + // ignore == true for the 1 remaining untainted update after computeStop + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, append(allocs, scaleInAllocs...), nil, "", 50) + reconciler.now = time.Now().UTC() + results := reconciler.Compute() + + // Verify that 0 follow up evals were created. + evals := results.desiredFollowupEvals[job.TaskGroups[0].Name] + require.Len(t, evals, 0) + + // Validate that the queued reconnectUpdates have the right client status, + // and that they have no FollowUpdEvalID. 
+ for _, reconnectUpdate := range results.reconnectUpdates { + require.Equal(t, structs.AllocClientStatusUnknown, reconnectUpdate.ClientStatus) + require.Empty(t, reconnectUpdate.FollowupEvalID) + require.Equal(t, structs.AllocDesiredStatusRun, reconnectUpdate.DesiredStatus) + } + + // 2 to stop, 2 reconnect updates, 1 to ignore + assertResults(t, results, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + stop: 2, + destructive: 0, + inplace: 0, + disconnectUpdates: 0, + reconnectUpdates: 2, + + // TODO: Figure out how this needs to change. + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 0, + Stop: 2, + DestructiveUpdate: 0, + Ignore: 1, + InPlaceUpdate: 0, + }, + }, + }) +} + +// Tests that the future timeout evals that get created when a node disconnects +// stop once the duration passes. +func TestReconciler_Disconnected_Node_FollowUpEvals_Stop_After_Timeout(t *testing.T) { + // TODO: Add table tests and play with the reconciler time/node status to make sure that + // if the expiration time has not passed, it's a no-op. + + // Build a set of resumable allocations. Helper will set the timeout to 5 min. + job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) + + // Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little + // more discernible that only the affected alloc(s) get marked unknown. + nodes := buildDisconnectedNodes(allocs, 2) + + // Invoke the reconciler to queue status changes and get the followup evals. + // Use the allocUpdateFnIngore since alloc.TerminalStatus() will evaluate to + // false and cause the real genericAllocUpdateFn to return ignore=true destructive=false + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nodes, "", 50) + reconciler.now = time.Now().UTC() + results := reconciler.Compute() + + // Verify that 1 follow up eval was created. + evals := results.desiredFollowupEvals[job.TaskGroups[0].Name] + require.Len(t, evals, 1) + eval := evals[0] + + // Set the NodeStatus to Down on the 2 disconnected nodes to simulate that + // the resume duration has passed. + for _, node := range nodes { + node.Status = structs.NodeStatusDown + } + + // Replace the allocs that were originally created with the updated copies that + // have the unknown ClientStatus. + for i, alloc := range allocs { + for id, updated := range results.disconnectUpdates { + if alloc.ID == id { + allocs[i] = updated + } + } + } + + // Run the followup eval through the reconciler and verify the resumable allocs + // have timed out, will be stopped, and new placements are scheduled. + reconciler = NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nodes, eval.ID, eval.Priority) + + // Allocs were configured to expire in 5 min, so configure the reconciler + // to believe that time has passed. + // NOTE: this probably isn't really necessary because this value is really + // only used for computing future evals, but it seemed like good practice + // in case there are other unconsidered side effects. + reconciler.now = time.Now().UTC().Add(6 * time.Minute) + results = reconciler.Compute() + + // Validate that the queued stops have the right client status. 
+ for _, stopResult := range results.stop { + require.Equal(t, structs.AllocClientStatusLost, stopResult.clientStatus) + } + + // 2 to place, 2 to stop, 1 to ignore + assertResults(t, results, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 2, + destructive: 0, + stop: 2, + inplace: 0, + disconnectUpdates: 0, + reconnectUpdates: 0, + + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 2, + Stop: 2, + DestructiveUpdate: 0, + Ignore: 1, + InPlaceUpdate: 0, + }, + }, + }) +} + +func TestReconciler_Compute_Disconnecting(t *testing.T) { + // Build a set of resumable allocations. Helper will set the timeout to 5 min. + job, allocs := buildResumableAllocations(3, structs.AllocClientStatusRunning, structs.AllocDesiredStatusRun, 2) + + // Build a map of disconnected nodes. Only disconnect 2 of the nodes to make it a little + // more discernible that only the affected alloc(s) get marked unknown. + nodes := buildDisconnectedNodes(allocs, 2) + + reconciler := NewAllocReconciler(testlog.HCLogger(t), allocUpdateFnIgnore, false, job.ID, job, + nil, allocs, nodes, "", 50) + reconciler.now = time.Now().UTC() + + tgName := allocs[0].TaskGroup + matrix := newAllocMatrix(job, allocs) + _, _, _, reconnecting, _ := matrix[tgName].filterByTainted(nodes) + require.NotNil(t, reconnecting) + require.Len(t, reconnecting, 2) + + result := reconciler.createTimeoutLaterEvals(reconnecting, tgName) + require.NotNil(t, result) + require.Len(t, reconciler.result.desiredFollowupEvals, 1) + + evals := reconciler.result.desiredFollowupEvals[tgName] + + for _, eval := range evals { + found := false + for _, evalID := range result { + found = eval.ID == evalID + if found { + break + } + } + require.True(t, found) + } +} diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index e6d8d9aa5..7ac268d7f 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -209,14 +209,19 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet { } // filterByTainted takes a set of tainted nodes and filters the allocation set -// into three groups: +// into 5 groups: // 1. Those that exist on untainted nodes // 2. Those exist on nodes that are draining // 3. Those that exist on lost nodes -func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost allocSet) { +// 4. Those that are on nodes that are disconnected, but have not had their ClientState set to unknown +// 5. Those that have had their ClientState set to unknown, but their node has reconnected. +func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost, disconnecting, reconnecting allocSet) { untainted = make(map[string]*structs.Allocation) migrate = make(map[string]*structs.Allocation) lost = make(map[string]*structs.Allocation) + disconnecting = make(map[string]*structs.Allocation) + reconnecting = make(map[string]*structs.Allocation) + for _, alloc := range a { // Terminal allocs are always untainted as they should never be migrated if alloc.TerminalStatus() { @@ -232,11 +237,41 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain taintedNode, ok := taintedNodes[alloc.NodeID] if !ok { - // Node is untainted so alloc is untainted + // Filter allocs on a node that is now re-connected to be resumed. 
+ if alloc.ClientStatus == structs.AllocClientStatusUnknown { + reconnecting[alloc.ID] = alloc + continue + } + + // Otherwise, Node is untainted so alloc is untainted untainted[alloc.ID] = alloc continue } + if taintedNode != nil { + // Group disconnecting/reconnecting + switch taintedNode.Status { + case structs.NodeStatusDisconnected: + // Filter running allocs on a node that is disconnected to be marked as unknown. + if alloc.ClientStatus == structs.AllocClientStatusRunning { + disconnecting[alloc.ID] = alloc + continue + } + // Filter pending allocs on a node that is disconnected to be marked as lost. + if alloc.ClientStatus == structs.AllocClientStatusPending { + lost[alloc.ID] = alloc + continue + } + case structs.NodeStatusReady: + // Filter unknown allocs on a node that is connected to reconnect. + if alloc.ClientStatus == structs.AllocClientStatusUnknown { + reconnecting[alloc.ID] = alloc + continue + } + default: + } + } + // Allocs on GC'd (nil) or lost nodes are Lost if taintedNode == nil || taintedNode.TerminalStatus() { lost[alloc.ID] = alloc @@ -245,7 +280,9 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untain // All other allocs are untainted untainted[alloc.ID] = alloc + } + return } @@ -413,6 +450,26 @@ func (a allocSet) delayByStopAfterClientDisconnect() (later []*delayedReschedule return later } +// delayByMaxClientDisconnect returns a delay for any unknown allocation +// that has max_client_disconnect configured +func (a allocSet) delayByMaxClientDisconnect(now time.Time) (later []*delayedRescheduleInfo, err error) { + for _, alloc := range a { + timeout := alloc.DisconnectTimeout(now) + + if !timeout.After(now) { + continue + } + + later = append(later, &delayedRescheduleInfo{ + allocID: alloc.ID, + alloc: alloc, + rescheduleTime: timeout, + }) + } + + return +} + // allocNameIndex is used to select allocation names for placement or removal // given an existing set of placed allocations.
type allocNameIndex struct { diff --git a/scheduler/reconcile_util_test.go b/scheduler/reconcile_util_test.go index 17617cf3c..b24ebc718 100644 --- a/scheduler/reconcile_util_test.go +++ b/scheduler/reconcile_util_test.go @@ -39,8 +39,6 @@ func TestBitmapFrom(t *testing.T) { func TestAllocSet_filterByTainted(t *testing.T) { ci.Parallel(t) - require := require.New(t) - nodes := map[string]*structs.Node{ "draining": { ID: "draining", @@ -55,6 +53,10 @@ func TestAllocSet_filterByTainted(t *testing.T) { ID: "normal", Status: structs.NodeStatusReady, }, + "disconnected": { + ID: "disconnected", + Status: structs.NodeStatusDisconnected, + }, } batchJob := &structs.Job{ @@ -119,18 +121,108 @@ func TestAllocSet_filterByTainted(t *testing.T) { Job: batchJob, NodeID: "lost", }, + // Non-terminal allocs on disconnected nodes are disconnecting + "disconnecting1": { + ID: "disconnecting1", + ClientStatus: structs.AllocClientStatusRunning, + Job: batchJob, + NodeID: "disconnected", + }, + // Non-terminal allocs on disconnected nodes are disconnecting + "disconnecting2": { + ID: "disconnecting2", + ClientStatus: structs.AllocClientStatusRunning, + Job: batchJob, + NodeID: "disconnected", + }, + // Non-terminal allocs on disconnected nodes are disconnecting + "disconnecting3": { + ID: "disconnecting3", + ClientStatus: structs.AllocClientStatusRunning, + Job: batchJob, + NodeID: "disconnected", + }, + // Complete allocs on disconnected nodes don't get restarted + "disconnecting4": { + ID: "disconnecting4", + ClientStatus: structs.AllocClientStatusComplete, + Job: batchJob, + NodeID: "disconnected", + }, + // Failed allocs on disconnected nodes don't get restarted + "disconnecting5": { + ID: "disconnecting5", + ClientStatus: structs.AllocClientStatusFailed, + Job: batchJob, + NodeID: "disconnected", + }, + // Lost allocs on disconnected nodes don't get restarted + "disconnecting6": { + ID: "disconnecting6", + ClientStatus: structs.AllocClientStatusLost, + Job: batchJob, + NodeID: "disconnected", + }, + // Unknown allocs on re-connected nodes are reconnecting + "reconnecting1": { + ID: "reconnecting1", + ClientStatus: structs.AllocClientStatusUnknown, + Job: batchJob, + NodeID: "normal", + }, + // Unknown allocs on re-connected nodes are reconnecting + "reconnecting2": { + ID: "reconnecting2", + ClientStatus: structs.AllocClientStatusUnknown, + Job: batchJob, + NodeID: "normal", + }, + // Complete allocs on disconnected nodes don't get restarted + "reconnecting3": { + ID: "reconnecting3", + ClientStatus: structs.AllocClientStatusComplete, + Job: batchJob, + NodeID: "normal", + }, + // Failed allocs on disconnected nodes don't get restarted + "reconnecting4": { + ID: "reconnecting4", + ClientStatus: structs.AllocClientStatusFailed, + Job: batchJob, + NodeID: "normal", + }, + // Lost allocs on disconnected nodes don't get restarted + "reconnecting5": { + ID: "reconnecting5", + ClientStatus: structs.AllocClientStatusLost, + Job: batchJob, + NodeID: "normal", + }, } - untainted, migrate, lost := allocs.filterByTainted(nodes) - require.Len(untainted, 4) - require.Contains(untainted, "untainted1") - require.Contains(untainted, "untainted2") - require.Contains(untainted, "untainted3") - require.Contains(untainted, "untainted4") - require.Len(migrate, 2) - require.Contains(migrate, "migrating1") - require.Contains(migrate, "migrating2") - require.Len(lost, 2) - require.Contains(lost, "lost1") - require.Contains(lost, "lost2") + untainted, migrate, lost, disconnecting, reconnecting := allocs.filterByTainted(nodes) + 
require.Len(t, untainted, 10) + require.Contains(t, untainted, "untainted1") + require.Contains(t, untainted, "untainted2") + require.Contains(t, untainted, "untainted3") + require.Contains(t, untainted, "untainted4") + require.Contains(t, untainted, "disconnecting4") + require.Contains(t, untainted, "disconnecting5") + require.Contains(t, untainted, "disconnecting6") + require.Contains(t, untainted, "reconnecting3") + require.Contains(t, untainted, "reconnecting4") + require.Contains(t, untainted, "reconnecting5") + require.Len(t, migrate, 2) + require.Contains(t, migrate, "migrating1") + require.Contains(t, migrate, "migrating2") + require.Len(t, lost, 2) + require.Contains(t, lost, "lost1") + require.Contains(t, lost, "lost2") + require.Len(t, disconnecting, 3) + require.Contains(t, disconnecting, "disconnecting1") + require.Contains(t, disconnecting, "disconnecting2") + require.Contains(t, disconnecting, "disconnecting3") + require.Len(t, reconnecting, 2) + require.Contains(t, reconnecting, "reconnecting1") + require.Contains(t, reconnecting, "reconnecting2") } diff --git a/scheduler/util.go b/scheduler/util.go index 28113afba..55b1885eb 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -350,8 +350,9 @@ func progressMade(result *structs.PlanResult) bool { } // taintedNodes is used to scan the allocations and then check if the -// underlying nodes are tainted, and should force a migration of the allocation. -// All the nodes returned in the map are tainted. +// underlying nodes are tainted, and should force a migration of the allocation, +// or if the underlying nodes are disconnected, and should be used to calculate +// the reconnect timeout of its allocations. All the nodes returned in the map are tainted. func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) { out := make(map[string]*structs.Node) for _, alloc := range allocs { @@ -373,7 +374,15 @@ func taintedNodes(state State, allocs []*structs.Allocation) (map[string]*struct if structs.ShouldDrainNode(node.Status) || node.DrainStrategy != nil { out[alloc.NodeID] = node } + + // Disconnected nodes are included in the tainted set so that their + // MaxClientDisconnect configuration can be included in the + // timeout calculation. + if node.Status == structs.NodeStatusDisconnected { + out[alloc.NodeID] = node + } } + return out, nil }
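
To summarize how the pieces above interact, here is a minimal, standalone Go sketch of the disconnect flow: the reconciler buckets allocations by node state (the new five-way filterByTainted split), and MaxClientDisconnect drives the WaitUntil of the follow-up eval that eventually stops an alloc that never reconnects. The node/alloc types, the status constants, and the classify/disconnectTimeout helpers are simplified stand-ins invented for this illustration, not the real nomad/structs API; only the status names and the timeout rule are taken from the diff.

package main

import (
	"fmt"
	"time"
)

// Simplified stand-ins for the structs touched by this patch. The status
// strings match nomad/structs; everything else is trimmed down for the
// example (the real code reads MaxClientDisconnect from the task group).
type node struct {
	ID     string
	Status string // "ready", "down", "disconnected", ...
}

type alloc struct {
	ID                  string
	NodeID              string
	ClientStatus        string // "running", "unknown", ...
	MaxClientDisconnect *time.Duration
}

const (
	nodeReady        = "ready"
	nodeDisconnected = "disconnected"

	allocRunning = "running"
	allocUnknown = "unknown"
)

// classify sketches the disconnect-related branches added to
// allocSet.filterByTainted: a running alloc on a disconnected node becomes
// "disconnecting"; an unknown alloc whose node is no longer tainted (or is
// ready again) becomes "reconnecting"; everything else is left to the
// existing untainted/migrate/lost logic.
func classify(a alloc, tainted map[string]node) string {
	n, ok := tainted[a.NodeID]
	if !ok || n.Status == nodeReady {
		if a.ClientStatus == allocUnknown {
			return "reconnecting"
		}
		return "untainted"
	}
	if n.Status == nodeDisconnected && a.ClientStatus == allocRunning {
		return "disconnecting"
	}
	return "other"
}

// disconnectTimeout sketches Allocation.DisconnectTimeout: with no
// max_client_disconnect there is no grace window, otherwise the follow-up
// eval's WaitUntil is now plus the configured window.
func disconnectTimeout(a alloc, now time.Time) time.Time {
	if a.MaxClientDisconnect == nil {
		return now
	}
	return now.Add(*a.MaxClientDisconnect)
}

func main() {
	window := 10 * time.Minute
	now := time.Now()
	tainted := map[string]node{"n1": {ID: "n1", Status: nodeDisconnected}}

	a := alloc{ID: "a1", NodeID: "n1", ClientStatus: allocRunning, MaxClientDisconnect: &window}

	fmt.Println(classify(a, tainted))               // disconnecting
	fmt.Println(disconnectTimeout(a, now).Sub(now)) // 10m0s

	// Once the node heartbeats again it drops out of the tainted set and the
	// alloc, now marked unknown by the scheduler, is picked up as reconnecting.
	a.ClientStatus = allocUnknown
	fmt.Println(classify(a, map[string]node{})) // reconnecting
}

If the node heartbeats again before the window expires, the alloc comes back as reconnecting and computeStopByReconnecting keeps whichever of the original or its replacement scored higher (MaxNormScore); if the window expires first, the follow-up eval fires and the unknown alloc is stopped as lost and replaced.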