Implemented job state accounting logic for upsert job

This commit is contained in:
Diptanu Choudhury
2016-06-30 12:04:22 -07:00
parent fe40544455
commit f98be5daf3
3 changed files with 81 additions and 1 deletions

View File

@@ -132,7 +132,7 @@ func jobSummarySchema() *memdb.TableSchema {
AllowMissing: false,
Unique: true,
Indexer: &memdb.StringFieldIndex{
Field: "ID",
Field: "JobID",
Lowercase: true,
},
},

View File

@@ -344,6 +344,11 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
return fmt.Errorf("index update failed: %v", err)
}
// Update the job summary
if err := s.updateSummaryWithJob(job, txn); err != nil {
return fmt.Errorf("job summary update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
return nil
@@ -455,6 +460,21 @@ func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
return iter, nil
}
// JobSummary looks up the summary stored for the job with the given ID.
// It returns a nil summary and a nil error when the "jobsummary" table holds
// no entry for that ID, and a nil summary plus the lookup error when the
// query itself fails.
func (s *StateStore) JobSummary(jobID string) (*structs.JobSummary, error) {
	// A read-only transaction is sufficient for a single lookup.
	raw, err := s.db.Txn(false).First("jobsummary", "id", jobID)
	switch {
	case err != nil:
		return nil, err
	case raw == nil:
		return nil, nil
	}
	return raw.(*structs.JobSummary), nil
}
// UpsertPeriodicLaunch is used to register a launch or update it.
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
txn := s.db.Txn(true)
@@ -839,6 +859,11 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
return fmt.Errorf("alloc insert failed: %v", err)
}
existingAlloc, _ := existing.(*structs.Allocation)
if err := s.updateSummaryWithAlloc(alloc, existingAlloc, txn); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
// If the allocation is running, force the job to running status.
forceStatus := ""
if !alloc.TerminalStatus() {
@@ -1155,6 +1180,47 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
return structs.JobStatusPending, nil
}
// updateSummaryWithJob creates or refreshes the job summary entry for job
// inside the supplied write transaction.
//
// A task group without an entry gets one with Queued set to the group's Count
// and every other counter at zero. For a task group that is already tracked,
// Queued is clamped down to the group's Count and the remaining counters are
// left untouched. Entries for task groups that are not part of job are kept
// unchanged.
//
// The lookup goes through txn, not a separate read transaction, so it sees
// writes already made in this transaction. The stored summary and its map are
// copied before being changed: go-memdb keeps objects by reference and
// requires that they are never modified in place after insertion.
//
// It returns an error when the existing summary cannot be read or the updated
// summary cannot be inserted.
func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) error {
	raw, err := txn.First("jobsummary", "id", job.ID)
	if err != nil {
		return fmt.Errorf("unable to retrieve summary for job: %v", err)
	}

	// Start from a shallow copy of the stored summary (keeping any fields it
	// carries), or from a fresh summary when the job has none yet.
	var updated structs.JobSummary
	if raw != nil {
		updated = *raw.(*structs.JobSummary)
	} else {
		updated.JobID = job.ID
	}

	// Give the copy its own map so the stored object is never written to.
	groups := make(map[string]structs.TaskGroupSummary, len(updated.Summary)+len(job.TaskGroups))
	for name, tgSummary := range updated.Summary {
		groups[name] = tgSummary
	}
	updated.Summary = groups

	for _, tg := range job.TaskGroups {
		tgSummary, tracked := groups[tg.Name]
		switch {
		case !tracked:
			// New task group: everything is queued until allocations exist.
			tgSummary = structs.TaskGroupSummary{Name: tg.Name, Queued: tg.Count}
		case tgSummary.Queued > tg.Count:
			// The group was scaled down; no more than Count can be queued.
			tgSummary.Queued = tg.Count
		}
		groups[tg.Name] = tgSummary
	}

	return txn.Insert("jobsummary", &updated)
}
// updateSummaryWithAlloc is the hook UpsertAllocs calls for every allocation
// it writes, passing the new allocation and the previously stored one (nil
// when there was none).
//
// It is currently a placeholder: none of the arguments are used and it always
// returns nil.
// NOTE(review): presumably this should move the job summary counters
// according to the allocation's state transition — confirm intended behavior.
func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
existingAlloc *structs.Allocation, txn *memdb.Txn) error {
return nil
}
// StateSnapshot is used to provide a point-in-time snapshot
type StateSnapshot struct {
StateStore

View File

@@ -937,6 +937,20 @@ const (
CoreJobPriority = JobMaxPriority * 2
)
// JobSummary holds the per-task-group summaries of a single job.
type JobSummary struct {
// JobID is the ID of the job this summary belongs to.
JobID string
// Summary maps a task group name to that group's summary.
Summary map[string]TaskGroupSummary
}
// TaskGroupSummary holds the counters tracked for one task group of a job.
// NOTE(review): Complete, Failed, Running and Starting presumably count the
// group's allocations in the corresponding state — confirm once the
// allocation accounting is implemented.
type TaskGroupSummary struct {
// Name is the name of the task group.
Name string
// Queued starts at the task group's Count when the entry is first created.
Queued int
Complete int
Failed int
Running int
Starting int
}
// Job is the scope of a scheduling request to Nomad. It is the largest
// scoped object, and is a named collection of task groups. Each task group
// is further composed of tasks. A task group (TG) is the unit of scheduling