From 85e0d6fccd5eb2d2b792a1eeb2322e0c1dff7167 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 31 May 2017 11:34:46 -0700 Subject: [PATCH] assign names --- client/driver/env/env.go | 2 +- nomad/structs/bitmap.go | 9 + nomad/structs/bitmap_test.go | 22 +++ nomad/structs/funcs.go | 5 + nomad/structs/structs.go | 35 ++-- nomad/structs/structs_test.go | 18 +- scheduler/generic_sched_test.go | 4 +- scheduler/reconcile.go | 295 ++++++++++++++++++++++++-------- scheduler/reconcile_util.go | 16 ++ 9 files changed, 310 insertions(+), 96 deletions(-) diff --git a/client/driver/env/env.go b/client/driver/env/env.go index 481bd8853..71bee9e44 100644 --- a/client/driver/env/env.go +++ b/client/driver/env/env.go @@ -380,7 +380,7 @@ func (b *Builder) setTask(task *structs.Task) *Builder { func (b *Builder) setAlloc(alloc *structs.Allocation) *Builder { b.allocId = alloc.ID b.allocName = alloc.Name - b.allocIndex = alloc.Index() + b.allocIndex = int(alloc.Index()) b.jobName = alloc.Job.Name // Set meta diff --git a/nomad/structs/bitmap.go b/nomad/structs/bitmap.go index 78a213a3f..63758a0be 100644 --- a/nomad/structs/bitmap.go +++ b/nomad/structs/bitmap.go @@ -40,6 +40,15 @@ func (b Bitmap) Set(idx uint) { b[bucket] |= mask } +// Unset is used to unset the given index of the bitmap +func (b Bitmap) Unset(idx uint) { + bucket := idx >> 3 + // Mask should be all ones minus the idx position + offset := 1 << (idx & 7) + mask := byte(offset ^ 0xff) + b[bucket] &= mask +} + // Check is used to check the given index of the bitmap func (b Bitmap) Check(idx uint) bool { bucket := idx >> 3 diff --git a/nomad/structs/bitmap_test.go b/nomad/structs/bitmap_test.go index 83961697d..42b2c635e 100644 --- a/nomad/structs/bitmap_test.go +++ b/nomad/structs/bitmap_test.go @@ -96,4 +96,26 @@ func TestBitmap(t *testing.T) { t.Fatalf("bad") } } + + // Set a few bits + b.Set(0) + b.Set(255) + b.Unset(0) + b.Unset(255) + + // Clear the bits + if b[0] != 0 { + t.Fatalf("bad") + } + if b.Check(0) { + t.Fatalf("bad") + } + + // Verify the bytes + if b[len(b)-1] != 0 { + t.Fatalf("bad") + } + if b.Check(255) { + t.Fatalf("bad") + } } diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index ab8a980a2..b6c5ef12c 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -214,3 +214,8 @@ func DenormalizeAllocationJobs(job *Job, allocs []*Allocation) { } } } + +// AllocName returns the name of the allocation given the input. +func AllocName(job, group string, idx uint) string { + return fmt.Sprintf("%s.%s[%d]", job, group, idx) +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index e2b4942cc..66c45f65d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3976,6 +3976,20 @@ type Allocation struct { CreateTime int64 } +// Index returns the index of the allocation. If the allocation is from a task +// group with count greater than 1, there will be multiple allocations for it. +func (a *Allocation) Index() uint { + l := len(a.Name) + prefix := len(a.JobID) + len(a.TaskGroup) + 2 + if l <= 3 || l <= prefix { + return uint(0) + } + + strNum := a.Name[prefix : len(a.Name)-1] + num, _ := strconv.Atoi(strNum) + return uint(num) +} + func (a *Allocation) Copy() *Allocation { return a.copyImpl(true) } @@ -4109,27 +4123,6 @@ func (a *Allocation) ShouldMigrate() bool { return true } -var ( - // AllocationIndexRegex is a regular expression to find the allocation index. - AllocationIndexRegex = regexp.MustCompile(".+\\[(\\d+)\\]$") -) - -// Index returns the index of the allocation. If the allocation is from a task -// group with count greater than 1, there will be multiple allocations for it. -func (a *Allocation) Index() int { - matches := AllocationIndexRegex.FindStringSubmatch(a.Name) - if len(matches) != 2 { - return -1 - } - - index, err := strconv.Atoi(matches[1]) - if err != nil { - return -1 - } - - return index -} - // AllocListStub is used to return a subset of alloc information type AllocListStub struct { ID string diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 3942d47a3..f6e0328f1 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1658,13 +1658,21 @@ func TestRestartPolicy_Validate(t *testing.T) { } func TestAllocation_Index(t *testing.T) { - a1 := Allocation{Name: "example.cache[0]"} - e1 := 0 - a2 := Allocation{Name: "ex[123]am123ple.c311ac[123]he12[1][77]"} - e2 := 77 + a1 := Allocation{ + Name: "example.cache[1]", + TaskGroup: "cache", + JobID: "example", + Job: &Job{ + ID: "example", + TaskGroups: []*TaskGroup{{Name: "cache"}}}, + } + e1 := uint(1) + a2 := a1.Copy() + a2.Name = "example.cache[713127]" + e2 := uint(713127) if a1.Index() != e1 || a2.Index() != e2 { - t.Fatal() + t.Fatalf("Got %d and %d", a1.Index(), a2.Index()) } } diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index e1616b22e..9ef7c0583 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -1289,7 +1289,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = nodes[i].ID - alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + alloc.Name = structs.AllocName(alloc.JobID, alloc.TaskGroup, uint(i)) allocs = append(allocs, alloc) } noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs)) @@ -1301,7 +1301,7 @@ func TestServiceSched_JobModify_CountZero(t *testing.T) { alloc.Job = job alloc.JobID = job.ID alloc.NodeID = nodes[i].ID - alloc.Name = fmt.Sprintf("my-job.web[%d]", i) + alloc.Name = structs.AllocName(alloc.JobID, alloc.TaskGroup, uint(i)) alloc.DesiredStatus = structs.AllocDesiredStatusStop terminal = append(terminal, alloc) } diff --git a/scheduler/reconcile.go b/scheduler/reconcile.go index 44cd16667..7d1be5357 100644 --- a/scheduler/reconcile.go +++ b/scheduler/reconcile.go @@ -1,19 +1,11 @@ package scheduler import ( - "sort" - memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" ) -/* TODO - * 1) We need a structure that yields names in an order that fills the gaps - * between the existing allocations and handles canaries replacing certain - * allocations. - */ - // allocReconciler is used to determine the set of allocations that require // placement, inplace updating or stopping given the job specification and // existing cluster state. The reconciler should only be used for batch and @@ -218,6 +210,20 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { return } + // Get the deployment state for the group + creatingDeployment := a.result.createDeployment != nil + var dstate *structs.DeploymentState + if a.deployment != nil { + var ok bool + dstate, ok = a.deployment.TaskGroups[group] + + // We are creating a deployment + if !ok && creatingDeployment { + dstate = &structs.DeploymentState{} + a.deployment.TaskGroups[group] = dstate + } + } + // Track the lost and migrating desiredChanges.Migrate += uint64(len(migrate) + len(lost)) @@ -247,9 +253,8 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { a.ctx.Logger().Printf("RECONCILER -- older canaries %#v", older) a.ctx.Logger().Printf("RECONCILER -- current canaries %#v", current) - untainted = untainted.difference(older, current) + untainted = untainted.difference(older) canaries = current - a.ctx.Logger().Printf("RECONCILER -- untainted - remove canaries %#v", untainted) } else { // We don't need any of those canaries since there no longer is a // deployment @@ -257,16 +262,29 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { desiredChanges.Stop += uint64(len(canaries)) untainted = untainted.difference(canaries) canaries = nil - a.ctx.Logger().Printf("RECONCILER -- untainted - remove canaries %#v", untainted) } + a.ctx.Logger().Printf("RECONCILER -- untainted - remove canaries %#v", untainted) } + // Create a structure for choosing names + nameIndex := newAllocNameIndex(a.eval.JobID, group, tg.Count, untainted) + // Stop any unneeded allocations and update the untainted set to not - // included stopped allocations - keep, stop := a.computeStop(tg, untainted) + // included stopped allocations. We ignore canaries since that can push us + // over the desired count + existingCanariesPromoted := dstate == nil || dstate.DesiredCanaries == 0 || dstate.Promoted + stop := a.computeStop(tg, nameIndex, untainted.difference(canaries), canaries, existingCanariesPromoted) a.markStop(stop, "", allocNotNeeded) desiredChanges.Stop += uint64(len(stop)) - untainted = keep + untainted = untainted.difference(stop) + + // Having stopped un-needed allocations, append the canaries to the existing + // set of untainted because they are promoted. This will cause them to be + // treated like non-canaries + if existingCanariesPromoted { + untainted = untainted.union(canaries) + nameIndex.Add(canaries) + } // Do inplace upgrades where possible and capture the set of upgrades that // need to be done destructively. @@ -275,33 +293,19 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { desiredChanges.InPlaceUpdate += uint64(len(inplace)) desiredChanges.DestructiveUpdate += uint64(len(destructive)) - a.ctx.Logger().Printf("RECONCILER -- Stopping (%d); Untainted (%d)", len(stop), len(keep)) + a.ctx.Logger().Printf("RECONCILER -- Stopping (%d)", len(stop)) a.ctx.Logger().Printf("RECONCILER -- Inplace (%d); Destructive (%d)", len(inplace), len(destructive)) // Get the update strategy of the group strategy := tg.Update - // XXX need a structure for picking names - - // Get the deployment state for the group - creatingDeployment := a.result.createDeployment != nil - var dstate *structs.DeploymentState - if a.deployment != nil { - var ok bool - dstate, ok = a.deployment.TaskGroups[group] - - // We are creating a deployment - if !ok && creatingDeployment { - dstate = &structs.DeploymentState{} - a.deployment.TaskGroups[group] = dstate - } - } - // The fact that we have destructive updates and have less canaries than is // desired means we need to create canaries - requireCanary := len(destructive) != 0 && strategy != nil && len(canaries) < strategy.Canary + numDestructive := len(destructive) + requireCanary := numDestructive != 0 && strategy != nil && len(canaries) < strategy.Canary if requireCanary && !a.deploymentPaused { number := strategy.Canary - len(canaries) + number = helper.IntMin(numDestructive, number) desiredChanges.Canary += uint64(number) if creatingDeployment { dstate.DesiredCanaries = strategy.Canary @@ -309,10 +313,9 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { } a.ctx.Logger().Printf("RECONCILER -- Canary (%d)", number) - for i := 0; i < number; i++ { + for _, name := range nameIndex.NextCanaries(uint(number), canaries, destructive) { a.result.place = append(a.result.place, allocPlaceResult{ - // XXX Pick better name - name: structs.GenerateUUID(), + name: name, canary: true, taskGroup: tg, }) @@ -321,19 +324,18 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { // Determine how many we can place haveCanaries := dstate != nil && dstate.DesiredCanaries != 0 - limit := a.computeLimit(tg, untainted, haveCanaries) + limit := a.computeLimit(tg, untainted, destructive, haveCanaries) a.ctx.Logger().Printf("RECONCILER -- LIMIT %v", limit) // Place if: // * The deployment is not paused // * Not placing any canaries // * If there are any canaries that they have been promoted - place := a.computePlacements(tg, untainted) + place := a.computePlacements(tg, nameIndex, untainted) if creatingDeployment { dstate.DesiredTotal += len(place) } - existingCanariesPromoted := dstate == nil || dstate.DesiredCanaries == 0 || dstate.Promoted if !a.deploymentPaused && existingCanariesPromoted { // Update the desired changes and if we are creating a deployment update // the state. @@ -387,10 +389,10 @@ func (a *allocReconciler) computeGroup(group string, as allocSet) { // computeLimit returns the placement limit for a particular group. The inputs // are the group definition, the existing/untainted allocation set and whether // any canaries exist or are being placed. -func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted allocSet, canaries bool) int { +func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, destructive allocSet, canaries bool) int { // If there is no update stategy or deployment for the group we can deploy // as many as the group has - if group.Update == nil || a.deployment == nil { + if group.Update == nil || len(destructive) == 0 { return group.Count } else if a.deploymentPaused { // If the deployment is paused, do not create anything else @@ -421,18 +423,19 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted alloc // computePlacement returns the set of allocations to place given the group // definiton and the set of untainted/existing allocations for the group. -func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted allocSet) []allocPlaceResult { +func (a *allocReconciler) computePlacements(group *structs.TaskGroup, + nameIndex *allocNameIndex, untainted allocSet) []allocPlaceResult { + // Hot path the nothing to do case existing := len(untainted) - if existing == group.Count { + if existing >= group.Count { return nil } - // XXX need to pick better names var place []allocPlaceResult - for i := existing; i < group.Count; i++ { + for _, name := range nameIndex.Next(uint(group.Count - existing)) { place = append(place, allocPlaceResult{ - name: structs.GenerateUUID(), + name: name, taskGroup: group, }) } @@ -441,34 +444,28 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup, untainted } // computeStop returns the set of allocations to stop given the group definiton -// and the set of untainted/existing allocations for the group. -func (a *allocReconciler) computeStop(group *structs.TaskGroup, untainted allocSet) (keep, stop allocSet) { +// and the set of untainted and canary allocations for the group. +func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex, + untainted, canaries allocSet, promoted bool) allocSet { // Hot path the nothing to do case - if len(untainted) <= group.Count { - return untainted, nil + remove := len(untainted) - group.Count + if promoted { + remove += len(canaries) + } + if remove <= 0 { + return nil } - // XXX Sort doesn't actually do the right thing "foo.bar[11]" < "foo.bar[3]" - // TODO make name tree - names := make([]string, 0, len(untainted)) - for name := range untainted { - names = append(names, name) - } - sort.Strings(names) - - keep = make(map[string]*structs.Allocation) - stop = make(map[string]*structs.Allocation) - - for i, name := range names { - a := untainted[name] - if i < group.Count { - keep[a.Name] = a - } else { - stop[a.Name] = a + // nameIndex does not include the canaries + removeNames := nameIndex.Highest(uint(remove)) + stop := make(map[string]*structs.Allocation) + for id, a := range untainted { + if _, remove := removeNames[a.Name]; remove { + stop[id] = a } } - return + return stop } // computeUpdates determines which allocations for the passed group require @@ -565,3 +562,167 @@ func (a *allocReconciler) computeUpdates(group *structs.TaskGroup, untainted all return } + +// allocNameIndex is used to select allocation names for placement or removal +// given an existing set of placed allocations. +type allocNameIndex struct { + job, taskGroup string + count int + b structs.Bitmap +} + +// newAllocNameIndex returns an allocNameIndex for use in selecting names of +// allocations to create or stop. It takes the job and task group name, desired +// count and any existing allocations as input. +func newAllocNameIndex(job, taskGroup string, count int, in allocSet) *allocNameIndex { + return &allocNameIndex{ + count: count, + b: bitmapFrom(in, uint(count)), + job: job, + taskGroup: taskGroup, + } +} + +func bitmapFrom(input allocSet, minSize uint) structs.Bitmap { + var max uint + for _, a := range input { + if num := a.Index(); num > max { + max = num + } + } + + if l := uint(len(input)); minSize < l { + minSize = l + } + if max < minSize { + max = minSize + } + if max == 0 { + max = 8 + } + + // byteAlign the count + if remainder := max % 8; remainder != 0 { + max = max + 8 - remainder + } + + bitmap, err := structs.NewBitmap(max) + if err != nil { + panic(err) + } + + for _, a := range input { + bitmap.Set(a.Index()) + } + + return bitmap +} + +// Add adds the allocations to the name index +func (a *allocNameIndex) Add(set allocSet) { + for _, alloc := range set { + a.b.Set(alloc.Index()) + } +} + +// RemoveHighest removes and returns the hightest n used names. The returned set +// can be less than n if there aren't n names set in the index +func (a *allocNameIndex) Highest(n uint) map[string]struct{} { + h := make(map[string]struct{}, n) + for i := a.b.Size(); i > uint(0) && uint(len(h)) <= n; i-- { + // Use this to avoid wrapping around b/c of the unsigned int + idx := i - 1 + if a.b.Check(idx) { + a.b.Unset(idx) + h[structs.AllocName(a.job, a.taskGroup, idx)] = struct{}{} + } + } + + return h +} + +// NextCanaries returns the next n names for use as canaries and sets them as +// used. The existing canaries and destructive updates are also passed in. +func (a *allocNameIndex) NextCanaries(n uint, existing, destructive allocSet) []string { + next := make([]string, 0, n) + + // First select indexes from the allocations that are undergoing destructive + // updates. This way we avoid duplicate names as they will get replaced. + dmap := bitmapFrom(destructive, uint(a.count)) + var remainder uint + for _, idx := range dmap.IndexesInRange(true, uint(0), uint(a.count)-1) { + name := structs.AllocName(a.job, a.taskGroup, uint(idx)) + if _, used := existing[name]; !used { + next = append(next, name) + a.b.Set(uint(idx)) + + // If we have enough, return + remainder := n - uint(len(next)) + if remainder == 0 { + return next + } + } + } + + // Get the set of unset names that can be used + for _, idx := range a.b.IndexesInRange(false, uint(0), uint(a.count)-1) { + name := structs.AllocName(a.job, a.taskGroup, uint(idx)) + if _, used := existing[name]; !used { + next = append(next, name) + a.b.Set(uint(idx)) + + // If we have enough, return + remainder = n - uint(len(next)) + if remainder == 0 { + return next + } + } + } + + // We have exhausted the prefered and free set, now just pick overlapping + // indexes + var i uint + for i = 0; i < remainder; i++ { + name := structs.AllocName(a.job, a.taskGroup, i) + if _, used := existing[name]; !used { + next = append(next, name) + a.b.Set(i) + + // If we have enough, return + remainder = n - uint(len(next)) + if remainder == 0 { + return next + } + } + } + + return next +} + +// Next returns the next n names for use as new placements and sets them as +// used. +func (a *allocNameIndex) Next(n uint) []string { + next := make([]string, 0, n) + + // Get the set of unset names that can be used + var remainder uint + for _, idx := range a.b.IndexesInRange(false, uint(0), uint(a.count)-1) { + next = append(next, structs.AllocName(a.job, a.taskGroup, uint(idx))) + a.b.Set(uint(idx)) + + // If we have enough, return + remainder := n - uint(len(next)) + if remainder == 0 { + return next + } + } + + // We have exhausted the free set, now just pick overlapping indexes + var i uint + for i = 0; i < remainder; i++ { + next = append(next, structs.AllocName(a.job, a.taskGroup, i)) + a.b.Set(i) + } + + return next +} diff --git a/scheduler/reconcile_util.go b/scheduler/reconcile_util.go index c30168a0b..e039986a5 100644 --- a/scheduler/reconcile_util.go +++ b/scheduler/reconcile_util.go @@ -73,6 +73,22 @@ OUTER: return diff } +// union returns a new allocSet that has the union of the two allocSets. +// Conflicts prefer the last passed allocSet containing the value +func (a allocSet) union(others ...allocSet) allocSet { + union := make(map[string]*structs.Allocation, len(a)) + order := []allocSet{a} + order = append(order, others...) + + for _, set := range order { + for k, v := range set { + union[k] = v + } + } + + return union +} + // fitlerByTainted takes a set of tainted nodes and filters the allocation set // into three groups: // 1. Those that exist on untainted nodes