mirror of
https://github.com/kemko/nomad.git
synced 2026-01-11 04:45:42 +03:00
Merge pull request #1353 from hashicorp/b-requeue
Properly handle reblocking evaluations by storing the eval token
This commit is contained in:
@@ -5,6 +5,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/armon/go-metrics"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
)
|
||||
|
||||
@@ -28,11 +29,11 @@ type BlockedEvals struct {
|
||||
|
||||
// captured is the set of evaluations that are captured by computed node
|
||||
// classes.
|
||||
captured map[string]*structs.Evaluation
|
||||
captured map[string]wrappedEval
|
||||
|
||||
// escaped is the set of evaluations that have escaped computed node
|
||||
// classes.
|
||||
escaped map[string]*structs.Evaluation
|
||||
escaped map[string]wrappedEval
|
||||
|
||||
// unblockCh is used to buffer unblocking of evaluations.
|
||||
capacityChangeCh chan *capacityUpdate
|
||||
@@ -67,6 +68,12 @@ type capacityUpdate struct {
|
||||
index uint64
|
||||
}
|
||||
|
||||
// wrappedEval captures both the evaluation and the optional token
|
||||
type wrappedEval struct {
|
||||
eval *structs.Evaluation
|
||||
token string
|
||||
}
|
||||
|
||||
// BlockedStats returns all the stats about the blocked eval tracker.
|
||||
type BlockedStats struct {
|
||||
// TotalEscaped is the total number of blocked evaluations that have escaped
|
||||
@@ -82,8 +89,8 @@ type BlockedStats struct {
|
||||
func NewBlockedEvals(evalBroker *EvalBroker) *BlockedEvals {
|
||||
return &BlockedEvals{
|
||||
evalBroker: evalBroker,
|
||||
captured: make(map[string]*structs.Evaluation),
|
||||
escaped: make(map[string]*structs.Evaluation),
|
||||
captured: make(map[string]wrappedEval),
|
||||
escaped: make(map[string]wrappedEval),
|
||||
jobs: make(map[string]struct{}),
|
||||
unblockIndexes: make(map[string]uint64),
|
||||
capacityChangeCh: make(chan *capacityUpdate, unblockBuffer),
|
||||
@@ -100,8 +107,8 @@ func (b *BlockedEvals) Enabled() bool {
|
||||
return b.enabled
|
||||
}
|
||||
|
||||
// SetEnabled is used to control if the broker is enabled. The broker
|
||||
// should only be enabled on the active leader.
|
||||
// SetEnabled is used to control if the blocked eval tracker is enabled. The
|
||||
// tracker should only be enabled on the active leader.
|
||||
func (b *BlockedEvals) SetEnabled(enabled bool) {
|
||||
b.l.Lock()
|
||||
if b.enabled == enabled {
|
||||
@@ -170,7 +177,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
|
||||
// Just re-enqueue the eval immediately. We pass the token so that the
|
||||
// eval_broker can properly handle the case in which the evaluation is
|
||||
// still outstanding.
|
||||
b.evalBroker.Requeue(eval, token)
|
||||
b.evalBroker.EnqueueAll(map[*structs.Evaluation]string{eval: token})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -178,19 +185,25 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
|
||||
b.stats.TotalBlocked++
|
||||
b.jobs[eval.JobID] = struct{}{}
|
||||
|
||||
// Wrap the evaluation, capturing its token.
|
||||
wrapped := wrappedEval{
|
||||
eval: eval,
|
||||
token: token,
|
||||
}
|
||||
|
||||
// If the eval has escaped, meaning computed node classes could not capture
|
||||
// the constraints of the job, we store the eval separately as we have to
|
||||
// unblock it whenever node capacity changes. This is because we don't know
|
||||
// what node class is feasible for the jobs constraints.
|
||||
if eval.EscapedComputedClass {
|
||||
b.escaped[eval.ID] = eval
|
||||
b.escaped[eval.ID] = wrapped
|
||||
b.stats.TotalEscaped++
|
||||
return
|
||||
}
|
||||
|
||||
// Add the eval to the set of blocked evals whose jobs constraints are
|
||||
// captured by computed node class.
|
||||
b.captured[eval.ID] = eval
|
||||
b.captured[eval.ID] = wrapped
|
||||
}
|
||||
|
||||
// missedUnblock returns whether an evaluation missed an unblock while it was in
|
||||
@@ -281,13 +294,13 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
|
||||
|
||||
// Every eval that has escaped computed node class has to be unblocked
|
||||
// because any node could potentially be feasible.
|
||||
var unblocked []*structs.Evaluation
|
||||
if l := len(b.escaped); l != 0 {
|
||||
unblocked = make([]*structs.Evaluation, 0, l)
|
||||
for id, eval := range b.escaped {
|
||||
unblocked = append(unblocked, eval)
|
||||
numEscaped := len(b.escaped)
|
||||
unblocked := make(map[*structs.Evaluation]string, lib.MaxInt(numEscaped, 4))
|
||||
if numEscaped != 0 {
|
||||
for id, wrapped := range b.escaped {
|
||||
unblocked[wrapped.eval] = wrapped.token
|
||||
delete(b.escaped, id)
|
||||
delete(b.jobs, eval.JobID)
|
||||
delete(b.jobs, wrapped.eval.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -296,8 +309,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
|
||||
// when the evaluation was originally run through the scheduler, that it
|
||||
// never saw a node with the given computed class and thus needs to be
|
||||
// unblocked for correctness.
|
||||
for id, eval := range b.captured {
|
||||
if elig, ok := eval.ClassEligibility[computedClass]; ok && !elig {
|
||||
for id, wrapped := range b.captured {
|
||||
if elig, ok := wrapped.eval.ClassEligibility[computedClass]; ok && !elig {
|
||||
// Can skip because the eval has explicitly marked the node class
|
||||
// as ineligible.
|
||||
continue
|
||||
@@ -305,8 +318,8 @@ func (b *BlockedEvals) unblock(computedClass string, index uint64) {
|
||||
|
||||
// The computed node class has never been seen by the eval so we unblock
|
||||
// it.
|
||||
unblocked = append(unblocked, eval)
|
||||
delete(b.jobs, eval.JobID)
|
||||
unblocked[wrapped.eval] = wrapped.token
|
||||
delete(b.jobs, wrapped.eval.JobID)
|
||||
delete(b.captured, id)
|
||||
}
|
||||
|
||||
@@ -331,27 +344,27 @@ func (b *BlockedEvals) UnblockFailed() {
|
||||
return
|
||||
}
|
||||
|
||||
var unblock []*structs.Evaluation
|
||||
for id, eval := range b.captured {
|
||||
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
|
||||
unblock = append(unblock, eval)
|
||||
unblocked := make(map[*structs.Evaluation]string, 4)
|
||||
for id, wrapped := range b.captured {
|
||||
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
|
||||
unblocked[wrapped.eval] = wrapped.token
|
||||
delete(b.captured, id)
|
||||
delete(b.jobs, eval.JobID)
|
||||
delete(b.jobs, wrapped.eval.JobID)
|
||||
}
|
||||
}
|
||||
|
||||
for id, eval := range b.escaped {
|
||||
if eval.TriggeredBy == structs.EvalTriggerMaxPlans {
|
||||
unblock = append(unblock, eval)
|
||||
for id, wrapped := range b.escaped {
|
||||
if wrapped.eval.TriggeredBy == structs.EvalTriggerMaxPlans {
|
||||
unblocked[wrapped.eval] = wrapped.token
|
||||
delete(b.escaped, id)
|
||||
delete(b.jobs, eval.JobID)
|
||||
delete(b.jobs, wrapped.eval.JobID)
|
||||
b.stats.TotalEscaped -= 1
|
||||
}
|
||||
}
|
||||
|
||||
if l := len(unblock); l > 0 {
|
||||
if l := len(unblocked); l > 0 {
|
||||
b.stats.TotalBlocked -= l
|
||||
b.evalBroker.EnqueueAll(unblock)
|
||||
b.evalBroker.EnqueueAll(unblocked)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -395,8 +408,8 @@ func (b *BlockedEvals) Flush() {
|
||||
// Reset the blocked eval tracker.
|
||||
b.stats.TotalEscaped = 0
|
||||
b.stats.TotalBlocked = 0
|
||||
b.captured = make(map[string]*structs.Evaluation)
|
||||
b.escaped = make(map[string]*structs.Evaluation)
|
||||
b.captured = make(map[string]wrappedEval)
|
||||
b.escaped = make(map[string]wrappedEval)
|
||||
b.jobs = make(map[string]struct{})
|
||||
b.duplicates = nil
|
||||
b.capacityChangeCh = make(chan *capacityUpdate, unblockBuffer)
|
||||
|
||||
@@ -255,6 +255,62 @@ func TestBlockedEvals_UnblockUnknown(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestBlockedEvals_Reblock(t *testing.T) {
|
||||
blocked, broker := testBlockedEvals(t)
|
||||
|
||||
// Create an evaluation, Enqueue/Dequeue it to get a token
|
||||
e := mock.Eval()
|
||||
e.SnapshotIndex = 500
|
||||
e.Status = structs.EvalStatusBlocked
|
||||
e.ClassEligibility = map[string]bool{"v1:123": true, "v1:456": false}
|
||||
broker.Enqueue(e)
|
||||
|
||||
_, token, err := broker.Dequeue([]string{e.Type}, time.Second)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Reblock the evaluation
|
||||
blocked.Reblock(e, token)
|
||||
|
||||
// Verify block caused the eval to be tracked
|
||||
blockedStats := blocked.Stats()
|
||||
if blockedStats.TotalBlocked != 1 && blockedStats.TotalEscaped != 0 {
|
||||
t.Fatalf("bad: %#v", blockedStats)
|
||||
}
|
||||
|
||||
// Should unblock because the eval
|
||||
blocked.Unblock("v1:123", 1000)
|
||||
|
||||
brokerStats := broker.Stats()
|
||||
if brokerStats.TotalReady != 0 && brokerStats.TotalUnacked != 1 {
|
||||
t.Fatalf("bad: %#v", brokerStats)
|
||||
}
|
||||
|
||||
// Ack the evaluation which should cause the reblocked eval to transistion
|
||||
// to ready
|
||||
if err := broker.Ack(e.ID, token); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
// Verify Unblock causes an enqueue
|
||||
brokerStats := broker.Stats()
|
||||
if brokerStats.TotalReady != 1 {
|
||||
return false, fmt.Errorf("bad: %#v", brokerStats)
|
||||
}
|
||||
|
||||
// Verify Unblock updates the stats
|
||||
bStats := blocked.Stats()
|
||||
if bStats.TotalBlocked != 0 || bStats.TotalEscaped != 0 {
|
||||
return false, fmt.Errorf("bad: %#v", bStats)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
// Test the block case in which the eval should be immediately unblocked since
|
||||
// it is escaped and old
|
||||
func TestBlockedEvals_Block_ImmediateUnblock_Escaped(t *testing.T) {
|
||||
|
||||
@@ -135,15 +135,31 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// EnqueueAll is used to enqueue many evaluations.
|
||||
func (b *EvalBroker) EnqueueAll(evals []*structs.Evaluation) {
|
||||
// Enqueue is used to enqueue a new evaluation
|
||||
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
b.processEnqueue(eval, "")
|
||||
}
|
||||
|
||||
// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
|
||||
// that are being re-enqueued to include their token.
|
||||
//
|
||||
// When requeueing an evaluation that potentially may be already
|
||||
// enqueued. The evaluation is handled in one of the following ways:
|
||||
// * Evaluation not outstanding: Process as a normal Enqueue
|
||||
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
|
||||
// * Ack received: Unblock the evaluation allowing it to be dequeued
|
||||
// * Nack received: Drop the evaluation as it was created as a result of a
|
||||
// scheduler run that was Nack'd
|
||||
func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
|
||||
// The lock needs to be held until all evaluations are enqueued. This is so
|
||||
// that when Dequeue operations are unblocked they will pick the highest
|
||||
// priority evaluations.
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
for _, eval := range evals {
|
||||
b.processEnqueue(eval, "")
|
||||
for eval, token := range evals {
|
||||
b.processEnqueue(eval, token)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -182,26 +198,6 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
|
||||
b.enqueueLocked(eval, eval.Type)
|
||||
}
|
||||
|
||||
// Enqueue is used to enqueue an evaluation
|
||||
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
b.processEnqueue(eval, "")
|
||||
}
|
||||
|
||||
// Requeue is used to requeue an evaluation that potentially may be already
|
||||
// enqueued. The evaluation is handled in one of the following ways:
|
||||
// * Evaluation not outstanding: Process as a normal Enqueue
|
||||
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
|
||||
// * Ack received: Unblock the evaluation allowing it to be dequeued
|
||||
// * Nack received: Drop the evaluation as it was created as a result of a
|
||||
// scheduler run that was Nack'd
|
||||
func (b *EvalBroker) Requeue(eval *structs.Evaluation, token string) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
b.processEnqueue(eval, token)
|
||||
}
|
||||
|
||||
// enqueueWaiting is used to enqueue a waiting evaluation
|
||||
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
|
||||
@@ -865,12 +865,12 @@ func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// Enqueue
|
||||
evals := make([]*structs.Evaluation, 0, 8)
|
||||
evals := make(map[*structs.Evaluation]string, 8)
|
||||
expectedPriority := 90
|
||||
for i := 10; i <= expectedPriority; i += 10 {
|
||||
eval := mock.Eval()
|
||||
eval.Priority = i
|
||||
evals = append(evals, eval)
|
||||
evals[eval] = ""
|
||||
|
||||
}
|
||||
b.EnqueueAll(evals)
|
||||
@@ -886,7 +886,7 @@ func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Reenqueue_Ack(t *testing.T) {
|
||||
func TestEvalBroker_EnqueueAll_Requeue_Ack(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
b.SetEnabled(true)
|
||||
|
||||
@@ -903,7 +903,7 @@ func TestEvalBroker_Reenqueue_Ack(t *testing.T) {
|
||||
}
|
||||
|
||||
// Requeue the same evaluation.
|
||||
b.Requeue(eval, token)
|
||||
b.EnqueueAll(map[*structs.Evaluation]string{eval: token})
|
||||
|
||||
// The stats should show one unacked
|
||||
stats := b.Stats()
|
||||
@@ -942,7 +942,7 @@ func TestEvalBroker_Reenqueue_Ack(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Reenqueue_Nack(t *testing.T) {
|
||||
func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
b.SetEnabled(true)
|
||||
|
||||
@@ -959,7 +959,7 @@ func TestEvalBroker_Reenqueue_Nack(t *testing.T) {
|
||||
}
|
||||
|
||||
// Requeue the same evaluation.
|
||||
b.Requeue(eval, token)
|
||||
b.EnqueueAll(map[*structs.Evaluation]string{eval: token})
|
||||
|
||||
// The stats should show one unacked
|
||||
stats := b.Stats()
|
||||
|
||||
Reference in New Issue
Block a user