From 17927c8b9cdecfa3332864a008a74acbc4d00f49 Mon Sep 17 00:00:00 2001
From: Sean Chittenden
Date: Mon, 23 May 2016 23:23:57 -0700
Subject: [PATCH] Only poll Consul for servers when Nomad heartbeats begin to fail

When a deadline timer of 2x the Server's last requested heartbeat TTL expires,
begin polling Consul for Nomad Servers.
---
 client/client.go      | 154 +++++++++++++++++++++++++++++++-----------
 client/consul/sync.go | 108 ++++++++++++++++++++++++++---
 2 files changed, 212 insertions(+), 50 deletions(-)

diff --git a/client/client.go b/client/client.go
index f3bd5d2be..9e268b8ea 100644
--- a/client/client.go
+++ b/client/client.go
@@ -4,13 +4,16 @@ import (
     "fmt"
     "io/ioutil"
     "log"
+    "net"
     "os"
     "path/filepath"
     "strconv"
     "sync"
+    "sync/atomic"
     "time"
 
     "github.com/armon/go-metrics"
+    consulapi "github.com/hashicorp/consul/api"
     "github.com/hashicorp/consul/lib"
     "github.com/hashicorp/go-multierror"
     "github.com/hashicorp/nomad/client/allocdir"
@@ -76,6 +79,13 @@ const (
     // consulSyncInterval is the interval at which the client syncs with consul
     // to remove services and checks which are no longer valid
     consulSyncInterval = 15 * time.Second
+
+    // consulSyncDelay specifies the initial sync delay when starting the
+    // Nomad Agent's consul.Syncer.
+    consulSyncDelay = 5 * time.Second
+
+    // Add a little jitter to the agent's consul.Syncer task
+    consulSyncJitter = 8
 )
 
 // DefaultConfig returns the default configuration
@@ -113,6 +123,12 @@ type Client struct {
     configCopy *config.Config
     configLock sync.RWMutex
 
+    // backupServerDeadline is the deadline at which this Nomad Agent
+    // will begin polling Consul for a list of Nomad Servers.  When Nomad
+    // Clients are heartbeating successfully with Nomad Servers, Nomad
+    // Clients do not poll Consul for a backup server list.
+    backupServerDeadline time.Time
+
     logger *log.Logger
 
     rpcProxy *rpc_proxy.RpcProxy
@@ -132,6 +148,7 @@ type Client struct {
 
     // consulSyncer advertises this Nomad Agent with Consul
     consulSyncer *consul.Syncer
+    consulLock   int64
 
     // HostStatsCollector collects host resource usage stats
     hostStatsCollector *stats.HostStatsCollector
@@ -205,9 +222,9 @@ func NewClient(cfg *config.Config) (*Client, error) {
         return nil, fmt.Errorf("failed to restore state: %v", err)
     }
 
-    // Setup the consul client
-    if err := c.setupConsulClient(); err != nil {
-        return nil, fmt.Errorf("failed to create consul client: %v")
+    // Setup the Consul syncer
+    if err := c.setupConsulSyncer(); err != nil {
+        return nil, fmt.Errorf("failed to create Consul syncer: %v", err)
     }
 
     // Register and then start heartbeating to the servers.
@@ -228,8 +245,8 @@ func NewClient(cfg *config.Config) (*Client, error) {
     // Start maintenance task for servers
     go c.rpcProxy.Run()
 
-    // Start the consul sync
-    go c.syncConsul()
+    // Start the Consul sync
+    go c.runClientConsulSyncer()
 
     return c, nil
 }
@@ -902,6 +919,7 @@ func (c *Client) updateNodeStatus() error {
     if err := c.rpcProxy.UpdateFromNodeUpdateResponse(&resp); err != nil {
         return err
     }
+    c.backupServerDeadline = time.Now().Add(2 * resp.HeartbeatTTL)
     return nil
 }
 
@@ -1212,53 +1230,111 @@ func (c *Client) addAlloc(alloc *structs.Allocation) error {
     return nil
 }
 
-// setupConsulClient creates a consul.Syncer
-func (c *Client) setupConsulClient() error {
-    cs, err := consul.NewSyncer(c.config.ConsulAgentConfig, c.logger)
+// setupConsulSyncer creates a consul.Syncer
+func (c *Client) setupConsulSyncer() error {
+    cs, err := consul.NewSyncer(c.config.ConsulConfig, c.logger)
+    if err != nil {
+        return err
+    }
+
     c.consulSyncer = cs
-    return err
+
+    // Callback handler used to periodically poll Consul in the event
+    // there are no Nomad Servers available and the Nomad Agent is in a
+    // bootstrap situation.
+    fn := func() {
+        now := time.Now()
+        c.configLock.RLock()
+        if now.Before(c.backupServerDeadline) {
+            c.configLock.RUnlock()
+            return
+        }
+        c.configLock.RUnlock()
+
+        nomadServerServiceName := c.config.ConsulConfig.ServerServiceName
+        services, _, err := c.consulSyncer.ConsulClient().Catalog().Service(nomadServerServiceName, "", &consulapi.QueryOptions{AllowStale: true})
+        if err != nil {
+            c.logger.Printf("[WARN] client: unable to query service %q: %v", nomadServerServiceName, err)
+            return
+        }
+        serverAddrs := make([]string, 0, len(services))
+        for _, s := range services {
+            port := strconv.FormatInt(int64(s.ServicePort), 10)
+            addr := s.ServiceAddress
+            if addr == "" {
+                addr = s.Address
+            }
+            serverAddrs = append(serverAddrs, net.JoinHostPort(addr, port))
+        }
+        c.rpcProxy.SetBackupServers(serverAddrs)
+    }
+
+    const handlerName = "Nomad Client Fallback Server Handler"
+    c.consulSyncer.AddPeriodicHandler(handlerName, fn)
+    return nil
 }
 
-// syncConsul removes services of tasks which are no longer in running state
-func (c *Client) syncConsul() {
-    sync := time.NewTicker(consulSyncInterval)
+// runClientConsulSyncer runs the consul.Syncer task in the Nomad Agent's
+// context.  This is primarily responsible for removing services of tasks
+// which are no longer in running state.
+func (c *Client) runClientConsulSyncer() {
+    d := consulSyncDelay + lib.RandomStagger(consulSyncInterval-consulSyncDelay)
+    c.logger.Printf("[DEBUG] consul.sync: sleeping %v before first sync", d)
+    sync := time.NewTimer(d)
     for {
         select {
         case <-sync.C:
-            // Give up pruning services if we can't fingerprint Consul
+            fn := func() {
+                defer atomic.StoreInt64(&c.consulLock, 0)
 
-            c.configLock.RLock()
-            _, ok := c.configCopy.Node.Attributes["consul.server"]
-            c.configLock.RUnlock()
-            if !ok {
-                continue
-            }
-            services := make(map[string]struct{})
-            // Get the existing allocs
-            c.allocLock.RLock()
-            allocs := make([]*AllocRunner, 0, len(c.allocs))
-            for _, ar := range c.allocs {
-                allocs = append(allocs, ar)
-            }
-            c.allocLock.RUnlock()
-            for _, ar := range allocs {
-                ar.taskStatusLock.RLock()
-                taskStates := copyTaskStates(ar.taskStates)
-                ar.taskStatusLock.RUnlock()
-                for taskName, taskState := range taskStates {
-                    if taskState.State == structs.TaskStateRunning {
-                        if tr, ok := ar.tasks[taskName]; ok {
-                            for _, service := range tr.task.Services {
-                                svcIdentifier := consul.GenerateServiceIdentifier(ar.alloc.ID, tr.task.Name)
-                                services[service.ID(svcIdentifier)] = struct{}{}
+                d = consulSyncInterval - lib.RandomStagger(consulSyncInterval/consulSyncJitter)
+                sync.Reset(d)
+
+                // Run syncer handlers regardless of this
+                // Agent's client or server status.
+                c.consulSyncer.RunHandlers()
+
+                // Give up pruning services if we can't
+                // fingerprint our Consul Agent.
+                c.configLock.RLock()
+                _, ok := c.configCopy.Node.Attributes["consul.version"]
+                c.configLock.RUnlock()
+                if !ok {
+                    return
+                }
+
+                services := make(map[string]struct{})
+                // Get the existing allocs
+                c.allocLock.RLock()
+                allocs := make([]*AllocRunner, 0, len(c.allocs))
+                for _, ar := range c.allocs {
+                    allocs = append(allocs, ar)
+                }
+                c.allocLock.RUnlock()
+
+                for _, ar := range allocs {
+                    ar.taskStatusLock.RLock()
+                    taskStates := copyTaskStates(ar.taskStates)
+                    ar.taskStatusLock.RUnlock()
+                    for taskName, taskState := range taskStates {
+                        if taskState.State == structs.TaskStateRunning {
+                            if tr, ok := ar.tasks[taskName]; ok {
+                                for _, service := range tr.task.Services {
+                                    svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name)
+                                    services[service.ID(svcIdentifier)] = struct{}{}
+                                }
                             }
                         }
                     }
                 }
+
+                if err := c.consulSyncer.KeepServices(services); err != nil {
+                    c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
+                }
             }
-            if err := c.consulSyncer.KeepServices(services); err != nil {
-                c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
+            if atomic.CompareAndSwapInt64(&c.consulLock, 0, 1) {
+                go fn()
             }
         case <-c.shutdownCh:
             sync.Stop()
diff --git a/client/consul/sync.go b/client/consul/sync.go
index 556de1b85..f97b34449 100644
--- a/client/consul/sync.go
+++ b/client/consul/sync.go
@@ -12,11 +12,17 @@ import (
     "time"
 
     consul "github.com/hashicorp/consul/api"
+    "github.com/hashicorp/consul/lib"
     "github.com/hashicorp/go-multierror"
     "github.com/hashicorp/nomad/nomad/structs"
+    "github.com/hashicorp/nomad/nomad/structs/config"
+    "github.com/hashicorp/nomad/nomad/types"
 )
 
+type notifyEvent struct{}
+type notifyChannel chan notifyEvent
+
 // Syncer allows syncing of services and checks with Consul
 type Syncer struct {
     client *consul.Client
@@ -37,12 +43,28 @@ type Syncer struct {
     shutdown     bool
     shutdownLock sync.Mutex
 
+    // periodicCallbacks is walked sequentially when the timer in Run
+    // fires.
+    periodicCallbacks map[string]types.PeriodicCallback
+    notifySyncCh      notifyChannel
+    periodicLock      sync.RWMutex
 }
 
 const (
+    // initialSyncBuffer is the max time an initial sync will sleep
+    // before syncing.
+    initialSyncBuffer = 30 * time.Second
+
+    // initialSyncDelay is the delay before an initial sync.
+    initialSyncDelay = 5 * time.Second
+
     // The periodic time interval for syncing services and checks with Consul
     syncInterval = 5 * time.Second
 
+    // syncJitter provides a little variance in the frequency at which
+    // Syncer polls Consul.
+    syncJitter = 8
+
     // ttlCheckBuffer is the time interval that Nomad can take to report Consul
     // the check result
     ttlCheckBuffer = 31 * time.Second
@@ -102,13 +124,13 @@ func NewSyncer(config *config.ConsulConfig, logger *log.Logger) (*Syncer, error)
         return nil, err
     }
     consulSyncer := Syncer{
-        client:          c,
-        logger:          logger,
-        trackedServices: make(map[string]*consul.AgentService),
-        trackedChecks:   make(map[string]*consul.AgentCheckRegistration),
-        checkRunners:    make(map[string]*CheckRunner),
-
+        client:            c,
+        logger:            logger,
+        trackedServices:   make(map[string]*consul.AgentService),
+        trackedChecks:     make(map[string]*consul.AgentCheckRegistration),
+        checkRunners:      make(map[string]*CheckRunner),
         shutdownCh:        make(types.ShutdownChannel),
+        periodicCallbacks: make(map[string]types.PeriodicCallback),
     }
     return &consulSyncer, nil
 }
@@ -133,7 +155,16 @@ func (c *Syncer) SetServiceIdentifier(serviceIdentifier string) *Syncer {
     return c
 }
 
-// SyncServices sync the services with consul
+// SyncNow expires the current timer forcing the list of periodic callbacks
+// to be synced immediately.
+func (c *Syncer) SyncNow() {
+    select {
+    case c.notifySyncCh <- notifyEvent{}:
+    default:
+    }
+}
+
+// SyncServices syncs the services with the Consul Agent
 func (c *Syncer) SyncServices(services []*structs.Service) error {
     var mErr multierror.Error
     taskServices := make(map[string]*consul.AgentService)
@@ -340,31 +371,54 @@ func (c *Syncer) deregisterCheck(ID string) error {
     return c.client.Agent().CheckDeregister(ID)
 }
 
 // Run triggers periodic syncing of services and checks with Consul. This is
 // a long lived go-routine which is stopped during shutdown.
 func (c *Syncer) Run() {
-    sync := time.NewTicker(syncInterval)
+    d := initialSyncDelay + lib.RandomStagger(initialSyncBuffer-initialSyncDelay)
+    sync := time.NewTimer(d)
+    c.logger.Printf("[DEBUG] consul.sync: sleeping %v before first sync", d)
+
     for {
         select {
         case <-sync.C:
+            d = syncInterval - lib.RandomStagger(syncInterval/syncJitter)
+            sync.Reset(d)
+
             if err := c.performSync(); err != nil {
                 if c.runChecks {
-                    c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err)
+                    c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceIdentifier, err)
                 }
                 c.runChecks = false
             } else {
                 c.runChecks = true
            }
+        case <-c.notifySyncCh:
+            sync.Reset(syncInterval)
         case <-c.shutdownCh:
             sync.Stop()
-            c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier)
+            c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceIdentifier)
             return
         }
     }
 }
 
+// RunHandlers executes each registered handler (in no particular order).
+func (c *Syncer) RunHandlers() {
+    c.periodicLock.RLock()
+    handlers := make(map[string]types.PeriodicCallback, len(c.periodicCallbacks))
+    for name, fn := range c.periodicCallbacks {
+        handlers[name] = fn
+    }
+    c.periodicLock.RUnlock()
+    for _, fn := range handlers {
+        fn()
+    }
+}
+
 // performSync sync the services and checks we are tracking with Consul.
 func (c *Syncer) performSync() error {
+    c.RunHandlers()
+
     var mErr multierror.Error
     cServices, err := c.client.Agent().Services()
     if err != nil {
@@ -448,7 +502,7 @@ func (c *Syncer) runCheck(check Check) {
     }
     if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
         if c.runChecks {
-            c.logger.Printf("[DEBUG] consul.sync: error updating ttl check for check %q: %v", check.ID(), err)
+            c.logger.Printf("[DEBUG] consul.sync: check %q failed, disabling Consul checks until next successful sync: %v", check.ID(), err)
             c.runChecks = false
         } else {
             c.runChecks = true
@@ -461,3 +515,35 @@ func GenerateServiceIdentifier(allocID string, taskName string) string {
     return fmt.Sprintf("%s-%s", taskName, allocID)
 }
+
+// AddPeriodicHandler adds a uniquely named callback.  Returns true if
+// successful, false if a handler with the same name already exists.
+func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {
+    c.periodicLock.Lock()
+    defer c.periodicLock.Unlock()
+    c.logger.Printf("[DEBUG] consul.sync: adding handler named %s", name)
+    if _, found := c.periodicCallbacks[name]; found {
+        c.logger.Printf("[ERROR] consul.sync: failed adding handler %q", name)
+        return false
+    }
+    c.periodicCallbacks[name] = fn
+    c.logger.Printf("[DEBUG] consul.sync: successfully added handler %q", name)
+    return true
+}
+
+// NumHandlers returns the number of registered periodic handlers.
+func (c *Syncer) NumHandlers() int {
+    c.periodicLock.RLock()
+    defer c.periodicLock.RUnlock()
+    return len(c.periodicCallbacks)
+}
+
+// RemovePeriodicHandler removes a handler with a given name.
+func (c *Syncer) RemovePeriodicHandler(name string) {
+    c.periodicLock.Lock()
+    defer c.periodicLock.Unlock()
+    delete(c.periodicCallbacks, name)
+}
+
+// ConsulClient returns the Consul client used by the Syncer.
+func (c *Syncer) ConsulClient() *consul.Client {
+    return c.client
+}
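
Usage note (not part of the patch): a minimal, out-of-tree sketch of the periodic-handler API added above. It assumes the signatures shown in the diff (NewSyncer, AddPeriodicHandler, NumHandlers, Run), that types.PeriodicCallback is a plain func(), and that a zero-value config.ConsulConfig is enough for NewSyncer to build a default Consul API client; the handler name and sleep duration are illustrative only.

package main

import (
    "log"
    "os"
    "time"

    "github.com/hashicorp/nomad/client/consul"
    "github.com/hashicorp/nomad/nomad/structs/config"
)

func main() {
    logger := log.New(os.Stderr, "", log.LstdFlags)

    // Assumption: a zero-value ConsulConfig makes NewSyncer fall back to the
    // Consul API client defaults (e.g. a local agent on 127.0.0.1:8500).
    syncer, err := consul.NewSyncer(&config.ConsulConfig{}, logger)
    if err != nil {
        logger.Fatalf("failed to create syncer: %v", err)
    }

    // Handler names must be unique; a duplicate registration returns false.
    // "example-handler" is an arbitrary, illustrative name.
    if ok := syncer.AddPeriodicHandler("example-handler", func() {
        logger.Printf("periodic handler fired at %v", time.Now())
    }); !ok {
        logger.Fatalf("handler already registered")
    }
    logger.Printf("registered handlers: %d", syncer.NumHandlers())

    // Run drives the sync loop in the background; registered handlers fire on
    // each pass via RunHandlers.
    go syncer.Run()

    // Give the loop time to get past its initial staggered delay and fire at
    // least one sync pass before exiting.
    time.Sleep(time.Minute)
}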