When enqueuing into eval broker always pass blocked eval's token

This commit is contained in:
Alex Dadgar
2016-06-23 22:40:22 -07:00
parent 016917da9b
commit 5666d64ab6
2 changed files with 46 additions and 33 deletions

View File

@@ -5,6 +5,7 @@ import (
"time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -28,11 +29,11 @@ type BlockedEvals struct {
// captured is the set of evaluations that are captured by computed node
// classes.
captured map[string]*structs.Evaluation
captured map[string]wrappedEval
// escaped is the set of evaluations that have escaped computed node
// classes.
escaped map[string]*structs.Evaluation
escaped map[string]wrappedEval
// unblockCh is used to buffer unblocking of evaluations.
capacityChangeCh chan *capacityUpdate
@@ -67,6 +68,12 @@ type capacityUpdate struct {
index uint64
}
// wrappedEval captures both the evaluation and the optional token
type wrappedEval struct {
eval *structs.Evaluation
token string
}
// BlockedStats returns all the stats about the blocked eval tracker.
type BlockedStats struct {
// TotalEscaped is the total number of blocked evaluations that have escaped
@@ -82,8 +89,8 @@ type BlockedStats struct {
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
return &BlockedEvals{
evalBroker: evalBroker,
captured: make(map[string]*structs.Evaluation),
escaped: make(map[string]*structs.Evaluation),
captured: make(map[string]wrappedEval),
escaped: make(map[string]wrappedEval),
jobs: make(map[string]struct{}),
unblockIndexes: make(map[string]uint64),
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
@@ -178,19 +185,24 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
b.stats.TotalBlocked++
b.jobs[eval.JobID] = struct{}{}
wrapped := wrappedEval{
eval: eval,
token: token,
}
// If the eval has escaped, meaning computed node classes could not capture
// the constraints of the job, we store the eval separately as we have to
// unblock it whenever node capacity changes. This is because we don't know
// what node class is feasible for the jobs constraints.
if eval.EscapedComputedClass {
b.escaped[eval.ID] = eval
b.escaped[eval.ID] = wrapped
b.stats.TotalEscaped++
return
}
// Add the eval to the set of blocked evals whose jobs constraints are
// captured by computed node class.
b.captured[eval.ID] = eval
b.captured[eval.ID] = wrapped
}
// missedUnblock returns whether an evaluation missed an unblock while it was in
@@ -281,13 +293,13 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
// Every eval that has escaped computed node class has to be unblocked
// because any node could potentially be feasible.
var unblocked []*structs.Evaluation
if l := len(b.escaped); l != 0 {
unblocked = make([]*structs.Evaluation, 0, l)
for id, eval := range b.escaped {
unblocked = append(unblocked, eval)
numEscaped := len(b.escaped)
unblocked := make(map[*structs.Evaluation]string, lib.MaxInt(numEscaped, 4))
if numEscaped != 0 {
for id, wrapped := range b.escaped {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, eval.JobID)
delete(b.jobs, wrapped.eval.JobID)
}
}
@@ -296,8 +308,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
// when the evaluation was originally run through the scheduler, that it
// never saw a node with the given computed class and thus needs to be
// unblocked for correctness.
for id, eval := range b.captured {
if elig, ok := eval.ClassEligibility[computedClass]; ok && !elig {
for id, wrapped := range b.captured {
if elig, ok := wrapped.eval.ClassEligibility[computedClass]; ok && !elig {
// Can skip because the eval has explicitly marked the node class
// as ineligible.
continue
@@ -305,8 +317,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
// The computed node class has never been seen by the eval so we unblock
// it.
unblocked = append(unblocked, eval)
delete(b.jobs, eval.JobID)
unblocked[wrapped.eval] = wrapped.token
delete(b.jobs, wrapped.eval.JobID)
delete(b.captured, id)
}
@@ -331,27 +343,27 @@ func (b *BlockedEvals) UnblockFailed() {
return
}
var unblock []*structs.Evaluation
for id, eval := range b.captured {
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblock = append(unblock, eval)
unblocked := make(map[*structs.Evaluation]string, 4)
for id, wrapped := range b.captured {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.captured, id)
delete(b.jobs, eval.JobID)
delete(b.jobs, wrapped.eval.JobID)
}
}
for id, eval := range b.escaped {
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblock = append(unblock, eval)
for id, wrapped := range b.escaped {
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
unblocked[wrapped.eval] = wrapped.token
delete(b.escaped, id)
delete(b.jobs, eval.JobID)
delete(b.jobs, wrapped.eval.JobID)
b.stats.TotalEscaped -= 1
}
}
if l := len(unblock); l > 0 {
if l := len(unblocked); l > 0 {
b.stats.TotalBlocked -= l
b.evalBroker.EnqueueAll(unblock)
b.evalBroker.EnqueueAll(unblocked)
}
}
@@ -395,8 +407,8 @@ func (b *BlockedEvals) Flush() {
// Reset the blocked eval tracker.
b.stats.TotalEscaped = 0
b.stats.TotalBlocked = 0
b.captured = make(map[string]*structs.Evaluation)
b.escaped = make(map[string]*structs.Evaluation)
b.captured = make(map[string]wrappedEval)
b.escaped = make(map[string]wrappedEval)
b.jobs = make(map[string]struct{})
b.duplicates = nil
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)

View File

@@ -135,15 +135,16 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
}
}
// EnqueueAll is used to enqueue many evaluations.
func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) {
// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
// that are being re-enqueued to include their token.
func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
// The lock needs to be held until all evaluations are enqueued. This is so
// that when Dequeue operations are unblocked they will pick the highest
// priority evaluations.
b.l.Lock()
defer b.l.Unlock()
for _, eval := range evals {
b.processEnqueue(eval, "")
for eval, token := range evals {
b.processEnqueue(eval, token)
}
}