diff --git a/client/client.go b/client/client.go index 3192318ad..cf1dbbf76 100644 --- a/client/client.go +++ b/client/client.go @@ -1205,7 +1205,7 @@ func (c *Client) syncConsul() { } } if err := c.consulService.KeepServices(runningTasks); err != nil { - c.logger.Printf("[DEBUG] error removing services from non-running tasks: %v", err) + c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) } case <-c.shutdownCh: c.logger.Printf("[INFO] client: shutting down consul sync") diff --git a/client/consul/sync.go b/client/consul/sync.go index 47c2a6e6d..b749c246e 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -27,7 +27,7 @@ type ConsulService struct { createCheck func(*structs.ServiceCheck, string) (Check, error) trackedServices map[string]*consul.AgentService - trackedChecks map[string]*structs.ServiceCheck + trackedChecks map[string]*consul.AgentCheckRegistration execChecks *checkHeap logger *log.Logger @@ -96,7 +96,7 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) allocID: allocID, logger: logger, trackedServices: make(map[string]*consul.AgentService), - trackedChecks: make(map[string]*structs.ServiceCheck), + trackedChecks: make(map[string]*consul.AgentCheckRegistration), execChecks: NewConsulChecksHeap(), shutdownCh: make(chan struct{}), @@ -116,7 +116,7 @@ func (c *ConsulService) SyncTask(task *structs.Task) error { var mErr multierror.Error c.task = task taskServices := make(map[string]*consul.AgentService) - taskChecks := make(map[string]*structs.ServiceCheck) + taskChecks := make(map[string]*consul.AgentCheckRegistration) // Register Services and Checks that we don't know about or has changed for _, service := range task.Services { @@ -135,14 +135,18 @@ func (c *ConsulService) SyncTask(task *structs.Task) error { taskServices[srv.ID] = srv for _, chk := range service.Checks { - checkID := chk.Hash(srv.ID) - if _, ok := c.trackedChecks[checkID]; !ok { - if err := c.registerCheck(chk, srv); err != nil { + chkReg, err := c.createCheckReg(chk, srv) + if err != nil { + mErr.Errors = append(mErr.Errors, err) + continue + } + if _, ok := c.trackedChecks[chkReg.ID]; !ok { + if err := c.registerCheck(chkReg); err != nil { mErr.Errors = append(mErr.Errors, err) } } - c.trackedChecks[checkID] = chk - taskChecks[checkID] = chk + c.trackedChecks[chkReg.ID] = chkReg + taskChecks[chkReg.ID] = chkReg } } @@ -218,7 +222,11 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error { } // registerCheck registers a check definition with Consul -func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *consul.AgentService) error { +func (c *ConsulService) registerCheck(chkReg *consul.AgentCheckRegistration) error { + return c.client.Agent().CheckRegister(chkReg) +} + +func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *consul.AgentService) (*consul.AgentCheckRegistration, error) { chkReg := consul.AgentCheckRegistration{ ID: check.Hash(service.ID), Name: check.Name, @@ -240,12 +248,14 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons case structs.ServiceCheckTCP: chkReg.TCP = fmt.Sprintf("%s:%d", service.Address, service.Port) case structs.ServiceCheckScript: - chkReg.TTL = (check.Interval + 30*time.Second).String() + chkReg.TTL = (check.Interval + 31*time.Second).String() + default: + return nil, fmt.Errorf("check type %q not valid", check.Type) } if _, ok := c.delegateChecks[check.Type]; !ok { chk, err := c.createCheck(check, chkReg.ID) if err != nil { - return err + return nil, err } if err := c.execChecks.Push(chk, time.Now().Add(check.Interval)); err != nil { c.logger.Printf("[ERR] consulservice: unable to add check %q to heap", chk.ID()) @@ -255,7 +265,7 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons default: } } - return c.client.Agent().CheckRegister(&chkReg) + return &chkReg, nil } // createService creates a Consul AgentService from a Nomad Service @@ -358,8 +368,8 @@ func (c *ConsulService) performSync() error { } } for checkID, check := range c.trackedChecks { - if chk, ok := cChecks[checkID]; !ok { - if err := c.registerCheck(check, c.trackedServices[chk.ServiceID]); err != nil { + if _, ok := cChecks[checkID]; !ok { + if err := c.registerCheck(check); err != nil { mErr.Errors = append(mErr.Errors, err) } } diff --git a/client/driver/docker.go b/client/driver/docker.go index 2c0d5accf..21a8edfc6 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -646,7 +646,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle waitCh: make(chan *cstructs.WaitResult, 1), } if err := exec.RegisterServices(); err != nil { - d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %v", task) + d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() return h, nil diff --git a/client/driver/exec.go b/client/driver/exec.go index f1ada4492..d8dd8ef99 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -143,7 +143,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, waitCh: make(chan *cstructs.WaitResult, 1), } if err := exec.RegisterServices(); err != nil { - d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %v", task) + d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() return h, nil diff --git a/client/driver/java.go b/client/driver/java.go index 87c95b60f..ea7d9bc67 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -202,7 +202,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, waitCh: make(chan *cstructs.WaitResult, 1), } if err := h.executor.RegisterServices(); err != nil { - d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %v", task) + d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() return h, nil diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 8cc077c5b..42adbfaac 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -225,7 +225,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } if err := h.executor.RegisterServices(); err != nil { - h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err) + h.logger.Printf("[ERR] driver.qemu: error registering services for task: %q: %v", task.Name, err) } go h.run() return h, nil diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 17a4950ea..1afe84ad1 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -135,7 +135,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl waitCh: make(chan *cstructs.WaitResult, 1), } if err := h.executor.RegisterServices(); err != nil { - h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err) + h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() return h, nil diff --git a/client/driver/rkt.go b/client/driver/rkt.go index dfbd89872..2a0939d04 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -270,7 +270,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e waitCh: make(chan *cstructs.WaitResult, 1), } if h.executor.RegisterServices(); err != nil { - h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err) + h.logger.Printf("[ERR] driver.rkt: error registering services for task: %q: %v", task.Name, err) } go h.run() return h, nil