From b3b2c4d540b338ef41511839034f66e740815579 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 12 Aug 2015 15:25:31 -0700 Subject: [PATCH] nomad: avoid split-brain eval handling after leader transition --- nomad/eval_broker.go | 47 ++++++++++++++++--------- nomad/eval_broker_test.go | 69 ++++++++++++++++++++++++------------- nomad/eval_endpoint.go | 13 +++---- nomad/eval_endpoint_test.go | 26 ++++++++------ nomad/structs/structs.go | 14 ++++++++ nomad/worker.go | 25 +++++++------- nomad/worker_test.go | 15 ++++---- 7 files changed, 136 insertions(+), 73 deletions(-) diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 02dabb171..ad8a5f209 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -51,6 +51,7 @@ type EvalBroker struct { // unackEval tracks an unacknowledged evaluation along with the Nack timer type unackEval struct { Eval *structs.Evaluation + Token string NackTimer *time.Timer } @@ -164,16 +165,16 @@ func (b *EvalBroker) enqueueLocked(eval *structs.Evaluation) error { } // Dequeue is used to perform a blocking dequeue -func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, error) { +func (b *EvalBroker) Dequeue(schedulers []string, timeout time.Duration) (*structs.Evaluation, string, error) { var timeoutTimer *time.Timer SCAN: // Scan for work - eval, err := b.scanForSchedulers(schedulers) + eval, token, err := b.scanForSchedulers(schedulers) if err != nil { if timeoutTimer != nil { timeoutTimer.Stop() } - return nil, err + return nil, "", err } // Check if we have something @@ -181,7 +182,7 @@ SCAN: if timeoutTimer != nil { timeoutTimer.Stop() } - return eval, nil + return eval, token, nil } // Setup the timeout channel the first time around @@ -194,18 +195,18 @@ SCAN: if scan { goto SCAN } - return nil, nil + return nil, "", nil } // scanForSchedulers scans for work on any of the schedulers. The highest priority work // is dequeued first. This may return nothing if there is no work waiting. -func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, error) { +func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation, string, error) { b.l.Lock() defer b.l.Unlock() // Do nothing if not enabled if !b.enabled { - return nil, fmt.Errorf("eval broker disabled") + return nil, "", fmt.Errorf("eval broker disabled") } // Scan for eligible work @@ -241,7 +242,7 @@ func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation switch n := len(eligibleSched); n { case 0: // No work to do! - return nil, nil + return nil, "", nil case 1: // Only a single task, dequeue @@ -257,21 +258,25 @@ func (b *EvalBroker) scanForSchedulers(schedulers []string) (*structs.Evaluation // dequeueForSched is used to dequeue the next work item for a given scheduler. // This assumes locks are held and that this scheduler has work -func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, error) { +func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, string, error) { // Get the pending queue pending := b.ready[sched] raw := heap.Pop(&pending) b.ready[sched] = pending eval := raw.(*structs.Evaluation) + // Generate a UUID for the token + token := generateUUID() + // Setup Nack timer nackTimer := time.AfterFunc(b.nackTimeout, func() { - b.Nack(eval.ID) + b.Nack(eval.ID, token) }) // Add to the unack queue b.unack[eval.ID] = &unackEval{ Eval: eval, + Token: token, NackTimer: nackTimer, } @@ -282,7 +287,7 @@ func (b *EvalBroker) dequeueForSched(sched string) (*structs.Evaluation, error) bySched.Ready -= 1 bySched.Unacked += 1 - return eval, nil + return eval, token, nil } // waitForSchedulers is used to wait for work on any of the scheduler or until a timeout. @@ -327,15 +332,19 @@ func (b *EvalBroker) waitForSchedulers(schedulers []string, timeoutCh <-chan tim } // Outstanding checks if an EvalID has been delivered but not acknowledged -func (b *EvalBroker) Outstanding(evalID string) bool { +// and returns the associated token for the evaluation. +func (b *EvalBroker) Outstanding(evalID string) (string, bool) { b.l.RLock() defer b.l.RUnlock() - _, ok := b.unack[evalID] - return ok + unack, ok := b.unack[evalID] + if !ok { + return "", false + } + return unack.Token, true } // Ack is used to positively acknowledge handling an evaluation -func (b *EvalBroker) Ack(evalID string) error { +func (b *EvalBroker) Ack(evalID, token string) error { b.l.Lock() defer b.l.Unlock() @@ -344,6 +353,9 @@ func (b *EvalBroker) Ack(evalID string) error { if !ok { return fmt.Errorf("Evaluation ID not found") } + if unack.Token != token { + return fmt.Errorf("Token does not match for Evaluation ID") + } jobID := unack.Eval.JobID // Ensure we were able to stop the timer @@ -377,7 +389,7 @@ func (b *EvalBroker) Ack(evalID string) error { } // Nack is used to negatively acknowledge handling an evaluation -func (b *EvalBroker) Nack(evalID string) error { +func (b *EvalBroker) Nack(evalID, token string) error { b.l.Lock() defer b.l.Unlock() @@ -386,6 +398,9 @@ func (b *EvalBroker) Nack(evalID string) error { if !ok { return fmt.Errorf("Evaluation ID not found") } + if unack.Token != token { + return fmt.Errorf("Token does not match for Evaluation ID") + } // Stop the timer, doesn't matter if we've missed it unack.NackTimer.Stop() diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index d459785f4..8cbd546b7 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -73,7 +73,7 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { } // Dequeue should work - out, err := b.Dequeue(defaultSched, time.Second) + out, token, err := b.Dequeue(defaultSched, time.Second) if err != nil { t.Fatalf("err: %v", err) } @@ -81,9 +81,13 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("bad : %#v", out) } - if !b.Outstanding(out.ID) { + tokenOut, ok := b.Outstanding(out.ID) + if !ok { t.Fatalf("should be outstanding") } + if tokenOut != token { + t.Fatalf("Bad: %#v %#v", token, tokenOut) + } // Check the stats stats = b.Stats() @@ -100,13 +104,19 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { t.Fatalf("bad: %#v", stats) } + // Nack with wrong token should fail + err = b.Nack(eval.ID, "foobarbaz") + if err == nil { + t.Fatalf("should fail to nack") + } + // Nack back into the queue - err = b.Nack(eval.ID) + err = b.Nack(eval.ID, token) if err != nil { t.Fatalf("err: %v", err) } - if b.Outstanding(out.ID) { + if _, ok := b.Outstanding(out.ID); ok { t.Fatalf("should not be outstanding") } @@ -126,25 +136,38 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { } // Dequeue should work again - out2, err := b.Dequeue(defaultSched, time.Second) + out2, token2, err := b.Dequeue(defaultSched, time.Second) if err != nil { t.Fatalf("err: %v", err) } if out2 != eval { t.Fatalf("bad : %#v", out2) } + if token2 == token { + t.Fatalf("should get a new token") + } - if !b.Outstanding(out.ID) { + tokenOut2, ok := b.Outstanding(out.ID) + if !ok { t.Fatalf("should be outstanding") } + if tokenOut2 != token2 { + t.Fatalf("Bad: %#v %#v", token2, tokenOut2) + } + + // Ack with wrong token + err = b.Ack(eval.ID, "zip") + if err == nil { + t.Fatalf("should fail to ack") + } // Ack finally - err = b.Ack(eval.ID) + err = b.Ack(eval.ID, token2) if err != nil { t.Fatalf("err: %v", err) } - if b.Outstanding(out.ID) { + if _, ok := b.Outstanding(out.ID); ok { t.Fatalf("should not be outstanding") } @@ -199,7 +222,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } // Dequeue should work - out, err := b.Dequeue(defaultSched, time.Second) + out, token, err := b.Dequeue(defaultSched, time.Second) if err != nil { t.Fatalf("err: %v", err) } @@ -220,7 +243,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } // Ack out - err = b.Ack(eval.ID) + err = b.Ack(eval.ID, token) if err != nil { t.Fatalf("err: %v", err) } @@ -238,7 +261,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } // Dequeue should work - out, err = b.Dequeue(defaultSched, time.Second) + out, token, err = b.Dequeue(defaultSched, time.Second) if err != nil { t.Fatalf("err: %v", err) } @@ -259,7 +282,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } // Ack out - err = b.Ack(eval2.ID) + err = b.Ack(eval2.ID, token) if err != nil { t.Fatalf("err: %v", err) } @@ -277,7 +300,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } // Dequeue should work - out, err = b.Dequeue(defaultSched, time.Second) + out, token, err = b.Dequeue(defaultSched, time.Second) if err != nil { t.Fatalf("err: %v", err) } @@ -298,7 +321,7 @@ func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { } // Ack out - err = b.Ack(eval3.ID) + err = b.Ack(eval3.ID, token) if err != nil { t.Fatalf("err: %v", err) } @@ -348,7 +371,7 @@ func TestEvalBroker_Dequeue_Timeout(t *testing.T) { b.SetEnabled(true) start := time.Now() - out, err := b.Dequeue(defaultSched, 5*time.Millisecond) + out, _, err := b.Dequeue(defaultSched, 5*time.Millisecond) end := time.Now() if err != nil { @@ -380,17 +403,17 @@ func TestEvalBroker_Dequeue_Priority(t *testing.T) { eval3.Priority = 20 b.Enqueue(eval3) - out1, _ := b.Dequeue(defaultSched, time.Second) + out1, _, _ := b.Dequeue(defaultSched, time.Second) if out1 != eval2 { t.Fatalf("bad: %#v", out1) } - out2, _ := b.Dequeue(defaultSched, time.Second) + out2, _, _ := b.Dequeue(defaultSched, time.Second) if out2 != eval3 { t.Fatalf("bad: %#v", out2) } - out3, _ := b.Dequeue(defaultSched, time.Second) + out3, _, _ := b.Dequeue(defaultSched, time.Second) if out3 != eval1 { t.Fatalf("bad: %#v", out3) } @@ -410,7 +433,7 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { } for i := 0; i < NUM; i++ { - out1, _ := b.Dequeue(defaultSched, time.Second) + out1, _, _ := b.Dequeue(defaultSched, time.Second) if out1.CreateIndex != uint64(i) { t.Fatalf("bad: %d %#v", i, out1) } @@ -435,7 +458,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) { counter := 0 for i := 0; i < NUM; i++ { - out1, _ := b.Dequeue(defaultSched, time.Second) + out1, _, _ := b.Dequeue(defaultSched, time.Second) switch out1.Type { case structs.JobTypeService: @@ -467,7 +490,7 @@ func TestEvalBroker_Dequeue_Blocked(t *testing.T) { outCh := make(chan *structs.Evaluation, 1) go func() { start := time.Now() - out, err := b.Dequeue(defaultSched, time.Second) + out, _, err := b.Dequeue(defaultSched, time.Second) end := time.Now() outCh <- out if err != nil { @@ -512,7 +535,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { } // Dequeue - out, err := b.Dequeue(defaultSched, time.Second) + out, _, err := b.Dequeue(defaultSched, time.Second) start := time.Now() if err != nil { t.Fatalf("err: %v", err) @@ -522,7 +545,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { } // Dequeue, should block on Nack timer - out, err = b.Dequeue(defaultSched, time.Second) + out, _, err = b.Dequeue(defaultSched, time.Second) end := time.Now() if err != nil { t.Fatalf("err: %v", err) diff --git a/nomad/eval_endpoint.go b/nomad/eval_endpoint.go index f8382c007..dc9899816 100644 --- a/nomad/eval_endpoint.go +++ b/nomad/eval_endpoint.go @@ -56,7 +56,7 @@ func (e *Eval) GetEval(args *structs.EvalSpecificRequest, // Dequeue is used to dequeue a pending evaluation func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, - reply *structs.SingleEvalResponse) error { + reply *structs.EvalDequeueResponse) error { if done, err := e.srv.forward("Eval.Dequeue", args, args, reply); done { return err } @@ -73,7 +73,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, } // Attempt the dequeue - eval, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout) + eval, token, err := e.srv.evalBroker.Dequeue(args.Schedulers, args.Timeout) if err != nil { return err } @@ -81,6 +81,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, // Provide the output if any if eval != nil { reply.Eval = eval + reply.Token = token } // Set the query response @@ -89,7 +90,7 @@ func (e *Eval) Dequeue(args *structs.EvalDequeueRequest, } // Ack is used to acknowledge completion of a dequeued evaluation -func (e *Eval) Ack(args *structs.EvalSpecificRequest, +func (e *Eval) Ack(args *structs.EvalAckRequest, reply *structs.GenericResponse) error { if done, err := e.srv.forward("Eval.Ack", args, args, reply); done { return err @@ -97,14 +98,14 @@ func (e *Eval) Ack(args *structs.EvalSpecificRequest, defer metrics.MeasureSince([]string{"nomad", "eval", "ack"}, time.Now()) // Ack the EvalID - if err := e.srv.evalBroker.Ack(args.EvalID); err != nil { + if err := e.srv.evalBroker.Ack(args.EvalID, args.Token); err != nil { return err } return nil } // NAck is used to negative acknowledge completion of a dequeued evaluation -func (e *Eval) Nack(args *structs.EvalSpecificRequest, +func (e *Eval) Nack(args *structs.EvalAckRequest, reply *structs.GenericResponse) error { if done, err := e.srv.forward("Eval.Nack", args, args, reply); done { return err @@ -112,7 +113,7 @@ func (e *Eval) Nack(args *structs.EvalSpecificRequest, defer metrics.MeasureSince([]string{"nomad", "eval", "nack"}, time.Now()) // Nack the EvalID - if err := e.srv.evalBroker.Nack(args.EvalID); err != nil { + if err := e.srv.evalBroker.Nack(args.EvalID, args.Token); err != nil { return err } return nil diff --git a/nomad/eval_endpoint_test.go b/nomad/eval_endpoint_test.go index 06bde4a6e..5c8d05014 100644 --- a/nomad/eval_endpoint_test.go +++ b/nomad/eval_endpoint_test.go @@ -68,7 +68,7 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { Schedulers: defaultSched, WriteRequest: structs.WriteRequest{Region: "region1"}, } - var resp structs.SingleEvalResponse + var resp structs.EvalDequeueResponse if err := msgpackrpc.CallWithCodec(codec, "Eval.Dequeue", get, &resp); err != nil { t.Fatalf("err: %v", err) } @@ -78,9 +78,13 @@ func TestEvalEndpoint_Dequeue(t *testing.T) { } // Ensure outstanding - if !s1.evalBroker.Outstanding(eval1.ID) { + token, ok := s1.evalBroker.Outstanding(eval1.ID) + if !ok { t.Fatalf("should be outstanding") } + if token != resp.Token { + t.Fatalf("bad token: %#v %#v", token, resp.Token) + } } func TestEvalEndpoint_Ack(t *testing.T) { @@ -97,7 +101,7 @@ func TestEvalEndpoint_Ack(t *testing.T) { // Create the register request eval1 := mock.Eval() s1.evalBroker.Enqueue(eval1) - out, err := s1.evalBroker.Dequeue(defaultSched, time.Second) + out, token, err := s1.evalBroker.Dequeue(defaultSched, time.Second) if err != nil { t.Fatalf("err: %v", err) } @@ -106,8 +110,9 @@ func TestEvalEndpoint_Ack(t *testing.T) { } // Ack the eval - get := &structs.EvalSpecificRequest{ + get := &structs.EvalAckRequest{ EvalID: out.ID, + Token: token, WriteRequest: structs.WriteRequest{Region: "region1"}, } var resp structs.GenericResponse @@ -116,7 +121,7 @@ func TestEvalEndpoint_Ack(t *testing.T) { } // Ensure outstanding - if s1.evalBroker.Outstanding(eval1.ID) { + if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok { t.Fatalf("should not be outstanding") } } @@ -135,14 +140,15 @@ func TestEvalEndpoint_Nack(t *testing.T) { // Create the register request eval1 := mock.Eval() s1.evalBroker.Enqueue(eval1) - out, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) + out, token, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) if out == nil { t.Fatalf("missing eval") } - // Ack the eval - get := &structs.EvalSpecificRequest{ + // Nack the eval + get := &structs.EvalAckRequest{ EvalID: out.ID, + Token: token, WriteRequest: structs.WriteRequest{Region: "region1"}, } var resp structs.GenericResponse @@ -151,12 +157,12 @@ func TestEvalEndpoint_Nack(t *testing.T) { } // Ensure outstanding - if s1.evalBroker.Outstanding(eval1.ID) { + if _, ok := s1.evalBroker.Outstanding(eval1.ID); ok { t.Fatalf("should not be outstanding") } // Should get it back - out2, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) + out2, _, _ := s1.evalBroker.Dequeue(defaultSched, time.Second) if out2 != out { t.Fatalf("nack failed") } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 1ce4b6eb9..b744d125a 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -179,6 +179,13 @@ type EvalSpecificRequest struct { WriteRequest } +// EvalAckRequest is used to Ack/Nack a specific evaluation +type EvalAckRequest struct { + EvalID string + Token string + WriteRequest +} + // EvalDequeueRequest is used when we want to dequeue an evaluation type EvalDequeueRequest struct { Schedulers []string @@ -251,6 +258,13 @@ type SingleEvalResponse struct { QueryMeta } +// EvalDequeueResponse is used to return from a dequeue +type EvalDequeueResponse struct { + Eval *Evaluation + Token string + QueryMeta +} + // PlanResponse is used to return from a PlanRequest type PlanResponse struct { Result *PlanResult diff --git a/nomad/worker.go b/nomad/worker.go index 11729b248..4c3e59c64 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -54,37 +54,37 @@ func NewWorker(srv *Server) (*Worker, error) { func (w *Worker) run() { for { // Dequeue a pending evaluation - eval, shutdown := w.dequeueEvaluation(dequeueTimeout) + eval, token, shutdown := w.dequeueEvaluation(dequeueTimeout) if shutdown { return } // Check for a shutdown if w.srv.IsShutdown() { - w.sendAck(eval.ID, false) + w.sendAck(eval.ID, token, false) return } // Wait for the the raft log to catchup to the evaluation if err := w.waitForIndex(eval.ModifyIndex, raftSyncLimit); err != nil { - w.sendAck(eval.ID, false) + w.sendAck(eval.ID, token, false) continue } // Invoke the scheduler to determine placements if err := w.invokeScheduler(eval); err != nil { - w.sendAck(eval.ID, false) + w.sendAck(eval.ID, token, false) continue } // Complete the evaluation - w.sendAck(eval.ID, true) + w.sendAck(eval.ID, token, true) } } // dequeueEvaluation is used to fetch the next ready evaluation. // This blocks until an evaluation is available or a timeout is reached. -func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, bool) { +func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, string, bool) { // Setup the request req := structs.EvalDequeueRequest{ Schedulers: w.srv.config.EnabledSchedulers, @@ -93,7 +93,7 @@ func (w *Worker) dequeueEvaluation(timeout time.Duration) (*structs.Evaluation, Region: w.srv.config.Region, }, } - var resp structs.SingleEvalResponse + var resp structs.EvalDequeueResponse REQ: // Make a blocking RPC @@ -103,7 +103,7 @@ REQ: if err != nil { w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err) if w.backoffErr() { - return nil, true + return nil, "", true } goto REQ } @@ -112,23 +112,24 @@ REQ: // Check if we got a response if resp.Eval != nil { w.logger.Printf("[DEBUG] worker: dequeued evaluation %s", resp.Eval.ID) - return resp.Eval, false + return resp.Eval, resp.Token, false } // Check for potential shutdown if w.srv.IsShutdown() { - return nil, true + return nil, "", true } goto REQ } // sendAck makes a best effort to ack or nack the evaluation. // Any errors are logged but swallowed. -func (w *Worker) sendAck(evalID string, ack bool) { +func (w *Worker) sendAck(evalID, token string, ack bool) { defer metrics.MeasureSince([]string{"nomad", "worker", "send_ack"}, time.Now()) // Setup the request - req := structs.EvalSpecificRequest{ + req := structs.EvalAckRequest{ EvalID: evalID, + Token: token, WriteRequest: structs.WriteRequest{ Region: w.srv.config.Region, }, diff --git a/nomad/worker_test.go b/nomad/worker_test.go index f7b9597a1..5a8790084 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -57,10 +57,13 @@ func TestWorker_dequeueEvaluation(t *testing.T) { w := &Worker{srv: s1, logger: s1.logger} // Attempt dequeue - eval, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if shutdown { t.Fatalf("should not shutdown") } + if token == "" { + t.Fatalf("should get token") + } // Ensure we get a sane eval if !reflect.DeepEqual(eval, eval1) { @@ -85,7 +88,7 @@ func TestWorker_dequeueEvaluation_shutdown(t *testing.T) { }() // Attempt dequeue - eval, shutdown := w.dequeueEvaluation(10 * time.Millisecond) + eval, _, shutdown := w.dequeueEvaluation(10 * time.Millisecond) if !shutdown { t.Fatalf("should not shutdown") } @@ -112,7 +115,7 @@ func TestWorker_sendAck(t *testing.T) { w := &Worker{srv: s1, logger: s1.logger} // Attempt dequeue - eval, _ := w.dequeueEvaluation(10 * time.Millisecond) + eval, token, _ := w.dequeueEvaluation(10 * time.Millisecond) // Check the depth is 0, 1 unacked stats := s1.evalBroker.Stats() @@ -121,7 +124,7 @@ func TestWorker_sendAck(t *testing.T) { } // Send the Nack - w.sendAck(eval.ID, false) + w.sendAck(eval.ID, token, false) // Check the depth is 1, nothing unacked stats = s1.evalBroker.Stats() @@ -130,10 +133,10 @@ func TestWorker_sendAck(t *testing.T) { } // Attempt dequeue - eval, _ = w.dequeueEvaluation(10 * time.Millisecond) + eval, token, _ = w.dequeueEvaluation(10 * time.Millisecond) // Send the Ack - w.sendAck(eval.ID, true) + w.sendAck(eval.ID, token, true) // Check the depth is 0 stats = s1.evalBroker.Stats()