mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
nomad: SnapshotAfter -> SnapshotMinIndex
Rename SnapshotAfter to SnapshotMinIndex. The old name was not technically accurate. SnapshotAtOrAfter is more accurate, but wordy and still lacks context about what precisely it is at or after (the index). SnapshotMinIndex was chosen as it describes the action (snapshot), a constraint (minimum), and the object of the constraint (index).
This commit is contained in:
@@ -184,7 +184,7 @@ func (p *planner) snapshotMinIndex(prevPlanResultIndex, planSnapshotIndex uint64
|
||||
|
||||
const timeout = 5 * time.Second
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
||||
snap, err := p.fsm.State().SnapshotAfter(ctx, minIndex)
|
||||
snap, err := p.fsm.State().SnapshotMinIndex(ctx, minIndex)
|
||||
cancel()
|
||||
if err == context.DeadlineExceeded {
|
||||
return nil, fmt.Errorf("timed out after %s waiting for index=%d (previous plan result index=%d; plan snapshot index=%d)",
|
||||
|
||||
@@ -101,7 +101,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
||||
return snap, nil
|
||||
}
|
||||
|
||||
// SnapshotAfter is used to create a point in time snapshot where the index is
|
||||
// SnapshotMinIndex is used to create a state snapshot where the index is
|
||||
// guaranteed to be greater than or equal to the index parameter.
|
||||
//
|
||||
// Some server operations (such as scheduling) exchange objects via RPC
|
||||
@@ -111,7 +111,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
||||
//
|
||||
// Callers should maintain their own timer metric as the time this method
|
||||
// blocks indicates Raft log application latency relative to scheduling.
|
||||
func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
|
||||
func (s *StateStore) SnapshotMinIndex(ctx context.Context, index uint64) (*StateSnapshot, error) {
|
||||
// Ported from work.go:waitForIndex prior to 0.9
|
||||
|
||||
const backoffBase = 20 * time.Millisecond
|
||||
|
||||
@@ -7138,9 +7138,9 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
|
||||
require.Nil(denormalizedAllocs)
|
||||
}
|
||||
|
||||
// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
|
||||
// TestStateStore_SnapshotMinIndex_OK asserts StateStore.SnapshotMinIndex blocks
|
||||
// until the StateStore's latest index is >= the requested index.
|
||||
func TestStateStore_SnapshotAfter_OK(t *testing.T) {
|
||||
func TestStateStore_SnapshotMinIndex_OK(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := testStateStore(t)
|
||||
@@ -7150,9 +7150,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
|
||||
node := mock.Node()
|
||||
require.NoError(t, s.UpsertNode(index+1, node))
|
||||
|
||||
// Assert SnapshotAfter returns immediately if index < latest index
|
||||
// Assert SnapshotMinIndex returns immediately if index < latest index
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 0)
|
||||
snap, err := s.SnapshotAfter(ctx, index)
|
||||
snap, err := s.SnapshotMinIndex(ctx, index)
|
||||
cancel()
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -7162,9 +7162,9 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
|
||||
require.Fail(t, "snapshot index should be greater than index")
|
||||
}
|
||||
|
||||
// Assert SnapshotAfter returns immediately if index == latest index
|
||||
// Assert SnapshotMinIndex returns immediately if index == latest index
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 0)
|
||||
snap, err = s.SnapshotAfter(ctx, index+1)
|
||||
snap, err = s.SnapshotMinIndex(ctx, index+1)
|
||||
cancel()
|
||||
require.NoError(t, err)
|
||||
|
||||
@@ -7172,14 +7172,14 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, snapIndex, index+1)
|
||||
|
||||
// Assert SnapshotAfter blocks if index > latest index
|
||||
// Assert SnapshotMinIndex blocks if index > latest index
|
||||
errCh := make(chan error, 1)
|
||||
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
|
||||
defer cancel()
|
||||
go func() {
|
||||
defer close(errCh)
|
||||
waitIndex := index + 2
|
||||
snap, err := s.SnapshotAfter(ctx, waitIndex)
|
||||
snap, err := s.SnapshotMinIndex(ctx, waitIndex)
|
||||
if err != nil {
|
||||
errCh <- err
|
||||
return
|
||||
@@ -7211,23 +7211,23 @@ func TestStateStore_SnapshotAfter_OK(t *testing.T) {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(5 * time.Second):
|
||||
require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
|
||||
require.Fail(t, "timed out waiting for SnapshotMinIndex to unblock")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
|
||||
// TestStateStore_SnapshotMinIndex_Timeout asserts StateStore.SnapshotMinIndex
|
||||
// returns an error if the desired index is not reached within the deadline.
|
||||
func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
|
||||
func TestStateStore_SnapshotMinIndex_Timeout(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
s := testStateStore(t)
|
||||
index, err := s.LatestIndex()
|
||||
require.NoError(t, err)
|
||||
|
||||
// Assert SnapshotAfter blocks if index > latest index
|
||||
// Assert SnapshotMinIndex blocks if index > latest index
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
|
||||
defer cancel()
|
||||
snap, err := s.SnapshotAfter(ctx, index+1)
|
||||
snap, err := s.SnapshotMinIndex(ctx, index+1)
|
||||
require.EqualError(t, err, context.DeadlineExceeded.Error())
|
||||
require.Nil(t, snap)
|
||||
}
|
||||
|
||||
@@ -118,7 +118,7 @@ func (w *Worker) run() {
|
||||
}
|
||||
|
||||
// Wait for the raft log to catchup to the evaluation
|
||||
snap, err := w.snapshotAfter(waitIndex, raftSyncLimit)
|
||||
snap, err := w.snapshotMinIndex(waitIndex, raftSyncLimit)
|
||||
if err != nil {
|
||||
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
|
||||
w.sendAck(eval.ID, token, false)
|
||||
@@ -224,11 +224,11 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
|
||||
}
|
||||
}
|
||||
|
||||
// snapshotAfter times calls to StateStore.SnapshotAfter which may block.
|
||||
func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
|
||||
// snapshotMinIndex times calls to StateStore.SnapshotAfter which may block.
|
||||
func (w *Worker) snapshotMinIndex(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
|
||||
start := time.Now()
|
||||
ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout)
|
||||
snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex)
|
||||
snap, err := w.srv.fsm.State().SnapshotMinIndex(ctx, waitIndex)
|
||||
cancel()
|
||||
metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
|
||||
|
||||
@@ -332,7 +332,7 @@ SUBMIT:
|
||||
w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID)
|
||||
|
||||
var err error
|
||||
state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit)
|
||||
state, err = w.snapshotMinIndex(result.RefreshIndex, raftSyncLimit)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
@@ -296,7 +296,7 @@ func TestWorker_waitForIndex(t *testing.T) {
|
||||
|
||||
// Wait for a future index
|
||||
w := &Worker{srv: s1, logger: s1.logger}
|
||||
snap, err := w.snapshotAfter(index+1, time.Second)
|
||||
snap, err := w.snapshotMinIndex(index+1, time.Second)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, snap)
|
||||
|
||||
@@ -306,7 +306,7 @@ func TestWorker_waitForIndex(t *testing.T) {
|
||||
// Cause a timeout
|
||||
waitIndex := index + 100
|
||||
timeout := 10 * time.Millisecond
|
||||
snap, err = w.snapshotAfter(index+100, timeout)
|
||||
snap, err = w.snapshotMinIndex(index+100, timeout)
|
||||
require.Nil(t, snap)
|
||||
require.EqualError(t, err,
|
||||
fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))
|
||||
|
||||
Reference in New Issue
Block a user