Merge pull request #5411 from hashicorp/b-snapshotafter

Block plan application until state store has caught up to raft
Michael Schurter authored 2019-05-20 14:03:10 -07:00, committed by GitHub
13 changed files with 221 additions and 217 deletions
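
In broad strokes, this change makes plan application and scheduling wait until the local state store has applied the Raft log up to a known index before taking a snapshot, rather than snapshotting whatever state happens to be present. Below is a minimal sketch of that wait; the names Indexer and waitForIndex are illustrative only, and the real method added by this PR (SnapshotAfter) also applies backoff and returns a snapshot rather than nil.

package example

import (
	"context"
	"time"
)

// Indexer is a hypothetical stand-in for the small part of the state store
// this sketch needs; the real method lives on *state.StateStore.
type Indexer interface {
	LatestIndex() (uint64, error)
}

// waitForIndex polls until the store's latest index reaches index or ctx
// expires.
func waitForIndex(ctx context.Context, s Indexer, index uint64) error {
	for {
		latest, err := s.LatestIndex()
		if err != nil {
			return err
		}
		if latest >= index {
			return nil // caught up; safe to snapshot now
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(20 * time.Millisecond): // echoes the diff's backoffBase
			// poll again
		}
	}
}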

View File

@@ -666,7 +666,7 @@ func (b *BlockedEvals) Stats() *BlockedStats {
}
// EmitStats is used to export metrics about the blocked eval tracker while enabled
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) {
func (b *BlockedEvals) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):
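
This hunk, and the matching ones in the eval broker, plan queue, and Vault client below, changes the stop-channel parameter from chan struct{} to <-chan struct{}. The practical reason, sketched here with an illustrative emitUntil and run standing in for the real EmitStats loops: a receive-only parameter can be satisfied by ctx.Done(), so the server can signal shutdown by cancelling a context instead of closing a dedicated channel.

package example

import (
	"context"
	"time"
)

// emitUntil mirrors the shape of the EmitStats loops in this diff.
func emitUntil(period time.Duration, stopCh <-chan struct{}) {
	for {
		select {
		case <-time.After(period):
			// emit metrics here
		case <-stopCh:
			return
		}
	}
}

func run() {
	ctx, cancel := context.WithCancel(context.Background())
	go emitUntil(time.Second, ctx.Done()) // ctx.Done() is a <-chan struct{}
	cancel()                              // unblocks the goroutine
}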

View File

@@ -822,7 +822,7 @@ func (b *EvalBroker) Stats() *BrokerStats {
}
// EmitStats is used to export metrics about the broker while enabled
func (b *EvalBroker) EmitStats(period time.Duration, stopCh chan struct{}) {
func (b *EvalBroker) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):

View File

@@ -1,6 +1,7 @@
package nomad
import (
"context"
"fmt"
"runtime"
"time"
@@ -99,7 +100,15 @@ func (p *planner) planApply() {
// Snapshot the state so that we have a consistent view of the world
// if no snapshot is available
if waitCh == nil || snap == nil {
snap, err = p.fsm.State().Snapshot()
// Wait up to 5s for raft to catch up. Timing out
// causes the eval to be nacked and retried on another
// server, so timing out too quickly could cause
// greater scheduling latency than if we just waited
// longer here.
const indexTimeout = 5 * time.Second
ctx, cancel := context.WithTimeout(context.Background(), indexTimeout)
snap, err = p.fsm.State().SnapshotAfter(ctx, p.raft.LastIndex())
cancel()
if err != nil {
p.logger.Error("failed to snapshot state", "error", err)
pending.respond(nil, err)
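
The planner now bounds how long it waits for Raft to catch up before snapshotting. A reduced sketch of that call shape follows; Snapshot and snapshotAfterFn are placeholder types, while the 5s constant and the nack-and-retry reasoning come from the hunk above.

package example

import (
	"context"
	"fmt"
	"time"
)

type Snapshot struct{}

type snapshotAfterFn func(ctx context.Context, index uint64) (*Snapshot, error)

func planSnapshot(snapshotAfter snapshotAfterFn, lastIndex uint64) (*Snapshot, error) {
	// Bound the wait: a timed-out eval is nacked and retried on another
	// server, so a too-short timeout can add more scheduling latency than
	// simply waiting longer here.
	const indexTimeout = 5 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), indexTimeout)
	snap, err := snapshotAfter(ctx, lastIndex)
	cancel() // explicit cancel: this runs inside a long-lived loop, so defer would accumulate
	if err == context.DeadlineExceeded {
		return nil, fmt.Errorf("state store did not reach index %d within %s", lastIndex, indexTimeout)
	}
	return snap, err
}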

View File

@@ -195,7 +195,7 @@ func (q *PlanQueue) Stats() *QueueStats {
}
// EmitStats is used to export metrics about the plan queue while enabled
func (q *PlanQueue) EmitStats(period time.Duration, stopCh chan struct{}) {
func (q *PlanQueue) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):

View File

@@ -221,8 +221,11 @@ type Server struct {
left bool
shutdown bool
shutdownCh chan struct{}
shutdownLock sync.Mutex
shutdownCtx context.Context
shutdownCancel context.CancelFunc
shutdownCh <-chan struct{}
}
// Holds the RPC endpoints
@@ -303,9 +306,11 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI) (*Server, error)
blockedEvals: NewBlockedEvals(evalBroker, logger),
rpcTLS: incomingTLS,
aclCache: aclCache,
shutdownCh: make(chan struct{}),
}
s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
s.shutdownCh = s.shutdownCtx.Done()
// Create the RPC handler
s.rpcHandler = newRpcHandler(s)
@@ -530,7 +535,7 @@ func (s *Server) Shutdown() error {
}
s.shutdown = true
close(s.shutdownCh)
s.shutdownCancel()
if s.serf != nil {
s.serf.Shutdown()
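
The server replaces its close()-based shutdown signal with a cancelable context. shutdownCh is kept but is now just the context's Done() channel, so existing <-shutdownCh selects keep working while new code can take the context directly. A reduced sketch, assuming only the fields shown in the hunk:

package example

import "context"

type server struct {
	shutdownCtx    context.Context
	shutdownCancel context.CancelFunc
	shutdownCh     <-chan struct{}
}

func newServer() *server {
	s := &server{}
	s.shutdownCtx, s.shutdownCancel = context.WithCancel(context.Background())
	s.shutdownCh = s.shutdownCtx.Done() // receive-only; closed once cancel is called
	return s
}

func (s *server) shutdown() {
	s.shutdownCancel() // same broadcast effect close(s.shutdownCh) used to have
}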

View File

@@ -1,62 +0,0 @@
package state
import (
"sync"
)
// NotifyGroup is used to allow a simple notification mechanism.
// Channels can be marked as waiting, and when notify is invoked,
// all the waiting channels get a message and are cleared from the
// notify list.
type NotifyGroup struct {
l sync.Mutex
notify map[chan struct{}]struct{}
}
// Notify will do a non-blocking send to all waiting channels, and
// clear the notify list
func (n *NotifyGroup) Notify() {
n.l.Lock()
defer n.l.Unlock()
for ch := range n.notify {
select {
case ch <- struct{}{}:
default:
}
}
n.notify = nil
}
// Wait adds a channel to the notify group
func (n *NotifyGroup) Wait(ch chan struct{}) {
n.l.Lock()
defer n.l.Unlock()
if n.notify == nil {
n.notify = make(map[chan struct{}]struct{})
}
n.notify[ch] = struct{}{}
}
// Clear removes a channel from the notify group
func (n *NotifyGroup) Clear(ch chan struct{}) {
n.l.Lock()
defer n.l.Unlock()
if n.notify == nil {
return
}
delete(n.notify, ch)
}
// WaitCh allocates a channel that is subscribed to notifications
func (n *NotifyGroup) WaitCh() chan struct{} {
ch := make(chan struct{}, 1)
n.Wait(ch)
return ch
}
// Empty checks if there are no channels to notify
func (n *NotifyGroup) Empty() bool {
n.l.Lock()
defer n.l.Unlock()
return len(n.notify) == 0
}

View File

@@ -1,72 +0,0 @@
package state
import (
"testing"
)
func TestNotifyGroup(t *testing.T) {
grp := &NotifyGroup{}
ch1 := grp.WaitCh()
ch2 := grp.WaitCh()
select {
case <-ch1:
t.Fatalf("should block")
default:
}
select {
case <-ch2:
t.Fatalf("should block")
default:
}
grp.Notify()
select {
case <-ch1:
default:
t.Fatalf("should not block")
}
select {
case <-ch2:
default:
t.Fatalf("should not block")
}
// Should be unregistered
ch3 := grp.WaitCh()
grp.Notify()
select {
case <-ch1:
t.Fatalf("should block")
default:
}
select {
case <-ch2:
t.Fatalf("should block")
default:
}
select {
case <-ch3:
default:
t.Fatalf("should not block")
}
}
func TestNotifyGroup_Clear(t *testing.T) {
grp := &NotifyGroup{}
ch1 := grp.WaitCh()
grp.Clear(ch1)
grp.Notify()
// Should not get anything
select {
case <-ch1:
t.Fatalf("should not get message")
default:
}
}

View File

@@ -101,6 +101,61 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
return snap, nil
}
// SnapshotAfter is used to create a point in time snapshot where the index is
// guaranteed to be greater than or equal to the index parameter.
//
// Some server operations (such as scheduling) exchange objects via RPC
// concurrent with Raft log application, so they must ensure the state store
// snapshot they are operating on is at or after the index at which the
// objects retrieved via RPC were applied to the Raft log.
//
// Callers should maintain their own timer metric, as the time this method
// blocks indicates Raft log application latency relative to scheduling.
func (s *StateStore) SnapshotAfter(ctx context.Context, index uint64) (*StateSnapshot, error) {
// Ported from work.go:waitForIndex prior to 0.9
const backoffBase = 20 * time.Millisecond
const backoffLimit = 1 * time.Second
var retries uint
var retryTimer *time.Timer
// XXX: Potential optimization is to set up a watch on the state
// store's index table and only unblock via a trigger rather than
// polling.
for {
// Get the state store's current index
snapshotIndex, err := s.LatestIndex()
if err != nil {
return nil, fmt.Errorf("failed to determine state store's index: %v", err)
}
// We only need the FSM state to be as recent as the given index
if snapshotIndex >= index {
return s.Snapshot()
}
// Exponential back off
retries++
if retryTimer == nil {
// First retry, start at baseline
retryTimer = time.NewTimer(backoffBase)
} else {
// Subsequent retry, reset timer
deadline := 1 << (2 * retries) * backoffBase
if deadline > backoffLimit {
deadline = backoffLimit
}
retryTimer.Reset(deadline)
}
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-retryTimer.C:
}
}
}
// Restore is used to optimize the efficiency of rebuilding
// state by minimizing the number of transactions and checking
// overhead.
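
The backoff arithmetic above is easy to misread: the first retry waits the 20ms baseline, and later retries wait (1 << (2*retries)) * 20ms capped at 1s, which works out to roughly 20ms, then 320ms, then 1s per attempt. The small helper below just tabulates that schedule; the function name and loop are illustrative, only the constants and formula come from the hunk.

package example

import "time"

func backoffSchedule(n int) []time.Duration {
	const backoffBase = 20 * time.Millisecond
	const backoffLimit = 1 * time.Second
	var out []time.Duration
	for retries := 1; retries <= n; retries++ {
		if retries == 1 {
			// First retry starts at the baseline.
			out = append(out, backoffBase)
			continue
		}
		d := (1 << uint(2*retries)) * backoffBase
		if d > backoffLimit {
			d = backoffLimit
		}
		out = append(out, d)
	}
	return out // e.g. n=4 yields 20ms, 320ms, 1s, 1s
}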

View File

@@ -7118,6 +7118,100 @@ func TestStateSnapshot_DenormalizeAllocationDiffSlice_AllocDoesNotExist(t *testi
require.Nil(denormalizedAllocs)
}
// TestStateStore_SnapshotAfter_OK asserts StateStore.SnapshotAfter blocks
// until the StateStore's latest index is >= the requested index.
func TestStateStore_SnapshotAfter_OK(t *testing.T) {
t.Parallel()
s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)
node := mock.Node()
require.NoError(t, s.UpsertNode(index+1, node))
// Assert SnapshotAfter returns immediately if index < latest index
ctx, cancel := context.WithTimeout(context.Background(), 0)
snap, err := s.SnapshotAfter(ctx, index)
cancel()
require.NoError(t, err)
snapIndex, err := snap.LatestIndex()
require.NoError(t, err)
if snapIndex <= index {
require.Fail(t, "snapshot index should be greater than index")
}
// Assert SnapshotAfter returns immediately if index == latest index
ctx, cancel = context.WithTimeout(context.Background(), 0)
snap, err = s.SnapshotAfter(ctx, index+1)
cancel()
require.NoError(t, err)
snapIndex, err = snap.LatestIndex()
require.NoError(t, err)
require.Equal(t, snapIndex, index+1)
// Assert SnapshotAfter blocks if index > latest index
errCh := make(chan error, 1)
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
go func() {
defer close(errCh)
waitIndex := index + 2
snap, err := s.SnapshotAfter(ctx, waitIndex)
if err != nil {
errCh <- err
return
}
snapIndex, err := snap.LatestIndex()
if err != nil {
errCh <- err
return
}
if snapIndex < waitIndex {
errCh <- fmt.Errorf("snapshot index < wait index: %d < %d", snapIndex, waitIndex)
return
}
}()
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(500 * time.Millisecond):
// Let it block for a bit before unblocking by upserting
}
node.Name = "hal"
require.NoError(t, s.UpsertNode(index+2, node))
select {
case err := <-errCh:
require.NoError(t, err)
case <-time.After(5 * time.Second):
require.Fail(t, "timed out waiting for SnapshotAfter to unblock")
}
}
// TestStateStore_SnapshotAfter_Timeout asserts StateStore.SnapshotAfter
// returns an error if the desired index is not reached within the deadline.
func TestStateStore_SnapshotAfter_Timeout(t *testing.T) {
t.Parallel()
s := testStateStore(t)
index, err := s.LatestIndex()
require.NoError(t, err)
// Assert SnapshotAfter blocks if index > latest index
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
snap, err := s.SnapshotAfter(ctx, index+1)
require.EqualError(t, err, context.DeadlineExceeded.Error())
require.Nil(t, snap)
}
// watchFired is a helper for unit tests that returns if the given watch set
// fired (it doesn't care which watch actually fired). This uses a fixed
// timeout since we already expect the event happened before calling this and

View File

@@ -131,7 +131,7 @@ type VaultClient interface {
// EmitStats emits the client's statistics at the given period until stopCh
// is closed.
EmitStats(period time.Duration, stopCh chan struct{})
EmitStats(period time.Duration, stopCh <-chan struct{})
}
// VaultStats returns all the stats about Vault tokens created and managed by
@@ -1313,7 +1313,7 @@ func (v *vaultClient) stats() *VaultStats {
}
// EmitStats is used to export metrics about the Vault client while enabled
func (v *vaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {
func (v *vaultClient) EmitStats(period time.Duration, stopCh <-chan struct{}) {
for {
select {
case <-time.After(period):

View File

@@ -135,9 +135,9 @@ func (v *TestVaultClient) RevokeTokens(ctx context.Context, accessors []*structs
return nil
}
func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stats() map[string]string { return map[string]string{} }
func (v *TestVaultClient) EmitStats(period time.Duration, stopCh chan struct{}) {}
func (v *TestVaultClient) Stop() {}
func (v *TestVaultClient) SetActive(enabled bool) {}
func (v *TestVaultClient) SetConfig(config *config.VaultConfig) error { return nil }
func (v *TestVaultClient) Running() bool { return true }
func (v *TestVaultClient) Stats() map[string]string { return map[string]string{} }
func (v *TestVaultClient) EmitStats(period time.Duration, stopCh <-chan struct{}) {}

View File

@@ -1,6 +1,7 @@
package nomad
import (
"context"
"fmt"
"strings"
"sync"
@@ -9,6 +10,7 @@ import (
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
)
@@ -21,9 +23,6 @@ const (
// but that is much slower than backoffBaselineFast
backoffBaselineSlow = 500 * time.Millisecond
// backoffLimitFast is the limit of the exponential backoff
backoffLimitFast = time.Second
// backoffLimitSlow is the limit of the exponential backoff for
// the slower backoff
backoffLimitSlow = 10 * time.Second
@@ -119,14 +118,15 @@ func (w *Worker) run() {
}
// Wait for the raft log to catch up to the evaluation
if err := w.waitForIndex(waitIndex, raftSyncLimit); err != nil {
snap, err := w.snapshotAfter(waitIndex, raftSyncLimit)
if err != nil {
w.logger.Error("error waiting for Raft index", "error", err, "index", waitIndex)
w.sendAck(eval.ID, token, false)
continue
}
// Invoke the scheduler to determine placements
if err := w.invokeScheduler(eval, token); err != nil {
if err := w.invokeScheduler(snap, eval, token); err != nil {
w.logger.Error("error invoking scheduler", "error", err)
w.sendAck(eval.ID, token, false)
continue
@@ -224,56 +224,30 @@ func (w *Worker) sendAck(evalID, token string, ack bool) {
}
}
// waitForIndex ensures that the local state is at least as fresh
// as the given index. This is used before starting an evaluation,
// but also potentially mid-stream. If a Plan fails because of stale
// state (attempt to allocate to a failed/dead node), we may need
// to sync our state again and do the planning with more recent data.
func (w *Worker) waitForIndex(index uint64, timeout time.Duration) error {
// XXX: Potential optimization is to set up a watch on the state stores
// index table and only unblock via a trigger rather than timing out and
// checking.
// snapshotAfter times calls to StateStore.SnapshotAfter which may block.
func (w *Worker) snapshotAfter(waitIndex uint64, timeout time.Duration) (*state.StateSnapshot, error) {
start := time.Now()
defer metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
CHECK:
// Get the states current index
snapshotIndex, err := w.srv.fsm.State().LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine state store's index: %v", err)
ctx, cancel := context.WithTimeout(w.srv.shutdownCtx, timeout)
snap, err := w.srv.fsm.State().SnapshotAfter(ctx, waitIndex)
cancel()
metrics.MeasureSince([]string{"nomad", "worker", "wait_for_index"}, start)
// Wrap error to ensure callers don't disregard timeouts.
if err == context.DeadlineExceeded {
err = fmt.Errorf("timed out after %s waiting for index=%d", timeout, waitIndex)
}
// We only need the FSM state to be as recent as the given index
if index <= snapshotIndex {
w.backoffReset()
return nil
}
// Check if we've reached our limit
if time.Now().Sub(start) > timeout {
return fmt.Errorf("sync wait timeout reached")
}
// Exponential back off if we haven't yet reached it
if w.backoffErr(backoffBaselineFast, backoffLimitFast) {
return fmt.Errorf("shutdown while waiting for state sync")
}
goto CHECK
return snap, err
}
// invokeScheduler is used to invoke the business logic of the scheduler
func (w *Worker) invokeScheduler(eval *structs.Evaluation, token string) error {
func (w *Worker) invokeScheduler(snap *state.StateSnapshot, eval *structs.Evaluation, token string) error {
defer metrics.MeasureSince([]string{"nomad", "worker", "invoke_scheduler", eval.Type}, time.Now())
// Store the evaluation token
w.evalToken = token
// Snapshot the current state
snap, err := w.srv.fsm.State().Snapshot()
if err != nil {
return fmt.Errorf("failed to snapshot state: %v", err)
}
// Store the snapshot's index
var err error
w.snapshotIndex, err = snap.LatestIndex()
if err != nil {
return fmt.Errorf("failed to determine snapshot's index: %v", err)
@@ -352,16 +326,12 @@ SUBMIT:
if result.RefreshIndex != 0 {
// Wait for the raft log to catch up to the evaluation
w.logger.Debug("refreshing state", "refresh_index", result.RefreshIndex, "eval_id", plan.EvalID)
if err := w.waitForIndex(result.RefreshIndex, raftSyncLimit); err != nil {
var err error
state, err = w.snapshotAfter(result.RefreshIndex, raftSyncLimit)
if err != nil {
return nil, nil, err
}
// Snapshot the current state
snap, err := w.srv.fsm.State().Snapshot()
if err != nil {
return nil, nil, fmt.Errorf("failed to snapshot state: %v", err)
}
state = snap
}
// Return the result and potential state update
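
One detail of the worker change that ties the hunks together: the wait is bounded by a context derived from the server's shutdown context, so either the deadline or server shutdown unblocks SnapshotAfter, replacing the old loop's explicit "shutdown while waiting for state sync" check. A small sketch of that derivation, with a done channel standing in for "the index was reached":

package example

import (
	"context"
	"time"
)

func waitBounded(shutdownCtx context.Context, timeout time.Duration, done <-chan struct{}) error {
	// Child of the shutdown context: whichever fires first cancels the wait.
	ctx, cancel := context.WithTimeout(shutdownCtx, timeout)
	defer cancel()
	select {
	case <-done: // stand-in for "state store reached the index"
		return nil
	case <-ctx.Done():
		// context.DeadlineExceeded if the timeout fired,
		// context.Canceled if the server began shutting down.
		return ctx.Err()
	}
}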

View File

@@ -1,14 +1,15 @@
package nomad
import (
"fmt"
"reflect"
"strings"
"sync"
"testing"
"time"
log "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
@@ -286,26 +287,29 @@ func TestWorker_waitForIndex(t *testing.T) {
index := s1.raft.AppliedIndex()
// Cause an increment
errCh := make(chan error, 1)
go func() {
time.Sleep(10 * time.Millisecond)
n := mock.Node()
if err := s1.fsm.state.UpsertNode(index+1, n); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
errCh <- s1.fsm.state.UpsertNode(index+1, n)
}()
// Wait for a future index
w := &Worker{srv: s1, logger: s1.logger}
err := w.waitForIndex(index+1, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
snap, err := w.snapshotAfter(index+1, time.Second)
require.NoError(t, err)
require.NotNil(t, snap)
// No error from upserting
require.NoError(t, <-errCh)
// Cause a timeout
err = w.waitForIndex(index+100, 10*time.Millisecond)
if err == nil || !strings.Contains(err.Error(), "timeout") {
t.Fatalf("err: %v", err)
}
waitIndex := index + 100
timeout := 10 * time.Millisecond
snap, err = w.snapshotAfter(index+100, timeout)
require.Nil(t, snap)
require.EqualError(t, err,
fmt.Sprintf("timed out after %s waiting for index=%d", timeout, waitIndex))
}
func TestWorker_invokeScheduler(t *testing.T) {
@@ -320,10 +324,11 @@ func TestWorker_invokeScheduler(t *testing.T) {
eval := mock.Eval()
eval.Type = "noop"
err := w.invokeScheduler(eval, uuid.Generate())
if err != nil {
t.Fatalf("err: %v", err)
}
snap, err := s1.fsm.state.Snapshot()
require.NoError(t, err)
err = w.invokeScheduler(snap, eval, uuid.Generate())
require.NoError(t, err)
}
func TestWorker_SubmitPlan(t *testing.T) {