Mirror of https://github.com/kemko/nomad.git, synced 2026-01-08 11:25:41 +03:00
scheduler: adding minor specialization for batch
@@ -9,9 +9,13 @@ import (
 )
 
 const (
-	// maxScheduleAttempts is used to limit the number of times
-	// we will attempt to schedule if we continue to hit conflicts.
-	maxScheduleAttempts = 5
+	// maxServiceScheduleAttempts is used to limit the number of times
+	// we will attempt to schedule if we continue to hit conflicts for services.
+	maxServiceScheduleAttempts = 5
+
+	// maxBatchScheduleAttempts is used to limit the number of times
+	// we will attempt to schedule if we continue to hit conflicts for batch.
+	maxBatchScheduleAttempts = 2
 )
 
 // GenericScheduler is used for 'service' and 'batch' type jobs. This scheduler is
@@ -66,7 +70,11 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error {
 	s.eval = eval
 
-	// Retry up to the maxScheduleAttempts
-	return retryMax(maxScheduleAttempts, s.process)
+	// Retry up to the schedule attempt limit for the job type
+	limit := maxServiceScheduleAttempts
+	if s.batch {
+		limit = maxBatchScheduleAttempts
+	}
+	return retryMax(limit, s.process)
 }
 
 // process is wrapped in retryMax to iteratively run the handler until we have no
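Note: retryMax itself is not touched by this commit and does not appear in the diff. As a rough sketch of the helper the code above relies on, assuming it simply re-invokes the callback until the callback reports that it is done or the attempt limit is exhausted (the actual signature in the scheduler package may differ):

package scheduler

import "fmt"

// retryMax invokes cb up to max times, stopping early once cb reports
// that no further work remains. This is a hypothetical sketch, not the
// implementation from the repository.
func retryMax(max int, cb func() (bool, error)) error {
	for attempts := 0; attempts < max; attempts++ {
		done, err := cb()
		if err != nil {
			return err
		}
		if done {
			return nil
		}
	}
	return fmt.Errorf("maximum attempts reached (%d)", max)
}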
@@ -184,7 +192,7 @@ func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTupl
 	}
 
 	// Construct the placement stack
-	stack := NewServiceStack(ctx, nodes)
+	stack := NewGenericStack(s.batch, ctx, nodes)
 
 	for _, missing := range place {
 		option, size := stack.Select(missing.TaskGroup)
@@ -18,9 +18,10 @@ type Stack interface {
 	Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources)
 }
 
-// ServiceStack is the Stack used for the Service scheduler. It is
+// GenericStack is the Stack used for the Generic scheduler. It is
 // designed to make better placement decisions at the cost of performance.
-type ServiceStack struct {
+type GenericStack struct {
+	batch            bool
 	ctx              Context
 	jobConstraint    *ConstraintIterator
 	taskGroupDrivers *DriverIterator
@@ -29,11 +30,12 @@ type ServiceStack struct {
 	maxScore         *MaxScoreIterator
 }
 
-// NewServiceStack constructs a stack used for selecting service placements
-func NewServiceStack(ctx Context, baseNodes []*structs.Node) *ServiceStack {
+// NewGenericStack constructs a stack used for selecting service and batch placements
+func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *GenericStack {
 	// Create a new stack
-	stack := &ServiceStack{
-		ctx: ctx,
+	stack := &GenericStack{
+		batch: batch,
+		ctx:   ctx,
 	}
 
 	// Create the source iterator. We randomize the order we visit nodes
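With the batch flag stored on the stack at construction time, callers choose the placement mode once up front. An illustrative fragment (not from the commit; assumes a scheduler Context and a node slice are already in scope):

// Service mode: expensive, higher-fidelity placement.
serviceStack := NewGenericStack(false, ctx, nodes)

// Batch mode: cheaper placement, as configured in the hunk below.
batchStack := NewGenericStack(true, ctx, nodes)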
@@ -53,14 +55,19 @@ func NewServiceStack(ctx Context, baseNodes []*structs.Node) *ServiceStack {
 	// Upgrade from feasible to rank iterator
 	rankSource := NewFeasibleRankIterator(ctx, stack.taskGroupConstraint)
 
-	// Apply the bin packing, this depends on the resources needed by a particular task group.
-	stack.binPack = NewBinPackIterator(ctx, rankSource, nil, true, 0)
+	// Apply the bin packing, this depends on the resources needed
+	// by a particular task group. Only enable eviction for the service
+	// scheduler as that logic is expensive.
+	evict := !batch
+	stack.binPack = NewBinPackIterator(ctx, rankSource, nil, evict, 0)
 
 	// Apply a limit function. This is to avoid scanning *every* possible node.
-	// Instead we need to visit "enough". Using a log of the total number of
-	// nodes is a good restriction, with at least 2 as the floor
+	// For batch jobs we only need to evaluate 2 options and depend on the
+	// power of two choices. For service jobs we need to visit "enough".
+	// Using a log of the total number of nodes is a good restriction, with
+	// at least 2 as the floor.
 	limit := 2
-	if n := len(baseNodes); n > 0 {
+	if n := len(baseNodes); !batch && n > 0 {
 		logLimit := int(math.Ceil(math.Log2(float64(n))))
 		if logLimit > limit {
 			limit = logLimit
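The limit logic reads: batch placements sample a fixed 2 candidate nodes, leaning on the power-of-two-choices result (two random samples get close to balanced load at a fraction of the scanning cost), while service placements scan ceil(log2(n)) candidates with a floor of 2. A self-contained sketch of the computation (function and variable names here are illustrative, not from the commit):

package main

import (
	"fmt"
	"math"
)

// selectLimit mirrors the candidate-limit computation above: batch jobs
// always evaluate 2 nodes; service jobs evaluate ceil(log2(n)) nodes,
// with 2 as the floor.
func selectLimit(batch bool, numNodes int) int {
	limit := 2
	if !batch && numNodes > 0 {
		if logLimit := int(math.Ceil(math.Log2(float64(numNodes)))); logLimit > limit {
			limit = logLimit
		}
	}
	return limit
}

func main() {
	fmt.Println(selectLimit(true, 1000))  // 2: batch always uses two choices
	fmt.Println(selectLimit(false, 1000)) // 10: ceil(log2(1000))
	fmt.Println(selectLimit(false, 3))    // 2: the floor applies
}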
@@ -73,12 +80,12 @@ func NewServiceStack(ctx Context, baseNodes []*structs.Node) *ServiceStack {
 	return stack
 }
 
-func (s *ServiceStack) SetJob(job *structs.Job) {
+func (s *GenericStack) SetJob(job *structs.Job) {
 	s.jobConstraint.SetConstraints(job.Constraints)
 	s.binPack.SetPriority(job.Priority)
 }
 
-func (s *ServiceStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
+func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) {
 	// Reset the max selector and context
 	s.maxScore.Reset()
 	s.ctx.Reset()
@@ -10,7 +10,7 @@ import (
 
 func TestServiceStack_SetJob(t *testing.T) {
 	_, ctx := testContext(t)
-	stack := NewServiceStack(ctx, nil)
+	stack := NewGenericStack(false, ctx, nil)
 
 	job := mock.Job()
 	stack.SetJob(job)
@@ -28,7 +28,7 @@ func TestServiceStack_Select_Size(t *testing.T) {
 	nodes := []*structs.Node{
 		mock.Node(),
 	}
-	stack := NewServiceStack(ctx, nodes)
+	stack := NewGenericStack(false, ctx, nodes)
 
 	job := mock.Job()
 	stack.SetJob(job)
@@ -58,7 +58,7 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) {
 		mock.Node(),
 		mock.Node(),
 	}
-	stack := NewServiceStack(ctx, nodes)
+	stack := NewGenericStack(false, ctx, nodes)
 
 	job := mock.Job()
 	stack.SetJob(job)
@@ -93,7 +93,7 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) {
 	zero := nodes[0]
 	zero.Attributes["driver.foo"] = "1"
 
-	stack := NewServiceStack(ctx, nodes)
+	stack := NewGenericStack(false, ctx, nodes)
 
 	job := mock.Job()
 	job.TaskGroups[0].Tasks[0].Driver = "foo"
@@ -118,7 +118,7 @@ func TestServiceStack_Select_ConstraintFilter(t *testing.T) {
 	zero := nodes[0]
 	zero.Attributes["os"] = "freebsd"
 
-	stack := NewServiceStack(ctx, nodes)
+	stack := NewGenericStack(false, ctx, nodes)
 
 	job := mock.Job()
 	job.Constraints[0].RTarget = "freebsd"
@@ -155,7 +155,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
 	one := nodes[1]
 	one.Reserved = one.Resources
 
-	stack := NewServiceStack(ctx, nodes)
+	stack := NewGenericStack(false, ctx, nodes)
 
 	job := mock.Job()
 	stack.SetJob(job)