mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Remove requeue because it is a subset of EnqueueAll now
This commit is contained in:
@@ -107,8 +107,8 @@ func (b *BlockedEvals) Enabled() bool {
|
||||
return b.enabled
|
||||
}
|
||||
|
||||
// SetEnabled is used to control if the broker is enabled. The broker
|
||||
// should only be enabled on the active leader.
|
||||
// SetEnabled is used to control if the blocked eval tracker is enabled. The
|
||||
// tracker should only be enabled on the active leader.
|
||||
func (b *BlockedEvals) SetEnabled(enabled bool) {
|
||||
b.l.Lock()
|
||||
if b.enabled == enabled {
|
||||
@@ -177,7 +177,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
|
||||
// Just re-enqueue the eval immediately. We pass the token so that the
|
||||
// eval_broker can properly handle the case in which the evaluation is
|
||||
// still outstanding.
|
||||
b.evalBroker.Requeue(eval, token)
|
||||
b.evalBroker.EnqueueAll(map[*structs.Evaluation]string{eval: token})
|
||||
return
|
||||
}
|
||||
|
||||
@@ -185,6 +185,7 @@ func (b *BlockedEvals) processBlock(eval *structs.Evaluation, token string) {
|
||||
b.stats.TotalBlocked++
|
||||
b.jobs[eval.JobID] = struct{}{}
|
||||
|
||||
// Wrap the evaluation, capturing its token.
|
||||
wrapped := wrappedEval{
|
||||
eval: eval,
|
||||
token: token,
|
||||
|
||||
@@ -135,8 +135,23 @@ func (b *EvalBroker) SetEnabled(enabled bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// Enqueue is used to enqueue a new evaluation
|
||||
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
b.processEnqueue(eval, "")
|
||||
}
|
||||
|
||||
// EnqueueAll is used to enqueue many evaluations. The map allows evaluations
|
||||
// that are being re-enqueued to include their token.
|
||||
//
|
||||
// When requeueing an evaluation that potentially may be already
|
||||
// enqueued. The evaluation is handled in one of the following ways:
|
||||
// * Evaluation not outstanding: Process as a normal Enqueue
|
||||
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
|
||||
// * Ack received: Unblock the evaluation allowing it to be dequeued
|
||||
// * Nack received: Drop the evaluation as it was created as a result of a
|
||||
// scheduler run that was Nack'd
|
||||
func (b *EvalBroker) EnqueueAll(evals map[*structs.Evaluation]string) {
|
||||
// The lock needs to be held until all evaluations are enqueued. This is so
|
||||
// that when Dequeue operations are unblocked they will pick the highest
|
||||
@@ -183,26 +198,6 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
|
||||
b.enqueueLocked(eval, eval.Type)
|
||||
}
|
||||
|
||||
// Enqueue is used to enqueue an evaluation
|
||||
func (b *EvalBroker) Enqueue(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
b.processEnqueue(eval, "")
|
||||
}
|
||||
|
||||
// Requeue is used to requeue an evaluation that potentially may be already
|
||||
// enqueued. The evaluation is handled in one of the following ways:
|
||||
// * Evaluation not outstanding: Process as a normal Enqueue
|
||||
// * Evaluation outstanding: Do not allow the evaluation to be dequeued til:
|
||||
// * Ack received: Unblock the evaluation allowing it to be dequeued
|
||||
// * Nack received: Drop the evaluation as it was created as a result of a
|
||||
// scheduler run that was Nack'd
|
||||
func (b *EvalBroker) Requeue(eval *structs.Evaluation, token string) {
|
||||
b.l.Lock()
|
||||
defer b.l.Unlock()
|
||||
b.processEnqueue(eval, token)
|
||||
}
|
||||
|
||||
// enqueueWaiting is used to enqueue a waiting evaluation
|
||||
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
|
||||
b.l.Lock()
|
||||
|
||||
@@ -865,12 +865,12 @@ func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// Enqueue
|
||||
evals := make([]*structs.Evaluation, 0, 8)
|
||||
evals := make(map[*structs.Evaluation]string, 8)
|
||||
expectedPriority := 90
|
||||
for i := 10; i <= expectedPriority; i += 10 {
|
||||
eval := mock.Eval()
|
||||
eval.Priority = i
|
||||
evals = append(evals, eval)
|
||||
evals[eval] = ""
|
||||
|
||||
}
|
||||
b.EnqueueAll(evals)
|
||||
@@ -886,7 +886,7 @@ func TestEvalBroker_EnqueueAll_Dequeue_Fair(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Reenqueue_Ack(t *testing.T) {
|
||||
func TestEvalBroker_EnqueueAll_Requeue_Ack(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
b.SetEnabled(true)
|
||||
|
||||
@@ -903,7 +903,7 @@ func TestEvalBroker_Reenqueue_Ack(t *testing.T) {
|
||||
}
|
||||
|
||||
// Requeue the same evaluation.
|
||||
b.Requeue(eval, token)
|
||||
b.EnqueueAll(map[*structs.Evaluation]string{eval: token})
|
||||
|
||||
// The stats should show one unacked
|
||||
stats := b.Stats()
|
||||
@@ -942,7 +942,7 @@ func TestEvalBroker_Reenqueue_Ack(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestEvalBroker_Reenqueue_Nack(t *testing.T) {
|
||||
func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
|
||||
b := testBroker(t, 0)
|
||||
b.SetEnabled(true)
|
||||
|
||||
@@ -959,7 +959,7 @@ func TestEvalBroker_Reenqueue_Nack(t *testing.T) {
|
||||
}
|
||||
|
||||
// Requeue the same evaluation.
|
||||
b.Requeue(eval, token)
|
||||
b.EnqueueAll(map[*structs.Evaluation]string{eval: token})
|
||||
|
||||
// The stats should show one unacked
|
||||
stats := b.Stats()
|
||||
|
||||
Reference in New Issue
Block a user