From f5de80299312409ec3e0a867cf520a0949adc026 Mon Sep 17 00:00:00 2001 From: Derek Strickland <1111455+DerekStrickland@users.noreply.github.com> Date: Fri, 15 Apr 2022 09:31:32 -0400 Subject: [PATCH] system_scheduler: support disconnected clients (#12555) * structs: Add helper method for checking if alloc is configured to disconnect * system_scheduler: Add support for disconnected clients --- nomad/structs/structs.go | 18 + scheduler/generic_sched.go | 5 - scheduler/reconcile_util.go | 10 +- scheduler/scheduler_system.go | 9 +- scheduler/scheduler_system_test.go | 877 +++++++++++++++++++++++++++++ scheduler/util.go | 93 ++- scheduler/util_test.go | 10 +- 7 files changed, 991 insertions(+), 31 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b8f2c30a2..bc1d9627d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -9935,6 +9935,24 @@ func (a *Allocation) DisconnectTimeout(now time.Time) time.Time { return now.Add(*timeout) } +// SupportsDisconnectedClients determines whether both the server and the task group +// are configured to allow the allocation to reconnect after network connectivity +// has been lost and then restored. +func (a *Allocation) SupportsDisconnectedClients(serverSupportsDisconnectedClients bool) bool { + if !serverSupportsDisconnectedClients { + return false + } + + if a.Job != nil { + tg := a.Job.LookupTaskGroup(a.TaskGroup) + if tg != nil { + return tg.MaxClientDisconnect != nil + } + } + + return false +} + // NextDelay returns a duration after which the allocation can be rescheduled. // It is calculated according to the delay function and previous reschedule attempts. 
func (a *Allocation) NextDelay() time.Duration { diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index ab16badd3..90fba5427 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -429,11 +429,6 @@ func (s *GenericScheduler) computeJobAllocs() error { s.ctx.Plan().AppendAlloc(update, nil) } - // Log reconnect updates. They will be pulled by the client when it reconnects. - for _, update := range results.reconnectUpdates { - s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex) - } - // Nothing remaining to do if placement is not required if len(results.place)+len(results.destructiveUpdate) == 0 { // If the job has been purged we don't have access to the job. Otherwise diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 49f7bbe3b..395998ff2 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -224,19 +224,11 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS reconnecting = make(map[string]*structs.Allocation) ignore = make(map[string]*structs.Allocation) - supportsDisconnectedClients := serverSupportsDisconnectedClients - for _, alloc := range a { // make sure we don't apply any reconnect logic to task groups // without max_client_disconnect - if alloc.Job != nil { - tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) - if tg != nil { - supportsDisconnectedClients = serverSupportsDisconnectedClients && - tg.MaxClientDisconnect != nil - } - } + supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients) reconnected := false expired := false diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 59de58d89..2d5e03bda 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -229,7 +229,9 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the 
required and existing allocations - diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term) + diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term, + s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) + s.logger.Debug("reconciled current state with desired state", "place", len(diff.place), "update", len(diff.update), "migrate", len(diff.migrate), "stop", len(diff.stop), @@ -251,6 +253,10 @@ func (s *SystemScheduler) computeJobAllocs() error { s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost, "") } + for _, e := range diff.disconnecting { + s.plan.AppendUnknownAlloc(e.Alloc) + } + // Attempt to do the upgrades in place destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) diff.update = destructiveUpdates @@ -508,6 +514,7 @@ func (s *SystemScheduler) canHandle(trigger string) bool { case structs.EvalTriggerAllocStop: case structs.EvalTriggerQueuedAllocs: case structs.EvalTriggerScaling: + case structs.EvalTriggerReconnect: default: switch s.sysbatch { case true: diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index 63c4c1869..ce3b5cc51 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -2132,3 +2132,880 @@ func TestSystemSched_canHandle(t *testing.T) { require.False(t, s.canHandle(structs.EvalTriggerPeriodicJob)) }) } + +func TestSystemSched_NodeDisconnected(t *testing.T) { + ci.Parallel(t) + + systemJob := mock.SystemJob() + systemAlloc := mock.SystemAlloc() + systemAlloc.Name = fmt.Sprintf("my-job.%s[0]", systemJob.TaskGroups[0].Name) + + sysBatchJob := mock.SystemBatchJob() + sysBatchJob.TaskGroups[0].Tasks[0].Env = make(map[string]string) + sysBatchJob.TaskGroups[0].Tasks[0].Env["foo"] = "bar" + sysBatchAlloc := mock.SysBatchAlloc() + sysBatchAlloc.Name = fmt.Sprintf("my-sysbatch.%s[0]", sysBatchJob.TaskGroups[0].Name) + + now := time.Now().UTC() + 
+ unknownAllocState := []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: now, + }} + + expiredAllocState := []*structs.AllocState{{ + Field: structs.AllocStateFieldClientStatus, + Value: structs.AllocClientStatusUnknown, + Time: now.Add(-60 * time.Second), + }} + + reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected) + reconnectedEvent.Time = time.Now().UnixNano() + systemJobReconnectTaskState := map[string]*structs.TaskState{ + systemJob.TaskGroups[0].Tasks[0].Name: { + Events: []*structs.TaskEvent{reconnectedEvent}, + }, + } + + successTaskState := map[string]*structs.TaskState{ + systemJob.TaskGroups[0].Tasks[0].Name: { + State: structs.TaskStateDead, + Failed: false, + }, + } + + sysBatchJobReconnectTaskState := map[string]*structs.TaskState{ + sysBatchJob.TaskGroups[0].Tasks[0].Name: { + Events: []*structs.TaskEvent{reconnectedEvent}, + }, + } + + type testCase struct { + name string + jobType string + exists bool + required bool + migrate bool + draining bool + targeted bool + modifyJob bool + previousTerminal bool + nodeStatus string + clientStatus string + desiredStatus string + allocState []*structs.AllocState + taskState map[string]*structs.TaskState + expectedPlanCount int + expectedNodeAllocation map[string]*structs.Allocation + expectedNodeUpdate map[string]*structs.Allocation + } + + testCases := []testCase{ + { + name: "system-running-disconnect", + jobType: structs.JobTypeSystem, + exists: true, + required: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusUnknown, + DesiredStatus: 
structs.AllocDesiredStatusRun, + }, + }, + expectedNodeUpdate: nil, + }, + { + name: "system-running-reconnect", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: systemJobReconnectTaskState, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "system-unknown-expired", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusUnknown, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: expiredAllocState, + taskState: systemJobReconnectTaskState, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusLost, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "system-migrate", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: true, + draining: true, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-running-unknown", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: 
structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusUnknown, + DesiredStatus: structs.AllocDesiredStatusRun, + }, + }, + expectedNodeUpdate: nil, + }, + { + name: "system-ignore-unknown", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusUnknown, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: nil, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-ignore-unknown", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusUnknown, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: nil, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-ignore-complete-disconnected", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusComplete, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: nil, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + 
name: "sysbatch-running-reconnect", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: sysBatchJobReconnectTaskState, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-failed-reconnect", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusFailed, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: sysBatchJobReconnectTaskState, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-complete-reconnect", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusComplete, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: unknownAllocState, + taskState: sysBatchJobReconnectTaskState, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-unknown-expired", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusUnknown, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: expiredAllocState, + taskState: sysBatchJobReconnectTaskState, + expectedPlanCount: 1, + 
expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusLost, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-migrate", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: true, + draining: true, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "system-stopped", + jobType: structs.JobTypeSysBatch, + required: false, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "system-lost", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusLost, + DesiredStatus: 
structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-lost", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusLost, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "system-node-draining", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: true, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-node-draining", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: true, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "system-node-down-complete", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusComplete, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-node-down-complete", + 
jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusComplete, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-ignore-terminal", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusEvict, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "system-ignore-ineligible", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusPending, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-ignore-ineligible", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDisconnected, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusPending, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "system-stop-not-targeted", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: false, + modifyJob: false, + previousTerminal: false, + clientStatus: 
structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-stop-not-targeted", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: false, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: nil, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "system-update-job-version", + jobType: structs.JobTypeSystem, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: true, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusPending, + DesiredStatus: structs.AllocDesiredStatusRun, + }, + }, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-update-job-version", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: true, + previousTerminal: false, + clientStatus: 
structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusPending, + DesiredStatus: structs.AllocDesiredStatusRun, + }, + }, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusRunning, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-ignore-successful-tainted", + jobType: structs.JobTypeSysBatch, + required: true, + exists: true, + nodeStatus: structs.NodeStatusDown, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: successTaskState, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-annotate-when-not-existing", + jobType: structs.JobTypeSysBatch, + required: true, + exists: false, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: false, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusPending, + DesiredStatus: structs.AllocDesiredStatusRun, + }, + }, + expectedNodeUpdate: nil, + }, + { + name: "sysbatch-update-modified-terminal-when-not-existing", + jobType: structs.JobTypeSysBatch, + required: true, + exists: false, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: true, + previousTerminal: true, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + 
allocState: nil, + taskState: nil, + expectedPlanCount: 1, + expectedNodeAllocation: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusPending, + DesiredStatus: structs.AllocDesiredStatusRun, + }, + }, + expectedNodeUpdate: map[string]*structs.Allocation{ + "id": { + ClientStatus: structs.AllocClientStatusComplete, + DesiredStatus: structs.AllocDesiredStatusStop, + }, + }, + }, + { + name: "sysbatch-ignore-unmodified-terminal-when-not-existing", + jobType: structs.JobTypeSysBatch, + required: true, + exists: false, + nodeStatus: structs.NodeStatusReady, + migrate: false, + draining: false, + targeted: true, + modifyJob: false, + previousTerminal: true, + clientStatus: structs.AllocClientStatusRunning, + desiredStatus: structs.AllocDesiredStatusRun, + allocState: nil, + taskState: nil, + expectedPlanCount: 0, + expectedNodeAllocation: nil, + expectedNodeUpdate: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + h := NewHarness(t) + + // Register a node + node := mock.Node() + node.Status = tc.nodeStatus + + if tc.draining { + node.SchedulingEligibility = structs.NodeSchedulingIneligible + } + + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + + // Generate a fake job allocated on that node. 
+ var job *structs.Job + var alloc *structs.Allocation + switch tc.jobType { + case structs.JobTypeSystem: + job = systemJob.Copy() + alloc = systemAlloc.Copy() + case structs.JobTypeSysBatch: + job = sysBatchJob.Copy() + alloc = sysBatchAlloc.Copy() + default: + require.FailNow(t, "invalid jobType") + } + + job.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Second) + + if !tc.required { + job.Stop = true + } + + // If we are no longer on a targeted node, change it to a non-targeted datacenter + if !tc.targeted { + job.Datacenters = []string{"not-targeted"} + } + + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + alloc.Job = job.Copy() + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.TaskGroup = job.TaskGroups[0].Name + alloc.ClientStatus = tc.clientStatus + alloc.DesiredStatus = tc.desiredStatus + alloc.DesiredTransition.Migrate = helper.BoolToPtr(tc.migrate) + alloc.AllocStates = tc.allocState + alloc.TaskStates = tc.taskState + + if tc.exists { + require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc})) + } + + if tc.modifyJob { + if tc.jobType == structs.JobTypeSystem { + job.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts = []structs.Port{{Label: "grpc"}} + } + if tc.jobType == structs.JobTypeSysBatch { + alloc.Job.TaskGroups[0].Tasks[0].Driver = "raw_exec" + } + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + } + + if tc.previousTerminal { + prev := alloc.Copy() + if tc.modifyJob { + prev.Job.JobModifyIndex = alloc.Job.JobModifyIndex - 1 + } + prev.ClientStatus = structs.AllocClientStatusComplete + prev.DesiredStatus = structs.AllocDesiredStatusRun + + require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{prev})) + } + // Create a mock evaluation to deal with disconnect + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: 
uuid.Generate(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: node.ID, + Status: structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + require.NoError(t, err) + + // Ensure a single plan + require.Len(t, h.Plans, tc.expectedPlanCount) + if tc.expectedPlanCount == 0 { + return + } + + plan := h.Plans[0] + + // Ensure the plan creates the expected plan + require.Len(t, plan.NodeAllocation[node.ID], len(tc.expectedNodeAllocation)) + require.Len(t, plan.NodeUpdate[node.ID], len(tc.expectedNodeUpdate)) + + foundMatch := false + + for _, plannedNodeAllocs := range plan.NodeAllocation { + for _, actual := range plannedNodeAllocs { + for _, expected := range tc.expectedNodeAllocation { + if expected.ClientStatus == actual.ClientStatus && + expected.DesiredStatus == actual.DesiredStatus { + foundMatch = true + break + } + } + } + } + + if len(tc.expectedNodeAllocation) > 0 { + require.True(t, foundMatch, "NodeAllocation did not match") + } + + foundMatch = false + for _, plannedNodeUpdates := range plan.NodeUpdate { + for _, actual := range plannedNodeUpdates { + for _, expected := range tc.expectedNodeUpdate { + if expected.ClientStatus == actual.ClientStatus && + expected.DesiredStatus == actual.DesiredStatus { + foundMatch = true + break + } + } + } + } + + if len(tc.expectedNodeUpdate) > 0 { + require.True(t, foundMatch, "NodeUpdate did not match") + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + }) + } +} diff --git a/scheduler/util.go b/scheduler/util.go index 55b1885eb..004a57b73 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -5,6 +5,7 @@ import ( "fmt" "math/rand" "reflect" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -38,12 +39,12 @@ func materializeTaskGroups(job *structs.Job) 
map[string]*structs.TaskGroup { // diffResult is used to return the sets that result from the diff type diffResult struct { - place, update, migrate, stop, ignore, lost []allocTuple + place, update, migrate, stop, ignore, lost, disconnecting, reconnecting []allocTuple } func (d *diffResult) GoString() string { - return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d) (lost %d)", - len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore), len(d.lost)) + return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d) (lost %d) (disconnecting %d) (reconnecting %d)", + len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore), len(d.lost), len(d.disconnecting), len(d.reconnecting)) } func (d *diffResult) Append(other *diffResult) { @@ -53,15 +54,19 @@ func (d *diffResult) Append(other *diffResult) { d.stop = append(d.stop, other.stop...) d.ignore = append(d.ignore, other.ignore...) d.lost = append(d.lost, other.lost...) + d.disconnecting = append(d.disconnecting, other.disconnecting...) + d.reconnecting = append(d.reconnecting, other.reconnecting...) } // diffSystemAllocsForNode is used to do a set difference between the target allocations -// and the existing allocations for a particular node. This returns 6 sets of results, +// and the existing allocations for a particular node. This returns 8 sets of results, // the list of named task groups that need to be placed (no existing allocation), the // allocations that need to be updated (job definition is newer), allocs that // need to be migrated (node is draining), the allocs that need to be evicted -// (no longer required), those that should be ignored and those that are lost -// that need to be replaced (running on a lost node). 
+// (no longer required), those that should be ignored, those that are lost +// that need to be replaced (running on a lost node), those that are running on +// a disconnected node but may resume, and those that may still be running on +// a node that has reconnected. func diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, @@ -71,6 +76,7 @@ func diffSystemAllocsForNode( required map[string]*structs.TaskGroup, // set of allocations that must exist allocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) + serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic ) *diffResult { result := new(diffResult) @@ -94,6 +100,16 @@ func diffSystemAllocsForNode( continue } + supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients) + + reconnected := false + // Only compute reconnected for unknown and running since they need to go through the reconnect process. + if supportsDisconnectedClients && + (exist.ClientStatus == structs.AllocClientStatusUnknown || + exist.ClientStatus == structs.AllocClientStatusRunning) { + reconnected, _ = exist.Reconnected() + } + // If we have been marked for migration and aren't terminal, migrate if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() { result.migrate = append(result.migrate, allocTuple{ @@ -114,18 +130,72 @@ func diffSystemAllocsForNode( continue } + // Expired unknown allocs are lost. Expired checks that status is unknown. + if supportsDisconnectedClients && exist.Expired(time.Now().UTC()) { + result.lost = append(result.lost, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + + // Ignore unknown allocs that we want to reconnect eventually. 
+ if supportsDisconnectedClients && + exist.ClientStatus == structs.AllocClientStatusUnknown && + exist.DesiredStatus == structs.AllocDesiredStatusRun { + result.ignore = append(result.ignore, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + + node, nodeIsTainted := taintedNodes[exist.NodeID] + + // Filter allocs on a node that is now re-connected to reconnecting. + if supportsDisconnectedClients && + !nodeIsTainted && + reconnected { + result.reconnecting = append(result.reconnecting, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: exist, + }) + continue + } + // If we are on a tainted node, we must migrate if we are a service or // if the batch allocation did not finish - if node, ok := taintedNodes[exist.NodeID]; ok { + if nodeIsTainted { // If the job is batch and finished successfully, the fact that the // node is tainted does not mean it should be migrated or marked as // lost as the work was already successfully finished. However for // service/system jobs, tasks should never complete. The check of // batch type, defends against client bugs. - if exist.Job.Type == structs.JobTypeBatch && exist.RanSuccessfully() { + if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() { goto IGNORE } + // Filter running allocs on a node that is disconnected to be marked as unknown. 
+ if node != nil && + supportsDisconnectedClients && + node.Status == structs.NodeStatusDisconnected && + exist.ClientStatus == structs.AllocClientStatusRunning { + + disconnect := exist.Copy() + disconnect.ClientStatus = structs.AllocClientStatusUnknown + disconnect.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown) + disconnect.ClientDescription = allocUnknown + result.disconnecting = append(result.disconnecting, allocTuple{ + Name: name, + TaskGroup: tg, + Alloc: disconnect, + }) + continue + } + if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) { result.lost = append(result.lost, allocTuple{ Name: name, @@ -141,13 +211,13 @@ func diffSystemAllocsForNode( // For an existing allocation, if the nodeID is no longer // eligible, the diff should be ignored - if _, ok := notReadyNodes[nodeID]; ok { + if _, ineligible := notReadyNodes[nodeID]; ineligible { goto IGNORE } // Existing allocations on nodes that are no longer targeted // should be stopped - if _, ok := eligibleNodes[nodeID]; !ok { + if _, eligible := eligibleNodes[nodeID]; !eligible { result.stop = append(result.stop, allocTuple{ Name: name, TaskGroup: tg, @@ -247,6 +317,7 @@ func diffSystemAllocs( taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id) allocs []*structs.Allocation, // non-terminal allocations terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id) + serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic ) *diffResult { // Build a mapping of nodes to all their allocs. 
@@ -268,7 +339,7 @@ func diffSystemAllocs( result := new(diffResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients) result.Append(diff) } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index fec7d8ad4..65e5498a1 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -67,7 +67,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) require.Empty(t, diff.place) require.Empty(t, diff.update) require.Empty(t, diff.stop) @@ -93,7 +93,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"] expAlloc.NodeID = "node1" - diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal) + diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) require.Empty(t, diff.place) require.Len(t, diff.update, 1) require.Empty(t, diff.stop) @@ -199,7 +199,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal) + diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal, true) // We should update the first alloc require.Len(t, diff.update, 1) @@ -282,7 +282,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) { // No terminal allocs terminal := make(structs.TerminalByNodeByName) - diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, 
terminal) + diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal, true) require.Len(t, diff.place, 0) require.Len(t, diff.update, 1) @@ -364,7 +364,7 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal) + diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal, true) // We should update the first alloc require.Len(t, diff.update, 1)