mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
Query for the Nomad service across multiple Consul datacenters.
This commit is contained in:
@@ -37,6 +37,10 @@ const (
|
||||
// open to a server
|
||||
clientMaxStreams = 2
|
||||
|
||||
// datacenterQueryLimit is the maximum number of adjacent Consul
|
||||
// datacenters searched when looking for the Nomad server service.
|
||||
datacenterQueryLimit = 5
|
||||
|
||||
// registerRetryIntv is the minimum interval at which we retry
|
||||
// registration. We pick a value between this and 2x this.
|
||||
registerRetryIntv = 15 * time.Second
|
||||
@@ -1282,33 +1286,75 @@ func (c *Client) setupConsulSyncer() error {
|
||||
c.heartbeatLock.Unlock()
|
||||
c.logger.Printf("[TRACE] client.consul: lost heartbeat with Nomad quorum, falling back to Consul for server list")
|
||||
|
||||
nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
|
||||
services, _, err := c.consulSyncer.ConsulClient().Catalog().
|
||||
Service(nomadServerServiceName, consul.ServiceTagRpc,
|
||||
&consulapi.QueryOptions{AllowStale: true})
|
||||
consulCatalog := c.consulSyncer.ConsulClient().Catalog()
|
||||
dcs, err := consulCatalog.Datacenters()
|
||||
if err != nil {
|
||||
return fmt.Errorf("client.consul: unable to query service %q: %v", nomadServerServiceName, err)
|
||||
return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err)
|
||||
}
|
||||
dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)]
|
||||
// Walk the list of Consul datacenters randomly in order to
|
||||
// search for the Nomad server service.
|
||||
shuffleStrings(dcs)
|
||||
|
||||
if len(services) == 0 {
|
||||
return fmt.Errorf("client.consul: no Nomad servers advertising service %q", nomadServerServiceName)
|
||||
}
|
||||
|
||||
serverAddrs := make([]string, 0, len(services))
|
||||
for _, s := range services {
|
||||
port := strconv.FormatInt(int64(s.ServicePort), 10)
|
||||
addr := s.ServiceAddress
|
||||
if addr == "" {
|
||||
addr = s.Address
|
||||
nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
|
||||
var mErr multierror.Error
|
||||
const defaultMaxNumNomadServers = 8
|
||||
nomadServerServices := make([]string, 0, defaultMaxNumNomadServers)
|
||||
for _, dc := range dcs {
|
||||
opts := &consulapi.QueryOptions{
|
||||
AllowStale: true,
|
||||
Datacenter: dc,
|
||||
Near: "_agent",
|
||||
WaitTime: consul.DefaultQueryWaitDuration,
|
||||
}
|
||||
serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
|
||||
consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagRpc, opts)
|
||||
if err != nil {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err))
|
||||
continue
|
||||
}
|
||||
|
||||
for _, s := range consulServices {
|
||||
port := strconv.FormatInt(int64(s.ServicePort), 10)
|
||||
addr := s.ServiceAddress
|
||||
if addr == "" {
|
||||
addr = s.Address
|
||||
}
|
||||
serverAddr := net.JoinHostPort(addr, port)
|
||||
serverEndpoint, err := rpcproxy.NewServerEndpoint(serverAddr)
|
||||
if err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
continue
|
||||
}
|
||||
var ok bool
|
||||
if ok, err = c.connPool.PingNomadServer(c.Region(), c.RpcMajorVersion(), serverEndpoint); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
continue
|
||||
}
|
||||
if ok {
|
||||
nomadServerServices = append(nomadServerServices, serverAddr)
|
||||
}
|
||||
}
|
||||
// Break if at least one Nomad Server was successfully pinged
|
||||
if len(nomadServerServices) > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(nomadServerServices) == 0 {
|
||||
if len(mErr.Errors) > 0 {
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
for i, _ := range dcs {
|
||||
dcs[i] = fmt.Sprintf("%q", dcs[i])
|
||||
}
|
||||
return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %s", nomadServerServiceName, dcs)
|
||||
}
|
||||
|
||||
c.heartbeatLock.Lock()
|
||||
if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
|
||||
c.heartbeatLock.Unlock()
|
||||
// Common, healthy path
|
||||
if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
|
||||
if err := c.rpcProxy.SetBackupServers(nomadServerServices); err != nil {
|
||||
return fmt.Errorf("client.consul: unable to set backup servers: %v", err)
|
||||
}
|
||||
} else {
|
||||
@@ -1322,7 +1368,7 @@ func (c *Client) setupConsulSyncer() error {
|
||||
// eventually, hopefully, find their way to a Nomad
|
||||
// Server that has quorum (assuming Consul has a
|
||||
// server list that is in the majority).
|
||||
for _, s := range serverAddrs {
|
||||
for _, s := range nomadServerServices {
|
||||
c.rpcProxy.AddPrimaryServer(s)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -43,6 +43,10 @@ const (
|
||||
// the check result
|
||||
ttlCheckBuffer = 31 * time.Second
|
||||
|
||||
// DefaultQueryWaitDuration is the max duration the Consul Agent will
|
||||
// spend waiting for a response from a Consul Query.
|
||||
DefaultQueryWaitDuration = 2 * time.Second
|
||||
|
||||
// ServiceTagHttp is the tag assigned to HTTP services
|
||||
ServiceTagHttp = "http"
|
||||
|
||||
|
||||
Reference in New Issue
Block a user