scheduler: upgrade block testing for system deployments (#26579)
This changeset adds system scheduler tests of various permutations of the `update` block. It also fixes a number of bugs discovered in the process.

* Don't create a deployment for an in-flight rollout. If a system job is in the middle of a rollout prior to upgrading to a version of Nomad with system deployments, we'll end up creating a system deployment that might never complete, because previously placed allocs will not be tracked. Check whether there are existing allocs that should belong to the new deployment, and prevent a deployment from being created in that case.
* Ensure we call `Copy` on `Deployment` to avoid state store corruption.
* Don't limit canary counts by `max_parallel`.
* Never create deployments for `sysbatch` jobs.

Ref: https://hashicorp.atlassian.net/browse/NMD-761
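To make the first fix concrete, here is a minimal sketch of the compatibility guard described above. The types and the helper `shouldCreateDeployment` are hypothetical stand-ins for illustration only; the real logic lives in the node reconciler and operates on `structs.Deployment` and `structs.Allocation`.

package main

import "fmt"

// alloc is a simplified stand-in for structs.Allocation.
type alloc struct {
	jobVersion  uint64
	createIndex uint64
}

// shouldCreateDeployment mirrors the guard added in this changeset: skip
// deployment creation when same-version allocs already exist from a legacy
// rollout that no current deployment is tracking.
func shouldCreateDeployment(hadExistingDeployment bool, allocs []alloc, jobVersion, jobCreateIndex uint64) bool {
	hasSameVersionAllocs := false
	for _, a := range allocs {
		if a.jobVersion == jobVersion && a.createIndex == jobCreateIndex {
			hasSameVersionAllocs = true
			break
		}
	}
	// A legacy in-flight rollout: allocs for this version exist, but no
	// deployment is tracking their health, so a new deployment would never
	// be marked complete.
	if !hadExistingDeployment && hasSameVersionAllocs {
		return false
	}
	return true
}

func main() {
	legacyRollout := []alloc{{jobVersion: 2, createIndex: 10}}
	fmt.Println(shouldCreateDeployment(false, legacyRollout, 2, 10)) // false: don't create
	fmt.Println(shouldCreateDeployment(false, nil, 2, 10))           // true: fresh deployment
}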
@@ -758,8 +758,8 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string,
 		}
 		d := raw.(*structs.Deployment)

-		// If the allocation belongs to a job with the same ID but a different
-		// create index and we are not getting all the allocations whose Jobs
+		// If the deployment belongs to a job with the same ID but a different
+		// create index and we are not getting all the deployments whose Jobs
 		// matches the same Job ID then we skip it
 		if !all && job != nil && d.JobCreateIndex != job.CreateIndex {
 			continue

@@ -18,6 +18,11 @@ type NodeReconciler struct {
 	DeploymentOld     *structs.Deployment
 	DeploymentCurrent *structs.Deployment
 	DeploymentUpdates []*structs.DeploymentStatusUpdate
+
+	// COMPAT(1.14.0):
+	// compatHasSameVersionAllocs indicates that the reconciler found some
+	// allocations that were for the version being deployed
+	compatHasSameVersionAllocs bool
 }

 func NewNodeReconciler(deployment *structs.Deployment) *NodeReconciler {

@@ -61,6 +66,8 @@ func (nr *NodeReconciler) Compute(
 	// to a list of nodes that canaries should be placed on.
 	canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes)

+	compatHadExistingDeployment := nr.DeploymentCurrent != nil
+
 	result := new(NodeReconcileResult)
 	deploymentComplete := true
 	for nodeID, allocs := range nodeAllocs {

@@ -71,6 +78,14 @@ func (nr *NodeReconciler) Compute(
 		result.Append(diff)
 	}

+	// COMPAT(1.14.0) prevent a new deployment from being created in the case
+	// where we've upgraded the cluster while a legacy rolling deployment was in
+	// flight, otherwise we won't have HealthyAllocs tracking and will never mark
+	// the deployment as complete
+	if !compatHadExistingDeployment && nr.compatHasSameVersionAllocs {
+		nr.DeploymentCurrent = nil
+	}
+
 	nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)

 	return result

@@ -392,6 +407,7 @@ func (nr *NodeReconciler) computeForNode(

 	// Everything is up-to-date
 IGNORE:
+	nr.compatHasSameVersionAllocs = true
 	result.Ignore = append(result.Ignore, AllocTuple{
 		Name:      name,
 		TaskGroup: tg,

@@ -520,8 +536,8 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup,
 	hadRunning := false
 	for _, alloc := range allocs {
 		if alloc.Job.ID == job.ID && alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex {
+			nr.compatHasSameVersionAllocs = true
 			hadRunning = true
 			break
 		}
 	}

@@ -91,6 +91,11 @@ func TestSysBatch_JobRegister(t *testing.T) {
 	must.Eq(t, 0, queued, must.Sprint("unexpected queued allocations"))

 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+
+	// sysbatch jobs never create a deployment
+	deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true)
+	must.NoError(t, err)
+	must.Len(t, 0, deployments)
 }

 func TestSysBatch_JobRegister_AddNode_Running(t *testing.T) {

@@ -1839,9 +1844,10 @@ func TestSysBatch_canHandle(t *testing.T) {
 		must.True(t, s.canHandle(structs.EvalTriggerPeriodicJob))
 	})
 }

 func createNodes(t *testing.T, h *tests.Harness, n int) []*structs.Node {
 	nodes := make([]*structs.Node, n)
-	for i := 0; i < n; i++ {
+	for i := range n {
 		node := mock.Node()
 		nodes[i] = node
 		must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

@@ -154,11 +154,13 @@ func (s *SystemScheduler) process() (bool, error) {
 	}

+	if !s.sysbatch {
 		// Get any existing deployment
 		s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID)
 		if err != nil {
 			return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err)
 		}
+		// system deployments may be mutated in the reconciler because the node
+		// count can change between evaluations
+		s.deployment = s.deployment.Copy()
+	}

 	// Create a plan

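As an aside on the `Copy` fix above: objects handed out by Nomad's state store are shared by reference and must be treated as immutable. A toy sketch of why mutating one in place corrupts the store follows; the `store` and `deployment` types here are hypothetical stand-ins, not Nomad's memdb-backed implementation.

package main

import "fmt"

// deployment and store are toy stand-ins; Nomad's real state store hands
// out pointers backed by go-memdb, which must not be mutated in place.
type deployment struct {
	status string
}

type store struct{ d *deployment }

func (s *store) latest() *deployment { return s.d }

func (d *deployment) copy() *deployment {
	if d == nil {
		return nil
	}
	c := *d
	return &c
}

func main() {
	s := &store{d: &deployment{status: "running"}}

	// buggy: mutating the shared object silently rewrites the stored state
	bad := s.latest()
	bad.status = "paused"
	fmt.Println(s.d.status) // paused -- the store was corrupted

	// fixed: copy before mutating, as the system scheduler now does
	s.d.status = "running"
	good := s.latest().copy()
	good.status = "paused"
	fmt.Println(s.d.status) // running -- store unchanged
}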
@@ -632,7 +634,9 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult,
 		if limit := limits[a.Alloc.TaskGroup]; limit > 0 {
 			ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "")
 			diff.Place = append(diff.Place, a)
+			if !a.Canary {
 				limits[a.Alloc.TaskGroup]--
+			}
 		} else {
 			limited = true
 		}

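The hunk above implements the "don't limit canary counts by `max_parallel`" fix from the commit message: only non-canary placements consume the per-group limit. A small hypothetical trace of that accounting, using plain structs instead of the reconciler's alloc tuples:

package main

import "fmt"

// update is a simplified stand-in for the reconciler's update tuples.
type update struct {
	group  string
	canary bool
}

func main() {
	limits := map[string]int{"web": 2} // max_parallel = 2
	updates := []update{
		{"web", true}, {"web", true}, {"web", true},    // canaries
		{"web", false}, {"web", false}, {"web", false}, // rolling updates
	}
	placed, limited := 0, false
	for _, u := range updates {
		if limit := limits[u.group]; limit > 0 {
			placed++
			if !u.canary {
				limits[u.group]-- // only non-canary placements consume max_parallel
			}
		} else {
			limited = true
		}
	}
	fmt.Println(placed, limited) // 5 true: all 3 canaries plus 2 rolling updates
}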
@@ -5,6 +5,8 @@ package scheduler

 import (
 	"fmt"
+	"maps"
+	"slices"
 	"sort"
 	"testing"
 	"time"

@@ -19,6 +21,7 @@ import (
 	"github.com/hashicorp/nomad/scheduler/feasible"
 	"github.com/hashicorp/nomad/scheduler/reconciler"
 	"github.com/hashicorp/nomad/scheduler/tests"
+	"github.com/shoenig/test"
 	"github.com/shoenig/test/must"
 )

@@ -3390,3 +3393,568 @@ func TestEvictAndPlace(t *testing.T) {
	}
}

// TestSystemScheduler_UpdateBlock tests various permutations of the update block
func TestSystemScheduler_UpdateBlock(t *testing.T) {
	ci.Parallel(t)

	collect := func(planned map[string][]*structs.Allocation) map[string]int {
		if len(planned) == 0 {
			return nil
		}
		counts := map[string]int{}
		for _, node := range planned {
			for _, alloc := range node {
				counts[alloc.TaskGroup]++
			}
		}
		return counts
	}

	assertDeploymentState := func(t *testing.T, expectDstates, gotDstates map[string]*structs.DeploymentState) {
		t.Helper()
		if expectDstates == nil {
			return
		}

		must.SliceContainsAll(t,
			slices.Collect(maps.Keys(expectDstates)),
			slices.Collect(maps.Keys(gotDstates)),
			must.Sprint("expected matching task groups in deployment state"))

		for tg, expect := range expectDstates {
			got := gotDstates[tg]
			test.Eq(t, expect.DesiredCanaries, got.DesiredCanaries,
				test.Sprintf("DesiredCanaries for %s", tg))
			test.Eq(t, expect.DesiredTotal, got.DesiredTotal,
				test.Sprintf("DesiredTotal for %s", tg))
			test.Eq(t, expect.PlacedAllocs, got.PlacedAllocs,
				test.Sprintf("PlacedAllocs for %s", tg))
			test.Eq(t, len(expect.PlacedCanaries), len(got.PlacedCanaries),
				test.Sprintf("len(PlacedCanaries) for %s", tg))
		}
	}

	tg1, tg2 := "tg1", "tg2"

	// note: all test cases assume that if there's an existing dstate, we're
	// in the middle of the deployment; otherwise we're starting a new
	// deployment (also noted in the name of the subtest)
	testCases := []struct {
		name                  string
		tg1UpdateBlock        *structs.UpdateStrategy
		tg2UpdateBlock        *structs.UpdateStrategy
		existingCurrentDState map[string]*structs.DeploymentState
		existingOldDState     map[string]*structs.DeploymentState

		// these maps signify which nodes have allocs already, mapping
		// group -> indexes in the `nodes` slice
		existingPrevious map[string][]int // previous version of job
		existingRunning  map[string][]int // current version of job (running)
		existingFailed   map[string][]int // current version of job (failed)
		existingCanary   map[string][]int // canaries (must match running or failed)

		expectAllocs map[string]int // plan NodeAllocations group -> count
		expectStop   map[string]int // plan NodeUpdates group -> count
		expectDState map[string]*structs.DeploymentState
	}{
		{
			name:         "legacy upgrade non-deployment",
			expectAllocs: map[string]int{tg1: 10, tg2: 10},
		},

		{
			name: "new deployment max_parallel vs no update block",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
				tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			},
			existingOldDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 10},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
			expectAllocs: map[string]int{tg1: 2, tg2: 10},
			expectStop:   map[string]int{tg1: 2, tg2: 10},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 2},
			},
		},

		{
			name: "max_parallel mid-deployment",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 3,
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6, 7},
				tg2: {0, 1, 2, 3, 4, 5, 6},
			},
			existingRunning: map[string][]int{
				tg1: {8, 9},
				tg2: {7, 8, 9},
			},
			existingCurrentDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 2, HealthyAllocs: 2},
				tg2: {DesiredTotal: 10, PlacedAllocs: 3, HealthyAllocs: 3},
			},
			expectAllocs: map[string]int{tg1: 2, tg2: 3},
			expectStop:   map[string]int{tg1: 2, tg2: 3},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 4},
				tg2: {DesiredTotal: 10, PlacedAllocs: 6},
			},
		},

		{
			name: "legacy upgrade max_parallel mid-rollout",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 3,
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6, 7},
				tg2: {0, 1, 2, 3, 4, 5, 6},
			},
			existingRunning: map[string][]int{
				tg1: {8, 9},
				tg2: {7, 8, 9},
			},
			// expects no new deployment
			expectAllocs: map[string]int{tg1: 2, tg2: 3},
			expectStop:   map[string]int{tg1: 2, tg2: 3},
		},

		{
			name: "max_parallel underprovisioned from failure",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 3,
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6, 7},
				tg2: {0, 1, 2, 3, 4, 5, 6},
			},
			existingFailed: map[string][]int{
				tg1: {8, 9},
				tg2: {7, 8, 9},
			},
			existingCurrentDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 2, UnhealthyAllocs: 2},
				tg2: {DesiredTotal: 10, PlacedAllocs: 3, UnhealthyAllocs: 3},
			},
			expectAllocs: map[string]int{tg1: 4, tg2: 6}, // includes reschedules
			expectStop:   map[string]int{tg1: 2, tg2: 3},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 6}, // includes reschedules
				tg2: {DesiredTotal: 10, PlacedAllocs: 9},
			},
		},

		{
			name: "new deployment max_parallel with old underprovisioned",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 3,
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4},
				tg2: {0, 1, 2},
			},
			existingOldDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 5},
				tg2: {DesiredTotal: 10, PlacedAllocs: 7},
			},
			// note: we immediately place on empty nodes
			expectAllocs: map[string]int{tg1: 7, tg2: 10},
			expectStop:   map[string]int{tg1: 2, tg2: 3},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 7},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
		},

		{
			name: "new deployment with canaries",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
				Canary:      30,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 5, // no canaries here
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
				tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			},
			existingOldDState: map[string]*structs.DeploymentState{
				tg1: {DesiredTotal: 10, PlacedAllocs: 10},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
			expectAllocs: map[string]int{tg1: 3, tg2: 5},
			expectStop:   map[string]int{tg1: 3, tg2: 5},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {
					DesiredTotal:    10,
					DesiredCanaries: 3,
					PlacedCanaries:  []string{"0", "1", "2"},
					PlacedAllocs:    3,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 5},
			},
		},

		{
			name: "canaries failed",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
				Canary:      30,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 5, // no canaries here
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6},
				tg2: {0, 1, 2, 3, 4},
			},
			existingRunning: map[string][]int{
				tg1: {7},
				tg2: {5, 6, 7, 8, 9},
			},
			existingFailed: map[string][]int{
				tg1: {8, 9},
			},
			existingCanary: map[string][]int{
				tg1: {7, 8, 9},
			},
			existingCurrentDState: map[string]*structs.DeploymentState{
				tg1: {
					Promoted:        false,
					PlacedCanaries:  []string{"7", "8", "9"},
					DesiredCanaries: 3,
					DesiredTotal:    10,
					PlacedAllocs:    3,
					HealthyAllocs:   1,
					UnhealthyAllocs: 2,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5},
			},
			expectAllocs: map[string]int{tg1: 2, tg2: 5}, // only 2 replacements
			expectStop:   map[string]int{tg2: 5},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {
					DesiredTotal:    10,
					DesiredCanaries: 3,
					PlacedCanaries:  []string{"7", "8", "9"},
					PlacedAllocs:    5,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
		},

		{
			name: "canaries partial placement",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
				Canary:      30,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 5, // no canaries here
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6, 8, 9},
				tg2: {0, 1, 2, 3, 4},
			},
			existingRunning: map[string][]int{
				tg1: {7},
				tg2: {5, 6, 7, 8, 9},
			},
			existingCanary: map[string][]int{
				tg1: {7},
			},
			existingCurrentDState: map[string]*structs.DeploymentState{
				tg1: {
					Promoted:        false,
					PlacedCanaries:  []string{"7"},
					DesiredCanaries: 3,
					DesiredTotal:    10,
					PlacedAllocs:    1,
					HealthyAllocs:   1,
					UnhealthyAllocs: 0,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5},
			},
			expectAllocs: map[string]int{tg1: 2, tg2: 5},
			expectStop:   map[string]int{tg1: 2, tg2: 5},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {
					DesiredTotal:    10,
					DesiredCanaries: 3,
					PlacedCanaries:  []string{"7", "8", "9"},
					PlacedAllocs:    3,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
		},

		{
			name: "canaries awaiting promotion",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
				Canary:      30,
				AutoPromote: false,
			},
			tg2UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 5, // no canaries here
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6},
			},
			existingRunning: map[string][]int{
				tg1: {7, 8, 9},
				tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			},
			existingCanary: map[string][]int{
				tg1: {7, 8, 9},
			},
			existingCurrentDState: map[string]*structs.DeploymentState{
				tg1: {
					Promoted:        false,
					PlacedCanaries:  []string{"7", "8", "9"},
					DesiredCanaries: 3,
					DesiredTotal:    10,
					PlacedAllocs:    3,
					HealthyAllocs:   3,
					UnhealthyAllocs: 0,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10},
			},
			expectAllocs: nil,
			expectStop:   nil,
			expectDState: map[string]*structs.DeploymentState{
				tg1: {
					DesiredTotal:    10,
					DesiredCanaries: 3,
					PlacedCanaries:  []string{"7", "8", "9"},
					PlacedAllocs:    3,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
		},

		{
			name: "canaries promoted",
			tg1UpdateBlock: &structs.UpdateStrategy{
				MaxParallel: 2,
				Canary:      30,
				AutoPromote: true,
			},
			existingPrevious: map[string][]int{
				tg1: {0, 1, 2, 3, 4, 5, 6},
			},
			existingRunning: map[string][]int{
				tg1: {7, 8, 9},
				tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
			},
			existingCanary: map[string][]int{
				tg1: {7, 8, 9},
			},
			existingCurrentDState: map[string]*structs.DeploymentState{
				tg1: {
					Promoted:        true,
					PlacedCanaries:  []string{"7", "8", "9"},
					DesiredCanaries: 3,
					DesiredTotal:    10,
					PlacedAllocs:    3,
					HealthyAllocs:   3,
					UnhealthyAllocs: 0,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10},
			},
			expectAllocs: map[string]int{tg1: 2},
			expectStop:   map[string]int{tg1: 2},
			expectDState: map[string]*structs.DeploymentState{
				tg1: {
					DesiredTotal:    10,
					DesiredCanaries: 3,
					PlacedCanaries:  []string{"7", "8", "9"},
					PlacedAllocs:    5,
				},
				tg2: {DesiredTotal: 10, PlacedAllocs: 10},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {

			h := tests.NewHarness(t)
			nodes := createNodes(t, h, 10)

			oldJob := mock.SystemJob()
			oldJob.TaskGroups[0].Update = tc.tg1UpdateBlock
			oldJob.TaskGroups[0].Name = tg1
			taskGroup2 := oldJob.TaskGroups[0].Copy()
			taskGroup2.Update = tc.tg2UpdateBlock
			taskGroup2.Name = tg2
			oldJob.TaskGroups = append(oldJob.TaskGroups, taskGroup2)

			must.NoError(t, h.State.UpsertJob(
				structs.MsgTypeTestSetup, h.NextIndex(), nil, oldJob))

			// destructively update both task groups of the job
			job := oldJob.Copy()
			job.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
			job.TaskGroups[1].Tasks[0].Config["command"] = "/bin/other"
			idx := h.NextIndex()
			job.CreateIndex = idx
			job.JobModifyIndex = idx
			must.NoError(t, h.State.UpsertJob(
				structs.MsgTypeTestSetup, idx, nil, job))

			expectedDeployments := 0
			if len(tc.existingOldDState) > 0 {
				d := mock.Deployment()
				d.JobID = job.ID
				d.JobVersion = oldJob.Version
				d.JobCreateIndex = oldJob.CreateIndex
				d.JobModifyIndex = oldJob.JobModifyIndex
				h.State.UpsertDeployment(h.NextIndex(), d)
				expectedDeployments++
			}

			dID := uuid.Generate()

			existAllocs := []*structs.Allocation{}
			for _, tg := range []string{tg1, tg2} {
				nodesToAllocs := map[string]string{}
				for _, nodeIdx := range tc.existingPrevious[tg] {
					alloc := mock.AllocForNode(nodes[nodeIdx])
					alloc.Job = oldJob
					alloc.JobID = job.ID
					alloc.TaskGroup = tg
					alloc.Name = fmt.Sprintf("my-job.%s[0]", tg)
					alloc.ClientStatus = structs.AllocClientStatusRunning
					nodesToAllocs[alloc.NodeID] = alloc.ID
					existAllocs = append(existAllocs, alloc)
				}
				for _, nodeIdx := range tc.existingRunning[tg] {
					alloc := mock.AllocForNode(nodes[nodeIdx])
					alloc.Job = job
					alloc.JobID = job.ID
					alloc.TaskGroup = tg
					alloc.Name = fmt.Sprintf("my-job.%s[0]", tg)
					alloc.ClientStatus = structs.AllocClientStatusRunning
					nodesToAllocs[alloc.NodeID] = alloc.ID

					for _, canaryNodeIdx := range tc.existingCanary[tg] {
						if nodeIdx == canaryNodeIdx {
							alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
								Healthy:     pointer.Of(true),
								Timestamp:   time.Time{},
								Canary:      true,
								ModifyIndex: 0,
							}
							alloc.DeploymentID = dID
						}
					}
					existAllocs = append(existAllocs, alloc)
				}
				for _, nodeIdx := range tc.existingFailed[tg] {
					alloc := mock.AllocForNode(nodes[nodeIdx])
					alloc.Job = job
					alloc.JobID = job.ID
					alloc.TaskGroup = tg
					alloc.Name = fmt.Sprintf("my-job.%s[0]", tg)
					alloc.ClientStatus = structs.AllocClientStatusFailed
					nodesToAllocs[alloc.NodeID] = alloc.ID

					for _, canaryNodeIdx := range tc.existingCanary[tg] {
						if nodeIdx == canaryNodeIdx {
							alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
								Healthy:     pointer.Of(false),
								Timestamp:   time.Time{},
								Canary:      true,
								ModifyIndex: 0,
							}
							alloc.DeploymentID = dID
						}
					}
					existAllocs = append(existAllocs, alloc)
				}
				for i, nodeIdx := range tc.existingCanary[tg] {
					nodeID := nodes[nodeIdx].ID
					// find the correct alloc IDs for the PlacedCanaries
					if dstate, ok := tc.existingCurrentDState[tg]; ok {
						dstate.PlacedCanaries[i] = nodesToAllocs[nodeID]
					}
				}
			}

			must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(),
				existAllocs))

			if len(tc.existingCurrentDState) > 0 {
				d := mock.Deployment()
				d.ID = dID
				d.JobID = job.ID
				d.JobVersion = job.Version
				d.JobCreateIndex = job.CreateIndex
				d.JobModifyIndex = job.JobModifyIndex
				d.TaskGroups = tc.existingCurrentDState
				h.State.UpsertDeployment(h.NextIndex(), d)
			}
			if len(tc.expectDState) > 0 {
				expectedDeployments++
			}

			eval := &structs.Evaluation{
				Namespace:   job.Namespace,
				ID:          uuid.Generate(),
				Priority:    job.Priority,
				TriggeredBy: structs.EvalTriggerJobRegister,
				JobID:       job.ID,
				Status:      structs.EvalStatusPending,
			}
			must.NoError(t, h.State.UpsertEvals(
				structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))

			// Process the evaluation
			err := h.Process(NewSystemScheduler, eval)
			must.NoError(t, err)

			// Ensure a single plan
			must.Len(t, 1, h.Plans)
			plan := h.Plans[0]

			stopped := collect(plan.NodeUpdate)
			test.Eq(t, tc.expectStop, stopped, test.Sprint("expected stop/evict"))

			// note: includes existing allocs but not previous version allocs
			nodeAllocs := collect(plan.NodeAllocation)
			test.Eq(t, tc.expectAllocs, nodeAllocs, test.Sprint("expected keep/place"))

			deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true)
			must.NoError(t, err)
			must.Len(t, expectedDeployments, deployments, must.Sprint("expected deployments"))

			if tc.expectDState != nil {
				deployment, err := h.State.LatestDeploymentByJobID(nil, job.Namespace, job.ID)
				must.NoError(t, err)
				assertDeploymentState(t, tc.expectDState, deployment.TaskGroups)
			}
		})
	}
}