From f949a77ee3a98d07be0bfb2be9a27f14ed74bb9d Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 10 May 2017 15:26:00 -0700 Subject: [PATCH] Easy feedback fixes --- nomad/state/schema.go | 21 ++------- nomad/state/state_store.go | 88 +++++++++++++++++++------------------- nomad/structs/structs.go | 2 +- 3 files changed, 48 insertions(+), 63 deletions(-) diff --git a/nomad/state/schema.go b/nomad/state/schema.go index e17cd416f..dfeef6678 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -238,27 +238,14 @@ func deploymentSchema() *memdb.TableSchema { }, }, - // Job index is used to lookup deployments by job and the deployment - // status + // Job index is used to lookup deployments by job "job": &memdb.IndexSchema{ Name: "job", AllowMissing: false, Unique: false, - - // Use a compound index so that the scheduler can quickly get - // any paused/deploying deployment - Indexer: &memdb.CompoundIndex{ - Indexes: []memdb.Indexer{ - &memdb.StringFieldIndex{ - Field: "JobID", - Lowercase: true, - }, - - &memdb.StringFieldIndex{ - Field: "Status", - Lowercase: true, - }, - }, + Indexer: &memdb.StringFieldIndex{ + Field: "JobID", + Lowercase: true, }, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 510ef1299..169cdde4f 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -134,9 +134,9 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma defer txn.Abort() // Check if the job summary already exists - existing, err := txn.First("jobSummary", "id", jobSummary.JobID) + existing, err := txn.First("job_summary", "id", jobSummary.JobID) if err != nil { - return fmt.Errorf("periodic launch lookup failed: %v", err) + return fmt.Errorf("job summary lookup failed: %v", err) } // Setup the indexes correctly @@ -179,6 +179,8 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error { return nil } +// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to +// true, all prior deployments for the same job will be cancelled. func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment, cancelPrior bool) error { txn := s.db.Txn(true) defer txn.Abort() @@ -192,40 +194,13 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, cancelPrior bool, txn *memdb.Txn) error { // Go through and cancel any active deployment for the job. if cancelPrior { - iter, err := txn.Get("deployment", "job_prefix", deployment.JobID) - if err != nil { - return fmt.Errorf("deployment lookup failed: %v", err) - } - - for { - raw := iter.Next() - if raw == nil { - break - } - - // Ensure the ID is an exact match and that the deployment is active - d := raw.(*structs.Deployment) - if d.JobID != deployment.JobID || !d.Active() { - continue - } - - // We need to cancel so make a copy and set its status - cancelled := d.Copy() - cancelled.ModifyIndex = index - cancelled.Status = structs.DeploymentStatusCancelled - cancelled.StatusDescription = fmt.Sprintf("Cancelled in favor of deployment %q", deployment.ID) - - // Insert the cancelled deployment - if err := txn.Insert("deployment", cancelled); err != nil { - return err - } - } + s.cancelPriorDeployments(index, deployment, txn) } // Check if the deployment already exists existing, err := txn.First("deployment", "id", deployment.ID) if err != nil { - return fmt.Errorf("periodic launch lookup failed: %v", err) + return fmt.Errorf("deployment lookup failed: %v", err) } // Setup the indexes correctly @@ -250,6 +225,39 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl return nil } +// cancelPriorDeployments cancels any prior deployments for the job. +func (s *StateStore) cancelPriorDeployments(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error { + iter, err := txn.Get("deployment", "job", deployment.JobID) + if err != nil { + return fmt.Errorf("deployment lookup failed: %v", err) + } + + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the deployment is active + d := raw.(*structs.Deployment) + if !d.Active() { + continue + } + + // We need to cancel so make a copy and set its status + cancelled := d.Copy() + cancelled.ModifyIndex = index + cancelled.Status = structs.DeploymentStatusCancelled + cancelled.StatusDescription = fmt.Sprintf("Cancelled in favor of deployment %q", deployment.ID) + + // Insert the cancelled deployment + if err := txn.Insert("deployment", cancelled); err != nil { + return err + } + } + return nil +} + func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) { txn := s.db.Txn(false) @@ -286,7 +294,7 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, jobID string) ([]*str txn := s.db.Txn(false) // Get an iterator over the deployments - iter, err := txn.Get("deployment", "job_prefix", jobID) + iter, err := txn.Get("deployment", "job", jobID) if err != nil { return nil, err } @@ -301,23 +309,19 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, jobID string) ([]*str } d := raw.(*structs.Deployment) - - // Filter non-exact matches - if d.JobID != jobID { - continue - } - out = append(out, d) } return out, nil } +// LatestDeploymentByJobID returns the latest deployment for the given job. The +// latest is determined strictly by CreateIndex. func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*structs.Deployment, error) { txn := s.db.Txn(false) // Get an iterator over the deployments - iter, err := txn.Get("deployment", "job_prefix", jobID) + iter, err := txn.Get("deployment", "job", jobID) if err != nil { return nil, err } @@ -332,12 +336,6 @@ func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (* } d := raw.(*structs.Deployment) - - // Filter non-exact matches - if d.JobID != jobID { - continue - } - if out == nil || out.CreateIndex < d.CreateIndex { out = d } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index c4cfae486..3bf68d530 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3511,7 +3511,7 @@ func (d *Deployment) Copy() *Deployment { *c = *d c.TaskGroups = nil - if l := len(d.TaskGroups); l != 0 { + if l := len(d.TaskGroups); d.TaskGroups != nil { c.TaskGroups = make(map[string]*DeploymentState, l) for tg, s := range d.TaskGroups { c.TaskGroups[tg] = s.Copy()