reconciler: refactor computeGroup (#12033)

The allocReconciler's computeGroup function contained a significant amount of inline logic whose intent was hard to follow. This commit extracts that logic into intention-revealing subroutines, reworks the function's internals to improve maintainability, and renames several existing functions for the same reason. The new and renamed functions are listed below, followed by a condensed sketch of the resulting control flow.

Renamed functions

- handleGroupCanaries -> cancelUnneededCanaries
- handleDelayedLost -> createLostLaterEvals
- handeDelayedReschedules -> createRescheduleLaterEvals

New functions

- filterAndStopAll
- initializeDeploymentState
- requiresCanaries
- computeCanaries
- computeUnderProvisionedBy
- computeReplacements
- computeDestructiveUpdates
- computeMigrations
- createDeployment
- isDeploymentComplete
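
For orientation, the following is a condensed sketch of the refactored computeGroup, assembled from the reconcile.go diff in this commit. The set bookkeeping (old-terminal filtering, tainted-node splits, stop and reschedule computation, in-place vs. destructive update classification) is elided behind comments, so read it as a map of the new call sequence rather than compilable code.

    func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
        desiredChanges := new(structs.DesiredUpdates)
        a.result.desiredTGUpdates[groupName] = desiredChanges

        // The group was removed from the job: stop everything.
        tg := a.job.LookupTaskGroup(groupName)
        if tg == nil {
            desiredChanges.Stop = a.filterAndStopAll(all)
            return true
        }

        dstate, existingDeployment := a.initializeDeploymentState(groupName, tg)
        canaries, all := a.cancelUnneededCanaries(all, desiredChanges)

        // ... filter old/terminal allocs, split allocs across tainted nodes,
        // compute stops, createLostLaterEvals / createRescheduleLaterEvals,
        // and classify in-place vs. destructive updates ...

        requiresCanaries := a.requiresCanaries(tg, dstate, destructive, canaries)
        if requiresCanaries {
            a.computeCanaries(tg, dstate, destructive, canaries, desiredChanges, nameIndex)
        }

        underProvisionedBy := a.computeUnderProvisionedBy(tg, untainted, destructive, migrate, isCanarying)

        // ... compute placements; deploymentPlaceReady is true when the
        // deployment is neither paused, failed, nor canarying ...

        underProvisionedBy = a.computeReplacements(deploymentPlaceReady, desiredChanges, place, rescheduleNow, lost, underProvisionedBy)

        if deploymentPlaceReady {
            a.computeDestructiveUpdates(destructive, underProvisionedBy, desiredChanges, tg)
        } else {
            desiredChanges.Ignore += uint64(len(destructive))
        }

        a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
        a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)

        return a.isDeploymentComplete(groupName, destructive, inplace, migrate,
            rescheduleNow, dstate, place, rescheduleLater, requiresCanaries)
    }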
Author: Derek Strickland
Date: 2022-02-10 16:24:51 -05:00
Committed by: GitHub
Parent: 6a3368a08f
Commit: cefc58dd7b
3 changed files with 288 additions and 231 deletions

scheduler/generic_sched.go

@@ -419,22 +419,16 @@ func (s *GenericScheduler) computeJobAllocs() error {
         return nil
     }

-    // Record the number of allocations that needs to be placed per Task Group
-    for _, place := range results.place {
-        s.queuedAllocs[place.taskGroup.Name] += 1
-    }
-    for _, destructive := range results.destructiveUpdate {
-        s.queuedAllocs[destructive.placeTaskGroup.Name] += 1
-    }

     // Compute the placements
     place := make([]placementResult, 0, len(results.place))
     for _, p := range results.place {
+        s.queuedAllocs[p.taskGroup.Name] += 1
         place = append(place, p)
     }

     destructive := make([]placementResult, 0, len(results.destructiveUpdate))
     for _, p := range results.destructiveUpdate {
+        s.queuedAllocs[p.placeTaskGroup.Name] += 1
         destructive = append(destructive, p)
     }

     return s.computePlacements(destructive, place)

scheduler/reconcile.go

@@ -79,7 +79,7 @@ type allocReconciler struct {
     evalPriority int

     // now is the time used when determining rescheduling eligibility
-    // defaults to time.Now, and overidden in unit tests
+    // defaults to time.Now, and overridden in unit tests
     now time.Time

     // result is the results of the reconcile. During computation it can be
@@ -177,6 +177,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
         evalPriority: evalPriority,
         now:          time.Now(),
         result: &reconcileResults{
+            attributeUpdates:     make(map[string]*structs.Allocation),
             desiredTGUpdates:     make(map[string]*structs.DesiredUpdates),
             desiredFollowupEvals: make(map[string][]*structs.Evaluation),
         },
@@ -199,9 +200,7 @@ func (a *allocReconciler) Compute() *reconcileResults {
     }

     a.computeDeploymentPaused()
     deploymentComplete := a.computeDeploymentComplete(m)
     a.computeDeploymentUpdates(deploymentComplete)

     return a.result
@@ -268,12 +267,12 @@ func (a *allocReconciler) computeDeploymentPaused() {
 }

 // cancelUnneededDeployments cancels any deployment that is not needed. If the
-//// current deployment is not needed the deployment field is set to nil. A deployment
-//// update will be staged for jobs that should stop or have the wrong version.
-//// Unneeded deployments include:
-//// 1. Jobs that are marked for stop, but there is a non-terminal deployment.
-//// 2. Deployments that are active, but referencing a different job version.
-//// 3. Deployments that are already successful.
+// current deployment is not needed the deployment field is set to nil. A deployment
+// update will be staged for jobs that should stop or have the wrong version.
+// Unneeded deployments include:
+// 1. Jobs that are marked for stop, but there is a non-terminal deployment.
+// 2. Deployments that are active, but referencing a different job version.
+// 3. Deployments that are already successful.
 func (a *allocReconciler) cancelUnneededDeployments() {
     // If the job is stopped and there is a non-terminal deployment, cancel it
     if a.job.Stopped() {
@@ -321,16 +320,20 @@ func (a *allocReconciler) cancelUnneededDeployments() {
 func (a *allocReconciler) handleStop(m allocMatrix) {
     for group, as := range m {
         as = filterByTerminal(as)
-        untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
-        a.markStop(untainted, "", allocNotNeeded)
-        a.markStop(migrate, "", allocNotNeeded)
-        a.markStop(lost, structs.AllocClientStatusLost, allocLost)
         desiredChanges := new(structs.DesiredUpdates)
-        desiredChanges.Stop = uint64(len(as))
+        desiredChanges.Stop = a.filterAndStopAll(as)
         a.result.desiredTGUpdates[group] = desiredChanges
     }
 }

+func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
+    untainted, migrate, lost := set.filterByTainted(a.taintedNodes)
+    a.markStop(untainted, "", allocNotNeeded)
+    a.markStop(migrate, "", allocNotNeeded)
+    a.markStop(lost, structs.AllocClientStatusLost, allocLost)
+    return uint64(len(set))
+}
+
 // markStop is a helper for marking a set of allocations for stop with a
 // particular client status and description.
 func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
@@ -358,49 +361,30 @@ func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescr
 // computeGroup reconciles state for a particular task group. It returns whether
 // the deployment it is for is complete with regards to the task group.
-func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
+func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {

     // Create the desired update object for the group
     desiredChanges := new(structs.DesiredUpdates)
-    a.result.desiredTGUpdates[group] = desiredChanges
+    a.result.desiredTGUpdates[groupName] = desiredChanges

     // Get the task group. The task group may be nil if the job was updated such
     // that the task group no longer exists
-    tg := a.job.LookupTaskGroup(group)
+    tg := a.job.LookupTaskGroup(groupName)

     // If the task group is nil, then the task group has been removed so all we
     // need to do is stop everything
     if tg == nil {
-        untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
-        a.markStop(untainted, "", allocNotNeeded)
-        a.markStop(migrate, "", allocNotNeeded)
-        a.markStop(lost, structs.AllocClientStatusLost, allocLost)
-        desiredChanges.Stop = uint64(len(untainted) + len(migrate) + len(lost))
+        desiredChanges.Stop = a.filterAndStopAll(all)
         return true
     }

-    // Get the deployment state for the group
-    var dstate *structs.DeploymentState
-    existingDeployment := false
-    if a.deployment != nil {
-        dstate, existingDeployment = a.deployment.TaskGroups[group]
-    }
-    if !existingDeployment {
-        dstate = &structs.DeploymentState{}
-        if !tg.Update.IsEmpty() {
-            dstate.AutoRevert = tg.Update.AutoRevert
-            dstate.AutoPromote = tg.Update.AutoPromote
-            dstate.ProgressDeadline = tg.Update.ProgressDeadline
-        }
-    }
+    dstate, existingDeployment := a.initializeDeploymentState(groupName, tg)

     // Filter allocations that do not need to be considered because they are
     // from an older job version and are terminal.
     all, ignore := a.filterOldTerminalAllocs(all)
     desiredChanges.Ignore += uint64(len(ignore))

     // canaries is the set of canaries for the current deployment and all is all
     // allocs including the canaries
-    canaries, all := a.handleGroupCanaries(all, desiredChanges)
+    canaries, all := a.cancelUnneededCanaries(all, desiredChanges)

     // Determine what set of allocations are on tainted nodes
     untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
@@ -410,16 +394,16 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
     // Find delays for any lost allocs that have stop_after_client_disconnect
     lostLater := lost.delayByStopAfterClientDisconnect()
-    lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)
+    lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name)

     // Create batched follow up evaluations for allocations that are
     // reschedulable later and mark the allocations for in place updating
-    a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
+    a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name)

     // Create a structure for choosing names. Seed with the taken names
     // which is the union of untainted, rescheduled, allocs on migrating
     // nodes, and allocs on down nodes (includes canaries)
-    nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow, lost))
+    nameIndex := newAllocNameIndex(a.jobID, groupName, tg.Count, untainted.union(migrate, rescheduleNow, lost))

     // Stop any unneeded allocations and update the untainted set to not
     // include stopped allocations.
@@ -443,30 +427,14 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
         untainted = untainted.difference(canaries)
     }

-    // The fact that we have destructive updates and have less canaries than is
-    // desired means we need to create canaries
-    strategy := tg.Update
-    canariesPromoted := dstate != nil && dstate.Promoted
-    requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
-    if requireCanary {
-        dstate.DesiredCanaries = strategy.Canary
-    }
-    if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
-        number := strategy.Canary - len(canaries)
-        desiredChanges.Canary += uint64(number)
-        for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
-            a.result.place = append(a.result.place, allocPlaceResult{
-                name:      name,
-                canary:    true,
-                taskGroup: tg,
-            })
-        }
+    requiresCanaries := a.requiresCanaries(tg, dstate, destructive, canaries)
+    if requiresCanaries {
+        a.computeCanaries(tg, dstate, destructive, canaries, desiredChanges, nameIndex)
     }

-    // Determine how many we can place
+    // Determine how many non-canary allocs we can place
     isCanarying = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-    limit := a.computeLimit(tg, untainted, destructive, migrate, isCanarying)
+    underProvisionedBy := a.computeUnderProvisionedBy(tg, untainted, destructive, migrate, isCanarying)

     // Place if:
     // * The deployment is not paused or failed
@@ -476,7 +444,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
     // * An alloc was lost
     var place []allocPlaceResult
     if len(lostLater) == 0 {
-        place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, isCanarying, lost)
+        place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
         if !existingDeployment {
             dstate.DesiredTotal += len(place)
         }
@@ -486,126 +454,68 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
     // placements can be made without any other consideration.
     deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !isCanarying

-    if deploymentPlaceReady {
-        desiredChanges.Place += uint64(len(place))
-        a.result.place = append(a.result.place, place...)
-        a.markStop(rescheduleNow, "", allocRescheduled)
-        desiredChanges.Stop += uint64(len(rescheduleNow))
-
-        min := helper.IntMin(len(place), limit)
-        limit -= min
-    } else if !deploymentPlaceReady {
-        // We do not want to place additional allocations but in the case we
-        // have lost allocations or allocations that require rescheduling now,
-        // we do so regardless to avoid odd user experiences.
-        if len(lost) != 0 {
-            allowed := helper.IntMin(len(lost), len(place))
-            desiredChanges.Place += uint64(allowed)
-            a.result.place = append(a.result.place, place[:allowed]...)
-        }
-
-        // Handle rescheduling of failed allocations even if the deployment is
-        // failed. We do not reschedule if the allocation is part of the failed
-        // deployment.
-        if now := len(rescheduleNow); now != 0 {
-            for _, p := range place {
-                prev := p.PreviousAllocation()
-                if p.IsRescheduling() && !(a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID) {
-                    a.result.place = append(a.result.place, p)
-                    desiredChanges.Place++
-
-                    a.result.stop = append(a.result.stop, allocStopResult{
-                        alloc:             prev,
-                        statusDescription: allocRescheduled,
-                    })
-                    desiredChanges.Stop++
-                }
-            }
-        }
-    }
+    underProvisionedBy = a.computeReplacements(deploymentPlaceReady, desiredChanges, place, rescheduleNow, lost, underProvisionedBy)

     if deploymentPlaceReady {
-        // Do all destructive updates
-        min := helper.IntMin(len(destructive), limit)
-        desiredChanges.DestructiveUpdate += uint64(min)
-        desiredChanges.Ignore += uint64(len(destructive) - min)
-        for _, alloc := range destructive.nameOrder()[:min] {
-            a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
-                placeName:             alloc.Name,
-                placeTaskGroup:        tg,
-                stopAlloc:             alloc,
-                stopStatusDescription: allocUpdating,
-            })
-        }
+        a.computeDestructiveUpdates(destructive, underProvisionedBy, desiredChanges, tg)
     } else {
         desiredChanges.Ignore += uint64(len(destructive))
     }

-    // Migrate all the allocations
-    desiredChanges.Migrate += uint64(len(migrate))
-    for _, alloc := range migrate.nameOrder() {
-        a.result.stop = append(a.result.stop, allocStopResult{
-            alloc:             alloc,
-            statusDescription: allocMigrating,
-        })
-        a.result.place = append(a.result.place, allocPlaceResult{
-            name:               alloc.Name,
-            canary:             alloc.DeploymentStatus.IsCanary(),
-            taskGroup:          tg,
-            previousAlloc:      alloc,
-            downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
-            minJobVersion:      alloc.Job.Version,
-        })
-    }
+    a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
+    a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)

-    // Create new deployment if:
-    // 1. Updating a job specification
-    // 2. No running allocations (first time running a job)
-    updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
-    hadRunning := false
-    for _, alloc := range all {
-        if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
-            hadRunning = true
-            break
-        }
-    }
-
-    // Create a new deployment if necessary
-    if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
-        // A previous group may have made the deployment already
-        if a.deployment == nil {
-            a.deployment = structs.NewDeployment(a.job, a.evalPriority)
-            // in multiregion jobs, most deployments start in a pending state
-            if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
-                a.deployment.Status = structs.DeploymentStatusPending
-                a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
-            }
-            a.result.deployment = a.deployment
-        }
-
-        // Attach the groups deployment state to the deployment
-        a.deployment.TaskGroups[group] = dstate
-    }
-
-    // deploymentComplete is whether the deployment is complete which largely
-    // means that no placements were made or desired to be made
-    deploymentComplete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 && !requireCanary
-
-    // Final check to see if the deployment is complete is to ensure everything
-    // is healthy
-    if deploymentComplete && a.deployment != nil {
-        if dstate, ok := a.deployment.TaskGroups[group]; ok {
-            if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
-                (dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
-                deploymentComplete = false
-            }
-        }
-    }
+    deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace,
+        migrate, rescheduleNow, dstate, place, rescheduleLater, requiresCanaries)

     return deploymentComplete
 }

+func (a *allocReconciler) initializeDeploymentState(group string, tg *structs.TaskGroup) (*structs.DeploymentState, bool) {
+    var dstate *structs.DeploymentState
+    existingDeployment := false
+
+    if a.deployment != nil {
+        dstate, existingDeployment = a.deployment.TaskGroups[group]
+    }
+
+    if !existingDeployment {
+        dstate = &structs.DeploymentState{}
+        if !tg.Update.IsEmpty() {
+            dstate.AutoRevert = tg.Update.AutoRevert
+            dstate.AutoPromote = tg.Update.AutoPromote
+            dstate.ProgressDeadline = tg.Update.ProgressDeadline
+        }
+    }
+
+    return dstate, existingDeployment
+}
+
+// If we have destructive updates, and have fewer canaries than is desired, we need to create canaries.
+func (a *allocReconciler) requiresCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState, destructive, canaries allocSet) bool {
+    canariesPromoted := dstate != nil && dstate.Promoted
+    return tg.Update != nil &&
+        len(destructive) != 0 &&
+        len(canaries) < tg.Update.Canary &&
+        !canariesPromoted
+}
+
+func (a *allocReconciler) computeCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState,
+    destructive, canaries allocSet, desiredChanges *structs.DesiredUpdates, nameIndex *allocNameIndex) {
+    dstate.DesiredCanaries = tg.Update.Canary
+
+    if !a.deploymentPaused && !a.deploymentFailed {
+        desiredChanges.Canary += uint64(tg.Update.Canary - len(canaries))
+        for _, name := range nameIndex.NextCanaries(uint(desiredChanges.Canary), canaries, destructive) {
+            a.result.place = append(a.result.place, allocPlaceResult{
+                name:      name,
+                canary:    true,
+                taskGroup: tg,
+            })
+        }
+    }
+}
+
 // filterOldTerminalAllocs filters allocations that should be ignored since they
 // are allocations that are terminal from a previous job version.
 func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
@@ -628,10 +538,10 @@ func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignor
     return filtered, ignored
 }

-// handleGroupCanaries handles the canaries for the group by stopping the
+// cancelUnneededCanaries handles the canaries for the group by stopping the
 // unneeded ones and returning the current set of canaries and the updated total
 // set of allocs for the group
-func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
+func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
     // Stop any canary from an older deployment or from a failed one
     var stop []string
@@ -680,58 +590,57 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru
     return canaries, all
 }

-// computeLimit returns the placement limit for a particular group. The inputs
-// are the group definition, the untainted, destructive, and migrate allocation
-// set and whether we are in a canary state.
-func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
-    // If there is no update strategy or deployment for the group we can deploy
-    // as many as the group has
+// computeUnderProvisionedBy returns the number of allocs that still need to be
+// placed for a particular group. The inputs are the group definition, the untainted,
+// destructive, and migrate allocation sets, and whether we are in a canary state.
+func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, untainted, destructive, migrate allocSet, isCanarying bool) int {
+    // If no update strategy, nothing is migrating, and nothing is being replaced,
+    // allow as many as defined in group.Count
     if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
         return group.Count
-    } else if a.deploymentPaused || a.deploymentFailed {
-        // If the deployment is paused or failed, do not create anything else
-        return 0
     }

-    // If we have canaries and they have not been promoted the limit is 0
-    if canaryState {
-        return 0
-    }
-
-    // If we have been promoted or there are no canaries, the limit is the
-    // configured MaxParallel minus any outstanding non-healthy alloc for the
-    // deployment
-    limit := group.Update.MaxParallel
-    if a.deployment != nil {
-        partOf, _ := untainted.filterByDeployment(a.deployment.ID)
-        for _, alloc := range partOf {
-            // An unhealthy allocation means nothing else should be happen.
-            if alloc.DeploymentStatus.IsUnhealthy() {
-                return 0
-            }
-
-            if !alloc.DeploymentStatus.IsHealthy() {
-                limit--
-            }
-        }
-    }
+    // If the deployment is nil, allow MaxParallel placements
+    if a.deployment == nil {
+        return group.Update.MaxParallel
+    }
+
+    // If the deployment is paused, failed, or we have un-promoted canaries, do not create anything else.
+    if a.deploymentPaused ||
+        a.deploymentFailed ||
+        isCanarying {
+        return 0
+    }
+
+    underProvisionedBy := group.Update.MaxParallel
+    partOf, _ := untainted.filterByDeployment(a.deployment.ID)
+    for _, alloc := range partOf {
+        // An unhealthy allocation means nothing else should happen.
+        if alloc.DeploymentStatus.IsUnhealthy() {
+            return 0
+        }
+        // If not yet explicitly set to healthy (nil) decrement.
+        if !alloc.DeploymentStatus.IsHealthy() {
+            underProvisionedBy--
+        }
+    }

     // The limit can be less than zero in the case that the job was changed such
     // that it required destructive changes and the count was scaled up.
-    if limit < 0 {
+    if underProvisionedBy < 0 {
         return 0
     }

-    return limit
+    return underProvisionedBy
 }

-// computePlacement returns the set of allocations to place given the group
+// computePlacements returns the set of allocations to place given the group
 // definition, the set of untainted, migrating and reschedule allocations for the group.
 //
 // Placements will meet or exceed group count.
 func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
-    nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet,
-    canaryState bool, lost allocSet) []allocPlaceResult {
+    nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
+    isCanarying bool) []allocPlaceResult {

     // Add rescheduled placement results
     var place []allocPlaceResult
@@ -743,7 +652,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
             reschedule:         true,
             canary:             alloc.DeploymentStatus.IsCanary(),
-            downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+            downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
             minJobVersion:      alloc.Job.Version,
             lost:               false,
         })
@@ -766,7 +675,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
             previousAlloc:      alloc,
             reschedule:         false,
             canary:             alloc.DeploymentStatus.IsCanary(),
-            downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+            downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
             minJobVersion:      alloc.Job.Version,
             lost:               true,
         })
@@ -778,7 +687,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
         place = append(place, allocPlaceResult{
             name:               name,
             taskGroup:          group,
-            downgradeNonCanary: canaryState,
+            downgradeNonCanary: isCanarying,
         })
     }
 }
@@ -786,6 +695,165 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
     return place
 }

+// computeReplacements either applies the placements calculated by computePlacements,
+// or computes more placements based on whether the deployment is ready for placement
+// and if the placement is already rescheduling or part of a failed deployment.
+// The input deploymentPlaceReady is true when the deployment is not paused, failed, or canarying.
+// It returns the number of allocs still needed.
+func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates,
+    place []allocPlaceResult, failed, lost allocSet, underProvisionedBy int) int {
+
+    // If the deployment is place ready, apply all placements and return
+    if deploymentPlaceReady {
+        desiredChanges.Place += uint64(len(place))
+        // This relies on the computePlacements having built this set, which in
+        // turn relies on len(lostLater) == 0.
+        a.result.place = append(a.result.place, place...)
+        a.markStop(failed, "", allocRescheduled)
+        desiredChanges.Stop += uint64(len(failed))
+
+        min := helper.IntMin(len(place), underProvisionedBy)
+        underProvisionedBy -= min
+        return underProvisionedBy
+    }
+
+    // We do not want to place additional allocations but in the case we
+    // have lost allocations or allocations that require rescheduling now,
+    // we do so regardless to avoid odd user experiences.
+
+    // If allocs have been lost, determine the number of replacements that are needed
+    // and add placements to the result for the lost allocs.
+    if len(lost) != 0 {
+        allowed := helper.IntMin(len(lost), len(place))
+        desiredChanges.Place += uint64(allowed)
+        a.result.place = append(a.result.place, place[:allowed]...)
+    }
+
+    // If there are no failures or no pending placements, return.
+    if len(failed) == 0 || len(place) == 0 {
+        return underProvisionedBy
+    }
+
+    // Handle rescheduling of failed allocations even if the deployment is failed.
+    // If the placement is rescheduling, and not part of a failed deployment, add
+    // to the place set, and add the previous alloc to the stop set.
+    for _, p := range place {
+        prev := p.PreviousAllocation()
+        partOfFailedDeployment := a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID
+
+        if !partOfFailedDeployment && p.IsRescheduling() {
+            a.result.place = append(a.result.place, p)
+            desiredChanges.Place++
+
+            a.result.stop = append(a.result.stop, allocStopResult{
+                alloc:             prev,
+                statusDescription: allocRescheduled,
+            })
+            desiredChanges.Stop++
+        }
+    }
+
+    return underProvisionedBy
+}
+
+func (a *allocReconciler) computeDestructiveUpdates(destructive allocSet, underProvisionedBy int,
+    desiredChanges *structs.DesiredUpdates, tg *structs.TaskGroup) {
+
+    // Do all destructive updates
+    min := helper.IntMin(len(destructive), underProvisionedBy)
+    desiredChanges.DestructiveUpdate += uint64(min)
+    desiredChanges.Ignore += uint64(len(destructive) - min)
+    for _, alloc := range destructive.nameOrder()[:min] {
+        a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
+            placeName:             alloc.Name,
+            placeTaskGroup:        tg,
+            stopAlloc:             alloc,
+            stopStatusDescription: allocUpdating,
+        })
+    }
+}
+
+func (a *allocReconciler) computeMigrations(desiredChanges *structs.DesiredUpdates, migrate allocSet, tg *structs.TaskGroup, isCanarying bool) {
+    desiredChanges.Migrate += uint64(len(migrate))
+    for _, alloc := range migrate.nameOrder() {
+        a.result.stop = append(a.result.stop, allocStopResult{
+            alloc:             alloc,
+            statusDescription: allocMigrating,
+        })
+        a.result.place = append(a.result.place, allocPlaceResult{
+            name:               alloc.Name,
+            canary:             alloc.DeploymentStatus.IsCanary(),
+            taskGroup:          tg,
+            previousAlloc:      alloc,
+            downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
+            minJobVersion:      alloc.Job.Version,
+        })
+    }
+}
+
+func (a *allocReconciler) createDeployment(groupName string, strategy *structs.UpdateStrategy,
+    existingDeployment bool, dstate *structs.DeploymentState, all, destructive allocSet) {
+    // Guard the simple cases that require no computation first.
+    if existingDeployment ||
+        strategy.IsEmpty() ||
+        dstate.DesiredTotal == 0 {
+        return
+    }
+
+    updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
+
+    hadRunning := false
+    for _, alloc := range all {
+        if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
+            hadRunning = true
+            break
+        }
+    }
+
+    // Don't create a deployment if it's not the first time running the job
+    // and there are no updates to the spec.
+    if hadRunning && !updatingSpec {
+        return
+    }
+
+    // A previous group may have made the deployment already. If not, create one.
+    if a.deployment == nil {
+        a.deployment = structs.NewDeployment(a.job, a.evalPriority)
+
+        // in multiregion jobs, most deployments start in a pending state
+        if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
+            a.deployment.Status = structs.DeploymentStatusPending
+            a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
+        }
+
+        a.result.deployment = a.deployment
+    }
+
+    // Attach the group's deployment state to the deployment
+    a.deployment.TaskGroups[groupName] = dstate
+}
+
+func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, inplace, migrate, rescheduleNow allocSet,
+    dstate *structs.DeploymentState, place []allocPlaceResult, rescheduleLater []*delayedRescheduleInfo, requiresCanaries bool) bool {
+
+    complete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 &&
+        !requiresCanaries
+
+    if !complete || a.deployment == nil {
+        return false
+    }
+
+    // The final check on deployment completeness is to ensure everything is healthy
+    var ok bool
+    if dstate, ok = a.deployment.TaskGroups[groupName]; ok {
+        if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
+            (dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
+            complete = false
+        }
+    }
+
+    return complete
+}
+
 // computeStop returns the set of allocations that are marked for stopping given
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
@@ -920,17 +988,12 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
     return
 }

-// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field
+// createRescheduleLaterEvals creates batched followup evaluations with the WaitUntil field
 // set for allocations that are eligible to be rescheduled later, and marks the alloc with
 // the followupEvalID
-func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
+func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
     // followupEvals are created in the same way as for delayed lost allocs
-    allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)
-
-    // Initialize the annotations
-    if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
-        a.result.attributeUpdates = make(map[string]*structs.Allocation)
-    }
+    allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, all, tgName)

     // Create updates that will be applied to the allocs to mark the FollowupEvalID
     for allocID, evalID := range allocIDToFollowupEvalID {
@@ -941,10 +1004,10 @@ func (a *allocReconciler) handleDelayedRes
         }
     }

-// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
+// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for
 // lost allocations. followupEvals are appended to a.result as a side effect, we return a
-// map of alloc IDs to their followupEval IDs
-func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
+// map of alloc IDs to their followupEval IDs.
+func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
     if len(rescheduleLater) == 0 {
         return map[string]string{}
     }

scheduler/reconcile_util.go

@@ -478,7 +478,7 @@ func bitmapFrom(input allocSet, minSize uint) structs.Bitmap {
     return bitmap
 }

-// RemoveHighest removes and returns the highest n used names. The returned set
+// Highest removes and returns the highest n used names. The returned set
 // can be less than n if there aren't n names set in the index
 func (a *allocNameIndex) Highest(n uint) map[string]struct{} {
     h := make(map[string]struct{}, n)