mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
client: fix send-after-close in broadcaster
This commit is contained in:
@@ -135,16 +135,16 @@ func (b *AllocBroadcaster) Listen() *AllocListener {
|
||||
|
||||
ch := make(chan *structs.Allocation, listenerCap)
|
||||
|
||||
// Broadcaster is already closed, close this listener
|
||||
if b.closed {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// Send last update if there was one
|
||||
if b.last != nil {
|
||||
ch <- b.last
|
||||
}
|
||||
|
||||
// Broadcaster is already closed, close this listener
|
||||
if b.closed {
|
||||
close(ch)
|
||||
}
|
||||
|
||||
b.listeners[b.nextId] = ch
|
||||
|
||||
return &AllocListener{ch, b, b.nextId}
|
||||
|
||||
@@ -184,3 +184,37 @@ func TestAllocBroadcaster_PrimeListener(t *testing.T) {
|
||||
t.Fatalf("expected to recieve initial value")
|
||||
}
|
||||
}
|
||||
|
||||
// TestAllocBroadcaster_Closed asserts that newly created listeners are
|
||||
// primed with the last sent alloc even when the broadcaster is closed.
|
||||
func TestAllocBroadcaster_Closed(t *testing.T) {
|
||||
t.Parallel()
|
||||
|
||||
b := NewAllocBroadcaster(testlog.HCLogger(t))
|
||||
|
||||
alloc := mock.Alloc()
|
||||
|
||||
// Send an update before creating a listener
|
||||
require.NoError(t, b.Send(alloc))
|
||||
|
||||
// Close the broadcaster after sending a single update
|
||||
b.Close()
|
||||
|
||||
// Create a listener and assert it immediately receives an update
|
||||
l := b.Listen()
|
||||
defer l.Close()
|
||||
select {
|
||||
case recv := <-l.Ch():
|
||||
require.Equal(t, alloc, recv)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Fatalf("expected to recieve initial value")
|
||||
}
|
||||
|
||||
// Ch should now be closed.
|
||||
select {
|
||||
case _, ok := <-l.Ch():
|
||||
require.False(t, ok)
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
t.Fatalf("expected Ch() to be closed")
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user