pool: track usage of incoming streams (#10710)

Track usage of incoming streams on a connection. Connections whose
reference count drops to zero are marked as unused and reaped by a periodic job.

This fixes a bug where `alloc exec` and `alloc fs` sessions were terminated
unexpectedly. Previously, when a client's heartbeats switched between
servers, the pool connection reaper would eventually identify the connection
as unused and close it, even if it still had active exec/fs sessions.

Fixes #10579
Mahmood Ali
2021-06-07 10:22:37 -04:00
committed by GitHub
parent bdf2555b38
commit 3f7a5c1474
3 changed files with 57 additions and 26 deletions
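
In outline, the fix ties the pool's periodic reaper to a per-connection reference count: every in-flight RPC and every accepted incoming stream holds a reference, and only idle connections with no references are eligible for reaping. A minimal sketch of that pattern, simplified from the diff below (the stand-in `conn` struct and the `reapable` helper are illustrative, not the actual Nomad code):

```go
package poolsketch

import (
	"sync/atomic"
	"time"
)

// conn is a pared-down stand-in for pool.Conn, keeping only the fields
// that matter for the reaping decision.
type conn struct {
	refCount    int32
	shouldClose int32
	lastUsed    time.Time
}

// markForUse records activity and takes a reference so the reaper will
// not close the connection while a stream or RPC is in flight.
func (c *conn) markForUse() {
	c.lastUsed = time.Now()
	atomic.AddInt32(&c.refCount, 1)
}

// releaseUse drops the reference; the last holder closes the connection
// if it has already been flagged for closing.
func (c *conn) releaseUse() {
	if atomic.AddInt32(&c.refCount, -1) == 0 && atomic.LoadInt32(&c.shouldClose) == 1 {
		c.close()
	}
}

func (c *conn) close() { /* would close the underlying yamux session */ }

// reapable is the check an illustrative periodic reaper would make:
// only idle connections with no outstanding references may be closed.
func reapable(c *conn, maxIdle time.Duration) bool {
	return atomic.LoadInt32(&c.refCount) == 0 && time.Since(c.lastUsed) > maxIdle
}
```

An `alloc exec` or `alloc fs` session holds such a reference for the lifetime of its stream, so the connection stays ineligible for reaping until the stream closes, even after the client's RPC traffic has moved to another server.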


@@ -15,7 +15,6 @@ import (
inmem "github.com/hashicorp/nomad/helper/codec"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
)
// rpcEndpoints holds the RPC endpoints
@@ -282,30 +281,30 @@ func (c *Client) setupClientRpcServer(server *rpc.Server) {
// connection.
func (c *Client) rpcConnListener() {
// Make a channel for new connections.
conns := make(chan *yamux.Session, 4)
conns := make(chan *pool.Conn, 4)
c.connPool.SetConnListener(conns)
for {
select {
case <-c.shutdownCh:
return
case session, ok := <-conns:
case conn, ok := <-conns:
if !ok {
continue
}
go c.listenConn(session)
go c.listenConn(conn)
}
}
}
// listenConn is used to listen for connections being made from the server on
// pre-existing connection. This should be called in a goroutine.
func (c *Client) listenConn(s *yamux.Session) {
func (c *Client) listenConn(conn *pool.Conn) {
for {
conn, err := s.Accept()
stream, err := conn.AcceptStream()
if err != nil {
if s.IsClosed() {
if conn.IsClosed() {
return
}
@@ -313,7 +312,7 @@ func (c *Client) listenConn(s *yamux.Session) {
continue
}
go c.handleConn(conn)
go c.handleConn(stream)
metrics.IncrCounter([]string{"client", "rpc", "accept_conn"}, 1)
}
}


@@ -56,12 +56,21 @@ type Conn struct {
clientLock sync.Mutex
}
// markForUse does all the bookkeeping required to ready a connection for use.
// markForUse does all the bookkeeping required to ready a connection for use,
// and ensure that active connections don't get reaped.
func (c *Conn) markForUse() {
c.lastUsed = time.Now()
atomic.AddInt32(&c.refCount, 1)
}
// releaseUse is the complement of `markForUse`, to free up the reference count
func (c *Conn) releaseUse() {
refCount := atomic.AddInt32(&c.refCount, -1)
if refCount == 0 && atomic.LoadInt32(&c.shouldClose) == 1 {
c.Close()
}
}
func (c *Conn) Close() error {
return c.session.Close()
}
@@ -122,6 +131,40 @@ func (c *Conn) returnClient(client *StreamClient) {
}
}
func (c *Conn) IsClosed() bool {
return c.session.IsClosed()
}
func (c *Conn) AcceptStream() (net.Conn, error) {
s, err := c.session.AcceptStream()
if err != nil {
return nil, err
}
c.markForUse()
return &incomingStream{
Stream: s,
parent: c,
}, nil
}
// incomingStream wraps yamux.Stream but frees the underlying yamux.Session
// when closed
type incomingStream struct {
*yamux.Stream
parent *Conn
}
func (s *incomingStream) Close() error {
err := s.Stream.Close()
// always release parent even if error
s.parent.releaseUse()
return err
}
// ConnPool is used to maintain a connection pool to other
// Nomad servers. This is used to reduce the latency of
// RPC requests between servers. It is only used to pool
@@ -157,7 +200,7 @@ type ConnPool struct {
// connListener is used to notify a potential listener of a new connection
// being made.
connListener chan<- *yamux.Session
connListener chan<- *Conn
}
// NewPool is used to make a new connection pool
@@ -220,7 +263,7 @@ func (p *ConnPool) ReloadTLS(tlsWrap tlsutil.RegionWrapper) {
// SetConnListener is used to listen to new connections being made. The
// channel will be closed when the conn pool is closed or a new listener is set.
func (p *ConnPool) SetConnListener(l chan<- *yamux.Session) {
func (p *ConnPool) SetConnListener(l chan<- *Conn) {
p.Lock()
defer p.Unlock()
@@ -276,7 +319,7 @@ func (p *ConnPool) acquire(region string, addr net.Addr, version int) (*Conn, er
// If there is a connection listener, notify them of the new connection.
if p.connListener != nil {
select {
case p.connListener <- c.session:
case p.connListener <- c:
default:
}
}
@@ -386,14 +429,6 @@ func (p *ConnPool) clearConn(conn *Conn) {
}
}
// releaseConn is invoked when we are done with a conn to reduce the ref count
func (p *ConnPool) releaseConn(conn *Conn) {
refCount := atomic.AddInt32(&conn.refCount, -1)
if refCount == 0 && atomic.LoadInt32(&conn.shouldClose) == 1 {
conn.Close()
}
}
// getClient is used to get a usable client for an address and protocol version
func (p *ConnPool) getRPCClient(region string, addr net.Addr, version int) (*Conn, *StreamClient, error) {
retries := 0
@@ -408,7 +443,7 @@ START:
client, err := conn.getRPCClient()
if err != nil {
p.clearConn(conn)
p.releaseConn(conn)
conn.releaseUse()
// Try to redial, possible that the TCP session closed due to timeout
if retries == 0 {
@@ -448,6 +483,7 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
if err != nil {
return fmt.Errorf("rpc error: %w", err)
}
defer conn.releaseUse()
// Make the RPC call
err = msgpackrpc.CallWithCodec(sc.codec, method, args, reply)
@@ -461,8 +497,6 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
p.clearConn(conn)
}
p.releaseConn(conn)
// If the error is an RPC Coded error
// return the coded error without wrapping
if structs.IsErrRPCCoded(err) {
@@ -475,7 +509,6 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
// Done with the connection
conn.returnClient(sc)
p.releaseConn(conn)
return nil
}
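
Taken together, the pool.go changes move release from the pool (`releaseConn`) onto the connection itself (`releaseUse`), so both kinds of reference holders, outgoing RPCs and accepted incoming streams, can pair acquire and release without a pool handle. A condensed consumer of the new `*pool.Conn` listener channel, mirroring the client code in the first file of this diff (the `serveIncoming` helper and its `handle` callback are illustrative, not part of the commit):

```go
package rpcsketch

import (
	"net"

	"github.com/hashicorp/nomad/helper/pool"
)

// serveIncoming drains the channel registered via SetConnListener. Every
// stream accepted from a connection holds a reference on it until the
// stream is closed, which is what keeps exec/fs sessions from being reaped.
func serveIncoming(conns <-chan *pool.Conn, handle func(net.Conn)) {
	for conn := range conns {
		go func(c *pool.Conn) {
			for {
				stream, err := c.AcceptStream() // bumps the connection's ref count
				if err != nil {
					if c.IsClosed() {
						return
					}
					continue
				}
				go func(s net.Conn) {
					defer s.Close() // drops the ref count taken by AcceptStream
					handle(s)
				}(stream)
			}
		}(conn)
	}
}
```

Usage would mirror the test below: create a buffered `chan *pool.Conn`, pass it to `SetConnListener`, and hand the channel to a loop like this one.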


@@ -9,7 +9,6 @@ import (
"github.com/hashicorp/nomad/helper/freeport"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
"github.com/stretchr/testify/require"
)
@@ -47,7 +46,7 @@ func TestConnPool_ConnListener(t *testing.T) {
pool := newTestPool(t)
// Setup a listener
c := make(chan *yamux.Session, 1)
c := make(chan *Conn, 1)
pool.SetConnListener(c)
// Make an RPC