diff --git a/client/structs/funcs.go b/client/structs/broadcaster.go similarity index 80% rename from client/structs/funcs.go rename to client/structs/broadcaster.go index 021e95d2d..88b51ebf8 100644 --- a/client/structs/funcs.go +++ b/client/structs/broadcaster.go @@ -37,17 +37,9 @@ func NewAllocBroadcaster(initial *structs.Allocation) *AllocBroadcaster { } } -// AllocListener implements a listening endpoint for an allocation broadcast -// channel. -type AllocListener struct { - // Ch receives the broadcast messages. - Ch <-chan *structs.Allocation - b *AllocBroadcaster - id int -} - // Send broadcasts an allocation update. Any pending updates are replaced with // this version of the allocation to prevent blocking on slow receivers. +// Returns ErrAllocBroadcasterClosed if called after broadcaster is closed. func (b *AllocBroadcaster) Send(v *structs.Allocation) error { b.m.Lock() defer b.m.Unlock() @@ -72,7 +64,7 @@ func (b *AllocBroadcaster) Send(v *structs.Allocation) error { } // Close closes the channel, disabling the sending of further allocation -// updates. +// updates. Safe to call concurrently and more than once. func (b *AllocBroadcaster) Close() { b.m.Lock() defer b.m.Unlock() @@ -80,11 +72,36 @@ func (b *AllocBroadcaster) Close() { return } - b.alloc = nil - b.closed = true + // Close all listener chans for _, l := range b.listeners { close(l) } + + // Clear all references and mark broadcaster as closed + b.listeners = nil + b.alloc = nil + b.closed = true +} + +// stop an individual listener +func (b *AllocBroadcaster) stop(id int) { + b.m.Lock() + defer b.m.Unlock() + + // If broadcaster has been closed there's nothing more to do. + if b.closed { + return + } + + l, ok := b.listeners[id] + if !ok { + // If this listener has been stopped already there's nothing + // more to do. + return + } + + close(l) + delete(b.listeners, id) } // Listen returns a Listener for the broadcast channel. @@ -114,9 +131,17 @@ func (b *AllocBroadcaster) Listen() *AllocListener { return &AllocListener{ch, b, b.nextId} } -// Close closes the Listener, disabling the receival of further messages. -func (l *AllocListener) Close() { - l.b.m.Lock() - defer l.b.m.Unlock() - delete(l.b.listeners, l.id) +// AllocListener implements a listening endpoint for an allocation broadcast +// channel. +type AllocListener struct { + // Ch receives the broadcast messages. + Ch <-chan *structs.Allocation + b *AllocBroadcaster + id int +} + +// Close closes the Listener, disabling the receival of further messages. Safe +// to call more than once and concurrently with receiving on Ch. +func (l *AllocListener) Close() { + l.b.stop(l.id) } diff --git a/client/structs/broadcaster_test.go b/client/structs/broadcaster_test.go new file mode 100644 index 000000000..19c7ace2d --- /dev/null +++ b/client/structs/broadcaster_test.go @@ -0,0 +1,164 @@ +package structs + +import ( + "fmt" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/stretchr/testify/require" +) + +// TestAllocBroadcaster_SendRecv asserts the initial and latest sends to a +// broadcaster are received by listeners. +func TestAllocBroadcaster_SendRecv(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + alloc.AllocModifyIndex = 10 + + b := NewAllocBroadcaster(alloc.Copy()) + defer b.Close() + + // Create a listener and get the initial alloc + l := b.Listen() + defer l.Close() + initial := <-l.Ch + require.Equal(t, alloc.AllocModifyIndex, initial.AllocModifyIndex) + + // Increment the index and send a new copy + alloc.AllocModifyIndex = 20 + require.NoError(t, b.Send(alloc.Copy())) + recvd := <-l.Ch + require.Equal(t, alloc.AllocModifyIndex, recvd.AllocModifyIndex) + + // Send two now copies and assert only the last was received + alloc.AllocModifyIndex = 30 + require.NoError(t, b.Send(alloc.Copy())) + alloc.AllocModifyIndex = 40 + require.NoError(t, b.Send(alloc.Copy())) + + recvd = <-l.Ch + require.Equal(t, alloc.AllocModifyIndex, recvd.AllocModifyIndex) +} + +// TestAllocBroadcaster_RecvBlocks asserts listeners are blocked until a send occurs. +func TestAllocBroadcaster_RecvBlocks(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + b := NewAllocBroadcaster(alloc.Copy()) + defer b.Close() + + l1 := b.Listen() + defer l1.Close() + + l2 := b.Listen() + defer l2.Close() + + // Every listener should get the initial alloc even when there hasn't + // been a Send() + require.NotNil(t, <-l1.Ch) + require.NotNil(t, <-l2.Ch) + + done := make(chan int, 2) + + // Subsequent listens should block until a subsequent send + go func() { + <-l1.Ch + done <- 1 + }() + + go func() { + <-l2.Ch + done <- 1 + }() + + select { + case <-done: + t.Fatalf("unexpected receive by a listener") + case <-time.After(10 * time.Millisecond): + } + + // Do a Send and expect both listeners to receive it + b.Send(alloc) + <-done + <-done +} + +// TestAllocBroadcaster_Concurrency asserts that the broadcaster behaves +// correctly with concurrent listeners being added and closed. +func TestAllocBroadcaster_Concurrency(t *testing.T) { + t.Parallel() + + alloc := mock.Alloc() + b := NewAllocBroadcaster(alloc.Copy()) + defer b.Close() + + errs := make(chan error, 10) + listeners := make([]*AllocListener, 10) + for i := 0; i < len(listeners); i++ { + l := b.Listen() + defer l.Close() + + listeners[i] = l + go func(index uint64, listener *AllocListener) { + defer listener.Close() + for { + a, ok := <-listener.Ch + if !ok { + return + } + + if a.AllocModifyIndex < index { + errs <- fmt.Errorf("index=%d < %d", a.AllocModifyIndex, index) + return + } + index = a.AllocModifyIndex + } + }(alloc.AllocModifyIndex, l) + } + + for i := 0; i < 100; i++ { + alloc.AllocModifyIndex++ + require.NoError(t, b.Send(alloc.Copy())) + } + + if len(errs) > 0 { + t.Fatalf("%d listener errors. First error:\n%v", len(errs), <-errs) + } + + // Closing a couple shouldn't cause errors + listeners[0].Close() + listeners[1].Close() + + for i := 0; i < 100; i++ { + alloc.AllocModifyIndex++ + require.NoError(t, b.Send(alloc.Copy())) + } + + if len(errs) > 0 { + t.Fatalf("%d listener errors. First error:\n%v", len(errs), <-errs) + } + + // Closing the broadcaster *should* error + b.Close() + require.Equal(t, ErrAllocBroadcasterClosed, b.Send(alloc)) + + // All Listeners should be closed + for _, l := range listeners { + select { + case _, ok := <-l.Ch: + if ok { + // This check can beat the goroutine above to + // recv'ing the final update. Listener must be + // closed on next recv. + if _, ok := <-l.Ch; ok { + t.Fatalf("expected listener to be closed") + } + } + default: + t.Fatalf("expected listener to be closed; not blocking") + } + } +}