From cec076290fca63fc20dacfe2d1ab1800330dfa39 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 12 Mar 2019 14:25:14 -0700 Subject: [PATCH 1/4] nomad: refactor waitForIndex into SnapshotAfter Generalize wait for index logic in the state store for reuse elsewhere. Also begin plumbing in a context to combine handling of timeouts and shutdown. --- nomad/blocked_evals.go | 2 +- nomad/eval_broker.go | 2 +- nomad/plan_queue.go | 2 +- nomad/server.go | 11 ++-- nomad/state/state_store.go | 55 +++++++++++++++++++ nomad/state/state_store_test.go | 94 +++++++++++++++++++++++++++++++++ nomad/vault.go | 4 +- nomad/vault_testing.go | 12 ++--- nomad/worker.go | 72 +++++++------------------ nomad/worker_test.go | 34 ++++++------ 10 files changed, 204 insertions(+), 84 deletions(-) diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index dcb3ef635..8ec6aa449 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -666,7 +666,7 @@ func (b *BlockedEvals) Stats() *BlockedStats { } // EmitStats is used to export metrics about the blocked eval tracker while enabled -func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) { +func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) { for { select { case <-time.After(period): diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index f93a0ef27..c30eb3cfe 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -822,7 +822,7 @@ func (b *EvalBroker) Stats() *BrokerStats { } // EmitStats is used to export metrics about the broker while enabled -func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) { +func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) { for { select { case <-time.After(period): diff --git a/nomad/plan_queue.go b/nomad/plan_queue.go index db031a3e2..92d81d096 100644 --- a/nomad/plan_queue.go +++ b/nomad/plan_queue.go @@ -195,7 +195,7 @@ func (q *PlanQueue) Stats() *QueueStats { } // EmitStats is used to export metrics about the broker while enabled -func (q *PlanQueue) EmitStats(period time.Duration, stopCh chan struct{}) { +func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) { for { select { case <-time.After(period): diff --git a/nomad/server.go b/nomad/server.go index d14aa7829..612be5da6 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -221,8 +221,11 @@ type Server struct { left bool shutdown bool - shutdownCh chan struct{} shutdownLock sync.Mutex + + shutdownCtx context.Context + shutdownCancel context.CancelFunc + shutdownCh <-chan struct{} } // Holds the RPC endpoints @@ -303,9 +306,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error) blockedEvals: NewBlockedEvals(evalBroker, logger), rpcTLS: incomingTLS, aclCache: aclCache, - shutdownCh: make(chan struct{}), } + s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background()) + s.shutdownCh = s.shutdownCtx.Done() + // Create the RPC handler s.rpcHandler = newRpcHandler(s) @@ -530,7 +535,7 @@ func (s *Server) Shutdown() error { } s.shutdown = true - close(s.shutdownCh) + s.shutdownCancel() if s.serf != nil { s.serf.Shutdown() diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index fc3089999..8bb3b3352 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -101,6 +101,61 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { return snap, nil } +// SnapshotAfter is used to create a point in time snapshot where the index is +// guaranteed to be greater 
than or equal to the index parameter. +// +// Some server operations (such as scheduling) exchange objects via RPC +// concurrent with Raft log application, so they must ensure the state store +// snapshot they are operating on is at or after the index the objects +// retrieved via RPC were applied to the Raft log at. +// +// Callers should maintain their own timer metric as the time this method +// blocks indicates Raft log application latency relative to scheduling. +func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) { + // Ported from work.go:waitForIndex prior to 0.9 + + const backoffBase = 20 * time.Millisecond + const backoffLimit = 1 * time.Second + var retries uint + var retryTimer *time.Timer + + // XXX: Potential optimization is to set up a watch on the state + // store's index table and only unblock via a trigger rather than + // polling. + for { + // Get the states current index + snapshotIndex, err := s.LatestIndex() + if err != nil { + return nil, fmt.Errorf("failed to determine state store's index: %v", err) + } + + // We only need the FSM state to be as recent as the given index + if snapshotIndex >= index { + return s.Snapshot() + } + + // Exponential back off + retries++ + if retryTimer == nil { + // First retry, start at baseline + retryTimer = time.NewTimer(backoffBase) + } else { + // Subsequent retry, reset timer + deadline := 1 << (2 * retries) * backoffBase + if deadline > backoffLimit { + deadline = backoffLimit + } + retryTimer.Reset(deadline) + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-retryTimer.C: + } + } +} + // Restore is used to optimize the efficiency of rebuilding // state by minimizing the number of transactions and checking // overhead. diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 69819f255..3cc7a23ce 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -7118,6 +7118,100 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi require.Nil(denormalizedAllocs) } +// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks +// until the StateStore's latest index is >= the requested index. 
+func TestStateStore_SnapshotAfter_OK(t *testing.T) { + t.Parallel() + + s := testStateStore(t) + index, err := s.LatestIndex() + require.NoError(t, err) + + node := mock.Node() + require.NoError(t, s.UpsertNode(index+1, node)) + + // Assert SnapshotAfter returns immediately if index < latest index + ctx, cancel := context.WithTimeout(context.Background(), 0) + snap, err := s.SnapshotAfter(ctx, index) + cancel() + require.NoError(t, err) + + snapIndex, err := snap.LatestIndex() + require.NoError(t, err) + if snapIndex <= index { + require.Fail(t, "snapshot index should be greater than index") + } + + // Assert SnapshotAfter returns immediately if index == latest index + ctx, cancel = context.WithTimeout(context.Background(), 0) + snap, err = s.SnapshotAfter(ctx, index+1) + cancel() + require.NoError(t, err) + + snapIndex, err = snap.LatestIndex() + require.NoError(t, err) + require.Equal(t, snapIndex, index+1) + + // Assert SnapshotAfter blocks if index > latest index + errCh := make(chan error, 1) + ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + go func() { + defer close(errCh) + waitIndex := index + 2 + snap, err := s.SnapshotAfter(ctx, waitIndex) + if err != nil { + errCh <- err + return + } + + snapIndex, err := snap.LatestIndex() + if err != nil { + errCh <- err + return + } + + if snapIndex < waitIndex { + errCh <- fmt.Errorf("snapshot index < wait index: %d < %d", snapIndex, waitIndex) + return + } + }() + + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(500 * time.Millisecond): + // Let it block for a bit before unblocking by upserting + } + + node.Name = "hal" + require.NoError(t, s.UpsertNode(index+2, node)) + + select { + case err := <-errCh: + require.NoError(t, err) + case <-time.After(5 * time.Second): + require.Fail(t, "timed out waiting for SnapshotAfter to unblock") + } +} + +// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter +// returns an error if the desired index is not reached within the deadline. +func TestStateStore_SnapshotAfter_Timeout(t *testing.T) { + t.Parallel() + + s := testStateStore(t) + index, err := s.LatestIndex() + require.NoError(t, err) + + // Assert SnapshotAfter blocks if index > latest index + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + snap, err := s.SnapshotAfter(ctx, index+1) + require.EqualError(t, err, context.DeadlineExceeded.Error()) + require.Nil(t, snap) +} + // watchFired is a helper for unit tests that returns if the given watch set // fired (it doesn't care which watch actually fired). This uses a fixed // timeout since we already expect the event happened before calling this and diff --git a/nomad/vault.go b/nomad/vault.go index 1e252ff69..b3fe6efde 100644 --- a/nomad/vault.go +++ b/nomad/vault.go @@ -131,7 +131,7 @@ type VaultClient interface { // EmitStats emits that clients statistics at the given period until stopCh // is called. 
- EmitStats(period time.Duration, stopCh chan struct{}) + EmitStats(period time.Duration, stopCh <-chan struct{}) } // VaultStats returns all the stats about Vault tokens created and managed by @@ -1313,7 +1313,7 @@ func (v *vaultClient) stats() *VaultStats { } // EmitStats is used to export metrics about the blocked eval tracker while enabled -func (v *vaultClient) EmitStats(period time.Duration, stopCh chan struct{}) { +func (v *vaultClient) EmitStats(period time.Duration, stopCh <-chan struct{}) { for { select { case <-time.After(period): diff --git a/nomad/vault_testing.go b/nomad/vault_testing.go index 9ac8f30f4..ba0b1d923 100644 --- a/nomad/vault_testing.go +++ b/nomad/vault_testing.go @@ -135,9 +135,9 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs return nil } -func (v *TestVaultClient) Stop() {} -func (v *TestVaultClient) SetActive(enabled bool) {} -func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil } -func (v *TestVaultClient) Running() bool { return true } -func (v *TestVaultClient) Stats() map[string]string { return map[string]string{} } -func (v *TestVaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {} +func (v *TestVaultClient) Stop() {} +func (v *TestVaultClient) SetActive(enabled bool) {} +func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil } +func (v *TestVaultClient) Running() bool { return true } +func (v *TestVaultClient) Stats() map[string]string { return map[string]string{} } +func (v *TestVaultClient) EmitStats(period time.Duration, stopCh <-chan struct{}) {} diff --git a/nomad/worker.go b/nomad/worker.go index a50c58e68..74042f01c 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "strings" "sync" @@ -9,6 +10,7 @@ import ( metrics "github.com/armon/go-metrics" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" ) @@ -21,9 +23,6 @@ const ( // but that is much slower than backoffBaselineFast backoffBaselineSlow = 500 * time.Millisecond - // backoffLimitFast is the limit of the exponential backoff - backoffLimitFast = time.Second - // backoffLimitSlow is the limit of the exponential backoff for // the slower backoff backoffLimitSlow = 10 * time.Second @@ -119,14 +118,15 @@ func (w *Worker) run() { } // Wait for the raft log to catchup to the evaluation - if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil { + snap, err := w.snapshotAfter(waitIndex, raftSyncLimit) + if err != nil { w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex) w.sendAck(eval.ID, token, false) continue } // Invoke the scheduler to determine placements - if err := w.invokeScheduler(eval, token); err != nil { + if err := w.invokeScheduler(snap, eval, token); err != nil { w.logger.Error("error invoking scheduler", "error", err) w.sendAck(eval.ID, token, false) continue @@ -224,56 +224,24 @@ func (w *Worker) sendAck(evalID, token string, ack bool) { } } -// waitForIndex ensures that the local state is at least as fresh -// as the given index. This is used before starting an evaluation, -// but also potentially mid-stream. If a Plan fails because of stale -// state (attempt to allocate to a failed/dead node), we may need -// to sync our state again and do the planning with more recent data. 
-func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error { - // XXX: Potential optimization is to set up a watch on the state stores - // index table and only unblock via a trigger rather than timing out and - // checking. - +// snapshotAfter times calls to StateStore.SnapshotAfter which may block. +func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) { start := time.Now() - defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) -CHECK: - // Get the states current index - snapshotIndex, err := w.srv.fsm.State().LatestIndex() - if err != nil { - return fmt.Errorf("failed to determine state store's index: %v", err) - } - - // We only need the FSM state to be as recent as the given index - if index <= snapshotIndex { - w.backoffReset() - return nil - } - - // Check if we've reached our limit - if time.Now().Sub(start) > timeout { - return fmt.Errorf("sync wait timeout reached") - } - - // Exponential back off if we haven't yet reached it - if w.backoffErr(backoffBaselineFast, backoffLimitFast) { - return fmt.Errorf("shutdown while waiting for state sync") - } - goto CHECK + ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout) + snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex) + cancel() + metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) + return snap, err } // invokeScheduler is used to invoke the business logic of the scheduler -func (w *Worker) invokeScheduler(eval *structs.Evaluation, token string) error { +func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evaluation, token string) error { defer metrics.MeasureSince([]string{"nomad", "worker", "invoke_scheduler", eval.Type}, time.Now()) // Store the evaluation token w.evalToken = token - // Snapshot the current state - snap, err := w.srv.fsm.State().Snapshot() - if err != nil { - return fmt.Errorf("failed to snapshot state: %v", err) - } - // Store the snapshot's index + var err error w.snapshotIndex, err = snap.LatestIndex() if err != nil { return fmt.Errorf("failed to determine snapshot's index: %v", err) @@ -352,16 +320,12 @@ SUBMIT: if result.RefreshIndex != 0 { // Wait for the raft log to catchup to the evaluation w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID) - if err := w.waitForIndex(result.RefreshIndex, raftSyncLimit); err != nil { + + var err error + state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit) + if err != nil { return nil, nil, err } - - // Snapshot the current state - snap, err := w.srv.fsm.State().Snapshot() - if err != nil { - return nil, nil, fmt.Errorf("failed to snapshot state: %v", err) - } - state = snap } // Return the result and potential state update diff --git a/nomad/worker_test.go b/nomad/worker_test.go index 4bc8628af..f04921d8e 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -1,14 +1,15 @@ package nomad import ( + "context" "reflect" - "strings" "sync" "testing" "time" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" + "github.com/stretchr/testify/require" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" @@ -286,26 +287,26 @@ func TestWorker_waitForIndex(t *testing.T) { index := s1.raft.AppliedIndex() // Cause an increment + errCh := make(chan error, 1) go func() { time.Sleep(10 * time.Millisecond) n := mock.Node() - if err := s1.fsm.state.UpsertNode(index+1, n); err != nil { - t.Fatalf("failed to upsert node: %v", 
err) - } + errCh <- s1.fsm.state.UpsertNode(index+1, n) }() // Wait for a future index w := &Worker{srv: s1, logger: s1.logger} - err := w.waitForIndex(index+1, time.Second) - if err != nil { - t.Fatalf("err: %v", err) - } + snap, err := w.snapshotAfter(index+1, time.Second) + require.NoError(t, err) + require.NotNil(t, snap) + + // No error from upserting + require.NoError(t, <-errCh) // Cause a timeout - err = w.waitForIndex(index+100, 10*time.Millisecond) - if err == nil || !strings.Contains(err.Error(), "timeout") { - t.Fatalf("err: %v", err) - } + snap, err = w.snapshotAfter(index+100, 10*time.Millisecond) + require.Nil(t, snap) + require.EqualError(t, err, context.DeadlineExceeded.Error()) } func TestWorker_invokeScheduler(t *testing.T) { @@ -320,10 +321,11 @@ func TestWorker_invokeScheduler(t *testing.T) { eval := mock.Eval() eval.Type = "noop" - err := w.invokeScheduler(eval, uuid.Generate()) - if err != nil { - t.Fatalf("err: %v", err) - } + snap, err := s1.fsm.state.Snapshot() + require.NoError(t, err) + + err = w.invokeScheduler(snap, eval, uuid.Generate()) + require.NoError(t, err) } func TestWorker_SubmitPlan(t *testing.T) { From 6ca9f0cd92d55822c76dababbc76263bd4ad05e6 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 12 Mar 2019 14:28:05 -0700 Subject: [PATCH 2/4] nomad: remove unused NotifyGroup struct I don't think it's been used for a long time. --- nomad/state/notify.go | 62 -------------------------------- nomad/state/notify_test.go | 72 -------------------------------------- 2 files changed, 134 deletions(-) delete mode 100644 nomad/state/notify.go delete mode 100644 nomad/state/notify_test.go diff --git a/nomad/state/notify.go b/nomad/state/notify.go deleted file mode 100644 index 7a8a8d792..000000000 --- a/nomad/state/notify.go +++ /dev/null @@ -1,62 +0,0 @@ -package state - -import ( - "sync" -) - -// NotifyGroup is used to allow a simple notification mechanism. -// Channels can be marked as waiting, and when notify is invoked, -// all the waiting channels get a message and are cleared from the -// notify list. 
-type NotifyGroup struct { - l sync.Mutex - notify map[chan struct{}]struct{} -} - -// Notify will do a non-blocking send to all waiting channels, and -// clear the notify list -func (n *NotifyGroup) Notify() { - n.l.Lock() - defer n.l.Unlock() - for ch := range n.notify { - select { - case ch <- struct{}{}: - default: - } - } - n.notify = nil -} - -// Wait adds a channel to the notify group -func (n *NotifyGroup) Wait(ch chan struct{}) { - n.l.Lock() - defer n.l.Unlock() - if n.notify == nil { - n.notify = make(map[chan struct{}]struct{}) - } - n.notify[ch] = struct{}{} -} - -// Clear removes a channel from the notify group -func (n *NotifyGroup) Clear(ch chan struct{}) { - n.l.Lock() - defer n.l.Unlock() - if n.notify == nil { - return - } - delete(n.notify, ch) -} - -// WaitCh allocates a channel that is subscribed to notifications -func (n *NotifyGroup) WaitCh() chan struct{} { - ch := make(chan struct{}, 1) - n.Wait(ch) - return ch -} - -// Empty checks if there are no channels to notify -func (n *NotifyGroup) Empty() bool { - n.l.Lock() - defer n.l.Unlock() - return len(n.notify) == 0 -} diff --git a/nomad/state/notify_test.go b/nomad/state/notify_test.go deleted file mode 100644 index 34c14f46d..000000000 --- a/nomad/state/notify_test.go +++ /dev/null @@ -1,72 +0,0 @@ -package state - -import ( - "testing" -) - -func TestNotifyGroup(t *testing.T) { - grp := &NotifyGroup{} - - ch1 := grp.WaitCh() - ch2 := grp.WaitCh() - - select { - case <-ch1: - t.Fatalf("should block") - default: - } - select { - case <-ch2: - t.Fatalf("should block") - default: - } - - grp.Notify() - - select { - case <-ch1: - default: - t.Fatalf("should not block") - } - select { - case <-ch2: - default: - t.Fatalf("should not block") - } - - // Should be unregistered - ch3 := grp.WaitCh() - grp.Notify() - - select { - case <-ch1: - t.Fatalf("should block") - default: - } - select { - case <-ch2: - t.Fatalf("should block") - default: - } - select { - case <-ch3: - default: - t.Fatalf("should not block") - } -} - -func TestNotifyGroup_Clear(t *testing.T) { - grp := &NotifyGroup{} - - ch1 := grp.WaitCh() - grp.Clear(ch1) - - grp.Notify() - - // Should not get anything - select { - case <-ch1: - t.Fatalf("should not get message") - default: - } -} From b5fec1c77653ab7508b7505ab15472bc3c47374c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Tue, 12 Mar 2019 14:41:46 -0700 Subject: [PATCH 3/4] nomad: wait for state store to sync in plan apply Wait for state store to catch up with raft when applying plans. --- nomad/plan_apply.go | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/nomad/plan_apply.go b/nomad/plan_apply.go index f9b8d7c63..39ae8d1e0 100644 --- a/nomad/plan_apply.go +++ b/nomad/plan_apply.go @@ -1,6 +1,7 @@ package nomad import ( + "context" "fmt" "runtime" "time" @@ -99,7 +100,15 @@ func (p *planner) planApply() { // Snapshot the state so that we have a consistent view of the world // if no snapshot is available if waitCh == nil || snap == nil { - snap, err = p.fsm.State().Snapshot() + // Wait up to 5s for raft to catch up. Timing out + // causes the eval to be nacked and retried on another + // server, so timing out too quickly could cause + // greater scheduling latency than if we just waited + // longer here. 
+ const indexTimeout = 5 * time.Second + ctx, cancel := context.WithTimeout(context.Background(), indexTimeout) + snap, err = p.fsm.State().SnapshotAfter(ctx, p.raft.LastIndex()) + cancel() if err != nil { p.logger.Error("failed to snapshot state", "error", err) pending.respond(nil, err) From a2e4f124bc81510e2dc1a0bd634726d333173ef0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 17 May 2019 14:37:42 -0700 Subject: [PATCH 4/4] nomad: emit more detailed error Avoid returning context.DeadlineExceeded as it lacks helpful information and is often ignored or handled specially by callers. --- nomad/worker.go | 6 ++++++ nomad/worker_test.go | 9 ++++++--- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/nomad/worker.go b/nomad/worker.go index 74042f01c..192a078d5 100644 --- a/nomad/worker.go +++ b/nomad/worker.go @@ -231,6 +231,12 @@ func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state. snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex) cancel() metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start) + + // Wrap error to ensure callers don't disregard timeouts. + if err == context.DeadlineExceeded { + err = fmt.Errorf("timed out after %s waiting for index=%d", timeout, waitIndex) + } + return snap, err } diff --git a/nomad/worker_test.go b/nomad/worker_test.go index f04921d8e..70b0a1c99 100644 --- a/nomad/worker_test.go +++ b/nomad/worker_test.go @@ -1,7 +1,7 @@ package nomad import ( - "context" + "fmt" "reflect" "sync" "testing" @@ -304,9 +304,12 @@ func TestWorker_waitForIndex(t *testing.T) { require.NoError(t, <-errCh) // Cause a timeout - snap, err = w.snapshotAfter(index+100, 10*time.Millisecond) + waitIndex := index + 100 + timeout := 10 * time.Millisecond + snap, err = w.snapshotAfter(index+100, timeout) require.Nil(t, snap) - require.EqualError(t, err, context.DeadlineExceeded.Error()) + require.EqualError(t, err, + fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex)) } func TestWorker_invokeScheduler(t *testing.T) {
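
Illustrative sketch (not part of the patches above): the loop below mirrors the shape of the new StateStore.SnapshotAfter, assuming a minimal stand-in for the state store. It polls a monotonically increasing index with capped exponential backoff and lets a single context.Context carry both the caller's timeout and server shutdown, which is why the worker can simply derive its context from srv.shutdownCtx. The indexSource interface, waitForIndex helper, and fakeStore type are hypothetical names for this example only, not Nomad APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync/atomic"
	"time"
)

// indexSource is a stand-in for anything exposing a monotonically
// increasing index, such as a state store's latest applied Raft index.
type indexSource interface {
	LatestIndex() (uint64, error)
}

// waitForIndex blocks until src reports an index >= index, polling with
// capped exponential backoff. Cancelling ctx (timeout or shutdown)
// unblocks it with ctx.Err().
func waitForIndex(ctx context.Context, src indexSource, index uint64) error {
	const backoffBase = 20 * time.Millisecond
	const backoffLimit = 1 * time.Second

	var retries uint
	var retryTimer *time.Timer

	for {
		current, err := src.LatestIndex()
		if err != nil {
			return fmt.Errorf("failed to determine latest index: %w", err)
		}

		// Only need the index to be as recent as the one requested.
		if current >= index {
			return nil
		}

		retries++
		if retryTimer == nil {
			// First retry, start at the baseline.
			retryTimer = time.NewTimer(backoffBase)
		} else {
			// Subsequent retries back off exponentially up to the limit.
			deadline := 1 << (2 * retries) * backoffBase
			if deadline > backoffLimit {
				deadline = backoffLimit
			}
			retryTimer.Reset(deadline)
		}

		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-retryTimer.C:
		}
	}
}

// fakeStore is a toy indexSource used only to exercise waitForIndex.
type fakeStore struct{ index uint64 }

func (f *fakeStore) LatestIndex() (uint64, error) {
	return atomic.LoadUint64(&f.index), nil
}

func main() {
	store := &fakeStore{}

	// Simulate Raft applying log entry 5 a little later.
	go func() {
		time.Sleep(50 * time.Millisecond)
		atomic.StoreUint64(&store.index, 5)
	}()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	err := waitForIndex(ctx, store, 5)
	switch {
	case errors.Is(err, context.DeadlineExceeded):
		fmt.Println("timed out waiting for index")
	case err != nil:
		fmt.Println("error:", err)
	default:
		fmt.Println("index reached")
	}
}

If the deadline passes first, the caller sees context.DeadlineExceeded, which patch 4 above wraps into a more descriptive error before the worker returns it.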