From c33e30596ca5c4f18ace357d8526966fc1420fea Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 1 Aug 2025 08:55:44 +0200 Subject: [PATCH 01/13] scheduler: support deployments in the `NodeReconciler` (#26318) This is the initial implementation of deployments for the system and sysbatch reconciler. It does not support updates or canaries at this point, it simply provides the necessary plumbing for deployments. --- scheduler/reconciler/deployments.go | 57 ++++++ scheduler/reconciler/reconcile_cluster.go | 51 ----- scheduler/reconciler/reconcile_node.go | 176 +++++++++++++++--- .../reconciler/reconcile_node_prop_test.go | 16 +- scheduler/reconciler/reconcile_node_test.go | 146 ++++++++++++++- scheduler/scheduler_system.go | 23 ++- 6 files changed, 374 insertions(+), 95 deletions(-) create mode 100644 scheduler/reconciler/deployments.go diff --git a/scheduler/reconciler/deployments.go b/scheduler/reconciler/deployments.go new file mode 100644 index 000000000..a43fbbedb --- /dev/null +++ b/scheduler/reconciler/deployments.go @@ -0,0 +1,57 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package reconciler + +import "github.com/hashicorp/nomad/nomad/structs" + +// cancelUnneededDeployments cancels any deployment that is not needed. +// A deployment update will be staged for jobs that should stop or have the +// wrong version. Unneeded deployments include: +// 1. Jobs that are marked for stop, but there is a non-terminal deployment. +// 2. Deployments that are active, but referencing a different job version. +// 3. Deployments that are already successful. +// +// returns: old deployment, current deployment and a slice of deployment status +// updates. 
+func cancelUnneededDeployments(j *structs.Job, d *structs.Deployment) (*structs.Deployment, *structs.Deployment, []*structs.DeploymentStatusUpdate) { + var updates []*structs.DeploymentStatusUpdate + + // If the job is stopped and there is a non-terminal deployment, cancel it + if j.Stopped() { + if d != nil && d.Active() { + updates = append(updates, &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, + }) + } + + // Nothing else to do + return d, nil, updates + } + + if d == nil { + return nil, nil, nil + } + + // Check if the deployment is active and referencing an older job and cancel it + if d.JobCreateIndex != j.CreateIndex || d.JobVersion != j.Version { + if d.Active() { + updates = append(updates, &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionNewerJob, + }) + } + + return d, nil, updates + } + + // Clear it as the current deployment if it is successful + if d.Status == structs.DeploymentStatusSuccessful { + return d, nil, updates + } + + return nil, d, updates +} diff --git a/scheduler/reconciler/reconcile_cluster.go b/scheduler/reconciler/reconcile_cluster.go index b437b3a3d..f3f04fcb9 100644 --- a/scheduler/reconciler/reconcile_cluster.go +++ b/scheduler/reconciler/reconcile_cluster.go @@ -311,57 +311,6 @@ func (a *AllocReconciler) Compute() *ReconcileResults { return result } -// cancelUnneededDeployments cancels any deployment that is not needed. -// A deployment update will be staged for jobs that should stop or have the -// wrong version. Unneeded deployments include: -// 1. Jobs that are marked for stop, but there is a non-terminal deployment. -// 2. Deployments that are active, but referencing a different job version. -// 3. Deployments that are already successful. 
-// -// returns: old deployment, current deployment and a slice of deployment status -// updates. -func cancelUnneededDeployments(j *structs.Job, d *structs.Deployment) (*structs.Deployment, *structs.Deployment, []*structs.DeploymentStatusUpdate) { - var updates []*structs.DeploymentStatusUpdate - - // If the job is stopped and there is a non-terminal deployment, cancel it - if j.Stopped() { - if d != nil && d.Active() { - updates = append(updates, &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, - }) - } - - // Nothing else to do - return d, nil, updates - } - - if d == nil { - return nil, nil, nil - } - - // Check if the deployment is active and referencing an older job and cancel it - if d.JobCreateIndex != j.CreateIndex || d.JobVersion != j.Version { - if d.Active() { - updates = append(updates, &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionNewerJob, - }) - } - - return d, nil, updates - } - - // Clear it as the current deployment if it is successful - if d.Status == structs.DeploymentStatusSuccessful { - return d, nil, updates - } - - return nil, d, updates -} - // handleStop marks all allocations to be stopped, handling the lost case. // Returns result structure with desired changes field set to stopped // allocations and an array of stopped allocations. 
It mutates the Stop fields diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index d04895f32..255895afc 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -5,15 +5,29 @@ package reconciler import ( "fmt" + "slices" "time" "github.com/hashicorp/nomad/nomad/structs" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) -// Node is like diffSystemAllocsForNode however, the allocations in the +type NodeReconciler struct { + DeploymentOld *structs.Deployment + DeploymentCurrent *structs.Deployment + DeploymentUpdates []*structs.DeploymentStatusUpdate +} + +func NewNodeReconciler(deployment *structs.Deployment) *NodeReconciler { + return &NodeReconciler{ + DeploymentCurrent: deployment, + DeploymentUpdates: make([]*structs.DeploymentStatusUpdate, 0), + } +} + +// Compute is like diffSystemAllocsForNode however, the allocations in the // diffResult contain the specific nodeID they should be allocated on. -func Node( +func (nr *NodeReconciler) Compute( job *structs.Job, // jobs whose allocations are going to be diff-ed readyNodes []*structs.Node, // list of nodes in the ready state notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining @@ -42,7 +56,9 @@ func Node( result := new(NodeReconcileResult) for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients) + diff := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes, + notReadyNodes, taintedNodes, required, allocs, terminal, + serverSupportsDisconnectedClients) result.Append(diff) } @@ -58,7 +74,7 @@ func Node( // that need to be replaced (running on a lost node), those that are running on // a disconnected node but may resume, and those that may still be running on // a node that has resumed reconnected. 
-func diffSystemAllocsForNode( +func (nr *NodeReconciler) diffSystemAllocsForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, @@ -71,11 +87,36 @@ func diffSystemAllocsForNode( ) *NodeReconcileResult { result := new(NodeReconcileResult) + // cancel deployments that aren't needed anymore + // TODO: old deployment is only used when checking for canaries + var deploymentUpdates []*structs.DeploymentStatusUpdate + _, nr.DeploymentCurrent, deploymentUpdates = cancelUnneededDeployments(job, nr.DeploymentCurrent) + nr.DeploymentUpdates = append(nr.DeploymentUpdates, deploymentUpdates...) + + /* + // TODO: the failed and paused fields are only used for dealing with canary + // placements and their respective deployments + // + // set deployment paused and failed, if we currently have a deployment + var deploymentPaused, deploymentFailed bool + if nr.DeploymentCurrent != nil { + // deployment is paused when it's manually paused by a user, or if the + // deployment is pending or initializing, which are the initial states + // for multi-region job deployments. 
+ deploymentPaused = nr.DeploymentCurrent.Status == structs.DeploymentStatusPaused || + nr.DeploymentCurrent.Status == structs.DeploymentStatusPending || + nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing + deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed + } + // TODO: will be needed for canaries + deploymentPlaceReady := !deploymentPaused && !deploymentFailed && !isCanarying + */ + // Scan the existing updates existing := make(map[string]struct{}) // set of alloc names - for _, exist := range liveAllocs { + for _, alloc := range liveAllocs { // Index the existing node - name := exist.Name + name := alloc.Name existing[name] = struct{}{} // Check for the definition in the required set @@ -86,12 +127,12 @@ func diffSystemAllocsForNode( result.Stop = append(result.Stop, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } - supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients) + supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients) reconnect := false expired := false @@ -99,20 +140,20 @@ func diffSystemAllocsForNode( // Only compute reconnect for unknown and running since they need to go // through the reconnect process. 
if supportsDisconnectedClients && - (exist.ClientStatus == structs.AllocClientStatusUnknown || - exist.ClientStatus == structs.AllocClientStatusRunning) { - reconnect = exist.NeedsToReconnect() + (alloc.ClientStatus == structs.AllocClientStatusUnknown || + alloc.ClientStatus == structs.AllocClientStatusRunning) { + reconnect = alloc.NeedsToReconnect() if reconnect { - expired = exist.Expired(time.Now()) + expired = alloc.Expired(time.Now()) } } // If we have been marked for migration and aren't terminal, migrate - if exist.DesiredTransition.ShouldMigrate() { + if alloc.DesiredTransition.ShouldMigrate() { result.Migrate = append(result.Migrate, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } @@ -122,25 +163,25 @@ func diffSystemAllocsForNode( result.Lost = append(result.Lost, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } // Ignore unknown allocs that we want to reconnect eventually. if supportsDisconnectedClients && - exist.ClientStatus == structs.AllocClientStatusUnknown && - exist.DesiredStatus == structs.AllocDesiredStatusRun { + alloc.ClientStatus == structs.AllocClientStatusUnknown && + alloc.DesiredStatus == structs.AllocDesiredStatusRun { result.Ignore = append(result.Ignore, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } // note: the node can be both tainted and nil - node, nodeIsTainted := taintedNodes[exist.NodeID] + node, nodeIsTainted := taintedNodes[alloc.NodeID] // Filter allocs on a node that is now re-connected to reconnecting. if supportsDisconnectedClients && @@ -149,8 +190,8 @@ func diffSystemAllocsForNode( // Record the new ClientStatus to indicate to future evals that the // alloc has already reconnected. 
- reconnecting := exist.Copy() - reconnecting.AppendState(structs.AllocStateFieldClientStatus, exist.ClientStatus) + reconnecting := alloc.Copy() + reconnecting.AppendState(structs.AllocStateFieldClientStatus, alloc.ClientStatus) result.Reconnecting = append(result.Reconnecting, AllocTuple{ Name: name, TaskGroup: tg, @@ -167,7 +208,7 @@ func diffSystemAllocsForNode( // lost as the work was already successfully finished. However for // service/system jobs, tasks should never complete. The check of // batch type, defends against client bugs. - if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() { + if alloc.Job.Type == structs.JobTypeSysBatch && alloc.RanSuccessfully() { goto IGNORE } @@ -175,9 +216,9 @@ func diffSystemAllocsForNode( if node != nil && supportsDisconnectedClients && node.Status == structs.NodeStatusDisconnected && - exist.ClientStatus == structs.AllocClientStatusRunning { + alloc.ClientStatus == structs.AllocClientStatusRunning { - disconnect := exist.Copy() + disconnect := alloc.Copy() disconnect.ClientStatus = structs.AllocClientStatusUnknown disconnect.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown) disconnect.ClientDescription = sstructs.StatusAllocUnknown @@ -193,7 +234,7 @@ func diffSystemAllocsForNode( result.Lost = append(result.Lost, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) } else { goto IGNORE @@ -214,17 +255,17 @@ func diffSystemAllocsForNode( result.Stop = append(result.Stop, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } // If the definition is updated we need to update - if job.JobModifyIndex != exist.Job.JobModifyIndex { + if job.JobModifyIndex != alloc.Job.JobModifyIndex { result.Update = append(result.Update, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } @@ -234,7 +275,7 @@ func diffSystemAllocsForNode( result.Ignore = append(result.Ignore, AllocTuple{ Name: name, 
TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) } @@ -296,11 +337,88 @@ func diffSystemAllocsForNode( } result.Place = append(result.Place, allocTuple) + + // populate deployment state for this task group + var dstate = new(structs.DeploymentState) + var existingDeployment bool + if nr.DeploymentCurrent != nil { + dstate, existingDeployment = nr.DeploymentCurrent.TaskGroups[tg.Name] + } + + if !existingDeployment && dstate != nil { + if !tg.Update.IsEmpty() { + dstate.AutoRevert = tg.Update.AutoRevert + dstate.AutoPromote = tg.Update.AutoPromote + dstate.ProgressDeadline = tg.Update.ProgressDeadline + } + dstate.DesiredTotal += len(result.Place) + } + + if dstate == nil { + dstate = new(structs.DeploymentState) + } + + // in this case there's nothing to do + if existingDeployment || tg.Update.IsEmpty() || dstate.DesiredTotal == 0 { + continue + } + + // allocs that are in the Place, Update and Migrate buckets are the ones we + // consider for deployment + allocsForDeployment := []*structs.Allocation{} + for _, r := range slices.Concat(result.Place, result.Update, result.Migrate) { + allocsForDeployment = append(allocsForDeployment, r.Alloc) + } + nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) } } + return result } +func (nr *NodeReconciler) createDeployment(job *structs.Job, + tg *structs.TaskGroup, dstate *structs.DeploymentState, updates int, + allocs []*structs.Allocation) { + + if dstate == nil { + dstate = &structs.DeploymentState{} + } + + updatingSpec := updates != 0 + + hadRunning := false + for _, alloc := range allocs { + if alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex { + hadRunning = true + break + } + } + + // Don't create a deployment if it's not the first time running the job + // and there are no updates to the spec. + if hadRunning && !updatingSpec { + return + } + + // A previous group may have made the deployment already. If not create one. 
+ if nr.DeploymentCurrent == nil { + nr.DeploymentCurrent = structs.NewDeployment(job, job.Priority, time.Now().UnixNano()) + nr.DeploymentUpdates = append( + nr.DeploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusRunning, + StatusDescription: structs.DeploymentStatusDescriptionRunning, + }) + } + + // Attach the groups deployment state to the deployment + if nr.DeploymentCurrent.TaskGroups == nil { + nr.DeploymentCurrent.TaskGroups = make(map[string]*structs.DeploymentState) + } + + nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate +} + // materializeSystemTaskGroups is used to materialize all the task groups // a system or sysbatch job requires. func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { diff --git a/scheduler/reconciler/reconcile_node_prop_test.go b/scheduler/reconciler/reconcile_node_prop_test.go index 23d91a504..5774a817a 100644 --- a/scheduler/reconciler/reconcile_node_prop_test.go +++ b/scheduler/reconciler/reconcile_node_prop_test.go @@ -101,8 +101,10 @@ func TestNodeReconciler_PropTest(t *testing.T) { t.Run("system jobs", rapid.MakeCheck(func(t *rapid.T) { nr := genNodeReconciler(structs.JobTypeSystem, &idGenerator{}).Draw(t, "input") - results := Node(nr.job, nr.readyNodes, nr.notReadyNodes, - nr.taintedNodes, nr.allocs, nr.terminal, nr.serverSupportsDisconnectedClients) + n := NewNodeReconciler(nr.deployment) + results := n.Compute(nr.job, nr.readyNodes, + nr.notReadyNodes, nr.taintedNodes, nr.allocs, nr.terminal, + nr.serverSupportsDisconnectedClients) must.NotNil(t, results, must.Sprint("results should never be nil")) perTaskGroup := collectExpectedAndResults(nr, results) @@ -111,8 +113,10 @@ func TestNodeReconciler_PropTest(t *testing.T) { t.Run("sysbatch jobs", rapid.MakeCheck(func(t *rapid.T) { nr := genNodeReconciler(structs.JobTypeSysBatch, &idGenerator{}).Draw(t, "input") - results := Node(nr.job, nr.readyNodes, nr.notReadyNodes, - 
nr.taintedNodes, nr.allocs, nr.terminal, nr.serverSupportsDisconnectedClients) + n := NewNodeReconciler(nr.deployment) + results := n.Compute(nr.job, nr.readyNodes, + nr.notReadyNodes, nr.taintedNodes, nr.allocs, nr.terminal, + nr.serverSupportsDisconnectedClients) must.NotNil(t, results, must.Sprint("results should never be nil")) perTaskGroup := collectExpectedAndResults(nr, results) @@ -128,6 +132,7 @@ type nodeReconcilerInput struct { taintedNodes map[string]*structs.Node allocs []*structs.Allocation terminal structs.TerminalByNodeByName + deployment *structs.Deployment serverSupportsDisconnectedClients bool } @@ -190,12 +195,15 @@ func genNodeReconciler(jobType string, idg *idGenerator) *rapid.Generator[*nodeR } } + deployment := genDeployment(idg, job, live).Draw(t, "deployment") + return &nodeReconcilerInput{ job: job, readyNodes: readyNodes, notReadyNodes: notReadyNodes, taintedNodes: taintedNodes, allocs: live, + deployment: deployment, serverSupportsDisconnectedClients: rapid.Bool().Draw(t, "supports_disconnected"), } }) diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index cda2654a4..4c17bb219 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -72,7 +72,8 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff) if len(diff.Ignore) > 0 { @@ -94,7 +95,8 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, 
tainted, required, live, terminal, true) assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff) }) @@ -155,7 +157,8 @@ func TestDiffSystemAllocsForNode_Placements(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode( job, tc.nodeID, eligible, nil, tainted, required, allocsForNode, terminal, true) @@ -213,7 +216,8 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) { tainted := map[string]*structs.Node{} terminal := structs.TerminalByNodeByName{} - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode( job, node.ID, eligible, nil, tainted, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, update: 1}, diff) @@ -282,7 +286,8 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) { Job: job, } - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode( job, tc.nodeID, eligible, ineligible, tainted, required, []*structs.Allocation{alloc}, terminal, true, ) @@ -338,7 +343,8 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode( job, drainNode.ID, map[string]*structs.Node{}, nil, tainted, required, allocs, terminal, true) @@ -389,7 +395,8 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff := nr.diffSystemAllocsForNode( job, deadNode.ID, map[string]*structs.Node{}, nil, tainted, required, allocs, terminal, true) @@ -514,7 +521,8 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) { tc.allocFn(alloc) } - got := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + got := nr.diffSystemAllocsForNode( job, tc.node.ID, eligibleNodes, nil, taintedNodes, required, 
[]*structs.Allocation{alloc}, terminal, true, ) @@ -602,8 +610,8 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := Node(job, nodes, nil, tainted, allocs, terminal, true) - + nr := NewNodeReconciler(nil) + diff := nr.Compute(job, nodes, nil, tainted, allocs, terminal, true) assertDiffCount(t, diffResultCount{ update: 1, ignore: 1, migrate: 1, lost: 1, place: 6}, diff) @@ -632,3 +640,121 @@ func TestDiffSystemAllocs(t *testing.T) { } } } + +// TestNodeDeployments tests various deployment-related scenarios for the node +// reconciler +func TestNodeDeployments(t *testing.T) { + ci.Parallel(t) + + job := mock.SystemJob() + tg := job.TaskGroups[0].Copy() + tg.Name = "other" + tg.Update = structs.DefaultUpdateStrategy + job.TaskGroups = append(job.TaskGroups, tg) + + // Create two alive nodes. + nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}} + + // Stopped job to make sure we handle these correctly + stoppedJob := job.Copy() + stoppedJob.Stop = true + + allocs := []*structs.Allocation{} + for _, n := range nodes { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + + allocs = append(allocs, a) + } + + newJobWithNoAllocs := job.Copy() + newJobWithNoAllocs.Name = "new-job" + newJobWithNoAllocs.Version = 100 + newJobWithNoAllocs.CreateIndex = 1000 + + testCases := []struct { + name string + job *structs.Job + existingDeployment *structs.Deployment + newDeployment bool + expectedNewDeploymentStatus string + expectedDeploymenStatusUpdateContains string + }{ + { + "existing successful deployment for the current job version should not return a deployment", + job, + &structs.Deployment{ + JobCreateIndex: job.CreateIndex, + JobVersion: job.Version, + Status: structs.DeploymentStatusSuccessful, + }, + false, + "", + "", + }, + { + "existing running deployment should remain untouched", + job, + &structs.Deployment{ + JobID: job.ID, + JobCreateIndex: job.CreateIndex, + JobVersion: job.Version, + Status: 
structs.DeploymentStatusRunning, + StatusDescription: structs.DeploymentStatusDescriptionRunning, + TaskGroups: map[string]*structs.DeploymentState{ + job.TaskGroups[0].Name: { + AutoRevert: true, + ProgressDeadline: time.Minute, + }, + tg.Name: { + AutoPromote: true, + }, + }, + }, + false, + "", + "", + }, + { + "existing running deployment for a stopped job should be cancelled", + stoppedJob, + &structs.Deployment{ + JobCreateIndex: job.CreateIndex, + JobVersion: job.Version, + Status: structs.DeploymentStatusRunning, + }, + false, + structs.DeploymentStatusCancelled, + structs.DeploymentStatusDescriptionStoppedJob, + }, + { + "no existing deployment for a new job that needs one should result in a new deployment", + newJobWithNoAllocs, + nil, + true, + structs.DeploymentStatusRunning, + structs.DeploymentStatusDescriptionRunning, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nr := NewNodeReconciler(tc.existingDeployment) + nr.Compute(tc.job, nodes, nil, nil, allocs, nil, true) + if tc.newDeployment { + must.NotNil(t, nr.DeploymentCurrent, must.Sprintf("expected a non-nil deployment")) + must.Eq(t, nr.DeploymentCurrent.Status, tc.expectedNewDeploymentStatus) + } + if tc.expectedDeploymenStatusUpdateContains != "" { + must.SliceContainsFunc(t, nr.DeploymentUpdates, tc.expectedDeploymenStatusUpdateContains, + func(a *structs.DeploymentStatusUpdate, status string) bool { + return a.StatusDescription == status + }, + ) + } + }) + } +} diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 3fd9cfefa..73b746b64 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -49,6 +49,8 @@ type SystemScheduler struct { notReadyNodes map[string]struct{} nodesByDC map[string]int + deployment *structs.Deployment + limitReached bool nextEval *structs.Evaluation @@ -151,6 +153,14 @@ func (s *SystemScheduler) process() (bool, error) { } } + if !s.sysbatch { + // Get any existing deployment + 
s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err) + } + } + // Create a plan s.plan = s.eval.MakePlan(s.job) @@ -265,12 +275,17 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - r := reconciler.Node(s.job, s.nodes, s.notReadyNodes, tainted, live, term, + nr := reconciler.NewNodeReconciler(s.deployment) + r := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, live, term, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) if s.logger.IsDebug() { s.logger.Debug("reconciled current state with desired state", r.Fields()...) } + // Add the deployment changes to the plan + s.plan.Deployment = nr.DeploymentCurrent + s.plan.DeploymentUpdates = nr.DeploymentUpdates + // Add all the allocs to stop for _, e := range r.Stop { s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNotNeeded, "", "") @@ -375,6 +390,11 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist // track node filtering, to only report an error if all nodes have been filtered var filteredMetrics map[string]*structs.AllocMetric + var deploymentID string + if s.deployment != nil && s.deployment.Active() { + deploymentID = s.deployment.ID + } + nodes := make([]*structs.Node, 1) for _, missing := range place { tgName := missing.TaskGroup.Name @@ -492,6 +512,7 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, NodeName: option.Node.Name, + DeploymentID: deploymentID, TaskResources: resources.OldTaskResources(), AllocatedResources: resources, DesiredStatus: structs.AllocDesiredStatusRun, From 0e6e5ef8d18c0a6a63af226e940017529b39a70b Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> 
Date: Wed, 6 Aug 2025 17:08:52 +0200 Subject: [PATCH 02/13] scheduler: handle deployment completeness in the node reconciler (#26445) This PR introduces marking deployments as complete if there are no remaining placements to be made for a given task group. --- scheduler/reconciler/reconcile_node.go | 91 ++++++++++++++++++--- scheduler/reconciler/reconcile_node_test.go | 20 ++--- 2 files changed, 91 insertions(+), 20 deletions(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 255895afc..4d0cd6d53 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -5,7 +5,6 @@ package reconciler import ( "fmt" - "slices" "time" "github.com/hashicorp/nomad/nomad/structs" @@ -55,13 +54,17 @@ func (nr *NodeReconciler) Compute( required := materializeSystemTaskGroups(job) result := new(NodeReconcileResult) + deploymentComplete := true for nodeID, allocs := range nodeAllocs { - diff := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes, + diff, deploymentCompleteForNode := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients) + deploymentComplete = deploymentComplete && deploymentCompleteForNode result.Append(diff) } + nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...) 
+ return result } @@ -84,7 +87,7 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( liveAllocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic -) *NodeReconcileResult { +) (*NodeReconcileResult, bool) { result := new(NodeReconcileResult) // cancel deployments that aren't needed anymore @@ -279,6 +282,10 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( }) } + // as we iterate over require groups, we'll keep track of whether the deployment + // is complete or not + deploymentComplete := false + // Scan the required groups for name, tg := range required { @@ -363,17 +370,12 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( continue } - // allocs that are in the Place, Update and Migrate buckets are the ones we - // consider for deployment - allocsForDeployment := []*structs.Allocation{} - for _, r := range slices.Concat(result.Place, result.Update, result.Migrate) { - allocsForDeployment = append(allocsForDeployment, r.Alloc) - } nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) } + deploymentComplete = nr.isDeploymentComplete(tg.Name, result) } - return result + return result, deploymentComplete } func (nr *NodeReconciler) createDeployment(job *structs.Job, @@ -419,6 +421,75 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate } +func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult) bool { + complete := len(buckets.Place)+len(buckets.Migrate) == 0 // && !requiresCanaries // TODO: additional condition for canaries + + if !complete || nr.DeploymentCurrent == nil { + return false + } + + // ensure everything is healthy + if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok { + if dstate.HealthyAllocs < max(dstate.DesiredTotal, 
dstate.DesiredCanaries) || // Make sure we have enough healthy allocs + (dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries + complete = false + } + } + + return complete +} + +func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate { + statusUpdates := []*structs.DeploymentStatusUpdate{} + + if nr.DeploymentCurrent != nil { + + // Mark the deployment as complete if possible + if deploymentComplete { + if job.IsMultiregion() { + // the unblocking/successful states come after blocked, so we + // need to make sure we don't revert those states + if nr.DeploymentCurrent.Status != structs.DeploymentStatusUnblocking && + nr.DeploymentCurrent.Status != structs.DeploymentStatusSuccessful { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusBlocked, + StatusDescription: structs.DeploymentStatusDescriptionBlocked, + }) + } + } else { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + }) + } + } + + // Mark the deployment as pending since its state is now computed. 
+ if nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusPending, + StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer, + }) + } + } + + // Set the description of a created deployment + if d := nr.DeploymentCurrent; d != nil { + if d.RequiresPromotion() { + if d.HasAutoPromote() { + d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion + } else { + d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion + } + } + } + + return statusUpdates +} + // materializeSystemTaskGroups is used to materialize all the task groups // a system or sysbatch job requires. func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index 4c17bb219..450d9a6c3 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -73,7 +73,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { } nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff) if len(diff.Ignore) > 0 { @@ -96,7 +96,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { } nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff) }) @@ -158,7 +158,7 @@ func TestDiffSystemAllocsForNode_Placements(t 
*testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode( + diff, _ := nr.diffSystemAllocsForNode( job, tc.nodeID, eligible, nil, tainted, required, allocsForNode, terminal, true) @@ -217,7 +217,7 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) { terminal := structs.TerminalByNodeByName{} nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode( + diff, _ := nr.diffSystemAllocsForNode( job, node.ID, eligible, nil, tainted, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, update: 1}, diff) @@ -287,7 +287,7 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode( + diff, _ := nr.diffSystemAllocsForNode( job, tc.nodeID, eligible, ineligible, tainted, required, []*structs.Allocation{alloc}, terminal, true, ) @@ -344,7 +344,7 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode( + diff, _ := nr.diffSystemAllocsForNode( job, drainNode.ID, map[string]*structs.Node{}, nil, tainted, required, allocs, terminal, true) @@ -396,7 +396,7 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff := nr.diffSystemAllocsForNode( + diff, _ := nr.diffSystemAllocsForNode( job, deadNode.ID, map[string]*structs.Node{}, nil, tainted, required, allocs, terminal, true) @@ -522,7 +522,7 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) { } nr := NewNodeReconciler(nil) - got := nr.diffSystemAllocsForNode( + got, _ := nr.diffSystemAllocsForNode( job, tc.node.ID, eligibleNodes, nil, taintedNodes, required, []*structs.Allocation{alloc}, terminal, true, ) @@ -715,8 +715,8 @@ func TestNodeDeployments(t *testing.T) { }, }, false, - "", - "", + structs.DeploymentStatusSuccessful, + structs.DeploymentStatusDescriptionSuccessful, }, { "existing 
running deployment for a stopped job should be cancelled", From 3d373c9a6af9b1225bd8e4c164b8a060b09ab56d Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 22 Aug 2025 15:02:40 +0200 Subject: [PATCH 03/13] scheduler: support canary deployments for system jobs (#26499) This changeset introduces canary deployments for system jobs. Canaries work a little differently for system jobs than for service jobs. The integer in the update block of a task group is interpreted as a percentage of eligible nodes that this task group update should be deployed to (rounded up to the nearest integer, so, e.g., for 5 eligible nodes and canary value set to 50, we will deploy to 3 nodes). In contrast to service jobs, system job canaries are not tracked, i.e., the scheduler doesn't need to know which allocations are canaries and which are not, since any node can only run one system job. Canary deployments are marked for promotion and, if promoted, the scheduler simply performs an update as usual, replacing allocations belonging to a previous job version, and leaving new ones intact. --- scheduler/reconciler/reconcile_node.go | 211 ++++++++----- .../reconciler/reconcile_node_prop_test.go | 20 +- scheduler/reconciler/reconcile_node_test.go | 277 +++++++++++++++++- 3 files changed, 412 insertions(+), 96 deletions(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 4d0cd6d53..c0542b552 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -5,6 +5,9 @@ package reconciler import ( "fmt" + "maps" + "math" + "slices" "time" "github.com/hashicorp/nomad/nomad/structs" @@ -53,12 +56,17 @@ func (nr *NodeReconciler) Compute( // Create the required task groups. 
required := materializeSystemTaskGroups(job) + // Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary + // percentage of eligible nodes, so we create a mapping of task group name + // to a list of nodes that canaries should be placed on. + canaryNodes, canariesPerTG := computeCanaryNodes(required, eligibleNodes) + result := new(NodeReconcileResult) deploymentComplete := true for nodeID, allocs := range nodeAllocs { - diff, deploymentCompleteForNode := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes, - notReadyNodes, taintedNodes, required, allocs, terminal, - serverSupportsDisconnectedClients) + diff, deploymentCompleteForNode := nr.computeForNode(job, nodeID, eligibleNodes, + notReadyNodes, taintedNodes, canaryNodes[nodeID], canariesPerTG, required, + allocs, terminal, serverSupportsDisconnectedClients) deploymentComplete = deploymentComplete && deploymentCompleteForNode result.Append(diff) } @@ -68,21 +76,63 @@ func (nr *NodeReconciler) Compute( return result } -// diffSystemAllocsForNode is used to do a set difference between the target allocations -// and the existing allocations for a particular node. This returns 8 sets of results, -// the list of named task groups that need to be placed (no existing allocation), the -// allocations that need to be updated (job definition is newer), allocs that -// need to be migrated (node is draining), the allocs that need to be evicted -// (no longer required), those that should be ignored, those that are lost -// that need to be replaced (running on a lost node), those that are running on -// a disconnected node but may resume, and those that may still be running on -// a node that has resumed reconnected. 
-func (nr *NodeReconciler) diffSystemAllocsForNode( +// computeCanaryNodes is a helper function that, given required task groups and +// eligible nodes, outputs a map[nodeID] -> map[TG] -> bool which indicates +// which TGs this node is a canary for, and a map[TG] -> int to indicate how +// many total canaries are to be placed for a TG. +func computeCanaryNodes(required map[string]*structs.TaskGroup, + eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) { + + canaryNodes := map[string]map[string]bool{} + eligibleNodesList := slices.Collect(maps.Values(eligibleNodes)) + canariesPerTG := map[string]int{} + + for _, tg := range required { + if tg.Update.IsEmpty() || tg.Update.Canary == 0 { + continue + } + + // round up to the nearest integer + numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100)) + canariesPerTG[tg.Name] = numberOfCanaryNodes + + for i, n := range eligibleNodesList { + canaryNodes[n.ID] = map[string]bool{} + if i > numberOfCanaryNodes-1 { + break + } + + canaryNodes[n.ID][tg.Name] = true + } + } + + return canaryNodes, canariesPerTG +} + +// computeForNode is used to do a set difference between the target +// allocations and the existing allocations for a particular node. This returns +// 8 sets of results: +// 1. the list of named task groups that need to be placed (no existing +// allocation), +// 2. the allocations that need to be updated (job definition is newer), +// 3. allocs that need to be migrated (node is draining), +// 4. allocs that need to be evicted (no longer required), +// 5. those that should be ignored, +// 6. those that are lost that need to be replaced (running on a lost node), +// 7. those that are running on a disconnected node but may resume, and +// 8. those that may still be running on a node that has resumed reconnected. 
+// +// This method mutates the NodeReconciler fields, and returns a new +// NodeReconcileResult object and a boolean to indicate whether the deployment +// is complete or not. +func (nr *NodeReconciler) computeForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. draining taintedNodes map[string]*structs.Node, // nodes which are down (by node id) + canaryNode map[string]bool, // indicates whether this node is a canary node for tg + canariesPerTG map[string]int, // indicates how many canary placements we expect per tg required map[string]*structs.TaskGroup, // set of allocations that must exist liveAllocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) @@ -91,29 +141,24 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( result := new(NodeReconcileResult) // cancel deployments that aren't needed anymore - // TODO: old deployment is only used when checking for canaries var deploymentUpdates []*structs.DeploymentStatusUpdate - _, nr.DeploymentCurrent, deploymentUpdates = cancelUnneededDeployments(job, nr.DeploymentCurrent) + nr.DeploymentOld, nr.DeploymentCurrent, deploymentUpdates = cancelUnneededDeployments(job, nr.DeploymentCurrent) nr.DeploymentUpdates = append(nr.DeploymentUpdates, deploymentUpdates...) - /* - // TODO: the failed and paused fields are only used for dealing with canary - // placements and their respective deployments - // - // set deployment paused and failed, if we currently have a deployment - var deploymentPaused, deploymentFailed bool - if nr.DeploymentCurrent != nil { - // deployment is paused when it's manually paused by a user, or if the - // deployment is pending or initializing, which are the initial states - // for multi-region job deployments. 
- deploymentPaused = nr.DeploymentCurrent.Status == structs.DeploymentStatusPaused || - nr.DeploymentCurrent.Status == structs.DeploymentStatusPending || - nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing - deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed - } - // TODO: will be needed for canaries - deploymentPlaceReady := !deploymentPaused && !deploymentFailed && !isCanarying - */ + // set deployment paused and failed, if we currently have a deployment + var deploymentPaused, deploymentFailed bool + if nr.DeploymentCurrent != nil { + // deployment is paused when it's manually paused by a user, or if the + // deployment is pending or initializing, which are the initial states + // for multi-region job deployments. + deploymentPaused = nr.DeploymentCurrent.Status == structs.DeploymentStatusPaused || + nr.DeploymentCurrent.Status == structs.DeploymentStatusPending || + nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing + deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed + } + + // Track desired total placements across all loops + var desiredTotal int // Scan the existing updates existing := make(map[string]struct{}) // set of alloc names @@ -265,11 +310,22 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( // If the definition is updated we need to update if job.JobModifyIndex != alloc.Job.JobModifyIndex { - result.Update = append(result.Update, AllocTuple{ - Name: name, - TaskGroup: tg, - Alloc: alloc, - }) + if canariesPerTG[tg.Name] > 0 { + if canaryNode[tg.Name] { + result.Update = append(result.Update, AllocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + }) + } + } else { + result.Update = append(result.Update, AllocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + }) + desiredTotal += 1 + } continue } @@ -289,6 +345,30 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( // Scan the required groups for name, tg := range required { + // populate 
deployment state for this task group + var dstate = new(structs.DeploymentState) + var existingDeployment bool + if nr.DeploymentCurrent != nil { + dstate, existingDeployment = nr.DeploymentCurrent.TaskGroups[tg.Name] + } + + if !existingDeployment { + dstate = &structs.DeploymentState{} + if !tg.Update.IsEmpty() { + dstate.AutoRevert = tg.Update.AutoRevert + dstate.AutoPromote = tg.Update.AutoPromote + dstate.ProgressDeadline = tg.Update.ProgressDeadline + } + } + + isCanarying := canariesPerTG[tg.Name] > 0 + if isCanarying { + dstate.DesiredTotal = canariesPerTG[tg.Name] + dstate.DesiredCanaries = canariesPerTG[tg.Name] + } else { + dstate.DesiredTotal = desiredTotal + } + // Check for an existing allocation if _, ok := existing[name]; !ok { @@ -343,54 +423,45 @@ func (nr *NodeReconciler) diffSystemAllocsForNode( allocTuple.Alloc = &structs.Allocation{NodeID: nodeID} } - result.Place = append(result.Place, allocTuple) - - // populate deployment state for this task group - var dstate = new(structs.DeploymentState) - var existingDeployment bool - if nr.DeploymentCurrent != nil { - dstate, existingDeployment = nr.DeploymentCurrent.TaskGroups[tg.Name] - } - - if !existingDeployment && dstate != nil { - if !tg.Update.IsEmpty() { - dstate.AutoRevert = tg.Update.AutoRevert - dstate.AutoPromote = tg.Update.AutoPromote - dstate.ProgressDeadline = tg.Update.ProgressDeadline + if isCanarying { + if canaryNode[tg.Name] { + result.Place = append(result.Place, allocTuple) + dstate.DesiredCanaries += 1 } - dstate.DesiredTotal += len(result.Place) + } else { + result.Place = append(result.Place, allocTuple) + dstate.DesiredTotal += 1 } - - if dstate == nil { - dstate = new(structs.DeploymentState) - } - - // in this case there's nothing to do - if existingDeployment || tg.Update.IsEmpty() || dstate.DesiredTotal == 0 { - continue - } - - nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) } + + deploymentPlaceReady := !deploymentPaused && !deploymentFailed + + // 
in this case there's nothing to do + if existingDeployment || tg.Update.IsEmpty() || (dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) || !deploymentPlaceReady { + continue + } + + nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) + deploymentComplete = nr.isDeploymentComplete(tg.Name, result) } return result, deploymentComplete } -func (nr *NodeReconciler) createDeployment(job *structs.Job, - tg *structs.TaskGroup, dstate *structs.DeploymentState, updates int, - allocs []*structs.Allocation) { +func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup, + dstate *structs.DeploymentState, updates int, allocs []*structs.Allocation) { + // programming error if dstate == nil { - dstate = &structs.DeploymentState{} + return } updatingSpec := updates != 0 hadRunning := false for _, alloc := range allocs { - if alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex { + if alloc.Job.ID == job.ID && alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex { hadRunning = true break } @@ -422,7 +493,7 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, } func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult) bool { - complete := len(buckets.Place)+len(buckets.Migrate) == 0 // && !requiresCanaries // TODO: additional condition for canaries + complete := len(buckets.Place)+len(buckets.Migrate) == 0 if !complete || nr.DeploymentCurrent == nil { return false diff --git a/scheduler/reconciler/reconcile_node_prop_test.go b/scheduler/reconciler/reconcile_node_prop_test.go index 5774a817a..ca94e17db 100644 --- a/scheduler/reconciler/reconcile_node_prop_test.go +++ b/scheduler/reconciler/reconcile_node_prop_test.go @@ -39,18 +39,16 @@ func TestNodeReconciler_PropTest(t *testing.T) { // NodeReconcileResults doesn't split results by task group, so split // them up so we can check them separately recordResult := func(subresult []AllocTuple, 
label string) { - if subresult != nil { - for _, alloc := range subresult { - var tgName string - if alloc.TaskGroup != nil { - tgName = alloc.TaskGroup.Name - } else if alloc.Alloc != nil { - tgName = alloc.Alloc.TaskGroup - } else { - t.Fatal("one of task group or alloc must always be non-nil") - } - perTaskGroup[tgName][label]++ + for _, alloc := range subresult { + var tgName string + if alloc.TaskGroup != nil { + tgName = alloc.TaskGroup.Name + } else if alloc.Alloc != nil { + tgName = alloc.Alloc.TaskGroup + } else { + t.Fatal("one of task group or alloc must always be non-nil") } + perTaskGroup[tgName][label]++ } } diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index 450d9a6c3..faab4ba42 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -73,7 +73,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + diff, _ := nr.computeForNode(job, "node1", eligible, nil, tainted, nil, nil, required, live, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff) if len(diff.Ignore) > 0 { @@ -96,7 +96,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + diff, _ := nr.computeForNode(job, "node1", eligible, nil, tainted, nil, nil, required, live, terminal, true) assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff) }) @@ -158,9 +158,9 @@ func TestDiffSystemAllocsForNode_Placements(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode( + diff, _ := nr.computeForNode( job, tc.nodeID, eligible, nil, - tainted, required, 
allocsForNode, terminal, true) + tainted, nil, nil, required, allocsForNode, terminal, true) assertDiffCount(t, tc.expected, diff) }) @@ -217,8 +217,8 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) { terminal := structs.TerminalByNodeByName{} nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode( - job, node.ID, eligible, nil, tainted, required, allocs, terminal, true) + diff, _ := nr.computeForNode( + job, node.ID, eligible, nil, tainted, nil, nil, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, update: 1}, diff) if len(diff.Update) > 0 { @@ -287,8 +287,8 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode( - job, tc.nodeID, eligible, ineligible, tainted, + diff, _ := nr.computeForNode( + job, tc.nodeID, eligible, ineligible, tainted, nil, nil, required, []*structs.Allocation{alloc}, terminal, true, ) assertDiffCount(t, tc.expect, diff) @@ -344,9 +344,9 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode( + diff, _ := nr.computeForNode( job, drainNode.ID, map[string]*structs.Node{}, nil, - tainted, required, allocs, terminal, true) + tainted, nil, nil, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{migrate: 1, ignore: 1}, diff) if len(diff.Migrate) > 0 { @@ -396,9 +396,9 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) { } nr := NewNodeReconciler(nil) - diff, _ := nr.diffSystemAllocsForNode( + diff, _ := nr.computeForNode( job, deadNode.ID, map[string]*structs.Node{}, nil, - tainted, required, allocs, terminal, true) + tainted, nil, nil, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{lost: 2}, diff) if len(diff.Migrate) > 0 { @@ -522,8 +522,8 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) { } nr := NewNodeReconciler(nil) - got, _ := nr.diffSystemAllocsForNode( 
- job, tc.node.ID, eligibleNodes, nil, taintedNodes, + got, _ := nr.computeForNode( + job, tc.node.ID, eligibleNodes, nil, taintedNodes, nil, nil, required, []*structs.Allocation{alloc}, terminal, true, ) assertDiffCount(t, tc.expect, got) @@ -716,7 +716,7 @@ func TestNodeDeployments(t *testing.T) { }, false, structs.DeploymentStatusSuccessful, - structs.DeploymentStatusDescriptionSuccessful, + "", }, { "existing running deployment for a stopped job should be cancelled", @@ -758,3 +758,250 @@ func TestNodeDeployments(t *testing.T) { }) } } + +func Test_computeCanaryNodes(t *testing.T) { + ci.Parallel(t) + + // generate an odd number of nodes + fiveEligibleNodes := map[string]*structs.Node{} + for range 5 { + nodeID := uuid.Generate() + node := mock.Node() + node.ID = nodeID + fiveEligibleNodes[nodeID] = node + } + + // generate an even number of nodes + fourEligibleNodes := map[string]*structs.Node{} + for range 4 { + nodeID := uuid.Generate() + node := mock.Node() + node.ID = nodeID + fourEligibleNodes[nodeID] = node + } + + testCases := []struct { + name string + nodes map[string]*structs.Node + required map[string]*structs.TaskGroup + expectedCanaryNodes map[string]int // number of nodes per tg + }{ + { + name: "no required task groups", + nodes: fourEligibleNodes, + required: nil, + expectedCanaryNodes: map[string]int{}, + }, + { + name: "one task group with no update strategy", + nodes: fourEligibleNodes, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + }}, + expectedCanaryNodes: map[string]int{}, + }, + { + name: "one task group with 33% canary deployment", + nodes: fourEligibleNodes, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 33, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + expectedCanaryNodes: map[string]int{ + "foo": 2, // we always round up + }, + }, + { + name: "one task group with 100% canary deployment, four nodes", 
+ nodes: fourEligibleNodes, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 100, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + expectedCanaryNodes: map[string]int{ + "foo": 4, + }, + }, + { + name: "one task group with 50% canary deployment, even nodes", + nodes: fourEligibleNodes, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 50, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + expectedCanaryNodes: map[string]int{ + "foo": 2, + }, + }, + { + name: "two task groups: one with 50% canary deploy, second one with 2% canary deploy", + nodes: fiveEligibleNodes, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 50, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + "bar": { + Name: "bar", + Update: &structs.UpdateStrategy{ + Canary: 2, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + expectedCanaryNodes: map[string]int{ + "foo": 3, // we always round up + "bar": 1, // we always round up + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, canariesPerTG := computeCanaryNodes(tc.required, tc.nodes) + must.Eq(t, tc.expectedCanaryNodes, canariesPerTG) + }) + } +} + +// Tests the reconciler creates new canaries when the job changes +func TestNodeReconciler_NewCanaries(t *testing.T) { + ci.Parallel(t) + + job := mock.SystemJob() + job.TaskGroups[0].Update = &structs.UpdateStrategy{ + Canary: 20, // deploy to 20% of eligible nodes + MaxParallel: 1, // otherwise the update strategy will be considered nil + } + job.JobModifyIndex = 1 + + // Create 10 nodes + nodes := []*structs.Node{} + for i := range 10 { + node := mock.Node() + node.ID = fmt.Sprintf("node_%d", i) + node.Name = 
fmt.Sprintf("node_%d", i) + nodes = append(nodes, node) + } + + allocs := []*structs.Allocation{} + for _, n := range nodes { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + a.TaskGroup = job.TaskGroups[0].Name + + allocs = append(allocs, a) + } + + // bump the job version up + newJob := job.Copy() + newJob.Version = job.Version + 1 + newJob.JobModifyIndex = job.JobModifyIndex + 1 + + // bump the version and add a new TG + newJobWithNewTaskGroup := newJob.Copy() + newJobWithNewTaskGroup.Version = newJob.Version + 1 + newJobWithNewTaskGroup.JobModifyIndex = newJob.JobModifyIndex + 1 + tg := newJob.TaskGroups[0].Copy() + tg.Name = "other" + tg.Update = &structs.UpdateStrategy{MaxParallel: 1} + newJobWithNewTaskGroup.TaskGroups = append(newJobWithNewTaskGroup.TaskGroups, tg) + + // new job with no previous allocs and no canary update strategy + jobWithNoUpdates := mock.SystemJob() + jobWithNoUpdates.Name = "i-am-a-brand-new-job" + jobWithNoUpdates.TaskGroups[0].Name = "i-am-a-brand-new-tg" + jobWithNoUpdates.TaskGroups[0].Update = structs.DefaultUpdateStrategy + + testCases := []struct { + name string + job *structs.Job + existingDeployment *structs.Deployment + expectedDeployment *structs.Deployment + expectedPlaceCount int + expectedUpdateCount int + }{ + { + name: "new job version", + job: newJob, + existingDeployment: nil, + expectedDeployment: &structs.Deployment{ + StatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, + TaskGroups: map[string]*structs.DeploymentState{ + newJob.TaskGroups[0].Name: { + DesiredCanaries: 2, + DesiredTotal: 2, + }}, + }, + expectedPlaceCount: 0, + expectedUpdateCount: 2, + }, + { + name: "new job version with a new TG (no existing allocs, no canaries)", + job: newJobWithNewTaskGroup, + existingDeployment: nil, + expectedDeployment: &structs.Deployment{ + StatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, + TaskGroups: 
map[string]*structs.DeploymentState{ + newJobWithNewTaskGroup.TaskGroups[0].Name: { + DesiredCanaries: 2, + DesiredTotal: 2, + }, + newJobWithNewTaskGroup.TaskGroups[1].Name: { + DesiredCanaries: 0, + DesiredTotal: 10, + }, + }, + }, + expectedPlaceCount: 10, + expectedUpdateCount: 2, + }, + { + name: "brand new job with no update block", + job: jobWithNoUpdates, + existingDeployment: nil, + expectedDeployment: &structs.Deployment{ + StatusDescription: structs.DeploymentStatusDescriptionRunning, + TaskGroups: map[string]*structs.DeploymentState{ + jobWithNoUpdates.TaskGroups[0].Name: { + DesiredTotal: 10, + }, + }, + }, + expectedPlaceCount: 10, + expectedUpdateCount: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + reconciler := NewNodeReconciler(tc.existingDeployment) + r := reconciler.Compute(tc.job, nodes, nil, nil, allocs, nil, false) + must.NotNil(t, reconciler.DeploymentCurrent) + must.Eq(t, tc.expectedPlaceCount, len(r.Place)) + must.Eq(t, tc.expectedUpdateCount, len(r.Update)) + }) + } +} From 7c4faf92278ff5534bf2bd4281609864ce5b25d7 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Mon, 25 Aug 2025 15:29:13 +0200 Subject: [PATCH 04/13] scheduler: monitor deployments correctly (#26605) Corrects two minor bugs that prevented proper deployment monitoring for systems jobs: populating the new deployment field of the system scheduler object, and correcting allocrunner health checks that were guarded not to run on system jobs. 
--- client/allocrunner/health_hook.go | 4 +++- client/allocrunner/health_hook_test.go | 21 ++++++++++++--------- scheduler/scheduler_system.go | 11 ++++++++--- 3 files changed, 23 insertions(+), 13 deletions(-) diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go index 0944aa3f1..58ccd27db 100644 --- a/client/allocrunner/health_hook.go +++ b/client/allocrunner/health_hook.go @@ -86,7 +86,9 @@ func newAllocHealthWatcherHook( // Neither deployments nor migrations care about the health of // non-service jobs so never watch their health - if alloc.Job.Type != structs.JobTypeService { + switch alloc.Job.Type { + case structs.JobTypeService, structs.JobTypeSystem: + default: return noopAllocHealthWatcherHook{} } diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go index ce1ae8c9f..ddcc9955e 100644 --- a/client/allocrunner/health_hook_test.go +++ b/client/allocrunner/health_hook_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -396,26 +397,28 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { require.NoError(h.Postrun()) } -// TestHealthHook_SystemNoop asserts that system jobs return the noop tracker. -func TestHealthHook_SystemNoop(t *testing.T) { +// TestHealthHook_System asserts that system jobs trigger hooks just like service jobs. 
+func TestHealthHook_System(t *testing.T) { ci.Parallel(t) alloc := mock.SystemAlloc() h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil) - // Assert that it's the noop impl _, ok := h.(noopAllocHealthWatcherHook) - require.True(t, ok) + must.False(t, ok) - // Assert the noop impl does not implement any hooks + _, ok = h.(*allocHealthWatcherHook) + must.True(t, ok) + + // Assert the other hooks are implemented, too _, ok = h.(interfaces.RunnerPrerunHook) - require.False(t, ok) + must.True(t, ok) _, ok = h.(interfaces.RunnerUpdateHook) - require.False(t, ok) + must.True(t, ok) _, ok = h.(interfaces.RunnerPostrunHook) - require.False(t, ok) + must.True(t, ok) _, ok = h.(interfaces.ShutdownHook) - require.False(t, ok) + must.True(t, ok) } // TestHealthHook_BatchNoop asserts that batch jobs return the noop tracker. diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 73b746b64..a233aed56 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -102,7 +102,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, s.planAnnotations, structs.EvalStatusFailed, desc, - s.queuedAllocs, "") + s.queuedAllocs, s.deployment.GetID()) } limit := maxSystemScheduleAttempts @@ -116,7 +116,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { if statusErr, ok := err.(*SetStatusError); ok { return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, s.planAnnotations, statusErr.EvalStatus, err.Error(), - s.queuedAllocs, "") + s.queuedAllocs, s.deployment.GetID()) } return err } @@ -124,7 +124,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { // Update the status to complete return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, 
s.failedTGAllocs, s.planAnnotations, structs.EvalStatusComplete, "", - s.queuedAllocs, "") + s.queuedAllocs, s.deployment.GetID()) } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -286,6 +286,11 @@ func (s *SystemScheduler) computeJobAllocs() error { s.plan.Deployment = nr.DeploymentCurrent s.plan.DeploymentUpdates = nr.DeploymentUpdates + // Update the stored deployment + if nr.DeploymentCurrent != nil { + s.deployment = nr.DeploymentCurrent + } + // Add all the allocs to stop for _, e := range r.Stop { s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNotNeeded, "", "") From 5c444b892294726f33bcd1b7723a3b9bc1f69434 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 27 Aug 2025 03:38:18 -0400 Subject: [PATCH 05/13] system scheduler: account for per task group max_parallel (#26635) The system scheduler's `evictAndPlace` function does not account for per task group `max_parallel`, as needed to support system deployments. Push the rolling upgrade strategy check into this function and return that the deployment was limited if any one of the task groups is limited. --- scheduler/scheduler_system.go | 52 ++++++----- scheduler/scheduler_system_test.go | 140 ++++++++++++++++++----------- 2 files changed, 121 insertions(+), 71 deletions(-) diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index a233aed56..b25aabe38 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -329,14 +329,9 @@ func (s *SystemScheduler) computeJobAllocs() error { DesiredTGUpdates: desiredUpdates(r, inplaceUpdates, destructiveUpdates), } - // Check if a rolling upgrade strategy is being used - limit := len(r.Update) - if !s.job.Stopped() && s.job.Update.Rolling() { - limit = s.job.Update.MaxParallel - } - - // Treat non in-place updates as an eviction and new placement. 
- s.limitReached = evictAndPlace(s.ctx, r, r.Update, sstructs.StatusAllocUpdating, &limit) + // Treat non in-place updates as an eviction and new placement, which will + // be limited by max_parallel + s.limitReached = evictAndPlace(s.ctx, s.job, r, sstructs.StatusAllocUpdating) // Nothing remaining to do if placement is not required if len(r.Place) == 0 { @@ -605,19 +600,34 @@ func (s *SystemScheduler) canHandle(trigger string) bool { } // evictAndPlace is used to mark allocations for evicts and add them to the -// placement queue. evictAndPlace modifies both the diffResult and the -// limit. It returns true if the limit has been reached. -func evictAndPlace(ctx feasible.Context, diff *reconciler.NodeReconcileResult, allocs []reconciler.AllocTuple, desc string, limit *int) bool { - n := len(allocs) - for i := 0; i < n && i < *limit; i++ { - a := allocs[i] - ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") - diff.Place = append(diff.Place, a) +// placement queue. evictAndPlace modifies the diffResult. It returns true if +// the limit has been reached for any task group. 
+func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult, desc string) bool { + + limits := map[string]int{} // per task group limits + if !job.Stopped() { + jobLimit := len(diff.Update) + if job.Update.MaxParallel > 0 { + jobLimit = job.Update.MaxParallel + } + for _, tg := range job.TaskGroups { + if tg.Update != nil && tg.Update.MaxParallel > 0 { + limits[tg.Name] = tg.Update.MaxParallel + } else { + limits[tg.Name] = jobLimit + } + } } - if n <= *limit { - *limit -= n - return false + + limited := false + for _, a := range diff.Update { + if limit := limits[a.Alloc.TaskGroup]; limit > 0 { + ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") + diff.Place = append(diff.Place, a) + limits[a.Alloc.TaskGroup]-- + } else { + limited = true + } } - *limit = 0 - return true + return limited } diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index 757c6a4ea..908927c49 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -3292,61 +3292,101 @@ func TestSystemSched_CSITopology(t *testing.T) { } -func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) { +func TestEvictAndPlace(t *testing.T) { ci.Parallel(t) - _, ctx := feasible.MockContext(t) - allocs := []reconciler.AllocTuple{ - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, + testCases := []struct { + name string + allocsPerTG map[string]int + maxParallelPerTG map[string]int + jobMaxParallel int + + expectLimited bool + expectPlace int + }{ + { + name: "one group limit less than allocs", + allocsPerTG: map[string]int{"a": 4}, + jobMaxParallel: 2, + expectLimited: true, + expectPlace: 2, + }, + { + name: "one group limit equals allocs", + allocsPerTG: map[string]int{"a": 4}, + jobMaxParallel: 4, + expectLimited: false, + expectPlace: 4, + }, + { 
+ name: "one group limit greater than allocs", + allocsPerTG: map[string]int{"a": 2}, + jobMaxParallel: 4, + expectLimited: false, + expectPlace: 2, + }, + { + name: "group limit supercedes job limit", + allocsPerTG: map[string]int{"a": 4}, + maxParallelPerTG: map[string]int{"a": 2}, + jobMaxParallel: 1, + expectLimited: true, + expectPlace: 2, + }, + { + name: "two groups limit less than allocs on one", + allocsPerTG: map[string]int{"a": 4, "b": 4}, + maxParallelPerTG: map[string]int{"a": 2, "b": 4}, + jobMaxParallel: 0, + expectLimited: true, + expectPlace: 6, + }, + { + name: "two groups neither limited", + allocsPerTG: map[string]int{"a": 2, "b": 2}, + maxParallelPerTG: map[string]int{"a": 4, "b": 4}, + jobMaxParallel: 0, + expectLimited: false, + expectPlace: 4, + }, + { + name: "two groups one uses job limit", + allocsPerTG: map[string]int{"a": 4, "b": 4}, + maxParallelPerTG: map[string]int{"a": 4}, + jobMaxParallel: 2, + expectLimited: true, + expectPlace: 6, + }, } - diff := &reconciler.NodeReconcileResult{} + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + job := &structs.Job{Update: structs.UpdateStrategy{ + MaxParallel: tc.jobMaxParallel, Stagger: time.Second}} - limit := 2 - must.True(t, evictAndPlace(ctx, diff, allocs, "", &limit), - must.Sprintf("evictAndReplace() should have returned true")) - must.Zero(t, limit, - must.Sprint("evictAndReplace() should decrement limit")) - must.Len(t, 2, diff.Place, - must.Sprintf("evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) -} + allocs := []reconciler.AllocTuple{} + for tg, count := range tc.allocsPerTG { + job.TaskGroups = append(job.TaskGroups, &structs.TaskGroup{Name: tg}) + for range count { + allocs = append(allocs, reconciler.AllocTuple{ + Alloc: &structs.Allocation{ID: uuid.Generate(), TaskGroup: tg}}) + } + } + for tg, max := range tc.maxParallelPerTG { + for _, jtg := range job.TaskGroups { + if jtg.Name == tg { + jtg.Update = &structs.UpdateStrategy{ + 
MaxParallel: max, Stagger: time.Second} + } + } + } + diff := &reconciler.NodeReconcileResult{Update: allocs} + _, ctx := feasible.MockContext(t) -func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) { - ci.Parallel(t) - - _, ctx := feasible.MockContext(t) - allocs := []reconciler.AllocTuple{ - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, + must.Eq(t, tc.expectLimited, evictAndPlace(ctx, job, diff, ""), + must.Sprintf("limited")) + must.Len(t, tc.expectPlace, diff.Place, must.Sprintf( + "evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) + }) } - diff := &reconciler.NodeReconcileResult{} - limit := 4 - must.False(t, evictAndPlace(ctx, diff, allocs, "", &limit), - must.Sprint("evictAndReplace() should have returned false")) - must.Zero(t, limit, must.Sprint("evictAndReplace() should decrement limit")) - must.Len(t, 4, diff.Place, - must.Sprintf("evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) -} - -func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) { - ci.Parallel(t) - - _, ctx := feasible.MockContext(t) - allocs := []reconciler.AllocTuple{ - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - } - diff := &reconciler.NodeReconcileResult{} - - limit := 6 - must.False(t, evictAndPlace(ctx, diff, allocs, "", &limit)) - must.Eq(t, 2, limit, must.Sprint("evictAndReplace() should decrement limit")) - must.Len(t, 4, diff.Place, must.Sprintf("evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) } From ca96de15d07952e43ca3273242d42837d8998b38 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 27 Aug 2025 09:38:35 
+0200 Subject: [PATCH 06/13] scheduler: correct handling of MaxParallel and obsoleting Stagger in the system scheduler (#26631) --- nomad/structs/structs.go | 3 +++ scheduler/reconciler/reconcile_node.go | 12 ++++++++++-- scheduler/scheduler_system.go | 2 +- 3 files changed, 14 insertions(+), 3 deletions(-) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0ef49b081..04b7e2887 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5322,6 +5322,9 @@ var ( type UpdateStrategy struct { // Stagger is used to determine the rate at which allocations are migrated // due to down or draining nodes. + // + // Deprecated: as of Nomad 1.11, this field is equivalent to MinHealthyTime + // and will be removed in future releases. Stagger time.Duration // MaxParallel is how many updates can be done in parallel diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index c0542b552..15ee989e9 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -441,7 +441,15 @@ func (nr *NodeReconciler) computeForNode( continue } - nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) + maxParallel := 1 + if !tg.Update.IsEmpty() { + maxParallel = tg.Update.MaxParallel + } + + // maxParallel of 0 means no deployments + if maxParallel != 0 { + nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) + } deploymentComplete = nr.isDeploymentComplete(tg.Name, result) } @@ -493,7 +501,7 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro } func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult) bool { - complete := len(buckets.Place)+len(buckets.Migrate) == 0 + complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0 if !complete || nr.DeploymentCurrent == nil { return false diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 
b25aabe38..967449216 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -191,7 +191,7 @@ func (s *SystemScheduler) process() (bool, error) { // If the limit of placements was reached we need to create an evaluation // to pickup from here after the stagger period. if s.limitReached && s.nextEval == nil { - s.nextEval = s.eval.NextRollingEval(s.job.Update.Stagger) + s.nextEval = s.eval.NextRollingEval(s.job.Update.MinHealthyTime) if err := s.planner.CreateEval(s.nextEval); err != nil { s.logger.Error("failed to make next eval for rolling update", "error", err) return false, err From de342ee48b052419f552285b6e32c64750f8cc51 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Thu, 28 Aug 2025 16:24:52 +0200 Subject: [PATCH 07/13] scheduler: correct dstate total/canary counts for system deployments (#26641) --- scheduler/reconciler/reconcile_node.go | 31 ++--- scheduler/reconciler/reconcile_node_test.go | 134 ++++++++++++-------- scheduler/scheduler_system.go | 8 ++ 3 files changed, 103 insertions(+), 70 deletions(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 15ee989e9..302967bcd 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -157,8 +157,11 @@ func (nr *NodeReconciler) computeForNode( deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed } - // Track desired total placements across all loops - var desiredTotal int + // Track desired total and desired canaries across all loops + desiredCanaries := map[string]int{} + + // Track whether we're during a canary update + isCanarying := map[string]bool{} // Scan the existing updates existing := make(map[string]struct{}) // set of alloc names @@ -311,12 +314,15 @@ func (nr *NodeReconciler) computeForNode( // If the definition is updated we need to update if job.JobModifyIndex != alloc.Job.JobModifyIndex { if 
canariesPerTG[tg.Name] > 0 { + isCanarying[tg.Name] = true if canaryNode[tg.Name] { result.Update = append(result.Update, AllocTuple{ Name: name, TaskGroup: tg, Alloc: alloc, + Canary: true, }) + desiredCanaries[tg.Name] += 1 } } else { result.Update = append(result.Update, AllocTuple{ @@ -324,7 +330,6 @@ func (nr *NodeReconciler) computeForNode( TaskGroup: tg, Alloc: alloc, }) - desiredTotal += 1 } continue } @@ -361,12 +366,9 @@ func (nr *NodeReconciler) computeForNode( } } - isCanarying := canariesPerTG[tg.Name] > 0 - if isCanarying { - dstate.DesiredTotal = canariesPerTG[tg.Name] - dstate.DesiredCanaries = canariesPerTG[tg.Name] - } else { - dstate.DesiredTotal = desiredTotal + dstate.DesiredTotal = len(eligibleNodes) + if isCanarying[tg.Name] { + dstate.DesiredCanaries += desiredCanaries[tg.Name] } // Check for an existing allocation @@ -423,15 +425,7 @@ func (nr *NodeReconciler) computeForNode( allocTuple.Alloc = &structs.Allocation{NodeID: nodeID} } - if isCanarying { - if canaryNode[tg.Name] { - result.Place = append(result.Place, allocTuple) - dstate.DesiredCanaries += 1 - } - } else { - result.Place = append(result.Place, allocTuple) - dstate.DesiredTotal += 1 - } + result.Place = append(result.Place, allocTuple) } deploymentPlaceReady := !deploymentPaused && !deploymentFailed @@ -591,6 +585,7 @@ type AllocTuple struct { Name string TaskGroup *structs.TaskGroup Alloc *structs.Allocation + Canary bool } // NodeReconcileResult is used to return the sets that result from the diff diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index faab4ba42..c61e3fa87 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -916,15 +916,15 @@ func TestNodeReconciler_NewCanaries(t *testing.T) { } // bump the job version up - newJob := job.Copy() - newJob.Version = job.Version + 1 - newJob.JobModifyIndex = job.JobModifyIndex + 1 + newJobVersion := job.Copy() + 
newJobVersion.Version = job.Version + 1 + newJobVersion.JobModifyIndex = job.JobModifyIndex + 1 // bump the version and add a new TG - newJobWithNewTaskGroup := newJob.Copy() - newJobWithNewTaskGroup.Version = newJob.Version + 1 - newJobWithNewTaskGroup.JobModifyIndex = newJob.JobModifyIndex + 1 - tg := newJob.TaskGroups[0].Copy() + newJobWithNewTaskGroup := newJobVersion.Copy() + newJobWithNewTaskGroup.Version = newJobVersion.Version + 1 + newJobWithNewTaskGroup.JobModifyIndex = newJobVersion.JobModifyIndex + 1 + tg := newJobVersion.TaskGroups[0].Copy() tg.Name = "other" tg.Update = &structs.UpdateStrategy{MaxParallel: 1} newJobWithNewTaskGroup.TaskGroups = append(newJobWithNewTaskGroup.TaskGroups, tg) @@ -935,73 +935,103 @@ func TestNodeReconciler_NewCanaries(t *testing.T) { jobWithNoUpdates.TaskGroups[0].Name = "i-am-a-brand-new-tg" jobWithNoUpdates.TaskGroups[0].Update = structs.DefaultUpdateStrategy + // additional test to make sure there are no canaries being placed for v0 + // jobs + freshJob := mock.SystemJob() + freshJob.TaskGroups[0].Update = structs.DefaultUpdateStrategy + freshNodes := []*structs.Node{} + for range 2 { + node := mock.Node() + freshNodes = append(freshNodes, node) + } + testCases := []struct { - name string - job *structs.Job - existingDeployment *structs.Deployment - expectedDeployment *structs.Deployment - expectedPlaceCount int - expectedUpdateCount int + name string + job *structs.Job + nodes []*structs.Node + existingDeployment *structs.Deployment + expectedDesiredCanaries map[string]int + expectedDesiredTotal map[string]int + expectedDeploymentStatusDescription string + expectedPlaceCount int + expectedUpdateCount int }{ { - name: "new job version", - job: newJob, - existingDeployment: nil, - expectedDeployment: &structs.Deployment{ - StatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, - TaskGroups: map[string]*structs.DeploymentState{ - newJob.TaskGroups[0].Name: { - DesiredCanaries: 2, - DesiredTotal: 2, 
- }}, - }, - expectedPlaceCount: 0, - expectedUpdateCount: 2, + name: "new job version", + job: newJobVersion, + nodes: nodes, + existingDeployment: nil, + expectedDesiredCanaries: map[string]int{newJobVersion.TaskGroups[0].Name: 2}, + expectedDesiredTotal: map[string]int{newJobVersion.TaskGroups[0].Name: 10}, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, + expectedPlaceCount: 0, + expectedUpdateCount: 2, }, { name: "new job version with a new TG (no existing allocs, no canaries)", job: newJobWithNewTaskGroup, + nodes: nodes, existingDeployment: nil, - expectedDeployment: &structs.Deployment{ - StatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, - TaskGroups: map[string]*structs.DeploymentState{ - newJobWithNewTaskGroup.TaskGroups[0].Name: { - DesiredCanaries: 2, - DesiredTotal: 2, - }, - newJobWithNewTaskGroup.TaskGroups[1].Name: { - DesiredCanaries: 0, - DesiredTotal: 10, - }, - }, + expectedDesiredCanaries: map[string]int{ + newJobWithNewTaskGroup.TaskGroups[0].Name: 2, + newJobWithNewTaskGroup.TaskGroups[1].Name: 0, }, - expectedPlaceCount: 10, - expectedUpdateCount: 2, + expectedDesiredTotal: map[string]int{ + newJobWithNewTaskGroup.TaskGroups[0].Name: 10, + newJobWithNewTaskGroup.TaskGroups[1].Name: 10, + }, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, + expectedPlaceCount: 10, + expectedUpdateCount: 2, }, { name: "brand new job with no update block", job: jobWithNoUpdates, + nodes: nodes, existingDeployment: nil, - expectedDeployment: &structs.Deployment{ - StatusDescription: structs.DeploymentStatusDescriptionRunning, - TaskGroups: map[string]*structs.DeploymentState{ - jobWithNoUpdates.TaskGroups[0].Name: { - DesiredTotal: 10, - }, - }, + expectedDesiredCanaries: map[string]int{ + jobWithNoUpdates.TaskGroups[0].Name: 0, }, - expectedPlaceCount: 10, - expectedUpdateCount: 0, + expectedDesiredTotal: map[string]int{ + 
jobWithNoUpdates.TaskGroups[0].Name: 10, + }, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunning, + expectedPlaceCount: 10, + expectedUpdateCount: 0, + }, + { + name: "fresh job with no updates, empty nodes", + job: freshJob, + nodes: freshNodes, + existingDeployment: nil, + expectedDesiredCanaries: map[string]int{ + freshJob.TaskGroups[0].Name: 0, + }, + expectedDesiredTotal: map[string]int{ + freshJob.TaskGroups[0].Name: 2, + }, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunning, + expectedPlaceCount: 2, + expectedUpdateCount: 0, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { reconciler := NewNodeReconciler(tc.existingDeployment) - r := reconciler.Compute(tc.job, nodes, nil, nil, allocs, nil, false) + r := reconciler.Compute(tc.job, tc.nodes, nil, nil, allocs, nil, false) must.NotNil(t, reconciler.DeploymentCurrent) - must.Eq(t, tc.expectedPlaceCount, len(r.Place)) - must.Eq(t, tc.expectedUpdateCount, len(r.Update)) + must.Eq(t, tc.expectedPlaceCount, len(r.Place), must.Sprint("incorrect amount of r.Place")) + must.Eq(t, tc.expectedUpdateCount, len(r.Update), must.Sprint("incorrect amount of r.Update")) + must.Eq(t, tc.expectedDeploymentStatusDescription, reconciler.DeploymentCurrent.StatusDescription) + for _, tg := range tc.job.TaskGroups { + must.Eq(t, tc.expectedDesiredCanaries[tg.Name], + reconciler.DeploymentCurrent.TaskGroups[tg.Name].DesiredCanaries, + must.Sprintf("incorrect number of DesiredCanaries for %s", tg.Name)) + must.Eq(t, tc.expectedDesiredTotal[tg.Name], + reconciler.DeploymentCurrent.TaskGroups[tg.Name].DesiredTotal, + must.Sprintf("incorrect number of DesiredTotal for %s", tg.Name)) + } }) } } diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 967449216..d8b8a3e1b 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -531,6 +531,14 @@ func (s *SystemScheduler) computePlacements(place 
[]reconciler.AllocTuple, exist alloc.PreviousAllocation = missing.Alloc.ID } + // If we are placing a canary, add the canary to the deployment state + // object and mark it as a canary. + if missing.Canary && s.deployment != nil { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Canary: true, + } + } + // If this placement involves preemption, set DesiredState to evict for those allocations if option.PreemptedAllocs != nil { var preemptedAllocIDs []string From 8b8e21dc0eefa438b141941adc1d8bae480f51e3 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Thu, 28 Aug 2025 17:29:13 +0200 Subject: [PATCH 08/13] scheduler: check if system job deploy is complete before other guards (#26651) --- scheduler/reconciler/reconcile_node.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 302967bcd..6f97de0a3 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -428,7 +428,9 @@ func (nr *NodeReconciler) computeForNode( result.Place = append(result.Place, allocTuple) } + // check if deployment is place ready or complete deploymentPlaceReady := !deploymentPaused && !deploymentFailed + deploymentComplete = nr.isDeploymentComplete(tg.Name, result) // in this case there's nothing to do if existingDeployment || tg.Update.IsEmpty() || (dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) || !deploymentPlaceReady { @@ -444,8 +446,6 @@ func (nr *NodeReconciler) computeForNode( if maxParallel != 0 { nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) } - - deploymentComplete = nr.isDeploymentComplete(tg.Name, result) } return result, deploymentComplete From 14e98a2420b5f48d44d922ea599721f4be5a84f8 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Wed, 3 Sep 2025 16:09:36 +0200 Subject: [PATCH 09/13] 
scheduler: fix promotions of system job canaries (#26652) This changeset adjusts the handling of allocations placement when we're promoting a deployment, and it corrects the behavior of isDeploymentComplete, which previously would never mark promoted deployment as complete. --- scheduler/reconciler/reconcile_node.go | 22 ++++--- scheduler/reconciler/reconcile_node_test.go | 71 +++++++++++++++++++++ 2 files changed, 85 insertions(+), 8 deletions(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 6f97de0a3..2a21f7096 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -183,6 +183,13 @@ func (nr *NodeReconciler) computeForNode( continue } + // populate deployment state for this task group if there is an existing + // deployment + var dstate = new(structs.DeploymentState) + if nr.DeploymentCurrent != nil { + dstate, _ = nr.DeploymentCurrent.TaskGroups[tg.Name] + } + supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients) reconnect := false @@ -313,7 +320,7 @@ func (nr *NodeReconciler) computeForNode( // If the definition is updated we need to update if job.JobModifyIndex != alloc.Job.JobModifyIndex { - if canariesPerTG[tg.Name] > 0 { + if canariesPerTG[tg.Name] > 0 && dstate != nil && !dstate.Promoted { isCanarying[tg.Name] = true if canaryNode[tg.Name] { result.Update = append(result.Update, AllocTuple{ @@ -367,8 +374,8 @@ func (nr *NodeReconciler) computeForNode( } dstate.DesiredTotal = len(eligibleNodes) - if isCanarying[tg.Name] { - dstate.DesiredCanaries += desiredCanaries[tg.Name] + if isCanarying[tg.Name] && !dstate.Promoted { + dstate.DesiredCanaries = canariesPerTG[tg.Name] } // Check for an existing allocation @@ -430,7 +437,7 @@ func (nr *NodeReconciler) computeForNode( // check if deployment is place ready or complete deploymentPlaceReady := !deploymentPaused && !deploymentFailed - deploymentComplete = 
nr.isDeploymentComplete(tg.Name, result) + deploymentComplete = nr.isDeploymentComplete(tg.Name, result, isCanarying[tg.Name]) // in this case there's nothing to do if existingDeployment || tg.Update.IsEmpty() || (dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) || !deploymentPlaceReady { @@ -494,17 +501,16 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate } -func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult) bool { +func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool { complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0 - if !complete || nr.DeploymentCurrent == nil { + if !complete || nr.DeploymentCurrent == nil || isCanarying { return false } // ensure everything is healthy if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok { - if dstate.HealthyAllocs < max(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs - (dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries + if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs complete = false } } diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index c61e3fa87..286f2a546 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -1035,3 +1035,74 @@ func TestNodeReconciler_NewCanaries(t *testing.T) { }) } } + +// Tests the reconciler correctly promotes canaries +func TestNodeReconciler_CanaryPromotion(t *testing.T) { + ci.Parallel(t) + + job := mock.SystemJob() + job.TaskGroups[0].Update = &structs.UpdateStrategy{ + Canary: 20, // deploy to 20% of eligible nodes + MaxParallel: 1, // otherwise the update strategy will be considered nil + } + job.JobModifyIndex = 1 
+ + // bump the job version up + newJobVersion := job.Copy() + newJobVersion.Version = job.Version + 1 + newJobVersion.JobModifyIndex = job.JobModifyIndex + 1 + + // Create 5 nodes + nodes := []*structs.Node{} + for i := range 5 { + node := mock.Node() + node.ID = fmt.Sprintf("node_%d", i) + node.Name = fmt.Sprintf("node_%d", i) + nodes = append(nodes, node) + } + + // Create v0 allocs on 2 of the nodes, and v1 (canary) allocs on 3 nodes + allocs := []*structs.Allocation{} + for _, n := range nodes[0:3] { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + a.TaskGroup = job.TaskGroups[0].Name + + allocs = append(allocs, a) + } + for _, n := range nodes[3:] { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + a.TaskGroup = job.TaskGroups[0].Name + a.DeploymentStatus = &structs.AllocDeploymentStatus{Canary: true} + a.Job.Version = newJobVersion.Version + a.Job.JobModifyIndex = newJobVersion.JobModifyIndex + + allocs = append(allocs, a) + } + + // promote canaries + deployment := structs.NewDeployment(newJobVersion, 10, time.Now().Unix()) + deployment.TaskGroups[newJobVersion.TaskGroups[0].Name] = &structs.DeploymentState{ + Promoted: true, + HealthyAllocs: 5, + DesiredTotal: 5, + DesiredCanaries: 0, + } + + // reconcile + reconciler := NewNodeReconciler(deployment) + reconciler.Compute(newJobVersion, nodes, nil, nil, allocs, nil, false) + + must.NotNil(t, reconciler.DeploymentCurrent) + must.Eq(t, 5, reconciler.DeploymentCurrent.TaskGroups[newJobVersion.TaskGroups[0].Name].DesiredTotal) + must.SliceContainsFunc(t, reconciler.DeploymentUpdates, structs.DeploymentStatusSuccessful, + func(a *structs.DeploymentStatusUpdate, b string) bool { return a.Status == b }, + ) +} From 276ab8a4c69ae563360a547b41ccbb081b4a4f09 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 5 Sep 2025 15:32:08 +0200 Subject: [PATCH 10/13] 
system scheduler: keep track of previously used canary nodes (#26697) In the system scheduler, we need to keep track which nodes were previously used as "canary nodes" and not pick them at random, in case of previously failed canaries or changes to the amount of canaries in the jobspec. --------- Co-authored-by: Tim Gross --- scheduler/reconciler/reconcile_node.go | 57 ++++++++++- scheduler/reconciler/reconcile_node_test.go | 104 +++++++++++++++----- 2 files changed, 134 insertions(+), 27 deletions(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 2a21f7096..71dd7965c 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -59,7 +59,7 @@ func (nr *NodeReconciler) Compute( // Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary // percentage of eligible nodes, so we create a mapping of task group name // to a list of nodes that canaries should be placed on. - canaryNodes, canariesPerTG := computeCanaryNodes(required, eligibleNodes) + canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes) result := new(NodeReconcileResult) deploymentComplete := true @@ -76,11 +76,13 @@ func (nr *NodeReconciler) Compute( return result } -// computeCanaryNodes is a helper function that, given required task groups and +// computeCanaryNodes is a helper function that, given required task groups, +// mappings of nodes to their live allocs and terminal allocs, and a map of // eligible nodes, outputs a map[nodeID] -> map[TG] -> bool which indicates // which TGs this node is a canary for, and a map[TG] -> int to indicate how // many total canaries are to be placed for a TG. 
-func computeCanaryNodes(required map[string]*structs.TaskGroup, +func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGroup, + liveAllocs map[string][]*structs.Allocation, terminalAllocs structs.TerminalByNodeByName, eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) { canaryNodes := map[string]map[string]bool{} @@ -96,12 +98,32 @@ func computeCanaryNodes(required map[string]*structs.TaskGroup, numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100)) canariesPerTG[tg.Name] = numberOfCanaryNodes + // check if there are any live allocations on any nodes that are/were + // canaries. + for nodeID, allocs := range liveAllocs { + for _, a := range allocs { + eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes( + eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID) + } + } + + // check if there are any terminal allocations that were canaries + for nodeID, terminalAlloc := range terminalAllocs { + for _, a := range terminalAlloc { + eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes( + eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID) + } + } + for i, n := range eligibleNodesList { - canaryNodes[n.ID] = map[string]bool{} if i > numberOfCanaryNodes-1 { break } + if _, ok := canaryNodes[n.ID]; !ok { + canaryNodes[n.ID] = map[string]bool{} + } + canaryNodes[n.ID][tg.Name] = true } } @@ -109,6 +131,33 @@ func computeCanaryNodes(required map[string]*structs.TaskGroup, return canaryNodes, canariesPerTG } +func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOfCanaryNodes int, + a *structs.Allocation, tg *structs.TaskGroup, canaryNodes map[string]map[string]bool, nodeID string) ([]*structs.Node, int) { + + if a.DeploymentStatus == nil || a.DeploymentStatus.Canary == false || + nr.DeploymentCurrent == nil { // TODO: should we add this? 
|| nr.DeploymentCurrent.ID != a.DeploymentID { + return nodesList, numberOfCanaryNodes + } + + nodes := nodesList + numberOfCanaries := numberOfCanaryNodes + if a.TaskGroup == tg.Name { + if _, ok := canaryNodes[nodeID]; !ok { + canaryNodes[nodeID] = map[string]bool{} + } + canaryNodes[nodeID][tg.Name] = true + + // this node should no longer be considered when searching + // for canary nodes + numberOfCanaries -= 1 + nodes = slices.DeleteFunc( + nodes, + func(n *structs.Node) bool { return n.ID == nodeID }, + ) + } + return nodes, numberOfCanaries +} + // computeForNode is used to do a set difference between the target // allocations and the existing allocations for a particular node. This returns // 8 sets of results: diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index 286f2a546..c9906e76a 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -764,11 +764,13 @@ func Test_computeCanaryNodes(t *testing.T) { // generate an odd number of nodes fiveEligibleNodes := map[string]*structs.Node{} - for range 5 { - nodeID := uuid.Generate() + // name them so we can refer to their names while testing pre-existing + // canary allocs + fiveEligibleNodeNames := []string{"node1", "node2", "node3", "node4", "node5"} + for _, name := range fiveEligibleNodeNames { node := mock.Node() - node.ID = nodeID - fiveEligibleNodes[nodeID] = node + node.ID = name + fiveEligibleNodes[name] = node } // generate an even number of nodes @@ -781,29 +783,43 @@ func Test_computeCanaryNodes(t *testing.T) { } testCases := []struct { - name string - nodes map[string]*structs.Node - required map[string]*structs.TaskGroup - expectedCanaryNodes map[string]int // number of nodes per tg + name string + nodes map[string]*structs.Node + liveAllocs map[string][]*structs.Allocation + terminalAllocs structs.TerminalByNodeByName + required map[string]*structs.TaskGroup + existingDeployment 
*structs.Deployment + expectedCanaryNodes map[string]int // number of nodes per tg + expectedCanaryNodeID map[string]string // sometimes we want to make sure a particular node ID is a canary }{ { - name: "no required task groups", - nodes: fourEligibleNodes, - required: nil, - expectedCanaryNodes: map[string]int{}, + name: "no required task groups", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, + required: nil, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{}, + expectedCanaryNodeID: nil, }, { - name: "one task group with no update strategy", - nodes: fourEligibleNodes, + name: "one task group with no update strategy", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, required: map[string]*structs.TaskGroup{ "foo": { Name: "foo", }}, - expectedCanaryNodes: map[string]int{}, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{}, + expectedCanaryNodeID: nil, }, { - name: "one task group with 33% canary deployment", - nodes: fourEligibleNodes, + name: "one task group with 33% canary deployment", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, required: map[string]*structs.TaskGroup{ "foo": { Name: "foo", @@ -813,13 +829,17 @@ func Test_computeCanaryNodes(t *testing.T) { }, }, }, + existingDeployment: nil, expectedCanaryNodes: map[string]int{ "foo": 2, // we always round up }, + expectedCanaryNodeID: nil, }, { - name: "one task group with 100% canary deployment, four nodes", - nodes: fourEligibleNodes, + name: "one task group with 100% canary deployment, four nodes", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, required: map[string]*structs.TaskGroup{ "foo": { Name: "foo", @@ -829,13 +849,17 @@ func Test_computeCanaryNodes(t *testing.T) { }, }, }, + existingDeployment: nil, expectedCanaryNodes: map[string]int{ "foo": 4, }, + expectedCanaryNodeID: nil, }, { - name: "one task group with 50% canary deployment, even nodes", - nodes: fourEligibleNodes, + 
name: "one task group with 50% canary deployment, even nodes", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, required: map[string]*structs.TaskGroup{ "foo": { Name: "foo", @@ -845,13 +869,35 @@ func Test_computeCanaryNodes(t *testing.T) { }, }, }, + existingDeployment: nil, expectedCanaryNodes: map[string]int{ "foo": 2, }, + expectedCanaryNodeID: nil, }, { - name: "two task groups: one with 50% canary deploy, second one with 2% canary deploy", + name: "two task groups: one with 50% canary deploy, second one with 2% canary deploy, pre-existing canary alloc", nodes: fiveEligibleNodes, + liveAllocs: map[string][]*structs.Allocation{ + "foo": {mock.Alloc()}, // should be disregarded since it's not one of our nodes + fiveEligibleNodeNames[0]: { + {DeploymentStatus: nil}, + {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: false}}, + {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, TaskGroup: "foo"}, + }, + fiveEligibleNodeNames[1]: { + {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, TaskGroup: "bar"}, + }, + }, + terminalAllocs: structs.TerminalByNodeByName{ + fiveEligibleNodeNames[2]: map[string]*structs.Allocation{ + "foo": { + DeploymentStatus: &structs.AllocDeploymentStatus{ + Canary: true, + }, + }, + }, + }, required: map[string]*structs.TaskGroup{ "foo": { Name: "foo", @@ -868,17 +914,29 @@ func Test_computeCanaryNodes(t *testing.T) { }, }, }, + existingDeployment: structs.NewDeployment(mock.SystemJob(), 100, time.Now().Unix()), expectedCanaryNodes: map[string]int{ "foo": 3, // we always round up "bar": 1, // we always round up }, + expectedCanaryNodeID: map[string]string{ + fiveEligibleNodeNames[0]: "foo", + fiveEligibleNodeNames[1]: "bar", + fiveEligibleNodeNames[2]: "foo", + }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - _, canariesPerTG := computeCanaryNodes(tc.required, tc.nodes) + nr := NewNodeReconciler(tc.existingDeployment) + canaryNodes, canariesPerTG := 
nr.computeCanaryNodes(tc.required, tc.liveAllocs, tc.terminalAllocs, tc.nodes) must.Eq(t, tc.expectedCanaryNodes, canariesPerTG) + if tc.liveAllocs != nil { + for nodeID, tgName := range tc.expectedCanaryNodeID { + must.True(t, canaryNodes[nodeID][tgName]) + } + } }) } } From a0834952408996e92d89997636a24d3295a3b307 Mon Sep 17 00:00:00 2001 From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 5 Sep 2025 16:20:34 +0200 Subject: [PATCH 11/13] system scheduler: correction to Test_computeCanaryNodes (#26707) --- scheduler/reconciler/reconcile_node.go | 2 +- scheduler/reconciler/reconcile_node_test.go | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index 71dd7965c..cd99ac8c9 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -135,7 +135,7 @@ func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOf a *structs.Allocation, tg *structs.TaskGroup, canaryNodes map[string]map[string]bool, nodeID string) ([]*structs.Node, int) { if a.DeploymentStatus == nil || a.DeploymentStatus.Canary == false || - nr.DeploymentCurrent == nil { // TODO: should we add this? 
|| nr.DeploymentCurrent.ID != a.DeploymentID { + nr.DeploymentCurrent == nil { return nodesList, numberOfCanaryNodes } diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index c9906e76a..87e822dca 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -895,6 +895,7 @@ func Test_computeCanaryNodes(t *testing.T) { DeploymentStatus: &structs.AllocDeploymentStatus{ Canary: true, }, + TaskGroup: "foo", }, }, }, @@ -925,6 +926,26 @@ func Test_computeCanaryNodes(t *testing.T) { fiveEligibleNodeNames[2]: "foo", }, }, + { + name: "task group with 100% canary deploy, 1 eligible node", + nodes: map[string]*structs.Node{"foo": mock.Node()}, + liveAllocs: nil, + terminalAllocs: nil, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 100, + MaxParallel: 1, + }, + }, + }, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{ + "foo": 1, + }, + expectedCanaryNodeID: nil, + }, } for _, tc := range testCases { From ce614e6b7a00ad8bc499e3db772ae2aadde88a50 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 5 Sep 2025 10:22:42 -0400 Subject: [PATCH 12/13] scheduler: `upgrade` block testing for system deployments (#26579) This changeset adds system scheduler tests of various permutations of the `update` block. It also fixes a number of bugs discovered in the process. * Don't create deployment for in-flight rollout. If a system job is in the middle of a rollout prior to upgrading to a version of Nomad with system deployments, we'll end up creating a system deployment which might never complete because previously placed allocs will not be tracked. Check to see if we have existing allocs that should belong to the new deployment and prevent a deployment from being created in that case. * Ensure we call `Copy` on `Deployment` to avoid state store corruption. * Don't limit canary counts by `max_parallel`. 
* Never create deployments for `sysbatch` jobs. Ref: https://hashicorp.atlassian.net/browse/NMD-761 --- nomad/state/state_store.go | 4 +- scheduler/reconciler/reconcile_node.go | 18 +- scheduler/scheduler_sysbatch_test.go | 8 +- scheduler/scheduler_system.go | 8 +- scheduler/scheduler_system_test.go | 568 +++++++++++++++++++++++++ 5 files changed, 600 insertions(+), 6 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1b3d158dd..49c44fee1 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -758,8 +758,8 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri } d := raw.(*structs.Deployment) - // If the allocation belongs to a job with the same ID but a different - // create index and we are not getting all the allocations whose Jobs + // If the deployment belongs to a job with the same ID but a different + // create index and we are not getting all the deployments whose Jobs // matches the same Job ID then we skip it if !all && job != nil && d.JobCreateIndex != job.CreateIndex { continue diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index cd99ac8c9..481fb64ed 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -18,6 +18,11 @@ type NodeReconciler struct { DeploymentOld *structs.Deployment DeploymentCurrent *structs.Deployment DeploymentUpdates []*structs.DeploymentStatusUpdate + + // COMPAT(1.14.0): + // compatHasSameVersionAllocs indicates that the reconciler found some + // allocations that were for the version being deployed + compatHasSameVersionAllocs bool } func NewNodeReconciler(deployment *structs.Deployment) *NodeReconciler { @@ -61,6 +66,8 @@ func (nr *NodeReconciler) Compute( // to a list of nodes that canaries should be placed on. 
canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes) + compatHadExistingDeployment := nr.DeploymentCurrent != nil + result := new(NodeReconcileResult) deploymentComplete := true for nodeID, allocs := range nodeAllocs { @@ -71,6 +78,14 @@ func (nr *NodeReconciler) Compute( result.Append(diff) } + // COMPAT(1.14.0) prevent a new deployment from being created in the case + // where we've upgraded the cluster while a legacy rolling deployment was in + // flight, otherwise we won't have HealthAllocs tracking and will never mark + // the deployment as complete + if !compatHadExistingDeployment && nr.compatHasSameVersionAllocs { + nr.DeploymentCurrent = nil + } + nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...) return result @@ -392,6 +407,7 @@ func (nr *NodeReconciler) computeForNode( // Everything is up-to-date IGNORE: + nr.compatHasSameVersionAllocs = true result.Ignore = append(result.Ignore, AllocTuple{ Name: name, TaskGroup: tg, @@ -520,8 +536,8 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGro hadRunning := false for _, alloc := range allocs { if alloc.Job.ID == job.ID && alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex { + nr.compatHasSameVersionAllocs = true hadRunning = true - break } } diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go index d6d69c400..ed386fb9f 100644 --- a/scheduler/scheduler_sysbatch_test.go +++ b/scheduler/scheduler_sysbatch_test.go @@ -91,6 +91,11 @@ func TestSysBatch_JobRegister(t *testing.T) { must.Eq(t, 0, queued, must.Sprint("unexpected queued allocations")) h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // sysbatch jobs never create a deployment + deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + must.Len(t, 0, deployments) } func 
TestSysBatch_JobRegister_AddNode_Running(t *testing.T) { @@ -1839,9 +1844,10 @@ func TestSysBatch_canHandle(t *testing.T) { must.True(t, s.canHandle(structs.EvalTriggerPeriodicJob)) }) } + func createNodes(t *testing.T, h *tests.Harness, n int) []*structs.Node { nodes := make([]*structs.Node, n) - for i := 0; i < n; i++ { + for i := range n { node := mock.Node() nodes[i] = node must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index d8b8a3e1b..27a7700b9 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -154,11 +154,13 @@ func (s *SystemScheduler) process() (bool, error) { } if !s.sysbatch { - // Get any existing deployment s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID) if err != nil { return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err) } + // system deployments may be mutated in the reconciler because the node + // count can change between evaluations + s.deployment = s.deployment.Copy() } // Create a plan @@ -632,7 +634,9 @@ func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.Node if limit := limits[a.Alloc.TaskGroup]; limit > 0 { ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") diff.Place = append(diff.Place, a) - limits[a.Alloc.TaskGroup]-- + if !a.Canary { + limits[a.Alloc.TaskGroup]-- + } } else { limited = true } diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index 908927c49..8369ced42 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -5,6 +5,8 @@ package scheduler import ( "fmt" + "maps" + "slices" "sort" "testing" "time" @@ -19,6 +21,7 @@ import ( "github.com/hashicorp/nomad/scheduler/feasible" "github.com/hashicorp/nomad/scheduler/reconciler" "github.com/hashicorp/nomad/scheduler/tests" + "github.com/shoenig/test" 
"github.com/shoenig/test/must" ) @@ -3390,3 +3393,568 @@ func TestEvictAndPlace(t *testing.T) { } } + +// TestSystemScheduler_UpdateBlock tests various permutations of the update block +func TestSystemScheduler_UpdateBlock(t *testing.T) { + ci.Parallel(t) + + collect := func(planned map[string][]*structs.Allocation) map[string]int { + if len(planned) == 0 { + return nil + } + counts := map[string]int{} + for _, node := range planned { + for _, alloc := range node { + counts[alloc.TaskGroup]++ + } + } + return counts + } + + assertDeploymentState := func(t *testing.T, expectDstates, gotDstates map[string]*structs.DeploymentState) { + t.Helper() + if expectDstates == nil { + return + } + + must.SliceContainsAll(t, + slices.Collect(maps.Keys(expectDstates)), + slices.Collect(maps.Keys(gotDstates)), + must.Sprint("expected matching task groups in deployment state")) + + for tg, expect := range expectDstates { + got := gotDstates[tg] + test.Eq(t, expect.DesiredCanaries, got.DesiredCanaries, + test.Sprintf("DesiredCanaries for %s", tg)) + test.Eq(t, expect.DesiredTotal, got.DesiredTotal, + test.Sprintf("DesiredTotal for %s", tg)) + test.Eq(t, expect.PlacedAllocs, got.PlacedAllocs, + test.Sprintf("PlacedAllocs for %s", tg)) + test.Eq(t, len(expect.PlacedCanaries), len(got.PlacedCanaries), + test.Sprintf("len(PlacedCanaries) for %s", tg)) + } + } + + tg1, tg2 := "tg1", "tg2" + + // note: all test cases assume that if there's an existing dstate that we're + // in the middle of the deployment, otherwise we're starting a new + // deployment (also noted in name of subtest) + testCases := []struct { + name string + tg1UpdateBlock *structs.UpdateStrategy + tg2UpdateBlock *structs.UpdateStrategy + existingCurrentDState map[string]*structs.DeploymentState + existingOldDState map[string]*structs.DeploymentState + + // these maps signify which nodes have allocs already, mapping + // group -> indexes in the `nodes` slice + existingPrevious map[string][]int // previous version of job 
+ existingRunning map[string][]int // current version of job (running) + existingFailed map[string][]int // current version of job (failed) + existingCanary map[string][]int // canaries (must match running or failed) + + expectAllocs map[string]int // plan NodeAllocations group -> count + expectStop map[string]int // plan NodeUpdates group -> count + expectDState map[string]*structs.DeploymentState + }{ + { + name: "legacy upgrade non-deployment", + expectAllocs: map[string]int{tg1: 10, tg2: 10}, + }, + + { + name: "new deployment max_parallel vs no update block", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 10}, + expectStop: map[string]int{tg1: 2, tg2: 10}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2}, + }, + }, + + { + name: "max_parallel mid-deployment", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2, HealthyAllocs: 2}, + tg2: {DesiredTotal: 10, PlacedAllocs: 3, HealthyAllocs: 3}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 3}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 4}, + tg2: {DesiredTotal: 10, PlacedAllocs: 6}, + }, + }, + + { + name: "legacy upgrade max_parallel mid-rollout", + tg1UpdateBlock:
&structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + // expects no new deployment + expectAllocs: map[string]int{tg1: 2, tg2: 3}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + }, + + { + name: "max_parallel underprovisioned from failure", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingFailed: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2, UnhealthyAllocs: 2}, + tg2: {DesiredTotal: 10, PlacedAllocs: 3, UnhealthyAllocs: 3}, + }, + expectAllocs: map[string]int{tg1: 4, tg2: 6}, // includes reschedules + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 6}, // includes reschedules + tg2: {DesiredTotal: 10, PlacedAllocs: 9}, + }, + }, + + { + name: "new deployment max_parallel with old underprovisioned", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4}, + tg2: {0, 1, 2}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 5}, + tg2: {DesiredTotal: 10, PlacedAllocs: 7}, + }, + // note: we immediately place on empty nodes + expectAllocs: map[string]int{tg1: 7, tg2: 10}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 7}, + tg2: {DesiredTotal: 10, 
PlacedAllocs: 10}, + }, + }, + + { + name: "new deployment with canaries", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 3, tg2: 5}, + expectStop: map[string]int{tg1: 3, tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"0", "1", "2"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5}, + }, + }, + + { + name: "canaries failed", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + tg2: {0, 1, 2, 3, 4}, + }, + existingRunning: map[string][]int{ + tg1: {7}, + tg2: {5, 6, 7, 8, 9}, + }, + existingFailed: map[string][]int{ + tg1: {8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 1, + UnhealthyAllocs: 2, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 5}, // only 2 replacements + expectStop: map[string]int{tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 5, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries partial 
placement", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 8, 9}, + tg2: {0, 1, 2, 3, 4}, + }, + existingRunning: map[string][]int{ + tg1: {7}, + tg2: {5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 1, + HealthyAllocs: 1, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 5}, + expectStop: map[string]int{tg1: 2, tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries awaiting promotion", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + AutoPromote: false, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: nil, + expectStop: nil, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + 
PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries promoted", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + AutoPromote: true, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: true, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 2}, + expectStop: map[string]int{tg1: 2}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 5, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + h := tests.NewHarness(t) + nodes := createNodes(t, h, 10) + + oldJob := mock.SystemJob() + oldJob.TaskGroups[0].Update = tc.tg1UpdateBlock + oldJob.TaskGroups[0].Name = tg1 + taskGroup2 := oldJob.TaskGroups[0].Copy() + taskGroup2.Update = tc.tg2UpdateBlock + taskGroup2.Name = tg2 + oldJob.TaskGroups = append(oldJob.TaskGroups, taskGroup2) + + must.NoError(t, h.State.UpsertJob( + structs.MsgTypeTestSetup, h.NextIndex(), nil, oldJob)) + + // destructively update both task groups of the job + job := oldJob.Copy() + job.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + job.TaskGroups[1].Tasks[0].Config["command"] = "/bin/other" + idx := h.NextIndex() + job.CreateIndex = idx + job.JobModifyIndex = idx + must.NoError(t, h.State.UpsertJob( + structs.MsgTypeTestSetup, idx, nil, job)) + + expectedDeployments := 0 + if len(tc.existingOldDState) > 0 { 
+ d := mock.Deployment() + d.JobID = job.ID + d.JobVersion = oldJob.Version + d.JobCreateIndex = oldJob.CreateIndex + d.JobModifyIndex = oldJob.JobModifyIndex + h.State.UpsertDeployment(h.NextIndex(), d) + expectedDeployments++ + } + + dID := uuid.Generate() + + existAllocs := []*structs.Allocation{} + for _, tg := range []string{tg1, tg2} { + nodesToAllocs := map[string]string{} + for _, nodeIdx := range tc.existingPrevious[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = oldJob + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusRunning + nodesToAllocs[alloc.NodeID] = alloc.ID + existAllocs = append(existAllocs, alloc) + } + for _, nodeIdx := range tc.existingRunning[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = job + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusRunning + nodesToAllocs[alloc.NodeID] = alloc.ID + + for _, canaryNodeIdx := range tc.existingCanary[tg] { + if nodeIdx == canaryNodeIdx { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + Timestamp: time.Time{}, + Canary: true, + ModifyIndex: 0, + } + alloc.DeploymentID = dID + } + } + existAllocs = append(existAllocs, alloc) + } + for _, nodeIdx := range tc.existingFailed[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = job + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusFailed + nodesToAllocs[alloc.NodeID] = alloc.ID + + for _, canaryNodeIdx := range tc.existingCanary[tg] { + if nodeIdx == canaryNodeIdx { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(false), + Timestamp: time.Time{}, + Canary: true, + ModifyIndex: 0, + } + alloc.DeploymentID = dID + } + } + existAllocs = append(existAllocs, alloc) + } + for 
i, nodeIdx := range tc.existingCanary[tg] { + nodeID := nodes[nodeIdx].ID + // find the correct alloc IDs for the PlaceCanaries + if dstate, ok := tc.existingCurrentDState[tg]; ok { + dstate.PlacedCanaries[i] = nodesToAllocs[nodeID] + } + } + } + + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), + existAllocs)) + + if len(tc.existingCurrentDState) > 0 { + d := mock.Deployment() + d.ID = dID + d.JobID = job.ID + d.JobVersion = job.Version + d.JobCreateIndex = job.CreateIndex + d.JobModifyIndex = job.JobModifyIndex + d.TaskGroups = tc.existingCurrentDState + h.State.UpsertDeployment(h.NextIndex(), d) + } + if len(tc.expectDState) > 0 { + expectedDeployments++ + } + + eval := &structs.Evaluation{ + Namespace: job.Namespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals( + structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Ensure a single plan + must.Len(t, 1, h.Plans) + plan := h.Plans[0] + + stopped := collect(plan.NodeUpdate) + test.Eq(t, tc.expectStop, stopped, test.Sprint("expected stop/evict")) + + // note: includes existing allocs but not previous version allocs + nodeAllocs := collect(plan.NodeAllocation) + test.Eq(t, tc.expectAllocs, nodeAllocs, test.Sprint("expected keep/place")) + + deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + must.Len(t, expectedDeployments, deployments, must.Sprint("expected deployments")) + + if tc.expectDState != nil { + deployment, err := h.State.LatestDeploymentByJobID(nil, job.Namespace, job.ID) + must.NoError(t, err) + assertDeploymentState(t, tc.expectDState, deployment.TaskGroups) + } + }) + } + +} From 3e4d2b731cdc10af153ee65e4ace56fb05e37774 Mon Sep 17 00:00:00 2001 From: 
Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com> Date: Fri, 5 Sep 2025 16:30:25 +0200 Subject: [PATCH 13/13] scheduler: changelog entry for system deployments --- .changelog/26708.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/26708.txt diff --git a/.changelog/26708.txt b/.changelog/26708.txt new file mode 100644 index 000000000..29fd49845 --- /dev/null +++ b/.changelog/26708.txt @@ -0,0 +1,3 @@ +```release-note:feature +scheduler: Enable deployments for system jobs +```