health_hook: simplify locking; test thoroughly

Use doneCh as @dadgar suggested in the original PR.

Thoroughly test the hook, as concurrent Update calls make for a tricky
concurrency problem.
Michael Schurter
2018-09-13 17:25:01 -07:00
parent 9fd0ba1df6
commit b7d3f5948e
2 changed files with 227 additions and 97 deletions
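For context, the locking simplification replaces the watchSema semaphore with a done channel: each watcher goroutine gets a fresh chan that it closes on exit, and Update/Destroy cancel the watcher and then block on that chan. Below is a minimal, self-contained Go sketch of the pattern; the watcher type and its start/stop/watch methods are illustrative stand-ins, not the hook's actual API, and callers are assumed to be serialized (as hookLock does for the real hook).

package main

import (
	"context"
	"fmt"
)

// watcher is a stand-in for allocHealthWatcherHook: it owns one background
// goroutine and tracks its exit with a done chan instead of a semaphore.
type watcher struct {
	cancelFn context.CancelFunc
	done     chan struct{}
}

func newWatcher() *watcher {
	// done starts out closed so stop or start may wait even if no
	// goroutine was ever started (mirrors Update running before Prerun).
	closed := make(chan struct{})
	close(closed)
	return &watcher{cancelFn: func() {}, done: closed}
}

// start cancels any previous goroutine, waits for it to exit, then starts a
// fresh one. Passing done in as an argument avoids racing on the field.
func (w *watcher) start() {
	w.cancelFn()
	<-w.done

	ctx, cancel := context.WithCancel(context.Background())
	w.cancelFn = cancel
	w.done = make(chan struct{})
	go w.watch(ctx, w.done)
}

// watch blocks until canceled and signals its exit exactly once.
func (w *watcher) watch(ctx context.Context, done chan<- struct{}) {
	defer close(done)
	<-ctx.Done()
}

// stop cancels the goroutine and blocks until it has exited.
func (w *watcher) stop() {
	w.cancelFn()
	<-w.done
}

func main() {
	w := newWatcher()
	for i := 0; i < 3; i++ { // restarting mimics serialized Update calls
		w.start()
	}
	w.stop()
	fmt.Println("watcher exited cleanly")
}

Because close(done) runs exactly once per goroutine, any number of waiters can observe the exit, and initializing the field with an already-closed chan lets Update wait safely even before Prerun has started a watcher, which is what the closedDone initialization in the diff below does.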

View File

@@ -14,31 +14,6 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
// watchSema acts as a semaphore to allow one watcher at a time.
type watchSema struct {
ch chan int
}
func newWatchSema() *watchSema {
return &watchSema{ch: make(chan int, 1)}
}
// acquire the semaphore.
func (w *watchSema) acquire() {
w.ch <- 1
}
// release the semaphore.
func (w *watchSema) release() {
<-w.ch
}
// wait blocks until the semaphore is free to be acquired.
func (w *watchSema) wait() {
w.acquire()
w.release()
}
// healthMutator is able to set/clear alloc health.
type healthSetter interface {
// Set health via the mutator
@@ -65,9 +40,10 @@ type allocHealthWatcherHook struct {
// Update and synchronous hooks.
hookLock sync.Mutex
// watchLock is acquired by the health watching/setting goroutine to
// ensure only one health watching goroutine is running at a time.
watchLock *watchSema
// watchDone is created before calling watchHealth and is closed when
// watchHealth exits. Must be passed into watchHealth to avoid races.
// Initialized already closed as Update may be called before Prerun.
watchDone chan struct{}
// ranOnce is set once Prerun or Update have run at least once. This
// prevents Prerun from running if an Update has already been
@@ -97,13 +73,17 @@ func newAllocHealthWatcherHook(logger log.Logger, alloc *structs.Allocation, hs
return noopAllocHealthWatcherHook{}
}
// Initialize watchDone with a closed chan in case Update runs before Prerun
closedDone := make(chan struct{})
close(closedDone)
h := &allocHealthWatcherHook{
alloc: alloc,
cancelFn: func() {}, // initialize to prevent nil func panics
watchDone: closedDone,
consul: consul,
healthSetter: hs,
listener: listener,
watchLock: newWatchSema(),
}
h.logger = logger.Named(h.Name())
@@ -151,7 +131,10 @@ func (h *allocHealthWatcherHook) init() error {
tracker := allochealth.NewTracker(ctx, h.logger, h.alloc,
h.listener, h.consul, minHealthyTime, useChecks)
tracker.Start()
go h.watchHealth(ctx, tracker)
// Create a new done chan and start watching for health updates
h.watchDone = make(chan struct{})
go h.watchHealth(ctx, tracker, h.watchDone)
return nil
}
@@ -179,7 +162,7 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
h.cancelFn()
// Wait until the watcher exits
h.watchLock.wait()
<-h.watchDone
// Deployment has changed, reset status
if req.Alloc.DeploymentID != h.alloc.DeploymentID {
@@ -200,7 +183,7 @@ func (h *allocHealthWatcherHook) Destroy() error {
h.listener.Close()
// Wait until the watcher exits
h.watchLock.wait()
<-h.watchDone
return nil
}
@@ -208,9 +191,8 @@ func (h *allocHealthWatcherHook) Destroy() error {
// watchHealth watches alloc health until it is set, the alloc is stopped, or
// the context is canceled. watchHealth will be canceled and restarted on
// Updates so calls are serialized with a lock.
func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker) {
h.watchLock.acquire()
defer h.watchLock.release()
func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker, done chan<- struct{}) {
defer close(done)
select {
case <-ctx.Done():

View File

@@ -6,12 +6,16 @@ import (
"testing"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
"github.com/hashicorp/nomad/client/consul"
cstructs "github.com/hashicorp/nomad/client/structs"
agentconsul "github.com/hashicorp/nomad/command/agent/consul"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -20,6 +24,12 @@ var _ interfaces.RunnerPrerunHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.RunnerUpdateHook = (*allocHealthWatcherHook)(nil)
var _ interfaces.RunnerDestroyHook = (*allocHealthWatcherHook)(nil)
// allocHealth is emitted to a chan whenever SetHealth is called
type allocHealth struct {
healthy bool
taskEvents map[string]*structs.TaskEvent
}
// mockHealthSetter implements healthSetter that stores health internally
type mockHealthSetter struct {
setCalls int
@@ -28,6 +38,17 @@ type mockHealthSetter struct {
isDeploy *bool
taskEvents map[string]*structs.TaskEvent
mu sync.Mutex
healthCh chan allocHealth
}
// newMockHealthSetter returns a mock HealthSetter that emits all SetHealth
// calls on a buffered chan. Callers who do not need notifications of health
// changes may just create the struct directly.
func newMockHealthSetter() *mockHealthSetter {
return &mockHealthSetter{
healthCh: make(chan allocHealth, 1),
}
}
func (m *mockHealthSetter) SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent) {
@@ -38,6 +59,10 @@ func (m *mockHealthSetter) SetHealth(healthy, isDeploy bool, taskEvents map[stri
m.healthy = &healthy
m.isDeploy = &isDeploy
m.taskEvents = taskEvents
if m.healthCh != nil {
m.healthCh <- allocHealth{healthy, taskEvents}
}
}
func (m *mockHealthSetter) ClearHealth() {
@@ -50,6 +75,7 @@ func (m *mockHealthSetter) ClearHealth() {
m.taskEvents = nil
}
// TestHealthHook_PrerunDestroy asserts a health hook does not error if it is run and destroyed.
func TestHealthHook_PrerunDestroy(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -73,10 +99,194 @@ func TestHealthHook_PrerunDestroy(t *testing.T) {
// Prerun
require.NoError(prerunh.Prerun(context.Background()))
// Assert isDeploy is false (other tests peek at isDeploy to determine
// if an Update was applied)
ahw := h.(*allocHealthWatcherHook)
ahw.hookLock.Lock()
assert.False(t, ahw.isDeploy)
ahw.hookLock.Unlock()
// Destroy
require.NoError(destroyh.Destroy())
}
// TestHealthHook_PrerunUpdateDestroy asserts Updates may be applied concurrently.
func TestHealthHook_PrerunUpdateDestroy(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
b := cstructs.NewAllocBroadcaster()
defer b.Close()
consul := consul.NewMockConsulServiceClient(t)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Prerun
require.NoError(h.Prerun(context.Background()))
// Update multiple times in a goroutine to mimic Client behavior
// (Updates are concurrent with alloc runner but are applied serially).
errs := make(chan error, 2)
go func() {
defer close(errs)
for i := 0; i < cap(errs); i++ {
alloc.AllocModifyIndex++
errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc.Copy()})
}
}()
for err := range errs {
assert.NoError(t, err)
}
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_UpdatePrerunDestroy asserts that a hook may have Update
// called before Prerun.
func TestHealthHook_UpdatePrerunDestroy(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
b := cstructs.NewAllocBroadcaster()
defer b.Close()
consul := consul.NewMockConsulServiceClient(t)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Set a DeploymentID to cause ClearHealth to be called
alloc.DeploymentID = uuid.Generate()
// Update in a goroutine to mimic Client behavior (Updates are
// concurrent with alloc runner).
errs := make(chan error, 1)
go func(alloc *structs.Allocation) {
errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc})
close(errs)
}(alloc.Copy())
for err := range errs {
assert.NoError(t, err)
}
// Prerun should be a noop
require.NoError(h.Prerun(context.Background()))
// Assert that the Update took effect by isDeploy being true
h.hookLock.Lock()
assert.True(t, h.isDeploy)
h.hookLock.Unlock()
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_Destroy asserts that a hook may have only Destroy called.
func TestHealthHook_Destroy(t *testing.T) {
t.Parallel()
require := require.New(t)
b := cstructs.NewAllocBroadcaster()
defer b.Close()
consul := consul.NewMockConsulServiceClient(t)
hs := &mockHealthSetter{}
h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.Alloc(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_SetHealth asserts SetHealth is called when health status is
// set. Uses task state and health checks.
func TestHealthHook_SetHealth(t *testing.T) {
t.Parallel()
require := require.New(t)
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
task := alloc.Job.TaskGroups[0].Tasks[0]
// Synthesize running alloc and tasks
alloc.ClientStatus = structs.AllocClientStatusRunning
alloc.TaskStates = map[string]*structs.TaskState{
task.Name: {
State: structs.TaskStateRunning,
StartedAt: time.Now(),
},
}
// Make Consul response
check := &consulapi.AgentCheck{
Name: task.Services[0].Checks[0].Name,
Status: consulapi.HealthPassing,
}
taskRegs := map[string]*agentconsul.TaskRegistration{
task.Name: {
Services: map[string]*agentconsul.ServiceRegistration{
task.Services[0].Name: {
Service: &consulapi.AgentService{
ID: "foo",
Service: task.Services[0].Name,
},
Checks: []*consulapi.AgentCheck{check},
},
},
},
}
b := cstructs.NewAllocBroadcaster()
defer b.Close()
// Don't reply on the first call
called := false
consul := consul.NewMockConsulServiceClient(t)
consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
if !called {
called = true
return nil, nil
}
reg := &agentconsul.AllocRegistration{
Tasks: taskRegs,
}
return reg, nil
}
hs := newMockHealthSetter()
h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
// Prerun
require.NoError(h.Prerun(context.Background()))
// Wait for health to be set (healthy)
select {
case <-time.After(5 * time.Second):
t.Fatalf("timeout waiting for health to be set")
case health := <-hs.healthCh:
require.True(health.healthy)
// Healthy allocs shouldn't emit task events
ev := health.taskEvents[task.Name]
require.Nilf(ev, "%#v", health.taskEvents)
}
// Destroy
require.NoError(h.Destroy())
}
// TestHealthHook_SystemNoop asserts that system jobs return the noop tracker.
func TestHealthHook_SystemNoop(t *testing.T) {
t.Parallel()
@@ -106,65 +316,3 @@ func TestHealthHook_BatchNoop(t *testing.T) {
_, ok := h.(noopAllocHealthWatcherHook)
require.True(t, ok)
}
// TestHealthHook_WatchSema asserts only one caller can acquire the watchSema
// lock at once and all other callers are blocked.
func TestHealthHook_WatchSema(t *testing.T) {
t.Parallel()
s := newWatchSema()
s.acquire()
// Second acquire should block
acq2 := make(chan struct{})
go func() {
s.acquire()
close(acq2)
}()
select {
case <-acq2:
t.Fatalf("should not have been able to acquire lock")
case <-time.After(33 * time.Millisecond):
// Ok! It's blocking!
}
// Release and assert second acquire is now active
s.release()
select {
case <-acq2:
// Ok! Second acquire unblocked!
case <-time.After(33 * time.Millisecond):
t.Fatalf("second acquire should not have blocked")
}
// wait() should block until lock is released
waitCh := make(chan struct{})
go func() {
s.wait()
close(waitCh)
}()
select {
case <-waitCh:
t.Fatalf("should not have been able to acquire lock")
case <-time.After(33 * time.Millisecond):
// Ok! It's blocking!
}
// Release and assert wait() finished
s.release()
select {
case <-waitCh:
// Ok! wait() unblocked!
case <-time.After(33 * time.Millisecond):
t.Fatalf("wait should not have blocked")
}
// Assert wait() did not acquire the lock
s.acquire()
s.release()
}