diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index d15b31be0..b722bf488 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -2,9 +2,8 @@ package scheduler
 
 import (
 	"fmt"
-	"time"
-
 	"sort"
+	"time"
 
 	"github.com/armon/go-metrics"
 	log "github.com/hashicorp/go-hclog"
@@ -190,8 +189,7 @@ func (a *allocReconciler) Compute() *reconcileResults {
 	// Create the allocation matrix
 	m := newAllocMatrix(a.job, a.existingAllocs)
 
-	// Handle stopping unneeded deployments
-	a.cancelDeployments()
+	a.cancelUnneededDeployments()
 
 	// If we are just stopping a job we do not need to do anything more than
 	// stopping all running allocs
@@ -200,30 +198,27 @@ func (a *allocReconciler) Compute() *reconcileResults {
 		return a.result
 	}
 
-	// Detect if the deployment is paused
-	if a.deployment != nil {
-		a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
-			a.deployment.Status == structs.DeploymentStatusPending
-		a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
-	}
-	if a.deployment == nil {
-		// When we create the deployment later, it will be in a pending
-		// state. But we also need to tell Compute we're paused, otherwise we
-		// make placements on the paused deployment.
-		if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
-			a.deploymentPaused = true
-		}
-	}
+	a.computeDeploymentPaused()
 
-	// Reconcile each group
+	deploymentComplete := a.computeDeploymentComplete(m)
+
+	a.computeDeploymentUpdates(deploymentComplete)
+
+	return a.result
+}
+
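+// computeDeploymentComplete runs the per-group reconciliation across the
+// allocation matrix and reports whether every task group, and with it the
+// deployment as a whole, is complete.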
+func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
 	complete := true
 	for group, as := range m {
 		groupComplete := a.computeGroup(group, as)
 		complete = complete && groupComplete
 	}
+	return complete
+}
 
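+// computeDeploymentUpdates marks the deployment as complete once every task
+// group has finished reconciling, taking care not to revert the multiregion
+// unblocking/successful statuses that come after blocked.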
+func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
 	// Mark the deployment as complete if possible
-	if a.deployment != nil && complete {
+	if a.deployment != nil && deploymentComplete {
 		if a.job.IsMultiregion() {
 			// the unblocking/successful states come after blocked, so we
 			// need to make sure we don't revert those states
@@ -254,12 +249,32 @@ func (a *allocReconciler) Compute() *reconcileResults {
 			}
 		}
 	}
-
-	return a.result
 }
 
-// cancelDeployments cancels any deployment that is not needed
-func (a *allocReconciler) cancelDeployments() {
+func (a *allocReconciler) computeDeploymentPaused() {
+	if a.deployment != nil {
+		a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
+			a.deployment.Status == structs.DeploymentStatusPending
+		a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
+	}
+	if a.deployment == nil {
+		// When we create the deployment later, it will be in a pending
+		// state. But we also need to tell Compute we're paused, otherwise we
+		// make placements on the paused deployment.
+		if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
+			a.deploymentPaused = true
+		}
+	}
+}
+
+// cancelUnneededDeployments cancels any deployment that is not needed. If the
+// current deployment is not needed the deployment field is set to nil. A deployment
+// update will be staged for jobs that should stop or have the wrong version.
+// Unneeded deployments include:
+// 1. Jobs that are marked for stop, but there is a non-terminal deployment.
+// 2. Deployments that are active, but referencing a different job version.
+// 3. Deployments that are already successful.
+func (a *allocReconciler) cancelUnneededDeployments() {
 	// If the job is stopped and there is a non-terminal deployment, cancel it
 	if a.job.Stopped() {
 		if a.deployment != nil && a.deployment.Active() {
@@ -408,8 +423,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 
 	// Stop any unneeded allocations and update the untainted set to not
 	// include stopped allocations.
-	canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
+	isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
+	stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
 	desiredChanges.Stop += uint64(len(stop))
 	untainted = untainted.difference(stop)
 
@@ -424,7 +439,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 
 	// Remove the canaries now that we have handled rescheduling so that we do
 	// not consider them when making placement decisions.
-	if canaryState {
+	if isCanarying {
 		untainted = untainted.difference(canaries)
 	}
 
@@ -450,8 +465,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	}
 
 	// Determine how many we can place
-	canaryState = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	limit := a.computeLimit(tg, untainted, destructive, migrate, canaryState)
+	isCanarying = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
+	limit := a.computeLimit(tg, untainted, destructive, migrate, isCanarying)
 
 	// Place if:
 	// * The deployment is not paused or failed
@@ -461,7 +476,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// * An alloc was lost
 	var place []allocPlaceResult
 	if len(lostLater) == 0 {
-		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState, lost)
+		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, isCanarying, lost)
 		if !existingDeployment {
 			dstate.DesiredTotal += len(place)
 		}
@@ -469,7 +484,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 
 	// deploymentPlaceReady tracks whether the deployment is in a state where
 	// placements can be made without any other consideration.
-	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !canaryState
+	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !isCanarying
 
 	if deploymentPlaceReady {
 		desiredChanges.Place += uint64(len(place))
@@ -539,7 +554,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 				taskGroup:     tg,
 				previousAlloc: alloc,
 
-				downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+				downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
 				minJobVersion:      alloc.Job.Version,
 			})
 		}
@@ -775,7 +790,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
 func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
-	untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {
+	untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
 
 	// Mark all lost allocations for stop.
 	var stop allocSet
@@ -783,7 +798,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 	a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
 
 	// If we are still deploying or creating canaries, don't stop them
-	if canaryState {
+	if isCanarying {
 		untainted = untainted.difference(canaries)
 	}
 
@@ -799,7 +814,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 	// Prefer stopping any alloc that has the same name as the canaries if we
 	// are promoted
-	if !canaryState && len(canaries) != 0 {
+	if !isCanarying && len(canaries) != 0 {
 		canaryNames := canaries.nameSet()
 		for id, alloc := range untainted.difference(canaries) {
 			if _, match := canaryNames[alloc.Name]; match {
@@ -820,8 +835,8 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
 
 	// Prefer selecting from the migrating set before stopping existing allocs
 	if len(migrate) != 0 {
-		mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
-		removeNames := mNames.Highest(uint(remove))
+		migratingNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
+		removeNames := migratingNames.Highest(uint(remove))
 		for id, alloc := range migrate {
 			if _, match := removeNames[alloc.Name]; !match {
 				continue
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index 40ced8a86..d90f183bc 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -4,7 +4,6 @@ import (
 	"fmt"
 	"sort"
 	"strings"
-	"time"
 
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -214,7 +213,7 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
 // 1. Those that exist on untainted nodes
 // 2. Those exist on nodes that are draining
 // 3. Those that exist on lost nodes
-func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, migrate, lost allocSet) {
+func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost allocSet) {
 	untainted = make(map[string]*structs.Allocation)
 	migrate = make(map[string]*structs.Allocation)
 	lost = make(map[string]*structs.Allocation)
@@ -231,7 +230,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
 			continue
 		}
 
-		n, ok := nodes[alloc.NodeID]
+		taintedNode, ok := taintedNodes[alloc.NodeID]
 		if !ok {
 			// Node is untainted so alloc is untainted
 			untainted[alloc.ID] = alloc
@@ -239,7 +238,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
 		}
 
 		// Allocs on GC'd (nil) or lost nodes are Lost
-		if n == nil || n.TerminalStatus() {
+		if taintedNode == nil || taintedNode.TerminalStatus() {
 			lost[alloc.ID] = alloc
 			continue
 		}