mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
Heartbeat uses client rpc advertise and server defaults server rpc advertise addr
This commit is contained in:
@@ -229,7 +229,7 @@ func convertServerConfig(agentConfig *Config, logOutput io.Writer) (*nomad.Confi
|
||||
// address that all servers should be able to communicate over RPC with.
|
||||
serverAddr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(serfAddr.IP.String(), fmt.Sprintf("%d", rpcAddr.Port)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to parse Serf advertise address %q: %v", agentConfig.AdvertiseAddrs.Serf, err)
|
||||
return nil, fmt.Errorf("Failed to resolve Serf advertise address %q: %v", agentConfig.AdvertiseAddrs.Serf, err)
|
||||
}
|
||||
|
||||
conf.SerfConfig.MemberlistConfig.AdvertiseAddr = serfAddr.IP.String()
|
||||
|
||||
@@ -178,10 +178,10 @@ func (n *Node) constructNodeServerInfoResponse(snap *state.StateSnapshot, reply
|
||||
|
||||
// Reply with config information required for future RPC requests
|
||||
reply.Servers = make([]*structs.NodeServerInfo, 0, len(n.srv.localPeers))
|
||||
for k, v := range n.srv.localPeers {
|
||||
for _, v := range n.srv.localPeers {
|
||||
reply.Servers = append(reply.Servers,
|
||||
&structs.NodeServerInfo{
|
||||
RPCAdvertiseAddr: string(k),
|
||||
RPCAdvertiseAddr: v.RPCAddr.String(),
|
||||
RPCMajorVersion: int32(v.MajorVersion),
|
||||
RPCMinorVersion: int32(v.MinorVersion),
|
||||
Datacenter: v.Datacenter,
|
||||
|
||||
@@ -2,6 +2,7 @@ package nomad
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
@@ -705,6 +706,45 @@ func TestClientEndpoint_UpdateStatus_HeartbeatOnly(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestClientEndpoint_UpdateStatus_HeartbeatOnly_Advertise(t *testing.T) {
|
||||
t.Parallel()
|
||||
require := require.New(t)
|
||||
|
||||
advAddr := "127.0.1.1:1234"
|
||||
adv, err := net.ResolveTCPAddr("tcp", advAddr)
|
||||
require.Nil(err)
|
||||
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.ClientRPCAdvertise = adv
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
// Create the register request
|
||||
node := mock.Node()
|
||||
reg := &structs.NodeRegisterRequest{
|
||||
Node: node,
|
||||
WriteRequest: structs.WriteRequest{Region: "global"},
|
||||
}
|
||||
|
||||
// Fetch the response
|
||||
var resp structs.NodeUpdateResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Check for heartbeat interval
|
||||
ttl := resp.HeartbeatTTL
|
||||
if ttl < s1.config.MinHeartbeatTTL || ttl > 2*s1.config.MinHeartbeatTTL {
|
||||
t.Fatalf("bad: %#v", ttl)
|
||||
}
|
||||
|
||||
// Check for heartbeat servers
|
||||
require.Len(resp.Servers, 1)
|
||||
require.Equal(resp.Servers[0].RPCAdvertiseAddr, advAddr)
|
||||
}
|
||||
|
||||
func TestClientEndpoint_UpdateDrain(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
|
||||
@@ -283,25 +283,24 @@ func NewServer(config *Config, consulCatalog consul.CatalogAPI, logger *log.Logg
|
||||
|
||||
// Create the server
|
||||
s := &Server{
|
||||
config: config,
|
||||
consulCatalog: consulCatalog,
|
||||
connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
|
||||
logger: logger,
|
||||
tlsWrap: tlsWrap,
|
||||
rpcServer: rpc.NewServer(),
|
||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||
nodeConns: make(map[string]*nodeConnState),
|
||||
serverRpcAdvertise: config.ServerRPCAdvertise,
|
||||
peers: make(map[string][]*serverParts),
|
||||
localPeers: make(map[raft.ServerAddress]*serverParts),
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
eventCh: make(chan serf.Event, 256),
|
||||
evalBroker: evalBroker,
|
||||
blockedEvals: blockedEvals,
|
||||
planQueue: planQueue,
|
||||
rpcTLS: incomingTLS,
|
||||
aclCache: aclCache,
|
||||
shutdownCh: make(chan struct{}),
|
||||
config: config,
|
||||
consulCatalog: consulCatalog,
|
||||
connPool: pool.NewPool(config.LogOutput, serverRPCCache, serverMaxStreams, tlsWrap),
|
||||
logger: logger,
|
||||
tlsWrap: tlsWrap,
|
||||
rpcServer: rpc.NewServer(),
|
||||
streamingRpcs: structs.NewStreamingRpcRegistry(),
|
||||
nodeConns: make(map[string]*nodeConnState),
|
||||
peers: make(map[string][]*serverParts),
|
||||
localPeers: make(map[raft.ServerAddress]*serverParts),
|
||||
reconcileCh: make(chan serf.Member, 32),
|
||||
eventCh: make(chan serf.Event, 256),
|
||||
evalBroker: evalBroker,
|
||||
blockedEvals: blockedEvals,
|
||||
planQueue: planQueue,
|
||||
rpcTLS: incomingTLS,
|
||||
aclCache: aclCache,
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
// Create the periodic dispatcher for launching periodic jobs.
|
||||
@@ -909,14 +908,42 @@ func (s *Server) setupRPC(tlsWrap tlsutil.RegionWrapper) error {
|
||||
}
|
||||
|
||||
// Verify that we have a usable advertise address
|
||||
addr, ok := s.clientRpcAdvertise.(*net.TCPAddr)
|
||||
clientAddr, ok := s.clientRpcAdvertise.(*net.TCPAddr)
|
||||
if !ok {
|
||||
listener.Close()
|
||||
return fmt.Errorf("RPC advertise address is not a TCP Address: %v", addr)
|
||||
return fmt.Errorf("Client RPC advertise address is not a TCP Address: %v", clientAddr)
|
||||
}
|
||||
if addr.IP.IsUnspecified() {
|
||||
if clientAddr.IP.IsUnspecified() {
|
||||
listener.Close()
|
||||
return fmt.Errorf("RPC advertise address is not advertisable: %v", addr)
|
||||
return fmt.Errorf("Client RPC advertise address is not advertisable: %v", clientAddr)
|
||||
}
|
||||
|
||||
if s.config.ServerRPCAdvertise != nil {
|
||||
s.serverRpcAdvertise = s.config.ServerRPCAdvertise
|
||||
} else {
|
||||
// Default to the Serf Advertise + RPC Port
|
||||
serfIP := s.config.SerfConfig.MemberlistConfig.AdvertiseAddr
|
||||
if serfIP == "" {
|
||||
serfIP = s.config.SerfConfig.MemberlistConfig.BindAddr
|
||||
}
|
||||
|
||||
addr := net.JoinHostPort(serfIP, fmt.Sprintf("%d", clientAddr.Port))
|
||||
resolved, err := net.ResolveTCPAddr("tcp", addr)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to resolve Server RPC advertise address: %v", err)
|
||||
}
|
||||
|
||||
s.serverRpcAdvertise = resolved
|
||||
}
|
||||
|
||||
// Verify that we have a usable advertise address
|
||||
serverAddr, ok := s.serverRpcAdvertise.(*net.TCPAddr)
|
||||
if !ok {
|
||||
return fmt.Errorf("Server RPC advertise address is not a TCP Address: %v", serverAddr)
|
||||
}
|
||||
if serverAddr.IP.IsUnspecified() {
|
||||
listener.Close()
|
||||
return fmt.Errorf("Server RPC advertise address is not advertisable: %v", serverAddr)
|
||||
}
|
||||
|
||||
wrapper := tlsutil.RegionSpecificWrapper(s.config.Region, tlsWrap)
|
||||
|
||||
@@ -424,6 +424,7 @@ func TestServer_InvalidSchedulers(t *testing.T) {
|
||||
config := DefaultConfig()
|
||||
config.DevMode = true
|
||||
config.LogOutput = testlog.NewWriter(t)
|
||||
config.SerfConfig.MemberlistConfig.BindAddr = "127.0.0.1"
|
||||
logger := log.New(config.LogOutput, "", log.LstdFlags)
|
||||
catalog := consul.NewMockCatalog(logger)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user