diff --git a/nomad/state/notify.go b/nomad/state/notify.go new file mode 100644 index 000000000..180482369 --- /dev/null +++ b/nomad/state/notify.go @@ -0,0 +1,62 @@ +package state + +import ( + "sync" +) + +// NotifyGroup is used to allow a simple notification mechanism. +// Channels can be marked as waiting, and when notify is invoked, +// all the waiting channels get a message and are cleared from the +// notify list. +type NotifyGroup struct { + l sync.Mutex + notify map[chan struct{}]struct{} +} + +// Notify will do a non-blocking send to all waiting channels, and +// clear the notify list +func (n *NotifyGroup) Notify() { + n.l.Lock() + defer n.l.Unlock() + for ch := range n.notify { + select { + case ch <- struct{}{}: + default: + } + } + n.notify = nil +} + +// Wait adds a channel to the notify group +func (n *NotifyGroup) Wait(ch chan struct{}) { + n.l.Lock() + defer n.l.Unlock() + if n.notify == nil { + n.notify = make(map[chan struct{}]struct{}) + } + n.notify[ch] = struct{}{} +} + +// Clear removes a channel from the notify group +func (n *NotifyGroup) Clear(ch chan struct{}) { + n.l.Lock() + defer n.l.Unlock() + if n.notify == nil { + return + } + delete(n.notify, ch) +} + +// WaitCh allocates a channel that is subscribed to notifications +func (n *NotifyGroup) WaitCh() chan struct{} { + ch := make(chan struct{}, 1) + n.Wait(ch) + return ch +} + +// Empty checks if there are no channels to notify +func (n *NotifyGroup) Empty() bool { + n.l.Lock() + defer n.l.Unlock() + return len(n.notify) == 0 +} diff --git a/nomad/state/notify_test.go b/nomad/state/notify_test.go new file mode 100644 index 000000000..34c14f46d --- /dev/null +++ b/nomad/state/notify_test.go @@ -0,0 +1,72 @@ +package state + +import ( + "testing" +) + +func TestNotifyGroup(t *testing.T) { + grp := &NotifyGroup{} + + ch1 := grp.WaitCh() + ch2 := grp.WaitCh() + + select { + case <-ch1: + t.Fatalf("should block") + default: + } + select { + case <-ch2: + t.Fatalf("should block") + 
default: + } + + grp.Notify() + + select { + case <-ch1: + default: + t.Fatalf("should not block") + } + select { + case <-ch2: + default: + t.Fatalf("should not block") + } + + // Should be unregistered + ch3 := grp.WaitCh() + grp.Notify() + + select { + case <-ch1: + t.Fatalf("should block") + default: + } + select { + case <-ch2: + t.Fatalf("should block") + default: + } + select { + case <-ch3: + default: + t.Fatalf("should not block") + } +} + +func TestNotifyGroup_Clear(t *testing.T) { + grp := &NotifyGroup{} + + ch1 := grp.WaitCh() + grp.Clear(ch1) + + grp.Notify() + + // Should not get anything + select { + case <-ch1: + t.Fatalf("should not get message") + default: + } +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index f8c568ca5..37d5501fc 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4,6 +4,7 @@ import ( "fmt" "io" "log" + "sync" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" @@ -19,6 +20,7 @@ import ( type StateStore struct { logger *log.Logger db *memdb.MemDB + watch *stateWatch } // StateSnapshot is used to provide a point-in-time snapshot @@ -30,7 +32,9 @@ type StateSnapshot struct { // restoring state by only using a single large transaction // instead of thousands of sub transactions type StateRestore struct { - txn *memdb.Txn + txn *memdb.Txn + watch *stateWatch + allocNodes map[string]struct{} } // Abort is used to abort the restore operation @@ -40,6 +44,7 @@ func (s *StateRestore) Abort() { // Commit is used to commit the restore operation func (s *StateRestore) Commit() { + s.txn.Defer(func() { s.watch.notifyAllocs(s.allocNodes) }) s.txn.Commit() } @@ -50,6 +55,13 @@ type IndexEntry struct { Value uint64 } +// stateWatch holds shared state for watching updates. This is +// outside of StateStore so it can be shared with snapshots. 
+type stateWatch struct { + allocs map[string]*NotifyGroup + allocLock sync.Mutex +} + // NewStateStore is used to create a new state store func NewStateStore(logOutput io.Writer) (*StateStore, error) { // Create the MemDB @@ -58,10 +70,16 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) { return nil, fmt.Errorf("state store setup failed: %v", err) } + // Create the watch entry + watch := &stateWatch{ + allocs: make(map[string]*NotifyGroup), + } + // Create the state store s := &StateStore{ logger: log.New(logOutput, "", log.LstdFlags), db: db, + watch: watch, } return s, nil } @@ -74,6 +92,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { StateStore: StateStore{ logger: s.logger, db: s.db.Snapshot(), + watch: s.watch, }, } return snap, nil @@ -84,7 +103,56 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) { // overhead. func (s *StateStore) Restore() (*StateRestore, error) { txn := s.db.Txn(true) - return &StateRestore{txn}, nil + r := &StateRestore{ + txn: txn, + watch: s.watch, + allocNodes: make(map[string]struct{}), + } + return r, nil +} + +// WatchAllocs is used to subscribe a channel to changes in allocations for a node +func (s *StateStore) WatchAllocs(node string, notify chan struct{}) { + s.watch.allocLock.Lock() + defer s.watch.allocLock.Unlock() + + // Check for an existing notify group + if grp, ok := s.watch.allocs[node]; ok { + grp.Wait(notify) + return + } + + // Create new notify group + grp := &NotifyGroup{} + grp.Wait(notify) + s.watch.allocs[node] = grp +} + +// StopWatchAllocs is used to unsubscribe a channel from changes in allocations +func (s *StateStore) StopWatchAllocs(node string, notify chan struct{}) { + s.watch.allocLock.Lock() + defer s.watch.allocLock.Unlock() + + // Check for an existing notify group + if grp, ok := s.watch.allocs[node]; ok { + grp.Clear(notify) + if grp.Empty() { + delete(s.watch.allocs, node) + } + } +} + +// notifyAllocs is used to notify any node alloc listeners of a change 
+func (w *stateWatch) notifyAllocs(nodes map[string]struct{}) { + w.allocLock.Lock() + defer w.allocLock.Unlock() + + for node := range nodes { + if grp, ok := w.allocs[node]; ok { + grp.Notify() + delete(w.allocs, node) + } + } } // RegisterNode is used to register a node or update a node definition @@ -339,6 +407,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { txn := s.db.Txn(true) defer txn.Abort() + nodes := make(map[string]struct{}) for _, eval := range evals { existing, err := txn.First("evals", "id", eval) @@ -361,6 +430,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e if existing == nil { continue } + nodes[existing.(*structs.Allocation).NodeID] = struct{}{} if err := txn.Delete("allocs", existing); err != nil { return fmt.Errorf("alloc delete failed: %v", err) } @@ -373,7 +443,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil { return fmt.Errorf("index update failed: %v", err) } - + txn.Defer(func() { s.watch.notifyAllocs(nodes) }) txn.Commit() return nil } @@ -411,6 +481,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string, allocs []*structs.Allocation) error { txn := s.db.Txn(true) defer txn.Abort() + nodes := make(map[string]struct{}) // Handle evictions first for _, evict := range evicts { @@ -425,6 +496,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string, *newAlloc = *existing.(*structs.Allocation) newAlloc.Status = structs.AllocStatusEvict newAlloc.StatusDescription = "" + nodes[newAlloc.NodeID] = struct{}{} if err := txn.Insert("allocs", newAlloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) @@ -444,6 +516,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string, alloc.CreateIndex = 
existing.(*structs.Allocation).CreateIndex alloc.ModifyIndex = index } + nodes[alloc.NodeID] = struct{}{} if err := txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } @@ -454,6 +527,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string, return fmt.Errorf("index update failed: %v", err) } + txn.Defer(func() { s.watch.notifyAllocs(nodes) }) txn.Commit() return nil } @@ -601,6 +675,7 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error { // AllocRestore is used to restore an allocation func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error { + r.allocNodes[alloc.NodeID] = struct{}{} if err := r.txn.Insert("allocs", alloc); err != nil { return fmt.Errorf("alloc insert failed: %v", err) } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 0568ad068..44a6c0663 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -483,6 +483,9 @@ func TestStateStore_DeleteEval_GetEval(t *testing.T) { t.Fatalf("err: %v", err) } + notify1 := make(chan struct{}, 1) + state.WatchAllocs(alloc.NodeID, notify1) + err = state.DeleteEval(1002, []string{eval.ID, eval2.ID}, []string{alloc.ID, alloc2.ID}) if err != nil { t.Fatalf("err: %v", err) @@ -539,6 +542,12 @@ func TestStateStore_DeleteEval_GetEval(t *testing.T) { if index != 1002 { t.Fatalf("bad: %d", index) } + + select { + case <-notify1: + default: + t.Fatalf("should be notified") + } } func TestStateStore_Evals(t *testing.T) { @@ -631,6 +640,36 @@ func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) { } } +func TestStateStore_WatchAllocs(t *testing.T) { + state := testStateStore(t) + + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + state.WatchAllocs("foo", notify1) + state.WatchAllocs("foo", notify2) + state.StopWatchAllocs("foo", notify2) + + alloc := mock.Alloc() + alloc.NodeID = "foo" + err := state.UpdateAllocations(1000, nil, + 
[]*structs.Allocation{alloc}) + if err != nil { + t.Fatalf("err: %v", err) + } + + select { + case <-notify1: + default: + t.Fatalf("should be notified") + } + + select { + case <-notify2: + t.Fatalf("should not be notified") + default: + } +} + func TestStateStore_UpdateAlloc_GetAlloc(t *testing.T) { state := testStateStore(t) alloc := mock.Alloc()