Merge pull request #8065 from hashicorp/f-oss-preemption

Open source Preemption
This commit is contained in:
Mahmood Ali
2020-05-27 16:56:51 -04:00
committed by GitHub
9 changed files with 377 additions and 116 deletions

View File

@@ -1,3 +1,8 @@
## 0.12.0 (Unreleased)
FEATURES:
* **Preemption**: Preemption for batch and service jobs is now an open source feature (system job preemption was already open source)
## 0.11.3 (Unreleased)
IMPROVEMENTS:

View File

@@ -642,3 +642,49 @@ func (s *GenericScheduler) findPreferredNode(place placementResult) (*structs.No
}
return nil, nil
}
// selectNextOption asks the stack for a placement node for tg. If the first
// pass yields nothing and preemption is permitted for this job type, the
// selection is retried once with selectOptions.Preempt set to true.
//
// Note that selectOptions is mutated when the retry happens.
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
	// First pass: select without asking the stack to consider preemption.
	candidate := s.stack.Select(tg, selectOptions)

	// Only the config value is used; the other two results of the lookup
	// are discarded.
	_, cfg, _ := s.ctx.State().SchedulerConfig()

	// With no scheduler config available, preemption is treated as enabled.
	preemptionAllowed := true
	switch {
	case cfg == nil:
		// Keep the default.
	case s.job.Type == structs.JobTypeBatch:
		preemptionAllowed = cfg.PreemptionConfig.BatchSchedulerEnabled
	default:
		preemptionAllowed = cfg.PreemptionConfig.ServiceSchedulerEnabled
	}

	// Nothing more to do if we already have a node or may not preempt.
	if candidate != nil || !preemptionAllowed {
		return candidate
	}

	// Second pass: run the stack again with preemption enabled.
	selectOptions.Preempt = true
	return s.stack.Select(tg, selectOptions)
}
// handlePreemptions records the allocations that must be preempted to make
// room for alloc on the selected node.
//
// For every allocation in option.PreemptedAllocs it:
//   - appends the allocation to the plan's preempted set, attributed to alloc.ID;
//   - when plan annotation is requested, adds a stub of the allocation to the
//     plan annotations and bumps the Preemptions counter of the task group
//     that missing belongs to.
//
// Finally the IDs of all preempted allocations are stored on
// alloc.PreemptedAllocations. The function returns early, leaving alloc
// untouched, when option.PreemptedAllocs is nil.
func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
	if option.PreemptedAllocs == nil {
		return
	}

	// If this placement involves preemption, mark each victim in the plan
	// and remember its ID so it can be recorded on the new allocation.
	var preemptedAllocIDs []string
	for _, stop := range option.PreemptedAllocs {
		s.plan.AppendPreemptedAlloc(stop, alloc.ID)
		preemptedAllocIDs = append(preemptedAllocIDs, stop.ID)

		if s.eval.AnnotatePlan && s.plan.Annotations != nil {
			s.plan.Annotations.PreemptedAllocs = append(s.plan.Annotations.PreemptedAllocs, stop.Stub())
			if s.plan.Annotations.DesiredTGUpdates != nil {
				// The task group may have no entry in the annotations map;
				// skip the counter rather than dereferencing a nil entry.
				if desired := s.plan.Annotations.DesiredTGUpdates[missing.TaskGroup().Name]; desired != nil {
					desired.Preemptions++
				}
			}
		}
	}

	alloc.PreemptedAllocations = preemptedAllocIDs
}

View File

@@ -1,15 +0,0 @@
// +build !ent
package scheduler
import "github.com/hashicorp/nomad/nomad/structs"
// selectNextOption calls the stack to get a node for placement.
// This variant performs a single Select call and returns its result as-is;
// it never sets selectOptions.Preempt or retries the selection.
func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions *SelectOptions) *RankedNode {
return s.stack.Select(tg, selectOptions)
}
// handlePreemptions sets relevant preemption related fields. In OSS this is a no-op:
// the body is intentionally empty, so option, alloc and missing are left untouched.
func (s *GenericScheduler) handlePreemptions(option *RankedNode, alloc *structs.Allocation, missing placementResult) {
}

View File

@@ -5183,3 +5183,161 @@ func Test_updateRescheduleTracker(t *testing.T) {
}
}
// TestServiceSched_Preemption registers two lower-priority service jobs
// (priority 30 and 50) on a single node, then registers a priority-100 job
// whose resource ask (900 CPU / 1700 MB) cannot be met on the 1000 CPU /
// 2048 MB node while either of the earlier jobs (500/1024 and 350/512) is
// still counted. It asserts that the new job is placed and that its allocation
// lists exactly the two earlier allocations in PreemptedAllocations.
//
// NOTE(review): the test never writes a scheduler config, so it presumably
// relies on preemption being enabled when no config is present — confirm.
func TestServiceSched_Preemption(t *testing.T) {
require := require.New(t)
h := NewHarness(t)
// Create a node
node := mock.Node()
// Clear the mock's Resources and ReservedResources so that the capacity
// used in this test is defined solely by the values set below.
node.Resources = nil
node.ReservedResources = nil
node.NodeResources = &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 1000,
},
Memory: structs.NodeMemoryResources{
MemoryMB: 2048,
},
Disk: structs.NodeDiskResources{
DiskMB: 100 * 1024,
},
Networks: []*structs.NetworkResource{
{
Device: "eth0",
CIDR: "192.168.0.100/32",
MBits: 1000,
},
},
}
// Reserve 50 CPU shares, 256 MB memory, 4 GiB disk and host port 22.
node.ReservedResources = &structs.NodeReservedResources{
Cpu: structs.NodeReservedCpuResources{
CpuShares: 50,
},
Memory: structs.NodeReservedMemoryResources{
MemoryMB: 256,
},
Disk: structs.NodeReservedDiskResources{
DiskMB: 4 * 1024,
},
Networks: structs.NodeReservedNetworkResources{
ReservedHostPorts: "22",
},
}
require.NoError(h.State.UpsertNode(h.NextIndex(), node))
// Create a couple of jobs and schedule them
// job1: priority 30, one allocation asking for 500 CPU / 1024 MB, no networks.
job1 := mock.Job()
job1.TaskGroups[0].Count = 1
job1.Priority = 30
r1 := job1.TaskGroups[0].Tasks[0].Resources
r1.CPU = 500
r1.MemoryMB = 1024
r1.Networks = nil
require.NoError(h.State.UpsertJob(h.NextIndex(), job1))
// job2: priority 50, one allocation asking for 350 CPU / 512 MB, no networks.
job2 := mock.Job()
job2.TaskGroups[0].Count = 1
job2.Priority = 50
r2 := job2.TaskGroups[0].Tasks[0].Resources
r2.CPU = 350
r2.MemoryMB = 512
r2.Networks = nil
require.NoError(h.State.UpsertJob(h.NextIndex(), job2))
// Create a mock evaluation to register the jobs
eval1 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job1.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job1.ID,
Status: structs.EvalStatusPending,
}
eval2 := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job2.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job2.ID,
Status: structs.EvalStatusPending,
}
require.NoError(h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval1, eval2}))
// Set of allocation IDs placed for job1 and job2; these are the allocations
// the high-priority job is later expected to preempt.
expectedPreemptedAllocs := make(map[string]struct{})
// Process the two evals for job1 and job2 and make sure they allocated
for index, eval := range []*structs.Evaluation{eval1, eval2} {
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.Nil(err)
// assumes each Process call appends exactly one plan to h.Plans, so the
// loop index lines up with the plan index — TODO confirm
plan := h.Plans[index]
// Ensure the plan doesn't have annotations.
require.Nil(plan.Annotations)
// Ensure the eval has no spawned blocked eval
require.Equal(0, len(h.CreateEvals))
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Equal(1, len(planned))
expectedPreemptedAllocs[planned[0].ID] = struct{}{}
}
// Create a higher priority job
// job3 asks for 900 CPU / 1700 MB, more than is left on the node while
// job1 or job2 is still running there.
job3 := mock.Job()
job3.Priority = 100
job3.TaskGroups[0].Count = 1
r3 := job3.TaskGroups[0].Tasks[0].Resources
r3.CPU = 900
r3.MemoryMB = 1700
r3.Networks = nil
require.NoError(h.State.UpsertJob(h.NextIndex(), job3))
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job3.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job3.ID,
Status: structs.EvalStatusPending,
}
require.NoError(h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
err := h.Process(NewServiceScheduler, eval)
require.Nil(err)
// New plan should be the third one in the harness
plan := h.Plans[2]
// Ensure the eval has no spawned blocked eval
require.Equal(0, len(h.CreateEvals))
// Ensure the plan allocated
var planned []*structs.Allocation
for _, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
}
require.Equal(1, len(planned))
// Lookup the allocations by JobID
ws := memdb.NewWatchSet()
out, err := h.State.AllocsByJob(ws, job3.Namespace, job3.ID, false)
require.NoError(err)
// Ensure all allocations placed
require.Equal(1, len(out))
// Compare as sets so the order of PreemptedAllocations does not matter.
actualPreemptedAllocs := make(map[string]struct{})
for _, id := range out[0].PreemptedAllocations {
actualPreemptedAllocs[id] = struct{}{}
}
require.Equal(expectedPreemptedAllocs, actualPreemptedAllocs)
}

View File

@@ -705,3 +705,76 @@ func (iter *ScoreNormalizationIterator) Next() *RankedNode {
iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
return option
}
// PreemptionScoringIterator is used to score nodes according to the
// combination of preemptible allocations in them. It wraps another
// RankIterator and adds a "preemption" score to each option that carries
// a set of preempted allocations.
type PreemptionScoringIterator struct {
// ctx provides the metrics sink the per-node score is recorded to
ctx Context
// source is the upstream iterator whose options are scored
source RankIterator
}
// NewPreemptionScoringIterator returns a RankIterator that wraps source and
// scores each option based on the net aggregate priority of the allocations
// that would be preempted on that node. ctx is retained for recording metrics.
func NewPreemptionScoringIterator(ctx Context, source RankIterator) RankIterator {
	scorer := new(PreemptionScoringIterator)
	scorer.ctx = ctx
	scorer.source = source
	return scorer
}
// Reset resets the wrapped source iterator; this iterator holds no state of
// its own beyond ctx and source.
func (iter *PreemptionScoringIterator) Reset() {
iter.source.Reset()
}
// Next pulls the next option from the source iterator. When the option has a
// non-nil PreemptedAllocs, a preemption score derived from the net priority of
// those allocations is appended to option.Scores and recorded in the metrics
// under the "preemption" label. Options without preempted allocations, and a
// nil option, are returned unchanged.
func (iter *PreemptionScoringIterator) Next() *RankedNode {
	option := iter.source.Next()
	if option != nil && option.PreemptedAllocs != nil {
		// The score is inversely related to the net priority of the victims.
		score := preemptionScore(netPriority(option.PreemptedAllocs))
		option.Scores = append(option.Scores, score)
		iter.ctx.Metrics().ScoreNode(option.Node, "preemption", score)
	}
	return option
}
// netPriority is a scoring heuristic that combines two factors: the maximum
// job priority found in allocs, plus a penalty equal to the sum of all job
// priorities divided by that maximum.
//
// The penalty grows with the number of allocations, so a node with a low
// maximum priority but many preemptible allocations is still penalized.
//
// It returns 0 when allocs is empty or no allocation has a positive job
// priority; without this guard the division below would be by zero and yield
// NaN (or an infinity), which would corrupt the node's score.
func netPriority(allocs []*structs.Allocation) float64 {
	sumPriority := 0
	maxPriority := 0
	for _, alloc := range allocs {
		if alloc.Job.Priority > maxPriority {
			maxPriority = alloc.Job.Priority
		}
		sumPriority += alloc.Job.Priority
	}

	// Nothing to divide by: avoid producing NaN/Inf.
	if maxPriority == 0 {
		return 0
	}

	// Maximum priority across all allocations, plus a penalty proportional to
	// the ratio of the priority sum to that maximum.
	return float64(maxPriority) + float64(sumPriority)/float64(maxPriority)
}
// preemptionScore converts a net priority into a score between 0 and 1 using
// a decreasing logistic function; see
// https://www.desmos.com/calculator/alaeiuaiey for a plot of the curve.
//
// Low net priorities score close to 1, a net priority of 2048 scores exactly
// 0.5, and the score approaches 0 asymptotically as the net priority grows.
// The result is kept within 0..1 because it is combined with other scoring
// factors such as bin packing.
func preemptionScore(netPriority float64) float64 {
	const (
		// decayRate is the decay parameter of the logistic function.
		decayRate = 0.0048
		// midpoint is the inflection point of the curve, i.e. the net
		// priority that maps to a score of 0.5.
		midpoint = 2048.0
	)

	exponent := decayRate * (netPriority - midpoint)
	return 1.0 / (1 + math.Exp(exponent))
}

View File

@@ -296,3 +296,92 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran
s.ctx.Metrics().AllocationTime = time.Since(start)
return option
}
// NewGenericStack constructs a stack used for selecting service placements.
//
// It wires up a chain of iterators in a fixed order: random node source ->
// quota -> wrapped feasibility checks -> distinct hosts -> distinct property
// -> rank upgrade -> bin packing -> job anti-affinity -> rescheduling penalty
// -> node affinity -> spread -> preemption scoring -> score normalization ->
// limit -> max score. The batch flag and ctx are stored on the returned stack.
func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}
// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)
// Create the quota iterator to determine if placements would result in the
// quota attached to the namespace of the job to go over.
s.quota = NewQuotaIterator(ctx, s.source)
// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Filter on task group host volumes
s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
// NOTE(review): the index and error results of SchedulerConfig are discarded
// and schedConfig is used without a nil check — presumably
// EffectiveSchedulerAlgorithm tolerates a nil receiver; confirm.
_, schedConfig, _ := ctx.State().SchedulerConfig()
s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig.EffectiveSchedulerAlgorithm())
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
// Apply node rescheduling penalty. This tries to avoid placing on a
// node where the allocation failed previously
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
// Apply scores based on affinity stanza
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
// Apply scores based on spread stanza
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
// Add the preemption options scoring iterator
// Unlike the other iterators it is kept only in a local, not on the stack.
preemptionScorer := NewPreemptionScoringIterator(ctx, s.spread)
// Normalizes scores by averaging them across various scorers
s.scoreNorm = NewScoreNormalizationIterator(ctx, preemptionScorer)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}

View File

@@ -1,91 +0,0 @@
// +build !ent
package scheduler
// NewGenericStack constructs a stack used for selecting service placements.
//
// It wires up a chain of iterators in a fixed order: random node source ->
// quota -> wrapped feasibility checks -> distinct hosts -> distinct property
// -> rank upgrade -> bin packing -> job anti-affinity -> rescheduling penalty
// -> node affinity -> spread -> score normalization -> limit -> max score.
// In this variant the spread iterator feeds score normalization directly;
// there is no preemption scoring step. The batch flag and ctx are stored on
// the returned stack.
func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Create a new stack
s := &GenericStack{
batch: batch,
ctx: ctx,
}
// Create the source iterator. We randomize the order we visit nodes
// to reduce collisions between schedulers and to do a basic load
// balancing across eligible nodes.
s.source = NewRandomIterator(ctx, nil)
// Create the quota iterator to determine if placements would result in the
// quota attached to the namespace of the job to go over.
s.quota = NewQuotaIterator(ctx, s.source)
// Attach the job constraints. The job is filled in later.
s.jobConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group drivers first as they are faster
s.taskGroupDrivers = NewDriverChecker(ctx, nil)
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintChecker(ctx, nil)
// Filter on task group devices
s.taskGroupDevices = NewDeviceChecker(ctx)
// Filter on task group host volumes
s.taskGroupHostVolumes = NewHostVolumeChecker(ctx)
// Filter on available, healthy CSI plugins
s.taskGroupCSIVolumes = NewCSIVolumeChecker(ctx)
// Create the feasibility wrapper which wraps all feasibility checks in
// which feasibility checking can be skipped if the computed node class has
// previously been marked as eligible or ineligible. Generally this will be
// checks that only needs to examine the single node to determine feasibility.
jobs := []FeasibilityChecker{s.jobConstraint}
tgs := []FeasibilityChecker{s.taskGroupDrivers,
s.taskGroupConstraint,
s.taskGroupHostVolumes,
s.taskGroupDevices}
avail := []FeasibilityChecker{s.taskGroupCSIVolumes}
s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs, avail)
// Filter on distinct host constraints.
s.distinctHostsConstraint = NewDistinctHostsIterator(ctx, s.wrappedChecks)
// Filter on distinct property constraints.
s.distinctPropertyConstraint = NewDistinctPropertyIterator(ctx, s.distinctHostsConstraint)
// Upgrade from feasible to rank iterator
rankSource := NewFeasibleRankIterator(ctx, s.distinctPropertyConstraint)
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
// NOTE(review): the index and error results of SchedulerConfig are discarded
// and schedConfig is used without a nil check — presumably
// EffectiveSchedulerAlgorithm tolerates a nil receiver; confirm.
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
schedulerAlgorithm := schedConfig.EffectiveSchedulerAlgorithm()
s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedulerAlgorithm)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
// Apply node rescheduling penalty. This tries to avoid placing on a
// node where the allocation failed previously
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
// Apply scores based on affinity stanza
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
// Apply scores based on spread stanza
s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
// Normalizes scores by averaging them across various scorers
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
return s
}

View File

@@ -386,10 +386,10 @@ $ curl \
- `SystemSchedulerEnabled` `(bool: true)` - Specifies whether preemption for system jobs is enabled. Note that
this defaults to true.
- `BatchSchedulerEnabled` `(bool: false)` (Enterprise Only) - Specifies whether preemption for batch jobs is enabled. Note that
- `BatchSchedulerEnabled` `(bool: false)` - Specifies whether preemption for batch jobs is enabled. Note that
this defaults to false and must be explicitly enabled.
- `ServiceSchedulerEnabled` `(bool: false)` (Enterprise Only) - Specifies whether preemption for service jobs is enabled. Note that
- `ServiceSchedulerEnabled` `(bool: false)` - Specifies whether preemption for service jobs is enabled. Note that
this defaults to false and must be explicitly enabled.
- `CreateIndex` - The Raft index at which the config was created.
@@ -447,11 +447,11 @@ server state is authoritative.
system jobs is enabled. Note that if this is set to true, then system jobs
can preempt any other jobs.
- `BatchSchedulerEnabled` `(bool: false)` (Enterprise Only) - Specifies
- `BatchSchedulerEnabled` `(bool: false)` - Specifies
whether preemption for batch jobs is enabled. Note that if this is set to
true, then batch jobs can preempt any other jobs.
- `ServiceSchedulerEnabled` `(bool: false)` (Enterprise Only) - Specifies
- `ServiceSchedulerEnabled` `(bool: false)` - Specifies
whether preemption for service jobs is enabled. Note that if this is set to
true, then service jobs can preempt any other jobs.
@@ -470,4 +470,4 @@ server state is authoritative.
- `Index` - Current Raft index when the request was received.
[`default_scheduler_config`]: /docs/configuration/server#default_scheduler_config
[`default_scheduler_config`]: /docs/configuration/server#default_scheduler_config

View File

@@ -28,11 +28,7 @@ into the plan queue.
# Details
~> **Enterprise Functionality** System job preemption is available as an Open Source
feature, while Batch and Service job preemption are only available as
Enterprise features.
Preemption is enabled by default in Nomad 0.9. Operators can use the [scheduler config](/api-docs/operator#update-scheduler-configuration) API endpoint to disable preemption.
Preemption is enabled by default in Nomad 0.9 for system jobs. Operators can use the [scheduler config](/api-docs/operator#update-scheduler-configuration) API endpoint to disable preemption.
Nomad uses the [job priority](/docs/job-specification/job#priority) field to determine what running allocations can be preempted.
In order to prevent a cascade of preemptions due to jobs close in priority being preempted, only allocations from jobs with a priority