From 3bdaf91dd10b59247faaf1a49897b047ae00fdb2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Wed, 3 Aug 2016 18:08:37 -0700 Subject: [PATCH] Updated tests and comments --- command/agent/http.go | 2 +- command/agent/system_endpoint_test.go | 3 +- nomad/fsm.go | 24 +++++----- nomad/fsm_test.go | 17 ++++--- nomad/state/state_store.go | 33 ++++++-------- nomad/state/state_store_test.go | 64 ++++++++++++++++++++++++--- 6 files changed, 98 insertions(+), 45 deletions(-) diff --git a/command/agent/http.go b/command/agent/http.go index d17b68bd9..73bb8d252 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -129,7 +129,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest)) s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest)) - s.mux.HandleFunc("/v1/system/reconcilesummaries", s.wrap(s.ReconcileJobSummaries)) + s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries)) if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) diff --git a/command/agent/system_endpoint_test.go b/command/agent/system_endpoint_test.go index 240c434e3..fa186a477 100644 --- a/command/agent/system_endpoint_test.go +++ b/command/agent/system_endpoint_test.go @@ -25,7 +25,7 @@ func TestHTTP_SystemGarbageCollect(t *testing.T) { func TestHTTP_ReconcileJobSummaries(t *testing.T) { httpTest(t, nil, func(s *TestServer) { // Make the HTTP request - req, err := http.NewRequest("PUT", "/v1/system/reconcilesummaries", nil) + req, err := http.NewRequest("PUT", "/v1/system/reconcile/summaries", nil) if err != nil { t.Fatalf("err: %v", err) } @@ -35,7 +35,6 @@ func TestHTTP_ReconcileJobSummaries(t *testing.T) { if _, err := s.Server.ReconcileJobSummaries(respW, req); err != nil { t.Fatalf("err: %v", err) } - t.Fatalf("code %v", respW.Code) if respW.Code != 200 { t.Fatalf("expected: %v, actual: %v", 200, respW.Code) diff --git a/nomad/fsm.go b/nomad/fsm.go index 9390e6039..d5ba49298 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -446,7 +446,7 @@ func (n *nomadFSM) applyAllocClientUpdate(buf []byte, index uint64) interface{} return nil } -// applyReconcileSummaries reconciles summaries for all the job +// applyReconcileSummaries reconciles summaries for all the jobs func (n *nomadFSM) applyReconcileSummaries(buf []byte, index uint64) interface{} { if err := n.state.ReconcileJobSummaries(index); err != nil { return err @@ -592,6 +592,10 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { // Create Job Summaries // COMPAT 0.4 -> 0.4.1 + // We can remove this in 0.5. This exists so that the server creates job + // summaries if they were not present previously. When users upgrade to 0.5 + // from 0.4.1, the snapshot will contain job summaries so it will be safe to + // remove this block. index, err := n.state.Index("job_summary") if err != nil { return fmt.Errorf("couldn't fetch index of job summary table: %v", err) @@ -624,20 +628,20 @@ func (n *nomadFSM) reconcileQueuedAllocations(index uint64) error { if err != nil { return err } - var jobs []*structs.Job - for { - rawJob := iter.Next() - if rawJob == nil { - break - } - jobs = append(jobs, rawJob.(*structs.Job)) - } snap, err := n.state.Snapshot() if err != nil { return fmt.Errorf("unable to create snapshot: %v", err) } - for _, job := range jobs { + + // Invoking the scheduler for every job so that we can populate the number + // of queued allocations for every job + for { + rawJob := iter.Next() + if rawJob == nil { + break + } + job := rawJob.(*structs.Job) planner := &scheduler.Harness{ State: &snap.StateStore, } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 20e02a3e3..718338b8c 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -991,12 +991,14 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { state.UpsertJob(1000, job1) // make a job which can make partial progress - job2 := mock.Job() - state.UpsertJob(1010, job2) + alloc := mock.Alloc() + alloc.NodeID = node.ID + state.UpsertJob(1010, alloc.Job) + state.UpsertAllocs(1011, []*structs.Allocation{alloc}) // Delete the summaries state.DeleteJobSummary(1030, job1.ID) - state.DeleteJobSummary(1040, job2.ID) + state.DeleteJobSummary(1040, alloc.Job.ID) // Delete the index if err := state.RemoveIndex("job_summary"); err != nil { @@ -1022,13 +1024,16 @@ func TestFSM_SnapshotRestore_AddMissingSummary(t *testing.T) { t.Fatalf("expected: %#v, actual: %#v", &expected, out1) } - out2, _ := state2.JobSummaryByID(job2.ID) + // This exercises the code path which adds the allocations made by the + // planner and the number of unplaced allocations in the reconcile summaries + // codepath + out2, _ := state2.JobSummaryByID(alloc.Job.ID) expected = structs.JobSummary{ - JobID: job2.ID, + JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ "web": structs.TaskGroupSummary{ Queued: 10, - Starting: 0, + Starting: 1, }, }, CreateIndex: latestIndex, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 3451386f5..59565a021 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1209,29 +1209,12 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { if err != nil { return err } - var jobs []*structs.Job for { rawJob := iter.Next() if rawJob == nil { break } - jobs = append(jobs, rawJob.(*structs.Job)) - } - for _, job := range jobs { - - // Find all the allocations for the jobs - var allocs []*structs.Allocation - iter, err = txn.Get("allocs", "job", job.ID) - if err != nil { - return err - } - for { - rawAlloc := iter.Next() - if rawAlloc == nil { - break - } - allocs = append(allocs, rawAlloc.(*structs.Allocation)) - } + job := rawJob.(*structs.Job) // Create a job summary for the job summary := structs.JobSummary{ @@ -1241,8 +1224,20 @@ func (s *StateStore) ReconcileJobSummaries(index uint64) error { for _, tg := range job.TaskGroups { summary.Summary[tg.Name] = structs.TaskGroupSummary{} } + + // Find all the allocations for the jobs + iterAllocs, err := txn.Get("allocs", "job", job.ID) + if err != nil { + return err + } + // Calculate the summary for the job - for _, alloc := range allocs { + for { + rawAlloc := iterAllocs.Next() + if rawAlloc == nil { + break + } + alloc := rawAlloc.(*structs.Allocation) tg := summary.Summary[alloc.TaskGroup] switch alloc.ClientStatus { case structs.AllocClientStatusFailed: diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index f883a3c49..15a2a2542 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2058,17 +2058,61 @@ func TestStateStore_JobSummary(t *testing.T) { func TestStateStore_ReconcileJobSummary(t *testing.T) { state := testStateStore(t) + // Create an alloc alloc := mock.Alloc() + + // Add another task group to the job + tg2 := alloc.Job.TaskGroups[0].Copy() + tg2.Name = "db" + alloc.Job.TaskGroups = append(alloc.Job.TaskGroups, tg2) state.UpsertJob(100, alloc.Job) - alloc1 := mock.Alloc() - alloc1.JobID = alloc.Job.ID - alloc1.Job = alloc.Job - state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc1}) + // Create one more alloc for the db task group + alloc2 := mock.Alloc() + alloc2.TaskGroup = "db" + alloc2.JobID = alloc.JobID + alloc2.Job = alloc.Job - alloc2 := alloc1.Copy() - alloc2.ClientStatus = structs.AllocClientStatusRunning - state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc2}) + // Upserts the alloc + state.UpsertAllocs(110, []*structs.Allocation{alloc, alloc2}) + + // Change the state of the first alloc to running + alloc3 := alloc.Copy() + alloc3.ClientStatus = structs.AllocClientStatusRunning + state.UpdateAllocsFromClient(120, []*structs.Allocation{alloc3}) + + //Add some more allocs to the second tg + alloc4 := mock.Alloc() + alloc4.JobID = alloc.JobID + alloc4.Job = alloc.Job + alloc4.TaskGroup = "db" + alloc5 := alloc4.Copy() + alloc5.ClientStatus = structs.AllocClientStatusRunning + + alloc6 := mock.Alloc() + alloc6.JobID = alloc.JobID + alloc6.Job = alloc.Job + alloc6.TaskGroup = "db" + alloc7 := alloc6.Copy() + alloc7.ClientStatus = structs.AllocClientStatusComplete + + alloc8 := mock.Alloc() + alloc8.JobID = alloc.JobID + alloc8.Job = alloc.Job + alloc8.TaskGroup = "db" + alloc9 := alloc8.Copy() + alloc9.ClientStatus = structs.AllocClientStatusFailed + + alloc10 := mock.Alloc() + alloc10.JobID = alloc.JobID + alloc10.Job = alloc.Job + alloc10.TaskGroup = "db" + alloc11 := alloc10.Copy() + alloc11.ClientStatus = structs.AllocClientStatusLost + + state.UpsertAllocs(130, []*structs.Allocation{alloc4, alloc6, alloc8, alloc10}) + + state.UpdateAllocsFromClient(150, []*structs.Allocation{alloc5, alloc7, alloc9, alloc11}) // DeleteJobSummary is a helper method and doesn't modify the indexes table state.DeleteJobSummary(130, alloc.Job.ID) @@ -2080,8 +2124,14 @@ func TestStateStore_ReconcileJobSummary(t *testing.T) { JobID: alloc.Job.ID, Summary: map[string]structs.TaskGroupSummary{ "web": structs.TaskGroupSummary{ + Running: 1, + }, + "db": structs.TaskGroupSummary{ Starting: 1, Running: 1, + Failed: 1, + Complete: 1, + Lost: 1, }, }, CreateIndex: 120,