system_scheduler: support disconnected clients (#12555)

* structs: Add helper method for checking if alloc is configured to disconnect
* system_scheduler: Add support for disconnected clients
Derek Strickland
2022-04-15 09:31:32 -04:00
committed by GitHub
parent fd21cebec7
commit f5de802993
7 changed files with 991 additions and 31 deletions


@@ -9935,6 +9935,24 @@ func (a *Allocation) DisconnectTimeout(now time.Time) time.Time {
return now.Add(*timeout)
}
// SupportsDisconnectedClients determines whether both the server and the task group
// are configured to allow the allocation to reconnect after network connectivity
// has been lost and then restored.
func (a *Allocation) SupportsDisconnectedClients(serverSupportsDisconnectedClients bool) bool {
if !serverSupportsDisconnectedClients {
return false
}
if a.Job != nil {
tg := a.Job.LookupTaskGroup(a.TaskGroup)
if tg != nil {
return tg.MaxClientDisconnect != nil
}
}
return false
}
// NextDelay returns a duration after which the allocation can be rescheduled.
// It is calculated according to the delay function and previous reschedule attempts.
func (a *Allocation) NextDelay() time.Duration {

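For context, a rough usage sketch of the new helper (not part of this commit), reusing the mock and helper test packages that the scheduler tests below already use:

package scheduler

import (
	"testing"
	"time"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/stretchr/testify/require"
)

// Sketch only: the task group opts in via max_client_disconnect, and the
// servers must also support disconnected clients.
func TestAllocation_SupportsDisconnectedClients_Sketch(t *testing.T) {
	job := mock.SystemJob()
	alloc := mock.SystemAlloc()
	alloc.Job = job
	alloc.JobID = job.ID
	alloc.TaskGroup = job.TaskGroups[0].Name

	// Without max_client_disconnect on the task group the helper is false
	// even when the servers support disconnected clients.
	require.False(t, alloc.SupportsDisconnectedClients(true))

	// With the task group opted in, the result tracks server support.
	job.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Second)
	require.True(t, alloc.SupportsDisconnectedClients(true))
	require.False(t, alloc.SupportsDisconnectedClients(false))
}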

@@ -429,11 +429,6 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update, nil)
}
// Log reconnect updates. They will be pulled by the client when it reconnects.
for _, update := range results.reconnectUpdates {
s.logger.Trace("reconnecting alloc", "alloc_id", update.ID, "alloc_modify_index", update.AllocModifyIndex)
}
// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
// If the job has been purged we don't have access to the job. Otherwise


@@ -224,19 +224,11 @@ func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node, serverS
reconnecting = make(map[string]*structs.Allocation)
ignore = make(map[string]*structs.Allocation)
supportsDisconnectedClients := serverSupportsDisconnectedClients
for _, alloc := range a {
// make sure we don't apply any reconnect logic to task groups
// without max_client_disconnect
if alloc.Job != nil {
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg != nil {
supportsDisconnectedClients = serverSupportsDisconnectedClients &&
tg.MaxClientDisconnect != nil
}
}
supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
reconnected := false
expired := false


@@ -229,7 +229,9 @@ func (s *SystemScheduler) computeJobAllocs() error {
live, term := structs.SplitTerminalAllocs(allocs)
// Diff the required and existing allocations
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term)
diff := diffSystemAllocs(s.job, s.nodes, s.notReadyNodes, tainted, live, term,
s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true))
s.logger.Debug("reconciled current state with desired state",
"place", len(diff.place), "update", len(diff.update),
"migrate", len(diff.migrate), "stop", len(diff.stop),
@@ -251,6 +253,10 @@ func (s *SystemScheduler) computeJobAllocs() error {
s.plan.AppendStoppedAlloc(e.Alloc, allocLost, structs.AllocClientStatusLost, "")
}
for _, e := range diff.disconnecting {
s.plan.AppendUnknownAlloc(e.Alloc)
}
// Attempt to do the upgrades in place
destructiveUpdates, inplaceUpdates := inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
diff.update = destructiveUpdates
@@ -508,6 +514,7 @@ func (s *SystemScheduler) canHandle(trigger string) bool {
case structs.EvalTriggerAllocStop:
case structs.EvalTriggerQueuedAllocs:
case structs.EvalTriggerScaling:
case structs.EvalTriggerReconnect:
default:
switch s.sysbatch {
case true:

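The version gate above, minVersionMaxClientDisconnect, is not defined in this diff. A plausible definition (an assumption, shown only for illustration) pins it to the release that introduced max_client_disconnect, using the go-version package:

package scheduler

import "github.com/hashicorp/go-version"

// Assumption: the value and location are not part of this diff; the gate is
// presumed to be the 1.3.0 release that introduced max_client_disconnect.
var minVersionMaxClientDisconnect = version.Must(version.NewVersion("1.3.0"))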

@@ -2132,3 +2132,880 @@ func TestSystemSched_canHandle(t *testing.T) {
require.False(t, s.canHandle(structs.EvalTriggerPeriodicJob))
})
}
func TestSystemSched_NodeDisconnected(t *testing.T) {
ci.Parallel(t)
systemJob := mock.SystemJob()
systemAlloc := mock.SystemAlloc()
systemAlloc.Name = fmt.Sprintf("my-job.%s[0]", systemJob.TaskGroups[0].Name)
sysBatchJob := mock.SystemBatchJob()
sysBatchJob.TaskGroups[0].Tasks[0].Env = make(map[string]string)
sysBatchJob.TaskGroups[0].Tasks[0].Env["foo"] = "bar"
sysBatchAlloc := mock.SysBatchAlloc()
sysBatchAlloc.Name = fmt.Sprintf("my-sysbatch.%s[0]", sysBatchJob.TaskGroups[0].Name)
now := time.Now().UTC()
unknownAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now,
}}
expiredAllocState := []*structs.AllocState{{
Field: structs.AllocStateFieldClientStatus,
Value: structs.AllocClientStatusUnknown,
Time: now.Add(-60 * time.Second),
}}
reconnectedEvent := structs.NewTaskEvent(structs.TaskClientReconnected)
reconnectedEvent.Time = time.Now().UnixNano()
systemJobReconnectTaskState := map[string]*structs.TaskState{
systemJob.TaskGroups[0].Tasks[0].Name: {
Events: []*structs.TaskEvent{reconnectedEvent},
},
}
successTaskState := map[string]*structs.TaskState{
systemJob.TaskGroups[0].Tasks[0].Name: {
State: structs.TaskStateDead,
Failed: false,
},
}
sysBatchJobReconnectTaskState := map[string]*structs.TaskState{
sysBatchJob.TaskGroups[0].Tasks[0].Name: {
Events: []*structs.TaskEvent{reconnectedEvent},
},
}
type testCase struct {
name string
jobType string
exists bool
required bool
migrate bool
draining bool
targeted bool
modifyJob bool
previousTerminal bool
nodeStatus string
clientStatus string
desiredStatus string
allocState []*structs.AllocState
taskState map[string]*structs.TaskState
expectedPlanCount int
expectedNodeAllocation map[string]*structs.Allocation
expectedNodeUpdate map[string]*structs.Allocation
}
testCases := []testCase{
{
name: "system-running-disconnect",
jobType: structs.JobTypeSystem,
exists: true,
required: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: nil,
},
{
name: "system-running-reconnect",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: systemJobReconnectTaskState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-unknown-expired",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: expiredAllocState,
taskState: systemJobReconnectTaskState,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-migrate",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: true,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-running-unknown",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusUnknown,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: nil,
},
{
name: "system-ignore-unknown",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: nil,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-unknown",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: nil,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-complete-disconnected",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: nil,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-running-reconnect",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: sysBatchJobReconnectTaskState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-failed-reconnect",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusFailed,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: sysBatchJobReconnectTaskState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-complete-reconnect",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: unknownAllocState,
taskState: sysBatchJobReconnectTaskState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-unknown-expired",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusUnknown,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: expiredAllocState,
taskState: sysBatchJobReconnectTaskState,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-migrate",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: true,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-stopped",
jobType: structs.JobTypeSysBatch,
required: false,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-lost",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-lost",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusLost,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-node-draining",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-node-draining",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: true,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-node-down-complete",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-node-down-complete",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusComplete,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-terminal",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusEvict,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-ignore-ineligible",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusPending,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-ignore-ineligible",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDisconnected,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusPending,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "system-stop-not-targeted",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: false,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-stop-not-targeted",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: false,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: nil,
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "system-update-job-version",
jobType: structs.JobTypeSystem,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: true,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-update-job-version",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: true,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusRunning,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-ignore-successful-tainted",
jobType: structs.JobTypeSysBatch,
required: true,
exists: true,
nodeStatus: structs.NodeStatusDown,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: successTaskState,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
{
name: "sysbatch-annotate-when-not-existing",
jobType: structs.JobTypeSysBatch,
required: true,
exists: false,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: false,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: nil,
},
{
name: "sysbatch-update-modified-terminal-when-not-existing",
jobType: structs.JobTypeSysBatch,
required: true,
exists: false,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: true,
previousTerminal: true,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 1,
expectedNodeAllocation: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusPending,
DesiredStatus: structs.AllocDesiredStatusRun,
},
},
expectedNodeUpdate: map[string]*structs.Allocation{
"id": {
ClientStatus: structs.AllocClientStatusComplete,
DesiredStatus: structs.AllocDesiredStatusStop,
},
},
},
{
name: "sysbatch-ignore-unmodified-terminal-when-not-existing",
jobType: structs.JobTypeSysBatch,
required: true,
exists: false,
nodeStatus: structs.NodeStatusReady,
migrate: false,
draining: false,
targeted: true,
modifyJob: false,
previousTerminal: true,
clientStatus: structs.AllocClientStatusRunning,
desiredStatus: structs.AllocDesiredStatusRun,
allocState: nil,
taskState: nil,
expectedPlanCount: 0,
expectedNodeAllocation: nil,
expectedNodeUpdate: nil,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
h := NewHarness(t)
// Register a node
node := mock.Node()
node.Status = tc.nodeStatus
if tc.draining {
node.SchedulingEligibility = structs.NodeSchedulingIneligible
}
require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
// Generate a fake job allocated on that node.
var job *structs.Job
var alloc *structs.Allocation
switch tc.jobType {
case structs.JobTypeSystem:
job = systemJob.Copy()
alloc = systemAlloc.Copy()
case structs.JobTypeSysBatch:
job = sysBatchJob.Copy()
alloc = sysBatchAlloc.Copy()
default:
require.FailNow(t, "invalid jobType")
}
job.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Second)
if !tc.required {
job.Stop = true
}
// If we are no longer on a targeted node, change it to a non-targeted datacenter
if !tc.targeted {
job.Datacenters = []string{"not-targeted"}
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
alloc.Job = job.Copy()
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.ClientStatus = tc.clientStatus
alloc.DesiredStatus = tc.desiredStatus
alloc.DesiredTransition.Migrate = helper.BoolToPtr(tc.migrate)
alloc.AllocStates = tc.allocState
alloc.TaskStates = tc.taskState
if tc.exists {
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{alloc}))
}
if tc.modifyJob {
if tc.jobType == structs.JobTypeSystem {
job.TaskGroups[0].Tasks[0].Resources.Networks[0].DynamicPorts = []structs.Port{{Label: "grpc"}}
}
if tc.jobType == structs.JobTypeSysBatch {
alloc.Job.TaskGroups[0].Tasks[0].Driver = "raw_exec"
}
require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))
}
if tc.previousTerminal {
prev := alloc.Copy()
if tc.modifyJob {
prev.Job.JobModifyIndex = alloc.Job.JobModifyIndex - 1
}
prev.ClientStatus = structs.AllocClientStatusComplete
prev.DesiredStatus = structs.AllocDesiredStatusRun
require.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Allocation{prev}))
}
// Create a mock evaluation to deal with disconnect
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
NodeID: node.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewSystemScheduler, eval)
require.NoError(t, err)
// Ensure the expected number of plans
require.Len(t, h.Plans, tc.expectedPlanCount)
if tc.expectedPlanCount == 0 {
return
}
plan := h.Plans[0]
// Ensure the plan contains the expected allocations and updates
require.Len(t, plan.NodeAllocation[node.ID], len(tc.expectedNodeAllocation))
require.Len(t, plan.NodeUpdate[node.ID], len(tc.expectedNodeUpdate))
foundMatch := false
for _, plannedNodeAllocs := range plan.NodeAllocation {
for _, actual := range plannedNodeAllocs {
for _, expected := range tc.expectedNodeAllocation {
if expected.ClientStatus == actual.ClientStatus &&
expected.DesiredStatus == actual.DesiredStatus {
foundMatch = true
break
}
}
}
}
if len(tc.expectedNodeAllocation) > 0 {
require.True(t, foundMatch, "NodeAllocation did not match")
}
foundMatch = false
for _, plannedNodeUpdates := range plan.NodeUpdate {
for _, actual := range plannedNodeUpdates {
for _, expected := range tc.expectedNodeUpdate {
if expected.ClientStatus == actual.ClientStatus &&
expected.DesiredStatus == actual.DesiredStatus {
foundMatch = true
break
}
}
}
}
if len(tc.expectedNodeUpdate) > 0 {
require.True(t, foundMatch, "NodeUpdate did not match")
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
})
}
}


@@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"reflect"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
@@ -38,12 +39,12 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
// diffResult is used to return the sets that result from the diff
type diffResult struct {
place, update, migrate, stop, ignore, lost []allocTuple
place, update, migrate, stop, ignore, lost, disconnecting, reconnecting []allocTuple
}
func (d *diffResult) GoString() string {
return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d) (lost %d)",
len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore), len(d.lost))
return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d) (lost %d) (disconnecting %d) (reconnecting %d)",
len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore), len(d.lost), len(d.disconnecting), len(d.reconnecting))
}
func (d *diffResult) Append(other *diffResult) {
@@ -53,15 +54,19 @@ func (d *diffResult) Append(other *diffResult) {
d.stop = append(d.stop, other.stop...)
d.ignore = append(d.ignore, other.ignore...)
d.lost = append(d.lost, other.lost...)
d.disconnecting = append(d.disconnecting, other.disconnecting...)
d.reconnecting = append(d.reconnecting, other.reconnecting...)
}
// diffSystemAllocsForNode is used to do a set difference between the target allocations
// and the existing allocations for a particular node. This returns 6 sets of results,
// and the existing allocations for a particular node. This returns 8 sets of results,
// the list of named task groups that need to be placed (no existing allocation), the
// allocations that need to be updated (job definition is newer), allocs that
// need to be migrated (node is draining), the allocs that need to be evicted
// (no longer required), those that should be ignored and those that are lost
// that need to be replaced (running on a lost node).
// (no longer required), those that should be ignored, those that are lost
// that need to be replaced (running on a lost node), those that are running on
// a disconnected node but may resume, and those that may still be running on
a node that has reconnected.
func diffSystemAllocsForNode(
job *structs.Job, // job whose allocs are going to be diff-ed
nodeID string,
@@ -71,6 +76,7 @@ func diffSystemAllocsForNode(
required map[string]*structs.TaskGroup, // set of allocations that must exist
allocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
) *diffResult {
result := new(diffResult)
@@ -94,6 +100,16 @@ func diffSystemAllocsForNode(
continue
}
supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients)
reconnected := false
// Only compute reconnected for unknown and running allocs, since those are the only ones that go through the reconnect process.
if supportsDisconnectedClients &&
(exist.ClientStatus == structs.AllocClientStatusUnknown ||
exist.ClientStatus == structs.AllocClientStatusRunning) {
reconnected, _ = exist.Reconnected()
}
// If we have been marked for migration and aren't terminal, migrate
if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{
@@ -114,18 +130,72 @@ func diffSystemAllocsForNode(
continue
}
// Expired unknown allocs are lost. Expired checks that status is unknown.
if supportsDisconnectedClients && exist.Expired(time.Now().UTC()) {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}
// Ignore unknown allocs that we want to reconnect eventually.
if supportsDisconnectedClients &&
exist.ClientStatus == structs.AllocClientStatusUnknown &&
exist.DesiredStatus == structs.AllocDesiredStatusRun {
result.ignore = append(result.ignore, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}
node, nodeIsTainted := taintedNodes[exist.NodeID]
// Filter allocs on a node that has reconnected into the reconnecting set.
if supportsDisconnectedClients &&
!nodeIsTainted &&
reconnected {
result.reconnecting = append(result.reconnecting, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}
// If we are on a tainted node, we must migrate if we are a service or
// if the batch allocation did not finish
if node, ok := taintedNodes[exist.NodeID]; ok {
if nodeIsTainted {
// If the job is batch and finished successfully, the fact that the
// node is tainted does not mean it should be migrated or marked as
// lost as the work was already successfully finished. However for
// service/system jobs, tasks should never complete. The check of
batch type defends against client bugs.
if exist.Job.Type == structs.JobTypeBatch && exist.RanSuccessfully() {
if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() {
goto IGNORE
}
// Filter running allocs on a disconnected node into the disconnecting set so they are marked unknown.
if node != nil &&
supportsDisconnectedClients &&
node.Status == structs.NodeStatusDisconnected &&
exist.ClientStatus == structs.AllocClientStatusRunning {
disconnect := exist.Copy()
disconnect.ClientStatus = structs.AllocClientStatusUnknown
disconnect.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown)
disconnect.ClientDescription = allocUnknown
result.disconnecting = append(result.disconnecting, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: disconnect,
})
continue
}
if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) {
result.lost = append(result.lost, allocTuple{
Name: name,
@@ -141,13 +211,13 @@ func diffSystemAllocsForNode(
// For an existing allocation, if the nodeID is no longer
// eligible, the diff should be ignored
if _, ok := notReadyNodes[nodeID]; ok {
if _, ineligible := notReadyNodes[nodeID]; ineligible {
goto IGNORE
}
// Existing allocations on nodes that are no longer targeted
// should be stopped
if _, ok := eligibleNodes[nodeID]; !ok {
if _, eligible := eligibleNodes[nodeID]; !eligible {
result.stop = append(result.stop, allocTuple{
Name: name,
TaskGroup: tg,
@@ -247,6 +317,7 @@ func diffSystemAllocs(
taintedNodes map[string]*structs.Node, // nodes which are down or drain mode (by node id)
allocs []*structs.Allocation, // non-terminal allocations
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
) *diffResult {
// Build a mapping of nodes to all their allocs.
@@ -268,7 +339,7 @@ func diffSystemAllocs(
result := new(diffResult)
for nodeID, allocs := range nodeAllocs {
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients)
result.Append(diff)
}

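A rough sketch (not part of this commit) of exercising the new disconnecting classification in diffSystemAllocsForNode, reusing the mock helpers from the scheduler test suite; the node is tainted and disconnected, and the task group sets max_client_disconnect:

package scheduler

import (
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/nomad/helper"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

func TestDiffSystemAllocsForNode_Disconnecting_Sketch(t *testing.T) {
	job := mock.SystemJob()
	job.TaskGroups[0].MaxClientDisconnect = helper.TimeToPtr(5 * time.Minute)
	required := materializeTaskGroups(job)

	// A node that has stopped heartbeating but has not yet expired.
	node := mock.Node()
	node.Status = structs.NodeStatusDisconnected

	// A running alloc for the system job on that node.
	alloc := mock.SystemAlloc()
	alloc.Job = job
	alloc.JobID = job.ID
	alloc.NodeID = node.ID
	alloc.TaskGroup = job.TaskGroups[0].Name
	alloc.Name = fmt.Sprintf("%s.%s[0]", job.Name, job.TaskGroups[0].Name)
	alloc.ClientStatus = structs.AllocClientStatusRunning

	eligible := map[string]*structs.Node{node.ID: node}
	tainted := map[string]*structs.Node{node.ID: node}
	terminal := make(structs.TerminalByNodeByName)

	diff := diffSystemAllocsForNode(job, node.ID, eligible, nil, tainted, required,
		[]*structs.Allocation{alloc}, terminal, true)

	// The running alloc should be copied with client status unknown and land
	// in the disconnecting set rather than lost or stop.
	require.Len(t, diff.disconnecting, 1)
	require.Equal(t, structs.AllocClientStatusUnknown, diff.disconnecting[0].Alloc.ClientStatus)
}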

@@ -67,7 +67,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
},
}
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal)
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
require.Empty(t, diff.place)
require.Empty(t, diff.update)
require.Empty(t, diff.stop)
@@ -93,7 +93,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
expAlloc := terminal["node1"]["my-sysbatch.pinger[0]"]
expAlloc.NodeID = "node1"
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal)
diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
require.Empty(t, diff.place)
require.Len(t, diff.update, 1)
require.Empty(t, diff.stop)
@@ -199,7 +199,7 @@ func TestDiffSystemAllocsForNode(t *testing.T) {
},
}
diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, "zip", eligible, nil, tainted, required, allocs, terminal, true)
// We should update the first alloc
require.Len(t, diff.update, 1)
@@ -282,7 +282,7 @@ func TestDiffSystemAllocsForNode_ExistingAllocIneligibleNode(t *testing.T) {
// No terminal allocs
terminal := make(structs.TerminalByNodeByName)
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal)
diff := diffSystemAllocsForNode(job, eligibleNode.ID, eligible, nil, tainted, required, allocs, terminal, true)
require.Len(t, diff.place, 0)
require.Len(t, diff.update, 1)
@@ -364,7 +364,7 @@ func TestDiffSystemAllocs(t *testing.T) {
},
}
diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal)
diff := diffSystemAllocs(job, nodes, nil, tainted, allocs, terminal, true)
// We should update the first alloc
require.Len(t, diff.update, 1)