make AllocBroadcaster easier to use

And test thoroughly.
This commit is contained in:
Michael Schurter
2018-08-30 16:59:16 -07:00
parent 9da25adc54
commit 12171cdc28
2 changed files with 206 additions and 17 deletions

View File

@@ -37,17 +37,9 @@ func NewAllocBroadcaster(initial *structs.Allocation) *AllocBroadcaster {
}
}
// AllocListener implements a listening endpoint for an allocation broadcast
// channel.
type AllocListener struct {
// Ch receives the broadcast messages.
Ch <-chan *structs.Allocation
b *AllocBroadcaster
id int
}
// Send broadcasts an allocation update. Any pending updates are replaced with
// this version of the allocation to prevent blocking on slow receivers.
// Returns ErrAllocBroadcasterClosed if called after broadcaster is closed.
func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
b.m.Lock()
defer b.m.Unlock()
@@ -72,7 +64,7 @@ func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
}
// Close closes the channel, disabling the sending of further allocation
// updates.
// updates. Safe to call concurrently and more than once.
func (b *AllocBroadcaster) Close() {
b.m.Lock()
defer b.m.Unlock()
@@ -80,11 +72,36 @@ func (b *AllocBroadcaster) Close() {
return
}
b.alloc = nil
b.closed = true
// Close all listener chans
for _, l := range b.listeners {
close(l)
}
// Clear all references and mark broadcaster as closed
b.listeners = nil
b.alloc = nil
b.closed = true
}
// stop an individual listener
func (b *AllocBroadcaster) stop(id int) {
b.m.Lock()
defer b.m.Unlock()
// If broadcaster has been closed there's nothing more to do.
if b.closed {
return
}
l, ok := b.listeners[id]
if !ok {
// If this listener has been stopped already there's nothing
// more to do.
return
}
close(l)
delete(b.listeners, id)
}
// Listen returns a Listener for the broadcast channel.
@@ -114,9 +131,17 @@ func (b *AllocBroadcaster) Listen() *AllocListener {
return &AllocListener{ch, b, b.nextId}
}
// Close closes the Listener, disabling the receival of further messages.
func (l *AllocListener) Close() {
l.b.m.Lock()
defer l.b.m.Unlock()
delete(l.b.listeners, l.id)
// AllocListener implements a listening endpoint for an allocation broadcast
// channel.
type AllocListener struct {
// Ch receives the broadcast messages.
Ch <-chan *structs.Allocation
b *AllocBroadcaster
id int
}
// Close closes the Listener, disabling the receival of further messages. Safe
// to call more than once and concurrently with receiving on Ch.
func (l *AllocListener) Close() {
l.b.stop(l.id)
}

View File

@@ -0,0 +1,164 @@
package structs
import (
"fmt"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/stretchr/testify/require"
)
// TestAllocBroadcaster_SendRecv asserts the initial and latest sends to a
// broadcaster are received by listeners.
func TestAllocBroadcaster_SendRecv(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
alloc.AllocModifyIndex = 10
b := NewAllocBroadcaster(alloc.Copy())
defer b.Close()
// Create a listener and get the initial alloc
l := b.Listen()
defer l.Close()
initial := <-l.Ch
require.Equal(t, alloc.AllocModifyIndex, initial.AllocModifyIndex)
// Increment the index and send a new copy
alloc.AllocModifyIndex = 20
require.NoError(t, b.Send(alloc.Copy()))
recvd := <-l.Ch
require.Equal(t, alloc.AllocModifyIndex, recvd.AllocModifyIndex)
// Send two now copies and assert only the last was received
alloc.AllocModifyIndex = 30
require.NoError(t, b.Send(alloc.Copy()))
alloc.AllocModifyIndex = 40
require.NoError(t, b.Send(alloc.Copy()))
recvd = <-l.Ch
require.Equal(t, alloc.AllocModifyIndex, recvd.AllocModifyIndex)
}
// TestAllocBroadcaster_RecvBlocks asserts listeners are blocked until a send occurs.
func TestAllocBroadcaster_RecvBlocks(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
b := NewAllocBroadcaster(alloc.Copy())
defer b.Close()
l1 := b.Listen()
defer l1.Close()
l2 := b.Listen()
defer l2.Close()
// Every listener should get the initial alloc even when there hasn't
// been a Send()
require.NotNil(t, <-l1.Ch)
require.NotNil(t, <-l2.Ch)
done := make(chan int, 2)
// Subsequent listens should block until a subsequent send
go func() {
<-l1.Ch
done <- 1
}()
go func() {
<-l2.Ch
done <- 1
}()
select {
case <-done:
t.Fatalf("unexpected receive by a listener")
case <-time.After(10 * time.Millisecond):
}
// Do a Send and expect both listeners to receive it
b.Send(alloc)
<-done
<-done
}
// TestAllocBroadcaster_Concurrency asserts that the broadcaster behaves
// correctly with concurrent listeners being added and closed.
func TestAllocBroadcaster_Concurrency(t *testing.T) {
t.Parallel()
alloc := mock.Alloc()
b := NewAllocBroadcaster(alloc.Copy())
defer b.Close()
errs := make(chan error, 10)
listeners := make([]*AllocListener, 10)
for i := 0; i < len(listeners); i++ {
l := b.Listen()
defer l.Close()
listeners[i] = l
go func(index uint64, listener *AllocListener) {
defer listener.Close()
for {
a, ok := <-listener.Ch
if !ok {
return
}
if a.AllocModifyIndex < index {
errs <- fmt.Errorf("index=%d < %d", a.AllocModifyIndex, index)
return
}
index = a.AllocModifyIndex
}
}(alloc.AllocModifyIndex, l)
}
for i := 0; i < 100; i++ {
alloc.AllocModifyIndex++
require.NoError(t, b.Send(alloc.Copy()))
}
if len(errs) > 0 {
t.Fatalf("%d listener errors. First error:\n%v", len(errs), <-errs)
}
// Closing a couple shouldn't cause errors
listeners[0].Close()
listeners[1].Close()
for i := 0; i < 100; i++ {
alloc.AllocModifyIndex++
require.NoError(t, b.Send(alloc.Copy()))
}
if len(errs) > 0 {
t.Fatalf("%d listener errors. First error:\n%v", len(errs), <-errs)
}
// Closing the broadcaster *should* error
b.Close()
require.Equal(t, ErrAllocBroadcasterClosed, b.Send(alloc))
// All Listeners should be closed
for _, l := range listeners {
select {
case _, ok := <-l.Ch:
if ok {
// This check can beat the goroutine above to
// recv'ing the final update. Listener must be
// closed on next recv.
if _, ok := <-l.Ch; ok {
t.Fatalf("expected listener to be closed")
}
}
default:
t.Fatalf("expected listener to be closed; not blocking")
}
}
}