diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index d1dc0948a..b9bb9a80e 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -242,9 +242,7 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) // Create the register request for a periodic job. - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Periodic = &structs.PeriodicConfig{Enabled: true} + job := mock.PeriodicJob() req := &structs.JobRegisterRequest{ Job: job, WriteRequest: structs.WriteRequest{Region: "global"}, @@ -362,9 +360,7 @@ func TestJobEndpoint_Evaluate_Periodic(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) // Create the register request - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Periodic = &structs.PeriodicConfig{Enabled: true} + job := mock.PeriodicJob() req := &structs.JobRegisterRequest{ Job: job, WriteRequest: structs.WriteRequest{Region: "global"}, @@ -476,9 +472,7 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) // Create the register request - job := mock.Job() - job.Type = structs.JobTypeBatch - job.Periodic = &structs.PeriodicConfig{Enabled: true} + job := mock.PeriodicJob() reg := &structs.JobRegisterRequest{ Job: job, WriteRequest: structs.WriteRequest{Region: "global"}, diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 13509d66c..186f5d48e 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -372,6 +372,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { c.NumSchedulers = 0 }) defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) // Inject a periodic job that will be triggered soon. launch := time.Now().Add(1 * time.Second) @@ -385,12 +386,14 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) { } // Flush the periodic dispatcher, ensuring that no evals will be created. - s1.periodicDispatcher.Flush() + s1.periodicDispatcher.SetEnabled(false) // Sleep till after the job should have been launched. - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) // Restore the periodic dispatcher. + s1.periodicDispatcher.SetEnabled(true) + s1.periodicDispatcher.Start() s1.restorePeriodicDispatcher() // Ensure the job is tracked. @@ -414,6 +417,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { c.NumSchedulers = 0 }) defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) // Inject a periodic job that triggered once in the past, should trigger now // and once in the future. @@ -433,12 +437,14 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) { s1.periodicDispatcher.createEval(job, past) // Flush the periodic dispatcher, ensuring that no evals will be created. - s1.periodicDispatcher.Flush() + s1.periodicDispatcher.SetEnabled(false) // Sleep till after the job should have been launched. - time.Sleep(2 * time.Second) + time.Sleep(3 * time.Second) // Restore the periodic dispatcher. + s1.periodicDispatcher.SetEnabled(true) + s1.periodicDispatcher.Start() s1.restorePeriodicDispatcher() // Ensure the job is tracked. diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 0d61009a3..3408e0f97 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -192,10 +192,11 @@ func SystemJob() *structs.Job { func PeriodicJob() *structs.Job { job := Job() + job.Type = structs.JobTypeBatch job.Periodic = &structs.PeriodicConfig{ Enabled: true, SpecType: structs.PeriodicSpecCron, - Spec: "*/30 * * *", + Spec: "*/30 * * * *", } return job } diff --git a/nomad/periodic.go b/nomad/periodic.go index e4e2bfb02..c4f062c26 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -45,6 +45,7 @@ type PeriodicDispatch struct { updateCh chan struct{} stopCh chan struct{} + waitCh chan struct{} logger *log.Logger l sync.RWMutex } @@ -57,7 +58,8 @@ func NewPeriodicDispatch(srv *Server) *PeriodicDispatch { tracked: make(map[string]*structs.Job), heap: NewPeriodicHeap(), updateCh: make(chan struct{}, 1), - stopCh: make(chan struct{}, 1), + stopCh: make(chan struct{}), + waitCh: make(chan struct{}), logger: srv.logger, } } @@ -70,7 +72,8 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) { p.enabled = enabled p.l.Unlock() if !enabled { - p.stopCh <- struct{}{} + close(p.stopCh) + <-p.waitCh p.Flush() } } @@ -196,6 +199,8 @@ func (p *PeriodicDispatch) ForceRun(jobID string) error { // run is a long-lived function that waits til a job's periodic spec is met and // then creates an evaluation to run the job. func (p *PeriodicDispatch) run() { + defer close(p.waitCh) + // Do nothing if not enabled. p.l.RLock() if !p.enabled { @@ -212,15 +217,24 @@ PICK: if p.heap.Length() == 0 { p.l.RUnlock() p.logger.Printf("[DEBUG] nomad.periodic: no periodic jobs; waiting") - <-p.updateCh + select { + case <-p.stopCh: + return + case <-p.updateCh: + } p.l.RLock() } nextJob, err := p.heap.Peek() p.l.RUnlock() if err != nil { - p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) - return + select { + case <-p.stopCh: + return + default: + p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) + return + } } launchTime := nextJob.next @@ -228,8 +242,12 @@ PICK: // If there are only invalid times, wait for an update. if launchTime.IsZero() { p.logger.Printf("[DEBUG] nomad.periodic: job %q has no valid launch time", nextJob.job.ID) - <-p.updateCh - goto PICK + select { + case <-p.stopCh: + return + case <-p.updateCh: + goto PICK + } } now = time.Now() @@ -426,8 +444,9 @@ func (p *PeriodicDispatch) evalLaunchTime(created *structs.Evaluation) (time.Tim func (p *PeriodicDispatch) Flush() { p.l.Lock() defer p.l.Unlock() - p.stopCh = make(chan struct{}, 1) + p.stopCh = make(chan struct{}) p.updateCh = make(chan struct{}, 1) + p.waitCh = make(chan struct{}) p.tracked = make(map[string]*structs.Job) p.heap = NewPeriodicHeap() } @@ -486,8 +505,11 @@ func (p *periodicHeap) Contains(job *structs.Job) bool { } func (p *periodicHeap) Update(job *structs.Job, next time.Time) error { - if job, ok := p.index[job.ID]; ok { - p.heap.update(job, next) + if pJob, ok := p.index[job.ID]; ok { + // Need to update the job as well because its spec can change. + pJob.job = job + pJob.next = next + heap.Fix(&p.heap, pJob.index) return nil } @@ -550,9 +572,3 @@ func (h *periodicHeapImp) Pop() interface{} { *h = old[0 : n-1] return job } - -// update modifies the priority and next time of an periodic job in the queue. -func (h *periodicHeapImp) update(job *periodicJob, next time.Time) { - job.next = next - heap.Fix(h, job.index) -} diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index f6382d908..abe05b088 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -77,6 +77,7 @@ func testPeriodicJob(times ...time.Time) *structs.Job { } func TestPeriodicDispatch_DisabledOperations(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -97,6 +98,7 @@ func TestPeriodicDispatch_DisabledOperations(t *testing.T) { } func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -115,6 +117,7 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { } func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -148,15 +151,13 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { } func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Start the periodic dispatcher. - s1.periodicDispatcher.Start() - // Create a job that won't be evalauted for a while. job := testPeriodicJob(time.Now().Add(10 * time.Second)) @@ -193,6 +194,7 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) { } func TestPeriodicDispatch_Remove_Untracked(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -205,6 +207,7 @@ func TestPeriodicDispatch_Remove_Untracked(t *testing.T) { } func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -232,15 +235,13 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { } func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Start the periodic dispatcher. - s1.periodicDispatcher.Start() - // Create a job that will be evaluated soon. job := testPeriodicJob(time.Now().Add(1 * time.Second)) @@ -268,6 +269,7 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) { } func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -280,15 +282,13 @@ func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) { } func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Start the periodic dispatcher. - s1.periodicDispatcher.Start() - // Create a job that won't be evalauted for a while. job := testPeriodicJob(time.Now().Add(10 * time.Second)) @@ -314,15 +314,13 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { } func TestPeriodicDispatch_Run_Multiple(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Start the periodic dispatcher. - s1.periodicDispatcher.Start() - // Create a job that will be launched twice. launch1 := time.Now().Add(1 * time.Second) launch2 := time.Now().Add(2 * time.Second) @@ -351,15 +349,13 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) { } func TestPeriodicDispatch_Run_SameTime(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Start the periodic dispatcher. - s1.periodicDispatcher.Start() - // Create two job that will be launched at the same time. launch := time.Now().Add(1 * time.Second) job := testPeriodicJob(launch) @@ -398,15 +394,13 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) { // some after each other and some invalid times, and ensures the correct // behavior. func TestPeriodicDispatch_Complex(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) defer s1.Shutdown() testutil.WaitForLeader(t, s1.RPC) - // Start the periodic dispatcher. - s1.periodicDispatcher.Start() - // Create some jobs launching at different times. now := time.Now() same := now.Add(1 * time.Second) @@ -496,6 +490,7 @@ func shuffle(jobs []*structs.Job) { } func TestPeriodicDispatch_CreatedEvals(t *testing.T) { + t.Parallel() s1 := testServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -527,9 +522,8 @@ func TestPeriodicDispatch_CreatedEvals(t *testing.T) { } -// TODO: Check that it doesn't create evals for overlapping things. - func TestPeriodicHeap_Order(t *testing.T) { + t.Parallel() h := NewPeriodicHeap() j1 := mock.PeriodicJob() j2 := mock.PeriodicJob()