mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Complete deployments mark jobs as stable
This PR allows jobs to be marked as stable automatically by a successful deployment.
This commit is contained in:
@@ -138,30 +138,10 @@ func (s *StateStore) UpsertPlanResults(index uint64, results *structs.ApplyPlanR
|
||||
// upsertDeploymentUpdates updates the deployments given the passed status
|
||||
// updates.
|
||||
func (s *StateStore) upsertDeploymentUpdates(index uint64, updates []*structs.DeploymentStatusUpdate, txn *memdb.Txn) error {
|
||||
for _, d := range updates {
|
||||
raw, err := txn.First("deployment", "id", d.DeploymentID)
|
||||
if err != nil {
|
||||
for _, u := range updates {
|
||||
if err := s.updateDeploymentStatusImpl(index, u, txn); err != nil {
|
||||
return err
|
||||
}
|
||||
if raw == nil {
|
||||
return fmt.Errorf("Deployment ID %q couldn't be updated as it does not exist", d.DeploymentID)
|
||||
}
|
||||
|
||||
copy := raw.(*structs.Deployment).Copy()
|
||||
|
||||
// Apply the new status
|
||||
copy.Status = d.Status
|
||||
copy.StatusDescription = d.StatusDescription
|
||||
copy.ModifyIndex = index
|
||||
|
||||
// Insert the deployment
|
||||
if err := txn.Insert("deployment", copy); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
@@ -256,6 +236,13 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
// If the deployment is being marked as complete, set the job to stable.
|
||||
if deployment.Status == structs.DeploymentStatusSuccessful {
|
||||
if err := s.updateJobStability(index, deployment, txn); err != nil {
|
||||
return fmt.Errorf("failed to update job stability: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -569,7 +556,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) {
|
||||
func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
if err := s.upsertJobImpl(index, job, txn); err != nil {
|
||||
if err := s.upsertJobImpl(index, job, false, txn); err != nil {
|
||||
return err
|
||||
}
|
||||
txn.Commit()
|
||||
@@ -577,7 +564,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
|
||||
}
|
||||
|
||||
// upsertJobImpl is the inplementation for registering a job or updating a job definition
|
||||
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, txn *memdb.Txn) error {
|
||||
func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, keepVersion bool, txn *memdb.Txn) error {
|
||||
// Check if the job already exists
|
||||
existing, err := txn.First("jobs", "id", job.ID)
|
||||
if err != nil {
|
||||
@@ -589,7 +576,13 @@ func (s *StateStore) upsertJobImpl(index uint64, job *structs.Job, txn *memdb.Tx
|
||||
job.CreateIndex = existing.(*structs.Job).CreateIndex
|
||||
job.ModifyIndex = index
|
||||
job.JobModifyIndex = index
|
||||
job.Version = existing.(*structs.Job).Version + 1
|
||||
|
||||
// Bump the version unless asked to keep it. This should only be done
|
||||
// when changing an internal field such as Stable. A spec change should
|
||||
// always come with a version bump
|
||||
if !keepVersion {
|
||||
job.Version = existing.(*structs.Job).Version + 1
|
||||
}
|
||||
|
||||
// Compute the job status
|
||||
var err error
|
||||
@@ -883,12 +876,19 @@ func (s *StateStore) jobVersionByID(txn *memdb.Txn, ws *memdb.WatchSet, id strin
|
||||
// JobByIDAndVersion returns the job identified by its ID and Version
|
||||
func (s *StateStore) JobByIDAndVersion(ws memdb.WatchSet, id string, version uint64) (*structs.Job, error) {
|
||||
txn := s.db.Txn(false)
|
||||
return s.jobByIDAndVersionImpl(ws, id, version, txn)
|
||||
}
|
||||
|
||||
// jobByIDAndVersionImpl returns the job identified by its ID and Version
|
||||
func (s *StateStore) jobByIDAndVersionImpl(ws memdb.WatchSet, id string, version uint64, txn *memdb.Txn) (*structs.Job, error) {
|
||||
watchCh, existing, err := txn.FirstWatch("job_version", "id", id, version)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ws.Add(watchCh)
|
||||
if ws != nil {
|
||||
ws.Add(watchCh)
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
job := existing.(*structs.Job)
|
||||
@@ -1844,7 +1844,7 @@ func (s *StateStore) UpdateDeploymentStatus(index uint64, req *structs.Deploymen
|
||||
|
||||
// Upsert the job if necessary
|
||||
if req.Job != nil {
|
||||
if err := s.upsertJobImpl(index, req.Job, txn); err != nil {
|
||||
if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -1889,9 +1889,45 @@ func (s *StateStore) updateDeploymentStatusImpl(index uint64, u *structs.Deploym
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
// If the deployment is being marked as complete, set the job to stable.
|
||||
if copy.Status == structs.DeploymentStatusSuccessful {
|
||||
if err := s.updateJobStability(index, copy, txn); err != nil {
|
||||
return fmt.Errorf("failed to update job stability: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateJobStability updates the job version referenced by a successful
|
||||
// deployment to stable.
|
||||
func (s *StateStore) updateJobStability(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error {
|
||||
// Hot-path
|
||||
if deployment.Status != structs.DeploymentStatusSuccessful {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the job that is referenced
|
||||
job, err := s.jobByIDAndVersionImpl(nil, deployment.JobID, deployment.JobVersion, txn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Has already been cleared, nothing to do
|
||||
if job == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// If the job is already stable, nothing to do
|
||||
if job.Stable {
|
||||
return nil
|
||||
}
|
||||
|
||||
copy := job.Copy()
|
||||
copy.Stable = true
|
||||
return s.upsertJobImpl(index, copy, true, txn)
|
||||
}
|
||||
|
||||
// UpdateDeploymentPromotion is used to promote canaries in a deployment and
|
||||
// potentially make a evaluation
|
||||
func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyDeploymentPromoteRequest) error {
|
||||
@@ -1970,15 +2006,10 @@ func (s *StateStore) UpdateDeploymentPromotion(index uint64, req *structs.ApplyD
|
||||
}
|
||||
|
||||
// Insert the deployment
|
||||
if err := txn.Insert("deployment", copy); err != nil {
|
||||
if err := s.upsertDeploymentImpl(index, copy, txn); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the index
|
||||
if err := txn.Insert("index", &IndexEntry{"deployment", index}); err != nil {
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
// Upsert the optional eval
|
||||
if req.Eval != nil {
|
||||
if err := s.nestedUpsertEval(txn, index, req.Eval); err != nil {
|
||||
@@ -2068,7 +2099,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(index uint64, req *structs.Appl
|
||||
|
||||
// Upsert the job if necessary
|
||||
if req.Job != nil {
|
||||
if err := s.upsertJobImpl(index, req.Job, txn); err != nil {
|
||||
if err := s.upsertJobImpl(index, req.Job, false, txn); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4790,6 +4790,63 @@ func TestStateStore_UpsertDeploymentStatusUpdate_NonTerminal(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// Test that when a deployment is updated to successful the job is updated to
|
||||
// stable
|
||||
func TestStateStore_UpsertDeploymentStatusUpdate_Successful(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
// Insert a job
|
||||
job := mock.Job()
|
||||
if err := state.UpsertJob(1, job); err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Insert a deployment
|
||||
d := structs.NewDeployment(job)
|
||||
if err := state.UpsertDeployment(2, d); err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Update the deployment
|
||||
req := &structs.DeploymentStatusUpdateRequest{
|
||||
DeploymentUpdate: &structs.DeploymentStatusUpdate{
|
||||
DeploymentID: d.ID,
|
||||
Status: structs.DeploymentStatusSuccessful,
|
||||
StatusDescription: structs.DeploymentStatusDescriptionSuccessful,
|
||||
},
|
||||
}
|
||||
err := state.UpdateDeploymentStatus(3, req)
|
||||
if err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
|
||||
// Check that the status was updated properly
|
||||
ws := memdb.NewWatchSet()
|
||||
dout, err := state.DeploymentByID(ws, d.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
if dout.Status != structs.DeploymentStatusSuccessful ||
|
||||
dout.StatusDescription != structs.DeploymentStatusDescriptionSuccessful {
|
||||
t.Fatalf("bad: %#v", dout)
|
||||
}
|
||||
|
||||
// Check that the job was created
|
||||
jout, _ := state.JobByID(ws, job.ID)
|
||||
if err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
if jout == nil {
|
||||
t.Fatalf("bad: %#v", jout)
|
||||
}
|
||||
if !jout.Stable {
|
||||
t.Fatalf("job not marked stable %#v", jout)
|
||||
}
|
||||
if jout.Version != d.JobVersion {
|
||||
t.Fatalf("job version changed; got %d; want %d", jout.Version, d.JobVersion)
|
||||
}
|
||||
}
|
||||
|
||||
// Test that non-existant deployment can't be promoted
|
||||
func TestStateStore_UpsertDeploymentPromotion_NonExistant(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
Reference in New Issue
Block a user