Added an impl for Nomad Checks

This commit is contained in:
Diptanu Choudhury
2016-03-24 19:00:24 -07:00
parent 8342d7dd71
commit 62249fe79f
8 changed files with 142 additions and 160 deletions

View File

@@ -19,7 +19,8 @@ type ServiceCheck struct {
Id string
Name string
Type string
Script string
Cmd string
Args []string
Path string
Protocol string
Interval time.Duration

View File

@@ -1204,9 +1204,9 @@ func (c *Client) syncConsul() {
}
}
}
if err := c.consulService.KeepServices(runningTasks); err != nil {
c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
}
//if err := c.consulService.KeepServices(runningTasks); err != nil {
// c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err)
//}
case <-c.shutdownCh:
c.logger.Printf("[INFO] client: shutting down consul sync")
return

View File

@@ -1,131 +1,84 @@
package consul
import (
"container/heap"
"fmt"
"log"
"math/rand"
"sync"
"time"
cstructs "github.com/hashicorp/nomad/client/driver/structs"
)
type NomadCheck struct {
check Check
runCheck func(Check)
logger *log.Logger
stop bool
stopCh chan struct{}
stopLock sync.Mutex
started bool
startedLock sync.Mutex
}
func NewNomadCheck(check Check, runCheck func(Check), logger *log.Logger) *NomadCheck {
nc := NomadCheck{
check: check,
runCheck: runCheck,
logger: logger,
stopCh: make(chan struct{}),
}
return &nc
}
// Start is used to start a check monitor. Monitor runs until stop is called
func (n *NomadCheck) Start() {
n.startedLock.Lock()
if n.started {
return
}
n.started = true
n.stopLock.Lock()
defer n.stopLock.Unlock()
n.stop = false
n.stopCh = make(chan struct{})
go n.run()
}
// Stop is used to stop a check monitor.
func (n *NomadCheck) Stop() {
n.stopLock.Lock()
defer n.stopLock.Unlock()
if !n.stop {
n.stop = true
close(n.stopCh)
}
}
// run is invoked by a goroutine to run until Stop() is called
func (n *NomadCheck) run() {
// Get the randomized initial pause time
initialPauseTime := randomStagger(n.check.Interval())
n.logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, n.check.ID())
next := time.After(initialPauseTime)
for {
select {
case <-next:
n.runCheck(n.check)
next = time.After(n.check.Interval())
case <-n.stopCh:
return
}
}
}
type Check interface {
Run() *cstructs.CheckResult
ID() string
Interval() time.Duration
}
type consulCheck struct {
check Check
next time.Time
index int
}
type checkHeap struct {
index map[string]*consulCheck
heap checksHeapImp
}
func NewConsulChecksHeap() *checkHeap {
return &checkHeap{
index: make(map[string]*consulCheck),
heap: make(checksHeapImp, 0),
}
}
func (c *checkHeap) Push(check Check, next time.Time) error {
if _, ok := c.index[check.ID()]; ok {
return fmt.Errorf("check %v already exists", check.ID())
}
cCheck := &consulCheck{check, next, 0}
c.index[check.ID()] = cCheck
heap.Push(&c.heap, cCheck)
return nil
}
func (c *checkHeap) Pop() *consulCheck {
if len(c.heap) == 0 {
return nil
}
cCheck := heap.Pop(&c.heap).(*consulCheck)
delete(c.index, cCheck.check.ID())
return cCheck
}
func (c *checkHeap) Peek() *consulCheck {
if len(c.heap) == 0 {
return nil
}
return c.heap[0]
}
func (c *checkHeap) Contains(check Check) bool {
_, ok := c.index[check.ID()]
return ok
}
func (c *checkHeap) Update(check Check, next time.Time) error {
if cCheck, ok := c.index[check.ID()]; ok {
cCheck.check = check
cCheck.next = next
heap.Fix(&c.heap, cCheck.index)
return nil
}
return fmt.Errorf("heap doesn't contain check %v", check.ID())
}
func (c *checkHeap) Remove(id string) error {
if cCheck, ok := c.index[id]; ok {
heap.Remove(&c.heap, cCheck.index)
delete(c.index, id)
return nil
}
return fmt.Errorf("heap doesn't contain check %v", id)
}
func (c *checkHeap) Len() int { return len(c.heap) }
type checksHeapImp []*consulCheck
func (h checksHeapImp) Len() int { return len(h) }
func (h checksHeapImp) Less(i, j int) bool {
// Two zero times should return false.
// Otherwise, zero is "greater" than any other time.
// (To sort it at the end of the list.)
// Sort such that zero times are at the end of the list.
iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero()
if iZero && jZero {
return false
} else if iZero {
return false
} else if jZero {
return true
}
return h[i].next.Before(h[j].next)
}
func (h checksHeapImp) Swap(i, j int) {
h[i], h[j] = h[j], h[i]
h[i].index = i
h[j].index = j
}
func (h *checksHeapImp) Push(x interface{}) {
n := len(*h)
check := x.(*consulCheck)
check.index = n
*h = append(*h, check)
}
func (h *checksHeapImp) Pop() interface{} {
old := *h
n := len(old)
check := old[n-1]
check.index = -1 // for safety
*h = old[0 : n-1]
return check
// Returns a random stagger interval between 0 and the duration
func randomStagger(intv time.Duration) time.Duration {
return time.Duration(uint64(rand.Int63()) % uint64(intv))
}

View File

@@ -28,11 +28,10 @@ type ConsulService struct {
trackedServices map[string]*consul.AgentService
trackedChecks map[string]*consul.AgentCheckRegistration
execChecks *checkHeap
nomadChecks map[string]*NomadCheck
logger *log.Logger
updateCh chan struct{}
shutdownCh chan struct{}
shutdown bool
shutdownLock sync.Mutex
@@ -97,10 +96,9 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string)
logger: logger,
trackedServices: make(map[string]*consul.AgentService),
trackedChecks: make(map[string]*consul.AgentCheckRegistration),
execChecks: NewConsulChecksHeap(),
nomadChecks: make(map[string]*NomadCheck),
shutdownCh: make(chan struct{}),
updateCh: make(chan struct{}),
}
return &consulService, nil
}
@@ -199,6 +197,9 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error {
// Indexing the services in the tasks
for _, task := range tasks {
for _, service := range task.Services {
fmt.Printf("DIPTANU SERVICE %#v\n", service)
fmt.Printf("DIPTANU TASK %#v\n", c.task)
fmt.Printf("DIPTANU SERVICES %#v\n", services)
services[service.ID(c.allocID, c.task.Name)] = struct{}{}
}
}
@@ -223,6 +224,9 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error {
// registerCheck registers a check definition with Consul
func (c *ConsulService) registerCheck(chkReg *consul.AgentCheckRegistration) error {
if nc, ok := c.nomadChecks[chkReg.ID]; ok {
nc.Start()
}
return c.client.Agent().CheckRegister(chkReg)
}
@@ -257,13 +261,8 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con
if err != nil {
return nil, err
}
if err := c.execChecks.Push(chk, time.Now().Add(check.Interval)); err != nil {
c.logger.Printf("[ERR] consulservice: unable to add check %q to heap", chk.ID())
}
select {
case c.updateCh <- struct{}{}:
default:
}
nc := NewNomadCheck(chk, c.runCheck, c.logger)
c.nomadChecks[chk.ID()] = nc
}
return &chkReg, nil
}
@@ -307,31 +306,27 @@ func (c *ConsulService) deregisterService(ID string) error {
// deregisterCheck de-registers a check with a given ID from Consul.
func (c *ConsulService) deregisterCheck(ID string) error {
if err := c.execChecks.Remove(ID); err != nil {
c.logger.Printf("[DEBUG] consulservice: unable to remove check with ID %q from heap", ID)
// Deleting the nomad check
if nc, ok := c.nomadChecks[ID]; ok {
nc.Stop()
delete(c.nomadChecks, ID)
}
// Deleteting from consul
return c.client.Agent().CheckDeregister(ID)
}
// PeriodicSync triggers periodic syncing of services and checks with Consul.
// This is a long lived go-routine which is stopped during shutdown
func (c *ConsulService) PeriodicSync() {
var runCheck <-chan time.Time
sync := time.After(syncInterval)
for {
runCheck = c.sleepBeforeRunningCheck()
select {
case <-sync:
if err := c.performSync(); err != nil {
c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err)
}
sync = time.After(syncInterval)
case <-c.updateCh:
continue
case <-runCheck:
chk := c.execChecks.heap.Pop().(consulCheck)
runCheck = c.sleepBeforeRunningCheck()
c.runCheck(chk.check)
case <-c.shutdownCh:
c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name)
return
@@ -339,13 +334,6 @@ func (c *ConsulService) PeriodicSync() {
}
}
func (c *ConsulService) sleepBeforeRunningCheck() <-chan time.Time {
if c := c.execChecks.Peek(); c != nil {
return time.After(time.Now().Sub(c.next))
}
return nil
}
// performSync sync the services and checks we are tracking with Consul.
func (c *ConsulService) performSync() error {
var mErr multierror.Error
@@ -411,14 +399,19 @@ func (c *ConsulService) consulPresent() bool {
// runCheck runs a check and updates the corresponding ttl check in consul
func (c *ConsulService) runCheck(check Check) {
res := check.Run()
state := consul.HealthCritical
output := res.Output
if res.Err != nil {
c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthCritical)
output = res.Err.Error()
}
if res.ExitCode == 0 {
c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthPassing)
state = consul.HealthPassing
}
if res.ExitCode == 1 {
c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthWarning)
state = consul.HealthWarning
}
if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil {
c.logger.Printf("[DEBUG] error updating ttl check for check %q: %v", check.ID(), err)
}
c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthCritical)
}

View File

@@ -22,6 +22,7 @@ var (
type DockerScriptCheck struct {
id string
interval time.Duration
containerID string
logger *log.Logger
cmd string
@@ -42,16 +43,16 @@ func (d *DockerScriptCheck) dockerClient() (*docker.Client, error) {
createClient.Do(func() {
if d.dockerEndpoint != "" {
if d.tlsCert+d.tlsKey+d.tlsCa != "" {
d.logger.Printf("[DEBUG] driver.docker: using TLS client connection to %s", d.dockerEndpoint)
d.logger.Printf("[DEBUG] executor.checks: using TLS client connection to %s", d.dockerEndpoint)
client, err = docker.NewTLSClient(d.dockerEndpoint, d.tlsCert, d.tlsKey, d.tlsCa)
} else {
d.logger.Printf("[DEBUG] driver.docker: using standard client connection to %s", d.dockerEndpoint)
d.logger.Printf("[DEBUG] executor.checks: using standard client connection to %s", d.dockerEndpoint)
client, err = docker.NewClient(d.dockerEndpoint)
}
return
}
d.logger.Println("[DEBUG] driver.docker: using client connection initialized from environment")
d.logger.Println("[DEBUG] executor.checks: using client connection initialized from environment")
client, err = docker.NewClientFromEnv()
})
return client, err
@@ -106,11 +107,16 @@ func (d *DockerScriptCheck) ID() string {
return d.id
}
func (d *DockerScriptCheck) Interval() time.Duration {
return d.interval
}
type ExecScriptCheck struct {
id string
cmd string
args []string
taskDir string
id string
interval time.Duration
cmd string
args []string
taskDir string
FSIsolation bool
}
@@ -152,3 +158,7 @@ func (e *ExecScriptCheck) Run() *cstructs.CheckResult {
func (e *ExecScriptCheck) ID() string {
return e.id
}
func (e *ExecScriptCheck) Interval() time.Duration {
return e.interval
}

View File

@@ -3,8 +3,29 @@ package executor
import (
"strings"
"testing"
docker "github.com/fsouza/go-dockerclient"
)
// dockerIsConnected checks to see if a docker daemon is available (local or remote)
func dockerIsConnected(t *testing.T) bool {
client, err := docker.NewClientFromEnv()
if err != nil {
return false
}
// Creating a client doesn't actually connect, so make sure we do something
// like call Version() on it.
env, err := client.Version()
if err != nil {
t.Logf("Failed to connect to docker daemon: %s", err)
return false
}
t.Logf("Successfully connected to docker daemon running version %s", env.Get("Version"))
return true
}
func TestExecScriptCheckNoIsolation(t *testing.T) {
check := &ExecScriptCheck{
id: "foo",

View File

@@ -515,6 +515,7 @@ func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID str
if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" {
return &DockerScriptCheck{
id: checkID,
interval: check.Interval,
containerID: e.consulCtx.ContainerID,
logger: e.logger,
cmd: check.Cmd,
@@ -525,6 +526,7 @@ func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID str
if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "exec" {
return &ExecScriptCheck{
id: checkID,
interval: check.Interval,
cmd: check.Cmd,
args: check.Args,
taskDir: e.taskDir,

View File

@@ -750,6 +750,8 @@ func parseChecks(service *structs.Service, checkObjs *ast.ObjectList) error {
"timeout",
"path",
"protocol",
"cmd",
"args",
}
if err := checkHCLKeys(co.Val, valid); err != nil {
return multierror.Prefix(err, "check ->")