diff --git a/nomad/state/schema.go b/nomad/state/schema.go index a3a8f2f38..f4f95263a 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -132,7 +132,7 @@ func jobSummarySchema() *memdb.TableSchema { AllowMissing: false, Unique: true, Indexer: &memdb.StringFieldIndex{ - Field: "ID", + Field: "JobID", Lowercase: true, }, }, diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 8f40e64a4..d0262d137 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -344,6 +344,11 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { return fmt.Errorf("index update failed: %v", err) } + // Update the job summary + if err := s.updateSummaryWithJob(job, txn); err != nil { + return fmt.Errorf("job summary update failed: %v", err) + } + txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() return nil @@ -455,6 +460,21 @@ func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) { return iter, nil } +// JobSummary returns the summary stored for the job with the given ID, or nil with a nil error if there is none. +func (s *StateStore) JobSummary(jobID string) (*structs.JobSummary, error) { + txn := s.db.Txn(false) + + existing, err := txn.First("jobsummary", "id", jobID) + if err != nil { + return nil, err + } + if existing != nil { + return existing.(*structs.JobSummary), nil + } + + return nil, nil +} + // UpsertPeriodicLaunch is used to register a launch or update it. func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error { txn := s.db.Txn(true) @@ -839,6 +859,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("alloc insert failed: %v", err) } + existingAlloc, _ := existing.(*structs.Allocation) + if err := s.updateSummaryWithAlloc(alloc, existingAlloc, txn); err != nil { + return fmt.Errorf("updating job summary failed: %v", err) + } + // If the allocation is running, force the job to running status. 
forceStatus := "" if !alloc.TerminalStatus() {
@@ -1155,6 +1180,61 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
 	return structs.JobStatusPending, nil
 }
 
+// updateSummaryWithJob creates or refreshes the summary stored for job as part
+// of the write transaction txn. A task group without an entry gets one with
+// Queued set to the group's Count; an existing entry has Queued capped at the
+// group's Count. Entries for task groups no longer in the job are kept as-is.
+// It returns an error if the lookup or the insert fails.
+func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) error {
+	// Look the summary up through the caller's transaction. s.JobSummary
+	// starts its own read transaction, so it would not observe writes made
+	// earlier in txn.
+	raw, err := txn.First("jobsummary", "id", job.ID)
+	if err != nil {
+		return fmt.Errorf("unable to retrieve summary for job: %v", err)
+	}
+
+	// Build a new summary instead of editing the stored one in place: memdb
+	// keeps the inserted pointer, which readers and snapshots may still hold.
+	summary := &structs.JobSummary{
+		JobID:   job.ID,
+		Summary: make(map[string]structs.TaskGroupSummary),
+	}
+	if raw != nil {
+		stored := raw.(*structs.JobSummary)
+		summary.JobID = stored.JobID
+		for name, tgSummary := range stored.Summary {
+			summary.Summary[name] = tgSummary
+		}
+	}
+
+	for _, tg := range job.TaskGroups {
+		tgSummary, ok := summary.Summary[tg.Name]
+		if !ok {
+			tgSummary = structs.TaskGroupSummary{
+				Name:   tg.Name,
+				Queued: tg.Count,
+			}
+		} else if tgSummary.Queued > tg.Count {
+			tgSummary.Queued = tg.Count
+		}
+		summary.Summary[tg.Name] = tgSummary
+	}
+
+	if err := txn.Insert("jobsummary", summary); err != nil {
+		return err
+	}
+	return nil
+}
+
+// updateSummaryWithAlloc is called from UpsertAllocs with the allocation being
+// written and the previously stored one (nil if there was none). It is
+// currently a stub that ignores its arguments and always returns nil.
+func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
+	existingAlloc *structs.Allocation, txn *memdb.Txn) error {
+	return nil
+}
+
 // StateSnapshot is used to provide a point-in-time snapshot
 type StateSnapshot struct {
 	StateStore
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 55d2af707..b502012cf 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -937,6 +937,24 @@ const (
 	CoreJobPriority = JobMaxPriority * 2
 )
 
+// JobSummary holds the per-task-group summaries of a single job. Summary is
+// keyed by task group name.
+type JobSummary struct {
+	JobID   string
+	Summary map[string]TaskGroupSummary
+}
+
+// TaskGroupSummary holds the counters tracked for one task group of a job.
+// Queued is seeded from the task group's Count when the job is upserted.
+type TaskGroupSummary struct {
+	Name     string
+	Queued   int
+	Complete int
+	Failed   int
+	Running  int
+	Starting int
+}
+
 // Job is the scope of a scheduling request to Nomad. It is the largest
 // scoped object, and is a named collection of task groups. Each task group
 // is further composed of tasks. A task group (TG) is the unit of scheduling