fixes from review

This commit is contained in:
Alex Dadgar
2015-12-16 13:46:09 -08:00
parent f7a01f39a6
commit eb1126be23
7 changed files with 65 additions and 37 deletions

View File

@@ -217,7 +217,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
}
// Record the insertion time as a launch.
launch := &structs.PeriodicLaunch{req.Job.ID, time.Now()}
launch := &structs.PeriodicLaunch{ID: req.Job.ID, Launch: time.Now()}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
return err
@@ -242,7 +242,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
n.logger.Printf("[ERR] nomad.fsm: LaunchTime(%v) failed: %v", req.Job.ID, err)
return err
}
launch := &structs.PeriodicLaunch{parentID, t}
launch := &structs.PeriodicLaunch{ID: parentID, Launch: t}
if err := n.state.UpsertPeriodicLaunch(index, launch); err != nil {
n.logger.Printf("[ERR] nomad.fsm: UpsertPeriodicLaunch failed: %v", err)
return err

View File

@@ -645,10 +645,10 @@ func TestFSM_SnapshotRestore_PeriodicLaunches(t *testing.T) {
fsm := testFSM(t)
state := fsm.State()
job1 := mock.Job()
launch1 := &structs.PeriodicLaunch{job1.ID, time.Now()}
launch1 := &structs.PeriodicLaunch{ID: job1.ID, Launch: time.Now()}
state.UpsertPeriodicLaunch(1000, launch1)
job2 := mock.Job()
launch2 := &structs.PeriodicLaunch{job2.ID, time.Now()}
launch2 := &structs.PeriodicLaunch{ID: job2.ID, Launch: time.Now()}
state.UpsertPeriodicLaunch(1001, launch2)
// Verify the contents

View File

@@ -27,7 +27,7 @@ type PeriodicRunner interface {
Add(job *structs.Job) error
Remove(jobID string) error
ForceRun(jobID string) error
Tracked() []structs.Job
Tracked() []*structs.Job
Flush()
LaunchTime(jobID string) (time.Time, error)
}
@@ -89,13 +89,13 @@ func (p *PeriodicDispatch) Start() {
}
// Tracked returns the set of tracked job IDs.
func (p *PeriodicDispatch) Tracked() []structs.Job {
func (p *PeriodicDispatch) Tracked() []*structs.Job {
p.l.RLock()
defer p.l.RUnlock()
tracked := make([]structs.Job, len(p.tracked))
tracked := make([]*structs.Job, len(p.tracked))
i := 0
for _, job := range p.tracked {
tracked[i] = *job
tracked[i] = job
i++
}
return tracked
@@ -115,12 +115,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
// If we were tracking a job and it has been disabled or made non-periodic remove it.
disabled := !job.IsPeriodic() || !job.Periodic.Enabled
_, tracked := p.tracked[job.ID]
if tracked && disabled {
return p.removeLocked(job.ID)
}
// If the job is diabled and we aren't tracking it, do nothing.
if disabled {
if tracked {
p.removeLocked(job.ID)
}
// If the job is diabled and we aren't tracking it, do nothing.
return nil
}
@@ -223,7 +223,6 @@ PICK:
p.l.RLock()
if p.heap.Length() == 0 {
p.l.RUnlock()
p.logger.Printf("[DEBUG] nomad.periodic: no periodic jobs; waiting")
select {
case <-p.stopCh:
return

View File

@@ -58,11 +58,11 @@ func (m *MockPeriodic) Flush() {
m.Jobs = make(map[string]*structs.Job)
}
func (m *MockPeriodic) Tracked() []structs.Job {
tracked := make([]structs.Job, len(m.Jobs))
func (m *MockPeriodic) Tracked() []*structs.Job {
tracked := make([]*structs.Job, len(m.Jobs))
i := 0
for _, job := range m.Jobs {
tracked[i] = *job
tracked[i] = job
i++
}
return tracked

View File

@@ -131,10 +131,6 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Lookup the node
existing, err := txn.First("nodes", "id", nodeID)
if err != nil {
@@ -144,6 +140,10 @@ func (s *StateStore) DeleteNode(index uint64, nodeID string) error {
return fmt.Errorf("node not found")
}
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "nodes"})
watcher.Add(watch.Item{Node: nodeID})
// Delete the node
if err := txn.Delete("nodes", existing); err != nil {
return fmt.Errorf("node delete failed: %v", err)
@@ -306,10 +306,6 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: jobID})
// Lookup the node
existing, err := txn.First("jobs", "id", jobID)
if err != nil {
@@ -319,6 +315,10 @@ func (s *StateStore) DeleteJob(index uint64, jobID string) error {
return fmt.Errorf("job not found")
}
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "jobs"})
watcher.Add(watch.Item{Job: jobID})
// Delete the node
if err := txn.Delete("jobs", existing); err != nil {
return fmt.Errorf("job delete failed: %v", err)
@@ -417,10 +417,20 @@ func (s *StateStore) UpsertPeriodicLaunch(index uint64, launch *structs.Periodic
watcher.Add(watch.Item{Job: launch.ID})
// Check if the job already exists
if _, err := txn.First("periodic_launch", "id", launch.ID); err != nil {
existing, err := txn.First("periodic_launch", "id", launch.ID)
if err != nil {
return fmt.Errorf("periodic launch lookup failed: %v", err)
}
// Setup the indexes correctly
if existing != nil {
launch.CreateIndex = existing.(*structs.PeriodicLaunch).CreateIndex
launch.ModifyIndex = index
} else {
launch.CreateIndex = index
launch.ModifyIndex = index
}
// Insert the job
if err := txn.Insert("periodic_launch", launch); err != nil {
return fmt.Errorf("launch insert failed: %v", err)
@@ -439,10 +449,6 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error {
txn := s.db.Txn(true)
defer txn.Abort()
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "periodic_launch"})
watcher.Add(watch.Item{Job: jobID})
// Lookup the launch
existing, err := txn.First("periodic_launch", "id", jobID)
if err != nil {
@@ -452,6 +458,10 @@ func (s *StateStore) DeletePeriodicLaunch(index uint64, jobID string) error {
return fmt.Errorf("launch not found")
}
watcher := watch.NewItems()
watcher.Add(watch.Item{Table: "periodic_launch"})
watcher.Add(watch.Item{Job: jobID})
// Delete the launch
if err := txn.Delete("periodic_launch", existing); err != nil {
return fmt.Errorf("launch delete failed: %v", err)

View File

@@ -672,7 +672,7 @@ func TestStateStore_RestoreJob(t *testing.T) {
func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{job.ID, time.Now()}
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
@@ -688,6 +688,12 @@ func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if out.CreateIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if !reflect.DeepEqual(launch, out) {
t.Fatalf("bad: %#v %#v", job, out)
@@ -707,7 +713,7 @@ func TestStateStore_UpsertPeriodicLaunch(t *testing.T) {
func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{job.ID, time.Now()}
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
@@ -719,7 +725,10 @@ func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
t.Fatalf("err: %v", err)
}
launch2 := &structs.PeriodicLaunch{job.ID, launch.Launch.Add(1 * time.Second)}
launch2 := &structs.PeriodicLaunch{
ID: job.ID,
Launch: launch.Launch.Add(1 * time.Second),
}
err = state.UpsertPeriodicLaunch(1001, launch2)
if err != nil {
t.Fatalf("err: %v", err)
@@ -729,6 +738,12 @@ func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if out.CreateIndex != 1000 {
t.Fatalf("bad: %#v", out)
}
if out.ModifyIndex != 1001 {
t.Fatalf("bad: %#v", out)
}
if !reflect.DeepEqual(launch2, out) {
t.Fatalf("bad: %#v %#v", launch2, out)
@@ -748,7 +763,7 @@ func TestStateStore_UpdateUpsertPeriodicLaunch(t *testing.T) {
func TestStateStore_DeletePeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{job.ID, time.Now()}
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,
@@ -791,7 +806,7 @@ func TestStateStore_PeriodicLaunches(t *testing.T) {
for i := 0; i < 10; i++ {
job := mock.Job()
launch := &structs.PeriodicLaunch{job.ID, time.Now()}
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
launches = append(launches, launch)
err := state.UpsertPeriodicLaunch(1000+uint64(i), launch)
@@ -840,7 +855,7 @@ func TestStateStore_PeriodicLaunches(t *testing.T) {
func TestStateStore_RestorePeriodicLaunch(t *testing.T) {
state := testStateStore(t)
job := mock.Job()
launch := &structs.PeriodicLaunch{job.ID, time.Now()}
launch := &structs.PeriodicLaunch{ID: job.ID, Launch: time.Now()}
notify := setupNotifyTest(
state,

View File

@@ -939,7 +939,7 @@ const (
// PeriodicSpecTest is only used by unit tests. It is a sorted, comma
// seperated list of unix timestamps at which to launch.
PeriodicSpecTest = "test"
PeriodicSpecTest = "_internal_test"
)
// Periodic defines the interval a job should be run at.
@@ -1021,6 +1021,10 @@ func (p *PeriodicConfig) Next(fromTime time.Time) time.Time {
type PeriodicLaunch struct {
ID string // ID of the periodic job.
Launch time.Time // The last launch time.
// Raft Indexes
CreateIndex uint64
ModifyIndex uint64
}
var (