scheduler: trying to simplify further

Armon Dadgar
2015-08-13 17:40:23 -07:00
parent 0155fd0e28
commit 5df7d904df
4 changed files with 91 additions and 103 deletions


@@ -45,32 +45,26 @@ func (s *ServiceScheduler) Process(eval *structs.Evaluation) error {
// Store the evaluation
s.eval = eval
// Use the evaluation trigger reason to determine what we need to do
// Verify the evaluation trigger reason is understood
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate:
return s.process(s.computeJobAllocs)
case structs.EvalTriggerJobDeregister:
return s.process(s.evictJobAllocs)
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
structs.EvalTriggerJobDeregister:
default:
return fmt.Errorf("service scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
}
// Retry up to the maxScheduleAttempts
return retryMax(maxScheduleAttempts, s.process)
}
// process is used to iteratively run the handler until we have no
// process is wrapped in retryMax to iteratively run the handler until we have no
// further work or we've made the maximum number of attempts.
func (s *ServiceScheduler) process(handler func() error) error {
START:
// Check the attempt count
if s.attempts == maxScheduleAttempts {
return fmt.Errorf("maximum schedule attempts reached (%d)", s.attempts)
}
s.attempts += 1
func (s *ServiceScheduler) process() (bool, error) {
// Lookup the Job by ID
job, err := s.state.GetJobByID(s.eval.JobID)
if err != nil {
return fmt.Errorf("failed to get job '%s': %v",
return false, fmt.Errorf("failed to get job '%s': %v",
s.eval.JobID, err)
}
s.job = job
@@ -78,23 +72,23 @@ START:
// Create a plan
s.plan = s.eval.MakePlan(job)
// Invoke the handler to setup the plan
if err := handler(); err != nil {
// Compute the target job allocations
if err := s.computeJobAllocs(); err != nil {
s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err)
return err
return false, err
}
// Submit the plan
result, newState, err := s.planner.SubmitPlan(s.plan)
if err != nil {
return err
return false, err
}
// If we got a state refresh, try again since we have stale data
if newState != nil {
s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval)
s.state = newState
goto START
return false, nil
}
// Try again if the plan was not fully committed, potential conflict
@@ -102,23 +96,22 @@ START:
if !fullCommit {
s.logger.Printf("[DEBUG] sched: %#v: attempted %d placements, %d placed",
s.eval, expected, actual)
goto START
return false, nil
}
return nil
// Success!
return true, nil
}
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *ServiceScheduler) computeJobAllocs() error {
// If the job is missing, maybe a concurrent deregister
if s.job == nil {
s.logger.Printf("[DEBUG] sched: %#v: job not found, skipping", s.eval)
return nil
// Materialize all the task groups; the job could be missing if it was deregistered
var groups map[string]*structs.TaskGroup
if s.job != nil {
groups = materializeTaskGroups(s.job)
}
// Materialize all the task groups
groups := materializeTaskGroups(s.job)
// If there is nothing required for this job, treat like a deregister
if len(groups) == 0 {
return s.evictJobAllocs()
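For reference, materializeTaskGroups is not part of this diff. A minimal sketch of what it presumably does, assuming allocation names follow a job.group[index] convention (the helper body and naming scheme are assumptions, not shown in this commit):

func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
	out := make(map[string]*structs.TaskGroup)
	if job == nil {
		// The caller above already guards against a nil job; this is defensive.
		return out
	}
	for _, tg := range job.TaskGroups {
		// One entry per desired instance, keyed by allocation name,
		// e.g. "example.cache[0]", "example.cache[1]", ...
		for i := 0; i < tg.Count; i++ {
			name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
			out[name] = tg
		}
	}
	return out
}

Under that assumption, a deregistered (nil) job yields an empty map, which is why the len(groups) == 0 case above falls through to evictJobAllocs.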
@@ -170,9 +163,27 @@ func (s *ServiceScheduler) computeJobAllocs() error {
return fmt.Errorf("failed to create iter stack: %v", err)
}
// Attempt to place all the allocations
if err := s.planAllocations(stack, place, groups); err != nil {
return fmt.Errorf("failed to plan allocations: %v", err)
for _, missing := range place {
stack.SetTaskGroup(groups[missing.Name])
option := stack.Select()
if option == nil {
s.logger.Printf("[DEBUG] sched: %#v: failed to place alloc %s",
s.eval, missing)
continue
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: mock.GenerateUUID(),
Name: missing.Name,
NodeID: option.Node.ID,
JobID: s.job.ID,
Job: s.job,
Resources: nil, // TODO: size
Metrics: nil,
Status: structs.AllocStatusPending,
}
s.plan.AppendAlloc(alloc)
}
return nil
}
@@ -196,28 +207,10 @@ func (s *ServiceScheduler) taintedNodes(allocs []*structs.Allocation) (map[strin
return out, nil
}
// IteratorStack is used to hold pointers to each of the
// iterators which are chained together to do selection.
// Half of the stack is used for feasibility checking, while
// the second half of the stack is used for ranking and selection.
type IteratorStack struct {
Context *EvalContext
BaseNodes []*structs.Node
Source *StaticIterator
JobConstraint *ConstraintIterator
TaskGroupDrivers *DriverIterator
TaskGroupConstraint *ConstraintIterator
RankSource *FeasibleRankIterator
BinPack *BinPackIterator
Limit *LimitIterator
MaxScore *MaxScoreIterator
}
// iterStack is used to get a set of base nodes and to
// initialize the entire stack of iterators.
func (s *ServiceScheduler) iterStack() (*IteratorStack, error) {
// iterStack is used to set up the ServiceStack used for node placement
func (s *ServiceScheduler) iterStack() (*ServiceStack, error) {
// Create a new stack
stack := new(IteratorStack)
stack := new(ServiceStack)
// Create an evaluation context
stack.Context = NewEvalContext(s.state, s.plan, s.logger)
@@ -266,55 +259,6 @@ func (s *ServiceScheduler) iterStack() (*IteratorStack, error) {
return stack, nil
}
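The ServiceStack type referenced here is not part of this diff. Judging by the removed IteratorStack fields above and the planAllocations body removed below, a plausible sketch (every name and signature here is inferred, not confirmed by this commit) is that the stack keeps the same iterators and absorbs the per-task-group parameter updates into SetTaskGroup, with Select draining the ranking side:

type ServiceStack struct {
	Context             *EvalContext
	BaseNodes           []*structs.Node
	Source              *StaticIterator
	JobConstraint       *ConstraintIterator
	TaskGroupDrivers    *DriverIterator
	TaskGroupConstraint *ConstraintIterator
	RankSource          *FeasibleRankIterator
	BinPack             *BinPackIterator
	Limit               *LimitIterator
	MaxScore            *MaxScoreIterator
}

// SetTaskGroup is assumed to fold in the aggregation that planAllocations
// used to do inline: collect the drivers, constraints and resource totals
// across the group's tasks, then reconfigure the iterators.
func (s *ServiceStack) SetTaskGroup(tg *structs.TaskGroup) {
	constr := make([]*structs.Constraint, 0, len(tg.Constraints))
	constr = append(constr, tg.Constraints...)
	drivers := make(map[string]struct{})
	size := new(structs.Resources)
	for _, task := range tg.Tasks {
		drivers[task.Driver] = struct{}{}
		constr = append(constr, task.Constraints...)
		size.Add(task.Resources)
	}
	s.MaxScore.Reset()
	s.TaskGroupDrivers.SetDrivers(drivers)
	s.TaskGroupConstraint.SetConstraints(constr)
	s.BinPack.SetResources(size)
}

// Select is assumed to return whatever MaxScore yields; the placement loop
// above only relies on it exposing the chosen Node.
func (s *ServiceStack) Select() *RankedNode {
	return s.MaxScore.Next()
}

If the aggregated size lives inside the stack like this, it would also explain the Resources: nil // TODO: size placeholder in the new placement loop.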
func (s *ServiceScheduler) planAllocations(stack *IteratorStack,
place []allocNameID, groups map[string]*structs.TaskGroup) error {
// Attempt to place each missing allocation
for _, missing := range place {
taskGroup := groups[missing.Name]
// Collect the constraints, drivers and resources required by each
// sub-task to aggregate the TaskGroup totals
constr := make([]*structs.Constraint, 0, len(taskGroup.Constraints))
drivers := make(map[string]struct{})
size := new(structs.Resources)
constr = append(constr, taskGroup.Constraints...)
for _, task := range taskGroup.Tasks {
drivers[task.Driver] = struct{}{}
constr = append(constr, task.Constraints...)
size.Add(task.Resources)
}
// Update the parameters of iterators
stack.MaxScore.Reset()
stack.TaskGroupDrivers.SetDrivers(drivers)
stack.TaskGroupConstraint.SetConstraints(constr)
stack.BinPack.SetResources(size)
// Select the best fit
option := stack.MaxScore.Next()
if option == nil {
s.logger.Printf("[DEBUG] sched: %#v: failed to place alloc %s",
s.eval, missing)
continue
}
// Create an allocation for this
alloc := &structs.Allocation{
ID: mock.GenerateUUID(),
Name: missing.Name,
NodeID: option.Node.ID,
JobID: s.job.ID,
Job: s.job,
Resources: size,
Metrics: nil,
Status: structs.AllocStatusPending,
}
s.plan.AppendAlloc(alloc)
}
return nil
}
// evictJobAllocs is used to evict all job allocations
func (s *ServiceScheduler) evictJobAllocs() error {
// Lookup the allocations by JobID


@@ -12,7 +12,6 @@ func TestServiceSched_JobDeregister(t *testing.T) {
// Generate a fake job with allocations
job := mock.Job()
noErr(t, h.State.RegisterJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 10; i++ {


@@ -123,3 +123,20 @@ func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, error) {
}
return out, nil
}
// retryMax is used to retry a callback until it returns success or
// a maximum number of attempts is reached
func retryMax(max int, cb func() (bool, error)) error {
attempts := 0
for attempts < max {
done, err := cb()
if err != nil {
return err
}
if done {
return nil
}
attempts += 1
}
return fmt.Errorf("maximum attempts reached (%d)", max)
}
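To make the callback contract concrete, here is a small hypothetical illustration (flaky and its counter exist only for this example): returning an error aborts retryMax immediately, (false, nil) consumes one attempt and retries, and (true, nil) ends the loop successfully.

attempts := 0
flaky := func() (bool, error) {
	attempts++
	// Pretend the first two passes hit stale state or a partial commit,
	// the same (false, nil) cases process returns above.
	return attempts >= 3, nil
}
if err := retryMax(5, flaky); err != nil {
	// Not reached: flaky reports done on its third attempt, inside the
	// budget of five.
	panic(err)
}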


@@ -189,3 +189,31 @@ func TestReadyNodesInDCs(t *testing.T) {
t.Fatalf("Bad: %#v", nodes)
}
}
func TestRetryMax(t *testing.T) {
calls := 0
bad := func() (bool, error) {
calls += 1
return false, nil
}
err := retryMax(3, bad)
if err == nil {
t.Fatalf("should fail")
}
if calls != 3 {
t.Fatalf("mis match")
}
calls = 0
good := func() (bool, error) {
calls += 1
return true, nil
}
err = retryMax(3, good)
if err != nil {
t.Fatalf("err: %v")
}
if calls != 1 {
t.Fatalf("mis match")
}
}