nomad/state: adding watching mechanisms

Armon Dadgar
2015-08-22 18:57:15 -07:00
parent 9b4f3a4c8f
commit 2bc6c8a37c
4 changed files with 251 additions and 3 deletions

nomad/state/notify.go Normal file

@@ -0,0 +1,62 @@
package state
import (
"sync"
)
// NotifyGroup is used to allow a simple notification mechanism.
// Channels can be marked as waiting, and when notify is invoked,
// all the waiting channels get a message and are cleared from the
// notify list.
type NotifyGroup struct {
l sync.Mutex
notify map[chan struct{}]struct{}
}
// Notify will do a non-blocking send to all waiting channels, and
// clear the notify list
func (n *NotifyGroup) Notify() {
n.l.Lock()
defer n.l.Unlock()
for ch := range n.notify {
select {
case ch <- struct{}{}:
default:
}
}
n.notify = nil
}
// Wait adds a channel to the notify group
func (n *NotifyGroup) Wait(ch chan struct{}) {
n.l.Lock()
defer n.l.Unlock()
if n.notify == nil {
n.notify = make(map[chan struct{}]struct{})
}
n.notify[ch] = struct{}{}
}
// Clear removes a channel from the notify group
func (n *NotifyGroup) Clear(ch chan struct{}) {
n.l.Lock()
defer n.l.Unlock()
if n.notify == nil {
return
}
delete(n.notify, ch)
}
// WaitCh allocates a channel that is subscribed to notifications
func (n *NotifyGroup) WaitCh() chan struct{} {
ch := make(chan struct{}, 1)
n.Wait(ch)
return ch
}
// Empty checks if there are no channels to notify
func (n *NotifyGroup) Empty() bool {
n.l.Lock()
defer n.l.Unlock()
return len(n.notify) == 0
}
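For orientation, a minimal usage sketch of NotifyGroup, not part of this commit (the import path is taken from this diff; the example names are illustrative):

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/state"
)

func main() {
	grp := &state.NotifyGroup{}

	// WaitCh hands back a buffered channel, so Notify's
	// non-blocking send is never dropped for a fresh waiter.
	ch := grp.WaitCh()

	// Notify wakes every waiting channel and clears the set,
	// so each registration yields at most one wake-up.
	grp.Notify()

	<-ch
	fmt.Println("notified")
}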

nomad/state/notify_test.go Normal file

@@ -0,0 +1,72 @@
package state
import (
"testing"
)
func TestNotifyGroup(t *testing.T) {
grp := &NotifyGroup{}
ch1 := grp.WaitCh()
ch2 := grp.WaitCh()
select {
case <-ch1:
t.Fatalf("should block")
default:
}
select {
case <-ch2:
t.Fatalf("should block")
default:
}
grp.Notify()
select {
case <-ch1:
default:
t.Fatalf("should not block")
}
select {
case <-ch2:
default:
t.Fatalf("should not block")
}
// Should be unregistered
ch3 := grp.WaitCh()
grp.Notify()
select {
case <-ch1:
t.Fatalf("should block")
default:
}
select {
case <-ch2:
t.Fatalf("should block")
default:
}
select {
case <-ch3:
default:
t.Fatalf("should not block")
}
}
func TestNotifyGroup_Clear(t *testing.T) {
grp := &NotifyGroup{}
ch1 := grp.WaitCh()
grp.Clear(ch1)
grp.Notify()
// Should not get anything
select {
case <-ch1:
t.Fatalf("should not get message")
default:
}
}

nomad/state/state_store.go

@@ -4,6 +4,7 @@ import (
"fmt"
"io"
"log"
"sync"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/structs"
@@ -19,6 +20,7 @@ import (
type StateStore struct {
logger *log.Logger
db *memdb.MemDB
watch *stateWatch
}
// StateSnapshot is used to provide a point-in-time snapshot
@@ -30,7 +32,9 @@ type StateSnapshot struct {
// restoring state by only using a single large transaction
// instead of thousands of sub transactions
type StateRestore struct {
txn *memdb.Txn
watch *stateWatch
allocNodes map[string]struct{}
}
// Abort is used to abort the restore operation
@@ -40,6 +44,7 @@ func (s *StateRestore) Abort() {
// Commit is used to commit the restore operation
func (s *StateRestore) Commit() {
s.txn.Defer(func() { s.watch.notifyAllocs(s.allocNodes) })
s.txn.Commit()
}
@@ -50,6 +55,13 @@ type IndexEntry struct {
Value uint64
}
// stateWatch holds shared state for watching updates. This is
// outside of StateStore so it can be shared with snapshots.
type stateWatch struct {
allocs map[string]*NotifyGroup
allocLock sync.Mutex
}
// NewStateStore is used to create a new state store
func NewStateStore(logOutput io.Writer) (*StateStore, error) {
// Create the MemDB
@@ -58,10 +70,16 @@ func NewStateStore(logOutput io.Writer) (*StateStore, error) {
return nil, fmt.Errorf("state store setup failed: %v", err)
}
// Create the watch entry
watch := &stateWatch{
allocs: make(map[string]*NotifyGroup),
}
// Create the state store
s := &StateStore{
logger: log.New(logOutput, "", log.LstdFlags),
db: db,
watch: watch,
}
return s, nil
}
@@ -74,6 +92,7 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
StateStore: StateStore{
logger: s.logger,
db: s.db.Snapshot(),
watch: s.watch,
},
}
return snap, nil
@@ -84,7 +103,56 @@ func (s *StateStore) Snapshot() (*StateSnapshot, error) {
// overhead.
func (s *StateStore) Restore() (*StateRestore, error) {
txn := s.db.Txn(true)
r := &StateRestore{
txn: txn,
watch: s.watch,
allocNodes: make(map[string]struct{}),
}
return r, nil
}
// WatchAllocs is used to subscribe a channel to changes in allocations for a node
func (s *StateStore) WatchAllocs(node string, notify chan struct{}) {
s.watch.allocLock.Lock()
defer s.watch.allocLock.Unlock()
// Check for an existing notify group
if grp, ok := s.watch.allocs[node]; ok {
grp.Wait(notify)
return
}
// Create new notify group
grp := &NotifyGroup{}
grp.Wait(notify)
s.watch.allocs[node] = grp
}
// StopWatchAllocs is used to unsubscribe a channel from changes in allocations
func (s *StateStore) StopWatchAllocs(node string, notify chan struct{}) {
s.watch.allocLock.Lock()
defer s.watch.allocLock.Unlock()
// Check for an existing notify group
if grp, ok := s.watch.allocs[node]; ok {
grp.Clear(notify)
if grp.Empty() {
delete(s.watch.allocs, node)
}
}
}
// notifyAllocs is used to notify any node alloc listeners of a change
func (w *stateWatch) notifyAllocs(nodes map[string]struct{}) {
w.allocLock.Lock()
defer w.allocLock.Unlock()
for node := range nodes {
if grp, ok := w.allocs[node]; ok {
grp.Notify()
delete(w.allocs, node)
}
}
}
// RegisterNode is used to register a node or update a node definition
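Taken together, a watch here is effectively one-shot: Notify clears the NotifyGroup's waiter set, and notifyAllocs then drops the node's group entirely, so a long-lived watcher must re-subscribe after every wake-up. A hedged consumer sketch, not part of this commit (the helper name and loop discipline are hypothetical):

import "github.com/hashicorp/nomad/nomad/state"

// waitForAllocChange blocks until some allocation for node changes.
// One-shot by design: Notify clears the waiter set and notifyAllocs
// drops the node's group, so callers loop and call this repeatedly.
func waitForAllocChange(store *state.StateStore, node string) {
	notify := make(chan struct{}, 1) // buffered: Notify's send is non-blocking
	store.WatchAllocs(node, notify)
	defer store.StopWatchAllocs(node, notify) // no-op once the group has fired
	<-notify
}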
@@ -339,6 +407,7 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error {
func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
txn := s.db.Txn(true)
defer txn.Abort()
nodes := make(map[string]struct{})
for _, eval := range evals {
existing, err := txn.First("evals", "id", eval)
@@ -361,6 +430,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
if existing == nil {
continue
}
nodes[existing.(*structs.Allocation).NodeID] = struct{}{}
if err := txn.Delete("allocs", existing); err != nil {
return fmt.Errorf("alloc delete failed: %v", err)
}
@@ -373,7 +443,7 @@ func (s *StateStore) DeleteEval(index uint64, evals []string, allocs []string) error {
if err := txn.Insert("index", &IndexEntry{"allocs", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Commit()
return nil
}
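Note the pattern used in every write path of this commit: touched node IDs are collected during the transaction, and the notification is pushed through txn.Defer so it runs only if the transaction commits (go-memdb invokes deferred functions at Commit time, never on Abort). A condensed sketch of that shape, with the mutation elided:

txn := s.db.Txn(true)
defer txn.Abort() // no-op once Commit has run

nodes := make(map[string]struct{})
// ... mutate allocs, adding each touched NodeID to nodes ...

txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Commit() // fires the deferred notify exactly once, post-commit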
@@ -411,6 +481,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string,
allocs []*structs.Allocation) error {
txn := s.db.Txn(true)
defer txn.Abort()
nodes := make(map[string]struct{})
// Handle evictions first
for _, evict := range evicts {
@@ -425,6 +496,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string,
*newAlloc = *existing.(*structs.Allocation)
newAlloc.Status = structs.AllocStatusEvict
newAlloc.StatusDescription = ""
nodes[newAlloc.NodeID] = struct{}{}
if err := txn.Insert("allocs", newAlloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
@@ -444,6 +516,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string,
alloc.CreateIndex = existing.(*structs.Allocation).CreateIndex
alloc.ModifyIndex = index
}
nodes[alloc.NodeID] = struct{}{}
if err := txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}
@@ -454,6 +527,7 @@ func (s *StateStore) UpdateAllocations(index uint64, evicts []string,
return fmt.Errorf("index update failed: %v", err)
}
txn.Defer(func() { s.watch.notifyAllocs(nodes) })
txn.Commit()
return nil
}
@@ -601,6 +675,7 @@ func (r *StateRestore) EvalRestore(eval *structs.Evaluation) error {
// AllocRestore is used to restore an allocation
func (r *StateRestore) AllocRestore(alloc *structs.Allocation) error {
r.allocNodes[alloc.NodeID] = struct{}{}
if err := r.txn.Insert("allocs", alloc); err != nil {
return fmt.Errorf("alloc insert failed: %v", err)
}

nomad/state/state_store_test.go

@@ -483,6 +483,9 @@ func TestStateStore_DeleteEval_GetEval(t *testing.T) {
t.Fatalf("err: %v", err)
}
notify1 := make(chan struct{}, 1)
state.WatchAllocs(alloc.NodeID, notify1)
err = state.DeleteEval(1002, []string{eval.ID, eval2.ID}, []string{alloc.ID, alloc2.ID})
if err != nil {
t.Fatalf("err: %v", err)
@@ -539,6 +542,12 @@ func TestStateStore_DeleteEval_GetEval(t *testing.T) {
if index != 1002 {
t.Fatalf("bad: %d", index)
}
select {
case <-notify1:
default:
t.Fatalf("should be notified")
}
}
func TestStateStore_Evals(t *testing.T) {
@@ -631,6 +640,36 @@ func TestStateStore_UpsertAlloc_GetAlloc(t *testing.T) {
}
}
func TestStateStore_WatchAllocs(t *testing.T) {
state := testStateStore(t)
notify1 := make(chan struct{}, 1)
notify2 := make(chan struct{}, 1)
state.WatchAllocs("foo", notify1)
state.WatchAllocs("foo", notify2)
state.StopWatchAllocs("foo", notify2)
alloc := mock.Alloc()
alloc.NodeID = "foo"
err := state.UpdateAllocations(1000, nil,
[]*structs.Allocation{alloc})
if err != nil {
t.Fatalf("err: %v", err)
}
select {
case <-notify1:
default:
t.Fatalf("should be notified")
}
select {
case <-notify2:
t.Fatalf("should not be notified")
default:
}
}
func TestStateStore_UpdateAlloc_GetAlloc(t *testing.T) {
state := testStateStore(t)
alloc := mock.Alloc()