From b8c4539619ff71f6e13d0178972757bc8d110b78 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 23 Jul 2015 22:11:25 -0700 Subject: [PATCH] nomad: adding Eval endpoints for Ack and Nack --- nomad/eval_endpoint.go | 31 +++++++++++++++++ nomad/eval_endpoint_test.go | 67 +++++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 1f6113bae..120657edf 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -60,6 +60,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, if done, err := e.srv.forward("Eval.GetEval", args, args, reply); done { return err } + defer metrics.MeasureSince([]string{"nomad", "eval", "dequeue"}, time.Now()) // Ensure there is at least one scheduler if len(args.Schedulers) == 0 { @@ -86,3 +87,33 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, e.srv.setQueryMeta(&reply.QueryMeta) return nil } + +// Ack is used to acknowledge completion of a dequeued evaluation +func (e *Eval) Ack(args *structs.EvalSpecificRequest, + reply *structs.GenericResponse) error { + if done, err := e.srv.forward("Eval.Ack", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now()) + + // Ack the EvalID + if err := e.srv.evalBroker.Ack(args.EvalID); err != nil { + return err + } + return nil +} + +// NAck is used to negative acknowledge completion of a dequeued evaluation +func (e *Eval) Nack(args *structs.EvalSpecificRequest, + reply *structs.GenericResponse) error { + if done, err := e.srv.forward("Eval.Nack", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now()) + + // Nack the EvalID + if err := e.srv.evalBroker.Nack(args.EvalID); err != nil { + return err + } + return nil +} diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 8cc698274..f4ce5792c 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -3,6 +3,7 @@ package nomad import ( "reflect" "testing" + "time" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/structs" @@ -78,3 +79,69 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { t.Fatalf("should be outstanding") } } + +func TestEvalEndpoint_Ack(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + eval1 := mockEval() + s1.evalBroker.Enqueue(eval1) + out, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) + if out == nil { + t.Fatalf("missing eval") + } + + // Ack the eval + get := &structs.EvalSpecificRequest{ + EvalID: out.ID, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Ack", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure outstanding + if s1.evalBroker.Outstanding(eval1.ID) { + t.Fatalf("should not be outstanding") + } +} + +func TestEvalEndpoint_Nack(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + eval1 := mockEval() + s1.evalBroker.Enqueue(eval1) + out, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) + if out == nil { + t.Fatalf("missing eval") + } + + // Ack the eval + get := &structs.EvalSpecificRequest{ + EvalID: out.ID, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Nack", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure outstanding + if s1.evalBroker.Outstanding(eval1.ID) { + t.Fatalf("should not be outstanding") + } + + // Should get it back + out2, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) + if out2 != out { + t.Fatalf("nack failed") + } +}