nomad: adding job query endpoints

This commit is contained in:
Armon Dadgar
2015-09-06 12:18:45 -07:00
parent 168e3d5336
commit b20afe9c60
3 changed files with 241 additions and 0 deletions

View File

@@ -220,3 +220,104 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}
// List is used to list the jobs registered in the system
func (j *Job) List(args *structs.JobListRequest,
reply *structs.JobListResponse) error {
if done, err := j.srv.forward("Job.List", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "list"}, time.Now())
// Capture all the jobs
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
iter, err := snap.Jobs()
if err != nil {
return err
}
for {
raw := iter.Next()
if raw == nil {
break
}
job := raw.(*structs.Job)
stub := &structs.JobListStub{
ID: job.ID,
Name: job.Name,
Type: job.Type,
Priority: job.Priority,
Status: job.Status,
StatusDescription: job.StatusDescription,
CreateIndex: job.CreateIndex,
ModifyIndex: job.ModifyIndex,
}
reply.Jobs = append(reply.Jobs, stub)
}
// Use the last index that affected the jobs table
index, err := snap.GetIndex("jobs")
if err != nil {
return err
}
reply.Index = index
return nil
}
// Allocations is used to list the allocations for a job
func (j *Job) Allocations(args *structs.JobSpecificRequest,
reply *structs.JobAllocationsResponse) error {
if done, err := j.srv.forward("Job.Allocations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "allocations"}, time.Now())
// Capture the allocations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
reply.Allocations, err = snap.AllocsByJob(args.JobID)
if err != nil {
return err
}
// Use the last index that affected the allocs table
index, err := snap.GetIndex("allocs")
if err != nil {
return err
}
reply.Index = index
return nil
}
// Evaluations is used to list the evaluations for a job
func (j *Job) Evaluations(args *structs.JobSpecificRequest,
reply *structs.JobEvaluationsResponse) error {
if done, err := j.srv.forward("Job.Evaluations", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "evaluations"}, time.Now())
// Capture the evaluations
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
reply.Evaluations, err = snap.EvalsByJob(args.JobID)
if err != nil {
return err
}
// Use the last index that affected the evals table
index, err := snap.GetIndex("evals")
if err != nil {
return err
}
reply.Index = index
return nil
}

View File

@@ -354,3 +354,107 @@ func TestJobEndpoint_GetJob(t *testing.T) {
t.Fatalf("unexpected job")
}
}
func TestJobEndpoint_ListJobs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
state := s1.fsm.State()
err := state.RegisterJob(1000, job)
if err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the jobs
get := &structs.JobListRequest{
QueryOptions: structs.QueryOptions{Region: "region1"},
}
var resp2 structs.JobListResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
}
if len(resp2.Jobs) != 1 {
t.Fatalf("bad: %#v", resp2.Jobs)
}
if resp2.Jobs[0].ID != job.ID {
t.Fatalf("bad: %#v", resp2.Jobs[0])
}
}
func TestJobEndpoint_Allocations(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
alloc1 := mock.Alloc()
alloc2 := mock.Alloc()
alloc2.JobID = alloc1.JobID
state := s1.fsm.State()
err := state.UpdateAllocations(1000,
[]*structs.Allocation{alloc1, alloc2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: alloc1.JobID,
QueryOptions: structs.QueryOptions{Region: "region1"},
}
var resp2 structs.JobAllocationsResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Allocations", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
}
if len(resp2.Allocations) != 2 {
t.Fatalf("bad: %#v", resp2.Allocations)
}
}
func TestJobEndpoint_Evaluations(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
eval1 := mock.Eval()
eval2 := mock.Eval()
eval2.JobID = eval1.JobID
state := s1.fsm.State()
err := state.UpsertEvals(1000,
[]*structs.Evaluation{eval1, eval2})
if err != nil {
t.Fatalf("err: %v", err)
}
// Lookup the jobs
get := &structs.JobSpecificRequest{
JobID: eval1.JobID,
QueryOptions: structs.QueryOptions{Region: "region1"},
}
var resp2 structs.JobEvaluationsResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &resp2); err != nil {
t.Fatalf("err: %v", err)
}
if resp2.Index != 1000 {
t.Fatalf("Bad index: %d %d", resp2.Index, 1000)
}
if len(resp2.Evaluations) != 2 {
t.Fatalf("bad: %#v", resp2.Evaluations)
}
}

View File

@@ -174,6 +174,11 @@ type JobSpecificRequest struct {
QueryOptions
}
// JobListRequest is used to parameterize a list request
type JobListRequest struct {
QueryOptions
}
// EvalUpdateRequest is used for upserting evaluations.
type EvalUpdateRequest struct {
Evals []*Evaluation
@@ -291,6 +296,37 @@ type SingleJobResponse struct {
QueryMeta
}
// JobListStub is used to return a subset of job information
// for the job list
type JobListStub struct {
ID string
Name string
Type string
Priority int
Status string
StatusDescription string
CreateIndex uint64
ModifyIndex uint64
}
// JobListResponse is used for a list request
type JobListResponse struct {
Jobs []*JobListStub
QueryMeta
}
// JobAllocationsResponse is used to return the allocations for a job
type JobAllocationsResponse struct {
Allocations []*Allocation
QueryMeta
}
// JobEvaluationsResponse is used to return the evaluations for a job
type JobEvaluationsResponse struct {
Evaluations []*Evaluation
QueryMeta
}
// SingleEvalResponse is used to return a single evaluation
type SingleEvalResponse struct {
Eval *Evaluation