mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
nomad: deduplicate enqueue of evaluations
This commit is contained in:
@@ -27,6 +27,8 @@ type EvalBroker struct {
|
||||
enabled bool
|
||||
stats *BrokerStats
|
||||
|
||||
evals map[string]*structs.Evaluation
|
||||
|
||||
ready map[string]PendingEvaluations
|
||||
unack map[string]*unackEval
|
||||
waiting map[string]chan struct{}
|
||||
@@ -56,6 +58,7 @@ func NewEvalBroker(timeout time.Duration) (*EvalBroker, error) {
|
||||
nackTimeout: timeout,
|
||||
enabled: false,
|
||||
stats: new(BrokerStats),
|
||||
evals: make(map[string]*structs.Evaluation),
|
||||
ready: make(map[string]PendingEvaluations),
|
||||
unack: make(map[string]*unackEval),
|
||||
waiting: make(map[string]chan struct{}),
|
||||
@@ -86,6 +89,14 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
|
||||
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) error {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
|
||||
// Check if already enqueued
|
||||
if _, ok := b.evals[eval.ID]; ok {
|
||||
return nil
|
||||
} else if b.enabled {
|
||||
b.evals[eval.ID] = eval
|
||||
}
|
||||
|
||||
return b.enqueueLocked(eval)
|
||||
}
|
||||
|
||||
@@ -315,6 +326,7 @@ func (b *EvalBroker) Ack(evalID string) error {
|
||||
|
||||
// Cleanup
|
||||
delete(b.unack, evalID)
|
||||
delete(b.evals, evalID)
|
||||
|
||||
// Update the stats
|
||||
b.stats.TotalUnacked -= 1
|
||||
@@ -370,6 +382,7 @@ func (b *EvalBroker) Flush() {
|
||||
b.stats.TotalReady = 0
|
||||
b.stats.TotalUnacked = 0
|
||||
b.stats.ByScheduler = make(map[string]*SchedulerStats)
|
||||
b.evals = make(map[string]*structs.Evaluation)
|
||||
b.ready = make(map[string]PendingEvaluations)
|
||||
b.unack = make(map[string]*unackEval)
|
||||
}
|
||||
|
||||
@@ -52,6 +52,12 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Double enqueue is a no-op
|
||||
err = b.Enqueue(eval)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if !b.Enabled() {
|
||||
t.Fatalf("should be enabled")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user