From 43392d0c39888960616f17a715eb2e8ca59b579b Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 5 Aug 2015 17:06:02 -0700 Subject: [PATCH] nomad: deduplicate enqueue of evaluations --- nomad/eval_broker.go | 13 +++++++++++++ nomad/eval_broker_test.go | 6 ++++++ 2 files changed, 19 insertions(+) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 5fdd79e5a..14faef9bb 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -27,6 +27,8 @@ type EvalBroker struct { enabled bool stats *BrokerStats + evals map[string]*structs.Evaluation + ready map[string]PendingEvaluations unack map[string]*unackEval waiting map[string]chan struct{} @@ -56,6 +58,7 @@ func NewEvalBroker(timeout time.Duration) (*EvalBroker, error) { nackTimeout: timeout, enabled: false, stats: new(BrokerStats), + evals: make(map[string]*structs.Evaluation), ready: make(map[string]PendingEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), @@ -86,6 +89,14 @@ func (b *EvalBroker) SetEnabled(enabled bool) { func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error { b.l.Lock() defer b.l.Unlock() + + // Check if already enqueued + if _, ok := b.evals[eval.ID]; ok { + return nil + } else if b.enabled { + b.evals[eval.ID] = eval + } + return b.enqueueLocked(eval) } @@ -315,6 +326,7 @@ func (b *EvalBroker) Ack(evalID string) error { // Cleanup delete(b.unack, evalID) + delete(b.evals, evalID) // Update the stats b.stats.TotalUnacked -= 1 @@ -370,6 +382,7 @@ func (b *EvalBroker) Flush() { b.stats.TotalReady = 0 b.stats.TotalUnacked = 0 b.stats.ByScheduler = make(map[string]*SchedulerStats) + b.evals = make(map[string]*structs.Evaluation) b.ready = make(map[string]PendingEvaluations) b.unack = make(map[string]*unackEval) } diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 22b2147f5..b772a3f0c 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -52,6 +52,12 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("err: %v", err) } + // Double enqueue is a no-op + err = b.Enqueue(eval) + if err != nil { + t.Fatalf("err: %v", err) + } + if !b.Enabled() { t.Fatalf("should be enabled") }