From 2663ef9d6a3f5d30b5ea672bc6084da7f4893ac2 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Tue, 12 Apr 2016 01:55:19 -0400 Subject: [PATCH] Refactored the signature of NewConsulService --- client/client.go | 2 +- client/consul/sync.go | 35 +++++++++++++++++++++--------- client/consul/sync_test.go | 12 ++++++---- client/driver/executor/executor.go | 9 +++++--- 4 files changed, 40 insertions(+), 18 deletions(-) diff --git a/client/client.go b/client/client.go index bd6a20216..6fb3ad34e 100644 --- a/client/client.go +++ b/client/client.go @@ -1181,7 +1181,7 @@ func (c *Client) setupConsulClient() error { VerifySSL: c.config.ReadBoolDefault("consul.verifyssl", true), } - cs, err := consul.NewConsulService(&cfg, c.logger, "") + cs, err := consul.NewConsulService(&cfg, c.logger) c.consulService = cs return err } diff --git a/client/consul/sync.go b/client/consul/sync.go index b1bc11ffa..e6092c0e0 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -22,10 +22,11 @@ type ConsulService struct { client *consul.Client availble bool - task *structs.Task + taskName string allocID string delegateChecks map[string]struct{} createCheck func(*structs.ServiceCheck, string) (Check, error) + addrFinder func(string) (string, int) trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration @@ -60,7 +61,7 @@ const ( ) // NewConsulService returns a new ConsulService -func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) (*ConsulService, error) { +func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) { var err error var c *consul.Client cfg := consul.DefaultConfig() @@ -114,7 +115,6 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) } consulService := ConsulService{ client: c, - allocID: allocID, logger: logger, trackedServices: make(map[string]*consul.AgentService), trackedChecks: make(map[string]*consul.AgentCheckRegistration), @@ -133,15 +133,30 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c return c } +// SetAllocID sets the allocID +func (c *ConsulService) SetAllocID(allocID string) *ConsulService { + c.allocID = allocID + return c +} + +func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService { + c.addrFinder = addrFinder + return c +} + +func (c *ConsulService) SetTaskName(taskName string) *ConsulService { + c.taskName = taskName + return c +} + // SyncTask sync the services and task with consul -func (c *ConsulService) SyncTask(task *structs.Task) error { +func (c *ConsulService) SyncServices(services []*structs.Service) error { var mErr multierror.Error - c.task = task taskServices := make(map[string]*consul.AgentService) taskChecks := make(map[string]*consul.AgentCheckRegistration) // Register Services and Checks that we don't know about or has changed - for _, service := range task.Services { + for _, service := range services { srv, err := c.createService(service) if err != nil { mErr.Errors = append(mErr.Errors, err) @@ -296,11 +311,11 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con // createService creates a Consul AgentService from a Nomad Service func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) { srv := consul.AgentService{ - ID: service.ID(c.allocID, c.task.Name), + ID: service.ID(c.allocID, c.taskName), Service: service.Name, Tags: service.Tags, } - host, port := c.task.FindHostAndPortFor(service.PortLabel) + host, port := c.addrFinder(service.PortLabel) if host != "" { srv.Address = host } @@ -350,7 +365,7 @@ func (c *ConsulService) PeriodicSync() { case <-sync.C: if err := c.performSync(); err != nil { if c.availble { - c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err) + c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.taskName, err) } c.availble = false } else { @@ -358,7 +373,7 @@ func (c *ConsulService) PeriodicSync() { } case <-c.shutdownCh: sync.Stop() - c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name) + c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.taskName) return } } diff --git a/client/consul/sync_test.go b/client/consul/sync_test.go index bb1acfc3c..c69a0caeb 100644 --- a/client/consul/sync_test.go +++ b/client/consul/sync_test.go @@ -12,6 +12,10 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + allocID = "12" +) + var ( logger = log.New(os.Stdout, "", log.LstdFlags) check1 = structs.ServiceCheck{ @@ -37,11 +41,11 @@ var ( ) func TestConsulServiceRegisterServices(t *testing.T) { - allocID := "12" - cs, err := NewConsulService(&ConsulConfig{}, logger, allocID) + cs, err := NewConsulService(&ConsulConfig{}, logger) if err != nil { t.Fatalf("Err: %v", err) } + cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return @@ -63,11 +67,11 @@ func TestConsulServiceRegisterServices(t *testing.T) { } func TestConsulServiceUpdateService(t *testing.T) { - allocID := "12" - cs, err := NewConsulService(&ConsulConfig{}, logger, allocID) + cs, err := NewConsulService(&ConsulConfig{}, logger) if err != nil { t.Fatalf("Err: %v", err) } + cs.SetAllocID(allocID) // Skipping the test if consul isn't present if !cs.consulPresent() { return diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 664739f51..506f728d9 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -317,7 +317,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error { // Re-syncing task with consul service if e.consulService != nil { - if err := e.consulService.SyncTask(task); err != nil { + if err := e.consulService.SyncServices(task.Services); err != nil { return err } } @@ -431,17 +431,20 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { e.logger.Printf("[INFO] executor: registering services") e.consulCtx = ctx if e.consulService == nil { - cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger, e.ctx.AllocID) + cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger) if err != nil { return err } cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) + cs.SetAllocID(e.ctx.AllocID) + cs.SetTaskName(e.ctx.Task.Name) + cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor) e.consulService = cs } if e.ctx != nil { e.interpolateServices(e.ctx.Task) } - err := e.consulService.SyncTask(e.ctx.Task) + err := e.consulService.SyncServices(e.ctx.Task.Services) go e.consulService.PeriodicSync() return err }