Reschedule previous allocs and track their reschedule attempts

Preetha Appan
2018-01-14 16:47:21 -06:00
parent b2f2e28940
commit 5ecb7895bb
11 changed files with 496 additions and 60 deletions
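For orientation, the reschedule bookkeeping added here revolves around two structures. The following is a rough sketch reconstructed from the field names used in the hunks below (Attempts, Interval, RescheduleTime, PrevAllocID, PrevNodeID); it is not the verbatim nomad/structs definition and may omit fields.

// Sketch only: shapes inferred from this diff, not the actual nomad/structs code.
package sketch

import "time"

// ReschedulePolicy caps how often a failed allocation may be replaced.
type ReschedulePolicy struct {
    Attempts int           // maximum reschedule attempts allowed...
    Interval time.Duration // ...within this sliding window
}

// RescheduleTracker records one reschedule attempt. A replacement allocation
// carries the trackers of all of its predecessors plus one new entry
// (alloc.PreviousAllocation and alloc.RescheduleTrackers in the hunks below).
type RescheduleTracker struct {
    RescheduleTime int64  // UnixNano timestamp of the attempt
    PrevAllocID    string // the allocation that failed and was replaced
    PrevNodeID     string // the node recorded for that attempt
}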

View File

@@ -91,6 +91,10 @@ func Job() *structs.Job {
Delay: 1 * time.Minute,
Mode: structs.RestartPolicyModeDelay,
},
ReschedulePolicy: &structs.ReschedulePolicy{
Attempts: 2,
Interval: 10 * time.Minute,
},
Tasks: []*structs.Task{
{
Name: "web",

View File

@@ -114,7 +114,7 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
case structs.EvalTriggerJobRegister, structs.EvalTriggerNodeUpdate,
structs.EvalTriggerJobDeregister, structs.EvalTriggerRollingUpdate,
structs.EvalTriggerPeriodicJob, structs.EvalTriggerMaxPlans,
structs.EvalTriggerDeploymentWatcher:
structs.EvalTriggerDeploymentWatcher, structs.EvalTriggerRetryFailedAlloc:
default:
desc := fmt.Sprintf("scheduler cannot handle '%s' evaluation reason",
eval.TriggeredBy)
@@ -356,9 +356,6 @@ func (s *GenericScheduler) computeJobAllocs() error {
// nodes to lost
updateNonTerminalAllocsToLost(s.plan, tainted, allocs)
// Filter out the allocations in a terminal state
allocs = s.filterCompleteAllocs(allocs)
reconciler := NewAllocReconciler(s.ctx.Logger(),
genericAllocUpdateFn(s.ctx, s.stack, s.eval.ID),
s.batch, s.eval.JobID, s.job, s.deployment, allocs, tainted)
@@ -471,17 +468,30 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// stop the allocation before trying to find a replacement because this
// frees the resources currently used by the previous allocation.
stopPrevAlloc, stopPrevAllocDesc := missing.StopPreviousAlloc()
prevAllocation := missing.PreviousAllocation()
if stopPrevAlloc {
s.plan.AppendUpdate(missing.PreviousAllocation(), structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
s.plan.AppendUpdate(prevAllocation, structs.AllocDesiredStatusStop, stopPrevAllocDesc, "")
}
// Setup node weights for replacement allocations
selectOptions := &SelectOptions{}
if prevAllocation != nil {
var penaltyNodes []string
penaltyNodes = append(penaltyNodes, prevAllocation.NodeID)
if prevAllocation.RescheduleTrackers != nil {
for _, reschedTracker := range prevAllocation.RescheduleTrackers {
penaltyNodes = append(penaltyNodes, reschedTracker.PrevNodeID)
}
}
selectOptions.PenaltyNodeIDs = penaltyNodes
}
// Attempt to match the task group
var option *RankedNode
if preferredNode != nil {
option, _ = s.stack.SelectPreferringNodes(tg, []*structs.Node{preferredNode})
} else {
option, _ = s.stack.Select(tg)
selectOptions.PreferredNodes = []*structs.Node{preferredNode}
}
option, _ = s.stack.Select(tg, selectOptions)
// Store the available nodes by datacenter
s.ctx.Metrics().NodesAvailable = byDC
@@ -510,8 +520,16 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// If the new allocation is replacing an older allocation then we
// set the record the older allocation id so that they are chained
if prev := missing.PreviousAllocation(); prev != nil {
if prev := prevAllocation; prev != nil {
alloc.PreviousAllocation = prev.ID
var rescheduleTrackers []*structs.RescheduleTracker
if prev.RescheduleTrackers != nil {
for _, reschedInfo := range prev.RescheduleTrackers {
rescheduleTrackers = append(rescheduleTrackers, reschedInfo.Copy())
}
}
rescheduleTrackers = append(rescheduleTrackers, &structs.RescheduleTracker{RescheduleTime: time.Now().UTC().UnixNano(), PrevAllocID: prev.ID, PrevNodeID: alloc.NodeID})
alloc.RescheduleTrackers = rescheduleTrackers
}
// If we are placing a canary and we found a match, add the canary
@@ -537,7 +555,7 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// If we weren't able to find a replacement for the allocation, back
// out the fact that we asked to stop the allocation.
if stopPrevAlloc {
s.plan.PopUpdate(missing.PreviousAllocation())
s.plan.PopUpdate(prevAllocation)
}
}
}
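The generic scheduler changes above do two related things when replacing a failed allocation: they build a penalty list from the node the allocation failed on plus every node in its reschedule history, and they hand that list to node selection through SelectOptions. A self-contained sketch of the penalty-node collection, using simplified stand-in types rather than the scheduler's structs.Allocation:

// Simplified stand-ins; buildPenaltyNodes mirrors what computePlacements
// feeds into SelectOptions.PenaltyNodeIDs above.
package main

import "fmt"

type rescheduleTracker struct {
    PrevAllocID string
    PrevNodeID  string
}

type alloc struct {
    ID                 string
    NodeID             string
    RescheduleTrackers []*rescheduleTracker
}

// buildPenaltyNodes returns the node the failed allocation ran on plus every
// node recorded in its reschedule history.
func buildPenaltyNodes(prev *alloc) []string {
    if prev == nil {
        return nil
    }
    nodes := []string{prev.NodeID}
    for _, t := range prev.RescheduleTrackers {
        nodes = append(nodes, t.PrevNodeID)
    }
    return nodes
}

func main() {
    failed := &alloc{
        ID:     "alloc-2",
        NodeID: "node-b",
        RescheduleTrackers: []*rescheduleTracker{
            {PrevAllocID: "alloc-1", PrevNodeID: "node-a"},
        },
    }
    fmt.Println(buildPenaltyNodes(failed)) // [node-b node-a]
}

The real code additionally copies the predecessor's trackers onto the new allocation and appends an entry for the current attempt, so the history keeps accumulating across reschedules.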

View File

@@ -2467,6 +2467,16 @@ func TestServiceSched_NodeDrain_Down(t *testing.T) {
var complete []*structs.Allocation
for i := 6; i < 10; i++ {
newAlloc := stop[i].Copy()
newAlloc.TaskStates = make(map[string]*structs.TaskState)
newAlloc.TaskStates["web"] = &structs.TaskState{
State: structs.TaskStateDead,
Events: []*structs.TaskEvent{
{
Type: structs.TaskTerminated,
ExitCode: 0,
},
},
}
newAlloc.ClientStatus = structs.AllocClientStatusComplete
complete = append(complete, newAlloc)
}
@@ -2705,6 +2715,220 @@ func TestServiceSched_RetryLimit(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusFailed)
}
func TestServiceSched_Reschedule_Once(t *testing.T) {
h := NewHarness(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
// Generate a fake job with allocations and a reschedule policy.
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: 1,
Interval: 15 * time.Minute,
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
failedAllocID := allocs[1].ID
successAllocID := allocs[0].ID
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
if err != nil {
t.Fatalf("err: %v", err)
}
// Ensure a plan was created
if len(h.Plans) == 0 {
t.Fatalf("bad: %#v", h.Plans)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
// Verify that one new allocation got created with its reschedule tracker info
assert := assert.New(t)
assert.Equal(3, len(out))
var newAlloc *structs.Allocation
for _, alloc := range out {
if alloc.ID != successAllocID && alloc.ID != failedAllocID {
newAlloc = alloc
}
}
assert.Equal(failedAllocID, newAlloc.PreviousAllocation)
assert.Equal(1, len(newAlloc.RescheduleTrackers))
assert.Equal(failedAllocID, newAlloc.RescheduleTrackers[0].PrevAllocID)
// Mark this alloc as failed again, should not get rescheduled
newAlloc.ClientStatus = structs.AllocClientStatusFailed
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{newAlloc}))
// Create another mock evaluation
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err = h.Process(NewServiceScheduler, eval)
assert.Nil(err)
// Verify no new allocs were created this time
out, err = h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
assert.Equal(3, len(out))
}
func TestServiceSched_Reschedule_Multiple(t *testing.T) {
h := NewHarness(t)
// Create some nodes
var nodes []*structs.Node
for i := 0; i < 10; i++ {
node := mock.Node()
nodes = append(nodes, node)
noErr(t, h.State.UpsertNode(h.NextIndex(), node))
}
maxRescheduleAttempts := 3
// Generate a fake job with allocations and a reschedule policy.
job := mock.Job()
job.TaskGroups[0].Count = 2
job.TaskGroups[0].ReschedulePolicy = &structs.ReschedulePolicy{
Attempts: maxRescheduleAttempts,
Interval: 30 * time.Minute,
}
noErr(t, h.State.UpsertJob(h.NextIndex(), job))
var allocs []*structs.Allocation
for i := 0; i < 2; i++ {
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.Job = job
alloc.JobID = job.ID
alloc.NodeID = nodes[i].ID
alloc.Name = fmt.Sprintf("my-job.web[%d]", i)
allocs = append(allocs, alloc)
}
// Mark one of the allocations as failed
allocs[1].ClientStatus = structs.AllocClientStatusFailed
noErr(t, h.State.UpsertAllocs(h.NextIndex(), allocs))
// Create a mock evaluation
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
expectedNumAllocs := 3
expectedNumReschedTrackers := 1
assert := assert.New(t)
for i := 0; i < maxRescheduleAttempts; i++ {
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
noErr(t, err)
// Ensure a plan was created
if len(h.Plans) == 0 {
t.Fatalf("bad: %#v", h.Plans)
}
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
// Verify that a new allocation got created with its reschedule tracker info
assert.Equal(expectedNumAllocs, len(out))
// Find the new alloc with ClientStatusPending
var pendingAllocs []*structs.Allocation
fmt.Println("Iteration: ", i)
for _, alloc := range out {
if alloc.ClientStatus == structs.AllocClientStatusPending {
pendingAllocs = append(pendingAllocs, alloc)
}
}
assert.Equal(1, len(pendingAllocs))
newAlloc := pendingAllocs[0]
assert.Equal(expectedNumReschedTrackers, len(newAlloc.RescheduleTrackers))
// Mark this alloc as failed again
newAlloc.ClientStatus = structs.AllocClientStatusFailed
noErr(t, h.State.UpsertAllocs(h.NextIndex(), []*structs.Allocation{newAlloc}))
// Create another mock evaluation
eval = &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: 50,
TriggeredBy: structs.EvalTriggerNodeUpdate,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
expectedNumAllocs += 1
expectedNumReschedTrackers += 1
}
// Process last eval again, should not reschedule
err := h.Process(NewServiceScheduler, eval)
assert.Nil(err)
// Verify no new allocs were created because reschedule attempts were exhausted
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
noErr(t, err)
assert.Equal(5, len(out)) // 2 original, plus 3 reschedule attempts
}
func TestBatchSched_Run_CompleteAlloc(t *testing.T) {
h := NewHarness(t)

View File

@@ -304,3 +304,52 @@ func (iter *JobAntiAffinityIterator) Next() *RankedNode {
func (iter *JobAntiAffinityIterator) Reset() {
iter.source.Reset()
}
// NodeAntiAffinityIterator is used to apply a penalty to
// a node that had a previous failed allocation for the same job.
// This is used when attempting to reschedule a failed alloc
type NodeAntiAffinityIterator struct {
ctx Context
source RankIterator
penalty float64
penaltyNodes map[string]struct{}
}
// NewNodeAntiAffinityIterator is used to create a NodeAntiAffinityIterator that
// applies the given penalty for placement onto nodes in penaltyNodes
func NewNodeAntiAffinityIterator(ctx Context, source RankIterator, penalty float64) *NodeAntiAffinityIterator {
iter := &NodeAntiAffinityIterator{
ctx: ctx,
source: source,
penalty: penalty,
}
return iter
}
func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(nodes []string) {
penaltyNodes := make(map[string]struct{})
for _, node := range nodes {
penaltyNodes[node] = struct{}{}
}
iter.penaltyNodes = penaltyNodes
}
func (iter *NodeAntiAffinityIterator) Next() *RankedNode {
for {
option := iter.source.Next()
if option == nil {
return nil
}
_, ok := iter.penaltyNodes[option.Node.ID]
if ok {
option.Score -= iter.penalty
iter.ctx.Metrics().ScoreNode(option.Node, "node-anti-affinity", iter.penalty)
}
return option
}
}
func (iter *NodeAntiAffinityIterator) Reset() {
iter.source.Reset()
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
)
func TestFeasibleRankIterator(t *testing.T) {
@@ -429,3 +430,36 @@ func collectRanked(iter RankIterator) (out []*RankedNode) {
}
return
}
func TestNodeAntiAffinity_PenaltyNodes(t *testing.T) {
_, ctx := testContext(t)
node1 := &structs.Node{
ID: uuid.Generate(),
}
node2 := &structs.Node{
ID: uuid.Generate(),
}
nodes := []*RankedNode{
{
Node: node1,
},
{
Node: node2,
},
}
static := NewStaticRankIterator(ctx, nodes)
nodeAntiAffIter := NewNodeAntiAffinityIterator(ctx, static, 50.0)
nodeAntiAffIter.SetPenaltyNodes([]string{node1.ID})
out := collectRanked(nodeAntiAffIter)
assert := assert.New(t)
assert.Equal(2, len(out))
assert.Equal(node1.ID, out[0].Node.ID)
assert.Equal(-50.0, out[0].Score)
assert.Equal(node2.ID, out[1].Node.ID)
assert.Equal(0.0, out[1].Score)
}

View File

@@ -305,9 +305,11 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// Determine what set of allocations are on tainted nodes
untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
untainted, reschedule := untainted.filterByRescheduleable(a.batch, tg.ReschedulePolicy)
// Create a structure for choosing names. Seed with the taken names which is
// the union of untainted and migrating nodes (includes canaries)
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate))
nameIndex := newAllocNameIndex(a.jobID, group, tg.Count, untainted.union(migrate, reschedule))
// Stop any unneeded allocations and update the untainted set to not
// included stopped allocations.
@@ -364,7 +366,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
// * The deployment is not paused or failed
// * Not placing any canaries
// * If there are any canaries that they have been promoted
place := a.computePlacements(tg, nameIndex, untainted, migrate)
place := a.computePlacements(tg, nameIndex, untainted, migrate, reschedule)
if !existingDeployment {
dstate.DesiredTotal += len(place)
}
@@ -610,20 +612,34 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
// computePlacements returns the set of allocations to place given the group
// definition and the sets of untainted, migrating and rescheduling allocations for the group.
func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
nameIndex *allocNameIndex, untainted, migrate allocSet) []allocPlaceResult {
nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {
// Hot path the nothing to do case
existing := len(untainted) + len(migrate)
if existing >= group.Count {
return nil
}
var place []allocPlaceResult
for _, name := range nameIndex.Next(uint(group.Count - existing)) {
// add rescheduled alloc placement results
for _, alloc := range reschedule {
place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
name: alloc.Name,
taskGroup: group,
previousAlloc: alloc,
})
existing += 1
if existing == group.Count {
break
}
}
// Add placements for any remaining required count using fresh names from the name index
if existing < group.Count {
for _, name := range nameIndex.Next(uint(group.Count - existing)) {
place = append(place, allocPlaceResult{
name: name,
taskGroup: group,
})
}
}
return place
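The reworked computePlacements above places rescheduled allocations first, reusing their existing names and recording the failed allocation as previousAlloc, and only then fills any remaining shortfall with fresh names. A minimal standalone sketch of that ordering with simplified types (not the reconciler's allocPlaceResult or allocSet):

// Simplified stand-ins for the placement ordering in the hunk above.
package main

import "fmt"

type allocStub struct{ ID, Name string }

type placeResult struct {
    Name     string
    Previous *allocStub // set only for rescheduled placements
}

func placements(count, untainted, migrate int, reschedule []*allocStub, freshNames []string) []placeResult {
    existing := untainted + migrate
    if existing >= count {
        return nil // nothing to place
    }
    var place []placeResult
    // Rescheduled allocations go first, keeping their names and pointing at
    // the failed allocation they replace.
    for _, a := range reschedule {
        place = append(place, placeResult{Name: a.Name, Previous: a})
        existing++
        if existing == count {
            break
        }
    }
    // Any remaining shortfall is filled with fresh names.
    for i := 0; existing+i < count && i < len(freshNames); i++ {
        place = append(place, placeResult{Name: freshNames[i]})
    }
    return place
}

func main() {
    failed := &allocStub{ID: "a-old", Name: "web[1]"}
    out := placements(3, 1, 0, []*allocStub{failed}, []string{"web[2]"})
    fmt.Printf("%d placements, first reuses %s\n", len(out), out[0].Name) // 2 placements, first reuses web[1]
}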
@@ -700,6 +716,9 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
removeNames := nameIndex.Highest(uint(remove))
for id, alloc := range untainted {
if _, ok := removeNames[alloc.Name]; ok {
if alloc.TerminalStatus() {
continue
}
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,
@@ -717,6 +736,9 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *alloc
// It is possible that we didn't stop as many as we should have if there
// were allocations with duplicate names.
for id, alloc := range untainted {
if alloc.TerminalStatus() {
continue
}
stop[id] = alloc
a.result.stop = append(a.result.stop, allocStopResult{
alloc: alloc,

View File

@@ -206,16 +206,72 @@ func (a allocSet) filterByTainted(nodes map[string]*structs.Node) (untainted, mi
untainted[alloc.ID] = alloc
continue
}
if n == nil || n.TerminalStatus() {
lost[alloc.ID] = alloc
if !alloc.TerminalStatus() {
if n == nil || n.TerminalStatus() {
lost[alloc.ID] = alloc
} else {
migrate[alloc.ID] = alloc
}
} else {
migrate[alloc.ID] = alloc
untainted[alloc.ID] = alloc
}
}
return
}
// filterByRescheduleable filters the allocation set into a set of allocations that should be kept
// (untainted, whether running or terminal) and a set of allocations that must be rescheduled
func (a allocSet) filterByRescheduleable(isBatch bool, reschedulePolicy *structs.ReschedulePolicy) (untainted, reschedule allocSet) {
untainted = make(map[string]*structs.Allocation)
reschedule = make(map[string]*structs.Allocation)
rescheduledPrevAllocs := make(map[string]struct{}) // Track previous allocs referenced by any reschedule trackers
for _, alloc := range a {
if isBatch {
// Allocs from batch jobs should be filtered when the desired status
// is terminal and the client did not finish or when the client
// status is failed so that they will be replaced. If they are
// complete but not failed, they shouldn't be replaced.
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
if alloc.RanSuccessfully() {
untainted[alloc.ID] = alloc
}
continue
default:
}
if alloc.ShouldReschedule(reschedulePolicy) {
reschedule[alloc.ID] = alloc
} else {
untainted[alloc.ID] = alloc
}
} else {
// ignore allocs whose desired state is stop/evict
// everything else is either rescheduleable or untainted
if alloc.ShouldReschedule(reschedulePolicy) {
reschedule[alloc.ID] = alloc
} else if alloc.DesiredStatus != structs.AllocDesiredStatusStop && alloc.DesiredStatus != structs.AllocDesiredStatusEvict {
untainted[alloc.ID] = alloc
}
}
}
// Find allocs that already appear in the reschedule trackers of other allocs
for _, alloc := range reschedule {
if alloc.RescheduleTrackers != nil {
for _, reschedTrack := range alloc.RescheduleTrackers {
rescheduledPrevAllocs[reschedTrack.PrevAllocID] = struct{}{}
}
}
}
// Remove these from the rescheduleable set since they have already been replaced
for allocID := range rescheduledPrevAllocs {
delete(reschedule, allocID)
}
return
}
// filterByDeployment filters allocations into two sets, those that match the
// given deployment ID and those that don't
func (a allocSet) filterByDeployment(id string) (match, nonmatch allocSet) {
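filterByRescheduleable leans on alloc.ShouldReschedule(reschedulePolicy), whose implementation is not part of this diff. As a hypothetical illustration only, the check it implies is roughly: the allocation's client status is failed and it has used fewer than Attempts reschedules within the policy's Interval window. A standalone sketch under that assumption (names and logic are illustrative, not the actual structs.Allocation method):

// Hypothetical sketch of the implied check; not the real ShouldReschedule.
package main

import (
    "fmt"
    "time"
)

type reschedulePolicy struct {
    Attempts int
    Interval time.Duration
}

type rescheduleTracker struct{ RescheduleTime int64 }

type allocation struct {
    ClientStatus       string
    RescheduleTrackers []*rescheduleTracker
}

func shouldReschedule(a *allocation, p *reschedulePolicy, now time.Time) bool {
    if a.ClientStatus != "failed" || p == nil || p.Attempts == 0 {
        return false
    }
    // Count reschedule attempts that fall inside the policy window.
    windowStart := now.Add(-p.Interval).UnixNano()
    attempts := 0
    for _, t := range a.RescheduleTrackers {
        if t.RescheduleTime >= windowStart {
            attempts++
        }
    }
    return attempts < p.Attempts
}

func main() {
    alloc := &allocation{ClientStatus: "failed"}
    policy := &reschedulePolicy{Attempts: 1, Interval: 15 * time.Minute}
    fmt.Println(shouldReschedule(alloc, policy, time.Now())) // true: no attempts used yet
}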

View File

@@ -16,6 +16,9 @@ const (
// batchJobAntiAffinityPenalty is the same as the
// serviceJobAntiAffinityPenalty but for batch type jobs.
batchJobAntiAffinityPenalty = 10.0
// previousFailedAllocNodePenalty is a scoring penalty applied to nodes where a failed allocation previously ran
previousFailedAllocNodePenalty = 50.0
)
// Stack is a chained collection of iterators. The stack is used to
@@ -29,7 +32,12 @@ type Stack interface {
SetJob(job *structs.Job)
// Select is used to select a node for the task group
Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources)
Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources)
}
type SelectOptions struct {
PenaltyNodeIDs []string
PreferredNodes []*structs.Node
}
// GenericStack is the Stack used for the Generic scheduler. It is
@@ -49,6 +57,7 @@ type GenericStack struct {
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
jobAntiAff *JobAntiAffinityIterator
nodeAntiAff *NodeAntiAffinityIterator
limit *LimitIterator
maxScore *MaxScoreIterator
}
@@ -111,8 +120,10 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
}
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "")
s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.jobAntiAff, 2)
s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
@@ -154,7 +165,22 @@ func (s *GenericStack) SetJob(job *structs.Job) {
}
}
func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) {
// If the options specify preferred nodes, try selecting from only those nodes first,
// restoring the original node set before returning or falling back to it.
if options != nil && len(options.PreferredNodes) > 0 {
originalNodes := s.source.nodes
s.source.SetNodes(options.PreferredNodes)
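// Clear the preferred nodes so the recursive Select calls below skip this block and evaluate the restricted, then the original, node set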
options.PreferredNodes = nil
if option, resources := s.Select(tg, options); option != nil {
s.source.SetNodes(originalNodes)
return option, resources
}
s.source.SetNodes(originalNodes)
return s.Select(tg, options)
}
// Reset the max selector and context
s.maxScore.Reset()
s.ctx.Reset()
@@ -170,6 +196,11 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.binPack.SetTaskGroup(tg)
if options != nil {
s.nodeAntiAff.SetPenaltyNodes(options.PenaltyNodeIDs)
} else {
s.nodeAntiAff.SetPenaltyNodes(nil)
}
if contextual, ok := s.quota.(ContextualIterator); ok {
contextual.SetTaskGroup(tg)
@@ -190,19 +221,6 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
return option, tgConstr.size
}
// SelectPreferredNode returns a node where an allocation of the task group can
// be placed, the node passed to it is preferred over the other available nodes
func (s *GenericStack) SelectPreferringNodes(tg *structs.TaskGroup, nodes []*structs.Node) (*RankedNode, *structs.Resources) {
originalNodes := s.source.nodes
s.source.SetNodes(nodes)
if option, resources := s.Select(tg); option != nil {
s.source.SetNodes(originalNodes)
return option, resources
}
s.source.SetNodes(originalNodes)
return s.Select(tg)
}
// SystemStack is the Stack used for the System scheduler. It is designed to
// attempt to make placements on all nodes.
type SystemStack struct {
@@ -276,7 +294,7 @@ func (s *SystemStack) SetJob(job *structs.Job) {
}
}
func (s *SystemStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) {
// Reset the binpack selector and context
s.binPack.Reset()
s.ctx.Reset()
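The preferred-node handling in GenericStack.Select above restricts the node source to the preferred nodes, retries, and falls back to the full node set when none of them fit. A self-contained sketch of that fallback shape, with a hypothetical feasibility callback standing in for the full iterator chain:

// Simplified stand-in for the preferred-node fallback; selectFrom is a
// hypothetical helper, not part of the Nomad stack API.
package main

import "fmt"

func selectFrom(nodes []string, feasible func(string) bool) (string, bool) {
    for _, n := range nodes {
        if feasible(n) {
            return n, true
        }
    }
    return "", false
}

func selectWithPreference(all, preferred []string, feasible func(string) bool) (string, bool) {
    // Try the preferred nodes first, then fall back to the full node set.
    if len(preferred) > 0 {
        if node, ok := selectFrom(preferred, feasible); ok {
            return node, true
        }
    }
    return selectFrom(all, feasible)
}

func main() {
    all := []string{"n1", "n2", "n3"}
    preferred := []string{"n9"} // infeasible, so we fall back to the full set
    node, ok := selectWithPreference(all, preferred, func(n string) bool { return n != "n9" })
    fmt.Println(node, ok) // n1 true
}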

View File

@@ -47,8 +47,9 @@ func benchmarkServiceStack_MetaKeyConstraint(b *testing.B, key string, numNodes,
stack.SetJob(job)
b.ResetTimer()
selectOptions := &SelectOptions{}
for i := 0; i < b.N; i++ {
stack.Select(job.TaskGroups[0])
stack.Select(job.TaskGroups[0], selectOptions)
}
}
@@ -104,7 +105,8 @@ func TestServiceStack_Select_Size(t *testing.T) {
job := mock.Job()
stack.SetJob(job)
node, size := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, size := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -138,7 +140,8 @@ func TestServiceStack_Select_PreferringNodes(t *testing.T) {
// Create a preferred node
preferredNode := mock.Node()
option, _ := stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode})
selectOptions := &SelectOptions{PreferredNodes: []*structs.Node{preferredNode}}
option, _ := stack.Select(job.TaskGroups[0], selectOptions)
if option == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -151,7 +154,8 @@ func TestServiceStack_Select_PreferringNodes(t *testing.T) {
preferredNode1 := preferredNode.Copy()
preferredNode1.Attributes["kernel.name"] = "windows"
preferredNode1.ComputeClass()
option, _ = stack.SelectPreferringNodes(job.TaskGroups[0], []*structs.Node{preferredNode1})
selectOptions = &SelectOptions{PreferredNodes: []*structs.Node{preferredNode1}}
option, _ = stack.Select(job.TaskGroups[0], selectOptions)
if option == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -174,7 +178,8 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) {
job := mock.Job()
stack.SetJob(job)
n1, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
n1, _ := stack.Select(job.TaskGroups[0], selectOptions)
m1 := ctx.Metrics()
if n1 == nil {
t.Fatalf("missing node %#v", m1)
@@ -184,7 +189,7 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) {
t.Fatalf("should only be 2")
}
n2, _ := stack.Select(job.TaskGroups[0])
n2, _ := stack.Select(job.TaskGroups[0], selectOptions)
m2 := ctx.Metrics()
if n2 == nil {
t.Fatalf("missing node %#v", m2)
@@ -215,7 +220,8 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) {
job.TaskGroups[0].Tasks[0].Driver = "foo"
stack.SetJob(job)
node, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -243,8 +249,8 @@ func TestServiceStack_Select_ConstraintFilter(t *testing.T) {
job := mock.Job()
job.Constraints[0].RTarget = "freebsd"
stack.SetJob(job)
node, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -280,8 +286,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
job := mock.Job()
stack.SetJob(job)
node, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -347,7 +353,8 @@ func TestSystemStack_Select_Size(t *testing.T) {
job := mock.Job()
stack.SetJob(job)
node, size := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, size := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -381,7 +388,8 @@ func TestSystemStack_Select_MetricsReset(t *testing.T) {
job := mock.Job()
stack.SetJob(job)
n1, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
n1, _ := stack.Select(job.TaskGroups[0], selectOptions)
m1 := ctx.Metrics()
if n1 == nil {
t.Fatalf("missing node %#v", m1)
@@ -391,7 +399,7 @@ func TestSystemStack_Select_MetricsReset(t *testing.T) {
t.Fatalf("should only be 1")
}
n2, _ := stack.Select(job.TaskGroups[0])
n2, _ := stack.Select(job.TaskGroups[0], selectOptions)
m2 := ctx.Metrics()
if n2 == nil {
t.Fatalf("missing node %#v", m2)
@@ -418,7 +426,8 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) {
job.TaskGroups[0].Tasks[0].Driver = "foo"
stack.SetJob(job)
node, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -435,7 +444,7 @@ func TestSystemStack_Select_DriverFilter(t *testing.T) {
stack = NewSystemStack(ctx)
stack.SetNodes(nodes)
stack.SetJob(job)
node, _ = stack.Select(job.TaskGroups[0])
node, _ = stack.Select(job.TaskGroups[0], selectOptions)
if node != nil {
t.Fatalf("node not filtered %#v", node)
}
@@ -460,7 +469,8 @@ func TestSystemStack_Select_ConstraintFilter(t *testing.T) {
job.Constraints[0].RTarget = "freebsd"
stack.SetJob(job)
node, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}
@@ -497,7 +507,8 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) {
job := mock.Job()
stack.SetJob(job)
node, _ := stack.Select(job.TaskGroups[0])
selectOptions := &SelectOptions{}
node, _ := stack.Select(job.TaskGroups[0], selectOptions)
if node == nil {
t.Fatalf("missing node %#v", ctx.Metrics())
}

View File

@@ -275,7 +275,7 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
s.stack.SetNodes(nodes)
// Attempt to match the task group
option, _ := s.stack.Select(missing.TaskGroup)
option, _ := s.stack.Select(missing.TaskGroup, nil)
if option == nil {
// If nodes were filtered because of constraint mismatches and we

View File

@@ -511,7 +511,7 @@ func inplaceUpdate(ctx Context, eval *structs.Evaluation, job *structs.Job,
allocInPlace, "")
// Attempt to match the task group
option, _ := stack.Select(update.TaskGroup)
option, _ := stack.Select(update.TaskGroup, nil) // This select only looks at one node so we don't pass any node weight options
// Pop the allocation
ctx.Plan().PopUpdate(update.Alloc)
@@ -722,7 +722,7 @@ func updateNonTerminalAllocsToLost(plan *structs.Plan, tainted map[string]*struc
// genericAllocUpdateFn is a factory for the scheduler to create an allocUpdateType
// function to be passed into the reconciler. The factory takes objects that
// exist only in the scheduler context and returns a function that can be used
// by the reconciler to make decsions about how to update an allocation. The
// by the reconciler to make decisions about how to update an allocation. The
// factory allows the reconciler to be unaware of how to determine the type of
// update necessary and can minimize the set of objects it is exposed to.
func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateType {
@@ -767,7 +767,7 @@ func genericAllocUpdateFn(ctx Context, stack Stack, evalID string) allocUpdateTy
ctx.Plan().AppendUpdate(existing, structs.AllocDesiredStatusStop, allocInPlace, "")
// Attempt to match the task group
option, _ := stack.Select(newTG)
option, _ := stack.Select(newTG, nil) // This select only looks at one node so we don't pass any node weight options
// Pop the allocation
ctx.Plan().PopUpdate(existing)