mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
nomad: Allow resize of EvaluatePool
This commit is contained in:
@@ -19,9 +19,10 @@ const (
|
||||
// if a plan is valid. It can be used to parallelize the evaluation
|
||||
// of a plan.
|
||||
type EvaluatePool struct {
|
||||
workers int
|
||||
req chan evaluateRequest
|
||||
res chan evaluateResult
|
||||
workers int
|
||||
workerStop []chan struct{}
|
||||
req chan evaluateRequest
|
||||
res chan evaluateResult
|
||||
}
|
||||
|
||||
type evaluateRequest struct {
|
||||
@@ -39,16 +40,41 @@ type evaluateResult struct {
|
||||
// NewEvaluatePool returns a pool of the given size.
|
||||
func NewEvaluatePool(workers, bufSize int) *EvaluatePool {
|
||||
p := &EvaluatePool{
|
||||
workers: workers,
|
||||
req: make(chan evaluateRequest, bufSize),
|
||||
res: make(chan evaluateResult, bufSize),
|
||||
workers: workers,
|
||||
workerStop: make([]chan struct{}, workers),
|
||||
req: make(chan evaluateRequest, bufSize),
|
||||
res: make(chan evaluateResult, bufSize),
|
||||
}
|
||||
for i := 0; i < workers; i++ {
|
||||
go p.run()
|
||||
stopCh := make(chan struct{})
|
||||
p.workerStop[i] = stopCh
|
||||
go p.run(stopCh)
|
||||
}
|
||||
return p
|
||||
}
|
||||
|
||||
// SetSize is used to resize the worker pool
|
||||
func (p *EvaluatePool) SetSize(size int) {
|
||||
// Handle an upwards resize
|
||||
if size >= p.workers {
|
||||
for i := p.workers; i < size; i++ {
|
||||
stopCh := make(chan struct{})
|
||||
p.workerStop = append(p.workerStop, stopCh)
|
||||
go p.run(stopCh)
|
||||
}
|
||||
p.workers = size
|
||||
return
|
||||
}
|
||||
|
||||
// Handle a downwards resize
|
||||
for i := p.workers; i > size; i-- {
|
||||
close(p.workerStop[i-1])
|
||||
p.workerStop[i-1] = nil
|
||||
}
|
||||
p.workerStop = p.workerStop[:size]
|
||||
p.workers = size
|
||||
}
|
||||
|
||||
// RequestCh is used to push requests
|
||||
func (p *EvaluatePool) RequestCh() chan<- evaluateRequest {
|
||||
return p.req
|
||||
@@ -61,13 +87,19 @@ func (p *EvaluatePool) ResultCh() <-chan evaluateResult {
|
||||
|
||||
// Shutdown is used to shutdown the pool
|
||||
func (p *EvaluatePool) Shutdown() {
|
||||
close(p.req)
|
||||
p.SetSize(0)
|
||||
}
|
||||
|
||||
// run is a long running go routine per worker
|
||||
func (p *EvaluatePool) run() {
|
||||
for req := range p.req {
|
||||
fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID)
|
||||
p.res <- evaluateResult{req.nodeID, fit, err}
|
||||
func (p *EvaluatePool) run(stopCh chan struct{}) {
|
||||
for {
|
||||
select {
|
||||
case req := <-p.req:
|
||||
fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID)
|
||||
p.res <- evaluateResult{req.nodeID, fit, err}
|
||||
|
||||
case <-stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -38,3 +38,14 @@ func TestEvaluatePool(t *testing.T) {
|
||||
t.Fatalf("bad")
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvaluatePool_Resize(t *testing.T) {
|
||||
pool := NewEvaluatePool(1, 4)
|
||||
defer pool.Shutdown()
|
||||
|
||||
// Scale up
|
||||
pool.SetSize(4)
|
||||
|
||||
// Scale down
|
||||
pool.SetSize(2)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user