diff --git a/scheduler/service_sched.go b/scheduler/service_sched.go
index b50fc206e..3ee1c1671 100644
--- a/scheduler/service_sched.go
+++ b/scheduler/service_sched.go
@@ -39,20 +39,19 @@ func NewServiceScheduler(logger *log.Logger, state State, planner Planner) Sched
 func (s *ServiceScheduler) Process(eval *structs.Evaluation) error {
 	// Use the evaluation trigger reason to determine what we need to do
 	switch eval.TriggeredBy {
-	case structs.EvalTriggerJobRegister:
-		return s.handleJobRegister(eval)
+	case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate:
+		return s.computeJobAllocs(eval)
 	case structs.EvalTriggerJobDeregister:
 		return s.evictJobAllocs(eval)
-	case structs.EvalTriggerNodeUpdate:
-		return s.handleNodeUpdate(eval)
 	default:
 		return fmt.Errorf("service scheduler cannot handle '%s' evaluation reason",
 			eval.TriggeredBy)
 	}
 }
 
-// handleJobRegister is used to handle a job being registered or updated
-func (s *ServiceScheduler) handleJobRegister(eval *structs.Evaluation) error {
+// computeJobAllocs is used to reconcile differences between the job,
+// existing allocations and node status to update the allocations.
+func (s *ServiceScheduler) computeJobAllocs(eval *structs.Evaluation) error {
 	attempts := 0
 START:
 	// Check the attempt count
@@ -90,18 +89,23 @@ START:
 			eval.JobID, err)
 	}
 
-	// TODO: Lookup the nodes the Allocs are on, potentially evict
+	// Determine the tainted nodes containing job allocs
+	tainted, err := s.taintedNodes(allocs)
+	if err != nil {
+		return fmt.Errorf("failed to get tainted nodes for job '%s': %v",
+			eval.JobID, err)
+	}
 
 	// Index the existing allocations
 	indexed := indexAllocs(allocs)
 
 	// Diff the required and existing allocations
-	place, update, evict, ignore := diffAllocs(job, groups, indexed)
-	s.logger.Printf("[DEBUG] sched: eval %s job %s needs %d placements, %d updates, %d evictions, %d ignored allocs",
-		eval.ID, eval.JobID, len(place), len(update), len(evict), len(ignore))
+	place, update, migrate, evict, ignore := diffAllocs(job, tainted, groups, indexed)
+	s.logger.Printf("[DEBUG] sched: eval %s job %s needs %d placements, %d updates, %d migrations, %d evictions, %d ignored allocs",
+		eval.ID, eval.JobID, len(place), len(update), len(migrate), len(evict), len(ignore))
 
 	// Fast-pass if nothing to do
-	if len(place) == 0 && len(update) == 0 && len(evict) == 0 {
+	if len(place) == 0 && len(update) == 0 && len(evict) == 0 && len(migrate) == 0 {
 		return nil
 	}
 
@@ -151,6 +155,37 @@ START:
 	return nil
 }
 
+// taintedNodes is used to scan the allocations and then check if the
+// underlying nodes are tainted, and should force a migration of the allocation.
+// The result maps each distinct alloc.NodeID to whether that node is tainted;
+// a state lookup error aborts the scan and is returned as-is.
+func (s *ServiceScheduler) taintedNodes(allocs []*structs.Allocation) (map[string]bool, error) {
+	out := make(map[string]bool)
+	for _, alloc := range allocs {
+		// Each node only needs to be looked up once
+		if _, ok := out[alloc.NodeID]; ok {
+			continue
+		}
+
+		node, err := s.state.GetNodeByID(alloc.NodeID)
+		if err != nil {
+			return nil, err
+		}
+
+		// A missing node cannot keep running the allocation, so treat it
+		// as tainted instead of dereferencing a nil node below.
+		// NOTE(review): assumes GetNodeByID yields (nil, nil) for an
+		// unknown node ID; confirm against the State implementation.
+		if node == nil {
+			out[alloc.NodeID] = true
+			continue
+		}
+
+		out[alloc.NodeID] = structs.ShouldDrainNode(node.Status)
+	}
+	return out, nil
+}
+
 // IteratorStack is used to hold pointers to each of the
 // iterators which are chained together to do selection.
 // Half of the stack is used for feasibility checking, while
@@ -202,8 +237,7 @@ func (s *ServiceScheduler) iterStack(job *structs.Job,
 
 	// Apply the bin packing, this depends on the resources needed by
 	// a particular task group.
-	// TODO: Support eviction in the future
-	stack.BinPack = NewBinPackIterator(stack.Context, stack.RankSource, nil, false, job.Priority)
+	stack.BinPack = NewBinPackIterator(stack.Context, stack.RankSource, nil, true, job.Priority)
 
 	// Apply a limit function. This is to avoid scanning *every* possible node.
 	// Instead we need to visit "enough". Using a log of the total number of
@@ -292,13 +326,6 @@ func (s *ServiceScheduler) planAllocations(stack *IteratorStack, job *structs.Jo
 	return nil
 }
 
-// handleNodeUpdate is used to handle an update to a node status where
-// there is an existing allocation for this job
-func (s *ServiceScheduler) handleNodeUpdate(eval *structs.Evaluation) error {
-	// TODO
-	return nil
-}
-
 // evictJobAllocs is used to evict all job allocations
 func (s *ServiceScheduler) evictJobAllocs(eval *structs.Evaluation) error {
 START: