Add helper methods, use require and other code review feedback
@@ -434,24 +434,8 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult
 	}
 
-	// Compute penalty nodes for rescheduled allocs
-	selectOptions := &SelectOptions{}
-	if prevAllocation != nil {
-		var penaltyNodes []string
-		penaltyNodes = append(penaltyNodes, prevAllocation.NodeID)
-		if prevAllocation.RescheduleTracker != nil {
-			for _, reschedEvent := range prevAllocation.RescheduleTracker.Events {
-				penaltyNodes = append(penaltyNodes, reschedEvent.PrevNodeID)
-			}
-		}
-		selectOptions.PenaltyNodeIDs = penaltyNodes
-	}
-
 	// Attempt to match the task group
-	var option *RankedNode
-	if preferredNode != nil {
-		selectOptions.PreferredNodes = []*structs.Node{preferredNode}
-	}
-	option, _ = s.stack.Select(tg, selectOptions)
+	selectOptions := getSelectOptions(prevAllocation, preferredNode)
+	option, _ := s.stack.Select(tg, selectOptions)
 
 	// Store the available nodes by datacenter
 	s.ctx.Metrics().NodesAvailable = byDC
@@ -480,16 +464,11 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult
 
 		// If the new allocation is replacing an older allocation then we
 		// set the record the older allocation id so that they are chained
-		if prev := prevAllocation; prev != nil {
-			alloc.PreviousAllocation = prev.ID
-			var rescheduleEvents []*structs.RescheduleEvent
-			if prev.RescheduleTracker != nil {
-				for _, reschedEvent := range prev.RescheduleTracker.Events {
-					rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
-				}
+		if prevAllocation != nil {
+			alloc.PreviousAllocation = prevAllocation.ID
+			if tg.ReschedulePolicy != nil && tg.ReschedulePolicy.Attempts > 0 {
+				updateRescheduleTracker(alloc, prevAllocation)
 			}
-			rescheduleEvents = append(rescheduleEvents, &structs.RescheduleEvent{RescheduleTime: time.Now().UTC().UnixNano(), PrevAllocID: prev.ID, PrevNodeID: alloc.NodeID})
-			alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents}
 		}
 
 		// If we are placing a canary and we found a match, add the canary
@@ -518,12 +497,44 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResult
 					s.plan.PopUpdate(prevAllocation)
 				}
 			}
 
 		}
 	}
 
 	return nil
 }
 
+// getSelectOptions sets up preferred nodes and penalty nodes
+func getSelectOptions(prevAllocation *structs.Allocation, preferredNode *structs.Node) *SelectOptions {
+	selectOptions := &SelectOptions{}
+	if prevAllocation != nil {
+		penaltyNodes := make(map[string]struct{})
+		penaltyNodes[prevAllocation.NodeID] = struct{}{}
+		if prevAllocation.RescheduleTracker != nil {
+			for _, reschedEvent := range prevAllocation.RescheduleTracker.Events {
+				penaltyNodes[reschedEvent.PrevNodeID] = struct{}{}
+			}
+		}
+		selectOptions.PenaltyNodeIDs = penaltyNodes
+	}
+	if preferredNode != nil {
+		selectOptions.PreferredNodes = []*structs.Node{preferredNode}
+	}
+	return selectOptions
+}
+
+// updateRescheduleTracker sets up the previous alloc id and reschedule tracker for the new allocation
+func updateRescheduleTracker(alloc *structs.Allocation, prev *structs.Allocation) {
+	var rescheduleEvents []*structs.RescheduleEvent
+	if prev.RescheduleTracker != nil {
+		for _, reschedEvent := range prev.RescheduleTracker.Events {
+			rescheduleEvents = append(rescheduleEvents, reschedEvent.Copy())
+		}
+	}
+	rescheduleEvents = append(rescheduleEvents, &structs.RescheduleEvent{RescheduleTime: time.Now().UTC().UnixNano(), PrevAllocID: prev.ID, PrevNodeID: alloc.NodeID})
+	alloc.RescheduleTracker = &structs.RescheduleTracker{Events: rescheduleEvents}
+}
+
 // findPreferredNode finds the preferred node for an allocation
 func (s *GenericScheduler) findPreferredNode(place placementResult) (node *structs.Node, err error) {
 	if prev := place.PreviousAllocation(); prev != nil && place.TaskGroup().EphemeralDisk.Sticky == true {
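Note on getSelectOptions: switching PenaltyNodeIDs from a []string to a map[string]struct{} also deduplicates node IDs, since an allocation rescheduled onto the same node more than once was previously appended to the slice repeatedly. A standalone sketch of the difference (the node IDs are hypothetical, not from this commit):

    package main

    import "fmt"

    func main() {
    	// Hypothetical penalty history: "node-a" appears twice because the
    	// alloc landed there on two separate reschedule attempts.
    	history := []string{"node-a", "node-b", "node-a"}

    	// Slice form (old): keeps duplicates, membership needs a linear scan.
    	var asSlice []string
    	asSlice = append(asSlice, history...)

    	// Set form (new): map[string]struct{} deduplicates on insert and
    	// answers membership in O(1) with zero bytes per value.
    	asSet := make(map[string]struct{})
    	for _, id := range history {
    		asSet[id] = struct{}{}
    	}

    	fmt.Println(len(asSlice)) // 3
    	fmt.Println(len(asSet))   // 2
    	_, penalized := asSet["node-a"]
    	fmt.Println(penalized) // true
    }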
@@ -2888,7 +2888,6 @@ func TestServiceSched_Reschedule_Multiple(t *testing.T) {
 
 	// Find the new alloc with ClientStatusPending
 	var pendingAllocs []*structs.Allocation
-	fmt.Println("Iteration: ", i)
 	for _, alloc := range out {
 		if alloc.ClientStatus == structs.AllocClientStatusPending {
 			pendingAllocs = append(pendingAllocs, alloc)
@@ -326,11 +326,7 @@ func NewNodeAntiAffinityIterator(ctx Context, source RankIterator, penalty float64
 	return iter
 }
 
-func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(nodes []string) {
-	penaltyNodes := make(map[string]struct{})
-	for _, node := range nodes {
-		penaltyNodes[node] = struct{}{}
-	}
+func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) {
 	iter.penaltyNodes = penaltyNodes
 }
 
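The body of Next() that consumes penaltyNodes is not part of this hunk; as a rough sketch only (simplified stand-in types and illustrative penalty math, not the actual Nomad iterator), the membership check with the new map type looks like:

    package sketch

    // Illustrative stand-ins for the real RankedNode/iterator types.
    type rankedNode struct {
    	NodeID string
    	Score  float64
    }

    type antiAffinityIterator struct {
    	penalty      float64
    	penaltyNodes map[string]struct{}
    }

    // rank subtracts the configured penalty when the candidate node is in
    // the penalty set; a map[string]struct{} lookup is O(1) and is safe
    // even when penaltyNodes is nil.
    func (it *antiAffinityIterator) rank(n *rankedNode) {
    	if _, ok := it.penaltyNodes[n.NodeID]; ok {
    		n.Score -= it.penalty
    	}
    }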
@@ -351,5 +347,6 @@ func (iter *NodeAntiAffinityIterator) Next() *RankedNode {
 }
 
 func (iter *NodeAntiAffinityIterator) Reset() {
+	iter.penaltyNodes = make(map[string]struct{})
 	iter.source.Reset()
 }
@@ -6,7 +6,7 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/stretchr/testify/assert"
+	require "github.com/stretchr/testify/require"
 )
 
 func TestFeasibleRankIterator(t *testing.T) {
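The assert-to-require switch (per the commit title) changes failure behavior: testify's require calls t.FailNow on the first failed check and aborts the test, while assert records the failure and keeps going, which can cascade into panics on later lines that assumed the check passed. A minimal illustration of the two packages' semantics:

    package example

    import (
    	"testing"

    	"github.com/stretchr/testify/assert"
    	"github.com/stretchr/testify/require"
    )

    func TestFailureSemantics(t *testing.T) {
    	out := []int{1, 2}

    	// assert: records a failure and keeps executing, so out[0] below
    	// would still run (and could panic) even if this check failed.
    	assert.Equal(t, 2, len(out))

    	// require: calls t.FailNow on failure, so statements that depend
    	// on the checked condition are never reached.
    	require.Equal(t, 2, len(out))
    	require.Equal(t, 1, out[0])
    }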
@@ -451,15 +451,16 @@ func TestNodeAntiAffinity_PenaltyNodes(t *testing.T) {
 	static := NewStaticRankIterator(ctx, nodes)
 
 	nodeAntiAffIter := NewNodeAntiAffinityIterator(ctx, static, 50.0)
-	nodeAntiAffIter.SetPenaltyNodes([]string{node1.ID})
+	nodeAntiAffIter.SetPenaltyNodes(map[string]struct{}{node1.ID: {}})
 
 	out := collectRanked(nodeAntiAffIter)
-	assert := assert.New(t)
-	assert.Equal(2, len(out))
-	assert.Equal(node1.ID, out[0].Node.ID)
-	assert.Equal(-50.0, out[0].Score)
-
-	assert.Equal(node2.ID, out[1].Node.ID)
-	assert.Equal(0.0, out[1].Score)
+	require := require.New(t)
+	require.Equal(2, len(out))
+	require.Equal(node1.ID, out[0].Node.ID)
+	require.Equal(-50.0, out[0].Score)
+
+	require.Equal(node2.ID, out[1].Node.ID)
+	require.Equal(0.0, out[1].Score)
+
 }
@@ -305,6 +305,7 @@ func (a *allocReconciler) computeGroup(group string, all allocSet) bool {
 	// Determine what set of allocations are on tainted nodes
 	untainted, migrate, lost := all.filterByTainted(a.taintedNodes)
 
+	// Determine what set of terminal allocations need to be rescheduled
 	untainted, reschedule := untainted.filterByRescheduleable(a.batch, tg.ReschedulePolicy)
 
 	// Create a structure for choosing names. Seed with the taken names which is
@@ -610,7 +611,7 @@ func (a *allocReconciler) computeLimit(group *structs.TaskGroup, untainted, dest
 }
 
 // computePlacement returns the set of allocations to place given the group
-// definition, the set of untainted and migrating allocations for the group.
+// definition, the set of untainted, migrating and reschedule allocations for the group.
 func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 	nameIndex *allocNameIndex, untainted, migrate allocSet, reschedule allocSet) []allocPlaceResult {
 
@@ -621,6 +622,7 @@ func (a *allocReconciler) computePlacements(group *structs.TaskGroup,
 	}
 	var place []allocPlaceResult
 	// Add rescheduled placement results
+	// Any allocations being rescheduled will remain at DesiredStatusRun ClientStatusFailed
 	for _, alloc := range reschedule {
 		place = append(place, allocPlaceResult{
 			name: alloc.Name,
@@ -668,6 +670,14 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
 		return stop
 	}
 
+	// Filter out any terminal allocations from the untainted set
+	// This is so that we don't try to mark them as stopped redundantly
+	for id, alloc := range untainted {
+		if alloc.TerminalStatus() {
+			delete(untainted, id)
+		}
+	}
+
 	// Prefer stopping any alloc that has the same name as the canaries if we
 	// are promoted
 	if !canaryState && len(canaries) != 0 {
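Filtering terminal allocations out of untainted up front is what lets the two TerminalStatus checks in the hunks below be deleted. Deleting from a map while ranging over it is well defined in Go (removed entries simply aren't visited later); a small standalone sketch of the same pattern with stubbed types:

    package main

    import "fmt"

    type alloc struct{ terminal bool }

    func main() {
    	untainted := map[string]*alloc{
    		"alloc-1": {terminal: true},
    		"alloc-2": {terminal: false},
    	}

    	// The Go spec permits delete() on the map being iterated; entries
    	// removed here are guaranteed not to be produced by the range.
    	for id, a := range untainted {
    		if a.terminal {
    			delete(untainted, id)
    		}
    	}

    	fmt.Println(len(untainted)) // 1
    }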
@@ -716,9 +726,6 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
 	removeNames := nameIndex.Highest(uint(remove))
 	for id, alloc := range untainted {
 		if _, ok := removeNames[alloc.Name]; ok {
-			if alloc.TerminalStatus() {
-				continue
-			}
 			stop[id] = alloc
 			a.result.stop = append(a.result.stop, allocStopResult{
 				alloc: alloc,
@@ -736,9 +743,6 @@ func (a *allocReconciler) computeStop(group *structs.TaskGroup, nameIndex *allocNameIndex,
 	// It is possible that we didn't stop as many as we should have if there
 	// were allocations with duplicate names.
 	for id, alloc := range untainted {
-		if alloc.TerminalStatus() {
-			continue
-		}
 		stop[id] = alloc
 		a.result.stop = append(a.result.stop, allocStopResult{
 			alloc: alloc,
@@ -36,7 +36,7 @@ type Stack interface {
 }
 
 type SelectOptions struct {
-	PenaltyNodeIDs []string
+	PenaltyNodeIDs map[string]struct{}
 	PreferredNodes []*structs.Node
 }
 
@@ -198,8 +198,6 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode
 	s.binPack.SetTaskGroup(tg)
 	if options != nil {
 		s.nodeAntiAff.SetPenaltyNodes(options.PenaltyNodeIDs)
-	} else {
-		s.nodeAntiAff.SetPenaltyNodes(nil)
 	}
 
 	if contextual, ok := s.quota.(ContextualIterator); ok {
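Dropping the else branch that passed nil appears safe for two reasons: Reset() earlier in this commit now reinitializes penaltyNodes to an empty map between placements, and reads from a nil map in Go never panic (only writes do). A quick demonstration of the nil-map read behavior:

    package main

    import "fmt"

    func main() {
    	// Reading from a nil map returns the zero value and ok == false;
    	// only writing to a nil map panics.
    	var penaltyNodes map[string]struct{}
    	_, ok := penaltyNodes["node-a"]
    	fmt.Println(ok) // false
    }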