Some comments and cleanup

Alex Dadgar
2017-05-22 17:42:41 -07:00
parent 4bbf24a875
commit 3ae7abac3a


@@ -8,6 +8,10 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
+// allocReconciler is used to determine the set of allocations that require
+// placement, inplace updating or stopping given the job specification and
+// existing cluster state. The reconciler should only be used for batch and
+// service jobs.
type allocReconciler struct {
// ctx gives access to the state store and logger
ctx Context
@@ -31,9 +35,6 @@ type allocReconciler struct {
// deploymentPaused marks whether the deployment is paused
deploymentPaused bool
-// groupUpdateStrategy maps task groups to their update strategy
-groupUpdateStrategy map[string]*structs.UpdateStrategy
// taintedNodes contains a map of nodes that are tainted
taintedNodes map[string]*structs.Node
@@ -45,17 +46,31 @@ type allocReconciler struct {
result *reconcileResults
}
+// reconcileResults contains the results of the reconciliation and should be
+// applied by the scheduler.
type reconcileResults struct {
-createDeployment *structs.Deployment
+// createDeployment is the deployment that should be created as a result of
+// scheduling
+createDeployment *structs.Deployment
+// deploymentUpdates contains a set of deployment updates that should be
+// applied as a result of scheduling
deploymentUpdates []*structs.DeploymentStatusUpdate
-place []allocPlaceResult
+// place is the set of allocations to place by the scheduler
+place []allocPlaceResult
+// inplaceUpdate is the set of allocations to apply an inplace update to
inplaceUpdate []*structs.Allocation
-stop []allocStopResult
+// stop is the set of allocations to stop
+stop []allocStopResult
+// TODO track the desired of the deployment
}
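// Illustrative sketch (not part of this commit): one way a consumer of
// reconcileResults could walk the result sets and turn them into plan
// operations. The logging below stands in for real plan mutations and
// assumes the standard "log" package is imported.
func logResults(l *log.Logger, r *reconcileResults) {
	if r.createDeployment != nil {
		l.Printf("create deployment %q", r.createDeployment.ID)
	}
	for _, u := range r.deploymentUpdates {
		l.Printf("deployment %q -> status %q", u.DeploymentID, u.Status)
	}
	for _, p := range r.place {
		l.Printf("place %q (canary: %v)", p.name, p.canary)
	}
	for _, alloc := range r.inplaceUpdate {
		l.Printf("inplace update alloc %q", alloc.ID)
	}
	for _, s := range r.stop {
		l.Printf("stop alloc %q: %s", s.alloc.ID, s.statusDescription)
	}
}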
+// allocPlaceResult contains the information required to place a single
+// allocation
type allocPlaceResult struct {
name string
canary bool
@@ -63,12 +78,15 @@ type allocPlaceResult struct {
previousAlloc *structs.Allocation
}
+// allocStopResult contains the information required to stop a single allocation
type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
}
+// NewAllocReconciler creates a new reconciler that should be used to determine
+// the changes required to bring the cluster state in line with the declared jobspec
func NewAllocReconciler(ctx Context, stack Stack, batch bool,
eval *structs.Evaluation, job *structs.Job, deployment *structs.Deployment,
existingAllocs []*structs.Allocation, taintedNodes map[string]*structs.Node) *allocReconciler {
@@ -85,23 +103,16 @@ func NewAllocReconciler(ctx Context, stack Stack, batch bool,
result: new(reconcileResults),
}
+// Detect if the deployment is paused
if deployment != nil {
a.deploymentPaused = deployment.Status == structs.DeploymentStatusPaused
}
-// Determine the update strategy for each group
-if job != nil {
-a.groupUpdateStrategy = make(map[string]*structs.UpdateStrategy)
-for _, tg := range job.TaskGroups {
-if u := tg.Update; u != nil {
-a.groupUpdateStrategy[tg.Name] = u
-}
-}
-}
return a
}
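// Hypothetical usage (a sketch, not part of this commit): how the generic
// scheduler might drive the reconciler end to end; all inputs are assumed
// to come from the scheduler's existing setup.
func exampleReconcile(ctx Context, stack Stack, eval *structs.Evaluation,
	job *structs.Job, d *structs.Deployment, allocs []*structs.Allocation,
	tainted map[string]*structs.Node) *reconcileResults {
	r := NewAllocReconciler(ctx, stack, false, eval, job, d, allocs, tainted)
	return r.Compute()
}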
// Compute reconciles the existing cluster state and returns the set of changes
// required to converge the job spec and state
func (a *allocReconciler) Compute() *reconcileResults {
// If we are just stopping a job we do not need to do anything more than
// stopping all running allocs
@@ -121,7 +132,7 @@ func (a *allocReconciler) Compute() *reconcileResults {
return a.result
}
-// Check if the deployment is referencing an older job
+// Check if the deployment is referencing an older job and cancel it
if d := a.deployment; d != nil {
if d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex {
a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
@@ -133,7 +144,7 @@ func (a *allocReconciler) Compute() *reconcileResults {
}
}
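// The status update appended above is cut off by the diff; presumably it
// cancels the stale deployment, along the lines of this hypothetical
// reconstruction (the field values are an assumption):
//
//	&structs.DeploymentStatusUpdate{
//		DeploymentID:      d.ID,
//		Status:            structs.DeploymentStatusCancelled,
//		StatusDescription: "job has a newer version",
//	}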
-// Create a new deployment
+// Create a new deployment if necessary
if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() {
a.deployment = structs.NewDeployment(a.job)
a.result.createDeployment = a.deployment
@@ -161,6 +172,8 @@ func (a *allocReconciler) handleStop() {
a.markStop(lost, structs.AllocClientStatusLost, allocLost)
}
+// markStop is a helper for marking a set of allocations for stop with a
+// particular client status and description.
func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescription string) {
for _, alloc := range allocs {
a.result.stop = append(a.result.stop, allocStopResult{
@@ -171,6 +184,7 @@ func (a *allocReconciler) markStop(allocs allocSet, clientStatus, statusDescript
}
}
+// computeGroup reconciles state for a particular task group.
func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Get the task group. The task group may be nil if the job was updated such
// that the task group no longer exists
@@ -239,23 +253,21 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
a.markStop(stop, "", allocNotNeeded)
untainted = keep
a.ctx.Logger().Printf("RECONCILER -- Stopping (%d); Untainted (%d)", len(stop), len(keep))
a.ctx.Logger().Printf("RECONCILER -- stopping %#v", stop)
a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
_, inplace, destructive := a.computeUpdates(tg, untainted)
a.ctx.Logger().Printf("RECONCILER -- Stopping (%d); Untainted (%d)", len(stop), len(keep))
a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
// Get the update strategy of the group
-strategy, update := a.groupUpdateStrategy[group]
+strategy := tg.Update
// XXX need a structure for picking names
// The fact that we have destructive updates and fewer canaries than
// desired means we need to create canaries
-requireCanary := len(destructive) != 0 && update && len(canaries) < strategy.Canary
+requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary
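// Worked example with assumed numbers (not from this commit): with
// strategy.Canary = 2, one existing canary and a non-empty destructive set,
// requireCanary is true and strategy.Canary-len(canaries) = 1 additional
// canary is placed below, provided the deployment is not paused.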
placeCanaries := requireCanary && !a.deploymentPaused
if placeCanaries {
a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", strategy.Canary-len(canaries))
@@ -271,7 +283,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
// Determine how many we can place
haveCanaries := len(canaries) != 0 || placeCanaries
-limit := a.computeLimit(tg, strategy, untainted, haveCanaries)
+limit := a.computeLimit(tg, untainted, haveCanaries)
a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit)
// Place if:
@@ -283,7 +295,7 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
a.ctx.Logger().Printf("RECONCILER -- CAN PLACE %v", canPlace)
if canPlace {
// Place all new allocations
-place := a.computePlacements(tg, untainted, destructive)
+place := a.computePlacements(tg, untainted)
a.ctx.Logger().Printf("RECONCILER -- Placing (%d)", len(place))
for _, p := range place {
a.result.place = append(a.result.place, p)
@@ -328,10 +340,13 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
}
}
-func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *structs.UpdateStrategy, untainted allocSet, canaries bool) int {
+// computeLimit returns the placement limit for a particular group. The inputs
+// are the group definition, the existing/untainted allocation set and whether
+// any canaries exist or are being placed.
+func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted allocSet, canaries bool) int {
// If there is no update strategy or deployment for the group we can deploy
// as many as the group has
-if strategy == nil || a.deployment == nil {
+if group.Update == nil || a.deployment == nil {
return group.Count
} else if a.deploymentPaused {
// If the deployment is paused, do not create anything else
@@ -349,7 +364,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *struc
// If we have been promoted or there are no canaries, the limit is the
// configured MaxParallel - any outstanding non-healthy alloc for the
// deployment
-limit := strategy.MaxParallel
+limit := group.Update.MaxParallel
partOf, _ := untainted.filterByDeployment(a.deployment.ID)
for _, alloc := range partOf {
if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.Healthy == nil {
@@ -360,9 +375,11 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *struc
return limit
}
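// A worked example of the limit arithmetic, with assumed values (not from
// this commit): MaxParallel = 3 and one deployment allocation whose
// DeploymentStatus.Healthy is still nil yields limit = 3 - 1 = 2; a paused
// deployment returns 0; no update strategy or no deployment returns
// group.Count.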
-func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted, destructiveUpdates allocSet) []allocPlaceResult {
+// computePlacements returns the set of allocations to place given the group
+// definition and the set of untainted/existing allocations for the group.
+func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted allocSet) []allocPlaceResult {
// Hot path the nothing to do case
-existing := len(untainted) + len(destructiveUpdates)
+existing := len(untainted)
if existing == group.Count {
return nil
}
@@ -379,6 +396,8 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted,
return place
}
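// The body elided above builds the placement list; a minimal sketch of the
// count-based logic, assuming a hypothetical name picker (the XXX note
// earlier flags that name selection still needs a proper structure):
//
//	place := make([]allocPlaceResult, 0, group.Count-existing)
//	for i := existing; i < group.Count; i++ {
//		place = append(place, allocPlaceResult{name: pickName(group, i)})
//	}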
+// computeStop returns the set of allocations to stop given the group definition
+// and the set of untainted/existing allocations for the group.
func (a *allocReconciler) computeStop(group *structs.TaskGroup, untainted allocSet) (keep, stop allocSet) {
// Hot path the nothing to do case
if len(untainted) <= group.Count {
@@ -408,6 +427,12 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, untainted allocS
return
}
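// Sketch of the selection step the diff elides (an assumption, not the
// committed body): stop whichever len(untainted)-group.Count allocations
// exceed the desired count and keep the rest.
//
//	keep = make(map[string]*structs.Allocation)
//	stop = make(map[string]*structs.Allocation)
//	n := len(untainted) - group.Count
//	for id, alloc := range untainted {
//		if n > 0 {
//			stop[id] = alloc
//			n--
//		} else {
//			keep[id] = alloc
//		}
//	}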
+// computeUpdates determines which allocations for the passed group require
+// updates. Three groups are returned:
+// 1. Those that require no upgrades
+// 2. Those that can be upgraded in-place. These are added to the results
+// automatically since the function contains the correct state to do so.
+// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
// Determine the set of allocations that need to be updated
ignore = make(map[string]*structs.Allocation)
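// Illustrative continuation (not the committed body): the classification
// presumably compares each allocation's job against the new job, roughly:
//
//	inplace = make(map[string]*structs.Allocation)
//	destructive = make(map[string]*structs.Allocation)
//	for id, alloc := range untainted {
//		switch {
//		case alloc.Job.JobModifyIndex == a.job.JobModifyIndex:
//			ignore[id] = alloc // no update required
//		case canUpdateInplace(alloc): // hypothetical check
//			inplace[id] = alloc
//		default:
//			destructive[id] = alloc
//		}
//	}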