mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 18:05:42 +03:00
Merge pull request #5654 from hashicorp/b-hearbeat-lockfix
Remove unnecessary locking and serverlist syncing in heartbeats
This commit is contained in:
@@ -315,6 +315,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulServic
|
||||
// Initialize the server manager
|
||||
c.servers = servers.New(c.logger, c.shutdownCh, c)
|
||||
|
||||
// Start server manager rebalancing go routine
|
||||
go c.servers.Start()
|
||||
|
||||
// Initialize the client
|
||||
if err := c.init(); err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize client: %v", err)
|
||||
@@ -1345,7 +1348,6 @@ func (c *Client) registerAndHeartbeat() {
|
||||
case <-c.shutdownCh:
|
||||
return
|
||||
}
|
||||
|
||||
if err := c.updateNodeStatus(); err != nil {
|
||||
// The servers have changed such that this node has not been
|
||||
// registered before
|
||||
@@ -2342,13 +2344,6 @@ func (c *Client) consulDiscovery() {
|
||||
func (c *Client) consulDiscoveryImpl() error {
|
||||
consulLogger := c.logger.Named("consul")
|
||||
|
||||
// Acquire heartbeat lock to prevent heartbeat from running
|
||||
// concurrently with discovery. Concurrent execution is safe, however
|
||||
// discovery is usually triggered when heartbeating has failed so
|
||||
// there's no point in allowing it.
|
||||
c.heartbeatLock.Lock()
|
||||
defer c.heartbeatLock.Unlock()
|
||||
|
||||
dcs, err := c.consulCatalog.Datacenters()
|
||||
if err != nil {
|
||||
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
|
||||
|
||||
@@ -201,12 +201,16 @@ func (m *Manager) SetServers(servers Servers) bool {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Sort both the existing and incoming servers
|
||||
servers.Sort()
|
||||
m.servers.Sort()
|
||||
|
||||
// Determine if they are equal
|
||||
equal := servers.Equal(m.servers)
|
||||
equal := m.serversAreEqual(servers)
|
||||
|
||||
// If server list is equal don't change the list and return immediately
|
||||
// This prevents unnecessary shuffling of a failed server that was moved to the
|
||||
// bottom of the list
|
||||
if equal {
|
||||
m.logger.Debug("Not replacing server list, current server list is identical to servers discovered in Consul")
|
||||
return !equal
|
||||
}
|
||||
|
||||
// Randomize the incoming servers
|
||||
servers.shuffle()
|
||||
@@ -215,6 +219,23 @@ func (m *Manager) SetServers(servers Servers) bool {
|
||||
return !equal
|
||||
}
|
||||
|
||||
// Method to check if the arg list of servers is equal to the one we already have
|
||||
func (m *Manager) serversAreEqual(servers Servers) bool {
|
||||
// We use a copy of the server list here because determining
|
||||
// equality requires a sort step which modifies the order of the server list
|
||||
var copy Servers
|
||||
copy = make([]*Server, 0, len(m.servers))
|
||||
for _, s := range m.servers {
|
||||
copy = append(copy, s.Copy())
|
||||
}
|
||||
|
||||
// Sort both the existing and incoming servers
|
||||
copy.Sort()
|
||||
servers.Sort()
|
||||
|
||||
return copy.Equal(servers)
|
||||
}
|
||||
|
||||
// FindServer returns a server to send an RPC too. If there are no servers, nil
|
||||
// is returned.
|
||||
func (m *Manager) FindServer() *Server {
|
||||
|
||||
@@ -66,6 +66,19 @@ func TestServers_SetServers(t *testing.T) {
|
||||
require.True(m.SetServers([]*servers.Server{s1}))
|
||||
require.Equal(1, m.NumServers())
|
||||
require.Len(m.GetServers(), 1)
|
||||
|
||||
// Test that the list of servers does not get shuffled
|
||||
// as a side effect when incoming list is equal
|
||||
require.True(m.SetServers([]*servers.Server{s1, s2}))
|
||||
before := m.GetServers()
|
||||
require.False(m.SetServers([]*servers.Server{s1, s2}))
|
||||
after := m.GetServers()
|
||||
require.Equal(before, after)
|
||||
|
||||
// Send a shuffled list, verify original order doesn't change
|
||||
require.False(m.SetServers([]*servers.Server{s2, s1}))
|
||||
afterShuffledInput := m.GetServers()
|
||||
require.Equal(after, afterShuffledInput)
|
||||
}
|
||||
|
||||
func TestServers_FindServer(t *testing.T) {
|
||||
|
||||
@@ -7,6 +7,9 @@ data_dir = "/tmp/client1"
|
||||
# Give the agent a unique name. Defaults to hostname
|
||||
name = "client1"
|
||||
|
||||
# Enable debugging
|
||||
enable_debug = true
|
||||
|
||||
# Enable the client
|
||||
client {
|
||||
enabled = true
|
||||
|
||||
@@ -7,6 +7,9 @@ data_dir = "/tmp/client2"
|
||||
# Give the agent a unique name. Defaults to hostname
|
||||
name = "client2"
|
||||
|
||||
# Enable debugging
|
||||
enable_debug = true
|
||||
|
||||
# Enable the client
|
||||
client {
|
||||
enabled = true
|
||||
|
||||
@@ -7,6 +7,9 @@ data_dir = "/tmp/client3"
|
||||
# Give the agent a unique name. Defaults to hostname
|
||||
name = "client3"
|
||||
|
||||
# Enable debugging
|
||||
enable_debug = true
|
||||
|
||||
# Enable the client
|
||||
client {
|
||||
enabled = true
|
||||
|
||||
Reference in New Issue
Block a user