From 42dc8bea16cab6003f17c7bbca907629f507211d Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 24 Mar 2016 10:06:40 -0700 Subject: [PATCH] Added a check type for consul service to delegate certain checks --- client/consul/check.go | 10 +++++ .../executor => consul}/checks_test.go | 2 +- client/consul/sync.go | 12 +++++- client/driver/docker.go | 1 + client/driver/exec.go | 1 + client/driver/executor/checks.go | 22 +++++------ client/driver/executor/executor.go | 38 +++++++++++++++++++ client/driver/java.go | 1 + client/driver/qemu.go | 1 + client/driver/raw_exec.go | 1 + client/driver/rkt.go | 1 + nomad/structs/structs.go | 9 +++-- 12 files changed, 79 insertions(+), 20 deletions(-) create mode 100644 client/consul/check.go rename client/{driver/executor => consul}/checks_test.go (97%) diff --git a/client/consul/check.go b/client/consul/check.go new file mode 100644 index 000000000..ac3604893 --- /dev/null +++ b/client/consul/check.go @@ -0,0 +1,10 @@ +package consul + +import ( + cstructs "github.com/hashicorp/nomad/client/driver/structs" +) + +type Check interface { + Run() *cstructs.CheckResult + ID() string +} diff --git a/client/driver/executor/checks_test.go b/client/consul/checks_test.go similarity index 97% rename from client/driver/executor/checks_test.go rename to client/consul/checks_test.go index c959e7ee0..6354084f9 100644 --- a/client/driver/executor/checks_test.go +++ b/client/consul/checks_test.go @@ -1,4 +1,4 @@ -package executor +package consul import ( "reflect" diff --git a/client/consul/sync.go b/client/consul/sync.go index dc1410886..e1cfb07e4 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -21,8 +21,10 @@ import ( type ConsulService struct { client *consul.Client - task *structs.Task - allocID string + task *structs.Task + allocID string + delegateChecks map[string]struct{} + createCheck func(structs.ServiceCheck, string) (Check, error) trackedServices map[string]*consul.AgentService trackedChecks map[string]*structs.ServiceCheck @@ -99,6 +101,12 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) return &consulService, nil } +func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, createCheck func(structs.ServiceCheck, string) (Check, error)) *ConsulService { + c.delegateChecks = delegateChecks + c.createCheck = createCheck + return c +} + // SyncTask sync the services and task with consul func (c *ConsulService) SyncTask(task *structs.Task) error { var mErr multierror.Error diff --git a/client/driver/docker.go b/client/driver/docker.go index cf518a6a7..2c0d5accf 100644 --- a/client/driver/docker.go +++ b/client/driver/docker.go @@ -545,6 +545,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, Task: task, + Driver: "docker", AllocDir: ctx.AllocDir, AllocID: ctx.AllocID, PortLowerBound: d.config.ClientMinPort, diff --git a/client/driver/exec.go b/client/driver/exec.go index c6bb8b0e2..f1ada4492 100644 --- a/client/driver/exec.go +++ b/client/driver/exec.go @@ -107,6 +107,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, + Driver: "exec", AllocDir: ctx.AllocDir, AllocID: ctx.AllocID, Task: task, diff --git a/client/driver/executor/checks.go b/client/driver/executor/checks.go index dc46d1062..cb035f708 100644 --- a/client/driver/executor/checks.go +++ b/client/driver/executor/checks.go @@ -11,20 +11,17 @@ import ( "github.com/armon/circbuf" docker "github.com/fsouza/go-dockerclient" + "github.com/hashicorp/nomad/client/consul" cstructs "github.com/hashicorp/nomad/client/driver/structs" ) -type Check interface { - Run() *cstructs.CheckResult - ID() string -} - type DockerScriptCheck struct { id string containerID string client *docker.Client logger *log.Logger - script []string + cmd string + args []string } func (d *DockerScriptCheck) Run() *cstructs.CheckResult { @@ -33,7 +30,7 @@ func (d *DockerScriptCheck) Run() *cstructs.CheckResult { AttachStdout: true, AttachStderr: true, Tty: false, - Cmd: d.script, + Cmd: append([]string{d.cmd}, d.args...), Container: d.containerID, } var ( @@ -77,7 +74,6 @@ type ExecScriptCheck struct { args []string taskDir string - ctx *ExecutorContext FSIsolation bool } @@ -120,7 +116,7 @@ func (e *ExecScriptCheck) ID() string { } type consulCheck struct { - check Check + check consul.Check next time.Time index int } @@ -137,7 +133,7 @@ func NewConsulChecksHeap() *checkHeap { } } -func (c *checkHeap) Push(check Check, next time.Time) error { +func (c *checkHeap) Push(check consul.Check, next time.Time) error { if _, ok := c.index[check.ID()]; ok { return fmt.Errorf("check %v already exists", check.ID()) } @@ -166,12 +162,12 @@ func (c *checkHeap) Peek() *consulCheck { return c.heap[0] } -func (c *checkHeap) Contains(check Check) bool { +func (c *checkHeap) Contains(check consul.Check) bool { _, ok := c.index[check.ID()] return ok } -func (c *checkHeap) Update(check Check, next time.Time) error { +func (c *checkHeap) Update(check consul.Check, next time.Time) error { if cCheck, ok := c.index[check.ID()]; ok { cCheck.check = check cCheck.next = next @@ -182,7 +178,7 @@ func (c *checkHeap) Update(check Check, next time.Time) error { return fmt.Errorf("heap doesn't contain check %v", check.ID()) } -func (c *checkHeap) Remove(check Check) error { +func (c *checkHeap) Remove(check consul.Check) error { if cCheck, ok := c.index[check.ID()]; ok { heap.Remove(&c.heap, cCheck.index) delete(c.index, check.ID()) diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 5e0762253..23da11f24 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -55,6 +55,12 @@ type ExecutorContext struct { // AllocID is the allocation id to which the task belongs AllocID string + // Driver is the name of the driver that invoked the executor + Driver string + + // ContainerID is the ID of the container + ContainerID string + // PortUpperBound is the upper bound of the ports that we can use to start // the syslog server PortUpperBound uint @@ -359,6 +365,7 @@ func (e *UniversalExecutor) RegisterServices() error { if err != nil { return err } + cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck) e.consulService = cs } err := e.consulService.SyncTask(e.ctx.Task) @@ -478,3 +485,34 @@ func (e *UniversalExecutor) listenerUnix() (net.Listener, error) { return net.Listen("unix", path) } + +func (e *UniversalExecutor) createCheckMap() map[string]struct{} { + checks := map[string]struct{}{ + "script": struct{}{}, + } + return checks +} + +func (e *UniversalExecutor) createCheck(check structs.ServiceCheck, checkID string) (consul.Check, error) { + if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" { + return &DockerScriptCheck{ + id: checkID, + containerID: e.ctx.ContainerID, + logger: e.logger, + cmd: check.Cmd, + args: check.Args, + }, nil + } + + if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "exec" { + return &ExecScriptCheck{ + id: checkID, + cmd: check.Cmd, + args: check.Args, + taskDir: e.taskDir, + FSIsolation: e.command.FSIsolation, + }, nil + + } + return nil, fmt.Errorf("couldn't create check for %v", check.Name) +} diff --git a/client/driver/java.go b/client/driver/java.go index d980253a8..87c95b60f 100644 --- a/client/driver/java.go +++ b/client/driver/java.go @@ -160,6 +160,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, + Driver: "java", AllocDir: ctx.AllocDir, AllocID: ctx.AllocID, Task: task, diff --git a/client/driver/qemu.go b/client/driver/qemu.go index 1d1ac80ff..8cc077c5b 100644 --- a/client/driver/qemu.go +++ b/client/driver/qemu.go @@ -192,6 +192,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, } executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, + Driver: "qemu", AllocDir: ctx.AllocDir, AllocID: ctx.AllocID, Task: task, diff --git a/client/driver/raw_exec.go b/client/driver/raw_exec.go index 41767cea5..17a4950ea 100644 --- a/client/driver/raw_exec.go +++ b/client/driver/raw_exec.go @@ -102,6 +102,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl } executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, + Driver: "raw_exec", AllocDir: ctx.AllocDir, AllocID: ctx.AllocID, Task: task, diff --git a/client/driver/rkt.go b/client/driver/rkt.go index 09870a2e5..dfbd89872 100644 --- a/client/driver/rkt.go +++ b/client/driver/rkt.go @@ -234,6 +234,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e } executorCtx := &executor.ExecutorContext{ TaskEnv: d.taskEnv, + Driver: "rkt", AllocDir: ctx.AllocDir, AllocID: ctx.AllocID, Task: task, diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 065b07d1f..ab5968475 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -1420,7 +1420,8 @@ const ( type ServiceCheck struct { Name string // Name of the check, defaults to id Type string // Type of the check - tcp, http, docker and script - Script string // Script to invoke for script check + Cmd string // Cmd is the command to run for script checks + Args []string // Args is a list of argumes for script checks Path string // path of the health check url for http type check Protocol string // Protocol to use if check is http, defaults to http Interval time.Duration // Interval of the check @@ -1445,7 +1446,7 @@ func (sc *ServiceCheck) Validate() error { return fmt.Errorf("service checks of http type must have a valid http path") } - if sc.Type == ServiceCheckScript && sc.Script == "" { + if sc.Type == ServiceCheckScript && sc.Cmd == "" { return fmt.Errorf("service checks of script type must have a valid script path") } @@ -1460,8 +1461,8 @@ func (sc *ServiceCheck) Hash(serviceID string) string { io.WriteString(h, serviceID) io.WriteString(h, sc.Name) io.WriteString(h, sc.Type) - io.WriteString(h, sc.Script) - io.WriteString(h, sc.Path) + io.WriteString(h, sc.Cmd) + io.WriteString(h, strings.Join(sc.Args, "")) io.WriteString(h, sc.Path) io.WriteString(h, sc.Protocol) io.WriteString(h, sc.Interval.String())