Event Buffer Implemenation

adds an event buffer to hold events from raft changes.

update events to use event buffer

fix append call

provide way to prune buffer items after TTL

event publisher tests

basic publish test

wire up max item ttl

rename package to stream, cleanup exploratory work

subscription filtering

subscription plumbing

allow subscribers to consume events, handle closing subscriptions

back out old exploratory ctx work

fix lint

remove unused ctx bits

add a few comments

fix test

stop publisher on abandon
This commit is contained in:
Drew Bailey
2020-08-31 13:19:28 -04:00
parent 38171b0dd6
commit e7e2c799c8
10 changed files with 1088 additions and 28 deletions

View File

@@ -1,13 +0,0 @@
package event
type Event struct {
Topic string
Key string
Index uint64
Payload interface{}
}
type EventPublisher struct{}
func NewPublisher() *EventPublisher { return &EventPublisher{} }
func (e EventPublisher) Publish(events []Event) {}

View File

@@ -4,7 +4,7 @@ import (
"fmt"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/stream"
)
// ReadTxn is implemented by memdb.Txn to perform read operations.
@@ -29,28 +29,28 @@ type Changes struct {
type changeTrackerDB struct {
db *memdb.MemDB
publisher eventPublisher
processChanges func(ReadTxn, Changes) ([]event.Event, error)
processChanges func(ReadTxn, Changes) ([]stream.Event, error)
}
func NewChangeTrackerDB(db *memdb.MemDB, publisher eventPublisher, changesFn changeProcessor) *changeTrackerDB {
return &changeTrackerDB{
db: db,
publisher: event.NewPublisher(),
publisher: publisher,
processChanges: changesFn,
}
}
type changeProcessor func(ReadTxn, Changes) ([]event.Event, error)
type changeProcessor func(ReadTxn, Changes) ([]stream.Event, error)
type eventPublisher interface {
Publish(events []event.Event)
Publish(index uint64, events []stream.Event)
}
// noOpPublisher satisfies the eventPublisher interface and does nothing
type noOpPublisher struct{}
func (n *noOpPublisher) Publish(events []event.Event) {}
func noOpProcessChanges(ReadTxn, Changes) ([]event.Event, error) { return []event.Event{}, nil }
func (n *noOpPublisher) Publish(index uint64, events []stream.Event) {}
func noOpProcessChanges(ReadTxn, Changes) ([]stream.Event, error) { return []stream.Event{}, nil }
// ReadTxn returns a read-only transaction which behaves exactly the same as
// memdb.Txn
@@ -89,7 +89,7 @@ func (c *changeTrackerDB) publish(changes Changes) error {
if err != nil {
return fmt.Errorf("failed generating events from changes: %v", err)
}
c.publisher.Publish(events)
c.publisher.Publish(changes.Index, events)
return nil
}
@@ -146,7 +146,7 @@ func (tx *txn) Commit() error {
return nil
}
func processDBChanges(tx ReadTxn, changes Changes) ([]event.Event, error) {
func processDBChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
// TODO: add handlers here.
return []event.Event{}, nil
return []stream.Event{}, nil
}

View File

@@ -13,7 +13,7 @@ import (
"github.com/pkg/errors"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/event"
"github.com/hashicorp/nomad/nomad/stream"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -64,6 +64,10 @@ type StateStore struct {
// abandonCh is used to signal watchers that this state store has been
// abandoned (usually during a restore). This is only ever closed.
abandonCh chan struct{}
// TODO: refactor abondonCh to use a context so that both can use the same
// cancel mechanism.
stopEventPublisher func()
}
// NewStateStore is used to create a new state store
@@ -75,12 +79,18 @@ func NewStateStore(config *StateStoreConfig) (*StateStore, error) {
}
// Create the state store
ctx, cancel := context.WithCancel(context.TODO())
s := &StateStore{
logger: config.Logger.Named("state_store"),
config: config,
abandonCh: make(chan struct{}),
logger: config.Logger.Named("state_store"),
config: config,
abandonCh: make(chan struct{}),
stopEventPublisher: cancel,
}
s.db = NewChangeTrackerDB(db, event.NewPublisher(), processDBChanges)
publisher := stream.NewEventPublisher(ctx, stream.EventPublisherCfg{
EventBufferTTL: 1 * time.Hour,
EventBufferSize: 250,
})
s.db = NewChangeTrackerDB(db, publisher, processDBChanges)
// Initialize the state store with required enterprise objects
if err := s.enterpriseInit(); err != nil {
@@ -189,6 +199,7 @@ func (s *StateStore) AbandonCh() <-chan struct{} {
// Abandon is used to signal that the given state store has been abandoned.
// Calling this more than one time will panic.
func (s *StateStore) Abandon() {
s.stopEventPublisher()
close(s.abandonCh)
}

14
nomad/stream/event.go Normal file
View File

@@ -0,0 +1,14 @@
package stream
const (
AllKeys = "*"
)
type Topic string
type Event struct {
Topic Topic
Key string
Index uint64
Payload interface{}
}

View File

@@ -0,0 +1,306 @@
package stream
import (
"context"
"errors"
"fmt"
"sync/atomic"
"time"
)
// eventBuffer is a single-writer, multiple-reader, fixed length concurrent
// buffer of events that have been published. The buffer is
// the head and tail of an atomically updated single-linked list. Atomic
// accesses are usually to be suspected as premature optimization but this
// specific design has several important features that significantly simplify a
// lot of our PubSub machinery.
//
// eventBuffer is an adaptation of conuls agent/stream/event eventBuffer but
// has been updated to be a max length buffer to work for Nomad's usecase.
//
// The eventBuffer only tracks the most recent set of published events,
// up to the max configured size, older events are dropped from the buffer
// but will only be garbage collected once the slowest reader drops the item.
// Consumers are notified of new events by closing a channel on the previous head
// allowing efficient broadcast to many watchers without having to run multiple
// goroutines or deliver to O(N) separate channels.
//
// Because eventBuffer is a linked list with atomically updated pointers, readers don't
// have to take a lock and can consume at their own pace. Slow readers can eventually
// append
//
// A new buffer is constructed with a sentinel "empty" bufferItem that has a nil
// Events array. This enables subscribers to start watching for the next update
// immediately.
//
// The zero value eventBuffer is _not_ usable, as it has not been
// initialized with an empty bufferItem so can not be used to wait for the first
// published event. Call newEventBuffer to construct a new buffer.
//
// Calls to Append or AppendBuffer that mutate the head must be externally
// synchronized. This allows systems that already serialize writes to append
// without lock overhead (e.g. a snapshot goroutine appending thousands of
// events).
type eventBuffer struct {
size *int64
head atomic.Value
tail atomic.Value
maxSize int64
maxItemTTL time.Duration
}
// newEventBuffer creates an eventBuffer ready for use.
func newEventBuffer(size int64, maxItemTTL time.Duration) *eventBuffer {
zero := int64(0)
b := &eventBuffer{
maxSize: size,
size: &zero,
maxItemTTL: maxItemTTL,
}
item := newBufferItem(0, nil)
b.head.Store(item)
b.tail.Store(item)
return b
}
// Append a set of events from one raft operation to the buffer and notify
// watchers. After calling append, the caller must not make any further
// mutations to the events as they may have been exposed to subscribers in other
// goroutines. Append only supports a single concurrent caller and must be
// externally synchronized with other Append, AppendBuffer or AppendErr calls.
func (b *eventBuffer) Append(index uint64, events []Event) {
b.appendItem(newBufferItem(index, events))
}
func (b *eventBuffer) appendItem(item *bufferItem) {
// Store the next item to the old tail
oldTail := b.Tail()
oldTail.link.next.Store(item)
// Update the tail to the new item
b.tail.Store(item)
// Increment the buffer size
size := atomic.AddInt64(b.size, 1)
// Check if we need to advance the head to keep the list
// constrained to max size
if size > b.maxSize {
b.advanceHead()
}
// notify waiters next event is available
close(oldTail.link.ch)
}
// advanceHead drops the current Head buffer item and notifies readers
// that the item should be discarded by closing droppedCh.
// Slow readers will prevent the old head from being GC'd until they
// discard it.
func (b *eventBuffer) advanceHead() {
old := b.Head()
next := old.link.next.Load()
close(old.link.droppedCh)
b.head.Store(next)
atomic.AddInt64(b.size, -1)
}
// Head returns the current head of the buffer. It will always exist but it may
// be a "sentinel" empty item with a nil Events slice to allow consumers to
// watch for the next update. Consumers should always check for empty Events and
// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
// with NewEventBuffer
func (b *eventBuffer) Head() *bufferItem {
return b.head.Load().(*bufferItem)
}
// Tail returns the current tail of the buffer. It will always exist but it may
// be a "sentinel" empty item with a Nil Events slice to allow consumers to
// watch for the next update. Consumers should always check for empty Events and
// treat them as no-ops. Will panic if eventBuffer was not initialized correctly
// with NewEventBuffer
func (b *eventBuffer) Tail() *bufferItem {
return b.tail.Load().(*bufferItem)
}
// StarStartAtClosest returns the closest bufferItem to a requested starting
// index as well as the offset between the requested index and returned one.
func (b *eventBuffer) StartAtClosest(index uint64) (*bufferItem, int) {
item := b.Head()
if index < item.Index {
return item, int(item.Index) - int(index)
}
if item.Index == index {
return item, 0
}
for {
prev := item
item = item.NextNoBlock()
if item == nil {
return prev, int(index) - int(prev.Index)
}
if index < item.Index {
return item, int(item.Index) - int(index)
}
if index == item.Index {
return item, 0
}
}
}
// Len returns the current length of the buffer
func (b *eventBuffer) Len() int {
return int(atomic.LoadInt64(b.size))
}
// prune advances the head of the buffer until the head buffer item TTL
// is no longer expired. It should be externally synchronized as it mutates
// the buffer of items.
func (b *eventBuffer) prune() {
for {
head := b.Head()
if b.Len() == 0 {
return
}
if time.Since(head.createdAt) > b.maxItemTTL {
b.advanceHead()
} else {
return
}
}
}
// bufferItem represents a set of events published by a single raft operation.
// The first item returned by a newly constructed buffer will have nil Events.
// It is a sentinel value which is used to wait on the next events via Next.
//
// To iterate to the next event, a Next method may be called which may block if
// there is no next element yet.
//
// Holding a pointer to the item keeps all the events published since in memory
// so it's important that subscribers don't hold pointers to buffer items after
// they have been delivered except where it's intentional to maintain a cache or
// trailing store of events for performance reasons.
//
// Subscribers must not mutate the bufferItem or the Events or Encoded payloads
// inside as these are shared between all readers.
type bufferItem struct {
// Events is the set of events published at one raft index. This may be nil as
// a sentinel value to allow watching for the first event in a buffer. Callers
// should check and skip nil Events at any point in the buffer. It will also
// be nil if the producer appends an Error event because they can't complete
// the request to populate the buffer. Err will be non-nil in this case.
Events []Event
Index uint64
// Err is non-nil if the producer can't complete their task and terminates the
// buffer. Subscribers should return the error to clients and cease attempting
// to read from the buffer.
Err error
// link holds the next pointer and channel. This extra bit of indirection
// allows us to splice buffers together at arbitrary points without including
// events in one buffer just for the side-effect of watching for the next set.
// The link may not be mutated once the event is appended to a buffer.
link *bufferLink
createdAt time.Time
}
type bufferLink struct {
// next is an atomically updated pointer to the next event in the buffer. It
// is written exactly once by the single published and will always be set if
// ch is closed.
next atomic.Value
// ch is closed when the next event is published. It should never be mutated
// (e.g. set to nil) as that is racey, but is closed once when the next event
// is published. the next pointer will have been set by the time this is
// closed.
ch chan struct{}
// droppedCh is closed when the event is dropped from the buffer due to
// sizing constraints.
droppedCh chan struct{}
}
// newBufferItem returns a blank buffer item with a link and chan ready to have
// the fields set and be appended to a buffer.
func newBufferItem(index uint64, events []Event) *bufferItem {
return &bufferItem{
link: &bufferLink{
ch: make(chan struct{}),
droppedCh: make(chan struct{}),
},
Events: events,
Index: index,
createdAt: time.Now(),
}
}
// Next return the next buffer item in the buffer. It may block until ctx is
// cancelled or until the next item is published.
func (i *bufferItem) Next(ctx context.Context, forceClose <-chan struct{}) (*bufferItem, error) {
// See if there is already a next value, block if so. Note we don't rely on
// state change (chan nil) as that's not threadsafe but detecting close is.
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-forceClose:
return nil, fmt.Errorf("subscription closed")
case <-i.link.ch:
}
// Check if the reader is too slow and the event buffer as discarded the event
select {
case <-i.link.droppedCh:
return nil, fmt.Errorf("event dropped from buffer")
default:
}
// If channel closed, there must be a next item to read
nextRaw := i.link.next.Load()
if nextRaw == nil {
// shouldn't be possible
return nil, errors.New("invalid next item")
}
next := nextRaw.(*bufferItem)
if next.Err != nil {
return nil, next.Err
}
return next, nil
}
// NextNoBlock returns the next item in the buffer without blocking. If it
// reaches the most recent item it will return nil.
func (i *bufferItem) NextNoBlock() *bufferItem {
nextRaw := i.link.next.Load()
if nextRaw == nil {
return nil
}
return nextRaw.(*bufferItem)
}
// NextLink returns either the next item in the buffer if there is one, or
// an empty item (that will be ignored by subscribers) that has a pointer to
// the same link as this bufferItem (but none of the bufferItem content).
// When the link.ch is closed, subscriptions will be notified of the next item.
func (i *bufferItem) NextLink() *bufferItem {
next := i.NextNoBlock()
if next == nil {
// Return an empty item that can be followed to the next item published.
return &bufferItem{link: i.link}
}
return next
}

View File

@@ -0,0 +1,201 @@
package stream
import (
"context"
"fmt"
"math/rand"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEventBufferFuzz(t *testing.T) {
nReaders := 1000
nMessages := 1000
b := newEventBuffer(1000, DefaultTTL)
// Start a write goroutine that will publish 10000 messages with sequential
// indexes and some jitter in timing (to allow clients to "catch up" and block
// waiting for updates).
go func() {
seed := time.Now().UnixNano()
t.Logf("Using seed %d", seed)
// z is a Zipfian distribution that gives us a number of milliseconds to
// sleep which are mostly low - near zero but occasionally spike up to near
// 100.
z := rand.NewZipf(rand.New(rand.NewSource(seed)), 1.5, 1.5, 50)
for i := 0; i < nMessages; i++ {
// Event content is arbitrary and not valid for our use of buffers in
// streaming - here we only care about the semantics of the buffer.
e := Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(uint64(i), []Event{e})
// Sleep sometimes for a while to let some subscribers catch up
wait := time.Duration(z.Uint64()) * time.Millisecond
time.Sleep(wait)
}
}()
// Run n subscribers following and verifying
errCh := make(chan error, nReaders)
// Load head here so all subscribers start from the same point or they might
// not run until several appends have already happened.
head := b.Head()
for i := 0; i < nReaders; i++ {
go func(i int) {
expect := uint64(0)
item := head
var err error
for {
item, err = item.Next(context.Background(), nil)
if err != nil {
errCh <- fmt.Errorf("subscriber %05d failed getting next %d: %s", i,
expect, err)
return
}
if item.Events[0].Index != expect {
errCh <- fmt.Errorf("subscriber %05d got bad event want=%d, got=%d", i,
expect, item.Events[0].Index)
return
}
expect++
if expect == uint64(nMessages) {
// Succeeded
errCh <- nil
return
}
}
}(i)
}
// Wait for all readers to finish one way or other
for i := 0; i < nReaders; i++ {
err := <-errCh
assert.NoError(t, err)
}
}
func TestEventBuffer_Slow_Reader(t *testing.T) {
b := newEventBuffer(10, DefaultTTL)
for i := 0; i < 10; i++ {
e := Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(uint64(i), []Event{e})
}
head := b.Head()
for i := 10; i < 15; i++ {
e := Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(uint64(i), []Event{e})
}
// Ensure the slow reader errors to handle dropped events and
// fetch latest head
ev, err := head.Next(context.Background(), nil)
require.Error(t, err)
require.Nil(t, ev)
newHead := b.Head()
require.Equal(t, 4, int(newHead.Index))
}
func TestEventBuffer_Size(t *testing.T) {
b := newEventBuffer(100, DefaultTTL)
for i := 0; i < 10; i++ {
e := Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(uint64(i), []Event{e})
}
require.Equal(t, 10, b.Len())
}
// TestEventBuffer_Prune_AllOld tests the behavior when all items
// are past their TTL, the event buffer should prune down to the last message
// and hold onto the last item.
func TestEventBuffer_Prune_AllOld(t *testing.T) {
b := newEventBuffer(100, 1*time.Second)
for i := 0; i < 10; i++ {
e := Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(uint64(i), []Event{e})
}
require.Equal(t, 10, int(b.Len()))
time.Sleep(1 * time.Second)
b.prune()
require.Equal(t, 9, int(b.Head().Index))
require.Equal(t, 0, b.Len())
}
func TestStartAt_CurrentIdx_Past_Start(t *testing.T) {
cases := []struct {
desc string
req uint64
expected uint64
offset int
}{
{
desc: "requested index less than head receives head",
req: 10,
expected: 11,
offset: 1,
},
{
desc: "requested exact match head",
req: 11,
expected: 11,
offset: 0,
},
{
desc: "requested exact match",
req: 42,
expected: 42,
offset: 0,
},
{
desc: "requested index greater than tail receives tail",
req: 500,
expected: 100,
offset: 400,
},
}
// buffer starts at index 11 goes to 100
b := newEventBuffer(100, 1*time.Hour)
for i := 11; i <= 100; i++ {
e := Event{
Index: uint64(i), // Indexes should be contiguous
}
b.Append(uint64(i), []Event{e})
}
for _, tc := range cases {
t.Run(tc.desc, func(t *testing.T) {
got, offset := b.StartAtClosest(tc.req)
require.Equal(t, int(tc.expected), int(got.Index))
require.Equal(t, tc.offset, offset)
})
}
}

View File

@@ -0,0 +1,195 @@
package stream
import (
"context"
"sync"
"time"
"github.com/hashicorp/go-hclog"
)
const (
DefaultTTL = 1 * time.Hour
)
type EventPublisherCfg struct {
EventBufferSize int64
EventBufferTTL time.Duration
}
type EventPublisher struct {
// lock protects the eventbuffer
lock sync.Mutex
// eventBuf stores a configurable amount of events in memory
eventBuf *eventBuffer
// pruneTick is the duration to periodically prune events from the event
// buffer. Defaults to 5s
pruneTick time.Duration
logger hclog.Logger
subscriptions *subscriptions
// publishCh is used to send messages from an active txn to a goroutine which
// publishes events, so that publishing can happen asynchronously from
// the Commit call in the FSM hot path.
publishCh chan changeEvents
}
type subscriptions struct {
// lock for byToken. If both subscription.lock and EventPublisher.lock need
// to be held, EventPublisher.lock MUST always be acquired first.
lock sync.RWMutex
// byToken is an mapping of active Subscriptions indexed by a token and
// a pointer to the request.
// When the token is modified all subscriptions under that token will be
// reloaded.
// A subscription may be unsubscribed by using the pointer to the request.
byToken map[string]map[*SubscribeRequest]*Subscription
}
func NewEventPublisher(ctx context.Context, cfg EventPublisherCfg) *EventPublisher {
if cfg.EventBufferTTL == 0 {
cfg.EventBufferTTL = 1 * time.Hour
}
buffer := newEventBuffer(cfg.EventBufferSize, cfg.EventBufferTTL)
e := &EventPublisher{
eventBuf: buffer,
publishCh: make(chan changeEvents),
subscriptions: &subscriptions{
byToken: make(map[string]map[*SubscribeRequest]*Subscription),
},
}
go e.handleUpdates(ctx)
go e.periodicPrune(ctx)
return e
}
// Publish events to all subscribers of the event Topic.
func (e *EventPublisher) Publish(index uint64, events []Event) {
if len(events) > 0 {
e.publishCh <- changeEvents{index: index, events: events}
}
}
// Subscribe returns a new Subscription for a given request.
func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error) {
e.lock.Lock()
defer e.lock.Unlock()
var head *bufferItem
var offset int
if req.Index != 0 {
head, offset = e.eventBuf.StartAtClosest(req.Index)
} else {
head = e.eventBuf.Head()
}
if offset > 0 {
e.logger.Warn("requested index no longer in buffer", "requsted", int(req.Index), "closest", int(head.Index))
}
sub := newSubscription(req, head, func() {})
e.subscriptions.add(req, sub)
return sub, nil
}
func (e *EventPublisher) handleUpdates(ctx context.Context) {
for {
select {
case <-ctx.Done():
e.subscriptions.closeAll()
return
case update := <-e.publishCh:
e.sendEvents(update)
}
}
}
func (e *EventPublisher) periodicPrune(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-time.After(e.pruneTick):
e.lock.Lock()
e.eventBuf.prune()
e.lock.Unlock()
}
}
}
type changeEvents struct {
index uint64
events []Event
}
// sendEvents sends the given events to the publishers event buffer.
func (e *EventPublisher) sendEvents(update changeEvents) {
e.lock.Lock()
defer e.lock.Unlock()
e.eventBuf.Append(update.index, update.events)
}
func (s *subscriptions) add(req *SubscribeRequest, sub *Subscription) {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
subsByToken = make(map[*SubscribeRequest]*Subscription)
s.byToken[req.Token] = subsByToken
}
subsByToken[req] = sub
}
func (s *subscriptions) closeSubscriptionsForTokens(tokenSecretIDs []string) {
s.lock.RLock()
defer s.lock.RUnlock()
for _, secretID := range tokenSecretIDs {
if subs, ok := s.byToken[secretID]; ok {
for _, sub := range subs {
sub.forceClose()
}
}
}
}
// unsubscribe returns a function that the subscription will call to remove
// itself from the subsByToken.
// This function is returned as a closure so that the caller doesn't need to keep
// track of the SubscriptionRequest, and can not accidentally call unsubscribe with the
// wrong pointer.
func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
return func() {
s.lock.Lock()
defer s.lock.Unlock()
subsByToken, ok := s.byToken[req.Token]
if !ok {
return
}
delete(subsByToken, req)
if len(subsByToken) == 0 {
delete(s.byToken, req.Token)
}
}
}
func (s *subscriptions) closeAll() {
s.lock.Lock()
defer s.lock.Unlock()
for _, byRequest := range s.byToken {
for _, sub := range byRequest {
sub.forceClose()
}
}
}

View File

@@ -0,0 +1,134 @@
package stream
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestEventPublisher_PublishChangesAndSubscribe(t *testing.T) {
subscription := &SubscribeRequest{
Topics: map[Topic][]string{
"Test": []string{"sub-key"},
},
}
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
publisher := NewEventPublisher(ctx, EventPublisherCfg{EventBufferSize: 100, EventBufferTTL: DefaultTTL})
sub, err := publisher.Subscribe(subscription)
require.NoError(t, err)
eventCh := consumeSubscription(ctx, sub)
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
events := []Event{{
Index: 1,
Topic: "Test",
Key: "sub-key",
Payload: "sample payload",
}}
publisher.Publish(1, events)
// Subscriber should see the published event
result := nextResult(t, eventCh)
require.NoError(t, result.Err)
expected := []Event{{Payload: "sample payload", Key: "sub-key", Topic: "Test", Index: 1}}
require.Equal(t, expected, result.Events)
// Now subscriber should block waiting for updates
assertNoResult(t, eventCh)
// Publish a second event
events = []Event{{
Index: 2,
Topic: "Test",
Key: "sub-key",
Payload: "sample payload 2",
}}
publisher.Publish(2, events)
result = nextResult(t, eventCh)
require.NoError(t, result.Err)
expected = []Event{{Payload: "sample payload 2", Key: "sub-key", Topic: "Test", Index: 2}}
require.Equal(t, expected, result.Events)
}
func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
publisher := NewEventPublisher(ctx, EventPublisherCfg{})
sub1, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
defer sub1.Unsubscribe()
sub2, err := publisher.Subscribe(&SubscribeRequest{})
require.NoError(t, err)
defer sub2.Unsubscribe()
cancel() // Shutdown
err = consumeSub(context.Background(), sub1)
require.Equal(t, err, ErrSubscriptionClosed)
_, err = sub2.Next(context.Background())
require.Equal(t, err, ErrSubscriptionClosed)
}
func consumeSubscription(ctx context.Context, sub *Subscription) <-chan subNextResult {
eventCh := make(chan subNextResult, 1)
go func() {
for {
es, err := sub.Next(ctx)
eventCh <- subNextResult{
Events: es,
Err: err,
}
if err != nil {
return
}
}
}()
return eventCh
}
type subNextResult struct {
Events []Event
Err error
}
func nextResult(t *testing.T, eventCh <-chan subNextResult) subNextResult {
t.Helper()
select {
case next := <-eventCh:
return next
case <-time.After(100 * time.Millisecond):
t.Fatalf("no event after 100ms")
}
return subNextResult{}
}
func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
t.Helper()
select {
case next := <-eventCh:
require.NoError(t, next.Err)
require.Len(t, next.Events, 1)
t.Fatalf("received unexpected event: %#v", next.Events[0].Payload)
case <-time.After(100 * time.Millisecond):
}
}
func consumeSub(ctx context.Context, sub *Subscription) error {
for {
_, err := sub.Next(ctx)
if err != nil {
return err
}
}
}

View File

@@ -0,0 +1,136 @@
package stream
import (
"context"
"errors"
"sync/atomic"
)
const (
// subscriptionStateOpen is the default state of a subscription. An open
// subscription may receive new events.
subscriptionStateOpen uint32 = 0
// subscriptionStateClosed indicates that the subscription was closed, possibly
// as a result of a change to an ACL token, and will not receive new events.
// The subscriber must issue a new Subscribe request.
subscriptionStateClosed uint32 = 1
)
// ErrSubscriptionClosed is a error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubscriptionClosed = errors.New("subscription closed by server, client should resubscribe")
// type Subscriber struct {
// logger hclog.Logger
// }
type Subscription struct {
// state is accessed atomically 0 means open, 1 means closed with reload
state uint32
req *SubscribeRequest
// currentItem stores the current buffer item we are on. It
// is mutated by calls to Next.
currentItem *bufferItem
// forceClosed is closed when forceClose is called. It is used by
// EventPublisher to cancel Next().
forceClosed chan struct{}
// unsub is a function set by EventPublisher that is called to free resources
// when the subscription is no longer needed.
// It must be safe to call the function from multiple goroutines and the function
// must be idempotent.
unsub func()
}
type SubscribeRequest struct {
Token string
Index uint64
Topics map[Topic][]string
}
func newSubscription(req *SubscribeRequest, item *bufferItem, unsub func()) *Subscription {
return &Subscription{
forceClosed: make(chan struct{}),
req: req,
currentItem: item,
unsub: unsub,
}
}
func (s *Subscription) Next(ctx context.Context) ([]Event, error) {
if atomic.LoadUint32(&s.state) == subscriptionStateClosed {
return nil, ErrSubscriptionClosed
}
for {
next, err := s.currentItem.Next(ctx, s.forceClosed)
switch {
case err != nil && atomic.LoadUint32(&s.state) == subscriptionStateClosed:
return nil, ErrSubscriptionClosed
case err != nil:
return nil, err
}
s.currentItem = next
events := filter(s.req, next.Events)
if len(events) == 0 {
continue
}
return events, nil
}
}
func (s *Subscription) forceClose() {
swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed)
if swapped {
close(s.forceClosed)
}
}
func (s *Subscription) Unsubscribe() {
s.unsub()
}
// filter events to only those that match a subscriptions topic/keys
func filter(req *SubscribeRequest, events []Event) []Event {
if len(events) == 0 {
return events
}
var count int
for _, e := range events {
if _, ok := req.Topics[e.Topic]; ok {
for _, k := range req.Topics[e.Topic] {
if e.Key == k || k == AllKeys {
count++
}
}
}
}
// Only allocate a new slice if some events need to be filtered out
switch count {
case 0:
return nil
case len(events):
return events
}
// Return filtered events
result := make([]Event, 0, count)
for _, e := range events {
if _, ok := req.Topics[e.Topic]; ok {
for _, k := range req.Topics[e.Topic] {
if e.Key == k || k == AllKeys {
result = append(result, e)
}
}
}
}
return result
}

View File

@@ -0,0 +1,76 @@
package stream
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestSubscription(t *testing.T) {
}
func TestFilter_AllKeys(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"})
req := &SubscribeRequest{
Topics: map[Topic][]string{
"Test": []string{"*"},
},
}
actual := filter(req, events)
require.Equal(t, events, actual)
// ensure new array was not allocated
require.Equal(t, cap(actual), 5)
}
func TestFilter_PartialMatch_Topic(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"}, Event{Topic: "Exclude", Key: "Two"})
req := &SubscribeRequest{
Topics: map[Topic][]string{
"Test": []string{"*"},
},
}
actual := filter(req, events)
expected := []Event{{Topic: "Test", Key: "One"}, {Topic: "Test", Key: "Two"}}
require.Equal(t, expected, actual)
require.Equal(t, cap(actual), 2)
}
func TestFilter_PartialMatch_Key(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"})
req := &SubscribeRequest{
Topics: map[Topic][]string{
"Test": []string{"One"},
},
}
actual := filter(req, events)
expected := []Event{{Topic: "Test", Key: "One"}}
require.Equal(t, expected, actual)
require.Equal(t, cap(actual), 1)
}
func TestFilter_NoMatch(t *testing.T) {
events := make([]Event, 0, 5)
events = append(events, Event{Topic: "Test", Key: "One"}, Event{Topic: "Test", Key: "Two"})
req := &SubscribeRequest{
Topics: map[Topic][]string{
"NodeEvents": []string{"*"},
"Test": []string{"Highly-Specific-Key"},
},
}
actual := filter(req, events)
var expected []Event
require.Equal(t, expected, actual)
require.Equal(t, cap(actual), 0)
}