Rename Syncer.SetServiceIdentifier to SetServiceRegPrefix()

This attribute isn't actually an identifier because it can represent
a collection of services.  Rename `serviceIdentifier` to
`serviceRegPrefix which more accurately conveys the intention of this
Syncer attribute.

While here, also rename `SetServiceIdentifier()` to `SetServiceRegPrefix()`
and `GenerateServiceIdentifier()` to `GenerateServicePrefix()`.
This commit is contained in:
Sean Chittenden
2016-06-07 11:37:39 -05:00
parent 74e691cab1
commit e858928d68
3 changed files with 49 additions and 30 deletions

View File

@@ -54,14 +54,26 @@ type Syncer struct {
client *consul.Client
runChecks bool
serviceIdentifier string // serviceIdentifier is a token which identifies which task/alloc the service belongs to
addrFinder func(portLabel string) (string, int)
// The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock.
registryLock sync.RWMutex
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*consul.AgentCheckRegistration
checkRunners map[string]*CheckRunner
delegateChecks map[string]struct{} // delegateChecks are the checks that the Nomad client runs and reports to Consul
// serviceRegPrefix is used to namespace the domain of registered
// Consul Services and Checks belonging to a single Syncer. A given
// Nomad Agent may spawn multiple Syncer tasks between the Agent
// Agent and its Executors, all syncing to a single Consul Agent.
// The serviceRegPrefix allows multiple Syncers to coexist without
// each Syncer clobbering each others Services. The Syncer namespace
// protocol is fmt.Sprintf("nomad-%s-%s", serviceRegPrefix, miscID).
// serviceRegPrefix is guarded by the registryLock.
serviceRegPrefix string
addrFinder func(portLabel string) (string, int)
createDelegatedCheck func(*structs.ServiceCheck, string) (Check, error)
// End registryLock guarded attributes.
logger *log.Logger
@@ -161,9 +173,12 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
return c
}
// SetServiceIdentifier sets the identifier of the services we are syncing with Consul
func (c *Syncer) SetServiceIdentifier(serviceIdentifier string) *Syncer {
c.serviceIdentifier = serviceIdentifier
// SetServiceRegPrefix sets the registration prefix used by the Syncer when
// registering Services with Consul.
func (c *Syncer) SetServiceRegPrefix(servicePrefix string) *Syncer {
c.registryLock.Lock()
defer c.registryLock.Unlock()
c.serviceRegPrefix = servicePrefix
return c
}
@@ -347,11 +362,14 @@ func (c *Syncer) createDelegatedCheckReg(check *structs.ServiceCheck, service *c
}
// createService creates a Consul AgentService from a Nomad Service
srv := consul.AgentService{
ID: service.ID(c.serviceIdentifier),
Service: service.Name,
Tags: service.Tags,
func (c *Syncer) createService(service *structs.ConsulService) (*consul.AgentServiceRegistration, error) {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
srv := consul.AgentServiceRegistration{
ID: service.ID(c.serviceRegPrefix),
Name: service.Name,
Tags: service.Tags,
}
host, port := c.addrFinder(service.PortLabel)
if host != "" {
@@ -409,7 +427,7 @@ func (c *Syncer) Run() {
if err := c.performSync(); err != nil {
if c.runChecks {
c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceIdentifier, err)
c.logger.Printf("[DEBUG] consul.sync: disabling checks until Consul sync completes for %q: %v", c.serviceRegPrefix, err)
}
c.runChecks = false
} else {
@@ -421,7 +439,7 @@ func (c *Syncer) Run() {
c.Shutdown()
case <-c.notifyShutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceIdentifier)
c.logger.Printf("[INFO] consul.sync: shutting down sync for %q", c.serviceRegPrefix)
return
}
}
@@ -463,15 +481,16 @@ func (c *Syncer) performSync() error {
// filterConsulServices prunes out all the service whose ids are not prefixed
// with nomad-
func (c *Syncer) filterConsulServices(srvcs map[string]*consul.AgentService) map[string]*consul.AgentService {
nomadServices := make(map[string]*consul.AgentService)
for _, srv := range srvcs {
if strings.HasPrefix(srv.ID, structs.NomadConsulPrefix) &&
!strings.HasPrefix(srv.ID, structs.AgentServicePrefix) {
nomadServices[srv.ID] = srv
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
localServices := make(map[string]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
for serviceID, service := range consulServices {
if strings.HasPrefix(service.ID, c.serviceRegPrefix) {
localServices[serviceID] = service
}
}
return nomadServices
return localServices
}
// filterConsulChecks prunes out all the consul checks which do not have
@@ -522,9 +541,9 @@ func (c *Syncer) runCheck(check Check) {
}
}
// GenerateServiceIdentifier returns a service identifier based on an allocation
// id and task name
func GenerateServiceIdentifier(allocID string, taskName string) string {
// GenerateServicePrefix returns a service prefix based on an allocation id
// and task name.
func GenerateServicePrefix(allocID string, taskName string) string {
return fmt.Sprintf("%s-%s", taskName, allocID)
}

View File

@@ -52,15 +52,15 @@ func TestConsulServiceRegisterServices(t *testing.T) {
return
}
task := mockTask()
cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name))
cs.SetServiceRegPrefix(GenerateServicePrefix(allocID, task.Name))
cs.SetAddrFinder(task.FindHostAndPortFor)
if err := cs.SyncServices(); err != nil {
t.Fatalf("err: %v", err)
}
defer cs.Shutdown()
service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name))
service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name))
service1ID := service1.ID(GenerateServicePrefix(allocID, task.Name))
service2ID := service2.ID(GenerateServicePrefix(allocID, task.Name))
if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil {
t.Fatalf("err : %v", err)
}
@@ -81,7 +81,7 @@ func TestConsulServiceUpdateService(t *testing.T) {
}
task := mockTask()
cs.SetServiceIdentifier(GenerateServiceIdentifier(allocID, task.Name))
cs.SetServiceRegPrefix(GenerateServicePrefix(allocID, task.Name))
cs.SetAddrFinder(task.FindHostAndPortFor)
if err := cs.SyncServices(); err != nil {
t.Fatalf("err: %v", err)
@@ -95,8 +95,8 @@ func TestConsulServiceUpdateService(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Make sure all the services and checks are still present
service1ID := service1.ID(GenerateServiceIdentifier(allocID, task.Name))
service2ID := service2.ID(GenerateServiceIdentifier(allocID, task.Name))
service1ID := service1.ID(GenerateServicePrefix(allocID, task.Name))
service2ID := service2.ID(GenerateServicePrefix(allocID, task.Name))
if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil {
t.Fatalf("err : %v", err)
}

View File

@@ -480,7 +480,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
return err
}
cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
cs.SetServiceIdentifier(consul.GenerateServiceIdentifier(e.ctx.AllocID, e.ctx.Task.Name))
cs.SetServiceRegPrefix(consul.GenerateServicePrefix(e.ctx.AllocID, e.ctx.Task.Name))
cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
e.consulSyncer = cs
}