arv2: implement alloc health watching
Also remove initial alloc from broadcaster as it just caused useless extra processing.
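For orientation, the pieces below fit together like this: the alloc runner installs a health watcher hook, the hook builds an allochealth.Tracker with a deadline context, and a goroutine waits for one of three outcomes. A condensed sketch of the consumer side (exampleWatch and the concrete durations are illustrative; the real consumer is watchHealth in health_hook.go, and this only compiles inside the Nomad tree with tracker.go's imports):

func exampleWatch(logger hclog.Logger, alloc *structs.Allocation,
    listener *cstructs.AllocListener, cc cconsul.ConsulServiceAPI) {

    // Health must be decided before a deadline (healthy_deadline).
    ctx, cancel := context.WithDeadline(context.Background(),
        time.Now().Add(5*time.Minute)) // illustrative deadline
    defer cancel()

    tracker := allochealth.NewTracker(ctx, logger, alloc, listener, cc,
        10*time.Second /* min_healthy_time */, true /* use Consul checks */)
    tracker.Start()

    select {
    case healthy := <-tracker.HealthyCh():
        logger.Info("alloc health determined", "healthy", healthy)
    case <-tracker.AllocStoppedCh():
        // Alloc was stopped; health will never be set.
    case <-ctx.Done():
        // Deadline reached or caller shut down.
    }
}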
client/allochealth/tracker.go (new file, 472 lines)
@@ -0,0 +1,472 @@
package allochealth

import (
    "context"
    "fmt"
    "strings"
    "sync"
    "time"

    "github.com/hashicorp/consul/api"
    hclog "github.com/hashicorp/go-hclog"
    cconsul "github.com/hashicorp/nomad/client/consul"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/command/agent/consul"
    "github.com/hashicorp/nomad/nomad/structs"
)

const (
    // allocHealthEventSource is the source used for emitting task events
    allocHealthEventSource = "Alloc Unhealthy"

    // consulCheckLookupInterval is the interval at which we check if the
    // Consul checks are healthy or unhealthy.
    consulCheckLookupInterval = 500 * time.Millisecond
)

// Tracker tracks the health of an allocation and makes health events watchable
// via channels.
type Tracker struct {
    // ctx and cancelFn are used to shut down the tracker
    ctx      context.Context
    cancelFn context.CancelFunc

    // alloc is the alloc we are tracking
    alloc *structs.Allocation

    // tg is the task group we are tracking
    tg *structs.TaskGroup

    // minHealthyTime is the duration an alloc must remain healthy to be
    // considered healthy
    minHealthyTime time.Duration

    // useChecks specifies whether to use Consul health checks or not
    useChecks bool

    // consulCheckCount is the number of checks the task group will attempt to
    // register
    consulCheckCount int

    // allocUpdates is a listener for retrieving new alloc updates
    allocUpdates *cstructs.AllocListener

    // consulClient is used to look up the state of the task's checks
    consulClient cconsul.ConsulServiceAPI

    // healthy is used to signal whether we have determined the allocation to be
    // healthy or unhealthy
    healthy chan bool

    // allocStopped is triggered when the allocation is stopped and tracking is
    // not needed
    allocStopped chan struct{}

    // l is used to lock shared fields listed below
    l sync.Mutex

    // tasksHealthy marks whether all the tasks have met their health check
    // (disregards Consul)
    tasksHealthy bool

    // allocFailed marks whether the allocation failed
    allocFailed bool

    // checksHealthy marks whether all the task's Consul checks are healthy
    checksHealthy bool

    // taskHealth contains the health state for each task
    taskHealth map[string]*taskHealthState

    logger hclog.Logger
}

// NewTracker returns a health tracker for the given allocation. An alloc
// listener and Consul API object are given so that the watcher can detect
// health changes.
func NewTracker(parentCtx context.Context, logger hclog.Logger, alloc *structs.Allocation,
    allocUpdates *cstructs.AllocListener, consulClient cconsul.ConsulServiceAPI,
    minHealthyTime time.Duration, useChecks bool) *Tracker {

    // Do not create a named sub-logger as the hook controlling
    // this struct should pass in an appropriately named
    // sub-logger.
    t := &Tracker{
        healthy:        make(chan bool, 1),
        allocStopped:   make(chan struct{}),
        alloc:          alloc,
        tg:             alloc.Job.LookupTaskGroup(alloc.TaskGroup),
        minHealthyTime: minHealthyTime,
        useChecks:      useChecks,
        allocUpdates:   allocUpdates,
        consulClient:   consulClient,
        logger:         logger,
    }

    t.taskHealth = make(map[string]*taskHealthState, len(t.tg.Tasks))
    for _, task := range t.tg.Tasks {
        t.taskHealth[task.Name] = &taskHealthState{task: task}
    }

    for _, task := range t.tg.Tasks {
        for _, s := range task.Services {
            t.consulCheckCount += len(s.Checks)
        }
    }

    t.ctx, t.cancelFn = context.WithCancel(parentCtx)
    return t
}

// Start starts the watcher.
func (t *Tracker) Start() {
    go t.watchTaskEvents()
    if t.useChecks {
        go t.watchConsulEvents()
    }
}

// HealthyCh returns a channel that will emit a boolean indicating the health of
// the allocation.
func (t *Tracker) HealthyCh() <-chan bool {
    return t.healthy
}

// AllocStoppedCh returns a channel that is closed if the allocation is
// stopped. This means that health will not be set.
func (t *Tracker) AllocStoppedCh() <-chan struct{} {
    return t.allocStopped
}

// TaskEvents returns a map of events by task. This should only be called after
// health has been determined. Only tasks that have contributed to the
// allocation being unhealthy will have an event.
func (t *Tracker) TaskEvents() map[string]*structs.TaskEvent {
    t.l.Lock()
    defer t.l.Unlock()

    // Nothing to do since the failure wasn't task related
    if t.allocFailed {
        return nil
    }

    deadline, _ := t.ctx.Deadline()
    events := make(map[string]*structs.TaskEvent, len(t.tg.Tasks))

    // Go through our task information and build the event map
    for task, state := range t.taskHealth {
        useChecks := t.tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
        if e, ok := state.event(deadline, t.tg.Update.MinHealthyTime, useChecks); ok {
            events[task] = structs.NewTaskEvent(allocHealthEventSource).SetMessage(e)
        }
    }

    return events
}

// setTaskHealth is used to set the tasks' health as healthy or unhealthy. If the
// allocation is terminal, health is immediately broadcast.
func (t *Tracker) setTaskHealth(healthy, terminal bool) {
    t.l.Lock()
    defer t.l.Unlock()
    t.tasksHealthy = healthy

    // If we are marked healthy but we also require Consul to be healthy and it
    // isn't yet, return, unless the task is terminal
    requireConsul := t.useChecks && t.consulCheckCount > 0
    if !terminal && healthy && requireConsul && !t.checksHealthy {
        return
    }

    select {
    case t.healthy <- healthy:
    default:
    }

    // Shutdown the tracker
    t.cancelFn()
}

// setCheckHealth is used to mark the checks as either healthy or unhealthy.
func (t *Tracker) setCheckHealth(healthy bool) {
    t.l.Lock()
    defer t.l.Unlock()
    t.checksHealthy = healthy

    // Only signal if we are healthy and so are the tasks
    if !healthy || !t.tasksHealthy {
        return
    }

    select {
    case t.healthy <- healthy:
    default:
    }

    // Shutdown the tracker
    t.cancelFn()
}
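Both setters publish their verdict through the same one-slot healthy channel with a non-blocking send, so the tracker never blocks on a slow consumer and at most one result is ever queued. A self-contained sketch of that idiom (toy names, not Nomad code):

package main

import "fmt"

func main() {
    result := make(chan bool, 1) // capacity 1, like Tracker.healthy

    trySend := func(v bool) {
        select {
        case result <- v: // buffer empty: verdict queued
        default: // a verdict is already pending: drop this one
        }
    }

    trySend(true)
    trySend(false) // dropped; the buffer is full

    fmt.Println(<-result) // prints: true
}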
// markAllocStopped is used to mark the allocation as having stopped.
func (t *Tracker) markAllocStopped() {
    close(t.allocStopped)
    t.cancelFn()
}

// watchTaskEvents is a long-lived watcher that watches for the health of the
// allocation's tasks.
func (t *Tracker) watchTaskEvents() {
    alloc := t.alloc
    allStartedTime := time.Time{}
    healthyTimer := time.NewTimer(0)
    if !healthyTimer.Stop() {
        select {
        case <-healthyTimer.C:
        default:
        }
    }

    for {
        // If the alloc is being stopped by the server just exit
        switch alloc.DesiredStatus {
        case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
            t.logger.Trace("desired status is terminal for alloc", "alloc_id", alloc.ID, "desired_status", alloc.DesiredStatus)
            t.markAllocStopped()
            return
        }

        // Store the task states
        t.l.Lock()
        for task, state := range alloc.TaskStates {
            t.taskHealth[task].state = state
        }
        t.l.Unlock()

        // Detect if the alloc is unhealthy or whether all tasks have started
        latestStartTime := time.Time{}
        for _, state := range alloc.TaskStates {
            // One of the tasks has failed so we can exit watching
            if state.Failed || !state.FinishedAt.IsZero() {
                t.setTaskHealth(false, true)
                return
            }

            if state.State != structs.TaskStateRunning {
                latestStartTime = time.Time{}
                break
            } else if state.StartedAt.After(latestStartTime) {
                latestStartTime = state.StartedAt
            }
        }

        // If the alloc is marked as failed by the client but none of the
        // individual tasks failed, that means something failed at the alloc
        // level.
        if alloc.ClientStatus == structs.AllocClientStatusFailed {
            t.l.Lock()
            t.allocFailed = true
            t.l.Unlock()
            t.setTaskHealth(false, true)
            return
        }

        if !latestStartTime.Equal(allStartedTime) {
            // Prevent the timer from firing at the old start time
            if !healthyTimer.Stop() {
                select {
                case <-healthyTimer.C:
                default:
                }
            }

            // Set the timer since all tasks are started
            if !latestStartTime.IsZero() {
                allStartedTime = latestStartTime
                healthyTimer.Reset(t.minHealthyTime)
            }
        }

        select {
        case <-t.ctx.Done():
            return
        case newAlloc, ok := <-t.allocUpdates.Ch:
            if !ok {
                return
            }
            alloc = newAlloc
        case <-healthyTimer.C:
            t.setTaskHealth(true, false)
        }
    }
}
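watchTaskEvents re-arms healthyTimer whenever the latest task start time changes. time.Timer.Stop returns false once the timer has fired, and in that case a stale tick may already sit in the channel, so the code drains it with a non-blocking receive before Reset. A self-contained sketch of the pattern (resetTimer is a hypothetical helper; safe only when, as here, a single goroutine reads the timer):

package main

import "time"

// resetTimer safely re-arms t to fire after d.
func resetTimer(t *time.Timer, d time.Duration) {
    if !t.Stop() {
        // The timer already fired (or was never armed); drain any
        // buffered tick so it can't be mistaken for the new deadline.
        select {
        case <-t.C:
        default:
        }
    }
    t.Reset(d)
}

func main() {
    t := time.NewTimer(time.Millisecond)
    time.Sleep(5 * time.Millisecond) // let it fire without being read
    resetTimer(t, 10*time.Millisecond)
    <-t.C // fires exactly once, at the new deadline
}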
// watchConsulEvents is a long-lived watcher that watches for the health of the
// allocation's Consul checks.
func (t *Tracker) watchConsulEvents() {
    // checkTicker is the ticker that triggers us to look at the checks in
    // Consul
    checkTicker := time.NewTicker(consulCheckLookupInterval)
    defer checkTicker.Stop()

    // healthyTimer fires when the checks have been healthy for the
    // MinHealthyTime
    healthyTimer := time.NewTimer(0)
    if !healthyTimer.Stop() {
        select {
        case <-healthyTimer.C:
        default:
        }
    }

    // primed marks whether the healthy timer has been set
    primed := false

    // Store whether the last Consul checks call was successful or not
    consulChecksErr := false

    // allocReg are the registered objects in Consul for the allocation
    var allocReg *consul.AllocRegistration

OUTER:
    for {
        select {
        case <-t.ctx.Done():
            return
        case <-checkTicker.C:
            newAllocReg, err := t.consulClient.AllocRegistrations(t.alloc.ID)
            if err != nil {
                if !consulChecksErr {
                    consulChecksErr = true
                    t.logger.Warn("error looking up Consul registrations for allocation", "error", err, "alloc_id", t.alloc.ID)
                }
                continue OUTER
            } else {
                consulChecksErr = false
                allocReg = newAllocReg
            }
        case <-healthyTimer.C:
            t.setCheckHealth(true)
        }

        if allocReg == nil {
            continue
        }

        // Store the task registrations
        t.l.Lock()
        for task, reg := range allocReg.Tasks {
            t.taskHealth[task].taskRegistrations = reg
        }
        t.l.Unlock()

        // Detect if all the checks are passing
        passed := true

    CHECKS:
        for _, treg := range allocReg.Tasks {
            for _, sreg := range treg.Services {
                for _, check := range sreg.Checks {
                    if check.Status == api.HealthPassing {
                        continue
                    }

                    passed = false
                    t.setCheckHealth(false)
                    break CHECKS
                }
            }
        }

        if !passed {
            // Stop the timer since we have transitioned back to unhealthy
            if primed {
                if !healthyTimer.Stop() {
                    select {
                    case <-healthyTimer.C:
                    default:
                    }
                }
                primed = false
            }
        } else if !primed {
            // Reset the timer to fire after MinHealthyTime
            if !healthyTimer.Stop() {
                select {
                case <-healthyTimer.C:
                default:
                }
            }

            primed = true
            healthyTimer.Reset(t.minHealthyTime)
        }
    }
}

// taskHealthState captures all known health information about a task. It is
// largely used to determine if the task has contributed to the allocation being
// unhealthy.
type taskHealthState struct {
    task              *structs.Task
    state             *structs.TaskState
    taskRegistrations *consul.TaskRegistration
}

// event takes the deadline time for the allocation to be healthy and the update
// strategy of the group. It returns true if the task has contributed to the
// allocation being unhealthy and if so, an event description of why.
func (t *taskHealthState) event(deadline time.Time, minHealthyTime time.Duration, useChecks bool) (string, bool) {
    requireChecks := false
    desiredChecks := 0
    for _, s := range t.task.Services {
        if nc := len(s.Checks); nc > 0 {
            requireChecks = true
            desiredChecks += nc
        }
    }
    requireChecks = requireChecks && useChecks

    if t.state != nil {
        if t.state.Failed {
            return "Unhealthy because of failed task", true
        }
        if t.state.State != structs.TaskStateRunning {
            return "Task not running by deadline", true
        }

        // We are running so check if we have been running long enough
        if t.state.StartedAt.Add(minHealthyTime).After(deadline) {
            return fmt.Sprintf("Task not running for min_healthy_time of %v by deadline", minHealthyTime), true
        }
    }

    if t.taskRegistrations != nil {
        var notPassing []string
        passing := 0

    OUTER:
        for _, sreg := range t.taskRegistrations.Services {
            for _, check := range sreg.Checks {
                if check.Status != api.HealthPassing {
                    notPassing = append(notPassing, sreg.Service.Service)
                    continue OUTER
                } else {
                    passing++
                }
            }
        }

        if len(notPassing) != 0 {
            return fmt.Sprintf("Services not healthy by deadline: %s", strings.Join(notPassing, ", ")), true
        }

        if passing != desiredChecks {
            return fmt.Sprintf("Only %d out of %d checks registered and passing", passing, desiredChecks), true
        }

    } else if requireChecks {
        return "Service checks not registered", true
    }

    return "", false
}
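The min_healthy_time arithmetic in event() reads: a running task is flagged when its healthy window (StartedAt + minHealthyTime) could not complete before the allocation's deadline. A small worked example with illustrative timestamps (minHealthyTime = 10s, deadline = 12:00:30):

package main

import (
    "fmt"
    "time"
)

func main() {
    deadline := time.Date(2018, 9, 1, 12, 0, 30, 0, time.UTC)
    minHealthyTime := 10 * time.Second

    for _, startedAt := range []time.Time{
        time.Date(2018, 9, 1, 12, 0, 15, 0, time.UTC), // window ends 12:00:25 -> ok
        time.Date(2018, 9, 1, 12, 0, 25, 0, time.UTC), // window ends 12:00:35 -> too late
    } {
        late := startedAt.Add(minHealthyTime).After(deadline)
        fmt.Printf("started %s -> unhealthy event: %v\n",
            startedAt.Format("15:04:05"), late)
    }
}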
@@ -24,14 +24,6 @@ import (
    "github.com/hashicorp/nomad/nomad/structs"
)

const (
    // updateChCap is the capacity of AllocRunner's updateCh. It must be 1
    // as we only want to process the latest update, so if there's already
    // a pending update it will be removed from the chan before adding the
    // newer update.
    updateChCap = 1
)

// allocRunner is used to run all the tasks in a given allocation
type allocRunner struct {
    // id is the ID of the allocation. Can be accessed without a lock
@@ -82,11 +74,6 @@ type allocRunner struct {
    tasks     map[string]*taskrunner.TaskRunner
    tasksLock sync.RWMutex

    // updateCh receives allocation updates via the Update method. Must
    // have buffer size 1 in order to support dropping pending updates when
    // a newer allocation is received.
    updateCh chan *structs.Allocation

    // allocBroadcaster sends client allocation updates to all listeners
    allocBroadcaster *cstructs.AllocBroadcaster

@@ -111,16 +98,15 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
        vaultClient:      config.Vault,
        tasks:            make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
        waitCh:           make(chan struct{}),
        updateCh:         make(chan *structs.Allocation, updateChCap),
        state:            &state.State{},
        stateDB:          config.StateDB,
        stateUpdater:     config.StateUpdater,
        allocBroadcaster: cstructs.NewAllocBroadcaster(alloc),
        allocBroadcaster: cstructs.NewAllocBroadcaster(),
        prevAllocWatcher: config.PrevAllocWatcher,
    }

    // Create the logger based on the allocation ID
    ar.logger = config.Logger.With("alloc_id", alloc.ID)
    ar.logger = config.Logger.Named("alloc_runner").With("alloc_id", alloc.ID)

    // Create alloc dir
    ar.allocDir = allocdir.NewAllocDir(ar.logger, filepath.Join(config.ClientConfig.AllocDir, alloc.ID))

@@ -251,6 +237,56 @@ func (ar *allocRunner) Restore() error {
    return nil
}

// SetHealth allows the health watcher hook to set the alloc's
// deployment/migration health and emit task events.
//
// Only for use by health hook.
func (ar *allocRunner) SetHealth(healthy, isDeploy bool, trackerTaskEvents map[string]*structs.TaskEvent) {
    // Updating alloc deployment state is tricky because it may be nil, but
    // if it's not then we need to maintain the values of Canary and
    // ModifyIndex as they're only mutated by the server.
    ar.stateLock.Lock()
    ar.state.SetDeploymentStatus(time.Now(), healthy)
    ar.stateLock.Unlock()

    // If the deployment is unhealthy emit task events explaining why
    ar.tasksLock.RLock()
    if !healthy && isDeploy {
        for task, event := range trackerTaskEvents {
            if tr, ok := ar.tasks[task]; ok {
                tr.EmitEvent(event)
            }
        }
    }

    // Gather the state of the other tasks
    states := make(map[string]*structs.TaskState, len(ar.tasks))
    for name, tr := range ar.tasks {
        states[name] = tr.TaskState()
    }
    ar.tasksLock.RUnlock()

    // Build the client allocation
    calloc := ar.clientAlloc(states)

    // Update the server
    ar.stateUpdater.AllocStateUpdated(calloc)

    // Broadcast client alloc to listeners
    ar.allocBroadcaster.Send(calloc)
}

// ClearHealth allows the health watcher hook to clear the alloc's deployment
// health if the deployment ID changes. It does not update the server as the
// status is only cleared when already receiving an update from the server.
//
// Only for use by health hook.
func (ar *allocRunner) ClearHealth() {
    ar.stateLock.Lock()
    ar.state.ClearDeploymentStatus()
    ar.stateLock.Unlock()
}

// TaskStateUpdated is called by TaskRunner when a task's state has been
// updated. This hook is used to compute changes to the alloc's ClientStatus
// and to update the server with the new state.
@@ -408,7 +444,10 @@ func (ar *allocRunner) Update(update *structs.Allocation) {
    // Update ar.alloc
    ar.setAlloc(update)

    //TODO Run AR Update hooks
    // Run hooks
    if err := ar.update(update); err != nil {
        ar.logger.Error("error running update hooks", "error", err)
    }

    // Update task runners
    for _, tr := range ar.tasks {
@@ -5,9 +5,9 @@ import (
    "fmt"
    "time"

    log "github.com/hashicorp/go-hclog"
    multierror "github.com/hashicorp/go-multierror"
    "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
    "github.com/hashicorp/nomad/nomad/structs"
)

// initRunnerHooks initializes the runner's hooks.
@@ -19,6 +19,7 @@ func (ar *allocRunner) initRunnerHooks() {
    ar.runnerHooks = []interfaces.RunnerHook{
        newAllocDirHook(hookLogger, ar.allocDir),
        newDiskMigrationHook(hookLogger, ar.prevAllocWatcher, ar.allocDir),
        newAllocHealthWatcherHook(hookLogger, ar, ar.consulClient),
    }
}

@@ -63,6 +64,48 @@ func (ar *allocRunner) prerun() error {
    return nil
}

// update runs the alloc runner update hooks. Update hooks are run
// asynchronously with all other alloc runner operations.
func (ar *allocRunner) update(update *structs.Allocation) error {
    if ar.logger.IsTrace() {
        start := time.Now()
        ar.logger.Trace("running update hooks", "start", start)
        defer func() {
            end := time.Now()
            ar.logger.Trace("finished update hooks", "end", end, "duration", end.Sub(start))
        }()
    }

    req := &interfaces.RunnerUpdateRequest{
        Alloc: update,
    }

    for _, hook := range ar.runnerHooks {
        h, ok := hook.(interfaces.RunnerUpdateHook)
        if !ok {
            continue
        }

        name := h.Name()
        var start time.Time
        if ar.logger.IsTrace() {
            start = time.Now()
            ar.logger.Trace("running update hook", "name", name, "start", start)
        }

        if err := h.Update(req); err != nil {
            return fmt.Errorf("hook %q failed: %v", name, err)
        }

        if ar.logger.IsTrace() {
            end := time.Now()
            ar.logger.Trace("finished update hook", "name", name, "end", end, "duration", end.Sub(start))
        }
    }

    return nil
}

// postrun is used to run the runner's postrun hooks.
func (ar *allocRunner) postrun() error {
    if ar.logger.IsTrace() {
@@ -138,44 +181,3 @@ func (ar *allocRunner) destroy() error {

    return nil
}

// TODO
type allocHealthWatcherHook struct {
    runner   *allocRunner
    logger   log.Logger
    ctx      context.Context
    cancelFn context.CancelFunc
}

func newAllocHealthWatcherHook(runner *allocRunner, logger log.Logger) *allocHealthWatcherHook {
    ctx, cancelFn := context.WithCancel(context.Background())
    ad := &allocHealthWatcherHook{
        runner:   runner,
        ctx:      ctx,
        cancelFn: cancelFn,
    }

    ad.logger = logger.Named(ad.Name())
    return ad
}

func (h *allocHealthWatcherHook) Name() string {
    return "alloc_health_watcher"
}

func (h *allocHealthWatcherHook) Prerun() error {
    return nil
}

func (h *allocHealthWatcherHook) Update() error {
    // Cancel the old watcher and create a new one
    h.cancelFn()

    // TODO create the new one
    return nil
}

func (h *allocHealthWatcherHook) Destroy() error {
    h.cancelFn()
    return nil
}
client/allocrunnerv2/health_hook.go (new file, 247 lines)
@@ -0,0 +1,247 @@
package allocrunnerv2

import (
    "context"
    "fmt"
    "sync"
    "time"

    log "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/nomad/client/allochealth"
    "github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
    "github.com/hashicorp/nomad/client/consul"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/nomad/structs"
)

// healthMutator is able to set/clear alloc health as well as listen for alloc
// changes.
type healthMutator interface {
    // Alloc returns the original alloc
    Alloc() *structs.Allocation

    // Set health via the mutator
    SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent)

    // Clear health when the deployment ID changes
    ClearHealth()

    // Listener listens for alloc updates
    Listener() *cstructs.AllocListener
}

// allocHealthWatcherHook is responsible for watching an allocation's task
// status and (optionally) Consul health check status to determine if the
// allocation is healthy or unhealthy. Used by deployments and migrations.
type allocHealthWatcherHook struct {
    runner healthMutator

    // consul client used to monitor health checks
    consul consul.ConsulServiceAPI

    // listener is given to trackers to listen for alloc updates and closed
    // when the alloc is destroyed.
    listener *cstructs.AllocListener

    // hookLock is held by hook methods to prevent concurrent access by
    // Update and synchronous hooks.
    hookLock sync.Mutex

    // watchLock is held by the health watching/setting goroutine to ensure
    // only one health watching goroutine is running at a time.
    watchLock sync.Mutex

    // ranOnce is set once Prerun or Update have run at least once. This
    // prevents Prerun from running if an Update has already been
    // processed. Must hold hookLock to access.
    ranOnce bool

    // cancelFn stops the health watching/setting goroutine. Grab the
    // watchLock to block until it exits.
    cancelFn context.CancelFunc

    // alloc is set by the constructor or Update. Must hold hookLock to access.
    alloc *structs.Allocation

    // isDeploy is true if monitoring a deployment. Set in init(). Must
    // hold hookLock to access.
    isDeploy bool

    logger log.Logger
}

func newAllocHealthWatcherHook(logger log.Logger, runner healthMutator, consul consul.ConsulServiceAPI) interfaces.RunnerHook {
    alloc := runner.Alloc()

    // Neither deployments nor migrations care about the health of
    // non-service jobs so never watch their health
    if alloc.Job.Type != structs.JobTypeService {
        return noopAllocHealthWatcherHook{}
    }

    h := &allocHealthWatcherHook{
        runner:   runner,
        alloc:    alloc,
        cancelFn: func() {}, // initialize to prevent nil func panics
        consul:   consul,
        listener: runner.Listener(),
    }

    h.logger = logger.Named(h.Name())
    return h
}

func (h *allocHealthWatcherHook) Name() string {
    return "alloc_health_watcher"
}

// init starts the allochealth.Tracker and watchHealth goroutine on either
// Prerun or Update. Caller must set/update the alloc and logger fields.
//
// Not threadsafe, so the caller should lock since Updates occur concurrently.
func (h *allocHealthWatcherHook) init() error {
    // No need to watch health as it's already set
    if h.alloc.DeploymentStatus.HasHealth() {
        return nil
    }

    tg := h.alloc.Job.LookupTaskGroup(h.alloc.TaskGroup)
    if tg == nil {
        return fmt.Errorf("task group %q does not exist in job %q", h.alloc.TaskGroup, h.alloc.Job.ID)
    }

    h.isDeploy = h.alloc.DeploymentID != ""

    // No need to watch allocs for deployments that rely on operators
    // manually setting health
    if h.isDeploy && (tg.Update == nil || tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Manual) {
        return nil
    }

    // Define the deadline, health method, and min healthy time from the
    // deployment if this is a deployment; otherwise from the migration
    // strategy.
    deadline, useChecks, minHealthyTime := getHealthParams(time.Now(), tg, h.isDeploy)

    // Create a context that is canceled when the tracker should shut down
    // or the deadline is reached.
    ctx := context.Background()
    ctx, h.cancelFn = context.WithDeadline(ctx, deadline)

    // Create a new tracker, start it, and watch for health results.
    tracker := allochealth.NewTracker(ctx, h.logger, h.alloc,
        h.listener, h.consul, minHealthyTime, useChecks)
    tracker.Start()
    go h.watchHealth(ctx, tracker)
    return nil
}

func (h *allocHealthWatcherHook) Prerun(context.Context) error {
    h.hookLock.Lock()
    defer h.hookLock.Unlock()

    if h.ranOnce {
        // An Update beat Prerun to running the watcher; noop
        return nil
    }

    h.ranOnce = true
    return h.init()
}

func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) error {
    h.hookLock.Lock()
    defer h.hookLock.Unlock()

    // Prevent Prerun from running after an Update
    h.ranOnce = true

    // Cancel the old watcher and create a new one
    h.cancelFn()

    // Acquire the watchLock to ensure the old watcher has exited before
    // continuing. Kind of an ugly but easy-to-reuse done chan.
    h.watchLock.Lock()
    h.watchLock.Unlock()

    // Deployment has changed, reset status
    if req.Alloc.DeploymentID != h.alloc.DeploymentID {
        h.runner.ClearHealth()
    }

    // Update alloc
    h.alloc = req.Alloc

    return h.init()
}

func (h *allocHealthWatcherHook) Destroy() error {
    h.hookLock.Lock()
    defer h.hookLock.Unlock()

    h.cancelFn()
    h.listener.Close()

    // Acquire the watchLock to ensure any existing watcher has exited
    // before exiting. Kind of an ugly but easy-to-reuse done chan.
    h.watchLock.Lock()
    h.watchLock.Unlock()

    return nil
}

// watchHealth watches alloc health until it is set, the alloc is stopped, or
// the context is canceled. watchHealth will be canceled and restarted on
// Updates so calls are serialized with a lock.
func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker) {
    h.watchLock.Lock()
    defer h.watchLock.Unlock()

    select {
    case <-ctx.Done():
        return

    case <-tracker.AllocStoppedCh():
        return

    case healthy := <-tracker.HealthyCh():
        // If this is an unhealthy deployment emit events for tasks
        var taskEvents map[string]*structs.TaskEvent
        if !healthy && h.isDeploy {
            taskEvents = tracker.TaskEvents()
        }

        h.runner.SetHealth(healthy, h.isDeploy, taskEvents)
    }
}
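The Lock/Unlock pairs in Update and Destroy work because watchHealth holds watchLock for its whole lifetime: acquiring and immediately releasing the mutex is effectively a join on the goroutine, the "ugly but easy-to-reuse done chan" the comments mention. A self-contained sketch of that trick (toy timings, not the hook itself):

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

func main() {
    var watchLock sync.Mutex
    ctx, cancel := context.WithCancel(context.Background())

    watchLock.Lock() // held on the watcher's behalf before it starts
    go func() {
        defer watchLock.Unlock() // released only when the watcher exits
        <-ctx.Done()
        time.Sleep(10 * time.Millisecond) // simulated cleanup work
    }()

    cancel()

    // Lock/Unlock blocks until the goroutine above has fully exited.
    watchLock.Lock()
    watchLock.Unlock()
    fmt.Println("watcher has exited")
}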
// getHealthParams returns the health watcher parameters which vary based on
// whether this allocation is in a deployment or migration.
func getHealthParams(now time.Time, tg *structs.TaskGroup, isDeploy bool) (deadline time.Time, useChecks bool, minHealthyTime time.Duration) {
    if isDeploy {
        deadline = now.Add(tg.Update.HealthyDeadline)
        minHealthyTime = tg.Update.MinHealthyTime
        useChecks = tg.Update.HealthCheck == structs.UpdateStrategyHealthCheck_Checks
    } else {
        strategy := tg.Migrate
        if strategy == nil {
            // For backwards compat with pre-0.8 allocations that
            // don't have a migrate strategy set.
            strategy = structs.DefaultMigrateStrategy()
        }

        deadline = now.Add(strategy.HealthyDeadline)
        minHealthyTime = strategy.MinHealthyTime
        useChecks = strategy.HealthCheck == structs.MigrateStrategyHealthChecks
    }
    return
}

// noopAllocHealthWatcherHook is an empty hook implementation returned by
// newAllocHealthWatcherHook when an allocation will never need its health
// monitored.
type noopAllocHealthWatcherHook struct{}

func (noopAllocHealthWatcherHook) Name() string {
    return "alloc_health_watcher"
}
@@ -4,6 +4,7 @@ import (
    "context"

    "github.com/hashicorp/nomad/client/allocrunnerv2/state"
    "github.com/hashicorp/nomad/nomad/structs"
)

// RunnerHook is a hook into the lifecycle of an allocation runner.
@@ -28,7 +29,11 @@ type RunnerDestroyHook interface {

type RunnerUpdateHook interface {
    RunnerHook
    Update() error
    Update(*RunnerUpdateRequest) error
}

type RunnerUpdateRequest struct {
    Alloc *structs.Allocation
}

// XXX Not sure yet
@@ -1,6 +1,8 @@
package state

import (
    "time"

    "github.com/hashicorp/nomad/nomad/structs"
)

@@ -17,3 +19,27 @@ type State struct {
    // DeploymentStatus captures the status of the deployment
    DeploymentStatus *structs.AllocDeploymentStatus
}

// SetDeploymentStatus is a helper for updating the client-controlled
// DeploymentStatus fields: Healthy and Timestamp. The Canary and ModifyIndex
// fields should only be updated by the server.
func (s *State) SetDeploymentStatus(timestamp time.Time, healthy bool) {
    if s.DeploymentStatus == nil {
        s.DeploymentStatus = &structs.AllocDeploymentStatus{}
    }

    s.DeploymentStatus.Healthy = &healthy
    s.DeploymentStatus.Timestamp = timestamp
}

// ClearDeploymentStatus is a helper to clear the client-controlled
// DeploymentStatus fields: Healthy and Timestamp. The Canary and ModifyIndex
// fields should only be updated by the server.
func (s *State) ClearDeploymentStatus() {
    if s.DeploymentStatus == nil {
        return
    }

    s.DeploymentStatus.Healthy = nil
    s.DeploymentStatus.Timestamp = time.Time{}
}
@@ -483,7 +483,8 @@ func (tr *TaskRunner) Restore() error {
    return nil
}

// SetState sets the task runner's allocation state.
// SetState sets the task runner's allocation state and triggers a server
// update.
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
    // Update the local state
    stateCopy := tr.setStateLocal(state, event)
@@ -23,18 +23,14 @@ var ErrAllocBroadcasterClosed = errors.New("alloc broadcaster closed")
// receive the latest allocation update -- never a stale version.
type AllocBroadcaster struct {
    m         sync.Mutex
    alloc     *structs.Allocation
    listeners map[int]chan *structs.Allocation // lazy init
    nextId    int
    closed    bool
}

// NewAllocBroadcaster returns a new AllocBroadcaster with the given initial
// allocation.
func NewAllocBroadcaster(initial *structs.Allocation) *AllocBroadcaster {
    return &AllocBroadcaster{
        alloc: initial,
    }
// NewAllocBroadcaster returns a new AllocBroadcaster.
func NewAllocBroadcaster() *AllocBroadcaster {
    return &AllocBroadcaster{}
}

// Send broadcasts an allocation update. Any pending updates are replaced with
@@ -47,9 +43,6 @@ func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
        return ErrAllocBroadcasterClosed
    }

    // Update alloc on broadcaster to send to newly created listeners
    b.alloc = v

    // Send alloc to already created listeners
    for _, l := range b.listeners {
        select {
@@ -64,7 +57,8 @@ func (b *AllocBroadcaster) Send(v *structs.Allocation) error {
}

// Close closes the channel, disabling the sending of further allocation
// updates. Safe to call concurrently and more than once.
// updates. Pending updates are still received by listeners. Safe to call
// concurrently and more than once.
func (b *AllocBroadcaster) Close() {
    b.m.Lock()
    defer b.m.Unlock()
@@ -79,7 +73,6 @@ func (b *AllocBroadcaster) Close() {

    // Clear all references and mark broadcaster as closed
    b.listeners = nil
    b.alloc = nil
    b.closed = true
}

@@ -118,12 +111,9 @@ func (b *AllocBroadcaster) Listen() *AllocListener {

    ch := make(chan *structs.Allocation, listenerCap)

    // Broadcaster is already closed, close this listener
    if b.closed {
        // Broadcaster is already closed, close this listener
        close(ch)
    } else {
        // Send the current allocation to the listener
        ch <- b.alloc
    }

    b.listeners[b.nextId] = ch
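With the initial alloc removed, Listen now hands back an empty buffered channel: listeners block until the first Send, which is exactly what the updated tests below assert. A toy stand-in broadcaster (int payloads, single sender assumed; not Nomad's implementation) sketching that contract:

package main

import (
    "fmt"
    "time"
)

type broadcaster struct {
    listeners []chan int
}

// Listen returns an empty channel; nothing is queued until a Send.
func (b *broadcaster) Listen() <-chan int {
    ch := make(chan int, 1)
    b.listeners = append(b.listeners, ch)
    return ch
}

// Send queues v for every listener, replacing any stale pending value.
func (b *broadcaster) Send(v int) {
    for _, ch := range b.listeners {
        select {
        case ch <- v:
        default:
            select { // drop the unconsumed older value
            case <-ch:
            default:
            }
            ch <- v
        }
    }
}

func main() {
    b := &broadcaster{}
    l := b.Listen()

    select {
    case <-l:
        fmt.Println("unexpected initial value")
    case <-time.After(10 * time.Millisecond):
        fmt.Println("listener blocked until first Send") // new behavior
    }

    b.Send(42)
    fmt.Println(<-l) // prints: 42
}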
@@ -9,25 +9,27 @@ import (
    "github.com/stretchr/testify/require"
)

// TestAllocBroadcaster_SendRecv asserts the initial and latest sends to a
// broadcaster are received by listeners.
// TestAllocBroadcaster_SendRecv asserts the latest sends to a broadcaster are
// received by listeners.
func TestAllocBroadcaster_SendRecv(t *testing.T) {
    t.Parallel()

    alloc := mock.Alloc()
    alloc.AllocModifyIndex = 10

    b := NewAllocBroadcaster(alloc.Copy())
    b := NewAllocBroadcaster()
    defer b.Close()

    // Create a listener and get the initial alloc
    // Create a listener and assert it blocks until an update
    l := b.Listen()
    defer l.Close()
    initial := <-l.Ch
    require.Equal(t, alloc.AllocModifyIndex, initial.AllocModifyIndex)
    select {
    case <-l.Ch:
        t.Fatalf("unexpected initial alloc")
    case <-time.After(10 * time.Millisecond):
        // Ok! Ch is empty until a Send
    }

    // Increment the index and send a new copy
    alloc.AllocModifyIndex = 20
    // Send an update
    alloc := mock.Alloc()
    alloc.AllocModifyIndex = 10
    require.NoError(t, b.Send(alloc.Copy()))
    recvd := <-l.Ch
    require.Equal(t, alloc.AllocModifyIndex, recvd.AllocModifyIndex)
@@ -47,7 +49,7 @@ func TestAllocBroadcaster_RecvBlocks(t *testing.T) {
    t.Parallel()

    alloc := mock.Alloc()
    b := NewAllocBroadcaster(alloc.Copy())
    b := NewAllocBroadcaster()
    defer b.Close()

    l1 := b.Listen()
@@ -56,11 +58,6 @@ func TestAllocBroadcaster_RecvBlocks(t *testing.T) {
    l2 := b.Listen()
    defer l2.Close()

    // Every listener should get the initial alloc even when there hasn't
    // been a Send()
    require.NotNil(t, <-l1.Ch)
    require.NotNil(t, <-l2.Ch)

    done := make(chan int, 2)

    // Subsequent listens should block until a subsequent send
@@ -92,7 +89,7 @@ func TestAllocBroadcaster_Concurrency(t *testing.T) {
    t.Parallel()

    alloc := mock.Alloc()
    b := NewAllocBroadcaster(alloc.Copy())
    b := NewAllocBroadcaster()
    defer b.Close()

    errs := make(chan error, 10)