diff --git a/.changelog/25946.txt b/.changelog/25946.txt
new file mode 100644
index 000000000..7fe50e982
--- /dev/null
+++ b/.changelog/25946.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+client: Fixed a bug where disconnect.stop_on_client_after timeouts were extended or ignored
+```
diff --git a/client/heartbeatstop.go b/client/heartbeatstop.go
index de3856786..b41e283e6 100644
--- a/client/heartbeatstop.go
+++ b/client/heartbeatstop.go
@@ -10,18 +10,19 @@ import (
 
 	hclog "github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
 type heartbeatStop struct {
-	lastOk        time.Time
-	startupGrace  time.Time
-	allocInterval map[string]time.Duration
-	allocHookCh   chan *structs.Allocation
-	getRunner     func(string) (interfaces.AllocRunner, error)
-	logger        hclog.InterceptLogger
-	shutdownCh    chan struct{}
-	lock          *sync.RWMutex
+	lastOk       time.Time
+	startupGrace time.Time
+	allocHookCh  chan *structs.Allocation
+	heartbeatCh  chan struct{}
+	getRunner    func(string) (interfaces.AllocRunner, error)
+	logger       hclog.InterceptLogger
+	shutdownCh   chan struct{}
+	lock         *sync.RWMutex
 }
 
 func newHeartbeatStop(
@@ -31,13 +32,13 @@ func newHeartbeatStop(
 	shutdownCh chan struct{}) *heartbeatStop {
 
 	h := &heartbeatStop{
-		startupGrace:  time.Now().Add(timeout),
-		allocInterval: make(map[string]time.Duration),
-		allocHookCh:   make(chan *structs.Allocation),
-		getRunner:     getRunner,
-		logger:        logger,
-		shutdownCh:    shutdownCh,
-		lock:          &sync.RWMutex{},
+		startupGrace: time.Now().Add(timeout),
+		allocHookCh:  make(chan *structs.Allocation, 10),
+		heartbeatCh:  make(chan struct{}, 1),
+		getRunner:    getRunner,
+		logger:       logger,
+		shutdownCh:   shutdownCh,
+		lock:         &sync.RWMutex{},
 	}
 
 	return h
@@ -46,8 +47,7 @@ func newHeartbeatStop(
 // allocHook is called after (re)storing a new AllocRunner in the client. It registers the
 // allocation to be stopped if the taskgroup is configured appropriately
 func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
-	tg := allocTaskGroup(alloc)
-	if tg.GetDisconnectStopTimeout() != nil {
+	if _, ok := getDisconnectStopTimeout(alloc); ok {
 		h.allocHookCh <- alloc
 	}
 }
@@ -55,10 +55,8 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
 // shouldStop is called on a restored alloc to determine if lastOk is sufficiently in the
 // past that it should be prevented from restarting
 func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
-	tg := allocTaskGroup(alloc)
-	timeout := tg.GetDisconnectStopTimeout()
-	if timeout != nil {
-		return h.shouldStopAfter(time.Now(), *timeout)
+	if timeout, ok := getDisconnectStopTimeout(alloc); ok {
+		return h.shouldStopAfter(time.Now(), timeout)
 	}
 	return false
 }
@@ -77,63 +75,68 @@ func (h *heartbeatStop) watch() {
 	// If we never manage to successfully contact the server, we want to stop our allocs
 	// after duration + start time
 	h.setLastOk(time.Now())
-	stop := make(chan string, 1)
-	var now time.Time
-	var interval time.Duration
-	checkAllocs := false
+	allocIntervals := map[string]time.Duration{}
+
+	timer, stopTimer := helper.NewStoppedTimer()
+	defer stopTimer()
 
 	for {
-		// minimize the interval
-		interval = 5 * time.Second
-		for _, t := range h.allocInterval {
-			if t < interval {
+		// we want to fire the ticker only once the shortest
+		// stop_on_client_after interval has expired. we'll reset the ticker on
+		// every heartbeat and every time a new alloc appears
+		var interval time.Duration
+		for _, t := range allocIntervals {
+			if t < interval || interval == 0 {
 				interval = t
 			}
 		}
-
-		checkAllocs = false
-		timeout := time.After(interval)
+		if interval != 0 {
+			timer.Reset(interval)
+		} else {
+			timer.Stop()
+		}
 
 		select {
-		case allocID := <-stop:
-			if err := h.stopAlloc(allocID); err != nil {
-				h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err)
-				continue
-			}
-			delete(h.allocInterval, allocID)
-
-		case alloc := <-h.allocHookCh:
-			tg := allocTaskGroup(alloc)
-			timeout := tg.GetDisconnectStopTimeout()
-			if timeout != nil {
-				h.allocInterval[alloc.ID] = *timeout
-			}
-
-		case <-timeout:
-			checkAllocs = true
+		case <-h.heartbeatCh:
+			continue
 
 		case <-h.shutdownCh:
			return
-		}
 
-		if !checkAllocs {
-			continue
-		}
-
-		now = time.Now()
-		for allocID, d := range h.allocInterval {
-			if h.shouldStopAfter(now, d) {
-				stop <- allocID
+		case alloc := <-h.allocHookCh:
+			// receiving a new alloc implies we're still connected, so we'll go
+			// back to the top to reset the interval
+			if timeout, ok := getDisconnectStopTimeout(alloc); ok {
+				allocIntervals[alloc.ID] = timeout
 			}
-		}
+
+		case now := <-timer.C:
+			for allocID, d := range allocIntervals {
+				if h.shouldStopAfter(now, d) {
+					if err := h.stopAlloc(allocID); err != nil {
+						h.logger.Warn("error stopping on heartbeat timeout",
+							"alloc", allocID, "error", err)
+						continue
+					}
+					delete(allocIntervals, allocID)
+				}
+			}
+		}
 	}
 }
 
-// setLastOk sets the last known good heartbeat time to the current time, and persists that time to disk
+// setLastOk sets the last known good heartbeat time to the current time
 func (h *heartbeatStop) setLastOk(t time.Time) {
 	h.lock.Lock()
 	defer h.lock.Unlock()
 	h.lastOk = t
+
+	select {
+	case h.heartbeatCh <- struct{}{}:
+	default:
+		// if the channel is full then the watch loop has a heartbeat it needs
+		// to dequeue to reset its timer anyways, so just drop this one
+	}
 }
 
 func (h *heartbeatStop) getLastOk() time.Time {
@@ -155,11 +158,17 @@ func (h *heartbeatStop) stopAlloc(allocID string) error {
 	return nil
 }
 
-func allocTaskGroup(alloc *structs.Allocation) *structs.TaskGroup {
+// getDisconnectStopTimeout is a helper that gets the alloc's StopOnClientAfter
+// timeout and handles the possible nil pointers safely
+func getDisconnectStopTimeout(alloc *structs.Allocation) (time.Duration, bool) {
 	for _, tg := range alloc.Job.TaskGroups {
 		if tg.Name == alloc.TaskGroup {
-			return tg
+			timeout := tg.GetDisconnectStopTimeout()
+			if timeout != nil {
+				return *timeout, true
+			}
+			break
 		}
 	}
-	return nil
+	return 0, false
 }
diff --git a/client/heartbeatstop_test.go b/client/heartbeatstop_test.go
index fc5779f93..f6a571648 100644
--- a/client/heartbeatstop_test.go
+++ b/client/heartbeatstop_test.go
@@ -4,32 +4,38 @@
 package client
 
 import (
+	"errors"
+	"sync"
 	"testing"
 	"time"
 
 	"github.com/hashicorp/nomad/ci"
-	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/helper/pointer"
+	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/testutil"
 	"github.com/shoenig/test/must"
+	"github.com/shoenig/test/wait"
 )
 
-func TestHeartbeatStop_allocHook(t *testing.T) {
+func TestHeartbeatStop(t *testing.T) {
 	ci.Parallel(t)
 
-	server, _, cleanupS1 := testServer(t, nil)
-	defer cleanupS1()
-	testutil.WaitForLeader(t, server.RPC)
+	shutdownCh := make(chan struct{})
+	t.Cleanup(func() { close(shutdownCh) })
 
-	client, cleanupC1 := TestClient(t, func(c *config.Config) {
-		c.RPCHandler = server
-	})
-	defer cleanupC1()
+	destroyers := map[string]*mockAllocRunnerDestroyer{}
+
+	stopper := newHeartbeatStop(func(id string) (interfaces.AllocRunner, error) {
+		return destroyers[id], nil
+	},
+		time.Hour, // start grace, ignored in this test
+		testlog.HCLogger(t),
+		shutdownCh)
 
 	// an allocation, with a tiny lease
-	d := 1 * time.Microsecond
-	alloc := &structs.Allocation{
+	alloc1 := &structs.Allocation{
 		ID:        uuid.Generate(),
 		TaskGroup: "foo",
 		Job: &structs.Job{
@@ -37,37 +43,100 @@ func TestHeartbeatStop_allocHook(t *testing.T) {
 				{
 					Name: "foo",
 					Disconnect: &structs.DisconnectStrategy{
-						StopOnClientAfter: &d,
+						StopOnClientAfter: pointer.Of(time.Microsecond),
 					},
 				},
 			},
 		},
-		Resources: &structs.Resources{
-			CPU:      100,
-			MemoryMB: 100,
-			DiskMB:   0,
-		},
 	}
 
-	// alloc added to heartbeatStop.allocs
-	err := client.addAlloc(alloc, "")
-	must.NoError(t, err)
-	testutil.WaitForResult(func() (bool, error) {
-		client.heartbeatLock.Lock()
-		_, ok := client.heartbeatStop.allocInterval[alloc.ID]
-		client.heartbeatLock.Unlock()
-		return ok, nil
-	}, func(err error) {
-		must.NoError(t, err)
-	})
+	// an alloc with a longer lease
+	alloc2 := alloc1.Copy()
+	alloc2.ID = uuid.Generate()
+	alloc2.Job.TaskGroups[0].Disconnect.StopOnClientAfter = pointer.Of(500 * time.Millisecond)
 
-	// the tiny lease causes the watch loop to destroy it
-	testutil.WaitForResult(func() (bool, error) {
-		_, ok := client.heartbeatStop.allocInterval[alloc.ID]
-		return !ok, nil
-	}, func(err error) {
-		must.NoError(t, err)
-	})
+	// an alloc with no disconnect config
+	alloc3 := alloc1.Copy()
+	alloc3.ID = uuid.Generate()
+	alloc3.Job.TaskGroups[0].Disconnect = nil
 
-	must.Nil(t, client.allocs[alloc.ID])
+	destroyers[alloc1.ID] = &mockAllocRunnerDestroyer{}
+	destroyers[alloc2.ID] = &mockAllocRunnerDestroyer{}
+	destroyers[alloc3.ID] = &mockAllocRunnerDestroyer{}
+
+	go stopper.watch()
+	stopper.allocHook(alloc1)
+	stopper.allocHook(alloc2)
+	stopper.allocHook(alloc3)
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.Timeout(time.Second),
+		wait.Gap(10*time.Millisecond),
+		wait.ErrorFunc(func() error {
+			if destroyers[alloc1.ID].checkCalls() != 1 {
+				return errors.New("first alloc was not destroyed as expected")
+			}
+			if destroyers[alloc2.ID].checkCalls() != 0 {
+				return errors.New("second alloc was unexpectedly destroyed")
+			}
+			if destroyers[alloc3.ID].checkCalls() != 0 {
+				return errors.New("third alloc should never be destroyed")
+			}
+			return nil
+		})))
+
+	// send a heartbeat and make sure nothing changes
+	stopper.setLastOk(time.Now())
+
+	must.Wait(t, wait.ContinualSuccess(
+		wait.Timeout(200*time.Millisecond),
+		wait.Gap(10*time.Millisecond),
+		wait.ErrorFunc(func() error {
+			if destroyers[alloc1.ID].checkCalls() != 1 {
+				return errors.New("first alloc should no longer be tracked")
+			}
+			if destroyers[alloc2.ID].checkCalls() != 0 {
+				return errors.New("second alloc was unexpectedly destroyed")
+			}
+			if destroyers[alloc3.ID].checkCalls() != 0 {
+				return errors.New("third alloc should never be destroyed")
+			}
+			return nil
+		})))
+
+	// skip the next heartbeat
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.Timeout(1*time.Second),
+		wait.Gap(10*time.Millisecond),
+		wait.ErrorFunc(func() error {
+			if destroyers[alloc1.ID].checkCalls() != 1 {
+				return errors.New("first alloc should no longer be tracked")
+			}
+			if destroyers[alloc2.ID].checkCalls() != 1 {
+				return errors.New("second alloc should have been destroyed")
+			}
+			if destroyers[alloc3.ID].checkCalls() != 0 {
+				return errors.New("third alloc should never be destroyed")
+			}
+			return nil
+		})))
+}
+
+type mockAllocRunnerDestroyer struct {
+	callsLock sync.Mutex
+	calls     int
+	interfaces.AllocRunner
+}
+
+func (ar *mockAllocRunnerDestroyer) checkCalls() int {
+	ar.callsLock.Lock()
+	defer ar.callsLock.Unlock()
+	return ar.calls
+}
+
+func (ar *mockAllocRunnerDestroyer) Destroy() {
+	ar.callsLock.Lock()
+	defer ar.callsLock.Unlock()
+	ar.calls++
 }
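
For readers outside the Nomad tree, the watch-loop change above boils down to one pattern: a single timer that starts stopped, is re-armed to the shortest tracked stop_on_client_after interval on every pass, is effectively reset by each heartbeat or newly tracked alloc, and on expiry stops anything whose interval has fully elapsed since the last good heartbeat. Below is a minimal, standalone sketch of that pattern, not Nomad code: the watcher type, its field names, stopFn, and the fixed 200ms interval are illustrative assumptions, and a stop-and-drain of a standard library timer stands in for helper.NewStoppedTimer.

```go
package main

import (
	"fmt"
	"time"
)

// watcher tracks per-ID stop intervals measured from the last good heartbeat.
// All names here are illustrative; this is not part of the patch.
type watcher struct {
	lastOk      time.Time
	intervals   map[string]time.Duration // id -> stop-after interval
	heartbeatCh chan struct{}
	addCh       chan string
	shutdownCh  chan struct{}
	stopFn      func(id string) // stand-in for destroying an alloc runner
}

func (w *watcher) watch() {
	w.lastOk = time.Now()

	// Start with a stopped timer, approximating helper.NewStoppedTimer.
	timer := time.NewTimer(time.Hour)
	if !timer.Stop() {
		<-timer.C
	}
	defer timer.Stop()

	for {
		// Find the shortest tracked interval, if any.
		var shortest time.Duration
		for _, d := range w.intervals {
			if shortest == 0 || d < shortest {
				shortest = d
			}
		}

		// Stop and drain before re-arming so a stale fire is not consumed
		// later, then arm the timer only if something is being tracked.
		if !timer.Stop() {
			select {
			case <-timer.C:
			default:
			}
		}
		if shortest != 0 {
			timer.Reset(shortest)
		}

		select {
		case <-w.heartbeatCh:
			// Still connected: move the reference point; the loop re-arms.
			w.lastOk = time.Now()

		case id := <-w.addCh:
			// Track new work with an illustrative fixed interval.
			w.intervals[id] = 200 * time.Millisecond

		case now := <-timer.C:
			// No heartbeat within the shortest interval: stop anything whose
			// own interval has elapsed since the last good heartbeat.
			for id, d := range w.intervals {
				if now.After(w.lastOk.Add(d)) {
					w.stopFn(id)
					delete(w.intervals, id)
				}
			}

		case <-w.shutdownCh:
			return
		}
	}
}

func main() {
	w := &watcher{
		intervals:   map[string]time.Duration{},
		heartbeatCh: make(chan struct{}, 1),
		addCh:       make(chan string),
		shutdownCh:  make(chan struct{}),
		stopFn:      func(id string) { fmt.Println("stopped", id) },
	}
	go w.watch()

	w.addCh <- "alloc-1"               // tracked with a 200ms interval
	time.Sleep(100 * time.Millisecond) // stay connected for a while
	w.heartbeatCh <- struct{}{}        // heartbeat: the deadline moves out
	time.Sleep(300 * time.Millisecond) // then go silent: alloc-1 is stopped
	close(w.shutdownCh)
}
```

Run as-is, this sketch prints "stopped alloc-1" roughly 200ms after the last heartbeat, which mirrors what the new `case now := <-timer.C:` branch does with shouldStopAfter and stopAlloc in the patch.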