diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 578d93dba..8175024a3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -4630,18 +4630,7 @@ func (a *Allocation) Terminated() bool { // RanSuccessfully returns whether the client has ran the allocation and all // tasks finished successfully func (a *Allocation) RanSuccessfully() bool { - // Handle the case the client hasn't started the allocation. - if len(a.TaskStates) == 0 { - return false - } - - // Check to see if all the tasks finised successfully in the allocation - allSuccess := true - for _, state := range a.TaskStates { - allSuccess = allSuccess && state.Successful() - } - - return allSuccess + return a.ClientStatus == AllocClientStatusComplete } // ShouldMigrate returns if the allocation needs data migration diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 9342cb442..f22a5b304 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -294,7 +294,7 @@ func (s *GenericScheduler) process() (bool, error) { // filterCompleteAllocs filters allocations that are terminal and should be // re-placed. -func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([]*structs.Allocation, map[string]*structs.Allocation) { +func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) []*structs.Allocation { filter := func(a *structs.Allocation) bool { if s.batch { // Allocs from batch jobs should be filtered when the desired status @@ -319,19 +319,9 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([ return a.TerminalStatus() } - terminalAllocsByName := make(map[string]*structs.Allocation) n := len(allocs) for i := 0; i < n; i++ { if filter(allocs[i]) { - - // Add the allocation to the terminal allocs map if it's not already - // added or has a higher create index than the one which is - // currently present. 
- alloc, ok := terminalAllocsByName[allocs[i].Name] - if !ok || alloc.CreateIndex < allocs[i].CreateIndex { - terminalAllocsByName[allocs[i].Name] = allocs[i] - } - // Remove the allocation allocs[i], allocs[n-1] = allocs[n-1], nil i-- @@ -339,25 +329,7 @@ func (s *GenericScheduler) filterCompleteAllocs(allocs []*structs.Allocation) ([ } } - // If the job is batch, we want to filter allocations that have been - // replaced by a newer version for the same task group. - filtered := allocs[:n] - if s.batch { - byTG := make(map[string]*structs.Allocation) - for _, alloc := range filtered { - existing := byTG[alloc.Name] - if existing == nil || existing.CreateIndex < alloc.CreateIndex { - byTG[alloc.Name] = alloc - } - } - - filtered = make([]*structs.Allocation, 0, len(byTG)) - for _, alloc := range byTG { - filtered = append(filtered, alloc) - } - } - - return filtered, terminalAllocsByName + return allocs[:n] } // computeJobAllocs is used to reconcile differences between the job, @@ -383,7 +355,7 @@ func (s *GenericScheduler) computeJobAllocs() error { updateNonTerminalAllocsToLost(s.plan, tainted, allocs) // Filter out the allocations in a terminal state - allocs, _ = s.filterCompleteAllocs(allocs) + allocs = s.filterCompleteAllocs(allocs) reconciler := NewAllocReconciler(s.ctx.Logger(), genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID), diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 9b7255ea0..0b085ca1d 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -2643,6 +2643,7 @@ func TestBatchSched_Run_CompleteAlloc(t *testing.T) { // Create a job job := mock.Job() + job.Type = structs.JobTypeBatch job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) @@ -2688,61 +2689,6 @@ func TestBatchSched_Run_CompleteAlloc(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestBatchSched_Run_DrainedAlloc(t *testing.T) { - h := NewHarness(t) - - // Create a node - node 
:= mock.Node() - noErr(t, h.State.UpsertNode(h.NextIndex(), node)) - - // Create a job - job := mock.Job() - job.TaskGroups[0].Count = 1 - noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - - // Create a complete alloc - alloc := mock.Alloc() - alloc.Job = job - alloc.JobID = job.ID - alloc.NodeID = node.ID - alloc.Name = "my-job.web[0]" - alloc.DesiredStatus = structs.AllocDesiredStatusStop - alloc.ClientStatus = structs.AllocClientStatusComplete - noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) - - // Create a mock evaluation to register the job - eval := &structs.Evaluation{ - Namespace: structs.DefaultNamespace, - ID: structs.GenerateUUID(), - Priority: job.Priority, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: job.ID, - } - - // Process the evaluation - err := h.Process(NewBatchScheduler, eval) - if err != nil { - t.Fatalf("err: %v", err) - } - - // Ensure a plan - if len(h.Plans) != 1 { - t.Fatalf("bad: %#v", h.Plans) - } - - // Lookup the allocations by JobID - ws := memdb.NewWatchSet() - out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) - noErr(t, err) - - // Ensure a replacement alloc was placed. 
- if len(out) != 2 { - t.Fatalf("bad: %#v", out) - } - - h.AssertEvalStatus(t, structs.EvalStatusComplete) -} - func TestBatchSched_Run_FailedAlloc(t *testing.T) { h := NewHarness(t) @@ -2752,6 +2698,7 @@ func TestBatchSched_Run_FailedAlloc(t *testing.T) { // Create a job job := mock.Job() + job.Type = structs.JobTypeBatch job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) @@ -2813,6 +2760,7 @@ func TestBatchSched_Run_FailedAllocQueuedAllocations(t *testing.T) { // Create a job job := mock.Job() + job.Type = structs.JobTypeBatch job.TaskGroups[0].Count = 1 noErr(t, h.State.UpsertJob(h.NextIndex(), job)) @@ -2948,12 +2896,7 @@ func TestBatchSched_JobModify_InPlace_Terminal(t *testing.T) { } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) - // Update the job - job2 := mock.Job() - job2.ID = job.ID - noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) - - // Create a mock evaluation to deal with drain + // Create a mock evaluation to trigger the job eval := &structs.Evaluation{ Namespace: structs.DefaultNamespace, ID: structs.GenerateUUID(), @@ -2969,106 +2912,268 @@ func TestBatchSched_JobModify_InPlace_Terminal(t *testing.T) { } // Ensure no plan + if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans[0]) + } +} + +// This test ensures that terminal jobs from older versions are ignored. 
+func TestBatchSched_JobModify_Destructive_Terminal(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.Job() + job.Type = structs.JobTypeBatch + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + alloc.ClientStatus = structs.AllocClientStatusComplete + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.Job() + job2.ID = job.ID + job2.Type = structs.JobTypeBatch + job2.Version++ + job2.TaskGroups[0].Tasks[0].Env = map[string]string{"foo": "bar"} + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + allocs = nil + for i := 0; i < 10; i++ { + alloc := mock.Alloc() + alloc.Job = job2 + alloc.JobID = job2.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + alloc.ClientStatus = structs.AllocClientStatusComplete + alloc.TaskStates = map[string]*structs.TaskState{ + "web": &structs.TaskState{ + State: structs.TaskStateDead, + Events: []*structs.TaskEvent{ + { + Type: structs.TaskTerminated, + ExitCode: 0, + }, + }, + }, + } + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to trigger the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan if len(h.Plans) != 0 { t.Fatalf("bad: %#v", 
h.Plans) } } -func TestGenericSched_FilterCompleteAllocs(t *testing.T) { - running := mock.Alloc() - desiredStop := mock.Alloc() - desiredStop.DesiredStatus = structs.AllocDesiredStatusStop +// This test asserts that an allocation from an old job that is running on a +// drained node is cleaned up. +func TestBatchSched_NodeDrain_Running_OldJob(t *testing.T) { + h := NewHarness(t) - new := mock.Alloc() - new.CreateIndex = 10000 + // Create two nodes, one that is drained and has a successfully finished + // alloc and a fresh undrained one + node := mock.Node() + node.Drain = true + node2 := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + noErr(t, h.State.UpsertNode(h.NextIndex(), node2)) - oldSuccessful := mock.Alloc() - oldSuccessful.CreateIndex = 30 - oldSuccessful.DesiredStatus = structs.AllocDesiredStatusStop - oldSuccessful.ClientStatus = structs.AllocClientStatusComplete - oldSuccessful.TaskStates = make(map[string]*structs.TaskState, 1) - oldSuccessful.TaskStates["foo"] = &structs.TaskState{ - State: structs.TaskStateDead, - Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 0}}, + // Create a job + job := mock.Job() + job.Type = structs.JobTypeBatch + job.TaskGroups[0].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a running alloc + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + alloc.ClientStatus = structs.AllocClientStatusRunning + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create an update job + job2 := job.Copy() + job2.TaskGroups[0].Tasks[0].Env = map[string]string{"foo": "bar"} + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, } - 
unsuccessful := mock.Alloc() - unsuccessful.DesiredStatus = structs.AllocDesiredStatusRun - unsuccessful.ClientStatus = structs.AllocClientStatusFailed - unsuccessful.TaskStates = make(map[string]*structs.TaskState, 1) - unsuccessful.TaskStates["foo"] = &structs.TaskState{ - State: structs.TaskStateDead, - Events: []*structs.TaskEvent{{Type: structs.TaskTerminated, ExitCode: 1}}, + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) } - cases := []struct { - Batch bool - Input, Output []*structs.Allocation - TerminalAllocs map[string]*structs.Allocation - }{ - { - Input: []*structs.Allocation{running}, - Output: []*structs.Allocation{running}, - TerminalAllocs: map[string]*structs.Allocation{}, - }, - { - Input: []*structs.Allocation{running, desiredStop}, - Output: []*structs.Allocation{running}, - TerminalAllocs: map[string]*structs.Allocation{ - desiredStop.Name: desiredStop, - }, - }, - { - Batch: true, - Input: []*structs.Allocation{running}, - Output: []*structs.Allocation{running}, - TerminalAllocs: map[string]*structs.Allocation{}, - }, - { - Batch: true, - Input: []*structs.Allocation{new, oldSuccessful}, - Output: []*structs.Allocation{new}, - TerminalAllocs: map[string]*structs.Allocation{}, - }, - { - Batch: true, - Input: []*structs.Allocation{unsuccessful}, - Output: []*structs.Allocation{}, - TerminalAllocs: map[string]*structs.Allocation{ - unsuccessful.Name: unsuccessful, - }, - }, + // Ensure a plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) } - for i, c := range cases { - g := &GenericScheduler{batch: c.Batch} - out, terminalAllocs := g.filterCompleteAllocs(c.Input) - - if !reflect.DeepEqual(out, c.Output) { - t.Log("Got:") - for i, a := range out { - t.Logf("%d: %#v", i, a) - } - t.Log("Want:") - for i, a := range c.Output { - t.Logf("%d: %#v", i, a) - } - t.Fatalf("Case %d failed", i+1) - } - - if !reflect.DeepEqual(terminalAllocs, c.TerminalAllocs) { - t.Log("Got:") - 
for n, a := range terminalAllocs { - t.Logf("%v: %#v", n, a) - } - t.Log("Want:") - for n, a := range c.TerminalAllocs { - t.Logf("%v: %#v", n, a) - } - t.Fatalf("Case %d failed", i+1) - } + plan := h.Plans[0] + // Ensure the plan evicted 1 + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) } + + // Ensure the plan places 1 + if len(plan.NodeAllocation[node2.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +// This test asserts that an allocation from a job that is complete on a +// drained node is ignored. +func TestBatchSched_NodeDrain_Complete(t *testing.T) { + h := NewHarness(t) + + // Create two nodes, one that is drained and has a successfully finished + // alloc and a fresh undrained one + node := mock.Node() + node.Drain = true + node2 := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + noErr(t, h.State.UpsertNode(h.NextIndex(), node2)) + + // Create a job + job := mock.Job() + job.Type = structs.JobTypeBatch + job.TaskGroups[0].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a complete alloc + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + alloc.ClientStatus = structs.AllocClientStatusComplete + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan + if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +// This is a slightly odd test but it ensures that we handle a scale down of a +// task 
group's count and that it works even if all the allocs have the same +// name. +func TestBatchSched_ScaleDown_SameName(t *testing.T) { + h := NewHarness(t) + + // Create a node + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a job + job := mock.Job() + job.Type = structs.JobTypeBatch + job.TaskGroups[0].Count = 1 + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a few running alloc + var allocs []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = "my-job.web[0]" + alloc.ClientStatus = structs.AllocClientStatusRunning + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewBatchScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + + plan := h.Plans[0] + + // Ensure the plan evicted 4 of the 5 + if len(plan.NodeUpdate[node.ID]) != 4 { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) } func TestGenericSched_ChainedAlloc(t *testing.T) { diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index d4468615e..dc18b17ac 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -296,6 +296,10 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { } } + // Filter batch allocations that do not need to be considered. 
+ all, ignore := a.batchFiltration(all) + desiredChanges.Ignore += uint64(len(ignore)) + canaries, all := a.handleGroupCanaries(all, desiredChanges) // Determine what set of allocations are on tainted nodes @@ -484,6 +488,27 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { return deploymentComplete } +// batchFiltration filters batch allocations that should be ignored. These are +// allocations that are terminal from a previous job version. +func (a *allocReconciler) batchFiltration(all allocSet) (filtered, ignore allocSet) { + if !a.batch { + return all, nil + } + + filtered = filtered.union(all) + ignored := make(map[string]*structs.Allocation) + + // Ignore terminal batch jobs from older versions + for id, alloc := range filtered { + if alloc.Job.Version < a.job.Version && alloc.TerminalStatus() { + delete(filtered, id) + ignored[id] = alloc + } + } + + return filtered, ignored +} + // handleGroupCanaries handles the canaries for the group by stopping the // unneeded ones and returning the current set of canaries and the updated total // set of allocs for the group @@ -673,12 +698,34 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc // Select the allocs with the highest count to remove removeNames := nameIndex.Highest(uint(remove)) for id, alloc := range untainted { - if _, remove := removeNames[alloc.Name]; remove { + if _, ok := removeNames[alloc.Name]; ok { stop[id] = alloc a.result.stop = append(a.result.stop, allocStopResult{ alloc: alloc, statusDescription: allocNotNeeded, }) + delete(untainted, id) + + remove-- + if remove == 0 { + return stop + } + } + } + + // It is possible that we didn't stop as many as we should have if there + // were allocations with duplicate names. 
+ for id, alloc := range untainted { + stop[id] = alloc + a.result.stop = append(a.result.stop, allocStopResult{ + alloc: alloc, + statusDescription: allocNotNeeded, + }) + delete(untainted, id) + + remove-- + if remove == 0 { + return stop } } diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index bcf2d356b..04b9cf656 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -150,6 +150,7 @@ func allocNameToIndex(name string) uint { } func assertNamesHaveIndexes(t *testing.T, indexes []int, names []string) { + t.Helper() m := make(map[uint]int) for _, i := range indexes { m[uint(i)] += 1 @@ -177,6 +178,7 @@ func assertNamesHaveIndexes(t *testing.T, indexes []int, names []string) { } func assertNoCanariesStopped(t *testing.T, d *structs.Deployment, stop []allocStopResult) { + t.Helper() canaryIndex := make(map[string]struct{}) for _, state := range d.TaskGroups { for _, c := range state.PlacedCanaries { @@ -192,6 +194,7 @@ func assertNoCanariesStopped(t *testing.T, d *structs.Deployment, stop []allocSt } func assertPlaceResultsHavePreviousAllocs(t *testing.T, numPrevious int, place []allocPlaceResult) { + t.Helper() names := make(map[string]struct{}, numPrevious) found := 0 @@ -273,7 +276,7 @@ type resultExpectation struct { } func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { - + t.Helper() if exp.createDeployment != nil && r.deployment == nil { t.Fatalf("Expect a created deployment got none") } else if exp.createDeployment == nil && r.deployment != nil { @@ -459,6 +462,46 @@ func TestReconciler_ScaleDown_Zero(t *testing.T) { assertNamesHaveIndexes(t, intRange(0, 19), stopResultsToNames(r.stop)) } +// Tests the reconciler properly handles stopping allocations for a job that has +// scaled down to zero desired where allocs have duplicate names +func TestReconciler_ScaleDown_Zero_DuplicateNames(t *testing.T) { + // Set desired 0 + job := mock.Job() + job.TaskGroups[0].Count = 0 + + // Create 20 
existing allocations + var allocs []*structs.Allocation + var expectedStopped []int + for i := 0; i < 20; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = structs.GenerateUUID() + alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i%2)) + allocs = append(allocs, alloc) + expectedStopped = append(expectedStopped, i%2) + } + + reconciler := NewAllocReconciler(testLogger(), allocUpdateFnIgnore, false, job.ID, job, nil, allocs, nil) + r := reconciler.Compute() + + // Assert the correct results + assertResults(t, r, &resultExpectation{ + createDeployment: nil, + deploymentUpdates: nil, + place: 0, + inplace: 0, + stop: 20, + desiredTGUpdates: map[string]*structs.DesiredUpdates{ + job.TaskGroups[0].Name: { + Stop: 20, + }, + }, + }) + + assertNamesHaveIndexes(t, expectedStopped, stopResultsToNames(r.stop)) +} + // Tests the reconciler properly handles inplace upgrading allocations func TestReconciler_Inplace(t *testing.T) { job := mock.Job()