diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index ff296a986..ad15d626d 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -181,32 +181,41 @@ func (j *Job) GetJob(args *structs.JobSpecificRequest, } defer metrics.MeasureSince([]string{"nomad", "job", "get_job"}, time.Now()) - // Look for the job - snap, err := j.srv.fsm.State().Snapshot() - if err != nil { - return err - } - out, err := snap.JobByID(args.JobID) - if err != nil { - return err - } + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(watch.Item{Job: args.JobID}), + run: func() error { - // Setup the output - if out != nil { - reply.Job = out - reply.Index = out.ModifyIndex - } else { - // Use the last index that affected the nodes table - index, err := snap.Index("jobs") - if err != nil { - return err - } - reply.Index = index - } + // Look for the job + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + out, err := snap.JobByID(args.JobID) + if err != nil { + return err + } - // Set the query response - j.srv.setQueryMeta(&reply.QueryMeta) - return nil + // Setup the output + if out != nil { + reply.Job = out + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.Index("jobs") + if err != nil { + return err + } + reply.Index = index + } + + // Set the query response + j.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return j.srv.blockingRPC(&opts) } // List is used to list the jobs registered in the system diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 0591e73bf..5b4ba079b 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -364,6 +364,55 @@ func TestJobEndpoint_GetJob(t *testing.T) { } } +func TestJobEndpoint_GetJob_blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the jobs + job1 := mock.Job() + job2 := mock.Job() + + // Upsert a job we are not interested in first. + time.AfterFunc(100*time.Millisecond, func() { + if err := state.UpsertJob(2, job1); err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Upsert another job later which should trigger the watch. + time.AfterFunc(200*time.Millisecond, func() { + if err := state.UpsertJob(2, job2); err != nil { + t.Fatalf("err: %v", err) + } + }) + + req := &structs.JobSpecificRequest{ + JobID: job2.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 1, + }, + } + start := time.Now() + var resp structs.SingleJobResponse + if err := msgpackrpc.CallWithCodec(codec, "Job.GetJob", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 2 { + t.Fatalf("Bad index: %d %d", resp.Index, 2) + } + if resp.Job == nil || resp.Job.ID != job2.ID { + t.Fatalf("bad: %#v", resp.Job) + } +} + func TestJobEndpoint_ListJobs(t *testing.T) { s1 := testServer(t, nil) defer s1.Shutdown()