mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 10:55:42 +03:00
Fix incorrect deletion of node conn
This PR fixes an issue where if the client has multiple connections to a server, a single connection closing would cause the routing to the node to be lost.
This commit is contained in:
@@ -22,6 +22,9 @@ type nodeConnState struct {
|
||||
|
||||
// Established is when the connection was established.
|
||||
Established time.Time
|
||||
|
||||
// Ctx is the full RPC context
|
||||
Ctx *RPCContext
|
||||
}
|
||||
|
||||
// getNodeConn returns the connection to the given node and whether it exists.
|
||||
@@ -55,6 +58,7 @@ func (s *Server) addNodeConn(ctx *RPCContext) {
|
||||
s.nodeConns[ctx.NodeID] = &nodeConnState{
|
||||
Session: ctx.Session,
|
||||
Established: time.Now(),
|
||||
Ctx: ctx,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -67,7 +71,20 @@ func (s *Server) removeNodeConn(ctx *RPCContext) {
|
||||
|
||||
s.nodeConnsLock.Lock()
|
||||
defer s.nodeConnsLock.Unlock()
|
||||
delete(s.nodeConns, ctx.NodeID)
|
||||
state, ok := s.nodeConns[ctx.NodeID]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
// It is important that we check that the connection being removed is the
|
||||
// actual stored connection for the client. It is possible for the client to
|
||||
// dial various addresses that all route to the same server. The most common
|
||||
// case for this is the original address the client uses to connect to the
|
||||
// server differs from the advertised address sent by the heartbeat.
|
||||
if state.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() &&
|
||||
state.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() {
|
||||
delete(s.nodeConns, ctx.NodeID)
|
||||
}
|
||||
}
|
||||
|
||||
// serverWithNodeConn is used to determine which remote server has the most
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package nomad
|
||||
|
||||
import (
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"github.com/hashicorp/nomad/client"
|
||||
@@ -11,6 +12,61 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type namedConnWrapper struct {
|
||||
net.Conn
|
||||
name string
|
||||
}
|
||||
|
||||
type namedAddr string
|
||||
|
||||
func (n namedAddr) String() string { return string(n) }
|
||||
func (n namedAddr) Network() string { return string(n) }
|
||||
|
||||
func (n namedConnWrapper) LocalAddr() net.Addr {
|
||||
return namedAddr(n.name)
|
||||
}
|
||||
|
||||
func TestServer_removeNodeConn_differentAddrs(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
p1, p2 := net.Pipe()
|
||||
w1 := namedConnWrapper{
|
||||
Conn: p1,
|
||||
name: "a",
|
||||
}
|
||||
w2 := namedConnWrapper{
|
||||
Conn: p2,
|
||||
name: "b",
|
||||
}
|
||||
|
||||
// Add the connections
|
||||
nodeID := uuid.Generate()
|
||||
ctx1 := &RPCContext{
|
||||
Conn: w1,
|
||||
NodeID: nodeID,
|
||||
}
|
||||
ctx2 := &RPCContext{
|
||||
Conn: w2,
|
||||
NodeID: nodeID,
|
||||
}
|
||||
|
||||
s1.addNodeConn(ctx1)
|
||||
s1.addNodeConn(ctx2)
|
||||
require.Len(s1.connectedNodes(), 1)
|
||||
|
||||
// Delete the first
|
||||
s1.removeNodeConn(ctx1)
|
||||
require.Len(s1.connectedNodes(), 1)
|
||||
|
||||
// Delete the second
|
||||
s1.removeNodeConn(ctx2)
|
||||
require.Len(s1.connectedNodes(), 0)
|
||||
}
|
||||
|
||||
func TestServerWithNodeConn_NoPath(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
Reference in New Issue
Block a user