From d16e96e8d8a15abc73a8a7f4a39c9574a779be88 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 27 Jul 2015 15:31:49 -0700 Subject: [PATCH] nomad: adding plan endpoint --- nomad/plan_endpoint.go | 38 +++++++++++++++++++++++++++++++++++++ nomad/plan_endpoint_test.go | 33 ++++++++++++++++++++++++++++++++ nomad/server.go | 3 +++ nomad/structs/structs.go | 14 ++++++++++++++ 4 files changed, 88 insertions(+) create mode 100644 nomad/plan_endpoint.go create mode 100644 nomad/plan_endpoint_test.go diff --git a/nomad/plan_endpoint.go b/nomad/plan_endpoint.go new file mode 100644 index 000000000..cb9b798e8 --- /dev/null +++ b/nomad/plan_endpoint.go @@ -0,0 +1,38 @@ +package nomad + +import ( + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Plan endpoint is used for plan interactions +type Plan struct { + srv *Server +} + +// Submit is used to submit a plan to the leader +func (p *Plan) Submit(args *structs.PlanRequest, reply *structs.PlanResponse) error { + if done, err := p.srv.forward("Plan.Submit", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "plan", "submit"}, time.Now()) + + // Submit the plan to the queue + future, err := p.srv.planQueue.Enqueue(args.Plan) + if err != nil { + return err + } + + // Wait for the results + result, err := future.Wait() + if err != nil { + return err + } + + // Package the result + reply.Result = result + reply.Index = result.AllocIndex + return nil +} diff --git a/nomad/plan_endpoint_test.go b/nomad/plan_endpoint_test.go new file mode 100644 index 000000000..729987f6a --- /dev/null +++ b/nomad/plan_endpoint_test.go @@ -0,0 +1,33 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func TestPlanEndpoint_Submit(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Submit a plan + plan := mockPlan() + req := &structs.PlanRequest{ + Plan: plan, + WriteRequest: structs.WriteRequest{Region: "region1"}, + } + var resp structs.PlanResponse + if err := msgpackrpc.CallWithCodec(codec, "Plan.Submit", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("Bad index: %d", resp.Index) + } + if resp.Result == nil { + t.Fatalf("missing result") + } +} diff --git a/nomad/server.go b/nomad/server.go index 5cf65e2e0..7fb92a09b 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -111,6 +111,7 @@ type endpoints struct { Client *Client Job *Job Eval *Eval + Plan *Plan } // NewServer is used to construct a new Nomad server from the @@ -300,12 +301,14 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Client = &Client{s} s.endpoints.Job = &Job{s} s.endpoints.Eval = &Eval{s} + s.endpoints.Plan = &Plan{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) s.rpcServer.Register(s.endpoints.Client) s.rpcServer.Register(s.endpoints.Job) s.rpcServer.Register(s.endpoints.Eval) + s.rpcServer.Register(s.endpoints.Plan) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index ed97cf3aa..fc4b7b3fb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -186,6 +186,12 @@ type EvalDequeueRequest struct { WriteRequest } +// PlanRequest is used to submit an allocation plan to the leader +type PlanRequest struct { + Plan *Plan + WriteRequest +} + // GenericResponse is used to respond to a request where no // specific response information is needed. type GenericResponse struct { @@ -210,6 +216,12 @@ type SingleEvalResponse struct { QueryMeta } +// PlanResponse is used to return from a PlanRequest +type PlanResponse struct { + Result *PlanResult + WriteMeta +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready" @@ -538,7 +550,9 @@ type Plan struct { EvalCreateIndex uint64 } +// PlanResult is the result of a plan submitted to the leader. type PlanResult struct { + AllocIndex uint64 } // msgpackHandle is a shared handle for encoding/decoding of structs