Merge pull request #2841 from hashicorp/b-rolling-no-fit

Treat destructive updates atomically
Alex Dadgar authored on 2017-07-19 11:11:25 -07:00, committed by GitHub
11 changed files with 748 additions and 177 deletions


@@ -2997,6 +2997,8 @@ func (t *Task) Copy() *Task {
}
if i, err := copystructure.Copy(nt.Config); err != nil {
panic(err.Error())
} else {
nt.Config = i.(map[string]interface{})
}


@@ -425,7 +425,7 @@ func (s *GenericScheduler) computeJobAllocs() error {
}
// Nothing remaining to do if placement is not required
if len(results.place) == 0 {
if len(results.place)+len(results.destructiveUpdate) == 0 {
if !s.job.Stopped() {
for _, tg := range s.job.TaskGroups {
s.queuedAllocs[tg.Name] = 0
@@ -438,13 +438,23 @@ func (s *GenericScheduler) computeJobAllocs() error {
for _, place := range results.place {
s.queuedAllocs[place.taskGroup.Name] += 1
}
for _, destructive := range results.destructiveUpdate {
s.queuedAllocs[destructive.placeTaskGroup.Name] += 1
}
// Compute the placements
return s.computePlacements(results.place)
place := make([]placementResult, 0, len(results.place)+len(results.destructiveUpdate))
for _, p := range results.place {
place = append(place, p)
}
for _, p := range results.destructiveUpdate {
place = append(place, p)
}
return s.computePlacements(place)
}
// computePlacements computes placements for allocations
func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
func (s *GenericScheduler) computePlacements(place []placementResult) error {
// Get the base nodes
nodes, byDC, err := readyNodesInDCs(s.state, s.job.Datacenters)
if err != nil {
@@ -460,24 +470,36 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
s.stack.SetNodes(nodes)
for _, missing := range place {
// Get the task group
tg := missing.TaskGroup()
// Check if this task group has already failed
if metric, ok := s.failedTGAllocs[missing.taskGroup.Name]; ok {
if metric, ok := s.failedTGAllocs[tg.Name]; ok {
metric.CoalescedFailures += 1
continue
}
// Find the preferred node
preferredNode, err := s.findPreferredNode(&missing)
preferredNode, err := s.findPreferredNode(missing)
if err != nil {
return err
}
// Check if we should stop the previous allocation upon successful
// placement of its replacement. This allows atomic placements/stops. We
// stop the allocation before trying to find a replacement because this
// frees the resources currently used by the previous allocation.
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
if stopPrevAlloc {
s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
}
// Attempt to match the task group
var option *RankedNode
if preferredNode != nil {
option, _ = s.stack.SelectPreferringNodes(missing.taskGroup, []*structs.Node{preferredNode})
option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode})
} else {
option, _ = s.stack.Select(missing.taskGroup)
option, _ = s.stack.Select(tg)
}
// Store the available nodes by datacenter
@@ -489,9 +511,9 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
alloc := &structs.Allocation{
ID: structs.GenerateUUID(),
EvalID: s.eval.ID,
Name: missing.name,
Name: missing.Name(),
JobID: s.job.ID,
TaskGroup: missing.taskGroup.Name,
TaskGroup: tg.Name,
Metrics: s.ctx.Metrics(),
NodeID: option.Node.ID,
DeploymentID: deploymentID,
@@ -500,32 +522,41 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
ClientStatus: structs.AllocClientStatusPending,
SharedResources: &structs.Resources{
DiskMB: missing.taskGroup.EphemeralDisk.SizeMB,
DiskMB: tg.EphemeralDisk.SizeMB,
},
}
// If the new allocation is replacing an older allocation then we
// record the older allocation id so that the two are chained
if missing.previousAlloc != nil {
alloc.PreviousAllocation = missing.previousAlloc.ID
if prev := missing.PreviousAllocation(); prev != nil {
alloc.PreviousAllocation = prev.ID
}
// If we are placing a canary and we found a match, add the canary
// to the deployment state object.
if missing.canary {
if state, ok := s.deployment.TaskGroups[missing.taskGroup.Name]; ok {
if missing.Canary() {
if state, ok := s.deployment.TaskGroups[tg.Name]; ok {
state.PlacedCanaries = append(state.PlacedCanaries, alloc.ID)
}
}
// Track the placement
s.plan.AppendAlloc(alloc)
} else {
// Lazy initialize the failed map
if s.failedTGAllocs == nil {
s.failedTGAllocs = make(map[string]*structs.AllocMetric)
}
s.failedTGAllocs[missing.taskGroup.Name] = s.ctx.Metrics()
// Track the fact that we didn't find a placement
s.failedTGAllocs[tg.Name] = s.ctx.Metrics()
// If we weren't able to find a replacement for the allocation, back
// out the fact that we asked to stop the allocation.
if stopPrevAlloc {
s.plan.PopUpdate(missing.PreviousAllocation())
}
}
}
@@ -533,11 +564,11 @@ func (s *GenericScheduler) computePlacements(place []allocPlaceResult) error {
}
// findPreferredNode finds the preferred node for an allocation
func (s *GenericScheduler) findPreferredNode(place *allocPlaceResult) (node *structs.Node, err error) {
if place.previousAlloc != nil && place.taskGroup.EphemeralDisk.Sticky == true {
func (s *GenericScheduler) findPreferredNode(place placementResult) (node *structs.Node, err error) {
if prev := place.PreviousAllocation(); prev != nil && place.TaskGroup().EphemeralDisk.Sticky == true {
var preferredNode *structs.Node
ws := memdb.NewWatchSet()
preferredNode, err = s.state.NodeByID(ws, place.previousAlloc.NodeID)
preferredNode, err = s.state.NodeByID(ws, prev.NodeID)
if preferredNode.Ready() {
node = preferredNode
}
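The hunk above is the core of the change: before searching for a node, the scheduler tentatively stops the previous allocation so its resources count as free, and it backs that stop out if no placement is found. Below is a minimal, self-contained sketch of that pattern (not the Nomad code; the plan and placement types are simplified stand-ins):

package main

import "fmt"

// Simplified stand-ins for the plan and placementResult types in the diff.
type alloc struct{ ID string }

type plan struct{ stops []string }

func (p *plan) AppendUpdate(a *alloc) { p.stops = append(p.stops, a.ID) }
func (p *plan) PopUpdate(a *alloc)    { p.stops = p.stops[:len(p.stops)-1] }

type placement struct {
	prev *alloc // non-nil for a destructive update
}

// StopPreviousAlloc mirrors the placementResult method: destructive updates
// ask for the previous allocation to be stopped before placing.
func (p placement) StopPreviousAlloc() bool { return p.prev != nil }

// place stops the previous allocation first (freeing its resources), tries to
// select a node, and backs the stop out if the replacement cannot be placed.
func place(pl *plan, missing placement, selectNode func() bool) {
	if missing.StopPreviousAlloc() {
		pl.AppendUpdate(missing.prev)
	}
	if selectNode() {
		fmt.Println("replacement placed; previous alloc stops in the same plan")
		return
	}
	if missing.StopPreviousAlloc() {
		pl.PopUpdate(missing.prev)
	}
	fmt.Println("no fit; previous alloc keeps running")
}

func main() {
	pl := &plan{}
	place(pl, placement{prev: &alloc{ID: "old-alloc"}}, func() bool { return false })
}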


@@ -1481,6 +1481,105 @@ func TestServiceSched_JobModify_Rolling(t *testing.T) {
}
}
// This tests that the old allocation is stopped before placing its replacement,
// so a destructive update still fits on a node the old allocation fills.
func TestServiceSched_JobModify_Rolling_FullNode(t *testing.T) {
h := NewHarness(t)
// Create a node
node := mock.Node()
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
resourceAsk := node.Resources.Copy()
resourceAsk.CPU -= node.Reserved.CPU
resourceAsk.MemoryMB -= node.Reserved.MemoryMB
resourceAsk.DiskMB -= node.Reserved.DiskMB
resourceAsk.Networks = nil
// Generate a fake job with one alloc that consumes the whole node
job := mock.Job()
job.TaskGroups[0].Count = 1
job.TaskGroups[0].Tasks[0].Resources = resourceAsk
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
alloc := mock.Alloc()
alloc.Resources = resourceAsk
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = node.ID
alloc.Name = "my-job.web[0]"
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{alloc}))
// Update the job
job2 := job.Copy()
job2.TaskGroups[0].Update = &structs.UpdateStrategy{
MaxParallel: 1,
HealthCheck: structs.UpdateStrategyHealthCheck_Checks,
MinHealthyTime: 10 * time.Second,
HealthyDeadline: 10 * time.Minute,
}
// Update the task, such that it cannot be done in-place
job2.TaskGroups[0].Tasks[0].Config["command"] = "/bin/other"
noErr(t, h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to process the job update
eval := &structs.Evaluation{
ID: structs.GenerateUUID(),
Priority: 50,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
}
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a single plan
if len(h.Plans) != 1 {
t.Fatalf("bad: %#v", h.Plans)
}
plan := h.Plans[0]
// Ensure the plan evicted only MaxParallel
var update []*structs.Allocation
for _, updateList := range plan.NodeUpdate {
update = append(update, updateList...)
}
if len(update) != 1 {
t.Fatalf("bad: got %d; want %d: %#v", len(update), 1, plan)
}
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
if len(planned) != 1 {
t.Fatalf("bad: %#v", plan)
}
h.AssertEvalStatus(t, structs.EvalStatusComplete)
// Check that the deployment id is attached to the eval
if h.Evals[0].DeploymentID == "" {
t.Fatalf("Eval not annotated with deployment id")
}
// Ensure a deployment was created
if plan.Deployment == nil {
t.Fatalf("bad: %#v", plan)
}
state, ok := plan.Deployment.TaskGroups[job.TaskGroups[0].Name]
if !ok {
t.Fatalf("bad: %#v", plan)
}
if state.DesiredTotal != 1 || state.DesiredCanaries != 0 {
t.Fatalf("bad: %#v", state)
}
}
func TestServiceSched_JobModify_Canaries(t *testing.T) {
h := NewHarness(t)


@@ -77,6 +77,9 @@ type reconcileResults struct {
// place is the set of allocations to place by the scheduler
place []allocPlaceResult
// destructiveUpdate is the set of allocations to apply a destructive update to
destructiveUpdate []allocDestructiveResult
// inplaceUpdate is the set of allocations to apply an inplace update to
inplaceUpdate []*structs.Allocation
@@ -92,25 +95,9 @@ type reconcileResults struct {
followupEvalWait time.Duration
}
// allocPlaceResult contains the information required to place a single
// allocation
type allocPlaceResult struct {
name string
canary bool
taskGroup *structs.TaskGroup
previousAlloc *structs.Allocation
}
// allocStopResult contains the information required to stop a single allocation
type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
}
func (r *reconcileResults) GoString() string {
base := fmt.Sprintf("Total changes: (place %d) (update %d) (stop %d)",
len(r.place), len(r.inplaceUpdate), len(r.stop))
base := fmt.Sprintf("Total changes: (place %d) (destructive %d) (inplace %d) (stop %d)",
len(r.place), len(r.destructiveUpdate), len(r.inplaceUpdate), len(r.stop))
if r.deployment != nil {
base += fmt.Sprintf("\nCreated Deployment: %q", r.deployment.ID)
@@ -391,14 +378,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
desiredChanges.DestructiveUpdate += uint64(min)
desiredChanges.Ignore += uint64(len(destructive) - min)
for _, alloc := range destructive.nameOrder()[:min] {
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
statusDescription: allocUpdating,
})
a.result.place = append(a.result.place, allocPlaceResult{
name: alloc.Name,
taskGroup: tg,
previousAlloc: alloc,
a.result.destructiveUpdate = append(a.result.destructiveUpdate, allocDestructiveResult{
placeName: alloc.Name,
placeTaskGroup: tg,
stopAlloc: alloc,
stopStatusDescription: allocUpdating,
})
}
} else {


@@ -235,6 +235,14 @@ func placeResultsToNames(place []allocPlaceResult) []string {
return names
}
func destructiveResultsToNames(destructive []allocDestructiveResult) []string {
names := make([]string, 0, len(destructive))
for _, d := range destructive {
names = append(names, d.placeName)
}
return names
}
func stopResultsToNames(stop []allocStopResult) []string {
names := make([]string, 0, len(stop))
for _, s := range stop {
@@ -255,6 +263,7 @@ type resultExpectation struct {
createDeployment *structs.Deployment
deploymentUpdates []*structs.DeploymentStatusUpdate
place int
destructive int
inplace int
stop int
desiredTGUpdates map[string]*structs.DesiredUpdates
@@ -282,6 +291,9 @@ func assertResults(t *testing.T, r *reconcileResults, exp *resultExpectation) {
if l := len(r.place); l != exp.place {
t.Fatalf("Expected %d placements; got %d", exp.place, l)
}
if l := len(r.destructiveUpdate); l != exp.destructive {
t.Fatalf("Expected %d destructive; got %d", exp.destructive, l)
}
if l := len(r.inplaceUpdate); l != exp.inplace {
t.Fatalf("Expected %d inplaceUpdate; got %d", exp.inplace, l)
}
@@ -582,9 +594,7 @@ func TestReconciler_Destructive(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 10,
inplace: 0,
stop: 10,
destructive: 10,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: 10,
@@ -592,9 +602,7 @@ func TestReconciler_Destructive(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 9), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop))
assertPlaceResultsHavePreviousAllocs(t, 10, r.place)
assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler properly handles destructive upgrading allocations while
@@ -622,9 +630,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 15,
inplace: 0,
stop: 10,
place: 5,
destructive: 10,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Place: 5,
@@ -633,9 +640,8 @@ func TestReconciler_Destructive_ScaleUp(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 14), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 10, r.place)
assertNamesHaveIndexes(t, intRange(0, 9), destructiveResultsToNames(r.destructiveUpdate))
assertNamesHaveIndexes(t, intRange(10, 14), placeResultsToNames(r.place))
}
// Tests the reconciler properly handles destructive upgrading allocations while
@@ -663,9 +669,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 5,
inplace: 0,
stop: 10,
destructive: 5,
stop: 5,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Stop: 5,
@@ -674,9 +679,8 @@ func TestReconciler_Destructive_ScaleDown(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 4), placeResultsToNames(r.place))
assertPlaceResultsHavePreviousAllocs(t, 5, r.place)
assertNamesHaveIndexes(t, intRange(5, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 4), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler properly handles lost nodes with allocations
@@ -1320,9 +1324,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: d,
deploymentUpdates: nil,
place: 4,
inplace: 0,
stop: 4,
destructive: 4,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: 4,
@@ -1331,8 +1333,7 @@ func TestReconciler_CreateDeployment_RollingUpgrade_Destructive(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 3), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 3), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler creates a deployment for inplace updates
@@ -2208,9 +2209,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 2,
inplace: 0,
stop: 4,
destructive: 2,
stop: 2,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
Stop: 2,
@@ -2221,8 +2221,8 @@ func TestReconciler_PromoteCanaries_Unblock(t *testing.T) {
})
assertNoCanariesStopped(t, d, r.stop)
assertNamesHaveIndexes(t, intRange(2, 3), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(2, 3), destructiveResultsToNames(r.destructiveUpdate))
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
}
// Tests the reconciler handles canary promotion when the canary count equals
@@ -2381,9 +2381,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: c.healthy,
inplace: 0,
stop: c.healthy,
destructive: c.healthy,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: uint64(c.healthy),
@@ -2393,8 +2391,7 @@ func TestReconciler_DeploymentLimit_HealthAccounting(t *testing.T) {
})
if c.healthy != 0 {
assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(4, 3+c.healthy), destructiveResultsToNames(r.destructiveUpdate))
}
})
}
@@ -2411,7 +2408,7 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
d.TaskGroups[job.TaskGroups[0].Name] = &structs.DeploymentState{
Promoted: true,
DesiredTotal: 10,
PlacedAllocs: 4,
PlacedAllocs: 7,
}
// Create 3 allocations from the old job
@@ -2464,9 +2461,9 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: nil,
deploymentUpdates: nil,
place: 5,
inplace: 0,
stop: 5,
place: 2,
destructive: 3,
stop: 2,
followupEvalWait: 31 * time.Second,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
@@ -2479,8 +2476,9 @@ func TestReconciler_TaintedNode_RollingUpgrade(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 1, 7, 9), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(7, 9), destructiveResultsToNames(r.destructiveUpdate))
assertNamesHaveIndexes(t, intRange(0, 1), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 1), stopResultsToNames(r.stop))
}
// Tests the reconciler handles a failed deployment and does no placements
@@ -2762,9 +2760,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) {
assertResults(t, r, &resultExpectation{
createDeployment: dnew,
deploymentUpdates: nil,
place: 4,
inplace: 0,
stop: 4,
destructive: 4,
desiredTGUpdates: map[string]*structs.DesiredUpdates{
job.TaskGroups[0].Name: {
DestructiveUpdate: 4,
@@ -2773,8 +2769,7 @@ func TestReconciler_FailedDeployment_NewJob(t *testing.T) {
},
})
assertNamesHaveIndexes(t, intRange(0, 3), stopResultsToNames(r.stop))
assertNamesHaveIndexes(t, intRange(0, 3), placeResultsToNames(r.place))
assertNamesHaveIndexes(t, intRange(0, 3), destructiveResultsToNames(r.destructiveUpdate))
}
// Tests the reconciler marks a deployment as complete


@@ -8,6 +8,69 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// placementResult is an allocation that must be placed. It potentially has a
// previous allocation attached to it that should be stopped only if the
// paired placement is complete. This gives an atomic place/stop behavior and
// prevents an impossible resource ask during a rolling update from wiping the
// job out.
type placementResult interface {
// TaskGroup returns the task group the placement is for
TaskGroup() *structs.TaskGroup
// Name returns the name of the desired allocation
Name() string
// Canary returns whether the placement should be a canary
Canary() bool
// PreviousAllocation returns the previous allocation
PreviousAllocation() *structs.Allocation
// StopPreviousAlloc returns whether the previous allocation should be
// stopped and if so the status description.
StopPreviousAlloc() (bool, string)
}
// allocStopResult contains the information required to stop a single allocation
type allocStopResult struct {
alloc *structs.Allocation
clientStatus string
statusDescription string
}
// allocPlaceResult contains the information required to place a single
// allocation
type allocPlaceResult struct {
name string
canary bool
taskGroup *structs.TaskGroup
previousAlloc *structs.Allocation
}
func (a allocPlaceResult) TaskGroup() *structs.TaskGroup { return a.taskGroup }
func (a allocPlaceResult) Name() string { return a.name }
func (a allocPlaceResult) Canary() bool { return a.canary }
func (a allocPlaceResult) PreviousAllocation() *structs.Allocation { return a.previousAlloc }
func (a allocPlaceResult) StopPreviousAlloc() (bool, string) { return false, "" }
// allocDestructiveResult contains the information required to do a destructive
// update. Destructive changes are applied atomically: the old alloc is only
// stopped if the new one can be placed.
type allocDestructiveResult struct {
placeName string
placeTaskGroup *structs.TaskGroup
stopAlloc *structs.Allocation
stopStatusDescription string
}
func (a allocDestructiveResult) TaskGroup() *structs.TaskGroup { return a.placeTaskGroup }
func (a allocDestructiveResult) Name() string { return a.placeName }
func (a allocDestructiveResult) Canary() bool { return false }
func (a allocDestructiveResult) PreviousAllocation() *structs.Allocation { return a.stopAlloc }
func (a allocDestructiveResult) StopPreviousAlloc() (bool, string) {
return true, a.stopStatusDescription
}
// allocMatrix is a mapping of task groups to their allocation set.
type allocMatrix map[string]allocSet
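Both allocPlaceResult and allocDestructiveResult satisfy placementResult, which is what lets computeJobAllocs feed fresh placements and destructive updates through a single computePlacements pass. A compile-checked sketch of that merge, with the structs pared down to illustrative fields:

package main

import "fmt"

// Pared-down mirrors of the result types; the real ones carry
// *structs.TaskGroup and *structs.Allocation.
type placementResult interface {
	Name() string
	StopPreviousAlloc() bool
}

type allocPlaceResult struct{ name string }

func (a allocPlaceResult) Name() string            { return a.name }
func (a allocPlaceResult) StopPreviousAlloc() bool { return false }

type allocDestructiveResult struct{ placeName string }

func (a allocDestructiveResult) Name() string            { return a.placeName }
func (a allocDestructiveResult) StopPreviousAlloc() bool { return true }

func main() {
	// Mirrors computeJobAllocs: scale-up placements and destructive updates
	// are merged into one slice instead of emitting separate stop+place pairs.
	placeResults := []allocPlaceResult{{name: "web[3]"}}
	destructive := []allocDestructiveResult{{placeName: "web[0]"}}

	place := make([]placementResult, 0, len(placeResults)+len(destructive))
	for _, p := range placeResults {
		place = append(place, p)
	}
	for _, p := range destructive {
		place = append(place, p)
	}

	for _, p := range place {
		fmt.Printf("%s: stop previous first = %v\n", p.Name(), p.StopPreviousAlloc())
	}
}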


@@ -698,25 +698,27 @@ func desiredUpdates(diff *diffResult, inplaceUpdates,
// adjustQueuedAllocations decrements the number of allocations pending per task
// group based on the number of allocations successfully placed
func adjustQueuedAllocations(logger *log.Logger, result *structs.PlanResult, queuedAllocs map[string]int) {
if result != nil {
for _, allocations := range result.NodeAllocation {
for _, allocation := range allocations {
// Ensure that the allocation is newly created. We check that
// the CreateIndex is equal to the ModifyIndex in order to check
// that the allocation was just created. We do not check that
// the CreateIndex is equal to the results AllocIndex because
// the allocations we get back have gone through the planner's
// optimistic snapshot and thus their indexes may not be
// correct, but they will be consistent.
if allocation.CreateIndex != allocation.ModifyIndex {
continue
}
if result == nil {
return
}
if _, ok := queuedAllocs[allocation.TaskGroup]; ok {
queuedAllocs[allocation.TaskGroup] -= 1
} else {
logger.Printf("[ERR] sched: allocation %q placed but not in list of unplaced allocations", allocation.TaskGroup)
}
for _, allocations := range result.NodeAllocation {
for _, allocation := range allocations {
// Ensure that the allocation is newly created. We check that
// the CreateIndex is equal to the ModifyIndex in order to check
// that the allocation was just created. We do not check that
// the CreateIndex is equal to the results AllocIndex because
// the allocations we get back have gone through the planner's
// optimistic snapshot and thus their indexes may not be
// correct, but they will be consistent.
if allocation.CreateIndex != allocation.ModifyIndex {
continue
}
if _, ok := queuedAllocs[allocation.TaskGroup]; ok {
queuedAllocs[allocation.TaskGroup] -= 1
} else {
logger.Printf("[ERR] sched: allocation %q placed but not in list of unplaced allocations", allocation.TaskGroup)
}
}
}
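The comment above relies on a brand-new allocation having CreateIndex equal to ModifyIndex, while an existing allocation touched by the same plan keeps its older CreateIndex. A tiny self-contained illustration of that test (the index values are made up):

package main

import "fmt"

// alloc mirrors only the two index fields adjustQueuedAllocations checks.
type alloc struct{ CreateIndex, ModifyIndex uint64 }

// justCreated reports whether the allocation was written for the first time
// by the plan that returned it.
func justCreated(a alloc) bool { return a.CreateIndex == a.ModifyIndex }

func main() {
	fmt.Println(justCreated(alloc{CreateIndex: 100, ModifyIndex: 100})) // true: counts against queuedAllocs
	fmt.Println(justCreated(alloc{CreateIndex: 42, ModifyIndex: 100}))  // false: pre-existing alloc, skipped
}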


@@ -1,28 +1,16 @@
package copystructure
import (
"errors"
"reflect"
"sync"
"github.com/mitchellh/reflectwalk"
)
// Copy returns a deep copy of v.
func Copy(v interface{}) (interface{}, error) {
w := new(walker)
err := reflectwalk.Walk(v, w)
if err != nil {
return nil, err
}
// Get the result. If the result is nil, then we want to turn it
// into a typed nil if we can.
result := w.Result
if result == nil {
val := reflect.ValueOf(v)
result = reflect.Indirect(reflect.New(val.Type())).Interface()
}
return result, nil
return Config{}.Copy(v)
}
// CopierFunc is a function that knows how to deep copy a specific type.
@@ -40,6 +28,68 @@ type CopierFunc func(interface{}) (interface{}, error)
// this map as well as to Copy in a mutex.
var Copiers map[reflect.Type]CopierFunc = make(map[reflect.Type]CopierFunc)
// Must is a helper that wraps a call to a function returning
// (interface{}, error) and panics if the error is non-nil. It is intended
// for use in variable initializations and should only be used when a copy
// error should be a crashing case.
func Must(v interface{}, err error) interface{} {
if err != nil {
panic("copy error: " + err.Error())
}
return v
}
var errPointerRequired = errors.New("Copy argument must be a pointer when Lock is true")
type Config struct {
// Lock any types that are a sync.Locker and are not a mutex while copying.
// If there is an RLocker method, use that to get the sync.Locker.
Lock bool
// Copiers is a map of types associated with a CopierFunc. Use the global
// Copiers map if this is nil.
Copiers map[reflect.Type]CopierFunc
}
func (c Config) Copy(v interface{}) (interface{}, error) {
if c.Lock && reflect.ValueOf(v).Kind() != reflect.Ptr {
return nil, errPointerRequired
}
w := new(walker)
if c.Lock {
w.useLocks = true
}
if c.Copiers == nil {
c.Copiers = Copiers
}
err := reflectwalk.Walk(v, w)
if err != nil {
return nil, err
}
// Get the result. If the result is nil, then we want to turn it
// into a typed nil if we can.
result := w.Result
if result == nil {
val := reflect.ValueOf(v)
result = reflect.Indirect(reflect.New(val.Type())).Interface()
}
return result, nil
}
// Return the key used to index interface types we've seen. Store the number
// of pointers in the upper 32 bits, and the depth in the lower 32 bits. This is
// easy to calculate, easy to match a key with our current depth, and we don't
// need to deal with initializing and cleaning up nested maps or slices.
func ifaceKey(pointers, depth int) uint64 {
return uint64(pointers)<<32 | uint64(depth)
}
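// Worked example (illustrative values): ifaceKey(2, 3) yields
// 2<<32 | 3 = 0x0000000200000003, so two pointers at depth three index a
// different interface type than one pointer at the same depth.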
type walker struct {
Result interface{}
@@ -47,15 +97,55 @@ type walker struct {
ignoreDepth int
vals []reflect.Value
cs []reflect.Value
ps []bool
// This stores the number of pointers we've walked over, indexed by depth.
ps []int
// If an interface is indirected by a pointer, we need to know the type of
// interface to create when creating the new value. Store the interface
// types here, indexed by both the walk depth and the number of pointers
// already seen at that depth. Use ifaceKey to calculate the proper uint64
// value.
ifaceTypes map[uint64]reflect.Type
// any locks we've taken, indexed by depth
locks []sync.Locker
// take locks while walking the structure
useLocks bool
}
func (w *walker) Enter(l reflectwalk.Location) error {
w.depth++
// ensure we have enough elements to index via w.depth
for w.depth >= len(w.locks) {
w.locks = append(w.locks, nil)
}
for len(w.ps) < w.depth+1 {
w.ps = append(w.ps, 0)
}
return nil
}
func (w *walker) Exit(l reflectwalk.Location) error {
locker := w.locks[w.depth]
w.locks[w.depth] = nil
if locker != nil {
defer locker.Unlock()
}
// clear out pointers and interfaces as we exit the stack
w.ps[w.depth] = 0
for k := range w.ifaceTypes {
mask := uint64(^uint32(0))
if k&mask == uint64(w.depth) {
delete(w.ifaceTypes, k)
}
}
w.depth--
if w.ignoreDepth > w.depth {
w.ignoreDepth = 0
@@ -66,9 +156,13 @@ func (w *walker) Exit(l reflectwalk.Location) error {
}
switch l {
case reflectwalk.Array:
fallthrough
case reflectwalk.Map:
fallthrough
case reflectwalk.Slice:
w.replacePointerMaybe()
// Pop map off our container
w.cs = w.cs[:len(w.cs)-1]
case reflectwalk.MapValue:
@@ -76,13 +170,36 @@ func (w *walker) Exit(l reflectwalk.Location) error {
mv := w.valPop()
mk := w.valPop()
m := w.cs[len(w.cs)-1]
m.SetMapIndex(mk, mv)
// If mv is the zero value, SetMapIndex deletes the key from the map,
// or in this case never adds it. We need to create a properly typed
// zero value so that this key can be set.
if !mv.IsValid() {
mv = reflect.Zero(m.Elem().Type().Elem())
}
m.Elem().SetMapIndex(mk, mv)
case reflectwalk.ArrayElem:
// Pop off the value and the index and set it on the array
v := w.valPop()
i := w.valPop().Interface().(int)
if v.IsValid() {
a := w.cs[len(w.cs)-1]
ae := a.Elem().Index(i) // storing array as pointer on stack - so need Elem() call
if ae.CanSet() {
ae.Set(v)
}
}
case reflectwalk.SliceElem:
// Pop off the value and the index and set it on the slice
v := w.valPop()
i := w.valPop().Interface().(int)
s := w.cs[len(w.cs)-1]
s.Index(i).Set(v)
if v.IsValid() {
s := w.cs[len(w.cs)-1]
se := s.Elem().Index(i)
if se.CanSet() {
se.Set(v)
}
}
case reflectwalk.Struct:
w.replacePointerMaybe()
@@ -95,7 +212,10 @@ func (w *walker) Exit(l reflectwalk.Location) error {
if v.IsValid() {
s := w.cs[len(w.cs)-1]
sf := reflect.Indirect(s).FieldByName(f.Name)
sf.Set(v)
if sf.CanSet() {
sf.Set(v)
}
}
case reflectwalk.WalkLoc:
// Clear out the slices for GC
@@ -110,17 +230,14 @@ func (w *walker) Map(m reflect.Value) error {
if w.ignoring() {
return nil
}
// Get the type for the map
t := m.Type()
mapType := reflect.MapOf(t.Key(), t.Elem())
w.lock(m)
// Create the map. If the map itself is nil, then just make a nil map
var newMap reflect.Value
if m.IsNil() {
newMap = reflect.Indirect(reflect.New(mapType))
newMap = reflect.New(m.Type())
} else {
newMap = reflect.MakeMap(reflect.MapOf(t.Key(), t.Elem()))
newMap = wrapPtr(reflect.MakeMap(m.Type()))
}
w.cs = append(w.cs, newMap)
@@ -133,20 +250,28 @@ func (w *walker) MapElem(m, k, v reflect.Value) error {
}
func (w *walker) PointerEnter(v bool) error {
if w.ignoring() {
return nil
if v {
w.ps[w.depth]++
}
w.ps = append(w.ps, v)
return nil
}
func (w *walker) PointerExit(bool) error {
if w.ignoring() {
func (w *walker) PointerExit(v bool) error {
if v {
w.ps[w.depth]--
}
return nil
}
func (w *walker) Interface(v reflect.Value) error {
if !v.IsValid() {
return nil
}
if w.ifaceTypes == nil {
w.ifaceTypes = make(map[uint64]reflect.Type)
}
w.ps = w.ps[:len(w.ps)-1]
w.ifaceTypes[ifaceKey(w.ps[w.depth], w.depth)] = v.Type()
return nil
}
@@ -154,13 +279,14 @@ func (w *walker) Primitive(v reflect.Value) error {
if w.ignoring() {
return nil
}
w.lock(v)
// IsValid verifies that v is non-zero and CanInterface verifies
// that we're allowed to read this value (unexported fields).
var newV reflect.Value
if v.IsValid() && v.CanInterface() {
newV = reflect.New(v.Type())
reflect.Indirect(newV).Set(v)
newV.Elem().Set(v)
}
w.valPush(newV)
@@ -172,12 +298,13 @@ func (w *walker) Slice(s reflect.Value) error {
if w.ignoring() {
return nil
}
w.lock(s)
var newS reflect.Value
if s.IsNil() {
newS = reflect.Indirect(reflect.New(s.Type()))
newS = reflect.New(s.Type())
} else {
newS = reflect.MakeSlice(s.Type(), s.Len(), s.Cap())
newS = wrapPtr(reflect.MakeSlice(s.Type(), s.Len(), s.Cap()))
}
w.cs = append(w.cs, newS)
@@ -197,10 +324,36 @@ func (w *walker) SliceElem(i int, elem reflect.Value) error {
return nil
}
func (w *walker) Array(a reflect.Value) error {
if w.ignoring() {
return nil
}
w.lock(a)
newA := reflect.New(a.Type())
w.cs = append(w.cs, newA)
w.valPush(newA)
return nil
}
func (w *walker) ArrayElem(i int, elem reflect.Value) error {
if w.ignoring() {
return nil
}
// We don't write the array here because elem might still be
// arbitrarily complex. Just record the index and continue on.
w.valPush(reflect.ValueOf(i))
return nil
}
func (w *walker) Struct(s reflect.Value) error {
if w.ignoring() {
return nil
}
w.lock(s)
var v reflect.Value
if c, ok := Copiers[s.Type()]; ok {
@@ -213,7 +366,10 @@ func (w *walker) Struct(s reflect.Value) error {
return err
}
v = reflect.ValueOf(dup)
// We need to put a pointer to the value on the value stack,
// so allocate a new pointer and set it.
v = reflect.New(s.Type())
reflect.Indirect(v).Set(reflect.ValueOf(dup))
} else {
// No copier, we copy ourselves and allow reflectwalk to guide
// us deeper into the structure for copying.
@@ -234,18 +390,29 @@ func (w *walker) StructField(f reflect.StructField, v reflect.Value) error {
return nil
}
// If PkgPath is non-empty, this is a private (unexported) field.
// We do not set unexported fields since the Go runtime doesn't allow it.
if f.PkgPath != "" {
return reflectwalk.SkipEntry
}
// Push the field onto the stack, we'll handle it when we exit
// the struct field in Exit...
w.valPush(reflect.ValueOf(f))
return nil
}
// ignore causes the walker to ignore any more values until we exit the current depth
func (w *walker) ignore() {
w.ignoreDepth = w.depth
}
func (w *walker) ignoring() bool {
return w.ignoreDepth > 0 && w.depth >= w.ignoreDepth
}
func (w *walker) pointerPeek() bool {
return w.ps[len(w.ps)-1]
return w.ps[w.depth] > 0
}
func (w *walker) valPop() reflect.Value {
@@ -277,5 +444,105 @@ func (w *walker) replacePointerMaybe() {
// we need to push that onto the stack.
if !w.pointerPeek() {
w.valPush(reflect.Indirect(w.valPop()))
return
}
v := w.valPop()
// If the expected type is a pointer to an interface of any depth,
// such as *interface{}, **interface{}, etc., then we need to convert
// the value "v" from *CONCRETE to *interface{} so types match for
// Set.
//
// Example if v is type *Foo where Foo is a struct, v would become
// *interface{} instead. This only happens if we have an interface expectation
// at this depth.
//
// For more info, see GH-16
if iType, ok := w.ifaceTypes[ifaceKey(w.ps[w.depth], w.depth)]; ok && iType.Kind() == reflect.Interface {
y := reflect.New(iType) // Create *interface{}
y.Elem().Set(reflect.Indirect(v)) // Assign "Foo" to interface{} (dereferenced)
v = y // v is now typed *interface{} (where *v = Foo)
}
for i := 1; i < w.ps[w.depth]; i++ {
if iType, ok := w.ifaceTypes[ifaceKey(w.ps[w.depth]-i, w.depth)]; ok {
iface := reflect.New(iType).Elem()
iface.Set(v)
v = iface
}
p := reflect.New(v.Type())
p.Elem().Set(v)
v = p
}
w.valPush(v)
}
// if this value is a Locker, lock it and add it to the locks slice
func (w *walker) lock(v reflect.Value) {
if !w.useLocks {
return
}
if !v.IsValid() || !v.CanInterface() {
return
}
type rlocker interface {
RLocker() sync.Locker
}
var locker sync.Locker
// We can't call Interface() on a value directly, since that requires
// a copy. This is OK, since the pointer to a value which is a sync.Locker
// is also a sync.Locker.
if v.Kind() == reflect.Ptr {
switch l := v.Interface().(type) {
case rlocker:
// don't lock a mutex directly
if _, ok := l.(*sync.RWMutex); !ok {
locker = l.RLocker()
}
case sync.Locker:
locker = l
}
} else if v.CanAddr() {
switch l := v.Addr().Interface().(type) {
case rlocker:
// don't lock a mutex directly
if _, ok := l.(*sync.RWMutex); !ok {
locker = l.RLocker()
}
case sync.Locker:
locker = l
}
}
// still no callable locker
if locker == nil {
return
}
// don't lock a mutex directly
switch locker.(type) {
case *sync.Mutex, *sync.RWMutex:
return
}
locker.Lock()
w.locks[w.depth] = locker
}
// wrapPtr is a helper that takes v and always makes it *v. copystructure
// stores things internally as pointers until the last moment before unwrapping
func wrapPtr(v reflect.Value) reflect.Value {
if !v.IsValid() {
return v
}
vPtr := reflect.New(v.Type())
vPtr.Elem().Set(v)
return vPtr
}
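The vendored copystructure bump (used by the Task.Copy hunk at the top of this diff) adds the public Config and Must API and lock-aware copying shown above. A usage sketch against that API; the settings type being copied here is made up for illustration:

package main

import (
	"fmt"
	"sync"

	"github.com/mitchellh/copystructure"
)

// settings is a hypothetical lockable container to demonstrate Config{Lock: true}.
type settings struct {
	sync.Mutex
	Config map[string]interface{}
}

func main() {
	orig := &settings{Config: map[string]interface{}{"command": "/bin/date"}}

	// Config{Lock: true} holds each sync.Locker it walks while copying;
	// the argument must be a pointer in that mode.
	dup, err := copystructure.Config{Lock: true}.Copy(orig)
	if err != nil {
		panic(err)
	}
	fmt.Println(dup.(*settings).Config["command"]) // independent copy of the map

	// Must panics on error, which suits package-level defaults.
	def := copystructure.Must(copystructure.Copy(map[string]int{"cpu": 500}))
	fmt.Println(def.(map[string]int)["cpu"])
}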


@@ -11,6 +11,8 @@ const (
MapValue
Slice
SliceElem
Array
ArrayElem
Struct
StructField
WalkLoc


@@ -5,6 +5,7 @@
package reflectwalk
import (
"errors"
"reflect"
)
@@ -18,6 +19,12 @@ type PrimitiveWalker interface {
Primitive(reflect.Value) error
}
// InterfaceWalker implementations are able to handle interface values as they
// are encountered during the walk.
type InterfaceWalker interface {
Interface(reflect.Value) error
}
// MapWalker implementations are able to handle individual elements
// found within a map structure.
type MapWalker interface {
@@ -32,6 +39,13 @@ type SliceWalker interface {
SliceElem(int, reflect.Value) error
}
// ArrayWalker implementations are able to handle array elements found
// within complex structures.
type ArrayWalker interface {
Array(reflect.Value) error
ArrayElem(int, reflect.Value) error
}
// StructWalker is an interface that has methods that are called for
// structs when a Walk is done.
type StructWalker interface {
@@ -55,6 +69,14 @@ type PointerWalker interface {
PointerExit(bool) error
}
// SkipEntry can be returned from walk functions to skip walking
// the value of this field. This is only valid in the following functions:
//
// - Struct: skips all fields from being walked
// - StructField: skips walking the struct value
//
var SkipEntry = errors.New("skip this entry")
// Walk takes an arbitrary value and an interface and traverses the
// value, calling callbacks on the interface if they are supported.
// The interface should implement one or more of the walker interfaces
@@ -79,23 +101,63 @@ func Walk(data, walker interface{}) (err error) {
func walk(v reflect.Value, w interface{}) (err error) {
// Determine if we're receiving a pointer and if so notify the walker.
// The logic here is convoluted but very important (tests will fail if
// almost any part is changed). I will try to explain here.
//
// First, we check if the value is an interface, if so, we really need
// to check the interface's VALUE to see whether it is a pointer.
//
// Check whether the value is then a pointer. If so, then set pointer
// to true to notify the user.
//
// If we still have a pointer or an interface after the indirections, then
// we unwrap another level
//
// At this time, we also set "v" to be the dereferenced value. This is
// because once we've unwrapped the pointer we want to use that value.
pointer := false
if v.Kind() == reflect.Ptr {
pointer = true
v = reflect.Indirect(v)
}
if pw, ok := w.(PointerWalker); ok {
if err = pw.PointerEnter(pointer); err != nil {
return
pointerV := v
for {
if pointerV.Kind() == reflect.Interface {
if iw, ok := w.(InterfaceWalker); ok {
if err = iw.Interface(pointerV); err != nil {
return
}
}
pointerV = pointerV.Elem()
}
defer func() {
if err != nil {
if pointerV.Kind() == reflect.Ptr {
pointer = true
v = reflect.Indirect(pointerV)
}
if pw, ok := w.(PointerWalker); ok {
if err = pw.PointerEnter(pointer); err != nil {
return
}
err = pw.PointerExit(pointer)
}()
defer func(pointer bool) {
if err != nil {
return
}
err = pw.PointerExit(pointer)
}(pointer)
}
if pointer {
pointerV = v
}
pointer = false
// If we still have a pointer or interface we have to indirect another level.
switch pointerV.Kind() {
case reflect.Ptr, reflect.Interface:
continue
}
break
}
// We preserve the original value here because if it is an interface
@@ -125,6 +187,9 @@ func walk(v reflect.Value, w interface{}) (err error) {
case reflect.Struct:
err = walkStruct(v, w)
return
case reflect.Array:
err = walkArray(v, w)
return
default:
panic("unsupported type: " + k.String())
}
@@ -232,42 +297,99 @@ func walkSlice(v reflect.Value, w interface{}) (err error) {
return nil
}
func walkArray(v reflect.Value, w interface{}) (err error) {
ew, ok := w.(EnterExitWalker)
if ok {
ew.Enter(Array)
}
if aw, ok := w.(ArrayWalker); ok {
if err := aw.Array(v); err != nil {
return err
}
}
for i := 0; i < v.Len(); i++ {
elem := v.Index(i)
if aw, ok := w.(ArrayWalker); ok {
if err := aw.ArrayElem(i, elem); err != nil {
return err
}
}
ew, ok := w.(EnterExitWalker)
if ok {
ew.Enter(ArrayElem)
}
if err := walk(elem, w); err != nil {
return err
}
if ok {
ew.Exit(ArrayElem)
}
}
ew, ok = w.(EnterExitWalker)
if ok {
ew.Exit(Array)
}
return nil
}
func walkStruct(v reflect.Value, w interface{}) (err error) {
ew, ewok := w.(EnterExitWalker)
if ewok {
ew.Enter(Struct)
}
skip := false
if sw, ok := w.(StructWalker); ok {
if err = sw.Struct(v); err != nil {
err = sw.Struct(v)
if err == SkipEntry {
skip = true
err = nil
}
if err != nil {
return
}
}
vt := v.Type()
for i := 0; i < vt.NumField(); i++ {
sf := vt.Field(i)
f := v.FieldByIndex([]int{i})
if !skip {
vt := v.Type()
for i := 0; i < vt.NumField(); i++ {
sf := vt.Field(i)
f := v.FieldByIndex([]int{i})
if sw, ok := w.(StructWalker); ok {
err = sw.StructField(sf, f)
if sw, ok := w.(StructWalker); ok {
err = sw.StructField(sf, f)
// SkipEntry just pretends this field doesn't even exist
if err == SkipEntry {
continue
}
if err != nil {
return
}
}
ew, ok := w.(EnterExitWalker)
if ok {
ew.Enter(StructField)
}
err = walk(f, w)
if err != nil {
return
}
}
ew, ok := w.(EnterExitWalker)
if ok {
ew.Enter(StructField)
}
err = walk(f, w)
if err != nil {
return
}
if ok {
ew.Exit(StructField)
if ok {
ew.Exit(StructField)
}
}
}
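The reflectwalk bump adds the InterfaceWalker and ArrayWalker interfaces and the SkipEntry sentinel that the updated copystructure relies on for unexported fields. A minimal sketch of a walker using SkipEntry (the walker and struct types are made up for illustration):

package main

import (
	"fmt"
	"reflect"

	"github.com/mitchellh/reflectwalk"
)

// fieldLister collects exported struct field names and skips unexported ones
// via the new SkipEntry sentinel.
type fieldLister struct{ fields []string }

func (f *fieldLister) Struct(reflect.Value) error { return nil }

func (f *fieldLister) StructField(sf reflect.StructField, v reflect.Value) error {
	if sf.PkgPath != "" { // unexported field: skip walking its value
		return reflectwalk.SkipEntry
	}
	f.fields = append(f.fields, sf.Name)
	return nil
}

func main() {
	type task struct {
		Name   string
		config map[string]string // unexported, skipped by the walker
	}
	var w fieldLister
	if err := reflectwalk.Walk(task{Name: "web"}, &w); err != nil {
		panic(err)
	}
	fmt.Println(w.fields) // [Name]
}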

vendor/vendor.json

@@ -972,8 +972,10 @@
"revisionTime": "2015-09-17T21:48:07Z"
},
{
"checksumSHA1": "+p4JY4wmFQAppCdlrJ8Kxybmht8=",
"path": "github.com/mitchellh/copystructure",
"revision": "80adcec1955ee4e97af357c30dee61aadcc02c10"
"revision": "d23ffcb85de31694d6ccaa23ccb4a03e55c1303f",
"revisionTime": "2017-05-25T01:39:02Z"
},
{
"checksumSHA1": "AXacfEchaUqT5RGmPmMXsOWRhv8=",
@@ -1002,8 +1004,10 @@
"revision": "281073eb9eb092240d33ef253c404f1cca550309"
},
{
"checksumSHA1": "KqsMqI+Y+3EFYPhyzafpIneaVCM=",
"path": "github.com/mitchellh/reflectwalk",
"revision": "eecf4c70c626c7cfbb95c90195bc34d386c74ac6"
"revision": "8d802ff4ae93611b807597f639c19f76074df5c6",
"revisionTime": "2017-05-08T17:38:06Z"
},
{
"checksumSHA1": "NTperEHVh1uBqfTy9+oKceN4tKI=",