Non-Canary/Deployment Tests

Alex Dadgar
2017-06-01 15:16:24 -07:00
parent d72b270a51
commit f32a9a5539
4 changed files with 1153 additions and 77 deletions


@@ -12,7 +12,8 @@ import (
// update, or can be inplace updated. If it can be inplace updated, an updated
// allocation that has the new resources and alloc metrics attached will be
// returned.
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)
type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job,
newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation)
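As a point of reference, a minimal allocUpdateType implementation could simply report every existing allocation as up to date. This stub is purely illustrative (the name updateIgnoreAll is hypothetical, not part of this commit):

// updateIgnoreAll is a hypothetical allocUpdateType used only for
// illustration: it marks every existing allocation as ignorable, so the
// reconciler neither replaces nor in-place updates anything.
func updateIgnoreAll(existing *structs.Allocation, newJob *structs.Job,
	newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) {
	return true, false, nil
}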
// allocReconciler is used to determine the set of allocations that require
// placement, inplace updating or stopping given the job specification and
@@ -126,13 +127,33 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch
// Compute reconciles the existing cluster state and returns the set of changes
// required to converge the job spec and state
func (a *allocReconciler) Compute() *reconcileResults {
// Create the allocation matrix
m := newAllocMatrix(a.job, a.existingAllocs)
// Handle creating or stopping deployments
a.computeDeployments()
// If we are just stopping a job we do not need to do anything more than
// stopping all running allocs
stopped := a.job == nil || a.job.Stop
if stopped {
a.handleStop()
if a.job.Stopped() {
a.handleStop(m)
return a.result
}
// Cancel the deployment since it is not needed
// Reconcile each group
for group, as := range m {
a.computeGroup(group, as)
}
return a.result
}
// XXX Shouldn't cancel failed deployments
// computeDeployments cancels any deployment that is not needed and creates a
// deployment if it is needed
func (a *allocReconciler) computeDeployments() {
// If the job is stopped and there is a deployment cancel it
if a.job.Stopped() {
if a.deployment != nil {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
DeploymentID: a.deployment.ID,
@@ -141,7 +162,8 @@ func (a *allocReconciler) Compute() *reconcileResults {
})
}
return a.result
// Nothing else to do
return
}
// Check if the deployment is referencing an older job and cancel it
@@ -157,31 +179,24 @@ func (a *allocReconciler) Compute() *reconcileResults {
}
// Create a new deployment if necessary
if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() {
if a.deployment == nil && !a.job.Stopped() && a.job.HasUpdateStrategy() {
a.deployment = structs.NewDeployment(a.job)
a.result.createDeployment = a.deployment
a.logger.Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
}
if a.deployment != nil {
a.logger.Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
}
m := newAllocMatrix(a.job, a.existingAllocs)
for group, as := range m {
a.computeGroup(group, as)
}
return a.result
}
// handleStop marks all allocations to be stopped, handling the lost case
func (a *allocReconciler) handleStop() {
as := newAllocSet(a.existingAllocs)
untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
func (a *allocReconciler) handleStop(m allocMatrix) {
for group, as := range m {
untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
a.markStop(untainted, "", allocNotNeeded)
a.markStop(migrate, "", allocNotNeeded)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
desiredChanges := new(structs.DesiredUpdates)
desiredChanges.Stop = uint64(len(as))
a.result.desiredTGUpdates[group] = desiredChanges
}
}
// markStop is a helper for marking a set of allocations for stop with a
@@ -197,7 +212,7 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
}
// computeGroup reconciles state for a particular task group.
func (a *allocReconciler) computeGroup(group string, as allocSet) {
func (a *allocReconciler) computeGroup(group string, all allocSet) {
// Create the desired update object for the group
desiredChanges := new(structs.DesiredUpdates)
a.result.desiredTGUpdates[group] = desiredChanges
@@ -207,7 +222,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
tg := a.job.LookupTaskGroup(group)
// Determine which set of allocations are on tainted nodes
untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
// If the task group is nil, then the task group has been removed so all we
// need to do is stop everything
@@ -234,22 +249,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
}
// Track the lost and migrating
desiredChanges.Migrate += uint64(len(migrate) + len(lost))
a.logger.Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost))
a.logger.Printf("RECONCILER -- untainted %#v", untainted)
// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
for _, alloc := range lost {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
clientStatus: structs.AllocClientStatusLost,
statusDescription: allocLost,
})
}
// Get any existing canaries
canaries := untainted.filterByCanary()
@@ -276,15 +278,15 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
a.logger.Printf("RECONCILER -- untainted - remove canaries %#v", untainted)
}
// Create a structure for choosing names
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted)
// Create a structure for choosing names. Seed it with the taken names, which
// are the union of the untainted and migrating allocations
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate))
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations. We ignore canaries since that can push us
// over the desired count
existingCanariesPromoted := dstate == nil || dstate.DesiredCanaries == 0 || dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted.difference(canaries), canaries, existingCanariesPromoted)
a.markStop(stop, "", allocNotNeeded)
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, existingCanariesPromoted)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
@@ -293,7 +295,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// treated like non-canaries
if existingCanariesPromoted {
untainted = untainted.union(canaries)
nameIndex.Add(canaries)
nameIndex.Set(canaries)
}
// Do inplace upgrades where possible and capture the set of upgrades that
@@ -306,12 +308,10 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
a.logger.Printf("RECONCILER -- Stopping (%d)", len(stop))
a.logger.Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
// Get the update strategy of the group
strategy := tg.Update
// The fact that we have destructive updates and fewer canaries than
// desired means we need to create canaries
numDestructive := len(destructive)
strategy := tg.Update
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary
if requireCanary && !a.deploymentPaused {
number := strategy.Canary - len(canaries)
@@ -341,7 +341,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// * The deployment is not paused
// * Not placing any canaries
// * If there are any canaries, they have been promoted
place := a.computePlacements(tg, nameIndex, untainted)
place := a.computePlacements(tg, nameIndex, untainted, migrate)
if creatingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -381,6 +381,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
// TODO Migrations should be done using a stagger and max_parallel.
desiredChanges.Migrate += uint64(len(migrate))
a.logger.Printf("RECONCILER -- Migrating (%d)", len(migrate))
for _, alloc := range migrate {
a.result.stop = append(a.result.stop, allocStopResult{
@@ -397,8 +398,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
// computeLimit returns the placement limit for a particular group. The inputs
// are the group definition, the existing/untainted allocation set and whether
// any canaries exist or are being placed.
// are the group definition, the untainted and destructive allocation sets, and
// whether any canaries exist or are being placed.
func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive allocSet, canaries bool) int {
// If there is no update strategy or deployment for the group we can deploy
// as many as the group has
@@ -432,12 +433,12 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
}
// computePlacements returns the set of allocations to place given the group
// definition and the set of untainted/existing allocations for the group.
// definition and the sets of untainted and migrating allocations for the group.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted allocSet) []allocPlaceResult {
nameIndex *allocNameIndex, untainted, migrate allocSet) []allocPlaceResult {
// Hot path the nothing to do case
existing := len(untainted)
existing := len(untainted) + len(migrate)
if existing >= group.Count {
return nil
}
@@ -456,22 +457,60 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// computeStop returns the set of allocations to stop given the group definition
// and the set of untainted and canary allocations for the group.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, canaries allocSet, promoted bool) allocSet {
// Hot path the nothing to do case
remove := len(untainted) - group.Count
if promoted {
remove += len(canaries)
untainted, migrate, lost, canaries allocSet, promoted bool) allocSet {
// Mark all lost allocations for stop. Previous allocation doesn't matter
// here since it is on a lost node
var stop allocSet
stop = stop.union(lost)
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
if !promoted {
// Canaries are in the untainted set and should be discounted.
untainted = untainted.difference(canaries)
}
// Hot path the nothing to do case
remove := len(untainted) + len(migrate) - group.Count
if remove <= 0 {
return nil
}
// Prefer selecting from the migrating set before stopping existing allocs
if len(migrate) != 0 {
mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
removeNames := mNames.Highest(uint(remove))
for id, alloc := range migrate {
if _, match := removeNames[alloc.Name]; !match {
continue
}
a.logger.Printf("ALEX -- STOPPING migrating alloc %q", id)
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
delete(migrate, id)
stop[id] = alloc
nameIndex.UnsetIndex(alloc.Index())
remove--
if remove == 0 {
return stop
}
}
}
// nameIndex does not include the canaries
a.logger.Printf("ALEX -- ATTEMPTING STOP of %d normal allocs", remove)
removeNames := nameIndex.Highest(uint(remove))
stop := make(map[string]*structs.Allocation)
for id, a := range untainted {
if _, remove := removeNames[a.Name]; remove {
stop[id] = a
for id, alloc := range untainted {
if _, remove := removeNames[alloc.Name]; remove {
a.logger.Printf("ALEX -- STOPPING normal alloc %q", id)
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocNotNeeded,
})
}
}
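To make the new stop accounting concrete, here is a self-contained sketch of the ordering above, with hypothetical names and counts and a deliberately simplified name selection: migrating allocations are stopped first, then the highest-indexed untainted names.

package main

import (
	"fmt"
	"sort"
)

// pick mirrors computeStop's ordering in miniature: remove migrating
// allocations first, then the highest-indexed remaining names.
func pick(untainted, migrate []string, count int) []string {
	remove := len(untainted) + len(migrate) - count
	if remove <= 0 {
		return nil // hot path: nothing to stop
	}
	var stop []string
	for _, name := range migrate {
		if remove == 0 {
			break
		}
		stop = append(stop, name)
		remove--
	}
	// Highest names sort last, so reverse to take them first.
	sort.Sort(sort.Reverse(sort.StringSlice(untainted)))
	return append(stop, untainted[:remove]...)
}

func main() {
	untainted := []string{"web.0", "web.1", "web.2", "web.3"}
	migrate := []string{"web.4", "web.5"}
	// remove = 4 + 2 - 3 = 3: both migrating allocs plus web.3
	fmt.Println(pick(untainted, migrate, 3)) // [web.4 web.5 web.3]
}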
@@ -560,18 +599,11 @@ func bitmapFrom(input allocSet, minSize uint) structs.Bitmap {
return bitmap
}
// Add adds the allocations to the name index
func (a *allocNameIndex) Add(set allocSet) {
for _, alloc := range set {
a.b.Set(alloc.Index())
}
}
// Highest returns the highest n used names. The returned set
// can be smaller than n if there aren't n names set in the index
func (a *allocNameIndex) Highest(n uint) map[string]struct{} {
h := make(map[string]struct{}, n)
for i := a.b.Size(); i > uint(0) && uint(len(h)) <= n; i-- {
for i := a.b.Size(); i > uint(0) && uint(len(h)) < n; i-- {
// Use this to avoid wrapping around b/c of the unsigned int
idx := i - 1
if a.b.Check(idx) {
@@ -583,6 +615,25 @@ func (a *allocNameIndex) Highest(n uint) map[string]struct{} {
return h
}
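The one-character change in the loop condition (from <= n to < n) is the actual bug fix: with <=, the loop kept scanning until it had collected n+1 names. A standalone sketch of the same pattern, with a plain []bool standing in for structs.Bitmap:

package main

import "fmt"

// highest collects up to n of the highest set indexes, counting down
// the way allocNameIndex.Highest does.
func highest(bits []bool, n uint) map[uint]struct{} {
	h := make(map[uint]struct{}, n)
	// The fixed condition: stop as soon as n indexes are collected.
	for i := uint(len(bits)); i > 0 && uint(len(h)) < n; i-- {
		idx := i - 1 // count down without wrapping the unsigned int
		if bits[idx] {
			h[idx] = struct{}{}
		}
	}
	return h
}

func main() {
	bits := []bool{true, true, true, true, true}
	fmt.Println(len(highest(bits, 2))) // 2; with "<= n" it would be 3
}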
// Set sets the indexes from the passed alloc set as used
func (a *allocNameIndex) Set(set allocSet) {
for _, alloc := range set {
a.b.Set(alloc.Index())
}
}
// Unset marks the indexes of the passed alloc set as unused
func (a *allocNameIndex) Unset(as allocSet) {
for _, alloc := range as {
a.b.Unset(alloc.Index())
}
}
// UnsetIndex marks the name at the given index as unused
func (a *allocNameIndex) UnsetIndex(idx uint) {
a.b.Unset(idx)
}
// NextCanaries returns the next n names for use as canaries and sets them as
// used. The existing canaries and destructive updates are also passed in.
func (a *allocNameIndex) NextCanaries(n uint, existing, destructive allocSet) []string {
@@ -647,13 +698,13 @@ func (a *allocNameIndex) Next(n uint) []string {
next := make([]string, 0, n)
// Get the set of unset names that can be used
var remainder uint
remainder := n
for _, idx := range a.b.IndexesInRange(false, uint(0), uint(a.count)-1) {
next = append(next, structs.AllocName(a.job, a.taskGroup, uint(idx)))
a.b.Set(uint(idx))
// If we have enough, return
remainder := n - uint(len(next))
remainder = n - uint(len(next))
if remainder == 0 {
return next
}
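The other fix in Next is a classic Go shadowing bug: the old remainder := n - uint(len(next)) inside the loop declared a fresh loop-scoped variable each iteration, leaving the outer remainder declared above permanently zero. A minimal standalone illustration:

package main

import "fmt"

func main() {
	var remainder uint
	for i := uint(0); i < 3; i++ {
		remainder := 10 - i // ":=" declares a new, loop-scoped variable
		_ = remainder       // silence the unused-variable error
	}
	fmt.Println(remainder) // 0: the outer variable never changed

	remainder = 99
	for i := uint(0); i < 3; i++ {
		remainder = 10 - i // "=" assigns to the outer variable
	}
	fmt.Println(remainder) // 8
}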

scheduler/reconcile_test.go: new file, 1020 additions (diff suppressed because it is too large)


@@ -21,11 +21,14 @@ func newAllocMatrix(job *structs.Job, allocs []*structs.Allocation) allocMatrix
}
s[a.ID] = a
}
for _, tg := range job.TaskGroups {
s, ok := m[tg.Name]
if !ok {
s = make(map[string]*structs.Allocation)
m[tg.Name] = s
if job != nil {
for _, tg := range job.TaskGroups {
s, ok := m[tg.Name]
if !ok {
s = make(map[string]*structs.Allocation)
m[tg.Name] = s
}
}
}
return m


@@ -733,6 +733,8 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
}
}
// newAllocUpdateFn is a factory for the scheduler to create an allocUpdateType
// function for the reconciler
func newAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType {
return func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) {
// Same index, so nothing to do