mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Reuse the same evaluation and reblock it until there is no more work to do
This commit is contained in:
@@ -202,6 +202,46 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest,
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reblock is used to reinsert an existing blocked evaluation into the blocked
|
||||
// evaluation tracker.
|
||||
func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error {
|
||||
if done, err := e.srv.forward("Eval.Reblock", args, args, reply); done {
|
||||
return err
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "eval", "reblock"}, time.Now())
|
||||
|
||||
// Ensure there is only a single update with token
|
||||
if len(args.Evals) != 1 {
|
||||
return fmt.Errorf("only a single eval can be reblocked")
|
||||
}
|
||||
eval := args.Evals[0]
|
||||
|
||||
// Verify the evaluation is outstanding, and that the tokens match.
|
||||
if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Look for the eval
|
||||
snap, err := e.srv.fsm.State().Snapshot()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out, err := snap.EvalByID(eval.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if out == nil {
|
||||
return fmt.Errorf("evaluation does not exist")
|
||||
}
|
||||
if out.Status != structs.EvalStatusBlocked {
|
||||
return fmt.Errorf("evaluation not blocked")
|
||||
}
|
||||
|
||||
// Reblock the eval
|
||||
e.srv.blockedEvals.Block(eval)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Reap is used to cleanup dead evaluations and allocations
|
||||
func (e *Eval) Reap(args *structs.EvalDeleteRequest,
|
||||
reply *structs.GenericResponse) error {
|
||||
|
||||
@@ -575,3 +575,121 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) {
|
||||
t.Fatalf("bad: %#v", resp.Allocations)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Reblock_NonExistent(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return s1.evalBroker.Enabled(), nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should enable eval broker")
|
||||
})
|
||||
|
||||
// Create the register request
|
||||
eval1 := mock.Eval()
|
||||
s1.evalBroker.Enqueue(eval1)
|
||||
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("missing eval")
|
||||
}
|
||||
|
||||
get := &structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval1},
|
||||
EvalToken: token,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
|
||||
t.Fatalf("expect error since eval does not exist")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Reblock_NonBlocked(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return s1.evalBroker.Enabled(), nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should enable eval broker")
|
||||
})
|
||||
|
||||
// Create the eval
|
||||
eval1 := mock.Eval()
|
||||
s1.evalBroker.Enqueue(eval1)
|
||||
|
||||
// Insert it into the state store
|
||||
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("missing eval")
|
||||
}
|
||||
|
||||
get := &structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval1},
|
||||
EvalToken: token,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil {
|
||||
t.Fatalf("should error since eval was not in blocked state", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalEndpoint_Reblock(t *testing.T) {
|
||||
s1 := testServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return s1.evalBroker.Enabled(), nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should enable eval broker")
|
||||
})
|
||||
|
||||
// Create the eval
|
||||
eval1 := mock.Eval()
|
||||
eval1.Status = structs.EvalStatusBlocked
|
||||
s1.evalBroker.Enqueue(eval1)
|
||||
|
||||
// Insert it into the state store
|
||||
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("missing eval")
|
||||
}
|
||||
|
||||
get := &structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval1},
|
||||
EvalToken: token,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check that it is blocked
|
||||
bStats := s1.blockedEvals.Stats()
|
||||
if bStats.TotalBlocked+bStats.TotalEscaped == 0 {
|
||||
t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,6 +395,43 @@ SUBMIT:
|
||||
return nil
|
||||
}
|
||||
|
||||
// ReblockEval is used to reinsert a blocked evaluation into the blocked eval
|
||||
// tracker. This allows the worker to act as the planner for the scheduler.
|
||||
func (w *Worker) ReblockEval(eval *structs.Evaluation) error {
|
||||
// Check for a shutdown before plan submission
|
||||
if w.srv.IsShutdown() {
|
||||
return fmt.Errorf("shutdown while planning")
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now())
|
||||
|
||||
// TODO need to set the evaluations WorkerIndex
|
||||
|
||||
// Setup the request
|
||||
req := structs.EvalUpdateRequest{
|
||||
Evals: []*structs.Evaluation{eval},
|
||||
EvalToken: w.evalToken,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: w.srv.config.Region,
|
||||
},
|
||||
}
|
||||
var resp structs.GenericResponse
|
||||
|
||||
SUBMIT:
|
||||
// Make the RPC call
|
||||
if err := w.srv.RPC("Eval.Reblock", &req, &resp); err != nil {
|
||||
w.logger.Printf("[ERR] worker: failed to reblock evaluation %#v: %v",
|
||||
eval, err)
|
||||
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
||||
goto SUBMIT
|
||||
}
|
||||
return err
|
||||
} else {
|
||||
w.logger.Printf("[DEBUG] worker: reblocked evaluation %#v", eval)
|
||||
w.backoffReset()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// shouldResubmit checks if a given error should be swallowed and the plan
|
||||
// resubmitted after a backoff. Usually these are transient errors that
|
||||
// the cluster should heal from quickly.
|
||||
|
||||
@@ -442,3 +442,51 @@ func TestWorker_CreateEval(t *testing.T) {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWorker_ReblockEval(t *testing.T) {
|
||||
s1 := testServer(t, func(c *Config) {
|
||||
c.NumSchedulers = 0
|
||||
c.EnabledSchedulers = []string{structs.JobTypeService}
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the blocked eval
|
||||
eval1 := mock.Eval()
|
||||
eval1.Status = structs.EvalStatusBlocked
|
||||
|
||||
// Insert it into the state store
|
||||
if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Enqueue the eval
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
err := s1.evalBroker.Enqueue(eval1)
|
||||
return err == nil, err
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if evalOut != eval1 {
|
||||
t.Fatalf("Bad eval")
|
||||
}
|
||||
|
||||
eval2 := evalOut.Copy()
|
||||
|
||||
// Attempt to reblock eval
|
||||
w := &Worker{srv: s1, logger: s1.logger, evalToken: token}
|
||||
err = w.ReblockEval(eval2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check that it is blocked
|
||||
bStats := s1.blockedEvals.Stats()
|
||||
if bStats.TotalBlocked+bStats.TotalEscaped == 0 {
|
||||
t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -113,6 +113,11 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
|
||||
if statusErr, ok := err.(*SetStatusError); ok {
|
||||
// Scheduling was tried but made no forward progress so create a
|
||||
// blocked eval to retry once resources become available.
|
||||
|
||||
// TODO: Set the trigger by reason of the blocked eval here to
|
||||
// something like "max-attempts"
|
||||
// We can then periodically dequeue these from the blocked_eval
|
||||
// tracker.
|
||||
var mErr multierror.Error
|
||||
if err := s.createBlockedEval(); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
@@ -125,6 +130,12 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// If the current evaluation is a blocked evaluation and we didn't place
|
||||
// everything, do not update the status to complete.
|
||||
if s.eval.Status == structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 {
|
||||
return s.planner.ReblockEval(s.eval)
|
||||
}
|
||||
|
||||
// Update the status to complete
|
||||
return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "")
|
||||
}
|
||||
@@ -177,8 +188,9 @@ func (s *GenericScheduler) process() (bool, error) {
|
||||
}
|
||||
|
||||
// If there are failed allocations, we need to create a blocked evaluation
|
||||
// to place the failed allocations when resources become available.
|
||||
if len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
|
||||
// to place the failed allocations when resources become available. If the
|
||||
// current evaluation is already a blocked eval, we reuse it.
|
||||
if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil {
|
||||
if err := s.createBlockedEval(); err != nil {
|
||||
s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err)
|
||||
return false, err
|
||||
|
||||
@@ -275,7 +275,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) {
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestServiceSched_JobRegister_BlockedEval(t *testing.T) {
|
||||
func TestServiceSched_JobRegister_CreateBlockedEval(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
// Create a full node
|
||||
@@ -454,6 +454,126 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) {
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestServiceSched_EvaluateBlockedEval(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
// Create a job and set the task group count to zero.
|
||||
job := mock.Job()
|
||||
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
|
||||
|
||||
// Create a mock blocked evaluation
|
||||
eval := &structs.Evaluation{
|
||||
ID: structs.GenerateUUID(),
|
||||
Status: structs.EvalStatusBlocked,
|
||||
Priority: job.Priority,
|
||||
TriggeredBy: structs.EvalTriggerJobRegister,
|
||||
JobID: job.ID,
|
||||
}
|
||||
|
||||
// Insert it into the state store
|
||||
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
|
||||
|
||||
// Process the evaluation
|
||||
err := h.Process(NewServiceScheduler, eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure there was no plan
|
||||
if len(h.Plans) != 0 {
|
||||
t.Fatalf("bad: %#v", h.Plans)
|
||||
}
|
||||
|
||||
// Ensure that the eval was reblocked
|
||||
if len(h.ReblockEvals) != 1 {
|
||||
t.Fatalf("bad: %#v", h.ReblockEvals)
|
||||
}
|
||||
if h.ReblockEvals[0].ID != eval.ID {
|
||||
t.Fatalf("expect same eval to be reblocked; got %q; want %q", h.ReblockEvals[0].ID, eval.ID)
|
||||
}
|
||||
|
||||
// Ensure the eval status was not updated
|
||||
if len(h.Evals) != 0 {
|
||||
t.Fatalf("Existing eval should not have status set")
|
||||
}
|
||||
}
|
||||
|
||||
func TestServiceSched_EvaluateBlockedEval_Finished(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
// Create some nodes
|
||||
for i := 0; i < 10; i++ {
|
||||
node := mock.Node()
|
||||
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
|
||||
}
|
||||
|
||||
// Create a job and set the task group count to zero.
|
||||
job := mock.Job()
|
||||
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
|
||||
|
||||
// Create a mock blocked evaluation
|
||||
eval := &structs.Evaluation{
|
||||
ID: structs.GenerateUUID(),
|
||||
Status: structs.EvalStatusBlocked,
|
||||
Priority: job.Priority,
|
||||
TriggeredBy: structs.EvalTriggerJobRegister,
|
||||
JobID: job.ID,
|
||||
}
|
||||
|
||||
// Insert it into the state store
|
||||
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
|
||||
|
||||
// Process the evaluation
|
||||
err := h.Process(NewServiceScheduler, eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure a single plan
|
||||
if len(h.Plans) != 1 {
|
||||
t.Fatalf("bad: %#v", h.Plans)
|
||||
}
|
||||
plan := h.Plans[0]
|
||||
|
||||
// Ensure the plan doesn't have annotations.
|
||||
if plan.Annotations != nil {
|
||||
t.Fatalf("expected no annotations")
|
||||
}
|
||||
|
||||
// Ensure the eval has no spawned blocked eval
|
||||
if len(h.Evals) != 1 {
|
||||
t.Fatalf("bad: %#v", h.Evals)
|
||||
if h.Evals[0].SpawnedBlockedEval != "" {
|
||||
t.Fatalf("bad: %#v", h.Evals[0])
|
||||
}
|
||||
}
|
||||
|
||||
// Ensure the plan allocated
|
||||
var planned []*structs.Allocation
|
||||
for _, allocList := range plan.NodeAllocation {
|
||||
planned = append(planned, allocList...)
|
||||
}
|
||||
if len(planned) != 10 {
|
||||
t.Fatalf("bad: %#v", plan)
|
||||
}
|
||||
|
||||
// Lookup the allocations by JobID
|
||||
out, err := h.State.AllocsByJob(job.ID)
|
||||
noErr(t, err)
|
||||
|
||||
// Ensure all allocations placed
|
||||
if len(out) != 10 {
|
||||
t.Fatalf("bad: %#v", out)
|
||||
}
|
||||
|
||||
// Ensure the eval was not reblocked
|
||||
if len(h.ReblockEvals) != 0 {
|
||||
t.Fatalf("Existing eval should not have been reblocked as it placed all allocations")
|
||||
}
|
||||
|
||||
h.AssertEvalStatus(t, structs.EvalStatusComplete)
|
||||
}
|
||||
|
||||
func TestServiceSched_JobModify(t *testing.T) {
|
||||
h := NewHarness(t)
|
||||
|
||||
|
||||
@@ -87,4 +87,10 @@ type Planner interface {
|
||||
// CreateEval is used to create an evaluation. This should set the
|
||||
// PreviousEval to that of the current evaluation.
|
||||
CreateEval(*structs.Evaluation) error
|
||||
|
||||
// ReblockEval takes a blocked evaluation and re-inserts it into the blocked
|
||||
// evaluation tracker. This update occurs only in-memory on the leader. The
|
||||
// evaluation must exist in a blocked state prior to this being called such
|
||||
// that on leader changes, the evaluation will be reblocked properly.
|
||||
ReblockEval(*structs.Evaluation) error
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log"
|
||||
"os"
|
||||
"sync"
|
||||
@@ -29,6 +30,10 @@ func (r *RejectPlan) CreateEval(*structs.Evaluation) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *RejectPlan) ReblockEval(*structs.Evaluation) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Harness is a lightweight testing harness for schedulers. It manages a state
|
||||
// store copy and provides the planner interface. It can be extended for various
|
||||
// testing uses or for invoking the scheduler without side effects.
|
||||
@@ -38,9 +43,10 @@ type Harness struct {
|
||||
Planner Planner
|
||||
planLock sync.Mutex
|
||||
|
||||
Plans []*structs.Plan
|
||||
Evals []*structs.Evaluation
|
||||
CreateEvals []*structs.Evaluation
|
||||
Plans []*structs.Plan
|
||||
Evals []*structs.Evaluation
|
||||
CreateEvals []*structs.Evaluation
|
||||
ReblockEvals []*structs.Evaluation
|
||||
|
||||
nextIndex uint64
|
||||
nextIndexLock sync.Mutex
|
||||
@@ -138,6 +144,28 @@ func (h *Harness) CreateEval(eval *structs.Evaluation) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *Harness) ReblockEval(eval *structs.Evaluation) error {
|
||||
// Ensure sequential plan application
|
||||
h.planLock.Lock()
|
||||
defer h.planLock.Unlock()
|
||||
|
||||
// Check that the evaluation was already blocked.
|
||||
old, err := h.State.EvalByID(eval.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if old == nil {
|
||||
return fmt.Errorf("evaluation does not exist to be reblocked")
|
||||
}
|
||||
if old.Status != structs.EvalStatusBlocked {
|
||||
return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID)
|
||||
}
|
||||
|
||||
h.ReblockEvals = append(h.ReblockEvals, eval)
|
||||
return nil
|
||||
}
|
||||
|
||||
// NextIndex returns the next index
|
||||
func (h *Harness) NextIndex() uint64 {
|
||||
h.nextIndexLock.Lock()
|
||||
|
||||
Reference in New Issue
Block a user