This commit is contained in:
Alex Dadgar
2015-12-18 17:51:30 -08:00
parent c8f2d0c990
commit fc91fa6750
12 changed files with 336 additions and 72 deletions

View File

@@ -698,8 +698,8 @@ func parsePeriodic(result **structs.PeriodicConfig, list *ast.ObjectList) error
m["Enabled"] = enabled
}
// If "cron_spec" is provided, set the type to "cron" and store the spec.
if cron, ok := m["cron_spec"]; ok {
// If "cron" is provided, set the type to "cron" and store the spec.
if cron, ok := m["cron"]; ok {
m["SpecType"] = structs.PeriodicSpecCron
m["Spec"] = cron
}

View File

@@ -1,5 +1,5 @@
job "foo" {
periodic {
cron_spec = "*/5 * * *"
cron = "*/5 * * *"
}
}

View File

@@ -207,11 +207,46 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
return err
}
// If it is periodic, insert it into the periodic runner and record the
// time it was inserted.
if req.Job.IsPeriodic() {
if err := n.periodicRunner.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Add failed: %v", err)
return err
}
// Record the insertion time as a launch.
launch := &structs.PeriodicLaunch{req.Job.ID, time.Now()}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
return err
}
}
// Check if the parent job is periodic and mark the launch time.
parentID := req.Job.ParentID
if parentID != "" {
parent, err := n.state.JobByID(parentID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID(%v) lookup for parent failed: %v", parentID, err)
return err
} else if parent == nil {
// The parent has been deregistered.
return nil
}
if parent.IsPeriodic() {
t, err := n.periodicRunner.LaunchTime(req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
return err
}
launch := &structs.PeriodicLaunch{parentID, t}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
return err
}
}
}
return nil
@@ -224,14 +259,27 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}
job, err := n.state.JobByID(req.JobID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}
if err := n.state.DeleteJob(index, req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}
if err := n.periodicRunner.Remove(req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Remove failed: %v", err)
return err
if job.IsPeriodic() {
if err := n.periodicRunner.Remove(req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicRunner.Remove failed: %v", err)
return err
}
if err := n.state.DeletePeriodicLaunch(index, req.JobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeletePeriodicLaunch failed: %v", err)
return err
}
}
return nil

View File

@@ -222,9 +222,7 @@ func TestFSM_UpdateNodeDrain(t *testing.T) {
func TestFSM_RegisterJob(t *testing.T) {
fsm := testFSM(t)
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
}
@@ -254,14 +252,24 @@ func TestFSM_RegisterJob(t *testing.T) {
if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; !ok {
t.Fatal("job not added to periodic runner")
}
// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}
func TestFSM_DeregisterJob(t *testing.T) {
fsm := testFSM(t)
job := mock.Job()
job.Type = structs.JobTypeBatch
job.Periodic = &structs.PeriodicConfig{Enabled: true}
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
}
@@ -301,6 +309,15 @@ func TestFSM_DeregisterJob(t *testing.T) {
if _, ok := fsm.periodicRunner.(*MockPeriodic).Jobs[job.ID]; ok {
t.Fatal("job not removed from periodic runner")
}
// Verify it was removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut != nil {
t.Fatalf("launch found!")
}
}
func TestFSM_UpdateEval(t *testing.T) {

View File

@@ -188,43 +188,19 @@ func (s *Server) restorePeriodicDispatcher() error {
}
now := time.Now()
var last time.Time
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
s.periodicDispatcher.Add(job)
// Need to force run the job if an evaluation should have been created
// during the leader election period. At a high-level the logic to
// determine whether to force run a job is split based on whether an
// eval has been created for it. If so we check that since the last
// eval, should there have been a launch. If there is no eval, we check
// if there should have been a launch since the job was inserted.
evals, err := s.periodicDispatcher.CreatedEvals(job.ID)
if err != nil {
return fmt.Errorf("failed to get the evals created for periodic job %v: %v",
job.ID, err)
launch, err := s.fsm.State().PeriodicLaunchByID(job.ID)
if err != nil || launch == nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
// Determine if we need to force run by checking if a run should have
// occured since the last eval
if l := len(evals); l != 0 {
last = evals[l-1].JobLaunch
if !job.Periodic.Next(last).Before(now) {
continue
}
goto FORCE
}
// Determine if we need to force run by checking if a run should have
// occured since the job was added.
last = s.fsm.TimeTable().NearestTime(job.ModifyIndex)
// TODO(alex): Think about the 0 time case
if !job.Periodic.Next(last).Before(now) {
if !job.Periodic.Next(launch.Launch).Before(now) {
continue
}
FORCE:
if err := s.periodicDispatcher.ForceRun(job.ID); err != nil {
s.logger.Printf(
"[ERR] nomad.periodic: force run of periodic job %q failed: %v",

View File

@@ -17,7 +17,7 @@ import (
const (
// The string appended to the periodic job's ID when launching derived
// instances of it.
JobLaunchSuffix = "-launch-"
JobLaunchSuffix = "/periodic-"
)
// PeriodicRunner is the interface for tracking and launching periodic jobs at
@@ -30,6 +30,7 @@ type PeriodicRunner interface {
ForceRun(jobID string) error
Tracked() []structs.Job
Flush()
LaunchTime(jobID string) (time.Time, error)
}
// PeriodicDispatch is used to track and launch periodic jobs. It maintains the
@@ -72,8 +73,10 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) {
p.enabled = enabled
p.l.Unlock()
if !enabled {
close(p.stopCh)
<-p.waitCh
if p.running {
close(p.stopCh)
<-p.waitCh
}
p.Flush()
}
}
@@ -107,7 +110,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
// Do nothing if not enabled
if !p.enabled {
return fmt.Errorf("periodic dispatch disabled")
return nil
}
// If we were tracking a job and it has been disabled or made non-periodic remove it.
@@ -156,7 +159,7 @@ func (p *PeriodicDispatch) Remove(jobID string) error {
// Do nothing if not enabled
if !p.enabled {
return fmt.Errorf("periodic dispatch disabled")
return nil
}
if job, tracked := p.tracked[jobID]; tracked {
@@ -374,6 +377,7 @@ func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time
// CreatedEvals returns the set of evaluations created from the passed periodic
// job in sorted order, with the earliest job launch first.
// TODO: Get rid of this
func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) (PeriodicEvals, error) {
state := p.srv.fsm.State()
iter, err := state.ChildJobs(periodicJobID)
@@ -390,7 +394,7 @@ func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) (PeriodicEvals, er
}
for _, eval := range childEvals {
launch, err := p.evalLaunchTime(eval)
launch, err := p.LaunchTime(eval.JobID)
if err != nil {
return nil, fmt.Errorf("failed to get launch time for eval %v: %v", eval, err)
}
@@ -422,11 +426,9 @@ func (p PeriodicEvals) Len() int { return len(p) }
func (p PeriodicEvals) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
func (p PeriodicEvals) Less(i, j int) bool { return p[i].JobLaunch.Before(p[j].JobLaunch) }
// evalLaunchTime returns the launch time of the job associated with the eval.
// This is only valid for evaluations created by PeriodicDispatch and will
// otherwise return an error.
func (p *PeriodicDispatch) evalLaunchTime(created *structs.Evaluation) (time.Time, error) {
jobID := created.JobID
// LaunchTime returns the launch time of the job. This is only valid for
// jobs created by PeriodicDispatch and will otherwise return an error.
func (p *PeriodicDispatch) LaunchTime(jobID string) (time.Time, error) {
index := strings.LastIndex(jobID, JobLaunchSuffix)
if index == -1 {
return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)

View File

@@ -47,6 +47,10 @@ func (m *MockPeriodic) ForceRun(jobID string) error {
return nil
}
// LaunchTime is the mock's implementation of the PeriodicRunner method of the
// same name. It ignores jobID and always reports the zero time with a nil
// error.
func (m *MockPeriodic) LaunchTime(jobID string) (time.Time, error) {
	var zero time.Time
	return zero, nil
}
func (m *MockPeriodic) Start() {}
func (m *MockPeriodic) Flush() {
@@ -76,27 +80,6 @@ func testPeriodicJob(times ...time.Time) *structs.Job {
return job
}
// TestPeriodicDispatch_DisabledOperations checks that Add and Remove both
// return an error once the periodic dispatcher has been disabled.
// NOTE(review): the disabled branches of PeriodicDispatch.Add and Remove in
// this same change set appear to return nil rather than an error, which would
// make these assertions fail — confirm this test is on the removed side of the
// diff.
func TestPeriodicDispatch_DisabledOperations(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
// Disable the dispatcher.
s1.periodicDispatcher.SetEnabled(false)
job := mock.PeriodicJob()
// Both mutating operations are expected to be rejected while disabled.
if err := s1.periodicDispatcher.Add(job); err == nil {
t.Fatalf("Add on disabled dispatcher should fail")
}
if err := s1.periodicDispatcher.Remove(job.ID); err == nil {
t.Fatalf("Remove on disabled dispatcher should fail")
}
}
func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {

View File

@@ -19,6 +19,7 @@ func stateStoreSchema() *memdb.DBSchema {
indexTableSchema,
nodeTableSchema,
jobTableSchema,
periodicLaunchTableSchema,
evalTableSchema,
allocTableSchema,
}
@@ -141,6 +142,28 @@ func jobIsGCable(obj interface{}) (bool, error) {
return j.GC, nil
}
// periodicLaunchTableSchema returns the MemDB schema for the "periodic_launch"
// table, which tracks the most recent launch time of a periodic job.
func periodicLaunchTableSchema() *memdb.TableSchema {
	// The table has a single primary index, "id", used for direct lookup of
	// a launch record. It is built from the record's ID field with the
	// Lowercase option set, must be present, and must be unique.
	idIndex := &memdb.IndexSchema{
		Name:         "id",
		AllowMissing: false,
		Unique:       true,
		Indexer: &memdb.StringFieldIndex{
			Field:     "ID",
			Lowercase: true,
		},
	}

	schema := &memdb.TableSchema{
		Name: "periodic_launch",
		Indexes: map[string]*memdb.IndexSchema{
			"id": idIndex,
		},
	}
	return schema
}
// evalTableSchema returns the MemDB schema for the eval table.
// This table is used to store all the evaluations that are pending
// or recently completed.

View File

@@ -408,6 +408,80 @@ func (s *StateStore) JobsByGC(gc bool) (memdb.ResultIterator, error) {
return iter, nil
}
// UpsertPeriodicLaunch stores the given periodic launch record, registering it
// or updating the record already held under the same ID, and records index as
// the latest index of the "periodic_launch" table. Watchers of the table and
// of the launch's job ID are notified through a callback registered with
// txn.Defer. Any lookup or insert failure is returned as an error and the
// deferred Abort discards the transaction.
func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.PeriodicLaunch) error {
	const table = "periodic_launch"

	tx := s.db.Txn(true)
	defer tx.Abort()

	// Items whose watchers should hear about this write.
	items := watch.NewItems()
	items.Add(watch.Item{Table: table})
	items.Add(watch.Item{Job: launch.ID})

	// Probe for an existing record. Only a failed lookup matters here; the
	// record itself is not consulted before the insert below.
	_, err := tx.First(table, "id", launch.ID)
	if err != nil {
		return fmt.Errorf("periodic launch lookup failed: %v", err)
	}

	// Write the launch record.
	err = tx.Insert(table, launch)
	if err != nil {
		return fmt.Errorf("launch insert failed: %v", err)
	}

	// Advance the table's entry in the index table.
	err = tx.Insert("index", &IndexEntry{table, index})
	if err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	tx.Defer(func() { s.watch.notify(items) })
	tx.Commit()
	return nil
}
// DeletePeriodicLaunch removes the periodic launch record stored under jobID
// and records index as the latest index of the "periodic_launch" table.
// It returns an error if the lookup fails, if no record exists for jobID, or
// if the delete or index update fails. Watchers of the table and of jobID are
// notified through a callback registered with txn.Defer.
func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error {
	const table = "periodic_launch"

	tx := s.db.Txn(true)
	defer tx.Abort()

	// Items whose watchers should hear about this delete.
	items := watch.NewItems()
	items.Add(watch.Item{Table: table})
	items.Add(watch.Item{Job: jobID})

	// Find the stored record; a missing record is treated as an error.
	record, err := tx.First(table, "id", jobID)
	switch {
	case err != nil:
		return fmt.Errorf("launch lookup failed: %v", err)
	case record == nil:
		return fmt.Errorf("launch not found")
	}

	// Remove the record, then advance the table's entry in the index table.
	err = tx.Delete(table, record)
	if err != nil {
		return fmt.Errorf("launch delete failed: %v", err)
	}
	err = tx.Insert("index", &IndexEntry{table, index})
	if err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}

	tx.Defer(func() { s.watch.notify(items) })
	tx.Commit()
	return nil
}
// PeriodicLaunchByID looks up the periodic launch record stored under the
// given periodic job ID using a read-only transaction. It returns (nil, nil)
// when no record exists, and a non-nil error only when the lookup itself
// fails.
func (s *StateStore) PeriodicLaunchByID(id string) (*structs.PeriodicLaunch, error) {
	raw, err := s.db.Txn(false).First("periodic_launch", "id", id)
	switch {
	case err != nil:
		return nil, fmt.Errorf("periodic launch lookup failed: %v", err)
	case raw == nil:
		// Nothing stored for this ID; that is not an error.
		return nil, nil
	}
	return raw.(*structs.PeriodicLaunch), nil
}
// UpsertEvals is used to upsert a set of evaluations
func (s *StateStore) UpsertEvals(index uint64, evals []*structs.Evaluation) error {
txn := s.db.Txn(true)

View File

@@ -5,6 +5,7 @@ import (
"reflect"
"sort"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -668,6 +669,122 @@ func TestStateStore_RestoreJob(t *testing.T) {
notify.verify(t)
}
// TestStateStore_UpsertPeriodicLaunch verifies that a newly upserted periodic
// launch can be read back by ID, that the "periodic_launch" table index is set
// to the upsert index, and that the table and job watches fire.
func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
	state := testStateStore(t)
	job := mock.Job()
	// Keyed fields keep the literal valid if PeriodicLaunch gains or reorders
	// fields.
	launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}

	notify := setupNotifyTest(
		state,
		watch.Item{Table: "periodic_launch"},
		watch.Item{Job: job.ID})

	err := state.UpsertPeriodicLaunch(1000, launch)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	out, err := state.PeriodicLaunchByID(job.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Report the two values actually being compared (the launch, not the job).
	if !reflect.DeepEqual(launch, out) {
		t.Fatalf("bad: %#v %#v", launch, out)
	}

	index, err := state.Index("periodic_launch")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if index != 1000 {
		t.Fatalf("bad: %d", index)
	}

	notify.verify(t)
}
// TestStateStore_UpdateUpsertPeriodicLaunch verifies that upserting a second
// launch for the same job ID replaces the stored record, that the
// "periodic_launch" table index reflects the later upsert, and that the table
// and job watches fire.
func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
	state := testStateStore(t)
	job := mock.Job()
	// Keyed fields keep the literals valid if PeriodicLaunch gains or reorders
	// fields.
	launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}

	notify := setupNotifyTest(
		state,
		watch.Item{Table: "periodic_launch"},
		watch.Item{Job: job.ID})

	err := state.UpsertPeriodicLaunch(1000, launch)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Same ID, launch time one second later: this should overwrite the first.
	launch2 := &structs.PeriodicLaunch{
		ID:     job.ID,
		Launch: launch.Launch.Add(1 * time.Second),
	}
	err = state.UpsertPeriodicLaunch(1001, launch2)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	out, err := state.PeriodicLaunchByID(job.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	if !reflect.DeepEqual(launch2, out) {
		t.Fatalf("bad: %#v %#v", launch2, out)
	}

	index, err := state.Index("periodic_launch")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if index != 1001 {
		t.Fatalf("bad: %d", index)
	}

	notify.verify(t)
}
// TestStateStore_DeletePeriodicLaunch verifies that a stored periodic launch
// is no longer returned after DeletePeriodicLaunch, that the "periodic_launch"
// table index is set to the delete index, and that the table and job watches
// fire.
func TestStateStore_DeletePeriodicLaunch(t *testing.T) {
	state := testStateStore(t)
	job := mock.Job()
	// Keyed fields keep the literal valid if PeriodicLaunch gains or reorders
	// fields.
	launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}

	notify := setupNotifyTest(
		state,
		watch.Item{Table: "periodic_launch"},
		watch.Item{Job: job.ID})

	err := state.UpsertPeriodicLaunch(1000, launch)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	err = state.DeletePeriodicLaunch(1001, job.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	out, err := state.PeriodicLaunchByID(job.ID)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// The record must be gone; on failure show the launch that was deleted
	// next to whatever was unexpectedly returned.
	if out != nil {
		t.Fatalf("bad: %#v %#v", launch, out)
	}

	index, err := state.Index("periodic_launch")
	if err != nil {
		t.Fatalf("err: %v", err)
	}
	if index != 1001 {
		t.Fatalf("bad: %d", index)
	}

	notify.verify(t)
}
func TestStateStore_Indexes(t *testing.T) {
state := testStateStore(t)
node := mock.Node()

View File

@@ -1017,6 +1017,12 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
return time.Time{}
}
// PeriodicLaunch records when a periodic job was last launched. Field order is
// significant: callers construct this type with positional literals.
type PeriodicLaunch struct {
	// ID is the ID of the periodic job.
	ID string

	// Launch is the last launch time.
	Launch time.Time
}
var (
defaultServiceJobRestartPolicy = RestartPolicy{
Delay: 15 * time.Second,

View File

@@ -156,6 +156,24 @@ The `job` object supports the following keys:
and "h" suffix can be used, such as "30s". Both values default to zero,
which disables rolling updates.
* `periodic` - `periodic` allows the job to be scheduled at fixed times, dates
or intervals. The `periodic` block has the following configuration:
```
periodic {
// Enabled is defaulted to true if the block is included. It can be set
// to false to pause the periodic job from running.
enabled = true
// A cron expression configuring the interval the job is launched at.
// Supports predefined expressions such as "@daily" and "@weekly"
cron = "*/15 * * * * *"
}
```
`cron`: See [here](https://github.com/gorhill/cronexpr#implementation)
for full documentation of supported cron specs and the predefined expressions.
### Task Group
The `group` object supports the following keys: