nomad: Adding the CRUD endpoints for jobs

This commit is contained in:
Armon Dadgar
2015-07-23 14:41:18 -07:00
parent dd85f12959
commit 138a014332
4 changed files with 295 additions and 0 deletions

87
nomad/job_endpoint.go Normal file
View File

@@ -0,0 +1,87 @@
package nomad
import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/nomad/nomad/structs"
)
// Job endpoint is used for job interactions
type Job struct {
srv *Server
}
// Register is used to upsert a job for scheduling
func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.GenericResponse) error {
if done, err := j.srv.forward("Job.Register", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "register"}, time.Now())
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Register failed: %v", err)
return err
}
// Set the reply index
reply.Index = index
return nil
}
// Deregister is used to remove a job the cluster.
func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.GenericResponse) error {
if done, err := j.srv.forward("Job.Deregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "deregister"}, time.Now())
// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: Deregister failed: %v", err)
return err
}
// Set the reply index
reply.Index = index
return nil
}
// GetJob is used to request information about a specific job
func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply *structs.SingleJobResponse) error {
if done, err := j.srv.forward("Job.GetJob", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now())
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.GetJobByName(args.JobName)
if err != nil {
return err
}
// Setup the output
if out != nil {
reply.Job = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.GetIndex("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}

193
nomad/job_endpoint_test.go Normal file
View File

@@ -0,0 +1,193 @@
package nomad
import (
"reflect"
"testing"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
func TestJobEndpoint_Register(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mockJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.GetJobByName(job.Name)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.CreateIndex != resp.Index {
t.Fatalf("index mis-match")
}
}
func TestJobEndpoint_Register_Existing(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mockJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Update the job definition
job2 := mockJob()
job2.Priority = 100
job2.Name = job.Name
req.Job = job2
// Attempt update
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Index == 0 {
t.Fatalf("bad index: %d", resp.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.GetJobByName(job.Name)
if err != nil {
t.Fatalf("err: %v", err)
}
if out == nil {
t.Fatalf("expected job")
}
if out.ModifyIndex != resp.Index {
t.Fatalf("index mis-match")
}
if out.Priority != 100 {
t.Fatalf("expected update")
}
}
func TestJobEndpoint_Deregister(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mockJob()
reg := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
// Deregister
dereg := &structs.JobDeregisterRequest{
JobName: job.Name,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
var resp2 structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index == 0 {
t.Fatalf("bad index: %d", resp2.Index)
}
// Check for the node in the FSM
state := s1.fsm.State()
out, err := state.GetJobByName(job.Name)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != nil {
t.Fatalf("unexpected job")
}
}
func TestJobEndpoint_GetJob(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mockJob()
reg := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
// Fetch the response
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil {
t.Fatalf("err: %v", err)
}
job.CreateIndex = resp.Index
job.ModifyIndex = resp.Index
// Lookup the job
get := &structs.JobSpecificRequest{
JobName: job.Name,
WriteRequest: structs.WriteRequest{Region: "region1"},
}
var resp2 structs.SingleJobResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != resp.Index {
t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index)
}
if !reflect.DeepEqual(job, resp2.Job) {
t.Fatalf("bad: %#v %#v", job, resp2.Job)
}
// Lookup non-existing job
get.JobName = "foobarbaz"
if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != resp.Index {
t.Fatalf("Bad index: %d %d", resp2.Index, resp.Index)
}
if resp2.Job != nil {
t.Fatalf("unexpected job")
}
}

View File

@@ -101,6 +101,7 @@ type Server struct {
type endpoints struct {
Status *Status
Client *Client
Job *Job
}
// NewServer is used to construct a new Nomad server from the
@@ -269,10 +270,12 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
// Create endpoints
s.endpoints.Status = &Status{s}
s.endpoints.Client = &Client{s}
s.endpoints.Job = &Job{s}
// Register the handlers
s.rpcServer.Register(s.endpoints.Status)
s.rpcServer.Register(s.endpoints.Client)
s.rpcServer.Register(s.endpoints.Job)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

View File

@@ -152,6 +152,12 @@ type JobDeregisterRequest struct {
WriteRequest
}
// JobSpecificRequest is used when we just need to specify a target job
type JobSpecificRequest struct {
JobName string
WriteRequest
}
// GenericResponse is used to respond to a request where no
// specific response information is needed.
type GenericResponse struct {
@@ -164,6 +170,12 @@ type SingleNodeResponse struct {
QueryMeta
}
// SingleJobResponse is used to return a single job
type SingleJobResponse struct {
Job *Job
QueryMeta
}
const (
NodeStatusInit = "initializing"
NodeStatusReady = "ready"