scheduler: update status and test retry limit

Armon Dadgar
2015-08-15 14:47:13 -07:00
parent 50fd49cdbc
commit b2401ecc3c
4 changed files with 126 additions and 3 deletions

View File

@@ -18,6 +18,16 @@ const (
maxBatchScheduleAttempts = 2
)
// SetStatusError wraps an error and the status that should be set on the evaluation when it occurs
type SetStatusError struct {
Err error
EvalStatus string
}
func (s *SetStatusError) Error() string {
return s.Err.Error()
}
// GenericScheduler is used for 'service' and 'batch' type jobs. This scheduler is
// designed for long-lived services, and as such spends more time attempting
// to make a high quality placement. This is the primary scheduler for
@@ -55,6 +65,15 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Scheduler {
return s
}
// setStatus is used to update the status of the evaluation
func (s *GenericScheduler) setStatus(status, desc string) error {
s.logger.Printf("[DEBUG] sched: %#v: setting status to %s (%s)", s.eval, status, desc)
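// Update a copy so the evaluation the scheduler is holding is not mutated in place.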
newEval := s.eval.Copy()
newEval.Status = status
newEval.StatusDescription = desc
return s.planner.UpdateEval(newEval)
}
// Process is used to handle a single evaluation
func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
// Verify the evaluation trigger reason is understood
@@ -62,8 +81,9 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
structs.EvalTriggerJobDeregister:
default:
return fmt.Errorf("service scheduler cannot handle '%s' evaluation reason",
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
return s.setStatus(structs.EvalStatusFailed, desc)
}
// Store the evaluation
@@ -74,7 +94,15 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
if s.batch {
limit = maxBatchScheduleAttempts
}
return retryMax(limit, s.process)
if err := retryMax(limit, s.process); err != nil {
if statusErr, ok := err.(*SetStatusError); ok {
return s.setStatus(statusErr.EvalStatus, err.Error())
}
return err
}
// Update the status to complete
return s.setStatus(structs.EvalStatusComplete, "")
}
// process is wrapped in retryMax to iteratively run the handler until we have no

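The net effect of the scheduler changes above is that Process now records its outcome on the evaluation itself: EvalStatusComplete on success, and whatever status a wrapped SetStatusError carries on a terminal failure, with the error text reused as the status description. A minimal sketch of the unwrapping pattern (statusForError is a hypothetical name, not part of the commit):

// statusForError is a hypothetical helper showing the type assertion that
// Process performs on the error returned from the retry loop.
func statusForError(err error) (string, bool) {
	if statusErr, ok := err.(*SetStatusError); ok {
		return statusErr.EvalStatus, true
	}
	return "", false
}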
View File

@@ -57,6 +57,8 @@ func TestServiceSched_JobRegister(t *testing.T) {
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
@@ -100,6 +102,8 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobModify(t *testing.T) {
@@ -178,6 +182,8 @@ func TestServiceSched_JobModify(t *testing.T) {
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_JobDeregister(t *testing.T) {
@@ -228,6 +234,8 @@ func TestServiceSched_JobDeregister(t *testing.T) {
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_NodeDrain(t *testing.T) {
@@ -301,4 +309,52 @@ func TestServiceSched_NodeDrain(t *testing.T) {
if len(out) != 10 {
t.Fatalf("bad: %#v", out)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestServiceSched_RetryLimit(t *testing.T) {
h := NewHarness(t)
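// RejectPlan (added to the testing harness in this commit) rejects every
// plan the scheduler submits, so no attempt can make progress.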
h.Planner = &RejectPlan{h}
// Create some nodes
for i := 0; i < 10; i++ {
node := mock.Node()
noErr(t, h.State.RegisterNode(h.NextIndex(), node))
}
// Create a job
job := mock.Job()
noErr(t, h.State.RegisterJob(h.NextIndex(), job))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
ID: mock.GenerateUUID(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure plans were submitted
if len(h.Plans) == 0 {
t.Fatalf("bad: %#v", h.Plans)
}
// Lookup the allocations by JobID
out, err := h.State.AllocsByJob(job.ID)
noErr(t, err)
// Ensure no allocations placed
if len(out) != 0 {
t.Fatalf("bad: %#v", out)
}
// Should hit the retry limit
h.AssertEvalStatus(t, structs.EvalStatusFailed)
}

View File

@@ -10,6 +10,21 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// RejectPlan is used to always reject the entire plan and force a state refresh
type RejectPlan struct {
Harness *Harness
}
func (r *RejectPlan) SubmitPlan(*structs.Plan) (*structs.PlanResult, State, error) {
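// Return an empty PlanResult (nothing placed) along with the harness state,
// which makes the calling scheduler treat the attempt as unproductive and retry.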
result := new(structs.PlanResult)
result.RefreshIndex = r.Harness.NextIndex()
return result, r.Harness.State, nil
}
func (r *RejectPlan) UpdateEval(eval *structs.Evaluation) error {
return nil
}
// Harness is a lightweight testing harness for schedulers.
// It manages a state store copy and provides the planner
// interface. It can be extended for various testing uses.
@@ -80,7 +95,17 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, error) {
}
func (h *Harness) UpdateEval(eval *structs.Evaluation) error {
// Ensure sequential eval updates
h.planLock.Lock()
defer h.planLock.Unlock()
// Store the eval
h.Evals = append(h.Evals, eval)
// Check for custom planner
if h.Planner != nil {
return h.Planner.UpdateEval(eval)
}
return nil
}
@@ -113,6 +138,17 @@ func (h *Harness) Process(factory Factory, eval *structs.Evaluation) error {
return sched.Process(eval)
}
func (h *Harness) AssertEvalStatus(t *testing.T, state string) {
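// UpdateEval above records every status update in h.Evals, so processing a
// single evaluation is expected to produce exactly one.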
if len(h.Evals) != 1 {
t.Fatalf("bad: %#v", h.Evals)
}
update := h.Evals[0]
if update.Status != state {
t.Fatalf("bad: %#v", update)
}
}
// noErr is used to assert there are no errors
func noErr(t *testing.T, err error) {
if err != nil {

View File

@@ -162,7 +162,10 @@ func retryMax(max int, cb func() (bool, error)) error {
}
attempts += 1
}
return fmt.Errorf("maximum attempts reached (%d)", max)
return &SetStatusError{
Err: fmt.Errorf("maximum attempts reached (%d)", max),
EvalStatus: structs.EvalStatusFailed,
}
}
// taintedNodes is used to scan the allocations and then check if the
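For completeness, a small sketch of how the new retryMax failure mode surfaces to a caller. The callback here is made up; it never reports completion, which is roughly what the RejectPlan-driven test above produces:

// Sketch only: a callback that never finishes drives retryMax to its limit,
// and the returned error now carries the failed evaluation status.
neverDone := func() (bool, error) { return false, nil }
if err := retryMax(2, neverDone); err != nil {
	if statusErr, ok := err.(*SetStatusError); ok {
		fmt.Printf("eval status: %s, cause: %v\n", statusErr.EvalStatus, statusErr.Err)
	}
}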