diff --git a/jobspec/parse.go b/jobspec/parse.go index 963c0b3e4..3c61fdd3f 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -698,8 +698,8 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error m["Enabled"] = enabled } - // If "cron_spec" is provided, set the type to "cron" and store the spec. - if cron, ok := m["cron_spec"]; ok { + // If "cron" is provided, set the type to "cron" and store the spec. + if cron, ok := m["cron"]; ok { m["SpecType"] = structs.PeriodicSpecCron m["Spec"] = cron } diff --git a/jobspec/test-fixtures/periodic-cron.hcl b/jobspec/test-fixtures/periodic-cron.hcl index 2b1bd2b39..c463cc4f1 100644 --- a/jobspec/test-fixtures/periodic-cron.hcl +++ b/jobspec/test-fixtures/periodic-cron.hcl @@ -1,5 +1,5 @@ job "foo" { periodic { - cron_spec = "*/5 * * *" + cron = "*/5 * * *" } } diff --git a/nomad/fsm.go b/nomad/fsm.go index ca81f0a77..af1799404 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -207,11 +207,46 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { return err } + // If it is periodic, insert it into the periodic runner and record the + // time it was inserted. if req.Job.IsPeriodic() { if err := n.periodicRunner.Add(req.Job); err != nil { n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Add failed: %v", err) return err } + + // Record the insertion time as a launch. + launch := &structs.PeriodicLaunch{req.Job.ID, time.Now()} + if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err) + return err + } + } + + // Check if the parent job is periodic and mark the launch time. + parentID := req.Job.ParentID + if parentID != "" { + parent, err := n.state.JobByID(parentID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: JobByID(%v) lookup for parent failed: %v", parentID, err) + return err + } else if parent == nil { + // The parent has been deregistered. + return nil + } + + if parent.IsPeriodic() { + t, err := n.periodicRunner.LaunchTime(req.Job.ID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err) + return err + } + launch := &structs.PeriodicLaunch{parentID, t} + if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil { + n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err) + return err + } + } } return nil @@ -224,14 +259,27 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } + job, err := n.state.JobByID(req.JobID) + if err != nil { + n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) + return err + } + if err := n.state.DeleteJob(index, req.JobID); err != nil { n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) return err } - if err := n.periodicRunner.Remove(req.JobID); err != nil { - n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Remove failed: %v", err) - return err + if job.IsPeriodic() { + if err := n.periodicRunner.Remove(req.JobID); err != nil { + n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Remove failed: %v", err) + return err + } + + if err := n.state.DeletePeriodicLaunch(index, req.JobID); err != nil { + n.logger.Printf("[ERR] nomad.fsm: DeletePeriodicLaunch failed: %v", err) + return err + } } return nil diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index eaba36917..d89ee67f8 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -222,9 +222,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) { func TestFSM_RegisterJob(t *testing.T) { fsm := testFSM(t) - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Periodic = &structs.PeriodicConfig{Enabled: true} + job := mock.PeriodicJob() req := structs.JobRegisterRequest{ Job: job, } @@ -254,14 +252,24 @@ func TestFSM_RegisterJob(t *testing.T) { if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; !ok { t.Fatal("job not added to periodic runner") } + + // Verify the launch time was tracked. + launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if launchOut == nil { + t.Fatalf("not found!") + } + if launchOut.Launch.IsZero() { + t.Fatalf("bad launch time: %v", launchOut.Launch) + } } func TestFSM_DeregisterJob(t *testing.T) { fsm := testFSM(t) - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Periodic = &structs.PeriodicConfig{Enabled: true} + job := mock.PeriodicJob() req := structs.JobRegisterRequest{ Job: job, } @@ -301,6 +309,15 @@ func TestFSM_DeregisterJob(t *testing.T) { if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; ok { t.Fatal("job not removed from periodic runner") } + + // Verify it was removed from the periodic launch table. + launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + if launchOut != nil { + t.Fatalf("launch found!") + } } func TestFSM_UpdateEval(t *testing.T) { diff --git a/nomad/leader.go b/nomad/leader.go index 948ca219f..f1ec1a3b3 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -188,43 +188,19 @@ func (s *Server) restorePeriodicDispatcher() error { } now := time.Now() - var last time.Time for i := iter.Next(); i != nil; i = iter.Next() { job := i.(*structs.Job) s.periodicDispatcher.Add(job) - // Need to force run the job if an evaluation should have been created - // during the leader election period. At a high-level the logic to - // determine whether to force run a job is split based on whether an - // eval has been created for it. If so we check that since the last - // eval, should there have been a launch. If there is no eval, we check - // if there should have been a launch since the job was inserted. - evals, err := s.periodicDispatcher.CreatedEvals(job.ID) - if err != nil { - return fmt.Errorf("failed to get the evals created for periodic job %v: %v", - job.ID, err) + launch, err := s.fsm.State().PeriodicLaunchByID(job.ID) + if err != nil || launch == nil { + return fmt.Errorf("failed to get periodic launch time: %v", err) } - // Determine if we need to force run by checking if a run should have - // occured since the last eval - if l := len(evals); l != 0 { - last = evals[l-1].JobLaunch - if !job.Periodic.Next(last).Before(now) { - continue - } - - goto FORCE - } - - // Determine if we need to force run by checking if a run should have - // occured since the job was added. - last = s.fsm.TimeTable().NearestTime(job.ModifyIndex) - // TODO(alex): Think about the 0 time case - if !job.Periodic.Next(last).Before(now) { + if !job.Periodic.Next(launch.Launch).Before(now) { continue } - FORCE: if err := s.periodicDispatcher.ForceRun(job.ID); err != nil { s.logger.Printf( "[ERR] nomad.periodic: force run of periodic job %q failed: %v", diff --git a/nomad/periodic.go b/nomad/periodic.go index c4f062c26..f0cee38ba 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -17,7 +17,7 @@ import ( const ( // The string appended to the periodic jobs ID when launching derived // instances of it. - JobLaunchSuffix = "-launch-" + JobLaunchSuffix = "/periodic-" ) // PeriodicRunner is the interface for tracking and launching periodic jobs at @@ -30,6 +30,7 @@ type PeriodicRunner interface { ForceRun(jobID string) error Tracked() []structs.Job Flush() + LaunchTime(jobID string) (time.Time, error) } // PeriodicDispatch is used to track and launch periodic jobs. It maintains the @@ -72,8 +73,10 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) { p.enabled = enabled p.l.Unlock() if !enabled { - close(p.stopCh) - <-p.waitCh + if p.running { + close(p.stopCh) + <-p.waitCh + } p.Flush() } } @@ -107,7 +110,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { // Do nothing if not enabled if !p.enabled { - return fmt.Errorf("periodic dispatch disabled") + return nil } // If we were tracking a job and it has been disabled or made non-periodic remove it. @@ -156,7 +159,7 @@ func (p *PeriodicDispatch) Remove(jobID string) error { // Do nothing if not enabled if !p.enabled { - return fmt.Errorf("periodic dispatch disabled") + return nil } if job, tracked := p.tracked[jobID]; tracked { @@ -374,6 +377,7 @@ func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time // CreatedEvals returns the set of evaluations created from the passed periodic // job in sorted order, with the earliest job launch first. +// TODO: Get rid of this func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) (PeriodicEvals, error) { state := p.srv.fsm.State() iter, err := state.ChildJobs(periodicJobID) @@ -390,7 +394,7 @@ func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) (PeriodicEvals, er } for _, eval := range childEvals { - launch, err := p.evalLaunchTime(eval) + launch, err := p.LaunchTime(eval.JobID) if err != nil { return nil, fmt.Errorf("failed to get launch time for eval %v: %v", eval, err) } @@ -422,11 +426,9 @@ func (p PeriodicEvals) Len() int { return len(p) } func (p PeriodicEvals) Swap(i, j int) { p[i], p[j] = p[j], p[i] } func (p PeriodicEvals) Less(i, j int) bool { return p[i].JobLaunch.Before(p[j].JobLaunch) } -// evalLaunchTime returns the launch time of the job associated with the eval. -// This is only valid for evaluations created by PeriodicDispatch and will -// otherwise return an error. -func (p *PeriodicDispatch) evalLaunchTime(created *structs.Evaluation) (time.Time, error) { - jobID := created.JobID +// LaunchTime returns the launch time of the job. This is only valid for +// jobs created by PeriodicDispatch and will otherwise return an error. +func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) { index := strings.LastIndex(jobID, JobLaunchSuffix) if index == -1 { return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID) diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index abe05b088..db6594634 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -47,6 +47,10 @@ func (m *MockPeriodic) ForceRun(jobID string) error { return nil } +func (m *MockPeriodic) LaunchTime(jobID string) (time.Time, error) { + return time.Time{}, nil +} + func (m *MockPeriodic) Start() {} func (m *MockPeriodic) Flush() { @@ -76,27 +80,6 @@ func testPeriodicJob(times ...time.Time) *structs.Job { return job } -func TestPeriodicDispatch_DisabledOperations(t *testing.T) { - t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) - - // Disable the dispatcher. - s1.periodicDispatcher.SetEnabled(false) - - job := mock.PeriodicJob() - if err := s1.periodicDispatcher.Add(job); err == nil { - t.Fatalf("Add on disabled dispatcher should fail") - } - - if err := s1.periodicDispatcher.Remove(job.ID); err == nil { - t.Fatalf("Remove on disabled dispatcher should fail") - } -} - func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { t.Parallel() s1 := testServer(t, func(c *Config) { diff --git a/nomad/state/schema.go b/nomad/state/schema.go index d3119016b..0ff2ad58a 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -19,6 +19,7 @@ func stateStoreSchema() *memdb.DBSchema { indexTableSchema, nodeTableSchema, jobTableSchema, + periodicLaunchTableSchema, evalTableSchema, allocTableSchema, } @@ -141,6 +142,28 @@ func jobIsGCable(obj interface{}) (bool, error) { return j.GC, nil } +// periodicLaunchTableSchema returns the MemDB schema tracking the most recent +// launch time for a perioidic job. +func periodicLaunchTableSchema() *memdb.TableSchema { + return &memdb.TableSchema{ + Name: "periodic_launch", + Indexes: map[string]*memdb.IndexSchema{ + // Primary index is used for job management + // and simple direct lookup. ID is required to be + // unique. + "id": &memdb.IndexSchema{ + Name: "id", + AllowMissing: false, + Unique: true, + Indexer: &memdb.StringFieldIndex{ + Field: "ID", + Lowercase: true, + }, + }, + }, + } +} + // evalTableSchema returns the MemDB schema for the eval table. // This table is used to store all the evaluations that are pending // or recently completed. diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f0953d403..103c08a8b 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -408,6 +408,80 @@ func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) { return iter, nil } +// UpsertPeriodicLaunch is used to register a launch or update it. +func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error { + txn := s.db.Txn(true) + defer txn.Abort() + + watcher := watch.NewItems() + watcher.Add(watch.Item{Table: "periodic_launch"}) + watcher.Add(watch.Item{Job: launch.ID}) + + // Check if the job already exists + if _, err := txn.First("periodic_launch", "id", launch.ID); err != nil { + return fmt.Errorf("periodic launch lookup failed: %v", err) + } + + // Insert the job + if err := txn.Insert("periodic_launch", launch); err != nil { + return fmt.Errorf("launch insert failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Defer(func() { s.watch.notify(watcher) }) + txn.Commit() + return nil +} + +// DeletePeriodicLaunch is used to delete the periodic launch +func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error { + txn := s.db.Txn(true) + defer txn.Abort() + + watcher := watch.NewItems() + watcher.Add(watch.Item{Table: "periodic_launch"}) + watcher.Add(watch.Item{Job: jobID}) + + // Lookup the launch + existing, err := txn.First("periodic_launch", "id", jobID) + if err != nil { + return fmt.Errorf("launch lookup failed: %v", err) + } + if existing == nil { + return fmt.Errorf("launch not found") + } + + // Delete the launch + if err := txn.Delete("periodic_launch", existing); err != nil { + return fmt.Errorf("launch delete failed: %v", err) + } + if err := txn.Insert("index", &IndexEntry{"periodic_launch", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + + txn.Defer(func() { s.watch.notify(watcher) }) + txn.Commit() + return nil +} + +// PeriodicLaunchByID is used to lookup a periodic launch by the periodic job +// ID. +func (s *StateStore) PeriodicLaunchByID(id string) (*structs.PeriodicLaunch, error) { + txn := s.db.Txn(false) + + existing, err := txn.First("periodic_launch", "id", id) + if err != nil { + return nil, fmt.Errorf("periodic launch lookup failed: %v", err) + } + + if existing != nil { + return existing.(*structs.PeriodicLaunch), nil + } + return nil, nil +} + // UpsertEvaluation is used to upsert an evaluation func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error { txn := s.db.Txn(true) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index a1504e936..5e27f0112 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -5,6 +5,7 @@ import ( "reflect" "sort" "testing" + "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -668,6 +669,122 @@ func TestStateStore_RestoreJob(t *testing.T) { notify.verify(t) } +func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + launch := &structs.PeriodicLaunch{job.ID, time.Now()} + + notify := setupNotifyTest( + state, + watch.Item{Table: "periodic_launch"}, + watch.Item{Job: job.ID}) + + err := state.UpsertPeriodicLaunch(1000, launch) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.PeriodicLaunchByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(launch, out) { + t.Fatalf("bad: %#v %#v", job, out) + } + + index, err := state.Index("periodic_launch") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1000 { + t.Fatalf("bad: %d", index) + } + + notify.verify(t) +} + +func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + launch := &structs.PeriodicLaunch{job.ID, time.Now()} + + notify := setupNotifyTest( + state, + watch.Item{Table: "periodic_launch"}, + watch.Item{Job: job.ID}) + + err := state.UpsertPeriodicLaunch(1000, launch) + if err != nil { + t.Fatalf("err: %v", err) + } + + launch2 := &structs.PeriodicLaunch{job.ID, launch.Launch.Add(1 * time.Second)} + err = state.UpsertPeriodicLaunch(1001, launch2) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.PeriodicLaunchByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if !reflect.DeepEqual(launch2, out) { + t.Fatalf("bad: %#v %#v", launch2, out) + } + + index, err := state.Index("periodic_launch") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } + + notify.verify(t) +} + +func TestStateStore_DeletePeriodicLaunch(t *testing.T) { + state := testStateStore(t) + job := mock.Job() + launch := &structs.PeriodicLaunch{job.ID, time.Now()} + + notify := setupNotifyTest( + state, + watch.Item{Table: "periodic_launch"}, + watch.Item{Job: job.ID}) + + err := state.UpsertPeriodicLaunch(1000, launch) + if err != nil { + t.Fatalf("err: %v", err) + } + + err = state.DeletePeriodicLaunch(1001, job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + out, err := state.PeriodicLaunchByID(job.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + if out != nil { + t.Fatalf("bad: %#v %#v", job, out) + } + + index, err := state.Index("periodic_launch") + if err != nil { + t.Fatalf("err: %v", err) + } + if index != 1001 { + t.Fatalf("bad: %d", index) + } + + notify.verify(t) +} + func TestStateStore_Indexes(t *testing.T) { state := testStateStore(t) node := mock.Node() diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7834fb610..a33befbd3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1017,6 +1017,12 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time { return time.Time{} } +// PeriodicLaunch tracks the last launch time of a periodic job. +type PeriodicLaunch struct { + ID string // ID of the periodic job. + Launch time.Time // The last launch time. +} + var ( defaultServiceJobRestartPolicy = RestartPolicy{ Delay: 15 * time.Second, diff --git a/website/source/docs/jobspec/index.html.md b/website/source/docs/jobspec/index.html.md index f55251c2b..f3c2a4435 100644 --- a/website/source/docs/jobspec/index.html.md +++ b/website/source/docs/jobspec/index.html.md @@ -156,6 +156,24 @@ The `job` object supports the following keys: and "h" suffix can be used, such as "30s". Both values default to zero, which disables rolling updates. +* `periodic` - `periodic` allows the job to be scheduled at fixed times, dates + or intervals. The `periodic` block has the following configuration: + + ``` + periodic { + // Enabled is defaulted to true if the block is included. It can be set + // to false to pause the periodic job from running. + enabled = true + + // A cron expression configuring the interval the job is launched at. + // Supports predefined expressions such as "@daily" and "@weekly" + cron = "*/15 * * * * *" + } + ``` + + `cron`: See [here](https://github.com/gorhill/cronexpr#implementation) + for full documentation of supported cron specs and the predefined expressions. + ### Task Group The `group` object supports the following keys: