Added code to create missing job summaries

This commit is contained in:
Diptanu Choudhury
2016-07-21 23:59:28 -07:00
parent 5e86e9a829
commit c79784d0ea
3 changed files with 116 additions and 0 deletions

View File

@@ -577,6 +577,12 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
}
}
// Create Job Summaries
// COMPAT 0.4 -> 0.4.1
if err := restore.CreateJobSummaries(); err != nil {
return fmt.Errorf("error creating job summaries: %v", err)
}
// Commit the state restore
restore.Commit()
return nil

View File

@@ -1499,6 +1499,72 @@ func (r *StateRestore) JobSummaryRestore(jobSummary *structs.JobSummary) error {
return nil
}
// CreateJobSummaries builds a job summary for every job visible in the restore
// transaction and inserts it into the "job_summary" table. Each summary counts
// the job's allocations per task group, bucketed by client status. It returns
// an error if the jobs or allocations cannot be queried or a summary cannot be
// inserted.
func (r *StateRestore) CreateJobSummaries() error {
	// Drain the job iterator completely before writing any summary.
	jobIter, err := r.txn.Get("jobs", "id")
	if err != nil {
		return fmt.Errorf("couldn't retrieve jobs: %v", err)
	}
	var jobs []*structs.Job
	for obj := jobIter.Next(); obj != nil; obj = jobIter.Next() {
		jobs = append(jobs, obj.(*structs.Job))
	}

	for _, job := range jobs {
		// Walk the allocations that belong to this job.
		allocIter, err := r.txn.Get("allocs", "job", job.ID)
		if err != nil {
			return fmt.Errorf("couldn't retrieve allocations for job %v: %v", job.ID, err)
		}

		// Tally allocations per task group as they stream by.
		counts := make(map[string]structs.TaskGroupSummary)
		for obj := allocIter.Next(); obj != nil; obj = allocIter.Next() {
			alloc := obj.(*structs.Allocation)

			// Map values cannot be updated in place: read the entry (the
			// zero value when the group is new), bump it, and store it back.
			group := counts[alloc.TaskGroup]
			switch alloc.ClientStatus {
			case structs.AllocClientStatusFailed:
				group.Failed++
			case structs.AllocClientStatusLost:
				group.Lost++
			case structs.AllocClientStatusComplete:
				group.Complete++
			case structs.AllocClientStatusRunning:
				group.Running++
			case structs.AllocClientStatusPending:
				// Pending allocations are reported as starting.
				group.Starting++
			}
			// Stored unconditionally, so a group whose allocations match none
			// of the cases above still gets an (all-zero) entry.
			counts[alloc.TaskGroup] = group
		}

		// Persist the summary for this job.
		summary := structs.JobSummary{
			JobID:   job.ID,
			Summary: counts,
		}
		if err := r.txn.Insert("job_summary", summary); err != nil {
			return fmt.Errorf("error inserting job summary: %v", err)
		}
	}

	return nil
}
// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {

View File

@@ -1111,6 +1111,50 @@ func TestStateStore_RestoreJobSummary(t *testing.T) {
}
}
// TestStateStore_CreateJobSummaries restores one job with a single allocation,
// runs CreateJobSummaries on the restore handle, commits, and verifies that the
// summary read back from the state store matches the expected tally.
func TestStateStore_CreateJobSummaries(t *testing.T) {
	store := testStateStore(t)

	r, err := store.Restore()
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Seed the restore with a job.
	job := mock.Job()
	if err = r.JobRestore(job); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Seed the restore with an allocation tied to that job.
	alloc := mock.Alloc()
	alloc.JobID = job.ID
	alloc.Job = job
	if err = r.AllocRestore(alloc); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Derive the summaries from the restored data and commit them.
	if err = r.CreateJobSummaries(); err != nil {
		t.Fatalf("err: %v", err)
	}
	r.Commit()

	got, err := store.JobSummaryByID(job.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// The expected summary has a single "web" task group with one
	// allocation counted as Starting.
	want := &structs.JobSummary{
		JobID: job.ID,
		Summary: map[string]structs.TaskGroupSummary{
			"web": {
				Starting: 1,
			},
		},
	}
	if !reflect.DeepEqual(got, want) {
		t.Fatalf("Bad: %#v %#v", got, *want)
	}
}
func TestStateStore_Indexes(t *testing.T) {
state := testStateStore(t)
node := mock.Node()