From f47d341c4e26e2bfde7a1a9e62a94017f370533e Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Fri, 27 May 2016 11:26:14 -0700 Subject: [PATCH] Blocked evals don't store TG alloc metrics --- scheduler/generic_sched.go | 26 +++++++++++++++----------- scheduler/system_sched.go | 6 +++--- scheduler/util.go | 6 +++++- scheduler/util_test.go | 22 +++++++++++++++++++--- 4 files changed, 42 insertions(+), 18 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 9db02a813..bad43d21c 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -62,7 +62,8 @@ type GenericScheduler struct { limitReached bool nextEval *structs.Evaluation - blocked *structs.Evaluation + blocked *structs.Evaluation + failedTGAllocs map[string]*structs.AllocMetric } // NewServiceScheduler is a factory function to instantiate a new service scheduler @@ -100,7 +101,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) - return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusFailed, desc) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, + s.failedTGAllocs, structs.EvalStatusFailed, desc) } // Retry up to the maxScheduleAttempts and reset if progress is made. @@ -117,7 +119,8 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { if err := s.createBlockedEval(true); err != nil { mErr.Errors = append(mErr.Errors, err) } - if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, statusErr.EvalStatus, err.Error()); err != nil { + if err := setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, + s.failedTGAllocs, statusErr.EvalStatus, err.Error()); err != nil { mErr.Errors = append(mErr.Errors, err) } return mErr.ErrorOrNil() @@ -127,12 +130,13 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { // If the current evaluation is a blocked evaluation and we didn't place // everything, do not update the status to complete. - if s.eval.Status == structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 { + if s.eval.Status == structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 { return s.planner.ReblockEval(s.eval) } // Update the status to complete - return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "") + return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, + s.failedTGAllocs, structs.EvalStatusComplete, "") } // createBlockedEval creates a blocked eval and submits it to the planner. If @@ -170,7 +174,7 @@ func (s *GenericScheduler) process() (bool, error) { s.plan = s.eval.MakePlan(s.job) // Reset the failed allocations - s.eval.FailedTGAllocs = nil + s.failedTGAllocs = nil // Create an evaluation context s.ctx = NewEvalContext(s.state, s.plan, s.logger) @@ -190,7 +194,7 @@ func (s *GenericScheduler) process() (bool, error) { // If there are failed allocations, we need to create a blocked evaluation // to place the failed allocations when resources become available. If the // current evaluation is already a blocked eval, we reuse it. - if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil { + if s.eval.Status != structs.EvalStatusBlocked && len(s.failedTGAllocs) != 0 && s.blocked == nil { if err := s.createBlockedEval(false); err != nil { s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) return false, err @@ -382,7 +386,7 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { for _, missing := range place { // Check if this task group has already failed - if metric, ok := s.eval.FailedTGAllocs[missing.TaskGroup.Name]; ok { + if metric, ok := s.failedTGAllocs[missing.TaskGroup.Name]; ok { metric.CoalescedFailures += 1 continue } @@ -416,11 +420,11 @@ func (s *GenericScheduler) computePlacements(place []allocTuple) error { s.plan.AppendAlloc(alloc) } else { // Lazy initialize the failed map - if s.eval.FailedTGAllocs == nil { - s.eval.FailedTGAllocs = make(map[string]*structs.AllocMetric) + if s.failedTGAllocs == nil { + s.failedTGAllocs = make(map[string]*structs.AllocMetric) } - s.eval.FailedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics() + s.failedTGAllocs[missing.TaskGroup.Name] = s.ctx.Metrics() } } diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index 9f64e95be..3b8a6f438 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -60,20 +60,20 @@ func (s *SystemScheduler) Process(eval *structs.Evaluation) error { default: desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", eval.TriggeredBy) - return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, structs.EvalStatusFailed, desc) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, nil, structs.EvalStatusFailed, desc) } // Retry up to the maxSystemScheduleAttempts and reset if progress is made. progress := func() bool { return progressMade(s.planResult) } if err := retryMax(maxSystemScheduleAttempts, s.process, progress); err != nil { if statusErr, ok := err.(*SetStatusError); ok { - return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, statusErr.EvalStatus, err.Error()) + return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, nil, statusErr.EvalStatus, err.Error()) } return err } // Update the status to complete - return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, structs.EvalStatusComplete, "") + return setStatus(s.logger, s.planner, s.eval, s.nextEval, nil, nil, structs.EvalStatusComplete, "") } // process is wrapped in retryMax to iteratively run the handler until we have no diff --git a/scheduler/util.go b/scheduler/util.go index 14f9009e2..1c1c93275 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -366,11 +366,15 @@ func networkPortMap(n *structs.NetworkResource) map[string]int { } // setStatus is used to update the status of the evaluation -func setStatus(logger *log.Logger, planner Planner, eval, nextEval, spawnedBlocked *structs.Evaluation, status, desc string) error { +func setStatus(logger *log.Logger, planner Planner, + eval, nextEval, spawnedBlocked *structs.Evaluation, + tgMetrics map[string]*structs.AllocMetric, status, desc string) error { + logger.Printf("[DEBUG] sched: %#v: setting status to %s", eval, status) newEval := eval.Copy() newEval.Status = status newEval.StatusDescription = desc + newEval.FailedTGAllocs = tgMetrics if nextEval != nil { newEval.NextEval = nextEval.ID } diff --git a/scheduler/util_test.go b/scheduler/util_test.go index 9c299c59f..fc2a1f133 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -488,7 +488,7 @@ func TestSetStatus(t *testing.T) { eval := mock.Eval() status := "a" desc := "b" - if err := setStatus(logger, h, eval, nil, nil, status, desc); err != nil { + if err := setStatus(logger, h, eval, nil, nil, nil, status, desc); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -504,7 +504,7 @@ func TestSetStatus(t *testing.T) { // Test next evals h = NewHarness(t) next := mock.Eval() - if err := setStatus(logger, h, eval, next, nil, status, desc); err != nil { + if err := setStatus(logger, h, eval, next, nil, nil, status, desc); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -520,7 +520,7 @@ func TestSetStatus(t *testing.T) { // Test blocked evals h = NewHarness(t) blocked := mock.Eval() - if err := setStatus(logger, h, eval, nil, blocked, status, desc); err != nil { + if err := setStatus(logger, h, eval, nil, blocked, nil, status, desc); err != nil { t.Fatalf("setStatus() failed: %v", err) } @@ -532,6 +532,22 @@ func TestSetStatus(t *testing.T) { if newEval.BlockedEval != blocked.ID { t.Fatalf("setStatus() didn't set BlockedEval correctly: %v", newEval) } + + // Test metrics + h = NewHarness(t) + metrics := map[string]*structs.AllocMetric{"foo": nil} + if err := setStatus(logger, h, eval, nil, nil, metrics, status, desc); err != nil { + t.Fatalf("setStatus() failed: %v", err) + } + + if len(h.Evals) != 1 { + t.Fatalf("setStatus() didn't update plan: %v", h.Evals) + } + + newEval = h.Evals[0] + if !reflect.DeepEqual(newEval.FailedTGAllocs, metrics) { + t.Fatalf("setStatus() didn't set failed task group metrics correctly: %v", newEval) + } } func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) {