From c16195a2ea2b72953dc1cc16e52d05dff5a191a7 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Mon, 22 May 2017 17:02:20 -0700
Subject: [PATCH] Only upsert a job if the spec changes and push deployment creation into reconciler

---
 nomad/job_endpoint.go         | 23 +++++----
 nomad/job_endpoint_test.go    | 26 ++++++++++
 nomad/structs/structs.go      | 41 ++++++++++++++++
 nomad/structs/structs_test.go | 42 ++++++++++++++++
 scheduler/generic_sched.go    |  5 ++
 scheduler/reconcile.go        | 91 ++++++++++++++++++++++++-----------
 6 files changed, 192 insertions(+), 36 deletions(-)

diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index 2d6703fb9..0a1cf719b 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -142,15 +142,20 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
     // Clear the Vault token
     args.Job.VaultToken = ""
 
-    // Commit this update via Raft
-    _, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
-    if err != nil {
-        j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err)
-        return err
-    }
+    // Check if the job has changed at all
+    if currentJob == nil || currentJob.SpecChanged(args.Job) {
+        // Commit this update via Raft
+        _, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
+        if err != nil {
+            j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err)
+            return err
+        }
 
-    // Populate the reply with job information
-    reply.JobModifyIndex = index
+        // Populate the reply with job information
+        reply.JobModifyIndex = index
+    } else {
+        reply.JobModifyIndex = currentJob.JobModifyIndex
+    }
 
     // If the job is periodic or parameterized, we don't create an eval.
     if args.Job.IsPeriodic() || args.Job.IsParameterized() {
@@ -164,7 +169,7 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
         Type:           args.Job.Type,
         TriggeredBy:    structs.EvalTriggerJobRegister,
         JobID:          args.Job.ID,
-        JobModifyIndex: index,
+        JobModifyIndex: reply.JobModifyIndex,
         Status:         structs.EvalStatusPending,
     }
     update := &structs.EvalUpdateRequest{
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index 782b8b837..efb923cfd 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -13,6 +13,7 @@ import (
     "github.com/hashicorp/nomad/nomad/mock"
     "github.com/hashicorp/nomad/nomad/structs"
     "github.com/hashicorp/nomad/testutil"
+    "github.com/kr/pretty"
 )
 
 func TestJobEndpoint_Register(t *testing.T) {
@@ -232,6 +233,9 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
     if out.Priority != 100 {
         t.Fatalf("expected update")
     }
+    if out.Version != 1 {
+        t.Fatalf("expected update")
+    }
 
     // Lookup the evaluation
     eval, err := state.EvalByID(ws, resp.EvalID)
@@ -263,6 +267,28 @@ func TestJobEndpoint_Register_Existing(t *testing.T) {
     if eval.Status != structs.EvalStatusPending {
         t.Fatalf("bad: %#v", eval)
     }
+
+    if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if resp.Index == 0 {
+        t.Fatalf("bad index: %d", resp.Index)
+    }
+
+    // Check to ensure the job version didn't get bumped because we submitted
+    // the same job
+    state = s1.fsm.State()
+    ws = memdb.NewWatchSet()
+    out, err = state.JobByID(ws, job.ID)
+    if err != nil {
+        t.Fatalf("err: %v", err)
+    }
+    if out == nil {
+        t.Fatalf("expected job")
+    }
+    if out.Version != 1 {
+        t.Fatalf("expected no update; got %v; diff %v", out.Version, pretty.Diff(job2, out))
+    }
 }
 
 func TestJobEndpoint_Register_Periodic(t *testing.T) {
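
The hunks above rely on a currentJob value that is resolved earlier in Job.Register and is not touched by this diff. The following is a hedged sketch of what that lookup could look like, reusing only accessors that already appear in the test above (fsm.State(), memdb.NewWatchSet, JobByID); the exact code in the endpoint may differ:

    // Sketch only: fetch the currently registered copy of the job so the
    // SpecChanged fast path has something to compare the submission against.
    // The real lookup lives above the first hunk and is unchanged by this patch.
    ws := memdb.NewWatchSet()
    currentJob, err := j.srv.fsm.State().JobByID(ws, args.Job.ID)
    if err != nil {
        return err
    }

If the job has never been registered, currentJob is nil and the endpoint falls through to the normal Raft apply.
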
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 9be6a3341..bfbb2952d 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -1627,6 +1627,47 @@ func (j *Job) RequiredSignals() map[string]map[string][]string {
     return signals
 }
 
+// SpecChanged determines if the functional specification has changed between
+// two job versions. The new job will be mutated but returned to its original
+// value before returning, thus concurrent access to the job should be blocked.
+func (j *Job) SpecChanged(new *Job) bool {
+    if j == nil {
+        return new != nil
+    }
+
+    // Capture the original mutable values so they can be restored.
+    oStatus := new.Status
+    oStatusDescription := new.StatusDescription
+    oStable := new.Stable
+    oVersion := new.Version
+    oCreateIndex := new.CreateIndex
+    oModifyIndex := new.ModifyIndex
+    oJobModifyIndex := new.JobModifyIndex
+
+    // Update the new job so we can do a reflect
+    new.Status = j.Status
+    new.StatusDescription = j.StatusDescription
+    new.Stable = j.Stable
+    new.Version = j.Version
+    new.CreateIndex = j.CreateIndex
+    new.ModifyIndex = j.ModifyIndex
+    new.JobModifyIndex = j.JobModifyIndex
+
+    // Deep equals the jobs
+    equal := reflect.DeepEqual(j, new)
+
+    // Restore the new job's values
+    new.Status = oStatus
+    new.StatusDescription = oStatusDescription
+    new.Stable = oStable
+    new.Version = oVersion
+    new.CreateIndex = oCreateIndex
+    new.ModifyIndex = oModifyIndex
+    new.JobModifyIndex = oJobModifyIndex
+
+    return !equal
+}
+
 // JobListStub is used to return a subset of job information
 // for the job list
 type JobListStub struct {
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index 70c20f4ea..3942d47a3 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -295,6 +295,48 @@ func TestJob_Canonicalize_Update(t *testing.T) {
     }
 }
 
+func TestJob_SpecChanged(t *testing.T) {
+    // Get a base test job
+    base := testJob()
+
+    // Only modify the indexes/mutable state of the job
+    mutatedBase := base.Copy()
+    mutatedBase.Status = "foo"
+    mutatedBase.ModifyIndex = base.ModifyIndex + 100
+
+    // change contains a spec change that should be detected
+    change := base.Copy()
+    change.Priority = 99
+
+    cases := []struct {
+        Name     string
+        Original *Job
+        New      *Job
+        Changed  bool
+    }{
+        {
+            Name:     "Same job except mutable indexes",
+            Changed:  false,
+            Original: base,
+            New:      mutatedBase,
+        },
+        {
+            Name:     "Different",
+            Changed:  true,
+            Original: base,
+            New:      change,
+        },
+    }
+
+    for _, c := range cases {
+        t.Run(c.Name, func(t *testing.T) {
+            if actual := c.Original.SpecChanged(c.New); actual != c.Changed {
+                t.Fatalf("SpecChanged() returned %v; want %v", actual, c.Changed)
+            }
+        })
+    }
+}
+
 func testJob() *Job {
     return &Job{
         Region: "global",
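
Because SpecChanged temporarily rewrites the mutable bookkeeping fields on the job passed in and restores them afterwards, callers must not share that job with concurrent readers while the comparison runs. A small usage sketch of the semantics follows; mock.Job(), from the nomad/mock test helpers imported by the endpoint test earlier in this patch, is assumed here purely for illustration:

    package main

    import (
        "fmt"

        "github.com/hashicorp/nomad/nomad/mock"
    )

    func main() {
        // Resubmitting an identical spec: only the mutable bookkeeping fields
        // differ, so SpecChanged reports false and Register can skip Raft.
        current := mock.Job()
        submitted := current.Copy()
        submitted.Status = "pending"
        submitted.JobModifyIndex = current.JobModifyIndex + 100
        fmt.Println(current.SpecChanged(submitted)) // false

        // Any functional change is detected.
        submitted.Priority += 10
        fmt.Println(current.SpecChanged(submitted)) // true
    }
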
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index c2ae5fb59..19415f67e 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -390,6 +390,11 @@ func (s *GenericScheduler) computeJobAllocs2() error {
     s.plan.CreatedDeployment = results.createDeployment
     s.plan.DeploymentUpdates = results.deploymentUpdates
 
+    // Update the stored deployment
+    if results.createDeployment != nil {
+        s.deployment = results.createDeployment
+    }
+
     // Handle the stop
     for _, stop := range results.stop {
         s.plan.AppendUpdate(stop.alloc, structs.AllocDesiredStatusStop, stop.statusDescription, stop.clientStatus)
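
With deployment creation pushed into the reconciler, the scheduler only mirrors the result: the new deployment goes into the plan so it is persisted on apply, and into s.deployment, presumably so the rest of this scheduling pass reasons against it rather than a stale nil value. A rough sketch of that handoff, with names taken from the diff (construction of the reconciler itself is omitted):

    // Sketch, not the full computeJobAllocs2 body.
    results := reconciler.Compute()

    // Record what the reconciler decided in the plan...
    s.plan.CreatedDeployment = results.createDeployment
    s.plan.DeploymentUpdates = results.deploymentUpdates

    // ...and keep the scheduler's cached deployment in sync with it.
    if results.createDeployment != nil {
        s.deployment = results.createDeployment
    }
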
diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go
index 269233626..a3eb17dd0 100644
--- a/scheduler/reconcile.go
+++ b/scheduler/reconcile.go
@@ -106,7 +106,8 @@ func NewAllocReconciler(ctx Context, stack Stack, batch bool,
 func (a *allocReconciler) Compute() *reconcileResults {
     // If we are just stopping a job we do not need to do anything more than
     // stopping all running allocs
-    if a.job == nil || a.job.Stop {
+    stopped := a.job == nil || a.job.Stop
+    if stopped {
         a.handleStop()
 
         // Cancel the deployment since it is not needed
@@ -121,6 +122,29 @@ func (a *allocReconciler) Compute() *reconcileResults {
         return a.result
     }
 
+    // Check if the deployment is referencing an older job
+    if d := a.deployment; d != nil {
+        if d.JobCreateIndex != a.job.CreateIndex || d.JobModifyIndex != a.job.JobModifyIndex {
+            a.result.deploymentUpdates = append(a.result.deploymentUpdates, &structs.DeploymentStatusUpdate{
+                DeploymentID:      a.deployment.ID,
+                Status:            structs.DeploymentStatusCancelled,
+                StatusDescription: structs.DeploymentStatusDescriptionNewerJob,
+            })
+            a.deployment = nil
+        }
+    }
+
+    // Create a new deployment
+    if a.deployment == nil && !stopped && a.job.HasUpdateStrategy() {
+        a.deployment = structs.NewDeployment(a.job)
+        a.result.createDeployment = a.deployment
+        a.ctx.Logger().Printf("ALEX: MADE DEPLOYMENT %q", a.deployment.ID)
+    }
+
+    if a.deployment != nil {
+        a.ctx.Logger().Printf("ALEX: CURRENT DEPLOYMENT %q", a.deployment.ID)
+    }
+
     m := newAllocMatrix(a.job, a.existingAllocs)
     for group, as := range m {
         a.computeGroup(group, as)
@@ -225,13 +249,6 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
     _, inplace, destructive := a.computeUpdates(tg, untainted)
     a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive))
 
-    // XXX Not clear if this is needed
-    // Update untainted so that it contains all existing allocations that have
-    // been inplace updated or do not have to be updated and does not include
-    // any canaries.
-    //untainted = untainted.difference(destructive)
-    //a.ctx.Logger().Printf("RECONCILER -- untainted %#v", untainted)
-
     // Get the update strategy of the group
     strategy, update := a.groupUpdateStrategy[group]
 
@@ -239,8 +256,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
 
     // The fact that we have destructive updates and have less canaries than is
     // desired means we need to create canaries
-    requireCanary := len(destructive) != 0 && update && strategy.Canary != 0 && len(canaries) < strategy.Canary
-    if requireCanary && !a.deploymentPaused {
+    requireCanary := len(destructive) != 0 && update && len(canaries) < strategy.Canary
+    placeCanaries := requireCanary && !a.deploymentPaused
+    if placeCanaries {
         a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", strategy.Canary-len(canaries))
         for i := len(canaries); i < strategy.Canary; i++ {
             a.result.place = append(a.result.place, allocPlaceResult{
@@ -253,12 +271,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
     }
 
     // Determine how many we can place
-    limit := tg.Count
-    if update {
-        // XXX This is wrong. Need to detect health first. Probably only within
-        // the truly untainted set
-        limit = strategy.MaxParallel
-    }
+    haveCanaries := len(canaries) != 0 || placeCanaries
+    limit := a.computeLimit(tg, strategy, untainted, haveCanaries)
     a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit)
 
     // Place if:
@@ -299,18 +313,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
         limit -= min
     }
 
-    // Migrations should be done under the rolling update strategy, however we
-    // do not abide by the paused state of the deployment since this could block
-    // node draining.
-    min := helper.IntMin(len(migrate), limit)
-    i := 0
-    a.ctx.Logger().Printf("RECONCILER -- Migrating (%d)", min)
+    // TODO Migrations should be done using a stagger and max_parallel.
+    a.ctx.Logger().Printf("RECONCILER -- Migrating (%d)", len(migrate))
     for _, alloc := range migrate {
-        if i == min {
-            break
-        }
-        i++
-
         a.result.stop = append(a.result.stop, allocStopResult{
             alloc:             alloc,
             statusDescription: allocMigrating,
@@ -324,6 +329,38 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) {
     }
 }
 
+func (a *allocReconciler) computeLimit(group *structs.TaskGroup, strategy *structs.UpdateStrategy, untainted allocSet, canaries bool) int {
+    // If there is no update strategy or deployment for the group we can deploy
+    // as many as the group has
+    if strategy == nil || a.deployment == nil {
+        return group.Count
+    } else if a.deploymentPaused {
+        // If the deployment is paused, do not create anything else
+        return 0
+    }
+
+    // Get the state of the deployment for the group
+    deploymentState := a.deployment.TaskGroups[group.Name]
+
+    // If we have canaries and they have not been promoted the limit is 0
+    if canaries && (deploymentState == nil || !deploymentState.Promoted) {
+        return 0
+    }
+
+    // If we have been promoted or there are no canaries, the limit is the
+    // configured MaxParallel minus any outstanding non-healthy allocs for the
+    // deployment
+    limit := strategy.MaxParallel
+    partOf, _ := untainted.filterByDeployment(a.deployment.ID)
+    for _, alloc := range partOf {
+        if alloc.DeploymentStatus == nil || alloc.DeploymentStatus.Healthy == nil {
+            limit--
+        }
+    }
+
+    return limit
+}
+
 func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted, destructiveUpdates allocSet) []allocPlaceResult {
     // Hot path the nothing to do case
     existing := len(untainted) + len(destructiveUpdates)
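
To make the new computeLimit accounting concrete, here is a worked example with hypothetical numbers: if the group's update strategy allows MaxParallel = 3 and two untainted allocs belonging to the current deployment have not yet reported health, only one more destructive update may start this pass; if unpromoted canaries exist for the group, the limit is 0 regardless.

    package main

    import "fmt"

    func main() {
        // Hypothetical values for the promoted (or canary-free) case.
        maxParallel := 3   // strategy.MaxParallel
        notYetHealthy := 2 // deployment allocs with DeploymentStatus.Healthy == nil

        // computeLimit subtracts one slot per outstanding non-healthy alloc.
        fmt.Println(maxParallel - notYetHealthy) // prints 1
    }
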