Added the job summary related endpoints

This commit is contained in:
Diptanu Choudhury
2016-07-18 16:51:47 -07:00
parent bac783a81e
commit 38b1ba7e4e
6 changed files with 169 additions and 24 deletions

View File

@@ -54,6 +54,9 @@ func (s *HTTPServer) JobSpecificRequest(resp http.ResponseWriter, req *http.Requ
case strings.HasSuffix(path, "/plan"):
jobName := strings.TrimSuffix(path, "/plan")
return s.jobPlan(resp, req, jobName)
case strings.HasSuffix(path, "/summary"):
jobName := strings.TrimSuffix(path, "/summary")
return s.jobSummaryRequest(resp, req, jobName)
default:
return s.jobCRUD(resp, req, path)
}
@@ -241,3 +244,24 @@ func (s *HTTPServer) jobDelete(resp http.ResponseWriter, req *http.Request,
setIndex(resp, out.Index)
return out, nil
}
// jobSummaryRequest answers a request for the summary of the job called name.
// It issues the "JobSummary.GetJobSummary" RPC through the agent and returns
// the resulting summary.
//
// Returns (nil, nil) when s.parse reports that request handling should stop
// (presumably because parse already wrote a response; confirm against parse),
// the RPC error when the call fails, and a 404 coded error when the RPC
// succeeds but carries no summary.
func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Request, name string) (interface{}, error) {
	var summaryReq structs.JobSummaryRequest
	summaryReq.JobID = name
	if stop := s.parse(resp, req, &summaryReq.Region, &summaryReq.QueryOptions); stop {
		return nil, nil
	}

	summaryResp := structs.SingleJobSummaryResponse{}
	err := s.agent.RPC("JobSummary.GetJobSummary", &summaryReq, &summaryResp)
	if err != nil {
		return nil, err
	}

	// Query metadata is written to the response even when the job is missing.
	setMeta(resp, &summaryResp.QueryMeta)
	summary := summaryResp.JobSummary
	if summary == nil {
		return nil, CodedError(404, "job not found")
	}

	setIndex(resp, summaryResp.Index)
	return summary, nil
}

View File

@@ -0,0 +1,62 @@
package nomad
import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/watch"
)
// JobSummary is the RPC endpoint type for job summary queries; its methods
// are addressed by name, e.g. "JobSummary.GetJobSummary".
type JobSummary struct {
// srv is the server whose request forwarding, state store and
// blocking-query helpers the endpoint methods rely on.
srv *Server
}
// GetJobSummary is the RPC handler that looks up the summary of the job
// identified by args.JobID and stores it in reply.JobSummary.
//
// The lookup runs as a blocking query watching the job's summary item.
// reply.JobSummary is nil when no summary exists for the job; in that case
// reply.Index is the last index recorded for the "job_summary" table,
// otherwise it is the summary's ModifyIndex.
//
// If j.srv.forward reports the request as done (presumably because it was
// forwarded to another server or region; confirm against forward), its error
// is returned and no local lookup happens.
func (j *JobSummary) GetJobSummary(args *structs.JobSummaryRequest,
	reply *structs.SingleJobSummaryResponse) error {
	if done, err := j.srv.forward("JobSummary.GetJobSummary", args, args, reply); done {
		return err
	}
	defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now())

	// Setup the blocking query
	opts := blockingOptions{
		queryOpts: &args.QueryOptions,
		queryMeta: &reply.QueryMeta,
		watch:     watch.NewItems(watch.Item{JobSummary: args.JobID}),
		run: func() error {
			// Look for the job summary in a consistent snapshot of the state
			snap, err := j.srv.fsm.State().Snapshot()
			if err != nil {
				return err
			}
			out, err := snap.JobSummaryByID(args.JobID)
			if err != nil {
				return err
			}

			// Setup the output
			reply.JobSummary = out
			if out != nil {
				reply.Index = out.ModifyIndex
			} else {
				// Use the last index that affected the job_summary table
				index, err := snap.Index("job_summary")
				if err != nil {
					return err
				}
				reply.Index = index
			}

			// Set the query response
			j.srv.setQueryMeta(&reply.QueryMeta)
			return nil
		}}
	return j.srv.blockingRPC(&opts)
}
// List is currently a placeholder: it takes no arguments, performs no work
// and always returns nil.
// NOTE(review): presumably meant to become a list-all-summaries RPC — confirm.
func (j *JobSummary) List() error {
return nil
}

View File

@@ -150,15 +150,16 @@ type Server struct {
// Holds the RPC endpoints
type endpoints struct {
Status *Status
Node *Node
Job *Job
Eval *Eval
Plan *Plan
Alloc *Alloc
Region *Region
Periodic *Periodic
System *System
Status *Status
Node *Node
Job *Job
Eval *Eval
Plan *Plan
Alloc *Alloc
Region *Region
Periodic *Periodic
System *System
JobSummary *JobSummary
}
// NewServer is used to construct a new Nomad server from the
@@ -561,6 +562,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.endpoints.Region = &Region{s}
s.endpoints.Periodic = &Periodic{s}
s.endpoints.System = &System{s}
s.endpoints.JobSummary = &JobSummary{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
@@ -572,6 +574,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Region)
s.rpcServer.Register(s.endpoints.Periodic)
s.rpcServer.Register(s.endpoints.System)
s.rpcServer.Register(s.endpoints.JobSummary)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

View File

@@ -345,7 +345,7 @@ func (s *StateStore) UpsertJob(index uint64, job *structs.Job) error {
}
// Update the job summary
if err := s.updateSummaryWithJob(job, txn); err != nil {
if err := s.updateSummaryWithJob(job, index, watcher, txn); err != nil {
return fmt.Errorf("job summary update failed: %v", err)
}
@@ -371,6 +371,8 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: jobID})
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: jobID})
// Delete the job
if err := txn.Delete("jobs", existing); err != nil {
@@ -384,6 +386,9 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
if _, err = txn.DeleteAll("job_summary", "id", jobID); err != nil {
return fmt.Errorf("deleing job summary failed: %v", err)
}
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notify(watcher) })
txn.Commit()
@@ -838,7 +843,7 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, watcher watch.I
exist := existing.(*structs.Allocation)
// Update the job summary
if err := s.updateSummaryWithAlloc(alloc, exist, txn); err != nil {
if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil {
return fmt.Errorf("unable to update job summary: %v", err)
}
@@ -895,7 +900,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
}
exist, _ := existing.(*structs.Allocation)
if err := s.updateSummaryWithAlloc(alloc, exist, txn); err != nil {
if err := s.updateSummaryWithAlloc(alloc, exist, index, watcher, txn); err != nil {
return fmt.Errorf("updating job summary failed: %v", err)
}
if exist == nil {
@@ -1231,16 +1236,19 @@ func (s *StateStore) getJobStatus(txn *memdb.Txn, job *structs.Job, evalDelete b
// updateSummaryWithJob creates or updates job summaries when new jobs are
// upserted or existing ones are updated
func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) error {
func (s *StateStore) updateSummaryWithJob(job *structs.Job, index uint64, watcher watch.Items, txn *memdb.Txn) error {
existing, err := s.JobSummaryByID(job.ID)
if err != nil {
return fmt.Errorf("unable to retrieve summary for job: %v", err)
}
var hasSummaryChanged bool
if existing == nil {
existing = &structs.JobSummary{
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
JobID: job.ID,
Summary: make(map[string]structs.TaskGroupSummary),
CreateIndex: index,
}
hasSummaryChanged = true
}
for _, tg := range job.TaskGroups {
if _, ok := existing.Summary[tg.Name]; !ok {
@@ -1251,6 +1259,20 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) erro
Starting: 0,
}
existing.Summary[tg.Name] = newSummary
hasSummaryChanged = true
}
}
// The job summary has changed, so add to watcher and update the modify
// index.
if hasSummaryChanged {
existing.ModifyIndex = index
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: job.ID})
// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
@@ -1263,7 +1285,7 @@ func (s *StateStore) updateSummaryWithJob(job *structs.Job, txn *memdb.Txn) erro
// updateSummaryWithAlloc updates the job summary when allocations are updated
// or inserted
func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
existingAlloc *structs.Allocation, txn *memdb.Txn) error {
existingAlloc *structs.Allocation, index uint64, watcher watch.Items, txn *memdb.Txn) error {
existing, err := s.JobSummaryByID(newAlloc.JobID)
if err != nil {
@@ -1284,6 +1306,7 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
if !ok {
return nil
}
var hasSummaryChanged bool
if existingAlloc == nil {
switch newAlloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
@@ -1292,6 +1315,7 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
switch newAlloc.ClientStatus {
case structs.AllocClientStatusPending:
tgSummary.Starting += 1
hasSummaryChanged = true
case structs.AllocClientStatusRunning, structs.AllocClientStatusFailed, structs.AllocClientStatusComplete:
s.logger.Printf("[WARN]: new allocation inserted into state store with id: %v and state: %v", newAlloc.ClientStatus)
}
@@ -1323,6 +1347,21 @@ func (s *StateStore) updateSummaryWithAlloc(newAlloc *structs.Allocation,
case structs.AllocClientStatusLost:
tgSummary.Lost -= 1
}
hasSummaryChanged = true
}
// The job summary has changed, so add to watcher and update the modify
// index.
if hasSummaryChanged {
existing.ModifyIndex = index
watcher.Add(watch.Item{Table: "job_summary"})
watcher.Add(watch.Item{JobSummary: existing.JobID})
// Update the indexes table for job summary
if err := txn.Insert("index", &IndexEntry{"job_summary", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
}
existing.Summary[newAlloc.TaskGroup] = tgSummary

View File

@@ -258,6 +258,12 @@ type JobPlanRequest struct {
WriteRequest
}
// JobSummaryRequest is used when we just need to get a specific job summary.
type JobSummaryRequest struct {
// JobID identifies the job whose summary is requested.
JobID string
// QueryOptions supplies the options for the (blocking) query, including
// the Region field.
QueryOptions
}
// NodeListRequest is used to parameterize a list request
type NodeListRequest struct {
QueryOptions
@@ -441,6 +447,12 @@ type SingleJobResponse struct {
QueryMeta
}
// SingleJobSummaryResponse is used to return a single job summary.
type SingleJobSummaryResponse struct {
// JobSummary is the requested summary; it is nil when no summary exists
// for the job.
JobSummary *JobSummary
QueryMeta
}
// JobListResponse is used for a list request
type JobListResponse struct {
Jobs []*JobListStub
@@ -964,6 +976,10 @@ const (
type JobSummary struct {
// JobID is the ID of the job this summary describes.
JobID string
// Summary holds one TaskGroupSummary per task group, keyed by the task
// group's name.
Summary map[string]TaskGroupSummary
// Raft Indexes
// CreateIndex is the index at which the summary was first created;
// ModifyIndex is the index of the most recent change to the summary.
CreateIndex uint64
ModifyIndex uint64
}
// TaskGroup summarizes the state of all the allocations of a particular

View File

@@ -9,14 +9,15 @@ package watch
// multiple fields does not place a watch on multiple items. Each Item
// describes exactly one scoped watch.
type Item struct {
Alloc string
AllocEval string
AllocJob string
AllocNode string
Eval string
Job string
Node string
Table string
Alloc string
AllocEval string
AllocJob string
AllocNode string
Eval string
Job string
JobSummary string
Node string
Table string
}
// Items is a helper used to construct a set of watchItems. It deduplicates