From 1ec537450e4f4d48ad12a32d8e136a74fadebd2a Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 2 Jun 2016 00:12:30 -0700 Subject: [PATCH] Rename rpcproxy.UpdateFromNodeUpdateResponse to RefreshServerLists While breaking the API within this PR, break out the individual arguments to RefreshServerLists. The servers parameter is reusing `structs.NodeServerInfo` for the time being, but this can be revisited if the needs of the strucutre diverge in the future. --- client/client.go | 2 +- client/rpcproxy/rpcproxy.go | 32 +++++++++++++++++--------------- 2 files changed, 18 insertions(+), 16 deletions(-) diff --git a/client/client.go b/client/client.go index a85dad3ee..3ac5bce95 100644 --- a/client/client.go +++ b/client/client.go @@ -915,7 +915,7 @@ func (c *Client) updateNodeStatus() error { c.lastHeartbeat = time.Now() c.heartbeatTTL = resp.HeartbeatTTL - if err := c.rpcProxy.UpdateFromNodeUpdateResponse(&resp); err != nil { + if err := c.rpcProxy.RefreshServerLists(resp.Servers, resp.NumNodes, resp.LeaderRPCAddr); err != nil { return err } c.consulPullHeartbeatDeadline = time.Now().Add(2 * resp.HeartbeatTTL) diff --git a/client/rpcproxy/rpcproxy.go b/client/rpcproxy/rpcproxy.go index 18ee9053d..899f0434b 100644 --- a/client/rpcproxy/rpcproxy.go +++ b/client/rpcproxy/rpcproxy.go @@ -631,14 +631,16 @@ func (p *RpcProxy) Run() { } } -// UpdateFromNodeUpdateResponse handles heartbeat responses from Nomad -// Servers. Heartbeats contain a list of Nomad Servers that the client -// should talk with for RPC requests. UpdateFromNodeUpdateResponse does not -// rebalance its serverList, that is handled elsewhere. New servers learned -// via the heartbeat are appended to the RpcProxy's serverList. Removed -// servers are removed immediately. Servers speaking a newer RPC version are -// filtered from the serverList. -func (p *RpcProxy) UpdateFromNodeUpdateResponse(resp *structs.NodeUpdateResponse) error { +// RefreshServerLists is called when the Client receives an update from a +// Nomad Server. The response from Nomad Client Heartbeats contain a list of +// Nomad Servers that the Nomad Client should use for RPC requests. +// RefreshServerLists does not rebalance its serverLists (that is handled +// elsewhere via a periodic timer). New Nomad Servers learned via the +// heartbeat are appended to the RpcProxy's activated serverList. Servers +// that are no longer present in the Heartbeat are removed immediately from +// all server lists. Nomad Servers speaking a newer major or minor API +// version are filtered from the serverList. +func (p *RpcProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNodes int32, leaderRpcAddr string) error { // Merge all servers found in the response. Servers in the response // with newer API versions are filtered from the list. If the list // is missing an address found in the RpcProxy's server list, remove @@ -655,12 +657,12 @@ func (p *RpcProxy) UpdateFromNodeUpdateResponse(resp *structs.NodeUpdateResponse // Clear the backup server list when a heartbeat contains at least // one server. - if len(resp.Servers) > 0 && len(p.backupServers.L) > 0 { - p.backupServers.L = make([]*ServerEndpoint, 0, len(resp.Servers)) + if len(servers) > 0 && len(p.backupServers.L) > 0 { + p.backupServers.L = make([]*ServerEndpoint, 0, len(servers)) } // 1) Create a map to reconcile the difference between - // p.primaryServers and resp.Servers. + // p.primaryServers and servers. type targetServer struct { server *ServerEndpoint @@ -669,7 +671,7 @@ func (p *RpcProxy) UpdateFromNodeUpdateResponse(resp *structs.NodeUpdateResponse // 'n' == new state byte } - mergedPrimaryMap := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(resp.Servers)) + mergedPrimaryMap := make(map[EndpointKey]*targetServer, len(p.primaryServers.L)+len(servers)) numOldServers := 0 for _, s := range p.primaryServers.L { mergedPrimaryMap[*s.Key()] = &targetServer{server: s, state: 'o'} @@ -677,7 +679,7 @@ func (p *RpcProxy) UpdateFromNodeUpdateResponse(resp *structs.NodeUpdateResponse } numBothServers := 0 var newServers bool - for _, s := range resp.Servers { + for _, s := range servers { // Filter out servers using a newer API version. Prevent // spamming the logs every heartbeat. // @@ -758,8 +760,8 @@ func (p *RpcProxy) UpdateFromNodeUpdateResponse(resp *structs.NodeUpdateResponse } } - p.numNodes = int(resp.NumNodes) - p.leaderAddr = resp.LeaderRPCAddr + p.numNodes = int(numNodes) + p.leaderAddr = leaderRpcAddr p.saveServerList(newServerCfg) return nil