From 8f32e65227f87db05a0bb0f235b5e11b4f93be72 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 20 Feb 2016 13:23:34 -0800 Subject: [PATCH] nomad: Allow resize of EvaluatePool --- nomad/plan_apply_pool.go | 56 +++++++++++++++++++++++++++-------- nomad/plan_apply_pool_test.go | 11 +++++++ 2 files changed, 55 insertions(+), 12 deletions(-) diff --git a/nomad/plan_apply_pool.go b/nomad/plan_apply_pool.go index b5c3d6ab1..a0d69676a 100644 --- a/nomad/plan_apply_pool.go +++ b/nomad/plan_apply_pool.go @@ -19,9 +19,10 @@ const ( // if a plan is valid. It can be used to parallelize the evaluation // of a plan. type EvaluatePool struct { - workers int - req chan evaluateRequest - res chan evaluateResult + workers int + workerStop []chan struct{} + req chan evaluateRequest + res chan evaluateResult } type evaluateRequest struct { @@ -39,16 +40,41 @@ type evaluateResult struct { // NewEvaluatePool returns a pool of the given size. func NewEvaluatePool(workers, bufSize int) *EvaluatePool { p := &EvaluatePool{ - workers: workers, - req: make(chan evaluateRequest, bufSize), - res: make(chan evaluateResult, bufSize), + workers: workers, + workerStop: make([]chan struct{}, workers), + req: make(chan evaluateRequest, bufSize), + res: make(chan evaluateResult, bufSize), } for i := 0; i < workers; i++ { - go p.run() + stopCh := make(chan struct{}) + p.workerStop[i] = stopCh + go p.run(stopCh) } return p } +// SetSize is used to resize the worker pool +func (p *EvaluatePool) SetSize(size int) { + // Handle an upwards resize + if size >= p.workers { + for i := p.workers; i < size; i++ { + stopCh := make(chan struct{}) + p.workerStop = append(p.workerStop, stopCh) + go p.run(stopCh) + } + p.workers = size + return + } + + // Handle a downwards resize + for i := p.workers; i > size; i-- { + close(p.workerStop[i-1]) + p.workerStop[i-1] = nil + } + p.workerStop = p.workerStop[:size] + p.workers = size +} + // RequestCh is used to push requests func (p *EvaluatePool) RequestCh() chan<- evaluateRequest { return p.req @@ -61,13 +87,19 @@ func (p *EvaluatePool) ResultCh() <-chan evaluateResult { // Shutdown is used to shutdown the pool func (p *EvaluatePool) Shutdown() { - close(p.req) + p.SetSize(0) } // run is a long running go routine per worker -func (p *EvaluatePool) run() { - for req := range p.req { - fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID) - p.res <- evaluateResult{req.nodeID, fit, err} +func (p *EvaluatePool) run(stopCh chan struct{}) { + for { + select { + case req := <-p.req: + fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID) + p.res <- evaluateResult{req.nodeID, fit, err} + + case <-stopCh: + return + } } } diff --git a/nomad/plan_apply_pool_test.go b/nomad/plan_apply_pool_test.go index 924d98e31..b592d96c3 100644 --- a/nomad/plan_apply_pool_test.go +++ b/nomad/plan_apply_pool_test.go @@ -38,3 +38,14 @@ func TestEvaluatePool(t *testing.T) { t.Fatalf("bad") } } + +func TestEvaluatePool_Resize(t *testing.T) { + pool := NewEvaluatePool(1, 4) + defer pool.Shutdown() + + // Scale up + pool.SetSize(4) + + // Scale down + pool.SetSize(2) +}