mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 18:35:44 +03:00
Fire retry only when consul discovers new servers
This commit is contained in:
@@ -2086,12 +2086,15 @@ DISCOLOOP:
|
||||
}
|
||||
|
||||
c.logger.Printf("[INFO] client.consul: discovered following Servers: %s", nomadServers)
|
||||
c.servers.SetServers(nomadServers)
|
||||
|
||||
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
|
||||
// isn't receiving on this chan yet they'll still retry eventually.
|
||||
// This is a shortcircuit for the longer retry intervals.
|
||||
c.fireRpcRetryWatcher()
|
||||
// Fire the retry trigger if we have updated the set of servers.
|
||||
if c.servers.SetServers(nomadServers) {
|
||||
// Notify waiting rpc calls. If a goroutine just failed an RPC call and
|
||||
// isn't receiving on this chan yet they'll still retry eventually.
|
||||
// This is a shortcircuit for the longer retry intervals.
|
||||
c.fireRpcRetryWatcher()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -116,6 +117,32 @@ func (s Servers) shuffle() {
|
||||
}
|
||||
}
|
||||
|
||||
func (s Servers) Sort() {
|
||||
sort.Slice(s, func(i, j int) bool {
|
||||
a, b := s[i], s[j]
|
||||
if addr1, addr2 := a.Addr.String(), b.Addr.String(); addr1 == addr2 {
|
||||
return a.DC < b.DC
|
||||
} else {
|
||||
return addr1 < addr2
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Equal returns if the two server lists are equal, including the ordering.
|
||||
func (s Servers) Equal(o Servers) bool {
|
||||
if len(s) != len(o) {
|
||||
return false
|
||||
}
|
||||
|
||||
for i, v := range s {
|
||||
if !v.Equal(o[i]) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
type Manager struct {
|
||||
// servers is the list of all known Nomad servers.
|
||||
servers Servers
|
||||
@@ -167,13 +194,24 @@ func (m *Manager) Start() {
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Manager) SetServers(servers Servers) {
|
||||
// SetServers sets the servers and returns if the new server list is different
|
||||
// than the existing server set
|
||||
func (m *Manager) SetServers(servers Servers) bool {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
// Sort both the existing and incoming servers
|
||||
servers.Sort()
|
||||
m.servers.Sort()
|
||||
|
||||
// Determine if they are equal
|
||||
equal := servers.Equal(m.servers)
|
||||
|
||||
// Randomize the incoming servers
|
||||
servers.shuffle()
|
||||
m.servers = servers
|
||||
|
||||
return !equal
|
||||
}
|
||||
|
||||
// FindServer returns a server to send an RPC too. If there are no servers, nil
|
||||
|
||||
@@ -11,6 +11,7 @@ import (
|
||||
|
||||
"github.com/hashicorp/nomad/client/servers"
|
||||
"github.com/hashicorp/nomad/helper/testlog"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type fauxAddr struct {
|
||||
@@ -48,6 +49,7 @@ func testManagerFailProb(t *testing.T, failPct float64) (m *servers.Manager) {
|
||||
}
|
||||
|
||||
func TestServers_SetServers(t *testing.T) {
|
||||
require := require.New(t)
|
||||
m := testManager(t)
|
||||
var num int
|
||||
num = m.NumServers()
|
||||
@@ -57,16 +59,15 @@ func TestServers_SetServers(t *testing.T) {
|
||||
|
||||
s1 := &servers.Server{Addr: &fauxAddr{"server1"}}
|
||||
s2 := &servers.Server{Addr: &fauxAddr{"server2"}}
|
||||
m.SetServers([]*servers.Server{s1, s2})
|
||||
num = m.NumServers()
|
||||
if num != 2 {
|
||||
t.Fatalf("Expected two servers")
|
||||
}
|
||||
require.True(m.SetServers([]*servers.Server{s1, s2}))
|
||||
require.False(m.SetServers([]*servers.Server{s1, s2}))
|
||||
require.False(m.SetServers([]*servers.Server{s2, s1}))
|
||||
require.Equal(2, m.NumServers())
|
||||
require.Len(m.GetServers(), 2)
|
||||
|
||||
all := m.GetServers()
|
||||
if l := len(all); l != 2 {
|
||||
t.Fatalf("expected 2 servers got %d", l)
|
||||
}
|
||||
require.True(m.SetServers([]*servers.Server{s1}))
|
||||
require.Equal(1, m.NumServers())
|
||||
require.Len(m.GetServers(), 1)
|
||||
}
|
||||
|
||||
func TestServers_FindServer(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user