From c218f5bb7085cf835e522415080fcd2e64001145 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 6 Jan 2017 10:34:55 -0800 Subject: [PATCH] Store pointer of JobSummary in state store and remove in-place modifications of the object and replace with Copy-Update-Insert operations --- nomad/fsm.go | 10 ++++-- nomad/state/state_store.go | 62 +++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 695a95fa5..9c1a233f0 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -723,10 +723,12 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { } // Get the job summary from the fsm state store - summary, err := n.state.JobSummaryByID(job.ID) + raw, err := n.state.JobSummaryByID(job.ID) if err != nil { return err } + summary := raw.Copy() + summary.ModifyIndex = index // Add the allocations scheduler has made to queued since these // allocations are never getting placed until the scheduler is invoked @@ -755,6 +757,10 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { if !ok { return fmt.Errorf("task group %q not found while updating queued count", tg) } + + // We add instead of setting here because we want to take into + // consideration what the scheduler with a mock planner thinks it + // placed. Those should be counted as queued as well tgSummary.Queued += queued summary.Summary[tg] = tgSummary } @@ -997,7 +1003,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink, break } - jobSummary := raw.(structs.JobSummary) + jobSummary := raw.(*structs.JobSummary) sink.Write([]byte{byte(JobSummarySnapshot)}) if err := encoder.Encode(jobSummary); err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c9c794507..a51661949 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -91,7 +91,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma defer txn.Abort() // Update the index - if err := txn.Insert("job_summary", *jobSummary); err != nil { + if err := txn.Insert("job_summary", jobSummary); err != nil { return err } @@ -410,7 +410,7 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { // Only continue if the summary exists. It could not exist if the parent // job was removed if summaryRaw != nil { - existing := summaryRaw.(structs.JobSummary) + existing := summaryRaw.(*structs.JobSummary) pSummary := existing.Copy() if pSummary.Children != nil { @@ -426,11 +426,14 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("unknown old job status %q", job.Status) } + // Update the modify index + pSummary.ModifyIndex = index + watcher.Add(watch.Item{Table: "job_summary"}) watcher.Add(watch.Item{JobSummary: job.ParentID}) // Insert the summary - if err := txn.Insert("job_summary", *pSummary); err != nil { + if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { @@ -545,8 +548,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) { return nil, err } if existing != nil { - summary := existing.(structs.JobSummary) - return summary.Copy(), nil + summary := existing.(*structs.JobSummary) + return summary, nil } return nil, nil @@ -725,8 +728,8 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("job summary lookup failed: %v", err) } if summaryRaw != nil { - js := summaryRaw.(structs.JobSummary) - var hasSummaryChanged bool + js := summaryRaw.(*structs.JobSummary).Copy() + hasSummaryChanged := false for tg, num := range eval.QueuedAllocations { if summary, ok := js.Summary[tg]; ok { if summary.Queued != num { @@ -1389,7 +1392,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { job := rawJob.(*structs.Job) // Create a job summary for the job - summary := structs.JobSummary{ + summary := &structs.JobSummary{ JobID: job.ID, Summary: make(map[string]structs.TaskGroupSummary), } @@ -1534,7 +1537,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb. // Only continue if the summary exists. It could not exist if the parent // job was removed if summaryRaw != nil { - existing := summaryRaw.(structs.JobSummary) + existing := summaryRaw.(*structs.JobSummary) pSummary := existing.Copy() if pSummary.Children == nil { pSummary.Children = new(structs.JobChildrenSummary) @@ -1576,7 +1579,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb. watcher.Add(watch.Item{JobSummary: updated.ParentID}) // Insert the summary - if err := txn.Insert("job_summary", *pSummary); err != nil { + if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { @@ -1636,13 +1639,19 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, watcher watch.Items, txn *memdb.Txn) error { - existing, err := s.JobSummaryByID(job.ID) + // Update the job summary + summaryRaw, err := txn.First("job_summary", "id", job.ID) if err != nil { - return fmt.Errorf("unable to retrieve summary for job: %v", err) + return fmt.Errorf("job summary lookup failed: %v", err) } - var hasSummaryChanged bool - if existing == nil { - existing = &structs.JobSummary{ + + // Get the summary or create if necessary + var summary *structs.JobSummary + hasSummaryChanged := false + if summaryRaw != nil { + summary = summaryRaw.(*structs.JobSummary).Copy() + } else { + summary = &structs.JobSummary{ JobID: job.ID, Summary: make(map[string]structs.TaskGroupSummary), Children: new(structs.JobChildrenSummary), @@ -1650,15 +1659,16 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, } hasSummaryChanged = true } + for _, tg := range job.TaskGroups { - if _, ok := existing.Summary[tg.Name]; !ok { + if _, ok := summary.Summary[tg.Name]; !ok { newSummary := structs.TaskGroupSummary{ Complete: 0, Failed: 0, Running: 0, Starting: 0, } - existing.Summary[tg.Name] = newSummary + summary.Summary[tg.Name] = newSummary hasSummaryChanged = true } } @@ -1666,7 +1676,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // The job summary has changed, so add to watcher and update the modify // index. if hasSummaryChanged { - existing.ModifyIndex = index + summary.ModifyIndex = index watcher.Add(watch.Item{Table: "job_summary"}) watcher.Add(watch.Item{JobSummary: job.ID}) @@ -1674,7 +1684,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } - if err := txn.Insert("job_summary", *existing); err != nil { + if err := txn.Insert("job_summary", summary); err != nil { return err } } @@ -1696,6 +1706,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if err != nil { return fmt.Errorf("unable to lookup job summary for job id %q: %v", err) } + if summaryRaw == nil { // Check if the job is de-registered rawJob, err := txn.First("jobs", "id", alloc.JobID) @@ -1707,10 +1718,12 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if rawJob == nil { return nil } + return fmt.Errorf("job summary for job %q is not present", alloc.JobID) } - summary := summaryRaw.(structs.JobSummary) - jobSummary := summary.Copy() + + // Get a copy of the existing summary + jobSummary := summaryRaw.(*structs.JobSummary).Copy() // Not updating the job summary because the allocation doesn't belong to the // currently registered job @@ -1722,7 +1735,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if !ok { return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) } - var summaryChanged bool + + summaryChanged := false if existingAlloc == nil { switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: @@ -1783,7 +1797,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return fmt.Errorf("index update failed: %v", err) } - if err := txn.Insert("job_summary", *jobSummary); err != nil { + if err := txn.Insert("job_summary", jobSummary); err != nil { return fmt.Errorf("updating job summary failed: %v", err) } } @@ -1920,7 +1934,7 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { r.items.Add(watch.Item{Table: "job_summary"}) r.items.Add(watch.Item{JobSummary: jobSummary.JobID}) - if err := r.txn.Insert("job_summary", *jobSummary); err != nil { + if err := r.txn.Insert("job_summary", jobSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } return nil