From c42c4509965ffed0b5bd0d331abfa93903a71ac6 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Mon, 27 Jul 2015 14:59:16 -0700 Subject: [PATCH] nomad: adding plan queue --- nomad/plan_queue.go | 244 +++++++++++++++++++++++++++++++++++++++ nomad/plan_queue_test.go | 181 +++++++++++++++++++++++++++++ nomad/structs/structs.go | 15 +++ 3 files changed, 440 insertions(+) create mode 100644 nomad/plan_queue.go create mode 100644 nomad/plan_queue_test.go diff --git a/nomad/plan_queue.go b/nomad/plan_queue.go new file mode 100644 index 000000000..601bb1574 --- /dev/null +++ b/nomad/plan_queue.go @@ -0,0 +1,244 @@ +package nomad + +import ( + "container/heap" + "fmt" + "sync" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/nomad/nomad/structs" +) + +var ( + // planQueueFlushed is the error used for all pending plans + // when the queue is flushed or disabled + planQueueFlushed = fmt.Errorf("plan queue flushed") +) + +// PlanFuture is used to return a future for an enqueue +type PlanFuture interface { + Wait() (*structs.PlanResult, error) +} + +// PlanQueue is used to submit commit plans for task allocations +// to the current leader. The leader verifies that resources are not +// over-committed and commits to Raft. This allows sub-schedulers to +// be optimistically concurrent. In the case of an overcommit, the plan +// may be partially applied if allowed, or completely rejected (gang commit). +type PlanQueue struct { + enabled bool + stats *QueueStats + + ready PendingPlans + waitCh chan struct{} + + l sync.RWMutex +} + +// NewPlanQueue is used to construct and return a new plan queue +func NewPlanQueue() (*PlanQueue, error) { + q := &PlanQueue{ + enabled: false, + stats: new(QueueStats), + ready: make([]*pendingPlan, 0, 16), + waitCh: make(chan struct{}, 1), + } + return q, nil +} + +// pendingPlan is used to wrap a plan that is enqueued +// so that we can re-use it as a future. +type pendingPlan struct { + plan *structs.Plan + result *structs.PlanResult + errCh chan error +} + +// Wait is used to block for the plan result or potential error +func (p *pendingPlan) Wait() (*structs.PlanResult, error) { + err := <-p.errCh + return p.result, err +} + +// respond is used to set the response and error for the future +func (p *pendingPlan) respond(result *structs.PlanResult, err error) { + p.result = result + p.errCh <- err +} + +// PendingPlans is a list of waiting plans. +// We implement the container/heap interface so that this is a +// priority queue +type PendingPlans []*pendingPlan + +// SetEnabled is used to control if the queue is enabled. The queue +// should only be enabled on the active leader. +func (q *PlanQueue) SetEnabled(enabled bool) { + q.l.Lock() + q.enabled = enabled + q.l.Unlock() + if !enabled { + q.Flush() + } +} + +// Enqueue is used to enqueue a plan +func (q *PlanQueue) Enqueue(plan *structs.Plan) (PlanFuture, error) { + q.l.Lock() + defer q.l.Unlock() + + // Do nothing if not enabled + if !q.enabled { + return nil, fmt.Errorf("plan queue is disabled") + } + + // Wrap the pending plan + pending := &pendingPlan{ + plan: plan, + errCh: make(chan error, 1), + } + + // Push onto the heap + heap.Push(&q.ready, pending) + + // Update the stats + q.stats.Depth += 1 + + // Unblock any blocked reader + select { + case q.waitCh <- struct{}{}: + default: + } + return pending, nil +} + +// Dequeue is used to perform a blocking dequeue +func (q *PlanQueue) Dequeue(timeout time.Duration) (*pendingPlan, error) { +SCAN: + q.l.Lock() + + // Do nothing if not enabled + if !q.enabled { + q.l.Unlock() + return nil, fmt.Errorf("plan queue is disabled") + } + + // Look for available work + if len(q.ready) > 0 { + raw := heap.Pop(&q.ready) + pending := raw.(*pendingPlan) + q.stats.Depth -= 1 + q.l.Unlock() + return pending, nil + } + q.l.Unlock() + + // Setup the timeout timer + var timer *time.Timer + if timer == nil && timeout > 0 { + timer = time.NewTimer(timeout) + defer timer.Stop() + } + + // Wait for timeout or new work + select { + case <-q.waitCh: + goto SCAN + case <-timer.C: + return nil, nil + } +} + +// Flush is used to reset the state of the plan queue +func (q *PlanQueue) Flush() { + q.l.Lock() + defer q.l.Unlock() + + // Error out all the futures + for _, pending := range q.ready { + pending.respond(nil, planQueueFlushed) + } + + // Reset the broker + q.stats.Depth = 0 + q.ready = make([]*pendingPlan, 0, 16) + + // Unblock any waiters + select { + case q.waitCh <- struct{}{}: + default: + } +} + +// Stats is used to query the state of the queue +func (q *PlanQueue) Stats() *QueueStats { + // Allocate a new stats struct + stats := new(QueueStats) + + q.l.RLock() + defer q.l.RUnlock() + + // Copy all the stats + *stats = *q.stats + return stats +} + +// EmitStats is used to export metrics about the broker while enabled +func (q *PlanQueue) EmitStats(period time.Duration) { + for { + <-time.After(period) + + stats := q.Stats() + metrics.SetGauge([]string{"nomad", "plan", "queue_depth"}, float32(stats.Depth)) + } +} + +// QueueStats returns all the stats about the plan queue +type QueueStats struct { + Depth int +} + +// Len is for the sorting interface +func (p PendingPlans) Len() int { + return len(p) +} + +// Less is for the sorting interface. We flip the check +// so that the "min" in the min-heap is the element with the +// highest priority. For the same priority, we use the create +// index of the evaluation to give a FIFO ordering. +func (p PendingPlans) Less(i, j int) bool { + if p[i].plan.Priority != p[j].plan.Priority { + return !(p[i].plan.Priority < p[j].plan.Priority) + } + return p[i].plan.EvalCreateIndex < p[j].plan.EvalCreateIndex +} + +// Swap is for the sorting interface +func (p PendingPlans) Swap(i, j int) { + p[i], p[j] = p[j], p[i] +} + +// Push is used to add a new evalution to the slice +func (p *PendingPlans) Push(e interface{}) { + *p = append(*p, e.(*pendingPlan)) +} + +// Pop is used to remove an evaluation from the slice +func (p *PendingPlans) Pop() interface{} { + n := len(*p) + e := (*p)[n-1] + (*p)[n-1] = nil + *p = (*p)[:n-1] + return e +} + +// Peek is used to peek at the next element that would be popped +func (p PendingPlans) Peek() *pendingPlan { + n := len(p) + if n == 0 { + return nil + } + return p[n-1] +} diff --git a/nomad/plan_queue_test.go b/nomad/plan_queue_test.go new file mode 100644 index 000000000..e684ccbd7 --- /dev/null +++ b/nomad/plan_queue_test.go @@ -0,0 +1,181 @@ +package nomad + +import ( + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/structs" +) + +func testPlanQueue(t *testing.T) *PlanQueue { + pq, err := NewPlanQueue() + if err != nil { + t.Fatalf("err: %v", err) + } + return pq +} + +func mockPlan() *structs.Plan { + return &structs.Plan{ + Priority: 50, + EvalCreateIndex: 1000, + } +} + +func mockPlanResult() *structs.PlanResult { + return &structs.PlanResult{} +} + +func TestPlanQueue_Enqueue_Dequeue(t *testing.T) { + pq := testPlanQueue(t) + pq.SetEnabled(true) + + plan := mockPlan() + future, err := pq.Enqueue(plan) + if err != nil { + t.Fatalf("err: %v", err) + } + + stats := pq.Stats() + if stats.Depth != 1 { + t.Fatalf("bad: %#v", stats) + } + + resCh := make(chan *structs.PlanResult, 1) + go func() { + res, err := future.Wait() + if err != nil { + t.Fatalf("err: %v", err) + } + resCh <- res + }() + + pending, err := pq.Dequeue(time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + + stats = pq.Stats() + if stats.Depth != 0 { + t.Fatalf("bad: %#v", stats) + } + + if pending == nil || pending.plan != plan { + t.Fatalf("bad: %#v", pending) + } + + result := mockPlanResult() + pending.respond(result, nil) + + select { + case r := <-resCh: + if r != result { + t.Fatalf("Bad: %#v", r) + } + case <-time.After(time.Second): + t.Fatalf("timeout") + } +} + +func TestPlanQueue_Enqueue_Disable(t *testing.T) { + pq := testPlanQueue(t) + + // Enqueue + plan := mockPlan() + pq.SetEnabled(true) + future, err := pq.Enqueue(plan) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Flush via SetEnabled + pq.SetEnabled(false) + + // Check the stats + stats := pq.Stats() + if stats.Depth != 0 { + t.Fatalf("bad: %#v", stats) + } + + // Future should be canceled + res, err := future.Wait() + if err != planQueueFlushed { + t.Fatalf("err: %v", err) + } + if res != nil { + t.Fatalf("bad: %#v", res) + } +} + +func TestPlanQueue_Dequeue_Timeout(t *testing.T) { + pq := testPlanQueue(t) + pq.SetEnabled(true) + + start := time.Now() + out, err := pq.Dequeue(5 * time.Millisecond) + end := time.Now() + + if err != nil { + t.Fatalf("err: %v", err) + } + if out != nil { + t.Fatalf("unexpected: %#v", out) + } + + if diff := end.Sub(start); diff < 5*time.Millisecond { + t.Fatalf("bad: %#v", diff) + } +} + +// Ensure higher priority dequeued first +func TestPlanQueue_Dequeue_Priority(t *testing.T) { + pq := testPlanQueue(t) + pq.SetEnabled(true) + + plan1 := mockPlan() + plan1.Priority = 10 + pq.Enqueue(plan1) + + plan2 := mockPlan() + plan2.Priority = 30 + pq.Enqueue(plan2) + + plan3 := mockPlan() + plan3.Priority = 20 + pq.Enqueue(plan3) + + out1, _ := pq.Dequeue(time.Second) + if out1.plan != plan2 { + t.Fatalf("bad: %#v", out1) + } + + out2, _ := pq.Dequeue(time.Second) + if out2.plan != plan3 { + t.Fatalf("bad: %#v", out2) + } + + out3, _ := pq.Dequeue(time.Second) + if out3.plan != plan1 { + t.Fatalf("bad: %#v", out3) + } +} + +// Ensure FIFO at fixed priority +func TestPlanQueue_Dequeue_FIFO(t *testing.T) { + pq := testPlanQueue(t) + pq.SetEnabled(true) + NUM := 100 + + for i := 0; i < NUM; i++ { + plan := mockPlan() + plan.EvalCreateIndex = uint64(i) + pq.Enqueue(plan) + } + + for i := 0; i < NUM; i++ { + out1, _ := pq.Dequeue(time.Second) + if out1.plan.EvalCreateIndex != uint64(i) { + t.Fatalf("bad: %d %#v", i, out1) + } + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7c43df7f4..ed97cf3aa 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -526,6 +526,21 @@ type Evaluation struct { ModifyIndex uint64 } +// Plan is used to submit a commit plan for task allocations. These +// are submitted to the leader which verifies that resources have +// not been overcommitted before admiting the plan. +type Plan struct { + // Priority is the priority of the upstream job + Priority int + + // EvalCreateIndex is the create index of the evaluation. + // This is used to provide FIFO ordering + EvalCreateIndex uint64 +} + +type PlanResult struct { +} + // msgpackHandle is a shared handle for encoding/decoding of structs var msgpackHandle = &codec.MsgpackHandle{}