Merge pull request #1735 from hashicorp/b-bootstrap-flapping

Retry all servers on RPC call failure
This commit is contained in:
Michael Schurter
2016-09-27 16:33:15 -07:00
committed by GitHub
11 changed files with 592 additions and 2033 deletions
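The gist of the change: instead of pinning to a single proxied server and failing the RPC when it errors, the client now walks its whole server list per call, marking failures and stopping at the first success. A minimal sketch of the pattern outside the Client type (retryAll is a hypothetical helper, not part of this diff):

package retryall

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// retryAll invokes call against each known server in order, returning nil
// on the first success. If every server fails, the per-server errors are
// aggregated, mirroring the Client.RPC change in this diff.
func retryAll(servers []string, call func(addr string) error) error {
	var mErr multierror.Error
	for _, addr := range servers {
		if err := call(addr); err != nil {
			mErr.Errors = append(mErr.Errors, fmt.Errorf("RPC failed to server %s: %v", addr, err))
			continue
		}
		return nil
	}
	return mErr.ErrorOrNil()
}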

View File

@@ -4,6 +4,7 @@ IMPROVEMENTS:
* core: Introduce node SecretID which can be used to minimize the available
surface area of RPCs to malicious Nomad Clients [GH-1597]
* cli: `nomad alloc-status` shows allocation creation time [GH-1623]
* client: Failed RPCs are retried on all servers [GH-1735]
* client: Enforce shared allocation directory disk usage [GH-1580]
* client: Introduce a `secrets/` directory to tasks where sensitive data can
be written [GH-1681]
@@ -19,6 +20,8 @@ BUG FIXES:
[GH-1668]
* discovery: Fix HTTP timeout with Server HTTP health check when there is no
leader [GH-1656]
* discovery: Fix client flapping when the server is in a different datacenter
  than the client [GH-1641]
## 0.4.1 (August 18, 2016)

View File

@@ -1,6 +1,7 @@
package client
import (
"errors"
"fmt"
"io/ioutil"
"log"
@@ -10,7 +11,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
@@ -21,7 +21,6 @@ import (
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/driver"
"github.com/hashicorp/nomad/client/fingerprint"
"github.com/hashicorp/nomad/client/rpcproxy"
"github.com/hashicorp/nomad/client/stats"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/command/agent/consul"
@@ -44,6 +43,10 @@ const (
// datacenters looking for the Nomad server service.
datacenterQueryLimit = 9
// consulReaperIntv is the interval at which the Consul reaper will
// run.
consulReaperIntv = 5 * time.Second
// registerRetryIntv is the minimum interval at which we retry
// registration. We pick a value between this and 2x this.
registerRetryIntv = 15 * time.Second
@@ -58,11 +61,6 @@ const (
// stateSnapshotIntv is how often the client snapshots state
stateSnapshotIntv = 60 * time.Second
// registerErrGrace is the grace period where we don't log about
// register errors after start. This is to improve the user experience
// in dev mode where the leader isn't elected for a few seconds.
registerErrGrace = 10 * time.Second
// initialHeartbeatStagger is used to stagger the interval between
// starting and the initial heartbeat. After the initial heartbeat,
// we switch to using the TTL specified by the servers.
@@ -105,26 +103,22 @@ type Client struct {
logger *log.Logger
rpcProxy *rpcproxy.RPCProxy
connPool *nomad.ConnPool
// lastHeartbeatFromQuorum is an atomic int32 acting as a bool. When
// true (1), the last heartbeat message had a leader. When false (0),
// the last heartbeat did not include the RPC address of the leader,
// indicating that the server is in the minority or middle of an
// election.
lastHeartbeatFromQuorum int32
// servers is the (optionally prioritized) list of nomad servers
servers *serverlist
// consulPullHeartbeatDeadline is the deadline at which this Nomad
// Agent will begin polling Consul for a list of Nomad Servers. When
// Nomad Clients are heartbeating successfully with Nomad Servers,
// Nomad Clients do not poll Consul to populate their backup server
// list.
consulPullHeartbeatDeadline time.Time
lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex
// heartbeat related times for tracking how often to heartbeat
lastHeartbeat time.Time
heartbeatTTL time.Duration
heartbeatLock sync.Mutex
// triggerDiscoveryCh triggers Consul discovery; see triggerDiscovery
triggerDiscoveryCh chan struct{}
// serversDiscoveredCh is ticked whenever Consul discovery completes
// successfully
serversDiscoveredCh chan struct{}
// allocs is the current set of allocations
allocs map[string]*AllocRunner
@@ -154,20 +148,30 @@ type Client struct {
vaultClient vaultclient.VaultClient
}
var (
// noServersErr is returned by the RPC method when the client has no
// configured servers. This is used to trigger Consul discovery if
// enabled.
noServersErr = errors.New("no servers")
)
// NewClient is used to create a new client from the given configuration
func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logger) (*Client, error) {
// Create the client
c := &Client{
config: cfg,
consulSyncer: consulSyncer,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
config: cfg,
consulSyncer: consulSyncer,
start: time.Now(),
connPool: nomad.NewPool(cfg.LogOutput, clientRPCCache, clientMaxStreams, nil),
servers: newServerList(),
triggerDiscoveryCh: make(chan struct{}),
serversDiscoveredCh: make(chan struct{}),
logger: logger,
hostStatsCollector: stats.NewHostStatsCollector(),
allocs: make(map[string]*AllocRunner),
blockedAllocations: make(map[string]*structs.Allocation),
allocUpdates: make(chan *structs.Allocation, 64),
shutdownCh: make(chan struct{}),
}
// Initialize the client
@@ -199,20 +203,27 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
c.configCopy = c.config.Copy()
c.configLock.Unlock()
// Create the RPC Proxy and bootstrap with the preconfigured list of
// static servers
// Set the preconfigured list of static servers
c.configLock.RLock()
c.rpcProxy = rpcproxy.NewRPCProxy(c.logger, c.shutdownCh, c, c.connPool)
for _, serverAddr := range c.configCopy.Servers {
c.rpcProxy.AddPrimaryServer(serverAddr)
if len(c.configCopy.Servers) > 0 {
if err := c.SetServers(c.configCopy.Servers); err != nil {
logger.Printf("[WARN] client: None of the configured servers are valid: %v", err)
}
}
c.configLock.RUnlock()
// Setup the Consul syncer
if err := c.setupConsulSyncer(); err != nil {
return nil, fmt.Errorf("failed to create client Consul syncer: %v", err)
// Setup Consul discovery if enabled
if c.configCopy.ConsulConfig.ClientAutoJoin {
go c.consulDiscovery()
if len(c.servers.all()) == 0 {
// No configured servers; trigger discovery manually
c.triggerDiscoveryCh <- struct{}{}
}
}
// Start Consul reaper
go c.consulReaper()
// Setup the vault client for token and secret renewals
if err := c.setupVaultClient(); err != nil {
return nil, fmt.Errorf("failed to setup vault client: %v", err)
@@ -238,16 +249,6 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer, logger *log.Logg
// Start collecting stats
go c.collectHostStats()
// Start the RPCProxy maintenance task. This task periodically
// shuffles the list of Nomad Server Endpoints this Client will use
// when communicating with Nomad Servers via RPC. This is done in
// order to prevent server fixation in stable Nomad clusters. This
// task actively populates the active list of Nomad Server Endpoints
// from information from the Nomad Client heartbeats. If a heartbeat
// times out and there are no Nomad servers available, this data is
// populated by periodically polling Consul, if available.
go c.rpcProxy.Run()
return c, nil
}
@@ -350,33 +351,38 @@ func (c *Client) Shutdown() error {
return c.saveState()
}
// RPC is used to forward an RPC call to a nomad server, or fail if no servers
// RPC is used to forward an RPC call to a nomad server, or fail if no servers.
func (c *Client) RPC(method string, args interface{}, reply interface{}) error {
// Invoke the RPCHandler if it exists
if c.config.RPCHandler != nil {
return c.config.RPCHandler.RPC(method, args, reply)
}
// Pick a server to request from
server := c.rpcProxy.FindServer()
if server == nil {
return fmt.Errorf("no known servers")
servers := c.servers.all()
if len(servers) == 0 {
return noServersErr
}
// Make the RPC request
if err := c.connPool.RPC(c.Region(), server.Addr, c.RPCMajorVersion(), method, args, reply); err != nil {
c.rpcProxy.NotifyFailedServer(server)
return fmt.Errorf("RPC failed to server %s: %v", server.Addr, err)
var mErr multierror.Error
for _, s := range servers {
// Make the RPC request
if err := c.connPool.RPC(c.Region(), s.addr, c.RPCMajorVersion(), method, args, reply); err != nil {
errmsg := fmt.Errorf("RPC failed to server %s: %v", s.addr, err)
mErr.Errors = append(mErr.Errors, errmsg)
c.logger.Printf("[DEBUG] client: %v", errmsg)
c.servers.failed(s)
continue
}
c.servers.good(s)
return nil
}
return nil
return mErr.ErrorOrNil()
}
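Callers treat the noServersErr sentinel as "wait for discovery" rather than a hard failure, which retryRegisterNode and watchAllocations below rely on via ==. A small self-contained sketch of that dispatch (handle and the triggerDiscovery parameter are stand-ins for the real methods):

package sentinel

import (
	"errors"
	"log"
)

// noServersErr mirrors the sentinel declared in this diff; comparison with
// == is safe because the single package-level value is always returned.
var noServersErr = errors.New("no servers")

// handle is a hypothetical stand-in for caller-side error dispatch.
func handle(err error, triggerDiscovery func()) {
	switch {
	case err == nil:
		// success
	case err == noServersErr:
		// recoverable: kick Consul discovery instead of logging an error
		triggerDiscovery()
	default:
		log.Printf("[ERR] client: RPC failed: %v", err)
	}
}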
// Stats is used to return statistics for debugging and insight
// for various sub-systems
func (c *Client) Stats() map[string]map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
c.allocLock.RLock()
numAllocs := len(c.allocs)
c.allocLock.RUnlock()
@@ -386,8 +392,8 @@ func (c *Client) Stats() map[string]map[string]string {
stats := map[string]map[string]string{
"client": map[string]string{
"node_id": c.Node().ID,
"known_servers": toString(uint64(c.rpcProxy.NumServers())),
"num_allocations": toString(uint64(numAllocs)),
"known_servers": c.servers.all().String(),
"num_allocations": strconv.Itoa(numAllocs),
"last_heartbeat": fmt.Sprintf("%v", time.Since(c.lastHeartbeat)),
"heartbeat_ttl": fmt.Sprintf("%v", c.heartbeatTTL),
},
@@ -438,10 +444,44 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
return ar.ctx.AllocDir, nil
}
// AddPrimaryServerToRPCProxy adds serverAddr to the RPC Proxy's primary
// server list.
func (c *Client) AddPrimaryServerToRPCProxy(serverAddr string) *rpcproxy.ServerEndpoint {
return c.rpcProxy.AddPrimaryServer(serverAddr)
// GetServers returns the list of nomad servers this client is aware of.
func (c *Client) GetServers() []string {
endpoints := c.servers.all()
res := make([]string, len(endpoints))
for i := range endpoints {
res[i] = endpoints[i].addr.String()
}
return res
}
// SetServers sets a new list of nomad servers to connect to. As long as one
// server is resolvable no error is returned.
func (c *Client) SetServers(servers []string) error {
endpoints := make([]*endpoint, 0, len(servers))
var merr multierror.Error
for _, s := range servers {
addr, err := resolveServer(s)
if err != nil {
c.logger.Printf("[DEBUG] client: ignoring server %s due to resolution error: %v", s, err)
merr.Errors = append(merr.Errors, err)
continue
}
// Valid endpoint, append it without a priority as this API
// doesn't support different priorities for different servers
endpoints = append(endpoints, &endpoint{name: s, addr: addr})
}
// Only return errors if no servers are valid
if len(endpoints) == 0 {
if len(merr.Errors) > 0 {
return merr.ErrorOrNil()
}
return noServersErr
}
c.servers.set(endpoints)
return nil
}
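Usage note: given a *Client c, a partially bad list still succeeds; an error comes back only when nothing resolves (addresses here are invented for illustration):

// One unresolvable address out of two is logged and skipped; SetServers
// returns an error only when no address resolves.
if err := c.SetServers([]string{"10.0.0.1:4647", "bad::host::port"}); err != nil {
	c.logger.Printf("[WARN] client: no valid servers: %v", err)
}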
// restoreState is used to restore our state from the data dir
@@ -780,29 +820,33 @@ func (c *Client) registerAndHeartbeat() {
for {
select {
case <-c.serversDiscoveredCh:
case <-heartbeat:
if err := c.updateNodeStatus(); err != nil {
// The servers have changed such that this node has not been
// registered before
if strings.Contains(err.Error(), "node not found") {
// Re-register the node
c.logger.Printf("[INFO] client: re-registering node")
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.retryIntv(registerRetryIntv)
c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
heartbeat = time.After(intv)
}
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}
case <-c.shutdownCh:
return
}
if err := c.updateNodeStatus(); err != nil {
// The servers have changed such that this node has not been
// registered before
if strings.Contains(err.Error(), "node not found") {
// Re-register the node
c.logger.Printf("[INFO] client: re-registering node")
c.retryRegisterNode()
heartbeat = time.After(lib.RandomStagger(initialHeartbeatStagger))
} else {
intv := c.retryIntv(registerRetryIntv)
c.logger.Printf("[ERR] client: heartbeating failed. Retrying in %v: %v", intv, err)
heartbeat = time.After(intv)
// if heartbeating fails, trigger Consul discovery
c.triggerDiscovery()
}
} else {
c.heartbeatLock.Lock()
heartbeat = time.After(c.heartbeatTTL)
c.heartbeatLock.Unlock()
}
}
}
@@ -868,14 +912,21 @@ func (c *Client) hasNodeChanged(oldAttrHash uint64, oldMetaHash uint64) (bool, u
// retryRegisterNode is used to register the node or update the registration and
// retry in case of failure.
func (c *Client) retryRegisterNode() {
// Register the client
for {
if err := c.registerNode(); err == nil {
break
err := c.registerNode()
if err == nil {
// Registered!
return
}
if err == noServersErr {
c.logger.Print("[DEBUG] client: registration waiting on servers")
c.triggerDiscovery()
} else {
c.logger.Printf("[ERR] client: %v", err)
c.logger.Printf("[ERR] client: registration failure: %v", err)
}
select {
case <-c.serversDiscoveredCh:
case <-time.After(c.retryIntv(registerRetryIntv)):
case <-c.shutdownCh:
return
@@ -892,9 +943,6 @@ func (c *Client) registerNode() error {
}
var resp structs.NodeUpdateResponse
if err := c.RPC("Node.Register", &req, &resp); err != nil {
if time.Since(c.start) > registerErrGrace {
return fmt.Errorf("failed to register node: %v", err)
}
return err
}
@@ -903,7 +951,7 @@ func (c *Client) registerNode() error {
node.Status = structs.NodeStatusReady
c.configLock.Unlock()
c.logger.Printf("[DEBUG] client: node registration complete")
c.logger.Printf("[INFO] client: node registration complete")
if len(resp.EvalIDs) != 0 {
c.logger.Printf("[DEBUG] client: %d evaluations triggered by node registration", len(resp.EvalIDs))
}
@@ -917,6 +965,9 @@ func (c *Client) registerNode() error {
// updateNodeStatus is used to heartbeat and update the status of the node
func (c *Client) updateNodeStatus() error {
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
node := c.Node()
req := structs.NodeUpdateStatusRequest{
NodeID: node.ID,
@@ -925,6 +976,7 @@ func (c *Client) updateNodeStatus() error {
}
var resp structs.NodeUpdateResponse
if err := c.RPC("Node.UpdateStatus", &req, &resp); err != nil {
c.triggerDiscovery()
return fmt.Errorf("failed to update status: %v", err)
}
if len(resp.EvalIDs) != 0 {
@@ -934,14 +986,29 @@ func (c *Client) updateNodeStatus() error {
c.logger.Printf("[DEBUG] client: state updated to %s", req.Status)
}
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
// Update heartbeat time and ttl
c.lastHeartbeat = time.Now()
c.heartbeatTTL = resp.HeartbeatTTL
if err := c.rpcProxy.RefreshServerLists(resp.Servers, resp.NumNodes, resp.LeaderRPCAddr); err != nil {
return err
// Convert []*NodeServerInfo to endpoints
localdc := c.Datacenter()
servers := make(endpoints, 0, len(resp.Servers))
for _, s := range resp.Servers {
addr, err := resolveServer(s.RPCAdvertiseAddr)
if err != nil {
continue
}
e := endpoint{name: s.RPCAdvertiseAddr, addr: addr}
if s.Datacenter != localdc {
// server is non-local; de-prioritize
e.priority = 1
}
servers = append(servers, &e)
}
if len(servers) == 0 {
return fmt.Errorf("server returned no valid servers")
}
c.servers.set(servers)
// Begin polling Consul if there is no Nomad leader. We could be
// heartbeating to a Nomad server that is in the minority of a
@@ -949,13 +1016,9 @@ func (c *Client) updateNodeStatus() error {
// has connectivity to the existing majority of Nomad Servers, but
// only if it queries Consul.
if resp.LeaderRPCAddr == "" {
atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 1, 0)
return nil
c.triggerDiscovery()
}
const heartbeatFallbackFactor = 3
atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 0, 1)
c.consulPullHeartbeatDeadline = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL)
return nil
}
@@ -1079,9 +1142,20 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
resp = structs.NodeClientAllocsResponse{}
err := c.RPC("Node.GetClientAllocs", &req, &resp)
if err != nil {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
// Shutdown often causes EOF errors, so check for shutdown first
select {
case <-c.shutdownCh:
return
default:
}
if err != noServersErr {
c.logger.Printf("[ERR] client: failed to query for node allocations: %v", err)
}
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
continue
case <-time.After(retry):
continue
case <-c.shutdownCh:
@@ -1128,6 +1202,8 @@ func (c *Client) watchAllocations(updates chan *allocUpdates) {
c.logger.Printf("[ERR] client: failed to query updated allocations: %v", err)
retry := c.retryIntv(getAllocRetryIntv)
select {
case <-c.serversDiscoveredCh:
continue
case <-time.After(retry):
continue
case <-c.shutdownCh:
@@ -1396,169 +1472,176 @@ func (c *Client) deriveToken(alloc *structs.Allocation, taskNames []string, vcli
return unwrappedTokens, nil
}
// setupConsulSyncer creates Client-mode consul.Syncer which periodically
// executes callbacks on a fixed interval.
//
// TODO(sean@): this could eventually be moved to a priority queue and give
// each task an interval, but that is not necessary at this time.
func (c *Client) setupConsulSyncer() error {
// The bootstrapFn callback handler is used to periodically poll
// Consul to look up the Nomad Servers in Consul. In the event the
// heartbeat deadline has been exceeded and this Client is orphaned
// from its servers, periodically poll Consul to reattach this Client
// to its cluster and automatically recover from a detached state.
bootstrapFn := func() error {
now := time.Now()
c.heartbeatLock.Lock()
// triggerDiscovery causes a Consul discovery to begin (if one hasn't already)
func (c *Client) triggerDiscovery() {
select {
case c.triggerDiscoveryCh <- struct{}{}:
// Discovery goroutine was released to execute
default:
// Discovery goroutine was already running
}
}
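The select/default here is the standard Go idiom for coalescing triggers: on an unbuffered channel the send succeeds only if the discovery goroutine is currently waiting to receive, and is otherwise dropped because a pass is already pending or running. A self-contained sketch of the idiom:

package trigger

// coalesce performs a non-blocking send: with an unbuffered channel it
// succeeds only when the worker goroutine is waiting to receive, and
// otherwise drops the duplicate trigger.
func coalesce(ch chan struct{}) {
	select {
	case ch <- struct{}{}:
		// worker released to run one pass
	default:
		// worker busy; drop the trigger
	}
}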
// If the last heartbeat didn't contain a leader, give the
// Nomad server this Agent is talking to one more attempt at
// providing a heartbeat that does contain a leader.
if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
c.heartbeatLock.Unlock()
return nil
}
c.heartbeatLock.Unlock()
consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. Future heartbeats will cause Nomad
// Clients to fixate on their local datacenter so
// it's okay to talk with remote DCs. If no
// Nomad servers are available within
// datacenterQueryLimit, the next heartbeat will pick
// a new set of servers so it's okay.
shuffleStrings(dcs[1:])
dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
}
// Forward RPCs to our region
nomadRPCArgs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: c.Region(),
},
}
nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
var mErr multierror.Error
const defaultMaxNumNomadServers = 8
nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
// consulDiscovery waits for the signal to attempt server discovery via Consul.
// It's intended to be started in a goroutine. See triggerDiscovery() for
// causing consul discovery from other code locations.
func (c *Client) consulDiscovery() {
for {
select {
case <-c.triggerDiscoveryCh:
if err := c.consulDiscoveryImpl(); err != nil {
c.logger.Printf("[ERR] client.consul: error discovering nomad servers: %v", err)
}
consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagRPC, consulOpts)
case <-c.shutdownCh:
return
}
}
}
func (c *Client) consulDiscoveryImpl() error {
// Acquire heartbeat lock to prevent heartbeat from running
// concurrently with discovery. Concurrent execution is safe; however,
// discovery is usually triggered when heartbeating has failed so
// there's no point in allowing it.
c.heartbeatLock.Lock()
defer c.heartbeatLock.Unlock()
consulCatalog := c.consulSyncer.ConsulClient().Catalog()
dcs, err := consulCatalog.Datacenters()
if err != nil {
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
}
if len(dcs) > 2 {
// Query the local DC first, then shuffle the
// remaining DCs. Future heartbeats will cause Nomad
// Clients to fixate on their local datacenter so
// it's okay to talk with remote DCs. If no
// Nomad servers are available within
// datacenterQueryLimit, the next heartbeat will pick
// a new set of servers so it's okay.
shuffleStrings(dcs[1:])
dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
}
// Query for servers in this client's region only
region := c.Region()
rpcargs := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Region: region,
},
}
serviceName := c.configCopy.ConsulConfig.ServerServiceName
var mErr multierror.Error
var servers endpoints
c.logger.Printf("[DEBUG] client.consul: bootstrap contacting following Consul DCs: %+q", dcs)
DISCOLOOP:
for _, dc := range dcs {
consulOpts := &consulapi.QueryOptions{
AllowStale: true,
Datacenter: dc,
Near: "_agent",
WaitTime: consul.DefaultQueryWaitDuration,
}
consulServices, _, err := consulCatalog.Service(serviceName, consul.ServiceTagRPC, consulOpts)
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", serviceName, dc, err))
continue
}
for _, s := range consulServices {
port := strconv.Itoa(s.ServicePort)
addrstr := s.ServiceAddress
if addrstr == "" {
addrstr = s.Address
}
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(addrstr, port))
if err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %+q from Consul datacenter %+q: %v", nomadServerServiceName, dc, err))
mErr.Errors = append(mErr.Errors, err)
continue
}
var peers []string
if err := c.connPool.RPC(region, addr, c.RPCMajorVersion(), "Status.Peers", rpcargs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
for _, s := range consulServices {
port := strconv.FormatInt(int64(s.ServicePort), 10)
addr := s.ServiceAddress
if addr == "" {
addr = s.Address
}
serverAddr := net.JoinHostPort(addr, port)
serverEndpoint, err := rpcproxy.NewServerEndpoint(serverAddr)
// Successfully received the Server peers list of the correct
// region
for _, p := range peers {
addr, err := net.ResolveTCPAddr("tcp", p)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
var peers []string
if err := c.connPool.RPC(c.Region(), serverEndpoint.Addr, c.RPCMajorVersion(), "Status.Peers", nomadRPCArgs, &peers); err != nil {
mErr.Errors = append(mErr.Errors, err)
continue
}
// Successfully received the Server peers list of the correct
// region
if len(peers) != 0 {
nomadServerServices = append(nomadServerServices, peers...)
break
}
servers = append(servers, &endpoint{name: p, addr: addr})
}
// Break if at least one Nomad Server was successfully pinged
if len(nomadServerServices) > 0 {
break
if len(servers) > 0 {
break DISCOLOOP
}
}
if len(nomadServerServices) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %q", nomadServerServiceName, dcs)
}
if len(servers) == 0 {
if len(mErr.Errors) > 0 {
return mErr.ErrorOrNil()
}
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %+q", serviceName, dcs)
}
// Log the servers we are adding
c.logger.Printf("[INFO] client.consul: bootstrap adding following Servers: %q", nomadServerServices)
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", servers)
c.servers.set(servers)
c.heartbeatLock.Lock()
if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
c.heartbeatLock.Unlock()
// Common, healthy path
if err := c.rpcProxy.SetBackupServers(nomadServerServices); err != nil {
return fmt.Errorf("client.consul: unable to set backup servers: %v", err)
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
// isn't receiving on this chan yet they'll still retry eventually.
// This is a shortcircuit for the longer retry intervals.
for {
select {
case c.serversDiscoveredCh <- struct{}{}:
default:
return nil
}
}
}
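The closing loop is the complementary channel idiom: keep offering on serversDiscoveredCh until nobody is waiting, so every RPC retry currently blocked on the channel wakes immediately while future callers still fall back to their timers. A sketch of that pattern in isolation:

// wakeWaiters unblocks every goroutine currently receiving on ch and
// returns as soon as no one is waiting. An unbuffered channel is assumed.
func wakeWaiters(ch chan struct{}) {
	for {
		select {
		case ch <- struct{}{}:
			// woke one waiter; try for another
		default:
			return
		}
	}
}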
// consulReaper periodically reaps unmatched domains from Consul. Intended to
// be called in its own goroutine. See consulReaperIntv for interval.
func (c *Client) consulReaper() {
ticker := time.NewTicker(consulReaperIntv)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.consulReaperImpl(); err != nil {
c.logger.Printf("[ERR] consul.client: error reaping services in consul: %v", err)
}
} else {
c.heartbeatLock.Unlock()
// If this Client is talking with a Server that
// doesn't have a leader, and we have exceeded the
// consulPullHeartbeatDeadline, change the call from
// SetBackupServers() to calling AddPrimaryServer()
// in order to allow the Clients to randomly begin
// considering all known Nomad servers and
// eventually, hopefully, find their way to a Nomad
// Server that has quorum (assuming Consul has a
// server list that is in the majority).
for _, s := range nomadServerServices {
c.rpcProxy.AddPrimaryServer(s)
case <-c.shutdownCh:
return
}
}
}
// consulReaperImpl reaps unmatched domains from Consul.
func (c *Client) consulReaperImpl() error {
const estInitialExecutorDomains = 8
// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain
for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
return nil
}
if c.config.ConsulConfig.ClientAutoJoin {
c.consulSyncer.AddPeriodicHandler("Nomad Client Fallback Server Handler", bootstrapFn)
}
consulServicesReaperFn := func() error {
const estInitialExecutorDomains = 8
// Create the domains to keep and add the server and client
domains := make([]consul.ServiceDomain, 2, estInitialExecutorDomains)
domains[0] = consul.ServerDomain
domains[1] = consul.ClientDomain
for allocID, ar := range c.getAllocRunners() {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
// Only keep running tasks
if taskState.State == structs.TaskStateRunning {
d := consul.NewExecutorDomain(allocID, taskName)
domains = append(domains, d)
}
}
}
return c.consulSyncer.ReapUnmatched(domains)
}
if c.config.ConsulConfig.AutoAdvertise {
c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesReaperFn)
}
return nil
return c.consulSyncer.ReapUnmatched(domains)
}
// collectHostStats collects host resource usage stats periodically
@@ -1617,7 +1700,18 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
}
}
// RPCProxy returns the Client's RPCProxy instance
func (c *Client) RPCProxy() *rpcproxy.RPCProxy {
return c.rpcProxy
// resolveServer takes a server's address as a string and returns its
// resolved net.Addr or an error.
func resolveServer(s string) (net.Addr, error) {
const defaultClientPort = "4647" // default client RPC port
host, port, err := net.SplitHostPort(s)
if err != nil {
if strings.Contains(err.Error(), "missing port") {
host = s
port = defaultClientPort
} else {
return nil, err
}
}
return net.ResolveTCPAddr("tcp", net.JoinHostPort(host, port))
}
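Given the resolveServer above, an omitted port falls back to 4647 while an explicit port is preserved (addresses invented for illustration):

addr1, _ := resolveServer("10.0.0.1")      // resolves to 10.0.0.1:4647
addr2, _ := resolveServer("10.0.0.2:4747") // resolves to 10.0.0.2:4747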

View File

@@ -1,779 +0,0 @@
// Package rpcproxy provides a proxy interface to Nomad Servers. The
// RPCProxy periodically shuffles which server a Nomad Client communicates
// with in order to redistribute load across Nomad Servers. Nomad Servers
// that fail an RPC request are automatically cycled to the end of the list
// until the server list is reshuffled.
//
// The rpcproxy package does not provide any external API guarantees and
// should be called only by `hashicorp/nomad`.
package rpcproxy
import (
"fmt"
"log"
"math/rand"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// clientRPCJitterFraction determines the amount of jitter added to
// clientRPCMinReuseDuration before a connection is expired and a new
// connection is established in order to rebalance load across Nomad
// servers. The cluster-wide number of connections per second from
// rebalancing is applied after this jitter to ensure the CPU impact
// is always finite. See newRebalanceConnsPerSecPerServer's comment
// for additional commentary.
//
// For example, in a 10K Nomad cluster with 5x servers, this default
// averages out to ~13 new connections from rebalancing per server
// per second.
clientRPCJitterFraction = 2
// clientRPCMinReuseDuration controls the minimum amount of time RPC
// queries are sent over an established connection to a single server
clientRPCMinReuseDuration = 600 * time.Second
// Limit the number of new connections a server receives per second
// for connection rebalancing. This limit caps the load caused by
// continual rebalancing efforts when a cluster is in equilibrium. A
// lower value comes at the cost of increased recovery time after a
// partition. This parameter begins to take effect when there are
// more than ~48K clients querying 5x servers or at lower server
// counts when there is a partition.
//
// For example, in a 100K Nomad cluster with 5x servers, it will take
// ~5min for all servers to rebalance their connections. If 99,995
// agents are in the minority talking to only one server, it will
// take ~26min for all servers to rebalance. A 10K cluster in the
// same scenario will take ~2.6min to rebalance.
newRebalanceConnsPerSecPerServer = 64
// rpcAPIMismatchLogRate determines the rate at which log entries are
// emitted when the client and server's API versions are mismatched.
rpcAPIMismatchLogRate = 3 * time.Hour
)
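The figures quoted in these comments can be sanity-checked, assuming lib.RandomStagger draws uniformly from [0, d): mean connection lifetime is about 600s + 150s = 750s, so 10,000 clients yield 10000/750 ≈ 13 new connections per second, and the partition-recovery times follow from the 64 conns/s/server cap. A sketch of the arithmetic against the constants above (rebalanceFigures is illustrative only):

// Back-of-the-envelope check of the figures documented above.
func rebalanceFigures() (connsPerSec, fullDrainSecs, minorityDrainSecs float64) {
	// Mean connection lifetime: 600s base plus a uniform stagger in
	// [0, 300s), i.e. ~750s on average.
	meanLifetime := clientRPCMinReuseDuration +
		clientRPCMinReuseDuration/clientRPCJitterFraction/2
	connsPerSec = 10000 / meanLifetime.Seconds() // ≈ 13.3 for a 10K cluster
	// 100K clients across 5 servers at 64 conns/s/server:
	fullDrainSecs = 100000 / (5 * float64(newRebalanceConnsPerSecPerServer)) // ≈ 312s ≈ 5min
	// 99,995 clients pinned to a single server:
	minorityDrainSecs = 99995 / float64(newRebalanceConnsPerSecPerServer) // ≈ 1562s ≈ 26min
	return
}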
// NomadConfigInfo is an interface wrapper around this Nomad Agent's
// configuration to prevent a cyclic import dependency.
type NomadConfigInfo interface {
Datacenter() string
RPCMajorVersion() int
RPCMinorVersion() int
Region() string
}
// Pinger is an interface wrapping client.ConnPool to prevent a
// cyclic import dependency
type Pinger interface {
PingNomadServer(region string, apiMajorVersion int, s *ServerEndpoint) (bool, error)
}
// serverList is an array of Nomad Servers. The first server in the list is
// the active server.
//
// NOTE(sean@): We are explicitly relying on the fact that serverList will be
// copied onto the stack by atomic.Value. Please keep this structure light.
type serverList struct {
L []*ServerEndpoint
}
// RPCProxy is the manager type responsible for returning and managing Nomad
// addresses.
type RPCProxy struct {
// activatedList manages the list of Nomad Servers that are eligible
// to be queried by the Client agent.
activatedList atomic.Value
activatedListLock sync.Mutex
// primaryServers is a list of servers found in the last heartbeat.
// primaryServers are periodically reshuffled. Covered by
// serverListLock.
primaryServers serverList
// backupServers is a list of fallback servers. These servers are
// appended to the RPCProxy's serverList, but are never shuffled with
// the list of servers discovered via the Nomad heartbeat. Covered
// by serverListLock.
backupServers serverList
// serverListLock covers both backupServers and primaryServers. If
// it is necessary to hold serverListLock and listLock, obtain an
// exclusive lock on serverListLock before listLock.
serverListLock sync.RWMutex
leaderAddr string
numNodes int
// rebalanceTimer controls the duration of the rebalance interval
rebalanceTimer *time.Timer
// shutdownCh is a copy of the channel in nomad.Client
shutdownCh chan struct{}
logger *log.Logger
configInfo NomadConfigInfo
// rpcAPIMismatchThrottle regulates the rate at which warning
// messages are emitted in the event of an API mismatch between the
// clients and servers.
rpcAPIMismatchThrottle map[string]time.Time
// connPoolPinger is used to test the health of a server in the
// connection pool. Pinger is an interface that wraps
// client.ConnPool.
connPoolPinger Pinger
}
// NewRPCProxy is the only way to safely create a new RPCProxy.
func NewRPCProxy(logger *log.Logger, shutdownCh chan struct{}, configInfo NomadConfigInfo, connPoolPinger Pinger) *RPCProxy {
p := &RPCProxy{
logger: logger,
configInfo: configInfo, // can't pass *nomad.Client: import cycle
connPoolPinger: connPoolPinger, // can't pass *nomad.ConnPool: import cycle
rebalanceTimer: time.NewTimer(clientRPCMinReuseDuration),
shutdownCh: shutdownCh,
}
l := serverList{}
l.L = make([]*ServerEndpoint, 0)
p.saveServerList(l)
return p
}
// activateEndpoint adds an endpoint to the RPCProxy's active serverList.
// Returns true if the server was added, returns false if the server already
// existed in the RPCProxy's serverList.
func (p *RPCProxy) activateEndpoint(s *ServerEndpoint) bool {
l := p.getServerList()
// Check if this server is known
found := false
for idx, existing := range l.L {
if existing.Name == s.Name {
newServers := make([]*ServerEndpoint, len(l.L))
copy(newServers, l.L)
// Overwrite the existing server details in order to
// possibly update metadata (e.g. server version)
newServers[idx] = s
l.L = newServers
found = true
break
}
}
// Add to the list if not known
if !found {
newServers := make([]*ServerEndpoint, len(l.L), len(l.L)+1)
copy(newServers, l.L)
newServers = append(newServers, s)
l.L = newServers
}
p.saveServerList(l)
return !found
}
// SetBackupServers sets a list of Nomad Servers to be used in the event that
// the Nomad Agent lost contact with the list of Nomad Servers provided via
// the Nomad Agent's heartbeat. If available, the backup servers are
// populated via Consul.
func (p *RPCProxy) SetBackupServers(addrs []string) error {
l := make([]*ServerEndpoint, 0, len(addrs))
for _, s := range addrs {
s, err := NewServerEndpoint(s)
if err != nil {
p.logger.Printf("[WARN] client.rpcproxy: unable to create backup server %+q: %v", s, err)
return fmt.Errorf("unable to create new backup server from %+q: %v", s, err)
}
l = append(l, s)
}
p.serverListLock.Lock()
p.backupServers.L = l
p.serverListLock.Unlock()
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
for _, s := range l {
p.activateEndpoint(s)
}
return nil
}
// AddPrimaryServer takes the RPC address of a Nomad server, creates a new
// endpoint, and adds it to both the primaryServers list and the active
// serverList used in the RPC Proxy. If the endpoint is not known by the
// RPCProxy, appends the endpoint to the list. The new endpoint will begin
// seeing use after the rebalance timer fires (or enough servers fail
// organically). Any values in the primary server list are overridden by the
// next successful heartbeat.
func (p *RPCProxy) AddPrimaryServer(rpcAddr string) *ServerEndpoint {
s, err := NewServerEndpoint(rpcAddr)
if err != nil {
p.logger.Printf("[WARN] client.rpcproxy: unable to create new primary server from endpoint %+q: %v", rpcAddr, err)
return nil
}
k := s.Key()
p.serverListLock.Lock()
if serverExists := p.primaryServers.serverExistByKey(k); serverExists {
p.serverListLock.Unlock()
return s
}
p.primaryServers.L = append(p.primaryServers.L, s)
p.serverListLock.Unlock()
p.activatedListLock.Lock()
p.activateEndpoint(s)
p.activatedListLock.Unlock()
return s
}
// cycleServers returns a new list of servers that has dequeued the first
// server and enqueued it at the end of the list. cycleServer assumes the
// caller is holding the listLock. cycleServer does not test or ping
// the next server inline. cycleServer may be called when the environment
// has just entered an unhealthy situation and blocking on a server test is
// less desirable than just returning the next server in the firing line. If
// the next server fails, it will fail fast enough and cycleServer will be
// called again.
func (l *serverList) cycleServer() (servers []*ServerEndpoint) {
numServers := len(l.L)
if numServers < 2 {
return servers // No action required
}
newServers := make([]*ServerEndpoint, 0, numServers)
newServers = append(newServers, l.L[1:]...)
newServers = append(newServers, l.L[0])
return newServers
}
// serverExistByKey performs a search to see if a server exists in the
// serverList. Assumes the caller is holding at least a read lock.
func (l *serverList) serverExistByKey(targetKey *EndpointKey) bool {
var found bool
for _, server := range l.L {
if targetKey.Equal(server.Key()) {
found = true
}
}
return found
}
// removeServerByKey performs an inline removal of the first matching server
func (l *serverList) removeServerByKey(targetKey *EndpointKey) {
for i, s := range l.L {
if targetKey.Equal(s.Key()) {
copy(l.L[i:], l.L[i+1:])
l.L[len(l.L)-1] = nil
l.L = l.L[:len(l.L)-1]
return
}
}
}
// shuffleServers shuffles the server list in place
func (l *serverList) shuffleServers() {
for i := len(l.L) - 1; i > 0; i-- {
j := rand.Int31n(int32(i + 1))
l.L[i], l.L[j] = l.L[j], l.L[i]
}
}
// String returns a string representation of serverList
func (l *serverList) String() string {
if len(l.L) == 0 {
return "empty server list"
}
serverStrs := make([]string, 0, len(l.L))
for _, server := range l.L {
serverStrs = append(serverStrs, server.String())
}
return fmt.Sprintf("[%s]", strings.Join(serverStrs, ", "))
}
// FindServer takes out an internal "read lock" and searches through the list
// of servers to find a "healthy" server. If the server is actually
// unhealthy, we rely on heartbeats to detect this and remove the node from
// the server list. If the server at the front of the list has failed or
// fails during an RPC call, it is rotated to the end of the list. If there
// are no servers available, return nil.
func (p *RPCProxy) FindServer() *ServerEndpoint {
l := p.getServerList()
numServers := len(l.L)
if numServers == 0 {
p.logger.Printf("[WARN] client.rpcproxy: No servers available")
return nil
}
// Return whatever is at the front of the list because it is
// assumed to be the oldest in the server list (unless -
// hypothetically - the server list was rotated right after a
// server was added).
return l.L[0]
}
// getServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (p *RPCProxy) getServerList() serverList {
return p.activatedList.Load().(serverList)
}
// saveServerList is a convenience method which hides the locking semantics
// of atomic.Value from the caller.
func (p *RPCProxy) saveServerList(l serverList) {
p.activatedList.Store(l)
}
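These two helpers are safe precisely because of the NOTE on serverList above: atomic.Value stores and returns the struct by value, so readers hold an immutable snapshot and writers must install a freshly built slice. A minimal self-contained illustration of the same copy-on-write pattern:

package snapshot

import "sync/atomic"

// list is stored in the atomic.Value by value: every Load returns a copy
// of the struct header, so readers get an immutable snapshot.
type list struct{ items []string }

var v atomic.Value

func init() { v.Store(list{}) } // seed so Load's type assertion never fails

func read() []string { return v.Load().(list).items }

func replace(items []string) {
	fresh := make([]string, len(items))
	copy(fresh, items) // never mutate a slice a reader may still hold
	v.Store(list{items: fresh})
}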
// LeaderAddr returns the current leader address. If an empty string, then
// the Nomad Server for this Nomad Agent is in the minority or the Nomad
// Servers are in the middle of an election.
func (p *RPCProxy) LeaderAddr() string {
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
return p.leaderAddr
}
// NotifyFailedServer marks the passed in server as "failed" by rotating it
// to the end of the server list.
func (p *RPCProxy) NotifyFailedServer(s *ServerEndpoint) {
l := p.getServerList()
// If the server being failed is not the first server on the list,
// this is a noop. If, however, the server is failed and first on
// the list, acquire the lock, retest, and take the penalty of moving
// the server to the end of the list.
// Only rotate the server list when there is more than one server
if len(l.L) > 1 && l.L[0] == s {
// Grab a lock, retest, and take the hit of cycling the first
// server to the end.
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
l = p.getServerList()
if len(l.L) > 1 && l.L[0] == s {
l.L = l.cycleServer()
p.saveServerList(l)
}
}
}
// NumNodes returns the estimated number of nodes according to the last Nomad
// Heartbeat.
func (p *RPCProxy) NumNodes() int {
return p.numNodes
}
// NumServers takes out an internal "read lock" and returns the number of
// servers. numServers includes both healthy and unhealthy servers.
func (p *RPCProxy) NumServers() int {
l := p.getServerList()
return len(l.L)
}
// RebalanceServers shuffles the list of servers on this agent. The server
// at the front of the list is selected for the next RPC. RPC calls that
// fail for a particular server are rotated to the end of the list. This
// method reshuffles the list periodically in order to redistribute work
// across all known Nomad servers (i.e. guarantee that the order of servers
// in the server list is not positively correlated with the age of a server
// in the Nomad cluster). Periodically shuffling the server list prevents
// long-lived clients from fixating on long-lived servers.
//
// Unhealthy servers are removed from the server list during the next client
// heartbeat. Before the newly shuffled server list is saved, the new remote
// endpoint is tested to ensure it's responsive.
func (p *RPCProxy) RebalanceServers() {
var serverListLocked bool
p.serverListLock.Lock()
serverListLocked = true
defer func() {
if serverListLocked {
p.serverListLock.Unlock()
}
}()
// Early abort if there is nothing to shuffle
if (len(p.primaryServers.L) + len(p.backupServers.L)) < 2 {
return
}
// Shuffle server lists independently
p.primaryServers.shuffleServers()
p.backupServers.shuffleServers()
// Create a new merged serverList
type targetServer struct {
server *ServerEndpoint
// 'p' == Primary Server
// 's' == Secondary/Backup Server
// 'b' == Both
state byte
}
mergedList := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(p.backupServers.L))
for _, s := range p.primaryServers.L {
mergedList[*s.Key()] = &targetServer{server: s, state: 'p'}
}
for _, s := range p.backupServers.L {
k := s.Key()
_, found := mergedList[*k]
if found {
mergedList[*k].state = 'b'
} else {
mergedList[*k] = &targetServer{server: s, state: 's'}
}
}
l := &serverList{L: make([]*ServerEndpoint, 0, len(mergedList))}
for _, s := range p.primaryServers.L {
l.L = append(l.L, s)
}
for _, v := range mergedList {
if v.state != 's' {
continue
}
l.L = append(l.L, v.server)
}
// Release the lock before we begin transition to operations on the
// network timescale and attempt to ping servers. A copy of the
// servers has been made at this point.
p.serverListLock.Unlock()
serverListLocked = false
// Iterate through the shuffled server list to find an assumed
// healthy server. NOTE: Do not iterate on the list directly because
// this loop mutates the server list in-place.
var foundHealthyServer bool
for i := 0; i < len(l.L); i++ {
// Always test the first server. Failed servers are cycled
// and eventually removed from the list when Nomad heartbeats
// detect the failed node.
selectedServer := l.L[0]
ok, err := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCMajorVersion(), selectedServer)
if ok {
foundHealthyServer = true
break
}
p.logger.Printf(`[DEBUG] client.rpcproxy: pinging server "%s" failed: %s`, selectedServer.String(), err)
l.cycleServer()
}
// If no healthy servers were found, sleep and wait for the admin to
// join this node to a server and begin receiving heartbeats with an
// updated list of Nomad servers. Or Consul will begin advertising a
// new server in the nomad service (Nomad server service).
if !foundHealthyServer {
p.logger.Printf("[DEBUG] client.rpcproxy: No healthy servers during rebalance, aborting")
return
}
// Verify that all servers are present. Reconcile will save the
// final serverList.
if p.reconcileServerList(l) {
p.logger.Printf("[TRACE] client.rpcproxy: Rebalanced %d servers, next active server is %s", len(l.L), l.L[0].String())
} else {
// reconcileServerList failed because Nomad removed the
// server that was at the front of the list that had
// successfully been Ping'ed. Between the Ping and
// reconcile, a Nomad heartbeat removed the node.
//
// Instead of doing any heroics, "freeze in place" and
// continue to use the existing connection until the next
// rebalance occurs.
}
return
}
// reconcileServerList returns true when the first server in serverList
// (l) exists in the receiver's serverList (p). If true, the merged
// serverList (l) is stored as the receiver's serverList (p). Returns
// false if the first server in p does not exist in the passed in list (l)
// (i.e. was removed by Nomad during a PingNomadServer() call). Newly added
// servers are appended to the list and other missing servers are removed
// from the list.
func (p *RPCProxy) reconcileServerList(l *serverList) bool {
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
// newServerList is a serverList that has been kept up-to-date with
// join and leave events.
newServerList := p.getServerList()
// If a Nomad heartbeat removed all nodes, or there is no selected
// server (zero nodes in serverList), abort early.
if len(newServerList.L) == 0 || len(l.L) == 0 {
return false
}
type targetServer struct {
server *ServerEndpoint
// 'b' == both
// 'o' == original
// 'n' == new
state byte
}
mergedList := make(map[EndpointKey]*targetServer, len(l.L))
for _, s := range l.L {
mergedList[*s.Key()] = &targetServer{server: s, state: 'o'}
}
for _, s := range newServerList.L {
k := s.Key()
_, found := mergedList[*k]
if found {
mergedList[*k].state = 'b'
} else {
mergedList[*k] = &targetServer{server: s, state: 'n'}
}
}
// Ensure the selected server has not been removed by a heartbeat
selectedServerKey := l.L[0].Key()
if v, found := mergedList[*selectedServerKey]; found && v.state == 'o' {
return false
}
// Append any new servers and remove any old servers
for k, v := range mergedList {
switch v.state {
case 'b':
// Do nothing, server exists in both
case 'o':
// Server has been removed
l.removeServerByKey(&k)
case 'n':
// Server added
l.L = append(l.L, v.server)
default:
panic("unknown merge list state")
}
}
p.saveServerList(*l)
return true
}
// RemoveServer takes out an internal write lock and removes a server from
// the activated server list.
func (p *RPCProxy) RemoveServer(s *ServerEndpoint) {
// Lock hierarchy protocol dictates serverListLock is acquired first.
p.serverListLock.Lock()
defer p.serverListLock.Unlock()
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
l := p.getServerList()
k := s.Key()
l.removeServerByKey(k)
p.saveServerList(l)
p.primaryServers.removeServerByKey(k)
p.backupServers.removeServerByKey(k)
}
// refreshServerRebalanceTimer is only called once p.rebalanceTimer expires.
func (p *RPCProxy) refreshServerRebalanceTimer() time.Duration {
l := p.getServerList()
numServers := len(l.L)
// Limit this connection's life based on the size (and health) of the
// cluster. Never rebalance a connection more frequently than
// connReuseLowWatermarkDuration, and make sure we never exceed
// clusterWideRebalanceConnsPerSec operations/s across numLANMembers.
clusterWideRebalanceConnsPerSec := float64(numServers * newRebalanceConnsPerSecPerServer)
connReuseLowWatermarkDuration := clientRPCMinReuseDuration + lib.RandomStagger(clientRPCMinReuseDuration/clientRPCJitterFraction)
numLANMembers := p.numNodes
connRebalanceTimeout := lib.RateScaledInterval(clusterWideRebalanceConnsPerSec, connReuseLowWatermarkDuration, numLANMembers)
p.rebalanceTimer.Reset(connRebalanceTimeout)
return connRebalanceTimeout
}
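Assuming lib.RateScaledInterval returns roughly n/rate seconds floored at the min duration (its approximate behavior in consul/lib), the low watermark dominates for modest clusters; a quick arithmetic sketch with hypothetical sizing:

// Hypothetical sizing: 5 servers, 10,000 nodes.
// clusterWideRebalanceConnsPerSec = 5 * 64 = 320 conns/s
// n/rate = 10000 / 320 ≈ 31s, far below the 600-900s
// connReuseLowWatermarkDuration, so the watermark is returned and each
// client keeps its connection for roughly 10-15 minutes. Only in very
// large clusters (n/rate > watermark) does the rate cap stretch the timer.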
// ResetRebalanceTimer resets the rebalance timer. This method exists for
// testing and should not be used directly.
func (p *RPCProxy) ResetRebalanceTimer() {
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
p.rebalanceTimer.Reset(clientRPCMinReuseDuration)
}
// ServerRPCAddrs returns one RPC Address per server
func (p *RPCProxy) ServerRPCAddrs() []string {
l := p.getServerList()
serverAddrs := make([]string, 0, len(l.L))
for _, s := range l.L {
serverAddrs = append(serverAddrs, s.Addr.String())
}
return serverAddrs
}
// Run is used to start and manage the task of automatically shuffling and
// rebalancing the list of Nomad servers. This maintenance only happens
// periodically based on the expiration of the timer. Failed servers are
// automatically cycled to the end of the list. New servers are appended to
// the list. The order of the server list must be shuffled periodically to
// distribute load across all known and available Nomad servers.
func (p *RPCProxy) Run() {
for {
select {
case <-p.rebalanceTimer.C:
p.RebalanceServers()
p.refreshServerRebalanceTimer()
case <-p.shutdownCh:
p.logger.Printf("[INFO] client.rpcproxy: shutting down")
return
}
}
}
// RefreshServerLists is called when the Client receives an update from a
// Nomad Server. The response from Nomad Client Heartbeats contain a list of
// Nomad Servers that the Nomad Client should use for RPC requests.
// RefreshServerLists does not rebalance its serverLists (that is handled
// elsewhere via a periodic timer). New Nomad Servers learned via the
// heartbeat are appended to the RPCProxy's activated serverList. Servers
// that are no longer present in the Heartbeat are removed immediately from
// all server lists. Nomad Servers speaking a newer major or minor API
// version are filtered from the serverList.
func (p *RPCProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNodes int32, leaderRPCAddr string) error {
// Merge all servers found in the response. Servers in the response
// with newer API versions are filtered from the list. If the list
// is missing an address found in the RPCProxy's server list, remove
// it from the RPCProxy.
p.serverListLock.Lock()
defer p.serverListLock.Unlock()
// Clear the backup server list when a heartbeat contains at least
// one server.
if len(servers) > 0 && len(p.backupServers.L) > 0 {
p.backupServers.L = make([]*ServerEndpoint, 0, len(servers))
}
// 1) Create a map to reconcile the difference between
// p.primaryServers and servers.
type targetServer struct {
server *ServerEndpoint
// 'b' == both
// 'o' == original
// 'n' == new
state byte
}
mergedPrimaryMap := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(servers))
numOldServers := 0
for _, s := range p.primaryServers.L {
mergedPrimaryMap[*s.Key()] = &targetServer{server: s, state: 'o'}
numOldServers++
}
numBothServers := 0
var newServers bool
for _, s := range servers {
// Filter out servers using a newer API version. Prevent
// spamming the logs every heartbeat.
//
// TODO(sean@): Move the logging throttle logic into a
// dedicated logging package so RPCProxy does not have to
// perform this accounting.
if int32(p.configInfo.RPCMajorVersion()) < s.RPCMajorVersion ||
(int32(p.configInfo.RPCMajorVersion()) == s.RPCMajorVersion &&
int32(p.configInfo.RPCMinorVersion()) < s.RPCMinorVersion) {
now := time.Now()
t, ok := p.rpcAPIMismatchThrottle[s.RPCAdvertiseAddr]
if ok && t.After(now) {
continue
}
p.logger.Printf("[WARN] client.rpcproxy: API mismatch between client version (v%d.%d) and server version (v%d.%d), ignoring server %+q", p.configInfo.RPCMajorVersion(), p.configInfo.RPCMinorVersion(), s.RPCMajorVersion, s.RPCMinorVersion, s.RPCAdvertiseAddr)
p.rpcAPIMismatchThrottle[s.RPCAdvertiseAddr] = now.Add(rpcAPIMismatchLogRate)
continue
}
server, err := NewServerEndpoint(s.RPCAdvertiseAddr)
if err != nil {
p.logger.Printf("[WARN] client.rpcproxy: Unable to create a server from %+q: %v", s.RPCAdvertiseAddr, err)
continue
}
// Nomad servers in different datacenters are automatically
// added to the backup server list.
if s.Datacenter != p.configInfo.Datacenter() {
p.backupServers.L = append(p.backupServers.L, server)
continue
}
k := server.Key()
_, found := mergedPrimaryMap[*k]
if found {
mergedPrimaryMap[*k].state = 'b'
numBothServers++
} else {
mergedPrimaryMap[*k] = &targetServer{server: server, state: 'n'}
newServers = true
}
}
// Short-circuit acquiring listLock if nothing changed
if !newServers && numOldServers == numBothServers {
return nil
}
p.activatedListLock.Lock()
defer p.activatedListLock.Unlock()
newServerCfg := p.getServerList()
for k, v := range mergedPrimaryMap {
switch v.state {
case 'b':
// Do nothing, server exists in both
case 'o':
// Server has been removed
// TODO(sean@): Teach Nomad servers how to remove
// themselves from their heartbeat in order to
// gracefully drain their clients over the next
// cluster's max rebalanceTimer duration. Without
// this enhancement, if a server is being shut down and
// it is the first in serverList, the client will
// fail its next RPC connection.
p.primaryServers.removeServerByKey(&k)
newServerCfg.removeServerByKey(&k)
case 'n':
// Server added. Append it to both lists
// immediately. The server should only go into
// active use in the event of a failure or after a
// rebalance occurs.
p.primaryServers.L = append(p.primaryServers.L, v.server)
newServerCfg.L = append(newServerCfg.L, v.server)
default:
panic("unknown merge list state")
}
}
p.numNodes = int(numNodes)
p.leaderAddr = leaderRPCAddr
p.saveServerList(newServerCfg)
return nil
}

View File

@@ -1,818 +0,0 @@
package rpcproxy
import (
"bytes"
"encoding/binary"
"fmt"
"log"
"math/rand"
"net"
"os"
"strings"
"sync/atomic"
"testing"
"time"
)
const (
ipv4len = 4
nodeNameFmt = "s%03d"
defaultNomadPort = "4647"
// Poached from RFC2544 and RFC3330
testingNetworkCidr = "198.18.0.0/15"
testingNetworkUint32 = 3323068416
)
var (
localLogger *log.Logger
localLogBuffer *bytes.Buffer
serverCount uint32
validIp uint32
)
func init() {
localLogBuffer = new(bytes.Buffer)
localLogger = log.New(localLogBuffer, "", 0)
}
func makeServerEndpointName() string {
serverNum := atomic.AddUint32(&serverCount, 1)
validIp := testingNetworkUint32 + serverNum
ipv4 := make(net.IP, ipv4len)
binary.BigEndian.PutUint32(ipv4, validIp)
return net.JoinHostPort(ipv4.String(), defaultNomadPort)
}
func GetBufferedLogger() *log.Logger {
return localLogger
}
type fauxConnPool struct {
// failPct between 0.0 and 1.0 == pct of time a Ping should fail
failPct float64
}
func (cp *fauxConnPool) PingNomadServer(region string, majorVersion int, s *ServerEndpoint) (bool, error) {
var success bool
successProb := rand.Float64()
if successProb > cp.failPct {
success = true
}
return success, nil
}
type fauxSerf struct {
datacenter string
numNodes int
region string
rpcMinorVersion int
rpcMajorVersion int
}
func (s *fauxSerf) NumNodes() int {
return s.numNodes
}
func (s *fauxSerf) Region() string {
return s.region
}
func (s *fauxSerf) Datacenter() string {
return s.datacenter
}
func (s *fauxSerf) RPCMajorVersion() int {
return s.rpcMajorVersion
}
func (s *fauxSerf) RPCMinorVersion() int {
return s.rpcMinorVersion
}
func testRPCProxy() (p *RPCProxy) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
p = NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
return p
}
func testRPCProxyFailProb(failPct float64) (p *RPCProxy) {
logger := GetBufferedLogger()
logger = log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
p = NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
return p
}
// func (p *RPCProxy) AddPrimaryServer(server *ServerEndpoint) {
func TestRPCProxy_AddPrimaryServer(t *testing.T) {
p := testRPCProxy()
var num int
num = p.NumServers()
if num != 0 {
t.Fatalf("Expected zero servers to start")
}
s1Endpoint := makeServerEndpointName()
s1 := p.AddPrimaryServer(s1Endpoint)
num = p.NumServers()
if num != 1 {
t.Fatalf("Expected one server")
}
if s1 == nil {
t.Fatalf("bad")
}
if s1.Name != s1Endpoint {
t.Fatalf("bad")
}
s1 = p.AddPrimaryServer(s1Endpoint)
num = p.NumServers()
if num != 1 {
t.Fatalf("Expected one server (still)")
}
if s1 == nil {
t.Fatalf("bad")
}
if s1.Name != s1Endpoint {
t.Fatalf("bad")
}
s2Endpoint := makeServerEndpointName()
s2 := p.AddPrimaryServer(s2Endpoint)
num = p.NumServers()
if num != 2 {
t.Fatalf("Expected two servers")
}
if s2 == nil {
t.Fatalf("bad")
}
if s2.Name != s2Endpoint {
t.Fatalf("bad")
}
}
// func (p *RPCProxy) FindServer() (server *ServerEndpoint) {
func TestRPCProxy_FindServer(t *testing.T) {
p := testRPCProxy()
if p.FindServer() != nil {
t.Fatalf("Expected nil return")
}
s1Endpoint := makeServerEndpointName()
p.AddPrimaryServer(s1Endpoint)
if p.NumServers() != 1 {
t.Fatalf("Expected one server")
}
s1 := p.FindServer()
if s1 == nil {
t.Fatalf("Expected non-nil server")
}
if s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server")
}
s1 = p.FindServer()
if s1 == nil || s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server (still)")
}
s2Endpoint := makeServerEndpointName()
p.AddPrimaryServer(s2Endpoint)
if p.NumServers() != 2 {
t.Fatalf("Expected two servers")
}
s1 = p.FindServer()
if s1 == nil || s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server (still)")
}
p.NotifyFailedServer(s1)
s2 := p.FindServer()
if s2 == nil || s2.Name != s2Endpoint {
t.Fatalf("Expected s2 server")
}
p.NotifyFailedServer(s2)
s1 = p.FindServer()
if s1 == nil || s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server")
}
}
// func New(logger *log.Logger, shutdownCh chan struct{}) (p *RPCProxy) {
func TestRPCProxy_New(t *testing.T) {
	// Swap in GetBufferedLogger() here to capture output instead.
	logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
p := NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
if p == nil {
t.Fatalf("RPCProxy nil")
}
}
// func (p *RPCProxy) NotifyFailedServer(server *ServerEndpoint) {
func TestRPCProxy_NotifyFailedServer(t *testing.T) {
p := testRPCProxy()
if p.NumServers() != 0 {
t.Fatalf("Expected zero servers to start")
}
// Try notifying for a server that is not managed by RPCProxy
s1Endpoint := makeServerEndpointName()
s1 := p.AddPrimaryServer(s1Endpoint)
if s1 == nil {
t.Fatalf("bad")
}
if p.NumServers() != 1 {
t.Fatalf("bad")
}
p.RemoveServer(s1)
if p.NumServers() != 0 {
t.Fatalf("bad")
}
p.NotifyFailedServer(s1)
s1 = p.AddPrimaryServer(s1Endpoint)
// Test again w/ a server not in the list
s2Endpoint := makeServerEndpointName()
s2 := p.AddPrimaryServer(s2Endpoint)
if s2 == nil {
t.Fatalf("bad")
}
if p.NumServers() != 2 {
t.Fatalf("bad")
}
p.RemoveServer(s2)
if p.NumServers() != 1 {
t.Fatalf("bad")
}
p.NotifyFailedServer(s2)
if p.NumServers() != 1 {
t.Fatalf("Expected one server")
}
// Re-add s2 so there are two servers in the RPCProxy server list
s2 = p.AddPrimaryServer(s2Endpoint)
if p.NumServers() != 2 {
t.Fatalf("Expected two servers")
}
// Find the first server, it should be s1
s1 = p.FindServer()
if s1 == nil || s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server")
}
// Notify s2 as failed, s1 should still be first
p.NotifyFailedServer(s2)
s1 = p.FindServer()
if s1 == nil || s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server (still)")
}
// Fail s1, s2 should be first
p.NotifyFailedServer(s1)
s2 = p.FindServer()
if s2 == nil || s2.Name != s2Endpoint {
t.Fatalf("Expected s2 server")
}
// Fail s2, s1 should be first
p.NotifyFailedServer(s2)
s1 = p.FindServer()
if s1 == nil || s1.Name != s1Endpoint {
t.Fatalf("Expected s1 server")
}
}
// func (p *RPCProxy) NumServers() (numServers int) {
func TestRPCProxy_NumServers(t *testing.T) {
p := testRPCProxy()
const maxNumServers = 100
serverList := make([]*ServerEndpoint, 0, maxNumServers)
// Add some servers
for i := 0; i < maxNumServers; i++ {
num := p.NumServers()
if num != i {
t.Fatalf("%d: Expected %d servers", i, num)
}
serverName := makeServerEndpointName()
s := p.AddPrimaryServer(serverName)
if s == nil {
t.Fatalf("Expected server from %+q", serverName)
}
serverList = append(serverList, s)
num = p.NumServers()
if num != i+1 {
t.Fatalf("%d: Expected %d servers", i, num+1)
}
}
// Remove some servers
for i := maxNumServers; i > 0; i-- {
num := p.NumServers()
if num != i {
t.Fatalf("%d: Expected %d servers", i, num)
}
p.RemoveServer(serverList[i-1])
num = p.NumServers()
if num != i-1 {
t.Fatalf("%d: Expected %d servers", i, num-1)
}
}
}
// func (p *RPCProxy) RebalanceServers() {
func TestRPCProxy_RebalanceServers(t *testing.T) {
const failPct = 0.5
p := testRPCProxyFailProb(failPct)
const maxServers = 100
const numShuffleTests = 100
const uniquePassRate = 0.5
// Make a huge list of nodes.
for i := 0; i < maxServers; i++ {
p.AddPrimaryServer(makeServerEndpointName())
}
// Keep track of how many unique shuffles we get.
uniques := make(map[string]struct{}, maxServers)
for i := 0; i < numShuffleTests; i++ {
p.RebalanceServers()
var names []string
for j := 0; j < maxServers; j++ {
server := p.FindServer()
p.NotifyFailedServer(server)
names = append(names, server.Name)
}
key := strings.Join(names, "|")
uniques[key] = struct{}{}
}
	// We have to allow for the fact that there won't always be a unique
	// shuffle on each pass, so we just sanity-check the ratio here rather
	// than make the test flaky.
if len(uniques) < int(maxServers*uniquePassRate) {
t.Fatalf("unique shuffle ratio too low: %d/%d", len(uniques), maxServers)
}
}
// func (p *RPCProxy) RemoveServer(server *ServerEndpoint) {
func TestRPCProxy_RemoveServer(t *testing.T) {
p := testRPCProxy()
if p.NumServers() != 0 {
t.Fatalf("Expected zero servers to start")
}
	// Add a server and verify it can be found, then removed
s1Endpoint := makeServerEndpointName()
s1 := p.AddPrimaryServer(s1Endpoint)
if p.NumServers() != 1 {
t.Fatalf("bad")
}
	if s1 == nil || s1.Name != s1Endpoint {
		t.Fatalf("Expected s1 server %+q, got %+v", s1Endpoint, s1)
	}
	s1 = p.FindServer()
	if s1 == nil || s1.Name != s1Endpoint {
		t.Fatalf("Expected s1 server %+q, got %+v", s1Endpoint, s1)
	}
p.RemoveServer(s1)
if p.NumServers() != 0 {
t.Fatalf("bad")
}
// Remove it a second time now that it doesn't exist
p.RemoveServer(s1)
if p.NumServers() != 0 {
t.Fatalf("bad")
}
p.AddPrimaryServer(s1Endpoint)
if p.NumServers() != 1 {
t.Fatalf("bad")
}
s2Endpoint := makeServerEndpointName()
s2 := p.AddPrimaryServer(s2Endpoint)
if p.NumServers() != 2 {
t.Fatalf("bad")
}
	if s2 == nil || s2.Name != s2Endpoint {
		t.Fatalf("Expected s2 server %+q, got %+v", s2Endpoint, s2)
	}
s1 = p.FindServer()
	if s1 == nil || s1.Name != s1Endpoint {
		t.Fatalf("Expected s1 %+q at the front of the list, got %+v", s1Endpoint, s1)
	}
// Move s1 to the back of the server list
p.NotifyFailedServer(s1)
s2 = p.FindServer()
if s2 == nil || s2.Name != s2Endpoint {
t.Fatalf("Expected s2 server: %+q", s2Endpoint)
}
p.RemoveServer(s2)
if p.NumServers() != 1 {
t.Fatalf("bad")
}
p.RemoveServer(s2)
if p.NumServers() != 1 {
t.Fatalf("bad")
}
p.AddPrimaryServer(s2Endpoint)
const maxServers = 19
servers := make([]*ServerEndpoint, 0, maxServers)
servers = append(servers, s1)
servers = append(servers, s2)
// Already added two servers above
for i := maxServers; i > 2; i-- {
server := p.AddPrimaryServer(makeServerEndpointName())
servers = append(servers, server)
}
if p.NumServers() != maxServers {
t.Fatalf("Expected %d servers, received %d", maxServers, p.NumServers())
}
p.RebalanceServers()
if p.NumServers() != maxServers {
t.Fatalf("Expected %d servers, received %d", maxServers, p.NumServers())
}
findServer := func(server *ServerEndpoint) bool {
for i := p.NumServers(); i > 0; i-- {
s := p.FindServer()
if s == server {
return true
}
}
return false
}
expectedNumServers := maxServers
removedServers := make([]*ServerEndpoint, 0, maxServers)
// Remove servers from the front of the list
for i := 3; i > 0; i-- {
server := p.FindServer()
if server == nil {
t.Fatalf("FindServer returned nil")
}
p.RemoveServer(server)
expectedNumServers--
if p.NumServers() != expectedNumServers {
t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers())
}
		if findServer(server) {
t.Fatalf("Did not expect to find server %s after removal from the front", server.Name)
}
removedServers = append(removedServers, server)
}
// Remove server from the end of the list
for i := 3; i > 0; i-- {
server := p.FindServer()
p.NotifyFailedServer(server)
p.RemoveServer(server)
expectedNumServers--
if p.NumServers() != expectedNumServers {
t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers())
}
		if findServer(server) {
t.Fatalf("Did not expect to find server %s", server.Name)
}
removedServers = append(removedServers, server)
}
// Remove server from the middle of the list
for i := 3; i > 0; i-- {
server := p.FindServer()
p.NotifyFailedServer(server)
server2 := p.FindServer()
p.NotifyFailedServer(server2) // server2 now at end of the list
p.RemoveServer(server)
expectedNumServers--
if p.NumServers() != expectedNumServers {
t.Fatalf("Expected %d servers (got %d)", expectedNumServers, p.NumServers())
}
		if findServer(server) {
t.Fatalf("Did not expect to find server %s", server.Name)
}
removedServers = append(removedServers, server)
}
if p.NumServers()+len(removedServers) != maxServers {
t.Fatalf("Expected %d+%d=%d servers", p.NumServers(), len(removedServers), maxServers)
}
// Drain the remaining servers from the middle
for i := p.NumServers(); i > 0; i-- {
server := p.FindServer()
p.NotifyFailedServer(server)
server2 := p.FindServer()
p.NotifyFailedServer(server2) // server2 now at end of the list
p.RemoveServer(server)
removedServers = append(removedServers, server)
}
if p.NumServers() != 0 {
t.Fatalf("Expected an empty server list")
}
if len(removedServers) != maxServers {
t.Fatalf("Expected all servers to be in removed server list")
}
}
// func (p *RPCProxy) Start() {
// func (l *serverList) cycleServer() (servers []*Server) {
func TestRPCProxyInternal_cycleServer(t *testing.T) {
p := testRPCProxy()
l := p.getServerList()
server0 := &ServerEndpoint{Name: "server1"}
server1 := &ServerEndpoint{Name: "server2"}
server2 := &ServerEndpoint{Name: "server3"}
l.L = append(l.L, server0, server1, server2)
p.saveServerList(l)
l = p.getServerList()
if len(l.L) != 3 {
t.Fatalf("server length incorrect: %d/3", len(l.L))
}
if l.L[0] != server0 &&
l.L[1] != server1 &&
l.L[2] != server2 {
t.Fatalf("initial server ordering not correct")
}
l.L = l.cycleServer()
if len(l.L) != 3 {
t.Fatalf("server length incorrect: %d/3", len(l.L))
}
if l.L[0] != server1 &&
l.L[1] != server2 &&
l.L[2] != server0 {
t.Fatalf("server ordering after one cycle not correct")
}
l.L = l.cycleServer()
if len(l.L) != 3 {
t.Fatalf("server length incorrect: %d/3", len(l.L))
}
if l.L[0] != server2 &&
l.L[1] != server0 &&
l.L[2] != server1 {
t.Fatalf("server ordering after two cycles not correct")
}
l.L = l.cycleServer()
if len(l.L) != 3 {
t.Fatalf("server length incorrect: %d/3", len(l.L))
}
if l.L[0] != server0 &&
l.L[1] != server1 &&
l.L[2] != server2 {
t.Fatalf("server ordering after three cycles not correct")
}
}
// func (p *RPCProxy) getServerList() serverList {
func TestRPCProxyInternal_getServerList(t *testing.T) {
p := testRPCProxy()
l := p.getServerList()
	if l.L == nil {
		t.Fatalf("serverList.L is nil")
	}
	if len(l.L) != 0 {
		t.Fatalf("serverList.L is not empty")
	}
}
func TestRPCProxyInternal_New(t *testing.T) {
p := testRPCProxy()
if p == nil {
t.Fatalf("bad")
}
if p.logger == nil {
t.Fatalf("bad")
}
if p.shutdownCh == nil {
t.Fatalf("bad")
}
}
// func (p *RPCProxy) reconcileServerList(l *serverList) bool {
func TestRPCProxyInternal_reconcileServerList(t *testing.T) {
tests := []int{0, 1, 2, 3, 4, 5, 10, 100}
for _, n := range tests {
ok, err := test_reconcileServerList(n)
if !ok {
t.Errorf("Expected %d to pass: %v", n, err)
}
}
}
func test_reconcileServerList(maxServers int) (bool, error) {
// Build a server list, reconcile, verify the missing servers are
// missing, the added have been added, and the original server is
// present.
const failPct = 0.5
p := testRPCProxyFailProb(failPct)
var failedServers, healthyServers []*ServerEndpoint
for i := 0; i < maxServers; i++ {
nodeName := fmt.Sprintf("s%02d", i)
node := &ServerEndpoint{Name: nodeName}
		// Add roughly two-thirds of the servers to the RPCProxy
		if rand.Float64() > 0.33 {
			p.activateEndpoint(node)
			// Of the healthy servers, (ab)use connPoolPinger to
			// fail failPct of them for the reconcile. This
			// allows the selected server to no longer be
			// healthy for the reconcile below.
if ok, _ := p.connPoolPinger.PingNomadServer(p.configInfo.Region(), p.configInfo.RPCMajorVersion(), node); ok {
// Will still be present
healthyServers = append(healthyServers, node)
} else {
// Will be missing
failedServers = append(failedServers, node)
}
} else {
// Will be added from the call to reconcile
healthyServers = append(healthyServers, node)
}
}
// Randomize RPCProxy's server list
p.RebalanceServers()
selectedServer := p.FindServer()
var selectedServerFailed bool
for _, s := range failedServers {
if selectedServer.Key().Equal(s.Key()) {
selectedServerFailed = true
break
}
}
// Update RPCProxy's server list to be "healthy" based on Serf.
// Reconcile this with origServers, which is shuffled and has a live
// connection, but possibly out of date.
origServers := p.getServerList()
p.saveServerList(serverList{L: healthyServers})
// This should always succeed with non-zero server lists
if !selectedServerFailed && !p.reconcileServerList(&origServers) &&
len(p.getServerList().L) != 0 &&
len(origServers.L) != 0 {
// If the random gods are unfavorable and we end up with zero
// length lists, expect things to fail and retry the test.
return false, fmt.Errorf("Expected reconcile to succeed: %v %d %d",
selectedServerFailed,
len(p.getServerList().L),
len(origServers.L))
}
	// Degenerate case: when both lists end up empty, reconcile reports
	// failure as expected and the test passes.
	if len(p.getServerList().L) == 0 &&
		len(origServers.L) == 0 {
		return true, nil
}
resultingServerMap := make(map[EndpointKey]bool)
for _, s := range p.getServerList().L {
resultingServerMap[*s.Key()] = true
}
	// Make sure no failed servers remain in the RPCProxy's list; error
	// if any of failedServers is still present.
for _, s := range failedServers {
_, ok := resultingServerMap[*s.Key()]
if ok {
return false, fmt.Errorf("Found failed server %v in merged list %v", s, resultingServerMap)
}
}
	// Make sure the healthy list and the RPCProxy's list are the same size.
	if len(healthyServers) != len(p.getServerList().L) {
		return false, fmt.Errorf("Expected healthy servers and RPCProxy list to match: %d/%d", len(healthyServers), len(p.getServerList().L))
}
// Test to make sure all healthy servers are in the resultingServerMap list.
for _, s := range healthyServers {
_, ok := resultingServerMap[*s.Key()]
if !ok {
return false, fmt.Errorf("Server %v missing from healthy map after merged lists", s)
}
}
return true, nil
}
// func (l *serverList) refreshServerRebalanceTimer() {
func TestRPCProxyInternal_refreshServerRebalanceTimer(t *testing.T) {
type clusterSizes struct {
numNodes int
numServers int
minRebalance time.Duration
}
clusters := []clusterSizes{
{0, 3, 10 * time.Minute},
{1, 0, 10 * time.Minute}, // partitioned cluster
{1, 3, 10 * time.Minute},
{2, 3, 10 * time.Minute},
{100, 0, 10 * time.Minute}, // partitioned
{100, 1, 10 * time.Minute}, // partitioned
{100, 3, 10 * time.Minute},
{1024, 1, 10 * time.Minute}, // partitioned
{1024, 3, 10 * time.Minute}, // partitioned
{1024, 5, 10 * time.Minute},
{16384, 1, 10 * time.Minute}, // partitioned
{16384, 2, 10 * time.Minute}, // partitioned
{16384, 3, 10 * time.Minute}, // partitioned
{16384, 5, 10 * time.Minute},
{65535, 0, 10 * time.Minute}, // partitioned
{65535, 1, 10 * time.Minute}, // partitioned
{65535, 2, 10 * time.Minute}, // partitioned
{65535, 3, 10 * time.Minute}, // partitioned
{65535, 5, 10 * time.Minute}, // partitioned
{65535, 7, 10 * time.Minute},
{1000000, 1, 10 * time.Minute}, // partitioned
{1000000, 2, 10 * time.Minute}, // partitioned
{1000000, 3, 10 * time.Minute}, // partitioned
{1000000, 5, 10 * time.Minute}, // partitioned
{1000000, 11, 10 * time.Minute}, // partitioned
{1000000, 19, 10 * time.Minute},
}
logger := log.New(os.Stderr, "", log.LstdFlags)
shutdownCh := make(chan struct{})
	for i, s := range clusters {
		p := NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
		for j := 0; j < s.numServers; j++ {
			nodeName := fmt.Sprintf("s%02d", j)
			p.activateEndpoint(&ServerEndpoint{Name: nodeName})
		}
d := p.refreshServerRebalanceTimer()
if d < s.minRebalance {
t.Errorf("[%d] duration too short for cluster of size %d and %d servers (%s < %s)", i, s.numNodes, s.numServers, d, s.minRebalance)
}
}
}
// func (p *RPCProxy) saveServerList(l serverList) {
func TestRPCProxyInternal_saveServerList(t *testing.T) {
p := testRPCProxy()
// Initial condition
func() {
l := p.getServerList()
if len(l.L) != 0 {
t.Fatalf("RPCProxy.saveServerList failed to load init config")
}
newServer := new(ServerEndpoint)
l.L = append(l.L, newServer)
p.saveServerList(l)
}()
// Test that save works
func() {
l1 := p.getServerList()
t1NumServers := len(l1.L)
if t1NumServers != 1 {
t.Fatalf("RPCProxy.saveServerList failed to save mutated config")
}
}()
// Verify mutation w/o a save doesn't alter the original
func() {
newServer := new(ServerEndpoint)
l := p.getServerList()
l.L = append(l.L, newServer)
		lOrig := p.getServerList()
		origNumServers := len(lOrig.L)
		if origNumServers >= len(l.L) {
t.Fatalf("RPCProxy.saveServerList unsaved config overwrote original")
}
}()
}

View File

@@ -1,84 +0,0 @@
package rpcproxy
import (
"fmt"
"net"
"strings"
)
const (
defaultNomadRPCPort = "4647"
)
// EndpointKey is used in maps and for equality tests. A key is based on endpoints.
type EndpointKey struct {
name string
}
// Equal compares two EndpointKey objects
func (k *EndpointKey) Equal(x *EndpointKey) bool {
return k.name == x.name
}
// ServerEndpoint contains the address information needed to connect to a
// Nomad server.
//
// TODO(sean@): Server is stubbed out so that in the future it can hold a
// reference to Node (and ultimately Node.ID).
type ServerEndpoint struct {
// Name is the unique lookup key for a Server instance
Name string
Host string
Port string
Addr net.Addr
}
// Key returns the EndpointKey for this server
func (s *ServerEndpoint) Key() *EndpointKey {
return &EndpointKey{
name: s.Name,
}
}
// NewServerEndpoint creates a new Server instance with a resolvable
// endpoint. `name` can be either an IP address or a DNS name. If `name` is
// a DNS name, it must be resolvable to an IP address (most inputs are IP
// addresses, not DNS names, but both work equally well when the name is
// resolvable).
func NewServerEndpoint(name string) (*ServerEndpoint, error) {
s := &ServerEndpoint{
Name: name,
}
var host, port string
var err error
host, port, err = net.SplitHostPort(name)
if err == nil {
s.Host = host
s.Port = port
} else {
if strings.Contains(err.Error(), "missing port") {
s.Host = name
s.Port = defaultNomadRPCPort
} else {
return nil, err
}
}
if s.Addr, err = net.ResolveTCPAddr("tcp", net.JoinHostPort(s.Host, s.Port)); err != nil {
return nil, err
}
return s, err
}
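// Illustrative examples (hypothetical inputs, not part of the original
// package):
//
//	s, _ := NewServerEndpoint("127.0.0.1")    // port defaults to 4647
//	s, _ = NewServerEndpoint("10.0.0.5:4648") // explicit port is kept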
// String returns a string representation of Server
func (s *ServerEndpoint) String() string {
var addrStr, networkStr string
if s.Addr != nil {
addrStr = s.Addr.String()
networkStr = s.Addr.Network()
}
return fmt.Sprintf("%s (%s:%s)", s.Name, networkStr, addrStr)
}

View File

@@ -1,77 +0,0 @@
package rpcproxy
import (
"fmt"
"net"
"testing"
)
// func (k *EndpointKey) Equal(x *EndpointKey) {
func TestServerEndpointKey_Equal(t *testing.T) {
tests := []struct {
name string
s1 *ServerEndpoint
s2 *ServerEndpoint
equal bool
}{
{
name: "equal",
s1: &ServerEndpoint{Name: "k1"},
s2: &ServerEndpoint{Name: "k1"},
equal: true,
},
{
name: "not equal",
s1: &ServerEndpoint{Name: "k1"},
s2: &ServerEndpoint{Name: "k2"},
equal: false,
},
}
for _, test := range tests {
if test.s1.Key().Equal(test.s2.Key()) != test.equal {
t.Errorf("fixture %s failed forward comparison", test.name)
}
if test.s2.Key().Equal(test.s1.Key()) != test.equal {
t.Errorf("fixture %s failed reverse comparison", test.name)
}
}
}
// func (k *ServerEndpoint) String() {
func TestServerEndpoint_String(t *testing.T) {
tests := []struct {
name string
s *ServerEndpoint
str string
}{
{
name: "name",
s: &ServerEndpoint{Name: "s"},
str: "s (:)",
},
{
name: "name, host, port",
s: &ServerEndpoint{
Name: "s",
Host: "127.0.0.1",
Port: "4647",
},
str: "s (tcp:127.0.0.1:4647)",
},
}
for _, test := range tests {
if test.s.Addr == nil && (test.s.Host != "" && test.s.Port != "") {
fmt.Printf("Setting addr\n")
addr, err := net.ResolveTCPAddr("tcp", net.JoinHostPort(test.s.Host, test.s.Port))
if err == nil {
test.s.Addr = addr
}
}
if test.s.String() != test.str {
t.Errorf("fixture %q failed: %q vs %q", test.name, test.s.String(), test.str)
}
}
}

111
client/serverlist.go Normal file
View File

@@ -0,0 +1,111 @@
package client
import (
"math/rand"
"net"
"sort"
"strings"
"sync"
)
// serverlist is a prioritized randomized list of nomad servers. Users should
// call all() to retrieve the full list, followed by failed(e) on each endpoint
// that's failed and good(e) when a valid endpoint is found.
type serverlist struct {
e endpoints
mu sync.RWMutex
}
func newServerList() *serverlist {
return &serverlist{}
}
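// An illustrative sketch (not part of this file) of the call pattern the
// serverlist comment above describes; tryRPC stands in for a hypothetical
// single-server RPC attempt:
//
//	func rpcAnyServer(s *serverlist, tryRPC func(e *endpoint) error) error {
//		for _, e := range s.all() {
//			if err := tryRPC(e); err != nil {
//				s.failed(e) // deprioritize for the next pass
//				continue
//			}
//			s.good(e) // promote back to the top priority tier
//			return nil
//		}
//		return errors.New("no servers succeeded")
//	}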
// set replaces the server list with a new list. Shuffling and priority
// sorting happen when the list is read back via all().
func (s *serverlist) set(in endpoints) {
s.mu.Lock()
s.e = in
s.mu.Unlock()
}
// all returns a copy of the full server list, shuffled and then sorted by
// priority
func (s *serverlist) all() endpoints {
s.mu.RLock()
out := make(endpoints, len(s.e))
copy(out, s.e)
s.mu.RUnlock()
	// Shuffle first so endpoints within the same priority tier come back
	// in random order after the sort below
for i, j := range rand.Perm(len(out)) {
out[i], out[j] = out[j], out[i]
}
// Sort by priority
sort.Sort(out)
return out
}
// failed deprioritizes an endpoint if it's still in the list.
func (s *serverlist) failed(e *endpoint) {
s.mu.Lock()
defer s.mu.Unlock()
for _, cur := range s.e {
if cur.equal(e) {
cur.priority++
return
}
}
}
// good promotes an endpoint to the highest priority if it's still in the
// list.
func (s *serverlist) good(e *endpoint) {
s.mu.Lock()
defer s.mu.Unlock()
for _, cur := range s.e {
if cur.equal(e) {
cur.priority = 0
return
}
}
}
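// endpoints implements sort.Interface so all() can order its copy of the
// list by priority after shuffling.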
func (e endpoints) Len() int {
return len(e)
}
func (e endpoints) Less(i int, j int) bool {
	// Sort only by priority; ordering within a priority tier comes from
	// the shuffle in all()
return e[i].priority < e[j].priority
}
func (e endpoints) Swap(i int, j int) {
e[i], e[j] = e[j], e[i]
}
type endpoints []*endpoint
func (e endpoints) String() string {
names := make([]string, 0, len(e))
for _, endpoint := range e {
names = append(names, endpoint.name)
}
return strings.Join(names, ",")
}
type endpoint struct {
name string
addr net.Addr
// 0 being the highest priority
priority int
}
// equal returns true if the name and addr match between two endpoints.
// Priority is ignored because the same endpoint may be added by discovery and
// heartbeating with different priorities.
func (e *endpoint) equal(o *endpoint) bool {
return e.name == o.name && e.addr == o.addr
}
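// For example (hypothetical values), two entries for the same server
// compare equal even after their priorities have diverged:
//
//	a := &endpoint{name: "srv1", priority: 0}
//	b := &endpoint{name: "srv1", priority: 3}
//	a.equal(b) // true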

117
client/serverlist_test.go Normal file
View File

@@ -0,0 +1,117 @@
package client
import (
"log"
"os"
"strings"
"testing"
)
func TestServerList(t *testing.T) {
s := newServerList()
// New lists should be empty
if e := s.all(); len(e) != 0 {
t.Fatalf("expected empty list to return an empty list, but received: %+q", e)
}
mklist := func() endpoints {
return endpoints{
&endpoint{"b", nil, 1},
&endpoint{"c", nil, 1},
&endpoint{"g", nil, 2},
&endpoint{"d", nil, 1},
&endpoint{"e", nil, 1},
&endpoint{"f", nil, 1},
&endpoint{"h", nil, 2},
&endpoint{"a", nil, 0},
}
}
s.set(mklist())
orig := mklist()
all := s.all()
if len(all) != len(orig) {
t.Fatalf("expected %d endpoints but only have %d", len(orig), len(all))
}
// Assert list is properly randomized+sorted
for i, pri := range []int{0, 1, 1, 1, 1, 1, 2, 2} {
if all[i].priority != pri {
t.Errorf("expected endpoint %d (%+q) to be priority %d", i, all[i], pri)
}
}
// Subsequent sets should reshuffle (try multiple times as they may
// shuffle in the same order)
tries := 0
max := 3
for ; tries < max; tries++ {
if s.all().String() == s.all().String() {
// eek, matched; try again in case we just got unlucky
continue
}
break
}
if tries == max {
t.Fatalf("after %d attempts servers were still not random reshuffled", tries)
}
// Mark an endpoint as failed enough that it should be at the end of the list
sa := &endpoint{"a", nil, 0}
s.failed(sa)
s.failed(sa)
s.failed(sa)
all2 := s.all()
if len(all2) != len(orig) {
t.Fatalf("marking should not have changed list length")
}
	if all2[len(all2)-1].name != sa.name {
t.Fatalf("failed endpoint should be at end of list: %+q", all2)
}
// But if the bad endpoint succeeds even once it should be bumped to the top group
s.good(sa)
found := false
for _, e := range s.all() {
if e.name == sa.name {
if e.priority != 0 {
t.Fatalf("server newly marked good should have highest priority")
}
found = true
}
}
if !found {
t.Fatalf("what happened to endpoint A?!")
}
}
// TestClient_ServerList tests client methods that interact with the internal
// nomad server list.
func TestClient_ServerList(t *testing.T) {
// manually create a mostly empty client to avoid spinning up a ton of
// goroutines that complicate testing
client := Client{servers: newServerList(), logger: log.New(os.Stderr, "", log.Ltime|log.Lshortfile)}
if s := client.GetServers(); len(s) != 0 {
t.Fatalf("expected server lit to be empty but found: %+q", s)
}
if err := client.SetServers(nil); err != noServersErr {
t.Fatalf("expected setting an empty list to return a 'no servers' error but received %v", err)
}
if err := client.SetServers([]string{"not-a-real-domain.fake"}); err == nil {
t.Fatalf("expected setting a bad server to return an error")
}
if err := client.SetServers([]string{"bad.fake", "127.0.0.1:1234", "127.0.0.1"}); err != nil {
t.Fatalf("expected setting at least one good server to succeed but received: %v", err)
}
s := client.GetServers()
if len(s) != 2 {
t.Fatalf("expected 2 servers but received: %+q", s)
}
for _, host := range s {
if !strings.HasPrefix(host, "127.0.0.1:") {
t.Errorf("expected both servers to be localhost and include port but found: %s", host)
}
}
}

View File

@@ -139,7 +139,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
return nil, CodedError(501, ErrInvalidMethod)
}
peers := s.agent.client.RPCProxy().ServerRPCAddrs()
peers := s.agent.client.GetServers()
return peers, nil
}
@@ -156,12 +156,11 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
}
// Set the servers list into the client
for _, server := range servers {
s.agent.logger.Printf("[TRACE] Adding server %s to the client's primary server list", server)
se := client.AddPrimaryServerToRPCProxy(server)
if se == nil {
s.agent.logger.Printf("[ERR] Attempt to add server %q to client failed", server)
}
s.agent.logger.Printf("[TRACE] Adding servers %+q to the client's primary server list", servers)
if err := client.SetServers(servers); err != nil {
s.agent.logger.Printf("[ERR] Attempt to add servers %q to client failed: %v", servers, err)
//TODO is this the right error to return?
return nil, CodedError(400, err.Error())
}
return nil, nil
}
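// For reference, this handler serves requests of the form exercised by the
// agent tests below, for example:
//
//	PUT /v1/agent/servers?address=127.0.0.1:4647&address=127.0.0.2:4647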

View File

@@ -114,13 +114,6 @@ func TestHTTP_AgentSetServers(t *testing.T) {
}
respW := httptest.NewRecorder()
// Make the request and check the result
out, err := s.Server.AgentServersRequest(respW, req)
if err != nil {
t.Fatalf("err: %s", err)
}
numServers := len(out.([]string))
// Create the request
req, err = http.NewRequest("PUT", "/v1/agent/servers", nil)
if err != nil {
@@ -135,7 +128,7 @@ func TestHTTP_AgentSetServers(t *testing.T) {
}
// Create a valid request
req, err = http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647", nil)
req, err = http.NewRequest("PUT", "/v1/agent/servers?address=127.0.0.1%3A4647&address=127.0.0.2%3A4647&address=127.0.0.3%3A4647", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@@ -158,14 +151,15 @@ func TestHTTP_AgentSetServers(t *testing.T) {
expected := map[string]bool{
"127.0.0.1:4647": true,
"127.0.0.2:4647": true,
"127.0.0.3:4647": true,
}
out, err = s.Server.AgentServersRequest(respW, req)
out, err := s.Server.AgentServersRequest(respW, req)
if err != nil {
t.Fatalf("err: %s", err)
}
servers := out.([]string)
if n := len(servers); n != numServers+2 {
t.Fatalf("expected %d servers, got: %d: %v", numServers+2, n, servers)
if n := len(servers); n != len(expected) {
t.Fatalf("expected %d servers, got: %d: %v", len(expected), n, servers)
}
received := make(map[string]bool, len(servers))
for _, server := range servers {

View File

@@ -12,7 +12,6 @@ import (
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/client/rpcproxy"
"github.com/hashicorp/yamux"
)
@@ -376,9 +375,9 @@ func (p *ConnPool) RPC(region string, addr net.Addr, version int, method string,
// PingNomadServer sends a Status.Ping message to the specified server and
// returns true if healthy, false if an error occurred
func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s *rpcproxy.ServerEndpoint) (bool, error) {
func (p *ConnPool) PingNomadServer(region string, apiMajorVersion int, s net.Addr) (bool, error) {
// Get a usable client
conn, sc, err := p.getClient(region, s.Addr, apiMajorVersion)
conn, sc, err := p.getClient(region, s, apiMajorVersion)
if err != nil {
return false, err
}