diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 1a7337a1f..580ade621 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -138,30 +138,10 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR // upsertDeploymentUpdates updates the deployments given the passed status // updates. func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error { - for _, d := range updates { - raw, err := txn.First("deployment", "id", d.DeploymentID) - if err != nil { + for _, u := range updates { + if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil { return err } - if raw == nil { - return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", d.DeploymentID) - } - - copy := raw.(*structs.Deployment).Copy() - - // Apply the new status - copy.Status = d.Status - copy.StatusDescription = d.StatusDescription - copy.ModifyIndex = index - - // Insert the deployment - if err := txn.Insert("deployment", copy); err != nil { - return err - } - } - - if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) } return nil @@ -256,6 +236,13 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return fmt.Errorf("index update failed: %v", err) } + // If the deployment is being marked as complete, set the job to stable. + if deployment.Status == structs.DeploymentStatusSuccessful { + if err := s.updateJobStability(index, deployment, txn); err != nil { + return fmt.Errorf("failed to update job stability: %v", err) + } + } + return nil } @@ -569,7 +556,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { txn := s.db.Txn(true) defer txn.Abort() - if err := s.upsertJobImpl(index, job, txn); err != nil { + if err := s.upsertJobImpl(index, job, false, txn); err != nil { return err } txn.Commit() @@ -577,7 +564,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } // upsertJobImpl is the inplementation for registering a job or updating a job definition -func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error { // Check if the job already exists existing, err := txn.First("jobs", "id", job.ID) if err != nil { @@ -589,7 +576,13 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, txn *memdb.Tx job.CreateIndex = existing.(*structs.Job).CreateIndex job.ModifyIndex = index job.JobModifyIndex = index - job.Version = existing.(*structs.Job).Version + 1 + + // Bump the version unless asked to keep it. This should only be done + // when changing an internal field such as Stable. A spec change should + // always come with a version bump + if !keepVersion { + job.Version = existing.(*structs.Job).Version + 1 + } // Compute the job status var err error @@ -883,12 +876,19 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin // JobByIDAndVersion returns the job identified by its ID and Version func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, id string, version uint64) (*structs.Job, error) { txn := s.db.Txn(false) + return s.jobByIDAndVersionImpl(ws, id, version, txn) +} + +// jobByIDAndVersionImpl returns the job identified by its ID and Version +func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, id string, version uint64, txn *memdb.Txn) (*structs.Job, error) { watchCh, existing, err := txn.FirstWatch("job_version", "id", id, version) if err != nil { return nil, err } - ws.Add(watchCh) + if ws != nil { + ws.Add(watchCh) + } if existing != nil { job := existing.(*structs.Job) @@ -1844,7 +1844,7 @@ func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.Deploymen // Upsert the job if necessary if req.Job != nil { - if err := s.upsertJobImpl(index, req.Job, txn); err != nil { + if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil { return err } } @@ -1889,9 +1889,45 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym return fmt.Errorf("index update failed: %v", err) } + // If the deployment is being marked as complete, set the job to stable. + if copy.Status == structs.DeploymentStatusSuccessful { + if err := s.updateJobStability(index, copy, txn); err != nil { + return fmt.Errorf("failed to update job stability: %v", err) + } + } + return nil } +// updateJobStability updates the job version referenced by a successful +// deployment to stable. +func (s *StateStore) updateJobStability(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error { + // Hot-path + if deployment.Status != structs.DeploymentStatusSuccessful { + return nil + } + + // Get the job that is referenced + job, err := s.jobByIDAndVersionImpl(nil, deployment.JobID, deployment.JobVersion, txn) + if err != nil { + return err + } + + // Has already been cleared, nothing to do + if job == nil { + return nil + } + + // If the job is already stable, nothing to do + if job.Stable { + return nil + } + + copy := job.Copy() + copy.Stable = true + return s.upsertJobImpl(index, copy, true, txn) +} + // UpdateDeploymentPromotion is used to promote canaries in a deployment and // potentially make a evaluation func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyDeploymentPromoteRequest) error { @@ -1970,15 +2006,10 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD } // Insert the deployment - if err := txn.Insert("deployment", copy); err != nil { + if err := s.upsertDeploymentImpl(index, copy, txn); err != nil { return err } - // Update the index - if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) - } - // Upsert the optional eval if req.Eval != nil { if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil { @@ -2068,7 +2099,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.Appl // Upsert the job if necessary if req.Job != nil { - if err := s.upsertJobImpl(index, req.Job, txn); err != nil { + if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil { return err } } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 687c33409..ae78483ba 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -4790,6 +4790,63 @@ func TestStateStore_UpsertDeploymentStatusUpdate_NonTerminal(t *testing.T) { } } +// Test that when a deployment is updated to successful the job is updated to +// stable +func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) { + state := testStateStore(t) + + // Insert a job + job := mock.Job() + if err := state.UpsertJob(1, job); err != nil { + t.Fatalf("bad: %v", err) + } + + // Insert a deployment + d := structs.NewDeployment(job) + if err := state.UpsertDeployment(2, d); err != nil { + t.Fatalf("bad: %v", err) + } + + // Update the deployment + req := &structs.DeploymentStatusUpdateRequest{ + DeploymentUpdate: &structs.DeploymentStatusUpdate{ + DeploymentID: d.ID, + Status: structs.DeploymentStatusSuccessful, + StatusDescription: structs.DeploymentStatusDescriptionSuccessful, + }, + } + err := state.UpdateDeploymentStatus(3, req) + if err != nil { + t.Fatalf("bad: %v", err) + } + + // Check that the status was updated properly + ws := memdb.NewWatchSet() + dout, err := state.DeploymentByID(ws, d.ID) + if err != nil { + t.Fatalf("bad: %v", err) + } + if dout.Status != structs.DeploymentStatusSuccessful || + dout.StatusDescription != structs.DeploymentStatusDescriptionSuccessful { + t.Fatalf("bad: %#v", dout) + } + + // Check that the job was created + jout, _ := state.JobByID(ws, job.ID) + if err != nil { + t.Fatalf("bad: %v", err) + } + if jout == nil { + t.Fatalf("bad: %#v", jout) + } + if !jout.Stable { + t.Fatalf("job not marked stable %#v", jout) + } + if jout.Version != d.JobVersion { + t.Fatalf("job version changed; got %d; want %d", jout.Version, d.JobVersion) + } +} + // Test that non-existant deployment can't be promoted func TestStateStore_UpsertDeploymentPromotion_NonExistant(t *testing.T) { state := testStateStore(t)