From a8ee688ba67a13728ae5af9d890dc42c351ac6d2 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 18 Dec 2015 12:26:28 -0800 Subject: [PATCH] merge --- nomad/leader.go | 16 ++ nomad/mock/mock.go | 10 + nomad/periodic.go | 473 +++++++++++++++++++++++++++++- nomad/periodic_test.go | 492 ++++++++++++++++++++++++++++++++ nomad/server.go | 9 +- nomad/state/schema.go | 9 + nomad/state/state_store.go | 12 + nomad/state/state_store_test.go | 42 +++ nomad/structs/structs.go | 45 +++ nomad/structs/structs_test.go | 74 +++++ 10 files changed, 1177 insertions(+), 5 deletions(-) diff --git a/nomad/leader.go b/nomad/leader.go index 0e267a7dc..41081dcfc 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -117,6 +117,14 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { return err } + // Enable the periodic dispatcher,since we are now the leader. + s.periodicDispatcher.SetEnabled(true) + + // Restore the periodic dispatcher state + if err := s.restorePeriodicDispatcher(); err != nil { + return err + } + // Scheduler periodic jobs go s.schedulePeriodic(stopCh) @@ -167,6 +175,11 @@ func (s *Server) restoreEvalBroker() error { return nil } +func (s *Server) restorePeriodicDispatcher() error { + // TODO(alex) + return nil +} + // schedulePeriodic is used to do periodic job dispatch while we are leader func (s *Server) schedulePeriodic(stopCh chan struct{}) { evalGC := time.NewTicker(s.config.EvalGCInterval) @@ -250,6 +263,9 @@ func (s *Server) revokeLeadership() error { // Disable the eval broker, since it is only useful as a leader s.evalBroker.SetEnabled(false) + // Disable the periodic dispatcher, since it is only useful as a leader + s.periodicDispatcher.SetEnabled(false) + // Clear the heartbeat timers on either shutdown or step down, // since we are no longer responsible for TTL expirations. if err := s.clearAllHeartbeatTimers(); err != nil { diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 3c2bb4f5e..0d61009a3 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -190,6 +190,16 @@ func SystemJob() *structs.Job { return job } +func PeriodicJob() *structs.Job { + job := Job() + job.Periodic = &structs.PeriodicConfig{ + Enabled: true, + SpecType: structs.PeriodicSpecCron, + Spec: "*/30 * * *", + } + return job +} + func Eval() *structs.Evaluation { eval := &structs.Evaluation{ ID: structs.GenerateUUID(), diff --git a/nomad/periodic.go b/nomad/periodic.go index f3d253ac0..3d527a82e 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -1,27 +1,496 @@ package nomad -import "github.com/hashicorp/nomad/nomad/structs" +import ( + "container/heap" + "errors" + "fmt" + "log" + "sync" + "time" + "github.com/hashicorp/nomad/nomad/structs" +) + +// PeriodicRunner is the interface for tracking and launching periodic jobs at +// their periodic spec. type PeriodicRunner interface { + Start() SetEnabled(enabled bool) Add(job *structs.Job) error Remove(jobID string) error ForceRun(jobID string) error + Tracked() []structs.Job + Flush() } -type PeriodicDispatch struct{} +// PeriodicDispatch is used to track and launch periodic jobs. It maintains the +// set of periodic jobs and creates derived jobs and evaluations per +// instantiation which is determined by the periodic spec. 
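+//
+// The following sketch is illustrative only and is not part of this change; it
+// shows how the pieces introduced here are meant to fit together on the leader:
+//
+//	d := NewPeriodicDispatch(srv)
+//	d.SetEnabled(true) // done in establishLeadership
+//	d.Start()          // starts the run() loop that waits on launch times
+//	if err := d.Add(job); err == nil {
+//		// job is now tracked; an eval is created at each launch time
+//	}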
+type PeriodicDispatch struct { + srv *Server + enabled bool + running bool + tracked map[string]*structs.Job + heap *periodicHeap + + updateCh chan struct{} + stopCh chan struct{} + logger *log.Logger + l sync.RWMutex +} + +// NewPeriodicDispatch returns a periodic dispatcher that is used to track and +// launch periodic jobs. +func NewPeriodicDispatch(srv *Server) *PeriodicDispatch { + return &PeriodicDispatch{ + srv: srv, + tracked: make(map[string]*structs.Job), + heap: NewPeriodicHeap(), + updateCh: make(chan struct{}, 1), + stopCh: make(chan struct{}, 1), + logger: srv.logger, + } +} + +// SetEnabled is used to control if the periodic dispatcher is enabled. It +// should only be enabled on the active leader. Disabling an active dispatcher +// will stop any launched go routine and flush the dispatcher. func (p *PeriodicDispatch) SetEnabled(enabled bool) { + p.l.Lock() + p.enabled = enabled + p.l.Unlock() + if !enabled { + p.stopCh <- struct{}{} + p.Flush() + } } +// Start begins the goroutine that creates derived jobs and evals. +func (p *PeriodicDispatch) Start() { + p.l.Lock() + p.running = true + p.l.Unlock() + go p.run() +} + +// Tracked returns the set of tracked job IDs. +func (p *PeriodicDispatch) Tracked() []structs.Job { + p.l.RLock() + defer p.l.RUnlock() + tracked := make([]structs.Job, len(p.tracked)) + i := 0 + for _, job := range p.tracked { + tracked[i] = *job + i++ + } + return tracked +} + +// Add begins tracking of a periodic job. If it is already tracked, it acts as +// an update to the jobs periodic spec. func (p *PeriodicDispatch) Add(job *structs.Job) error { + p.l.Lock() + defer p.l.Unlock() + + // Do nothing if not enabled + if !p.enabled { + return fmt.Errorf("periodic dispatch disabled") + } + + // If we were tracking a job and it has been disabled or made non-periodic remove it. + disabled := !job.IsPeriodic() || !job.Periodic.Enabled + _, tracked := p.tracked[job.ID] + if tracked && disabled { + return p.Remove(job.ID) + } + + // If the job is diabled and we aren't tracking it, do nothing. + if disabled { + return nil + } + + // Add or update the job. + p.tracked[job.ID] = job + next := job.Periodic.Next(time.Now()) + if tracked { + if err := p.heap.Update(job, next); err != nil { + return fmt.Errorf("failed to update job %v launch time: %v", job.ID, err) + } + } else { + if err := p.heap.Push(job, next); err != nil { + return fmt.Errorf("failed to add job %v", job.ID, err) + } + } + + // Signal an update. + if p.running { + select { + case p.updateCh <- struct{}{}: + default: + } + } + return nil } +// Remove stops tracking the passed job. If the job is not tracked, it is a +// no-op. func (p *PeriodicDispatch) Remove(jobID string) error { + p.l.Lock() + defer p.l.Unlock() + + // Do nothing if not enabled + if !p.enabled { + return fmt.Errorf("periodic dispatch disabled") + } + + if job, tracked := p.tracked[jobID]; tracked { + delete(p.tracked, jobID) + if err := p.heap.Remove(job); err != nil { + return fmt.Errorf("failed to remove tracked job %v: %v", jobID, err) + } + } + + // Signal an update. + if p.running { + select { + case p.updateCh <- struct{}{}: + default: + } + } + return nil } +// ForceRun causes the periodic job to be evaluated immediately. 
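+// It returns an error if the dispatcher is disabled or the job is not tracked;
+// otherwise an evaluation is created for the job at the current time via
+// createEval.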
func (p *PeriodicDispatch) ForceRun(jobID string) error { + p.l.Lock() + defer p.l.Unlock() + + // Do nothing if not enabled + if !p.enabled { + return fmt.Errorf("periodic dispatch disabled") + } + + job, tracked := p.tracked[jobID] + if !tracked { + return fmt.Errorf("can't force run non-tracked job %v", jobID) + } + + return p.createEval(job, time.Now()) +} + +// run is a long-lived function that waits til a job's periodic spec is met and +// then creates an evaluation to run the job. +func (p *PeriodicDispatch) run() { + // Do nothing if not enabled. + p.l.RLock() + if !p.enabled { + p.l.RUnlock() + return + } + p.l.RUnlock() + + now := time.Now().Local() + +PICK: + // If there is nothing wait for an update. + p.l.RLock() + if p.heap.Length() == 0 { + p.l.RUnlock() + <-p.updateCh + p.l.RLock() + } + + nextJob, err := p.heap.Peek() + p.l.RUnlock() + if err != nil { + p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err) + return + } + + launchTime := nextJob.next + + // If there are only invalid times, wait for an update. + if launchTime.IsZero() { + <-p.updateCh + goto PICK + } + + select { + case <-p.stopCh: + return + case <-p.updateCh: + goto PICK + case <-time.After(nextJob.next.Sub(now)): + // Get the current time so that we don't miss any jobs will we are + // creating evals. + nowUpdate := time.Now() + + // Create evals for all the jobs with the same launch time. + p.l.Lock() + for { + if p.heap.Length() == 0 { + break + } + + j, err := p.heap.Peek() + if err != nil { + p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err) + break + } + + if j.next != launchTime { + break + } + + if err := p.heap.Update(j.job, j.job.Periodic.Next(nowUpdate)); err != nil { + p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to update next launch of periodic job: %v", j.job.ID, err) + } + + // TODO(alex): Want to be able to check that there isn't a previously + // running cron job for this job. + go p.createEval(j.job, launchTime) + } + + p.l.Unlock() + now = nowUpdate + } + + goto PICK +} + +// createEval instantiates a job based on the passed periodic job and submits an +// evaluation for it. +func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) error { + derived, err := p.deriveJob(periodicJob, time) + if err != nil { + return err + } + + // Commit this update via Raft + req := structs.JobRegisterRequest{Job: derived} + _, index, err := p.srv.raftApply(structs.JobRegisterRequestType, req) + if err != nil { + p.logger.Printf("[ERR] nomad.periodic_dispatch: Register failed: %v", err) + return err + } + + // Create a new evaluation + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: derived.Priority, + Type: derived.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: derived.ID, + JobModifyIndex: index, + Status: structs.EvalStatusPending, + } + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + + // Commit this evaluation via Raft + // XXX: There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not. + _, _, err = p.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + p.logger.Printf("[ERR] nomad.periodic_dispatch: Eval create failed: %v", err) + return err + } + return nil } + +// deriveJob instantiates a new job based on the passed periodic job and the +// launch time. 
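+// The derived job is a copy of the periodic job with ParentID set to the
+// periodic job's ID, Periodic cleared so it is not itself tracked, and an ID of
+// the form "<parent ID>-<launch Unix time>". For example, a periodic job "cache"
+// launched at Unix time 1450000000 produces the derived job ID "cache-1450000000".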
+// TODO: these jobs need to be marked as GC'able +func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) ( + derived *structs.Job, err error) { + + // Have to recover in case the job copy panics. + defer func() { + if r := recover(); r != nil { + p.logger.Printf("[ERR] nomad.periodic_dispatch: deriving job from"+ + " periodic job %v failed; deregistering from periodic runner: %v", + periodicJob.ID, r) + p.Remove(periodicJob.ID) + derived = nil + err = fmt.Errorf("Failed to create a copy of the periodic job %v: %v", periodicJob.ID, r) + } + }() + + // Create a copy of the periodic job, give it a derived ID/Name and make it + // non-periodic. + derived = periodicJob.Copy() + derived.ParentID = periodicJob.ID + derived.ID = p.derivedJobID(periodicJob, time) + derived.Name = periodicJob.ID + derived.Periodic = nil + return +} + +// deriveJobID returns a job ID based on the parent periodic job and the launch +// time. +func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string { + return fmt.Sprintf("%s-%d", periodicJob.ID, time.Unix()) +} + +// CreatedEvals returns the set of evaluations created from the passed periodic +// job. +func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) ([]*structs.Evaluation, error) { + state := p.srv.fsm.State() + iter, err := state.ChildJobs(periodicJobID) + if err != nil { + return nil, fmt.Errorf("failed to look up children of job %v: %v", periodicJobID, err) + } + + var evals []*structs.Evaluation + for i := iter.Next(); i != nil; i = iter.Next() { + job := i.(*structs.Job) + childEvals, err := state.EvalsByJob(job.ID) + if err != nil { + fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err) + } + + evals = append(evals, childEvals...) + } + + return evals, nil +} + +// Flush clears the state of the PeriodicDispatcher +func (p *PeriodicDispatch) Flush() { + p.l.Lock() + defer p.l.Unlock() + p.stopCh = make(chan struct{}, 1) + p.updateCh = make(chan struct{}, 1) + p.tracked = make(map[string]*structs.Job) + p.heap = NewPeriodicHeap() +} + +// TODO +type periodicHeap struct { + index map[string]*periodicJob + heap periodicHeapImp +} + +type periodicJob struct { + job *structs.Job + next time.Time + index int +} + +func NewPeriodicHeap() *periodicHeap { + return &periodicHeap{ + index: make(map[string]*periodicJob), + heap: make(periodicHeapImp, 0), + } +} + +func (p *periodicHeap) Push(job *structs.Job, next time.Time) error { + if _, ok := p.index[job.ID]; ok { + return fmt.Errorf("job %v already exists", job.ID) + } + + pJob := &periodicJob{job, next, 0} + p.index[job.ID] = pJob + heap.Push(&p.heap, pJob) + return nil +} + +func (p *periodicHeap) Pop() (*periodicJob, error) { + if len(p.heap) == 0 { + return nil, errors.New("heap is empty") + } + + pJob := heap.Pop(&p.heap).(*periodicJob) + delete(p.index, pJob.job.ID) + return pJob, nil +} + +func (p *periodicHeap) Peek() (periodicJob, error) { + if len(p.heap) == 0 { + return periodicJob{}, errors.New("heap is empty") + } + + return *(p.heap[0]), nil +} + +func (p *periodicHeap) Contains(job *structs.Job) bool { + _, ok := p.index[job.ID] + return ok +} + +func (p *periodicHeap) Update(job *structs.Job, next time.Time) error { + if job, ok := p.index[job.ID]; ok { + p.heap.update(job, next) + return nil + } + + return fmt.Errorf("heap doesn't contain job %v", job.ID) +} + +func (p *periodicHeap) Remove(job *structs.Job) error { + if pJob, ok := p.index[job.ID]; ok { + heap.Remove(&p.heap, pJob.index) + delete(p.index, job.ID) + 
return nil + } + + return fmt.Errorf("heap doesn't contain job %v", job.ID) +} + +func (p *periodicHeap) Length() int { + return len(p.heap) +} + +type periodicHeapImp []*periodicJob + +func (h periodicHeapImp) Len() int { return len(h) } + +func (h periodicHeapImp) Less(i, j int) bool { + // Two zero times should return false. + // Otherwise, zero is "greater" than any other time. + // (To sort it at the end of the list.) + // Sort such that zero times are at the end of the list. + iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero() + if iZero && jZero { + return false + } else if iZero { + return false + } else if jZero { + return true + } + + return h[i].next.Before(h[j].next) +} + +func (h periodicHeapImp) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} + +func (h *periodicHeapImp) Push(x interface{}) { + n := len(*h) + job := x.(*periodicJob) + job.index = n + *h = append(*h, job) +} + +func (h *periodicHeapImp) Pop() interface{} { + old := *h + n := len(old) + job := old[n-1] + job.index = -1 // for safety + *h = old[0 : n-1] + return job +} + +// update modifies the priority and next time of an periodic job in the queue. +func (h *periodicHeapImp) update(job *periodicJob, next time.Time) { + job.next = next + heap.Fix(h, job.index) +} diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 539c42658..f199038f3 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -2,10 +2,20 @@ package nomad import ( "fmt" + "math/rand" + "reflect" + "strconv" + "strings" + "testing" + "time" + "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) +// MockPeriodic can be used by other tests that want to mock the periodic +// dispatcher. type MockPeriodic struct { Enabled bool Jobs map[string]*structs.Job @@ -36,3 +46,485 @@ func (m *MockPeriodic) Remove(jobID string) error { func (m *MockPeriodic) ForceRun(jobID string) error { return nil } + +func (m *MockPeriodic) Start() {} + +func (m *MockPeriodic) Flush() { + m.Jobs = make(map[string]*structs.Job) +} + +func (m *MockPeriodic) Tracked() []structs.Job { + tracked := make([]structs.Job, len(m.Jobs)) + i := 0 + for _, job := range m.Jobs { + tracked[i] = *job + i++ + } + return tracked +} + +func TestPeriodicDispatch_DisabledOperations(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Disable the dispatcher. 
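+	// Add and Remove return a "periodic dispatch disabled" error while the
+	// dispatcher is disabled, so both calls below are expected to fail.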
+ s1.periodicDispatcher.SetEnabled(false) + + job := mock.PeriodicJob() + if err := s1.periodicDispatcher.Add(job); err == nil { + t.Fatalf("Add on disabled dispatcher should fail") + } + + if err := s1.periodicDispatcher.Remove(job.ID); err == nil { + t.Fatalf("Remove on disabled dispatcher should fail") + } +} + +func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + job := mock.Job() + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err) + } + + tracked := s1.periodicDispatcher.Tracked() + if len(tracked) != 0 { + t.Fatalf("Add of non-periodic job should be no-op: %v", tracked) + } +} + +func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + job := mock.PeriodicJob() + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + tracked := s1.periodicDispatcher.Tracked() + if len(tracked) != 1 { + t.Fatalf("Add didn't track the job: %v", tracked) + } + + // Update the job and add it again. + job.Periodic.Spec = "foo" + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + tracked = s1.periodicDispatcher.Tracked() + if len(tracked) != 1 { + t.Fatalf("Add didn't update: %v", tracked) + } + + if !reflect.DeepEqual(*job, tracked[0]) { + t.Fatalf("Add didn't properly update: got %v; want %v", tracked[0], job) + } +} + +func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Start the periodic dispatcher. + s1.periodicDispatcher.Start() + + // Create a job that won't be evalauted for a while. + job := testPeriodicJob(time.Now().Add(10 * time.Second)) + + // Add it. + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + // Update it to be sooner and re-add. + expected := time.Now().Add(1 * time.Second) + job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix()) + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + time.Sleep(2 * time.Second) + + // Check that an eval was created for the right time. 
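+	// CreatedEvals scans the state store for jobs whose ParentID is this job
+	// and returns their evaluations. Since the derived job ID embeds the launch
+	// timestamp, matching the eval's JobID against derivedJobID(job, expected)
+	// verifies that the launch happened at the updated (earlier) time.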
+ evals, err := s1.periodicDispatcher.CreatedEvals(job.ID) + if err != nil { + t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err) + } + + if len(evals) != 1 { + t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals) + } + + eval := evals[0] + expID := s1.periodicDispatcher.derivedJobID(job, expected) + if eval.JobID != expID { + t.Fatalf("periodic dispatcher created eval at the wrong time; got %v; want %v", + eval.JobID, expID) + } +} + +func TestPeriodicDispatch_Remove_Untracked(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + if err := s1.periodicDispatcher.Remove("foo"); err != nil { + t.Fatalf("Remove failed %v; expected a no-op", err) + } +} + +func TestPeriodicDispatch_Remove_Tracked(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + job := mock.PeriodicJob() + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + tracked := s1.periodicDispatcher.Tracked() + if len(tracked) != 1 { + t.Fatalf("Add didn't track the job: %v", tracked) + } + + if err := s1.periodicDispatcher.Remove(job.ID); err != nil { + t.Fatalf("Remove failed %v", err) + } + + tracked = s1.periodicDispatcher.Tracked() + if len(tracked) != 0 { + t.Fatalf("Remove didn't untrack the job: %v", tracked) + } +} + +func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Start the periodic dispatcher. + s1.periodicDispatcher.Start() + + // Create a job that will be evaluated soon. + job := testPeriodicJob(time.Now().Add(1 * time.Second)) + + // Add it. + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + // Remove the job. + if err := s1.periodicDispatcher.Remove(job.ID); err != nil { + t.Fatalf("Add failed %v", err) + } + + time.Sleep(2 * time.Second) + + // Check that an eval wasn't created. + evals, err := s1.periodicDispatcher.CreatedEvals(job.ID) + if err != nil { + t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err) + } + + if len(evals) != 0 { + t.Fatalf("Remove didn't cancel creation of an eval: %#v", evals) + } +} + +func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + if err := s1.periodicDispatcher.ForceRun("foo"); err == nil { + t.Fatal("ForceRun of untracked job should fail") + } +} + +func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Start the periodic dispatcher. + s1.periodicDispatcher.Start() + + // Create a job that won't be evalauted for a while. + job := testPeriodicJob(time.Now().Add(10 * time.Second)) + + // Add it. + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + // ForceRun the job + if err := s1.periodicDispatcher.ForceRun(job.ID); err != nil { + t.Fatalf("ForceRun failed %v", err) + } + + // Check that an eval was created for the right time. 
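+	// ForceRun launches the job at time.Now(), so the exact derived job ID is
+	// not known in advance; it is enough to verify that a single eval exists.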
+ evals, err := s1.periodicDispatcher.CreatedEvals(job.ID) + if err != nil { + t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err) + } + + if len(evals) != 1 { + t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals) + } +} + +func TestPeriodicDispatch_Run_Multiple(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Start the periodic dispatcher. + s1.periodicDispatcher.Start() + + // Create a job that will be launched twice. + launch1 := time.Now().Add(1 * time.Second) + launch2 := time.Now().Add(2 * time.Second) + job := testPeriodicJob(launch1, launch2) + + // Add it. + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + + time.Sleep(3 * time.Second) + + // Check that the evals were created correctly + evals, err := s1.periodicDispatcher.CreatedEvals(job.ID) + if err != nil { + t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err) + } + + d := s1.periodicDispatcher + expected := []string{d.derivedJobID(job, launch1), d.derivedJobID(job, launch2)} + for i, eval := range evals { + if eval.JobID != expected[i] { + t.Fatalf("eval created incorrectly; got %v; want %v", eval.JobID, expected[i]) + } + } +} + +func TestPeriodicDispatch_Run_SameTime(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Start the periodic dispatcher. + s1.periodicDispatcher.Start() + + // Create two job that will be launched at the same time. + launch := time.Now().Add(1 * time.Second) + job := testPeriodicJob(launch) + job2 := testPeriodicJob(launch) + + // Add them. + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + if err := s1.periodicDispatcher.Add(job2); err != nil { + t.Fatalf("Add failed %v", err) + } + + time.Sleep(2 * time.Second) + + // Check that the evals were created correctly + for _, job := range []*structs.Job{job, job2} { + evals, err := s1.periodicDispatcher.CreatedEvals(job.ID) + if err != nil { + t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err) + } + + if len(evals) != 1 { + t.Fatalf("expected 1 eval; got %#v", evals) + } + + d := s1.periodicDispatcher + expected := d.derivedJobID(job, launch) + if evals[0].JobID != expected { + t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].JobID, expected) + } + } +} + +// This test adds and removes a bunch of jobs, some launching at the same time, +// some after each other and some invalid times, and ensures the correct +// behavior. +func TestPeriodicDispatch_Complex(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Start the periodic dispatcher. + s1.periodicDispatcher.Start() + + // Create some jobs launching at different times. + now := time.Now() + same := now.Add(1 * time.Second) + launch1 := same.Add(1 * time.Second) + launch2 := same.Add(1500 * time.Millisecond) + launch3 := same.Add(2 * time.Second) + invalid := now.Add(-200 * time.Second) + + // Create two jobs launching at the same time. + job1 := testPeriodicJob(same) + job2 := testPeriodicJob(same) + + // Create a job that will never launch. + job3 := testPeriodicJob(invalid) + + // Create a job that launches twice. + job4 := testPeriodicJob(launch1, launch3) + + // Create a job that launches once. 
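+	// (Each job here comes from testPeriodicJob, defined below, which uses the
+	// PeriodicSpecTest spec type: a comma separated list of Unix launch times.)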
+ job5 := testPeriodicJob(launch2) + + // Create 3 jobs we will delete. + job6 := testPeriodicJob(same) + job7 := testPeriodicJob(launch1, launch3) + job8 := testPeriodicJob(launch2) + + // Create a map of expected eval job ids. + d := s1.periodicDispatcher + expected := map[string][]string{ + job1.ID: []string{d.derivedJobID(job1, same)}, + job2.ID: []string{d.derivedJobID(job2, same)}, + job3.ID: nil, + job4.ID: []string{ + d.derivedJobID(job4, launch1), + d.derivedJobID(job4, launch3), + }, + job5.ID: []string{d.derivedJobID(job5, launch2)}, + job6.ID: nil, + job7.ID: nil, + job8.ID: nil, + } + + // Shuffle the jobs so they can be added randomly + jobs := []*structs.Job{job1, job2, job3, job4, job5, job6, job7, job8} + toDelete := []*structs.Job{job6, job7, job8} + shuffle(jobs) + shuffle(toDelete) + + for _, job := range jobs { + if err := s1.periodicDispatcher.Add(job); err != nil { + t.Fatalf("Add failed %v", err) + } + } + + for _, job := range toDelete { + if err := s1.periodicDispatcher.Remove(job.ID); err != nil { + t.Fatalf("Remove failed %v", err) + } + } + + time.Sleep(4 * time.Second) + actual := make(map[string][]string, len(expected)) + for _, job := range jobs { + evals, err := s1.periodicDispatcher.CreatedEvals(job.ID) + if err != nil { + t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err) + } + + var jobs []string + for _, eval := range evals { + jobs = append(jobs, eval.JobID) + } + actual[job.ID] = jobs + } + + if !reflect.DeepEqual(actual, expected) { + t.Fatalf("Unexpected evals; got %#v; want %#v", actual, expected) + } +} + +func shuffle(jobs []*structs.Job) { + rand.Seed(time.Now().Unix()) + for i := range jobs { + j := rand.Intn(len(jobs)) + jobs[i], jobs[j] = jobs[j], jobs[i] + } +} + +func testPeriodicJob(times ...time.Time) *structs.Job { + job := mock.PeriodicJob() + job.Periodic.SpecType = structs.PeriodicSpecTest + + l := make([]string, len(times)) + for i, t := range times { + l[i] = strconv.Itoa(int(t.Unix())) + } + + job.Periodic.Spec = strings.Join(l, ",") + return job +} + +// TODO: Check that it doesn't create evals for overlapping things. + +func TestPeriodicHeap_Order(t *testing.T) { + h := NewPeriodicHeap() + j1 := mock.PeriodicJob() + j2 := mock.PeriodicJob() + j3 := mock.PeriodicJob() + + lookup := map[*structs.Job]string{ + j1: "j1", + j2: "j2", + j3: "j3", + } + + h.Push(j1, time.Time{}) + h.Push(j2, time.Unix(10, 0)) + h.Push(j3, time.Unix(11, 0)) + + exp := []string{"j2", "j3", "j1"} + var act []string + for i := 0; i < 3; i++ { + pJob, err := h.Pop() + if err != nil { + t.Fatal(err) + } + + act = append(act, lookup[pJob.job]) + } + + if !reflect.DeepEqual(act, exp) { + t.Fatalf("Wrong ordering; got %v; want %v", act, exp) + } +} diff --git a/nomad/server.go b/nomad/server.go index 826aa8458..3a1c25d83 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -113,8 +113,8 @@ type Server struct { // plans that are waiting to be assessed by the leader planQueue *PlanQueue - // periodicRunner is used to track and create evaluations for periodic jobs. - periodicRunner PeriodicRunner + // periodicDispatcher is used to track and create evaluations for periodic jobs. + periodicDispatcher *PeriodicDispatch // heartbeatTimers track the expiration time of each heartbeat that has // a TTL. On expiration, the node status is updated to be 'down'. @@ -184,6 +184,9 @@ func NewServer(config *Config) (*Server, error) { shutdownCh: make(chan struct{}), } + // Create the periodic dispatcher for launching periodic jobs. 
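+	// It starts out disabled; establishLeadership enables it when this server
+	// becomes the leader and revokeLeadership disables it on step down.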
+ s.periodicDispatcher = NewPeriodicDispatch(s) + // Initialize the RPC layer // TODO: TLS... if err := s.setupRPC(nil); err != nil { @@ -409,7 +412,7 @@ func (s *Server) setupRaft() error { // Create the FSM var err error - s.fsm, err = NewFSM(s.evalBroker, s.periodicRunner, s.config.LogOutput) + s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput) if err != nil { return err } diff --git a/nomad/state/schema.go b/nomad/state/schema.go index 961cb67a7..f0ee0fd75 100644 --- a/nomad/state/schema.go +++ b/nomad/state/schema.go @@ -109,6 +109,15 @@ func jobTableSchema() *memdb.TableSchema { Conditional: jobIsGCable, }, }, + "parent": &memdb.IndexSchema{ + Name: "parent", + AllowMissing: true, + Unique: false, + Indexer: &memdb.StringFieldIndex{ + Field: "ParentID", + Lowercase: true, + }, + }, }, } } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7eef401ec..25c5151f6 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -359,6 +359,18 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) { return iter, nil } +// ChildJobs returns an iterator over all the children of the passed job. +func (s *StateStore) ChildJobs(id string) (memdb.ResultIterator, error) { + txn := s.db.Txn(false) + + // Scan all jobs whose parent is the passed id. + iter, err := txn.Get("jobs", "parent", id) + if err != nil { + return nil, err + } + return iter, nil +} + // JobsByScheduler returns an iterator over all the jobs with the specific // scheduler type. func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 0609f3048..23331ee57 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -404,6 +404,48 @@ func TestStateStore_Jobs(t *testing.T) { } } +func TestStateStore_ChildJobs(t *testing.T) { + state := testStateStore(t) + parent := mock.Job() + var childJobs []*structs.Job + + if err := state.UpsertJob(999, parent); err != nil { + t.Fatalf("err: %v", err) + } + + for i := 0; i < 10; i++ { + job := mock.Job() + job.ParentID = parent.ID + childJobs = append(childJobs, job) + + err := state.UpsertJob(1000+uint64(i), job) + if err != nil { + t.Fatalf("err: %v", err) + } + } + + iter, err := state.ChildJobs(parent.ID) + if err != nil { + t.Fatalf("err: %v", err) + } + + var out []*structs.Job + for { + raw := iter.Next() + if raw == nil { + break + } + out = append(out, raw.(*structs.Job)) + } + + sort.Sort(JobIDSort(childJobs)) + sort.Sort(JobIDSort(out)) + + if !reflect.DeepEqual(childJobs, out) { + t.Fatalf("bad: %#v %#v", childJobs, out) + } +} + func TestStateStore_JobsByScheduler(t *testing.T) { state := testStateStore(t) var serviceJobs []*structs.Job diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index bed272309..db1fd8c79 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -8,6 +8,7 @@ import ( "io" "reflect" "regexp" + "strconv" "strings" "time" @@ -16,6 +17,7 @@ import ( "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/helper/args" + "github.com/mitchellh/copystructure" ) var ( @@ -719,6 +721,9 @@ type Job struct { // specified hierarchically like LineOfBiz/OrgName/Team/Project ID string + // ParentID is the unique identifier of the job that spawned this job. + ParentID string + // Name is the logical name of the job used to refer to it. This is unique // per region, but not unique globally. 
Name string @@ -787,6 +792,17 @@ func (j *Job) InitFields() { } } +// Copy returns a deep copy of the Job. It is expected that callers use recover. +// This job can panic if the deep copy failed as it uses reflection. +func (j *Job) Copy() *Job { + i, err := copystructure.Copy(j) + if err != nil { + panic(err) + } + + return i.(*Job) +} + // Validate is used to sanity check a job input func (j *Job) Validate() error { var mErr multierror.Error @@ -914,6 +930,10 @@ func (u *UpdateStrategy) Rolling() bool { const ( // PeriodicSpecCron is used for a cron spec. PeriodicSpecCron = "cron" + + // PeriodicSpecTest is only used by unit tests. It is a sorted, comma + // seperated list of unix timestamps at which to launch. + PeriodicSpecTest = "test" ) // Periodic defines the interval a job should be run at. @@ -944,6 +964,8 @@ func (p *PeriodicConfig) Validate() error { if _, err := cronexpr.Parse(p.Spec); err != nil { return fmt.Errorf("Invalid cron spec %q: %v", p.Spec, err) } + case PeriodicSpecTest: + // No-op default: return fmt.Errorf("Unknown specification type %q", p.SpecType) } @@ -961,6 +983,29 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time { if e, err := cronexpr.Parse(p.Spec); err == nil { return e.Next(fromTime) } + case PeriodicSpecTest: + split := strings.Split(p.Spec, ",") + if len(split) == 1 && split[0] == "" { + return time.Time{} + } + + // Parse the times + times := make([]time.Time, len(split)) + for i, s := range split { + unix, err := strconv.Atoi(s) + if err != nil { + return time.Time{} + } + + times[i] = time.Unix(int64(unix), 0) + } + + // Find the next match + for _, next := range times { + if fromTime.Before(next) { + return next + } + } } return time.Time{} diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 2b74c3f1a..22e5dfea9 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -93,6 +93,80 @@ func TestJob_Validate(t *testing.T) { } } +func TestJob_Copy(t *testing.T) { + j := &Job{ + Region: "global", + ID: GenerateUUID(), + Name: "my-job", + Type: JobTypeService, + Priority: 50, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*Constraint{ + &Constraint{ + LTarget: "$attr.kernel.name", + RTarget: "linux", + Operand: "=", + }, + }, + Periodic: &PeriodicConfig{ + Enabled: false, + }, + TaskGroups: []*TaskGroup{ + &TaskGroup{ + Name: "web", + Count: 10, + RestartPolicy: &RestartPolicy{ + Attempts: 3, + Interval: 10 * time.Minute, + Delay: 1 * time.Minute, + }, + Tasks: []*Task{ + &Task{ + Name: "web", + Driver: "exec", + Config: map[string]interface{}{ + "command": "/bin/date", + }, + Env: map[string]string{ + "FOO": "bar", + }, + Services: []*Service{ + { + Name: "${TASK}-frontend", + PortLabel: "http", + }, + }, + Resources: &Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*NetworkResource{ + &NetworkResource{ + MBits: 50, + DynamicPorts: []Port{{Label: "http"}}, + }, + }, + }, + }, + }, + Meta: map[string]string{ + "elb_check_type": "http", + "elb_check_interval": "30s", + "elb_check_min": "3", + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + } + + c := j.Copy() + if !reflect.DeepEqual(j, c) { + t.Fatalf("Copy() returned an unequal Job; got %v; want %v", c, j) + } +} + func TestJob_IsPeriodic(t *testing.T) { j := &Job{ Type: JobTypeService,