diff --git a/nomad/fsm.go b/nomad/fsm.go index 334957754..f9c43f7f4 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -39,12 +39,12 @@ const ( // along with Raft to provide strong consistency. We implement // this outside the Server to avoid exposing this outside the package. type nomadFSM struct { - evalBroker *EvalBroker - periodicRunner PeriodicRunner - logOutput io.Writer - logger *log.Logger - state *state.StateStore - timetable *TimeTable + evalBroker *EvalBroker + periodicDispatcher *PeriodicDispatch + logOutput io.Writer + logger *log.Logger + state *state.StateStore + timetable *TimeTable } // nomadSnapshot is used to provide a snapshot of the current @@ -60,7 +60,7 @@ type snapshotHeader struct { } // NewFSMPath is used to construct a new FSM with a blank state -func NewFSM(evalBroker *EvalBroker, periodic PeriodicRunner, logOutput io.Writer) (*nomadFSM, error) { +func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Writer) (*nomadFSM, error) { // Create a state store state, err := state.NewStateStore(logOutput) if err != nil { @@ -68,12 +68,12 @@ func NewFSM(evalBroker *EvalBroker, periodic PeriodicRunner, logOutput io.Writer } fsm := &nomadFSM{ - evalBroker: evalBroker, - periodicRunner: periodic, - logOutput: logOutput, - logger: log.New(logOutput, "", log.LstdFlags), - state: state, - timetable: NewTimeTable(timeTableGranularity, timeTableLimit), + evalBroker: evalBroker, + periodicDispatcher: periodic, + logOutput: logOutput, + logger: log.New(logOutput, "", log.LstdFlags), + state: state, + timetable: NewTimeTable(timeTableGranularity, timeTableLimit), } return fsm, nil } @@ -211,8 +211,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { // If it is periodic, insert it into the periodic runner and record the // time it was inserted. if req.Job.IsPeriodic() { - if err := n.periodicRunner.Add(req.Job); err != nil { - n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Add failed: %v", err) + if err := n.periodicDispatcher.Add(req.Job); err != nil { + n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err) return err } @@ -237,7 +237,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { } if parent.IsPeriodic() { - t, err := n.periodicRunner.LaunchTime(req.Job.ID) + t, err := n.periodicDispatcher.LaunchTime(req.Job.ID) if err != nil { n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err) return err @@ -272,8 +272,8 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { } if job.IsPeriodic() { - if err := n.periodicRunner.Remove(req.JobID); err != nil { - n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Remove failed: %v", err) + if err := n.periodicDispatcher.Remove(req.JobID); err != nil { + n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err) return err } diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index c1bf4dfa5..ef76486a8 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -43,7 +43,8 @@ func testStateStore(t *testing.T) *state.StateStore { } func testFSM(t *testing.T) *nomadFSM { - fsm, err := NewFSM(testBroker(t, 0), NewMockPeriodic(), os.Stderr) + p, _ := testPeriodicDispatcher() + fsm, err := NewFSM(testBroker(t, 0), p, os.Stderr) if err != nil { t.Fatalf("err: %v", err) } @@ -249,7 +250,7 @@ func TestFSM_RegisterJob(t *testing.T) { } // Verify it was added to the periodic runner. - if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; !ok { + if _, ok := fsm.periodicDispatcher.tracked[job.ID]; !ok { t.Fatal("job not added to periodic runner") } @@ -306,7 +307,7 @@ func TestFSM_DeregisterJob(t *testing.T) { } // Verify it was removed from the periodic runner. - if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; ok { + if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok { t.Fatal("job not removed from periodic runner") } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 4c035efcd..6dab28cae 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -391,6 +391,10 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { // Sleep till after the job should have been launched. time.Sleep(3 * time.Second) + // Get the current time to ensure the launch time is after this once we + // restore. + now := time.Now() + // Restore the periodic dispatcher. s1.periodicDispatcher.SetEnabled(true) s1.periodicDispatcher.Start() @@ -402,13 +406,13 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { } // Check that an eval was made. - evals, err := createdEvals(s1.periodicDispatcher, job.ID) - if err != nil { - t.Fatalf("createdEvals(%v) failed: %v", job.ID, err) + last, err := s1.fsm.State().PeriodicLaunchByID(job.ID) + if err != nil || last == nil { + t.Fatalf("failed to get periodic launch time: %v", err) } - if len(evals) != 1 { - t.Fatalf("restorePeriodicDispatcher() didn't create an eval") + if last.Launch.Before(now) { + t.Fatalf("restorePeriodicDispatcher did not force launch") } } @@ -453,26 +457,12 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { } // Check that an eval was made. - evals, err := createdEvals(s1.periodicDispatcher, job.ID) - if err != nil { - t.Fatalf("createdEvals(%v) failed: %v", job.ID, err) + last, err := s1.fsm.State().PeriodicLaunchByID(job.ID) + if err != nil || last == nil { + t.Fatalf("failed to get periodic launch time: %v", err) } - - if len(evals) != 2 { - t.Fatalf("restorePeriodicDispatcher() didn't create an eval") - } - - // Check it was for the right time. - match := false - for _, eval := range evals { - if eval.JobLaunch != past && eval.JobLaunch != future { - match = true - break - } - } - - if !match { - t.Fatal("restorePeriodicDispatcher() didn't create the correct eval") + if last.Launch == past { + t.Fatalf("restorePeriodicDispatcher did not force launch") } } diff --git a/nomad/periodic.go b/nomad/periodic.go index 0d9e1547e..36678d342 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -19,26 +19,13 @@ const ( JobLaunchSuffix = "/periodic-" ) -// PeriodicRunner is the interface for tracking and launching periodic jobs at -// their periodic spec. -type PeriodicRunner interface { - Start() - SetEnabled(enabled bool) - Add(job *structs.Job) error - Remove(jobID string) error - ForceRun(jobID string) error - Tracked() []*structs.Job - Flush() - LaunchTime(jobID string) (time.Time, error) -} - // PeriodicDispatch is used to track and launch periodic jobs. It maintains the // set of periodic jobs and creates derived jobs and evaluations per // instantiation which is determined by the periodic spec. type PeriodicDispatch struct { - srv *Server - enabled bool - running bool + dispatcher JobEvalDispatcher + enabled bool + running bool tracked map[string]*structs.Job heap *periodicHeap @@ -50,17 +37,60 @@ type PeriodicDispatch struct { l sync.RWMutex } +// JobEvalDispatcher is an interface to submit jobs and have evaluations created +// for them. +type JobEvalDispatcher interface { + // DispatchJob takes a job a new, untracked job and creates an evaluation + // for it. + DispatchJob(job *structs.Job) error +} + +// DispatchJob creates an evaluation for the passed job and commits both the +// evaluation and the job to the raft log. +func (s *Server) DispatchJob(job *structs.Job) error { + // Commit this update via Raft + req := structs.JobRegisterRequest{Job: job} + _, index, err := s.raftApply(structs.JobRegisterRequestType, req) + if err != nil { + return err + } + + // Create a new evaluation + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + Type: job.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + JobModifyIndex: index, + Status: structs.EvalStatusPending, + } + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + + // Commit this evaluation via Raft + // XXX: There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not. + _, _, err = s.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + return err + } + + return nil +} + // NewPeriodicDispatch returns a periodic dispatcher that is used to track and // launch periodic jobs. -func NewPeriodicDispatch(srv *Server) *PeriodicDispatch { +func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch { return &PeriodicDispatch{ - srv: srv, - tracked: make(map[string]*structs.Job), - heap: NewPeriodicHeap(), - updateCh: make(chan struct{}, 1), - stopCh: make(chan struct{}), - waitCh: make(chan struct{}), - logger: srv.logger, + dispatcher: dispatcher, + tracked: make(map[string]*structs.Job), + heap: NewPeriodicHeap(), + updateCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + waitCh: make(chan struct{}), + logger: logger, } } @@ -121,7 +151,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { p.removeLocked(job.ID) } - // If the job is diabled and we aren't tracking it, do nothing. + // If the job is disabled and we aren't tracking it, do nothing. return nil } @@ -219,7 +249,11 @@ func (p *PeriodicDispatch) run() { for p.shouldRun() { job, launch, err := p.nextLaunch() if err != nil { - p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) + p.l.RLock() + defer p.l.RUnlock() + if !p.running { + p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) + } return } else if job == nil { return @@ -325,34 +359,8 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) return err } - // Commit this update via Raft - req := structs.JobRegisterRequest{Job: derived} - _, index, err := p.srv.raftApply(structs.JobRegisterRequestType, req) - if err != nil { - p.logger.Printf("[ERR] nomad.periodic: registering child job for periodic job %q failed: %v", periodicJob.ID, err) - return err - } - - // Create a new evaluation - eval := &structs.Evaluation{ - ID: structs.GenerateUUID(), - Priority: derived.Priority, - Type: derived.Type, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: derived.ID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - } - - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, _, err = p.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - p.logger.Printf("[ERR] nomad.periodic: creating eval for %q failed: %v", derived.ID, err) + if err := p.dispatcher.DispatchJob(derived); err != nil { + p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q: %v", periodicJob.ID, err) return err } @@ -361,7 +369,6 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) // deriveJob instantiates a new job based on the passed periodic job and the // launch time. -// TODO: these jobs need to be marked as GC'able func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) ( derived *structs.Job, err error) { @@ -384,13 +391,14 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) ( derived.ID = p.derivedJobID(periodicJob, time) derived.Name = derived.ID derived.Periodic = nil + derived.GC = true return } // deriveJobID returns a job ID based on the parent periodic job and the launch // time. func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string { - return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix()) + return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.UnixNano()) } // LaunchTime returns the launch time of the job. This is only valid for @@ -406,7 +414,7 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) { return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID) } - return time.Unix(int64(launch), 0), nil + return time.Unix(0, int64(launch)), nil } // Flush clears the state of the PeriodicDispatcher diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 4cde0b3f9..c5f1510d4 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -2,150 +2,98 @@ package nomad import ( "fmt" + "log" "math/rand" + "os" "reflect" "sort" "strconv" "strings" + "sync" "testing" "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" - "github.com/hashicorp/nomad/testutil" ) -// MockPeriodic can be used by other tests that want to mock the periodic -// dispatcher. -type MockPeriodic struct { - Enabled bool - Jobs map[string]*structs.Job +type MockJobEvalDispatcher struct { + Jobs map[string]*structs.Job + lock sync.Mutex } -func NewMockPeriodic() *MockPeriodic { - return &MockPeriodic{Jobs: make(map[string]*structs.Job)} +func NewMockJobEvalDispatcher() *MockJobEvalDispatcher { + return &MockJobEvalDispatcher{Jobs: make(map[string]*structs.Job)} } -func (m *MockPeriodic) SetEnabled(enabled bool) { - m.Enabled = enabled -} - -func (m *MockPeriodic) Add(job *structs.Job) error { - if job == nil { - return fmt.Errorf("Must pass non nil job") - } - +func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) error { + m.lock.Lock() + defer m.lock.Unlock() m.Jobs[job.ID] = job return nil } -func (m *MockPeriodic) Remove(jobID string) error { - delete(m.Jobs, jobID) - return nil -} - -func (m *MockPeriodic) ForceRun(jobID string) error { - return nil -} - -func (m *MockPeriodic) LaunchTime(jobID string) (time.Time, error) { - return time.Time{}, nil -} - -func (m *MockPeriodic) Start() {} - -func (m *MockPeriodic) Flush() { - m.Jobs = make(map[string]*structs.Job) -} - -func (m *MockPeriodic) Tracked() []*structs.Job { - tracked := make([]*structs.Job, len(m.Jobs)) - i := 0 +// LaunchTimes returns the launch times of child jobs in sorted order. +func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, parentID string) ([]time.Time, error) { + m.lock.Lock() + defer m.lock.Unlock() + var launches []time.Time for _, job := range m.Jobs { - tracked[i] = job - i++ + if job.ParentID != parentID { + continue + } + + t, err := p.LaunchTime(job.ID) + if err != nil { + return nil, err + } + launches = append(launches, t) } - return tracked + sort.Sort(times(launches)) + return launches, nil } +type times []time.Time + +func (t times) Len() int { return len(t) } +func (t times) Swap(i, j int) { t[i], t[j] = t[j], t[i] } +func (t times) Less(i, j int) bool { return t[i].Before(t[j]) } + +// testPeriodicDispatcher returns an enabled PeriodicDispatcher which uses the +// MockJobEvalDispatcher. +func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) { + logger := log.New(os.Stderr, "", log.LstdFlags) + m := NewMockJobEvalDispatcher() + d := NewPeriodicDispatch(logger, m) + d.SetEnabled(true) + d.Start() + return d, m +} + +// testPeriodicJob is a helper that creates a periodic job that launches at the +// passed times. func testPeriodicJob(times ...time.Time) *structs.Job { job := mock.PeriodicJob() job.Periodic.SpecType = structs.PeriodicSpecTest l := make([]string, len(times)) for i, t := range times { - l[i] = strconv.Itoa(int(t.Unix())) + l[i] = strconv.Itoa(int(t.UnixNano())) } job.Periodic.Spec = strings.Join(l, ",") return job } -// createdEvals returns the set of evaluations created from the passed periodic -// job in sorted order, with the earliest job launch first. -func createdEvals(p *PeriodicDispatch, periodicJobID string) (PeriodicEvals, error) { - state := p.srv.fsm.State() - iter, err := state.ChildJobs(periodicJobID) - if err != nil { - return nil, fmt.Errorf("failed to look up children of job %v: %v", periodicJobID, err) - } - - var evals PeriodicEvals - for i := iter.Next(); i != nil; i = iter.Next() { - job := i.(*structs.Job) - childEvals, err := state.EvalsByJob(job.ID) - if err != nil { - return nil, fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err) - } - - for _, eval := range childEvals { - launch, err := p.LaunchTime(eval.JobID) - if err != nil { - return nil, fmt.Errorf("failed to get launch time for eval %v: %v", eval, err) - } - - pEval := &PeriodicEval{ - Eval: eval, - JobLaunch: launch, - } - - evals = append(evals, pEval) - } - } - - // Return the sorted evals. - sort.Sort(evals) - return evals, nil -} - -// PeriodicEval stores the evaluation and launch time for an instantiated -// periodic job. -type PeriodicEval struct { - Eval *structs.Evaluation - JobLaunch time.Time -} - -// For sorting. -type PeriodicEvals []*PeriodicEval - -func (p PeriodicEvals) Len() int { return len(p) } -func (p PeriodicEvals) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -func (p PeriodicEvals) Less(i, j int) bool { return p[i].JobLaunch.Before(p[j].JobLaunch) } - func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) - + p, _ := testPeriodicDispatcher() job := mock.Job() - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err) } - tracked := s1.periodicDispatcher.Tracked() + tracked := p.Tracked() if len(tracked) != 0 { t.Fatalf("Add of non-periodic job should be no-op: %v", tracked) } @@ -153,29 +101,24 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) - + p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } - tracked := s1.periodicDispatcher.Tracked() + tracked := p.Tracked() if len(tracked) != 1 { t.Fatalf("Add didn't track the job: %v", tracked) } // Update the job and add it again. job.Periodic.Spec = "foo" - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } - tracked = s1.periodicDispatcher.Tracked() + tracked = p.Tracked() if len(tracked) != 1 { t.Fatalf("Add didn't update: %v", tracked) } @@ -187,83 +130,70 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, m := testPeriodicDispatcher() // Create a job that won't be evalauted for a while. job := testPeriodicJob(time.Now().Add(10 * time.Second)) // Add it. - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } // Update it to be sooner and re-add. expected := time.Now().Add(1 * time.Second) - job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix()) - if err := s1.periodicDispatcher.Add(job); err != nil { + job.Periodic.Spec = fmt.Sprintf("%d", expected.UnixNano()) + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } + // Check that nothing is created. + if _, ok := m.Jobs[job.ID]; ok { + t.Fatalf("periodic dispatcher created eval at the wrong time") + } + time.Sleep(2 * time.Second) - // Check that an eval was created for the right time. - evals, err := createdEvals(s1.periodicDispatcher, job.ID) + // Check that job was launched correctly. + times, err := m.LaunchTimes(p, job.ID) if err != nil { - t.Fatalf("createdEvals(%v) failed %v", job.ID, err) + t.Fatalf("failed to get launch times for job %q", job.ID) } - - if len(evals) != 1 { - t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals) + if len(times) != 1 { + t.Fatalf("incorrect number of launch times for job %q", job.ID) } - - eval := evals[0].Eval - expID := s1.periodicDispatcher.derivedJobID(job, expected) - if eval.JobID != expID { - t.Fatalf("periodic dispatcher created eval at the wrong time; got %v; want %v", - eval.JobID, expID) + if times[0] != expected { + t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], expected) } } func TestPeriodicDispatch_Remove_Untracked(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) - - if err := s1.periodicDispatcher.Remove("foo"); err != nil { + p, _ := testPeriodicDispatcher() + if err := p.Remove("foo"); err != nil { t.Fatalf("Remove failed %v; expected a no-op", err) } } func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, _ := testPeriodicDispatcher() job := mock.PeriodicJob() - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } - tracked := s1.periodicDispatcher.Tracked() + tracked := p.Tracked() if len(tracked) != 1 { t.Fatalf("Add didn't track the job: %v", tracked) } - if err := s1.periodicDispatcher.Remove(job.ID); err != nil { + if err := p.Remove(job.ID); err != nil { t.Fatalf("Remove failed %v", err) } - tracked = s1.periodicDispatcher.Tracked() + tracked = p.Tracked() if len(tracked) != 0 { t.Fatalf("Remove didn't untrack the job: %v", tracked) } @@ -271,90 +201,71 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, _ := testPeriodicDispatcher() // Create a job that will be evaluated soon. job := testPeriodicJob(time.Now().Add(1 * time.Second)) // Add it. - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } // Remove the job. - if err := s1.periodicDispatcher.Remove(job.ID); err != nil { + if err := p.Remove(job.ID); err != nil { t.Fatalf("Add failed %v", err) } time.Sleep(2 * time.Second) // Check that an eval wasn't created. - evals, err := createdEvals(s1.periodicDispatcher, job.ID) - if err != nil { - t.Fatalf("createdEvals(%v) failed %v", job.ID, err) - } - - if len(evals) != 0 { - t.Fatalf("Remove didn't cancel creation of an eval: %#v", evals) + d := p.dispatcher.(*MockJobEvalDispatcher) + if _, ok := d.Jobs[job.ID]; ok { + t.Fatalf("Remove didn't cancel creation of an eval") } } func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, _ := testPeriodicDispatcher() - if err := s1.periodicDispatcher.ForceRun("foo"); err == nil { + if err := p.ForceRun("foo"); err == nil { t.Fatal("ForceRun of untracked job should fail") } } func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, m := testPeriodicDispatcher() // Create a job that won't be evalauted for a while. job := testPeriodicJob(time.Now().Add(10 * time.Second)) // Add it. - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } // ForceRun the job - if err := s1.periodicDispatcher.ForceRun(job.ID); err != nil { + if err := p.ForceRun(job.ID); err != nil { t.Fatalf("ForceRun failed %v", err) } - // Check that an eval was created for the right time. - evals, err := createdEvals(s1.periodicDispatcher, job.ID) + // Check that job was launched correctly. + launches, err := m.LaunchTimes(p, job.ID) if err != nil { - t.Fatalf("createdEvals(%v) failed %v", job.ID, err) + t.Fatalf("failed to get launch times for job %q: %v", job.ID, err) } - - if len(evals) != 1 { - t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals) + l := len(launches) + if l != 1 { + t.Fatalf("restorePeriodicDispatcher() created an unexpected"+ + " number of evals; got %d; want 1", l) } } func TestPeriodicDispatch_Run_Multiple(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, m := testPeriodicDispatcher() // Create a job that will be launched twice. launch1 := time.Now().Add(1 * time.Second) @@ -362,34 +273,31 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) { job := testPeriodicJob(launch1, launch2) // Add it. - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } time.Sleep(3 * time.Second) - // Check that the evals were created correctly - evals, err := createdEvals(s1.periodicDispatcher, job.ID) + // Check that job was launched correctly. + times, err := m.LaunchTimes(p, job.ID) if err != nil { - t.Fatalf("createdEvals(%v) failed %v", job.ID, err) + t.Fatalf("failed to get launch times for job %q", job.ID) } - - d := s1.periodicDispatcher - expected := []string{d.derivedJobID(job, launch1), d.derivedJobID(job, launch2)} - for i, eval := range evals { - if eval.Eval.JobID != expected[i] { - t.Fatalf("eval created incorrectly; got %v; want %v", eval.Eval.JobID, expected[i]) - } + if len(times) != 2 { + t.Fatalf("incorrect number of launch times for job %q", job.ID) + } + if times[0] != launch1 { + t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch1) + } + if times[1] != launch2 { + t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[1], launch2) } } func TestPeriodicDispatch_Run_SameTime(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, m := testPeriodicDispatcher() // Create two job that will be launched at the same time. launch := time.Now().Add(1 * time.Second) @@ -397,30 +305,26 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) { job2 := testPeriodicJob(launch) // Add them. - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } - if err := s1.periodicDispatcher.Add(job2); err != nil { + if err := p.Add(job2); err != nil { t.Fatalf("Add failed %v", err) } time.Sleep(2 * time.Second) - // Check that the evals were created correctly + // Check that the jobs were launched correctly. for _, job := range []*structs.Job{job, job2} { - evals, err := createdEvals(s1.periodicDispatcher, job.ID) + times, err := m.LaunchTimes(p, job.ID) if err != nil { - t.Fatalf("createdEvals(%v) failed %v", job.ID, err) + t.Fatalf("failed to get launch times for job %q", job.ID) } - - if len(evals) != 1 { - t.Fatalf("expected 1 eval; got %#v", evals) + if len(times) != 1 { + t.Fatalf("incorrect number of launch times for job %q; got %d; want 1", job.ID, len(times)) } - - d := s1.periodicDispatcher - expected := d.derivedJobID(job, launch) - if evals[0].Eval.JobID != expected { - t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].Eval.JobID, expected) + if times[0] != launch { + t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch) } } } @@ -430,11 +334,7 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) { // behavior. func TestPeriodicDispatch_Complex(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, m := testPeriodicDispatcher() // Create some jobs launching at different times. now := time.Now() @@ -463,16 +363,12 @@ func TestPeriodicDispatch_Complex(t *testing.T) { job8 := testPeriodicJob(launch2) // Create a map of expected eval job ids. - d := s1.periodicDispatcher - expected := map[string][]string{ - job1.ID: []string{d.derivedJobID(job1, same)}, - job2.ID: []string{d.derivedJobID(job2, same)}, + expected := map[string][]time.Time{ + job1.ID: []time.Time{same}, + job2.ID: []time.Time{same}, job3.ID: nil, - job4.ID: []string{ - d.derivedJobID(job4, launch1), - d.derivedJobID(job4, launch3), - }, - job5.ID: []string{d.derivedJobID(job5, launch2)}, + job4.ID: []time.Time{launch1, launch3}, + job5.ID: []time.Time{launch2}, job6.ID: nil, job7.ID: nil, job8.ID: nil, @@ -485,54 +381,46 @@ func TestPeriodicDispatch_Complex(t *testing.T) { shuffle(toDelete) for _, job := range jobs { - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } } for _, job := range toDelete { - if err := s1.periodicDispatcher.Remove(job.ID); err != nil { + if err := p.Remove(job.ID); err != nil { t.Fatalf("Remove failed %v", err) } } time.Sleep(4 * time.Second) - actual := make(map[string][]string, len(expected)) + actual := make(map[string][]time.Time, len(expected)) for _, job := range jobs { - evals, err := createdEvals(s1.periodicDispatcher, job.ID) + launches, err := m.LaunchTimes(p, job.ID) if err != nil { - t.Fatalf("createdEvals(%v) failed %v", job.ID, err) + t.Fatalf("LaunchTimes(%v) failed %v", job.ID, err) } - var jobs []string - for _, eval := range evals { - jobs = append(jobs, eval.Eval.JobID) - } - actual[job.ID] = jobs + actual[job.ID] = launches } if !reflect.DeepEqual(actual, expected) { - t.Fatalf("Unexpected evals; got %#v; want %#v", actual, expected) + t.Fatalf("Unexpected launches; got %#v; want %#v", actual, expected) } } func TestPeriodicDispatch_NextLaunch(t *testing.T) { t.Parallel() - s1 := testServer(t, func(c *Config) { - c.NumSchedulers = 0 // Prevent automatic dequeue - }) - defer s1.Shutdown() - testutil.WaitForLeader(t, s1.RPC) + p, _ := testPeriodicDispatcher() // Create two job that will be launched at the same time. invalid := time.Unix(0, 0) - expected := time.Now().Round(1 * time.Second).Add(1 * time.Second) + expected := time.Now().Add(1 * time.Second) job := testPeriodicJob(invalid) job2 := testPeriodicJob(expected) // Make sure the periodic dispatcher isn't running. - close(s1.periodicDispatcher.stopCh) - s1.periodicDispatcher.stopCh = make(chan struct{}) + close(p.stopCh) + p.stopCh = make(chan struct{}) // Run nextLaunch. timeout := make(chan struct{}) @@ -540,18 +428,18 @@ func TestPeriodicDispatch_NextLaunch(t *testing.T) { var launch time.Time var err error go func() { - j, launch, err = s1.periodicDispatcher.nextLaunch() + j, launch, err = p.nextLaunch() close(timeout) }() // Add them. - if err := s1.periodicDispatcher.Add(job); err != nil { + if err := p.Add(job); err != nil { t.Fatalf("Add failed %v", err) } // Delay adding a valid job. time.Sleep(200 * time.Millisecond) - if err := s1.periodicDispatcher.Add(job2); err != nil { + if err := p.Add(job2); err != nil { t.Fatalf("Add failed %v", err) } diff --git a/nomad/server.go b/nomad/server.go index 3a1c25d83..3a17c8224 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -185,7 +185,7 @@ func NewServer(config *Config) (*Server, error) { } // Create the periodic dispatcher for launching periodic jobs. - s.periodicDispatcher = NewPeriodicDispatch(s) + s.periodicDispatcher = NewPeriodicDispatch(s.logger, s) // Initialize the RPC layer // TODO: TLS... diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0773e94fb..7f56fa169 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -938,7 +938,7 @@ const ( PeriodicSpecCron = "cron" // PeriodicSpecTest is only used by unit tests. It is a sorted, comma - // seperated list of unix timestamps at which to launch. + // seperated list of unix nanosecond timestamps at which to launch. PeriodicSpecTest = "_internal_test" ) @@ -1003,7 +1003,7 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time { return time.Time{} } - times[i] = time.Unix(int64(unix), 0) + times[i] = time.Unix(0, int64(unix)) } // Find the next match