diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 14efe9911..4a05a6bbb 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/nomad/nomad/structs" ) @@ -28,11 +29,11 @@ type BlockedEvals struct { // captured is the set of evaluations that are captured by computed node // classes. - captured map[string]*structs.Evaluation + captured map[string]wrappedEval // escaped is the set of evaluations that have escaped computed node // classes. - escaped map[string]*structs.Evaluation + escaped map[string]wrappedEval // unblockCh is used to buffer unblocking of evaluations. capacityChangeCh chan *capacityUpdate @@ -67,6 +68,12 @@ type capacityUpdate struct { index uint64 } +// wrappedEval captures both the evaluation and the optional token +type wrappedEval struct { + eval *structs.Evaluation + token string +} + // BlockedStats returns all the stats about the blocked eval tracker. type BlockedStats struct { // TotalEscaped is the total number of blocked evaluations that have escaped @@ -82,8 +89,8 @@ type BlockedStats struct { func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { return &BlockedEvals{ evalBroker: evalBroker, - captured: make(map[string]*structs.Evaluation), - escaped: make(map[string]*structs.Evaluation), + captured: make(map[string]wrappedEval), + escaped: make(map[string]wrappedEval), jobs: make(map[string]struct{}), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), @@ -178,19 +185,24 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { b.stats.TotalBlocked++ b.jobs[eval.JobID] = struct{}{} + wrapped := wrappedEval{ + eval: eval, + token: token, + } + // If the eval has escaped, meaning computed node classes could not capture // the constraints of the job, we store the eval separately as we have to // unblock it whenever node capacity changes. This is because we don't know // what node class is feasible for the jobs constraints. if eval.EscapedComputedClass { - b.escaped[eval.ID] = eval + b.escaped[eval.ID] = wrapped b.stats.TotalEscaped++ return } // Add the eval to the set of blocked evals whose jobs constraints are // captured by computed node class. - b.captured[eval.ID] = eval + b.captured[eval.ID] = wrapped } // missedUnblock returns whether an evaluation missed an unblock while it was in @@ -281,13 +293,13 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { // Every eval that has escaped computed node class has to be unblocked // because any node could potentially be feasible. - var unblocked []*structs.Evaluation - if l := len(b.escaped); l != 0 { - unblocked = make([]*structs.Evaluation, 0, l) - for id, eval := range b.escaped { - unblocked = append(unblocked, eval) + numEscaped := len(b.escaped) + unblocked := make(map[*structs.Evaluation]string, lib.MaxInt(numEscaped, 4)) + if numEscaped != 0 { + for id, wrapped := range b.escaped { + unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, eval.JobID) + delete(b.jobs, wrapped.eval.JobID) } } @@ -296,8 +308,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { // when the evaluation was originally run through the scheduler, that it // never saw a node with the given computed class and thus needs to be // unblocked for correctness. - for id, eval := range b.captured { - if elig, ok := eval.ClassEligibility[computedClass]; ok && !elig { + for id, wrapped := range b.captured { + if elig, ok := wrapped.eval.ClassEligibility[computedClass]; ok && !elig { // Can skip because the eval has explicitly marked the node class // as ineligible. continue @@ -305,8 +317,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { // The computed node class has never been seen by the eval so we unblock // it. - unblocked = append(unblocked, eval) - delete(b.jobs, eval.JobID) + unblocked[wrapped.eval] = wrapped.token + delete(b.jobs, wrapped.eval.JobID) delete(b.captured, id) } @@ -331,27 +343,27 @@ func (b *BlockedEvals) UnblockFailed() { return } - var unblock []*structs.Evaluation - for id, eval := range b.captured { - if eval.TriggeredBy == structs.EvalTriggerMaxPlans { - unblock = append(unblock, eval) + unblocked := make(map[*structs.Evaluation]string, 4) + for id, wrapped := range b.captured { + if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { + unblocked[wrapped.eval] = wrapped.token delete(b.captured, id) - delete(b.jobs, eval.JobID) + delete(b.jobs, wrapped.eval.JobID) } } - for id, eval := range b.escaped { - if eval.TriggeredBy == structs.EvalTriggerMaxPlans { - unblock = append(unblock, eval) + for id, wrapped := range b.escaped { + if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { + unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, eval.JobID) + delete(b.jobs, wrapped.eval.JobID) b.stats.TotalEscaped -= 1 } } - if l := len(unblock); l > 0 { + if l := len(unblocked); l > 0 { b.stats.TotalBlocked -= l - b.evalBroker.EnqueueAll(unblock) + b.evalBroker.EnqueueAll(unblocked) } } @@ -395,8 +407,8 @@ func (b *BlockedEvals) Flush() { // Reset the blocked eval tracker. b.stats.TotalEscaped = 0 b.stats.TotalBlocked = 0 - b.captured = make(map[string]*structs.Evaluation) - b.escaped = make(map[string]*structs.Evaluation) + b.captured = make(map[string]wrappedEval) + b.escaped = make(map[string]wrappedEval) b.jobs = make(map[string]struct{}) b.duplicates = nil b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index d91ee8244..190ead092 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -135,15 +135,16 @@ func (b *EvalBroker) SetEnabled(enabled bool) { } } -// EnqueueAll is used to enqueue many evaluations. -func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { +// EnqueueAll is used to enqueue many evaluations. The map allows evaluations +// that are being re-enqueued to include their token. +func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) { // The lock needs to be held until all evaluations are enqueued. This is so // that when Dequeue operations are unblocked they will pick the highest // priority evaluations. b.l.Lock() defer b.l.Unlock() - for _, eval := range evals { - b.processEnqueue(eval, "") + for eval, token := range evals { + b.processEnqueue(eval, token) } }