From 09bfb1760f87d0f1384463fd8a69ca9bb895e801 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 20 Feb 2016 13:07:09 -0800 Subject: [PATCH] nomad: adding a worker pool for plan apply --- nomad/plan_apply_pool.go | 73 +++++++++++++++++++++++++++++++++++ nomad/plan_apply_pool_test.go | 40 +++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 nomad/plan_apply_pool.go create mode 100644 nomad/plan_apply_pool_test.go diff --git a/nomad/plan_apply_pool.go b/nomad/plan_apply_pool.go new file mode 100644 index 000000000..b5c3d6ab1 --- /dev/null +++ b/nomad/plan_apply_pool.go @@ -0,0 +1,73 @@ +package nomad + +import ( + "github.com/hashicorp/nomad/nomad/state" + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // workerPoolSize is the size of the worker pool + workerPoolSize = 2 + + // workerPoolBufferSize is the size of the buffers used to push + // request to the workers and to collect the responses. It should + // be large enough just to keep things busy + workerPoolBufferSize = 64 +) + +// EvaluatePool is used to have a pool of workers that are evaluating +// if a plan is valid. It can be used to parallelize the evaluation +// of a plan. +type EvaluatePool struct { + workers int + req chan evaluateRequest + res chan evaluateResult +} + +type evaluateRequest struct { + snap *state.StateSnapshot + plan *structs.Plan + nodeID string +} + +type evaluateResult struct { + nodeID string + fit bool + err error +} + +// NewEvaluatePool returns a pool of the given size. +func NewEvaluatePool(workers, bufSize int) *EvaluatePool { + p := &EvaluatePool{ + workers: workers, + req: make(chan evaluateRequest, bufSize), + res: make(chan evaluateResult, bufSize), + } + for i := 0; i < workers; i++ { + go p.run() + } + return p +} + +// RequestCh is used to push requests +func (p *EvaluatePool) RequestCh() chan<- evaluateRequest { + return p.req +} + +// ResultCh is used to read the results as they are ready +func (p *EvaluatePool) ResultCh() <-chan evaluateResult { + return p.res +} + +// Shutdown is used to shutdown the pool +func (p *EvaluatePool) Shutdown() { + close(p.req) +} + +// run is a long running go routine per worker +func (p *EvaluatePool) run() { + for req := range p.req { + fit, err := evaluateNodePlan(req.snap, req.plan, req.nodeID) + p.res <- evaluateResult{req.nodeID, fit, err} + } +} diff --git a/nomad/plan_apply_pool_test.go b/nomad/plan_apply_pool_test.go new file mode 100644 index 000000000..924d98e31 --- /dev/null +++ b/nomad/plan_apply_pool_test.go @@ -0,0 +1,40 @@ +package nomad + +import ( + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestEvaluatePool(t *testing.T) { + state := testStateStore(t) + node := mock.Node() + state.UpsertNode(1000, node) + snap, _ := state.Snapshot() + + alloc := mock.Alloc() + plan := &structs.Plan{ + NodeAllocation: map[string][]*structs.Allocation{ + node.ID: []*structs.Allocation{alloc}, + }, + } + + pool := NewEvaluatePool(1, 4) + defer pool.Shutdown() + + // Push a request + req := pool.RequestCh() + req <- evaluateRequest{snap, plan, node.ID} + + // Get the response + res := <-pool.ResultCh() + + // Verify response + if res.err != nil { + t.Fatalf("err: %v", res.err) + } + if !res.fit { + t.Fatalf("bad") + } +}