From 02b320c81e39f9521225820c0f99bc5d1e361c10 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 13 Jun 2016 14:05:35 +0200 Subject: [PATCH] implemented reconciliation of unwanted services --- client/client.go | 53 +++++++------- client/driver/executor/executor.go | 7 +- command/agent/agent.go | 5 +- command/agent/consul/syncer.go | 110 +++++++++++++++++------------ 4 files changed, 95 insertions(+), 80 deletions(-) diff --git a/client/client.go b/client/client.go index b77db4ccb..6983991a3 100644 --- a/client/client.go +++ b/client/client.go @@ -1344,35 +1344,30 @@ func (c *Client) setupConsulSyncer() error { // TODO this should only deregister things that the executors do not know // about - //consulServicesSyncFn := func() error { - //const estInitialConsulServices = 8 - //const serviceGroupName = "executor" - //services := make([]*structs.Service, 0, estInitialConsulServices) - //for allocID, ar := range c.getAllocRunners() { - //ar.taskStatusLock.RLock() - //taskStates := copyTaskStates(ar.taskStates) - //ar.taskStatusLock.RUnlock() - //for taskName, taskState := range taskStates { - //if taskState.State == structs.TaskStateRunning { - //if tr, ok := ar.tasks[taskName]; ok { - //for _, service := range tr.task.Services { - //if service.Name == "" { - //service.Name = fmt.Sprintf("%s-%s", tr.task.Name, allocID) - //} - //if service.ServiceID == "" { - //service.ServiceID = fmt.Sprintf("%s-%s:%s/%s", c.consulSyncer.GenerateServiceID(serviceGroupName, service), tr.task.Name, allocID) - //} - //services = append(services, service) - //} - //} - //} - //} - //} - - //c.consulSyncer.SetServices(serviceGroupName, services) - //return nil - //} - //c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn) + consulServicesSyncFn := func() error { + const estInitialConsulServices = 8 + const serviceGroupName = "executor" + servicesInRunningAllocs := make(map[string][]*structs.Service) + for allocID, ar := range c.getAllocRunners() { + services := make([]*structs.Service, 0, estInitialConsulServices) + ar.taskStatusLock.RLock() + taskStates := copyTaskStates(ar.taskStates) + ar.taskStatusLock.RUnlock() + for taskName, taskState := range taskStates { + if taskState.State == structs.TaskStateRunning { + if tr, ok := ar.tasks[taskName]; ok { + for _, service := range tr.task.Services { + services = append(services, service) + } + } + } + } + servicesInRunningAllocs[allocID] = services + } + c.consulSyncer.KeepServices(serviceGroupName, servicesInRunningAllocs) + return nil + } + c.consulSyncer.AddPeriodicHandler("Nomad Client Services Sync Handler", consulServicesSyncFn) return nil } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 0d91ae5bf..9088728b5 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -39,7 +39,7 @@ const ( pidScanInterval = 5 * time.Second // serviceRegPrefix is the prefix the entire Executor should use - serviceRegPrefix = "executor" + executorServiceRegPrefix = "executor" ) var ( @@ -365,7 +365,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { // Re-syncing task with Consul agent if e.consulSyncer != nil { e.interpolateServices(e.ctx.Task) - e.consulSyncer.SetServices(e.ctx.AllocID, task.Services) + e.consulSyncer.SetServices(e.ctx.AllocID, task.Services, executorServiceRegPrefix) } return nil } @@ -492,9 +492,8 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { } e.interpolateServices(e.ctx.Task) e.consulSyncer.SetDelegatedChecks(e.createCheckMap(), e.createCheck) - e.consulSyncer.SetServiceRegPrefix(serviceRegPrefix) e.consulSyncer.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) - e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services) + e.consulSyncer.SetServices(e.ctx.AllocID, e.ctx.Task.Services, executorServiceRegPrefix) return nil } diff --git a/command/agent/agent.go b/command/agent/agent.go index a231f3099..6309e76d6 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -378,7 +378,7 @@ func (a *Agent) setupServer() error { Name: a.config.Consul.ServerServiceName, Tags: []string{consul.ServiceTagSerf}, }, - }) + }, "agent") } return nil @@ -424,7 +424,7 @@ func (a *Agent) setupClient() error { PortLabel: a.clientRPCAddr, Tags: []string{consul.ServiceTagRPC}, }, - }) + }, "agent") } return nil @@ -593,7 +593,6 @@ func (a *Agent) setupConsulSyncer(shutdownCh chan struct{}) error { if err != nil { return err } - a.consulSyncer.SetServiceRegPrefix("agent") a.consulSyncer.SetAddrFinder(func(portLabel string) (string, int) { host, port, err := net.SplitHostPort(portLabel) diff --git a/command/agent/consul/syncer.go b/command/agent/consul/syncer.go index 5983fd67d..01239cc47 100644 --- a/command/agent/consul/syncer.go +++ b/command/agent/consul/syncer.go @@ -30,7 +30,7 @@ const ( // nomadServicePrefix is the prefix used when registering a service // with consul - nomadServicePrefix = "nomad" + nomadServicePrefix = "nomad-registered-service" // The periodic time interval for syncing services and checks with Consul syncInterval = 5 * time.Second @@ -71,6 +71,8 @@ type Syncer struct { checkGroups map[string][]*consul.AgentCheckRegistration groupsLock sync.RWMutex + groupName string + // The "Consul Registry" is a collection of Consul Services and // Checks all guarded by the registryLock. registryLock sync.RWMutex @@ -82,14 +84,6 @@ type Syncer struct { trackedChecks map[string]*consul.AgentCheckRegistration trackedServices map[string]*consul.AgentServiceRegistration - // serviceRegPrefix is used to namespace the domain of registered - // Consul Services and Checks belonging to a single Syncer. A given - // Nomad Agent may spawn multiple Syncer tasks between the Agent - // Agent and its Executors, all syncing to a single Consul Agent. - // The serviceRegPrefix allows multiple Syncers to coexist without - // each Syncer clobbering each others Services. The Syncer namespace - // protocol is fmt.Sprintf("nomad-%s-%s", serviceRegPrefix, miscID). - // serviceRegPrefix is guarded by the registryLock. serviceRegPrefix string addrFinder func(portLabel string) (string, int) @@ -204,47 +198,40 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer { return c } -// SetServiceRegPrefix sets the registration prefix used by the Syncer when -// registering Services with Consul. -func (c *Syncer) SetServiceRegPrefix(servicePrefix string) *Syncer { - c.registryLock.Lock() - defer c.registryLock.Unlock() - c.serviceRegPrefix = servicePrefix - return c -} - // filterPrefix generates a unique prefix that a Syncer can later filter on. -func (c *Syncer) filterPrefix() string { +func (c *Syncer) filterPrefix(serviceRegPrefix string) string { c.registryLock.RLock() defer c.registryLock.RUnlock() - return fmt.Sprintf("%s-%s", nomadServicePrefix, c.serviceRegPrefix) + return fmt.Sprintf("%s-%s", nomadServicePrefix, serviceRegPrefix) } // GenerateServiceID creates a unique Consul ServiceID for a given // Service. -func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service) string { +func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service, serviceRegPrefix string) string { numTags := len(service.Tags) switch numTags { case 0: - return fmt.Sprintf("%s-%s:%s", c.filterPrefix(), groupName, service.Name) + return fmt.Sprintf("%s-%s:%s", c.filterPrefix(serviceRegPrefix), groupName, service.Name) case 1: - return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(), groupName, service.Tags[0], service.Name) + return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(serviceRegPrefix), groupName, service.Tags[0], service.Name) default: tags := strings.Join(service.Tags, "|") - return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(), groupName, tags, service.Name) + return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(serviceRegPrefix), groupName, tags, service.Name) } } // SetServices assigns the slice of Nomad Services to the provided services // group name. -func (c *Syncer) SetServices(groupName string, services []*structs.Service) error { +func (c *Syncer) SetServices(groupName string, services []*structs.Service, serviceRegPrefix string) error { + c.groupName = groupName + c.serviceRegPrefix = serviceRegPrefix var mErr multierror.Error numServ := len(services) registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ) registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ) for _, service := range services { if service.ServiceID == "" { - service.ServiceID = c.GenerateServiceID(groupName, service) + service.ServiceID = c.GenerateServiceID(groupName, service, serviceRegPrefix) } var serviceReg *consul.AgentServiceRegistration var err error @@ -319,13 +306,12 @@ func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration { const initialNumServices = 8 services := make([]*consul.AgentServiceRegistration, 0, initialNumServices) c.groupsLock.RLock() + defer c.groupsLock.RUnlock() for _, servicesGroup := range c.servicesGroups { for _, service := range servicesGroup { services = append(services, service) } } - c.groupsLock.RUnlock() - return services } @@ -371,11 +357,18 @@ func (c *Syncer) Shutdown() error { c.registryLock.RUnlock() // De-register all the services from Consul - services, err := c.queryAgentServices() - if err != nil { - mErr.Errors = append(mErr.Errors, err) + consulServices := make(map[string]*consul.AgentService) + for trackedGroup := range c.servicesGroups { + serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup) + services, err := c.queryAgentServices(serviceIDPrefix) + if err != nil { + return err + } + for serviceID, agentService := range services { + consulServices[serviceID] = agentService + } } - for _, service := range services { + for _, service := range consulServices { if err := c.client.Agent().ServiceDeregister(service.ID); err != nil { c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", service.ID, err) mErr.Errors = append(mErr.Errors, err) @@ -396,12 +389,12 @@ func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) { // queryAgentServices queries the Consul Agent for a list of Consul services that // have been registered with this Consul Syncer. -func (c *Syncer) queryAgentServices() (map[string]*consul.AgentService, error) { +func (c *Syncer) queryAgentServices(serviceIDPrefix string) (map[string]*consul.AgentService, error) { services, err := c.client.Agent().Services() if err != nil { return nil, err } - return c.filterConsulServices(services), nil + return c.filterConsulServices(serviceIDPrefix, services), nil } // syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent. @@ -660,9 +653,16 @@ func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService // syncServices synchronizes this Syncer's Consul Services with the Consul // Agent. func (c *Syncer) syncServices() error { - consulServices, err := c.queryAgentServices() - if err != nil { - return err + consulServices := make(map[string]*consul.AgentService) + for trackedGroup := range c.servicesGroups { + serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup) + services, err := c.queryAgentServices(serviceIDPrefix) + if err != nil { + return err + } + for serviceID, agentService := range services { + consulServices[serviceID] = agentService + } } // Synchronize services with Consul @@ -794,12 +794,12 @@ func (c *Syncer) Run() { if err := c.SyncServices(); err != nil { if c.consulAvailable { - c.logger.Printf("[DEBUG] consul.syncer: disabling checks until successful sync for %+q: %v", c.serviceRegPrefix, err) + c.logger.Printf("[DEBUG] consul.syncer: error in syncing: %v", err) } c.consulAvailable = false } else { if !c.consulAvailable { - c.logger.Printf("[DEBUG] consul.syncer: re-enabling checks for for %+q", c.serviceRegPrefix) + c.logger.Printf("[DEBUG] consul.syncer: syncs succesful") } c.consulAvailable = true } @@ -809,7 +809,7 @@ func (c *Syncer) Run() { c.Shutdown() case <-c.notifyShutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul.syncer: shutting down sync for %+q", c.serviceRegPrefix) + c.logger.Printf("[INFO] consul.syncer: shutting down syncer ") return } } @@ -850,13 +850,12 @@ func (c *Syncer) SyncServices() error { // filterConsulServices prunes out all the service whose ids are not prefixed // with nomad- -func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[string]*consul.AgentService { +func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map[string]*consul.AgentService) map[string]*consul.AgentService { localServices := make(map[string]*consul.AgentService, len(consulServices)) c.registryLock.RLock() defer c.registryLock.RUnlock() - filterPrefix := c.filterPrefix() for serviceID, service := range consulServices { - if strings.HasPrefix(service.ID, filterPrefix) { + if strings.HasPrefix(service.ID, serviceIDPrefix) { localServices[serviceID] = service } } @@ -867,7 +866,7 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer // services with Syncer's idPrefix. func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck { localChecks := make(map[string]*consul.AgentCheck, len(consulChecks)) - filterPrefix := c.filterPrefix() + filterPrefix := c.filterPrefix(c.serviceRegPrefix) for checkID, check := range consulChecks { if strings.HasPrefix(check.ServiceID, filterPrefix) { localChecks[checkID] = check @@ -912,6 +911,29 @@ func (c *Syncer) runCheck(check Check) { } } +func (c *Syncer) KeepServices(serviceRegPrefix string, servicesInRunningAllocs map[string][]*structs.Service) { + servicesToKeep := make(map[string]struct{}) + for allocID, services := range servicesInRunningAllocs { + for _, service := range services { + serviceID := c.GenerateServiceID(allocID, service, serviceRegPrefix) + servicesToKeep[serviceID] = struct{}{} + } + } + + servicesInConsul, err := c.ConsulClient().Agent().Services() + if err != nil { + return + } + + filterServicesWithPrefix := c.filterPrefix(serviceRegPrefix) + relevantServices := c.filterConsulServices(filterServicesWithPrefix, servicesInConsul) + for serviceID := range relevantServices { + if _, ok := servicesToKeep[serviceID]; !ok { + c.deregisterService(serviceID) + } + } +} + // AddPeriodicHandler adds a uniquely named callback. Returns true if // successful, false if a handler with the same name already exists. func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {