diff --git a/client/allocrunnerv2/health_hook.go b/client/allocrunnerv2/health_hook.go
index 1b51fdbb3..9aa5e2754 100644
--- a/client/allocrunnerv2/health_hook.go
+++ b/client/allocrunnerv2/health_hook.go
@@ -14,31 +14,6 @@ import (
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
-// watchSema acts as a semaphore to allow one watcher at a time.
-type watchSema struct {
-	ch chan int
-}
-
-func newWatchSema() *watchSema {
-	return &watchSema{ch: make(chan int, 1)}
-}
-
-// acquire the semaphore.
-func (w *watchSema) acquire() {
-	w.ch <- 1
-}
-
-// release the semaphore.
-func (w *watchSema) release() {
-	<-w.ch
-}
-
-// wait blocks until the semaphore is free to be acquired.
-func (w *watchSema) wait() {
-	w.acquire()
-	w.release()
-}
-
 // healthMutator is able to set/clear alloc health.
 type healthSetter interface {
 	// Set health via the mutator
@@ -65,9 +40,10 @@ type allocHealthWatcherHook struct {
 	// Update and synchronous hooks.
 	hookLock sync.Mutex
 
-	// watchLock is acquired by the health watching/setting goroutine to
-	// ensure only one health watching goroutine is running at a time.
-	watchLock *watchSema
+	// watchDone is created before calling watchHealth and is closed when
+	// watchHealth exits. Must be passed into watchHealth to avoid races.
+	// Initialized already closed as Update may be called before Prerun.
+	watchDone chan struct{}
 
 	// ranOnce is set once Prerun or Update have run at least once. This
 	// prevents Prerun from running if an Update has already been
@@ -97,13 +73,17 @@ func newAllocHealthWatcherHook(logger log.Logger, alloc *structs.Allocation, hs
 		return noopAllocHealthWatcherHook{}
 	}
 
+	// Initialize watchDone with a closed chan in case Update runs before Prerun
+	closedDone := make(chan struct{})
+	close(closedDone)
+
 	h := &allocHealthWatcherHook{
 		alloc:        alloc,
 		cancelFn:     func() {}, // initialize to prevent nil func panics
+		watchDone:    closedDone,
 		consul:       consul,
 		healthSetter: hs,
 		listener:     listener,
-		watchLock:    newWatchSema(),
 	}
 
 	h.logger = logger.Named(h.Name())
@@ -151,7 +131,10 @@ func (h *allocHealthWatcherHook) init() error {
 	tracker := allochealth.NewTracker(ctx, h.logger, h.alloc, h.listener, h.consul,
 		minHealthyTime, useChecks)
 	tracker.Start()
-	go h.watchHealth(ctx, tracker)
+
+	// Create a new done chan and start watching for health updates
+	h.watchDone = make(chan struct{})
+	go h.watchHealth(ctx, tracker, h.watchDone)
 
 	return nil
 }
@@ -179,7 +162,7 @@ func (h *allocHealthWatcherHook) Update(req *interfaces.RunnerUpdateRequest) err
 	h.cancelFn()
 
 	// Wait until the watcher exits
-	h.watchLock.wait()
+	<-h.watchDone
 
 	// Deployment has changed, reset status
 	if req.Alloc.DeploymentID != h.alloc.DeploymentID {
@@ -200,7 +183,7 @@ func (h *allocHealthWatcherHook) Destroy() error {
 	h.listener.Close()
 
 	// Wait until the watcher exits
-	h.watchLock.wait()
+	<-h.watchDone
 
 	return nil
 }
@@ -208,9 +191,8 @@ func (h *allocHealthWatcherHook) Destroy() error {
 // watchHealth watches alloc health until it is set, the alloc is stopped, or
 // the context is canceled. watchHealth will be canceled and restarted on
-// Updates so calls are serialized with a lock.
+// Updates; the done chan is used to wait for the previous watcher to exit.
-func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker) {
-	h.watchLock.acquire()
-	defer h.watchLock.release()
+func (h *allocHealthWatcherHook) watchHealth(ctx context.Context, tracker *allochealth.Tracker, done chan<- struct{}) {
+	defer close(done)
 
 	select {
 	case <-ctx.Done():
diff --git a/client/allocrunnerv2/health_hook_test.go b/client/allocrunnerv2/health_hook_test.go
index 42b2d4ac5..db03a0815 100644
--- a/client/allocrunnerv2/health_hook_test.go
+++ b/client/allocrunnerv2/health_hook_test.go
@@ -6,12 +6,16 @@ import (
 	"testing"
 	"time"
 
+	consulapi "github.com/hashicorp/consul/api"
 	"github.com/hashicorp/nomad/client/allocrunnerv2/interfaces"
 	"github.com/hashicorp/nomad/client/consul"
 	cstructs "github.com/hashicorp/nomad/client/structs"
+	agentconsul "github.com/hashicorp/nomad/command/agent/consul"
 	"github.com/hashicorp/nomad/helper/testlog"
+	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
 )
 
@@ -20,6 +24,12 @@ var _ interfaces.RunnerPrerunHook = (*allocHealthWatcherHook)(nil)
 var _ interfaces.RunnerUpdateHook = (*allocHealthWatcherHook)(nil)
 var _ interfaces.RunnerDestroyHook = (*allocHealthWatcherHook)(nil)
 
+// allocHealth is emitted to a chan whenever SetHealth is called
+type allocHealth struct {
+	healthy    bool
+	taskEvents map[string]*structs.TaskEvent
+}
+
 // mockHealthSetter implements healthSetter that stores health internally
 type mockHealthSetter struct {
 	setCalls int
@@ -28,6 +38,17 @@ type mockHealthSetter struct {
 	isDeploy   *bool
 	taskEvents map[string]*structs.TaskEvent
 	mu         sync.Mutex
+
+	healthCh chan allocHealth
 }
+
+// newMockHealthSetter returns a mock HealthSetter that emits all SetHealth
+// calls on a buffered chan. Callers who do not need notifications of health
+// changes may just create the struct directly.
+func newMockHealthSetter() *mockHealthSetter {
+	return &mockHealthSetter{
+		healthCh: make(chan allocHealth, 1),
+	}
+}
 
 func (m *mockHealthSetter) SetHealth(healthy, isDeploy bool, taskEvents map[string]*structs.TaskEvent) {
@@ -38,6 +59,10 @@ func (m *mockHealthSetter) SetHealth(healthy, isDeploy bool, taskEvents map[stri
 	m.healthy = &healthy
 	m.isDeploy = &isDeploy
 	m.taskEvents = taskEvents
+
+	if m.healthCh != nil {
+		m.healthCh <- allocHealth{healthy, taskEvents}
+	}
 }
 
 func (m *mockHealthSetter) ClearHealth() {
@@ -50,6 +75,7 @@ func (m *mockHealthSetter) ClearHealth() {
 	m.taskEvents = nil
 }
 
+// TestHealthHook_PrerunDestroy asserts a health hook does not error if it is run and destroyed.
 func TestHealthHook_PrerunDestroy(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
@@ -73,10 +99,194 @@ func TestHealthHook_PrerunDestroy(t *testing.T) {
 	// Prerun
 	require.NoError(prerunh.Prerun(context.Background()))
 
+	// Assert isDeploy is false (other tests peek at isDeploy to determine
+	// if an Update applied)
+	ahw := h.(*allocHealthWatcherHook)
+	ahw.hookLock.Lock()
+	assert.False(t, ahw.isDeploy)
+	ahw.hookLock.Unlock()
+
 	// Destroy
 	require.NoError(destroyh.Destroy())
 }
 
+// TestHealthHook_PrerunUpdateDestroy asserts Updates may be applied concurrently.
+func TestHealthHook_PrerunUpdateDestroy(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	alloc := mock.Alloc()
+
+	b := cstructs.NewAllocBroadcaster()
+	defer b.Close()
+
+	consul := consul.NewMockConsulServiceClient(t)
+	hs := &mockHealthSetter{}
+
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
+
+	// Prerun
+	require.NoError(h.Prerun(context.Background()))
+
+	// Update multiple times in a goroutine to mimic Client behavior
+	// (Updates are concurrent with alloc runner but are applied serially).
+	errs := make(chan error, 2)
+	go func() {
+		defer close(errs)
+		for i := 0; i < cap(errs); i++ {
+			alloc.AllocModifyIndex++
+			errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc.Copy()})
+		}
+	}()
+
+	for err := range errs {
+		assert.NoError(t, err)
+	}
+
+	// Destroy
+	require.NoError(h.Destroy())
+}
+
+// TestHealthHook_UpdatePrerunDestroy asserts that a hook may have Update
+// called before Prerun.
+func TestHealthHook_UpdatePrerunDestroy(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	alloc := mock.Alloc()
+
+	b := cstructs.NewAllocBroadcaster()
+	defer b.Close()
+
+	consul := consul.NewMockConsulServiceClient(t)
+	hs := &mockHealthSetter{}
+
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
+
+	// Set a DeploymentID to cause ClearHealth to be called
+	alloc.DeploymentID = uuid.Generate()
+
+	// Update in a goroutine to mimic Client behavior (Updates are
+	// concurrent with alloc runner).
+	errs := make(chan error, 1)
+	go func(alloc *structs.Allocation) {
+		errs <- h.Update(&interfaces.RunnerUpdateRequest{Alloc: alloc})
+		close(errs)
+	}(alloc.Copy())
+
+	for err := range errs {
+		assert.NoError(t, err)
+	}
+
+	// Prerun should be a noop
+	require.NoError(h.Prerun(context.Background()))
+
+	// Assert that the Update took effect by isDeploy being true
+	h.hookLock.Lock()
+	assert.True(t, h.isDeploy)
+	h.hookLock.Unlock()
+
+	// Destroy
+	require.NoError(h.Destroy())
+}
+
+// TestHealthHook_Destroy asserts that a hook may have only Destroy called.
+func TestHealthHook_Destroy(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	b := cstructs.NewAllocBroadcaster()
+	defer b.Close()
+
+	consul := consul.NewMockConsulServiceClient(t)
+	hs := &mockHealthSetter{}
+
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), mock.Alloc(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
+
+	// Destroy
+	require.NoError(h.Destroy())
+}
+
+// TestHealthHook_SetHealth asserts SetHealth is called when health status is
+// set. Uses task state and health checks.
+func TestHealthHook_SetHealth(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+
+	alloc := mock.Alloc()
+	alloc.Job.TaskGroups[0].Migrate.MinHealthyTime = 1 // let's speed things up
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+
+	// Synthesize running alloc and tasks
+	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc.TaskStates = map[string]*structs.TaskState{
+		task.Name: {
+			State:     structs.TaskStateRunning,
+			StartedAt: time.Now(),
+		},
+	}
+
+	// Make Consul response
+	check := &consulapi.AgentCheck{
+		Name:   task.Services[0].Checks[0].Name,
+		Status: consulapi.HealthPassing,
+	}
+	taskRegs := map[string]*agentconsul.TaskRegistration{
+		task.Name: {
+			Services: map[string]*agentconsul.ServiceRegistration{
+				task.Services[0].Name: {
+					Service: &consulapi.AgentService{
+						ID:      "foo",
+						Service: task.Services[0].Name,
+					},
+					Checks: []*consulapi.AgentCheck{check},
+				},
+			},
+		},
+	}
+
+	b := cstructs.NewAllocBroadcaster()
+	defer b.Close()
+
+	// Don't reply on the first call
+	called := false
+	consul := consul.NewMockConsulServiceClient(t)
+	consul.AllocRegistrationsFn = func(string) (*agentconsul.AllocRegistration, error) {
+		if !called {
+			called = true
+			return nil, nil
+		}
+
+		reg := &agentconsul.AllocRegistration{
+			Tasks: taskRegs,
+		}
+
+		return reg, nil
+	}
+
+	hs := newMockHealthSetter()
+
+	h := newAllocHealthWatcherHook(testlog.HCLogger(t), alloc.Copy(), hs, b.Listen(), consul).(*allocHealthWatcherHook)
+
+	// Prerun
+	require.NoError(h.Prerun(context.Background()))
+
+	// Wait for health to be set (healthy)
+	select {
+	case <-time.After(5 * time.Second):
+		t.Fatalf("timeout waiting for health to be set")
+	case health := <-hs.healthCh:
+		require.True(health.healthy)
+
+		// Healthy allocs shouldn't emit task events
+		ev := health.taskEvents[task.Name]
+		require.Nilf(ev, "%#v", health.taskEvents)
+	}
+
+	// Destroy
+	require.NoError(h.Destroy())
+}
+
 // TestHealthHook_SystemNoop asserts that system jobs return the noop tracker.
 func TestHealthHook_SystemNoop(t *testing.T) {
 	t.Parallel()
@@ -106,65 +316,3 @@ func TestHealthHook_SystemNoop(t *testing.T) {
 	_, ok := h.(noopAllocHealthWatcherHook)
 	require.True(t, ok)
 }
-
-// TestHealthHook_WatchSema asserts only one caller can acquire the watchSema
-// lock at once and all other callers are blocked.
-func TestHealthHook_WatchSema(t *testing.T) {
-	t.Parallel()
-
-	s := newWatchSema()
-
-	s.acquire()
-
-	// Second acquire should block
-	acq2 := make(chan struct{})
-	go func() {
-		s.acquire()
-		close(acq2)
-	}()
-
-	select {
-	case <-acq2:
-		t.Fatalf("should not have been able to acquire lock")
-	case <-time.After(33 * time.Millisecond):
-		// Ok! It's blocking!
-	}
-
-	// Release and assert second acquire is now active
-	s.release()
-
-	select {
-	case <-acq2:
-		// Ok! Second acquire unblocked!
-	case <-time.After(33 * time.Millisecond):
-		t.Fatalf("second acquire should not have blocked")
-	}
-
-	// wait() should block until lock is released
-	waitCh := make(chan struct{})
-	go func() {
-		s.wait()
-		close(waitCh)
-	}()
-
-	select {
-	case <-waitCh:
-		t.Fatalf("should not have been able to acquire lock")
-	case <-time.After(33 * time.Millisecond):
-		// Ok! It's blocking!
-	}
-
-	// Release and assert wait() finished
-	s.release()
-
-	select {
-	case <-waitCh:
-		// Ok! wait() unblocked!
-	case <-time.After(33 * time.Millisecond):
-		t.Fatalf("wait should not have blocked")
-	}
-
-	// Assert wait() did not acquire the lock
-	s.acquire()
-	s.release()
-}
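
Reviewer note (not part of the patch): below is a minimal, self-contained sketch of the done-chan pattern this change adopts, for anyone reading the diff without the surrounding code. The `watcher` type and its methods are illustrative names only, not Nomad APIs; the two ideas it demonstrates are exactly the ones above: initialize the done chan pre-closed so waiting before the first start never blocks, and pass the chan into the goroutine as a parameter so a restart that replaces the field cannot race with the goroutine that closes it.

package main

import (
	"context"
	"fmt"
)

// watcher owns one background goroutine. done is closed by the goroutine
// itself on exit, so callers can wait for a clean stop before restarting.
type watcher struct {
	cancelFn context.CancelFunc
	done     chan struct{}
}

func newWatcher() *watcher {
	// Start with a pre-closed chan so stop() returns immediately if it is
	// called before start() -- mirroring Update-before-Prerun above.
	closed := make(chan struct{})
	close(closed)
	return &watcher{cancelFn: func() {}, done: closed}
}

// start launches the goroutine. The done chan is passed as a parameter so a
// later restart (which replaces w.done) cannot race with this goroutine.
func (w *watcher) start() {
	ctx, cancel := context.WithCancel(context.Background())
	w.cancelFn = cancel
	w.done = make(chan struct{})
	go func(done chan<- struct{}) {
		defer close(done)
		<-ctx.Done() // stand-in for the real watch loop
	}(w.done)
}

// stop cancels the goroutine and blocks until it has fully exited.
func (w *watcher) stop() {
	w.cancelFn()
	<-w.done
}

func main() {
	w := newWatcher()
	w.stop() // safe: done starts closed, so this does not block
	w.start()
	w.stop()
	fmt.Println("watcher exited cleanly")
}

Compared to the removed watchSema, this needs no dedicated unit test: the serialization property falls out of close() semantics, which is why TestHealthHook_WatchSema is deleted rather than rewritten.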