diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 12e40e542..e8f2c3f0e 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -54,6 +54,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ case strings.HasSuffix(path, "/plan"): jobName := strings.TrimSuffix(path, "/plan") return s.jobPlan(resp, req, jobName) + case strings.HasSuffix(path, "/summary"): + jobName := strings.TrimSuffix(path, "/summary") + return s.jobSummaryRequest(resp, req, jobName) default: return s.jobCRUD(resp, req, path) } @@ -241,3 +244,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request, setIndex(resp, out.Index) return out, nil } + +func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) { + args := structs.JobSummaryRequest{ + JobID: name, + } + if s.parse(resp, req, &args.Region, &args.QueryOptions) { + return nil, nil + } + + var out structs.SingleJobSummaryResponse + if err := s.agent.RPC("JobSummary.GetJobSummary", &args, &out); err != nil { + return nil, err + } + + setMeta(resp, &out.QueryMeta) + if out.JobSummary == nil { + return nil, CodedError(404, "job not found") + } + setIndex(resp, out.Index) + return out.JobSummary, nil +} diff --git a/nomad/job_summary_endpoint.go b/nomad/job_summary_endpoint.go new file mode 100644 index 000000000..562c709a7 --- /dev/null +++ b/nomad/job_summary_endpoint.go @@ -0,0 +1,62 @@ +package nomad + +import ( + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/nomad/watch" +) + +type JobSummary struct { + srv *Server +} + +func (j *JobSummary) GetJobSummary(args *structs.JobSummaryRequest, + reply *structs.SingleJobSummaryResponse) error { + if done, err := j.srv.forward("JobSummary.GetJobSummary", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now()) + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(watch.Item{JobSummary: args.JobID}), + run: func() error { + + // Look for the job + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + out, err := snap.JobSummaryByID(args.JobID) + if err != nil { + return err + } + + // Setup the output + reply.JobSummary = out + if out != nil { + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.Index("job_summary") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) + + return nil +} + +func (j *JobSummary) List() error { + return nil +} diff --git a/nomad/server.go b/nomad/server.go index 9ba4045bd..f8e350eb9 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -150,15 +150,16 @@ type Server struct { // Holds the RPC endpoints type endpoints struct { - Status *Status - Node *Node - Job *Job - Eval *Eval - Plan *Plan - Alloc *Alloc - Region *Region - Periodic *Periodic - System *System + Status *Status + Node *Node + Job *Job + Eval *Eval + Plan *Plan + Alloc *Alloc + Region *Region + Periodic *Periodic + System *System + JobSummary *JobSummary } // NewServer is used to construct a new Nomad server from the @@ -561,6 +562,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Region = &Region{s} s.endpoints.Periodic = &Periodic{s} s.endpoints.System = &System{s} + s.endpoints.JobSummary = &JobSummary{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) @@ -572,6 +574,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.rpcServer.Register(s.endpoints.Region) s.rpcServer.Register(s.endpoints.Periodic) s.rpcServer.Register(s.endpoints.System) + s.rpcServer.Register(s.endpoints.JobSummary) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 2b640cca1..b2d707426 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -345,7 +345,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error { } // Update the job summary - if err := s.updateSummaryWithJob(job, txn); err != nil { + if err := s.updateSummaryWithJob(job, index, watcher, txn); err != nil { return fmt.Errorf("job summary update failed: %v", err) } @@ -371,6 +371,8 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { watcher := watch.NewItems() watcher.Add(watch.Item{Table: "jobs"}) watcher.Add(watch.Item{Job: jobID}) + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: jobID}) // Delete the node if err := txn.Delete("jobs", existing); err != nil { @@ -384,6 +386,9 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { if _, err = txn.DeleteAll("job_summary", "id", jobID); err != nil { return fmt.Errorf("deleing job summary failed: %v", err) } + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } txn.Defer(func() { s.watch.notify(watcher) }) txn.Commit() @@ -838,7 +843,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I exist := existing.(*structs.Allocation) // Update the job summary - if err := s.updateSummaryWithAlloc(alloc, exist, txn); err != nil { + if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil { return fmt.Errorf("unable to update job summary: %v", err) } @@ -895,7 +900,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er } exist, _ := existing.(*structs.Allocation) - if err := s.updateSummaryWithAlloc(alloc, exist, txn); err != nil { + if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil { return fmt.Errorf("updating job summary failed: %v", err) } if exist == nil { @@ -1231,16 +1236,19 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b // updateSummaryWithJob creates or updates job summaries when new jobs are // upserted or existing ones are updated -func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) error { +func (s *StateStore) updateSummaryWithJob(job *structs.Job, index uint64, watcher watch.Items, txn *memdb.Txn) error { existing, err := s.JobSummaryByID(job.ID) if err != nil { return fmt.Errorf("unable to retrieve summary for job: %v", err) } + var hasSummaryChanged bool if existing == nil { existing = &structs.JobSummary{ - JobID: job.ID, - Summary: make(map[string]structs.TaskGroupSummary), + JobID: job.ID, + Summary: make(map[string]structs.TaskGroupSummary), + CreateIndex: index, } + hasSummaryChanged = true } for _, tg := range job.TaskGroups { if _, ok := existing.Summary[tg.Name]; !ok { @@ -1251,6 +1259,20 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) erro Starting: 0, } existing.Summary[tg.Name] = newSummary + hasSummaryChanged = true + } + } + + // The job summary has changed, so add to watcher and update the modify + // index. + if hasSummaryChanged { + existing.ModifyIndex = index + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: job.ID}) + + // Update the indexes table for job summary + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) } } @@ -1263,7 +1285,7 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) erro // updateSummaryWithAlloc updates the job summary when allocations are updated // or inserted func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, - existingAlloc *structs.Allocation, txn *memdb.Txn) error { + existingAlloc *structs.Allocation, index uint64, watcher watch.Items, txn *memdb.Txn) error { existing, err := s.JobSummaryByID(newAlloc.JobID) if err != nil { @@ -1284,6 +1306,7 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, if !ok { return nil } + var hasSummaryChanged bool if existingAlloc == nil { switch newAlloc.DesiredStatus { case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: @@ -1292,6 +1315,7 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, switch newAlloc.ClientStatus { case structs.AllocClientStatusPending: tgSummary.Starting += 1 + hasSummaryChanged = true case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete: s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ClientStatus) } @@ -1323,6 +1347,21 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation, case structs.AllocClientStatusLost: tgSummary.Lost -= 1 } + + hasSummaryChanged = true + } + + // The job summary has changed, so add to watcher and update the modify + // index. + if hasSummaryChanged { + existing.ModifyIndex = index + watcher.Add(watch.Item{Table: "job_summary"}) + watcher.Add(watch.Item{JobSummary: existing.JobID}) + + // Update the indexes table for job summary + if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } } existing.Summary[newAlloc.TaskGroup] = tgSummary diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 4f5d4201a..90bbe7956 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -258,6 +258,12 @@ type JobPlanRequest struct { WriteRequest } +// JobSummaryRequest is used when we just need to get a specific job summary +type JobSummaryRequest struct { + JobID string + QueryOptions +} + // NodeListRequest is used to parameterize a list request type NodeListRequest struct { QueryOptions @@ -441,6 +447,12 @@ type SingleJobResponse struct { QueryMeta } +// SingleJobSummary is used to return a single job summary +type SingleJobSummaryResponse struct { + JobSummary *JobSummary + QueryMeta +} + // JobListResponse is used for a list request type JobListResponse struct { Jobs []*JobListStub @@ -964,6 +976,10 @@ const ( type JobSummary struct { JobID string Summary map[string]TaskGroupSummary + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 } // TaskGroup summarizes the state of all the allocations of a particular diff --git a/nomad/watch/watch.go b/nomad/watch/watch.go index 4e9bafbc9..b6a71749c 100644 --- a/nomad/watch/watch.go +++ b/nomad/watch/watch.go @@ -9,14 +9,15 @@ package watch // multiple fields does not place a watch on multiple items. Each Item // describes exactly one scoped watch. type Item struct { - Alloc string - AllocEval string - AllocJob string - AllocNode string - Eval string - Job string - Node string - Table string + Alloc string + AllocEval string + AllocJob string + AllocNode string + Eval string + Job string + JobSummary string + Node string + Table string } // Items is a helper used to construct a set of watchItems. It deduplicates