mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 09:55:44 +03:00
Fix restoration of stopped periodic jobs
This PR fixes an issue in which we would add a stopped periodic job to the periodic launcher.
This commit is contained in:
@@ -325,7 +325,8 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
|
||||
// We always add the job to the periodic dispatcher because there is the
|
||||
// possibility that the periodic spec was removed and then we should stop
|
||||
// tracking it.
|
||||
if err := n.periodicDispatcher.Add(req.Job); err != nil {
|
||||
added, err := n.periodicDispatcher.Add(req.Job)
|
||||
if err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
|
||||
return err
|
||||
}
|
||||
@@ -338,7 +339,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
|
||||
// is added to when it was suppose to launch, leader election occurs and the
|
||||
// job was not launched. In this case, we use the insertion time to
|
||||
// determine if a launch was missed.
|
||||
if req.Job.IsPeriodic() {
|
||||
if added {
|
||||
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
|
||||
if err != nil {
|
||||
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)
|
||||
|
||||
@@ -314,7 +314,16 @@ func (s *Server) restorePeriodicDispatcher() error {
|
||||
continue
|
||||
}
|
||||
|
||||
s.periodicDispatcher.Add(job)
|
||||
added, err := s.periodicDispatcher.Add(job)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// We did not add the job to the tracker, this can be for a variety of
|
||||
// reasons, but it means that we do not need to force run it.
|
||||
if !added {
|
||||
continue
|
||||
}
|
||||
|
||||
// If the periodic job has never been launched before, launch will hold
|
||||
// the time the periodic job was added. Otherwise it has the last launch
|
||||
|
||||
@@ -186,18 +186,20 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
|
||||
}
|
||||
|
||||
// Add begins tracking of a periodic job. If it is already tracked, it acts as
|
||||
// an update to the jobs periodic spec.
|
||||
func (p *PeriodicDispatch) Add(job *structs.Job) error {
|
||||
// an update to the jobs periodic spec. The method returns whether the job was
|
||||
// added and any error that may have occured.
|
||||
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
|
||||
p.l.Lock()
|
||||
defer p.l.Unlock()
|
||||
|
||||
// Do nothing if not enabled
|
||||
if !p.enabled {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// If we were tracking a job and it has been disabled or made non-periodic remove it.
|
||||
disabled := !job.IsPeriodic() || !job.Periodic.Enabled
|
||||
// If we were tracking a job and it has been disabled, made non-periodic,
|
||||
// stopped or is parameterized, remove it
|
||||
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()
|
||||
|
||||
tuple := structs.NamespacedID{
|
||||
ID: job.ID,
|
||||
@@ -210,13 +212,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
|
||||
}
|
||||
|
||||
// If the job is disabled and we aren't tracking it, do nothing.
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if the job is also a parameterized job. If it is, then we do not want to
|
||||
// treat it as a periodic job but only its dispatched children.
|
||||
if job.IsParameterized() {
|
||||
return nil
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Add or update the job.
|
||||
@@ -224,12 +220,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
|
||||
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
|
||||
if tracked {
|
||||
if err := p.heap.Update(job, next); err != nil {
|
||||
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
|
||||
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
|
||||
}
|
||||
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
|
||||
} else {
|
||||
if err := p.heap.Push(job, next); err != nil {
|
||||
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
|
||||
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
|
||||
}
|
||||
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
|
||||
}
|
||||
@@ -240,7 +236,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
|
||||
default:
|
||||
}
|
||||
|
||||
return nil
|
||||
return true, nil
|
||||
}
|
||||
|
||||
// Remove stops tracking the passed job. If the job is not tracked, it is a
|
||||
|
||||
@@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) {
|
||||
// Enable and track something
|
||||
p.SetEnabled(true)
|
||||
job := mock.PeriodicJob()
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v %v", added, err)
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
@@ -130,8 +130,10 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
|
||||
t.Parallel()
|
||||
p, _ := testPeriodicDispatcher()
|
||||
job := mock.Job()
|
||||
if err := p.Add(job); err != nil {
|
||||
if added, err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
|
||||
} else if added {
|
||||
t.Fatalf("Add of non-periodic job happened, expect no-op")
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
@@ -145,8 +147,23 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) {
|
||||
p, _ := testPeriodicDispatcher()
|
||||
job := mock.PeriodicJob()
|
||||
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add of periodic parameterized job failed: %v; expect no-op", err)
|
||||
if added, err := p.Add(job); err != nil || added {
|
||||
t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err)
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
if len(tracked) != 0 {
|
||||
t.Fatalf("Add of periodic parameterized job should be no-op: %v", tracked)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) {
|
||||
t.Parallel()
|
||||
p, _ := testPeriodicDispatcher()
|
||||
job := mock.PeriodicJob()
|
||||
job.Stop = true
|
||||
if added, err := p.Add(job); err != nil || added {
|
||||
t.Fatalf("Add of stopped periodic job failed: %v %v", added, err)
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
@@ -159,8 +176,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
|
||||
t.Parallel()
|
||||
p, _ := testPeriodicDispatcher()
|
||||
job := mock.PeriodicJob()
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v %v", added, err)
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
@@ -170,8 +187,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
|
||||
|
||||
// Update the job and add it again.
|
||||
job.Periodic.Spec = "foo"
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
tracked = p.Tracked()
|
||||
@@ -191,8 +208,15 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) {
|
||||
job := mock.PeriodicJob()
|
||||
job2 := mock.PeriodicJob()
|
||||
job2.Namespace = "test"
|
||||
assert.Nil(p.Add(job))
|
||||
assert.Nil(p.Add(job2))
|
||||
|
||||
added, err := p.Add(job)
|
||||
assert.Nil(err)
|
||||
assert.True(added)
|
||||
|
||||
added, err = p.Add(job2)
|
||||
assert.Nil(err)
|
||||
assert.True(added)
|
||||
|
||||
assert.Len(p.Tracked(), 2)
|
||||
|
||||
assert.Nil(p.Remove(job2.Namespace, job2.ID))
|
||||
@@ -204,8 +228,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
|
||||
t.Parallel()
|
||||
p, _ := testPeriodicDispatcher()
|
||||
job := mock.PeriodicJob()
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
@@ -215,8 +239,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
|
||||
|
||||
// Update the job to be non-periodic and add it again.
|
||||
job.Periodic = nil
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
tracked = p.Tracked()
|
||||
@@ -233,15 +257,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
|
||||
job := testPeriodicJob(time.Now().Add(10 * time.Second))
|
||||
|
||||
// Add it.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
// Update it to be sooner and re-add.
|
||||
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
|
||||
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
// Check that nothing is created.
|
||||
@@ -281,8 +305,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
|
||||
p, _ := testPeriodicDispatcher()
|
||||
|
||||
job := mock.PeriodicJob()
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
tracked := p.Tracked()
|
||||
@@ -308,8 +332,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
|
||||
job := testPeriodicJob(time.Now().Add(1 * time.Second))
|
||||
|
||||
// Add it.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
// Remove the job.
|
||||
@@ -347,8 +371,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
|
||||
job := testPeriodicJob(time.Now().Add(10 * time.Second))
|
||||
|
||||
// Add it.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
// ForceRun the job
|
||||
@@ -379,8 +403,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) {
|
||||
job.Periodic.ProhibitOverlap = true
|
||||
|
||||
// Add it.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
@@ -408,8 +432,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
|
||||
job := testPeriodicJob(launch1, launch2)
|
||||
|
||||
// Add it.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
time.Sleep(3 * time.Second)
|
||||
@@ -440,11 +464,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
|
||||
job2 := testPeriodicJob(launch)
|
||||
|
||||
// Add them.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
if err := p.Add(job2); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job2); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
if l := len(p.Tracked()); l != 2 {
|
||||
@@ -480,11 +504,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) {
|
||||
job2.Namespace = "test"
|
||||
|
||||
// Add them.
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
if err := p.Add(job2); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job2); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
|
||||
if l := len(p.Tracked()); l != 2 {
|
||||
@@ -560,8 +584,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
|
||||
shuffle(toDelete)
|
||||
|
||||
for _, job := range jobs {
|
||||
if err := p.Add(job); err != nil {
|
||||
t.Fatalf("Add failed %v", err)
|
||||
if added, err := p.Add(job); err != nil || !added {
|
||||
t.Fatalf("Add failed %v", added, err)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user