diff --git a/nomad/event/event.go b/nomad/event/event.go deleted file mode 100644 index 49e809ac8..000000000 --- a/nomad/event/event.go +++ /dev/null @@ -1,13 +0,0 @@ -package event - -type Event struct { - Topic string - Key string - Index uint64 - Payload interface{} -} - -type EventPublisher struct{} - -func NewPublisher() *EventPublisher { return &EventPublisher{} } -func (e EventPublisher) Publish(events []Event) {} diff --git a/nomad/state/state_changes.go b/nomad/state/state_changes.go index 22e32f31b..a71056e0f 100644 --- a/nomad/state/state_changes.go +++ b/nomad/state/state_changes.go @@ -4,7 +4,7 @@ import ( "fmt" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/nomad/nomad/event" + "github.com/hashicorp/nomad/nomad/stream" ) // ReadTxn is implemented by memdb.Txn to perform read operations. @@ -29,28 +29,28 @@ type Changes struct { type changeTrackerDB struct { db *memdb.MemDB publisher eventPublisher - processChanges func(ReadTxn, Changes) ([]event.Event, error) + processChanges func(ReadTxn, Changes) ([]stream.Event, error) } func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB { return &changeTrackerDB{ db: db, - publisher: event.NewPublisher(), + publisher: publisher, processChanges: changesFn, } } -type changeProcessor func(ReadTxn, Changes) ([]event.Event, error) +type changeProcessor func(ReadTxn, Changes) ([]stream.Event, error) type eventPublisher interface { - Publish(events []event.Event) + Publish(index uint64, events []stream.Event) } // noOpPublisher satisfies the eventPublisher interface and does nothing type noOpPublisher struct{} -func (n *noOpPublisher) Publish(events []event.Event) {} -func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil } +func (n *noOpPublisher) Publish(index uint64, events []stream.Event) {} +func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil } // ReadTxn returns a read-only transaction which behaves exactly the same as // memdb.Txn @@ -89,7 +89,7 @@ func (c *changeTrackerDB) publish(changes Changes) error { if err != nil { return fmt.Errorf("failed generating events from changes: %v", err) } - c.publisher.Publish(events) + c.publisher.Publish(changes.Index, events) return nil } @@ -146,7 +146,7 @@ func (tx *txn) Commit() error { return nil } -func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) { +func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) { // TODO: add handlers here. - return []event.Event{}, nil + return []stream.Event{}, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 6bb71382d..d15913cd2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -13,7 +13,7 @@ import ( "github.com/pkg/errors" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/nomad/event" + "github.com/hashicorp/nomad/nomad/stream" "github.com/hashicorp/nomad/nomad/structs" ) @@ -64,6 +64,10 @@ type StateStore struct { // abandonCh is used to signal watchers that this state store has been // abandoned (usually during a restore). This is only ever closed. abandonCh chan struct{} + + // TODO: refactor abondonCh to use a context so that both can use the same + // cancel mechanism. 
+ stopEventPublisher func() } // NewStateStore is used to create a new state store @@ -75,12 +79,18 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) { } // Create the state store + ctx, cancel := context.WithCancel(context.TODO()) s := &StateStore{ - logger: config.Logger.Named("state_store"), - config: config, - abandonCh: make(chan struct{}), + logger: config.Logger.Named("state_store"), + config: config, + abandonCh: make(chan struct{}), + stopEventPublisher: cancel, } - s.db = NewChangeTrackerDB(db, event.NewPublisher(), processDBChanges) + publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{ + EventBufferTTL: 1 * time.Hour, + EventBufferSize: 250, + }) + s.db = NewChangeTrackerDB(db, publisher, processDBChanges) // Initialize the state store with required enterprise objects if err := s.enterpriseInit(); err != nil { @@ -189,6 +199,7 @@ func (s *StateStore) AbandonCh() <-chan struct{} { // Abandon is used to signal that the given state store has been abandoned. // Calling this more than one time will panic. func (s *StateStore) Abandon() { + s.stopEventPublisher() close(s.abandonCh) } diff --git a/nomad/stream/event.go b/nomad/stream/event.go new file mode 100644 index 000000000..2625dede2 --- /dev/null +++ b/nomad/stream/event.go @@ -0,0 +1,14 @@ +package stream + +const ( + AllKeys = "*" +) + +type Topic string + +type Event struct { + Topic Topic + Key string + Index uint64 + Payload interface{} +} diff --git a/nomad/stream/event_buffer.go b/nomad/stream/event_buffer.go new file mode 100644 index 000000000..127ca5b9f --- /dev/null +++ b/nomad/stream/event_buffer.go @@ -0,0 +1,306 @@ +package stream + +import ( + "context" + "errors" + "fmt" + "sync/atomic" + "time" +) + +// eventBuffer is a single-writer, multiple-reader, fixed length concurrent +// buffer of events that have been published. The buffer is +// the head and tail of an atomically updated single-linked list. Atomic +// accesses are usually to be suspected as premature optimization but this +// specific design has several important features that significantly simplify a +// lot of our PubSub machinery. +// +// eventBuffer is an adaptation of conuls agent/stream/event eventBuffer but +// has been updated to be a max length buffer to work for Nomad's usecase. +// +// The eventBuffer only tracks the most recent set of published events, +// up to the max configured size, older events are dropped from the buffer +// but will only be garbage collected once the slowest reader drops the item. +// Consumers are notified of new events by closing a channel on the previous head +// allowing efficient broadcast to many watchers without having to run multiple +// goroutines or deliver to O(N) separate channels. +// +// Because eventBuffer is a linked list with atomically updated pointers, readers don't +// have to take a lock and can consume at their own pace. Slow readers can eventually +// append +// +// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil +// Events array. This enables subscribers to start watching for the next update +// immediately. +// +// The zero value eventBuffer is _not_ usable, as it has not been +// initialized with an empty bufferItem so can not be used to wait for the first +// published event. Call newEventBuffer to construct a new buffer. +// +// Calls to Append or AppendBuffer that mutate the head must be externally +// synchronized. This allows systems that already serialize writes to append +// without lock overhead (e.g. 
a snapshot goroutine appending thousands of
+// events).
+type eventBuffer struct {
+	size *int64
+
+	head atomic.Value
+	tail atomic.Value
+
+	maxSize    int64
+	maxItemTTL time.Duration
+}
+
+// newEventBuffer creates an eventBuffer ready for use.
+func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
+	zero := int64(0)
+	b := &eventBuffer{
+		maxSize:    size,
+		size:       &zero,
+		maxItemTTL: maxItemTTL,
+	}
+
+	item := newBufferItem(0, nil)
+
+	b.head.Store(item)
+	b.tail.Store(item)
+
+	return b
+}
+
+// Append a set of events from one raft operation to the buffer and notify
+// watchers. After calling append, the caller must not make any further
+// mutations to the events as they may have been exposed to subscribers in other
+// goroutines. Append only supports a single concurrent caller and must be
+// externally synchronized with other Append, AppendBuffer or AppendErr calls.
+func (b *eventBuffer) Append(index uint64, events []Event) {
+	b.appendItem(newBufferItem(index, events))
+}
+
+func (b *eventBuffer) appendItem(item *bufferItem) {
+	// Store the next item to the old tail
+	oldTail := b.Tail()
+	oldTail.link.next.Store(item)
+
+	// Update the tail to the new item
+	b.tail.Store(item)
+
+	// Increment the buffer size
+	size := atomic.AddInt64(b.size, 1)
+
+	// Check if we need to advance the head to keep the list
+	// constrained to max size
+	if size > b.maxSize {
+		b.advanceHead()
+	}
+
+	// notify waiters next event is available
+	close(oldTail.link.ch)
+
+}
+
+// advanceHead drops the current Head buffer item and notifies readers
+// that the item should be discarded by closing droppedCh.
+// Slow readers will prevent the old head from being GC'd until they
+// discard it.
+func (b *eventBuffer) advanceHead() {
+	old := b.Head()
+	next := old.link.next.Load()
+
+	close(old.link.droppedCh)
+	b.head.Store(next)
+	atomic.AddInt64(b.size, -1)
+
+}
+
+// Head returns the current head of the buffer. It will always exist but it may
+// be a "sentinel" empty item with a nil Events slice to allow consumers to
+// watch for the next update. Consumers should always check for empty Events and
+// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
+// with newEventBuffer.
+func (b *eventBuffer) Head() *bufferItem {
+	return b.head.Load().(*bufferItem)
+}
+
+// Tail returns the current tail of the buffer. It will always exist but it may
+// be a "sentinel" empty item with a nil Events slice to allow consumers to
+// watch for the next update. Consumers should always check for empty Events and
+// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
+// with newEventBuffer.
+func (b *eventBuffer) Tail() *bufferItem {
+	return b.tail.Load().(*bufferItem)
+}
+
+// StartAtClosest returns the closest bufferItem to a requested starting
+// index as well as the offset between the requested index and returned one.
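The offset semantics are easiest to see with a small worked example. The following test-style sketch is illustrative only; it assumes it lives inside the stream package, and the function name and chosen indexes are hypothetical rather than part of this change.

package stream

import (
	"testing"
	"time"
)

// Illustrative only: exercises the offset semantics documented above for a
// buffer that retains items at indexes 11..100.
func TestStartAtClosest_OffsetSketch(t *testing.T) {
	b := newEventBuffer(100, time.Hour)
	for i := 11; i <= 100; i++ {
		b.Append(uint64(i), []Event{{Index: uint64(i)}})
	}

	// An index older than anything retained yields the oldest item and the gap to it.
	item, offset := b.StartAtClosest(5)
	if item.Index != 11 || offset != 6 {
		t.Fatalf("got index=%d offset=%d", item.Index, offset)
	}

	// An index newer than the newest item yields the newest item and the gap back to it.
	item, offset = b.StartAtClosest(120)
	if item.Index != 100 || offset != 20 {
		t.Fatalf("got index=%d offset=%d", item.Index, offset)
	}
}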
+func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) { + item := b.Head() + if index < item.Index { + return item, int(item.Index) - int(index) + } + if item.Index == index { + return item, 0 + } + + for { + prev := item + item = item.NextNoBlock() + if item == nil { + return prev, int(index) - int(prev.Index) + } + if index < item.Index { + return item, int(item.Index) - int(index) + } + if index == item.Index { + return item, 0 + } + } +} + +// Len returns the current length of the buffer +func (b *eventBuffer) Len() int { + return int(atomic.LoadInt64(b.size)) +} + +// prune advances the head of the buffer until the head buffer item TTL +// is no longer expired. It should be externally synchronized as it mutates +// the buffer of items. +func (b *eventBuffer) prune() { + for { + head := b.Head() + if b.Len() == 0 { + return + } + + if time.Since(head.createdAt) > b.maxItemTTL { + b.advanceHead() + } else { + return + } + } +} + +// bufferItem represents a set of events published by a single raft operation. +// The first item returned by a newly constructed buffer will have nil Events. +// It is a sentinel value which is used to wait on the next events via Next. +// +// To iterate to the next event, a Next method may be called which may block if +// there is no next element yet. +// +// Holding a pointer to the item keeps all the events published since in memory +// so it's important that subscribers don't hold pointers to buffer items after +// they have been delivered except where it's intentional to maintain a cache or +// trailing store of events for performance reasons. +// +// Subscribers must not mutate the bufferItem or the Events or Encoded payloads +// inside as these are shared between all readers. +type bufferItem struct { + // Events is the set of events published at one raft index. This may be nil as + // a sentinel value to allow watching for the first event in a buffer. Callers + // should check and skip nil Events at any point in the buffer. It will also + // be nil if the producer appends an Error event because they can't complete + // the request to populate the buffer. Err will be non-nil in this case. + Events []Event + + Index uint64 + + // Err is non-nil if the producer can't complete their task and terminates the + // buffer. Subscribers should return the error to clients and cease attempting + // to read from the buffer. + Err error + + // link holds the next pointer and channel. This extra bit of indirection + // allows us to splice buffers together at arbitrary points without including + // events in one buffer just for the side-effect of watching for the next set. + // The link may not be mutated once the event is appended to a buffer. + link *bufferLink + + createdAt time.Time +} + +type bufferLink struct { + // next is an atomically updated pointer to the next event in the buffer. It + // is written exactly once by the single published and will always be set if + // ch is closed. + next atomic.Value + + // ch is closed when the next event is published. It should never be mutated + // (e.g. set to nil) as that is racey, but is closed once when the next event + // is published. the next pointer will have been set by the time this is + // closed. + ch chan struct{} + + // droppedCh is closed when the event is dropped from the buffer due to + // sizing constraints. + droppedCh chan struct{} +} + +// newBufferItem returns a blank buffer item with a link and chan ready to have +// the fields set and be appended to a buffer. 
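Before the constructor itself, a hedged sketch of the intended produce/consume flow: a single writer appends while a reader captures the head and blocks in Next until something newer is published. The function name and the printed output are illustrative assumptions, not part of the change.

package stream

import (
	"context"
	"fmt"
	"time"
)

// Illustrative only: one writer appends, one reader follows via Next.
func exampleBufferFollow() error {
	b := newEventBuffer(10, time.Hour)

	// Readers capture the current head (possibly the empty sentinel) and then
	// block in Next until the writer publishes something after that point.
	item := b.Head()

	go b.Append(1, []Event{{Topic: "Test", Key: "k", Index: 1}})

	next, err := item.Next(context.Background(), nil)
	if err != nil {
		return err // dropped from the buffer or force-closed
	}
	fmt.Println(next.Events[0].Index) // 1
	return nil
}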
+func newBufferItem(index uint64, events []Event) *bufferItem { + return &bufferItem{ + link: &bufferLink{ + ch: make(chan struct{}), + droppedCh: make(chan struct{}), + }, + Events: events, + Index: index, + createdAt: time.Now(), + } +} + +// Next return the next buffer item in the buffer. It may block until ctx is +// cancelled or until the next item is published. +func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) { + // See if there is already a next value, block if so. Note we don't rely on + // state change (chan nil) as that's not threadsafe but detecting close is. + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-forceClose: + return nil, fmt.Errorf("subscription closed") + case <-i.link.ch: + } + + // Check if the reader is too slow and the event buffer as discarded the event + select { + case <-i.link.droppedCh: + return nil, fmt.Errorf("event dropped from buffer") + default: + } + + // If channel closed, there must be a next item to read + nextRaw := i.link.next.Load() + if nextRaw == nil { + // shouldn't be possible + return nil, errors.New("invalid next item") + } + next := nextRaw.(*bufferItem) + if next.Err != nil { + return nil, next.Err + } + return next, nil +} + +// NextNoBlock returns the next item in the buffer without blocking. If it +// reaches the most recent item it will return nil. +func (i *bufferItem) NextNoBlock() *bufferItem { + nextRaw := i.link.next.Load() + if nextRaw == nil { + return nil + } + return nextRaw.(*bufferItem) +} + +// NextLink returns either the next item in the buffer if there is one, or +// an empty item (that will be ignored by subscribers) that has a pointer to +// the same link as this bufferItem (but none of the bufferItem content). +// When the link.ch is closed, subscriptions will be notified of the next item. +func (i *bufferItem) NextLink() *bufferItem { + next := i.NextNoBlock() + if next == nil { + // Return an empty item that can be followed to the next item published. + return &bufferItem{link: i.link} + } + return next +} diff --git a/nomad/stream/event_buffer_test.go b/nomad/stream/event_buffer_test.go new file mode 100644 index 000000000..15a96a4e0 --- /dev/null +++ b/nomad/stream/event_buffer_test.go @@ -0,0 +1,201 @@ +package stream + +import ( + "context" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventBufferFuzz(t *testing.T) { + nReaders := 1000 + nMessages := 1000 + + b := newEventBuffer(1000, DefaultTTL) + + // Start a write goroutine that will publish 10000 messages with sequential + // indexes and some jitter in timing (to allow clients to "catch up" and block + // waiting for updates). + go func() { + seed := time.Now().UnixNano() + t.Logf("Using seed %d", seed) + // z is a Zipfian distribution that gives us a number of milliseconds to + // sleep which are mostly low - near zero but occasionally spike up to near + // 100. + z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50) + + for i := 0; i < nMessages; i++ { + // Event content is arbitrary and not valid for our use of buffers in + // streaming - here we only care about the semantics of the buffer. 
+ e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + // Sleep sometimes for a while to let some subscribers catch up + wait := time.Duration(z.Uint64()) * time.Millisecond + time.Sleep(wait) + } + }() + + // Run n subscribers following and verifying + errCh := make(chan error, nReaders) + + // Load head here so all subscribers start from the same point or they might + // not run until several appends have already happened. + head := b.Head() + + for i := 0; i < nReaders; i++ { + go func(i int) { + expect := uint64(0) + item := head + var err error + for { + item, err = item.Next(context.Background(), nil) + if err != nil { + errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i, + expect, err) + return + } + if item.Events[0].Index != expect { + errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i, + expect, item.Events[0].Index) + return + } + expect++ + if expect == uint64(nMessages) { + // Succeeded + errCh <- nil + return + } + } + }(i) + } + + // Wait for all readers to finish one way or other + for i := 0; i < nReaders; i++ { + err := <-errCh + assert.NoError(t, err) + } +} + +func TestEventBuffer_Slow_Reader(t *testing.T) { + + b := newEventBuffer(10, DefaultTTL) + + for i := 0; i < 10; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + head := b.Head() + + for i := 10; i < 15; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + // Ensure the slow reader errors to handle dropped events and + // fetch latest head + ev, err := head.Next(context.Background(), nil) + require.Error(t, err) + require.Nil(t, ev) + + newHead := b.Head() + require.Equal(t, 4, int(newHead.Index)) +} + +func TestEventBuffer_Size(t *testing.T) { + b := newEventBuffer(100, DefaultTTL) + + for i := 0; i < 10; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + require.Equal(t, 10, b.Len()) +} + +// TestEventBuffer_Prune_AllOld tests the behavior when all items +// are past their TTL, the event buffer should prune down to the last message +// and hold onto the last item. 
+func TestEventBuffer_Prune_AllOld(t *testing.T) { + b := newEventBuffer(100, 1*time.Second) + + for i := 0; i < 10; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + require.Equal(t, 10, int(b.Len())) + + time.Sleep(1 * time.Second) + + b.prune() + + require.Equal(t, 9, int(b.Head().Index)) + require.Equal(t, 0, b.Len()) +} + +func TestStartAt_CurrentIdx_Past_Start(t *testing.T) { + cases := []struct { + desc string + req uint64 + expected uint64 + offset int + }{ + { + desc: "requested index less than head receives head", + req: 10, + expected: 11, + offset: 1, + }, + { + desc: "requested exact match head", + req: 11, + expected: 11, + offset: 0, + }, + { + desc: "requested exact match", + req: 42, + expected: 42, + offset: 0, + }, + { + desc: "requested index greater than tail receives tail", + req: 500, + expected: 100, + offset: 400, + }, + } + + // buffer starts at index 11 goes to 100 + b := newEventBuffer(100, 1*time.Hour) + + for i := 11; i <= 100; i++ { + e := Event{ + Index: uint64(i), // Indexes should be contiguous + } + b.Append(uint64(i), []Event{e}) + } + + for _, tc := range cases { + t.Run(tc.desc, func(t *testing.T) { + got, offset := b.StartAtClosest(tc.req) + require.Equal(t, int(tc.expected), int(got.Index)) + require.Equal(t, tc.offset, offset) + }) + } +} diff --git a/nomad/stream/event_publisher.go b/nomad/stream/event_publisher.go new file mode 100644 index 000000000..016119b6e --- /dev/null +++ b/nomad/stream/event_publisher.go @@ -0,0 +1,195 @@ +package stream + +import ( + "context" + "sync" + "time" + + "github.com/hashicorp/go-hclog" +) + +const ( + DefaultTTL = 1 * time.Hour +) + +type EventPublisherCfg struct { + EventBufferSize int64 + EventBufferTTL time.Duration +} + +type EventPublisher struct { + // lock protects the eventbuffer + lock sync.Mutex + + // eventBuf stores a configurable amount of events in memory + eventBuf *eventBuffer + + // pruneTick is the duration to periodically prune events from the event + // buffer. Defaults to 5s + pruneTick time.Duration + + logger hclog.Logger + + subscriptions *subscriptions + + // publishCh is used to send messages from an active txn to a goroutine which + // publishes events, so that publishing can happen asynchronously from + // the Commit call in the FSM hot path. + publishCh chan changeEvents +} + +type subscriptions struct { + // lock for byToken. If both subscription.lock and EventPublisher.lock need + // to be held, EventPublisher.lock MUST always be acquired first. + lock sync.RWMutex + + // byToken is an mapping of active Subscriptions indexed by a token and + // a pointer to the request. + // When the token is modified all subscriptions under that token will be + // reloaded. + // A subscription may be unsubscribed by using the pointer to the request. + byToken map[string]map[*SubscribeRequest]*Subscription +} + +func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublisher { + if cfg.EventBufferTTL == 0 { + cfg.EventBufferTTL = 1 * time.Hour + } + buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL) + e := &EventPublisher{ + eventBuf: buffer, + publishCh: make(chan changeEvents), + subscriptions: &subscriptions{ + byToken: make(map[string]map[*SubscribeRequest]*Subscription), + }, + } + + go e.handleUpdates(ctx) + go e.periodicPrune(ctx) + + return e +} + +// Publish events to all subscribers of the event Topic. 
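Taken together, the publisher API is meant to be driven roughly as follows. This is a hedged end-to-end sketch from outside the package; the topic name "Deployment", the key, and the use of a main function are assumptions made for illustration.

package main

import (
	"context"
	"fmt"

	"github.com/hashicorp/nomad/nomad/stream"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The context passed to NewEventPublisher controls the lifetime of the
	// publisher's background goroutines.
	pub := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
		EventBufferSize: 100,
		EventBufferTTL:  stream.DefaultTTL,
	})

	// Subscribe to every key on a hypothetical "Deployment" topic.
	sub, err := pub.Subscribe(&stream.SubscribeRequest{
		Topics: map[stream.Topic][]string{"Deployment": {stream.AllKeys}},
	})
	if err != nil {
		panic(err)
	}
	defer sub.Unsubscribe()

	pub.Publish(1, []stream.Event{{Topic: "Deployment", Key: "some-id", Index: 1}})

	// Next blocks until a matching event is appended to the buffer.
	events, err := sub.Next(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Println(len(events), events[0].Key) // 1 some-id
}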
+func (e *EventPublisher) Publish(index uint64, events []Event) { + if len(events) > 0 { + e.publishCh <- changeEvents{index: index, events: events} + } +} + +// Subscribe returns a new Subscription for a given request. +func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) { + e.lock.Lock() + defer e.lock.Unlock() + + var head *bufferItem + var offset int + if req.Index != 0 { + head, offset = e.eventBuf.StartAtClosest(req.Index) + } else { + head = e.eventBuf.Head() + } + if offset > 0 { + e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Index)) + } + + sub := newSubscription(req, head, func() {}) + + e.subscriptions.add(req, sub) + return sub, nil +} + +func (e *EventPublisher) handleUpdates(ctx context.Context) { + for { + select { + case <-ctx.Done(): + e.subscriptions.closeAll() + return + case update := <-e.publishCh: + e.sendEvents(update) + } + } +} + +func (e *EventPublisher) periodicPrune(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-time.After(e.pruneTick): + e.lock.Lock() + e.eventBuf.prune() + e.lock.Unlock() + } + } +} + +type changeEvents struct { + index uint64 + events []Event +} + +// sendEvents sends the given events to the publishers event buffer. +func (e *EventPublisher) sendEvents(update changeEvents) { + e.lock.Lock() + defer e.lock.Unlock() + + e.eventBuf.Append(update.index, update.events) +} + +func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) { + s.lock.Lock() + defer s.lock.Unlock() + + subsByToken, ok := s.byToken[req.Token] + if !ok { + subsByToken = make(map[*SubscribeRequest]*Subscription) + s.byToken[req.Token] = subsByToken + } + subsByToken[req] = sub +} + +func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) { + s.lock.RLock() + defer s.lock.RUnlock() + + for _, secretID := range tokenSecretIDs { + if subs, ok := s.byToken[secretID]; ok { + for _, sub := range subs { + sub.forceClose() + } + } + } +} + +// unsubscribe returns a function that the subscription will call to remove +// itself from the subsByToken. +// This function is returned as a closure so that the caller doesn't need to keep +// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the +// wrong pointer. 
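Presumably this closure is meant to be passed as the unsub argument to newSubscription so that Subscription.Unsubscribe removes the registry entry; the Subscribe method above currently passes a no-op, so the wiring below is an assumption about intended use rather than what this change does.

// Hypothetical variant of Subscribe, inside package stream, showing how the
// cleanup closure could be threaded through to the Subscription.
func (e *EventPublisher) subscribeWithCleanup(req *SubscribeRequest) (*Subscription, error) {
	e.lock.Lock()
	defer e.lock.Unlock()

	head := e.eventBuf.Head()

	// unsubscribe(req) captures req, so the caller only needs the Subscription.
	sub := newSubscription(req, head, e.subscriptions.unsubscribe(req))
	e.subscriptions.add(req, sub)
	return sub, nil
}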
+func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() { + return func() { + s.lock.Lock() + defer s.lock.Unlock() + + subsByToken, ok := s.byToken[req.Token] + if !ok { + return + } + delete(subsByToken, req) + if len(subsByToken) == 0 { + delete(s.byToken, req.Token) + } + } +} + +func (s *subscriptions) closeAll() { + s.lock.Lock() + defer s.lock.Unlock() + + for _, byRequest := range s.byToken { + for _, sub := range byRequest { + sub.forceClose() + } + } +} diff --git a/nomad/stream/event_publisher_test.go b/nomad/stream/event_publisher_test.go new file mode 100644 index 000000000..5085a6fd9 --- /dev/null +++ b/nomad/stream/event_publisher_test.go @@ -0,0 +1,134 @@ +package stream + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) { + subscription := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"sub-key"}, + }, + } + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + publisher := NewEventPublisher(ctx, EventPublisherCfg{EventBufferSize: 100, EventBufferTTL: DefaultTTL}) + sub, err := publisher.Subscribe(subscription) + require.NoError(t, err) + eventCh := consumeSubscription(ctx, sub) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + events := []Event{{ + Index: 1, + Topic: "Test", + Key: "sub-key", + Payload: "sample payload", + }} + publisher.Publish(1, events) + + // Subscriber should see the published event + result := nextResult(t, eventCh) + require.NoError(t, result.Err) + expected := []Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}} + require.Equal(t, expected, result.Events) + + // Now subscriber should block waiting for updates + assertNoResult(t, eventCh) + + // Publish a second event + events = []Event{{ + Index: 2, + Topic: "Test", + Key: "sub-key", + Payload: "sample payload 2", + }} + publisher.Publish(2, events) + + result = nextResult(t, eventCh) + require.NoError(t, result.Err) + expected = []Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}} + require.Equal(t, expected, result.Events) +} + +func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + publisher := NewEventPublisher(ctx, EventPublisherCfg{}) + + sub1, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub1.Unsubscribe() + + sub2, err := publisher.Subscribe(&SubscribeRequest{}) + require.NoError(t, err) + defer sub2.Unsubscribe() + + cancel() // Shutdown + + err = consumeSub(context.Background(), sub1) + require.Equal(t, err, ErrSubscriptionClosed) + + _, err = sub2.Next(context.Background()) + require.Equal(t, err, ErrSubscriptionClosed) +} + +func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult { + eventCh := make(chan subNextResult, 1) + go func() { + for { + es, err := sub.Next(ctx) + eventCh <- subNextResult{ + Events: es, + Err: err, + } + if err != nil { + return + } + } + }() + return eventCh +} + +type subNextResult struct { + Events []Event + Err error +} + +func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult { + t.Helper() + select { + case next := <-eventCh: + return next + case <-time.After(100 * time.Millisecond): + t.Fatalf("no event after 100ms") + } + return subNextResult{} +} + +func assertNoResult(t *testing.T, eventCh <-chan subNextResult) 
{ + t.Helper() + select { + case next := <-eventCh: + require.NoError(t, next.Err) + require.Len(t, next.Events, 1) + t.Fatalf("received unexpected event: %#v", next.Events[0].Payload) + case <-time.After(100 * time.Millisecond): + } +} + +func consumeSub(ctx context.Context, sub *Subscription) error { + for { + _, err := sub.Next(ctx) + if err != nil { + return err + } + } +} diff --git a/nomad/stream/subscription.go b/nomad/stream/subscription.go new file mode 100644 index 000000000..6846e005b --- /dev/null +++ b/nomad/stream/subscription.go @@ -0,0 +1,136 @@ +package stream + +import ( + "context" + "errors" + "sync/atomic" +) + +const ( + // subscriptionStateOpen is the default state of a subscription. An open + // subscription may receive new events. + subscriptionStateOpen uint32 = 0 + + // subscriptionStateClosed indicates that the subscription was closed, possibly + // as a result of a change to an ACL token, and will not receive new events. + // The subscriber must issue a new Subscribe request. + subscriptionStateClosed uint32 = 1 +) + +// ErrSubscriptionClosed is a error signalling the subscription has been +// closed. The client should Unsubscribe, then re-Subscribe. +var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe") + +// type Subscriber struct { +// logger hclog.Logger +// } + +type Subscription struct { + // state is accessed atomically 0 means open, 1 means closed with reload + state uint32 + + req *SubscribeRequest + + // currentItem stores the current buffer item we are on. It + // is mutated by calls to Next. + currentItem *bufferItem + + // forceClosed is closed when forceClose is called. It is used by + // EventPublisher to cancel Next(). + forceClosed chan struct{} + + // unsub is a function set by EventPublisher that is called to free resources + // when the subscription is no longer needed. + // It must be safe to call the function from multiple goroutines and the function + // must be idempotent. 
+ unsub func() +} + +type SubscribeRequest struct { + Token string + Index uint64 + + Topics map[Topic][]string +} + +func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription { + return &Subscription{ + forceClosed: make(chan struct{}), + req: req, + currentItem: item, + unsub: unsub, + } +} + +func (s *Subscription) Next(ctx context.Context) ([]Event, error) { + if atomic.LoadUint32(&s.state) == subscriptionStateClosed { + return nil, ErrSubscriptionClosed + } + + for { + next, err := s.currentItem.Next(ctx, s.forceClosed) + switch { + case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed: + return nil, ErrSubscriptionClosed + case err != nil: + return nil, err + } + s.currentItem = next + + events := filter(s.req, next.Events) + if len(events) == 0 { + continue + } + return events, nil + } +} + +func (s *Subscription) forceClose() { + swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) + if swapped { + close(s.forceClosed) + } +} + +func (s *Subscription) Unsubscribe() { + s.unsub() +} + +// filter events to only those that match a subscriptions topic/keys +func filter(req *SubscribeRequest, events []Event) []Event { + if len(events) == 0 { + return events + } + + var count int + for _, e := range events { + if _, ok := req.Topics[e.Topic]; ok { + for _, k := range req.Topics[e.Topic] { + if e.Key == k || k == AllKeys { + count++ + } + } + } + } + + // Only allocate a new slice if some events need to be filtered out + switch count { + case 0: + return nil + case len(events): + return events + } + + // Return filtered events + result := make([]Event, 0, count) + for _, e := range events { + if _, ok := req.Topics[e.Topic]; ok { + for _, k := range req.Topics[e.Topic] { + if e.Key == k || k == AllKeys { + result = append(result, e) + } + } + } + } + return result +} diff --git a/nomad/stream/subscription_test.go b/nomad/stream/subscription_test.go new file mode 100644 index 000000000..9d1f55720 --- /dev/null +++ b/nomad/stream/subscription_test.go @@ -0,0 +1,76 @@ +package stream + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSubscription(t *testing.T) { + +} + +func TestFilter_AllKeys(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"*"}, + }, + } + actual := filter(req, events) + require.Equal(t, events, actual) + + // ensure new array was not allocated + require.Equal(t, cap(actual), 5) +} + +func TestFilter_PartialMatch_Topic(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}, Event{Topic: "Exclude", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"*"}, + }, + } + actual := filter(req, events) + expected := []Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}} + require.Equal(t, expected, actual) + + require.Equal(t, cap(actual), 2) +} + +func TestFilter_PartialMatch_Key(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "Test": []string{"One"}, + }, + } + actual := filter(req, events) + expected := []Event{{Topic: "Test", Key: "One"}} + require.Equal(t, expected, actual) + + require.Equal(t, 
cap(actual), 1) +} + +func TestFilter_NoMatch(t *testing.T) { + events := make([]Event, 0, 5) + events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}) + + req := &SubscribeRequest{ + Topics: map[Topic][]string{ + "NodeEvents": []string{"*"}, + "Test": []string{"Highly-Specific-Key"}, + }, + } + actual := filter(req, events) + var expected []Event + require.Equal(t, expected, actual) + + require.Equal(t, cap(actual), 0) +}
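Finally, a hedged sketch that exercises the matching rules these filter tests rely on, combining an exact-key subscription with an AllKeys wildcard in a single request; the topic and key names are arbitrary and the test name is hypothetical.

package stream

import "testing"

// Illustrative only: a subscription sees an event when its topic is requested
// and the key either matches exactly or the topic was subscribed with AllKeys.
func TestFilter_MatchingRulesSketch(t *testing.T) {
	events := []Event{
		{Topic: "Node", Key: "node-1"},
		{Topic: "Node", Key: "node-2"},
		{Topic: "Job", Key: "job-1"},
	}
	req := &SubscribeRequest{Topics: map[Topic][]string{
		"Node": {"node-2"}, // exact key match only
		"Job":  {AllKeys},  // wildcard
	}}

	got := filter(req, events)
	if len(got) != 2 || got[0].Key != "node-2" || got[1].Key != "job-1" {
		t.Fatalf("unexpected filter result: %#v", got)
	}
}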