From b2401ecc3c9f3cd4e5bf96fdd515102d047de10b Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Sat, 15 Aug 2015 14:47:13 -0700
Subject: [PATCH] scheduler: update status and test retry limit

---
 scheduler/generic_sched.go      | 32 +++++++++++++++++--
 scheduler/generic_sched_test.go | 56 +++++++++++++++++++++++++++++++++
 scheduler/scheduler_test.go     | 36 +++++++++++++++++++++
 scheduler/util.go               |  5 ++-
 4 files changed, 126 insertions(+), 3 deletions(-)

diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index b1731dfd8..fe2caa5fc 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -18,6 +18,16 @@ const (
 	maxBatchScheduleAttempts = 2
 )
 
+// SetStatusError wraps an error with the evaluation status to set when it is returned
+type SetStatusError struct {
+	Err        error
+	EvalStatus string
+}
+
+func (s *SetStatusError) Error() string {
+	return s.Err.Error()
+}
+
 // GenericScheduler is used for 'service' and 'batch' type jobs. This scheduler is
 // designed for long-lived services, and as such spends more time attempting
 // to make a high quality placement. This is the primary scheduler for
@@ -55,6 +65,15 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul
 	return s
 }
 
+// setStatus is used to update the status of the evaluation
+func (s *GenericScheduler) setStatus(status, desc string) error {
+	s.logger.Printf("[DEBUG] sched: %#v: setting status to %s (%s)", s.eval, status, desc)
+	newEval := s.eval.Copy()
+	newEval.Status = status
+	newEval.StatusDescription = desc
+	return s.planner.UpdateEval(newEval)
+}
+
 // Process is used to handle a single evaluation
 func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 	// Verify the evaluation trigger reason is understood
@@ -62,8 +81,9 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
 		structs.EvalTriggerJobDeregister:
 	default:
-		return fmt.Errorf("service scheduler cannot handle '%s' evaluation reason",
+		desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
 			eval.TriggeredBy)
+		return s.setStatus(structs.EvalStatusFailed, desc)
 	}
 
 	// Store the evaluation
@@ -74,7 +94,15 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 	if s.batch {
 		limit = maxBatchScheduleAttempts
 	}
-	return retryMax(limit, s.process)
+	if err := retryMax(limit, s.process); err != nil {
+		if statusErr, ok := err.(*SetStatusError); ok {
+			return s.setStatus(statusErr.EvalStatus, err.Error())
+		}
+		return err
+	}
+
+	// Update the status to complete
+	return s.setStatus(structs.EvalStatusComplete, "")
 }
 
 // process is wrapped in retryMax to iteratively run the handler until we have no
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 24f9119e7..1429729bf 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -57,6 +57,8 @@ func TestServiceSched_JobRegister(t *testing.T) {
 	if len(out) != 10 {
 		t.Fatalf("bad: %#v", out)
 	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
 func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
@@ -100,6 +102,8 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
 	if len(out) != 10 {
 		t.Fatalf("bad: %#v", out)
 	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
 func TestServiceSched_JobModify(t *testing.T) {
@@ -178,6 +182,8 @@ func TestServiceSched_JobModify(t *testing.T) {
 	if len(out) != 10 {
 		t.Fatalf("bad: %#v", out)
 	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
 func TestServiceSched_JobDeregister(t *testing.T) {
@@ -228,6 +234,8 @@ func TestServiceSched_JobDeregister(t *testing.T) {
 	if len(out) != 0 {
 		t.Fatalf("bad: %#v", out)
 	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
 func TestServiceSched_NodeDrain(t *testing.T) {
@@ -301,4 +309,52 @@ func TestServiceSched_NodeDrain(t *testing.T) {
 	if len(out) != 10 {
 		t.Fatalf("bad: %#v", out)
 	}
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+}
+
+func TestServiceSched_RetryLimit(t *testing.T) {
+	h := NewHarness(t)
+	h.Planner = &RejectPlan{h}
+
+	// Create some nodes
+	for i := 0; i < 10; i++ {
+		node := mock.Node()
+		noErr(t, h.State.RegisterNode(h.NextIndex(), node))
+	}
+
+	// Create a job
+	job := mock.Job()
+	noErr(t, h.State.RegisterJob(h.NextIndex(), job))
+
+	// Create a mock evaluation to register the job
+	eval := &structs.Evaluation{
+		ID:          mock.GenerateUUID(),
+		Priority:    job.Priority,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+	}
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Ensure plans were submitted
+	if len(h.Plans) == 0 {
+		t.Fatalf("bad: %#v", h.Plans)
+	}
+
+	// Lookup the allocations by JobID
+	out, err := h.State.AllocsByJob(job.ID)
+	noErr(t, err)
+
+	// Ensure no allocations placed
+	if len(out) != 0 {
+		t.Fatalf("bad: %#v", out)
+	}
+
+	// Should hit the retry limit
+	h.AssertEvalStatus(t, structs.EvalStatusFailed)
 }
diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go
index 6f605e450..fe6c80b78 100644
--- a/scheduler/scheduler_test.go
+++ b/scheduler/scheduler_test.go
@@ -10,6 +10,21 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
+// RejectPlan is used to always reject the entire plan and force a state refresh
+type RejectPlan struct {
+	Harness *Harness
+}
+
+func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) {
+	result := new(structs.PlanResult)
+	result.RefreshIndex = r.Harness.NextIndex()
+	return result, r.Harness.State, nil
+}
+
+func (r *RejectPlan) UpdateEval(eval *structs.Evaluation) error {
+	return nil
+}
+
 // Harness is a lightweight testing harness for schedulers.
 // It manages a state store copy and provides the planner
 // interface. It can be extended for various testing uses.
@@ -80,7 +95,17 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
 }
 
 func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
+	// Ensure sequential eval updates
+	h.planLock.Lock()
+	defer h.planLock.Unlock()
+
+	// Store the eval
 	h.Evals = append(h.Evals, eval)
+
+	// Check for custom planner
+	if h.Planner != nil {
+		return h.Planner.UpdateEval(eval)
+	}
 	return nil
 }
 
@@ -113,6 +138,17 @@ func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
 	return sched.Process(eval)
 }
 
+func (h *Harness) AssertEvalStatus(t *testing.T, state string) {
+	if len(h.Evals) != 1 {
+		t.Fatalf("bad: %#v", h.Evals)
+	}
+	update := h.Evals[0]
+
+	if update.Status != state {
+		t.Fatalf("bad: %#v", update)
+	}
+}
+
 // noErr is used to assert there are no errors
 func noErr(t *testing.T, err error) {
 	if err != nil {
diff --git a/scheduler/util.go b/scheduler/util.go
index 84eeaa105..736e8120e 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -162,7 +162,10 @@ func retryMax(max int, cb func() (bool, error)) error {
 		}
 		attempts += 1
 	}
-	return fmt.Errorf("maximum attempts reached (%d)", max)
+	return &SetStatusError{
+		Err:        fmt.Errorf("maximum attempts reached (%d)", max),
+		EvalStatus: structs.EvalStatusFailed,
+	}
 }
 
 // taintedNodes is used to scan the allocations and then check if the
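
Note on the pattern introduced above: retryMax no longer returns a bare error when it exhausts its attempts, it returns a SetStatusError carrying the terminal evaluation status, and Process type-asserts on the returned error to convert retry exhaustion into an eval status update rather than a hard scheduler failure. Below is a minimal, self-contained sketch of that control flow; the statusError, retryMax, and process names are illustrative stand-ins for the patch's types, with plain strings in place of structs.EvalStatusFailed and structs.EvalStatusComplete.

package main

import "fmt"

// statusError couples an error with the evaluation status that should be
// recorded when the error escapes the retry loop (stand-in for SetStatusError).
type statusError struct {
	Err        error
	EvalStatus string
}

func (s *statusError) Error() string {
	return s.Err.Error()
}

// retryMax mirrors scheduler/util.go: run cb up to max times; on exhaustion,
// return a statusError tagged with the "failed" status.
func retryMax(max int, cb func() (bool, error)) error {
	for attempts := 0; attempts < max; attempts++ {
		done, err := cb()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
	}
	return &statusError{
		Err:        fmt.Errorf("maximum attempts reached (%d)", max),
		EvalStatus: "failed",
	}
}

// process mirrors GenericScheduler.Process: a statusError becomes a status
// update on the evaluation, while any other error bubbles up to the caller.
func process(limit int, cb func() (bool, error)) (string, error) {
	if err := retryMax(limit, cb); err != nil {
		if statusErr, ok := err.(*statusError); ok {
			return statusErr.EvalStatus, nil
		}
		return "", err
	}
	return "complete", nil
}

func main() {
	// A callback that never makes progress exhausts the retry budget.
	status, err := process(2, func() (bool, error) { return false, nil })
	fmt.Println(status, err) // prints: failed <nil>
}

Carrying the status on the error keeps retryMax ignorant of evaluations beyond a single status constant, while Process remains the one place that writes eval state.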
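
TestServiceSched_RetryLimit exercises that path by injecting RejectPlan, a planner double whose SubmitPlan applies nothing and only bumps the refresh index, so no attempt ever places allocations and the eval must end in the failed state. A rough sketch of the same shape follows, under a deliberately simplified planner interface; the real Planner deals in *structs.Plan values, plan results, and a state store.

package main

import "fmt"

// Planner is a pared-down stand-in for the scheduler's planner interface.
type Planner interface {
	SubmitPlan(plan string) (placed bool, err error)
	UpdateEval(status string) error
}

// rejectPlan mirrors the test's RejectPlan double: every submission is
// rejected, so the scheduler retries until its attempt budget runs out.
type rejectPlan struct{}

func (rejectPlan) SubmitPlan(string) (bool, error) { return false, nil }

func (rejectPlan) UpdateEval(status string) error {
	fmt.Println("eval status:", status)
	return nil
}

// schedule drives the planner until success or the attempt limit, then
// records a terminal status, as the generic scheduler now does.
func schedule(p Planner, maxAttempts int) error {
	for i := 0; i < maxAttempts; i++ {
		placed, err := p.SubmitPlan("place 10 allocs")
		if err != nil {
			return err
		}
		if placed {
			return p.UpdateEval("complete")
		}
	}
	return p.UpdateEval("failed") // the status TestServiceSched_RetryLimit asserts
}

func main() {
	// With a rejecting planner the terminal status is "failed".
	if err := schedule(rejectPlan{}, 5); err != nil {
		fmt.Println("err:", err)
	}
}

Because Harness.UpdateEval now forwards to the injected planner and records the eval, the test can assert on the terminal status via AssertEvalStatus instead of poking at scheduler internals.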