Reaping failed evaluations creates follow up eval
Create a follow-up evaluation when reaping failed evaluations. This ensures that the job will still make eventual progress.
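For orientation, here is a minimal, self-contained sketch of the backoff this change introduces: the follow-up evaluation's wait is drawn uniformly from a window that starts at the baseline (1 minute) and extends by the wait range (9 minutes), so a reaped evaluation is retried between 1 and 10 minutes later. Only the two constant values mirror the diff below; the helper and its names are illustrative and not part of Nomad.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

const (
	// These values mirror failedEvalFollowUpBaseLineWait and
	// failedEvalFollowUpWaitRange from the diff below.
	baselineWait = 1 * time.Minute
	waitRange    = 9 * time.Minute
)

// followUpWait draws a wait uniformly from [baselineWait, baselineWait+waitRange),
// the same calculation reapFailedEvaluations performs before creating the
// follow-up evaluation.
func followUpWait() time.Duration {
	return baselineWait + time.Duration(rand.Int63n(int64(waitRange)))
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println("follow-up eval would be retried after", followUpWait())
	}
}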
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"math/rand"
 	"net"
 	"time"
 
@@ -19,6 +20,15 @@ const (
 	// unblocked to re-enter the scheduler. A failed evaluation occurs under
 	// high contention when the scheduler's plan does not make progress.
 	failedEvalUnblockInterval = 1 * time.Minute
+
+	// failedEvalFollowUpBaseLineWait is the minimum time waited before retrying
+	// a failed evaluation.
+	failedEvalFollowUpBaseLineWait = 1 * time.Minute
+
+	// failedEvalFollowUpWaitRange defines the range of additional time from
+	// the minimum in which to wait before retrying a failed evaluation. A value
+	// from this range should be selected using a uniform distribution.
+	failedEvalFollowUpWaitRange = 9 * time.Minute
 )
 
 // monitorLeadership is used to monitor if we acquire or lose our role
@@ -392,9 +402,16 @@ func (s *Server) reapFailedEvaluations(stopCh chan struct{}) {
 			newEval.StatusDescription = fmt.Sprintf("evaluation reached delivery limit (%d)", s.config.EvalDeliveryLimit)
 			s.logger.Printf("[WARN] nomad: eval %#v reached delivery limit, marking as failed", newEval)
 
+			// Create a follow-up evaluation that will be used to retry the
+			// scheduling for the job after the cluster is hopefully more stable
+			// due to the fairly large backoff.
+			followupEvalWait := failedEvalFollowUpBaseLineWait +
+				time.Duration(rand.Int63n(int64(failedEvalFollowUpWaitRange)))
+			followupEval := eval.CreateFailedFollowUpEval(followupEvalWait)
+
 			// Update via Raft
 			req := structs.EvalUpdateRequest{
-				Evals: []*structs.Evaluation{newEval},
+				Evals: []*structs.Evaluation{newEval, followupEval},
 			}
 			if _, _, err := s.raftApply(structs.EvalUpdateRequestType, &req); err != nil {
 				s.logger.Printf("[ERR] nomad: failed to update failed eval %#v: %v", newEval, err)
@@ -508,7 +508,7 @@ func TestLeader_ReapFailedEval(t *testing.T) {
 	}
 	s1.evalBroker.Nack(out.ID, token)
 
-	// Wait updated evaluation
+	// Wait for an updated and followup evaluation
 	state := s1.fsm.State()
 	testutil.WaitForResult(func() (bool, error) {
 		ws := memdb.NewWatchSet()
@@ -516,7 +516,45 @@ func TestLeader_ReapFailedEval(t *testing.T) {
 		if err != nil {
 			return false, err
 		}
-		return out != nil && out.Status == structs.EvalStatusFailed, nil
+		if out == nil {
+			return false, fmt.Errorf("expect original evaluation to exist")
+		}
+		if out.Status != structs.EvalStatusFailed {
+			return false, fmt.Errorf("got status %v; want %v", out.Status, structs.EvalStatusFailed)
+		}
+
+		// See if there is a followup
+		evals, err := state.EvalsByJob(ws, eval.JobID)
+		if err != nil {
+			return false, err
+		}
+
+		if l := len(evals); l != 2 {
+			return false, fmt.Errorf("got %d evals, want 2", l)
+		}
+
+		for _, e := range evals {
+			if e.ID == eval.ID {
+				continue
+			}
+
+			if e.Status != structs.EvalStatusPending {
+				return false, fmt.Errorf("follow up eval has status %v; want %v",
+					e.Status, structs.EvalStatusPending)
+			}
+
+			if e.Wait < failedEvalFollowUpBaseLineWait ||
+				e.Wait > failedEvalFollowUpBaseLineWait+failedEvalFollowUpWaitRange {
+				return false, fmt.Errorf("bad wait: %v", e.Wait)
+			}
+
+			if e.TriggeredBy != structs.EvalTriggerFailedFollowUp {
+				return false, fmt.Errorf("follow up eval TriggeredBy %v; want %v",
+					e.TriggeredBy, structs.EvalTriggerFailedFollowUp)
+			}
+		}
+
+		return true, nil
 	}, func(err error) {
 		t.Fatalf("err: %v", err)
 	})
@@ -3736,13 +3736,14 @@ const (
 )
 
 const (
-	EvalTriggerJobRegister   = "job-register"
-	EvalTriggerJobDeregister = "job-deregister"
-	EvalTriggerPeriodicJob   = "periodic-job"
-	EvalTriggerNodeUpdate    = "node-update"
-	EvalTriggerScheduled     = "scheduled"
-	EvalTriggerRollingUpdate = "rolling-update"
-	EvalTriggerMaxPlans      = "max-plan-attempts"
+	EvalTriggerJobRegister    = "job-register"
+	EvalTriggerJobDeregister  = "job-deregister"
+	EvalTriggerPeriodicJob    = "periodic-job"
+	EvalTriggerNodeUpdate     = "node-update"
+	EvalTriggerScheduled      = "scheduled"
+	EvalTriggerRollingUpdate  = "rolling-update"
+	EvalTriggerFailedFollowUp = "failed-eval-follow-up"
+	EvalTriggerMaxPlans       = "max-plan-attempts"
 )
 
 const (
@@ -3985,6 +3986,23 @@ func (e *Evaluation) CreateBlockedEval(classEligibility map[string]bool, escaped
 	}
 }
 
+// CreateFailedFollowUpEval creates a follow up evaluation when the current one
+// has been marked as failed because it has hit the delivery limit and will not
+// be retried by the eval_broker.
+func (e *Evaluation) CreateFailedFollowUpEval(wait time.Duration) *Evaluation {
+	return &Evaluation{
+		ID:             GenerateUUID(),
+		Priority:       e.Priority,
+		Type:           e.Type,
+		TriggeredBy:    EvalTriggerFailedFollowUp,
+		JobID:          e.JobID,
+		JobModifyIndex: e.JobModifyIndex,
+		Status:         EvalStatusPending,
+		Wait:           wait,
+		PreviousEval:   e.ID,
+	}
+}
+
 // Plan is used to submit a commit plan for task allocations. These
 // are submitted to the leader which verifies that resources have
 // not been overcommitted before admitting the plan.
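To make the shape of the follow-up concrete, here is an illustrative stand-alone sketch. The Evaluation type below is a simplified stand-in, not the real structs package: the point is that the follow-up keeps the job's identity, priority, and modify index, is triggered by "failed-eval-follow-up", starts out pending, carries the randomized wait, and points back at the failed evaluation through PreviousEval.

package main

import (
	"fmt"
	"time"
)

// Evaluation is a pared-down stand-in for structs.Evaluation, carrying only
// the fields the follow-up logic touches.
type Evaluation struct {
	ID, Type, TriggeredBy, JobID, Status, PreviousEval string
	Priority                                           int
	JobModifyIndex                                     uint64
	Wait                                               time.Duration
}

// createFailedFollowUpEval mirrors the new method in the diff: the follow-up
// retries the same job after `wait` and records which evaluation it follows.
func createFailedFollowUpEval(e *Evaluation, id string, wait time.Duration) *Evaluation {
	return &Evaluation{
		ID:             id, // the real method generates a UUID here
		Priority:       e.Priority,
		Type:           e.Type,
		TriggeredBy:    "failed-eval-follow-up",
		JobID:          e.JobID,
		JobModifyIndex: e.JobModifyIndex,
		Status:         "pending",
		Wait:           wait,
		PreviousEval:   e.ID,
	}
}

func main() {
	failed := &Evaluation{ID: "eval-1", Type: "service", JobID: "example", Priority: 50, Status: "failed"}
	followup := createFailedFollowUpEval(failed, "eval-2", 4*time.Minute)
	fmt.Printf("%+v\n", followup)
}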