diff --git a/.changelog/26708.txt b/.changelog/26708.txt new file mode 100644 index 000000000..29fd49845 --- /dev/null +++ b/.changelog/26708.txt @@ -0,0 +1,3 @@ +```release-note:feature +scheduler: Enable deployments for system jobs +``` diff --git a/client/allocrunner/health_hook.go b/client/allocrunner/health_hook.go index 0944aa3f1..58ccd27db 100644 --- a/client/allocrunner/health_hook.go +++ b/client/allocrunner/health_hook.go @@ -86,7 +86,9 @@ func newAllocHealthWatcherHook( // Neither deployments nor migrations care about the health of // non-service jobs so never watch their health - if alloc.Job.Type != structs.JobTypeService { + switch alloc.Job.Type { + case structs.JobTypeService, structs.JobTypeSystem: + default: return noopAllocHealthWatcherHook{} } diff --git a/client/allocrunner/health_hook_test.go b/client/allocrunner/health_hook_test.go index ce1ae8c9f..ddcc9955e 100644 --- a/client/allocrunner/health_hook_test.go +++ b/client/allocrunner/health_hook_test.go @@ -19,6 +19,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test/must" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -396,26 +397,28 @@ func TestHealthHook_SetHealth_unhealthy(t *testing.T) { require.NoError(h.Postrun()) } -// TestHealthHook_SystemNoop asserts that system jobs return the noop tracker. -func TestHealthHook_SystemNoop(t *testing.T) { +// TestHealthHook_System asserts that system jobs trigger hooks just like service jobs. +func TestHealthHook_System(t *testing.T) { ci.Parallel(t) alloc := mock.SystemAlloc() h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), nil, nil, nil, nil) - // Assert that it's the noop impl _, ok := h.(noopAllocHealthWatcherHook) - require.True(t, ok) + must.False(t, ok) - // Assert the noop impl does not implement any hooks + _, ok = h.(*allocHealthWatcherHook) + must.True(t, ok) + + // Assert the other hooks are implemented, too _, ok = h.(interfaces.RunnerPrerunHook) - require.False(t, ok) + must.True(t, ok) _, ok = h.(interfaces.RunnerUpdateHook) - require.False(t, ok) + must.True(t, ok) _, ok = h.(interfaces.RunnerPostrunHook) - require.False(t, ok) + must.True(t, ok) _, ok = h.(interfaces.ShutdownHook) - require.False(t, ok) + must.True(t, ok) } // TestHealthHook_BatchNoop asserts that batch jobs return the noop tracker. diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1b3d158dd..49c44fee1 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -758,8 +758,8 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID stri } d := raw.(*structs.Deployment) - // If the allocation belongs to a job with the same ID but a different - // create index and we are not getting all the allocations whose Jobs + // If the deployment belongs to a job with the same ID but a different + // create index and we are not getting all the deployments whose Jobs // matches the same Job ID then we skip it if !all && job != nil && d.JobCreateIndex != job.CreateIndex { continue diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0ef49b081..04b7e2887 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -5322,6 +5322,9 @@ var ( type UpdateStrategy struct { // Stagger is used to determine the rate at which allocations are migrated // due to down or draining nodes. 
+ // + // Deprecated: as of Nomad 1.11, this field is equivalent to MinHealthyTime + // and will be removed in future releases. Stagger time.Duration // MaxParallel is how many updates can be done in parallel diff --git a/scheduler/reconciler/deployments.go b/scheduler/reconciler/deployments.go new file mode 100644 index 000000000..a43fbbedb --- /dev/null +++ b/scheduler/reconciler/deployments.go @@ -0,0 +1,57 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 + +package reconciler + +import "github.com/hashicorp/nomad/nomad/structs" + +// cancelUnneededDeployments cancels any deployment that is not needed. +// A deployment update will be staged for jobs that should stop or have the +// wrong version. Unneeded deployments include: +// 1. Jobs that are marked for stop, but there is a non-terminal deployment. +// 2. Deployments that are active, but referencing a different job version. +// 3. Deployments that are already successful. +// +// returns: old deployment, current deployment and a slice of deployment status +// updates. +func cancelUnneededDeployments(j *structs.Job, d *structs.Deployment) (*structs.Deployment, *structs.Deployment, []*structs.DeploymentStatusUpdate) { + var updates []*structs.DeploymentStatusUpdate + + // If the job is stopped and there is a non-terminal deployment, cancel it + if j.Stopped() { + if d != nil && d.Active() { + updates = append(updates, &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, + }) + } + + // Nothing else to do + return d, nil, updates + } + + if d == nil { + return nil, nil, nil + } + + // Check if the deployment is active and referencing an older job and cancel it + if d.JobCreateIndex != j.CreateIndex || d.JobVersion != j.Version { + if d.Active() { + updates = append(updates, &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusCancelled, + StatusDescription: structs.DeploymentStatusDescriptionNewerJob, + }) + } + + return d, nil, updates + } + + // Clear it as the current deployment if it is successful + if d.Status == structs.DeploymentStatusSuccessful { + return d, nil, updates + } + + return nil, d, updates +} diff --git a/scheduler/reconciler/reconcile_cluster.go b/scheduler/reconciler/reconcile_cluster.go index b437b3a3d..f3f04fcb9 100644 --- a/scheduler/reconciler/reconcile_cluster.go +++ b/scheduler/reconciler/reconcile_cluster.go @@ -311,57 +311,6 @@ func (a *AllocReconciler) Compute() *ReconcileResults { return result } -// cancelUnneededDeployments cancels any deployment that is not needed. -// A deployment update will be staged for jobs that should stop or have the -// wrong version. Unneeded deployments include: -// 1. Jobs that are marked for stop, but there is a non-terminal deployment. -// 2. Deployments that are active, but referencing a different job version. -// 3. Deployments that are already successful. -// -// returns: old deployment, current deployment and a slice of deployment status -// updates. 
-func cancelUnneededDeployments(j *structs.Job, d *structs.Deployment) (*structs.Deployment, *structs.Deployment, []*structs.DeploymentStatusUpdate) { - var updates []*structs.DeploymentStatusUpdate - - // If the job is stopped and there is a non-terminal deployment, cancel it - if j.Stopped() { - if d != nil && d.Active() { - updates = append(updates, &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionStoppedJob, - }) - } - - // Nothing else to do - return d, nil, updates - } - - if d == nil { - return nil, nil, nil - } - - // Check if the deployment is active and referencing an older job and cancel it - if d.JobCreateIndex != j.CreateIndex || d.JobVersion != j.Version { - if d.Active() { - updates = append(updates, &structs.DeploymentStatusUpdate{ - DeploymentID: d.ID, - Status: structs.DeploymentStatusCancelled, - StatusDescription: structs.DeploymentStatusDescriptionNewerJob, - }) - } - - return d, nil, updates - } - - // Clear it as the current deployment if it is successful - if d.Status == structs.DeploymentStatusSuccessful { - return d, nil, updates - } - - return nil, d, updates -} - // handleStop marks all allocations to be stopped, handling the lost case. // Returns result structure with desired changes field set to stopped // allocations and an array of stopped allocations. It mutates the Stop fields diff --git a/scheduler/reconciler/reconcile_node.go b/scheduler/reconciler/reconcile_node.go index d04895f32..481fb64ed 100644 --- a/scheduler/reconciler/reconcile_node.go +++ b/scheduler/reconciler/reconcile_node.go @@ -5,15 +5,36 @@ package reconciler import ( "fmt" + "maps" + "math" + "slices" "time" "github.com/hashicorp/nomad/nomad/structs" sstructs "github.com/hashicorp/nomad/scheduler/structs" ) -// Node is like diffSystemAllocsForNode however, the allocations in the +type NodeReconciler struct { + DeploymentOld *structs.Deployment + DeploymentCurrent *structs.Deployment + DeploymentUpdates []*structs.DeploymentStatusUpdate + + // COMPAT(1.14.0): + // compatHasSameVersionAllocs indicates that the reconciler found some + // allocations that were for the version being deployed + compatHasSameVersionAllocs bool +} + +func NewNodeReconciler(deployment *structs.Deployment) *NodeReconciler { + return &NodeReconciler{ + DeploymentCurrent: deployment, + DeploymentUpdates: make([]*structs.DeploymentStatusUpdate, 0), + } +} + +// Compute is like diffSystemAllocsForNode however, the allocations in the // diffResult contain the specific nodeID they should be allocated on. -func Node( +func (nr *NodeReconciler) Compute( job *structs.Job, // jobs whose allocations are going to be diff-ed readyNodes []*structs.Node, // list of nodes in the ready state notReadyNodes map[string]struct{}, // list of nodes in DC but not ready, e.g. draining @@ -40,42 +61,177 @@ func Node( // Create the required task groups. required := materializeSystemTaskGroups(job) + // Canary deployments deploy to the TaskGroup.UpdateStrategy.Canary + // percentage of eligible nodes, so we create a mapping of task group name + // to a list of nodes that canaries should be placed on. 
+ canaryNodes, canariesPerTG := nr.computeCanaryNodes(required, nodeAllocs, terminal, eligibleNodes) + + compatHadExistingDeployment := nr.DeploymentCurrent != nil + result := new(NodeReconcileResult) + deploymentComplete := true for nodeID, allocs := range nodeAllocs { - diff := diffSystemAllocsForNode(job, nodeID, eligibleNodes, notReadyNodes, taintedNodes, required, allocs, terminal, serverSupportsDisconnectedClients) + diff, deploymentCompleteForNode := nr.computeForNode(job, nodeID, eligibleNodes, + notReadyNodes, taintedNodes, canaryNodes[nodeID], canariesPerTG, required, + allocs, terminal, serverSupportsDisconnectedClients) + deploymentComplete = deploymentComplete && deploymentCompleteForNode result.Append(diff) } + // COMPAT(1.14.0) prevent a new deployment from being created in the case + // where we've upgraded the cluster while a legacy rolling deployment was in + // flight, otherwise we won't have HealthAllocs tracking and will never mark + // the deployment as complete + if !compatHadExistingDeployment && nr.compatHasSameVersionAllocs { + nr.DeploymentCurrent = nil + } + + nr.DeploymentUpdates = append(nr.DeploymentUpdates, nr.setDeploymentStatusAndUpdates(deploymentComplete, job)...) + return result } -// diffSystemAllocsForNode is used to do a set difference between the target allocations -// and the existing allocations for a particular node. This returns 8 sets of results, -// the list of named task groups that need to be placed (no existing allocation), the -// allocations that need to be updated (job definition is newer), allocs that -// need to be migrated (node is draining), the allocs that need to be evicted -// (no longer required), those that should be ignored, those that are lost -// that need to be replaced (running on a lost node), those that are running on -// a disconnected node but may resume, and those that may still be running on -// a node that has resumed reconnected. -func diffSystemAllocsForNode( +// computeCanaryNodes is a helper function that, given required task groups, +// mappings of nodes to their live allocs and terminal allocs, and a map of +// eligible nodes, outputs a map[nodeID] -> map[TG] -> bool which indicates +// which TGs this node is a canary for, and a map[TG] -> int to indicate how +// many total canaries are to be placed for a TG. +func (nr *NodeReconciler) computeCanaryNodes(required map[string]*structs.TaskGroup, + liveAllocs map[string][]*structs.Allocation, terminalAllocs structs.TerminalByNodeByName, + eligibleNodes map[string]*structs.Node) (map[string]map[string]bool, map[string]int) { + + canaryNodes := map[string]map[string]bool{} + eligibleNodesList := slices.Collect(maps.Values(eligibleNodes)) + canariesPerTG := map[string]int{} + + for _, tg := range required { + if tg.Update.IsEmpty() || tg.Update.Canary == 0 { + continue + } + + // round up to the nearest integer + numberOfCanaryNodes := int(math.Ceil(float64(tg.Update.Canary) * float64(len(eligibleNodes)) / 100)) + canariesPerTG[tg.Name] = numberOfCanaryNodes + + // check if there are any live allocations on any nodes that are/were + // canaries. 
+ for nodeID, allocs := range liveAllocs { + for _, a := range allocs { + eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes( + eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID) + } + } + + // check if there are any terminal allocations that were canaries + for nodeID, terminalAlloc := range terminalAllocs { + for _, a := range terminalAlloc { + eligibleNodesList, numberOfCanaryNodes = nr.findOldCanaryNodes( + eligibleNodesList, numberOfCanaryNodes, a, tg, canaryNodes, nodeID) + } + } + + for i, n := range eligibleNodesList { + if i > numberOfCanaryNodes-1 { + break + } + + if _, ok := canaryNodes[n.ID]; !ok { + canaryNodes[n.ID] = map[string]bool{} + } + + canaryNodes[n.ID][tg.Name] = true + } + } + + return canaryNodes, canariesPerTG +} + +func (nr *NodeReconciler) findOldCanaryNodes(nodesList []*structs.Node, numberOfCanaryNodes int, + a *structs.Allocation, tg *structs.TaskGroup, canaryNodes map[string]map[string]bool, nodeID string) ([]*structs.Node, int) { + + if a.DeploymentStatus == nil || a.DeploymentStatus.Canary == false || + nr.DeploymentCurrent == nil { + return nodesList, numberOfCanaryNodes + } + + nodes := nodesList + numberOfCanaries := numberOfCanaryNodes + if a.TaskGroup == tg.Name { + if _, ok := canaryNodes[nodeID]; !ok { + canaryNodes[nodeID] = map[string]bool{} + } + canaryNodes[nodeID][tg.Name] = true + + // this node should no longer be considered when searching + // for canary nodes + numberOfCanaries -= 1 + nodes = slices.DeleteFunc( + nodes, + func(n *structs.Node) bool { return n.ID == nodeID }, + ) + } + return nodes, numberOfCanaries +} + +// computeForNode is used to do a set difference between the target +// allocations and the existing allocations for a particular node. This returns +// 8 sets of results: +// 1. the list of named task groups that need to be placed (no existing +// allocation), +// 2. the allocations that need to be updated (job definition is newer), +// 3. allocs that need to be migrated (node is draining), +// 4. allocs that need to be evicted (no longer required), +// 5. those that should be ignored, +// 6. those that are lost that need to be replaced (running on a lost node), +// 7. those that are running on a disconnected node but may resume, and +// 8. those that may still be running on a node that has resumed and reconnected. +// +// This method mutates the NodeReconciler fields, and returns a new +// NodeReconcileResult object and a boolean to indicate whether the deployment +// is complete or not. +func (nr *NodeReconciler) computeForNode( job *structs.Job, // job whose allocs are going to be diff-ed nodeID string, eligibleNodes map[string]*structs.Node, notReadyNodes map[string]struct{}, // nodes that are not ready, e.g. 
draining taintedNodes map[string]*structs.Node, // nodes which are down (by node id) + canaryNode map[string]bool, // indicates whether this node is a canary node for tg + canariesPerTG map[string]int, // indicates how many canary placements we expect per tg required map[string]*structs.TaskGroup, // set of allocations that must exist liveAllocs []*structs.Allocation, // non-terminal allocations that exist terminal structs.TerminalByNodeByName, // latest terminal allocations (by node, id) serverSupportsDisconnectedClients bool, // flag indicating whether to apply disconnected client logic -) *NodeReconcileResult { +) (*NodeReconcileResult, bool) { result := new(NodeReconcileResult) + // cancel deployments that aren't needed anymore + var deploymentUpdates []*structs.DeploymentStatusUpdate + nr.DeploymentOld, nr.DeploymentCurrent, deploymentUpdates = cancelUnneededDeployments(job, nr.DeploymentCurrent) + nr.DeploymentUpdates = append(nr.DeploymentUpdates, deploymentUpdates...) + + // set deployment paused and failed, if we currently have a deployment + var deploymentPaused, deploymentFailed bool + if nr.DeploymentCurrent != nil { + // deployment is paused when it's manually paused by a user, or if the + // deployment is pending or initializing, which are the initial states + // for multi-region job deployments. + deploymentPaused = nr.DeploymentCurrent.Status == structs.DeploymentStatusPaused || + nr.DeploymentCurrent.Status == structs.DeploymentStatusPending || + nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing + deploymentFailed = nr.DeploymentCurrent.Status == structs.DeploymentStatusFailed + } + + // Track desired total and desired canaries across all loops + desiredCanaries := map[string]int{} + + // Track whether we're during a canary update + isCanarying := map[string]bool{} + // Scan the existing updates existing := make(map[string]struct{}) // set of alloc names - for _, exist := range liveAllocs { + for _, alloc := range liveAllocs { // Index the existing node - name := exist.Name + name := alloc.Name existing[name] = struct{}{} // Check for the definition in the required set @@ -86,12 +242,19 @@ func diffSystemAllocsForNode( result.Stop = append(result.Stop, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } - supportsDisconnectedClients := exist.SupportsDisconnectedClients(serverSupportsDisconnectedClients) + // populate deployment state for this task group if there is an existing + // deployment + var dstate = new(structs.DeploymentState) + if nr.DeploymentCurrent != nil { + dstate, _ = nr.DeploymentCurrent.TaskGroups[tg.Name] + } + + supportsDisconnectedClients := alloc.SupportsDisconnectedClients(serverSupportsDisconnectedClients) reconnect := false expired := false @@ -99,20 +262,20 @@ func diffSystemAllocsForNode( // Only compute reconnect for unknown and running since they need to go // through the reconnect process. 
if supportsDisconnectedClients && - (exist.ClientStatus == structs.AllocClientStatusUnknown || - exist.ClientStatus == structs.AllocClientStatusRunning) { - reconnect = exist.NeedsToReconnect() + (alloc.ClientStatus == structs.AllocClientStatusUnknown || + alloc.ClientStatus == structs.AllocClientStatusRunning) { + reconnect = alloc.NeedsToReconnect() if reconnect { - expired = exist.Expired(time.Now()) + expired = alloc.Expired(time.Now()) } } // If we have been marked for migration and aren't terminal, migrate - if exist.DesiredTransition.ShouldMigrate() { + if alloc.DesiredTransition.ShouldMigrate() { result.Migrate = append(result.Migrate, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } @@ -122,25 +285,25 @@ func diffSystemAllocsForNode( result.Lost = append(result.Lost, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } // Ignore unknown allocs that we want to reconnect eventually. if supportsDisconnectedClients && - exist.ClientStatus == structs.AllocClientStatusUnknown && - exist.DesiredStatus == structs.AllocDesiredStatusRun { + alloc.ClientStatus == structs.AllocClientStatusUnknown && + alloc.DesiredStatus == structs.AllocDesiredStatusRun { result.Ignore = append(result.Ignore, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } // note: the node can be both tainted and nil - node, nodeIsTainted := taintedNodes[exist.NodeID] + node, nodeIsTainted := taintedNodes[alloc.NodeID] // Filter allocs on a node that is now re-connected to reconnecting. if supportsDisconnectedClients && @@ -149,8 +312,8 @@ func diffSystemAllocsForNode( // Record the new ClientStatus to indicate to future evals that the // alloc has already reconnected. - reconnecting := exist.Copy() - reconnecting.AppendState(structs.AllocStateFieldClientStatus, exist.ClientStatus) + reconnecting := alloc.Copy() + reconnecting.AppendState(structs.AllocStateFieldClientStatus, alloc.ClientStatus) result.Reconnecting = append(result.Reconnecting, AllocTuple{ Name: name, TaskGroup: tg, @@ -167,7 +330,7 @@ func diffSystemAllocsForNode( // lost as the work was already successfully finished. However for // service/system jobs, tasks should never complete. The check of // batch type, defends against client bugs. 
- if exist.Job.Type == structs.JobTypeSysBatch && exist.RanSuccessfully() { + if alloc.Job.Type == structs.JobTypeSysBatch && alloc.RanSuccessfully() { goto IGNORE } @@ -175,9 +338,9 @@ func diffSystemAllocsForNode( if node != nil && supportsDisconnectedClients && node.Status == structs.NodeStatusDisconnected && - exist.ClientStatus == structs.AllocClientStatusRunning { + alloc.ClientStatus == structs.AllocClientStatusRunning { - disconnect := exist.Copy() + disconnect := alloc.Copy() disconnect.ClientStatus = structs.AllocClientStatusUnknown disconnect.AppendState(structs.AllocStateFieldClientStatus, structs.AllocClientStatusUnknown) disconnect.ClientDescription = sstructs.StatusAllocUnknown @@ -193,7 +356,7 @@ func diffSystemAllocsForNode( result.Lost = append(result.Lost, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) } else { goto IGNORE @@ -214,33 +377,72 @@ func diffSystemAllocsForNode( result.Stop = append(result.Stop, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) continue } // If the definition is updated we need to update - if job.JobModifyIndex != exist.Job.JobModifyIndex { - result.Update = append(result.Update, AllocTuple{ - Name: name, - TaskGroup: tg, - Alloc: exist, - }) + if job.JobModifyIndex != alloc.Job.JobModifyIndex { + if canariesPerTG[tg.Name] > 0 && dstate != nil && !dstate.Promoted { + isCanarying[tg.Name] = true + if canaryNode[tg.Name] { + result.Update = append(result.Update, AllocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + Canary: true, + }) + desiredCanaries[tg.Name] += 1 + } + } else { + result.Update = append(result.Update, AllocTuple{ + Name: name, + TaskGroup: tg, + Alloc: alloc, + }) + } continue } // Everything is up-to-date IGNORE: + nr.compatHasSameVersionAllocs = true result.Ignore = append(result.Ignore, AllocTuple{ Name: name, TaskGroup: tg, - Alloc: exist, + Alloc: alloc, }) } + // as we iterate over require groups, we'll keep track of whether the deployment + // is complete or not + deploymentComplete := false + // Scan the required groups for name, tg := range required { + // populate deployment state for this task group + var dstate = new(structs.DeploymentState) + var existingDeployment bool + if nr.DeploymentCurrent != nil { + dstate, existingDeployment = nr.DeploymentCurrent.TaskGroups[tg.Name] + } + + if !existingDeployment { + dstate = &structs.DeploymentState{} + if !tg.Update.IsEmpty() { + dstate.AutoRevert = tg.Update.AutoRevert + dstate.AutoPromote = tg.Update.AutoPromote + dstate.ProgressDeadline = tg.Update.ProgressDeadline + } + } + + dstate.DesiredTotal = len(eligibleNodes) + if isCanarying[tg.Name] && !dstate.Promoted { + dstate.DesiredCanaries = canariesPerTG[tg.Name] + } + // Check for an existing allocation if _, ok := existing[name]; !ok { @@ -297,8 +499,139 @@ func diffSystemAllocsForNode( result.Place = append(result.Place, allocTuple) } + + // check if deployment is place ready or complete + deploymentPlaceReady := !deploymentPaused && !deploymentFailed + deploymentComplete = nr.isDeploymentComplete(tg.Name, result, isCanarying[tg.Name]) + + // in this case there's nothing to do + if existingDeployment || tg.Update.IsEmpty() || (dstate.DesiredTotal == 0 && dstate.DesiredCanaries == 0) || !deploymentPlaceReady { + continue + } + + maxParallel := 1 + if !tg.Update.IsEmpty() { + maxParallel = tg.Update.MaxParallel + } + + // maxParallel of 0 means no deployments + if maxParallel != 0 { + nr.createDeployment(job, tg, dstate, len(result.Update), liveAllocs) + } } 
- return result + + return result, deploymentComplete +} + +func (nr *NodeReconciler) createDeployment(job *structs.Job, tg *structs.TaskGroup, + dstate *structs.DeploymentState, updates int, allocs []*structs.Allocation) { + + // programming error + if dstate == nil { + return + } + + updatingSpec := updates != 0 + + hadRunning := false + for _, alloc := range allocs { + if alloc.Job.ID == job.ID && alloc.Job.Version == job.Version && alloc.Job.CreateIndex == job.CreateIndex { + nr.compatHasSameVersionAllocs = true + hadRunning = true + } + } + + // Don't create a deployment if it's not the first time running the job + // and there are no updates to the spec. + if hadRunning && !updatingSpec { + return + } + + // A previous group may have made the deployment already. If not create one. + if nr.DeploymentCurrent == nil { + nr.DeploymentCurrent = structs.NewDeployment(job, job.Priority, time.Now().UnixNano()) + nr.DeploymentUpdates = append( + nr.DeploymentUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusRunning, + StatusDescription: structs.DeploymentStatusDescriptionRunning, + }) + } + + // Attach the groups deployment state to the deployment + if nr.DeploymentCurrent.TaskGroups == nil { + nr.DeploymentCurrent.TaskGroups = make(map[string]*structs.DeploymentState) + } + + nr.DeploymentCurrent.TaskGroups[tg.Name] = dstate +} + +func (nr *NodeReconciler) isDeploymentComplete(groupName string, buckets *NodeReconcileResult, isCanarying bool) bool { + complete := len(buckets.Place)+len(buckets.Migrate)+len(buckets.Update) == 0 + + if !complete || nr.DeploymentCurrent == nil || isCanarying { + return false + } + + // ensure everything is healthy + if dstate, ok := nr.DeploymentCurrent.TaskGroups[groupName]; ok { + if dstate.HealthyAllocs < dstate.DesiredTotal { // Make sure we have enough healthy allocs + complete = false + } + } + + return complete +} + +func (nr *NodeReconciler) setDeploymentStatusAndUpdates(deploymentComplete bool, job *structs.Job) []*structs.DeploymentStatusUpdate { + statusUpdates := []*structs.DeploymentStatusUpdate{} + + if nr.DeploymentCurrent != nil { + + // Mark the deployment as complete if possible + if deploymentComplete { + if job.IsMultiregion() { + // the unblocking/successful states come after blocked, so we + // need to make sure we don't revert those states + if nr.DeploymentCurrent.Status != structs.DeploymentStatusUnblocking && + nr.DeploymentCurrent.Status != structs.DeploymentStatusSuccessful { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusBlocked, + StatusDescription: structs.DeploymentStatusDescriptionBlocked, + }) + } + } else { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + }) + } + } + + // Mark the deployment as pending since its state is now computed. 
+ if nr.DeploymentCurrent.Status == structs.DeploymentStatusInitializing { + statusUpdates = append(statusUpdates, &structs.DeploymentStatusUpdate{ + DeploymentID: nr.DeploymentCurrent.ID, + Status: structs.DeploymentStatusPending, + StatusDescription: structs.DeploymentStatusDescriptionPendingForPeer, + }) + } + } + + // Set the description of a created deployment + if d := nr.DeploymentCurrent; d != nil { + if d.RequiresPromotion() { + if d.HasAutoPromote() { + d.StatusDescription = structs.DeploymentStatusDescriptionRunningAutoPromotion + } else { + d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion + } + } + } + + return statusUpdates } // materializeSystemTaskGroups is used to materialize all the task groups @@ -323,6 +656,7 @@ type AllocTuple struct { Name string TaskGroup *structs.TaskGroup Alloc *structs.Allocation + Canary bool } // NodeReconcileResult is used to return the sets that result from the diff diff --git a/scheduler/reconciler/reconcile_node_prop_test.go b/scheduler/reconciler/reconcile_node_prop_test.go index 23d91a504..ca94e17db 100644 --- a/scheduler/reconciler/reconcile_node_prop_test.go +++ b/scheduler/reconciler/reconcile_node_prop_test.go @@ -39,18 +39,16 @@ func TestNodeReconciler_PropTest(t *testing.T) { // NodeReconcileResults doesn't split results by task group, so split // them up so we can check them separately recordResult := func(subresult []AllocTuple, label string) { - if subresult != nil { - for _, alloc := range subresult { - var tgName string - if alloc.TaskGroup != nil { - tgName = alloc.TaskGroup.Name - } else if alloc.Alloc != nil { - tgName = alloc.Alloc.TaskGroup - } else { - t.Fatal("one of task group or alloc must always be non-nil") - } - perTaskGroup[tgName][label]++ + for _, alloc := range subresult { + var tgName string + if alloc.TaskGroup != nil { + tgName = alloc.TaskGroup.Name + } else if alloc.Alloc != nil { + tgName = alloc.Alloc.TaskGroup + } else { + t.Fatal("one of task group or alloc must always be non-nil") } + perTaskGroup[tgName][label]++ } } @@ -101,8 +99,10 @@ func TestNodeReconciler_PropTest(t *testing.T) { t.Run("system jobs", rapid.MakeCheck(func(t *rapid.T) { nr := genNodeReconciler(structs.JobTypeSystem, &idGenerator{}).Draw(t, "input") - results := Node(nr.job, nr.readyNodes, nr.notReadyNodes, - nr.taintedNodes, nr.allocs, nr.terminal, nr.serverSupportsDisconnectedClients) + n := NewNodeReconciler(nr.deployment) + results := n.Compute(nr.job, nr.readyNodes, + nr.notReadyNodes, nr.taintedNodes, nr.allocs, nr.terminal, + nr.serverSupportsDisconnectedClients) must.NotNil(t, results, must.Sprint("results should never be nil")) perTaskGroup := collectExpectedAndResults(nr, results) @@ -111,8 +111,10 @@ func TestNodeReconciler_PropTest(t *testing.T) { t.Run("sysbatch jobs", rapid.MakeCheck(func(t *rapid.T) { nr := genNodeReconciler(structs.JobTypeSysBatch, &idGenerator{}).Draw(t, "input") - results := Node(nr.job, nr.readyNodes, nr.notReadyNodes, - nr.taintedNodes, nr.allocs, nr.terminal, nr.serverSupportsDisconnectedClients) + n := NewNodeReconciler(nr.deployment) + results := n.Compute(nr.job, nr.readyNodes, + nr.notReadyNodes, nr.taintedNodes, nr.allocs, nr.terminal, + nr.serverSupportsDisconnectedClients) must.NotNil(t, results, must.Sprint("results should never be nil")) perTaskGroup := collectExpectedAndResults(nr, results) @@ -128,6 +130,7 @@ type nodeReconcilerInput struct { taintedNodes map[string]*structs.Node allocs []*structs.Allocation terminal structs.TerminalByNodeByName + 
deployment *structs.Deployment serverSupportsDisconnectedClients bool } @@ -190,12 +193,15 @@ func genNodeReconciler(jobType string, idg *idGenerator) *rapid.Generator[*nodeR } } + deployment := genDeployment(idg, job, live).Draw(t, "deployment") + return &nodeReconcilerInput{ job: job, readyNodes: readyNodes, notReadyNodes: notReadyNodes, taintedNodes: taintedNodes, allocs: live, + deployment: deployment, serverSupportsDisconnectedClients: rapid.Bool().Draw(t, "supports_disconnected"), } }) diff --git a/scheduler/reconciler/reconcile_node_test.go b/scheduler/reconciler/reconcile_node_test.go index cda2654a4..87e822dca 100644 --- a/scheduler/reconciler/reconcile_node_test.go +++ b/scheduler/reconciler/reconcile_node_test.go @@ -72,7 +72,8 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode(job, "node1", eligible, nil, tainted, nil, nil, required, live, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, place: 1}, diff) if len(diff.Ignore) > 0 { @@ -94,7 +95,8 @@ func TestDiffSystemAllocsForNode_Sysbatch_terminal(t *testing.T) { }, } - diff := diffSystemAllocsForNode(job, "node1", eligible, nil, tainted, required, live, terminal, true) + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode(job, "node1", eligible, nil, tainted, nil, nil, required, live, terminal, true) assertDiffCount(t, diffResultCount{update: 1, place: 1}, diff) }) @@ -155,9 +157,10 @@ func TestDiffSystemAllocsForNode_Placements(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode( job, tc.nodeID, eligible, nil, - tainted, required, allocsForNode, terminal, true) + tainted, nil, nil, required, allocsForNode, terminal, true) assertDiffCount(t, tc.expected, diff) }) @@ -213,8 +216,9 @@ func TestDiffSystemAllocsForNode_Stops(t *testing.T) { tainted := map[string]*structs.Node{} terminal := structs.TerminalByNodeByName{} - diff := diffSystemAllocsForNode( - job, node.ID, eligible, nil, tainted, required, allocs, terminal, true) + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode( + job, node.ID, eligible, nil, tainted, nil, nil, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{ignore: 1, stop: 1, update: 1}, diff) if len(diff.Update) > 0 { @@ -282,8 +286,9 @@ func TestDiffSystemAllocsForNode_IneligibleNode(t *testing.T) { Job: job, } - diff := diffSystemAllocsForNode( - job, tc.nodeID, eligible, ineligible, tainted, + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode( + job, tc.nodeID, eligible, ineligible, tainted, nil, nil, required, []*structs.Allocation{alloc}, terminal, true, ) assertDiffCount(t, tc.expect, diff) @@ -338,9 +343,10 @@ func TestDiffSystemAllocsForNode_DrainingNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode( job, drainNode.ID, map[string]*structs.Node{}, nil, - tainted, required, allocs, terminal, true) + tainted, nil, nil, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{migrate: 1, ignore: 1}, diff) if len(diff.Migrate) > 0 { @@ -389,9 +395,10 @@ func TestDiffSystemAllocsForNode_LostNode(t *testing.T) { }, } - diff := diffSystemAllocsForNode( + nr := NewNodeReconciler(nil) + diff, _ := nr.computeForNode( job, deadNode.ID, 
map[string]*structs.Node{}, nil, - tainted, required, allocs, terminal, true) + tainted, nil, nil, required, allocs, terminal, true) assertDiffCount(t, diffResultCount{lost: 2}, diff) if len(diff.Migrate) > 0 { @@ -514,8 +521,9 @@ func TestDiffSystemAllocsForNode_DisconnectedNode(t *testing.T) { tc.allocFn(alloc) } - got := diffSystemAllocsForNode( - job, tc.node.ID, eligibleNodes, nil, taintedNodes, + nr := NewNodeReconciler(nil) + got, _ := nr.computeForNode( + job, tc.node.ID, eligibleNodes, nil, taintedNodes, nil, nil, required, []*structs.Allocation{alloc}, terminal, true, ) assertDiffCount(t, tc.expect, got) @@ -602,8 +610,8 @@ func TestDiffSystemAllocs(t *testing.T) { }, } - diff := Node(job, nodes, nil, tainted, allocs, terminal, true) - + nr := NewNodeReconciler(nil) + diff := nr.Compute(job, nodes, nil, tainted, allocs, terminal, true) assertDiffCount(t, diffResultCount{ update: 1, ignore: 1, migrate: 1, lost: 1, place: 6}, diff) @@ -632,3 +640,548 @@ func TestDiffSystemAllocs(t *testing.T) { } } } + +// TestNodeDeployments tests various deployment-related scenarios for the node +// reconciler +func TestNodeDeployments(t *testing.T) { + ci.Parallel(t) + + job := mock.SystemJob() + tg := job.TaskGroups[0].Copy() + tg.Name = "other" + tg.Update = structs.DefaultUpdateStrategy + job.TaskGroups = append(job.TaskGroups, tg) + + // Create two alive nodes. + nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}} + + // Stopped job to make sure we handle these correctly + stoppedJob := job.Copy() + stoppedJob.Stop = true + + allocs := []*structs.Allocation{} + for _, n := range nodes { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + + allocs = append(allocs, a) + } + + newJobWithNoAllocs := job.Copy() + newJobWithNoAllocs.Name = "new-job" + newJobWithNoAllocs.Version = 100 + newJobWithNoAllocs.CreateIndex = 1000 + + testCases := []struct { + name string + job *structs.Job + existingDeployment *structs.Deployment + newDeployment bool + expectedNewDeploymentStatus string + expectedDeploymenStatusUpdateContains string + }{ + { + "existing successful deployment for the current job version should not return a deployment", + job, + &structs.Deployment{ + JobCreateIndex: job.CreateIndex, + JobVersion: job.Version, + Status: structs.DeploymentStatusSuccessful, + }, + false, + "", + "", + }, + { + "existing running deployment should remain untouched", + job, + &structs.Deployment{ + JobID: job.ID, + JobCreateIndex: job.CreateIndex, + JobVersion: job.Version, + Status: structs.DeploymentStatusRunning, + StatusDescription: structs.DeploymentStatusDescriptionRunning, + TaskGroups: map[string]*structs.DeploymentState{ + job.TaskGroups[0].Name: { + AutoRevert: true, + ProgressDeadline: time.Minute, + }, + tg.Name: { + AutoPromote: true, + }, + }, + }, + false, + structs.DeploymentStatusSuccessful, + "", + }, + { + "existing running deployment for a stopped job should be cancelled", + stoppedJob, + &structs.Deployment{ + JobCreateIndex: job.CreateIndex, + JobVersion: job.Version, + Status: structs.DeploymentStatusRunning, + }, + false, + structs.DeploymentStatusCancelled, + structs.DeploymentStatusDescriptionStoppedJob, + }, + { + "no existing deployment for a new job that needs one should result in a new deployment", + newJobWithNoAllocs, + nil, + true, + structs.DeploymentStatusRunning, + structs.DeploymentStatusDescriptionRunning, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nr := 
NewNodeReconciler(tc.existingDeployment) + nr.Compute(tc.job, nodes, nil, nil, allocs, nil, true) + if tc.newDeployment { + must.NotNil(t, nr.DeploymentCurrent, must.Sprintf("expected a non-nil deployment")) + must.Eq(t, nr.DeploymentCurrent.Status, tc.expectedNewDeploymentStatus) + } + if tc.expectedDeploymenStatusUpdateContains != "" { + must.SliceContainsFunc(t, nr.DeploymentUpdates, tc.expectedDeploymenStatusUpdateContains, + func(a *structs.DeploymentStatusUpdate, status string) bool { + return a.StatusDescription == status + }, + ) + } + }) + } +} + +func Test_computeCanaryNodes(t *testing.T) { + ci.Parallel(t) + + // generate an odd number of nodes + fiveEligibleNodes := map[string]*structs.Node{} + // name them so we can refer to their names while testing pre-existing + // canary allocs + fiveEligibleNodeNames := []string{"node1", "node2", "node3", "node4", "node5"} + for _, name := range fiveEligibleNodeNames { + node := mock.Node() + node.ID = name + fiveEligibleNodes[name] = node + } + + // generate an even number of nodes + fourEligibleNodes := map[string]*structs.Node{} + for range 4 { + nodeID := uuid.Generate() + node := mock.Node() + node.ID = nodeID + fourEligibleNodes[nodeID] = node + } + + testCases := []struct { + name string + nodes map[string]*structs.Node + liveAllocs map[string][]*structs.Allocation + terminalAllocs structs.TerminalByNodeByName + required map[string]*structs.TaskGroup + existingDeployment *structs.Deployment + expectedCanaryNodes map[string]int // number of nodes per tg + expectedCanaryNodeID map[string]string // sometimes we want to make sure a particular node ID is a canary + }{ + { + name: "no required task groups", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, + required: nil, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{}, + expectedCanaryNodeID: nil, + }, + { + name: "one task group with no update strategy", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + }}, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{}, + expectedCanaryNodeID: nil, + }, + { + name: "one task group with 33% canary deployment", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 33, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{ + "foo": 2, // we always round up + }, + expectedCanaryNodeID: nil, + }, + { + name: "one task group with 100% canary deployment, four nodes", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 100, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{ + "foo": 4, + }, + expectedCanaryNodeID: nil, + }, + { + name: "one task group with 50% canary deployment, even nodes", + nodes: fourEligibleNodes, + liveAllocs: nil, + terminalAllocs: nil, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 50, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{ + "foo": 2, + }, + 
expectedCanaryNodeID: nil, + }, + { + name: "two task groups: one with 50% canary deploy, second one with 2% canary deploy, pre-existing canary alloc", + nodes: fiveEligibleNodes, + liveAllocs: map[string][]*structs.Allocation{ + "foo": {mock.Alloc()}, // should be disregarded since it's not one of our nodes + fiveEligibleNodeNames[0]: { + {DeploymentStatus: nil}, + {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: false}}, + {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, TaskGroup: "foo"}, + }, + fiveEligibleNodeNames[1]: { + {DeploymentStatus: &structs.AllocDeploymentStatus{Canary: true}, TaskGroup: "bar"}, + }, + }, + terminalAllocs: structs.TerminalByNodeByName{ + fiveEligibleNodeNames[2]: map[string]*structs.Allocation{ + "foo": { + DeploymentStatus: &structs.AllocDeploymentStatus{ + Canary: true, + }, + TaskGroup: "foo", + }, + }, + }, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 50, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + "bar": { + Name: "bar", + Update: &structs.UpdateStrategy{ + Canary: 2, + MaxParallel: 1, // otherwise the update strategy will be considered nil + }, + }, + }, + existingDeployment: structs.NewDeployment(mock.SystemJob(), 100, time.Now().Unix()), + expectedCanaryNodes: map[string]int{ + "foo": 3, // we always round up + "bar": 1, // we always round up + }, + expectedCanaryNodeID: map[string]string{ + fiveEligibleNodeNames[0]: "foo", + fiveEligibleNodeNames[1]: "bar", + fiveEligibleNodeNames[2]: "foo", + }, + }, + { + name: "task group with 100% canary deploy, 1 eligible node", + nodes: map[string]*structs.Node{"foo": mock.Node()}, + liveAllocs: nil, + terminalAllocs: nil, + required: map[string]*structs.TaskGroup{ + "foo": { + Name: "foo", + Update: &structs.UpdateStrategy{ + Canary: 100, + MaxParallel: 1, + }, + }, + }, + existingDeployment: nil, + expectedCanaryNodes: map[string]int{ + "foo": 1, + }, + expectedCanaryNodeID: nil, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + nr := NewNodeReconciler(tc.existingDeployment) + canaryNodes, canariesPerTG := nr.computeCanaryNodes(tc.required, tc.liveAllocs, tc.terminalAllocs, tc.nodes) + must.Eq(t, tc.expectedCanaryNodes, canariesPerTG) + if tc.liveAllocs != nil { + for nodeID, tgName := range tc.expectedCanaryNodeID { + must.True(t, canaryNodes[nodeID][tgName]) + } + } + }) + } +} + +// Tests the reconciler creates new canaries when the job changes +func TestNodeReconciler_NewCanaries(t *testing.T) { + ci.Parallel(t) + + job := mock.SystemJob() + job.TaskGroups[0].Update = &structs.UpdateStrategy{ + Canary: 20, // deploy to 20% of eligible nodes + MaxParallel: 1, // otherwise the update strategy will be considered nil + } + job.JobModifyIndex = 1 + + // Create 10 nodes + nodes := []*structs.Node{} + for i := range 10 { + node := mock.Node() + node.ID = fmt.Sprintf("node_%d", i) + node.Name = fmt.Sprintf("node_%d", i) + nodes = append(nodes, node) + } + + allocs := []*structs.Allocation{} + for _, n := range nodes { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + a.TaskGroup = job.TaskGroups[0].Name + + allocs = append(allocs, a) + } + + // bump the job version up + newJobVersion := job.Copy() + newJobVersion.Version = job.Version + 1 + newJobVersion.JobModifyIndex = job.JobModifyIndex + 1 + + // bump the version and add a new TG + newJobWithNewTaskGroup := newJobVersion.Copy() + 
newJobWithNewTaskGroup.Version = newJobVersion.Version + 1 + newJobWithNewTaskGroup.JobModifyIndex = newJobVersion.JobModifyIndex + 1 + tg := newJobVersion.TaskGroups[0].Copy() + tg.Name = "other" + tg.Update = &structs.UpdateStrategy{MaxParallel: 1} + newJobWithNewTaskGroup.TaskGroups = append(newJobWithNewTaskGroup.TaskGroups, tg) + + // new job with no previous allocs and no canary update strategy + jobWithNoUpdates := mock.SystemJob() + jobWithNoUpdates.Name = "i-am-a-brand-new-job" + jobWithNoUpdates.TaskGroups[0].Name = "i-am-a-brand-new-tg" + jobWithNoUpdates.TaskGroups[0].Update = structs.DefaultUpdateStrategy + + // additional test to make sure there are no canaries being placed for v0 + // jobs + freshJob := mock.SystemJob() + freshJob.TaskGroups[0].Update = structs.DefaultUpdateStrategy + freshNodes := []*structs.Node{} + for range 2 { + node := mock.Node() + freshNodes = append(freshNodes, node) + } + + testCases := []struct { + name string + job *structs.Job + nodes []*structs.Node + existingDeployment *structs.Deployment + expectedDesiredCanaries map[string]int + expectedDesiredTotal map[string]int + expectedDeploymentStatusDescription string + expectedPlaceCount int + expectedUpdateCount int + }{ + { + name: "new job version", + job: newJobVersion, + nodes: nodes, + existingDeployment: nil, + expectedDesiredCanaries: map[string]int{newJobVersion.TaskGroups[0].Name: 2}, + expectedDesiredTotal: map[string]int{newJobVersion.TaskGroups[0].Name: 10}, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, + expectedPlaceCount: 0, + expectedUpdateCount: 2, + }, + { + name: "new job version with a new TG (no existing allocs, no canaries)", + job: newJobWithNewTaskGroup, + nodes: nodes, + existingDeployment: nil, + expectedDesiredCanaries: map[string]int{ + newJobWithNewTaskGroup.TaskGroups[0].Name: 2, + newJobWithNewTaskGroup.TaskGroups[1].Name: 0, + }, + expectedDesiredTotal: map[string]int{ + newJobWithNewTaskGroup.TaskGroups[0].Name: 10, + newJobWithNewTaskGroup.TaskGroups[1].Name: 10, + }, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunningNeedsPromotion, + expectedPlaceCount: 10, + expectedUpdateCount: 2, + }, + { + name: "brand new job with no update block", + job: jobWithNoUpdates, + nodes: nodes, + existingDeployment: nil, + expectedDesiredCanaries: map[string]int{ + jobWithNoUpdates.TaskGroups[0].Name: 0, + }, + expectedDesiredTotal: map[string]int{ + jobWithNoUpdates.TaskGroups[0].Name: 10, + }, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunning, + expectedPlaceCount: 10, + expectedUpdateCount: 0, + }, + { + name: "fresh job with no updates, empty nodes", + job: freshJob, + nodes: freshNodes, + existingDeployment: nil, + expectedDesiredCanaries: map[string]int{ + freshJob.TaskGroups[0].Name: 0, + }, + expectedDesiredTotal: map[string]int{ + freshJob.TaskGroups[0].Name: 2, + }, + expectedDeploymentStatusDescription: structs.DeploymentStatusDescriptionRunning, + expectedPlaceCount: 2, + expectedUpdateCount: 0, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + reconciler := NewNodeReconciler(tc.existingDeployment) + r := reconciler.Compute(tc.job, tc.nodes, nil, nil, allocs, nil, false) + must.NotNil(t, reconciler.DeploymentCurrent) + must.Eq(t, tc.expectedPlaceCount, len(r.Place), must.Sprint("incorrect amount of r.Place")) + must.Eq(t, tc.expectedUpdateCount, len(r.Update), must.Sprint("incorrect amount of r.Update")) + must.Eq(t, 
tc.expectedDeploymentStatusDescription, reconciler.DeploymentCurrent.StatusDescription) + for _, tg := range tc.job.TaskGroups { + must.Eq(t, tc.expectedDesiredCanaries[tg.Name], + reconciler.DeploymentCurrent.TaskGroups[tg.Name].DesiredCanaries, + must.Sprintf("incorrect number of DesiredCanaries for %s", tg.Name)) + must.Eq(t, tc.expectedDesiredTotal[tg.Name], + reconciler.DeploymentCurrent.TaskGroups[tg.Name].DesiredTotal, + must.Sprintf("incorrect number of DesiredTotal for %s", tg.Name)) + } + }) + } +} + +// Tests the reconciler correctly promotes canaries +func TestNodeReconciler_CanaryPromotion(t *testing.T) { + ci.Parallel(t) + + job := mock.SystemJob() + job.TaskGroups[0].Update = &structs.UpdateStrategy{ + Canary: 20, // deploy to 20% of eligible nodes + MaxParallel: 1, // otherwise the update strategy will be considered nil + } + job.JobModifyIndex = 1 + + // bump the job version up + newJobVersion := job.Copy() + newJobVersion.Version = job.Version + 1 + newJobVersion.JobModifyIndex = job.JobModifyIndex + 1 + + // Create 5 nodes + nodes := []*structs.Node{} + for i := range 5 { + node := mock.Node() + node.ID = fmt.Sprintf("node_%d", i) + node.Name = fmt.Sprintf("node_%d", i) + nodes = append(nodes, node) + } + + // Create v0 allocs on 2 of the nodes, and v1 (canary) allocs on 3 nodes + allocs := []*structs.Allocation{} + for _, n := range nodes[0:3] { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + a.TaskGroup = job.TaskGroups[0].Name + + allocs = append(allocs, a) + } + for _, n := range nodes[3:] { + a := mock.Alloc() + a.Job = job + a.Name = "my-job.web[0]" + a.NodeID = n.ID + a.NodeName = n.Name + a.TaskGroup = job.TaskGroups[0].Name + a.DeploymentStatus = &structs.AllocDeploymentStatus{Canary: true} + a.Job.Version = newJobVersion.Version + a.Job.JobModifyIndex = newJobVersion.JobModifyIndex + + allocs = append(allocs, a) + } + + // promote canaries + deployment := structs.NewDeployment(newJobVersion, 10, time.Now().Unix()) + deployment.TaskGroups[newJobVersion.TaskGroups[0].Name] = &structs.DeploymentState{ + Promoted: true, + HealthyAllocs: 5, + DesiredTotal: 5, + DesiredCanaries: 0, + } + + // reconcile + reconciler := NewNodeReconciler(deployment) + reconciler.Compute(newJobVersion, nodes, nil, nil, allocs, nil, false) + + must.NotNil(t, reconciler.DeploymentCurrent) + must.Eq(t, 5, reconciler.DeploymentCurrent.TaskGroups[newJobVersion.TaskGroups[0].Name].DesiredTotal) + must.SliceContainsFunc(t, reconciler.DeploymentUpdates, structs.DeploymentStatusSuccessful, + func(a *structs.DeploymentStatusUpdate, b string) bool { return a.Status == b }, + ) +} diff --git a/scheduler/scheduler_sysbatch_test.go b/scheduler/scheduler_sysbatch_test.go index d6d69c400..ed386fb9f 100644 --- a/scheduler/scheduler_sysbatch_test.go +++ b/scheduler/scheduler_sysbatch_test.go @@ -91,6 +91,11 @@ func TestSysBatch_JobRegister(t *testing.T) { must.Eq(t, 0, queued, must.Sprint("unexpected queued allocations")) h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // sysbatch jobs never create a deployment + deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + must.Len(t, 0, deployments) } func TestSysBatch_JobRegister_AddNode_Running(t *testing.T) { @@ -1839,9 +1844,10 @@ func TestSysBatch_canHandle(t *testing.T) { must.True(t, s.canHandle(structs.EvalTriggerPeriodicJob)) }) } + func createNodes(t *testing.T, h *tests.Harness, n int) []*structs.Node { nodes := make([]*structs.Node, n) - 
for i := 0; i < n; i++ { + for i := range n { node := mock.Node() nodes[i] = node must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go index 3fd9cfefa..27a7700b9 100644 --- a/scheduler/scheduler_system.go +++ b/scheduler/scheduler_system.go @@ -49,6 +49,8 @@ type SystemScheduler struct { notReadyNodes map[string]struct{} nodesByDC map[string]int + deployment *structs.Deployment + limitReached bool nextEval *structs.Evaluation @@ -100,7 +102,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, s.planAnnotations, structs.EvalStatusFailed, desc, - s.queuedAllocs, "") + s.queuedAllocs, s.deployment.GetID()) } limit := maxSystemScheduleAttempts @@ -114,7 +116,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { if statusErr, ok := err.(*SetStatusError); ok { return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, s.planAnnotations, statusErr.EvalStatus, err.Error(), - s.queuedAllocs, "") + s.queuedAllocs, s.deployment.GetID()) } return err } @@ -122,7 +124,7 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) (err error) { // Update the status to complete return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, s.failedTGAllocs, s.planAnnotations, structs.EvalStatusComplete, "", - s.queuedAllocs, "") + s.queuedAllocs, s.deployment.GetID()) } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -151,6 +153,16 @@ func (s *SystemScheduler) process() (bool, error) { } } + if !s.sysbatch { + s.deployment, err = s.state.LatestDeploymentByJobID(ws, s.eval.Namespace, s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get deployment for job %q: %w", s.eval.JobID, err) + } + // system deployments may be mutated in the reconciler because the node + // count can change between evaluations + s.deployment = s.deployment.Copy() + } + // Create a plan s.plan = s.eval.MakePlan(s.job) @@ -181,7 +193,7 @@ func (s *SystemScheduler) process() (bool, error) { // If the limit of placements was reached we need to create an evaluation // to pickup from here after the stagger period. if s.limitReached && s.nextEval == nil { - s.nextEval = s.eval.NextRollingEval(s.job.Update.Stagger) + s.nextEval = s.eval.NextRollingEval(s.job.Update.MinHealthyTime) if err := s.planner.CreateEval(s.nextEval); err != nil { s.logger.Error("failed to make next eval for rolling update", "error", err) return false, err @@ -265,12 +277,22 @@ func (s *SystemScheduler) computeJobAllocs() error { live, term := structs.SplitTerminalAllocs(allocs) // Diff the required and existing allocations - r := reconciler.Node(s.job, s.nodes, s.notReadyNodes, tainted, live, term, + nr := reconciler.NewNodeReconciler(s.deployment) + r := nr.Compute(s.job, s.nodes, s.notReadyNodes, tainted, live, term, s.planner.ServersMeetMinimumVersion(minVersionMaxClientDisconnect, true)) if s.logger.IsDebug() { s.logger.Debug("reconciled current state with desired state", r.Fields()...) 
} + // Add the deployment changes to the plan + s.plan.Deployment = nr.DeploymentCurrent + s.plan.DeploymentUpdates = nr.DeploymentUpdates + + // Update the stored deployment + if nr.DeploymentCurrent != nil { + s.deployment = nr.DeploymentCurrent + } + // Add all the allocs to stop for _, e := range r.Stop { s.plan.AppendStoppedAlloc(e.Alloc, sstructs.StatusAllocNotNeeded, "", "") @@ -309,14 +331,9 @@ func (s *SystemScheduler) computeJobAllocs() error { DesiredTGUpdates: desiredUpdates(r, inplaceUpdates, destructiveUpdates), } - // Check if a rolling upgrade strategy is being used - limit := len(r.Update) - if !s.job.Stopped() && s.job.Update.Rolling() { - limit = s.job.Update.MaxParallel - } - - // Treat non in-place updates as an eviction and new placement. - s.limitReached = evictAndPlace(s.ctx, r, r.Update, sstructs.StatusAllocUpdating, &limit) + // Treat non in-place updates as an eviction and new placement, which will + // be limited by max_parallel + s.limitReached = evictAndPlace(s.ctx, s.job, r, sstructs.StatusAllocUpdating) // Nothing remaining to do if placement is not required if len(r.Place) == 0 { @@ -375,6 +392,11 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist // track node filtering, to only report an error if all nodes have been filtered var filteredMetrics map[string]*structs.AllocMetric + var deploymentID string + if s.deployment != nil && s.deployment.Active() { + deploymentID = s.deployment.ID + } + nodes := make([]*structs.Node, 1) for _, missing := range place { tgName := missing.TaskGroup.Name @@ -492,6 +514,7 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, NodeName: option.Node.Name, + DeploymentID: deploymentID, TaskResources: resources.OldTaskResources(), AllocatedResources: resources, DesiredStatus: structs.AllocDesiredStatusRun, @@ -510,6 +533,14 @@ func (s *SystemScheduler) computePlacements(place []reconciler.AllocTuple, exist alloc.PreviousAllocation = missing.Alloc.ID } + // If we are placing a canary, add the canary to the deployment state + // object and mark it as a canary. + if missing.Canary && s.deployment != nil { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Canary: true, + } + } + // If this placement involves preemption, set DesiredState to evict for those allocations if option.PreemptedAllocs != nil { var preemptedAllocIDs []string @@ -579,19 +610,36 @@ func (s *SystemScheduler) canHandle(trigger string) bool { } // evictAndPlace is used to mark allocations for evicts and add them to the -// placement queue. evictAndPlace modifies both the diffResult and the -// limit. It returns true if the limit has been reached. -func evictAndPlace(ctx feasible.Context, diff *reconciler.NodeReconcileResult, allocs []reconciler.AllocTuple, desc string, limit *int) bool { - n := len(allocs) - for i := 0; i < n && i < *limit; i++ { - a := allocs[i] - ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") - diff.Place = append(diff.Place, a) +// placement queue. evictAndPlace modifies the diffResult. It returns true if +// the limit has been reached for any task group. 
+func evictAndPlace(ctx feasible.Context, job *structs.Job, diff *reconciler.NodeReconcileResult, desc string) bool { + + limits := map[string]int{} // per task group limits + if !job.Stopped() { + jobLimit := len(diff.Update) + if job.Update.MaxParallel > 0 { + jobLimit = job.Update.MaxParallel + } + for _, tg := range job.TaskGroups { + if tg.Update != nil && tg.Update.MaxParallel > 0 { + limits[tg.Name] = tg.Update.MaxParallel + } else { + limits[tg.Name] = jobLimit + } + } } - if n <= *limit { - *limit -= n - return false + + limited := false + for _, a := range diff.Update { + if limit := limits[a.Alloc.TaskGroup]; limit > 0 { + ctx.Plan().AppendStoppedAlloc(a.Alloc, desc, "", "") + diff.Place = append(diff.Place, a) + if !a.Canary { + limits[a.Alloc.TaskGroup]-- + } + } else { + limited = true + } } - *limit = 0 - return true + return limited } diff --git a/scheduler/scheduler_system_test.go b/scheduler/scheduler_system_test.go index 757c6a4ea..8369ced42 100644 --- a/scheduler/scheduler_system_test.go +++ b/scheduler/scheduler_system_test.go @@ -5,6 +5,8 @@ package scheduler import ( "fmt" + "maps" + "slices" "sort" "testing" "time" @@ -19,6 +21,7 @@ import ( "github.com/hashicorp/nomad/scheduler/feasible" "github.com/hashicorp/nomad/scheduler/reconciler" "github.com/hashicorp/nomad/scheduler/tests" + "github.com/shoenig/test" "github.com/shoenig/test/must" ) @@ -3292,61 +3295,666 @@ func TestSystemSched_CSITopology(t *testing.T) { } -func TestEvictAndPlace_LimitLessThanAllocs(t *testing.T) { +func TestEvictAndPlace(t *testing.T) { ci.Parallel(t) - _, ctx := feasible.MockContext(t) - allocs := []reconciler.AllocTuple{ - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - } - diff := &reconciler.NodeReconcileResult{} + testCases := []struct { + name string + allocsPerTG map[string]int + maxParallelPerTG map[string]int + jobMaxParallel int + + expectLimited bool + expectPlace int + }{ + { + name: "one group limit less than allocs", + allocsPerTG: map[string]int{"a": 4}, + jobMaxParallel: 2, + expectLimited: true, + expectPlace: 2, + }, + { + name: "one group limit equals allocs", + allocsPerTG: map[string]int{"a": 4}, + jobMaxParallel: 4, + expectLimited: false, + expectPlace: 4, + }, + { + name: "one group limit greater than allocs", + allocsPerTG: map[string]int{"a": 2}, + jobMaxParallel: 4, + expectLimited: false, + expectPlace: 2, + }, + { + name: "group limit supercedes job limit", + allocsPerTG: map[string]int{"a": 4}, + maxParallelPerTG: map[string]int{"a": 2}, + jobMaxParallel: 1, + expectLimited: true, + expectPlace: 2, + }, + { + name: "two groups limit less than allocs on one", + allocsPerTG: map[string]int{"a": 4, "b": 4}, + maxParallelPerTG: map[string]int{"a": 2, "b": 4}, + jobMaxParallel: 0, + expectLimited: true, + expectPlace: 6, + }, + { + name: "two groups neither limited", + allocsPerTG: map[string]int{"a": 2, "b": 2}, + maxParallelPerTG: map[string]int{"a": 4, "b": 4}, + jobMaxParallel: 0, + expectLimited: false, + expectPlace: 4, + }, + { + name: "two groups one uses job limit", + allocsPerTG: map[string]int{"a": 4, "b": 4}, + maxParallelPerTG: map[string]int{"a": 4}, + jobMaxParallel: 2, + expectLimited: true, + expectPlace: 6, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + job := &structs.Job{Update: structs.UpdateStrategy{ + MaxParallel: tc.jobMaxParallel, Stagger: 
time.Second}} + + allocs := []reconciler.AllocTuple{} + for tg, count := range tc.allocsPerTG { + job.TaskGroups = append(job.TaskGroups, &structs.TaskGroup{Name: tg}) + for range count { + allocs = append(allocs, reconciler.AllocTuple{ + Alloc: &structs.Allocation{ID: uuid.Generate(), TaskGroup: tg}}) + } + } + for tg, max := range tc.maxParallelPerTG { + for _, jtg := range job.TaskGroups { + if jtg.Name == tg { + jtg.Update = &structs.UpdateStrategy{ + MaxParallel: max, Stagger: time.Second} + } + } + } + diff := &reconciler.NodeReconcileResult{Update: allocs} + _, ctx := feasible.MockContext(t) + + must.Eq(t, tc.expectLimited, evictAndPlace(ctx, job, diff, ""), + must.Sprintf("limited")) + must.Len(t, tc.expectPlace, diff.Place, must.Sprintf( + "evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) + }) + } - limit := 2 - must.True(t, evictAndPlace(ctx, diff, allocs, "", &limit), - must.Sprintf("evictAndReplace() should have returned true")) - must.Zero(t, limit, - must.Sprint("evictAndReplace() should decrement limit")) - must.Len(t, 2, diff.Place, - must.Sprintf("evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) } -func TestEvictAndPlace_LimitEqualToAllocs(t *testing.T) { +// TestSystemScheduler_UpdateBlock tests various permutations of the update block +func TestSystemScheduler_UpdateBlock(t *testing.T) { ci.Parallel(t) - _, ctx := feasible.MockContext(t) - allocs := []reconciler.AllocTuple{ - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, + collect := func(planned map[string][]*structs.Allocation) map[string]int { + if len(planned) == 0 { + return nil + } + counts := map[string]int{} + for _, node := range planned { + for _, alloc := range node { + counts[alloc.TaskGroup]++ + } + } + return counts } - diff := &reconciler.NodeReconcileResult{} - limit := 4 - must.False(t, evictAndPlace(ctx, diff, allocs, "", &limit), - must.Sprint("evictAndReplace() should have returned false")) - must.Zero(t, limit, must.Sprint("evictAndReplace() should decrement limit")) - must.Len(t, 4, diff.Place, - must.Sprintf("evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) -} + assertDeploymentState := func(t *testing.T, expectDstates, gotDstates map[string]*structs.DeploymentState) { + t.Helper() + if expectDstates == nil { + return + } -func TestEvictAndPlace_LimitGreaterThanAllocs(t *testing.T) { - ci.Parallel(t) + must.SliceContainsAll(t, + slices.Collect(maps.Keys(expectDstates)), + slices.Collect(maps.Keys(gotDstates)), + must.Sprint("expected matching task groups in deployment state")) - _, ctx := feasible.MockContext(t) - allocs := []reconciler.AllocTuple{ - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, - {Alloc: &structs.Allocation{ID: uuid.Generate()}}, + for tg, expect := range expectDstates { + got := gotDstates[tg] + test.Eq(t, expect.DesiredCanaries, got.DesiredCanaries, + test.Sprintf("DesiredCanaries for %s", tg)) + test.Eq(t, expect.DesiredTotal, got.DesiredTotal, + test.Sprintf("DesiredTotal for %s", tg)) + test.Eq(t, expect.PlacedAllocs, got.PlacedAllocs, + test.Sprintf("PlacedAllocs for %s", tg)) + test.Eq(t, len(expect.PlacedCanaries), len(got.PlacedCanaries), + test.Sprintf("len(PlacedCanaries) for %s", tg)) + } + } + + tg1, tg2 := "tg1", 
"tg2" + + // note: all test cases assume that if there's an existing dstate that we're + // in the middle of the deployment, otherwise we're starting a new + // deployment (also noted in name of subtest) + testCases := []struct { + name string + tg1UpdateBlock *structs.UpdateStrategy + tg2UpdateBlock *structs.UpdateStrategy + existingCurrentDState map[string]*structs.DeploymentState + existingOldDState map[string]*structs.DeploymentState + + // these maps signify which nodes have allocs already, mapping + // group -> indexes in the `nodes` slice + existingPrevious map[string][]int // previous version of job + existingRunning map[string][]int // current version of job (running) + existingFailed map[string][]int // current verison of job (failed) + existingCanary map[string][]int // canaries (must match running or failed) + + expectAllocs map[string]int // plan NodeAllocations group -> count + expectStop map[string]int // plan NodeUpdates group -> count + expectDState map[string]*structs.DeploymentState + }{ + { + name: "legacy upgrade non-deployment", + expectAllocs: map[string]int{tg1: 10, tg2: 10}, + }, + + { + name: "new deployment max_parallel vs no update block", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 10}, + expectStop: map[string]int{tg1: 2, tg2: 10}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2}, + }, + }, + + { + name: "max_parallel mid-deployment", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2, HealthyAllocs: 2}, + tg2: {DesiredTotal: 10, PlacedAllocs: 3, HealthyAllocs: 3}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 3}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 4}, + tg2: {DesiredTotal: 10, PlacedAllocs: 6}, + }, + }, + + { + name: "legacy upgrade max_parallel mid-rollout", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + // expects no new deployment + expectAllocs: map[string]int{tg1: 2, tg2: 3}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + }, + + { + name: "max_parallel underprovisioned from failure", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7}, + tg2: {0, 1, 2, 3, 4, 5, 6}, + }, + existingFailed: map[string][]int{ + tg1: {8, 9}, + tg2: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 2, UnhealthyAllocs: 2}, + tg2: {DesiredTotal: 10, 
PlacedAllocs: 3, UnhealthyAllocs: 3}, + }, + expectAllocs: map[string]int{tg1: 4, tg2: 6}, // includes reschedules + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 6}, // includes reschedules + tg2: {DesiredTotal: 10, PlacedAllocs: 9}, + }, + }, + + { + name: "new deployment max_parallel with old underprovisioned", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 3, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4}, + tg2: {0, 1, 2}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 5}, + tg2: {DesiredTotal: 10, PlacedAllocs: 7}, + }, + // note: we immediately place on empty nodes + expectAllocs: map[string]int{tg1: 7, tg2: 10}, + expectStop: map[string]int{tg1: 2, tg2: 3}, + expectDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 7}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "new deployment with canaries", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingOldDState: map[string]*structs.DeploymentState{ + tg1: {DesiredTotal: 10, PlacedAllocs: 10}, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 3, tg2: 5}, + expectStop: map[string]int{tg1: 3, tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"0", "1", "2"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5}, + }, + }, + + { + name: "canaries failed", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + tg2: {0, 1, 2, 3, 4}, + }, + existingRunning: map[string][]int{ + tg1: {7}, + tg2: {5, 6, 7, 8, 9}, + }, + existingFailed: map[string][]int{ + tg1: {8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 1, + UnhealthyAllocs: 2, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 5}, // only 2 replacements + expectStop: map[string]int{tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 5, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries partial placement", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6, 8, 9}, + tg2: {0, 1, 2, 3, 4}, + }, + existingRunning: map[string][]int{ + tg1: {7}, + tg2: {5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + 
PlacedCanaries: []string{"7"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 1, + HealthyAllocs: 1, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 5, HealthyAllocs: 5}, + }, + expectAllocs: map[string]int{tg1: 2, tg2: 5}, + expectStop: map[string]int{tg1: 2, tg2: 5}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries awaiting promotion", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + AutoPromote: false, + }, + tg2UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 5, // no canaries here + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: false, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: nil, + expectStop: nil, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 3, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + + { + name: "canaries promoted", + tg1UpdateBlock: &structs.UpdateStrategy{ + MaxParallel: 2, + Canary: 30, + AutoPromote: true, + }, + existingPrevious: map[string][]int{ + tg1: {0, 1, 2, 3, 4, 5, 6}, + }, + existingRunning: map[string][]int{ + tg1: {7, 8, 9}, + tg2: {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + existingCanary: map[string][]int{ + tg1: {7, 8, 9}, + }, + existingCurrentDState: map[string]*structs.DeploymentState{ + tg1: { + Promoted: true, + PlacedCanaries: []string{"7", "8", "9"}, + DesiredCanaries: 3, + DesiredTotal: 10, + PlacedAllocs: 3, + HealthyAllocs: 3, + UnhealthyAllocs: 0, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10, HealthyAllocs: 10}, + }, + expectAllocs: map[string]int{tg1: 2}, + expectStop: map[string]int{tg1: 2}, + expectDState: map[string]*structs.DeploymentState{ + tg1: { + DesiredTotal: 10, + DesiredCanaries: 3, + PlacedCanaries: []string{"7", "8", "9"}, + PlacedAllocs: 5, + }, + tg2: {DesiredTotal: 10, PlacedAllocs: 10}, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + + h := tests.NewHarness(t) + nodes := createNodes(t, h, 10) + + oldJob := mock.SystemJob() + oldJob.TaskGroups[0].Update = tc.tg1UpdateBlock + oldJob.TaskGroups[0].Name = tg1 + taskGroup2 := oldJob.TaskGroups[0].Copy() + taskGroup2.Update = tc.tg2UpdateBlock + taskGroup2.Name = tg2 + oldJob.TaskGroups = append(oldJob.TaskGroups, taskGroup2) + + must.NoError(t, h.State.UpsertJob( + structs.MsgTypeTestSetup, h.NextIndex(), nil, oldJob)) + + // destructively update both task groups of the job + job := oldJob.Copy() + job.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + job.TaskGroups[1].Tasks[0].Config["command"] = "/bin/other" + idx := h.NextIndex() + job.CreateIndex = idx + job.JobModifyIndex = idx + must.NoError(t, h.State.UpsertJob( + structs.MsgTypeTestSetup, idx, nil, job)) + + expectedDeployments := 0 + if len(tc.existingOldDState) > 0 { + d := mock.Deployment() + d.JobID = job.ID + d.JobVersion = 
oldJob.Version + d.JobCreateIndex = oldJob.CreateIndex + d.JobModifyIndex = oldJob.JobModifyIndex + h.State.UpsertDeployment(h.NextIndex(), d) + expectedDeployments++ + } + + dID := uuid.Generate() + + existAllocs := []*structs.Allocation{} + for _, tg := range []string{tg1, tg2} { + nodesToAllocs := map[string]string{} + for _, nodeIdx := range tc.existingPrevious[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = oldJob + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusRunning + nodesToAllocs[alloc.NodeID] = alloc.ID + existAllocs = append(existAllocs, alloc) + } + for _, nodeIdx := range tc.existingRunning[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = job + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusRunning + nodesToAllocs[alloc.NodeID] = alloc.ID + + for _, canaryNodeIdx := range tc.existingCanary[tg] { + if nodeIdx == canaryNodeIdx { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(true), + Timestamp: time.Time{}, + Canary: true, + ModifyIndex: 0, + } + alloc.DeploymentID = dID + } + } + existAllocs = append(existAllocs, alloc) + } + for _, nodeIdx := range tc.existingFailed[tg] { + alloc := mock.AllocForNode(nodes[nodeIdx]) + alloc.Job = job + alloc.JobID = job.ID + alloc.TaskGroup = tg + alloc.Name = fmt.Sprintf("my-job.%s[0]", tg) + alloc.ClientStatus = structs.AllocClientStatusFailed + nodesToAllocs[alloc.NodeID] = alloc.ID + + for _, canaryNodeIdx := range tc.existingCanary[tg] { + if nodeIdx == canaryNodeIdx { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{ + Healthy: pointer.Of(false), + Timestamp: time.Time{}, + Canary: true, + ModifyIndex: 0, + } + alloc.DeploymentID = dID + } + } + existAllocs = append(existAllocs, alloc) + } + for i, nodeIdx := range tc.existingCanary[tg] { + nodeID := nodes[nodeIdx].ID + // find the correct alloc IDs for the PlaceCanaries + if dstate, ok := tc.existingCurrentDState[tg]; ok { + dstate.PlacedCanaries[i] = nodesToAllocs[nodeID] + } + } + } + + must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), + existAllocs)) + + if len(tc.existingCurrentDState) > 0 { + d := mock.Deployment() + d.ID = dID + d.JobID = job.ID + d.JobVersion = job.Version + d.JobCreateIndex = job.CreateIndex + d.JobModifyIndex = job.JobModifyIndex + d.TaskGroups = tc.existingCurrentDState + h.State.UpsertDeployment(h.NextIndex(), d) + } + if len(tc.expectDState) > 0 { + expectedDeployments++ + } + + eval := &structs.Evaluation{ + Namespace: job.Namespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + must.NoError(t, h.State.UpsertEvals( + structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + must.NoError(t, err) + + // Ensure a single plan + must.Len(t, 1, h.Plans) + plan := h.Plans[0] + + stopped := collect(plan.NodeUpdate) + test.Eq(t, tc.expectStop, stopped, test.Sprint("expected stop/evict")) + + // note: includes existing allocs but not previous version allocs + nodeAllocs := collect(plan.NodeAllocation) + test.Eq(t, tc.expectAllocs, nodeAllocs, test.Sprint("expected keep/place")) + + deployments, err := h.State.DeploymentsByJobID(nil, job.Namespace, job.ID, true) + must.NoError(t, err) + 
must.Len(t, expectedDeployments, deployments, must.Sprint("expected deployments")) + + if tc.expectDState != nil { + deployment, err := h.State.LatestDeploymentByJobID(nil, job.Namespace, job.ID) + must.NoError(t, err) + assertDeploymentState(t, tc.expectDState, deployment.TaskGroups) + } + }) } - diff := &reconciler.NodeReconcileResult{} - limit := 6 - must.False(t, evictAndPlace(ctx, diff, allocs, "", &limit)) - must.Eq(t, 2, limit, must.Sprint("evictAndReplace() should decrement limit")) - must.Len(t, 4, diff.Place, must.Sprintf("evictAndReplace() didn't insert into diffResult properly: %v", diff.Place)) }
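
Not part of the patch, but a reading aid for the new evictAndPlace above: it replaces the old single shared limit with a per-task-group budget. The sketch below mirrors only the budget-selection rule, using local stand-in types (updateStrategy, taskGroup, and job are placeholders, not the Nomad structs): a group's own update.max_parallel wins, otherwise the job-level value applies, and a job-level value of zero means every pending update may be replaced in a single pass. In the patched function the budget is additionally left untouched for canary placements.

```go
package main

import "fmt"

// Local stand-ins for the job structures referenced in the patch; these are
// illustrative only and are not the Nomad structs.
type updateStrategy struct{ MaxParallel int }

type taskGroup struct {
	Name   string
	Update *updateStrategy
}

type job struct {
	Update     updateStrategy
	TaskGroups []*taskGroup
}

// perGroupLimits mirrors the limit selection in the new evictAndPlace: a task
// group's own update.max_parallel takes precedence, otherwise the job-level
// value applies, and a job-level value of zero falls back to the number of
// pending updates (i.e. no throttling for that group).
func perGroupLimits(j *job, pendingUpdates int) map[string]int {
	limits := map[string]int{}
	jobLimit := pendingUpdates
	if j.Update.MaxParallel > 0 {
		jobLimit = j.Update.MaxParallel
	}
	for _, tg := range j.TaskGroups {
		if tg.Update != nil && tg.Update.MaxParallel > 0 {
			limits[tg.Name] = tg.Update.MaxParallel
		} else {
			limits[tg.Name] = jobLimit
		}
	}
	return limits
}

func main() {
	j := &job{
		Update: updateStrategy{MaxParallel: 2},
		TaskGroups: []*taskGroup{
			{Name: "web", Update: &updateStrategy{MaxParallel: 4}}, // group override
			{Name: "log-shipper"},                                  // falls back to the job-level limit
		},
	}
	fmt.Println(perGroupLimits(j, 10)) // map[log-shipper:2 web:4]
}
```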
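The update-block test cases above pair Canary: 30 with 10 eligible nodes and expect DesiredCanaries: 3 and three PlacedCanaries, which implies that for system jobs the canary value is read as a percentage of eligible nodes rather than an absolute count. A minimal sketch of that arithmetic follows; the truncating division is an assumption, since only the 30-percent-of-10 data point appears in the tests.

```go
package main

import "fmt"

// desiredCanaries sketches the arithmetic implied by the update-block test
// expectations: Canary: 30 across 10 eligible nodes yields 3 desired canaries.
// Rounding down is assumed; the tests only exercise an exact multiple.
func desiredCanaries(canaryPercent, eligibleNodes int) int {
	return eligibleNodes * canaryPercent / 100
}

func main() {
	fmt.Println(desiredCanaries(30, 10)) // 3
}
```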
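One more note on the Process() changes above: s.deployment.GetID() is now passed to setStatus on paths that can run before process() has loaded any deployment, so the call relies on the getter tolerating a nil receiver. The sketch below shows that pattern under that assumption, using a hypothetical local type rather than the real structs.Deployment.

```go
package main

import "fmt"

// deployment is a stand-in for the scheduler's deployment object; the real
// structs.Deployment is not used here. The nil check inside the getter is
// what would make s.deployment.GetID() safe before a deployment is loaded.
type deployment struct{ ID string }

func (d *deployment) GetID() string {
	if d == nil {
		return ""
	}
	return d.ID
}

func main() {
	var d *deployment             // no deployment loaded yet
	fmt.Printf("%q\n", d.GetID()) // "" instead of a nil-pointer panic
	d = &deployment{ID: "b7a9c1d2"}
	fmt.Printf("%q\n", d.GetID()) // "b7a9c1d2"
}
```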