diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 07dfc18fe..e473b5b10 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -27,32 +27,40 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest, } defer metrics.MeasureSince([]string{"nomad", "eval", "get_eval"}, time.Now()) - // Look for the job - snap, err := e.srv.fsm.State().Snapshot() - if err != nil { - return err - } - out, err := snap.EvalByID(args.EvalID) - if err != nil { - return err - } + // Setup the blocking query + opts := blockingOptions{ + queryOpts: &args.QueryOptions, + queryMeta: &reply.QueryMeta, + watch: watch.NewItems(watch.Item{Eval: args.EvalID}), + run: func() error { + // Look for the job + snap, err := e.srv.fsm.State().Snapshot() + if err != nil { + return err + } + out, err := snap.EvalByID(args.EvalID) + if err != nil { + return err + } - // Setup the output - if out != nil { - reply.Eval = out - reply.Index = out.ModifyIndex - } else { - // Use the last index that affected the nodes table - index, err := snap.Index("evals") - if err != nil { - return err - } - reply.Index = index - } + // Setup the output + if out != nil { + reply.Eval = out + reply.Index = out.ModifyIndex + } else { + // Use the last index that affected the nodes table + index, err := snap.Index("evals") + if err != nil { + return err + } + reply.Index = index + } - // Set the query response - e.srv.setQueryMeta(&reply.QueryMeta) - return nil + // Set the query response + e.srv.setQueryMeta(&reply.QueryMeta) + return nil + }} + return e.srv.blockingRPC(&opts) } // Dequeue is used to dequeue a pending evaluation diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 3b9a62a8e..01ec27f46 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -51,6 +51,58 @@ func TestEvalEndpoint_GetEval(t *testing.T) { } } +func TestEvalEndpoint_GetEval_blocking(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + state := s1.fsm.State() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the evals + eval1 := mock.Eval() + eval2 := mock.Eval() + + // First create an unrelated eval + time.AfterFunc(100*time.Millisecond, func() { + err := state.UpsertEvals(1000, []*structs.Evaluation{eval1}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Upsert the eval we are watching later + time.AfterFunc(200*time.Millisecond, func() { + err := state.UpsertEvals(2000, []*structs.Evaluation{eval2}) + if err != nil { + t.Fatalf("err: %v", err) + } + }) + + // Lookup the eval + get := &structs.EvalSpecificRequest{ + EvalID: eval2.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + MinQueryIndex: 1, + }, + } + var resp structs.SingleEvalResponse + start := time.Now() + if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond { + t.Fatalf("should block (returned in %s) %#v", elapsed, resp) + } + if resp.Index != 2000 { + t.Fatalf("Bad index: %d %d", resp.Index, 2000) + } + if resp.Eval == nil || resp.Eval.ID != eval2.ID { + t.Fatalf("bad: %#v", resp.Eval) + } +} + func TestEvalEndpoint_Dequeue(t *testing.T) { s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue