From d457735b2fa1982e2af1837b377d3055fd169c2f Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 15 Jul 2017 16:31:33 -0700 Subject: [PATCH] Treat destructive updates atomically --- scheduler/generic_sched.go | 54 ++++++++++++++++-------- scheduler/reconcile.go | 36 +++++----------- scheduler/reconcile_test.go | 83 +++++++++++++++++-------------------- scheduler/reconcile_util.go | 63 ++++++++++++++++++++++++++++ 4 files changed, 148 insertions(+), 88 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 77ce3ed83..7e84b35e5 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -425,7 +425,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Nothing remaining to do if placement is not required - if len(results.place) == 0 { + if len(results.place)+len(results.destructiveUpdate) == 0 { if !s.job.Stopped() { for _, tg := range s.job.TaskGroups { s.queuedAllocs[tg.Name] = 0 @@ -440,11 +440,18 @@ func (s *GenericScheduler) computeJobAllocs() error { } // Compute the placements - return s.computePlacements(results.place) + place := make([]placementResult, 0, len(results.place)+len(results.destructiveUpdate)) + for _, p := range results.place { + place = append(place, p) + } + for _, p := range results.destructiveUpdate { + place = append(place, p) + } + return s.computePlacements(place) } // computePlacements computes placements for allocations -func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error { +func (s *GenericScheduler) computePlacements(place []placementResult) error { // Get the base nodes nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { @@ -460,14 +467,17 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error { s.stack.SetNodes(nodes) for _, missing := range place { + // Get the task group + tg := missing.TaskGroup() + // Check if this task group has already failed - if metric, ok := 
s.failedTGAllocs[missing.taskGroup.Name]; ok { + if metric, ok := s.failedTGAllocs[tg.Name]; ok { metric.CoalescedFailures += 1 continue } // Find the preferred node - preferredNode, err := s.findPreferredNode(&missing) + preferredNode, err := s.findPreferredNode(missing) if err != nil { return err } @@ -475,9 +485,9 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error { // Attempt to match the task group var option *RankedNode if preferredNode != nil { - option, _ = s.stack.SelectPreferringNodes(missing.taskGroup, []*structs.Node{preferredNode}) + option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode}) } else { - option, _ = s.stack.Select(missing.taskGroup) + option, _ = s.stack.Select(tg) } // Store the available nodes by datacenter @@ -489,9 +499,9 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error { alloc := &structs.Allocation{ ID: structs.GenerateUUID(), EvalID: s.eval.ID, - Name: missing.name, + Name: missing.Name(), JobID: s.job.ID, - TaskGroup: missing.taskGroup.Name, + TaskGroup: tg.Name, Metrics: s.ctx.Metrics(), NodeID: option.Node.ID, DeploymentID: deploymentID, @@ -500,32 +510,40 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error { ClientStatus: structs.AllocClientStatusPending, SharedResources: &structs.Resources{ - DiskMB: missing.taskGroup.EphemeralDisk.SizeMB, + DiskMB: tg.EphemeralDisk.SizeMB, }, } // If the new allocation is replacing an older allocation then we // set the record the older allocation id so that they are chained - if missing.previousAlloc != nil { - alloc.PreviousAllocation = missing.previousAlloc.ID + if prev := missing.PreviousAllocation(); prev != nil { + alloc.PreviousAllocation = prev.ID } // If we are placing a canary and we found a match, add the canary // to the deployment state object. 
- if missing.canary { - if state, ok := s.deployment.TaskGroups[missing.taskGroup.Name]; ok { + if missing.Canary() { + if state, ok := s.deployment.TaskGroups[tg.Name]; ok { state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID) } } + // Track the placement s.plan.AppendAlloc(alloc) + + // Since we have placed check to see if we should stop any previous + // allocation + if stop, desc := missing.StopPreviousAlloc(); stop { + s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, desc, "") + } + } else { // Lazy initialize the failed map if s.failedTGAllocs == nil { s.failedTGAllocs = make(map[string]*structs.AllocMetric) } - s.failedTGAllocs[missing.taskGroup.Name] = s.ctx.Metrics() + s.failedTGAllocs[tg.Name] = s.ctx.Metrics() } } @@ -533,11 +551,11 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error { } // findPreferredNode finds the preferred node for an allocation -func (s *GenericScheduler) findPreferredNode(place *allocPlaceResult) (node *structs.Node, err error) { - if place.previousAlloc != nil && place.taskGroup.EphemeralDisk.Sticky == true { +func (s *GenericScheduler) findPreferredNode(place placementResult) (node *structs.Node, err error) { + if prev := place.PreviousAllocation(); prev != nil && place.TaskGroup().EphemeralDisk.Sticky == true { var preferredNode *structs.Node ws := memdb.NewWatchSet() - preferredNode, err = s.state.NodeByID(ws, place.previousAlloc.NodeID) + preferredNode, err = s.state.NodeByID(ws, prev.NodeID) if preferredNode.Ready() { node = preferredNode } diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index ebb0381fc..366dfaa8d 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -77,6 +77,9 @@ type reconcileResults struct { // place is the set of allocations to place by the scheduler place []allocPlaceResult + // destructiveUpdate is the set of allocations to apply a destructive update to + destructiveUpdate []allocDestructiveResult + // 
inplaceUpdate is the set of allocations to apply an inplace update to inplaceUpdate []*structs.Allocation @@ -92,25 +95,9 @@ type reconcileResults struct { followupEvalWait time.Duration } -// allocPlaceResult contains the information required to place a single -// allocation -type allocPlaceResult struct { - name string - canary bool - taskGroup *structs.TaskGroup - previousAlloc *structs.Allocation -} - -// allocStopResult contains the information required to stop a single allocation -type allocStopResult struct { - alloc *structs.Allocation - clientStatus string - statusDescription string -} - func (r *reconcileResults) GoString() string { - base := fmt.Sprintf("Total changes: (place %d) (update %d) (stop %d)", - len(r.place), len(r.inplaceUpdate), len(r.stop)) + base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)", + len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop)) if r.deployment != nil { base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID) @@ -391,14 +378,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool { desiredChanges.DestructiveUpdate += uint64(min) desiredChanges.Ignore += uint64(len(destructive) - min) for _, alloc := range destructive.nameOrder()[:min] { - a.result.stop = append(a.result.stop, allocStopResult{ - alloc: alloc, - statusDescription: allocUpdating, - }) - a.result.place = append(a.result.place, allocPlaceResult{ - name: alloc.Name, - taskGroup: tg, - previousAlloc: alloc, + a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{ + placeName: alloc.Name, + placeTaskGroup: tg, + stopAlloc: alloc, + stopStatusDescription: allocUpdating, }) } } else { diff --git a/scheduler/reconcile_test.go b/scheduler/reconcile_test.go index e9d7957a4..3b68482bc 100644 --- a/scheduler/reconcile_test.go +++ b/scheduler/reconcile_test.go @@ -235,6 +235,14 @@ func placeResultsToNames(place []allocPlaceResult) []string { return 
names } +func destructiveResultsToNames(destructive []allocDestructiveResult) []string { + names := make([]string, 0, len(destructive)) + for _, d := range destructive { + names = append(names, d.placeName) + } + return names +} + func stopResultsToNames(stop []allocStopResult) []string { names := make([]string, 0, len(stop)) for _, s := range stop { @@ -255,6 +263,7 @@ type resultExpectation struct { createDeployment *structs.Deployment deploymentUpdates []*structs.DeploymentStatusUpdate place int + destructive int inplace int stop int desiredTGUpdates map[string]*structs.DesiredUpdates @@ -282,6 +291,9 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) { if l := len(r.place); l != exp.place { t.Fatalf("Expected %d placements; got %d", exp.place, l) } + if l := len(r.destructiveUpdate); l != exp.destructive { + t.Fatalf("Expected %d destructive; got %d", exp.destructive, l) + } if l := len(r.inplaceUpdate); l != exp.inplace { t.Fatalf("Expected %d inplaceUpdate; got %d", exp.inplace, l) } @@ -582,9 +594,7 @@ func TestReconciler_Destructive(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, - place: 10, - inplace: 0, - stop: 10, + destructive: 10, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { DestructiveUpdate: 10, @@ -592,9 +602,7 @@ func TestReconciler_Destructive(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 9), placeResultsToNames(r.place)) - assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) - assertPlaceResultsHavePreviousAllocs(t, 10, r.place) + assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate)) } // Tests the reconciler properly handles destructive upgrading allocations while @@ -622,9 +630,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, - place: 15, - inplace: 0, - stop: 
10, + place: 5, + destructive: 10, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { Place: 5, @@ -633,9 +640,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) - assertNamesHaveIndexes(t, intRange(0, 14), placeResultsToNames(r.place)) - assertPlaceResultsHavePreviousAllocs(t, 10, r.place) + assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate)) + assertNamesHaveIndexes(t, intRange(10, 14), placeResultsToNames(r.place)) } // Tests the reconciler properly handles destructive upgrading allocations while @@ -663,9 +669,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, - place: 5, - inplace: 0, - stop: 10, + destructive: 5, + stop: 5, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { Stop: 5, @@ -674,9 +679,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop)) - assertNamesHaveIndexes(t, intRange(0, 4), placeResultsToNames(r.place)) - assertPlaceResultsHavePreviousAllocs(t, 5, r.place) + assertNamesHaveIndexes(t, intRange(5, 9), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 4), destructiveResultsToNames(r.destructiveUpdate)) } // Tests the reconciler properly handles lost nodes with allocations @@ -1320,9 +1324,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: d, deploymentUpdates: nil, - place: 4, - inplace: 0, - stop: 4, + destructive: 4, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { DestructiveUpdate: 4, @@ -1331,8 +1333,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 3), 
placeResultsToNames(r.place)) - assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(0, 3), destructiveResultsToNames(r.destructiveUpdate)) } // Tests the reconciler creates a deployment for inplace updates @@ -2208,9 +2209,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, - place: 2, - inplace: 0, - stop: 4, + destructive: 2, + stop: 2, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { Stop: 2, @@ -2221,8 +2221,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) { }) assertNoCanariesStopped(t, d, r.stop) - assertNamesHaveIndexes(t, intRange(2, 3), placeResultsToNames(r.place)) - assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(2, 3), destructiveResultsToNames(r.destructiveUpdate)) + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) } // Tests the reconciler handles canary promotion when the canary count equals @@ -2381,9 +2381,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, - place: c.healthy, - inplace: 0, - stop: c.healthy, + destructive: c.healthy, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { DestructiveUpdate: uint64(c.healthy), @@ -2393,8 +2391,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) { }) if c.healthy != 0 { - assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), placeResultsToNames(r.place)) - assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), destructiveResultsToNames(r.destructiveUpdate)) } }) } @@ -2411,7 +2408,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { d.TaskGroups[job.TaskGroups[0].Name] = 
&structs.DeploymentState{ Promoted: true, DesiredTotal: 10, - PlacedAllocs: 4, + PlacedAllocs: 7, } // Create 3 allocations from the old job @@ -2464,9 +2461,9 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: nil, deploymentUpdates: nil, - place: 5, - inplace: 0, - stop: 5, + place: 2, + destructive: 3, + stop: 2, followupEvalWait: 31 * time.Second, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { @@ -2479,8 +2476,9 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), placeResultsToNames(r.place)) - assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), stopResultsToNames(r.stop)) + assertNamesHaveIndexes(t, intRange(7, 9), destructiveResultsToNames(r.destructiveUpdate)) + assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place)) + assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop)) } // Tests the reconciler handles a failed deployment and does no placements @@ -2762,9 +2760,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) { assertResults(t, r, &resultExpectation{ createDeployment: dnew, deploymentUpdates: nil, - place: 4, - inplace: 0, - stop: 4, + destructive: 4, desiredTGUpdates: map[string]*structs.DesiredUpdates{ job.TaskGroups[0].Name: { DestructiveUpdate: 4, @@ -2773,8 +2769,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) { }, }) - assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop)) - assertNamesHaveIndexes(t, intRange(0, 3), placeResultsToNames(r.place)) + assertNamesHaveIndexes(t, intRange(0, 3), destructiveResultsToNames(r.destructiveUpdate)) } // Tests the reconciler marks a deployment as complete diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index 34b1e811e..75c2994b5 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -8,6 +8,69 @@ import ( 
	"github.com/hashicorp/nomad/nomad/structs" ) +// placementResult is an allocation that must be placed. It potentially has a +// previous allocation attached to it that should be stopped only if the +// paired placement is complete. This gives an atomic place/stop behavior to +// prevent an impossible resource ask as part of a rolling update to wipe the +// job out. +type placementResult interface { + // TaskGroup returns the task group the placement is for + TaskGroup() *structs.TaskGroup + + // Name returns the name of the desired allocation + Name() string + + // Canary returns whether the placement should be a canary + Canary() bool + + // PreviousAllocation returns the previous allocation + PreviousAllocation() *structs.Allocation + + // StopPreviousAlloc returns whether the previous allocation should be + // stopped and if so the status description. + StopPreviousAlloc() (bool, string) +} + +// allocStopResult contains the information required to stop a single allocation +type allocStopResult struct { + alloc *structs.Allocation + clientStatus string + statusDescription string +} + +// allocPlaceResult contains the information required to place a single +// allocation +type allocPlaceResult struct { + name string + canary bool + taskGroup *structs.TaskGroup + previousAlloc *structs.Allocation +} + +func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup } +func (a allocPlaceResult) Name() string { return a.name } +func (a allocPlaceResult) Canary() bool { return a.canary } +func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc } +func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" } + +// allocDestructiveResult contains the information required to do a destructive +// update. Destructive changes should be applied atomically, as in the old alloc +// is only stopped if the new one can be placed. 
+type allocDestructiveResult struct { + placeName string + placeTaskGroup *structs.TaskGroup + stopAlloc *structs.Allocation + stopStatusDescription string +} + +func (a allocDestructiveResult) TaskGroup() *structs.TaskGroup { return a.placeTaskGroup } +func (a allocDestructiveResult) Name() string { return a.placeName } +func (a allocDestructiveResult) Canary() bool { return false } +func (a allocDestructiveResult) PreviousAllocation() *structs.Allocation { return a.stopAlloc } +func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) { + return true, a.stopStatusDescription +} + // allocMatrix is a mapping of task groups to their allocation set. type allocMatrix map[string]allocSet