diff --git a/api/jobs.go b/api/jobs.go
index b56b9c864..f84a6bd52 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -343,26 +343,28 @@ type periodicForceResponse struct {
 // UpdateStrategy defines a task groups update strategy.
 type UpdateStrategy struct {
-	Stagger         *time.Duration `mapstructure:"stagger"`
-	MaxParallel     *int           `mapstructure:"max_parallel"`
-	HealthCheck     *string        `mapstructure:"health_check"`
-	MinHealthyTime  *time.Duration `mapstructure:"min_healthy_time"`
-	HealthyDeadline *time.Duration `mapstructure:"healthy_deadline"`
-	AutoRevert      *bool          `mapstructure:"auto_revert"`
-	Canary          *int           `mapstructure:"canary"`
+	Stagger          *time.Duration `mapstructure:"stagger"`
+	MaxParallel      *int           `mapstructure:"max_parallel"`
+	HealthCheck      *string        `mapstructure:"health_check"`
+	MinHealthyTime   *time.Duration `mapstructure:"min_healthy_time"`
+	HealthyDeadline  *time.Duration `mapstructure:"healthy_deadline"`
+	ProgressDeadline *time.Duration `mapstructure:"progress_deadline"`
+	AutoRevert       *bool          `mapstructure:"auto_revert"`
+	Canary           *int           `mapstructure:"canary"`
 }
 
 // DefaultUpdateStrategy provides a baseline that can be used to upgrade
 // jobs with the old policy or for populating field defaults.
 func DefaultUpdateStrategy() *UpdateStrategy {
 	return &UpdateStrategy{
-		Stagger:         helper.TimeToPtr(30 * time.Second),
-		MaxParallel:     helper.IntToPtr(1),
-		HealthCheck:     helper.StringToPtr("checks"),
-		MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-		HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
-		AutoRevert:      helper.BoolToPtr(false),
-		Canary:          helper.IntToPtr(0),
+		Stagger:          helper.TimeToPtr(30 * time.Second),
+		MaxParallel:      helper.IntToPtr(1),
+		HealthCheck:      helper.StringToPtr("checks"),
+		MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+		HealthyDeadline:  helper.TimeToPtr(5 * time.Minute),
+		ProgressDeadline: helper.TimeToPtr(15 * time.Minute),
+		AutoRevert:       helper.BoolToPtr(false),
+		Canary:           helper.IntToPtr(0),
 	}
 }
 
@@ -393,6 +395,10 @@ func (u *UpdateStrategy) Copy() *UpdateStrategy {
 		copy.HealthyDeadline = helper.TimeToPtr(*u.HealthyDeadline)
 	}
 
+	if u.ProgressDeadline != nil {
+		copy.ProgressDeadline = helper.TimeToPtr(*u.ProgressDeadline)
+	}
+
 	if u.AutoRevert != nil {
 		copy.AutoRevert = helper.BoolToPtr(*u.AutoRevert)
 	}
@@ -429,6 +435,10 @@ func (u *UpdateStrategy) Merge(o *UpdateStrategy) {
 		u.HealthyDeadline = helper.TimeToPtr(*o.HealthyDeadline)
 	}
 
+	if o.ProgressDeadline != nil {
+		u.ProgressDeadline = helper.TimeToPtr(*o.ProgressDeadline)
+	}
+
 	if o.AutoRevert != nil {
 		u.AutoRevert = helper.BoolToPtr(*o.AutoRevert)
 	}
@@ -457,6 +467,10 @@ func (u *UpdateStrategy) Canonicalize() {
 		u.HealthyDeadline = d.HealthyDeadline
 	}
 
+	if u.ProgressDeadline == nil {
+		u.ProgressDeadline = d.ProgressDeadline
+	}
+
 	if u.MinHealthyTime == nil {
 		u.MinHealthyTime = d.MinHealthyTime
 	}
@@ -496,6 +510,10 @@ func (u *UpdateStrategy) Empty() bool {
 		return false
 	}
 
+	if u.ProgressDeadline != nil && *u.ProgressDeadline != 0 {
+		return false
+	}
+
 	if u.AutoRevert != nil && *u.AutoRevert {
 		return false
 	}
diff --git a/api/jobs_test.go b/api/jobs_test.go
index 9119f5118..939f78487 100644
--- a/api/jobs_test.go
+++ b/api/jobs_test.go
@@ -354,13 +354,14 @@ func TestJobs_Canonicalize(t *testing.T) {
 				JobModifyIndex: helper.Uint64ToPtr(0),
 				Datacenters:    []string{"dc1"},
 				Update: &UpdateStrategy{
-					Stagger:         helper.TimeToPtr(30 * time.Second),
-					MaxParallel:     helper.IntToPtr(1),
-					HealthCheck:     helper.StringToPtr("checks"),
-					MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-					HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
-					AutoRevert:      helper.BoolToPtr(false),
-					Canary:          helper.IntToPtr(0),
+					Stagger:          helper.TimeToPtr(30 * time.Second),
+					MaxParallel:      helper.IntToPtr(1),
+					HealthCheck:      helper.StringToPtr("checks"),
+					MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+					HealthyDeadline:  helper.TimeToPtr(5 * time.Minute),
+					ProgressDeadline: helper.TimeToPtr(15 * time.Minute),
+					AutoRevert:       helper.BoolToPtr(false),
+					Canary:           helper.IntToPtr(0),
 				},
 				TaskGroups: []*TaskGroup{
 					{
@@ -387,13 +388,14 @@ func TestJobs_Canonicalize(t *testing.T) {
 						},
 						Update: &UpdateStrategy{
-							Stagger:         helper.TimeToPtr(30 * time.Second),
-							MaxParallel:     helper.IntToPtr(1),
-							HealthCheck:     helper.StringToPtr("checks"),
-							MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-							HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
-							AutoRevert:      helper.BoolToPtr(false),
-							Canary:          helper.IntToPtr(0),
+							Stagger:          helper.TimeToPtr(30 * time.Second),
+							MaxParallel:      helper.IntToPtr(1),
+							HealthCheck:      helper.StringToPtr("checks"),
+							MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+							HealthyDeadline:  helper.TimeToPtr(5 * time.Minute),
+							ProgressDeadline: helper.TimeToPtr(15 * time.Minute),
+							AutoRevert:       helper.BoolToPtr(false),
+							Canary:           helper.IntToPtr(0),
 						},
 						Migrate: DefaultMigrateStrategy(),
 						Tasks: []*Task{
@@ -515,13 +517,14 @@ func TestJobs_Canonicalize(t *testing.T) {
 				ID:       helper.StringToPtr("bar"),
 				ParentID: helper.StringToPtr("lol"),
 				Update: &UpdateStrategy{
-					Stagger:         helper.TimeToPtr(1 * time.Second),
-					MaxParallel:     helper.IntToPtr(1),
-					HealthCheck:     helper.StringToPtr("checks"),
-					MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-					HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
-					AutoRevert:      helper.BoolToPtr(false),
-					Canary:          helper.IntToPtr(0),
+					Stagger:          helper.TimeToPtr(1 * time.Second),
+					MaxParallel:      helper.IntToPtr(1),
+					HealthCheck:      helper.StringToPtr("checks"),
+					MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+					HealthyDeadline:  helper.TimeToPtr(6 * time.Minute),
+					ProgressDeadline: helper.TimeToPtr(7 * time.Minute),
+					AutoRevert:       helper.BoolToPtr(false),
+					Canary:           helper.IntToPtr(0),
 				},
 				TaskGroups: []*TaskGroup{
 					{
@@ -569,13 +572,14 @@ func TestJobs_Canonicalize(t *testing.T) {
 				ModifyIndex:    helper.Uint64ToPtr(0),
 				JobModifyIndex: helper.Uint64ToPtr(0),
 				Update: &UpdateStrategy{
-					Stagger:         helper.TimeToPtr(1 * time.Second),
-					MaxParallel:     helper.IntToPtr(1),
-					HealthCheck:     helper.StringToPtr("checks"),
-					MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-					HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
-					AutoRevert:      helper.BoolToPtr(false),
-					Canary:          helper.IntToPtr(0),
+					Stagger:          helper.TimeToPtr(1 * time.Second),
+					MaxParallel:      helper.IntToPtr(1),
+					HealthCheck:      helper.StringToPtr("checks"),
+					MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+					HealthyDeadline:  helper.TimeToPtr(6 * time.Minute),
+					ProgressDeadline: helper.TimeToPtr(7 * time.Minute),
+					AutoRevert:       helper.BoolToPtr(false),
+					Canary:           helper.IntToPtr(0),
 				},
 				TaskGroups: []*TaskGroup{
 					{
@@ -601,13 +605,14 @@ func TestJobs_Canonicalize(t *testing.T) {
 							Unlimited: helper.BoolToPtr(true),
 						},
 						Update: &UpdateStrategy{
-							Stagger:         helper.TimeToPtr(2 * time.Second),
-							MaxParallel:     helper.IntToPtr(2),
-							HealthCheck:     helper.StringToPtr("manual"),
-							MinHealthyTime:  helper.TimeToPtr(1 * time.Second),
-							HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
-							AutoRevert:      helper.BoolToPtr(true),
-							Canary:          helper.IntToPtr(1),
+							Stagger:          helper.TimeToPtr(2 * time.Second),
+							MaxParallel:      helper.IntToPtr(2),
+							HealthCheck:      helper.StringToPtr("manual"),
+							MinHealthyTime:   helper.TimeToPtr(1 * time.Second),
+							HealthyDeadline:  helper.TimeToPtr(6 * time.Minute),
+							ProgressDeadline: helper.TimeToPtr(7 * time.Minute),
+							AutoRevert:       helper.BoolToPtr(true),
+							Canary:           helper.IntToPtr(1),
 						},
 						Migrate: DefaultMigrateStrategy(),
 						Tasks: []*Task{
@@ -642,13 +647,14 @@ func TestJobs_Canonicalize(t *testing.T) {
 							Unlimited: helper.BoolToPtr(true),
 						},
 						Update: &UpdateStrategy{
-							Stagger:         helper.TimeToPtr(1 * time.Second),
-							MaxParallel:     helper.IntToPtr(1),
-							HealthCheck:     helper.StringToPtr("checks"),
-							MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-							HealthyDeadline: helper.TimeToPtr(6 * time.Minute),
-							AutoRevert:      helper.BoolToPtr(false),
-							Canary:          helper.IntToPtr(0),
+							Stagger:          helper.TimeToPtr(1 * time.Second),
+							MaxParallel:      helper.IntToPtr(1),
+							HealthCheck:      helper.StringToPtr("checks"),
+							MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+							HealthyDeadline:  helper.TimeToPtr(6 * time.Minute),
+							ProgressDeadline: helper.TimeToPtr(7 * time.Minute),
+							AutoRevert:       helper.BoolToPtr(false),
+							Canary:           helper.IntToPtr(0),
 						},
 						Migrate: DefaultMigrateStrategy(),
 						Tasks: []*Task{
diff --git a/client/alloc_runner.go b/client/alloc_runner.go
index 28ccd00cc..61410e576 100644
--- a/client/alloc_runner.go
+++ b/client/alloc_runner.go
@@ -577,7 +577,9 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
 	// The health has been set
 	if r.allocHealth != nil {
 		if alloc.DeploymentStatus == nil {
-			alloc.DeploymentStatus = &structs.AllocDeploymentStatus{}
+			alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
+				Timestamp: time.Now(),
+			}
 		}
 		alloc.DeploymentStatus.Healthy = helper.BoolToPtr(*r.allocHealth)
 	}
diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go
index 510680002..e74b358d5 100644
--- a/command/agent/job_endpoint.go
+++ b/command/agent/job_endpoint.go
@@ -693,13 +693,14 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
 	if taskGroup.Update != nil {
 		tg.Update = &structs.UpdateStrategy{
-			Stagger:         *taskGroup.Update.Stagger,
-			MaxParallel:     *taskGroup.Update.MaxParallel,
-			HealthCheck:     *taskGroup.Update.HealthCheck,
-			MinHealthyTime:  *taskGroup.Update.MinHealthyTime,
-			HealthyDeadline: *taskGroup.Update.HealthyDeadline,
-			AutoRevert:      *taskGroup.Update.AutoRevert,
-			Canary:          *taskGroup.Update.Canary,
+			Stagger:          *taskGroup.Update.Stagger,
+			MaxParallel:      *taskGroup.Update.MaxParallel,
+			HealthCheck:      *taskGroup.Update.HealthCheck,
+			MinHealthyTime:   *taskGroup.Update.MinHealthyTime,
+			HealthyDeadline:  *taskGroup.Update.HealthyDeadline,
+			ProgressDeadline: *taskGroup.Update.ProgressDeadline,
+			AutoRevert:       *taskGroup.Update.AutoRevert,
+			Canary:           *taskGroup.Update.Canary,
 		}
 	}
diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go
index fb129dc91..1a27eecf8 100644
--- a/command/agent/job_endpoint_test.go
+++ b/command/agent/job_endpoint_test.go
@@ -1161,13 +1161,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
 			},
 		},
 		Update: &api.UpdateStrategy{
-			Stagger:         helper.TimeToPtr(1 * time.Second),
-			MaxParallel:     helper.IntToPtr(5),
-			HealthCheck:     helper.StringToPtr(structs.UpdateStrategyHealthCheck_Manual),
-			MinHealthyTime:  helper.TimeToPtr(1 * time.Minute),
-			HealthyDeadline: helper.TimeToPtr(3 * time.Minute),
-			AutoRevert:      helper.BoolToPtr(false),
-			Canary:          helper.IntToPtr(1),
+			Stagger:          helper.TimeToPtr(1 * time.Second),
+			MaxParallel:      helper.IntToPtr(5),
+			HealthCheck:      helper.StringToPtr(structs.UpdateStrategyHealthCheck_Manual),
+			MinHealthyTime:   helper.TimeToPtr(1 * time.Minute),
+			HealthyDeadline:  helper.TimeToPtr(3 * time.Minute),
+			ProgressDeadline: helper.TimeToPtr(3 * time.Minute),
+			AutoRevert:       helper.BoolToPtr(false),
+			Canary:           helper.IntToPtr(1),
 		},
 		Periodic: &api.PeriodicConfig{
 			Enabled: helper.BoolToPtr(true),
@@ -1222,10 +1223,11 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
 					Migrate: helper.BoolToPtr(true),
 				},
 				Update: &api.UpdateStrategy{
-					HealthCheck:     helper.StringToPtr(structs.UpdateStrategyHealthCheck_Checks),
-					MinHealthyTime:  helper.TimeToPtr(2 * time.Minute),
-					HealthyDeadline: helper.TimeToPtr(5 * time.Minute),
-					AutoRevert:      helper.BoolToPtr(true),
+					HealthCheck:      helper.StringToPtr(structs.UpdateStrategyHealthCheck_Checks),
+					MinHealthyTime:   helper.TimeToPtr(2 * time.Minute),
+					HealthyDeadline:  helper.TimeToPtr(5 * time.Minute),
+					ProgressDeadline: helper.TimeToPtr(5 * time.Minute),
+					AutoRevert:       helper.BoolToPtr(true),
 				},
 				Meta: map[string]string{
@@ -1446,13 +1448,14 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
 					Migrate: true,
 				},
 				Update: &structs.UpdateStrategy{
-					Stagger:         1 * time.Second,
-					MaxParallel:     5,
-					HealthCheck:     structs.UpdateStrategyHealthCheck_Checks,
-					MinHealthyTime:  2 * time.Minute,
-					HealthyDeadline: 5 * time.Minute,
-					AutoRevert:      true,
-					Canary:          1,
+					Stagger:          1 * time.Second,
+					MaxParallel:      5,
+					HealthCheck:      structs.UpdateStrategyHealthCheck_Checks,
+					MinHealthyTime:   2 * time.Minute,
+					HealthyDeadline:  5 * time.Minute,
+					ProgressDeadline: 5 * time.Minute,
+					AutoRevert:       true,
+					Canary:           1,
 				},
 				Meta: map[string]string{
 					"key": "value",
diff --git a/jobspec/parse.go b/jobspec/parse.go
index 63157140e..e610d8dfe 100644
--- a/jobspec/parse.go
+++ b/jobspec/parse.go
@@ -1322,6 +1322,7 @@ func parseUpdate(result **api.UpdateStrategy, list *ast.ObjectList) error {
 		"health_check",
 		"min_healthy_time",
 		"healthy_deadline",
+		"progress_deadline",
 		"auto_revert",
 		"canary",
 	}
diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go
index 69875f771..108bde845 100644
--- a/jobspec/parse_test.go
+++ b/jobspec/parse_test.go
@@ -47,13 +47,14 @@ func TestParse(t *testing.T) {
 				},
 
 				Update: &api.UpdateStrategy{
-					Stagger:         helper.TimeToPtr(60 * time.Second),
-					MaxParallel:     helper.IntToPtr(2),
-					HealthCheck:     helper.StringToPtr("manual"),
-					MinHealthyTime:  helper.TimeToPtr(10 * time.Second),
-					HealthyDeadline: helper.TimeToPtr(10 * time.Minute),
-					AutoRevert:      helper.BoolToPtr(true),
-					Canary:          helper.IntToPtr(1),
+					Stagger:          helper.TimeToPtr(60 * time.Second),
+					MaxParallel:      helper.IntToPtr(2),
+					HealthCheck:      helper.StringToPtr("manual"),
+					MinHealthyTime:   helper.TimeToPtr(10 * time.Second),
+					HealthyDeadline:  helper.TimeToPtr(10 * time.Minute),
+					ProgressDeadline: helper.TimeToPtr(10 * time.Minute),
+					AutoRevert:       helper.BoolToPtr(true),
+					Canary:           helper.IntToPtr(1),
 				},
 
 				TaskGroups: []*api.TaskGroup{
@@ -103,12 +104,13 @@ func TestParse(t *testing.T) {
 							SizeMB: helper.IntToPtr(150),
 						},
 						Update: &api.UpdateStrategy{
-							MaxParallel:     helper.IntToPtr(3),
-							HealthCheck:     helper.StringToPtr("checks"),
-							MinHealthyTime:  helper.TimeToPtr(1 * time.Second),
-							HealthyDeadline: helper.TimeToPtr(1 * time.Minute),
-							AutoRevert:      helper.BoolToPtr(false),
-							Canary:          helper.IntToPtr(2),
+							MaxParallel:      helper.IntToPtr(3),
+							HealthCheck:      helper.StringToPtr("checks"),
+							MinHealthyTime:   helper.TimeToPtr(1 * time.Second),
+							HealthyDeadline:  helper.TimeToPtr(1 * time.Minute),
+							ProgressDeadline: helper.TimeToPtr(1 * time.Minute),
+							AutoRevert:       helper.BoolToPtr(false),
+							Canary:           helper.IntToPtr(2),
 						},
 						Migrate: &api.MigrateStrategy{
 							MaxParallel: helper.IntToPtr(2),
diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl
index 11f40b883..da425abbd 100644
--- a/jobspec/test-fixtures/basic.hcl
+++ b/jobspec/test-fixtures/basic.hcl
@@ -22,6 +22,7 @@ job "binstore-storagelocker" {
     health_check      = "manual"
     min_healthy_time  = "10s"
     healthy_deadline  = "10m"
+    progress_deadline = "10m"
     auto_revert       = true
     canary            = 1
   }
@@ -63,6 +64,7 @@ job "binstore-storagelocker" {
       health_check      = "checks"
      min_healthy_time  = "1s"
       healthy_deadline  = "1m"
+      progress_deadline = "1m"
       auto_revert       = false
       canary            = 2
     }
diff --git a/nomad/deploymentwatcher/deployment_watcher.go b/nomad/deploymentwatcher/deployment_watcher.go
index 80cf9737d..79052843e 100644
--- a/nomad/deploymentwatcher/deployment_watcher.go
+++ b/nomad/deploymentwatcher/deployment_watcher.go
@@ -2,6 +2,7 @@ package deploymentwatcher
 
 import (
 	"context"
+	"fmt"
 	"log"
 	"sync"
 	"time"
@@ -95,11 +96,12 @@ func newDeploymentWatcher(parent context.Context, queryLimiter *rate.Limiter,
 	}
 
 	// Start the long lived watcher that scans for allocation updates
-	go w.watch()
+	go w.watch2()
 
 	return w
 }
 
+// TODO
 func (w *deploymentWatcher) SetAllocHealth(
 	req *structs.DeploymentAllocHealthRequest,
 	resp *structs.DeploymentUpdateResponse) error {
@@ -181,7 +183,6 @@ func (w *deploymentWatcher) SetAllocHealth(
 	if j != nil {
 		resp.RevertedJobVersion = helper.Uint64ToPtr(j.Version)
 	}
-	w.setLatestEval(index)
 	return nil
 }
 
@@ -220,7 +221,6 @@ func (w *deploymentWatcher) PromoteDeployment(
 	resp.EvalCreateIndex = index
 	resp.DeploymentModifyIndex = index
 	resp.Index = index
-	w.setLatestEval(index)
 	return nil
 }
 
@@ -252,7 +252,6 @@ func (w *deploymentWatcher) PauseDeployment(
 	}
 	resp.DeploymentModifyIndex = i
 	resp.Index = i
-	w.setLatestEval(i)
 	return nil
 }
 
@@ -302,7 +301,6 @@ func (w *deploymentWatcher) FailDeployment(
 	if rollbackJob != nil {
 		resp.RevertedJobVersion = helper.Uint64ToPtr(rollbackJob.Version)
 	}
-	w.setLatestEval(i)
 	return nil
 }
 
@@ -396,10 +394,8 @@ func (w *deploymentWatcher) watch() {
 			// evaluation.
 			e := w.getEval()
 			u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
-			if index, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
+			if _, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
 				w.logger.Printf("[ERR] nomad.deployment_watcher: failed to update deployment %q status: %v", w.d.ID, err)
-			} else {
-				w.setLatestEval(index)
 			}
 		} else if createEval {
 			// Create an eval to push the deployment along
@@ -408,6 +404,214 @@ func (w *deploymentWatcher) watch() {
 	}
 }
 
+func (w *deploymentWatcher) watch2() {
+	var currentDeadline time.Time
+	deadlineTimer := time.NewTimer(0)
+	deadlineTimer.Stop()
+	select {
+	case <-deadlineTimer.C:
+	default:
+	}
+
+	allocIndex := uint64(1)
+	var updates *allocUpdates
+
+	rollback, deadlineHit := false, false
+
+FAIL:
+	for {
+		select {
+		case <-w.ctx.Done():
+			return
+		case <-deadlineTimer.C:
+			// We have hit the progress deadline so fail the deployment. We need
+			// to determine whether we should rollback the job by inspecting
+			// which allocs as part of the deployment are healthy and which
+			// aren't.
+			var err error
+			rollback, err = w.shouldRollback()
+			if err != nil {
+				w.logger.Printf("[ERR] nomad.deployment_watcher: failed to determine whether to rollback job for deployment %q: %v", w.d.ID, err)
+			}
+			break FAIL
+
+		case updates = <-w.getAllocsCh(allocIndex):
+			if err := updates.err; err != nil {
+				if err == context.Canceled || w.ctx.Err() == context.Canceled {
+					return
+				}
+
+				w.logger.Printf("[ERR] nomad.deployment_watcher: failed to retrieve allocations for deployment %q: %v", w.d.ID, err)
+				return
+			}
+
+			lastHandled := allocIndex
+			allocIndex = updates.index
+
+			res, err := w.handleAllocUpdate(updates.allocs, lastHandled)
+			if err != nil {
+				if err == context.Canceled || w.ctx.Err() == context.Canceled {
+					return
+				}
+
+				w.logger.Printf("[ERR] nomad.deployment_watcher: failed handling allocation updates: %v", err)
+				return
+			}
+
+			// Start the deadline timer if given a different deadline
+			if !res.nextDeadline.Equal(currentDeadline) {
+				currentDeadline = res.nextDeadline
+				if deadlineTimer.Reset(res.nextDeadline.Sub(time.Now())) {
+					<-deadlineTimer.C
+				}
+			}
+
+			if res.failDeployment {
+				rollback = res.rollback
+				break FAIL
+			}
+
+			if res.createEval {
+				// Create an eval to push the deployment along
+				w.createEvalBatched(allocIndex)
+			}
+		}
+	}
+
+	// Change the deployment's status to failed
+	desc := structs.DeploymentStatusDescriptionFailedAllocations
+	if deadlineHit {
+		desc = structs.DeploymentStatusDescriptionProgressDeadline
+	}
+
+	// Rollback to the old job if necessary
+	var j *structs.Job
+	if rollback {
+		var err error
+		j, err = w.latestStableJob()
+		if err != nil {
+			w.logger.Printf("[ERR] nomad.deployment_watcher: failed to lookup latest stable job for %q: %v", w.d.JobID, err)
+		}
+
+		// Description should include that the job is being rolled back to
+		// version N
+		if j != nil {
+			j, desc = w.handleRollbackValidity(j, desc)
+		} else {
+			desc = structs.DeploymentStatusDescriptionNoRollbackTarget(desc)
+		}
+	}
+
+	// Update the status of the deployment to failed and create an evaluation.
+	e := w.getEval()
+	u := w.getDeploymentStatusUpdate(structs.DeploymentStatusFailed, desc)
+	if _, err := w.upsertDeploymentStatusUpdate(u, e, j); err != nil {
+		w.logger.Printf("[ERR] nomad.deployment_watcher: failed to update deployment %q status: %v", w.d.ID, err)
+	}
+}
+
+type allocUpdateResult struct {
+	createEval     bool
+	failDeployment bool
+	rollback       bool
+	nextDeadline   time.Time
+}
+
+func (w *deploymentWatcher) handleAllocUpdate(allocs []*structs.AllocListStub, lastHandled uint64) (allocUpdateResult, error) {
+	var res allocUpdateResult
+
+	// Get the latest evaluation index
+	latestEval, err := w.latestEvalIndex()
+	if err != nil {
+		if err == context.Canceled || w.ctx.Err() == context.Canceled {
+			return res, err
+		}
+
+		return res, fmt.Errorf("failed to determine last evaluation index for job %q: %v", w.d.JobID, err)
+	}
+
+	for _, alloc := range allocs {
+		tg := w.j.LookupTaskGroup(alloc.TaskGroup)
+		upd := tg.Update
+		if upd == nil {
+			continue
+		}
+
+		// We need to create an eval so the job can progress.
+		if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > latestEval {
+			res.createEval = true
+		}
+
+		// If the group is using a deadline, we don't have to do anything but
+		// determine the next deadline.
+		if pdeadline := upd.ProgressDeadline; pdeadline != 0 {
+			// Determine what the deadline would be if it just got created
+			createDeadline := time.Unix(0, alloc.CreateTime).Add(pdeadline)
+			if res.nextDeadline.IsZero() || createDeadline.Before(res.nextDeadline) {
+				res.nextDeadline = createDeadline
+			}
+
+			// If we just went healthy update the deadline
+			if alloc.DeploymentStatus.IsHealthy() && alloc.DeploymentStatus.ModifyIndex > lastHandled {
+				healthyDeadline := alloc.DeploymentStatus.Timestamp.Add(pdeadline)
+				if healthyDeadline.Before(res.nextDeadline) {
+					res.nextDeadline = healthyDeadline
+				}
+			}
+
+			continue
+		}
+
+		// Fail on the first bad allocation
+		if alloc.DeploymentStatus.IsUnhealthy() {
+			// Check if the group has autorevert set
+			if upd.AutoRevert {
+				res.rollback = true
+			}
+
+			// Since we have an unhealthy allocation, fail the deployment
+			res.failDeployment = true
+		}
+
+		// All conditions have been hit so we can break
+		if res.createEval && res.failDeployment && res.rollback {
+			break
+		}
+	}
+
+	return res, nil
+}
+
+func (w *deploymentWatcher) shouldRollback() (bool, error) {
+	snap, err := w.state.Snapshot()
+	if err != nil {
+		return false, err
+	}
+
+	d, err := snap.DeploymentByID(nil, w.d.ID)
+	if err != nil {
+		return false, err
+	}
+
+	for tg, state := range d.TaskGroups {
+		// We have healthy allocs
+		if state.DesiredTotal == state.HealthyAllocs {
+			continue
+		}
+
+		// We don't need to autorevert this group
+		upd := w.j.LookupTaskGroup(tg).Update
+		if upd == nil || !upd.AutoRevert {
+			continue
+		}
+
+		// Unhealthy allocs and we need to autorevert
+		return true, nil
+	}
+
+	return false, nil
+}
+
 // latestStableJob returns the latest stable job. It may be nil if none exist
 func (w *deploymentWatcher) latestStableJob() (*structs.Job, error) {
 	snap, err := w.state.Snapshot()
@@ -452,11 +656,8 @@ func (w *deploymentWatcher) createEvalBatched(forIndex uint64) {
 	}
 
 	// Create the eval
-	evalCreateIndex, err := w.createEvaluation(w.getEval())
-	if err != nil {
+	if _, err := w.createEvaluation(w.getEval()); err != nil {
 		w.logger.Printf("[ERR] nomad.deployment_watcher: failed to create evaluation for deployment %q: %v", w.d.ID, err)
-	} else {
-		w.setLatestEval(evalCreateIndex)
 	}
 
 	w.l.Lock()
@@ -489,6 +690,28 @@ func (w *deploymentWatcher) getDeploymentStatusUpdate(status, desc string) *structs.DeploymentStatusUpdate {
 	}
 }
 
+type allocUpdates struct {
+	allocs []*structs.AllocListStub
+	index  uint64
+	err    error
+}
+
+// getAllocsCh retrieves the allocations that are part of the deployment blocking
+// at the given index.
+func (w *deploymentWatcher) getAllocsCh(index uint64) <-chan *allocUpdates {
+	out := make(chan *allocUpdates, 1)
+	go func() {
+		allocs, index, err := w.getAllocs(index)
+		out <- &allocUpdates{
+			allocs: allocs,
+			index:  index,
+			err:    err,
+		}
+	}()
+
+	return out
+}
+
 // getAllocs retrieves the allocations that are part of the deployment blocking
 // at the given index.
 func (w *deploymentWatcher) getAllocs(index uint64) ([]*structs.AllocListStub, uint64, error) {
@@ -549,36 +772,14 @@ func (w *deploymentWatcher) latestEvalIndex() (uint64, error) {
 	if len(evals) == 0 {
 		idx, err := snap.Index("evals")
-		if err != nil {
-			w.setLatestEval(idx)
-		}
 		return idx, err
 	}
 
 	// Prefer using the snapshot index. Otherwise use the create index
 	e := evals[0]
 	if e.SnapshotIndex != 0 {
-		w.setLatestEval(e.SnapshotIndex)
 		return e.SnapshotIndex, nil
 	}
 
-	w.setLatestEval(e.CreateIndex)
 	return e.CreateIndex, nil
 }
-
-// setLatestEval sets the given index as the latest eval unless the currently
-// stored index is higher.
-func (w *deploymentWatcher) setLatestEval(index uint64) {
-	w.l.Lock()
-	defer w.l.Unlock()
-	if index > w.latestEval {
-		w.latestEval = index
-	}
-}
-
-// getLatestEval returns the latest eval index.
-func (w *deploymentWatcher) getLatestEval() uint64 {
-	w.l.Lock()
-	defer w.l.Unlock()
-	return w.latestEval
-}
diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go
index f3b3557f9..3b710128a 100644
--- a/nomad/structs/diff_test.go
+++ b/nomad/structs/diff_test.go
@@ -1787,6 +1787,12 @@ func TestTaskGroupDiff(t *testing.T) {
 							Old:  "0",
 							New:  "",
 						},
+						{
+							Type: DiffTypeDeleted,
+							Name: "ProgressDeadline",
+							Old:  "0",
+							New:  "",
+						},
 					},
 				},
 			},
@@ -1837,6 +1843,12 @@ func TestTaskGroupDiff(t *testing.T) {
 							Old:  "",
 							New:  "0",
 						},
+						{
+							Type: DiffTypeAdded,
+							Name: "ProgressDeadline",
+							Old:  "",
+							New:  "0",
+						},
 					},
 				},
 			},
@@ -1846,22 +1858,24 @@ func TestTaskGroupDiff(t *testing.T) {
 			// Update strategy edited
 			Old: &TaskGroup{
 				Update: &UpdateStrategy{
-					MaxParallel:     5,
-					HealthCheck:     "foo",
-					MinHealthyTime:  1 * time.Second,
-					HealthyDeadline: 30 * time.Second,
-					AutoRevert:      true,
-					Canary:          2,
+					MaxParallel:      5,
+					HealthCheck:      "foo",
+					MinHealthyTime:   1 * time.Second,
+					HealthyDeadline:  30 * time.Second,
+					ProgressDeadline: 29 * time.Second,
+					AutoRevert:       true,
+					Canary:           2,
 				},
 			},
 			New: &TaskGroup{
 				Update: &UpdateStrategy{
-					MaxParallel:     7,
-					HealthCheck:     "bar",
-					MinHealthyTime:  2 * time.Second,
-					HealthyDeadline: 31 * time.Second,
-					AutoRevert:      false,
-					Canary:          1,
+					MaxParallel:      7,
+					HealthCheck:      "bar",
+					MinHealthyTime:   2 * time.Second,
+					HealthyDeadline:  31 * time.Second,
+					ProgressDeadline: 32 * time.Second,
+					AutoRevert:       false,
+					Canary:           1,
 				},
 			},
 			Expected: &TaskGroupDiff{
@@ -1907,6 +1921,12 @@ func TestTaskGroupDiff(t *testing.T) {
 							Old:  "1000000000",
 							New:  "2000000000",
 						},
+						{
+							Type: DiffTypeEdited,
+							Name: "ProgressDeadline",
+							Old:  "29000000000",
+							New:  "32000000000",
+						},
 					},
 				},
 			},
@@ -1917,22 +1937,24 @@ func TestTaskGroupDiff(t *testing.T) {
 			Contextual: true,
 			Old: &TaskGroup{
 				Update: &UpdateStrategy{
-					MaxParallel:     5,
-					HealthCheck:     "foo",
-					MinHealthyTime:  1 * time.Second,
-					HealthyDeadline: 30 * time.Second,
-					AutoRevert:      true,
-					Canary:          2,
+					MaxParallel:      5,
+					HealthCheck:      "foo",
+					MinHealthyTime:   1 * time.Second,
+					HealthyDeadline:  30 * time.Second,
+					ProgressDeadline: 30 * time.Second,
+					AutoRevert:       true,
+					Canary:           2,
 				},
 			},
 			New: &TaskGroup{
 				Update: &UpdateStrategy{
-					MaxParallel:     7,
-					HealthCheck:     "foo",
-					MinHealthyTime:  1 * time.Second,
-					HealthyDeadline: 30 * time.Second,
-					AutoRevert:      true,
-					Canary:          2,
+					MaxParallel:      7,
+					HealthCheck:      "foo",
+					MinHealthyTime:   1 * time.Second,
+					HealthyDeadline:  30 * time.Second,
+					ProgressDeadline: 30 * time.Second,
+					AutoRevert:       true,
+					Canary:           2,
 				},
 			},
 			Expected: &TaskGroupDiff{
@@ -1978,6 +2000,12 @@ func TestTaskGroupDiff(t *testing.T) {
 							Old:  "1000000000",
 							New:  "1000000000",
 						},
+						{
+							Type: DiffTypeNone,
+							Name: "ProgressDeadline",
+							Old:  "30000000000",
+							New:  "30000000000",
+						},
 					},
 				},
 			},
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 40a9a34d3..f8006eae3 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2478,13 +2478,14 @@ var (
 	// DefaultUpdateStrategy provides a baseline that can be used to upgrade
 	// jobs with the old policy or for populating field defaults.
 	DefaultUpdateStrategy = &UpdateStrategy{
-		Stagger:         30 * time.Second,
-		MaxParallel:     1,
-		HealthCheck:     UpdateStrategyHealthCheck_Checks,
-		MinHealthyTime:  10 * time.Second,
-		HealthyDeadline: 5 * time.Minute,
-		AutoRevert:      false,
-		Canary:          0,
+		Stagger:          30 * time.Second,
+		MaxParallel:      1,
+		HealthCheck:      UpdateStrategyHealthCheck_Checks,
+		MinHealthyTime:   10 * time.Second,
+		HealthyDeadline:  5 * time.Minute,
+		ProgressDeadline: 15 * time.Minute,
+		AutoRevert:       false,
+		Canary:           0,
 	}
 )
 
@@ -2511,6 +2512,12 @@ type UpdateStrategy struct {
 	// period doesn't count against the MinHealthyTime.
 	HealthyDeadline time.Duration
 
+	// ProgressDeadline is the time in which an allocation that is part of the
+	// deployment must transition to healthy. If no allocation becomes healthy
+	// before the deadline, the deployment is marked as failed. If the deadline
+	// is zero, the first failure causes the deployment to fail.
+	ProgressDeadline time.Duration
+
 	// AutoRevert declares that if a deployment fails because of unhealthy
 	// allocations, there should be an attempt to auto-revert the job to a
 	// stable version.
@@ -2555,9 +2562,15 @@ func (u *UpdateStrategy) Validate() error {
 	if u.HealthyDeadline <= 0 {
 		multierror.Append(&mErr, fmt.Errorf("Healthy deadline must be greater than zero: %v", u.HealthyDeadline))
 	}
+	if u.ProgressDeadline < 0 {
+		multierror.Append(&mErr, fmt.Errorf("Progress deadline must be zero or greater: %v", u.ProgressDeadline))
+	}
 	if u.MinHealthyTime >= u.HealthyDeadline {
 		multierror.Append(&mErr, fmt.Errorf("Minimum healthy time must be less than healthy deadline: %v > %v", u.MinHealthyTime, u.HealthyDeadline))
 	}
+	if u.HealthyDeadline >= u.ProgressDeadline {
+		multierror.Append(&mErr, fmt.Errorf("Healthy deadline must be less than progress deadline: %v > %v", u.HealthyDeadline, u.ProgressDeadline))
+	}
 	if u.Stagger <= 0 {
 		multierror.Append(&mErr, fmt.Errorf("Stagger must be greater than zero: %v", u.Stagger))
 	}
@@ -5300,6 +5313,7 @@ const (
 	DeploymentStatusDescriptionStoppedJob        = "Cancelled because job is stopped"
 	DeploymentStatusDescriptionNewerJob          = "Cancelled due to newer version of job"
 	DeploymentStatusDescriptionFailedAllocations = "Failed due to unhealthy allocations"
+	DeploymentStatusDescriptionProgressDeadline  = "Failed due to progress deadline"
 	DeploymentStatusDescriptionFailedByUser      = "Deployment marked as failed"
 )
 
@@ -6172,6 +6186,9 @@ type AllocDeploymentStatus struct {
 	// healthy or unhealthy.
 	Healthy *bool
 
+	// Timestamp is the time at which the health status was set.
+	Timestamp time.Time
+
 	// ModifyIndex is the raft index in which the deployment status was last
 	// changed.
 	ModifyIndex uint64
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index ec65fe755..391a73f13 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -1565,12 +1565,13 @@ func TestConstraint_Validate(t *testing.T) {
 
 func TestUpdateStrategy_Validate(t *testing.T) {
 	u := &UpdateStrategy{
-		MaxParallel:     0,
-		HealthCheck:     "foo",
-		MinHealthyTime:  -10,
-		HealthyDeadline: -15,
-		AutoRevert:      false,
-		Canary:          -1,
+		MaxParallel:      0,
+		HealthCheck:      "foo",
+		MinHealthyTime:   -10,
+		HealthyDeadline:  -15,
+		ProgressDeadline: -25,
+		AutoRevert:       false,
+		Canary:           -1,
 	}
 
 	err := u.Validate()
@@ -1590,7 +1591,13 @@ func TestUpdateStrategy_Validate(t *testing.T) {
 	if !strings.Contains(mErr.Errors[4].Error(), "Healthy deadline must be greater than zero") {
 		t.Fatalf("err: %s", err)
 	}
-	if !strings.Contains(mErr.Errors[5].Error(), "Minimum healthy time must be less than healthy deadline") {
+	if !strings.Contains(mErr.Errors[5].Error(), "Progress deadline must be zero or greater") {
+		t.Fatalf("err: %s", err)
+	}
+	if !strings.Contains(mErr.Errors[6].Error(), "Minimum healthy time must be less than healthy deadline") {
+		t.Fatalf("err: %s", err)
+	}
+	if !strings.Contains(mErr.Errors[7].Error(), "Healthy deadline must be less than progress deadline") {
 		t.Fatalf("err: %s", err)
 	}
 }
diff --git a/progress.md b/progress.md
new file mode 100644
index 000000000..d8543eb52
--- /dev/null
+++ b/progress.md
@@ -0,0 +1,32 @@
+What do I need to know
+
+1. Creation time of the deployment
+2. The time at which each allocation had its health set
+
+Cases:
+
+1. The deployment is created but no allocations are created
+Deadline is deployment start time + progress deadline
+XXX Deadline is first allocation start time + progress deadline
+
+2. Allocations created and some healthy, some unhealthy
+Original deadline is deployment start time + progress deadline
+If any alloc goes healthy, the new deadline is its time + progress deadline
+
+---
+
+Ideas:
+
+1. I think I need an option to kill the alloc after it is unhealthy to cause a
+   server side restart.
+
+check_restart handles the case of checks but not the case of templates
+blocking, downloading an image, etc.
+
+---
+Changed Behavior:
+
+1. The deployment will fail if there are resource issues, which I super don't
+   like.
+
+THIS IS MORE OR LESS NOT AN OPTION
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index 485bfa55e..0f4b6f595 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -194,18 +194,24 @@ func (a *allocReconciler) Compute() *reconcileResults {
 	// Detect if the deployment is paused
 	if a.deployment != nil {
+		// XXX Fix
+		// XXX An idea for not replacing failed allocs that are part of a
+		// deployment that will fail immediately is to only replace them if
+		// their desired transition has a replace bool set by the deployment
+		// watcher.
+
 		// Detect if any allocs associated with this deploy have failed
 		// Failed allocations could edge trigger an evaluation before the deployment watcher
 		// runs and marks the deploy as failed. This block makes sure that is still
 		// considered a failed deploy
 		failedAllocsInDeploy := false
-		for _, as := range m {
-			for _, alloc := range as {
-				if alloc.DeploymentID == a.deployment.ID && alloc.ClientStatus == structs.AllocClientStatusFailed {
-					failedAllocsInDeploy = true
-				}
-			}
-		}
+		//for _, as := range m {
+		//for _, alloc := range as {
+		//if alloc.DeploymentID == a.deployment.ID && alloc.ClientStatus == structs.AllocClientStatusFailed {
+		//failedAllocsInDeploy = true
+		//}
+		//}
+		//}
 		a.deploymentPaused = a.deployment.Status == structs.DeploymentStatusPaused
 		a.deploymentFailed = a.deployment.Status == structs.DeploymentStatusFailed || failedAllocsInDeploy
 	}
@@ -527,12 +533,9 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// Final check to see if the deployment is complete is to ensure everything
 	// is healthy
 	if deploymentComplete && a.deployment != nil {
-		partOf, _ := untainted.filterByDeployment(a.deployment.ID)
-		for _, alloc := range partOf {
-			if !alloc.DeploymentStatus.IsHealthy() {
-				deploymentComplete = false
-				break
-			}
+		dstate := a.deployment.TaskGroups[group]
+		if dstate.DesiredTotal != dstate.HealthyAllocs {
+			deploymentComplete = false
 		}
 	}
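For reviewers, here is a minimal sketch of how the new validation in `UpdateStrategy.Validate` is expected to behave with this change, assuming only what the hunks above show (the durations chosen here are made up for illustration, and the import path is assumed from the repo layout):

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// healthy_deadline must stay strictly below progress_deadline, otherwise
	// Validate appends "Healthy deadline must be less than progress deadline".
	u := &structs.UpdateStrategy{
		Stagger:          30 * time.Second,
		MaxParallel:      1,
		HealthCheck:      structs.UpdateStrategyHealthCheck_Checks,
		MinHealthyTime:   10 * time.Second,
		HealthyDeadline:  5 * time.Minute,
		ProgressDeadline: 15 * time.Minute, // matches DefaultUpdateStrategy
	}
	fmt.Println(u.Validate()) // expected: nil, since 5m < 15m

	// Shrinking the progress deadline below the healthy deadline should fail.
	u.ProgressDeadline = 3 * time.Minute
	fmt.Println(u.Validate()) // expected: error, since 5m >= 3m
}
```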