diff --git a/.changelog/15835.txt b/.changelog/15835.txt new file mode 100644 index 000000000..a2615be30 --- /dev/null +++ b/.changelog/15835.txt @@ -0,0 +1,3 @@ +```release-note:breaking-change +metrics: The metric `nomad.nomad.broker.total_blocked` has been renamed to `nomad.nomad.broker.total_pending` to reduce confusion with the `nomad.nomad.blocked_evals.total_blocked` metric. +``` diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 73591e16d..0bfcdd980 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -63,15 +63,15 @@ type EvalBroker struct { // jobEvals tracks queued evaluations by a job's ID and namespace to serialize them jobEvals map[structs.NamespacedID]string - // blocked tracks the blocked evaluations by JobID in a priority queue - blocked map[structs.NamespacedID]BlockedEvaluations + // pending tracks the pending evaluations by JobID in a priority queue + pending map[structs.NamespacedID]PendingEvaluations - // cancelable tracks previously blocked evaluations (for any job) that are + // cancelable tracks previously pending evaluations (for any job) that are // now safe for the Eval.Ack RPC to cancel in batches cancelable []*structs.Evaluation // ready tracks the ready jobs by scheduler in a priority queue - ready map[string]PendingEvaluations + ready map[string]ReadyEvaluations // unack is a map of evalID to an un-acknowledged evaluation unack map[string]*unackEval @@ -119,14 +119,14 @@ type unackEval struct { NackTimer *time.Timer } -// PendingEvaluations is a list of ready evaluations across multiple jobs. We +// ReadyEvaluations is a list of ready evaluations across multiple jobs. We +// implement the container/heap interface so that this is a priority queue. +type ReadyEvaluations []*structs.Evaluation + +// PendingEvaluations is a list of pending evaluations for a given job. We // implement the container/heap interface so that this is a priority queue. type PendingEvaluations []*structs.Evaluation -// BlockedEvaluations is a list of blocked evaluations for a given job. We -// implement the container/heap interface so that this is a priority queue. -type BlockedEvaluations []*structs.Evaluation - // NewEvalBroker creates a new evaluation broker. 
This is parameterized // with the timeout used for messages that are not acknowledged before we // assume a Nack and attempt to redeliver as well as the deliveryLimit @@ -146,9 +146,9 @@ func NewEvalBroker(timeout, initialNackDelay, subsequentNackDelay time.Duration, stats: new(BrokerStats), evals: make(map[string]int), jobEvals: make(map[structs.NamespacedID]string), - blocked: make(map[structs.NamespacedID]BlockedEvaluations), + pending: make(map[structs.NamespacedID]PendingEvaluations), cancelable: make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest), - ready: make(map[string]PendingEvaluations), + ready: make(map[string]ReadyEvaluations), unack: make(map[string]*unackEval), waiting: make(map[string]chan struct{}), requeue: make(map[string]*structs.Evaluation), @@ -292,53 +292,53 @@ func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { } // enqueueLocked is used to enqueue with the lock held -func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, queue string) { +func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation, sched string) { // Do nothing if not enabled if !b.enabled { return } - // Check if there is an evaluation for this JobID pending + // Check if there is a ready evaluation for this JobID namespacedID := structs.NamespacedID{ ID: eval.JobID, Namespace: eval.Namespace, } - pendingEval := b.jobEvals[namespacedID] - if pendingEval == "" { + readyEval := b.jobEvals[namespacedID] + if readyEval == "" { b.jobEvals[namespacedID] = eval.ID - } else if pendingEval != eval.ID { - blocked := b.blocked[namespacedID] - heap.Push(&blocked, eval) - b.blocked[namespacedID] = blocked - b.stats.TotalBlocked += 1 + } else if readyEval != eval.ID { + pending := b.pending[namespacedID] + heap.Push(&pending, eval) + b.pending[namespacedID] = pending + b.stats.TotalPending += 1 return } - // Find the pending by scheduler class - pending, ok := b.ready[queue] + // Find the next ready eval by scheduler class + readyQueue, ok := b.ready[sched] if !ok { - pending = make([]*structs.Evaluation, 0, 16) - if _, ok := b.waiting[queue]; !ok { - b.waiting[queue] = make(chan struct{}, 1) + readyQueue = make([]*structs.Evaluation, 0, 16) + if _, ok := b.waiting[sched]; !ok { + b.waiting[sched] = make(chan struct{}, 1) } } // Push onto the heap - heap.Push(&pending, eval) - b.ready[queue] = pending + heap.Push(&readyQueue, eval) + b.ready[sched] = readyQueue // Update the stats b.stats.TotalReady += 1 - bySched, ok := b.stats.ByScheduler[queue] + bySched, ok := b.stats.ByScheduler[sched] if !ok { bySched = &SchedulerStats{} - b.stats.ByScheduler[queue] = bySched + b.stats.ByScheduler[sched] = bySched } bySched.Ready += 1 - // Unblock any blocked dequeues + // Unblock any pending dequeues select { - case b.waiting[queue] <- struct{}{}: + case b.waiting[sched] <- struct{}{}: default: } } @@ -398,14 +398,14 @@ func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation var eligibleSched []string var eligiblePriority int for _, sched := range schedulers { - // Get the pending queue - pending, ok := b.ready[sched] + // Get the ready queue for this scheduler + readyQueue, ok := b.ready[sched] if !ok { continue } // Peek at the next item - ready := pending.Peek() + ready := readyQueue.Peek() if ready == nil { continue } @@ -444,10 +444,9 @@ func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation // dequeueForSched is used to dequeue the next work item for a given scheduler. 
// This assumes locks are held and that this scheduler has work func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, string, error) { - // Get the pending queue - pending := b.ready[sched] - raw := heap.Pop(&pending) - b.ready[sched] = pending + readyQueue := b.ready[sched] + raw := heap.Pop(&readyQueue) + b.ready[sched] = readyQueue eval := raw.(*structs.Evaluation) // Generate a UUID for the token @@ -592,29 +591,29 @@ func (b *EvalBroker) Ack(evalID, token string) error { } delete(b.jobEvals, namespacedID) - // Check if there are any blocked evaluations - if blocked := b.blocked[namespacedID]; len(blocked) != 0 { + // Check if there are any pending evaluations + if pending := b.pending[namespacedID]; len(pending) != 0 { - // Any blocked evaluations with ModifyIndexes older than the just-ack'd + // Any pending evaluations with ModifyIndexes older than the just-ack'd // evaluation are no longer useful, so it's safe to drop them. - cancelable := blocked.MarkForCancel() + cancelable := pending.MarkForCancel() b.cancelable = append(b.cancelable, cancelable...) b.stats.TotalCancelable = len(b.cancelable) - b.stats.TotalBlocked -= len(cancelable) + b.stats.TotalPending -= len(cancelable) // If any remain, enqueue an eval - if len(blocked) > 0 { - raw := heap.Pop(&blocked) + if len(pending) > 0 { + raw := heap.Pop(&pending) eval := raw.(*structs.Evaluation) - b.stats.TotalBlocked -= 1 + b.stats.TotalPending -= 1 b.enqueueLocked(eval, eval.Type) } // Clean up if there are no more after that - if len(blocked) > 0 { - b.blocked[namespacedID] = blocked + if len(pending) > 0 { + b.pending[namespacedID] = pending } else { - delete(b.blocked, namespacedID) + delete(b.pending, namespacedID) } } @@ -752,16 +751,16 @@ func (b *EvalBroker) flush() { // Reset the broker b.stats.TotalReady = 0 b.stats.TotalUnacked = 0 - b.stats.TotalBlocked = 0 + b.stats.TotalPending = 0 b.stats.TotalWaiting = 0 b.stats.TotalCancelable = 0 b.stats.DelayedEvals = make(map[string]*structs.Evaluation) b.stats.ByScheduler = make(map[string]*SchedulerStats) b.evals = make(map[string]int) b.jobEvals = make(map[structs.NamespacedID]string) - b.blocked = make(map[structs.NamespacedID]BlockedEvaluations) + b.pending = make(map[structs.NamespacedID]PendingEvaluations) b.cancelable = make([]*structs.Evaluation, 0, structs.MaxUUIDsPerWriteRequest) - b.ready = make(map[string]PendingEvaluations) + b.ready = make(map[string]ReadyEvaluations) b.unack = make(map[string]*unackEval) b.timeWait = make(map[string]*time.Timer) b.delayHeap = delayheap.NewDelayHeap() @@ -784,8 +783,8 @@ func (d *evalWrapper) Namespace() string { return d.eval.Namespace } -// runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for -// pending evaluations before enqueuing them +// runDelayedEvalsWatcher is a long-lived function that waits till a time +// deadline is met for pending evaluations before enqueuing them func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan struct{}) { var timerChannel <-chan time.Time var delayTimer *time.Timer @@ -851,7 +850,7 @@ func (b *EvalBroker) Stats() *BrokerStats { // Copy all the stats stats.TotalReady = b.stats.TotalReady stats.TotalUnacked = b.stats.TotalUnacked - stats.TotalBlocked = b.stats.TotalBlocked + stats.TotalPending = b.stats.TotalPending stats.TotalWaiting = b.stats.TotalWaiting stats.TotalCancelable = b.stats.TotalCancelable for id, eval := range b.stats.DelayedEvals { @@ -865,7 +864,7 @@ func (b *EvalBroker) Stats() *BrokerStats { 
return stats } -// Cancelable retrieves a batch of previously-blocked evaluations that are now +// Cancelable retrieves a batch of previously-pending evaluations that are now // stale and ready to mark for canceling. The eval RPC will call this with a // batch size set to avoid sending overly large raft messages. func (b *EvalBroker) Cancelable(batchSize int) []*structs.Evaluation { @@ -896,7 +895,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { stats := b.Stats() metrics.SetGauge([]string{"nomad", "broker", "total_ready"}, float32(stats.TotalReady)) metrics.SetGauge([]string{"nomad", "broker", "total_unacked"}, float32(stats.TotalUnacked)) - metrics.SetGauge([]string{"nomad", "broker", "total_blocked"}, float32(stats.TotalBlocked)) + metrics.SetGauge([]string{"nomad", "broker", "total_pending"}, float32(stats.TotalPending)) metrics.SetGauge([]string{"nomad", "broker", "total_waiting"}, float32(stats.TotalWaiting)) metrics.SetGauge([]string{"nomad", "broker", "total_cancelable"}, float32(stats.TotalCancelable)) for _, eval := range stats.DelayedEvals { @@ -923,7 +922,7 @@ func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { type BrokerStats struct { TotalReady int TotalUnacked int - TotalBlocked int + TotalPending int TotalWaiting int TotalCancelable int DelayedEvals map[string]*structs.Evaluation @@ -936,6 +935,49 @@ type SchedulerStats struct { Unacked int } +// Len is for the sorting interface +func (r ReadyEvaluations) Len() int { + return len(r) +} + +// Less is for the sorting interface. We flip the check +// so that the "min" in the min-heap is the element with the +// highest priority +func (r ReadyEvaluations) Less(i, j int) bool { + if r[i].JobID != r[j].JobID && r[i].Priority != r[j].Priority { + return !(r[i].Priority < r[j].Priority) + } + return r[i].CreateIndex < r[j].CreateIndex +} + +// Swap is for the sorting interface +func (r ReadyEvaluations) Swap(i, j int) { + r[i], r[j] = r[j], r[i] +} + +// Push is used to add a new evaluation to the slice +func (r *ReadyEvaluations) Push(e interface{}) { + *r = append(*r, e.(*structs.Evaluation)) +} + +// Pop is used to remove an evaluation from the slice +func (r *ReadyEvaluations) Pop() interface{} { + n := len(*r) + e := (*r)[n-1] + (*r)[n-1] = nil + *r = (*r)[:n-1] + return e +} + +// Peek is used to peek at the next element that would be popped +func (r ReadyEvaluations) Peek() *structs.Evaluation { + n := len(r) + if n == 0 { + return nil + } + return r[n-1] +} + // Len is for the sorting interface func (p PendingEvaluations) Len() int { return len(p) @@ -943,12 +985,12 @@ func (p PendingEvaluations) Len() int { // Less is for the sorting interface. 
We flip the check // so that the "min" in the min-heap is the element with the -// highest priority +// highest priority or highest modify index func (p PendingEvaluations) Less(i, j int) bool { - if p[i].JobID != p[j].JobID && p[i].Priority != p[j].Priority { + if p[i].Priority != p[j].Priority { return !(p[i].Priority < p[j].Priority) } - return p[i].CreateIndex < p[j].CreateIndex + return !(p[i].ModifyIndex < p[j].ModifyIndex) } // Swap is for the sorting interface @@ -956,12 +998,12 @@ func (p PendingEvaluations) Swap(i, j int) { p[i], p[j] = p[j], p[i] } -// Push is used to add a new evaluation to the slice +// Push implements the heap interface and is used to add a new evaluation to the slice func (p *PendingEvaluations) Push(e interface{}) { *p = append(*p, e.(*structs.Evaluation)) } -// Pop is used to remove an evaluation from the slice +// Pop implements the heap interface and is used to remove an evaluation from the slice func (p *PendingEvaluations) Pop() interface{} { n := len(*p) e := (*p)[n-1] @@ -970,60 +1012,17 @@ func (p *PendingEvaluations) Pop() interface{} { return e } -// Peek is used to peek at the next element that would be popped -func (p PendingEvaluations) Peek() *structs.Evaluation { - n := len(p) - if n == 0 { - return nil - } - return p[n-1] -} - -// Len is for the sorting interface -func (p BlockedEvaluations) Len() int { - return len(p) -} - -// Less is for the sorting interface. We flip the check -// so that the "min" in the min-heap is the element with the -// highest priority or highest modify index -func (p BlockedEvaluations) Less(i, j int) bool { - if p[i].Priority != p[j].Priority { - return !(p[i].Priority < p[j].Priority) - } - return !(p[i].ModifyIndex < p[j].ModifyIndex) -} - -// Swap is for the sorting interface -func (p BlockedEvaluations) Swap(i, j int) { - p[i], p[j] = p[j], p[i] -} - -// Push implements the heap interface and is used to add a new evaluation to the slice -func (p *BlockedEvaluations) Push(e interface{}) { - *p = append(*p, e.(*structs.Evaluation)) -} - -// Pop implements the heap interface and is used to remove an evaluation from the slice -func (p *BlockedEvaluations) Pop() interface{} { - n := len(*p) - e := (*p)[n-1] - (*p)[n-1] = nil - *p = (*p)[:n-1] - return e -} - -// MarkForCancel is used to clear the blocked list of all but the one with the +// MarkForCancel is used to clear the pending list of all but the one with the // highest modify index and highest priority. It returns a slice of cancelable // evals so that Eval.Ack RPCs can write batched raft entries to cancel // them. This must be called inside the broker's lock. -func (p *BlockedEvaluations) MarkForCancel() []*structs.Evaluation { +func (p *PendingEvaluations) MarkForCancel() []*structs.Evaluation { - // In pathological cases, we can have a large number of blocked evals but + // In pathological cases, we can have a large number of pending evals but // will want to cancel most of them. Using heap.Remove requires we re-sort // for each eval we remove. Because we expect to have at most one remaining, // we'll just create a new heap. 
- retain := BlockedEvaluations{(heap.Pop(p)).(*structs.Evaluation)} + retain := PendingEvaluations{(heap.Pop(p)).(*structs.Evaluation)} cancelable := make([]*structs.Evaluation, len(*p)) copy(cancelable, *p) diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index c46f8bf4a..897eee379 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -434,7 +434,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, - TotalBlocked: 5, TotalCancelable: 0}, getStats()) + TotalPending: 5, TotalCancelable: 0}, getStats()) // Dequeue should get 1st eval out, token, err := b.Dequeue(defaultSched, time.Second) @@ -442,7 +442,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { must.Eq(t, out, eval1, must.Sprint("expected 1st eval")) must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, - TotalBlocked: 5, TotalCancelable: 0}, getStats()) + TotalPending: 5, TotalCancelable: 0}, getStats()) // Current wait index should be 4 but Ack to exercise behavior // when worker's Eval.getWaitIndex gets a stale index @@ -450,25 +450,25 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { must.NoError(t, err) must.Eq(t, BrokerStats{TotalReady: 2, TotalUnacked: 0, - TotalBlocked: 2, TotalCancelable: 2}, getStats()) + TotalPending: 2, TotalCancelable: 2}, getStats()) // eval4 and eval5 are ready - // eval6 and eval7 are blocked + // eval6 and eval7 are pending // Dequeue should get 4th eval out, token, err = b.Dequeue(defaultSched, time.Second) must.NoError(t, err) must.Eq(t, out, eval4, must.Sprint("expected 4th eval")) must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 1, - TotalBlocked: 2, TotalCancelable: 2}, getStats()) + TotalPending: 2, TotalCancelable: 2}, getStats()) - // Ack should clear the rest of namespace-one blocked but leave + // Ack should clear the rest of namespace-one pending but leave // namespace-two untouched err = b.Ack(eval4.ID, token) must.NoError(t, err) must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, - TotalBlocked: 2, TotalCancelable: 2}, getStats()) + TotalPending: 2, TotalCancelable: 2}, getStats()) // Dequeue should get 5th eval out, token, err = b.Dequeue(defaultSched, time.Second) @@ -476,14 +476,14 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { must.Eq(t, out, eval5, must.Sprint("expected 5th eval")) must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, - TotalBlocked: 2, TotalCancelable: 2}, getStats()) + TotalPending: 2, TotalCancelable: 2}, getStats()) - // Ack should clear remaining namespace-two blocked evals + // Ack should clear remaining namespace-two pending evals err = b.Ack(eval5.ID, token) must.NoError(t, err) must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, - TotalBlocked: 0, TotalCancelable: 3}, getStats()) + TotalPending: 0, TotalCancelable: 3}, getStats()) // Dequeue should get 7th eval because that's all that's left out, token, err = b.Dequeue(defaultSched, time.Second) @@ -491,14 +491,14 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { must.Eq(t, out, eval7, must.Sprint("expected 7th eval")) must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 1, - TotalBlocked: 0, TotalCancelable: 3}, getStats()) + TotalPending: 0, TotalCancelable: 3}, getStats()) // Last ack should leave the broker empty except for cancels err = b.Ack(eval7.ID, token) must.NoError(t, err) must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, - TotalBlocked: 0, TotalCancelable: 3}, getStats()) + TotalPending: 0, TotalCancelable: 3}, 
getStats()) } func TestEvalBroker_Enqueue_Disable(t *testing.T) { @@ -553,7 +553,7 @@ func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) { stats := b.Stats() require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed") require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed") - require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed") + require.Equal(t, 0, stats.TotalPending, "Expected pending to be flushed") require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed") _, ok := stats.ByScheduler[baseEval.Type] require.False(t, ok, "Expected scheduler to have no stats") @@ -577,7 +577,7 @@ func TestEvalBroker_Enqueue_Disable_Delay(t *testing.T) { stats := b.Stats() require.Equal(t, 0, stats.TotalReady, "Expected ready to be flushed") require.Equal(t, 0, stats.TotalWaiting, "Expected waiting to be flushed") - require.Equal(t, 0, stats.TotalBlocked, "Expected blocked to be flushed") + require.Equal(t, 0, stats.TotalPending, "Expected pending to be flushed") require.Equal(t, 0, stats.TotalUnacked, "Expected unacked to be flushed") _, ok := stats.ByScheduler[baseEval.Type] require.False(t, ok, "Expected scheduler to have no stats") @@ -1379,13 +1379,13 @@ func TestEvalBroker_NamespacedJobs(t *testing.T) { require.Nil(err) require.Nil(out3) - require.Equal(1, len(b.blocked)) + require.Equal(1, len(b.pending)) } -func TestEvalBroker_PendingEvals_Ordering(t *testing.T) { +func TestEvalBroker_ReadyEvals_Ordering(t *testing.T) { - ready := PendingEvaluations{} + ready := ReadyEvaluations{} newEval := func(jobID, evalID string, priority int, index uint64) *structs.Evaluation { eval := mock.Eval() @@ -1418,8 +1418,8 @@ func TestEvalBroker_PendingEvals_Ordering(t *testing.T) { } -func TestEvalBroker_BlockedEval_Ordering(t *testing.T) { - blocked := BlockedEvaluations{} +func TestEvalBroker_PendingEval_Ordering(t *testing.T) { + pending := PendingEvaluations{} newEval := func(evalID string, priority int, index uint64) *structs.Evaluation { eval := mock.Eval() @@ -1431,29 +1431,29 @@ func TestEvalBroker_BlockedEval_Ordering(t *testing.T) { // note: we're intentionally pushing these out-of-order to assert we're // getting them back out in the intended order and not just as inserted - heap.Push(&blocked, newEval("eval03", 50, 3)) - heap.Push(&blocked, newEval("eval02", 100, 2)) - heap.Push(&blocked, newEval("eval01", 50, 1)) + heap.Push(&pending, newEval("eval03", 50, 3)) + heap.Push(&pending, newEval("eval02", 100, 2)) + heap.Push(&pending, newEval("eval01", 50, 1)) - unblocked := heap.Pop(&blocked).(*structs.Evaluation) - test.Eq(t, "eval02", unblocked.ID, - test.Sprint("expected eval with highest priority to get unblocked")) + next := heap.Pop(&pending).(*structs.Evaluation) + test.Eq(t, "eval02", next.ID, + test.Sprint("expected eval with highest priority to be next")) - unblocked = heap.Pop(&blocked).(*structs.Evaluation) - test.Eq(t, "eval03", unblocked.ID, - test.Sprint("expected eval with highest modify index to get unblocked")) + next = heap.Pop(&pending).(*structs.Evaluation) + test.Eq(t, "eval03", next.ID, + test.Sprint("expected eval with highest modify index to be next")) - heap.Push(&blocked, newEval("eval04", 30, 4)) - unblocked = heap.Pop(&blocked).(*structs.Evaluation) - test.Eq(t, "eval01", unblocked.ID, - test.Sprint("expected eval with highest priority to get unblocked")) + heap.Push(&pending, newEval("eval04", 30, 4)) + next = heap.Pop(&pending).(*structs.Evaluation) + test.Eq(t, "eval01", next.ID, + test.Sprint("expected eval 
with highest priority to be next")) } -func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { +func TestEvalBroker_PendingEvals_MarkForCancel(t *testing.T) { ci.Parallel(t) - blocked := BlockedEvaluations{} + pending := PendingEvaluations{} // note: we're intentionally pushing these out-of-order to assert we're // getting them back out in the intended order and not just as inserted @@ -1462,14 +1462,14 @@ func TestEvalBroker_BlockedEvals_MarkForCancel(t *testing.T) { eval.JobID = "example" eval.CreateIndex = uint64(i) eval.ModifyIndex = uint64(i) - heap.Push(&blocked, eval) + heap.Push(&pending, eval) } - canceled := blocked.MarkForCancel() + canceled := pending.MarkForCancel() must.Eq(t, 9, len(canceled)) - must.Eq(t, 1, blocked.Len()) + must.Eq(t, 1, pending.Len()) - raw := heap.Pop(&blocked) + raw := heap.Pop(&pending) must.NotNil(t, raw) eval := raw.(*structs.Evaluation) must.Eq(t, 100, eval.ModifyIndex) @@ -1560,7 +1560,7 @@ func TestEvalBroker_IntegrationTest(t *testing.T) { must.Eq(t, map[string]int{structs.EvalStatusPending: 11}, getEvalStatuses()) must.Eq(t, BrokerStats{TotalReady: 1, TotalUnacked: 0, - TotalBlocked: 10, TotalCancelable: 0}, getStats()) + TotalPending: 10, TotalCancelable: 0}, getStats()) // start schedulers: all the evals are for a single job so there should only // be one eval processesed at a time no matter how many schedulers we run @@ -1579,5 +1579,5 @@ func TestEvalBroker_IntegrationTest(t *testing.T) { }, 2*time.Second, time.Millisecond*100) must.Eq(t, BrokerStats{TotalReady: 0, TotalUnacked: 0, - TotalBlocked: 0, TotalCancelable: 0}, getStats()) + TotalPending: 0, TotalCancelable: 0}, getStats()) } diff --git a/website/content/docs/operations/metrics-reference.mdx b/website/content/docs/operations/metrics-reference.mdx index 05e1e33c0..69d1a7eda 100644 --- a/website/content/docs/operations/metrics-reference.mdx +++ b/website/content/docs/operations/metrics-reference.mdx @@ -30,7 +30,7 @@ configuration](/docs/configuration/telemetry). Below is sample output of a telemetry dump: ```text -[2015-09-17 16:59:40 -0700 PDT][G] 'nomad.nomad.broker.total_blocked': 0.000 +[2015-09-17 16:59:40 -0700 PDT][G] 'nomad.nomad.broker.total_pending': 0.000 [2015-09-17 16:59:40 -0700 PDT][G] 'nomad.nomad.plan.queue_depth': 0.000 [2015-09-17 16:59:40 -0700 PDT][G] 'nomad.runtime.malloc_count': 7568.000 [2015-09-17 16:59:40 -0700 PDT][G] 'nomad.runtime.total_gc_runs': 8.000 @@ -97,7 +97,7 @@ signals. | `nomad.runtime.alloc_bytes` | Memory utilization | # of bytes | Gauge | | `nomad.runtime.heap_objects` | Number of objects on the heap. General memory pressure indicator | # of heap objects | Gauge | | `nomad.runtime.num_goroutines` | Number of goroutines and general load pressure indicator | # of goroutines | Gauge | -| `nomad.nomad.broker.total_blocked` | Evaluations that are blocked until an existing evaluation for the same job completes | # of evaluations | Gauge | +| `nomad.nomad.broker.total_pending` | Evaluations that are pending until an existing evaluation for the same job completes | # of evaluations | Gauge | | `nomad.nomad.broker.total_ready` | Number of evaluations ready to be processed | # of evaluations | Gauge | | `nomad.nomad.broker.total_unacked` | Evaluations dispatched for processing but incomplete | # of evaluations | Gauge | | `nomad.nomad.heartbeat.active` | Number of active heartbeat timers. 
Each timer represents a Nomad Client connection | # of heartbeat timers | Gauge | diff --git a/website/content/docs/operations/monitoring-nomad.mdx b/website/content/docs/operations/monitoring-nomad.mdx index aafc0c443..bc899432d 100644 --- a/website/content/docs/operations/monitoring-nomad.mdx +++ b/website/content/docs/operations/monitoring-nomad.mdx @@ -187,16 +187,14 @@ points in the scheduling process. evaluation at a time, entirely in-memory. If this metric increases, examine the CPU and memory resources of the scheduler. -- **nomad.broker.total_blocked** - The number of blocked +- **nomad.blocked_evals.total_blocked** - The number of blocked evaluations. Blocked evaluations are created when the scheduler cannot place all allocations as part of a plan. Blocked evaluations will be re-evaluated so that changes in cluster resources can be used for the blocked evaluation's allocations. An increase in blocked evaluations may mean that the cluster's clients are low in resources or that job have been submitted that can never have all - their allocations placed. Nomad also emits a similar metric for each - individual scheduler. For example `nomad.broker.batch_blocked` shows - the number of blocked evaluations for the batch scheduler. + their allocations placed. - **nomad.broker.total_unacked** - The number of unacknowledged evaluations. When an evaluation has been processed, the worker sends @@ -211,6 +209,12 @@ points in the scheduling process. shows the number of unacknowledged evaluations for the batch scheduler. +- **nomad.broker.total_pending** - The number of pending evaluations in the eval + broker. Nomad processes only one evaluation for a given job concurrently. When + an unacked evaluation is acknowledged, Nomad will discard all but the latest + evaluation for a job. An increase in this metric may mean that the cluster + state is changing more rapidly than the schedulers can keep up with. + - **nomad.plan.evaluate** - The time to evaluate a scheduler plan submitted by a worker. This operation happens on the leader to serialize the plans of all the scheduler workers. This happens diff --git a/website/content/docs/upgrade/upgrade-specific.mdx b/website/content/docs/upgrade/upgrade-specific.mdx index f79271920..eb564c9c5 100644 --- a/website/content/docs/upgrade/upgrade-specific.mdx +++ b/website/content/docs/upgrade/upgrade-specific.mdx @@ -64,15 +64,25 @@ setting [`disable_filesystem_isolation`][artifact_fs_isolation]. #### Server `rejoin_after_leave` (default: `false`) now enforced -All Nomad versions prior to v1.5.0 have incorrectly ignored the Server [`rejoin_after_leave`] -configuration option. This bug has been fixed in Nomad version v1.5.0. +All Nomad versions prior to v1.5.0 have incorrectly ignored the Server +[`rejoin_after_leave`] configuration option. This bug has been fixed in Nomad +version v1.5.0. -Previous to v1.5.0 the behavior of Nomad `rejoin_after_leave` was always `true`, regardless of -Nomad server configuration, while the documentation incorrectly indicated a default of `false`. +Previous to v1.5.0 the behavior of Nomad `rejoin_after_leave` was always `true`, +regardless of Nomad server configuration, while the documentation incorrectly +indicated a default of `false`. -Cluster operators should be aware that explicit `leave` events (such as `nomad server force-leave`) -will now result in behavior which matches this configuration, and should review whether they -were inadvertently relying on the buggy behavior. 
+Cluster operators should be aware that explicit `leave` events (such as `nomad +server force-leave`) will now result in behavior which matches this +configuration, and should review whether they were inadvertently relying on the +buggy behavior. + +#### Changes to eval broker metrics + +The metric `nomad.nomad.broker.total_blocked` has been renamed to +`nomad.nomad.broker.total_pending`. The previous name referred to internal +state of the leader's eval broker and was easily confused with the unrelated +evaluation status `"blocked"` in the Nomad API. ## Nomad 1.4.0
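As a side note for reviewers, the ordering that the renamed `PendingEvaluations` heap (and `TestEvalBroker_PendingEval_Ordering` above) relies on is easy to check in isolation. Below is a minimal standalone sketch, not the Nomad source: it uses a simplified `eval` struct and a `pendingEvals` slice (both hypothetical names) to show how flipping the `Less` comparisons makes `container/heap`'s min-heap pop the highest-priority evaluation first, breaking ties on the newest `ModifyIndex`.

```go
// Standalone sketch (hypothetical types, not nomad/structs): a
// PendingEvaluations-style priority queue built on container/heap.
package main

import (
	"container/heap"
	"fmt"
)

// eval carries only the fields the comparison needs.
type eval struct {
	ID          string
	Priority    int
	ModifyIndex uint64
}

// pendingEvals mimics PendingEvaluations: pending evals for a single job.
type pendingEvals []*eval

func (p pendingEvals) Len() int { return len(p) }

// Less flips the comparisons so the heap's "min" is the eval with the
// highest priority and, among equal priorities, the newest modify index.
func (p pendingEvals) Less(i, j int) bool {
	if p[i].Priority != p[j].Priority {
		return p[i].Priority > p[j].Priority
	}
	return p[i].ModifyIndex > p[j].ModifyIndex
}

func (p pendingEvals) Swap(i, j int) { p[i], p[j] = p[j], p[i] }

func (p *pendingEvals) Push(x interface{}) { *p = append(*p, x.(*eval)) }

func (p *pendingEvals) Pop() interface{} {
	n := len(*p)
	e := (*p)[n-1]
	*p = (*p)[:n-1]
	return e
}

func main() {
	pending := pendingEvals{}
	heap.Push(&pending, &eval{ID: "eval03", Priority: 50, ModifyIndex: 3})
	heap.Push(&pending, &eval{ID: "eval02", Priority: 100, ModifyIndex: 2})
	heap.Push(&pending, &eval{ID: "eval01", Priority: 50, ModifyIndex: 1})

	// Pops come out highest-priority first, ties broken by newest
	// ModifyIndex: eval02, then eval03, then eval01.
	for pending.Len() > 0 {
		fmt.Println(heap.Pop(&pending).(*eval).ID)
	}
}
```

The same trick drives `ReadyEvaluations` in the diff, except ties there are broken on the oldest `CreateIndex`, so ready evaluations of equal priority dequeue in FIFO order.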