diff --git a/nomad/leader.go b/nomad/leader.go index 1f0f03307..84ed0e0cf 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -114,6 +114,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error { // Scheduler periodic jobs go s.schedulePeriodic(stopCh) + + // Reap any failed evaluations + go s.reapFailedEvaluations(stopCh) return nil } @@ -173,6 +176,44 @@ func (s *Server) coreJobEval(job string) *structs.Evaluation { } } +// reapFailedEvaluations is used to reap evaluations that +// have reached their delivery limit and should be failed +func (s *Server) reapFailedEvaluations(stopCh chan struct{}) { + for { + select { + case <-stopCh: + return + default: + // Scan for a failed evaluation + eval, token, err := s.evalBroker.Dequeue([]string{failedQueue}, time.Second) + if err != nil { + return + } + if eval == nil { + continue + } + + // Update the status to failed + newEval := eval.Copy() + newEval.Status = structs.EvalStatusFailed + newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit) + s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval) + + // Update via Raft + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{newEval}, + } + if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil { + s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err) + continue + } + + // Ack completion + s.evalBroker.Ack(eval.ID, token) + } + } +} + // revokeLeadership is invoked once we step down as leader. // This is used to cleanup any state that may be specific to a leader. func (s *Server) revokeLeadership() error { diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 8839f9d08..8f975e230 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -304,3 +304,40 @@ func TestLeader_PeriodicDispatch(t *testing.T) { t.Fatalf("should pending job") }) } + +func TestLeader_ReapFailedEval(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EvalDeliveryLimit = 1 + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Wait for a periodic dispatch + eval := mock.Eval() + testutil.WaitForResult(func() (bool, error) { + err := s1.evalBroker.Enqueue(eval) + return err == nil, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Dequeue and Nack + out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + s1.evalBroker.Nack(out.ID, token) + + // Wait updated evaluation + state := s1.fsm.State() + testutil.WaitForResult(func() (bool, error) { + out, err := state.GetEvalByID(eval.ID) + if err != nil { + return false, err + } + return out != nil && out.Status == structs.EvalStatusFailed, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) +}