scheduler: handle deployment completeness in the node reconciler (#26445)

This PR marks deployments as complete once there are no remaining placements
to be made for a given task group (a short sketch of the idea follows the commit metadata below).
Piotr Kazmierczak
2025-08-06 17:08:52 +02:00
parent c33e30596c
commit 0e6e5ef8d1
2 changed files with 91 additions and 20 deletions
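
In short, `Compute` now derives a per-node completeness flag from `diffSystemAllocsForNode` and ANDs it across all nodes, so the deployment is only considered complete once no node has outstanding placements or migrations. The sketch below illustrates just that fold; `nodeResult` and `completeForNode` are hypothetical simplifications standing in for `NodeReconcileResult` and the new per-node check, not the real scheduler types.

```go
package main

import "fmt"

// nodeResult is a hypothetical stand-in for NodeReconcileResult; the real
// type tracks allocation tuples rather than bare counts.
type nodeResult struct {
	place   int // allocations still to be placed on this node
	migrate int // allocations still to be migrated off this node
}

// completeForNode mirrors the shape of the new per-node check: a task group
// is complete on a node only when nothing remains to place or migrate.
func completeForNode(r nodeResult) bool {
	return r.place+r.migrate == 0
}

func main() {
	nodes := map[string]nodeResult{
		"node1": {place: 0, migrate: 0},
		"node2": {place: 1, migrate: 0}, // one placement still outstanding
	}

	// Compute-style fold: every node must report complete, otherwise the
	// deployment as a whole is not complete.
	deploymentComplete := true
	for _, r := range nodes {
		deploymentComplete = deploymentComplete && completeForNode(r)
	}
	fmt.Println(deploymentComplete) // false: node2 still has a placement
}
```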


@@ -5,7 +5,6 @@ package reconciler
import (
"fmt"
"slices"
"time"
"github.com/hashicorp/nomad/nomad/structs"
@@ -55,13 +54,17 @@ func (nr *NodeReconciler) Compute(
required := materializeSystemTaskGroups(job)
result := new(NodeReconcileResult)
+ deploymentComplete := true
for nodeID, allocs := range nodeAllocs {
- diff := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes,
+ diff, deploymentCompleteForNode := nr.diffSystemAllocsForNode(job, nodeID, eligibleNodes,
notReadyNodes, taintedNodes, required, allocs, terminal,
serverSupportsDisconnectedClients)
+ deploymentComplete = deploymentComplete && deploymentCompleteForNode
result.Append(diff)
}
+ nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...)
return result
}
@@ -84,7 +87,7 @@ func (nr *NodeReconciler) diffSystemAllocsForNode(
liveAllocs []*structs.Allocation, // non-terminal allocations that exist
terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id)
serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic
- ) *NodeReconcileResult {
+ ) (*NodeReconcileResult, bool) {
result := new(NodeReconcileResult)
// cancel deployments that aren't needed anymore
@@ -279,6 +282,10 @@ func (nr *NodeReconciler) diffSystemAllocsForNode(
})
}
+ // as we iterate over required groups, we'll keep track of whether the deployment
+ // is complete or not
+ deploymentComplete := false
// Scan the required groups
for name, tg := range required {
@@ -363,17 +370,12 @@ func (nr *NodeReconciler) diffSystemAllocsForNode(
continue
}
- // allocs that are in the Place, Update and Migrate buckets are the ones we
- // consider for deployment
- allocsForDeployment := []*structs.Allocation{}
- for _, r := range slices.Concat(result.Place, result.Update, result.Migrate) {
- allocsForDeployment = append(allocsForDeployment, r.Alloc)
- }
nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs)
}
+ deploymentComplete = nr.isDeploymentComplete(tg.Name, result)
}
- return result
+ return result, deploymentComplete
}
func (nr *NodeReconciler) createDeployment(job *structs.Job,
@@ -419,6 +421,75 @@ func (nr *NodeReconciler) createDeployment(job *structs.Job,
nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate
}
+ func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult) bool {
+ complete := len(buckets.Place)+len(buckets.Migrate) == 0 // && !requiresCanaries // TODO: additional condition for canaries
+ if !complete || nr.DeploymentCurrent == nil {
+ return false
+ }
+ // ensure everything is healthy
+ if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok {
+ if dstate.HealthyAllocs < max(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
+ (dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
+ complete = false
+ }
+ }
+ return complete
+ }
+ func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate {
+ statusUpdates := []*structs.DeploymentStatusUpdate{}
+ if nr.DeploymentCurrent != nil {
+ // Mark the deployment as complete if possible
+ if deploymentComplete {
+ if job.IsMultiregion() {
+ // the unblocking/successful states come after blocked, so we
+ // need to make sure we don't revert those states
+ if nr.DeploymentCurrent.Status != structs.DeploymentStatusUnblocking &&
+ nr.DeploymentCurrent.Status != structs.DeploymentStatusSuccessful {
+ statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+ DeploymentID: nr.DeploymentCurrent.ID,
+ Status: structs.DeploymentStatusBlocked,
+ StatusDescription: structs.DeploymentStatusDescriptionBlocked,
+ })
+ }
+ } else {
+ statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+ DeploymentID: nr.DeploymentCurrent.ID,
+ Status: structs.DeploymentStatusSuccessful,
+ StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
+ })
+ }
+ }
+ // Mark the deployment as pending since its state is now computed.
+ if nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing {
+ statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{
+ DeploymentID: nr.DeploymentCurrent.ID,
+ Status: structs.DeploymentStatusPending,
+ StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer,
+ })
+ }
+ }
+ // Set the description of a created deployment
+ if d := nr.DeploymentCurrent; d != nil {
+ if d.RequiresPromotion() {
+ if d.HasAutoPromote() {
+ d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion
+ } else {
+ d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion
+ }
+ }
+ }
+ return statusUpdates
+ }
// materializeSystemTaskGroups is used to materialize all the task groups
// a system or sysbatch job requires.
func materializeSystemTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
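
A note on reading the new `isDeploymentComplete`: even when nothing is left to place or migrate, the deployment is held incomplete until enough allocations are healthy and any requested canaries have been promoted. Here is a small sketch of that health gate alone, under the assumption that only the four fields consulted above matter to it; the local `deploymentState` type is a stand-in for `structs.DeploymentState`, not the real struct, and it uses the `max` builtin (Go 1.21+), as the diff itself does. The second file in the commit, below, updates the package's tests for the new two-value return of `diffSystemAllocsForNode`.

```go
package main

import "fmt"

// deploymentState is a stand-in carrying only the fields the completeness
// check above consults from structs.DeploymentState.
type deploymentState struct {
	HealthyAllocs   int
	DesiredTotal    int
	DesiredCanaries int
	Promoted        bool
}

// healthyAndPromoted mirrors the health gate in isDeploymentComplete:
// enough healthy allocs, and promotion done whenever canaries were requested.
func healthyAndPromoted(d deploymentState) bool {
	if d.HealthyAllocs < max(d.DesiredTotal, d.DesiredCanaries) {
		return false // not enough healthy allocations yet
	}
	if d.DesiredCanaries > 0 && !d.Promoted {
		return false // canaries exist but the deployment was never promoted
	}
	return true
}

func main() {
	// All desired allocs healthy, no canaries: the gate passes.
	fmt.Println(healthyAndPromoted(deploymentState{HealthyAllocs: 3, DesiredTotal: 3})) // true

	// Healthy, but an unpromoted canary holds the deployment open.
	fmt.Println(healthyAndPromoted(deploymentState{
		HealthyAllocs: 3, DesiredTotal: 3, DesiredCanaries: 1,
	})) // false
}
```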


@@ -73,7 +73,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
}
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
+ diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff)
if len(diff.Ignore) > 0 {
@@ -96,7 +96,7 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) {
}
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
+ diff, _ := nr.diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true)
assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff)
})
@@ -158,7 +158,7 @@ func TestDiffSystemAllocsForNode_Placements(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(
+ diff, _ := nr.diffSystemAllocsForNode(
job, tc.nodeID, eligible, nil,
tainted, required, allocsForNode, terminal, true)
@@ -217,7 +217,7 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) {
terminal := structs.TerminalByNodeByName{}
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(
+ diff, _ := nr.diffSystemAllocsForNode(
job, node.ID, eligible, nil, tainted, required, allocs, terminal, true)
assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, update: 1}, diff)
@@ -287,7 +287,7 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) {
}
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(
+ diff, _ := nr.diffSystemAllocsForNode(
job, tc.nodeID, eligible, ineligible, tainted,
required, []*structs.Allocation{alloc}, terminal, true,
)
@@ -344,7 +344,7 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) {
}
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(
+ diff, _ := nr.diffSystemAllocsForNode(
job, drainNode.ID, map[string]*structs.Node{}, nil,
tainted, required, allocs, terminal, true)
@@ -396,7 +396,7 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) {
}
nr := NewNodeReconciler(nil)
- diff := nr.diffSystemAllocsForNode(
+ diff, _ := nr.diffSystemAllocsForNode(
job, deadNode.ID, map[string]*structs.Node{}, nil,
tainted, required, allocs, terminal, true)
@@ -522,7 +522,7 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) {
}
nr := NewNodeReconciler(nil)
- got := nr.diffSystemAllocsForNode(
+ got, _ := nr.diffSystemAllocsForNode(
job, tc.node.ID, eligibleNodes, nil, taintedNodes,
required, []*structs.Allocation{alloc}, terminal, true,
)
@@ -715,8 +715,8 @@ func TestNodeDeployments(t *testing.T) {
},
},
false,
"",
"",
structs.DeploymentStatusSuccessful,
structs.DeploymentStatusDescriptionSuccessful,
},
{
"existing running deployment for a stopped job should be cancelled",