From 19f765eb056ec11535967a8fcbbe8f8e2ef5f6a9 Mon Sep 17 00:00:00 2001 From: Sean Chittenden Date: Thu, 9 Jun 2016 22:55:01 -0400 Subject: [PATCH] Sync services with Consul by comparing the AgentServiceReg w/ ConsulService The source of truth is the local Nomad Agent. Any services not local that have a matching prefix are removed. Changed services are re-registered and missing services are re-added. --- command/agent/consul/syncer.go | 169 +++++++++++++++++++++++++++++++-- nomad/structs/config/consul.go | 2 +- 2 files changed, 162 insertions(+), 9 deletions(-) diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index 6bb458fbb..f8ddf2ac4 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -6,7 +6,6 @@ import ( "log" "net/http" "net/url" - "reflect" "strings" "sync" "time" @@ -72,6 +71,7 @@ type Syncer struct { checkRunners map[string]*CheckRunner delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul + trackedServices map[string]*consul.AgentServiceRegistration // serviceRegPrefix is used to namespace the domain of registered // Consul Services and Checks belonging to a single Syncer. 
A given @@ -164,8 +164,8 @@ func NewSyncer(config *config.ConsulConfig, shutdownCh chan struct{}, logger *lo logger: logger, consulAvailable: true, shutdownCh: shutdownCh, - trackedServices: make(map[string]*consul.AgentService), servicesGroups: make(map[string][]*consul.AgentServiceRegistration), + trackedServices: make(map[string]*consul.AgentServiceRegistration), trackedChecks: make(map[string]*consul.AgentCheckRegistration), checkRunners: make(map[string]*CheckRunner), periodicCallbacks: make(map[string]types.PeriodicCallback), @@ -328,8 +328,20 @@ func (c *Syncer) Shutdown() error { } - // Get the services from Consul - cServices, err := c.client.Agent().Services() +// queryAgentServices queries the Consul Agent for a list of Consul services that +// have been registered with this Consul Syncer. +func (c *Syncer) queryAgentServices() (map[string]*consul.AgentService, error) { + services, err := c.client.Agent().Services() + if err != nil { + return nil, err + } + return c.filterConsulServices(services), nil +} + +// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent. +func (c *Syncer) syncChecks() error { + var mErr multierror.Error + consulChecks, err := c.queryChecks() if err != nil { return err } @@ -341,7 +353,147 @@ func (c *Syncer) Shutdown() error { if err := c.deregisterService(service.ID); err != nil { mErr.Errors = append(mErr.Errors, err) } + +// compareConsulService takes a consul.AgentServiceRegistration instance and +// compares it with a consul.AgentService. Returns true if they are equal +// according to consul.AgentService, otherwise false. 
+func compareConsulService(localService *consul.AgentServiceRegistration, consulService *consul.AgentService) bool { + if consulService.ID != localService.ID || + consulService.Service != localService.Name || + consulService.Port != localService.Port || + consulService.Address != localService.Address || + consulService.Address != localService.Address || + consulService.EnableTagOverride != localService.EnableTagOverride { + return false + } + + serviceTags := make(map[string]byte, len(localService.Tags)) + for _, tag := range localService.Tags { + serviceTags[tag] = 'l' + } + for _, tag := range consulService.Tags { + if _, found := serviceTags[tag]; !found { + return false } + serviceTags[tag] = 'b' + } + for _, state := range serviceTags { + if state == 'l' { + return false + } + } + + return true +} + +// calcServicesDiff takes the argument (consulServices) and calculates the +// delta between the consul.Syncer's list of known services +// (as enumerated by c.flattenedServices()). Four slices are returned: +// +// 1) a slice of services that exist only locally in the Syncer and are +// missing from the Consul Agent (consulServices) and therefore need to be +// registered. +// +// 2) a slice of services that exist in both the local consul.Syncer's +// tracked list and Consul Agent (consulServices) *AND* are identical. +// +// 3) a slice of services that exist in both the local consul.Syncer's +// tracked list and Consul Agent (consulServices) but have diverged state. +// +// 4) a slice of services that exist only in the Consul Agent +// (consulServices) and should be removed because the Consul Agent has +// drifted from the Syncer. 
+func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService) (missingServices []*consul.AgentServiceRegistration, equalServices []*consul.AgentServiceRegistration, changedServices []*consul.AgentServiceRegistration, staleServices []*consul.AgentServiceRegistration) { + type mergedService struct { + service *consul.AgentServiceRegistration + // 'l' == Nomad local only + // 'e' == equal + // 'c' == changed + // 'a' == Consul agent only + state byte + } + var ( + localServicesCount = 0 + equalServicesCount = 0 + changedServicesCount = 0 + agentServices = 0 + ) + localServices := make(map[string]*mergedService, len(c.trackedServices)+len(consulServices)) + for _, localService := range c.flattenedServices() { + localServicesCount++ + localServices[localService.ID] = &mergedService{localService, 'l'} + } + for _, consulService := range consulServices { + if localService, found := localServices[consulService.ID]; found { + localServicesCount-- + if compareConsulService(localService.service, consulService) { + equalServicesCount++ + localServices[consulService.ID].state = 'e' + } else { + changedServicesCount++ + localServices[consulService.ID].state = 'c' + } + } else { + agentServices++ + agentServiceReg := &consul.AgentServiceRegistration{ + ID: consulService.ID, + Name: consulService.Service, + Tags: consulService.Tags, + Port: consulService.Port, + Address: consulService.Address, + } + localServices[consulService.ID] = &mergedService{agentServiceReg, 'a'} + } + } + + missingServices = make([]*consul.AgentServiceRegistration, 0, localServicesCount) + equalServices = make([]*consul.AgentServiceRegistration, 0, equalServicesCount) + changedServices = make([]*consul.AgentServiceRegistration, 0, changedServicesCount) + staleServices = make([]*consul.AgentServiceRegistration, 0, agentServices) + for _, service := range localServices { + switch service.state { + case 'l': + missingServices = append(missingServices, service.service) + case 'e': + 
equalServices = append(equalServices, service.service) + case 'c': + changedServices = append(changedServices, service.service) + case 'a': + staleServices = append(staleServices, service.service) + } + } + + return missingServices, equalServices, changedServices, staleServices +} + +// syncServices synchronizes this Syncer's Consul Services with the Consul +// Agent. +func (c *Syncer) syncServices() error { + consulServices, err := c.queryAgentServices() + if err != nil { + return err + } + + // Synchronize services with Consul + var mErr multierror.Error + missingServices, _, changedServices, removedServices := c.calcServicesDiff(consulServices) + for _, service := range missingServices { + if err := c.client.Agent().ServiceRegister(service); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + c.trackedServices[service.ID] = service + } + for _, service := range changedServices { + // Re-register the local service + if err := c.client.Agent().ServiceRegister(service); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + } + for _, service := range removedServices { + if err := c.deregisterService(service.ID); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + delete(c.trackedServices, service.ID) } return mErr.ErrorOrNil() } @@ -386,7 +538,7 @@ func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *c return &chkReg, nil } -// createService creates a Consul AgentService from a Nomad Service +// createService creates a Consul AgentServiceRegistration from a Nomad ConsulService. 
func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentServiceRegistration, error) { c.registryLock.RLock() defer c.registryLock.RUnlock() @@ -409,8 +561,8 @@ func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentSer } // deregisterService de-registers a service with the given ID from consul -func (c *Syncer) deregisterService(ID string) error { - return c.client.Agent().ServiceDeregister(ID) +func (c *Syncer) deregisterService(serviceID string) error { + return c.client.Agent().ServiceDeregister(serviceID) } // deregisterCheck de-registers a check with a given ID from Consul. @@ -501,8 +653,9 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer localServices := make(map[string]*consul.AgentService, len(consulServices)) c.registryLock.RLock() defer c.registryLock.RUnlock() + filterPrefix := c.filterPrefix() for serviceID, service := range consulServices { - if strings.HasPrefix(service.ID, c.serviceRegPrefix) { + if strings.HasPrefix(service.ID, filterPrefix) { localServices[serviceID] = service } } diff --git a/nomad/structs/config/consul.go b/nomad/structs/config/consul.go index 57d5a6a50..cccb91903 100644 --- a/nomad/structs/config/consul.go +++ b/nomad/structs/config/consul.go @@ -9,7 +9,7 @@ import ( // ConsulConfig contains the configuration information necessary to // communicate with a Consul Agent in order to: // -// - Register services and checks with Consul +// - Register services and their checks with Consul // // - Bootstrap this Nomad Client with the list of Nomad Servers registered // with Consul