From 2b5d840eaa9865056d669a349a3aa1881fc72caa Mon Sep 17 00:00:00 2001
From: Michael Schurter
Date: Thu, 30 Aug 2018 14:33:50 -0700
Subject: [PATCH] arv2: implement alloc health watching

Also remove initial alloc from broadcaster as it just caused useless
extra processing.
---
 client/allochealth/tracker.go                 | 472 ++++++++++++++++++
 client/allocrunnerv2/alloc_runner.go          |  73 ++-
 client/allocrunnerv2/alloc_runner_hooks.go    |  86 ++--
 client/allocrunnerv2/health_hook.go           | 247 +++++++++
 .../interfaces/runner_lifecycle.go            |   7 +-
 client/allocrunnerv2/state/state.go           |  26 +
 .../allocrunnerv2/taskrunner/task_runner.go   |   3 +-
 client/structs/broadcaster.go                 |  22 +-
 client/structs/broadcaster_test.go            |  33 +-
 9 files changed, 874 insertions(+), 95 deletions(-)
 create mode 100644 client/allochealth/tracker.go
 create mode 100644 client/allocrunnerv2/health_hook.go

diff --git a/client/allochealth/tracker.go b/client/allochealth/tracker.go
new file mode 100644
index 000000000..cc865a605
--- /dev/null
+++ b/client/allochealth/tracker.go
@@ -0,0 +1,472 @@
+package allochealth
+
+import (
+	"context"
+	"fmt"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/hashicorp/consul/api"
+	hclog "github.com/hashicorp/go-hclog"
+	cconsul "github.com/hashicorp/nomad/client/consul"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/command/agent/consul"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+const (
+	// allocHealthEventSource is the source used for emitting task events
+	allocHealthEventSource = "Alloc Unhealthy"
+
+	// consulCheckLookupInterval is the interval at which we check if the
+	// Consul checks are healthy or unhealthy.
+	consulCheckLookupInterval = 500 * time.Millisecond
+)
+
+// Tracker tracks the health of an allocation and makes health events watchable
+// via channels.
+type Tracker struct {
+	// ctx and cancelFn are used to shutdown the tracker
+	ctx      context.Context
+	cancelFn context.CancelFunc
+
+	// alloc is the alloc we are tracking
+	alloc *structs.Allocation
+
+	// tg is the task group we are tracking
+	tg *structs.TaskGroup
+
+	// minHealthyTime is the duration an alloc must remain healthy to be
+	// considered healthy
+	minHealthyTime time.Duration
+
+	// useChecks specifies whether to use Consul health checks or not
+	useChecks bool
+
+	// consulCheckCount is the number of checks the task group will attempt to
+	// register
+	consulCheckCount int
+
+	// allocUpdates is a listener for retrieving new alloc updates
+	allocUpdates *cstructs.AllocListener
+
+	// consulClient is used to look up the state of the task's checks
+	consulClient cconsul.ConsulServiceAPI
+
+	// healthy is used to signal whether we have determined the allocation to be
+	// healthy or unhealthy
+	healthy chan bool
+
+	// allocStopped is triggered when the allocation is stopped and tracking is
+	// not needed
+	allocStopped chan struct{}
+
+	// l is used to lock shared fields listed below
+	l sync.Mutex
+
+	// tasksHealthy marks whether all the tasks have met their health check
+	// (disregards Consul)
+	tasksHealthy bool
+
+	// allocFailed marks whether the allocation failed
+	allocFailed bool
+
+	// checksHealthy marks whether all the task's Consul checks are healthy
+	checksHealthy bool
+
+	// taskHealth contains the health state for each task
+	taskHealth map[string]*taskHealthState
+
+	logger hclog.Logger
+}
+
+// NewTracker returns a health tracker for the given allocation. An alloc
+// listener and consul API object are given so that the watcher can detect
+// health changes.
+func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.Allocation,
+	allocUpdates *cstructs.AllocListener, consulClient cconsul.ConsulServiceAPI,
+	minHealthyTime time.Duration, useChecks bool) *Tracker {
+
+	// Do not create a named sub-logger as the hook controlling
+	// this struct should pass in an appropriately named
+	// sub-logger.
+	t := &Tracker{
+		healthy:        make(chan bool, 1),
+		allocStopped:   make(chan struct{}),
+		alloc:          alloc,
+		tg:             alloc.Job.LookupTaskGroup(alloc.TaskGroup),
+		minHealthyTime: minHealthyTime,
+		useChecks:      useChecks,
+		allocUpdates:   allocUpdates,
+		consulClient:   consulClient,
+		logger:         logger,
+	}
+
+	t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
+	for _, task := range t.tg.Tasks {
+		t.taskHealth[task.Name] = &taskHealthState{task: task}
+	}
+
+	for _, task := range t.tg.Tasks {
+		for _, s := range task.Services {
+			t.consulCheckCount += len(s.Checks)
+		}
+	}
+
+	t.ctx, t.cancelFn = context.WithCancel(parentCtx)
+	return t
+}
+
+// Start starts the watcher.
+func (t *Tracker) Start() {
+	go t.watchTaskEvents()
+	if t.useChecks {
+		go t.watchConsulEvents()
+	}
+}
+
+// HealthyCh returns a channel that will emit a boolean indicating the health of
+// the allocation.
+func (t *Tracker) HealthyCh() <-chan bool {
+	return t.healthy
+}
+
+// AllocStoppedCh returns a channel that will be closed if the allocation is
+// stopped. This means that health will not be set.
+func (t *Tracker) AllocStoppedCh() <-chan struct{} {
+	return t.allocStopped
+}
+
+// TaskEvents returns a map of events by task. This should only be called after
+// health has been determined. Only tasks that have contributed to the
+// allocation being unhealthy will have an event.
+func (t *Tracker) TaskEvents() map[string]*structs.TaskEvent {
+	t.l.Lock()
+	defer t.l.Unlock()
+
+	// Nothing to do since the failure wasn't task related
+	if t.allocFailed {
+		return nil
+	}
+
+	deadline, _ := t.ctx.Deadline()
+	events := make(map[string]*structs.TaskEvent, len(t.tg.Tasks))
+
+	// Go through the task information and build the event map
+	for task, state := range t.taskHealth {
+		useChecks := t.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
+		if e, ok := state.event(deadline, t.tg.Update.MinHealthyTime, useChecks); ok {
+			events[task] = structs.NewTaskEvent(allocHealthEventSource).SetMessage(e)
+		}
+	}
+
+	return events
+}
+
+// setTaskHealth is used to set the tasks' health as healthy or unhealthy. If
+// the allocation is terminal, health is immediately broadcasted.
+func (t *Tracker) setTaskHealth(healthy, terminal bool) {
+	t.l.Lock()
+	defer t.l.Unlock()
+	t.tasksHealthy = healthy
+
+	// If we are marked healthy but we also require Consul to be healthy and it
+	// isn't yet, return, unless the task is terminal
+	requireConsul := t.useChecks && t.consulCheckCount > 0
+	if !terminal && healthy && requireConsul && !t.checksHealthy {
+		return
+	}
+
+	select {
+	case t.healthy <- healthy:
+	default:
+	}
+
+	// Shutdown the tracker
+	t.cancelFn()
+}
+
+// setCheckHealth is used to mark the checks as either healthy or unhealthy.
+func (t *Tracker) setCheckHealth(healthy bool) {
+	t.l.Lock()
+	defer t.l.Unlock()
+	t.checksHealthy = healthy
+
+	// Only signal if we are healthy and so are the tasks
+	if !healthy || !t.tasksHealthy {
+		return
+	}
+
+	select {
+	case t.healthy <- healthy:
+	default:
+	}
+
+	// Shutdown the tracker
+	t.cancelFn()
+}
+
+// markAllocStopped is used to mark the allocation as having stopped.
+func (t *Tracker) markAllocStopped() {
+	close(t.allocStopped)
+	t.cancelFn()
+}
+
+// watchTaskEvents is a long lived watcher that watches for the health of the
+// allocation's tasks.
+func (t *Tracker) watchTaskEvents() {
+	alloc := t.alloc
+	allStartedTime := time.Time{}
+	healthyTimer := time.NewTimer(0)
+	if !healthyTimer.Stop() {
+		select {
+		case <-healthyTimer.C:
+		default:
+		}
+	}
+
+	for {
+		// If the alloc is being stopped by the server just exit
+		switch alloc.DesiredStatus {
+		case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
+			t.logger.Trace("desired status is terminal for alloc", "alloc_id", alloc.ID, "desired_status", alloc.DesiredStatus)
+			t.markAllocStopped()
+			return
+		}
+
+		// Store the task states
+		t.l.Lock()
+		for task, state := range alloc.TaskStates {
+			t.taskHealth[task].state = state
+		}
+		t.l.Unlock()
+
+		// Detect if the alloc is unhealthy or if all tasks have started yet
+		latestStartTime := time.Time{}
+		for _, state := range alloc.TaskStates {
+			// One of the tasks has failed so we can exit watching
+			if state.Failed || !state.FinishedAt.IsZero() {
+				t.setTaskHealth(false, true)
+				return
+			}
+
+			if state.State != structs.TaskStateRunning {
+				latestStartTime = time.Time{}
+				break
+			} else if state.StartedAt.After(latestStartTime) {
+				latestStartTime = state.StartedAt
+			}
+		}
+
+		// If the alloc is marked as failed by the client but none of the
+		// individual tasks failed, that means something failed at the alloc
+		// level.
+		if alloc.ClientStatus == structs.AllocClientStatusFailed {
+			t.l.Lock()
+			t.allocFailed = true
+			t.l.Unlock()
+			t.setTaskHealth(false, true)
+			return
+		}
+
+		if !latestStartTime.Equal(allStartedTime) {
+			// Prevent the timer from firing at the old start time
+			if !healthyTimer.Stop() {
+				select {
+				case <-healthyTimer.C:
+				default:
+				}
+			}
+
+			// Set the timer since all tasks are started
+			if !latestStartTime.IsZero() {
+				allStartedTime = latestStartTime
+				healthyTimer.Reset(t.minHealthyTime)
+			}
+		}
+
+		select {
+		case <-t.ctx.Done():
+			return
+		case newAlloc, ok := <-t.allocUpdates.Ch:
+			if !ok {
+				return
+			}
+			alloc = newAlloc
+		case <-healthyTimer.C:
+			t.setTaskHealth(true, false)
+		}
+	}
+}
+
+// watchConsulEvents is a long lived watcher that watches for the health of the
+// allocation's Consul checks.
+func (t *Tracker) watchConsulEvents() {
+	// checkTicker is the ticker that triggers us to look at the checks in
+	// Consul
+	checkTicker := time.NewTicker(consulCheckLookupInterval)
+	defer checkTicker.Stop()
+
+	// healthyTimer fires when the checks have been healthy for the
+	// MinHealthyTime
+	healthyTimer := time.NewTimer(0)
+	if !healthyTimer.Stop() {
+		select {
+		case <-healthyTimer.C:
+		default:
+		}
+	}
+
+	// primed marks whether the healthy timer has been set
+	primed := false
+
+	// Store whether the last Consul checks call was successful or not
+	consulChecksErr := false
+
+	// allocReg are the registered objects in Consul for the allocation
+	var allocReg *consul.AllocRegistration
+
+OUTER:
+	for {
+		select {
+		case <-t.ctx.Done():
+			return
+		case <-checkTicker.C:
+			newAllocReg, err := t.consulClient.AllocRegistrations(t.alloc.ID)
+			if err != nil {
+				if !consulChecksErr {
+					consulChecksErr = true
+					t.logger.Warn("error looking up Consul registrations for allocation", "error", err, "alloc_id", t.alloc.ID)
+				}
+				continue OUTER
+			} else {
+				consulChecksErr = false
+				allocReg = newAllocReg
+			}
+		case <-healthyTimer.C:
+			t.setCheckHealth(true)
+		}
+
+		if allocReg == nil {
+			continue
+		}
+
+		// Store the task registrations
+		t.l.Lock()
+		for task, reg := range allocReg.Tasks {
+			t.taskHealth[task].taskRegistrations = reg
+		}
+		t.l.Unlock()
+
+		// Detect if all the checks are passing
+		passed := true
+
+	CHECKS:
+		for _, treg := range allocReg.Tasks {
+			for _, sreg := range treg.Services {
+				for _, check := range sreg.Checks {
+					if check.Status == api.HealthPassing {
+						continue
+					}
+
+					passed = false
+					t.setCheckHealth(false)
+					break CHECKS
+				}
+			}
+		}
+
+		if !passed {
+			// Reset the timer since we have transitioned back to unhealthy
+			if primed {
+				if !healthyTimer.Stop() {
+					select {
+					case <-healthyTimer.C:
+					default:
+					}
+				}
+				primed = false
+			}
+		} else if !primed {
+			// Reset the timer to fire after MinHealthyTime
+			if !healthyTimer.Stop() {
+				select {
+				case <-healthyTimer.C:
+				default:
+				}
+			}
+
+			primed = true
+			healthyTimer.Reset(t.minHealthyTime)
+		}
+	}
+}
+
+// taskHealthState captures all known health information about a task. It is
+// largely used to determine if the task has contributed to the allocation being
+// unhealthy.
+type taskHealthState struct {
+	task              *structs.Task
+	state             *structs.TaskState
+	taskRegistrations *consul.TaskRegistration
+}
+
+// event takes the deadline time for the allocation to be healthy and the update
+// strategy of the group. It returns true if the task has contributed to the
+// allocation being unhealthy and if so, an event description of why.
+func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration, useChecks bool) (string, bool) {
+	requireChecks := false
+	desiredChecks := 0
+	for _, s := range t.task.Services {
+		if nc := len(s.Checks); nc > 0 {
+			requireChecks = true
+			desiredChecks += nc
+		}
+	}
+	requireChecks = requireChecks && useChecks
+
+	if t.state != nil {
+		if t.state.Failed {
+			return "Unhealthy because of failed task", true
+		}
+		if t.state.State != structs.TaskStateRunning {
+			return "Task not running by deadline", true
+		}
+
+		// We are running so check if we have been running long enough
+		if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
+			return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
+		}
+	}
+
+	if t.taskRegistrations != nil {
+		var notPassing []string
+		passing := 0
+
+	OUTER:
+		for _, sreg := range t.taskRegistrations.Services {
+			for _, check := range sreg.Checks {
+				if check.Status != api.HealthPassing {
+					notPassing = append(notPassing, sreg.Service.Service)
+					continue OUTER
+				} else {
+					passing++
+				}
+			}
+		}
+
+		if len(notPassing) != 0 {
+			return fmt.Sprintf("Services not healthy by deadline: %s", strings.Join(notPassing, ", ")), true
+		}
+
+		if passing != desiredChecks {
+			return fmt.Sprintf("Only %d out of %d checks registered and passing", passing, desiredChecks), true
+		}
+
+	} else if requireChecks {
+		return "Service checks not registered", true
+	}
+
+	return "", false
+}
diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go
index ec8fd3c46..9bf7167c4 100644
--- a/client/allocrunnerv2/alloc_runner.go
+++ b/client/allocrunnerv2/alloc_runner.go
@@ -24,14 +24,6 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-const (
-	// updateChCap is the capacity of AllocRunner's updateCh. It must be 1
-	// as we only want to process the latest update, so if there's already
-	// a pending update it will be removed from the chan before adding the
-	// newer update.
-	updateChCap = 1
-)
-
 // allocRunner is used to run all the tasks in a given allocation
 type allocRunner struct {
 	// id is the ID of the allocation. Can be accessed without a lock
@@ -82,11 +74,6 @@ type allocRunner struct {
 	tasks     map[string]*taskrunner.TaskRunner
 	tasksLock sync.RWMutex
 
-	// updateCh receives allocation updates via the Update method. Must
-	// have buffer size 1 in order to support dropping pending updates when
-	// a newer allocation is received.
-	updateCh chan *structs.Allocation
-
 	// allocBroadcaster sends client allocation updates to all listeners
 	allocBroadcaster *cstructs.AllocBroadcaster
 
@@ -111,16 +98,15 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
 		vaultClient:      config.Vault,
 		tasks:            make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
 		waitCh:           make(chan struct{}),
-		updateCh:         make(chan *structs.Allocation, updateChCap),
 		state:            &state.State{},
 		stateDB:          config.StateDB,
 		stateUpdater:     config.StateUpdater,
-		allocBroadcaster: cstructs.NewAllocBroadcaster(alloc),
+		allocBroadcaster: cstructs.NewAllocBroadcaster(),
 		prevAllocWatcher: config.PrevAllocWatcher,
 	}
 
 	// Create the logger based on the allocation ID
-	ar.logger = config.Logger.With("alloc_id", alloc.ID)
+	ar.logger = config.Logger.Named("alloc_runner").With("alloc_id", alloc.ID)
 
 	// Create alloc dir
 	ar.allocDir = allocdir.NewAllocDir(ar.logger, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))
@@ -251,6 +237,56 @@ func (ar *allocRunner) Restore() error {
 	return nil
 }
 
+// SetHealth allows the health watcher hook to set the alloc's
+// deployment/migration health and emit task events.
+//
+// Only for use by health hook.
+func (ar *allocRunner) SetHealth(healthy, isDeploy bool, trackerTaskEvents map[string]*structs.TaskEvent) {
+	// Updating alloc deployment state is tricky because it may be nil, but
+	// if it's not then we need to maintain the values of Canary and
+	// ModifyIndex as they're only mutated by the server.
+	ar.stateLock.Lock()
+	ar.state.SetDeploymentStatus(time.Now(), healthy)
+	ar.stateLock.Unlock()
+
+	// If deployment is unhealthy emit task events explaining why
+	ar.tasksLock.RLock()
+	if !healthy && isDeploy {
+		for task, event := range trackerTaskEvents {
+			if tr, ok := ar.tasks[task]; ok {
+				tr.EmitEvent(event)
+			}
+		}
+	}
+
+	// Gather the state of the other tasks
+	states := make(map[string]*structs.TaskState, len(ar.tasks))
+	for name, tr := range ar.tasks {
+		states[name] = tr.TaskState()
+	}
+	ar.tasksLock.RUnlock()
+
+	// Build the client allocation
+	calloc := ar.clientAlloc(states)
+
+	// Update the server
+	ar.stateUpdater.AllocStateUpdated(calloc)
+
+	// Broadcast client alloc to listeners
+	ar.allocBroadcaster.Send(calloc)
+}
+
+// ClearHealth allows the health watcher hook to clear the alloc's deployment
+// health if the deployment id changes. It does not update the server as the
+// status is only cleared when already receiving an update from the server.
+//
+// Only for use by health hook.
+func (ar *allocRunner) ClearHealth() {
+	ar.stateLock.Lock()
+	ar.state.ClearDeploymentStatus()
+	ar.stateLock.Unlock()
+}
+
 // TaskStateUpdated is called by TaskRunner when a task's state has been
 // updated. This hook is used to compute changes to the alloc's ClientStatus
 // and to update the server with the new state.
@@ -408,7 +444,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) {
 	// Update ar.alloc
 	ar.setAlloc(update)
 
-	//TODO Run AR Update hooks
+	// Run hooks
+	if err := ar.update(update); err != nil {
+		ar.logger.Error("error running update hooks", "error", err)
+	}
 
 	// Update task runners
 	for _, tr := range ar.tasks {
diff --git a/client/allocrunnerv2/alloc_runner_hooks.go b/client/allocrunnerv2/alloc_runner_hooks.go
index 198f3229f..a236f7fa9 100644
--- a/client/allocrunnerv2/alloc_runner_hooks.go
+++ b/client/allocrunnerv2/alloc_runner_hooks.go
@@ -5,9 +5,9 @@ import (
 	"fmt"
 	"time"
 
-	log "github.com/hashicorp/go-hclog"
 	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
+	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 // initRunnerHooks intializes the runners hooks.
@@ -19,6 +19,7 @@ func (ar *allocRunner) initRunnerHooks() {
 	ar.runnerHooks = []interfaces.RunnerHook{
 		newAllocDirHook(hookLogger, ar.allocDir),
 		newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
+		newAllocHealthWatcherHook(hookLogger, ar, ar.consulClient),
 	}
 }
 
@@ -63,6 +64,48 @@ func (ar *allocRunner) prerun() error {
 	return nil
 }
 
+// update runs the alloc runner update hooks. Update hooks are run
+// asynchronously with all other alloc runner operations.
+func (ar *allocRunner) update(update *structs.Allocation) error {
+	if ar.logger.IsTrace() {
+		start := time.Now()
+		ar.logger.Trace("running update hooks", "start", start)
+		defer func() {
+			end := time.Now()
+			ar.logger.Trace("finished update hooks", "end", end, "duration", end.Sub(start))
+		}()
+	}
+
+	req := &interfaces.RunnerUpdateRequest{
+		Alloc: update,
+	}
+
+	for _, hook := range ar.runnerHooks {
+		h, ok := hook.(interfaces.RunnerUpdateHook)
+		if !ok {
+			continue
+		}
+
+		name := h.Name()
+		var start time.Time
+		if ar.logger.IsTrace() {
+			start = time.Now()
+			ar.logger.Trace("running update hook", "name", name, "start", start)
+		}
+
+		if err := h.Update(req); err != nil {
+			return fmt.Errorf("hook %q failed: %v", name, err)
+		}
+
+		if ar.logger.IsTrace() {
+			end := time.Now()
+			ar.logger.Trace("finished update hook", "name", name, "end", end, "duration", end.Sub(start))
+		}
+	}
+
+	return nil
+}
+
 // postrun is used to run the runners postrun hooks.
 func (ar *allocRunner) postrun() error {
 	if ar.logger.IsTrace() {
 		start := time.Now()
@@ -138,44 +181,3 @@ func (ar *allocRunner) destroy() error {
 
 	return nil
 }
-
-// TODO
-type allocHealthWatcherHook struct {
-	runner   *allocRunner
-	logger   log.Logger
-	ctx      context.Context
-	cancelFn context.CancelFunc
-}
-
-func newAllocHealthWatcherHook(runner *allocRunner, logger log.Logger) *allocHealthWatcherHook {
-	ctx, cancelFn := context.WithCancel(context.Background())
-	ad := &allocHealthWatcherHook{
-		runner:   runner,
-		ctx:      ctx,
-		cancelFn: cancelFn,
-	}
-
-	ad.logger = logger.Named(ad.Name())
-	return ad
-}
-
-func (h *allocHealthWatcherHook) Name() string {
-	return "alloc_health_watcher"
-}
-
-func (h *allocHealthWatcherHook) Prerun() error {
-	return nil
-}
-
-func (h *allocHealthWatcherHook) Update() error {
-	// Cancel the old watcher and create a new one
-	h.cancelFn()
-
-	// TODO create the new one
-	return nil
-}
-
-func (h *allocHealthWatcherHook) Destroy() error {
-	h.cancelFn()
-	return nil
-}
diff --git a/client/allocrunnerv2/health_hook.go b/client/allocrunnerv2/health_hook.go
new file mode 100644
index 000000000..fb2857d62
--- /dev/null
+++ b/client/allocrunnerv2/health_hook.go
@@ -0,0 +1,247 @@
+package allocrunnerv2
+
+import (
+	"context"
+	"fmt"
+	"sync"
+	"time"
+
+	log "github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/nomad/client/allochealth"
+	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
+	"github.com/hashicorp/nomad/client/consul"
+	cstructs "github.com/hashicorp/nomad/client/structs"
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// healthMutator is able to set/clear alloc health as well as listen for alloc
+// changes.
+type healthMutator interface {
+	// Alloc returns the original alloc
+	Alloc() *structs.Allocation
+
+	// Set health via the mutator
+	SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent)
+
+	// Clear health when the deployment ID changes
+	ClearHealth()
+
+	// Listener listens for alloc updates
+	Listener() *cstructs.AllocListener
+}
+
+// allocHealthWatcherHook is responsible for watching an allocation's task
+// status and (optionally) Consul health check status to determine if the
+// allocation is healthy or unhealthy. Used by deployments and migrations.
+type allocHealthWatcherHook struct {
+	runner healthMutator
+
+	// consul client used to monitor health checks
+	consul consul.ConsulServiceAPI
+
+	// listener is given to trackers to listen for alloc updates and closed
+	// when the alloc is destroyed.
+	listener *cstructs.AllocListener
+
+	// hookLock is held by hook methods to prevent concurrent access by
+	// Update and synchronous hooks.
+	hookLock sync.Mutex
+
+	// watchLock is held by the health watching/setting goroutine to ensure
+	// only one health watching goroutine is running at a time.
+	watchLock sync.Mutex
+
+	// ranOnce is set once Prerun or Update have run at least once. This
+	// prevents Prerun from running if an Update has already been
+	// processed. Must hold hookLock to access.
+	ranOnce bool
+
+	// cancelFn stops the health watching/setting goroutine. Grab the
+	// watchLock to block until it exits.
+	cancelFn context.CancelFunc
+
+	// alloc set by new func or Update. Must hold hookLock to access.
+	alloc *structs.Allocation
+
+	// isDeploy is true if monitoring a deployment. Set in init(). Must
+	// hold hookLock to access.
+	isDeploy bool
+
+	logger log.Logger
+}
+
+func newAllocHealthWatcherHook(logger log.Logger, runner healthMutator, consul consul.ConsulServiceAPI) interfaces.RunnerHook {
+	alloc := runner.Alloc()
+
+	// Neither deployments nor migrations care about the health of
+	// non-service jobs so never watch their health
+	if alloc.Job.Type != structs.JobTypeService {
+		return noopAllocHealthWatcherHook{}
+	}
+
+	h := &allocHealthWatcherHook{
+		runner:   runner,
+		alloc:    alloc,
+		cancelFn: func() {}, // initialize to prevent nil func panics
+		consul:   consul,
+		listener: runner.Listener(),
+	}
+
+	h.logger = logger.Named(h.Name())
+	return h
+}
+
+func (h *allocHealthWatcherHook) Name() string {
+	return "alloc_health_watcher"
+}
+
+// init starts the allochealth.Tracker and watchHealth goroutine on either
+// Prerun or Update. Caller must set/update alloc and logger fields.
+//
+// Not threadsafe so the caller should lock since Updates occur concurrently.
+func (h *allocHealthWatcherHook) init() error {
+	// No need to watch health as it's already set
+	if h.alloc.DeploymentStatus.HasHealth() {
+		return nil
+	}
+
+	tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
+	if tg == nil {
+		return fmt.Errorf("task group %q does not exist in job %q", h.alloc.TaskGroup, h.alloc.Job.ID)
+	}
+
+	h.isDeploy = h.alloc.DeploymentID != ""
+
+	// No need to watch allocs for deployments that rely on operators
+	// manually setting health
+	if h.isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
+		return nil
+	}
+
+	// Define the deadline, health method, min healthy time from the
+	// deployment if this is a deployment; otherwise from the migration
+	// strategy.
+	deadline, useChecks, minHealthyTime := getHealthParams(time.Now(), tg, h.isDeploy)
+
+	// Create a context that is canceled when the tracker should shutdown
+	// or the deadline is reached.
+	ctx := context.Background()
+	ctx, h.cancelFn = context.WithDeadline(ctx, deadline)
+
+	// Create a new tracker, start it, and watch for health results.
+	tracker := allochealth.NewTracker(ctx, h.logger, h.alloc,
+		h.listener, h.consul, minHealthyTime, useChecks)
+	tracker.Start()
+	go h.watchHealth(ctx, tracker)
+	return nil
+}
+
+func (h *allocHealthWatcherHook) Prerun(context.Context) error {
+	h.hookLock.Lock()
+	defer h.hookLock.Unlock()
+
+	if h.ranOnce {
+		// An Update beat Prerun to running the watcher; noop
+		return nil
+	}
+
+	h.ranOnce = true
+	return h.init()
+}
+
+func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) error {
+	h.hookLock.Lock()
+	defer h.hookLock.Unlock()
+
+	// Prevent Prerun from running after an Update
+	h.ranOnce = true
+
+	// Cancel the old watcher and create a new one
+	h.cancelFn()
+
+	// Acquire the watchLock to ensure the old watcher has exited before
+	// continuing. Kind of an ugly/easy-to-reuse done chan.
+	h.watchLock.Lock()
+	h.watchLock.Unlock()
+
+	// Deployment has changed, reset status
+	if req.Alloc.DeploymentID != h.alloc.DeploymentID {
+		h.runner.ClearHealth()
+	}
+
+	// Update alloc
+	h.alloc = req.Alloc
+
+	return h.init()
+}
+
+func (h *allocHealthWatcherHook) Destroy() error {
+	h.hookLock.Lock()
+	defer h.hookLock.Unlock()
+
+	h.cancelFn()
+	h.listener.Close()
+
+	// Acquire the watchLock to ensure any existing watcher has exited
+	// before exiting. Kind of an ugly/easy-to-reuse done chan.
+	h.watchLock.Lock()
+	h.watchLock.Unlock()
+
+	return nil
+}
+
+// watchHealth watches alloc health until it is set, the alloc is stopped, or
+// the context is canceled. watchHealth will be canceled and restarted on
+// Updates so calls are serialized with a lock.
+func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker) {
+	h.watchLock.Lock()
+	defer h.watchLock.Unlock()
+
+	select {
+	case <-ctx.Done():
+		return
+
+	case <-tracker.AllocStoppedCh():
+		return
+
+	case healthy := <-tracker.HealthyCh():
+		// If this is an unhealthy deployment emit events for tasks
+		var taskEvents map[string]*structs.TaskEvent
+		if !healthy && h.isDeploy {
+			taskEvents = tracker.TaskEvents()
+		}
+
+		h.runner.SetHealth(healthy, h.isDeploy, taskEvents)
+	}
+}
+
+// getHealthParams returns the health watcher parameters which vary based on
+// whether this allocation is in a deployment or migration.
+func getHealthParams(now time.Time, tg *structs.TaskGroup, isDeploy bool) (deadline time.Time, useChecks bool, minHealthyTime time.Duration) {
+	if isDeploy {
+		deadline = now.Add(tg.Update.HealthyDeadline)
+		minHealthyTime = tg.Update.MinHealthyTime
+		useChecks = tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
+	} else {
+		strategy := tg.Migrate
+		if strategy == nil {
+			// For backwards compat with pre-0.8 allocations that
+			// don't have a migrate strategy set.
+			strategy = structs.DefaultMigrateStrategy()
+		}
+
+		deadline = now.Add(strategy.HealthyDeadline)
+		minHealthyTime = strategy.MinHealthyTime
+		useChecks = strategy.HealthCheck == structs.MigrateStrategyHealthChecks
+	}
+	return
+}
+
+// noopAllocHealthWatcherHook is an empty hook implementation returned by
+// newAllocHealthWatcherHook when an allocation will never need its health
+// monitored.
+type noopAllocHealthWatcherHook struct{}
+
+func (noopAllocHealthWatcherHook) Name() string {
+	return "alloc_health_watcher"
+}
diff --git a/client/allocrunnerv2/interfaces/runner_lifecycle.go b/client/allocrunnerv2/interfaces/runner_lifecycle.go
index 63164dce9..f3150e60c 100644
--- a/client/allocrunnerv2/interfaces/runner_lifecycle.go
+++ b/client/allocrunnerv2/interfaces/runner_lifecycle.go
@@ -4,6 +4,7 @@ import (
 	"context"
 
 	"github.com/hashicorp/nomad/client/allocrunnerv2/state"
+	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 // RunnnerHook is a lifecycle hook into the life cycle of an allocation runner.
@@ -28,7 +29,11 @@ type RunnerDestroyHook interface {
 type RunnerUpdateHook interface {
 	RunnerHook
 
-	Update() error
+	Update(*RunnerUpdateRequest) error
+}
+
+type RunnerUpdateRequest struct {
+	Alloc *structs.Allocation
 }
 
 // XXX Not sure yet
diff --git a/client/allocrunnerv2/state/state.go b/client/allocrunnerv2/state/state.go
index 73310b7c1..85ec7c3da 100644
--- a/client/allocrunnerv2/state/state.go
+++ b/client/allocrunnerv2/state/state.go
@@ -1,6 +1,8 @@
 package state
 
 import (
+	"time"
+
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -17,3 +19,27 @@ type State struct {
 	// DeploymentStatus captures the status of the deployment
 	DeploymentStatus *structs.AllocDeploymentStatus
 }
+
+// SetDeploymentStatus is a helper for updating the client-controlled
+// DeploymentStatus fields: Healthy and Timestamp. The Canary and ModifyIndex
+// fields should only be updated by the server.
+func (s *State) SetDeploymentStatus(timestamp time.Time, healthy bool) {
+	if s.DeploymentStatus == nil {
+		s.DeploymentStatus = &structs.AllocDeploymentStatus{}
+	}
+
+	s.DeploymentStatus.Healthy = &healthy
+	s.DeploymentStatus.Timestamp = timestamp
+}
+
+// ClearDeploymentStatus is a helper to clear the client-controlled
+// DeploymentStatus fields: Healthy and Timestamp. The Canary and ModifyIndex
+// fields should only be updated by the server.
+func (s *State) ClearDeploymentStatus() {
+	if s.DeploymentStatus == nil {
+		return
+	}
+
+	s.DeploymentStatus.Healthy = nil
+	s.DeploymentStatus.Timestamp = time.Time{}
+}
diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go
index 634a4f804..5bdc21c79 100644
--- a/client/allocrunnerv2/taskrunner/task_runner.go
+++ b/client/allocrunnerv2/taskrunner/task_runner.go
@@ -483,7 +483,8 @@ func (tr *TaskRunner) Restore() error {
 	return nil
 }
 
-// SetState sets the task runners allocation state.
+// SetState sets the task runners allocation state and triggers a server
+// update.
 func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
 	// Update the local state
 	stateCopy := tr.setStateLocal(state, event)
diff --git a/client/structs/broadcaster.go b/client/structs/broadcaster.go
index 88b51ebf8..b3aa2fb3d 100644
--- a/client/structs/broadcaster.go
+++ b/client/structs/broadcaster.go
@@ -23,18 +23,14 @@ var ErrAllocBroadcasterClosed = errors.New("alloc broadcaster closed")
 // receive the latest allocation update -- never a stale version.
 type AllocBroadcaster struct {
 	m         sync.Mutex
-	alloc     *structs.Allocation
 	listeners map[int]chan *structs.Allocation // lazy init
 	nextId    int
 	closed    bool
 }
 
-// NewAllocBroadcaster returns a new AllocBroadcaster with the given initial
-// allocation.
-func NewAllocBroadcaster(initial *structs.Allocation) *AllocBroadcaster {
-	return &AllocBroadcaster{
-		alloc: initial,
-	}
+// NewAllocBroadcaster returns a new AllocBroadcaster.
+func NewAllocBroadcaster() *AllocBroadcaster {
+	return &AllocBroadcaster{}
 }
 
 // Send broadcasts an allocation update. Any pending updates are replaced with
@@ -47,9 +43,6 @@ func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
 		return ErrAllocBroadcasterClosed
 	}
 
-	// Update alloc on broadcaster to send to newly created listeners
-	b.alloc = v
-
 	// Send alloc to already created listeners
 	for _, l := range b.listeners {
 		select {
@@ -64,7 +57,8 @@ func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
 }
 
 // Close closes the channel, disabling the sending of further allocation
-// updates. Safe to call concurrently and more than once.
+// updates. Pending updates are still received by listeners. Safe to call
+// concurrently and more than once.
 func (b *AllocBroadcaster) Close() {
 	b.m.Lock()
 	defer b.m.Unlock()
@@ -79,7 +73,6 @@ func (b *AllocBroadcaster) Close() {
 
 	// Clear all references and mark broadcaster as closed
 	b.listeners = nil
-	b.alloc = nil
 	b.closed = true
 }
 
@@ -118,12 +111,9 @@ func (b *AllocBroadcaster) Listen() *AllocListener {
 
 	ch := make(chan *structs.Allocation, listenerCap)
 
+	// Broadcaster is already closed, close this listener
 	if b.closed {
-		// Broadcaster is already closed, close this listener
 		close(ch)
-	} else {
-		// Send the current allocation to the listener
-		ch <- b.alloc
 	}
 
 	b.listeners[b.nextId] = ch
diff --git a/client/structs/broadcaster_test.go b/client/structs/broadcaster_test.go
index 19c7ace2d..ac91bf226 100644
--- a/client/structs/broadcaster_test.go
+++ b/client/structs/broadcaster_test.go
@@ -9,25 +9,27 @@ import (
 	"github.com/stretchr/testify/require"
 )
 
-// TestAllocBroadcaster_SendRecv asserts the initial and latest sends to a
-// broadcaster are received by listeners.
+// TestAllocBroadcaster_SendRecv asserts the latest sends to a broadcaster are
+// received by listeners.
 func TestAllocBroadcaster_SendRecv(t *testing.T) {
 	t.Parallel()
 
-	alloc := mock.Alloc()
-	alloc.AllocModifyIndex = 10
-
-	b := NewAllocBroadcaster(alloc.Copy())
+	b := NewAllocBroadcaster()
 	defer b.Close()
 
-	// Create a listener and get the initial alloc
+	// Create a listener and assert it blocks until an update
 	l := b.Listen()
 	defer l.Close()
-	initial := <-l.Ch
-	require.Equal(t, alloc.AllocModifyIndex, initial.AllocModifyIndex)
+	select {
+	case <-l.Ch:
+		t.Fatalf("unexpected initial alloc")
+	case <-time.After(10 * time.Millisecond):
+		// Ok! Ch is empty until a Send
+	}
 
-	// Increment the index and send a new copy
-	alloc.AllocModifyIndex = 20
+	// Send an update
+	alloc := mock.Alloc()
+	alloc.AllocModifyIndex = 10
 	require.NoError(t, b.Send(alloc.Copy()))
 	recvd := <-l.Ch
 	require.Equal(t, alloc.AllocModifyIndex, recvd.AllocModifyIndex)
@@ -47,7 +49,7 @@ func TestAllocBroadcaster_RecvBlocks(t *testing.T) {
 	t.Parallel()
 
 	alloc := mock.Alloc()
-	b := NewAllocBroadcaster(alloc.Copy())
+	b := NewAllocBroadcaster()
 	defer b.Close()
 
 	l1 := b.Listen()
@@ -56,11 +58,6 @@ func TestAllocBroadcaster_RecvBlocks(t *testing.T) {
 	l2 := b.Listen()
 	defer l2.Close()
 
-	// Every listener should get the initial alloc even when there hasn't
-	// been a Send()
-	require.NotNil(t, <-l1.Ch)
-	require.NotNil(t, <-l2.Ch)
-
 	done := make(chan int, 2)
 
 	// Subsequent listens should block until a subsequent send
@@ -92,7 +89,7 @@ func TestAllocBroadcaster_Concurrency(t *testing.T) {
 	t.Parallel()
 
 	alloc := mock.Alloc()
-	b := NewAllocBroadcaster(alloc.Copy())
+	b := NewAllocBroadcaster()
 	defer b.Close()
 
 	errs := make(chan error, 10)
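
Reviewer note (not part of the patch): with the initial-alloc send removed from AllocBroadcaster, a listener's channel stays empty until the first Send. A minimal consumer sketch under that assumption; waitForUpdate and its fallback argument are illustrative names, the cstructs/structs imports refer to the packages touched by this diff.

// Sketch only. Assumes imports of "time",
// cstructs "github.com/hashicorp/nomad/client/structs", and
// "github.com/hashicorp/nomad/nomad/structs".
func waitForUpdate(b *cstructs.AllocBroadcaster, fallback *structs.Allocation) *structs.Allocation {
	l := b.Listen()
	defer l.Close()

	// Listeners no longer receive an initial allocation; l.Ch only carries
	// allocations passed to Send after Listen was called, so callers that
	// need the current alloc must obtain it elsewhere (the health hook, for
	// example, reads it from runner.Alloc()).
	select {
	case updated, ok := <-l.Ch:
		if ok {
			return updated
		}
		return fallback // broadcaster closed
	case <-time.After(time.Second):
		return fallback // no update yet; keep using the alloc we already have
	}
}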
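
A second note on the timer pattern repeated in tracker.go: time.NewTimer(0) followed by Stop-and-drain yields a timer that starts disarmed and can later be Reset safely. A standalone sketch of that idiom; the function name is illustrative, not part of the patch.

// newInactiveTimer returns a stopped, drained timer that will not fire
// until Reset is called -- the same idiom tracker.go uses for healthyTimer.
func newInactiveTimer() *time.Timer {
	t := time.NewTimer(0)
	if !t.Stop() {
		// Stop reports false if the timer already fired; drain the channel
		// so a stale tick cannot be received after a later Reset.
		select {
		case <-t.C:
		default:
		}
	}
	return t
}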