Merge pull request #4038 from hashicorp/f-deployment

Only create deployment if job changes or no allocs
Alex Dadgar
2018-03-26 11:06:39 -07:00
committed by GitHub
3 changed files with 124 additions and 52 deletions
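
The core of the change is a new gate on deployment creation in the reconciler: a deployment is created only when the job specification is actually changing (destructive or in-place updates are pending) or when no allocation is running at the current job version, i.e. a first run. A rough, standalone sketch of that condition with simplified parameters (shouldCreateDeployment and its arguments are illustrative, not the reconciler's real API):

package main

import "fmt"

// shouldCreateDeployment distills the new condition in the reconciler's
// computeGroup. Parameter names are illustrative stand-ins.
func shouldCreateDeployment(existingDeployment, hasUpdateStrategy bool, desiredTotal,
	destructiveUpdates, inplaceUpdates int, allocJobVersions []uint64, jobVersion uint64) bool {

	// The job spec is "updating" when any alloc needs a destructive or
	// in-place update.
	updatingSpec := destructiveUpdates != 0 || inplaceUpdates != 0

	// Did any existing alloc already run the current job version?
	hadRunning := false
	for _, v := range allocJobVersions {
		if v == jobVersion {
			hadRunning = true
			break
		}
	}

	// Only create a deployment when one does not already exist, the group has
	// an update strategy and a non-zero desired count, and the job is either
	// brand new or actually changing.
	return !existingDeployment && hasUpdateStrategy && desiredTotal != 0 &&
		(!hadRunning || updatingSpec)
}

func main() {
	// A stable job already running at the current version with no spec change
	// no longer triggers a new deployment.
	fmt.Println(shouldCreateDeployment(false, true, 3, 0, 0, []uint64{2, 2, 2}, 2)) // false
	// First run of a job (no allocs at the current version) still does.
	fmt.Println(shouldCreateDeployment(false, true, 3, 0, 0, nil, 0)) // true
}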


@@ -366,6 +366,11 @@ func (s *GenericScheduler) computeJobAllocs() error {
s.ctx.Plan().AppendAlloc(update)
}
// Handle the annotation updates
for _, update := range results.attributeUpdates {
s.ctx.Plan().AppendAlloc(update)
}
// Nothing remaining to do if placement is not required
if len(results.place)+len(results.destructiveUpdate) == 0 {
if !s.job.Stopped() {

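The attributeUpdates appended above are allocation changes that do not come from a jobspec diff; in this PR they carry the follow-up evaluation ID assigned during delayed rescheduling. A minimal sketch of the idea with a stand-in Alloc type rather than Nomad's structs.Allocation:

package main

import "fmt"

// Alloc is a stand-in for structs.Allocation, reduced to the fields that
// matter for this example.
type Alloc struct {
	ID             string
	Name           string
	FollowupEvalID string
}

// makeAttributeUpdate copies an allocation and changes only a non-jobspec
// attribute (the follow-up eval ID), leaving everything driven by the job
// specification untouched.
func makeAttributeUpdate(existing *Alloc, evalID string) *Alloc {
	updated := *existing // shallow copy is enough for this reduced type
	updated.FollowupEvalID = evalID
	return &updated
}

func main() {
	orig := &Alloc{ID: "a1", Name: "web.task[0]"}
	upd := makeAttributeUpdate(orig, "eval-123")
	fmt.Println(upd.FollowupEvalID, orig.FollowupEvalID) // "eval-123" ""
}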

@@ -95,6 +95,10 @@ type reconcileResults struct {
// stop is the set of allocations to stop
stop []allocStopResult
// attributeUpdates are updates to the allocation that are not from a
// jobspec change.
attributeUpdates map[string]*structs.Allocation
// desiredTGUpdates captures the desired set of changes to make for each
// task group.
desiredTGUpdates map[string]*structs.DesiredUpdates
@@ -326,8 +330,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
}
}
// Filter batch allocations that do not need to be considered.
all, ignore := a.batchFiltration(all)
// Filter allocations that do not need to be considered because they are
// from an older job version and are terminal.
all, ignore := a.filterOldTerminalAllocs(all)
desiredChanges.Ignore += uint64(len(ignore))
canaries, all := a.handleGroupCanaries(all, desiredChanges)
@@ -338,11 +343,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Determine what set of terminal allocations need to be rescheduled
untainted, rescheduleNow, rescheduleLater := untainted.filterByRescheduleable(a.batch)
// Create batched follow up evaluations for allocations that are reschedulable later
var rescheduleLaterAllocs map[string]*structs.Allocation
if len(rescheduleLater) > 0 {
rescheduleLaterAllocs = a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
}
// Create batched follow up evaluations for allocations that are
// reschedulable later and mark the allocations for in place updating
a.handleDelayedReschedules(rescheduleLater, all, tg.Name)
// Create a structure for choosing names. Seed with the taken names which is
// the union of untainted and migrating nodes (includes canaries)
@@ -365,7 +368,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Do inplace upgrades where possible and capture the set of upgrades that
// need to be done destructively.
ignore, inplace, destructive := a.computeUpdates(tg, untainted, rescheduleLaterAllocs)
ignore, inplace, destructive := a.computeUpdates(tg, untainted)
desiredChanges.Ignore += uint64(len(ignore))
desiredChanges.InPlaceUpdate += uint64(len(inplace))
if !existingDeployment {
@@ -480,8 +483,20 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
})
}
// Create new deployment if:
// 1. Updating a job specification
// 2. No running allocations (first time running a job)
updatingSpec := len(destructive) != 0 || len(a.result.inplaceUpdate) != 0
hadRunning := false
for _, alloc := range all {
if alloc.Job.Version == a.job.Version {
hadRunning = true
break
}
}
// Create a new deployment if necessary
if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 {
if !existingDeployment && strategy != nil && dstate.DesiredTotal != 0 && (!hadRunning || updatingSpec) {
// A previous group may have made the deployment already
if a.deployment == nil {
a.deployment = structs.NewDeployment(a.job)
@@ -511,9 +526,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
return deploymentComplete
}
// batchFiltration filters batch allocations that should be ignored. These are
// allocations that are terminal from a previous job version.
func (a *allocReconciler) batchFiltration(all allocSet) (filtered, ignore allocSet) {
// filterOldTerminalAllocs filters allocations that should be ignored since they
// are allocations that are terminal from a previous job version.
func (a *allocReconciler) filterOldTerminalAllocs(all allocSet) (filtered, ignore allocSet) {
if !a.batch {
return all, nil
}
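
The renamed helper is only partially shown above. A plausible sketch of its behavior under simplified stand-in types (the real implementation may differ in detail): for batch jobs it splits the set into allocations still worth reconciling and terminal allocations left over from an older job version, which are ignored.

package main

import "fmt"

// batchAlloc is a simplified stand-in for structs.Allocation.
type batchAlloc struct {
	ID         string
	JobVersion uint64
	Terminal   bool
}

// filterOldTerminal sketches the renamed helper: for batch jobs, split the
// allocations into those still worth reconciling and those that are terminal
// and from an older job version, which can be ignored outright.
func filterOldTerminal(all map[string]*batchAlloc, currentVersion uint64, isBatch bool) (filtered, ignore map[string]*batchAlloc) {
	if !isBatch {
		// Service jobs are not filtered this way.
		return all, nil
	}
	filtered = make(map[string]*batchAlloc)
	ignore = make(map[string]*batchAlloc)
	for id, a := range all {
		if a.Terminal && a.JobVersion < currentVersion {
			ignore[id] = a
		} else {
			filtered[id] = a
		}
	}
	return filtered, ignore
}

func main() {
	all := map[string]*batchAlloc{
		"old-done": {ID: "old-done", JobVersion: 1, Terminal: true},
		"current":  {ID: "current", JobVersion: 2},
	}
	kept, ignored := filterOldTerminal(all, 2, true)
	fmt.Println(len(kept), len(ignored)) // 1 1
}
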
@@ -782,7 +797,7 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// 2. Those that can be upgraded in-place. These are added to the results
// automatically since the function contains the correct state to do so,
// 3. Those that require destructive updates
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, rescheduleLaterAllocs allocSet) (ignore, inplace, destructive allocSet) {
func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted allocSet) (ignore, inplace, destructive allocSet) {
// Determine the set of allocations that need to be updated
ignore = make(map[string]*structs.Allocation)
inplace = make(map[string]*structs.Allocation)
@@ -790,19 +805,11 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, re
for _, alloc := range untainted {
ignoreChange, destructiveChange, inplaceAlloc := a.allocUpdateFn(alloc, a.job, group)
// Also check if the alloc is marked for later rescheduling.
// If so it should be in the inplace list
reschedLaterAlloc, isRescheduleLater := rescheduleLaterAllocs[alloc.ID]
if isRescheduleLater {
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, reschedLaterAlloc)
} else if ignoreChange {
if ignoreChange {
ignore[alloc.ID] = alloc
} else if destructiveChange {
destructive[alloc.ID] = alloc
} else {
// Attach the deployment ID and clear the health if the
// deployment has changed
inplace[alloc.ID] = alloc
a.result.inplaceUpdate = append(a.result.inplaceUpdate, inplaceAlloc)
}
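
With the reschedule-later special case removed, computeUpdates reverts to a plain three-way split driven by the allocUpdateFn callback: ignore, in-place, or destructive. A condensed sketch of that classification under stand-in types (classify and updAlloc are illustrative names, not the reconciler's API):

package main

import "fmt"

// updAlloc is a stand-in for structs.Allocation.
type updAlloc struct{ ID string }

// updateFn mirrors the shape of the reconciler's update callback: it reports
// whether an alloc can be ignored, must be destructively replaced, or
// (otherwise) can be updated in place, returning the updated copy.
type updateFn func(a *updAlloc) (ignore, destructive bool, inplace *updAlloc)

// classify splits untainted allocations into ignore / in-place / destructive
// buckets, the same three-way split computeUpdates performs.
func classify(untainted map[string]*updAlloc, fn updateFn) (ignore, inplace, destructive map[string]*updAlloc) {
	ignore = make(map[string]*updAlloc)
	inplace = make(map[string]*updAlloc)
	destructive = make(map[string]*updAlloc)
	for id, a := range untainted {
		ign, destr, upd := fn(a)
		switch {
		case ign:
			ignore[id] = a
		case destr:
			destructive[id] = a
		default:
			inplace[id] = upd
		}
	}
	return ignore, inplace, destructive
}

func main() {
	allocs := map[string]*updAlloc{"a": {ID: "a"}, "b": {ID: "b"}}
	ign, inp, _ := classify(allocs, func(a *updAlloc) (bool, bool, *updAlloc) {
		return a.ID == "a", false, a // ignore "a", update "b" in place
	})
	fmt.Println(len(ign), len(inp)) // 1 1
}
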
@@ -813,7 +820,11 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted, re
// handleDelayedReschedules creates batched followup evaluations with the WaitUntil field set
// for allocations that are eligible to be rescheduled later
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) allocSet {
func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRescheduleInfo, all allocSet, tgName string) {
if len(rescheduleLater) == 0 {
return
}
// Sort by time
sort.Slice(rescheduleLater, func(i, j int) bool {
return rescheduleLater[i].rescheduleTime.Before(rescheduleLater[j].rescheduleTime)
@@ -822,6 +833,7 @@ func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRes
var evals []*structs.Evaluation
nextReschedTime := rescheduleLater[0].rescheduleTime
allocIDToFollowupEvalID := make(map[string]string, len(rescheduleLater))
// Create a new eval for the first batch
eval := &structs.Evaluation{
ID: uuid.Generate(),
@@ -835,6 +847,7 @@ func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRes
WaitUntil: nextReschedTime,
}
evals = append(evals, eval)
for _, allocReschedInfo := range rescheduleLater {
if allocReschedInfo.rescheduleTime.Sub(nextReschedTime) < batchedFailedAllocWindowSize {
allocIDToFollowupEvalID[allocReschedInfo.allocID] = eval.ID
@@ -861,13 +874,16 @@ func (a *allocReconciler) handleDelayedReschedules(rescheduleLater []*delayedRes
a.result.desiredFollowupEvals[tgName] = evals
// Initialize the annotations
if len(allocIDToFollowupEvalID) != 0 && a.result.attributeUpdates == nil {
a.result.attributeUpdates = make(map[string]*structs.Allocation)
}
// Create in-place updates for every alloc ID that needs to be updated with its follow up eval ID
rescheduleLaterAllocs := make(map[string]*structs.Allocation)
for allocID, evalID := range allocIDToFollowupEvalID {
existingAlloc := all[allocID]
updatedAlloc := existingAlloc.Copy()
updatedAlloc.FollowupEvalID = evalID
rescheduleLaterAllocs[allocID] = updatedAlloc
a.result.attributeUpdates[updatedAlloc.ID] = updatedAlloc
}
return rescheduleLaterAllocs
}
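
Putting the delayed-reschedule path together: the reconciler sorts the eligible allocations by reschedule time, creates a follow-up evaluation whose WaitUntil is the earliest time, folds subsequent allocations into that evaluation while they fall within a small window, and starts a new evaluation otherwise; the follow-up eval IDs are then written back onto the allocations as attribute updates. A self-contained sketch of just the batching step (batchWindow and the helper names are stand-ins, not the scheduler's real identifiers):

package main

import (
	"fmt"
	"sort"
	"time"
)

// batchWindow stands in for the scheduler's batchedFailedAllocWindowSize.
const batchWindow = 5 * time.Second

type delayedInfo struct {
	allocID        string
	rescheduleTime time.Time
}

// batchFollowupEvals sketches the batching: sort by reschedule time, then
// group allocations whose times fall within batchWindow of the current
// batch's start into one follow-up evaluation. It returns the WaitUntil time
// of each batch and a map from alloc ID to the batch it was assigned.
func batchFollowupEvals(later []delayedInfo) (waits []time.Time, allocToEval map[string]int) {
	if len(later) == 0 {
		return nil, nil
	}
	sort.Slice(later, func(i, j int) bool {
		return later[i].rescheduleTime.Before(later[j].rescheduleTime)
	})

	allocToEval = make(map[string]int, len(later))
	next := later[0].rescheduleTime
	waits = append(waits, next)
	for _, info := range later {
		if info.rescheduleTime.Sub(next) < batchWindow {
			// Close enough to the current batch's start time.
			allocToEval[info.allocID] = len(waits) - 1
		} else {
			// Too far out; start a new batch (a new eval with a later WaitUntil).
			next = info.rescheduleTime
			waits = append(waits, next)
			allocToEval[info.allocID] = len(waits) - 1
		}
	}
	return waits, allocToEval
}

func main() {
	now := time.Now()
	waits, m := batchFollowupEvals([]delayedInfo{
		{allocID: "a", rescheduleTime: now.Add(1 * time.Second)},
		{allocID: "b", rescheduleTime: now.Add(2 * time.Second)},
		{allocID: "c", rescheduleTime: now.Add(20 * time.Second)},
	})
	fmt.Println(len(waits), m) // 2 batches: "a" and "b" share one eval, "c" gets its own
}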


@@ -114,8 +114,7 @@ func allocUpdateFnDestructive(*structs.Allocation, *structs.Job, *structs.TaskGr
func allocUpdateFnInplace(existing *structs.Allocation, _ *structs.Job, newTG *structs.TaskGroup) (bool, bool, *structs.Allocation) {
// Create a shallow copy
newAlloc := new(structs.Allocation)
*newAlloc = *existing
newAlloc := existing.CopySkipJob()
newAlloc.TaskResources = make(map[string]*structs.Resources)
// Use the new task resources but keep the network from the old
@@ -289,6 +288,14 @@ func stopResultsToNames(stop []allocStopResult) []string {
return names
}
func attributeUpdatesToNames(attributeUpdates map[string]*structs.Allocation) []string {
names := make([]string, 0, len(attributeUpdates))
for _, a := range attributeUpdates {
names = append(names, a.Name)
}
return names
}
func allocsToNames(allocs []*structs.Allocation) []string {
names := make([]string, 0, len(allocs))
for _, a := range allocs {
@@ -303,6 +310,7 @@ type resultExpectation struct {
place int
destructive int
inplace int
attributeUpdates int
stop int
desiredTGUpdates map[string]*structs.DesiredUpdates
}
@@ -334,6 +342,9 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
if l := len(r.inplaceUpdate); l != exp.inplace {
t.Fatalf("Expected %d inplaceUpdate; got %d", exp.inplace, l)
}
if l := len(r.attributeUpdates); l != exp.attributeUpdates {
t.Fatalf("Expected %d attribute updates; got %d", exp.attributeUpdates, l)
}
if l := len(r.stop); l != exp.stop {
t.Fatalf("Expected %d stops; got %d", exp.stop, l)
}
@@ -1208,14 +1219,17 @@ func TestReconciler_MultiTG(t *testing.T) {
// Tests delayed rescheduling of failed batch allocations
func TestReconciler_RescheduleLater_Batch(t *testing.T) {
require := require.New(t)
// Set desired 4
job := mock.Job()
job.TaskGroups[0].Count = 4
now := time.Now()
// Set up reschedule policy
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 6 existing allocations - 2 running, 1 complete and 3 failed
var allocs []*structs.Allocation
for i := 0; i < 6; i++ {
@@ -1227,6 +1241,7 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) {
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark 3 as failed with restart tracking info
allocs[0].ClientStatus = structs.AllocClientStatusFailed
allocs[0].NextAllocation = allocs[1].ID
@@ -1252,6 +1267,7 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) {
PrevNodeID: uuid.Generate(),
},
}}
// Mark one as complete
allocs[5].ClientStatus = structs.AllocClientStatusComplete
@@ -1259,7 +1275,6 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) {
r := reconciler.Compute()
// Two reschedule attempts were already made, one more can be made at a future time
// Verify that the follow up eval has the expected waitUntil time
evals := r.desiredFollowupEvals[tgName]
require.NotNil(evals)
@@ -1271,33 +1286,42 @@ func TestReconciler_RescheduleLater_Batch(t *testing.T) {
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 1,
inplace: 0,
attributeUpdates: 1,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
InPlaceUpdate: 1,
Ignore: 3,
InPlaceUpdate: 0,
Ignore: 4,
},
},
})
assertNamesHaveIndexes(t, intRange(2, 2), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
r.inplaceUpdate[0].EvalID = evals[0].ID
assertNamesHaveIndexes(t, intRange(2, 2), attributeUpdatesToNames(r.attributeUpdates))
// Verify that the followup evalID field is set correctly
var annotated *structs.Allocation
for _, a := range r.attributeUpdates {
annotated = a
}
require.Equal(evals[0].ID, annotated.FollowupEvalID)
}
// Tests delayed rescheduling of failed batch allocations and batching of allocs
// with fail times that are close together
func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) {
require := require.New(t)
// Set desired 10
job := mock.Job()
job.TaskGroups[0].Count = 10
now := time.Now()
// Set up reschedule policy
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 3, Interval: 24 * time.Hour, Delay: delayDur, DelayFunction: "linear"}
tgName := job.TaskGroups[0].Name
// Create 10 existing allocations
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
@@ -1309,6 +1333,7 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) {
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark 5 as failed with fail times very close together
for i := 0; i < 5; i++ {
allocs[i].ClientStatus = structs.AllocClientStatusFailed
@@ -1343,19 +1368,21 @@ func TestReconciler_RescheduleLaterWithBatchedEvals_Batch(t *testing.T) {
createDeployment: nil,
deploymentUpdates: nil,
place: 0,
inplace: 7,
inplace: 0,
attributeUpdates: 7,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 0,
InPlaceUpdate: 7,
Ignore: 3,
InPlaceUpdate: 0,
Ignore: 10,
},
},
})
assertNamesHaveIndexes(t, intRange(0, 6), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
for _, alloc := range r.inplaceUpdate {
assertNamesHaveIndexes(t, intRange(0, 6), attributeUpdatesToNames(r.attributeUpdates))
// Verify that the followup evalID field is set correctly
for _, alloc := range r.attributeUpdates {
if allocNameToIndex(alloc.Name) < 5 {
require.Equal(evals[0].ID, alloc.FollowupEvalID)
} else if allocNameToIndex(alloc.Name) < 7 {
@@ -1447,11 +1474,13 @@ func TestReconciler_RescheduleNow_Batch(t *testing.T) {
// Tests delayed rescheduling of failed service allocations
func TestReconciler_RescheduleLater_Service(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy
delayDur := 15 * time.Second
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: delayDur, MaxDelay: 1 * time.Hour}
@@ -1467,8 +1496,10 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) {
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark two as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
@@ -1498,33 +1529,49 @@ func TestReconciler_RescheduleLater_Service(t *testing.T) {
createDeployment: nil,
deploymentUpdates: nil,
place: 1,
inplace: 1,
inplace: 0,
attributeUpdates: 1,
stop: 0,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 1,
InPlaceUpdate: 1,
Ignore: 3,
InPlaceUpdate: 0,
Ignore: 4,
},
},
})
assertNamesHaveIndexes(t, intRange(4, 4), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(1, 1), allocsToNames(r.inplaceUpdate))
// verify that the followup evalID field is set correctly
r.inplaceUpdate[0].EvalID = evals[0].ID
assertNamesHaveIndexes(t, intRange(1, 1), attributeUpdatesToNames(r.attributeUpdates))
// Verify that the followup evalID field is set correctly
var annotated *structs.Allocation
for _, a := range r.attributeUpdates {
annotated = a
}
require.Equal(evals[0].ID, annotated.FollowupEvalID)
}
// Tests immediate rescheduling of failed service allocations
func TestReconciler_RescheduleNow_Service(t *testing.T) {
require := require.New(t)
// Set desired 5
job := mock.Job()
job.TaskGroups[0].Count = 5
tgName := job.TaskGroups[0].Name
now := time.Now()
// Set up reschedule policy
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{Attempts: 1, Interval: 24 * time.Hour, Delay: 5 * time.Second, MaxDelay: 1 * time.Hour}
// Set up reschedule policy and update stanza
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 24 * time.Hour,
Delay: 5 * time.Second,
DelayFunction: "",
MaxDelay: 1 * time.Hour,
Unlimited: false,
}
job.TaskGroups[0].Update = noCanaryUpdate
// Create 5 existing allocations
var allocs []*structs.Allocation
@@ -1537,8 +1584,10 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) {
allocs = append(allocs, alloc)
alloc.ClientStatus = structs.AllocClientStatusRunning
}
// Mark two as failed
allocs[0].ClientStatus = structs.AllocClientStatusFailed
// Mark one of them as already rescheduled once
allocs[0].RescheduleTracker = &structs.RescheduleTracker{Events: []*structs.RescheduleEvent{
{RescheduleTime: time.Now().Add(-1 * time.Hour).UTC().UnixNano(),
@@ -1576,8 +1625,8 @@ func TestReconciler_RescheduleNow_Service(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place))
// Rescheduled allocs should have previous allocs
assertNamesHaveIndexes(t, intRange(1, 1, 4, 4), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 1, r.place)
assertPlacementsAreRescheduled(t, 1, r.place)
}
@@ -1856,14 +1905,16 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) {
// Tests the reconciler creates a deployment for inplace updates
func TestReconciler_CreateDeployment_RollingUpgrade_Inplace(t *testing.T) {
job := mock.Job()
jobOld := mock.Job()
job := jobOld.Copy()
job.Version++
job.TaskGroups[0].Update = noCanaryUpdate
// Create 10 allocations from the old job
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.Job = jobOld
alloc.JobID = job.ID
alloc.NodeID = uuid.Generate()
alloc.Name = structs.AllocName(job.ID, job.TaskGroups[0].Name, uint(i))