This commit is contained in:
Alex Dadgar
2015-12-18 12:26:28 -08:00
parent 7f7aad575d
commit a8ee688ba6
10 changed files with 1177 additions and 5 deletions

View File

@@ -117,6 +117,14 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}
// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)
// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
return err
}
// Schedule periodic jobs
go s.schedulePeriodic(stopCh)
@@ -167,6 +175,11 @@ func (s *Server) restoreEvalBroker() error {
return nil
}
// restorePeriodicDispatcher is invoked while establishing leadership to
// restore the periodic dispatcher's state. It is currently a stub that does
// nothing and always returns nil.
func (s *Server) restorePeriodicDispatcher() error {
	// TODO(alex)
	return nil
}
// schedulePeriodic is used to do periodic job dispatch while we are leader
func (s *Server) schedulePeriodic(stopCh chan struct{}) {
evalGC := time.NewTicker(s.config.EvalGCInterval)
@@ -250,6 +263,9 @@ func (s *Server) revokeLeadership() error {
// Disable the eval broker, since it is only useful as a leader
s.evalBroker.SetEnabled(false)
// Disable the periodic dispatcher, since it is only useful as a leader
s.periodicDispatcher.SetEnabled(false)
// Clear the heartbeat timers on either shutdown or step down,
// since we are no longer responsible for TTL expirations.
if err := s.clearAllHeartbeatTimers(); err != nil {

View File

@@ -190,6 +190,16 @@ func SystemJob() *structs.Job {
return job
}
// PeriodicJob returns a mock job that carries an enabled, cron-based periodic
// configuration firing every 30 minutes.
func PeriodicJob() *structs.Job {
	job := Job()
	job.Periodic = &structs.PeriodicConfig{
		Enabled:  true,
		SpecType: structs.PeriodicSpecCron,
		// A cron expression needs at least five fields (minute, hour,
		// day-of-month, month, day-of-week). With only four the spec fails
		// to parse and the job never gets a launch time.
		Spec: "*/30 * * * *",
	}
	return job
}
func Eval() *structs.Evaluation {
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),

View File

@@ -1,27 +1,496 @@
package nomad
import "github.com/hashicorp/nomad/nomad/structs"
import (
"container/heap"
"errors"
"fmt"
"log"
"sync"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
// PeriodicRunner is the interface for tracking and launching periodic jobs at
// their periodic spec.
type PeriodicRunner interface {
	// Start launches the background processing that dispatches tracked jobs.
	Start()
	// SetEnabled turns the runner on or off.
	SetEnabled(enabled bool)
	// Add begins tracking the passed job, or updates it if already tracked.
	Add(job *structs.Job) error
	// Remove stops tracking the job with the passed ID.
	Remove(jobID string) error
	// ForceRun launches the job with the passed ID immediately.
	ForceRun(jobID string) error
	// Tracked returns the jobs that are currently being tracked.
	Tracked() []structs.Job
	// Flush discards all tracked state.
	Flush()
}
type PeriodicDispatch struct{}
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
// set of periodic jobs and creates derived jobs and evaluations per
// instantiation which is determined by the periodic spec.
type PeriodicDispatch struct {
	// srv is the server used to reach Raft and the state store.
	srv *Server
	// enabled gates Add, Remove, ForceRun and run; toggled by SetEnabled.
	enabled bool
	// running is set by Start and controls whether Add/Remove signal updateCh.
	running bool

	// tracked maps a job ID to the periodic job being tracked.
	tracked map[string]*structs.Job
	// heap orders the tracked jobs by their next launch time.
	heap *periodicHeap

	// updateCh is signaled (non-blocking) when the tracked set changes.
	updateCh chan struct{}
	// stopCh is signaled when the dispatcher is disabled.
	stopCh chan struct{}
	logger *log.Logger
	// l guards the fields above.
	l sync.RWMutex
}
// NewPeriodicDispatch builds a dispatcher bound to the given server. The
// dispatcher starts out disabled, with an empty tracked set, an empty heap and
// single-slot update/stop channels.
func NewPeriodicDispatch(srv *Server) *PeriodicDispatch {
	d := new(PeriodicDispatch)
	d.srv = srv
	d.logger = srv.logger
	d.tracked = make(map[string]*structs.Job)
	d.heap = NewPeriodicHeap()
	d.updateCh = make(chan struct{}, 1)
	d.stopCh = make(chan struct{}, 1)
	return d
}
// SetEnabled is used to control if the periodic dispatcher is enabled. It
// should only be enabled on the active leader. Disabling an active dispatcher
// will stop any launched go routine and flush the dispatcher.
func (p *PeriodicDispatch) SetEnabled(enabled bool) {
	p.l.Lock()
	p.enabled = enabled
	// Flush replaces stopCh under the lock, so capture the current channel
	// here rather than reading the field unsynchronized below.
	stopCh := p.stopCh
	p.l.Unlock()

	if enabled {
		return
	}

	// Non-blocking send: the channel has a single slot, so if a stop signal
	// is already pending (e.g. two concurrent disables) a blocking send
	// would hang forever when no run goroutine is draining it.
	select {
	case stopCh <- struct{}{}:
	default:
	}
	p.Flush()
}
// Start marks the dispatcher as running and spawns the goroutine that creates
// derived jobs and evals.
func (p *PeriodicDispatch) Start() {
	func() {
		p.l.Lock()
		defer p.l.Unlock()
		p.running = true
	}()

	go p.run()
}
// Tracked returns a copy of every job currently tracked by the dispatcher. The
// order of the returned slice is unspecified because it follows map iteration.
func (p *PeriodicDispatch) Tracked() []structs.Job {
	p.l.RLock()
	defer p.l.RUnlock()

	jobs := make([]structs.Job, 0, len(p.tracked))
	for _, tracked := range p.tracked {
		jobs = append(jobs, *tracked)
	}
	return jobs
}
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the job's periodic spec. A tracked job that has been disabled
// or made non-periodic is removed; an untracked one is ignored. It returns an
// error if the dispatcher is disabled or the heap cannot be updated.
func (p *PeriodicDispatch) Add(job *structs.Job) error {
	p.l.Lock()
	defer p.l.Unlock()

	// Do nothing if not enabled
	if !p.enabled {
		return fmt.Errorf("periodic dispatch disabled")
	}

	disabled := !job.IsPeriodic() || !job.Periodic.Enabled
	_, tracked := p.tracked[job.ID]

	if disabled {
		// If the job is disabled and we aren't tracking it, do nothing.
		if !tracked {
			return nil
		}

		// The job was tracked but is no longer runnable, so stop tracking
		// it. This is done inline rather than through Remove because Remove
		// acquires p.l, which is already held here and is not reentrant.
		delete(p.tracked, job.ID)
		if err := p.heap.Remove(job); err != nil {
			return fmt.Errorf("failed to remove tracked job %v: %v", job.ID, err)
		}
	} else {
		// Add or update the job.
		p.tracked[job.ID] = job
		next := job.Periodic.Next(time.Now())
		if tracked {
			if err := p.heap.Update(job, next); err != nil {
				return fmt.Errorf("failed to update job %v launch time: %v", job.ID, err)
			}
		} else {
			if err := p.heap.Push(job, next); err != nil {
				return fmt.Errorf("failed to add job %v: %v", job.ID, err)
			}
		}
	}

	// Signal an update. The send is non-blocking because a single pending
	// signal is enough to make the run loop re-evaluate the heap.
	if p.running {
		select {
		case p.updateCh <- struct{}{}:
		default:
		}
	}

	return nil
}
// Remove stops tracking the job with the passed ID. Removing an untracked job
// is a no-op. An error is returned when the dispatcher is disabled or the job
// could not be taken out of the heap.
func (p *PeriodicDispatch) Remove(jobID string) error {
	p.l.Lock()
	defer p.l.Unlock()

	// Refuse to operate while disabled.
	if !p.enabled {
		return fmt.Errorf("periodic dispatch disabled")
	}

	job, ok := p.tracked[jobID]
	if ok {
		delete(p.tracked, jobID)
		err := p.heap.Remove(job)
		if err != nil {
			return fmt.Errorf("failed to remove tracked job %v: %v", jobID, err)
		}
	}

	// Nothing to wake up unless the run loop has been started.
	if !p.running {
		return nil
	}

	// Best-effort notification of the run loop.
	select {
	case p.updateCh <- struct{}{}:
	default:
	}
	return nil
}
// ForceRun causes the periodic job to be evaluated immediately. It returns an
// error if the dispatcher is disabled, the job is not tracked, or creating the
// derived job and evaluation fails.
func (p *PeriodicDispatch) ForceRun(jobID string) error {
	// Only the lookup needs the lock. createEval applies to Raft and, if
	// deriving the job panics, calls Remove, which takes p.l itself, so the
	// lock must be released before calling it.
	p.l.RLock()
	enabled := p.enabled
	job, tracked := p.tracked[jobID]
	p.l.RUnlock()

	// Do nothing if not enabled
	if !enabled {
		return fmt.Errorf("periodic dispatch disabled")
	}

	if !tracked {
		return fmt.Errorf("can't force run non-tracked job %v", jobID)
	}

	return p.createEval(job, time.Now())
}
// run is a long-lived function that waits until a job's periodic spec is met
// and then creates an evaluation to run the job. It returns once the
// dispatcher is disabled or a stop signal is received.
func (p *PeriodicDispatch) run() {
	for {
		// Snapshot everything needed for this iteration under the lock.
		// Flush replaces the channels and the heap, so they must not be
		// read unsynchronized.
		p.l.RLock()
		if !p.enabled {
			p.l.RUnlock()
			return
		}
		stopCh, updateCh := p.stopCh, p.updateCh

		var launchTime time.Time
		if p.heap.Length() != 0 {
			nextJob, err := p.heap.Peek()
			if err != nil {
				p.l.RUnlock()
				p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err)
				return
			}
			launchTime = nextJob.next
		}
		p.l.RUnlock()

		// With nothing to launch (empty heap, or only invalid times, which
		// sort last) the timer channel stays nil. A nil channel never fires,
		// so the select below waits for an update or a stop only.
		var launchCh <-chan time.Time
		if !launchTime.IsZero() {
			// Measure from the current time; a timestamp captured before a
			// long idle wait would delay the launch by the idle duration.
			launchCh = time.After(launchTime.Sub(time.Now()))
		}

		select {
		case <-stopCh:
			return
		case <-updateCh:
			continue
		case <-launchCh:
		}

		// Get the current time so that we don't miss any jobs while we are
		// creating evals.
		nowUpdate := time.Now()

		// Create evals for all the jobs with the same launch time.
		p.l.Lock()
		for p.heap.Length() != 0 {
			j, err := p.heap.Peek()
			if err != nil {
				p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err)
				break
			}

			if !j.next.Equal(launchTime) {
				break
			}

			if err := p.heap.Update(j.job, j.job.Periodic.Next(nowUpdate)); err != nil {
				p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to update next launch of periodic job %v: %v", j.job.ID, err)
				// The entry keeps its old launch time, so peeking again
				// would return it forever; stop instead of spinning with
				// the lock held.
				break
			}

			// TODO(alex): Want to be able to check that there isn't a previously
			// running cron job for this job.
			go p.createEval(j.job, launchTime)
		}
		p.l.Unlock()
	}
}
// createEval derives a concrete job from the passed periodic job for the given
// launch time, registers it through Raft and then submits a pending
// evaluation for it. Any failure is returned to the caller.
func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time) error {
	child, err := p.deriveJob(periodicJob, time)
	if err != nil {
		return err
	}

	// Register the derived job via Raft.
	register := structs.JobRegisterRequest{Job: child}
	_, modifyIndex, err := p.srv.raftApply(structs.JobRegisterRequestType, register)
	if err != nil {
		p.logger.Printf("[ERR] nomad.periodic_dispatch: Register failed: %v", err)
		return err
	}

	// Build the evaluation request for the freshly registered job.
	evalReq := &structs.EvalUpdateRequest{
		Evals: []*structs.Evaluation{
			{
				ID:             structs.GenerateUUID(),
				Priority:       child.Priority,
				Type:           child.Type,
				TriggeredBy:    structs.EvalTriggerJobRegister,
				JobID:          child.ID,
				JobModifyIndex: modifyIndex,
				Status:         structs.EvalStatusPending,
			},
		},
	}

	// Submit the evaluation via Raft.
	// XXX: the two applies are not atomic; the job registration can succeed
	// while the evaluation update fails.
	if _, _, err := p.srv.raftApply(structs.EvalUpdateRequestType, evalReq); err != nil {
		p.logger.Printf("[ERR] nomad.periodic_dispatch: Eval create failed: %v", err)
		return err
	}

	return nil
}
// deriveJob produces a non-periodic child of the passed periodic job for the
// given launch time. If copying the job panics, the panic is recovered, the
// periodic job is removed from the dispatcher and an error is returned.
// TODO: these jobs need to be marked as GC'able
func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
	derived *structs.Job, err error) {

	// Job.Copy may panic, so guard the whole derivation.
	defer func() {
		r := recover()
		if r == nil {
			return
		}

		p.logger.Printf("[ERR] nomad.periodic_dispatch: deriving job from periodic job %v failed; deregistering from periodic runner: %v",
			periodicJob.ID, r)
		p.Remove(periodicJob.ID)
		derived, err = nil, fmt.Errorf("Failed to create a copy of the periodic job %v: %v", periodicJob.ID, r)
	}()

	// The child is a copy that points back at its parent, gets an ID derived
	// from the launch time and no longer carries a periodic config.
	derived = periodicJob.Copy()
	derived.ParentID = periodicJob.ID
	derived.ID = p.derivedJobID(periodicJob, time)
	derived.Name = periodicJob.ID
	derived.Periodic = nil
	return derived, nil
}
// derivedJobID returns the ID for a job derived from the passed periodic job:
// the parent's ID followed by a dash and the launch time in Unix seconds.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
	parentID := periodicJob.ID
	launchSeconds := time.Unix()
	return fmt.Sprintf("%s-%d", parentID, launchSeconds)
}
// CreatedEvals returns the set of evaluations created from the passed periodic
// job. It looks up every child job of periodicJobID in the state store and
// collects the evaluations of each. An error is returned if either lookup
// fails.
func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) ([]*structs.Evaluation, error) {
	state := p.srv.fsm.State()
	iter, err := state.ChildJobs(periodicJobID)
	if err != nil {
		return nil, fmt.Errorf("failed to look up children of job %v: %v", periodicJobID, err)
	}

	var evals []*structs.Evaluation
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		job := raw.(*structs.Job)
		childEvals, err := state.EvalsByJob(job.ID)
		if err != nil {
			// Propagate the failure instead of building and discarding
			// the error, which would silently return a partial result.
			return nil, fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err)
		}

		evals = append(evals, childEvals...)
	}

	return evals, nil
}
// Flush resets the dispatcher to an empty state: the tracked set and heap are
// discarded and both signalling channels are replaced with fresh ones.
func (p *PeriodicDispatch) Flush() {
	p.l.Lock()
	defer p.l.Unlock()

	p.tracked, p.heap = make(map[string]*structs.Job), NewPeriodicHeap()
	p.updateCh, p.stopCh = make(chan struct{}, 1), make(chan struct{}, 1)
}
// periodicHeap wraps a heap of periodic jobs ordered by next launch time and
// keeps an index by job ID so entries can be updated or removed directly.
type periodicHeap struct {
	// index maps a job ID to its entry in the heap.
	index map[string]*periodicJob
	// heap is the underlying container/heap implementation.
	heap periodicHeapImp
}
// periodicJob is a single heap entry.
type periodicJob struct {
	// job is the tracked periodic job.
	job *structs.Job
	// next is the job's next launch time; the zero time sorts last.
	next time.Time
	// index is the entry's position in the heap slice, maintained by
	// Swap/Push/Pop.
	index int
}
// NewPeriodicHeap returns an empty periodic heap ready for use.
func NewPeriodicHeap() *periodicHeap {
	h := new(periodicHeap)
	h.index = make(map[string]*periodicJob)
	h.heap = make(periodicHeapImp, 0)
	return h
}
// Push inserts the job with the given next launch time. It returns an error if
// a job with the same ID is already present.
func (p *periodicHeap) Push(job *structs.Job, next time.Time) error {
	id := job.ID
	if _, exists := p.index[id]; exists {
		return fmt.Errorf("job %v already exists", id)
	}

	entry := &periodicJob{job: job, next: next, index: 0}
	p.index[id] = entry
	heap.Push(&p.heap, entry)
	return nil
}
// Pop removes and returns the entry with the earliest launch time, dropping it
// from the ID index as well. It returns an error when the heap is empty.
func (p *periodicHeap) Pop() (*periodicJob, error) {
	if len(p.heap) < 1 {
		return nil, errors.New("heap is empty")
	}

	entry := heap.Pop(&p.heap).(*periodicJob)
	delete(p.index, entry.job.ID)
	return entry, nil
}
// Peek returns a copy of the entry with the earliest launch time without
// removing it. It returns an error when the heap is empty.
func (p *periodicHeap) Peek() (periodicJob, error) {
	var top periodicJob
	if len(p.heap) == 0 {
		return top, errors.New("heap is empty")
	}

	top = *p.heap[0]
	return top, nil
}
// Contains reports whether a job with the same ID is present in the heap.
func (p *periodicHeap) Contains(job *structs.Job) bool {
	if _, found := p.index[job.ID]; found {
		return true
	}
	return false
}
// Update changes the next launch time of the job and restores heap order. It
// returns an error if the job is not in the heap.
func (p *periodicHeap) Update(job *structs.Job, next time.Time) error {
	entry, ok := p.index[job.ID]
	if !ok {
		return fmt.Errorf("heap doesn't contain job %v", job.ID)
	}

	p.heap.update(entry, next)
	return nil
}
// Remove deletes the job from the heap and from the ID index. It returns an
// error if the job is not in the heap.
func (p *periodicHeap) Remove(job *structs.Job) error {
	entry, ok := p.index[job.ID]
	if !ok {
		return fmt.Errorf("heap doesn't contain job %v", job.ID)
	}

	heap.Remove(&p.heap, entry.index)
	delete(p.index, job.ID)
	return nil
}
// Length returns the number of entries in the heap.
func (p *periodicHeap) Length() int {
	return len(p.heap)
}
// periodicHeapImp is the slice of entries that container/heap operates on.
type periodicHeapImp []*periodicJob

// Len returns the number of entries in the slice.
func (h periodicHeapImp) Len() int { return len(h) }
// Less orders entries by next launch time. A zero time is treated as greater
// than any other time so that entries without a valid launch sort last; two
// zero times are not less than each other.
func (h periodicHeapImp) Less(i, j int) bool {
	a, b := h[i].next, h[j].next
	switch {
	case a.IsZero():
		// Covers both "only i is zero" and "both are zero".
		return false
	case b.IsZero():
		return true
	default:
		return a.Before(b)
	}
}
// Swap exchanges the entries at positions i and j and records each entry's new
// position in its index field.
func (h periodicHeapImp) Swap(i, j int) {
	first, second := h[i], h[j]
	h[i], h[j] = second, first
	second.index = i
	first.index = j
}
// Push appends x, which must be a *periodicJob, to the slice and records its
// position. It is meant to be called by container/heap.
func (h *periodicHeapImp) Push(x interface{}) {
	entry := x.(*periodicJob)
	entry.index = len(*h)
	*h = append(*h, entry)
}
// Pop removes and returns the last entry of the slice. It is meant to be
// called by container/heap, which moves the minimum entry to the end first.
func (h *periodicHeapImp) Pop() interface{} {
	old := *h
	n := len(old)
	job := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped
	// entry (and the job it references) reachable.
	old[n-1] = nil
	job.index = -1 // for safety
	*h = old[0 : n-1]
	return job
}
// update sets the next launch time of an entry already in the heap and
// re-establishes heap order around its position.
func (h *periodicHeapImp) update(job *periodicJob, next time.Time) {
	position := job.index
	job.next = next
	heap.Fix(h, position)
}

View File

@@ -2,10 +2,20 @@ package nomad
import (
"fmt"
"math/rand"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
// MockPeriodic can be used by other tests that want to mock the periodic
// dispatcher.
type MockPeriodic struct {
Enabled bool
Jobs map[string]*structs.Job
@@ -36,3 +46,485 @@ func (m *MockPeriodic) Remove(jobID string) error {
// ForceRun is a no-op in the mock and always returns nil.
func (m *MockPeriodic) ForceRun(jobID string) error {
	return nil
}
// Start is a no-op in the mock.
func (m *MockPeriodic) Start() {}
// Flush discards every job recorded by the mock.
func (m *MockPeriodic) Flush() {
	m.Jobs = make(map[string]*structs.Job)
}
// Tracked returns a copy of every job recorded by the mock, in map iteration
// order.
func (m *MockPeriodic) Tracked() []structs.Job {
	jobs := make([]structs.Job, 0, len(m.Jobs))
	for _, recorded := range m.Jobs {
		jobs = append(jobs, *recorded)
	}
	return jobs
}
// TestPeriodicDispatch_DisabledOperations checks that Add and Remove are
// rejected once the dispatcher has been disabled.
func TestPeriodicDispatch_DisabledOperations(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	// Turn the dispatcher off before issuing any operation.
	dispatcher := srv.periodicDispatcher
	dispatcher.SetEnabled(false)

	job := mock.PeriodicJob()
	if dispatcher.Add(job) == nil {
		t.Fatalf("Add on disabled dispatcher should fail")
	}

	if dispatcher.Remove(job.ID) == nil {
		t.Fatalf("Remove on disabled dispatcher should fail")
	}
}
// TestPeriodicDispatch_Add_NonPeriodic checks that adding a non-periodic job
// succeeds without tracking anything.
func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	dispatcher := srv.periodicDispatcher
	err := dispatcher.Add(mock.Job())
	if err != nil {
		t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
	}

	if tracked := dispatcher.Tracked(); len(tracked) != 0 {
		t.Fatalf("Add of non-periodic job should be no-op: %v", tracked)
	}
}
// TestPeriodicDispatch_Add_UpdateJob checks that re-adding a tracked job
// replaces the tracked copy instead of adding a second one.
func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	dispatcher := srv.periodicDispatcher
	job := mock.PeriodicJob()

	// First add: the job becomes tracked.
	if err := dispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	before := dispatcher.Tracked()
	if len(before) != 1 {
		t.Fatalf("Add didn't track the job: %v", before)
	}

	// Second add with a changed spec: still one job, carrying the new spec.
	job.Periodic.Spec = "foo"
	if err := dispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	after := dispatcher.Tracked()
	if len(after) != 1 {
		t.Fatalf("Add didn't update: %v", after)
	}

	if !reflect.DeepEqual(*job, after[0]) {
		t.Fatalf("Add didn't properly update: got %v; want %v", after[0], job)
	}
}
// TestPeriodicDispatch_Add_TriggersUpdate checks that moving a tracked job's
// launch time earlier wakes the running dispatcher so the eval is created at
// the new time.
func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	dispatcher := srv.periodicDispatcher
	dispatcher.Start()

	// Track a job that won't be evaluated for a while.
	job := testPeriodicJob(time.Now().Add(10 * time.Second))
	if err := dispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// Pull the launch time in and re-add.
	expected := time.Now().Add(1 * time.Second)
	job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
	if err := dispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	time.Sleep(2 * time.Second)

	// Exactly one eval must exist, and it must belong to the earlier launch.
	evals, err := dispatcher.CreatedEvals(job.ID)
	if err != nil {
		t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err)
	}
	if len(evals) != 1 {
		t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals)
	}

	got, want := evals[0].JobID, dispatcher.derivedJobID(job, expected)
	if got != want {
		t.Fatalf("periodic dispatcher created eval at the wrong time; got %v; want %v",
			got, want)
	}
}
// TestPeriodicDispatch_Remove_Untracked checks that removing an unknown job ID
// is a no-op rather than an error.
func TestPeriodicDispatch_Remove_Untracked(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	err := srv.periodicDispatcher.Remove("foo")
	if err != nil {
		t.Fatalf("Remove failed %v; expected a no-op", err)
	}
}
// TestPeriodicDispatch_Remove_Tracked checks that a tracked job is no longer
// reported by Tracked after it has been removed.
func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	dispatcher := srv.periodicDispatcher
	job := mock.PeriodicJob()

	if err := dispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}
	if tracked := dispatcher.Tracked(); len(tracked) != 1 {
		t.Fatalf("Add didn't track the job: %v", tracked)
	}

	if err := dispatcher.Remove(job.ID); err != nil {
		t.Fatalf("Remove failed %v", err)
	}
	if tracked := dispatcher.Tracked(); len(tracked) != 0 {
		t.Fatalf("Remove didn't untrack the job: %v", tracked)
	}
}
// TestPeriodicDispatch_Remove_TriggersUpdate checks that removing a job that
// is about to launch prevents the running dispatcher from creating an eval
// for it.
func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
	s1 := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Start the periodic dispatcher.
	s1.periodicDispatcher.Start()

	// Create a job that will be evaluated soon.
	job := testPeriodicJob(time.Now().Add(1 * time.Second))

	// Add it.
	if err := s1.periodicDispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// Remove the job. The failure message names Remove so a failure here
	// is not mistaken for a failed Add.
	if err := s1.periodicDispatcher.Remove(job.ID); err != nil {
		t.Fatalf("Remove failed %v", err)
	}

	time.Sleep(2 * time.Second)

	// Check that an eval wasn't created.
	evals, err := s1.periodicDispatcher.CreatedEvals(job.ID)
	if err != nil {
		t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err)
	}

	if len(evals) != 0 {
		t.Fatalf("Remove didn't cancel creation of an eval: %#v", evals)
	}
}
// TestPeriodicDispatch_ForceRun_Untracked checks that force-running an unknown
// job ID is reported as an error.
func TestPeriodicDispatch_ForceRun_Untracked(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	err := srv.periodicDispatcher.ForceRun("foo")
	if err == nil {
		t.Fatal("ForceRun of untracked job should fail")
	}
}
// TestPeriodicDispatch_ForceRun_Tracked checks that force-running a tracked
// job creates an eval even though its scheduled launch is still in the future.
func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	dispatcher := srv.periodicDispatcher
	dispatcher.Start()

	// Track a job that won't be evaluated for a while.
	job := testPeriodicJob(time.Now().Add(10 * time.Second))
	if err := dispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	// Launch it right away.
	if err := dispatcher.ForceRun(job.ID); err != nil {
		t.Fatalf("ForceRun failed %v", err)
	}

	// The forced launch must have produced exactly one eval.
	evals, err := dispatcher.CreatedEvals(job.ID)
	if err != nil {
		t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err)
	}
	if len(evals) != 1 {
		t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals)
	}
}
// TestPeriodicDispatch_Run_Multiple checks that a job with two launch times is
// launched once per launch time.
func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
	s1 := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Start the periodic dispatcher.
	s1.periodicDispatcher.Start()

	// Create a job that will be launched twice.
	launch1 := time.Now().Add(1 * time.Second)
	launch2 := time.Now().Add(2 * time.Second)
	job := testPeriodicJob(launch1, launch2)

	// Add it.
	if err := s1.periodicDispatcher.Add(job); err != nil {
		t.Fatalf("Add failed %v", err)
	}

	time.Sleep(3 * time.Second)

	// Check that the evals were created correctly
	evals, err := s1.periodicDispatcher.CreatedEvals(job.ID)
	if err != nil {
		t.Fatalf("CreatedEvals(%v) failed %v", job.ID, err)
	}

	d := s1.periodicDispatcher
	expected := []string{d.derivedJobID(job, launch1), d.derivedJobID(job, launch2)}

	// Without this check the loop below passes vacuously when no evals were
	// created and indexes out of range when too many were.
	if len(evals) != len(expected) {
		t.Fatalf("Unexpected number of evals created; got %#v; want %d", evals, len(expected))
	}

	for i, eval := range evals {
		if eval.JobID != expected[i] {
			t.Fatalf("eval created incorrectly; got %v; want %v", eval.JobID, expected[i])
		}
	}
}
// TestPeriodicDispatch_Run_SameTime checks that two jobs sharing a launch time
// each get exactly one eval for that time.
func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	dispatcher := srv.periodicDispatcher
	dispatcher.Start()

	// Two jobs, one shared launch time.
	launch := time.Now().Add(1 * time.Second)
	pair := []*structs.Job{testPeriodicJob(launch), testPeriodicJob(launch)}

	for _, j := range pair {
		if err := dispatcher.Add(j); err != nil {
			t.Fatalf("Add failed %v", err)
		}
	}

	time.Sleep(2 * time.Second)

	// Each job must have a single eval derived from the shared launch time.
	for _, j := range pair {
		evals, err := dispatcher.CreatedEvals(j.ID)
		if err != nil {
			t.Fatalf("CreatedEvals(%v) failed %v", j.ID, err)
		}

		if len(evals) != 1 {
			t.Fatalf("expected 1 eval; got %#v", evals)
		}

		want := dispatcher.derivedJobID(j, launch)
		if got := evals[0].JobID; got != want {
			t.Fatalf("eval created incorrectly; got %v; want %v", got, want)
		}
	}
}
// TestPeriodicDispatch_Complex adds and removes a mix of jobs (some sharing a
// launch time, some launching one after another, one with only a past launch
// time) and verifies that exactly the expected evals are created.
func TestPeriodicDispatch_Complex(t *testing.T) {
	srv := testServer(t, func(c *Config) {
		c.NumSchedulers = 0 // Prevent automatic dequeue
	})
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	d := srv.periodicDispatcher
	d.Start()

	// Launch times used below.
	now := time.Now()
	same := now.Add(1 * time.Second)
	launch1 := same.Add(1 * time.Second)
	launch2 := same.Add(1500 * time.Millisecond)
	launch3 := same.Add(2 * time.Second)
	invalid := now.Add(-200 * time.Second)

	job1 := testPeriodicJob(same)             // launches together with job2
	job2 := testPeriodicJob(same)             // launches together with job1
	job3 := testPeriodicJob(invalid)          // never launches
	job4 := testPeriodicJob(launch1, launch3) // launches twice
	job5 := testPeriodicJob(launch2)          // launches once

	// These three are removed again before they can launch.
	job6 := testPeriodicJob(same)
	job7 := testPeriodicJob(launch1, launch3)
	job8 := testPeriodicJob(launch2)

	// Expected derived job IDs per periodic job.
	expected := map[string][]string{
		job1.ID: {d.derivedJobID(job1, same)},
		job2.ID: {d.derivedJobID(job2, same)},
		job3.ID: nil,
		job4.ID: {
			d.derivedJobID(job4, launch1),
			d.derivedJobID(job4, launch3),
		},
		job5.ID: {d.derivedJobID(job5, launch2)},
		job6.ID: nil,
		job7.ID: nil,
		job8.ID: nil,
	}

	// Randomize the order of both the additions and the removals.
	jobs := []*structs.Job{job1, job2, job3, job4, job5, job6, job7, job8}
	toDelete := []*structs.Job{job6, job7, job8}
	shuffle(jobs)
	shuffle(toDelete)

	for _, added := range jobs {
		if err := d.Add(added); err != nil {
			t.Fatalf("Add failed %v", err)
		}
	}

	for _, removed := range toDelete {
		if err := d.Remove(removed.ID); err != nil {
			t.Fatalf("Remove failed %v", err)
		}
	}

	time.Sleep(4 * time.Second)

	// Collect the derived job IDs of the evals created for every job.
	actual := make(map[string][]string, len(expected))
	for _, tracked := range jobs {
		evals, err := d.CreatedEvals(tracked.ID)
		if err != nil {
			t.Fatalf("CreatedEvals(%v) failed %v", tracked.ID, err)
		}

		var derivedIDs []string
		for _, eval := range evals {
			derivedIDs = append(derivedIDs, eval.JobID)
		}
		actual[tracked.ID] = derivedIDs
	}

	if !reflect.DeepEqual(actual, expected) {
		t.Fatalf("Unexpected evals; got %#v; want %#v", actual, expected)
	}
}
// shuffle randomly permutes jobs in place using a Fisher-Yates shuffle.
func shuffle(jobs []*structs.Job) {
	// Seed with nanosecond resolution so two calls made within the same
	// second do not produce the same permutation.
	rand.Seed(time.Now().UnixNano())

	// Walk from the end, swapping each position with a uniformly chosen
	// index at or below it. Picking from the whole slice at every step
	// yields a biased permutation.
	for i := len(jobs) - 1; i > 0; i-- {
		j := rand.Intn(i + 1)
		jobs[i], jobs[j] = jobs[j], jobs[i]
	}
}
// testPeriodicJob returns a mock periodic job that uses the test spec type.
// Its spec is the comma separated list of the passed launch times as Unix
// timestamps, in the order given.
func testPeriodicJob(times ...time.Time) *structs.Job {
	job := mock.PeriodicJob()
	job.Periodic.SpecType = structs.PeriodicSpecTest

	l := make([]string, len(times))
	for i, t := range times {
		// Format the int64 directly; converting through int would truncate
		// the timestamp on platforms where int is 32 bits wide.
		l[i] = strconv.FormatInt(t.Unix(), 10)
	}

	job.Periodic.Spec = strings.Join(l, ",")
	return job
}
// TODO: Check that it doesn't create evals for overlapping things.
// TestPeriodicHeap_Order checks that entries pop in ascending launch-time
// order, with a zero launch time sorting last.
func TestPeriodicHeap_Order(t *testing.T) {
	h := NewPeriodicHeap()
	j1, j2, j3 := mock.PeriodicJob(), mock.PeriodicJob(), mock.PeriodicJob()

	names := map[*structs.Job]string{
		j1: "j1",
		j2: "j2",
		j3: "j3",
	}

	// j1 has no valid launch time; j2 launches before j3.
	h.Push(j1, time.Time{})
	h.Push(j2, time.Unix(10, 0))
	h.Push(j3, time.Unix(11, 0))

	want := []string{"j2", "j3", "j1"}
	var got []string
	for range want {
		popped, err := h.Pop()
		if err != nil {
			t.Fatal(err)
		}
		got = append(got, names[popped.job])
	}

	if !reflect.DeepEqual(got, want) {
		t.Fatalf("Wrong ordering; got %v; want %v", got, want)
	}
}

View File

@@ -113,8 +113,8 @@ type Server struct {
// plans that are waiting to be assessed by the leader
planQueue *PlanQueue
// periodicRunner is used to track and create evaluations for periodic jobs.
periodicRunner PeriodicRunner
// periodicDispatcher is used to track and create evaluations for periodic jobs.
periodicDispatcher *PeriodicDispatch
// heartbeatTimers track the expiration time of each heartbeat that has
// a TTL. On expiration, the node status is updated to be 'down'.
@@ -184,6 +184,9 @@ func NewServer(config *Config) (*Server, error) {
shutdownCh: make(chan struct{}),
}
// Create the periodic dispatcher for launching periodic jobs.
s.periodicDispatcher = NewPeriodicDispatch(s)
// Initialize the RPC layer
// TODO: TLS...
if err := s.setupRPC(nil); err != nil {
@@ -409,7 +412,7 @@ func (s *Server) setupRaft() error {
// Create the FSM
var err error
s.fsm, err = NewFSM(s.evalBroker, s.periodicRunner, s.config.LogOutput)
s.fsm, err = NewFSM(s.evalBroker, s.periodicDispatcher, s.config.LogOutput)
if err != nil {
return err
}

View File

@@ -109,6 +109,15 @@ func jobTableSchema() *memdb.TableSchema {
Conditional: jobIsGCable,
},
},
"parent": &memdb.IndexSchema{
Name: "parent",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "ParentID",
Lowercase: true,
},
},
},
}
}

View File

@@ -359,6 +359,18 @@ func (s *StateStore) Jobs() (memdb.ResultIterator, error) {
return iter, nil
}
// ChildJobs returns an iterator over every job whose parent ID is the passed
// id, using the "parent" index of the jobs table.
func (s *StateStore) ChildJobs(id string) (memdb.ResultIterator, error) {
	// A read-only transaction is sufficient for the index scan.
	children, err := s.db.Txn(false).Get("jobs", "parent", id)
	if err != nil {
		return nil, err
	}
	return children, nil
}
// JobsByScheduler returns an iterator over all the jobs with the specific
// scheduler type.
func (s *StateStore) JobsByScheduler(schedulerType string) (memdb.ResultIterator, error) {

View File

@@ -404,6 +404,48 @@ func TestStateStore_Jobs(t *testing.T) {
}
}
// TestStateStore_ChildJobs checks that ChildJobs yields exactly the jobs that
// were stored with the parent's ID as their ParentID.
func TestStateStore_ChildJobs(t *testing.T) {
	state := testStateStore(t)

	parent := mock.Job()
	if err := state.UpsertJob(999, parent); err != nil {
		t.Fatalf("err: %v", err)
	}

	// Store ten children of the parent.
	var want []*structs.Job
	for i := uint64(0); i < 10; i++ {
		child := mock.Job()
		child.ParentID = parent.ID
		want = append(want, child)

		if err := state.UpsertJob(1000+i, child); err != nil {
			t.Fatalf("err: %v", err)
		}
	}

	iter, err := state.ChildJobs(parent.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Drain the iterator.
	var got []*structs.Job
	for raw := iter.Next(); raw != nil; raw = iter.Next() {
		got = append(got, raw.(*structs.Job))
	}

	// Compare independent of iteration order.
	sort.Sort(JobIDSort(want))
	sort.Sort(JobIDSort(got))

	if !reflect.DeepEqual(want, got) {
		t.Fatalf("bad: %#v %#v", want, got)
	}
}
func TestStateStore_JobsByScheduler(t *testing.T) {
state := testStateStore(t)
var serviceJobs []*structs.Job

View File

@@ -8,6 +8,7 @@ import (
"io"
"reflect"
"regexp"
"strconv"
"strings"
"time"
@@ -16,6 +17,7 @@ import (
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/helper/args"
"github.com/mitchellh/copystructure"
)
var (
@@ -719,6 +721,9 @@ type Job struct {
// specified hierarchically like LineOfBiz/OrgName/Team/Project
ID string
// ParentID is the unique identifier of the job that spawned this job.
ParentID string
// Name is the logical name of the job used to refer to it. This is unique
// per region, but not unique globally.
Name string
@@ -787,6 +792,17 @@ func (j *Job) InitFields() {
}
}
// Copy returns a deep copy of the Job. The copy is made with copystructure and
// this method panics if that fails, so callers are expected to use recover.
func (j *Job) Copy() *Job {
	dup, err := copystructure.Copy(j)
	if err == nil {
		return dup.(*Job)
	}
	panic(err)
}
// Validate is used to sanity check a job input
func (j *Job) Validate() error {
var mErr multierror.Error
@@ -914,6 +930,10 @@ func (u *UpdateStrategy) Rolling() bool {
const (
	// PeriodicSpecCron is used for a cron spec.
	PeriodicSpecCron = "cron"

	// PeriodicSpecTest is only used by unit tests. It is a sorted, comma
	// separated list of unix timestamps at which to launch.
	PeriodicSpecTest = "test"
)
// Periodic defines the interval a job should be run at.
@@ -944,6 +964,8 @@ func (p *PeriodicConfig) Validate() error {
if _, err := cronexpr.Parse(p.Spec); err != nil {
return fmt.Errorf("Invalid cron spec %q: %v", p.Spec, err)
}
case PeriodicSpecTest:
// No-op
default:
return fmt.Errorf("Unknown specification type %q", p.SpecType)
}
@@ -961,6 +983,29 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
if e, err := cronexpr.Parse(p.Spec); err == nil {
return e.Next(fromTime)
}
case PeriodicSpecTest:
split := strings.Split(p.Spec, ",")
if len(split) == 1 && split[0] == "" {
return time.Time{}
}
// Parse the times
times := make([]time.Time, len(split))
for i, s := range split {
unix, err := strconv.Atoi(s)
if err != nil {
return time.Time{}
}
times[i] = time.Unix(int64(unix), 0)
}
// Find the next match
for _, next := range times {
if fromTime.Before(next) {
return next
}
}
}
return time.Time{}

View File

@@ -93,6 +93,80 @@ func TestJob_Validate(t *testing.T) {
}
}
// TestJob_Copy checks that Copy returns a job that is deeply equal to the one
// it was called on.
func TestJob_Copy(t *testing.T) {
	original := &Job{
		Region:      "global",
		ID:          GenerateUUID(),
		Name:        "my-job",
		Type:        JobTypeService,
		Priority:    50,
		AllAtOnce:   false,
		Datacenters: []string{"dc1"},
		Constraints: []*Constraint{
			{
				LTarget: "$attr.kernel.name",
				RTarget: "linux",
				Operand: "=",
			},
		},
		Periodic: &PeriodicConfig{
			Enabled: false,
		},
		TaskGroups: []*TaskGroup{
			{
				Name:  "web",
				Count: 10,
				RestartPolicy: &RestartPolicy{
					Attempts: 3,
					Interval: 10 * time.Minute,
					Delay:    1 * time.Minute,
				},
				Tasks: []*Task{
					{
						Name:   "web",
						Driver: "exec",
						Config: map[string]interface{}{
							"command": "/bin/date",
						},
						Env: map[string]string{
							"FOO": "bar",
						},
						Services: []*Service{
							{
								Name:      "${TASK}-frontend",
								PortLabel: "http",
							},
						},
						Resources: &Resources{
							CPU:      500,
							MemoryMB: 256,
							Networks: []*NetworkResource{
								{
									MBits:        50,
									DynamicPorts: []Port{{Label: "http"}},
								},
							},
						},
					},
				},
				Meta: map[string]string{
					"elb_check_type":     "http",
					"elb_check_interval": "30s",
					"elb_check_min":      "3",
				},
			},
		},
		Meta: map[string]string{
			"owner": "armon",
		},
	}

	duplicate := original.Copy()
	if !reflect.DeepEqual(original, duplicate) {
		t.Fatalf("Copy() returned an unequal Job; got %v; want %v", duplicate, original)
	}
}
func TestJob_IsPeriodic(t *testing.T) {
j := &Job{
Type: JobTypeService,