From b24f48a4edc975073847fd86188ae90e2fc83399 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 14 Oct 2015 16:43:06 -0700 Subject: [PATCH] System scheduler and system stack --- nomad/mock/mock.go | 52 +++ nomad/structs/structs.go | 1 + scheduler/feasible.go | 15 +- scheduler/generic_sched.go | 2 +- scheduler/generic_sched_test.go | 6 +- scheduler/scheduler.go | 1 + scheduler/stack.go | 94 ++++- scheduler/stack_test.go | 202 ++++++++++ scheduler/system_sched.go | 392 +++++++++++++++++++ scheduler/system_sched_test.go | 651 ++++++++++++++++++++++++++++++++ scheduler/util.go | 32 ++ 11 files changed, 1442 insertions(+), 6 deletions(-) create mode 100644 scheduler/system_sched.go create mode 100644 scheduler/system_sched_test.go diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 351d8f7fd..5ed5af097 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -109,6 +109,58 @@ func Job() *structs.Job { return job } +func SystemJob() *structs.Job { + job := &structs.Job{ + Region: "global", + ID: structs.GenerateUUID(), + Name: "my-job", + Type: structs.JobTypeSystem, + Priority: 100, + AllAtOnce: false, + Datacenters: []string{"dc1"}, + Constraints: []*structs.Constraint{ + &structs.Constraint{ + Hard: true, + LTarget: "$attr.kernel.name", + RTarget: "linux", + Operand: "=", + }, + }, + TaskGroups: []*structs.TaskGroup{ + &structs.TaskGroup{ + Name: "web", + Tasks: []*structs.Task{ + &structs.Task{ + Name: "web", + Driver: "exec", + Config: map[string]string{ + "command": "/bin/date", + "args": "+%s", + }, + Resources: &structs.Resources{ + CPU: 500, + MemoryMB: 256, + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + MBits: 50, + DynamicPorts: []string{"http"}, + }, + }, + }, + }, + }, + }, + }, + Meta: map[string]string{ + "owner": "armon", + }, + Status: structs.JobStatusPending, + CreateIndex: 42, + ModifyIndex: 99, + } + return job +} + func Eval() *structs.Evaluation { eval := &structs.Evaluation{ ID: structs.GenerateUUID(), diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 09cef210a..e141bdeb3 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -699,6 +699,7 @@ const ( JobTypeCore = "_core" JobTypeService = "service" JobTypeBatch = "batch" + JobTypeSystem = "system" ) const ( diff --git a/scheduler/feasible.go b/scheduler/feasible.go index cbf811b8c..559252045 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -4,6 +4,7 @@ import ( "fmt" "reflect" "regexp" + "strconv" "strings" "github.com/hashicorp/go-version" @@ -129,10 +130,22 @@ func (iter *DriverIterator) Reset() { func (iter *DriverIterator) hasDrivers(option *structs.Node) bool { for driver := range iter.drivers { driverStr := fmt.Sprintf("driver.%s", driver) - _, ok := option.Attributes[driverStr] + value, ok := option.Attributes[driverStr] if !ok { return false } + + enabled, err := strconv.ParseBool(value) + if err != nil { + iter.ctx.Logger(). 
+ Printf("[WARN] scheduler.DriverIterator: node %v has invalid driver setting %v: %v", + option.ID, driverStr, value) + return false + } + + if !enabled { + return false + } } return true } diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index bd5bb81e4..d29736cb6 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -275,7 +275,7 @@ func (s *GenericScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { n := len(updates) inplace := 0 for i := 0; i < n; i++ { - // Get the udpate + // Get the update update := updates[i] // Check if the task drivers or config has changed, requires diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index b6cef7837..dfb35cb3c 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -22,7 +22,7 @@ func TestServiceSched_JobRegister(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, @@ -71,7 +71,7 @@ func TestServiceSched_JobRegister_AllocFail(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, @@ -550,7 +550,7 @@ func TestServiceSched_RetryLimit(t *testing.T) { job := mock.Job() noErr(t, h.State.UpsertJob(h.NextIndex(), job)) - // Create a mock evaluation to deregister the job + // Create a mock evaluation to register the job eval := &structs.Evaluation{ ID: structs.GenerateUUID(), Priority: job.Priority, diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 8a62d7c85..baed71e73 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -13,6 +13,7 @@ import ( var BuiltinSchedulers = map[string]Factory{ "service": NewServiceScheduler, "batch": NewBatchScheduler, + "system": NewSystemScheduler, } // NewScheduler is used to instantiate and return a new scheduler diff --git a/scheduler/stack.go b/scheduler/stack.go index c1468602f..e16759bd3 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -109,7 +109,7 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) { // Apply a limit function. This is to avoid scanning *every* possible node. // For batch jobs we only need to evaluate 2 options and depend on the - // powwer of two choices. For services jobs we need to visit "enough". + // power of two choices. For services jobs we need to visit "enough". // Using a log of the total number of nodes is a good restriction, with // at least 2 as the floor limit := 2 @@ -165,3 +165,95 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso s.ctx.Metrics().AllocationTime = time.Since(start) return option, size } + +// SystemStack is the Stack used for the System scheduler. It is designed to +// attempt to make placements on all nodes. +type SystemStack struct { + ctx Context + source *StaticIterator + jobConstraint *ConstraintIterator + taskGroupDrivers *DriverIterator + taskGroupConstraint *ConstraintIterator + binPack *BinPackIterator +} + +// NewSystemStack constructs a stack used for selecting service placements +func NewSystemStack(ctx Context, baseNodes []*structs.Node) *SystemStack { + // Create a new stack + s := &SystemStack{ctx: ctx} + + // Create the source iterator. 
We visit nodes in a linear order because we + // have to evaluate on all nodes. + s.source = NewStaticIterator(ctx, baseNodes) + + // Attach the job constraints. The job is filled in later. + s.jobConstraint = NewConstraintIterator(ctx, s.source, nil) + + // Filter on task group drivers first as they are faster + s.taskGroupDrivers = NewDriverIterator(ctx, s.jobConstraint, nil) + + // Filter on task group constraints second + s.taskGroupConstraint = NewConstraintIterator(ctx, s.taskGroupDrivers, nil) + + // Upgrade from feasible to rank iterator + rankSource := NewFeasibleRankIterator(ctx, s.taskGroupConstraint) + + // Apply the bin packing, this depends on the resources needed + // by a particular task group. Enable eviction as system jobs are high + // priority. + s.binPack = NewBinPackIterator(ctx, rankSource, true, 0) + + // Set the nodes if given + if len(baseNodes) != 0 { + s.SetNodes(baseNodes) + } + return s +} + +func (s *SystemStack) SetNodes(baseNodes []*structs.Node) { + // Update the set of base nodes + s.source.SetNodes(baseNodes) +} + +func (s *SystemStack) SetJob(job *structs.Job) { + s.jobConstraint.SetConstraints(job.Constraints) + s.binPack.SetPriority(job.Priority) +} + +func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { + // Reset the binpack selector and context + s.binPack.Reset() + s.ctx.Reset() + start := time.Now() + + // Collect the constraints, drivers and resources required by each + // sub-task to aggregate the TaskGroup totals + constr := make([]*structs.Constraint, 0, len(tg.Constraints)) + drivers := make(map[string]struct{}) + size := new(structs.Resources) + constr = append(constr, tg.Constraints...) + for _, task := range tg.Tasks { + drivers[task.Driver] = struct{}{} + constr = append(constr, task.Constraints...) + size.Add(task.Resources) + } + + // Update the parameters of iterators + s.taskGroupDrivers.SetDrivers(drivers) + s.taskGroupConstraint.SetConstraints(constr) + s.binPack.SetTasks(tg.Tasks) + + // Get the next option that satisfies the constraints. 
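+	// A nil option means no node remained feasible after filtering and ranking.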
+ option := s.binPack.Next() + + // Ensure that the task resources were specified + if option != nil && len(option.TaskResources) != len(tg.Tasks) { + for _, task := range tg.Tasks { + option.SetTaskResources(task, task.Resources) + } + } + + // Store the compute time + s.ctx.Metrics().AllocationTime = time.Since(start) + return option, size +} diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index 07e945315..d63306c88 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -207,3 +207,205 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { t.Fatalf("bad: %#v", met) } } + +func TestSystemStack_SetNodes(t *testing.T) { + _, ctx := testContext(t) + stack := NewSystemStack(ctx, nil) + + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + stack.SetNodes(nodes) + + out := collectFeasible(stack.source) + if !reflect.DeepEqual(out, nodes) { + t.Fatalf("bad: %#v", out) + } +} + +func TestSystemStack_SetJob(t *testing.T) { + _, ctx := testContext(t) + stack := NewSystemStack(ctx, nil) + + job := mock.Job() + stack.SetJob(job) + + if stack.binPack.priority != job.Priority { + t.Fatalf("bad") + } + if !reflect.DeepEqual(stack.jobConstraint.constraints, job.Constraints) { + t.Fatalf("bad") + } +} + +func TestSystemStack_Select_Size(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + stack.SetJob(job) + node, size := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + if size == nil { + t.Fatalf("missing size") + } + + if size.CPU != 500 || size.MemoryMB != 256 { + t.Fatalf("bad: %#v", size) + } + + met := ctx.Metrics() + if met.AllocationTime == 0 { + t.Fatalf("missing time") + } +} + +func TestSystemStack_Select_MetricsReset(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + mock.Node(), + mock.Node(), + } + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + stack.SetJob(job) + n1, _ := stack.Select(job.TaskGroups[0]) + m1 := ctx.Metrics() + if n1 == nil { + t.Fatalf("missing node %#v", m1) + } + + if m1.NodesEvaluated != 1 { + t.Fatalf("should only be 1") + } + + n2, _ := stack.Select(job.TaskGroups[0]) + m2 := ctx.Metrics() + if n2 == nil { + t.Fatalf("missing node %#v", m2) + } + + // If we don't reset, this would be 2 + if m2.NodesEvaluated != 1 { + t.Fatalf("should only be 2") + } +} + +func TestSystemStack_Select_DriverFilter(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + } + zero := nodes[0] + zero.Attributes["driver.foo"] = "1" + + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + job.TaskGroups[0].Tasks[0].Driver = "foo" + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != zero { + t.Fatalf("bad") + } + + zero.Attributes["driver.foo"] = "0" + stack = NewSystemStack(ctx, nodes) + stack.SetJob(job) + node, _ = stack.Select(job.TaskGroups[0]) + if node != nil { + t.Fatalf("node not filtered %#v", node) + } +} + +func TestSystemStack_Select_ConstraintFilter(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + zero := nodes[1] + zero.Attributes["kernel.name"] = "freebsd" + + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + 
job.Constraints[0].RTarget = "freebsd" + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != zero { + t.Fatalf("bad") + } + + met := ctx.Metrics() + if met.NodesFiltered != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ClassFiltered["linux-medium-pci"] != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ConstraintFiltered["$attr.kernel.name = freebsd"] != 1 { + t.Fatalf("bad: %#v", met) + } +} + +func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { + _, ctx := testContext(t) + nodes := []*structs.Node{ + mock.Node(), + mock.Node(), + } + zero := nodes[0] + zero.Reserved = zero.Resources + one := nodes[1] + + stack := NewSystemStack(ctx, nodes) + + job := mock.Job() + stack.SetJob(job) + + node, _ := stack.Select(job.TaskGroups[0]) + if node == nil { + t.Fatalf("missing node %#v", ctx.Metrics()) + } + + if node.Node != one { + t.Fatalf("bad") + } + + met := ctx.Metrics() + if met.NodesExhausted != 1 { + t.Fatalf("bad: %#v", met) + } + if met.ClassExhausted["linux-medium-pci"] != 1 { + t.Fatalf("bad: %#v", met) + } + if len(met.Scores) != 1 { + t.Fatalf("bad: %#v", met) + } +} diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go new file mode 100644 index 000000000..b36983f27 --- /dev/null +++ b/scheduler/system_sched.go @@ -0,0 +1,392 @@ +package scheduler + +import ( + "fmt" + "log" + + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // maxSystemScheduleAttempts is used to limit the number of times + // we will attempt to schedule if we continue to hit conflicts for system + // jobs. + maxSystemScheduleAttempts = 2 + + // allocNodeTainted is the status used when stopping an alloc because it's + // node is tainted. + allocNodeTainted = "system alloc not needed as node is tainted" +) + +// SystemScheduler is used for 'system' jobs. This scheduler is +// designed for services that should be run on every client. +type SystemScheduler struct { + logger *log.Logger + state State + planner Planner + + eval *structs.Evaluation + job *structs.Job + plan *structs.Plan + ctx *EvalContext + stack *SystemStack + nodes []*structs.Node + + limitReached bool + nextEval *structs.Evaluation +} + +// NewSystemScheduler is a factory function to instantiate a new system +// scheduler. +func NewSystemScheduler(logger *log.Logger, state State, planner Planner) Scheduler { + return &SystemScheduler{ + logger: logger, + state: state, + planner: planner, + } +} + +// setStatus is used to update the status of the evaluation +func (s *SystemScheduler) setStatus(status, desc string) error { + s.logger.Printf("[DEBUG] sched: %#v: setting status to %s", s.eval, status) + newEval := s.eval.Copy() + newEval.Status = status + newEval.StatusDescription = desc + if s.nextEval != nil { + newEval.NextEval = s.nextEval.ID + } + return s.planner.UpdateEval(newEval) +} + +// Process is used to handle a single evaluation. 
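+// Evaluations with an unsupported trigger reason are failed immediately;
+// otherwise reconciliation is retried up to maxSystemScheduleAttempts before
+// giving up.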
+func (s *SystemScheduler) Process(eval *structs.Evaluation) error { + // Store the evaluation + s.eval = eval + + // Verify the evaluation trigger reason is understood + switch eval.TriggeredBy { + case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate, + structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate: + default: + desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason", + eval.TriggeredBy) + return s.setStatus(structs.EvalStatusFailed, desc) + } + + // Retry up to the maxSystemScheduleAttempts + if err := retryMax(maxSystemScheduleAttempts, s.process); err != nil { + if statusErr, ok := err.(*SetStatusError); ok { + return s.setStatus(statusErr.EvalStatus, err.Error()) + } + return err + } + + // Update the status to complete + return s.setStatus(structs.EvalStatusComplete, "") +} + +// process is wrapped in retryMax to iteratively run the handler until we have no +// further work or we've made the maximum number of attempts. +func (s *SystemScheduler) process() (bool, error) { + // Lookup the Job by ID + var err error + s.job, err = s.state.JobByID(s.eval.JobID) + if err != nil { + return false, fmt.Errorf("failed to get job '%s': %v", + s.eval.JobID, err) + } + + // Get the ready nodes in the required datacenters + if s.job != nil { + s.nodes, err = readyNodesInDCs(s.state, s.job.Datacenters) + if err != nil { + return false, fmt.Errorf("failed to get ready nodes: %v", err) + } + } + + // Create a plan + s.plan = s.eval.MakePlan(s.job) + + // Create an evaluation context + s.ctx = NewEvalContext(s.state, s.plan, s.logger) + + // Construct the placement stack + s.stack = NewSystemStack(s.ctx, nil) + if s.job != nil { + s.stack.SetJob(s.job) + } + + // Compute the target job allocations + if err := s.computeJobAllocs(); err != nil { + s.logger.Printf("[ERR] sched: %#v: %v", s.eval, err) + return false, err + } + + // If the plan is a no-op, we can bail + if s.plan.IsNoOp() { + return true, nil + } + + // If the limit of placements was reached we need to create an evaluation + // to pickup from here after the stagger period. + if s.limitReached && s.nextEval == nil { + s.nextEval = s.eval.NextRollingEval(s.job.Update.Stagger) + if err := s.planner.CreateEval(s.nextEval); err != nil { + s.logger.Printf("[ERR] sched: %#v failed to make next eval for rolling update: %v", s.eval, err) + return false, err + } + s.logger.Printf("[DEBUG] sched: %#v: rolling update limit reached, next eval '%s' created", s.eval, s.nextEval.ID) + } + + // Submit the plan + result, newState, err := s.planner.SubmitPlan(s.plan) + if err != nil { + return false, err + } + + // If we got a state refresh, try again since we have stale data + if newState != nil { + s.logger.Printf("[DEBUG] sched: %#v: refresh forced", s.eval) + s.state = newState + return false, nil + } + + // Try again if the plan was not fully committed, potential conflict + fullCommit, expected, actual := result.FullCommit(s.plan) + if !fullCommit { + s.logger.Printf("[DEBUG] sched: %#v: attempted %d placements, %d placed", + s.eval, expected, actual) + return false, nil + } + + // Success! + return true, nil +} + +// computeJobAllocs is used to reconcile differences between the job, +// existing allocations and node status to update the allocations. +func (s *SystemScheduler) computeJobAllocs() error { + // Materialize all the task groups per node. 
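+	// One entry is created per ready node and task group, keyed as
+	// "job.group[nodeID]", so the diff below is computed node by node.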
+ var groups map[string]*structs.TaskGroup + if s.job != nil { + groups = materializeSystemTaskGroups(s.job, s.nodes) + } + + // Lookup the allocations by JobID + allocs, err := s.state.AllocsByJob(s.eval.JobID) + if err != nil { + return fmt.Errorf("failed to get allocs for job '%s': %v", + s.eval.JobID, err) + } + + // Filter out the allocations in a terminal state + allocs = structs.FilterTerminalAllocs(allocs) + + // Determine the tainted nodes containing job allocs + tainted, err := taintedNodes(s.state, allocs) + if err != nil { + return fmt.Errorf("failed to get tainted nodes for job '%s': %v", + s.eval.JobID, err) + } + + // Diff the required and existing allocations + diff := diffAllocs(s.job, tainted, groups, allocs) + s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) + + // Add all the allocs to stop + for _, e := range diff.stop { + s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded) + } + + // Also stop all the allocs that are marked as needing migrating. This + // allows failed nodes to be properly GC'd. + for _, e := range diff.migrate { + s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted) + } + + // Attempt to do the upgrades in place + diff.update = s.inplaceUpdate(diff.update) + + // Check if a rolling upgrade strategy is being used + limit := len(diff.update) + if s.job != nil && s.job.Update.Rolling() { + limit = s.job.Update.MaxParallel + } + + // Treat non in-place updates as an eviction and new placement. + s.evictAndPlace(diff, diff.update, allocUpdating, &limit) + + // Nothing remaining to do if placement is not required + if len(diff.place) == 0 { + return nil + } + + // Compute the placements + return s.computePlacements(diff.place) +} + +// evictAndPlace is used to mark allocations for evicts and add them to the placement queue +func (s *SystemScheduler) evictAndPlace(diff *diffResult, allocs []allocTuple, desc string, limit *int) { + n := len(allocs) + for i := 0; i < n && i < *limit; i++ { + a := allocs[i] + s.plan.AppendUpdate(a.Alloc, structs.AllocDesiredStatusStop, desc) + diff.place = append(diff.place, a) + } + if n <= *limit { + *limit -= n + } else { + *limit = 0 + s.limitReached = true + } +} + +// inplaceUpdate attempts to update allocations in-place where possible. +func (s *SystemScheduler) inplaceUpdate(updates []allocTuple) []allocTuple { + n := len(updates) + inplace := 0 + for i := 0; i < n; i++ { + // Get the update + update := updates[i] + + // Check if the task drivers or config has changed, requires + // a rolling upgrade since that cannot be done in-place. + existing := update.Alloc.Job.LookupTaskGroup(update.TaskGroup.Name) + if tasksUpdated(update.TaskGroup, existing) { + continue + } + + // Get the existing node + node, err := s.state.NodeByID(update.Alloc.NodeID) + if err != nil { + s.logger.Printf("[ERR] sched: %#v failed to get node '%s': %v", + s.eval, update.Alloc.NodeID, err) + continue + } + if node == nil { + continue + } + + // Set the existing node as the base set + s.stack.SetNodes([]*structs.Node{node}) + + // Stage an eviction of the current allocation + s.plan.AppendUpdate(update.Alloc, structs.AllocDesiredStatusStop, + allocInPlace) + + // Attempt to match the task group + option, size := s.stack.Select(update.TaskGroup) + + // Pop the allocation + s.plan.PopUpdate(update.Alloc) + + // Skip if we could not do an in-place update + if option == nil { + continue + } + + // Restore the network offers from the existing allocation. 
+ // We do not allow network resources (reserved/dynamic ports) + // to be updated. This is guarded in taskUpdated, so we can + // safely restore those here. + for task, resources := range option.TaskResources { + existing := update.Alloc.TaskResources[task] + resources.Networks = existing.Networks + } + + // Create a shallow copy + newAlloc := new(structs.Allocation) + *newAlloc = *update.Alloc + + // Update the allocation + newAlloc.EvalID = s.eval.ID + newAlloc.Job = s.job + newAlloc.Resources = size + newAlloc.TaskResources = option.TaskResources + newAlloc.Metrics = s.ctx.Metrics() + newAlloc.DesiredStatus = structs.AllocDesiredStatusRun + newAlloc.ClientStatus = structs.AllocClientStatusPending + s.plan.AppendAlloc(newAlloc) + + // Remove this allocation from the slice + updates[i] = updates[n-1] + i-- + n-- + inplace++ + } + if len(updates) > 0 { + s.logger.Printf("[DEBUG] sched: %#v: %d in-place updates of %d", s.eval, inplace, len(updates)) + } + return updates[:n] +} + +// computePlacements computes placements for allocations +func (s *SystemScheduler) computePlacements(place []allocTuple) error { + nodeByID := make(map[string]*structs.Node, len(s.nodes)) + for _, node := range s.nodes { + nodeByID[node.ID] = node + } + + // Track the failed task groups so that we can coalesce + // the failures together to avoid creating many failed allocs. + failedTG := make(map[*structs.TaskGroup]*structs.Allocation) + + nodes := make([]*structs.Node, 1) + for _, missing := range place { + // Get the node by looking at the name in the task group. + nodeID, err := extractTaskGroupId(missing.Name) + if err != nil { + s.logger.Printf("[ERR] sched: %#v failed to parse node id from %q: %v", + s.eval, missing.Name, err) + return err + } + + node, ok := nodeByID[nodeID] + if !ok { + return fmt.Errorf("could not find node %q", nodeID) + } + + // Update the set of placement ndoes + nodes[0] = node + s.stack.SetNodes(nodes) + + // Attempt to match the task group + option, size := s.stack.Select(missing.TaskGroup) + + if option == nil { + // Check if this task group has already failed + if alloc, ok := failedTG[missing.TaskGroup]; ok { + alloc.Metrics.CoalescedFailures += 1 + continue + } + } + + // Create an allocation for this + alloc := &structs.Allocation{ + ID: structs.GenerateUUID(), + EvalID: s.eval.ID, + Name: missing.Name, + JobID: s.job.ID, + Job: s.job, + TaskGroup: missing.TaskGroup.Name, + Resources: size, + Metrics: s.ctx.Metrics(), + } + + // Set fields based on if we found an allocation option + if option != nil { + alloc.NodeID = option.Node.ID + alloc.TaskResources = option.TaskResources + alloc.DesiredStatus = structs.AllocDesiredStatusRun + alloc.ClientStatus = structs.AllocClientStatusPending + s.plan.AppendAlloc(alloc) + } else { + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + alloc.DesiredDescription = "failed to find a node for placement" + alloc.ClientStatus = structs.AllocClientStatusFailed + s.plan.AppendFailed(alloc) + failedTG[missing.TaskGroup] = alloc + } + } + return nil +} diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go new file mode 100644 index 000000000..99f612d07 --- /dev/null +++ b/scheduler/system_sched_test.go @@ -0,0 +1,651 @@ +package scheduler + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestSystemSched_JobRegister(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + 
noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) + } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobRegister_AddNode(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Add a new node. + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Create a mock evaluation to deal with the node update + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan had no node updates + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != 0 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated on the new node + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure it allocated on the right node + if _, ok := plan.NodeAllocation[node.ID]; !ok { + t.Fatalf("allocated on wrong node: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + out = structs.FilterTerminalAllocs(out) + if len(out) != 11 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobRegister_AllocFail(t *testing.T) { + h := NewHarness(t) + + // Create NO nodes + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure no plan as this should be a no-op. + if len(h.Plans) != 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobModify(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Add a few terminal status allocations, these should be ignored + var terminal []*structs.Allocation + for i := 0; i < 5; i++ { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = nodes[i].ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", nodes[i].ID) + alloc.DesiredStatus = structs.AllocDesiredStatusFailed + terminal = append(terminal, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), terminal)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + + // Update the task, such that it cannot be done in-place + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != len(allocs) { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + out = structs.FilterTerminalAllocs(out) + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_JobModify_Rolling(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + job2.Update = structs.UpdateStrategy{ + Stagger: 30 * time.Second, + MaxParallel: 5, + } + + // Update the task, such that it cannot be done in-place + job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other" + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted only MaxParallel + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != job2.Update.MaxParallel { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan allocated + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != job2.Update.MaxParallel { + t.Fatalf("bad: %#v", plan) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Ensure a follow up eval was created + eval = h.Evals[0] + if eval.NextEval == "" { + t.Fatalf("missing next eval") + } + + // Check for create + if len(h.CreateEvals) == 0 { + t.Fatalf("missing created eval") + } + create := h.CreateEvals[0] + if eval.NextEval != create.ID { + t.Fatalf("ID mismatch") + } + if create.PreviousEval != eval.ID { + t.Fatalf("missing previous eval") + } + + if create.TriggeredBy != structs.EvalTriggerRollingUpdate { + t.Fatalf("bad: %#v", create) + } +} + +func TestSystemSched_JobModify_InPlace(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Update the job + job2 := mock.SystemJob() + job2.ID = job.ID + noErr(t, h.State.UpsertJob(h.NextIndex(), job2)) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan did not evict any allocs + var update []*structs.Allocation + for _, updateList := range plan.NodeUpdate { + update = append(update, updateList...) + } + if len(update) != 0 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan updated the existing allocs + var planned []*structs.Allocation + for _, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+ } + if len(planned) != 10 { + t.Fatalf("bad: %#v", plan) + } + for _, p := range planned { + if p.Job != job2 { + t.Fatalf("should update job") + } + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure all allocations placed + if len(out) != 10 { + t.Fatalf("bad: %#v", out) + } + h.AssertEvalStatus(t, structs.EvalStatusComplete) + + // Verify the network did not change + for _, alloc := range out { + for _, resources := range alloc.TaskResources { + if resources.Networks[0].ReservedPorts[0] != 5000 { + t.Fatalf("bad: %#v", alloc) + } + } + } +} + +func TestSystemSched_JobDeregister(t *testing.T) { + h := NewHarness(t) + + // Create some nodes + var nodes []*structs.Node + for i := 0; i < 10; i++ { + node := mock.Node() + nodes = append(nodes, node) + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Generate a fake job with allocations + job := mock.SystemJob() + + var allocs []*structs.Allocation + for _, node := range nodes { + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + allocs = append(allocs, alloc) + } + noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted the job from all nodes. + for _, node := range nodes { + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no remaining allocations + out = structs.FilterTerminalAllocs(out) + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_NodeDrain(t *testing.T) { + h := NewHarness(t) + + // Register a draining node + node := mock.Node() + node.Drain = true + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + + // Generate a fake job allocated on that node. + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + alloc := mock.Alloc() + alloc.Job = job + alloc.JobID = job.ID + alloc.NodeID = node.ID + alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) + + // Create a mock evaluation to deal with drain + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: 50, + TriggeredBy: structs.EvalTriggerNodeUpdate, + JobID: job.ID, + NodeID: node.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure a single plan + if len(h.Plans) != 1 { + t.Fatalf("bad: %#v", h.Plans) + } + plan := h.Plans[0] + + // Ensure the plan evicted all allocs + if len(plan.NodeUpdate[node.ID]) != 1 { + t.Fatalf("bad: %#v", plan) + } + + // Ensure the plan updated the allocation. + var planned []*structs.Allocation + for _, allocList := range plan.NodeUpdate { + planned = append(planned, allocList...) 
+ } + if len(planned) != 1 { + t.Log(len(planned)) + t.Fatalf("bad: %#v", plan) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure the allocations is stopped + if planned[0].DesiredStatus != structs.AllocDesiredStatusStop { + t.Fatalf("bad: %#v", out) + } + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + +func TestSystemSched_RetryLimit(t *testing.T) { + h := NewHarness(t) + h.Planner = &RejectPlan{h} + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + noErr(t, h.State.UpsertNode(h.NextIndex(), node)) + } + + // Create a job + job := mock.SystemJob() + noErr(t, h.State.UpsertJob(h.NextIndex(), job)) + + // Create a mock evaluation to deregister the job + eval := &structs.Evaluation{ + ID: structs.GenerateUUID(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + } + + // Process the evaluation + err := h.Process(NewSystemScheduler, eval) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Ensure multiple plans + if len(h.Plans) == 0 { + t.Fatalf("bad: %#v", h.Plans) + } + + // Lookup the allocations by JobID + out, err := h.State.AllocsByJob(job.ID) + noErr(t, err) + + // Ensure no allocations placed + if len(out) != 0 { + t.Fatalf("bad: %#v", out) + } + + // Should hit the retry limit + h.AssertEvalStatus(t, structs.EvalStatusFailed) +} diff --git a/scheduler/util.go b/scheduler/util.go index cae0043ab..606b2ea32 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -4,10 +4,16 @@ import ( "fmt" "math/rand" "reflect" + "regexp" "github.com/hashicorp/nomad/nomad/structs" ) +var ( + // Regex to capture the identifier of a task group name. + taskGroupID = regexp.MustCompile(`.+\..+\[(.*)\]`) +) + // allocTuple is a tuple of the allocation name and potential alloc ID type allocTuple struct { Name string @@ -28,6 +34,32 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { return out } +// materializeSystemTaskGroups is used to materialize all the task groups +// a system job requires. This is used to do the node expansion. +func materializeSystemTaskGroups(job *structs.Job, nodes []*structs.Node) map[string]*structs.TaskGroup { + out := make(map[string]*structs.TaskGroup) + for _, tg := range job.TaskGroups { + for _, node := range nodes { + name := fmt.Sprintf("%s.%s[%s]", job.Name, tg.Name, node.ID) + out[name] = tg + } + } + return out +} + +// extractTaskGroupIdreturns the unique identifier for the task group +// name. It returns the id that distinguishes multiple instantiations of a task +// group. In the case of the system scheduler they will be the nodes name and +// otherwise it will be the tasks count. +func extractTaskGroupId(name string) (string, error) { + matches := taskGroupID.FindStringSubmatch(name) + if len(matches) != 2 { + return "", fmt.Errorf("could not determine task group id from %v: %#v", name, matches) + } + + return matches[1], nil +} + // diffResult is used to return the sets that result from the diff type diffResult struct { place, update, migrate, stop, ignore []allocTuple
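
The naming convention added in scheduler/util.go is what ties the node expansion and the placement step together: materializeSystemTaskGroups emits one "job.group[nodeID]" name per ready node, and extractTaskGroupId later recovers the node ID so computePlacements can look the node up again. The standalone sketch below illustrates that round trip; it is not part of the patch, and the main function, job name, group name, and node IDs are made-up values used only for illustration.

// Illustrative only: round-tripping the "<job>.<group>[<node-id>]" names
// produced by materializeSystemTaskGroups and parsed by extractTaskGroupId.
package main

import (
	"fmt"
	"regexp"
)

// Same pattern as the taskGroupID regexp added in scheduler/util.go.
var taskGroupID = regexp.MustCompile(`.+\..+\[(.*)\]`)

func main() {
	job, group := "my-job", "web"
	nodes := []string{"node-1234abcd", "node-5678efgh"} // hypothetical node IDs

	for _, node := range nodes {
		// materializeSystemTaskGroups builds one name per (task group, node).
		name := fmt.Sprintf("%s.%s[%s]", job, group, node)

		// extractTaskGroupId recovers the node ID so the scheduler can find
		// the node again when computing placements.
		matches := taskGroupID.FindStringSubmatch(name)
		if len(matches) != 2 {
			fmt.Printf("could not parse %q\n", name)
			continue
		}
		fmt.Printf("%s -> node %s\n", name, matches[1])
	}
}

For non-system jobs the bracketed value is the task group index rather than a node ID, which is the other case the extractTaskGroupId comment refers to.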