From 666d66be23d7b609560ee196bf337169042c48aa Mon Sep 17 00:00:00 2001
From: Armon Dadgar
Date: Mon, 7 Sep 2015 12:27:12 -0700
Subject: [PATCH] scheduler: support in-place allocation updates

---
 scheduler/generic_sched.go      | 122 ++++++++++++++++++++++++++------
 scheduler/generic_sched_test.go |  89 +++++++++++++++++++++++
 2 files changed, 189 insertions(+), 22 deletions(-)

diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index be094767d..36fb821c6 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -25,6 +25,9 @@ const (
 
 	// allocUpdating is the status used when a job requires an update
 	allocUpdating = "alloc is being updated due to job update"
+
+	// allocInPlace is the status used when speculating on an in-place update
+	allocInPlace = "alloc updating in-place"
 )
 
 // SetStatusError is used to set the status of the evaluation to the given error
@@ -49,6 +52,7 @@ type GenericScheduler struct {
 	batch   bool
 
 	eval  *structs.Evaluation
+	job   *structs.Job
 	plan  *structs.Plan
 	ctx   *EvalContext
 	stack *GenericStack
@@ -78,7 +82,7 @@ func NewBatchScheduler(logger *log.Logger, state State, planner Planner) Schedul
 
 // setStatus is used to update the status of the evaluation
 func (s *GenericScheduler) setStatus(status, desc string) error {
-	s.logger.Printf("[DEBUG] sched: %#v: setting status to %s (%s)", s.eval, status, desc)
+	s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status)
 	newEval := s.eval.Copy()
 	newEval.Status = status
 	newEval.StatusDescription = desc
@@ -120,23 +124,27 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 // further work or we've made the maximum number of attempts.
 func (s *GenericScheduler) process() (bool, error) {
 	// Lookup the Job by ID
-	job, err := s.state.JobByID(s.eval.JobID)
+	var err error
+	s.job, err = s.state.JobByID(s.eval.JobID)
 	if err != nil {
 		return false, fmt.Errorf("failed to get job '%s': %v", s.eval.JobID, err)
 	}
 
 	// Create a plan
-	s.plan = s.eval.MakePlan(job)
+	s.plan = s.eval.MakePlan(s.job)
 
 	// Create an evaluation context
 	s.ctx = NewEvalContext(s.state, s.plan, s.logger)
 
 	// Construct the placement stack
 	s.stack = NewGenericStack(s.batch, s.ctx, nil)
+	if s.job != nil {
+		s.stack.SetJob(s.job)
+	}
 
 	// Compute the target job allocations
-	if err := s.computeJobAllocs(job); err != nil {
+	if err := s.computeJobAllocs(); err != nil {
 		s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err)
 		return false, err
 	}
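The SetJob call above is what lets a single placement stack be re-targeted as the job changes; its definition is not part of this diff. A minimal sketch of what such a setter plausibly does, assuming the stack keeps its job-derived layers in fields named jobConstraint and binPack (field names assumed here, not taken from this patch):

    // SetJob re-targets the job-specific layers of the stack so the
    // same stack can evaluate updated versions of the job.
    func (s *GenericStack) SetJob(job *structs.Job) {
        s.jobConstraint.SetConstraints(job.Constraints)
        s.binPack.SetPriority(job.Priority)
    }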
@@ -173,11 +181,11 @@ func (s *GenericScheduler) process() (bool, error) {
 
 // computeJobAllocs is used to reconcile differences between the job,
 // existing allocations and node status to update the allocations.
-func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
+func (s *GenericScheduler) computeJobAllocs() error {
 	// Materialize all the task groups, job could be missing if deregistered
 	var groups map[string]*structs.TaskGroup
-	if job != nil {
-		groups = materializeTaskGroups(job)
+	if s.job != nil {
+		groups = materializeTaskGroups(s.job)
 	}
 
 	// Lookup the allocations by JobID
@@ -195,7 +203,7 @@
 	}
 
 	// Diff the required and existing allocations
-	diff := diffAllocs(job, tainted, groups, allocs)
+	diff := diffAllocs(s.job, tainted, groups, allocs)
 	s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
 
 	// Add all the allocs to stop
@@ -203,6 +211,16 @@
 		s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
 	}
 
+	// Attempt to do the upgrades in place
+	diff.update = s.inplaceUpdate(diff.update)
+
+	// Any updates that cannot be done in-place are treated as an evict + place.
+	// XXX: This should be done with rolling in-place updates instead.
+	for _, e := range diff.update {
+		s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocUpdating)
+	}
+	diff.place = append(diff.place, diff.update...)
+
 	// For simplicity, we treat all migrates as an evict + place.
 	// XXX: This could probably be done more intelligently?
 	for _, e := range diff.migrate {
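computeJobAllocs consumes the buckets produced by diffAllocs, which is defined outside this patch. For orientation, a sketch of the shapes implied by their uses above; the struct names and exact field sets are inferred from this diff, not quoted from util.go:

    // allocTuple pairs a desired task-group instance with any existing
    // allocation serving it (fields inferred from the update.Alloc,
    // update.TaskGroup and missing.Name accesses in this patch).
    type allocTuple struct {
        Name      string
        TaskGroup *structs.TaskGroup
        Alloc     *structs.Allocation
    }

    // diffResult buckets the reconciliation by required action, matching
    // the diff.* accesses in computeJobAllocs.
    type diffResult struct {
        place   []allocTuple // no allocation exists yet; create one
        update  []allocTuple // job changed; try in-place, else evict + place
        migrate []allocTuple // node is tainted; evict and place elsewhere
        stop    []allocTuple // allocation is no longer needed; evict
    }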
@@ -210,28 +228,88 @@ func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
 	}
 	diff.place = append(diff.place, diff.migrate...)
 
-	// For simplicity, we treat all updates as an evict + place.
-	// XXX: This should be done with rolling in-place updates instead.
-	for _, e := range diff.update {
-		s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocUpdating)
-	}
-	diff.place = append(diff.place, diff.update...)
-
 	// Nothing remaining to do if placement is not required
 	if len(diff.place) == 0 {
 		return nil
 	}
 
 	// Compute the placements
-	return s.computePlacements(job, diff.place)
+	return s.computePlacements(diff.place)
 }
 
-func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTuple) error {
-	// Create an evaluation context
-	ctx := NewEvalContext(s.state, s.plan, s.logger)
+// inplaceUpdate attempts to update allocations in-place where possible.
+func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple {
+	n := len(updates)
+	inplace := 0
+	for i := 0; i < n; i++ {
+		// Get the update
+		update := updates[i]
+
+		// Check if the task drivers or config have changed; if so, this
+		// requires a rolling upgrade since it cannot be done in-place.
+		existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name)
+		if tasksUpdated(update.TaskGroup, existing) {
+			continue
+		}
+
+		// Get the existing node
+		node, err := s.state.NodeByID(update.Alloc.NodeID)
+		if err != nil {
+			s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v",
+				s.eval, update.Alloc.NodeID, err)
+			continue
+		}
+		if node == nil {
+			continue
+		}
+
+		// Set the existing node as the base set
+		s.stack.SetNodes([]*structs.Node{node})
+
+		// Stage an eviction of the current allocation
+		s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop,
+			allocInPlace)
+
+		// Attempt to match the task group
+		option, size := s.stack.Select(update.TaskGroup)
+
+		// Pop the staged eviction
+		s.plan.PopUpdate(update.Alloc)
+
+		// Skip if we could not do an in-place update
+		if option == nil {
+			continue
+		}
+
+		// Create a shallow copy
+		newAlloc := new(structs.Allocation)
+		*newAlloc = *update.Alloc
+
+		// Update the allocation
+		newAlloc.EvalID = s.eval.ID
+		newAlloc.Job = s.job
+		newAlloc.Resources = size
+		newAlloc.Metrics = s.ctx.Metrics()
+		newAlloc.DesiredStatus = structs.AllocDesiredStatusRun
+		newAlloc.ClientStatus = structs.AllocClientStatusPending
+		s.plan.AppendAlloc(newAlloc)
+
+		// Remove this allocation from the slice
+		updates[i] = updates[n-1]
+		i--
+		n--
+		inplace++
+	}
+	if len(updates) > 0 {
+		s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates))
+	}
+	return updates[:n]
+}
+
+// computePlacements computes placements for the given allocation tuples
+func (s *GenericScheduler) computePlacements(place []allocTuple) error {
 	// Get the base nodes
-	nodes, err := readyNodesInDCs(s.state, job.Datacenters)
+	nodes, err := readyNodesInDCs(s.state, s.job.Datacenters)
 	if err != nil {
 		return err
 	}
@@ -271,8 +349,8 @@ func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTupl
 			EvalID:    s.eval.ID,
 			Name:      missing.Name,
 			NodeID:    nodeID,
-			JobID:     job.ID,
-			Job:       job,
+			JobID:     s.job.ID,
+			Job:       s.job,
 			TaskGroup: missing.TaskGroup.Name,
 			Resources: size,
 			Metrics:   s.ctx.Metrics(),
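The removal idiom inside inplaceUpdate, swapping the handled element with the last live one, shrinking the live window, and re-examining the same index, filters a slice in place with no extra allocation, at the cost of element order. The same pattern in isolation, as a self-contained sketch:

    package main

    import "fmt"

    // dropInPlace removes the elements matching drop without allocating,
    // mirroring the swap-with-last loop in inplaceUpdate; element order
    // is not preserved.
    func dropInPlace(xs []int, drop func(int) bool) []int {
        n := len(xs)
        for i := 0; i < n; i++ {
            if drop(xs[i]) {
                xs[i] = xs[n-1]
                i--
                n--
            }
        }
        return xs[:n]
    }

    func main() {
        even := func(x int) bool { return x%2 == 0 }
        fmt.Println(dropInPlace([]int{1, 2, 3, 4, 5, 6}, even)) // [1 5 3]
    }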
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index cebe1f280..7089b9b0b 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -133,6 +133,7 @@ func TestServiceSched_JobModify(t *testing.T) {
 		alloc.Job = job
 		alloc.JobID = job.ID
 		alloc.NodeID = nodes[i].ID
+		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
 		allocs = append(allocs, alloc)
 	}
 	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -140,6 +141,9 @@
 	// Update the job
 	job2 := mock.Job()
 	job2.ID = job.ID
+
+	// Update the task such that the change cannot be done in-place
+	job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
 	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
 
 	// Create a mock evaluation to deal with drain
@@ -193,6 +197,91 @@
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
 }
 
+func TestServiceSched_JobModify_InPlace(t *testing.T) {
+	h := NewHarness(t)
+
+	// Create some nodes
+	var nodes []*structs.Node
+	for i := 0; i < 10; i++ {
+		node := mock.Node()
+		nodes = append(nodes, node)
+		noErr(t, h.State.UpsertNode(h.NextIndex(), node))
+	}
+
+	// Generate a fake job with allocations
+	job := mock.Job()
+	noErr(t, h.State.UpsertJob(h.NextIndex(), job))
+
+	var allocs []*structs.Allocation
+	for i := 0; i < 10; i++ {
+		alloc := mock.Alloc()
+		alloc.Job = job
+		alloc.JobID = job.ID
+		alloc.NodeID = nodes[i].ID
+		alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
+		allocs = append(allocs, alloc)
+	}
+	noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
+
+	// Update the job
+	job2 := mock.Job()
+	job2.ID = job.ID
+	noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
+
+	// Create a mock evaluation to deal with the job update
+	eval := &structs.Evaluation{
+		ID:          mock.GenerateUUID(),
+		Priority:    50,
+		TriggeredBy: structs.EvalTriggerJobRegister,
+		JobID:       job.ID,
+	}
+
+	// Process the evaluation
+	err := h.Process(NewServiceScheduler, eval)
+	if err != nil {
+		t.Fatalf("err: %v", err)
+	}
+
+	// Ensure a single plan
+	if len(h.Plans) != 1 {
+		t.Fatalf("bad: %#v", h.Plans)
+	}
+	plan := h.Plans[0]
+
+	// Ensure the plan did not evict any allocs
+	var update []*structs.Allocation
+	for _, updateList := range plan.NodeUpdate {
+		update = append(update, updateList...)
+	}
+	if len(update) != 0 {
+		t.Fatalf("bad: %#v", plan)
+	}
+
+	// Ensure the plan updated the existing allocs
+	var planned []*structs.Allocation
+	for _, allocList := range plan.NodeAllocation {
+		planned = append(planned, allocList...)
+	}
+	if len(planned) != 10 {
+		t.Fatalf("bad: %#v", plan)
+	}
+	for _, p := range planned {
+		if p.Job != job2 {
+			t.Fatalf("should update job")
+		}
+	}
+
+	// Lookup the allocations by JobID
+	out, err := h.State.AllocsByJob(job.ID)
+	noErr(t, err)
+
+	// Ensure all allocations placed
+	if len(out) != 10 {
+		t.Fatalf("bad: %#v", out)
+	}
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+}
+
 func TestServiceSched_JobDeregister(t *testing.T) {
 	h := NewHarness(t)
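Both tests pin alloc.Name to the "my-job.web[%d]" convention because diffAllocs matches existing allocations to their desired task-group instance by name; an allocation under a foreign name would be bucketed as unneeded and stopped rather than updated. A sketch of the naming scheme these tests assume (materializeTaskGroups itself is outside this diff, so treat this as illustrative):

    // materializeTaskGroups expands each task group into one entry per
    // desired instance, keyed by "<job>.<group>[<index>]".
    func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
        out := make(map[string]*structs.TaskGroup)
        for _, tg := range job.TaskGroups {
            for i := 0; i < tg.Count; i++ {
                name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
                out[name] = tg
            }
        }
        return out
    }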