From f32a9a553933defd9edca4c826b9c08cae8cc2e6 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 1 Jun 2017 15:16:24 -0700 Subject: [PATCH] Non-Canary/Deployment Tests --- scheduler/reconcile.go | 195 ++++--- scheduler/reconcile_test.go | 1020 +++++++++++++++++++++++++++++++++++ scheduler/reconcile_util.go | 13 +- scheduler/util.go | 2 + 4 files changed, 1153 insertions(+), 77 deletions(-) create mode 100644 scheduler/reconcile_test.go diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index f6983689d..7c27d12bc 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -12,7 +12,8 @@ import ( // update, or can be inplace updated. If it can be inplace updated, an updated // allocation that has the new resources and alloc metrics attached will be // returned. -type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) +type allocUpdateType func(existing *structs.Allocation, newJob *structs.Job, + newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) // allocReconciler is used to determine the set of allocations that require // placement, inplace updating or stopping given the job specification and @@ -126,13 +127,33 @@ func NewAllocReconciler(logger *log.Logger, allocUpdateFn allocUpdateType, batch // Compute reconciles the existing cluster state and returns the set of changes // required to converge the job spec and state func (a *allocReconciler) Compute() *reconcileResults { + // Create the allocation matrix + m := newAllocMatrix(a.job, a.existingAllocs) + + // Handle creating or stopping deployments + a.computeDeployments() + // If we are just stopping a job we do not need to do anything more than // stopping all running allocs - stopped := a.job == nil || a.job.Stop - if stopped { - a.handleStop() + if a.job.Stopped() { + a.handleStop(m) + return a.result + } - // Cancel the deployment since it is not needed + // Reconcile each group + for group, as := range m { + a.computeGroup(group, as) + } + + return a.result +} + +// XXX Shouldn't cancel failed deployments +// computeDeployments cancels any deployment that is not needed and creates a +// deployment if it is needed +func (a *allocReconciler) computeDeployments() { + // If the job is stopped and there is a deployment, cancel it + if a.job.Stopped() { if a.deployment != nil { a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ DeploymentID: a.deployment.ID, @@ -141,7 +162,8 @@ func (a *allocReconciler) Compute() *reconcileResults { }) } - return a.result + // Nothing else to do + return } // Check if the deployment is referencing an older job and cancel it @@ -157,31 +179,24 @@ func (a *allocReconciler) Compute() *reconcileResults { } // Create a new deployment if necessary - if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() { + if a.deployment == nil && !a.job.Stopped() && a.job.HasUpdateStrategy() { a.deployment = structs.NewDeployment(a.job) a.result.createDeployment = a.deployment a.logger.Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID) } - - if a.deployment != nil { - a.logger.Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID) - } - - m := newAllocMatrix(a.job, a.existingAllocs) - for group, as := range m { - a.computeGroup(group, as) - } - - return a.result } // handleStop marks all allocations to be stopped, handling the lost case -func (a *allocReconciler) handleStop() { - as := newAllocSet(a.existingAllocs) - 
untainted, migrate, lost := as.filterByTainted(a.taintedNodes) - a.markStop(untainted, "", allocNotNeeded) - a.markStop(migrate, "", allocNotNeeded) - a.markStop(lost, structs.AllocClientStatusLost, allocLost) +func (a *allocReconciler) handleStop(m allocMatrix) { + for group, as := range m { + untainted, migrate, lost := as.filterByTainted(a.taintedNodes) + a.markStop(untainted, "", allocNotNeeded) + a.markStop(migrate, "", allocNotNeeded) + a.markStop(lost, structs.AllocClientStatusLost, allocLost) + desiredChanges := new(structs.DesiredUpdates) + desiredChanges.Stop = uint64(len(as)) + a.result.desiredTGUpdates[group] = desiredChanges + } } // markStop is a helper for marking a set of allocations for stop with a @@ -197,7 +212,7 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript } // computeGroup reconciles state for a particular task group. -func (a *allocReconciler) computeGroup(group string, as allocSet) { +func (a *allocReconciler) computeGroup(group string, all allocSet) { // Create the desired update object for the group desiredChanges := new(structs.DesiredUpdates) a.result.desiredTGUpdates[group] = desiredChanges @@ -207,7 +222,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { tg := a.job.LookupTaskGroup(group) // Determine what set of allocations are on tainted nodes - untainted, migrate, lost := as.filterByTainted(a.taintedNodes) + untainted, migrate, lost := all.filterByTainted(a.taintedNodes) // If the task group is nil, then the task group has been removed so all we // need to do is stop everything @@ -234,22 +249,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { } } - // Track the lost and migrating - desiredChanges.Migrate += uint64(len(migrate) + len(lost)) - a.logger.Printf("RECONCILER -- untainted (%d); migrate (%d); lost (%d)", len(untainted), len(migrate), len(lost)) a.logger.Printf("RECONCILER -- untainted %#v", untainted) - // Mark all lost allocations for stop. Previous allocation doesn't matter - // here since it is on a lost node - for _, alloc := range lost { - a.result.stop = append(a.result.stop, allocStopResult{ - alloc: alloc, - clientStatus: structs.AllocClientStatusLost, - statusDescription: allocLost, - }) - } - // Get any existing canaries canaries := untainted.filterByCanary() @@ -276,15 +278,15 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { a.logger.Printf("RECONCILER -- untainted - remove canaries %#v", untainted) } - // Create a structure for choosing names - nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted) + // Create a structure for choosing names. Seed with the taken names which is + // the union of untainted and migrating allocations + nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate)) // Stop any unneeded allocations and update the untainted set to not // include stopped allocations. 
We ignore canaries since that can push us // over the desired count existingCanariesPromoted := dstate == nil || dstate.DesiredCanaries == 0 || dstate.Promoted - stop := a.computeStop(tg, nameIndex, untainted.difference(canaries), canaries, existingCanariesPromoted) - a.markStop(stop, "", allocNotNeeded) + stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, existingCanariesPromoted) desiredChanges.Stop += uint64(len(stop)) untainted = untainted.difference(stop) @@ -293,7 +295,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { // treated like non-canaries if existingCanariesPromoted { untainted = untainted.union(canaries) - nameIndex.Add(canaries) + nameIndex.Set(canaries) } // Do inplace upgrades where possible and capture the set of upgrades that @@ -306,12 +308,10 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { a.logger.Printf("RECONCILER -- Stopping (%d)", len(stop)) a.logger.Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive)) - // Get the update strategy of the group - strategy := tg.Update - // The fact that we have destructive updates and have fewer canaries than is // desired means we need to create canaries numDestructive := len(destructive) + strategy := tg.Update requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary if requireCanary && !a.deploymentPaused { number := strategy.Canary - len(canaries) @@ -341,7 +341,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { // * The deployment is not paused // * Not placing any canaries // * If there are any canaries, they have been promoted - place := a.computePlacements(tg, nameIndex, untainted) + place := a.computePlacements(tg, nameIndex, untainted, migrate) if creatingDeployment { dstate.DesiredTotal += len(place) } @@ -381,6 +381,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { } // TODO Migrations should be done using a stagger and max_parallel. + desiredChanges.Migrate += uint64(len(migrate)) a.logger.Printf("RECONCILER -- Migrating (%d)", len(migrate)) for _, alloc := range migrate { a.result.stop = append(a.result.stop, allocStopResult{ @@ -397,8 +398,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { } // computeLimit returns the placement limit for a particular group. The inputs -// are the group definition, the existing/untainted allocation set and whether -// any canaries exist or are being placed. +// are the group definition, the untainted and destructive allocation sets and +// whether any canaries exist or are being placed. func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive allocSet, canaries bool) int { // If there is no update strategy or deployment for the group we can deploy // as many as the group has @@ -432,12 +433,12 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest } // computePlacements returns the set of allocations to place given the group -// definiton and the set of untainted/existing allocations for the group. +// definition, the set of untainted and migrating allocations for the group. 
func (a *allocReconciler) computePlacements(group *structs.TaskGroup, - nameIndex *allocNameIndex, untainted allocSet) []allocPlaceResult { + nameIndex *allocNameIndex, untainted, migrate allocSet) []allocPlaceResult { // Hot path the nothing to do case - existing := len(untainted) + existing := len(untainted) + len(migrate) if existing >= group.Count { return nil } @@ -456,22 +457,60 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, // computeStop returns the set of allocations to stop given the group definition // and the set of untainted and canary allocations for the group. func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex, - untainted, canaries allocSet, promoted bool) allocSet { - // Hot path the nothing to do case - remove := len(untainted) - group.Count - if promoted { - remove += len(canaries) + untainted, migrate, lost, canaries allocSet, promoted bool) allocSet { + + // Mark all lost allocations for stop. Previous allocation doesn't matter + // here since it is on a lost node + var stop allocSet + stop = stop.union(lost) + a.markStop(lost, structs.AllocClientStatusLost, allocLost) + + if !promoted { + // Canaries are in the untainted set and should be discounted. + untainted = untainted.difference(canaries) } + + // Hot path the nothing to do case + remove := len(untainted) + len(migrate) - group.Count if remove <= 0 { return nil } + // Prefer selecting from the migrating set before stopping existing allocs + if len(migrate) != 0 { + mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate) + removeNames := mNames.Highest(uint(remove)) + for id, alloc := range migrate { + if _, match := removeNames[alloc.Name]; !match { + continue + } + a.logger.Printf("ALEX -- STOPPING migrating alloc %q", id) + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: alloc, + statusDescription: allocNotNeeded, + }) + delete(migrate, id) + stop[id] = alloc + nameIndex.UnsetIndex(alloc.Index()) + + remove-- + if remove == 0 { + return stop + } + } + } + // nameIndex does not include the canaries + a.logger.Printf("ALEX -- ATTEMPTING STOP of %d normal allocs", remove) removeNames := nameIndex.Highest(uint(remove)) - stop := make(map[string]*structs.Allocation) - for id, a := range untainted { - if _, remove := removeNames[a.Name]; remove { - stop[id] = a + for id, alloc := range untainted { + if _, remove := removeNames[alloc.Name]; remove { + a.logger.Printf("ALEX -- STOPPING normal alloc %q", id) + stop[id] = alloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: alloc, + statusDescription: allocNotNeeded, + }) } } @@ -560,18 +599,11 @@ func bitmapFrom(input allocSet, minSize uint) structs.Bitmap { return bitmap } -// Add adds the allocations to the name index -func (a *allocNameIndex) Add(set allocSet) { - for _, alloc := range set { - a.b.Set(alloc.Index()) - } -} - // Highest returns the highest n used names. 
The returned set // can be less than n if there aren't n names set in the index func (a *allocNameIndex) Highest(n uint) map[string]struct{} { h := make(map[string]struct{}, n) - for i := a.b.Size(); i > uint(0) && uint(len(h)) <= n; i-- { + for i := a.b.Size(); i > uint(0) && uint(len(h)) < n; i-- { // Use this to avoid wrapping around b/c of the unsigned int idx := i - 1 if a.b.Check(idx) { @@ -583,6 +615,25 @@ func (a *allocNameIndex) Highest(n uint) map[string]struct{} { return h } +// Set sets the indexes from the passed alloc set as used +func (a *allocNameIndex) Set(set allocSet) { + for _, alloc := range set { + a.b.Set(alloc.Index()) + } +} + +// Unset unsets all indexes of the passed alloc set as being used +func (a *allocNameIndex) Unset(as allocSet) { + for _, alloc := range as { + a.b.Unset(alloc.Index()) + } +} + +// UnsetIndex unsets the index as having its name used +func (a *allocNameIndex) UnsetIndex(idx uint) { + a.b.Unset(idx) +} + // NextCanaries returns the next n names for use as canaries and sets them as // used. The existing canaries and destructive updates are also passed in. func (a *allocNameIndex) NextCanaries(n uint, existing, destructive allocSet) []string { @@ -647,13 +698,13 @@ func (a *allocNameIndex) Next(n uint) []string { next := make([]string, 0, n) // Get the set of unset names that can be used - var remainder uint + remainder := n for _, idx := range a.b.IndexesInRange(false, uint(0), uint(a.count)-1) { next = append(next, structs.AllocName(a.job, a.taskGroup, uint(idx))) a.b.Set(uint(idx)) // If we have enough, return - remainder := n - uint(len(next)) + remainder = n - uint(len(next)) if remainder == 0 { return next } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go new file mode 100644 index 000000000..6fabe1b8f --- /dev/null +++ b/scheduler/reconcile_test.go @@ -0,0 +1,1020 @@ +package scheduler + +import ( + "log" + "os" + "reflect" + "regexp" + "strconv" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/kr/pretty" +) + +/* + +TODO: +Basic Tests: +√ Place when there is nothing in the cluster +√ Place remainder when there is some in the cluster +√ Scale down from n to n-m where n != m +√ Scale down from n to zero +√ Inplace upgrade test +√ Inplace upgrade and scale up test +√ Inplace upgrade and scale down test +√ Destructive upgrade +√ Destructive upgrade and scale up test +√ Destructive upgrade and scale down test +√ Handle lost nodes +√ Handle lost nodes and scale up +√ Handle lost nodes and scale down +√ Handle draining nodes +√ Handle draining nodes and scale up +√ Handle draining nodes and scale down +√ Handle task group being removed +√ Handle job being stopped both as .Stopped and nil +√ Place more than one group + Canary Tests: - Stopped job cancels any existing deployment - JobIndex change cancels any existing deployment - Creates a deployment if any group has an update strategy - Paused deployment doesn't create any more canaries - Paused deployment doesn't do any placements - Paused deployment doesn't do destructive updates - Paused deployment does do migrations - Canary that is on a tainted node - Stop old canaries - Create new canaries on job change - Create new canaries on job change while scaling up - Create new canaries on job change while scaling down - Fill canaries if partial placement - Promote canaries unblocks max_parallel - Failed deployment should not place anything + +*/ + +func testLogger() *log.Logger { + return 
log.New(os.Stderr, "", log.LstdFlags) +} + +func allocUpdateFnIgnore(*structs.Allocation, *structs.Job, *structs.TaskGroup) (bool, bool, *structs.Allocation) { + return true, false, nil +} + +func allocUpdateFnDestructive(*structs.Allocation, *structs.Job, *structs.TaskGroup) (bool, bool, *structs.Allocation) { + return false, true, nil +} + +func allocUpdateFnInplace(existing *structs.Allocation, _ *structs.Job, newTG *structs.TaskGroup) (bool, bool, *structs.Allocation) { + // Create a shallow copy + newAlloc := new(structs.Allocation) + *newAlloc = *existing + newAlloc.TaskResources = make(map[string]*structs.Resources) + + // Use the new task resources but keep the network from the old + for _, task := range newTG.Tasks { + r := task.Resources.Copy() + r.Networks = existing.TaskResources[task.Name].Networks + newAlloc.TaskResources[task.Name] = r + } + + return false, false, newAlloc +} + +var ( + // allocationIndexRegex is a regular expression to find the allocation index. + allocationIndexRegex = regexp.MustCompile(".+\\[(\\d+)\\]$") +) + +// allocNameToIndex returns the index of the allocation. +func allocNameToIndex(name string) uint { + matches := allocationIndexRegex.FindStringSubmatch(name) + if len(matches) != 2 { + return 0 + } + + index, err := strconv.Atoi(matches[1]) + if err != nil { + return 0 + } + + return uint(index) +} + +func assertNamesHaveIndexes(t *testing.T, indexes []int, names []string) { + m := make(map[uint]int) + for _, i := range indexes { + m[uint(i)] += 1 + } + + for _, n := range names { + index := allocNameToIndex(n) + val, contained := m[index] + if !contained { + t.Fatalf("Unexpected index %d from name %s\nAll names: %v", index, n, names) + } + + val-- + if val < 0 { + t.Fatalf("Index %d repeated too many times\nAll names: %v", index, names) + } + m[index] = val + } + + for k, remainder := range m { + if remainder != 0 { + t.Fatalf("Index %d has %d expected uses remaining\nAll names: %v", k, remainder, names) + } + } +} + +func intRange(pairs ...int) []int { + if len(pairs)%2 != 0 { + return nil + } + + var r []int + for i := 0; i < len(pairs); i += 2 { + for j := pairs[i]; j <= pairs[i+1]; j++ { + r = append(r, j) + } + } + return r +} + +func placeResultsToNames(place []allocPlaceResult) []string { + names := make([]string, 0, len(place)) + for _, p := range place { + names = append(names, p.name) + } + return names +} + +func stopResultsToNames(stop []allocStopResult) []string { + names := make([]string, 0, len(stop)) + for _, s := range stop { + names = append(names, s.alloc.Name) + } + return names +} + +func allocsToNames(allocs []*structs.Allocation) []string { + names := make([]string, 0, len(allocs)) + for _, a := range allocs { + names = append(names, a.Name) + } + return names +} + +type resultExpectation struct { + createDeployment *structs.Deployment + deploymentUpdates int + place int + inplace int + stop int + desiredTGUpdates map[string]*structs.DesiredUpdates +} + +func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { + + if exp.createDeployment != nil && r.createDeployment == nil { + t.Fatalf("Expected a created deployment; got none") + } else if exp.createDeployment == nil && r.createDeployment != nil { + t.Fatalf("Expected no created deployment; got %v", r.createDeployment) + } else if !reflect.DeepEqual(r.createDeployment, exp.createDeployment) { + t.Fatalf("Unexpected createDeployment: %v", pretty.Diff(r.createDeployment, exp.createDeployment)) + } + + if l := len(r.deploymentUpdates); l != 
exp.deploymentUpdates { + t.Fatalf("Expected %d deployment updates; got %v", exp.deploymentUpdates, r.deploymentUpdates) + } + if l := len(r.place); l != exp.place { + t.Fatalf("Expected %d placements; got %d", exp.place, l) + } + if l := len(r.inplaceUpdate); l != exp.inplace { + t.Fatalf("Expected %d inplaceUpdate; got %d", exp.inplace, l) + } + if l := len(r.stop); l != exp.stop { + t.Fatalf("Expected %d stops; got %d", exp.stop, l) + } + if l := len(r.desiredTGUpdates); l != len(exp.desiredTGUpdates) { + t.Fatalf("Expected %d task group desired update annotations; got %d", len(exp.desiredTGUpdates), l) + } + + // Check the desired updates happened + for group, desired := range exp.desiredTGUpdates { + act, ok := r.desiredTGUpdates[group] + if !ok { + t.Fatalf("Expected desired updates for group %q", group) + } + + if !reflect.DeepEqual(act, desired) { + t.Fatalf("Unexpected annotations for group %q: %v", group, pretty.Diff(act, desired)) + } + } +} + +// Tests the reconciler properly handles placements for a job that has no +// existing allocations +func TestReconciler_Place_NoExisting(t *testing.T) { + job := mock.Job() + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, nil, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 10, + inplace: 0, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles placements for a job that has some +// existing allocations +func TestReconciler_Place_Existing(t *testing.T) { + job := mock.Job() + + // Create 5 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 5, + inplace: 0, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 5, + Ignore: 5, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(5, 9), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles stopping allocations for a job that has +// scaled down +func TestReconciler_ScaleDown_Partial(t *testing.T) { + // Has desired 10 + job := mock.Job() + + // Create 20 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 20; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 0, + inplace: 0, + stop: 10, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Ignore: 10, + Stop: 10, + }, 
+ }, + }) + + assertNamesHaveIndexes(t, intRange(10, 19), stopResultsToNames(r.stop)) +} + +// Tests the reconciler properly handles stopping allocations for a job that has +// scaled down to zero desired +func TestReconciler_ScaleDown_Zero(t *testing.T) { + // Set desired 0 + job := mock.Job() + job.TaskGroups[0].Count = 0 + + // Create 20 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 20; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 0, + inplace: 0, + stop: 20, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Stop: 20, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 19), stopResultsToNames(r.stop)) +} + +// Tests the reconciler properly handles inplace upgrading allocations +func TestReconciler_Inplace(t *testing.T) { + job := mock.Job() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 0, + inplace: 10, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + InPlaceUpdate: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), allocsToNames(r.inplaceUpdate)) +} + +// Tests the reconciler properly handles inplace upgrading allocations while +// scaling up +func TestReconciler_Inplace_ScaleUp(t *testing.T) { + // Set desired 15 + job := mock.Job() + job.TaskGroups[0].Count = 15 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 5, + inplace: 10, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 5, + InPlaceUpdate: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), allocsToNames(r.inplaceUpdate)) + assertNamesHaveIndexes(t, intRange(10, 14), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles inplace upgrading allocations while +// scaling down +func TestReconciler_Inplace_ScaleDown(t *testing.T) { + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := 
mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnInplace, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 0, + inplace: 5, + stop: 5, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Stop: 5, + InPlaceUpdate: 5, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 4), allocsToNames(r.inplaceUpdate)) + assertNamesHaveIndexes(t, intRange(5, 9), stopResultsToNames(r.stop)) +} + +// Tests the reconciler properly handles destructive upgrading allocations +func TestReconciler_Destructive(t *testing.T) { + job := mock.Job() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 10, + inplace: 0, + stop: 10, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + DestructiveUpdate: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), placeResultsToNames(r.place)) + assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) +} + +// Tests the reconciler properly handles destructive upgrading allocations while +// scaling up +func TestReconciler_Destructive_ScaleUp(t *testing.T) { + // Set desired 15 + job := mock.Job() + job.TaskGroups[0].Count = 15 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 15, + inplace: 0, + stop: 10, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 5, + DestructiveUpdate: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 14), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles destructive upgrading allocations while +// scaling down +func TestReconciler_Destructive_ScaleDown(t *testing.T) { + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := 
NewAllocReconciler(testLogger(), allocUpdateFnDestructive, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 5, + inplace: 0, + stop: 10, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Stop: 5, + DestructiveUpdate: 5, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 4), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles lost nodes with allocations +func TestReconciler_LostNode(t *testing.T) { + job := mock.Job() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Build a map of tainted nodes + tainted := make(map[string]*structs.Node, 2) + for i := 0; i < 2; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Status = structs.NodeStatusDown + tainted[n.ID] = n + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 2, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles lost nodes with allocations while +// scaling up +func TestReconciler_LostNode_ScaleUp(t *testing.T) { + // Set desired 15 + job := mock.Job() + job.TaskGroups[0].Count = 15 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Build a map of tainted nodes + tainted := make(map[string]*structs.Node, 2) + for i := 0; i < 2; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Status = structs.NodeStatusDown + tainted[n.ID] = n + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 7, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 7, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 1, 10, 14), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles lost nodes with allocations while +// scaling down +func TestReconciler_LostNode_ScaleDown(t *testing.T) { + // Set desired 5 + job := mock.Job() + job.TaskGroups[0].Count = 5 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + 
alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Build a map of tainted nodes + tainted := make(map[string]*structs.Node, 2) + for i := 0; i < 2; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Status = structs.NodeStatusDown + tainted[n.ID] = n + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 0, + inplace: 0, + stop: 5, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Stop: 5, + Ignore: 5, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), stopResultsToNames(r.stop)) +} + +// Tests the reconciler properly handles draining nodes with allocations +func TestReconciler_DrainNode(t *testing.T) { + job := mock.Job() + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Build a map of tainted nodes + tainted := make(map[string]*structs.Node, 2) + for i := 0; i < 2; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Drain = true + tainted[n.ID] = n + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 2, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles draining nodes with allocations while +// scaling up +func TestReconciler_DrainNode_ScaleUp(t *testing.T) { + // Set desired 15 + job := mock.Job() + job.TaskGroups[0].Count = 15 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Build a map of tainted nodes + tainted := make(map[string]*structs.Node, 2) + for i := 0; i < 2; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Drain = true + tainted[n.ID] = n + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 7, + inplace: 0, + stop: 2, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 5, + Migrate: 2, + Ignore: 8, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 1, 10, 14), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles draining nodes with allocations while +// scaling down +func 
TestReconciler_DrainNode_ScaleDown(t *testing.T) { + // Set desired 8 + job := mock.Job() + job.TaskGroups[0].Count = 8 + + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + // Build a map of tainted nodes + tainted := make(map[string]*structs.Node, 3) + for i := 0; i < 3; i++ { + n := mock.Node() + n.ID = allocs[i].NodeID + n.Drain = true + tainted[n.ID] = n + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, tainted) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 1, + inplace: 0, + stop: 3, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Migrate: 1, + Stop: 2, + Ignore: 7, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 2), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 0), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles a task group being removed +func TestReconciler_RemovedTG(t *testing.T) { + job := mock.Job() + + // Create 10 allocations for a tg that no longer exists + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + oldName := job.TaskGroups[0].Name + newName := "different" + job.TaskGroups[0].Name = newName + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 10, + inplace: 0, + stop: 10, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + oldName: { + Stop: 10, + }, + newName: { + Place: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 9), placeResultsToNames(r.place)) +} + +// Tests the reconciler properly handles a job being stopped, both via Stop +// and a nil job +func TestReconciler_JobStopped(t *testing.T) { + job := mock.Job() + job.Stop = true + + cases := []struct { + name string + job *structs.Job + jobID, taskGroup string + }{ + { + name: "stopped job", + job: job, + jobID: job.ID, + taskGroup: job.TaskGroups[0].Name, + }, + { + name: "nil job", + job: nil, + jobID: "foo", + taskGroup: "bar", + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + // Create 10 existing allocations + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = c.job + alloc.JobID = c.jobID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(c.jobID, c.taskGroup, uint(i)) + alloc.TaskGroup = c.taskGroup + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, c.jobID, c.job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 0, + inplace: 0, + stop: 10, + desiredTGUpdates: 
map[string]*structs.DesiredUpdates{ + c.taskGroup: { + Stop: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) + }) + } +} + +// Tests the reconciler properly handles jobs with multiple task groups +func TestReconciler_MultiTG(t *testing.T) { + job := mock.Job() + tg2 := job.TaskGroups[0].Copy() + tg2.Name = "foo" + job.TaskGroups = append(job.TaskGroups, tg2) + + // Create 2 existing allocations for the first tg + var allocs []*structs.Allocation + for i := 0; i < 2; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i)) + allocs = append(allocs, alloc) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: 0, + place: 18, + inplace: 0, + stop: 0, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Place: 8, + Ignore: 2, + }, + tg2.Name: { + Place: 10, + }, + }, + }) + + assertNamesHaveIndexes(t, intRange(2, 9, 0, 9), placeResultsToNames(r.place)) +} diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index e039986a5..295a40e1a 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -21,11 +21,14 @@ func newAllocMatrix(job *structs.Job, allocs []*structs.Allocation) allocMatrix } s[a.ID] = a } - for _, tg := range job.TaskGroups { - s, ok := m[tg.Name] - if !ok { - s = make(map[string]*structs.Allocation) - m[tg.Name] = s + + if job != nil { + for _, tg := range job.TaskGroups { + s, ok := m[tg.Name] + if !ok { + s = make(map[string]*structs.Allocation) + m[tg.Name] = s + } } } return m diff --git a/scheduler/util.go b/scheduler/util.go index 4f6bd5f92..e2c632c00 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -733,6 +733,8 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc } } +// newAllocUpdateFn is a factory for the scheduler to create an allocUpdateType +// function for the reconciler func newAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType { return func(existing *structs.Allocation, newJob *structs.Job, newTG *structs.TaskGroup) (ignore, destructive bool, updated *structs.Allocation) { // Same index, so nothing to do
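The allocNameIndex changes above fix two subtle bugs: Highest's loop bound (`<= n` handed back one name more than requested) and Next's `:=` that shadowed `remainder`, leaving the outer count at zero for any code after the loop. Below is a minimal sketch, not part of the patch, of the intended semantics; it assumes the unexported helpers shown in this diff (allocSet, newAllocNameIndex, Highest, UnsetIndex, Next) and Nomad's "job.group[index]" allocation naming, and the test name is hypothetical.

func TestAllocNameIndex_Sketch(t *testing.T) {
	// Seed the index with five existing allocations named job.web[0..4].
	existing := make(allocSet)
	for i := uint(0); i < 5; i++ {
		alloc := mock.Alloc()
		alloc.Name = structs.AllocName("job", "web", i)
		existing[alloc.ID] = alloc
	}
	ni := newAllocNameIndex("job", "web", 5, existing)

	// With the corrected `< n` bound, Highest(2) returns exactly the two
	// highest used names; the old `<= n` bound returned a third.
	h := ni.Highest(2)
	if len(h) != 2 {
		t.Fatalf("expected 2 names; got %d", len(h))
	}
	for _, name := range []string{"job.web[4]", "job.web[3]"} {
		if _, ok := h[name]; !ok {
			t.Fatalf("expected %q in %v", name, h)
		}
	}

	// A freed index should be handed back out by Next before new names
	// past the group count are minted.
	ni.UnsetIndex(1)
	if next := ni.Next(1); len(next) != 1 || next[0] != "job.web[1]" {
		t.Fatalf("unexpected names: %v", next)
	}
}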