diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index ed556a31f..efd7463ee 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -54,6 +54,14 @@ type Syncer struct { client *consul.Client runChecks bool + // servicesGroups is a named group of services that will be flattened + // and reconciled with Consul when SyncServices() is called. The key + // to the servicesGroups map is unique per handler and is used to + // allow the Agent's services to be maintained independently of the + // Client or Server's services. + servicesGroups map[string][]*consul.AgentServiceRegistration + servicesGroupsLock sync.RWMutex + // The "Consul Registry" is a collection of Consul Services and // Checks all guarded by the registryLock. registryLock sync.RWMutex @@ -152,6 +160,7 @@ func NewSyncer(config *config.ConsulConfig, shutdownCh chan struct{}, logger *lo logger: logger, shutdownCh: shutdownCh, trackedServices: make(map[string]*consul.AgentService), + servicesGroups: make(map[string][]*consul.AgentServiceRegistration), trackedChecks: make(map[string]*consul.AgentCheckRegistration), checkRunners: make(map[string]*CheckRunner), periodicCallbacks: make(map[string]types.PeriodicCallback), @@ -191,39 +200,36 @@ func (c *Syncer) SyncNow() { } } -// SyncServices sync the services with the Consul Agent -func (c *Syncer) SyncServices() error { - services := c.flattenedServices() - +// SetServices assigns the slice of Nomad Services to the provided services +// group name. +func (c *Syncer) SetServices(groupName string, services []*structs.ConsulService) error { var mErr multierror.Error - taskServices := make(map[string]*consul.AgentServiceRegistration) - taskChecks := make(map[string]*consul.AgentCheckRegistration) - - // Register Services and Checks that we don't know about or has changed + registeredServices := make([]*consul.AgentServiceRegistration, 0, len(services)) for _, service := range services { - srv, err := c.createService(service) - if err != nil { + if service.ServiceID == "" { + service.ServiceID = c.GenerateServiceID(groupName, service) + } + var serviceReg *consul.AgentServiceRegistration + var err error + if serviceReg, err = c.createService(service); err != nil { mErr.Errors = append(mErr.Errors, err) continue } - trackedService, ok := c.trackedServices[srv.ID] - if (ok && !reflect.DeepEqual(trackedService, srv)) || !ok { - if err := c.client.Agent().ServiceRegister(srv); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - } - c.trackedServices[srv.ID] = srv - taskServices[srv.ID] = srv + registeredServices = append(registeredServices, serviceReg) + // Register the check(s) for this service for _, chk := range service.Checks { - // Create a consul check registration - chkReg, err := c.createDelegatedCheckReg(chk, srv) + // Create a Consul check registration + chkReg, err := c.createDelegatedCheckReg(chk, serviceReg) if err != nil { mErr.Errors = append(mErr.Errors, err) continue } // creating a nomad check if we have to handle this particular check type if _, ok := c.delegateChecks[chk.Type]; ok { + if _, ok := c.checkRunners[chkReg.ID]; ok { + continue + } nc, err := c.createDelegatedCheck(chk, chkReg.ID) if err != nil { mErr.Errors = append(mErr.Errors, err) @@ -232,37 +238,42 @@ func (c *Syncer) SyncServices() error { cr := NewCheckRunner(nc, c.runCheck, c.logger) c.checkRunners[nc.ID()] = cr } - - if _, ok := c.trackedChecks[chkReg.ID]; !ok { - if err := c.registerCheck(chkReg); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - } - c.trackedChecks[chkReg.ID] = chkReg - taskChecks[chkReg.ID] = chkReg } } - // Remove services that are not present anymore - for _, service := range c.trackedServices { - if _, ok := taskServices[service.ID]; !ok { - if err := c.deregisterService(service.ID); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - delete(c.trackedServices, service.ID) - } + if len(mErr.Errors) > 0 { + return mErr.ErrorOrNil() } - // Remove the checks that are not present anymore - for checkID, _ := range c.trackedChecks { - if _, ok := taskChecks[checkID]; !ok { - if err := c.deregisterCheck(checkID); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - delete(c.trackedChecks, checkID) + c.servicesGroupsLock.Lock() + c.servicesGroups[groupName] = registeredServices + c.servicesGroupsLock.Unlock() + + return nil +} + +// SyncNow expires the current timer forcing the list of periodic callbacks +// to be synced immediately. +func (c *Syncer) SyncNow() { + select { + case c.notifySyncCh <- struct{}{}: + default: + } +} + +// flattenedServices returns a flattened list of services +func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration { + const initialNumServices = 8 + services := make([]*consul.AgentServiceRegistration, 0, initialNumServices) + c.servicesGroupsLock.RLock() + for _, servicesGroup := range c.servicesGroups { + for _, service := range servicesGroup { + services = append(services, service) } } - return mErr.ErrorOrNil() + c.servicesGroupsLock.RUnlock() + + return services } func (c *Syncer) signalShutdown() {