mirror of
https://github.com/kemko/nomad.git
synced 2026-01-10 12:25:42 +03:00
nomad: support blocking queries for single evals
This commit is contained in:
@@ -27,32 +27,40 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest,
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "get_eval"}, time.Now())
|
||||
|
||||
// Look for the job
|
||||
snap, err := e.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out, err := snap.EvalByID(args.EvalID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// Setup the blocking query
|
||||
opts := blockingOptions{
|
||||
queryOpts: &args.QueryOptions,
|
||||
queryMeta: &reply.QueryMeta,
|
||||
watch: watch.NewItems(watch.Item{Eval: args.EvalID}),
|
||||
run: func() error {
|
||||
// Look for the job
|
||||
snap, err := e.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out, err := snap.EvalByID(args.EvalID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Setup the output
|
||||
if out != nil {
|
||||
reply.Eval = out
|
||||
reply.Index = out.ModifyIndex
|
||||
} else {
|
||||
// Use the last index that affected the nodes table
|
||||
index, err := snap.Index("evals")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Index = index
|
||||
}
|
||||
// Setup the output
|
||||
if out != nil {
|
||||
reply.Eval = out
|
||||
reply.Index = out.ModifyIndex
|
||||
} else {
|
||||
// Use the last index that affected the nodes table
|
||||
index, err := snap.Index("evals")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
reply.Index = index
|
||||
}
|
||||
|
||||
// Set the query response
|
||||
e.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
// Set the query response
|
||||
e.srv.setQueryMeta(&reply.QueryMeta)
|
||||
return nil
|
||||
}}
|
||||
return e.srv.blockingRPC(&opts)
|
||||
}
|
||||
|
||||
// Dequeue is used to dequeue a pending evaluation
|
||||
|
||||
@@ -51,6 +51,58 @@ func TestEvalEndpoint_GetEval(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_GetEval_blocking(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
state := s1.fsm.State()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the evals
|
||||
eval1 := mock.Eval()
|
||||
eval2 := mock.Eval()
|
||||
|
||||
// First create an unrelated eval
|
||||
time.AfterFunc(100*time.Millisecond, func() {
|
||||
err := state.UpsertEvals(1000, []*structs.Evaluation{eval1})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Upsert the eval we are watching later
|
||||
time.AfterFunc(200*time.Millisecond, func() {
|
||||
err := state.UpsertEvals(2000, []*structs.Evaluation{eval2})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
})
|
||||
|
||||
// Lookup the eval
|
||||
get := &structs.EvalSpecificRequest{
|
||||
EvalID: eval2.ID,
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
MinQueryIndex: 1,
|
||||
},
|
||||
}
|
||||
var resp structs.SingleEvalResponse
|
||||
start := time.Now()
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.GetEval", get, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if elapsed := time.Now().Sub(start); elapsed < 200*time.Millisecond {
|
||||
t.Fatalf("should block (returned in %s) %#v", elapsed, resp)
|
||||
}
|
||||
if resp.Index != 2000 {
|
||||
t.Fatalf("Bad index: %d %d", resp.Index, 2000)
|
||||
}
|
||||
if resp.Eval == nil || resp.Eval.ID != eval2.ID {
|
||||
t.Fatalf("bad: %#v", resp.Eval)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Dequeue(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0 // Prevent automatic dequeue
|
||||
|
||||
Reference in New Issue
Block a user