prevent client deadlock and incorrect timing on stop_on_client_after (#25946)
The `disconnect.stop_on_client_after` feature is implemented as a loop on the client that's intended to wait on the shortest timeout of all the allocations on the node and then check whether the interval since the last heartbeat has been longer than the timeout. It uses a buffered channel of allocations, written and read from the same goroutine, to push "stops" from the timeout expiring to the next pass through the loop. Unfortunately, if there are multiple allocations that need to be stopped in the same timeout event, or even if a previous event has not yet been dequeued, then sending on the channel will block and the entire goroutine deadlocks itself.

While fixing this, I also discovered that the `stop_on_client_after` and heartbeat loops can synchronize in a pathological way that extends the `stop_on_client_after` window. If a heartbeat fails close to the beginning of the shortest `stop_on_client_after` window, the loop will end up waiting until almost 2x the intended wait period. While fixing both of those issues, I discovered that the existing tests had a bug such that we were asserting that an allocrunner was being destroyed when it had already exited.

This commit includes the following:

* Rework the watch loop so that we handle the stops in the same case as the timer expiration, rather than using a channel in the method scope.
* Remove the alloc intervals map field from the struct and keep it in the method scope, in order to discourage writing racy tests that read its value.
* Reset the timer whenever we receive a heartbeat, which forces the two intervals to synchronize correctly.
* Minor refactoring of the disconnect timeout lookup to improve brevity.

Fixes: https://github.com/hashicorp/nomad/issues/24679
Ref: https://hashicorp.atlassian.net/browse/NMD-407
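For illustration only, here is a minimal sketch of the self-deadlock described above (hypothetical names, not Nomad code): one goroutine both sends to and receives from the same buffered channel inside its own select loop, so once the buffer is full the send can never complete. Running it ends with Go's "fatal error: all goroutines are asleep - deadlock!".

```go
package main

import "fmt"

func main() {
	stop := make(chan string, 1) // small buffer, drained by this same goroutine

	expired := []string{"alloc-a", "alloc-b"} // two timeouts in one pass

	for {
		select {
		case id := <-stop:
			fmt.Println("stopping", id)
		default:
			// push the expired allocs to "the next pass through the loop":
			// the second send blocks once the buffer holds alloc-a, and no
			// other goroutine will ever drain it, so the loop deadlocks itself
			for _, id := range expired {
				stop <- id
			}
			expired = nil
		}
	}
}
```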
.changelog/25946.txt (new file, +3)

@@ -0,0 +1,3 @@
+```release-note:bug
+client: Fixed a bug where disconnect.stop_on_client_after timeouts were extended or ignored
+```
@@ -10,18 +10,19 @@ import (
 	hclog "github.com/hashicorp/go-hclog"

 	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/structs"
 )

 type heartbeatStop struct {
 	lastOk        time.Time
 	startupGrace  time.Time
-	allocInterval map[string]time.Duration
 	allocHookCh   chan *structs.Allocation
+	heartbeatCh   chan struct{}
 	getRunner     func(string) (interfaces.AllocRunner, error)
 	logger        hclog.InterceptLogger
 	shutdownCh    chan struct{}
 	lock          *sync.RWMutex
 }

 func newHeartbeatStop(
@@ -31,13 +32,13 @@ func newHeartbeatStop(
 	shutdownCh chan struct{}) *heartbeatStop {

 	h := &heartbeatStop{
 		startupGrace:  time.Now().Add(timeout),
-		allocInterval: make(map[string]time.Duration),
-		allocHookCh:   make(chan *structs.Allocation),
+		allocHookCh:   make(chan *structs.Allocation, 10),
+		heartbeatCh:   make(chan struct{}, 1),
 		getRunner:     getRunner,
 		logger:        logger,
 		shutdownCh:    shutdownCh,
 		lock:          &sync.RWMutex{},
 	}

 	return h
@@ -46,8 +47,7 @@ func newHeartbeatStop(
 // allocHook is called after (re)storing a new AllocRunner in the client. It registers the
 // allocation to be stopped if the taskgroup is configured appropriately
 func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
-	tg := allocTaskGroup(alloc)
-	if tg.GetDisconnectStopTimeout() != nil {
+	if _, ok := getDisconnectStopTimeout(alloc); ok {
 		h.allocHookCh <- alloc
 	}
 }
@@ -55,10 +55,8 @@ func (h *heartbeatStop) allocHook(alloc *structs.Allocation) {
 // shouldStop is called on a restored alloc to determine if lastOk is sufficiently in the
 // past that it should be prevented from restarting
 func (h *heartbeatStop) shouldStop(alloc *structs.Allocation) bool {
-	tg := allocTaskGroup(alloc)
-	timeout := tg.GetDisconnectStopTimeout()
-	if timeout != nil {
-		return h.shouldStopAfter(time.Now(), *timeout)
+	if timeout, ok := getDisconnectStopTimeout(alloc); ok {
+		return h.shouldStopAfter(time.Now(), timeout)
 	}
 	return false
 }
@@ -77,63 +75,68 @@ func (h *heartbeatStop) watch() {
 	// If we never manage to successfully contact the server, we want to stop our allocs
 	// after duration + start time
 	h.setLastOk(time.Now())
-	stop := make(chan string, 1)
-	var now time.Time
-	var interval time.Duration
-	checkAllocs := false
+	allocIntervals := map[string]time.Duration{}
+
+	timer, stopTimer := helper.NewStoppedTimer()
+	defer stopTimer()

 	for {
-		// minimize the interval
-		interval = 5 * time.Second
-		for _, t := range h.allocInterval {
-			if t < interval {
+		// we want to fire the ticker only once the shortest
+		// stop_on_client_after interval has expired. we'll reset the ticker on
+		// every heartbeat and every time a new alloc appears
+		var interval time.Duration
+		for _, t := range allocIntervals {
+			if t < interval || interval == 0 {
 				interval = t
 			}
 		}
-
-		checkAllocs = false
-		timeout := time.After(interval)
+		if interval != 0 {
+			timer.Reset(interval)
+		} else {
+			timer.Stop()
+		}

 		select {
-		case allocID := <-stop:
-			if err := h.stopAlloc(allocID); err != nil {
-				h.logger.Warn("error stopping on heartbeat timeout", "alloc", allocID, "error", err)
-				continue
-			}
-			delete(h.allocInterval, allocID)
-
-		case alloc := <-h.allocHookCh:
-			tg := allocTaskGroup(alloc)
-			timeout := tg.GetDisconnectStopTimeout()
-			if timeout != nil {
-				h.allocInterval[alloc.ID] = *timeout
-			}
-
-		case <-timeout:
-			checkAllocs = true
+		case <-h.heartbeatCh:
+			continue

 		case <-h.shutdownCh:
 			return
-		}

-		if !checkAllocs {
-			continue
-		}
+		case alloc := <-h.allocHookCh:
+			// receiving a new alloc implies we're still connected, so we'll go
+			// back to the top to reset the interval
+			if timeout, ok := getDisconnectStopTimeout(alloc); ok {
+				allocIntervals[alloc.ID] = timeout
+			}

-		now = time.Now()
-		for allocID, d := range h.allocInterval {
-			if h.shouldStopAfter(now, d) {
-				stop <- allocID
+		case now := <-timer.C:
+			for allocID, d := range allocIntervals {
+				if h.shouldStopAfter(now, d) {
+					if err := h.stopAlloc(allocID); err != nil {
+						h.logger.Warn("error stopping on heartbeat timeout",
+							"alloc", allocID, "error", err)
+						continue
+					}
+					delete(allocIntervals, allocID)
+				}
 			}
 		}
 	}
 }

-// setLastOk sets the last known good heartbeat time to the current time, and persists that time to disk
+// setLastOk sets the last known good heartbeat time to the current time
 func (h *heartbeatStop) setLastOk(t time.Time) {
 	h.lock.Lock()
 	defer h.lock.Unlock()
 	h.lastOk = t
+
+	select {
+	case h.heartbeatCh <- struct{}{}:
+	default:
+		// if the channel is full then the watch loop has a heartbeat it needs
+		// to dequeue to reset its timer anyways, so just drop this one
+	}
 }

 func (h *heartbeatStop) getLastOk() time.Time {
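The rewritten loop leans on two small patterns: a timer that starts stopped and is reset on every heartbeat, so the `stop_on_client_after` window is always measured from the most recent heartbeat rather than from wherever the loop happened to be, and a non-blocking send into a 1-buffered channel so heartbeat notifications coalesce instead of blocking the sender. Below is a minimal, self-contained sketch of those two patterns using only the standard library's `time.Timer`; the `watcher` type and its names are hypothetical, not Nomad's.

```go
package main

import (
	"fmt"
	"time"
)

type watcher struct {
	heartbeatCh chan struct{}
}

func newWatcher() *watcher {
	return &watcher{heartbeatCh: make(chan struct{}, 1)}
}

// notifyHeartbeat never blocks: if a notification is already pending, the
// watch loop will reset its timer when it dequeues that one anyway.
func (w *watcher) notifyHeartbeat() {
	select {
	case w.heartbeatCh <- struct{}{}:
	default:
	}
}

// watch fires once `window` has elapsed since the most recent heartbeat.
func (w *watcher) watch(window time.Duration, done chan struct{}) {
	// create the timer stopped; the real loop only arms it when there is an
	// interval worth waiting for
	timer := time.NewTimer(window)
	if !timer.Stop() {
		<-timer.C
	}
	defer timer.Stop()

	timer.Reset(window)
	for {
		select {
		case <-w.heartbeatCh:
			// a heartbeat arrived: restart the window from now
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(window)
		case <-timer.C:
			fmt.Println("window expired with no heartbeat")
			return
		case <-done:
			return
		}
	}
}

func main() {
	w := newWatcher()
	done := make(chan struct{})
	go w.watch(200*time.Millisecond, done)

	// heartbeats keep arriving, so the window keeps sliding
	for i := 0; i < 3; i++ {
		time.Sleep(100 * time.Millisecond)
		w.notifyHeartbeat()
	}
	// stop heartbeating; the watcher fires roughly one window later
	time.Sleep(400 * time.Millisecond)
	close(done)
}
```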
@@ -155,11 +158,17 @@ func (h *heartbeatStop) stopAlloc(allocID string) error {
 	return nil
 }

-func allocTaskGroup(alloc *structs.Allocation) *structs.TaskGroup {
+// getDisconnectStopTimeout is a helper that gets the alloc's StopOnClientAfter
+// timeout and handles the possible nil pointers safely
+func getDisconnectStopTimeout(alloc *structs.Allocation) (time.Duration, bool) {
 	for _, tg := range alloc.Job.TaskGroups {
 		if tg.Name == alloc.TaskGroup {
-			return tg
+			timeout := tg.GetDisconnectStopTimeout()
+			if timeout != nil {
+				return *timeout, true
+			}
+			break
 		}
 	}
-	return nil
+	return 0, false
 }
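The new helper trades a pointer-returning lookup for a (value, ok) accessor, which is what lets both call sites above collapse to a single `if` statement. A minimal sketch of the same pattern with hypothetical names, outside of Nomad's types:

```go
package main

import (
	"fmt"
	"time"
)

type group struct {
	stopAfter *time.Duration // optional config value
}

// stopTimeout hides the nil-pointer handling behind a (value, ok) return.
func stopTimeout(g *group) (time.Duration, bool) {
	if g == nil || g.stopAfter == nil {
		return 0, false
	}
	return *g.stopAfter, true
}

func main() {
	d := 30 * time.Second
	configured := &group{stopAfter: &d}
	unconfigured := &group{}

	if timeout, ok := stopTimeout(configured); ok {
		fmt.Println("stop after:", timeout) // stop after: 30s
	}
	if _, ok := stopTimeout(unconfigured); !ok {
		fmt.Println("no stop timeout configured")
	}
}
```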
@@ -4,32 +4,38 @@
 package client

 import (
+	"errors"
+	"sync"
 	"testing"
 	"time"

 	"github.com/hashicorp/nomad/ci"
-	"github.com/hashicorp/nomad/client/config"
+	"github.com/hashicorp/nomad/client/allocrunner/interfaces"
+	"github.com/hashicorp/nomad/helper/pointer"
+	"github.com/hashicorp/nomad/helper/testlog"
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/structs"
-	"github.com/hashicorp/nomad/testutil"
 	"github.com/shoenig/test/must"
+	"github.com/shoenig/test/wait"
 )

-func TestHeartbeatStop_allocHook(t *testing.T) {
+func TestHeartbeatStop(t *testing.T) {
 	ci.Parallel(t)

-	server, _, cleanupS1 := testServer(t, nil)
-	defer cleanupS1()
-	testutil.WaitForLeader(t, server.RPC)
-
-	client, cleanupC1 := TestClient(t, func(c *config.Config) {
-		c.RPCHandler = server
-	})
-	defer cleanupC1()
+	shutdownCh := make(chan struct{})
+	t.Cleanup(func() { close(shutdownCh) })
+
+	destroyers := map[string]*mockAllocRunnerDestroyer{}
+
+	stopper := newHeartbeatStop(func(id string) (interfaces.AllocRunner, error) {
+		return destroyers[id], nil
+	},
+		time.Hour, // start grace, ignored in this test
+		testlog.HCLogger(t),
+		shutdownCh)

 	// an allocation, with a tiny lease
-	d := 1 * time.Microsecond
-	alloc := &structs.Allocation{
+	alloc1 := &structs.Allocation{
 		ID:        uuid.Generate(),
 		TaskGroup: "foo",
 		Job: &structs.Job{
@@ -37,37 +43,100 @@ func TestHeartbeatStop_allocHook(t *testing.T) {
 				{
 					Name: "foo",
 					Disconnect: &structs.DisconnectStrategy{
-						StopOnClientAfter: &d,
+						StopOnClientAfter: pointer.Of(time.Microsecond),
 					},
 				},
 			},
 		},
-		Resources: &structs.Resources{
-			CPU:      100,
-			MemoryMB: 100,
-			DiskMB:   0,
-		},
 	}

-	// alloc added to heartbeatStop.allocs
-	err := client.addAlloc(alloc, "")
-	must.NoError(t, err)
-	testutil.WaitForResult(func() (bool, error) {
-		client.heartbeatLock.Lock()
-		_, ok := client.heartbeatStop.allocInterval[alloc.ID]
-		client.heartbeatLock.Unlock()
-		return ok, nil
-	}, func(err error) {
-		must.NoError(t, err)
-	})
+	// an alloc with a longer lease
+	alloc2 := alloc1.Copy()
+	alloc2.ID = uuid.Generate()
+	alloc2.Job.TaskGroups[0].Disconnect.StopOnClientAfter = pointer.Of(500 * time.Millisecond)

-	// the tiny lease causes the watch loop to destroy it
-	testutil.WaitForResult(func() (bool, error) {
-		_, ok := client.heartbeatStop.allocInterval[alloc.ID]
-		return !ok, nil
-	}, func(err error) {
-		must.NoError(t, err)
-	})
+	// an alloc with no disconnect config
+	alloc3 := alloc1.Copy()
+	alloc3.ID = uuid.Generate()
+	alloc3.Job.TaskGroups[0].Disconnect = nil

-	must.Nil(t, client.allocs[alloc.ID])
+	destroyers[alloc1.ID] = &mockAllocRunnerDestroyer{}
+	destroyers[alloc2.ID] = &mockAllocRunnerDestroyer{}
+	destroyers[alloc3.ID] = &mockAllocRunnerDestroyer{}
+
+	go stopper.watch()
+	stopper.allocHook(alloc1)
+	stopper.allocHook(alloc2)
+	stopper.allocHook(alloc3)
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.Timeout(time.Second),
+		wait.Gap(10*time.Millisecond),
+		wait.ErrorFunc(func() error {
+			if destroyers[alloc1.ID].checkCalls() != 1 {
+				return errors.New("first alloc was not destroyed as expected")
+			}
+			if destroyers[alloc2.ID].checkCalls() != 0 {
+				return errors.New("second alloc was unexpectedly destroyed")
+			}
+			if destroyers[alloc3.ID].checkCalls() != 0 {
+				return errors.New("third alloc should never be destroyed")
+			}
+			return nil
+		})))
+
+	// send a heartbeat and make sure nothing changes
+	stopper.setLastOk(time.Now())
+
+	must.Wait(t, wait.ContinualSuccess(
+		wait.Timeout(200*time.Millisecond),
+		wait.Gap(10*time.Millisecond),
+		wait.ErrorFunc(func() error {
+			if destroyers[alloc1.ID].checkCalls() != 1 {
+				return errors.New("first alloc should no longer be tracked")
+			}
+			if destroyers[alloc2.ID].checkCalls() != 0 {
+				return errors.New("second alloc was unexpectedly destroyed")
+			}
+			if destroyers[alloc3.ID].checkCalls() != 0 {
+				return errors.New("third alloc should never be destroyed")
+			}
+			return nil
+		})))
+
+	// skip the next heartbeat
+
+	must.Wait(t, wait.InitialSuccess(
+		wait.Timeout(1*time.Second),
+		wait.Gap(10*time.Millisecond),
+		wait.ErrorFunc(func() error {
+			if destroyers[alloc1.ID].checkCalls() != 1 {
+				return errors.New("first alloc should no longer be tracked")
+			}
+			if destroyers[alloc2.ID].checkCalls() != 1 {
+				return errors.New("second alloc should have been destroyed")
+			}
+			if destroyers[alloc3.ID].checkCalls() != 0 {
+				return errors.New("third alloc should never be destroyed")
+			}
+			return nil
+		})))
+}
+
+type mockAllocRunnerDestroyer struct {
+	callsLock sync.Mutex
+	calls     int
+	interfaces.AllocRunner
+}
+
+func (ar *mockAllocRunnerDestroyer) checkCalls() int {
+	ar.callsLock.Lock()
+	defer ar.callsLock.Unlock()
+	return ar.calls
+}
+
+func (ar *mockAllocRunnerDestroyer) Destroy() {
+	ar.callsLock.Lock()
+	defer ar.callsLock.Unlock()
+	ar.calls++
 }