scheduler: updating for new APIs

Armon Dadgar
2015-08-25 17:06:06 -07:00
parent 9a917281af
commit 69a3076d87
9 changed files with 74 additions and 59 deletions

View File

@@ -959,10 +959,14 @@ type Plan struct {
 	FailedAllocs []*Allocation
 }
-func (p *Plan) AppendEvict(alloc *Allocation) {
+func (p *Plan) AppendUpdate(alloc *Allocation, status, desc string) {
+	newAlloc := new(Allocation)
+	*newAlloc = *alloc
+	newAlloc.DesiredStatus = status
+	newAlloc.DesiredDescription = desc
 	node := alloc.NodeID
 	existing := p.NodeUpdate[node]
-	p.NodeUpdate[node] = append(existing, alloc)
+	p.NodeUpdate[node] = append(existing, newAlloc)
 }
 func (p *Plan) AppendAlloc(alloc *Allocation) {
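The new AppendUpdate copies the allocation before rewriting its desired state, so a plan never mutates the *Allocation handed out by the state store. A minimal usage sketch against this API (markStop is a hypothetical helper, not part of the commit; the stop status and descriptions appear in the scheduler changes below):

// markStop queues a desired-status change in the plan. AppendUpdate stores
// a copy of alloc with DesiredStatus/DesiredDescription rewritten, leaving
// the caller's alloc untouched.
func markStop(plan *structs.Plan, alloc *structs.Allocation, desc string) {
	plan.AppendUpdate(alloc, structs.AllocDesiredStatusStop, desc)
}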

View File

@@ -85,8 +85,8 @@ func (e *EvalContext) ProposedAllocs(nodeID string) ([]*structs.Allocation, erro
 	// Determine the proposed allocation by first removing allocations
 	// that are planned evictions and adding the new allocations.
 	proposed := existingAlloc
-	if evict := e.plan.NodeEvict[nodeID]; len(evict) > 0 {
-		proposed = structs.RemoveAllocs(existingAlloc, evict)
+	if update := e.plan.NodeUpdate[nodeID]; len(update) > 0 {
+		proposed = structs.RemoveAllocs(existingAlloc, update)
 	}
 	proposed = append(proposed, e.plan.NodeAllocation[nodeID]...)
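In effect, a node's proposed set is: existing allocations, minus those the plan updates (their resources are returned), plus the plan's new placements. A condensed sketch of that rule (proposedForNode is a hypothetical helper; it assumes structs.RemoveAllocs drops entries of the first slice that match the second, as it is used above):

func proposedForNode(plan *structs.Plan, nodeID string,
	existing []*structs.Allocation) []*structs.Allocation {
	proposed := existing
	// Allocations being updated (e.g. stopped) give their resources back.
	if update := plan.NodeUpdate[nodeID]; len(update) > 0 {
		proposed = structs.RemoveAllocs(existing, update)
	}
	// New placements then claim resources on top of what remains.
	return append(proposed, plan.NodeAllocation[nodeID]...)
}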

View File

@@ -16,7 +16,7 @@ func testContext(t *testing.T) (*state.StateStore, *EvalContext) {
 		t.Fatalf("err: %v", err)
 	}
 	plan := &structs.Plan{
-		NodeEvict:      make(map[string][]string),
+		NodeUpdate:     make(map[string][]*structs.Allocation),
 		NodeAllocation: make(map[string][]*structs.Allocation),
 	}
@@ -61,7 +61,7 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
 			CPU:      2048,
 			MemoryMB: 2048,
 		},
-		Status: structs.AllocStatusPending,
+		DesiredStatus: structs.AllocDesiredStatusRun,
 	}
 	alloc2 := &structs.Allocation{
 		ID: mock.GenerateUUID(),
@@ -72,13 +72,13 @@ func TestEvalContext_ProposedAlloc(t *testing.T) {
 			CPU:      1024,
 			MemoryMB: 1024,
 		},
-		Status: structs.AllocStatusPending,
+		DesiredStatus: structs.AllocDesiredStatusRun,
 	}
-	noErr(t, state.UpdateAllocations(1000, nil, []*structs.Allocation{alloc1, alloc2}))
+	noErr(t, state.UpdateAllocations(1000, []*structs.Allocation{alloc1, alloc2}))
 	// Add a planned eviction to alloc1
 	plan := ctx.Plan()
-	plan.NodeEvict[nodes[0].Node.ID] = []string{alloc1.ID}
+	plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1}
 	// Add a planned placement to node1
 	plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{

View File

@@ -16,6 +16,15 @@ const (
 	// maxBatchScheduleAttempts is used to limit the number of times
 	// we will attempt to schedule if we continue to hit conflicts for batch.
 	maxBatchScheduleAttempts = 2
+	// allocNotNeeded is the status used when a job no longer requires an allocation
+	allocNotNeeded = "alloc not needed due to job update"
+	// allocMigrating is the status used when we must migrate an allocation
+	allocMigrating = "alloc is being migrated"
+	// allocUpdating is the status used when a job requires an update
+	allocUpdating = "alloc is being updated due to job update"
 )
 // SetStatusError is used to set the status of the evaluation to the given error
@@ -181,22 +190,22 @@ func (s *GenericScheduler) computeJobAllocs(job *structs.Job) error {
 	diff := diffAllocs(job, tainted, groups, allocs)
 	s.logger.Printf("[DEBUG] sched: %#v: %#v", s.eval, diff)
-	// Add all the evicts
-	for _, e := range diff.evict {
-		s.plan.AppendEvict(e.Alloc)
+	// Add all the allocs to stop
+	for _, e := range diff.stop {
+		s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocNotNeeded)
 	}
 	// For simplicity, we treat all migrates as an evict + place.
 	// XXX: This could probably be done more intelligently?
 	for _, e := range diff.migrate {
-		s.plan.AppendEvict(e.Alloc)
+		s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocMigrating)
 	}
 	diff.place = append(diff.place, diff.migrate...)
 	// For simplicity, we treat all updates as an evict + place.
 	// XXX: This should be done with rolling in-place updates instead.
 	for _, e := range diff.update {
-		s.plan.AppendEvict(e.Alloc)
+		s.plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, allocUpdating)
 	}
 	diff.place = append(diff.place, diff.update...)
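All three buckets reduce to the same primitive: record a stop for the old allocation with a descriptive reason, then re-place anything that must keep running. Condensed (planForDiff is a sketch of the loops above, not the actual method):

// planForDiff maps diff buckets onto plan operations and returns the
// full set of placements the scheduler still has to make.
func planForDiff(plan *structs.Plan, diff *diffResult) []allocTuple {
	stopWith := func(tuples []allocTuple, desc string) {
		for _, e := range tuples {
			plan.AppendUpdate(e.Alloc, structs.AllocDesiredStatusStop, desc)
		}
	}
	stopWith(diff.stop, allocNotNeeded)    // stop only
	stopWith(diff.migrate, allocMigrating) // stop, then place elsewhere
	stopWith(diff.update, allocUpdating)   // stop, then place new version
	place := diff.place
	place = append(place, diff.migrate...)
	place = append(place, diff.update...)
	return place
}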
@@ -237,28 +246,31 @@ func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTupl
 		option, size := stack.Select(missing.TaskGroup)
 		// Handle a placement failure
-		var nodeID, status, desc string
+		var nodeID, status, desc, clientStatus string
 		if option == nil {
-			status = structs.AllocStatusFailed
+			status = structs.AllocDesiredStatusFailed
 			desc = "failed to find a node for placement"
+			clientStatus = structs.AllocClientStatusFailed
 		} else {
 			nodeID = option.Node.ID
-			status = structs.AllocStatusPending
+			status = structs.AllocDesiredStatusRun
+			clientStatus = structs.AllocClientStatusPending
 		}
 		// Create an allocation for this
 		alloc := &structs.Allocation{
-			ID:                mock.GenerateUUID(),
-			EvalID:            s.eval.ID,
-			Name:              missing.Name,
-			NodeID:            nodeID,
-			JobID:             job.ID,
-			Job:               job,
-			TaskGroup:         missing.TaskGroup.Name,
-			Resources:         size,
-			Metrics:           ctx.Metrics(),
-			Status:            status,
-			StatusDescription: desc,
+			ID:                 mock.GenerateUUID(),
+			EvalID:             s.eval.ID,
+			Name:               missing.Name,
+			NodeID:             nodeID,
+			JobID:              job.ID,
+			Job:                job,
+			TaskGroup:          missing.TaskGroup.Name,
+			Resources:          size,
+			Metrics:            ctx.Metrics(),
+			DesiredStatus:      status,
+			DesiredDescription: desc,
+			ClientStatus:       clientStatus,
 		}
 		if nodeID != "" {
 			s.plan.AppendAlloc(alloc)
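The placement now records two orthogonal fields: DesiredStatus is what the scheduler wants, ClientStatus is what the client has observed so far. A hypothetical helper showing the split (it assumes stack.Select returns a *RankedNode, as elsewhere in this package):

// placementStatus derives the status triple for a placement attempt.
func placementStatus(option *RankedNode) (desired, client, desc string) {
	if option == nil {
		// No node fit: the desired and observed states are both failed.
		return structs.AllocDesiredStatusFailed, structs.AllocClientStatusFailed,
			"failed to find a node for placement"
	}
	// A node was found: we want the alloc running, but the client
	// has not started it yet.
	return structs.AllocDesiredStatusRun, structs.AllocClientStatusPending, ""
}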

View File

@@ -134,7 +134,7 @@ func TestServiceSched_JobModify(t *testing.T) {
 		alloc.NodeID = nodes[i].ID
 		allocs = append(allocs, alloc)
 	}
-	noErr(t, h.State.UpdateAllocations(h.NextIndex(), nil, allocs))
+	noErr(t, h.State.UpdateAllocations(h.NextIndex(), allocs))
 	// Update the job
 	job2 := mock.Job()
@@ -162,11 +162,11 @@ func TestServiceSched_JobModify(t *testing.T) {
 	plan := h.Plans[0]
 	// Ensure the plan evicted all allocs
-	var evict []string
-	for _, evictList := range plan.NodeEvict {
-		evict = append(evict, evictList...)
+	var update []*structs.Allocation
+	for _, updateList := range plan.NodeUpdate {
+		update = append(update, updateList...)
 	}
-	if len(evict) != len(allocs) {
+	if len(update) != len(allocs) {
 		t.Fatalf("bad: %#v", plan)
 	}
@@ -186,7 +186,7 @@ func TestServiceSched_JobModify(t *testing.T) {
 	// Ensure all allocations placed
 	out = structs.FilterTerminalAllocs(out)
 	if len(out) != 10 {
-		t.Fatalf("bad: %d %#v", out)
+		t.Fatalf("bad: %#v", out)
 	}
 	h.AssertEvalStatus(t, structs.EvalStatusComplete)
@@ -205,7 +205,7 @@ func TestServiceSched_JobDeregister(t *testing.T) {
 		alloc.JobID = job.ID
 		allocs = append(allocs, alloc)
 	}
-	noErr(t, h.State.UpdateAllocations(h.NextIndex(), nil, allocs))
+	noErr(t, h.State.UpdateAllocations(h.NextIndex(), allocs))
 	// Create a mock evaluation to deregister the job
 	eval := &structs.Evaluation{
@@ -228,7 +228,7 @@ func TestServiceSched_JobDeregister(t *testing.T) {
 	plan := h.Plans[0]
 	// Ensure the plan evicted all nodes
-	if len(plan.NodeEvict["foo"]) != len(allocs) {
+	if len(plan.NodeUpdate["foo"]) != len(allocs) {
 		t.Fatalf("bad: %#v", plan)
 	}
@@ -271,7 +271,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
 		alloc.NodeID = node.ID
 		allocs = append(allocs, alloc)
 	}
-	noErr(t, h.State.UpdateAllocations(h.NextIndex(), nil, allocs))
+	noErr(t, h.State.UpdateAllocations(h.NextIndex(), allocs))
 	// Create a mock evaluation to deal with drain
 	eval := &structs.Evaluation{
@@ -295,7 +295,7 @@ func TestServiceSched_NodeDrain(t *testing.T) {
 	plan := h.Plans[0]
 	// Ensure the plan evicted all allocs
-	if len(plan.NodeEvict[node.ID]) != len(allocs) {
+	if len(plan.NodeUpdate[node.ID]) != len(allocs) {
 		t.Fatalf("bad: %#v", plan)
 	}

View File

@@ -192,7 +192,7 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
 			CPU:      2048,
 			MemoryMB: 2048,
 		},
-		Status: structs.AllocStatusPending,
+		DesiredStatus: structs.AllocDesiredStatusRun,
 	}
 	alloc2 := &structs.Allocation{
 		ID: mock.GenerateUUID(),
@@ -203,9 +203,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
 			CPU:      1024,
 			MemoryMB: 1024,
 		},
-		Status: structs.AllocStatusPending,
+		DesiredStatus: structs.AllocDesiredStatusRun,
 	}
-	noErr(t, state.UpdateAllocations(1000, nil, []*structs.Allocation{alloc1, alloc2}))
+	noErr(t, state.UpdateAllocations(1000, []*structs.Allocation{alloc1, alloc2}))
 	resources := &structs.Resources{
 		CPU: 1024,
@@ -261,7 +261,7 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
 			CPU:      2048,
 			MemoryMB: 2048,
 		},
-		Status: structs.AllocStatusPending,
+		DesiredStatus: structs.AllocDesiredStatusRun,
 	}
 	alloc2 := &structs.Allocation{
 		ID: mock.GenerateUUID(),
@@ -272,13 +272,13 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
 			CPU:      1024,
 			MemoryMB: 1024,
 		},
-		Status: structs.AllocStatusPending,
+		DesiredStatus: structs.AllocDesiredStatusRun,
 	}
-	noErr(t, state.UpdateAllocations(1000, nil, []*structs.Allocation{alloc1, alloc2}))
+	noErr(t, state.UpdateAllocations(1000, []*structs.Allocation{alloc1, alloc2}))
 	// Add a planned eviction to alloc1
 	plan := ctx.Plan()
-	plan.NodeEvict[nodes[0].Node.ID] = []string{alloc1.ID}
+	plan.NodeUpdate[nodes[0].Node.ID] = []*structs.Allocation{alloc1}
 	resources := &structs.Resources{
 		CPU: 1024,

View File

@@ -74,15 +74,14 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
 	// Prepare the result
 	result := new(structs.PlanResult)
-	result.NodeEvict = plan.NodeEvict
+	result.NodeUpdate = plan.NodeUpdate
 	result.NodeAllocation = plan.NodeAllocation
 	result.AllocIndex = index
 	// Flatten evicts and allocs
-	var evicts []string
 	var allocs []*structs.Allocation
-	for _, evictList := range plan.NodeEvict {
-		evicts = append(evicts, evictList...)
+	for _, updateList := range plan.NodeUpdate {
+		allocs = append(allocs, updateList...)
 	}
 	for _, allocList := range plan.NodeAllocation {
 		allocs = append(allocs, allocList...)
@@ -90,7 +89,7 @@ func (h *Harness) SubmitPlan(plan *structs.Plan) (*structs.PlanResult, State, er
 	allocs = append(allocs, plan.FailedAllocs...)
 	// Apply the full plan
-	err := h.State.UpdateAllocations(index, evicts, allocs)
+	err := h.State.UpdateAllocations(index, allocs)
 	return result, nil, err
 }
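Because updates are now full Allocation values with their desired state already rewritten, the harness no longer needs a separate evict list: everything the plan touches flattens into one batch for a single UpdateAllocations call. A sketch of that flattening (flattenPlan is a hypothetical helper mirroring SubmitPlan above):

func flattenPlan(plan *structs.Plan) []*structs.Allocation {
	var allocs []*structs.Allocation
	for _, updateList := range plan.NodeUpdate {
		allocs = append(allocs, updateList...) // stops and migrations, as copies
	}
	for _, allocList := range plan.NodeAllocation {
		allocs = append(allocs, allocList...) // new placements
	}
	return append(allocs, plan.FailedAllocs...) // failed placements
}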

View File

@@ -28,12 +28,12 @@ func materializeTaskGroups(job *structs.Job) map[string]*structs.TaskGroup {
 // diffResult is used to return the sets that result from the diff
 type diffResult struct {
-	place, update, migrate, evict, ignore []allocTuple
+	place, update, migrate, stop, ignore []allocTuple
 }
 func (d *diffResult) GoString() string {
-	return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (evict %d) (ignore %d)",
-		len(d.place), len(d.update), len(d.migrate), len(d.evict), len(d.ignore))
+	return fmt.Sprintf("allocs: (place %d) (update %d) (migrate %d) (stop %d) (ignore %d)",
+		len(d.place), len(d.update), len(d.migrate), len(d.stop), len(d.ignore))
 }
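GoString is what the %#v verb picks up in the scheduler's DEBUG log line earlier in this commit, so the diff prints as a compact bucket summary rather than a raw struct dump. For example (contents are illustrative):

d := &diffResult{
	place: []allocTuple{{Name: "web.frontend[0]"}},
	stop:  []allocTuple{{Name: "web.frontend[9]"}},
}
// Prints: allocs: (place 1) (update 0) (migrate 0) (stop 1) (ignore 0)
fmt.Println(d.GoString())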
 // diffAllocs is used to do a set difference between the target allocations
@@ -56,9 +56,9 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]bool,
 		// Check for the definition in the required set
 		tg, ok := required[name]
-		// If not required, we evict
+		// If not required, we stop the alloc
 		if !ok {
-			result.evict = append(result.evict, allocTuple{
+			result.stop = append(result.stop, allocTuple{
 				Name:      name,
 				TaskGroup: tg,
 				Alloc:     exist,
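The branch shown is only the first of the checks; roughly, the full per-allocation decision looks like this (classify is a hedged condensation; the ordering of the migrate/update checks is inferred from the surrounding tests rather than shown in this excerpt):

func classify(result *diffResult, required map[string]*structs.TaskGroup,
	taintedNodes map[string]bool, name string, exist *structs.Allocation) {
	tg, ok := required[name]
	switch {
	case !ok: // no longer required by the job
		result.stop = append(result.stop, allocTuple{Name: name, TaskGroup: tg, Alloc: exist})
	case taintedNodes[exist.NodeID]: // node is draining or down
		result.migrate = append(result.migrate, allocTuple{Name: name, TaskGroup: tg, Alloc: exist})
	default:
		// A changed job definition decides update vs ignore (not shown here).
	}
}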

View File

@@ -79,7 +79,7 @@ func TestDiffAllocs(t *testing.T) {
 	place := diff.place
 	update := diff.update
 	migrate := diff.migrate
-	evict := diff.evict
+	stop := diff.stop
 	ignore := diff.ignore
 	// We should update the first alloc
@@ -92,9 +92,9 @@ func TestDiffAllocs(t *testing.T) {
 		t.Fatalf("bad: %#v", ignore)
 	}
-	// We should evict the 3rd alloc
-	if len(evict) != 1 || evict[0].Alloc != allocs[2] {
-		t.Fatalf("bad: %#v", evict)
+	// We should stop the 3rd alloc
+	if len(stop) != 1 || stop[0].Alloc != allocs[2] {
+		t.Fatalf("bad: %#v", stop)
 	}
 	// We should migrate the 4th alloc