From eb1126be230e45ed79c2e95ed45d0f613ee9abcd Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 16 Dec 2015 13:46:09 -0800 Subject: [PATCH] fixes from review --- nomad/fsm.go | 4 ++-- nomad/fsm_test.go | 4 ++-- nomad/periodic.go | 19 +++++++++-------- nomad/periodic_test.go | 6 +++--- nomad/state/state_store.go | 36 +++++++++++++++++++++------------ nomad/state/state_store_test.go | 27 +++++++++++++++++++------ nomad/structs/structs.go | 6 +++++- 7 files changed, 65 insertions(+), 37 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index bfff13920..334957754 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -217,7 +217,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { } // Record the insertion time as a launch. - launch := &structs.PeriodicLaunch{req.Job.ID, time.Now()} + launch := &structs.PeriodicLaunch{ID: req.Job.ID, Launch: time.Now()} if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err) return err @@ -242,7 +242,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err) return err } - launch := &structs.PeriodicLaunch{parentID, t} + launch := &structs.PeriodicLaunch{ID: parentID, Launch: t} if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil { n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err) return err diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 332eaccc4..c1bf4dfa5 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -645,10 +645,10 @@ func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) { fsm := testFSM(t) state := fsm.State() job1 := mock.Job() - launch1 := &structs.PeriodicLaunch{job1.ID, time.Now()} + launch1 := &structs.PeriodicLaunch{ID: job1.ID, Launch: time.Now()} state.UpsertPeriodicLaunch(1000, launch1) job2 := mock.Job() - launch2 := &structs.PeriodicLaunch{job2.ID, time.Now()} + launch2 := &structs.PeriodicLaunch{ID: job2.ID, Launch: time.Now()} state.UpsertPeriodicLaunch(1001, launch2) // Verify the contents diff --git a/nomad/periodic.go b/nomad/periodic.go index ac3647b3a..08f45b91f 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -27,7 +27,7 @@ type PeriodicRunner interface { Add(job *structs.Job) error Remove(jobID string) error ForceRun(jobID string) error - Tracked() []structs.Job + Tracked() []*structs.Job Flush() LaunchTime(jobID string) (time.Time, error) } @@ -89,13 +89,13 @@ func (p *PeriodicDispatch) Start() { } // Tracked returns the set of tracked job IDs. -func (p *PeriodicDispatch) Tracked() []structs.Job { +func (p *PeriodicDispatch) Tracked() []*structs.Job { p.l.RLock() defer p.l.RUnlock() - tracked := make([]structs.Job, len(p.tracked)) + tracked := make([]*structs.Job, len(p.tracked)) i := 0 for _, job := range p.tracked { - tracked[i] = *job + tracked[i] = job i++ } return tracked @@ -115,12 +115,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error { // If we were tracking a job and it has been disabled or made non-periodic remove it. disabled := !job.IsPeriodic() || !job.Periodic.Enabled _, tracked := p.tracked[job.ID] - if tracked && disabled { - return p.removeLocked(job.ID) - } - - // If the job is diabled and we aren't tracking it, do nothing. if disabled { + if tracked { + p.removeLocked(job.ID) + } + + // If the job is diabled and we aren't tracking it, do nothing. return nil } @@ -223,7 +223,6 @@ PICK: p.l.RLock() if p.heap.Length() == 0 { p.l.RUnlock() - p.logger.Printf("[DEBUG] nomad.periodic: no periodic jobs; waiting") select { case <-p.stopCh: return diff --git a/nomad/periodic_test.go b/nomad/periodic_test.go index 5ca4ff6a4..47604a17f 100644 --- a/nomad/periodic_test.go +++ b/nomad/periodic_test.go @@ -58,11 +58,11 @@ func (m *MockPeriodic) Flush() { m.Jobs = make(map[string]*structs.Job) } -func (m *MockPeriodic) Tracked() []structs.Job { - tracked := make([]structs.Job, len(m.Jobs)) +func (m *MockPeriodic) Tracked() []*structs.Job { + tracked := make([]*structs.Job, len(m.Jobs)) i := 0 for _, job := range m.Jobs { - tracked[i] = *job + tracked[i] = job i++ } return tracked diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 070bb8aad..d73616696 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -131,10 +131,6 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error { txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "nodes"}) - watcher.Add(watch.Item{Node: nodeID}) - // Lookup the node existing, err := txn.First("nodes", "id", nodeID) if err != nil { @@ -144,6 +140,10 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error { return fmt.Errorf("node not found") } + watcher := watch.NewItems() + watcher.Add(watch.Item{Table: "nodes"}) + watcher.Add(watch.Item{Node: nodeID}) + // Delete the node if err := txn.Delete("nodes", existing); err != nil { return fmt.Errorf("node delete failed: %v", err) @@ -306,10 +306,6 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "jobs"}) - watcher.Add(watch.Item{Job: jobID}) - // Lookup the node existing, err := txn.First("jobs", "id", jobID) if err != nil { @@ -319,6 +315,10 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error { return fmt.Errorf("job not found") } + watcher := watch.NewItems() + watcher.Add(watch.Item{Table: "jobs"}) + watcher.Add(watch.Item{Job: jobID}) + // Delete the node if err := txn.Delete("jobs", existing); err != nil { return fmt.Errorf("job delete failed: %v", err) @@ -417,10 +417,20 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic watcher.Add(watch.Item{Job: launch.ID}) // Check if the job already exists - if _, err := txn.First("periodic_launch", "id", launch.ID); err != nil { + existing, err := txn.First("periodic_launch", "id", launch.ID) + if err != nil { return fmt.Errorf("periodic launch lookup failed: %v", err) } + // Setup the indexes correctly + if existing != nil { + launch.CreateIndex = existing.(*structs.PeriodicLaunch).CreateIndex + launch.ModifyIndex = index + } else { + launch.CreateIndex = index + launch.ModifyIndex = index + } + // Insert the job if err := txn.Insert("periodic_launch", launch); err != nil { return fmt.Errorf("launch insert failed: %v", err) @@ -439,10 +449,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error { txn := s.db.Txn(true) defer txn.Abort() - watcher := watch.NewItems() - watcher.Add(watch.Item{Table: "periodic_launch"}) - watcher.Add(watch.Item{Job: jobID}) - // Lookup the launch existing, err := txn.First("periodic_launch", "id", jobID) if err != nil { @@ -452,6 +458,10 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error { return fmt.Errorf("launch not found") } + watcher := watch.NewItems() + watcher.Add(watch.Item{Table: "periodic_launch"}) + watcher.Add(watch.Item{Job: jobID}) + // Delete the launch if err := txn.Delete("periodic_launch", existing); err != nil { return fmt.Errorf("launch delete failed: %v", err) diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 18515481d..1e158949b 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -672,7 +672,7 @@ func TestStateStore_RestoreJob(t *testing.T) { func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job() - launch := &structs.PeriodicLaunch{job.ID, time.Now()} + launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()} notify := setupNotifyTest( state, @@ -688,6 +688,12 @@ func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + if out.CreateIndex != 1000 { + t.Fatalf("bad: %#v", out) + } + if out.ModifyIndex != 1000 { + t.Fatalf("bad: %#v", out) + } if !reflect.DeepEqual(launch, out) { t.Fatalf("bad: %#v %#v", job, out) @@ -707,7 +713,7 @@ func TestStateStore_UpsertPeriodicLaunch(t *testing.T) { func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job() - launch := &structs.PeriodicLaunch{job.ID, time.Now()} + launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()} notify := setupNotifyTest( state, @@ -719,7 +725,10 @@ func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) { t.Fatalf("err: %v", err) } - launch2 := &structs.PeriodicLaunch{job.ID, launch.Launch.Add(1 * time.Second)} + launch2 := &structs.PeriodicLaunch{ + ID: job.ID, + Launch: launch.Launch.Add(1 * time.Second), + } err = state.UpsertPeriodicLaunch(1001, launch2) if err != nil { t.Fatalf("err: %v", err) @@ -729,6 +738,12 @@ func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) { if err != nil { t.Fatalf("err: %v", err) } + if out.CreateIndex != 1000 { + t.Fatalf("bad: %#v", out) + } + if out.ModifyIndex != 1001 { + t.Fatalf("bad: %#v", out) + } if !reflect.DeepEqual(launch2, out) { t.Fatalf("bad: %#v %#v", launch2, out) @@ -748,7 +763,7 @@ func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) { func TestStateStore_DeletePeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job() - launch := &structs.PeriodicLaunch{job.ID, time.Now()} + launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()} notify := setupNotifyTest( state, @@ -791,7 +806,7 @@ func TestStateStore_PeriodicLaunches(t *testing.T) { for i := 0; i < 10; i++ { job := mock.Job() - launch := &structs.PeriodicLaunch{job.ID, time.Now()} + launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()} launches = append(launches, launch) err := state.UpsertPeriodicLaunch(1000+uint64(i), launch) @@ -840,7 +855,7 @@ func TestStateStore_PeriodicLaunches(t *testing.T) { func TestStateStore_RestorePeriodicLaunch(t *testing.T) { state := testStateStore(t) job := mock.Job() - launch := &structs.PeriodicLaunch{job.ID, time.Now()} + launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()} notify := setupNotifyTest( state, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a33befbd3..0773e94fb 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -939,7 +939,7 @@ const ( // PeriodicSpecTest is only used by unit tests. It is a sorted, comma // seperated list of unix timestamps at which to launch. - PeriodicSpecTest = "test" + PeriodicSpecTest = "_internal_test" ) // Periodic defines the interval a job should be run at. @@ -1021,6 +1021,10 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time { type PeriodicLaunch struct { ID string // ID of the periodic job. Launch time.Time // The last launch time. + + // Raft Indexes + CreateIndex uint64 + ModifyIndex uint64 } var (