From 71c7c45cf6407b2f342d01009f953bc05cbd0d3a Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Wed, 5 Jul 2017 12:50:40 -0700
Subject: [PATCH] Change canary handling

---
 scheduler/generic_sched.go  |  19 ++++-
 scheduler/reconcile.go      | 133 ++++++++++++++++++++----------
 scheduler/reconcile_test.go | 159 ++++++++++++++++++++++++++++++------
 scheduler/reconcile_util.go |  13 +++
 4 files changed, 255 insertions(+), 69 deletions(-)

diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 07482573d..fa0e3ff63 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -410,7 +410,16 @@ func (s *GenericScheduler) computeJobAllocs() error {
 	}
 
 	// Handle the in-place updates
+	deploymentID := ""
+	if s.plan.Deployment != nil {
+		deploymentID = s.plan.Deployment.ID
+	}
+
 	for _, update := range results.inplaceUpdate {
+		if update.DeploymentID != deploymentID {
+			update.DeploymentID = deploymentID
+			update.DeploymentStatus = nil
+		}
 		s.ctx.Plan().AppendAlloc(update)
 	}
 
@@ -485,7 +494,6 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
 			Metrics:       s.ctx.Metrics(),
 			NodeID:        option.Node.ID,
 			DeploymentID:  deploymentID,
-			Canary:        missing.canary,
 			TaskResources: option.TaskResources,
 			DesiredStatus: structs.AllocDesiredStatusRun,
 			ClientStatus:  structs.AllocClientStatusPending,
@@ -501,6 +509,15 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
 				alloc.PreviousAllocation = missing.previousAlloc.ID
 			}
 
+			// TODO test
+			// If we are placing a canary and we found a match, add the canary
+			// to the deployment state object.
+			if missing.canary {
+				if state, ok := s.deployment.TaskGroups[missing.taskGroup.Name]; ok {
+					state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID)
+				}
+			}
+
 			s.plan.AppendAlloc(alloc)
 		} else {
 			// Lazy initialize the failed map
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index 87cd40fa0..a87fa35f8 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -38,6 +38,9 @@ type allocReconciler struct {
 	// being stopped so we require this seperately.
jobID string + // oldDeployment is the last deployment for the job + oldDeployment *structs.Deployment + // deployment is the current deployment for the job deployment *structs.Deployment @@ -168,16 +171,31 @@ func (a *allocReconciler) cancelDeployments() { return } - // Check if the deployment is referencing an older job and cancel it - if d := a.deployment; d != nil { - if d.Active() && (d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex) { - a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ - DeploymentID: a.deployment.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionNewerJob, - }) - a.deployment = nil - } + d := a.deployment + if d == nil { + return + } + + // Check if the deployment is active and referencing an older job and cancel it + if d.Active() && (d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex) { + a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: a.deployment.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionNewerJob, + }) + a.oldDeployment = d + a.deployment = nil + } + + // Clear it as the current deployment if it is terminal + //if !d.Active() { + //a.oldDeployment = d + //a.deployment = nil + //} + // Clear it as the current deployment if it is successful + if d.Status == structs.DeploymentStatusSuccessful { + a.oldDeployment = d + a.deployment = nil } } @@ -243,40 +261,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) { } } - // Handle stopping unneeded canaries and tracking placed canaries - canaries := all.filterByCanary() - if len(canaries) != 0 { - if a.deployment != nil { - // Stop all non-promoted canaries from older deployments - current, older := canaries.filterByDeployment(a.deployment.ID) - // TODO - //nonPromotedOlder := older.filterByPromoted(false) - nonPromotedOlder := older - a.markStop(nonPromotedOlder, "", allocNotNeeded) - desiredChanges.Stop += uint64(len(nonPromotedOlder)) + canaries, all := a.handleGroupCanaries(all, desiredChanges) - // Handle canaries on migrating/lost nodes here by just stopping - // them - untainted, migrate, lost := current.filterByTainted(a.taintedNodes) - a.markStop(migrate, "", allocMigrating) - a.markStop(lost, structs.AllocClientStatusLost, allocLost) - canaries = untainted - - // Update the all set - all = all.difference(nonPromotedOlder, migrate, lost) - } else { - // Stop all non-promoted canaries - // TODO - //nonPromoted := canaries.filterByPromoted(false) - nonPromoted := canaries - a.markStop(nonPromoted, "", allocNotNeeded) - desiredChanges.Stop += uint64(len(nonPromoted)) - all = all.difference(nonPromoted) - canaries = nil - } - } - - // Determine what set of alloations are on tainted nodes + // Determine what set of allocations are on tainted nodes untainted, migrate, lost := all.filterByTainted(a.taintedNodes) // Create a structure for choosing names. 
Seed with the taken names which is @@ -304,6 +291,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) { desiredChanges.Ignore += uint64(len(ignore)) desiredChanges.InPlaceUpdate += uint64(len(inplace)) if !existingDeployment { + a.logger.Printf("inplace: %d", len(inplace)) + a.logger.Printf("destructive: %d", len(destructive)) dstate.DesiredTotal += len(destructive) + len(inplace) } @@ -340,6 +329,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) { // * If there are any canaries that they have been promoted place := a.computePlacements(tg, nameIndex, untainted, migrate) if !existingDeployment { + a.logger.Printf("place: %d", len(place)) dstate.DesiredTotal += len(place) } @@ -402,6 +392,61 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) { } } +// handleGroupCanaries handles the canaries for the group by stopping the +// unneeded ones and returning the current set of canaries and the updated total +// set of allocs for the group +func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) { + // Stop any canary from an older deployment or from a failed one + var stop []string + + // Cancel any non-promoted canaries from the older deployment + if a.oldDeployment != nil { + for _, s := range a.oldDeployment.TaskGroups { + if !s.Promoted { + stop = append(stop, s.PlacedCanaries...) + } + } + } + + if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed { + for _, s := range a.deployment.TaskGroups { + if !s.Promoted { + stop = append(stop, s.PlacedCanaries...) + } + } + } + + stopSet := all.fromKeys(stop) + a.markStop(stopSet, "", allocNotNeeded) + desiredChanges.Stop += uint64(len(stopSet)) + a.logger.Printf("canaries stopping b/c old or failed: %#v", stopSet) + all = all.difference(stopSet) + + // Capture our current set of canaries and handle any migrations that are + // needed by just stopping them. + if a.deployment != nil { + var canaryIDs []string + for _, s := range a.deployment.TaskGroups { + canaryIDs = append(canaryIDs, s.PlacedCanaries...) + } + + canaries = all.fromKeys(canaryIDs) + untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes) + a.logger.Printf("canaries: %#v", canaries) + a.logger.Printf("canaries migrating: %#v", migrate) + a.logger.Printf("canaries lost %#v", lost) + + a.markStop(migrate, "", allocMigrating) + a.markStop(lost, structs.AllocClientStatusLost, allocLost) + + canaries = untainted + all = all.difference(migrate, lost) + a.logger.Printf("canaries untainted: %#v", canaries) + } + + return canaries, all +} + // computeLimit returns the placement limit for a particular group. The inputs // are the group definition, the untainted and destructive allocation set and // whether we are in a canary state. 
@@ -564,6 +609,8 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all } else if destructiveChange { destructive[alloc.ID] = alloc } else { + // Attach the deployment ID and and clear the health if the + // deployment has changed inplace[alloc.ID] = alloc a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc) } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index e293f98c7..474a13c08 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -64,6 +64,7 @@ Deployment Tests: √ Limit calculation accounts for healthy allocs on migrating/lost nodes √ Failed deployment should not place anything √ Run after canaries have been promoted, new allocs have been rolled out and there is no deployment +√ Failed deployment cancels non-promoted task groups */ var ( @@ -1403,20 +1404,23 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) { cases := []struct { name string deploymentStatus string + stop uint64 }{ { name: "paused deployment", deploymentStatus: structs.DeploymentStatusPaused, + stop: 0, }, { name: "failed deployment", deploymentStatus: structs.DeploymentStatusFailed, + stop: 1, }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - // Create a deployment that is paused and has placed some canaries + // Create a deployment that is paused/failed and has placed some canaries d := structs.NewDeployment(job) d.Status = c.deploymentStatus d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ @@ -1445,9 +1449,9 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, 0) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true canary.DeploymentID = d.ID allocs = append(allocs, canary) + d.TaskGroups[canary.TaskGroup].PlacedCanaries = []string{canary.ID} mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{canary.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive) reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) @@ -1459,10 +1463,11 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) { deploymentUpdates: nil, place: 0, inplace: 0, - stop: 0, + stop: int(c.stop), desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { - Ignore: 11, + Ignore: 11 - c.stop, + Stop: c.stop, }, }, }) @@ -1704,12 +1709,13 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { // Create a deployment that is paused and has placed some canaries d := structs.NewDeployment(job) - d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, DesiredCanaries: 2, PlacedAllocs: 2, } + d.TaskGroups[job.TaskGroups[0].Name] = s // Create 10 allocations from the old job var allocs []*structs.Allocation @@ -1733,8 +1739,8 @@ func TestReconciler_DrainNode_Canary(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true canary.DeploymentID = d.ID + s.PlacedCanaries = append(s.PlacedCanaries, canary.ID) allocs = append(allocs, canary) handled[canary.ID] = allocUpdateFnIgnore } @@ -1775,12 +1781,13 @@ func TestReconciler_LostNode_Canary(t *testing.T) { // Create a deployment that is paused and has placed some canaries d := structs.NewDeployment(job) - 
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, DesiredCanaries: 2, PlacedAllocs: 2, } + d.TaskGroups[job.TaskGroups[0].Name] = s // Create 10 allocations from the old job var allocs []*structs.Allocation @@ -1804,7 +1811,7 @@ func TestReconciler_LostNode_Canary(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true + s.PlacedCanaries = append(s.PlacedCanaries, canary.ID) canary.DeploymentID = d.ID allocs = append(allocs, canary) handled[canary.ID] = allocUpdateFnIgnore @@ -1847,12 +1854,13 @@ func TestReconciler_StopOldCanaries(t *testing.T) { // Create an old deployment that has placed some canaries d := structs.NewDeployment(job) - d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, DesiredCanaries: 2, PlacedAllocs: 2, } + d.TaskGroups[job.TaskGroups[0].Name] = s // Update the job job.JobModifyIndex += 10 @@ -1878,7 +1886,7 @@ func TestReconciler_StopOldCanaries(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true + s.PlacedCanaries = append(s.PlacedCanaries, canary.ID) canary.DeploymentID = d.ID allocs = append(allocs, canary) } @@ -2070,12 +2078,13 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) { // Create an existing deployment that has placed some canaries d := structs.NewDeployment(job) - d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + s := &structs.DeploymentState{ Promoted: false, DesiredTotal: 10, DesiredCanaries: 4, PlacedAllocs: 2, } + d.TaskGroups[job.TaskGroups[0].Name] = s // Create 10 allocations from the old job var allocs []*structs.Allocation @@ -2098,7 +2107,7 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true + s.PlacedCanaries = append(s.PlacedCanaries, canary.ID) canary.DeploymentID = d.ID allocs = append(allocs, canary) } @@ -2132,12 +2141,13 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { // Create an existing deployment that has placed some canaries and mark them // promoted d := structs.NewDeployment(job) - d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + s := &structs.DeploymentState{ Promoted: true, DesiredTotal: 10, DesiredCanaries: 2, PlacedAllocs: 2, } + d.TaskGroups[job.TaskGroups[0].Name] = s // Create 10 allocations from the old job var allocs []*structs.Allocation @@ -2161,7 +2171,7 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true + s.PlacedCanaries = append(s.PlacedCanaries, canary.ID) canary.DeploymentID = d.ID canary.DeploymentStatus = &structs.AllocDeploymentStatus{ Healthy: helper.BoolToPtr(true), @@ -2205,12 +2215,13 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) { // Create an existing deployment that has placed some canaries and mark them // promoted d := structs.NewDeployment(job) - d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + s 
:= &structs.DeploymentState{ Promoted: true, DesiredTotal: 2, DesiredCanaries: 2, PlacedAllocs: 2, } + d.TaskGroups[job.TaskGroups[0].Name] = s // Create 2 allocations from the old job var allocs []*structs.Allocation @@ -2234,7 +2245,7 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) { canary.NodeID = structs.GenerateUUID() canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) canary.TaskGroup = job.TaskGroups[0].Name - canary.Canary = true + s.PlacedCanaries = append(s.PlacedCanaries, canary.ID) canary.DeploymentID = d.ID canary.DeploymentStatus = &structs.AllocDeploymentStatus{ Healthy: helper.BoolToPtr(true), @@ -2527,8 +2538,17 @@ func TestReconciler_CompleteDeployment(t *testing.T) { job := mock.Job() job.TaskGroups[0].Update = canaryUpdate + d := structs.NewDeployment(job) + d.Status = structs.DeploymentStatusSuccessful + d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{ + Promoted: true, + DesiredTotal: 10, + DesiredCanaries: 2, + PlacedAllocs: 10, + HealthyAllocs: 10, + } + // Create allocations from the old job - dID := structs.GenerateUUID() var allocs []*structs.Allocation for i := 0; i < 10; i++ { alloc := mock.Alloc() @@ -2537,14 +2557,9 @@ func TestReconciler_CompleteDeployment(t *testing.T) { alloc.NodeID = structs.GenerateUUID() alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) alloc.TaskGroup = job.TaskGroups[0].Name - alloc.DeploymentID = dID - if i < 2 { - alloc.Canary = true - alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ - Healthy: helper.BoolToPtr(true), - // TODO - //Promoted: true, - } + alloc.DeploymentID = d.ID + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), } allocs = append(allocs, alloc) } @@ -2566,3 +2581,97 @@ func TestReconciler_CompleteDeployment(t *testing.T) { }, }) } + +// Test that a failed deployment cancels non-promoted canaries +func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) { + // Create a job with two task groups + job := mock.Job() + job.TaskGroups[0].Update = canaryUpdate + job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy()) + job.TaskGroups[1].Name = "two" + + // Create an existing failed deployment that has promoted one task group + d := structs.NewDeployment(job) + d.Status = structs.DeploymentStatusFailed + s0 := &structs.DeploymentState{ + Promoted: true, + DesiredTotal: 10, + DesiredCanaries: 2, + PlacedAllocs: 4, + } + s1 := &structs.DeploymentState{ + Promoted: false, + DesiredTotal: 10, + DesiredCanaries: 2, + PlacedAllocs: 2, + } + d.TaskGroups[job.TaskGroups[0].Name] = s0 + d.TaskGroups[job.TaskGroups[1].Name] = s1 + + // Create 6 allocations from the old job + var allocs []*structs.Allocation + handled := make(map[string]allocUpdateType) + for _, group := range []int{0, 1} { + replacements := 4 + state := s0 + if group == 1 { + replacements = 2 + state = s1 + } + + // Create the healthy replacements + for i := 0; i < replacements; i++ { + new := mock.Alloc() + new.Job = job + new.JobID = job.ID + new.NodeID = structs.GenerateUUID() + new.Name = structs.AllocName(job.ID, job.TaskGroups[group].Name, uint(i)) + new.TaskGroup = job.TaskGroups[group].Name + new.DeploymentID = d.ID + new.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: helper.BoolToPtr(true), + } + allocs = append(allocs, new) + handled[new.ID] = allocUpdateFnIgnore + + // Add the alloc to the canary list + if i < 2 { + + state.PlacedCanaries = append(state.PlacedCanaries, new.ID) + } + } + for 
i := replacements; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[group].Name, uint(i)) + alloc.TaskGroup = job.TaskGroups[group].Name + allocs = append(allocs, alloc) + } + } + + mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive) + reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Ignore: 10, + }, + job.TaskGroups[1].Name: { + Stop: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) +} diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 35d31fc63..b67895442 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -116,6 +116,19 @@ func (a allocSet) union(others ...allocSet) allocSet { return union } +// fromKeys returns an alloc set matching the passed keys +func (a allocSet) fromKeys(keys ...[]string) allocSet { + from := make(map[string]*structs.Allocation) + for _, set := range keys { + for _, k := range set { + if alloc, ok := a[k]; ok { + from[k] = alloc + } + } + } + return from +} + // fitlerByTainted takes a set of tainted nodes and filters the allocation set // into three groups: // 1. Those that exist on untainted nodes
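
Note on the computeJobAllocs change above: in-place updates are now re-attached to the plan's current deployment — if an update still references a different deployment ID, the ID is rewritten and its recorded health is cleared so the allocation is re-evaluated under the new deployment. Below is a minimal sketch of that guard, not the real code path; the Allocation stand-in uses a plain *bool for health rather than Nomad's *structs.AllocDeploymentStatus.

package main

import "fmt"

// Simplified allocation stand-in for illustration only.
type Allocation struct {
	ID               string
	DeploymentID     string
	DeploymentHealth *bool
}

// reattachToDeployment mirrors the in-place update handling sketched in the
// patch: moving an allocation onto the current deployment clears any
// previously recorded deployment health.
func reattachToDeployment(update *Allocation, currentDeploymentID string) {
	if update.DeploymentID != currentDeploymentID {
		update.DeploymentID = currentDeploymentID
		update.DeploymentHealth = nil
	}
}

func main() {
	healthy := true
	update := &Allocation{ID: "a1", DeploymentID: "old-deploy", DeploymentHealth: &healthy}

	reattachToDeployment(update, "new-deploy")
	fmt.Println(update.DeploymentID, update.DeploymentHealth) // new-deploy <nil>
}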
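
The behavioral core of the new handleGroupCanaries in reconcile.go is deciding which canaries to stop: every non-promoted canary recorded on the old deployment, plus the non-promoted canaries of the current deployment once it has failed. A sketch of that selection under simplified assumptions — Deployment and DeploymentState here are cut-down stand-ins carrying only the fields the patch touches (Status, Promoted, PlacedCanaries), not the real structs package types.

package main

import "fmt"

// Minimal stand-ins for the deployment types referenced by the patch.
type DeploymentState struct {
	Promoted       bool
	PlacedCanaries []string
}

type Deployment struct {
	Status     string
	TaskGroups map[string]*DeploymentState
}

const deploymentStatusFailed = "failed" // stand-in for structs.DeploymentStatusFailed

// canariesToStop mirrors the selection in handleGroupCanaries: non-promoted
// canaries from the old deployment are stopped, and so are the non-promoted
// canaries of the current deployment once it has failed.
func canariesToStop(old, current *Deployment) []string {
	var stop []string
	if old != nil {
		for _, s := range old.TaskGroups {
			if !s.Promoted {
				stop = append(stop, s.PlacedCanaries...)
			}
		}
	}
	if current != nil && current.Status == deploymentStatusFailed {
		for _, s := range current.TaskGroups {
			if !s.Promoted {
				stop = append(stop, s.PlacedCanaries...)
			}
		}
	}
	return stop
}

func main() {
	old := &Deployment{TaskGroups: map[string]*DeploymentState{
		"web": {Promoted: false, PlacedCanaries: []string{"old-c1", "old-c2"}},
	}}
	current := &Deployment{
		Status: deploymentStatusFailed,
		TaskGroups: map[string]*DeploymentState{
			"web": {Promoted: false, PlacedCanaries: []string{"new-c1"}},
		},
	}
	fmt.Println(canariesToStop(old, current)) // [old-c1 old-c2 new-c1]
}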
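
Finally, the new allocSet.fromKeys helper in reconcile_util.go is what turns the PlacedCanaries ID lists stored on the deployment state back into allocation sets. The following self-contained sketch reproduces the same behavior; the Allocation and allocSet types are simplified stand-ins for structs.Allocation and the scheduler's allocSet, not the real definitions.

package main

import "fmt"

// Simplified stand-ins; the real allocSet is a map[string]*structs.Allocation.
type Allocation struct {
	ID   string
	Name string
}

type allocSet map[string]*Allocation

// fromKeys mirrors the helper added by the patch: it returns the allocations
// whose IDs appear in any of the passed key slices, silently skipping IDs
// that are not present in the set.
func (a allocSet) fromKeys(keys ...[]string) allocSet {
	from := make(allocSet)
	for _, set := range keys {
		for _, k := range set {
			if alloc, ok := a[k]; ok {
				from[k] = alloc
			}
		}
	}
	return from
}

func main() {
	all := allocSet{
		"a1": {ID: "a1", Name: "web.cache[0]"},
		"a2": {ID: "a2", Name: "web.cache[1]"},
		"a3": {ID: "a3", Name: "web.cache[2]"},
	}

	// PlacedCanaries on a deployment state is a slice of allocation IDs.
	placedCanaries := []string{"a1", "a3", "gone"}

	canaries := all.fromKeys(placedCanaries)
	fmt.Println(len(canaries)) // 2 -- the unknown ID is ignored
}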