Leader election restore, add structs to api jobs

This commit is contained in:
Alex Dadgar
2015-12-04 15:10:08 -08:00
parent 2c0d30fbbd
commit ac3d833942
7 changed files with 387 additions and 37 deletions

View File

@@ -101,12 +101,19 @@ func (j *Jobs) ForceEvaluate(jobID string, q *WriteOptions) (string, *WriteMeta,
return resp.EvalID, wm, nil
}
//UpdateStrategy is for serializing update strategy for a job.
// UpdateStrategy is for serializing update strategy for a job.
type UpdateStrategy struct {
// Stagger is a duration; presumably the delay between successive update
// batches -- TODO confirm against the server-side definition.
Stagger time.Duration
// MaxParallel is an integer count; presumably how many updates may run at
// once -- TODO confirm against the server-side definition.
MaxParallel int
}
// PeriodicConfig is for serializing periodic config for a job.
type PeriodicConfig struct {
// Enabled is a boolean toggle; presumably whether periodic launching is
// active for the job -- TODO confirm against the server-side definition.
Enabled bool
// Spec is the periodic specification string; its format presumably depends
// on SpecType -- TODO confirm.
Spec string
// SpecType names the kind of specification held in Spec.
// NOTE(review): the set of accepted values is not visible here.
SpecType string
}
// Job is used to serialize a job.
type Job struct {
Region string
@@ -119,6 +126,7 @@ type Job struct {
Constraints []*Constraint
TaskGroups []*TaskGroup
Update *UpdateStrategy
Periodic *PeriodicConfig
Meta map[string]string
Status string
StatusDescription string

View File

@@ -117,8 +117,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
return err
}
// Enable the periodic dispatcher,since we are now the leader.
// Enable the periodic dispatcher, since we are now the leader.
s.periodicDispatcher.SetEnabled(true)
s.periodicDispatcher.Start()
// Restore the periodic dispatcher state
if err := s.restorePeriodicDispatcher(); err != nil {
@@ -175,8 +176,65 @@ func (s *Server) restoreEvalBroker() error {
return nil
}
// restorePeriodicDispatcher is used to restore all periodic jobs into the
// periodic dispatcher. It also determines if a periodic job should have been
// created during the leadership transition and force runs them. The periodic
// dispatcher is maintained only by the leader, so it must be restored anytime a
// leadership transition takes place.
//
// An error is returned if the periodic jobs cannot be listed, a job cannot be
// added to the dispatcher, the evals already created for a job cannot be
// looked up, or a force run fails.
func (s *Server) restorePeriodicDispatcher() error {
	iter, err := s.fsm.State().JobsByPeriodic(true)
	if err != nil {
		return fmt.Errorf("failed to get periodic jobs: %v", err)
	}

	now := time.Now()
	for i := iter.Next(); i != nil; i = iter.Next() {
		job := i.(*structs.Job)
		if err := s.periodicDispatcher.Add(job); err != nil {
			return fmt.Errorf("failed to add periodic job %q to the dispatcher: %v",
				job.ID, err)
		}

		// Need to force run the job if an evaluation should have been created
		// during the leader election period. The reference point for that
		// decision depends on whether an eval has ever been created for the
		// job.
		evals, err := s.periodicDispatcher.CreatedEvals(job.ID)
		if err != nil {
			return fmt.Errorf("failed to get the evals created for periodic job %v: %v",
				job.ID, err)
		}

		var last time.Time
		if l := len(evals); l != 0 {
			// CreatedEvals sorts by launch time, so the final entry is the
			// most recent launch.
			last = evals[l-1].JobLaunch
		} else {
			// No eval yet: measure from when the job was last modified.
			// TODO(alex): Think about the 0 time case
			last = s.fsm.TimeTable().NearestTime(job.ModifyIndex)
		}

		// Skip the job unless a launch should have occurred between the
		// reference point and now.
		if !job.Periodic.Next(last).Before(now) {
			continue
		}

		if err := s.periodicDispatcher.ForceRun(job.ID); err != nil {
			s.logger.Printf(
				"[ERR] nomad.periodic: force run of periodic job %q failed: %v",
				job.ID, err)
			return fmt.Errorf("failed to force run periodic job %q: %v", job.ID, err)
		}
		s.logger.Printf("[DEBUG] nomad.periodic: periodic job %q force"+
			" run during leadership establishment", job.ID)
	}

	return nil
}

View File

@@ -286,6 +286,190 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
})
}
// TestLeader_PeriodicDispatcher_Restore_Adds registers a periodic and a
// non-periodic job through the leader of a three-server cluster, shuts that
// leader down, and checks that the periodic dispatcher of the newly elected
// leader tracks the periodic job.
func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer s2.Shutdown()
s3 := testServer(t, func(c *Config) {
c.NumSchedulers = 0
c.DevDisableBootstrap = true
})
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
testJoin(t, s1, s2, s3)
testutil.WaitForLeader(t, s1.RPC)
// Wait until every server reports all three peers.
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.raftPeers.Peers()
return len(peers) == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Find whichever server currently holds leadership.
var leader *Server
for _, s := range servers {
if s.IsLeader() {
leader = s
break
}
}
if leader == nil {
t.Fatalf("Should have a leader")
}
// Inject a periodic job and non-periodic job
periodic := mock.PeriodicJob()
nonPeriodic := mock.Job()
for _, job := range []*structs.Job{nonPeriodic, periodic} {
req := structs.JobRegisterRequest{
Job: job,
}
_, _, err := leader.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
t.Fatalf("err: %v", err)
}
}
// Kill the leader
leader.Shutdown()
// Brief pause before polling; presumably gives the remaining servers time
// to notice the loss -- TODO confirm.
time.Sleep(100 * time.Millisecond)
// Wait for a new leader
leader = nil
testutil.WaitForResult(func() (bool, error) {
for _, s := range servers {
if s.IsLeader() {
leader = s
return true, nil
}
}
return false, nil
}, func(err error) {
t.Fatalf("should have leader")
})
// Check that the new leader is tracking the periodic job.
testutil.WaitForResult(func() (bool, error) {
_, tracked := leader.periodicDispatcher.tracked[periodic.ID]
return tracked, nil
}, func(err error) {
t.Fatalf("periodic job not tracked")
})
}
// TestLeader_PeriodicDispatcher_Restore_NoEvals verifies that restoring the
// periodic dispatcher force runs a periodic job that has no evals yet but
// whose launch time passed while the dispatcher was not tracking it.
func TestLeader_PeriodicDispatcher_Restore_NoEvals(t *testing.T) {
	s1 := testServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Inject a periodic job that will be triggered soon.
	launch := time.Now().Add(1 * time.Second)
	job := testPeriodicJob(launch)
	req := structs.JobRegisterRequest{
		Job: job,
	}
	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Flush the periodic dispatcher, ensuring that no evals will be created.
	s1.periodicDispatcher.Flush()

	// Sleep till after the job should have been launched.
	time.Sleep(2 * time.Second)

	// Restore the periodic dispatcher. A failure here would otherwise surface
	// only as a confusing assertion failure further down.
	if err := s1.restorePeriodicDispatcher(); err != nil {
		t.Fatalf("restorePeriodicDispatcher() failed: %v", err)
	}

	// Ensure the job is tracked.
	if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
		t.Fatalf("periodic job not restored")
	}

	// Check that an eval was made.
	evals, err := s1.periodicDispatcher.CreatedEvals(job.ID)
	if err != nil {
		t.Fatalf("CreatedEvals(%v) failed: %v", job.ID, err)
	}

	if len(evals) != 1 {
		t.Fatalf("restorePeriodicDispatcher() didn't create an eval")
	}
}
// TestLeader_PeriodicDispatcher_Restore_Evals verifies that restoring the
// periodic dispatcher force runs a periodic job that already has an eval from
// an earlier launch but has missed a launch since then.
func TestLeader_PeriodicDispatcher_Restore_Evals(t *testing.T) {
	s1 := testServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer s1.Shutdown()
	testutil.WaitForLeader(t, s1.RPC)

	// Inject a periodic job that triggered once in the past, should trigger now
	// and once in the future.
	now := time.Now()
	past := now.Add(-1 * time.Second)
	future := now.Add(10 * time.Second)
	job := testPeriodicJob(past, now, future)
	req := structs.JobRegisterRequest{
		Job: job,
	}
	_, _, err := s1.raftApply(structs.JobRegisterRequestType, req)
	if err != nil {
		t.Fatalf("err: %v", err)
	}

	// Create an eval for the past launch.
	if err := s1.periodicDispatcher.createEval(job, past); err != nil {
		t.Fatalf("createEval() failed: %v", err)
	}

	// Flush the periodic dispatcher, ensuring that no evals will be created.
	s1.periodicDispatcher.Flush()

	// Sleep till after the job should have been launched.
	time.Sleep(2 * time.Second)

	// Restore the periodic dispatcher.
	if err := s1.restorePeriodicDispatcher(); err != nil {
		t.Fatalf("restorePeriodicDispatcher() failed: %v", err)
	}

	// Ensure the job is tracked.
	if _, tracked := s1.periodicDispatcher.tracked[job.ID]; !tracked {
		t.Fatalf("periodic job not restored")
	}

	// Check that an eval was made.
	evals, err := s1.periodicDispatcher.CreatedEvals(job.ID)
	if err != nil {
		t.Fatalf("CreatedEvals(%v) failed: %v", job.ID, err)
	}

	if len(evals) != 2 {
		t.Fatalf("restorePeriodicDispatcher() didn't create an eval")
	}

	// Check it was for the right time. Launch times recovered from evals have
	// one-second resolution, so compare Unix seconds; comparing time.Time
	// values with != against the nanosecond-precision locals would always
	// report a difference and make this check vacuous.
	pastUnix, futureUnix := past.Unix(), future.Unix()
	match := false
	for _, eval := range evals {
		launch := eval.JobLaunch.Unix()
		if launch != pastUnix && launch != futureUnix {
			match = true
			break
		}
	}

	if !match {
		t.Fatal("restorePeriodicDispatcher() didn't create the correct eval")
	}
}
func TestLeader_PeriodicDispatch(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0

View File

@@ -5,12 +5,21 @@ import (
"errors"
"fmt"
"log"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// JobLaunchSuffix is the string appended to the periodic job's ID when
// launching derived instances of it. The launch's Unix timestamp follows
// the suffix in the derived job ID.
JobLaunchSuffix = "-launch-"
)
// PeriodicRunner is the interface for tracking and launching periodic jobs at
// their periodic spec.
type PeriodicRunner interface {
@@ -117,10 +126,12 @@ func (p *PeriodicDispatch) Add(job *structs.Job) error {
if err := p.heap.Update(job, next); err != nil {
return fmt.Errorf("failed to update job %v launch time: %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: updated periodic job %q", job.ID)
} else {
if err := p.heap.Push(job, next); err != nil {
return fmt.Errorf("failed to add job %v", job.ID, err)
}
p.logger.Printf("[DEBUG] nomad.periodic: registered periodic job %q", job.ID)
}
// Signal an update.
@@ -160,6 +171,7 @@ func (p *PeriodicDispatch) Remove(jobID string) error {
}
}
p.logger.Printf("[DEBUG] nomad.periodic: deregistered periodic job %q", jobID)
return nil
}
@@ -192,13 +204,14 @@ func (p *PeriodicDispatch) run() {
}
p.l.RUnlock()
now := time.Now().Local()
var now time.Time
PICK:
// If there is nothing wait for an update.
p.l.RLock()
if p.heap.Length() == 0 {
p.l.RUnlock()
p.logger.Printf("[DEBUG] nomad.periodic: no periodic jobs; waiting")
<-p.updateCh
p.l.RLock()
}
@@ -206,7 +219,7 @@ PICK:
nextJob, err := p.heap.Peek()
p.l.RUnlock()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err)
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
return
}
@@ -214,10 +227,15 @@ PICK:
// If there are only invalid times, wait for an update.
if launchTime.IsZero() {
p.logger.Printf("[DEBUG] nomad.periodic: job %q has no valid launch time", nextJob.job.ID)
<-p.updateCh
goto PICK
}
now = time.Now()
p.logger.Printf("[DEBUG] nomad.periodic: launching job %q in %s",
nextJob.job.ID, nextJob.next.Sub(now))
select {
case <-p.stopCh:
return
@@ -237,7 +255,7 @@ PICK:
j, err := p.heap.Peek()
if err != nil {
p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to determine next periodic job: %v", err)
p.logger.Printf("[ERR] nomad.periodic: failed to determine next periodic job: %v", err)
break
}
@@ -246,11 +264,10 @@ PICK:
}
if err := p.heap.Update(j.job, j.job.Periodic.Next(nowUpdate)); err != nil {
p.logger.Printf("[ERR] nomad.periodic_dispatch: failed to update next launch of periodic job: %v", j.job.ID, err)
p.logger.Printf("[ERR] nomad.periodic: failed to update next launch of periodic job %q: %v", j.job.ID, err)
}
// TODO(alex): Want to be able to check that there isn't a previously
// running cron job for this job.
p.logger.Printf("[DEBUG] nomad.periodic: launching job %v at %v", j.job.ID, launchTime)
go p.createEval(j.job, launchTime)
}
@@ -273,7 +290,7 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time)
req := structs.JobRegisterRequest{Job: derived}
_, index, err := p.srv.raftApply(structs.JobRegisterRequestType, req)
if err != nil {
p.logger.Printf("[ERR] nomad.periodic_dispatch: Register failed: %v", err)
p.logger.Printf("[ERR] nomad.periodic: registering child job for periodic job %q failed: %v", periodicJob.ID, err)
return err
}
@@ -296,7 +313,7 @@ func (p *PeriodicDispatch) createEval(periodicJob *structs.Job, time time.Time)
// but that the EvalUpdate does not.
_, _, err = p.srv.raftApply(structs.EvalUpdateRequestType, update)
if err != nil {
p.logger.Printf("[ERR] nomad.periodic_dispatch: Eval create failed: %v", err)
p.logger.Printf("[ERR] nomad.periodic: creating eval for %q failed: %v", derived.ID, err)
return err
}
@@ -312,7 +329,7 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
// Have to recover in case the job copy panics.
defer func() {
if r := recover(); r != nil {
p.logger.Printf("[ERR] nomad.periodic_dispatch: deriving job from"+
p.logger.Printf("[ERR] nomad.periodic: deriving job from"+
" periodic job %v failed; deregistering from periodic runner: %v",
periodicJob.ID, r)
p.Remove(periodicJob.ID)
@@ -326,7 +343,7 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
derived = periodicJob.Copy()
derived.ParentID = periodicJob.ID
derived.ID = p.derivedJobID(periodicJob, time)
derived.Name = periodicJob.ID
derived.Name = derived.ID
derived.Periodic = nil
return
}
@@ -334,32 +351,77 @@ func (p *PeriodicDispatch) deriveJob(periodicJob *structs.Job, time time.Time) (
// derivedJobID returns a job ID based on the parent periodic job and the launch
// time.
func (p *PeriodicDispatch) derivedJobID(periodicJob *structs.Job, time time.Time) string {
return fmt.Sprintf("%s-%d", periodicJob.ID, time.Unix())
return fmt.Sprintf("%s%s%d", periodicJob.ID, JobLaunchSuffix, time.Unix())
}
// CreatedEvals returns the set of evaluations created from the passed periodic
// job.
func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) ([]*structs.Evaluation, error) {
// job in sorted order, with the earliest job launch first.
func (p *PeriodicDispatch) CreatedEvals(periodicJobID string) (PeriodicEvals, error) {
state := p.srv.fsm.State()
iter, err := state.ChildJobs(periodicJobID)
if err != nil {
return nil, fmt.Errorf("failed to look up children of job %v: %v", periodicJobID, err)
}
var evals []*structs.Evaluation
var evals PeriodicEvals
for i := iter.Next(); i != nil; i = iter.Next() {
job := i.(*structs.Job)
childEvals, err := state.EvalsByJob(job.ID)
if err != nil {
fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err)
return nil, fmt.Errorf("failed to look up evals for job %v: %v", job.ID, err)
}
evals = append(evals, childEvals...)
for _, eval := range childEvals {
launch, err := p.evalLaunchTime(eval)
if err != nil {
return nil, fmt.Errorf("failed to get launch time for eval %v: %v", eval, err)
}
pEval := &PeriodicEval{
Eval: eval,
JobLaunch: launch,
}
evals = append(evals, pEval)
}
}
// Return the sorted evals.
sort.Sort(evals)
return evals, nil
}
// PeriodicEval stores the evaluation and launch time for an instantiated
// periodic job.
type PeriodicEval struct {
// Eval is the evaluation created for the derived job.
Eval *structs.Evaluation
// JobLaunch is the launch time of the derived job; CreatedEvals fills it
// from evalLaunchTime, which parses it out of the eval's job ID.
JobLaunch time.Time
}
// PeriodicEvals is a list of PeriodicEval pointers that can be sorted by
// ascending JobLaunch via the sort package.
type PeriodicEvals []*PeriodicEval

// Len reports how many evals the list holds.
func (pe PeriodicEvals) Len() int {
	return len(pe)
}

// Swap exchanges the evals at positions i and j.
func (pe PeriodicEvals) Swap(i, j int) {
	pe[j], pe[i] = pe[i], pe[j]
}

// Less reports whether the eval at position i launched strictly before the
// eval at position j.
func (pe PeriodicEvals) Less(i, j int) bool {
	left, right := pe[i], pe[j]
	return left.JobLaunch.Before(right.JobLaunch)
}
// evalLaunchTime returns the launch time of the job associated with the eval.
// This is only valid for evaluations created by PeriodicDispatch and will
// otherwise return an error.
//
// The launch time is read from the eval's job ID: everything after the last
// occurrence of JobLaunchSuffix is parsed as a base-10 Unix timestamp in
// seconds. An error is returned when the suffix is absent or the trailing text
// is not a valid integer.
func (p *PeriodicDispatch) evalLaunchTime(created *structs.Evaluation) (time.Time, error) {
	jobID := created.JobID
	index := strings.LastIndex(jobID, JobLaunchSuffix)
	if index == -1 {
		return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v", jobID)
	}

	// Parse as int64 explicitly so the result does not depend on the platform
	// width of int.
	launch, err := strconv.ParseInt(jobID[index+len(JobLaunchSuffix):], 10, 64)
	if err != nil {
		return time.Time{}, fmt.Errorf("couldn't parse launch time from eval: %v: %v", jobID, err)
	}

	return time.Unix(launch, 0), nil
}
// Flush clears the state of the PeriodicDispatcher
func (p *PeriodicDispatch) Flush() {
p.l.Lock()

View File

@@ -63,6 +63,19 @@ func (m *MockPeriodic) Tracked() []structs.Job {
return tracked
}
// testPeriodicJob returns a mock periodic job that uses the test spec type,
// with a spec listing the passed times as comma-separated Unix timestamps in
// seconds.
func testPeriodicJob(times ...time.Time) *structs.Job {
	job := mock.PeriodicJob()
	job.Periodic.SpecType = structs.PeriodicSpecTest

	l := make([]string, len(times))
	for i, t := range times {
		// Format the int64 directly rather than narrowing it to int first.
		l[i] = strconv.FormatInt(t.Unix(), 10)
	}

	job.Periodic.Spec = strings.Join(l, ",")
	return job
}
func TestPeriodicDispatch_DisabledOperations(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
@@ -171,7 +184,7 @@ func TestPeriodicDispatch_Add_TriggersUpdate(t *testing.T) {
t.Fatalf("Unexpected number of evals created; got %#v; want 1", evals)
}
eval := evals[0]
eval := evals[0].Eval
expID := s1.periodicDispatcher.derivedJobID(job, expected)
if eval.JobID != expID {
t.Fatalf("periodic dispatcher created eval at the wrong time; got %v; want %v",
@@ -331,8 +344,8 @@ func TestPeriodicDispatch_Run_Multiple(t *testing.T) {
d := s1.periodicDispatcher
expected := []string{d.derivedJobID(job, launch1), d.derivedJobID(job, launch2)}
for i, eval := range evals {
if eval.JobID != expected[i] {
t.Fatalf("eval created incorrectly; got %v; want %v", eval.JobID, expected[i])
if eval.Eval.JobID != expected[i] {
t.Fatalf("eval created incorrectly; got %v; want %v", eval.Eval.JobID, expected[i])
}
}
}
@@ -375,8 +388,8 @@ func TestPeriodicDispatch_Run_SameTime(t *testing.T) {
d := s1.periodicDispatcher
expected := d.derivedJobID(job, launch)
if evals[0].JobID != expected {
t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].JobID, expected)
if evals[0].Eval.JobID != expected {
t.Fatalf("eval created incorrectly; got %v; want %v", evals[0].Eval.JobID, expected)
}
}
}
@@ -464,7 +477,7 @@ func TestPeriodicDispatch_Complex(t *testing.T) {
var jobs []string
for _, eval := range evals {
jobs = append(jobs, eval.JobID)
jobs = append(jobs, eval.Eval.JobID)
}
actual[job.ID] = jobs
}
@@ -482,17 +495,36 @@ func shuffle(jobs []*structs.Job) {
}
}
func testPeriodicJob(times ...time.Time) *structs.Job {
job := mock.PeriodicJob()
job.Periodic.SpecType = structs.PeriodicSpecTest
func TestPeriodicDispatch_CreatedEvals(t *testing.T) {
s1 := testServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)
l := make([]string, len(times))
for i, t := range times {
l[i] = strconv.Itoa(int(t.Unix()))
// Create three evals.
job := mock.PeriodicJob()
now := time.Now().Round(time.Second)
times := []time.Time{now.Add(1 * time.Second), now.Add(2 * time.Second), now}
for _, time := range times {
if err := s1.periodicDispatcher.createEval(job, time); err != nil {
t.Fatalf("createEval() failed: %v", err)
}
}
// Get the created evals.
created, err := s1.periodicDispatcher.CreatedEvals(job.ID)
if err != nil {
t.Fatalf("CreatedEvals(%v) failed: %v", job.ID, err)
}
expected := []time.Time{times[2], times[0], times[1]}
for i, c := range created {
if c.JobLaunch != expected[i] {
t.Fatalf("CreatedEvals are in wrong order; got %v; want %v at index %d",
c.JobLaunch, expected[i], i)
}
}
job.Periodic.Spec = strings.Join(l, ",")
return job
}
// TODO: Check that it doesn't create evals for overlapping things.

View File

@@ -863,9 +863,15 @@ func (j *Job) Validate() error {
}
// Validate periodic is only used with batch jobs.
if j.Periodic != nil && j.Periodic.Enabled && j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
if j.IsPeriodic() {
if j.Type != JobTypeBatch {
mErr.Errors = append(mErr.Errors,
fmt.Errorf("Periodic can only be used with %q scheduler", JobTypeBatch))
}
if err := j.Periodic.Validate(); err != nil {
mErr.Errors = append(mErr.Errors, err)
}
}
return mErr.ErrorOrNil()

View File

@@ -64,7 +64,7 @@ func (t *TimeTable) Deserialize(dec *codec.Decoder) error {
return nil
}
// Witness is used to witness a new inde and time.
// Witness is used to witness a new index and time.
func (t *TimeTable) Witness(index uint64, when time.Time) {
t.l.Lock()
defer t.l.Unlock()