diff --git a/client/consul.go b/client/consul.go index 26a9c0d7b..5814450f9 100644 --- a/client/consul.go +++ b/client/consul.go @@ -2,32 +2,43 @@ package client import ( "fmt" - "github.com/hashicorp/consul/api" + consul "github.com/hashicorp/consul/api" "github.com/hashicorp/go-multierror" "github.com/hashicorp/nomad/nomad/structs" "log" + "time" ) const ( - consulPort = 8080 + syncInterval = 5 * time.Second ) -type ConsulClient struct { - client *api.Client +type trackedService struct { + allocId string + task *structs.Task + service *structs.Service +} - logger *log.Logger +type ConsulClient struct { + client *consul.Client + logger *log.Logger + shutdownCh chan struct{} + + trackedServices map[string]*trackedService } func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { var err error - var c *api.Client - if c, err = api.NewClient(api.DefaultConfig()); err != nil { + var c *consul.Client + ts := make(map[string]*trackedService) + if c, err = consul.NewClient(consul.DefaultConfig()); err != nil { return nil, err } consulClient := ConsulClient{ - client: c, - logger: logger, + client: c, + logger: logger, + trackedServices: ts, } return &consulClient, nil @@ -35,31 +46,16 @@ func NewConsulClient(logger *log.Logger) (*ConsulClient, error) { func (c *ConsulClient) Register(task *structs.Task, allocID string) error { var mErr multierror.Error - var serviceDefns []*api.AgentServiceRegistration for _, service := range task.Services { - service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) - host, port := c.findPortAndHostForLabel(service.PortLabel, task) - if host == "" || port == 0 { - continue - } - checks := c.makeChecks(service, host, port) - asr := &api.AgentServiceRegistration{ - ID: service.Id, - Name: service.Name, - Tags: service.Tags, - Port: port, - Address: host, - Checks: checks, - } - serviceDefns = append(serviceDefns, asr) - } - - for _, serviceDefn := range serviceDefns { - c.logger.Printf("[INFO] Registering service %v with Consul", serviceDefn.Name) - if err := c.client.Agent().ServiceRegister(serviceDefn); err != nil { - c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", serviceDefn.Name, err) + if err := c.registerService(service, task, allocID); err != nil { mErr.Errors = append(mErr.Errors, err) } + ts := &trackedService{ + allocId: allocID, + task: task, + } + c.trackedServices[service.Id] = ts + } return mErr.ErrorOrNil() @@ -69,10 +65,11 @@ func (c *ConsulClient) Deregister(task *structs.Task) error { var mErr multierror.Error for _, service := range task.Services { c.logger.Printf("[INFO] De-Registering service %v with Consul", service.Name) - if err := c.client.Agent().ServiceDeregister(service.Id); err != nil { + if err := c.deregisterService(service.Id); err != nil { c.logger.Printf("[ERROR] Error in de-registering service %v from Consul", service.Name) mErr.Errors = append(mErr.Errors, err) } + delete(c.trackedServices, service.Id) } return mErr.ErrorOrNil() } @@ -86,10 +83,73 @@ func (c *ConsulClient) findPortAndHostForLabel(portLabel string, task *structs.T return "", 0 } -func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*api.AgentServiceCheck { - var checks []*api.AgentServiceCheck +func (c *ConsulClient) SyncWithConsul() { + sync := time.After(syncInterval) + agent := c.client.Agent() + + for { + select { + case <-sync: + var consulServices map[string]*consul.AgentService + var err error + if consulServices, err = agent.Services(); err != nil { + c.logger.Printf("[DEBUG] Error while syncing services with Consul: %v", err) + continue + } + for serviceId := range c.trackedServices { + if _, ok := consulServices[serviceId]; !ok { + ts := c.trackedServices[serviceId] + c.registerService(ts.service, ts.task, ts.allocId) + } + } + + for serviceId := range consulServices { + if _, ok := c.trackedServices[serviceId]; !ok { + if err := c.deregisterService(serviceId); err != nil { + c.logger.Printf("[DEBUG] Error while de-registering service with ID: %s", serviceId) + } + } + } + case <-c.shutdownCh: + return + } + } +} + +func (c *ConsulClient) registerService(service *structs.Service, task *structs.Task, allocID string) error { + var mErr multierror.Error + service.Id = fmt.Sprintf("%s-%s", allocID, task.Name) + host, port := c.findPortAndHostForLabel(service.PortLabel, task) + if host == "" || port == 0 { + return fmt.Errorf("The port:%s marked for registration of service: %s couldn't be found", service.PortLabel, service.Name) + } + checks := c.makeChecks(service, host, port) + asr := &consul.AgentServiceRegistration{ + ID: service.Id, + Name: service.Name, + Tags: service.Tags, + Port: port, + Address: host, + Checks: checks, + } + if err := c.client.Agent().ServiceRegister(asr); err != nil { + c.logger.Printf("[ERROR] Error while registering service %v with Consul: %v", service.Name, err) + mErr.Errors = append(mErr.Errors, err) + } + return mErr.ErrorOrNil() +} + +func (c *ConsulClient) deregisterService(serviceId string) error { + if err := c.client.Agent().ServiceDeregister(serviceId); err != nil { + return err + } + return nil +} + +func (c *ConsulClient) makeChecks(service *structs.Service, ip string, port int) []*consul.AgentServiceCheck { + var checks []*consul.AgentServiceCheck for _, check := range service.Checks { - c := &api.AgentServiceCheck{ + c := &consul.AgentServiceCheck{ Interval: check.Interval.String(), Timeout: check.Timeout.String(), }