initial watcher

This commit is contained in:
Alex Dadgar
2017-07-02 21:49:56 -07:00
parent 7e507719f8
commit da82a6e814
7 changed files with 270 additions and 18 deletions

View File

@@ -475,6 +475,7 @@ func (t *Task) SetLogConfig(l *LogConfig) *Task {
type TaskState struct {
State string
Failed bool
Restarts uint64
StartedAt time.Time
FinishedAt time.Time
Events []*TaskEvent

View File

@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/nomad/client/allocdir"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/vaultclient"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
cstructs "github.com/hashicorp/nomad/client/structs"
@@ -52,6 +53,8 @@ type AllocRunner struct {
alloc *structs.Allocation
allocClientStatus string // Explicit status of allocation. Set when there are failures
allocClientDescription string
allocHealth *bool // Whether the allocation is healthy
allocBroadcast *cstructs.AllocBroadcaster
allocLock sync.Mutex
dirtyCh chan struct{}
@@ -143,20 +146,21 @@ func NewAllocRunner(logger *log.Logger, config *config.Config, stateDB *bolt.DB,
consulClient ConsulServiceAPI) *AllocRunner {
ar := &AllocRunner{
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
config: config,
stateDB: stateDB,
updater: updater,
logger: logger,
alloc: alloc,
allocBroadcast: cstructs.NewAllocBroadcaster(0),
dirtyCh: make(chan struct{}, 1),
tasks: make(map[string]*TaskRunner),
taskStates: copyTaskStates(alloc.TaskStates),
restored: make(map[string]struct{}),
updateCh: make(chan *structs.Allocation, 64),
destroyCh: make(chan struct{}),
waitCh: make(chan struct{}),
vaultClient: vaultClient,
consulClient: consulClient,
}
return ar
}
@@ -475,6 +479,14 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
r.allocLock.Unlock()
return alloc
}
// The health has been set
if r.allocHealth != nil {
if alloc.DeploymentStatus == nil {
alloc.DeploymentStatus = &structs.AllocDeploymentStatus{}
}
alloc.DeploymentStatus.Healthy = helper.BoolToPtr(*r.allocHealth)
}
r.allocLock.Unlock()
// Scan the task states to determine the status of the alloc
@@ -536,6 +548,7 @@ func (r *AllocRunner) syncStatus() error {
// Get a copy of our alloc, update status server side and sync to disk
alloc := r.Alloc()
r.updater(alloc)
r.allocBroadcast.Send(alloc)
return r.saveAllocRunnerState()
}
@@ -567,6 +580,9 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
if event.FailsTask {
taskState.Failed = true
}
if event.Type == structs.TaskRestarting {
taskState.Restarts++
}
r.appendTaskEvent(taskState, event)
}
@@ -650,6 +666,7 @@ func (r *AllocRunner) appendTaskEvent(state *structs.TaskState, event *structs.T
func (r *AllocRunner) Run() {
defer close(r.waitCh)
go r.dirtySyncState()
go r.watchHealth()
// Find the task group to run in the allocation
alloc := r.Alloc()
@@ -913,6 +930,7 @@ func (r *AllocRunner) Destroy() {
}
r.destroy = true
close(r.destroyCh)
r.allocBroadcast.Close()
}
// WaitCh returns a channel to wait for termination

View File

@@ -0,0 +1,145 @@
package client
import (
"time"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)
// watchHealth is responsible for watching an allocation's task status and
// potentially consul health check status to determine if the allocation is
// healthy or unhealthy.
func (r *AllocRunner) watchHealth() {
// Get our alloc and the task group
alloc := r.Alloc()
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
if tg == nil {
r.logger.Printf("[ERR] client.alloc_watcher: failed to lookup allocation's task group. Exiting watcher")
return
}
u := tg.Update
// Checks marks whether we should be watching for Consul health checks
checks := false
r.logger.Printf("XXX %v", checks)
switch {
case u == nil:
r.logger.Printf("[TRACE] client.alloc_watcher: no update block for alloc %q. exiting", alloc.ID)
return
case u.HealthCheck == structs.UpdateStrategyHealthCheck_Manual:
r.logger.Printf("[TRACE] client.alloc_watcher: update block has manual checks for alloc %q. exiting", alloc.ID)
return
case u.HealthCheck == structs.UpdateStrategyHealthCheck_Checks:
checks = true
}
// Get a listener so we know when an allocation is updated.
l := r.allocBroadcast.Listen()
// Create a deadline timer for the health
deadline := time.NewTimer(u.HealthyDeadline)
// Create a healthy timer
latestHealthyTime := time.Unix(0, 0)
healthyTimer := time.NewTimer(0)
if !healthyTimer.Stop() {
<-healthyTimer.C
}
// Cleanup function
defer func() {
if !deadline.Stop() {
<-deadline.C
}
if !healthyTimer.Stop() {
<-healthyTimer.C
}
l.Close()
}()
setHealth := func(h bool) {
r.allocLock.Lock()
r.allocHealth = helper.BoolToPtr(h)
r.allocLock.Unlock()
r.syncStatus()
}
first := true
OUTER:
for {
if !first {
select {
case <-r.destroyCh:
return
case newAlloc, ok := <-l.Ch:
if !ok {
return
}
alloc = newAlloc
if alloc.DeploymentID == "" {
continue OUTER
}
r.logger.Printf("[TRACE] client.alloc_watcher: new alloc version for %q", alloc.ID)
case <-deadline.C:
// We have exceeded our deadline without being healthy.
setHealth(false)
return
case <-healthyTimer.C:
r.logger.Printf("[TRACE] client.alloc_watcher: alloc %q is healthy", alloc.ID)
setHealth(true)
}
}
first = false
// If the alloc is being stopped by the server just exit
switch alloc.DesiredStatus {
case structs.AllocDesiredStatusStop, structs.AllocDesiredStatusEvict:
r.logger.Printf("[TRACE] client.alloc_watcher: desired status terminal for alloc %q", alloc.ID)
return
}
// If the task is dead or has restarted, fail
for _, tstate := range alloc.TaskStates {
if tstate.Failed || !tstate.FinishedAt.IsZero() || tstate.Restarts != 0 {
r.logger.Printf("[TRACE] client.alloc_watcher: setting health to false for alloc %q", alloc.ID)
setHealth(false)
return
}
}
// Determine if the allocation is healthy
for task, tstate := range alloc.TaskStates {
if tstate.State != structs.TaskStateRunning {
r.logger.Printf("[TRACE] client.alloc_watcher: continuing since task %q hasn't started for alloc %q", task, alloc.ID)
continue OUTER
}
if tstate.StartedAt.After(latestHealthyTime) {
latestHealthyTime = tstate.StartedAt
}
}
// If we are already healthy we don't set the timer
healthyThreshold := latestHealthyTime.Add(u.MinHealthyTime)
if time.Now().After(healthyThreshold) {
continue OUTER
}
// Start the time til we are healthy
if !healthyTimer.Stop() {
select {
case <-healthyTimer.C:
default:
}
}
d := time.Until(healthyThreshold)
healthyTimer.Reset(d)
r.logger.Printf("[TRACE] client.alloc_watcher: setting healthy timer to %v for alloc %q", d, alloc.ID)
}
}

View File

@@ -1295,6 +1295,7 @@ func (c *Client) updateAllocStatus(alloc *structs.Allocation) {
stripped.TaskStates = alloc.TaskStates
stripped.ClientStatus = alloc.ClientStatus
stripped.ClientDescription = alloc.ClientDescription
stripped.DeploymentStatus = alloc.DeploymentStatus
select {
case c.allocUpdates <- stripped:

81
client/structs/funcs.go Normal file
View File

@@ -0,0 +1,81 @@
package structs
import (
"sync"
"github.com/hashicorp/nomad/nomad/structs"
)
// AllocBroadcaster implements an allocation broadcast channel.
// The zero value is a usable unbuffered channel.
type AllocBroadcaster struct {
m sync.Mutex
listeners map[int]chan<- *structs.Allocation // lazy init
nextId int
capacity int
closed bool
}
// NewAllocBroadcaster returns a new AllocBroadcaster with the given capacity (0 means unbuffered).
func NewAllocBroadcaster(n int) *AllocBroadcaster {
return &AllocBroadcaster{capacity: n}
}
// AllocListener implements a listening endpoint for an allocation broadcast channel.
type AllocListener struct {
// Ch receives the broadcast messages.
Ch <-chan *structs.Allocation
b *AllocBroadcaster
id int
}
// Send broadcasts a message to the channel.
// Sending on a closed channel causes a runtime panic.
func (b *AllocBroadcaster) Send(v *structs.Allocation) {
b.m.Lock()
defer b.m.Unlock()
if b.closed {
return
}
for _, l := range b.listeners {
select {
case l <- v:
default:
}
}
}
// Close closes the channel, disabling the sending of further messages.
func (b *AllocBroadcaster) Close() {
b.m.Lock()
defer b.m.Unlock()
b.closed = true
for _, l := range b.listeners {
close(l)
}
}
// Listen returns a Listener for the broadcast channel.
func (b *AllocBroadcaster) Listen() *AllocListener {
b.m.Lock()
defer b.m.Unlock()
if b.listeners == nil {
b.listeners = make(map[int]chan<- *structs.Allocation)
}
for b.listeners[b.nextId] != nil {
b.nextId++
}
ch := make(chan *structs.Allocation, b.capacity)
if b.closed {
close(ch)
}
b.listeners[b.nextId] = ch
return &AllocListener{ch, b, b.nextId}
}
// Close closes the Listener, disabling the receival of further messages.
func (l *AllocListener) Close() {
l.b.m.Lock()
defer l.b.m.Unlock()
delete(l.b.listeners, l.id)
}

View File

@@ -1434,10 +1434,16 @@ func (s *StateStore) nestedUpdateAllocFromClient(txn *memdb.Txn, index uint64, a
copyAlloc.ClientStatus = alloc.ClientStatus
copyAlloc.ClientDescription = alloc.ClientDescription
copyAlloc.TaskStates = alloc.TaskStates
copyAlloc.DeploymentStatus = alloc.DeploymentStatus
// Update the modify index
copyAlloc.ModifyIndex = index
// TODO TEST
if err := s.updateDeploymentWithAlloc(index, copyAlloc, exist, txn); err != nil {
return fmt.Errorf("error updating deployment: %v", err)
}
if err := s.updateSummaryWithAlloc(index, copyAlloc, exist, txn); err != nil {
return fmt.Errorf("error updating job summary: %v", err)
}

View File

@@ -3262,6 +3262,9 @@ type TaskState struct {
// Failed marks a task as having failed
Failed bool
// Restarts is the number of times the task has restarted
Restarts uint64
// StartedAt is the time the task is started. It is updated each time the
// task starts
StartedAt time.Time
@@ -3279,10 +3282,7 @@ func (ts *TaskState) Copy() *TaskState {
return nil
}
copy := new(TaskState)
copy.State = ts.State
copy.Failed = ts.Failed
copy.StartedAt = ts.StartedAt
copy.FinishedAt = ts.FinishedAt
*copy = *ts
if ts.Events != nil {
copy.Events = make([]*TaskEvent, len(ts.Events))