golint(1) police
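
This commit renames the Rpc-prefixed identifiers to their RPC forms across the client, the rpcproxy package and its tests, and the agent's HTTP endpoints; it also adds the doc comments golint wants on exported names and unindents a return that sat in an else block. For readers unfamiliar with the rule being enforced, here is a minimal, hypothetical illustration of golint's initialism convention (not part of the commit):

```go
package example

// ServerId mixes the case of the initialism "ID"; golint flags it with a
// message along the lines of "struct field ServerId should be ServerID".
type renameMe struct {
	ServerId string
}

// ServerID keeps the initialism uniformly upper-case, so golint is satisfied.
type lintClean struct {
	ServerID string
}
```
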
@@ -101,7 +101,7 @@ type Client struct {
 
 	logger *log.Logger
 
-	rpcProxy *rpcproxy.RpcProxy
+	rpcProxy *rpcproxy.RPCProxy
 
 	connPool *nomad.ConnPool
 
@@ -198,7 +198,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error)
 	// Create the RPC Proxy and bootstrap with the preconfigured list of
 	// static servers
 	c.configLock.RLock()
-	c.rpcProxy = rpcproxy.New(c.logger, c.shutdownCh, c, c.connPool)
+	c.rpcProxy = rpcproxy.NewRPCProxy(c.logger, c.shutdownCh, c, c.connPool)
 	for _, serverAddr := range c.configCopy.Servers {
 		c.rpcProxy.AddPrimaryServer(serverAddr)
 	}
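
A usage sketch of the constructor and bootstrap sequence above (illustrative only: the logger and addresses are stand-ins, the import path is assumed from the package name, and real callers pass the client itself and its connection pool as the NomadConfigInfo and Pinger arguments, as the hunk shows):

```go
package main

import (
	"log"
	"os"

	"github.com/hashicorp/nomad/client/rpcproxy"
)

func main() {
	logger := log.New(os.Stderr, "", log.LstdFlags)
	shutdownCh := make(chan struct{})

	// Placeholders: these must be real implementations before the proxy
	// can ping servers or read RPC version information.
	var configInfo rpcproxy.NomadConfigInfo
	var connPoolPinger rpcproxy.Pinger

	p := rpcproxy.NewRPCProxy(logger, shutdownCh, configInfo, connPoolPinger)
	for _, addr := range []string{"10.0.1.10:4647", "10.0.1.11:4647"} {
		p.AddPrimaryServer(addr)
	}
	go p.Run() // periodic shuffling until shutdownCh is closed

	// ... later, during shutdown:
	close(shutdownCh)
}
```
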
@@ -229,7 +229,7 @@ func NewClient(cfg *config.Config, consulSyncer *consul.Syncer) (*Client, error)
 	// Start collecting stats
 	go c.collectHostStats()
 
-	// Start the RpcProxy maintenance task. This task periodically
+	// Start the RPCProxy maintenance task. This task periodically
 	// shuffles the list of Nomad Server Endpoints this Client will use
 	// when communicating with Nomad Servers via RPC. This is done in
 	// order to prevent server fixation in stable Nomad clusters. This
@@ -462,9 +462,9 @@ func (c *Client) GetAllocFS(allocID string) (allocdir.AllocDirFS, error) {
 	return ar.ctx.AllocDir, nil
 }
 
-// AddPrimaryServerToRpcProxy adds serverAddr to the RPC Proxy's primary
+// AddPrimaryServerToRPCProxy adds serverAddr to the RPC Proxy's primary
 // server list.
-func (c *Client) AddPrimaryServerToRpcProxy(serverAddr string) *rpcproxy.ServerEndpoint {
+func (c *Client) AddPrimaryServerToRPCProxy(serverAddr string) *rpcproxy.ServerEndpoint {
 	return c.rpcProxy.AddPrimaryServer(serverAddr)
 }
 
@@ -1415,6 +1415,6 @@ func (c *Client) emitStats(hStats *stats.HostStats) {
 	}
 }
 
-func (c *Client) RpcProxy() *rpcproxy.RpcProxy {
+func (c *Client) RPCProxy() *rpcproxy.RPCProxy {
 	return c.rpcProxy
 }
@@ -1,5 +1,5 @@
 // Package rpcproxy provides a proxy interface to Nomad Servers. The
-// RpcProxy periodically shuffles which server a Nomad Client communicates
+// RPCProxy periodically shuffles which server a Nomad Client communicates
 // with in order to redistribute load across Nomad Servers. Nomad Servers
 // that fail an RPC request are automatically cycled to the end of the list
 // until the server list is reshuffled.
@@ -83,7 +83,9 @@ type serverList struct {
 	L []*ServerEndpoint
 }
 
-type RpcProxy struct {
+// RPCProxy is the manager type responsible for returning and managing Nomad
+// addresses.
+type RPCProxy struct {
 	// activatedList manages the list of Nomad Servers that are eligible
 	// to be queried by the Client agent.
 	activatedList atomic.Value
@@ -95,7 +97,7 @@ type RpcProxy struct {
 	primaryServers serverList
 
 	// backupServers is a list of fallback servers. These servers are
-	// appended to the RpcProxy's serverList, but are never shuffled with
+	// appended to the RPCProxy's serverList, but are never shuffled with
 	// the list of servers discovered via the Nomad heartbeat. Covered
 	// by serverListLock.
 	backupServers serverList
@@ -133,14 +135,15 @@ type RpcProxy struct {
 	notifyFailedBarrier int32
 }
 
-// New is the only way to safely create a new RpcProxy.
-func New(logger *log.Logger, shutdownCh chan struct{}, configInfo NomadConfigInfo, connPoolPinger Pinger) (p *RpcProxy) {
-	p = new(RpcProxy)
-	p.logger = logger
-	p.configInfo = configInfo // can't pass *nomad.Client: import cycle
-	p.connPoolPinger = connPoolPinger // can't pass *nomad.ConnPool: import cycle
-	p.rebalanceTimer = time.NewTimer(clientRPCMinReuseDuration)
-	p.shutdownCh = shutdownCh
+// NewRPCProxy is the only way to safely create a new RPCProxy.
+func NewRPCProxy(logger *log.Logger, shutdownCh chan struct{}, configInfo NomadConfigInfo, connPoolPinger Pinger) *RPCProxy {
+	p := &RPCProxy{
+		logger:         logger,
+		configInfo:     configInfo,     // can't pass *nomad.Client: import cycle
+		connPoolPinger: connPoolPinger, // can't pass *nomad.ConnPool: import cycle
+		rebalanceTimer: time.NewTimer(clientRPCMinReuseDuration),
+		shutdownCh:     shutdownCh,
+	}
 
 	l := serverList{}
 	l.L = make([]*ServerEndpoint, 0)
@@ -148,10 +151,10 @@ func New(logger *log.Logger, shutdownCh chan struct{}, configInfo NomadConfigInf
 	return p
 }
 
-// activateEndpoint adds an endpoint to the RpcProxy's active serverList.
+// activateEndpoint adds an endpoint to the RPCProxy's active serverList.
 // Returns true if the server was added, returns false if the server already
-// existed in the RpcProxy's serverList.
-func (p *RpcProxy) activateEndpoint(s *ServerEndpoint) bool {
+// existed in the RPCProxy's serverList.
+func (p *RPCProxy) activateEndpoint(s *ServerEndpoint) bool {
 	l := p.getServerList()
 
 	// Check if this server is known
@@ -188,7 +191,7 @@ func (p *RpcProxy) activateEndpoint(s *ServerEndpoint) bool {
 // the Nomad Agent lost contact with the list of Nomad Servers provided via
 // the Nomad Agent's heartbeat. If available, the backup servers are
 // populated via Consul.
-func (p *RpcProxy) SetBackupServers(addrs []string) error {
+func (p *RPCProxy) SetBackupServers(addrs []string) error {
 	l := make([]*ServerEndpoint, 0, len(addrs))
 	for _, s := range addrs {
 		s, err := newServer(s)
@@ -215,11 +218,11 @@ func (p *RpcProxy) SetBackupServers(addrs []string) error {
 // AddPrimaryServer takes the RPC address of a Nomad server, creates a new
 // endpoint, and adds it to both the primaryServers list and the active
 // serverList used in the RPC Proxy. If the endpoint is not known by the
-// RpcProxy, appends the endpoint to the list. The new endpoint will begin
+// RPCProxy, appends the endpoint to the list. The new endpoint will begin
 // seeing use after the rebalance timer fires (or enough servers fail
 // organically). Any values in the primary server list are overridden by the
 // next successful heartbeat.
-func (p *RpcProxy) AddPrimaryServer(rpcAddr string) *ServerEndpoint {
+func (p *RPCProxy) AddPrimaryServer(rpcAddr string) *ServerEndpoint {
 	s, err := newServer(rpcAddr)
 	if err != nil {
 		p.logger.Printf("[WARN] client.rpcproxy: unable to create new primary server from endpoint %q: %v", rpcAddr, err)
@@ -298,34 +301,37 @@ func (l *serverList) String() string {
 // the server list. If the server at the front of the list has failed or
 // fails during an RPC call, it is rotated to the end of the list. If there
 // are no servers available, return nil.
-func (p *RpcProxy) FindServer() *ServerEndpoint {
+func (p *RPCProxy) FindServer() *ServerEndpoint {
 	l := p.getServerList()
 	numServers := len(l.L)
 	if numServers == 0 {
 		p.logger.Printf("[WARN] client.rpcproxy: No servers available")
 		return nil
-	} else {
-		// Return whatever is at the front of the list because it is
-		// assumed to be the oldest in the server list (unless -
-		// hypothetically - the server list was rotated right after a
-		// server was added).
-		return l.L[0]
 	}
+
+	// Return whatever is at the front of the list because it is
+	// assumed to be the oldest in the server list (unless -
+	// hypothetically - the server list was rotated right after a
+	// server was added).
+	return l.L[0]
 }
 
 // getServerList is a convenience method which hides the locking semantics
 // of atomic.Value from the caller.
-func (p *RpcProxy) getServerList() serverList {
+func (p *RPCProxy) getServerList() serverList {
 	return p.activatedList.Load().(serverList)
 }
 
 // saveServerList is a convenience method which hides the locking semantics
 // of atomic.Value from the caller.
-func (p *RpcProxy) saveServerList(l serverList) {
+func (p *RPCProxy) saveServerList(l serverList) {
 	p.activatedList.Store(l)
 }
 
-func (p *RpcProxy) LeaderAddr() string {
+// LeaderAddr returns the current leader address. If an empty string, then
+// the Nomad Server for this Nomad Agent is in the minority or the Nomad
+// Servers are in the middle of an election.
+func (p *RPCProxy) LeaderAddr() string {
 	p.listLock.Lock()
 	defer p.listLock.Unlock()
 	return p.leaderAddr
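
The getServerList/saveServerList pair above hides a sync/atomic.Value: FindServer's read path never takes a mutex, and writers publish an entirely new list. A self-contained sketch of that pattern with simplified types (not the commit's code):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

type serverList struct {
	L []string
}

type proxy struct {
	// activated always holds a serverList value; storing a value (not a
	// pointer) means readers get an immutable snapshot.
	activated atomic.Value
}

func (p *proxy) getServerList() serverList {
	// Would panic before the first Store, which is why the constructor
	// above seeds an empty list up front.
	return p.activated.Load().(serverList)
}

func (p *proxy) saveServerList(l serverList) {
	p.activated.Store(l)
}

func main() {
	p := &proxy{}
	p.saveServerList(serverList{}) // seed, as NewRPCProxy appears to do
	p.saveServerList(serverList{L: []string{"10.0.1.10:4647"}})
	fmt.Println(p.getServerList().L)
}
```
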
@@ -333,7 +339,7 @@ func (p *RpcProxy) LeaderAddr() string {
 
 // NotifyFailedServer marks the passed in server as "failed" by rotating it
 // to the end of the server list.
-func (p *RpcProxy) NotifyFailedServer(s *ServerEndpoint) {
+func (p *RPCProxy) NotifyFailedServer(s *ServerEndpoint) {
 	l := p.getServerList()
 
 	// If the server being failed is not the first server on the list,
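
Rotating a failed server to the back, as NotifyFailedServer's comment describes, is a simple slice cycle; the package's cycleServer helper (exercised by the tests further down) does the equivalent for []*ServerEndpoint. A standalone version over strings:

```go
package main

import "fmt"

// cycleServer returns a copy of servers with the first entry moved to the
// end; the input slice is left untouched, so readers holding the old
// snapshot are unaffected.
func cycleServer(servers []string) []string {
	if len(servers) < 2 {
		return servers
	}
	out := make([]string, 0, len(servers))
	out = append(out, servers[1:]...)
	return append(out, servers[0])
}

func main() {
	fmt.Println(cycleServer([]string{"s1", "s2", "s3"})) // [s2 s3 s1]
}
```
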
@@ -360,13 +366,15 @@ func (p *RpcProxy) NotifyFailedServer(s *ServerEndpoint) {
 	}
 }
 
-func (p *RpcProxy) NumNodes() int {
+// NumNodes returns the estimated number of nodes according to the last Nomad
+// Heartbeat.
+func (p *RPCProxy) NumNodes() int {
 	return p.numNodes
 }
 
 // NumServers takes out an internal "read lock" and returns the number of
 // servers. numServers includes both healthy and unhealthy servers.
-func (p *RpcProxy) NumServers() int {
+func (p *RPCProxy) NumServers() int {
 	l := p.getServerList()
 	return len(l.L)
 }
@@ -383,7 +391,7 @@ func (p *RpcProxy) NumServers() int {
 // Unhealthy servers are removed from the server list during the next client
 // heartbeat. Before the newly shuffled server list is saved, the new remote
 // endpoint is tested to ensure its responsive.
-func (p *RpcProxy) RebalanceServers() {
+func (p *RPCProxy) RebalanceServers() {
 	var serverListLocked bool
 	p.serverListLock.Lock()
 	serverListLocked = true
@@ -495,7 +503,7 @@ func (p *RpcProxy) RebalanceServers() {
 // (i.e. was removed by Nomad during a PingNomadServer() call. Newly added
 // servers are appended to the list and other missing servers are removed
 // from the list.
-func (p *RpcProxy) reconcileServerList(l *serverList) bool {
+func (p *RPCProxy) reconcileServerList(l *serverList) bool {
 	p.listLock.Lock()
 	defer p.listLock.Unlock()
 
@@ -559,7 +567,7 @@ func (p *RpcProxy) reconcileServerList(l *serverList) bool {
 
 // RemoveServer takes out an internal write lock and removes a server from
 // the activated server list.
-func (p *RpcProxy) RemoveServer(s *ServerEndpoint) {
+func (p *RPCProxy) RemoveServer(s *ServerEndpoint) {
 	// Lock hierarchy protocol dictates serverListLock is acquired first.
 	p.serverListLock.Lock()
 	defer p.serverListLock.Unlock()
@@ -577,7 +585,7 @@ func (p *RpcProxy) RemoveServer(s *ServerEndpoint) {
 }
 
 // refreshServerRebalanceTimer is only called once p.rebalanceTimer expires.
-func (p *RpcProxy) refreshServerRebalanceTimer() time.Duration {
+func (p *RPCProxy) refreshServerRebalanceTimer() time.Duration {
 	l := p.getServerList()
 	numServers := len(l.L)
 	// Limit this connection's life based on the size (and health) of the
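
The truncated comment above hints that the rebalance interval scales with cluster size. Purely as an illustration of that idea (the commit's actual formula is not visible in this hunk, and the value of clientRPCMinReuseDuration is not shown): stretch the timer with the node-to-server ratio so a large cluster's clients do not all rebalance at once.

```go
package main

import (
	"fmt"
	"time"
)

// scaledRebalanceInterval is an assumption-laden sketch, not the commit's
// code: minReuse stands in for clientRPCMinReuseDuration, and the linear
// scaling rule is invented for illustration only.
func scaledRebalanceInterval(minReuse time.Duration, numNodes, numServers int) time.Duration {
	if numServers < 1 {
		numServers = 1
	}
	return minReuse * time.Duration(numNodes/numServers+1)
}

func main() {
	fmt.Println(scaledRebalanceInterval(2*time.Minute, 10000, 5)) // grows with cluster size
}
```
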
@@ -595,14 +603,14 @@ func (p *RpcProxy) refreshServerRebalanceTimer() time.Duration {
 
 // ResetRebalanceTimer resets the rebalance timer. This method exists for
 // testing and should not be used directly.
-func (p *RpcProxy) ResetRebalanceTimer() {
+func (p *RPCProxy) ResetRebalanceTimer() {
 	p.listLock.Lock()
 	defer p.listLock.Unlock()
 	p.rebalanceTimer.Reset(clientRPCMinReuseDuration)
 }
 
 // ServerRPCAddrs returns one RPC Address per server
-func (p *RpcProxy) ServerRPCAddrs() []string {
+func (p *RPCProxy) ServerRPCAddrs() []string {
 	l := p.getServerList()
 	serverAddrs := make([]string, 0, len(l.L))
 	for _, s := range l.L {
@@ -617,7 +625,7 @@ func (p *RpcProxy) ServerRPCAddrs() []string {
 // automatically cycled to the end of the list. New servers are appended to
 // the list. The order of the server list must be shuffled periodically to
 // distribute load across all known and available Nomad servers.
-func (p *RpcProxy) Run() {
+func (p *RPCProxy) Run() {
 	for {
 		select {
 		case <-p.rebalanceTimer.C:
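
The hunk cuts off inside Run's select, but the visible rebalanceTimer case plus the shutdownCh stored by NewRPCProxy imply a loop of roughly this shape (a sketch with stand-in types, not the verbatim method):

```go
package main

import "time"

// sketchProxy stands in for RPCProxy; only the fields the loop needs.
type sketchProxy struct {
	rebalanceTimer *time.Timer
	shutdownCh     chan struct{}
}

// rebalance stands in for RebalanceServers plus re-arming the timer.
func (p *sketchProxy) rebalance() {}

// run mirrors the shape implied by the hunk: shuffle on each timer fire,
// exit when the shutdown channel closes.
func (p *sketchProxy) run() {
	for {
		select {
		case <-p.rebalanceTimer.C:
			p.rebalance()
		case <-p.shutdownCh:
			return
		}
	}
}

func main() {
	p := &sketchProxy{
		rebalanceTimer: time.NewTimer(10 * time.Millisecond),
		shutdownCh:     make(chan struct{}),
	}
	go p.run()
	time.Sleep(20 * time.Millisecond)
	close(p.shutdownCh)
}
```
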
@@ -636,15 +644,15 @@ func (p *RpcProxy) Run() {
 // Nomad Servers that the Nomad Client should use for RPC requests.
 // RefreshServerLists does not rebalance its serverLists (that is handled
 // elsewhere via a periodic timer). New Nomad Servers learned via the
-// heartbeat are appended to the RpcProxy's activated serverList. Servers
+// heartbeat are appended to the RPCProxy's activated serverList. Servers
 // that are no longer present in the Heartbeat are removed immediately from
 // all server lists. Nomad Servers speaking a newer major or minor API
 // version are filtered from the serverList.
-func (p *RpcProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNodes int32, leaderRpcAddr string) error {
+func (p *RPCProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNodes int32, leaderRPCAddr string) error {
 	// Merge all servers found in the response. Servers in the response
 	// with newer API versions are filtered from the list. If the list
-	// is missing an address found in the RpcProxy's server list, remove
-	// it from the RpcProxy.
+	// is missing an address found in the RPCProxy's server list, remove
+	// it from the RPCProxy.
 
 	p.serverListLock.Lock()
 	defer p.serverListLock.Unlock()
@@ -678,7 +686,7 @@ func (p *RpcProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNode
 			// spamming the logs every heartbeat.
 			//
 			// TODO(sean@): Move the logging throttle logic into a
-			// dedicated logging package so RpcProxy does not have to
+			// dedicated logging package so RPCProxy does not have to
 			// perform this accounting.
 			if int32(p.configInfo.RpcMajorVersion()) < s.RpcMajorVersion ||
 				(int32(p.configInfo.RpcMajorVersion()) == s.RpcMajorVersion &&
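
The comparison at the bottom of this hunk is a two-level version gate: a server is dropped when its advertised major RPC version is newer, or the majors match and (presumably, past the truncation) its minor version is newer. Distilled into a standalone predicate; the minor clause is inferred from the doc comment on RefreshServerLists:

```go
package main

import "fmt"

// tooNew mirrors the major/minor comparison visible in the hunk.
func tooNew(ourMajor, ourMinor, srvMajor, srvMinor int32) bool {
	return ourMajor < srvMajor ||
		(ourMajor == srvMajor && ourMinor < srvMinor)
}

func main() {
	fmt.Println(tooNew(1, 0, 1, 1)) // true: same major, newer minor
	fmt.Println(tooNew(1, 1, 1, 0)) // false: server is older
}
```
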
@@ -755,7 +763,7 @@ func (p *RpcProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNode
 	}
 
 	p.numNodes = int(numNodes)
-	p.leaderAddr = leaderRpcAddr
+	p.leaderAddr = leaderRPCAddr
 	p.saveServerList(newServerCfg)
 
 	return nil
@@ -90,25 +90,25 @@ func (s *fauxSerf) RpcMinorVersion() int {
 	return s.rpcMinorVersion
 }
 
-func testRpcProxy() (p *RpcProxy) {
+func testRPCProxy() (p *RPCProxy) {
 	logger := GetBufferedLogger()
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 	shutdownCh := make(chan struct{})
-	p = New(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
+	p = NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: 16384}, &fauxConnPool{})
 	return p
 }
 
-func testRpcProxyFailProb(failPct float64) (p *RpcProxy) {
+func testRPCProxyFailProb(failPct float64) (p *RPCProxy) {
 	logger := GetBufferedLogger()
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 	shutdownCh := make(chan struct{})
-	p = New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
+	p = NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{failPct: failPct})
 	return p
 }
 
-// func (p *RpcProxy) AddPrimaryServer(server *ServerEndpoint) {
-func TestRpcProxy_AddPrimaryServer(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) AddPrimaryServer(server *ServerEndpoint) {
+func TestRPCProxy_AddPrimaryServer(t *testing.T) {
+	p := testRPCProxy()
 	var num int
 	num = p.NumServers()
 	if num != 0 {
@@ -154,9 +154,9 @@ func TestRpcProxy_AddPrimaryServer(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) FindServer() (server *ServerEndpoint) {
-func TestRpcProxy_FindServer(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) FindServer() (server *ServerEndpoint) {
+func TestRPCProxy_FindServer(t *testing.T) {
+	p := testRPCProxy()
 
 	if p.FindServer() != nil {
 		t.Fatalf("Expected nil return")
@@ -204,26 +204,26 @@ func TestRpcProxy_FindServer(t *testing.T) {
 	}
 }
 
-// func New(logger *log.Logger, shutdownCh chan struct{}) (p *RpcProxy) {
-func TestRpcProxy_New(t *testing.T) {
+// func New(logger *log.Logger, shutdownCh chan struct{}) (p *RPCProxy) {
+func TestRPCProxy_New(t *testing.T) {
 	logger := GetBufferedLogger()
 	logger = log.New(os.Stderr, "", log.LstdFlags)
 	shutdownCh := make(chan struct{})
-	p := New(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
+	p := NewRPCProxy(logger, shutdownCh, &fauxSerf{}, &fauxConnPool{})
 	if p == nil {
-		t.Fatalf("RpcProxy nil")
+		t.Fatalf("RPCProxy nil")
 	}
 }
 
-// func (p *RpcProxy) NotifyFailedServer(server *ServerEndpoint) {
-func TestRpcProxy_NotifyFailedServer(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) NotifyFailedServer(server *ServerEndpoint) {
+func TestRPCProxy_NotifyFailedServer(t *testing.T) {
+	p := testRPCProxy()
 
 	if p.NumServers() != 0 {
 		t.Fatalf("Expected zero servers to start")
 	}
 
-	// Try notifying for a server that is not managed by RpcProxy
+	// Try notifying for a server that is not managed by RPCProxy
 	s1Endpoint := makeServerEndpointName()
 	s1 := p.AddPrimaryServer(s1Endpoint)
 	if s1 == nil {
@@ -257,7 +257,7 @@ func TestRpcProxy_NotifyFailedServer(t *testing.T) {
 		t.Fatalf("Expected one server")
 	}
 
-	// Re-add s2 so there are two servers in the RpcProxy server list
+	// Re-add s2 so there are two servers in the RPCProxy server list
 	s2 = p.AddPrimaryServer(s2Endpoint)
 	if p.NumServers() != 2 {
 		t.Fatalf("Expected two servers")
@@ -291,9 +291,9 @@ func TestRpcProxy_NotifyFailedServer(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) NumServers() (numServers int) {
-func TestRpcProxy_NumServers(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) NumServers() (numServers int) {
+func TestRPCProxy_NumServers(t *testing.T) {
+	p := testRPCProxy()
 	const maxNumServers = 100
 	serverList := make([]*ServerEndpoint, 0, maxNumServers)
 
@@ -330,10 +330,10 @@ func TestRpcProxy_NumServers(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) RebalanceServers() {
-func TestRpcProxy_RebalanceServers(t *testing.T) {
+// func (p *RPCProxy) RebalanceServers() {
+func TestRPCProxy_RebalanceServers(t *testing.T) {
 	const failPct = 0.5
-	p := testRpcProxyFailProb(failPct)
+	p := testRPCProxyFailProb(failPct)
 	const maxServers = 100
 	const numShuffleTests = 100
 	const uniquePassRate = 0.5
@@ -366,9 +366,9 @@ func TestRpcProxy_RebalanceServers(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) RemoveServer(server *ServerEndpoint) {
-func TestRpcProxy_RemoveServer(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) RemoveServer(server *ServerEndpoint) {
+func TestRPCProxy_RemoveServer(t *testing.T) {
+	p := testRPCProxy()
 	if p.NumServers() != 0 {
 		t.Fatalf("Expected zero servers to start")
 	}
@@ -532,11 +532,11 @@ func TestRpcProxy_RemoveServer(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) Start() {
+// func (p *RPCProxy) Start() {
 
 // func (l *serverList) cycleServer() (servers []*Server) {
-func TestRpcProxyInternal_cycleServer(t *testing.T) {
-	p := testRpcProxy()
+func TestRPCProxyInternal_cycleServer(t *testing.T) {
+	p := testRPCProxy()
 	l := p.getServerList()
 
 	server0 := &ServerEndpoint{Name: "server1"}
@@ -586,9 +586,9 @@ func TestRpcProxyInternal_cycleServer(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) getServerList() serverList {
-func TestRpcProxyInternal_getServerList(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) getServerList() serverList {
+func TestRPCProxyInternal_getServerList(t *testing.T) {
+	p := testRPCProxy()
 	l := p.getServerList()
 	if l.L == nil {
 		t.Fatalf("serverList.servers nil")
@@ -599,8 +599,8 @@ func TestRpcProxyInternal_getServerList(t *testing.T) {
 	}
 }
 
-func TestRpcProxyInternal_New(t *testing.T) {
-	p := testRpcProxy()
+func TestRPCProxyInternal_New(t *testing.T) {
+	p := testRPCProxy()
 	if p == nil {
 		t.Fatalf("bad")
 	}
@@ -614,8 +614,8 @@ func TestRpcProxyInternal_New(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) reconcileServerList(l *serverList) bool {
-func TestRpcProxyInternal_reconcileServerList(t *testing.T) {
+// func (p *RPCProxy) reconcileServerList(l *serverList) bool {
+func TestRPCProxyInternal_reconcileServerList(t *testing.T) {
 	tests := []int{0, 1, 2, 3, 4, 5, 10, 100}
 	for _, n := range tests {
 		ok, err := test_reconcileServerList(n)
@@ -630,14 +630,14 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 	// missing, the added have been added, and the original server is
 	// present.
 	const failPct = 0.5
-	p := testRpcProxyFailProb(failPct)
+	p := testRPCProxyFailProb(failPct)
 
 	var failedServers, healthyServers []*ServerEndpoint
 	for i := 0; i < maxServers; i++ {
 		nodeName := fmt.Sprintf("s%02d", i)
 
 		node := &ServerEndpoint{Name: nodeName}
-		// Add 66% of servers to RpcProxy
+		// Add 66% of servers to RPCProxy
 		if rand.Float64() > 0.33 {
 			p.activateEndpoint(node)
 
@@ -658,7 +658,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 		}
 	}
 
-	// Randomize RpcProxy's server list
+	// Randomize RPCProxy's server list
 	p.RebalanceServers()
 	selectedServer := p.FindServer()
 
@@ -670,7 +670,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 		}
 	}
 
-	// Update RpcProxy's server list to be "healthy" based on Serf.
+	// Update RPCProxy's server list to be "healthy" based on Serf.
 	// Reconcile this with origServers, which is shuffled and has a live
 	// connection, but possibly out of date.
 	origServers := p.getServerList()
@@ -701,7 +701,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 		resultingServerMap[*s.Key()] = true
 	}
 
-	// Test to make sure no failed servers are in the RpcProxy's
+	// Test to make sure no failed servers are in the RPCProxy's
 	// list. Error if there are any failedServers in l.servers
 	for _, s := range failedServers {
 		_, ok := resultingServerMap[*s.Key()]
@@ -726,7 +726,7 @@ func test_reconcileServerList(maxServers int) (bool, error) {
 }
 
 // func (l *serverList) refreshServerRebalanceTimer() {
-func TestRpcProxyInternal_refreshServerRebalanceTimer(t *testing.T) {
+func TestRPCProxyInternal_refreshServerRebalanceTimer(t *testing.T) {
 	type clusterSizes struct {
 		numNodes   int
 		numServers int
@@ -765,7 +765,7 @@ func TestRpcProxyInternal_refreshServerRebalanceTimer(t *testing.T) {
 	shutdownCh := make(chan struct{})
 
 	for i, s := range clusters {
-		p := New(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
+		p := NewRPCProxy(logger, shutdownCh, &fauxSerf{numNodes: s.numNodes}, &fauxConnPool{})
 		for i := 0; i < s.numServers; i++ {
 			nodeName := fmt.Sprintf("s%02d", i)
 			p.activateEndpoint(&ServerEndpoint{Name: nodeName})
@@ -778,15 +778,15 @@ func TestRpcProxyInternal_refreshServerRebalanceTimer(t *testing.T) {
 	}
 }
 
-// func (p *RpcProxy) saveServerList(l serverList) {
-func TestRpcProxyInternal_saveServerList(t *testing.T) {
-	p := testRpcProxy()
+// func (p *RPCProxy) saveServerList(l serverList) {
+func TestRPCProxyInternal_saveServerList(t *testing.T) {
+	p := testRPCProxy()
 
 	// Initial condition
 	func() {
 		l := p.getServerList()
 		if len(l.L) != 0 {
-			t.Fatalf("RpcProxy.saveServerList failed to load init config")
+			t.Fatalf("RPCProxy.saveServerList failed to load init config")
 		}
 
 		newServer := new(ServerEndpoint)
@@ -799,7 +799,7 @@ func TestRpcProxyInternal_saveServerList(t *testing.T) {
 		l1 := p.getServerList()
 		t1NumServers := len(l1.L)
 		if t1NumServers != 1 {
-			t.Fatalf("RpcProxy.saveServerList failed to save mutated config")
+			t.Fatalf("RPCProxy.saveServerList failed to save mutated config")
 		}
 	}()
 
@@ -812,7 +812,7 @@ func TestRpcProxyInternal_saveServerList(t *testing.T) {
 		l_orig := p.getServerList()
 		origNumServers := len(l_orig.L)
 		if origNumServers >= len(l.L) {
-			t.Fatalf("RpcProxy.saveServerList unsaved config overwrote original")
+			t.Fatalf("RPCProxy.saveServerList unsaved config overwrote original")
 		}
 	}()
 }
@@ -139,7 +139,7 @@ func (s *HTTPServer) listServers(resp http.ResponseWriter, req *http.Request) (i
 		return nil, CodedError(501, ErrInvalidMethod)
 	}
 
-	peers := s.agent.client.RpcProxy().ServerRPCAddrs()
+	peers := s.agent.client.RPCProxy().ServerRPCAddrs()
 	return peers, nil
 }
 
@@ -158,7 +158,7 @@ func (s *HTTPServer) updateServers(resp http.ResponseWriter, req *http.Request)
 	// Set the servers list into the client
 	for _, server := range servers {
 		s.agent.logger.Printf("[TRACE] Adding server %s to the client's primary server list", server)
-		se := client.AddPrimaryServerToRpcProxy(server)
+		se := client.AddPrimaryServerToRPCProxy(server)
 		if se == nil {
 			s.agent.logger.Printf("[ERR] Attempt to add server %q to client failed", server)
 		}