Treat destructive updates atomically

Alex Dadgar
2017-07-15 16:31:33 -07:00
parent 01f8d95a0c
commit d457735b2f
4 changed files with 148 additions and 88 deletions

View File

@@ -425,7 +425,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Nothing remaining to do if placement is not required
if len(results.place) == 0 {
if len(results.place)+len(results.destructiveUpdate) == 0 {
if !s.job.Stopped() {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
@@ -440,11 +440,18 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Compute the placements
return s.computePlacements(results.place)
place := make([]placementResult, 0, len(results.place)+len(results.destructiveUpdate))
for _, p := range results.place {
place = append(place, p)
}
for _, p := range results.destructiveUpdate {
place = append(place, p)
}
return s.computePlacements(place)
}
// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
func (s *GenericScheduler) computePlacements(place []placementResult) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
@@ -460,14 +467,17 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
s.stack.SetNodes(nodes)
for _, missing := range place {
// Get the task group
tg := missing.TaskGroup()
// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[missing.taskGroup.Name]; ok {
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
continue
}
// Find the preferred node
preferredNode, err := s.findPreferredNode(&missing)
preferredNode, err := s.findPreferredNode(missing)
if err != nil {
return err
}
@@ -475,9 +485,9 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
// Attempt to match the task group
var option *RankedNode
if preferredNode != nil {
option, _ = s.stack.SelectPreferringNodes(missing.taskGroup, []*structs.Node{preferredNode})
option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode})
} else {
option, _ = s.stack.Select(missing.taskGroup)
option, _ = s.stack.Select(tg)
}
// Store the available nodes by datacenter
@@ -489,9 +499,9 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.name,
Name: missing.Name(),
JobID: s.job.ID,
TaskGroup: missing.taskGroup.Name,
TaskGroup: tg.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
DeploymentID: deploymentID,
@@ -500,32 +510,40 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.taskGroup.EphemeralDisk.SizeMB,
DiskMB: tg.EphemeralDisk.SizeMB,
},
}
// If the new allocation is replacing an older allocation then we record the
// older allocation's ID so that they are chained
if missing.previousAlloc != nil {
alloc.PreviousAllocation = missing.previousAlloc.ID
if prev := missing.PreviousAllocation(); prev != nil {
alloc.PreviousAllocation = prev.ID
}
// If we are placing a canary and we found a match, add the canary
// to the deployment state object.
if missing.canary {
if state, ok := s.deployment.TaskGroups[missing.taskGroup.Name]; ok {
if missing.Canary() {
if state, ok := s.deployment.TaskGroups[tg.Name]; ok {
state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID)
}
}
// Track the placement
s.plan.AppendAlloc(alloc)
// Since we have placed, check to see if we should stop any previous
// allocation
if stop, desc := missing.StopPreviousAlloc(); stop {
s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, desc, "")
}
} else {
// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}
s.failedTGAllocs[missing.taskGroup.Name] = s.ctx.Metrics()
s.failedTGAllocs[tg.Name] = s.ctx.Metrics()
}
}
@@ -533,11 +551,11 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
}
// findPreferredNode finds the preferred node for an allocation
func (s *GenericScheduler) findPreferredNode(place *allocPlaceResult) (node *structs.Node, err error) {
if place.previousAlloc != nil && place.taskGroup.EphemeralDisk.Sticky == true {
func (s *GenericScheduler) findPreferredNode(place placementResult) (node *structs.Node, err error) {
if prev := place.PreviousAllocation(); prev != nil && place.TaskGroup().EphemeralDisk.Sticky == true {
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
preferredNode, err = s.state.NodeByID(ws, place.previousAlloc.NodeID)
preferredNode, err = s.state.NodeByID(ws, prev.NodeID)
if preferredNode.Ready() {
node = preferredNode
}
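
The early exit in computeJobAllocs now counts pending destructive updates alongside plain placements, and both result slices feed a single computePlacements call. The explicit copy loops are needed because Go will not implicitly convert a []allocPlaceResult or []allocDestructiveResult into a []placementResult; each element has to be appended individually. A minimal, self-contained sketch of that idiom, using trimmed-down stand-in types rather than the real scheduler structs:

    package main

    import "fmt"

    // placementResult is a trimmed-down stand-in for the interface added in
    // this commit; only Name is kept for the example.
    type placementResult interface {
        Name() string
    }

    type allocPlaceResult struct{ name string }

    func (a allocPlaceResult) Name() string { return a.name }

    type allocDestructiveResult struct{ placeName string }

    func (a allocDestructiveResult) Name() string { return a.placeName }

    func main() {
        placeResults := []allocPlaceResult{{name: "web[0]"}}
        destructive := []allocDestructiveResult{{placeName: "web[1]"}}

        // Neither concrete slice can be passed where []placementResult is
        // expected, so both are copied element by element into one
        // interface-typed slice, mirroring computeJobAllocs.
        place := make([]placementResult, 0, len(placeResults)+len(destructive))
        for _, p := range placeResults {
            place = append(place, p)
        }
        for _, d := range destructive {
            place = append(place, d)
        }

        for _, p := range place {
            fmt.Println("placing", p.Name())
        }
    }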

View File

@@ -77,6 +77,9 @@ type reconcileResults struct {
// place is the set of allocations to place by the scheduler
place []allocPlaceResult
// destructiveUpdate is the set of allocations to apply a destructive update to
destructiveUpdate []allocDestructiveResult
// inplaceUpdate is the set of allocations to apply an inplace update to
inplaceUpdate []*structs.Allocation
@@ -92,25 +95,9 @@ type reconcileResults struct {
followupEvalWait time.Duration
}
// allocPlaceResult contains the information required to place a single
// allocation
type allocPlaceResult struct {
name string
canary bool
taskGroup *structs.TaskGroup
previousAlloc *structs.Allocation
}
// allocStopResult contains the information required to stop a single allocation
type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
}
func (r *reconcileResults) GoString() string {
base := fmt.Sprintf("Total changes: (place %d) (update %d) (stop %d)",
len(r.place), len(r.inplaceUpdate), len(r.stop))
base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)",
len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop))
if r.deployment != nil {
base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
@@ -391,14 +378,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
desiredChanges.DestructiveUpdate += uint64(min)
desiredChanges.Ignore += uint64(len(destructive) - min)
for _, alloc := range destructive.nameOrder()[:min] {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocUpdating,
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
taskGroup: tg,
previousAlloc: alloc,
a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
placeName: alloc.Name,
placeTaskGroup: tg,
stopAlloc: alloc,
stopStatusDescription: allocUpdating,
})
}
} else {
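
The hunk above replaces what used to be two unrelated results per destructive update (a stop and a place) with a single allocDestructiveResult, emitted only up to the rolling-update limit; the remaining allocations are counted as Ignore and picked up by a later evaluation. A rough, self-contained sketch of that batching, where the limit, names, and status text are illustrative assumptions rather than the reconciler's real update-block logic:

    package main

    import "fmt"

    type Allocation struct{ Name string }

    // allocDestructiveResult pairs the replacement to place with the alloc to
    // stop, mirroring the struct added in this commit.
    type allocDestructiveResult struct {
        placeName             string
        stopAlloc             *Allocation
        stopStatusDescription string
    }

    func main() {
        // Ten allocations still running the old version of the job group.
        var destructive []*Allocation
        for i := 0; i < 10; i++ {
            destructive = append(destructive, &Allocation{Name: fmt.Sprintf("web[%d]", i)})
        }

        // Assume the update block only allows 4 destructive updates this pass.
        min := 4

        // One paired result per allocation up to the limit; the rest are
        // ignored until a later evaluation, matching the DestructiveUpdate and
        // Ignore accounting above.
        var results []allocDestructiveResult
        for _, alloc := range destructive[:min] {
            results = append(results, allocDestructiveResult{
                placeName:             alloc.Name,
                stopAlloc:             alloc,
                stopStatusDescription: "allocation is being updated", // illustrative text
            })
        }

        fmt.Printf("destructive: %d, ignore: %d\n", len(results), len(destructive)-min)
        for _, r := range results {
            fmt.Printf("replace %s (stop %s only after the new alloc is placed)\n",
                r.placeName, r.stopAlloc.Name)
        }
    }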

View File

@@ -235,6 +235,14 @@ func placeResultsToNames(place []allocPlaceResult) []string {
return names
}
func destructiveResultsToNames(destructive []allocDestructiveResult) []string {
names := make([]string, 0, len(destructive))
for _, d := range destructive {
names = append(names, d.placeName)
}
return names
}
func stopResultsToNames(stop []allocStopResult) []string {
names := make([]string, 0, len(stop))
for _, s := range stop {
@@ -255,6 +263,7 @@ type resultExpectation struct {
createDeployment *structs.Deployment
deploymentUpdates []*structs.DeploymentStatusUpdate
place int
destructive int
inplace int
stop int
desiredTGUpdates map[string]*structs.DesiredUpdates
@@ -282,6 +291,9 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
if l := len(r.place); l != exp.place {
t.Fatalf("Expected %d placements; got %d", exp.place, l)
}
if l := len(r.destructiveUpdate); l != exp.destructive {
t.Fatalf("Expected %d destructive; got %d", exp.destructive, l)
}
if l := len(r.inplaceUpdate); l != exp.inplace {
t.Fatalf("Expected %d inplaceUpdate; got %d", exp.inplace, l)
}
@@ -582,9 +594,7 @@ func TestReconciler_Destructive(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 10,
inplace: 0,
stop: 10,
destructive: 10,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: 10,
@@ -592,9 +602,7 @@ func TestReconciler_Destructive(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 9), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop))
assertPlaceResultsHavePreviousAllocs(t, 10, r.place)
assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler properly handles destructive upgrading allocations while
@@ -622,9 +630,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 15,
inplace: 0,
stop: 10,
place: 5,
destructive: 10,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 5,
@@ -633,9 +640,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 14), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 10, r.place)
assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate))
assertNamesHaveIndexes(t, intRange(10, 14), placeResultsToNames(r.place))
}
// Tests the reconciler properly handles destructive upgrading allocations while
@@ -663,9 +669,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 5,
inplace: 0,
stop: 10,
destructive: 5,
stop: 5,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Stop: 5,
@@ -674,9 +679,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 4), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 5, r.place)
assertNamesHaveIndexes(t, intRange(5, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 4), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler properly handles lost nodes with allocations
@@ -1320,9 +1324,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: d,
deploymentUpdates: nil,
place: 4,
inplace: 0,
stop: 4,
destructive: 4,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: 4,
@@ -1331,8 +1333,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 3), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 3), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler creates a deployment for inplace updates
@@ -2208,9 +2209,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 2,
inplace: 0,
stop: 4,
destructive: 2,
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Stop: 2,
@@ -2221,8 +2221,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
})
assertNoCanariesStopped(t, d, r.stop)
assertNamesHaveIndexes(t, intRange(2, 3), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(2, 3), destructiveResultsToNames(r.destructiveUpdate))
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
}
// Tests the reconciler handles canary promotion when the canary count equals
@@ -2381,9 +2381,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: c.healthy,
inplace: 0,
stop: c.healthy,
destructive: c.healthy,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: uint64(c.healthy),
@@ -2393,8 +2391,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) {
})
if c.healthy != 0 {
assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), destructiveResultsToNames(r.destructiveUpdate))
}
})
}
@@ -2411,7 +2408,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
Promoted: true,
DesiredTotal: 10,
PlacedAllocs: 4,
PlacedAllocs: 7,
}
// Create 3 allocations from the old job
@@ -2464,9 +2461,9 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 5,
inplace: 0,
stop: 5,
place: 2,
destructive: 3,
stop: 2,
followupEvalWait: 31 * time.Second,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
@@ -2479,8 +2476,9 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(7, 9), destructiveResultsToNames(r.destructiveUpdate))
assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
}
// Tests the reconciler handles a failed deployment and does no placements
@@ -2762,9 +2760,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: dnew,
deploymentUpdates: nil,
place: 4,
inplace: 0,
stop: 4,
destructive: 4,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: 4,
@@ -2773,8 +2769,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 3), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 3), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler marks a deployment as complete

View File

@@ -8,6 +8,69 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// placementResult is an allocation that must be placed. It potentially has a
// previous allocation attached to it that should be stopped only if the
// paired placement is complete. This gives atomic place/stop behavior so that
// an impossible resource ask as part of a rolling update cannot wipe the
// job out.
type placementResult interface {
// TaskGroup returns the task group the placement is for
TaskGroup() *structs.TaskGroup
// Name returns the name of the desired allocation
Name() string
// Canary returns whether the placement should be a canary
Canary() bool
// PreviousAllocation returns the previous allocation
PreviousAllocation() *structs.Allocation
// StopPreviousAlloc returns whether the previous allocation should be
// stopped and if so the status description.
StopPreviousAlloc() (bool, string)
}
// allocStopResult contains the information required to stop a single allocation
type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
}
// allocPlaceResult contains the information required to place a single
// allocation
type allocPlaceResult struct {
name string
canary bool
taskGroup *structs.TaskGroup
previousAlloc *structs.Allocation
}
func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup }
func (a allocPlaceResult) Name() string { return a.name }
func (a allocPlaceResult) Canary() bool { return a.canary }
func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc }
func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" }
// allocDestructiveResult contains the information required to do a destructive
// update. Destructive changes should be applied atomically, meaning the old
// alloc is only stopped if the new one can be placed.
type allocDestructiveResult struct {
placeName string
placeTaskGroup *structs.TaskGroup
stopAlloc *structs.Allocation
stopStatusDescription string
}
func (a allocDestructiveResult) TaskGroup() *structs.TaskGroup { return a.placeTaskGroup }
func (a allocDestructiveResult) Name() string { return a.placeName }
func (a allocDestructiveResult) Canary() bool { return false }
func (a allocDestructiveResult) PreviousAllocation() *structs.Allocation { return a.stopAlloc }
func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) {
return true, a.stopStatusDescription
}
// allocMatrix is a mapping of task groups to their allocation set.
type allocMatrix map[string]allocSet
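
Tying the four files together: the point of StopPreviousAlloc is that the stop for the old allocation is only emitted once its replacement has actually found a node. Below is a self-contained sketch of that control flow; the Allocation, plan, and capacity-check types are simplified assumptions for illustration, not Nomad's real scheduler API:

    package main

    import "fmt"

    type Allocation struct{ ID, Name string }

    // placementResult mirrors the interface above, trimmed to the methods the
    // example needs.
    type placementResult interface {
        Name() string
        PreviousAllocation() *Allocation
        StopPreviousAlloc() (bool, string)
    }

    type allocPlaceResult struct {
        name string
        prev *Allocation
    }

    func (a allocPlaceResult) Name() string                      { return a.name }
    func (a allocPlaceResult) PreviousAllocation() *Allocation   { return a.prev }
    func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" }

    type allocDestructiveResult struct {
        placeName string
        stopAlloc *Allocation
    }

    func (a allocDestructiveResult) Name() string                    { return a.placeName }
    func (a allocDestructiveResult) PreviousAllocation() *Allocation { return a.stopAlloc }
    func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) {
        return true, "allocation is being updated" // illustrative description
    }

    // plan is a stand-in for the scheduler's plan object.
    type plan struct {
        place []string
        stop  []string
    }

    // computePlacements places each result and, only on success, considers
    // stopping the allocation it replaces.
    func computePlacements(p *plan, results []placementResult, haveCapacity bool) {
        for _, missing := range results {
            if !haveCapacity {
                // Placement failed: the previous allocation keeps running, so
                // an impossible resource ask cannot wipe the job out mid-update.
                fmt.Println("no node for", missing.Name(), "- previous alloc left running")
                continue
            }
            p.place = append(p.place, missing.Name())
            if stop, desc := missing.StopPreviousAlloc(); stop {
                p.stop = append(p.stop, missing.PreviousAllocation().ID+": "+desc)
            }
        }
    }

    func main() {
        prev := &Allocation{ID: "old-alloc", Name: "web[0]"}
        results := []placementResult{
            allocDestructiveResult{placeName: "web[0]", stopAlloc: prev},
            allocPlaceResult{name: "web[1]"},
        }

        withCapacity := &plan{}
        computePlacements(withCapacity, results, true)
        fmt.Println("with capacity:    place", withCapacity.place, "stop", withCapacity.stop)

        noCapacity := &plan{}
        computePlacements(noCapacity, results, false)
        fmt.Println("without capacity: place", noCapacity.place, "stop", noCapacity.stop)
    }

Run with capacity, the destructive update yields a placement and its paired stop; without capacity, nothing is placed and, crucially, nothing is stopped, so a rolling update with an impossible resource ask can no longer take the whole group down.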