Add diffSystemAlloc which gives richer information which node to place a system allocation

This commit is contained in:
Alex Dadgar
2015-10-15 13:14:44 -07:00
parent 5cd9a55bcd
commit 5bfb712a7d
6 changed files with 150 additions and 74 deletions

View File

@@ -128,7 +128,8 @@ func SystemJob() *structs.Job {
},
TaskGroups: []*structs.TaskGroup{
&structs.TaskGroup{
Name: "web",
Name: "web",
Count: 1,
Tasks: []*structs.Task{
&structs.Task{
Name: "web",

View File

@@ -243,7 +243,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocTuple) error {
func (s *GenericScheduler) computePlacements(place []*allocTuple) error {
// Get the base nodes
nodes, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {

View File

@@ -154,12 +154,6 @@ func (s *SystemScheduler) process() (bool, error) {
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *SystemScheduler) computeJobAllocs() error {
// Materialize all the task groups per node.
var groups map[string]*structs.TaskGroup
if s.job != nil {
groups = materializeSystemTaskGroups(s.job, s.nodes)
}
// Lookup the allocations by JobID
allocs, err := s.state.AllocsByJob(s.eval.JobID)
if err != nil {
@@ -178,7 +172,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
}
// Diff the required and existing allocations
diff := diffAllocs(s.job, tainted, groups, allocs)
diff := diffSystemAllocs(s.job, s.nodes, tainted, allocs)
s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
// Add all the allocs to stop
@@ -186,12 +180,6 @@ func (s *SystemScheduler) computeJobAllocs() error {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
}
// Also stop all the allocs that are marked as needing migrating. This
// allows failed nodes to be properly GC'd.
for _, e := range diff.migrate {
s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNodeTainted)
}
// Attempt to do the upgrades in place
diff.update = inplaceUpdate(s.ctx, s.eval, s.job, s.stack, diff.update)
@@ -214,7 +202,7 @@ func (s *SystemScheduler) computeJobAllocs() error {
}
// computePlacements computes placements for allocations
func (s *SystemScheduler) computePlacements(place []allocTuple) error {
func (s *SystemScheduler) computePlacements(place []*allocTuple) error {
nodeByID := make(map[string]*structs.Node, len(s.nodes))
for _, node := range s.nodes {
nodeByID[node.ID] = node
@@ -226,17 +214,9 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
nodes := make([]*structs.Node, 1)
for _, missing := range place {
// Get the node by looking at the name in the task group.
nodeID, err := extractTaskGroupId(missing.Name)
if err != nil {
s.logger.Printf("[ERR] sched: %#v failed to parse node id from %q: %v",
s.eval, missing.Name, err)
return err
}
node, ok := nodeByID[nodeID]
node, ok := nodeByID[missing.Alloc.NodeID]
if !ok {
return fmt.Errorf("could not find node %q", nodeID)
return fmt.Errorf("could not find node %q", missing.Alloc.NodeID)
}
// Update the set of placement ndoes

View File

@@ -1,7 +1,6 @@
package scheduler
import (
"fmt"
"testing"
"time"
@@ -84,7 +83,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID)
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -119,6 +118,7 @@ func TestSystemSched_JobRegister_AddNode(t *testing.T) {
update = append(update, updateList...)
}
if len(update) != 0 {
t.Log(len(update))
t.Fatalf("bad: %#v", plan)
}
@@ -200,7 +200,7 @@ func TestSystemSched_JobModify(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID)
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -212,7 +212,7 @@ func TestSystemSched_JobModify(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", nodes[i].ID)
alloc.Name = "my-job.web[0]"
alloc.DesiredStatus = structs.AllocDesiredStatusFailed
terminal = append(terminal, alloc)
}
@@ -298,7 +298,7 @@ func TestSystemSched_JobModify_Rolling(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID)
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -399,7 +399,7 @@ func TestSystemSched_JobModify_InPlace(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID)
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -492,7 +492,7 @@ func TestSystemSched_JobDeregister(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID)
alloc.Name = "my-job.web[0]"
allocs = append(allocs, alloc)
}
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
@@ -553,7 +553,7 @@ func TestSystemSched_NodeDrain(t *testing.T) {
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = fmt.Sprintf("my-job.web[%s]", node.ID)
alloc.Name = "my-job.web[0]"
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Create a mock evaluation to deal with drain

View File

@@ -5,16 +5,10 @@ import (
"log"
"math/rand"
"reflect"
"regexp"
"github.com/hashicorp/nomad/nomad/structs"
)
var (
// Regex to capture the identifier of a task group name.
taskGroupID = regexp.MustCompile(`.+\..+\[(.*)\]`)
)
// allocTuple is a tuple of the allocation name and potential alloc ID
type allocTuple struct {
Name string
@@ -26,6 +20,10 @@ type allocTuple struct {
// a job requires. This is used to do the count expansion.
func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
out := make(map[string]*structs.TaskGroup)
if job == nil {
return out
}
for _, tg := range job.TaskGroups {
for i := 0; i < tg.Count; i++ {
name := fmt.Sprintf("%s.%s[%d]", job.Name, tg.Name, i)
@@ -35,35 +33,9 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
return out
}
// materializeSystemTaskGroups is used to materialize all the task groups
// a system job requires. This is used to do the node expansion.
func materializeSystemTaskGroups(job *structs.Job, nodes []*structs.Node) map[string]*structs.TaskGroup {
out := make(map[string]*structs.TaskGroup)
for _, tg := range job.TaskGroups {
for _, node := range nodes {
name := fmt.Sprintf("%s.%s[%s]", job.Name, tg.Name, node.ID)
out[name] = tg
}
}
return out
}
// extractTaskGroupIdreturns the unique identifier for the task group
// name. It returns the id that distinguishes multiple instantiations of a task
// group. In the case of the system scheduler they will be the nodes name and
// otherwise it will be the tasks count.
func extractTaskGroupId(name string) (string, error) {
matches := taskGroupID.FindStringSubmatch(name)
if len(matches) != 2 {
return "", fmt.Errorf("could not determine task group id from %v: %#v", name, matches)
}
return matches[1], nil
}
// diffResult is used to return the sets that result from the diff
type diffResult struct {
place, update, migrate, stop, ignore []allocTuple
place, update, migrate, stop, ignore []*allocTuple
}
func (d *diffResult) GoString() string {
@@ -71,6 +43,14 @@ func (d *diffResult) GoString() string {
len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore))
}
func (d *diffResult) Append(other *diffResult) {
d.place = append(d.place, other.place...)
d.update = append(d.update, other.update...)
d.migrate = append(d.migrate, other.migrate...)
d.stop = append(d.stop, other.stop...)
d.ignore = append(d.ignore, other.ignore...)
}
// diffAllocs is used to do a set difference between the target allocations
// and the existing allocations. This returns 5 sets of results, the list of
// named task groups that need to be placed (no existing allocation), the
@@ -93,7 +73,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
// If not required, we stop the alloc
if !ok {
result.stop = append(result.stop, allocTuple{
result.stop = append(result.stop, &allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
@@ -103,7 +83,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
// If we are on a tainted node, we must migrate
if taintedNodes[exist.NodeID] {
result.migrate = append(result.migrate, allocTuple{
result.migrate = append(result.migrate, &allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
@@ -116,7 +96,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
// if the job definition has changed in a way that affects
// this allocation and potentially ignore it.
if job.ModifyIndex != exist.Job.ModifyIndex {
result.update = append(result.update, allocTuple{
result.update = append(result.update, &allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
@@ -125,7 +105,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
}
// Everything is up-to-date
result.ignore = append(result.ignore, allocTuple{
result.ignore = append(result.ignore, &allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
@@ -141,7 +121,7 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
// is an existing allocation, we would have checked for a potential
// update or ignore above.
if !ok {
result.place = append(result.place, allocTuple{
result.place = append(result.place, &allocTuple{
Name: name,
TaskGroup: tg,
})
@@ -150,6 +130,47 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
return result
}
// diffSystemAllocs is like diffAllocs however, the allocations in the
// diffResult contain the specific nodeID they should be allocated on.
func diffSystemAllocs(job *structs.Job, nodes []*structs.Node, taintedNodes map[string]bool,
allocs []*structs.Allocation) *diffResult {
// Build a mapping of nodes to all their allocs.
nodeAllocs := make(map[string][]*structs.Allocation, len(allocs))
for _, alloc := range allocs {
nallocs := append(nodeAllocs[alloc.NodeID], alloc)
nodeAllocs[alloc.NodeID] = nallocs
}
for _, node := range nodes {
if _, ok := nodeAllocs[node.ID]; !ok {
nodeAllocs[node.ID] = nil
}
}
// Create the required task groups.
required := materializeTaskGroups(job)
result := &diffResult{}
for nodeID, allocs := range nodeAllocs {
diff := diffAllocs(job, taintedNodes, required, allocs)
// Mark the alloc as being for a specific node.
for _, alloc := range diff.place {
alloc.Alloc = &structs.Allocation{NodeID: nodeID}
}
// Migrate does not apply to system jobs and instead should be marked as
// stop because if a node is tainted, the job is invalid on that node.
diff.stop = append(diff.stop, diff.migrate...)
diff.migrate = nil
result.Append(diff)
}
return result
}
// readyNodesInDCs returns all the ready nodes in the given datacenters
func readyNodesInDCs(state State, dcs []string) ([]*structs.Node, error) {
// Index the DCs
@@ -290,7 +311,7 @@ func setStatus(logger *log.Logger, planner Planner, eval, nextEval *structs.Eval
// inplaceUpdate attempts to update allocations in-place where possible.
func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
stack Stack, updates []allocTuple) []allocTuple {
stack Stack, updates []*allocTuple) []*allocTuple {
n := len(updates)
inplace := 0
@@ -372,7 +393,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
// evictAndPlace is used to mark allocations for evicts and add them to the
// placement queue. evictAndPlace modifies both the the diffResult and the
// limit. It returns true if the limit has been reached.
func evictAndPlace(ctx Context, diff *diffResult, allocs []allocTuple, desc string, limit *int) bool {
func evictAndPlace(ctx Context, diff *diffResult, allocs []*allocTuple, desc string, limit *int) bool {
n := len(allocs)
for i := 0; i < n && i < *limit; i++ {
a := allocs[i]

View File

@@ -109,6 +109,80 @@ func TestDiffAllocs(t *testing.T) {
}
}
func TestDiffSystemAllocs(t *testing.T) {
job := mock.SystemJob()
// Create three alive nodes.
nodes := []*structs.Node{{ID: "foo"}, {ID: "bar"}, {ID: "baz"}}
// The "old" job has a previous modify index
oldJob := new(structs.Job)
*oldJob = *job
oldJob.ModifyIndex -= 1
tainted := map[string]bool{
"dead": true,
"baz": false,
}
allocs := []*structs.Allocation{
// Update allocation on baz
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "baz",
Name: "my-job.web[0]",
Job: oldJob,
},
// Ignore allocation on bar
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "bar",
Name: "my-job.web[0]",
Job: job,
},
// Stop allocation on dead.
&structs.Allocation{
ID: structs.GenerateUUID(),
NodeID: "dead",
Name: "my-job.web[0]",
},
}
diff := diffSystemAllocs(job, nodes, tainted, allocs)
place := diff.place
update := diff.update
migrate := diff.migrate
stop := diff.stop
ignore := diff.ignore
// We should update the first alloc
if len(update) != 1 || update[0].Alloc != allocs[0] {
t.Fatalf("bad: %#v", update)
}
// We should ignore the second alloc
if len(ignore) != 1 || ignore[0].Alloc != allocs[1] {
t.Fatalf("bad: %#v", ignore)
}
// We should stop the third alloc
if len(stop) != 1 || stop[0].Alloc != allocs[2] {
t.Fatalf("bad: %#v", stop)
}
// There should be no migrates.
if len(migrate) != 0 {
t.Fatalf("bad: %#v", migrate)
}
// We should place 1
if len(place) != 1 {
t.Fatalf("bad: %#v", place)
}
}
func TestReadyNodesInDCs(t *testing.T) {
state, err := state.NewStateStore(os.Stderr)
if err != nil {