implemented reconciliation of unwanted services

This commit is contained in:
Diptanu Choudhury
2016-06-13 14:05:35 +02:00
parent 9e2bb39be9
commit 02b320c81e
4 changed files with 95 additions and 80 deletions

View File

@@ -378,7 +378,7 @@ func (a *Agent) setupServer() error {
Name: a.config.Consul.ServerServiceName,
Tags: []string{consul.ServiceTagSerf},
},
})
}, "agent")
}
return nil
@@ -424,7 +424,7 @@ func (a *Agent) setupClient() error {
PortLabel: a.clientRPCAddr,
Tags: []string{consul.ServiceTagRPC},
},
})
}, "agent")
}
return nil
@@ -593,7 +593,6 @@ func (a *Agent) setupConsulSyncer(shutdownCh chan struct{}) error {
if err != nil {
return err
}
a.consulSyncer.SetServiceRegPrefix("agent")
a.consulSyncer.SetAddrFinder(func(portLabel string) (string, int) {
host, port, err := net.SplitHostPort(portLabel)

View File

@@ -30,7 +30,7 @@ const (
// nomadServicePrefix is the prefix used when registering a service
// with consul
nomadServicePrefix = "nomad"
nomadServicePrefix = "nomad-registered-service"
// The periodic time interval for syncing services and checks with Consul
syncInterval = 5 * time.Second
@@ -71,6 +71,8 @@ type Syncer struct {
checkGroups map[string][]*consul.AgentCheckRegistration
groupsLock sync.RWMutex
groupName string
// The "Consul Registry" is a collection of Consul Services and
// Checks all guarded by the registryLock.
registryLock sync.RWMutex
@@ -82,14 +84,6 @@ type Syncer struct {
trackedChecks map[string]*consul.AgentCheckRegistration
trackedServices map[string]*consul.AgentServiceRegistration
// serviceRegPrefix is used to namespace the domain of registered
// Consul Services and Checks belonging to a single Syncer. A given
// Nomad Agent may spawn multiple Syncer tasks between the Agent
// Agent and its Executors, all syncing to a single Consul Agent.
// The serviceRegPrefix allows multiple Syncers to coexist without
// each Syncer clobbering each others Services. The Syncer namespace
// protocol is fmt.Sprintf("nomad-%s-%s", serviceRegPrefix, miscID).
// serviceRegPrefix is guarded by the registryLock.
serviceRegPrefix string
addrFinder func(portLabel string) (string, int)
@@ -204,47 +198,40 @@ func (c *Syncer) SetAddrFinder(addrFinder func(string) (string, int)) *Syncer {
return c
}
// SetServiceRegPrefix sets the registration prefix used by the Syncer when
// registering Services with Consul.
func (c *Syncer) SetServiceRegPrefix(servicePrefix string) *Syncer {
c.registryLock.Lock()
defer c.registryLock.Unlock()
c.serviceRegPrefix = servicePrefix
return c
}
// filterPrefix generates a unique prefix that a Syncer can later filter on.
func (c *Syncer) filterPrefix() string {
func (c *Syncer) filterPrefix(serviceRegPrefix string) string {
c.registryLock.RLock()
defer c.registryLock.RUnlock()
return fmt.Sprintf("%s-%s", nomadServicePrefix, c.serviceRegPrefix)
return fmt.Sprintf("%s-%s", nomadServicePrefix, serviceRegPrefix)
}
// GenerateServiceID creates a unique Consul ServiceID for a given
// Service.
func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service) string {
func (c *Syncer) GenerateServiceID(groupName string, service *structs.Service, serviceRegPrefix string) string {
numTags := len(service.Tags)
switch numTags {
case 0:
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(), groupName, service.Name)
return fmt.Sprintf("%s-%s:%s", c.filterPrefix(serviceRegPrefix), groupName, service.Name)
case 1:
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(), groupName, service.Tags[0], service.Name)
return fmt.Sprintf("%s-%s:%s@%s", c.filterPrefix(serviceRegPrefix), groupName, service.Tags[0], service.Name)
default:
tags := strings.Join(service.Tags, "|")
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(), groupName, tags, service.Name)
return fmt.Sprintf("%s-%s:(%s)@%s", c.filterPrefix(serviceRegPrefix), groupName, tags, service.Name)
}
}
// SetServices assigns the slice of Nomad Services to the provided services
// group name.
func (c *Syncer) SetServices(groupName string, services []*structs.Service) error {
func (c *Syncer) SetServices(groupName string, services []*structs.Service, serviceRegPrefix string) error {
c.groupName = groupName
c.serviceRegPrefix = serviceRegPrefix
var mErr multierror.Error
numServ := len(services)
registeredServices := make([]*consul.AgentServiceRegistration, 0, numServ)
registeredChecks := make([]*consul.AgentCheckRegistration, 0, numServ)
for _, service := range services {
if service.ServiceID == "" {
service.ServiceID = c.GenerateServiceID(groupName, service)
service.ServiceID = c.GenerateServiceID(groupName, service, serviceRegPrefix)
}
var serviceReg *consul.AgentServiceRegistration
var err error
@@ -319,13 +306,12 @@ func (c *Syncer) flattenedServices() []*consul.AgentServiceRegistration {
const initialNumServices = 8
services := make([]*consul.AgentServiceRegistration, 0, initialNumServices)
c.groupsLock.RLock()
defer c.groupsLock.RUnlock()
for _, servicesGroup := range c.servicesGroups {
for _, service := range servicesGroup {
services = append(services, service)
}
}
c.groupsLock.RUnlock()
return services
}
@@ -371,11 +357,18 @@ func (c *Syncer) Shutdown() error {
c.registryLock.RUnlock()
// De-register all the services from Consul
services, err := c.queryAgentServices()
if err != nil {
mErr.Errors = append(mErr.Errors, err)
consulServices := make(map[string]*consul.AgentService)
for trackedGroup := range c.servicesGroups {
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup)
services, err := c.queryAgentServices(serviceIDPrefix)
if err != nil {
return err
}
for serviceID, agentService := range services {
consulServices[serviceID] = agentService
}
}
for _, service := range services {
for _, service := range consulServices {
if err := c.client.Agent().ServiceDeregister(service.ID); err != nil {
c.logger.Printf("[WARN] consul.syncer: failed to deregister service ID %+q: %v", service.ID, err)
mErr.Errors = append(mErr.Errors, err)
@@ -396,12 +389,12 @@ func (c *Syncer) queryChecks() (map[string]*consul.AgentCheck, error) {
// queryAgentServices queries the Consul Agent for a list of Consul services that
// have been registered with this Consul Syncer.
func (c *Syncer) queryAgentServices() (map[string]*consul.AgentService, error) {
func (c *Syncer) queryAgentServices(serviceIDPrefix string) (map[string]*consul.AgentService, error) {
services, err := c.client.Agent().Services()
if err != nil {
return nil, err
}
return c.filterConsulServices(services), nil
return c.filterConsulServices(serviceIDPrefix, services), nil
}
// syncChecks synchronizes this Syncer's Consul Checks with the Consul Agent.
@@ -660,9 +653,16 @@ func (c *Syncer) calcServicesDiff(consulServices map[string]*consul.AgentService
// syncServices synchronizes this Syncer's Consul Services with the Consul
// Agent.
func (c *Syncer) syncServices() error {
consulServices, err := c.queryAgentServices()
if err != nil {
return err
consulServices := make(map[string]*consul.AgentService)
for trackedGroup := range c.servicesGroups {
serviceIDPrefix := fmt.Sprintf("%s-%s", c.filterPrefix(c.serviceRegPrefix), trackedGroup)
services, err := c.queryAgentServices(serviceIDPrefix)
if err != nil {
return err
}
for serviceID, agentService := range services {
consulServices[serviceID] = agentService
}
}
// Synchronize services with Consul
@@ -794,12 +794,12 @@ func (c *Syncer) Run() {
if err := c.SyncServices(); err != nil {
if c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: disabling checks until successful sync for %+q: %v", c.serviceRegPrefix, err)
c.logger.Printf("[DEBUG] consul.syncer: error in syncing: %v", err)
}
c.consulAvailable = false
} else {
if !c.consulAvailable {
c.logger.Printf("[DEBUG] consul.syncer: re-enabling checks for for %+q", c.serviceRegPrefix)
c.logger.Printf("[DEBUG] consul.syncer: syncs succesful")
}
c.consulAvailable = true
}
@@ -809,7 +809,7 @@ func (c *Syncer) Run() {
c.Shutdown()
case <-c.notifyShutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul.syncer: shutting down sync for %+q", c.serviceRegPrefix)
c.logger.Printf("[INFO] consul.syncer: shutting down syncer ")
return
}
}
@@ -850,13 +850,12 @@ func (c *Syncer) SyncServices() error {
// filterConsulServices prunes out all the service whose ids are not prefixed
// with nomad-
func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
func (c *Syncer) filterConsulServices(serviceIDPrefix string, consulServices map[string]*consul.AgentService) map[string]*consul.AgentService {
localServices := make(map[string]*consul.AgentService, len(consulServices))
c.registryLock.RLock()
defer c.registryLock.RUnlock()
filterPrefix := c.filterPrefix()
for serviceID, service := range consulServices {
if strings.HasPrefix(service.ID, filterPrefix) {
if strings.HasPrefix(service.ID, serviceIDPrefix) {
localServices[serviceID] = service
}
}
@@ -867,7 +866,7 @@ func (c *Syncer) filterConsulServices(consulServices map[string]*consul.AgentSer
// services with Syncer's idPrefix.
func (c *Syncer) filterConsulChecks(consulChecks map[string]*consul.AgentCheck) map[string]*consul.AgentCheck {
localChecks := make(map[string]*consul.AgentCheck, len(consulChecks))
filterPrefix := c.filterPrefix()
filterPrefix := c.filterPrefix(c.serviceRegPrefix)
for checkID, check := range consulChecks {
if strings.HasPrefix(check.ServiceID, filterPrefix) {
localChecks[checkID] = check
@@ -912,6 +911,29 @@ func (c *Syncer) runCheck(check Check) {
}
}
func (c *Syncer) KeepServices(serviceRegPrefix string, servicesInRunningAllocs map[string][]*structs.Service) {
servicesToKeep := make(map[string]struct{})
for allocID, services := range servicesInRunningAllocs {
for _, service := range services {
serviceID := c.GenerateServiceID(allocID, service, serviceRegPrefix)
servicesToKeep[serviceID] = struct{}{}
}
}
servicesInConsul, err := c.ConsulClient().Agent().Services()
if err != nil {
return
}
filterServicesWithPrefix := c.filterPrefix(serviceRegPrefix)
relevantServices := c.filterConsulServices(filterServicesWithPrefix, servicesInConsul)
for serviceID := range relevantServices {
if _, ok := servicesToKeep[serviceID]; !ok {
c.deregisterService(serviceID)
}
}
}
// AddPeriodicHandler adds a uniquely named callback. Returns true if
// successful, false if a handler with the same name already exists.
func (c *Syncer) AddPeriodicHandler(name string, fn types.PeriodicCallback) bool {