diff --git a/client/consul/sync.go b/client/consul/sync.go index 41d2e8fbd..c0ac82273 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -40,7 +40,7 @@ const ( syncInterval = 5 * time.Second ) -func NewConsulService(config *ConsulConfig, logger *log.Logger, task *structs.Task) (*ConsulService, error) { +func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() @@ -79,22 +79,21 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, task *structs.Ta return nil, err } consulService := ConsulService{ - client: c, - task: task, - - logger: logger, + client: c, + logger: logger, + services: make(map[string]*consul.AgentService), + checks: make(map[string]*structs.ServiceCheck), shutdownCh: make(chan struct{}), } - return &consulService, nil } func (c *ConsulService) SyncTask(task *structs.Task) error { var mErr multierror.Error c.task = task - var services map[string]*consul.AgentService - var checks map[string]*structs.ServiceCheck + services := make(map[string]*consul.AgentService) + checks := make(map[string]*structs.ServiceCheck) // Register Services and Checks that we don't know about or has changed for _, service := range task.Services { @@ -216,13 +215,12 @@ func (c *ConsulService) deregisterCheck(ID string) error { return c.client.Agent().CheckDeregister(ID) } -func (c *ConsulService) Sync() { +func (c *ConsulService) SyncWithConsul() { sync := time.After(syncInterval) - for { select { case <-sync: - if err := c.performSync; err != nil { + if err := c.performSync(); err != nil { c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err) } sync = time.After(syncInterval) @@ -233,43 +231,53 @@ func (c *ConsulService) Sync() { } } -func (c *ConsulService) performSync() { +func (c *ConsulService) performSync() error { + var mErr multierror.Error cServices, err := c.client.Agent().Services() if err != nil { - return + return err } cServices = c.filterConsulServices(cServices) cChecks, err := c.client.Agent().Checks() if err != nil { - return + return err } cChecks = c.filterConsulChecks(cChecks) // Remove services and checks that consul has but we don't have anymore for serviceID, _ := range cServices { if _, ok := c.services[serviceID]; !ok { - c.deregisterService(serviceID) + if err := c.deregisterService(serviceID); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } } - for checkID, _ := range cChecks { if _, ok := c.checks[checkID]; !ok { - c.deregisterCheck(checkID) + if err := c.deregisterCheck(checkID); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } } // Add services and checks that consul doesn't have but we do for serviceID, service := range c.services { if _, ok := cServices[serviceID]; !ok { - c.registerService(service) + if err := c.registerService(service); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } } for checkID, check := range c.checks { if chk, ok := cChecks[checkID]; !ok { - c.registerCheck(check, c.services[chk.ServiceID]) + if err := c.registerCheck(check, c.services[chk.ServiceID]); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } } + + return mErr.ErrorOrNil() } // filterConsulServices prunes out all the service whose ids are not prefixed diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go new file mode 100644 index 000000000..77f8439ce --- /dev/null +++ b/client/consul/sync_test.go @@ -0,0 +1,92 @@ +package consul + +import ( + "log" + "os" + "testing" + "time" + + "github.com/hashicorp/nomad/nomad/structs" +) + +var logger = log.New(os.Stdout, "", log.LstdFlags) + +func TestConsulServiceRegisterServices(t *testing.T) { + cs, err := NewConsulService(&ConsulConfig{}, logger) + if err != nil { + t.Fatalf("Err: %v", err) + } + task := structs.Task{ + Name: "foo", + Services: []*structs.Service{ + &structs.Service{ + ID: "1", + Name: "foo-1", + Tags: []string{"tag1", "tag2"}, + PortLabel: "port1", + }, + &structs.Service{ + ID: "2", + Name: "foo-2", + Tags: []string{"tag1", "tag2"}, + PortLabel: "port2", + }, + }, + Resources: &structs.Resources{ + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + IP: "10.10.11.5", + DynamicPorts: []structs.Port{ + structs.Port{ + Label: "port1", + Value: 20002, + }, + structs.Port{ + Label: "port2", + Value: 20003, + }, + }, + }, + }, + }, + } + + if err := cs.SyncTask(&task); err != nil { + t.Fatalf("err: %v", err) + } + go cs.SyncWithConsul() + time.Sleep(1 * time.Second) + services, _ := cs.client.Agent().Services() + if _, ok := services[task.Services[0].ID]; !ok { + t.Fatalf("Service with ID 1 not registered") + } + + task.Services = []*structs.Service{ + &structs.Service{ + ID: "1", + Name: "foo-1", + Tags: []string{"tag1"}, + PortLabel: "port1", + }, + } + if err := cs.SyncTask(&task); err != nil { + t.Fatalf("err: %v", err) + } + services, _ = cs.client.Agent().Services() + if _, ok := services["2"]; ok { + t.Fatalf("Service with ID 2 should not be registered") + } + if err := cs.Deregister(); err != nil { + t.Fatalf("err: %v", err) + } + time.Sleep(1 * time.Second) + + services, _ = cs.client.Agent().Services() + if _, ok := services["2"]; ok { + t.Fatalf("Service with ID 2 should not be registered") + } + if _, ok := services["1"]; ok { + t.Fatalf("Service with ID 1 should not be registered") + } + +}