diff --git a/client/client.go b/client/client.go index 65d16add7..1ca63c45b 100644 --- a/client/client.go +++ b/client/client.go @@ -1208,7 +1208,8 @@ func (c *Client) syncConsul() { if taskState.State == structs.TaskStateRunning { if tr, ok := ar.tasks[taskName]; ok { for _, service := range tr.task.Services { - services[service.ID(ar.alloc.ID, tr.task.Name)] = struct{}{} + svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name) + services[service.ID(svcIdentifier)] = struct{}{} } } } @@ -1220,7 +1221,8 @@ func (c *Client) syncConsul() { Name: c.config.ClientServiceName, PortLabel: "clienthttpaddr", } - services[clientService.ID("agent", "client")] = struct{}{} + svcIdentifier := fmt.Sprintf("%s-%s", "agent", "client") + services[clientService.ID(svcIdentifier)] = struct{}{} if err := c.consulService.KeepServices(services); err != nil { c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) diff --git a/client/consul/sync.go b/client/consul/sync.go index e1d16b9c9..e6d857eb1 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -22,11 +22,10 @@ type ConsulService struct { client *consul.Client availble bool - taskName string - allocID string - delegateChecks map[string]struct{} - createCheck func(*structs.ServiceCheck, string) (Check, error) - addrFinder func(string) (string, int) + serviceIdentifier string + delegateChecks map[string]struct{} + createCheck func(*structs.ServiceCheck, string) (Check, error) + addrFinder func(string) (string, int) trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration @@ -133,21 +132,15 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c return c } -// SetAllocID sets the allocID -func (c *ConsulService) SetAllocID(allocID string) *ConsulService { - c.allocID = allocID - return c -} - // SetAddrFinder sets a function to find the host and port for a Service func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { c.addrFinder = addrFinder return c } -// SetTaskName sets the task name whose services we are syncing with Consul -func (c *ConsulService) SetTaskName(taskName string) *ConsulService { - c.taskName = taskName +// SetServiceIdentifier sets the identifier of the services we are syncing with Consul +func (c *ConsulService) SetServiceIdentifier(serviceIdentifier string) *ConsulService { + c.serviceIdentifier = serviceIdentifier return c } @@ -313,7 +306,7 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con // createService creates a Consul AgentService from a Nomad Service func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) { srv := consul.AgentService{ - ID: service.ID(c.allocID, c.taskName), + ID: service.ID(c.serviceIdentifier), Service: service.Name, Tags: service.Tags, } @@ -367,7 +360,7 @@ func (c *ConsulService) PeriodicSync() { case <-sync.C: if err := c.performSync(); err != nil { if c.availble { - c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.taskName, err) + c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err) } c.availble = false } else { @@ -375,7 +368,7 @@ func (c *ConsulService) PeriodicSync() { } case <-c.shutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.taskName) + c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier) return } } diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index df6a7d778..d4fb5fad4 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -45,21 +45,20 @@ func TestConsulServiceRegisterServices(t *testing.T) { if err != nil { t.Fatalf("Err: %v", err) } - cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return } task := mockTask() - cs.SetTaskName(task.Name) + cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) } defer cs.Shutdown() - service1ID := service1.ID(allocID, task.Name) - service2ID := service2.ID(allocID, task.Name) + service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) + service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } @@ -73,14 +72,13 @@ func TestConsulServiceUpdateService(t *testing.T) { if err != nil { t.Fatalf("Err: %v", err) } - cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return } task := mockTask() - cs.SetTaskName(task.Name) + cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name)) cs.SetAddrFinder(task.FindHostAndPortFor) if err := cs.SyncServices(task.Services); err != nil { t.Fatalf("err: %v", err) @@ -94,8 +92,8 @@ func TestConsulServiceUpdateService(t *testing.T) { t.Fatalf("err: %v", err) } // Make sure all the services and checks are still present - service1ID := service1.ID(allocID, task.Name) - service2ID := service2.ID(allocID, task.Name) + service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) + service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name)) if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil { t.Fatalf("err : %v", err) } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 506f728d9..9552c2892 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -436,8 +436,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) - cs.SetAllocID(e.ctx.AllocID) - cs.SetTaskName(e.ctx.Task.Name) + cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", e.ctx.AllocID, e.ctx.Task.Name)) cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulService = cs } diff --git a/command/agent/agent.go b/command/agent/agent.go index 5932d1124..31a7f5acd 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -484,6 +484,7 @@ func (a *Agent) createConsulConfig() { a.consulConfig = cfg } +// syncAgentServicesWithConsul syncs the client and server services with Consul func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error { cs, err := consul.NewConsulService(a.consulConfig, a.logger) if err != nil { @@ -502,7 +503,7 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd } addrs["clienthttpaddr"] = clientHttpAddr services = append(services, clientService) - cs.SetTaskName("client") + cs.SetServiceIdentifier("agent-client") } if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" { serverService := &structs.Service{ @@ -511,7 +512,7 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd } addrs["serverhttpaddr"] = serverHttpAddr services = append(services, serverService) - cs.SetTaskName("server") + cs.SetServiceIdentifier("agent-server") } cs.SetAddrFinder(func(portLabel string) (string, int) { @@ -530,7 +531,5 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd return host, p }) - cs.SetAllocID("agent") - return cs.SyncServices(services) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 18fa5f93b..7f3428b3e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1519,8 +1519,8 @@ func (s *Service) InitFields(job string, taskGroup string, task string) { } } -func (s *Service) ID(allocID string, taskName string) string { - return fmt.Sprintf("%s-%s-%s-%s", NomadConsulPrefix, allocID, taskName, s.Hash()) +func (s *Service) ID(identifier string) string { + return fmt.Sprintf("%s-%s-%s", NomadConsulPrefix, identifier, s.Hash()) } // Validate checks if the Check definition is valid