From 78555b509a9e744ffbef50b48597c0e2e114e197 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 24 Nov 2015 17:26:30 -0800 Subject: [PATCH] Tracking the tasks too --- client/consul.go | 41 ++++++++++++++++++++++++++++++++++------- client/consul_test.go | 17 +++++++++-------- client/task_runner.go | 2 +- 3 files changed, 44 insertions(+), 16 deletions(-) diff --git a/client/consul.go b/client/consul.go index 738a08c71..c216f1095 100644 --- a/client/consul.go +++ b/client/consul.go @@ -17,14 +17,20 @@ const ( ) type trackedService struct { - allocId string + allocId string + task *structs.Task + serviceHash string + service *structs.Service +} + +type trackedTask struct { + allocID string task *structs.Task - service *structs.Service } func (t *trackedService) IsServiceValid() bool { for _, service := range t.task.Services { - if service.Hash() == t.service.Hash() { + if service.Id == t.service.Id && service.Hash() == t.serviceHash { return true } } @@ -39,8 +45,10 @@ type ConsulService struct { trackedServices map[string]*trackedService // Service ID to Tracked Service Map trackedChecks map[string]bool // List of check ids that is being tracked + trackedTasks map[string]*trackedTask trackedSrvLock sync.Mutex trackedChkLock sync.Mutex + trackedTskLock sync.Mutex } func NewConsulService(logger *log.Logger, consulAddr string) (*ConsulService, error) { @@ -56,6 +64,7 @@ func NewConsulService(logger *log.Logger, consulAddr string) (*ConsulService, er client: c, logger: logger, trackedServices: make(map[string]*trackedService), + trackedTasks: make(map[string]*trackedTask), shutdownCh: make(chan struct{}), } @@ -64,6 +73,10 @@ func NewConsulService(logger *log.Logger, consulAddr string) (*ConsulService, er func (c *ConsulService) Register(task *structs.Task, allocID string) error { var mErr multierror.Error + c.trackedTskLock.Lock() + tt := &trackedTask{allocID: allocID, task: task} + c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt + c.trackedTskLock.Unlock() for _, service := range task.Services { c.logger.Printf("[INFO] consul: Registering service %s with Consul.", service.Name) if err := c.registerService(service, task, allocID); err != nil { @@ -74,8 +87,11 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error { return mErr.ErrorOrNil() } -func (c *ConsulService) Deregister(task *structs.Task) error { +func (c *ConsulService) Deregister(task *structs.Task, allocID string) error { var mErr multierror.Error + c.trackedTskLock.Lock() + delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name)) + c.trackedTskLock.Unlock() for _, service := range task.Services { if service.Id == "" { continue @@ -122,6 +138,7 @@ func (c *ConsulService) performSync(agent *consul.Agent) { var consulServices map[string]*consul.AgentService var err error + // Remove the tracked services which tasks no longer references for serviceId, ts := range c.trackedServices { if !ts.IsServiceValid() { c.logger.Printf("[INFO] consul: Removing service: %s since the task doesn't have it anymore", ts.service.Name) @@ -129,6 +146,15 @@ func (c *ConsulService) performSync(agent *consul.Agent) { } } + // Add additional tasks that we might not have added from tasks + for _, trackedTask := range c.trackedTasks { + for _, service := range trackedTask.task.Services { + if _, ok := c.trackedServices[service.Id]; !ok { + c.registerService(service, trackedTask.task, trackedTask.allocID) + } + } + } + // Get the list of the services that Consul knows about if consulServices, err = agent.Services(); err != nil { return @@ -173,9 +199,10 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs. Address: host, } ts := &trackedService{ - allocId: allocID, - task: task, - service: service, + allocId: allocID, + task: task, + serviceHash: service.Hash(), + service: service, } c.trackedSrvLock.Lock() c.trackedServices[service.Id] = ts diff --git a/client/consul_test.go b/client/consul_test.go index 87254ee65..002b89f09 100644 --- a/client/consul_test.go +++ b/client/consul_test.go @@ -132,6 +132,14 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) { task := structs.Task{ Name: "redis", Services: services, + Resources: &structs.Resources{ + Networks: []*structs.NetworkResource{ + { + IP: "10.10.0.1", + DynamicPorts: []structs.Port{{"db", 20413}}, + }, + }, + }, } s1 := structs.Service{ Id: "1-example-cache-redis", @@ -140,14 +148,7 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) { PortLabel: "db", } task.Services = append(task.Services, &s1) - ts := trackedService{ - allocId: "1", - task: &task, - service: &s1, - } - c.trackedServices = map[string]*trackedService{ - "1-example-cache-redis": &ts, - } + c.Register(&task, "1") s1.Tags = []string{"frontcache"} diff --git a/client/task_runner.go b/client/task_runner.go index c515abac5..7f6cc40ff 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -237,7 +237,7 @@ func (r *TaskRunner) run() { r.consulService.Register(r.task, r.allocID) // De-Register the services belonging to the task from consul - defer r.consulService.Deregister(r.task) + defer r.consulService.Deregister(r.task, r.allocID) OUTER: // Wait for updates