Easy feedback fixes

This commit is contained in:
Alex Dadgar
2017-05-10 15:26:00 -07:00
parent 64d7fd8330
commit f949a77ee3
3 changed files with 48 additions and 63 deletions

View File

@@ -238,27 +238,14 @@ func deploymentSchema() *memdb.TableSchema {
},
},
// Job index is used to lookup deployments by job and the deployment
// status
// Job index is used to lookup deployments by job
"job": &memdb.IndexSchema{
Name: "job",
AllowMissing: false,
Unique: false,
// Use a compound index so that the scheduler can quickly get
// any paused/deploying deployment
Indexer: &memdb.CompoundIndex{
Indexes: []memdb.Indexer{
&memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
},
&memdb.StringFieldIndex{
Field: "Status",
Lowercase: true,
},
},
Indexer: &memdb.StringFieldIndex{
Field: "JobID",
Lowercase: true,
},
},
},

View File

@@ -134,9 +134,9 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma
defer txn.Abort()
// Check if the job summary already exists
existing, err := txn.First("jobSummary", "id", jobSummary.JobID)
existing, err := txn.First("job_summary", "id", jobSummary.JobID)
if err != nil {
return fmt.Errorf("periodic launch lookup failed: %v", err)
return fmt.Errorf("job summary lookup failed: %v", err)
}
// Setup the indexes correctly
@@ -179,6 +179,8 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error {
return nil
}
// UpsertDeployment is used to insert a new deployment. If cancelPrior is set to
// true, all prior deployments for the same job will be cancelled.
func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployment, cancelPrior bool) error {
txn := s.db.Txn(true)
defer txn.Abort()
@@ -192,40 +194,13 @@ func (s *StateStore) UpsertDeployment(index uint64, deployment *structs.Deployme
func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Deployment, cancelPrior bool, txn *memdb.Txn) error {
// Go through and cancel any active deployment for the job.
if cancelPrior {
iter, err := txn.Get("deployment", "job_prefix", deployment.JobID)
if err != nil {
return fmt.Errorf("deployment lookup failed: %v", err)
}
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the ID is an exact match and that the deployment is active
d := raw.(*structs.Deployment)
if d.JobID != deployment.JobID || !d.Active() {
continue
}
// We need to cancel so make a copy and set its status
cancelled := d.Copy()
cancelled.ModifyIndex = index
cancelled.Status = structs.DeploymentStatusCancelled
cancelled.StatusDescription = fmt.Sprintf("Cancelled in favor of deployment %q", deployment.ID)
// Insert the cancelled deployment
if err := txn.Insert("deployment", cancelled); err != nil {
return err
}
}
s.cancelPriorDeployments(index, deployment, txn)
}
// Check if the deployment already exists
existing, err := txn.First("deployment", "id", deployment.ID)
if err != nil {
return fmt.Errorf("periodic launch lookup failed: %v", err)
return fmt.Errorf("deployment lookup failed: %v", err)
}
// Setup the indexes correctly
@@ -250,6 +225,39 @@ func (s *StateStore) upsertDeploymentImpl(index uint64, deployment *structs.Depl
return nil
}
// cancelPriorDeployments cancels any prior deployments for the job.
func (s *StateStore) cancelPriorDeployments(index uint64, deployment *structs.Deployment, txn *memdb.Txn) error {
iter, err := txn.Get("deployment", "job", deployment.JobID)
if err != nil {
return fmt.Errorf("deployment lookup failed: %v", err)
}
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the deployment is active
d := raw.(*structs.Deployment)
if !d.Active() {
continue
}
// We need to cancel so make a copy and set its status
cancelled := d.Copy()
cancelled.ModifyIndex = index
cancelled.Status = structs.DeploymentStatusCancelled
cancelled.StatusDescription = fmt.Sprintf("Cancelled in favor of deployment %q", deployment.ID)
// Insert the cancelled deployment
if err := txn.Insert("deployment", cancelled); err != nil {
return err
}
}
return nil
}
func (s *StateStore) Deployments(ws memdb.WatchSet) (memdb.ResultIterator, error) {
txn := s.db.Txn(false)
@@ -286,7 +294,7 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, jobID string) ([]*str
txn := s.db.Txn(false)
// Get an iterator over the deployments
iter, err := txn.Get("deployment", "job_prefix", jobID)
iter, err := txn.Get("deployment", "job", jobID)
if err != nil {
return nil, err
}
@@ -301,23 +309,19 @@ func (s *StateStore) DeploymentsByJobID(ws memdb.WatchSet, jobID string) ([]*str
}
d := raw.(*structs.Deployment)
// Filter non-exact matches
if d.JobID != jobID {
continue
}
out = append(out, d)
}
return out, nil
}
// LatestDeploymentByJobID returns the latest deployment for the given job. The
// latest is determined strictly by CreateIndex.
func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*structs.Deployment, error) {
txn := s.db.Txn(false)
// Get an iterator over the deployments
iter, err := txn.Get("deployment", "job_prefix", jobID)
iter, err := txn.Get("deployment", "job", jobID)
if err != nil {
return nil, err
}
@@ -332,12 +336,6 @@ func (s *StateStore) LatestDeploymentByJobID(ws memdb.WatchSet, jobID string) (*
}
d := raw.(*structs.Deployment)
// Filter non-exact matches
if d.JobID != jobID {
continue
}
if out == nil || out.CreateIndex < d.CreateIndex {
out = d
}

View File

@@ -3511,7 +3511,7 @@ func (d *Deployment) Copy() *Deployment {
*c = *d
c.TaskGroups = nil
if l := len(d.TaskGroups); l != 0 {
if l := len(d.TaskGroups); d.TaskGroups != nil {
c.TaskGroups = make(map[string]*DeploymentState, l)
for tg, s := range d.TaskGroups {
c.TaskGroups[tg] = s.Copy()