diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index f8ddf2ac4..59b2dfe95 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -71,6 +71,7 @@ type Syncer struct { checkRunners map[string]*CheckRunner delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul + trackedChecks map[string]*consul.AgentCheckRegistration // trackedChecks are the check registrations this Syncer tracks locally, keyed by check ID trackedServices map[string]*consul.AgentServiceRegistration // serviceRegPrefix is used to namespace the domain of registered @@ -327,6 +328,15 @@ func (c *Syncer) Shutdown() error { return mErr.ErrorOrNil() } +// queryChecks queries the Consul Agent for a list of Consul checks that +// have been registered with this Consul Syncer. The agent's response is narrowed with filterConsulChecks; a query error is returned unchanged. +func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) { + checks, err := c.client.Agent().Checks() + if err != nil { + return nil, err + } + return c.filterConsulChecks(checks), nil +} // queryAgentServices queries the Consul Agent for a list of Consul services that // have been registered with this Consul Syncer. @@ -345,14 +355,127 @@ func (c *Syncer) syncChecks() error { if err != nil { return err } - cServices = c.filterConsulServices(cServices) - // Remove the services from consul which are not in any of the tasks - for _, service := range cServices { - if _, validService := services[service.ID]; !validService { - if err := c.deregisterService(service.ID); err != nil { - mErr.Errors = append(mErr.Errors, err) + // Synchronize checks with Consul + missingChecks, _, changedChecks, staleChecks := c.calcChecksDiff(consulChecks) + for _, check := range missingChecks { + if err := c.registerCheck(check); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + c.trackedChecks[check.ID] = check + } + for _, check := range changedChecks { + // NOTE(sean@): Do we need to deregister the check before + // re-registering it? 
Not deregistering to avoid missing the + // TTL but doesn't correctly reconcile any possible drift with + // the check. + // + // if err := c.deregisterCheck(check.ID); err != nil { + // mErr.Errors = append(mErr.Errors, err) + // } + if err := c.registerCheck(check); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + for _, check := range staleChecks { + if err := c.deregisterCheck(check.ID); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + delete(c.trackedChecks, check.ID) + } + return mErr.ErrorOrNil() +} + +// compareConsulCheck takes a consul.AgentCheckRegistration instance and +// compares it with a consul.AgentCheck. Returns true if they are equal +// according to consul.AgentCheck, otherwise false. Only the ID, Name, Notes, and ServiceID fields are compared. +func compareConsulCheck(localCheck *consul.AgentCheckRegistration, consulCheck *consul.AgentCheck) bool { + if consulCheck.CheckID != localCheck.ID || + consulCheck.Name != localCheck.Name || + consulCheck.Notes != localCheck.Notes || + consulCheck.ServiceID != localCheck.ServiceID { + return false + } + return true +} + +// calcChecksDiff takes the argument (consulChecks) and calculates the delta +// between it and the consul.Syncer's list of known checks (c.trackedChecks). Four +// slices are returned: +// +// 1) a slice of checks that exist only locally in the Syncer and are missing +// from the Consul Agent (consulChecks) and therefore need to be registered. +// +// 2) a slice of checks that exist in both the local consul.Syncer's +// tracked list and Consul Agent (consulChecks) and compare as equal. +// +// 3) a slice of checks that exist in both the local consul.Syncer's +// tracked list and Consul Agent (consulChecks) but have diverged state. +// +// 4) a slice of checks that exist only in the Consul Agent (consulChecks) +// and should be removed because the Consul Agent has drifted from the +// Syncer. 
+func (c *Syncer) calcChecksDiff(consulChecks map[string]*consul.AgentCheck) (missingChecks []*consul.AgentCheckRegistration, equalChecks []*consul.AgentCheckRegistration, changedChecks []*consul.AgentCheckRegistration, staleChecks []*consul.AgentCheckRegistration) { + type mergedCheck struct { + check *consul.AgentCheckRegistration + // 'l' == Nomad local only (returned in missingChecks) + // 'e' == equal (returned in equalChecks) + // 'c' == changed (returned in changedChecks) + // 'a' == Consul agent only (returned in staleChecks) + state byte + } + var ( + localChecksCount = 0 + equalChecksCount = 0 + changedChecksCount = 0 + agentChecks = 0 + ) + localChecks := make(map[string]*mergedCheck, len(c.trackedChecks)+len(consulChecks)) + for _, localCheck := range c.trackedChecks { + localChecksCount++ + localChecks[localCheck.ID] = &mergedCheck{localCheck, 'l'} + } + for _, consulCheck := range consulChecks { + if localCheck, found := localChecks[consulCheck.CheckID]; found { + localChecksCount-- + if compareConsulCheck(localCheck.check, consulCheck) { + equalChecksCount++ + localChecks[consulCheck.CheckID].state = 'e' + } else { + changedChecksCount++ + localChecks[consulCheck.CheckID].state = 'c' } + } else { + agentChecks++ + agentCheckReg := &consul.AgentCheckRegistration{ + ID: consulCheck.CheckID, + Name: consulCheck.Name, + Notes: consulCheck.Notes, + ServiceID: consulCheck.ServiceID, + } + localChecks[consulCheck.CheckID] = &mergedCheck{agentCheckReg, 'a'} + } + } + + missingChecks = make([]*consul.AgentCheckRegistration, 0, localChecksCount) + equalChecks = make([]*consul.AgentCheckRegistration, 0, equalChecksCount) + changedChecks = make([]*consul.AgentCheckRegistration, 0, changedChecksCount) + staleChecks = make([]*consul.AgentCheckRegistration, 0, agentChecks) + for _, check := range localChecks { + switch check.state { + case 'l': + missingChecks = append(missingChecks, check.check) + case 'e': + equalChecks = append(equalChecks, check.check) + case 'c': + changedChecks = append(changedChecks, check.check) + case 'a': + staleChecks = append(staleChecks, 
check.check) + } + } + + return missingChecks, equalChecks, changedChecks, staleChecks +} // compareConsulService takes a consul.AgentServiceRegistration instance and // compares it with a consul.AgentService. Returns true if they are equal @@ -509,7 +632,7 @@ func (c *Syncer) registerCheck(chkReg *consul.AgentCheckRegistration) error { // createDelegatedCheckReg creates a Check that can be registered with // Nomad. It also creates a Nomad check for the check types that it can // handle. -func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *consul.AgentService) (*consul.AgentCheckRegistration, error) { +func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *consul.AgentServiceRegistration) (*consul.AgentCheckRegistration, error) { chkReg := consul.AgentCheckRegistration{ ID: check.Hash(service.ID), Name: check.Name, @@ -565,16 +688,25 @@ func (c *Syncer) deregisterService(serviceID string) error { return c.client.Agent().ServiceDeregister(serviceID) } -// deregisterCheck de-registers a check with a given ID from Consul. -func (c *Syncer) deregisterCheck(ID string) error { - // Deleting the nomad check - if cr, ok := c.checkRunners[ID]; ok { - cr.Stop() - delete(c.checkRunners, ID) +// deregisterCheck de-registers the check with the given ID from Consul and, on success, stops and removes its local check runner. c.registryLock is held for the duration of the call. +func (c *Syncer) deregisterCheck(checkID string) error { + c.registryLock.Lock() + defer c.registryLock.Unlock() + + // Deleting from Consul Agent + if err := c.client.Agent().CheckDeregister(checkID); err != nil { + // CheckDeregister() will be reattempted in a future + // sync; the local check runner is left in place until then. + return err } - // Deleting from consul - return c.client.Agent().CheckDeregister(ID) + // Stop and remove the check runner from the local registry + if cr, ok := c.checkRunners[checkID]; ok { + cr.Stop() + delete(c.checkRunners, checkID) + } + + return nil } // Run triggers periodic syncing of services and checks with Consul. 
This is @@ -663,15 +795,16 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer } // filterConsulChecks prunes out all the consul checks which do not have -// services with id prefixed with noamd- -func (c *Syncer) filterConsulChecks(chks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { - nomadChecks := make(map[string]*consul.AgentCheck) - for _, chk := range chks { - if strings.HasPrefix(chk.ServiceID, structs.NomadConsulPrefix) { - nomadChecks[chk.CheckID] = chk +// a ServiceID beginning with the Syncer's filter prefix (c.filterPrefix()). The matches are returned in a new map that reuses the input map's keys; the input map is not modified. +func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { + localChecks := make(map[string]*consul.AgentCheck, len(consulChecks)) + filterPrefix := c.filterPrefix() + for checkID, check := range consulChecks { + if strings.HasPrefix(check.ServiceID, filterPrefix) { + localChecks[checkID] = check } } - return nomadChecks + return localChecks } // consulPresent indicates whether the consul agent is responding