diff --git a/client/structs/broadcaster.go b/client/structs/broadcaster.go index f17d43953..07770ef04 100644 --- a/client/structs/broadcaster.go +++ b/client/structs/broadcaster.go @@ -135,16 +135,16 @@ func (b *AllocBroadcaster) Listen() *AllocListener { ch := make(chan *structs.Allocation, listenerCap) - // Broadcaster is already closed, close this listener - if b.closed { - close(ch) - } - // Send last update if there was one if b.last != nil { ch <- b.last } + // Broadcaster is already closed, close this listener + if b.closed { + close(ch) + } + b.listeners[b.nextId] = ch return &AllocListener{ch, b, b.nextId} diff --git a/client/structs/broadcaster_test.go b/client/structs/broadcaster_test.go index dac1b4ae0..37009758e 100644 --- a/client/structs/broadcaster_test.go +++ b/client/structs/broadcaster_test.go @@ -184,3 +184,37 @@ func TestAllocBroadcaster_PrimeListener(t *testing.T) { t.Fatalf("expected to recieve initial value") } } + +// TestAllocBroadcaster_Closed asserts that newly created listeners are +// primed with the last sent alloc even when the broadcaster is closed. +func TestAllocBroadcaster_Closed(t *testing.T) { + t.Parallel() + + b := NewAllocBroadcaster(testlog.HCLogger(t)) + + alloc := mock.Alloc() + + // Send an update before creating a listener + require.NoError(t, b.Send(alloc)) + + // Close the broadcaster after sending a single update + b.Close() + + // Create a listener and assert it immediately receives an update + l := b.Listen() + defer l.Close() + select { + case recv := <-l.Ch(): + require.Equal(t, alloc, recv) + case <-time.After(10 * time.Millisecond): + t.Fatalf("expected to recieve initial value") + } + + // Ch should now be closed. + select { + case _, ok := <-l.Ch(): + require.False(t, ok) + case <-time.After(10 * time.Millisecond): + t.Fatalf("expected Ch() to be closed") + } +}