diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 8d9e82259..3f03c3a21 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -8,6 +8,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +// allocReconciler is used to determine the set of allocations that require +// placement, inplace updating or stopping given the job specification and +// existing cluster state. The reconciler should only be used for batch and +// service jobs. type allocReconciler struct { // ctx gives access to the state store and logger ctx Context @@ -31,9 +35,6 @@ type allocReconciler struct { // deploymentPaused marks whether the deployment is paused deploymentPaused bool - // groupUpdateStrategy maps task groups to their update strategy - groupUpdateStrategy map[string]*structs.UpdateStrategy - // taintedNodes contains a map of nodes that are tainted taintedNodes map[string]*structs.Node @@ -45,17 +46,31 @@ type allocReconciler struct { result *reconcileResults } +// reconcileResults contains the results of the reconciliation and should be +// applied by the scheduler. type reconcileResults struct { - createDeployment *structs.Deployment + // createDeployment is the deployment that should be created as a result of + // scheduling + createDeployment *structs.Deployment + + // deploymentUpdates contains a set of deployment updates that should be + // applied as a result of scheduling deploymentUpdates []*structs.DeploymentStatusUpdate - place []allocPlaceResult + // place is the set of allocations to place by the scheduler + place []allocPlaceResult + + // inplaceUpdate is the set of allocations to apply an inplace update to inplaceUpdate []*structs.Allocation - stop []allocStopResult + + // stop is the set of allocations to stop + stop []allocStopResult // TODO track the desired of the deployment } +// allocPlaceResult contains the information required to place a single +// allocation type allocPlaceResult struct { name string canary bool @@ -63,12 +78,15 @@ type allocPlaceResult struct { previousAlloc *structs.Allocation } +// allocStopResult contains the information required to stop a single allocation type allocStopResult struct { alloc *structs.Allocation clientStatus string statusDescription string } +// NewAllocReconciler creates a new reconciler that should be used to determine +// the changes required to bring the cluster state inline with the declared jobspec func NewAllocReconciler(ctx Context, stack Stack, batch bool, eval *structs.Evaluation, job *structs.Job, deployment *structs.Deployment, existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node) *allocReconciler { @@ -85,23 +103,16 @@ func NewAllocReconciler(ctx Context, stack Stack, batch bool, result: new(reconcileResults), } + // Detect if the deployment is paused if deployment != nil { a.deploymentPaused = deployment.Status == structs.DeploymentStatusPaused } - // Determine the update strategy for each group - if job != nil { - a.groupUpdateStrategy = make(map[string]*structs.UpdateStrategy) - for _, tg := range job.TaskGroups { - if u := tg.Update; u != nil { - a.groupUpdateStrategy[tg.Name] = u - } - } - } - return a } +// Compute reconciles the existing cluster state and returns the set of changes +// required to converge the job spec and state func (a *allocReconciler) Compute() *reconcileResults { // If we are just stopping a job we do not need to do anything more than // stopping all running allocs @@ -121,7 +132,7 @@ func (a *allocReconciler) Compute() *reconcileResults { return a.result } - // Check if the deployment is referencing an older job + // Check if the deployment is referencing an older job and cancel it if d := a.deployment; d != nil { if d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex { a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{ @@ -133,7 +144,7 @@ func (a *allocReconciler) Compute() *reconcileResults { } } - // Create a new deployment + // Create a new deployment if necessary if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() { a.deployment = structs.NewDeployment(a.job) a.result.createDeployment = a.deployment @@ -161,6 +172,8 @@ func (a *allocReconciler) handleStop() { a.markStop(lost, structs.AllocClientStatusLost, allocLost) } +// markStop is a helper for marking a set of allocation for stop with a +// particular client status and description. func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) { for _, alloc := range allocs { a.result.stop = append(a.result.stop, allocStopResult{ @@ -171,6 +184,7 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript } } +// computeGroup reconciles state for a particular task group. func (a *allocReconciler) computeGroup(group string, as allocSet) { // Get the task group. The task group may be nil if the job was updates such // that the task group no longer exists @@ -239,23 +253,21 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { a.markStop(stop, "", allocNotNeeded) untainted = keep - a.ctx.Logger().Printf("RECONCILER -- Stopping (%d); Untainted (%d)", len(stop), len(keep)) - a.ctx.Logger().Printf("RECONCILER -- stopping %#v", stop) - a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted) - // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. _, inplace, destructive := a.computeUpdates(tg, untainted) + + a.ctx.Logger().Printf("RECONCILER -- Stopping (%d); Untainted (%d)", len(stop), len(keep)) a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive)) // Get the update strategy of the group - strategy, update := a.groupUpdateStrategy[group] + strategy := tg.Update // XXX need a structure for picking names // The fact that we have destructive updates and have less canaries than is // desired means we need to create canaries - requireCanary := len(destructive) != 0 && update && len(canaries) < strategy.Canary + requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary placeCanaries := requireCanary && !a.deploymentPaused if placeCanaries { a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", strategy.Canary-len(canaries)) @@ -271,7 +283,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { // Determine how many we can place haveCanaries := len(canaries) != 0 || placeCanaries - limit := a.computeLimit(tg, strategy, untainted, haveCanaries) + limit := a.computeLimit(tg, untainted, haveCanaries) a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit) // Place if: @@ -283,7 +295,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { a.ctx.Logger().Printf("RECONCILER -- CAN PLACE %v", canPlace) if canPlace { // Place all new allocations - place := a.computePlacements(tg, untainted, destructive) + place := a.computePlacements(tg, untainted) a.ctx.Logger().Printf("RECONCILER -- Placing (%d)", len(place)) for _, p := range place { a.result.place = append(a.result.place, p) @@ -328,10 +340,13 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { } } -func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *structs.UpdateStrategy, untainted allocSet, canaries bool) int { +// computeLimit returns the placement limit for a particular group. The inputs +// are the group definition, the existing/untainted allocation set and whether +// any canaries exist or are being placed. +func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted allocSet, canaries bool) int { // If there is no update stategy or deployment for the group we can deploy // as many as the group has - if strategy == nil || a.deployment == nil { + if group.Update == nil || a.deployment == nil { return group.Count } else if a.deploymentPaused { // If the deployment is paused, do not create anything else @@ -349,7 +364,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *struc // If we have been promoted or there are no canaries, the limit is the // configured MaxParallel - any outstanding non-healthy alloc for the // deployment - limit := strategy.MaxParallel + limit := group.Update.MaxParallel partOf, _ := untainted.filterByDeployment(a.deployment.ID) for _, alloc := range partOf { if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.Healthy == nil { @@ -360,9 +375,11 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *struc return limit } -func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted, destructiveUpdates allocSet) []allocPlaceResult { +// computePlacement returns the set of allocations to place given the group +// definiton and the set of untainted/existing allocations for the group. +func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted allocSet) []allocPlaceResult { // Hot path the nothing to do case - existing := len(untainted) + len(destructiveUpdates) + existing := len(untainted) if existing == group.Count { return nil } @@ -379,6 +396,8 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted, return place } +// computeStop returns the set of allocations to stop given the group definiton +// and the set of untainted/existing allocations for the group. func (a *allocReconciler) computeStop(group *structs.TaskGroup, untainted allocSet) (keep, stop allocSet) { // Hot path the nothing to do case if len(untainted) <= group.Count { @@ -408,6 +427,12 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, untainted allocS return } +// computeUpdates determines which allocations for the passed group require +// updates. Three groups are returned: +// 1. Those that require no upgrades +// 2. Those that can be upgraded in-place. These are added to the results +// automatically since the function contains the correct state to do so, +// 3. Those that require destructive updates func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) { // Determine the set of allocations that need to be updated ignore = make(map[string]*structs.Allocation)