Files
nomad/helper/pool/pool_test.go
James Rasell 428f329cab rpc: Fix data race in yamux config modification for conn handling. (#25978)
The server RPC handler and RPC connection pool both use a shared
configuration object for custom yamux configuration. Both
sub-systems were modifying the shared object which could cause a
data race. The passed object is now cloned before being modified.

This changes also moves where the yamux configuration is cloned
and modified to the relevant constructor function. This avoids
performing a clone per connection handle or per new connection
generated in the RPC pool.
2025-06-05 08:05:46 +01:00

83 lines
1.8 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package pool
import (
"fmt"
"math"
"net"
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/yamux"
"github.com/shoenig/test/must"
)
func newTestPool(t *testing.T) *ConnPool {
l := testlog.HCLogger(t)
p := NewPool(l, 1*time.Minute, 10, nil, yamux.DefaultConfig())
return p
}
func Test_NewPool(t *testing.T) {
// Generate a custom yamux configuration, so we can ensure this gets stored
// as expected.
yamuxConfig := yamux.DefaultConfig()
yamuxConfig.AcceptBacklog = math.MaxInt
testPool := NewPool(hclog.NewNullLogger(), 10*time.Second, 10, nil, yamuxConfig)
must.NotNil(t, testPool)
must.NotNil(t, testPool.yamuxCfg)
must.Eq(t, yamuxConfig.AcceptBacklog, testPool.yamuxCfg.AcceptBacklog)
}
func TestConnPool_ConnListener(t *testing.T) {
ports := ci.PortAllocator.Grab(1)
addrStr := fmt.Sprintf("127.0.0.1:%d", ports[0])
addr, err := net.ResolveTCPAddr("tcp", addrStr)
must.NoError(t, err)
exitCh := make(chan struct{})
defer close(exitCh)
go func() {
ln, listenErr := net.Listen("tcp", addrStr)
must.NoError(t, listenErr)
defer func() { _ = ln.Close() }()
conn, _ := ln.Accept()
defer func() { _ = conn.Close() }()
<-exitCh
}()
time.Sleep(100 * time.Millisecond)
// Create a test pool
pool := newTestPool(t)
// Setup a listener
c := make(chan *Conn, 1)
pool.SetConnListener(c)
// Make an RPC
_, err = pool.acquire("test", addr)
must.NoError(t, err)
// Assert we get a connection.
select {
case <-c:
case <-time.After(100 * time.Millisecond):
t.Fatalf("timeout")
}
// Test that the channel is closed when the pool shuts down.
err = pool.Shutdown()
must.NoError(t, err)
_, ok := <-c
must.False(t, ok)
}