From 0e6e5ef8d18c0a6a63af226e940017529b39a70b Mon Sep 17 00:00:00 2001
From: Piotr Kazmierczak <470696+pkazmierczak@users.noreply.github.com>
Date: Wed, 6 Aug 2025 17:08:52 +0200
Subject: [PATCH] scheduler: handle deployment completeness in the node
 reconciler (#26445)

This PR teaches the node reconciler to mark a deployment as complete
when there are no remaining placements to be made for a given task
group.
---
 scheduler/reconciler/reconcile_node.go      | 91 ++++++++++++++++++---
 scheduler/reconciler/reconcile_node_test.go | 20 ++---
 2 files changed, 91 insertions(+), 20 deletions(-)

diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go
index 255895afc..4d0cd6d53 100644
--- a/scheduler/reconciler/reconcile_node.go
+++ b/scheduler/reconciler/reconcile_node.go
@@ -5,7 +5,6 @@ package reconciler
 import (
 	"fmt"
-	"slices"
 	"time"
 
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -55,13 +54,17 @@ func (nr *NodeReconciler) Compute(
 	required := materializeSystemTaskGroups(job)
 
 	result := new(NodeReconcileResult)
+	deploymentComplete := true
 	for nodeID, allocs := range nodeAllocs {
-		diff := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes,
+		diff, deploymentCompleteForNode := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes,
 			notReadyNodes, taintedNodes, required, allocs, terminal,
 			serverSupportsDisconnectedClients)
+		deploymentComplete = deploymentComplete && deploymentCompleteForNode
 		result.Append(diff)
 	}
 
+	nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)
+
 	return result
 }
@@ -84,7 +87,7 @@ func (nr *NodeReconciler) diffSystemAllocsForNode(
 	liveAllocs []*structs.Allocation, // non-terminal allocations that exist
 	terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
 	serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
-) *NodeReconcileResult {
+) (*NodeReconcileResult, bool) {
 	result := new(NodeReconcileResult)
 
 	// cancel deployments that aren't needed anymore
@@ -279,6 +282,10 @@ func (nr *NodeReconciler) diffSystemAllocsForNode(
 		})
 	}
 
+	// as we iterate over required groups, we'll keep track of whether the
+	// deployment is complete or not
+	deploymentComplete := false
+
 	// Scan the required groups
 	for name, tg := range required {
@@ -363,17 +370,12 @@ func (nr *NodeReconciler) diffSystemAllocsForNode(
 			continue
 		}
 
-			// allocs that are in the Place, Update and Migrate buckets are the ones we
-			// consider for deployment
-			allocsForDeployment := []*structs.Allocation{}
-			for _, r := range slices.Concat(result.Place, result.Update, result.Migrate) {
-				allocsForDeployment = append(allocsForDeployment, r.Alloc)
-			}
 			nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs)
 		}
+		deploymentComplete = nr.isDeploymentComplete(tg.Name, result)
 	}
 
-	return result
+	return result, deploymentComplete
 }
 
 func (nr *NodeReconciler) createDeployment(job *structs.Job,
@@ -419,6 +421,75 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job,
 	nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate
 }
 
+func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult) bool {
+	complete := len(buckets.Place)+len(buckets.Migrate) == 0 // && !requiresCanaries // TODO: additional condition for canaries
+
+	if !complete || nr.DeploymentCurrent == nil {
+		return false
+	}
+
+	// ensure everything is healthy
+	if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok {
+		if dstate.HealthyAllocs < max(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
+			(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
+			complete = false
+		}
+	}
+
+	return complete
+}
+
+func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
+	statusUpdates := []*structs.DeploymentStatusUpdate{}
+
+	if nr.DeploymentCurrent != nil {
+
+		// Mark the deployment as complete if possible
+		if deploymentComplete {
+			if job.IsMultiregion() {
+				// the unblocking/successful states come after blocked, so we
+				// need to make sure we don't revert those states
+				if nr.DeploymentCurrent.Status != structs.DeploymentStatusUnblocking &&
+					nr.DeploymentCurrent.Status != structs.DeploymentStatusSuccessful {
+					statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+						DeploymentID:      nr.DeploymentCurrent.ID,
+						Status:            structs.DeploymentStatusBlocked,
+						StatusDescription: structs.DeploymentStatusDescriptionBlocked,
+					})
+				}
+			} else {
+				statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+					DeploymentID:      nr.DeploymentCurrent.ID,
+					Status:            structs.DeploymentStatusSuccessful,
+					StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
+				})
+			}
+		}
+
+		// Mark the deployment as pending since its state is now computed.
+		if nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing {
+			statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+				DeploymentID:      nr.DeploymentCurrent.ID,
+				Status:            structs.DeploymentStatusPending,
+				StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
+			})
+		}
+	}
+
+	// Set the description of a created deployment
+	if d := nr.DeploymentCurrent; d != nil {
+		if d.RequiresPromotion() {
+			if d.HasAutoPromote() {
+				d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
+			} else {
+				d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
+			}
+		}
+	}
+
+	return statusUpdates
+}
+
 // materializeSystemTaskGroups is used to materialize all the task groups
 // a system or sysbatch job requires.
 func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go
index 4c17bb219..450d9a6c3 100644
--- a/scheduler/reconciler/reconcile_node_test.go
+++ b/scheduler/reconciler/reconcile_node_test.go
@@ -73,7 +73,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
 		}
 
 		nr := NewNodeReconciler(nil)
-		diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
+		diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
 
 		assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff)
 		if len(diff.Ignore) > 0 {
@@ -96,7 +96,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
 		}
 
 		nr := NewNodeReconciler(nil)
-		diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
+		diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
 
 		assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff)
 	})
@@ -158,7 +158,7 @@ func TestDiffSystemAllocsForNode_Placements(t *testing.T) {
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			nr := NewNodeReconciler(nil)
-			diff := nr.diffSystemAllocsForNode(
+			diff, _ := nr.diffSystemAllocsForNode(
 				job, tc.nodeID, eligible, nil, tainted,
 				required, allocsForNode, terminal, true)
@@ -217,7 +217,7 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) {
 	terminal := structs.TerminalByNodeByName{}
 
 	nr := NewNodeReconciler(nil)
-	diff := nr.diffSystemAllocsForNode(
+	diff, _ := nr.diffSystemAllocsForNode(
 		job, node.ID, eligible, nil, tainted, required, allocs, terminal, true)
 
 	assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, update: 1}, diff)
@@ -287,7 +287,7 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) {
 			}
 
 			nr := NewNodeReconciler(nil)
-			diff := nr.diffSystemAllocsForNode(
+			diff, _ := nr.diffSystemAllocsForNode(
 				job, tc.nodeID, eligible, ineligible, tainted,
 				required, []*structs.Allocation{alloc}, terminal, true,
 			)
@@ -344,7 +344,7 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) {
 	}
 
 	nr := NewNodeReconciler(nil)
-	diff := nr.diffSystemAllocsForNode(
+	diff, _ := nr.diffSystemAllocsForNode(
 		job, drainNode.ID, map[string]*structs.Node{}, nil,
 		tainted, required, allocs, terminal, true)
@@ -396,7 +396,7 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) {
 	}
 
 	nr := NewNodeReconciler(nil)
-	diff := nr.diffSystemAllocsForNode(
+	diff, _ := nr.diffSystemAllocsForNode(
 		job, deadNode.ID, map[string]*structs.Node{}, nil,
 		tainted, required, allocs, terminal, true)
@@ -522,7 +522,7 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) {
 			}
 
 			nr := NewNodeReconciler(nil)
-			got := nr.diffSystemAllocsForNode(
+			got, _ := nr.diffSystemAllocsForNode(
 				job, tc.node.ID, eligibleNodes, nil, taintedNodes,
 				required, []*structs.Allocation{alloc}, terminal, true,
 			)
@@ -715,8 +715,8 @@ func TestNodeDeployments(t *testing.T) {
 				},
 			},
 			false,
-			"",
-			"",
+			structs.DeploymentStatusSuccessful,
+			structs.DeploymentStatusDescriptionSuccessful,
 		},
 		{
 			"existing running deployment for a stopped job should be cancelled",
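
Reviewer note: the per-group completeness rule is easiest to study in
isolation. Below is a minimal, runnable Go sketch of the predicate that
isDeploymentComplete applies (Compute then ANDs the per-node verdicts
into a single job-level verdict). The deploymentState struct and the
place/migrate counters are illustrative stand-ins for
structs.DeploymentState and the reconciler's Place/Migrate buckets, not
the real types:

package main

import "fmt"

// deploymentState is an illustrative stand-in for the handful of
// structs.DeploymentState fields the completeness check reads.
type deploymentState struct {
	HealthyAllocs   int
	DesiredTotal    int
	DesiredCanaries int
	Promoted        bool
}

// groupComplete mirrors the shape of isDeploymentComplete: a task group's
// deployment is complete only when nothing is left to place or migrate,
// enough allocations are healthy, and any canaries have been promoted.
func groupComplete(place, migrate int, dstate deploymentState) bool {
	if place+migrate != 0 {
		return false // outstanding work keeps the deployment running
	}
	if dstate.HealthyAllocs < max(dstate.DesiredTotal, dstate.DesiredCanaries) {
		return false // not enough healthy allocations yet
	}
	if dstate.DesiredCanaries > 0 && !dstate.Promoted {
		return false // canaries exist but were never promoted
	}
	return true
}

func main() {
	// 3 desired, 3 healthy, nothing pending: complete.
	fmt.Println(groupComplete(0, 0, deploymentState{HealthyAllocs: 3, DesiredTotal: 3})) // true
	// One placement still pending: not complete.
	fmt.Println(groupComplete(1, 0, deploymentState{HealthyAllocs: 3, DesiredTotal: 3})) // false
	// Canaries desired but not yet promoted: not complete.
	fmt.Println(groupComplete(0, 0, deploymentState{HealthyAllocs: 2, DesiredTotal: 2, DesiredCanaries: 1})) // false
}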
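The status handling can be read the same way: a complete single-region
deployment becomes successful, a complete multiregion deployment parks
as blocked (unless it has already moved past that state), and a freshly
initialized deployment is bumped to pending. A rough sketch of that
decision table follows; the status strings are illustrative stand-ins
for the structs.DeploymentStatus* constants, not their real values:

package main

import "fmt"

// Illustrative stand-ins for the structs.DeploymentStatus* constants
// referenced by setDeploymentStatusAndUpdates.
const (
	statusInitializing = "initializing"
	statusPending      = "pending"
	statusRunning      = "running"
	statusBlocked      = "blocked"
	statusUnblocking   = "unblocking"
	statusSuccessful   = "successful"
)

// nextStatuses sketches the decision table in setDeploymentStatusAndUpdates:
// which status updates get emitted for the current deployment.
func nextStatuses(current string, complete, multiregion bool) []string {
	var updates []string
	if complete {
		if multiregion {
			// blocked is a resting state for multiregion deployments; never
			// regress one that is already unblocking or successful.
			if current != statusUnblocking && current != statusSuccessful {
				updates = append(updates, statusBlocked)
			}
		} else {
			updates = append(updates, statusSuccessful)
		}
	}
	// a deployment still in its initial state is bumped to pending once
	// its state has been computed.
	if current == statusInitializing {
		updates = append(updates, statusPending)
	}
	return updates
}

func main() {
	fmt.Println(nextStatuses(statusRunning, true, false))       // [successful]
	fmt.Println(nextStatuses(statusRunning, true, true))        // [blocked]
	fmt.Println(nextStatuses(statusInitializing, false, false)) // [pending]
}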