From ce614e6b7a00ad8bc499e3db772ae2aadde88a50 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 5 Sep 2025 10:22:42 -0400 Subject: [PATCH] scheduler: `update` block testing for system deployments (#26579) This changeset adds system scheduler tests of various permutations of the `update` block. It also fixes a number of bugs discovered in the process. * Don't create deployment for in-flight rollout. If a system job is in the middle of a rollout prior to upgrading to a version of Nomad with system deployments, we'll end up creating a system deployment which might never complete because previously placed allocs will not be tracked. Check to see if we have existing allocs that should belong to the new deployment and prevent a deployment from being created in that case. * Ensure we call `Copy` on `Deployment` to avoid state store corruption. * Don't limit canary counts by `max_parallel`. * Never create deployments for `sysbatch` jobs. Ref: https://hashicorp.atlassian.net/browse/NMD-761 --- nomad/state/state_store.go | 4 +- scheduler/reconciler/reconcile_node.go | 18 +- scheduler/scheduler_sysbatch_test.go | 8 +- scheduler/scheduler_system.go | 8 +- scheduler/scheduler_system_test.go | 568 +++++++++++++++++++++++++ 5 files changed, 600 insertions(+), 6 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1b3d158dd..49c44fee1 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -758,8 +758,8 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri } d := raw.(*structs.Deployment) - // If the allocation belongs to a job with the same ID but a different - // create index and we are not getting all the allocations whose Jobs + // If the deployment belongs to a job with the same ID but a different + // create index and we are not getting all the deployments whose Jobs // matches the same Job ID then we skip it if !all && job != nil && d.JobCreateIndex != job.CreateIndex { continue diff
--git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index cd99ac8c9..481fb64ed 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -18,6 +18,11 @@ type NodeReconciler struct { DeploymentOld *structs.Deployment DeploymentCurrent *structs.Deployment DeploymentUpdates []*structs.DeploymentStatusUpdate + + // COMPAT(1.14.0): + // compatHasSameVersionAllocs indicates that the reconciler found some + // allocations that were for the version being deployed + compatHasSameVersionAllocs bool } func NewNodeReconciler(deployment *structs.Deployment) *NodeReconciler { @@ -61,6 +66,8 @@ func (nr *NodeReconciler) Compute( // to a list of nodes that canaries should be placed on. canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes) + compatHadExistingDeployment := nr.DeploymentCurrent != nil + result := new(NodeReconcileResult) deploymentComplete := true for nodeID, allocs := range nodeAllocs { @@ -71,6 +78,14 @@ func (nr *NodeReconciler) Compute( result.Append(diff) } + // COMPAT(1.14.0) prevent a new deployment from being created in the case + // where we've upgraded the cluster while a legacy rolling deployment was in + // flight, otherwise we won't have HealthyAllocs tracking and will never mark + // the deployment as complete + if !compatHadExistingDeployment && nr.compatHasSameVersionAllocs { + nr.DeploymentCurrent = nil + } + nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)
return result @@ -392,6 +407,7 @@ func (nr *NodeReconciler) computeForNode( // Everything is up-to-date IGNORE: + nr.compatHasSameVersionAllocs = true result.Ignore = append(result.Ignore, AllocTuple{ Name: name, TaskGroup: tg, @@ -520,8 +536,8 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro hadRunning := false for _, alloc := range allocs { if alloc.Job.ID == job.ID && alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex { + nr.compatHasSameVersionAllocs = true hadRunning = true - break } } diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go index d6d69c400..ed386fb9f 100644 --- a/scheduler/scheduler_sysbatch_test.go +++ b/scheduler/scheduler_sysbatch_test.go @@ -91,6 +91,11 @@ func TestSysBatch_JobRegister(t *testing.T) { must.Eq(t, 0, queued, must.Sprint("unexpected queued allocations")) h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // sysbatch jobs never create a deployment + deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + must.Len(t, 0, deployments) } func TestSysBatch_JobRegister_AddNode_Running(t *testing.T) { @@ -1839,9 +1844,10 @@ func TestSysBatch_canHandle(t *testing.T) { must.True(t, s.canHandle(structs.EvalTriggerPeriodicJob)) }) } + func createNodes(t *testing.T, h *tests.Harness, n int) []*structs.Node { nodes := make([]*structs.Node, n) - for i := 0; i < n; i++ { + for i := range n { node := mock.Node() nodes[i] = node must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index d8b8a3e1b..27a7700b9 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -154,11 +154,13 @@ func (s *SystemScheduler) process() (bool, error) { } if !s.sysbatch { - // Get any existing deployment s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID) 
if err != nil { return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err) } + // system deployments may be mutated in the reconciler because the node + // count can change between evaluations + s.deployment = s.deployment.Copy() } // Create a plan @@ -632,7 +634,9 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.Node if limit := limits[a.Alloc.TaskGroup]; limit > 0 { ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") diff.Place = append(diff.Place, a) - limits[a.Alloc.TaskGroup]-- + if !a.Canary { + limits[a.Alloc.TaskGroup]-- + } } else { limited = true } diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index 908927c49..8369ced42 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -5,6 +5,8 @@ package scheduler import ( "fmt" + "maps" + "slices" "sort" "testing" "time" @@ -19,6 +21,7 @@ import ( "github.com/hashicorp/nomad/scheduler/feasible" "github.com/hashicorp/nomad/scheduler/reconciler" "github.com/hashicorp/nomad/scheduler/tests" + "github.com/shoenig/test" "github.com/shoenig/test/must" ) @@ -3390,3 +3393,568 @@ func TestEvictAndPlace(t *testing.T) { } } + +// TestSystemScheduler_UpdateBlock tests various permutations of the update block +func TestSystemScheduler_UpdateBlock(t *testing.T) { + ci.Parallel(t) + + collect := func(planned map[string][]*structs.Allocation) map[string]int { + if len(planned) == 0 { + return nil + } + counts := map[string]int{} + for _, node := range planned { + for _, alloc := range node { + counts[alloc.TaskGroup]++ + } + } + return counts + } + + assertDeploymentState := func(t *testing.T, expectDstates, gotDstates map[string]*structs.DeploymentState) { + t.Helper() + if expectDstates == nil { + return + } + + must.SliceContainsAll(t, + slices.Collect(maps.Keys(expectDstates)), + slices.Collect(maps.Keys(gotDstates)), + must.Sprint("expected matching task groups in deployment state")) + + for 
tg, expect := range expectDstates { + got := gotDstates[tg] + test.Eq(t, expect.DesiredCanaries, got.DesiredCanaries, + test.Sprintf("DesiredCanaries for %s", tg)) + test.Eq(t, expect.DesiredTotal, got.DesiredTotal, + test.Sprintf("DesiredTotal for %s", tg)) + test.Eq(t, expect.PlacedAllocs, got.PlacedAllocs, + test.Sprintf("PlacedAllocs for %s", tg)) + test.Eq(t, len(expect.PlacedCanaries), len(got.PlacedCanaries), + test.Sprintf("len(PlacedCanaries) for %s", tg)) + } + } + + tg1, tg2 := "tg1", "tg2" + + // note: all test cases assume that if there's an existing dstate that we're + // in the middle of the deployment, otherwise we're starting a new + // deployment (also noted in name of subtest) + testCases := []struct { + name string + tg1UpdateBlock *structs.UpdateStrategy + tg2UpdateBlock *structs.UpdateStrategy + existingCurrentDState map[string]*structs.DeploymentState + existingOldDState map[string]*structs.DeploymentState + + // these maps signify which nodes have allocs already, mapping + // group -> indexes in the `nodes` slice + existingPrevious map[string][]int // previous version of job + existingRunning map[string][]int // current version of job (running) + existingFailed map[string][]int // current version of job (failed) + existingCanary map[string][]int // canaries (must match running or failed) + + expectAllocs map[string]int // plan NodeAllocations group -> count + expectStop map[string]int // plan NodeUpdates group -> count + expectDState map[string]*structs.DeploymentState + }{ + { + name: "legacy upgrade non-deployment", + expectAllocs: map[string]int{tg1: 10, tg2: 10}, + }, + + { + name: "new deployment max_parallel vs no update block", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal:
10, PlacedAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 10}, + expectStop: map[string]int{tg1: 2, tg2: 10}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2}, + }, + }, + + { + name: "max_parallel mid-deployment", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2, HealthyAllocs: 2}, + tg2: {DesiredTotal: 10, PlacedAllocs: 3, HealthyAllocs: 3}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 3}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 4}, + tg2: {DesiredTotal: 10, PlacedAllocs: 6}, + }, + }, + + { + name: "legacy upgrade max_parallel mid-rollout", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + // expects no new deployment + expectAllocs: map[string]int{tg1: 2, tg2: 3}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + }, + + { + name: "max_parallel underprovisioned from failure", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingFailed: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 
2, UnhealthyAllocs: 2}, + tg2: {DesiredTotal: 10, PlacedAllocs: 3, UnhealthyAllocs: 3}, + }, + expectAllocs: map[string]int{tg1: 4, tg2: 6}, // includes reschedules + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 6}, // includes reschedules + tg2: {DesiredTotal: 10, PlacedAllocs: 9}, + }, + }, + + { + name: "new deployment max_parallel with old underprovisioned", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4}, + tg2: {0, 1, 2}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 5}, + tg2: {DesiredTotal: 10, PlacedAllocs: 7}, + }, + // note: we immediately place on empty nodes + expectAllocs: map[string]int{tg1: 7, tg2: 10}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 7}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "new deployment with canaries", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 3, tg2: 5}, + expectStop: map[string]int{tg1: 3, tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"0", "1", "2"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5}, + }, + }, + + { + name: "canaries failed", + tg1UpdateBlock: &structs.UpdateStrategy{ 
+ MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + tg2: {0, 1, 2, 3, 4}, + }, + existingRunning: map[string][]int{ + tg1: {7}, + tg2: {5, 6, 7, 8, 9}, + }, + existingFailed: map[string][]int{ + tg1: {8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 1, + UnhealthyAllocs: 2, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 5}, // only 2 replacements + expectStop: map[string]int{tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 5, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries partial placement", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 8, 9}, + tg2: {0, 1, 2, 3, 4}, + }, + existingRunning: map[string][]int{ + tg1: {7}, + tg2: {5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 1, + HealthyAllocs: 1, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 5}, + expectStop: map[string]int{tg1: 2, tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + 
PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries awaiting promotion", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + AutoPromote: false, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: nil, + expectStop: nil, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries promoted", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + AutoPromote: true, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: true, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 2}, + expectStop: map[string]int{tg1: 2}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + 
DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 5, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + h := tests.NewHarness(t) + nodes := createNodes(t, h, 10) + + oldJob := mock.SystemJob() + oldJob.TaskGroups[0].Update = tc.tg1UpdateBlock + oldJob.TaskGroups[0].Name = tg1 + taskGroup2 := oldJob.TaskGroups[0].Copy() + taskGroup2.Update = tc.tg2UpdateBlock + taskGroup2.Name = tg2 + oldJob.TaskGroups = append(oldJob.TaskGroups, taskGroup2) + + must.NoError(t, h.State.UpsertJob( + structs.MsgTypeTestSetup, h.NextIndex(), nil, oldJob)) + + // destructively update both task groups of the job + job := oldJob.Copy() + job.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + job.TaskGroups[1].Tasks[0].Config["command"] = "/bin/other" + idx := h.NextIndex() + job.CreateIndex = idx + job.JobModifyIndex = idx + must.NoError(t, h.State.UpsertJob( + structs.MsgTypeTestSetup, idx, nil, job)) + + expectedDeployments := 0 + if len(tc.existingOldDState) > 0 { + d := mock.Deployment() + d.JobID = job.ID + d.JobVersion = oldJob.Version + d.JobCreateIndex = oldJob.CreateIndex + d.JobModifyIndex = oldJob.JobModifyIndex + h.State.UpsertDeployment(h.NextIndex(), d) + expectedDeployments++ + } + + dID := uuid.Generate() + + existAllocs := []*structs.Allocation{} + for _, tg := range []string{tg1, tg2} { + nodesToAllocs := map[string]string{} + for _, nodeIdx := range tc.existingPrevious[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = oldJob + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusRunning + nodesToAllocs[alloc.NodeID] = alloc.ID + existAllocs = append(existAllocs, alloc) + } + for _, nodeIdx := range tc.existingRunning[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = job + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = 
fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusRunning + nodesToAllocs[alloc.NodeID] = alloc.ID + + for _, canaryNodeIdx := range tc.existingCanary[tg] { + if nodeIdx == canaryNodeIdx { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + Timestamp: time.Time{}, + Canary: true, + ModifyIndex: 0, + } + alloc.DeploymentID = dID + } + } + existAllocs = append(existAllocs, alloc) + } + for _, nodeIdx := range tc.existingFailed[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = job + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusFailed + nodesToAllocs[alloc.NodeID] = alloc.ID + + for _, canaryNodeIdx := range tc.existingCanary[tg] { + if nodeIdx == canaryNodeIdx { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(false), + Timestamp: time.Time{}, + Canary: true, + ModifyIndex: 0, + } + alloc.DeploymentID = dID + } + } + existAllocs = append(existAllocs, alloc) + } + for i, nodeIdx := range tc.existingCanary[tg] { + nodeID := nodes[nodeIdx].ID + // find the correct alloc IDs for the PlacedCanaries + if dstate, ok := tc.existingCurrentDState[tg]; ok { + dstate.PlacedCanaries[i] = nodesToAllocs[nodeID] + } + } + } + + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), + existAllocs)) + + if len(tc.existingCurrentDState) > 0 { + d := mock.Deployment() + d.ID = dID + d.JobID = job.ID + d.JobVersion = job.Version + d.JobCreateIndex = job.CreateIndex + d.JobModifyIndex = job.JobModifyIndex + d.TaskGroups = tc.existingCurrentDState + h.State.UpsertDeployment(h.NextIndex(), d) + } + if len(tc.expectDState) > 0 { + expectedDeployments++ + } + + eval := &structs.Evaluation{ + Namespace: job.Namespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending,
} + must.NoError(t, h.State.UpsertEvals( + structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Ensure a single plan + must.Len(t, 1, h.Plans) + plan := h.Plans[0] + + stopped := collect(plan.NodeUpdate) + test.Eq(t, tc.expectStop, stopped, test.Sprint("expected stop/evict")) + + // note: includes existing allocs but not previous version allocs + nodeAllocs := collect(plan.NodeAllocation) + test.Eq(t, tc.expectAllocs, nodeAllocs, test.Sprint("expected keep/place")) + + deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + must.Len(t, expectedDeployments, deployments, must.Sprint("expected deployments")) + + if tc.expectDState != nil { + deployment, err := h.State.LatestDeploymentByJobID(nil, job.Namespace, job.ID) + must.NoError(t, err) + assertDeploymentState(t, tc.expectDState, deployment.TaskGroups) + } + }) + } + +}