From 5c03764015f983d88c92f53a7c700e71d57fc42e Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 3 Aug 2016 11:58:36 -0700 Subject: [PATCH 1/4] Added a test for restoring the summaries in fsm --- nomad/fsm.go | 49 +++++--- nomad/fsm_test.go | 39 ++++++- nomad/state/state_store.go | 192 +++++++++++++++++--------------- nomad/state/state_store_test.go | 117 ++++++------------- 4 files changed, 207 insertions(+), 190 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 4cdd93fae..47541c63e 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -578,29 +578,44 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } - // Create Job Summaries - // The entire snapshot has to be restored first before we create the missing - // job summaries so that the indexes are updated and we know the highest - // index - // COMPAT 0.4 -> 0.4.1 - jobs, err := restore.JobsWithoutSummary() - if err != nil { - fmt.Errorf("error retreiving jobs during restore: %v", err) - } - if err := restore.CreateJobSummaries(jobs); err != nil { - return fmt.Errorf("error creating job summaries: %v", err) - } - restore.Commit() - // Reconciling the queued allocations - return n.reconcileSummaries(jobs) + // Create Job Summaries + // COMPAT 0.4 -> 0.4.1 + index, err := n.state.Index("job_summary") + if err != nil { + return fmt.Errorf("couldn't fetch index of job summary table: %v", err) + } + if index == 0 { + if err := n.state.ReconcileJobSummaries(); err != nil { + return fmt.Errorf("error reconciling summaries: %v", err) + } + if err := n.reconcileQueuedAllocations(); err != nil { + return fmt.Errorf("error re-computing the number of queued allocations:; %v", err) + } + } + + return nil } // reconcileSummaries re-calculates the queued allocations for every job that we // created a Job Summary during the snap shot restore -func (n *nomadFSM) reconcileSummaries(jobs []*structs.Job) error { - // Start the state restore +func (n *nomadFSM) reconcileQueuedAllocations() error { + // Get all the jobs + iter, err := n.state.Jobs() + if err != nil { + return err + } + var jobs []*structs.Job + for { + rawJob := iter.Next() + if rawJob == nil { + break + } + jobs = append(jobs, rawJob.(*structs.Job)) + } + + // Start a restore session restore, err := n.state.Restore() if err != nil { return err diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 34bb63102..c33a789fd 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -981,13 +981,33 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { fsm := testFSM(t) state := fsm.State() + // Add a node + node := mock.Node() + state.UpsertNode(800, node) + + // Make a job so that none of the tasks can be placed job1 := mock.Job() + job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000 state.UpsertJob(1000, job1) - state.DeleteJobSummary(1010, job1.ID) + + // make an allocation and a job which can make partial progress + alloc := mock.Alloc() + state.UpsertJob(1010, alloc.Job) + state.UpsertAllocs(1020, []*structs.Allocation{alloc}) + + // Delete the summaries + state.DeleteJobSummary(1030, job1.ID) + state.DeleteJobSummary(1040, alloc.Job.ID) + + // Delete the index + if err := state.RemoveIndex("job_summary"); err != nil { + t.Fatalf("err: %v", err) + } fsm2 := testSnapshotRestore(t, fsm) state2 := fsm2.State() latestIndex, _ := state.LatestIndex() + out1, _ := state2.JobSummaryByID(job1.ID) expected := structs.JobSummary{ JobID: job1.ID, @@ -999,8 +1019,23 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { CreateIndex: latestIndex, ModifyIndex: latestIndex, } - if !reflect.DeepEqual(&expected, out1) { t.Fatalf("expected: %#v, actual: %#v", &expected, out1) } + + out2, _ := state2.JobSummaryByID(alloc.Job.ID) + expected = structs.JobSummary{ + JobID: alloc.Job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Queued: 3, + Starting: 1, + }, + }, + CreateIndex: latestIndex, + ModifyIndex: latestIndex, + } + if !reflect.DeepEqual(&expected, out2) { + t.Fatalf("expected: %#v, actual: %#v", &expected, out2) + } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 22697471f..05f227717 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1164,6 +1164,19 @@ func (s *StateStore) Index(name string) (uint64, error) { return out.(*IndexEntry).Value, nil } +// RemoveIndex is a helper method to remove an index for testing purposes +func (s *StateStore) RemoveIndex(name string) error { + txn := s.db.Txn(true) + defer txn.Abort() + + if _, err := txn.DeleteAll("index", "id", name); err != nil { + return err + } + + txn.Commit() + return nil +} + // Indexes returns an iterator over all the indexes func (s *StateStore) Indexes() (memdb.ResultIterator, error) { txn := s.db.Txn(false) @@ -1176,6 +1189,91 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { return iter, nil } +// ReconcileJobSummaries re-creates summaries for all jobs present in the state +// store +func (s *StateStore) ReconcileJobSummaries() error { + txn := s.db.Txn(true) + defer txn.Abort() + + // Find the latest index + latestIndex, err := s.LatestIndex() + if err != nil { + return err + } + + // Get all the jobs + iter, err := txn.Get("jobs", "id") + if err != nil { + return err + } + var jobs []*structs.Job + for { + rawJob := iter.Next() + if rawJob == nil { + break + } + jobs = append(jobs, rawJob.(*structs.Job)) + } + for _, job := range jobs { + + // Find all the allocations for the jobs + var allocs []*structs.Allocation + iter, err = txn.Get("allocs", "job", job.ID) + if err != nil { + return err + } + for { + rawAlloc := iter.Next() + if rawAlloc == nil { + break + } + allocs = append(allocs, rawAlloc.(*structs.Allocation)) + } + + // Create a job summary for the job + summary := structs.JobSummary{ + JobID: job.ID, + Summary: make(map[string]structs.TaskGroupSummary), + } + for _, tg := range job.TaskGroups { + summary.Summary[tg.Name] = structs.TaskGroupSummary{} + } + // Calculate the summary for the job + for _, alloc := range allocs { + tg := summary.Summary[alloc.TaskGroup] + switch alloc.ClientStatus { + case structs.AllocClientStatusFailed: + tg.Failed += 1 + case structs.AllocClientStatusLost: + tg.Lost += 1 + case structs.AllocClientStatusComplete: + tg.Complete += 1 + case structs.AllocClientStatusRunning: + tg.Running += 1 + case structs.AllocClientStatusPending: + tg.Starting += 1 + default: + s.logger.Printf("[ERR] state_store: invalid client status: %v in allocation %q", alloc.ClientStatus, alloc.ID) + } + summary.Summary[alloc.TaskGroup] = tg + } + + // Insert the job summary + summary.CreateIndex = latestIndex + summary.ModifyIndex = latestIndex + if err := txn.Insert("job_summary", summary); err != nil { + return fmt.Errorf("error inserting job summary: %v", err) + } + } + + // Update the indexes table for job summary + if err := txn.Insert("index", &IndexEntry{"job_summary", latestIndex}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + txn.Commit() + return nil +} + // setJobStatuses is a helper for calling setJobStatus on multiple jobs by ID. // It takes a map of job IDs to an optional forceStatus string. It returns an // error if the job doesn't exist or setJobStatus fails. @@ -1445,10 +1543,9 @@ type StateSnapshot struct { // restoring state by only using a single large transaction // instead of thousands of sub transactions type StateRestore struct { - txn *memdb.Txn - watch *stateWatch - items watch.Items - latestIndex uint64 + txn *memdb.Txn + watch *stateWatch + items watch.Items } // Abort is used to abort the restore operation @@ -1510,10 +1607,6 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error { if err := r.txn.Insert("index", idx); err != nil { return fmt.Errorf("index insert failed: %v", err) } - - if idx.Value > r.latestIndex { - r.latestIndex = idx.Value - } return nil } @@ -1535,89 +1628,6 @@ func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { return nil } -// JobsWithoutSummary returns the list of jobs which don't have any summary -func (r *StateRestore) JobsWithoutSummary() ([]*structs.Job, error) { - // Get all the jobs - var jobs []*structs.Job - iter, err := r.txn.Get("jobs", "id") - if err != nil { - return nil, fmt.Errorf("couldn't retrieve jobs: %v", err) - } - for { - raw := iter.Next() - if raw == nil { - break - } - - // Filter the jobs which have summaries - job := raw.(*structs.Job) - jobSummary, err := r.txn.First("job_summary", "id", job.ID) - if err != nil { - return nil, fmt.Errorf("unable to get job summary: %v", err) - } - if jobSummary != nil { - continue - } - - jobs = append(jobs, job) - } - return jobs, nil -} - -// CreateJobSummaries computes the job summaries for all the jobs -func (r *StateRestore) CreateJobSummaries(jobs []*structs.Job) error { - for _, job := range jobs { - // Get all the allocations for the job - iter, err := r.txn.Get("allocs", "job", job.ID) - if err != nil { - return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err) - } - var allocs []*structs.Allocation - for { - raw := iter.Next() - if raw == nil { - break - } - allocs = append(allocs, raw.(*structs.Allocation)) - } - - // Create a job summary for the job - summary := structs.JobSummary{ - JobID: job.ID, - Summary: make(map[string]structs.TaskGroupSummary), - } - for _, tg := range job.TaskGroups { - summary.Summary[tg.Name] = structs.TaskGroupSummary{} - } - // Calculate the summary for the job - for _, alloc := range allocs { - tg := summary.Summary[alloc.TaskGroup] - switch alloc.ClientStatus { - case structs.AllocClientStatusFailed: - tg.Failed += 1 - case structs.AllocClientStatusLost: - tg.Lost += 1 - case structs.AllocClientStatusComplete: - tg.Complete += 1 - case structs.AllocClientStatusRunning: - tg.Running += 1 - case structs.AllocClientStatusPending: - tg.Starting += 1 - } - summary.Summary[alloc.TaskGroup] = tg - } - - // Insert the job summary - summary.CreateIndex = r.latestIndex - summary.ModifyIndex = r.latestIndex - if err := r.txn.Insert("job_summary", summary); err != nil { - return fmt.Errorf("error inserting job summary: %v", err) - } - } - - return nil -} - // stateWatch holds shared state for watching updates. This is // outside of StateStore so it can be shared with snapshots. type stateWatch struct { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index cb2a7b075..c7bad31bb 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1169,86 +1169,6 @@ func TestStateStore_RestoreJobSummary(t *testing.T) { } } -func TestStateStore_CreateJobSummaries(t *testing.T) { - state := testStateStore(t) - restore, err := state.Restore() - if err != nil { - t.Fatalf("err: %v", err) - } - // Restore a job - job := mock.Job() - if err := restore.JobRestore(job); err != nil { - t.Fatalf("err: %v", err) - } - - // Restore an Index - index := IndexEntry{ - Key: "Foo", - Value: 100, - } - - if err := restore.IndexRestore(&index); err != nil { - t.Fatalf("err: %v", err) - } - - // Restore an allocation - alloc := mock.Alloc() - alloc.JobID = job.ID - alloc.Job = job - if err := restore.AllocRestore(alloc); err != nil { - t.Fatalf("err: %v", err) - } - - // Create the job summaries - if err := restore.CreateJobSummaries([]*structs.Job{job}); err != nil { - t.Fatalf("err: %v", err) - } - restore.Commit() - - summary, err := state.JobSummaryByID(job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - expected := structs.JobSummary{ - JobID: job.ID, - Summary: map[string]structs.TaskGroupSummary{ - "web": { - Starting: 1, - }, - }, - CreateIndex: 100, - ModifyIndex: 100, - } - - if !reflect.DeepEqual(summary, &expected) { - t.Fatalf("Bad: %#v %#v", summary, expected) - } -} - -func TestStateRestore_JobsWithoutSummaries(t *testing.T) { - state := testStateStore(t) - restore, err := state.Restore() - if err != nil { - t.Fatalf("err: %v", err) - } - // Restore a job - job := mock.Job() - if err := restore.JobRestore(job); err != nil { - t.Fatalf("err: %v", err) - } - - jobs, err := restore.JobsWithoutSummary() - if err != nil { - t.Fatalf("err: %v", err) - } - if len(jobs) != 1 { - t.Fatalf("expected: %v, actual: %v", 1, len(jobs)) - } - if !reflect.DeepEqual(job, jobs[0]) { - t.Fatalf("Bad: %#v %#v", job, jobs[0]) - } -} - func TestStateStore_Indexes(t *testing.T) { state := testStateStore(t) node := mock.Node() @@ -2135,6 +2055,43 @@ func TestStateStore_JobSummary(t *testing.T) { } } +func TestStateStore_ReconcileJobSummary(t *testing.T) { + state := testStateStore(t) + + alloc := mock.Alloc() + state.UpsertJob(100, alloc.Job) + + alloc1 := mock.Alloc() + alloc1.JobID = alloc.Job.ID + alloc1.Job = alloc.Job + state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc1}) + + alloc2 := alloc1.Copy() + alloc2.ClientStatus = structs.AllocClientStatusRunning + state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc2}) + + // DeleteJobSummary is a helper method and doesn't modify the indexes table + state.DeleteJobSummary(130, alloc.Job.ID) + + state.ReconcileJobSummaries() + + summary, _ := state.JobSummaryByID(alloc.Job.ID) + expectedSummary := structs.JobSummary{ + JobID: alloc.Job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Starting: 1, + Running: 1, + }, + }, + CreateIndex: 120, + ModifyIndex: 120, + } + if !reflect.DeepEqual(&expectedSummary, summary) { + t.Fatalf("expected: %v, actual: %v", expectedSummary, summary) + } +} + func TestStateStore_EvictAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() From 72cd53d6e517068bb51e048fb9494fa514956d73 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 3 Aug 2016 16:08:30 -0700 Subject: [PATCH 2/4] Added an endpoint for users to reconcile job summaries --- command/agent/http.go | 1 + command/agent/system_endpoint.go | 17 ++++++++ command/agent/system_endpoint_test.go | 21 ++++++++++ nomad/fsm.go | 59 ++++++++++++++++++++------ nomad/fsm_test.go | 17 ++++---- nomad/state/state_store.go | 27 ++++++------ nomad/structs/structs.go | 1 + nomad/system_endpoint.go | 16 +++++++ nomad/system_endpoint_test.go | 60 +++++++++++++++++++++++++++ 9 files changed, 185 insertions(+), 34 deletions(-) diff --git a/command/agent/http.go b/command/agent/http.go index 197effe48..d17b68bd9 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -129,6 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest)) s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest)) + s.mux.HandleFunc("/v1/system/reconcilesummaries", s.wrap(s.ReconcileJobSummaries)) if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) diff --git a/command/agent/system_endpoint.go b/command/agent/system_endpoint.go index 2ba23db61..ef7a81222 100644 --- a/command/agent/system_endpoint.go +++ b/command/agent/system_endpoint.go @@ -22,3 +22,20 @@ func (s *HTTPServer) GarbageCollectRequest(resp http.ResponseWriter, req *http.R } return nil, nil } + +func (s *HTTPServer) ReconcileJobSummaries(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + if req.Method != "PUT" { + return nil, CodedError(405, ErrInvalidMethod) + } + + var args structs.GenericRequest + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var gResp structs.GenericResponse + if err := s.agent.RPC("System.ReconcileJobSummaries", &args, &gResp); err != nil { + return nil, err + } + return nil, nil +} diff --git a/command/agent/system_endpoint_test.go b/command/agent/system_endpoint_test.go index bd3ca7418..240c434e3 100644 --- a/command/agent/system_endpoint_test.go +++ b/command/agent/system_endpoint_test.go @@ -21,3 +21,24 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) { } }) } + +func TestHTTP_ReconcileJobSummaries(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Make the HTTP request + req, err := http.NewRequest("PUT", "/v1/system/reconcilesummaries", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil { + t.Fatalf("err: %v", err) + } + t.Fatalf("code %v", respW.Code) + + if respW.Code != 200 { + t.Fatalf("expected: %v, actual: %v", 200, respW.Code) + } + }) +} diff --git a/nomad/fsm.go b/nomad/fsm.go index 47541c63e..9390e6039 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -135,6 +135,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyAllocUpdate(buf[1:], log.Index) case structs.AllocClientUpdateRequestType: return n.applyAllocClientUpdate(buf[1:], log.Index) + case structs.ReconcileJobSummariesRequestType: + return n.applyReconcileSummaries(buf[1:], log.Index) default: if ignoreUnknown { n.logger.Printf("[WARN] nomad.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType) @@ -444,6 +446,14 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } +// applyReconcileSummaries reconciles summaries for all the job +func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} { + if err := n.state.ReconcileJobSummaries(index); err != nil { + return err + } + return n.reconcileQueuedAllocations(index) +} + func (n *nomadFSM) Snapshot() (raft.FSMSnapshot, error) { // Create a new snapshot snap, err := n.state.Snapshot() @@ -586,11 +596,19 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { if err != nil { return fmt.Errorf("couldn't fetch index of job summary table: %v", err) } + + // If the index is 0 that means there is no job summary in the snapshot so + // we will have to create them if index == 0 { - if err := n.state.ReconcileJobSummaries(); err != nil { + // query the latest index + latestIndex, err := n.state.LatestIndex() + if err != nil { + return fmt.Errorf("unable to query latest index: %v", index) + } + if err := n.state.ReconcileJobSummaries(latestIndex); err != nil { return fmt.Errorf("error reconciling summaries: %v", err) } - if err := n.reconcileQueuedAllocations(); err != nil { + if err := n.reconcileQueuedAllocations(latestIndex); err != nil { return fmt.Errorf("error re-computing the number of queued allocations:; %v", err) } } @@ -600,7 +618,7 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // reconcileSummaries re-calculates the queued allocations for every job that we // created a Job Summary during the snap shot restore -func (n *nomadFSM) reconcileQueuedAllocations() error { +func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { // Get all the jobs iter, err := n.state.Jobs() if err != nil { @@ -615,12 +633,6 @@ func (n *nomadFSM) reconcileQueuedAllocations() error { jobs = append(jobs, rawJob.(*structs.Job)) } - // Start a restore session - restore, err := n.state.Restore() - if err != nil { - return err - } - defer restore.Abort() snap, err := n.state.Snapshot() if err != nil { return fmt.Errorf("unable to create snapshot: %v", err) @@ -650,10 +662,32 @@ func (n *nomadFSM) reconcileQueuedAllocations() error { if err := sched.Process(eval); err != nil { return err } - summary, err := snap.JobSummaryByID(job.ID) + + // Get the job summary from the fsm state store + summary, err := n.state.JobSummaryByID(job.ID) if err != nil { return err } + + // Add the allocations scheduler has made to queued since these + // allocations are never getting placed until the scheduler is invoked + // with a real planner + if l := len(planner.Plans); l != 1 { + return fmt.Errorf("unexpected number of plans during restore %d. Please file an issue including the logs", l) + } + for _, allocations := range planner.Plans[0].NodeAllocation { + for _, allocation := range allocations { + tgSummary, ok := summary.Summary[allocation.TaskGroup] + if !ok { + return fmt.Errorf("task group %q not found while updating queued count", allocation.TaskGroup) + } + tgSummary.Queued += 1 + summary.Summary[allocation.TaskGroup] = tgSummary + } + } + + // Add the queued allocations attached to the evaluation to the queued + // counter of the job summary if l := len(planner.Evals); l != 1 { return fmt.Errorf("unexpected number of evals during restore %d. Please file an issue including the logs", l) } @@ -662,15 +696,14 @@ func (n *nomadFSM) reconcileQueuedAllocations() error { if !ok { return fmt.Errorf("task group %q not found while updating queued count", tg) } - tgSummary.Queued = queued + tgSummary.Queued += queued summary.Summary[tg] = tgSummary } - if err := restore.JobSummaryRestore(summary); err != nil { + if err := n.state.UpsertJobSummary(index, summary); err != nil { return err } } - restore.Commit() return nil } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index c33a789fd..20e02a3e3 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -990,14 +990,13 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { job1.TaskGroups[0].Tasks[0].Resources.CPU = 5000 state.UpsertJob(1000, job1) - // make an allocation and a job which can make partial progress - alloc := mock.Alloc() - state.UpsertJob(1010, alloc.Job) - state.UpsertAllocs(1020, []*structs.Allocation{alloc}) + // make a job which can make partial progress + job2 := mock.Job() + state.UpsertJob(1010, job2) // Delete the summaries state.DeleteJobSummary(1030, job1.ID) - state.DeleteJobSummary(1040, alloc.Job.ID) + state.DeleteJobSummary(1040, job2.ID) // Delete the index if err := state.RemoveIndex("job_summary"); err != nil { @@ -1023,13 +1022,13 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { t.Fatalf("expected: %#v, actual: %#v", &expected, out1) } - out2, _ := state2.JobSummaryByID(alloc.Job.ID) + out2, _ := state2.JobSummaryByID(job2.ID) expected = structs.JobSummary{ - JobID: alloc.Job.ID, + JobID: job2.ID, Summary: map[string]structs.TaskGroupSummary{ "web": structs.TaskGroupSummary{ - Queued: 3, - Starting: 1, + Queued: 10, + Starting: 0, }, }, CreateIndex: latestIndex, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 05f227717..062c0e2de 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -85,15 +85,21 @@ func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) { s.watch.stopWatch(items, notify) } -// UpsertJobSummary upserts a job summary into the state store. This is for -// testing purposes +// UpsertJobSummary upserts a job summary into the state store. func (s *StateStore) UpsertJobSummary(index uint64, jobSummary *structs.JobSummary) error { txn := s.db.Txn(true) defer txn.Abort() + // Update the index if err := txn.Insert("job_summary", *jobSummary); err != nil { return err } + + // Update the indexes table for job summary + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + txn.Commit() return nil } @@ -108,6 +114,9 @@ func (s *StateStore) DeleteJobSummary(index uint64, id string) error { if _, err := txn.DeleteAll("job_summary", "id", id); err != nil { return fmt.Errorf("deleting job summary failed: %v", err) } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } txn.Commit() return nil } @@ -1191,16 +1200,10 @@ func (s *StateStore) Indexes() (memdb.ResultIterator, error) { // ReconcileJobSummaries re-creates summaries for all jobs present in the state // store -func (s *StateStore) ReconcileJobSummaries() error { +func (s *StateStore) ReconcileJobSummaries(index uint64) error { txn := s.db.Txn(true) defer txn.Abort() - // Find the latest index - latestIndex, err := s.LatestIndex() - if err != nil { - return err - } - // Get all the jobs iter, err := txn.Get("jobs", "id") if err != nil { @@ -1259,15 +1262,15 @@ func (s *StateStore) ReconcileJobSummaries() error { } // Insert the job summary - summary.CreateIndex = latestIndex - summary.ModifyIndex = latestIndex + summary.CreateIndex = index + summary.ModifyIndex = index if err := txn.Insert("job_summary", summary); err != nil { return fmt.Errorf("error inserting job summary: %v", err) } } // Update the indexes table for job summary - if err := txn.Insert("index", &IndexEntry{"job_summary", latestIndex}); err != nil { + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } txn.Commit() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 62f5d5310..77f1abdf6 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -45,6 +45,7 @@ const ( EvalDeleteRequestType AllocUpdateRequestType AllocClientUpdateRequestType + ReconcileJobSummariesRequestType ) const ( diff --git a/nomad/system_endpoint.go b/nomad/system_endpoint.go index 3c3e4529d..65a0b7c52 100644 --- a/nomad/system_endpoint.go +++ b/nomad/system_endpoint.go @@ -27,3 +27,19 @@ func (s *System) GarbageCollect(args *structs.GenericRequest, reply *structs.Gen s.srv.evalBroker.Enqueue(s.srv.coreJobEval(structs.CoreJobForceGC, snapshotIndex)) return nil } + +// ReconcileSummaries reconciles the summaries of all the jobs in the state +// store +func (s *System) ReconcileJobSummaries(args *structs.GenericRequest, reply *structs.GenericResponse) error { + if done, err := s.srv.forward("System.ReconcileJobSummaries", args, args, reply); done { + return err + } + + _, index, err := s.srv.raftApply(structs.ReconcileJobSummariesRequestType, args) + if err != nil { + s.srv.logger.Printf("[ERR] nomad.client: Reconcile failed: %v", err) + return err + } + reply.Index = index + return nil +} diff --git a/nomad/system_endpoint_test.go b/nomad/system_endpoint_test.go index 9f4cc1f9d..91e5d51c9 100644 --- a/nomad/system_endpoint_test.go +++ b/nomad/system_endpoint_test.go @@ -2,6 +2,7 @@ package nomad import ( "fmt" + "reflect" "testing" "github.com/hashicorp/net-rpc-msgpackrpc" @@ -49,3 +50,62 @@ func TestSystemEndpoint_GarbageCollect(t *testing.T) { t.Fatalf("err: %s", err) }) } + +func TestSystemEndpoint_ReconcileSummaries(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Insert a job that can be GC'd + state := s1.fsm.State() + s1.fsm.State() + job := mock.Job() + if err := state.UpsertJob(1000, job); err != nil { + t.Fatalf("UpsertJob() failed: %v", err) + } + + // Delete the job summary + state.DeleteJobSummary(1001, job.ID) + + // Make the GC request + req := &structs.GenericRequest{ + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "System.ReconcileJobSummaries", req, &resp); err != nil { + t.Fatalf("expect err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + // Check if Nomad has reconciled the summary for the job + summary, err := state.JobSummaryByID(job.ID) + if err != nil { + return false, err + } + if summary.CreateIndex == 0 || summary.ModifyIndex == 0 { + t.Fatalf("create index: %v, modify index: %v", summary.CreateIndex, summary.ModifyIndex) + } + + // setting the modifyindex and createindex of the expected summary to + // the output so that we can do deep equal + expectedSummary := structs.JobSummary{ + JobID: job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{ + Queued: 10, + }, + }, + ModifyIndex: summary.ModifyIndex, + CreateIndex: summary.CreateIndex, + } + if !reflect.DeepEqual(&expectedSummary, summary) { + return false, fmt.Errorf("expected: %v, actual: %v", expectedSummary, summary) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} From 2ff26b9739f9926c6c5e4e86967e83384d57a08c Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 3 Aug 2016 16:58:12 -0700 Subject: [PATCH 3/4] Not updating summary if job is de-registered --- nomad/state/state_store.go | 10 +++++++ nomad/state/state_store_test.go | 47 ++++++++++++++++++++++++++++++++- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 062c0e2de..3451386f5 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1453,6 +1453,16 @@ func (s *StateStore) updateSummaryWithAlloc(index uint64, alloc *structs.Allocat return fmt.Errorf("unable to lookup job summary for job id %q: %v", err) } if summaryRaw == nil { + // Check if the job is de-registered + rawJob, err := txn.First("jobs", "id", alloc.JobID) + if err != nil { + return fmt.Errorf("unable to query job: %v", err) + } + + // If the job is de-registered then we skip updating it's summary + if rawJob == nil { + return nil + } return fmt.Errorf("job summary for job %q is not present", alloc.JobID) } summary := summaryRaw.(structs.JobSummary) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index c7bad31bb..f883a3c49 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2073,7 +2073,7 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { // DeleteJobSummary is a helper method and doesn't modify the indexes table state.DeleteJobSummary(130, alloc.Job.ID) - state.ReconcileJobSummaries() + state.ReconcileJobSummaries(120) summary, _ := state.JobSummaryByID(alloc.Job.ID) expectedSummary := structs.JobSummary{ @@ -2092,6 +2092,51 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { } } +func TestStateStore_UpdateAlloc_JobNotPresent(t *testing.T) { + state := testStateStore(t) + + alloc := mock.Alloc() + state.UpsertJob(100, alloc.Job) + state.UpsertAllocs(200, []*structs.Allocation{alloc}) + + // Delete the job + state.DeleteJob(300, alloc.Job.ID) + + // Update the alloc + alloc1 := alloc.Copy() + alloc1.ClientStatus = structs.AllocClientStatusRunning + + // Updating allocation should not throw any error + if err := state.UpdateAllocsFromClient(400, []*structs.Allocation{alloc1}); err != nil { + t.Fatalf("expect err: %v", err) + } + + // Re-Register the job + state.UpsertJob(500, alloc.Job) + + // Update the alloc again + alloc2 := alloc.Copy() + alloc2.ClientStatus = structs.AllocClientStatusComplete + if err := state.UpdateAllocsFromClient(400, []*structs.Allocation{alloc1}); err != nil { + t.Fatalf("expect err: %v", err) + } + + // Job Summary of the newly registered job shouldn't account for the + // allocation update for the older job + expectedSummary := structs.JobSummary{ + JobID: alloc1.JobID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{}, + }, + CreateIndex: 500, + ModifyIndex: 500, + } + summary, _ := state.JobSummaryByID(alloc.Job.ID) + if !reflect.DeepEqual(&expectedSummary, summary) { + t.Fatalf("expected: %v, actual: %v", expectedSummary, summary) + } +} + func TestStateStore_EvictAlloc_Alloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc() From 3bdaf91dd10b59247faaf1a49897b047ae00fdb2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 3 Aug 2016 18:08:37 -0700 Subject: [PATCH 4/4] Updated tests and comments --- command/agent/http.go | 2 +- command/agent/system_endpoint_test.go | 3 +- nomad/fsm.go | 24 +++++----- nomad/fsm_test.go | 17 ++++--- nomad/state/state_store.go | 33 ++++++-------- nomad/state/state_store_test.go | 64 ++++++++++++++++++++++++--- 6 files changed, 98 insertions(+), 45 deletions(-) diff --git a/command/agent/http.go b/command/agent/http.go index d17b68bd9..73bb8d252 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -129,7 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest)) s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest)) - s.mux.HandleFunc("/v1/system/reconcilesummaries", s.wrap(s.ReconcileJobSummaries)) + s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries)) if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) diff --git a/command/agent/system_endpoint_test.go b/command/agent/system_endpoint_test.go index 240c434e3..fa186a477 100644 --- a/command/agent/system_endpoint_test.go +++ b/command/agent/system_endpoint_test.go @@ -25,7 +25,7 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) { func TestHTTP_ReconcileJobSummaries(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Make the HTTP request - req, err := http.NewRequest("PUT", "/v1/system/reconcilesummaries", nil) + req, err := http.NewRequest("PUT", "/v1/system/reconcile/summaries", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -35,7 +35,6 @@ func TestHTTP_ReconcileJobSummaries(t *testing.T) { if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil { t.Fatalf("err: %v", err) } - t.Fatalf("code %v", respW.Code) if respW.Code != 200 { t.Fatalf("expected: %v, actual: %v", 200, respW.Code) diff --git a/nomad/fsm.go b/nomad/fsm.go index 9390e6039..d5ba49298 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -446,7 +446,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } -// applyReconcileSummaries reconciles summaries for all the job +// applyReconcileSummaries reconciles summaries for all the jobs func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} { if err := n.state.ReconcileJobSummaries(index); err != nil { return err @@ -592,6 +592,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Create Job Summaries // COMPAT 0.4 -> 0.4.1 + // We can remove this in 0.5. This exists so that the server creates job + // summaries if they were not present previously. When users upgrade to 0.5 + // from 0.4.1, the snapshot will contain job summaries so it will be safe to + // remove this block. index, err := n.state.Index("job_summary") if err != nil { return fmt.Errorf("couldn't fetch index of job summary table: %v", err) @@ -624,20 +628,20 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { if err != nil { return err } - var jobs []*structs.Job - for { - rawJob := iter.Next() - if rawJob == nil { - break - } - jobs = append(jobs, rawJob.(*structs.Job)) - } snap, err := n.state.Snapshot() if err != nil { return fmt.Errorf("unable to create snapshot: %v", err) } - for _, job := range jobs { + + // Invoking the scheduler for every job so that we can populate the number + // of queued allocations for every job + for { + rawJob := iter.Next() + if rawJob == nil { + break + } + job := rawJob.(*structs.Job) planner := &scheduler.Harness{ State: &snap.StateStore, } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 20e02a3e3..718338b8c 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -991,12 +991,14 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { state.UpsertJob(1000, job1) // make a job which can make partial progress - job2 := mock.Job() - state.UpsertJob(1010, job2) + alloc := mock.Alloc() + alloc.NodeID = node.ID + state.UpsertJob(1010, alloc.Job) + state.UpsertAllocs(1011, []*structs.Allocation{alloc}) // Delete the summaries state.DeleteJobSummary(1030, job1.ID) - state.DeleteJobSummary(1040, job2.ID) + state.DeleteJobSummary(1040, alloc.Job.ID) // Delete the index if err := state.RemoveIndex("job_summary"); err != nil { @@ -1022,13 +1024,16 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { t.Fatalf("expected: %#v, actual: %#v", &expected, out1) } - out2, _ := state2.JobSummaryByID(job2.ID) + // This exercises the code path which adds the allocations made by the + // planner and the number of unplaced allocations in the reconcile summaries + // codepath + out2, _ := state2.JobSummaryByID(alloc.Job.ID) expected = structs.JobSummary{ - JobID: job2.ID, + JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ "web": structs.TaskGroupSummary{ Queued: 10, - Starting: 0, + Starting: 1, }, }, CreateIndex: latestIndex, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3451386f5..59565a021 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1209,29 +1209,12 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { if err != nil { return err } - var jobs []*structs.Job for { rawJob := iter.Next() if rawJob == nil { break } - jobs = append(jobs, rawJob.(*structs.Job)) - } - for _, job := range jobs { - - // Find all the allocations for the jobs - var allocs []*structs.Allocation - iter, err = txn.Get("allocs", "job", job.ID) - if err != nil { - return err - } - for { - rawAlloc := iter.Next() - if rawAlloc == nil { - break - } - allocs = append(allocs, rawAlloc.(*structs.Allocation)) - } + job := rawJob.(*structs.Job) // Create a job summary for the job summary := structs.JobSummary{ @@ -1241,8 +1224,20 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { for _, tg := range job.TaskGroups { summary.Summary[tg.Name] = structs.TaskGroupSummary{} } + + // Find all the allocations for the jobs + iterAllocs, err := txn.Get("allocs", "job", job.ID) + if err != nil { + return err + } + // Calculate the summary for the job - for _, alloc := range allocs { + for { + rawAlloc := iterAllocs.Next() + if rawAlloc == nil { + break + } + alloc := rawAlloc.(*structs.Allocation) tg := summary.Summary[alloc.TaskGroup] switch alloc.ClientStatus { case structs.AllocClientStatusFailed: diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f883a3c49..15a2a2542 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2058,17 +2058,61 @@ func TestStateStore_JobSummary(t *testing.T) { func TestStateStore_ReconcileJobSummary(t *testing.T) { state := testStateStore(t) + // Create an alloc alloc := mock.Alloc() + + // Add another task group to the job + tg2 := alloc.Job.TaskGroups[0].Copy() + tg2.Name = "db" + alloc.Job.TaskGroups = append(alloc.Job.TaskGroups, tg2) state.UpsertJob(100, alloc.Job) - alloc1 := mock.Alloc() - alloc1.JobID = alloc.Job.ID - alloc1.Job = alloc.Job - state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc1}) + // Create one more alloc for the db task group + alloc2 := mock.Alloc() + alloc2.TaskGroup = "db" + alloc2.JobID = alloc.JobID + alloc2.Job = alloc.Job - alloc2 := alloc1.Copy() - alloc2.ClientStatus = structs.AllocClientStatusRunning - state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc2}) + // Upserts the alloc + state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc2}) + + // Change the state of the first alloc to running + alloc3 := alloc.Copy() + alloc3.ClientStatus = structs.AllocClientStatusRunning + state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc3}) + + //Add some more allocs to the second tg + alloc4 := mock.Alloc() + alloc4.JobID = alloc.JobID + alloc4.Job = alloc.Job + alloc4.TaskGroup = "db" + alloc5 := alloc4.Copy() + alloc5.ClientStatus = structs.AllocClientStatusRunning + + alloc6 := mock.Alloc() + alloc6.JobID = alloc.JobID + alloc6.Job = alloc.Job + alloc6.TaskGroup = "db" + alloc7 := alloc6.Copy() + alloc7.ClientStatus = structs.AllocClientStatusComplete + + alloc8 := mock.Alloc() + alloc8.JobID = alloc.JobID + alloc8.Job = alloc.Job + alloc8.TaskGroup = "db" + alloc9 := alloc8.Copy() + alloc9.ClientStatus = structs.AllocClientStatusFailed + + alloc10 := mock.Alloc() + alloc10.JobID = alloc.JobID + alloc10.Job = alloc.Job + alloc10.TaskGroup = "db" + alloc11 := alloc10.Copy() + alloc11.ClientStatus = structs.AllocClientStatusLost + + state.UpsertAllocs(130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10}) + + state.UpdateAllocsFromClient(150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11}) // DeleteJobSummary is a helper method and doesn't modify the indexes table state.DeleteJobSummary(130, alloc.Job.ID) @@ -2080,8 +2124,14 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ "web": structs.TaskGroupSummary{ + Running: 1, + }, + "db": structs.TaskGroupSummary{ Starting: 1, Running: 1, + Failed: 1, + Complete: 1, + Lost: 1, }, }, CreateIndex: 120,