Remove the periodicRunner interface and pass the server as an interface to the periodicDispatcher

This commit is contained in:
Alex Dadgar
2015-12-18 17:26:05 -08:00
parent 180fbaee78
commit 7f2ffe4c9f
7 changed files with 243 additions and 356 deletions

View File

@@ -39,12 +39,12 @@ const (
// along with Raft to provide strong consistency. We implement
// this outside the Server to avoid exposing this outside the package.
type nomadFSM struct {
evalBroker *EvalBroker
periodicRunner PeriodicRunner
logOutput io.Writer
logger *log.Logger
state *state.StateStore
timetable *TimeTable
evalBroker *EvalBroker
periodicDispatcher *PeriodicDispatch
logOutput io.Writer
logger *log.Logger
state *state.StateStore
timetable *TimeTable
}
// nomadSnapshot is used to provide a snapshot of the current
@@ -60,7 +60,7 @@ type snapshotHeader struct {
}
// NewFSMPath is used to construct a new FSM with a blank state
func NewFSM(evalBroker *EvalBroker, periodic PeriodicRunner, logOutput io.Writer) (*nomadFSM, error) {
func NewFSM(evalBroker *EvalBroker, periodic *PeriodicDispatch, logOutput io.Writer) (*nomadFSM, error) {
// Create a state store
state, err := state.NewStateStore(logOutput)
if err != nil {
@@ -68,12 +68,12 @@ func NewFSM(evalBroker *EvalBroker, periodic PeriodicRunner, logOutput io.Writer
}
fsm := &nomadFSM{
evalBroker: evalBroker,
periodicRunner: periodic,
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
state: state,
timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
evalBroker: evalBroker,
periodicDispatcher: periodic,
logOutput: logOutput,
logger: log.New(logOutput, "", log.LstdFlags),
state: state,
timetable: NewTimeTable(timeTableGranularity, timeTableLimit),
}
return fsm, nil
}
@@ -211,8 +211,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// If it is periodic, insert it into the periodic runner and record the
// time it was inserted.
if req.Job.IsPeriodic() {
if err := n.periodicRunner.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Add failed: %v", err)
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}
@@ -237,7 +237,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
}
if parent.IsPeriodic() {
t, err := n.periodicRunner.LaunchTime(req.Job.ID)
t, err := n.periodicDispatcher.LaunchTime(req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
return err
@@ -272,8 +272,8 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
}
if job.IsPeriodic() {
if err := n.periodicRunner.Remove(req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Remove failed: %v", err)
if err := n.periodicDispatcher.Remove(req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
return err
}

View File

@@ -43,7 +43,8 @@ func testStateStore(t *testing.T) *state.StateStore {
}
func testFSM(t *testing.T) *nomadFSM {
fsm, err := NewFSM(testBroker(t, 0), NewMockPeriodic(), os.Stderr)
p, _ := testPeriodicDispatcher()
fsm, err := NewFSM(testBroker(t, 0), p, os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
@@ -249,7 +250,7 @@ func TestFSM_RegisterJob(t *testing.T) {
}
// Verify it was added to the periodic runner.
if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; !ok {
if _, ok := fsm.periodicDispatcher.tracked[job.ID]; !ok {
t.Fatal("job not added to periodic runner")
}
@@ -306,7 +307,7 @@ func TestFSM_DeregisterJob(t *testing.T) {
}
// Verify it was removed from the periodic runner.
if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; ok {
if _, ok := fsm.periodicDispatcher.tracked[job.ID]; ok {
t.Fatal("job not removed from periodic runner")
}

View File

@@ -391,6 +391,10 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
// Sleep till after the job should have been launched.
time.Sleep(3 * time.Second)
// Get the current time to ensure the launch time is after this once we
// restore.
now := time.Now()
// Restore the periodic dispatcher.
s1.periodicDispatcher.SetEnabled(true)
s1.periodicDispatcher.Start()
@@ -402,13 +406,13 @@ func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
}
// Check that an eval was made.
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed: %v", job.ID, err)
last, err := s1.fsm.State().PeriodicLaunchByID(job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if len(evals) != 1 {
t.Fatalf("restorePeriodicDispatcher() didn't create an eval")
if last.Launch.Before(now) {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}
}
@@ -453,26 +457,12 @@ func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
}
// Check that an eval was made.
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed: %v", job.ID, err)
last, err := s1.fsm.State().PeriodicLaunchByID(job.ID)
if err != nil || last == nil {
t.Fatalf("failed to get periodic launch time: %v", err)
}
if len(evals) != 2 {
t.Fatalf("restorePeriodicDispatcher() didn't create an eval")
}
// Check it was for the right time.
match := false
for _, eval := range evals {
if eval.JobLaunch != past && eval.JobLaunch != future {
match = true
break
}
}
if !match {
t.Fatal("restorePeriodicDispatcher() didn't create the correct eval")
if last.Launch == past {
t.Fatalf("restorePeriodicDispatcher did not force launch")
}
}

View File

@@ -19,26 +19,13 @@ const (
JobLaunchSuffix = "/periodic-"
)
// PeriodicRunner is the interface for tracking and launching periodic jobs at
// their periodic spec.
type PeriodicRunner interface {
Start()
SetEnabled(enabled bool)
Add(job *structs.Job) error
Remove(jobID string) error
ForceRun(jobID string) error
Tracked() []*structs.Job
Flush()
LaunchTime(jobID string) (time.Time, error)
}
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
srv *Server
enabled bool
running bool
dispatcher JobEvalDispatcher
enabled bool
running bool
tracked map[string]*structs.Job
heap *periodicHeap
@@ -50,17 +37,60 @@ type PeriodicDispatch struct {
l sync.RWMutex
}
// JobEvalDispatcher is an interface to submit jobs and have evaluations created
// for them.
type JobEvalDispatcher interface {
// DispatchJob takes a job a new, untracked job and creates an evaluation
// for it.
DispatchJob(job *structs.Job) error
}
// DispatchJob creates an evaluation for the passed job and commits both the
// evaluation and the job to the raft log.
func (s *Server) DispatchJob(job *structs.Job) error {
// Commit this update via Raft
req := structs.JobRegisterRequest{Job: job}
_, index, err := s.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
return err
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: job.Priority,
Type: job.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, _, err = s.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
return err
}
return nil
}
// NewPeriodicDispatch returns a periodic dispatcher that is used to track and
// launch periodic jobs.
func NewPeriodicDispatch(srv *Server) *PeriodicDispatch {
func NewPeriodicDispatch(logger *log.Logger, dispatcher JobEvalDispatcher) *PeriodicDispatch {
return &PeriodicDispatch{
srv: srv,
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: srv.logger,
dispatcher: dispatcher,
tracked: make(map[string]*structs.Job),
heap: NewPeriodicHeap(),
updateCh: make(chan struct{}, 1),
stopCh: make(chan struct{}),
waitCh: make(chan struct{}),
logger: logger,
}
}
@@ -121,7 +151,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.removeLocked(job.ID)
}
// If the job is diabled and we aren't tracking it, do nothing.
// If the job is disabled and we aren't tracking it, do nothing.
return nil
}
@@ -219,7 +249,11 @@ func (p *PeriodicDispatch) run() {
for p.shouldRun() {
job, launch, err := p.nextLaunch()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
p.l.RLock()
defer p.l.RUnlock()
if !p.running {
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
}
return
} else if job == nil {
return
@@ -325,34 +359,8 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time)
return err
}
// Commit this update via Raft
req := structs.JobRegisterRequest{Job: derived}
_, index, err := p.srv.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: registering child job for periodic job %q failed: %v", periodicJob.ID, err)
return err
}
// Create a new evaluation
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: derived.Priority,
Type: derived.Type,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: derived.ID,
JobModifyIndex: index,
Status: structs.EvalStatusPending,
}
update := &structs.EvalUpdateRequest{
Evals: []*structs.Evaluation{eval},
}
// Commit this evaluation via Raft
// XXX: There is a risk of partial failure where the JobRegister succeeds
// but that the EvalUpdate does not.
_, _, err = p.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
p.logger.Printf("[ERR] nomad.periodic: creating eval for %q failed: %v", derived.ID, err)
if err := p.dispatcher.DispatchJob(derived); err != nil {
p.logger.Printf("[ERR] nomad.periodic: failed to dispatch job %q: %v", periodicJob.ID, err)
return err
}
@@ -361,7 +369,6 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time)
// deriveJob instantiates a new job based on the passed periodic job and the
// launch time.
// TODO: these jobs need to be marked as GC'able
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived *structs.Job, err error) {
@@ -384,13 +391,14 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = derived.ID
derived.Periodic = nil
derived.GC = true
return
}
// deriveJobID returns a job ID based on the parent periodic job and the launch
// time.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix())
return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.UnixNano())
}
// LaunchTime returns the launch time of the job. This is only valid for
@@ -406,7 +414,7 @@ func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
}
return time.Unix(int64(launch), 0), nil
return time.Unix(0, int64(launch)), nil
}
// Flush clears the state of the PeriodicDispatcher

View File

@@ -2,150 +2,98 @@ package nomad
import (
"fmt"
"log"
"math/rand"
"os"
"reflect"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
// MockPeriodic can be used by other tests that want to mock the periodic
// dispatcher.
type MockPeriodic struct {
Enabled bool
Jobs map[string]*structs.Job
type MockJobEvalDispatcher struct {
Jobs map[string]*structs.Job
lock sync.Mutex
}
func NewMockPeriodic() *MockPeriodic {
return &MockPeriodic{Jobs: make(map[string]*structs.Job)}
func NewMockJobEvalDispatcher() *MockJobEvalDispatcher {
return &MockJobEvalDispatcher{Jobs: make(map[string]*structs.Job)}
}
func (m *MockPeriodic) SetEnabled(enabled bool) {
m.Enabled = enabled
}
func (m *MockPeriodic) Add(job *structs.Job) error {
if job == nil {
return fmt.Errorf("Must pass non nil job")
}
func (m *MockJobEvalDispatcher) DispatchJob(job *structs.Job) error {
m.lock.Lock()
defer m.lock.Unlock()
m.Jobs[job.ID] = job
return nil
}
func (m *MockPeriodic) Remove(jobID string) error {
delete(m.Jobs, jobID)
return nil
}
func (m *MockPeriodic) ForceRun(jobID string) error {
return nil
}
func (m *MockPeriodic) LaunchTime(jobID string) (time.Time, error) {
return time.Time{}, nil
}
func (m *MockPeriodic) Start() {}
func (m *MockPeriodic) Flush() {
m.Jobs = make(map[string]*structs.Job)
}
func (m *MockPeriodic) Tracked() []*structs.Job {
tracked := make([]*structs.Job, len(m.Jobs))
i := 0
// LaunchTimes returns the launch times of child jobs in sorted order.
func (m *MockJobEvalDispatcher) LaunchTimes(p *PeriodicDispatch, parentID string) ([]time.Time, error) {
m.lock.Lock()
defer m.lock.Unlock()
var launches []time.Time
for _, job := range m.Jobs {
tracked[i] = job
i++
if job.ParentID != parentID {
continue
}
t, err := p.LaunchTime(job.ID)
if err != nil {
return nil, err
}
launches = append(launches, t)
}
return tracked
sort.Sort(times(launches))
return launches, nil
}
type times []time.Time
func (t times) Len() int { return len(t) }
func (t times) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
func (t times) Less(i, j int) bool { return t[i].Before(t[j]) }
// testPeriodicDispatcher returns an enabled PeriodicDispatcher which uses the
// MockJobEvalDispatcher.
func testPeriodicDispatcher() (*PeriodicDispatch, *MockJobEvalDispatcher) {
logger := log.New(os.Stderr, "", log.LstdFlags)
m := NewMockJobEvalDispatcher()
d := NewPeriodicDispatch(logger, m)
d.SetEnabled(true)
d.Start()
return d, m
}
// testPeriodicJob is a helper that creates a periodic job that launches at the
// passed times.
func testPeriodicJob(times ...time.Time) *structs.Job {
job := mock.PeriodicJob()
job.Periodic.SpecType = structs.PeriodicSpecTest
l := make([]string, len(times))
for i, t := range times {
l[i] = strconv.Itoa(int(t.Unix()))
l[i] = strconv.Itoa(int(t.UnixNano()))
}
job.Periodic.Spec = strings.Join(l, ",")
return job
}
// createdEvals returns the set of evaluations created from the passed periodic
// job in sorted order, with the earliest job launch first.
func createdEvals(p *PeriodicDispatch, periodicJobID string) (PeriodicEvals, error) {
state := p.srv.fsm.State()
iter, err := state.ChildJobs(periodicJobID)
if err != nil {
return nil, fmt.Errorf("failed to look up children of job %v: %v", periodicJobID, err)
}
var evals PeriodicEvals
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
childEvals, err := state.EvalsByJob(job.ID)
if err != nil {
return nil, fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err)
}
for _, eval := range childEvals {
launch, err := p.LaunchTime(eval.JobID)
if err != nil {
return nil, fmt.Errorf("failed to get launch time for eval %v: %v", eval, err)
}
pEval := &PeriodicEval{
Eval: eval,
JobLaunch: launch,
}
evals = append(evals, pEval)
}
}
// Return the sorted evals.
sort.Sort(evals)
return evals, nil
}
// PeriodicEval stores the evaluation and launch time for an instantiated
// periodic job.
type PeriodicEval struct {
Eval *structs.Evaluation
JobLaunch time.Time
}
// For sorting.
type PeriodicEvals []*PeriodicEval
func (p PeriodicEvals) Len() int { return len(p) }
func (p PeriodicEvals) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p PeriodicEvals) Less(i, j int) bool { return p[i].JobLaunch.Before(p[j].JobLaunch) }
func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, _ := testPeriodicDispatcher()
job := mock.Job()
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
}
tracked := s1.periodicDispatcher.Tracked()
tracked := p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Add of non-periodic job should be no-op: %v", tracked)
}
@@ -153,29 +101,24 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := s1.periodicDispatcher.Tracked()
tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
// Update the job and add it again.
job.Periodic.Spec = "foo"
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = s1.periodicDispatcher.Tracked()
tracked = p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't update: %v", tracked)
}
@@ -187,83 +130,70 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, m := testPeriodicDispatcher()
// Create a job that won't be evalauted for a while.
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Update it to be sooner and re-add.
expected := time.Now().Add(1 * time.Second)
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
if err := s1.periodicDispatcher.Add(job); err != nil {
job.Periodic.Spec = fmt.Sprintf("%d", expected.UnixNano())
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Check that nothing is created.
if _, ok := m.Jobs[job.ID]; ok {
t.Fatalf("periodic dispatcher created eval at the wrong time")
}
time.Sleep(2 * time.Second)
// Check that an eval was created for the right time.
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
// Check that job was launched correctly.
times, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed %v", job.ID, err)
t.Fatalf("failed to get launch times for job %q", job.ID)
}
if len(evals) != 1 {
t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals)
if len(times) != 1 {
t.Fatalf("incorrect number of launch times for job %q", job.ID)
}
eval := evals[0].Eval
expID := s1.periodicDispatcher.derivedJobID(job, expected)
if eval.JobID != expID {
t.Fatalf("periodic dispatcher created eval at the wrong time; got %v; want %v",
eval.JobID, expID)
if times[0] != expected {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], expected)
}
}
func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
if err := s1.periodicDispatcher.Remove("foo"); err != nil {
p, _ := testPeriodicDispatcher()
if err := p.Remove("foo"); err != nil {
t.Fatalf("Remove failed %v; expected a no-op", err)
}
}
func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := s1.periodicDispatcher.Tracked()
tracked := p.Tracked()
if len(tracked) != 1 {
t.Fatalf("Add didn't track the job: %v", tracked)
}
if err := s1.periodicDispatcher.Remove(job.ID); err != nil {
if err := p.Remove(job.ID); err != nil {
t.Fatalf("Remove failed %v", err)
}
tracked = s1.periodicDispatcher.Tracked()
tracked = p.Tracked()
if len(tracked) != 0 {
t.Fatalf("Remove didn't untrack the job: %v", tracked)
}
@@ -271,90 +201,71 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, _ := testPeriodicDispatcher()
// Create a job that will be evaluated soon.
job := testPeriodicJob(time.Now().Add(1 * time.Second))
// Add it.
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Remove the job.
if err := s1.periodicDispatcher.Remove(job.ID); err != nil {
if err := p.Remove(job.ID); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(2 * time.Second)
// Check that an eval wasn't created.
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed %v", job.ID, err)
}
if len(evals) != 0 {
t.Fatalf("Remove didn't cancel creation of an eval: %#v", evals)
d := p.dispatcher.(*MockJobEvalDispatcher)
if _, ok := d.Jobs[job.ID]; ok {
t.Fatalf("Remove didn't cancel creation of an eval")
}
}
func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, _ := testPeriodicDispatcher()
if err := s1.periodicDispatcher.ForceRun("foo"); err == nil {
if err := p.ForceRun("foo"); err == nil {
t.Fatal("ForceRun of untracked job should fail")
}
}
func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, m := testPeriodicDispatcher()
// Create a job that won't be evalauted for a while.
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// ForceRun the job
if err := s1.periodicDispatcher.ForceRun(job.ID); err != nil {
if err := p.ForceRun(job.ID); err != nil {
t.Fatalf("ForceRun failed %v", err)
}
// Check that an eval was created for the right time.
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
// Check that job was launched correctly.
launches, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed %v", job.ID, err)
t.Fatalf("failed to get launch times for job %q: %v", job.ID, err)
}
if len(evals) != 1 {
t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals)
l := len(launches)
if l != 1 {
t.Fatalf("restorePeriodicDispatcher() created an unexpected"+
" number of evals; got %d; want 1", l)
}
}
func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, m := testPeriodicDispatcher()
// Create a job that will be launched twice.
launch1 := time.Now().Add(1 * time.Second)
@@ -362,34 +273,31 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
job := testPeriodicJob(launch1, launch2)
// Add it.
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(3 * time.Second)
// Check that the evals were created correctly
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
// Check that job was launched correctly.
times, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed %v", job.ID, err)
t.Fatalf("failed to get launch times for job %q", job.ID)
}
d := s1.periodicDispatcher
expected := []string{d.derivedJobID(job, launch1), d.derivedJobID(job, launch2)}
for i, eval := range evals {
if eval.Eval.JobID != expected[i] {
t.Fatalf("eval created incorrectly; got %v; want %v", eval.Eval.JobID, expected[i])
}
if len(times) != 2 {
t.Fatalf("incorrect number of launch times for job %q", job.ID)
}
if times[0] != launch1 {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch1)
}
if times[1] != launch2 {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[1], launch2)
}
}
func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, m := testPeriodicDispatcher()
// Create two job that will be launched at the same time.
launch := time.Now().Add(1 * time.Second)
@@ -397,30 +305,26 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
job2 := testPeriodicJob(launch)
// Add them.
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
if err := s1.periodicDispatcher.Add(job2); err != nil {
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(2 * time.Second)
// Check that the evals were created correctly
// Check that the jobs were launched correctly.
for _, job := range []*structs.Job{job, job2} {
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
times, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed %v", job.ID, err)
t.Fatalf("failed to get launch times for job %q", job.ID)
}
if len(evals) != 1 {
t.Fatalf("expected 1 eval; got %#v", evals)
if len(times) != 1 {
t.Fatalf("incorrect number of launch times for job %q; got %d; want 1", job.ID, len(times))
}
d := s1.periodicDispatcher
expected := d.derivedJobID(job, launch)
if evals[0].Eval.JobID != expected {
t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].Eval.JobID, expected)
if times[0] != launch {
t.Fatalf("periodic dispatcher created eval for time %v; want %v", times[0], launch)
}
}
}
@@ -430,11 +334,7 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
// behavior.
func TestPeriodicDispatch_Complex(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, m := testPeriodicDispatcher()
// Create some jobs launching at different times.
now := time.Now()
@@ -463,16 +363,12 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
job8 := testPeriodicJob(launch2)
// Create a map of expected eval job ids.
d := s1.periodicDispatcher
expected := map[string][]string{
job1.ID: []string{d.derivedJobID(job1, same)},
job2.ID: []string{d.derivedJobID(job2, same)},
expected := map[string][]time.Time{
job1.ID: []time.Time{same},
job2.ID: []time.Time{same},
job3.ID: nil,
job4.ID: []string{
d.derivedJobID(job4, launch1),
d.derivedJobID(job4, launch3),
},
job5.ID: []string{d.derivedJobID(job5, launch2)},
job4.ID: []time.Time{launch1, launch3},
job5.ID: []time.Time{launch2},
job6.ID: nil,
job7.ID: nil,
job8.ID: nil,
@@ -485,54 +381,46 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
shuffle(toDelete)
for _, job := range jobs {
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
}
for _, job := range toDelete {
if err := s1.periodicDispatcher.Remove(job.ID); err != nil {
if err := p.Remove(job.ID); err != nil {
t.Fatalf("Remove failed %v", err)
}
}
time.Sleep(4 * time.Second)
actual := make(map[string][]string, len(expected))
actual := make(map[string][]time.Time, len(expected))
for _, job := range jobs {
evals, err := createdEvals(s1.periodicDispatcher, job.ID)
launches, err := m.LaunchTimes(p, job.ID)
if err != nil {
t.Fatalf("createdEvals(%v) failed %v", job.ID, err)
t.Fatalf("LaunchTimes(%v) failed %v", job.ID, err)
}
var jobs []string
for _, eval := range evals {
jobs = append(jobs, eval.Eval.JobID)
}
actual[job.ID] = jobs
actual[job.ID] = launches
}
if !reflect.DeepEqual(actual, expected) {
t.Fatalf("Unexpected evals; got %#v; want %#v", actual, expected)
t.Fatalf("Unexpected launches; got %#v; want %#v", actual, expected)
}
}
func TestPeriodicDispatch_NextLaunch(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
p, _ := testPeriodicDispatcher()
// Create two job that will be launched at the same time.
invalid := time.Unix(0, 0)
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
expected := time.Now().Add(1 * time.Second)
job := testPeriodicJob(invalid)
job2 := testPeriodicJob(expected)
// Make sure the periodic dispatcher isn't running.
close(s1.periodicDispatcher.stopCh)
s1.periodicDispatcher.stopCh = make(chan struct{})
close(p.stopCh)
p.stopCh = make(chan struct{})
// Run nextLaunch.
timeout := make(chan struct{})
@@ -540,18 +428,18 @@ func TestPeriodicDispatch_NextLaunch(t *testing.T) {
var launch time.Time
var err error
go func() {
j, launch, err = s1.periodicDispatcher.nextLaunch()
j, launch, err = p.nextLaunch()
close(timeout)
}()
// Add them.
if err := s1.periodicDispatcher.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Delay adding a valid job.
time.Sleep(200 * time.Millisecond)
if err := s1.periodicDispatcher.Add(job2); err != nil {
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}

View File

@@ -185,7 +185,7 @@ func NewServer(config *Config) (*Server, error) {
}
// Create the periodic dispatcher for launching periodic jobs.
s.periodicDispatcher = NewPeriodicDispatch(s)
s.periodicDispatcher = NewPeriodicDispatch(s.logger, s)
// Initialize the RPC layer
// TODO: TLS...

View File

@@ -938,7 +938,7 @@ const (
PeriodicSpecCron = "cron"
// PeriodicSpecTest is only used by unit tests. It is a sorted, comma
// seperated list of unix timestamps at which to launch.
// seperated list of unix nanosecond timestamps at which to launch.
PeriodicSpecTest = "_internal_test"
)
@@ -1003,7 +1003,7 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
return time.Time{}
}
times[i] = time.Unix(int64(unix), 0)
times[i] = time.Unix(0, int64(unix))
}
// Find the next match