Store pointer of JobSummary in state store and remove in-place modifications of the object and replace with Copy-Update-Insert operations

This commit is contained in:
Alex Dadgar
2017-01-06 10:34:55 -08:00
parent 8214cf08c9
commit c218f5bb70
2 changed files with 46 additions and 26 deletions

View File

@@ -723,10 +723,12 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
}
// Get the job summary from the fsm state store
summary, err := n.state.JobSummaryByID(job.ID)
raw, err := n.state.JobSummaryByID(job.ID)
if err != nil {
return err
}
summary := raw.Copy()
summary.ModifyIndex = index
// Add the allocations scheduler has made to queued since these
// allocations are never getting placed until the scheduler is invoked
@@ -755,6 +757,10 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error {
if !ok {
return fmt.Errorf("task group %q not found while updating queued count", tg)
}
// We add instead of setting here because we want to take into
// consideration what the scheduler with a mock planner thinks it
// placed. Those should be counted as queued as well
tgSummary.Queued += queued
summary.Summary[tg] = tgSummary
}
@@ -997,7 +1003,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink,
break
}
jobSummary := raw.(structs.JobSummary)
jobSummary := raw.(*structs.JobSummary)
sink.Write([]byte{byte(JobSummarySnapshot)})
if err := encoder.Encode(jobSummary); err != nil {

View File

@@ -91,7 +91,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma
defer txn.Abort()
// Update the index
if err := txn.Insert("job_summary", *jobSummary); err != nil {
if err := txn.Insert("job_summary", jobSummary); err != nil {
return err
}
@@ -410,7 +410,7 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(structs.JobSummary)
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children != nil {
@@ -426,11 +426,14 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return fmt.Errorf("unknown old job status %q", job.Status)
}
// Update the modify index
pSummary.ModifyIndex = index
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: job.ParentID})
// Insert the summary
if err := txn.Insert("job_summary", *pSummary); err != nil {
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
@@ -545,8 +548,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) {
return nil, err
}
if existing != nil {
summary := existing.(structs.JobSummary)
return summary.Copy(), nil
summary := existing.(*structs.JobSummary)
return summary, nil
}
return nil, nil
@@ -725,8 +728,8 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
return fmt.Errorf("job summary lookup failed: %v", err)
}
if summaryRaw != nil {
js := summaryRaw.(structs.JobSummary)
var hasSummaryChanged bool
js := summaryRaw.(*structs.JobSummary).Copy()
hasSummaryChanged := false
for tg, num := range eval.QueuedAllocations {
if summary, ok := js.Summary[tg]; ok {
if summary.Queued != num {
@@ -1389,7 +1392,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error {
job := rawJob.(*structs.Job)
// Create a job summary for the job
summary := structs.JobSummary{
summary := &structs.JobSummary{
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
}
@@ -1534,7 +1537,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.
// Only continue if the summary exists. It could not exist if the parent
// job was removed
if summaryRaw != nil {
existing := summaryRaw.(structs.JobSummary)
existing := summaryRaw.(*structs.JobSummary)
pSummary := existing.Copy()
if pSummary.Children == nil {
pSummary.Children = new(structs.JobChildrenSummary)
@@ -1576,7 +1579,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb.
watcher.Add(watch.Item{JobSummary: updated.ParentID})
// Insert the summary
if err := txn.Insert("job_summary", *pSummary); err != nil {
if err := txn.Insert("job_summary", pSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
@@ -1636,13 +1639,19 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
watcher watch.Items, txn *memdb.Txn) error {
existing, err := s.JobSummaryByID(job.ID)
// Update the job summary
summaryRaw, err := txn.First("job_summary", "id", job.ID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for job: %v", err)
return fmt.Errorf("job summary lookup failed: %v", err)
}
var hasSummaryChanged bool
if existing == nil {
existing = &structs.JobSummary{
// Get the summary or create if necessary
var summary *structs.JobSummary
hasSummaryChanged := false
if summaryRaw != nil {
summary = summaryRaw.(*structs.JobSummary).Copy()
} else {
summary = &structs.JobSummary{
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
Children: new(structs.JobChildrenSummary),
@@ -1650,15 +1659,16 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
}
hasSummaryChanged = true
}
for _, tg := range job.TaskGroups {
if _, ok := existing.Summary[tg.Name]; !ok {
if _, ok := summary.Summary[tg.Name]; !ok {
newSummary := structs.TaskGroupSummary{
Complete: 0,
Failed: 0,
Running: 0,
Starting: 0,
}
existing.Summary[tg.Name] = newSummary
summary.Summary[tg.Name] = newSummary
hasSummaryChanged = true
}
}
@@ -1666,7 +1676,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
// The job summary has changed, so add to watcher and update the modify
// index.
if hasSummaryChanged {
existing.ModifyIndex = index
summary.ModifyIndex = index
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: job.ID})
@@ -1674,7 +1684,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job,
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
if err := txn.Insert("job_summary", *existing); err != nil {
if err := txn.Insert("job_summary", summary); err != nil {
return err
}
}
@@ -1696,6 +1706,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if err != nil {
return fmt.Errorf("unable to lookup job summary for job id %q: %v", err)
}
if summaryRaw == nil {
// Check if the job is de-registered
rawJob, err := txn.First("jobs", "id", alloc.JobID)
@@ -1707,10 +1718,12 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if rawJob == nil {
return nil
}
return fmt.Errorf("job summary for job %q is not present", alloc.JobID)
}
summary := summaryRaw.(structs.JobSummary)
jobSummary := summary.Copy()
// Get a copy of the existing summary
jobSummary := summaryRaw.(*structs.JobSummary).Copy()
// Not updating the job summary because the allocation doesn't belong to the
// currently registered job
@@ -1722,7 +1735,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
if !ok {
return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup)
}
var summaryChanged bool
summaryChanged := false
if existingAlloc == nil {
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
@@ -1783,7 +1797,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat
return fmt.Errorf("index update failed: %v", err)
}
if err := txn.Insert("job_summary", *jobSummary); err != nil {
if err := txn.Insert("job_summary", jobSummary); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
}
@@ -1920,7 +1934,7 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err
func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
r.items.Add(watch.Item{Table: "job_summary"})
r.items.Add(watch.Item{JobSummary: jobSummary.JobID})
if err := r.txn.Insert("job_summary", *jobSummary); err != nil {
if err := r.txn.Insert("job_summary", jobSummary); err != nil {
return fmt.Errorf("job summary insert failed: %v", err)
}
return nil