From c218f5bb7085cf835e522415080fcd2e64001145 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 6 Jan 2017 10:34:55 -0800 Subject: [PATCH 1/4] Store pointer of JobSummary in state store and remove in-place modifications of the object and replace with Copy-Update-Insert operations --- nomad/fsm.go | 10 ++++-- nomad/state/state_store.go | 62 +++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 26 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 695a95fa5..9c1a233f0 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -723,10 +723,12 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { } // Get the job summary from the fsm state store - summary, err := n.state.JobSummaryByID(job.ID) + raw, err := n.state.JobSummaryByID(job.ID) if err != nil { return err } + summary := raw.Copy() + summary.ModifyIndex = index // Add the allocations scheduler has made to queued since these // allocations are never getting placed until the scheduler is invoked @@ -755,6 +757,10 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { if !ok { return fmt.Errorf("task group %q not found while updating queued count", tg) } + + // We add instead of setting here because we want to take into + // consideration what the scheduler with a mock planner thinks it + // placed. Those should be counted as queued as well tgSummary.Queued += queued summary.Summary[tg] = tgSummary } @@ -997,7 +1003,7 @@ func (s *nomadSnapshot) persistJobSummaries(sink raft.SnapshotSink, break } - jobSummary := raw.(structs.JobSummary) + jobSummary := raw.(*structs.JobSummary) sink.Write([]byte{byte(JobSummarySnapshot)}) if err := encoder.Encode(jobSummary); err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index c9c794507..a51661949 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -91,7 +91,7 @@ func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSumma defer txn.Abort() // Update the index - if err := txn.Insert("job_summary", *jobSummary); err != nil { + if err := txn.Insert("job_summary", jobSummary); err != nil { return err } @@ -410,7 +410,7 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { // Only continue if the summary exists. It could not exist if the parent // job was removed if summaryRaw != nil { - existing := summaryRaw.(structs.JobSummary) + existing := summaryRaw.(*structs.JobSummary) pSummary := existing.Copy() if pSummary.Children != nil { @@ -426,11 +426,14 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("unknown old job status %q", job.Status) } + // Update the modify index + pSummary.ModifyIndex = index + watcher.Add(watch.Item{Table: "job_summary"}) watcher.Add(watch.Item{JobSummary: job.ParentID}) // Insert the summary - if err := txn.Insert("job_summary", *pSummary); err != nil { + if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { @@ -545,8 +548,8 @@ func (s *StateStore) JobSummaryByID(jobID string) (*structs.JobSummary, error) { return nil, err } if existing != nil { - summary := existing.(structs.JobSummary) - return summary.Copy(), nil + summary := existing.(*structs.JobSummary) + return summary, nil } return nil, nil @@ -725,8 +728,8 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("job summary lookup failed: %v", err) } if summaryRaw != nil { - js := summaryRaw.(structs.JobSummary) - var hasSummaryChanged bool + js := summaryRaw.(*structs.JobSummary).Copy() + hasSummaryChanged := false for tg, num := range eval.QueuedAllocations { if summary, ok := js.Summary[tg]; ok { if summary.Queued != num { @@ -1389,7 +1392,7 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { job := rawJob.(*structs.Job) // Create a job summary for the job - summary := structs.JobSummary{ + summary := &structs.JobSummary{ JobID: job.ID, Summary: make(map[string]structs.TaskGroupSummary), } @@ -1534,7 +1537,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb. // Only continue if the summary exists. It could not exist if the parent // job was removed if summaryRaw != nil { - existing := summaryRaw.(structs.JobSummary) + existing := summaryRaw.(*structs.JobSummary) pSummary := existing.Copy() if pSummary.Children == nil { pSummary.Children = new(structs.JobChildrenSummary) @@ -1576,7 +1579,7 @@ func (s *StateStore) setJobStatus(index uint64, watcher watch.Items, txn *memdb. watcher.Add(watch.Item{JobSummary: updated.ParentID}) // Insert the summary - if err := txn.Insert("job_summary", *pSummary); err != nil { + if err := txn.Insert("job_summary", pSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { @@ -1636,13 +1639,19 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, watcher watch.Items, txn *memdb.Txn) error { - existing, err := s.JobSummaryByID(job.ID) + // Update the job summary + summaryRaw, err := txn.First("job_summary", "id", job.ID) if err != nil { - return fmt.Errorf("unable to retrieve summary for job: %v", err) + return fmt.Errorf("job summary lookup failed: %v", err) } - var hasSummaryChanged bool - if existing == nil { - existing = &structs.JobSummary{ + + // Get the summary or create if necessary + var summary *structs.JobSummary + hasSummaryChanged := false + if summaryRaw != nil { + summary = summaryRaw.(*structs.JobSummary).Copy() + } else { + summary = &structs.JobSummary{ JobID: job.ID, Summary: make(map[string]structs.TaskGroupSummary), Children: new(structs.JobChildrenSummary), @@ -1650,15 +1659,16 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, } hasSummaryChanged = true } + for _, tg := range job.TaskGroups { - if _, ok := existing.Summary[tg.Name]; !ok { + if _, ok := summary.Summary[tg.Name]; !ok { newSummary := structs.TaskGroupSummary{ Complete: 0, Failed: 0, Running: 0, Starting: 0, } - existing.Summary[tg.Name] = newSummary + summary.Summary[tg.Name] = newSummary hasSummaryChanged = true } } @@ -1666,7 +1676,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, // The job summary has changed, so add to watcher and update the modify // index. if hasSummaryChanged { - existing.ModifyIndex = index + summary.ModifyIndex = index watcher.Add(watch.Item{Table: "job_summary"}) watcher.Add(watch.Item{JobSummary: job.ID}) @@ -1674,7 +1684,7 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } - if err := txn.Insert("job_summary", *existing); err != nil { + if err := txn.Insert("job_summary", summary); err != nil { return err } } @@ -1696,6 +1706,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if err != nil { return fmt.Errorf("unable to lookup job summary for job id %q: %v", err) } + if summaryRaw == nil { // Check if the job is de-registered rawJob, err := txn.First("jobs", "id", alloc.JobID) @@ -1707,10 +1718,12 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if rawJob == nil { return nil } + return fmt.Errorf("job summary for job %q is not present", alloc.JobID) } - summary := summaryRaw.(structs.JobSummary) - jobSummary := summary.Copy() + + // Get a copy of the existing summary + jobSummary := summaryRaw.(*structs.JobSummary).Copy() // Not updating the job summary because the allocation doesn't belong to the // currently registered job @@ -1722,7 +1735,8 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat if !ok { return fmt.Errorf("unable to find task group in the job summary: %v", alloc.TaskGroup) } - var summaryChanged bool + + summaryChanged := false if existingAlloc == nil { switch alloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: @@ -1783,7 +1797,7 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return fmt.Errorf("index update failed: %v", err) } - if err := txn.Insert("job_summary", *jobSummary); err != nil { + if err := txn.Insert("job_summary", jobSummary); err != nil { return fmt.Errorf("updating job summary failed: %v", err) } } @@ -1920,7 +1934,7 @@ func (r *StateRestore) PeriodicLaunchRestore(launch *structs.PeriodicLaunch) err func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { r.items.Add(watch.Item{Table: "job_summary"}) r.items.Add(watch.Item{JobSummary: jobSummary.JobID}) - if err := r.txn.Insert("job_summary", *jobSummary); err != nil { + if err := r.txn.Insert("job_summary", jobSummary); err != nil { return fmt.Errorf("job summary insert failed: %v", err) } return nil From 52a1acd611926d3e86e75c8d921e1c65cb068738 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sat, 7 Jan 2017 13:41:09 -0800 Subject: [PATCH 2/4] Detect newly created allocation's properly --- api/allocations.go | 1 + scheduler/util.go | 10 ++++++++-- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/api/allocations.go b/api/allocations.go index acf77d950..c2d504500 100644 --- a/api/allocations.go +++ b/api/allocations.go @@ -88,6 +88,7 @@ type Allocation struct { PreviousAllocation string CreateIndex uint64 ModifyIndex uint64 + AllocModifyIndex uint64 CreateTime int64 } diff --git a/scheduler/util.go b/scheduler/util.go index 4b66be2e0..1ed306b76 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -684,8 +684,14 @@ func adjustQueuedAllocations(logger *log.Logger, result *structs.PlanResult, que if result != nil { for _, allocations := range result.NodeAllocation { for _, allocation := range allocations { - // Ensure that the allocation is newly created - if allocation.CreateIndex != result.AllocIndex { + // Ensure that the allocation is newly created. We check that + // the CreateIndex is equal to the ModifyIndex in order to check + // that the allocation was just created. We do not check that + // the CreateIndex is equal to the results AllocIndex because + // the allocations we get back have gone through the planner's + // optimistic snapshot and thus their indexes may not be + // correct, but they will be consistent. + if allocation.CreateIndex != allocation.ModifyIndex { continue } From 0296011d8c01ca64cf7a5fecbaefd7bd7f8dafab Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 8 Jan 2017 14:14:35 -0800 Subject: [PATCH 3/4] Fix adjust test --- scheduler/util_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 73627c1c8..846da90f5 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -1055,10 +1055,13 @@ func TestUtil_AdjustQueuedAllocations(t *testing.T) { alloc1 := mock.Alloc() alloc2 := mock.Alloc() alloc2.CreateIndex = 4 + alloc2.ModifyIndex = 4 alloc3 := mock.Alloc() alloc3.CreateIndex = 3 + alloc3.ModifyIndex = 5 alloc4 := mock.Alloc() alloc4.CreateIndex = 6 + alloc4.ModifyIndex = 8 planResult := structs.PlanResult{ NodeUpdate: map[string][]*structs.Allocation{ @@ -1073,7 +1076,7 @@ func TestUtil_AdjustQueuedAllocations(t *testing.T) { }, }, RefreshIndex: 3, - AllocIndex: 4, + AllocIndex: 16, // Should not be considered } queuedAllocs := map[string]int{"web": 2} From 84485c9e3796dde566a9d80737381d925092f892 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 11 Jan 2017 13:18:36 -0800 Subject: [PATCH 4/4] Review fixes --- nomad/fsm.go | 13 ++++++++----- nomad/state/state_store.go | 25 +++++++++++++++---------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 9c1a233f0..36f38a29d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "reflect" "time" "github.com/armon/go-metrics" @@ -723,12 +724,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { } // Get the job summary from the fsm state store - raw, err := n.state.JobSummaryByID(job.ID) + originalSummary, err := n.state.JobSummaryByID(job.ID) if err != nil { return err } - summary := raw.Copy() - summary.ModifyIndex = index + summary := originalSummary.Copy() // Add the allocations scheduler has made to queued since these // allocations are never getting placed until the scheduler is invoked @@ -765,8 +765,11 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { summary.Summary[tg] = tgSummary } - if err := n.state.UpsertJobSummary(index, summary); err != nil { - return err + if !reflect.DeepEqual(summary, originalSummary) { + summary.ModifyIndex = index + if err := n.state.UpsertJobSummary(index, summary); err != nil { + return err + } } } return nil diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index a51661949..497d690b2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -414,30 +414,35 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { pSummary := existing.Copy() if pSummary.Children != nil { + modified := false switch job.Status { case structs.JobStatusPending: pSummary.Children.Pending-- pSummary.Children.Dead++ + modified = true case structs.JobStatusRunning: pSummary.Children.Running-- pSummary.Children.Dead++ + modified = true case structs.JobStatusDead: default: return fmt.Errorf("unknown old job status %q", job.Status) } - // Update the modify index - pSummary.ModifyIndex = index + if modified { + // Update the modify index + pSummary.ModifyIndex = index - watcher.Add(watch.Item{Table: "job_summary"}) - watcher.Add(watch.Item{JobSummary: job.ParentID}) + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: job.ParentID}) - // Insert the summary - if err := txn.Insert("job_summary", pSummary); err != nil { - return fmt.Errorf("job summary insert failed: %v", err) - } - if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { - return fmt.Errorf("index update failed: %v", err) + // Insert the summary + if err := txn.Insert("job_summary", pSummary); err != nil { + return fmt.Errorf("job summary insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } } }