mirror of
https://github.com/kemko/nomad.git
synced 2026-01-14 22:35:42 +03:00
nomad: move state watcher into its own file, add tests
This commit is contained in:
@@ -4,7 +4,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
@@ -405,6 +404,8 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct
|
||||
func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
watch := make(watchItems)
|
||||
watch.add(watchItem{table: "evals"})
|
||||
|
||||
for _, eval := range evals {
|
||||
existing, err := txn.First("evals", "id", eval)
|
||||
@@ -427,6 +428,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
|
||||
if existing == nil {
|
||||
continue
|
||||
}
|
||||
watch.add(watchItem{allocNode: existing.(*structs.Allocation).NodeID})
|
||||
if err := txn.Delete("allocs", existing); err != nil {
|
||||
return fmt.Errorf("alloc delete failed: %v", err)
|
||||
}
|
||||
@@ -440,7 +442,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) e
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
txn.Defer(func() { s.watch.notify(watchItem{table: "evals"}) })
|
||||
txn.Defer(func() { s.watch.notify(watch.items()...) })
|
||||
txn.Commit()
|
||||
return nil
|
||||
}
|
||||
@@ -545,9 +547,12 @@ func (s *StateStore) UpdateAllocFromClient(index uint64, alloc *structs.Allocati
|
||||
func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) error {
|
||||
txn := s.db.Txn(true)
|
||||
defer txn.Abort()
|
||||
watch := make(watchItems)
|
||||
watch.add(watchItem{table: "allocs"})
|
||||
|
||||
// Handle the allocations
|
||||
for _, alloc := range allocs {
|
||||
watch.add(watchItem{allocNode: alloc.NodeID})
|
||||
existing, err := txn.First("allocs", "id", alloc.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("alloc lookup failed: %v", err)
|
||||
@@ -573,7 +578,7 @@ func (s *StateStore) UpsertAllocs(index uint64, allocs []*structs.Allocation) er
|
||||
return fmt.Errorf("index update failed: %v", err)
|
||||
}
|
||||
|
||||
txn.Defer(func() { s.watch.notify(watchItem{table: "allocs"}) })
|
||||
txn.Defer(func() { s.watch.notify(watch.items()...) })
|
||||
txn.Commit()
|
||||
return nil
|
||||
}
|
||||
@@ -764,84 +769,3 @@ func (r *StateRestore) IndexRestore(idx *IndexEntry) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// watchItem describes the scope of a watch. It is used to provide a uniform
|
||||
// input for subscribe/unsubscribe and notification firing.
|
||||
type watchItem struct {
|
||||
allocID string
|
||||
allocNode string
|
||||
evalID string
|
||||
jobID string
|
||||
nodeID string
|
||||
table string
|
||||
}
|
||||
|
||||
// watchItems is a helper used to construct a set of watchItems. It deduplicates
|
||||
// the items as they are added using map keys.
|
||||
type watchItems map[watchItem]struct{}
|
||||
|
||||
// add adds an item to the watch set.
|
||||
func (w watchItems) add(wi watchItem) {
|
||||
w[wi] = struct{}{}
|
||||
}
|
||||
|
||||
// items returns the items as a slice.
|
||||
func (w watchItems) items() []watchItem {
|
||||
items := make([]watchItem, 0, len(w))
|
||||
for wi, _ := range w {
|
||||
items = append(items, wi)
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
// stateWatch holds shared state for watching updates. This is
|
||||
// outside of StateStore so it can be shared with snapshots.
|
||||
type stateWatch struct {
|
||||
items map[watchItem]*NotifyGroup
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
// newStateWatch creates a new stateWatch for change notification.
|
||||
func newStateWatch() *stateWatch {
|
||||
return &stateWatch{
|
||||
items: make(map[watchItem]*NotifyGroup),
|
||||
}
|
||||
}
|
||||
|
||||
// watch subscribes a channel to the given watch item.
|
||||
func (w *stateWatch) watch(wi watchItem, ch chan struct{}) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
grp, ok := w.items[wi]
|
||||
if !ok {
|
||||
grp = new(NotifyGroup)
|
||||
w.items[wi] = grp
|
||||
}
|
||||
grp.Wait(ch)
|
||||
}
|
||||
|
||||
// stopWatch unsubscribes a channel from the given watch item.
|
||||
func (w *stateWatch) stopWatch(wi watchItem, ch chan struct{}) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
if grp, ok := w.items[wi]; ok {
|
||||
grp.Clear(ch)
|
||||
if grp.Empty() {
|
||||
delete(w.items, wi)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// notify is used to fire notifications on the given watch items.
|
||||
func (w *stateWatch) notify(items ...watchItem) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
for _, wi := range items {
|
||||
if grp, ok := w.items[wi]; ok {
|
||||
grp.Notify()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -585,7 +585,7 @@ func TestStateStore_DeleteEval_Eval(t *testing.T) {
|
||||
}
|
||||
|
||||
notify1 := make(chan struct{}, 1)
|
||||
state.WatchAllocs(alloc.NodeID, notify1)
|
||||
state.WatchAllocNode(alloc.NodeID, notify1)
|
||||
|
||||
err = state.DeleteEval(1002, []string{eval.ID, eval2.ID}, []string{alloc.ID, alloc2.ID})
|
||||
if err != nil {
|
||||
@@ -808,14 +808,14 @@ func TestStateStore_UpsertAlloc_Alloc(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_WatchAllocs(t *testing.T) {
|
||||
func TestStateStore_WatchAllocNode(t *testing.T) {
|
||||
state := testStateStore(t)
|
||||
|
||||
notify1 := make(chan struct{}, 1)
|
||||
notify2 := make(chan struct{}, 1)
|
||||
state.WatchAllocs("foo", notify1)
|
||||
state.WatchAllocs("foo", notify2)
|
||||
state.StopWatchAllocs("foo", notify2)
|
||||
state.WatchAllocNode("foo", notify1)
|
||||
state.WatchAllocNode("foo", notify2)
|
||||
state.StopWatchAllocNode("foo", notify2)
|
||||
|
||||
alloc := mock.Alloc()
|
||||
alloc.NodeID = "foo"
|
||||
|
||||
86
nomad/state/watch.go
Normal file
86
nomad/state/watch.go
Normal file
@@ -0,0 +1,86 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
// watchItem describes the scope of a watch. It is used to provide a uniform
|
||||
// input for subscribe/unsubscribe and notification firing.
|
||||
type watchItem struct {
|
||||
allocID string
|
||||
allocNode string
|
||||
evalID string
|
||||
jobID string
|
||||
nodeID string
|
||||
table string
|
||||
}
|
||||
|
||||
// watchItems is a helper used to construct a set of watchItems. It deduplicates
|
||||
// the items as they are added using map keys.
|
||||
type watchItems map[watchItem]struct{}
|
||||
|
||||
// add adds an item to the watch set.
|
||||
func (w watchItems) add(wi watchItem) {
|
||||
w[wi] = struct{}{}
|
||||
}
|
||||
|
||||
// items returns the items as a slice.
|
||||
func (w watchItems) items() []watchItem {
|
||||
items := make([]watchItem, 0, len(w))
|
||||
for wi, _ := range w {
|
||||
items = append(items, wi)
|
||||
}
|
||||
return items
|
||||
}
|
||||
|
||||
// stateWatch holds shared state for watching updates. This is
|
||||
// outside of StateStore so it can be shared with snapshots.
|
||||
type stateWatch struct {
|
||||
items map[watchItem]*NotifyGroup
|
||||
l sync.Mutex
|
||||
}
|
||||
|
||||
// newStateWatch creates a new stateWatch for change notification.
|
||||
func newStateWatch() *stateWatch {
|
||||
return &stateWatch{
|
||||
items: make(map[watchItem]*NotifyGroup),
|
||||
}
|
||||
}
|
||||
|
||||
// watch subscribes a channel to the given watch item.
|
||||
func (w *stateWatch) watch(wi watchItem, ch chan struct{}) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
grp, ok := w.items[wi]
|
||||
if !ok {
|
||||
grp = new(NotifyGroup)
|
||||
w.items[wi] = grp
|
||||
}
|
||||
grp.Wait(ch)
|
||||
}
|
||||
|
||||
// stopWatch unsubscribes a channel from the given watch item.
|
||||
func (w *stateWatch) stopWatch(wi watchItem, ch chan struct{}) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
if grp, ok := w.items[wi]; ok {
|
||||
grp.Clear(ch)
|
||||
if grp.Empty() {
|
||||
delete(w.items, wi)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// notify is used to fire notifications on the given watch items.
|
||||
func (w *stateWatch) notify(items ...watchItem) {
|
||||
w.l.Lock()
|
||||
defer w.l.Unlock()
|
||||
|
||||
for _, wi := range items {
|
||||
if grp, ok := w.items[wi]; ok {
|
||||
grp.Notify()
|
||||
}
|
||||
}
|
||||
}
|
||||
64
nomad/state/watch_test.go
Normal file
64
nomad/state/watch_test.go
Normal file
@@ -0,0 +1,64 @@
|
||||
package state
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestWatchItems(t *testing.T) {
|
||||
// No items returns empty slice
|
||||
wi := make(watchItems)
|
||||
if items := wi.items(); len(items) != 0 {
|
||||
t.Fatalf("expected empty, got: %#v", items)
|
||||
}
|
||||
|
||||
// Adding items works
|
||||
wi.add(watchItem{table: "foo"})
|
||||
wi.add(watchItem{nodeID: "bar"})
|
||||
if items := wi.items(); len(items) != 2 {
|
||||
t.Fatalf("expected 2 items, got: %#v", items)
|
||||
}
|
||||
|
||||
// Adding duplicates auto-dedupes
|
||||
wi.add(watchItem{table: "foo"})
|
||||
if items := wi.items(); len(items) != 2 {
|
||||
t.Fatalf("expected 2 items, got: %#v", items)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateWatch_watch(t *testing.T) {
|
||||
watch := newStateWatch()
|
||||
notify1 := make(chan struct{}, 1)
|
||||
notify2 := make(chan struct{}, 1)
|
||||
notify3 := make(chan struct{}, 1)
|
||||
|
||||
// Notifications trigger subscribed channels
|
||||
watch.watch(watchItem{table: "foo"}, notify1)
|
||||
watch.watch(watchItem{table: "bar"}, notify2)
|
||||
watch.watch(watchItem{table: "baz"}, notify3)
|
||||
|
||||
watch.notify(watchItem{table: "foo"}, watchItem{table: "bar"})
|
||||
if len(notify1) != 1 {
|
||||
t.Fatalf("should notify")
|
||||
}
|
||||
if len(notify2) != 1 {
|
||||
t.Fatalf("should notify")
|
||||
}
|
||||
if len(notify3) != 0 {
|
||||
t.Fatalf("should not notify")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateWatch_stopWatch(t *testing.T) {
|
||||
watch := newStateWatch()
|
||||
notify := make(chan struct{})
|
||||
|
||||
// First subscribe
|
||||
watch.watch(watchItem{table: "foo"}, notify)
|
||||
|
||||
// Unsubscribe stop notifications
|
||||
watch.stopWatch(watchItem{table: "foo"}, notify)
|
||||
watch.notify(watchItem{table: "foo"})
|
||||
if len(notify) != 0 {
|
||||
t.Fatalf("should not notify")
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user