From 6a39f5b5da16d3d5852e93591129e015d527e5ea Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 13 Aug 2015 22:35:48 -0700 Subject: [PATCH] scheduler: adding minor specialization for batch --- scheduler/generic_sched.go | 18 +++++++++++++----- scheduler/stack.go | 33 ++++++++++++++++++++------------- scheduler/stack_test.go | 12 ++++++------ 3 files changed, 39 insertions(+), 24 deletions(-) diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go index 63aa4aa32..7594e3260 100644 --- a/scheduler/generic_sched.go +++ b/scheduler/generic_sched.go @@ -9,9 +9,13 @@ import ( ) const ( - // maxScheduleAttempts is used to limit the number of times - // we will attempt to schedule if we continue to hit conflicts. - maxScheduleAttempts = 5 + // maxServiceScheduleAttempts is used to limit the number of times + // we will attempt to schedule if we continue to hit conflicts for services. + maxServiceScheduleAttempts = 5 + + // maxBatchScheduleAttempts is used to limit the number of times + // we will attempt to schedule if we continue to hit conflicts for batch. + maxBatchScheduleAttempts = 2 ) // GenericScheduler is used for 'service' and 'batch' type jobs. 
This scheduler is @@ -66,7 +70,11 @@ func (s *GenericScheduler) Process(eval *structs.Evaluation) error { s.eval = eval // Retry up to the maxScheduleAttempts - return retryMax(maxScheduleAttempts, s.process) + limit := maxServiceScheduleAttempts + if s.batch { + limit = maxBatchScheduleAttempts + } + return retryMax(limit, s.process) } // process is wrapped in retryMax to iteratively run the handler until we have no @@ -184,7 +192,7 @@ func (s *GenericScheduler) computePlacements(job *structs.Job, place []allocTupl } // Construct the placement stack - stack := NewServiceStack(ctx, nodes) + stack := NewGenericStack(s.batch, ctx, nodes) for _, missing := range place { option, size := stack.Select(missing.TaskGroup) diff --git a/scheduler/stack.go b/scheduler/stack.go index cd6d5dc6c..0a174a29b 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -18,9 +18,10 @@ type Stack interface { Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) } -// ServiceStack is the Stack used for the Service scheduler. It is +// GenericStack is the Stack used for the Generic scheduler. It is // designed to make better placement decisions at the cost of performance. -type ServiceStack struct { +type GenericStack struct { + batch bool ctx Context jobConstraint *ConstraintIterator taskGroupDrivers *DriverIterator @@ -29,11 +30,12 @@ type ServiceStack struct { maxScore *MaxScoreIterator } -// NewServiceStack constructs a stack used for selecting service placements -func NewServiceStack(ctx Context, baseNodes []*structs.Node) *ServiceStack { +// NewGenericStack constructs a stack used for selecting service placements +func NewGenericStack(batch bool, ctx Context, baseNodes []*structs.Node) *GenericStack { // Create a new stack - stack := &ServiceStack{ - ctx: ctx, + stack := &GenericStack{ + batch: batch, + ctx: ctx, } // Create the source iterator. 
We randomize the order we visit nodes @@ -53,14 +55,19 @@ func NewServiceStack(ctx Context, baseNodes []*structs.Node) *ServiceStack { // Upgrade from feasible to rank iterator rankSource := NewFeasibleRankIterator(ctx, stack.taskGroupConstraint) - // Apply the bin packing, this depends on the resources needed by a particular task group. - stack.binPack = NewBinPackIterator(ctx, rankSource, nil, true, 0) + // Apply the bin packing, this depends on the resources needed + // by a particular task group. Only enable eviction for the service + // scheduler as that logic is expensive. + evict := !batch + stack.binPack = NewBinPackIterator(ctx, rankSource, nil, evict, 0) // Apply a limit function. This is to avoid scanning *every* possible node. - // Instead we need to visit "enough". Using a log of the total number of - // nodes is a good restriction, with at least 2 as the floor + // For batch jobs we only need to evaluate 2 options and depend on the + // power of two choices. For service jobs we need to visit "enough". 
+ // Using a log of the total number of nodes is a good restriction, with + // at least 2 as the floor limit := 2 - if n := len(baseNodes); n > 0 { + if n := len(baseNodes); !batch && n > 0 { logLimit := int(math.Ceil(math.Log2(float64(n)))) if logLimit > limit { limit = logLimit @@ -73,12 +80,12 @@ func NewServiceStack(ctx Context, baseNodes []*structs.Node) *ServiceStack { return stack } -func (s *ServiceStack) SetJob(job *structs.Job) { +func (s *GenericStack) SetJob(job *structs.Job) { s.jobConstraint.SetConstraints(job.Constraints) s.binPack.SetPriority(job.Priority) } -func (s *ServiceStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { +func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Resources) { // Reset the max selector and context s.maxScore.Reset() s.ctx.Reset() diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index 025f55127..e327213e0 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -10,7 +10,7 @@ import ( func TestServiceStack_SetJob(t *testing.T) { _, ctx := testContext(t) - stack := NewServiceStack(ctx, nil) + stack := NewGenericStack(false, ctx, nil) job := mock.Job() stack.SetJob(job) @@ -28,7 +28,7 @@ func TestServiceStack_Select_Size(t *testing.T) { nodes := []*structs.Node{ mock.Node(), } - stack := NewServiceStack(ctx, nodes) + stack := NewGenericStack(false, ctx, nodes) job := mock.Job() stack.SetJob(job) @@ -58,7 +58,7 @@ func TestServiceStack_Select_MetricsReset(t *testing.T) { mock.Node(), mock.Node(), } - stack := NewServiceStack(ctx, nodes) + stack := NewGenericStack(false, ctx, nodes) job := mock.Job() stack.SetJob(job) @@ -93,7 +93,7 @@ func TestServiceStack_Select_DriverFilter(t *testing.T) { zero := nodes[0] zero.Attributes["driver.foo"] = "1" - stack := NewServiceStack(ctx, nodes) + stack := NewGenericStack(false, ctx, nodes) job := mock.Job() job.TaskGroups[0].Tasks[0].Driver = "foo" @@ -118,7 +118,7 @@ func TestServiceStack_Select_ConstraintFilter(t 
*testing.T) { zero := nodes[0] zero.Attributes["os"] = "freebsd" - stack := NewServiceStack(ctx, nodes) + stack := NewGenericStack(false, ctx, nodes) job := mock.Job() job.Constraints[0].RTarget = "freebsd" @@ -155,7 +155,7 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { one := nodes[1] one.Reserved = one.Resources - stack := NewServiceStack(ctx, nodes) + stack := NewGenericStack(false, ctx, nodes) job := mock.Job() stack.SetJob(job)