diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 14efe9911..623d81d57 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -5,6 +5,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/nomad/nomad/structs" ) @@ -28,11 +29,11 @@ type BlockedEvals struct { // captured is the set of evaluations that are captured by computed node // classes. - captured map[string]*structs.Evaluation + captured map[string]wrappedEval // escaped is the set of evaluations that have escaped computed node // classes. - escaped map[string]*structs.Evaluation + escaped map[string]wrappedEval // unblockCh is used to buffer unblocking of evaluations. capacityChangeCh chan *capacityUpdate @@ -67,6 +68,12 @@ type capacityUpdate struct { index uint64 } +// wrappedEval captures both the evaluation and the optional token +type wrappedEval struct { + eval *structs.Evaluation + token string +} + // BlockedStats returns all the stats about the blocked eval tracker. type BlockedStats struct { // TotalEscaped is the total number of blocked evaluations that have escaped @@ -82,8 +89,8 @@ type BlockedStats struct { func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals { return &BlockedEvals{ evalBroker: evalBroker, - captured: make(map[string]*structs.Evaluation), - escaped: make(map[string]*structs.Evaluation), + captured: make(map[string]wrappedEval), + escaped: make(map[string]wrappedEval), jobs: make(map[string]struct{}), unblockIndexes: make(map[string]uint64), capacityChangeCh: make(chan *capacityUpdate, unblockBuffer), @@ -100,8 +107,8 @@ func (b *BlockedEvals) Enabled() bool { return b.enabled } -// SetEnabled is used to control if the broker is enabled. The broker -// should only be enabled on the active leader. +// SetEnabled is used to control if the blocked eval tracker is enabled. The +// tracker should only be enabled on the active leader. func (b *BlockedEvals) SetEnabled(enabled bool) { b.l.Lock() if b.enabled == enabled { @@ -170,7 +177,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { // Just re-enqueue the eval immediately. We pass the token so that the // eval_broker can properly handle the case in which the evaluation is // still outstanding. - b.evalBroker.Requeue(eval, token) + b.evalBroker.EnqueueAll(map[*structs.Evaluation]string{eval: token}) return } @@ -178,19 +185,25 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) { b.stats.TotalBlocked++ b.jobs[eval.JobID] = struct{}{} + // Wrap the evaluation, capturing its token. + wrapped := wrappedEval{ + eval: eval, + token: token, + } + // If the eval has escaped, meaning computed node classes could not capture // the constraints of the job, we store the eval separately as we have to // unblock it whenever node capacity changes. This is because we don't know // what node class is feasible for the jobs constraints. if eval.EscapedComputedClass { - b.escaped[eval.ID] = eval + b.escaped[eval.ID] = wrapped b.stats.TotalEscaped++ return } // Add the eval to the set of blocked evals whose jobs constraints are // captured by computed node class. - b.captured[eval.ID] = eval + b.captured[eval.ID] = wrapped } // missedUnblock returns whether an evaluation missed an unblock while it was in @@ -281,13 +294,13 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { // Every eval that has escaped computed node class has to be unblocked // because any node could potentially be feasible. - var unblocked []*structs.Evaluation - if l := len(b.escaped); l != 0 { - unblocked = make([]*structs.Evaluation, 0, l) - for id, eval := range b.escaped { - unblocked = append(unblocked, eval) + numEscaped := len(b.escaped) + unblocked := make(map[*structs.Evaluation]string, lib.MaxInt(numEscaped, 4)) + if numEscaped != 0 { + for id, wrapped := range b.escaped { + unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, eval.JobID) + delete(b.jobs, wrapped.eval.JobID) } } @@ -296,8 +309,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { // when the evaluation was originally run through the scheduler, that it // never saw a node with the given computed class and thus needs to be // unblocked for correctness. - for id, eval := range b.captured { - if elig, ok := eval.ClassEligibility[computedClass]; ok && !elig { + for id, wrapped := range b.captured { + if elig, ok := wrapped.eval.ClassEligibility[computedClass]; ok && !elig { // Can skip because the eval has explicitly marked the node class // as ineligible. continue @@ -305,8 +318,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) { // The computed node class has never been seen by the eval so we unblock // it. - unblocked = append(unblocked, eval) - delete(b.jobs, eval.JobID) + unblocked[wrapped.eval] = wrapped.token + delete(b.jobs, wrapped.eval.JobID) delete(b.captured, id) } @@ -331,27 +344,27 @@ func (b *BlockedEvals) UnblockFailed() { return } - var unblock []*structs.Evaluation - for id, eval := range b.captured { - if eval.TriggeredBy == structs.EvalTriggerMaxPlans { - unblock = append(unblock, eval) + unblocked := make(map[*structs.Evaluation]string, 4) + for id, wrapped := range b.captured { + if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { + unblocked[wrapped.eval] = wrapped.token delete(b.captured, id) - delete(b.jobs, eval.JobID) + delete(b.jobs, wrapped.eval.JobID) } } - for id, eval := range b.escaped { - if eval.TriggeredBy == structs.EvalTriggerMaxPlans { - unblock = append(unblock, eval) + for id, wrapped := range b.escaped { + if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans { + unblocked[wrapped.eval] = wrapped.token delete(b.escaped, id) - delete(b.jobs, eval.JobID) + delete(b.jobs, wrapped.eval.JobID) b.stats.TotalEscaped -= 1 } } - if l := len(unblock); l > 0 { + if l := len(unblocked); l > 0 { b.stats.TotalBlocked -= l - b.evalBroker.EnqueueAll(unblock) + b.evalBroker.EnqueueAll(unblocked) } } @@ -395,8 +408,8 @@ func (b *BlockedEvals) Flush() { // Reset the blocked eval tracker. b.stats.TotalEscaped = 0 b.stats.TotalBlocked = 0 - b.captured = make(map[string]*structs.Evaluation) - b.escaped = make(map[string]*structs.Evaluation) + b.captured = make(map[string]wrappedEval) + b.escaped = make(map[string]wrappedEval) b.jobs = make(map[string]struct{}) b.duplicates = nil b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer) diff --git a/nomad/blocked_evals_test.go b/nomad/blocked_evals_test.go index 1229dec08..646b24d80 100644 --- a/nomad/blocked_evals_test.go +++ b/nomad/blocked_evals_test.go @@ -255,6 +255,62 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) { }) } +func TestBlockedEvals_Reblock(t *testing.T) { + blocked, broker := testBlockedEvals(t) + + // Create an evaluation, Enqueue/Dequeue it to get a token + e := mock.Eval() + e.SnapshotIndex = 500 + e.Status = structs.EvalStatusBlocked + e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false} + broker.Enqueue(e) + + _, token, err := broker.Dequeue([]string{e.Type}, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Reblock the evaluation + blocked.Reblock(e, token) + + // Verify block caused the eval to be tracked + blockedStats := blocked.Stats() + if blockedStats.TotalBlocked != 1 && blockedStats.TotalEscaped != 0 { + t.Fatalf("bad: %#v", blockedStats) + } + + // Should unblock because the eval + blocked.Unblock("v1:123", 1000) + + brokerStats := broker.Stats() + if brokerStats.TotalReady != 0 && brokerStats.TotalUnacked != 1 { + t.Fatalf("bad: %#v", brokerStats) + } + + // Ack the evaluation which should cause the reblocked eval to transistion + // to ready + if err := broker.Ack(e.ID, token); err != nil { + t.Fatalf("err: %v", err) + } + + testutil.WaitForResult(func() (bool, error) { + // Verify Unblock causes an enqueue + brokerStats := broker.Stats() + if brokerStats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", brokerStats) + } + + // Verify Unblock updates the stats + bStats := blocked.Stats() + if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 { + return false, fmt.Errorf("bad: %#v", bStats) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %s", err) + }) +} + // Test the block case in which the eval should be immediately unblocked since // it is escaped and old func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) { diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index d91ee8244..783ad84f0 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -135,15 +135,31 @@ func (b *EvalBroker) SetEnabled(enabled bool) { } } -// EnqueueAll is used to enqueue many evaluations. -func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) { +// Enqueue is used to enqueue a new evaluation +func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { + b.l.Lock() + defer b.l.Unlock() + b.processEnqueue(eval, "") +} + +// EnqueueAll is used to enqueue many evaluations. The map allows evaluations +// that are being re-enqueued to include their token. +// +// When requeueing an evaluation that potentially may be already +// enqueued. The evaluation is handled in one of the following ways: +// * Evaluation not outstanding: Process as a normal Enqueue +// * Evaluation outstanding: Do not allow the evaluation to be dequeued til: +// * Ack received: Unblock the evaluation allowing it to be dequeued +// * Nack received: Drop the evaluation as it was created as a result of a +// scheduler run that was Nack'd +func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) { // The lock needs to be held until all evaluations are enqueued. This is so // that when Dequeue operations are unblocked they will pick the highest // priority evaluations. b.l.Lock() defer b.l.Unlock() - for _, eval := range evals { - b.processEnqueue(eval, "") + for eval, token := range evals { + b.processEnqueue(eval, token) } } @@ -182,26 +198,6 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { b.enqueueLocked(eval, eval.Type) } -// Enqueue is used to enqueue an evaluation -func (b *EvalBroker) Enqueue(eval *structs.Evaluation) { - b.l.Lock() - defer b.l.Unlock() - b.processEnqueue(eval, "") -} - -// Requeue is used to requeue an evaluation that potentially may be already -// enqueued. The evaluation is handled in one of the following ways: -// * Evaluation not outstanding: Process as a normal Enqueue -// * Evaluation outstanding: Do not allow the evaluation to be dequeued til: -// * Ack received: Unblock the evaluation allowing it to be dequeued -// * Nack received: Drop the evaluation as it was created as a result of a -// scheduler run that was Nack'd -func (b *EvalBroker) Requeue(eval *structs.Evaluation, token string) { - b.l.Lock() - defer b.l.Unlock() - b.processEnqueue(eval, token) -} - // enqueueWaiting is used to enqueue a waiting evaluation func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { b.l.Lock() diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index ca00f7ea9..ab6ecedf0 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -865,12 +865,12 @@ func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) { time.Sleep(5 * time.Millisecond) // Enqueue - evals := make([]*structs.Evaluation, 0, 8) + evals := make(map[*structs.Evaluation]string, 8) expectedPriority := 90 for i := 10; i <= expectedPriority; i += 10 { eval := mock.Eval() eval.Priority = i - evals = append(evals, eval) + evals[eval] = "" } b.EnqueueAll(evals) @@ -886,7 +886,7 @@ func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) { } } -func TestEvalBroker_Reenqueue_Ack(t *testing.T) { +func TestEvalBroker_EnqueueAll_Requeue_Ack(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) @@ -903,7 +903,7 @@ func TestEvalBroker_Reenqueue_Ack(t *testing.T) { } // Requeue the same evaluation. - b.Requeue(eval, token) + b.EnqueueAll(map[*structs.Evaluation]string{eval: token}) // The stats should show one unacked stats := b.Stats() @@ -942,7 +942,7 @@ func TestEvalBroker_Reenqueue_Ack(t *testing.T) { } } -func TestEvalBroker_Reenqueue_Nack(t *testing.T) { +func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) @@ -959,7 +959,7 @@ func TestEvalBroker_Reenqueue_Nack(t *testing.T) { } // Requeue the same evaluation. - b.Requeue(eval, token) + b.EnqueueAll(map[*structs.Evaluation]string{eval: token}) // The stats should show one unacked stats := b.Stats()