diff --git a/client/consul/sync.go b/client/consul/sync.go index 10b8c93dd..a14bf90be 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -54,14 +54,26 @@ type Syncer struct { client *consul.Client runChecks bool - serviceIdentifier string // serviceIdentifier is a token which identifies which task/alloc the service belongs to - addrFinder func(portLabel string) (string, int) + // The "Consul Registry" is a collection of Consul Services and + // Checks all guarded by the registryLock. + registryLock sync.RWMutex - trackedServices map[string]*consul.AgentService - trackedChecks map[string]*consul.AgentCheckRegistration checkRunners map[string]*CheckRunner delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul + + // serviceRegPrefix is used to namespace the domain of registered + // Consul Services and Checks belonging to a single Syncer. A given + // Nomad Agent may spawn multiple Syncer tasks between the Agent + // Agent and its Executors, all syncing to a single Consul Agent. + // The serviceRegPrefix allows multiple Syncers to coexist without + // each Syncer clobbering each others Services. The Syncer namespace + // protocol is fmt.Sprintf("nomad-%s-%s", serviceRegPrefix, miscID). + // serviceRegPrefix is guarded by the registryLock. + serviceRegPrefix string + + addrFinder func(portLabel string) (string, int) createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error) + // End registryLock guarded attributes. logger *log.Logger @@ -161,9 +173,12 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer { return c } -// SetServiceIdentifier sets the identifier of the services we are syncing with Consul -func (c *Syncer) SetServiceIdentifier(serviceIdentifier string) *Syncer { - c.serviceIdentifier = serviceIdentifier +// SetServiceRegPrefix sets the registration prefix used by the Syncer when +// registering Services with Consul. +func (c *Syncer) SetServiceRegPrefix(servicePrefix string) *Syncer { + c.registryLock.Lock() + defer c.registryLock.Unlock() + c.serviceRegPrefix = servicePrefix return c } @@ -347,11 +362,14 @@ func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *c } // createService creates a Consul AgentService from a Nomad Service - srv := consul.AgentService{ - ID: service.ID(c.serviceIdentifier), - Service: service.Name, - Tags: service.Tags, func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentServiceRegistration, error) { + c.registryLock.RLock() + defer c.registryLock.RUnlock() + + srv := consul.AgentServiceRegistration{ + ID: service.ID(c.serviceRegPrefix), + Name: service.Name, + Tags: service.Tags, } host, port := c.addrFinder(service.PortLabel) if host != "" { @@ -409,7 +427,7 @@ func (c *Syncer) Run() { if err := c.performSync(); err != nil { if c.runChecks { - c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceIdentifier, err) + c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceRegPrefix, err) } c.runChecks = false } else { @@ -421,7 +439,7 @@ func (c *Syncer) Run() { c.Shutdown() case <-c.notifyShutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceIdentifier) + c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceRegPrefix) return } } @@ -463,15 +481,16 @@ func (c *Syncer) performSync() error { // filterConsulServices prunes out all the service whose ids are not prefixed // with nomad- -func (c *Syncer) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService { - nomadServices := make(map[string]*consul.AgentService) - for _, srv := range srvcs { - if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) && - !strings.HasPrefix(srv.ID, structs.AgentServicePrefix) { - nomadServices[srv.ID] = srv +func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[string]*consul.AgentService { + localServices := make(map[string]*consul.AgentService, len(consulServices)) + c.registryLock.RLock() + defer c.registryLock.RUnlock() + for serviceID, service := range consulServices { + if strings.HasPrefix(service.ID, c.serviceRegPrefix) { + localServices[serviceID] = service } } - return nomadServices + return localServices } // filterConsulChecks prunes out all the consul checks which do not have @@ -522,9 +541,9 @@ func (c *Syncer) runCheck(check Check) { } } -// GenerateServiceIdentifier returns a service identifier based on an allocation -// id and task name -func GenerateServiceIdentifier(allocID string, taskName string) string { +// GenerateServicePrefix returns a service prefix based on an allocation id +// and task name. +func GenerateServicePrefix(allocID string, taskName string) string { return fmt.Sprintf("%s-%s", taskName, allocID) } diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index 976f6ea52..e06d5713a 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -52,15 +52,15 @@ func TestConsulServiceRegisterServices(t *testing.T) { return } task := mockTask() - cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name)) + cs.SetServiceRegPrefix(GenerateServicePrefix(allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() - service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name)) - service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name)) + service1ID := service1.ID(GenerateServicePrefix(allocID, task.Name)) + service2ID := service2.ID(GenerateServicePrefix(allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } @@ -81,7 +81,7 @@ func TestConsulServiceUpdateService(t *testing.T) { } task := mockTask() - cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name)) + cs.SetServiceRegPrefix(GenerateServicePrefix(allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(); err != nil { t.Fatalf("err: %v", err) @@ -95,8 +95,8 @@ func TestConsulServiceUpdateService(t *testing.T) { t.Fatalf("err: %v", err) } // Make sure all the services and checks are still present - service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name)) - service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name)) + service1ID := service1.ID(GenerateServicePrefix(allocID, task.Name)) + service2ID := service2.ID(GenerateServicePrefix(allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 2f95df84f..b6b0fcdca 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -480,7 +480,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) - cs.SetServiceIdentifier(consul.GenerateServiceIdentifier(e.ctx.AllocID, e.ctx.Task.Name)) + cs.SetServiceRegPrefix(consul.GenerateServicePrefix(e.ctx.AllocID, e.ctx.Task.Name)) cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulSyncer = cs }