From 31abf97e06ae51edcdc7b09aa8cee8dadf087ea2 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Thu, 29 Oct 2015 13:21:25 -0700 Subject: [PATCH] nomad: move state watcher into its own file, add tests --- nomad/state/state_store.go | 92 +++------------------------------ nomad/state/state_store_test.go | 10 ++-- nomad/state/watch.go | 86 ++++++++++++++++++++++++++++++ nomad/state/watch_test.go | 64 +++++++++++++++++++++++ 4 files changed, 163 insertions(+), 89 deletions(-) create mode 100644 nomad/state/watch.go create mode 100644 nomad/state/watch_test.go diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 60af402d3..685233447 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -4,7 +4,6 @@ import ( "fmt" "io" "log" - "sync" "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/nomad/structs" @@ -405,6 +404,8 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error { txn := s.db.Txn(true) defer txn.Abort() + watch := make(watchItems) + watch.add(watchItem{table: "evals"}) for _, eval := range evals { existing, err := txn.First("evals", "id", eval) @@ -427,6 +428,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e if existing == nil { continue } + watch.add(watchItem{allocNode: existing.(*structs.Allocation).NodeID}) if err := txn.Delete("allocs", existing); err != nil { return fmt.Errorf("alloc delete failed: %v", err) } @@ -440,7 +442,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watchItem{table: "evals"}) }) + txn.Defer(func() { s.watch.notify(watch.items()...) }) txn.Commit() return nil } @@ -545,9 +547,12 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error { txn := s.db.Txn(true) defer txn.Abort() + watch := make(watchItems) + watch.add(watchItem{table: "allocs"}) // Handle the allocations for _, alloc := range allocs { + watch.add(watchItem{allocNode: alloc.NodeID}) existing, err := txn.First("allocs", "id", alloc.ID) if err != nil { return fmt.Errorf("alloc lookup failed: %v", err) @@ -573,7 +578,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er return fmt.Errorf("index update failed: %v", err) } - txn.Defer(func() { s.watch.notify(watchItem{table: "allocs"}) }) + txn.Defer(func() { s.watch.notify(watch.items()...) }) txn.Commit() return nil } @@ -764,84 +769,3 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error { } return nil } - -// watchItem describes the scope of a watch. It is used to provide a uniform -// input for subscribe/unsubscribe and notification firing. -type watchItem struct { - allocID string - allocNode string - evalID string - jobID string - nodeID string - table string -} - -// watchItems is a helper used to construct a set of watchItems. It deduplicates -// the items as they are added using map keys. -type watchItems map[watchItem]struct{} - -// add adds an item to the watch set. -func (w watchItems) add(wi watchItem) { - w[wi] = struct{}{} -} - -// items returns the items as a slice. -func (w watchItems) items() []watchItem { - items := make([]watchItem, 0, len(w)) - for wi, _ := range w { - items = append(items, wi) - } - return items -} - -// stateWatch holds shared state for watching updates. This is -// outside of StateStore so it can be shared with snapshots. -type stateWatch struct { - items map[watchItem]*NotifyGroup - l sync.Mutex -} - -// newStateWatch creates a new stateWatch for change notification. -func newStateWatch() *stateWatch { - return &stateWatch{ - items: make(map[watchItem]*NotifyGroup), - } -} - -// watch subscribes a channel to the given watch item. -func (w *stateWatch) watch(wi watchItem, ch chan struct{}) { - w.l.Lock() - defer w.l.Unlock() - - grp, ok := w.items[wi] - if !ok { - grp = new(NotifyGroup) - w.items[wi] = grp - } - grp.Wait(ch) -} - -// stopWatch unsubscribes a channel from the given watch item. -func (w *stateWatch) stopWatch(wi watchItem, ch chan struct{}) { - w.l.Lock() - defer w.l.Unlock() - - if grp, ok := w.items[wi]; ok { - grp.Clear(ch) - if grp.Empty() { - delete(w.items, wi) - } - } -} - -// notify is used to fire notifications on the given watch items. -func (w *stateWatch) notify(items ...watchItem) { - w.l.Lock() - defer w.l.Unlock() - - for _, wi := range items { - if grp, ok := w.items[wi]; ok { - grp.Notify() - } - } -} diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 1c4b60238..58f8093bf 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -585,7 +585,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) { } notify1 := make(chan struct{}, 1) - state.WatchAllocs(alloc.NodeID, notify1) + state.WatchAllocNode(alloc.NodeID, notify1) err = state.DeleteEval(1002, []string{eval.ID, eval2.ID}, []string{alloc.ID, alloc2.ID}) if err != nil { @@ -808,14 +808,14 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) { } } -func TestStateStore_WatchAllocs(t *testing.T) { +func TestStateStore_WatchAllocNode(t *testing.T) { state := testStateStore(t) notify1 := make(chan struct{}, 1) notify2 := make(chan struct{}, 1) - state.WatchAllocs("foo", notify1) - state.WatchAllocs("foo", notify2) - state.StopWatchAllocs("foo", notify2) + state.WatchAllocNode("foo", notify1) + state.WatchAllocNode("foo", notify2) + state.StopWatchAllocNode("foo", notify2) alloc := mock.Alloc() alloc.NodeID = "foo" diff --git a/nomad/state/watch.go b/nomad/state/watch.go new file mode 100644 index 000000000..c0e529b28 --- /dev/null +++ b/nomad/state/watch.go @@ -0,0 +1,86 @@ +package state + +import ( + "sync" +) + +// watchItem describes the scope of a watch. It is used to provide a uniform +// input for subscribe/unsubscribe and notification firing. +type watchItem struct { + allocID string + allocNode string + evalID string + jobID string + nodeID string + table string +} + +// watchItems is a helper used to construct a set of watchItems. It deduplicates +// the items as they are added using map keys. +type watchItems map[watchItem]struct{} + +// add adds an item to the watch set. +func (w watchItems) add(wi watchItem) { + w[wi] = struct{}{} +} + +// items returns the items as a slice. +func (w watchItems) items() []watchItem { + items := make([]watchItem, 0, len(w)) + for wi, _ := range w { + items = append(items, wi) + } + return items +} + +// stateWatch holds shared state for watching updates. This is +// outside of StateStore so it can be shared with snapshots. +type stateWatch struct { + items map[watchItem]*NotifyGroup + l sync.Mutex +} + +// newStateWatch creates a new stateWatch for change notification. +func newStateWatch() *stateWatch { + return &stateWatch{ + items: make(map[watchItem]*NotifyGroup), + } +} + +// watch subscribes a channel to the given watch item. +func (w *stateWatch) watch(wi watchItem, ch chan struct{}) { + w.l.Lock() + defer w.l.Unlock() + + grp, ok := w.items[wi] + if !ok { + grp = new(NotifyGroup) + w.items[wi] = grp + } + grp.Wait(ch) +} + +// stopWatch unsubscribes a channel from the given watch item. +func (w *stateWatch) stopWatch(wi watchItem, ch chan struct{}) { + w.l.Lock() + defer w.l.Unlock() + + if grp, ok := w.items[wi]; ok { + grp.Clear(ch) + if grp.Empty() { + delete(w.items, wi) + } + } +} + +// notify is used to fire notifications on the given watch items. +func (w *stateWatch) notify(items ...watchItem) { + w.l.Lock() + defer w.l.Unlock() + + for _, wi := range items { + if grp, ok := w.items[wi]; ok { + grp.Notify() + } + } +} diff --git a/nomad/state/watch_test.go b/nomad/state/watch_test.go new file mode 100644 index 000000000..5992b65ee --- /dev/null +++ b/nomad/state/watch_test.go @@ -0,0 +1,64 @@ +package state + +import ( + "testing" +) + +func TestWatchItems(t *testing.T) { + // No items returns empty slice + wi := make(watchItems) + if items := wi.items(); len(items) != 0 { + t.Fatalf("expected empty, got: %#v", items) + } + + // Adding items works + wi.add(watchItem{table: "foo"}) + wi.add(watchItem{nodeID: "bar"}) + if items := wi.items(); len(items) != 2 { + t.Fatalf("expected 2 items, got: %#v", items) + } + + // Adding duplicates auto-dedupes + wi.add(watchItem{table: "foo"}) + if items := wi.items(); len(items) != 2 { + t.Fatalf("expected 2 items, got: %#v", items) + } +} + +func TestStateWatch_watch(t *testing.T) { + watch := newStateWatch() + notify1 := make(chan struct{}, 1) + notify2 := make(chan struct{}, 1) + notify3 := make(chan struct{}, 1) + + // Notifications trigger subscribed channels + watch.watch(watchItem{table: "foo"}, notify1) + watch.watch(watchItem{table: "bar"}, notify2) + watch.watch(watchItem{table: "baz"}, notify3) + + watch.notify(watchItem{table: "foo"}, watchItem{table: "bar"}) + if len(notify1) != 1 { + t.Fatalf("should notify") + } + if len(notify2) != 1 { + t.Fatalf("should notify") + } + if len(notify3) != 0 { + t.Fatalf("should not notify") + } +} + +func TestStateWatch_stopWatch(t *testing.T) { + watch := newStateWatch() + notify := make(chan struct{}) + + // First subscribe + watch.watch(watchItem{table: "foo"}, notify) + + // Unsubscribe stop notifications + watch.stopWatch(watchItem{table: "foo"}, notify) + watch.notify(watchItem{table: "foo"}) + if len(notify) != 0 { + t.Fatalf("should not notify") + } +}