Only poll Consul for servers when Nomad heartbeats begin to fail

When a deadline timer of 2x the Server's last requested heartbeat TTL
expires, begin polling Consul for Nomad Servers.
Sean Chittenden
2016-05-23 23:23:57 -07:00
parent c159e77fe3
commit 17927c8b9c
2 changed files with 212 additions and 50 deletions
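
For reference, a minimal, self-contained sketch of the fallback behavior this commit introduces (the fallbackClient type below is hypothetical; the real fields and call sites appear in the diff that follows): each successful heartbeat pushes backupServerDeadline out to twice the server-requested TTL, and the Consul fallback handler only acts once that deadline has passed.

package main

import (
	"fmt"
	"time"
)

// fallbackClient is a hypothetical stand-in for the Nomad client fields this
// commit touches; it exists only to illustrate the deadline logic.
type fallbackClient struct {
	backupServerDeadline time.Time
}

// onHeartbeat mirrors updateNodeStatus: every successful heartbeat pushes the
// Consul fallback deadline out to twice the TTL the servers requested.
func (c *fallbackClient) onHeartbeat(heartbeatTTL time.Duration) {
	c.backupServerDeadline = time.Now().Add(2 * heartbeatTTL)
}

// shouldPollConsul mirrors the periodic handler's gate: Consul's catalog is
// only queried for Nomad Servers after the deadline has expired.
func (c *fallbackClient) shouldPollConsul(now time.Time) bool {
	return !now.Before(c.backupServerDeadline)
}

func main() {
	c := &fallbackClient{}
	c.onHeartbeat(10 * time.Second)
	fmt.Println(c.shouldPollConsul(time.Now()))                       // false: heartbeats healthy
	fmt.Println(c.shouldPollConsul(time.Now().Add(21 * time.Second))) // true: heartbeats failing
}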

View File

@@ -4,13 +4,16 @@ import (
"fmt"
"io/ioutil"
"log"
"net"
"os"
"path/filepath"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/armon/go-metrics"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocdir"
@@ -76,6 +79,13 @@ const (
// consulSyncInterval is the interval at which the client syncs with consul
// to remove services and checks which are no longer valid
consulSyncInterval = 15 * time.Second
// consulSyncDelay specifies the initial sync delay when starting the
// Nomad Agent's consul.Syncer.
consulSyncDelay = 5 * time.Second
// consulSyncJitter is the divisor applied to consulSyncInterval when adding
// a little jitter to the agent's consul.Syncer task
consulSyncJitter = 8
)
// DefaultConfig returns the default configuration
@@ -113,6 +123,12 @@ type Client struct {
configCopy *config.Config
configLock sync.RWMutex
// backupServerDeadline is the deadline at which this Nomad Agent
// will begin polling Consul for a list of Nomad Servers. When Nomad
// Clients are heartbeating successfully with Nomad Servers, Nomad
// Clients do not poll Consul for a backup server list.
backupServerDeadline time.Time
logger *log.Logger
rpcProxy *rpc_proxy.RpcProxy
@@ -132,6 +148,7 @@ type Client struct {
// consulSyncer advertises this Nomad Agent with Consul
consulSyncer *consul.Syncer
consulLock int64
// HostStatsCollector collects host resource usage stats
hostStatsCollector *stats.HostStatsCollector
@@ -205,9 +222,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
return nil, fmt.Errorf("failed to restore state: %v", err)
}
// Setup the consul client
if err := c.setupConsulClient(); err != nil {
return nil, fmt.Errorf("failed to create consul client: %v")
// Setup the Consul syncer
if err := c.setupConsulSyncer(); err != nil {
return nil, fmt.Errorf("failed to create Consul syncer: %v")
}
// Register and then start heartbeating to the servers.
@@ -228,8 +245,8 @@ func NewClient(cfg *config.Config) (*Client, error) {
// Start maintenance task for servers
go c.rpcProxy.Run()
// Start the consul sync
go c.syncConsul()
// Start the Consul sync
go c.runClientConsulSyncer()
return c, nil
}
@@ -902,6 +919,7 @@ func (c *Client) updateNodeStatus() error {
if err := c.rpcProxy.UpdateFromNodeUpdateResponse(&resp); err != nil {
return err
}
c.backupServerDeadline = time.Now().Add(2 * resp.HeartbeatTTL)
return nil
}
@@ -1212,53 +1230,111 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
return nil
}
// setupConsulClient creates a consul.Syncer
func (c *Client) setupConsulClient() error {
cs, err := consul.NewSyncer(c.config.ConsulAgentConfig, c.logger)
// setupConsulSyncer creates a consul.Syncer
func (c *Client) setupConsulSyncer() error {
cs, err := consul.NewSyncer(c.config.ConsulConfig, c.logger)
if err != nil {
return err
}
c.consulSyncer = cs
return err
// Callback handler used to periodically poll Consul in the event
// there are no Nomad Servers available and the Nomad Agent is in a
// bootstrap situation.
fn := func() {
now := time.Now()
c.configLock.RLock()
if now.Before(c.backupServerDeadline) {
c.configLock.RUnlock()
return
}
c.configLock.RUnlock()
nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
services, _, err := c.consulSyncer.ConsulClient().Catalog().Service(nomadServerServiceName, "", &consulapi.QueryOptions{AllowStale: true})
if err != nil {
c.logger.Printf("[WARN] client: unable to query service %q: %v", nomadServerServiceName, err)
return
}
serverAddrs := make([]string, 0, len(services))
for _, s := range services {
port := strconv.FormatInt(int64(s.ServicePort), 10)
addr := s.ServiceAddress
if addr == "" {
addr = s.Address
}
serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
}
c.rpcProxy.SetBackupServers(serverAddrs)
}
const handlerName = "Nomad Client Fallback Server Handler"
c.consulSyncer.AddPeriodicHandler(handlerName, fn)
return nil
}
// syncConsul removes services of tasks which are no longer in running state
func (c *Client) syncConsul() {
sync := time.NewTicker(consulSyncInterval)
// runClientConsulSyncer runs the consul.Syncer task in the Nomad Agent's
// context. This is primarily responsible for removing Consul services for
// tasks which are no longer in a running state.
func (c *Client) runClientConsulSyncer() {
d := consulSyncDelay + lib.RandomStagger(consulSyncInterval-consulSyncDelay)
c.logger.Printf("[DEBUG] consul.sync: sleeping %v before first sync", d)
sync := time.NewTimer(d)
for {
select {
case <-sync.C:
// Give up pruning services if we can't fingerprint Consul
fn := func() {
defer atomic.StoreInt64(&c.consulLock, 0)
c.configLock.RLock()
_, ok := c.configCopy.Node.Attributes["consul.server"]
c.configLock.RUnlock()
if !ok {
continue
}
services := make(map[string]struct{})
// Get the existing allocs
c.allocLock.RLock()
allocs := make([]*AllocRunner, 0, len(c.allocs))
for _, ar := range c.allocs {
allocs = append(allocs, ar)
}
c.allocLock.RUnlock()
for _, ar := range allocs {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
if taskState.State == structs.TaskStateRunning {
if tr, ok := ar.tasks[taskName]; ok {
for _, service := range tr.task.Services {
svcIdentifier := consul.GenerateServiceIdentifier(ar.alloc.ID, tr.task.Name)
services[service.ID(svcIdentifier)] = struct{}{}
d = consulSyncInterval - lib.RandomStagger(consulSyncInterval/consulSyncJitter)
sync.Reset(d)
// Run syncer handlers regardless of this
// Agent's client or server status.
c.consulSyncer.RunHandlers()
// Give up pruning services if we can't
// fingerprint our Consul Agent.
c.configLock.RLock()
_, ok := c.configCopy.Node.Attributes["consul.version"]
c.configLock.RUnlock()
if !ok {
return
}
services := make(map[string]struct{})
// Get the existing allocs
c.allocLock.RLock()
allocs := make([]*AllocRunner, 0, len(c.allocs))
for _, ar := range c.allocs {
allocs = append(allocs, ar)
}
c.allocLock.RUnlock()
for _, ar := range allocs {
ar.taskStatusLock.RLock()
taskStates := copyTaskStates(ar.taskStates)
ar.taskStatusLock.RUnlock()
for taskName, taskState := range taskStates {
if taskState.State == structs.TaskStateRunning {
if tr, ok := ar.tasks[taskName]; ok {
for _, service := range tr.task.Services {
svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name)
services[service.ID(svcIdentifier)] = struct{}{}
}
}
}
}
}
if err := c.consulSyncer.KeepServices(services); err != nil {
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
}
}
if err := c.consulSyncer.KeepServices(services); err != nil {
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
if atomic.CompareAndSwapInt64(&c.consulLock, 0, 1) {
go fn()
}
case <-c.shutdownCh:
sync.Stop()
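
As an aside on the timing math in runClientConsulSyncer above: lib.RandomStagger(d) returns a random duration in [0, d), so under the constants in this diff the first client sync fires somewhere in [5s, 15s) after startup and subsequent syncs land in roughly (13.1s, 15s]. A standalone sketch of that arithmetic (constants copied from the diff):

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/consul/lib"
)

func main() {
	const (
		consulSyncInterval = 15 * time.Second
		consulSyncDelay    = 5 * time.Second
		consulSyncJitter   = 8
	)

	// Initial delay: base delay plus up to (interval - delay) of stagger.
	first := consulSyncDelay + lib.RandomStagger(consulSyncInterval-consulSyncDelay)

	// Steady state: the full interval minus up to interval/8 of stagger.
	next := consulSyncInterval - lib.RandomStagger(consulSyncInterval/consulSyncJitter)

	fmt.Printf("first sync in %v, then every ~%v\n", first, next)
}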

View File

@@ -12,11 +12,17 @@ import (
"time"
consul "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/nomad/types"
)
type notifyEvent struct{}
type notifyChannel chan notifyEvent
// Syncer allows syncing of services and checks with Consul
type Syncer struct {
client *consul.Client
@@ -37,12 +43,28 @@ type Syncer struct {
shutdown bool
shutdownLock sync.Mutex
// periodicCallbacks is walked sequentially when the timer in Run
// fires.
periodicCallbacks map[string]types.PeriodicCallback
notifySyncCh notifyChannel
periodicLock sync.RWMutex
}
const (
// initialSyncBuffer is the max time an initial sync will sleep
// before syncing.
initialSyncBuffer = 30 * time.Second
// initialSyncDelay is the delay before an initial sync.
initialSyncDelay = 5 * time.Second
// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
// syncJitter provides a little variance in the frequency at which
// Syncer polls Consul.
syncJitter = 8
// ttlCheckBuffer is the time interval that Nomad can take to report the
// check result to Consul
ttlCheckBuffer = 31 * time.Second
@@ -102,13 +124,13 @@ func NewSyncer(config *config.ConsulConfig, logger *log.Logger) (*Syncer, error)
return nil, err
}
consulSyncer := Syncer{
client: c,
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner),
client: c,
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
checkRunners: make(map[string]*CheckRunner),
shutdownCh: make(types.ShutdownChannel),
periodicCallbacks: make(map[string]types.PeriodicCallback),
}
return &consulSyncer, nil
}
@@ -133,7 +155,16 @@ func (c *Syncer) SetServiceIdentifier(serviceIdentifier string) *Syncer {
return c
}
// SyncServices sync the services with consul
// SyncNow expires the current timer forcing the list of periodic callbacks
// to be synced immediately.
func (c *Syncer) SyncNow() {
select {
case c.notifySyncCh <- notifyEvent{}:
default:
}
}
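
A small caller-side sketch of how SyncNow is meant to be used together with SyncServices (shown next); the registerAndNudge helper, services slice, and logger are hypothetical, and the sketch assumes it lives in the same package as Syncer. Note that in this revision the notify channel re-arms Run's timer to syncInterval rather than firing a sync immediately.

// registerAndNudge is a hypothetical helper illustrating the call pattern;
// syncer is an already-constructed *Syncer.
func registerAndNudge(syncer *Syncer, services []*structs.Service, logger *log.Logger) {
	if err := syncer.SyncServices(services); err != nil {
		logger.Printf("[WARN] consul.sync: syncing services failed: %v", err)
	}
	// Re-arm the Run() timer; see the notifySyncCh case in Run() below.
	syncer.SyncNow()
}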
// SyncServices syncs the services with the Consul Agent
func (c *Syncer) SyncServices(services []*structs.Service) error {
var mErr multierror.Error
taskServices := make(map[string]*consul.AgentService)
@@ -340,31 +371,54 @@ func (c *Syncer) deregisterCheck(ID string) error {
return c.client.Agent().CheckDeregister(ID)
}
sync := time.NewTicker(syncInterval)
// Run triggers periodic syncing of services and checks with Consul. This is
// a long-lived goroutine which is stopped during shutdown.
func (c *Syncer) Run() {
d := initialSyncDelay + lib.RandomStagger(initialSyncBuffer-initialSyncDelay)
sync := time.NewTimer(d)
c.logger.Printf("[DEBUG] consul.sync: sleeping %v before first sync", d)
for {
select {
case <-sync.C:
d = syncInterval - lib.RandomStagger(syncInterval/syncJitter)
sync.Reset(d)
if err := c.performSync(); err != nil {
if c.runChecks {
c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err)
c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceIdentifier, err)
}
c.runChecks = false
} else {
c.runChecks = true
}
case <-c.notifySyncCh:
sync.Reset(syncInterval)
case <-c.shutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier)
c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceIdentifier)
return
}
}
}
// RunHandlers executes each registered handler (in effectively random order,
// since Go map iteration order is randomized)
func (c *Syncer) RunHandlers() {
c.periodicLock.RLock()
handlers := make(map[string]types.PeriodicCallback, len(c.periodicCallbacks))
for name, fn := range c.periodicCallbacks {
handlers[name] = fn
}
c.periodicLock.RUnlock()
for name, fn := range handlers {
fn()
}
}
// performSync syncs the services and checks we are tracking with Consul.
func (c *Syncer) performSync() error {
c.RunHandlers()
var mErr multierror.Error
cServices, err := c.client.Agent().Services()
if err != nil {
@@ -448,7 +502,7 @@ func (c *Syncer) runCheck(check Check) {
}
if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
if c.runChecks {
c.logger.Printf("[DEBUG] consul.sync: error updating ttl check for check %q: %v", check.ID(), err)
c.logger.Printf("[DEBUG] consul.sync: check %q failed, disabling Consul checks until until next successful sync: %v", check.ID(), err)
c.runChecks = false
} else {
c.runChecks = true
@@ -461,3 +515,35 @@ func (c *Syncer) runCheck(check Check) {
func GenerateServiceIdentifier(allocID string, taskName string) string {
return fmt.Sprintf("%s-%s", taskName, allocID)
}
// AddPeriodicHandler adds a uniquely named callback. Returns true if
// successful, false if a handler with the same name already exists.
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
c.periodicLock.Lock()
defer c.periodicLock.Unlock()
c.logger.Printf("[DEBUG] consul.sync: adding handler named %s", name)
if _, found := c.periodicCallbacks[name]; found {
c.logger.Printf("[ERROR] consul.sync: failed adding handler %q", name)
return false
}
c.periodicCallbacks[name] = fn
c.logger.Printf("[DEBUG] consul.sync: successfully added handler %q", name)
return true
}
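// NumHandlers returns the number of registered periodic handlers.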
func (c *Syncer) NumHandlers() int {
c.periodicLock.RLock()
defer c.periodicLock.RUnlock()
return len(c.periodicCallbacks)
}
// RemovePeriodicHandler removes a handler with a given name.
func (c *Syncer) RemovePeriodicHandler(name string) {
c.periodicLock.Lock()
defer c.periodicLock.Unlock()
delete(c.periodicCallbacks, name)
}
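// ConsulClient returns the Consul API client used by the Syncer.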
func (c *Syncer) ConsulClient() *consul.Client {
return c.client
}
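
Finally, a hedged end-to-end sketch of how the client side of this commit wires the Syncer together, assuming the Syncer package is imported under the consul alias as client.go does; setupFallbackSyncer, cfg, and the handler body are hypothetical, while NewSyncer, AddPeriodicHandler, and Run are the functions shown above.

// setupFallbackSyncer is a hypothetical wiring function, mirroring what
// client.go does in this commit (cfg and logger come from the caller).
func setupFallbackSyncer(cfg *config.Config, logger *log.Logger) error {
	cs, err := consul.NewSyncer(cfg.ConsulConfig, logger)
	if err != nil {
		return fmt.Errorf("failed to create Consul syncer: %v", err)
	}
	// The fallback handler runs on every sync pass but is a no-op while
	// heartbeats are healthy (see the deadline check in client.go).
	cs.AddPeriodicHandler("Nomad Client Fallback Server Handler", func() {
		// Hypothetical body: query Consul's catalog for Nomad Servers only
		// after the heartbeat deadline has expired.
	})
	// Start the long-lived sync loop; Run exits when the Syncer shuts down.
	go cs.Run()
	return nil
}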