diff --git a/nomad/fsm.go b/nomad/fsm.go index 1b42aea2a..71d028868 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -577,6 +577,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } + // Create Job Summaries + // COMPAT 0.4 -> 0.4.1 + if err := restore.CreateJobSummaries(); err != nil { + return fmt.Errorf("error creating job summaries: %v", err) + } + // Commit the state restore restore.Commit() return nil diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index b724502da..94ce680b7 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1499,6 +1499,72 @@ func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error { return nil } +// CreateJobSummaries computes the job summaries for all the jobs +func (r *StateRestore) CreateJobSummaries() error { + // Get all the jobs + var jobs []*structs.Job + iter, err := r.txn.Get("jobs", "id") + if err != nil { + return fmt.Errorf("couldn't retrieve jobs: %v", err) + } + for { + raw := iter.Next() + if raw == nil { + break + } + jobs = append(jobs, raw.(*structs.Job)) + } + + for _, job := range jobs { + + // Get all the allocations for the job + iter, err = r.txn.Get("allocs", "job", job.ID) + if err != nil { + return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err) + } + var allocs []*structs.Allocation + for { + raw := iter.Next() + if raw == nil { + break + } + allocs = append(allocs, raw.(*structs.Allocation)) + } + + // Create a job summary for the job + summary := structs.JobSummary{ + JobID: job.ID, + Summary: make(map[string]structs.TaskGroupSummary), + } + // Calculate the summary for the job + for _, alloc := range allocs { + if _, ok := summary.Summary[alloc.TaskGroup]; !ok { + summary.Summary[alloc.TaskGroup] = structs.TaskGroupSummary{} + } + tg := summary.Summary[alloc.TaskGroup] + switch alloc.ClientStatus { + case structs.AllocClientStatusFailed: + tg.Failed += 1 + case structs.AllocClientStatusLost: + tg.Lost += 1 + case structs.AllocClientStatusComplete: + tg.Complete += 1 + case structs.AllocClientStatusRunning: + tg.Running += 1 + case structs.AllocClientStatusPending: + tg.Starting += 1 + } + summary.Summary[alloc.TaskGroup] = tg + } + // Insert the job summary + if err := r.txn.Insert("job_summary", summary); err != nil { + return fmt.Errorf("error inserting job summary: %v", err) + } + } + + return nil +} + // stateWatch holds shared state for watching updates. This is // outside of StateStore so it can be shared with snapshots. type stateWatch struct { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 323ae09e2..03639bcfc 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -1111,6 +1111,50 @@ func TestStateStore_RestoreJobSummary(t *testing.T) { } } +func TestStateStore_CreateJobSummaries(t *testing.T) { + state := testStateStore(t) + restore, err := state.Restore() + if err != nil { + t.Fatalf("err: %v", err) + } + // Restore a job + job := mock.Job() + if err := restore.JobRestore(job); err != nil { + t.Fatalf("err: %v", err) + } + + // Restore an allocation + alloc := mock.Alloc() + alloc.JobID = job.ID + alloc.Job = job + if err := restore.AllocRestore(alloc); err != nil { + t.Fatalf("err: %v", err) + } + + // Create the job summaries + if err := restore.CreateJobSummaries(); err != nil { + t.Fatalf("err: %v", err) + } + restore.Commit() + + summary, err := state.JobSummaryByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + expected := structs.JobSummary{ + JobID: job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": { + Starting: 1, + }, + }, + } + + if !reflect.DeepEqual(summary, &expected) { + t.Fatalf("Bad: %#v %#v", summary, expected) + } +} + func TestStateStore_Indexes(t *testing.T) { state := testStateStore(t) node := mock.Node()