diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 6b812740c..501052979 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -366,6 +366,11 @@ func (s *GenericScheduler) computeJobAllocs() error { s.ctx.Plan().AppendAlloc(update) } + // Handle the annotation updates + for _, update := range results.attributeUpdates { + s.ctx.Plan().AppendAlloc(update) + } + // Nothing remaining to do if placement is not required if len(results.place)+len(results.destructiveUpdate) == 0 { if !s.job.Stopped() { diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index b7b936def..416cee45d 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -95,6 +95,10 @@ type reconcileResults struct { // stop is the set of allocations to stop stop []allocStopResult + // attributeUpdates are updates to the allocation that are not from a + // jobspec change. + attributeUpdates map[string]*structs.Allocation + // desiredTGUpdates captures the desired set of changes to make for each // task group. desiredTGUpdates map[string]*structs.DesiredUpdates @@ -326,8 +330,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { } } - // Filter batch allocations that do not need to be considered. - all, ignore := a.batchFiltration(all) + // Filter allocations that do not need to be considered because they are + // from an older job version and are terminal. + all, ignore := a.filterOldTerminalAllocs(all) desiredChanges.Ignore += uint64(len(ignore)) canaries, all := a.handleGroupCanaries(all, desiredChanges) @@ -338,11 +343,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Determine what set of terminal allocations need to be rescheduled untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch) - // Create batched follow up evaluations for allocations that are reschedulable later - var rescheduleLaterAllocs map[string]*structs.Allocation - if len(rescheduleLater) > 0 { - rescheduleLaterAllocs = a.handleDelayedReschedules(rescheduleLater, all, tg.Name) - } + // Create batched follow up evaluations for allocations that are + // reschedulable later and mark the allocations for in place updating + a.handleDelayedReschedules(rescheduleLater, all, tg.Name) // Create a structure for choosing names. Seed with the taken names which is // the union of untainted and migrating nodes (includes canaries) @@ -365,7 +368,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. - ignore, inplace, destructive := a.computeUpdates(tg, untainted, rescheduleLaterAllocs) + ignore, inplace, destructive := a.computeUpdates(tg, untainted) desiredChanges.Ignore += uint64(len(ignore)) desiredChanges.InPlaceUpdate += uint64(len(inplace)) if !existingDeployment { @@ -480,8 +483,20 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { }) } + // Create new deployment if: + // 1. Updating a job specification + // 2. No running allocations (first time running a job) + updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0 + hadRunning := false + for _, alloc := range all { + if alloc.Job.Version == a.job.Version { + hadRunning = true + break + } + } + // Create a new deployment if necessary - if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 { + if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) { // A previous group may have made the deployment already if a.deployment == nil { a.deployment = structs.NewDeployment(a.job) @@ -511,9 +526,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { return deploymentComplete } -// batchFiltration filters batch allocations that should be ignored. These are -// allocations that are terminal from a previous job version. -func (a *allocReconciler) batchFiltration(all allocSet) (filtered, ignore allocSet) { +// filterOldTerminalAllocs filters allocations that should be ignored since they +// are allocations that are terminal from a previous job version. +func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) { if !a.batch { return all, nil } @@ -782,7 +797,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc // 2. Those that can be upgraded in-place. These are added to the results // automatically since the function contains the correct state to do so, // 3. Those that require destructive updates -func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, rescheduleLaterAllocs allocSet) (ignore, inplace, destructive allocSet) { +func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) { // Determine the set of allocations that need to be updated ignore = make(map[string]*structs.Allocation) inplace = make(map[string]*structs.Allocation) @@ -790,19 +805,11 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, re for _, alloc := range untainted { ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group) - // Also check if the alloc is marked for later rescheduling. - // If so it should be in the inplace list - reschedLaterAlloc, isRescheduleLater := rescheduleLaterAllocs[alloc.ID] - if isRescheduleLater { - inplace[alloc.ID] = alloc - a.result.inplaceUpdate = append(a.result.inplaceUpdate, reschedLaterAlloc) - } else if ignoreChange { + if ignoreChange { ignore[alloc.ID] = alloc } else if destructiveChange { destructive[alloc.ID] = alloc } else { - // Attach the deployment ID and and clear the health if the - // deployment has changed inplace[alloc.ID] = alloc a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc) } @@ -813,7 +820,11 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, re // handleDelayedReschedules creates batched followup evaluations with the WaitUntil field set // for allocations that are eligible to be rescheduled later -func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) allocSet { +func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) { + if len(rescheduleLater) == 0 { + return + } + // Sort by time sort.Slice(rescheduleLater, func(i, j int) bool { return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime) @@ -822,6 +833,7 @@ func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRes var evals []*structs.Evaluation nextReschedTime := rescheduleLater[0].rescheduleTime allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater)) + // Create a new eval for the first batch eval := &structs.Evaluation{ ID: uuid.Generate(), @@ -835,6 +847,7 @@ func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRes WaitUntil: nextReschedTime, } evals = append(evals, eval) + for _, allocReschedInfo := range rescheduleLater { if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize { allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID @@ -861,13 +874,16 @@ func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRes a.result.desiredFollowupEvals[tgName] = evals + // Initialize the annotations + if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil { + a.result.attributeUpdates = make(map[string]*structs.Allocation) + } + // Create in-place updates for every alloc ID that needs to be updated with its follow up eval ID - rescheduleLaterAllocs := make(map[string]*structs.Allocation) for allocID, evalID := range allocIDToFollowupEvalID { existingAlloc := all[allocID] updatedAlloc := existingAlloc.Copy() updatedAlloc.FollowupEvalID = evalID - rescheduleLaterAllocs[allocID] = updatedAlloc + a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc } - return rescheduleLaterAllocs } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index 604347fa5..044471384 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -114,8 +114,7 @@ func allocUpdateFnDestructive(*structs.Allocation, *structs.Job, *structs.TaskGr func allocUpdateFnInplace(existing *structs.Allocation, _ *structs.Job, newTG *structs.TaskGroup) (bool, bool, *structs.Allocation) { // Create a shallow copy - newAlloc := new(structs.Allocation) - *newAlloc = *existing + newAlloc := existing.CopySkipJob() newAlloc.TaskResources = make(map[string]*structs.Resources) // Use the new task resources but keep the network from the old @@ -289,6 +288,14 @@ func stopResultsToNames(stop []allocStopResult) []string { return names } +func attributeUpdatesToNames(attributeUpdates map[string]*structs.Allocation) []string { + names := make([]string, 0, len(attributeUpdates)) + for _, a := range attributeUpdates { + names = append(names, a.Name) + } + return names +} + func allocsToNames(allocs []*structs.Allocation) []string { names := make([]string, 0, len(allocs)) for _, a := range allocs { @@ -303,6 +310,7 @@ type resultExpectation struct { place int destructive int inplace int + attributeUpdates int stop int desiredTGUpdates map[string]*structs.DesiredUpdates } @@ -334,6 +342,9 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { if l := len(r.inplaceUpdate); l != exp.inplace { t.Fatalf("Expected %d inplaceUpdate; got %d", exp.inplace, l) } + if l := len(r.attributeUpdates); l != exp.attributeUpdates { + t.Fatalf("Expected %d attribute updates; got %d", exp.attributeUpdates, l) + } if l := len(r.stop); l != exp.stop { t.Fatalf("Expected %d stops; got %d", exp.stop, l) } @@ -1208,14 +1219,17 @@ func TestReconciler_MultiTG(t *testing.T) { // Tests delayed rescheduling of failed batch allocations func TestReconciler_RescheduleLater_Batch(t *testing.T) { require := require.New(t) + // Set desired 4 job := mock.Job() job.TaskGroups[0].Count = 4 now := time.Now() + // Set up reschedule policy delayDur := 15 * time.Second job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"} tgName := job.TaskGroups[0].Name + // Create 6 existing allocations - 2 running, 1 complete and 3 failed var allocs []*structs.Allocation for i := 0; i < 6; i++ { @@ -1227,6 +1241,7 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) { allocs = append(allocs, alloc) alloc.ClientStatus = structs.AllocClientStatusRunning } + // Mark 3 as failed with restart tracking info allocs[0].ClientStatus = structs.AllocClientStatusFailed allocs[0].NextAllocation = allocs[1].ID @@ -1252,6 +1267,7 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) { PrevNodeID: uuid.Generate(), }, }} + // Mark one as complete allocs[5].ClientStatus = structs.AllocClientStatusComplete @@ -1259,7 +1275,6 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) { r := reconciler.Compute() // Two reschedule attempts were already made, one more can be made at a future time - // Verify that the follow up eval has the expected waitUntil time evals := r.desiredFollowupEvals[tgName] require.NotNil(evals) @@ -1271,33 +1286,42 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) { createDeployment: nil, deploymentUpdates: nil, place: 0, - inplace: 1, + inplace: 0, + attributeUpdates: 1, stop: 0, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { Place: 0, - InPlaceUpdate: 1, - Ignore: 3, + InPlaceUpdate: 0, + Ignore: 4, }, }, }) - assertNamesHaveIndexes(t, intRange(2, 2), allocsToNames(r.inplaceUpdate)) - // verify that the followup evalID field is set correctly - r.inplaceUpdate[0].EvalID = evals[0].ID + assertNamesHaveIndexes(t, intRange(2, 2), attributeUpdatesToNames(r.attributeUpdates)) + + // Verify that the followup evalID field is set correctly + var annotated *structs.Allocation + for _, a := range r.attributeUpdates { + annotated = a + } + require.Equal(evals[0].ID, annotated.FollowupEvalID) } // Tests delayed rescheduling of failed batch allocations and batching of allocs // with fail times that are close together func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { require := require.New(t) + // Set desired 4 job := mock.Job() job.TaskGroups[0].Count = 10 now := time.Now() + // Set up reschedule policy delayDur := 15 * time.Second job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"} tgName := job.TaskGroups[0].Name + // Create 10 existing allocations var allocs []*structs.Allocation for i := 0; i < 10; i++ { @@ -1309,6 +1333,7 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { allocs = append(allocs, alloc) alloc.ClientStatus = structs.AllocClientStatusRunning } + // Mark 5 as failed with fail times very close together for i := 0; i < 5; i++ { allocs[i].ClientStatus = structs.AllocClientStatusFailed @@ -1343,19 +1368,21 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) { createDeployment: nil, deploymentUpdates: nil, place: 0, - inplace: 7, + inplace: 0, + attributeUpdates: 7, stop: 0, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { Place: 0, - InPlaceUpdate: 7, - Ignore: 3, + InPlaceUpdate: 0, + Ignore: 10, }, }, }) - assertNamesHaveIndexes(t, intRange(0, 6), allocsToNames(r.inplaceUpdate)) - // verify that the followup evalID field is set correctly - for _, alloc := range r.inplaceUpdate { + assertNamesHaveIndexes(t, intRange(0, 6), attributeUpdatesToNames(r.attributeUpdates)) + + // Verify that the followup evalID field is set correctly + for _, alloc := range r.attributeUpdates { if allocNameToIndex(alloc.Name) < 5 { require.Equal(evals[0].ID, alloc.FollowupEvalID) } else if allocNameToIndex(alloc.Name) < 7 { @@ -1447,11 +1474,13 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) { // Tests rescheduling failed service allocations with desired state stop func TestReconciler_RescheduleLater_Service(t *testing.T) { require := require.New(t) + // Set desired 5 job := mock.Job() job.TaskGroups[0].Count = 5 tgName := job.TaskGroups[0].Name now := time.Now() + // Set up reschedule policy delayDur := 15 * time.Second job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: delayDur, MaxDelay: 1 * time.Hour} @@ -1467,8 +1496,10 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) { allocs = append(allocs, alloc) alloc.ClientStatus = structs.AllocClientStatusRunning } + // Mark two as failed allocs[0].ClientStatus = structs.AllocClientStatusFailed + // Mark one of them as already rescheduled once allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), @@ -1498,33 +1529,49 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) { createDeployment: nil, deploymentUpdates: nil, place: 1, - inplace: 1, + inplace: 0, + attributeUpdates: 1, stop: 0, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { Place: 1, - InPlaceUpdate: 1, - Ignore: 3, + InPlaceUpdate: 0, + Ignore: 4, }, }, }) assertNamesHaveIndexes(t, intRange(4, 4), placeResultsToNames(r.place)) - assertNamesHaveIndexes(t, intRange(1, 1), allocsToNames(r.inplaceUpdate)) - // verify that the followup evalID field is set correctly - r.inplaceUpdate[0].EvalID = evals[0].ID + assertNamesHaveIndexes(t, intRange(1, 1), attributeUpdatesToNames(r.attributeUpdates)) + + // Verify that the followup evalID field is set correctly + var annotated *structs.Allocation + for _, a := range r.attributeUpdates { + annotated = a + } + require.Equal(evals[0].ID, annotated.FollowupEvalID) } // Tests rescheduling failed service allocations with desired state stop func TestReconciler_RescheduleNow_Service(t *testing.T) { require := require.New(t) + // Set desired 5 job := mock.Job() job.TaskGroups[0].Count = 5 tgName := job.TaskGroups[0].Name now := time.Now() - // Set up reschedule policy - job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: 5 * time.Second, MaxDelay: 1 * time.Hour} + + // Set up reschedule policy and update stanza + job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{ + Attempts: 1, + Interval: 24 * time.Hour, + Delay: 5 * time.Second, + DelayFunction: "", + MaxDelay: 1 * time.Hour, + Unlimited: false, + } + job.TaskGroups[0].Update = noCanaryUpdate // Create 5 existing allocations var allocs []*structs.Allocation @@ -1537,8 +1584,10 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) { allocs = append(allocs, alloc) alloc.ClientStatus = structs.AllocClientStatusRunning } + // Mark two as failed allocs[0].ClientStatus = structs.AllocClientStatusFailed + // Mark one of them as already rescheduled once allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{ {RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(), @@ -1576,8 +1625,8 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place)) // Rescheduled allocs should have previous allocs + assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place)) assertPlaceResultsHavePreviousAllocs(t, 1, r.place) assertPlacementsAreRescheduled(t, 1, r.place) } @@ -1856,14 +1905,16 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) { // Tests the reconciler creates a deployment for inplace updates func TestReconciler_CreateDeployment_RollingUpgrade_Inplace(t *testing.T) { - job := mock.Job() + jobOld := mock.Job() + job := jobOld.Copy() + job.Version++ job.TaskGroups[0].Update = noCanaryUpdate // Create 10 allocations from the old job var allocs []*structs.Allocation for i := 0; i < 10; i++ { alloc := mock.Alloc() - alloc.Job = job + alloc.Job = jobOld alloc.JobID = job.ID alloc.NodeID = uuid.Generate() alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))