Added a check type for consul service to delegate certain checks

This commit is contained in:
Diptanu Choudhury
2016-03-24 10:06:40 -07:00
parent 4faf16b5c0
commit 42dc8bea16
12 changed files with 79 additions and 20 deletions

10
client/consul/check.go Normal file
View File

@@ -0,0 +1,10 @@
package consul
import (
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
type Check interface {
Run() *cstructs.CheckResult
ID() string
}

View File

@@ -1,4 +1,4 @@
package executor
package consul
import (
"reflect"

View File

@@ -21,8 +21,10 @@ import (
type ConsulService struct {
client *consul.Client
task *structs.Task
allocID string
task *structs.Task
allocID string
delegateChecks map[string]struct{}
createCheck func(structs.ServiceCheck, string) (Check, error)
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*structs.ServiceCheck
@@ -99,6 +101,12 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
return &consulService, nil
}
func (c *ConsulService) SetDelegatedChecks(delegateChecks map[string]struct{}, createCheck func(structs.ServiceCheck, string) (Check, error)) *ConsulService {
c.delegateChecks = delegateChecks
c.createCheck = createCheck
return c
}
// SyncTask sync the services and task with consul
func (c *ConsulService) SyncTask(task *structs.Task) error {
var mErr multierror.Error

View File

@@ -545,6 +545,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Task: task,
Driver: "docker",
AllocDir: ctx.AllocDir,
AllocID: ctx.AllocID,
PortLowerBound: d.config.ClientMinPort,

View File

@@ -107,6 +107,7 @@ func (d *ExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Driver: "exec",
AllocDir: ctx.AllocDir,
AllocID: ctx.AllocID,
Task: task,

View File

@@ -11,20 +11,17 @@ import (
"github.com/armon/circbuf"
docker "github.com/fsouza/go-dockerclient"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
type Check interface {
Run() *cstructs.CheckResult
ID() string
}
type DockerScriptCheck struct {
id string
containerID string
client *docker.Client
logger *log.Logger
script []string
cmd string
args []string
}
func (d *DockerScriptCheck) Run() *cstructs.CheckResult {
@@ -33,7 +30,7 @@ func (d *DockerScriptCheck) Run() *cstructs.CheckResult {
AttachStdout: true,
AttachStderr: true,
Tty: false,
Cmd: d.script,
Cmd: append([]string{d.cmd}, d.args...),
Container: d.containerID,
}
var (
@@ -77,7 +74,6 @@ type ExecScriptCheck struct {
args []string
taskDir string
ctx *ExecutorContext
FSIsolation bool
}
@@ -120,7 +116,7 @@ func (e *ExecScriptCheck) ID() string {
}
type consulCheck struct {
check Check
check consul.Check
next time.Time
index int
}
@@ -137,7 +133,7 @@ func NewConsulChecksHeap() *checkHeap {
}
}
func (c *checkHeap) Push(check Check, next time.Time) error {
func (c *checkHeap) Push(check consul.Check, next time.Time) error {
if _, ok := c.index[check.ID()]; ok {
return fmt.Errorf("check %v already exists", check.ID())
}
@@ -166,12 +162,12 @@ func (c *checkHeap) Peek() *consulCheck {
return c.heap[0]
}
func (c *checkHeap) Contains(check Check) bool {
func (c *checkHeap) Contains(check consul.Check) bool {
_, ok := c.index[check.ID()]
return ok
}
func (c *checkHeap) Update(check Check, next time.Time) error {
func (c *checkHeap) Update(check consul.Check, next time.Time) error {
if cCheck, ok := c.index[check.ID()]; ok {
cCheck.check = check
cCheck.next = next
@@ -182,7 +178,7 @@ func (c *checkHeap) Update(check Check, next time.Time) error {
return fmt.Errorf("heap doesn't contain check %v", check.ID())
}
func (c *checkHeap) Remove(check Check) error {
func (c *checkHeap) Remove(check consul.Check) error {
if cCheck, ok := c.index[check.ID()]; ok {
heap.Remove(&c.heap, cCheck.index)
delete(c.index, check.ID())

View File

@@ -55,6 +55,12 @@ type ExecutorContext struct {
// AllocID is the allocation id to which the task belongs
AllocID string
// Driver is the name of the driver that invoked the executor
Driver string
// ContainerID is the ID of the container
ContainerID string
// PortUpperBound is the upper bound of the ports that we can use to start
// the syslog server
PortUpperBound uint
@@ -359,6 +365,7 @@ func (e *UniversalExecutor) RegisterServices() error {
if err != nil {
return err
}
cs.SetDelegatedChecks(e.createCheckMap(), e.createCheck)
e.consulService = cs
}
err := e.consulService.SyncTask(e.ctx.Task)
@@ -478,3 +485,34 @@ func (e *UniversalExecutor) listenerUnix() (net.Listener, error) {
return net.Listen("unix", path)
}
func (e *UniversalExecutor) createCheckMap() map[string]struct{} {
checks := map[string]struct{}{
"script": struct{}{},
}
return checks
}
func (e *UniversalExecutor) createCheck(check structs.ServiceCheck, checkID string) (consul.Check, error) {
if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" {
return &DockerScriptCheck{
id: checkID,
containerID: e.ctx.ContainerID,
logger: e.logger,
cmd: check.Cmd,
args: check.Args,
}, nil
}
if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "exec" {
return &ExecScriptCheck{
id: checkID,
cmd: check.Cmd,
args: check.Args,
taskDir: e.taskDir,
FSIsolation: e.command.FSIsolation,
}, nil
}
return nil, fmt.Errorf("couldn't create check for %v", check.Name)
}

View File

@@ -160,6 +160,7 @@ func (d *JavaDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Driver: "java",
AllocDir: ctx.AllocDir,
AllocID: ctx.AllocID,
Task: task,

View File

@@ -192,6 +192,7 @@ func (d *QemuDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle,
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Driver: "qemu",
AllocDir: ctx.AllocDir,
AllocID: ctx.AllocID,
Task: task,

View File

@@ -102,6 +102,7 @@ func (d *RawExecDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandl
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Driver: "raw_exec",
AllocDir: ctx.AllocDir,
AllocID: ctx.AllocID,
Task: task,

View File

@@ -234,6 +234,7 @@ func (d *RktDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, e
}
executorCtx := &executor.ExecutorContext{
TaskEnv: d.taskEnv,
Driver: "rkt",
AllocDir: ctx.AllocDir,
AllocID: ctx.AllocID,
Task: task,

View File

@@ -1420,7 +1420,8 @@ const (
type ServiceCheck struct {
Name string // Name of the check, defaults to id
Type string // Type of the check - tcp, http, docker and script
Script string // Script to invoke for script check
Cmd string // Cmd is the command to run for script checks
Args []string // Args is a list of argumes for script checks
Path string // path of the health check url for http type check
Protocol string // Protocol to use if check is http, defaults to http
Interval time.Duration // Interval of the check
@@ -1445,7 +1446,7 @@ func (sc *ServiceCheck) Validate() error {
return fmt.Errorf("service checks of http type must have a valid http path")
}
if sc.Type == ServiceCheckScript && sc.Script == "" {
if sc.Type == ServiceCheckScript && sc.Cmd == "" {
return fmt.Errorf("service checks of script type must have a valid script path")
}
@@ -1460,8 +1461,8 @@ func (sc *ServiceCheck) Hash(serviceID string) string {
io.WriteString(h, serviceID)
io.WriteString(h, sc.Name)
io.WriteString(h, sc.Type)
io.WriteString(h, sc.Script)
io.WriteString(h, sc.Path)
io.WriteString(h, sc.Cmd)
io.WriteString(h, strings.Join(sc.Args, ""))
io.WriteString(h, sc.Path)
io.WriteString(h, sc.Protocol)
io.WriteString(h, sc.Interval.String())