From bb012d1b4db1868b7a110c60eae9840879b8bb2a Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 23 Dec 2015 18:54:51 -0800 Subject: [PATCH] Simplify periodic nextLaunch, dispatch and run --- nomad/periodic.go | 115 ++++++++++++----------------------------- nomad/periodic_test.go | 57 +------------------- 2 files changed, 33 insertions(+), 139 deletions(-) diff --git a/nomad/periodic.go b/nomad/periodic.go index abb885580..2377da226 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -2,7 +2,6 @@ package nomad import ( "container/heap" - "errors" "fmt" "log" "strconv" @@ -248,110 +247,60 @@ func (p *PeriodicDispatch) shouldRun() bool { // then creates an evaluation to run the job. func (p *PeriodicDispatch) run() { defer close(p.waitCh) - var now time.Time + var launchCh <-chan time.Time for p.shouldRun() { - job, launch, err := p.nextLaunch() - if err != nil { - p.l.RLock() - defer p.l.RUnlock() - if !p.running { - p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) - } - return - } else if job == nil { - return + job, launch := p.nextLaunch() + if launch.IsZero() { + launchCh = nil + } else { + launchDur := launch.Sub(time.Now()) + launchCh = time.After(launchDur) + p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", job.ID, launchDur) } - now = time.Now() - p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s", job.ID, launch.Sub(now)) - select { case <-p.stopCh: return case <-p.updateCh: continue - case <-time.After(launch.Sub(now)): - // Get the current time so that we don't miss any jobs will we're creating evals. - now = time.Now() - p.dispatch(launch, now) + case <-launchCh: + p.dispatch(job, launch) } } } -// dispatch scans the periodic jobs in order of launch time and creates -// evaluations for all jobs whose next launch time is equal to that of the -// passed launchTime. The now time is used to determine the next launch time for -// the dispatched jobs. -func (p *PeriodicDispatch) dispatch(launchTime time.Time, now time.Time) { +// dispatch creates an evaluation for the job and updates its next launchtime +// based on the passed launch time. +func (p *PeriodicDispatch) dispatch(job *structs.Job, launchTime time.Time) { p.l.Lock() defer p.l.Unlock() - // Create evals for all the jobs with the same launch time. - for { - if p.heap.Length() == 0 { - return - } - - j, err := p.heap.Peek() - if err != nil { - p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err) - return - } - - if j.next != launchTime { - return - } - - if err := p.heap.Update(j.job, j.job.Periodic.Next(now)); err != nil { - p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err) - } - - p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime) - go p.createEval(j.job, launchTime) + nextLaunch := job.Periodic.Next(launchTime) + if err := p.heap.Update(job, nextLaunch); err != nil { + p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", job.ID, err) } + + p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", job.ID, launchTime) + p.createEval(job, launchTime) } // nextLaunch returns the next job to launch and when it should be launched. If // the next job can't be determined, an error is returned. If the dispatcher is // stopped, a nil job will be returned. -func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time, error) { -PICK: +func (p *PeriodicDispatch) nextLaunch() (*structs.Job, time.Time) { // If there is nothing wait for an update. p.l.RLock() + defer p.l.RUnlock() if p.heap.Length() == 0 { - p.l.RUnlock() - - // Block until there is an update, or the dispatcher is stopped. - select { - case <-p.stopCh: - return nil, time.Time{}, nil - case <-p.updateCh: - } - p.l.RLock() + return nil, time.Time{} } - nextJob, err := p.heap.Peek() - p.l.RUnlock() - if err != nil { - select { - case <-p.stopCh: - return nil, time.Time{}, nil - default: - return nil, time.Time{}, err - } + nextJob := p.heap.Peek() + if nextJob == nil { + return nil, time.Time{} } - // If there are only invalid times, wait for an update. - if nextJob.next.IsZero() { - select { - case <-p.stopCh: - return nil, time.Time{}, nil - case <-p.updateCh: - goto PICK - } - } - - return nextJob.job, nextJob.next, nil + return nextJob.job, nextJob.next } // createEval instantiates a job based on the passed periodic job and submits an @@ -461,22 +410,22 @@ func (p *periodicHeap) Push(job *structs.Job, next time.Time) error { return nil } -func (p *periodicHeap) Pop() (*periodicJob, error) { +func (p *periodicHeap) Pop() *periodicJob { if len(p.heap) == 0 { - return nil, errors.New("heap is empty") + return nil } pJob := heap.Pop(&p.heap).(*periodicJob) delete(p.index, pJob.job.ID) - return pJob, nil + return pJob } -func (p *periodicHeap) Peek() (periodicJob, error) { +func (p *periodicHeap) Peek() *periodicJob { if len(p.heap) == 0 { - return periodicJob{}, errors.New("heap is empty") + return nil } - return *(p.heap[0]), nil + return p.heap[0] } func (p *periodicHeap) Contains(job *structs.Job) bool { diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index c2e395919..0e7002265 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -433,57 +433,6 @@ func TestPeriodicDispatch_Complex(t *testing.T) { } } -func TestPeriodicDispatch_NextLaunch(t *testing.T) { - t.Parallel() - p, _ := testPeriodicDispatcher() - - // Create two job that will be launched at the same time. - invalid := time.Unix(0, 0) - expected := time.Now().Round(1 * time.Second).Add(1 * time.Second) - job := testPeriodicJob(invalid) - job2 := testPeriodicJob(expected) - - // Make sure the periodic dispatcher isn't running. - close(p.stopCh) - p.stopCh = make(chan struct{}) - - // Run nextLaunch. - timeout := make(chan struct{}) - var j *structs.Job - var launch time.Time - var err error - go func() { - j, launch, err = p.nextLaunch() - close(timeout) - }() - - // Add them. - if err := p.Add(job); err != nil { - t.Fatalf("Add failed %v", err) - } - - // Delay adding a valid job. - time.Sleep(200 * time.Millisecond) - if err := p.Add(job2); err != nil { - t.Fatalf("Add failed %v", err) - } - - select { - case <-time.After(2 * time.Second): - t.Fatal("timeout") - case <-timeout: - if err != nil { - t.Fatalf("nextLaunch() failed: %v", err) - } - if j != job2 { - t.Fatalf("Incorrect job returned; got %v; want %v", j, job2) - } - if launch != expected { - t.Fatalf("Incorrect launch time; got %v; want %v", launch, expected) - } - } -} - func shuffle(jobs []*structs.Job) { rand.Seed(time.Now().Unix()) for i := range jobs { @@ -512,11 +461,7 @@ func TestPeriodicHeap_Order(t *testing.T) { exp := []string{"j2", "j3", "j1"} var act []string for i := 0; i < 3; i++ { - pJob, err := h.Pop() - if err != nil { - t.Fatal(err) - } - + pJob := h.Pop() act = append(act, lookup[pJob.job]) }