diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index 712111697..25895096e 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -202,6 +202,46 @@ func (e *Eval) Create(args *structs.EvalUpdateRequest, return nil } +// Reblock is used to reinsert an existing blocked evaluation into the blocked +// evaluation tracker. +func (e *Eval) Reblock(args *structs.EvalUpdateRequest, reply *structs.GenericResponse) error { + if done, err := e.srv.forward("Eval.Reblock", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "eval", "reblock"}, time.Now()) + + // Ensure there is only a single update with token + if len(args.Evals) != 1 { + return fmt.Errorf("only a single eval can be reblocked") + } + eval := args.Evals[0] + + // Verify the evaluation is outstanding, and that the tokens match. + if err := e.srv.evalBroker.OutstandingReset(eval.ID, args.EvalToken); err != nil { + return err + } + + // Look for the eval + snap, err := e.srv.fsm.State().Snapshot() + if err != nil { + return err + } + out, err := snap.EvalByID(eval.ID) + if err != nil { + return err + } + if out == nil { + return fmt.Errorf("evaluation does not exist") + } + if out.Status != structs.EvalStatusBlocked { + return fmt.Errorf("evaluation not blocked") + } + + // Reblock the eval + e.srv.blockedEvals.Block(eval) + return nil +} + // Reap is used to cleanup dead evaluations and allocations func (e *Eval) Reap(args *structs.EvalDeleteRequest, reply *structs.GenericResponse) error { diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index f037c8bd3..3fa270ce8 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -575,3 +575,121 @@ func TestEvalEndpoint_Allocations_Blocking(t *testing.T) { t.Fatalf("bad: %#v", resp.Allocations) } } + +func TestEvalEndpoint_Reblock_NonExistent(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + + testutil.WaitForResult(func() (bool, error) { + return s1.evalBroker.Enabled(), nil + }, func(err error) { + t.Fatalf("should enable eval broker") + }) + + // Create the register request + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("missing eval") + } + + get := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval1}, + EvalToken: token, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil { + t.Fatalf("expect error since eval does not exist") + } +} + +func TestEvalEndpoint_Reblock_NonBlocked(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + + testutil.WaitForResult(func() (bool, error) { + return s1.evalBroker.Enabled(), nil + }, func(err error) { + t.Fatalf("should enable eval broker") + }) + + // Create the eval + eval1 := mock.Eval() + s1.evalBroker.Enqueue(eval1) + + // Insert it into the state store + if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil { + t.Fatal(err) + } + + out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("missing eval") + } + + get := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval1}, + EvalToken: token, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err == nil { + t.Fatalf("should error since eval was not in blocked state", err) + } +} + +func TestEvalEndpoint_Reblock(t *testing.T) { + s1 := testServer(t, nil) + defer s1.Shutdown() + codec := rpcClient(t, s1) + + testutil.WaitForResult(func() (bool, error) { + return s1.evalBroker.Enabled(), nil + }, func(err error) { + t.Fatalf("should enable eval broker") + }) + + // Create the eval + eval1 := mock.Eval() + eval1.Status = structs.EvalStatusBlocked + s1.evalBroker.Enqueue(eval1) + + // Insert it into the state store + if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil { + t.Fatal(err) + } + + out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out == nil { + t.Fatalf("missing eval") + } + + get := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval1}, + EvalToken: token, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + var resp structs.GenericResponse + if err := msgpackrpc.CallWithCodec(codec, "Eval.Reblock", get, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Check that it is blocked + bStats := s1.blockedEvals.Stats() + if bStats.TotalBlocked+bStats.TotalEscaped == 0 { + t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker") + } +} diff --git a/nomad/worker.go b/nomad/worker.go index 1bae409d5..9cb1a286a 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -395,6 +395,43 @@ SUBMIT: return nil } +// ReblockEval is used to reinsert a blocked evaluation into the blocked eval +// tracker. This allows the worker to act as the planner for the scheduler. +func (w *Worker) ReblockEval(eval *structs.Evaluation) error { + // Check for a shutdown before plan submission + if w.srv.IsShutdown() { + return fmt.Errorf("shutdown while planning") + } + defer metrics.MeasureSince([]string{"nomad", "worker", "reblock_eval"}, time.Now()) + + // TODO need to set the evaluations WorkerIndex + + // Setup the request + req := structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + EvalToken: w.evalToken, + WriteRequest: structs.WriteRequest{ + Region: w.srv.config.Region, + }, + } + var resp structs.GenericResponse + +SUBMIT: + // Make the RPC call + if err := w.srv.RPC("Eval.Reblock", &req, &resp); err != nil { + w.logger.Printf("[ERR] worker: failed to reblock evaluation %#v: %v", + eval, err) + if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) { + goto SUBMIT + } + return err + } else { + w.logger.Printf("[DEBUG] worker: reblocked evaluation %#v", eval) + w.backoffReset() + } + return nil +} + // shouldResubmit checks if a given error should be swallowed and the plan // resubmitted after a backoff. Usually these are transient errors that // the cluster should heal from quickly. diff --git a/nomad/worker_test.go b/nomad/worker_test.go index c1663592a..6b4f657a9 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -442,3 +442,51 @@ func TestWorker_CreateEval(t *testing.T) { t.Fatalf("bad: %v", out) } } + +func TestWorker_ReblockEval(t *testing.T) { + s1 := testServer(t, func(c *Config) { + c.NumSchedulers = 0 + c.EnabledSchedulers = []string{structs.JobTypeService} + }) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create the blocked eval + eval1 := mock.Eval() + eval1.Status = structs.EvalStatusBlocked + + // Insert it into the state store + if err := s1.fsm.State().UpsertEvals(1000, []*structs.Evaluation{eval1}); err != nil { + t.Fatal(err) + } + + // Enqueue the eval + testutil.WaitForResult(func() (bool, error) { + err := s1.evalBroker.Enqueue(eval1) + return err == nil, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) + evalOut, token, err := s1.evalBroker.Dequeue([]string{eval1.Type}, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if evalOut != eval1 { + t.Fatalf("Bad eval") + } + + eval2 := evalOut.Copy() + + // Attempt to reblock eval + w := &Worker{srv: s1, logger: s1.logger, evalToken: token} + err = w.ReblockEval(eval2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Check that it is blocked + bStats := s1.blockedEvals.Stats() + if bStats.TotalBlocked+bStats.TotalEscaped == 0 { + t.Fatalf("ReblockEval didn't insert eval into the blocked eval tracker") + } +} diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 4f3bce9b6..8276acc97 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -113,6 +113,11 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { if statusErr, ok := err.(*SetStatusError); ok { // Scheduling was tried but made no forward progress so create a // blocked eval to retry once resources become available. + + // TODO: Set the trigger by reason of the blocked eval here to + // something like "max-attempts" + // We can then periodically dequeue these from the blocked_eval + // tracker. var mErr multierror.Error if err := s.createBlockedEval(); err != nil { mErr.Errors = append(mErr.Errors, err) @@ -125,6 +130,12 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { return err } + // If the current evaluation is a blocked evaluation and we didn't place + // everything, do not update the status to complete. + if s.eval.Status == structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 { + return s.planner.ReblockEval(s.eval) + } + // Update the status to complete return setStatus(s.logger, s.planner, s.eval, s.nextEval, s.blocked, structs.EvalStatusComplete, "") } @@ -177,8 +188,9 @@ func (s *GenericScheduler) process() (bool, error) { } // If there are failed allocations, we need to create a blocked evaluation - // to place the failed allocations when resources become available. - if len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil { + // to place the failed allocations when resources become available. If the + // current evaluation is already a blocked eval, we reuse it. + if s.eval.Status != structs.EvalStatusBlocked && len(s.eval.FailedTGAllocs) != 0 && s.blocked == nil { if err := s.createBlockedEval(); err != nil { s.logger.Printf("[ERR] sched: %#v failed to make blocked eval: %v", s.eval, err) return false, err diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 52a46ff90..d44d76d38 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -275,7 +275,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestServiceSched_JobRegister_BlockedEval(t *testing.T) { +func TestServiceSched_JobRegister_CreateBlockedEval(t *testing.T) { h := NewHarness(t) // Create a full node @@ -454,6 +454,126 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_EvaluateBlockedEval(t *testing.T) { + h := NewHarness(t) + + // Create a job and set the task group count to zero. + job := mock.Job() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock blocked evaluation + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Status: structs.EvalStatusBlocked, + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Insert it into the state store + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure there was no plan + if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Ensure that the eval was reblocked + if len(h.ReblockEvals) != 1 { + t.Fatalf("bad: %#v", h.ReblockEvals) + } + if h.ReblockEvals[0].ID != eval.ID { + t.Fatalf("expect same eval to be reblocked; got %q; want %q", h.ReblockEvals[0].ID, eval.ID) + } + + // Ensure the eval status was not updated + if len(h.Evals) != 0 { + t.Fatalf("Existing eval should not have status set") + } +} + +func TestServiceSched_EvaluateBlockedEval_Finished(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job and set the task group count to zero. + job := mock.Job() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock blocked evaluation + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Status: structs.EvalStatusBlocked, + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Insert it into the state store + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + if plan.Annotations != nil { + t.Fatalf("expected no annotations") + } + + // Ensure the eval has no spawned blocked eval + if len(h.Evals) != 1 { + t.Fatalf("bad: %#v", h.Evals) + if h.Evals[0].SpawnedBlockedEval != "" { + t.Fatalf("bad: %#v", h.Evals[0]) + } + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + // Ensure the eval was not reblocked + if len(h.ReblockEvals) != 0 { + t.Fatalf("Existing eval should not have been reblocked as it placed all allocations") + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestServiceSched_JobModify(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 518280d07..d96c6aed7 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -87,4 +87,10 @@ type Planner interface { // CreateEval is used to create an evaluation. This should set the // PreviousEval to that of the current evaluation. CreateEval(*structs.Evaluation) error + + // ReblockEval takes a blocked evaluation and re-inserts it into the blocked + // evaluation tracker. This update occurs only in-memory on the leader. The + // evaluation must exist in a blocked state prior to this being called such + // that on leader changes, the evaluation will be reblocked properly. + ReblockEval(*structs.Evaluation) error } diff --git a/scheduler/testing.go b/scheduler/testing.go index b7a4d7da1..19cc0221f 100644 --- a/scheduler/testing.go +++ b/scheduler/testing.go @@ -1,6 +1,7 @@ package scheduler import ( + "fmt" "log" "os" "sync" @@ -29,6 +30,10 @@ func (r *RejectPlan) CreateEval(*structs.Evaluation) error { return nil } +func (r *RejectPlan) ReblockEval(*structs.Evaluation) error { + return nil +} + // Harness is a lightweight testing harness for schedulers. It manages a state // store copy and provides the planner interface. It can be extended for various // testing uses or for invoking the scheduler without side effects. @@ -38,9 +43,10 @@ type Harness struct { Planner Planner planLock sync.Mutex - Plans []*structs.Plan - Evals []*structs.Evaluation - CreateEvals []*structs.Evaluation + Plans []*structs.Plan + Evals []*structs.Evaluation + CreateEvals []*structs.Evaluation + ReblockEvals []*structs.Evaluation nextIndex uint64 nextIndexLock sync.Mutex @@ -138,6 +144,28 @@ func (h *Harness) CreateEval(eval *structs.Evaluation) error { return nil } +func (h *Harness) ReblockEval(eval *structs.Evaluation) error { + // Ensure sequential plan application + h.planLock.Lock() + defer h.planLock.Unlock() + + // Check that the evaluation was already blocked. + old, err := h.State.EvalByID(eval.ID) + if err != nil { + return err + } + + if old == nil { + return fmt.Errorf("evaluation does not exist to be reblocked") + } + if old.Status != structs.EvalStatusBlocked { + return fmt.Errorf("evaluation %q is not already in a blocked state", old.ID) + } + + h.ReblockEvals = append(h.ReblockEvals, eval) + return nil +} + // NextIndex returns the next index func (h *Harness) NextIndex() uint64 { h.nextIndexLock.Lock()