From e7e2c799c8cedca1b654e547c806ed664ad15949 Mon Sep 17 00:00:00 2001
From: Drew Bailey <2614075+drewbailey@users.noreply.github.com>
Date: Mon, 31 Aug 2020 13:19:28 -0400
Subject: [PATCH] Event Buffer Implementation

adds an event buffer to hold events from raft changes.

update events to use event buffer

fix append call

provide way to prune buffer items after TTL

event publisher tests

basic publish test

wire up max item ttl

rename package to stream, cleanup exploratory work

subscription filtering

subscription plumbing

allow subscribers to consume events, handle closing subscriptions

back out old exploratory ctx work

fix lint

remove unused ctx bits

add a few comments

fix test

stop publisher on abandon
---
 nomad/event/event.go                 |  13 --
 nomad/state/state_changes.go         |  20 +-
 nomad/state/state_store.go           |  21 +-
 nomad/stream/event.go                |  14 ++
 nomad/stream/event_buffer.go         | 306 +++++++++++++++++++++++++++
 nomad/stream/event_buffer_test.go    | 201 ++++++++++++++++++
 nomad/stream/event_publisher.go      | 195 +++++++++++++++++
 nomad/stream/event_publisher_test.go | 134 ++++++++++++
 nomad/stream/subscription.go         | 136 ++++++++++++
 nomad/stream/subscription_test.go    |  76 +++++++
 10 files changed, 1088 insertions(+), 28 deletions(-)
 delete mode 100644 nomad/event/event.go
 create mode 100644 nomad/stream/event.go
 create mode 100644 nomad/stream/event_buffer.go
 create mode 100644 nomad/stream/event_buffer_test.go
 create mode 100644 nomad/stream/event_publisher.go
 create mode 100644 nomad/stream/event_publisher_test.go
 create mode 100644 nomad/stream/subscription.go
 create mode 100644 nomad/stream/subscription_test.go

diff --git a/nomad/event/event.go b/nomad/event/event.go
deleted file mode 100644
index 49e809ac8..000000000
--- a/nomad/event/event.go
+++ /dev/null
@@ -1,13 +0,0 @@
-package event
-
-type Event struct {
-	Topic   string
-	Key     string
-	Index   uint64
-	Payload interface{}
-}
-
-type EventPublisher struct{}
-
-func NewPublisher() *EventPublisher { return &EventPublisher{} }
-func (e EventPublisher) Publish(events []Event) {}
diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go
index 22e32f31b..a71056e0f 100644
--- a/nomad/state/state_changes.go
+++ b/nomad/state/state_changes.go
@@ -4,7 +4,7 @@ import (
 	"fmt"
 
 	"github.com/hashicorp/go-memdb"
-	"github.com/hashicorp/nomad/nomad/event"
+	"github.com/hashicorp/nomad/nomad/stream"
 )
 
 // ReadTxn is implemented by memdb.Txn to perform read operations.
@@ -29,28 +29,28 @@ type Changes struct {
 type changeTrackerDB struct {
 	db             *memdb.MemDB
 	publisher      eventPublisher
-	processChanges func(ReadTxn, Changes) ([]event.Event, error)
+	processChanges func(ReadTxn, Changes) ([]stream.Event, error)
 }
 
 func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB {
 	return &changeTrackerDB{
 		db:             db,
-		publisher:      event.NewPublisher(),
+		publisher:      publisher,
 		processChanges: changesFn,
 	}
 }
 
-type changeProcessor func(ReadTxn, Changes) ([]event.Event, error)
+type changeProcessor func(ReadTxn, Changes) ([]stream.Event, error)
 
 type eventPublisher interface {
-	Publish(events []event.Event)
+	Publish(index uint64, events []stream.Event)
 }
 
 // noOpPublisher satisfies the eventPublisher interface and does nothing
 type noOpPublisher struct{}
 
-func (n *noOpPublisher) Publish(events []event.Event)            {}
-func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil }
+func (n *noOpPublisher) Publish(index uint64, events []stream.Event) {}
+func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error)    { return []stream.Event{}, nil }
 
 // ReadTxn returns a read-only transaction which behaves exactly the same as
 // memdb.Txn
@@ -89,7 +89,7 @@ func (c *changeTrackerDB) publish(changes Changes) error {
 	if err != nil {
 		return fmt.Errorf("failed generating events from changes: %v", err)
 	}
-	c.publisher.Publish(events)
+	c.publisher.Publish(changes.Index, events)
 	return nil
 }
 
@@ -146,7 +146,7 @@ func (tx *txn) Commit() error {
 	return nil
 }
 
-func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) {
+func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
 	// TODO: add handlers here.
-	return []event.Event{}, nil
+	return []stream.Event{}, nil
 }
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index 6bb71382d..d15913cd2 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -13,7 +13,7 @@ import (
 	"github.com/pkg/errors"
 
 	"github.com/hashicorp/nomad/helper"
-	"github.com/hashicorp/nomad/nomad/event"
+	"github.com/hashicorp/nomad/nomad/stream"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -64,6 +64,10 @@ type StateStore struct {
 	// abandonCh is used to signal watchers that this state store has been
 	// abandoned (usually during a restore). This is only ever closed.
 	abandonCh chan struct{}
+
+	// TODO: refactor abandonCh to use a context so that both can use the same
+	// cancel mechanism.
+	stopEventPublisher func()
 }
 
 // NewStateStore is used to create a new state store
@@ -75,12 +79,18 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
 	}
 
 	// Create the state store
+	ctx, cancel := context.WithCancel(context.TODO())
 	s := &StateStore{
-		logger:    config.Logger.Named("state_store"),
-		config:    config,
-		abandonCh: make(chan struct{}),
+		logger:             config.Logger.Named("state_store"),
+		config:             config,
+		abandonCh:          make(chan struct{}),
+		stopEventPublisher: cancel,
 	}
-	s.db = NewChangeTrackerDB(db, event.NewPublisher(), processDBChanges)
+	publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
+		EventBufferTTL:  1 * time.Hour,
+		EventBufferSize: 250,
+	})
+	s.db = NewChangeTrackerDB(db, publisher, processDBChanges)
 
 	// Initialize the state store with required enterprise objects
 	if err := s.enterpriseInit(); err != nil {
@@ -189,6 +199,7 @@ func (s *StateStore) AbandonCh() <-chan struct{} {
 // Abandon is used to signal that the given state store has been abandoned.
 // Calling this more than one time will panic.
 func (s *StateStore) Abandon() {
+	s.stopEventPublisher()
 	close(s.abandonCh)
 }
 
diff --git a/nomad/stream/event.go b/nomad/stream/event.go
new file mode 100644
index 000000000..2625dede2
--- /dev/null
+++ b/nomad/stream/event.go
@@ -0,0 +1,14 @@
+package stream
+
+const (
+	AllKeys = "*"
+)
+
+type Topic string
+
+type Event struct {
+	Topic   Topic
+	Key     string
+	Index   uint64
+	Payload interface{}
+}
diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go
new file mode 100644
index 000000000..127ca5b9f
--- /dev/null
+++ b/nomad/stream/event_buffer.go
@@ -0,0 +1,306 @@
+package stream
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync/atomic"
+	"time"
+)
+
+// eventBuffer is a single-writer, multiple-reader, fixed length concurrent
+// buffer of events that have been published. The buffer is
+// the head and tail of an atomically updated single-linked list. Atomic
+// accesses are usually to be suspected as premature optimization but this
+// specific design has several important features that significantly simplify a
+// lot of our PubSub machinery.
+//
+// eventBuffer is an adaptation of Consul's agent/stream/event eventBuffer but
+// has been updated to be a max length buffer to work for Nomad's use case.
+//
+// The eventBuffer only tracks the most recent set of published events,
+// up to the max configured size; older events are dropped from the buffer
+// but will only be garbage collected once the slowest reader drops the item.
+// Consumers are notified of new events by closing a channel on the previous head,
+// allowing efficient broadcast to many watchers without having to run multiple
+// goroutines or deliver to O(N) separate channels.
+//
+// Because eventBuffer is a linked list with atomically updated pointers, readers don't
+// have to take a lock and can consume at their own pace. Slow readers that fall
+// behind far enough for their next item to be dropped from the buffer receive an
+// error and must start over from the current head.
+//
+// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
+// Events array. This enables subscribers to start watching for the next update
+// immediately.
+//
+// The zero value eventBuffer is _not_ usable, as it has not been
+// initialized with an empty bufferItem so can not be used to wait for the first
+// published event. Call newEventBuffer to construct a new buffer.
+//
+// Calls to Append or AppendBuffer that mutate the head must be externally
+// synchronized. This allows systems that already serialize writes to append
+// without lock overhead (e.g. a snapshot goroutine appending thousands of
+// events).
+type eventBuffer struct {
+	size *int64
+
+	head atomic.Value
+	tail atomic.Value
+
+	maxSize    int64
+	maxItemTTL time.Duration
+}
+
+// newEventBuffer creates an eventBuffer ready for use.
+func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
+	zero := int64(0)
+	b := &eventBuffer{
+		maxSize:    size,
+		size:       &zero,
+		maxItemTTL: maxItemTTL,
+	}
+
+	item := newBufferItem(0, nil)
+
+	b.head.Store(item)
+	b.tail.Store(item)
+
+	return b
+}
+
+// Append a set of events from one raft operation to the buffer and notify
+// watchers. After calling Append, the caller must not make any further
+// mutations to the events as they may have been exposed to subscribers in other
+// goroutines. Append only supports a single concurrent caller and must be
+// externally synchronized with other Append, AppendBuffer or AppendErr calls.
+func (b *eventBuffer) Append(index uint64, events []Event) {
+	b.appendItem(newBufferItem(index, events))
+}
+
+func (b *eventBuffer) appendItem(item *bufferItem) {
+	// Store the next item to the old tail
+	oldTail := b.Tail()
+	oldTail.link.next.Store(item)
+
+	// Update the tail to the new item
+	b.tail.Store(item)
+
+	// Increment the buffer size
+	size := atomic.AddInt64(b.size, 1)
+
+	// Check if we need to advance the head to keep the list
+	// constrained to max size
+	if size > b.maxSize {
+		b.advanceHead()
+	}
+
+	// notify waiters next event is available
+	close(oldTail.link.ch)
+
+}
+
+// advanceHead drops the current Head buffer item and notifies readers
+// that the item should be discarded by closing droppedCh.
+// Slow readers will prevent the old head from being GC'd until they
+// discard it.
+func (b *eventBuffer) advanceHead() {
+	old := b.Head()
+	next := old.link.next.Load()
+
+	close(old.link.droppedCh)
+	b.head.Store(next)
+	atomic.AddInt64(b.size, -1)
+
+}
+
+// Head returns the current head of the buffer. It will always exist but it may
+// be a "sentinel" empty item with a nil Events slice to allow consumers to
+// watch for the next update. Consumers should always check for empty Events and
+// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
+// with newEventBuffer
+func (b *eventBuffer) Head() *bufferItem {
+	return b.head.Load().(*bufferItem)
+}
+
+// Tail returns the current tail of the buffer. It will always exist but it may
+// be a "sentinel" empty item with a nil Events slice to allow consumers to
+// watch for the next update. Consumers should always check for empty Events and
+// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
+// with newEventBuffer
+func (b *eventBuffer) Tail() *bufferItem {
+	return b.tail.Load().(*bufferItem)
+}
+
+// StartAtClosest returns the closest bufferItem to a requested starting
+// index as well as the offset between the requested index and returned one.
+func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) {
+	item := b.Head()
+	if index < item.Index {
+		return item, int(item.Index) - int(index)
+	}
+	if item.Index == index {
+		return item, 0
+	}
+
+	for {
+		prev := item
+		item = item.NextNoBlock()
+		if item == nil {
+			return prev, int(index) - int(prev.Index)
+		}
+		if index < item.Index {
+			return item, int(item.Index) - int(index)
+		}
+		if index == item.Index {
+			return item, 0
+		}
+	}
+}
+
+// Len returns the current length of the buffer
+func (b *eventBuffer) Len() int {
+	return int(atomic.LoadInt64(b.size))
+}
+
+// prune advances the head of the buffer until the head buffer item TTL
+// is no longer expired. It should be externally synchronized as it mutates
+// the buffer of items.
+func (b *eventBuffer) prune() {
+	for {
+		head := b.Head()
+		if b.Len() == 0 {
+			return
+		}
+
+		if time.Since(head.createdAt) > b.maxItemTTL {
+			b.advanceHead()
+		} else {
+			return
+		}
+	}
+}
+
+// bufferItem represents a set of events published by a single raft operation.
+// The first item returned by a newly constructed buffer will have nil Events.
+// It is a sentinel value which is used to wait on the next events via Next.
+//
+// To iterate to the next event, a Next method may be called which may block if
+// there is no next element yet.
+//
+// Holding a pointer to the item keeps all the events published since in memory
+// so it's important that subscribers don't hold pointers to buffer items after
+// they have been delivered except where it's intentional to maintain a cache or
+// trailing store of events for performance reasons.
+//
+// Subscribers must not mutate the bufferItem or the Events or Encoded payloads
+// inside as these are shared between all readers.
+type bufferItem struct {
+	// Events is the set of events published at one raft index. This may be nil as
+	// a sentinel value to allow watching for the first event in a buffer. Callers
+	// should check and skip nil Events at any point in the buffer. It will also
+	// be nil if the producer appends an Error event because they can't complete
+	// the request to populate the buffer. Err will be non-nil in this case.
+	Events []Event
+
+	Index uint64
+
+	// Err is non-nil if the producer can't complete their task and terminates the
+	// buffer. Subscribers should return the error to clients and cease attempting
+	// to read from the buffer.
+	Err error
+
+	// link holds the next pointer and channel. This extra bit of indirection
+	// allows us to splice buffers together at arbitrary points without including
+	// events in one buffer just for the side-effect of watching for the next set.
+	// The link may not be mutated once the event is appended to a buffer.
+	link *bufferLink
+
+	createdAt time.Time
+}
+
+type bufferLink struct {
+	// next is an atomically updated pointer to the next event in the buffer. It
+	// is written exactly once by the single publisher and will always be set if
+	// ch is closed.
+	next atomic.Value
+
+	// ch is closed when the next event is published. It should never be mutated
+	// (e.g. set to nil) as that is racy, but is closed once when the next event
+	// is published. The next pointer will have been set by the time this is
+	// closed.
+	ch chan struct{}
+
+	// droppedCh is closed when the event is dropped from the buffer due to
+	// sizing constraints.
+	droppedCh chan struct{}
+}
+
+// newBufferItem returns a blank buffer item with a link and chan ready to have
+// the fields set and be appended to a buffer.
+func newBufferItem(index uint64, events []Event) *bufferItem {
+	return &bufferItem{
+		link: &bufferLink{
+			ch:        make(chan struct{}),
+			droppedCh: make(chan struct{}),
+		},
+		Events:    events,
+		Index:     index,
+		createdAt: time.Now(),
+	}
+}
+
+// Next returns the next buffer item in the buffer. It may block until ctx is
+// cancelled or until the next item is published.
+func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
+	// See if there is already a next value; if not, block until one is published.
+	// Note we don't rely on state change (chan nil) as that's not threadsafe but
+	// detecting close is.
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case <-forceClose:
+		return nil, fmt.Errorf("subscription closed")
+	case <-i.link.ch:
+	}
+
+	// Check if the reader is too slow and the event buffer has discarded the event
+	select {
+	case <-i.link.droppedCh:
+		return nil, fmt.Errorf("event dropped from buffer")
+	default:
+	}
+
+	// If channel closed, there must be a next item to read
+	nextRaw := i.link.next.Load()
+	if nextRaw == nil {
+		// shouldn't be possible
+		return nil, errors.New("invalid next item")
+	}
+	next := nextRaw.(*bufferItem)
+	if next.Err != nil {
+		return nil, next.Err
+	}
+	return next, nil
+}
+
+// NextNoBlock returns the next item in the buffer without blocking.
If it +// reaches the most recent item it will return nil. +func (i *bufferItem) NextNoBlock() *bufferItem { + nextRaw := i.link.next.Load() + if nextRaw == nil { + return nil + } + return nextRaw.(*bufferItem) +} + +// NextLink returns either the next item in the buffer if there is one, or +// an empty item (that will be ignored by subscribers) that has a pointer to +// the same link as this bufferItem (but none of the bufferItem content). +// When the link.ch is closed, subscriptions will be notified of the next item. +func (i *bufferItem) NextLink() *bufferItem { + next := i.NextNoBlock() + if next == nil { + // Return an empty item that can be followed to the next item published. + return &bufferItem{link: i.link} + } + return next +} diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go new file mode 100644 index 000000000..15a96a4e0 --- /dev/null +++ b/nomad/stream/event_buffer_test.go @@ -0,0 +1,201 @@ +package stream + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventBufferFuzz(t *testing.T) { + nReaders := 1000 + nMessages := 1000 + + b := newEventBuffer(1000, DefaultTTL) + + // Start a write goroutine that will publish 10000 messages with sequential + // indexes and some jitter in timing (to allow clients to "catch up" and block + // waiting for updates). + go func() { + seed := time.Now().UnixNano() + t.Logf("Using seed %d", seed) + // z is a Zipfian distribution that gives us a number of milliseconds to + // sleep which are mostly low - near zero but occasionally spike up to near + // 100. + z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50) + + for i := 0; i < nMessages; i++ { + // Event content is arbitrary and not valid for our use of buffers in + // streaming - here we only care about the semantics of the buffer. + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + // Sleep sometimes for a while to let some subscribers catch up + wait := time.Duration(z.Uint64()) * time.Millisecond + time.Sleep(wait) + } + }() + + // Run n subscribers following and verifying + errCh := make(chan error, nReaders) + + // Load head here so all subscribers start from the same point or they might + // not run until several appends have already happened. 
+ head := b.Head() + + for i := 0; i < nReaders; i++ { + go func(i int) { + expect := uint64(0) + item := head + var err error + for { + item, err = item.Next(context.Background(), nil) + if err != nil { + errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i, + expect, err) + return + } + if item.Events[0].Index != expect { + errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i, + expect, item.Events[0].Index) + return + } + expect++ + if expect == uint64(nMessages) { + // Succeeded + errCh <- nil + return + } + } + }(i) + } + + // Wait for all readers to finish one way or other + for i := 0; i < nReaders; i++ { + err := <-errCh + assert.NoError(t, err) + } +} + +func TestEventBuffer_Slow_Reader(t *testing.T) { + + b := newEventBuffer(10, DefaultTTL) + + for i := 0; i < 10; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + head := b.Head() + + for i := 10; i < 15; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + // Ensure the slow reader errors to handle dropped events and + // fetch latest head + ev, err := head.Next(context.Background(), nil) + require.Error(t, err) + require.Nil(t, ev) + + newHead := b.Head() + require.Equal(t, 4, int(newHead.Index)) +} + +func TestEventBuffer_Size(t *testing.T) { + b := newEventBuffer(100, DefaultTTL) + + for i := 0; i < 10; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + require.Equal(t, 10, b.Len()) +} + +// TestEventBuffer_Prune_AllOld tests the behavior when all items +// are past their TTL, the event buffer should prune down to the last message +// and hold onto the last item. 
+func TestEventBuffer_Prune_AllOld(t *testing.T) {
+	b := newEventBuffer(100, 1*time.Second)
+
+	for i := 0; i < 10; i++ {
+		e := Event{
+			Index: uint64(i), // Indexes should be contiguous
+		}
+		b.Append(uint64(i), []Event{e})
+	}
+
+	require.Equal(t, 10, int(b.Len()))
+
+	time.Sleep(1 * time.Second)
+
+	b.prune()
+
+	require.Equal(t, 9, int(b.Head().Index))
+	require.Equal(t, 0, b.Len())
+}
+
+func TestStartAt_CurrentIdx_Past_Start(t *testing.T) {
+	cases := []struct {
+		desc     string
+		req      uint64
+		expected uint64
+		offset   int
+	}{
+		{
+			desc:     "requested index less than head receives head",
+			req:      10,
+			expected: 11,
+			offset:   1,
+		},
+		{
+			desc:     "requested exact match head",
+			req:      11,
+			expected: 11,
+			offset:   0,
+		},
+		{
+			desc:     "requested exact match",
+			req:      42,
+			expected: 42,
+			offset:   0,
+		},
+		{
+			desc:     "requested index greater than tail receives tail",
+			req:      500,
+			expected: 100,
+			offset:   400,
+		},
+	}
+
+	// buffer starts at index 11 goes to 100
+	b := newEventBuffer(100, 1*time.Hour)
+
+	for i := 11; i <= 100; i++ {
+		e := Event{
+			Index: uint64(i), // Indexes should be contiguous
+		}
+		b.Append(uint64(i), []Event{e})
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.desc, func(t *testing.T) {
+			got, offset := b.StartAtClosest(tc.req)
+			require.Equal(t, int(tc.expected), int(got.Index))
+			require.Equal(t, tc.offset, offset)
+		})
+	}
+}
diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go
new file mode 100644
index 000000000..016119b6e
--- /dev/null
+++ b/nomad/stream/event_publisher.go
@@ -0,0 +1,195 @@
+package stream
+
+import (
+	"context"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/go-hclog"
+)
+
+const (
+	DefaultTTL = 1 * time.Hour
+)
+
+type EventPublisherCfg struct {
+	EventBufferSize int64
+	EventBufferTTL  time.Duration
+}
+
+type EventPublisher struct {
+	// lock protects the eventBuffer
+	lock sync.Mutex
+
+	// eventBuf stores a configurable number of events in memory
+	eventBuf *eventBuffer
+
+	// pruneTick is the duration to periodically prune events from the event
+	// buffer. Defaults to 5s
+	pruneTick time.Duration
+
+	logger hclog.Logger
+
+	subscriptions *subscriptions
+
+	// publishCh is used to send messages from an active txn to a goroutine which
+	// publishes events, so that publishing can happen asynchronously from
+	// the Commit call in the FSM hot path.
+	publishCh chan changeEvents
+}
+
+type subscriptions struct {
+	// lock for byToken. If both subscription.lock and EventPublisher.lock need
+	// to be held, EventPublisher.lock MUST always be acquired first.
+	lock sync.RWMutex
+
+	// byToken is a mapping of active Subscriptions indexed by a token and
+	// a pointer to the request.
+	// When the token is modified all subscriptions under that token will be
+	// reloaded.
+	// A subscription may be unsubscribed by using the pointer to the request.
+	byToken map[string]map[*SubscribeRequest]*Subscription
+}
+
+func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublisher {
+	if cfg.EventBufferTTL == 0 {
+		cfg.EventBufferTTL = 1 * time.Hour
+	}
+	buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL)
+	e := &EventPublisher{
+		eventBuf:  buffer,
+		publishCh: make(chan changeEvents),
+		subscriptions: &subscriptions{
+			byToken: make(map[string]map[*SubscribeRequest]*Subscription),
+		},
+	}
+
+	go e.handleUpdates(ctx)
+	go e.periodicPrune(ctx)
+
+	return e
+}
+
+// Publish events to all subscribers of the event Topic.
+func (e *EventPublisher) Publish(index uint64, events []Event) {
+	if len(events) > 0 {
+		e.publishCh <- changeEvents{index: index, events: events}
+	}
+}
+
+// Subscribe returns a new Subscription for a given request.
+func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
+	e.lock.Lock()
+	defer e.lock.Unlock()
+
+	var head *bufferItem
+	var offset int
+	if req.Index != 0 {
+		head, offset = e.eventBuf.StartAtClosest(req.Index)
+	} else {
+		head = e.eventBuf.Head()
+	}
+	if offset > 0 {
+		e.logger.Warn("requested index no longer in buffer", "requested", int(req.Index), "closest", int(head.Index))
+	}
+
+	sub := newSubscription(req, head, func() {})
+
+	e.subscriptions.add(req, sub)
+	return sub, nil
+}
+
+func (e *EventPublisher) handleUpdates(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			e.subscriptions.closeAll()
+			return
+		case update := <-e.publishCh:
+			e.sendEvents(update)
+		}
+	}
+}
+
+func (e *EventPublisher) periodicPrune(ctx context.Context) {
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(e.pruneTick):
+			e.lock.Lock()
+			e.eventBuf.prune()
+			e.lock.Unlock()
+		}
+	}
+}
+
+type changeEvents struct {
+	index  uint64
+	events []Event
+}
+
+// sendEvents sends the given events to the publisher's event buffer.
+func (e *EventPublisher) sendEvents(update changeEvents) {
+	e.lock.Lock()
+	defer e.lock.Unlock()
+
+	e.eventBuf.Append(update.index, update.events)
+}
+
+func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
+	s.lock.Lock()
+	defer s.lock.Unlock()
+
+	subsByToken, ok := s.byToken[req.Token]
+	if !ok {
+		subsByToken = make(map[*SubscribeRequest]*Subscription)
+		s.byToken[req.Token] = subsByToken
+	}
+	subsByToken[req] = sub
+}
+
+func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
+	s.lock.RLock()
+	defer s.lock.RUnlock()
+
+	for _, secretID := range tokenSecretIDs {
+		if subs, ok := s.byToken[secretID]; ok {
+			for _, sub := range subs {
+				sub.forceClose()
+			}
+		}
+	}
+}
+
+// unsubscribe returns a function that the subscription will call to remove
+// itself from the subsByToken.
+// This function is returned as a closure so that the caller doesn't need to keep
+// track of the SubscribeRequest, and cannot accidentally call unsubscribe with the
+// wrong pointer.
+func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() { + return func() { + s.lock.Lock() + defer s.lock.Unlock() + + subsByToken, ok := s.byToken[req.Token] + if !ok { + return + } + delete(subsByToken, req) + if len(subsByToken) == 0 { + delete(s.byToken, req.Token) + } + } +} + +func (s *subscriptions) closeAll() { + s.lock.Lock() + defer s.lock.Unlock() + + for _, byRequest := range s.byToken { + for _, sub := range byRequest { + sub.forceClose() + } + } +} diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go new file mode 100644 index 000000000..5085a6fd9 --- /dev/null +++ b/nomad/stream/event_publisher_test.go @@ -0,0 +1,134 @@ +package stream + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { + subscription := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"sub-key"}, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(ctx, EventPublisherCfg{EventBufferSize: 100, EventBufferTTL: DefaultTTL}) + sub, err := publisher.Subscribe(subscription) + require.NoError(t, err) + eventCh := consumeSubscription(ctx, sub) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + events := []Event{{ + Index: 1, + Topic: "Test", + Key: "sub-key", + Payload: "sample payload", + }} + publisher.Publish(1, events) + + // Subscriber should see the published event + result := nextResult(t, eventCh) + require.NoError(t, result.Err) + expected := []Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}} + require.Equal(t, expected, result.Events) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + // Publish a second event + events = []Event{{ + Index: 2, + Topic: "Test", + Key: "sub-key", + Payload: "sample payload 2", + }} + publisher.Publish(2, events) + + result = nextResult(t, eventCh) + require.NoError(t, result.Err) + expected = []Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}} + require.Equal(t, expected, result.Events) +} + +func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + publisher := NewEventPublisher(ctx, EventPublisherCfg{}) + + sub1, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub1.Unsubscribe() + + sub2, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub2.Unsubscribe() + + cancel() // Shutdown + + err = consumeSub(context.Background(), sub1) + require.Equal(t, err, ErrSubscriptionClosed) + + _, err = sub2.Next(context.Background()) + require.Equal(t, err, ErrSubscriptionClosed) +} + +func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { + eventCh := make(chan subNextResult, 1) + go func() { + for { + es, err := sub.Next(ctx) + eventCh <- subNextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +type subNextResult struct { + Events []Event + Err error +} + +func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult { + t.Helper() + select { + case next := <-eventCh: + return next + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return subNextResult{} +} + +func assertNoResult(t *testing.T, eventCh <-chan subNextResult) 
{
+	t.Helper()
+	select {
+	case next := <-eventCh:
+		require.NoError(t, next.Err)
+		require.Len(t, next.Events, 1)
+		t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
+	case <-time.After(100 * time.Millisecond):
+	}
+}
+
+func consumeSub(ctx context.Context, sub *Subscription) error {
+	for {
+		_, err := sub.Next(ctx)
+		if err != nil {
+			return err
+		}
+	}
+}
diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go
new file mode 100644
index 000000000..6846e005b
--- /dev/null
+++ b/nomad/stream/subscription.go
@@ -0,0 +1,136 @@
+package stream
+
+import (
+	"context"
+	"errors"
+	"sync/atomic"
+)
+
+const (
+	// subscriptionStateOpen is the default state of a subscription. An open
+	// subscription may receive new events.
+	subscriptionStateOpen uint32 = 0
+
+	// subscriptionStateClosed indicates that the subscription was closed, possibly
+	// as a result of a change to an ACL token, and will not receive new events.
+	// The subscriber must issue a new Subscribe request.
+	subscriptionStateClosed uint32 = 1
+)
+
+// ErrSubscriptionClosed is an error signalling the subscription has been
+// closed. The client should Unsubscribe, then re-Subscribe.
+var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
+
+// type Subscriber struct {
+// 	logger hclog.Logger
+// }
+
+type Subscription struct {
+	// state is accessed atomically: 0 means open, 1 means closed with reload
+	state uint32
+
+	req *SubscribeRequest
+
+	// currentItem stores the current buffer item we are on. It
+	// is mutated by calls to Next.
+	currentItem *bufferItem
+
+	// forceClosed is closed when forceClose is called. It is used by
+	// EventPublisher to cancel Next().
+	forceClosed chan struct{}
+
+	// unsub is a function set by EventPublisher that is called to free resources
+	// when the subscription is no longer needed.
+	// It must be safe to call the function from multiple goroutines and the function
+	// must be idempotent.
+ unsub func() +} + +type SubscribeRequest struct { + Token string + Index uint64 + + Topics map[Topic][]string +} + +func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { + return &Subscription{ + forceClosed: make(chan struct{}), + req: req, + currentItem: item, + unsub: unsub, + } +} + +func (s *Subscription) Next(ctx context.Context) ([]Event, error) { + if atomic.LoadUint32(&s.state) == subscriptionStateClosed { + return nil, ErrSubscriptionClosed + } + + for { + next, err := s.currentItem.Next(ctx, s.forceClosed) + switch { + case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed: + return nil, ErrSubscriptionClosed + case err != nil: + return nil, err + } + s.currentItem = next + + events := filter(s.req, next.Events) + if len(events) == 0 { + continue + } + return events, nil + } +} + +func (s *Subscription) forceClose() { + swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) + if swapped { + close(s.forceClosed) + } +} + +func (s *Subscription) Unsubscribe() { + s.unsub() +} + +// filter events to only those that match a subscriptions topic/keys +func filter(req *SubscribeRequest, events []Event) []Event { + if len(events) == 0 { + return events + } + + var count int + for _, e := range events { + if _, ok := req.Topics[e.Topic]; ok { + for _, k := range req.Topics[e.Topic] { + if e.Key == k || k == AllKeys { + count++ + } + } + } + } + + // Only allocate a new slice if some events need to be filtered out + switch count { + case 0: + return nil + case len(events): + return events + } + + // Return filtered events + result := make([]Event, 0, count) + for _, e := range events { + if _, ok := req.Topics[e.Topic]; ok { + for _, k := range req.Topics[e.Topic] { + if e.Key == k || k == AllKeys { + result = append(result, e) + } + } + } + } + return result +} diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go new file mode 100644 index 000000000..9d1f55720 --- /dev/null +++ b/nomad/stream/subscription_test.go @@ -0,0 +1,76 @@ +package stream + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSubscription(t *testing.T) { + +} + +func TestFilter_AllKeys(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"*"}, + }, + } + actual := filter(req, events) + require.Equal(t, events, actual) + + // ensure new array was not allocated + require.Equal(t, cap(actual), 5) +} + +func TestFilter_PartialMatch_Topic(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}, Event{Topic: "Exclude", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"*"}, + }, + } + actual := filter(req, events) + expected := []Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}} + require.Equal(t, expected, actual) + + require.Equal(t, cap(actual), 2) +} + +func TestFilter_PartialMatch_Key(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"One"}, + }, + } + actual := filter(req, events) + expected := []Event{{Topic: "Test", Key: "One"}} + require.Equal(t, expected, actual) + + require.Equal(t, 
cap(actual), 1) +} + +func TestFilter_NoMatch(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "NodeEvents": []string{"*"}, + "Test": []string{"Highly-Specific-Key"}, + }, + } + actual := filter(req, events) + var expected []Event + require.Equal(t, expected, actual) + + require.Equal(t, cap(actual), 0) +}
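
Usage sketch (not part of the patch): the following illustrates how a consumer is expected to wire the pieces above together, using only the APIs added in this change. The "Deployment" topic, the "web" key, the string payload, and the example file itself are placeholder values for illustration.

// example_stream_test.go (illustrative only; not part of this patch)
package stream_test

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/nomad/nomad/stream"
)

func ExampleEventPublisher() {
	// Cancelling the context stops the publisher's goroutines and closes all
	// of its subscriptions, mirroring what StateStore.Abandon does.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
		EventBufferSize: 250,
		EventBufferTTL:  1 * time.Hour,
	})

	// Subscribe to a single key under one topic; "*" (stream.AllKeys) would
	// match every key published to that topic.
	sub, err := publisher.Subscribe(&stream.SubscribeRequest{
		Topics: map[stream.Topic][]string{
			"Deployment": {"web"},
		},
	})
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()

	// Publish a set of events for raft index 1. Events that do not match the
	// subscription's topic/key filter are never delivered to it.
	publisher.Publish(1, []stream.Event{
		{Topic: "Deployment", Key: "web", Index: 1, Payload: "created"},
	})

	// Next blocks until a matching event arrives, the context is cancelled,
	// or the subscription is force-closed by the publisher.
	events, err := sub.Next(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(events[0].Payload)
	// Output: created
}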
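processDBChanges is left as a stub in state_changes.go ("TODO: add handlers here"). A hypothetical sketch of a future table handler follows; the "nodes" table case, the "Node" topic, the structs import, and the assumption that Changes exposes the memdb change set as a Changes field are illustrative only and not part of this patch.

// Hypothetical handler sketch for nomad/state/state_changes.go; not part of
// this patch. It assumes Changes carries the memdb change set (here called
// Changes) alongside the raft Index already used elsewhere in this diff.
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
	var events []stream.Event
	for _, change := range changes.Changes {
		switch change.Table {
		case "nodes": // assumed table name
			node, ok := change.After.(*structs.Node) // skip deletions and unknown types
			if !ok {
				continue
			}
			events = append(events, stream.Event{
				Topic:   "Node", // assumed topic
				Key:     node.ID,
				Index:   changes.Index,
				Payload: node,
			})
		}
	}
	return events, nil
}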