Update state store and blocking query helper

This commit is contained in:
Alex Dadgar
2017-02-05 12:03:11 -08:00
parent 62bf777a13
commit 7128558aeb
5 changed files with 71 additions and 100 deletions

View File

@@ -5,6 +5,7 @@ import (
"io"
"log"
"reflect"
"sync"
"time"
"github.com/armon/go-metrics"
@@ -50,6 +51,12 @@ type nomadFSM struct {
logger *log.Logger
state *state.StateStore
timetable *TimeTable
// stateLock is only used to protect outside callers to State() from
// racing with Restore(), which is called by Raft (it puts in a totally
// new state store). Everything internal here is synchronized by the
// Raft side, so doesn't need to lock this.
stateLock sync.RWMutex
}
// nomadSnapshot is used to provide a snapshot of the current
@@ -92,6 +99,8 @@ func (n *nomadFSM) Close() error {
// State is used to return a handle to the current state
func (n *nomadFSM) State() *state.StateStore {
n.stateLock.RLock()
defer n.stateLock.RUnlock()
return n.state
}
@@ -678,6 +687,18 @@ func (n *nomadFSM) Restore(old io.ReadCloser) error {
}
}
// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.
n.stateLock.Lock()
stateOld := n.state
n.state = newState
n.stateLock.Unlock()
// Signal that the old state store has been abandoned. This is required
// because we don't operate on it any more, we just throw it away, so
// blocking queries won't see any changes and need to be woken up.
stateOld.Abandon()
return nil
}

View File

@@ -12,6 +12,7 @@ import (
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/lib"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
@@ -321,19 +322,25 @@ func (s *Server) setQueryMeta(m *structs.QueryMeta) {
}
}
// queryFn is used to perform a query operation. If a re-query is needed, the
// passed-in watch set will be used to block for changes. The passed-in state
// store should be used (vs. calling fsm.State()) since the given state store
// will be correctly watched for changes if the state store is restored from
// a snapshot.
type queryFn func(memdb.WatchSet, *state.StateStore) error
// blockingOptions is used to parameterize blockingRPC
type blockingOptions struct {
queryOpts *structs.QueryOptions
queryMeta *structs.QueryMeta
watch watch.Items
run func() error
run queryFn
}
// blockingRPC is used for queries that need to wait for a
// minimum index. This is used to block and wait for changes.
func (s *Server) blockingRPC(opts *blockingOptions) error {
var timeout *time.Timer
var notifyCh chan struct{}
var state *state.StateStore
// Fast path non-blocking
@@ -353,36 +360,38 @@ func (s *Server) blockingRPC(opts *blockingOptions) error {
// Setup a query timeout
timeout = time.NewTimer(opts.queryOpts.MaxQueryTime)
// Setup the notify channel
notifyCh = make(chan struct{}, 1)
// Ensure we tear down any watchers on return
state = s.fsm.State()
defer func() {
timeout.Stop()
state.StopWatch(opts.watch, notifyCh)
}()
REGISTER_NOTIFY:
// Register the notification channel. This may be done
// multiple times if we have not reached the target wait index.
state.Watch(opts.watch, notifyCh)
defer timeout.Stop()
RUN_QUERY:
// Update the query meta data
s.setQueryMeta(opts.queryMeta)
// Run the query function
// Increment the rpc query counter
metrics.IncrCounter([]string{"nomad", "rpc", "query"}, 1)
err := opts.run()
// Operate on a consistent set of state. This makes sure that the
// abandon channel goes with the state that the caller is using to
// build watches.
state = s.fsm.State()
// We can skip all watch tracking if this isn't a blocking query.
var ws memdb.WatchSet
if queryOpts.MinQueryIndex > 0 {
ws = memdb.NewWatchSet()
// This channel will be closed if a snapshot is restored and the
// whole state store is abandoned.
ws.Add(state.AbandonCh())
}
// Block up to the timeout if we didn't see anything fresh.
err := fn(ws, state)
// Check for minimum query time
if err == nil && opts.queryOpts.MinQueryIndex > 0 && opts.queryMeta.Index <= opts.queryOpts.MinQueryIndex {
select {
case <-notifyCh:
goto REGISTER_NOTIFY
case <-timeout.C:
if expired := ws.Watch(timeout.C); !expired {
// XXX James can do this behavior too
goto RUN_QUERY
}
}
return err

View File

@@ -29,6 +29,10 @@ type StateStore struct {
logger *log.Logger
db *memdb.MemDB
watch *stateWatch
// abandonCh is used to signal watchers that this state store has been
// abandoned (usually during a restore). This is only ever closed.
abandonCh chan struct{}
}
// NewStateStore is used to create a new state store
@@ -41,9 +45,10 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
// Create the state store
s := &StateStore{
logger: log.New(logOutput, "", log.LstdFlags),
db: db,
watch: newStateWatch(),
logger: log.New(logOutput, "", log.LstdFlags),
db: db,
watch: newStateWatch(),
abandonCh: make(chan struct{}),
}
return s, nil
}
@@ -80,9 +85,16 @@ func (s *StateStore) Watch(items watch.Items, notify chan struct{}) {
s.watch.watch(items, notify)
}
// StopWatch unsubscribes a channel from a set of watch items.
func (s *StateStore) StopWatch(items watch.Items, notify chan struct{}) {
s.watch.stopWatch(items, notify)
// AbandonCh returns a channel you can wait on to know if the state store was
// abandoned.
func (s *StateStore) AbandonCh() <-chan struct{} {
return s.abandonCh
}
// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *StateStore) Abandon() {
close(s.abandonCh)
}
// UpsertJobSummary upserts a job summary into the state store.

View File

@@ -1,40 +0,0 @@
package watch
// The watch package provides a means of describing a watch for a blocking
// query. It is exported so it may be shared between Nomad's RPC layer and
// the underlying state store.
// Item describes the scope of a watch. It is used to provide a uniform
// input for subscribe/unsubscribe and notification firing. Specifying
// multiple fields does not place a watch on multiple items. Each Item
// describes exactly one scoped watch.
type Item struct {
Alloc string
AllocEval string
AllocJob string
AllocNode string
Eval string
EvalJob string
Job string
JobSummary string
Node string
Table string
}
// Items is a helper used to construct a set of watchItems. It deduplicates
// the items as they are added using map keys.
type Items map[Item]struct{}
// NewItems creates a new Items set and adds the given items.
func NewItems(items ...Item) Items {
wi := make(Items)
for _, item := range items {
wi.Add(item)
}
return wi
}
// Add adds an item to the watch set.
func (wi Items) Add(i Item) {
wi[i] = struct{}{}
}

View File

@@ -1,31 +0,0 @@
package watch
import (
"testing"
)
func TestWatchItems(t *testing.T) {
// Creates an empty set of items
wi := NewItems()
if len(wi) != 0 {
t.Fatalf("expect 0 items, got: %#v", wi)
}
// Creates a new set of supplied items
wi = NewItems(Item{Table: "foo"})
if len(wi) != 1 {
t.Fatalf("expected 1 item, got: %#v", wi)
}
// Adding items works
wi.Add(Item{Node: "bar"})
if len(wi) != 2 {
t.Fatalf("expected 2 items, got: %#v", wi)
}
// Adding duplicates auto-dedupes
wi.Add(Item{Table: "foo"})
if len(wi) != 2 {
t.Fatalf("expected 2 items, got: %#v", wi)
}
}