nomad: support blocking queries on single jobs

This commit is contained in:
Ryan Uber
2015-10-29 15:01:29 -07:00
parent 573e9dfb9a
commit cd5bdd7c08
2 changed files with 82 additions and 24 deletions

View File

@@ -181,32 +181,41 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest,
}
defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now())
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
// Setup the blocking query
opts := blockingOptions{
queryOpts: &args.QueryOptions,
queryMeta: &reply.QueryMeta,
watch: watch.NewItems(watch.Item{Job: args.JobID}),
run: func() error {
// Setup the output
if out != nil {
reply.Job = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Look for the job
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}
out, err := snap.JobByID(args.JobID)
if err != nil {
return err
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
// Setup the output
if out != nil {
reply.Job = out
reply.Index = out.ModifyIndex
} else {
// Use the last index that affected the nodes table
index, err := snap.Index("jobs")
if err != nil {
return err
}
reply.Index = index
}
// Set the query response
j.srv.setQueryMeta(&reply.QueryMeta)
return nil
}}
return j.srv.blockingRPC(&opts)
}
// List is used to list the jobs registered in the system

View File

@@ -364,6 +364,55 @@ func TestJobEndpoint_GetJob(t *testing.T) {
}
}
func TestJobEndpoint_GetJob_blocking(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()
state := s1.fsm.State()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create the jobs
job1 := mock.Job()
job2 := mock.Job()
// Upsert a job we are not interested in first.
time.AfterFunc(100*time.Millisecond, func() {
if err := state.UpsertJob(2, job1); err != nil {
t.Fatalf("err: %v", err)
}
})
// Upsert another job later which should trigger the watch.
time.AfterFunc(200*time.Millisecond, func() {
if err := state.UpsertJob(2, job2); err != nil {
t.Fatalf("err: %v", err)
}
})
req := &structs.JobSpecificRequest{
JobID: job2.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
MinQueryIndex: 1,
},
}
start := time.Now()
var resp structs.SingleJobResponse
if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp); err != nil {
t.Fatalf("err: %v", err)
}
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
}
if resp.Index != 2 {
t.Fatalf("Bad index: %d %d", resp.Index, 2)
}
if resp.Job == nil || resp.Job.ID != job2.ID {
t.Fatalf("bad: %#v", resp.Job)
}
}
func TestJobEndpoint_ListJobs(t *testing.T) {
s1 := testServer(t, nil)
defer s1.Shutdown()