diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 5ed5af097..abfb16c8c 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -128,7 +128,8 @@ func SystemJob() *structs.Job { }, TaskGroups: []*structs.TaskGroup{ &structs.TaskGroup{ - Name: "web", + Name: "web", + Count: 1, Tasks: []*structs.Task{ &structs.Task{ Name: "web", diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index e1feb89f1..f2a34f6eb 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -243,7 +243,7 @@ func (s *GenericScheduler) computeJobAllocs() error { } // computePlacements computes placements for allocations -func (s *GenericScheduler) computePlacements(place []allocTuple) error { +func (s *GenericScheduler) computePlacements(place []*allocTuple) error { // Get the base nodes nodes, err := readyNodesInDCs(s.state, s.job.Datacenters) if err != nil { diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go index dfa0859db..8abed6aee 100644 --- a/scheduler/system_sched.go +++ b/scheduler/system_sched.go @@ -154,12 +154,6 @@ func (s *SystemScheduler) process() (bool, error) { // computeJobAllocs is used to reconcile differences between the job, // existing allocations and node status to update the allocations. func (s *SystemScheduler) computeJobAllocs() error { - // Materialize all the task groups per node. - var groups map[string]*structs.TaskGroup - if s.job != nil { - groups = materializeSystemTaskGroups(s.job, s.nodes) - } - // Lookup the allocations by JobID allocs, err := s.state.AllocsByJob(s.eval.JobID) if err != nil { @@ -178,7 +172,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // Diff the required and existing allocations - diff := diffAllocs(s.job, tainted, groups, allocs) + diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs) s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff) // Add all the allocs to stop @@ -186,12 +180,6 @@ func (s *SystemScheduler) computeJobAllocs() error { s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded) } - // Also stop all the allocs that are marked as needing migrating. This - // allows failed nodes to be properly GC'd. - for _, e := range diff.migrate { - s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted) - } - // Attempt to do the upgrades in place diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update) @@ -214,7 +202,7 @@ func (s *SystemScheduler) computeJobAllocs() error { } // computePlacements computes placements for allocations -func (s *SystemScheduler) computePlacements(place []allocTuple) error { +func (s *SystemScheduler) computePlacements(place []*allocTuple) error { nodeByID := make(map[string]*structs.Node, len(s.nodes)) for _, node := range s.nodes { nodeByID[node.ID] = node @@ -226,17 +214,9 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error { nodes := make([]*structs.Node, 1) for _, missing := range place { - // Get the node by looking at the name in the task group. - nodeID, err := extractTaskGroupId(missing.Name) - if err != nil { - s.logger.Printf("[ERR] sched: %#v failed to parse node id from %q: %v", - s.eval, missing.Name, err) - return err - } - - node, ok := nodeByID[nodeID] + node, ok := nodeByID[missing.Alloc.NodeID] if !ok { - return fmt.Errorf("could not find node %q", nodeID) + return fmt.Errorf("could not find node %q", missing.Alloc.NodeID) } // Update the set of placement ndoes diff --git a/scheduler/system_sched_test.go b/scheduler/system_sched_test.go index 99f612d07..ece805057 100644 --- a/scheduler/system_sched_test.go +++ b/scheduler/system_sched_test.go @@ -1,7 +1,6 @@ package scheduler import ( - "fmt" "testing" "time" @@ -84,7 +83,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -119,6 +118,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) { update = append(update, updateList...) } if len(update) != 0 { + t.Log(len(update)) t.Fatalf("bad: %#v", plan) } @@ -200,7 +200,7 @@ func TestSystemSched_JobModify(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -212,7 +212,7 @@ func TestSystemSched_JobModify(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = nodes[i].ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", nodes[i].ID) + alloc.Name = "my-job.web[0]" alloc.DesiredStatus = structs.AllocDesiredStatusFailed terminal = append(terminal, alloc) } @@ -298,7 +298,7 @@ func TestSystemSched_JobModify_Rolling(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -399,7 +399,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -492,7 +492,7 @@ func TestSystemSched_JobDeregister(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -553,7 +553,7 @@ func TestSystemSched_NodeDrain(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = node.ID - alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID) + alloc.Name = "my-job.web[0]" noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc})) // Create a mock evaluation to deal with drain diff --git a/scheduler/util.go b/scheduler/util.go index f883b34c5..fc85a4e63 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -5,16 +5,10 @@ import ( "log" "math/rand" "reflect" - "regexp" "github.com/hashicorp/nomad/nomad/structs" ) -var ( - // Regex to capture the identifier of a task group name. - taskGroupID = regexp.MustCompile(`.+\..+\[(.*)\]`) -) - // allocTuple is a tuple of the allocation name and potential alloc ID type allocTuple struct { Name string @@ -26,6 +20,10 @@ type allocTuple struct { // a job requires. This is used to do the count expansion. func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { out := make(map[string]*structs.TaskGroup) + if job == nil { + return out + } + for _, tg := range job.TaskGroups { for i := 0; i < tg.Count; i++ { name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i) @@ -35,35 +33,9 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup { return out } -// materializeSystemTaskGroups is used to materialize all the task groups -// a system job requires. This is used to do the node expansion. -func materializeSystemTaskGroups(job *structs.Job, nodes []*structs.Node) map[string]*structs.TaskGroup { - out := make(map[string]*structs.TaskGroup) - for _, tg := range job.TaskGroups { - for _, node := range nodes { - name := fmt.Sprintf("%s.%s[%s]", job.Name, tg.Name, node.ID) - out[name] = tg - } - } - return out -} - -// extractTaskGroupIdreturns the unique identifier for the task group -// name. It returns the id that distinguishes multiple instantiations of a task -// group. In the case of the system scheduler they will be the nodes name and -// otherwise it will be the tasks count. -func extractTaskGroupId(name string) (string, error) { - matches := taskGroupID.FindStringSubmatch(name) - if len(matches) != 2 { - return "", fmt.Errorf("could not determine task group id from %v: %#v", name, matches) - } - - return matches[1], nil -} - // diffResult is used to return the sets that result from the diff type diffResult struct { - place, update, migrate, stop, ignore []allocTuple + place, update, migrate, stop, ignore []*allocTuple } func (d *diffResult) GoString() string { @@ -71,6 +43,14 @@ func (d *diffResult) GoString() string { len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore)) } +func (d *diffResult) Append(other *diffResult) { + d.place = append(d.place, other.place...) + d.update = append(d.update, other.update...) + d.migrate = append(d.migrate, other.migrate...) + d.stop = append(d.stop, other.stop...) + d.ignore = append(d.ignore, other.ignore...) +} + // diffAllocs is used to do a set difference between the target allocations // and the existing allocations. This returns 5 sets of results, the list of // named task groups that need to be placed (no existing allocation), the @@ -93,7 +73,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // If not required, we stop the alloc if !ok { - result.stop = append(result.stop, allocTuple{ + result.stop = append(result.stop, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -103,7 +83,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // If we are on a tainted node, we must migrate if taintedNodes[exist.NodeID] { - result.migrate = append(result.migrate, allocTuple{ + result.migrate = append(result.migrate, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -116,7 +96,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // if the job definition has changed in a way that affects // this allocation and potentially ignore it. if job.ModifyIndex != exist.Job.ModifyIndex { - result.update = append(result.update, allocTuple{ + result.update = append(result.update, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -125,7 +105,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, } // Everything is up-to-date - result.ignore = append(result.ignore, allocTuple{ + result.ignore = append(result.ignore, &allocTuple{ Name: name, TaskGroup: tg, Alloc: exist, @@ -141,7 +121,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, // is an existing allocation, we would have checked for a potential // update or ignore above. if !ok { - result.place = append(result.place, allocTuple{ + result.place = append(result.place, &allocTuple{ Name: name, TaskGroup: tg, }) @@ -150,6 +130,47 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool, return result } +// diffSystemAllocs is like diffAllocs however, the allocations in the +// diffResult contain the specific nodeID they should be allocated on. +func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]bool, + allocs []*structs.Allocation) *diffResult { + + // Build a mapping of nodes to all their allocs. + nodeAllocs := make(map[string][]*structs.Allocation, len(allocs)) + for _, alloc := range allocs { + nallocs := append(nodeAllocs[alloc.NodeID], alloc) + nodeAllocs[alloc.NodeID] = nallocs + } + + for _, node := range nodes { + if _, ok := nodeAllocs[node.ID]; !ok { + nodeAllocs[node.ID] = nil + } + } + + // Create the required task groups. + required := materializeTaskGroups(job) + + result := &diffResult{} + for nodeID, allocs := range nodeAllocs { + diff := diffAllocs(job, taintedNodes, required, allocs) + + // Mark the alloc as being for a specific node. + for _, alloc := range diff.place { + alloc.Alloc = &structs.Allocation{NodeID: nodeID} + } + + // Migrate does not apply to system jobs and instead should be marked as + // stop because if a node is tainted, the job is invalid on that node. + diff.stop = append(diff.stop, diff.migrate...) + diff.migrate = nil + + result.Append(diff) + } + + return result +} + // readyNodesInDCs returns all the ready nodes in the given datacenters func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, error) { // Index the DCs @@ -290,7 +311,7 @@ func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Eval // inplaceUpdate attempts to update allocations in-place where possible. func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, - stack Stack, updates []allocTuple) []allocTuple { + stack Stack, updates []*allocTuple) []*allocTuple { n := len(updates) inplace := 0 @@ -372,7 +393,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job, // evictAndPlace is used to mark allocations for evicts and add them to the // placement queue. evictAndPlace modifies both the the diffResult and the // limit. It returns true if the limit has been reached. -func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool { +func evictAndPlace(ctx Context, diff *diffResult, allocs []*allocTuple, desc string, limit *int) bool { n := len(allocs) for i := 0; i < n && i < *limit; i++ { a := allocs[i] diff --git a/scheduler/util_test.go b/scheduler/util_test.go index a9d894d8c..38967c83e 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -109,6 +109,80 @@ func TestDiffAllocs(t *testing.T) { } } +func TestDiffSystemAllocs(t *testing.T) { + job := mock.SystemJob() + + // Create three alive nodes. + nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}} + + // The "old" job has a previous modify index + oldJob := new(structs.Job) + *oldJob = *job + oldJob.ModifyIndex -= 1 + + tainted := map[string]bool{ + "dead": true, + "baz": false, + } + + allocs := []*structs.Allocation{ + // Update allocation on baz + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "baz", + Name: "my-job.web[0]", + Job: oldJob, + }, + + // Ignore allocation on bar + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "bar", + Name: "my-job.web[0]", + Job: job, + }, + + // Stop allocation on dead. + &structs.Allocation{ + ID: structs.GenerateUUID(), + NodeID: "dead", + Name: "my-job.web[0]", + }, + } + + diff := diffSystemAllocs(job, nodes, tainted, allocs) + place := diff.place + update := diff.update + migrate := diff.migrate + stop := diff.stop + ignore := diff.ignore + + // We should update the first alloc + if len(update) != 1 || update[0].Alloc != allocs[0] { + t.Fatalf("bad: %#v", update) + } + + // We should ignore the second alloc + if len(ignore) != 1 || ignore[0].Alloc != allocs[1] { + t.Fatalf("bad: %#v", ignore) + } + + // We should stop the third alloc + if len(stop) != 1 || stop[0].Alloc != allocs[2] { + t.Fatalf("bad: %#v", stop) + } + + // There should be no migrates. + if len(migrate) != 0 { + t.Fatalf("bad: %#v", migrate) + } + + // We should place 1 + if len(place) != 1 { + t.Fatalf("bad: %#v", place) + } +} + func TestReadyNodesInDCs(t *testing.T) { state, err := state.NewStateStore(os.Stderr) if err != nil {