From 94f4b23acb2d1f3391c34fb8997fb590f2c82bed Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 13 Jan 2016 10:19:53 -0800 Subject: [PATCH] Add force spawn endpoint --- api/jobs.go | 6 ++ api/periodic.go | 26 ++++++++ api/periodic_test.go | 61 ++++++++++++++++++ api/util_test.go | 9 +++ command/agent/http.go | 2 + command/agent/periodic_endpoint.go | 40 ++++++++++++ command/agent/periodic_endpoint_test.go | 49 +++++++++++++++ nomad/leader.go | 2 +- nomad/periodic.go | 39 +++++++----- nomad/periodic_endpoint.go | 55 ++++++++++++++++ nomad/periodic_endpoint_test.go | 83 +++++++++++++++++++++++++ nomad/periodic_test.go | 8 +-- nomad/server.go | 17 ++--- nomad/structs/structs.go | 15 ++++- 14 files changed, 382 insertions(+), 30 deletions(-) create mode 100644 api/periodic.go create mode 100644 api/periodic_test.go create mode 100644 command/agent/periodic_endpoint.go create mode 100644 command/agent/periodic_endpoint_test.go create mode 100644 nomad/periodic_endpoint.go create mode 100644 nomad/periodic_endpoint_test.go diff --git a/api/jobs.go b/api/jobs.go index c0f849044..e45e67990 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -221,6 +221,12 @@ func (j *Job) AddTaskGroup(grp *TaskGroup) *Job { return j } +// AddPeriodicConfig adds a periodic config to an existing job. +func (j *Job) AddPeriodicConfig(cfg *PeriodicConfig) *Job { + j.Periodic = cfg + return j +} + // registerJobRequest is used to serialize a job registration type registerJobRequest struct { Job *Job diff --git a/api/periodic.go b/api/periodic.go new file mode 100644 index 000000000..c4ec1e781 --- /dev/null +++ b/api/periodic.go @@ -0,0 +1,26 @@ +package api + +// Periodic is used to access periodic job endpoints +type Periodic struct { + client *Client +} + +// Regions returns a handle on the allocs endpoints. +func (c *Client) PeriodicJobs() *Periodic { + return &Periodic{client: c} +} + +// List returns a list of all of the regions. +func (p *Periodic) Force(jobID string, q *WriteOptions) (string, *WriteMeta, error) { + var resp periodicForceResponse + wm, err := p.client.write("/v1/periodic/"+jobID+"/force", nil, &resp, q) + if err != nil { + return "", nil, err + } + return resp.EvalID, wm, nil +} + +// periodicForceResponse is used to deserialize a force response +type periodicForceResponse struct { + EvalID string +} diff --git a/api/periodic_test.go b/api/periodic_test.go new file mode 100644 index 000000000..f65d67719 --- /dev/null +++ b/api/periodic_test.go @@ -0,0 +1,61 @@ +package api + +import ( + "strings" + "testing" + + "github.com/hashicorp/nomad/testutil" +) + +func TestPeriodicForce(t *testing.T) { + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + periodic := c.PeriodicJobs() + + // Force-eval on a non-existent job fails + _, _, err := periodic.Force("job1", nil) + if err == nil || !strings.Contains(err.Error(), "not found") { + t.Fatalf("expected not found error, got: %#v", err) + } + + // Create a new job + job := testPeriodicJob() + _, _, err = jobs.Register(job, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + + testutil.WaitForResult(func() (bool, error) { + out, _, err := jobs.Info(job.ID, nil) + if err != nil || out == nil || out.ID != job.ID { + return false, err + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) + + // Try force again + evalID, wm, err := periodic.Force(job.ID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertWriteMeta(t, wm) + + if evalID == "" { + t.Fatalf("empty evalID") + } + + // Retrieve the eval + evals := c.Evaluations() + eval, qm, err := evals.Info(evalID, nil) + if err != nil { + t.Fatalf("err: %s", err) + } + assertQueryMeta(t, qm) + if eval.ID == evalID { + return + } + t.Fatalf("evaluation %q missing", evalID) +} diff --git a/api/util_test.go b/api/util_test.go index dfd0e95d0..408d93a54 100644 --- a/api/util_test.go +++ b/api/util_test.go @@ -32,3 +32,12 @@ func testJob() *Job { return job } + +func testPeriodicJob() *Job { + job := testJob().AddPeriodicConfig(&PeriodicConfig{ + Enabled: true, + Spec: "*/30 * * * *", + SpecType: "cron", + }) + return job +} diff --git a/command/agent/http.go b/command/agent/http.go index 4c46ee8b4..e1ab90a22 100644 --- a/command/agent/http.go +++ b/command/agent/http.go @@ -114,6 +114,8 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) { s.mux.HandleFunc("/v1/status/leader", s.wrap(s.StatusLeaderRequest)) s.mux.HandleFunc("/v1/status/peers", s.wrap(s.StatusPeersRequest)) + s.mux.HandleFunc("/v1/periodic/", s.wrap(s.PeriodicSpecificReqeust)) + if enableDebug { s.mux.HandleFunc("/debug/pprof/", pprof.Index) s.mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) diff --git a/command/agent/periodic_endpoint.go b/command/agent/periodic_endpoint.go new file mode 100644 index 000000000..eec622127 --- /dev/null +++ b/command/agent/periodic_endpoint.go @@ -0,0 +1,40 @@ +package agent + +import ( + "net/http" + "strings" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func (s *HTTPServer) PeriodicSpecificReqeust(resp http.ResponseWriter, req *http.Request) (interface{}, error) { + path := strings.TrimPrefix(req.URL.Path, "/v1/periodic/") + switch { + case strings.HasSuffix(path, "/force"): + jobName := strings.TrimSuffix(path, "/force") + return s.periodicForceRequest(resp, req, jobName) + default: + return nil, CodedError(405, ErrInvalidMethod) + + } + +} + +func (s *HTTPServer) periodicForceRequest(resp http.ResponseWriter, req *http.Request, + jobName string) (interface{}, error) { + if req.Method != "PUT" && req.Method != "POST" { + return nil, CodedError(405, ErrInvalidMethod) + } + + args := structs.PeriodicForceRequest{ + JobID: jobName, + } + s.parseRegion(req, &args.Region) + + var out structs.PeriodicForceResponse + if err := s.agent.RPC("Periodic.Force", &args, &out); err != nil { + return nil, err + } + setIndex(resp, out.Index) + return out, nil +} diff --git a/command/agent/periodic_endpoint_test.go b/command/agent/periodic_endpoint_test.go new file mode 100644 index 000000000..500034cdf --- /dev/null +++ b/command/agent/periodic_endpoint_test.go @@ -0,0 +1,49 @@ +package agent + +import ( + "net/http" + "net/http/httptest" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestHTTP_PeriodicForce(t *testing.T) { + httpTest(t, nil, func(s *TestServer) { + // Create and register a periodic job. + job := mock.PeriodicJob() + args := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.JobRegisterResponse + if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Make the HTTP request + req, err := http.NewRequest("POST", "/v1/periodic/"+job.ID+"/force", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + respW := httptest.NewRecorder() + + // Make the request + obj, err := s.Server.PeriodicSpecificReqeust(respW, req) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check for the index + if respW.HeaderMap.Get("X-Nomad-Index") == "" { + t.Fatalf("missing index") + } + + // Check the response + r := obj.(structs.PeriodicForceResponse) + if r.EvalID == "" { + t.Fatalf("bad: %#v", r) + } + }) +} diff --git a/nomad/leader.go b/nomad/leader.go index 5969a9239..b11ff81f0 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -211,7 +211,7 @@ func (s *Server) restorePeriodicDispatcher() error { continue } - if err := s.periodicDispatcher.ForceRun(job.ID); err != nil { + if _, err := s.periodicDispatcher.ForceRun(job.ID); err != nil { msg := fmt.Sprintf("force run of periodic job %q failed: %v", job.ID, err) s.logger.Printf("[ERR] nomad.periodic: %s", msg) return errors.New(msg) diff --git a/nomad/periodic.go b/nomad/periodic.go index 0609f1f79..3e20357cc 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -34,21 +34,21 @@ type PeriodicDispatch struct { // for them. type JobEvalDispatcher interface { // DispatchJob takes a job a new, untracked job and creates an evaluation - // for it. - DispatchJob(job *structs.Job) error + // for it and returns the eval. + DispatchJob(job *structs.Job) (*structs.Evaluation, error) // RunningChildren returns whether the passed job has any running children. RunningChildren(job *structs.Job) (bool, error) } // DispatchJob creates an evaluation for the passed job and commits both the -// evaluation and the job to the raft log. -func (s *Server) DispatchJob(job *structs.Job) error { +// evaluation and the job to the raft log. It returns the eval. +func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { // Commit this update via Raft req := structs.JobRegisterRequest{Job: job} _, index, err := s.raftApply(structs.JobRegisterRequestType, req) if err != nil { - return err + return nil, err } // Create a new evaluation @@ -68,12 +68,15 @@ func (s *Server) DispatchJob(job *structs.Job) error { // Commit this evaluation via Raft // XXX: There is a risk of partial failure where the JobRegister succeeds // but that the EvalUpdate does not. - _, _, err = s.raftApply(structs.EvalUpdateRequestType, update) + _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) if err != nil { - return err + return nil, err } - return nil + // Update its indexes. + eval.CreateIndex = evalIndex + eval.ModifyIndex = evalIndex + return eval, nil } // RunningChildren checks whether the passed job has any running children. @@ -262,18 +265,19 @@ func (p *PeriodicDispatch) removeLocked(jobID string) error { return nil } -// ForceRun causes the periodic job to be evaluated immediately. -func (p *PeriodicDispatch) ForceRun(jobID string) error { +// ForceRun causes the periodic job to be evaluated immediately and returns the +// subsequent eval. +func (p *PeriodicDispatch) ForceRun(jobID string) (*structs.Evaluation, error) { p.l.Lock() // Do nothing if not enabled if !p.enabled { - return fmt.Errorf("periodic dispatch disabled") + return nil, fmt.Errorf("periodic dispatch disabled") } job, tracked := p.tracked[jobID] if !tracked { - return fmt.Errorf("can't force run non-tracked job %v", jobID) + return nil, fmt.Errorf("can't force run non-tracked job %v", jobID) } p.l.Unlock() @@ -370,18 +374,19 @@ func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) { // createEval instantiates a job based on the passed periodic job and submits an // evaluation for it. This should not be called with the lock held. -func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) error { +func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) (*structs.Evaluation, error) { derived, err := p.deriveJob(periodicJob, time) if err != nil { - return err + return nil, err } - if err := p.dispatcher.DispatchJob(derived); err != nil { + eval, err := p.dispatcher.DispatchJob(derived) + if err != nil { p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q: %v", periodicJob.ID, err) - return err + return nil, err } - return nil + return eval, nil } // deriveJob instantiates a new job based on the passed periodic job and the diff --git a/nomad/periodic_endpoint.go b/nomad/periodic_endpoint.go new file mode 100644 index 000000000..f7aea0612 --- /dev/null +++ b/nomad/periodic_endpoint.go @@ -0,0 +1,55 @@ +package nomad + +import ( + "fmt" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +// Job endpoint is used for job interactions +type Periodic struct { + srv *Server +} + +// Evaluate is used to force a job for re-evaluation +func (p *Periodic) Force(args *structs.PeriodicForceRequest, reply *structs.PeriodicForceResponse) error { + if done, err := p.srv.forward("Periodic.Force", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "periodic", "force"}, time.Now()) + + // Validate the arguments + if args.JobID == "" { + return fmt.Errorf("missing job ID for evaluation") + } + + // Lookup the job + snap, err := p.srv.fsm.State().Snapshot() + if err != nil { + return err + } + job, err := snap.JobByID(args.JobID) + if err != nil { + return err + } + if job == nil { + return fmt.Errorf("job not found") + } + + if !job.IsPeriodic() { + return fmt.Errorf("can't force launch non-periodic job") + } + + // Force run the job. + eval, err := p.srv.periodicDispatcher.ForceRun(job.ID) + if err != nil { + return fmt.Errorf("force launch for job %q failed: %v", job.ID, err) + } + + reply.EvalID = eval.ID + reply.EvalCreateIndex = eval.CreateIndex + reply.Index = eval.CreateIndex + return nil +} diff --git a/nomad/periodic_endpoint_test.go b/nomad/periodic_endpoint_test.go new file mode 100644 index 000000000..295070162 --- /dev/null +++ b/nomad/periodic_endpoint_test.go @@ -0,0 +1,83 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" +) + +func TestPeriodicEndpoint_Force(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + state := s1.fsm.State() + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create and insert a periodic job. + job := mock.PeriodicJob() + job.Periodic.ProhibitOverlap = true // Shouldn't affect anything. + if err := state.UpsertJob(100, job); err != nil { + t.Fatalf("err: %v", err) + } + s1.periodicDispatcher.Add(job) + + // Force launch it. + req := &structs.PeriodicForceRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.PeriodicForceResponse + if err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Index == 0 { + t.Fatalf("bad index: %d", resp.Index) + } + + // Lookup the evaluation + eval, err := state.EvalByID(resp.EvalID) + if err != nil { + t.Fatalf("err: %v", err) + } + if eval == nil { + t.Fatalf("expected eval") + } + if eval.CreateIndex != resp.EvalCreateIndex { + t.Fatalf("index mis-match") + } +} + +func TestPeriodicEndpoint_Force_NonPeriodic(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + state := s1.fsm.State() + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create and insert a non-periodic job. + job := mock.Job() + if err := state.UpsertJob(100, job); err != nil { + t.Fatalf("err: %v", err) + } + + // Force launch it. + req := &structs.PeriodicForceRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.PeriodicForceResponse + if err := msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp); err == nil { + t.Fatalf("Force on non-perodic job should err") + } +} diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 04acd54c8..10219d0ad 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -27,11 +27,11 @@ func NewMockJobEvalDispatcher() *MockJobEvalDispatcher { return &MockJobEvalDispatcher{Jobs: make(map[string]*structs.Job)} } -func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) error { +func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { m.lock.Lock() defer m.lock.Unlock() m.Jobs[job.ID] = job - return nil + return nil, nil } func (m *MockJobEvalDispatcher) RunningChildren(parent *structs.Job) (bool, error) { @@ -266,7 +266,7 @@ func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) { t.Parallel() p, _ := testPeriodicDispatcher() - if err := p.ForceRun("foo"); err == nil { + if _, err := p.ForceRun("foo"); err == nil { t.Fatal("ForceRun of untracked job should fail") } } @@ -284,7 +284,7 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { } // ForceRun the job - if err := p.ForceRun(job.ID); err != nil { + if _, err := p.ForceRun(job.ID); err != nil { t.Fatalf("ForceRun failed %v", err) } diff --git a/nomad/server.go b/nomad/server.go index 3a17c8224..b6c13826d 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -132,13 +132,14 @@ type Server struct { // Holds the RPC endpoints type endpoints struct { - Status *Status - Node *Node - Job *Job - Eval *Eval - Plan *Plan - Alloc *Alloc - Region *Region + Status *Status + Node *Node + Job *Job + Eval *Eval + Plan *Plan + Alloc *Alloc + Region *Region + Periodic *Periodic } // NewServer is used to construct a new Nomad server from the @@ -362,6 +363,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.endpoints.Plan = &Plan{s} s.endpoints.Alloc = &Alloc{s} s.endpoints.Region = &Region{s} + s.endpoints.Periodic = &Periodic{s} // Register the handlers s.rpcServer.Register(s.endpoints.Status) @@ -371,6 +373,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error { s.rpcServer.Register(s.endpoints.Plan) s.rpcServer.Register(s.endpoints.Alloc) s.rpcServer.Register(s.endpoints.Region) + s.rpcServer.Register(s.endpoints.Periodic) list, err := net.ListenTCP("tcp", s.config.RPCAddr) if err != nil { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 68b7e6e8d..f9e802ee5 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -123,7 +123,7 @@ type QueryMeta struct { KnownLeader bool } -// WriteMeta allows a write response to includ e potentially +// WriteMeta allows a write response to include potentially // useful metadata about the write type WriteMeta struct { // This is the index associated with the write @@ -272,6 +272,12 @@ type AllocSpecificRequest struct { QueryOptions } +// PeriodicForceReqeuest is used to force a specific periodic job. +type PeriodicForceRequest struct { + JobID string + WriteRequest +} + // GenericRequest is used to request where no // specific information is needed. type GenericRequest struct { @@ -415,6 +421,13 @@ type EvalAllocationsResponse struct { QueryMeta } +// PeriodicForceResponse is used to respond to a periodic job force launch +type PeriodicForceResponse struct { + EvalID string + EvalCreateIndex uint64 + WriteMeta +} + const ( NodeStatusInit = "initializing" NodeStatusReady = "ready"