mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 11:25:41 +03:00
Removed allocID and task name from consul service
This commit is contained in:
@@ -1208,7 +1208,8 @@ func (c *Client) syncConsul() {
|
||||
if taskState.State == structs.TaskStateRunning {
|
||||
if tr, ok := ar.tasks[taskName]; ok {
|
||||
for _, service := range tr.task.Services {
|
||||
services[service.ID(ar.alloc.ID, tr.task.Name)] = struct{}{}
|
||||
svcIdentifier := fmt.Sprintf("%s-%s", ar.alloc.ID, tr.task.Name)
|
||||
services[service.ID(svcIdentifier)] = struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1220,7 +1221,8 @@ func (c *Client) syncConsul() {
|
||||
Name: c.config.ClientServiceName,
|
||||
PortLabel: "clienthttpaddr",
|
||||
}
|
||||
services[clientService.ID("agent", "client")] = struct{}{}
|
||||
svcIdentifier := fmt.Sprintf("%s-%s", "agent", "client")
|
||||
services[clientService.ID(svcIdentifier)] = struct{}{}
|
||||
|
||||
if err := c.consulService.KeepServices(services); err != nil {
|
||||
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
|
||||
|
||||
@@ -22,11 +22,10 @@ type ConsulService struct {
|
||||
client *consul.Client
|
||||
availble bool
|
||||
|
||||
taskName string
|
||||
allocID string
|
||||
delegateChecks map[string]struct{}
|
||||
createCheck func(*structs.ServiceCheck, string) (Check, error)
|
||||
addrFinder func(string) (string, int)
|
||||
serviceIdentifier string
|
||||
delegateChecks map[string]struct{}
|
||||
createCheck func(*structs.ServiceCheck, string) (Check, error)
|
||||
addrFinder func(string) (string, int)
|
||||
|
||||
trackedServices map[string]*consul.AgentService
|
||||
trackedChecks map[string]*consul.AgentCheckRegistration
|
||||
@@ -133,21 +132,15 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c
|
||||
return c
|
||||
}
|
||||
|
||||
// SetAllocID sets the allocID
|
||||
func (c *ConsulService) SetAllocID(allocID string) *ConsulService {
|
||||
c.allocID = allocID
|
||||
return c
|
||||
}
|
||||
|
||||
// SetAddrFinder sets a function to find the host and port for a Service
|
||||
func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService {
|
||||
c.addrFinder = addrFinder
|
||||
return c
|
||||
}
|
||||
|
||||
// SetTaskName sets the task name whose services we are syncing with Consul
|
||||
func (c *ConsulService) SetTaskName(taskName string) *ConsulService {
|
||||
c.taskName = taskName
|
||||
// SetServiceIdentifier sets the identifier of the services we are syncing with Consul
|
||||
func (c *ConsulService) SetServiceIdentifier(serviceIdentifier string) *ConsulService {
|
||||
c.serviceIdentifier = serviceIdentifier
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -313,7 +306,7 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con
|
||||
// createService creates a Consul AgentService from a Nomad Service
|
||||
func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) {
|
||||
srv := consul.AgentService{
|
||||
ID: service.ID(c.allocID, c.taskName),
|
||||
ID: service.ID(c.serviceIdentifier),
|
||||
Service: service.Name,
|
||||
Tags: service.Tags,
|
||||
}
|
||||
@@ -367,7 +360,7 @@ func (c *ConsulService) PeriodicSync() {
|
||||
case <-sync.C:
|
||||
if err := c.performSync(); err != nil {
|
||||
if c.availble {
|
||||
c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.taskName, err)
|
||||
c.logger.Printf("[DEBUG] consul: error in syncing services for %q: %v", c.serviceIdentifier, err)
|
||||
}
|
||||
c.availble = false
|
||||
} else {
|
||||
@@ -375,7 +368,7 @@ func (c *ConsulService) PeriodicSync() {
|
||||
}
|
||||
case <-c.shutdownCh:
|
||||
sync.Stop()
|
||||
c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.taskName)
|
||||
c.logger.Printf("[INFO] consul: shutting down sync for %q", c.serviceIdentifier)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
@@ -45,21 +45,20 @@ func TestConsulServiceRegisterServices(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Err: %v", err)
|
||||
}
|
||||
cs.SetAllocID(allocID)
|
||||
// Skipping the test if consul isn't present
|
||||
if !cs.consulPresent() {
|
||||
return
|
||||
}
|
||||
task := mockTask()
|
||||
cs.SetTaskName(task.Name)
|
||||
cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name))
|
||||
cs.SetAddrFinder(task.FindHostAndPortFor)
|
||||
if err := cs.SyncServices(task.Services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer cs.Shutdown()
|
||||
|
||||
service1ID := service1.ID(allocID, task.Name)
|
||||
service2ID := service2.ID(allocID, task.Name)
|
||||
service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
|
||||
service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
|
||||
if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil {
|
||||
t.Fatalf("err : %v", err)
|
||||
}
|
||||
@@ -73,14 +72,13 @@ func TestConsulServiceUpdateService(t *testing.T) {
|
||||
if err != nil {
|
||||
t.Fatalf("Err: %v", err)
|
||||
}
|
||||
cs.SetAllocID(allocID)
|
||||
// Skipping the test if consul isn't present
|
||||
if !cs.consulPresent() {
|
||||
return
|
||||
}
|
||||
|
||||
task := mockTask()
|
||||
cs.SetTaskName(task.Name)
|
||||
cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", allocID, task.Name))
|
||||
cs.SetAddrFinder(task.FindHostAndPortFor)
|
||||
if err := cs.SyncServices(task.Services); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
@@ -94,8 +92,8 @@ func TestConsulServiceUpdateService(t *testing.T) {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
// Make sure all the services and checks are still present
|
||||
service1ID := service1.ID(allocID, task.Name)
|
||||
service2ID := service2.ID(allocID, task.Name)
|
||||
service1ID := service1.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
|
||||
service2ID := service2.ID(fmt.Sprintf("%s-%s", allocID, task.Name))
|
||||
if err := servicesPresent(t, []string{service1ID, service2ID}, cs); err != nil {
|
||||
t.Fatalf("err : %v", err)
|
||||
}
|
||||
|
||||
@@ -436,8 +436,7 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
|
||||
return err
|
||||
}
|
||||
cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
|
||||
cs.SetAllocID(e.ctx.AllocID)
|
||||
cs.SetTaskName(e.ctx.Task.Name)
|
||||
cs.SetServiceIdentifier(fmt.Sprintf("%s-%s", e.ctx.AllocID, e.ctx.Task.Name))
|
||||
cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
|
||||
e.consulService = cs
|
||||
}
|
||||
|
||||
@@ -484,6 +484,7 @@ func (a *Agent) createConsulConfig() {
|
||||
a.consulConfig = cfg
|
||||
}
|
||||
|
||||
// syncAgentServicesWithConsul syncs the client and server services with Consul
|
||||
func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAddr string) error {
|
||||
cs, err := consul.NewConsulService(a.consulConfig, a.logger)
|
||||
if err != nil {
|
||||
@@ -502,7 +503,7 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd
|
||||
}
|
||||
addrs["clienthttpaddr"] = clientHttpAddr
|
||||
services = append(services, clientService)
|
||||
cs.SetTaskName("client")
|
||||
cs.SetServiceIdentifier("agent-client")
|
||||
}
|
||||
if a.server != nil && a.config.ConsulConfig.ServerServiceName != "" {
|
||||
serverService := &structs.Service{
|
||||
@@ -511,7 +512,7 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd
|
||||
}
|
||||
addrs["serverhttpaddr"] = serverHttpAddr
|
||||
services = append(services, serverService)
|
||||
cs.SetTaskName("server")
|
||||
cs.SetServiceIdentifier("agent-server")
|
||||
}
|
||||
|
||||
cs.SetAddrFinder(func(portLabel string) (string, int) {
|
||||
@@ -530,7 +531,5 @@ func (a *Agent) syncAgentServicesWithConsul(clientHttpAddr string, serverHttpAdd
|
||||
return host, p
|
||||
})
|
||||
|
||||
cs.SetAllocID("agent")
|
||||
|
||||
return cs.SyncServices(services)
|
||||
}
|
||||
|
||||
@@ -1519,8 +1519,8 @@ func (s *Service) InitFields(job string, taskGroup string, task string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Service) ID(allocID string, taskName string) string {
|
||||
return fmt.Sprintf("%s-%s-%s-%s", NomadConsulPrefix, allocID, taskName, s.Hash())
|
||||
func (s *Service) ID(identifier string) string {
|
||||
return fmt.Sprintf("%s-%s-%s", NomadConsulPrefix, identifier, s.Hash())
|
||||
}
|
||||
|
||||
// Validate checks if the Check definition is valid
|
||||
|
||||
Reference in New Issue
Block a user