Small fixes and test fixes

This commit is contained in:
Alex Dadgar
2015-12-04 16:53:36 -08:00
parent ac3d833942
commit c8f2d0c990
5 changed files with 61 additions and 50 deletions

View File

@@ -242,9 +242,7 @@ func TestJobEndpoint_Register_Periodic(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
// Create the register request for a periodic job.
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
job := mock.PeriodicJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
@@ -362,9 +360,7 @@ func TestJobEndpoint_Evaluate_Periodic(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
job := mock.PeriodicJob()
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},
@@ -476,9 +472,7 @@ func TestJobEndpoint_Deregister_Periodic(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
// Create the register request
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
job := mock.PeriodicJob()
reg := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{Region: "global"},

View File

@@ -372,6 +372,7 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Inject a periodic job that will be triggered soon.
launch := time.Now().Add(1 * time.Second)
@@ -385,12 +386,14 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}
// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.Flush()
s1.periodicDispatcher.SetEnabled(false)
// Sleep till after the job should have been launched.
time.Sleep(2 * time.Second)
time.Sleep(3 * time.Second)
// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()
// Ensure the job is tracked.
@@ -414,6 +417,7 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Inject a periodic job that triggered once in the past, should trigger now
// and once in the future.
@@ -433,12 +437,14 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
s1.periodicDispatcher.createEval(job, past)
// Flush the periodic dispatcher, ensuring that no evals will be created.
s1.periodicDispatcher.Flush()
s1.periodicDispatcher.SetEnabled(false)
// Sleep till after the job should have been launched.
time.Sleep(2 * time.Second)
time.Sleep(3 * time.Second)
// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
s1.restorePeriodicDispatcher()
// Ensure the job is tracked.

View File

@@ -192,10 +192,11 @@ func SystemJob() *structs.Job {
func PeriodicJob() *structs.Job {
job := Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{
Enabled: true,
SpecType: structs.PeriodicSpecCron,
Spec: "*/30 * * *",
Spec: "*/30 * * * *",
}
return job
}

View File

@@ -45,6 +45,7 @@ type PeriodicDispatch struct {
updateCh chan struct{}
stopCh chan struct{}
waitCh chan struct{}
logger *log.Logger
l sync.RWMutex
}
@@ -57,7 +58,8 @@ func NewPeriodicDispatch(srv *Server) *PeriodicDispatch {
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: srv.logger,
}
}
@@ -70,7 +72,8 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.enabled = enabled
p.l.Unlock()
if !enabled {
p.stopCh <- struct{}{}
close(p.stopCh)
<-p.waitCh
p.Flush()
}
}
@@ -196,6 +199,8 @@ func (p *PeriodicDispatch) ForceRun(jobID string) error {
// run is a long-lived function that waits til a job's periodic spec is met and
// then creates an evaluation to run the job.
func (p *PeriodicDispatch) run() {
defer close(p.waitCh)
// Do nothing if not enabled.
p.l.RLock()
if !p.enabled {
@@ -212,15 +217,24 @@ PICK:
if p.heap.Length() == 0 {
p.l.RUnlock()
p.logger.Printf("[DEBUG] nomad.periodic: no periodic jobs; waiting")
<-p.updateCh
select {
case <-p.stopCh:
return
case <-p.updateCh:
}
p.l.RLock()
}
nextJob, err := p.heap.Peek()
p.l.RUnlock()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
return
select {
case <-p.stopCh:
return
default:
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
return
}
}
launchTime := nextJob.next
@@ -228,8 +242,12 @@ PICK:
// If there are only invalid times, wait for an update.
if launchTime.IsZero() {
p.logger.Printf("[DEBUG] nomad.periodic: job %q has no valid launch time", nextJob.job.ID)
<-p.updateCh
goto PICK
select {
case <-p.stopCh:
return
case <-p.updateCh:
goto PICK
}
}
now = time.Now()
@@ -426,8 +444,9 @@ func (p *PeriodicDispatch) evalLaunchTime(created *structs.Evaluation) (time.Tim
func (p *PeriodicDispatch) Flush() {
p.l.Lock()
defer p.l.Unlock()
p.stopCh = make(chan struct{}, 1)
p.stopCh = make(chan struct{})
p.updateCh = make(chan struct{}, 1)
p.waitCh = make(chan struct{})
p.tracked = make(map[string]*structs.Job)
p.heap = NewPeriodicHeap()
}
@@ -486,8 +505,11 @@ func (p *periodicHeap) Contains(job *structs.Job) bool {
}
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
if job, ok := p.index[job.ID]; ok {
p.heap.update(job, next)
if pJob, ok := p.index[job.ID]; ok {
// Need to update the job as well because its spec can change.
pJob.job = job
pJob.next = next
heap.Fix(&p.heap, pJob.index)
return nil
}
@@ -550,9 +572,3 @@ func (h *periodicHeapImp) Pop() interface{} {
*h = old[0 : n-1]
return job
}
// update modifies the priority and next time of an periodic job in the queue.
func (h *periodicHeapImp) update(job *periodicJob, next time.Time) {
job.next = next
heap.Fix(h, job.index)
}

View File

@@ -77,6 +77,7 @@ func testPeriodicJob(times ...time.Time) *structs.Job {
}
func TestPeriodicDispatch_DisabledOperations(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -97,6 +98,7 @@ func TestPeriodicDispatch_DisabledOperations(t *testing.T) {
}
func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -115,6 +117,7 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
}
func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -148,15 +151,13 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
}
func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Start the periodic dispatcher.
s1.periodicDispatcher.Start()
// Create a job that won't be evalauted for a while.
job := testPeriodicJob(time.Now().Add(10 * time.Second))
@@ -193,6 +194,7 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
}
func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -205,6 +207,7 @@ func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
}
func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -232,15 +235,13 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
}
func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Start the periodic dispatcher.
s1.periodicDispatcher.Start()
// Create a job that will be evaluated soon.
job := testPeriodicJob(time.Now().Add(1 * time.Second))
@@ -268,6 +269,7 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
}
func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -280,15 +282,13 @@ func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
}
func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Start the periodic dispatcher.
s1.periodicDispatcher.Start()
// Create a job that won't be evalauted for a while.
job := testPeriodicJob(time.Now().Add(10 * time.Second))
@@ -314,15 +314,13 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
}
func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Start the periodic dispatcher.
s1.periodicDispatcher.Start()
// Create a job that will be launched twice.
launch1 := time.Now().Add(1 * time.Second)
launch2 := time.Now().Add(2 * time.Second)
@@ -351,15 +349,13 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
}
func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Start the periodic dispatcher.
s1.periodicDispatcher.Start()
// Create two job that will be launched at the same time.
launch := time.Now().Add(1 * time.Second)
job := testPeriodicJob(launch)
@@ -398,15 +394,13 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
// some after each other and some invalid times, and ensures the correct
// behavior.
func TestPeriodicDispatch_Complex(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Start the periodic dispatcher.
s1.periodicDispatcher.Start()
// Create some jobs launching at different times.
now := time.Now()
same := now.Add(1 * time.Second)
@@ -496,6 +490,7 @@ func shuffle(jobs []*structs.Job) {
}
func TestPeriodicDispatch_CreatedEvals(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
@@ -527,9 +522,8 @@ func TestPeriodicDispatch_CreatedEvals(t *testing.T) {
}
// TODO: Check that it doesn't create evals for overlapping things.
func TestPeriodicHeap_Order(t *testing.T) {
t.Parallel()
h := NewPeriodicHeap()
j1 := mock.PeriodicJob()
j2 := mock.PeriodicJob()