scheduler: working on job updates

This commit is contained in:
Armon Dadgar
2015-08-11 16:41:48 -07:00
parent 513d56db1c
commit 85938507aa
2 changed files with 163 additions and 1 deletions

View File

@@ -60,6 +60,9 @@ type State interface {
// GetNodeByID is used to lookup a node by ID
GetNodeByID(nodeID string) (*structs.Node, error)
// GetJobByID is used to lookup a job by ID
GetJobByID(id string) (*structs.Job, error)
}
// Planner interface is used to submit a task allocation plan.

View File

@@ -45,10 +45,70 @@ func (s *ServiceScheduler) Process(eval *structs.Evaluation) error {
// handleJobRegister is used to handle a job being registered or updated
func (s *ServiceScheduler) handleJobRegister(eval *structs.Evaluation) error {
// Lookup the Job by ID
job, err := s.state.GetJobByID(eval.JobID)
if err != nil {
return fmt.Errorf("failed to get job '%s': %v",
eval.JobID, err)
}
// If the job is missing, maybe a concurrent deregister
if job == nil {
s.logger.Printf("[DEBUG] sched: skipping eval %s, job %s not found",
eval.ID, eval.JobID)
return nil
}
// Materialize all the task groups
groups := materializeTaskGroups(job)
// If there is nothing required for this job, treat like a deregister
if len(groups) == 0 {
return s.handleJobDeregister(eval)
}
// Lookup the allocations by JobID
allocs, err := s.state.AllocsByJob(eval.JobID)
if err != nil {
return fmt.Errorf("failed to get allocs for job '%s': %v",
eval.JobID, err)
}
// Index the existing allocations
indexed := indexAllocs(allocs)
// Diff the required and existing allocations
place, update, evict, ignore := diffAllocs(job, groups, indexed)
s.logger.Printf("[DEBUG] sched: eval %s job %s needs %d placements, %d updates, %d evictions, %d ignored allocs",
eval.ID, eval.JobID, len(place), len(update), len(evict), len(ignore))
// Fast-pass if nothing to do
if len(place) == 0 && len(update) == 0 && len(evict) == 0 {
return nil
}
// Start a plan for this evaluation
plan := eval.MakePlan(job)
// Add all the evicts
addEvictsToPlan(plan, evict, indexed)
// For simplicity, we treat all updates as an evict + place.
// XXX: This should be done with rolling in-place updates instead.
addEvictsToPlan(plan, update, indexed)
place = append(place, update...)
// Attempt to place all the allocations
planAllocations(job, plan, place, groups)
// TODO
return nil
}
func planAllocations(job *structs.Job, plan *structs.Plan,
place []allocNameID, groups map[string]*structs.TaskGroup) {
}
// handleJobDeregister is used to handle a job being deregistered
func (s *ServiceScheduler) handleJobDeregister(eval *structs.Evaluation) error {
START:
@@ -60,6 +120,8 @@ START:
}
// Nothing to do if there is no evictsion
s.logger.Printf("[DEBUG] sched: eval %s job %s needs %d evictions",
eval.ID, eval.JobID, len(allocs))
if len(allocs) == 0 {
return nil
}
@@ -93,9 +155,106 @@ START:
return nil
}
// handleNodeUPdate is used to handle an update to a node status where
// handleNodeUpdate is used to handle an update to a node status where
// there is an existing allocation for this job
func (s *ServiceScheduler) handleNodeUpdate(eval *structs.Evaluation) error {
// TODO
return nil
}
// materializeTaskGroups is used to materialize all the task groups
// a job requires. This is used to do the count expansion.
func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
out := make(map[string]*structs.TaskGroup)
for _, tg := range job.TaskGroups {
for i := 0; i < tg.Count; i++ {
name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
out[name] = tg
}
}
return out
}
// indexAllocs is used to index a list of allocations by name
func indexAllocs(allocs []*structs.Allocation) map[string][]*structs.Allocation {
out := make(map[string][]*structs.Allocation)
for _, alloc := range allocs {
name := alloc.Name
out[name] = append(out[name], alloc)
}
return out
}
// allocNameID is a tuple of the allocation name and ID
type allocNameID struct {
Name string
ID string
}
// diffAllocs is used to do a set difference between the target allocations
// and the existing allocations. This returns 4 sets of results, the list of
// named task groups that need to be placed (no existing allocation), the
// allocations that need to be updated (job definition is newer), the allocs
// that need to be evicted (no longer required), and those that should be
// ignored.
func diffAllocs(job *structs.Job,
required map[string]*structs.TaskGroup,
existing map[string][]*structs.Allocation) (place, update, evict, ignore []allocNameID) {
// Scan the existing updates
for name, existList := range existing {
for _, exist := range existList {
// Check for the definition in the required set
_, ok := required[name]
// If not required, we evict
if !ok {
evict = append(evict, allocNameID{name, exist.ID})
continue
}
// If the definition is updated we need to update
// XXX: This is an extremely conservative approach. We can check
// if the job definition has changed in a way that affects
// this allocation and potentially ignore it.
if job.ModifyIndex != exist.Job.ModifyIndex {
update = append(update, allocNameID{name, exist.ID})
continue
}
// Everything is up-to-date
ignore = append(ignore, allocNameID{name, exist.ID})
}
}
// Scan the required groups
for name := range required {
// Check for an existing allocation
_, ok := existing[name]
// Require a placement if no existing allocation. If there
// is an existing allocation, we would have checked for a potential
// update or ignore above.
if !ok {
place = append(place, allocNameID{name, ""})
}
}
return
}
// addEvictsToPlan is used to add all the evictions to the plan
func addEvictsToPlan(plan *structs.Plan,
evicts []allocNameID, indexed map[string][]*structs.Allocation) {
for _, evict := range evicts {
list := indexed[evict.Name]
for _, alloc := range list {
if alloc.ID != evict.ID {
continue
}
// Add this eviction to the per-node list
nodeEvict := plan.NodeEvict[alloc.NodeID]
nodeEvict = append(nodeEvict, evict.ID)
plan.NodeEvict[alloc.NodeID] = nodeEvict
}
}
}