diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 6d6c262db..d15d192bb 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -419,22 +419,16 @@ func (s *GenericScheduler) computeJobAllocs() error {
 		return nil
 	}
 
-	// Record the number of allocations that needs to be placed per Task Group
-	for _, place := range results.place {
-		s.queuedAllocs[place.taskGroup.Name] += 1
-	}
-	for _, destructive := range results.destructiveUpdate {
-		s.queuedAllocs[destructive.placeTaskGroup.Name] += 1
-	}
-
 	// Compute the placements
 	place := make([]placementResult, 0, len(results.place))
 	for _, p := range results.place {
+		s.queuedAllocs[p.taskGroup.Name] += 1
 		place = append(place, p)
 	}
 
 	destructive := make([]placementResult, 0, len(results.destructiveUpdate))
 	for _, p := range results.destructiveUpdate {
+		s.queuedAllocs[p.placeTaskGroup.Name] += 1
 		destructive = append(destructive, p)
 	}
 	return s.computePlacements(destructive, place)
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index b722bf488..16ddc6743 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -79,7 +79,7 @@ type allocReconciler struct {
 	evalPriority int
 
 	// now is the time used when determining rescheduling eligibility
-	// defaults to time.Now, and overidden in unit tests
+	// defaults to time.Now, and overridden in unit tests
 	now time.Time
 
 	// result is the results of the reconcile. During computation it can be
@@ -177,6 +177,7 @@ func NewAllocReconciler(logger log.Logger, allocUpdateFn allocUpdateType, batch
 		evalPriority: evalPriority,
 		now:          time.Now(),
 		result: &reconcileResults{
+			attributeUpdates:     make(map[string]*structs.Allocation),
 			desiredTGUpdates:     make(map[string]*structs.DesiredUpdates),
 			desiredFollowupEvals: make(map[string][]*structs.Evaluation),
 		},
@@ -199,9 +200,7 @@ func (a *allocReconciler) Compute() *reconcileResults {
 	}
 
 	a.computeDeploymentPaused()
-
 	deploymentComplete := a.computeDeploymentComplete(m)
-
 	a.computeDeploymentUpdates(deploymentComplete)
 
 	return a.result
@@ -268,12 +267,12 @@ func (a *allocReconciler) computeDeploymentPaused() {
 }
 
 // cancelUnneededDeployments cancels any deployment that is not needed. If the
-//// current deployment is not needed the deployment field is set to nil. A deployment
-//// update will be staged for jobs that should stop or have the wrong version.
-//// Unneeded deployments include:
-//// 1. Jobs that are marked for stop, but there is a non-terminal deployment.
-//// 2. Deployments that are active, but referencing a different job version.
-//// 3. Deployments that are already successful.
+// current deployment is not needed the deployment field is set to nil. A deployment
+// update will be staged for jobs that should stop or have the wrong version.
+// Unneeded deployments include:
+// 1. Jobs that are marked for stop, but there is a non-terminal deployment.
+// 2. Deployments that are active, but referencing a different job version.
+// 3. Deployments that are already successful.
 func (a *allocReconciler) cancelUnneededDeployments() {
 	// If the job is stopped and there is a non-terminal deployment, cancel it
 	if a.job.Stopped() {
@@ -321,16 +320,20 @@ func (a *allocReconciler) cancelUnneededDeployments() {
 func (a *allocReconciler) handleStop(m allocMatrix) {
 	for group, as := range m {
 		as = filterByTerminal(as)
-		untainted, migrate, lost := as.filterByTainted(a.taintedNodes)
-		a.markStop(untainted, "", allocNotNeeded)
-		a.markStop(migrate, "", allocNotNeeded)
-		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
 		desiredChanges := new(structs.DesiredUpdates)
-		desiredChanges.Stop = uint64(len(as))
+		desiredChanges.Stop = a.filterAndStopAll(as)
 		a.result.desiredTGUpdates[group] = desiredChanges
 	}
 }
 
+func (a *allocReconciler) filterAndStopAll(set allocSet) uint64 {
+	untainted, migrate, lost := set.filterByTainted(a.taintedNodes)
+	a.markStop(untainted, "", allocNotNeeded)
+	a.markStop(migrate, "", allocNotNeeded)
+	a.markStop(lost, structs.AllocClientStatusLost, allocLost)
+	return uint64(len(set))
+}
+
 // markStop is a helper for marking a set of allocation for stop with a
 // particular client status and description.
 func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
@@ -358,49 +361,30 @@ func (a *allocReconciler) markDelayed(allocs allocSet, clientStatus, statusDescr
 
 // computeGroup reconciles state for a particular task group. It returns whether
 // the deployment it is for is complete with regards to the task group.
-func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
+func (a *allocReconciler) computeGroup(groupName string, all allocSet) bool {
 	// Create the desired update object for the group
 	desiredChanges := new(structs.DesiredUpdates)
-	a.result.desiredTGUpdates[group] = desiredChanges
+	a.result.desiredTGUpdates[groupName] = desiredChanges
 
 	// Get the task group. The task group may be nil if the job was updates such
 	// that the task group no longer exists
-	tg := a.job.LookupTaskGroup(group)
+	tg := a.job.LookupTaskGroup(groupName)
 
 	// If the task group is nil, then the task group has been removed so all we
 	// need to do is stop everything
 	if tg == nil {
-		untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
-		a.markStop(untainted, "", allocNotNeeded)
-		a.markStop(migrate, "", allocNotNeeded)
-		a.markStop(lost, structs.AllocClientStatusLost, allocLost)
-		desiredChanges.Stop = uint64(len(untainted) + len(migrate) + len(lost))
+		desiredChanges.Stop = a.filterAndStopAll(all)
 		return true
 	}
 
-	// Get the deployment state for the group
-	var dstate *structs.DeploymentState
-	existingDeployment := false
-	if a.deployment != nil {
-		dstate, existingDeployment = a.deployment.TaskGroups[group]
-	}
-	if !existingDeployment {
-		dstate = &structs.DeploymentState{}
-		if !tg.Update.IsEmpty() {
-			dstate.AutoRevert = tg.Update.AutoRevert
-			dstate.AutoPromote = tg.Update.AutoPromote
-			dstate.ProgressDeadline = tg.Update.ProgressDeadline
-		}
-	}
+	dstate, existingDeployment := a.initializeDeploymentState(groupName, tg)
 
 	// Filter allocations that do not need to be considered because they are
 	// from an older job version and are terminal.
 	all, ignore := a.filterOldTerminalAllocs(all)
 	desiredChanges.Ignore += uint64(len(ignore))
 
-	// canaries is the set of canaries for the current deployment and all is all
-	// allocs including the canaries
-	canaries, all := a.handleGroupCanaries(all, desiredChanges)
+	canaries, all := a.cancelUnneededCanaries(all, desiredChanges)
 
 	// Determine what set of allocations are on tainted nodes
 	untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
@@ -410,16 +394,16 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 
 	// Find delays for any lost allocs that have stop_after_client_disconnect
 	lostLater := lost.delayByStopAfterClientDisconnect()
-	lostLaterEvals := a.handleDelayedLost(lostLater, all, tg.Name)
+	lostLaterEvals := a.createLostLaterEvals(lostLater, all, tg.Name)
 
 	// Create batched follow up evaluations for allocations that are
 	// reschedulable later and mark the allocations for in place updating
-	a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
+	a.createRescheduleLaterEvals(rescheduleLater, all, tg.Name)
 
 	// Create a structure for choosing names. Seed with the taken names
 	// which is the union of untainted, rescheduled, allocs on migrating
 	// nodes, and allocs on down nodes (includes canaries)
-	nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, rescheduleNow, lost))
+	nameIndex := newAllocNameIndex(a.jobID, groupName, tg.Count, untainted.union(migrate, rescheduleNow, lost))
 
 	// Stop any unneeded allocations and update the untainted set to not
 	// include stopped allocations.
@@ -443,30 +427,14 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 		untainted = untainted.difference(canaries)
 	}
 
-	// The fact that we have destructive updates and have less canaries than is
-	// desired means we need to create canaries
-	strategy := tg.Update
-	canariesPromoted := dstate != nil && dstate.Promoted
-	requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
-	if requireCanary {
-		dstate.DesiredCanaries = strategy.Canary
-	}
-	if requireCanary && !a.deploymentPaused && !a.deploymentFailed {
-		number := strategy.Canary - len(canaries)
-		desiredChanges.Canary += uint64(number)
-
-		for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) {
-			a.result.place = append(a.result.place, allocPlaceResult{
-				name:      name,
-				canary:    true,
-				taskGroup: tg,
-			})
-		}
+	requiresCanaries := a.requiresCanaries(tg, dstate, destructive, canaries)
+	if requiresCanaries {
+		a.computeCanaries(tg, dstate, destructive, canaries, desiredChanges, nameIndex)
 	}
 
-	// Determine how many we can place
+	// Determine how many non-canary allocs we can place
 	isCanarying = dstate != nil && dstate.DesiredCanaries != 0 && !dstate.Promoted
-	limit := a.computeLimit(tg, untainted, destructive, migrate, isCanarying)
+	underProvisionedBy := a.computeUnderProvisionedBy(tg, untainted, destructive, migrate, isCanarying)
 
 	// Place if:
 	// * The deployment is not paused or failed
@@ -476,7 +444,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// * An alloc was lost
 	var place []allocPlaceResult
 	if len(lostLater) == 0 {
-		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, isCanarying, lost)
+		place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, lost, isCanarying)
 		if !existingDeployment {
 			dstate.DesiredTotal += len(place)
 		}
@@ -486,126 +454,68 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// placements can be made without any other consideration.
 	deploymentPlaceReady := !a.deploymentPaused && !a.deploymentFailed && !isCanarying
 
-	if deploymentPlaceReady {
-		desiredChanges.Place += uint64(len(place))
-		a.result.place = append(a.result.place, place...)
-		a.markStop(rescheduleNow, "", allocRescheduled)
-		desiredChanges.Stop += uint64(len(rescheduleNow))
-
-		min := helper.IntMin(len(place), limit)
-		limit -= min
-	} else if !deploymentPlaceReady {
-		// We do not want to place additional allocations but in the case we
-		// have lost allocations or allocations that require rescheduling now,
-		// we do so regardless to avoid odd user experiences.
-		if len(lost) != 0 {
-			allowed := helper.IntMin(len(lost), len(place))
-			desiredChanges.Place += uint64(allowed)
-			a.result.place = append(a.result.place, place[:allowed]...)
-		}
-
-		// Handle rescheduling of failed allocations even if the deployment is
-		// failed. We do not reschedule if the allocation is part of the failed
-		// deployment.
-		if now := len(rescheduleNow); now != 0 {
-			for _, p := range place {
-				prev := p.PreviousAllocation()
-				if p.IsRescheduling() && !(a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID) {
-					a.result.place = append(a.result.place, p)
-					desiredChanges.Place++
-
-					a.result.stop = append(a.result.stop, allocStopResult{
-						alloc:             prev,
-						statusDescription: allocRescheduled,
-					})
-					desiredChanges.Stop++
-				}
-			}
-		}
-	}
+	underProvisionedBy = a.computeReplacements(deploymentPlaceReady, desiredChanges, place, rescheduleNow, lost, underProvisionedBy)
 
 	if deploymentPlaceReady {
-		// Do all destructive updates
-		min := helper.IntMin(len(destructive), limit)
-		desiredChanges.DestructiveUpdate += uint64(min)
-		desiredChanges.Ignore += uint64(len(destructive) - min)
-		for _, alloc := range destructive.nameOrder()[:min] {
-			a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
-				placeName:             alloc.Name,
-				placeTaskGroup:        tg,
-				stopAlloc:             alloc,
-				stopStatusDescription: allocUpdating,
-			})
-		}
+		a.computeDestructiveUpdates(destructive, underProvisionedBy, desiredChanges, tg)
 	} else {
 		desiredChanges.Ignore += uint64(len(destructive))
 	}
 
-	// Migrate all the allocations
-	desiredChanges.Migrate += uint64(len(migrate))
-	for _, alloc := range migrate.nameOrder() {
-		a.result.stop = append(a.result.stop, allocStopResult{
-			alloc:             alloc,
-			statusDescription: allocMigrating,
-		})
-		a.result.place = append(a.result.place, allocPlaceResult{
-			name:          alloc.Name,
-			canary:        alloc.DeploymentStatus.IsCanary(),
-			taskGroup:     tg,
-			previousAlloc: alloc,
+	a.computeMigrations(desiredChanges, migrate, tg, isCanarying)
+	a.createDeployment(tg.Name, tg.Update, existingDeployment, dstate, all, destructive)
 
-			downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
-			minJobVersion:      alloc.Job.Version,
-		})
-	}
-
-	// Create new deployment if:
-	// 1. Updating a job specification
-	// 2. No running allocations (first time running a job)
-	updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
-	hadRunning := false
-	for _, alloc := range all {
-		if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
-			hadRunning = true
-			break
-		}
-	}
-
-	// Create a new deployment if necessary
-	if !existingDeployment && !strategy.IsEmpty() && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
-		// A previous group may have made the deployment already
-		if a.deployment == nil {
-			a.deployment = structs.NewDeployment(a.job, a.evalPriority)
-			// in multiregion jobs, most deployments start in a pending state
-			if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
-				a.deployment.Status = structs.DeploymentStatusPending
-				a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
-			}
-			a.result.deployment = a.deployment
-		}
-
-		// Attach the groups deployment state to the deployment
-		a.deployment.TaskGroups[group] = dstate
-	}
-
-	// deploymentComplete is whether the deployment is complete which largely
-	// means that no placements were made or desired to be made
-	deploymentComplete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 && !requireCanary
-
-	// Final check to see if the deployment is complete is to ensure everything
-	// is healthy
-	if deploymentComplete && a.deployment != nil {
-		if dstate, ok := a.deployment.TaskGroups[group]; ok {
-			if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
-				(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
-				deploymentComplete = false
-			}
-		}
-	}
+	deploymentComplete := a.isDeploymentComplete(groupName, destructive, inplace,
+		migrate, rescheduleNow, dstate, place, rescheduleLater, requiresCanaries)
 
 	return deploymentComplete
 }
 
+func (a *allocReconciler) initializeDeploymentState(group string, tg *structs.TaskGroup) (*structs.DeploymentState, bool) {
+	var dstate *structs.DeploymentState
+	existingDeployment := false
+
+	if a.deployment != nil {
+		dstate, existingDeployment = a.deployment.TaskGroups[group]
+	}
+
+	if !existingDeployment {
+		dstate = &structs.DeploymentState{}
+		if !tg.Update.IsEmpty() {
+			dstate.AutoRevert = tg.Update.AutoRevert
+			dstate.AutoPromote = tg.Update.AutoPromote
+			dstate.ProgressDeadline = tg.Update.ProgressDeadline
+		}
+	}
+
+	return dstate, existingDeployment
+}
+
+// If we have destructive updates, and have fewer canaries than is desired, we need to create canaries.
+func (a *allocReconciler) requiresCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState, destructive, canaries allocSet) bool {
+	canariesPromoted := dstate != nil && dstate.Promoted
+	return tg.Update != nil &&
+		len(destructive) != 0 &&
+		len(canaries) < tg.Update.Canary &&
+		!canariesPromoted
+}
+
+func (a *allocReconciler) computeCanaries(tg *structs.TaskGroup, dstate *structs.DeploymentState,
+	destructive, canaries allocSet, desiredChanges *structs.DesiredUpdates, nameIndex *allocNameIndex) {
+	dstate.DesiredCanaries = tg.Update.Canary
+
+	if !a.deploymentPaused && !a.deploymentFailed {
+		desiredChanges.Canary += uint64(tg.Update.Canary - len(canaries))
+		for _, name := range nameIndex.NextCanaries(uint(desiredChanges.Canary), canaries, destructive) {
+			a.result.place = append(a.result.place, allocPlaceResult{
+				name:      name,
+				canary:    true,
+				taskGroup: tg,
+			})
+		}
+	}
+}
+
 // filterOldTerminalAllocs filters allocations that should be ignored since they
 // are allocations that are terminal from a previous job version.
 func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
@@ -628,10 +538,10 @@ func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignor
 	return filtered, ignored
 }
 
-// handleGroupCanaries handles the canaries for the group by stopping the
+// cancelUnneededCanaries handles the canaries for the group by stopping the
 // unneeded ones and returning the current set of canaries and the updated total
 // set of allocs for the group
-func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
+func (a *allocReconciler) cancelUnneededCanaries(all allocSet, desiredChanges *structs.DesiredUpdates) (canaries, newAll allocSet) {
 	// Stop any canary from an older deployment or from a failed one
 	var stop []string
 
@@ -680,58 +590,57 @@ func (a *allocReconciler) handleGroupCanaries(all allocSet, desiredChanges *stru
 	return canaries, all
 }
 
-// computeLimit returns the placement limit for a particular group. The inputs
-// are the group definition, the untainted, destructive, and migrate allocation
-// set and whether we are in a canary state.
-func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive, migrate allocSet, canaryState bool) int {
-	// If there is no update strategy or deployment for the group we can deploy
-	// as many as the group has
+// computeUnderProvisionedBy returns the number of allocs that still need to be
+// placed for a particular group. The inputs are the group definition, the untainted,
+// destructive, and migrate allocation sets, and whether we are in a canary state.
+func (a *allocReconciler) computeUnderProvisionedBy(group *structs.TaskGroup, untainted, destructive, migrate allocSet, isCanarying bool) int {
+	// If no update strategy, nothing is migrating, and nothing is being replaced,
+	// allow as many as defined in group.Count
 	if group.Update.IsEmpty() || len(destructive)+len(migrate) == 0 {
 		return group.Count
-	} else if a.deploymentPaused || a.deploymentFailed {
-		// If the deployment is paused or failed, do not create anything else
+	}
+
+	// If the deployment is nil, allow MaxParallel placements
+	if a.deployment == nil {
+		return group.Update.MaxParallel
+	}
+
+	// If the deployment is paused, failed, or we have un-promoted canaries, do not create anything else.
+	if a.deploymentPaused ||
+		a.deploymentFailed ||
+		isCanarying {
 		return 0
 	}
 
-	// If we have canaries and they have not been promoted the limit is 0
-	if canaryState {
-		return 0
-	}
-
-	// If we have been promoted or there are no canaries, the limit is the
-	// configured MaxParallel minus any outstanding non-healthy alloc for the
-	// deployment
-	limit := group.Update.MaxParallel
-	if a.deployment != nil {
-		partOf, _ := untainted.filterByDeployment(a.deployment.ID)
-		for _, alloc := range partOf {
-			// An unhealthy allocation means nothing else should be happen.
-			if alloc.DeploymentStatus.IsUnhealthy() {
-				return 0
-			}
-
-			if !alloc.DeploymentStatus.IsHealthy() {
-				limit--
-			}
+	underProvisionedBy := group.Update.MaxParallel
+	partOf, _ := untainted.filterByDeployment(a.deployment.ID)
+	for _, alloc := range partOf {
+		// An unhealthy allocation means nothing else should happen.
+		if alloc.DeploymentStatus.IsUnhealthy() {
+			return 0
+		}
+		// If not yet explicitly set to healthy (nil) decrement.
+		if !alloc.DeploymentStatus.IsHealthy() {
+			underProvisionedBy--
 		}
 	}
 
 	// The limit can be less than zero in the case that the job was changed such
 	// that it required destructive changes and the count was scaled up.
-	if limit < 0 {
+	if underProvisionedBy < 0 {
 		return 0
 	}
 
-	return limit
+	return underProvisionedBy
 }
 
-// computePlacement returns the set of allocations to place given the group
+// computePlacements returns the set of allocations to place given the group
 // definition, the set of untainted, migrating and reschedule allocations for the group.
 //
 // Placements will meet or exceed group count.
 func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
-	nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet,
-	canaryState bool, lost allocSet) []allocPlaceResult {
+	nameIndex *allocNameIndex, untainted, migrate, reschedule, lost allocSet,
+	isCanarying bool) []allocPlaceResult {
 
 	// Add rescheduled placement results
 	var place []allocPlaceResult
@@ -743,7 +652,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 			reschedule:         true,
 			canary:             alloc.DeploymentStatus.IsCanary(),
 
-			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+			downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
 			minJobVersion:      alloc.Job.Version,
 			lost:               false,
 		})
@@ -766,7 +675,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 			previousAlloc:      alloc,
 			reschedule:         false,
 			canary:             alloc.DeploymentStatus.IsCanary(),
-			downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
+			downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
 			minJobVersion:      alloc.Job.Version,
 			lost:               true,
 		})
@@ -778,7 +687,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 			place = append(place, allocPlaceResult{
 				name:               name,
 				taskGroup:          group,
-				downgradeNonCanary: canaryState,
+				downgradeNonCanary: isCanarying,
 			})
 		}
 	}
@@ -786,6 +695,165 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 
 	return place
 }
 
+// computeReplacements either applies the placements calculated by computePlacements,
+// or computes more placements based on whether the deployment is ready for placement
+// and if the placement is already rescheduling or part of a failed deployment.
+// The input deploymentPlaceReady is calculated as the deployment is not paused, failed, or canarying.
+// It returns the number of allocs still needed.
+func (a *allocReconciler) computeReplacements(deploymentPlaceReady bool, desiredChanges *structs.DesiredUpdates,
+	place []allocPlaceResult, failed, lost allocSet, underProvisionedBy int) int {
+
+	// If the deployment is place ready, apply all placements and return
+	if deploymentPlaceReady {
+		desiredChanges.Place += uint64(len(place))
+		// This relies on the computePlacements having built this set, which in
+		// turn relies on len(lostLater) == 0.
+		a.result.place = append(a.result.place, place...)
+		a.markStop(failed, "", allocRescheduled)
+		desiredChanges.Stop += uint64(len(failed))
+
+		min := helper.IntMin(len(place), underProvisionedBy)
+		underProvisionedBy -= min
+		return underProvisionedBy
+	}
+
+	// We do not want to place additional allocations but in the case we
+	// have lost allocations or allocations that require rescheduling now,
+	// we do so regardless to avoid odd user experiences.
+
+	// If allocs have been lost, determine the number of replacements that are needed
+	// and add placements to the result for the lost allocs.
+	if len(lost) != 0 {
+		allowed := helper.IntMin(len(lost), len(place))
+		desiredChanges.Place += uint64(allowed)
+		a.result.place = append(a.result.place, place[:allowed]...)
+	}
+
+	// if no failures or there are no pending placements return.
+	if len(failed) == 0 || len(place) == 0 {
+		return underProvisionedBy
+	}
+
+	// Handle rescheduling of failed allocations even if the deployment is failed.
+	// If the placement is rescheduling, and not part of a failed deployment, add
+	// to the place set, and add the previous alloc to the stop set.
+	for _, p := range place {
+		prev := p.PreviousAllocation()
+		partOfFailedDeployment := a.deploymentFailed && prev != nil && a.deployment.ID == prev.DeploymentID
+
+		if !partOfFailedDeployment && p.IsRescheduling() {
+			a.result.place = append(a.result.place, p)
+			desiredChanges.Place++
+
+			a.result.stop = append(a.result.stop, allocStopResult{
+				alloc:             prev,
+				statusDescription: allocRescheduled,
+			})
+			desiredChanges.Stop++
+		}
+	}
+
+	return underProvisionedBy
+}
+
+func (a *allocReconciler) computeDestructiveUpdates(destructive allocSet, underProvisionedBy int,
+	desiredChanges *structs.DesiredUpdates, tg *structs.TaskGroup) {
+
+	// Do all destructive updates
+	min := helper.IntMin(len(destructive), underProvisionedBy)
+	desiredChanges.DestructiveUpdate += uint64(min)
+	desiredChanges.Ignore += uint64(len(destructive) - min)
+	for _, alloc := range destructive.nameOrder()[:min] {
+		a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
+			placeName:             alloc.Name,
+			placeTaskGroup:        tg,
+			stopAlloc:             alloc,
+			stopStatusDescription: allocUpdating,
+		})
+	}
+}
+
+func (a *allocReconciler) computeMigrations(desiredChanges *structs.DesiredUpdates, migrate allocSet, tg *structs.TaskGroup, isCanarying bool) {
+	desiredChanges.Migrate += uint64(len(migrate))
+	for _, alloc := range migrate.nameOrder() {
+		a.result.stop = append(a.result.stop, allocStopResult{
+			alloc:             alloc,
+			statusDescription: allocMigrating,
+		})
+		a.result.place = append(a.result.place, allocPlaceResult{
+			name:          alloc.Name,
+			canary:        alloc.DeploymentStatus.IsCanary(),
+			taskGroup:     tg,
+			previousAlloc: alloc,
+
+			downgradeNonCanary: isCanarying && !alloc.DeploymentStatus.IsCanary(),
+			minJobVersion:      alloc.Job.Version,
+		})
+	}
+}
+
+func (a *allocReconciler) createDeployment(groupName string, strategy *structs.UpdateStrategy,
+	existingDeployment bool, dstate *structs.DeploymentState, all, destructive allocSet) {
+	// Guard the simple cases that require no computation first.
+	if existingDeployment ||
+		strategy.IsEmpty() ||
+		dstate.DesiredTotal == 0 {
+		return
+	}
+
+	updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
+
+	hadRunning := false
+	for _, alloc := range all {
+		if alloc.Job.Version == a.job.Version && alloc.Job.CreateIndex == a.job.CreateIndex {
+			hadRunning = true
+			break
+		}
+	}
+
+	// Don't create a deployment if it's not the first time running the job
+	// and there are no updates to the spec.
+	if hadRunning && !updatingSpec {
+		return
+	}
+
+	// A previous group may have made the deployment already. If not create one.
+	if a.deployment == nil {
+		a.deployment = structs.NewDeployment(a.job, a.evalPriority)
+		// in multiregion jobs, most deployments start in a pending state
+		if a.job.IsMultiregion() && !(a.job.IsPeriodic() && a.job.IsParameterized()) {
+			a.deployment.Status = structs.DeploymentStatusPending
+			a.deployment.StatusDescription = structs.DeploymentStatusDescriptionPendingForPeer
+		}
+		a.result.deployment = a.deployment
+	}
+
+	// Attach the groups deployment state to the deployment
+	a.deployment.TaskGroups[groupName] = dstate
+}
+
+func (a *allocReconciler) isDeploymentComplete(groupName string, destructive, inplace, migrate, rescheduleNow allocSet,
+	dstate *structs.DeploymentState, place []allocPlaceResult, rescheduleLater []*delayedRescheduleInfo, requiresCanaries bool) bool {
+
+	complete := len(destructive)+len(inplace)+len(place)+len(migrate)+len(rescheduleNow)+len(rescheduleLater) == 0 &&
+		!requiresCanaries
+
+	if !complete || a.deployment == nil {
+		return false
+	}
+
+	// Final check to see if the deployment is complete is to ensure everything is healthy
+	var ok bool
+	if dstate, ok = a.deployment.TaskGroups[groupName]; ok {
+		if dstate.HealthyAllocs < helper.IntMax(dstate.DesiredTotal, dstate.DesiredCanaries) || // Make sure we have enough healthy allocs
+			(dstate.DesiredCanaries > 0 && !dstate.Promoted) { // Make sure we are promoted if we have canaries
+			complete = false
+		}
+	}
+
+	return complete
+}
+
 // computeStop returns the set of allocations that are marked for stopping given
 // the group definition, the set of allocations in various states and whether we
 // are canarying.
@@ -920,17 +988,12 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all
 	return
 }
 
-// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field
+// createRescheduleLaterEvals creates batched followup evaluations with the WaitUntil field
 // set for allocations that are eligible to be rescheduled later, and marks the alloc with
 // the followupEvalID
-func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
+func (a *allocReconciler) createRescheduleLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
 	// followupEvals are created in the same way as for delayed lost allocs
-	allocIDToFollowupEvalID := a.handleDelayedLost(rescheduleLater, all, tgName)
-
-	// Initialize the annotations
-	if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
-		a.result.attributeUpdates = make(map[string]*structs.Allocation)
-	}
+	allocIDToFollowupEvalID := a.createLostLaterEvals(rescheduleLater, all, tgName)
 
 	// Create updates that will be applied to the allocs to mark the FollowupEvalID
 	for allocID, evalID := range allocIDToFollowupEvalID {
@@ -941,10 +1004,10 @@
 	}
 }
 
-// handleDelayedLost creates batched followup evaluations with the WaitUntil field set for
+// createLostLaterEvals creates batched followup evaluations with the WaitUntil field set for
 // lost allocations. followupEvals are appended to a.result as a side effect, we return a
-// map of alloc IDs to their followupEval IDs
-func (a *allocReconciler) handleDelayedLost(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
+// map of alloc IDs to their followupEval IDs.
+func (a *allocReconciler) createLostLaterEvals(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) map[string]string {
 	if len(rescheduleLater) == 0 {
 		return map[string]string{}
 	}
diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go
index d90f183bc..e6d8d9aa5 100644
--- a/scheduler/reconcile_util.go
+++ b/scheduler/reconcile_util.go
@@ -478,7 +478,7 @@ func bitmapFrom(input allocSet, minSize uint) structs.Bitmap {
 	return bitmap
 }
 
-// RemoveHighest removes and returns the highest n used names. The returned set
+// Highest removes and returns the highest n used names. The returned set
 // can be less than n if there aren't n names set in the index
 func (a *allocNameIndex) Highest(n uint) map[string]struct{} {
 	h := make(map[string]struct{}, n)