diff --git a/client/consul/sync.go b/client/consul/sync.go index b749c246e..fb34aa79d 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -194,7 +194,7 @@ func (c *ConsulService) Shutdown() error { // of tasks passed to it func (c *ConsulService) KeepServices(tasks []*structs.Task) error { var mErr multierror.Error - var services map[string]struct{} + services := make(map[string]struct{}) // Indexing the services in the tasks for _, task := range tasks { @@ -252,7 +252,7 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con default: return nil, fmt.Errorf("check type %q not valid", check.Type) } - if _, ok := c.delegateChecks[check.Type]; !ok { + if _, ok := c.delegateChecks[check.Type]; ok { chk, err := c.createCheck(check, chkReg.ID) if err != nil { return nil, err diff --git a/client/driver/docker.go b/client/driver/docker.go index 21a8edfc6..d050e664e 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -550,7 +550,6 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle AllocID: ctx.AllocID, PortLowerBound: d.config.ClientMinPort, PortUpperBound: d.config.ClientMaxPort, - ConsulConfig: consulConfig(d.config), } ss, err := exec.LaunchSyslogServer(executorCtx) if err != nil { @@ -645,7 +644,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - if err := exec.RegisterServices(); err != nil { + if err := exec.SyncServices(consulContext(d.config, container.ID)); err != nil { d.logger.Printf("[ERR] driver.docker: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() @@ -717,6 +716,10 @@ func (d *DockerDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, er doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := exec.SyncServices(consulContext(d.config, pid.ContainerID)); err != nil { + h.logger.Printf("[ERR] driver.docker: error registering services with consul: %v", err) + } + go h.run() return h, nil } diff --git a/client/driver/exec.go b/client/driver/exec.go index d8dd8ef99..590bb90d2 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -106,12 +106,11 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - Driver: "exec", - AllocDir: ctx.AllocDir, - AllocID: ctx.AllocID, - Task: task, - ConsulConfig: consulConfig(d.config), + TaskEnv: d.taskEnv, + Driver: "exec", + AllocDir: ctx.AllocDir, + AllocID: ctx.AllocID, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{ @@ -142,7 +141,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - if err := exec.RegisterServices(); err != nil { + if err := exec.SyncServices(consulContext(d.config, "")); err != nil { d.logger.Printf("[ERR] driver.exec: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() @@ -202,6 +201,9 @@ func (d *ExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := exec.SyncServices(consulContext(d.config, "")); err != nil { + d.logger.Printf("[ERR] driver.exec: error registering services with consul: %v", err) + } go h.run() return h, nil } diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 811875e55..646caa9f1 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -35,10 +35,19 @@ type Executor interface { Exit() error UpdateLogConfig(logConfig *structs.LogConfig) error UpdateTask(task *structs.Task) error - RegisterServices() error + SyncServices(ctx *ConsulContext) error DeregisterServices() error } +// ConsulContext holds context to configure the consul client and run checks +type ConsulContext struct { + // ConsulConfig is the configuration used to create a consul client + ConsulConfig *consul.ConsulConfig + + // ContainerID is the ID of the container + ContainerID string +} + // ExecutorContext holds context to configure the command user // wants to run and isolate it type ExecutorContext struct { @@ -58,9 +67,6 @@ type ExecutorContext struct { // Driver is the name of the driver that invoked the executor Driver string - // ContainerID is the ID of the container - ContainerID string - // PortUpperBound is the upper bound of the ports that we can use to start // the syslog server PortUpperBound uint @@ -68,9 +74,6 @@ type ExecutorContext struct { // PortLowerBound is the lower bound of the ports that we can use to start // the syslog server PortLowerBound uint - - // ConsulConfig is the configuration used to create a consul client - ConsulConfig *consul.ConsulConfig } // ExecCommand holds the user command, args, and other isolation related @@ -132,6 +135,7 @@ type UniversalExecutor struct { cgLock sync.Mutex consulService *consul.ConsulService + consulCtx *ConsulContext logger *log.Logger } @@ -358,10 +362,11 @@ func (e *UniversalExecutor) ShutDown() error { return nil } -func (e *UniversalExecutor) RegisterServices() error { +func (e *UniversalExecutor) SyncServices(ctx *ConsulContext) error { e.logger.Printf("[INFO] executor: registering services") + e.consulCtx = ctx if e.consulService == nil { - cs, err := consul.NewConsulService(e.ctx.ConsulConfig, e.logger, e.ctx.AllocID) + cs, err := consul.NewConsulService(ctx.ConsulConfig, e.logger, e.ctx.AllocID) if err != nil { return err } @@ -497,7 +502,7 @@ func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID str if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" { return &DockerScriptCheck{ id: checkID, - containerID: e.ctx.ContainerID, + containerID: e.consulCtx.ContainerID, logger: e.logger, cmd: check.Cmd, args: check.Args, diff --git a/client/driver/executor_plugin.go b/client/driver/executor_plugin.go index a55bb3d91..c8561f74c 100644 --- a/client/driver/executor_plugin.go +++ b/client/driver/executor_plugin.go @@ -34,6 +34,11 @@ type LaunchSyslogServerArgs struct { Ctx *executor.ExecutorContext } +// SyncServicesArgs wraps the consul context for the purposes of RPC +type SyncServicesArgs struct { + Ctx *executor.ConsulContext +} + func (e *ExecutorRPC) LaunchCmd(cmd *executor.ExecCommand, ctx *executor.ExecutorContext) (*executor.ProcessState, error) { var ps *executor.ProcessState err := e.client.Call("Plugin.LaunchCmd", LaunchCmdArgs{Cmd: cmd, Ctx: ctx}, &ps) @@ -68,8 +73,8 @@ func (e *ExecutorRPC) UpdateTask(task *structs.Task) error { return e.client.Call("Plugin.UpdateTask", task, new(interface{})) } -func (e *ExecutorRPC) RegisterServices() error { - return e.client.Call("Plugin.RegisterServices", new(interface{}), new(interface{})) +func (e *ExecutorRPC) SyncServices(ctx *executor.ConsulContext) error { + return e.client.Call("Plugin.SyncServices", SyncServicesArgs{Ctx: ctx}, new(interface{})) } func (e *ExecutorRPC) DeregisterServices() error { @@ -120,8 +125,8 @@ func (e *ExecutorRPCServer) UpdateTask(args *structs.Task, resp *interface{}) er return e.Impl.UpdateTask(args) } -func (e *ExecutorRPCServer) RegisterServices(args interface{}, resp *interface{}) error { - return e.Impl.RegisterServices() +func (e *ExecutorRPCServer) SyncServices(args SyncServicesArgs, resp *interface{}) error { + return e.Impl.SyncServices(args.Ctx) } func (e *ExecutorRPCServer) DeregisterServices(args interface{}, resp *interface{}) error { diff --git a/client/driver/java.go b/client/driver/java.go index ea7d9bc67..4e71ceb45 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -159,12 +159,11 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - Driver: "java", - AllocDir: ctx.AllocDir, - AllocID: ctx.AllocID, - Task: task, - ConsulConfig: consulConfig(d.config), + TaskEnv: d.taskEnv, + Driver: "java", + AllocDir: ctx.AllocDir, + AllocID: ctx.AllocID, + Task: task, } absPath, err := GetAbsolutePath("java") @@ -201,7 +200,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - if err := h.executor.RegisterServices(); err != nil { + if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { d.logger.Printf("[ERR] driver.java: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() @@ -270,6 +269,9 @@ func (d *JavaDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { + d.logger.Printf("[ERR] driver.java: error registering services with consul: %v", err) + } go h.run() return h, nil diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 42adbfaac..e64633b88 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -191,12 +191,11 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - Driver: "qemu", - AllocDir: ctx.AllocDir, - AllocID: ctx.AllocID, - Task: task, - ConsulConfig: consulConfig(d.config), + TaskEnv: d.taskEnv, + Driver: "qemu", + AllocDir: ctx.AllocDir, + AllocID: ctx.AllocID, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{ Cmd: args[0], @@ -224,7 +223,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, waitCh: make(chan *cstructs.WaitResult, 1), } - if err := h.executor.RegisterServices(); err != nil { + if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.qemu: error registering services for task: %q: %v", task.Name, err) } go h.run() @@ -272,6 +271,9 @@ func (d *QemuDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, erro doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { + h.logger.Printf("[ERR] driver.qemu: error registering services: %v", err) + } go h.run() return h, nil } diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 1afe84ad1..e1619ab0a 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -101,12 +101,11 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - Driver: "raw_exec", - AllocDir: ctx.AllocDir, - AllocID: ctx.AllocID, - Task: task, - ConsulConfig: consulConfig(d.config), + TaskEnv: d.taskEnv, + Driver: "raw_exec", + AllocDir: ctx.AllocDir, + AllocID: ctx.AllocID, + Task: task, } ps, err := exec.LaunchCmd(&executor.ExecCommand{ @@ -134,7 +133,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - if err := h.executor.RegisterServices(); err != nil { + if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul for task: %q: %v", task.Name, err) } go h.run() @@ -181,6 +180,9 @@ func (d *RawExecDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, e doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } + if err := h.executor.SyncServices(consulContext(d.config, "")); err != nil { + h.logger.Printf("[ERR] driver.raw_exec: error registering services with consul: %v", err) + } go h.run() return h, nil } diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 2a0939d04..4398beb9a 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -233,12 +233,11 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e return nil, err } executorCtx := &executor.ExecutorContext{ - TaskEnv: d.taskEnv, - Driver: "rkt", - AllocDir: ctx.AllocDir, - AllocID: ctx.AllocID, - Task: task, - ConsulConfig: consulConfig(d.config), + TaskEnv: d.taskEnv, + Driver: "rkt", + AllocDir: ctx.AllocDir, + AllocID: ctx.AllocID, + Task: task, } absPath, err := GetAbsolutePath("rkt") @@ -269,7 +268,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - if h.executor.RegisterServices(); err != nil { + if h.executor.SyncServices(consulContext(d.config, "")); err != nil { h.logger.Printf("[ERR] driver.rkt: error registering services for task: %q: %v", task.Name, err) } go h.run() @@ -308,7 +307,9 @@ func (d *RktDriver) Open(ctx *ExecContext, handleID string) (DriverHandle, error doneCh: make(chan struct{}), waitCh: make(chan *cstructs.WaitResult, 1), } - + if h.executor.SyncServices(consulContext(d.config, "")); err != nil { + h.logger.Printf("[ERR] driver.rkt: error registering services: %v", err) + } go h.run() return h, nil } diff --git a/client/driver/utils.go b/client/driver/utils.go index b2a25364b..fa7bfe114 100644 --- a/client/driver/utils.go +++ b/client/driver/utils.go @@ -72,7 +72,7 @@ func createLogCollector(config *plugin.ClientConfig, w io.Writer, return logCollector, syslogClient, nil } -func consulConfig(clientConfig *config.Config) *consul.ConsulConfig { +func consulContext(clientConfig *config.Config, containerID string) *executor.ConsulContext { cfg := consul.ConsulConfig{ Addr: clientConfig.ReadDefault("consul.address", "127.0.0.1:8500"), Token: clientConfig.Read("consul.token"), @@ -80,7 +80,10 @@ func consulConfig(clientConfig *config.Config) *consul.ConsulConfig { EnableSSL: clientConfig.ReadBoolDefault("consul.ssl", false), VerifySSL: clientConfig.ReadBoolDefault("consul.verifyssl", true), } - return &cfg + return &executor.ConsulContext{ + ConsulConfig: &cfg, + ContainerID: containerID, + } } // killProcess kills a process with the given pid