schedulder: handle tainted nodes

This commit is contained in:
Armon Dadgar
2015-08-13 16:48:34 -07:00
parent 8bb8a304df
commit 404ed068e7

View File

@@ -39,20 +39,19 @@ func NewServiceScheduler(logger *log.Logger, state State, planner Planner) Sched
func (s *ServiceScheduler) Process(eval *structs.Evaluation) error {
// Use the evaluation trigger reason to determine what we need to do
switch eval.TriggeredBy {
case structs.EvalTriggerJobRegister:
return s.handleJobRegister(eval)
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate:
return s.computeJobAllocs(eval)
case structs.EvalTriggerJobDeregister:
return s.evictJobAllocs(eval)
case structs.EvalTriggerNodeUpdate:
return s.handleNodeUpdate(eval)
default:
return fmt.Errorf("service scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
}
}
// handleJobRegister is used to handle a job being registered or updated
func (s *ServiceScheduler) handleJobRegister(eval *structs.Evaluation) error {
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *ServiceScheduler) computeJobAllocs(eval *structs.Evaluation) error {
attempts := 0
START:
// Check the attempt count
@@ -90,18 +89,23 @@ START:
eval.JobID, err)
}
// TODO: Lookup the nodes the Allocs are on, potentially evict
// Determine the tainted nodes containing job allocs
tainted, err := s.taintedNodes(allocs)
if err != nil {
return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
eval.JobID, err)
}
// Index the existing allocations
indexed := indexAllocs(allocs)
// Diff the required and existing allocations
place, update, evict, ignore := diffAllocs(job, groups, indexed)
s.logger.Printf("[DEBUG] sched: eval %s job %s needs %d placements, %d updates, %d evictions, %d ignored allocs",
eval.ID, eval.JobID, len(place), len(update), len(evict), len(ignore))
place, update, migrate, evict, ignore := diffAllocs(job, tainted, groups, indexed)
s.logger.Printf("[DEBUG] sched: eval %s job %s needs %d placements, %d updates, %d migrations, %d evictions, %d ignored allocs",
eval.ID, eval.JobID, len(place), len(update), len(migrate), len(evict), len(ignore))
// Fast-pass if nothing to do
if len(place) == 0 && len(update) == 0 && len(evict) == 0 {
if len(place) == 0 && len(update) == 0 && len(evict) == 0 && len(migrate) == 0 {
return nil
}
@@ -151,6 +155,25 @@ START:
return nil
}
// taintedNodes is used to scan the allocations and then check if the
// underlying nodes are tainted, and should force a migration of the allocation.
func (s *ServiceScheduler) taintedNodes(allocs []*structs.Allocation) (map[string]bool, error) {
out := make(map[string]bool)
for _, alloc := range allocs {
if _, ok := out[alloc.NodeID]; ok {
continue
}
node, err := s.state.GetNodeByID(alloc.NodeID)
if err != nil {
return nil, err
}
out[alloc.NodeID] = structs.ShouldDrainNode(node.Status)
}
return out, nil
}
// IteratorStack is used to hold pointers to each of the
// iterators which are chained together to do selection.
// Half of the stack is used for feasibility checking, while
@@ -202,8 +225,7 @@ func (s *ServiceScheduler) iterStack(job *structs.Job,
// Apply the bin packing, this depends on the resources needed by
// a particular task group.
// TODO: Support eviction in the future
stack.BinPack = NewBinPackIterator(stack.Context, stack.RankSource, nil, false, job.Priority)
stack.BinPack = NewBinPackIterator(stack.Context, stack.RankSource, nil, true, job.Priority)
// Apply a limit function. This is to avoid scanning *every* possible node.
// Instead we need to visit "enough". Using a log of the total number of
@@ -292,13 +314,6 @@ func (s *ServiceScheduler) planAllocations(stack *IteratorStack, job *structs.Jo
return nil
}
// handleNodeUpdate is used to handle an update to a node status where
// there is an existing allocation for this job
func (s *ServiceScheduler) handleNodeUpdate(eval *structs.Evaluation) error {
// TODO
return nil
}
// evictJobAllocs is used to evict all job allocations
func (s *ServiceScheduler) evictJobAllocs(eval *structs.Evaluation) error {
START: