reconciler: improve variable names and extract methods from inline logic (#12010)

* reconciler: improve variable names and extract methods from inline logic

Co-authored-by: Tim Gross <tgross@hashicorp.com>
Derek Strickland
2022-02-05 04:54:19 -05:00
committed by GitHub
parent 29ffa02683
commit 0263650f27
2 changed files with 55 additions and 41 deletions
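
As a condensed illustration of the extract-method refactor in the hunks below, here is a self-contained Go sketch (simplified stand-in types, not Nomad's real reconciler structs): Compute's inline deployment checks become small named helpers.

```go
package main

import "fmt"

// deployment and reconciler are simplified stand-ins for the real
// scheduler types; they carry just enough state for this sketch.
type deployment struct {
	status string
	active bool
}

type reconciler struct {
	deployment       *deployment
	multiregion      bool
	jobStopped       bool
	deploymentPaused bool
	deploymentFailed bool
}

// cancelUnneededDeployments drops a deployment the job no longer needs,
// mirroring the renamed helper in the diff below (simplified to one case).
func (r *reconciler) cancelUnneededDeployments() {
	if r.jobStopped && r.deployment != nil && r.deployment.active {
		r.deployment = nil
	}
}

// computeDeploymentPaused mirrors the extracted helper: derive the
// paused/failed flags from the current deployment, or force "paused"
// for multiregion jobs that have no deployment yet.
func (r *reconciler) computeDeploymentPaused() {
	if r.deployment != nil {
		s := r.deployment.status
		r.deploymentPaused = s == "paused" || s == "pending"
		r.deploymentFailed = s == "failed"
		return
	}
	if r.multiregion {
		r.deploymentPaused = true
	}
}

// compute is the slimmed-down orchestration left after the extraction:
// each step is a named method call instead of inline logic.
func (r *reconciler) compute() {
	r.cancelUnneededDeployments()
	r.computeDeploymentPaused()
	fmt.Printf("paused=%v failed=%v hasDeployment=%v\n",
		r.deploymentPaused, r.deploymentFailed, r.deployment != nil)
}

func main() {
	r := &reconciler{deployment: &deployment{status: "pending", active: true}}
	r.compute()
}
```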

scheduler/reconcile.go

@@ -2,9 +2,8 @@ package scheduler
import (
"fmt"
"time"
"sort"
"time"
"github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
@@ -190,8 +189,7 @@ func (a *allocReconciler) Compute() *reconcileResults {
// Create the allocation matrix
m := newAllocMatrix(a.job, a.existingAllocs)
// Handle stopping unneeded deployments
a.cancelDeployments()
a.cancelUnneededDeployments()
// If we are just stopping a job we do not need to do anything more than
// stopping all running allocs
@@ -200,30 +198,27 @@ func (a *allocReconciler) Compute() *reconcileResults {
return a.result
}
// Detect if the deployment is paused
if a.deployment != nil {
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
a.deployment.Status == structs.DeploymentStatusPending
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
// When we create the deployment later, it will be in a pending
// state. But we also need to tell Compute we're paused, otherwise we
// make placements on the paused deployment.
if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
a.deploymentPaused = true
}
}
a.computeDeploymentPaused()
// Reconcile each group
deploymentComplete := a.computeDeploymentComplete(m)
a.computeDeploymentUpdates(deploymentComplete)
return a.result
}
func (a *allocReconciler) computeDeploymentComplete(m allocMatrix) bool {
complete := true
for group, as := range m {
groupComplete := a.computeGroup(group, as)
complete = complete && groupComplete
}
return complete
}
func (a *allocReconciler) computeDeploymentUpdates(deploymentComplete bool) {
// Mark the deployment as complete if possible
if a.deployment != nil && complete {
if a.deployment != nil && deploymentComplete {
if a.job.IsMultiregion() {
// the unblocking/successful states come after blocked, so we
// need to make sure we don't revert those states
@@ -254,12 +249,32 @@ func (a *allocReconciler) Compute() *reconcileResults {
}
}
}
return a.result
}
// cancelDeployments cancels any deployment that is not needed
func (a *allocReconciler) cancelDeployments() {
func (a *allocReconciler) computeDeploymentPaused() {
if a.deployment != nil {
a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused ||
a.deployment.Status == structs.DeploymentStatusPending
a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed
}
if a.deployment == nil {
// When we create the deployment later, it will be in a pending
// state. But we also need to tell Compute we're paused, otherwise we
// make placements on the paused deployment.
if a.job.IsMultiregion() && !(a.job.IsPeriodic() || a.job.IsParameterized()) {
a.deploymentPaused = true
}
}
}
// cancelUnneededDeployments cancels any deployment that is not needed. If the
// current deployment is not needed the deployment field is set to nil. A deployment
// update will be staged for jobs that should stop or have the wrong version.
// Unneeded deployments include:
// 1. Jobs that are marked for stop, but there is a non-terminal deployment.
// 2. Deployments that are active, but referencing a different job version.
// 3. Deployments that are already successful.
func (a *allocReconciler) cancelUnneededDeployments() {
// If the job is stopped and there is a non-terminal deployment, cancel it
if a.job.Stopped() {
if a.deployment != nil && a.deployment.Active() {
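
A minimal sketch of the three numbered cases in the cancelUnneededDeployments comment above, using made-up types; the real method also stages deployment-status updates in the reconcile result, which this omits.

```go
package main

// Simplified stand-ins for the job and deployment state the real
// method inspects.
type job struct {
	stopped bool
	version uint64
}

type deployment struct {
	active     bool
	jobVersion uint64
	successful bool
}

// cancelUnneededDeployments sketches the three cases from the doc
// comment: a stopped job with a live deployment, a deployment for a
// different job version, and a deployment that already succeeded.
// It returns nil when the deployment should be dropped.
func cancelUnneededDeployments(j job, d *deployment) *deployment {
	if d == nil {
		return nil
	}
	switch {
	case j.stopped && d.active: // 1. job marked for stop
		return nil
	case d.active && d.jobVersion != j.version: // 2. wrong job version
		return nil
	case d.successful: // 3. already successful
		return nil
	}
	return d
}

func main() {
	d := &deployment{active: true, jobVersion: 3}
	_ = cancelUnneededDeployments(job{version: 4}, d) // case 2: dropped
}
```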
@@ -408,8 +423,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Stop any unneeded allocations and update the untainted set to not
// include stopped allocations.
canaryState := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, canaryState, lostLaterEvals)
isCanarying := dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
stop := a.computeStop(tg, nameIndex, untainted, migrate, lost, canaries, isCanarying, lostLaterEvals)
desiredChanges.Stop += uint64(len(stop))
untainted = untainted.difference(stop)
@@ -424,7 +439,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Remove the canaries now that we have handled rescheduling so that we do
// not consider them when making placement decisions.
if canaryState {
if isCanarying {
untainted = untainted.difference(canaries)
}
@@ -450,8 +465,8 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}
// Determine how many we can place
canaryState = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
limit := a.computeLimit(tg, untainted, destructive, migrate, canaryState)
isCanarying = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
limit := a.computeLimit(tg, untainted, destructive, migrate, isCanarying)
// Place if:
// * The deployment is not paused or failed
@@ -461,7 +476,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * An alloc was lost
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState, lost)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, isCanarying, lost)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -469,7 +484,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// deploymentPlaceReady tracks whether the deployment is in a state where
// placements can be made without any other consideration.
deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !canaryState
deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !isCanarying
if deploymentPlaceReady {
desiredChanges.Place += uint64(len(place))
@@ -539,7 +554,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
taskGroup: tg,
previousAlloc: alloc,
downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
minJobVersion: alloc.Job.Version,
})
}
@@ -775,7 +790,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
// the group definition, the set of allocations in various states and whether we
// are canarying.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
untainted, migrate, lost, canaries allocSet, canaryState bool, followupEvals map[string]string) allocSet {
untainted, migrate, lost, canaries allocSet, isCanarying bool, followupEvals map[string]string) allocSet {
// Mark all lost allocations for stop.
var stop allocSet
@@ -783,7 +798,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
a.markDelayed(lost, structs.AllocClientStatusLost, allocLost, followupEvals)
// If we are still deploying or creating canaries, don't stop them
if canaryState {
if isCanarying {
untainted = untainted.difference(canaries)
}
@@ -799,7 +814,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// Prefer stopping any alloc that has the same name as the canaries if we
// are promoted
if !canaryState && len(canaries) != 0 {
if !isCanarying && len(canaries) != 0 {
canaryNames := canaries.nameSet()
for id, alloc := range untainted.difference(canaries) {
if _, match := canaryNames[alloc.Name]; match {
@@ -820,8 +835,8 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// Prefer selecting from the migrating set before stopping existing allocs
if len(migrate) != 0 {
mNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
removeNames := mNames.Highest(uint(remove))
migratingNames := newAllocNameIndex(a.jobID, group.Name, group.Count, migrate)
removeNames := migratingNames.Highest(uint(remove))
for id, alloc := range migrate {
if _, match := removeNames[alloc.Name]; !match {
continue
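
The canaryState to isCanarying rename above does not change the expression itself; it only names what the boolean means. A small stand-alone sketch of that check, with a stand-in for the per-group deployment state:

```go
package main

import "fmt"

// deploymentState is a stand-in for the per-task-group deployment
// state the reconciler consults (requested canaries and promotion).
type deploymentState struct {
	DesiredCanaries int
	Promoted        bool
}

// isCanarying reports whether the group is still in its canary phase:
// canaries were requested and the deployment has not been promoted.
func isCanarying(dstate *deploymentState) bool {
	return dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
}

func main() {
	fmt.Println(isCanarying(&deploymentState{DesiredCanaries: 2}))                 // true
	fmt.Println(isCanarying(&deploymentState{DesiredCanaries: 2, Promoted: true})) // false
	fmt.Println(isCanarying(nil))                                                  // false
}
```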

scheduler/reconcile_util.go

@@ -4,7 +4,6 @@ import (
"fmt"
"sort"
"strings"
"time"
"github.com/hashicorp/nomad/nomad/structs"
@@ -214,7 +213,7 @@ func (a allocSet) fromKeys(keys ...[]string) allocSet {
// 1. Those that exist on untainted nodes
// 2. Those that exist on nodes that are draining
// 3. Those that exist on lost nodes
func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, migrate, lost allocSet) {
func (a allocSet) filterByTainted(taintedNodes map[string]*structs.Node) (untainted, migrate, lost allocSet) {
untainted = make(map[string]*structs.Allocation)
migrate = make(map[string]*structs.Allocation)
lost = make(map[string]*structs.Allocation)
@@ -231,7 +230,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
continue
}
n, ok := nodes[alloc.NodeID]
taintedNode, ok := taintedNodes[alloc.NodeID]
if !ok {
// Node is untainted so alloc is untainted
untainted[alloc.ID] = alloc
@@ -239,7 +238,7 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
}
// Allocs on GC'd (nil) or lost nodes are Lost
if n == nil || n.TerminalStatus() {
if taintedNode == nil || taintedNode.TerminalStatus() {
lost[alloc.ID] = alloc
continue
}
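
The nodes to taintedNodes rename makes the map's contract explicit: it contains only tainted nodes, so a missing entry means the allocation's node is healthy. A simplified, self-contained sketch of the three-way classification (not Nomad's real allocSet types; a "tainted but still running" node stands in for draining here):

```go
package main

import "fmt"

// Minimal stand-ins for the types filterByTainted works over.
type node struct {
	terminal bool // GC'd or down nodes count as terminal in this sketch
}

type alloc struct {
	id     string
	nodeID string
}

// filterByTainted sketches the three buckets from the comment above:
// untainted (node not in the map), migrate (node tainted but still up),
// and lost (node GC'd or terminal).
func filterByTainted(allocs []alloc, taintedNodes map[string]*node) (untainted, migrate, lost []alloc) {
	for _, a := range allocs {
		taintedNode, ok := taintedNodes[a.nodeID]
		if !ok {
			// Node is not tainted, so the alloc stays where it is.
			untainted = append(untainted, a)
			continue
		}
		if taintedNode == nil || taintedNode.terminal {
			// GC'd (nil) or terminal node: the alloc is lost.
			lost = append(lost, a)
			continue
		}
		// Tainted but still running (e.g. draining): migrate.
		migrate = append(migrate, a)
	}
	return untainted, migrate, lost
}

func main() {
	tainted := map[string]*node{"n2": {terminal: false}, "n3": nil}
	allocs := []alloc{{"a1", "n1"}, {"a2", "n2"}, {"a3", "n3"}}
	u, m, l := filterByTainted(allocs, tainted)
	fmt.Println(len(u), len(m), len(l)) // 1 1 1
}
```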