diff --git a/command/agent/agent.go b/command/agent/agent.go index ff0ed3029..d0bdd4abc 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -229,7 +229,7 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi // address that all servers should be able to communicate over RPC with. serverAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(serfAddr.IP.String(), fmt.Sprintf("%d", rpcAddr.Port))) if err != nil { - return nil, fmt.Errorf("Failed to parse Serf advertise address %q: %v", agentConfig.AdvertiseAddrs.Serf, err) + return nil, fmt.Errorf("Failed to resolve Serf advertise address %q: %v", agentConfig.AdvertiseAddrs.Serf, err) } conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String() diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index a8d3e5620..12fffbce2 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -178,10 +178,10 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply // Reply with config information required for future RPC requests reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers)) - for k, v := range n.srv.localPeers { + for _, v := range n.srv.localPeers { reply.Servers = append(reply.Servers, &structs.NodeServerInfo{ - RPCAdvertiseAddr: string(k), + RPCAdvertiseAddr: v.RPCAddr.String(), RPCMajorVersion: int32(v.MajorVersion), RPCMinorVersion: int32(v.MinorVersion), Datacenter: v.Datacenter, diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index fde2f4d3e..375ca8731 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2,6 +2,7 @@ package nomad import ( "fmt" + "net" "reflect" "strings" "testing" @@ -705,6 +706,45 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) { } } +func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) { + t.Parallel() + require := require.New(t) + + advAddr := "127.0.1.1:1234" + adv, err := net.ResolveTCPAddr("tcp", advAddr) + require.Nil(err) + + s1 := TestServer(t, func(c *Config) { + c.ClientRPCAdvertise = adv + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + node := mock.Node() + reg := &structs.NodeRegisterRequest{ + Node: node, + WriteRequest: structs.WriteRequest{Region: "global"}, + } + + // Fetch the response + var resp structs.NodeUpdateResponse + if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + + // Check for heartbeat interval + ttl := resp.HeartbeatTTL + if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL { + t.Fatalf("bad: %#v", ttl) + } + + // Check for heartbeat servers + require.Len(resp.Servers, 1) + require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr) +} + func TestClientEndpoint_UpdateDrain(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) diff --git a/nomad/server.go b/nomad/server.go index ea3877081..68789da4a 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -283,25 +283,24 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg // Create the server s := &Server{ - config: config, - consulCatalog: consulCatalog, - connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), - logger: logger, - tlsWrap: tlsWrap, - rpcServer: rpc.NewServer(), - streamingRpcs: structs.NewStreamingRpcRegistry(), - nodeConns: make(map[string]*nodeConnState), - serverRpcAdvertise: config.ServerRPCAdvertise, - peers: make(map[string][]*serverParts), - localPeers: make(map[raft.ServerAddress]*serverParts), - reconcileCh: make(chan serf.Member, 32), - eventCh: make(chan serf.Event, 256), - evalBroker: evalBroker, - blockedEvals: blockedEvals, - planQueue: planQueue, - rpcTLS: incomingTLS, - aclCache: aclCache, - shutdownCh: make(chan struct{}), + config: config, + consulCatalog: consulCatalog, + connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap), + logger: logger, + tlsWrap: tlsWrap, + rpcServer: rpc.NewServer(), + streamingRpcs: structs.NewStreamingRpcRegistry(), + nodeConns: make(map[string]*nodeConnState), + peers: make(map[string][]*serverParts), + localPeers: make(map[raft.ServerAddress]*serverParts), + reconcileCh: make(chan serf.Member, 32), + eventCh: make(chan serf.Event, 256), + evalBroker: evalBroker, + blockedEvals: blockedEvals, + planQueue: planQueue, + rpcTLS: incomingTLS, + aclCache: aclCache, + shutdownCh: make(chan struct{}), } // Create the periodic dispatcher for launching periodic jobs. @@ -909,14 +908,42 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error { } // Verify that we have a usable advertise address - addr, ok := s.clientRpcAdvertise.(*net.TCPAddr) + clientAddr, ok := s.clientRpcAdvertise.(*net.TCPAddr) if !ok { listener.Close() - return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr) + return fmt.Errorf("Client RPC advertise address is not a TCP Address: %v", clientAddr) } - if addr.IP.IsUnspecified() { + if clientAddr.IP.IsUnspecified() { listener.Close() - return fmt.Errorf("RPC advertise address is not advertisable: %v", addr) + return fmt.Errorf("Client RPC advertise address is not advertisable: %v", clientAddr) + } + + if s.config.ServerRPCAdvertise != nil { + s.serverRpcAdvertise = s.config.ServerRPCAdvertise + } else { + // Default to the Serf Advertise + RPC Port + serfIP := s.config.SerfConfig.MemberlistConfig.AdvertiseAddr + if serfIP == "" { + serfIP = s.config.SerfConfig.MemberlistConfig.BindAddr + } + + addr := net.JoinHostPort(serfIP, fmt.Sprintf("%d", clientAddr.Port)) + resolved, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + return fmt.Errorf("Failed to resolve Server RPC advertise address: %v", err) + } + + s.serverRpcAdvertise = resolved + } + + // Verify that we have a usable advertise address + serverAddr, ok := s.serverRpcAdvertise.(*net.TCPAddr) + if !ok { + return fmt.Errorf("Server RPC advertise address is not a TCP Address: %v", serverAddr) + } + if serverAddr.IP.IsUnspecified() { + listener.Close() + return fmt.Errorf("Server RPC advertise address is not advertisable: %v", serverAddr) } wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap) diff --git a/nomad/server_test.go b/nomad/server_test.go index ba5ccf22c..826885773 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -424,6 +424,7 @@ func TestServer_InvalidSchedulers(t *testing.T) { config := DefaultConfig() config.DevMode = true config.LogOutput = testlog.NewWriter(t) + config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1" logger := log.New(config.LogOutput, "", log.LstdFlags) catalog := consul.NewMockCatalog(logger)