diff --git a/api/jobs.go b/api/jobs.go
index 17e75daff..3eea39daf 100644
--- a/api/jobs.go
+++ b/api/jobs.go
@@ -101,12 +101,19 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta,
 	return resp.EvalID, wm, nil
 }
 
-//UpdateStrategy is for serializing update strategy for a job.
+// UpdateStrategy is for serializing update strategy for a job.
 type UpdateStrategy struct {
 	Stagger     time.Duration
 	MaxParallel int
 }
 
+// PeriodicConfig is for serializing periodic config for a job.
+type PeriodicConfig struct {
+	Enabled  bool
+	Spec     string
+	SpecType string
+}
+
 // Job is used to serialize a job.
 type Job struct {
 	Region            string
@@ -119,6 +126,7 @@ type Job struct {
 	Constraints       []*Constraint
 	TaskGroups        []*TaskGroup
 	Update            *UpdateStrategy
+	Periodic          *PeriodicConfig
 	Meta              map[string]string
 	Status            string
 	StatusDescription string
diff --git a/nomad/leader.go b/nomad/leader.go
index 41081dcfc..948ca219f 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -117,8 +117,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 		return err
 	}
 
-	// Enable the periodic dispatcher,since we are now the leader.
+	// Enable the periodic dispatcher, since we are now the leader.
 	s.periodicDispatcher.SetEnabled(true)
+	s.periodicDispatcher.Start()
 
 	// Restore the periodic dispatcher state
 	if err := s.restorePeriodicDispatcher(); err != nil {
@@ -175,8 +176,65 @@ func (s *Server) restoreEvalBroker() error {
 	return nil
 }
 
+// restorePeriodicDispatcher is used to restore all periodic jobs into the
+// periodic dispatcher. It also determines if a periodic job should have been
+// created during the leadership transition and force runs them. The periodic
+// dispatcher is maintained only by the leader, so it must be restored anytime a
+// leadership transition takes place.
 func (s *Server) restorePeriodicDispatcher() error {
-	// TODO(alex)
+	iter, err := s.fsm.State().JobsByPeriodic(true)
+	if err != nil {
+		return fmt.Errorf("failed to get periodic jobs: %v", err)
+	}
+
+	now := time.Now()
+	var last time.Time
+	for i := iter.Next(); i != nil; i = iter.Next() {
+		job := i.(*structs.Job)
+		s.periodicDispatcher.Add(job)
+
+		// Need to force run the job if an evaluation should have been created
+		// during the leader election period. At a high level, the logic to
+		// determine whether to force run a job is split based on whether an
+		// eval has been created for it. If one exists, we check whether a launch
+		// should have occurred since that eval; if not, we check whether a
+		// launch should have occurred since the job was inserted.
+		evals, err := s.periodicDispatcher.CreatedEvals(job.ID)
+		if err != nil {
+			return fmt.Errorf("failed to get the evals created for periodic job %v: %v",
+				job.ID, err)
+		}
+
+		// Determine if we need to force run by checking if a run should have
+		// occurred since the last eval.
+		if l := len(evals); l != 0 {
+			last = evals[l-1].JobLaunch
+			if !job.Periodic.Next(last).Before(now) {
+				continue
+			}
+
+			goto FORCE
+		}
+
+		// Determine if we need to force run by checking if a run should have
+		// occurred since the job was added.
+		last = s.fsm.TimeTable().NearestTime(job.ModifyIndex)
+		// TODO(alex): Think about the 0 time case
+		if !job.Periodic.Next(last).Before(now) {
+			continue
+		}
+
+	FORCE:
+		if err := s.periodicDispatcher.ForceRun(job.ID); err != nil {
+			s.logger.Printf(
+				"[ERR] nomad.periodic: force run of periodic job %q failed: %v",
+				job.ID, err)
+			return fmt.Errorf("failed to force run periodic job %q: %v", job.ID, err)
+		}
+		s.logger.Printf("[DEBUG] nomad.periodic: periodic job %q force"+
+			" run during leadership establishment", job.ID)
+	}
+
 	return nil
 }
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index b753b41f4..13509d66c 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -286,6 +286,190 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
 	})
 }
 
+func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer s1.Shutdown()
+
+	s2 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+		c.DevDisableBootstrap = true
+	})
+	defer s2.Shutdown()
+
+	s3 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+		c.DevDisableBootstrap = true
+	})
+	defer s3.Shutdown()
+	servers := []*Server{s1, s2, s3}
+	testJoin(t, s1, s2, s3)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	for _, s := range servers {
+		testutil.WaitForResult(func() (bool, error) {
+			peers, _ := s.raftPeers.Peers()
+			return len(peers) == 3, nil
+		}, func(err error) {
+			t.Fatalf("should have 3 peers")
+		})
+	}
+
+	var leader *Server
+	for _, s := range servers {
+		if s.IsLeader() {
+			leader = s
+			break
+		}
+	}
+	if leader == nil {
+		t.Fatalf("Should have a leader")
+	}
+
+	// Inject a periodic job and non-periodic job
+	periodic := mock.PeriodicJob()
+	nonPeriodic := mock.Job()
+	for _, job := range []*structs.Job{nonPeriodic, periodic} {
+		req := structs.JobRegisterRequest{
+			Job: job,
+		}
+		_, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
+		if err != nil {
+			t.Fatalf("err: %v", err)
+		}
+	}
+
+	// Kill the leader
+	leader.Shutdown()
+	time.Sleep(100 * time.Millisecond)
+
+	// Wait for a new leader
+	leader = nil
+	testutil.WaitForResult(func() (bool, error) {
+		for _, s := range servers {
+			if s.IsLeader() {
+				leader = s
+				return true, nil
+			}
+		}
+		return false, nil
+	}, func(err error) {
+		t.Fatalf("should have leader")
+	})
+
+	// Check that the new leader is tracking the periodic job.
+	testutil.WaitForResult(func() (bool, error) {
+		_, tracked := leader.periodicDispatcher.tracked[periodic.ID]
+		return tracked, nil
+	}, func(err error) {
+		t.Fatalf("periodic job not tracked")
+	})
+}
+
+func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer s1.Shutdown()
+
+	// Inject a periodic job that will be triggered soon.
+	launch := time.Now().Add(1 * time.Second)
+	job := testPeriodicJob(launch)
+	req := structs.JobRegisterRequest{
+		Job: job,
+	}
+	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Flush the periodic dispatcher, ensuring that no evals will be created.
+	s1.periodicDispatcher.Flush()
+
+	// Sleep till after the job should have been launched.
+	time.Sleep(2 * time.Second)
+
+	// Restore the periodic dispatcher.
+	s1.restorePeriodicDispatcher()
+
+	// Ensure the job is tracked.
+	if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
+		t.Fatalf("periodic job not restored")
+	}
+
+	// Check that an eval was made.
+	evals, err := s1.periodicDispatcher.CreatedEvals(job.ID)
+	if err != nil {
+		t.Fatalf("CreatedEvals(%v) failed: %v", job.ID, err)
+	}
+
+	if len(evals) != 1 {
+		t.Fatalf("restorePeriodicDispatcher() didn't create an eval")
+	}
+}
+
+func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0
+	})
+	defer s1.Shutdown()
+
+	// Inject a periodic job that triggered once in the past, should trigger now,
+	// and will trigger once in the future.
+	now := time.Now()
+	past := now.Add(-1 * time.Second)
+	future := now.Add(10 * time.Second)
+	job := testPeriodicJob(past, now, future)
+	req := structs.JobRegisterRequest{
+		Job: job,
+	}
+	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Create an eval for the past launch.
+	s1.periodicDispatcher.createEval(job, past)
+
+	// Flush the periodic dispatcher, ensuring that no evals will be created.
+	s1.periodicDispatcher.Flush()
+
+	// Sleep till after the job should have been launched.
+	time.Sleep(2 * time.Second)
+
+	// Restore the periodic dispatcher.
+	s1.restorePeriodicDispatcher()
+
+	// Ensure the job is tracked.
+	if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
+		t.Fatalf("periodic job not restored")
+	}
+
+	// Check that an eval was made.
+	evals, err := s1.periodicDispatcher.CreatedEvals(job.ID)
+	if err != nil {
+		t.Fatalf("CreatedEvals(%v) failed: %v", job.ID, err)
+	}
+
+	if len(evals) != 2 {
+		t.Fatalf("restorePeriodicDispatcher() didn't create an eval")
+	}
+
+	// Check it was for the right time.
+	match := false
+	for _, eval := range evals {
+		if eval.JobLaunch != past && eval.JobLaunch != future {
+			match = true
+			break
+		}
+	}
+
+	if !match {
+		t.Fatal("restorePeriodicDispatcher() didn't create the correct eval")
+	}
+}
+
 func TestLeader_PeriodicDispatch(t *testing.T) {
 	s1 := testServer(t, func(c *Config) {
 		c.NumSchedulers = 0
diff --git a/nomad/periodic.go b/nomad/periodic.go
index 3d527a82e..e4e2bfb02 100644
--- a/nomad/periodic.go
+++ b/nomad/periodic.go
@@ -5,12 +5,21 @@ import (
 	"errors"
 	"fmt"
 	"log"
+	"sort"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+const (
+	// The string appended to the periodic job's ID when launching derived
+	// instances of it.
+	JobLaunchSuffix = "-launch-"
+)
+
 // PeriodicRunner is the interface for tracking and launching periodic jobs at
 // their periodic spec.
 type PeriodicRunner interface {
@@ -117,10 +126,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
 		if err := p.heap.Update(job, next); err != nil {
 			return fmt.Errorf("failed to update job %v launch time: %v", job.ID, err)
 		}
+		p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q", job.ID)
 	} else {
 		if err := p.heap.Push(job, next); err != nil {
 			return fmt.Errorf("failed to add job %v", job.ID, err)
 		}
+		p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q", job.ID)
 	}
 
 	// Signal an update.
@@ -160,6 +171,7 @@ func (p *PeriodicDispatch) Remove(jobID string) error {
 		}
 	}
 
+	p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
 	return nil
 }
 
@@ -192,13 +204,14 @@ func (p *PeriodicDispatch) run() {
 	}
 	p.l.RUnlock()
 
-	now := time.Now().Local()
+	var now time.Time
 
 PICK:
 	// If there is nothing wait for an update.
 	p.l.RLock()
 	if p.heap.Length() == 0 {
 		p.l.RUnlock()
+		p.logger.Printf("[DEBUG] nomad.periodic: no periodic jobs; waiting")
 		<-p.updateCh
 		p.l.RLock()
 	}
@@ -206,7 +219,7 @@ PICK:
 	nextJob, err := p.heap.Peek()
 	p.l.RUnlock()
 	if err != nil {
-		p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err)
+		p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
 		return
 	}
 
@@ -214,10 +227,15 @@ PICK:
 	// If there are only invalid times, wait for an update.
 	if launchTime.IsZero() {
+		p.logger.Printf("[DEBUG] nomad.periodic: job %q has no valid launch time", nextJob.job.ID)
 		<-p.updateCh
 		goto PICK
 	}
 
+	now = time.Now()
+	p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s",
+		nextJob.job.ID, nextJob.next.Sub(now))
+
 	select {
 	case <-p.stopCh:
 		return
@@ -237,7 +255,7 @@ PICK:
 
 		j, err := p.heap.Peek()
 		if err != nil {
-			p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err)
+			p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
 			break
 		}
 
@@ -246,11 +264,10 @@ PICK:
 		}
 
 		if err := p.heap.Update(j.job, j.job.Periodic.Next(nowUpdate)); err != nil {
-			p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to update next launch of periodic job: %v", j.job.ID, err)
+			p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err)
 		}
 
-		// TODO(alex): Want to be able to check that there isn't a previously
-		// running cron job for this job.
+		p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime)
 		go p.createEval(j.job, launchTime)
 	}
 
@@ -273,7 +290,7 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time)
 	req := structs.JobRegisterRequest{Job: derived}
 	_, index, err := p.srv.raftApply(structs.JobRegisterRequestType, req)
 	if err != nil {
-		p.logger.Printf("[ERR] nomad.periodic_dispatch: Register failed: %v", err)
+		p.logger.Printf("[ERR] nomad.periodic: registering child job for periodic job %q failed: %v", periodicJob.ID, err)
 		return err
 	}
 
@@ -296,7 +313,7 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time)
 	// but that the EvalUpdate does not.
 	_, _, err = p.srv.raftApply(structs.EvalUpdateRequestType, update)
 	if err != nil {
-		p.logger.Printf("[ERR] nomad.periodic_dispatch: Eval create failed: %v", err)
+		p.logger.Printf("[ERR] nomad.periodic: creating eval for %q failed: %v", derived.ID, err)
 		return err
 	}
 
@@ -312,7 +329,7 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
 	// Have to recover in case the job copy panics.
 	defer func() {
 		if r := recover(); r != nil {
-			p.logger.Printf("[ERR] nomad.periodic_dispatch: deriving job from"+
+			p.logger.Printf("[ERR] nomad.periodic: deriving job from"+
 				" periodic job %v failed; deregistering from periodic runner: %v",
 				periodicJob.ID, r)
 			p.Remove(periodicJob.ID)
@@ -326,7 +343,7 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
 	derived = periodicJob.Copy()
 	derived.ParentID = periodicJob.ID
 	derived.ID = p.derivedJobID(periodicJob, time)
-	derived.Name = periodicJob.ID
+	derived.Name = derived.ID
 	derived.Periodic = nil
 	return
 }
@@ -334,32 +351,77 @@
 // deriveJobID returns a job ID based on the parent periodic job and the launch
 // time.
 func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
-	return fmt.Sprintf("%s-%d", periodicJob.ID, time.Unix())
+	return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix())
 }
 
 // CreatedEvals returns the set of evaluations created from the passed periodic
-// job.
-func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) ([]*structs.Evaluation, error) {
+// job in sorted order, with the earliest job launch first.
+func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) (PeriodicEvals, error) {
 	state := p.srv.fsm.State()
 	iter, err := state.ChildJobs(periodicJobID)
 	if err != nil {
 		return nil, fmt.Errorf("failed to look up children of job %v: %v", periodicJobID, err)
 	}
 
-	var evals []*structs.Evaluation
+	var evals PeriodicEvals
 	for i := iter.Next(); i != nil; i = iter.Next() {
 		job := i.(*structs.Job)
 		childEvals, err := state.EvalsByJob(job.ID)
 		if err != nil {
-			fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err)
+			return nil, fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err)
 		}
 
-		evals = append(evals, childEvals...)
+		for _, eval := range childEvals {
+			launch, err := p.evalLaunchTime(eval)
+			if err != nil {
+				return nil, fmt.Errorf("failed to get launch time for eval %v: %v", eval, err)
+			}
+
+			pEval := &PeriodicEval{
+				Eval:      eval,
+				JobLaunch: launch,
+			}
+
+			evals = append(evals, pEval)
+		}
 	}
 
+	// Return the sorted evals.
+	sort.Sort(evals)
 	return evals, nil
 }
 
+// PeriodicEval stores the evaluation and launch time for an instantiated
+// periodic job.
+type PeriodicEval struct {
+	Eval      *structs.Evaluation
+	JobLaunch time.Time
+}
+
+type PeriodicEvals []*PeriodicEval
+
+func (p PeriodicEvals) Len() int           { return len(p) }
+func (p PeriodicEvals) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+func (p PeriodicEvals) Less(i, j int) bool { return p[i].JobLaunch.Before(p[j].JobLaunch) }
+
+// evalLaunchTime returns the launch time of the job associated with the eval.
+// This is only valid for evaluations created by PeriodicDispatch and will
+// otherwise return an error.
+func (p *PeriodicDispatch) evalLaunchTime(created *structs.Evaluation) (time.Time, error) {
+	jobID := created.JobID
+	index := strings.LastIndex(jobID, JobLaunchSuffix)
+	if index == -1 {
+		return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
+	}
+
+	launch, err := strconv.Atoi(jobID[index+len(JobLaunchSuffix):])
+	if err != nil {
+		return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
+	}
+
+	return time.Unix(int64(launch), 0), nil
+}
+
 // Flush clears the state of the PeriodicDispatcher
 func (p *PeriodicDispatch) Flush() {
 	p.l.Lock()
diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go
index f199038f3..f6382d908 100644
--- a/nomad/periodic_test.go
+++ b/nomad/periodic_test.go
@@ -63,6 +63,19 @@ func (m *MockPeriodic) Tracked() []structs.Job {
 	return tracked
 }
 
+func testPeriodicJob(times ...time.Time) *structs.Job {
+	job := mock.PeriodicJob()
+	job.Periodic.SpecType = structs.PeriodicSpecTest
+
+	l := make([]string, len(times))
+	for i, t := range times {
+		l[i] = strconv.Itoa(int(t.Unix()))
+	}
+
+	job.Periodic.Spec = strings.Join(l, ",")
+	return job
+}
+
 func TestPeriodicDispatch_DisabledOperations(t *testing.T) {
 	s1 := testServer(t, func(c *Config) {
 		c.NumSchedulers = 0 // Prevent automatic dequeue
@@ -171,7 +184,7 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
 		t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals)
 	}
 
-	eval := evals[0]
+	eval := evals[0].Eval
 	expID := s1.periodicDispatcher.derivedJobID(job, expected)
 	if eval.JobID != expID {
 		t.Fatalf("periodic dispatcher created eval at the wrong time; got %v; want %v",
@@ -331,8 +344,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
 	d := s1.periodicDispatcher
 	expected := []string{d.derivedJobID(job, launch1), d.derivedJobID(job, launch2)}
 	for i, eval := range evals {
-		if eval.JobID != expected[i] {
-			t.Fatalf("eval created incorrectly; got %v; want %v", eval.JobID, expected[i])
+		if eval.Eval.JobID != expected[i] {
+			t.Fatalf("eval created incorrectly; got %v; want %v", eval.Eval.JobID, expected[i])
 		}
 	}
 }
@@ -375,8 +388,8 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
 	d := s1.periodicDispatcher
 	expected := d.derivedJobID(job, launch)
-		if evals[0].JobID != expected {
-			t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].JobID, expected)
+		if evals[0].Eval.JobID != expected {
+			t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].Eval.JobID, expected)
 		}
 	}
 }
@@ -464,7 +477,7 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
 		var jobs []string
 		for _, eval := range evals {
-			jobs = append(jobs, eval.JobID)
+			jobs = append(jobs, eval.Eval.JobID)
 		}
 		actual[job.ID] = jobs
 	}
@@ -482,17 +495,36 @@ func shuffle(jobs []*structs.Job) {
 	}
 }
 
-func testPeriodicJob(times ...time.Time) *structs.Job {
-	job := mock.PeriodicJob()
-	job.Periodic.SpecType = structs.PeriodicSpecTest
+func TestPeriodicDispatch_CreatedEvals(t *testing.T) {
+	s1 := testServer(t, func(c *Config) {
+		c.NumSchedulers = 0 // Prevent automatic dequeue
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
 
-	l := make([]string, len(times))
-	for i, t := range times {
-		l[i] = strconv.Itoa(int(t.Unix()))
+	// Create three evals.
+	job := mock.PeriodicJob()
+	now := time.Now().Round(time.Second)
+	times := []time.Time{now.Add(1 * time.Second), now.Add(2 * time.Second), now}
+	for _, time := range times {
+		if err := s1.periodicDispatcher.createEval(job, time); err != nil {
+			t.Fatalf("createEval() failed: %v", err)
+		}
+	}
+
+	// Get the created evals.
+	created, err := s1.periodicDispatcher.CreatedEvals(job.ID)
+	if err != nil {
+		t.Fatalf("CreatedEvals(%v) failed: %v", job.ID, err)
+	}
+
+	expected := []time.Time{times[2], times[0], times[1]}
+	for i, c := range created {
+		if c.JobLaunch != expected[i] {
+			t.Fatalf("CreatedEvals are in wrong order; got %v; want %v at index %d",
+				c.JobLaunch, expected[i], i)
+		}
 	}
-	job.Periodic.Spec = strings.Join(l, ",")
-	return job
 }
 
 // TODO: Check that it doesn't create evals for overlapping things.
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index db1fd8c79..7834fb610 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -863,9 +863,15 @@ func (j *Job) Validate() error {
 	}
 
 	// Validate periodic is only used with batch jobs.
-	if j.Periodic != nil && j.Periodic.Enabled && j.Type != JobTypeBatch {
-		mErr.Errors = append(mErr.Errors,
-			fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+	if j.IsPeriodic() {
+		if j.Type != JobTypeBatch {
+			mErr.Errors = append(mErr.Errors,
+				fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
+		}
+
+		if err := j.Periodic.Validate(); err != nil {
+			mErr.Errors = append(mErr.Errors, err)
+		}
 	}
 
 	return mErr.ErrorOrNil()
diff --git a/nomad/timetable.go b/nomad/timetable.go
index 38344f79a..36076ce4a 100644
--- a/nomad/timetable.go
+++ b/nomad/timetable.go
@@ -64,7 +64,7 @@ func (t *TimeTable) Deserialize(dec *codec.Decoder) error {
 	return nil
 }
 
-// Witness is used to witness a new inde and time.
+// Witness is used to witness a new index and time.
 func (t *TimeTable) Witness(index uint64, when time.Time) {
 	t.l.Lock()
 	defer t.l.Unlock()
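
The force-run check in restorePeriodicDispatcher can be read independently of the dispatcher plumbing. Below is a minimal, self-contained sketch of that decision, assuming a nextLaunch function standing in for job.Periodic.Next and a lastLaunch value taken from the JobLaunch of the last (most recent) entry returned by CreatedEvals, or the zero time if no evals exist. The names shouldForceRun, nextLaunch, lastLaunch, and added are illustrative and are not part of the Nomad code; the sketch simply collapses the patch's two branches (evals exist vs. not) into a single reference time, which is the same decision the goto FORCE path expresses.

package main

import (
	"fmt"
	"time"
)

// shouldForceRun reports whether a periodic job missed a launch while there
// was no leader. lastLaunch is the launch time of the most recent created
// eval (zero if none), added approximates when the job was inserted, and
// nextLaunch stands in for job.Periodic.Next. All names are illustrative.
func shouldForceRun(now, lastLaunch, added time.Time, nextLaunch func(time.Time) time.Time) bool {
	reference := added
	if !lastLaunch.IsZero() {
		reference = lastLaunch
	}
	return nextLaunch(reference).Before(now)
}

func main() {
	// A pretend one-minute periodic spec: the next launch is the top of the
	// following minute.
	next := func(t time.Time) time.Time { return t.Truncate(time.Minute).Add(time.Minute) }

	now := time.Date(2015, 12, 1, 12, 0, 30, 0, time.UTC)
	fmt.Println(shouldForceRun(now, now.Add(-90*time.Second), time.Time{}, next)) // true: the 12:00:00 launch was missed
	fmt.Println(shouldForceRun(now, now.Add(-10*time.Second), time.Time{}, next)) // false: the next launch is 12:01:00
}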
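The derived job ID is the only place the launch time is persisted, so CreatedEvals can recover and sort launch times purely by parsing child job IDs. The following sketch shows the encode/parse round trip under the same "-launch-" suffix convention; derivedID and parseLaunch are illustrative stand-ins for derivedJobID and evalLaunchTime, not the actual Nomad functions.

package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// launchSuffix mirrors JobLaunchSuffix from the patch.
const launchSuffix = "-launch-"

// derivedID encodes the launch time into the child job's ID, the same shape
// the patch produces in derivedJobID.
func derivedID(parentID string, launch time.Time) string {
	return fmt.Sprintf("%s%s%d", parentID, launchSuffix, launch.Unix())
}

// parseLaunch recovers the launch time from a derived ID, the inverse
// operation evalLaunchTime performs when building the sorted eval list.
func parseLaunch(jobID string) (time.Time, error) {
	idx := strings.LastIndex(jobID, launchSuffix)
	if idx == -1 {
		return time.Time{}, fmt.Errorf("no launch suffix in %q", jobID)
	}
	secs, err := strconv.Atoi(jobID[idx+len(launchSuffix):])
	if err != nil {
		return time.Time{}, fmt.Errorf("bad launch time in %q: %v", jobID, err)
	}
	return time.Unix(int64(secs), 0), nil
}

func main() {
	launch := time.Unix(1449000000, 0)
	id := derivedID("cache", launch)
	fmt.Println(id) // cache-launch-1449000000

	parsed, err := parseLaunch(id)
	fmt.Println(parsed.Equal(launch), err) // true <nil>
}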
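Because the launch time is serialized as whole Unix seconds inside the job ID, it only survives the round trip at second granularity; this is presumably why TestPeriodicDispatch_CreatedEvals rounds time.Now() to the second before comparing launch times. A small, self-contained illustration of that assumption:

package main

import (
	"fmt"
	"time"
)

func main() {
	now := time.Now()
	rounded := now.Round(time.Second)

	// Re-encoding through Unix seconds drops sub-second precision, so only
	// the rounded value compares equal after the round trip.
	fmt.Println(time.Unix(now.Unix(), 0).Equal(now))         // usually false: nanoseconds are lost
	fmt.Println(time.Unix(rounded.Unix(), 0).Equal(rounded)) // true
}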