From da82a6e81486933c2f842aaf17209fda543dfd49 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Sun, 2 Jul 2017 21:49:56 -0700 Subject: [PATCH] initial watcher --- api/tasks.go | 1 + client/alloc_runner.go | 46 +++++--- client/alloc_runner_health_watcher.go | 145 ++++++++++++++++++++++++++ client/client.go | 1 + client/structs/funcs.go | 81 ++++++++++++++ nomad/state/state_store.go | 6 ++ nomad/structs/structs.go | 8 +- 7 files changed, 270 insertions(+), 18 deletions(-) create mode 100644 client/alloc_runner_health_watcher.go create mode 100644 client/structs/funcs.go diff --git a/api/tasks.go b/api/tasks.go index bedbaa149..1659dd43b 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -475,6 +475,7 @@ func (t *Task) SetLogConfig(l *LogConfig) *Task { type TaskState struct { State string Failed bool + Restarts uint64 StartedAt time.Time FinishedAt time.Time Events []*TaskEvent diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1c75aeb8a..fa73bcd02 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -13,6 +13,7 @@ import ( "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/config" "github.com/hashicorp/nomad/client/vaultclient" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/structs" cstructs "github.com/hashicorp/nomad/client/structs" @@ -52,6 +53,8 @@ type AllocRunner struct { alloc *structs.Allocation allocClientStatus string // Explicit status of allocation. Set when there are failures allocClientDescription string + allocHealth *bool // Whether the allocation is healthy + allocBroadcast *cstructs.AllocBroadcaster allocLock sync.Mutex dirtyCh chan struct{} @@ -143,20 +146,21 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB, consulClient ConsulServiceAPI) *AllocRunner { ar := &AllocRunner{ - config: config, - stateDB: stateDB, - updater: updater, - logger: logger, - alloc: alloc, - dirtyCh: make(chan struct{}, 1), - tasks: make(map[string]*TaskRunner), - taskStates: copyTaskStates(alloc.TaskStates), - restored: make(map[string]struct{}), - updateCh: make(chan *structs.Allocation, 64), - destroyCh: make(chan struct{}), - waitCh: make(chan struct{}), - vaultClient: vaultClient, - consulClient: consulClient, + config: config, + stateDB: stateDB, + updater: updater, + logger: logger, + alloc: alloc, + allocBroadcast: cstructs.NewAllocBroadcaster(0), + dirtyCh: make(chan struct{}, 1), + tasks: make(map[string]*TaskRunner), + taskStates: copyTaskStates(alloc.TaskStates), + restored: make(map[string]struct{}), + updateCh: make(chan *structs.Allocation, 64), + destroyCh: make(chan struct{}), + waitCh: make(chan struct{}), + vaultClient: vaultClient, + consulClient: consulClient, } return ar } @@ -475,6 +479,14 @@ func (r *AllocRunner) Alloc() *structs.Allocation { r.allocLock.Unlock() return alloc } + + // The health has been set + if r.allocHealth != nil { + if alloc.DeploymentStatus == nil { + alloc.DeploymentStatus = &structs.AllocDeploymentStatus{} + } + alloc.DeploymentStatus.Healthy = helper.BoolToPtr(*r.allocHealth) + } r.allocLock.Unlock() // Scan the task states to determine the status of the alloc @@ -536,6 +548,7 @@ func (r *AllocRunner) syncStatus() error { // Get a copy of our alloc, update status server side and sync to disk alloc := r.Alloc() r.updater(alloc) + r.allocBroadcast.Send(alloc) return r.saveAllocRunnerState() } @@ -567,6 +580,9 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv if event.FailsTask { taskState.Failed = true } + if event.Type == structs.TaskRestarting { + taskState.Restarts++ + } r.appendTaskEvent(taskState, event) } @@ -650,6 +666,7 @@ func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.T func (r *AllocRunner) Run() { defer close(r.waitCh) go r.dirtySyncState() + go r.watchHealth() // Find the task group to run in the allocation alloc := r.Alloc() @@ -913,6 +930,7 @@ func (r *AllocRunner) Destroy() { } r.destroy = true close(r.destroyCh) + r.allocBroadcast.Close() } // WaitCh returns a channel to wait for termination diff --git a/client/alloc_runner_health_watcher.go b/client/alloc_runner_health_watcher.go new file mode 100644 index 000000000..39ede4386 --- /dev/null +++ b/client/alloc_runner_health_watcher.go @@ -0,0 +1,145 @@ +package client + +import ( + "time" + + "github.com/hashicorp/nomad/helper" + "github.com/hashicorp/nomad/nomad/structs" +) + +// watchHealth is responsible for watching an allocation's task status and +// potentially consul health check status to determine if the allocation is +// healthy or unhealthy. +func (r *AllocRunner) watchHealth() { + // Get our alloc and the task group + alloc := r.Alloc() + tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) + if tg == nil { + r.logger.Printf("[ERR] client.alloc_watcher: failed to lookup allocation's task group. Exiting watcher") + return + } + + u := tg.Update + + // Checks marks whether we should be watching for Consul health checks + checks := false + r.logger.Printf("XXX %v", checks) + + switch { + case u == nil: + r.logger.Printf("[TRACE] client.alloc_watcher: no update block for alloc %q. exiting", alloc.ID) + return + case u.HealthCheck == structs.UpdateStrategyHealthCheck_Manual: + r.logger.Printf("[TRACE] client.alloc_watcher: update block has manual checks for alloc %q. exiting", alloc.ID) + return + case u.HealthCheck == structs.UpdateStrategyHealthCheck_Checks: + checks = true + } + + // Get a listener so we know when an allocation is updated. + l := r.allocBroadcast.Listen() + + // Create a deadline timer for the health + deadline := time.NewTimer(u.HealthyDeadline) + + // Create a healthy timer + latestHealthyTime := time.Unix(0, 0) + healthyTimer := time.NewTimer(0) + if !healthyTimer.Stop() { + <-healthyTimer.C + } + + // Cleanup function + defer func() { + if !deadline.Stop() { + <-deadline.C + } + if !healthyTimer.Stop() { + <-healthyTimer.C + } + l.Close() + }() + + setHealth := func(h bool) { + r.allocLock.Lock() + r.allocHealth = helper.BoolToPtr(h) + r.allocLock.Unlock() + r.syncStatus() + } + + first := true +OUTER: + for { + if !first { + select { + case <-r.destroyCh: + return + case newAlloc, ok := <-l.Ch: + if !ok { + return + } + + alloc = newAlloc + if alloc.DeploymentID == "" { + continue OUTER + } + + r.logger.Printf("[TRACE] client.alloc_watcher: new alloc version for %q", alloc.ID) + case <-deadline.C: + // We have exceeded our deadline without being healthy. + setHealth(false) + return + case <-healthyTimer.C: + r.logger.Printf("[TRACE] client.alloc_watcher: alloc %q is healthy", alloc.ID) + setHealth(true) + } + } + + first = false + + // If the alloc is being stopped by the server just exit + switch alloc.DesiredStatus { + case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict: + r.logger.Printf("[TRACE] client.alloc_watcher: desired status terminal for alloc %q", alloc.ID) + return + } + + // If the task is dead or has restarted, fail + for _, tstate := range alloc.TaskStates { + if tstate.Failed || !tstate.FinishedAt.IsZero() || tstate.Restarts != 0 { + r.logger.Printf("[TRACE] client.alloc_watcher: setting health to false for alloc %q", alloc.ID) + setHealth(false) + return + } + } + + // Determine if the allocation is healthy + for task, tstate := range alloc.TaskStates { + if tstate.State != structs.TaskStateRunning { + r.logger.Printf("[TRACE] client.alloc_watcher: continuing since task %q hasn't started for alloc %q", task, alloc.ID) + continue OUTER + } + + if tstate.StartedAt.After(latestHealthyTime) { + latestHealthyTime = tstate.StartedAt + } + } + + // If we are already healthy we don't set the timer + healthyThreshold := latestHealthyTime.Add(u.MinHealthyTime) + if time.Now().After(healthyThreshold) { + continue OUTER + } + + // Start the time til we are healthy + if !healthyTimer.Stop() { + select { + case <-healthyTimer.C: + default: + } + } + d := time.Until(healthyThreshold) + healthyTimer.Reset(d) + r.logger.Printf("[TRACE] client.alloc_watcher: setting healthy timer to %v for alloc %q", d, alloc.ID) + } +} diff --git a/client/client.go b/client/client.go index 486dea9ff..1e81fe79d 100644 --- a/client/client.go +++ b/client/client.go @@ -1295,6 +1295,7 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) { stripped.TaskStates = alloc.TaskStates stripped.ClientStatus = alloc.ClientStatus stripped.ClientDescription = alloc.ClientDescription + stripped.DeploymentStatus = alloc.DeploymentStatus select { case c.allocUpdates <- stripped: diff --git a/client/structs/funcs.go b/client/structs/funcs.go new file mode 100644 index 000000000..a82a5a519 --- /dev/null +++ b/client/structs/funcs.go @@ -0,0 +1,81 @@ +package structs + +import ( + "sync" + + "github.com/hashicorp/nomad/nomad/structs" +) + +// AllocBroadcaster implements an allocation broadcast channel. +// The zero value is a usable unbuffered channel. +type AllocBroadcaster struct { + m sync.Mutex + listeners map[int]chan<- *structs.Allocation // lazy init + nextId int + capacity int + closed bool +} + +// NewAllocBroadcaster returns a new AllocBroadcaster with the given capacity (0 means unbuffered). +func NewAllocBroadcaster(n int) *AllocBroadcaster { + return &AllocBroadcaster{capacity: n} +} + +// AllocListener implements a listening endpoint for an allocation broadcast channel. +type AllocListener struct { + // Ch receives the broadcast messages. + Ch <-chan *structs.Allocation + b *AllocBroadcaster + id int +} + +// Send broadcasts a message to the channel. +// Sending on a closed channel causes a runtime panic. +func (b *AllocBroadcaster) Send(v *structs.Allocation) { + b.m.Lock() + defer b.m.Unlock() + if b.closed { + return + } + for _, l := range b.listeners { + select { + case l <- v: + default: + } + } +} + +// Close closes the channel, disabling the sending of further messages. +func (b *AllocBroadcaster) Close() { + b.m.Lock() + defer b.m.Unlock() + b.closed = true + for _, l := range b.listeners { + close(l) + } +} + +// Listen returns a Listener for the broadcast channel. +func (b *AllocBroadcaster) Listen() *AllocListener { + b.m.Lock() + defer b.m.Unlock() + if b.listeners == nil { + b.listeners = make(map[int]chan<- *structs.Allocation) + } + for b.listeners[b.nextId] != nil { + b.nextId++ + } + ch := make(chan *structs.Allocation, b.capacity) + if b.closed { + close(ch) + } + b.listeners[b.nextId] = ch + return &AllocListener{ch, b, b.nextId} +} + +// Close closes the Listener, disabling the receival of further messages. +func (l *AllocListener) Close() { + l.b.m.Lock() + defer l.b.m.Unlock() + delete(l.b.listeners, l.id) +} diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index fb6aeb5e8..ac1a79ea2 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1434,10 +1434,16 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a copyAlloc.ClientStatus = alloc.ClientStatus copyAlloc.ClientDescription = alloc.ClientDescription copyAlloc.TaskStates = alloc.TaskStates + copyAlloc.DeploymentStatus = alloc.DeploymentStatus // Update the modify index copyAlloc.ModifyIndex = index + // TODO TEST + if err := s.updateDeploymentWithAlloc(index, copyAlloc, exist, txn); err != nil { + return fmt.Errorf("error updating deployment: %v", err) + } + if err := s.updateSummaryWithAlloc(index, copyAlloc, exist, txn); err != nil { return fmt.Errorf("error updating job summary: %v", err) } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index eb1784ad4..5a9f7bcbf 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3262,6 +3262,9 @@ type TaskState struct { // Failed marks a task as having failed Failed bool + // Restarts is the number of times the task has restarted + Restarts uint64 + // StartedAt is the time the task is started. It is updated each time the // task starts StartedAt time.Time @@ -3279,10 +3282,7 @@ func (ts *TaskState) Copy() *TaskState { return nil } copy := new(TaskState) - copy.State = ts.State - copy.Failed = ts.Failed - copy.StartedAt = ts.StartedAt - copy.FinishedAt = ts.FinishedAt + *copy = *ts if ts.Events != nil { copy.Events = make([]*TaskEvent, len(ts.Events))