mirror of
https://github.com/kemko/nomad.git
synced 2026-01-10 12:25:42 +03:00
Fixed merge conflicts
This commit is contained in:
@@ -1205,7 +1205,7 @@ func (c *Client) syncConsul() {
|
||||
}
|
||||
}
|
||||
if err := c.consulService.KeepServices(runningTasks); err != nil {
|
||||
c.logger.Printf("[DEBUG] error removing services from non-running tasks: %v", err)
|
||||
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
|
||||
}
|
||||
case <-c.shutdownCh:
|
||||
c.logger.Printf("[INFO] client: shutting down consul sync")
|
||||
|
||||
@@ -27,7 +27,7 @@ type ConsulService struct {
|
||||
createCheck func(*structs.ServiceCheck, string) (Check, error)
|
||||
|
||||
trackedServices map[string]*consul.AgentService
|
||||
trackedChecks map[string]*structs.ServiceCheck
|
||||
trackedChecks map[string]*consul.AgentCheckRegistration
|
||||
execChecks *checkHeap
|
||||
|
||||
logger *log.Logger
|
||||
@@ -96,7 +96,7 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
|
||||
allocID: allocID,
|
||||
logger: logger,
|
||||
trackedServices: make(map[string]*consul.AgentService),
|
||||
trackedChecks: make(map[string]*structs.ServiceCheck),
|
||||
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
|
||||
execChecks: NewConsulChecksHeap(),
|
||||
|
||||
shutdownCh: make(chan struct{}),
|
||||
@@ -116,7 +116,7 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
|
||||
var mErr multierror.Error
|
||||
c.task = task
|
||||
taskServices := make(map[string]*consul.AgentService)
|
||||
taskChecks := make(map[string]*structs.ServiceCheck)
|
||||
taskChecks := make(map[string]*consul.AgentCheckRegistration)
|
||||
|
||||
// Register Services and Checks that we don't know about or has changed
|
||||
for _, service := range task.Services {
|
||||
@@ -135,14 +135,18 @@ func (c *ConsulService) SyncTask(task *structs.Task) error {
|
||||
taskServices[srv.ID] = srv
|
||||
|
||||
for _, chk := range service.Checks {
|
||||
checkID := chk.Hash(srv.ID)
|
||||
if _, ok := c.trackedChecks[checkID]; !ok {
|
||||
if err := c.registerCheck(chk, srv); err != nil {
|
||||
chkReg, err := c.createCheckReg(chk, srv)
|
||||
if err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
continue
|
||||
}
|
||||
if _, ok := c.trackedChecks[chkReg.ID]; !ok {
|
||||
if err := c.registerCheck(chkReg); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
c.trackedChecks[checkID] = chk
|
||||
taskChecks[checkID] = chk
|
||||
c.trackedChecks[chkReg.ID] = chkReg
|
||||
taskChecks[chkReg.ID] = chkReg
|
||||
}
|
||||
}
|
||||
|
||||
@@ -218,7 +222,11 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error {
|
||||
}
|
||||
|
||||
// registerCheck registers a check definition with Consul
|
||||
func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *consul.AgentService) error {
|
||||
func (c *ConsulService) registerCheck(chkReg *consul.AgentCheckRegistration) error {
|
||||
return c.client.Agent().CheckRegister(chkReg)
|
||||
}
|
||||
|
||||
func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *consul.AgentService) (*consul.AgentCheckRegistration, error) {
|
||||
chkReg := consul.AgentCheckRegistration{
|
||||
ID: check.Hash(service.ID),
|
||||
Name: check.Name,
|
||||
@@ -240,12 +248,14 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons
|
||||
case structs.ServiceCheckTCP:
|
||||
chkReg.TCP = fmt.Sprintf("%s:%d", service.Address, service.Port)
|
||||
case structs.ServiceCheckScript:
|
||||
chkReg.TTL = (check.Interval + 30*time.Second).String()
|
||||
chkReg.TTL = (check.Interval + 31*time.Second).String()
|
||||
default:
|
||||
return nil, fmt.Errorf("check type %q not valid", check.Type)
|
||||
}
|
||||
if _, ok := c.delegateChecks[check.Type]; !ok {
|
||||
chk, err := c.createCheck(check, chkReg.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
return nil, err
|
||||
}
|
||||
if err := c.execChecks.Push(chk, time.Now().Add(check.Interval)); err != nil {
|
||||
c.logger.Printf("[ERR] consulservice: unable to add check %q to heap", chk.ID())
|
||||
@@ -255,7 +265,7 @@ func (c *ConsulService) registerCheck(check *structs.ServiceCheck, service *cons
|
||||
default:
|
||||
}
|
||||
}
|
||||
return c.client.Agent().CheckRegister(&chkReg)
|
||||
return &chkReg, nil
|
||||
}
|
||||
|
||||
// createService creates a Consul AgentService from a Nomad Service
|
||||
@@ -358,8 +368,8 @@ func (c *ConsulService) performSync() error {
|
||||
}
|
||||
}
|
||||
for checkID, check := range c.trackedChecks {
|
||||
if chk, ok := cChecks[checkID]; !ok {
|
||||
if err := c.registerCheck(check, c.trackedServices[chk.ServiceID]); err != nil {
|
||||
if _, ok := cChecks[checkID]; !ok {
|
||||
if err := c.registerCheck(check); err != nil {
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -646,7 +646,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
if err := exec.RegisterServices(); err != nil {
|
||||
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %v", task)
|
||||
d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
||||
@@ -143,7 +143,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
if err := exec.RegisterServices(); err != nil {
|
||||
d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %v", task)
|
||||
d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
||||
@@ -202,7 +202,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
if err := h.executor.RegisterServices(); err != nil {
|
||||
d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %v", task)
|
||||
d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
||||
@@ -225,7 +225,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
|
||||
}
|
||||
|
||||
if err := h.executor.RegisterServices(); err != nil {
|
||||
h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err)
|
||||
h.logger.Printf("[ERR] driver.qemu: error registering services for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
||||
@@ -135,7 +135,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
if err := h.executor.RegisterServices(); err != nil {
|
||||
h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err)
|
||||
h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
||||
@@ -270,7 +270,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
|
||||
waitCh: make(chan *cstructs.WaitResult, 1),
|
||||
}
|
||||
if h.executor.RegisterServices(); err != nil {
|
||||
h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err)
|
||||
h.logger.Printf("[ERR] driver.rkt: error registering services for task: %q: %v", task.Name, err)
|
||||
}
|
||||
go h.run()
|
||||
return h, nil
|
||||
|
||||
Reference in New Issue
Block a user