From 0d001d05f3e82d0fbdb85cea2ab60f07e2e04bf2 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 7 Feb 2018 11:04:30 -0800 Subject: [PATCH] Fix incorrect deletion of node conn This PR fixes an issue where if the client has multiple connections to a server, a single connection closing would cause the routing to the node to be lost. --- nomad/client_rpc.go | 19 +++++++++++++- nomad/client_rpc_test.go | 56 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+), 1 deletion(-) diff --git a/nomad/client_rpc.go b/nomad/client_rpc.go index 214f2c153..2b8776253 100644 --- a/nomad/client_rpc.go +++ b/nomad/client_rpc.go @@ -22,6 +22,9 @@ type nodeConnState struct { // Established is when the connection was established. Established time.Time + + // Ctx is the full RPC context + Ctx *RPCContext } // getNodeConn returns the connection to the given node and whether it exists. @@ -55,6 +58,7 @@ func (s *Server) addNodeConn(ctx *RPCContext) { s.nodeConns[ctx.NodeID] = &nodeConnState{ Session: ctx.Session, Established: time.Now(), + Ctx: ctx, } } @@ -67,7 +71,20 @@ func (s *Server) removeNodeConn(ctx *RPCContext) { s.nodeConnsLock.Lock() defer s.nodeConnsLock.Unlock() - delete(s.nodeConns, ctx.NodeID) + state, ok := s.nodeConns[ctx.NodeID] + if !ok { + return + } + + // It is important that we check that the connection being removed is the + // actual stored connection for the client. It is possible for the client to + // dial various addresses that all route to the same server. The most common + // case for this is the original address the client uses to connect to the + // server differs from the advertised address sent by the heartbeat. + if state.Ctx.Conn.LocalAddr().String() == ctx.Conn.LocalAddr().String() && + state.Ctx.Conn.RemoteAddr().String() == ctx.Conn.RemoteAddr().String() { + delete(s.nodeConns, ctx.NodeID) + } } // serverWithNodeConn is used to determine which remote server has the most diff --git a/nomad/client_rpc_test.go b/nomad/client_rpc_test.go index 44f423644..f29afcbef 100644 --- a/nomad/client_rpc_test.go +++ b/nomad/client_rpc_test.go @@ -1,6 +1,7 @@ package nomad import ( + "net" "testing" "github.com/hashicorp/nomad/client" @@ -11,6 +12,61 @@ import ( "github.com/stretchr/testify/require" ) +type namedConnWrapper struct { + net.Conn + name string +} + +type namedAddr string + +func (n namedAddr) String() string { return string(n) } +func (n namedAddr) Network() string { return string(n) } + +func (n namedConnWrapper) LocalAddr() net.Addr { + return namedAddr(n.name) +} + +func TestServer_removeNodeConn_differentAddrs(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + p1, p2 := net.Pipe() + w1 := namedConnWrapper{ + Conn: p1, + name: "a", + } + w2 := namedConnWrapper{ + Conn: p2, + name: "b", + } + + // Add the connections + nodeID := uuid.Generate() + ctx1 := &RPCContext{ + Conn: w1, + NodeID: nodeID, + } + ctx2 := &RPCContext{ + Conn: w2, + NodeID: nodeID, + } + + s1.addNodeConn(ctx1) + s1.addNodeConn(ctx2) + require.Len(s1.connectedNodes(), 1) + + // Delete the first + s1.removeNodeConn(ctx1) + require.Len(s1.connectedNodes(), 1) + + // Delete the second + s1.removeNodeConn(ctx2) + require.Len(s1.connectedNodes(), 0) +} + func TestServerWithNodeConn_NoPath(t *testing.T) { t.Parallel() require := require.New(t)