Deployment tests

Alex Dadgar
2017-06-02 16:11:29 -07:00
parent f32a9a5539
commit 369a04b135
4 changed files with 1689 additions and 152 deletions

View File

@@ -4080,26 +4080,6 @@ func (a *Allocation) RanSuccessfully() bool {
return allSuccess
}
// Stub returns a list stub for the allocation
func (a *Allocation) Stub() *AllocListStub {
return &AllocListStub{
ID: a.ID,
EvalID: a.EvalID,
Name: a.Name,
NodeID: a.NodeID,
JobID: a.JobID,
TaskGroup: a.TaskGroup,
DesiredStatus: a.DesiredStatus,
DesiredDescription: a.DesiredDescription,
ClientStatus: a.ClientStatus,
ClientDescription: a.ClientDescription,
TaskStates: a.TaskStates,
CreateIndex: a.CreateIndex,
ModifyIndex: a.ModifyIndex,
CreateTime: a.CreateTime,
}
}
// ShouldMigrate returns if the allocation needs data migration
func (a *Allocation) ShouldMigrate() bool {
if a.DesiredStatus == AllocDesiredStatusStop || a.DesiredStatus == AllocDesiredStatusEvict {
@@ -4123,6 +4103,33 @@ func (a *Allocation) ShouldMigrate() bool {
return true
}
// DeploymentHealthy returns if the allocation is marked as healthy as part of a
// deployment
func (a *Allocation) DeploymentHealthy() bool {
return a.DeploymentStatus != nil &&
a.DeploymentStatus.Healthy != nil && *a.DeploymentStatus.Healthy
}
// Stub returns a list stub for the allocation
func (a *Allocation) Stub() *AllocListStub {
return &AllocListStub{
ID: a.ID,
EvalID: a.EvalID,
Name: a.Name,
NodeID: a.NodeID,
JobID: a.JobID,
TaskGroup: a.TaskGroup,
DesiredStatus: a.DesiredStatus,
DesiredDescription: a.DesiredDescription,
ClientStatus: a.ClientStatus,
ClientDescription: a.ClientDescription,
TaskStates: a.TaskStates,
CreateIndex: a.CreateIndex,
ModifyIndex: a.ModifyIndex,
CreateTime: a.CreateTime,
}
}
// AllocListStub is used to return a subset of alloc information
type AllocListStub struct {
ID string

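The only functional addition in this file is `DeploymentHealthy`; `Stub` is simply moved below it unchanged. A minimal, self-contained sketch of the nil-safety the helper provides, using simplified stand-in types rather than the real Nomad structs:

```go
package main

import "fmt"

// Simplified stand-ins for the structs shown in the diff above; field names
// mirror the diff but this is not the real Nomad package.
type AllocDeploymentStatus struct {
	Healthy *bool
}

type Allocation struct {
	DeploymentStatus *AllocDeploymentStatus
}

// DeploymentHealthy only returns true when a status exists, a health verdict
// has been recorded, and that verdict is positive.
func (a *Allocation) DeploymentHealthy() bool {
	return a.DeploymentStatus != nil &&
		a.DeploymentStatus.Healthy != nil && *a.DeploymentStatus.Healthy
}

func main() {
	healthy := true
	fmt.Println((&Allocation{}).DeploymentHealthy())                                           // false: no deployment status
	fmt.Println((&Allocation{DeploymentStatus: &AllocDeploymentStatus{}}).DeploymentHealthy()) // false: health not reported yet
	fmt.Println((&Allocation{DeploymentStatus: &AllocDeploymentStatus{Healthy: &healthy}}).DeploymentHealthy()) // true
}
```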
View File

@@ -44,6 +44,9 @@ type allocReconciler struct {
// deploymentPaused marks whether the deployment is paused
deploymentPaused bool
// deploymentFailed marks whether the deployment is failed
deploymentFailed bool
// taintedNodes contains a map of nodes that are tainted
taintedNodes map[string]*structs.Node
@@ -119,6 +122,7 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch
// Detect if the deployment is paused
if deployment != nil {
a.deploymentPaused = deployment.Status == structs.DeploymentStatusPaused
a.deploymentFailed = deployment.Status == structs.DeploymentStatusFailed
}
return a
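The reconciler now tracks a failed deployment the same way it already tracked a paused one. A small sketch of the flag derivation, with stand-in types and status constants assumed for illustration:

```go
package main

import "fmt"

// Stand-in status strings for illustration; the real constants live in
// Nomad's structs package.
const (
	DeploymentStatusPaused = "paused"
	DeploymentStatusFailed = "failed"
)

type Deployment struct{ Status string }

type allocReconciler struct {
	deploymentPaused bool
	deploymentFailed bool
}

// newReconciler mirrors the derivation in the diff: both flags are computed
// once up front so later placement decisions can consult plain booleans.
func newReconciler(d *Deployment) *allocReconciler {
	a := &allocReconciler{}
	if d != nil {
		a.deploymentPaused = d.Status == DeploymentStatusPaused
		a.deploymentFailed = d.Status == DeploymentStatusFailed
	}
	return a
}

func main() {
	fmt.Printf("%+v\n", newReconciler(&Deployment{Status: DeploymentStatusFailed}))
	fmt.Printf("%+v\n", newReconciler(nil))
}
```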
@@ -152,9 +156,9 @@ func (a *allocReconciler) Compute() *reconcileResults {
// computeDeployments cancels any deployment that is not needed and creates a
// deployment if it is needed
func (a *allocReconciler) computeDeployments() {
// If the job is stopped and there is a deployment cancel it
// If the job is stopped and there is a non-terminal deployment, cancel it
if a.job.Stopped() {
if a.deployment != nil {
if a.deployment != nil && a.deployment.Active() {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusCancelled,
@@ -168,7 +172,7 @@ func (a *allocReconciler) computeDeployments() {
// Check if the deployment is referencing an older job and cancel it
if d := a.deployment; d != nil {
if d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex {
if d.Active() && (d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex) {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
Status: structs.DeploymentStatusCancelled,
@@ -178,6 +182,7 @@ func (a *allocReconciler) computeDeployments() {
}
}
// XXX Should probably do this as needed
// Create a new deployment if necessary
if a.deployment == nil && !a.job.Stopped() && a.job.HasUpdateStrategy() {
a.deployment = structs.NewDeployment(a.job)
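Cancellation is now limited to deployments that are still active. The diff only relies on `Active()` meaning "not yet in a terminal state"; the sketch below assumes a terminal-status set rather than quoting the real method:

```go
package main

import "fmt"

// Assumed terminal statuses for illustration; the real structs.Deployment
// defines its own status constants and Active() method.
type Deployment struct{ Status string }

var terminalStatus = map[string]bool{
	"successful": true,
	"failed":     true,
	"cancelled":  true,
}

// Active reports whether the deployment can still change, and therefore
// whether cancelling it is meaningful.
func (d *Deployment) Active() bool { return !terminalStatus[d.Status] }

func main() {
	fmt.Println((&Deployment{Status: "running"}).Active())   // true: cancel if the job is stopped or changed
	fmt.Println((&Deployment{Status: "cancelled"}).Active()) // false: nothing left to cancel
}
```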
@@ -221,13 +226,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
// that the task group no longer exists
tg := a.job.LookupTaskGroup(group)
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
// If the task group is nil, then the task group has been removed so all we
// need to do is stop everything
if tg == nil {
a.logger.Printf("RECONCILER -- STOPPING ALL")
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
@@ -249,51 +252,60 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
}
}
a.logger.Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
a.logger.Printf("RECONCILER -- untainted %#v", untainted)
// Get any existing canaries
canaries := untainted.filterByCanary()
canaries := all.filterByCanary()
// Cancel any canary from a prior deployment
if len(canaries) != 0 {
if a.deployment != nil {
current, older := canaries.filterByDeployment(a.deployment.ID)
// Stop the older canaries
a.markStop(older, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(older))
a.logger.Printf("RECONCILER -- older canaries %#v", older)
a.logger.Printf("RECONCILER -- current canaries %#v", current)
untainted = untainted.difference(older)
canaries = current
// Handle canaries on migrating/lost nodes here by just stopping
// them
untainted, migrate, lost := canaries.filterByTainted(a.taintedNodes)
a.markStop(migrate, "", allocMigrating)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
canaries = untainted
// Update the all set
all = all.difference(older, migrate, lost)
a.logger.Printf("RECONCILER -- canaries %#v", canaries)
} else {
// XXX this is totally wrong; they may just be promoted, good canaries
// We don't need any of those canaries since there is no longer a
// deployment
a.markStop(canaries, "", allocNotNeeded)
desiredChanges.Stop += uint64(len(canaries))
untainted = untainted.difference(canaries)
all = all.difference(canaries)
canaries = nil
}
a.logger.Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
}
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
a.logger.Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
// Create a structure for choosing names. Seed with the taken names which is
// the union of untainted and migrating nodes
// the union of untainted and migrating nodes (includes canaries)
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate))
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations. We ignore canaries since that can push us
// over the desired count
existingCanariesPromoted := dstate == nil || dstate.DesiredCanaries == 0 || dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, existingCanariesPromoted)
canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
// Having stopped unneeded allocations, append the canaries to the existing
// set of untainted because they have been promoted. This will cause them to
// be treated like non-canaries
if existingCanariesPromoted {
if !canaryState {
untainted = untainted.union(canaries)
nameIndex.Set(canaries)
}
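The `existingCanariesPromoted` flag is inverted into `canaryState`: the group is in a canary state only while a deployment state exists, canaries are desired, and they have not yet been promoted. A small sketch of that predicate with a stand-in state type:

```go
package main

import "fmt"

// Stand-in for the per-task-group deployment state referenced in the diff.
type DeploymentState struct {
	DesiredCanaries int
	Promoted        bool
}

// canaryState is the negation of the old existingCanariesPromoted check.
func canaryState(ds *DeploymentState) bool {
	return ds != nil && ds.DesiredCanaries != 0 && !ds.Promoted
}

func main() {
	fmt.Println(canaryState(nil))                                                  // false: no deployment state at all
	fmt.Println(canaryState(&DeploymentState{DesiredCanaries: 2}))                 // true: canaries placed, awaiting promotion
	fmt.Println(canaryState(&DeploymentState{DesiredCanaries: 2, Promoted: true})) // false: promoted, canaries count normally
}
```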
@@ -303,7 +315,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignore))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
desiredChanges.DestructiveUpdate += uint64(len(destructive))
if creatingDeployment {
dstate.DesiredTotal += len(destructive) + len(inplace)
}
a.logger.Printf("RECONCILER -- Stopping (%d)", len(stop))
a.logger.Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
@@ -313,16 +327,15 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
numDestructive := len(destructive)
strategy := tg.Update
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary
if requireCanary && !a.deploymentPaused {
if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
number := strategy.Canary - len(canaries)
number = helper.IntMin(numDestructive, number)
desiredChanges.Canary += uint64(number)
if creatingDeployment {
dstate.DesiredCanaries = strategy.Canary
dstate.DesiredTotal += strategy.Canary
}
a.logger.Printf("RECONCILER -- Canary (%d)", number)
a.logger.Printf("RECONCILER -- Place Canaries (%d)", number)
for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
a.result.place = append(a.result.place, allocPlaceResult{
name: name,
@@ -333,12 +346,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
}
// Determine how many we can place
haveCanaries := dstate != nil && dstate.DesiredCanaries != 0
limit := a.computeLimit(tg, untainted, destructive, haveCanaries)
canaryState = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
limit := a.computeLimit(tg, untainted, destructive, canaryState)
a.logger.Printf("RECONCILER -- LIMIT %v", limit)
// Place if:
// * The deployment is not paused
// * The deployment is not paused or failed
// * Not placing any canaries
// * If there are any canaries that they have been promoted
place := a.computePlacements(tg, nameIndex, untainted, migrate)
@@ -346,27 +359,23 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
dstate.DesiredTotal += len(place)
}
if !a.deploymentPaused && existingCanariesPromoted {
// Update the desired changes and if we are creating a deployment update
// the state.
desiredChanges.Place += uint64(len(place))
if !a.deploymentPaused && !a.deploymentFailed && !canaryState {
// Place all new allocations
a.logger.Printf("RECONCILER -- Placing (%d)", len(place))
desiredChanges.Place += uint64(len(place))
for _, p := range place {
a.result.place = append(a.result.place, p)
}
// XXX Needs to be done in order
// Do all destructive updates
min := helper.IntMin(len(destructive), limit)
i := 0
limit -= min
desiredChanges.DestructiveUpdate += uint64(min)
desiredChanges.Ignore += uint64(len(destructive) - min)
a.logger.Printf("RECONCILER -- Destructive Updating (%d)", min)
for _, alloc := range destructive {
if i == min {
break
}
i++
for _, alloc := range destructive.nameOrder()[:min] {
a.logger.Printf("RECONCILER -- Destructive Updating %q %q", alloc.ID, alloc.Name)
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocUpdating,
@@ -377,44 +386,53 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) {
previousAlloc: alloc,
})
}
limit -= min
} else {
a.logger.Printf("RECONCILER -- NON PROMOTED CASE")
desiredChanges.Ignore += uint64(len(destructive))
}
// TODO Migrations should be done using a stagger and max_parallel.
desiredChanges.Migrate += uint64(len(migrate))
a.logger.Printf("RECONCILER -- Migrating (%d)", len(migrate))
if !a.deploymentFailed {
desiredChanges.Migrate += uint64(len(migrate))
a.logger.Printf("RECONCILER -- Migrating (%d)", len(migrate))
} else {
desiredChanges.Stop += uint64(len(migrate))
}
for _, alloc := range migrate {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocMigrating,
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
canary: false,
taskGroup: tg,
previousAlloc: alloc,
})
// If the deployment is failed, just stop the allocation
if !a.deploymentFailed {
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
canary: false,
taskGroup: tg,
previousAlloc: alloc,
})
}
}
}
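Migration handling now distinguishes a failed deployment: allocations on draining or tainted nodes are still stopped, but no replacements are queued. A compact sketch of that branch with illustrative types:

```go
package main

import "fmt"

// Illustrative types only; the real reconciler records allocStopResult and
// allocPlaceResult values rather than plain names.
type alloc struct{ Name string }

type results struct {
	stop  []string
	place []string
}

func handleMigrations(deploymentFailed bool, migrate []alloc) results {
	var r results
	for _, a := range migrate {
		r.stop = append(r.stop, a.Name) // always stop the migrating allocation
		if !deploymentFailed {
			r.place = append(r.place, a.Name) // only replace it if the deployment is still viable
		}
	}
	return r
}

func main() {
	m := []alloc{{"web[0]"}, {"web[1]"}}
	fmt.Printf("running deployment: %+v\n", handleMigrations(false, m))
	fmt.Printf("failed deployment:  %+v\n", handleMigrations(true, m))
}
```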
// computeLimit returns the placement limit for a particular group. The inputs
// are the group definition, the untainted and destructive allocation set and
// whether any canaries exist or are being placed.
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive allocSet, canaries bool) int {
// whether we are in a canary state.
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive allocSet, canaryState bool) int {
// If there is no update strategy or deployment for the group we can deploy
// as many as the group has
if group.Update == nil || len(destructive) == 0 {
return group.Count
} else if a.deploymentPaused {
// If the deployment is paused, do not create anything else
} else if a.deploymentPaused || a.deploymentFailed {
// If the deployment is paused or failed, do not create anything else
return 0
}
// Get the state of the deployment for the group
deploymentState := a.deployment.TaskGroups[group.Name]
// If we have canaries and they have not been promoted the limit is 0
if canaries && (deploymentState == nil || !deploymentState.Promoted) {
if canaryState {
return 0
}
@@ -424,7 +442,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
limit := group.Update.MaxParallel
partOf, _ := untainted.filterByDeployment(a.deployment.ID)
for _, alloc := range partOf {
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.Healthy == nil {
if !alloc.DeploymentHealthy() {
limit--
}
}
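`computeLimit` now leans on the new `DeploymentHealthy` helper: the budget starts at `max_parallel` and is reduced by every allocation in the current deployment whose health has not been confirmed. A simplified sketch of just that arithmetic (the paused, failed, and canary handling above is elided, and clamping at zero is an assumption):

```go
package main

import "fmt"

// healthy mirrors Allocation.DeploymentHealthy from the structs diff; the
// slice stands in for the allocations that are part of the current deployment.
type alloc struct{ healthy bool }

func placementLimit(maxParallel int, partOfDeployment []alloc) int {
	limit := maxParallel
	for _, a := range partOfDeployment {
		if !a.healthy {
			limit--
		}
	}
	if limit < 0 { // assumed floor; the real code may handle this elsewhere
		return 0
	}
	return limit
}

func main() {
	// max_parallel = 3 with two allocations still unhealthy leaves room for
	// exactly one more destructive update this pass.
	fmt.Println(placementLimit(3, []alloc{{true}, {false}, {false}}))
}
```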
@@ -457,7 +475,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// computeStop returns the set of allocations to stop given the group definition
// and the set of untainted and canary allocations for the group.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries allocSet, promoted bool) allocSet {
untainted, migrate, lost, canaries allocSet, canaryState bool) allocSet {
// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
@@ -465,15 +483,37 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
stop = stop.union(lost)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
if !promoted {
// Canaries are in the untainted set and should be discounted.
// If we are still deploying or creating canaries, don't stop them
if canaryState {
untainted = untainted.difference(canaries)
}
// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
if remove <= 0 {
return nil
return stop
}
// Prefer stopping any alloc that has the same name as the canaries if we
// are promoted
if !canaryState && len(canaries) != 0 {
canaryNames := canaries.nameSet()
for id, alloc := range untainted.difference(canaries) {
if _, match := canaryNames[alloc.Name]; match {
a.logger.Printf("ALEX -- STOPPING alloc with same name as canary %q %q", id, alloc.Name)
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(untainted, id)
remove--
if remove == 0 {
return stop
}
}
}
}
// Prefer selecting from the migrating set before stopping existing allocs
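Once out of the canary state, `computeStop` prefers to stop allocations that share a name with a canary, since the promoted canary has effectively taken that name's slot. A reduced sketch of that preference with made-up IDs and names:

```go
package main

import "fmt"

// alloc stands in for *structs.Allocation; only the fields the preference
// needs are modeled here.
type alloc struct{ ID, Name string }

// stopPreferringCanaryNames returns the IDs of up to `remove` non-canary
// allocations whose names collide with a canary, mirroring the new loop in
// computeStop.
func stopPreferringCanaryNames(existing, canaries []alloc, remove int) []string {
	canaryNames := make(map[string]struct{}, len(canaries))
	for _, c := range canaries {
		canaryNames[c.Name] = struct{}{}
	}
	var stop []string
	for _, a := range existing {
		if remove == 0 {
			break
		}
		if _, match := canaryNames[a.Name]; match {
			stop = append(stop, a.ID)
			remove--
		}
	}
	return stop
}

func main() {
	existing := []alloc{{"id-1", "web[0]"}, {"id-2", "web[1]"}, {"id-3", "web[2]"}}
	canaries := []alloc{{"id-9", "web[0]"}}
	// The promoted canary already owns web[0], so id-1 is the preferred stop.
	fmt.Println(stopPreferringCanaryNames(existing, canaries, 1))
}
```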
@@ -505,7 +545,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
if _, remove := removeNames[alloc.Name]; remove {
a.logger.Printf("ALEX -- STOPPING normal alloc %q", id)
a.logger.Printf("ALEX -- STOPPING normal alloc %q %q", id, alloc.Name)
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
@@ -639,13 +679,16 @@ func (a *allocNameIndex) UnsetIndex(idx uint) {
func (a *allocNameIndex) NextCanaries(n uint, existing, destructive allocSet) []string {
next := make([]string, 0, n)
// Create a name index
existingNames := existing.nameSet()
// First select indexes from the allocations that are undergoing destructive
// updates. This way we avoid duplicate names as they will get replaced.
dmap := bitmapFrom(destructive, uint(a.count))
var remainder uint
for _, idx := range dmap.IndexesInRange(true, uint(0), uint(a.count)-1) {
name := structs.AllocName(a.job, a.taskGroup, uint(idx))
if _, used := existing[name]; !used {
if _, used := existingNames[name]; !used {
next = append(next, name)
a.b.Set(uint(idx))
@@ -660,7 +703,7 @@ func (a *allocNameIndex) NextCanaries(n uint, existing, destructive allocSet) []
// Get the set of unset names that can be used
for _, idx := range a.b.IndexesInRange(false, uint(0), uint(a.count)-1) {
name := structs.AllocName(a.job, a.taskGroup, uint(idx))
if _, used := existing[name]; !used {
if _, used := existingNames[name]; !used {
next = append(next, name)
a.b.Set(uint(idx))
@@ -677,7 +720,7 @@ func (a *allocNameIndex) NextCanaries(n uint, existing, destructive allocSet) []
var i uint
for i = 0; i < remainder; i++ {
name := structs.AllocName(a.job, a.taskGroup, i)
if _, used := existing[name]; !used {
if _, used := existingNames[name]; !used {
next = append(next, name)
a.b.Set(i)

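The `NextCanaries` change is a correctness fix: an `allocSet` is keyed by allocation ID, so probing it with a generated name could never match, and duplicate names slipped through. Building a `nameSet` first makes the membership test meaningful. A tiny illustration with stand-in data:

```go
package main

import "fmt"

type alloc struct{ ID, Name string }

func main() {
	// An allocSet-like map keyed by ID, as in the scheduler.
	byID := map[string]*alloc{
		"9f2c": {ID: "9f2c", Name: "web.cache[0]"},
	}

	name := "web.cache[0]"
	_, wrong := byID[name] // old check: a name looked up in an ID-keyed map

	names := make(map[string]struct{}, len(byID))
	for _, a := range byID {
		names[a.Name] = struct{}{}
	}
	_, right := names[name] // new check: the nameSet is keyed by name

	fmt.Println(wrong, right) // false true — the old check would have reused the name
}
```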
File diff suppressed because it is too large

View File

@@ -2,6 +2,8 @@ package scheduler
import (
"fmt"
"sort"
"strings"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -54,10 +56,30 @@ func (a allocSet) GoString() string {
}
start := fmt.Sprintf("len(%d) [\n", len(a))
for k := range a {
start += k + ",\n"
var s []string
for k, v := range a {
s = append(s, fmt.Sprintf("%q: %v", k, v.Name))
}
return start + "]"
return start + strings.Join(s, "\n") + "]"
}
func (a allocSet) nameSet() map[string]struct{} {
names := make(map[string]struct{}, len(a))
for _, alloc := range a {
names[alloc.Name] = struct{}{}
}
return names
}
func (a allocSet) nameOrder() []*structs.Allocation {
allocs := make([]*structs.Allocation, 0, len(a))
for _, alloc := range a {
allocs = append(allocs, alloc)
}
sort.Slice(allocs, func(i, j int) bool {
return allocs[i].Index() < allocs[j].Index()
})
return allocs
}
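`nameOrder` is what lets the reconciler replace the old counted loop with `destructive.nameOrder()[:min]`: it turns the unordered map into a slice sorted by allocation index, so the same allocations are chosen for destructive updates on every pass. A sketch that assumes `Index()` parses the numeric suffix of names like `example.cache[2]`:

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// alloc and Index are illustrative stand-ins for *structs.Allocation and its
// Index method; the suffix parsing here is an assumption for the example.
type alloc struct{ Name string }

func (a alloc) Index() int {
	lb := strings.LastIndex(a.Name, "[")
	rb := strings.LastIndex(a.Name, "]")
	if lb == -1 || rb == -1 || rb < lb+1 {
		return -1
	}
	i, err := strconv.Atoi(a.Name[lb+1 : rb])
	if err != nil {
		return -1
	}
	return i
}

func nameOrder(set map[string]alloc) []alloc {
	out := make([]alloc, 0, len(set))
	for _, a := range set {
		out = append(out, a)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].Index() < out[j].Index() })
	return out
}

func main() {
	set := map[string]alloc{
		"id-a": {Name: "example.cache[2]"},
		"id-b": {Name: "example.cache[0]"},
		"id-c": {Name: "example.cache[1]"},
	}
	// Taking the first N of the ordered slice keeps destructive updates deterministic.
	for _, a := range nameOrder(set)[:2] {
		fmt.Println(a.Name)
	}
}
```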
// difference returns a new allocSet that has all the existing item except those