Refactored the signature of NewConsulService

This commit is contained in:
Diptanu Choudhury
2016-04-12 01:55:19 -04:00
parent e5f0d7e531
commit 2663ef9d6a
4 changed files with 40 additions and 18 deletions

View File

@@ -1181,7 +1181,7 @@ func (c *Client) setupConsulClient() error {
VerifySSL: c.config.ReadBoolDefault("consul.verifyssl", true),
}
cs, err := consul.NewConsulService(&cfg, c.logger, "")
cs, err := consul.NewConsulService(&cfg, c.logger)
c.consulService = cs
return err
}

View File

@@ -22,10 +22,11 @@ type ConsulService struct {
client *consul.Client
availble bool
task *structs.Task
taskName string
allocID string
delegateChecks map[string]struct{}
createCheck func(*structs.ServiceCheck, string) (Check, error)
addrFinder func(string) (string, int)
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*consul.AgentCheckRegistration
@@ -60,7 +61,7 @@ const (
)
// NewConsulService returns a new ConsulService
func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) (*ConsulService, error) {
func NewConsulService(config *ConsulConfig, logger *log.Logger) (*ConsulService, error) {
var err error
var c *consul.Client
cfg := consul.DefaultConfig()
@@ -114,7 +115,6 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
}
consulService := ConsulService{
client: c,
allocID: allocID,
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
@@ -133,15 +133,30 @@ func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, c
return c
}
// SetAllocID sets the allocID
func (c *ConsulService) SetAllocID(allocID string) *ConsulService {
c.allocID = allocID
return c
}
func (c *ConsulService) SetAddrFinder(addrFinder func(string) (string, int)) *ConsulService {
c.addrFinder = addrFinder
return c
}
func (c *ConsulService) SetTaskName(taskName string) *ConsulService {
c.taskName = taskName
return c
}
// SyncTask sync the services and task with consul
func (c *ConsulService) SyncTask(task *structs.Task) error {
func (c *ConsulService) SyncServices(services []*structs.Service) error {
var mErr multierror.Error
c.task = task
taskServices := make(map[string]*consul.AgentService)
taskChecks := make(map[string]*consul.AgentCheckRegistration)
// Register Services and Checks that we don't know about or has changed
for _, service := range task.Services {
for _, service := range services {
srv, err := c.createService(service)
if err != nil {
mErr.Errors = append(mErr.Errors, err)
@@ -296,11 +311,11 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con
// createService creates a Consul AgentService from a Nomad Service
func (c *ConsulService) createService(service *structs.Service) (*consul.AgentService, error) {
srv := consul.AgentService{
ID: service.ID(c.allocID, c.task.Name),
ID: service.ID(c.allocID, c.taskName),
Service: service.Name,
Tags: service.Tags,
}
host, port := c.task.FindHostAndPortFor(service.PortLabel)
host, port := c.addrFinder(service.PortLabel)
if host != "" {
srv.Address = host
}
@@ -350,7 +365,7 @@ func (c *ConsulService) PeriodicSync() {
case <-sync.C:
if err := c.performSync(); err != nil {
if c.availble {
c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err)
c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.taskName, err)
}
c.availble = false
} else {
@@ -358,7 +373,7 @@ func (c *ConsulService) PeriodicSync() {
}
case <-c.shutdownCh:
sync.Stop()
c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name)
c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.taskName)
return
}
}

View File

@@ -12,6 +12,10 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
const (
allocID = "12"
)
var (
logger = log.New(os.Stdout, "", log.LstdFlags)
check1 = structs.ServiceCheck{
@@ -37,11 +41,11 @@ var (
)
func TestConsulServiceRegisterServices(t *testing.T) {
allocID := "12"
cs, err := NewConsulService(&ConsulConfig{}, logger, allocID)
cs, err := NewConsulService(&ConsulConfig{}, logger)
if err != nil {
t.Fatalf("Err: %v", err)
}
cs.SetAllocID(allocID)
// Skipping the test if consul isn't present
if !cs.consulPresent() {
return
@@ -63,11 +67,11 @@ func TestConsulServiceRegisterServices(t *testing.T) {
}
func TestConsulServiceUpdateService(t *testing.T) {
allocID := "12"
cs, err := NewConsulService(&ConsulConfig{}, logger, allocID)
cs, err := NewConsulService(&ConsulConfig{}, logger)
if err != nil {
t.Fatalf("Err: %v", err)
}
cs.SetAllocID(allocID)
// Skipping the test if consul isn't present
if !cs.consulPresent() {
return

View File

@@ -317,7 +317,7 @@ func (e *UniversalExecutor) UpdateTask(task *structs.Task) error {
// Re-syncing task with consul service
if e.consulService != nil {
if err := e.consulService.SyncTask(task); err != nil {
if err := e.consulService.SyncServices(task.Services); err != nil {
return err
}
}
@@ -431,17 +431,20 @@ func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error {
e.logger.Printf("[INFO] executor: registering services")
e.consulCtx = ctx
if e.consulService == nil {
cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger, e.ctx.AllocID)
cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger)
if err != nil {
return err
}
cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
cs.SetAllocID(e.ctx.AllocID)
cs.SetTaskName(e.ctx.Task.Name)
cs.SetAddrFinder(e.ctx.Task.FindHostAndPortFor)
e.consulService = cs
}
if e.ctx != nil {
e.interpolateServices(e.ctx.Task)
}
err := e.consulService.SyncTask(e.ctx.Task)
err := e.consulService.SyncServices(e.ctx.Task.Services)
go e.consulService.PeriodicSync()
return err
}