Merge pull request #8691 from hashicorp/b-reschedule-job-versions

Respect alloc job version for lost/failed allocs
Mahmood Ali
2020-08-25 18:02:45 -04:00
committed by GitHub
10 changed files with 275 additions and 15 deletions

View File

@@ -5,6 +5,10 @@ IMPROVEMENTS:
* api: Added node purge SDK functionality. [[GH-8142](https://github.com/hashicorp/nomad/issues/8142)]
* driver/docker: Allow configurable image pull context timeout setting. [[GH-5718](https://github.com/hashicorp/nomad/issues/5718)]
BUG FIXES:
* core: Fixed a bug where unpromoted job versions are used when rescheduling failed allocations [[GH-8691](https://github.com/hashicorp/nomad/issues/8691)]
## 0.12.3 (August 13, 2020)
BUG FIXES:

View File

@@ -9850,12 +9850,14 @@ func (p *Plan) PopUpdate(alloc *Allocation) {
}
}
func (p *Plan) AppendAlloc(alloc *Allocation) {
// AppendAlloc appends the alloc to the plan allocations.
// If an explicit job is passed it is set on the alloc; otherwise
// the alloc is assumed to use the plan's Job version.
func (p *Plan) AppendAlloc(alloc *Allocation, job *Job) {
node := alloc.NodeID
existing := p.NodeAllocation[node]
// Normalize the job
alloc.Job = nil
alloc.Job = job
p.NodeAllocation[node] = append(existing, alloc)
}
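
A minimal sketch, not part of the diff, of the two calling patterns the new signature supports (the variable names here are hypothetical):

// Alloc should follow the plan's job version (the previous behavior):
plan.AppendAlloc(updatedAlloc, nil)

// Alloc is pinned to an explicitly chosen, e.g. downgraded, job version:
plan.AppendAlloc(replacementAlloc, downgradedJob)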

View File

@@ -2,6 +2,7 @@ package scheduler
import (
"fmt"
"sort"
"time"
log "github.com/hashicorp/go-hclog"
@@ -387,12 +388,12 @@ func (s *GenericScheduler) computeJobAllocs() error {
update.DeploymentID = s.deployment.GetID()
update.DeploymentStatus = nil
}
s.ctx.Plan().AppendAlloc(update)
s.ctx.Plan().AppendAlloc(update, nil)
}
// Handle the annotation updates
for _, update := range results.attributeUpdates {
s.ctx.Plan().AppendAlloc(update)
s.ctx.Plan().AppendAlloc(update, nil)
}
// Nothing remaining to do if placement is not required
@@ -429,6 +430,32 @@ func (s *GenericScheduler) computeJobAllocs() error {
return s.computePlacements(destructive, place)
}
// downgradedJobForPlacement returns the deployment ID and job version appropriate for replacing a non-canary allocation
func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string, *structs.Job, error) {
ns, jobID := s.job.Namespace, s.job.ID
tgName := p.TaskGroup().Name
// find the job's deployments and use the latest promoted or canary-free version
deployments, err := s.state.DeploymentsByJobID(nil, ns, jobID, false)
if err != nil {
return "", nil, fmt.Errorf("failed to lookup job deployments: %v", err)
}
sort.Slice(deployments, func(i, j int) bool { return deployments[i].JobVersion > deployments[j].JobVersion })
for _, d := range deployments {
// It's unexpected for a recent deployment to be missing the TaskGroup, since all of its
// allocations should have been destroyed already. In that case, keep scanning for an older
// deployment that does contain the TaskGroup; the stale allocation should be stopped soon.
// This is a defensive measure that has not been observed in practice.
//
// A zero dstate.DesiredCanaries indicates that the TaskGroup's allocations were updated in-place without using canaries.
if dstate := d.TaskGroups[tgName]; dstate != nil && (dstate.Promoted || dstate.DesiredCanaries == 0) {
job, err := s.state.JobByIDAndVersion(nil, ns, jobID, d.JobVersion)
return d.ID, job, err
}
}
return "", nil, nil
}
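
A hedged walk-through of the selection logic above, using a hypothetical deployment history:

// v3 deployment: DesiredCanaries=1, Promoted=false -> skipped (canaries not yet promoted)
// v2 deployment: DesiredCanaries=0                 -> chosen (in-place update, no canaries)
// v1 deployment: Promoted=true                     -> never reached
// A non-canary replacement therefore runs the version 2 job instead of the unpromoted version 3 job.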
// computePlacements computes placements for allocations. It is given the set of
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
@@ -457,12 +484,40 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Get the task group
tg := missing.TaskGroup()
var downgradedJob *structs.Job
if missing.DowngradeNonCanary() {
jobDeploymentID, job, err := s.downgradedJobForPlacement(missing)
if err != nil {
return err
}
// Defensive check - if there is no appropriate deployment for this job, use the latest
if job != nil && job.Version >= missing.MinJobVersion() && job.LookupTaskGroup(tg.Name) != nil {
tg = job.LookupTaskGroup(tg.Name)
downgradedJob = job
deploymentID = jobDeploymentID
} else {
jobVersion := -1
if job != nil {
jobVersion = int(job.Version)
}
s.logger.Debug("failed to find appropriate job; using the latest", "expected_version", missing.MinJobVersion(), "found_version", jobVersion)
}
}
// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
continue
}
// Use the downgraded job in the scheduling stack to honor
// the old job's resources and constraints
if downgradedJob != nil {
s.stack.SetJob(downgradedJob)
}
// Find the preferred node
preferredNode, err := s.findPreferredNode(missing)
if err != nil {
@@ -489,6 +544,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
// Restore the stack job now that placement is done, so later placements use the plan's job version
if downgradedJob != nil {
s.stack.SetJob(s.job)
}
// Set fields based on if we found an allocation option
if option != nil {
resources := &structs.AllocatedResources{
@@ -547,7 +607,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
s.handlePreemptions(option, alloc, missing)
// Track the placement
s.plan.AppendAlloc(alloc)
s.plan.AppendAlloc(alloc, downgradedJob)
} else {
// Lazy initialize the failed map
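
Taken together, the downgraded job is applied in two places: it is set on the scheduling stack so feasibility, resources, and constraints are evaluated against the old job, and it is passed to AppendAlloc so the resulting allocation records that job version rather than the plan's latest job.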

View File

@@ -5342,3 +5342,162 @@ func TestServiceSched_Preemption(t *testing.T) {
}
require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
}
// TestServiceSched_Migrate_CanaryStatus asserts that migrated and rescheduled
// allocations use their own job version rather than the latest one:
// canaries should be replaced by canaries, and non-canaries should be replaced
// with the latest promoted job version.
func TestServiceSched_Migrate_CanaryStatus(t *testing.T) {
h := NewHarness(t)
node1 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))
totalCount := 3
desiredCanaries := 1
job := mock.Job()
job.Stable = true
job.TaskGroups[0].Count = totalCount
job.TaskGroups[0].Update = &structs.UpdateStrategy{
MaxParallel: 1,
Canary: desiredCanaries,
}
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job))
deployment := &structs.Deployment{
ID: uuid.Generate(),
JobID: job.ID,
Namespace: job.Namespace,
JobVersion: job.Version,
JobModifyIndex: job.JobModifyIndex,
JobCreateIndex: job.CreateIndex,
TaskGroups: map[string]*structs.DeploymentState{
"web": {DesiredTotal: totalCount},
},
Status: structs.DeploymentStatusSuccessful,
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
}
require.NoError(t, h.State.UpsertDeployment(h.NextIndex(), deployment))
var allocs []*structs.Allocation
for i := 0; i < 3; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node1.ID
alloc.DeploymentID = deployment.ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
require.NoError(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// register a new job version with a modified task group
job2 := job.Copy()
job2.Stable = false
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
require.NoError(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.NoError(t, err)
// Ensure a single plan
require.Len(t, h.Plans, 1)
plan := h.Plans[0]
// Ensure a deployment was created
require.NotNil(t, plan.Deployment)
updateDeployment := plan.Deployment.ID
// Check status first - should be 4 allocs, only one is canary
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 4)
sort.Slice(allocs, func(i, j int) bool { return allocs[i].CreateIndex < allocs[j].CreateIndex })
for _, a := range allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.False(t, a.DeploymentStatus.IsCanary())
require.Equal(t, node1.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, allocs[3].DesiredStatus)
require.Equal(t, uint64(1), allocs[3].Job.Version)
require.True(t, allocs[3].DeploymentStatus.Canary)
require.Equal(t, node1.ID, allocs[3].NodeID)
require.Equal(t, updateDeployment, allocs[3].DeploymentID)
}
// now, mark node1 as down and ensure all of its allocs are replaced on node2
node1 = node1.Copy()
node1.Status = structs.NodeStatusDown
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node1))
node2 := mock.Node()
require.NoError(t, h.State.UpsertNode(h.NextIndex(), node2))
neval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
NodeID: node1.ID,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
require.NoError(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{neval}))
// Process the evaluation
err = h.Process(NewServiceScheduler, neval)
require.NoError(t, err)
// Now test that all node1 allocs are stopped and that their replacements on node2 preserve job version and canary status
{
ws := memdb.NewWatchSet()
allocs, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, true)
require.NoError(t, err)
require.Len(t, allocs, 8)
nodeAllocs := map[string][]*structs.Allocation{}
for _, a := range allocs {
nodeAllocs[a.NodeID] = append(nodeAllocs[a.NodeID], a)
}
require.Len(t, nodeAllocs[node1.ID], 4)
for _, a := range nodeAllocs[node1.ID] {
require.Equal(t, structs.AllocDesiredStatusStop, a.DesiredStatus)
require.Equal(t, node1.ID, a.NodeID)
}
node2Allocs := nodeAllocs[node2.ID]
require.Len(t, node2Allocs, 4)
sort.Slice(node2Allocs, func(i, j int) bool { return node2Allocs[i].Job.Version < node2Allocs[j].Job.Version })
for _, a := range node2Allocs[:3] {
require.Equal(t, structs.AllocDesiredStatusRun, a.DesiredStatus)
require.Equal(t, uint64(0), a.Job.Version)
require.Equal(t, node2.ID, a.NodeID)
require.Equal(t, deployment.ID, a.DeploymentID)
}
require.Equal(t, structs.AllocDesiredStatusRun, node2Allocs[3].DesiredStatus)
require.Equal(t, uint64(1), node2Allocs[3].Job.Version)
require.Equal(t, node2.ID, node2Allocs[3].NodeID)
require.Equal(t, updateDeployment, node2Allocs[3].DeploymentID)
}
}

View File

@@ -424,10 +424,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Create canaries if there are destructive updates, or if every existing alloc
// must be replaced (migrated or lost), and we have fewer canaries than desired
// and they have not been promoted
numDestructive := len(destructive)
strategy := tg.Update
canariesPromoted := dstate != nil && dstate.Promoted
requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
replaceAllAllocs := len(untainted) == 0 && len(migrate)+len(lost) != 0
requireCanary := (len(destructive) != 0 || replaceAllAllocs) &&
strategy != nil && len(canaries) < strategy.Canary && !canariesPromoted
if requireCanary {
dstate.DesiredCanaries = strategy.Canary
}
@@ -455,7 +456,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * There is no delayed stop_after_client_disconnect alloc, which delays scheduling for the whole group
var place []allocPlaceResult
if len(lostLater) == 0 {
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow)
place = a.computePlacements(tg, nameIndex, untainted, migrate, rescheduleNow, canaryState)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -533,9 +534,12 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
canary: false,
canary: alloc.DeploymentStatus.IsCanary(),
taskGroup: tg,
previousAlloc: alloc,
downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
minJobVersion: alloc.Job.Version,
})
}
@@ -708,7 +712,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
// computePlacements returns the set of allocations to place given the group
// definition, the sets of untainted, migrating and rescheduling allocations for the
// group, and whether the group is currently in a canary deployment state.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet, canaryState bool) []allocPlaceResult {
// Add rescheduled placement results
var place []allocPlaceResult
@@ -719,6 +723,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
previousAlloc: alloc,
reschedule: true,
canary: alloc.DeploymentStatus.IsCanary(),
downgradeNonCanary: canaryState && !alloc.DeploymentStatus.IsCanary(),
minJobVersion: alloc.Job.Version,
})
}
@@ -732,8 +739,9 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {
place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
name: name,
taskGroup: group,
downgradeNonCanary: canaryState,
})
}
}
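
A hedged note on how these fields are consumed: the reconciler only marks a placement with downgradeNonCanary and records the replaced allocation's job version as minJobVersion; the generic scheduler's computePlacements (earlier in this diff) then resolves the flag via downgradedJobForPlacement. For a lost non-canary allocation while unpromoted canaries exist, the resulting placement looks roughly like:

allocPlaceResult{
name:               alloc.Name,
taskGroup:          tg,
previousAlloc:      alloc,
canary:             false,
downgradeNonCanary: true, // canaryState && !alloc.DeploymentStatus.IsCanary()
minJobVersion:      alloc.Job.Version,
}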

View File

@@ -34,6 +34,12 @@ type placementResult interface {
// StopPreviousAlloc returns whether the previous allocation should be
// stopped and if so the status description.
StopPreviousAlloc() (bool, string)
// DowngradeNonCanary indicates that the placement should use the latest promoted
// job version that is at least MinJobVersion, rather than the latest job version
DowngradeNonCanary() bool
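// MinJobVersion returns the lowest job version acceptable for the replacement placement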
MinJobVersion() uint64
}
// allocStopResult contains the information required to stop a single allocation
@@ -52,6 +58,9 @@ type allocPlaceResult struct {
taskGroup *structs.TaskGroup
previousAlloc *structs.Allocation
reschedule bool
downgradeNonCanary bool
minJobVersion uint64
}
func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup }
@@ -60,6 +69,8 @@ func (a allocPlaceResult) Canary() bool { return a.ca
func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc }
func (a allocPlaceResult) IsRescheduling() bool { return a.reschedule }
func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" }
func (a allocPlaceResult) DowngradeNonCanary() bool { return a.downgradeNonCanary }
func (a allocPlaceResult) MinJobVersion() uint64 { return a.minJobVersion }
// allocDestructiveResult contains the information required to do a destructive
// update. Destructive changes should be applied atomically, as in the old alloc
@@ -79,6 +90,8 @@ func (a allocDestructiveResult) IsRescheduling() bool { retur
func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) {
return true, a.stopStatusDescription
}
func (a allocDestructiveResult) DowngradeNonCanary() bool { return false }
func (a allocDestructiveResult) MinJobVersion() uint64 { return 0 }
// allocMatrix is a mapping of task groups to their allocation set.
type allocMatrix map[string]allocSet

View File

@@ -88,6 +88,12 @@ type State interface {
// JobByID is used to lookup a job by ID
JobByID(ws memdb.WatchSet, namespace, id string) (*structs.Job, error)
// DeploymentsByJobID returns the deployments associated with the job
DeploymentsByJobID(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Deployment, error)
// JobByIDAndVersion returns the job associated with the given ID and version
JobByIDAndVersion(ws memdb.WatchSet, namespace, id string, version uint64) (*structs.Job, error)
// LatestDeploymentByJobID returns the latest deployment matching the given
// job ID
LatestDeploymentByJobID(ws memdb.WatchSet, namespace, jobID string) (*structs.Deployment, error)
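
These two additions are what downgradedJobForPlacement relies on: DeploymentsByJobID supplies the deployment history to scan for the most recent promoted (or canary-free) deployment, and JobByIDAndVersion fetches the matching historical job version.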

View File

@@ -46,6 +46,7 @@ type GenericStack struct {
wrappedChecks *FeasibilityWrapper
quota FeasibleIterator
jobVersion *uint64
jobConstraint *ConstraintChecker
taskGroupDrivers *DriverChecker
taskGroupConstraint *ConstraintChecker
@@ -89,6 +90,13 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
}
func (s *GenericStack) SetJob(job *structs.Job) {
if s.jobVersion != nil && *s.jobVersion == job.Version {
return
}
jobVer := job.Version
s.jobVersion = &jobVer
s.jobConstraint.SetConstraints(job.Constraints)
s.distinctHostsConstraint.SetJob(job)
s.distinctPropertyConstraint.SetJob(job)
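
The cached jobVersion makes SetJob a no-op when it is called again with the same job version, which matters because the generic scheduler may swap in a downgraded job for a single placement and immediately restore the plan's job afterwards. A rough usage sketch, with names from the diff above:

stack.SetJob(downgradedJob) // reconfigures constraints for the older job version
// ... evaluate and record the placement ...
stack.SetJob(s.job) // restores the plan's job; repeated calls with an unchanged version are skipped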

View File

@@ -398,7 +398,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
alloc.PreemptedAllocations = preemptedAllocIDs
}
s.plan.AppendAlloc(alloc)
s.plan.AppendAlloc(alloc, nil)
}
return nil

View File

@@ -655,7 +655,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
},
}
newAlloc.Metrics = ctx.Metrics()
ctx.Plan().AppendAlloc(newAlloc)
ctx.Plan().AppendAlloc(newAlloc, nil)
// Remove this allocation from the slice
doInplace(&i, &n, &inplaceCount)