diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go new file mode 100644 index 000000000..1552561d9 --- /dev/null +++ b/nomad/job_endpoint.go @@ -0,0 +1,87 @@ +package nomad + +import ( + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Job endpoint is used for job interactions +type Job struct { + srv *Server +} + +// Register is used to upsert a job for scheduling +func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.GenericResponse) error { + if done, err := j.srv.forward("Job.Register", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "register"}, time.Now()) + + // Commit this update via Raft + _, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) + if err != nil { + j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err) + return err + } + + // Set the reply index + reply.Index = index + return nil +} + +// Deregister is used to remove a job the cluster. +func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.GenericResponse) error { + if done, err := j.srv.forward("Job.Deregister", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now()) + + // Commit this update via Raft + _, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args) + if err != nil { + j.srv.logger.Printf("[ERR] nomad.job: Deregister failed: %v", err) + return err + } + + // Set the reply index + reply.Index = index + return nil +} + +// GetJob is used to request information about a specific job +func (j *Job) GetJob(args *structs.JobSpecificRequest, + reply *structs.SingleJobResponse) error { + if done, err := j.srv.forward("Job.GetJob", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now()) + + // Look for the job + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + out, err := snap.GetJobByName(args.JobName) + if err != nil { + return err + } + + // Setup the output + if out != nil { + reply.Job = out + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.GetIndex("jobs") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil +} diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go new file mode 100644 index 000000000..58f1f081a --- /dev/null +++ b/nomad/job_endpoint_test.go @@ -0,0 +1,193 @@ +package nomad + +import ( + "reflect" + "testing" + + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func TestJobEndpoint_Register(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mockJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.GetJobByName(job.Name) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.CreateIndex != resp.Index { + t.Fatalf("index mis-match") + } +} + +func TestJobEndpoint_Register_Existing(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mockJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Update the job definition + job2 := mockJob() + job2.Priority = 100 + job2.Name = job.Name + req.Job = job2 + + // Attempt update + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.GetJobByName(job.Name) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("expected job") + } + if out.ModifyIndex != resp.Index { + t.Fatalf("index mis-match") + } + if out.Priority != 100 { + t.Fatalf("expected update") + } +} + +func TestJobEndpoint_Deregister(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mockJob() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Deregister + dereg := &structs.JobDeregisterRequest{ + JobName: job.Name, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + var resp2 structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index == 0 { + t.Fatalf("bad index: %d", resp2.Index) + } + + // Check for the node in the FSM + state := s1.fsm.State() + out, err := state.GetJobByName(job.Name) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("unexpected job") + } +} + +func TestJobEndpoint_GetJob(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mockJob() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + + // Fetch the response + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + job.CreateIndex = resp.Index + job.ModifyIndex = resp.Index + + // Lookup the job + get := &structs.JobSpecificRequest{ + JobName: job.Name, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + var resp2 structs.SingleJobResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != resp.Index { + t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) + } + + if !reflect.DeepEqual(job, resp2.Job) { + t.Fatalf("bad: %#v %#v", job, resp2.Job) + } + + // Lookup non-existing job + get.JobName = "foobarbaz" + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &resp2); err != nil { + t.Fatalf("err: %v", err) + } + if resp2.Index != resp.Index { + t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index) + } + if resp2.Job != nil { + t.Fatalf("unexpected job") + } +} diff --git a/nomad/server.go b/nomad/server.go index 561de7fef..2fe3392bf 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -101,6 +101,7 @@ type Server struct { type endpoints struct { Status *Status Client *Client + Job *Job } // NewServer is used to construct a new Nomad server from the @@ -269,10 +270,12 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { // Create endpoints s.endpoints.Status = &Status{s} s.endpoints.Client = &Client{s} + s.endpoints.Job = &Job{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Client) + s.rpcServer.Register(s.endpoints.Job) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1201c6839..202b35e9d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -152,6 +152,12 @@ type JobDeregisterRequest struct { WriteRequest } +// JobSpecificRequest is used when we just need to specify a target job +type JobSpecificRequest struct { + JobName string + WriteRequest +} + // GenericResponse is used to respond to a request where no // specific response information is needed. type GenericResponse struct { @@ -164,6 +170,12 @@ type SingleNodeResponse struct { QueryMeta } +// SingleJobResponse is used to return a single job +type SingleJobResponse struct { + Job *Job + QueryMeta +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready"