From 6819f2b68d91151df7d492768b67760f2c4f435b Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Fri, 10 Jun 2016 23:05:14 -0400 Subject: [PATCH] Query for the Nomad service across multiple Consul datacenters. --- client/client.go | 82 ++++++++++++++++++++++++++-------- command/agent/consul/syncer.go | 4 ++ 2 files changed, 68 insertions(+), 18 deletions(-) diff --git a/client/client.go b/client/client.go index a0503d0fe..35c11834c 100644 --- a/client/client.go +++ b/client/client.go @@ -37,6 +37,10 @@ const ( // open to a server clientMaxStreams = 2 + // datacenterQueryLimit searches through up to this many adjacent + // datacenters looking for the Nomad server service. + datacenterQueryLimit = 5 + // registerRetryIntv is minimum interval on which we retry // registration. We pick a value between this and 2x this. registerRetryIntv = 15 * time.Second @@ -1282,33 +1286,75 @@ func (c *Client) setupConsulSyncer() error { c.heartbeatLock.Unlock() c.logger.Printf("[TRACE] client.consul: lost heartbeat with Nomad quorum, falling back to Consul for server list") - nomadServerServiceName := c.config.ConsulConfig.ServerServiceName - services, _, err := c.consulSyncer.ConsulClient().Catalog(). - Service(nomadServerServiceName, consul.ServiceTagRpc, - &consulapi.QueryOptions{AllowStale: true}) + consulCatalog := c.consulSyncer.ConsulClient().Catalog() + dcs, err := consulCatalog.Datacenters() if err != nil { - return fmt.Errorf("client.consul: unable to query service %q: %v", nomadServerServiceName, err) + return fmt.Errorf("client.consul: unable to query Consul datacenters: %v", err) } + dcs = dcs[0:lib.MinInt(len(dcs), datacenterQueryLimit)] + // Walk the list of Consul datacenters randomly in order to + // search for the Nomad server service. + shuffleStrings(dcs) - if len(services) == 0 { - return fmt.Errorf("client.consul: no Nomad servers advertising service %q", nomadServerServiceName) - } - - serverAddrs := make([]string, 0, len(services)) - for _, s := range services { - port := strconv.FormatInt(int64(s.ServicePort), 10) - addr := s.ServiceAddress - if addr == "" { - addr = s.Address + nomadServerServiceName := c.config.ConsulConfig.ServerServiceName + var mErr multierror.Error + const defaultMaxNumNomadServers = 8 + nomadServerServices := make([]string, 0, defaultMaxNumNomadServers) + for _, dc := range dcs { + opts := &consulapi.QueryOptions{ + AllowStale: true, + Datacenter: dc, + Near: "_agent", + WaitTime: consul.DefaultQueryWaitDuration, } - serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port)) + consulServices, _, err := consulCatalog.Service(nomadServerServiceName, consul.ServiceTagRpc, opts) + if err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("unable to query service %q from Consul datacenter %q: %v", nomadServerServiceName, dc, err)) + continue + } + + for _, s := range consulServices { + port := strconv.FormatInt(int64(s.ServicePort), 10) + addr := s.ServiceAddress + if addr == "" { + addr = s.Address + } + serverAddr := net.JoinHostPort(addr, port) + serverEndpoint, err := rpcproxy.NewServerEndpoint(serverAddr) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + continue + } + var ok bool + if ok, err = c.connPool.PingNomadServer(c.Region(), c.RpcMajorVersion(), serverEndpoint); err != nil { + mErr.Errors = append(mErr.Errors, err) + continue + } + if ok { + nomadServerServices = append(nomadServerServices, serverAddr) + } + } + // Break if at least one Nomad Server was successfully pinged + if len(nomadServerServices) > 0 { + break + } + } + if len(nomadServerServices) == 0 { + if len(mErr.Errors) > 0 { + return mErr.ErrorOrNil() + } + + for i, _ := range dcs { + dcs[i] = fmt.Sprintf("%q", dcs[i]) + } + return fmt.Errorf("no Nomad Servers advertising service %q in Consul datacenters: %s", nomadServerServiceName, dcs) } c.heartbeatLock.Lock() if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) { c.heartbeatLock.Unlock() // Common, healthy path - if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil { + if err := c.rpcProxy.SetBackupServers(nomadServerServices); err != nil { return fmt.Errorf("client.consul: unable to set backup servers: %v", err) } } else { @@ -1322,7 +1368,7 @@ func (c *Client) setupConsulSyncer() error { // eventually, hopefully, find their way to a Nomad // Server that has quorum (assuming Consul has a // server list that is in the majority). - for _, s := range serverAddrs { + for _, s := range nomadServerServices { c.rpcProxy.AddPrimaryServer(s) } } diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index 97ca8ed16..c3062ad50 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -43,6 +43,10 @@ const ( // the check result ttlCheckBuffer = 31 * time.Second + // DefaultQueryWaitDuration is the max duration the Consul Agent will + // spend waiting for a response from a Consul Query. + DefaultQueryWaitDuration = 2 * time.Second + // ServiceTagHttp is the tag assigned to HTTP services ServiceTagHttp = "http"