diff --git a/nomad/fsm.go b/nomad/fsm.go index a5c6d86e6..c3270ff00 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -5,6 +5,7 @@ import ( "io" "log" "reflect" + "sync" "time" "github.com/armon/go-metrics" @@ -50,6 +51,12 @@ type nomadFSM struct { logger *log.Logger state *state.StateStore timetable *TimeTable + + // stateLock is only used to protect outside callers to State() from + // racing with Restore(), which is called by Raft (it puts in a totally + // new state store). Everything internal here is synchronized by the + // Raft side, so doesn't need to lock this. + stateLock sync.RWMutex } // nomadSnapshot is used to provide a snapshot of the current @@ -92,6 +99,8 @@ func (n *nomadFSM) Close() error { // State is used to return a handle to the current state func (n *nomadFSM) State() *state.StateStore { + n.stateLock.RLock() + defer n.stateLock.RUnlock() return n.state } @@ -678,6 +687,18 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error { } } + // External code might be calling State(), so we need to synchronize + // here to make sure we swap in the new state store atomically. + n.stateLock.Lock() + stateOld := n.state + n.state = newState + n.stateLock.Unlock() + + // Signal that the old state store has been abandoned. This is required + // because we don't operate on it any more, we just throw it away, so + // blocking queries won't see any changes and need to be woken up. + stateOld.Abandon() + return nil } diff --git a/nomad/rpc.go b/nomad/rpc.go index f9a0ebd17..1357566ff 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -12,6 +12,7 @@ import ( "github.com/armon/go-metrics" "github.com/hashicorp/consul/lib" + memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" @@ -321,19 +322,25 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) { } } +// queryFn is used to perform a query operation. If a re-query is needed, the +// passed-in watch set will be used to block for changes. The passed-in state +// store should be used (vs. calling fsm.State()) since the given state store +// will be correctly watched for changes if the state store is restored from +// a snapshot. +type queryFn func(memdb.WatchSet, *state.StateStore) error + // blockingOptions is used to parameterize blockingRPC type blockingOptions struct { queryOpts *structs.QueryOptions queryMeta *structs.QueryMeta watch watch.Items - run func() error + run queryFn } // blockingRPC is used for queries that need to wait for a // minimum index. This is used to block and wait for changes. func (s *Server) blockingRPC(opts *blockingOptions) error { var timeout *time.Timer - var notifyCh chan struct{} var state *state.StateStore // Fast path non-blocking @@ -353,36 +360,38 @@ func (s *Server) blockingRPC(opts *blockingOptions) error { // Setup a query timeout timeout = time.NewTimer(opts.queryOpts.MaxQueryTime) - - // Setup the notify channel - notifyCh = make(chan struct{}, 1) - - // Ensure we tear down any watchers on return - state = s.fsm.State() - defer func() { - timeout.Stop() - state.StopWatch(opts.watch, notifyCh) - }() - -REGISTER_NOTIFY: - // Register the notification channel. This may be done - // multiple times if we have not reached the target wait index. - state.Watch(opts.watch, notifyCh) + defer timeout.Stop() RUN_QUERY: // Update the query meta data s.setQueryMeta(opts.queryMeta) - // Run the query function + // Increment the rpc query counter metrics.IncrCounter([]string{"nomad", "rpc", "query"}, 1) - err := opts.run() + + // Operate on a consistent set of state. This makes sure that the + // abandon channel goes with the state that the caller is using to + // build watches. + state = s.fsm.State() + + // We can skip all watch tracking if this isn't a blocking query. + var ws memdb.WatchSet + if queryOpts.MinQueryIndex > 0 { + ws = memdb.NewWatchSet() + + // This channel will be closed if a snapshot is restored and the + // whole state store is abandoned. + ws.Add(state.AbandonCh()) + } + + // Block up to the timeout if we didn't see anything fresh. + err := fn(ws, state) // Check for minimum query time if err == nil && opts.queryOpts.MinQueryIndex > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex { - select { - case <-notifyCh: - goto REGISTER_NOTIFY - case <-timeout.C: + if expired := ws.Watch(timeout.C); !expired { + // XXX James can do this behavior too + goto RUN_QUERY } } return err diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 65ae22dea..6ce88c659 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -29,6 +29,10 @@ type StateStore struct { logger *log.Logger db *memdb.MemDB watch *stateWatch + + // abandonCh is used to signal watchers that this state store has been + // abandoned (usually during a restore). This is only ever closed. + abandonCh chan struct{} } // NewStateStore is used to create a new state store @@ -41,9 +45,10 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { // Create the state store s := &StateStore{ - logger: log.New(logOutput, "", log.LstdFlags), - db: db, - watch: newStateWatch(), + logger: log.New(logOutput, "", log.LstdFlags), + db: db, + watch: newStateWatch(), + abandonCh: make(chan struct{}), } return s, nil } @@ -80,9 +85,16 @@ func (s *StateStore) Watch(items watch.Items, notify chan struct{}) { s.watch.watch(items, notify) } -// StopWatch unsubscribes a channel from a set of watch items. -func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) { - s.watch.stopWatch(items, notify) +// AbandonCh returns a channel you can wait on to know if the state store was +// abandoned. +func (s *StateStore) AbandonCh() <-chan struct{} { + return s.abandonCh +} + +// Abandon is used to signal that the given state store has been abandoned. +// Calling this more than one time will panic. +func (s *StateStore) Abandon() { + close(s.abandonCh) } // UpsertJobSummary upserts a job summary into the state store. diff --git a/nomad/watch/watch.go b/nomad/watch/watch.go deleted file mode 100644 index 8578df33f..000000000 --- a/nomad/watch/watch.go +++ /dev/null @@ -1,40 +0,0 @@ -package watch - -// The watch package provides a means of describing a watch for a blocking -// query. It is exported so it may be shared between Nomad's RPC layer and -// the underlying state store. - -// Item describes the scope of a watch. It is used to provide a uniform -// input for subscribe/unsubscribe and notification firing. Specifying -// multiple fields does not place a watch on multiple items. Each Item -// describes exactly one scoped watch. -type Item struct { - Alloc string - AllocEval string - AllocJob string - AllocNode string - Eval string - EvalJob string - Job string - JobSummary string - Node string - Table string -} - -// Items is a helper used to construct a set of watchItems. It deduplicates -// the items as they are added using map keys. -type Items map[Item]struct{} - -// NewItems creates a new Items set and adds the given items. -func NewItems(items ...Item) Items { - wi := make(Items) - for _, item := range items { - wi.Add(item) - } - return wi -} - -// Add adds an item to the watch set. -func (wi Items) Add(i Item) { - wi[i] = struct{}{} -} diff --git a/nomad/watch/watch_test.go b/nomad/watch/watch_test.go deleted file mode 100644 index 9a8901aa8..000000000 --- a/nomad/watch/watch_test.go +++ /dev/null @@ -1,31 +0,0 @@ -package watch - -import ( - "testing" -) - -func TestWatchItems(t *testing.T) { - // Creates an empty set of items - wi := NewItems() - if len(wi) != 0 { - t.Fatalf("expect 0 items, got: %#v", wi) - } - - // Creates a new set of supplied items - wi = NewItems(Item{Table: "foo"}) - if len(wi) != 1 { - t.Fatalf("expected 1 item, got: %#v", wi) - } - - // Adding items works - wi.Add(Item{Node: "bar"}) - if len(wi) != 2 { - t.Fatalf("expected 2 items, got: %#v", wi) - } - - // Adding duplicates auto-dedupes - wi.Add(Item{Table: "foo"}) - if len(wi) != 2 { - t.Fatalf("expected 2 items, got: %#v", wi) - } -}