diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index e8f2c3f0e..2d366d592 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -254,7 +254,7 @@ func (s *HTTPServer) jobSummaryRequest(resp http.ResponseWriter, req *http.Reque } var out structs.SingleJobSummaryResponse - if err := s.agent.RPC("JobSummary.GetJobSummary", &args, &out); err != nil { + if err := s.agent.RPC("Job.GetSummary", &args, &out); err != nil { return nil, err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 455d14487..a2944fcae 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -113,6 +113,52 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return nil } +// GetSummary retreives the summary of a job +func (j *Job) GetSummary(args *structs.JobSummaryRequest, + reply *structs.SingleJobSummaryResponse) error { + if done, err := j.srv.forward("Job.GetSummary", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now()) + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(watch.Item{JobSummary: args.JobID}), + run: func() error { + + // Look for the job + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + out, err := snap.JobSummaryByID(args.JobID) + if err != nil { + return err + } + + // Setup the output + reply.JobSummary = out + if out != nil { + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.Index("job_summary") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) + + return nil +} + // Evaluate is used to force a job for re-evaluation func (j *Job) Evaluate(args *structs.JobEvaluateRequest, reply *structs.JobRegisterResponse) error { if done, err := j.srv.forward("Job.Evaluate", args, args, reply); done { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index da69a684e..b3dd5a0b0 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -713,6 +713,56 @@ func TestJobEndpoint_GetJob(t *testing.T) { } } +func TestJobEndpoint_GetJobSummary(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.JobRegisterResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + job.CreateIndex = resp.JobModifyIndex + job.ModifyIndex = resp.JobModifyIndex + job.JobModifyIndex = resp.JobModifyIndex + + // Lookup the job + get := &structs.JobSummaryRequest{ + JobID: job.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + var resp2 structs.SingleJobSummaryResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetSummary", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != resp.JobModifyIndex { + t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) + } + + expectedJobSummary := structs.JobSummary{ + JobID: job.ID, + Summary: map[string]structs.TaskGroupSummary{ + "web": structs.TaskGroupSummary{}, + }, + CreateIndex: job.CreateIndex, + ModifyIndex: job.CreateIndex, + } + + if !reflect.DeepEqual(resp2.JobSummary, &expectedJobSummary) { + t.Fatalf("exptected: %v, actual: %v", expectedJobSummary, resp2.JobSummary) + } + +} + func TestJobEndpoint_GetJob_Blocking(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown() diff --git a/nomad/job_summary_endpoint.go b/nomad/job_summary_endpoint.go deleted file mode 100644 index 562c709a7..000000000 --- a/nomad/job_summary_endpoint.go +++ /dev/null @@ -1,62 +0,0 @@ -package nomad - -import ( - "time" - - "github.com/armon/go-metrics" - "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/nomad/watch" -) - -type JobSummary struct { - srv *Server -} - -func (j *JobSummary) GetJobSummary(args *structs.JobSummaryRequest, - reply *structs.SingleJobSummaryResponse) error { - if done, err := j.srv.forward("JobSummary.GetJobSummary", args, args, reply); done { - return err - } - defer metrics.MeasureSince([]string{"nomad", "job_summary", "get_job_summary"}, time.Now()) - // Setup the blocking query - opts := blockingOptions{ - queryOpts: &args.QueryOptions, - queryMeta: &reply.QueryMeta, - watch: watch.NewItems(watch.Item{JobSummary: args.JobID}), - run: func() error { - - // Look for the job - snap, err := j.srv.fsm.State().Snapshot() - if err != nil { - return err - } - out, err := snap.JobSummaryByID(args.JobID) - if err != nil { - return err - } - - // Setup the output - reply.JobSummary = out - if out != nil { - reply.Index = out.ModifyIndex - } else { - // Use the last index that affected the nodes table - index, err := snap.Index("job_summary") - if err != nil { - return err - } - reply.Index = index - } - - // Set the query response - j.srv.setQueryMeta(&reply.QueryMeta) - return nil - }} - return j.srv.blockingRPC(&opts) - - return nil -} - -func (j *JobSummary) List() error { - return nil -} diff --git a/nomad/server.go b/nomad/server.go index f8e350eb9..9ba4045bd 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -150,16 +150,15 @@ type Server struct { // Holds the RPC endpoints type endpoints struct { - Status *Status - Node *Node - Job *Job - Eval *Eval - Plan *Plan - Alloc *Alloc - Region *Region - Periodic *Periodic - System *System - JobSummary *JobSummary + Status *Status + Node *Node + Job *Job + Eval *Eval + Plan *Plan + Alloc *Alloc + Region *Region + Periodic *Periodic + System *System } // NewServer is used to construct a new Nomad server from the @@ -562,7 +561,6 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Region = &Region{s} s.endpoints.Periodic = &Periodic{s} s.endpoints.System = &System{s} - s.endpoints.JobSummary = &JobSummary{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) @@ -574,7 +572,6 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.rpcServer.Register(s.endpoints.Region) s.rpcServer.Register(s.endpoints.Periodic) s.rpcServer.Register(s.endpoints.System) - s.rpcServer.Register(s.endpoints.JobSummary) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil {