Change canary handling
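Track placed canaries on the deployment state rather than with a per-allocation Canary flag: each task group's DeploymentState now records PlacedCanaries, and the reconciler remembers the previous deployment (oldDeployment) and retires a successful one. Non-promoted canaries from old or failed deployments are stopped via a new handleGroupCanaries helper, backed by a new allocSet.fromKeys lookup that resolves the recorded IDs back to allocations.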

Alex Dadgar
2017-07-05 12:50:40 -07:00
parent e5b1e3171c
commit 71c7c45cf6
4 changed files with 255 additions and 69 deletions


@@ -410,7 +410,16 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Handle the in-place updates
deploymentID := ""
if s.plan.Deployment != nil {
deploymentID = s.plan.Deployment.ID
}
for _, update := range results.inplaceUpdate {
if update.DeploymentID != deploymentID {
update.DeploymentID = deploymentID
update.DeploymentStatus = nil
}
s.ctx.Plan().AppendAlloc(update)
}
@@ -485,7 +494,6 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
DeploymentID: deploymentID,
Canary: missing.canary,
TaskResources: option.TaskResources,
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
@@ -501,6 +509,15 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
alloc.PreviousAllocation = missing.previousAlloc.ID
}
// TODO test
// If we are placing a canary and we found a match, add the canary
// to the deployment state object.
if missing.canary {
if state, ok := s.deployment.TaskGroups[missing.taskGroup.Name]; ok {
state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID)
}
}
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map


@@ -38,6 +38,9 @@ type allocReconciler struct {
// being stopped so we require this separately.
jobID string
// oldDeployment is the last deployment for the job
oldDeployment *structs.Deployment
// deployment is the current deployment for the job
deployment *structs.Deployment
@@ -168,16 +171,31 @@ func (a *allocReconciler) cancelDeployments() {
return
}
// Check if the deployment is referencing an older job and cancel it
if d := a.deployment; d != nil {
if d.Active() && (d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex) {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusCancelled,
StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
})
a.deployment = nil
}
}
d := a.deployment
if d == nil {
return
}
// Check if the deployment is active and referencing an older job and cancel it
if d.Active() && (d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex) {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusCancelled,
StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
})
a.oldDeployment = d
a.deployment = nil
}
// Clear it as the current deployment if it is terminal
//if !d.Active() {
//a.oldDeployment = d
//a.deployment = nil
//}
// Clear it as the current deployment if it is successful
if d.Status == structs.DeploymentStatusSuccessful {
a.oldDeployment = d
a.deployment = nil
}
}
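Condensed, the new cancellation rule reads as follows. This is an illustrative, self-contained sketch with stand-in types; the real code emits a DeploymentStatusUpdate instead of mutating the status in place:

package main

import "fmt"

type Deployment struct {
	ID             string
	Status         string
	JobCreateIndex uint64
	JobModifyIndex uint64
}

const (
	statusRunning    = "running"
	statusSuccessful = "successful"
	statusCancelled  = "cancelled"
)

func (d *Deployment) Active() bool { return d.Status == statusRunning }

type reconciler struct {
	jobCreateIndex, jobModifyIndex uint64
	deployment, oldDeployment      *Deployment
}

// cancelDeployments mirrors the updated reconciler logic: a deployment
// referencing an older job is cancelled, a successful one is retired, and
// either way it becomes oldDeployment so its non-promoted canaries can
// still be stopped by handleGroupCanaries.
func (r *reconciler) cancelDeployments() {
	d := r.deployment
	if d == nil {
		return
	}
	if d.Active() && (d.JobCreateIndex != r.jobCreateIndex || d.JobModifyIndex != r.jobModifyIndex) {
		d.Status = statusCancelled // Nomad emits a DeploymentStatusUpdate here
		r.oldDeployment = d
		r.deployment = nil
		return
	}
	if d.Status == statusSuccessful {
		r.oldDeployment = d
		r.deployment = nil
	}
}

func main() {
	r := &reconciler{
		jobModifyIndex: 5,
		deployment:     &Deployment{ID: "d1", Status: statusRunning, JobModifyIndex: 3},
	}
	r.cancelDeployments()
	fmt.Println(r.deployment == nil, r.oldDeployment.Status) // true cancelled
}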
@@ -243,40 +261,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
}
}
// Handle stopping unneeded canaries and tracking placed canaries
canaries := all.filterByCanary()
if len(canaries) != 0 {
if a.deployment != nil {
// Stop all non-promoted canaries from older deployments
current, older := canaries.filterByDeployment(a.deployment.ID)
// TODO
//nonPromotedOlder := older.filterByPromoted(false)
nonPromotedOlder := older
a.markStop(nonPromotedOlder, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(nonPromotedOlder))
// Handle canaries on migrating/lost nodes here by just stopping
// them
untainted, migrate, lost := current.filterByTainted(a.taintedNodes)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
canaries = untainted
// Update the all set
all = all.difference(nonPromotedOlder, migrate, lost)
} else {
// Stop all non-promoted canaries
// TODO
//nonPromoted := canaries.filterByPromoted(false)
nonPromoted := canaries
a.markStop(nonPromoted, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(nonPromoted))
all = all.difference(nonPromoted)
canaries = nil
}
}
canaries, all := a.handleGroupCanaries(all, desiredChanges)
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
// Create a structure for choosing names. Seed with the taken names which is
@@ -304,6 +291,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
desiredChanges.Ignore += uint64(len(ignore))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
a.logger.Printf("inplace: %d", len(inplace))
a.logger.Printf("destructive: %d", len(destructive))
dstate.DesiredTotal += len(destructive) + len(inplace)
}
@@ -340,6 +329,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
// * If there are any canaries that they have been promoted
place := a.computePlacements(tg, nameIndex, untainted, migrate)
if !existingDeployment {
a.logger.Printf("place: %d", len(place))
dstate.DesiredTotal += len(place)
}
@@ -402,6 +392,61 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
}
}
// handleGroupCanaries handles the canaries for the group by stopping the
// unneeded ones and returning the current set of canaries and the updated total
// set of allocs for the group
func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
// Stop any canary from an older deployment or from a failed one
var stop []string
// Cancel any non-promoted canaries from the older deployment
if a.oldDeployment != nil {
for _, s := range a.oldDeployment.TaskGroups {
if !s.Promoted {
stop = append(stop, s.PlacedCanaries...)
}
}
}
if a.deployment != nil && a.deployment.Status == structs.DeploymentStatusFailed {
for _, s := range a.deployment.TaskGroups {
if !s.Promoted {
stop = append(stop, s.PlacedCanaries...)
}
}
}
stopSet := all.fromKeys(stop)
a.markStop(stopSet, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(stopSet))
a.logger.Printf("canaries stopping b/c old or failed: %#v", stopSet)
all = all.difference(stopSet)
// Capture our current set of canaries and handle any migrations that are
// needed by just stopping them.
if a.deployment != nil {
var canaryIDs []string
for _, s := range a.deployment.TaskGroups {
canaryIDs = append(canaryIDs, s.PlacedCanaries...)
}
canaries = all.fromKeys(canaryIDs)
untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
a.logger.Printf("canaries: %#v", canaries)
a.logger.Printf("canaries migrating: %#v", migrate)
a.logger.Printf("canaries lost: %#v", lost)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
canaries = untainted
all = all.difference(migrate, lost)
a.logger.Printf("canaries untainted: %#v", canaries)
}
return canaries, all
}
// computeLimit returns the placement limit for a particular group. The inputs
// are the group definition, the untainted and destructive allocation set and
// whether we are in a canary state.
@@ -564,6 +609,8 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
} else if destructiveChange {
destructive[alloc.ID] = alloc
} else {
// Attach the deployment ID and clear the health if the
// deployment has changed
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
}
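To make the canary-stopping rule concrete, here is a self-contained sketch of just the selection step in handleGroupCanaries: which placed canaries get stopped. The types are simplified stand-ins, and stoppableCanaries is a hypothetical name used for illustration, not a function in this commit:

package main

import "fmt"

type DeploymentState struct {
	Promoted       bool
	PlacedCanaries []string
}

type Deployment struct {
	Status     string
	TaskGroups map[string]*DeploymentState
}

const statusFailed = "failed"

// stoppableCanaries collects the canary IDs handleGroupCanaries marks for
// stopping: every non-promoted canary from the old deployment, plus every
// non-promoted canary from a current deployment that has failed.
func stoppableCanaries(old, current *Deployment) []string {
	var stop []string
	collect := func(d *Deployment) {
		for _, s := range d.TaskGroups {
			if !s.Promoted {
				stop = append(stop, s.PlacedCanaries...)
			}
		}
	}
	if old != nil {
		collect(old)
	}
	if current != nil && current.Status == statusFailed {
		collect(current)
	}
	return stop
}

func main() {
	old := &Deployment{TaskGroups: map[string]*DeploymentState{
		"web": {Promoted: false, PlacedCanaries: []string{"c1", "c2"}},
		"api": {Promoted: true, PlacedCanaries: []string{"c3"}},
	}}
	fmt.Println(stoppableCanaries(old, nil)) // [c1 c2]
}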


@@ -64,6 +64,7 @@ Deployment Tests:
√ Limit calculation accounts for healthy allocs on migrating/lost nodes
√ Failed deployment should not place anything
√ Run after canaries have been promoted, new allocs have been rolled out and there is no deployment
√ Failed deployment cancels non-promoted task groups
*/
var (
@@ -1403,20 +1404,23 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) {
cases := []struct {
name string
deploymentStatus string
stop uint64
}{
{
name: "paused deployment",
deploymentStatus: structs.DeploymentStatusPaused,
stop: 0,
},
{
name: "failed deployment",
deploymentStatus: structs.DeploymentStatusFailed,
stop: 1,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
// Create a deployment that is paused and has placed some canaries
// Create a deployment that is paused/failed and has placed some canaries
d := structs.NewDeployment(job)
d.Status = c.deploymentStatus
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
@@ -1445,9 +1449,9 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, 0)
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
canary.DeploymentID = d.ID
allocs = append(allocs, canary)
d.TaskGroups[canary.TaskGroup].PlacedCanaries = []string{canary.ID}
mockUpdateFn := allocUpdateFnMock(map[string]allocUpdateType{canary.ID: allocUpdateFnIgnore}, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
@@ -1459,10 +1463,11 @@ func TestReconciler_PausedOrFailedDeployment_NoMoreCanaries(t *testing.T) {
deploymentUpdates: nil,
place: 0,
inplace: 0,
stop: 0,
stop: int(c.stop),
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Ignore: 11,
Ignore: 11 - c.stop,
Stop: c.stop,
},
},
})
@@ -1704,12 +1709,13 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
// Create a deployment that is paused and has placed some canaries
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
s := &structs.DeploymentState{
Promoted: false,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
// Create 10 allocations from the old job
var allocs []*structs.Allocation
@@ -1733,8 +1739,8 @@ func TestReconciler_DrainNode_Canary(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
canary.DeploymentID = d.ID
s.PlacedCanaries = append(s.PlacedCanaries, canary.ID)
allocs = append(allocs, canary)
handled[canary.ID] = allocUpdateFnIgnore
}
@@ -1775,12 +1781,13 @@ func TestReconciler_LostNode_Canary(t *testing.T) {
// Create a deployment that is paused and has placed some canaries
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
s := &structs.DeploymentState{
Promoted: false,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
// Create 10 allocations from the old job
var allocs []*structs.Allocation
@@ -1804,7 +1811,7 @@ func TestReconciler_LostNode_Canary(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
s.PlacedCanaries = append(s.PlacedCanaries, canary.ID)
canary.DeploymentID = d.ID
allocs = append(allocs, canary)
handled[canary.ID] = allocUpdateFnIgnore
@@ -1847,12 +1854,13 @@ func TestReconciler_StopOldCanaries(t *testing.T) {
// Create an old deployment that has placed some canaries
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
s := &structs.DeploymentState{
Promoted: false,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
// Update the job
job.JobModifyIndex += 10
@@ -1878,7 +1886,7 @@ func TestReconciler_StopOldCanaries(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
s.PlacedCanaries = append(s.PlacedCanaries, canary.ID)
canary.DeploymentID = d.ID
allocs = append(allocs, canary)
}
@@ -2070,12 +2078,13 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) {
// Create an existing deployment that has placed some canaries
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
s := &structs.DeploymentState{
Promoted: false,
DesiredTotal: 10,
DesiredCanaries: 4,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
// Create 10 allocations from the old job
var allocs []*structs.Allocation
@@ -2098,7 +2107,7 @@ func TestReconciler_NewCanaries_FillNames(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
s.PlacedCanaries = append(s.PlacedCanaries, canary.ID)
canary.DeploymentID = d.ID
allocs = append(allocs, canary)
}
@@ -2132,12 +2141,13 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
// Create an existing deployment that has placed some canaries and mark them
// promoted
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
s := &structs.DeploymentState{
Promoted: true,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
// Create 10 allocations from the old job
var allocs []*structs.Allocation
@@ -2161,7 +2171,7 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
s.PlacedCanaries = append(s.PlacedCanaries, canary.ID)
canary.DeploymentID = d.ID
canary.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
@@ -2205,12 +2215,13 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) {
// Create an existing deployment that has placed some canaries and mark them
// promoted
d := structs.NewDeployment(job)
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
s := &structs.DeploymentState{
Promoted: true,
DesiredTotal: 2,
DesiredCanaries: 2,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s
// Create 2 allocations from the old job
var allocs []*structs.Allocation
@@ -2234,7 +2245,7 @@ func TestReconciler_PromoteCanaries_CanariesEqualCount(t *testing.T) {
canary.NodeID = structs.GenerateUUID()
canary.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
canary.TaskGroup = job.TaskGroups[0].Name
canary.Canary = true
s.PlacedCanaries = append(s.PlacedCanaries, canary.ID)
canary.DeploymentID = d.ID
canary.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
@@ -2527,8 +2538,17 @@ func TestReconciler_CompleteDeployment(t *testing.T) {
job := mock.Job()
job.TaskGroups[0].Update = canaryUpdate
d := structs.NewDeployment(job)
d.Status = structs.DeploymentStatusSuccessful
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
Promoted: true,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 10,
HealthyAllocs: 10,
}
// Create allocations from the old job
dID := structs.GenerateUUID()
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
@@ -2537,14 +2557,9 @@ func TestReconciler_CompleteDeployment(t *testing.T) {
alloc.NodeID = structs.GenerateUUID()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))
alloc.TaskGroup = job.TaskGroups[0].Name
alloc.DeploymentID = dID
if i < 2 {
alloc.Canary = true
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
// TODO
//Promoted: true,
}
}
alloc.DeploymentID = d.ID
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
allocs = append(allocs, alloc)
}
@@ -2566,3 +2581,97 @@ func TestReconciler_CompleteDeployment(t *testing.T) {
},
})
}
// Test that a failed deployment cancels non-promoted canaries
func TestReconciler_FailedDeployment_CancelCanaries(t *testing.T) {
// Create a job with two task groups
job := mock.Job()
job.TaskGroups[0].Update = canaryUpdate
job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy())
job.TaskGroups[1].Name = "two"
// Create an existing failed deployment that has promoted one task group
d := structs.NewDeployment(job)
d.Status = structs.DeploymentStatusFailed
s0 := &structs.DeploymentState{
Promoted: true,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 4,
}
s1 := &structs.DeploymentState{
Promoted: false,
DesiredTotal: 10,
DesiredCanaries: 2,
PlacedAllocs: 2,
}
d.TaskGroups[job.TaskGroups[0].Name] = s0
d.TaskGroups[job.TaskGroups[1].Name] = s1
// Create 6 allocations from the old job
var allocs []*structs.Allocation
handled := make(map[string]allocUpdateType)
for _, group := range []int{0, 1} {
replacements := 4
state := s0
if group == 1 {
replacements = 2
state = s1
}
// Create the healthy replacements
for i := 0; i < replacements; i++ {
new := mock.Alloc()
new.Job = job
new.JobID = job.ID
new.NodeID = structs.GenerateUUID()
new.Name = structs.AllocName(job.ID, job.TaskGroups[group].Name, uint(i))
new.TaskGroup = job.TaskGroups[group].Name
new.DeploymentID = d.ID
new.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
}
allocs = append(allocs, new)
handled[new.ID] = allocUpdateFnIgnore
// Add the alloc to the canary list
if i < 2 {
state.PlacedCanaries = append(state.PlacedCanaries, new.ID)
}
}
for i := replacements; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = structs.GenerateUUID()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[group].Name, uint(i))
alloc.TaskGroup = job.TaskGroups[group].Name
allocs = append(allocs, alloc)
}
}
mockUpdateFn := allocUpdateFnMock(handled, allocUpdateFnDestructive)
reconciler := NewAllocReconciler(testLogger(), mockUpdateFn, false, job.ID, job, d, allocs, nil)
r := reconciler.Compute()
// Assert the correct results
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 0,
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Ignore: 10,
},
job.TaskGroups[1].Name: {
Stop: 2,
Ignore: 8,
},
},
})
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
}


@@ -116,6 +116,19 @@ func (a allocSet) union(others ...allocSet) allocSet {
return union
}
// fromKeys returns an alloc set matching the passed keys
func (a allocSet) fromKeys(keys ...[]string) allocSet {
from := make(map[string]*structs.Allocation)
for _, set := range keys {
for _, k := range set {
if alloc, ok := a[k]; ok {
from[k] = alloc
}
}
}
return from
}
// filterByTainted takes a set of tainted nodes and filters the allocation set
// into three groups:
// 1. Those that exist on untainted nodes
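The fromKeys helper above is the lookup that makes PlacedCanaries tracking workable: canary IDs recorded on the deployment state are resolved back to live allocations, and IDs with no surviving allocation are silently skipped. A runnable sketch with a stand-in Allocation type:

package main

import "fmt"

// Allocation is a stand-in for structs.Allocation.
type Allocation struct{ ID string }

// allocSet mirrors the scheduler's map of allocation ID -> allocation.
type allocSet map[string]*Allocation

// fromKeys returns the subset of the set whose IDs appear in any of the
// passed key slices; unknown IDs are ignored rather than producing nils.
func (a allocSet) fromKeys(keys ...[]string) allocSet {
	from := make(allocSet)
	for _, set := range keys {
		for _, k := range set {
			if alloc, ok := a[k]; ok {
				from[k] = alloc
			}
		}
	}
	return from
}

func main() {
	all := allocSet{
		"canary-1": {ID: "canary-1"},
		"web-1":    {ID: "web-1"},
	}
	canaries := all.fromKeys([]string{"canary-1", "gone"})
	fmt.Println(len(canaries)) // 1
}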