Tracking the tasks too

This commit is contained in:
Diptanu Choudhury
2015-11-24 17:26:30 -08:00
parent 3051958323
commit 78555b509a
3 changed files with 44 additions and 16 deletions

View File

@@ -17,14 +17,20 @@ const (
)
type trackedService struct {
allocId string
allocId string
task *structs.Task
serviceHash string
service *structs.Service
}
type trackedTask struct {
allocID string
task *structs.Task
service *structs.Service
}
func (t *trackedService) IsServiceValid() bool {
for _, service := range t.task.Services {
if service.Hash() == t.service.Hash() {
if service.Id == t.service.Id && service.Hash() == t.serviceHash {
return true
}
}
@@ -39,8 +45,10 @@ type ConsulService struct {
trackedServices map[string]*trackedService // Service ID to Tracked Service Map
trackedChecks map[string]bool // List of check ids that is being tracked
trackedTasks map[string]*trackedTask
trackedSrvLock sync.Mutex
trackedChkLock sync.Mutex
trackedTskLock sync.Mutex
}
func NewConsulService(logger *log.Logger, consulAddr string) (*ConsulService, error) {
@@ -56,6 +64,7 @@ func NewConsulService(logger *log.Logger, consulAddr string) (*ConsulService, er
client: c,
logger: logger,
trackedServices: make(map[string]*trackedService),
trackedTasks: make(map[string]*trackedTask),
shutdownCh: make(chan struct{}),
}
@@ -64,6 +73,10 @@ func NewConsulService(logger *log.Logger, consulAddr string) (*ConsulService, er
func (c *ConsulService) Register(task *structs.Task, allocID string) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
tt := &trackedTask{allocID: allocID, task: task}
c.trackedTasks[fmt.Sprintf("%s-%s", allocID, task.Name)] = tt
c.trackedTskLock.Unlock()
for _, service := range task.Services {
c.logger.Printf("[INFO] consul: Registering service %s with Consul.", service.Name)
if err := c.registerService(service, task, allocID); err != nil {
@@ -74,8 +87,11 @@ func (c *ConsulService) Register(task *structs.Task, allocID string) error {
return mErr.ErrorOrNil()
}
func (c *ConsulService) Deregister(task *structs.Task) error {
func (c *ConsulService) Deregister(task *structs.Task, allocID string) error {
var mErr multierror.Error
c.trackedTskLock.Lock()
delete(c.trackedTasks, fmt.Sprintf("%s-%s", allocID, task.Name))
c.trackedTskLock.Unlock()
for _, service := range task.Services {
if service.Id == "" {
continue
@@ -122,6 +138,7 @@ func (c *ConsulService) performSync(agent *consul.Agent) {
var consulServices map[string]*consul.AgentService
var err error
// Remove the tracked services which tasks no longer references
for serviceId, ts := range c.trackedServices {
if !ts.IsServiceValid() {
c.logger.Printf("[INFO] consul: Removing service: %s since the task doesn't have it anymore", ts.service.Name)
@@ -129,6 +146,15 @@ func (c *ConsulService) performSync(agent *consul.Agent) {
}
}
// Add additional tasks that we might not have added from tasks
for _, trackedTask := range c.trackedTasks {
for _, service := range trackedTask.task.Services {
if _, ok := c.trackedServices[service.Id]; !ok {
c.registerService(service, trackedTask.task, trackedTask.allocID)
}
}
}
// Get the list of the services that Consul knows about
if consulServices, err = agent.Services(); err != nil {
return
@@ -173,9 +199,10 @@ func (c *ConsulService) registerService(service *structs.Service, task *structs.
Address: host,
}
ts := &trackedService{
allocId: allocID,
task: task,
service: service,
allocId: allocID,
task: task,
serviceHash: service.Hash(),
service: service,
}
c.trackedSrvLock.Lock()
c.trackedServices[service.Id] = ts

View File

@@ -132,6 +132,14 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
task := structs.Task{
Name: "redis",
Services: services,
Resources: &structs.Resources{
Networks: []*structs.NetworkResource{
{
IP: "10.10.0.1",
DynamicPorts: []structs.Port{{"db", 20413}},
},
},
},
}
s1 := structs.Service{
Id: "1-example-cache-redis",
@@ -140,14 +148,7 @@ func TestConsul_Service_Should_Be_Re_Reregistered_On_Change(t *testing.T) {
PortLabel: "db",
}
task.Services = append(task.Services, &s1)
ts := trackedService{
allocId: "1",
task: &task,
service: &s1,
}
c.trackedServices = map[string]*trackedService{
"1-example-cache-redis": &ts,
}
c.Register(&task, "1")
s1.Tags = []string{"frontcache"}

View File

@@ -237,7 +237,7 @@ func (r *TaskRunner) run() {
r.consulService.Register(r.task, r.allocID)
// De-Register the services belonging to the task from consul
defer r.consulService.Deregister(r.task)
defer r.consulService.Deregister(r.task, r.allocID)
OUTER:
// Wait for updates