diff --git a/scheduler/service_sched.go b/scheduler/service_sched.go
index 3ad8d8ce3..35b24ce17 100644
--- a/scheduler/service_sched.go
+++ b/scheduler/service_sched.go
@@ -45,32 +45,26 @@ func (s *ServiceScheduler) Process(eval *structs.Evaluation) error {
 	// Store the evaluation
 	s.eval = eval
 
-	// Use the evaluation trigger reason to determine what we need to do
+	// Verify the evaluation trigger reason is understood
 	switch eval.TriggeredBy {
-	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate:
-		return s.process(s.computeJobAllocs)
-	case structs.EvalTriggerJobDeregister:
-		return s.process(s.evictJobAllocs)
+	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
+		structs.EvalTriggerJobDeregister:
 	default:
 		return fmt.Errorf("service scheduler cannot handle '%s' evaluation reason",
 			eval.TriggeredBy)
 	}
+
+	// Retry up to maxScheduleAttempts
+	return retryMax(maxScheduleAttempts, s.process)
}
 
-// process is used to iteratively run the handler until we have no
+// process is wrapped in retryMax and iteratively run until we have no
 // further work or we've made the maximum number of attempts.
-func (s *ServiceScheduler) process(handler func() error) error {
-START:
-	// Check the attempt count
-	if s.attempts == maxScheduleAttempts {
-		return fmt.Errorf("maximum schedule attempts reached (%d)", s.attempts)
-	}
-	s.attempts += 1
-
+func (s *ServiceScheduler) process() (bool, error) {
 	// Lookup the Job by ID
 	job, err := s.state.GetJobByID(s.eval.JobID)
 	if err != nil {
-		return fmt.Errorf("failed to get job '%s': %v",
+		return false, fmt.Errorf("failed to get job '%s': %v",
 			s.eval.JobID, err)
 	}
 	s.job = job
@@ -78,23 +72,23 @@ START:
 	// Create a plan
 	s.plan = s.eval.MakePlan(job)
 
-	// Invoke the handler to setup the plan
-	if err := handler(); err != nil {
+	// Compute the target job allocations
+	if err := s.computeJobAllocs(); err != nil {
 		s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err)
-		return err
+		return false, err
 	}
 
 	// Submit the plan
 	result, newState, err := s.planner.SubmitPlan(s.plan)
 	if err != nil {
-		return err
+		return false, err
 	}
 
 	// If we got a state refresh, try again since we have stale data
 	if newState != nil {
 		s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval)
 		s.state = newState
-		goto START
+		return false, nil
 	}
 
 	// Try again if the plan was not fully committed, potential conflict
@@ -102,23 +96,22 @@
 	if !fullCommit {
 		s.logger.Printf("[DEBUG] sched: %#v: attempted %d placements, %d placed",
 			s.eval, expected, actual)
-		goto START
+		return false, nil
 	}
-	return nil
+
+	// Success!
+	return true, nil
 }
 
 // computeJobAllocs is used to reconcile differences between the job,
 // existing allocations and node status to update the allocations.
 func (s *ServiceScheduler) computeJobAllocs() error {
-	// If the job is missing, maybe a concurrent deregister
-	if s.job == nil {
-		s.logger.Printf("[DEBUG] sched: %#v: job not found, skipping", s.eval)
-		return nil
+	// Materialize all the task groups, job could be missing if deregistered
+	var groups map[string]*structs.TaskGroup
+	if s.job != nil {
+		groups = materializeTaskGroups(s.job)
 	}
 
-	// Materialize all the task groups
-	groups := materializeTaskGroups(s.job)
-
 	// If there is nothing required for this job, treat like a deregister
 	if len(groups) == 0 {
 		return s.evictJobAllocs()
@@ -170,9 +163,27 @@ func (s *ServiceScheduler) computeJobAllocs() error {
 		return fmt.Errorf("failed to create iter stack: %v", err)
 	}
 
-	// Attempt to place all the allocations
-	if err := s.planAllocations(stack, place, groups); err != nil {
-		return fmt.Errorf("failed to plan allocations: %v", err)
+	for _, missing := range place {
+		stack.SetTaskGroup(groups[missing.Name])
+		option := stack.Select()
+		if option == nil {
+			s.logger.Printf("[DEBUG] sched: %#v: failed to place alloc %s",
+				s.eval, missing)
+			continue
+		}
+
+		// Create an allocation for this
+		alloc := &structs.Allocation{
+			ID:        mock.GenerateUUID(),
+			Name:      missing.Name,
+			NodeID:    option.Node.ID,
+			JobID:     s.job.ID,
+			Job:       s.job,
+			Resources: nil, // TODO: size
+			Metrics:   nil,
+			Status:    structs.AllocStatusPending,
+		}
+		s.plan.AppendAlloc(alloc)
 	}
 	return nil
 }
@@ -196,28 +207,10 @@ func (s *ServiceScheduler) taintedNodes(allocs []*structs.Allocation) (map[strin
 	return out, nil
 }
 
-// IteratorStack is used to hold pointers to each of the
-// iterators which are chained together to do selection.
-// Half of the stack is used for feasibility checking, while
-// the second half of the stack is used for ranking and selection.
-type IteratorStack struct {
-	Context             *EvalContext
-	BaseNodes           []*structs.Node
-	Source              *StaticIterator
-	JobConstraint       *ConstraintIterator
-	TaskGroupDrivers    *DriverIterator
-	TaskGroupConstraint *ConstraintIterator
-	RankSource          *FeasibleRankIterator
-	BinPack             *BinPackIterator
-	Limit               *LimitIterator
-	MaxScore            *MaxScoreIterator
-}
-
-// iterStack is used to get a set of base nodes and to
-// initialize the entire stack of iterators.
-func (s *ServiceScheduler) iterStack() (*IteratorStack, error) {
+// iterStack is used to set up the ServiceStack used for node placement
+func (s *ServiceScheduler) iterStack() (*ServiceStack, error) {
 	// Create a new stack
-	stack := new(IteratorStack)
+	stack := new(ServiceStack)
 
 	// Create an evaluation context
 	stack.Context = NewEvalContext(s.state, s.plan, s.logger)
@@ -266,55 +259,6 @@ func (s *ServiceScheduler) iterStack() (*IteratorStack, error) {
 	return stack, nil
 }
 
-func (s *ServiceScheduler) planAllocations(stack *IteratorStack,
-	place []allocNameID, groups map[string]*structs.TaskGroup) error {
-
-	// Attempt to place each missing allocation
-	for _, missing := range place {
-		taskGroup := groups[missing.Name]
-
-		// Collect the constraints, drivers and resources required by each
-		// sub-task to aggregate the TaskGroup totals
-		constr := make([]*structs.Constraint, 0, len(taskGroup.Constraints))
-		drivers := make(map[string]struct{})
-		size := new(structs.Resources)
-		constr = append(constr, taskGroup.Constraints...)
-		for _, task := range taskGroup.Tasks {
-			drivers[task.Driver] = struct{}{}
-			constr = append(constr, task.Constraints...)
-			size.Add(task.Resources)
-		}
-
-		// Update the parameters of iterators
-		stack.MaxScore.Reset()
-		stack.TaskGroupDrivers.SetDrivers(drivers)
-		stack.TaskGroupConstraint.SetConstraints(constr)
-		stack.BinPack.SetResources(size)
-
-		// Select the best fit
-		option := stack.MaxScore.Next()
-		if option == nil {
-			s.logger.Printf("[DEBUG] sched: %#v: failed to place alloc %s",
-				s.eval, missing)
-			continue
-		}
-
-		// Create an allocation for this
-		alloc := &structs.Allocation{
-			ID:        mock.GenerateUUID(),
-			Name:      missing.Name,
-			NodeID:    option.Node.ID,
-			JobID:     s.job.ID,
-			Job:       s.job,
-			Resources: size,
-			Metrics:   nil,
-			Status:    structs.AllocStatusPending,
-		}
-		s.plan.AppendAlloc(alloc)
-	}
-	return nil
-}
-
 // evictJobAllocs is used to evict all job allocations
 func (s *ServiceScheduler) evictJobAllocs() error {
 	// Lookup the allocations by JobID
diff --git a/scheduler/service_sched_test.go b/scheduler/service_sched_test.go
index 5480f6a9d..c3cb29595 100644
--- a/scheduler/service_sched_test.go
+++ b/scheduler/service_sched_test.go
@@ -12,7 +12,6 @@ func TestServiceSched_JobDeregister(t *testing.T) {
 
 	// Generate a fake job with allocations
 	job := mock.Job()
-	noErr(t, h.State.RegisterJob(h.NextIndex(), job))
 
 	var allocs []*structs.Allocation
 	for i := 0; i < 10; i++ {
diff --git a/scheduler/util.go b/scheduler/util.go
index 6d4fc55e7..aa9b333cf 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -123,3 +123,20 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, error) {
 	}
 	return out, nil
 }
+
+// retryMax is used to retry a callback until it returns success or
+// a maximum number of attempts is reached
+func retryMax(max int, cb func() (bool, error)) error {
+	attempts := 0
+	for attempts < max {
+		done, err := cb()
+		if err != nil {
+			return err
+		}
+		if done {
+			return nil
+		}
+		attempts += 1
+	}
+	return fmt.Errorf("maximum attempts reached (%d)", max)
+}
diff --git a/scheduler/util_test.go b/scheduler/util_test.go
index 5b86ec65d..6b9e135fc 100644
--- a/scheduler/util_test.go
+++ b/scheduler/util_test.go
@@ -189,3 +189,31 @@ func TestReadyNodesInDCs(t *testing.T) {
 		t.Fatalf("Bad: %#v", nodes)
 	}
 }
+
+func TestRetryMax(t *testing.T) {
+	calls := 0
+	bad := func() (bool, error) {
+		calls += 1
+		return false, nil
+	}
+	err := retryMax(3, bad)
+	if err == nil {
+		t.Fatalf("should fail")
+	}
+	if calls != 3 {
+		t.Fatalf("mismatch")
+	}
+
+	calls = 0
+	good := func() (bool, error) {
+		calls += 1
+		return true, nil
+	}
+	err = retryMax(3, good)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+	if calls != 1 {
+		t.Fatalf("mismatch")
+	}
+}
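
Note: the sketch below is a minimal, self-contained illustration of the retryMax contract added in scheduler/util.go above, not part of the patch. The callback returns (done, error): any error aborts immediately, done == true stops with success, and done == false retries up to the cap; the "flaky" callback and main wrapper are hypothetical stand-ins for ServiceScheduler.process.

package main

import "fmt"

// retryMax mirrors the helper added in scheduler/util.go.
func retryMax(max int, cb func() (bool, error)) error {
	attempts := 0
	for attempts < max {
		done, err := cb()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
		attempts += 1
	}
	return fmt.Errorf("maximum attempts reached (%d)", max)
}

func main() {
	// Hypothetical callback standing in for process: the plan fails to
	// fully commit twice, then succeeds on the third try.
	tries := 0
	flaky := func() (bool, error) {
		tries++
		return tries >= 3, nil
	}
	fmt.Println(retryMax(5, flaky)) // <nil>; ran 3 times

	// A callback that never finishes exhausts the attempt budget.
	fmt.Println(retryMax(2, func() (bool, error) { return false, nil }))
	// maximum attempts reached (2)
}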