From 62249fe79f4add61438ead08ce926f9c7bdb0390 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Thu, 24 Mar 2016 19:00:24 -0700 Subject: [PATCH] Added an impl for Nomad Checks --- api/tasks.go | 3 +- client/client.go | 6 +- client/consul/check.go | 187 ++++++++++---------------- client/consul/sync.go | 57 ++++---- client/driver/executor/checks.go | 24 +++- client/driver/executor/checks_test.go | 21 +++ client/driver/executor/executor.go | 2 + jobspec/parse.go | 2 + 8 files changed, 142 insertions(+), 160 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 3c18269ea..63ced39e4 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -19,7 +19,8 @@ type ServiceCheck struct { Id string Name string Type string - Script string + Cmd string + Args []string Path string Protocol string Interval time.Duration diff --git a/client/client.go b/client/client.go index cf1dbbf76..25472c342 100644 --- a/client/client.go +++ b/client/client.go @@ -1204,9 +1204,9 @@ func (c *Client) syncConsul() { } } } - if err := c.consulService.KeepServices(runningTasks); err != nil { - c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) - } + //if err := c.consulService.KeepServices(runningTasks); err != nil { + // c.logger.Printf("[DEBUG] client: error removing services from non-running tasks: %v", err) + //} case <-c.shutdownCh: c.logger.Printf("[INFO] client: shutting down consul sync") return diff --git a/client/consul/check.go b/client/consul/check.go index 37516b23d..d39dbce17 100644 --- a/client/consul/check.go +++ b/client/consul/check.go @@ -1,131 +1,84 @@ package consul import ( - "container/heap" - "fmt" + "log" + "math/rand" + "sync" "time" cstructs "github.com/hashicorp/nomad/client/driver/structs" ) +type NomadCheck struct { + check Check + runCheck func(Check) + logger *log.Logger + stop bool + stopCh chan struct{} + stopLock sync.Mutex + + started bool + startedLock sync.Mutex +} + +func NewNomadCheck(check Check, runCheck func(Check), logger *log.Logger) *NomadCheck { + nc := NomadCheck{ + check: check, + runCheck: runCheck, + logger: logger, + stopCh: make(chan struct{}), + } + return &nc +} + +// Start is used to start a check monitor. Monitor runs until stop is called +func (n *NomadCheck) Start() { + n.startedLock.Lock() + if n.started { + return + } + n.started = true + n.stopLock.Lock() + defer n.stopLock.Unlock() + n.stop = false + n.stopCh = make(chan struct{}) + go n.run() +} + +// Stop is used to stop a check monitor. +func (n *NomadCheck) Stop() { + n.stopLock.Lock() + defer n.stopLock.Unlock() + if !n.stop { + n.stop = true + close(n.stopCh) + } +} + +// run is invoked by a goroutine to run until Stop() is called +func (n *NomadCheck) run() { + // Get the randomized initial pause time + initialPauseTime := randomStagger(n.check.Interval()) + n.logger.Printf("[DEBUG] agent: pausing %v before first invocation of %s", initialPauseTime, n.check.ID()) + next := time.After(initialPauseTime) + for { + select { + case <-next: + n.runCheck(n.check) + next = time.After(n.check.Interval()) + case <-n.stopCh: + return + } + } +} + type Check interface { Run() *cstructs.CheckResult ID() string + Interval() time.Duration } -type consulCheck struct { - check Check - next time.Time - index int -} - -type checkHeap struct { - index map[string]*consulCheck - heap checksHeapImp -} - -func NewConsulChecksHeap() *checkHeap { - return &checkHeap{ - index: make(map[string]*consulCheck), - heap: make(checksHeapImp, 0), - } -} - -func (c *checkHeap) Push(check Check, next time.Time) error { - if _, ok := c.index[check.ID()]; ok { - return fmt.Errorf("check %v already exists", check.ID()) - } - - cCheck := &consulCheck{check, next, 0} - - c.index[check.ID()] = cCheck - heap.Push(&c.heap, cCheck) - return nil -} - -func (c *checkHeap) Pop() *consulCheck { - if len(c.heap) == 0 { - return nil - } - - cCheck := heap.Pop(&c.heap).(*consulCheck) - delete(c.index, cCheck.check.ID()) - return cCheck -} - -func (c *checkHeap) Peek() *consulCheck { - if len(c.heap) == 0 { - return nil - } - return c.heap[0] -} - -func (c *checkHeap) Contains(check Check) bool { - _, ok := c.index[check.ID()] - return ok -} - -func (c *checkHeap) Update(check Check, next time.Time) error { - if cCheck, ok := c.index[check.ID()]; ok { - cCheck.check = check - cCheck.next = next - heap.Fix(&c.heap, cCheck.index) - return nil - } - - return fmt.Errorf("heap doesn't contain check %v", check.ID()) -} - -func (c *checkHeap) Remove(id string) error { - if cCheck, ok := c.index[id]; ok { - heap.Remove(&c.heap, cCheck.index) - delete(c.index, id) - return nil - } - return fmt.Errorf("heap doesn't contain check %v", id) -} - -func (c *checkHeap) Len() int { return len(c.heap) } - -type checksHeapImp []*consulCheck - -func (h checksHeapImp) Len() int { return len(h) } - -func (h checksHeapImp) Less(i, j int) bool { - // Two zero times should return false. - // Otherwise, zero is "greater" than any other time. - // (To sort it at the end of the list.) - // Sort such that zero times are at the end of the list. - iZero, jZero := h[i].next.IsZero(), h[j].next.IsZero() - if iZero && jZero { - return false - } else if iZero { - return false - } else if jZero { - return true - } - - return h[i].next.Before(h[j].next) -} - -func (h checksHeapImp) Swap(i, j int) { - h[i], h[j] = h[j], h[i] - h[i].index = i - h[j].index = j -} - -func (h *checksHeapImp) Push(x interface{}) { - n := len(*h) - check := x.(*consulCheck) - check.index = n - *h = append(*h, check) -} - -func (h *checksHeapImp) Pop() interface{} { - old := *h - n := len(old) - check := old[n-1] - check.index = -1 // for safety - *h = old[0 : n-1] - return check +// Returns a random stagger interval between 0 and the duration +func randomStagger(intv time.Duration) time.Duration { + return time.Duration(uint64(rand.Int63()) % uint64(intv)) } diff --git a/client/consul/sync.go b/client/consul/sync.go index fb34aa79d..2418ecda2 100644 --- a/client/consul/sync.go +++ b/client/consul/sync.go @@ -28,11 +28,10 @@ type ConsulService struct { trackedServices map[string]*consul.AgentService trackedChecks map[string]*consul.AgentCheckRegistration - execChecks *checkHeap + nomadChecks map[string]*NomadCheck logger *log.Logger - updateCh chan struct{} shutdownCh chan struct{} shutdown bool shutdownLock sync.Mutex @@ -97,10 +96,9 @@ func NewConsulService(config *ConsulConfig, logger *log.Logger, allocID string) logger: logger, trackedServices: make(map[string]*consul.AgentService), trackedChecks: make(map[string]*consul.AgentCheckRegistration), - execChecks: NewConsulChecksHeap(), + nomadChecks: make(map[string]*NomadCheck), shutdownCh: make(chan struct{}), - updateCh: make(chan struct{}), } return &consulService, nil } @@ -199,6 +197,9 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error { // Indexing the services in the tasks for _, task := range tasks { for _, service := range task.Services { + fmt.Printf("DIPTANU SERVICE %#v\n", service) + fmt.Printf("DIPTANU TASK %#v\n", c.task) + fmt.Printf("DIPTANU SERVICES %#v\n", services) services[service.ID(c.allocID, c.task.Name)] = struct{}{} } } @@ -223,6 +224,9 @@ func (c *ConsulService) KeepServices(tasks []*structs.Task) error { // registerCheck registers a check definition with Consul func (c *ConsulService) registerCheck(chkReg *consul.AgentCheckRegistration) error { + if nc, ok := c.nomadChecks[chkReg.ID]; ok { + nc.Start() + } return c.client.Agent().CheckRegister(chkReg) } @@ -257,13 +261,8 @@ func (c *ConsulService) createCheckReg(check *structs.ServiceCheck, service *con if err != nil { return nil, err } - if err := c.execChecks.Push(chk, time.Now().Add(check.Interval)); err != nil { - c.logger.Printf("[ERR] consulservice: unable to add check %q to heap", chk.ID()) - } - select { - case c.updateCh <- struct{}{}: - default: - } + nc := NewNomadCheck(chk, c.runCheck, c.logger) + c.nomadChecks[chk.ID()] = nc } return &chkReg, nil } @@ -307,31 +306,27 @@ func (c *ConsulService) deregisterService(ID string) error { // deregisterCheck de-registers a check with a given ID from Consul. func (c *ConsulService) deregisterCheck(ID string) error { - if err := c.execChecks.Remove(ID); err != nil { - c.logger.Printf("[DEBUG] consulservice: unable to remove check with ID %q from heap", ID) + // Deleting the nomad check + if nc, ok := c.nomadChecks[ID]; ok { + nc.Stop() + delete(c.nomadChecks, ID) } + + // Deleteting from consul return c.client.Agent().CheckDeregister(ID) } // PeriodicSync triggers periodic syncing of services and checks with Consul. // This is a long lived go-routine which is stopped during shutdown func (c *ConsulService) PeriodicSync() { - var runCheck <-chan time.Time sync := time.After(syncInterval) for { - runCheck = c.sleepBeforeRunningCheck() select { case <-sync: if err := c.performSync(); err != nil { c.logger.Printf("[DEBUG] consul: error in syncing task %q: %v", c.task.Name, err) } sync = time.After(syncInterval) - case <-c.updateCh: - continue - case <-runCheck: - chk := c.execChecks.heap.Pop().(consulCheck) - runCheck = c.sleepBeforeRunningCheck() - c.runCheck(chk.check) case <-c.shutdownCh: c.logger.Printf("[INFO] consul: shutting down sync for task %q", c.task.Name) return @@ -339,13 +334,6 @@ func (c *ConsulService) PeriodicSync() { } } -func (c *ConsulService) sleepBeforeRunningCheck() <-chan time.Time { - if c := c.execChecks.Peek(); c != nil { - return time.After(time.Now().Sub(c.next)) - } - return nil -} - // performSync sync the services and checks we are tracking with Consul. func (c *ConsulService) performSync() error { var mErr multierror.Error @@ -411,14 +399,19 @@ func (c *ConsulService) consulPresent() bool { // runCheck runs a check and updates the corresponding ttl check in consul func (c *ConsulService) runCheck(check Check) { res := check.Run() + state := consul.HealthCritical + output := res.Output if res.Err != nil { - c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthCritical) + output = res.Err.Error() } if res.ExitCode == 0 { - c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthPassing) + state = consul.HealthPassing } if res.ExitCode == 1 { - c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthWarning) + state = consul.HealthWarning + } + + if err := c.client.Agent().UpdateTTL(check.ID(), output, state); err != nil { + c.logger.Printf("[DEBUG] error updating ttl check for check %q: %v", check.ID(), err) } - c.client.Agent().UpdateTTL(check.ID(), res.Output, consul.HealthCritical) } diff --git a/client/driver/executor/checks.go b/client/driver/executor/checks.go index 40cf3681d..ee84a18c5 100644 --- a/client/driver/executor/checks.go +++ b/client/driver/executor/checks.go @@ -22,6 +22,7 @@ var ( type DockerScriptCheck struct { id string + interval time.Duration containerID string logger *log.Logger cmd string @@ -42,16 +43,16 @@ func (d *DockerScriptCheck) dockerClient() (*docker.Client, error) { createClient.Do(func() { if d.dockerEndpoint != "" { if d.tlsCert+d.tlsKey+d.tlsCa != "" { - d.logger.Printf("[DEBUG] driver.docker: using TLS client connection to %s", d.dockerEndpoint) + d.logger.Printf("[DEBUG] executor.checks: using TLS client connection to %s", d.dockerEndpoint) client, err = docker.NewTLSClient(d.dockerEndpoint, d.tlsCert, d.tlsKey, d.tlsCa) } else { - d.logger.Printf("[DEBUG] driver.docker: using standard client connection to %s", d.dockerEndpoint) + d.logger.Printf("[DEBUG] executor.checks: using standard client connection to %s", d.dockerEndpoint) client, err = docker.NewClient(d.dockerEndpoint) } return } - d.logger.Println("[DEBUG] driver.docker: using client connection initialized from environment") + d.logger.Println("[DEBUG] executor.checks: using client connection initialized from environment") client, err = docker.NewClientFromEnv() }) return client, err @@ -106,11 +107,16 @@ func (d *DockerScriptCheck) ID() string { return d.id } +func (d *DockerScriptCheck) Interval() time.Duration { + return d.interval +} + type ExecScriptCheck struct { - id string - cmd string - args []string - taskDir string + id string + interval time.Duration + cmd string + args []string + taskDir string FSIsolation bool } @@ -152,3 +158,7 @@ func (e *ExecScriptCheck) Run() *cstructs.CheckResult { func (e *ExecScriptCheck) ID() string { return e.id } + +func (e *ExecScriptCheck) Interval() time.Duration { + return e.interval +} diff --git a/client/driver/executor/checks_test.go b/client/driver/executor/checks_test.go index 15f49265a..9526b6330 100644 --- a/client/driver/executor/checks_test.go +++ b/client/driver/executor/checks_test.go @@ -3,8 +3,29 @@ package executor import ( "strings" "testing" + + docker "github.com/fsouza/go-dockerclient" ) +// dockerIsConnected checks to see if a docker daemon is available (local or remote) +func dockerIsConnected(t *testing.T) bool { + client, err := docker.NewClientFromEnv() + if err != nil { + return false + } + + // Creating a client doesn't actually connect, so make sure we do something + // like call Version() on it. + env, err := client.Version() + if err != nil { + t.Logf("Failed to connect to docker daemon: %s", err) + return false + } + + t.Logf("Successfully connected to docker daemon running version %s", env.Get("Version")) + return true +} + func TestExecScriptCheckNoIsolation(t *testing.T) { check := &ExecScriptCheck{ id: "foo", diff --git a/client/driver/executor/executor.go b/client/driver/executor/executor.go index 0f98f18da..4089632fe 100644 --- a/client/driver/executor/executor.go +++ b/client/driver/executor/executor.go @@ -515,6 +515,7 @@ func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID str if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "docker" { return &DockerScriptCheck{ id: checkID, + interval: check.Interval, containerID: e.consulCtx.ContainerID, logger: e.logger, cmd: check.Cmd, @@ -525,6 +526,7 @@ func (e *UniversalExecutor) createCheck(check *structs.ServiceCheck, checkID str if check.Type == structs.ServiceCheckScript && e.ctx.Driver == "exec" { return &ExecScriptCheck{ id: checkID, + interval: check.Interval, cmd: check.Cmd, args: check.Args, taskDir: e.taskDir, diff --git a/jobspec/parse.go b/jobspec/parse.go index c94950428..acc9cc4af 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -750,6 +750,8 @@ func parseChecks(service *structs.Service, checkObjs *ast.ObjectList) error { "timeout", "path", "protocol", + "cmd", + "args", } if err := checkHCLKeys(co.Val, valid); err != nil { return multierror.Prefix(err, "check ->")