diff --git a/client/client.go b/client/client.go
index c1ff41ece..340edba21 100644
--- a/client/client.go
+++ b/client/client.go
@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"strconv"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"github.com/armon/go-metrics"
@@ -111,6 +112,13 @@ type Client struct {
 
 	connPool *nomad.ConnPool
 
+	// lastHeartbeatFromQuorum is an atomic int32 acting as a bool. When
+	// true (1), the last heartbeat message had a leader. When false (0),
+	// the last heartbeat did not include the RPC address of the leader,
+	// indicating that the server is in the minority of a partition or in
+	// the middle of an election.
+	lastHeartbeatFromQuorum int32
+
 	lastHeartbeat time.Time
 	heartbeatTTL  time.Duration
 	heartbeatLock sync.Mutex
@@ -361,6 +369,8 @@ func (c *Client) Stats() map[string]map[string]string {
 	numAllocs := len(c.allocs)
 	c.allocLock.RUnlock()
 
+	c.heartbeatLock.Lock()
+	defer c.heartbeatLock.Unlock()
 	stats := map[string]map[string]string{
 		"client": map[string]string{
 			"node_id": c.Node().ID,
@@ -924,8 +934,20 @@ func (c *Client) updateNodeStatus() error {
 	if err := c.rpcProxy.RefreshServerLists(resp.Servers, resp.NumNodes, resp.LeaderRPCAddr); err != nil {
 		return err
 	}
 
-	c.consulPullHeartbeatDeadline = time.Now().Add(2 * resp.HeartbeatTTL)
+	// Begin polling Consul if there is no Nomad leader. We could be
+	// heartbeating to a Nomad server that is in the minority of a
+	// partition of the Nomad server quorum; this Agent may still have
+	// connectivity to the existing majority of Nomad Servers, but only
+	// if it queries Consul.
+	if resp.LeaderRPCAddr == "" {
+		atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 1, 0)
+		return nil
+	}
+
+	const heartbeatFallbackFactor = 3
+	atomic.CompareAndSwapInt32(&c.lastHeartbeatFromQuorum, 0, 1)
+	c.consulPullHeartbeatDeadline = time.Now().Add(heartbeatFallbackFactor * resp.HeartbeatTTL)
 	return nil
 }
 
@@ -1249,20 +1271,29 @@ func (c *Client) setupConsulSyncer() error {
 	bootstrapFn := func() error {
 		now := time.Now()
 		c.configLock.RLock()
-		if now.Before(c.consulPullHeartbeatDeadline) {
+
+		// If the last heartbeat didn't contain a leader, give the
+		// Nomad server this Agent is talking to one more attempt at
+		// providing a heartbeat that does contain a leader.
+		if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
 			c.configLock.RUnlock()
 			return nil
 		}
 		c.configLock.RUnlock()
 
+		c.logger.Printf("[TRACE] client.consul: lost heartbeat with Nomad quorum, falling back to Consul for server list")
 		nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
 		services, _, err := c.consulSyncer.ConsulClient().Catalog().
 			Service(nomadServerServiceName, consul.ServiceTagRpc, &consulapi.QueryOptions{AllowStale: true})
 		if err != nil {
-			c.logger.Printf("[WARN] client: unable to query service %q: %v", nomadServerServiceName, err)
-			return err
+			return fmt.Errorf("client.consul: unable to query service %q: %v", nomadServerServiceName, err)
 		}
+
+		if len(services) == 0 {
+			return fmt.Errorf("client.consul: no Nomad servers advertising service %q", nomadServerServiceName)
+		}
+
 		serverAddrs := make([]string, 0, len(services))
 		for _, s := range services {
 			port := strconv.FormatInt(int64(s.ServicePort), 10)
@@ -1273,8 +1304,24 @@ func (c *Client) setupConsulSyncer() error {
 			serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
 		}
 
-		if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
-			return err
+		if atomic.LoadInt32(&c.lastHeartbeatFromQuorum) == 1 && now.Before(c.consulPullHeartbeatDeadline) {
+			// Common, healthy path: the last heartbeat included a leader.
+			if err := c.rpcProxy.SetBackupServers(serverAddrs); err != nil {
+				return fmt.Errorf("client.consul: unable to set backup servers: %v", err)
+			}
+		} else {
+			// If this Client is talking with a Server that
+			// doesn't have a leader and we have exceeded the
+			// consulPullHeartbeatDeadline, call
+			// AddPrimaryServer() instead of
+			// SetBackupServers() so that Clients begin
+			// considering all known Nomad servers and
+			// eventually find their way to a Server that
+			// has quorum (assuming the server list in
+			// Consul reflects the majority).
+			for _, s := range serverAddrs {
+				c.rpcProxy.AddPrimaryServer(s)
+			}
 		}
 
 		return nil
diff --git a/client/rpcproxy/rpcproxy.go b/client/rpcproxy/rpcproxy.go
index 899f0434b..2541df010 100644
--- a/client/rpcproxy/rpcproxy.go
+++ b/client/rpcproxy/rpcproxy.go
@@ -645,12 +645,6 @@ func (p *RpcProxy) RefreshServerLists(servers []*structs.NodeServerInfo, numNode
 	// with newer API versions are filtered from the list. If the list
 	// is missing an address found in the RpcProxy's server list, remove
 	// it from the RpcProxy.
-	//
-	// FIXME(sean@): This is not true. We rely on an outside pump to set
-	// these values. In order to catch the orphaned clients where all
-	// Nomad servers were rolled between the heartbeat interval, the
-	// rebalance task queries Consul and adds the servers found in Consul
-	// to the server list in order to reattach an orphan to a server.
 	p.serverListLock.Lock()
 	defer p.serverListLock.Unlock()
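The core mechanism of this patch is a heartbeat flag stored as an atomic int32 plus a deadline a few TTLs in the future, which together decide whether the client trusts its heartbeat-derived server list or falls back to Consul. Below is a minimal, self-contained Go sketch of that pattern, not code from the Nomad repository: the type and helper names (quorumFlag, markHeartbeat, shouldFallBack) are hypothetical, and the real client guards its deadline with heartbeatLock/configLock rather than leaving it unsynchronized as this toy does.

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// quorumFlag mimics the role of Client.lastHeartbeatFromQuorum: an int32
// used as a bool so it can be flipped and read without holding a mutex.
type quorumFlag struct {
	fromQuorum int32     // 1 when the last heartbeat named a leader, 0 otherwise
	deadline   time.Time // analogous to consulPullHeartbeatDeadline
}

// markHeartbeat records the outcome of a heartbeat. leaderAddr is empty
// when the server answering the heartbeat does not know of a leader.
func (q *quorumFlag) markHeartbeat(leaderAddr string, ttl time.Duration) {
	if leaderAddr == "" {
		// Clear the flag only if it was previously set (CAS 1 -> 0).
		atomic.CompareAndSwapInt32(&q.fromQuorum, 1, 0)
		return
	}
	// Set the flag (CAS 0 -> 1) and push the fallback deadline out.
	atomic.CompareAndSwapInt32(&q.fromQuorum, 0, 1)
	q.deadline = time.Now().Add(3 * ttl)
}

// shouldFallBack reports whether the client should stop trusting its
// heartbeat-derived server list and query Consul for servers instead.
func (q *quorumFlag) shouldFallBack(now time.Time) bool {
	return atomic.LoadInt32(&q.fromQuorum) == 0 || now.After(q.deadline)
}

func main() {
	var q quorumFlag

	// Healthy path: heartbeat included a leader address.
	q.markHeartbeat("10.0.0.1:4647", 10*time.Second)
	fmt.Println(q.shouldFallBack(time.Now())) // false

	// Heartbeat answered by a server that has no leader.
	q.markHeartbeat("", 10*time.Second)
	fmt.Println(q.shouldFallBack(time.Now())) // true: fall back to Consul
}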