Merge pull request #3646 from hashicorp/b-periodic-non-leader

Fix followers not creating periodic launch
This commit is contained in:
Alex Dadgar
2017-12-11 15:33:38 -08:00
committed by GitHub
9 changed files with 183 additions and 69 deletions

View File

@@ -50,6 +50,8 @@ IMPROVEMENTS:
BUG FIXES:
* core: Fix issue in which restoring periodic jobs could fail when a leader
election occurs [GH-3646]
* core: Fixed an issue where the leader server could get into a state where it
was no longer performing the periodic leader loop duties after a barrier
timeout error [GH-3402]

View File

@@ -351,8 +351,7 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// We always add the job to the periodic dispatcher because there is the
// possibility that the periodic spec was removed and then we should stop
// tracking it.
added, err := n.periodicDispatcher.Add(req.Job)
if err != nil {
if err := n.periodicDispatcher.Add(req.Job); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Add failed: %v", err)
return err
}
@@ -360,12 +359,12 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} {
// Create a watch set
ws := memdb.NewWatchSet()
// If it is periodic, record the time it was inserted. This is necessary for
// recovering during leader election. It is possible that from the time it
// is added to when it was suppose to launch, leader election occurs and the
// job was not launched. In this case, we use the insertion time to
// determine if a launch was missed.
if added {
// If it is an active periodic job, record the time it was inserted. This is
// necessary for recovering during leader election. It is possible that from
// the time it is added to when it was suppose to launch, leader election
// occurs and the job was not launched. In this case, we use the insertion
// time to determine if a launch was missed.
if req.Job.IsPeriodicActive() {
prevLaunch, err := n.state.PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: PeriodicLaunchByID failed: %v", err)

View File

@@ -331,6 +331,65 @@ func TestFSM_RegisterJob(t *testing.T) {
}
}
func TestFSM_RegisterPeriodicJob_NonLeader(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
// Disable the dispatcher
fsm.periodicDispatcher.SetEnabled(false)
job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
resp := fsm.Apply(makeLog(buf))
if resp != nil {
t.Fatalf("resp: %v", resp)
}
// Verify we are registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if jobOut == nil {
t.Fatalf("not found!")
}
if jobOut.CreateIndex != 1 {
t.Fatalf("bad index: %d", jobOut.CreateIndex)
}
// Verify it wasn't added to the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
if _, ok := fsm.periodicDispatcher.tracked[tuple]; ok {
t.Fatal("job added to periodic runner")
}
// Verify the launch time was tracked.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, req.Namespace, req.Job.ID)
if err != nil {
t.Fatalf("err: %v", err)
}
if launchOut == nil {
t.Fatalf("not found!")
}
if launchOut.Launch.IsZero() {
t.Fatalf("bad launch time: %v", launchOut.Launch)
}
}
func TestFSM_RegisterJob_BadNamespace(t *testing.T) {
t.Parallel()
fsm := testFSM(t)

View File

@@ -360,14 +360,12 @@ func (s *Server) restorePeriodicDispatcher() error {
continue
}
added, err := s.periodicDispatcher.Add(job)
if err != nil {
if err := s.periodicDispatcher.Add(job); err != nil {
return err
}
// We did not add the job to the tracker, this can be for a variety of
// reasons, but it means that we do not need to force run it.
if !added {
// We do not need to force run the job since it isn't active.
if !job.IsPeriodicActive() {
continue
}
@@ -375,9 +373,13 @@ func (s *Server) restorePeriodicDispatcher() error {
// the time the periodic job was added. Otherwise it has the last launch
// time of the periodic job.
launch, err := s.fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
if err != nil || launch == nil {
if err != nil {
return fmt.Errorf("failed to get periodic launch time: %v", err)
}
if launch == nil {
return fmt.Errorf("no recorded periodic launch time for job %q in namespace %q",
job.ID, job.Namespace)
}
// nextLaunch is the next launch that should occur.
nextLaunch := job.Periodic.Next(launch.Launch.In(job.Periodic.GetLocation()))

View File

@@ -192,18 +192,18 @@ func (p *PeriodicDispatch) Tracked() []*structs.Job {
// Add begins tracking of a periodic job. If it is already tracked, it acts as
// an update to the jobs periodic spec. The method returns whether the job was
// added and any error that may have occurred.
func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
func (p *PeriodicDispatch) Add(job *structs.Job) error {
p.l.Lock()
defer p.l.Unlock()
// Do nothing if not enabled
if !p.enabled {
return false, nil
return nil
}
// If we were tracking a job and it has been disabled, made non-periodic,
// stopped or is parameterized, remove it
disabled := !job.IsPeriodic() || !job.Periodic.Enabled || job.Stopped() || job.IsParameterized()
disabled := !job.IsPeriodicActive()
tuple := structs.NamespacedID{
ID: job.ID,
@@ -216,7 +216,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
}
// If the job is disabled and we aren't tracking it, do nothing.
return false, nil
return nil
}
// Add or update the job.
@@ -224,12 +224,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
next := job.Periodic.Next(time.Now().In(job.Periodic.GetLocation()))
if tracked {
if err := p.heap.Update(job, next); err != nil {
return false, fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
return fmt.Errorf("failed to update job %q (%s) launch time: %v", job.ID, job.Namespace, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q (%s)", job.ID, job.Namespace)
} else {
if err := p.heap.Push(job, next); err != nil {
return false, fmt.Errorf("failed to add job %v: %v", job.ID, err)
return fmt.Errorf("failed to add job %v: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q (%s)", job.ID, job.Namespace)
}
@@ -240,7 +240,7 @@ func (p *PeriodicDispatch) Add(job *structs.Job) (added bool, err error) {
default:
}
return true, nil
return nil
}
// Remove stops tracking the passed job. If the job is not tracked, it is a

View File

@@ -77,7 +77,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
job := mock.PeriodicJob()
job.Periodic.ProhibitOverlap = true // Shouldn't affect anything.
assert.Nil(state.UpsertJob(100, job))
_, err := s1.periodicDispatcher.Add(job)
err := s1.periodicDispatcher.Add(job)
assert.Nil(err)
// Force launch it.

View File

@@ -116,8 +116,8 @@ func TestPeriodicDispatch_SetEnabled(t *testing.T) {
// Enable and track something
p.SetEnabled(true)
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
@@ -130,10 +130,8 @@ func TestPeriodicDispatch_Add_NonPeriodic(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.Job()
if added, err := p.Add(job); err != nil {
if err := p.Add(job); err != nil {
t.Fatalf("Add of non-periodic job failed: %v; expect no-op", err)
} else if added {
t.Fatalf("Add of non-periodic job happened, expect no-op")
}
tracked := p.Tracked()
@@ -147,8 +145,8 @@ func TestPeriodicDispatch_Add_Periodic_Parameterized(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of periodic parameterized job failed: %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add of periodic parameterized job failed: %v", err)
}
tracked := p.Tracked()
@@ -162,8 +160,8 @@ func TestPeriodicDispatch_Add_Periodic_Stopped(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
job.Stop = true
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add of stopped periodic job failed: %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add of stopped periodic job failed: %v", err)
}
tracked := p.Tracked()
@@ -176,8 +174,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
@@ -187,8 +185,8 @@ func TestPeriodicDispatch_Add_UpdateJob(t *testing.T) {
// Update the job and add it again.
job.Periodic.Spec = "foo"
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed: %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = p.Tracked()
@@ -208,13 +206,9 @@ func TestPeriodicDispatch_Add_Remove_Namespaced(t *testing.T) {
job := mock.PeriodicJob()
job2 := mock.PeriodicJob()
job2.Namespace = "test"
added, err := p.Add(job)
assert.Nil(err)
assert.True(added)
assert.Nil(p.Add(job))
added, err = p.Add(job2)
assert.Nil(err)
assert.True(added)
assert.Nil(p.Add(job2))
assert.Len(p.Tracked(), 2)
@@ -227,8 +221,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
t.Parallel()
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
@@ -238,8 +232,8 @@ func TestPeriodicDispatch_Add_RemoveJob(t *testing.T) {
// Update the job to be non-periodic and add it again.
job.Periodic = nil
if added, err := p.Add(job); err != nil || added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked = p.Tracked()
@@ -256,15 +250,15 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Update it to be sooner and re-add.
expected := time.Now().Round(1 * time.Second).Add(1 * time.Second)
job.Periodic.Spec = fmt.Sprintf("%d", expected.Unix())
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Check that nothing is created.
@@ -304,8 +298,8 @@ func TestPeriodicDispatch_Remove_Tracked(t *testing.T) {
p, _ := testPeriodicDispatcher()
job := mock.PeriodicJob()
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
tracked := p.Tracked()
@@ -331,8 +325,8 @@ func TestPeriodicDispatch_Remove_TriggersUpdate(t *testing.T) {
job := testPeriodicJob(time.Now().Add(1 * time.Second))
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// Remove the job.
@@ -370,8 +364,8 @@ func TestPeriodicDispatch_ForceRun_Tracked(t *testing.T) {
job := testPeriodicJob(time.Now().Add(10 * time.Second))
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
// ForceRun the job
@@ -402,8 +396,8 @@ func TestPeriodicDispatch_Run_DisallowOverlaps(t *testing.T) {
job.Periodic.ProhibitOverlap = true
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(3 * time.Second)
@@ -431,8 +425,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
job := testPeriodicJob(launch1, launch2)
// Add it.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
time.Sleep(3 * time.Second)
@@ -463,11 +457,11 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
job2 := testPeriodicJob(launch)
// Add them.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
if l := len(p.Tracked()); l != 2 {
@@ -503,11 +497,11 @@ func TestPeriodicDispatch_Run_SameID_Different_Namespace(t *testing.T) {
job2.Namespace = "test"
// Add them.
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
if added, err := p.Add(job2); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job2); err != nil {
t.Fatalf("Add failed %v", err)
}
if l := len(p.Tracked()); l != 2 {
@@ -587,8 +581,8 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
shuffle(toDelete)
for _, job := range jobs {
if added, err := p.Add(job); err != nil || !added {
t.Fatalf("Add failed %v %v", added, err)
if err := p.Add(job); err != nil {
t.Fatalf("Add failed %v", err)
}
}

View File

@@ -1939,6 +1939,12 @@ func (j *Job) IsPeriodic() bool {
return j.Periodic != nil
}
// IsPeriodicActive returns whether the job is an active periodic job that will
// create child jobs
func (j *Job) IsPeriodicActive() bool {
return j.IsPeriodic() && j.Periodic.Enabled && !j.Stopped() && !j.IsParameterized()
}
// IsParameterized returns whether a job is parameterized job.
func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil

View File

@@ -637,6 +637,58 @@ func TestJob_IsPeriodic(t *testing.T) {
}
}
func TestJob_IsPeriodicActive(t *testing.T) {
cases := []struct {
job *Job
active bool
}{
{
job: &Job{
Type: JobTypeService,
Periodic: &PeriodicConfig{
Enabled: true,
},
},
active: true,
},
{
job: &Job{
Type: JobTypeService,
Periodic: &PeriodicConfig{
Enabled: false,
},
},
active: false,
},
{
job: &Job{
Type: JobTypeService,
Periodic: &PeriodicConfig{
Enabled: true,
},
Stop: true,
},
active: false,
},
{
job: &Job{
Type: JobTypeService,
Periodic: &PeriodicConfig{
Enabled: false,
},
ParameterizedJob: &ParameterizedJobConfig{},
},
active: false,
},
}
for i, c := range cases {
if act := c.job.IsPeriodicActive(); act != c.active {
t.Fatalf("case %d failed: got %v; want %v", i, act, c.active)
}
}
}
func TestJob_SystemJob_Validate(t *testing.T) {
j := testJob()
j.Type = JobTypeSystem